Merge branch 'fix_zero_copy_not_atomic' of github.com:ClickHouse/ClickHouse into fix_zero_copy_not_atomic

This commit is contained in:
alesapin 2023-04-30 15:02:27 +02:00
commit ec2c860a39
4 changed files with 20 additions and 21 deletions

View File

@ -524,8 +524,6 @@ public:
void setServerCompletelyStarted();
private:
friend class EphemeralNodeHolder;
void init(ZooKeeperArgs args_);
/// The following methods don't any throw exceptions but return error codes.

View File

@ -821,6 +821,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
const auto data_settings = data.getSettings();
MergeTreeData::DataPart::Checksums data_checksums;
zkutil::EphemeralNodeHolderPtr zero_copy_temporary_lock_holder;
if (to_remote_disk)
{
readStringBinary(part_id, in);
@ -829,7 +830,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
throw Exception(ErrorCodes::ZERO_COPY_REPLICATION_ERROR, "Part {} unique id {} doesn't exist on {} (with type {}).", part_name, part_id, disk->getName(), toString(disk->getDataSourceDescription().type));
LOG_DEBUG(log, "Downloading part {} unique id {} metadata onto disk {}.", part_name, part_id, disk->getName());
data.lockSharedDataTemporary(part_name, part_id, disk);
zero_copy_temporary_lock_holder = data.lockSharedDataTemporary(part_name, part_id, disk);
}
else
{
@ -938,7 +939,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
if (to_remote_disk)
{
data.lockSharedData(*new_data_part, /* replace_existing_lock = */ true, {});
LOG_DEBUG(log, "Download of part {} unique id {} metadata onto disk {} finished.", part_name, part_id, disk->getName());
}
else
@ -947,6 +947,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
new_data_part->checksums.checkEqual(data_checksums, false);
LOG_DEBUG(log, "Download of part {} onto disk {} finished.", part_name, disk->getName());
}
if (zero_copy_temporary_lock_holder)
zero_copy_temporary_lock_holder->setAlreadyRemoved();
return new_data_part;
}

View File

@ -1468,7 +1468,7 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
}
MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAndCommit(Transaction & transaction,
const MutableDataPartPtr & part, std::optional<MergeTreeData::HardlinkedFiles> hardlinked_files)
const MutableDataPartPtr & part, std::optional<MergeTreeData::HardlinkedFiles> hardlinked_files, bool replace_zero_copy_lock)
{
auto zookeeper = getZooKeeper();
@ -1477,7 +1477,7 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd
Coordination::Requests ops;
NameSet absent_part_paths_on_replicas;
getLockSharedDataOps(*part, std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), false, hardlinked_files, ops);
getLockSharedDataOps(*part, std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), replace_zero_copy_lock, hardlinked_files, ops);
size_t zero_copy_lock_ops_size = ops.size();
/// Checksums are checked here and `ops` is filled. In fact, the part is added to ZK just below, when executing `multi`.
@ -4162,7 +4162,7 @@ bool StorageReplicatedMergeTree::fetchPart(
Transaction transaction(*this, NO_TRANSACTION_RAW);
renameTempPartAndReplace(part, transaction);
replaced_parts = checkPartChecksumsAndCommit(transaction, part, hardlinked_files);
replaced_parts = checkPartChecksumsAndCommit(transaction, part, hardlinked_files, !part_to_clone);
/** If a quorum is tracked for this part, you must update it.
* If you do not have time, in case of losing the session, when you restart the server - see the `ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart` method.
@ -8113,31 +8113,30 @@ std::optional<String> StorageReplicatedMergeTree::tryGetTableSharedIDFromCreateQ
}
void StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const
zkutil::EphemeralNodeHolderPtr StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const
{
auto settings = getSettings();
if (!disk || !disk->supportZeroCopyReplication() || !settings->allow_remote_fs_zero_copy_replication)
return;
return {};
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper)
return;
return {};
String id = part_id;
boost::replace_all(id, "/", "_");
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(),
part_name, zookeeper_path);
String zc_zookeeper_path = getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(),
part_name, zookeeper_path)[0];
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
{
String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;
String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;
LOG_TRACE(log, "Set zookeeper temporary ephemeral lock {}", zookeeper_node);
createZeroCopyLockNode(
std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), zookeeper_node, zkutil::CreateMode::Ephemeral, false);
}
LOG_TRACE(log, "Set zookeeper temporary ephemeral lock {}", zookeeper_node);
createZeroCopyLockNode(
std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), zookeeper_node, zkutil::CreateMode::Ephemeral, false);
return zkutil::EphemeralNodeHolder::existing(zookeeper_node, *zookeeper);
}
void StorageReplicatedMergeTree::lockSharedData(

View File

@ -259,7 +259,7 @@ public:
std::optional<HardlinkedFiles> hardlinked_files,
Coordination::Requests & requests) const;
void lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const;
zkutil::EphemeralNodeHolderPtr lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const;
/// Unlock shared data part in zookeeper
/// Return true if data unlocked
@ -549,7 +549,7 @@ private:
String getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const;
/// Accepts a PreActive part, atomically checks its checksums with ones on other replicas and commit the part
DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction, const MutableDataPartPtr & part, std::optional<HardlinkedFiles> hardlinked_files = {});
DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction, const MutableDataPartPtr & part, std::optional<HardlinkedFiles> hardlinked_files = {}, bool replace_zero_copy_lock=false);
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;