This commit is contained in:
alesapin 2023-04-25 15:33:41 +02:00
parent 5a72125ab7
commit 3a24f0b6c6
4 changed files with 146 additions and 58 deletions

View File

@ -342,6 +342,31 @@ void ZooKeeper::createAncestors(const std::string & path)
} }
} }
void ZooKeeper::checkExistsAndGetCreateAncestorsOps(const std::string & path, Coordination::Requests & requests)
{
std::vector<std::string> paths_to_check;
size_t pos = 1;
while (true)
{
pos = path.find('/', pos);
if (pos == std::string::npos)
break;
paths_to_check.emplace_back(path.substr(0, pos));
++pos;
}
MultiExistsResponse response = exists(paths_to_check);
for (size_t i = 0; i < paths_to_check.size(); ++i)
{
if (response[i].error != Coordination::Error::ZOK)
{
/// Ephemeral nodes cannot have children
requests.emplace_back(makeCreateRequest(paths_to_check[i], "", CreateMode::Persistent));
}
}
}
Coordination::Error ZooKeeper::removeImpl(const std::string & path, int32_t version) Coordination::Error ZooKeeper::removeImpl(const std::string & path, int32_t version)
{ {
auto future_result = asyncTryRemoveNoThrow(path, version); auto future_result = asyncTryRemoveNoThrow(path, version);

View File

@ -237,6 +237,8 @@ public:
/// Does not create the node itself. /// Does not create the node itself.
void createAncestors(const std::string & path); void createAncestors(const std::string & path);
/// Checks which ancestors of `path` exist and appends to `requests` a create request
/// (persistent node, empty data) for each ancestor whose existence check did not return ZOK.
/// Does not add a request for the node itself and does not execute the requests.
void checkExistsAndGetCreateAncestorsOps(const std::string & path, Coordination::Requests & requests);
/// Remove the node if the version matches. (if version == -1, remove any version). /// Remove the node if the version matches. (if version == -1, remove any version).
void remove(const std::string & path, int32_t version = -1); void remove(const std::string & path, int32_t version = -1);

View File

@ -1477,7 +1477,8 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd
Coordination::Requests ops; Coordination::Requests ops;
NameSet absent_part_paths_on_replicas; NameSet absent_part_paths_on_replicas;
lockSharedData(*part, false, hardlinked_files); getLockSharedDataOps(*part, zookeeper, false, hardlinked_files, ops);
size_t zero_copy_lock_ops_size = ops.size();
/// Checksums are checked here and `ops` is filled. In fact, the part is added to ZK just below, when executing `multi`. /// Checksums are checked here and `ops` is filled. In fact, the part is added to ZK just below, when executing `multi`.
checkPartChecksumsAndAddCommitOps(zookeeper, part, ops, part->name, &absent_part_paths_on_replicas); checkPartChecksumsAndAddCommitOps(zookeeper, part, ops, part->name, &absent_part_paths_on_replicas);
@ -1509,7 +1510,7 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd
if (e == Coordination::Error::ZNODEEXISTS) if (e == Coordination::Error::ZNODEEXISTS)
{ {
size_t num_check_ops = 2 * absent_part_paths_on_replicas.size(); size_t num_check_ops = 2 * absent_part_paths_on_replicas.size() + zero_copy_lock_ops_size;
size_t failed_op_index = zkutil::getFailedOpIndex(e, responses); size_t failed_op_index = zkutil::getFailedOpIndex(e, responses);
if (failed_op_index < num_check_ops) if (failed_op_index < num_check_ops)
{ {
@ -8150,6 +8151,52 @@ void StorageReplicatedMergeTree::lockSharedData(
return lockSharedData(part, std::make_shared<ZooKeeperWithFaultInjection>(nullptr), replace_existing_lock, hardlinked_files); return lockSharedData(part, std::make_shared<ZooKeeperWithFaultInjection>(nullptr), replace_existing_lock, hardlinked_files);
} }
void StorageReplicatedMergeTree::getLockSharedDataOps(
const IMergeTreeDataPart & part,
const ZooKeeperWithFaultInjectionPtr & zookeeper,
bool replace_existing_lock,
std::optional<HardlinkedFiles> hardlinked_files,
Coordination::Requests & requests) const
{
if (!part.isStoredOnDisk() || !settings->allow_remote_fs_zero_copy_replication)
return;
if (!part.getDataPartStorage().supportZeroCopyReplication())
return;
if (zookeeper->isNull())
return;
String id = part.getUniqueId();
boost::replace_all(id, "/", "_");
Strings zc_zookeeper_paths = getZeroCopyPartPath(
*getSettings(), part.getDataPartStorage().getDiskType(), getTableSharedID(),
part.name, zookeeper_path);
String path_to_set_hardlinked_files;
NameSet hardlinks;
if (hardlinked_files.has_value() && !hardlinked_files->hardlinks_from_source_part.empty())
{
path_to_set_hardlinked_files = getZeroCopyPartPath(
*getSettings(), part.getDataPartStorage().getDiskType(), hardlinked_files->source_table_shared_id,
hardlinked_files->source_part_name, zookeeper_path)[0];
hardlinks = hardlinked_files->hardlinks_from_source_part;
}
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
{
String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;
getZeroCopyLockNodeCreaetOps(
zookeeper, zookeeper_node, requests, zkutil::CreateMode::Persistent,
replace_existing_lock, path_to_set_hardlinked_files, hardlinks);
}
}
void StorageReplicatedMergeTree::lockSharedData( void StorageReplicatedMergeTree::lockSharedData(
const IMergeTreeDataPart & part, const IMergeTreeDataPart & part,
const ZooKeeperWithFaultInjectionPtr & zookeeper, const ZooKeeperWithFaultInjectionPtr & zookeeper,
@ -8190,11 +8237,13 @@ void StorageReplicatedMergeTree::lockSharedData(
{ {
String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name; String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;
LOG_TRACE(log, "Set zookeeper persistent lock {}", zookeeper_node); LOG_TRACE(log, "Trying to create zookeeper persistent lock {}", zookeeper_node);
createZeroCopyLockNode( createZeroCopyLockNode(
zookeeper, zookeeper_node, zkutil::CreateMode::Persistent, zookeeper, zookeeper_node, zkutil::CreateMode::Persistent,
replace_existing_lock, path_to_set_hardlinked_files, hardlinks); replace_existing_lock, path_to_set_hardlinked_files, hardlinks);
LOG_TRACE(log, "Zookeeper persistent lock {} created", zookeeper_node);
} }
} }
@ -8355,6 +8404,7 @@ std::pair<bool, NameSet> getParentLockedBlobs(const ZooKeeperWithFaultInjectionP
return {true, files_not_to_remove}; return {true, files_not_to_remove};
} }
} }
LOG_TRACE(log, "No mutation parent found for part {}", part_info_str);
return {false, files_not_to_remove}; return {false, files_not_to_remove};
} }
@ -8406,6 +8456,10 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(
LOG_INFO(logger, "Lock on path {} for part {} doesn't exist, refuse to remove blobs", zookeeper_part_replica_node, part_name); LOG_INFO(logger, "Lock on path {} for part {} doesn't exist, refuse to remove blobs", zookeeper_part_replica_node, part_name);
return {false, {}}; return {false, {}};
} }
else
{
LOG_INFO(logger, "Lock on path {} for part {} doesn't exist, but we don't have mutation parent, can remove blobs", zookeeper_part_replica_node, part_name);
}
} }
else else
{ {
@ -8927,6 +8981,46 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
return true; return true;
} }
/// Appends to `requests` the operations that create the zero-copy lock node `zookeeper_node`
/// with the given `mode`. Nothing is executed here; the caller runs the collected requests.
///
/// - For persistent locks, create requests for missing ancestors of the node are appended first.
/// - If `replace_existing_lock` is set and the node exists at the time of the check, the
///   appended sequence is: remove node, create node, [set hardlinks].
/// - Otherwise the appended sequence is: [set hardlinks], create node.
///
/// The optional "set hardlinks" request writes the newline-joined `hardlinked_files` into
/// `path_to_set_hardlinked_files`; it is appended only when both arguments are non-empty.
void StorageReplicatedMergeTree::getZeroCopyLockNodeCreaetOps(
    const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node, Coordination::Requests & requests,
    int32_t mode, bool replace_existing_lock,
    const String & path_to_set_hardlinked_files, const NameSet & hardlinked_files)
{
    /// Ephemeral locks can be created only when we fetch shared data,
    /// so they never require creating ancestors. If we created them,
    /// a race condition with source replica drop would be possible.
    if (mode == zkutil::CreateMode::Persistent)
        zookeeper->checkExistsAndGetCreateAncestorsOps(zookeeper_node, requests);

    /// List of files used to detect hardlinks. path_to_set_hardlinked_files
    /// is a path to the source part zero copy node. During part removal hardlinked
    /// files will be left for the source part.
    const auto append_set_hardlinks_request = [&]
    {
        if (path_to_set_hardlinked_files.empty() || hardlinked_files.empty())
            return;

        std::string data = boost::algorithm::join(hardlinked_files, "\n");
        requests.emplace_back(zkutil::makeSetRequest(path_to_set_hardlinked_files, data, -1));
    };

    /// The existence check is issued only when replacement was requested.
    if (replace_existing_lock && zookeeper->exists(zookeeper_node))
    {
        requests.emplace_back(zkutil::makeRemoveRequest(zookeeper_node, -1));
        requests.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode));
        append_set_hardlinks_request();
    }
    else
    {
        append_set_hardlinks_request();
        requests.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode));
    }
}
void StorageReplicatedMergeTree::createZeroCopyLockNode( void StorageReplicatedMergeTree::createZeroCopyLockNode(
const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node, int32_t mode, const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node, int32_t mode,
@ -8940,64 +9034,18 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(
{ {
try try
{ {
/// Ephemeral locks can be created only when we fetch shared data. Coordination::Requests ops;
/// So it never require to create ancestors. If we create them getZeroCopyLockNodeCreaetOps(zookeeper, zookeeper_node, ops, mode, replace_existing_lock, path_to_set_hardlinked_files, hardlinked_files);
/// race condition with source replica drop is possible. auto error = zookeeper->tryMulti(ops, responses);
if (mode == zkutil::CreateMode::Persistent) if (error == Coordination::Error::ZOK)
zookeeper->createAncestors(zookeeper_node);
if (replace_existing_lock && zookeeper->exists(zookeeper_node))
{ {
Coordination::Requests ops; created = true;
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_node, -1)); break;
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode));
if (!path_to_set_hardlinked_files.empty() && !hardlinked_files.empty())
{
std::string data = boost::algorithm::join(hardlinked_files, "\n");
/// List of files used to detect hardlinks. path_to_set_hardlinked_files --
/// is a path to source part zero copy node. During part removal hardlinked
/// files will be left for source part.
ops.emplace_back(zkutil::makeSetRequest(path_to_set_hardlinked_files, data, -1));
}
Coordination::Responses responses;
auto error = zookeeper->tryMulti(ops, responses);
if (error == Coordination::Error::ZOK)
{
created = true;
break;
}
else if (error == Coordination::Error::ZNONODE && mode != zkutil::CreateMode::Persistent)
{
throw Exception(ErrorCodes::NOT_FOUND_NODE,
"Cannot create ephemeral zero copy lock {} because part was unlocked from zookeeper", zookeeper_node);
}
} }
else else if (error == Coordination::Error::ZNONODE && mode != zkutil::CreateMode::Persistent)
{ {
Coordination::Requests ops; throw Exception(ErrorCodes::NOT_FOUND_NODE,
if (!path_to_set_hardlinked_files.empty() && !hardlinked_files.empty()) "Cannot create ephemeral zero copy lock {} because part was unlocked from zookeeper", zookeeper_node);
{
std::string data = boost::algorithm::join(hardlinked_files, "\n");
/// List of files used to detect hardlinks. path_to_set_hardlinked_files --
/// is a path to source part zero copy node. During part removal hardlinked
/// files will be left for source part.
ops.emplace_back(zkutil::makeSetRequest(path_to_set_hardlinked_files, data, -1));
}
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode));
Coordination::Responses responses;
auto error = zookeeper->tryMulti(ops, responses);
if (error == Coordination::Error::ZOK || error == Coordination::Error::ZNODEEXISTS)
{
created = true;
break;
}
else if (error == Coordination::Error::ZNONODE && mode != zkutil::CreateMode::Persistent)
{
/// Ephemeral locks used during fetches so if parent node was removed we cannot do anything
throw Exception(ErrorCodes::NOT_FOUND_NODE,
"Cannot create ephemeral zero copy lock {} because part was unlocked from zookeeper", zookeeper_node);
}
} }
} }
catch (const zkutil::KeeperException & e) catch (const zkutil::KeeperException & e)

View File

@ -252,6 +252,13 @@ public:
bool replace_existing_lock, bool replace_existing_lock,
std::optional<HardlinkedFiles> hardlinked_files) const; std::optional<HardlinkedFiles> hardlinked_files) const;
/// Appends to `requests` the operations that create this replica's persistent zero-copy
/// lock node(s) for `part`, without executing them.
/// Appends nothing if the part is not stored on disk, zero-copy replication is disabled
/// in the settings, the part storage does not support it, or `zookeeper` is null.
/// If `hardlinked_files` holds a non-empty set of hardlinks, the built requests also
/// write that list into the source part's zero-copy node.
void getLockSharedDataOps(
const IMergeTreeDataPart & part,
const ZooKeeperWithFaultInjectionPtr & zookeeper,
bool replace_existing_lock,
std::optional<HardlinkedFiles> hardlinked_files,
Coordination::Requests & requests) const;
void lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const; void lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const;
/// Unlock shared data part in zookeeper /// Unlock shared data part in zookeeper
@ -861,6 +868,12 @@ private:
int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false, int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {}); const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});
/// Appends to `requests` the operations that create the lock node `zookeeper_node` with
/// the given `mode`, without executing them. For persistent mode, create requests for
/// missing ancestors are appended as well. With `replace_existing_lock`, an existing node
/// is removed and re-created. If both `path_to_set_hardlinked_files` and `hardlinked_files`
/// are non-empty, a request writing the newline-joined file list into that path is added.
static void getZeroCopyLockNodeCreaetOps(
const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node, Coordination::Requests & requests,
int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});
bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name) override; bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name) override;
/// Create freeze metadata for table and save in zookeeper. Required only if zero-copy replication enabled. /// Create freeze metadata for table and save in zookeeper. Required only if zero-copy replication enabled.