ClickHouse (https://github.com/ClickHouse/ClickHouse.git)

commit 6062dd0021
parent e398af08dc

    Better
@@ -62,8 +62,8 @@ private:
     void logMemoryUsage(Int64 current) const;
 
 public:
-    MemoryTracker(VariableContext level_ = VariableContext::Thread);
-    MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread);
+    explicit MemoryTracker(VariableContext level_ = VariableContext::Thread);
+    explicit MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread);
 
     ~MemoryTracker();
 
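The two constructors become `explicit` so that a bare `VariableContext` (or a parent pointer) can no longer be silently converted into a `MemoryTracker`. A minimal sketch of the pitfall this guards against, using a hypothetical `Tracker` type rather than the real class:

```cpp
#include <iostream>

struct Tracker
{
    // Without 'explicit', 'Tracker t = 42;' and implicit conversions at
    // call sites would compile silently.
    explicit Tracker(int level_) : level(level_) {}
    int level;
};

void account(const Tracker & tracker) { std::cout << tracker.level << '\n'; }

int main()
{
    Tracker tracker{3};  // direct initialization still works
    account(tracker);
    // account(5);       // error with 'explicit': no implicit int -> Tracker
}
```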
@@ -2729,19 +2729,21 @@ PartUUIDsPtr Context::getIgnoredPartUUIDs() const
 
 void Context::initializeBackgroundExecutors()
 {
+    // Initialize background executors with callbacks to be able to change pool size and tasks count at runtime.
 
     shared->merge_mutate_executor = MergeTreeBackgroundExecutor::create
     (
         MergeTreeBackgroundExecutor::Type::MERGE_MUTATE,
-        [this] () { return getSettingsRef().background_pool_size; },
-        [this] () { return getSettingsRef().background_pool_size; },
+        [this] () { auto lock = getLock(); return getSettingsRef().background_pool_size; },
+        [this] () { auto lock = getLock(); return getSettingsRef().background_pool_size; },
         CurrentMetrics::BackgroundPoolTask
     );
 
     shared->moves_executor = MergeTreeBackgroundExecutor::create
     (
         MergeTreeBackgroundExecutor::Type::MOVE,
-        [this] () { return getSettingsRef().background_move_pool_size; },
-        [this] () { return getSettingsRef().background_move_pool_size; },
+        [this] () { auto lock = getLock(); return getSettingsRef().background_move_pool_size; },
+        [this] () { auto lock = getLock(); return getSettingsRef().background_move_pool_size; },
         CurrentMetrics::BackgroundMovePoolTask
     );
 
@@ -2749,8 +2751,8 @@ void Context::initializeBackgroundExecutors()
     shared->fetch_executor = MergeTreeBackgroundExecutor::create
     (
         MergeTreeBackgroundExecutor::Type::FETCH,
-        [this] () { return getSettingsRef().background_fetches_pool_size; },
-        [this] () { return getSettingsRef().background_fetches_pool_size; },
+        [this] () { auto lock = getLock(); return getSettingsRef().background_fetches_pool_size; },
+        [this] () { auto lock = getLock(); return getSettingsRef().background_fetches_pool_size; },
         CurrentMetrics::BackgroundFetchesPoolTask
     );
 }
 
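All six size getters change the same way: each lambda now takes the Context lock before reading a setting, because the executor calls these getters later, from its own threads, while the settings may be reloaded concurrently. A compilable sketch of the pattern; `ResizablePool` and `Config` here are illustrative stand-ins, not the ClickHouse API:

```cpp
#include <cstddef>
#include <functional>
#include <mutex>

// Illustrative stand-in for a pool whose size is re-read at runtime.
struct ResizablePool
{
    using SizeGetter = std::function<size_t()>;

    explicit ResizablePool(SizeGetter getter_) : getter(std::move(getter_)) {}

    void maybeResize()
    {
        // Evaluated much later, on the pool's own thread.
        size_t desired = getter();
        (void)desired;  // ... grow or shrink worker threads here ...
    }

    SizeGetter getter;
};

// Illustrative stand-in for Context: settings live behind a mutex.
struct Config
{
    std::mutex mutex;
    size_t background_pool_size = 16;

    size_t getPoolSize()
    {
        // The getter must lock precisely because it races with reloads.
        std::lock_guard lock(mutex);
        return background_pool_size;
    }
};

int main()
{
    Config config;
    ResizablePool pool{[&config] { return config.getPoolSize(); }};
    pool.maybeResize();
}
```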
@@ -85,7 +85,7 @@ void BackgroundJobsAssignee::start()
 {
     std::lock_guard lock(holder_mutex);
     if (!holder)
-        holder = getContext()->getSchedulePool().createTask("BackgroundJobsAssignee:" + toString(type), [this]{ main(); });
+        holder = getContext()->getSchedulePool().createTask("BackgroundJobsAssignee:" + toString(type), [this]{ threadFunc(); });
 
     holder->activateAndSchedule();
 }
@@ -106,7 +106,7 @@ void BackgroundJobsAssignee::finish()
 }
 
 
-void BackgroundJobsAssignee::main()
+void BackgroundJobsAssignee::threadFunc()
 try
 {
     bool succeed = false;
@@ -29,7 +29,7 @@ struct ExecutableTaskSchedulingSettings
 
 class MergeTreeData;
 
-class BackgroundJobsAssignee : protected WithContext
+class BackgroundJobsAssignee : public WithContext
 {
 private:
     MergeTreeData & data;
@@ -49,6 +49,7 @@ private:
     std::mutex holder_mutex;
 
 public:
+    ///
     enum class Type
     {
         DataProcessing,
@@ -77,7 +78,7 @@ private:
     static String toString(Type type);
 
     /// Function that executes in background scheduling pool
-    void main();
+    void threadFunc();
 };
 
 
@@ -9,10 +9,21 @@
 namespace DB
 {
 
+/**
+ * Generic interface for background operations. Simply put, this is a self-made coroutine.
+ * The main method is executeStep, which will return true
+ * if the task wants to execute another 'step' in the near future and false otherwise.
+ *
+ * Each storage assigns some operations such as merges, mutations, fetches, etc.
+ * We need to ask a storage or some other entity to try to assign another operation when the current operation is completed.
+ *
+ * Each task corresponds to a storage, that's why there is a method getStorageID.
+ * This is needed to correctly shut down a storage, e.g. we need to wait for all background operations to complete.
+ */
 class IExecutableTask
 {
 public:
-    virtual bool execute() = 0;
+    virtual bool executeStep() = 0;
     virtual void onCompleted() = 0;
     virtual StorageID getStorageID() = 0;
     virtual ~IExecutableTask() = default;
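The new comment calls `executeStep()` one step of a self-made coroutine. A sketch of a task written against this interface, with a plain string standing in for the real `StorageID`:

```cpp
#include <cstddef>
#include <string>
#include <utility>

// Simplified stand-in: the real interface uses DB::StorageID.
using StorageID = std::string;

class IExecutableTask
{
public:
    virtual bool executeStep() = 0;
    virtual void onCompleted() = 0;
    virtual StorageID getStorageID() = 0;
    virtual ~IExecutableTask() = default;
};

// Processes 'total' chunks, one chunk per step, so the executor can
// interleave this task with others between steps.
class ChunkedTask : public IExecutableTask
{
public:
    ChunkedTask(StorageID id_, size_t total_) : id(std::move(id_)), total(total_) {}

    bool executeStep() override
    {
        // ... process chunk number 'done' ...
        ++done;
        return done < total;  // true = please schedule another step soon
    }

    void onCompleted() override { /* ask the storage to assign the next operation */ }

    StorageID getStorageID() override { return id; }

private:
    StorageID id;
    size_t total;
    size_t done = 0;
};
```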
@@ -21,29 +32,37 @@ public:
 using ExecutableTaskPtr = std::shared_ptr<IExecutableTask>;
 
 
-class LambdaAdapter : public shared_ptr_helper<LambdaAdapter>, public IExecutableTask
+/**
+ * Some background operations are not coroutines (they don't want to be executed step-by-step). For those we have this wrapper.
+ */
+class ExecutableLambdaAdapter : public shared_ptr_helper<ExecutableLambdaAdapter>, public IExecutableTask
 {
 public:
 
-    template <typename InnerJob, typename Callback>
-    explicit LambdaAdapter(InnerJob && inner_, Callback && callback_, StorageID id_)
-        : inner(inner_), callback(callback_), id(id_) {}
+    template <typename Job, typename Callback>
+    explicit ExecutableLambdaAdapter(
+        Job && job_to_execute_,
+        Callback && job_result_callback_,
+        StorageID id_)
+        : job_to_execute(job_to_execute_)
+        , job_result_callback(job_result_callback_)
+        , id(id_) {}
 
-    bool execute() override
+    bool executeStep() override
     {
-        res = inner();
-        inner = {};
+        res = job_to_execute();
+        job_to_execute = {};
         return false;
     }
 
-    void onCompleted() override { callback(!res); }
+    void onCompleted() override { job_result_callback(!res); }
 
     StorageID getStorageID() override { return id; }
 
 private:
     bool res = false;
-    std::function<bool()> inner;
-    std::function<void(bool)> callback;
+    std::function<bool()> job_to_execute;
+    std::function<void(bool)> job_result_callback;
     StorageID id;
 };
 
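A runnable distillation of the adapter's contract as it reads above (an inference from the code, not from any ClickHouse documentation): the wrapped job runs exactly once, `executeStep()` always returns false, and `onCompleted()` passes `!res` to the callback, so `true` means "the job did nothing, postpone me":

```cpp
#include <functional>
#include <iostream>

int main()
{
    bool res = false;
    std::function<bool()> job_to_execute = [] { return true; };  // did useful work
    std::function<void(bool)> job_result_callback = [](bool delay)
    {
        std::cout << (delay ? "postpone" : "trigger immediately") << '\n';
    };

    res = job_to_execute();     // what executeStep() does
    job_to_execute = {};        // drop captured resources early
    job_result_callback(!res);  // what onCompleted() does -> "trigger immediately"
}
```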
@@ -39,6 +39,9 @@ void MergeTreeBackgroundExecutor::updateConfiguration()
     pool.setMaxThreads(new_threads_count);
     pool.setQueueSize(new_max_tasks_count);
 
+    std::cout << "threads_count " << threads_count << std::endl;
 
+    /// We don't enter this loop if size is decreased.
     for (size_t number = threads_count; number < new_threads_count; ++number)
         pool.scheduleOrThrowOnError([this, number] { threadFunction(number); });
 }
@@ -74,6 +77,8 @@ bool MergeTreeBackgroundExecutor::trySchedule(ExecutableTaskPtr task)
     try
     {
         /// This is needed to increase / decrease the number of threads at runtime
+        /// Using a stopwatch here so as not to do it too often.
+        /// No need to move the time to a config.
         if (update_timer.compareAndRestartDeferred(10.))
             updateConfiguration();
     }
@@ -86,7 +91,7 @@ bool MergeTreeBackgroundExecutor::trySchedule(ExecutableTaskPtr task)
     if (value.load() >= static_cast<int64_t>(max_tasks_count))
         return false;
 
-    pending.push_back(std::make_shared<Item>(std::move(task), metric));
+    pending.push_back(std::make_shared<TaskRuntimeData>(std::move(task), metric));
 
     has_tasks.notify_one();
     return true;
|
|||||||
|
|
||||||
void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id)
|
void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id)
|
||||||
{
|
{
|
||||||
std::vector<ItemPtr> tasks_to_wait;
|
std::vector<TaskRuntimeDataPtr> tasks_to_wait;
|
||||||
{
|
{
|
||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
|
|
||||||
@@ -118,7 +123,7 @@ void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id
 }
 
 
-void MergeTreeBackgroundExecutor::routine(ItemPtr item)
+void MergeTreeBackgroundExecutor::routine(TaskRuntimeDataPtr item)
 {
     auto erase_from_active = [this, item]
     {
@@ -127,7 +132,7 @@ void MergeTreeBackgroundExecutor::routine(ItemPtr item)
 
     try
     {
-        if (item->task->execute())
+        if (item->task->executeStep())
         {
             std::lock_guard guard(mutex);
 
@@ -143,9 +148,12 @@ void MergeTreeBackgroundExecutor::routine(TaskRuntimeDataPtr item)
         return;
     }
 
-    std::lock_guard guard(mutex);
-    erase_from_active();
-    has_tasks.notify_one();
+    {
+        std::lock_guard guard(mutex);
+        erase_from_active();
+        has_tasks.notify_one();
+    }
 
     /// In a situation of a lack of memory this method can throw an exception,
     /// because it may interact somehow with BackgroundSchedulePool, which may allocate memory
     /// But it is rather safe, because we have try...catch block here, and another one in ThreadPool.
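The block introduced here bounds the `lock_guard`'s lifetime: the mutex is released right after the erase and the notification, instead of being held while the routine continues into code that may allocate or throw (see the comments that follow). The scoping idiom in isolation:

```cpp
#include <condition_variable>
#include <mutex>

std::mutex mutex;
std::condition_variable has_tasks;
int active_count = 1;

void finishTask()
{
    {
        std::lock_guard guard(mutex);
        --active_count;
        has_tasks.notify_one();
    }  // guard destroyed: mutex released here

    // ... continue without holding the mutex ...
}

int main() { finishTask(); }
```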
@@ -170,11 +178,12 @@ void MergeTreeBackgroundExecutor::threadFunction(size_t number)
 
     while (true)
     {
-        ItemPtr item;
+        TaskRuntimeDataPtr item;
         {
             std::unique_lock lock(mutex);
             has_tasks.wait(lock, [this](){ return !pending.empty() || shutdown; });
 
+            /// Decrease the number of threads (setting could be dynamically reloaded)
             if (number >= threads_count)
                 break;
 
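The `number >= threads_count` check is how a worker retires itself when the pool shrinks at runtime: every thread knows its index, and after each wakeup it re-checks the (possibly reloaded) thread count. A compilable model of just that mechanism, with the task execution elided:

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>

struct ShrinkablePool
{
    std::mutex mutex;
    std::condition_variable has_tasks;
    size_t threads_count = 8;  // may be lowered by a config reload
    size_t pending_tasks = 0;
    bool shutdown = false;

    void threadFunction(size_t number)
    {
        while (true)
        {
            std::unique_lock lock(mutex);
            has_tasks.wait(lock, [this] { return pending_tasks > 0 || shutdown; });

            if (number >= threads_count)  // pool shrank below our index: retire
                break;
            if (shutdown)
                break;

            --pending_tasks;
            // ... pop a task and execute one step outside the lock ...
        }
    }
};

int main()
{
    ShrinkablePool pool;
    pool.shutdown = true;
    pool.threadFunction(0);  // wakes, sees shutdown, exits
}
```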
@@ -25,15 +25,25 @@ namespace DB
 * It can execute only successors of ExecutableTask interface.
 * Which is a self-written coroutine. It suspends, when returns true from execute() method.
 *
-* Executor consists of ThreadPool to execute pieces of a task (basically calls 'execute' on a task)
-* and a scheduler thread, which manages the tasks. Due to bad experience of working with high memory under
-* high memory pressure scheduler thread mustn't do any allocations,
-* because it will be a fatal error if this thread will die from a random exception.
-*
 * There are two queues of a tasks: pending (main queue for all the tasks) and active (currently executing).
+* The pending queue is needed since the number of tasks will be more than the threads to execute them.
+* Pending tasks are tasks that were successfully scheduled to an executor or tasks that have some extra steps to execute.
 * There is an invariant, that task may occur only in one of these queue. It can occur in both queues only in critical sections.
 *
-* Due to all caveats I described above we use boost::circular_buffer as a container for queues.
+* Pending:                                    Active:
+*
+* |s| |s| |s| |s| |s| |s| |s| |s| |s| |s|     |s|
+* |s| |s| |s| |s| |s| |s| |s| |s| |s|         |s|
+* |s| |s| |s| |s| |s| |s|                     |s|
+* |s| |s| |s|                                 |s|
+* |s|                                         |s|
+*                                             |s|
+*
+* Each task is simply a sequence of steps. Heavier tasks have longer sequences.
+* When a step of a task is executed, we move the task back to the pending queue. And take another from the queue's head.
+* With this architecture all small merges / mutations will be executed faster than bigger ones.
+*
+* We use boost::circular_buffer as a container for queues not to do any allocations.
 *
 * Another nuisance that we face is that background operations always interact with an associated Storage.
 * So, when a Storage wants to shut down, it must wait until all its background operations are finished.
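The staircase is about fairness: after each executed step a task goes to the back of the pending queue, so tasks with few steps drain quickly while long merges keep cycling. A toy model of the policy (illustrative only; the real executor also maintains the active queue and its locking):

```cpp
#include <boost/circular_buffer.hpp>
#include <iostream>
#include <memory>

struct Task
{
    int id;
    int steps_left;
};

int main()
{
    // Capacity is fixed up front: push/pop below never allocate.
    boost::circular_buffer<std::shared_ptr<Task>> pending(16);
    pending.push_back(std::make_shared<Task>(Task{1, 1}));  // small merge
    pending.push_back(std::make_shared<Task>(Task{2, 5}));  // big merge

    while (!pending.empty())
    {
        auto task = pending.front();
        pending.pop_front();
        --task->steps_left;           // "execute one step"
        if (task->steps_left > 0)
            pending.push_back(task);  // re-queue for the next step
        else
            std::cout << "task " << task->id << " finished\n";  // 1 before 2
    }
}
```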
@@ -43,7 +53,6 @@ class MergeTreeBackgroundExecutor : public shared_ptr_helper<MergeTreeBackground
 public:
 
     using CountGetter = std::function<size_t()>;
-    using Callback = std::function<void()>;
 
     enum class Type
     {
@@ -107,13 +116,16 @@ private:
 
     AtomicStopwatch update_timer;
 
-    struct Item
+    /**
+     * Has an RAII class to determine how many tasks are waiting for execution and executing at the moment.
+     * Also has some flags and primitives to wait for the current task to be executed.
+     */
+    struct TaskRuntimeData
     {
-        explicit Item(ExecutableTaskPtr && task_, CurrentMetrics::Metric metric_)
+        TaskRuntimeData(ExecutableTaskPtr && task_, CurrentMetrics::Metric metric_)
             : task(std::move(task_))
             , increment(std::move(metric_))
-        {
-        }
+        {}
 
         ExecutableTaskPtr task;
         CurrentMetrics::Increment increment;
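`TaskRuntimeData` holds a `CurrentMetrics::Increment`, an RAII guard: the metric goes up when the task is admitted and back down when the struct is destroyed, on every exit path including exceptions. The same pattern in miniature, with a hand-rolled guard instead of the CurrentMetrics one:

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

std::atomic<int64_t> background_pool_tasks{0};

struct MetricIncrement
{
    MetricIncrement() { background_pool_tasks.fetch_add(1, std::memory_order_relaxed); }
    ~MetricIncrement() { background_pool_tasks.fetch_sub(1, std::memory_order_relaxed); }
    MetricIncrement(const MetricIncrement &) = delete;
    MetricIncrement & operator=(const MetricIncrement &) = delete;
};

int main()
{
    {
        MetricIncrement in_flight;  // task admitted to the executor
        assert(background_pool_tasks.load() == 1);
    }                               // guard destroyed: metric restored
    assert(background_pool_tasks.load() == 0);
}
```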
@@ -124,14 +136,16 @@ private:
         Poco::Event is_done{/*autoreset=*/false};
     };
 
-    using ItemPtr = std::shared_ptr<Item>;
+    using TaskRuntimeDataPtr = std::shared_ptr<TaskRuntimeData>;
 
-    void routine(ItemPtr item);
+    void routine(TaskRuntimeDataPtr item);
 
+    /// Threads in the ThreadPool are numbered, to be able to lower the number of threads at runtime.
     void threadFunction(size_t number);
 
     /// Initially it will be empty
-    boost::circular_buffer<ItemPtr> pending{0};
-    boost::circular_buffer<ItemPtr> active{0};
+    boost::circular_buffer<TaskRuntimeDataPtr> pending{0};
+    boost::circular_buffer<TaskRuntimeDataPtr> active{0};
 
     std::mutex mutex;
     std::condition_variable has_tasks;
@@ -200,8 +200,8 @@ MergeTreeData::MergeTreeData(
     , data_parts_by_info(data_parts_indexes.get<TagByInfo>())
     , data_parts_by_state_and_info(data_parts_indexes.get<TagByStateAndInfo>())
     , parts_mover(this)
-    , background_executor(*this, BackgroundJobsAssignee::Type::DataProcessing, getContext())
-    , background_moves_executor(*this, BackgroundJobsAssignee::Type::Moving, getContext())
+    , background_operations_assignee(*this, BackgroundJobsAssignee::Type::DataProcessing, getContext())
+    , background_moves_assignee(*this, BackgroundJobsAssignee::Type::Moving, getContext())
 {
     const auto settings = getSettings();
     allow_nullable_key = attach || settings->allow_nullable_key;
@@ -311,17 +311,17 @@ MergeTreeData::MergeTreeData(
     common_assignee_trigger = [this] (bool delay) noexcept
     {
         if (delay)
-            background_executor.postpone();
+            background_operations_assignee.postpone();
         else
-            background_executor.trigger();
+            background_operations_assignee.trigger();
     };
 
     moves_assignee_trigger = [this] (bool delay) noexcept
     {
         if (delay)
-            background_moves_executor.postpone();
+            background_moves_assignee.postpone();
         else
-            background_moves_executor.trigger();
+            background_moves_assignee.trigger();
     };
 }
 
@@ -5029,7 +5029,7 @@ MergeTreeData::CurrentlyMovingPartsTagger::~CurrentlyMovingPartsTagger()
     }
 }
 
-bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & executor)
+bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & assignee)
 {
     if (parts_mover.moves_blocker.isCancelled())
         return false;
@@ -5038,7 +5038,7 @@ bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & executor)
     if (moving_tagger->parts_to_move.empty())
         return false;
 
-    executor.scheduleMoveTask(LambdaAdapter::create(
+    assignee.scheduleMoveTask(ExecutableLambdaAdapter::create(
         [this, moving_tagger] () mutable
         {
             return moveParts(moving_tagger);
@@ -827,9 +827,9 @@ public:
     PinnedPartUUIDsPtr getPinnedPartUUIDs() const;
 
     /// Schedules a background job (like merge/mutate/fetch) to an executor
-    virtual bool scheduleDataProcessingJob(BackgroundJobsAssignee & executor) = 0;
+    virtual bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) = 0;
     /// Schedules job to move parts between disks/volumes and so on.
-    bool scheduleDataMovingJob(BackgroundJobsAssignee & executor);
+    bool scheduleDataMovingJob(BackgroundJobsAssignee & assignee);
     bool areBackgroundMovesNeeded() const;
 
     /// Lock part in zookeeper for shared data in several nodes
@@ -925,10 +925,18 @@ protected:
 
     /// Executors are common for both ReplicatedMergeTree and plain MergeTree
     /// but they are being started and finished in derived classes, so let them be protected.
-    BackgroundJobsAssignee background_executor;
-    BackgroundJobsAssignee background_moves_executor;
+    ///
+    /// Why are there two executors, not one? Or an executor for each kind of operation?
+    /// It is historical.
+    /// Another explanation is that moving operations are common for Replicated and plain MergeTree classes.
+    /// The task that schedules these operations is executed with its own timetable and triggered in specific places in code.
+    /// And for ReplicatedMergeTree we don't have a LogEntry type for this operation.
+    BackgroundJobsAssignee background_operations_assignee;
+    BackgroundJobsAssignee background_moves_assignee;
 
+    /// Strongly connected with two fields above.
     /// Every task that is finished will ask to assign a new one into an executor.
+    /// These callbacks will be passed to the constructor of each task.
     std::function<void(bool)> common_assignee_trigger;
     std::function<void(bool)> moves_assignee_trigger;
 
@@ -37,7 +37,7 @@ void MergeTreeSink::consume(Chunk chunk)
         PartLog::addNewPart(storage.getContext(), part, watch.elapsed());
 
         /// Initiate async merge - it will be done if it's a good time for merge and if there is space in 'background_pool'.
-        storage.background_executor.trigger();
+        storage.background_operations_assignee.trigger();
     }
 }
 }
@@ -624,7 +624,7 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
             }
         }
 
-        storage.background_executor.trigger();
+        storage.background_operations_assignee.trigger();
     }
 
     return stat.version;
@@ -713,7 +713,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
     }
 
     if (some_active_mutations_were_killed)
-        storage.background_executor.trigger();
+        storage.background_operations_assignee.trigger();
 
     if (!entries_to_load.empty())
     {
@@ -847,7 +847,7 @@ ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation(
     }
 
     if (mutation_was_active)
-        storage.background_executor.trigger();
+        storage.background_operations_assignee.trigger();
 
     return entry;
 }
@@ -204,7 +204,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
     storage.partial_shutdown_event.reset();
 
     /// Start queue processing
-    storage.background_executor.start();
+    storage.background_operations_assignee.start();
 
     storage.queue_updating_task->activateAndSchedule();
     storage.mutations_updating_task->activateAndSchedule();
@@ -389,7 +389,7 @@ void ReplicatedMergeTreeRestartingThread::partialShutdown()
         auto fetch_lock = storage.fetcher.blocker.cancel();
         auto merge_lock = storage.merger_mutator.merges_blocker.cancel();
         auto move_lock = storage.parts_mover.moves_blocker.cancel();
-        storage.background_executor.finish();
+        storage.background_operations_assignee.finish();
     }
 
     LOG_TRACE(log, "Threads finished");
@@ -24,7 +24,7 @@ public:
     {
     }
 
-    bool execute() override
+    bool executeStep() override
     {
         auto sleep_time = distribution(generator);
         std::this_thread::sleep_for(std::chrono::milliseconds(5 * sleep_time));
@@ -112,7 +112,7 @@ void StorageMergeTree::startup()
 
     try
     {
-        background_executor.start();
+        background_operations_assignee.start();
         startBackgroundMovesIfNeeded();
     }
     catch (...)
@@ -150,8 +150,8 @@ void StorageMergeTree::shutdown()
     merger_mutator.merges_blocker.cancelForever();
     parts_mover.moves_blocker.cancelForever();
 
-    background_executor.finish();
-    background_moves_executor.finish();
+    background_operations_assignee.finish();
+    background_moves_assignee.finish();
 
     try
     {
|
|||||||
|
|
||||||
LOG_INFO(log, "Added mutation: {}", mutation_file_name);
|
LOG_INFO(log, "Added mutation: {}", mutation_file_name);
|
||||||
}
|
}
|
||||||
background_executor.trigger();
|
background_operations_assignee.trigger();
|
||||||
return version;
|
return version;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -637,7 +637,7 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Maybe there is another mutation that was blocked by the killed one. Try to execute it immediately.
|
/// Maybe there is another mutation that was blocked by the killed one. Try to execute it immediately.
|
||||||
background_executor.trigger();
|
background_operations_assignee.trigger();
|
||||||
|
|
||||||
return CancellationCode::CancelSent;
|
return CancellationCode::CancelSent;
|
||||||
}
|
}
|
||||||
@@ -1041,7 +1041,7 @@ bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_sn
     return true;
 }
 
-bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & executor) //-V657
+bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) //-V657
 {
     if (shutdown_called)
         return false;
@@ -1073,7 +1073,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & execut
 
     if (merge_entry)
     {
-        executor.scheduleMergeMutateTask(LambdaAdapter::create(
+        assignee.scheduleMergeMutateTask(ExecutableLambdaAdapter::create(
             [this, metadata_snapshot, merge_entry, share_lock] () mutable
             {
                 return mergeSelectedParts(metadata_snapshot, false, {}, *merge_entry, share_lock);
@@ -1082,27 +1082,27 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & execut
     }
     if (mutate_entry)
    {
-        executor.scheduleMergeMutateTask(LambdaAdapter::create(
+        assignee.scheduleMergeMutateTask(ExecutableLambdaAdapter::create(
             [this, metadata_snapshot, merge_entry, mutate_entry, share_lock] () mutable
             {
                 return mutateSelectedPart(metadata_snapshot, *mutate_entry, share_lock);
             }, common_assignee_trigger, getStorageID()));
         return true;
     }
-    bool executed = false;
+    bool scheduled = false;
     if (time_after_previous_cleanup_temporary_directories.compareAndRestartDeferred(getContext()->getSettingsRef().merge_tree_clear_old_temporary_directories_interval_seconds))
     {
-        executor.scheduleMergeMutateTask(LambdaAdapter::create(
+        assignee.scheduleMergeMutateTask(ExecutableLambdaAdapter::create(
             [this, share_lock] ()
             {
                 clearOldTemporaryDirectories(getSettings()->temporary_directories_lifetime.totalSeconds());
                 return true;
             }, common_assignee_trigger, getStorageID()));
-        executed = true;
+        scheduled = true;
     }
     if (time_after_previous_cleanup_parts.compareAndRestartDeferred(getContext()->getSettingsRef().merge_tree_clear_old_parts_interval_seconds))
     {
-        executor.scheduleMergeMutateTask(LambdaAdapter::create(
+        assignee.scheduleMergeMutateTask(ExecutableLambdaAdapter::create(
             [this, share_lock] ()
             {
                 /// All use relative_data_path which changes during rename
|
|||||||
clearEmptyParts();
|
clearEmptyParts();
|
||||||
return true;
|
return true;
|
||||||
}, common_assignee_trigger, getStorageID()));
|
}, common_assignee_trigger, getStorageID()));
|
||||||
executed = true;
|
scheduled = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return executed;
|
return scheduled;
|
||||||
}
|
}
|
||||||
|
|
||||||
Int64 StorageMergeTree::getCurrentMutationVersion(
|
Int64 StorageMergeTree::getCurrentMutationVersion(
|
||||||
@@ -1567,9 +1567,9 @@ ActionLock StorageMergeTree::getActionLock(StorageActionBlockType action_type)
 void StorageMergeTree::onActionLockRemove(StorageActionBlockType action_type)
 {
     if (action_type == ActionLocks::PartsMerge || action_type == ActionLocks::PartsTTLMerge)
-        background_executor.trigger();
+        background_operations_assignee.trigger();
     else if (action_type == ActionLocks::PartsMove)
-        background_moves_executor.trigger();
+        background_moves_assignee.trigger();
 }
 
 CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_context)
@@ -1647,7 +1647,7 @@ MutationCommands StorageMergeTree::getFirstAlterMutationCommandsForPart(const Da
 void StorageMergeTree::startBackgroundMovesIfNeeded()
 {
     if (areBackgroundMovesNeeded())
-        background_moves_executor.start();
+        background_moves_assignee.start();
 }
 
 std::unique_ptr<MergeTreeSettings> StorageMergeTree::getDefaultSettings() const
@@ -95,7 +95,7 @@ public:
 
     RestoreDataTasks restoreFromBackup(const BackupPtr & backup, const String & data_path_in_backup, const ASTs & partitions, ContextMutablePtr context) override;
 
-    bool scheduleDataProcessingJob(BackgroundJobsAssignee & executor) override;
+    bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;
 
     MergeTreeDeduplicationLog * getDeduplicationLog() { return deduplication_log.get(); }
 
@@ -3173,7 +3173,7 @@ bool StorageReplicatedMergeTree::processQueueEntry(ReplicatedMergeTreeQueue::Sel
     });
 }
 
-bool StorageReplicatedMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & executor)
+bool StorageReplicatedMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assignee)
 {
     /// If replication queue is stopped exit immediately as we successfully executed the task
     if (queue.actions_blocker.isCancelled())
@@ -3188,7 +3188,7 @@ bool StorageReplicatedMergeTree::scheduleDataProcessingJob(BackgroundJobsAssigne
     /// Depending on entry type execute in fetches (small) pool or big merge_mutate pool
     if (selected_entry->log_entry->type == LogEntry::GET_PART)
     {
-        executor.scheduleFetchTask(LambdaAdapter::create(
+        assignee.scheduleFetchTask(ExecutableLambdaAdapter::create(
             [this, selected_entry] () mutable
             {
                 return processQueueEntry(selected_entry);
@@ -3197,7 +3197,7 @@ bool StorageReplicatedMergeTree::scheduleDataProcessingJob(BackgroundJobsAssigne
     }
     else
     {
-        executor.scheduleMergeMutateTask(LambdaAdapter::create(
+        assignee.scheduleMergeMutateTask(ExecutableLambdaAdapter::create(
             [this, selected_entry] () mutable
             {
                 return processQueueEntry(selected_entry);
|
|||||||
parts_mover.moves_blocker.cancelForever();
|
parts_mover.moves_blocker.cancelForever();
|
||||||
|
|
||||||
restarting_thread.shutdown();
|
restarting_thread.shutdown();
|
||||||
background_executor.finish();
|
background_operations_assignee.finish();
|
||||||
part_moves_between_shards_orchestrator.shutdown();
|
part_moves_between_shards_orchestrator.shutdown();
|
||||||
|
|
||||||
{
|
{
|
||||||
@@ -4347,7 +4347,7 @@ void StorageReplicatedMergeTree::shutdown()
         /// MUTATE, etc. query.
         queue.pull_log_blocker.cancelForever();
     }
-    background_moves_executor.finish();
+    background_moves_assignee.finish();
 
     auto data_parts_exchange_ptr = std::atomic_exchange(&data_parts_exchange_endpoint, InterserverIOEndpointPtr{});
     if (data_parts_exchange_ptr)
@@ -6947,9 +6947,9 @@ void StorageReplicatedMergeTree::onActionLockRemove(StorageActionBlockType actio
     if (action_type == ActionLocks::PartsMerge || action_type == ActionLocks::PartsTTLMerge
         || action_type == ActionLocks::PartsFetch || action_type == ActionLocks::PartsSend
         || action_type == ActionLocks::ReplicationQueue)
-        background_executor.trigger();
+        background_operations_assignee.trigger();
     else if (action_type == ActionLocks::PartsMove)
-        background_moves_executor.trigger();
+        background_moves_assignee.trigger();
 }
 
 bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UInt64 max_wait_milliseconds)
@@ -6961,7 +6961,7 @@ bool StorageReplicatedMergeTree::waitForShrinkingQueueSize(size_t queue_size, UI
 
     /// This is significant, because the execution of this task could be delayed at BackgroundPool.
     /// And we force it to be executed.
-    background_executor.trigger();
+    background_operations_assignee.trigger();
 
     Poco::Event target_size_event;
     auto callback = [&target_size_event, queue_size] (size_t new_queue_size)
@@ -7195,7 +7195,7 @@ MutationCommands StorageReplicatedMergeTree::getFirstAlterMutationCommandsForPar
 void StorageReplicatedMergeTree::startBackgroundMovesIfNeeded()
 {
     if (areBackgroundMovesNeeded())
-        background_moves_executor.start();
+        background_moves_assignee.start();
 }
 
 std::unique_ptr<MergeTreeSettings> StorageReplicatedMergeTree::getDefaultSettings() const
@@ -218,7 +218,7 @@ public:
         const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, Poco::Logger * logger);
 
     /// Schedules job to execute in background pool (merge, mutate, drop range and so on)
-    bool scheduleDataProcessingJob(BackgroundJobsAssignee & executor) override;
+    bool scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) override;
 
     /// Checks that fetches are not disabled with action blocker and pool for fetches
     /// is not overloaded
@@ -1,4 +1,4 @@
-1
-1
-1
-1
+[[1]]
+[[1]]
+[[1]]
+[[1]]
@@ -6,7 +6,8 @@ CREATE TABLE table_with_single_pk
     value String
 )
 ENGINE = MergeTree
-ORDER BY key;
+ORDER BY key
+SETTINGS min_compress_block_size=65536, max_compress_block_size=65536;
 
 INSERT INTO table_with_single_pk SELECT number, toString(number % 10) FROM numbers(10000000);
 
@@ -15,9 +16,9 @@ ALTER TABLE table_with_single_pk DELETE WHERE key % 77 = 0 SETTINGS mutations_sy
 SYSTEM FLUSH LOGS;
 
 -- Memory usage for all mutations must be almost constant and less than
--- read_bytes (with a margin)
+-- read_bytes
 SELECT
-    DISTINCT 1.1 * read_bytes >= peak_memory_usage
+    arrayDistinct(groupArray(if (read_bytes >= peak_memory_usage, [1], [read_bytes, peak_memory_usage])))
 FROM
     system.part_log
 WHERE event_type = 'MutatePart' AND table = 'table_with_single_pk' AND database = currentDatabase();
@@ -34,7 +35,8 @@ CREATE TABLE table_with_multi_pk
     value String
 )
 ENGINE = MergeTree
-ORDER BY (key1, key2, key3);
+ORDER BY (key1, key2, key3)
+SETTINGS min_compress_block_size=65536, max_compress_block_size=65536;
 
 INSERT INTO table_with_multi_pk SELECT number % 32, number, toDateTime('2019-10-01 00:00:00'), toString(number % 10) FROM numbers(10000000);
 
@@ -43,9 +45,9 @@ ALTER TABLE table_with_multi_pk DELETE WHERE key1 % 77 = 0 SETTINGS mutations_sy
 SYSTEM FLUSH LOGS;
 
 -- Memory usage for all mutations must be almost constant and less than
--- read_bytes (with a margin)
+-- read_bytes
 SELECT
-    DISTINCT 1.1 * read_bytes >= peak_memory_usage
+    arrayDistinct(groupArray(if (read_bytes >= peak_memory_usage, [1], [read_bytes, peak_memory_usage])))
 FROM
     system.part_log
 WHERE event_type = 'MutatePart' AND table = 'table_with_multi_pk' AND database = currentDatabase();
@@ -64,7 +66,8 @@ CREATE TABLE table_with_function_pk
     value String
 )
 ENGINE = MergeTree
-ORDER BY (cast(value as UInt64), key2);
+ORDER BY (cast(value as UInt64), key2)
+SETTINGS min_compress_block_size=65536, max_compress_block_size=65536;
 
 INSERT INTO table_with_function_pk SELECT number % 32, number, toDateTime('2019-10-01 00:00:00'), toString(number % 10) FROM numbers(10000000);
 
@@ -73,9 +76,9 @@ ALTER TABLE table_with_function_pk DELETE WHERE key1 % 77 = 0 SETTINGS mutations
 SYSTEM FLUSH LOGS;
 
 -- Memory usage for all mutations must be almost constant and less than
--- read_bytes (with a margin)
+-- read_bytes
 SELECT
-    DISTINCT 1.1 * read_bytes >= peak_memory_usage
+    arrayDistinct(groupArray(if (read_bytes >= peak_memory_usage, [1], [read_bytes, peak_memory_usage])))
 FROM
     system.part_log
 WHERE event_type = 'MutatePart' AND table = 'table_with_function_pk' AND database = currentDatabase();
@@ -92,7 +95,8 @@ CREATE TABLE table_without_pk
     value String
 )
 ENGINE = MergeTree
-ORDER BY tuple();
+ORDER BY tuple()
+SETTINGS min_compress_block_size=65536, max_compress_block_size=65536;
 
 INSERT INTO table_without_pk SELECT number % 32, number, toDateTime('2019-10-01 00:00:00'), toString(number % 10) FROM numbers(10000000);
 
@@ -101,9 +105,9 @@ ALTER TABLE table_without_pk DELETE WHERE key1 % 77 = 0 SETTINGS mutations_sync
 SYSTEM FLUSH LOGS;
 
 -- Memory usage for all mutations must be almost constant and less than
--- read_bytes (with a margin)
+-- read_bytes
 SELECT
-    DISTINCT 1.1 * read_bytes >= peak_memory_usage
+    arrayDistinct(groupArray(if (read_bytes >= peak_memory_usage, [1], [read_bytes, peak_memory_usage])))
 FROM
     system.part_log
 WHERE event_type = 'MutatePart' AND table = 'table_without_pk' AND database = currentDatabase();