Simplify code

This commit is contained in:
alesapin 2019-09-05 16:12:29 +03:00
parent b71faee8cd
commit f576cbb8a4
10 changed files with 232 additions and 368 deletions

View File

@ -27,8 +27,8 @@ void MergeTreeBlockOutputStream::write(const Block & block)
PartLog::addNewPart(storage.global_context, part, watch.elapsed());
/// Initiate async merge - it will be done if it's a good time for merge and if there is space in 'background_pool'.
if (storage.background_task_handle)
storage.background_task_handle->wake();
if (storage.merging_mutating_task_handle)
storage.merging_mutating_task_handle->wake();
}
}

View File

@ -92,6 +92,7 @@ namespace ErrorCodes
extern const int BAD_DATA_PART_NAME;
extern const int UNKNOWN_SETTING;
extern const int READONLY_SETTING;
extern const int ABORTED;
}
@ -128,6 +129,7 @@ MergeTreeData::MergeTreeData(
, storage_policy(context_.getStoragePolicy(getSettings()->storage_policy_name))
, data_parts_by_info(data_parts_indexes.get<TagByInfo>())
, data_parts_by_state_and_info(data_parts_indexes.get<TagByStateAndInfo>())
, parts_mover(this)
{
const auto settings = getSettings();
setProperties(order_by_ast_, primary_key_ast_, columns_, indices_, constraints_);
@ -2723,7 +2725,8 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String &
throw Exception("Part " + part->name + " already on disk " + name, ErrorCodes::UNKNOWN_DISK);
}
movePartsToSpace(parts, std::static_pointer_cast<const DiskSpace::Space>(disk));
if (!movePartsToSpace(&parts, std::static_pointer_cast<const DiskSpace::Space>(disk)))
throw Exception("Cannot move parts because moves are manually disabled.", ErrorCodes::ABORTED);
}
@ -2756,7 +2759,8 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
if (part->disk->getName() == disk->getName())
throw Exception("Part " + part->name + " already on volume " + name, ErrorCodes::UNKNOWN_DISK);
movePartsToSpace(parts, std::static_pointer_cast<const DiskSpace::Space>(volume));
if (!movePartsToSpace(&parts, std::static_pointer_cast<const DiskSpace::Space>(volume)))
throw Exception("Cannot move parts because moves are manually disabled.", ErrorCodes::ABORTED);
}
@ -3383,5 +3387,144 @@ catch (...)
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
namespace
{
/// Scope guard that marks a batch of parts as "currently moving" for its whole lifetime.
///
/// The constructor must be given an already locked `background_lock`, which protects
/// `all_moving_parts`. Every part of the batch is inserted into that set, then the lock is released,
/// so the (long) move itself runs without holding the mutex. The destructor takes the lock again
/// and removes the parts from the set.
struct MovingPartsTagger
{
    /// Parts (together with their space reservations) owned by this tagger.
    MergeTreeMovingParts parts_to_move;
    /// Lock over the mutex that guards `all_moving_parts`; held only inside constructor and destructor.
    std::unique_lock<std::mutex> background_lock;
    /// Shared set of all parts that are being moved right now.
    MergeTreeData::DataParts & all_moving_parts;

    /// Throws LOGICAL_ERROR if `background_lock_` does not own its mutex or if any part of the batch
    /// is already present in `all_moving_data_parts_`. On failure the shared set is left exactly as it was.
    MovingPartsTagger(MergeTreeMovingParts && moving_parts_,
        std::unique_lock<std::mutex> && background_lock_,
        MergeTreeData::DataParts & all_moving_data_parts_)
        : parts_to_move(std::move(moving_parts_))
        , background_lock(std::move(background_lock_))
        , all_moving_parts(all_moving_data_parts_)
    {
        if (!background_lock)
            throw Exception("Cannot tag moving parts without background lock.", ErrorCodes::LOGICAL_ERROR);

        /// Number of leading entries of `parts_to_move` that we have inserted into the shared set.
        size_t tagged_count = 0;
        try
        {
            for (const auto & moving_part : parts_to_move)
            {
                if (!all_moving_parts.emplace(moving_part.part).second)
                    throw Exception("Cannot move part '" + moving_part.part->name + "'. It's already moving.", ErrorCodes::LOGICAL_ERROR);
                ++tagged_count;
            }
        }
        catch (...)
        {
            /// The destructor is not called for a partially constructed object, so the parts
            /// tagged so far have to be removed here; otherwise they would stay "moving" forever.
            /// The lock is still held at this point and is released by the member destructor.
            for (size_t i = 0; i < tagged_count; ++i)
                all_moving_parts.erase(parts_to_move[i].part);
            throw;
        }

        background_lock.unlock();
    }

    ~MovingPartsTagger()
    {
        background_lock.lock();
        for (const auto & moving_part : parts_to_move)
        {
            /// Something went completely wrong
            if (!all_moving_parts.count(moving_part.part))
                std::terminate();
            all_moving_parts.erase(moving_part.part);
        }
        /// `background_lock` is released by its own destructor right after this body.
    }
};
}
/// Moves data parts to another disk or volume.
///
/// Works in two modes, selected by `parts`:
///  - `parts != nullptr`: move exactly the listed parts to `space`. Space is reserved for every part
///    up front; an exception is thrown if there is not enough space, if the destination directory
///    already exists, or if a part is already moving or assigned to a background operation.
///  - `parts == nullptr`: ask `parts_mover` to select the parts (and reservations) itself;
///    `space` is not used in this mode.
///
/// Returns false if moves are blocked by `parts_mover.moves_blocker` or if `selectPartsToMove`
/// returned false; returns true after all chosen parts have been cloned and swapped.
/// Any exception thrown while cloning/swapping a part is written to the part log and rethrown.
bool MergeTreeData::movePartsToSpace(const DataPartsVector * parts, DiskSpace::SpacePtr space)
{
    if (parts_mover.moves_blocker.isCancelled())
        return false;

    auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);

    std::optional<MovingPartsTagger> moving_tagger;
    {
        /// The list of parts is built under `moving_parts_mutex`; the lock is then handed over
        /// to the tagger, which releases it once the parts are registered in `currently_moving_parts`.
        MergeTreeMovingParts parts_to_move;
        std::unique_lock moving_parts_lock(moving_parts_mutex);

        if (parts != nullptr)
        {
            /// `space` has a default value of nullptr in the declaration, but an explicit list of parts
            /// is meaningless without a destination. Fail loudly instead of dereferencing a null pointer.
            if (!space)
                throw Exception("Cannot move parts: destination space is not specified.", ErrorCodes::LOGICAL_ERROR);

            for (const auto & part : *parts)
            {
                auto reservation = space->reserve(part->bytes_on_disk);
                if (!reservation)
                    throw Exception("Move is not possible. Not enough space " + space->getName() + ".", ErrorCodes::NOT_ENOUGH_SPACE);

                auto & reserved_disk = reservation->getDisk();
                String path_to_clone = getFullPathOnDisk(reserved_disk);

                if (Poco::File(path_to_clone + part->name).exists())
                    throw Exception(
                        "Move is not possible: " + path_to_clone + part->name + " already exists.",
                        ErrorCodes::DIRECTORY_ALREADY_EXISTS);

                if (currently_moving_parts.count(part) || partIsAssignedToBackgroundOperation(part))
                    throw Exception(
                        "Cannot move part '" + part->name + "' because it's participating in background process.",
                        ErrorCodes::PART_IS_TEMPORARILY_LOCKED);

                parts_to_move.emplace_back(part, std::move(reservation));
            }
        }
        else
        {
            /// Predicate given to the mover: a part may be selected only if nothing else is using it.
            auto can_move = [this](const DataPartPtr & part, String * reason) -> bool
            {
                if (partIsAssignedToBackgroundOperation(part))
                {
                    *reason = "part already assigned to replicated background operation.";
                    return false;
                }
                if (currently_moving_parts.count(part))
                {
                    *reason = "part is already moving.";
                    return false;
                }

                return true;
            };

            if (!parts_mover.selectPartsToMove(parts_to_move, can_move))
                return false;
        }

        LOG_INFO(log, "Found " << parts_to_move.size() << " parts to move.");
        moving_tagger.emplace(std::move(parts_to_move), std::move(moving_parts_lock), currently_moving_parts);
    }

    for (const auto & moving_part : moving_tagger->parts_to_move)
    {
        Stopwatch stopwatch;
        DataPartPtr cloned_part;

        /// Captures by reference on purpose: `cloned_part` is still empty if cloning itself failed.
        auto write_part_log = [&](const ExecutionStatus & execution_status)
        {
            writePartLog(
                PartLogElement::Type::MOVE_PART,
                execution_status,
                stopwatch.elapsed(),
                moving_part.part->name,
                cloned_part,
                {moving_part.part},
                nullptr);
        };

        try
        {
            cloned_part = parts_mover.clonePart(moving_part);
            parts_mover.swapClonedPart(cloned_part);
            write_part_log({});
        }
        catch (...)
        {
            write_part_log(ExecutionStatus::fromCurrentException());
            /// Do not leave a half-moved clone behind.
            if (cloned_part)
                cloned_part->remove();

            throw;
        }
    }

    return true;
}
}

View File

@ -17,6 +17,7 @@
#include <DataStreams/GraphiteRollupSortedBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/IndicesDescription.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Interpreters/PartLog.h>
#include <Common/DiskSpaceMonitor.h>
@ -31,6 +32,7 @@ namespace DB
class MergeListEntry;
class AlterCommands;
class MergeTreePartsMover;
namespace ErrorCodes
{
@ -597,7 +599,11 @@ public:
protected:
/// Moves part to specified space
virtual void movePartsToSpace(const DataPartsVector & parts, DiskSpace::SpacePtr space) = 0;
bool movePartsToSpace(
const DataPartsVector * parts = nullptr,
DiskSpace::SpacePtr space = nullptr);
virtual bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const = 0;
public:
/// Moves partition to specified Disk
@ -726,6 +732,16 @@ public:
bool has_non_adaptive_index_granularity_parts = false;
/// Parts that are currently moving from one disk/volume to another.
/// This set has to be used with `moving_parts_mutex`.
/// Moving may conflict with merges and mutations, but this is OK, because
/// if we decide to move some part to another disk, then we
/// assuredly will choose this disk for containing part, which will appear
/// as result of merge or mutation.
DataParts currently_moving_parts;
/// Mutex for currently_moving_parts
std::mutex moving_parts_mutex;
protected:
@ -799,6 +815,8 @@ protected:
DataPartsIndexes::index<TagByInfo>::type & data_parts_by_info;
DataPartsIndexes::index<TagByStateAndInfo>::type & data_parts_by_state_and_info;
MergeTreePartsMover parts_mover;
using DataPartIteratorByInfo = DataPartsIndexes::index<TagByInfo>::type::iterator;
using DataPartIteratorByStateAndInfo = DataPartsIndexes::index<TagByStateAndInfo>::type::iterator;

View File

@ -73,13 +73,13 @@ bool MergeTreePartsMover::selectPartsToMove(
MergeTreeMovingParts & parts_to_move,
const AllowedMovingPredicate & can_move)
{
MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector();
MergeTreeData::DataPartsVector data_parts = data->getDataPartsVector();
if (data_parts.empty())
return false;
std::unordered_map<DiskSpace::DiskPtr, LargestPartsWithRequiredSize> need_to_move;
const auto & policy = data.getStoragePolicy();
const auto & policy = data->getStoragePolicy();
const auto & volumes = policy->getVolumes();
/// Do not check if policy has one volume
@ -146,7 +146,7 @@ MergeTreeData::DataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEnt
moving_part.part->makeCloneOnDiskDetached(moving_part.reserved_space);
MergeTreeData::MutableDataPartPtr cloned_part =
std::make_shared<MergeTreeData::DataPart>(data, moving_part.reserved_space->getDisk(), moving_part.part->name);
std::make_shared<MergeTreeData::DataPart>(*data, moving_part.reserved_space->getDisk(), moving_part.part->name);
cloned_part->relative_path = "detached/" + moving_part.part->name;
LOG_TRACE(log, "Part " << moving_part.part->name << " was cloned to " << cloned_part->getFullPath());
@ -161,7 +161,7 @@ void MergeTreePartsMover::swapClonedPart(const MergeTreeData::DataPartPtr & clon
if (moves_blocker.isCancelled())
throw Exception("Cancelled moving parts.", ErrorCodes::ABORTED);
auto active_part = data.getActiveContainingPart(cloned_part->name);
auto active_part = data->getActiveContainingPart(cloned_part->name);
if (!active_part || active_part->name != cloned_part->name)
throw Exception("Failed to swap " + cloned_part->name + ". Active part doesn't exist."
@ -170,7 +170,7 @@ void MergeTreePartsMover::swapClonedPart(const MergeTreeData::DataPartPtr & clon
cloned_part->renameTo(active_part->name);
data.swapActivePart(cloned_part);
data->swapActivePart(cloned_part);
}
}

View File

@ -3,7 +3,6 @@
#include <functional>
#include <vector>
#include <optional>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Common/ActionBlocker.h>
#include <Common/DiskSpaceMonitor.h>
@ -18,61 +17,27 @@ namespace ErrorCodes
struct MergeTreeMoveEntry
{
MergeTreeData::DataPartPtr part;
std::shared_ptr<const MergeTreeDataPart> part;
DiskSpace::ReservationPtr reserved_space;
MergeTreeMoveEntry(const MergeTreeData::DataPartPtr & part_, DiskSpace::ReservationPtr reservation_)
: part(part_),
reserved_space(std::move(reservation_))
MergeTreeMoveEntry(const std::shared_ptr<const MergeTreeDataPart> & part_, DiskSpace::ReservationPtr reservation_)
: part(part_), reserved_space(std::move(reservation_))
{
}
};
using MergeTreeMovingParts = std::vector<MergeTreeMoveEntry>;
struct MovingPartsTagger
{
MergeTreeMovingParts parts_to_move;
std::unique_lock<std::mutex> background_lock;
MergeTreeData::DataParts & all_moving_parts;
MovingPartsTagger(MergeTreeMovingParts && moving_parts_,
std::unique_lock<std::mutex> && background_lock_,
MergeTreeData::DataParts & all_moving_data_parts_)
: parts_to_move(std::move(moving_parts_))
, background_lock(std::move(background_lock_))
, all_moving_parts(all_moving_data_parts_)
{
if (!background_lock)
throw Exception("Cannot tag moving parts without background lock.", ErrorCodes::LOGICAL_ERROR);
for (const auto & moving_part : parts_to_move)
if (!all_moving_parts.emplace(moving_part.part).second)
throw Exception("Cannot move part '" + moving_part.part->name + "'. It's already moving.", ErrorCodes::LOGICAL_ERROR);
background_lock.unlock();
}
~MovingPartsTagger()
{
background_lock.lock();
for (const auto & moving_part : parts_to_move)
{
/// Something went completely wrong
if (!all_moving_parts.count(moving_part.part))
std::terminate();
all_moving_parts.erase(moving_part.part);
}
}
};
class MergeTreePartsMover
{
using AllowedMovingPredicate = std::function<bool (const MergeTreeData::DataPartPtr &, String * reason)>;
private:
using AllowedMovingPredicate = std::function<bool(const std::shared_ptr<const MergeTreeDataPart> &, String * reason)>;
public:
MergeTreePartsMover(MergeTreeData & data_)
MergeTreePartsMover(MergeTreeData * data_)
: data(data_)
, log(&Poco::Logger::get("MergeTreePartsMover"))
{
@ -82,16 +47,16 @@ public:
MergeTreeMovingParts & parts_to_move,
const AllowedMovingPredicate & can_move);
MergeTreeData::DataPartPtr clonePart(const MergeTreeMoveEntry & moving_part) const;
std::shared_ptr<const MergeTreeDataPart> clonePart(const MergeTreeMoveEntry & moving_part) const;
void swapClonedPart(const MergeTreeData::DataPartPtr & cloned_parts) const;
void swapClonedPart(const std::shared_ptr<const MergeTreeDataPart> & cloned_parts) const;
public:
ActionBlocker moves_blocker;
private:
MergeTreeData & data;
MergeTreeData * data;
Logger * log;
};

View File

@ -73,10 +73,8 @@ StorageMergeTree::StorageMergeTree(
context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_,
sample_by_ast_, ttl_table_ast_, merging_params_,
std::move(storage_settings_), false, attach),
background_pool(context_.getBackgroundPool()),
reader(*this), writer(*this),
merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads()),
parts_mover(*this)
merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads())
{
loadDataParts(has_force_restore_data_flag);
@ -99,7 +97,8 @@ void StorageMergeTree::startup()
/// NOTE background task will also do the above cleanups periodically.
time_after_previous_cleanup.restart();
background_task_handle = background_pool.addTask([this] { return backgroundTask(); });
merging_mutating_task_handle = global_context.getBackgroundPool().addTask([this] { return mergeMutateTask(); });
moving_task_handle = global_context.getBackgroundPool().addTask([this] { return movePartsTask(); });
}
@ -111,8 +110,11 @@ void StorageMergeTree::shutdown()
merger_mutator.merges_blocker.cancelForever();
parts_mover.moves_blocker.cancelForever();
if (background_task_handle)
background_pool.removeTask(background_task_handle);
if (merging_mutating_task_handle)
global_context.getBackgroundPool().removeTask(merging_mutating_task_handle);
if (moving_task_handle)
global_context.getBackgroundPool().removeTask(moving_task_handle);
}
@ -421,7 +423,7 @@ void StorageMergeTree::mutate(const MutationCommands & commands, const Context &
}
LOG_INFO(log, "Added mutation: " << file_name);
background_task_handle->wake();
merging_mutating_task_handle->wake();
}
std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() const
@ -492,7 +494,7 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
LOG_TRACE(log, "Cancelled part mutations and removed mutation file " << mutation_id);
/// Maybe there is another mutation that was blocked by the killed one. Try to execute it immediately.
background_task_handle->wake();
merging_mutating_task_handle->wake();
return CancellationCode::CancelSent;
}
@ -617,139 +619,28 @@ bool StorageMergeTree::merge(
return true;
}
void StorageMergeTree::movePartsToSpace(const MergeTreeData::DataPartsVector & parts, DiskSpace::SpacePtr space)
bool StorageMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & part) const
{
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
std::optional<MovingPartsTagger> moving_tagger;
{
MergeTreeMovingParts parts_to_move;
std::unique_lock background_processing_lock(currently_processing_in_background_mutex);
for (const auto & part : parts)
{
auto reservation = space->reserve(part->bytes_on_disk);
if (!reservation)
throw Exception("Move is not possible. Not enough space " + space->getName() + ".", ErrorCodes::NOT_ENOUGH_SPACE);
auto & reserved_disk = reservation->getDisk();
String path_to_clone = getFullPathOnDisk(reserved_disk);
if (Poco::File(path_to_clone + part->name).exists())
throw Exception("Move is not possible: " + path_to_clone + part->name + " already exists.",
ErrorCodes::DIRECTORY_ALREADY_EXISTS);
if (currently_merging_mutating_parts.count(part))
throw Exception("Cannot move part '" + part->name + "' because it's participating in background process.",
ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
parts_to_move.emplace_back(part, std::move(reservation));
}
moving_tagger.emplace(std::move(parts_to_move), std::move(background_processing_lock), currently_moving_parts);
}
for (const auto & moving_part : moving_tagger->parts_to_move)
{
Stopwatch stopwatch;
DataPartPtr cloned_part;
auto write_part_log = [&](const ExecutionStatus & execution_status)
{
writePartLog(
PartLogElement::Type::MOVE_PART,
execution_status,
stopwatch.elapsed(),
moving_part.part->name,
cloned_part,
{moving_part.part},
nullptr);
};
try
{
cloned_part = parts_mover.clonePart(moving_part);
parts_mover.swapClonedPart(cloned_part);
write_part_log({});
}
catch (...)
{
write_part_log(ExecutionStatus::fromCurrentException());
if (cloned_part)
cloned_part->remove();
throw;
}
}
std::lock_guard background_processing_lock(currently_processing_in_background_mutex);
return currently_merging_mutating_parts.count(part);
}
bool StorageMergeTree::moveParts()
BackgroundProcessingPoolTaskResult StorageMergeTree::movePartsTask()
{
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
std::optional<MovingPartsTagger> moving_tagger;
{
MergeTreeMovingParts parts_to_move;
std::unique_lock background_processing_lock(currently_processing_in_background_mutex);
auto can_move = [this](const DataPartPtr & part, String * reason) -> bool
{
if (currently_merging_mutating_parts.count(part))
{
*reason = "part is already assigned to merge or mutation.";
return false;
}
if (currently_moving_parts.count(part))
{
*reason = "part is already moving.";
return false;
}
return true;
};
if (!parts_mover.selectPartsToMove(parts_to_move, can_move))
return false;
LOG_INFO(log, "Found " << parts_to_move.size() << " parts to move.");
moving_tagger.emplace(std::move(parts_to_move), std::move(background_processing_lock), currently_moving_parts);
}
for (const auto & moving_part : moving_tagger->parts_to_move)
{
Stopwatch stopwatch;
DataPartPtr cloned_part;
auto write_part_log = [&](const ExecutionStatus & execution_status)
{
writePartLog(
PartLogElement::Type::MOVE_PART,
execution_status,
stopwatch.elapsed(),
moving_part.part->name,
cloned_part,
{moving_part.part},
nullptr);
};
try
{
cloned_part = parts_mover.clonePart(moving_part);
parts_mover.swapClonedPart(cloned_part);
write_part_log({});
if (!movePartsToSpace())
return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
return BackgroundProcessingPoolTaskResult::SUCCESS;
}
catch (...)
{
write_part_log(ExecutionStatus::fromCurrentException());
if (cloned_part)
cloned_part->remove();
return false;
tryLogCurrentException(log);
return BackgroundProcessingPoolTaskResult::ERROR;
}
}
return true;
}
@ -849,15 +740,12 @@ bool StorageMergeTree::tryMutatePart()
}
BackgroundProcessingPoolTaskResult StorageMergeTree::backgroundTask()
BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask()
{
if (shutdown_called)
return BackgroundProcessingPoolTaskResult::ERROR;
bool merges_mutations_blocked = merger_mutator.merges_blocker.isCancelled();
bool moves_blocked = parts_mover.moves_blocker.isCancelled();
if (merges_mutations_blocked && moves_blocked)
if (merger_mutator.merges_blocker.isCancelled())
return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
try
@ -875,14 +763,11 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::backgroundTask()
}
///TODO: read deduplicate option from table config
if (!merges_mutations_blocked && merge(false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/))
return BackgroundProcessingPoolTaskResult::SUCCESS;
if (!moves_blocked && moveParts())
if (merge(false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/))
return BackgroundProcessingPoolTaskResult::SUCCESS;
if (!merges_mutations_blocked && tryMutatePart())
if (tryMutatePart())
return BackgroundProcessingPoolTaskResult::SUCCESS;
return BackgroundProcessingPoolTaskResult::ERROR;

View File

@ -71,12 +71,10 @@ public:
CheckResults checkData(const ASTPtr & query, const Context & context) override;
private:
BackgroundProcessingPool & background_pool;
MergeTreeDataSelectExecutor reader;
MergeTreeDataWriter writer;
MergeTreeDataMergerMutator merger_mutator;
MergeTreePartsMover parts_mover;
/// For block numbers.
SimpleIncrement increment{0};
@ -92,13 +90,6 @@ private:
/// This set have to be used with `currently_processing_in_background_mutex`.
DataParts currently_merging_mutating_parts;
/// Parts that currently moving from disk/volume to another.
/// This set have to be used with `currently_processing_in_background_mutex`.
/// Moving may conflict with merges and mutations, but this is OK, because
/// if we decide to move some part to another disk, than we
/// assuredly will choose this disk for containing part, which will appear
/// as result of merge or mutation.
DataParts currently_moving_parts;
std::map<String, MergeTreeMutationEntry> current_mutations_by_id;
std::multimap<Int64, MergeTreeMutationEntry &> current_mutations_by_version;
@ -106,7 +97,8 @@ private:
std::atomic<bool> shutdown_called {false};
/// Task handlers: one for merges and mutations, one for moves.
BackgroundProcessingPool::TaskHandle background_task_handle;
BackgroundProcessingPool::TaskHandle merging_mutating_task_handle;
BackgroundProcessingPool::TaskHandle moving_task_handle;
std::vector<MergeTreeData::AlterDataPartTransactionPtr> prepareAlterTransactions(
const ColumnsDescription & new_columns, const IndicesDescription & new_indices, const Context & context);
@ -119,12 +111,12 @@ private:
*/
bool merge(bool aggressive, const String & partition_id, bool final, bool deduplicate, String * out_disable_reason = nullptr);
bool moveParts();
BackgroundProcessingPoolTaskResult movePartsTask();
/// Try and find a single part to mutate and mutate it. If some part was successfully mutated, return true.
bool tryMutatePart();
BackgroundProcessingPoolTaskResult backgroundTask();
BackgroundProcessingPoolTaskResult mergeMutateTask();
Int64 getCurrentMutationVersion(
const DataPartPtr & part,
@ -137,7 +129,8 @@ private:
void clearColumnOrIndexInPartition(const ASTPtr & partition, const AlterCommand & alter_command, const Context & context);
void attachPartition(const ASTPtr & partition, bool part, const Context & context);
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context);
void movePartsToSpace(const MergeTreeData::DataPartsVector & part, DiskSpace::SpacePtr space) override;
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
friend class MergeTreeBlockOutputStream;
friend class MergeTreeData;

View File

@ -214,7 +214,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
zookeeper_path(global_context.getMacros()->expand(zookeeper_path_, database_name_, table_name_)),
replica_name(global_context.getMacros()->expand(replica_name_, database_name_, table_name_)),
reader(*this), writer(*this), merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads()),
parts_mover(*this), queue(*this), fetcher(*this), cleanup_thread(*this), alter_thread(*this),
queue(*this), fetcher(*this), cleanup_thread(*this), alter_thread(*this),
part_check_thread(*this), restarting_thread(*this)
{
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
@ -2147,159 +2147,29 @@ BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::queueTask()
return need_sleep ? BackgroundProcessingPoolTaskResult::ERROR : BackgroundProcessingPoolTaskResult::SUCCESS;
}
BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::movingPartsTask()
bool StorageReplicatedMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & part) const
{
if (parts_mover.moves_blocker.isCancelled())
return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
return queue.isPartAssignedToBackgroundOperation(part);
}
BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::movePartsTask()
{
try
{
std::optional<MovingPartsTagger> moving_tagger;
{
MergeTreeMovingParts parts_to_move;
std::unique_lock moving_parts_lock(moving_parts_mutex);
auto can_move = [this](const DataPartPtr & part, String * reason) -> bool
{
if (queue.isPartAssignedToBackgroundOperation(part))
{
*reason = "part already assigned to replicated background operation.";
return false;
}
if (currently_moving_parts.count(part))
{
*reason = "part is already moving.";
return false;
}
return true;
};
if (!parts_mover.selectPartsToMove(parts_to_move, can_move))
if (!movePartsToSpace())
return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
LOG_INFO(log, "Found " << parts_to_move.size() << " parts to move.");
moving_tagger.emplace(std::move(parts_to_move), std::move(moving_parts_lock), currently_moving_parts);
}
for (const auto & moving_part : moving_tagger->parts_to_move)
{
Stopwatch stopwatch;
DataPartPtr cloned_part;
auto write_part_log = [&](const ExecutionStatus & execution_status)
{
writePartLog(
PartLogElement::Type::MOVE_PART,
execution_status,
stopwatch.elapsed(),
moving_part.part->name,
cloned_part,
{moving_part.part},
nullptr);
};
try
{
cloned_part = parts_mover.clonePart(moving_part);
parts_mover.swapClonedPart(cloned_part);
write_part_log({});
}
catch (...)
{
write_part_log(ExecutionStatus::fromCurrentException());
if (cloned_part)
cloned_part->remove();
return BackgroundProcessingPoolTaskResult::ERROR;
}
}
return BackgroundProcessingPoolTaskResult::SUCCESS;
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::ABORTED)
{
LOG_INFO(log, e.message());
return BackgroundProcessingPoolTaskResult::ERROR;
}
throw;
}
}
void StorageReplicatedMergeTree::movePartsToSpace(const MergeTreeData::DataPartsVector & parts, DiskSpace::SpacePtr space)
{
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
std::optional<MovingPartsTagger> moving_tagger;
{
MergeTreeMovingParts parts_to_move;
std::unique_lock moving_parts_lock(moving_parts_mutex);
for (const auto & part : parts)
{
auto reservation = space->reserve(part->bytes_on_disk);
if (!reservation)
throw Exception("Move is not possible. Not enough space " + space->getName() + ".", ErrorCodes::NOT_ENOUGH_SPACE);
auto & reserved_disk = reservation->getDisk();
String path_to_clone = getFullPathOnDisk(reserved_disk);
if (Poco::File(path_to_clone + part->name).exists())
throw Exception(
"Move is not possible: " + path_to_clone + part->name + " already exists.", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
if (queue.isPartAssignedToBackgroundOperation(part))
throw Exception(
"Cannot move part '" + part->name + "' because it's participating in background process.",
ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
parts_to_move.emplace_back(part, std::move(reservation));
}
LOG_INFO(log, "Found " << parts_to_move.size() << " parts to move.");
moving_tagger.emplace(std::move(parts_to_move), std::move(moving_parts_lock), currently_moving_parts);
}
for (const auto & moving_part : moving_tagger->parts_to_move)
{
Stopwatch stopwatch;
DataPartPtr cloned_part;
auto write_part_log = [&](const ExecutionStatus & execution_status)
{
writePartLog(
PartLogElement::Type::MOVE_PART,
execution_status,
stopwatch.elapsed(),
moving_part.part->name,
cloned_part,
{moving_part.part},
nullptr);
};
try
{
cloned_part = parts_mover.clonePart(moving_part);
parts_mover.swapClonedPart(cloned_part);
write_part_log({});
}
catch (...)
{
write_part_log(ExecutionStatus::fromCurrentException());
if (cloned_part)
cloned_part->remove();
throw;
tryLogCurrentException(log);
return BackgroundProcessingPoolTaskResult::ERROR;
}
}
}
void StorageReplicatedMergeTree::mergeSelectingTask()
{
if (!is_leader)
@ -3002,7 +2872,7 @@ void StorageReplicatedMergeTree::startup()
data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint, global_context.getInterserverIOHandler());
queue_task_handle = global_context.getBackgroundPool().addTask([this] { return queueTask(); });
move_parts_task_handle = global_context.getBackgroundPool().addTask([this] { return movingPartsTask(); });
move_parts_task_handle = global_context.getBackgroundPool().addTask([this] { return movePartsTask(); });
/// In this thread replica will be activated.
restarting_thread.start();

View File

@ -229,7 +229,6 @@ private:
MergeTreeDataSelectExecutor reader;
MergeTreeDataWriter writer;
MergeTreeDataMergerMutator merger_mutator;
MergeTreePartsMover parts_mover;
/** The queue of what needs to be done on this replica to catch up with everyone. It is taken from ZooKeeper (/replicas/me/queue/).
* In ZK entries in chronological order. Here it is not necessary.
@ -338,12 +337,9 @@ private:
DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction,
const DataPartPtr & part);
void movePartsToSpace(const MergeTreeData::DataPartsVector & parts, DiskSpace::SpacePtr space) override;
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
void getCommitPartOps(
Coordination::Requests & ops,
MutableDataPartPtr & part,
const String & block_id_path = "") const;
void getCommitPartOps(Coordination::Requests & ops, MutableDataPartPtr & part, const String & block_id_path = "") const;
/// Updates info about part columns and checksums in ZooKeeper and commits transaction if successful.
void updatePartHeaderInZooKeeperAndCommit(
@ -406,7 +402,7 @@ private:
/// Perform moves of parts to another disks.
/// Local operation, doesn't interact with replication queue.
BackgroundProcessingPoolTaskResult movingPartsTask();
BackgroundProcessingPoolTaskResult movePartsTask();
/// Postcondition:
@ -465,12 +461,6 @@ private:
std::unordered_set<String> currently_fetching_parts;
std::mutex currently_fetching_parts_mutex;
/// Parts currently moving to another disks or volumes.
/// This operation doesn't replicate.
DataParts currently_moving_parts;
/// Mutex for currenly_moving_parts
std::mutex moving_parts_mutex;
/// With the quorum being tracked, add a replica to the quorum for the part.
void updateQuorum(const String & part_name);

View File

@ -495,7 +495,7 @@ def test_mutate_to_another_disk(start_cluster, name, engine):
node1.query("ALTER TABLE {} UPDATE s1 = concat(s1, 'x') WHERE 1".format(name))
retry = 10
retry = 20
while node1.query("SELECT * FROM system.mutations WHERE is_done = 0") != "" and retry > 0:
retry -= 1
time.sleep(0.5)