This commit is contained in:
Nikita Mikhaylov 2021-09-02 10:39:27 +00:00
parent ceab6feb2a
commit 7f21cd7f3d
6 changed files with 63 additions and 38 deletions

View File

@ -229,6 +229,11 @@ struct ContextSharedPart
std::optional<StorageS3Settings> storage_s3_settings; /// Settings of S3 storage
std::vector<String> warnings; /// Store warning messages about server configuration.
/// Background executors for *MergeTree tables
MergeTreeBackgroundExecutorPtr merge_mutate_executor;
MergeTreeBackgroundExecutorPtr moves_executor;
MergeTreeBackgroundExecutorPtr fetch_executor;
RemoteHostFilter remote_host_filter; /// Allowed URL from config.xml
std::optional<TraceCollector> trace_collector; /// Thread collecting traces from threads executing queries
@ -298,6 +303,13 @@ struct ContextSharedPart
DatabaseCatalog::shutdown();
if (merge_mutate_executor)
merge_mutate_executor->wait();
if (fetch_executor)
fetch_executor->wait();
if (moves_executor)
moves_executor->wait();
std::unique_ptr<SystemLogs> delete_system_logs;
{
auto lock = std::lock_guard(mutex);
@ -2372,13 +2384,6 @@ void Context::shutdown()
}
}
if (merge_mutate_executor)
merge_mutate_executor->wait();
if (fetch_executor)
fetch_executor->wait();
if (moves_executor)
moves_executor->wait();
shared->shutdown();
}
@ -2724,37 +2729,37 @@ PartUUIDsPtr Context::getIgnoredPartUUIDs() const
void Context::initializeBackgroundExecutors()
{
merge_mutate_executor = MergeTreeBackgroundExecutor::create();
moves_executor = MergeTreeBackgroundExecutor::create();
fetch_executor = MergeTreeBackgroundExecutor::create();
shared->merge_mutate_executor = MergeTreeBackgroundExecutor::create(MergeTreeBackgroundExecutor::Type::MERGE_MUTATE);
shared->moves_executor = MergeTreeBackgroundExecutor::create(MergeTreeBackgroundExecutor::Type::MOVE);
shared->fetch_executor = MergeTreeBackgroundExecutor::create(MergeTreeBackgroundExecutor::Type::FETCH);
merge_mutate_executor->setThreadsCount([this] () { return getSettingsRef().background_pool_size; });
merge_mutate_executor->setTasksCount([this] () { return getSettingsRef().background_pool_size; });
merge_mutate_executor->setMetric(CurrentMetrics::BackgroundPoolTask);
shared->merge_mutate_executor->setThreadsCount([this] () { return getSettingsRef().background_pool_size; });
shared->merge_mutate_executor->setTasksCount([this] () { return getSettingsRef().background_pool_size; });
shared->merge_mutate_executor->setMetric(CurrentMetrics::BackgroundPoolTask);
moves_executor->setThreadsCount([this] () { return getSettingsRef().background_move_pool_size; });
moves_executor->setTasksCount([this] () { return getSettingsRef().background_move_pool_size; });
moves_executor->setMetric(CurrentMetrics::BackgroundMovePoolTask);
shared->moves_executor->setThreadsCount([this] () { return getSettingsRef().background_move_pool_size; });
shared->moves_executor->setTasksCount([this] () { return getSettingsRef().background_move_pool_size; });
shared->moves_executor->setMetric(CurrentMetrics::BackgroundMovePoolTask);
fetch_executor->setThreadsCount([this] () { return getSettingsRef().background_fetches_pool_size; });
fetch_executor->setTasksCount([this] () { return getSettingsRef().background_fetches_pool_size; });
fetch_executor->setMetric(CurrentMetrics::BackgroundFetchesPoolTask);
shared->fetch_executor->setThreadsCount([this] () { return getSettingsRef().background_fetches_pool_size; });
shared->fetch_executor->setTasksCount([this] () { return getSettingsRef().background_fetches_pool_size; });
shared->fetch_executor->setMetric(CurrentMetrics::BackgroundFetchesPoolTask);
}
MergeTreeBackgroundExecutorPtr Context::getMergeMutateExecutor() const
{
return merge_mutate_executor;
return shared->merge_mutate_executor;
}
MergeTreeBackgroundExecutorPtr Context::getMovesExecutor() const
{
return moves_executor;
return shared->moves_executor;
}
MergeTreeBackgroundExecutorPtr Context::getFetchesExecutor() const
{
return fetch_executor;
return shared->fetch_executor;
}

View File

@ -281,12 +281,6 @@ private:
/// A flag, used to distinguish between user query and internal query to a database engine (MaterializePostgreSQL).
bool is_internal_query = false;
/// Background executors for *MergeTree tables
/// Must be in global context
MergeTreeBackgroundExecutorPtr merge_mutate_executor;
MergeTreeBackgroundExecutorPtr moves_executor;
MergeTreeBackgroundExecutorPtr fetch_executor;
public:
// Top-level OpenTelemetry trace context for the query. Makes sense only for a query context.
OpenTelemetryTraceContext query_trace_context;

View File

@ -105,12 +105,11 @@ void BackgroundJobAssignee::finish()
{
holder->deactivate();
auto context = getContext();
auto storage_id = data.getStorageID();
context->getMovesExecutor()->removeTasksCorrespondingToStorage(storage_id);
context->getFetchesExecutor()->removeTasksCorrespondingToStorage(storage_id);
context->getMergeMutateExecutor()->removeTasksCorrespondingToStorage(storage_id);
getContext()->getMovesExecutor()->removeTasksCorrespondingToStorage(storage_id);
getContext()->getFetchesExecutor()->removeTasksCorrespondingToStorage(storage_id);
getContext()->getMergeMutateExecutor()->removeTasksCorrespondingToStorage(storage_id);
}
}

View File

@ -7,6 +7,20 @@ namespace DB
{
String MergeTreeBackgroundExecutor::toString(Type type)
{
switch (type)
{
case Type::MERGE_MUTATE:
return "MergeMutate";
case Type::FETCH:
return "Fetch";
case Type::MOVE:
return "Move";
}
}
void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id)
{
std::lock_guard remove_lock(remove_mutex);
@ -21,13 +35,13 @@ void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id
std::erase_if(pending, [&] (auto item) -> bool { return item->task->getStorageID() == id; });
/// Find pending to wait
for (auto & item : active)
for (const auto & item : active)
if (item->task->getStorageID() == id)
tasks_to_wait.emplace_back(item);
}
for (auto & item : tasks_to_wait)
for (const auto & item : tasks_to_wait)
{
assert(item->future.valid());
item->future.wait();
@ -62,6 +76,8 @@ void MergeTreeBackgroundExecutor::schedulerThreadFunction()
bool res = pool.trySchedule([this, item] ()
{
setThreadName(name.c_str());
auto check_if_deleting = [&] () -> bool
{
active.erase(item);

View File

@ -8,6 +8,7 @@
#include <common/shared_ptr_helper.h>
#include <Common/ThreadPool.h>
#include <Common/ArenaAllocator.h>
#include <Storages/MergeTree/ExecutableTask.h>
#include <Storages/MergeTree/MergeTreeData.h>
@ -53,9 +54,16 @@ public:
using CountGetter = std::function<size_t()>;
using Callback = std::function<void()>;
MergeTreeBackgroundExecutor()
enum class Type
{
MERGE_MUTATE,
FETCH,
MOVE
};
explicit MergeTreeBackgroundExecutor(Type type_) : type(type_)
{
name = toString(type);
scheduler = ThreadFromGlobalPool([this]() { schedulerThreadFunction(); });
}
@ -135,7 +143,10 @@ private:
void schedulerThreadFunction();
static String toString(Type type);
Type type;
String name;
CountGetter threads_count_getter;
CountGetter max_task_count_getter;
CurrentMetrics::Metric metric;

View File

@ -54,7 +54,7 @@ private:
TEST(Executor, RemoveTasks)
{
auto executor = DB::MergeTreeBackgroundExecutor::create();
auto executor = DB::MergeTreeBackgroundExecutor::create(DB::MergeTreeBackgroundExecutor::Type::MERGE_MUTATE);
const size_t tasks_kinds = 25;
const size_t batch = 100;
@ -93,7 +93,7 @@ TEST(Executor, RemoveTasks)
TEST(Executor, RemoveTasksStress)
{
auto executor = DB::MergeTreeBackgroundExecutor::create();
auto executor = DB::MergeTreeBackgroundExecutor::create(DB::MergeTreeBackgroundExecutor::Type::MERGE_MUTATE);
const size_t tasks_kinds = 25;
const size_t batch = 100;