First simple implementation

This commit is contained in:
alesapin 2020-10-13 17:25:42 +03:00
parent 163d33fd21
commit fd35368c59
12 changed files with 238 additions and 34 deletions

View File

@ -0,0 +1,49 @@
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
namespace DB
{
BackgroundJobsExecutor::BackgroundJobsExecutor(
MergeTreeData & data_,
Context & global_context)
: data(data_)
, data_processing_pool(global_context.getSettingsRef().background_pool_size, 0, 10000, false)
, move_pool(global_context.getSettingsRef().background_move_pool_size, 0, 10000, false)
{
data_processing_task = global_context.getSchedulePool().createTask(
data.getStorageID().getFullTableName() + " (dataProcessingTask)", [this]{ dataProcessingTask(); });
}
void BackgroundJobsExecutor::dataProcessingTask()
try
{
auto job = data.getDataProcessingJob();
if (job)
data_processing_pool.scheduleOrThrowOnError(*job);
data_processing_task->schedule();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
void BackgroundJobsExecutor::start()
{
if (data_processing_task)
data_processing_task->activateAndSchedule();
}
void BackgroundJobsExecutor::triggerDataProcessing()
{
if (data_processing_task)
data_processing_task->schedule();
}
void BackgroundJobsExecutor::finish()
{
data_processing_task->deactivate();
data_processing_pool.wait();
}
}

View File

@ -0,0 +1,34 @@
#pragma once
#include <Storages/MergeTree/MergeTreeData.h>
#include <Common/ThreadPool.h>
#include <Storages/MergeTree/MergeTreeBackgroundJob.h>
#include <Core/BackgroundSchedulePool.h>
namespace DB
{
class BackgroundJobsExecutor
{
private:
MergeTreeData & data;
ThreadPool data_processing_pool;
ThreadPool move_pool;
BackgroundSchedulePool::TaskHolder data_processing_task;
BackgroundSchedulePool::TaskHolder move_processing_task;
void dataProcessingTask();
public:
BackgroundJobsExecutor(
MergeTreeData & data_,
Context & global_context_);
void triggerDataProcessing();
void triggerMovesProcessing();
void start();
void finish();
};
}

View File

@ -0,0 +1,46 @@
# pragma once
#include <functional>
#include <Common/ThreadPool.h>
#include <Common/CurrentMetrics.h>
#include <common/logger_useful.h>
namespace DB
{
enum PoolType
{
MERGE_MUTATE,
MOVE,
FETCH,
};
struct MergeTreeBackgroundJob
{
ThreadPool::Job job;
CurrentMetrics::Metric metric;
PoolType execute_in_pool;
MergeTreeBackgroundJob(ThreadPool::Job && job_, CurrentMetrics::Metric metric_, PoolType execute_in_pool_)
: job(std::move(job_)), metric(metric_), execute_in_pool(execute_in_pool_)
{}
void operator()()
try
{
if (metric != 0)
{
CurrentMetrics::Increment metric_increment{metric};
job();
}
else
{
job();
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
};
}

View File

@ -27,11 +27,8 @@ void MergeTreeBlockOutputStream::write(const Block & block)
PartLog::addNewPart(storage.global_context, part, watch.elapsed());
if (storage.merging_mutating_task_handle)
{
/// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'.
storage.merging_mutating_task_handle->signalReadyToRun();
}
storage.background_executor.triggerDataProcessing();
}
}

View File

@ -25,6 +25,7 @@
#include <Interpreters/Aggregator.h>
#include <Storages/extractKeyExpressionList.h>
#include <Storages/PartitionCommands.h>
#include <Storages/MergeTree/MergeTreeBackgroundJob.h>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
@ -710,6 +711,8 @@ public:
/// Mutex for currently_moving_parts
mutable std::mutex moving_parts_mutex;
virtual std::optional<MergeTreeBackgroundJob> getDataProcessingJob() { return {}; }
protected:
friend class IMergeTreeDataPart;

View File

@ -154,7 +154,6 @@ MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, si
{
}
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge() const
{
size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);
@ -166,7 +165,9 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge() const
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used) const
{
if (pool_used > pool_size)
{
throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR);
}
size_t free_entries = pool_size - pool_used;
const auto data_settings = data.getSettings();

View File

@ -1287,7 +1287,7 @@ ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToP
}
if (entry)
return { entry, std::unique_ptr<CurrentlyExecuting>{ new CurrentlyExecuting(entry, *this) } };
return { entry, std::shared_ptr<CurrentlyExecuting>{ new CurrentlyExecuting(entry, *this) } };
else
return {};
}

View File

@ -319,7 +319,7 @@ public:
/** Select the next action to process.
* merger_mutator is used only to check if the merges are not suspended.
*/
using SelectedEntry = std::pair<ReplicatedMergeTreeQueue::LogEntryPtr, std::unique_ptr<CurrentlyExecuting>>;
using SelectedEntry = std::pair<ReplicatedMergeTreeQueue::LogEntryPtr, std::shared_ptr<CurrentlyExecuting>>;
SelectedEntry selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data);
/** Execute `func` function to handle the action.

View File

@ -27,6 +27,10 @@
#include <Processors/Pipe.h>
#include <optional>
namespace CurrentMetrics
{
extern const Metric BackgroundPoolTask;
}
namespace DB
{
@ -73,7 +77,8 @@ StorageMergeTree::StorageMergeTree(
attach)
, reader(*this)
, writer(*this)
, merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads())
, merger_mutator(*this, global_context.getSettingsRef().background_pool_size)
, background_executor(*this, global_context)
{
loadDataParts(has_force_restore_data_flag);
@ -100,11 +105,7 @@ void StorageMergeTree::startup()
try
{
auto & merge_pool = global_context.getBackgroundPool();
merging_mutating_task_handle = merge_pool.createTask([this] { return mergeMutateTask(); });
/// Ensure that thread started only after assignment to 'merging_mutating_task_handle' is done.
merge_pool.startTask(merging_mutating_task_handle);
background_executor.start();
startBackgroundMovesIfNeeded();
}
catch (...)
@ -142,8 +143,7 @@ void StorageMergeTree::shutdown()
merger_mutator.merges_blocker.cancelForever();
parts_mover.moves_blocker.cancelForever();
if (merging_mutating_task_handle)
global_context.getBackgroundPool().removeTask(merging_mutating_task_handle);
background_executor.finish();
if (moving_task_handle)
global_context.getBackgroundMovePool().removeTask(moving_task_handle);
@ -361,7 +361,7 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, String
current_mutations_by_version.emplace(version, insertion.first->second);
LOG_INFO(log, "Added mutation: {}", mutation_file_name);
merging_mutating_task_handle->signalReadyToRun();
background_executor.triggerDataProcessing();
return version;
}
@ -591,7 +591,7 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
}
/// Maybe there is another mutation that was blocked by the killed one. Try to execute it immediately.
merging_mutating_task_handle->signalReadyToRun();
background_executor.triggerDataProcessing();
return CancellationCode::CancelSent;
}
@ -712,10 +712,8 @@ std::optional<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::sele
return {};
}
merging_tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, metadata_snapshot, false);
auto table_id = getStorageID();
merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
return MergeMutateSelectedEntry{future_part, std::move(merging_tagger), std::move(merge_entry), {}};
merging_tagger = std::make_shared<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, metadata_snapshot, false);
return MergeMutateSelectedEntry{future_part, std::move(merging_tagger), {}};
}
bool StorageMergeTree::merge(
@ -739,6 +737,9 @@ bool StorageMergeTree::mergeSelectedParts(const StorageMetadataPtr & metadata_sn
auto & future_part = merge_mutate_entry.future_part;
Stopwatch stopwatch;
MutableDataPartPtr new_part;
auto table_id = getStorageID();
auto merge_list_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
auto write_part_log = [&] (const ExecutionStatus & execution_status)
{
@ -749,13 +750,13 @@ bool StorageMergeTree::mergeSelectedParts(const StorageMetadataPtr & metadata_sn
future_part.name,
new_part,
future_part.parts,
merge_mutate_entry.merge_entry.get());
merge_list_entry.get());
};
try
{
new_part = merger_mutator.mergePartsToTemporaryPart(
future_part, metadata_snapshot, *(merge_mutate_entry.merge_entry), table_lock_holder, time(nullptr),
future_part, metadata_snapshot, *(merge_list_entry), table_lock_holder, time(nullptr),
global_context, merge_mutate_entry.tagger->reserved_space, deduplicate);
merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
@ -868,10 +869,8 @@ std::optional<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::sele
future_part.name = part->getNewName(new_part_info);
future_part.type = part->getType();
tagger = std::make_unique<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true);
auto table_id = getStorageID();
MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
return MergeMutateSelectedEntry{future_part, std::move(tagger), std::move(merge_entry), commands};
tagger = std::make_shared<CurrentlyMergingPartsTagger>(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true);
return MergeMutateSelectedEntry{future_part, std::move(tagger), commands};
}
return {};
}
@ -880,6 +879,9 @@ bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_sn
{
auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
auto & future_part = merge_mutate_entry.future_part;
auto table_id = getStorageID();
auto merge_list_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
Stopwatch stopwatch;
MutableDataPartPtr new_part;
@ -892,13 +894,13 @@ bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_sn
future_part.name,
new_part,
future_part.parts,
merge_mutate_entry.merge_entry.get());
merge_list_entry.get());
};
try
{
new_part = merger_mutator.mutatePartToTemporaryPart(
future_part, metadata_snapshot, merge_mutate_entry.commands, *(merge_mutate_entry.merge_entry),
future_part, metadata_snapshot, merge_mutate_entry.commands, *(merge_list_entry),
time(nullptr), global_context, merge_mutate_entry.tagger->reserved_space, table_lock_holder);
renameTempPartAndReplace(new_part);
@ -916,6 +918,52 @@ bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_sn
return true;
}
std::optional<MergeTreeBackgroundJob> StorageMergeTree::getDataProcessingJob()
{
if (shutdown_called)
return {};
if (merger_mutator.merges_blocker.isCancelled())
return {};
auto metadata_snapshot = getInMemoryMetadataPtr();
std::optional<MergeMutateSelectedEntry> merge_entry, mutate_entry;
merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, nullptr);
if (!merge_entry)
mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr);
if (merge_entry || mutate_entry)
{
auto job = [this, metadata_snapshot, merge_entry{std::move(merge_entry)}, mutate_entry{std::move(mutate_entry)}] () mutable
{
if (merge_entry)
mergeSelectedParts(metadata_snapshot, false, *merge_entry);
else if (mutate_entry)
mutateSelectedPart(metadata_snapshot, *mutate_entry);
};
return std::make_optional<MergeTreeBackgroundJob>(std::move(job), CurrentMetrics::BackgroundPoolTask, PoolType::MERGE_MUTATE);
}
else if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1))
{
auto job = [this] ()
{
{
auto share_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
/// All use relative_data_path which changes during rename
/// so execute under share lock.
clearOldPartsFromFilesystem();
clearOldTemporaryDirectories();
clearOldWriteAheadLogs();
}
clearOldMutations();
};
return std::make_optional<MergeTreeBackgroundJob>(std::move(job), 0, PoolType::MERGE_MUTATE);
}
return {};
}
BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask()
{
if (shutdown_called)

View File

@ -16,6 +16,7 @@
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Common/SimpleIncrement.h>
#include <Core/BackgroundSchedulePool.h>
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
namespace DB
@ -87,6 +88,7 @@ public:
CheckResults checkData(const ASTPtr & query, const Context & context) override;
std::optional<MergeTreeBackgroundJob> getDataProcessingJob() override;
private:
/// Mutex and condvar for synchronous mutations wait
@ -119,7 +121,7 @@ private:
std::atomic<bool> shutdown_called {false};
/// Task handler for merges, mutations and moves.
BackgroundProcessingPool::TaskHandle merging_mutating_task_handle;
BackgroundJobsExecutor background_executor;
BackgroundProcessingPool::TaskHandle moving_task_handle;
void loadMutations();
@ -142,13 +144,12 @@ private:
friend struct CurrentlyMergingPartsTagger;
using CurrentlyMergingPartsTaggerPtr = std::unique_ptr<CurrentlyMergingPartsTagger>;
using CurrentlyMergingPartsTaggerPtr = std::shared_ptr<CurrentlyMergingPartsTagger>;
struct MergeMutateSelectedEntry
{
FutureMergedMutatedPart future_part;
CurrentlyMergingPartsTaggerPtr tagger;
MergeList::EntryPtr merge_entry;
MutationCommands commands;
};

View File

@ -194,7 +194,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
, replica_path(zookeeper_path + "/replicas/" + replica_name)
, reader(*this)
, writer(*this)
, merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads())
, merger_mutator(*this, global_context.getSettingsRef().background_pool_size)
, queue(*this)
, fetcher(*this)
, cleanup_thread(*this)
@ -2599,6 +2599,29 @@ bool StorageReplicatedMergeTree::processQueueEntry(ReplicatedMergeTreeQueue::Sel
});
}
std::optional<MergeTreeBackgroundJob> StorageReplicatedMergeTree::getDataProcessingJob()
{
/// If replication queue is stopped exit immediately as we successfully executed the task
if (queue.actions_blocker.isCancelled())
return {};
/// This object will mark the element of the queue as running.
ReplicatedMergeTreeQueue::SelectedEntry selected_entry = selectQueueEntry();
LogEntryPtr & entry = selected_entry.first;
if (!entry)
return {};
auto job = [this, selected_entry{std::move(selected_entry)}] () mutable
{
processQueueEntry(selected_entry);
};
return std::make_optional<MergeTreeBackgroundJob>(std::move(job), CurrentMetrics::BackgroundPoolTask, PoolType::MERGE_MUTATE);
}
BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask()
{
/// If replication queue is stopped exit immediately as we successfully executed the task

View File

@ -195,6 +195,8 @@ public:
*/
static void dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica, Poco::Logger * logger);
std::optional<MergeTreeBackgroundJob> getDataProcessingJob() override;
private:
/// Get a sequential consistent view of current parts.