Mirror of https://github.com/ClickHouse/ClickHouse.git (synced 2024-11-10 01:25:21 +00:00)

Merge pull request #28374 from nikitamikhaylov/global-merge-executor
Introduced global executor for background MergeTree-related operations

This commit is contained in: commit 4db5062d6b
contrib/boost (vendored submodule)
@@ -1 +1 @@
-Subproject commit 9cf09dbfd55a5c6202dedbdf40781a51b02c2675
+Subproject commit 66d17f060c4867aeea99fa2a20cfdae89ae2a2ec
@@ -16,7 +16,7 @@ if (NOT USE_INTERNAL_BOOST_LIBRARY)
        graph
    )

-   if(Boost_INCLUDE_DIR AND Boost_FILESYSTEM_LIBRARY AND Boost_FILESYSTEM_LIBRARY AND
+   if(Boost_INCLUDE_DIR AND Boost_FILESYSTEM_LIBRARY AND
       Boost_PROGRAM_OPTIONS_LIBRARY AND Boost_REGEX_LIBRARY AND Boost_SYSTEM_LIBRARY AND Boost_CONTEXT_LIBRARY AND
       Boost_COROUTINE_LIBRARY AND Boost_GRAPH_LIBRARY)

@@ -238,4 +238,14 @@ if (NOT EXTERNAL_BOOST_FOUND)
    target_include_directories (_boost_graph PRIVATE ${LIBRARY_DIR})
    target_link_libraries(_boost_graph PRIVATE _boost_regex)

+   # circular buffer
+   add_library(_boost_circular_buffer INTERFACE)
+   add_library(boost::circular_buffer ALIAS _boost_circular_buffer)
+   target_include_directories(_boost_circular_buffer SYSTEM BEFORE INTERFACE ${LIBRARY_DIR})
+
+   # heap
+   add_library(_boost_heap INTERFACE)
+   add_library(boost::heap ALIAS _boost_heap)
+   target_include_directories(_boost_heap SYSTEM BEFORE INTERFACE ${LIBRARY_DIR})
+
 endif ()
@@ -17,6 +17,7 @@ RUN apt-get update \
            devscripts \
            libc++-dev \
            libc++abi-dev \
+           libboost-all-dev \
            libboost-program-options-dev \
            libboost-system-dev \
            libboost-filesystem-dev \
@@ -246,6 +246,8 @@ try
    /// Sets external authenticators config (LDAP, Kerberos).
    global_context->setExternalAuthenticatorsConfig(config());

+   global_context->initializeBackgroundExecutors();
+
    setupUsers();

    /// Limit on total number of concurrently executing queries.
@@ -547,6 +547,8 @@ if (ThreadFuzzer::instance().isEffective())
    // ignore `max_thread_pool_size` in configs we fetch from ZK, but oh well.
    GlobalThreadPool::initialize(config().getUInt("max_thread_pool_size", 10000));

+   global_context->initializeBackgroundExecutors();
+
    ConnectionCollector::init(global_context, config().getUInt("max_threads_for_connection_collector", 10));

    bool has_zookeeper = config().has("zookeeper");
@@ -349,6 +349,14 @@ dbms_target_link_libraries (
        clickhouse_common_io
    )

+   if (NOT_UNBUNDLED)
+       dbms_target_link_libraries (
+           PUBLIC
+               boost::circular_buffer
+               boost::heap
+       )
+   endif()
+
    target_include_directories(clickhouse_common_io PUBLIC "${CMAKE_CURRENT_BINARY_DIR}/Core/include") # uses some includes from core
    dbms_target_include_directories(PUBLIC "${CMAKE_CURRENT_BINARY_DIR}/Core/include")
@@ -31,8 +31,8 @@ public:
    /// probably it worth to try to increase stack size for coroutines.
    ///
    /// Current value is just enough for all tests in our CI. It's not selected in some special
-   /// way. We will have 36 pages with 4KB page size.
-   static constexpr size_t default_stack_size = 144 * 1024; /// 64KB was not enough for tests
+   /// way. We will have 40 pages with 4KB page size.
+   static constexpr size_t default_stack_size = 192 * 1024; /// 64KB was not enough for tests

    explicit FiberStack(size_t stack_size_ = default_stack_size) : stack_size(stack_size_)
    {
@@ -62,8 +62,8 @@ private:
    void logMemoryUsage(Int64 current) const;

public:
-   MemoryTracker(VariableContext level_ = VariableContext::Thread);
-   MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread);
+   explicit MemoryTracker(VariableContext level_ = VariableContext::Thread);
+   explicit MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread);

    ~MemoryTracker();
@@ -74,6 +74,8 @@ void ThreadPoolImpl<Thread>::setQueueSize(size_t value)
{
    std::lock_guard lock(mutex);
    queue_size = value;
+   /// Reserve memory to get rid of allocations
+   jobs.reserve(queue_size);
}
@@ -247,7 +249,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it)

        if (!jobs.empty())
        {
-           /// std::priority_queue does not provide interface for getting non-const reference to an element
+           /// boost::priority_queue does not provide interface for getting non-const reference to an element
            /// to prevent us from modifying its priority. We have to use const_cast to force move semantics on JobWithPriority::job.
            job = std::move(const_cast<Job &>(jobs.top().job));
            jobs.pop();
@@ -257,6 +259,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it)
            /// shutdown is true, simply finish the thread.
            return;
        }
+
    }

    if (!need_shutdown)
@@ -9,6 +9,8 @@
#include <list>
#include <optional>

+#include <boost/heap/priority_queue.hpp>
+
#include <Poco/Event.h>
#include <Common/ThreadStatus.h>
#include <common/scope_guard.h>
@@ -103,11 +105,10 @@ private:
        }
    };

-   std::priority_queue<JobWithPriority> jobs;
+   boost::heap::priority_queue<JobWithPriority> jobs;
    std::list<Thread> threads;
    std::exception_ptr first_exception;

    template <typename ReturnType>
    ReturnType scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds);
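Side note on the container swap: std::priority_queue exposes no reserve(), while boost::heap::priority_queue is a thin adaptor over a vector and does, which is what makes the new jobs.reserve(queue_size) call in setQueueSize possible. A self-contained sketch of the pattern (the JobWithPriority shape here is a simplification, not the real ThreadPool type):

#include <boost/heap/priority_queue.hpp>
#include <functional>
#include <iostream>

struct JobWithPriority
{
    std::function<void()> job;
    int priority = 0;
    bool operator<(const JobWithPriority & rhs) const { return priority < rhs.priority; }
};

int main()
{
    boost::heap::priority_queue<JobWithPriority> jobs;
    jobs.reserve(64); /// pre-allocate storage: std::priority_queue has no equivalent

    jobs.push({[] { std::cout << "low\n"; }, 1});
    jobs.push({[] { std::cout << "high\n"; }, 2});

    while (!jobs.empty())
    {
        /// top() is const; const_cast forces move semantics, exactly as in the patch above
        auto job = std::move(const_cast<std::function<void()> &>(jobs.top().job));
        jobs.pop();
        job(); /// prints "high" then "low"
    }
}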
@@ -78,7 +78,7 @@
#include <Common/RemoteHostFilter.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
-#include <Storages/MergeTree/BackgroundJobsExecutor.h>
+#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Interpreters/SynonymsExtensions.h>
#include <Interpreters/Lemmatizers.h>
@@ -101,6 +101,13 @@ namespace CurrentMetrics
    extern const Metric BackgroundBufferFlushSchedulePoolTask;
    extern const Metric BackgroundDistributedSchedulePoolTask;
    extern const Metric BackgroundMessageBrokerSchedulePoolTask;
+
+
+   extern const Metric DelayedInserts;
+   extern const Metric BackgroundPoolTask;
+   extern const Metric BackgroundMovePoolTask;
+   extern const Metric BackgroundFetchesPoolTask;
+
}

namespace DB
@@ -223,6 +230,11 @@ struct ContextSharedPart
    std::optional<StorageS3Settings> storage_s3_settings; /// Settings of S3 storage
    std::vector<String> warnings; /// Store warning messages about server configuration.

+   /// Background executors for *MergeTree tables
+   MergeTreeBackgroundExecutorPtr merge_mutate_executor;
+   MergeTreeBackgroundExecutorPtr moves_executor;
+   MergeTreeBackgroundExecutorPtr fetch_executor;
+
    RemoteHostFilter remote_host_filter; /// Allowed URL from config.xml

    std::optional<TraceCollector> trace_collector; /// Thread collecting traces from threads executing queries
@@ -292,6 +304,13 @@ struct ContextSharedPart

        DatabaseCatalog::shutdown();

+       if (merge_mutate_executor)
+           merge_mutate_executor->wait();
+       if (fetch_executor)
+           fetch_executor->wait();
+       if (moves_executor)
+           moves_executor->wait();
+
        std::unique_ptr<SystemLogs> delete_system_logs;
        {
            auto lock = std::lock_guard(mutex);
@@ -2718,6 +2737,53 @@ PartUUIDsPtr Context::getIgnoredPartUUIDs() const
}


+void Context::initializeBackgroundExecutors()
+{
+   // Initialize background executors with callbacks to be able to change pool size and tasks count at runtime.
+
+   shared->merge_mutate_executor = MergeTreeBackgroundExecutor::create
+   (
+       MergeTreeBackgroundExecutor::Type::MERGE_MUTATE,
+       getSettingsRef().background_pool_size,
+       getSettingsRef().background_pool_size,
+       CurrentMetrics::BackgroundPoolTask
+   );
+
+   shared->moves_executor = MergeTreeBackgroundExecutor::create
+   (
+       MergeTreeBackgroundExecutor::Type::MOVE,
+       getSettingsRef().background_move_pool_size,
+       getSettingsRef().background_move_pool_size,
+       CurrentMetrics::BackgroundMovePoolTask
+   );
+
+   shared->fetch_executor = MergeTreeBackgroundExecutor::create
+   (
+       MergeTreeBackgroundExecutor::Type::FETCH,
+       getSettingsRef().background_fetches_pool_size,
+       getSettingsRef().background_fetches_pool_size,
+       CurrentMetrics::BackgroundFetchesPoolTask
+   );
+}
+
+
+MergeTreeBackgroundExecutorPtr Context::getMergeMutateExecutor() const
+{
+   return shared->merge_mutate_executor;
+}
+
+MergeTreeBackgroundExecutorPtr Context::getMovesExecutor() const
+{
+   return shared->moves_executor;
+}
+
+MergeTreeBackgroundExecutorPtr Context::getFetchesExecutor() const
+{
+   return shared->fetch_executor;
+}
+
+
ReadSettings Context::getReadSettings() const
{
    ReadSettings res;
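MergeTreeBackgroundExecutor::create(...) is not declared on the class itself: it comes from the shared_ptr_helper CRTP base shown later in this diff. A simplified sketch of how such a helper is commonly implemented (the real one lives in common/shared_ptr_helper.h; this is an approximation, not that exact code):

#include <memory>
#include <utility>

template <typename T>
struct shared_ptr_helper
{
    /// Forwards any constructor arguments and wraps the result in a shared_ptr.
    template <typename... Args>
    static std::shared_ptr<T> create(Args &&... args)
    {
        return std::shared_ptr<T>(new T(std::forward<Args>(args)...));
    }
};

/// Usage mirrors the diff: Executor::create(type, threads, max_tasks, metric).
struct Executor : shared_ptr_helper<Executor>
{
    int threads;
    explicit Executor(int threads_) : threads(threads_) {}
};

int main()
{
    auto executor = Executor::create(16);
    return executor->threads == 16 ? 0 : 1;
}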
@@ -101,6 +101,8 @@ using StoragePolicyPtr = std::shared_ptr<const IStoragePolicy>;
using StoragePoliciesMap = std::map<String, StoragePolicyPtr>;
class StoragePolicySelector;
using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;
+class MergeTreeBackgroundExecutor;
+using MergeTreeBackgroundExecutorPtr = std::shared_ptr<MergeTreeBackgroundExecutor>;
struct PartUUIDs;
using PartUUIDsPtr = std::shared_ptr<PartUUIDs>;
class KeeperDispatcher;
@@ -830,6 +832,13 @@ public:
    ReadTaskCallback getReadTaskCallback() const;
    void setReadTaskCallback(ReadTaskCallback && callback);

+   /// Background executors related methods
+   void initializeBackgroundExecutors();
+
+   MergeTreeBackgroundExecutorPtr getMergeMutateExecutor() const;
+   MergeTreeBackgroundExecutorPtr getMovesExecutor() const;
+   MergeTreeBackgroundExecutorPtr getFetchesExecutor() const;
+
    /** Get settings for reading from filesystem. */
    ReadSettings getReadSettings() const;
src/Storages/MergeTree/BackgroundJobsAssignee.cpp (new file, 144 lines)
@@ -0,0 +1,144 @@
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/CurrentMetrics.h>
#include <Common/randomSeed.h>
#include <pcg_random.hpp>
#include <random>

namespace DB
{

BackgroundJobsAssignee::BackgroundJobsAssignee(MergeTreeData & data_, BackgroundJobsAssignee::Type type_, ContextPtr global_context_)
    : WithContext(global_context_)
    , data(data_)
    , sleep_settings(global_context_->getBackgroundMoveTaskSchedulingSettings())
    , rng(randomSeed())
    , type(type_)
{
}

void BackgroundJobsAssignee::trigger()
{
    std::lock_guard lock(holder_mutex);

    if (!holder)
        return;

    no_work_done_count = 0;
    /// We have background jobs, schedule task as soon as possible
    holder->schedule();
}

void BackgroundJobsAssignee::postpone()
{
    std::lock_guard lock(holder_mutex);

    if (!holder)
        return;

    auto no_work_done_times = no_work_done_count.fetch_add(1, std::memory_order_relaxed);
    double random_addition = std::uniform_real_distribution<double>(0, sleep_settings.task_sleep_seconds_when_no_work_random_part)(rng);

    size_t next_time_to_execute = 1000 * (std::min(
            sleep_settings.task_sleep_seconds_when_no_work_max,
            sleep_settings.thread_sleep_seconds_if_nothing_to_do * std::pow(sleep_settings.task_sleep_seconds_when_no_work_multiplier, no_work_done_times))
        + random_addition);

    holder->scheduleAfter(next_time_to_execute, false);
}


void BackgroundJobsAssignee::scheduleMergeMutateTask(ExecutableTaskPtr merge_task)
{
    bool res = getContext()->getMergeMutateExecutor()->trySchedule(merge_task);
    res ? trigger() : postpone();
}


void BackgroundJobsAssignee::scheduleFetchTask(ExecutableTaskPtr fetch_task)
{
    bool res = getContext()->getFetchesExecutor()->trySchedule(fetch_task);
    res ? trigger() : postpone();
}


void BackgroundJobsAssignee::scheduleMoveTask(ExecutableTaskPtr move_task)
{
    bool res = getContext()->getMovesExecutor()->trySchedule(move_task);
    res ? trigger() : postpone();
}


String BackgroundJobsAssignee::toString(Type type)
{
    switch (type)
    {
        case Type::DataProcessing:
            return "DataProcessing";
        case Type::Moving:
            return "Moving";
    }
    __builtin_unreachable();
}

void BackgroundJobsAssignee::start()
{
    std::lock_guard lock(holder_mutex);
    if (!holder)
        holder = getContext()->getSchedulePool().createTask("BackgroundJobsAssignee:" + toString(type), [this]{ threadFunc(); });

    holder->activateAndSchedule();
}

void BackgroundJobsAssignee::finish()
{
    /// No lock here, because scheduled tasks could call trigger method
    if (holder)
    {
        holder->deactivate();

        auto storage_id = data.getStorageID();

        getContext()->getMovesExecutor()->removeTasksCorrespondingToStorage(storage_id);
        getContext()->getFetchesExecutor()->removeTasksCorrespondingToStorage(storage_id);
        getContext()->getMergeMutateExecutor()->removeTasksCorrespondingToStorage(storage_id);
    }
}


void BackgroundJobsAssignee::threadFunc()
try
{
    bool succeed = false;
    switch (type)
    {
        case Type::DataProcessing:
            succeed = data.scheduleDataProcessingJob(*this);
            break;
        case Type::Moving:
            succeed = data.scheduleDataMovingJob(*this);
            break;
    }

    if (!succeed)
        postpone();
}
catch (...) /// Catch any exception to avoid thread termination.
{
    tryLogCurrentException(__PRETTY_FUNCTION__);
    postpone();
}

BackgroundJobsAssignee::~BackgroundJobsAssignee()
{
    try
    {
        finish();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}

}
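The postpone() above implements exponential backoff with jitter: sleep_ms = 1000 * (min(task_sleep_seconds_when_no_work_max, thread_sleep_seconds_if_nothing_to_do * multiplier^failures) + random_addition). A small standalone program working through the default settings:

#include <algorithm>
#include <cmath>
#include <cstdio>

int main()
{
    const double base = 0.1;       /// thread_sleep_seconds_if_nothing_to_do
    const double multiplier = 1.1; /// task_sleep_seconds_when_no_work_multiplier
    const double max_sleep = 600;  /// task_sleep_seconds_when_no_work_max

    for (size_t failures : {0, 10, 50, 100})
    {
        /// The random addition (uniform in [0, task_sleep_seconds_when_no_work_random_part)) is omitted here.
        size_t sleep_ms = 1000 * std::min(max_sleep, base * std::pow(multiplier, failures));
        std::printf("%3zu consecutive failures -> sleep %zu ms\n", failures, sleep_ms);
    }
    /// Prints roughly: 100 ms, 259 ms, 11739 ms, 600000 ms (the 600 s cap
    /// is reached after about 92 consecutive failures).
}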
src/Storages/MergeTree/BackgroundJobsAssignee.h (new file, 89 lines)
@@ -0,0 +1,89 @@
#pragma once

#include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
#include <Common/ThreadPool.h>
#include <Core/BackgroundSchedulePool.h>
#include <pcg_random.hpp>


namespace DB
{

/// Settings for background tasks scheduling. Each background assignee has one
/// BackgroundSchedulingPoolTask and depending on execution result may put this
/// task to sleep according to settings. Look at scheduleTask function for details.
struct BackgroundTaskSchedulingSettings
{
    double thread_sleep_seconds_random_part = 1.0;
    double thread_sleep_seconds_if_nothing_to_do = 0.1;
    double task_sleep_seconds_when_no_work_max = 600;
    /// For exponential backoff.
    double task_sleep_seconds_when_no_work_multiplier = 1.1;

    double task_sleep_seconds_when_no_work_random_part = 1.0;

    /// Deprecated settings, don't affect background execution
    double thread_sleep_seconds = 10;
    double task_sleep_seconds_when_no_work_min = 10;
};

class MergeTreeData;

class BackgroundJobsAssignee : public WithContext
{
private:
    MergeTreeData & data;

    /// Settings for execution control of background scheduling task
    BackgroundTaskSchedulingSettings sleep_settings;
    /// Useful for random backoff timeouts generation
    pcg64 rng;

    /// How many times execution of background job failed or we have
    /// no new jobs.
    std::atomic<size_t> no_work_done_count{0};

    /// Scheduling task which assigns jobs in background pool
    BackgroundSchedulePool::TaskHolder holder;
    /// Mutex for thread safety
    std::mutex holder_mutex;

public:
    /// In case of ReplicatedMergeTree the first assignee will be responsible for
    /// polling the replication queue and scheduling operations according to the LogEntry type,
    /// e.g. merges, mutations and fetches. The same holds for plain MergeTree except that there is no
    /// replication queue, so we will just scan parts and decide what to do.
    /// Moving operations are the same for all types of MergeTree and also have their own timetable.
    enum class Type
    {
        DataProcessing,
        Moving
    };
    Type type{Type::DataProcessing};

    void start();
    void trigger();
    void postpone();
    void finish();

    void scheduleMergeMutateTask(ExecutableTaskPtr merge_task);
    void scheduleFetchTask(ExecutableTaskPtr fetch_task);
    void scheduleMoveTask(ExecutableTaskPtr move_task);

    /// Just call finish
    virtual ~BackgroundJobsAssignee();

    BackgroundJobsAssignee(
        MergeTreeData & data_,
        Type type,
        ContextPtr global_context_);

private:
    static String toString(Type type);

    /// Function that executes in background scheduling pool
    void threadFunc();
};


}
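The pcg64 rng member above exists only to produce the random addition used by postpone(). Any uniform engine behaves the same way; a sketch with the standard library standing in for pcg64:

#include <cstdio>
#include <random>

int main()
{
    std::mt19937_64 rng(std::random_device{}()); /// pcg64(randomSeed()) in the real code

    /// task_sleep_seconds_when_no_work_random_part = 1.0 from the settings above
    std::uniform_real_distribution<double> jitter(0, 1.0);

    /// Each postponed reschedule gets an independent random addition, which
    /// prevents many idle tables from waking up in lockstep.
    for (int i = 0; i < 3; ++i)
        std::printf("random_addition = %.3f s\n", jitter(rng));
}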
@@ -1,289 +0,0 @@
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/CurrentMetrics.h>
#include <Common/randomSeed.h>
#include <pcg_random.hpp>
#include <random>

namespace CurrentMetrics
{
    extern const Metric BackgroundPoolTask;
    extern const Metric BackgroundMovePoolTask;
    extern const Metric BackgroundFetchesPoolTask;
}

namespace DB
{

IBackgroundJobExecutor::IBackgroundJobExecutor(
    ContextPtr global_context_,
    const BackgroundTaskSchedulingSettings & sleep_settings_,
    const std::vector<PoolConfig> & pools_configs_)
    : WithContext(global_context_)
    , sleep_settings(sleep_settings_)
    , rng(randomSeed())
{
    for (const auto & pool_config : pools_configs_)
    {
        const auto max_pool_size = pool_config.get_max_pool_size();
        pools.try_emplace(pool_config.pool_type, max_pool_size, 0, max_pool_size, false);
        pools_configs.emplace(pool_config.pool_type, pool_config);
    }
}

double IBackgroundJobExecutor::getSleepRandomAdd()
{
    std::lock_guard random_lock(random_mutex);
    return std::uniform_real_distribution<double>(0, sleep_settings.task_sleep_seconds_when_no_work_random_part)(rng);
}

void IBackgroundJobExecutor::runTaskWithoutDelay()
{
    no_work_done_count = 0;
    /// We have background jobs, schedule task as soon as possible
    scheduling_task->schedule();
}

void IBackgroundJobExecutor::scheduleTask(bool with_backoff)
{
    size_t next_time_to_execute;
    if (with_backoff)
    {
        auto no_work_done_times = no_work_done_count.fetch_add(1, std::memory_order_relaxed);

        next_time_to_execute = 1000 * (std::min(
                sleep_settings.task_sleep_seconds_when_no_work_max,
                sleep_settings.thread_sleep_seconds_if_nothing_to_do * std::pow(sleep_settings.task_sleep_seconds_when_no_work_multiplier, no_work_done_times))
            + getSleepRandomAdd());
    }
    else
    {
        no_work_done_count = 0;
        next_time_to_execute = 1000 * sleep_settings.thread_sleep_seconds_if_nothing_to_do;
    }

    scheduling_task->scheduleAfter(next_time_to_execute, false);
}

namespace
{

/// Tricky function: we have separate thread pool with max_threads in each background executor for each table
/// But we want total background threads to be less than max_threads value. So we use global atomic counter (BackgroundMetric)
/// to limit total number of background threads.
bool incrementMetricIfLessThanMax(std::atomic<Int64> & atomic_value, Int64 max_value)
{
    auto value = atomic_value.load(std::memory_order_relaxed);
    while (value < max_value)
    {
        if (atomic_value.compare_exchange_weak(value, value + 1, std::memory_order_release, std::memory_order_relaxed))
            return true;
    }
    return false;
}

}

/// This is a RAII class which only decrements metric.
/// It is added because after all other fixes a bug with non-executing merges occurred again.
/// Last hypothesis: task was successfully added to pool, however, was not executed because of internal exception in it.
class ParanoidMetricDecrementor
{
public:
    explicit ParanoidMetricDecrementor(CurrentMetrics::Metric metric_) : metric(metric_) {}
    void alarm() { is_alarmed = true; }
    void decrement()
    {
        if (is_alarmed.exchange(false))
        {
            CurrentMetrics::values[metric]--;
        }
    }

    ~ParanoidMetricDecrementor() { decrement(); }

private:

    CurrentMetrics::Metric metric;
    std::atomic_bool is_alarmed = false;
};

void IBackgroundJobExecutor::execute(JobAndPool job_and_pool)
try
{
    auto & pool_config = pools_configs[job_and_pool.pool_type];
    const auto max_pool_size = pool_config.get_max_pool_size();

    auto metric_decrementor = std::make_shared<ParanoidMetricDecrementor>(pool_config.tasks_metric);

    /// If corresponding pool is not full increment metric and assign new job
    if (incrementMetricIfLessThanMax(CurrentMetrics::values[pool_config.tasks_metric], max_pool_size))
    {
        metric_decrementor->alarm();
        try /// this try required because we have to manually decrement metric
        {
            /// Synchronize pool size, because config could be reloaded
            pools[job_and_pool.pool_type].setMaxThreads(max_pool_size);
            pools[job_and_pool.pool_type].setQueueSize(max_pool_size);

            pools[job_and_pool.pool_type].scheduleOrThrowOnError([this, metric_decrementor, job{std::move(job_and_pool.job)}] ()
            {
                try /// We don't want exceptions in background pool
                {
                    bool job_success = job();
                    /// Job done, decrement metric and reset no_work counter
                    metric_decrementor->decrement();

                    if (job_success)
                    {
                        /// Job done, new empty space in pool, schedule background task
                        runTaskWithoutDelay();
                    }
                    else
                    {
                        /// Job done, but failed, schedule with backoff
                        scheduleTask(/* with_backoff = */ true);
                    }
                }
                catch (...)
                {
                    metric_decrementor->decrement();
                    tryLogCurrentException(__PRETTY_FUNCTION__);
                    scheduleTask(/* with_backoff = */ true);
                }
            });
            /// We've scheduled task in the background pool and when it will finish we will be triggered again. But this task can be
            /// extremely long and we may have a lot of other small tasks to do, so we schedule ourselves here.
            runTaskWithoutDelay();
        }
        catch (...)
        {
            /// With our Pool settings scheduleOrThrowOnError shouldn't throw exceptions, but for safety catch added here
            metric_decrementor->decrement();
            tryLogCurrentException(__PRETTY_FUNCTION__);
            scheduleTask(/* with_backoff = */ true);
        }
    }
    else /// Pool is full and we have some work to do
    {
        scheduleTask(/* with_backoff = */ false);
    }
}
catch (...) /// Exception while we looking for a task, reschedule
{
    tryLogCurrentException(__PRETTY_FUNCTION__);

    /// Why do we scheduleTask again?
    /// To retry on exception, since it may be some temporary exception.
    scheduleTask(/* with_backoff = */ true);
}

void IBackgroundJobExecutor::start()
{
    std::lock_guard lock(scheduling_task_mutex);
    if (!scheduling_task)
    {
        scheduling_task = getContext()->getSchedulePool().createTask(
            getBackgroundTaskName(), [this]{ backgroundTaskFunction(); });
    }

    scheduling_task->activateAndSchedule();
}

void IBackgroundJobExecutor::finish()
{
    std::lock_guard lock(scheduling_task_mutex);
    if (scheduling_task)
    {
        scheduling_task->deactivate();
        for (auto & [pool_type, pool] : pools)
            pool.wait();
    }
}

void IBackgroundJobExecutor::triggerTask()
{
    std::lock_guard lock(scheduling_task_mutex);
    if (scheduling_task)
        runTaskWithoutDelay();
}

void IBackgroundJobExecutor::backgroundTaskFunction()
try
{
    if (!scheduleJob())
        scheduleTask(/* with_backoff = */ true);
}
catch (...) /// Catch any exception to avoid thread termination.
{
    tryLogCurrentException(__PRETTY_FUNCTION__);
    scheduleTask(/* with_backoff = */ true);
}

IBackgroundJobExecutor::~IBackgroundJobExecutor()
{
    finish();
}

BackgroundJobsExecutor::BackgroundJobsExecutor(
    MergeTreeData & data_,
    ContextPtr global_context_)
    : IBackgroundJobExecutor(
        global_context_,
        global_context_->getBackgroundProcessingTaskSchedulingSettings(),
        {PoolConfig
            {
                .pool_type = PoolType::MERGE_MUTATE,
                .get_max_pool_size = [global_context_] () { return global_context_->getSettingsRef().background_pool_size; },
                .tasks_metric = CurrentMetrics::BackgroundPoolTask
            },
        PoolConfig
            {
                .pool_type = PoolType::FETCH,
                .get_max_pool_size = [global_context_] () { return global_context_->getSettingsRef().background_fetches_pool_size; },
                .tasks_metric = CurrentMetrics::BackgroundFetchesPoolTask
            }
        })
    , data(data_)
{
}

String BackgroundJobsExecutor::getBackgroundTaskName() const
{
    return data.getStorageID().getFullTableName() + " (dataProcessingTask)";
}

bool BackgroundJobsExecutor::scheduleJob()
{
    return data.scheduleDataProcessingJob(*this);
}

BackgroundMovesExecutor::BackgroundMovesExecutor(
    MergeTreeData & data_,
    ContextPtr global_context_)
    : IBackgroundJobExecutor(
        global_context_,
        global_context_->getBackgroundMoveTaskSchedulingSettings(),
        {PoolConfig
            {
                .pool_type = PoolType::MOVE,
                .get_max_pool_size = [global_context_] () { return global_context_->getSettingsRef().background_move_pool_size; },
                .tasks_metric = CurrentMetrics::BackgroundMovePoolTask
            }
        })
    , data(data_)
{
}

String BackgroundMovesExecutor::getBackgroundTaskName() const
{
    return data.getStorageID().getFullTableName() + " (dataMovingTask)";
}

bool BackgroundMovesExecutor::scheduleJob()
{
    return data.scheduleDataMovingJob(*this);
}

}
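The deleted incrementMetricIfLessThanMax is a classic bounded-counter compare-and-swap loop (the new executor replaces it with an RAII CurrentMetrics::Increment per task plus a capacity check in trySchedule). The same CAS pattern in isolation:

#include <atomic>
#include <cassert>
#include <cstdint>

/// Atomically increment `value` only while it stays below `max_value`.
/// compare_exchange_weak reloads `current` on failure, so concurrent
/// increments are retried until we either win or hit the cap.
bool incrementIfLessThanMax(std::atomic<int64_t> & value, int64_t max_value)
{
    auto current = value.load(std::memory_order_relaxed);
    while (current < max_value)
    {
        if (value.compare_exchange_weak(current, current + 1, std::memory_order_release, std::memory_order_relaxed))
            return true;
    }
    return false;
}

int main()
{
    std::atomic<int64_t> tasks{9};
    assert(incrementIfLessThanMax(tasks, 10));  /// 9 -> 10
    assert(!incrementIfLessThanMax(tasks, 10)); /// already at the cap
}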
@@ -1,162 +0,0 @@
#pragma once

#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/ThreadPool.h>
#include <Core/BackgroundSchedulePool.h>
#include <pcg_random.hpp>


namespace DB
{

/// Settings for background tasks scheduling. Each background executor has one
/// BackgroundSchedulingPoolTask and depending on execution result may put this
/// task to sleep according to settings. Look at scheduleTask function for details.
struct BackgroundTaskSchedulingSettings
{
    double thread_sleep_seconds_random_part = 1.0;
    double thread_sleep_seconds_if_nothing_to_do = 0.1;
    double task_sleep_seconds_when_no_work_max = 600;
    /// For exponential backoff.
    double task_sleep_seconds_when_no_work_multiplier = 1.1;

    double task_sleep_seconds_when_no_work_random_part = 1.0;

    /// Deprecated settings, don't affect background execution
    double thread_sleep_seconds = 10;
    double task_sleep_seconds_when_no_work_min = 10;
};

/// Pool type where we must execute new job. Each background executor can have several
/// background pools. When it receives new job it will execute new task in corresponding pool.
enum class PoolType
{
    MERGE_MUTATE,
    MOVE,
    FETCH,
};

using BackgroundJobFunc = std::function<bool()>;

/// Result from background job providers. Function which will be executed in pool and pool type.
struct JobAndPool
{
    BackgroundJobFunc job;
    PoolType pool_type;
};

/// Background jobs executor which executes heavy-weight background tasks for MergeTree tables, like
/// background merges, moves, mutations, fetches and so on.
/// Consists of two important parts:
/// 1) Task in background scheduling pool which receives new jobs from storages and puts them into the required pool.
/// 2) One or more ThreadPool objects, which execute background jobs.
class IBackgroundJobExecutor : protected WithContext
{
protected:
    /// Configuration for single background ThreadPool
    struct PoolConfig
    {
        /// This pool type
        PoolType pool_type;
        /// Max pool size in threads
        const std::function<size_t()> get_max_pool_size;
        /// Metric that we have to increment when we execute task in this pool
        CurrentMetrics::Metric tasks_metric;
    };

private:
    /// Name for task in background scheduling pool
    String task_name;
    /// Settings for execution control of background scheduling task
    BackgroundTaskSchedulingSettings sleep_settings;
    /// Useful for random backoff timeouts generation
    pcg64 rng;

    /// How many times execution of background job failed or we have
    /// no new jobs.
    std::atomic<size_t> no_work_done_count{0};

    /// Pools where we execute background jobs
    std::unordered_map<PoolType, ThreadPool> pools;
    /// Configs for background pools
    std::unordered_map<PoolType, PoolConfig> pools_configs;

    /// Scheduling task which assigns jobs in background pool
    BackgroundSchedulePool::TaskHolder scheduling_task;
    /// Mutex for thread safety
    std::mutex scheduling_task_mutex;
    /// Mutex for pcg random generator thread safety
    std::mutex random_mutex;

public:
    /// These three functions are thread safe

    /// Start background task and start to assign jobs
    void start();
    /// Schedule background task as soon as possible, even if it sleeps at this
    /// moment for some reason.
    void triggerTask();
    /// Finish execution: deactivate background task and wait for already scheduled jobs
    void finish();

    /// Executes job in a nested pool
    void execute(JobAndPool job_and_pool);

    /// Just call finish
    virtual ~IBackgroundJobExecutor();

protected:
    IBackgroundJobExecutor(
        ContextPtr global_context_,
        const BackgroundTaskSchedulingSettings & sleep_settings_,
        const std::vector<PoolConfig> & pools_configs_);

    /// Name for task in background schedule pool
    virtual String getBackgroundTaskName() const = 0;

    /// Schedules a job in a nested pool in this class.
    virtual bool scheduleJob() = 0;

private:
    /// Function that executes in background scheduling pool
    void backgroundTaskFunction();
    /// Recalculate timeouts when we have to check for a new job
    void scheduleTask(bool with_backoff);
    /// Run background task as fast as possible and reset errors counter
    void runTaskWithoutDelay();
    /// Return random add for sleep in case of error
    double getSleepRandomAdd();
};

/// Main jobs executor: merges, mutations, fetches and so on
class BackgroundJobsExecutor final : public IBackgroundJobExecutor
{
private:
    MergeTreeData & data;
public:
    BackgroundJobsExecutor(
        MergeTreeData & data_,
        ContextPtr global_context_);

protected:
    String getBackgroundTaskName() const override;
    bool scheduleJob() override;
};

/// Move jobs executor, moves parts between disks in the background
/// Does nothing in case of default configuration
class BackgroundMovesExecutor final : public IBackgroundJobExecutor
{
private:
    MergeTreeData & data;
public:
    BackgroundMovesExecutor(
        MergeTreeData & data_,
        ContextPtr global_context_);

protected:
    String getBackgroundTaskName() const override;
    bool scheduleJob() override;
};

}
src/Storages/MergeTree/IExecutableTask.h (new file, 70 lines)
@@ -0,0 +1,70 @@
#pragma once

#include <memory>
#include <functional>

#include <common/shared_ptr_helper.h>
#include <Interpreters/StorageID.h>

namespace DB
{

/**
 * Generic interface for background operations. Simply put, this is a self-made coroutine.
 * The main method is executeStep, which returns true
 * if the task wants to execute another 'step' in the near future and false otherwise.
 *
 * Each storage assigns some operations such as merges, mutations, fetches, etc.
 * We need to ask a storage or some other entity to try to assign another operation when the current operation is completed.
 *
 * Each task corresponds to a storage, that's why there is a method getStorageID.
 * This is needed to correctly shut down a storage, e.g. we need to wait for all background operations to complete.
 */
class IExecutableTask
{
public:
    virtual bool executeStep() = 0;
    virtual void onCompleted() = 0;
    virtual StorageID getStorageID() = 0;
    virtual ~IExecutableTask() = default;
};

using ExecutableTaskPtr = std::shared_ptr<IExecutableTask>;


/**
 * Some background operations won't be represented as coroutines (they don't want to be executed step-by-step).
 * This wrapper exists for them.
 */
class ExecutableLambdaAdapter : public shared_ptr_helper<ExecutableLambdaAdapter>, public IExecutableTask
{
public:

    template <typename Job, typename Callback>
    explicit ExecutableLambdaAdapter(
        Job && job_to_execute_,
        Callback && job_result_callback_,
        StorageID id_)
        : job_to_execute(job_to_execute_)
        , job_result_callback(job_result_callback_)
        , id(id_) {}

    bool executeStep() override
    {
        res = job_to_execute();
        job_to_execute = {};
        return false;
    }

    void onCompleted() override { job_result_callback(!res); }

    StorageID getStorageID() override { return id; }

private:
    bool res = false;
    std::function<bool()> job_to_execute;
    std::function<void(bool)> job_result_callback;
    StorageID id;
};


}
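ExecutableLambdaAdapter always returns false from executeStep, i.e. it is a one-step task. A genuinely multi-step task keeps returning true until it is done; a hypothetical sketch (CountingTask is illustrative only, not part of this PR):

#include <Storages/MergeTree/IExecutableTask.h>

/// Illustration: a task that needs `total_steps` bounded slices of work.
/// The executor calls executeStep() repeatedly, re-queueing the task between
/// calls, so one long operation never monopolizes a worker thread.
class CountingTask : public DB::IExecutableTask
{
public:
    CountingTask(DB::StorageID id_, size_t total_steps_)
        : id(std::move(id_)), total_steps(total_steps_) {}

    bool executeStep() override
    {
        ++done_steps;                    /// one bounded slice of work per call
        return done_steps < total_steps; /// true => please schedule me again
    }

    void onCompleted() override { /* notify the storage, update metrics, ... */ }

    DB::StorageID getStorageID() override { return id; }

private:
    DB::StorageID id;
    size_t total_steps;
    size_t done_steps = 0;
};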
src/Storages/MergeTree/MergeTreeBackgroundExecutor.cpp (new file, 185 lines)
@@ -0,0 +1,185 @@
#include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>

#include <algorithm>

#include <Common/setThreadName.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>


namespace DB
{

String MergeTreeBackgroundExecutor::toString(Type type)
{
    switch (type)
    {
        case Type::MERGE_MUTATE:
            return "MergeMutate";
        case Type::FETCH:
            return "Fetch";
        case Type::MOVE:
            return "Move";
    }
    __builtin_unreachable();
}


void MergeTreeBackgroundExecutor::wait()
{
    {
        std::lock_guard lock(mutex);
        shutdown = true;
        has_tasks.notify_all();
    }

    pool.wait();
}


bool MergeTreeBackgroundExecutor::trySchedule(ExecutableTaskPtr task)
{
    std::lock_guard lock(mutex);

    if (shutdown)
        return false;

    auto & value = CurrentMetrics::values[metric];
    if (value.load() >= static_cast<int64_t>(max_tasks_count))
        return false;

    pending.push_back(std::make_shared<TaskRuntimeData>(std::move(task), metric));

    has_tasks.notify_one();
    return true;
}


void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id)
{
    std::vector<TaskRuntimeDataPtr> tasks_to_wait;
    {
        std::lock_guard lock(mutex);

        /// Erase storage related tasks from pending and select active tasks to wait for
        auto it = std::remove_if(pending.begin(), pending.end(),
            [&] (auto item) -> bool { return item->task->getStorageID() == id; });
        pending.erase(it, pending.end());

        /// Copy items to wait for their completion
        std::copy_if(active.begin(), active.end(), std::back_inserter(tasks_to_wait),
            [&] (auto item) -> bool { return item->task->getStorageID() == id; });

        for (auto & item : tasks_to_wait)
            item->is_currently_deleting = true;
    }

    for (auto & item : tasks_to_wait)
        item->is_done.wait();
}


void MergeTreeBackgroundExecutor::routine(TaskRuntimeDataPtr item)
{
    DENY_ALLOCATIONS_IN_SCOPE;

    /// All operations with queues are assumed not to do any allocations

    auto erase_from_active = [this, item]
    {
        active.erase(std::remove(active.begin(), active.end(), item), active.end());
    };

    bool need_execute_again = false;

    try
    {
        ALLOW_ALLOCATIONS_IN_SCOPE;
        need_execute_again = item->task->executeStep();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }

    if (need_execute_again)
    {
        std::lock_guard guard(mutex);

        if (item->is_currently_deleting)
        {
            erase_from_active();
            return;
        }

        pending.push_back(item);
        erase_from_active();
        has_tasks.notify_one();
        return;
    }

    {
        std::lock_guard guard(mutex);
        erase_from_active();
        has_tasks.notify_one();
    }

    try
    {
        ALLOW_ALLOCATIONS_IN_SCOPE;
        /// In a situation of a lack of memory this method can throw an exception,
        /// because it may interact somehow with BackgroundSchedulePool, which may allocate memory.
        /// But it is rather safe, because we have a try...catch block here, and another one in ThreadPool.
        item->task->onCompleted();
        item->task.reset();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}


void MergeTreeBackgroundExecutor::threadFunction()
{
    setThreadName(name.c_str());

    DENY_ALLOCATIONS_IN_SCOPE;

    while (true)
    {
        try
        {
            TaskRuntimeDataPtr item;
            {
                std::unique_lock lock(mutex);
                has_tasks.wait(lock, [this](){ return !pending.empty() || shutdown; });

                if (shutdown)
                    break;

                item = std::move(pending.front());
                pending.pop_front();
                active.push_back(item);
            }

            routine(item);

            /// When a storage shuts down it will wait until all related background tasks
            /// are finished, because they may want to interact with its fields
            /// and this would cause a segfault.
            if (item->is_currently_deleting)
                item->is_done.set();
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
        }
    }
}

}
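removeTasksCorrespondingToStorage pairs an is_currently_deleting flag with a non-autoreset event: the remover marks active tasks under the mutex, then blocks on is_done outside it, and threadFunction fires the event once the marked task's step has finished. The same handshake with standard primitives (a simplified sketch; the real code uses Poco::Event):

#include <atomic>
#include <future>
#include <thread>

int main()
{
    std::atomic_bool is_currently_deleting{false};
    std::promise<void> is_done; /// plays the role of the non-autoreset Poco::Event

    is_currently_deleting = true; /// remover marks the active task for deletion

    std::thread worker([&]
    {
        /* ... finish the current step of the task ... */
        if (is_currently_deleting.load())
            is_done.set_value(); /// worker signals that the task is really finished
    });

    is_done.get_future().wait(); /// remover blocks until the worker confirms
    worker.join();
}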
src/Storages/MergeTree/MergeTreeBackgroundExecutor.h (new file, 156 lines)
@@ -0,0 +1,156 @@
#pragma once

#include <deque>
#include <functional>
#include <atomic>
#include <mutex>
#include <future>
#include <condition_variable>
#include <set>

#include <boost/circular_buffer.hpp>

#include <common/shared_ptr_helper.h>
#include <common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <Common/Stopwatch.h>
#include <Storages/MergeTree/IExecutableTask.h>


namespace DB
{

/**
 * Executor for background MergeTree related operations such as merges, mutations, fetches and so on.
 * It can execute only successors of the IExecutableTask interface, which is a self-written coroutine:
 * a task suspends when it returns true from its executeStep() method.
 *
 * There are two queues of tasks: pending (the main queue for all tasks) and active (currently executing).
 * The pending queue is needed since the number of tasks will be greater than the number of threads to execute them.
 * Pending tasks are tasks that were successfully scheduled to the executor or tasks that have some extra steps to execute.
 * There is an invariant that a task may occur only in one of these queues. It can occur in both queues only in critical sections.
 *
 *  Pending:                                   Active:
 *
 *  |s| |s| |s| |s| |s| |s| |s| |s| |s| |s|    |s|
 *  |s| |s| |s| |s| |s| |s| |s| |s| |s|        |s|
 *  |s| |s| |s| |s| |s| |s|                    |s|
 *  |s| |s| |s|                                |s|
 *  |s| |s|
 *  |s|
 *
 * Each task is simply a sequence of steps. Heavier tasks have longer sequences.
 * When a step of a task is executed, the task is moved back to the pending queue and another one is taken from the queue's head.
 * With this architecture all small merges / mutations will be executed faster than bigger ones.
 *
 * We use boost::circular_buffer as a container for the queues so as not to do any allocations.
 *
 * Another nuisance that we face is that background operations always interact with an associated Storage.
 * So, when a Storage wants to shut down, it must wait until all its background operations are finished.
 */
class MergeTreeBackgroundExecutor : public shared_ptr_helper<MergeTreeBackgroundExecutor>
{
public:

    enum class Type
    {
        MERGE_MUTATE,
        FETCH,
        MOVE
    };

    MergeTreeBackgroundExecutor(
        Type type_,
        size_t threads_count_,
        size_t max_tasks_count_,
        CurrentMetrics::Metric metric_)
        : type(type_)
        , threads_count(threads_count_)
        , max_tasks_count(max_tasks_count_)
        , metric(metric_)
    {
        name = toString(type);

        pending.set_capacity(max_tasks_count);
        active.set_capacity(max_tasks_count);

        pool.setMaxThreads(std::max(1UL, threads_count));
        pool.setMaxFreeThreads(std::max(1UL, threads_count));
        pool.setQueueSize(std::max(1UL, threads_count));

        for (size_t number = 0; number < threads_count; ++number)
            pool.scheduleOrThrowOnError([this] { threadFunction(); });
    }

    ~MergeTreeBackgroundExecutor()
    {
        wait();
    }

    bool trySchedule(ExecutableTaskPtr task);

    void removeTasksCorrespondingToStorage(StorageID id);

    void wait();

    size_t activeCount()
    {
        std::lock_guard lock(mutex);
        return active.size();
    }

    size_t pendingCount()
    {
        std::lock_guard lock(mutex);
        return pending.size();
    }

private:

    static String toString(Type type);

    Type type;
    String name;
    size_t threads_count{0};
    size_t max_tasks_count{0};
    CurrentMetrics::Metric metric;

    /**
     * Has an RAII member to track how many tasks are waiting for execution or executing at the moment.
     * Also has some flags and primitives to wait for the current task to be executed.
     */
    struct TaskRuntimeData
    {
        TaskRuntimeData(ExecutableTaskPtr && task_, CurrentMetrics::Metric metric_)
            : task(std::move(task_))
            , increment(std::move(metric_))
        {}

        ExecutableTaskPtr task;
        CurrentMetrics::Increment increment;
        std::atomic_bool is_currently_deleting{false};
        /// Actually autoreset=false is needed only for the unit test
        /// where multiple threads could remove tasks corresponding to the same storage.
        /// This scenario is not possible in reality.
        Poco::Event is_done{/*autoreset=*/false};
    };

    using TaskRuntimeDataPtr = std::shared_ptr<TaskRuntimeData>;

    void routine(TaskRuntimeDataPtr item);

    void threadFunction();

    /// Initially both queues are empty
    boost::circular_buffer<TaskRuntimeDataPtr> pending{0};
    boost::circular_buffer<TaskRuntimeDataPtr> active{0};

    std::mutex mutex;
    std::condition_variable has_tasks;

    std::atomic_bool shutdown{false};

    ThreadPool pool;
};

}
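Both queues are boost::circular_buffer instances whose capacity is fixed to max_tasks_count in the constructor, so pushing and popping task pointers never allocates: that is the invariant the DENY_ALLOCATIONS_IN_SCOPE guards in the .cpp file enforce. A minimal demonstration of that property:

#include <boost/circular_buffer.hpp>
#include <cassert>
#include <memory>

int main()
{
    /// All of the buffer's storage is allocated once here; subsequent
    /// push/pop operations reuse it (the elements themselves may allocate).
    boost::circular_buffer<std::shared_ptr<int>> pending(0);
    pending.set_capacity(4);

    for (int i = 0; i < 4; ++i)
        pending.push_back(std::make_shared<int>(i));
    assert(pending.full());

    /// Pop from the head, as MergeTreeBackgroundExecutor::threadFunction does.
    auto item = std::move(pending.front());
    pending.pop_front();
    assert(*item == 0 && pending.size() == 3);
}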
@@ -200,6 +200,8 @@ MergeTreeData::MergeTreeData(
    , data_parts_by_info(data_parts_indexes.get<TagByInfo>())
    , data_parts_by_state_and_info(data_parts_indexes.get<TagByStateAndInfo>())
    , parts_mover(this)
+   , background_operations_assignee(*this, BackgroundJobsAssignee::Type::DataProcessing, getContext())
+   , background_moves_assignee(*this, BackgroundJobsAssignee::Type::Moving, getContext())
{
    const auto settings = getSettings();
    allow_nullable_key = attach || settings->allow_nullable_key;
@@ -305,6 +307,22 @@ MergeTreeData::MergeTreeData(
    if (!canUsePolymorphicParts(*settings, &reason) && !reason.empty())
        LOG_WARNING(log, "{} Settings 'min_rows_for_wide_part', 'min_bytes_for_wide_part', "
            "'min_rows_for_compact_part' and 'min_bytes_for_compact_part' will be ignored.", reason);

+   common_assignee_trigger = [this] (bool delay) noexcept
+   {
+       if (delay)
+           background_operations_assignee.postpone();
+       else
+           background_operations_assignee.trigger();
+   };
+
+   moves_assignee_trigger = [this] (bool delay) noexcept
+   {
+       if (delay)
+           background_moves_assignee.postpone();
+       else
+           background_moves_assignee.trigger();
+   };
}

StoragePolicyPtr MergeTreeData::getStoragePolicy() const
@@ -5011,7 +5029,7 @@ MergeTreeData::CurrentlyMovingPartsTagger::~CurrentlyMovingPartsTagger()
    }
}

-bool MergeTreeData::scheduleDataMovingJob(IBackgroundJobExecutor & executor)
+bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & assignee)
{
    if (parts_mover.moves_blocker.isCancelled())
        return false;
@@ -5020,10 +5038,11 @@ bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & assignee)
    if (moving_tagger->parts_to_move.empty())
        return false;

-   executor.execute({[this, moving_tagger] () mutable
-   {
-       return moveParts(moving_tagger);
-   }, PoolType::MOVE});
+   assignee.scheduleMoveTask(ExecutableLambdaAdapter::create(
+       [this, moving_tagger] () mutable
+       {
+           return moveParts(moving_tagger);
+       }, moves_assignee_trigger, getStorageID()));
    return true;
}
@@ -3,6 +3,7 @@
#include <Common/SimpleIncrement.h>
#include <Common/MultiVersion.h>
#include <Storages/IStorage.h>
+#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
@@ -57,7 +58,6 @@ class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
using ManyExpressionActions = std::vector<ExpressionActionsPtr>;
class MergeTreeDeduplicationLog;
-class IBackgroundJobExecutor;

namespace ErrorCodes
{
@@ -827,9 +827,9 @@ public:
    PinnedPartUUIDsPtr getPinnedPartUUIDs() const;

    /// Schedules background job to like merge/mutate/fetch an executor
-   virtual bool scheduleDataProcessingJob(IBackgroundJobExecutor & executor) = 0;
+   virtual bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) = 0;
    /// Schedules job to move parts between disks/volumes and so on.
-   bool scheduleDataMovingJob(IBackgroundJobExecutor & executor);
+   bool scheduleDataMovingJob(BackgroundJobsAssignee & assignee);
    bool areBackgroundMovesNeeded() const;

    /// Lock part in zookeeper for shared data in several nodes
@@ -923,6 +923,23 @@ protected:

    MergeTreePartsMover parts_mover;

+   /// Executors are common for both ReplicatedMergeTree and plain MergeTree
+   /// but they are being started and finished in derived classes, so let them be protected.
+   ///
+   /// Why there are two executors, not one? Or an executor for each kind of operation?
+   /// It is historically formed.
+   /// Another explanation is that moving operations are common for Replicated and Plain MergeTree classes.
+   /// Task that schedules this operations is executed with its own timetable and triggered in a specific places in code.
+   /// And for ReplicatedMergeTree we don't have LogEntry type for this operation.
+   BackgroundJobsAssignee background_operations_assignee;
+   BackgroundJobsAssignee background_moves_assignee;
+
+   /// Strongly connected with two fields above.
+   /// Every task that is finished will ask to assign a new one into an executor.
+   /// These callbacks will be passed to the constructor of each task.
+   std::function<void(bool)> common_assignee_trigger;
+   std::function<void(bool)> moves_assignee_trigger;
+
    using DataPartIteratorByInfo = DataPartsIndexes::index<TagByInfo>::type::iterator;
    using DataPartIteratorByStateAndInfo = DataPartsIndexes::index<TagByStateAndInfo>::type::iterator;
@@ -37,7 +37,7 @@ void MergeTreeSink::consume(Chunk chunk)
        PartLog::addNewPart(storage.getContext(), part, watch.elapsed());

        /// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'.
-       storage.background_executor.triggerTask();
+       storage.background_operations_assignee.trigger();
    }
}
}
@@ -627,7 +627,7 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
        }
    }

-   storage.background_executor.triggerTask();
+   storage.background_operations_assignee.trigger();
}

return stat.version;
@@ -716,7 +716,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
    }

    if (some_active_mutations_were_killed)
-       storage.background_executor.triggerTask();
+       storage.background_operations_assignee.trigger();

    if (!entries_to_load.empty())
    {
@@ -850,7 +850,7 @@ ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation(
    }

    if (mutation_was_active)
-       storage.background_executor.triggerTask();
+       storage.background_operations_assignee.trigger();

    return entry;
}
@@ -204,7 +204,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
    storage.partial_shutdown_event.reset();

    /// Start queue processing
-   storage.background_executor.start();
+   storage.background_operations_assignee.start();

    storage.queue_updating_task->activateAndSchedule();
    storage.mutations_updating_task->activateAndSchedule();
@@ -389,7 +389,7 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
        auto fetch_lock = storage.fetcher.blocker.cancel();
        auto merge_lock = storage.merger_mutator.merges_blocker.cancel();
        auto move_lock = storage.parts_mover.moves_blocker.cancel();
-       storage.background_executor.finish();
+       storage.background_operations_assignee.finish();
    }

    LOG_TRACE(log, "Threads finished");
src/Storages/MergeTree/tests/gtest_executor.cpp (new file, 155 lines)
@@ -0,0 +1,155 @@
#include <gtest/gtest.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <barrier>
|
||||
#include <memory>
|
||||
#include <random>
|
||||
|
||||
#include <Storages/MergeTree/IExecutableTask.h>
|
||||
#include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
|
||||
|
||||
using namespace DB;
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric BackgroundPoolTask;
|
||||
}
|
||||
|
||||
std::random_device device;
|
||||
|
||||
class FakeExecutableTask : public IExecutableTask
|
||||
{
|
||||
public:
|
||||
explicit FakeExecutableTask(String name_) : generator(device()), distribution(0, 5), name(name_)
|
||||
{
|
||||
}
|
||||
|
||||
bool executeStep() override
|
||||
{
|
||||
auto sleep_time = distribution(generator);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(5 * sleep_time));
|
||||
|
||||
auto choice = distribution(generator);
|
||||
if (choice == 0)
|
||||
throw std::runtime_error("Unlucky...");
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
StorageID getStorageID() override
|
||||
{
|
||||
return {"test", name};
|
||||
}
|
||||
|
||||
void onCompleted() override
|
||||
{
|
||||
auto choice = distribution(generator);
|
||||
if (choice == 0)
|
||||
throw std::runtime_error("Unlucky...");
|
||||
}
|
||||
|
||||
private:
|
||||
std::mt19937 generator;
|
||||
std::uniform_int_distribution<> distribution;
|
||||
|
||||
String name;
|
||||
std::function<void()> on_completed;
|
||||
};
|
||||
|
||||
|
||||
TEST(Executor, RemoveTasks)
|
||||
{
|
||||
const size_t tasks_kinds = 25;
|
||||
const size_t batch = 100;
|
||||
|
||||
auto executor = DB::MergeTreeBackgroundExecutor::create
|
||||
(
|
||||
DB::MergeTreeBackgroundExecutor::Type::MERGE_MUTATE,
|
||||
tasks_kinds,
|
||||
tasks_kinds * batch,
|
||||
CurrentMetrics::BackgroundPoolTask
|
||||
);
|
||||
|
||||
for (size_t i = 0; i < batch; ++i)
|
||||
for (size_t j = 0; j < tasks_kinds; ++j)
|
||||
ASSERT_TRUE(
|
||||
executor->trySchedule(std::make_shared<FakeExecutableTask>(std::to_string(j)))
|
||||
);
|
||||
|
||||
std::vector<std::thread> threads(batch);
|
||||
|
||||
auto remover_routine = [&] ()
|
||||
{
|
||||
for (size_t j = 0; j < tasks_kinds; ++j)
|
||||
executor->removeTasksCorrespondingToStorage({"test", std::to_string(j)});
|
||||
};
|
||||
|
||||
for (auto & thread : threads)
|
||||
thread = std::thread(remover_routine);
|
||||
|
||||
for (auto & thread : threads)
|
||||
thread.join();
|
||||
|
||||
ASSERT_EQ(executor->activeCount(), 0);
|
||||
ASSERT_EQ(executor->pendingCount(), 0);
|
||||
ASSERT_EQ(CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask], 0);
|
||||
|
||||
executor->wait();
|
||||
}
|
||||
|
||||
|
||||
TEST(Executor, RemoveTasksStress)
|
||||
{
|
||||
const size_t tasks_kinds = 25;
|
||||
const size_t batch = 100;
|
||||
const size_t schedulers_count = 5;
|
||||
const size_t removers_count = 5;
|
||||
|
||||
auto executor = DB::MergeTreeBackgroundExecutor::create
|
||||
(
|
||||
DB::MergeTreeBackgroundExecutor::Type::MERGE_MUTATE,
|
||||
tasks_kinds,
|
||||
tasks_kinds * batch * (schedulers_count + removers_count),
|
||||
CurrentMetrics::BackgroundPoolTask
|
||||
);
|
||||
|
||||
std::barrier barrier(schedulers_count + removers_count);
|
||||
|
||||
auto scheduler_routine = [&] ()
|
||||
{
|
||||
barrier.arrive_and_wait();
|
||||
for (size_t i = 0; i < batch; ++i)
|
||||
for (size_t j = 0; j < tasks_kinds; ++j)
|
||||
executor->trySchedule(std::make_shared<FakeExecutableTask>(std::to_string(j)));
|
||||
};
|
||||
|
||||
auto remover_routine = [&] ()
|
||||
{
|
||||
barrier.arrive_and_wait();
|
||||
for (size_t j = 0; j < tasks_kinds; ++j)
|
||||
executor->removeTasksCorrespondingToStorage({"test", std::to_string(j)});
|
||||
};
|
||||
|
||||
std::vector<std::thread> schedulers(schedulers_count);
|
||||
for (auto & scheduler : schedulers)
|
||||
scheduler = std::thread(scheduler_routine);
|
||||
|
||||
std::vector<std::thread> removers(removers_count);
|
||||
for (auto & remover : removers)
|
||||
remover = std::thread(remover_routine);
|
||||
|
||||
for (auto & scheduler : schedulers)
|
||||
scheduler.join();
|
||||
|
||||
for (auto & remover : removers)
|
||||
remover.join();
|
||||
|
||||
for (size_t j = 0; j < tasks_kinds; ++j)
|
||||
executor->removeTasksCorrespondingToStorage({"test", std::to_string(j)});
|
||||
|
||||
ASSERT_EQ(executor->activeCount(), 0);
|
||||
ASSERT_EQ(executor->pendingCount(), 0);
|
||||
ASSERT_EQ(CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask], 0);
|
||||
|
||||
executor->wait();
|
||||
}
|
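The tests above drive the new global executor with tasks implementing the IExecutableTask interface (executeStep, onCompleted, getStorageID). For illustration, here is a minimal stand-in sketch of such a resumable task, assuming, as FakeExecutableTask's `return false` suggests, that executeStep() returning true means "schedule me for another step" and false means the task is complete; StepTask and StandInStorageID are hypothetical stand-ins, not ClickHouse types.

// Stand-in illustration of the resumable-task shape used by the tests above.
#include <cstddef>
#include <iostream>
#include <string>
#include <utility>

struct StandInStorageID          // stand-in for DB::StorageID
{
    std::string database;
    std::string table;
};

class StepTask
{
public:
    StepTask(std::string table_, std::size_t steps_) : table(std::move(table_)), steps_left(steps_) {}

    bool executeStep()           // one short slice of a long merge/mutation
    {
        std::cout << table << ": step done, " << steps_left - 1 << " left\n";
        return --steps_left > 0; // true -> the executor should run us again
    }

    void onCompleted() { std::cout << table << ": completed\n"; }

    StandInStorageID getStorageID() const { return {"test", table}; }

private:
    std::string table;
    std::size_t steps_left;
};

int main()
{
    StepTask task("t1", 3);
    while (task.executeStep())   // the real executor interleaves many tasks here
        ;
    task.onCompleted();
}

Splitting work into steps is what lets the executor interleave many storages fairly and remove a storage's pending tasks between steps, as both tests exercise.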
@@ -82,9 +82,6 @@ StorageMergeTree::StorageMergeTree(
    , reader(*this)
    , writer(*this)
    , merger_mutator(*this, getContext()->getSettingsRef().background_pool_size)
-    , background_executor(*this, getContext())
-    , background_moves_executor(*this, getContext())
{
    loadDataParts(has_force_restore_data_flag);

@@ -115,7 +112,7 @@ void StorageMergeTree::startup()
    try
    {
-        background_executor.start();
+        background_operations_assignee.start();
        startBackgroundMovesIfNeeded();
    }
    catch (...)

@@ -153,8 +150,8 @@ void StorageMergeTree::shutdown()
    merger_mutator.merges_blocker.cancelForever();
    parts_mover.moves_blocker.cancelForever();

-    background_executor.finish();
-    background_moves_executor.finish();
+    background_operations_assignee.finish();
+    background_moves_assignee.finish();

    try
    {

@@ -414,7 +411,7 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, String
        LOG_INFO(log, "Added mutation: {}", mutation_file_name);
    }
-    background_executor.triggerTask();
+    background_operations_assignee.trigger();
    return version;
}

@@ -640,7 +637,7 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
    }

    /// Maybe there is another mutation that was blocked by the killed one. Try to execute it immediately.
-    background_executor.triggerTask();
+    background_operations_assignee.trigger();

    return CancellationCode::CancelSent;
}

@@ -1044,7 +1041,7 @@ bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_sn
    return true;
}

-bool StorageMergeTree::scheduleDataProcessingJob(IBackgroundJobExecutor & executor) //-V657
+bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) //-V657
{
    if (shutdown_called)
        return false;

@@ -1076,46 +1073,50 @@ bool StorageMergeTree::scheduleDataProcessingJob(IBackgroundJobExecut
    if (merge_entry)
    {
-        executor.execute({[this, metadata_snapshot, merge_entry, share_lock] () mutable
-        {
-            return mergeSelectedParts(metadata_snapshot, false, {}, *merge_entry, share_lock);
-        }, PoolType::MERGE_MUTATE});
+        assignee.scheduleMergeMutateTask(ExecutableLambdaAdapter::create(
+            [this, metadata_snapshot, merge_entry, share_lock] () mutable
+            {
+                return mergeSelectedParts(metadata_snapshot, false, {}, *merge_entry, share_lock);
+            }, common_assignee_trigger, getStorageID()));
        return true;
    }
    if (mutate_entry)
    {
-        executor.execute({[this, metadata_snapshot, merge_entry, mutate_entry, share_lock] () mutable
-        {
+        assignee.scheduleMergeMutateTask(ExecutableLambdaAdapter::create(
+            [this, metadata_snapshot, merge_entry, mutate_entry, share_lock] () mutable
+            {
            return mutateSelectedPart(metadata_snapshot, *mutate_entry, share_lock);
-        }, PoolType::MERGE_MUTATE});
+            }, common_assignee_trigger, getStorageID()));
        return true;
    }
-    bool executed = false;
+    bool scheduled = false;
    if (time_after_previous_cleanup_temporary_directories.compareAndRestartDeferred(getContext()->getSettingsRef().merge_tree_clear_old_temporary_directories_interval_seconds))
    {
-        executor.execute({[this, share_lock] ()
-        {
-            clearOldTemporaryDirectories(getSettings()->temporary_directories_lifetime.totalSeconds());
-            return true;
-        }, PoolType::MERGE_MUTATE});
-        executed = true;
+        assignee.scheduleMergeMutateTask(ExecutableLambdaAdapter::create(
+            [this, share_lock] ()
+            {
+                clearOldTemporaryDirectories(getSettings()->temporary_directories_lifetime.totalSeconds());
+                return true;
+            }, common_assignee_trigger, getStorageID()));
+        scheduled = true;
    }
    if (time_after_previous_cleanup_parts.compareAndRestartDeferred(getContext()->getSettingsRef().merge_tree_clear_old_parts_interval_seconds))
    {
-        executor.execute({[this, share_lock] ()
-        {
-            /// All use relative_data_path which changes during rename
-            /// so execute under share lock.
-            clearOldPartsFromFilesystem();
-            clearOldWriteAheadLogs();
-            clearOldMutations();
-            clearEmptyParts();
-            return true;
-        }, PoolType::MERGE_MUTATE});
-        executed = true;
+        assignee.scheduleMergeMutateTask(ExecutableLambdaAdapter::create(
+            [this, share_lock] ()
+            {
+                /// All use relative_data_path which changes during rename
+                /// so execute under share lock.
+                clearOldPartsFromFilesystem();
+                clearOldWriteAheadLogs();
+                clearOldMutations();
+                clearEmptyParts();
+                return true;
+            }, common_assignee_trigger, getStorageID()));
+        scheduled = true;
    }

-    return executed;
+    return scheduled;
}

Int64 StorageMergeTree::getCurrentMutationVersion(
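The hunk above converts the old fire-and-forget lambdas into executor tasks via ExecutableLambdaAdapter::create(job, common_assignee_trigger, getStorageID()). Below is a self-contained sketch of that adaptation; LambdaAdapter is a hypothetical stand-in, and the exact point at which the real adapter invokes the trigger callback may differ.

// Sketch of lambda-to-task adaptation: a bool() job becomes a one-step task,
// and a trigger callback (mirroring common_assignee_trigger) is fired with the
// job's result so the assignee can decide whether to look for more work.
#include <functional>
#include <iostream>

class LambdaAdapter
{
public:
    LambdaAdapter(std::function<bool()> job_, std::function<void(bool)> trigger_)
        : job(std::move(job_)), trigger(std::move(trigger_)) {}

    bool executeStep()
    {
        bool did_something = job();
        trigger(did_something);   // let the assignee reschedule if needed
        return false;             // a lambda job is always a single step
    }

private:
    std::function<bool()> job;
    std::function<void(bool)> trigger;
};

int main()
{
    LambdaAdapter cleanup(
        [] { std::cout << "clearing old parts\n"; return true; },
        [] (bool did_something) { std::cout << "trigger(" << did_something << ")\n"; });
    cleanup.executeStep();
}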
@@ -1566,9 +1567,9 @@ ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type)
void StorageMergeTree::onActionLockRemove(StorageActionBlockType action_type)
{
    if (action_type == ActionLocks::PartsMerge || action_type == ActionLocks::PartsTTLMerge)
-        background_executor.triggerTask();
+        background_operations_assignee.trigger();
    else if (action_type == ActionLocks::PartsMove)
-        background_moves_executor.triggerTask();
+        background_moves_assignee.trigger();
}

CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_context)

@@ -1646,7 +1647,7 @@ MutationCommands StorageMergeTree::getFirstAlterMutationCommandsForPart(const Da
void StorageMergeTree::startBackgroundMovesIfNeeded()
{
    if (areBackgroundMovesNeeded())
-        background_moves_executor.start();
+        background_moves_assignee.start();
}

std::unique_ptr<MergeTreeSettings> StorageMergeTree::getDefaultSettings() const

@@ -16,7 +16,6 @@
#include <Disks/StoragePolicy.h>
#include <Common/SimpleIncrement.h>
-#include <Storages/MergeTree/BackgroundJobsExecutor.h>


namespace DB

@@ -96,9 +95,10 @@ public:
    RestoreDataTasks restoreFromBackup(const BackupPtr & backup, const String & data_path_in_backup, const ASTs & partitions, ContextMutablePtr context) override;

-    bool scheduleDataProcessingJob(IBackgroundJobExecutor & executor) override;
+    bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;

    MergeTreeDeduplicationLog * getDeduplicationLog() { return deduplication_log.get(); }

private:

    /// Mutex and condvar for synchronous mutations wait

@@ -108,8 +108,6 @@ private:
    MergeTreeDataSelectExecutor reader;
    MergeTreeDataWriter writer;
    MergeTreeDataMergerMutator merger_mutator;
-    BackgroundJobsExecutor background_executor;
-    BackgroundMovesExecutor background_moves_executor;

    std::unique_ptr<MergeTreeDeduplicationLog> deduplication_log;
@@ -17,6 +17,7 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeList.h>
+#include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Storages/MergeTree/PartitionPruner.h>

@@ -283,8 +284,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
    , merge_strategy_picker(*this)
    , queue(*this, merge_strategy_picker)
    , fetcher(*this)
-    , background_executor(*this, getContext())
-    , background_moves_executor(*this, getContext())
    , cleanup_thread(*this)
    , part_check_thread(*this)
    , restarting_thread(*this)

@@ -3180,7 +3179,7 @@ bool StorageReplicatedMergeTree::processQueueEntry(ReplicatedMergeTreeQueue::Sel
    });
}

-bool StorageReplicatedMergeTree::scheduleDataProcessingJob(IBackgroundJobExecutor & executor)
+bool StorageReplicatedMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assignee)
{
    /// If replication queue is stopped exit immediately as we successfully executed the task
    if (queue.actions_blocker.isCancelled())

@@ -3195,18 +3194,20 @@ bool StorageReplicatedMergeTree::scheduleDataProcessingJob(IBackgroundJobExecuto
    /// Depending on entry type execute in fetches (small) pool or big merge_mutate pool
    if (selected_entry->log_entry->type == LogEntry::GET_PART)
    {
-        executor.execute({[this, selected_entry] () mutable
-        {
-            return processQueueEntry(selected_entry);
-        }, PoolType::FETCH});
+        assignee.scheduleFetchTask(ExecutableLambdaAdapter::create(
+            [this, selected_entry] () mutable
+            {
+                return processQueueEntry(selected_entry);
+            }, common_assignee_trigger, getStorageID()));
        return true;
    }
    else
    {
-        executor.execute({[this, selected_entry] () mutable
-        {
-            return processQueueEntry(selected_entry);
-        }, PoolType::MERGE_MUTATE});
+        assignee.scheduleMergeMutateTask(ExecutableLambdaAdapter::create(
+            [this, selected_entry] () mutable
+            {
+                return processQueueEntry(selected_entry);
+            }, common_assignee_trigger, getStorageID()));
        return true;
    }
}
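The comment in the hunk above captures the scheduling policy: GET_PART entries go to the small fetch pool so that latency-sensitive replication fetches are not starved behind long merges, while everything else goes to the large merge/mutate pool. A condensed, hypothetical illustration of that dispatch rule follows; EntryType and dispatch() are not ClickHouse code, only the branching mirrors the hunk.

// Hypothetical condensed form of the pool-selection rule above.
#include <iostream>

enum class EntryType { GET_PART, MERGE_PARTS, MUTATE_PART };

void dispatch(EntryType type)
{
    if (type == EntryType::GET_PART)
        std::cout << "schedule on fetch pool (small, latency-sensitive)\n";
    else
        std::cout << "schedule on merge_mutate pool (big, throughput-oriented)\n";
}

int main()
{
    dispatch(EntryType::GET_PART);
    dispatch(EntryType::MERGE_PARTS);
}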
@@ -4342,7 +4343,7 @@ void StorageReplicatedMergeTree::shutdown()
    parts_mover.moves_blocker.cancelForever();

    restarting_thread.shutdown();
-    background_executor.finish();
+    background_operations_assignee.finish();
    part_moves_between_shards_orchestrator.shutdown();

    {

@@ -4352,7 +4353,7 @@ void StorageReplicatedMergeTree::shutdown()
        /// MUTATE, etc. query.
        queue.pull_log_blocker.cancelForever();
    }
-    background_moves_executor.finish();
+    background_moves_assignee.finish();

    auto data_parts_exchange_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, InterserverIOEndpointPtr{});
    if (data_parts_exchange_ptr)

@@ -6952,9 +6953,9 @@ void StorageReplicatedMergeTree::onActionLockRemove(StorageActionBlockType actio
    if (action_type == ActionLocks::PartsMerge || action_type == ActionLocks::PartsTTLMerge
        || action_type == ActionLocks::PartsFetch || action_type == ActionLocks::PartsSend
        || action_type == ActionLocks::ReplicationQueue)
-        background_executor.triggerTask();
+        background_operations_assignee.trigger();
    else if (action_type == ActionLocks::PartsMove)
-        background_moves_executor.triggerTask();
+        background_moves_assignee.trigger();
}

bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UInt64 max_wait_milliseconds)

@@ -6966,7 +6967,7 @@ bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UI
    /// This is significant, because the execution of this task could be delayed at BackgroundPool.
    /// And we force it to be executed.
-    background_executor.triggerTask();
+    background_operations_assignee.trigger();

    Poco::Event target_size_event;
    auto callback = [&target_size_event, queue_size] (size_t new_queue_size)

@@ -7200,7 +7201,7 @@ MutationCommands StorageReplicatedMergeTree::getFirstAlterMutationCommandsForPar
void StorageReplicatedMergeTree::startBackgroundMovesIfNeeded()
{
    if (areBackgroundMovesNeeded())
-        background_moves_executor.start();
+        background_moves_assignee.start();
}

std::unique_ptr<MergeTreeSettings> StorageReplicatedMergeTree::getDefaultSettings() const
@@ -29,7 +29,7 @@
#include <Common/Throttler.h>
#include <Core/BackgroundSchedulePool.h>
#include <Processors/Pipe.h>
-#include <Storages/MergeTree/BackgroundJobsExecutor.h>
+#include <Storages/MergeTree/BackgroundJobsAssignee.h>


namespace DB

@@ -218,7 +218,7 @@ public:
        const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, Poco::Logger * logger);

    /// Schedules job to execute in background pool (merge, mutate, drop range and so on)
-    bool scheduleDataProcessingJob(IBackgroundJobExecutor & executor) override;
+    bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;

    /// Checks that fetches are not disabled with action blocker and pool for fetches
    /// is not overloaded

@@ -350,9 +350,6 @@ private:
    int metadata_version = 0;
    /// Threads.

-    BackgroundJobsExecutor background_executor;
-    BackgroundMovesExecutor background_moves_executor;

    /// A task that keeps track of the updates in the logs of all replicas and loads them into the queue.
    bool queue_update_in_progress = false;
    BackgroundSchedulePool::TaskHolder queue_updating_task;

@@ -28,7 +28,7 @@ SRCS(
    MemorySettings.cpp
    MergeTree/ActiveDataPartSet.cpp
    MergeTree/AllMergeSelector.cpp
-    MergeTree/BackgroundJobsExecutor.cpp
+    MergeTree/BackgroundJobsAssignee.cpp
    MergeTree/BoolMask.cpp
    MergeTree/DataPartsExchange.cpp
    MergeTree/DropPartsRanges.cpp

@@ -41,6 +41,7 @@ SRCS(
    MergeTree/LevelMergeSelector.cpp
    MergeTree/MergeAlgorithm.cpp
    MergeTree/MergeList.cpp
+    MergeTree/MergeTreeBackgroundExecutor.cpp
    MergeTree/MergeTreeBaseSelectProcessor.cpp
    MergeTree/MergeTreeBlockReadUtils.cpp
    MergeTree/MergeTreeData.cpp
@@ -1,4 +1,4 @@
-1
-1
-1
-1
+[[1]]
+[[1]]
+[[1]]
+[[1]]
@@ -6,7 +6,8 @@ CREATE TABLE table_with_single_pk
    value String
)
ENGINE = MergeTree
-ORDER BY key;
+ORDER BY key
+SETTINGS min_compress_block_size=65536, max_compress_block_size=65536;

INSERT INTO table_with_single_pk SELECT number, toString(number % 10) FROM numbers(10000000);

@@ -15,9 +16,9 @@ ALTER TABLE table_with_single_pk DELETE WHERE key % 77 = 0 SETTINGS mutations_sy
SYSTEM FLUSH LOGS;

-- Memory usage for all mutations must be almost constant and less than
--- read_bytes.
+-- read_bytes
SELECT
-    DISTINCT read_bytes >= peak_memory_usage
+    arrayDistinct(groupArray(if (read_bytes >= peak_memory_usage, [1], [read_bytes, peak_memory_usage])))
FROM
    system.part_log
WHERE event_type = 'MutatePart' AND table = 'table_with_single_pk' AND database = currentDatabase();

@@ -34,7 +35,8 @@ CREATE TABLE table_with_multi_pk
    value String
)
ENGINE = MergeTree
-ORDER BY (key1, key2, key3);
+ORDER BY (key1, key2, key3)
+SETTINGS min_compress_block_size=65536, max_compress_block_size=65536;

INSERT INTO table_with_multi_pk SELECT number % 32, number, toDateTime('2019-10-01 00:00:00'), toString(number % 10) FROM numbers(10000000);

@@ -43,9 +45,9 @@ ALTER TABLE table_with_multi_pk DELETE WHERE key1 % 77 = 0 SETTINGS mutations_sy
SYSTEM FLUSH LOGS;

-- Memory usage for all mutations must be almost constant and less than
--- read_bytes.
+-- read_bytes
SELECT
-    DISTINCT read_bytes >= peak_memory_usage
+    arrayDistinct(groupArray(if (read_bytes >= peak_memory_usage, [1], [read_bytes, peak_memory_usage])))
FROM
    system.part_log
WHERE event_type = 'MutatePart' AND table = 'table_with_multi_pk' AND database = currentDatabase();

@@ -64,7 +66,8 @@ CREATE TABLE table_with_function_pk
    value String
)
ENGINE = MergeTree
-ORDER BY (cast(value as UInt64), key2);
+ORDER BY (cast(value as UInt64), key2)
+SETTINGS min_compress_block_size=65536, max_compress_block_size=65536;

INSERT INTO table_with_function_pk SELECT number % 32, number, toDateTime('2019-10-01 00:00:00'), toString(number % 10) FROM numbers(10000000);

@@ -73,9 +76,9 @@ ALTER TABLE table_with_function_pk DELETE WHERE key1 % 77 = 0 SETTINGS mutations
SYSTEM FLUSH LOGS;

-- Memory usage for all mutations must be almost constant and less than
--- read_bytes.
+-- read_bytes
SELECT
-    DISTINCT read_bytes >= peak_memory_usage
+    arrayDistinct(groupArray(if (read_bytes >= peak_memory_usage, [1], [read_bytes, peak_memory_usage])))
FROM
    system.part_log
WHERE event_type = 'MutatePart' AND table = 'table_with_function_pk' AND database = currentDatabase();

@@ -92,7 +95,8 @@ CREATE TABLE table_without_pk
    value String
)
ENGINE = MergeTree
-ORDER BY tuple();
+ORDER BY tuple()
+SETTINGS min_compress_block_size=65536, max_compress_block_size=65536;

INSERT INTO table_without_pk SELECT number % 32, number, toDateTime('2019-10-01 00:00:00'), toString(number % 10) FROM numbers(10000000);

@@ -101,9 +105,9 @@ ALTER TABLE table_without_pk DELETE WHERE key1 % 77 = 0 SETTINGS mutations_sync
SYSTEM FLUSH LOGS;

-- Memory usage for all mutations must be almost constant and less than
--- read_bytes.
+-- read_bytes
SELECT
-    DISTINCT read_bytes >= peak_memory_usage
+    arrayDistinct(groupArray(if (read_bytes >= peak_memory_usage, [1], [read_bytes, peak_memory_usage])))
FROM
    system.part_log
WHERE event_type = 'MutatePart' AND table = 'table_without_pk' AND database = currentDatabase();