Fix error

This commit is contained in:
Alexey Milovidov 2020-07-10 02:45:29 +03:00
parent ad6fcd57b2
commit 7fc90aa070

View File

@ -153,6 +153,18 @@ zkutil::ZooKeeperPtr StorageReplicatedMergeTree::getZooKeeper() const
}
static std::string normalizeZooKeeperPath(std::string zookeeper_path)
{
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
zookeeper_path.resize(zookeeper_path.size() - 1);
/// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it.
if (!zookeeper_path.empty() && zookeeper_path.front() != '/')
zookeeper_path = "/" + zookeeper_path;
return zookeeper_path;
}
StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & zookeeper_path_,
const String & replica_name_,
@ -175,8 +187,9 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
true, /// require_part_metadata
attach,
[this] (const std::string & name) { enqueuePartForCheck(name); })
, zookeeper_path(global_context.getMacros()->expand(zookeeper_path_, table_id_.database_name, table_id_.table_name))
, zookeeper_path(normalizeZooKeeperPath(global_context.getMacros()->expand(zookeeper_path_, table_id_.database_name, table_id_.table_name)))
, replica_name(global_context.getMacros()->expand(replica_name_, table_id_.database_name, table_id_.table_name))
, replica_path(zookeeper_path + "/replicas/" + replica_name)
, reader(*this)
, writer(*this)
, merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads())
@ -186,13 +199,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
, part_check_thread(*this)
, restarting_thread(*this)
{
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
zookeeper_path.resize(zookeeper_path.size() - 1);
/// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it.
if (!zookeeper_path.empty() && zookeeper_path.front() != '/')
zookeeper_path = "/" + zookeeper_path;
replica_path = zookeeper_path + "/replicas/" + replica_name;
queue_updating_task = global_context.getSchedulePool().createTask(
getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::queueUpdatingTask)", [this]{ queueUpdatingTask(); });
@ -201,6 +207,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
merge_selecting_task = global_context.getSchedulePool().createTask(
getStorageID().getFullTableName() + " (StorageReplicatedMergeTree::mergeSelectingTask)", [this] { mergeSelectingTask(); });
/// Will be activated if we win leader election.
merge_selecting_task->deactivate();
@ -1434,7 +1441,6 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
}
}
MergeTreePartInfo new_part_info = MergeTreePartInfo::fromPartName(
entry.new_part_name, format_version);
MutationCommands commands = queue.getMutationCommands(source_part, new_part_info.mutation);