Get rid of half of the allocations

Author: Nikita Mikhaylov, 2021-09-02 17:40:29 +00:00
parent 7f21cd7f3d
commit a6fe91ca47
9 changed files with 505 additions and 62 deletions

src/Common/PlainMultiSet.h (new file, 114 lines)

@ -0,0 +1,114 @@
#pragma once
#include <vector>
namespace DB
{
/**
* A container with O(n) complexity for all methods.
* The capacity is fixed.
* The main reason to use this is to get rid of any allocations.
* Used in some executors, where the number of elements is really small.
*/
template <class T>
class PlainMultiSet
{
public:
explicit PlainMultiSet(size_t capacity_)
{
buffer.resize(capacity_);
}
bool tryPush(T element)
{
for (auto & item : buffer)
{
if (item.state == State::EMPTY)
{
item.state = State::FILLED;
item.value = std::move(element);
++count;
return true;
}
}
return false;
}
bool has(T element)
{
for (auto & item : buffer)
if (item.state == State::FILLED && item.value == element)
return true;
return false;
}
template <class Predicate>
std::vector<T> getAll(Predicate && predicate)
{
std::vector<T> suitable;
for (auto & item : buffer)
if (item.state == State::FILLED && predicate(item.value))
suitable.emplace_back(item.value);
return suitable;
}
bool tryErase(const T & element)
{
for (auto & item : buffer)
{
if (item.state == State::FILLED && item.value == element)
{
item.state = State::EMPTY;
item.value = T{};
--count;
return true;
}
}
return false;
}
size_t size()
{
return count;
}
void reserve(size_t new_capacity)
{
if (buffer.size() >= new_capacity)
return;
/// resize() (not reserve()) is needed here: the new slots must become visible
/// to tryPush() as default-constructed EMPTY items.
buffer.resize(new_capacity);
}
private:
enum class State
{
EMPTY,
FILLED
};
struct Item
{
T value;
State state{State::EMPTY};
};
size_t count{0};
std::vector<Item> buffer;
};
}
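A minimal usage sketch of the container (illustrative only, not part of the commit; the function name and values are made up):

#include <cassert>
#include <Common/PlainMultiSet.h>

void plainMultiSetExample()
{
    /// Capacity is fixed up front, so pushes and erases never allocate.
    DB::PlainMultiSet<int> set(4);

    assert(set.tryPush(7));
    assert(set.tryPush(7));      /// Duplicates are allowed, hence "MultiSet".
    assert(set.has(7));

    assert(set.tryErase(7));     /// Erases only one of the two copies.
    assert(set.size() == 1);

    assert(set.tryPush(1) && set.tryPush(2) && set.tryPush(3));
    assert(!set.tryPush(4));     /// Full: the push fails instead of growing.
}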

src/Common/RingBuffer.h (new file, 122 lines)

@ -0,0 +1,122 @@
#pragma once
#include <vector>
#include <algorithm>    /// for std::rotate and std::remove_if
namespace DB
{
/**
* A ring buffer of fixed size.
* It can be expanded and narrowed; when narrowing, only the first N elements remain.
*/
template <class T>
class RingBuffer
{
public:
explicit RingBuffer(size_t capacity_) : capacity(capacity_)
{
buffer.assign(capacity, {});
}
size_t size() const
{
return count;
}
bool empty() const
{
return count == 0;
}
bool tryPush(T element)
{
if (count == capacity) {
return false;
}
buffer[(position + count) % capacity] = element;
++count;
return true;
}
bool tryPop(T * element)
{
if (empty()) {
return false;
}
*element = std::move(buffer[position]);
--count;
position = (position + 1) % capacity;
return true;
}
template <typename Predicate>
void removeElements(Predicate && predicate)
{
/// Shift all elements to the beginning of the buffer
std::rotate(buffer.begin(), buffer.begin() + position, buffer.end());
/// Remove elements
auto end_removed = std::remove_if(buffer.begin(), buffer.begin() + count, predicate);
size_t new_count = std::distance(buffer.begin(), end_removed);
for (size_t i = new_count; i < count; ++i)
buffer[i] = T{};
count = new_count;
position = 0;
}
void resize(size_t new_capacity)
{
if (new_capacity > capacity)
expand(new_capacity);
else if (new_capacity < capacity)
narrow(new_capacity);
}
private:
void expand(size_t new_capacity)
{
bool overflow = (position + count) > capacity;
buffer.resize(new_capacity);
if (overflow)
{
size_t count_before_end = capacity - position;
/// Move the wrapped-around tail [position, capacity) to the end of the enlarged
/// buffer. Iterate from the back (i = 1 is the last element) so every slot is
/// read before it can be overwritten; starting at i = 0 would write out of bounds.
for (size_t i = 1; i <= count_before_end; ++i)
buffer[new_capacity - i] = buffer[capacity - i];
position = new_capacity - count_before_end;
}
capacity = new_capacity;
}
void narrow(size_t new_capacity)
{
std::vector<T> new_buffer(new_capacity);
count = std::min(new_capacity, count);
for (size_t i = 0; i < count; ++i)
new_buffer[i] = buffer[(position + i) % capacity];
std::swap(buffer, new_buffer);
position = 0;
capacity = new_capacity;
}
std::vector<T> buffer;
size_t position{0};
size_t count{0};
size_t capacity{0};
};
}
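A minimal usage sketch of the ring buffer (illustrative only, not part of the commit); it shows the FIFO order and that a push into a full buffer fails instead of allocating:

#include <cassert>
#include <Common/RingBuffer.h>

void ringBufferExample()
{
    DB::RingBuffer<int> queue(2);

    assert(queue.tryPush(10));
    assert(queue.tryPush(20));
    assert(!queue.tryPush(30));    /// Full: rejected, no reallocation.

    int value = 0;
    assert(queue.tryPop(&value) && value == 10);    /// FIFO order.

    assert(queue.tryPush(30));     /// The freed slot is reused (wrap-around).

    queue.resize(4);               /// Expanding keeps the queued elements.
    assert(queue.tryPop(&value) && value == 20);
    assert(queue.tryPop(&value) && value == 30);
}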

@ -107,7 +107,6 @@ private:
std::list<Thread> threads;
std::exception_ptr first_exception;
template <typename ReturnType>
ReturnType scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds);

@ -0,0 +1,22 @@
#include <gtest/gtest.h>
#include <random>
#include <Common/PlainMultiSet.h>
using namespace DB;
TEST(PlainMultiSet, Simple)
{
PlainMultiSet<int> set(10);
ASSERT_TRUE(set.tryPush(1));
ASSERT_TRUE(set.tryPush(1));
ASSERT_TRUE(set.tryPush(2));
ASSERT_TRUE(set.tryPush(3));
ASSERT_TRUE(set.has(1));
ASSERT_TRUE(set.has(2));
ASSERT_TRUE(set.has(3));
}

@ -0,0 +1,142 @@
#include <gtest/gtest.h>
#include <random>
#include <Common/RingBuffer.h>
using namespace DB;
TEST(RingBuffer, Empty)
{
RingBuffer<int> buffer(1);
ASSERT_TRUE(buffer.size() == 0u); // NOLINT
ASSERT_TRUE(buffer.empty());
}
TEST(RingBuffer, PushAndPop)
{
RingBuffer<int> buffer(2);
int i;
ASSERT_TRUE(buffer.tryPush(0));
ASSERT_TRUE(buffer.tryPush(1));
ASSERT_FALSE(buffer.tryPush(2));
ASSERT_EQ(2u, buffer.size());
ASSERT_FALSE(buffer.empty());
ASSERT_TRUE(buffer.tryPop(&i));
ASSERT_EQ(0, i);
ASSERT_TRUE(buffer.tryPop(&i));
ASSERT_EQ(1, i);
ASSERT_FALSE(buffer.tryPop(&i));
ASSERT_TRUE(buffer.empty());
}
TEST(RingBuffer, Random)
{
std::random_device device;
std::mt19937 generator(device());
std::uniform_int_distribution<> distribution(0, 1);
RingBuffer<int> buffer(10);
int next_element = 0;
int next_received_element = 0;
for (int i = 0; i < 100000; ++i) {
if (distribution(generator) == 0)
{
if (buffer.tryPush(next_element))
next_element++;
}
else
{
int element;
if (buffer.tryPop(&element))
{
ASSERT_TRUE(next_received_element == element);
next_received_element++;
}
}
}
}
TEST(RingBuffer, Resize)
{
RingBuffer<int> buffer(10);
for (size_t i = 0; i < 10; ++i)
ASSERT_TRUE(buffer.tryPush(i));
buffer.resize(0);
ASSERT_TRUE(buffer.empty());
ASSERT_EQ(buffer.size(), 0u);
ASSERT_FALSE(buffer.tryPush(42));
int value;
ASSERT_FALSE(buffer.tryPop(&value));
buffer.resize(1);
ASSERT_TRUE(buffer.tryPush(42));
ASSERT_TRUE(buffer.tryPop(&value));
ASSERT_EQ(value, 42);
buffer.resize(42);
for (size_t i = 0; i < 42; ++i)
ASSERT_TRUE(buffer.tryPush(i));
buffer.resize(56);
for (size_t i = 0; i < 42; ++i)
{
ASSERT_TRUE(buffer.tryPop(&value));
ASSERT_EQ(value, i);
}
for (size_t i = 0; i < 56; ++i)
ASSERT_TRUE(buffer.tryPush(i));
buffer.resize(13);
for (size_t i = 0; i < 13; ++i)
{
ASSERT_TRUE(buffer.tryPop(&value));
ASSERT_EQ(value, i);
}
}
TEST(RingBuffer, removeElements)
{
RingBuffer<int> buffer(10);
for (size_t i = 0; i < 10; ++i)
ASSERT_TRUE(buffer.tryPush(i));
int value;
ASSERT_TRUE(buffer.tryPop(&value));
ASSERT_TRUE(buffer.tryPop(&value));
ASSERT_TRUE(buffer.tryPop(&value));
buffer.removeElements([](int current) { return current % 2 == 0; });
ASSERT_EQ(buffer.size(), 4);
ASSERT_TRUE(buffer.tryPop(&value));
ASSERT_EQ(value, 3);
ASSERT_TRUE(buffer.tryPop(&value));
ASSERT_EQ(value, 5);
ASSERT_TRUE(buffer.tryPop(&value));
ASSERT_EQ(value, 7);
ASSERT_TRUE(buffer.tryPop(&value));
ASSERT_EQ(value, 9);
}

@ -2729,21 +2729,30 @@ PartUUIDsPtr Context::getIgnoredPartUUIDs() const
void Context::initializeBackgroundExecutors()
{
shared->merge_mutate_executor = MergeTreeBackgroundExecutor::create(MergeTreeBackgroundExecutor::Type::MERGE_MUTATE);
shared->moves_executor = MergeTreeBackgroundExecutor::create(MergeTreeBackgroundExecutor::Type::MOVE);
shared->fetch_executor = MergeTreeBackgroundExecutor::create(MergeTreeBackgroundExecutor::Type::FETCH);
shared->merge_mutate_executor = MergeTreeBackgroundExecutor::create
(
MergeTreeBackgroundExecutor::Type::MERGE_MUTATE,
[this] () { return getSettingsRef().background_pool_size; },
[this] () { return getSettingsRef().background_pool_size; },
CurrentMetrics::BackgroundPoolTask
);
shared->merge_mutate_executor->setThreadsCount([this] () { return getSettingsRef().background_pool_size; });
shared->merge_mutate_executor->setTasksCount([this] () { return getSettingsRef().background_pool_size; });
shared->merge_mutate_executor->setMetric(CurrentMetrics::BackgroundPoolTask);
shared->moves_executor = MergeTreeBackgroundExecutor::create
(
MergeTreeBackgroundExecutor::Type::MOVE,
[this] () { return getSettingsRef().background_move_pool_size; },
[this] () { return getSettingsRef().background_move_pool_size; },
CurrentMetrics::BackgroundMovePoolTask
);
shared->moves_executor->setThreadsCount([this] () { return getSettingsRef().background_move_pool_size; });
shared->moves_executor->setTasksCount([this] () { return getSettingsRef().background_move_pool_size; });
shared->moves_executor->setMetric(CurrentMetrics::BackgroundMovePoolTask);
shared->fetch_executor->setThreadsCount([this] () { return getSettingsRef().background_fetches_pool_size; });
shared->fetch_executor->setTasksCount([this] () { return getSettingsRef().background_fetches_pool_size; });
shared->fetch_executor->setMetric(CurrentMetrics::BackgroundFetchesPoolTask);
shared->fetch_executor = MergeTreeBackgroundExecutor::create
(
MergeTreeBackgroundExecutor::Type::FETCH,
[this] () { return getSettingsRef().background_fetches_pool_size; },
[this] () { return getSettingsRef().background_fetches_pool_size; },
CurrentMetrics::BackgroundFetchesPoolTask
);
}
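In the hunk above, the single-argument create(...) calls and the setThreadsCount / setTasksCount / setMetric calls are the removed old code; after this change each executor is fully configured by one create(...) call. Storing getters rather than values is what lets the executor re-read background_pool_size and friends at runtime. A minimal sketch of that pattern (names are illustrative, not the ClickHouse API):

#include <cstddef>
#include <functional>

/// Sketch of the "store a getter, not a value" idea used for the executors:
/// the setting is re-read on every updateConfiguration() call, so the pool
/// can grow or shrink at runtime without recreating the executor.
struct ReconfigurableSketch
{
    std::function<size_t()> threads_count_getter;
    size_t threads_count = 0;

    void updateConfiguration()
    {
        threads_count = threads_count_getter();
        /// ...apply the new value to the underlying thread pool here...
    }
};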

@ -32,12 +32,10 @@ void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id
/// Mark this StorageID as deleting
currently_deleting.emplace(id);
std::erase_if(pending, [&] (auto item) -> bool { return item->task->getStorageID() == id; });
pending.removeElements([&] (auto item) -> bool { return item->task->getStorageID() == id; });
/// Find pending to wait
for (const auto & item : active)
if (item->task->getStorageID() == id)
tasks_to_wait.emplace_back(item);
tasks_to_wait = active.getAll([&] (auto item) -> bool { return item->task->getStorageID() == id; });
}
@ -66,13 +64,24 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
if (shutdown_suspend)
break;
auto item = std::move(pending.front());
pending.pop_front();
ItemPtr item;
if (!pending.tryPop(&item))
continue;
active.emplace(item);
active.tryPush(item);
try
{
/// This is needed to increase / decrease the number of threads at runtime
if (update_timer.compareAndRestartDeferred(1.))
updateConfiguration();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
/// This is needed to increase / decrease the number of threads at runtime
updatePoolConfiguration();
bool res = pool.trySchedule([this, item] ()
{
@ -80,7 +89,7 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
auto check_if_deleting = [&] () -> bool
{
active.erase(item);
active.tryErase(item);
for (auto & id : currently_deleting)
{
@ -108,7 +117,7 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
if (check_if_deleting())
return;
pending.emplace_back(item);
pending.tryPush(item);
has_tasks.notify_one();
return;
}
@ -130,8 +139,8 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
if (!res)
{
active.erase(item);
pending.emplace_back(item);
active.tryErase(item);
pending.tryPush(item);
}
}
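Putting the scheduler hunks together: the std::deque / std::set operations (std::erase_if, pop_front, emplace, erase, emplace_back) are the removed code and the RingBuffer / PlainMultiSet calls are their replacements. A simplified sketch of one iteration of schedulerThreadFunction() after this change (locking, shutdown handling, the try/catch around reconfiguration and the body of the task lambda are omitted):

ItemPtr item;
if (!pending.tryPop(&item))    /// fixed-size ring buffer instead of std::deque
    continue;

active.tryPush(item);          /// fixed-size multiset instead of std::set

/// Re-read the pool settings at most once per second.
if (update_timer.compareAndRestartDeferred(1.))
    updateConfiguration();

if (!pool.trySchedule([this, item] { /* execute the task, see the full diff */ }))
{
    /// The pool did not accept the job: undo and let the item be picked up again.
    active.tryErase(item);
    pending.tryPush(item);
}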

@ -5,10 +5,13 @@
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <unordered_set>
#include <common/shared_ptr_helper.h>
#include <Common/ThreadPool.h>
#include <Common/ArenaAllocator.h>
#include <Common/Stopwatch.h>
#include <Common/RingBuffer.h>
#include <Common/PlainMultiSet.h>
#include <Storages/MergeTree/ExecutableTask.h>
#include <Storages/MergeTree/MergeTreeData.h>
@ -61,9 +64,19 @@ public:
MOVE
};
explicit MergeTreeBackgroundExecutor(Type type_) : type(type_)
MergeTreeBackgroundExecutor(
Type type_,
CountGetter && threads_count_getter_,
CountGetter && max_task_count_getter_,
CurrentMetrics::Metric metric_)
: type(type_)
, threads_count_getter(threads_count_getter_)
, max_task_count_getter(max_task_count_getter_)
, metric(metric_)
{
name = toString(type);
updateConfiguration();
scheduler = ThreadFromGlobalPool([this]() { schedulerThreadFunction(); });
}
@ -72,21 +85,6 @@ public:
wait();
}
void setThreadsCount(CountGetter && getter)
{
threads_count_getter = getter;
}
void setTasksCount(CountGetter && getter)
{
max_task_count_getter = getter;
}
void setMetric(CurrentMetrics::Metric metric_)
{
metric = metric_;
}
bool trySchedule(ExecutableTaskPtr task)
{
std::lock_guard lock(mutex);
@ -95,10 +93,13 @@ public:
return false;
auto & value = CurrentMetrics::values[metric];
if (value.load() >= static_cast<int64_t>(max_task_count_getter()))
if (value.load() >= static_cast<int64_t>(max_tasks_count))
return false;
pending.emplace_back(std::make_shared<Item>(std::move(task), metric));
if (!pending.tryPush(std::make_shared<Item>(std::move(task), metric)))
return false;
has_tasks.notify_one();
return true;
}
@ -133,12 +134,27 @@ public:
private:
void updatePoolConfiguration()
void updateConfiguration()
{
const auto max_threads = threads_count_getter();
pool.setMaxFreeThreads(0);
pool.setMaxThreads(max_threads);
pool.setQueueSize(max_threads);
auto new_threads_count = threads_count_getter();
auto new_max_tasks_count = max_task_count_getter();
try
{
pending.resize(new_max_tasks_count);
active.reserve(new_max_tasks_count);
pool.setMaxFreeThreads(0);
pool.setMaxThreads(new_threads_count);
pool.setQueueSize(new_max_tasks_count);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
threads_count = new_threads_count;
max_tasks_count = new_max_tasks_count;
}
void schedulerThreadFunction();
@ -151,6 +167,11 @@ private:
CountGetter max_task_count_getter;
CurrentMetrics::Metric metric;
size_t threads_count{0};
size_t max_tasks_count{0};
AtomicStopwatch update_timer;
struct Item
{
explicit Item(ExecutableTaskPtr && task_, CurrentMetrics::Metric metric_)
@ -169,8 +190,9 @@ private:
using ItemPtr = std::shared_ptr<Item>;
std::deque<ItemPtr> pending;
std::set<ItemPtr> active;
/// Initially it will be empty
RingBuffer<ItemPtr> pending{0};
PlainMultiSet<ItemPtr> active{0};
std::set<StorageID> currently_deleting;
std::mutex remove_mutex;

@ -54,14 +54,16 @@ private:
TEST(Executor, RemoveTasks)
{
auto executor = DB::MergeTreeBackgroundExecutor::create(DB::MergeTreeBackgroundExecutor::Type::MERGE_MUTATE);
const size_t tasks_kinds = 25;
const size_t batch = 100;
executor->setThreadsCount([]() { return tasks_kinds; });
executor->setTasksCount([] () { return tasks_kinds * batch; });
executor->setMetric(CurrentMetrics::BackgroundPoolTask);
auto executor = DB::MergeTreeBackgroundExecutor::create
(
DB::MergeTreeBackgroundExecutor::Type::MERGE_MUTATE,
[] () { return tasks_kinds; },
[] () { return tasks_kinds * batch; },
CurrentMetrics::BackgroundPoolTask
);
for (size_t i = 0; i < batch; ++i)
for (size_t j = 0; j < tasks_kinds; ++j)
@ -93,16 +95,18 @@ TEST(Executor, RemoveTasks)
TEST(Executor, RemoveTasksStress)
{
auto executor = DB::MergeTreeBackgroundExecutor::create(DB::MergeTreeBackgroundExecutor::Type::MERGE_MUTATE);
const size_t tasks_kinds = 25;
const size_t batch = 100;
const size_t schedulers_count = 5;
const size_t removers_count = 5;
executor->setThreadsCount([]() { return tasks_kinds; });
executor->setTasksCount([] () { return tasks_kinds * batch * (schedulers_count + removers_count); });
executor->setMetric(CurrentMetrics::BackgroundPoolTask);
auto executor = DB::MergeTreeBackgroundExecutor::create
(
DB::MergeTreeBackgroundExecutor::Type::MERGE_MUTATE,
[] () { return tasks_kinds; },
[] () { return tasks_kinds * batch * (schedulers_count + removers_count); },
CurrentMetrics::BackgroundPoolTask
);
std::barrier barrier(schedulers_count + removers_count);