Create ZK ancestors optimistically

This commit is contained in:
Raúl Marín 2023-07-17 12:02:32 +02:00
parent 44929b7d28
commit 41bdcdabfa
2 changed files with 27 additions and 8 deletions

View File

@ -3,8 +3,10 @@
#include "KeeperException.h"
#include "TestKeeper.h"
#include <functional>
#include <filesystem>
#include <functional>
#include <ranges>
#include <vector>
#include <Common/ZooKeeper/Types.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
@ -350,15 +352,33 @@ void ZooKeeper::createIfNotExists(const std::string & path, const std::string &
void ZooKeeper::createAncestors(const std::string & path)
{
size_t pos = 1;
std::string data = "";
std::string path_created; // Ignored
std::vector<std::string> pending_nodes;
size_t last_pos = path.rfind('/');
std::string current_node = path.substr(0, last_pos);
while (true)
{
pos = path.find('/', pos);
if (pos == std::string::npos)
Coordination::Error code = createImpl(current_node, data, CreateMode::Persistent, path_created);
if (code == Coordination::Error::ZNONODE)
{
/// The parent node doesn't exist. Save the current node and try with the parent
last_pos = current_node.rfind('/');
if (last_pos == std::string::npos || last_pos == 0)
throw KeeperException(code, path);
pending_nodes.emplace_back(std::move(current_node));
current_node = path.substr(0, last_pos);
}
else if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS)
break;
createIfNotExists(path.substr(0, pos), "");
++pos;
else
throw KeeperException(code, path);
}
for (const std::string & pending : pending_nodes | std::views::reverse)
createIfNotExists(pending, data);
}
void ZooKeeper::checkExistsAndGetCreateAncestorsOps(const std::string & path, Coordination::Requests & requests)

View File

@ -9173,8 +9173,7 @@ std::optional<ZeroCopyLock> StorageReplicatedMergeTree::tryCreateZeroCopyExclusi
String zc_zookeeper_path = *getZeroCopyPartPath(part_name, disk);
/// Just recursively create ancestors for lock
zookeeper->createAncestors(zc_zookeeper_path);
zookeeper->createIfNotExists(zc_zookeeper_path, "");
zookeeper->createAncestors(zc_zookeeper_path + "/");
/// Create actual lock
ZeroCopyLock lock(zookeeper, zc_zookeeper_path, replica_name);