Use boost::circular_buffer

This commit is contained in:
Nikita Mikhaylov 2021-09-03 22:15:20 +00:00
parent 26ab6ebc0a
commit 3eec8a3c2f
8 changed files with 157 additions and 425 deletions

2
contrib/boost vendored

@ -1 +1 @@
Subproject commit 9cf09dbfd55a5c6202dedbdf40781a51b02c2675
Subproject commit 4b98e2befd3f3265b0db0acb5d20c4812ef8d88e

View File

@ -14,11 +14,12 @@ if (NOT USE_INTERNAL_BOOST_LIBRARY)
context
coroutine
graph
circular_buffer
)
if(Boost_INCLUDE_DIR AND Boost_FILESYSTEM_LIBRARY AND Boost_FILESYSTEM_LIBRARY AND
if(Boost_INCLUDE_DIR AND Boost_FILESYSTEM_LIBRARY AND
Boost_PROGRAM_OPTIONS_LIBRARY AND Boost_REGEX_LIBRARY AND Boost_SYSTEM_LIBRARY AND Boost_CONTEXT_LIBRARY AND
Boost_COROUTINE_LIBRARY AND Boost_GRAPH_LIBRARY)
Boost_COROUTINE_LIBRARY AND Boost_GRAPH_LIBRARY AND Boost_CIRCULAR_BUFFER_LIBRARY)
set(EXTERNAL_BOOST_FOUND 1)
@ -34,6 +35,7 @@ if (NOT USE_INTERNAL_BOOST_LIBRARY)
add_library (_boost_context INTERFACE)
add_library (_boost_coroutine INTERFACE)
add_library (_boost_graph INTERFACE)
add_library (_boost_circular_buffer INTERFACE)
target_link_libraries (_boost_filesystem INTERFACE ${Boost_FILESYSTEM_LIBRARY})
target_link_libraries (_boost_iostreams INTERFACE ${Boost_IOSTREAMS_LIBRARY})
@ -43,6 +45,7 @@ if (NOT USE_INTERNAL_BOOST_LIBRARY)
target_link_libraries (_boost_context INTERFACE ${Boost_CONTEXT_LIBRARY})
target_link_libraries (_boost_coroutine INTERFACE ${Boost_COROUTINE_LIBRARY})
target_link_libraries (_boost_graph INTERFACE ${Boost_GRAPH_LIBRARY})
target_link_libraries (_boost_circular_buffer INTERFACE ${Boost_CIRCULAR_BUFFER_LIBRARY})
add_library (boost::filesystem ALIAS _boost_filesystem)
add_library (boost::iostreams ALIAS _boost_iostreams)
@ -52,6 +55,7 @@ if (NOT USE_INTERNAL_BOOST_LIBRARY)
add_library (boost::context ALIAS _boost_context)
add_library (boost::coroutine ALIAS _boost_coroutine)
add_library (boost::graph ALIAS _boost_graph)
add_library (boost::circular_buffer ALIAS _boost_circular_buffer)
else()
set(EXTERNAL_BOOST_FOUND 0)
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't find system boost")
@ -238,4 +242,9 @@ if (NOT EXTERNAL_BOOST_FOUND)
target_include_directories (_boost_graph PRIVATE ${LIBRARY_DIR})
target_link_libraries(_boost_graph PRIVATE _boost_regex)
add_library(_boost_circular_buffer INTERFACE)
add_library(boost::circular_buffer ALIAS _boost_circular_buffer)
target_include_directories(_boost_circular_buffer SYSTEM BEFORE INTERFACE ${LIBRARY_DIR})
endif ()

View File

@ -297,6 +297,7 @@ target_link_libraries(clickhouse_common_io
PUBLIC
boost::program_options
boost::system
boost::circular_buffer
${CITYHASH_LIBRARIES}
${ZLIB_LIBRARIES}
pcg_random
@ -335,6 +336,7 @@ dbms_target_link_libraries (
PRIVATE
boost::filesystem
boost::program_options
boost::circular_buffer
clickhouse_common_config
clickhouse_common_zookeeper
clickhouse_dictionaries_embedded

View File

@ -1,153 +0,0 @@
#pragma once
#include <vector>
#include <iostream>
namespace DB
{
/**
* A ring buffer of fixed size.
* With an ability to expand / narrow.
* When narrowing only first N elements remain.
*/
template <class T>
class RingBuffer
{
public:
explicit RingBuffer(size_t capacity_) : capacity(capacity_)
{
buffer.assign(capacity, {});
}
size_t size() const { return count; }
bool empty() const { return count == 0; }
bool tryPush(T element)
{
if (count == capacity) {
return false;
}
buffer[advance(count)] = std::move(element);
++count;
return true;
}
bool tryPop(T * element)
{
if (empty()) {
return false;
}
*element = std::move(buffer[position]);
--count;
position = advance();
return true;
}
/// In case of T = std::shared_ptr<Something> it won't cause any allocations
template <typename Predicate>
bool eraseAll(Predicate && predicate)
{
/// Shift all elements to the beginning of the buffer
std::rotate(buffer.begin(), buffer.begin() + position, buffer.end());
position = 0;
/// Remove elements
auto end_removed = std::remove_if(buffer.begin(), buffer.begin() + count, predicate);
if (end_removed == buffer.begin() + count)
return false;
size_t new_count = std::distance(buffer.begin(), end_removed);
for (size_t i = new_count; i < count; ++i)
buffer[i] = T{};
count = new_count;
return true;
}
template <class Predicate>
std::vector<T> getAll(Predicate && predicate)
{
std::vector<T> suitable;
for (size_t i = 0; i < count; ++i)
{
auto item = buffer[advance(i)];
if (predicate(item))
suitable.emplace_back(item);
}
return suitable;
}
template <typename Predicate>
bool has(Predicate && predicate)
{
for (size_t i = 0; i < count; ++i)
if (predicate(buffer[advance(i)]))
return true;
return false;
}
void resize(size_t new_capacity)
{
if (new_capacity > capacity)
expand(new_capacity);
else if (new_capacity < capacity)
narrow(new_capacity);
}
private:
size_t advance(size_t amount = 1)
{
if (position + amount >= capacity)
return position + amount - capacity;
return position + amount;
}
void expand(size_t new_capacity)
{
bool overflow = (position + count) > capacity;
buffer.resize(new_capacity);
if (overflow)
{
size_t count_before_end = capacity - position;
for (size_t i = 0; i < count_before_end; ++i)
buffer[new_capacity - i] = buffer[capacity - i];
position = new_capacity - count_before_end;
}
capacity = new_capacity;
}
void narrow(size_t new_capacity)
{
std::vector<T> new_buffer(new_capacity);
count = std::min(new_capacity, count);
for (size_t i = 0; i < count; ++i)
new_buffer[i] = buffer[advance(i)];
std::swap(buffer, new_buffer);
position = 0;
capacity = new_capacity;
}
std::vector<T> buffer;
size_t position{0};
size_t count{0};
size_t capacity{0};
};
}

View File

@ -1,142 +0,0 @@
#include <gtest/gtest.h>
#include <random>
#include <Common/RingBuffer.h>
using namespace DB;
TEST(RingBuffer, Empty)
{
RingBuffer<int> buffer(1);
ASSERT_TRUE(buffer.size() == 0u); // NOLINT
ASSERT_TRUE(buffer.empty());
}
TEST(RingBuffer, PushAndPop)
{
RingBuffer<int> buffer(2);
int i;
ASSERT_TRUE(true == buffer.tryPush(0));
ASSERT_TRUE(true == buffer.tryPush(1));
ASSERT_TRUE(false == buffer.tryPush(2));
ASSERT_TRUE(2u == buffer.size());
ASSERT_TRUE(false == buffer.empty());
ASSERT_TRUE(true == buffer.tryPop(&i));
ASSERT_TRUE(0 == i);
ASSERT_TRUE(true == buffer.tryPop(&i));
ASSERT_TRUE(1 == i);
ASSERT_TRUE(false == buffer.tryPop(&i));
ASSERT_TRUE(buffer.empty());
ASSERT_TRUE(true == buffer.empty());
}
TEST(RingBuffer, Random)
{
std::random_device device;
std::mt19937 generator(device());
std::uniform_int_distribution<> distribution(0, 3);
RingBuffer<int> buffer(10);
int next_element = 0;
int next_received_element = 0;
for (int i = 0; i < 100000; ++i) {
if (distribution(generator) == 0)
{
if (buffer.tryPush(next_element))
next_element++;
}
else
{
int element;
if (buffer.tryPop(&element))
{
ASSERT_TRUE(next_received_element == element);
next_received_element++;
}
}
}
}
TEST(RingBuffer, Resize)
{
RingBuffer<int> buffer(10);
for (size_t i = 0; i < 10; ++i)
ASSERT_TRUE(buffer.tryPush(i));
buffer.resize(0);
ASSERT_TRUE(buffer.empty());
ASSERT_EQ(buffer.size(), 0u);
ASSERT_FALSE(buffer.tryPush(42));
int value;
ASSERT_FALSE(buffer.tryPop(&value));
buffer.resize(1);
ASSERT_TRUE(buffer.tryPush(42));
ASSERT_TRUE(buffer.tryPop(&value));
ASSERT_EQ(value, 42);
buffer.resize(42);
for (size_t i = 0; i < 42; ++i)
ASSERT_TRUE(buffer.tryPush(i));
buffer.resize(56);
for (size_t i = 0; i < 42; ++i)
{
ASSERT_TRUE(buffer.tryPop(&value));
ASSERT_EQ(value, i);
}
for (size_t i = 0; i < 56; ++i)
ASSERT_TRUE(buffer.tryPush(i));
buffer.resize(13);
for (size_t i = 0; i < 13; ++i)
{
ASSERT_TRUE(buffer.tryPop(&value));
ASSERT_EQ(value, i);
}
}
TEST(RingBuffer, removeElements)
{
RingBuffer<int> buffer(10);
for (size_t i = 0; i < 10; ++i)
ASSERT_TRUE(buffer.tryPush(i));
int value;
ASSERT_TRUE(buffer.tryPop(&value));
ASSERT_TRUE(buffer.tryPop(&value));
ASSERT_TRUE(buffer.tryPop(&value));
buffer.eraseAll([](int current) { return current % 2 == 0; });
ASSERT_EQ(buffer.size(), 4);
ASSERT_TRUE(buffer.tryPop(&value));
ASSERT_EQ(value, 3);
ASSERT_TRUE(buffer.tryPop(&value));
ASSERT_EQ(value, 5);
ASSERT_TRUE(buffer.tryPop(&value));
ASSERT_EQ(value, 7);
ASSERT_TRUE(buffer.tryPop(&value));
ASSERT_EQ(value, 9);
}

View File

@ -51,30 +51,21 @@ void BackgroundJobAssignee::postpone()
void BackgroundJobAssignee::scheduleMergeMutateTask(ExecutableTaskPtr merge_task)
{
bool res = getContext()->getMergeMutateExecutor()->trySchedule(merge_task);
if (res)
trigger();
else
postpone();
res ? trigger() : postpone();
}
void BackgroundJobAssignee::scheduleFetchTask(ExecutableTaskPtr fetch_task)
{
bool res = getContext()->getFetchesExecutor()->trySchedule(fetch_task);
if (res)
trigger();
else
postpone();
res ? trigger() : postpone();
}
void BackgroundJobAssignee::scheduleMoveTask(ExecutableTaskPtr move_task)
{
bool res = getContext()->getMovesExecutor()->trySchedule(move_task);
if (res)
trigger();
else
postpone();
res ? trigger() : postpone();
}

View File

@ -22,8 +22,88 @@ String MergeTreeBackgroundExecutor::toString(Type type)
}
void MergeTreeBackgroundExecutor::updateConfiguration()
{
auto new_threads_count = threads_count_getter();
auto new_max_tasks_count = max_task_count_getter();
try
{
pending.set_capacity(new_max_tasks_count);
active.set_capacity(new_max_tasks_count);
pool.setMaxFreeThreads(new_threads_count);
pool.setMaxThreads(new_threads_count);
pool.setQueueSize(new_max_tasks_count);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
threads_count = new_threads_count;
max_tasks_count = new_max_tasks_count;
}
void MergeTreeBackgroundExecutor::wait()
{
{
std::lock_guard lock(mutex);
shutdown = true;
has_tasks.notify_all();
}
if (scheduler.joinable())
scheduler.join();
pool.wait();
}
bool MergeTreeBackgroundExecutor::trySchedule(ExecutableTaskPtr task)
{
std::lock_guard lock(mutex);
if (shutdown)
return false;
try
{
/// This is needed to increase / decrease the number of threads at runtime
if (update_timer.compareAndRestartDeferred(10.))
updateConfiguration();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
auto & value = CurrentMetrics::values[metric];
if (value.load() >= static_cast<int64_t>(max_tasks_count))
return false;
/// Just check if the main scheduler thread in excellent condition
if (!scheduler.joinable())
{
LOG_ERROR(&Poco::Logger::get("MergeTreeBackgroundExecutor"), "Scheduler thread is dead. Trying to alive..");
scheduler = ThreadFromGlobalPool([this]() { schedulerThreadFunction(); });
if (!scheduler.joinable())
LOG_FATAL(&Poco::Logger::get("MergeTreeBackgroundExecutor"), "Scheduler thread is dead permanently. Restart is needed");
}
pending.push_back(std::make_shared<Item>(std::move(task), metric));
has_tasks.notify_one();
return true;
}
void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id)
{
/// Executor is global, so protect from any concurrent storage shutdowns
std::lock_guard remove_lock(remove_mutex);
std::vector<ItemPtr> tasks_to_wait;
@ -34,8 +114,13 @@ void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id
currently_deleting.emplace(id);
/// Erase storage related tasks from pending and select active tasks to wait for
pending.eraseAll([&] (auto item) -> bool { return item->task->getStorageID() == id; });
tasks_to_wait = active.getAll([&] (auto item) -> bool { return item->task->getStorageID() == id; });
auto it = std::remove_if(pending.begin(), pending.end(),
[&] (auto item) -> bool { return item->task->getStorageID() == id; } );
pending.erase(it, pending.end());
/// Copy items to wait for their completion
std::copy_if(active.begin(), active.end(), std::back_inserter(tasks_to_wait),
[&] (auto item) -> bool { return item->task->getStorageID() == id; });
}
@ -60,55 +145,48 @@ void MergeTreeBackgroundExecutor::routine(ItemPtr item)
{
setThreadName(name.c_str());
bool checked{false};
auto check_if_currently_deleting = [&] ()
auto erase_from_active = [&]
{
checked = true;
return active.eraseAll([&] (auto & x) { return x == item; });
DENY_ALLOCATIONS_IN_SCOPE;
active.erase(std::remove(active.begin(), active.end(), item), active.end());
};
SCOPE_EXIT({
if (checked)
return;
std::lock_guard guard(mutex);
check_if_currently_deleting();
});
try
{
if (item->task->execute())
{
DENY_ALLOCATIONS_IN_SCOPE;
std::lock_guard guard(mutex);
if (check_if_currently_deleting())
if (currently_deleting.contains(item->task->getStorageID()))
{
erase_from_active();
return;
}
pending.tryPush(item);
pending.push_back(item);
erase_from_active();
item->is_done.reset();
has_tasks.notify_one();
return;
}
std::lock_guard guard(mutex);
erase_from_active();
has_tasks.notify_one();
/// In a situation of a lack of memory this method can throw an exception,
/// because it may interact somehow with BackgroundSchedulePool, which may allocate memory
/// But it is rather safe, because we have try...catch block here, and another one in ThreadPool.
item->task->onCompleted();
std::lock_guard guard(mutex);
has_tasks.notify_one();
}
catch(...)
{
std::lock_guard guard(mutex);
erase_from_active();
has_tasks.notify_one();
try
{
item->task->onCompleted();
}
catch (...) {}
tryLogCurrentException(__PRETTY_FUNCTION__);
/// Do not want any exceptions
try { item->task->onCompleted(); } catch (...) {}
}
}
@ -117,37 +195,36 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
{
DENY_ALLOCATIONS_IN_SCOPE;
bool status;
while (true)
{
std::unique_lock lock(mutex);
has_tasks.wait(lock, [this](){ return !pending.empty() || shutdown_suspend; });
has_tasks.wait(lock, [this](){ return !pending.empty() || shutdown; });
if (shutdown_suspend)
if (shutdown)
break;
ItemPtr item;
if (!pending.tryPop(&item))
continue;
ItemPtr item = std::move(pending.front());
pending.pop_front();
/// Execute a piece of task
bool res = pool.trySchedule([this, item]
{
routine(item);
/// When storage shutdowns it will wait until all related background tasks
/// are finished, because they may want to interact with its fields
/// and this will cause segfault.
item->is_done.set();
});
if (!res)
{
active.eraseAll([&] (auto x) { return x == item; });
status = pending.tryPush(item);
assert(status);
active.erase(std::remove(active.begin(), active.end(), item), active.end());
pending.push_back(item);
continue;
}
status = active.tryPush(std::move(item));
assert(status);
active.push_back(std::move(item));
}
}

View File

@ -8,17 +8,36 @@
#include <condition_variable>
#include <set>
#include <boost/circular_buffer.hpp>
#include <common/shared_ptr_helper.h>
#include <common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <Common/Stopwatch.h>
#include <Common/RingBuffer.h>
#include <Storages/MergeTree/ExecutableTask.h>
namespace DB
{
/**
* Executor for a background MergeTree related operations such as merges, mutations, fetches an so on.
* It can execute only successors of ExecutableTask interface.
* Which is a self-written coroutine. It suspends, when returns true from execute() method.
*
* Executor consists of ThreadPool to execute pieces of a task (basically calls 'execute' on a task)
* and a scheduler thread, which manages the tasks. Due to bad experience of working with high memory under
* high memory pressure scheduler thread mustn't do any allocations,
* because it will be a fatal error if this thread will die from a random exception.
*
* There are two queues of a tasks: pending (main queue for all the tasks) and active (currently executing).
* There is an invariant, that task may occur only in one of these queue. It can occur in both queues only in critical sections.
*
* Due to all caveats I described above we use boost::circular_buffer as a container for queues.
*
* Another nuisance that we faces with is than backgroud operations always interacts with an associated Storage.
* So, when a Storage want to shutdown, it must wait until all its background operaions are finished.
*/
class MergeTreeBackgroundExecutor : public shared_ptr_helper<MergeTreeBackgroundExecutor>
{
public:
@ -54,61 +73,11 @@ public:
wait();
}
bool trySchedule(ExecutableTaskPtr task)
{
std::lock_guard lock(mutex);
if (shutdown_suspend)
return false;
try
{
/// This is needed to increase / decrease the number of threads at runtime
if (update_timer.compareAndRestartDeferred(10.))
updateConfiguration();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
auto & value = CurrentMetrics::values[metric];
if (value.load() >= static_cast<int64_t>(max_tasks_count))
return false;
if (!scheduler.joinable())
{
LOG_ERROR(&Poco::Logger::get("MergeTreeBackgroundExecutor"), "Scheduler thread is dead. Trying to alive..");
scheduler = ThreadFromGlobalPool([this]() { schedulerThreadFunction(); });
if (!scheduler.joinable())
LOG_FATAL(&Poco::Logger::get("MergeTreeBackgroundExecutor"), "Scheduler thread is dead permanently. Restart is needed");
}
if (!pending.tryPush(std::make_shared<Item>(std::move(task), metric)))
return false;
has_tasks.notify_one();
return true;
}
bool trySchedule(ExecutableTaskPtr task);
void removeTasksCorrespondingToStorage(StorageID id);
void wait()
{
{
std::lock_guard lock(mutex);
shutdown_suspend = true;
has_tasks.notify_all();
}
if (scheduler.joinable())
scheduler.join();
pool.wait();
}
void wait();
size_t activeCount()
{
@ -124,28 +93,7 @@ public:
private:
void updateConfiguration()
{
auto new_threads_count = threads_count_getter();
auto new_max_tasks_count = max_task_count_getter();
try
{
pending.resize(new_max_tasks_count);
active.resize(new_max_tasks_count);
pool.setMaxFreeThreads(new_threads_count);
pool.setMaxThreads(new_threads_count);
pool.setQueueSize(new_max_tasks_count);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
threads_count = new_threads_count;
max_tasks_count = new_max_tasks_count;
}
void updateConfiguration();
static String toString(Type type);
@ -179,15 +127,15 @@ private:
void schedulerThreadFunction();
/// Initially it will be empty
RingBuffer<ItemPtr> pending{0};
RingBuffer<ItemPtr> active{0};
boost::circular_buffer<ItemPtr> pending{0};
boost::circular_buffer<ItemPtr> active{0};
std::set<StorageID> currently_deleting;
std::mutex remove_mutex;
std::mutex mutex;
std::condition_variable has_tasks;
std::atomic_bool shutdown_suspend{false};
std::atomic_bool shutdown{false};
ThreadPool pool;
ThreadFromGlobalPool scheduler;