| /* This Source Code Form is subject to the terms of the Mozilla Public |
| * License, v. 2.0. If a copy of the MPL was not distributed with this |
| * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ |
| |
| use std::cell::RefCell; |
| use std::rc::Rc; |
| |
| use dom_struct::dom_struct; |
| use js::jsval::UndefinedValue; |
| use js::rust::{HandleObject as SafeHandleObject, HandleValue as SafeHandleValue}; |
| |
| use crate::dom::bindings::codegen::Bindings::WritableStreamDefaultWriterBinding::WritableStreamDefaultWriterMethods; |
| use crate::dom::bindings::error::{Error, ErrorToJsval}; |
| use crate::dom::bindings::reflector::{DomGlobal, Reflector, reflect_dom_object_with_proto}; |
| use crate::dom::bindings::root::{DomRoot, MutNullableDom}; |
| use crate::dom::globalscope::GlobalScope; |
| use crate::dom::promise::Promise; |
| use crate::dom::writablestream::WritableStream; |
| use crate::realms::InRealm; |
| use crate::script_runtime::{CanGc, JSContext as SafeJSContext}; |
| |
| /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter> |
| #[dom_struct] |
| pub struct WritableStreamDefaultWriter { |
| reflector_: Reflector, |
| |
| #[ignore_malloc_size_of = "Rc is hard"] |
| ready_promise: RefCell<Rc<Promise>>, |
| |
| /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-closedpromise> |
| #[ignore_malloc_size_of = "Rc is hard"] |
| closed_promise: RefCell<Rc<Promise>>, |
| |
| /// <https://streams.spec.whatwg.org/#writablestreamdefaultwriter-stream> |
| stream: MutNullableDom<WritableStream>, |
| } |
| |
| impl WritableStreamDefaultWriter { |
| #[cfg_attr(crown, allow(crown::unrooted_must_root))] |
| /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer> |
| /// The parts that create a new promise. |
| fn new_inherited(global: &GlobalScope, can_gc: CanGc) -> WritableStreamDefaultWriter { |
| WritableStreamDefaultWriter { |
| reflector_: Reflector::new(), |
| stream: Default::default(), |
| closed_promise: RefCell::new(Promise::new(global, can_gc)), |
| ready_promise: RefCell::new(Promise::new(global, can_gc)), |
| } |
| } |
| |
| pub(crate) fn new( |
| global: &GlobalScope, |
| proto: Option<SafeHandleObject>, |
| can_gc: CanGc, |
| ) -> DomRoot<WritableStreamDefaultWriter> { |
| reflect_dom_object_with_proto( |
| Box::new(WritableStreamDefaultWriter::new_inherited(global, can_gc)), |
| global, |
| proto, |
| can_gc, |
| ) |
| } |
| |
| /// <https://streams.spec.whatwg.org/#set-up-writable-stream-default-writer> |
| /// Continuing from `new_inherited`, the rest. |
| pub(crate) fn setup( |
| &self, |
| cx: SafeJSContext, |
| stream: &WritableStream, |
| can_gc: CanGc, |
| ) -> Result<(), Error> { |
| // If ! IsWritableStreamLocked(stream) is true, throw a TypeError exception. |
| if stream.is_locked() { |
| return Err(Error::Type("Stream is locked".to_string())); |
| } |
| |
| // Set writer.[[stream]] to stream. |
| self.stream.set(Some(stream)); |
| |
| // Set stream.[[writer]] to writer. |
| stream.set_writer(Some(self)); |
| |
| // Let state be stream.[[state]]. |
| |
| // If state is "writable", |
| if stream.is_writable() { |
| // If ! WritableStreamCloseQueuedOrInFlight(stream) is false |
| // and stream.[[backpressure]] is true, |
| if !stream.close_queued_or_in_flight() && stream.get_backpressure() { |
| // set writer.[[readyPromise]] to a new promise. |
| // Done in `new_inherited`. |
| } else { |
| // Otherwise, set writer.[[readyPromise]] to a promise resolved with undefined. |
| // Note: new promise created in `new_inherited`. |
| self.ready_promise.borrow().resolve_native(&(), can_gc); |
| } |
| |
| // Set writer.[[closedPromise]] to a new promise. |
| // Done in `new_inherited`. |
| return Ok(()); |
| } |
| |
| // Otherwise, if state is "erroring", |
| if stream.is_erroring() { |
| rooted!(in(*cx) let mut error = UndefinedValue()); |
| stream.get_stored_error(error.handle_mut()); |
| |
| // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]]. |
| // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true. |
| // Note: new promise created in `new_inherited`. |
| let ready_promise = self.ready_promise.borrow(); |
| ready_promise.reject_native(&error.handle(), can_gc); |
| ready_promise.set_promise_is_handled(); |
| |
| // Set writer.[[closedPromise]] to a new promise. |
| // Done in `new_inherited`. |
| return Ok(()); |
| } |
| |
| // Otherwise, if state is "closed", |
| if stream.is_closed() { |
| // Set writer.[[readyPromise]] to a promise resolved with undefined. |
| // Note: new promise created in `new_inherited`. |
| self.ready_promise.borrow().resolve_native(&(), can_gc); |
| |
| // Set writer.[[closedPromise]] to a promise resolved with undefined. |
| // Note: new promise created in `new_inherited`. |
| self.closed_promise.borrow().resolve_native(&(), can_gc); |
| return Ok(()); |
| } |
| |
| // Otherwise, |
| // Assert: state is "errored". |
| assert!(stream.is_errored()); |
| |
| // Let storedError be stream.[[storedError]]. |
| rooted!(in(*cx) let mut error = UndefinedValue()); |
| stream.get_stored_error(error.handle_mut()); |
| |
| // Set writer.[[readyPromise]] to a promise rejected with stream.[[storedError]]. |
| // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true. |
| // Note: new promise created in `new_inherited`. |
| let ready_promise = self.ready_promise.borrow(); |
| ready_promise.reject_native(&error.handle(), can_gc); |
| ready_promise.set_promise_is_handled(); |
| |
| // Set writer.[[closedPromise]] to a promise rejected with storedError. |
| // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true. |
| // Note: new promise created in `new_inherited`. |
| let ready_promise = self.closed_promise.borrow(); |
| ready_promise.reject_native(&error.handle(), can_gc); |
| ready_promise.set_promise_is_handled(); |
| |
| Ok(()) |
| } |
| |
| pub(crate) fn reject_closed_promise_with_stored_error( |
| &self, |
| error: &SafeHandleValue, |
| can_gc: CanGc, |
| ) { |
| self.closed_promise.borrow().reject_native(error, can_gc); |
| } |
| |
| pub(crate) fn set_close_promise_is_handled(&self) { |
| self.closed_promise.borrow().set_promise_is_handled(); |
| } |
| |
| pub(crate) fn set_ready_promise(&self, promise: Rc<Promise>) { |
| *self.ready_promise.borrow_mut() = promise; |
| } |
| |
| pub(crate) fn resolve_ready_promise_with_undefined(&self, can_gc: CanGc) { |
| self.ready_promise.borrow().resolve_native(&(), can_gc); |
| } |
| |
| pub(crate) fn resolve_closed_promise_with_undefined(&self, can_gc: CanGc) { |
| self.closed_promise.borrow().resolve_native(&(), can_gc); |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-ready-promise-rejected> |
| pub(crate) fn ensure_ready_promise_rejected( |
| &self, |
| global: &GlobalScope, |
| error: SafeHandleValue, |
| can_gc: CanGc, |
| ) { |
| let ready_promise = self.ready_promise.borrow().clone(); |
| |
| // If writer.[[readyPromise]].[[PromiseState]] is "pending", |
| if ready_promise.is_pending() { |
| // reject writer.[[readyPromise]] with error. |
| ready_promise.reject_native(&error, can_gc); |
| |
| // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true. |
| ready_promise.set_promise_is_handled(); |
| } else { |
| // Otherwise, set writer.[[readyPromise]] to a promise rejected with error. |
| let promise = Promise::new(global, can_gc); |
| promise.reject_native(&error, can_gc); |
| |
| // Set writer.[[readyPromise]].[[PromiseIsHandled]] to true. |
| promise.set_promise_is_handled(); |
| *self.ready_promise.borrow_mut() = promise; |
| } |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-ensure-closed-promise-rejected> |
| pub(crate) fn ensure_closed_promise_rejected( |
| &self, |
| global: &GlobalScope, |
| error: SafeHandleValue, |
| can_gc: CanGc, |
| ) { |
| let closed_promise = self.closed_promise.borrow().clone(); |
| |
| // If writer.[[closedPromise]].[[PromiseState]] is "pending", |
| if closed_promise.is_pending() { |
| // reject writer.[[closedPromise]] with error. |
| closed_promise.reject_native(&error, can_gc); |
| |
| // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true. |
| closed_promise.set_promise_is_handled(); |
| } else { |
| // Otherwise, set writer.[[closedPromise]] to a promise rejected with error. |
| let promise = Promise::new(global, can_gc); |
| promise.reject_native(&error, can_gc); |
| |
| // Set writer.[[closedPromise]].[[PromiseIsHandled]] to true. |
| promise.set_promise_is_handled(); |
| *self.closed_promise.borrow_mut() = promise; |
| } |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-abort> |
| fn abort( |
| &self, |
| cx: SafeJSContext, |
| global: &GlobalScope, |
| reason: SafeHandleValue, |
| realm: InRealm, |
| can_gc: CanGc, |
| ) -> Rc<Promise> { |
| // Let stream be writer.[[stream]]. |
| let Some(stream) = self.stream.get() else { |
| // Assert: stream is not undefined. |
| unreachable!("Stream should be set."); |
| }; |
| |
| // Return ! WritableStreamAbort(stream, reason). |
| stream.abort(cx, global, reason, realm, can_gc) |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close> |
| fn close(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) -> Rc<Promise> { |
| // Let stream be writer.[[stream]]. |
| let Some(stream) = self.stream.get() else { |
| // Assert: stream is not undefined. |
| unreachable!("Stream should be set."); |
| }; |
| |
| // Return ! WritableStreamClose(stream). |
| stream.close(cx, global, can_gc) |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-write> |
| pub(crate) fn write( |
| &self, |
| cx: SafeJSContext, |
| global: &GlobalScope, |
| chunk: SafeHandleValue, |
| can_gc: CanGc, |
| ) -> Rc<Promise> { |
| // Let stream be writer.[[stream]]. |
| let Some(stream) = self.stream.get() else { |
| // Assert: stream is not undefined. |
| unreachable!("Stream should be set."); |
| }; |
| |
| // Let controller be stream.[[controller]]. |
| // Note: asserting controller is some. |
| let Some(controller) = stream.get_controller() else { |
| unreachable!("Controller should be set."); |
| }; |
| |
| // Let chunkSize be ! WritableStreamDefaultControllerGetChunkSize(controller, chunk). |
| let chunk_size = controller.get_chunk_size(cx, global, chunk, can_gc); |
| |
| // If stream is not equal to writer.[[stream]], |
| // return a promise rejected with a TypeError exception. |
| if !self |
| .stream |
| .get() |
| .is_some_and(|current_stream| current_stream == stream) |
| { |
| let promise = Promise::new(global, can_gc); |
| promise.reject_error( |
| Error::Type("Stream is not equal to writer stream".to_string()), |
| can_gc, |
| ); |
| return promise; |
| } |
| |
| // Let state be stream.[[state]]. |
| // If state is "errored", |
| if stream.is_errored() { |
| // return a promise rejected with stream.[[storedError]]. |
| rooted!(in(*cx) let mut error = UndefinedValue()); |
| stream.get_stored_error(error.handle_mut()); |
| let promise = Promise::new(global, can_gc); |
| promise.reject_native(&error.handle(), can_gc); |
| return promise; |
| } |
| |
| // If ! WritableStreamCloseQueuedOrInFlight(stream) is true |
| // or state is "closed", |
| if stream.close_queued_or_in_flight() || stream.is_closed() { |
| // return a promise rejected with a TypeError exception |
| // indicating that the stream is closing or closed |
| let promise = Promise::new(global, can_gc); |
| promise.reject_error( |
| Error::Type("Stream has been closed, or has close queued or in-flight".to_string()), |
| can_gc, |
| ); |
| return promise; |
| } |
| |
| // If state is "erroring", |
| if stream.is_erroring() { |
| // return a promise rejected with stream.[[storedError]]. |
| rooted!(in(*cx) let mut error = UndefinedValue()); |
| stream.get_stored_error(error.handle_mut()); |
| let promise = Promise::new(global, can_gc); |
| promise.reject_native(&error.handle(), can_gc); |
| return promise; |
| } |
| |
| // Assert: state is "writable". |
| assert!(stream.is_writable()); |
| |
| // Let promise be ! WritableStreamAddWriteRequest(stream). |
| let promise = stream.add_write_request(global, can_gc); |
| |
| // Perform ! WritableStreamDefaultControllerWrite(controller, chunk, chunkSize). |
| controller.write(cx, global, chunk, chunk_size, can_gc); |
| |
| // Return promise. |
| promise |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-release> |
| pub(crate) fn release(&self, cx: SafeJSContext, global: &GlobalScope, can_gc: CanGc) { |
| // Let stream be this.[[stream]]. |
| let Some(stream) = self.stream.get() else { |
| // Assert: stream is not undefined. |
| unreachable!("Stream should be set."); |
| }; |
| |
| // Assert: stream.[[writer]] is writer. |
| assert!(stream.get_writer().is_some_and(|writer| &*writer == self)); |
| |
| // Let releasedError be a new TypeError. |
| let released_error = Error::Type("Writer has been released".to_string()); |
| |
| // Root the js val of the error. |
| rooted!(in(*cx) let mut error = UndefinedValue()); |
| released_error.to_jsval(cx, global, error.handle_mut(), can_gc); |
| |
| // Perform ! WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedError). |
| self.ensure_ready_promise_rejected(global, error.handle(), can_gc); |
| |
| // Perform ! WritableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedError). |
| self.ensure_closed_promise_rejected(global, error.handle(), can_gc); |
| |
| // Set stream.[[writer]] to undefined. |
| stream.set_writer(None); |
| |
| // Set this.[[stream]] to undefined. |
| self.stream.set(None); |
| } |
| |
| /// <https://streams.spec.whatwg.org/#writable-stream-default-writer-close-with-error-propagation> |
| pub(crate) fn close_with_error_propagation( |
| &self, |
| cx: SafeJSContext, |
| global: &GlobalScope, |
| can_gc: CanGc, |
| ) -> Rc<Promise> { |
| // Let stream be writer.[[stream]]. |
| let Some(stream) = self.stream.get() else { |
| // Assert: stream is not undefined. |
| unreachable!("Stream should be set."); |
| }; |
| |
| // Let state be stream.[[state]]. |
| // Used via stream method calls. |
| |
| // If ! WritableStreamCloseQueuedOrInFlight(stream) is true |
| // or state is "closed", |
| if stream.close_queued_or_in_flight() || stream.is_closed() { |
| // return a promise resolved with undefined. |
| let promise = Promise::new(global, can_gc); |
| promise.resolve_native(&(), can_gc); |
| return promise; |
| } |
| |
| // If state is "errored", |
| if stream.is_errored() { |
| // return a promise rejected with stream.[[storedError]]. |
| rooted!(in(*cx) let mut error = UndefinedValue()); |
| stream.get_stored_error(error.handle_mut()); |
| let promise = Promise::new(global, can_gc); |
| promise.reject_native(&error.handle(), can_gc); |
| return promise; |
| } |
| |
| // Assert: state is "writable" or "erroring". |
| assert!(stream.is_writable() || stream.is_erroring()); |
| |
| // Return ! WritableStreamDefaultWriterClose(writer). |
| self.close(cx, global, can_gc) |
| } |
| |
| pub(crate) fn get_stream(&self) -> Option<DomRoot<WritableStream>> { |
| self.stream.get() |
| } |
| } |
| |
| impl WritableStreamDefaultWriterMethods<crate::DomTypeHolder> for WritableStreamDefaultWriter { |
| /// <https://streams.spec.whatwg.org/#default-writer-closed> |
| fn Closed(&self) -> Rc<Promise> { |
| // Return this.[[closedPromise]]. |
| return self.closed_promise.borrow().clone(); |
| } |
| |
| /// <https://streams.spec.whatwg.org/#default-writer-desired-size> |
| fn GetDesiredSize(&self) -> Result<Option<f64>, Error> { |
| // If this.[[stream]] is undefined, throw a TypeError exception. |
| let Some(stream) = self.stream.get() else { |
| return Err(Error::Type("Stream is undefined".to_string())); |
| }; |
| |
| // Return ! WritableStreamDefaultWriterGetDesiredSize(this). |
| Ok(stream.get_desired_size()) |
| } |
| |
| /// <https://streams.spec.whatwg.org/#default-writer-ready> |
| fn Ready(&self) -> Rc<Promise> { |
| // Return this.[[readyPromise]]. |
| return self.ready_promise.borrow().clone(); |
| } |
| |
| /// <https://streams.spec.whatwg.org/#default-writer-abort> |
| fn Abort( |
| &self, |
| cx: SafeJSContext, |
| reason: SafeHandleValue, |
| realm: InRealm, |
| can_gc: CanGc, |
| ) -> Rc<Promise> { |
| let global = GlobalScope::from_safe_context(cx, realm); |
| |
| // If this.[[stream]] is undefined, |
| if self.stream.get().is_none() { |
| // return a promise rejected with a TypeError exception. |
| let promise = Promise::new(&global, can_gc); |
| promise.reject_error(Error::Type("Stream is undefined".to_string()), can_gc); |
| return promise; |
| } |
| |
| // Return ! WritableStreamDefaultWriterAbort(this, reason). |
| self.abort(cx, &global, reason, realm, can_gc) |
| } |
| |
| /// <https://streams.spec.whatwg.org/#default-writer-close> |
| fn Close(&self, in_realm: InRealm, can_gc: CanGc) -> Rc<Promise> { |
| let cx = GlobalScope::get_cx(); |
| let global = GlobalScope::from_safe_context(cx, in_realm); |
| let promise = Promise::new(&global, can_gc); |
| |
| // Let stream be this.[[stream]]. |
| let Some(stream) = self.stream.get() else { |
| // If stream is undefined, |
| // return a promise rejected with a TypeError exception. |
| promise.reject_error(Error::Type("Stream is undefined".to_string()), can_gc); |
| return promise; |
| }; |
| |
| // If ! WritableStreamCloseQueuedOrInFlight(stream) is true |
| if stream.close_queued_or_in_flight() { |
| // return a promise rejected with a TypeError exception. |
| promise.reject_error( |
| Error::Type("Stream has closed queued or in-flight".to_string()), |
| can_gc, |
| ); |
| return promise; |
| } |
| |
| self.close(cx, &global, can_gc) |
| } |
| |
| /// <https://streams.spec.whatwg.org/#default-writer-release-lock> |
| fn ReleaseLock(&self, can_gc: CanGc) { |
| // Let stream be this.[[stream]]. |
| let Some(stream) = self.stream.get() else { |
| // If stream is undefined, return. |
| return; |
| }; |
| |
| // Assert: stream.[[writer]] is not undefined. |
| assert!(stream.get_writer().is_some()); |
| |
| let global = self.global(); |
| let cx = GlobalScope::get_cx(); |
| |
| // Perform ! WritableStreamDefaultWriterRelease(this). |
| self.release(cx, &global, can_gc); |
| } |
| |
| /// <https://streams.spec.whatwg.org/#default-writer-write> |
| fn Write( |
| &self, |
| cx: SafeJSContext, |
| chunk: SafeHandleValue, |
| realm: InRealm, |
| can_gc: CanGc, |
| ) -> Rc<Promise> { |
| let global = GlobalScope::from_safe_context(cx, realm); |
| |
| // If this.[[stream]] is undefined, |
| if self.stream.get().is_none() { |
| // return a promise rejected with a TypeError exception. |
| let global = GlobalScope::from_safe_context(cx, realm); |
| let promise = Promise::new(&global, can_gc); |
| promise.reject_error(Error::Type("Stream is undefined".to_string()), can_gc); |
| return promise; |
| } |
| |
| // Return ! WritableStreamDefaultWriterWrite(this, chunk). |
| self.write(cx, &global, chunk, can_gc) |
| } |
| |
| /// <https://streams.spec.whatwg.org/#default-writer-constructor> |
| fn Constructor( |
| global: &GlobalScope, |
| proto: Option<SafeHandleObject>, |
| can_gc: CanGc, |
| stream: &WritableStream, |
| ) -> Result<DomRoot<WritableStreamDefaultWriter>, Error> { |
| let writer = WritableStreamDefaultWriter::new(global, proto, can_gc); |
| |
| let cx = GlobalScope::get_cx(); |
| |
| // Perform ? SetUpWritableStreamDefaultWriter(this, stream). |
| writer.setup(cx, stream, can_gc)?; |
| |
| Ok(writer) |
| } |
| } |