deleted set

This commit is contained in:
Nikita Mikhaylov 2021-09-02 18:49:37 +00:00
parent a6fe91ca47
commit 6624fa12ba
6 changed files with 43 additions and 153 deletions

View File

@ -1,114 +0,0 @@
#pragma once
#include <vector>
namespace DB
{
/**
  * A tiny multiset stored in a flat array of slots.
  * Every method is a linear scan over the slots, so all operations are O(capacity).
  * The number of slots is set in the constructor and only ever grows through reserve(),
  * so push / erase / lookup never allocate.
  * Used in some executors, where the number of elements is really small.
  * T must be default-constructible, copyable and comparable with operator==.
  */
template <class T>
class PlainMultiSet
{
public:
    /// Creates a set with `capacity_` empty slots.
    explicit PlainMultiSet(size_t capacity_)
    {
        buffer.resize(capacity_);
    }

    /// Stores `element` in the first free slot.
    /// Returns false (and stores nothing) if every slot is occupied.
    bool tryPush(T element)
    {
        for (auto & item : buffer)
        {
            if (item.state == State::EMPTY)
            {
                item.state = State::FILLED;
                item.value = std::move(element);
                ++count;
                return true;
            }
        }
        return false;
    }

    /// Returns true if at least one stored element compares equal to `element`.
    bool has(const T & element) const
    {
        for (const auto & item : buffer)
            if (item.state == State::FILLED && item.value == element)
                return true;
        return false;
    }

    /// Returns copies of all stored elements for which `predicate` returns true.
    template <class Predicate>
    std::vector<T> getAll(Predicate && predicate)
    {
        std::vector<T> suitable;
        for (auto & item : buffer)
            if (item.state == State::FILLED && predicate(item.value))
                suitable.emplace_back(item.value);
        return suitable;
    }

    /// Removes one occurrence of `element`.
    /// Returns false if no stored element compares equal to it.
    bool tryErase(const T & element)
    {
        for (auto & item : buffer)
        {
            if (item.state == State::FILLED && item.value == element)
            {
                item.state = State::EMPTY;
                /// Reset the slot so that resources held by the old value are released right away.
                item.value = T{};
                --count;
                return true;
            }
        }
        return false;
    }

    /// Number of elements currently stored (not the number of slots).
    size_t size() const
    {
        return count;
    }

    /// Grows the set to `new_capacity` slots; never shrinks.
    /// Stored elements are preserved.
    void reserve(size_t new_capacity)
    {
        if (buffer.size() >= new_capacity)
            return;

        /// All other methods iterate over buffer.size(), so the slots have to actually exist:
        /// std::vector::reserve would only change the allocation and leave the set
        /// unable to accept more elements than before.
        buffer.resize(new_capacity);
    }

private:
    enum class State
    {
        EMPTY,
        FILLED
    };

    /// A slot: the value is meaningful only while state == FILLED.
    struct Item
    {
        T value;
        State state{State::EMPTY};
    };

    size_t count{0};
    std::vector<Item> buffer;
};
}

View File

@ -35,7 +35,7 @@ public:
if (count == capacity) { if (count == capacity) {
return false; return false;
} }
buffer[(position + count) % capacity] = element; buffer[advance(count)] = element;
++count; ++count;
return true; return true;
} }
@ -47,21 +47,19 @@ public:
} }
*element = std::move(buffer[position]); *element = std::move(buffer[position]);
--count; --count;
position = (position + 1) % capacity; position = advance();
return true; return true;
} }
template <typename Predicate> template <typename Predicate>
void removeElements(Predicate && predicate) void eraseAll(Predicate && predicate)
{ {
/// Shift all elements to the beginning of the buffer /// Shift all elements to the beginning of the buffer
std::rotate(buffer.begin(), buffer.begin() + position, buffer.end()); std::rotate(buffer.begin(), buffer.begin() + position, buffer.end());
/// Remove elements /// Remove elements
auto end_removed = std::remove_if(buffer.begin(), buffer.begin() + count, predicate); auto end_removed = std::remove_if(buffer.begin(), buffer.begin() + count, predicate);
size_t new_count = std::distance(buffer.begin(), end_removed); size_t new_count = std::distance(buffer.begin(), end_removed);
for (size_t i = new_count; i < count; ++i) for (size_t i = new_count; i < count; ++i)
buffer[i] = T{}; buffer[i] = T{};
@ -69,6 +67,31 @@ public:
position = 0; position = 0;
} }
/// Returns copies of all stored elements for which `predicate` returns true.
/// Visits `count` slots starting from `position`, wrapping around the end of the buffer.
template <class Predicate>
std::vector<T> getAll(Predicate && predicate)
{
    std::vector<T> suitable;
    for (size_t i = 0; i < count; ++i)
    {
        /// Bind by reference: the only copy made is the one stored in the result.
        const auto & item = buffer[advance(i)];
        if (predicate(item))
            suitable.emplace_back(item);
    }
    return suitable;
}
/// Returns true as soon as `predicate` accepts one of the stored elements.
/// Elements are examined starting from `position`; an empty buffer yields false.
template <typename Predicate>
bool has(Predicate && predicate)
{
    size_t offset = 0;
    while (offset < count)
    {
        if (predicate(buffer[advance(offset)]))
            return true;
        ++offset;
    }
    return false;
}
void resize(size_t new_capacity) void resize(size_t new_capacity)
{ {
@ -80,6 +103,13 @@ public:
private: private:
size_t advance(size_t amount = 1)
{
if (position + amount >= capacity)
return position + amount - capacity;
return position + amount;
}
void expand(size_t new_capacity) void expand(size_t new_capacity)
{ {
bool overflow = (position + count) > capacity; bool overflow = (position + count) > capacity;
@ -102,7 +132,7 @@ private:
count = std::min(new_capacity, count); count = std::min(new_capacity, count);
for (size_t i = 0; i < count; ++i) for (size_t i = 0; i < count; ++i)
new_buffer[i] = buffer[(position + i) % capacity]; new_buffer[i] = buffer[advance(i)];
std::swap(buffer, new_buffer); std::swap(buffer, new_buffer);

View File

@ -1,22 +0,0 @@
#include <gtest/gtest.h>
#include <random>
#include <Common/PlainMultiSet.h>
using namespace DB;
/// Smoke test: pushes fit into the capacity and every pushed value can be found afterwards.
TEST(PlainMultiSet, Simple)
{
    PlainMultiSet<int> multiset(10);

    /// The value 1 is pushed twice on purpose: duplicates are allowed.
    for (int value : {1, 1, 2, 3})
        ASSERT_TRUE(multiset.tryPush(value));

    for (int value : {1, 2, 3})
        ASSERT_TRUE(multiset.has(value));
}

View File

@ -127,7 +127,7 @@ TEST(RingBuffer, removeElements)
ASSERT_TRUE(buffer.tryPop(&value)); ASSERT_TRUE(buffer.tryPop(&value));
ASSERT_TRUE(buffer.tryPop(&value)); ASSERT_TRUE(buffer.tryPop(&value));
buffer.removeElements([](int current) { return current % 2 == 0; }); buffer.eraseAll([](int current) { return current % 2 == 0; });
ASSERT_EQ(buffer.size(), 4); ASSERT_EQ(buffer.size(), 4);

View File

@ -32,9 +32,8 @@ void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id
/// Mark this StorageID as deleting /// Mark this StorageID as deleting
currently_deleting.emplace(id); currently_deleting.emplace(id);
pending.removeElements([&] (auto item) -> bool { return item->task->getStorageID() == id; }); /// Erase storage related tasks from pending and select active tasks to wait for
pending.eraseAll([&] (auto item) -> bool { return item->task->getStorageID() == id; });
/// Find pending to wait
tasks_to_wait = active.getAll([&] (auto item) -> bool { return item->task->getStorageID() == id; }); tasks_to_wait = active.getAll([&] (auto item) -> bool { return item->task->getStorageID() == id; });
} }
@ -45,7 +44,6 @@ void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id
item->future.wait(); item->future.wait();
} }
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
currently_deleting.erase(id); currently_deleting.erase(id);
@ -70,7 +68,6 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
active.tryPush(item); active.tryPush(item);
try try
{ {
/// This is needed to increase / decrease the number of threads at runtime /// This is needed to increase / decrease the number of threads at runtime
@ -89,7 +86,7 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
auto check_if_deleting = [&] () -> bool auto check_if_deleting = [&] () -> bool
{ {
active.tryErase(item); active.eraseAll([&] (auto x) { return x == item; });
for (auto & id : currently_deleting) for (auto & id : currently_deleting)
{ {
@ -139,7 +136,7 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
if (!res) if (!res)
{ {
active.tryErase(item); active.eraseAll([&] (auto x) { return x == item; });
pending.tryPush(item); pending.tryPush(item);
} }

View File

@ -11,7 +11,6 @@
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Common/RingBuffer.h> #include <Common/RingBuffer.h>
#include <Common/PlainMultiSet.h>
#include <Storages/MergeTree/ExecutableTask.h> #include <Storages/MergeTree/ExecutableTask.h>
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
@ -142,7 +141,7 @@ private:
try try
{ {
pending.resize(new_max_tasks_count); pending.resize(new_max_tasks_count);
active.reserve(new_max_tasks_count); active.resize(new_max_tasks_count);
pool.setMaxFreeThreads(0); pool.setMaxFreeThreads(0);
pool.setMaxThreads(new_threads_count); pool.setMaxThreads(new_threads_count);
@ -192,7 +191,7 @@ private:
/// Initially it will be empty /// Initially it will be empty
RingBuffer<ItemPtr> pending{0}; RingBuffer<ItemPtr> pending{0};
PlainMultiSet<ItemPtr> active{0}; RingBuffer<ItemPtr> active{0};
std::set<StorageID> currently_deleting; std::set<StorageID> currently_deleting;
std::mutex remove_mutex; std::mutex remove_mutex;