| /* This Source Code Form is subject to the terms of the Mozilla Public |
| * License, v. 2.0. If a copy of the MPL was not distributed with this |
| * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ |
| |
| use std::cell::Cell; |
| use std::rc::Rc; |
| |
| use dom_struct::dom_struct; |
| use js::jsapi::{Heap, JSAutoRealm}; |
| use js::jsval::{JSVal, UndefinedValue}; |
| use js::rust::HandleValue as SafeHandleValue; |
| |
| use super::bindings::reflector::reflect_dom_object; |
| use super::bindings::root::DomRoot; |
| use super::bindings::structuredclone; |
| use crate::dom::bindings::reflector::{DomGlobal, Reflector}; |
| use crate::dom::bindings::root::Dom; |
| use crate::dom::bindings::trace::RootedTraceableBox; |
| use crate::dom::defaultteeunderlyingsource::DefaultTeeUnderlyingSource; |
| use crate::dom::globalscope::GlobalScope; |
| use crate::dom::promise::Promise; |
| use crate::dom::readablestream::ReadableStream; |
| use crate::microtask::{Microtask, MicrotaskRunnable}; |
| use crate::realms::enter_realm; |
| use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; |
| |
| #[derive(JSTraceable, MallocSizeOf)] |
| #[cfg_attr(crown, allow(crown::unrooted_must_root))] |
| pub(crate) struct DefaultTeeReadRequestMicrotask { |
| #[ignore_malloc_size_of = "mozjs"] |
| chunk: Box<Heap<JSVal>>, |
| tee_read_request: Dom<DefaultTeeReadRequest>, |
| } |
| |
| impl MicrotaskRunnable for DefaultTeeReadRequestMicrotask { |
| fn handler(&self, can_gc: CanGc) { |
| let cx = GlobalScope::get_cx(); |
| self.tee_read_request.chunk_steps(cx, &self.chunk, can_gc); |
| } |
| |
| fn enter_realm(&self) -> JSAutoRealm { |
| enter_realm(&*self.tee_read_request) |
| } |
| } |
| |
| #[dom_struct] |
| /// <https://streams.spec.whatwg.org/#ref-for-read-request%E2%91%A2> |
| pub(crate) struct DefaultTeeReadRequest { |
| reflector_: Reflector, |
| stream: Dom<ReadableStream>, |
| branch_1: Dom<ReadableStream>, |
| branch_2: Dom<ReadableStream>, |
| #[ignore_malloc_size_of = "Rc"] |
| reading: Rc<Cell<bool>>, |
| #[ignore_malloc_size_of = "Rc"] |
| read_again: Rc<Cell<bool>>, |
| #[ignore_malloc_size_of = "Rc"] |
| canceled_1: Rc<Cell<bool>>, |
| #[ignore_malloc_size_of = "Rc"] |
| canceled_2: Rc<Cell<bool>>, |
| #[ignore_malloc_size_of = "Rc"] |
| clone_for_branch_2: Rc<Cell<bool>>, |
| #[ignore_malloc_size_of = "Rc"] |
| cancel_promise: Rc<Promise>, |
| tee_underlying_source: Dom<DefaultTeeUnderlyingSource>, |
| } |
| impl DefaultTeeReadRequest { |
| #[allow(clippy::too_many_arguments)] |
| #[cfg_attr(crown, allow(crown::unrooted_must_root))] |
| pub(crate) fn new( |
| stream: &ReadableStream, |
| branch_1: &ReadableStream, |
| branch_2: &ReadableStream, |
| reading: Rc<Cell<bool>>, |
| read_again: Rc<Cell<bool>>, |
| canceled_1: Rc<Cell<bool>>, |
| canceled_2: Rc<Cell<bool>>, |
| clone_for_branch_2: Rc<Cell<bool>>, |
| cancel_promise: Rc<Promise>, |
| tee_underlying_source: &DefaultTeeUnderlyingSource, |
| can_gc: CanGc, |
| ) -> DomRoot<Self> { |
| reflect_dom_object( |
| Box::new(DefaultTeeReadRequest { |
| reflector_: Reflector::new(), |
| stream: Dom::from_ref(stream), |
| branch_1: Dom::from_ref(branch_1), |
| branch_2: Dom::from_ref(branch_2), |
| reading, |
| read_again, |
| canceled_1, |
| canceled_2, |
| clone_for_branch_2, |
| cancel_promise, |
| tee_underlying_source: Dom::from_ref(tee_underlying_source), |
| }), |
| &*stream.global(), |
| can_gc, |
| ) |
| } |
| /// Call into cancel of the stream, |
| /// <https://streams.spec.whatwg.org/#readable-stream-cancel> |
| pub(crate) fn stream_cancel( |
| &self, |
| cx: SafeJSContext, |
| global: &GlobalScope, |
| reason: SafeHandleValue, |
| can_gc: CanGc, |
| ) { |
| self.stream.cancel(cx, global, reason, can_gc); |
| } |
| /// Enqueue a microtask to perform the chunk steps |
| /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A2> |
| pub(crate) fn enqueue_chunk_steps(&self, chunk: RootedTraceableBox<Heap<JSVal>>) { |
| // Queue a microtask to perform the following steps: |
| let tee_read_request_chunk = DefaultTeeReadRequestMicrotask { |
| chunk: Heap::boxed(*chunk.handle()), |
| tee_read_request: Dom::from_ref(self), |
| }; |
| let global = self.stream.global(); |
| let microtask_queue = global.microtask_queue(); |
| let cx = GlobalScope::get_cx(); |
| microtask_queue.enqueue( |
| Microtask::ReadableStreamTeeReadRequest(tee_read_request_chunk), |
| cx, |
| ); |
| } |
| /// <https://streams.spec.whatwg.org/#ref-for-read-request-chunk-steps%E2%91%A2> |
| #[allow(clippy::borrowed_box)] |
| pub(crate) fn chunk_steps(&self, cx: SafeJSContext, chunk: &Box<Heap<JSVal>>, can_gc: CanGc) { |
| let global = &self.stream.global(); |
| // Set readAgain to false. |
| self.read_again.set(false); |
| // Let chunk1 and chunk2 be chunk. |
| let chunk1 = chunk; |
| let chunk2 = chunk; |
| |
| rooted!(in(*cx) let chunk1_value = chunk1.get()); |
| rooted!(in(*cx) let chunk2_value = chunk2.get()); |
| // If canceled_2 is false and cloneForBranch2 is true, |
| if !self.canceled_2.get() && self.clone_for_branch_2.get() { |
| // Let cloneResult be StructuredClone(chunk2). |
| rooted!(in(*cx) let mut clone_result = UndefinedValue()); |
| let data = structuredclone::write(cx, chunk2_value.handle(), None).unwrap(); |
| // If cloneResult is an abrupt completion, |
| if structuredclone::read(global, data, clone_result.handle_mut()).is_err() { |
| // Perform ! ReadableStreamDefaultControllerError(branch_1.[[controller]], cloneResult.[[Value]]). |
| self.readable_stream_default_controller_error( |
| &self.branch_1, |
| clone_result.handle(), |
| can_gc, |
| ); |
| |
| // Perform ! ReadableStreamDefaultControllerError(branch_2.[[controller]], cloneResult.[[Value]]). |
| self.readable_stream_default_controller_error( |
| &self.branch_2, |
| clone_result.handle(), |
| can_gc, |
| ); |
| // Resolve cancelPromise with ! ReadableStreamCancel(stream, cloneResult.[[Value]]). |
| self.stream_cancel(cx, global, clone_result.handle(), can_gc); |
| // Return. |
| return; |
| } else { |
| // Otherwise, set chunk2 to cloneResult.[[Value]]. |
| chunk2.set(*clone_result); |
| } |
| } |
| // If canceled_1 is false, perform ! ReadableStreamDefaultControllerEnqueue(branch_1.[[controller]], chunk1). |
| if !self.canceled_1.get() { |
| self.readable_stream_default_controller_enqueue( |
| &self.branch_1, |
| chunk1_value.handle(), |
| can_gc, |
| ); |
| } |
| // If canceled_2 is false, perform ! ReadableStreamDefaultControllerEnqueue(branch_2.[[controller]], chunk2). |
| if !self.canceled_2.get() { |
| self.readable_stream_default_controller_enqueue( |
| &self.branch_2, |
| chunk2_value.handle(), |
| can_gc, |
| ); |
| } |
| // Set reading to false. |
| self.reading.set(false); |
| // If readAgain is true, perform pullAlgorithm. |
| if self.read_again.get() { |
| self.pull_algorithm(can_gc); |
| } |
| } |
| /// <https://streams.spec.whatwg.org/#read-request-close-steps> |
| pub(crate) fn close_steps(&self, can_gc: CanGc) { |
| // Set reading to false. |
| self.reading.set(false); |
| // If canceled_1 is false, perform ! ReadableStreamDefaultControllerClose(branch_1.[[controller]]). |
| if !self.canceled_1.get() { |
| self.readable_stream_default_controller_close(&self.branch_1, can_gc); |
| } |
| // If canceled_2 is false, perform ! ReadableStreamDefaultControllerClose(branch_2.[[controller]]). |
| if !self.canceled_2.get() { |
| self.readable_stream_default_controller_close(&self.branch_2, can_gc); |
| } |
| // If canceled_1 is false or canceled_2 is false, resolve cancelPromise with undefined. |
| if !self.canceled_1.get() || !self.canceled_2.get() { |
| self.cancel_promise.resolve_native(&(), can_gc); |
| } |
| } |
| /// <https://streams.spec.whatwg.org/#read-request-error-steps> |
| pub(crate) fn error_steps(&self) { |
| // Set reading to false. |
| self.reading.set(false); |
| } |
| /// Call into enqueue of the default controller of a stream, |
| /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-enqueue> |
| fn readable_stream_default_controller_enqueue( |
| &self, |
| stream: &ReadableStream, |
| chunk: SafeHandleValue, |
| can_gc: CanGc, |
| ) { |
| stream |
| .get_default_controller() |
| .enqueue(GlobalScope::get_cx(), chunk, can_gc) |
| .expect("enqueue failed for stream controller in DefaultTeeReadRequest"); |
| } |
| |
| /// Call into close of the default controller of a stream, |
| /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-close> |
| fn readable_stream_default_controller_close(&self, stream: &ReadableStream, can_gc: CanGc) { |
| stream.get_default_controller().close(can_gc); |
| } |
| |
| /// Call into error of the default controller of stream, |
| /// <https://streams.spec.whatwg.org/#readable-stream-default-controller-error> |
| fn readable_stream_default_controller_error( |
| &self, |
| stream: &ReadableStream, |
| error: SafeHandleValue, |
| can_gc: CanGc, |
| ) { |
| stream.get_default_controller().error(error, can_gc); |
| } |
| |
| pub(crate) fn pull_algorithm(&self, can_gc: CanGc) { |
| self.tee_underlying_source.pull_algorithm(can_gc); |
| } |
| } |