mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-17 21:24:28 +00:00
parent
839d6f7072
commit
88a6d4e206
@ -25,10 +25,10 @@ static String parentPath(const String & path)
|
|||||||
return "/";
|
return "/";
|
||||||
}
|
}
|
||||||
|
|
||||||
static String baseName(const String & path)
|
static std::string_view getBaseNameView(const String & path)
|
||||||
{
|
{
|
||||||
auto rslash_pos = path.rfind('/');
|
size_t basename_start = path.rfind('/');
|
||||||
return path.substr(rslash_pos + 1);
|
return std::string_view{&path[basename_start + 1], path.length() - basename_start - 1};
|
||||||
}
|
}
|
||||||
|
|
||||||
static NuKeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, NuKeeperStorage::Watches & watches, NuKeeperStorage::Watches & list_watches, Coordination::Event event_type)
|
static NuKeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, NuKeeperStorage::Watches & watches, NuKeeperStorage::Watches & list_watches, Coordination::Event event_type)
|
||||||
@ -167,14 +167,17 @@ struct NuKeeperStorageCreateRequest final : public NuKeeperStorageRequest
|
|||||||
|
|
||||||
/// Increment sequential number even if node is not sequential
|
/// Increment sequential number even if node is not sequential
|
||||||
++it->second.seq_num;
|
++it->second.seq_num;
|
||||||
|
|
||||||
response.path_created = path_created;
|
response.path_created = path_created;
|
||||||
container.emplace(path_created, std::move(created_node));
|
|
||||||
|
auto [child_itr, created] = container.emplace(path_created, std::move(created_node));
|
||||||
|
|
||||||
|
auto child_path_view = getBaseNameView(child_itr->first);
|
||||||
|
it->second.children.insert(child_path_view);
|
||||||
|
|
||||||
if (request.is_ephemeral)
|
if (request.is_ephemeral)
|
||||||
ephemerals[session_id].emplace(path_created);
|
ephemerals[session_id].emplace(path_created);
|
||||||
|
|
||||||
undo = [&container, &ephemerals, session_id, path_created, is_ephemeral = request.is_ephemeral, parent_path = it->first]
|
undo = [&container, &ephemerals, session_id, path_created, is_ephemeral = request.is_ephemeral, parent_path = it->first, child_path_view]
|
||||||
{
|
{
|
||||||
container.erase(path_created);
|
container.erase(path_created);
|
||||||
if (is_ephemeral)
|
if (is_ephemeral)
|
||||||
@ -183,6 +186,7 @@ struct NuKeeperStorageCreateRequest final : public NuKeeperStorageRequest
|
|||||||
--undo_parent.stat.cversion;
|
--undo_parent.stat.cversion;
|
||||||
--undo_parent.stat.numChildren;
|
--undo_parent.stat.numChildren;
|
||||||
--undo_parent.seq_num;
|
--undo_parent.seq_num;
|
||||||
|
undo_parent.children.erase(child_path_view);
|
||||||
};
|
};
|
||||||
|
|
||||||
++it->second.stat.cversion;
|
++it->second.stat.cversion;
|
||||||
@ -250,21 +254,25 @@ struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest
|
|||||||
if (prev_node.is_ephemeral)
|
if (prev_node.is_ephemeral)
|
||||||
ephemerals[session_id].erase(request.path);
|
ephemerals[session_id].erase(request.path);
|
||||||
|
|
||||||
container.erase(it);
|
auto child_basename_view = getBaseNameView(it->first);
|
||||||
auto & parent = container.at(parentPath(request.path));
|
auto & parent = container.at(parentPath(request.path));
|
||||||
--parent.stat.numChildren;
|
--parent.stat.numChildren;
|
||||||
++parent.stat.cversion;
|
++parent.stat.cversion;
|
||||||
|
parent.children.erase(child_basename_view);
|
||||||
response.error = Coordination::Error::ZOK;
|
response.error = Coordination::Error::ZOK;
|
||||||
|
|
||||||
|
container.erase(it);
|
||||||
|
|
||||||
undo = [prev_node, &container, &ephemerals, session_id, path = request.path]
|
undo = [prev_node, &container, &ephemerals, session_id, path = request.path]
|
||||||
{
|
{
|
||||||
if (prev_node.is_ephemeral)
|
if (prev_node.is_ephemeral)
|
||||||
ephemerals[session_id].emplace(path);
|
ephemerals[session_id].emplace(path);
|
||||||
|
|
||||||
container.emplace(path, prev_node);
|
auto [itr, inserted] = container.emplace(path, prev_node);
|
||||||
auto & undo_parent = container.at(parentPath(path));
|
auto & undo_parent = container.at(parentPath(path));
|
||||||
++undo_parent.stat.numChildren;
|
++undo_parent.stat.numChildren;
|
||||||
--undo_parent.stat.cversion;
|
--undo_parent.stat.cversion;
|
||||||
|
undo_parent.children.insert(getBaseNameView(itr->first));
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -370,17 +378,10 @@ struct NuKeeperStorageListRequest final : public NuKeeperStorageRequest
|
|||||||
if (path_prefix.empty())
|
if (path_prefix.empty())
|
||||||
throw DB::Exception("Logical error: path cannot be empty", ErrorCodes::LOGICAL_ERROR);
|
throw DB::Exception("Logical error: path cannot be empty", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
if (path_prefix.back() != '/')
|
for (const auto & name : it->second.children)
|
||||||
path_prefix += '/';
|
response.names.emplace_back(name);
|
||||||
|
|
||||||
/// Fairly inefficient.
|
std::sort(response.names.begin(), response.names.end());
|
||||||
for (auto child_it = container.upper_bound(path_prefix);
|
|
||||||
child_it != container.end() && startsWith(child_it->first, path_prefix);
|
|
||||||
++child_it)
|
|
||||||
{
|
|
||||||
if (parentPath(child_it->first) == request.path)
|
|
||||||
response.names.emplace_back(baseName(child_it->first));
|
|
||||||
}
|
|
||||||
|
|
||||||
response.stat = it->second.stat;
|
response.stat = it->second.stat;
|
||||||
response.error = Coordination::Error::ZOK;
|
response.error = Coordination::Error::ZOK;
|
||||||
|
@ -16,6 +16,7 @@ using namespace DB;
|
|||||||
struct NuKeeperStorageRequest;
|
struct NuKeeperStorageRequest;
|
||||||
using NuKeeperStorageRequestPtr = std::shared_ptr<NuKeeperStorageRequest>;
|
using NuKeeperStorageRequestPtr = std::shared_ptr<NuKeeperStorageRequest>;
|
||||||
using ResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr &)>;
|
using ResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr &)>;
|
||||||
|
using ChildrenRefSet = std::unordered_set<std::string_view>;
|
||||||
|
|
||||||
class NuKeeperStorage
|
class NuKeeperStorage
|
||||||
{
|
{
|
||||||
@ -30,6 +31,7 @@ public:
|
|||||||
bool is_sequental = false;
|
bool is_sequental = false;
|
||||||
Coordination::Stat stat{};
|
Coordination::Stat stat{};
|
||||||
int32_t seq_num = 0;
|
int32_t seq_num = 0;
|
||||||
|
ChildrenRefSet children;
|
||||||
};
|
};
|
||||||
|
|
||||||
struct ResponseForSession
|
struct ResponseForSession
|
||||||
@ -48,9 +50,9 @@ public:
|
|||||||
|
|
||||||
using RequestsForSessions = std::vector<RequestForSession>;
|
using RequestsForSessions = std::vector<RequestForSession>;
|
||||||
|
|
||||||
using Container = std::map<std::string, Node>;
|
using Container = std::unordered_map<std::string, Node>;
|
||||||
using Ephemerals = std::unordered_map<int64_t, std::unordered_set<String>>;
|
using Ephemerals = std::unordered_map<int64_t, std::unordered_set<std::string>>;
|
||||||
using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<String>>;
|
using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<std::string>>;
|
||||||
using SessionAndTimeout = std::unordered_map<int64_t, long>;
|
using SessionAndTimeout = std::unordered_map<int64_t, long>;
|
||||||
using SessionIDs = std::vector<int64_t>;
|
using SessionIDs = std::vector<int64_t>;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user