// blob: 770286884da13e500e11de94fddc08a6e46a7019 [file] [log] [blame]
// Copyright (c) 2019 The Chromium Embedded Framework Authors. All rights
// reserved. Use of this source code is governed by a BSD-style license that
// can be found in the LICENSE file.
#include "libcef/browser/net_service/response_filter_wrapper.h"
#include <queue>
#include "mojo/public/cpp/system/simple_watcher.h"
#include "mojo/public/cpp/system/string_data_source.h"
namespace net_service {
namespace {
// Size of every output buffer handed to the filter. Chosen to match the
// default |capacity_num_bytes| value from mojo::Core::CreateDataPipe (64 KiB).
constexpr size_t kBufferSize = 64 * 1024;

// Once fewer than this many bytes (1 KiB) remain free in an output buffer the
// buffer is written out and a fresh one is started.
constexpr size_t kMinBufferSpace = 1024;
// Reads a response body from a source data pipe, passes it through a
// CefResponseFilter and forwards the filtered bytes into a second data pipe.
//
// After CreateOutputHandle() has returned true the object manages its own
// lifetime and deletes itself from Cleanup().
class ResponseFilterWrapper {
 public:
  ResponseFilterWrapper(CefRefPtr<CefResponseFilter> filter,
                        mojo::ScopedDataPipeConsumerHandle source_handle,
                        base::OnceClosure error_callback);

  // Neither copyable nor assignable.
  ResponseFilterWrapper(const ResponseFilterWrapper&) = delete;
  ResponseFilterWrapper& operator=(const ResponseFilterWrapper&) = delete;

  // Returns true and stores the handle that yields the filtered data in
  // |output_handle|. On failure returns false and stores the original
  // |source_handle| in |output_handle| instead.
  bool CreateOutputHandle(mojo::ScopedDataPipeConsumerHandle* output_handle);

 private:
  // Watcher callback: reads the data that is available on |source_handle_|.
  void OnSourceReadable(MojoResult, const mojo::HandleSignalsState&);

  // Runs |size| bytes at |data| through |filter_| and queues the output.
  void Filter(const char* data, size_t size);

  // Writes |data| to the output pipe, or queues it if a write is in flight.
  void Write(std::unique_ptr<std::string> data);

  // Completion callback for a write started by Write().
  void OnWriteComplete(std::unique_ptr<std::string>, MojoResult result);

  // Stops reading from the source; |complete| is false if reading failed.
  void Drain(bool complete);

  // Starts the next queued write, or finishes if nothing is left to do.
  void MaybeSuccess();

  // Runs |error_callback_| if |success| is false, then deletes this object.
  void Cleanup(bool success);

  // User-provided filter that transforms the response data.
  CefRefPtr<CefResponseFilter> filter_;

  // Consumer end of the pipe that delivers the unfiltered data.
  mojo::ScopedDataPipeConsumerHandle source_handle_;

  // Run at most once, from Cleanup(), when filtering fails.
  base::OnceClosure error_callback_;

  // Writes filtered data into the output pipe.
  std::unique_ptr<mojo::DataPipeProducer> forwarder_;

  // Invokes OnSourceReadable() when |source_handle_| is readable or closed.
  mojo::SimpleWatcher source_watcher_;

  // True from a successful CreateOutputHandle() until Drain() is called.
  bool read_pending_ = false;

  // True while a write issued through |forwarder_| has not completed.
  bool write_pending_ = false;

  // Filtered buffers waiting for the in-flight write to complete.
  std::queue<std::unique_ptr<std::string>> pending_data_;

  // Outcome of the most recent Filter() pass.
  cef_response_filter_status_t last_status_ = RESPONSE_FILTER_NEED_MORE_DATA;
};
// Takes ownership of the filter reference, the source pipe and the error
// callback. No work is started here; the source pipe is only watched once
// CreateOutputHandle() has been called. The watcher uses manual arming, so it
// must be re-armed explicitly after every notification.
ResponseFilterWrapper::ResponseFilterWrapper(
    CefRefPtr<CefResponseFilter> filter,
    mojo::ScopedDataPipeConsumerHandle source_handle,
    base::OnceClosure error_callback)
    // |filter| is a by-value sink parameter; moving it avoids an extra
    // reference count increment/decrement pair.
    : filter_(std::move(filter)),
      source_handle_(std::move(source_handle)),
      error_callback_(std::move(error_callback)),
      source_watcher_(FROM_HERE, mojo::SimpleWatcher::ArmingPolicy::MANUAL) {}
// Initializes the filter and connects the source pipe to a new output pipe.
//
// On success the consumer end of the new pipe is stored in |output_handle|,
// the source pipe is watched for data and true is returned; from then on the
// object deletes itself via Cleanup(). On failure (the filter refuses to
// initialize, the output pipe cannot be created, or the source pipe cannot be
// watched) the original source handle is stored in |output_handle| and false
// is returned.
bool ResponseFilterWrapper::CreateOutputHandle(
    mojo::ScopedDataPipeConsumerHandle* output_handle) {
  if (!filter_->InitFilter()) {
    *output_handle = std::move(source_handle_);
    return false;
  }

  mojo::ScopedDataPipeProducerHandle forwarding_handle;
  if (CreateDataPipe(nullptr, &forwarding_handle, output_handle) !=
      MOJO_RESULT_OK) {
    *output_handle = std::move(source_handle_);
    return false;
  }

  forwarder_ =
      std::make_unique<mojo::DataPipeProducer>(std::move(forwarding_handle));

  const MojoResult watch_result = source_watcher_.Watch(
      source_handle_.get(),
      MOJO_HANDLE_SIGNAL_READABLE | MOJO_HANDLE_SIGNAL_PEER_CLOSED,
      MOJO_TRIGGER_CONDITION_SIGNALS_SATISFIED,
      base::BindRepeating(&ResponseFilterWrapper::OnSourceReadable,
                          base::Unretained(this)));
  if (watch_result != MOJO_RESULT_OK) {
    // Without a working watch OnSourceReadable() would never be invoked, so
    // the output pipe would never receive data or be closed. Tear down the
    // output pipe and fall back to the unfiltered source, exactly like the
    // other failure paths above.
    forwarder_.reset();
    *output_handle = std::move(source_handle_);
    return false;
  }

  // Mark the read as outstanding before arming so that the flag is already
  // correct whenever the first notification is delivered.
  read_pending_ = true;
  source_watcher_.ArmOrNotify();
  return true;
}
// Watcher callback for |source_handle_|. Starts a two-phase read, filters the
// bytes that are available and re-arms the watcher. Both parameters are
// unused; the outcome of BeginReadData() decides what happens next.
void ResponseFilterWrapper::OnSourceReadable(MojoResult,
                                             const mojo::HandleSignalsState&) {
  const void* chunk = nullptr;
  uint32_t chunk_size = 0;
  const MojoResult read_result = source_handle_->BeginReadData(
      &chunk, &chunk_size, MOJO_READ_DATA_FLAG_NONE);

  if (read_result == MOJO_RESULT_OK) {
    Filter(static_cast<const char*>(chunk), chunk_size);
    if (last_status_ != RESPONSE_FILTER_ERROR) {
      // Everything was handed to the filter; release the bytes and wait for
      // the next batch.
      source_handle_->EndReadData(chunk_size);
      source_watcher_.ArmOrNotify();
    } else {
      // The filter failed or returned invalid values.
      Drain(false);
    }
  } else if (read_result == MOJO_RESULT_SHOULD_WAIT) {
    // Nothing to read right now; try again on the next notification.
    source_watcher_.ArmOrNotify();
  } else {
    // MOJO_RESULT_FAILED_PRECONDITION is treated as the end of the body, any
    // other result as a failure.
    Drain(read_result == MOJO_RESULT_FAILED_PRECONDITION);
  }
}
// Feeds |size| bytes starting at |data| to |filter_|, calling it repeatedly
// until all input has been consumed and the filter has no more output pending
// for it. Output is collected in buffers of |kBufferSize| bytes which are
// passed to Write() when (almost) full and once more at the end. The result
// of the last filter call is left in |last_status_|; it is set to
// RESPONSE_FILTER_ERROR if the filter reports inconsistent byte counts.
// Called with a null |data| and a |size| of zero to let the filter emit
// output that is still pending.
void ResponseFilterWrapper::Filter(const char* data, size_t size) {
  // Input that the filter has not consumed yet. The pointer is null whenever
  // there is no input to offer.
  const char* input = size > 0 ? data : nullptr;
  size_t input_left = size;

  // Buffer currently being filled and the number of bytes used in it.
  std::unique_ptr<std::string> chunk;
  size_t chunk_used = 0;

  // Trims |chunk| to the bytes produced so far and hands it to Write().
  const auto flush_chunk = [&]() {
    chunk->resize(chunk_used);
    Write(std::move(chunk));
  };

  for (;;) {
    if (!chunk) {
      // Start a new buffer. Should have no offset to begin with.
      DCHECK_EQ(0U, chunk_used);
      chunk = std::make_unique<std::string>(kBufferSize, '\0');
    }

    char* const out_begin = &(*chunk)[chunk_used];
    const size_t out_capacity = kBufferSize - chunk_used;
    size_t consumed = 0;
    size_t produced = 0;

    last_status_ =
        filter_->Filter(const_cast<char*>(input), input_left, consumed,
                        out_begin, out_capacity, produced);
    if (last_status_ == RESPONSE_FILTER_ERROR)
      return;

    // Validate the out values.
    if (consumed > input_left) {
      LOG(ERROR) << "potential buffer overflow; data_in_read > data_in_size";
      last_status_ = RESPONSE_FILTER_ERROR;
      return;
    }
    if (produced > out_capacity) {
      LOG(ERROR)
          << "potential buffer overflow; data_out_written > data_out_size";
      last_status_ = RESPONSE_FILTER_ERROR;
      return;
    }
    if (produced == 0 && consumed != input_left) {
      LOG(ERROR) << "when no data is written all input must be consumed; "
                    "data_out_written == 0 && data_in_read != data_in_size";
      last_status_ = RESPONSE_FILTER_ERROR;
      return;
    }

    if (produced > 0) {
      chunk_used += produced;
      if (chunk_used > kBufferSize - kMinBufferSpace) {
        // The buffer is full or almost full. Send what has been collected
        // and continue with a fresh buffer.
        flush_chunk();
        chunk_used = 0;
      }
    }

    if (consumed < input_left) {
      // Part of the input is still unread; offer the remainder again.
      input += consumed;
      input_left -= consumed;
      continue;
    }

    // All input has been consumed from here on.
    if (input) {
      input = nullptr;
      input_left = 0;
    }

    const bool output_filled = produced == out_capacity;
    if (output_filled && last_status_ == RESPONSE_FILTER_NEED_MORE_DATA) {
      // The filter ran out of output space and still has data pending.
      continue;
    }

    // Send whatever is left in the current buffer.
    if (chunk_used > 0)
      flush_chunk();
    return;
  }
}
void ResponseFilterWrapper::Write(std::unique_ptr<std::string> data) {
if (write_pending_) {
// Only one write at a time is supported.
pending_data_.push(std::move(data));
return;
}
write_pending_ = true;
base::StringPiece string_piece(*data);
forwarder_->Write(std::make_unique<mojo::StringDataSource>(
string_piece, mojo::StringDataSource::AsyncWritingMode::
STRING_STAYS_VALID_UNTIL_COMPLETION),
base::BindOnce(&ResponseFilterWrapper::OnWriteComplete,
base::Unretained(this), std::move(data)));
}
// Completion callback for a write issued by Write(). The unnamed string
// parameter only exists to keep the written bytes alive until this point.
void ResponseFilterWrapper::OnWriteComplete(std::unique_ptr<std::string>,
                                            MojoResult result) {
  write_pending_ = false;

  if (result == MOJO_RESULT_OK) {
    // Continue with queued data, or finish if nothing is left.
    MaybeSuccess();
  } else {
    // The write failed; report the error and self-destruct.
    Cleanup(false);
  }
}
// Stops reading from the source pipe. |complete| is true when the whole body
// has been read and false when reading or filtering failed. On completion the
// filter gets one more call with empty input if it last reported
// RESPONSE_FILTER_NEED_MORE_DATA, and must then report RESPONSE_FILTER_DONE.
void ResponseFilterWrapper::Drain(bool complete) {
  read_pending_ = false;
  source_handle_.reset();
  source_watcher_.Cancel();

  bool failed = !complete;
  if (!failed && last_status_ == RESPONSE_FILTER_NEED_MORE_DATA) {
    // Let the user write any remaining data.
    Filter(nullptr, 0);
    failed = last_status_ != RESPONSE_FILTER_DONE;
  }

  if (failed) {
    // Something went wrong.
    Cleanup(false);
  } else {
    MaybeSuccess();
  }
}
// Advances the write side: starts the next queued write if none is in flight,
// and finishes successfully once reading is over and nothing is left to
// write.
void ResponseFilterWrapper::MaybeSuccess() {
  // Nothing can be done while a write is still outstanding.
  if (write_pending_)
    return;

  if (!pending_data_.empty()) {
    // Write the next data segment.
    std::unique_ptr<std::string> segment = std::move(pending_data_.front());
    pending_data_.pop();
    Write(std::move(segment));
    return;
  }

  // All data has been written; done if the source has been drained as well.
  if (!read_pending_)
    Cleanup(true);
}
// Ends the lifetime of this object. If |success| is false and the error
// callback has not been consumed yet, it is run first. The object must not be
// touched after this call returns.
void ResponseFilterWrapper::Cleanup(bool success) {
  const bool report_error = !success && error_callback_;
  if (report_error) {
    std::move(error_callback_).Run();
  }

  delete this;
}
} // namespace
// Routes the data delivered by |source_handle| through |filter| and returns
// the handle from which the filtered data can be read. If filtering cannot be
// set up the original |source_handle| is returned unchanged.
// |error_callback| is passed on to the wrapper, which runs it if filtering
// fails later on.
mojo::ScopedDataPipeConsumerHandle CreateResponseFilterHandler(
    CefRefPtr<CefResponseFilter> filter,
    mojo::ScopedDataPipeConsumerHandle source_handle,
    base::OnceClosure error_callback) {
  // Held by a smart pointer until it is known whether the wrapper takes over
  // its own lifetime, so every path has a clear owner.
  auto filter_wrapper = std::make_unique<ResponseFilterWrapper>(
      std::move(filter), std::move(source_handle), std::move(error_callback));

  mojo::ScopedDataPipeConsumerHandle output_handle;
  if (filter_wrapper->CreateOutputHandle(&output_handle)) {
    // The wrapper is now self-owned and deletes itself when filtering is
    // complete or has failed.
    static_cast<void>(filter_wrapper.release());
  }

  // On failure |output_handle| holds the original source handle and the
  // wrapper is destroyed when |filter_wrapper| goes out of scope.
  return output_handle;
}
} // namespace net_service