Almost working replicated MT

This commit is contained in:
alesapin 2019-08-20 11:38:02 +03:00
parent e9e67a2688
commit 8408ffbfa6
3 changed files with 58 additions and 39 deletions

View File

@ -15,7 +15,7 @@ namespace ErrorCodes
{
extern const int UNEXPECTED_NODE_IN_ZOOKEEPER;
extern const int UNFINISHED;
extern const int PART_IS_TEMPORARY_LOCKED;
extern const int PART_IS_TEMPORARILY_LOCKED;
}
@ -27,21 +27,39 @@ ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree &
{}
void ReplicatedMergeTreeQueue::addVirtualParts(const MergeTreeData::DataParts & parts, bool throw_if_already_virtual)
void ReplicatedMergeTreeQueue::addVirtualParts(const MergeTreeData::DataParts & parts)
{
std::lock_guard lock(state_mutex);
for (const auto & part : parts)
for (auto part : parts)
{
if (throw_if_already_virtual && !virtual_parts.getContainingPart(part->info).empty())
throw Exception("Part " + part->name + " or covering part is already contains in virtual (future) parts set.", ErrorCodes::PART_IS_TEMPORARY_LOCKED);
current_parts.add(part->name);
virtual_parts.add(part->name);
}
}
void ReplicatedMergeTreeQueue::disableMergesForParts(const MergeTreeData::DataPartsVector & data_parts)
{
std::lock_guard lock(state_mutex);
for (const auto & data_part : data_parts)
{
if (!virtual_parts.getContainingPart(data_part->name).empty())
throw Exception("Part " + data_part->name + " or covering part is"
+ " already contains in virtual (future) parts set.", ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
}
for (const auto & data_part : data_parts)
virtual_parts.add(data_part->name);
}
bool ReplicatedMergeTreeQueue::isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const
{
std::lock_guard lock(state_mutex);
return !virtual_parts.getContainingPart(data_part->name).empty();
}
bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
{
auto queue_path = replica_path + "/queue";
@ -1734,24 +1752,6 @@ bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const ReplicatedMerge
}
ReplicatedMergeTreeMovePredicate::ReplicatedMergeTreeMovePredicate(const ReplicatedMergeTreeQueue & queue_)
: queue(queue_)
, queue_state_lock(queue_.state_mutex)
{
}
bool ReplicatedMergeTreeMovePredicate::operator()(const MergeTreeData::DataPartPtr & part, String * /* out_reason */) const
{
return !queue.virtual_parts.getContainingPart(part->info).empty();
}
ReplicatedMergeTreeMovePredicate::~ReplicatedMergeTreeMovePredicate()
{
queue_state_lock.unlock();
}
ReplicatedMergeTreeQueue::SubscriberHandler
ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCallBack && callback)
{

View File

@ -158,6 +158,8 @@ private:
/// Ensures that only one thread is simultaneously updating mutations.
std::mutex update_mutations_mutex;
/// Put a set of (already existing) parts in virtual_parts.
void addVirtualParts(const MergeTreeData::DataParts & parts);
void insertUnlocked(
const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
@ -228,8 +230,6 @@ public:
~ReplicatedMergeTreeQueue();
/// Put a set of (already existing) parts in virtual_parts. TODO(MOVE TO PRIVATE)
void addVirtualParts(const MergeTreeData::DataParts & parts, bool throw_if_already_virtual=false);
void initialize(const String & zookeeper_path_, const String & replica_path_, const String & logger_name_,
const MergeTreeData::DataParts & parts);
@ -327,6 +327,15 @@ public:
/// Part maybe fake (look at ReplicatedMergeTreeMergePredicate).
void disableMergesInBlockRange(const String & part_name);
/// Prohibit merges for specified parts.
/// Add part to virtual_parts, which means that part must exist
/// after processing replication log up to log_pointer.
/// Throws exception if any part was in virtual parts
void disableMergesForParts(const MergeTreeData::DataPartsVector & data_parts);
/// Cheks that part is already in virtual parts
bool isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const;
/// Check that part isn't in currently generating parts and isn't covered by them and add it to future_parts.
/// Locks queue's mutex.
bool addFuturePartIfNotCoveredByThem(const String & part_name, LogEntry & entry, String & reject_reason);

View File

@ -2181,17 +2181,18 @@ public:
: parts(std::move(parts_))
, queue(queue_)
{
MergeTreeData::DataParts data_parts;
MergeTreeData::DataPartsVector data_parts;
/// Assume queue mutex is already locked, because this method is called from tryMoveParts.
for (const auto & moving_part : parts)
data_parts.emplace(moving_part.part);
data_parts.emplace_back(moving_part.part);
/// Throws exception if some parts already exists
queue.addVirtualParts(data_parts, true);
queue.disableMergesForParts(data_parts);
}
~CurrentlyMovingPartsTagger()
{
/// May return false, but we don't care, it's ok
for (auto & part : parts)
queue.removeFromVirtualParts(part.part->info);
}
@ -2206,19 +2207,24 @@ BackgroundProcessingPoolTaskResult StorageReplicatedMergeTree::tryMoveParts()
{
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
MergeTreeMovingParts parts_to_move;
auto can_move = [this](const DataPartPtr & part, String *) -> bool
{
/// Holds lock on queue until selection finished
ReplicatedMergeTreeMovePredicate can_move(queue);
return !queue.isVirtualPart(part);
};
if (!parts_mover.selectPartsToMove(parts_to_move, can_move))
{
LOG_INFO(log, "Nothing to move.");
return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
}
MergeTreeMovingParts parts_to_move;
if (!parts_mover.selectPartsToMove(parts_to_move, can_move))
return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
try
{
moving_tagger.emplace(std::move(parts_to_move), queue);
}
catch (const DB::Exception & ex)
{
LOG_INFO(log, "Cannot start moving: " + ex.displayText());
return BackgroundProcessingPoolTaskResult::ERROR;
}
moving_tagger.emplace(std::move(parts_to_move), queue);
}
LOG_INFO(log, "Found " << moving_tagger->parts.size() << " parts to move.");
@ -2954,6 +2960,10 @@ void StorageReplicatedMergeTree::shutdown()
global_context.getBackgroundPool().removeTask(queue_task_handle);
queue_task_handle.reset();
if (move_parts_task_handle)
global_context.getBackgroundPool().removeTask(move_parts_task_handle);
move_parts_task_handle.reset();
if (data_parts_exchange_endpoint_holder)
{
data_parts_exchange_endpoint_holder->getBlocker().cancelForever();