From 4b2b76cf012906d393a08e6958c40411e8048e2e Mon Sep 17 00:00:00 2001 From: Francisco Paisana Date: Mon, 13 Apr 2020 18:00:02 +0100 Subject: [PATCH] created inplace task to avoid mallocs in task enqueuing --- lib/include/srslte/common/multiqueue.h | 82 ++++++++---- lib/include/srslte/common/task.h | 174 +++++++++++++++++++++++++ lib/test/common/queue_test.cc | 37 ++++++ 3 files changed, 267 insertions(+), 26 deletions(-) create mode 100644 lib/include/srslte/common/task.h diff --git a/lib/include/srslte/common/multiqueue.h b/lib/include/srslte/common/multiqueue.h index 1e28e6317..1550618fb 100644 --- a/lib/include/srslte/common/multiqueue.h +++ b/lib/include/srslte/common/multiqueue.h @@ -28,6 +28,7 @@ #ifndef SRSLTE_MULTIQUEUE_H #define SRSLTE_MULTIQUEUE_H +#include "task.h" #include #include #include @@ -40,21 +41,50 @@ namespace srslte { template class multiqueue_handler { - // NOTE: needed to create a queue wrapper to make its move ctor noexcept. - // otherwise we couldnt use the resize method of std::vector> if myobj is move-only - class queue_wrapper : private std::queue + class circular_buffer { public: - queue_wrapper() = default; - queue_wrapper(queue_wrapper&& other) noexcept : std::queue(std::move(other)) {} - using std::queue::push; - using std::queue::pop; - using std::queue::size; - using std::queue::empty; - using std::queue::front; + circular_buffer(uint32_t cap) : buffer(cap + 1) {} + circular_buffer(circular_buffer&& other) noexcept + { + active = other.active; + other.active = false; + widx = other.widx; + ridx = other.ridx; + buffer = std::move(other.buffer); + } std::condition_variable cv_full; bool active = true; + + bool empty() const { return widx == ridx; } + size_t size() const { return widx >= ridx ? widx - ridx : widx + (buffer.size() - ridx); } + bool full() const { return (ridx > 0) ? widx == ridx - 1 : widx == buffer.size() - 1; } + size_t capacity() const { return buffer.size() - 1; } + + template + void push(T&& o) noexcept + { + buffer[widx++] = std::forward(o); + if (widx >= buffer.size()) { + widx = 0; + } + } + + void pop() noexcept + { + ridx++; + if (ridx >= buffer.size()) { + ridx = 0; + } + } + + myobj& front() noexcept { return buffer[ridx]; } + const myobj& front() const noexcept { return buffer[ridx]; } + + private: + std::vector buffer; + size_t widx = 0, ridx = 0; }; public: @@ -76,7 +106,7 @@ public: int queue_id = -1; }; - explicit multiqueue_handler(uint32_t capacity_ = std::numeric_limits::max()) : capacity(capacity_) {} + explicit multiqueue_handler(uint32_t capacity_ = 8192) : capacity(capacity_) {} ~multiqueue_handler() { reset(); } void reset() @@ -108,7 +138,7 @@ public: ; if (qidx == queues.size()) { // create new queue - queues.emplace_back(); + queues.emplace_back(capacity); } else { queues[qidx].active = true; } @@ -130,7 +160,7 @@ public: { { std::unique_lock lock(mutex); - while (is_queue_active_(q_idx) and queues[q_idx].size() >= capacity) { + while (is_queue_active_(q_idx) and queues[q_idx].full()) { nof_threads_waiting++; queues[q_idx].cv_full.wait(lock); nof_threads_waiting--; @@ -148,7 +178,7 @@ public: { { std::lock_guard lock(mutex); - if (not is_queue_active_(q_idx) or queues[q_idx].size() >= capacity) { + if (not is_queue_active_(q_idx) or queues[q_idx].full()) { return false; } queues[q_idx].push(value); @@ -161,7 +191,7 @@ public: { { std::lock_guard lck(mutex); - if (not is_queue_active_(q_idx) or queues[q_idx].size() >= capacity) { + if (not is_queue_active_(q_idx) or queues[q_idx].full()) { return {false, std::move(value)}; } queues[q_idx].push(std::move(value)); @@ -250,7 +280,7 @@ private: bool round_robin_pop_(myobj* value) { // Round-robin for all queues - for (const queue_wrapper& q : queues) { + for (const circular_buffer& q : queues) { spin_idx = (spin_idx + 1) % queues.size(); if (is_queue_active_(spin_idx) and not queues[spin_idx].empty()) { if (value) { @@ -263,13 +293,13 @@ private: return false; } - std::mutex mutex; - std::condition_variable cv_empty, cv_exit; - uint32_t spin_idx = 0; - bool running = true; - std::vector queues; - uint32_t capacity = 0; - uint32_t nof_threads_waiting = 0; + std::mutex mutex; + std::condition_variable cv_empty, cv_exit; + uint32_t spin_idx = 0; + bool running = true; + std::vector queues; + uint32_t capacity = 0; + uint32_t nof_threads_waiting = 0; }; /*********************************************************** @@ -283,8 +313,7 @@ public: move_function() = default; template move_function(Func&& f) : task_ptr(new derived_task(std::forward(f))) - { - } + {} void operator()(Args&&... args) { (*task_ptr)(std::forward(args)...); } private: @@ -304,7 +333,8 @@ private: std::unique_ptr task_ptr; }; -using move_task_t = move_function<>; +// using move_task_t = move_function<>; +using move_task_t = inplace_task; using task_multiqueue = multiqueue_handler; } // namespace srslte diff --git a/lib/include/srslte/common/task.h b/lib/include/srslte/common/task.h new file mode 100644 index 000000000..2bd6c97de --- /dev/null +++ b/lib/include/srslte/common/task.h @@ -0,0 +1,174 @@ +/* + * Copyright 2013-2020 Software Radio Systems Limited + * + * This file is part of srsLTE. + * + * srsLTE is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * srsLTE is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * A copy of the GNU Affero General Public License can be found in + * the LICENSE file in the top-level directory of this distribution + * and at http://www.gnu.org/licenses/. + * + */ + +#ifndef SRSLTE_TASK_H +#define SRSLTE_TASK_H + +#include +#include +#include + +namespace srslte { + +constexpr size_t default_buffer_size = 256; + +template +class inplace_task; + +namespace task_details { + +template +struct oper_table_t { + using call_oper_t = R (*)(void* src, Args&&... args); + using move_oper_t = void (*)(void* src, void* dest); + // using copy_oper_t = void (*)(void* src, void* dest); + using dtor_oper_t = void (*)(void* src); + + static oper_table_t* get_empty() noexcept + { + static oper_table_t t; + t.call = [](void* src, Args&&... args) -> R { throw std::bad_function_call(); }; + t.move = [](void*, void*) {}; + // t.copy = [](void*, void*) {}; + t.dtor = [](void*) {}; + return &t; + } + + template + static oper_table_t* get() noexcept + { + static oper_table_t t{}; + t.call = [](void* src, Args&&... args) -> R { return (*static_cast(src))(std::forward(args)...); }; + t.move = [](void* src, void* dest) -> void { + ::new (dest) Func{std::move(*static_cast(src))}; + static_cast(src)->~Func(); + }; + // t.copy = [](void* src, void* dest) -> void { ::new (dest) Func{*static_cast(src)}; }; + t.dtor = [](void* src) -> void { static_cast(src)->~Func(); }; + return &t; + } + + oper_table_t(const oper_table_t&) = delete; + oper_table_t(oper_table_t&&) = delete; + oper_table_t& operator=(const oper_table_t&) = delete; + oper_table_t& operator=(oper_table_t&&) = delete; + ~oper_table_t() = default; + + call_oper_t call; + move_oper_t move; + // copy_oper_t copy; + dtor_oper_t dtor; + + static oper_table_t* empty_oper; + +private: + oper_table_t() = default; +}; + +template +struct is_inplace_task : std::false_type {}; +template +struct is_inplace_task > : std::true_type {}; + +template +oper_table_t* oper_table_t::empty_oper = oper_table_t::get_empty(); + +} // namespace task_details + +template +class inplace_task +{ + using storage_t = typename std::aligned_storage::type; + using oper_table_t = task_details::oper_table_t; + +public: + inplace_task() noexcept { oper_ptr = oper_table_t::empty_oper; } + + template ::type, + typename = typename std::enable_if::value>::type> + inplace_task(T&& function) + { + static_assert(sizeof(FunT) <= sizeof(buffer), "inplace_task cannot store object with given size.\n"); + static_assert(Alignment % alignof(FunT) == 0, "inplace_task cannot store object with given alignment.\n"); + + ::new (&buffer) FunT{std::forward(function)}; + oper_ptr = oper_table_t::template get(); + } + + inplace_task(inplace_task&& other) noexcept + { + oper_ptr = other.oper_ptr; + other.oper_ptr = oper_table_t::empty_oper; + oper_ptr->move(&other.buffer, &buffer); + } + + // inplace_task(const inplace_task& other) noexcept + // { + // oper_ptr = other.oper_ptr; + // oper_ptr->copy(&other.buffer, &buffer); + // } + + ~inplace_task() { oper_ptr->dtor(&buffer); } + + inplace_task& operator=(inplace_task&& other) noexcept + { + oper_ptr->dtor(&buffer); + oper_ptr = other.oper_ptr; + other.oper_ptr = oper_table_t::empty_oper; + oper_ptr->move(&other.buffer, &buffer); + return *this; + } + + // inplace_task& operator=(const inplace_task& other) noexcept + // { + // if (this != &other) { + // oper_ptr->dtor(&buffer); + // oper_ptr = other.oper_ptr; + // oper_ptr->copy(&other.buffer, &buffer); + // } + // return *this; + // } + + R operator()(Args&&... args) { return oper_ptr->call(&buffer, std::forward(args)...); } + + bool is_empty() const { return oper_ptr == oper_table_t::empty_oper; } + + void swap(inplace_task& other) noexcept + { + if (this == &other) + return; + + storage_t tmp; + oper_ptr->move(&buffer, &tmp); + other.oper_ptr->move(&other.buffer, &buffer); + oper_ptr->move(&tmp, &other.buffer); + std::swap(oper_ptr, other.oper_ptr); + } + +private: + storage_t buffer; + oper_table_t* oper_ptr; +}; + +} // namespace srslte + +#endif // SRSLTE_TASK_H diff --git a/lib/test/common/queue_test.cc b/lib/test/common/queue_test.cc index c65b66db4..b6c66019c 100644 --- a/lib/test/common/queue_test.cc +++ b/lib/test/common/queue_test.cc @@ -20,6 +20,7 @@ */ #include "srslte/common/multiqueue.h" +#include "srslte/common/task.h" #include "srslte/common/thread_pool.h" #include #include @@ -320,6 +321,40 @@ int test_task_thread_pool3() return 0; } +struct C { + std::unique_ptr val{new int{5}}; +}; + +int test_inplace_task() +{ + std::cout << "\n======= TEST inplace task: start =======\n"; + int v = 0; + + srslte::inplace_task t{[&v]() { v = 1; }}; + srslte::inplace_task t2{[v]() mutable { v = 2; }}; + + t(); + t2(); + TESTASSERT(v == 1); + v = 2; + decltype(t) t3 = std::move(t); + t3(); + TESTASSERT(v == 1); + + C c; + srslte::inplace_task t4{std::bind([&v](C& c) { v = *c.val; }, std::move(c))}; + { + decltype(t4) t5; + t5 = std::move(t4); + t5(); + TESTASSERT(v == 5); + } + + std::cout << "outcome: Success\n"; + std::cout << "========================================\n"; + return 0; +} + int main() { TESTASSERT(test_multiqueue() == 0); @@ -330,4 +365,6 @@ int main() TESTASSERT(test_task_thread_pool() == 0); TESTASSERT(test_task_thread_pool2() == 0); TESTASSERT(test_task_thread_pool3() == 0); + + TESTASSERT(test_inplace_task() == 0); }