From a6fe91ca47be99ab91dafc76de966619592dc0cf Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Thu, 2 Sep 2021 17:40:29 +0000 Subject: [PATCH] get rid of half of allocations --- src/Common/PlainMultiSet.h | 114 ++++++++++++++ src/Common/RingBuffer.h | 122 +++++++++++++++ src/Common/ThreadPool.h | 1 - src/Common/tests/gtest_plain_multiset.cpp | 22 +++ src/Common/tests/gtest_ringbuffer.cpp | 142 ++++++++++++++++++ src/Interpreters/Context.cpp | 33 ++-- .../MergeTree/MergeMutateExecutor.cpp | 35 +++-- src/Storages/MergeTree/MergeMutateExecutor.h | 74 +++++---- .../MergeTree/tests/gtest_executor.cpp | 24 +-- 9 files changed, 505 insertions(+), 62 deletions(-) create mode 100644 src/Common/PlainMultiSet.h create mode 100644 src/Common/RingBuffer.h create mode 100644 src/Common/tests/gtest_plain_multiset.cpp create mode 100644 src/Common/tests/gtest_ringbuffer.cpp diff --git a/src/Common/PlainMultiSet.h b/src/Common/PlainMultiSet.h new file mode 100644 index 00000000000..d5b646a53db --- /dev/null +++ b/src/Common/PlainMultiSet.h @@ -0,0 +1,114 @@ +#pragma once + +#include + +namespace DB +{ + + +/** + * Class with O(n) complexity for all methods + * Size has to be fixed. + * The main reason to use this is to get rid of any allocations. + * Used is some executors, where the number of elements is really small. + */ +template +class PlainMultiSet +{ +public: + + explicit PlainMultiSet(size_t capacity_) + { + buffer.resize(capacity_); + } + + + bool tryPush(T element) + { + for (auto & item : buffer) + { + if (item.state == State::EMPTY) + { + item.state = State::FILLED; + item.value = std::move(element); + ++count; + return true; + } + } + + + return false; + } + + bool has(T element) + { + for (auto & item : buffer) + if (item.state == State::FILLED && item.value == element) + return true; + + return false; + } + + + template + std::vector getAll(Predicate && predicate) + { + std::vector suitable; + for (auto & item : buffer) + if (item.state == State::FILLED && predicate(item.value)) + suitable.emplace_back(item.value); + + return suitable; + } + + + bool tryErase(const T & element) + { + for (auto & item : buffer) + { + if (item.state == State::FILLED && item.value == element) + { + item.state = State::EMPTY; + item.value = T{}; + --count; + return true; + } + } + + return false; + } + + size_t size() + { + return count; + } + + void reserve(size_t new_capacity) + { + if (buffer.size() >= new_capacity) + return; + + std::vector new_buffer(std::move(buffer)); + new_buffer.reserve(new_capacity); + + std::swap(new_buffer, buffer); + } + +private: + enum class State + { + EMPTY, + FILLED + }; + + struct Item + { + T value; + State state{State::EMPTY}; + }; + + size_t count{0}; + std::vector buffer; +}; + +} diff --git a/src/Common/RingBuffer.h b/src/Common/RingBuffer.h new file mode 100644 index 00000000000..a619ae2dab6 --- /dev/null +++ b/src/Common/RingBuffer.h @@ -0,0 +1,122 @@ +#pragma once + +#include +#include + +namespace DB +{ + +/** + * A ring buffer of fixed size. + * With an ability to expand / narrow. + * When narrowing only first N elements remain. + */ + +template +class RingBuffer { +public: + explicit RingBuffer(size_t capacity_) : capacity(capacity_) + { + buffer.assign(capacity, {}); + } + + size_t size() const + { + return count; + } + + bool empty() const + { + return count == 0; + } + + bool tryPush(T element) + { + if (count == capacity) { + return false; + } + buffer[(position + count) % capacity] = element; + ++count; + return true; + } + + bool tryPop(T * element) + { + if (empty()) { + return false; + } + *element = std::move(buffer[position]); + --count; + position = (position + 1) % capacity; + return true; + } + + template + void removeElements(Predicate && predicate) + { + /// Shift all elements to the beginning of the buffer + std::rotate(buffer.begin(), buffer.begin() + position, buffer.end()); + + /// Remove elements + auto end_removed = std::remove_if(buffer.begin(), buffer.begin() + count, predicate); + + size_t new_count = std::distance(buffer.begin(), end_removed); + + for (size_t i = new_count; i < count; ++i) + buffer[i] = T{}; + + count = new_count; + position = 0; + } + + + void resize(size_t new_capacity) + { + if (new_capacity > capacity) + expand(new_capacity); + else if (new_capacity < capacity) + narrow(new_capacity); + } + +private: + + void expand(size_t new_capacity) + { + bool overflow = (position + count) > capacity; + buffer.resize(new_capacity); + + if (overflow) + { + size_t count_before_end = capacity - position; + for (size_t i = 0; i < count_before_end; ++i) + buffer[new_capacity - i] = buffer[capacity - i]; + position = new_capacity - count_before_end; + } + + capacity = new_capacity; + } + + void narrow(size_t new_capacity) + { + std::vector new_buffer(new_capacity); + + count = std::min(new_capacity, count); + for (size_t i = 0; i < count; ++i) + new_buffer[i] = buffer[(position + i) % capacity]; + + std::swap(buffer, new_buffer); + + position = 0; + capacity = new_capacity; + } + + + std::vector buffer; + size_t position{0}; + size_t count{0}; + size_t capacity{0}; +}; + + + +} diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h index 1da5d25eef0..1cac87abb70 100644 --- a/src/Common/ThreadPool.h +++ b/src/Common/ThreadPool.h @@ -107,7 +107,6 @@ private: std::list threads; std::exception_ptr first_exception; - template ReturnType scheduleImpl(Job job, int priority, std::optional wait_microseconds); diff --git a/src/Common/tests/gtest_plain_multiset.cpp b/src/Common/tests/gtest_plain_multiset.cpp new file mode 100644 index 00000000000..2018053d6a7 --- /dev/null +++ b/src/Common/tests/gtest_plain_multiset.cpp @@ -0,0 +1,22 @@ +#include + +#include + +#include + +using namespace DB; + + +TEST(PlainMultiSet, Simple) +{ + PlainMultiSet set(10); + + ASSERT_TRUE(set.tryPush(1)); + ASSERT_TRUE(set.tryPush(1)); + ASSERT_TRUE(set.tryPush(2)); + ASSERT_TRUE(set.tryPush(3)); + + ASSERT_TRUE(set.has(1)); + ASSERT_TRUE(set.has(2)); + ASSERT_TRUE(set.has(3)); +} diff --git a/src/Common/tests/gtest_ringbuffer.cpp b/src/Common/tests/gtest_ringbuffer.cpp new file mode 100644 index 00000000000..e56739b21cc --- /dev/null +++ b/src/Common/tests/gtest_ringbuffer.cpp @@ -0,0 +1,142 @@ +#include + +#include + +#include + +using namespace DB; + +TEST(RingBuffer, Empty) +{ + RingBuffer buffer(1); + + ASSERT_TRUE(buffer.size() == 0u); // NOLINT + ASSERT_TRUE(buffer.empty()); +} + +TEST(RingBuffer, PushAndPop) +{ + RingBuffer buffer(2); + + int i; + ASSERT_TRUE(true == buffer.tryPush(0)); + ASSERT_TRUE(true == buffer.tryPush(1)); + ASSERT_TRUE(false == buffer.tryPush(2)); + + ASSERT_TRUE(2u == buffer.size()); + ASSERT_TRUE(false == buffer.empty()); + + ASSERT_TRUE(true == buffer.tryPop(&i)); + ASSERT_TRUE(0 == i); + ASSERT_TRUE(true == buffer.tryPop(&i)); + ASSERT_TRUE(1 == i); + + ASSERT_TRUE(false == buffer.tryPop(&i)); + ASSERT_TRUE(buffer.empty()); + ASSERT_TRUE(true == buffer.empty()); +} + +TEST(RingBuffer, Random) +{ + std::random_device device; + std::mt19937 generator(device()); + + std::uniform_int_distribution<> distribution(0, 1); + + RingBuffer buffer(10); + + int next_element = 0; + int next_received_element = 0; + for (int i = 0; i < 100000; ++i) { + if (distribution(generator) == 0) + { + if (buffer.tryPush(next_element)) + next_element++; + } + else + { + int element; + if (buffer.tryPop(&element)) + { + ASSERT_TRUE(next_received_element == element); + next_received_element++; + } + } + } +} + + +TEST(RingBuffer, Resize) +{ + RingBuffer buffer(10); + + for (size_t i = 0; i < 10; ++i) + ASSERT_TRUE(buffer.tryPush(i)); + + buffer.resize(0); + + ASSERT_TRUE(buffer.empty()); + ASSERT_EQ(buffer.size(), 0u); + + ASSERT_FALSE(buffer.tryPush(42)); + + int value; + ASSERT_FALSE(buffer.tryPop(&value)); + + buffer.resize(1); + + ASSERT_TRUE(buffer.tryPush(42)); + ASSERT_TRUE(buffer.tryPop(&value)); + ASSERT_EQ(value, 42); + + buffer.resize(42); + + for (size_t i = 0; i < 42; ++i) + ASSERT_TRUE(buffer.tryPush(i)); + + buffer.resize(56); + + for (size_t i = 0; i < 42; ++i) + { + ASSERT_TRUE(buffer.tryPop(&value)); + ASSERT_EQ(value, i); + } + + for (size_t i = 0; i < 56; ++i) + ASSERT_TRUE(buffer.tryPush(i)); + + buffer.resize(13); + + for (size_t i = 0; i < 13; ++i) + { + ASSERT_TRUE(buffer.tryPop(&value)); + ASSERT_EQ(value, i); + } +} + + +TEST(RingBuffer, removeElements) +{ + RingBuffer buffer(10); + + for (size_t i = 0; i < 10; ++i) + ASSERT_TRUE(buffer.tryPush(i)); + + int value; + ASSERT_TRUE(buffer.tryPop(&value)); + ASSERT_TRUE(buffer.tryPop(&value)); + ASSERT_TRUE(buffer.tryPop(&value)); + + buffer.removeElements([](int current) { return current % 2 == 0; }); + + ASSERT_EQ(buffer.size(), 4); + + ASSERT_TRUE(buffer.tryPop(&value)); + ASSERT_EQ(value, 3); + ASSERT_TRUE(buffer.tryPop(&value)); + ASSERT_EQ(value, 5); + ASSERT_TRUE(buffer.tryPop(&value)); + ASSERT_EQ(value, 7); + ASSERT_TRUE(buffer.tryPop(&value)); + ASSERT_EQ(value, 9); +} diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index c1e92f1be0e..2c7bf9e6eca 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -2729,21 +2729,30 @@ PartUUIDsPtr Context::getIgnoredPartUUIDs() const void Context::initializeBackgroundExecutors() { - shared->merge_mutate_executor = MergeTreeBackgroundExecutor::create(MergeTreeBackgroundExecutor::Type::MERGE_MUTATE); - shared->moves_executor = MergeTreeBackgroundExecutor::create(MergeTreeBackgroundExecutor::Type::MOVE); - shared->fetch_executor = MergeTreeBackgroundExecutor::create(MergeTreeBackgroundExecutor::Type::FETCH); + shared->merge_mutate_executor = MergeTreeBackgroundExecutor::create + ( + MergeTreeBackgroundExecutor::Type::MERGE_MUTATE, + [this] () { return getSettingsRef().background_pool_size; }, + [this] () { return getSettingsRef().background_pool_size; }, + CurrentMetrics::BackgroundPoolTask + ); - shared->merge_mutate_executor->setThreadsCount([this] () { return getSettingsRef().background_pool_size; }); - shared->merge_mutate_executor->setTasksCount([this] () { return getSettingsRef().background_pool_size; }); - shared->merge_mutate_executor->setMetric(CurrentMetrics::BackgroundPoolTask); + shared->moves_executor = MergeTreeBackgroundExecutor::create + ( + MergeTreeBackgroundExecutor::Type::MOVE, + [this] () { return getSettingsRef().background_move_pool_size; }, + [this] () { return getSettingsRef().background_move_pool_size; }, + CurrentMetrics::BackgroundMovePoolTask + ); - shared->moves_executor->setThreadsCount([this] () { return getSettingsRef().background_move_pool_size; }); - shared->moves_executor->setTasksCount([this] () { return getSettingsRef().background_move_pool_size; }); - shared->moves_executor->setMetric(CurrentMetrics::BackgroundMovePoolTask); - shared->fetch_executor->setThreadsCount([this] () { return getSettingsRef().background_fetches_pool_size; }); - shared->fetch_executor->setTasksCount([this] () { return getSettingsRef().background_fetches_pool_size; }); - shared->fetch_executor->setMetric(CurrentMetrics::BackgroundFetchesPoolTask); + shared->fetch_executor = MergeTreeBackgroundExecutor::create + ( + MergeTreeBackgroundExecutor::Type::FETCH, + [this] () { return getSettingsRef().background_fetches_pool_size; }, + [this] () { return getSettingsRef().background_fetches_pool_size; }, + CurrentMetrics::BackgroundFetchesPoolTask + ); } diff --git a/src/Storages/MergeTree/MergeMutateExecutor.cpp b/src/Storages/MergeTree/MergeMutateExecutor.cpp index c7b025a74f3..5aecf85a7a4 100644 --- a/src/Storages/MergeTree/MergeMutateExecutor.cpp +++ b/src/Storages/MergeTree/MergeMutateExecutor.cpp @@ -32,12 +32,10 @@ void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id /// Mark this StorageID as deleting currently_deleting.emplace(id); - std::erase_if(pending, [&] (auto item) -> bool { return item->task->getStorageID() == id; }); + pending.removeElements([&] (auto item) -> bool { return item->task->getStorageID() == id; }); /// Find pending to wait - for (const auto & item : active) - if (item->task->getStorageID() == id) - tasks_to_wait.emplace_back(item); + tasks_to_wait = active.getAll([&] (auto item) -> bool { return item->task->getStorageID() == id; }); } @@ -66,13 +64,24 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction() if (shutdown_suspend) break; - auto item = std::move(pending.front()); - pending.pop_front(); + ItemPtr item; + if (!pending.tryPop(&item)) + continue; - active.emplace(item); + active.tryPush(item); + + + try + { + /// This is needed to increase / decrease the number of threads at runtime + if (update_timer.compareAndRestartDeferred(1.)) + updateConfiguration(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } - /// This is needed to increase / decrease the number of threads at runtime - updatePoolConfiguration(); bool res = pool.trySchedule([this, item] () { @@ -80,7 +89,7 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction() auto check_if_deleting = [&] () -> bool { - active.erase(item); + active.tryErase(item); for (auto & id : currently_deleting) { @@ -108,7 +117,7 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction() if (check_if_deleting()) return; - pending.emplace_back(item); + pending.tryPush(item); has_tasks.notify_one(); return; } @@ -130,8 +139,8 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction() if (!res) { - active.erase(item); - pending.emplace_back(item); + active.tryErase(item); + pending.tryPush(item); } } diff --git a/src/Storages/MergeTree/MergeMutateExecutor.h b/src/Storages/MergeTree/MergeMutateExecutor.h index db1e89dd1bd..accb9bf2e0f 100644 --- a/src/Storages/MergeTree/MergeMutateExecutor.h +++ b/src/Storages/MergeTree/MergeMutateExecutor.h @@ -5,10 +5,13 @@ #include #include #include +#include #include #include -#include +#include +#include +#include #include #include @@ -61,9 +64,19 @@ public: MOVE }; - explicit MergeTreeBackgroundExecutor(Type type_) : type(type_) + MergeTreeBackgroundExecutor( + Type type_, + CountGetter && threads_count_getter_, + CountGetter && max_task_count_getter_, + CurrentMetrics::Metric metric_) + : type(type_) + , threads_count_getter(threads_count_getter_) + , max_task_count_getter(max_task_count_getter_) + , metric(metric_) { name = toString(type); + + updateConfiguration(); scheduler = ThreadFromGlobalPool([this]() { schedulerThreadFunction(); }); } @@ -72,21 +85,6 @@ public: wait(); } - void setThreadsCount(CountGetter && getter) - { - threads_count_getter = getter; - } - - void setTasksCount(CountGetter && getter) - { - max_task_count_getter = getter; - } - - void setMetric(CurrentMetrics::Metric metric_) - { - metric = metric_; - } - bool trySchedule(ExecutableTaskPtr task) { std::lock_guard lock(mutex); @@ -95,10 +93,13 @@ public: return false; auto & value = CurrentMetrics::values[metric]; - if (value.load() >= static_cast(max_task_count_getter())) + if (value.load() >= static_cast(max_tasks_count)) return false; - pending.emplace_back(std::make_shared(std::move(task), metric)); + if (!pending.tryPush(std::make_shared(std::move(task), metric))) + return false; + + has_tasks.notify_one(); return true; } @@ -133,12 +134,27 @@ public: private: - void updatePoolConfiguration() + void updateConfiguration() { - const auto max_threads = threads_count_getter(); - pool.setMaxFreeThreads(0); - pool.setMaxThreads(max_threads); - pool.setQueueSize(max_threads); + auto new_threads_count = threads_count_getter(); + auto new_max_tasks_count = max_task_count_getter(); + + try + { + pending.resize(new_max_tasks_count); + active.reserve(new_max_tasks_count); + + pool.setMaxFreeThreads(0); + pool.setMaxThreads(new_threads_count); + pool.setQueueSize(new_max_tasks_count); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + + threads_count = new_threads_count; + max_tasks_count = new_max_tasks_count; } void schedulerThreadFunction(); @@ -151,6 +167,11 @@ private: CountGetter max_task_count_getter; CurrentMetrics::Metric metric; + size_t threads_count{0}; + size_t max_tasks_count{0}; + + AtomicStopwatch update_timer; + struct Item { explicit Item(ExecutableTaskPtr && task_, CurrentMetrics::Metric metric_) @@ -169,8 +190,9 @@ private: using ItemPtr = std::shared_ptr; - std::deque pending; - std::set active; + /// Initially it will be empty + RingBuffer pending{0}; + PlainMultiSet active{0}; std::set currently_deleting; std::mutex remove_mutex; diff --git a/src/Storages/MergeTree/tests/gtest_executor.cpp b/src/Storages/MergeTree/tests/gtest_executor.cpp index 0a330a0af76..664b09aa82f 100644 --- a/src/Storages/MergeTree/tests/gtest_executor.cpp +++ b/src/Storages/MergeTree/tests/gtest_executor.cpp @@ -54,14 +54,16 @@ private: TEST(Executor, RemoveTasks) { - auto executor = DB::MergeTreeBackgroundExecutor::create(DB::MergeTreeBackgroundExecutor::Type::MERGE_MUTATE); - const size_t tasks_kinds = 25; const size_t batch = 100; - executor->setThreadsCount([]() { return tasks_kinds; }); - executor->setTasksCount([] () { return tasks_kinds * batch; }); - executor->setMetric(CurrentMetrics::BackgroundPoolTask); + auto executor = DB::MergeTreeBackgroundExecutor::create + ( + DB::MergeTreeBackgroundExecutor::Type::MERGE_MUTATE, + [] () { return tasks_kinds; }, + [] () { return tasks_kinds * batch; }, + CurrentMetrics::BackgroundPoolTask + ); for (size_t i = 0; i < batch; ++i) for (size_t j = 0; j < tasks_kinds; ++j) @@ -93,16 +95,18 @@ TEST(Executor, RemoveTasks) TEST(Executor, RemoveTasksStress) { - auto executor = DB::MergeTreeBackgroundExecutor::create(DB::MergeTreeBackgroundExecutor::Type::MERGE_MUTATE); - const size_t tasks_kinds = 25; const size_t batch = 100; const size_t schedulers_count = 5; const size_t removers_count = 5; - executor->setThreadsCount([]() { return tasks_kinds; }); - executor->setTasksCount([] () { return tasks_kinds * batch * (schedulers_count + removers_count); }); - executor->setMetric(CurrentMetrics::BackgroundPoolTask); + auto executor = DB::MergeTreeBackgroundExecutor::create + ( + DB::MergeTreeBackgroundExecutor::Type::MERGE_MUTATE, + [] () { return tasks_kinds; }, + [] () { return tasks_kinds * batch * (schedulers_count + removers_count); }, + CurrentMetrics::BackgroundPoolTask + ); std::barrier barrier(schedulers_count + removers_count);