mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-12 09:22:05 +00:00
Fix Zero-Copy replication lost locks, fix remove used remote data in DROP DETACHED PART
This commit is contained in:
parent
d409ab0605
commit
80ab73c691
@ -1100,6 +1100,14 @@ void IMergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_
|
|||||||
storage.lockSharedData(*this);
|
storage.lockSharedData(*this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void IMergeTreeDataPart::cleanupOldName(const String & old_name) const
|
||||||
|
{
|
||||||
|
if (name == old_name)
|
||||||
|
return;
|
||||||
|
|
||||||
|
storage.unlockSharedData(*this, old_name);
|
||||||
|
}
|
||||||
|
|
||||||
std::optional<bool> IMergeTreeDataPart::keepSharedDataInDecoupledStorage() const
|
std::optional<bool> IMergeTreeDataPart::keepSharedDataInDecoupledStorage() const
|
||||||
{
|
{
|
||||||
/// NOTE: It's needed for zero-copy replication
|
/// NOTE: It's needed for zero-copy replication
|
||||||
|
@ -334,6 +334,9 @@ public:
|
|||||||
/// Changes only relative_dir_name, you need to update other metadata (name, is_temp) explicitly
|
/// Changes only relative_dir_name, you need to update other metadata (name, is_temp) explicitly
|
||||||
virtual void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const;
|
virtual void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const;
|
||||||
|
|
||||||
|
/// Cleanup after change part
|
||||||
|
virtual void cleanupOldName(const String & old_part_name) const;
|
||||||
|
|
||||||
/// Makes clone of a part in detached/ directory via hard links
|
/// Makes clone of a part in detached/ directory via hard links
|
||||||
virtual void makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const;
|
virtual void makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot) const;
|
||||||
|
|
||||||
|
@ -2449,6 +2449,8 @@ bool MergeTreeData::renameTempPartAndReplace(
|
|||||||
MergeTreePartInfo part_info = part->info;
|
MergeTreePartInfo part_info = part->info;
|
||||||
String part_name;
|
String part_name;
|
||||||
|
|
||||||
|
String old_part_name = part->name;
|
||||||
|
|
||||||
if (DataPartPtr existing_part_in_partition = getAnyPartInPartition(part->info.partition_id, lock))
|
if (DataPartPtr existing_part_in_partition = getAnyPartInPartition(part->info.partition_id, lock))
|
||||||
{
|
{
|
||||||
if (part->partition.value != existing_part_in_partition->partition.value)
|
if (part->partition.value != existing_part_in_partition->partition.value)
|
||||||
@ -2512,6 +2514,7 @@ bool MergeTreeData::renameTempPartAndReplace(
|
|||||||
/// So, we maintain invariant: if a non-temporary part in filesystem then it is in data_parts
|
/// So, we maintain invariant: if a non-temporary part in filesystem then it is in data_parts
|
||||||
///
|
///
|
||||||
/// If out_transaction is null, we commit the part to the active set immediately, else add it to the transaction.
|
/// If out_transaction is null, we commit the part to the active set immediately, else add it to the transaction.
|
||||||
|
|
||||||
part->name = part_name;
|
part->name = part_name;
|
||||||
part->info = part_info;
|
part->info = part_info;
|
||||||
part->is_temp = false;
|
part->is_temp = false;
|
||||||
@ -2560,6 +2563,8 @@ bool MergeTreeData::renameTempPartAndReplace(
|
|||||||
out_covered_parts->emplace_back(std::move(covered_part));
|
out_covered_parts->emplace_back(std::move(covered_part));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
part->cleanupOldName(old_part_name);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -3885,11 +3890,16 @@ void MergeTreeData::dropDetached(const ASTPtr & partition, bool part, ContextPtr
|
|||||||
|
|
||||||
renamed_parts.tryRenameAll();
|
renamed_parts.tryRenameAll();
|
||||||
|
|
||||||
|
String replica_name = getReplicaName();
|
||||||
|
String zookeeper_name = getZooKeeperName();
|
||||||
|
String zookeeper_path = getZooKeeperPath();
|
||||||
|
|
||||||
for (auto & [old_name, new_name] : renamed_parts.old_and_new_names)
|
for (auto & [old_name, new_name] : renamed_parts.old_and_new_names)
|
||||||
{
|
{
|
||||||
const auto & [path, disk] = renamed_parts.old_part_name_to_path_and_disk[old_name];
|
const auto & [path, disk] = renamed_parts.old_part_name_to_path_and_disk[old_name];
|
||||||
disk->removeRecursive(fs::path(path) / "detached" / new_name / "");
|
bool keep_shared = removeSharedDetachedPart(disk, fs::path(path) / "detached" / new_name / "", old_name,
|
||||||
LOG_DEBUG(log, "Dropped detached part {}", old_name);
|
zookeeper_name, replica_name, zookeeper_path);
|
||||||
|
LOG_DEBUG(log, "Dropped detached part {}, keep shared data: {}", old_name, keep_shared);
|
||||||
old_name.clear();
|
old_name.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -5288,6 +5298,63 @@ PartitionCommandsResultInfo MergeTreeData::unfreezeAll(
|
|||||||
return unfreezePartitionsByMatcher([] (const String &) { return true; }, backup_name, local_context);
|
return unfreezePartitionsByMatcher([] (const String &) { return true; }, backup_name, local_context);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool MergeTreeData::removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name)
|
||||||
|
{
|
||||||
|
bool keep_shared = false;
|
||||||
|
|
||||||
|
if (disk->supportZeroCopyReplication())
|
||||||
|
{
|
||||||
|
FreezeMetaData meta;
|
||||||
|
if (meta.load(disk, path) && meta.is_replicated)
|
||||||
|
return removeSharedDetachedPart(disk, path, part_name, meta.zookeeper_name, meta.replica_name, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
disk->removeSharedRecursive(path, keep_shared);
|
||||||
|
|
||||||
|
return keep_shared;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool MergeTreeData::removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name,
|
||||||
|
const String & zookeeper_name, const String & replica_name, const String & zookeeper_path)
|
||||||
|
{
|
||||||
|
bool keep_shared = false;
|
||||||
|
|
||||||
|
if (disk->supportZeroCopyReplication())
|
||||||
|
{
|
||||||
|
zkutil::ZooKeeperPtr zookeeper;
|
||||||
|
if (zookeeper_name == "default")
|
||||||
|
{
|
||||||
|
zookeeper = getContext()->getZooKeeper();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
zookeeper = getContext()->getAuxiliaryZooKeeper(zookeeper_name);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (zookeeper)
|
||||||
|
{
|
||||||
|
fs::path checksums = fs::path(path) / "checksums.txt";
|
||||||
|
if (disk->exists(checksums))
|
||||||
|
{
|
||||||
|
auto ref_count = disk->getRefCount(checksums);
|
||||||
|
if (ref_count == 0)
|
||||||
|
{
|
||||||
|
String id = disk->getUniqueId(checksums);
|
||||||
|
keep_shared = !StorageReplicatedMergeTree::unlockSharedDataById(id, part_name,
|
||||||
|
replica_name, disk, zookeeper, getContext()->getReplicatedMergeTreeSettings(), log,
|
||||||
|
zookeeper_path);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
keep_shared = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
disk->removeSharedRecursive(path, keep_shared);
|
||||||
|
|
||||||
|
return keep_shared;
|
||||||
|
}
|
||||||
|
|
||||||
PartitionCommandsResultInfo MergeTreeData::unfreezePartitionsByMatcher(MatcherFn matcher, const String & backup_name, ContextPtr)
|
PartitionCommandsResultInfo MergeTreeData::unfreezePartitionsByMatcher(MatcherFn matcher, const String & backup_name, ContextPtr)
|
||||||
{
|
{
|
||||||
auto backup_path = fs::path("shadow") / escapeForFileName(backup_name) / relative_data_path;
|
auto backup_path = fs::path("shadow") / escapeForFileName(backup_name) / relative_data_path;
|
||||||
@ -5316,42 +5383,7 @@ PartitionCommandsResultInfo MergeTreeData::unfreezePartitionsByMatcher(MatcherFn
|
|||||||
|
|
||||||
const auto & path = it->path();
|
const auto & path = it->path();
|
||||||
|
|
||||||
bool keep_shared = false;
|
bool keep_shared = removeSharedDetachedPart(disk, path, partition_directory);
|
||||||
|
|
||||||
if (disk->supportZeroCopyReplication())
|
|
||||||
{
|
|
||||||
FreezeMetaData meta;
|
|
||||||
if (meta.load(disk, path) && meta.is_replicated)
|
|
||||||
{
|
|
||||||
zkutil::ZooKeeperPtr zookeeper;
|
|
||||||
if (meta.zookeeper_name == "default")
|
|
||||||
{
|
|
||||||
zookeeper = getContext()->getZooKeeper();
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
zookeeper = getContext()->getAuxiliaryZooKeeper(meta.zookeeper_name);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (zookeeper)
|
|
||||||
{
|
|
||||||
fs::path checksums = fs::path(path) / "checksums.txt";
|
|
||||||
if (disk->exists(checksums))
|
|
||||||
{
|
|
||||||
auto ref_count = disk->getRefCount(checksums);
|
|
||||||
if (ref_count == 0)
|
|
||||||
{
|
|
||||||
String id = disk->getUniqueId(checksums);
|
|
||||||
keep_shared = !StorageReplicatedMergeTree::unlockSharedDataById(id, partition_directory,
|
|
||||||
meta.replica_name, disk, zookeeper, getContext()->getReplicatedMergeTreeSettings(), log,
|
|
||||||
nullptr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
disk->removeSharedRecursive(path, keep_shared);
|
|
||||||
|
|
||||||
result.push_back(PartitionCommandResultInfo{
|
result.push_back(PartitionCommandResultInfo{
|
||||||
.partition_id = partition_id,
|
.partition_id = partition_id,
|
||||||
@ -5361,7 +5393,7 @@ PartitionCommandsResultInfo MergeTreeData::unfreezePartitionsByMatcher(MatcherFn
|
|||||||
.backup_name = backup_name,
|
.backup_name = backup_name,
|
||||||
});
|
});
|
||||||
|
|
||||||
LOG_DEBUG(log, "Unfreezed part by path {}, keep shared data {}", disk->getPath() + path, keep_shared);
|
LOG_DEBUG(log, "Unfreezed part by path {}, keep shared data: {}", disk->getPath() + path, keep_shared);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -867,11 +867,15 @@ public:
|
|||||||
/// Overridden in StorageReplicatedMergeTree
|
/// Overridden in StorageReplicatedMergeTree
|
||||||
virtual bool unlockSharedData(const IMergeTreeDataPart &) const { return true; }
|
virtual bool unlockSharedData(const IMergeTreeDataPart &) const { return true; }
|
||||||
|
|
||||||
|
/// Unlock same part with other (old) name
|
||||||
|
virtual bool unlockSharedData(const IMergeTreeDataPart &, const String &) const { return true; }
|
||||||
|
|
||||||
/// Fetch part only if some replica has it on shared storage like S3
|
/// Fetch part only if some replica has it on shared storage like S3
|
||||||
/// Overridden in StorageReplicatedMergeTree
|
/// Overridden in StorageReplicatedMergeTree
|
||||||
virtual bool tryToFetchIfShared(const IMergeTreeDataPart &, const DiskPtr &, const String &) { return false; }
|
virtual bool tryToFetchIfShared(const IMergeTreeDataPart &, const DiskPtr &, const String &) { return false; }
|
||||||
|
|
||||||
virtual String getZooKeeperName() const { return ""; }
|
virtual String getZooKeeperName() const { return ""; }
|
||||||
|
virtual String getZooKeeperPath() const { return ""; }
|
||||||
|
|
||||||
/// Parts that currently submerging (merging to bigger parts) or emerging
|
/// Parts that currently submerging (merging to bigger parts) or emerging
|
||||||
/// (to be appeared after merging finished). These two variables have to be used
|
/// (to be appeared after merging finished). These two variables have to be used
|
||||||
@ -1174,6 +1178,12 @@ private:
|
|||||||
DataPartsVector & duplicate_parts_to_remove,
|
DataPartsVector & duplicate_parts_to_remove,
|
||||||
MutableDataPartsVector & parts_from_wal,
|
MutableDataPartsVector & parts_from_wal,
|
||||||
DataPartsLock & part_lock);
|
DataPartsLock & part_lock);
|
||||||
|
|
||||||
|
/// Check shared data usage on other replicas for detached/freezed part
|
||||||
|
/// Remove local files and remote files if needed
|
||||||
|
bool removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name);
|
||||||
|
bool removeSharedDetachedPart(DiskPtr disk, const String & path, const String & part_name,
|
||||||
|
const String & zookeeper_name, const String & replica_name, const String & zookeeper_path);
|
||||||
};
|
};
|
||||||
|
|
||||||
/// RAII struct to record big parts that are submerging or emerging.
|
/// RAII struct to record big parts that are submerging or emerging.
|
||||||
|
@ -222,6 +222,8 @@ void ReplicatedMergeTreeSink::commitPart(
|
|||||||
|
|
||||||
bool is_already_existing_part = false;
|
bool is_already_existing_part = false;
|
||||||
|
|
||||||
|
String old_part_name = part->name;
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
/// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
|
/// Obtain incremental block number and lock it. The lock holds our intention to add the block to the filesystem.
|
||||||
@ -502,6 +504,8 @@ void ReplicatedMergeTreeSink::commitPart(
|
|||||||
|
|
||||||
waitForQuorum(zookeeper, part->name, quorum_info.status_path, quorum_info.is_active_node_value);
|
waitForQuorum(zookeeper, part->name, quorum_info.status_path, quorum_info.is_active_node_value);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
part->cleanupOldName(old_part_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReplicatedMergeTreeSink::onStart()
|
void ReplicatedMergeTreeSink::onStart()
|
||||||
|
@ -4111,13 +4111,8 @@ void StorageReplicatedMergeTree::startup()
|
|||||||
getContext()->getInterserverIOHandler().addEndpoint(data_parts_exchange_ptr->getId(replica_path), data_parts_exchange_ptr);
|
getContext()->getInterserverIOHandler().addEndpoint(data_parts_exchange_ptr->getId(replica_path), data_parts_exchange_ptr);
|
||||||
|
|
||||||
convertZeroCopySchema();
|
convertZeroCopySchema();
|
||||||
|
|
||||||
is_zero_copy_in_compatible_mode = isZeroCopySchemaInCompatibleMode();
|
is_zero_copy_in_compatible_mode = isZeroCopySchemaInCompatibleMode();
|
||||||
|
|
||||||
if (!is_zero_copy_in_compatible_mode)
|
|
||||||
{ /// All replicas have new version
|
|
||||||
cleanupOldZeroCopySchema();
|
cleanupOldZeroCopySchema();
|
||||||
}
|
|
||||||
|
|
||||||
/// In this thread replica will be activated.
|
/// In this thread replica will be activated.
|
||||||
restarting_thread.start();
|
restarting_thread.start();
|
||||||
@ -7144,9 +7139,7 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part)
|
|||||||
String id = part.getUniqueId();
|
String id = part.getUniqueId();
|
||||||
boost::replace_all(id, "/", "_");
|
boost::replace_all(id, "/", "_");
|
||||||
|
|
||||||
const String * zookeeper_path_ptr = is_zero_copy_in_compatible_mode ? &zookeeper_path : nullptr;
|
Strings zc_zookeeper_paths = getZeroCopyRootPath(*getDefaultSettings(), is_zero_copy_in_compatible_mode ? zookeeper_path : "");
|
||||||
|
|
||||||
Strings zc_zookeeper_paths = getZeroCopyRootPath(*getDefaultSettings(), zookeeper_path_ptr);
|
|
||||||
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
|
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
|
||||||
{
|
{
|
||||||
String zookeeper_node = fs::path(zc_zookeeper_path) / zero_copy / "shared" / part.name / id / replica_name;
|
String zookeeper_node = fs::path(zc_zookeeper_path) / zero_copy / "shared" / part.name / id / replica_name;
|
||||||
@ -7158,6 +7151,12 @@ void StorageReplicatedMergeTree::lockSharedData(const IMergeTreeDataPart & part)
|
|||||||
|
|
||||||
|
|
||||||
bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part) const
|
bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part) const
|
||||||
|
{
|
||||||
|
return unlockSharedData(part, part.name);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & part, const String & name) const
|
||||||
{
|
{
|
||||||
if (!part.volume)
|
if (!part.volume)
|
||||||
return true;
|
return true;
|
||||||
@ -7170,25 +7169,23 @@ bool StorageReplicatedMergeTree::unlockSharedData(const IMergeTreeDataPart & par
|
|||||||
return true;
|
return true;
|
||||||
|
|
||||||
auto ref_count = part.getRefCount();
|
auto ref_count = part.getRefCount();
|
||||||
LOG_TRACE(log, "RefCount {} for part {}", ref_count, part.name);
|
|
||||||
if (ref_count > 0) /// Keep part shard info for frozen backups
|
if (ref_count > 0) /// Keep part shard info for frozen backups
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
const String * zookeeper_path_ptr = is_zero_copy_in_compatible_mode ? &zookeeper_path : nullptr;
|
return unlockSharedDataById(part.getUniqueId(), name, replica_name, disk, zookeeper, *getDefaultSettings(), log,
|
||||||
|
is_zero_copy_in_compatible_mode ? zookeeper_path : String(""));
|
||||||
return unlockSharedDataById(part.getUniqueId(), part.name, replica_name, disk, zookeeper, *getDefaultSettings(), log, zookeeper_path_ptr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool StorageReplicatedMergeTree::unlockSharedDataById(String id, const String & part_name,
|
bool StorageReplicatedMergeTree::unlockSharedDataById(String id, const String & part_name,
|
||||||
const String & replica_name_, DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_, const MergeTreeSettings & settings,
|
const String & replica_name_, DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_ptr, const MergeTreeSettings & settings,
|
||||||
Poco::Logger * logger, const String * zookeeper_path_ptr)
|
Poco::Logger * logger, const String & zookeeper_path_old)
|
||||||
{
|
{
|
||||||
boost::replace_all(id, "/", "_");
|
boost::replace_all(id, "/", "_");
|
||||||
|
|
||||||
String zero_copy = fmt::format("zero_copy_{}", toString(disk->getType()));
|
String zero_copy = fmt::format("zero_copy_{}", toString(disk->getType()));
|
||||||
|
|
||||||
Strings zc_zookeeper_paths = getZeroCopyRootPath(settings, zookeeper_path_ptr);
|
Strings zc_zookeeper_paths = getZeroCopyRootPath(settings, zookeeper_path_old);
|
||||||
|
|
||||||
bool res = true;
|
bool res = true;
|
||||||
|
|
||||||
@ -7200,10 +7197,10 @@ bool StorageReplicatedMergeTree::unlockSharedDataById(String id, const String &
|
|||||||
|
|
||||||
LOG_TRACE(logger, "Remove zookeeper lock {}", zookeeper_node);
|
LOG_TRACE(logger, "Remove zookeeper lock {}", zookeeper_node);
|
||||||
|
|
||||||
zookeeper_->tryRemove(zookeeper_node);
|
zookeeper_ptr->tryRemove(zookeeper_node);
|
||||||
|
|
||||||
Strings children;
|
Strings children;
|
||||||
zookeeper_->tryGetChildren(zookeeper_part_uniq_node, children);
|
zookeeper_ptr->tryGetChildren(zookeeper_part_uniq_node, children);
|
||||||
|
|
||||||
if (!children.empty())
|
if (!children.empty())
|
||||||
{
|
{
|
||||||
@ -7212,14 +7209,16 @@ bool StorageReplicatedMergeTree::unlockSharedDataById(String id, const String &
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
zookeeper_->tryRemove(zookeeper_part_uniq_node);
|
zookeeper_ptr->tryRemove(zookeeper_part_uniq_node);
|
||||||
|
|
||||||
/// Even when we have lock with same part name, but with different uniq, we can remove files on S3
|
/// Even when we have lock with same part name, but with different uniq, we can remove files on S3
|
||||||
children.clear();
|
children.clear();
|
||||||
zookeeper_->tryGetChildren(zookeeper_part_node, children);
|
zookeeper_ptr->tryGetChildren(zookeeper_part_node, children);
|
||||||
if (children.empty())
|
if (children.empty())
|
||||||
|
{
|
||||||
/// Cleanup after last uniq removing
|
/// Cleanup after last uniq removing
|
||||||
zookeeper_->tryRemove(zookeeper_part_node);
|
zookeeper_ptr->tryRemove(zookeeper_part_node);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
@ -7257,9 +7256,7 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
|
|||||||
|
|
||||||
String zero_copy = fmt::format("zero_copy_{}", toString(disk_type));
|
String zero_copy = fmt::format("zero_copy_{}", toString(disk_type));
|
||||||
|
|
||||||
const String * zookeeper_path_ptr = is_zero_copy_in_compatible_mode ? &zookeeper_path : nullptr;
|
Strings zc_zookeeper_paths = getZeroCopyRootPath(*getDefaultSettings(), is_zero_copy_in_compatible_mode ? zookeeper_path : "");
|
||||||
|
|
||||||
Strings zc_zookeeper_paths = getZeroCopyRootPath(*getDefaultSettings(), zookeeper_path_ptr);
|
|
||||||
|
|
||||||
std::set<String> replicas;
|
std::set<String> replicas;
|
||||||
|
|
||||||
@ -7331,30 +7328,30 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
Strings StorageReplicatedMergeTree::getZeroCopyRootPath(const MergeTreeSettings & settings, const String * zookeeper_path_ptr)
|
Strings StorageReplicatedMergeTree::getZeroCopyRootPath(const MergeTreeSettings & settings, const String & zookeeper_path_old)
|
||||||
{
|
{
|
||||||
Strings res;
|
Strings res;
|
||||||
|
|
||||||
res.push_back(settings.remote_fs_zero_copy_zookeeper_path);
|
res.push_back(settings.remote_fs_zero_copy_zookeeper_path);
|
||||||
if (zookeeper_path_ptr) /// Compatibility mode for cluster with old and new versions
|
if (!zookeeper_path_old.empty()) /// Compatibility mode for cluster with old and new versions
|
||||||
res.push_back(*zookeeper_path_ptr);
|
res.push_back(zookeeper_path_old);
|
||||||
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
String StorageReplicatedMergeTree::findReplicaHavingPart(
|
String StorageReplicatedMergeTree::findReplicaHavingPart(
|
||||||
const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_)
|
const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_ptr)
|
||||||
{
|
{
|
||||||
Strings replicas = zookeeper_->getChildren(fs::path(zookeeper_path_) / "replicas");
|
Strings replicas = zookeeper_ptr->getChildren(fs::path(zookeeper_path_) / "replicas");
|
||||||
|
|
||||||
/// Select replicas in uniformly random order.
|
/// Select replicas in uniformly random order.
|
||||||
std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
|
std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
|
||||||
|
|
||||||
for (const String & replica : replicas)
|
for (const String & replica : replicas)
|
||||||
{
|
{
|
||||||
if (zookeeper_->exists(fs::path(zookeeper_path_) / "replicas" / replica / "parts" / part_name)
|
if (zookeeper_ptr->exists(fs::path(zookeeper_path_) / "replicas" / replica / "parts" / part_name)
|
||||||
&& zookeeper_->exists(fs::path(zookeeper_path_) / "replicas" / replica / "is_active"))
|
&& zookeeper_ptr->exists(fs::path(zookeeper_path_) / "replicas" / replica / "is_active"))
|
||||||
return fs::path(zookeeper_path_) / "replicas" / replica;
|
return fs::path(zookeeper_path_) / "replicas" / replica;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -7594,6 +7591,8 @@ void StorageReplicatedMergeTree::convertZeroCopySchema()
|
|||||||
LOG_INFO(log, "Convert zero_copy version from {} to {} for {}", zero_copy_version, required_zero_copy_version,
|
LOG_INFO(log, "Convert zero_copy version from {} to {} for {}", zero_copy_version, required_zero_copy_version,
|
||||||
version_path.string());
|
version_path.string());
|
||||||
|
|
||||||
|
unsigned long converted_part_counter = 0;
|
||||||
|
|
||||||
for (auto const & disk_type : disk_types)
|
for (auto const & disk_type : disk_types)
|
||||||
{
|
{
|
||||||
String zero_copy = fmt::format("zero_copy_{}", disk_type);
|
String zero_copy = fmt::format("zero_copy_{}", disk_type);
|
||||||
@ -7605,18 +7604,26 @@ void StorageReplicatedMergeTree::convertZeroCopySchema()
|
|||||||
auto old_shard_root = revert_to_version == 1 ? shard_root_v2 : shard_root_v1;
|
auto old_shard_root = revert_to_version == 1 ? shard_root_v2 : shard_root_v1;
|
||||||
auto new_shard_root = revert_to_version == 1 ? shard_root_v1 : shard_root_v2;
|
auto new_shard_root = revert_to_version == 1 ? shard_root_v1 : shard_root_v2;
|
||||||
|
|
||||||
Coordination::Stat stat;
|
Strings parts;
|
||||||
Strings parts = current_zookeeper->getChildren(old_shard_root, &stat);
|
current_zookeeper->tryGetChildren(old_shard_root, parts);
|
||||||
|
|
||||||
for (const auto & part : parts)
|
for (const auto & part_name : parts)
|
||||||
{
|
{
|
||||||
Strings ids = current_zookeeper->getChildren(old_shard_root / part, &stat);
|
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
|
||||||
|
auto part = getPartIfExists(part_info, {MergeTreeDataPartState::Committed});
|
||||||
|
|
||||||
|
if (part)
|
||||||
|
{ /// Do not move lost locks
|
||||||
|
Strings ids;
|
||||||
|
current_zookeeper->tryGetChildren(old_shard_root / part_name, ids);
|
||||||
for (const auto & id : ids)
|
for (const auto & id : ids)
|
||||||
{
|
{
|
||||||
if (current_zookeeper->exists(old_shard_root / part / id / replica_name))
|
if (current_zookeeper->exists(old_shard_root / part_name / id / replica_name))
|
||||||
{
|
{
|
||||||
auto zookeeper_node = new_shard_root / part / id / replica_name;
|
auto zookeeper_node = new_shard_root / part_name / id / replica_name;
|
||||||
createZeroCopyLockNode(current_zookeeper, zookeeper_node.string());
|
createZeroCopyLockNode(current_zookeeper, zookeeper_node.string());
|
||||||
|
++converted_part_counter;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -7630,13 +7637,16 @@ void StorageReplicatedMergeTree::convertZeroCopySchema()
|
|||||||
current_zookeeper->createOrUpdate(version_path / "cleanup_required", std::to_string(zero_copy_version),
|
current_zookeeper->createOrUpdate(version_path / "cleanup_required", std::to_string(zero_copy_version),
|
||||||
zkutil::CreateMode::Persistent);
|
zkutil::CreateMode::Persistent);
|
||||||
|
|
||||||
LOG_INFO(log, "Convert zero_copy version from {} to {} for {} complete", zero_copy_version, required_zero_copy_version,
|
LOG_INFO(log, "Convert zero_copy version from {} to {} for {} complete, converted {} locks", zero_copy_version, required_zero_copy_version,
|
||||||
version_path.string());
|
version_path.string(), converted_part_counter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void StorageReplicatedMergeTree::cleanupOldZeroCopySchema()
|
void StorageReplicatedMergeTree::cleanupOldZeroCopySchema()
|
||||||
{
|
{
|
||||||
|
if (is_zero_copy_in_compatible_mode)
|
||||||
|
return; /// Some replicas have old version
|
||||||
|
|
||||||
if (!current_zookeeper)
|
if (!current_zookeeper)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
@ -7672,17 +7682,7 @@ void StorageReplicatedMergeTree::cleanupOldZeroCopySchema()
|
|||||||
|
|
||||||
auto old_shard_root = fs::path(zookeeper_path) / zero_copy / "shared";
|
auto old_shard_root = fs::path(zookeeper_path) / zero_copy / "shared";
|
||||||
|
|
||||||
Coordination::Stat stat;
|
current_zookeeper->tryRemoveRecursive(old_shard_root);
|
||||||
Strings parts = current_zookeeper->getChildren(old_shard_root, &stat);
|
|
||||||
|
|
||||||
for (const auto & part : parts)
|
|
||||||
{
|
|
||||||
Strings ids = current_zookeeper->getChildren(old_shard_root / part, &stat);
|
|
||||||
for (const auto & id : ids)
|
|
||||||
{
|
|
||||||
current_zookeeper->remove(old_shard_root / part / id / replica_name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
current_zookeeper->remove(old_version_path);
|
current_zookeeper->remove(old_version_path);
|
||||||
|
@ -237,12 +237,15 @@ public:
|
|||||||
/// Return false if data is still used by another node
|
/// Return false if data is still used by another node
|
||||||
bool unlockSharedData(const IMergeTreeDataPart & part) const override;
|
bool unlockSharedData(const IMergeTreeDataPart & part) const override;
|
||||||
|
|
||||||
|
/// Unlock same part with other (old) name
|
||||||
|
bool unlockSharedData(const IMergeTreeDataPart & part, const String & name) const override;
|
||||||
|
|
||||||
/// Unlock shared data part in zookeeper by part id
|
/// Unlock shared data part in zookeeper by part id
|
||||||
/// Return true if data unlocked
|
/// Return true if data unlocked
|
||||||
/// Return false if data is still used by another node
|
/// Return false if data is still used by another node
|
||||||
static bool unlockSharedDataById(String id, const String & part_name, const String & replica_name_,
|
static bool unlockSharedDataById(String id, const String & part_name, const String & replica_name_,
|
||||||
DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_, const MergeTreeSettings & settings, Poco::Logger * logger,
|
DiskPtr disk, zkutil::ZooKeeperPtr zookeeper_, const MergeTreeSettings & settings, Poco::Logger * logger,
|
||||||
const String * zookeeper_path_ptr);
|
const String & zookeeper_path_old);
|
||||||
|
|
||||||
/// Fetch part only if some replica has it on shared storage like S3
|
/// Fetch part only if some replica has it on shared storage like S3
|
||||||
bool tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;
|
bool tryToFetchIfShared(const IMergeTreeDataPart & part, const DiskPtr & disk, const String & path) override;
|
||||||
@ -272,6 +275,7 @@ public:
|
|||||||
bool createEmptyPartInsteadOfLost(zkutil::ZooKeeperPtr zookeeper, const String & lost_part_name);
|
bool createEmptyPartInsteadOfLost(zkutil::ZooKeeperPtr zookeeper, const String & lost_part_name);
|
||||||
|
|
||||||
virtual String getZooKeeperName() const override { return zookeeper_name; }
|
virtual String getZooKeeperName() const override { return zookeeper_name; }
|
||||||
|
virtual String getZooKeeperPath() const override { return zookeeper_path; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::atomic_bool are_restoring_replica {false};
|
std::atomic_bool are_restoring_replica {false};
|
||||||
@ -737,7 +741,7 @@ private:
|
|||||||
PartitionBlockNumbersHolder allocateBlockNumbersInAffectedPartitions(
|
PartitionBlockNumbersHolder allocateBlockNumbersInAffectedPartitions(
|
||||||
const MutationCommands & commands, ContextPtr query_context, const zkutil::ZooKeeperPtr & zookeeper) const;
|
const MutationCommands & commands, ContextPtr query_context, const zkutil::ZooKeeperPtr & zookeeper) const;
|
||||||
|
|
||||||
static Strings getZeroCopyRootPath(const MergeTreeSettings & settings, const String * zookeeper_path_ptr = nullptr);
|
static Strings getZeroCopyRootPath(const MergeTreeSettings & settings, const String & zookeeper_path_old);
|
||||||
|
|
||||||
/// Upgrave zero-copy version
|
/// Upgrave zero-copy version
|
||||||
/// version 1 - lock for shared part inside table node in ZooKeeper
|
/// version 1 - lock for shared part inside table node in ZooKeeper
|
||||||
|
Loading…
Reference in New Issue
Block a user