| // Copyright (c) 2019 The Chromium Embedded Framework Authors. Portions |
| // Copyright (c) 2018 The Chromium Authors. All rights reserved. Use of this |
| // source code is governed by a BSD-style license that can be found in the |
| // LICENSE file. |
| |
| #include "libcef/browser/net_service/stream_reader_url_loader.h" |
| |
| #include "libcef/browser/thread_util.h" |
| #include "libcef/common/net_service/net_service_util.h" |
| |
| #include "base/bind.h" |
| #include "base/callback.h" |
| #include "base/strings/string_number_conversions.h" |
| #include "base/strings/string_util.h" |
| #include "base/strings/stringprintf.h" |
| #include "base/task/post_task.h" |
| #include "base/threading/thread.h" |
| #include "base/threading/thread_task_runner_handle.h" |
| #include "content/public/browser/browser_thread.h" |
| #include "net/base/io_buffer.h" |
| #include "net/http/http_status_code.h" |
| #include "net/http/http_util.h" |
| #include "services/network/public/cpp/url_loader_completion_status.h" |
| |
| namespace net_service { |
| |
| namespace { |
| |
| using OnInputStreamOpenedCallback = |
| base::OnceCallback<void(std::unique_ptr<StreamReaderURLLoader::Delegate>, |
| std::unique_ptr<InputStream>)>; |
| |
| // Helper for executing the OnInputStreamOpenedCallback. |
| class OpenInputStreamWrapper |
| : public base::RefCountedThreadSafe<OpenInputStreamWrapper> { |
| public: |
| static base::OnceClosure Open( |
| std::unique_ptr<StreamReaderURLLoader::Delegate> delegate, |
| scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner, |
| const RequestId& request_id, |
| const network::ResourceRequest& request, |
| OnInputStreamOpenedCallback callback) WARN_UNUSED_RESULT { |
| scoped_refptr<OpenInputStreamWrapper> wrapper = new OpenInputStreamWrapper( |
| std::move(delegate), work_thread_task_runner, |
| base::ThreadTaskRunnerHandle::Get(), std::move(callback)); |
| wrapper->Start(request_id, request); |
| |
| return wrapper->GetCancelCallback(); |
| } |
| |
| private: |
| friend class base::RefCountedThreadSafe<OpenInputStreamWrapper>; |
| |
| OpenInputStreamWrapper( |
| std::unique_ptr<StreamReaderURLLoader::Delegate> delegate, |
| scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner, |
| scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner, |
| OnInputStreamOpenedCallback callback) |
| : delegate_(std::move(delegate)), |
| work_thread_task_runner_(work_thread_task_runner), |
| job_thread_task_runner_(job_thread_task_runner), |
| callback_(std::move(callback)) {} |
| virtual ~OpenInputStreamWrapper() {} |
| |
| void Start(const RequestId& request_id, |
| const network::ResourceRequest& request) { |
| work_thread_task_runner_->PostTask( |
| FROM_HERE, |
| base::BindOnce(&OpenInputStreamWrapper::OpenOnWorkThread, |
| base::WrapRefCounted(this), request_id, request)); |
| } |
| |
| base::OnceClosure GetCancelCallback() { |
| return base::BindOnce(&OpenInputStreamWrapper::CancelOnJobThread, |
| base::WrapRefCounted(this)); |
| } |
| |
| void CancelOnJobThread() { |
| DCHECK(job_thread_task_runner_->RunsTasksInCurrentSequence()); |
| if (callback_.is_null()) |
| return; |
| |
| callback_.Reset(); |
| |
| work_thread_task_runner_->PostTask( |
| FROM_HERE, base::BindOnce(&OpenInputStreamWrapper::CancelOnWorkThread, |
| base::WrapRefCounted(this))); |
| } |
| |
| void CancelOnWorkThread() { |
| DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence()); |
| if (is_canceled_) |
| return; |
| is_canceled_ = true; |
| OnCallback(nullptr); |
| } |
| |
| void OpenOnWorkThread(const RequestId& request_id, |
| const network::ResourceRequest& request) { |
| DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence()); |
| if (is_canceled_) |
| return; |
| |
| // |delegate_| will remain valid until OnCallback() is executed on |
| // |job_thread_task_runner_|. |
| if (!delegate_->OpenInputStream( |
| request_id, request, |
| base::BindOnce(&OpenInputStreamWrapper::OnCallback, |
| base::WrapRefCounted(this)))) { |
| is_canceled_ = true; |
| OnCallback(nullptr); |
| } |
| } |
| |
| void OnCallback(std::unique_ptr<InputStream> input_stream) { |
| if (!job_thread_task_runner_->RunsTasksInCurrentSequence()) { |
| job_thread_task_runner_->PostTask( |
| FROM_HERE, |
| base::BindOnce(&OpenInputStreamWrapper::OnCallback, |
| base::WrapRefCounted(this), std::move(input_stream))); |
| return; |
| } |
| |
| // May be null if CancelOnJobThread() was called on |
| // |job_thread_task_runner_| while OpenOnWorkThread() was pending on |
| // |work_thread_task_runner_|. |
| if (callback_.is_null()) { |
| delegate_.reset(); |
| return; |
| } |
| |
| std::move(callback_).Run(std::move(delegate_), std::move(input_stream)); |
| } |
| |
| std::unique_ptr<StreamReaderURLLoader::Delegate> delegate_; |
| |
| scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner_; |
| scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner_; |
| |
| // Only accessed on |job_thread_task_runner_|. |
| OnInputStreamOpenedCallback callback_; |
| |
| // Only accessed on |work_thread_task_runner_|. |
| bool is_canceled_ = false; |
| |
| DISALLOW_COPY_AND_ASSIGN(OpenInputStreamWrapper); |
| }; |
| |
| } // namespace |
| |
| //============================== |
| // InputStreamReader |
| //============================= |
| |
| // Class responsible for reading from the InputStream. |
| class InputStreamReader : public base::RefCountedThreadSafe<InputStreamReader> { |
| public: |
| // The constructor is called on the IO thread, not on the worker thread. |
| // Callbacks will be executed on the IO thread. |
| InputStreamReader( |
| std::unique_ptr<InputStream> stream, |
| scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner); |
| |
| // Skip |skip_bytes| number of bytes from |stream_|. |callback| will be |
| // executed asynchronously on the IO thread. A negative value passed to |
| // |callback| will indicate an error code, a positive value will indicate the |
| // number of bytes skipped. |
| void Skip(int64_t skip_bytes, InputStream::SkipCallback callback); |
| |
| // Read up to |dest_size| bytes from |stream_| into |dest|. |callback| will be |
| // executed asynchronously on the IO thread. A negative value passed to |
| // |callback| will indicate an error code, a positive value will indicate the |
| // number of bytes read. |
| void Read(scoped_refptr<net::IOBuffer> dest, |
| int dest_size, |
| InputStream::ReadCallback callback); |
| |
| private: |
| friend class base::RefCountedThreadSafe<InputStreamReader>; |
| virtual ~InputStreamReader(); |
| |
| void SkipOnWorkThread(int64_t skip_bytes, InputStream::SkipCallback callback); |
| void ReadOnWorkThread(scoped_refptr<net::IOBuffer> buffer, |
| int buffer_size, |
| InputStream::ReadCallback callback); |
| |
| void SkipToRequestedRange(); |
| |
| static void ContinueSkipCallback( |
| scoped_refptr<InputStreamReader> stream, |
| scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner, |
| int callback_id, |
| int64_t bytes_skipped); |
| static void ContinueReadCallback( |
| scoped_refptr<InputStreamReader> stream, |
| scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner, |
| int callback_id, |
| int bytes_read); |
| |
| void ContinueSkipCallbackOnWorkThread(int callback_id, int64_t bytes_skipped); |
| void ContinueReadCallbackOnWorkThread(int callback_id, int bytes_read); |
| |
| void RunSkipCallback(int64_t bytes_skipped); |
| void RunReadCallback(int bytes_read); |
| |
| static void RunSkipCallbackOnJobThread( |
| int64_t bytes_skipped, |
| InputStream::SkipCallback skip_callback); |
| static void RunReadCallbackOnJobThread( |
| int bytes_read, |
| InputStream::ReadCallback read_callback); |
| |
| std::unique_ptr<InputStream> stream_; |
| |
| // All InputStream methods are called this task runner. |
| scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner_; |
| |
| // All callbacks are executed on this task runner. |
| scoped_refptr<base::SingleThreadTaskRunner> job_thread_task_runner_; |
| |
| // The below members are only accessed on the work thread. |
| int64_t bytes_skipped_; |
| int64_t bytes_to_skip_; |
| InputStream::SkipCallback pending_skip_callback_; |
| |
| scoped_refptr<net::IOBuffer> buffer_; |
| InputStream::ReadCallback pending_read_callback_; |
| |
| int pending_callback_id_ = -1; |
| |
| int next_callback_id_ = 0; |
| |
| DISALLOW_COPY_AND_ASSIGN(InputStreamReader); |
| }; |
| |
| InputStreamReader::InputStreamReader( |
| std::unique_ptr<InputStream> stream, |
| scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner) |
| : stream_(std::move(stream)), |
| work_thread_task_runner_(work_thread_task_runner), |
| job_thread_task_runner_(base::ThreadTaskRunnerHandle::Get()) { |
| CEF_REQUIRE_IOT(); |
| DCHECK(stream_); |
| DCHECK(work_thread_task_runner_); |
| } |
| |
| InputStreamReader::~InputStreamReader() {} |
| |
| void InputStreamReader::Skip(int64_t skip_bytes, |
| InputStream::SkipCallback callback) { |
| work_thread_task_runner_->PostTask( |
| FROM_HERE, base::BindOnce(&InputStreamReader::SkipOnWorkThread, |
| base::WrapRefCounted(this), skip_bytes, |
| std::move(callback))); |
| } |
| |
| void InputStreamReader::Read(scoped_refptr<net::IOBuffer> dest, |
| int dest_size, |
| InputStream::ReadCallback callback) { |
| work_thread_task_runner_->PostTask( |
| FROM_HERE, base::BindOnce(&InputStreamReader::ReadOnWorkThread, |
| base::WrapRefCounted(this), dest, dest_size, |
| std::move(callback))); |
| } |
| |
| void InputStreamReader::SkipOnWorkThread(int64_t skip_bytes, |
| InputStream::SkipCallback callback) { |
| DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence()); |
| |
| // No callback should currently be pending. |
| DCHECK_EQ(pending_callback_id_, -1); |
| DCHECK(pending_skip_callback_.is_null()); |
| |
| pending_skip_callback_ = std::move(callback); |
| |
| if (skip_bytes <= 0) { |
| RunSkipCallback(0); |
| return; |
| } |
| |
| bytes_skipped_ = bytes_to_skip_ = skip_bytes; |
| SkipToRequestedRange(); |
| } |
| |
| void InputStreamReader::ReadOnWorkThread(scoped_refptr<net::IOBuffer> dest, |
| int dest_size, |
| InputStream::ReadCallback callback) { |
| DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence()); |
| |
| // No callback should currently be pending. |
| DCHECK_EQ(pending_callback_id_, -1); |
| DCHECK(pending_read_callback_.is_null()); |
| |
| pending_read_callback_ = std::move(callback); |
| |
| if (!dest_size) { |
| RunReadCallback(0); |
| return; |
| } |
| |
| DCHECK_GT(dest_size, 0); |
| |
| buffer_ = dest; |
| pending_callback_id_ = ++next_callback_id_; |
| |
| int bytes_read = 0; |
| bool result = stream_->Read( |
| buffer_.get(), dest_size, &bytes_read, |
| base::BindOnce(&InputStreamReader::ContinueReadCallback, |
| base::WrapRefCounted(this), work_thread_task_runner_, |
| pending_callback_id_)); |
| |
| // Check if the callback will execute asynchronously. |
| if (result && bytes_read == 0) |
| return; |
| |
| RunReadCallback(result || bytes_read <= 0 ? bytes_read : net::ERR_FAILED); |
| } |
| |
| void InputStreamReader::SkipToRequestedRange() { |
| DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence()); |
| |
| // Skip to the start of the requested data. This has to be done in a loop |
| // because the underlying InputStream is not guaranteed to skip the requested |
| // number of bytes. |
| do { |
| pending_callback_id_ = ++next_callback_id_; |
| |
| int64_t skipped = 0; |
| bool result = stream_->Skip( |
| bytes_to_skip_, &skipped, |
| base::BindOnce(&InputStreamReader::ContinueSkipCallback, |
| base::WrapRefCounted(this), work_thread_task_runner_, |
| pending_callback_id_)); |
| |
| // Check if the callback will execute asynchronously. |
| if (result && skipped == 0) |
| return; |
| |
| if (!result || skipped <= 0) { |
| RunSkipCallback(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE); |
| return; |
| } |
| DCHECK_LE(skipped, bytes_to_skip_); |
| |
| bytes_to_skip_ -= skipped; |
| } while (bytes_to_skip_ > 0); |
| |
| // All done, the requested number of bytes were skipped. |
| RunSkipCallback(bytes_skipped_); |
| } |
| |
| // static |
| void InputStreamReader::ContinueSkipCallback( |
| scoped_refptr<InputStreamReader> stream, |
| scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner, |
| int callback_id, |
| int64_t bytes_skipped) { |
| // Always execute asynchronously. |
| work_thread_task_runner->PostTask( |
| FROM_HERE, |
| base::BindOnce(&InputStreamReader::ContinueSkipCallbackOnWorkThread, |
| stream, callback_id, bytes_skipped)); |
| } |
| |
| // static |
| void InputStreamReader::ContinueReadCallback( |
| scoped_refptr<InputStreamReader> stream, |
| scoped_refptr<base::SequencedTaskRunner> work_thread_task_runner, |
| int callback_id, |
| int bytes_read) { |
| // Always execute asynchronously. |
| work_thread_task_runner->PostTask( |
| FROM_HERE, |
| base::BindOnce(&InputStreamReader::ContinueReadCallbackOnWorkThread, |
| stream, callback_id, bytes_read)); |
| } |
| |
| void InputStreamReader::ContinueSkipCallbackOnWorkThread( |
| int callback_id, |
| int64_t bytes_skipped) { |
| DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence()); |
| |
| // Check for out of order callbacks. |
| if (pending_callback_id_ != callback_id) |
| return; |
| |
| DCHECK_LE(bytes_skipped, bytes_to_skip_); |
| |
| if (bytes_to_skip_ > 0 && bytes_skipped > 0) |
| bytes_to_skip_ -= bytes_skipped; |
| |
| if (bytes_skipped <= 0) { |
| RunSkipCallback(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE); |
| } else if (bytes_to_skip_ > 0) { |
| // Continue execution asynchronously. |
| work_thread_task_runner_->PostTask( |
| FROM_HERE, |
| base::BindOnce(&InputStreamReader::SkipToRequestedRange, this)); |
| } else { |
| // All done, the requested number of bytes were skipped. |
| RunSkipCallback(bytes_skipped_); |
| } |
| } |
| |
| void InputStreamReader::ContinueReadCallbackOnWorkThread(int callback_id, |
| int bytes_read) { |
| DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence()); |
| |
| // Check for out of order callbacks. |
| if (pending_callback_id_ != callback_id) |
| return; |
| |
| RunReadCallback(bytes_read); |
| } |
| |
| void InputStreamReader::RunSkipCallback(int64_t bytes_skipped) { |
| DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence()); |
| |
| DCHECK(!pending_skip_callback_.is_null()); |
| job_thread_task_runner_->PostTask( |
| FROM_HERE, |
| base::BindOnce(InputStreamReader::RunSkipCallbackOnJobThread, |
| bytes_skipped, std::move(pending_skip_callback_))); |
| |
| // Reset callback state. |
| pending_callback_id_ = -1; |
| bytes_skipped_ = bytes_to_skip_ = -1; |
| } |
| |
| void InputStreamReader::RunReadCallback(int bytes_read) { |
| DCHECK(work_thread_task_runner_->RunsTasksInCurrentSequence()); |
| |
| DCHECK(!pending_read_callback_.is_null()); |
| job_thread_task_runner_->PostTask( |
| FROM_HERE, base::BindOnce(InputStreamReader::RunReadCallbackOnJobThread, |
| bytes_read, std::move(pending_read_callback_))); |
| |
| // Reset callback state. |
| pending_callback_id_ = -1; |
| buffer_ = nullptr; |
| } |
| |
| // static |
| void InputStreamReader::RunSkipCallbackOnJobThread( |
| int64_t bytes_skipped, |
| InputStream::SkipCallback skip_callback) { |
| std::move(skip_callback).Run(bytes_skipped); |
| } |
| |
| // static |
| void InputStreamReader::RunReadCallbackOnJobThread( |
| int bytes_read, |
| InputStream::ReadCallback read_callback) { |
| std::move(read_callback).Run(bytes_read); |
| } |
| |
| //============================== |
| // RequestId |
| //============================== |
| |
| std::string RequestId::ToString() const { |
| return base::StringPrintf("RequestId(%u, %u)", request_id_, routing_id_); |
| } |
| |
| std::string RequestId::ToString(base::StringPiece debug_label) const { |
| return base::StringPrintf("RequestId[%s](%u, %u)", |
| debug_label.as_string().c_str(), request_id_, |
| routing_id_); |
| } |
| |
| std::ostream& operator<<(std::ostream& out, const RequestId& request_id) { |
| return out << request_id.ToString(); |
| } |
| |
| //============================== |
| // StreamReaderURLLoader |
| //============================= |
| |
| StreamReaderURLLoader::StreamReaderURLLoader( |
| const RequestId& request_id, |
| const network::ResourceRequest& request, |
| network::mojom::URLLoaderClientPtr client, |
| network::mojom::TrustedHeaderClientPtr header_client, |
| const net::MutableNetworkTrafficAnnotationTag& traffic_annotation, |
| std::unique_ptr<Delegate> response_delegate) |
| : request_id_(request_id), |
| request_(request), |
| client_(std::move(client)), |
| header_client_(std::move(header_client)), |
| traffic_annotation_(traffic_annotation), |
| response_delegate_(std::move(response_delegate)), |
| writable_handle_watcher_(FROM_HERE, |
| mojo::SimpleWatcher::ArmingPolicy::MANUAL, |
| base::SequencedTaskRunnerHandle::Get()), |
| weak_factory_(this) { |
| DCHECK(response_delegate_); |
| // If there is a client error, clean up the request. |
| client_.set_connection_error_handler( |
| base::BindOnce(&StreamReaderURLLoader::RequestComplete, |
| weak_factory_.GetWeakPtr(), net::ERR_ABORTED)); |
| |
| // All InputStream work will be performed on this task runner. |
| stream_work_task_runner_ = |
| base::CreateSequencedTaskRunner({base::ThreadPool(), base::MayBlock()}); |
| } |
| |
| StreamReaderURLLoader::~StreamReaderURLLoader() { |
| if (open_cancel_callback_) { |
| // Release the Delegate held by OpenInputStreamWrapper. |
| std::move(open_cancel_callback_).Run(); |
| } |
| } |
| |
| void StreamReaderURLLoader::Start() { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| |
| if (!ParseRange(request_.headers)) { |
| RequestComplete(net::ERR_REQUEST_RANGE_NOT_SATISFIABLE); |
| return; |
| } |
| |
| if (header_client_.is_bound()) { |
| header_client_->OnBeforeSendHeaders( |
| request_.headers, |
| base::BindOnce(&StreamReaderURLLoader::ContinueWithRequestHeaders, |
| weak_factory_.GetWeakPtr())); |
| } else { |
| ContinueWithRequestHeaders(net::OK, base::nullopt); |
| } |
| } |
| |
| void StreamReaderURLLoader::ContinueWithRequestHeaders( |
| int32_t result, |
| const base::Optional<net::HttpRequestHeaders>& headers) { |
| if (result != net::OK) { |
| RequestComplete(result); |
| return; |
| } |
| |
| if (headers) { |
| DCHECK(header_client_.is_bound()); |
| request_.headers = *headers; |
| } |
| |
| open_cancel_callback_ = OpenInputStreamWrapper::Open( |
| // This is intentional - the loader could be deleted while |
| // the callback is executing on the background thread. The |
| // delegate will be "returned" to the loader once the |
| // InputStream open attempt is completed. |
| std::move(response_delegate_), stream_work_task_runner_, request_id_, |
| request_, |
| base::BindOnce(&StreamReaderURLLoader::OnInputStreamOpened, |
| weak_factory_.GetWeakPtr())); |
| } |
| |
| void StreamReaderURLLoader::FollowRedirect( |
| const std::vector<std::string>& removed_headers, |
| const net::HttpRequestHeaders& modified_headers, |
| const base::Optional<GURL>& new_url) { |
| NOTREACHED(); |
| } |
| |
| void StreamReaderURLLoader::SetPriority(net::RequestPriority priority, |
| int intra_priority_value) {} |
| |
| void StreamReaderURLLoader::PauseReadingBodyFromNet() {} |
| |
| void StreamReaderURLLoader::ResumeReadingBodyFromNet() {} |
| |
| void StreamReaderURLLoader::OnInputStreamOpened( |
| std::unique_ptr<StreamReaderURLLoader::Delegate> returned_delegate, |
| std::unique_ptr<InputStream> input_stream) { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| DCHECK(returned_delegate); |
| response_delegate_ = std::move(returned_delegate); |
| open_cancel_callback_.Reset(); |
| |
| if (!input_stream) { |
| bool restarted = false; |
| response_delegate_->OnInputStreamOpenFailed(request_id_, &restarted); |
| if (restarted) { |
| // The request has been restarted with a new loader. |
| // |this| will be deleted. |
| CleanUp(); |
| } else { |
| HeadersComplete(net::HTTP_NOT_FOUND, -1); |
| } |
| return; |
| } |
| |
| input_stream_reader_ = base::MakeRefCounted<InputStreamReader>( |
| std::move(input_stream), stream_work_task_runner_); |
| |
| if (!byte_range_valid()) { |
| OnReaderSkipCompleted(0); |
| } else { |
| input_stream_reader_->Skip( |
| byte_range_.first_byte_position(), |
| base::BindOnce(&StreamReaderURLLoader::OnReaderSkipCompleted, |
| weak_factory_.GetWeakPtr())); |
| } |
| } |
| |
| void StreamReaderURLLoader::OnReaderSkipCompleted(int64_t bytes_skipped) { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| |
| if (!byte_range_valid()) { |
| // Expected content length is unspecified. |
| HeadersComplete(net::HTTP_OK, -1); |
| } else if (bytes_skipped == byte_range_.first_byte_position()) { |
| // We skipped the expected number of bytes. |
| int64_t expected_content_length = -1; |
| if (byte_range_.HasLastBytePosition()) { |
| expected_content_length = byte_range_.last_byte_position() - |
| byte_range_.first_byte_position() + 1; |
| DCHECK_GE(expected_content_length, 0); |
| } |
| HeadersComplete(net::HTTP_OK, expected_content_length); |
| } else { |
| RequestComplete(bytes_skipped < 0 ? bytes_skipped : net::ERR_FAILED); |
| } |
| } |
| |
| void StreamReaderURLLoader::HeadersComplete(int orig_status_code, |
| int64_t expected_content_length) { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| |
| int status_code = orig_status_code; |
| std::string status_text = |
| net::GetHttpReasonPhrase(static_cast<net::HttpStatusCode>(status_code)); |
| std::string mime_type, charset; |
| int64_t content_length = expected_content_length; |
| ResourceResponse::HeaderMap extra_headers; |
| response_delegate_->GetResponseHeaders(request_id_, &status_code, |
| &status_text, &mime_type, &charset, |
| &content_length, &extra_headers); |
| |
| if (status_code < 0) { |
| // Early exit if the handler reported an error. |
| RequestComplete(status_code); |
| return; |
| } |
| |
| auto pending_response = network::mojom::URLResponseHead::New(); |
| pending_response->request_start = base::TimeTicks::Now(); |
| pending_response->response_start = base::TimeTicks::Now(); |
| |
| auto headers = MakeResponseHeaders( |
| status_code, status_text, mime_type, charset, content_length, |
| extra_headers, false /* allow_existing_header_override */); |
| pending_response->headers = headers; |
| |
| if (content_length >= 0) |
| pending_response->content_length = content_length; |
| |
| if (!mime_type.empty()) { |
| pending_response->mime_type = mime_type; |
| if (!charset.empty()) |
| pending_response->charset = charset; |
| } |
| |
| if (header_client_.is_bound()) { |
| header_client_->OnHeadersReceived( |
| headers->raw_headers(), net::IPEndPoint(), |
| base::BindOnce(&StreamReaderURLLoader::ContinueWithResponseHeaders, |
| weak_factory_.GetWeakPtr(), |
| std::move(pending_response))); |
| } else { |
| ContinueWithResponseHeaders(std::move(pending_response), net::OK, |
| base::nullopt, base::nullopt); |
| } |
| } |
| |
| void StreamReaderURLLoader::ContinueWithResponseHeaders( |
| network::mojom::URLResponseHeadPtr pending_response, |
| int32_t result, |
| const base::Optional<std::string>& headers, |
| const base::Optional<GURL>& redirect_url) { |
| if (result != net::OK) { |
| RequestComplete(result); |
| return; |
| } |
| |
| if (headers) { |
| DCHECK(header_client_.is_bound()); |
| pending_response->headers = |
| base::MakeRefCounted<net::HttpResponseHeaders>(*headers); |
| } |
| |
| auto pending_headers = pending_response->headers; |
| |
| // What the length would be if we sent headers over the network. Used to |
| // calculate data length. |
| header_length_ = pending_headers->raw_headers().length(); |
| |
| DCHECK(client_.is_bound()); |
| |
| std::string location; |
| const auto has_redirect_url = redirect_url && !redirect_url->is_empty(); |
| if (has_redirect_url || pending_headers->IsRedirect(&location)) { |
| pending_response->encoded_data_length = header_length_; |
| pending_response->content_length = pending_response->encoded_body_length = |
| 0; |
| const GURL new_location = |
| has_redirect_url ? *redirect_url : request_.url.Resolve(location); |
| client_->OnReceiveRedirect( |
| MakeRedirectInfo(request_, pending_headers.get(), new_location, |
| pending_headers->response_code()), |
| std::move(pending_response)); |
| // The client will restart the request with a new loader. |
| // |this| will be deleted. |
| CleanUp(); |
| } else { |
| client_->OnReceiveResponse(std::move(pending_response)); |
| } |
| } |
| |
| void StreamReaderURLLoader::ContinueResponse(bool was_redirected) { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| |
| if (was_redirected) { |
| // Special case where we allow the client to perform the redirect. |
| // The client will restart the request with a new loader. |
| // |this| will be deleted. |
| CleanUp(); |
| } else { |
| SendBody(); |
| } |
| } |
| |
| void StreamReaderURLLoader::SendBody() { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| |
| mojo::ScopedDataPipeConsumerHandle consumer_handle; |
| if (CreateDataPipe(nullptr /*options*/, &producer_handle_, |
| &consumer_handle) != MOJO_RESULT_OK) { |
| RequestComplete(net::ERR_FAILED); |
| return; |
| } |
| writable_handle_watcher_.Watch( |
| producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE, |
| base::BindRepeating(&StreamReaderURLLoader::OnDataPipeWritable, |
| base::Unretained(this))); |
| client_->OnStartLoadingResponseBody(std::move(consumer_handle)); |
| |
| ReadMore(); |
| } |
| |
| void StreamReaderURLLoader::ReadMore() { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| DCHECK(!pending_buffer_.get()); |
| |
| uint32_t num_bytes; |
| MojoResult mojo_result = network::NetToMojoPendingBuffer::BeginWrite( |
| &producer_handle_, &pending_buffer_, &num_bytes); |
| if (mojo_result == MOJO_RESULT_SHOULD_WAIT) { |
| // The pipe is full. We need to wait for it to have more space. |
| writable_handle_watcher_.ArmOrNotify(); |
| return; |
| } else if (mojo_result == MOJO_RESULT_FAILED_PRECONDITION) { |
| // The data pipe consumer handle has been closed. |
| RequestComplete(net::ERR_ABORTED); |
| return; |
| } else if (mojo_result != MOJO_RESULT_OK) { |
| // The body stream is in a bad state. Bail out. |
| RequestComplete(net::ERR_UNEXPECTED); |
| return; |
| } |
| scoped_refptr<net::IOBuffer> buffer( |
| new network::NetToMojoIOBuffer(pending_buffer_.get())); |
| |
| if (!input_stream_reader_.get()) { |
| // This will happen if opening the InputStream fails in which case the |
| // error is communicated by setting the HTTP response status header rather |
| // than failing the request during the header fetch phase. |
| OnReaderReadCompleted(0); |
| return; |
| } |
| |
| input_stream_reader_->Read( |
| buffer, base::checked_cast<int>(num_bytes), |
| base::BindOnce(&StreamReaderURLLoader::OnReaderReadCompleted, |
| weak_factory_.GetWeakPtr())); |
| } |
| |
| void StreamReaderURLLoader::OnDataPipeWritable(MojoResult result) { |
| if (result == MOJO_RESULT_FAILED_PRECONDITION) { |
| RequestComplete(net::ERR_ABORTED); |
| return; |
| } |
| DCHECK_EQ(result, MOJO_RESULT_OK) << result; |
| |
| ReadMore(); |
| } |
| |
| void StreamReaderURLLoader::OnReaderReadCompleted(int bytes_read) { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| |
| DCHECK(pending_buffer_); |
| if (bytes_read < 0) { |
| // Error case. |
| RequestComplete(bytes_read); |
| return; |
| } |
| if (bytes_read == 0) { |
| // Eof, read completed. |
| pending_buffer_->Complete(0); |
| RequestComplete(net::OK); |
| return; |
| } |
| producer_handle_ = pending_buffer_->Complete(bytes_read); |
| pending_buffer_ = nullptr; |
| |
| client_->OnTransferSizeUpdated(bytes_read); |
| total_bytes_read_ += bytes_read; |
| |
| base::ThreadTaskRunnerHandle::Get()->PostTask( |
| FROM_HERE, base::BindOnce(&StreamReaderURLLoader::ReadMore, |
| weak_factory_.GetWeakPtr())); |
| } |
| |
| void StreamReaderURLLoader::RequestComplete(int status_code) { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| |
| auto status = network::URLLoaderCompletionStatus(status_code); |
| status.completion_time = base::TimeTicks::Now(); |
| status.encoded_data_length = total_bytes_read_ + header_length_; |
| status.encoded_body_length = total_bytes_read_; |
| // We don't support decoders, so use the same value. |
| status.decoded_body_length = total_bytes_read_; |
| |
| client_->OnComplete(status); |
| CleanUp(); |
| } |
| |
| void StreamReaderURLLoader::CleanUp() { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| |
| // Resets the watchers and pipes, so that we will never be called back. |
| writable_handle_watcher_.Cancel(); |
| pending_buffer_ = nullptr; |
| producer_handle_.reset(); |
| |
| // Manages its own lifetime. |
| delete this; |
| } |
| |
| bool StreamReaderURLLoader::ParseRange(const net::HttpRequestHeaders& headers) { |
| DCHECK(thread_checker_.CalledOnValidThread()); |
| |
| std::string range_header; |
| if (headers.GetHeader(net::HttpRequestHeaders::kRange, &range_header)) { |
| // This loader only cares about the Range header so that we know how many |
| // bytes in the stream to skip and how many to read after that. |
| std::vector<net::HttpByteRange> ranges; |
| if (net::HttpUtil::ParseRangeHeader(range_header, &ranges)) { |
| // In case of multi-range request only use the first range. |
| // We don't support multirange requests. |
| if (ranges.size() == 1) |
| byte_range_ = ranges[0]; |
| } else { |
| // This happens if the range header could not be parsed or is invalid. |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| bool StreamReaderURLLoader::byte_range_valid() const { |
| return byte_range_.IsValid() && byte_range_.first_byte_position() >= 0; |
| } |
| |
| } // namespace net_service |