| /* This Source Code Form is subject to the terms of the Mozilla Public |
| * License, v. 2.0. If a copy of the MPL was not distributed with this |
| * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ |
| |
| use std::cell::{Cell, RefCell}; |
| use std::ptr; |
| use std::rc::Rc; |
| |
| use dom_struct::dom_struct; |
| use js::jsapi::{Heap, IsPromiseObject, JSObject}; |
| use js::jsval::{JSVal, UndefinedValue}; |
| use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue, IntoHandle}; |
| |
| use super::bindings::codegen::Bindings::QueuingStrategyBinding::QueuingStrategySize; |
| use super::types::TransformStream; |
| use crate::dom::bindings::callback::ExceptionHandling; |
| use crate::dom::bindings::codegen::Bindings::UnderlyingSinkBinding::{ |
| UnderlyingSinkAbortCallback, UnderlyingSinkCloseCallback, UnderlyingSinkStartCallback, |
| UnderlyingSinkWriteCallback, |
| }; |
| use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultControllerBinding::WritableStreamDefaultControllerMethods; |
| use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible}; |
| use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object}; |
| use crate::dom::bindings::root::{Dom, DomRoot, MutNullableDom}; |
| use crate::dom::globalscope::GlobalScope; |
| use crate::dom::messageport::MessagePort; |
| use crate::dom::promise::Promise; |
| use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler}; |
| use crate::dom::readablestreamdefaultcontroller::{EnqueuedValue, QueueWithSizes, ValueWithSize}; |
| use crate::dom::types::{AbortController, AbortSignal}; |
| use crate::dom::writablestream::WritableStream; |
| use crate::realms::{InRealm, enter_realm}; |
| use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; |
| |
| impl js::gc::Rootable for CloseAlgorithmFulfillmentHandler {} |
| |
| /// The fulfillment handler for |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close> |
| #[derive(Clone, JSTraceable, MallocSizeOf)] |
| #[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] |
| struct CloseAlgorithmFulfillmentHandler { |
| stream: Dom<WritableStream>, |
| } |
| |
| impl Callback for CloseAlgorithmFulfillmentHandler { |
| fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) { |
| let stream = self.stream.as_rooted(); |
| |
| // Perform ! WritableStreamFinishInFlightClose(stream). |
| stream.finish_in_flight_close(cx, can_gc); |
| } |
| } |
| |
| impl js::gc::Rootable for CloseAlgorithmRejectionHandler {} |
| |
| /// The rejection handler for |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close> |
| #[derive(Clone, JSTraceable, MallocSizeOf)] |
| #[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] |
| struct CloseAlgorithmRejectionHandler { |
| stream: Dom<WritableStream>, |
| } |
| |
| impl Callback for CloseAlgorithmRejectionHandler { |
| fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, realm: InRealm, can_gc: CanGc) { |
| let stream = self.stream.as_rooted(); |
| |
| let global = GlobalScope::from_safe_context(cx, realm); |
| |
| // Perform ! WritableStreamFinishInFlightCloseWithError(stream, reason). |
| stream.finish_in_flight_close_with_error(cx, &global, v, can_gc); |
| } |
| } |
| |
| impl js::gc::Rootable for StartAlgorithmFulfillmentHandler {} |
| |
| /// The fulfillment handler for |
| /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller> |
| #[derive(Clone, JSTraceable, MallocSizeOf)] |
| #[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] |
| struct StartAlgorithmFulfillmentHandler { |
| controller: Dom<WritableStreamDefaultController>, |
| } |
| |
| impl Callback for StartAlgorithmFulfillmentHandler { |
| /// Continuation of <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller> |
| /// Upon fulfillment of startPromise, |
| fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, realm: InRealm, can_gc: CanGc) { |
| let controller = self.controller.as_rooted(); |
| let stream = controller |
| .stream |
| .get() |
| .expect("Controller should have a stream."); |
| |
| // Assert: stream.[[state]] is "writable" or "erroring". |
| assert!(stream.is_erroring() || stream.is_writable()); |
| |
| // Set controller.[[started]] to true. |
| controller.started.set(true); |
| |
| let global = GlobalScope::from_safe_context(cx, realm); |
| |
| // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller). |
| controller.advance_queue_if_needed(cx, &global, can_gc) |
| } |
| } |
| |
| impl js::gc::Rootable for StartAlgorithmRejectionHandler {} |
| |
| /// The rejection handler for |
| /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller> |
| #[derive(Clone, JSTraceable, MallocSizeOf)] |
| #[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] |
| struct StartAlgorithmRejectionHandler { |
| controller: Dom<WritableStreamDefaultController>, |
| } |
| |
| impl Callback for StartAlgorithmRejectionHandler { |
| /// Continuation of <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller> |
| /// Upon rejection of startPromise with reason r, |
| fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, realm: InRealm, can_gc: CanGc) { |
| let controller = self.controller.as_rooted(); |
| let stream = controller |
| .stream |
| .get() |
| .expect("Controller should have a stream."); |
| |
| // Assert: stream.[[state]] is "writable" or "erroring". |
| assert!(stream.is_erroring() || stream.is_writable()); |
| |
| // Set controller.[[started]] to true. |
| controller.started.set(true); |
| |
| let global = GlobalScope::from_safe_context(cx, realm); |
| |
| // Perform ! WritableStreamDealWithRejection(stream, r). |
| stream.deal_with_rejection(cx, &global, v, can_gc); |
| } |
| } |
| |
| impl js::gc::Rootable for TransferBackPressurePromiseReaction {} |
| |
| /// Reacting to backpressurePromise as part of the `writeAlgorithm` of |
| /// <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> |
| #[derive(JSTraceable, MallocSizeOf)] |
| #[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] |
| struct TransferBackPressurePromiseReaction { |
| /// The result of reacting to backpressurePromise. |
| #[ignore_malloc_size_of = "Rc is hard"] |
| result_promise: Rc<Promise>, |
| |
| /// The backpressurePromise. |
| #[ignore_malloc_size_of = "Rc is hard"] |
| backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>, |
| |
| /// The chunk received by the `writeAlgorithm`. |
| #[ignore_malloc_size_of = "mozjs"] |
| chunk: Box<Heap<JSVal>>, |
| |
| /// The port used in the algorithm. |
| port: Dom<MessagePort>, |
| } |
| |
| impl Callback for TransferBackPressurePromiseReaction { |
| /// Reacting to backpressurePromise with the following fulfillment steps: |
| fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, _realm: InRealm, can_gc: CanGc) { |
| let global = self.result_promise.global(); |
| // Set backpressurePromise to a new promise. |
| *self.backpressure_promise.borrow_mut() = Some(Promise::new(&global, can_gc)); |
| |
| // Let result be PackAndPostMessageHandlingError(port, "chunk", chunk). |
| rooted!(in(*cx) let mut chunk = UndefinedValue()); |
| chunk.set(self.chunk.get()); |
| let result = |
| self.port |
| .pack_and_post_message_handling_error("chunk", chunk.handle(), can_gc); |
| |
| // If result is an abrupt completion, |
| if let Err(error) = result { |
| // Disentangle port. |
| global.disentangle_port(&self.port, can_gc); |
| |
| // Return a promise rejected with result.[[Value]]. |
| self.result_promise.reject_error(error, can_gc); |
| } else { |
| // Otherwise, return a promise resolved with undefined. |
| self.result_promise.resolve_native(&(), can_gc); |
| } |
| } |
| } |
| |
| impl js::gc::Rootable for WriteAlgorithmFulfillmentHandler {} |
| |
| /// The fulfillment handler for |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write> |
| #[derive(Clone, JSTraceable, MallocSizeOf)] |
| #[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] |
| struct WriteAlgorithmFulfillmentHandler { |
| controller: Dom<WritableStreamDefaultController>, |
| } |
| |
| impl Callback for WriteAlgorithmFulfillmentHandler { |
| fn callback(&self, cx: SafeJSContext, _v: SafeHandleValue, realm: InRealm, can_gc: CanGc) { |
| let controller = self.controller.as_rooted(); |
| let stream = controller |
| .stream |
| .get() |
| .expect("Controller should have a stream."); |
| |
| // Perform ! WritableStreamFinishInFlightWrite(stream). |
| stream.finish_in_flight_write(can_gc); |
| |
| // Let state be stream.[[state]]. |
| // Assert: state is "writable" or "erroring". |
| assert!(stream.is_erroring() || stream.is_writable()); |
| |
| // Perform ! DequeueValue(controller). |
| { |
| rooted!(in(*cx) let mut rval = UndefinedValue()); |
| let mut queue = controller.queue.borrow_mut(); |
| queue.dequeue_value(cx, Some(rval.handle_mut()), can_gc); |
| } |
| |
| let global = GlobalScope::from_safe_context(cx, realm); |
| |
| // If ! WritableStreamCloseQueuedOrInFlight(stream) is false and state is "writable", |
| if !stream.close_queued_or_in_flight() && stream.is_writable() { |
| // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller). |
| let backpressure = controller.get_backpressure(); |
| |
| // Perform ! WritableStreamUpdateBackpressure(stream, backpressure). |
| stream.update_backpressure(backpressure, &global, can_gc); |
| } |
| |
| // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller). |
| controller.advance_queue_if_needed(cx, &global, can_gc) |
| } |
| } |
| |
| impl js::gc::Rootable for WriteAlgorithmRejectionHandler {} |
| |
| /// The rejection handler for |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write> |
| #[derive(Clone, JSTraceable, MallocSizeOf)] |
| #[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] |
| struct WriteAlgorithmRejectionHandler { |
| controller: Dom<WritableStreamDefaultController>, |
| } |
| |
| impl Callback for WriteAlgorithmRejectionHandler { |
| fn callback(&self, cx: SafeJSContext, v: SafeHandleValue, realm: InRealm, can_gc: CanGc) { |
| let controller = self.controller.as_rooted(); |
| let stream = controller |
| .stream |
| .get() |
| .expect("Controller should have a stream."); |
| |
| // If stream.[[state]] is "writable", |
| if stream.is_writable() { |
| // perform ! WritableStreamDefaultControllerClearAlgorithms(controller). |
| controller.clear_algorithms(); |
| } |
| |
| let global = GlobalScope::from_safe_context(cx, realm); |
| |
| // Perform ! WritableStreamFinishInFlightWriteWithError(stream, reason). |
| stream.finish_in_flight_write_with_error(cx, &global, v, can_gc); |
| } |
| } |
| |
| /// The type of sink algorithms we are using. |
| #[derive(JSTraceable, PartialEq)] |
| #[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)] |
| pub enum UnderlyingSinkType { |
| /// Algorithms are provided by Js callbacks. |
| Js { |
| /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortalgorithm> |
| abort: RefCell<Option<Rc<UnderlyingSinkAbortCallback>>>, |
| |
| start: RefCell<Option<Rc<UnderlyingSinkStartCallback>>>, |
| |
| /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-closealgorithm> |
| close: RefCell<Option<Rc<UnderlyingSinkCloseCallback>>>, |
| |
| /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-writealgorithm> |
| write: RefCell<Option<Rc<UnderlyingSinkWriteCallback>>>, |
| }, |
| /// Algorithms supporting streams transfer are implemented in Rust. |
| /// The promise and port used in those algorithms are stored here. |
| Transfer { |
| backpressure_promise: Rc<RefCell<Option<Rc<Promise>>>>, |
| port: Dom<MessagePort>, |
| }, |
| /// Algorithms supporting transform streams are implemented in Rust. |
| Transform(Dom<TransformStream>, Rc<Promise>), |
| } |
| |
| impl UnderlyingSinkType { |
| pub(crate) fn new_js( |
| abort: Option<Rc<UnderlyingSinkAbortCallback>>, |
| start: Option<Rc<UnderlyingSinkStartCallback>>, |
| close: Option<Rc<UnderlyingSinkCloseCallback>>, |
| write: Option<Rc<UnderlyingSinkWriteCallback>>, |
| ) -> Self { |
| UnderlyingSinkType::Js { |
| abort: RefCell::new(abort), |
| start: RefCell::new(start), |
| close: RefCell::new(close), |
| write: RefCell::new(write), |
| } |
| } |
| } |
| |
| /// <https://streams.spec.whatwg.org/#ws-default-controller-class> |
| #[dom_struct] |
| pub struct WritableStreamDefaultController { |
| reflector_: Reflector, |
| |
| /// The type of underlying sink used. Besides the default JS one, |
| /// there will be others for stream transfer, and for transform stream. |
| #[ignore_malloc_size_of = "underlying_sink_type"] |
| underlying_sink_type: UnderlyingSinkType, |
| |
| /// The JS object used as `this` when invoking sink algorithms. |
| #[ignore_malloc_size_of = "mozjs"] |
| underlying_sink_obj: Heap<*mut JSObject>, |
| |
| /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-queue> |
| queue: RefCell<QueueWithSizes>, |
| |
| /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-started> |
| started: Cell<bool>, |
| |
| /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-strategyhwm> |
| strategy_hwm: f64, |
| |
| /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-strategysizealgorithm> |
| #[ignore_malloc_size_of = "Rc is hard"] |
| strategy_size: RefCell<Option<Rc<QueuingStrategySize>>>, |
| |
| /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-stream> |
| stream: MutNullableDom<WritableStream>, |
| |
| /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-abortcontroller> |
| abort_controller: Dom<AbortController>, |
| } |
| |
| impl WritableStreamDefaultController { |
| /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller-from-underlying-sink> |
| #[cfg_attr(crown, allow(crown::unrooted_must_root))] |
| fn new_inherited( |
| global: &GlobalScope, |
| underlying_sink_type: UnderlyingSinkType, |
| strategy_hwm: f64, |
| strategy_size: Rc<QueuingStrategySize>, |
| can_gc: CanGc, |
| ) -> WritableStreamDefaultController { |
| WritableStreamDefaultController { |
| reflector_: Reflector::new(), |
| underlying_sink_type, |
| queue: Default::default(), |
| stream: Default::default(), |
| underlying_sink_obj: Default::default(), |
| strategy_hwm, |
| strategy_size: RefCell::new(Some(strategy_size)), |
| started: Default::default(), |
| abort_controller: Dom::from_ref(&AbortController::new_with_proto(global, None, can_gc)), |
| } |
| } |
| |
| #[cfg_attr(crown, allow(crown::unrooted_must_root))] |
| pub(crate) fn new( |
| global: &GlobalScope, |
| underlying_sink_type: UnderlyingSinkType, |
| strategy_hwm: f64, |
| strategy_size: Rc<QueuingStrategySize>, |
| can_gc: CanGc, |
| ) -> DomRoot<WritableStreamDefaultController> { |
| reflect_dom_object( |
| Box::new(WritableStreamDefaultController::new_inherited( |
| global, |
| underlying_sink_type, |
| strategy_hwm, |
| strategy_size, |
| can_gc, |
| )), |
| global, |
| can_gc, |
| ) |
| } |
| |
| pub(crate) fn started(&self) -> bool { |
| self.started.get() |
| } |
| |
| /// Setting the JS object after the heap has settled down. |
| pub(crate) fn set_underlying_sink_this_object(&self, this_object: SafeHandleObject) { |
| self.underlying_sink_obj.set(*this_object); |
| } |
| |
| /// "Signal abort" call from <https://streams.spec.whatwg.org/#writable-stream-abort> |
| pub(crate) fn signal_abort( |
| &self, |
| cx: SafeJSContext, |
| reason: SafeHandleValue, |
| realm: InRealm, |
| can_gc: CanGc, |
| ) { |
| self.abort_controller |
| .signal_abort(cx, reason, realm, can_gc); |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-clear-algorithms> |
| fn clear_algorithms(&self) { |
| match &self.underlying_sink_type { |
| UnderlyingSinkType::Js { |
| abort, |
| start: _, |
| close, |
| write, |
| } => { |
| // Set controller.[[writeAlgorithm]] to undefined. |
| write.borrow_mut().take(); |
| |
| // Set controller.[[closeAlgorithm]] to undefined. |
| close.borrow_mut().take(); |
| |
| // Set controller.[[abortAlgorithm]] to undefined. |
| abort.borrow_mut().take(); |
| }, |
| UnderlyingSinkType::Transfer { |
| backpressure_promise, |
| .. |
| } => { |
| backpressure_promise.borrow_mut().take(); |
| }, |
| UnderlyingSinkType::Transform(_, _) => { |
| return; |
| }, |
| } |
| |
| // Set controller.[[strategySizeAlgorithm]] to undefined. |
| self.strategy_size.borrow_mut().take(); |
| } |
| |
| /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-controller> |
| pub(crate) fn setup( |
| &self, |
| cx: SafeJSContext, |
| global: &GlobalScope, |
| stream: &WritableStream, |
| can_gc: CanGc, |
| ) -> Result<(), Error> { |
| // Assert: stream implements WritableStream. |
| // Implied by stream type. |
| |
| // Assert: stream.[[controller]] is undefined. |
| stream.assert_no_controller(); |
| |
| // Set controller.[[stream]] to stream. |
| self.stream.set(Some(stream)); |
| |
| // Set stream.[[controller]] to controller. |
| stream.set_default_controller(self); |
| |
| // Perform ! ResetQueue(controller). |
| |
| // Set controller.[[abortController]] to a new AbortController. |
| |
| // Set controller.[[started]] to false. |
| |
| // Set controller.[[strategySizeAlgorithm]] to sizeAlgorithm. |
| |
| // Set controller.[[strategyHWM]] to highWaterMark. |
| |
| // Set controller.[[writeAlgorithm]] to writeAlgorithm. |
| |
| // Set controller.[[closeAlgorithm]] to closeAlgorithm. |
| |
| // Set controller.[[abortAlgorithm]] to abortAlgorithm. |
| |
| // Note: above steps are done in `new_inherited`. |
| |
| // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller). |
| let backpressure = self.get_backpressure(); |
| |
| // Perform ! WritableStreamUpdateBackpressure(stream, backpressure). |
| stream.update_backpressure(backpressure, global, can_gc); |
| |
| // Let startResult be the result of performing startAlgorithm. (This may throw an exception.) |
| // Let startPromise be a promise resolved with startResult. |
| let start_promise = self.start_algorithm(cx, global, can_gc)?; |
| |
| let rooted_default_controller = DomRoot::from_ref(self); |
| |
| // Upon fulfillment of startPromise, |
| rooted!(in(*cx) let mut fulfillment_handler = Some(StartAlgorithmFulfillmentHandler { |
| controller: Dom::from_ref(&rooted_default_controller), |
| })); |
| |
| // Upon rejection of startPromise with reason r, |
| rooted!(in(*cx) let mut rejection_handler = Some(StartAlgorithmRejectionHandler { |
| controller: Dom::from_ref(&rooted_default_controller), |
| })); |
| |
| let handler = PromiseNativeHandler::new( |
| global, |
| fulfillment_handler.take().map(|h| Box::new(h) as Box<_>), |
| rejection_handler.take().map(|h| Box::new(h) as Box<_>), |
| can_gc, |
| ); |
| let realm = enter_realm(global); |
| let comp = InRealm::Entered(&realm); |
| start_promise.append_native_handler(&handler, comp, can_gc); |
| |
| Ok(()) |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-close> |
| pub(crate) fn close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) { |
| // Perform ! EnqueueValueWithSize(controller, close sentinel, 0). |
| { |
| let mut queue = self.queue.borrow_mut(); |
| queue |
| .enqueue_value_with_size(EnqueuedValue::CloseSentinel) |
| .expect("Enqueuing the close sentinel should not fail."); |
| } |
| // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller). |
| self.advance_queue_if_needed(cx, global, can_gc); |
| } |
| |
| #[allow(unsafe_code)] |
| fn start_algorithm( |
| &self, |
| cx: SafeJSContext, |
| global: &GlobalScope, |
| can_gc: CanGc, |
| ) -> Fallible<Rc<Promise>> { |
| match &self.underlying_sink_type { |
| UnderlyingSinkType::Js { |
| start, |
| abort: _, |
| close: _, |
| write: _, |
| } => { |
| let algo = start.borrow().clone(); |
| let start_promise = if let Some(start) = algo { |
| rooted!(in(*cx) let mut result_object = ptr::null_mut::<JSObject>()); |
| rooted!(in(*cx) let mut result: JSVal); |
| rooted!(in(*cx) let this_object = self.underlying_sink_obj.get()); |
| start.Call_( |
| &this_object.handle(), |
| self, |
| result.handle_mut(), |
| ExceptionHandling::Rethrow, |
| can_gc, |
| )?; |
| let is_promise = unsafe { |
| if result.is_object() { |
| result_object.set(result.to_object()); |
| IsPromiseObject(result_object.handle().into_handle()) |
| } else { |
| false |
| } |
| }; |
| if is_promise { |
| Promise::new_with_js_promise(result_object.handle(), cx) |
| } else { |
| Promise::new_resolved(global, cx, result.get(), can_gc) |
| } |
| } else { |
| // Let startAlgorithm be an algorithm that returns undefined. |
| Promise::new_resolved(global, cx, (), can_gc) |
| }; |
| |
| Ok(start_promise) |
| }, |
| UnderlyingSinkType::Transfer { .. } => { |
| // Let startAlgorithm be an algorithm that returns undefined. |
| Ok(Promise::new_resolved(global, cx, (), can_gc)) |
| }, |
| UnderlyingSinkType::Transform(_, start_promise) => { |
| // Let startAlgorithm be an algorithm that returns startPromise. |
| Ok(start_promise.clone()) |
| }, |
| } |
| } |
| |
| /// <https://streams.spec.whatwg.org/#ref-for-abstract-opdef-writablestreamcontroller-abortsteps> |
| pub(crate) fn abort_steps( |
| &self, |
| cx: SafeJSContext, |
| global: &GlobalScope, |
| reason: SafeHandleValue, |
| can_gc: CanGc, |
| ) -> Rc<Promise> { |
| let result = match &self.underlying_sink_type { |
| UnderlyingSinkType::Js { |
| abort, |
| start: _, |
| close: _, |
| write: _, |
| } => { |
| rooted!(in(*cx) let this_object = self.underlying_sink_obj.get()); |
| let algo = abort.borrow().clone(); |
| // Let result be the result of performing this.[[abortAlgorithm]], passing reason. |
| let result = if let Some(algo) = algo { |
| algo.Call_( |
| &this_object.handle(), |
| Some(reason), |
| ExceptionHandling::Rethrow, |
| can_gc, |
| ) |
| } else { |
| Ok(Promise::new_resolved(global, cx, (), can_gc)) |
| }; |
| result.unwrap_or_else(|e| { |
| let promise = Promise::new(global, can_gc); |
| promise.reject_error(e, can_gc); |
| promise |
| }) |
| }, |
| UnderlyingSinkType::Transfer { port, .. } => { |
| // The steps from the `abortAlgorithm` at |
| // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> |
| |
| // Let result be PackAndPostMessageHandlingError(port, "error", reason). |
| let result = port.pack_and_post_message_handling_error("error", reason, can_gc); |
| |
| // Disentangle port. |
| global.disentangle_port(port, can_gc); |
| |
| let promise = Promise::new(global, can_gc); |
| |
| // If result is an abrupt completion, return a promise rejected with result.[[Value]] |
| if let Err(error) = result { |
| promise.reject_error(error, can_gc); |
| } else { |
| // Otherwise, return a promise resolved with undefined. |
| promise.resolve_native(&(), can_gc); |
| } |
| promise |
| }, |
| UnderlyingSinkType::Transform(stream, _) => { |
| // Return ! TransformStreamDefaultSinkAbortAlgorithm(stream, reason). |
| stream |
| .transform_stream_default_sink_abort_algorithm(cx, global, reason, can_gc) |
| .expect("Transform stream default sink abort algorithm should not fail.") |
| }, |
| }; |
| |
| // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller). |
| self.clear_algorithms(); |
| |
| result |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-writealgorithm> |
| fn call_write_algorithm( |
| &self, |
| cx: SafeJSContext, |
| chunk: SafeHandleValue, |
| global: &GlobalScope, |
| can_gc: CanGc, |
| ) -> Rc<Promise> { |
| match &self.underlying_sink_type { |
| UnderlyingSinkType::Js { |
| abort: _, |
| start: _, |
| close: _, |
| write, |
| } => { |
| rooted!(in(*cx) let this_object = self.underlying_sink_obj.get()); |
| let algo = write.borrow().clone(); |
| let result = if let Some(algo) = algo { |
| algo.Call_( |
| &this_object.handle(), |
| chunk, |
| self, |
| ExceptionHandling::Rethrow, |
| can_gc, |
| ) |
| } else { |
| Ok(Promise::new_resolved(global, cx, (), can_gc)) |
| }; |
| result.unwrap_or_else(|e| { |
| let promise = Promise::new(global, can_gc); |
| promise.reject_error(e, can_gc); |
| promise |
| }) |
| }, |
| UnderlyingSinkType::Transfer { |
| backpressure_promise, |
| port, |
| } => { |
| // The steps from the `writeAlgorithm` at |
| // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> |
| |
| { |
| // If backpressurePromise is undefined, |
| // set backpressurePromise to a promise resolved with undefined. |
| let mut backpressure_promise = backpressure_promise.borrow_mut(); |
| if backpressure_promise.is_none() { |
| *backpressure_promise = Some(Promise::new_resolved(global, cx, (), can_gc)); |
| } |
| } |
| |
| // Return the result of reacting to backpressurePromise with the following fulfillment steps: |
| let result_promise = Promise::new(global, can_gc); |
| rooted!(in(*cx) let mut fulfillment_handler = Some(TransferBackPressurePromiseReaction { |
| port: port.clone(), |
| backpressure_promise: backpressure_promise.clone(), |
| chunk: Heap::boxed(chunk.get()), |
| result_promise: result_promise.clone(), |
| })); |
| let handler = PromiseNativeHandler::new( |
| global, |
| fulfillment_handler.take().map(|h| Box::new(h) as Box<_>), |
| None, |
| can_gc, |
| ); |
| let realm = enter_realm(global); |
| let comp = InRealm::Entered(&realm); |
| backpressure_promise |
| .borrow() |
| .as_ref() |
| .expect("Promise must be some by now.") |
| .append_native_handler(&handler, comp, can_gc); |
| result_promise |
| }, |
| UnderlyingSinkType::Transform(stream, _) => { |
| // Return ! TransformStreamDefaultSinkWriteAlgorithm(stream, chunk). |
| stream |
| .transform_stream_default_sink_write_algorithm(cx, global, chunk, can_gc) |
| .expect("Transform stream default sink write algorithm should not fail.") |
| }, |
| } |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writablestreamdefaultcontroller-closealgorithm> |
| fn call_close_algorithm( |
| &self, |
| cx: SafeJSContext, |
| global: &GlobalScope, |
| can_gc: CanGc, |
| ) -> Rc<Promise> { |
| match &self.underlying_sink_type { |
| UnderlyingSinkType::Js { |
| abort: _, |
| start: _, |
| close, |
| write: _, |
| } => { |
| rooted!(in(*cx) let mut this_object = ptr::null_mut::<JSObject>()); |
| this_object.set(self.underlying_sink_obj.get()); |
| let algo = close.borrow().clone(); |
| let result = if let Some(algo) = algo { |
| algo.Call_(&this_object.handle(), ExceptionHandling::Rethrow, can_gc) |
| } else { |
| Ok(Promise::new_resolved(global, cx, (), can_gc)) |
| }; |
| result.unwrap_or_else(|e| { |
| let promise = Promise::new(global, can_gc); |
| promise.reject_error(e, can_gc); |
| promise |
| }) |
| }, |
| UnderlyingSinkType::Transfer { port, .. } => { |
| // The steps from the `closeAlgorithm` at |
| // <https://streams.spec.whatwg.org/#abstract-opdef-setupcrossrealmtransformwritable> |
| |
| // Perform ! PackAndPostMessage(port, "close", undefined). |
| rooted!(in(*cx) let mut value = UndefinedValue()); |
| port.pack_and_post_message("close", value.handle(), can_gc) |
| .expect("Sending close should not fail."); |
| |
| // Disentangle port. |
| global.disentangle_port(port, can_gc); |
| |
| // Return a promise resolved with undefined. |
| Promise::new_resolved(global, cx, (), can_gc) |
| }, |
| UnderlyingSinkType::Transform(stream, _) => { |
| // Return ! TransformStreamDefaultSinkCloseAlgorithm(stream). |
| stream |
| .transform_stream_default_sink_close_algorithm(cx, global, can_gc) |
| .expect("Transform stream default sink close algorithm should not fail.") |
| }, |
| } |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-close> |
| pub(crate) fn process_close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) { |
| // Let stream be controller.[[stream]]. |
| let Some(stream) = self.stream.get() else { |
| unreachable!("Controller should have a stream"); |
| }; |
| |
| // Perform ! WritableStreamMarkCloseRequestInFlight(stream). |
| stream.mark_close_request_in_flight(); |
| |
| // Perform ! DequeueValue(controller). |
| { |
| let mut queue = self.queue.borrow_mut(); |
| queue.dequeue_value(cx, None, can_gc); |
| } |
| |
| // Assert: controller.[[queue]] is empty. |
| assert!(self.queue.borrow().is_empty()); |
| |
| // Let sinkClosePromise be the result of performing controller.[[closeAlgorithm]]. |
| let sink_close_promise = self.call_close_algorithm(cx, global, can_gc); |
| |
| // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller). |
| self.clear_algorithms(); |
| |
| // Upon fulfillment of sinkClosePromise, |
| rooted!(in(*cx) let mut fulfillment_handler = Some(CloseAlgorithmFulfillmentHandler { |
| stream: Dom::from_ref(&stream), |
| })); |
| |
| // Upon rejection of sinkClosePromise with reason reason, |
| rooted!(in(*cx) let mut rejection_handler = Some(CloseAlgorithmRejectionHandler { |
| stream: Dom::from_ref(&stream), |
| })); |
| |
| // Attach handlers to the promise. |
| let handler = PromiseNativeHandler::new( |
| global, |
| fulfillment_handler.take().map(|h| Box::new(h) as Box<_>), |
| rejection_handler.take().map(|h| Box::new(h) as Box<_>), |
| can_gc, |
| ); |
| let realm = enter_realm(global); |
| let comp = InRealm::Entered(&realm); |
| sink_close_promise.append_native_handler(&handler, comp, can_gc); |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-advance-queue-if-needed> |
| fn advance_queue_if_needed(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) { |
| // Let stream be controller.[[stream]]. |
| let Some(stream) = self.stream.get() else { |
| unreachable!("Controller should have a stream"); |
| }; |
| |
| // If controller.[[started]] is false, return. |
| if !self.started.get() { |
| return; |
| } |
| |
| // If stream.[[inFlightWriteRequest]] is not undefined, return. |
| if stream.has_in_flight_write_request() { |
| return; |
| } |
| |
| // Let state be stream.[[state]]. |
| |
| // Assert: state is not "closed" or "errored". |
| assert!(!(stream.is_errored() || stream.is_closed())); |
| |
| // If state is "erroring", |
| if stream.is_erroring() { |
| // Perform ! WritableStreamFinishErroring(stream). |
| stream.finish_erroring(cx, global, can_gc); |
| |
| // Return. |
| return; |
| } |
| |
| // Let value be ! PeekQueueValue(controller). |
| rooted!(in(*cx) let mut value = UndefinedValue()); |
| let is_closed = { |
| let queue = self.queue.borrow_mut(); |
| |
| // If controller.[[queue]] is empty, return. |
| if queue.is_empty() { |
| return; |
| } |
| queue.peek_queue_value(cx, value.handle_mut(), can_gc) |
| }; |
| |
| if is_closed { |
| // If value is the close sentinel, perform ! WritableStreamDefaultControllerProcessClose(controller). |
| self.process_close(cx, global, can_gc); |
| } else { |
| // Otherwise, perform ! WritableStreamDefaultControllerProcessWrite(controller, value). |
| self.process_write(cx, value.handle(), global, can_gc); |
| }; |
| } |
| |
| /// <https://streams.spec.whatwg.org/#ws-default-controller-private-error> |
| pub(crate) fn perform_error_steps(&self) { |
| // Perform ! ResetQueue(this). |
| self.queue.borrow_mut().reset(); |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-process-write> |
| fn process_write( |
| &self, |
| cx: SafeJSContext, |
| chunk: SafeHandleValue, |
| global: &GlobalScope, |
| can_gc: CanGc, |
| ) { |
| // Let stream be controller.[[stream]]. |
| let Some(stream) = self.stream.get() else { |
| unreachable!("Controller should have a stream"); |
| }; |
| |
| // Perform ! WritableStreamMarkFirstWriteRequestInFlight(stream). |
| stream.mark_first_write_request_in_flight(); |
| |
| // Let sinkWritePromise be the result of performing controller.[[writeAlgorithm]], passing in chunk. |
| let sink_write_promise = self.call_write_algorithm(cx, chunk, global, can_gc); |
| |
| // Upon fulfillment of sinkWritePromise, |
| rooted!(in(*cx) let mut fulfillment_handler = Some(WriteAlgorithmFulfillmentHandler { |
| controller: Dom::from_ref(self), |
| })); |
| |
| // Upon rejection of sinkWritePromise with reason, |
| rooted!(in(*cx) let mut rejection_handler = Some(WriteAlgorithmRejectionHandler { |
| controller: Dom::from_ref(self), |
| })); |
| |
| // Attach handlers to the promise. |
| let handler = PromiseNativeHandler::new( |
| global, |
| fulfillment_handler.take().map(|h| Box::new(h) as Box<_>), |
| rejection_handler.take().map(|h| Box::new(h) as Box<_>), |
| can_gc, |
| ); |
| let realm = enter_realm(global); |
| let comp = InRealm::Entered(&realm); |
| sink_write_promise.append_native_handler(&handler, comp, can_gc); |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-desired-size> |
| pub(crate) fn get_desired_size(&self) -> f64 { |
| // Return controller.[[strategyHWM]] − controller.[[queueTotalSize]]. |
| let queue = self.queue.borrow(); |
| let desired_size = self.strategy_hwm - queue.total_size.clamp(0.0, f64::MAX); |
| desired_size.clamp(desired_size, self.strategy_hwm) |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-backpressure> |
| fn get_backpressure(&self) -> bool { |
| // Let desiredSize be ! WritableStreamDefaultControllerGetDesiredSize(controller). |
| let desired_size = self.get_desired_size(); |
| |
| // Return true if desiredSize ≤ 0, or false otherwise. |
| desired_size == 0.0 || desired_size.is_sign_negative() |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-get-chunk-size> |
| pub(crate) fn get_chunk_size( |
| &self, |
| cx: SafeJSContext, |
| global: &GlobalScope, |
| chunk: SafeHandleValue, |
| can_gc: CanGc, |
| ) -> f64 { |
| // If controller.[[strategySizeAlgorithm]] is undefined, then: |
| let Some(strategy_size) = self.strategy_size.borrow().clone() else { |
| // Assert: controller.[[stream]].[[state]] is not "writable". |
| let Some(stream) = self.stream.get() else { |
| unreachable!("Controller should have a stream"); |
| }; |
| assert!(!stream.is_writable()); |
| |
| // Return 1. |
| return 1.0; |
| }; |
| |
| // Let returnValue be the result of performing controller.[[strategySizeAlgorithm]], |
| // passing in chunk, and interpreting the result as a completion record. |
| let result = strategy_size.Call__(chunk, ExceptionHandling::Rethrow, can_gc); |
| |
| match result { |
| // Let chunkSize be result.[[Value]]. |
| Ok(size) => size, |
| Err(error) => { |
| // If result is an abrupt completion, |
| |
| // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, returnValue.[[Value]]). |
| // Create a rooted value for the error. |
| rooted!(in(*cx) let mut rooted_error = UndefinedValue()); |
| error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc); |
| self.error_if_needed(cx, rooted_error.handle(), global, can_gc); |
| |
| // Return 1. |
| 1.0 |
| }, |
| } |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-write> |
| pub(crate) fn write( |
| &self, |
| cx: SafeJSContext, |
| global: &GlobalScope, |
| chunk: SafeHandleValue, |
| chunk_size: f64, |
| can_gc: CanGc, |
| ) { |
| // Let enqueueResult be EnqueueValueWithSize(controller, chunk, chunkSize). |
| let enqueue_result = { |
| let mut queue = self.queue.borrow_mut(); |
| queue.enqueue_value_with_size(EnqueuedValue::Js(ValueWithSize { |
| value: Heap::boxed(chunk.get()), |
| size: chunk_size, |
| })) |
| }; |
| |
| // If enqueueResult is an abrupt completion, |
| if let Err(error) = enqueue_result { |
| // Perform ! WritableStreamDefaultControllerErrorIfNeeded(controller, enqueueResult.[[Value]]). |
| // Create a rooted value for the error. |
| rooted!(in(*cx) let mut rooted_error = UndefinedValue()); |
| error.to_jsval(cx, global, rooted_error.handle_mut(), can_gc); |
| self.error_if_needed(cx, rooted_error.handle(), global, can_gc); |
| |
| // Return. |
| return; |
| } |
| |
| // Let stream be controller.[[stream]]. |
| let Some(stream) = self.stream.get() else { |
| unreachable!("Controller should have a stream"); |
| }; |
| |
| // If ! WritableStreamCloseQueuedOrInFlight(stream) is false and stream.[[state]] is "writable", |
| if !stream.close_queued_or_in_flight() && stream.is_writable() { |
| // Let backpressure be ! WritableStreamDefaultControllerGetBackpressure(controller). |
| let backpressure = self.get_backpressure(); |
| |
| // Perform ! WritableStreamUpdateBackpressure(stream, backpressure). |
| stream.update_backpressure(backpressure, global, can_gc); |
| } |
| |
| // Perform ! WritableStreamDefaultControllerAdvanceQueueIfNeeded(controller). |
| self.advance_queue_if_needed(cx, global, can_gc); |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-error-if-needed> |
| pub(crate) fn error_if_needed( |
| &self, |
| cx: SafeJSContext, |
| error: SafeHandleValue, |
| global: &GlobalScope, |
| can_gc: CanGc, |
| ) { |
| // Let stream be controller.[[stream]]. |
| let Some(stream) = self.stream.get() else { |
| unreachable!("Controller should have a stream"); |
| }; |
| |
| // If stream.[[state]] is "writable", |
| if stream.is_writable() { |
| // Perform ! WritableStreamDefaultControllerError(controller, e). |
| self.error(&stream, cx, error, global, can_gc); |
| } |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-controller-error> |
| pub(crate) fn error( |
| &self, |
| stream: &WritableStream, |
| cx: SafeJSContext, |
| e: SafeHandleValue, |
| global: &GlobalScope, |
| can_gc: CanGc, |
| ) { |
| // Let stream be controller.[[stream]]. |
| // Done above with the argument. |
| |
| // Assert: stream.[[state]] is "writable". |
| assert!(stream.is_writable()); |
| |
| // Perform ! WritableStreamDefaultControllerClearAlgorithms(controller). |
| self.clear_algorithms(); |
| |
| // Perform ! WritableStreamStartErroring(stream, error). |
| stream.start_erroring(cx, global, e, can_gc); |
| } |
| } |
| |
| impl WritableStreamDefaultControllerMethods<crate::DomTypeHolder> |
| for WritableStreamDefaultController |
| { |
| /// <https://streams.spec.whatwg.org/#ws-default-controller-error> |
| fn Error(&self, cx: SafeJSContext, e: SafeHandleValue, realm: InRealm, can_gc: CanGc) { |
| // Let state be this.[[stream]].[[state]]. |
| let Some(stream) = self.stream.get() else { |
| unreachable!("Controller should have a stream"); |
| }; |
| |
| // If state is not "writable", return. |
| if !stream.is_writable() { |
| return; |
| } |
| |
| let global = GlobalScope::from_safe_context(cx, realm); |
| |
| // Perform ! WritableStreamDefaultControllerError(this, e). |
| self.error(&stream, cx, e, &global, can_gc); |
| } |
| |
| /// <https://streams.spec.whatwg.org/#ws-default-controller-signal> |
| fn Signal(&self) -> DomRoot<AbortSignal> { |
| // Return this.[[abortController]]’s signal. |
| self.abort_controller.signal() |
| } |
| } |