diff --git a/src/Common/PlainMultiSet.h b/src/Common/PlainMultiSet.h deleted file mode 100644 index d5b646a53db..00000000000 --- a/src/Common/PlainMultiSet.h +++ /dev/null @@ -1,114 +0,0 @@ -#pragma once - -#include - -namespace DB -{ - - -/** - * Class with O(n) complexity for all methods - * Size has to be fixed. - * The main reason to use this is to get rid of any allocations. - * Used is some executors, where the number of elements is really small. - */ -template -class PlainMultiSet -{ -public: - - explicit PlainMultiSet(size_t capacity_) - { - buffer.resize(capacity_); - } - - - bool tryPush(T element) - { - for (auto & item : buffer) - { - if (item.state == State::EMPTY) - { - item.state = State::FILLED; - item.value = std::move(element); - ++count; - return true; - } - } - - - return false; - } - - bool has(T element) - { - for (auto & item : buffer) - if (item.state == State::FILLED && item.value == element) - return true; - - return false; - } - - - template - std::vector getAll(Predicate && predicate) - { - std::vector suitable; - for (auto & item : buffer) - if (item.state == State::FILLED && predicate(item.value)) - suitable.emplace_back(item.value); - - return suitable; - } - - - bool tryErase(const T & element) - { - for (auto & item : buffer) - { - if (item.state == State::FILLED && item.value == element) - { - item.state = State::EMPTY; - item.value = T{}; - --count; - return true; - } - } - - return false; - } - - size_t size() - { - return count; - } - - void reserve(size_t new_capacity) - { - if (buffer.size() >= new_capacity) - return; - - std::vector new_buffer(std::move(buffer)); - new_buffer.reserve(new_capacity); - - std::swap(new_buffer, buffer); - } - -private: - enum class State - { - EMPTY, - FILLED - }; - - struct Item - { - T value; - State state{State::EMPTY}; - }; - - size_t count{0}; - std::vector buffer; -}; - -} diff --git a/src/Common/RingBuffer.h b/src/Common/RingBuffer.h index a619ae2dab6..fb208307c3b 100644 --- a/src/Common/RingBuffer.h +++ b/src/Common/RingBuffer.h @@ -35,7 +35,7 @@ public: if (count == capacity) { return false; } - buffer[(position + count) % capacity] = element; + buffer[advance(count)] = element; ++count; return true; } @@ -47,21 +47,19 @@ public: } *element = std::move(buffer[position]); --count; - position = (position + 1) % capacity; + position = advance(); return true; } template - void removeElements(Predicate && predicate) + void eraseAll(Predicate && predicate) { /// Shift all elements to the beginning of the buffer std::rotate(buffer.begin(), buffer.begin() + position, buffer.end()); - /// Remove elements auto end_removed = std::remove_if(buffer.begin(), buffer.begin() + count, predicate); size_t new_count = std::distance(buffer.begin(), end_removed); - for (size_t i = new_count; i < count; ++i) buffer[i] = T{}; @@ -69,6 +67,31 @@ public: position = 0; } + template + std::vector getAll(Predicate && predicate) + { + std::vector suitable; + + for (size_t i = 0; i < count; ++i) + { + auto item = buffer[advance(i)]; + if (predicate(item)) + suitable.emplace_back(item); + } + + return suitable; + } + + template + bool has(Predicate && predicate) + { + for (size_t i = 0; i < count; ++i) + if (predicate(buffer[advance(i)])) + return true; + + return false; + } + void resize(size_t new_capacity) { @@ -80,6 +103,13 @@ public: private: + size_t advance(size_t amount = 1) + { + if (position + amount >= capacity) + return position + amount - capacity; + return position + amount; + } + void expand(size_t new_capacity) { bool overflow = (position + count) > capacity; @@ -102,7 +132,7 @@ private: count = std::min(new_capacity, count); for (size_t i = 0; i < count; ++i) - new_buffer[i] = buffer[(position + i) % capacity]; + new_buffer[i] = buffer[advance(i)]; std::swap(buffer, new_buffer); diff --git a/src/Common/tests/gtest_plain_multiset.cpp b/src/Common/tests/gtest_plain_multiset.cpp deleted file mode 100644 index 2018053d6a7..00000000000 --- a/src/Common/tests/gtest_plain_multiset.cpp +++ /dev/null @@ -1,22 +0,0 @@ -#include - -#include - -#include - -using namespace DB; - - -TEST(PlainMultiSet, Simple) -{ - PlainMultiSet set(10); - - ASSERT_TRUE(set.tryPush(1)); - ASSERT_TRUE(set.tryPush(1)); - ASSERT_TRUE(set.tryPush(2)); - ASSERT_TRUE(set.tryPush(3)); - - ASSERT_TRUE(set.has(1)); - ASSERT_TRUE(set.has(2)); - ASSERT_TRUE(set.has(3)); -} diff --git a/src/Common/tests/gtest_ringbuffer.cpp b/src/Common/tests/gtest_ringbuffer.cpp index e56739b21cc..394922e4e20 100644 --- a/src/Common/tests/gtest_ringbuffer.cpp +++ b/src/Common/tests/gtest_ringbuffer.cpp @@ -127,7 +127,7 @@ TEST(RingBuffer, removeElements) ASSERT_TRUE(buffer.tryPop(&value)); ASSERT_TRUE(buffer.tryPop(&value)); - buffer.removeElements([](int current) { return current % 2 == 0; }); + buffer.eraseAll([](int current) { return current % 2 == 0; }); ASSERT_EQ(buffer.size(), 4); diff --git a/src/Storages/MergeTree/MergeMutateExecutor.cpp b/src/Storages/MergeTree/MergeMutateExecutor.cpp index 5aecf85a7a4..e430c488d30 100644 --- a/src/Storages/MergeTree/MergeMutateExecutor.cpp +++ b/src/Storages/MergeTree/MergeMutateExecutor.cpp @@ -32,9 +32,8 @@ void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id /// Mark this StorageID as deleting currently_deleting.emplace(id); - pending.removeElements([&] (auto item) -> bool { return item->task->getStorageID() == id; }); - - /// Find pending to wait + /// Erase storage related tasks from pending and select active tasks to wait for + pending.eraseAll([&] (auto item) -> bool { return item->task->getStorageID() == id; }); tasks_to_wait = active.getAll([&] (auto item) -> bool { return item->task->getStorageID() == id; }); } @@ -45,7 +44,6 @@ void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id item->future.wait(); } - { std::lock_guard lock(mutex); currently_deleting.erase(id); @@ -70,7 +68,6 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction() active.tryPush(item); - try { /// This is needed to increase / decrease the number of threads at runtime @@ -89,7 +86,7 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction() auto check_if_deleting = [&] () -> bool { - active.tryErase(item); + active.eraseAll([&] (auto x) { return x == item; }); for (auto & id : currently_deleting) { @@ -139,7 +136,7 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction() if (!res) { - active.tryErase(item); + active.eraseAll([&] (auto x) { return x == item; }); pending.tryPush(item); } diff --git a/src/Storages/MergeTree/MergeMutateExecutor.h b/src/Storages/MergeTree/MergeMutateExecutor.h index accb9bf2e0f..807730b8dc6 100644 --- a/src/Storages/MergeTree/MergeMutateExecutor.h +++ b/src/Storages/MergeTree/MergeMutateExecutor.h @@ -11,7 +11,6 @@ #include #include #include -#include #include #include @@ -142,7 +141,7 @@ private: try { pending.resize(new_max_tasks_count); - active.reserve(new_max_tasks_count); + active.resize(new_max_tasks_count); pool.setMaxFreeThreads(0); pool.setMaxThreads(new_threads_count); @@ -192,7 +191,7 @@ private: /// Initially it will be empty RingBuffer pending{0}; - PlainMultiSet active{0}; + RingBuffer active{0}; std::set currently_deleting; std::mutex remove_mutex;