diff --git a/src/Storages/MergeTree/BackgroundJobsExecutor.cpp b/src/Storages/MergeTree/BackgroundJobsExecutor.cpp new file mode 100644 index 00000000000..662fa71d318 --- /dev/null +++ b/src/Storages/MergeTree/BackgroundJobsExecutor.cpp @@ -0,0 +1,49 @@ +#include + +namespace DB +{ + +BackgroundJobsExecutor::BackgroundJobsExecutor( + MergeTreeData & data_, + Context & global_context) + : data(data_) + , data_processing_pool(global_context.getSettingsRef().background_pool_size, 0, 10000, false) + , move_pool(global_context.getSettingsRef().background_move_pool_size, 0, 10000, false) +{ + data_processing_task = global_context.getSchedulePool().createTask( + data.getStorageID().getFullTableName() + " (dataProcessingTask)", [this]{ dataProcessingTask(); }); +} + +void BackgroundJobsExecutor::dataProcessingTask() +try +{ + auto job = data.getDataProcessingJob(); + if (job) + data_processing_pool.scheduleOrThrowOnError(*job); + + data_processing_task->schedule(); +} +catch (...) +{ + tryLogCurrentException(__PRETTY_FUNCTION__); +} + +void BackgroundJobsExecutor::start() +{ + if (data_processing_task) + data_processing_task->activateAndSchedule(); +} + +void BackgroundJobsExecutor::triggerDataProcessing() +{ + if (data_processing_task) + data_processing_task->schedule(); +} + +void BackgroundJobsExecutor::finish() +{ + data_processing_task->deactivate(); + data_processing_pool.wait(); +} + +} diff --git a/src/Storages/MergeTree/BackgroundJobsExecutor.h b/src/Storages/MergeTree/BackgroundJobsExecutor.h new file mode 100644 index 00000000000..aa166eb4d73 --- /dev/null +++ b/src/Storages/MergeTree/BackgroundJobsExecutor.h @@ -0,0 +1,34 @@ +#pragma once + +#include +#include +#include +#include + +namespace DB +{ + +class BackgroundJobsExecutor +{ +private: + MergeTreeData & data; + ThreadPool data_processing_pool; + ThreadPool move_pool; + + BackgroundSchedulePool::TaskHolder data_processing_task; + BackgroundSchedulePool::TaskHolder move_processing_task; + + void dataProcessingTask(); + +public: + BackgroundJobsExecutor( + MergeTreeData & data_, + Context & global_context_); + + void triggerDataProcessing(); + void triggerMovesProcessing(); + void start(); + void finish(); +}; + +} diff --git a/src/Storages/MergeTree/MergeTreeBackgroundJob.h b/src/Storages/MergeTree/MergeTreeBackgroundJob.h new file mode 100644 index 00000000000..2f432114db1 --- /dev/null +++ b/src/Storages/MergeTree/MergeTreeBackgroundJob.h @@ -0,0 +1,46 @@ +# pragma once +#include +#include +#include +#include + +namespace DB +{ + +enum PoolType +{ + MERGE_MUTATE, + MOVE, + FETCH, +}; + +struct MergeTreeBackgroundJob +{ + ThreadPool::Job job; + CurrentMetrics::Metric metric; + PoolType execute_in_pool; + + MergeTreeBackgroundJob(ThreadPool::Job && job_, CurrentMetrics::Metric metric_, PoolType execute_in_pool_) + : job(std::move(job_)), metric(metric_), execute_in_pool(execute_in_pool_) + {} + + void operator()() + try + { + if (metric != 0) + { + CurrentMetrics::Increment metric_increment{metric}; + job(); + } + else + { + job(); + } + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } +}; + +} diff --git a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp index 4269abe2655..5ad6a7eebc1 100644 --- a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp @@ -27,11 +27,8 @@ void MergeTreeBlockOutputStream::write(const Block & block) PartLog::addNewPart(storage.global_context, part, watch.elapsed()); - if (storage.merging_mutating_task_handle) - { - /// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'. - storage.merging_mutating_task_handle->signalReadyToRun(); - } + /// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'. + storage.background_executor.triggerDataProcessing(); } } diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 5c18661dad1..8c5333315fc 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -25,6 +25,7 @@ #include #include #include +#include #include #include @@ -710,6 +711,8 @@ public: /// Mutex for currently_moving_parts mutable std::mutex moving_parts_mutex; + virtual std::optional getDataProcessingJob() { return {}; } + protected: friend class IMergeTreeDataPart; diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index fb0a488700c..942bac0d294 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -154,7 +154,6 @@ MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, si { } - UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge() const { size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed); @@ -166,7 +165,9 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge() const UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used) const { if (pool_used > pool_size) + { throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR); + } size_t free_entries = pool_size - pool_used; const auto data_settings = data.getSettings(); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 45e16e81208..48caf59e7ba 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1287,7 +1287,7 @@ ReplicatedMergeTreeQueue::SelectedEntry ReplicatedMergeTreeQueue::selectEntryToP } if (entry) - return { entry, std::unique_ptr{ new CurrentlyExecuting(entry, *this) } }; + return { entry, std::shared_ptr{ new CurrentlyExecuting(entry, *this) } }; else return {}; } diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 88a61f50225..c72569a5071 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -319,7 +319,7 @@ public: /** Select the next action to process. * merger_mutator is used only to check if the merges are not suspended. */ - using SelectedEntry = std::pair>; + using SelectedEntry = std::pair>; SelectedEntry selectEntryToProcess(MergeTreeDataMergerMutator & merger_mutator, MergeTreeData & data); /** Execute `func` function to handle the action. diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index b68ce9be37b..00ddb7a57ce 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -27,6 +27,10 @@ #include #include +namespace CurrentMetrics +{ + extern const Metric BackgroundPoolTask; +} namespace DB { @@ -73,7 +77,8 @@ StorageMergeTree::StorageMergeTree( attach) , reader(*this) , writer(*this) - , merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads()) + , merger_mutator(*this, global_context.getSettingsRef().background_pool_size) + , background_executor(*this, global_context) { loadDataParts(has_force_restore_data_flag); @@ -100,11 +105,7 @@ void StorageMergeTree::startup() try { - auto & merge_pool = global_context.getBackgroundPool(); - merging_mutating_task_handle = merge_pool.createTask([this] { return mergeMutateTask(); }); - /// Ensure that thread started only after assignment to 'merging_mutating_task_handle' is done. - merge_pool.startTask(merging_mutating_task_handle); - + background_executor.start(); startBackgroundMovesIfNeeded(); } catch (...) @@ -142,8 +143,7 @@ void StorageMergeTree::shutdown() merger_mutator.merges_blocker.cancelForever(); parts_mover.moves_blocker.cancelForever(); - if (merging_mutating_task_handle) - global_context.getBackgroundPool().removeTask(merging_mutating_task_handle); + background_executor.finish(); if (moving_task_handle) global_context.getBackgroundMovePool().removeTask(moving_task_handle); @@ -361,7 +361,7 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, String current_mutations_by_version.emplace(version, insertion.first->second); LOG_INFO(log, "Added mutation: {}", mutation_file_name); - merging_mutating_task_handle->signalReadyToRun(); + background_executor.triggerDataProcessing(); return version; } @@ -591,7 +591,7 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id) } /// Maybe there is another mutation that was blocked by the killed one. Try to execute it immediately. - merging_mutating_task_handle->signalReadyToRun(); + background_executor.triggerDataProcessing(); return CancellationCode::CancelSent; } @@ -712,10 +712,8 @@ std::optional StorageMergeTree::sele return {}; } - merging_tagger = std::make_unique(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, metadata_snapshot, false); - auto table_id = getStorageID(); - merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part); - return MergeMutateSelectedEntry{future_part, std::move(merging_tagger), std::move(merge_entry), {}}; + merging_tagger = std::make_shared(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, metadata_snapshot, false); + return MergeMutateSelectedEntry{future_part, std::move(merging_tagger), {}}; } bool StorageMergeTree::merge( @@ -739,6 +737,9 @@ bool StorageMergeTree::mergeSelectedParts(const StorageMetadataPtr & metadata_sn auto & future_part = merge_mutate_entry.future_part; Stopwatch stopwatch; MutableDataPartPtr new_part; + auto table_id = getStorageID(); + + auto merge_list_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part); auto write_part_log = [&] (const ExecutionStatus & execution_status) { @@ -749,13 +750,13 @@ bool StorageMergeTree::mergeSelectedParts(const StorageMetadataPtr & metadata_sn future_part.name, new_part, future_part.parts, - merge_mutate_entry.merge_entry.get()); + merge_list_entry.get()); }; try { new_part = merger_mutator.mergePartsToTemporaryPart( - future_part, metadata_snapshot, *(merge_mutate_entry.merge_entry), table_lock_holder, time(nullptr), + future_part, metadata_snapshot, *(merge_list_entry), table_lock_holder, time(nullptr), global_context, merge_mutate_entry.tagger->reserved_space, deduplicate); merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr); @@ -868,10 +869,8 @@ std::optional StorageMergeTree::sele future_part.name = part->getNewName(new_part_info); future_part.type = part->getType(); - tagger = std::make_unique(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true); - auto table_id = getStorageID(); - MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part); - return MergeMutateSelectedEntry{future_part, std::move(tagger), std::move(merge_entry), commands}; + tagger = std::make_shared(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true); + return MergeMutateSelectedEntry{future_part, std::move(tagger), commands}; } return {}; } @@ -880,6 +879,9 @@ bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_sn { auto table_lock_holder = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations); auto & future_part = merge_mutate_entry.future_part; + auto table_id = getStorageID(); + + auto merge_list_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part); Stopwatch stopwatch; MutableDataPartPtr new_part; @@ -892,13 +894,13 @@ bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_sn future_part.name, new_part, future_part.parts, - merge_mutate_entry.merge_entry.get()); + merge_list_entry.get()); }; try { new_part = merger_mutator.mutatePartToTemporaryPart( - future_part, metadata_snapshot, merge_mutate_entry.commands, *(merge_mutate_entry.merge_entry), + future_part, metadata_snapshot, merge_mutate_entry.commands, *(merge_list_entry), time(nullptr), global_context, merge_mutate_entry.tagger->reserved_space, table_lock_holder); renameTempPartAndReplace(new_part); @@ -916,6 +918,52 @@ bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_sn return true; } +std::optional StorageMergeTree::getDataProcessingJob() +{ + if (shutdown_called) + return {}; + + if (merger_mutator.merges_blocker.isCancelled()) + return {}; + + auto metadata_snapshot = getInMemoryMetadataPtr(); + std::optional merge_entry, mutate_entry; + + merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, nullptr); + if (!merge_entry) + mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr); + + if (merge_entry || mutate_entry) + { + auto job = [this, metadata_snapshot, merge_entry{std::move(merge_entry)}, mutate_entry{std::move(mutate_entry)}] () mutable + { + if (merge_entry) + mergeSelectedParts(metadata_snapshot, false, *merge_entry); + else if (mutate_entry) + mutateSelectedPart(metadata_snapshot, *mutate_entry); + }; + return std::make_optional(std::move(job), CurrentMetrics::BackgroundPoolTask, PoolType::MERGE_MUTATE); + } + else if (auto lock = time_after_previous_cleanup.compareAndRestartDeferred(1)) + { + auto job = [this] () + { + { + auto share_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations); + /// All use relative_data_path which changes during rename + /// so execute under share lock. + clearOldPartsFromFilesystem(); + clearOldTemporaryDirectories(); + clearOldWriteAheadLogs(); + } + clearOldMutations(); + }; + + return std::make_optional(std::move(job), 0, PoolType::MERGE_MUTATE); + } + return {}; +} + BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask() { if (shutdown_called) diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index 519352a0a8a..c028e15416f 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -16,6 +16,7 @@ #include #include #include +#include namespace DB @@ -87,6 +88,7 @@ public: CheckResults checkData(const ASTPtr & query, const Context & context) override; + std::optional getDataProcessingJob() override; private: /// Mutex and condvar for synchronous mutations wait @@ -119,7 +121,7 @@ private: std::atomic shutdown_called {false}; /// Task handler for merges, mutations and moves. - BackgroundProcessingPool::TaskHandle merging_mutating_task_handle; + BackgroundJobsExecutor background_executor; BackgroundProcessingPool::TaskHandle moving_task_handle; void loadMutations(); @@ -142,13 +144,12 @@ private: friend struct CurrentlyMergingPartsTagger; - using CurrentlyMergingPartsTaggerPtr = std::unique_ptr; + using CurrentlyMergingPartsTaggerPtr = std::shared_ptr; struct MergeMutateSelectedEntry { FutureMergedMutatedPart future_part; CurrentlyMergingPartsTaggerPtr tagger; - MergeList::EntryPtr merge_entry; MutationCommands commands; }; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ae30977a9f2..ae7ad4a3518 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -194,7 +194,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( , replica_path(zookeeper_path + "/replicas/" + replica_name) , reader(*this) , writer(*this) - , merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads()) + , merger_mutator(*this, global_context.getSettingsRef().background_pool_size) , queue(*this) , fetcher(*this) , cleanup_thread(*this) @@ -2599,6 +2599,29 @@ bool StorageReplicatedMergeTree::processQueueEntry(ReplicatedMergeTreeQueue::Sel }); } + +std::optional StorageReplicatedMergeTree::getDataProcessingJob() +{ + /// If replication queue is stopped exit immediately as we successfully executed the task + if (queue.actions_blocker.isCancelled()) + return {}; + + /// This object will mark the element of the queue as running. + ReplicatedMergeTreeQueue::SelectedEntry selected_entry = selectQueueEntry(); + + LogEntryPtr & entry = selected_entry.first; + + if (!entry) + return {}; + + auto job = [this, selected_entry{std::move(selected_entry)}] () mutable + { + processQueueEntry(selected_entry); + }; + + return std::make_optional(std::move(job), CurrentMetrics::BackgroundPoolTask, PoolType::MERGE_MUTATE); +} + BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask() { /// If replication queue is stopped exit immediately as we successfully executed the task diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index bb13cd7f230..1b65ffdbc25 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -195,6 +195,8 @@ public: */ static void dropReplica(zkutil::ZooKeeperPtr zookeeper, const String & zookeeper_path, const String & replica, Poco::Logger * logger); + std::optional getDataProcessingJob() override; + private: /// Get a sequential consistent view of current parts.