This commit is contained in:
Nikita Mikhaylov 2021-09-06 12:01:16 +00:00
parent 0249015515
commit ea0fbf81af
14 changed files with 41 additions and 41 deletions

View File

@ -13,7 +13,6 @@
#include <Poco/Event.h> #include <Poco/Event.h>
#include <Common/ThreadStatus.h> #include <Common/ThreadStatus.h>
#include <Common/PriorityQueue.h>
#include <common/scope_guard.h> #include <common/scope_guard.h>
/** Very simple thread pool similar to boost::threadpool. /** Very simple thread pool similar to boost::threadpool.

View File

@ -77,7 +77,7 @@
#include <Common/RemoteHostFilter.h> #include <Common/RemoteHostFilter.h>
#include <Interpreters/DatabaseCatalog.h> #include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/JIT/CompiledExpressionCache.h> #include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Storages/MergeTree/BackgroundJobsExecutor.h> #include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h> #include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Interpreters/SynonymsExtensions.h> #include <Interpreters/SynonymsExtensions.h>
#include <Interpreters/Lemmatizers.h> #include <Interpreters/Lemmatizers.h>

View File

@ -1,4 +1,4 @@
#include <Storages/MergeTree/BackgroundJobsExecutor.h> #include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/randomSeed.h> #include <Common/randomSeed.h>
@ -8,7 +8,7 @@
namespace DB namespace DB
{ {
BackgroundJobAssignee::BackgroundJobAssignee(MergeTreeData & data_, BackgroundJobAssignee::Type type_, ContextPtr global_context_) BackgroundJobsAssignee::BackgroundJobsAssignee(MergeTreeData & data_, BackgroundJobsAssignee::Type type_, ContextPtr global_context_)
: WithContext(global_context_) : WithContext(global_context_)
, data(data_) , data(data_)
, sleep_settings(global_context_->getBackgroundMoveTaskSchedulingSettings()) , sleep_settings(global_context_->getBackgroundMoveTaskSchedulingSettings())
@ -17,7 +17,7 @@ BackgroundJobAssignee::BackgroundJobAssignee(MergeTreeData & data_, BackgroundJo
{ {
} }
void BackgroundJobAssignee::trigger() void BackgroundJobsAssignee::trigger()
{ {
std::lock_guard lock(holder_mutex); std::lock_guard lock(holder_mutex);
@ -29,7 +29,7 @@ void BackgroundJobAssignee::trigger()
holder->schedule(); holder->schedule();
} }
void BackgroundJobAssignee::postpone() void BackgroundJobsAssignee::postpone()
{ {
std::lock_guard lock(holder_mutex); std::lock_guard lock(holder_mutex);
@ -48,28 +48,28 @@ void BackgroundJobAssignee::postpone()
} }
void BackgroundJobAssignee::scheduleMergeMutateTask(ExecutableTaskPtr merge_task) void BackgroundJobsAssignee::scheduleMergeMutateTask(ExecutableTaskPtr merge_task)
{ {
bool res = getContext()->getMergeMutateExecutor()->trySchedule(merge_task); bool res = getContext()->getMergeMutateExecutor()->trySchedule(merge_task);
res ? trigger() : postpone(); res ? trigger() : postpone();
} }
void BackgroundJobAssignee::scheduleFetchTask(ExecutableTaskPtr fetch_task) void BackgroundJobsAssignee::scheduleFetchTask(ExecutableTaskPtr fetch_task)
{ {
bool res = getContext()->getFetchesExecutor()->trySchedule(fetch_task); bool res = getContext()->getFetchesExecutor()->trySchedule(fetch_task);
res ? trigger() : postpone(); res ? trigger() : postpone();
} }
void BackgroundJobAssignee::scheduleMoveTask(ExecutableTaskPtr move_task) void BackgroundJobsAssignee::scheduleMoveTask(ExecutableTaskPtr move_task)
{ {
bool res = getContext()->getMovesExecutor()->trySchedule(move_task); bool res = getContext()->getMovesExecutor()->trySchedule(move_task);
res ? trigger() : postpone(); res ? trigger() : postpone();
} }
String BackgroundJobAssignee::toString(Type type) String BackgroundJobsAssignee::toString(Type type)
{ {
switch (type) switch (type)
{ {
@ -80,16 +80,16 @@ String BackgroundJobAssignee::toString(Type type)
} }
} }
void BackgroundJobAssignee::start() void BackgroundJobsAssignee::start()
{ {
std::lock_guard lock(holder_mutex); std::lock_guard lock(holder_mutex);
if (!holder) if (!holder)
holder = getContext()->getSchedulePool().createTask("BackgroundJobAssignee:" + toString(type), [this]{ main(); }); holder = getContext()->getSchedulePool().createTask("BackgroundJobsAssignee:" + toString(type), [this]{ main(); });
holder->activateAndSchedule(); holder->activateAndSchedule();
} }
void BackgroundJobAssignee::finish() void BackgroundJobsAssignee::finish()
{ {
/// No lock here, because scheduled tasks could call trigger method /// No lock here, because scheduled tasks could call trigger method
if (holder) if (holder)
@ -105,7 +105,7 @@ void BackgroundJobAssignee::finish()
} }
void BackgroundJobAssignee::main() void BackgroundJobsAssignee::main()
try try
{ {
bool succeed = false; bool succeed = false;
@ -128,7 +128,7 @@ catch (...) /// Catch any exception to avoid thread termination.
postpone(); postpone();
} }
BackgroundJobAssignee::~BackgroundJobAssignee() BackgroundJobsAssignee::~BackgroundJobsAssignee()
{ {
try try
{ {

View File

@ -1,6 +1,6 @@
#pragma once #pragma once
#include <Storages/MergeTree/MergeMutateExecutor.h> #include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <Core/BackgroundSchedulePool.h> #include <Core/BackgroundSchedulePool.h>
#include <pcg_random.hpp> #include <pcg_random.hpp>
@ -29,7 +29,7 @@ struct ExecutableTaskSchedulingSettings
class MergeTreeData; class MergeTreeData;
class BackgroundJobAssignee : protected WithContext class BackgroundJobsAssignee : protected WithContext
{ {
private: private:
MergeTreeData & data; MergeTreeData & data;
@ -66,9 +66,9 @@ public:
void scheduleMoveTask(ExecutableTaskPtr move_task); void scheduleMoveTask(ExecutableTaskPtr move_task);
/// Just call finish /// Just call finish
virtual ~BackgroundJobAssignee(); virtual ~BackgroundJobsAssignee();
BackgroundJobAssignee( BackgroundJobsAssignee(
MergeTreeData & data_, MergeTreeData & data_,
Type type, Type type,
ContextPtr global_context_); ContextPtr global_context_);

View File

@ -1,7 +1,7 @@
#include <Storages/MergeTree/MergeMutateExecutor.h> #include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
#include <Common/setThreadName.h> #include <Common/setThreadName.h>
#include <Storages/MergeTree/BackgroundJobsExecutor.h> #include <Storages/MergeTree/BackgroundJobsAssignee.h>
namespace DB namespace DB
@ -57,7 +57,7 @@ void MergeTreeBackgroundExecutor::wait()
if (scheduler.joinable()) if (scheduler.joinable())
scheduler.join(); scheduler.join();
pool.wait(); /// ThreadPool will be finalized in destructor.
} }
@ -115,7 +115,7 @@ void MergeTreeBackgroundExecutor::removeTasksCorrespondingToStorage(StorageID id
/// Erase storage related tasks from pending and select active tasks to wait for /// Erase storage related tasks from pending and select active tasks to wait for
auto it = std::remove_if(pending.begin(), pending.end(), auto it = std::remove_if(pending.begin(), pending.end(),
[&] (auto item) -> bool { return item->task->getStorageID() == id; } ); [&] (auto item) -> bool { return item->task->getStorageID() == id; });
pending.erase(it, pending.end()); pending.erase(it, pending.end());
/// Copy items to wait for their completion /// Copy items to wait for their completion
@ -179,7 +179,7 @@ void MergeTreeBackgroundExecutor::routine(ItemPtr item)
/// But it is rather safe, because we have try...catch block here, and another one in ThreadPool. /// But it is rather safe, because we have try...catch block here, and another one in ThreadPool.
item->task->onCompleted(); item->task->onCompleted();
} }
catch(...) catch (...)
{ {
std::lock_guard guard(mutex); std::lock_guard guard(mutex);
erase_from_active(); erase_from_active();

View File

@ -35,7 +35,7 @@ namespace DB
* *
* Due to all caveats I described above we use boost::circular_buffer as a container for queues. * Due to all caveats I described above we use boost::circular_buffer as a container for queues.
* *
* Another nuisance that we faces with is than backgroud operations always interacts with an associated Storage. * Another nuisance that we faces with is than background operations always interact with an associated Storage.
* So, when a Storage want to shutdown, it must wait until all its background operaions are finished. * So, when a Storage want to shutdown, it must wait until all its background operaions are finished.
*/ */
class MergeTreeBackgroundExecutor : public shared_ptr_helper<MergeTreeBackgroundExecutor> class MergeTreeBackgroundExecutor : public shared_ptr_helper<MergeTreeBackgroundExecutor>

View File

@ -200,8 +200,8 @@ MergeTreeData::MergeTreeData(
, data_parts_by_info(data_parts_indexes.get<TagByInfo>()) , data_parts_by_info(data_parts_indexes.get<TagByInfo>())
, data_parts_by_state_and_info(data_parts_indexes.get<TagByStateAndInfo>()) , data_parts_by_state_and_info(data_parts_indexes.get<TagByStateAndInfo>())
, parts_mover(this) , parts_mover(this)
, background_executor(*this, BackgroundJobAssignee::Type::DataProcessing, getContext()) , background_executor(*this, BackgroundJobsAssignee::Type::DataProcessing, getContext())
, background_moves_executor(*this, BackgroundJobAssignee::Type::Moving, getContext()) , background_moves_executor(*this, BackgroundJobsAssignee::Type::Moving, getContext())
{ {
const auto settings = getSettings(); const auto settings = getSettings();
allow_nullable_key = attach || settings->allow_nullable_key; allow_nullable_key = attach || settings->allow_nullable_key;
@ -5029,7 +5029,7 @@ MergeTreeData::CurrentlyMovingPartsTagger::~CurrentlyMovingPartsTagger()
} }
} }
bool MergeTreeData::scheduleDataMovingJob(BackgroundJobAssignee & executor) bool MergeTreeData::scheduleDataMovingJob(BackgroundJobsAssignee & executor)
{ {
if (parts_mover.moves_blocker.isCancelled()) if (parts_mover.moves_blocker.isCancelled())
return false; return false;

View File

@ -3,7 +3,7 @@
#include <Common/SimpleIncrement.h> #include <Common/SimpleIncrement.h>
#include <Common/MultiVersion.h> #include <Common/MultiVersion.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <Storages/MergeTree/BackgroundJobsExecutor.h> #include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeIndices.h> #include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h> #include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeSettings.h> #include <Storages/MergeTree/MergeTreeSettings.h>
@ -827,9 +827,9 @@ public:
PinnedPartUUIDsPtr getPinnedPartUUIDs() const; PinnedPartUUIDsPtr getPinnedPartUUIDs() const;
/// Schedules background job to like merge/mutate/fetch an executor /// Schedules background job to like merge/mutate/fetch an executor
virtual bool scheduleDataProcessingJob(BackgroundJobAssignee & executor) = 0; virtual bool scheduleDataProcessingJob(BackgroundJobsAssignee & executor) = 0;
/// Schedules job to move parts between disks/volumes and so on. /// Schedules job to move parts between disks/volumes and so on.
bool scheduleDataMovingJob(BackgroundJobAssignee & executor); bool scheduleDataMovingJob(BackgroundJobsAssignee & executor);
bool areBackgroundMovesNeeded() const; bool areBackgroundMovesNeeded() const;
/// Lock part in zookeeper for shared data in several nodes /// Lock part in zookeeper for shared data in several nodes
@ -925,8 +925,8 @@ protected:
/// Executors are common for both ReplicatedMergeTree and plain MergeTree /// Executors are common for both ReplicatedMergeTree and plain MergeTree
/// but they are being started and finished in derived classes, so let them be protected. /// but they are being started and finished in derived classes, so let them be protected.
BackgroundJobAssignee background_executor; BackgroundJobsAssignee background_executor;
BackgroundJobAssignee background_moves_executor; BackgroundJobsAssignee background_moves_executor;
/// Every task that is finished will ask to assign a new one into an executor. /// Every task that is finished will ask to assign a new one into an executor.
std::function<void(bool)> common_assignee_trigger; std::function<void(bool)> common_assignee_trigger;

View File

@ -6,7 +6,7 @@
#include <random> #include <random>
#include <Storages/MergeTree/ExecutableTask.h> #include <Storages/MergeTree/ExecutableTask.h>
#include <Storages/MergeTree/MergeMutateExecutor.h> #include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
using namespace DB; using namespace DB;

View File

@ -1041,7 +1041,7 @@ bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_sn
return true; return true;
} }
bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobAssignee & executor) //-V657 bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & executor) //-V657
{ {
if (shutdown_called) if (shutdown_called)
return false; return false;

View File

@ -95,7 +95,7 @@ public:
RestoreDataTasks restoreFromBackup(const BackupPtr & backup, const String & data_path_in_backup, const ASTs & partitions, ContextMutablePtr context) override; RestoreDataTasks restoreFromBackup(const BackupPtr & backup, const String & data_path_in_backup, const ASTs & partitions, ContextMutablePtr context) override;
bool scheduleDataProcessingJob(BackgroundJobAssignee & executor) override; bool scheduleDataProcessingJob(BackgroundJobsAssignee & executor) override;
MergeTreeDeduplicationLog * getDeduplicationLog() { return deduplication_log.get(); } MergeTreeDeduplicationLog * getDeduplicationLog() { return deduplication_log.get(); }

View File

@ -17,7 +17,7 @@
#include <Storages/StorageReplicatedMergeTree.h> #include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h> #include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeList.h> #include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/MergeMutateExecutor.h> #include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h> #include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h> #include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Storages/MergeTree/PartitionPruner.h> #include <Storages/MergeTree/PartitionPruner.h>
@ -3173,7 +3173,7 @@ bool StorageReplicatedMergeTree::processQueueEntry(ReplicatedMergeTreeQueue::Sel
}); });
} }
bool StorageReplicatedMergeTree::scheduleDataProcessingJob(BackgroundJobAssignee & executor) bool StorageReplicatedMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & executor)
{ {
/// If replication queue is stopped exit immediately as we successfully executed the task /// If replication queue is stopped exit immediately as we successfully executed the task
if (queue.actions_blocker.isCancelled()) if (queue.actions_blocker.isCancelled())

View File

@ -29,7 +29,7 @@
#include <Common/Throttler.h> #include <Common/Throttler.h>
#include <Core/BackgroundSchedulePool.h> #include <Core/BackgroundSchedulePool.h>
#include <Processors/Pipe.h> #include <Processors/Pipe.h>
#include <Storages/MergeTree/BackgroundJobsExecutor.h> #include <Storages/MergeTree/BackgroundJobsAssignee.h>
namespace DB namespace DB
@ -218,7 +218,7 @@ public:
const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, Poco::Logger * logger); const zkutil::EphemeralNodeHolder::Ptr & metadata_drop_lock, Poco::Logger * logger);
/// Schedules job to execute in background pool (merge, mutate, drop range and so on) /// Schedules job to execute in background pool (merge, mutate, drop range and so on)
bool scheduleDataProcessingJob(BackgroundJobAssignee & executor) override; bool scheduleDataProcessingJob(BackgroundJobsAssignee & executor) override;
/// Checks that fetches are not disabled with action blocker and pool for fetches /// Checks that fetches are not disabled with action blocker and pool for fetches
/// is not overloaded /// is not overloaded

View File

@ -27,7 +27,7 @@ SRCS(
MemorySettings.cpp MemorySettings.cpp
MergeTree/ActiveDataPartSet.cpp MergeTree/ActiveDataPartSet.cpp
MergeTree/AllMergeSelector.cpp MergeTree/AllMergeSelector.cpp
MergeTree/BackgroundJobsExecutor.cpp MergeTree/BackgroundJobsAssignee.cpp
MergeTree/BoolMask.cpp MergeTree/BoolMask.cpp
MergeTree/DataPartsExchange.cpp MergeTree/DataPartsExchange.cpp
MergeTree/DropPartsRanges.cpp MergeTree/DropPartsRanges.cpp
@ -40,6 +40,7 @@ SRCS(
MergeTree/LevelMergeSelector.cpp MergeTree/LevelMergeSelector.cpp
MergeTree/MergeAlgorithm.cpp MergeTree/MergeAlgorithm.cpp
MergeTree/MergeList.cpp MergeTree/MergeList.cpp
MergeTree/MergeTreeBackgroundExecutor.cpp
MergeTree/MergeTreeBaseSelectProcessor.cpp MergeTree/MergeTreeBaseSelectProcessor.cpp
MergeTree/MergeTreeBlockReadUtils.cpp MergeTree/MergeTreeBlockReadUtils.cpp
MergeTree/MergeTreeData.cpp MergeTree/MergeTreeData.cpp