From 02490155153f127d3ad10b1f8949fde3aff8903e Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Mon, 6 Sep 2021 11:37:51 +0000 Subject: [PATCH] added priority queue --- contrib/boost | 2 +- contrib/boost-cmake/CMakeLists.txt | 10 +++- src/CMakeLists.txt | 2 +- src/Common/PriorityQueue.h | 53 ------------------- src/Common/ThreadPool.cpp | 12 +++-- src/Common/ThreadPool.h | 4 +- src/Common/tests/gtest_priority_queue.cpp | 25 --------- .../MergeTree/MergeMutateExecutor.cpp | 20 ++++--- 8 files changed, 34 insertions(+), 94 deletions(-) delete mode 100644 src/Common/PriorityQueue.h delete mode 100644 src/Common/tests/gtest_priority_queue.cpp diff --git a/contrib/boost b/contrib/boost index 4b98e2befd3..66d17f060c4 160000 --- a/contrib/boost +++ b/contrib/boost @@ -1 +1 @@ -Subproject commit 4b98e2befd3f3265b0db0acb5d20c4812ef8d88e +Subproject commit 66d17f060c4867aeea99fa2a20cfdae89ae2a2ec diff --git a/contrib/boost-cmake/CMakeLists.txt b/contrib/boost-cmake/CMakeLists.txt index 9a2234ac9f6..7996d1b66b8 100644 --- a/contrib/boost-cmake/CMakeLists.txt +++ b/contrib/boost-cmake/CMakeLists.txt @@ -15,11 +15,12 @@ if (NOT USE_INTERNAL_BOOST_LIBRARY) coroutine graph circular_buffer + heap ) if(Boost_INCLUDE_DIR AND Boost_FILESYSTEM_LIBRARY AND Boost_PROGRAM_OPTIONS_LIBRARY AND Boost_REGEX_LIBRARY AND Boost_SYSTEM_LIBRARY AND Boost_CONTEXT_LIBRARY AND - Boost_COROUTINE_LIBRARY AND Boost_GRAPH_LIBRARY AND Boost_CIRCULAR_BUFFER_LIBRARY) + Boost_COROUTINE_LIBRARY AND Boost_GRAPH_LIBRARY AND Boost_CIRCULAR_BUFFER_LIBRARY AND Boost_HEAP_LIBRARY) set(EXTERNAL_BOOST_FOUND 1) @@ -242,9 +243,14 @@ if (NOT EXTERNAL_BOOST_FOUND) target_include_directories (_boost_graph PRIVATE ${LIBRARY_DIR}) target_link_libraries(_boost_graph PRIVATE _boost_regex) - + # circular buffer add_library(_boost_circular_buffer INTERFACE) add_library(boost::circular_buffer ALIAS _boost_circular_buffer) target_include_directories(_boost_circular_buffer SYSTEM BEFORE INTERFACE ${LIBRARY_DIR}) + # heap + add_library(_boost_heap INTERFACE) + add_library(boost::heap ALIAS _boost_heap) + target_include_directories(_boost_heap SYSTEM BEFORE INTERFACE ${LIBRARY_DIR}) + endif () diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 54e73389d3f..48cff3e8e91 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -297,7 +297,6 @@ target_link_libraries(clickhouse_common_io PUBLIC boost::program_options boost::system - boost::circular_buffer ${CITYHASH_LIBRARIES} ${ZLIB_LIBRARIES} pcg_random @@ -337,6 +336,7 @@ dbms_target_link_libraries ( boost::filesystem boost::program_options boost::circular_buffer + boost::heap clickhouse_common_config clickhouse_common_zookeeper clickhouse_dictionaries_embedded diff --git a/src/Common/PriorityQueue.h b/src/Common/PriorityQueue.h deleted file mode 100644 index b845fc0caa8..00000000000 --- a/src/Common/PriorityQueue.h +++ /dev/null @@ -1,53 +0,0 @@ -#pragma once - -#include - -namespace DB -{ - - -template > -class PriorityQueue -{ -public: - - T pop() - { - assert(!buffer.empty()); - std::pop_heap(buffer.begin(), buffer.end(), comparator); - auto element = std::move(buffer.back()); - buffer.pop_back(); - return element; - } - - void push(T element) - { - buffer.push_back(std::move(element)); - std::push_heap(buffer.begin(), buffer.end(), comparator); - } - - template< class... Args > - void emplace(Args &&... args) - { - buffer.emplace_back(std::forward(args)...); - std::push_heap(buffer.begin(), buffer.end(), comparator); - } - - bool empty() { return buffer.empty(); } - size_t size() { return buffer.size(); } - void reserve(size_t count) { buffer.reserve(count); } - void resize(size_t count) - { - buffer.resize(count); - std::make_heap(buffer.begin(), buffer.end(), comparator); - } - -private: - - Comparator comparator; - std::vector buffer; - - -}; - -} diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp index 641b51e0d60..f14d68da662 100644 --- a/src/Common/ThreadPool.cpp +++ b/src/Common/ThreadPool.cpp @@ -123,7 +123,6 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, int priority, std::opti /// Check if there are enough threads to process job. if (threads.size() < std::min(max_threads, scheduled_jobs + 1)) { - ALLOW_ALLOCATIONS_IN_SCOPE; try { threads.emplace_front(); @@ -249,9 +248,16 @@ void ThreadPoolImpl::worker(typename std::list::iterator thread_ need_shutdown = shutdown; if (!jobs.empty()) - job = std::move(jobs.pop().job); + { + job = std::move(jobs.top().job); + jobs.pop(); + } else - return; /// shutdown is true, simply finish the thread. + { + /// shutdown is true, simply finish the thread. + return; + } + } if (!need_shutdown) diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h index 25d86a2fabc..b7e59041839 100644 --- a/src/Common/ThreadPool.h +++ b/src/Common/ThreadPool.h @@ -9,6 +9,8 @@ #include #include +#include + #include #include #include @@ -104,7 +106,7 @@ private: } }; - DB::PriorityQueue jobs; + boost::heap::priority_queue jobs; std::list threads; std::exception_ptr first_exception; diff --git a/src/Common/tests/gtest_priority_queue.cpp b/src/Common/tests/gtest_priority_queue.cpp deleted file mode 100644 index c2c943ea4cc..00000000000 --- a/src/Common/tests/gtest_priority_queue.cpp +++ /dev/null @@ -1,25 +0,0 @@ -#include - -#include - -#include - -using namespace DB; - -TEST(PriorityQueue, Simple) -{ - PriorityQueue my; - std::priority_queue original; - - for (int i = 0; i < 1000; ++i) - { - my.push(i); - original.emplace(i); - } - - for (int i = 0; i < 1000; ++i) - { - ASSERT_EQ(my.pop(), original.top()); - original.pop(); - } -} diff --git a/src/Storages/MergeTree/MergeMutateExecutor.cpp b/src/Storages/MergeTree/MergeMutateExecutor.cpp index 15ee6f7f2fc..cbed2aa9f69 100644 --- a/src/Storages/MergeTree/MergeMutateExecutor.cpp +++ b/src/Storages/MergeTree/MergeMutateExecutor.cpp @@ -207,15 +207,19 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction() ItemPtr item = std::move(pending.front()); pending.pop_front(); - /// Execute a piece of task - bool res = pool.trySchedule([this, item] + bool res = false; { - routine(item); - /// When storage shutdowns it will wait until all related background tasks - /// are finished, because they may want to interact with its fields - /// and this will cause segfault. - item->is_done.set(); - }); + ALLOW_ALLOCATIONS_IN_SCOPE; + /// Execute a piece of task + res = pool.trySchedule([this, item] + { + routine(item); + /// When storage shutdowns it will wait until all related background tasks + /// are finished, because they may want to interact with its fields + /// and this will cause segfault. + item->is_done.set(); + }); + } if (!res) {