| // Copyright 2018 The Fuchsia Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| #include <lib/async-testing/dispatcher_stub.h> |
| #include <lib/async-testing/test_loop_dispatcher.h> |
| #include <lib/async/default.h> |
| #include <lib/async/dispatcher.h> |
| #include <lib/async/task.h> |
| #include <lib/async/wait.h> |
| #include <lib/fit/defer.h> |
| #include <lib/zx/port.h> |
| #include <lib/zx/time.h> |
| #include <zircon/assert.h> |
| #include <zircon/compiler.h> |
| #include <zircon/errors.h> |
| #include <zircon/status.h> |
| #include <zircon/syscalls.h> |
| #include <zircon/syscalls/port.h> |
| |
| #include <list> |
| #include <memory> |
| #include <mutex> |
| #include <set> |
| |
| namespace async { |
| namespace { |
| |
| // An asynchronous dispatcher with an abstracted sense of time, controlled by an |
| // external time-keeping object, for use in testing. |
| class TestLoopDispatcher : public DispatcherStub, public async_test_subloop_t { |
| public: |
| TestLoopDispatcher(); |
| ~TestLoopDispatcher(); |
| TestLoopDispatcher(const TestLoopDispatcher&) = delete; |
| TestLoopDispatcher& operator=(const TestLoopDispatcher&) = delete; |
| |
| // async_dispatcher_t operation implementations. |
| zx::time Now() override __TA_EXCLUDES(&dispatcher_mtx_) { |
| std::lock_guard<std::mutex> lock(dispatcher_mtx_); |
| return NowLocked(); |
| } |
| zx_status_t BeginWait(async_wait_t* wait) __TA_EXCLUDES(&dispatcher_mtx_) override; |
| zx_status_t CancelWait(async_wait_t* wait) __TA_EXCLUDES(&dispatcher_mtx_) override; |
| zx_status_t PostTask(async_task_t* task) __TA_EXCLUDES(&dispatcher_mtx_) override; |
| zx_status_t CancelTask(async_task_t* task) __TA_EXCLUDES(&dispatcher_mtx_) override; |
| |
| // async_test_loop_provider_t operations implementations. |
| static void AdvanceTimeTo(async_test_subloop_t* subloop, zx_time_t time) |
| __TA_EXCLUDES(&dispatcher_mtx_); |
| static uint8_t DispatchNextDueMessage(async_test_subloop_t* subloop) |
| __TA_EXCLUDES(&dispatcher_mtx_); |
| static uint8_t HasPendingWork(async_test_subloop_t* subloop) __TA_EXCLUDES(&dispatcher_mtx_); |
| static zx_time_t GetNextTaskDueTime(async_test_subloop_t* subloop) |
| __TA_EXCLUDES(&dispatcher_mtx_); |
| static void Finalize(async_test_subloop_t* subloop) __TA_EXCLUDES(&dispatcher_mtx_); |
| |
| private: |
| class Activated; |
| class TaskActivated; |
| class WaitActivated; |
| |
| class AsyncTaskComparator { |
| public: |
| bool operator()(async_task_t* t1, async_task_t* t2) const { |
| return t1->deadline < t2->deadline; |
| } |
| }; |
| |
| // async_test_loop_provider_t operations implementations. |
| void AdvanceTimeTo(zx::time time) __TA_EXCLUDES(&dispatcher_mtx_); |
| bool DispatchNextDueMessage() __TA_EXCLUDES(&dispatcher_mtx_); |
| bool HasPendingWork() __TA_EXCLUDES(&dispatcher_mtx_); |
| zx::time GetNextTaskDueTime() __TA_EXCLUDES(&dispatcher_mtx_); |
| |
| zx::time NowLocked() const __TA_REQUIRES(&dispatcher_mtx_) { return now_; } |
| |
| // Extracts activated tasks and waits to |activated_|. |
| void ExtractActivatedLocked() __TA_REQUIRES(&dispatcher_mtx_); |
| |
| // Removes the given task or wait from |activables_| and |activated_|. |
| zx_status_t CancelActivatedTaskOrWaitLocked(void* task_or_wait) __TA_REQUIRES(&dispatcher_mtx_); |
| |
| // Dispatches all remaining posted waits and tasks, invoking their handlers |
| // with status ZX_ERR_CANCELED. |
| void Shutdown() __TA_EXCLUDES(&dispatcher_mtx_); |
| |
| // Whether the loop is shutting down. |
| bool in_shutdown_ __TA_GUARDED(&dispatcher_mtx_) = false; |
| |
| std::mutex dispatcher_mtx_; |
| |
| // The current time. |
| zx::time now_ __TA_GUARDED(&dispatcher_mtx_) = zx::time::infinite_past(); |
| |
| // Pending tasks activable in the future. |
| // The ordering of the set is based on the task timeline. Multiple tasks |
| // with the same deadline will be equivalent, and be ordered by order of |
| // insertion. |
| std::multiset<async_task_t*, AsyncTaskComparator> future_tasks_ __TA_GUARDED(&dispatcher_mtx_); |
| // Pending waits. |
| std::set<async_wait_t*> pending_waits_ __TA_GUARDED(&dispatcher_mtx_); |
| // Activated elements, ready to be dispatched. |
| std::list<std::unique_ptr<Activated>> activated_ __TA_GUARDED(&dispatcher_mtx_); |
| // Port used to register waits. |
| zx::port port_; |
| }; |
| |
| const async_test_subloop_ops_t subloop_ops = { |
| TestLoopDispatcher::AdvanceTimeTo, TestLoopDispatcher::DispatchNextDueMessage, |
| TestLoopDispatcher::HasPendingWork, TestLoopDispatcher::GetNextTaskDueTime, |
| TestLoopDispatcher::Finalize, |
| }; |
| |
| // An element in the loop that can be activated. It is either a task or a wait. |
| class TestLoopDispatcher::Activated { |
| public: |
| virtual ~Activated() {} |
| |
| // Dispatch the element, calling its handler. |
| virtual void Dispatch() const = 0; |
| // Cancel the element, calling its handler with a canceled status. |
| virtual void Cancel() const = 0; |
| // Returns whether this |Activated| corresponds to the given task or wait. |
| virtual bool Matches(void* task_or_wait) const = 0; |
| // Returns the due time for this |Activable|. If the |Activable| is a task, |
| // this corresponds to its deadline, otherwise this is an infinite time in |
| // the future. |
| virtual zx::time DueTime() const = 0; |
| }; |
| |
| class TestLoopDispatcher::TaskActivated : public Activated { |
| public: |
| TaskActivated(async_dispatcher_t* dispatcher, async_task_t* task) |
| : dispatcher_(dispatcher), task_(task) {} |
| |
| void Dispatch() const override { task_->handler(dispatcher_, task_, ZX_OK); } |
| |
| void Cancel() const override { task_->handler(dispatcher_, task_, ZX_ERR_CANCELED); } |
| |
| bool Matches(void* task_or_wait) const override { return task_or_wait == task_; } |
| |
| zx::time DueTime() const override { return zx::time(task_->deadline); } |
| |
| private: |
| async_dispatcher_t* const dispatcher_; |
| async_task_t* const task_; |
| }; |
| |
| class TestLoopDispatcher::WaitActivated : public Activated { |
| public: |
| WaitActivated(async_dispatcher_t* dispatcher, async_wait_t* wait, zx_port_packet_t packet) |
| : dispatcher_(dispatcher), wait_(wait), packet_(std::move(packet)) {} |
| |
| void Dispatch() const override { |
| wait_->handler(dispatcher_, wait_, packet_.status, &packet_.signal); |
| } |
| |
| void Cancel() const override { wait_->handler(dispatcher_, wait_, ZX_ERR_CANCELED, nullptr); } |
| |
| bool Matches(void* task_or_wait) const override { return task_or_wait == wait_; } |
| |
| zx::time DueTime() const override { return zx::time::infinite(); } |
| |
| private: |
| async_dispatcher_t* const dispatcher_; |
| async_wait_t* const wait_; |
| zx_port_packet_t const packet_; |
| }; |
| |
| TestLoopDispatcher::TestLoopDispatcher() : async_test_subloop_t{&subloop_ops}, in_shutdown_(false) { |
| zx_status_t status = zx::port::create(0u, &port_); |
| ZX_ASSERT_MSG(status == ZX_OK, "zx_port_create: %s", zx_status_get_string(status)); |
| } |
| |
| TestLoopDispatcher::~TestLoopDispatcher() { Shutdown(); } |
| |
| zx_status_t TestLoopDispatcher::BeginWait(async_wait_t* wait) { |
| ZX_DEBUG_ASSERT(wait); |
| |
| std::lock_guard<std::mutex> lock(dispatcher_mtx_); |
| if (in_shutdown_) { |
| return ZX_ERR_CANCELED; |
| } |
| |
| zx_status_t status = zx_object_wait_async( |
| wait->object, port_.get(), reinterpret_cast<uintptr_t>(wait), wait->trigger, wait->options); |
| if (status != ZX_OK) { |
| return status; |
| } |
| pending_waits_.insert(wait); |
| return ZX_OK; |
| } |
| |
| zx_status_t TestLoopDispatcher::CancelWait(async_wait_t* wait) { |
| ZX_DEBUG_ASSERT(wait); |
| std::lock_guard<std::mutex> lock(dispatcher_mtx_); |
| auto it = pending_waits_.find(wait); |
| if (it != pending_waits_.end()) { |
| pending_waits_.erase(it); |
| return zx_port_cancel(port_.get(), wait->object, reinterpret_cast<uintptr_t>(wait)); |
| } |
| |
| return CancelActivatedTaskOrWaitLocked(wait); |
| } |
| |
| zx_status_t TestLoopDispatcher::PostTask(async_task_t* task) { |
| ZX_DEBUG_ASSERT(task); |
| |
| std::lock_guard<std::mutex> lock(dispatcher_mtx_); |
| if (in_shutdown_) { |
| return ZX_ERR_CANCELED; |
| } |
| |
| if (task->deadline <= NowLocked().get()) { |
| ExtractActivatedLocked(); |
| activated_.push_back(std::make_unique<TaskActivated>(this, task)); |
| return ZX_OK; |
| } |
| |
| future_tasks_.insert(task); |
| return ZX_OK; |
| } |
| |
| zx_status_t TestLoopDispatcher::CancelTask(async_task_t* task) { |
| ZX_DEBUG_ASSERT(task); |
| std::lock_guard<std::mutex> lock(dispatcher_mtx_); |
| auto task_it = std::find(future_tasks_.begin(), future_tasks_.end(), task); |
| if (task_it != future_tasks_.end()) { |
| future_tasks_.erase(task_it); |
| return ZX_OK; |
| } |
| |
| return CancelActivatedTaskOrWaitLocked(task); |
| } |
| |
| void TestLoopDispatcher::AdvanceTimeTo(zx::time time) { |
| std::lock_guard<std::mutex> lock(dispatcher_mtx_); |
| ZX_DEBUG_ASSERT(now_ <= time); |
| now_ = time; |
| } |
| |
| zx::time TestLoopDispatcher::GetNextTaskDueTime() { |
| std::lock_guard<std::mutex> lock(dispatcher_mtx_); |
| for (const auto& activated : activated_) { |
| if (activated->DueTime() < zx::time::infinite()) { |
| return activated->DueTime(); |
| } |
| } |
| if (!future_tasks_.empty()) { |
| return zx::time((*future_tasks_.begin())->deadline); |
| } |
| return zx::time::infinite(); |
| } |
| |
| bool TestLoopDispatcher::HasPendingWork() { |
| std::lock_guard<std::mutex> lock(dispatcher_mtx_); |
| ExtractActivatedLocked(); |
| return !activated_.empty(); |
| } |
| |
| bool TestLoopDispatcher::DispatchNextDueMessage() { |
| std::unique_ptr<Activated> activated_element = nullptr; |
| { |
| std::lock_guard<std::mutex> lock(dispatcher_mtx_); |
| ExtractActivatedLocked(); |
| if (activated_.empty()) { |
| return false; |
| } |
| activated_element = std::move(activated_.front()); |
| activated_.erase(activated_.begin()); |
| } |
| // Release the lock to avoid deadlocking on reentrant tasks. |
| async_dispatcher_t* previous_dispatcher = async_get_default_dispatcher(); |
| async_set_default_dispatcher(this); |
| activated_element->Dispatch(); |
| async_set_default_dispatcher(previous_dispatcher); |
| |
| return true; |
| } |
| |
| void TestLoopDispatcher::ExtractActivatedLocked() { |
| zx_port_packet_t packet; |
| while (port_.wait(zx::time(0), &packet) == ZX_OK) { |
| async_wait_t* wait = reinterpret_cast<async_wait_t*>(packet.key); |
| pending_waits_.erase(wait); |
| activated_.push_back(std::make_unique<WaitActivated>(this, wait, std::move(packet))); |
| } |
| |
| // Move all tasks that reach their deadline to the activated list. |
| while (!future_tasks_.empty() && (*future_tasks_.begin())->deadline <= NowLocked().get()) { |
| activated_.push_back(std::make_unique<TaskActivated>(this, (*future_tasks_.begin()))); |
| future_tasks_.erase(future_tasks_.begin()); |
| } |
| } |
| |
| // Unique lock does not support TA annotations. |
| // Lock needs to be released for reentrant handlers. |
| void TestLoopDispatcher::Shutdown() __TA_NO_THREAD_SAFETY_ANALYSIS { |
| std::unique_lock<std::mutex> lock(dispatcher_mtx_); |
| |
| if (in_shutdown_) { |
| return; |
| } |
| |
| in_shutdown_ = true; |
| |
| while (!future_tasks_.empty()) { |
| auto task = *future_tasks_.begin(); |
| future_tasks_.erase(future_tasks_.begin()); |
| lock.unlock(); |
| task->handler(this, task, ZX_ERR_CANCELED); |
| lock.lock(); |
| } |
| |
| while (!pending_waits_.empty()) { |
| auto wait = *pending_waits_.begin(); |
| pending_waits_.erase(pending_waits_.begin()); |
| lock.unlock(); |
| wait->handler(this, wait, ZX_ERR_CANCELED, nullptr); |
| lock.lock(); |
| } |
| |
| while (!activated_.empty()) { |
| auto activated = std::move(activated_.front()); |
| activated_.erase(activated_.begin()); |
| lock.unlock(); |
| activated->Cancel(); |
| lock.lock(); |
| } |
| } |
| |
| zx_status_t TestLoopDispatcher::CancelActivatedTaskOrWaitLocked(void* task_or_wait) { |
| auto activated_it = |
| std::find_if(activated_.begin(), activated_.end(), |
| [&](const auto& activated) { return activated->Matches(task_or_wait); }); |
| if (activated_it != activated_.end()) { |
| activated_.erase(activated_it); |
| return ZX_OK; |
| } |
| |
| return ZX_ERR_NOT_FOUND; |
| } |
| |
| void TestLoopDispatcher::AdvanceTimeTo(async_test_subloop_t* subloop, zx_time_t time) { |
| TestLoopDispatcher* self = static_cast<TestLoopDispatcher*>(subloop); |
| return self->AdvanceTimeTo(zx::time(time)); |
| } |
| |
| uint8_t TestLoopDispatcher::DispatchNextDueMessage(async_test_subloop_t* subloop) { |
| TestLoopDispatcher* self = static_cast<TestLoopDispatcher*>(subloop); |
| return self->DispatchNextDueMessage(); |
| } |
| |
| uint8_t TestLoopDispatcher::HasPendingWork(async_test_subloop_t* subloop) { |
| TestLoopDispatcher* self = static_cast<TestLoopDispatcher*>(subloop); |
| return self->HasPendingWork(); |
| } |
| |
| zx_time_t TestLoopDispatcher::GetNextTaskDueTime(async_test_subloop_t* subloop) { |
| TestLoopDispatcher* self = static_cast<TestLoopDispatcher*>(subloop); |
| return self->GetNextTaskDueTime().get(); |
| } |
| |
| void TestLoopDispatcher::Finalize(async_test_subloop_t* subloop) { |
| auto self = std::unique_ptr<TestLoopDispatcher>(static_cast<TestLoopDispatcher*>(subloop)); |
| } |
| |
| } // namespace |
| |
| void NewTestLoopDispatcher(async_dispatcher_t** dispatcher, async_test_subloop_t** loop) { |
| auto dispatcher_loop = std::make_unique<TestLoopDispatcher>(); |
| *dispatcher = dispatcher_loop.get(); |
| *loop = dispatcher_loop.release(); |
| } |
| |
| } // namespace async |