Refactoring

alesapin 2019-09-02 14:35:53 +03:00
parent 1e8e9b1699
commit 198031f787
6 changed files with 248 additions and 149 deletions

View File

@@ -6,15 +6,21 @@
namespace DB
{
namespace ErrorCodes
{
extern const int ABORTED;
}
namespace
{
/// Contains the minimal number of heaviest parts whose total size on disk is greater than required.
/// If the total size of all parts is not enough, contains all of them.
class LargestPartsWithRequiredSize
{
struct PartsSizeOnDiskComparator
{
bool operator() (const MergeTreeData::DataPartPtr & f, const MergeTreeData::DataPartPtr & s) const
bool operator()(const MergeTreeData::DataPartPtr & f, const MergeTreeData::DataPartPtr & s) const
{
return f->bytes_on_disk < s->bytes_on_disk;
}
@@ -25,10 +31,7 @@ class LargestPartsWithRequiredSize
UInt64 current_size_sum = 0;
public:
LargestPartsWithRequiredSize(UInt64 required_sum_size_)
: required_size_sum(required_sum_size_)
{
}
LargestPartsWithRequiredSize(UInt64 required_sum_size_) : required_size_sum(required_sum_size_) {}
void add(MergeTreeData::DataPartPtr part)
{
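
For reference, the selection rule described in the comment above ("keep only the heaviest parts whose total size on disk reaches the required size, otherwise keep everything") can be sketched independently of the MergeTree types. The class and names below are an illustrative stand-in, not the commit's code, and the exact comparison used in the real class may differ slightly:

#include <cstdint>
#include <iostream>
#include <set>

/// Keeps only the heaviest sizes whose running total is at least `required_sum`;
/// if the total of everything added is smaller than that, keeps everything.
class LargestWithRequiredSum
{
public:
    explicit LargestWithRequiredSum(uint64_t required_sum_) : required_sum(required_sum_) {}

    void add(uint64_t size)
    {
        current_sum += size;
        sizes.insert(size);
        /// While dropping the smallest element still leaves enough total, drop it.
        while (!sizes.empty() && current_sum - *sizes.begin() >= required_sum)
        {
            current_sum -= *sizes.begin();
            sizes.erase(sizes.begin());
        }
    }

    const std::multiset<uint64_t> & result() const { return sizes; }

private:
    uint64_t required_sum;
    uint64_t current_sum = 0;
    std::multiset<uint64_t> sizes; /// ascending, so begin() is the lightest kept element
};

int main()
{
    LargestWithRequiredSum selector(100);
    for (uint64_t part_size : {10, 70, 25, 40, 5})
        selector.add(part_size);
    for (uint64_t size : selector.result())
        std::cout << size << '\n'; /// prints 40 and 70: the heaviest parts summing to >= 100
}

The real class stores MergeTreeData::DataPartPtr and therefore needs the custom PartsSizeOnDiskComparator shown in the hunk above to order parts by bytes_on_disk.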
@@ -100,8 +103,13 @@ bool MergeTreePartsMover::selectPartsToMove(
for (const auto & part : data_parts)
{
if (!can_move(part, nullptr))
String reason;
if (!can_move(part, &reason))
{
LOG_TRACE(log, "Cannot select part '" << part->name << "' to move, because " << reason);
continue;
}
auto to_insert = need_to_move.find(part->disk);
if (to_insert != need_to_move.end())
to_insert->second.add(part);
@@ -132,6 +140,15 @@ MergeTreeData::DataPartsVector MergeTreePartsMover::cloneParts(const MergeTreeMo
MergeTreeData::DataPartsVector res;
for (auto && move : parts)
{
if (moves_blocker.isCancelled())
{
/// Removing all copied parts from disk
for (auto & part : res)
part->remove();
throw Exception("Cancelled moving parts.", ErrorCodes::ABORTED);
}
LOG_TRACE(log, "Cloning part " << move.part->name);
move.part->makeCloneOnDiskDetached(move.reserved_space);
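
cloneParts() now checks moves_blocker before cloning each part and, on cancellation, deletes the copies it has already made before throwing ABORTED; swapClonedParts() in the next hunk does the same for the clones that have not been swapped yet. A minimal sketch of this check-then-undo pattern, with an std::atomic<bool> standing in for ActionBlocker and plain strings standing in for parts (all names illustrative):

#include <atomic>
#include <stdexcept>
#include <string>
#include <vector>

std::atomic<bool> moves_cancelled{false}; /// stand-in for parts_mover.moves_blocker

/// Stand-ins for "clone the part to another disk" and "remove a clone from disk".
std::string clonePart(const std::string & part) { return part + "_copy"; }
void removeClone(const std::string & /*clone*/) { /* e.g. delete the cloned directory */ }

std::vector<std::string> cloneAll(const std::vector<std::string> & parts)
{
    std::vector<std::string> clones;
    for (const auto & part : parts)
    {
        if (moves_cancelled.load())
        {
            /// Undo the partial work: drop every clone made so far, then abort.
            for (const auto & clone : clones)
                removeClone(clone);
            throw std::runtime_error("Cancelled moving parts.");
        }
        clones.push_back(clonePart(part));
    }
    return clones;
}

int main()
{
    auto clones = cloneAll({"part_1", "part_2"});
    return clones.size() == 2 ? 0 : 1;
}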
@@ -151,22 +168,31 @@ MergeTreeData::DataPartsVector MergeTreePartsMover::cloneParts(const MergeTreeMo
bool MergeTreePartsMover::swapClonedParts(const MergeTreeData::DataPartsVector & cloned_parts, String * out_reason)
{
std::vector<String> failed_parts;
for (auto && cloned_part : cloned_parts)
for (size_t i = 0; i < cloned_parts.size(); ++i)
{
auto part = data.getActiveContainingPart(cloned_part->name);
if (!part || part->name != cloned_part->name)
if (moves_blocker.isCancelled())
{
LOG_INFO(log, "Failed to swap " << cloned_part->name << ". Active part doesn't exist."
/// Removing all copied parts from disk
for (size_t j = i; j < cloned_parts.size(); ++j)
cloned_parts[j]->remove();
throw Exception("Cancelled moving parts.", ErrorCodes::ABORTED);
}
auto part = data.getActiveContainingPart(cloned_parts[i]->name);
if (!part || part->name != cloned_parts[i]->name)
{
LOG_INFO(log, "Failed to swap " << cloned_parts[i]->name << ". Active part doesn't exist."
<< " It can be removed by merge or deleted by hand. Will remove copy on path '"
<< cloned_part->getFullPath() << "'.");
failed_parts.push_back(cloned_part->name);
cloned_part->remove();
<< cloned_parts[i]->getFullPath() << "'.");
failed_parts.push_back(cloned_parts[i]->name);
cloned_parts[i]->remove();
continue;
}
cloned_part->renameTo(part->name);
cloned_parts[i]->renameTo(part->name);
data.swapActivePart(cloned_part);
data.swapActivePart(cloned_parts[i]);
}
if (!failed_parts.empty())

View File

@@ -1,14 +1,20 @@
#pragma once
#include <Common/DiskSpaceMonitor.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <functional>
#include <vector>
#include <optional>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Common/ActionBlocker.h>
#include <Common/DiskSpaceMonitor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
struct MargeTreeMoveEntry
{
@@ -24,6 +30,44 @@ struct MargeTreeMoveEntry
using MergeTreeMovingParts = std::vector<MargeTreeMoveEntry>;
struct MovingPartsTagger
{
MergeTreeMovingParts parts_to_move;
std::unique_lock<std::mutex> background_lock;
MergeTreeData::DataParts & all_moving_parts;
MovingPartsTagger(MergeTreeMovingParts && moving_parts_,
std::unique_lock<std::mutex> && background_lock_,
MergeTreeData::DataParts & all_moving_data_parts_)
: parts_to_move(std::move(moving_parts_))
, background_lock(std::move(background_lock_))
, all_moving_parts(all_moving_data_parts_)
{
if (!background_lock)
throw Exception("Cannot tag moving parts without background lock.", ErrorCodes::LOGICAL_ERROR);
for (const auto & moving_part : parts_to_move)
if (!all_moving_parts.emplace(moving_part.part).second)
throw Exception("Cannot move part '" + moving_part.part->name + "'. It's already moving.", ErrorCodes::LOGICAL_ERROR);
background_lock.unlock();
}
~MovingPartsTagger()
{
background_lock.lock();
for (const auto & moving_part : parts_to_move)
{
/// Something went completely wrong
if (!all_moving_parts.count(moving_part.part))
std::terminate();
all_moving_parts.erase(moving_part.part);
}
}
};
class MergeTreePartsMover
{
using AllowedMovingPredicate = std::function<bool (const MergeTreeData::DataPartPtr &, String * reason)>;
@@ -42,6 +86,9 @@ public:
bool swapClonedParts(const MergeTreeData::DataPartsVector & cloned_parts, String * out_reason);
public:
ActionBlocker moves_blocker;
private:
MergeTreeData & data;
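
MovingPartsTagger is an RAII guard: its constructor registers the selected parts in a set shared with the merge/mutation selection code, under a lock the caller already holds, and its destructor unregisters them, so a part cannot be picked for two background operations at once. A generic sketch of the same pattern using only standard types; the names below are illustrative, not the commit's API:

#include <mutex>
#include <set>
#include <stdexcept>
#include <string>
#include <vector>

class ScopedTagger
{
public:
    ScopedTagger(std::vector<std::string> names_, std::unique_lock<std::mutex> && lock, std::set<std::string> & tagged_)
        : names(std::move(names_)), tagged(tagged_), mutex(*lock.mutex())
    {
        if (!lock)
            throw std::logic_error("Cannot tag without a held lock.");
        for (const auto & name : names)
            if (!tagged.insert(name).second)
                throw std::logic_error("'" + name + "' is already tagged.");
        /// From here on, membership in `tagged` is the guard; the caller's lock can be released.
        lock.unlock();
    }

    ~ScopedTagger()
    {
        std::lock_guard<std::mutex> guard(mutex);
        for (const auto & name : names)
            tagged.erase(name);
    }

private:
    std::vector<std::string> names;
    std::set<std::string> & tagged;
    std::mutex & mutex;
};

int main()
{
    std::mutex background_mutex;
    std::set<std::string> currently_moving;
    {
        std::unique_lock<std::mutex> lock(background_mutex);
        ScopedTagger tagger({"part_1", "part_2"}, std::move(lock), currently_moving);
        /// ... clone and swap the tagged parts here, without holding the mutex ...
    } /// the destructor re-takes the mutex and removes "part_1" and "part_2"
    return currently_moving.empty() ? 0 : 1;
}

In the commit, the tagger is created while currently_processing_in_background_mutex (or moving_parts_mutex in the replicated storage) is still held, so checking and inserting into currently_moving_parts stay atomic with respect to merge and mutation selection.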

View File

@@ -83,8 +83,6 @@ StorageMergeTree::StorageMergeTree(
increment.set(getMaxBlockNumber());
loadMutations();
moving_parts_task = global_context.getSchedulePool().createTask(database_name + "." + table_name + " (StorageMergeTree::movingPartsTask)", [this] { movingPartsTask(); });
}
@@ -99,7 +97,6 @@ void StorageMergeTree::startup()
/// NOTE background task will also do the above cleanups periodically.
time_after_previous_cleanup.restart();
background_task_handle = background_pool.addTask([this] { return backgroundTask(); });
moving_parts_task->activateAndSchedule();
}
@@ -109,8 +106,12 @@ void StorageMergeTree::shutdown()
return;
shutdown_called = true;
merger_mutator.merges_blocker.cancelForever();
parts_mover.moves_blocker.cancelForever();
if (background_task_handle)
background_pool.removeTask(background_task_handle);
background_task_handle.reset();
}
@@ -317,7 +318,7 @@ void StorageMergeTree::alter(
}
/// While it exists, marks parts as 'currently_processing_in_background' and reserves free space on the filesystem.
/// While it exists, marks parts as 'currently_merging_mutating_parts' and reserves free space on the filesystem.
struct CurrentlyMergingPartsTagger
{
FutureMergedMutatedPart future_part;
@@ -339,10 +340,10 @@ public:
for (const auto & part : future_part.parts)
{
if (storage.currently_processing_in_background.count(part))
if (storage.currently_merging_mutating_parts.count(part))
throw Exception("Tagging already tagged part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
storage.currently_processing_in_background.insert(future_part.parts.begin(), future_part.parts.end());
storage.currently_merging_mutating_parts.insert(future_part.parts.begin(), future_part.parts.end());
}
~CurrentlyMergingPartsTagger()
@@ -351,9 +352,9 @@ public:
for (const auto & part : future_part.parts)
{
if (!storage.currently_processing_in_background.count(part))
if (!storage.currently_merging_mutating_parts.count(part))
std::terminate();
storage.currently_processing_in_background.erase(part);
storage.currently_merging_mutating_parts.erase(part);
}
/// Update the information about failed parts in the system.mutations table.
@@ -533,7 +534,7 @@ bool StorageMergeTree::merge(
auto can_merge = [this, &lock] (const DataPartPtr & left, const DataPartPtr & right, String *)
{
return !currently_processing_in_background.count(left) && !currently_processing_in_background.count(right)
return !currently_merging_mutating_parts.count(left) && !currently_merging_mutating_parts.count(right)
&& getCurrentMutationVersion(left, lock) == getCurrentMutationVersion(right, lock);
};
@@ -641,9 +642,10 @@ void StorageMergeTree::movePartsToSpace(const MergeTreeData::DataPartsVector & p
{
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
MergeTreeMovingParts parts_to_move;
std::optional<MovingPartsTagger> moving_tagger;
{
std::lock_guard background_processing_lock(currently_processing_in_background_mutex);
MergeTreeMovingParts parts_to_move;
std::unique_lock background_processing_lock(currently_processing_in_background_mutex);
for (const auto & part : parts)
{
@@ -658,51 +660,68 @@ void StorageMergeTree::movePartsToSpace(const MergeTreeData::DataPartsVector & p
throw Exception("Move is not possible: " + path_to_clone + part->name + " already exists.",
ErrorCodes::DIRECTORY_ALREADY_EXISTS);
if (currently_processing_in_background.count(part))
if (currently_merging_mutating_parts.count(part))
throw Exception("Cannot move part '" + part->name + "' because it's participating in background process.",
ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
parts_to_move.emplace_back(part, std::move(reservation));
}
moving_tagger.emplace(std::move(parts_to_move), std::move(background_processing_lock), currently_moving_parts);
}
std::string reason;
auto cloned_parts = parts_mover.cloneParts(parts_to_move);
auto cloned_parts = parts_mover.cloneParts(moving_tagger->parts_to_move);
if (!parts_mover.swapClonedParts(cloned_parts, &reason))
throw Exception("Move failed. " + reason, ErrorCodes::LOGICAL_ERROR);
}
void StorageMergeTree::movingPartsTask()
bool StorageMergeTree::moveParts()
{
LOG_INFO(log, "TRYING TO MOVE SMS");
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
MergeTreeMovingParts parts_to_move;
std::optional<MovingPartsTagger> moving_tagger;
{
std::lock_guard background_processing_lock(currently_processing_in_background_mutex);
MergeTreeMovingParts parts_to_move;
std::unique_lock background_processing_lock(currently_processing_in_background_mutex);
auto can_move = [this](const DataPartPtr & part, String *) -> bool
auto can_move = [this](const DataPartPtr & part, String * reason) -> bool
{
return !currently_processing_in_background.count(part);
if (currently_merging_mutating_parts.count(part))
{
*reason = "part is already assigned to merge or mutation.";
return false;
}
if (currently_moving_parts.count(part))
{
*reason = "part is already moving.";
return false;
}
return true;
};
if (!parts_mover.selectPartsToMove(parts_to_move, can_move))
{
moving_parts_task->scheduleAfter(1 * 1000);
return;
}
}
LOG_INFO(log, "Found " << parts_to_move.size() << " parts to move.");
return false;
auto cloned_parts = parts_mover.cloneParts(parts_to_move);
LOG_INFO(log, "Found " << parts_to_move.size() << " parts to move.");
moving_tagger.emplace(std::move(parts_to_move), std::move(background_processing_lock), currently_moving_parts);
}
auto cloned_parts = parts_mover.cloneParts(moving_tagger->parts_to_move);
std::string reason;
if (!parts_mover.swapClonedParts(cloned_parts, &reason))
{
LOG_WARNING(log, "Move failed: " << reason);
return false;
}
moving_parts_task->scheduleAfter(1 * 1000);
return true;
}
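
Both moveParts() above and the replicated movingPartsTask() further down build the mover's AllowedMovingPredicate the same way: return false and write a human-readable reason into the String * out-parameter, which selectPartsToMove() then logs. A self-contained sketch of that calling convention, with standard types and illustrative part names:

#include <functional>
#include <iostream>
#include <set>
#include <string>

using CanMove = std::function<bool(const std::string & part, std::string * reason)>;

int main()
{
    std::set<std::string> merging_or_mutating = {"part_1"};
    std::set<std::string> moving = {"part_2"};

    CanMove can_move = [&](const std::string & part, std::string * reason)
    {
        if (merging_or_mutating.count(part))
        {
            *reason = "part is already assigned to merge or mutation.";
            return false;
        }
        if (moving.count(part))
        {
            *reason = "part is already moving.";
            return false;
        }
        return true;
    };

    for (const std::string part : {"part_1", "part_2", "part_3"})
    {
        std::string reason;
        if (!can_move(part, &reason))
            std::cout << "Cannot select part '" << part << "' to move, because " << reason << '\n';
        else
            std::cout << "Part '" << part << "' can be moved" << '\n';
    }
}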
@@ -726,7 +745,7 @@ bool StorageMergeTree::tryMutatePart()
auto mutations_end_it = current_mutations_by_version.end();
for (const auto & part : getDataPartsVector())
{
if (currently_processing_in_background.count(part))
if (currently_merging_mutating_parts.count(part))
continue;
auto mutations_begin_it = current_mutations_by_version.upper_bound(part->info.getDataVersion());
@@ -828,8 +847,8 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::backgroundTask()
if (shutdown_called)
return BackgroundProcessingPoolTaskResult::ERROR;
if (merger_mutator.merges_blocker.isCancelled())
return BackgroundProcessingPoolTaskResult::ERROR;
bool merges_mutations_blocked = merger_mutator.merges_blocker.isCancelled();
bool moves_blocked = parts_mover.moves_blocker.isCancelled();
try
{
@@ -846,10 +865,13 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::backgroundTask()
}
///TODO: read deduplicate option from table config
if (merge(false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/))
if (!merges_mutations_blocked && merge(false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/))
return BackgroundProcessingPoolTaskResult::SUCCESS;
if (tryMutatePart())
if (!moves_blocked && moveParts())
return BackgroundProcessingPoolTaskResult::SUCCESS;
if (!merges_mutations_blocked && tryMutatePart())
return BackgroundProcessingPoolTaskResult::SUCCESS;
else
return BackgroundProcessingPoolTaskResult::ERROR;
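
backgroundTask() now samples both blockers once and tries each kind of work in turn, returning as soon as something was done. A condensed sketch of that control flow; the enum and the three stand-in functions are illustrative, not the real signatures:

#include <atomic>

enum class TaskResult { SUCCESS, ERROR, NOTHING_TO_DO };

std::atomic<bool> merges_mutations_cancelled{false}; /// stand-in for merger_mutator.merges_blocker
std::atomic<bool> moves_cancelled{false};            /// stand-in for parts_mover.moves_blocker

bool tryMerge()     { return false; } /// stand-ins for merge(...), moveParts() and tryMutatePart()
bool tryMoveParts() { return false; }
bool tryMutateOne() { return false; }

TaskResult backgroundTask()
{
    bool merges_mutations_blocked = merges_mutations_cancelled.load();
    bool moves_blocked = moves_cancelled.load();

    if (!merges_mutations_blocked && tryMerge())
        return TaskResult::SUCCESS;
    if (!moves_blocked && tryMoveParts())
        return TaskResult::SUCCESS;
    if (!merges_mutations_blocked && tryMutateOne())
        return TaskResult::SUCCESS;
    return TaskResult::ERROR; /// nothing was done; the pool backs off before the next call
}

int main()
{
    return backgroundTask() == TaskResult::ERROR ? 0 : 1;
}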
@@ -1156,52 +1178,6 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par
PartsTemporaryRename renamed_parts(*this, "detached/");
MutableDataPartsVector loaded_parts = tryLoadPartsToAttach(partition, attach_part, context, renamed_parts);
//String source_dir = "detached/";
//std::map<String, DiskSpace::DiskPtr> name_to_disk;
///// Let's make a list of parts to add.
//Strings parts;
//if (attach_part)
//{
// parts.push_back(partition_id);
//}
//else
//{
// LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir);
// ActiveDataPartSet active_parts(format_version);
// const auto disks = storage_policy->getDisks();
// for (const DiskSpace::DiskPtr & disk : disks)
// {
// const auto full_path = getFullPathOnDisk(disk);
// for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
// {
// const String & name = it.name();
// MergeTreePartInfo part_info;
// if (!MergeTreePartInfo::tryParsePartName(name, &part_info, format_version)
// || part_info.partition_id != partition_id)
// {
// continue;
// }
// LOG_DEBUG(log, "Found part " << name);
// active_parts.add(name);
// name_to_disk[name] = disk;
// }
// }
// LOG_DEBUG(log, active_parts.size() << " of them are active");
// parts = active_parts.getParts();
//}
//for (const auto & source_part_name : parts)
//{
// const auto & source_part_disk = name_to_disk[source_part_name];
// LOG_DEBUG(log, "Checking data");
// MergeTreeData::MutableDataPartPtr part = loadPartAndFixMetadata(source_part_disk, source_dir + source_part_name);
// LOG_INFO(log, "Attaching part " << source_part_name << " from " << getFullPathOnDisk(source_part_disk));
// renameTempPartAndAdd(part, &increment);
for (size_t i = 0; i < loaded_parts.size(); ++i)
{
LOG_INFO(log, "Attaching part " << loaded_parts[i]->name << " from " << renamed_parts.old_and_new_names[i].second);

View File

@@ -84,19 +84,30 @@ private:
/// For clearOldParts, clearOldTemporaryDirectories.
AtomicStopwatch time_after_previous_cleanup;
/// Mutex for parts that are currently being processed in the background:
/// merging (also with TTL), mutating or moving.
mutable std::mutex currently_processing_in_background_mutex;
DataParts currently_processing_in_background;
/// Parts that currently participate in a merge or mutation.
/// This set has to be used with `currently_processing_in_background_mutex`.
DataParts currently_merging_mutating_parts;
/// Parts that are currently being moved from one disk/volume to another.
/// This set has to be used with `currently_processing_in_background_mutex`.
/// Moving may conflict with merges and mutations, but this is OK, because
/// if we decide to move some part to another disk, then we will surely
/// choose that disk to contain the part which appears as the result
/// of the merge or mutation.
DataParts currently_moving_parts;
std::map<String, MergeTreeMutationEntry> current_mutations_by_id;
std::multimap<Int64, MergeTreeMutationEntry &> current_mutations_by_version;
std::atomic<bool> shutdown_called {false};
/// Task handler for merges, mutations and moves.
BackgroundProcessingPool::TaskHandle background_task_handle;
/// A task which moves parts to other disks/volumes;
/// executed in the background schedule pool.
BackgroundSchedulePool::TaskHolder moving_parts_task;
std::vector<MergeTreeData::AlterDataPartTransactionPtr> prepareAlterTransactions(
const ColumnsDescription & new_columns, const IndicesDescription & new_indices, const Context & context);
@@ -108,7 +119,7 @@ private:
*/
bool merge(bool aggressive, const String & partition_id, bool final, bool deduplicate, String * out_disable_reason = nullptr);
void movingPartsTask();
bool moveParts();
/// Try and find a single part to mutate and mutate it. If some part was successfully mutated, return true.
bool tryMutatePart();

View File

@@ -126,7 +126,6 @@ namespace ActionLocks
static const auto QUEUE_UPDATE_ERROR_SLEEP_MS = 1 * 1000;
static const auto MERGE_SELECTING_SLEEP_MS = 5 * 1000;
static const auto MUTATIONS_FINALIZING_SLEEP_MS = 1 * 1000;
static const auto MOVE_PARTS_SLEEP_MS = 1 * 1000;
/** There are three places for each part, where it should be
* 1. In the RAM, data_parts, all_data_parts.
@@ -234,9 +233,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
mutations_finalizing_task = global_context.getSchedulePool().createTask(database_name + "." + table_name + " (StorageReplicatedMergeTree::mutationsFinalizingTask)", [this] { mutationsFinalizingTask(); });
moving_parts_task = global_context.getSchedulePool().createTask(database_name + "." + table_name + " (StorageReplicatedMergeTree::movingPartsTask)", [this] { movingPartsTask(); });
moving_parts_task->activateAndSchedule();
if (global_context.hasZooKeeper())
current_zookeeper = global_context.getZooKeeper();
@@ -2208,34 +2204,62 @@ BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask()
return need_sleep ? BackgroundProcessingPoolTaskResult::ERROR : BackgroundProcessingPoolTaskResult::SUCCESS;
}
void StorageReplicatedMergeTree::movingPartsTask()
BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::movingPartsTask()
{
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
MergeTreeMovingParts parts_to_move;
auto can_move = [this](const DataPartPtr & part, String *) -> bool
{
return !queue.isPartAssignedToBackgroundOperation(part);
};
if (!parts_mover.selectPartsToMove(parts_to_move, can_move))
try
{
moving_parts_task->scheduleAfter(MOVE_PARTS_SLEEP_MS);
return;
std::optional<MovingPartsTagger> moving_tagger;
{
MergeTreeMovingParts parts_to_move;
std::unique_lock moving_parts_lock(moving_parts_mutex);
auto can_move = [this](const DataPartPtr & part, String * reason) -> bool
{
if (queue.isPartAssignedToBackgroundOperation(part))
{
*reason = "part already assigned to replicated background operation.";
return false;
}
if (currently_moving_parts.count(part))
{
*reason = "part is already moving.";
return false;
}
return true;
};
if (!parts_mover.selectPartsToMove(parts_to_move, can_move))
return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
LOG_INFO(log, "Found " << parts_to_move.size() << " parts to move.");
moving_tagger.emplace(std::move(parts_to_move), std::move(moving_parts_lock), currently_moving_parts);
}
auto cloned_parts = parts_mover.cloneParts(moving_tagger->parts_to_move);
std::string reason;
if (!parts_mover.swapClonedParts(cloned_parts, &reason))
{
LOG_INFO(log, "Move failed. " << reason);
return BackgroundProcessingPoolTaskResult::ERROR;
}
return BackgroundProcessingPoolTaskResult::SUCCESS;
}
LOG_INFO(log, "Found " << parts_to_move.size() << " parts to move.");
auto cloned_parts = parts_mover.cloneParts(parts_to_move);
std::string reason;
if (!parts_mover.swapClonedParts(cloned_parts, &reason))
catch (const Exception & e)
{
LOG_INFO(log, "Move failed. " << reason);
moving_parts_task->scheduleAfter(MOVE_PARTS_SLEEP_MS);
if (e.code() == ErrorCodes::ABORTED)
{
LOG_INFO(log, e.message());
return BackgroundProcessingPoolTaskResult::ERROR;
}
throw;
}
else
moving_parts_task->schedule();
}
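
Instead of rescheduling itself, the replicated movingPartsTask() now reports a result to the background pool: NOTHING_TO_DO when selection finds nothing, SUCCESS after a finished move, and ERROR when the move was aborted by moves_blocker, while any other exception propagates. A stripped-down sketch of that exception-to-result mapping; the exception class, flag and helper below are simplified stand-ins, not the real code:

#include <atomic>
#include <iostream>
#include <stdexcept>

enum class TaskResult { SUCCESS, ERROR, NOTHING_TO_DO };

struct AbortedException : std::runtime_error
{
    using std::runtime_error::runtime_error;
};

std::atomic<bool> moves_cancelled{true}; /// stand-in for parts_mover.moves_blocker

/// Stand-in for "select parts, clone them, swap them"; returns false if nothing was selected.
bool selectAndMoveParts()
{
    if (moves_cancelled.load())
        throw AbortedException("Cancelled moving parts.");
    return true;
}

TaskResult movingPartsTask()
{
    try
    {
        if (!selectAndMoveParts())
            return TaskResult::NOTHING_TO_DO;
        return TaskResult::SUCCESS;
    }
    catch (const AbortedException & e)
    {
        /// A cancelled move is expected during shutdown: log it and report an error result.
        /// Any other exception is left to propagate to the background pool.
        std::cerr << e.what() << '\n';
        return TaskResult::ERROR;
    }
}

int main()
{
    return movingPartsTask() == TaskResult::ERROR ? 0 : 1;
}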
@@ -2243,30 +2267,37 @@ void StorageReplicatedMergeTree::movePartsToSpace(const MergeTreeData::DataParts
{
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
MergeTreeMovingParts parts_to_move;
for (const auto & part : parts)
std::optional<MovingPartsTagger> moving_tagger;
{
auto reservation = space->reserve(part->bytes_on_disk);
if (!reservation)
throw Exception("Move is not possible. Not enough space " + space->getName() + ".", ErrorCodes::NOT_ENOUGH_SPACE);
MergeTreeMovingParts parts_to_move;
std::unique_lock moving_parts_lock(moving_parts_mutex);
for (const auto & part : parts)
{
auto reservation = space->reserve(part->bytes_on_disk);
if (!reservation)
throw Exception("Move is not possible. Not enough space " + space->getName() + ".", ErrorCodes::NOT_ENOUGH_SPACE);
auto & reserved_disk = reservation->getDisk();
String path_to_clone = getFullPathOnDisk(reserved_disk);
auto & reserved_disk = reservation->getDisk();
String path_to_clone = getFullPathOnDisk(reserved_disk);
if (Poco::File(path_to_clone + part->name).exists())
throw Exception("Move is not possible: " + path_to_clone + part->name + " already exists.",
ErrorCodes::DIRECTORY_ALREADY_EXISTS);
if (Poco::File(path_to_clone + part->name).exists())
throw Exception(
"Move is not possible: " + path_to_clone + part->name + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
if (queue.isPartAssignedToBackgroundOperation(part))
throw Exception("Cannot move part '" + part->name + "' because it's participating in background process.",
ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
if (queue.isPartAssignedToBackgroundOperation(part))
throw Exception(
"Cannot move part '" + part->name + "' because it's participating in background process.",
ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
parts_to_move.emplace_back(part, std::move(reservation));
parts_to_move.emplace_back(part, std::move(reservation));
}
moving_tagger.emplace(std::move(parts_to_move), std::move(moving_parts_lock), currently_moving_parts);
}
std::string reason;
auto cloned_parts = parts_mover.cloneParts(parts_to_move);
auto cloned_parts = parts_mover.cloneParts(moving_tagger->parts_to_move);
std::string reason;
if (!parts_mover.swapClonedParts(cloned_parts, &reason))
throw Exception("Move failed. " + reason, ErrorCodes::LOGICAL_ERROR);
}
@@ -2973,6 +3004,7 @@ void StorageReplicatedMergeTree::startup()
data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint, global_context.getInterserverIOHandler());
queue_task_handle = global_context.getBackgroundPool().addTask([this] { return queueTask(); });
move_parts_task_handle = global_context.getBackgroundPool().addTask([this] { return movingPartsTask(); } );
/// In this thread replica will be activated.
restarting_thread.start();
@@ -2987,6 +3019,7 @@ void StorageReplicatedMergeTree::shutdown()
/// Cancel fetches, merges and mutations to force the queue_task to finish ASAP.
fetcher.blocker.cancelForever();
merger_mutator.merges_blocker.cancelForever();
parts_mover.moves_blocker.cancelForever();
restarting_thread.shutdown();
@@ -2994,6 +3027,10 @@ void StorageReplicatedMergeTree::shutdown()
global_context.getBackgroundPool().removeTask(queue_task_handle);
queue_task_handle.reset();
if (move_parts_task_handle)
global_context.getBackgroundPool().removeTask(move_parts_task_handle);
move_parts_task_handle.reset();
if (data_parts_exchange_endpoint_holder)
{
data_parts_exchange_endpoint_holder->getBlocker().cancelForever();

View File

@@ -264,6 +264,8 @@ private:
/// A task that performs actions from the queue.
BackgroundProcessingPool::TaskHandle queue_task_handle;
/// A task which moves parts to other disks/volumes.
/// Transparent for replication.
BackgroundProcessingPool::TaskHandle move_parts_task_handle;
/// A task that selects parts to merge.
@@ -274,10 +276,6 @@ private:
/// A task that marks finished mutations as done.
BackgroundSchedulePool::TaskHolder mutations_finalizing_task;
/// A task which moves parts to other disks/volumes.
/// Transparent for replication.
BackgroundSchedulePool::TaskHolder moving_parts_task;
/// A thread that removes old parts, log entries, and blocks.
ReplicatedMergeTreeCleanupThread cleanup_thread;
@@ -412,9 +410,9 @@ private:
*/
BackgroundProcessingPoolTaskResult queueTask();
/// Perform moves of parts to other disks
/// No log entry, because moves are not replicated
void movingPartsTask();
/// Perform moves of parts to other disks.
/// Local operation, doesn't interact with the replication queue.
BackgroundProcessingPoolTaskResult movingPartsTask();
/// Postcondition:
@@ -473,6 +471,10 @@ private:
std::unordered_set<String> currently_fetching_parts;
std::mutex currently_fetching_parts_mutex;
/// Parts that are currently being moved to other disks/volumes.
DataParts currently_moving_parts;
std::mutex moving_parts_mutex;
/// With the quorum being tracked, add a replica to the quorum for the part.
void updateQuorum(const String & part_name);