blob: c339a77ad701e834d3daad46d41e5ea512f88df7 [file] [log] [blame] [edit]
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
use std::cell::Cell;
use std::cmp::min;
use std::collections::VecDeque;
use std::rc::Rc;
use dom_struct::dom_struct;
use js::jsapi::{Heap, Type};
use js::jsval::UndefinedValue;
use js::rust::{HandleObject, HandleValue as SafeHandleValue, HandleValue};
use js::typedarray::{ArrayBufferU8, ArrayBufferViewU8};
use super::bindings::buffer_source::HeapBufferSource;
use super::bindings::cell::DomRefCell;
use super::bindings::codegen::Bindings::ReadableStreamBYOBReaderBinding::ReadableStreamBYOBReaderReadOptions;
use super::bindings::reflector::reflect_dom_object;
use super::bindings::root::Dom;
use super::readablestreambyobreader::ReadIntoRequest;
use super::readablestreamdefaultreader::ReadRequest;
use super::underlyingsourcecontainer::{UnderlyingSourceContainer, UnderlyingSourceType};
use crate::dom::bindings::buffer_source::{
Constructor, byte_size, create_array_buffer_with_size, create_buffer_source_with_constructor,
};
use crate::dom::bindings::codegen::Bindings::ReadableByteStreamControllerBinding::ReadableByteStreamControllerMethods;
use crate::dom::bindings::codegen::UnionTypes::ReadableStreamDefaultControllerOrReadableByteStreamController as Controller;
use crate::dom::bindings::error::{Error, ErrorToJsval, Fallible};
use crate::dom::bindings::reflector::{DomGlobal, Reflector};
use crate::dom::bindings::root::{DomRoot, MutNullableDom};
use crate::dom::bindings::trace::RootedTraceableBox;
use crate::dom::globalscope::GlobalScope;
use crate::dom::promise::Promise;
use crate::dom::promisenativehandler::{Callback, PromiseNativeHandler};
use crate::dom::readablestream::ReadableStream;
use crate::dom::readablestreambyobrequest::ReadableStreamBYOBRequest;
use crate::realms::{InRealm, enter_realm};
use crate::script_runtime::{CanGc, JSContext as SafeJSContext};
/// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry>
#[derive(JSTraceable, MallocSizeOf)]
pub(crate) struct QueueEntry {
/// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-buffer>
#[ignore_malloc_size_of = "HeapBufferSource"]
buffer: HeapBufferSource<ArrayBufferU8>,
/// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-offset>
byte_offset: usize,
/// <https://streams.spec.whatwg.org/#readable-byte-stream-queue-entry-byte-length>
byte_length: usize,
}
impl QueueEntry {
pub(crate) fn new(
buffer: HeapBufferSource<ArrayBufferU8>,
byte_offset: usize,
byte_length: usize,
) -> QueueEntry {
QueueEntry {
buffer,
byte_offset,
byte_length,
}
}
}
#[derive(Debug, Eq, JSTraceable, MallocSizeOf, PartialEq)]
pub(crate) enum ReaderType {
/// <https://streams.spec.whatwg.org/#readablestreambyobreader>
Byob,
/// <https://streams.spec.whatwg.org/#readablestreamdefaultreader>
Default,
}
/// <https://streams.spec.whatwg.org/#pull-into-descriptor>
#[derive(Eq, JSTraceable, MallocSizeOf, PartialEq)]
pub(crate) struct PullIntoDescriptor {
#[ignore_malloc_size_of = "HeapBufferSource"]
/// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer>
buffer: HeapBufferSource<ArrayBufferU8>,
/// <https://streams.spec.whatwg.org/#pull-into-descriptor-buffer-byte-length>
buffer_byte_length: u64,
/// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-offset>
byte_offset: u64,
/// <https://streams.spec.whatwg.org/#pull-into-descriptor-byte-length>
byte_length: u64,
/// <https://streams.spec.whatwg.org/#pull-into-descriptor-bytes-filled>
bytes_filled: Cell<u64>,
/// <https://streams.spec.whatwg.org/#pull-into-descriptor-minimum-fill>
minimum_fill: u64,
/// <https://streams.spec.whatwg.org/#pull-into-descriptor-element-size>
element_size: u64,
/// <https://streams.spec.whatwg.org/#pull-into-descriptor-view-constructor>
view_constructor: Constructor,
/// <https://streams.spec.whatwg.org/#pull-into-descriptor-reader-type>
reader_type: Option<ReaderType>,
}
/// The fulfillment handler for
/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct StartAlgorithmFulfillmentHandler {
controller: Dom<ReadableByteStreamController>,
}
impl Callback for StartAlgorithmFulfillmentHandler {
/// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
/// Upon fulfillment of startPromise,
fn callback(&self, _cx: SafeJSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
// Set controller.[[started]] to true.
self.controller.started.set(true);
// Assert: controller.[[pulling]] is false.
assert!(!self.controller.pulling.get());
// Assert: controller.[[pullAgain]] is false.
assert!(!self.controller.pull_again.get());
// Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
self.controller.call_pull_if_needed(can_gc);
}
}
/// The rejection handler for
/// <https://streams.spec.whatwg.org/#dom-underlyingsource-start>
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct StartAlgorithmRejectionHandler {
controller: Dom<ReadableByteStreamController>,
}
impl Callback for StartAlgorithmRejectionHandler {
/// Continuation of <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
/// Upon rejection of startPromise with reason r,
fn callback(&self, _cx: SafeJSContext, v: HandleValue, _realm: InRealm, can_gc: CanGc) {
// Perform ! ReadableByteStreamControllerError(controller, r).
self.controller.error(v, can_gc);
}
}
/// The fulfillment handler for
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PullAlgorithmFulfillmentHandler {
controller: Dom<ReadableByteStreamController>,
}
impl Callback for PullAlgorithmFulfillmentHandler {
/// Continuation of <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
/// Upon fulfillment of pullPromise
fn callback(&self, _cx: SafeJSContext, _v: HandleValue, _realm: InRealm, can_gc: CanGc) {
// Set controller.[[pulling]] to false.
self.controller.pulling.set(false);
// If controller.[[pullAgain]] is true,
if self.controller.pull_again.get() {
// Set controller.[[pullAgain]] to false.
self.controller.pull_again.set(false);
// Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
self.controller.call_pull_if_needed(can_gc);
}
}
}
/// The rejection handler for
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
#[derive(Clone, JSTraceable, MallocSizeOf)]
#[cfg_attr(crown, crown::unrooted_must_root_lint::must_root)]
struct PullAlgorithmRejectionHandler {
controller: Dom<ReadableByteStreamController>,
}
impl Callback for PullAlgorithmRejectionHandler {
/// Continuation of <https://streams.spec.whatwg.org/#readable-stream-byte-controller-call-pull-if-needed>
/// Upon rejection of pullPromise with reason e.
fn callback(&self, _cx: SafeJSContext, v: HandleValue, _realm: InRealm, can_gc: CanGc) {
// Perform ! ReadableByteStreamControllerError(controller, e).
self.controller.error(v, can_gc);
}
}
/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller>
#[dom_struct]
pub(crate) struct ReadableByteStreamController {
reflector_: Reflector,
/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-autoallocatechunksize>
auto_allocate_chunk_size: Option<u64>,
/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-stream>
stream: MutNullableDom<ReadableStream>,
/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-strategyhwm>
strategy_hwm: f64,
/// A mutable reference to the underlying source is used to implement these two
/// internal slots:
///
/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-cancelalgorithm>
underlying_source: MutNullableDom<UnderlyingSourceContainer>,
/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queue>
queue: DomRefCell<VecDeque<QueueEntry>>,
/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-queuetotalsize>
queue_total_size: Cell<f64>,
/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-byobrequest>
byob_request: MutNullableDom<ReadableStreamBYOBRequest>,
/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pendingpullintos>
pending_pull_intos: DomRefCell<Vec<PullIntoDescriptor>>,
/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-closerequested>
close_requested: Cell<bool>,
/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-started>
started: Cell<bool>,
/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pulling>
pulling: Cell<bool>,
/// <https://streams.spec.whatwg.org/#readablebytestreamcontroller-pullalgorithm>
pull_again: Cell<bool>,
}
impl ReadableByteStreamController {
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
fn new_inherited(
underlying_source_type: UnderlyingSourceType,
strategy_hwm: f64,
global: &GlobalScope,
can_gc: CanGc,
) -> ReadableByteStreamController {
let underlying_source_container =
UnderlyingSourceContainer::new(global, underlying_source_type, can_gc);
let auto_allocate_chunk_size = underlying_source_container.auto_allocate_chunk_size();
ReadableByteStreamController {
reflector_: Reflector::new(),
byob_request: MutNullableDom::new(None),
stream: MutNullableDom::new(None),
underlying_source: MutNullableDom::new(Some(&*underlying_source_container)),
auto_allocate_chunk_size,
pending_pull_intos: DomRefCell::new(Vec::new()),
strategy_hwm,
close_requested: Default::default(),
queue: DomRefCell::new(Default::default()),
queue_total_size: Default::default(),
started: Default::default(),
pulling: Default::default(),
pull_again: Default::default(),
}
}
#[cfg_attr(crown, allow(crown::unrooted_must_root))]
pub(crate) fn new(
underlying_source_type: UnderlyingSourceType,
strategy_hwm: f64,
global: &GlobalScope,
can_gc: CanGc,
) -> DomRoot<ReadableByteStreamController> {
reflect_dom_object(
Box::new(ReadableByteStreamController::new_inherited(
underlying_source_type,
strategy_hwm,
global,
can_gc,
)),
global,
can_gc,
)
}
pub(crate) fn set_stream(&self, stream: &ReadableStream) {
self.stream.set(Some(stream))
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-pull-into>
pub(crate) fn perform_pull_into(
&self,
cx: SafeJSContext,
read_into_request: &ReadIntoRequest,
view: HeapBufferSource<ArrayBufferViewU8>,
options: &ReadableStreamBYOBReaderReadOptions,
can_gc: CanGc,
) {
// Let stream be controller.[[stream]].
let stream = self.stream.get().unwrap();
// Let elementSize be 1.
let mut element_size = 1;
// Let ctor be %DataView%.
let mut ctor = Constructor::DataView;
// If view has a [[TypedArrayName]] internal slot (i.e., it is not a DataView),
if view.has_typed_array_name() {
// Set elementSize to the element size specified in the
// typed array constructors table for view.[[TypedArrayName]].
let view_typw = view.get_array_buffer_view_type();
element_size = byte_size(view_typw);
// Set ctor to the constructor specified in the typed array constructors table for view.[[TypedArrayName]].
ctor = Constructor::Name(view_typw);
}
// Let minimumFill be min × elementSize.
let minimum_fill = options.min * element_size;
// Assert: minimumFill ≥ 0 and minimumFill ≤ view.[[ByteLength]].
assert!(minimum_fill <= (view.byte_length() as u64));
// Assert: the remainder after dividing minimumFill by elementSize is 0.
assert_eq!(minimum_fill % element_size, 0);
// Let byteOffset be view.[[ByteOffset]].
let byte_offset = view.get_byte_offset();
// Let byteLength be view.[[ByteLength]].
let byte_length = view.byte_length();
// Let bufferResult be TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
match view
.get_array_buffer_view_buffer(cx)
.transfer_array_buffer(cx)
{
Ok(buffer) => {
// Let buffer be bufferResult.[[Value]].
// Let pullIntoDescriptor be a new pull-into descriptor with
// buffer buffer
// buffer byte length buffer.[[ArrayBufferByteLength]]
// byte offset byteOffset
// byte length byteLength
// bytes filled 0
// minimum fill minimumFill
// element size elementSize
// view constructor ctor
// reader type "byob"
let buffer_byte_length = buffer.byte_length();
let pull_into_descriptor = PullIntoDescriptor {
buffer,
buffer_byte_length: buffer_byte_length as u64,
byte_offset: byte_offset as u64,
byte_length: byte_length as u64,
bytes_filled: Cell::new(0),
minimum_fill,
element_size,
view_constructor: ctor.clone(),
reader_type: Some(ReaderType::Byob),
};
// If controller.[[pendingPullIntos]] is not empty,
{
let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
if !pending_pull_intos.is_empty() {
// Append pullIntoDescriptor to controller.[[pendingPullIntos]].
pending_pull_intos.push(pull_into_descriptor);
// Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
stream.add_read_into_request(read_into_request);
// Return.
return;
}
}
// If stream.[[state]] is "closed",
if stream.is_closed() {
// Let emptyView be ! Construct(ctor, « pullIntoDescriptor’s buffer,
// pullIntoDescriptor’s byte offset, 0 »).
if let Ok(empty_view) = create_buffer_source_with_constructor(
cx,
&ctor,
&pull_into_descriptor.buffer,
pull_into_descriptor.byte_offset as usize,
0,
) {
// Perform readIntoRequest’s close steps, given emptyView.
let result = RootedTraceableBox::new(Heap::default());
rooted!(in(*cx) let mut view_value = UndefinedValue());
empty_view.get_buffer_view_value(cx, view_value.handle_mut());
result.set(*view_value);
read_into_request.close_steps(Some(result), can_gc);
// Return.
return;
} else {
return;
}
}
// If controller.[[queueTotalSize]] > 0,
if self.queue_total_size.get() > 0.0 {
// If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(
// controller, pullIntoDescriptor) is true,
if self.fill_pull_into_descriptor_from_queue(cx, &pull_into_descriptor) {
// Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(
// pullIntoDescriptor).
if let Ok(filled_view) =
self.convert_pull_into_descriptor(cx, &pull_into_descriptor)
{
// Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
self.handle_queue_drain(can_gc);
// Perform readIntoRequest’s chunk steps, given filledView.
let result = RootedTraceableBox::new(Heap::default());
rooted!(in(*cx) let mut view_value = UndefinedValue());
filled_view.get_buffer_view_value(cx, view_value.handle_mut());
result.set(*view_value);
read_into_request.chunk_steps(result, can_gc);
// Return.
return;
} else {
return;
}
}
// If controller.[[closeRequested]] is true,
if self.close_requested.get() {
// Let e be a new TypeError exception.
rooted!(in(*cx) let mut error = UndefinedValue());
Error::Type("close requested".to_owned()).to_jsval(
cx,
&self.global(),
error.handle_mut(),
can_gc,
);
// Perform ! ReadableByteStreamControllerError(controller, e).
self.error(error.handle(), can_gc);
// Perform readIntoRequest’s error steps, given e.
read_into_request.error_steps(error.handle(), can_gc);
// Return.
return;
}
}
// Append pullIntoDescriptor to controller.[[pendingPullIntos]].
{
self.pending_pull_intos
.borrow_mut()
.push(pull_into_descriptor);
}
// Perform ! ReadableStreamAddReadIntoRequest(stream, readIntoRequest).
stream.add_read_into_request(read_into_request);
// Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
self.call_pull_if_needed(can_gc);
},
Err(error) => {
// If bufferResult is an abrupt completion,
// Perform readIntoRequest’s error steps, given bufferResult.[[Value]].
rooted!(in(*cx) let mut rval = UndefinedValue());
error
.clone()
.to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
read_into_request.error_steps(rval.handle(), can_gc);
// Return.
},
}
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond>
pub(crate) fn respond(
&self,
cx: SafeJSContext,
bytes_written: u64,
can_gc: CanGc,
) -> Fallible<()> {
{
// Assert: controller.[[pendingPullIntos]] is not empty.
let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
assert!(!pending_pull_intos.is_empty());
// Let firstDescriptor be controller.[[pendingPullIntos]][0].
let first_descriptor = pending_pull_intos.first_mut().unwrap();
// Let state be controller.[[stream]].[[state]].
let stream = self.stream.get().unwrap();
// If state is "closed",
if stream.is_closed() {
// If bytesWritten is not 0, throw a TypeError exception.
if bytes_written != 0 {
return Err(Error::Type(
"bytesWritten not zero on closed stream".to_owned(),
));
}
} else {
// Assert: state is "readable".
assert!(stream.is_readable());
// If bytesWritten is 0, throw a TypeError exception.
if bytes_written == 0 {
return Err(Error::Type("bytesWritten is 0".to_owned()));
}
// If firstDescriptor’s bytes filled + bytesWritten > firstDescriptor’s byte length,
// throw a RangeError exception.
if first_descriptor.bytes_filled.get() + bytes_written >
first_descriptor.byte_length
{
return Err(Error::Range(
"bytes filled + bytesWritten > byte length".to_owned(),
));
}
}
// Set firstDescriptor’s buffer to ! TransferArrayBuffer(firstDescriptor’s buffer).
first_descriptor.buffer = first_descriptor.buffer.transfer_array_buffer(cx)?;
}
// Perform ? ReadableByteStreamControllerRespondInternal(controller, bytesWritten).
self.respond_internal(cx, bytes_written, can_gc)
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-internal>
pub(crate) fn respond_internal(
&self,
cx: SafeJSContext,
bytes_written: u64,
can_gc: CanGc,
) -> Fallible<()> {
{
// Let firstDescriptor be controller.[[pendingPullIntos]][0].
let pending_pull_intos = self.pending_pull_intos.borrow();
let first_descriptor = pending_pull_intos.first().unwrap();
// Assert: ! CanTransferArrayBuffer(firstDescriptor’s buffer) is true
assert!(first_descriptor.buffer.can_transfer_array_buffer(cx));
}
// Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
self.invalidate_byob_request();
// Let state be controller.[[stream]].[[state]].
let stream = self.stream.get().unwrap();
// If state is "closed",
if stream.is_closed() {
// Assert: bytesWritten is 0.
assert_eq!(bytes_written, 0);
// Perform ! ReadableByteStreamControllerRespondInClosedState(controller, firstDescriptor).
self.respond_in_closed_state(cx, can_gc)?;
} else {
// Assert: state is "readable".
assert!(stream.is_readable());
// Assert: bytesWritten > 0.
assert!(bytes_written > 0);
// Perform ? ReadableByteStreamControllerRespondInReadableState(controller, bytesWritten, firstDescriptor).
self.respond_in_readable_state(cx, bytes_written, can_gc)?;
}
// Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
self.call_pull_if_needed(can_gc);
Ok(())
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-closed-state>
pub(crate) fn respond_in_closed_state(&self, cx: SafeJSContext, can_gc: CanGc) -> Fallible<()> {
let pending_pull_intos = self.pending_pull_intos.borrow();
let first_descriptor = pending_pull_intos.first().unwrap();
// Assert: the remainder after dividing firstDescriptor’s bytes filled
// by firstDescriptor’s element size is 0.
assert_eq!(
first_descriptor.bytes_filled.get() % first_descriptor.element_size,
0
);
// If firstDescriptor’s reader type is "none",
// perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
let reader_type = first_descriptor.reader_type.is_none();
// needed to drop the borrow and avoid BorrowMutError
drop(pending_pull_intos);
if reader_type {
self.shift_pending_pull_into();
}
// Let stream be controller.[[stream]].
let stream = self.stream.get().unwrap();
// If ! ReadableStreamHasBYOBReader(stream) is true,
if stream.has_byob_reader() {
// Let filledPullIntos be a new empty list.
let mut filled_pull_intos = Vec::new();
// While filledPullIntos’s size < ! ReadableStreamGetNumReadIntoRequests(stream),
while filled_pull_intos.len() < stream.get_num_read_into_requests() {
// Let pullIntoDescriptor be ! ReadableByteStreamControllerShiftPendingPullInto(controller).
let pull_into_descriptor = self.shift_pending_pull_into();
// Append pullIntoDescriptor to filledPullIntos.
filled_pull_intos.push(pull_into_descriptor);
}
// For each filledPullInto of filledPullIntos,
for filled_pull_into in filled_pull_intos {
// Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)?;
}
}
Ok(())
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-readable-state>
pub(crate) fn respond_in_readable_state(
&self,
cx: SafeJSContext,
bytes_written: u64,
can_gc: CanGc,
) -> Fallible<()> {
let pending_pull_intos = self.pending_pull_intos.borrow();
let first_descriptor = pending_pull_intos.first().unwrap();
// Assert: pullIntoDescriptor’s bytes filled + bytesWritten ≤ pullIntoDescriptor’s byte length.
assert!(
first_descriptor.bytes_filled.get() + bytes_written <= first_descriptor.byte_length
);
// Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
// controller, bytesWritten, pullIntoDescriptor).
self.fill_head_pull_into_descriptor(bytes_written, first_descriptor);
// If pullIntoDescriptor’s reader type is "none",
if first_descriptor.reader_type.is_none() {
// needed to drop the borrow and avoid BorrowMutError
drop(pending_pull_intos);
// Perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(controller, pullIntoDescriptor).
self.enqueue_detached_pull_into_to_queue(cx, can_gc)?;
// Let filledPullIntos be the result of performing
// ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
// For each filledPullInto of filledPullIntos,
for filled_pull_into in filled_pull_intos {
// Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]]
// , filledPullInto).
self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)?;
}
// Return.
return Ok(());
}
// If pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill, return.
if first_descriptor.bytes_filled.get() < first_descriptor.minimum_fill {
return Ok(());
}
// needed to drop the borrow and avoid BorrowMutError
drop(pending_pull_intos);
// Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
let pull_into_descriptor = self.shift_pending_pull_into();
// Let remainderSize be the remainder after dividing pullIntoDescriptor’s bytes
// filled by pullIntoDescriptor’s element size.
let remainder_size =
pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size;
// If remainderSize > 0,
if remainder_size > 0 {
// Let end be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
let end = pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
// Perform ? ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
// pullIntoDescriptor’s buffer, end − remainderSize, remainderSize).
self.enqueue_cloned_chunk_to_queue(
cx,
&pull_into_descriptor.buffer,
end - remainder_size,
remainder_size,
can_gc,
)?;
}
// Set pullIntoDescriptor’s bytes filled to pullIntoDescriptor’s bytes filled − remainderSize.
pull_into_descriptor
.bytes_filled
.set(pull_into_descriptor.bytes_filled.get() - remainder_size);
// Let filledPullIntos be the result of performing
// ! ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
// Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], pullIntoDescriptor).
self.commit_pull_into_descriptor(cx, &pull_into_descriptor, can_gc)?;
// For each filledPullInto of filledPullIntos,
for filled_pull_into in filled_pull_intos {
// Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(controller.[[stream]], filledPullInto).
self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)?;
}
Ok(())
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-with-new-view>
pub(crate) fn respond_with_new_view(
&self,
cx: SafeJSContext,
view: HeapBufferSource<ArrayBufferViewU8>,
can_gc: CanGc,
) -> Fallible<()> {
let view_byte_length;
{
// Assert: controller.[[pendingPullIntos]] is not empty.
let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
assert!(!pending_pull_intos.is_empty());
// Assert: ! IsDetachedBuffer(view.[[ViewedArrayBuffer]]) is false.
assert!(!view.is_detached_buffer(cx));
// Let firstDescriptor be controller.[[pendingPullIntos]][0].
let first_descriptor = pending_pull_intos.first_mut().unwrap();
// Let state be controller.[[stream]].[[state]].
let stream = self.stream.get().unwrap();
// If state is "closed",
if stream.is_closed() {
// If view.[[ByteLength]] is not 0, throw a TypeError exception.
if view.byte_length() != 0 {
return Err(Error::Type("view byte length is not 0".to_owned()));
}
} else {
// Assert: state is "readable".
assert!(stream.is_readable());
// If view.[[ByteLength]] is 0, throw a TypeError exception.
if view.byte_length() == 0 {
return Err(Error::Type("view byte length is 0".to_owned()));
}
}
// If firstDescriptor’s byte offset + firstDescriptor’ bytes filled is not view.[[ByteOffset]],
// throw a RangeError exception.
if first_descriptor.byte_offset + first_descriptor.bytes_filled.get() !=
(view.get_byte_offset() as u64)
{
return Err(Error::Range(
"firstDescriptor's byte offset + bytes filled is not view byte offset"
.to_owned(),
));
}
// If firstDescriptor’s buffer byte length is not view.[[ViewedArrayBuffer]].[[ByteLength]],
// throw a RangeError exception.
if first_descriptor.buffer_byte_length !=
(view.viewed_buffer_array_byte_length(cx) as u64)
{
return Err(Error::Range(
"firstDescriptor's buffer byte length is not view viewed buffer array byte length"
.to_owned(),
));
}
// If firstDescriptor’s bytes filled + view.[[ByteLength]] > firstDescriptor’s byte length,
// throw a RangeError exception.
if first_descriptor.bytes_filled.get() + (view.byte_length()) as u64 >
first_descriptor.byte_length
{
return Err(Error::Range(
"bytes filled + view byte length > byte length".to_owned(),
));
}
// Let viewByteLength be view.[[ByteLength]].
view_byte_length = view.byte_length();
// Set firstDescriptor’s buffer to ? TransferArrayBuffer(view.[[ViewedArrayBuffer]]).
first_descriptor.buffer = view
.get_array_buffer_view_buffer(cx)
.transfer_array_buffer(cx)?;
}
// Perform ? ReadableByteStreamControllerRespondInternal(controller, viewByteLength).
self.respond_internal(cx, view_byte_length as u64, can_gc)
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-get-desired-size>
pub(crate) fn get_desired_size(&self) -> Option<f64> {
// Let state be controller.[[stream]].[[state]].
let stream = self.stream.get()?;
// If state is "errored", return null.
if stream.is_errored() {
return None;
}
// If state is "closed", return 0.
if stream.is_closed() {
return Some(0.0);
}
// Return controller.[[strategyHWM]] − controller.[[queueTotalSize]].
Some(self.strategy_hwm - self.queue_total_size.get())
}
/// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollergetbyobrequest>
pub(crate) fn get_byob_request(
&self,
cx: SafeJSContext,
can_gc: CanGc,
) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
// If controller.[[byobRequest]] is null and controller.[[pendingPullIntos]] is not empty,
let pending_pull_intos = self.pending_pull_intos.borrow();
if self.byob_request.get().is_none() && !pending_pull_intos.is_empty() {
// Let firstDescriptor be controller.[[pendingPullIntos]][0].
let first_descriptor = pending_pull_intos.first().unwrap();
// Let view be ! Construct(%Uint8Array%, « firstDescriptor’s buffer,
// firstDescriptor’s byte offset + firstDescriptor’s bytes filled,
// firstDescriptor’s byte length − firstDescriptor’s bytes filled »).
let byte_offset = first_descriptor.byte_offset + first_descriptor.bytes_filled.get();
let byte_length = first_descriptor.byte_length - first_descriptor.bytes_filled.get();
let view = create_buffer_source_with_constructor(
cx,
&Constructor::Name(Type::Uint8),
&first_descriptor.buffer,
byte_offset as usize,
byte_length as usize,
)?;
// Let byobRequest be a new ReadableStreamBYOBRequest.
let byob_request = ReadableStreamBYOBRequest::new(&self.global(), can_gc);
// Set byobRequest.[[controller]] to controller.
byob_request.set_controller(Some(&DomRoot::from_ref(self)));
// Set byobRequest.[[view]] to view.
byob_request.set_view(Some(view));
// Set controller.[[byobRequest]] to byobRequest.
self.byob_request.set(Some(&byob_request));
}
// Return controller.[[byobRequest]].
Ok(self.byob_request.get())
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-close>
pub(crate) fn close(&self, cx: SafeJSContext, can_gc: CanGc) -> Fallible<()> {
// Let stream be controller.[[stream]].
let stream = self.stream.get().unwrap();
// If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
if self.close_requested.get() || !stream.is_readable() {
return Ok(());
}
// If controller.[[queueTotalSize]] > 0,
if self.queue_total_size.get() > 0.0 {
// Set controller.[[closeRequested]] to true.
self.close_requested.set(true);
// Return.
return Ok(());
}
// If controller.[[pendingPullIntos]] is not empty,
let pending_pull_intos = self.pending_pull_intos.borrow();
if !pending_pull_intos.is_empty() {
// Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
let first_pending_pull_into = pending_pull_intos.first().unwrap();
// If the remainder after dividing firstPendingPullInto’s bytes filled by
// firstPendingPullInto’s element size is not 0,
if first_pending_pull_into.bytes_filled.get() % first_pending_pull_into.element_size !=
0
{
// needed to drop the borrow and avoid BorrowMutError
drop(pending_pull_intos);
// Let e be a new TypeError exception.
let e = Error::Type(
"remainder after dividing firstPendingPullInto's bytes
filled by firstPendingPullInto's element size is not 0"
.to_owned(),
);
// Perform ! ReadableByteStreamControllerError(controller, e).
rooted!(in(*cx) let mut error = UndefinedValue());
e.clone()
.to_jsval(cx, &self.global(), error.handle_mut(), can_gc);
self.error(error.handle(), can_gc);
// Throw e.
return Err(e);
}
}
// Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
self.clear_algorithms();
// Perform ! ReadableStreamClose(stream).
stream.close(can_gc);
Ok(())
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-error>
pub(crate) fn error(&self, e: SafeHandleValue, can_gc: CanGc) {
// Let stream be controller.[[stream]].
let stream = self.stream.get().unwrap();
// If stream.[[state]] is not "readable", return.
if !stream.is_readable() {
return;
}
// Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
self.clear_pending_pull_intos();
// Perform ! ResetQueue(controller).
self.reset_queue();
// Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
self.clear_algorithms();
// Perform ! ReadableStreamError(stream, e).
stream.error(e, can_gc);
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-algorithms>
fn clear_algorithms(&self) {
// Set controller.[[pullAlgorithm]] to undefined.
// Set controller.[[cancelAlgorithm]] to undefined.
self.underlying_source.set(None);
}
/// <https://streams.spec.whatwg.org/#reset-queue>
pub(crate) fn reset_queue(&self) {
// Assert: container has [[queue]] and [[queueTotalSize]] internal slots.
// Set container.[[queue]] to a new empty list.
self.queue.borrow_mut().clear();
// Set container.[[queueTotalSize]] to 0.
self.queue_total_size.set(0.0);
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-clear-pending-pull-intos>
pub(crate) fn clear_pending_pull_intos(&self) {
// Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
self.invalidate_byob_request();
// Set controller.[[pendingPullIntos]] to a new empty list.
self.pending_pull_intos.borrow_mut().clear();
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-invalidate-byob-request>
pub(crate) fn invalidate_byob_request(&self) {
if let Some(byob_request) = self.byob_request.get() {
// Set controller.[[byobRequest]].[[controller]] to undefined.
byob_request.set_controller(None);
// Set controller.[[byobRequest]].[[view]] to null.
byob_request.set_view(None);
// Set controller.[[byobRequest]] to null.
self.byob_request.set(None);
}
// If controller.[[byobRequest]] is null, return.
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue>
pub(crate) fn enqueue(
&self,
cx: SafeJSContext,
chunk: HeapBufferSource<ArrayBufferViewU8>,
can_gc: CanGc,
) -> Fallible<()> {
// Let stream be controller.[[stream]].
let stream = self.stream.get().unwrap();
// If controller.[[closeRequested]] is true or stream.[[state]] is not "readable", return.
if self.close_requested.get() || !stream.is_readable() {
return Ok(());
}
// Let buffer be chunk.[[ViewedArrayBuffer]].
let buffer = chunk.get_array_buffer_view_buffer(cx);
// Let byteOffset be chunk.[[ByteOffset]].
let byte_offset = chunk.get_byte_offset();
// Let byteLength be chunk.[[ByteLength]].
let byte_length = chunk.byte_length();
// If ! IsDetachedBuffer(buffer) is true, throw a TypeError exception.
if buffer.is_detached_buffer(cx) {
return Err(Error::Type("buffer is detached".to_owned()));
}
// Let transferredBuffer be ? TransferArrayBuffer(buffer).
let transferred_buffer = buffer.transfer_array_buffer(cx)?;
// If controller.[[pendingPullIntos]] is not empty,
{
let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
if !pending_pull_intos.is_empty() {
// Let firstPendingPullInto be controller.[[pendingPullIntos]][0].
let first_descriptor = pending_pull_intos.first_mut().unwrap();
// If ! IsDetachedBuffer(firstPendingPullInto’s buffer) is true, throw a TypeError exception.
if first_descriptor.buffer.is_detached_buffer(cx) {
return Err(Error::Type("buffer is detached".to_owned()));
}
// Perform ! ReadableByteStreamControllerInvalidateBYOBRequest(controller).
self.invalidate_byob_request();
// Set firstPendingPullInto’s buffer to ! TransferArrayBuffer(firstPendingPullInto’s buffer).
first_descriptor.buffer = first_descriptor.buffer.transfer_array_buffer(cx)?;
// If firstPendingPullInto’s reader type is "none",
if first_descriptor.reader_type.is_none() {
// needed to drop the borrow and avoid BorrowMutError
drop(pending_pull_intos);
// perform ? ReadableByteStreamControllerEnqueueDetachedPullIntoToQueue(
// controller, firstPendingPullInto).
self.enqueue_detached_pull_into_to_queue(cx, can_gc)?;
}
}
}
// If ! ReadableStreamHasDefaultReader(stream) is true,
if stream.has_default_reader() {
// Perform ! ReadableByteStreamControllerProcessReadRequestsUsingQueue(controller).
self.process_read_requests_using_queue(cx, can_gc)?;
// If ! ReadableStreamGetNumReadRequests(stream) is 0,
if stream.get_num_read_requests() == 0 {
// Assert: controller.[[pendingPullIntos]] is empty.
{
assert!(self.pending_pull_intos.borrow().is_empty());
}
// Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
// controller, transferredBuffer, byteOffset, byteLength).
self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
} else {
// Assert: controller.[[queue]] is empty.
assert!(self.queue.borrow().is_empty());
// If controller.[[pendingPullIntos]] is not empty,
let pending_pull_intos = self.pending_pull_intos.borrow();
if !pending_pull_intos.is_empty() {
// Assert: controller.[[pendingPullIntos]][0]'s reader type is "default".
assert!(matches!(
pending_pull_intos.first().unwrap().reader_type,
Some(ReaderType::Default)
));
// needed to drop the borrow and avoid BorrowMutError
drop(pending_pull_intos);
// Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
self.shift_pending_pull_into();
}
// Let transferredView be ! Construct(%Uint8Array%, « transferredBuffer, byteOffset, byteLength »).
let transferred_view = create_buffer_source_with_constructor(
cx,
&Constructor::Name(Type::Uint8),
&transferred_buffer,
byte_offset,
byte_length,
)?;
// Perform ! ReadableStreamFulfillReadRequest(stream, transferredView, false).
rooted!(in(*cx) let mut view_value = UndefinedValue());
transferred_view.get_buffer_view_value(cx, view_value.handle_mut());
stream.fulfill_read_request(view_value.handle(), false, can_gc);
}
// Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true,
} else if stream.has_byob_reader() {
// Perform ! ReadableByteStreamControllerEnqueueChunkToQueue(
// controller, transferredBuffer, byteOffset, byteLength).
self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
// Let filledPullIntos be the result of performing !
// ReadableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(controller).
let filled_pull_intos = self.process_pull_into_descriptors_using_queue(cx);
// For each filledPullInto of filledPullIntos,
// Perform ! ReadableByteStreamControllerCommitPullIntoDescriptor(stream, filledPullInto).
for filled_pull_into in filled_pull_intos {
self.commit_pull_into_descriptor(cx, &filled_pull_into, can_gc)?;
}
} else {
// Assert: ! IsReadableStreamLocked(stream) is false.
assert!(!stream.is_locked());
// Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
// (controller, transferredBuffer, byteOffset, byteLength).
self.enqueue_chunk_to_queue(transferred_buffer, byte_offset, byte_length);
}
// Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
self.call_pull_if_needed(can_gc);
Ok(())
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-commit-pull-into-descriptor>
pub(crate) fn commit_pull_into_descriptor(
&self,
cx: SafeJSContext,
pull_into_descriptor: &PullIntoDescriptor,
can_gc: CanGc,
) -> Fallible<()> {
// Assert: stream.[[state]] is not "errored".
let stream = self.stream.get().unwrap();
assert!(!stream.is_errored());
// Assert: pullIntoDescriptor.reader type is not "none".
assert!(pull_into_descriptor.reader_type.is_some());
// Let done be false.
let mut done = false;
// If stream.[[state]] is "closed",
if stream.is_closed() {
// Assert: the remainder after dividing pullIntoDescriptor’s bytes filled
// by pullIntoDescriptor’s element size is 0.
assert!(
pull_into_descriptor.bytes_filled.get() % pull_into_descriptor.element_size == 0
);
// Set done to true.
done = true;
}
// Let filledView be ! ReadableByteStreamControllerConvertPullIntoDescriptor(pullIntoDescriptor).
let filled_view = self.convert_pull_into_descriptor(cx, pull_into_descriptor)?;
rooted!(in(*cx) let mut view_value = UndefinedValue());
filled_view.get_buffer_view_value(cx, view_value.handle_mut());
// If pullIntoDescriptor’s reader type is "default",
if matches!(pull_into_descriptor.reader_type, Some(ReaderType::Default)) {
// Perform ! ReadableStreamFulfillReadRequest(stream, filledView, done).
stream.fulfill_read_request(view_value.handle(), done, can_gc);
} else {
// Assert: pullIntoDescriptor’s reader type is "byob".
assert!(matches!(
pull_into_descriptor.reader_type,
Some(ReaderType::Byob)
));
// Perform ! ReadableStreamFulfillReadIntoRequest(stream, filledView, done).
stream.fulfill_read_into_request(view_value.handle(), done, can_gc);
}
Ok(())
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-convert-pull-into-descriptor>
pub(crate) fn convert_pull_into_descriptor(
&self,
cx: SafeJSContext,
pull_into_descriptor: &PullIntoDescriptor,
) -> Fallible<HeapBufferSource<ArrayBufferViewU8>> {
// Let bytesFilled be pullIntoDescriptor’s bytes filled.
let bytes_filled = pull_into_descriptor.bytes_filled.get();
// Let elementSize be pullIntoDescriptor’s element size.
let element_size = pull_into_descriptor.element_size;
// Assert: bytesFilled ≤ pullIntoDescriptor’s byte length.
assert!(bytes_filled <= pull_into_descriptor.byte_length);
// Assert: the remainder after dividing bytesFilled by elementSize is 0.
assert!(bytes_filled % element_size == 0);
// Let buffer be ! TransferArrayBuffer(pullIntoDescriptor’s buffer).
let buffer = pull_into_descriptor.buffer.transfer_array_buffer(cx)?;
// Return ! Construct(pullIntoDescriptor’s view constructor,
// « buffer, pullIntoDescriptor’s byte offset, bytesFilled ÷ elementSize »).
create_buffer_source_with_constructor(
cx,
&pull_into_descriptor.view_constructor,
&buffer,
pull_into_descriptor.byte_offset as usize,
(bytes_filled / element_size) as usize,
)
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-process-pull-into-descriptors-using-queue>
pub(crate) fn process_pull_into_descriptors_using_queue(
&self,
cx: SafeJSContext,
) -> Vec<PullIntoDescriptor> {
// Assert: controller.[[closeRequested]] is false.
assert!(!self.close_requested.get());
// Let filledPullIntos be a new empty list.
let mut filled_pull_intos = Vec::new();
// While controller.[[pendingPullIntos]] is not empty,
loop {
// If controller.[[queueTotalSize]] is 0, then break.
if self.queue_total_size.get() == 0.0 {
break;
}
// Let pullIntoDescriptor be controller.[[pendingPullIntos]][0].
let fill_pull_result = {
let pending_pull_intos = self.pending_pull_intos.borrow();
let Some(pull_into_descriptor) = pending_pull_intos.first() else {
break;
};
self.fill_pull_into_descriptor_from_queue(cx, pull_into_descriptor)
};
// If ! ReadableByteStreamControllerFillPullIntoDescriptorFromQueue(controller, pullIntoDescriptor) is true,
if fill_pull_result {
// Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
let pull_into_descriptor = self.shift_pending_pull_into();
// Append pullIntoDescriptor to filledPullIntos.
filled_pull_intos.push(pull_into_descriptor);
}
}
// Return filledPullIntos.
filled_pull_intos
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-pull-into-descriptor-from-queue>
pub(crate) fn fill_pull_into_descriptor_from_queue(
&self,
cx: SafeJSContext,
pull_into_descriptor: &PullIntoDescriptor,
) -> bool {
// Let maxBytesToCopy be min(controller.[[queueTotalSize]],
// pullIntoDescriptor’s byte length − pullIntoDescriptor’s bytes filled).
let max_bytes_to_copy = min(
self.queue_total_size.get() as usize,
(pull_into_descriptor.byte_length - pull_into_descriptor.bytes_filled.get()) as usize,
);
// Let maxBytesFilled be pullIntoDescriptor’s bytes filled + maxBytesToCopy.
let max_bytes_filled = pull_into_descriptor.bytes_filled.get() as usize + max_bytes_to_copy;
// Let totalBytesToCopyRemaining be maxBytesToCopy.
let mut total_bytes_to_copy_remaining = max_bytes_to_copy;
// Let ready be false.
let mut ready = false;
// Assert: ! IsDetachedBuffer(pullIntoDescriptor’s buffer) is false.
assert!(!pull_into_descriptor.buffer.is_detached_buffer(cx));
// Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
// Let remainderBytes be the remainder after dividing maxBytesFilled by pullIntoDescriptor’s element size.
let remainder_bytes = max_bytes_filled % pull_into_descriptor.element_size as usize;
// Let maxAlignedBytes be maxBytesFilled − remainderBytes.
let max_aligned_bytes = max_bytes_filled - remainder_bytes;
// If maxAlignedBytes ≥ pullIntoDescriptor’s minimum fill,
if max_aligned_bytes >= pull_into_descriptor.minimum_fill as usize {
// Set totalBytesToCopyRemaining to maxAlignedBytes − pullIntoDescriptor’s bytes filled.
total_bytes_to_copy_remaining =
max_aligned_bytes - (pull_into_descriptor.bytes_filled.get() as usize);
// Set ready to true.
ready = true;
}
// Let queue be controller.[[queue]].
let mut queue = self.queue.borrow_mut();
// While totalBytesToCopyRemaining > 0,
while total_bytes_to_copy_remaining > 0 {
// Let headOfQueue be queue[0].
let head_of_queue = queue.front_mut().unwrap();
// Let bytesToCopy be min(totalBytesToCopyRemaining, headOfQueue’s byte length).
let bytes_to_copy = total_bytes_to_copy_remaining.min(head_of_queue.byte_length);
// Let destStart be pullIntoDescriptor’s byte offset + pullIntoDescriptor’s bytes filled.
let dest_start =
pull_into_descriptor.byte_offset + pull_into_descriptor.bytes_filled.get();
// Let descriptorBuffer be pullIntoDescriptor’s buffer.
let descriptor_buffer = &pull_into_descriptor.buffer;
// Let queueBuffer be headOfQueue’s buffer.
let queue_buffer = &head_of_queue.buffer;
// Let queueByteOffset be headOfQueue’s byte offset.
let queue_byte_offset = head_of_queue.byte_offset;
// Assert: ! CanCopyDataBlockBytes(descriptorBuffer, destStart,
// queueBuffer, queueByteOffset, bytesToCopy) is true.
assert!(descriptor_buffer.can_copy_data_block_bytes(
cx,
dest_start as usize,
queue_buffer,
queue_byte_offset,
bytes_to_copy
));
// Perform ! CopyDataBlockBytes(descriptorBuffer.[[ArrayBufferData]], destStart,
// queueBuffer.[[ArrayBufferData]], queueByteOffset, bytesToCopy).
descriptor_buffer.copy_data_block_bytes(
cx,
dest_start as usize,
queue_buffer,
queue_byte_offset,
bytes_to_copy,
);
// If headOfQueue’s byte length is bytesToCopy,
if head_of_queue.byte_length == bytes_to_copy {
// Remove queue[0].
queue.pop_front().unwrap();
} else {
// Set headOfQueue’s byte offset to headOfQueue’s byte offset + bytesToCopy.
head_of_queue.byte_offset += bytes_to_copy;
// Set headOfQueue’s byte length to headOfQueue’s byte length − bytesToCopy.
head_of_queue.byte_length -= bytes_to_copy;
}
// Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − bytesToCopy.
self.queue_total_size
.set(self.queue_total_size.get() - (bytes_to_copy as f64));
// Perform ! ReadableByteStreamControllerFillHeadPullIntoDescriptor(
// controller, bytesToCopy, pullIntoDescriptor).
self.fill_head_pull_into_descriptor(bytes_to_copy as u64, pull_into_descriptor);
// Set totalBytesToCopyRemaining to totalBytesToCopyRemaining − bytesToCopy.
total_bytes_to_copy_remaining -= bytes_to_copy;
}
// If ready is false,
if !ready {
// Assert: controller.[[queueTotalSize]] is 0.
assert!(self.queue_total_size.get() == 0.0);
// Assert: pullIntoDescriptor’s bytes filled > 0.
assert!(pull_into_descriptor.bytes_filled.get() > 0);
// Assert: pullIntoDescriptor’s bytes filled < pullIntoDescriptor’s minimum fill.
assert!(pull_into_descriptor.bytes_filled.get() < pull_into_descriptor.minimum_fill);
}
// Return ready.
ready
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-fill-head-pull-into-descriptor>
pub(crate) fn fill_head_pull_into_descriptor(
&self,
bytes_copied: u64,
pull_into_descriptor: &PullIntoDescriptor,
) {
// Assert: either controller.[[pendingPullIntos]] is empty,
// or controller.[[pendingPullIntos]][0] is pullIntoDescriptor.
{
let pending_pull_intos = self.pending_pull_intos.borrow();
assert!(
pending_pull_intos.is_empty() ||
pending_pull_intos.first().unwrap() == pull_into_descriptor
);
}
// Assert: controller.[[byobRequest]] is null.
assert!(self.byob_request.get().is_none());
// Set pullIntoDescriptor’s bytes filled to bytes filled + size.
pull_into_descriptor
.bytes_filled
.set(pull_into_descriptor.bytes_filled.get() + bytes_copied);
}
/// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueuedetachedpullintotoqueue>
pub(crate) fn enqueue_detached_pull_into_to_queue(
&self,
cx: SafeJSContext,
can_gc: CanGc,
) -> Fallible<()> {
// first_descriptor: &PullIntoDescriptor,
let pending_pull_intos = self.pending_pull_intos.borrow();
let first_descriptor = pending_pull_intos.first().unwrap();
// Assert: pullIntoDescriptor’s reader type is "none".
assert!(first_descriptor.reader_type.is_none());
// If pullIntoDescriptor’s bytes filled > 0, perform ?
// ReadableByteStreamControllerEnqueueClonedChunkToQueue(controller,
// pullIntoDescriptor’s buffer, pullIntoDescriptor’s byte offset, pullIntoDescriptor’s bytes filled).
if first_descriptor.bytes_filled.get() > 0 {
self.enqueue_cloned_chunk_to_queue(
cx,
&first_descriptor.buffer,
first_descriptor.byte_offset,
first_descriptor.bytes_filled.get(),
can_gc,
)?;
}
// needed to drop the borrow and avoid BorrowMutError
drop(pending_pull_intos);
// Perform ! ReadableByteStreamControllerShiftPendingPullInto(controller).
self.shift_pending_pull_into();
Ok(())
}
/// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerenqueueclonedchunktoqueue>
pub(crate) fn enqueue_cloned_chunk_to_queue(
&self,
cx: SafeJSContext,
buffer: &HeapBufferSource<ArrayBufferU8>,
byte_offset: u64,
byte_length: u64,
can_gc: CanGc,
) -> Fallible<()> {
// Let cloneResult be CloneArrayBuffer(buffer, byteOffset, byteLength, %ArrayBuffer%).
if let Some(clone_result) =
buffer.clone_array_buffer(cx, byte_offset as usize, byte_length as usize)
{
// Perform ! ReadableByteStreamControllerEnqueueChunkToQueue
// (controller, cloneResult.[[Value]], 0, byteLength).
self.enqueue_chunk_to_queue(clone_result, 0, byte_length as usize);
Ok(())
} else {
// If cloneResult is an abrupt completion,
// Perform ! ReadableByteStreamControllerError(controller, cloneResult.[[Value]]).
rooted!(in(*cx) let mut rval = UndefinedValue());
let error = Error::Type("can not clone array buffer".to_owned());
error
.clone()
.to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
self.error(rval.handle(), can_gc);
// Return cloneResult.
Err(error)
}
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-enqueue-chunk-to-queue>
pub(crate) fn enqueue_chunk_to_queue(
&self,
buffer: HeapBufferSource<ArrayBufferU8>,
byte_offset: usize,
byte_length: usize,
) {
// Let entry be a new ReadableByteStreamQueueEntry object.
let entry = QueueEntry::new(buffer, byte_offset, byte_length);
// Append entry to controller.[[queue]].
self.queue.borrow_mut().push_back(entry);
// Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] + byteLength.
self.queue_total_size
.set(self.queue_total_size.get() + byte_length as f64);
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-shift-pending-pull-into>
pub(crate) fn shift_pending_pull_into(&self) -> PullIntoDescriptor {
// Assert: controller.[[byobRequest]] is null.
assert!(self.byob_request.get().is_none());
// Let descriptor be controller.[[pendingPullIntos]][0].
// Remove descriptor from controller.[[pendingPullIntos]].
// Return descriptor.
self.pending_pull_intos.borrow_mut().remove(0)
}
/// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerprocessreadrequestsusingqueue>
pub(crate) fn process_read_requests_using_queue(
&self,
cx: SafeJSContext,
can_gc: CanGc,
) -> Fallible<()> {
// Let reader be controller.[[stream]].[[reader]].
// Assert: reader implements ReadableStreamDefaultReader.
let reader = self.stream.get().unwrap().get_default_reader();
// Step 3
reader.process_read_requests(cx, DomRoot::from_ref(self), can_gc)
}
/// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontrollerfillreadrequestfromqueue>
pub(crate) fn fill_read_request_from_queue(
&self,
cx: SafeJSContext,
read_request: &ReadRequest,
can_gc: CanGc,
) -> Fallible<()> {
// Assert: controller.[[queueTotalSize]] > 0.
assert!(self.queue_total_size.get() > 0.0);
// Also assert that the queue has a non-zero length;
assert!(!self.queue.borrow().is_empty());
// Let entry be controller.[[queue]][0].
// Remove entry from controller.[[queue]].
let entry = self.remove_entry();
// Set controller.[[queueTotalSize]] to controller.[[queueTotalSize]] − entry’s byte length.
self.queue_total_size
.set(self.queue_total_size.get() - entry.byte_length as f64);
// Perform ! ReadableByteStreamControllerHandleQueueDrain(controller).
self.handle_queue_drain(can_gc);
// Let view be ! Construct(%Uint8Array%, « entry’s buffer, entry’s byte offset, entry’s byte length »).
let view = create_buffer_source_with_constructor(
cx,
&Constructor::Name(Type::Uint8),
&entry.buffer,
entry.byte_offset,
entry.byte_length,
)?;
// Perform readRequest’s chunk steps, given view.
let result = RootedTraceableBox::new(Heap::default());
rooted!(in(*cx) let mut view_value = UndefinedValue());
view.get_buffer_view_value(cx, view_value.handle_mut());
result.set(*view_value);
read_request.chunk_steps(result, can_gc);
Ok(())
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-handle-queue-drain>
pub(crate) fn handle_queue_drain(&self, can_gc: CanGc) {
// Assert: controller.[[stream]].[[state]] is "readable".
assert!(self.stream.get().unwrap().is_readable());
// If controller.[[queueTotalSize]] is 0 and controller.[[closeRequested]] is true,
if self.queue_total_size.get() == 0.0 && self.close_requested.get() {
// Perform ! ReadableByteStreamControllerClearAlgorithms(controller).
self.clear_algorithms();
// Perform ! ReadableStreamClose(controller.[[stream]]).
self.stream.get().unwrap().close(can_gc);
} else {
// Perform ! ReadableByteStreamControllerCallPullIfNeeded(controller).
self.call_pull_if_needed(can_gc);
}
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-call-pull-if-needed>
pub(crate) fn call_pull_if_needed(&self, can_gc: CanGc) {
// Let shouldPull be ! ReadableByteStreamControllerShouldCallPull(controller).
let should_pull = self.should_call_pull();
// If shouldPull is false, return.
if !should_pull {
return;
}
// If controller.[[pulling]] is true,
if self.pulling.get() {
// Set controller.[[pullAgain]] to true.
self.pull_again.set(true);
// Return.
return;
}
// Assert: controller.[[pullAgain]] is false.
assert!(!self.pull_again.get());
// Set controller.[[pulling]] to true.
self.pulling.set(true);
// Let pullPromise be the result of performing controller.[[pullAlgorithm]].
// Continues into the resolve and reject handling of the native handler.
let global = self.global();
let rooted_controller = DomRoot::from_ref(self);
let controller = Controller::ReadableByteStreamController(rooted_controller.clone());
if let Some(underlying_source) = self.underlying_source.get() {
let handler = PromiseNativeHandler::new(
&global,
Some(Box::new(PullAlgorithmFulfillmentHandler {
controller: Dom::from_ref(&rooted_controller),
})),
Some(Box::new(PullAlgorithmRejectionHandler {
controller: Dom::from_ref(&rooted_controller),
})),
can_gc,
);
let realm = enter_realm(&*global);
let comp = InRealm::Entered(&realm);
let result = underlying_source
.call_pull_algorithm(controller, &global, can_gc)
.unwrap_or_else(|| {
let promise = Promise::new(&global, can_gc);
promise.resolve_native(&(), can_gc);
Ok(promise)
});
let promise = result.unwrap_or_else(|error| {
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut rval = UndefinedValue());
// TODO: check if `self.global()` is the right globalscope.
error
.clone()
.to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
let promise = Promise::new(&global, can_gc);
promise.reject_native(&rval.handle(), can_gc);
promise
});
promise.append_native_handler(&handler, comp, can_gc);
}
}
/// <https://streams.spec.whatwg.org/#readable-byte-stream-controller-should-call-pull>
fn should_call_pull(&self) -> bool {
// Let stream be controller.[[stream]].
// Note: the spec does not assert that stream is not undefined here,
// so we return false if it is.
let stream = self.stream.get().unwrap();
// If stream.[[state]] is not "readable", return false.
if !stream.is_readable() {
return false;
}
// If controller.[[closeRequested]] is true, return false.
if self.close_requested.get() {
return false;
}
// If controller.[[started]] is false, return false.
if !self.started.get() {
return false;
}
// If ! ReadableStreamHasDefaultReader(stream) is true and ! ReadableStreamGetNumReadRequests(stream) > 0
// , return true.
if stream.has_default_reader() && stream.get_num_read_requests() > 0 {
return true;
}
// If ! ReadableStreamHasBYOBReader(stream) is true and ! ReadableStreamGetNumReadIntoRequests(stream) > 0
// , return true.
if stream.has_byob_reader() && stream.get_num_read_into_requests() > 0 {
return true;
}
// Let desiredSize be ! ReadableByteStreamControllerGetDesiredSize(controller).
let desired_size = self.get_desired_size();
// Assert: desiredSize is not null.
assert!(desired_size.is_some());
// If desiredSize > 0, return true.
if desired_size.unwrap() > 0. {
return true;
}
// Return false.
false
}
/// <https://streams.spec.whatwg.org/#set-up-readable-byte-stream-controller>
pub(crate) fn setup(
&self,
global: &GlobalScope,
stream: DomRoot<ReadableStream>,
can_gc: CanGc,
) -> Fallible<()> {
// Assert: stream.[[controller]] is undefined.
stream.assert_no_controller();
// If autoAllocateChunkSize is not undefined,
if self.auto_allocate_chunk_size.is_some() {
// Assert: ! IsInteger(autoAllocateChunkSize) is true. Implicit
// Assert: autoAllocateChunkSize is positive. (Implicit by type.)
}
// Set controller.[[stream]] to stream.
self.stream.set(Some(&stream));
// Set controller.[[pullAgain]] and controller.[[pulling]] to false.
self.pull_again.set(false);
self.pulling.set(false);
// Set controller.[[byobRequest]] to null.
self.byob_request.set(None);
// Perform ! ResetQueue(controller).
self.reset_queue();
// Set controller.[[closeRequested]] and controller.[[started]] to false.
self.close_requested.set(false);
self.started.set(false);
// Set controller.[[strategyHWM]] to highWaterMark.
// Set controller.[[pullAlgorithm]] to pullAlgorithm.
// Set controller.[[cancelAlgorithm]] to cancelAlgorithm.
// Set controller.[[autoAllocateChunkSize]] to autoAllocateChunkSize.
// Set controller.[[pendingPullIntos]] to a new empty list.
// Note: the above steps are done in `new`.
// Set stream.[[controller]] to controller.
let rooted_byte_controller = DomRoot::from_ref(self);
stream.set_byte_controller(&rooted_byte_controller);
if let Some(underlying_source) = rooted_byte_controller.underlying_source.get() {
// Let startResult be the result of performing startAlgorithm. (This might throw an exception.)
let start_result = underlying_source
.call_start_algorithm(
Controller::ReadableByteStreamController(rooted_byte_controller.clone()),
can_gc,
)
.unwrap_or_else(|| {
let promise = Promise::new(global, can_gc);
promise.resolve_native(&(), can_gc);
Ok(promise)
});
// Let startPromise be a promise resolved with startResult.
let start_promise = start_result?;
// Upon fulfillment of startPromise, Upon rejection of startPromise with reason r,
let handler = PromiseNativeHandler::new(
global,
Some(Box::new(StartAlgorithmFulfillmentHandler {
controller: Dom::from_ref(&rooted_byte_controller),
})),
Some(Box::new(StartAlgorithmRejectionHandler {
controller: Dom::from_ref(&rooted_byte_controller),
})),
can_gc,
);
let realm = enter_realm(global);
let comp = InRealm::Entered(&realm);
start_promise.append_native_handler(&handler, comp, can_gc);
};
Ok(())
}
// <https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontroller-releasesteps
pub(crate) fn perform_release_steps(&self) -> Fallible<()> {
// If this.[[pendingPullIntos]] is not empty,
let mut pending_pull_intos = self.pending_pull_intos.borrow_mut();
if !pending_pull_intos.is_empty() {
// Let firstPendingPullInto be this.[[pendingPullIntos]][0].
let mut first_pending_pull_into = pending_pull_intos.remove(0);
// Set firstPendingPullInto’s reader type to "none".
first_pending_pull_into.reader_type = None;
// Set this.[[pendingPullIntos]] to the list « firstPendingPullInto »
pending_pull_intos.clear();
pending_pull_intos.push(first_pending_pull_into);
}
Ok(())
}
/// <https://streams.spec.whatwg.org/#rbs-controller-private-cancel>
pub(crate) fn perform_cancel_steps(
&self,
cx: SafeJSContext,
global: &GlobalScope,
reason: SafeHandleValue,
can_gc: CanGc,
) -> Rc<Promise> {
// Perform ! ReadableByteStreamControllerClearPendingPullIntos(this).
self.clear_pending_pull_intos();
// Perform ! ResetQueue(this).
self.reset_queue();
let underlying_source = self
.underlying_source
.get()
.expect("Controller should have a source when the cancel steps are called into.");
// Let result be the result of performing this.[[cancelAlgorithm]], passing in reason.
let result = underlying_source
.call_cancel_algorithm(cx, global, reason, can_gc)
.unwrap_or_else(|| {
let promise = Promise::new(global, can_gc);
promise.resolve_native(&(), can_gc);
Ok(promise)
});
let promise = result.unwrap_or_else(|error| {
let cx = GlobalScope::get_cx();
rooted!(in(*cx) let mut rval = UndefinedValue());
error
.clone()
.to_jsval(cx, global, rval.handle_mut(), can_gc);
let promise = Promise::new(global, can_gc);
promise.reject_native(&rval.handle(), can_gc);
promise
});
// Perform ! ReadableByteStreamControllerClearAlgorithms(this).
self.clear_algorithms();
// Return result(the promise).
promise
}
/// <https://streams.spec.whatwg.org/#rbs-controller-private-pull>
pub(crate) fn perform_pull_steps(
&self,
cx: SafeJSContext,
read_request: &ReadRequest,
can_gc: CanGc,
) {
// Let stream be this.[[stream]].
let stream = self.stream.get().unwrap();
// Assert: ! ReadableStreamHasDefaultReader(stream) is true.
assert!(stream.has_default_reader());
// If this.[[queueTotalSize]] > 0,
if self.queue_total_size.get() > 0.0 {
// Assert: ! ReadableStreamGetNumReadRequests(stream) is 0.
assert_eq!(stream.get_num_read_requests(), 0);
// Perform ! ReadableByteStreamControllerFillReadRequestFromQueue(this, readRequest).
let _ = self.fill_read_request_from_queue(cx, read_request, can_gc);
// Return.
return;
}
// Let autoAllocateChunkSize be this.[[autoAllocateChunkSize]].
let auto_allocate_chunk_size = self.auto_allocate_chunk_size;
// If autoAllocateChunkSize is not undefined,
if let Some(auto_allocate_chunk_size) = auto_allocate_chunk_size {
// create_array_buffer_with_size
// Let buffer be Construct(%ArrayBuffer%, « autoAllocateChunkSize »).
match create_array_buffer_with_size(cx, auto_allocate_chunk_size as usize) {
Ok(buffer) => {
// Let pullIntoDescriptor be a new pull-into descriptor with
// buffer buffer.[[Value]]
// buffer byte length autoAllocateChunkSize
// byte offset 0
// byte length autoAllocateChunkSize
// bytes filled 0
// minimum fill 1
// element size 1
// view constructor %Uint8Array%
// reader type "default"
let pull_into_descriptor = PullIntoDescriptor {
buffer,
buffer_byte_length: auto_allocate_chunk_size,
byte_length: auto_allocate_chunk_size,
byte_offset: 0,
bytes_filled: Cell::new(0),
minimum_fill: 1,
element_size: 1,
view_constructor: Constructor::Name(Type::Uint8),
reader_type: Some(ReaderType::Default),
};
// Append pullIntoDescriptor to this.[[pendingPullIntos]].
self.pending_pull_intos
.borrow_mut()
.push(pull_into_descriptor);
},
Err(error) => {
// If buffer is an abrupt completion,
// Perform readRequest’s error steps, given buffer.[[Value]].
rooted!(in(*cx) let mut rval = UndefinedValue());
error
.clone()
.to_jsval(cx, &self.global(), rval.handle_mut(), can_gc);
read_request.error_steps(rval.handle(), can_gc);
// Return.
return;
},
}
}
// Perform ! ReadableStreamAddReadRequest(stream, readRequest).
stream.add_read_request(read_request);
// Perform ! ReadableByteStreamControllerCallPullIfNeeded(this).
self.call_pull_if_needed(can_gc);
}
/// Setting the JS object after the heap has settled down.
pub(crate) fn set_underlying_source_this_object(&self, this_object: HandleObject) {
if let Some(underlying_source) = self.underlying_source.get() {
underlying_source.set_underlying_source_this_object(this_object);
}
}
pub(crate) fn remove_entry(&self) -> QueueEntry {
self.queue
.borrow_mut()
.pop_front()
.expect("Reader must have read request when remove is called into.")
}
pub(crate) fn get_queue_total_size(&self) -> f64 {
self.queue_total_size.get()
}
}
impl ReadableByteStreamControllerMethods<crate::DomTypeHolder> for ReadableByteStreamController {
/// <https://streams.spec.whatwg.org/#rbs-controller-byob-request>
fn GetByobRequest(
&self,
can_gc: CanGc,
) -> Fallible<Option<DomRoot<ReadableStreamBYOBRequest>>> {
let cx = GlobalScope::get_cx();
// Return ! ReadableByteStreamControllerGetBYOBRequest(this).
self.get_byob_request(cx, can_gc)
}
/// <https://streams.spec.whatwg.org/#rbs-controller-desired-size>
fn GetDesiredSize(&self) -> Option<f64> {
// Return ! ReadableByteStreamControllerGetDesiredSize(this).
self.get_desired_size()
}
/// <https://streams.spec.whatwg.org/#rbs-controller-close>
fn Close(&self, can_gc: CanGc) -> Fallible<()> {
let cx = GlobalScope::get_cx();
// If this.[[closeRequested]] is true, throw a TypeError exception.
if self.close_requested.get() {
return Err(Error::Type("closeRequested is true".to_owned()));
}
// If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
if !self.stream.get().unwrap().is_readable() {
return Err(Error::Type("stream is not readable".to_owned()));
}
// Perform ? ReadableByteStreamControllerClose(this).
self.close(cx, can_gc)
}
/// <https://streams.spec.whatwg.org/#rbs-controller-enqueue>
fn Enqueue(
&self,
chunk: js::gc::CustomAutoRooterGuard<js::typedarray::ArrayBufferView>,
can_gc: CanGc,
) -> Fallible<()> {
let cx = GlobalScope::get_cx();
let chunk = HeapBufferSource::<ArrayBufferViewU8>::from_view(chunk);
// If chunk.[[ByteLength]] is 0, throw a TypeError exception.
if chunk.byte_length() == 0 {
return Err(Error::Type("chunk.ByteLength is 0".to_owned()));
}
// If chunk.[[ViewedArrayBuffer]].[[ByteLength]] is 0, throw a TypeError exception.
if chunk.viewed_buffer_array_byte_length(cx) == 0 {
return Err(Error::Type(
"chunk.ViewedArrayBuffer.ByteLength is 0".to_owned(),
));
}
// If this.[[closeRequested]] is true, throw a TypeError exception.
if self.close_requested.get() {
return Err(Error::Type("closeRequested is true".to_owned()));
}
// If this.[[stream]].[[state]] is not "readable", throw a TypeError exception.
if !self.stream.get().unwrap().is_readable() {
return Err(Error::Type("stream is not readable".to_owned()));
}
// Return ? ReadableByteStreamControllerEnqueue(this, chunk).
self.enqueue(cx, chunk, can_gc)
}
/// <https://streams.spec.whatwg.org/#rbs-controller-error>
fn Error(&self, _cx: SafeJSContext, e: SafeHandleValue, can_gc: CanGc) -> Fallible<()> {
// Perform ! ReadableByteStreamControllerError(this, e).
self.error(e, can_gc);
Ok(())
}
}