Merge pull request #12315 from ClickHouse/fix-race-condition-replicated-merge-tree-queue

Fix race condition in ReplicatedMergeTreeQueue
This commit is contained in:
alexey-milovidov 2020-07-10 08:11:56 +03:00 committed by GitHub
commit c5ebf596c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 31 deletions

View File

@ -25,7 +25,18 @@ ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree &
, format_version(storage.format_version)
, current_parts(format_version)
, virtual_parts(format_version)
{}
{
zookeeper_path = storage.zookeeper_path;
replica_path = storage.replica_path;
logger_name = storage.getStorageID().getFullTableName() + " (ReplicatedMergeTreeQueue)";
log = &Poco::Logger::get(logger_name);
}
/// Initializes the queue from the given set of data parts.
/// The whole job is delegated to addVirtualParts(); nothing else is touched here.
void ReplicatedMergeTreeQueue::initialize(const MergeTreeData::DataParts & parts)
{
    this->addVirtualParts(parts);
}
void ReplicatedMergeTreeQueue::addVirtualParts(const MergeTreeData::DataParts & parts)
@ -109,19 +120,6 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
}
/// Initializes the queue: stores the two paths and the logger name in the corresponding members,
/// obtains the logger by that name and passes `parts` to addVirtualParts().
/// NOTE(review): no lock is taken in this function - confirm that nothing else can read
/// zookeeper_path / replica_path / logger_name / log while it runs.
void ReplicatedMergeTreeQueue::initialize(
const String & zookeeper_path_, const String & replica_path_, const String & logger_name_,
const MergeTreeData::DataParts & parts)
{
zookeeper_path = zookeeper_path_;
replica_path = replica_path_;
logger_name = logger_name_;
/// The logger is looked up by the name that has just been stored in the member above.
log = &Poco::Logger::get(logger_name);
/// Hand the supplied parts over to addVirtualParts().
addVirtualParts(parts);
}
void ReplicatedMergeTreeQueue::insertUnlocked(
const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
std::lock_guard<std::mutex> & state_lock)

View File

@ -166,7 +166,8 @@ private:
void notifySubscribers(size_t new_queue_size);
/// Check that entry_ptr is a REPLACE_RANGE entry and can be removed from the queue because the current entry covers it.
bool checkReplaceRangeCanBeRemoved(const MergeTreePartInfo & part_info, const LogEntryPtr entry_ptr, const ReplicatedMergeTreeLogEntryData & current) const;
bool checkReplaceRangeCanBeRemoved(
const MergeTreePartInfo & part_info, const LogEntryPtr entry_ptr, const ReplicatedMergeTreeLogEntryData & current) const;
/// Ensures that only one thread is simultaneously updating mutations.
std::mutex update_mutations_mutex;
@ -251,12 +252,10 @@ private:
public:
ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_);
~ReplicatedMergeTreeQueue();
void initialize(const String & zookeeper_path_, const String & replica_path_, const String & logger_name_,
const MergeTreeData::DataParts & parts);
void initialize(const MergeTreeData::DataParts & parts);
/** Inserts an action to the end of the queue.
* To restore broken parts during operation.

View File

@ -153,6 +153,18 @@ zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeper() const
}
/// Brings a ZooKeeper path to a normal form:
///  - a single trailing '/' (if any) is dropped;
///  - a leading '/' is added when the remaining path is non-empty and lacks one.
/// An empty path, or one that becomes empty after the trailing '/' is dropped, is returned empty.
static std::string normalizeZooKeeperPath(std::string zookeeper_path)
{
    const bool has_trailing_slash = !zookeeper_path.empty() && zookeeper_path.back() == '/';
    if (has_trailing_slash)
        zookeeper_path.pop_back();

    /// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it.
    /// This is evaluated only after the trailing slash has been removed, so "/" stays empty.
    const bool lacks_leading_slash = !zookeeper_path.empty() && zookeeper_path.front() != '/';
    if (lacks_leading_slash)
        zookeeper_path.insert(zookeeper_path.begin(), '/');

    return zookeeper_path;
}
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & zookeeper_path_,
const String & replica_name_,
@ -175,8 +187,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
true, /// require_part_metadata
attach,
[this] (const std::string & name) { enqueuePartForCheck(name); })
, zookeeper_path(global_context.getMacros()->expand(zookeeper_path_, table_id_.database_name, table_id_.table_name))
, zookeeper_path(normalizeZooKeeperPath(global_context.getMacros()->expand(zookeeper_path_, table_id_.database_name, table_id_.table_name)))
, replica_name(global_context.getMacros()->expand(replica_name_, table_id_.database_name, table_id_.table_name))
, replica_path(zookeeper_path + "/replicas/" + replica_name)
, reader(*this)
, writer(*this)
, merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads())
@ -186,13 +199,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
, part_check_thread(*this)
, restarting_thread(*this)
{
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
zookeeper_path.resize(zookeeper_path.size() - 1);
/// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it.
if (!zookeeper_path.empty() && zookeeper_path.front() != '/')
zookeeper_path = "/" + zookeeper_path;
replica_path = zookeeper_path + "/replicas/" + replica_name;
queue_updating_task = global_context.getSchedulePool().createTask(
getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::queueUpdatingTask)", [this]{ queueUpdatingTask(); });
@ -201,6 +207,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
merge_selecting_task = global_context.getSchedulePool().createTask(
getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::mergeSelectingTask)", [this] { mergeSelectingTask(); });
/// Will be activated if we win leader election.
merge_selecting_task->deactivate();
@ -1434,7 +1441,6 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
}
}
MergeTreePartInfo new_part_info = MergeTreePartInfo::fromPartName(
entry.new_part_name, format_version);
MutationCommands commands = queue.getMutationCommands(source_part, new_part_info.mutation);
@ -3246,10 +3252,7 @@ void StorageReplicatedMergeTree::startup()
try
{
queue.initialize(
zookeeper_path, replica_path,
getStorageID().getFullTableName() + " (ReplicatedMergeTreeQueue)",
getDataParts());
queue.initialize(getDataParts());
data_parts_exchange_endpoint = std::make_shared<DataPartsExchange::Service>(*this);
global_context.getInterserverIOHandler().addEndpoint(data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint);