ClickHouse/src/Coordination/KeeperStorage.cpp

2042 lines
75 KiB
C++
Raw Normal View History

2022-05-06 12:25:25 +00:00
#include <functional>
#include <iomanip>
#include <iterator>
#include <mutex>
#include <sstream>
2021-03-29 08:24:56 +00:00
#include <Coordination/KeeperStorage.h>
2022-05-06 12:25:25 +00:00
#include <Coordination/pathUtils.h>
#include <IO/Operators.h>
#include <IO/WriteHelpers.h>
#include <boost/algorithm/string.hpp>
#include <Poco/Base64Encoder.h>
#include <Poco/SHA1Engine.h>
2022-05-13 13:43:42 +00:00
#include "Common/SipHash.h"
2022-05-05 10:32:41 +00:00
#include "Common/ZooKeeper/ZooKeeperConstants.h"
2020-11-04 18:54:55 +00:00
#include <Common/StringUtils/StringUtils.h>
2022-05-06 12:25:25 +00:00
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/hex.h>
2022-04-27 15:05:45 +00:00
#include <Common/logger_useful.h>
2022-05-06 12:25:25 +00:00
#include <Common/setThreadName.h>
2020-10-30 14:16:47 +00:00
2020-11-11 13:55:28 +00:00
namespace DB
{
2020-11-25 13:19:09 +00:00
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
2022-04-05 06:27:03 +00:00
namespace
{
2022-05-09 09:57:06 +00:00
/// Returns the Base64 representation of `decoded`.
String base64Encode(const String & decoded)
{
    std::ostringstream output; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
    output.exceptions(std::ios::failbit);

    Poco::Base64Encoder base64_stream(output);
    /// Line length is explicitly set to 0 before any data is written.
    base64_stream.rdbuf()->setLineLength(0);
    base64_stream << decoded;
    /// The encoder is closed before the accumulated text is read back.
    base64_stream.close();

    return output.str();
}
2021-05-21 21:19:22 +00:00
2022-05-09 09:57:06 +00:00
/// Returns the SHA1 digest of `userdata` as a string built directly from the digest bytes
/// (no hex or other text encoding is applied here).
String getSHA1(const String & userdata)
{
    Poco::SHA1Engine sha1;
    sha1.update(userdata);
    const auto & raw_digest = sha1.digest();
    return String(raw_digest.begin(), raw_digest.end());
}
2021-05-22 16:07:47 +00:00
2022-05-09 09:57:06 +00:00
/// Builds "<prefix>:<base64(sha1(userdata))>", where <prefix> is the part of `userdata`
/// preceding its first ':' (or the whole string when it contains no ':').
/// Note that the hash is taken over the complete `userdata`, not only over the part after ':'.
String generateDigest(const String & userdata)
{
    /// find() yields npos when there is no ':'; substr(0, npos) then returns the whole string.
    const String prefix = userdata.substr(0, userdata.find(':'));
    return prefix + ":" + base64Encode(getSHA1(userdata));
}
2021-05-21 21:19:22 +00:00
2022-05-09 09:57:06 +00:00
/// Translates the ACLs sent with a request into the ACLs that should be stored for a node.
///
/// - "auth" entries are expanded into one entry per id in `current_ids`
///   (same permissions, scheme/id taken from the auth id).
/// - "world:anyone" is accepted but not stored.
/// - "digest" entries are stored as is, provided the id contains exactly one ':'.
/// - entries with any other scheme are ignored.
///
/// Resulting entries are appended to `result_acls`.
/// Returns true if `request_acls` is empty or at least one entry was accepted.
/// Returns false as soon as a malformed "digest" id is met; entries appended to
/// `result_acls` before that point are left in place.
bool fixupACL(
    const std::vector<Coordination::ACL> & request_acls,
    const std::vector<KeeperStorage::AuthID> & current_ids,
    std::vector<Coordination::ACL> & result_acls)
{
    if (request_acls.empty())
        return true;

    bool valid_found = false;
    for (const auto & request_acl : request_acls)
    {
        if (request_acl.scheme == "auth")
        {
            /// With no auth ids nothing is added and the entry does not count as valid.
            for (const auto & current_id : current_ids)
            {
                valid_found = true;
                Coordination::ACL new_acl = request_acl;
                new_acl.scheme = current_id.scheme;
                new_acl.id = current_id.id;
                result_acls.push_back(new_acl);
            }
        }
        else if (request_acl.scheme == "world" && request_acl.id == "anyone")
        {
            /// We don't need to save default ACLs
            valid_found = true;
        }
        else if (request_acl.scheme == "digest")
        {
            /// Bad auth: the id must contain exactly one ':'
            if (std::count(request_acl.id.begin(), request_acl.id.end(), ':') != 1)
                return false;

            valid_found = true;
            result_acls.push_back(request_acl);
        }
    }

    return valid_found;
}
2021-05-22 16:21:52 +00:00
2022-05-09 09:57:06 +00:00
/// Builds the watch notifications caused by `event_type` happening on `path` and removes the fired watches.
///
/// - Every session registered in `watches` for `path` gets a response of type `event_type`.
/// - CREATED additionally fires the list watches of the parent path (type CHILD).
/// - DELETED additionally fires the list watches of `path` itself (type DELETED)
///   and of the parent path (type CHILD).
/// - Any other event type fires no list watches.
///
/// All sessions watching the same path share one response object.
/// Responses are returned in the order: data watches first, then list watches in the order described above.
KeeperStorage::ResponsesForSessions processWatchesImpl(
    const String & path, KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches, Coordination::Event event_type)
{
    KeeperStorage::ResponsesForSessions result;

    auto watch_it = watches.find(path);
    if (watch_it != watches.end())
    {
        std::shared_ptr<Coordination::ZooKeeperWatchResponse> watch_response = std::make_shared<Coordination::ZooKeeperWatchResponse>();
        watch_response->path = path;
        watch_response->xid = Coordination::WATCH_XID;
        watch_response->zxid = -1;
        watch_response->type = event_type;
        watch_response->state = Coordination::State::CONNECTED;

        for (auto watcher_session : watch_it->second)
            result.push_back(KeeperStorage::ResponseForSession{watcher_session, watch_response});

        /// The whole entry for the path is dropped once it has fired.
        watches.erase(watch_it);
    }

    auto parent_path = parentPath(path);

    Strings paths_to_check_for_list_watches;
    if (event_type == Coordination::Event::CREATED)
    {
        paths_to_check_for_list_watches.push_back(parent_path.toString()); /// Trigger list watches for parent
    }
    else if (event_type == Coordination::Event::DELETED)
    {
        paths_to_check_for_list_watches.push_back(path); /// Trigger both list watches for this path
        paths_to_check_for_list_watches.push_back(parent_path.toString()); /// And for parent path
    }
    /// CHANGED event never triggers list watches

    for (const auto & path_to_check : paths_to_check_for_list_watches)
    {
        watch_it = list_watches.find(path_to_check);
        if (watch_it == list_watches.end())
            continue;

        std::shared_ptr<Coordination::ZooKeeperWatchResponse> watch_list_response
            = std::make_shared<Coordination::ZooKeeperWatchResponse>();
        watch_list_response->path = path_to_check;
        watch_list_response->xid = Coordination::WATCH_XID;
        watch_list_response->zxid = -1;
        /// The parent sees a change in its children, the node itself sees its own removal.
        if (path_to_check == parent_path)
            watch_list_response->type = Coordination::Event::CHILD;
        else
            watch_list_response->type = Coordination::Event::DELETED;
        watch_list_response->state = Coordination::State::CONNECTED;

        for (auto watcher_session : watch_it->second)
            result.push_back(KeeperStorage::ResponseForSession{watcher_session, watch_list_response});

        list_watches.erase(watch_it);
    }

    return result;
}
2022-05-16 12:12:29 +00:00
// When this function is updated, update CURRENT_DIGEST_VERSION!!
2022-05-13 13:43:42 +00:00
/// Computes a 64-bit SipHash over the node path, its data and the fields of its stat.
/// The order and the number of update() calls define the resulting value, so they must stay exactly as they are.
UInt64 calculateDigest(std::string_view path, std::string_view data, const Coordination::Stat & stat)
{
    SipHash hasher;

    hasher.update(path);
    hasher.update(data);

    /// NOTE(review): czxid is fed into the hash twice. It looks accidental, but dropping the
    /// duplicate would change every digest value, so it is preserved as is.
    hasher.update(stat.czxid);
    hasher.update(stat.czxid);
    hasher.update(stat.mzxid);

    hasher.update(stat.ctime);
    hasher.update(stat.mtime);

    hasher.update(stat.version);
    hasher.update(stat.cversion);
    hasher.update(stat.aversion);

    hasher.update(stat.ephemeralOwner);
    hasher.update(stat.dataLength);
    hasher.update(stat.numChildren);
    hasher.update(stat.pzxid);

    return hasher.get64();
}
2022-04-05 06:27:03 +00:00
}
/// Replaces the node data and adjusts `size_bytes` by the difference between the old and the new data sizes.
void KeeperStorage::Node::setData(String new_data)
{
    size_bytes -= data.size();
    size_bytes += new_data.size();
    data = std::move(new_data);
}
/// Registers `child_path` in the set of children.
/// `size_bytes` grows by the size of the StringRef object itself, not by the length of the string it points to.
void KeeperStorage::Node::addChild(StringRef child_path)
{
    size_bytes += sizeof(child_path);
    children.insert(child_path);
}
/// Removes `child_path` from the set of children.
/// `size_bytes` shrinks by the size of the StringRef object itself, whether or not the child was present.
void KeeperStorage::Node::removeChild(StringRef child_path)
{
    size_bytes -= sizeof(child_path);
    children.erase(child_path);
}
2020-11-10 13:43:10 +00:00
2022-05-13 13:43:42 +00:00
/// Drops the cached digest so that the next getDigest() call recalculates it.
/// Callable on a const node: only the cache is touched.
void KeeperStorage::Node::invalidateDigestCache() const
{
    cached_digest.reset();
}
/// Returns the digest of the node, calculating and caching it on first use.
/// `path` is only used when the digest has to be calculated; a cached value is returned regardless of `path`.
UInt64 KeeperStorage::Node::getDigest(const std::string_view path) const
{
    if (cached_digest)
        return *cached_digest;

    cached_digest = calculateDigest(path, data, stat);
    return *cached_digest;
}
/// Stores an already known digest in the cache, replacing any previously cached value.
void KeeperStorage::Node::setDigest(UInt64 digest)
{
    cached_digest = digest;
}
2021-05-23 17:54:42 +00:00
/// Creates a storage whose session expiry queue is initialized with `tick_time_ms`
/// and which remembers `superdigest_`. The container starts with a single default-constructed root node "/".
KeeperStorage::KeeperStorage(int64_t tick_time_ms, const String & superdigest_)
    : session_expiry_queue(tick_time_ms), superdigest(superdigest_)
{
    container.insert("/", Node());
}
2022-05-06 12:25:25 +00:00
/// Combines several callables into a single object exposing all of their call operators.
/// Used to build a visitor out of a set of lambdas, one per handled type.
template <class... Ts>
struct Overloaded : Ts...
{
    using Ts::operator()...;
};

/// Deduction guide: lets `Overloaded{lambda1, lambda2, ...}` deduce the template arguments.
template <class... Ts>
Overloaded(Ts...) -> Overloaded<Ts...>;
2022-05-13 13:43:42 +00:00
/// Returns the node at `path` as it looks after applying the uncommitted deltas on top of the committed state,
/// or nullptr if the node does not exist in that view.
///
/// The result is always a fresh object: stat, seq_num, data and digest are copied from the committed node
/// (if any), then create/remove/update deltas for the path are replayed through applyDeltas().
/// `last_zxid` is forwarded to applyDeltas() unchanged.
std::shared_ptr<KeeperStorage::Node> KeeperStorage::UncommittedState::getNode(StringRef path, std::optional<int64_t> last_zxid) const
{
    std::shared_ptr<Node> node{nullptr};

    if (auto maybe_node_it = storage.container.find(path); maybe_node_it != storage.container.end())
    {
        const auto & committed_node = maybe_node_it->value;
        node = std::make_shared<KeeperStorage::Node>();
        node->stat = committed_node.stat;
        node->seq_num = committed_node.seq_num;
        node->setData(committed_node.getData());
        node->setDigest(committed_node.getDigest(path.toView()));
    }

    applyDeltas(
        path,
        Overloaded{
            [&](const CreateNodeDelta & create_delta)
            {
                assert(!node);
                node = std::make_shared<Node>();
                node->stat = create_delta.stat;
                node->setData(create_delta.data);
            },
            [&](const RemoveNodeDelta & /*remove_delta*/)
            {
                assert(node);
                node = nullptr;
            },
            [&](const UpdateNodeDelta & update_delta)
            {
                assert(node);
                /// The update may change anything the digest depends on, so the copied digest is dropped first.
                node->invalidateDigestCache();
                update_delta.update_fn(*node);
            },
            /// Other delta kinds do not affect the node itself.
            [&](auto && /*delta*/) {},
        },
        last_zxid);

    return node;
}
2022-05-11 09:08:39 +00:00
/// Tells whether a node exists at `path` once the uncommitted deltas are applied on top of the committed state.
/// Only create and remove deltas influence the answer.
bool KeeperStorage::UncommittedState::hasNode(StringRef path) const
{
    bool exists = storage.container.contains(std::string{path});

    applyDeltas(
        path,
        Overloaded{
            [&](const CreateNodeDelta & /*create_delta*/)
            {
                assert(!exists);
                exists = true;
            },
            [&](const RemoveNodeDelta & /*remove_delta*/)
            {
                assert(exists);
                exists = false;
            },
            [&](auto && /*delta*/) {},
        });

    return exists;
}
2022-05-11 09:08:39 +00:00
/// Returns the ACLs of the node at `path` as seen after applying the uncommitted deltas.
///
/// ACLs coming from an uncommitted create or set-ACL delta take precedence; otherwise the committed
/// node's acl_id is resolved through `storage.acl_map`. An empty list is returned when the node
/// does not exist in that view.
Coordination::ACLs KeeperStorage::UncommittedState::getACLs(StringRef path) const
{
    /// ACL id of the committed node, if there is one.
    std::optional<uint64_t> acl_id;
    if (auto maybe_node_it = storage.container.find(path); maybe_node_it != storage.container.end())
        acl_id.emplace(maybe_node_it->value.acl_id);

    /// ACLs introduced by the latest relevant delta; points into the delta itself.
    const Coordination::ACLs * acls{nullptr};

    applyDeltas(
        path,
        Overloaded{
            [&](const CreateNodeDelta & create_delta)
            {
                assert(!acl_id);
                acls = &create_delta.acls;
            },
            [&](const RemoveNodeDelta & /*remove_delta*/)
            {
                assert(acl_id || acls);
                acl_id.reset();
                acls = nullptr;
            },
            [&](const SetACLDelta & set_acl_delta)
            {
                assert(acl_id || acls);
                acls = &set_acl_delta.acls;
            },
            [&](auto && /*delta*/) {},
        });

    if (acls)
        return *acls;

    return acl_id ? storage.acl_map.convertNumber(*acl_id) : Coordination::ACLs{};
}
2022-05-06 12:25:25 +00:00
namespace
2022-05-05 10:32:41 +00:00
{
2022-05-11 12:10:17 +00:00
[[noreturn]] void onStorageInconsistency()
{
2022-05-13 13:43:42 +00:00
LOG_ERROR(
&Poco::Logger::get("KeeperStorage"),
"Inconsistency found between uncommitted and committed data. Keeper will terminate to avoid undefined behaviour.");
2022-05-11 12:10:17 +00:00
std::terminate();
}
2022-05-05 10:32:41 +00:00
}
2022-05-06 12:25:25 +00:00
/// Applies uncommitted deltas to the committed state.
///
/// Deltas are visited in the order they are stored in `uncommitted_state.deltas`; the walk stops at the first
/// delta whose zxid is greater than `commit_zxid`. Deltas are not removed from the list here.
///
/// Returns:
/// - the error carried by the first ErrorDelta met (deltas applied before it stay applied);
/// - ZOK right after a SubDeltaEnd delta is met;
/// - ZOK when the walk finishes.
///
/// Any disagreement between a delta and the committed state (missing node, version mismatch, failed
/// create/remove, unknown delta type) ends in onStorageInconsistency(), which terminates the process.
///
/// `session_id` is passed to createNode() and is used there for registering ephemeral nodes.
Coordination::Error KeeperStorage::commit(int64_t commit_zxid, int64_t session_id)
{
    for (auto & delta : uncommitted_state.deltas)
    {
        if (delta.zxid > commit_zxid)
            break;

        bool finish_subdelta = false;

        auto result = std::visit(
            [&, &path = delta.path]<typename DeltaType>(DeltaType & operation) -> Coordination::Error
            {
                if constexpr (std::same_as<DeltaType, KeeperStorage::CreateNodeDelta>)
                {
                    /// Data and ACLs are moved out of the delta.
                    if (!createNode(
                            path,
                            std::move(operation.data),
                            operation.stat,
                            operation.is_sequental,
                            operation.is_ephemeral,
                            std::move(operation.acls),
                            session_id))
                        onStorageInconsistency();

                    return Coordination::Error::ZOK;
                }
                else if constexpr (std::same_as<DeltaType, KeeperStorage::UpdateNodeDelta>)
                {
                    auto node_it = container.find(path);
                    if (node_it == container.end())
                        onStorageInconsistency();

                    if (operation.version != -1 && operation.version != node_it->value.stat.version)
                        onStorageInconsistency();

                    /// Replace the node's contribution to the total digest: subtract the old value,
                    /// update the node, then add the freshly calculated one.
                    nodes_digest -= node_it->value.getDigest(path);
                    auto updated_node = container.updateValue(path, operation.update_fn);
                    updated_node->value.invalidateDigestCache();
                    nodes_digest += updated_node->value.getDigest(path);

                    return Coordination::Error::ZOK;
                }
                else if constexpr (std::same_as<DeltaType, KeeperStorage::RemoveNodeDelta>)
                {
                    if (!removeNode(path, operation.version))
                        onStorageInconsistency();

                    return Coordination::Error::ZOK;
                }
                else if constexpr (std::same_as<DeltaType, KeeperStorage::SetACLDelta>)
                {
                    auto node_it = container.find(path);
                    if (node_it == container.end())
                        onStorageInconsistency();

                    /// ACL changes are versioned by aversion, not by the data version.
                    if (operation.version != -1 && operation.version != node_it->value.stat.aversion)
                        onStorageInconsistency();

                    acl_map.removeUsage(node_it->value.acl_id);

                    uint64_t acl_id = acl_map.convertACLs(operation.acls);
                    acl_map.addUsage(acl_id);

                    container.updateValue(path, [acl_id](KeeperStorage::Node & node) { node.acl_id = acl_id; });

                    return Coordination::Error::ZOK;
                }
                else if constexpr (std::same_as<DeltaType, KeeperStorage::ErrorDelta>)
                    return operation.error;
                else if constexpr (std::same_as<DeltaType, KeeperStorage::SubDeltaEnd>)
                {
                    finish_subdelta = true;
                    return Coordination::Error::ZOK;
                }
                else if constexpr (std::same_as<DeltaType, KeeperStorage::AddAuthDelta>)
                {
                    session_and_auth[operation.session_id].emplace_back(std::move(operation.auth_id));
                    return Coordination::Error::ZOK;
                }
                else
                {
                    // shouldn't be called in any process functions
                    onStorageInconsistency();
                }
            },
            delta.operation);

        if (result != Coordination::Error::ZOK)
            return result;

        if (finish_subdelta)
            return Coordination::Error::ZOK;
    }

    return Coordination::Error::ZOK;
}
/// Inserts a new node into the committed container.
///
/// Returns false, without changing anything, if the parent does not exist, the parent is an ephemeral node,
/// or a node already exists at `path`. Otherwise:
/// - registers a usage of `node_acls` in `acl_map` and stores the resulting id in the node;
/// - stores `stat`, `data` and `is_sequental` in the node;
/// - adds the node's base name to the parent's children;
/// - records `path` in `ephemerals[session_id]` when `is_ephemeral` is set;
/// - adds the node's digest to `nodes_digest`;
/// and returns true.
bool KeeperStorage::createNode(
    const std::string & path,
    String data,
    const Coordination::Stat & stat,
    bool is_sequental,
    bool is_ephemeral,
    Coordination::ACLs node_acls,
    int64_t session_id)
{
    auto parent_path = parentPath(path);
    auto node_it = container.find(parent_path);

    if (node_it == container.end())
        return false;

    /// Ephemeral nodes cannot have children.
    if (node_it->value.stat.ephemeralOwner != 0)
        return false;

    if (container.contains(path))
        return false;

    KeeperStorage::Node created_node;

    uint64_t acl_id = acl_map.convertACLs(node_acls);
    acl_map.addUsage(acl_id);

    created_node.acl_id = acl_id;
    created_node.stat = stat;
    created_node.setData(std::move(data));
    created_node.is_sequental = is_sequental;

    auto [map_key, _] = container.insert(path, created_node);

    /// Take child path from key owned by map.
    auto child_path = getBaseName(map_key->getKey());
    container.updateValue(parent_path, [child_path](KeeperStorage::Node & parent) { parent.addChild(child_path); });

    if (is_ephemeral)
        ephemerals[session_id].emplace(path);

    auto digest = map_key->getMapped()->value.getDigest(map_key->getKey().toView());
    nodes_digest += digest;

    return true;
}
2022-05-05 10:32:41 +00:00
2022-05-06 12:25:25 +00:00
bool KeeperStorage::removeNode(const std::string & path, int32_t version)
{
2022-05-09 09:16:05 +00:00
auto node_it = container.find(path);
if (node_it == container.end())
2022-05-06 12:25:25 +00:00
return false;
2022-05-09 09:16:05 +00:00
if (version != -1 && version != node_it->value.stat.version)
2022-05-06 12:25:25 +00:00
return false;
2022-05-09 09:16:05 +00:00
if (node_it->value.stat.numChildren)
2022-05-06 12:25:25 +00:00
return false;
2022-05-09 09:16:05 +00:00
auto prev_node = node_it->value;
2022-05-06 12:25:25 +00:00
if (prev_node.stat.ephemeralOwner != 0)
{
auto ephemerals_it = ephemerals.find(prev_node.stat.ephemeralOwner);
ephemerals_it->second.erase(path);
if (ephemerals_it->second.empty())
ephemerals.erase(ephemerals_it);
2022-05-05 10:32:41 +00:00
}
2022-05-06 12:25:25 +00:00
acl_map.removeUsage(prev_node.acl_id);
container.updateValue(
2022-05-09 09:16:05 +00:00
parentPath(path),
[child_basename = getBaseName(node_it->key)](KeeperStorage::Node & parent) { parent.removeChild(child_basename); });
2022-05-06 12:25:25 +00:00
container.erase(path);
2022-05-13 13:43:42 +00:00
2022-05-16 12:12:29 +00:00
nodes_digest -= prev_node.getDigest(path);
2022-05-06 12:25:25 +00:00
return true;
2022-05-05 10:32:41 +00:00
}
struct KeeperStorageRequestProcessor
2020-10-30 19:57:30 +00:00
{
Coordination::ZooKeeperRequestPtr zk_request;
2022-05-06 12:25:25 +00:00
explicit KeeperStorageRequestProcessor(const Coordination::ZooKeeperRequestPtr & zk_request_) : zk_request(zk_request_) { }
virtual Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const = 0;
virtual std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & /*storage*/, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /*time*/) const
{
return {};
}
2022-05-09 08:32:25 +00:00
// process the request using locally committed data
virtual Coordination::ZooKeeperResponsePtr
processLocal(KeeperStorage & /*storage*/, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /*time*/) const
{
throw Exception{DB::ErrorCodes::LOGICAL_ERROR, "Cannot process the request locally"};
}
2022-05-06 12:25:25 +00:00
virtual KeeperStorage::ResponsesForSessions
processWatches(KeeperStorage::Watches & /*watches*/, KeeperStorage::Watches & /*list_watches*/) const
{
return {};
}
2022-05-09 09:16:05 +00:00
virtual bool checkAuth(KeeperStorage & /*storage*/, int64_t /*session_id*/, bool /*is_local*/) const { return true; }
2020-11-10 13:43:10 +00:00
virtual ~KeeperStorageRequestProcessor() = default;
2020-11-20 12:36:10 +00:00
};
2020-10-30 19:57:30 +00:00
struct KeeperStorageHeartbeatRequestProcessor final : public KeeperStorageRequestProcessor
2020-11-03 14:49:30 +00:00
{
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
2022-05-06 12:25:25 +00:00
Coordination::ZooKeeperResponsePtr
process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */, int64_t /* time */) const override
2020-11-03 14:49:30 +00:00
{
2022-05-09 11:53:19 +00:00
return zk_request->makeResponse();
2020-11-03 14:49:30 +00:00
}
};
struct KeeperStorageSyncRequestProcessor final : public KeeperStorageRequestProcessor
2021-02-10 13:01:05 +00:00
{
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
2022-05-06 12:25:25 +00:00
Coordination::ZooKeeperResponsePtr
process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */, int64_t /* time */) const override
2021-02-10 13:01:05 +00:00
{
auto response = zk_request->makeResponse();
dynamic_cast<Coordination::ZooKeeperSyncResponse &>(*response).path
= dynamic_cast<Coordination::ZooKeeperSyncRequest &>(*zk_request).path;
2022-05-09 07:02:11 +00:00
return response;
2021-02-10 13:01:05 +00:00
}
};
2022-05-06 12:25:25 +00:00
namespace
{
2022-05-09 09:16:05 +00:00
Coordination::ACLs getNodeACLs(KeeperStorage & storage, StringRef path, bool is_local)
2022-05-09 09:16:05 +00:00
{
if (is_local)
{
auto node_it = storage.container.find(path);
if (node_it == storage.container.end())
return {};
return storage.acl_map.convertNumber(node_it->value.acl_id);
}
2022-05-11 09:08:39 +00:00
return storage.uncommitted_state.getACLs(path);
2022-05-09 09:16:05 +00:00
}
2022-05-06 12:25:25 +00:00
}
/// Checks whether `session_id` holds `permission` on the node at `path`.
///
/// Access is granted when:
/// - the node has no ACLs (this includes a node that cannot be found);
/// - the session has an auth id with the "super" scheme;
/// - some ACL entry containing `permission` is "world:anyone" or matches (scheme and id) one of the
///   session's auth ids.
/// `is_local` selects between the committed-only view and the view including uncommitted changes,
/// both for the node ACLs and for the session's auth ids.
bool KeeperStorage::checkACL(StringRef path, int32_t permission, int64_t session_id, bool is_local)
{
    const auto node_acls = getNodeACLs(*this, path, is_local);
    if (node_acls.empty())
        return true;

    if (uncommitted_state.hasACL(session_id, is_local, [](const auto & auth_id) { return auth_id.scheme == "super"; }))
        return true;

    for (const auto & node_acl : node_acls)
    {
        /// Entries that do not grant the requested permission are irrelevant.
        if (node_acl.permissions & permission)
        {
            if (node_acl.scheme == "world" && node_acl.id == "anyone")
                return true;

            if (uncommitted_state.hasACL(
                    session_id,
                    is_local,
                    [&](const auto & auth_id) { return auth_id.scheme == node_acl.scheme && auth_id.id == node_acl.id; }))
                return true;
        }
    }

    return false;
}
2022-05-06 12:25:25 +00:00
struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestProcessor
2020-10-30 19:57:30 +00:00
{
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
2020-11-10 13:43:10 +00:00
2022-05-06 12:25:25 +00:00
KeeperStorage::ResponsesForSessions
processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override
2020-11-10 13:43:10 +00:00
{
2021-01-19 14:22:28 +00:00
return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CREATED);
2020-11-10 13:43:10 +00:00
}
2022-05-09 09:16:05 +00:00
bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override
2020-10-30 19:57:30 +00:00
{
2022-01-22 15:29:36 +00:00
auto path = zk_request->getPath();
return storage.checkACL(parentPath(path), Coordination::ACL::Create, session_id, is_local);
2021-05-21 21:19:22 +00:00
}
2022-05-06 12:25:25 +00:00
std::vector<KeeperStorage::Delta> preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
2022-05-05 10:32:41 +00:00
{
Coordination::ZooKeeperCreateRequest & request = dynamic_cast<Coordination::ZooKeeperCreateRequest &>(*zk_request);
2022-05-06 12:25:25 +00:00
std::vector<KeeperStorage::Delta> new_deltas;
2022-05-05 10:32:41 +00:00
auto parent_path = parentPath(request.path);
2022-05-11 09:08:39 +00:00
auto parent_node = storage.uncommitted_state.getNode(parent_path);
2022-05-05 10:32:41 +00:00
if (parent_node == nullptr)
2022-05-09 07:02:11 +00:00
return {{zxid, Coordination::Error::ZNONODE}};
2022-05-06 12:25:25 +00:00
2022-05-05 10:32:41 +00:00
else if (parent_node->stat.ephemeralOwner != 0)
2022-05-09 07:02:11 +00:00
return {{zxid, Coordination::Error::ZNOCHILDRENFOREPHEMERALS}};
2022-05-05 10:32:41 +00:00
std::string path_created = request.path;
if (request.is_sequential)
{
auto seq_num = parent_node->seq_num;
2022-05-06 12:25:25 +00:00
std::stringstream seq_num_str; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
2022-05-05 10:32:41 +00:00
seq_num_str.exceptions(std::ios::failbit);
seq_num_str << std::setw(10) << std::setfill('0') << seq_num;
path_created += seq_num_str.str();
}
2022-05-11 09:08:39 +00:00
if (storage.uncommitted_state.hasNode(path_created))
2022-05-09 07:02:11 +00:00
return {{zxid, Coordination::Error::ZNODEEXISTS}};
2022-05-05 10:32:41 +00:00
if (getBaseName(path_created).size == 0)
2022-05-09 07:02:11 +00:00
return {{zxid, Coordination::Error::ZBADARGUMENTS}};
2022-05-05 10:32:41 +00:00
2022-05-06 12:25:25 +00:00
Coordination::ACLs node_acls;
if (!fixupACL(request.acls, storage.session_and_auth[session_id], node_acls))
2022-05-09 07:02:11 +00:00
return {{zxid, Coordination::Error::ZINVALIDACL}};
2022-05-06 12:25:25 +00:00
Coordination::Stat stat;
stat.czxid = zxid;
stat.mzxid = zxid;
stat.pzxid = zxid;
stat.ctime = time;
stat.mtime = time;
stat.numChildren = 0;
2022-05-09 11:53:19 +00:00
stat.version = 0;
stat.aversion = 0;
2022-05-10 13:31:39 +00:00
stat.cversion = 0;
2022-05-06 12:25:25 +00:00
stat.dataLength = request.data.length();
stat.ephemeralOwner = request.is_ephemeral ? session_id : 0;
new_deltas.emplace_back(
2022-05-10 13:31:39 +00:00
std::move(path_created),
2022-05-06 12:25:25 +00:00
zxid,
KeeperStorage::CreateNodeDelta{stat, request.is_ephemeral, request.is_sequential, std::move(node_acls), request.data});
2022-05-05 10:32:41 +00:00
int32_t parent_cversion = request.parent_cversion;
2022-05-06 12:25:25 +00:00
new_deltas.emplace_back(
std::string{parent_path},
zxid,
KeeperStorage::UpdateNodeDelta{[parent_cversion, zxid](KeeperStorage::Node & node)
{
++node.seq_num;
if (parent_cversion == -1)
++node.stat.cversion;
else if (parent_cversion > node.stat.cversion)
node.stat.cversion = parent_cversion;
if (zxid > node.stat.pzxid)
node.stat.pzxid = zxid;
++node.stat.numChildren;
}});
return new_deltas;
2022-05-05 10:32:41 +00:00
}
2022-05-06 12:25:25 +00:00
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /*time*/) const override
2021-05-21 21:19:22 +00:00
{
2020-10-30 19:57:30 +00:00
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperCreateResponse & response = dynamic_cast<Coordination::ZooKeeperCreateResponse &>(*response_ptr);
2022-05-09 08:32:25 +00:00
if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK)
2020-10-30 19:57:30 +00:00
{
2022-05-06 12:25:25 +00:00
response.error = result;
2022-05-09 07:02:11 +00:00
return response_ptr;
2020-10-30 19:57:30 +00:00
}
2022-05-11 09:08:39 +00:00
const auto & deltas = storage.uncommitted_state.deltas;
2022-05-06 12:25:25 +00:00
auto create_delta_it = std::find_if(
deltas.begin(),
deltas.end(),
[zxid](const auto & delta)
{ return delta.zxid == zxid && std::holds_alternative<KeeperStorage::CreateNodeDelta>(delta.operation); });
2022-05-11 12:10:17 +00:00
assert(create_delta_it != deltas.end());
2022-05-06 12:25:25 +00:00
response.path_created = create_delta_it->path;
response.error = Coordination::Error::ZOK;
2022-05-06 12:25:25 +00:00
return response_ptr;
2020-10-30 19:57:30 +00:00
}
};
struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProcessor
2020-11-03 14:49:30 +00:00
{
2022-05-09 09:16:05 +00:00
bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override
2021-05-21 21:19:22 +00:00
{
return storage.checkACL(zk_request->getPath(), Coordination::ACL::Read, session_id, is_local);
2021-05-21 21:19:22 +00:00
}
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
2022-05-06 12:25:25 +00:00
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/) const override
{
Coordination::ZooKeeperGetRequest & request = dynamic_cast<Coordination::ZooKeeperGetRequest &>(*zk_request);
2022-05-11 09:08:39 +00:00
if (!storage.uncommitted_state.hasNode(request.path))
2022-05-09 07:02:11 +00:00
return {{zxid, Coordination::Error::ZNONODE}};
2022-05-06 12:25:25 +00:00
return {};
}
2022-05-09 08:32:25 +00:00
template <bool local>
Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const
2020-11-03 14:49:30 +00:00
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperGetResponse & response = dynamic_cast<Coordination::ZooKeeperGetResponse &>(*response_ptr);
Coordination::ZooKeeperGetRequest & request = dynamic_cast<Coordination::ZooKeeperGetRequest &>(*zk_request);
2022-05-10 07:00:38 +00:00
if constexpr (!local)
2022-05-09 08:32:25 +00:00
{
if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK)
{
response.error = result;
return response_ptr;
}
}
const auto on_error = [&]([[maybe_unused]] const auto error_code)
2020-11-03 14:49:30 +00:00
{
2022-05-09 08:32:25 +00:00
if constexpr (local)
response.error = error_code;
else
2022-05-11 12:10:17 +00:00
onStorageInconsistency();
2022-05-09 08:32:25 +00:00
};
auto & container = storage.container;
auto node_it = container.find(request.path);
if (node_it == container.end())
{
on_error(Coordination::Error::ZNONODE);
2020-11-03 14:49:30 +00:00
}
else
{
2022-05-09 08:32:25 +00:00
response.stat = node_it->value.stat;
response.data = node_it->value.getData();
2020-11-03 14:49:30 +00:00
response.error = Coordination::Error::ZOK;
}
2022-05-06 12:25:25 +00:00
return response_ptr;
2020-11-03 14:49:30 +00:00
}
2022-05-09 08:32:25 +00:00
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<false>(storage, zxid, session_id, time);
}
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<true>(storage, zxid, session_id, time);
}
2020-11-03 14:49:30 +00:00
};
struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestProcessor
2020-11-04 18:54:55 +00:00
{
2022-05-09 09:16:05 +00:00
bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override
2021-05-21 21:19:22 +00:00
{
return storage.checkACL(parentPath(zk_request->getPath()), Coordination::ACL::Delete, session_id, is_local);
2021-05-21 21:19:22 +00:00
}
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
2022-05-06 12:25:25 +00:00
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/) const override
2020-11-04 18:54:55 +00:00
{
Coordination::ZooKeeperRemoveRequest & request = dynamic_cast<Coordination::ZooKeeperRemoveRequest &>(*zk_request);
2020-11-19 16:06:19 +00:00
2022-05-06 12:25:25 +00:00
std::vector<KeeperStorage::Delta> new_deltas;
2022-05-06 12:25:25 +00:00
const auto update_parent_pzxid = [&]()
{
auto parent_path = parentPath(request.path);
2022-05-11 09:08:39 +00:00
if (!storage.uncommitted_state.hasNode(parent_path))
2022-05-06 12:25:25 +00:00
return;
new_deltas.emplace_back(
std::string{parent_path},
zxid,
KeeperStorage::UpdateNodeDelta{[zxid](KeeperStorage::Node & parent)
{
if (parent.stat.pzxid < zxid)
parent.stat.pzxid = zxid;
}});
};
2021-02-26 13:53:34 +00:00
2022-05-11 09:08:39 +00:00
auto node = storage.uncommitted_state.getNode(request.path);
2022-05-06 12:25:25 +00:00
if (!node)
{
if (request.restored_from_zookeeper_log)
update_parent_pzxid();
2022-05-09 07:02:11 +00:00
return {{zxid, Coordination::Error::ZNONODE}};
2022-05-06 12:25:25 +00:00
}
else if (request.version != -1 && request.version != node->stat.version)
2022-05-09 07:02:11 +00:00
return {{zxid, Coordination::Error::ZBADVERSION}};
2022-05-06 12:25:25 +00:00
else if (node->stat.numChildren)
2022-05-09 07:02:11 +00:00
return {{zxid, Coordination::Error::ZNOTEMPTY}};
2022-05-06 12:25:25 +00:00
if (request.restored_from_zookeeper_log)
update_parent_pzxid();
new_deltas.emplace_back(
std::string{parentPath(request.path)},
zxid,
KeeperStorage::UpdateNodeDelta{[](KeeperStorage::Node & parent)
{
--parent.stat.numChildren;
++parent.stat.cversion;
}});
new_deltas.emplace_back(request.path, zxid, KeeperStorage::RemoveNodeDelta{request.version});
return new_deltas;
}
2022-05-06 12:25:25 +00:00
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperRemoveResponse & response = dynamic_cast<Coordination::ZooKeeperRemoveResponse &>(*response_ptr);
2020-11-20 08:37:16 +00:00
2022-05-09 08:32:25 +00:00
response.error = storage.commit(zxid, session_id);
2022-05-06 12:25:25 +00:00
return response_ptr;
2020-11-04 18:54:55 +00:00
}
2020-11-10 13:43:10 +00:00
2022-05-06 12:25:25 +00:00
KeeperStorage::ResponsesForSessions
processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override
2020-11-10 13:43:10 +00:00
{
2021-01-19 14:22:28 +00:00
return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED);
2020-11-10 13:43:10 +00:00
}
2020-11-04 18:54:55 +00:00
};
struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestProcessor
2020-11-04 18:54:55 +00:00
{
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
2022-05-06 12:25:25 +00:00
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/) const override
{
Coordination::ZooKeeperExistsRequest & request = dynamic_cast<Coordination::ZooKeeperExistsRequest &>(*zk_request);
2022-05-11 09:08:39 +00:00
if (!storage.uncommitted_state.hasNode(request.path))
2022-05-09 07:02:11 +00:00
return {{zxid, Coordination::Error::ZNONODE}};
2022-05-06 12:25:25 +00:00
return {};
}
2022-05-09 08:32:25 +00:00
template <bool local>
Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const
2020-11-04 18:54:55 +00:00
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperExistsResponse & response = dynamic_cast<Coordination::ZooKeeperExistsResponse &>(*response_ptr);
Coordination::ZooKeeperExistsRequest & request = dynamic_cast<Coordination::ZooKeeperExistsRequest &>(*zk_request);
2022-05-10 07:00:38 +00:00
if constexpr (!local)
2020-11-04 18:54:55 +00:00
{
2022-05-09 08:32:25 +00:00
if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK)
{
response.error = result;
return response_ptr;
}
}
const auto on_error = [&]([[maybe_unused]] const auto error_code)
{
if constexpr (local)
response.error = error_code;
else
2022-05-11 12:10:17 +00:00
onStorageInconsistency();
2022-05-09 08:32:25 +00:00
};
auto & container = storage.container;
auto node_it = container.find(request.path);
if (node_it == container.end())
{
on_error(Coordination::Error::ZNONODE);
2020-11-04 18:54:55 +00:00
}
else
{
2022-05-09 08:32:25 +00:00
response.stat = node_it->value.stat;
response.error = Coordination::Error::ZOK;
2020-11-04 18:54:55 +00:00
}
2022-05-06 12:25:25 +00:00
return response_ptr;
2020-11-04 18:54:55 +00:00
}
2022-05-09 08:32:25 +00:00
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<false>(storage, zxid, session_id, time);
}
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<true>(storage, zxid, session_id, time);
}
2020-11-04 18:54:55 +00:00
};
struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProcessor
2020-11-04 18:54:55 +00:00
{
2022-05-09 09:16:05 +00:00
bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override
2021-05-21 21:19:22 +00:00
{
return storage.checkACL(zk_request->getPath(), Coordination::ACL::Write, session_id, is_local);
2021-05-21 21:19:22 +00:00
}
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
2022-05-06 12:25:25 +00:00
std::vector<KeeperStorage::Delta> preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t time) const override
{
Coordination::ZooKeeperSetRequest & request = dynamic_cast<Coordination::ZooKeeperSetRequest &>(*zk_request);
std::vector<KeeperStorage::Delta> new_deltas;
2022-05-11 09:08:39 +00:00
if (!storage.uncommitted_state.hasNode(request.path))
2022-05-09 07:02:11 +00:00
return {{zxid, Coordination::Error::ZNONODE}};
2022-05-06 12:25:25 +00:00
2022-05-11 09:08:39 +00:00
auto node = storage.uncommitted_state.getNode(request.path);
2022-05-06 12:25:25 +00:00
if (request.version != -1 && request.version != node->stat.version)
2022-05-09 07:02:11 +00:00
return {{zxid, Coordination::Error::ZBADVERSION}};
2022-05-06 12:25:25 +00:00
new_deltas.emplace_back(
request.path,
zxid,
KeeperStorage::UpdateNodeDelta{
2022-05-10 09:53:15 +00:00
[zxid, data = request.data, time](KeeperStorage::Node & value)
2022-05-06 12:25:25 +00:00
{
value.stat.version++;
value.stat.mzxid = zxid;
value.stat.mtime = time;
value.stat.dataLength = data.length();
2022-05-10 09:53:15 +00:00
value.setData(data);
2022-05-06 12:25:25 +00:00
},
request.version});
2022-05-13 13:43:42 +00:00
new_deltas.emplace_back(parentPath(request.path).toString(), zxid, KeeperStorage::UpdateNodeDelta{[](KeeperStorage::Node & parent) {
parent.stat.cversion++;
}});
2022-05-06 12:25:25 +00:00
return new_deltas;
}
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /*time*/) const override
2020-11-04 18:54:55 +00:00
{
2021-05-21 21:19:22 +00:00
auto & container = storage.container;
2020-11-04 18:54:55 +00:00
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperSetResponse & response = dynamic_cast<Coordination::ZooKeeperSetResponse &>(*response_ptr);
Coordination::ZooKeeperSetRequest & request = dynamic_cast<Coordination::ZooKeeperSetRequest &>(*zk_request);
2022-05-09 08:32:25 +00:00
if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK)
2020-11-04 18:54:55 +00:00
{
2022-05-06 12:25:25 +00:00
response.error = result;
2022-05-09 07:02:11 +00:00
return response_ptr;
2020-11-04 18:54:55 +00:00
}
2021-02-26 13:53:34 +00:00
2022-05-09 08:32:25 +00:00
auto node_it = container.find(request.path);
if (node_it == container.end())
2022-05-11 12:10:17 +00:00
onStorageInconsistency();
2020-11-04 18:54:55 +00:00
2022-05-09 08:32:25 +00:00
response.stat = node_it->value.stat;
2022-05-06 12:25:25 +00:00
response.error = Coordination::Error::ZOK;
2020-11-04 18:54:55 +00:00
2022-05-06 12:25:25 +00:00
return response_ptr;
2020-11-04 18:54:55 +00:00
}
2020-11-10 13:43:10 +00:00
2022-05-06 12:25:25 +00:00
KeeperStorage::ResponsesForSessions
processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override
2020-11-10 13:43:10 +00:00
{
2021-01-19 14:22:28 +00:00
return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CHANGED);
2020-11-10 13:43:10 +00:00
}
2020-11-04 18:54:55 +00:00
};
struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProcessor
2020-11-04 18:54:55 +00:00
{
2022-05-09 09:16:05 +00:00
bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override
2021-05-21 21:19:22 +00:00
{
return storage.checkACL(zk_request->getPath(), Coordination::ACL::Read, session_id, is_local);
2021-05-21 21:19:22 +00:00
}
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
2022-05-06 12:25:25 +00:00
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/) const override
{
Coordination::ZooKeeperListRequest & request = dynamic_cast<Coordination::ZooKeeperListRequest &>(*zk_request);
2022-05-11 09:08:39 +00:00
if (!storage.uncommitted_state.hasNode(request.path))
2022-05-09 07:02:11 +00:00
return {{zxid, Coordination::Error::ZNONODE}};
2022-05-06 12:25:25 +00:00
return {};
}
2022-05-09 08:32:25 +00:00
template <bool local>
Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const
2020-11-04 18:54:55 +00:00
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperListResponse & response = dynamic_cast<Coordination::ZooKeeperListResponse &>(*response_ptr);
Coordination::ZooKeeperListRequest & request = dynamic_cast<Coordination::ZooKeeperListRequest &>(*zk_request);
2022-01-22 19:36:23 +00:00
2022-05-10 07:00:38 +00:00
if constexpr (!local)
2020-11-04 18:54:55 +00:00
{
2022-05-09 08:32:25 +00:00
if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK)
{
response.error = result;
return response_ptr;
}
}
const auto on_error = [&]([[maybe_unused]] const auto error_code)
{
if constexpr (local)
response.error = error_code;
else
2022-05-11 12:10:17 +00:00
onStorageInconsistency();
2022-05-09 08:32:25 +00:00
};
auto & container = storage.container;
auto node_it = container.find(request.path);
if (node_it == container.end())
{
on_error(Coordination::Error::ZNONODE);
2020-11-04 18:54:55 +00:00
}
else
{
auto path_prefix = request.path;
if (path_prefix.empty())
2020-11-25 13:19:09 +00:00
throw DB::Exception("Logical error: path cannot be empty", ErrorCodes::LOGICAL_ERROR);
2020-11-04 18:54:55 +00:00
2022-05-09 08:32:25 +00:00
const auto & children = node_it->value.getChildren();
2022-04-05 06:27:03 +00:00
response.names.reserve(children.size());
2022-01-19 11:46:29 +00:00
2022-04-05 06:27:03 +00:00
for (const auto child : children)
2022-01-19 11:46:29 +00:00
response.names.push_back(child.toString());
2020-11-04 18:54:55 +00:00
2022-05-09 08:32:25 +00:00
response.stat = node_it->value.stat;
2020-11-04 18:54:55 +00:00
response.error = Coordination::Error::ZOK;
}
2022-05-06 12:25:25 +00:00
return response_ptr;
2020-11-04 18:54:55 +00:00
}
2022-05-09 08:32:25 +00:00
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<false>(storage, zxid, session_id, time);
}
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<true>(storage, zxid, session_id, time);
}
2020-11-04 18:54:55 +00:00
};
struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestProcessor
2020-11-04 18:54:55 +00:00
{
2022-05-09 09:16:05 +00:00
bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override
2021-05-21 21:19:22 +00:00
{
return storage.checkACL(zk_request->getPath(), Coordination::ACL::Read, session_id, is_local);
2021-05-21 21:19:22 +00:00
}
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
2022-05-06 12:25:25 +00:00
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/) const override
{
Coordination::ZooKeeperCheckRequest & request = dynamic_cast<Coordination::ZooKeeperCheckRequest &>(*zk_request);
2022-05-11 09:08:39 +00:00
if (!storage.uncommitted_state.hasNode(request.path))
2022-05-09 07:02:11 +00:00
return {{zxid, Coordination::Error::ZNONODE}};
2022-05-06 12:25:25 +00:00
2022-05-11 09:08:39 +00:00
auto node = storage.uncommitted_state.getNode(request.path);
2022-05-06 12:25:25 +00:00
if (request.version != -1 && request.version != node->stat.version)
2022-05-09 07:02:11 +00:00
return {{zxid, Coordination::Error::ZBADVERSION}};
2022-05-06 12:25:25 +00:00
return {};
}
2022-05-09 08:32:25 +00:00
template <bool local>
Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const
2020-11-04 18:54:55 +00:00
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperCheckResponse & response = dynamic_cast<Coordination::ZooKeeperCheckResponse &>(*response_ptr);
Coordination::ZooKeeperCheckRequest & request = dynamic_cast<Coordination::ZooKeeperCheckRequest &>(*zk_request);
2022-05-09 08:32:25 +00:00
2022-05-10 07:00:38 +00:00
if constexpr (!local)
2022-05-09 08:32:25 +00:00
{
if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK)
{
response.error = result;
return response_ptr;
}
}
const auto on_error = [&]([[maybe_unused]] const auto error_code)
{
if constexpr (local)
response.error = error_code;
else
2022-05-11 12:10:17 +00:00
onStorageInconsistency();
2022-05-09 08:32:25 +00:00
};
auto & container = storage.container;
auto node_it = container.find(request.path);
if (node_it == container.end())
2020-11-04 18:54:55 +00:00
{
2022-05-09 08:32:25 +00:00
on_error(Coordination::Error::ZNONODE);
2020-11-04 18:54:55 +00:00
}
2022-05-09 08:32:25 +00:00
else if (request.version != -1 && request.version != node_it->value.stat.version)
2020-11-04 18:54:55 +00:00
{
2022-05-09 08:32:25 +00:00
on_error(Coordination::Error::ZBADVERSION);
2020-11-04 18:54:55 +00:00
}
else
{
response.error = Coordination::Error::ZOK;
}
2022-05-06 12:25:25 +00:00
return response_ptr;
2020-11-04 18:54:55 +00:00
}
2022-05-09 08:32:25 +00:00
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<false>(storage, zxid, session_id, time);
}
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<true>(storage, zxid, session_id, time);
}
2020-11-04 18:54:55 +00:00
};
2021-06-22 10:49:35 +00:00
struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestProcessor
2021-06-22 10:49:35 +00:00
{
2022-05-09 09:16:05 +00:00
bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override
2021-06-22 10:49:35 +00:00
{
return storage.checkACL(zk_request->getPath(), Coordination::ACL::Admin, session_id, is_local);
2021-06-22 10:49:35 +00:00
}
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
2021-06-22 10:49:35 +00:00
2022-05-06 12:25:25 +00:00
std::vector<KeeperStorage::Delta> preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /*time*/) const override
2021-06-22 10:49:35 +00:00
{
2022-05-06 12:25:25 +00:00
Coordination::ZooKeeperSetACLRequest & request = dynamic_cast<Coordination::ZooKeeperSetACLRequest &>(*zk_request);
2022-05-11 09:08:39 +00:00
auto & uncommitted_state = storage.uncommitted_state;
if (!uncommitted_state.hasNode(request.path))
2022-05-09 07:02:11 +00:00
return {{zxid, Coordination::Error::ZNONODE}};
2022-05-06 12:25:25 +00:00
2022-05-11 09:08:39 +00:00
auto node = uncommitted_state.getNode(request.path);
2022-05-06 12:25:25 +00:00
if (request.version != -1 && request.version != node->stat.aversion)
2022-05-09 07:02:11 +00:00
return {{zxid, Coordination::Error::ZBADVERSION}};
2022-05-06 12:25:25 +00:00
auto & session_auth_ids = storage.session_and_auth[session_id];
Coordination::ACLs node_acls;
if (!fixupACL(request.acls, session_auth_ids, node_acls))
2022-05-09 07:02:11 +00:00
return {{zxid, Coordination::Error::ZINVALIDACL}};
2022-05-06 12:25:25 +00:00
return {
{request.path, zxid, KeeperStorage::SetACLDelta{std::move(node_acls), request.version}},
{request.path, zxid, KeeperStorage::UpdateNodeDelta{[](KeeperStorage::Node & n) { ++n.stat.aversion; }}}};
}
2021-06-22 10:49:35 +00:00
2022-05-06 12:25:25 +00:00
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const override
{
2021-06-22 10:49:35 +00:00
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperSetACLResponse & response = dynamic_cast<Coordination::ZooKeeperSetACLResponse &>(*response_ptr);
Coordination::ZooKeeperSetACLRequest & request = dynamic_cast<Coordination::ZooKeeperSetACLRequest &>(*zk_request);
2022-05-06 12:25:25 +00:00
2022-05-09 08:32:25 +00:00
if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK)
2021-06-22 10:49:35 +00:00
{
2022-05-06 12:25:25 +00:00
response.error = result;
2022-05-09 07:02:11 +00:00
return response_ptr;
2021-06-22 10:49:35 +00:00
}
2022-05-09 08:32:25 +00:00
auto node_it = storage.container.find(request.path);
if (node_it == storage.container.end())
2022-05-11 12:10:17 +00:00
onStorageInconsistency();
2022-05-09 08:32:25 +00:00
response.stat = node_it->value.stat;
2022-05-06 12:25:25 +00:00
response.error = Coordination::Error::ZOK;
2021-06-22 10:49:35 +00:00
2022-05-06 12:25:25 +00:00
return response_ptr;
2021-06-22 10:49:35 +00:00
}
};
struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestProcessor
2021-06-22 10:49:35 +00:00
{
2022-05-09 09:16:05 +00:00
bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override
2021-06-22 10:49:35 +00:00
{
return storage.checkACL(zk_request->getPath(), Coordination::ACL::Admin | Coordination::ACL::Read, session_id, is_local);
2021-06-22 10:49:35 +00:00
}
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
2021-06-22 10:49:35 +00:00
2022-05-06 12:25:25 +00:00
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/) const override
{
Coordination::ZooKeeperGetACLRequest & request = dynamic_cast<Coordination::ZooKeeperGetACLRequest &>(*zk_request);
2022-05-11 09:08:39 +00:00
if (!storage.uncommitted_state.hasNode(request.path))
2022-05-09 07:02:11 +00:00
return {{zxid, Coordination::Error::ZNONODE}};
2022-05-06 12:25:25 +00:00
return {};
}
2022-05-09 08:32:25 +00:00
template <bool local>
Coordination::ZooKeeperResponsePtr processImpl(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const
2021-06-22 10:49:35 +00:00
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperGetACLResponse & response = dynamic_cast<Coordination::ZooKeeperGetACLResponse &>(*response_ptr);
Coordination::ZooKeeperGetACLRequest & request = dynamic_cast<Coordination::ZooKeeperGetACLRequest &>(*zk_request);
2022-05-09 08:32:25 +00:00
2022-05-10 07:00:38 +00:00
if constexpr (!local)
2022-05-09 08:32:25 +00:00
{
if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK)
{
response.error = result;
return response_ptr;
}
}
const auto on_error = [&]([[maybe_unused]] const auto error_code)
{
if constexpr (local)
response.error = error_code;
else
2022-05-11 12:10:17 +00:00
onStorageInconsistency();
2022-05-09 08:32:25 +00:00
};
2021-06-22 10:49:35 +00:00
auto & container = storage.container;
2022-05-09 08:32:25 +00:00
auto node_it = container.find(request.path);
if (node_it == container.end())
2021-06-22 10:49:35 +00:00
{
2022-05-09 08:32:25 +00:00
on_error(Coordination::Error::ZNONODE);
2021-06-22 10:49:35 +00:00
}
else
{
2022-05-09 08:32:25 +00:00
response.stat = node_it->value.stat;
response.acl = storage.acl_map.convertNumber(node_it->value.acl_id);
2021-06-22 10:49:35 +00:00
}
2022-05-06 12:25:25 +00:00
return response_ptr;
2021-06-22 10:49:35 +00:00
}
2022-05-09 08:32:25 +00:00
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<false>(storage, zxid, session_id, time);
}
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
return processImpl<true>(storage, zxid, session_id, time);
}
2021-06-22 10:49:35 +00:00
};
struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestProcessor
2020-11-04 18:54:55 +00:00
{
2022-05-09 09:16:05 +00:00
bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override
2021-05-21 21:19:22 +00:00
{
for (const auto & concrete_request : concrete_requests)
2022-05-09 09:16:05 +00:00
if (!concrete_request->checkAuth(storage, session_id, is_local))
2021-05-21 21:19:22 +00:00
return false;
return true;
}
std::vector<KeeperStorageRequestProcessorPtr> concrete_requests;
explicit KeeperStorageMultiRequestProcessor(const Coordination::ZooKeeperRequestPtr & zk_request_)
: KeeperStorageRequestProcessor(zk_request_)
2020-11-04 18:54:55 +00:00
{
Coordination::ZooKeeperMultiRequest & request = dynamic_cast<Coordination::ZooKeeperMultiRequest &>(*zk_request);
concrete_requests.reserve(request.requests.size());
2020-11-25 13:19:09 +00:00
for (const auto & sub_request : request.requests)
2020-11-04 18:54:55 +00:00
{
2020-12-10 21:33:13 +00:00
auto sub_zk_request = std::dynamic_pointer_cast<Coordination::ZooKeeperRequest>(sub_request);
2022-04-05 06:27:03 +00:00
switch (sub_zk_request->getOpNum())
2020-11-04 18:54:55 +00:00
{
2022-04-05 06:27:03 +00:00
case Coordination::OpNum::Create:
concrete_requests.push_back(std::make_shared<KeeperStorageCreateRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::Remove:
concrete_requests.push_back(std::make_shared<KeeperStorageRemoveRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::Set:
concrete_requests.push_back(std::make_shared<KeeperStorageSetRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::Check:
concrete_requests.push_back(std::make_shared<KeeperStorageCheckRequestProcessor>(sub_zk_request));
break;
default:
2022-05-06 12:25:25 +00:00
throw DB::Exception(
ErrorCodes::BAD_ARGUMENTS, "Illegal command as part of multi ZooKeeper request {}", sub_zk_request->getOpNum());
2020-11-04 18:54:55 +00:00
}
}
}
2022-05-09 07:02:11 +00:00
std::vector<KeeperStorage::Delta> preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
2022-05-06 12:25:25 +00:00
{
2022-05-09 08:32:25 +00:00
// manually add deltas so that the result of previous request in the transaction is used in the next request
2022-05-11 09:08:39 +00:00
auto & saved_deltas = storage.uncommitted_state.deltas;
2022-05-09 07:02:11 +00:00
std::vector<Coordination::Error> response_errors;
response_errors.reserve(concrete_requests.size());
for (size_t i = 0; i < concrete_requests.size(); ++i)
{
auto new_deltas = concrete_requests[i]->preprocess(storage, zxid, session_id, time);
2022-05-09 11:53:19 +00:00
if (!new_deltas.empty())
2022-05-09 07:02:11 +00:00
{
2022-05-09 11:53:19 +00:00
if (auto * error = std::get_if<KeeperStorage::ErrorDelta>(&new_deltas.back().operation))
{
std::erase_if(saved_deltas, [zxid](const auto & delta) { return delta.zxid == zxid; });
2022-05-09 07:02:11 +00:00
2022-05-09 11:53:19 +00:00
response_errors.push_back(error->error);
2022-05-09 07:02:11 +00:00
2022-05-09 11:53:19 +00:00
for (size_t j = i + 1; j < concrete_requests.size(); ++j)
{
response_errors.push_back(Coordination::Error::ZRUNTIMEINCONSISTENCY);
}
2022-05-09 07:02:11 +00:00
2022-05-09 11:53:19 +00:00
return {{zxid, KeeperStorage::FailedMultiDelta{std::move(response_errors)}}};
}
2022-05-09 07:02:11 +00:00
}
new_deltas.emplace_back(zxid, KeeperStorage::SubDeltaEnd{});
response_errors.push_back(Coordination::Error::ZOK);
saved_deltas.insert(saved_deltas.end(), std::make_move_iterator(new_deltas.begin()), std::make_move_iterator(new_deltas.end()));
}
return {};
2022-05-06 12:25:25 +00:00
}
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
2020-11-04 18:54:55 +00:00
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperMultiResponse & response = dynamic_cast<Coordination::ZooKeeperMultiResponse &>(*response_ptr);
2022-05-11 09:08:39 +00:00
auto & deltas = storage.uncommitted_state.deltas;
2022-05-09 07:02:11 +00:00
if (auto * failed_multi = std::get_if<KeeperStorage::FailedMultiDelta>(&deltas.front().operation))
2020-11-04 18:54:55 +00:00
{
2022-05-09 07:02:11 +00:00
for (size_t i = 0; i < concrete_requests.size(); ++i)
2020-11-04 18:54:55 +00:00
{
2022-05-09 07:02:11 +00:00
response.responses[i] = std::make_shared<Coordination::ZooKeeperErrorResponse>();
response.responses[i]->error = failed_multi->error_codes[i];
}
2020-12-08 13:28:39 +00:00
2022-05-09 07:02:11 +00:00
return response_ptr;
}
2020-12-08 13:28:39 +00:00
2022-05-09 08:32:25 +00:00
for (size_t i = 0; i < concrete_requests.size(); ++i)
2022-05-09 07:02:11 +00:00
{
2022-05-09 08:32:25 +00:00
auto cur_response = concrete_requests[i]->process(storage, zxid, session_id, time);
2020-11-04 18:54:55 +00:00
2022-05-09 07:02:11 +00:00
while (!deltas.empty())
{
if (std::holds_alternative<KeeperStorage::SubDeltaEnd>(deltas.front().operation))
{
deltas.pop_front();
break;
2020-11-04 18:54:55 +00:00
}
2020-12-08 13:28:39 +00:00
2022-05-09 07:02:11 +00:00
deltas.pop_front();
2020-11-04 18:54:55 +00:00
}
2022-05-09 07:02:11 +00:00
response.responses[i] = cur_response;
2022-05-09 08:32:25 +00:00
}
response.error = Coordination::Error::ZOK;
return response_ptr;
}
Coordination::ZooKeeperResponsePtr processLocal(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperMultiResponse & response = dynamic_cast<Coordination::ZooKeeperMultiResponse &>(*response_ptr);
for (size_t i = 0; i < concrete_requests.size(); ++i)
{
auto cur_response = concrete_requests[i]->process(storage, zxid, session_id, time);
response.responses[i] = cur_response;
if (cur_response->error != Coordination::Error::ZOK)
{
for (size_t j = 0; j <= i; ++j)
{
auto response_error = response.responses[j]->error;
response.responses[j] = std::make_shared<Coordination::ZooKeeperErrorResponse>();
response.responses[j]->error = response_error;
}
for (size_t j = i + 1; j < response.responses.size(); ++j)
{
response.responses[j] = std::make_shared<Coordination::ZooKeeperErrorResponse>();
response.responses[j]->error = Coordination::Error::ZRUNTIMEINCONSISTENCY;
}
2022-05-09 07:02:11 +00:00
2022-05-09 08:32:25 +00:00
return response_ptr;
}
2020-11-04 18:54:55 +00:00
}
2022-05-09 07:02:11 +00:00
response.error = Coordination::Error::ZOK;
return response_ptr;
2020-11-04 18:54:55 +00:00
}
2020-11-10 13:43:10 +00:00
2022-05-06 12:25:25 +00:00
KeeperStorage::ResponsesForSessions
processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override
2020-11-10 13:43:10 +00:00
{
2021-03-29 08:24:56 +00:00
KeeperStorage::ResponsesForSessions result;
2020-11-10 13:43:10 +00:00
for (const auto & generic_request : concrete_requests)
2021-01-19 14:22:28 +00:00
{
auto responses = generic_request->processWatches(watches, list_watches);
result.insert(result.end(), responses.begin(), responses.end());
}
return result;
2020-11-10 13:43:10 +00:00
}
2020-11-04 18:54:55 +00:00
};
struct KeeperStorageCloseRequestProcessor final : public KeeperStorageRequestProcessor
2020-11-19 16:06:19 +00:00
{
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
2022-05-06 12:25:25 +00:00
Coordination::ZooKeeperResponsePtr process(KeeperStorage &, int64_t, int64_t, int64_t /* time */) const override
2020-11-19 16:06:19 +00:00
{
2020-11-25 13:19:09 +00:00
throw DB::Exception("Called process on close request", ErrorCodes::LOGICAL_ERROR);
2020-11-19 16:06:19 +00:00
}
};
struct KeeperStorageAuthRequestProcessor final : public KeeperStorageRequestProcessor
2021-05-15 15:01:00 +00:00
{
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::vector<KeeperStorage::Delta> preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /*time*/) const override
2021-05-15 15:01:00 +00:00
{
2021-05-21 21:19:22 +00:00
Coordination::ZooKeeperAuthRequest & auth_request = dynamic_cast<Coordination::ZooKeeperAuthRequest &>(*zk_request);
2021-05-15 15:01:00 +00:00
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
2021-05-21 21:19:22 +00:00
2021-05-24 12:18:04 +00:00
if (auth_request.scheme != "digest" || std::count(auth_request.data.begin(), auth_request.data.end(), ':') != 1)
return {{zxid, Coordination::Error::ZAUTHFAILED}};
std::vector<KeeperStorage::Delta> new_deltas;
auto digest = generateDigest(auth_request.data);
if (digest == storage.superdigest)
2021-05-24 12:18:04 +00:00
{
KeeperStorage::AuthID auth{"super", ""};
new_deltas.emplace_back(zxid, KeeperStorage::AddAuthDelta{session_id, std::move(auth)});
2021-05-24 12:18:04 +00:00
}
else
2021-05-22 16:07:47 +00:00
{
KeeperStorage::AuthID new_auth{auth_request.scheme, digest};
2022-05-11 09:08:39 +00:00
if (!storage.uncommitted_state.hasACL(session_id, false, [&](const auto & auth_id) { return new_auth == auth_id; }))
new_deltas.emplace_back(zxid, KeeperStorage::AddAuthDelta{session_id, std::move(new_auth)});
2021-05-21 21:19:22 +00:00
}
return new_deltas;
}
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperAuthResponse & auth_response = dynamic_cast<Coordination::ZooKeeperAuthResponse &>(*response_ptr);
if (const auto result = storage.commit(zxid, session_id); result != Coordination::Error::ZOK)
auth_response.error = result;
2022-05-09 07:02:11 +00:00
return response_ptr;
2021-05-15 15:01:00 +00:00
}
};
2021-03-29 08:24:56 +00:00
void KeeperStorage::finalize()
2020-11-03 14:49:30 +00:00
{
2021-01-19 14:22:28 +00:00
if (finalized)
throw DB::Exception("Testkeeper storage already finalized", ErrorCodes::LOGICAL_ERROR);
2020-11-10 13:43:10 +00:00
2021-01-19 14:22:28 +00:00
finalized = true;
2020-11-03 14:49:30 +00:00
2021-02-04 13:22:30 +00:00
for (const auto & [session_id, ephemerals_paths] : ephemerals)
for (const String & ephemeral_path : ephemerals_paths)
container.erase(ephemeral_path);
ephemerals.clear();
2020-11-18 20:36:25 +00:00
2021-01-19 14:22:28 +00:00
watches.clear();
list_watches.clear();
sessions_and_watchers.clear();
session_expiry_queue.clear();
2020-11-04 18:54:55 +00:00
}
class KeeperStorageRequestProcessorsFactory final : private boost::noncopyable
2020-11-04 18:54:55 +00:00
{
public:
using Creator = std::function<KeeperStorageRequestProcessorPtr(const Coordination::ZooKeeperRequestPtr &)>;
2020-11-11 13:07:06 +00:00
using OpNumToRequest = std::unordered_map<Coordination::OpNum, Creator>;
2020-11-04 18:54:55 +00:00
static KeeperStorageRequestProcessorsFactory & instance()
2020-11-04 18:54:55 +00:00
{
static KeeperStorageRequestProcessorsFactory factory;
2020-11-04 18:54:55 +00:00
return factory;
}
KeeperStorageRequestProcessorPtr get(const Coordination::ZooKeeperRequestPtr & zk_request) const
2020-11-04 18:54:55 +00:00
{
2022-05-09 09:45:03 +00:00
auto request_it = op_num_to_request.find(zk_request->getOpNum());
if (request_it == op_num_to_request.end())
2020-11-25 13:19:09 +00:00
throw DB::Exception("Unknown operation type " + toString(zk_request->getOpNum()), ErrorCodes::LOGICAL_ERROR);
2020-11-04 18:54:55 +00:00
2022-05-09 09:45:03 +00:00
return request_it->second(zk_request);
2020-11-04 18:54:55 +00:00
}
2020-11-03 14:49:30 +00:00
2020-11-11 13:07:06 +00:00
void registerRequest(Coordination::OpNum op_num, Creator creator)
2020-11-04 18:54:55 +00:00
{
if (!op_num_to_request.try_emplace(op_num, creator).second)
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Request with op num {} already registered", op_num);
}
private:
OpNumToRequest op_num_to_request;
KeeperStorageRequestProcessorsFactory();
2020-11-04 18:54:55 +00:00
};
2022-05-06 12:25:25 +00:00
/// Registers, under op number `num`, a creator that builds a RequestT processor from the request.
template <Coordination::OpNum num, typename RequestT>
void registerKeeperRequestProcessor(KeeperStorageRequestProcessorsFactory & factory)
{
    auto make_processor = [](const Coordination::ZooKeeperRequestPtr & zk_request)
    {
        return std::make_shared<RequestT>(zk_request);
    };

    factory.registerRequest(num, make_processor);
}
/// Fills the factory with one processor creator per supported operation number.
KeeperStorageRequestProcessorsFactory::KeeperStorageRequestProcessorsFactory()
{
    using Coordination::OpNum;

    registerKeeperRequestProcessor<OpNum::Heartbeat, KeeperStorageHeartbeatRequestProcessor>(*this);
    registerKeeperRequestProcessor<OpNum::Sync, KeeperStorageSyncRequestProcessor>(*this);
    registerKeeperRequestProcessor<OpNum::Auth, KeeperStorageAuthRequestProcessor>(*this);
    registerKeeperRequestProcessor<OpNum::Close, KeeperStorageCloseRequestProcessor>(*this);
    registerKeeperRequestProcessor<OpNum::Create, KeeperStorageCreateRequestProcessor>(*this);
    registerKeeperRequestProcessor<OpNum::Remove, KeeperStorageRemoveRequestProcessor>(*this);
    registerKeeperRequestProcessor<OpNum::Exists, KeeperStorageExistsRequestProcessor>(*this);
    registerKeeperRequestProcessor<OpNum::Get, KeeperStorageGetRequestProcessor>(*this);
    registerKeeperRequestProcessor<OpNum::Set, KeeperStorageSetRequestProcessor>(*this);

    /// Both list flavours are served by the same processor type.
    registerKeeperRequestProcessor<OpNum::List, KeeperStorageListRequestProcessor>(*this);
    registerKeeperRequestProcessor<OpNum::SimpleList, KeeperStorageListRequestProcessor>(*this);

    registerKeeperRequestProcessor<OpNum::Check, KeeperStorageCheckRequestProcessor>(*this);
    registerKeeperRequestProcessor<OpNum::Multi, KeeperStorageMultiRequestProcessor>(*this);
    registerKeeperRequestProcessor<OpNum::SetACL, KeeperStorageSetACLRequestProcessor>(*this);
    registerKeeperRequestProcessor<OpNum::GetACL, KeeperStorageGetACLRequestProcessor>(*this);
}
2022-05-16 12:12:29 +00:00
/// Computes the nodes digest obtained by applying, on top of `current_digest`, the
/// uncommitted deltas whose zxid equals `current_zxid`.
///
/// For every path touched by those deltas, the digest of the node fetched from
/// `uncommitted_state` is subtracted the first time the path is removed or updated,
/// and the digest of the node's final state is added at the end, unless the final
/// state is "removed".
UInt64 KeeperStorage::calculateNodesDigest(UInt64 current_digest, int64_t current_zxid) const
{
    /// Final state of every node touched by this transaction; nullptr marks a removed node.
    std::unordered_map<std::string, std::shared_ptr<Node>> updated_nodes;

    for (const auto & delta : uncommitted_state.deltas)
    {
        if (delta.zxid != current_zxid)
            continue;

        std::visit(
            Overloaded{
                [&](const CreateNodeDelta & create_delta)
                {
                    auto node = std::make_shared<Node>();
                    node->stat = create_delta.stat;
                    node->setData(create_delta.data);

                    /// The path can already be present as a nullptr tombstone when the same
                    /// transaction removed it earlier (Remove followed by Create of one path).
                    /// `emplace` would keep the tombstone and drop the new node from the digest,
                    /// so the entry has to be overwritten.
                    updated_nodes.insert_or_assign(delta.path, std::move(node));
                },
                [&](const RemoveNodeDelta & /* remove_delta */)
                {
                    /// Only a node that was not yet touched in this transaction contributes
                    /// its previous digest to `current_digest`.
                    if (!updated_nodes.contains(delta.path))
                    {
                        auto old_digest = uncommitted_state.getNode(delta.path, current_zxid)->getDigest(delta.path);
                        current_digest -= old_digest;
                    }

                    updated_nodes.insert_or_assign(delta.path, nullptr);
                },
                [&](const UpdateNodeDelta & update_delta)
                {
                    std::shared_ptr<Node> node{nullptr};

                    auto updated_node_it = updated_nodes.find(delta.path);
                    if (updated_node_it == updated_nodes.end())
                    {
                        /// First time this path is touched: take the node from the uncommitted
                        /// state and remove its digest before it gets modified.
                        /// NOTE(review): presumably getNode returns the state as of before
                        /// `current_zxid` - confirm against UncommittedState::getNode.
                        node = uncommitted_state.getNode(delta.path, current_zxid);
                        current_digest -= node->getDigest(delta.path);
                        updated_nodes.emplace(delta.path, node);
                    }
                    else
                        node = updated_node_it->second;

                    update_delta.update_fn(*node);
                },
                [](auto && /* delta */) {}},
            delta.operation);
    }

    /// Add the digest of the final state of every node that still exists.
    for (const auto & [path, updated_node] : updated_nodes)
    {
        if (updated_node)
        {
            updated_node->invalidateDigestCache();
            current_digest += updated_node->getDigest(path);
        }
    }

    return current_digest;
}
2022-05-09 07:02:11 +00:00
void KeeperStorage::preprocessRequest(
2022-05-13 13:43:42 +00:00
const Coordination::ZooKeeperRequestPtr & zk_request,
int64_t session_id,
int64_t time,
int64_t new_last_zxid,
2022-05-16 12:12:29 +00:00
Digest expected_digest,
2022-05-13 13:43:42 +00:00
bool check_acl)
2022-05-05 10:32:41 +00:00
{
2022-05-13 13:43:42 +00:00
int64_t last_zxid = getNextZXID() - 1;
2022-05-12 08:58:36 +00:00
2022-05-16 12:12:29 +00:00
if (uncommitted_transactions.empty())
2022-05-13 13:43:42 +00:00
{
2022-05-16 12:12:29 +00:00
if (new_last_zxid <= last_zxid)
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Got new ZXID {} smaller or equal to current ZXID ({}). It's a bug", new_last_zxid, last_zxid);
}
else
{
// if we are leader node, the request potentially already got processed
auto txn_it = std::lower_bound(
uncommitted_transactions.begin(),
uncommitted_transactions.end(),
new_last_zxid,
[&](const auto & request, const auto value) { return request.zxid < value; });
// this zxid is not found in the uncommitted_transactions so do the regular check
if (txn_it == uncommitted_transactions.end())
{
if (new_last_zxid <= last_zxid)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Got new ZXID {} smaller or equal to current ZXID ({}). It's a bug",
new_last_zxid,
last_zxid);
}
else
{
if (txn_it->zxid == new_last_zxid && checkDigest(txn_it->nodes_digest, expected_digest))
// we found the preprocessed request with the same ZXID, we can skip it
return;
2022-05-13 13:43:42 +00:00
2022-05-16 12:12:29 +00:00
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Found invalid state of uncommitted transactions, missing request with ZXID {}", new_last_zxid);
}
2022-05-13 13:43:42 +00:00
}
2022-05-12 08:58:36 +00:00
2022-05-13 13:43:42 +00:00
TransactionInfo transaction{.zxid = new_last_zxid};
SCOPE_EXIT({
2022-05-16 12:12:29 +00:00
if (expected_digest.version == DigestVersion::NO_DIGEST)
transaction.nodes_digest = Digest{CURRENT_DIGEST_VERSION, calculateNodesDigest(getNodesDigest(false).value, transaction.zxid)};
else
transaction.nodes_digest = expected_digest;
2022-05-13 13:43:42 +00:00
uncommitted_transactions.emplace_back(transaction);
});
2022-05-12 08:58:36 +00:00
2022-05-09 09:16:05 +00:00
KeeperStorageRequestProcessorPtr request_processor = KeeperStorageRequestProcessorsFactory::instance().get(zk_request);
2022-05-09 09:35:16 +00:00
if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special
{
2022-05-11 09:08:39 +00:00
auto & deltas = uncommitted_state.deltas;
2022-05-09 09:35:16 +00:00
auto session_ephemerals = ephemerals.find(session_id);
if (session_ephemerals != ephemerals.end())
{
for (const auto & ephemeral_path : session_ephemerals->second)
{
2022-05-11 09:08:39 +00:00
if (uncommitted_state.hasNode(ephemeral_path))
2022-05-09 09:35:16 +00:00
{
2022-05-09 09:42:23 +00:00
deltas.emplace_back(
parentPath(ephemeral_path).toString(),
new_last_zxid,
UpdateNodeDelta{[ephemeral_path](Node & parent)
{
--parent.stat.numChildren;
++parent.stat.cversion;
}});
2022-05-09 09:35:16 +00:00
deltas.emplace_back(ephemeral_path, new_last_zxid, RemoveNodeDelta());
}
}
}
return;
}
2022-05-09 09:16:05 +00:00
if (check_acl && !request_processor->checkAuth(*this, session_id, false))
{
2022-05-11 09:08:39 +00:00
uncommitted_state.deltas.emplace_back(new_last_zxid, Coordination::Error::ZNOAUTH);
2022-05-09 09:16:05 +00:00
return;
}
2022-05-13 13:43:42 +00:00
auto new_deltas = request_processor->preprocess(*this, transaction.zxid, session_id, time);
2022-05-11 09:08:39 +00:00
uncommitted_state.deltas.insert(
uncommitted_state.deltas.end(), std::make_move_iterator(new_deltas.begin()), std::make_move_iterator(new_deltas.end()));
2022-05-05 10:32:41 +00:00
}
2022-05-06 12:25:25 +00:00
/// Executes `zk_request` for `session_id` and returns the produced responses: the responses
/// generated by triggered watches followed by the response to the request itself.
///
/// When `new_last_zxid` is given it must be the ZXID at the front of `uncommitted_transactions`
/// (LOGICAL_ERROR otherwise); that transaction is popped and `zxid` is advanced to it.
/// `is_local` selects `processLocal` (optionally guarded by `checkAuth` when `check_acl` is set)
/// instead of `process` for regular requests.
KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
    const Coordination::ZooKeeperRequestPtr & zk_request,
    int64_t session_id,
    int64_t time,
    std::optional<int64_t> new_last_zxid,
    bool check_acl,
    bool is_local)
{
    if (new_last_zxid)
    {
        if (uncommitted_transactions.empty())
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to commit a ZXID ({}) which was not preprocessed", *new_last_zxid);

        if (uncommitted_transactions.front().zxid != *new_last_zxid)
            throw Exception(
                ErrorCodes::LOGICAL_ERROR,
                "Trying to commit a ZXID {} while the next ZXID to commit is {}",
                *new_last_zxid,
                uncommitted_transactions.front().zxid);

        zxid = *new_last_zxid;
        uncommitted_transactions.pop_front();
    }

    KeeperStorage::ResponsesForSessions results;

    /// ZooKeeper updates session expiry for each request, not only for heartbeats
    session_expiry_queue.addNewSessionOrUpdate(session_id, session_and_timeout[session_id]);

    const auto op_num = zk_request->getOpNum();

    /// Close request is special
    if (op_num == Coordination::OpNum::Close)
    {
        commit(zxid, session_id);

        /// Fire DELETED watches for every node removed by the deltas up to the current zxid
        for (const auto & delta : uncommitted_state.deltas)
        {
            if (delta.zxid > zxid)
                break;

            if (!std::holds_alternative<RemoveNodeDelta>(delta.operation))
                continue;

            auto responses = processWatchesImpl(delta.path, watches, list_watches, Coordination::Event::DELETED);
            results.insert(results.end(), responses.begin(), responses.end());
        }

        std::erase_if(uncommitted_state.deltas, [this](const auto & delta) { return delta.zxid == zxid; });

        clearDeadWatches(session_id);
        if (auto auth_it = session_and_auth.find(session_id); auth_it != session_and_auth.end())
            session_and_auth.erase(auth_it);

        /// Finish connection
        auto close_response = std::make_shared<Coordination::ZooKeeperCloseResponse>();
        close_response->xid = zk_request->xid;
        close_response->zxid = getZXID();

        session_expiry_queue.remove(session_id);
        session_and_timeout.erase(session_id);
        results.push_back(ResponseForSession{session_id, close_response});
        return results;
    }

    /// Heartbeat request is also special
    if (op_num == Coordination::OpNum::Heartbeat)
    {
        KeeperStorageRequestProcessorPtr heartbeat_processor = KeeperStorageRequestProcessorsFactory::instance().get(zk_request);
        auto heartbeat_response = heartbeat_processor->process(*this, zxid, session_id, time);
        heartbeat_response->xid = zk_request->xid;
        heartbeat_response->zxid = getZXID();

        results.push_back(ResponseForSession{session_id, heartbeat_response});
        return results;
    }

    /// Normal request processing
    KeeperStorageRequestProcessorPtr request_processor = KeeperStorageRequestProcessorsFactory::instance().get(zk_request);
    Coordination::ZooKeeperResponsePtr response;

    if (!is_local)
    {
        response = request_processor->process(*this, zxid, session_id, time);
        std::erase_if(uncommitted_state.deltas, [this](const auto & delta) { return delta.zxid == zxid; });
    }
    else if (check_acl && !request_processor->checkAuth(*this, session_id, true))
    {
        response = zk_request->makeResponse();
        /// Original ZooKeeper always throws no auth, even when user provided some credentials
        response->error = Coordination::Error::ZNOAUTH;
    }
    else
    {
        response = request_processor->processLocal(*this, zxid, session_id, time);
    }

    /// Watches requested by this request are added to the watch lists
    if (zk_request->has_watch)
    {
        if (response->error == Coordination::Error::ZOK)
        {
            const bool is_list_request = op_num == Coordination::OpNum::List || op_num == Coordination::OpNum::SimpleList;
            auto & watches_type = is_list_request ? list_watches : watches;

            watches_type[zk_request->getPath()].emplace_back(session_id);
            sessions_and_watchers[session_id].emplace(zk_request->getPath());
        }
        else if (response->error == Coordination::Error::ZNONODE && op_num == Coordination::OpNum::Exists)
        {
            /// An Exists request on a missing node still registers a regular watch
            watches[zk_request->getPath()].emplace_back(session_id);
            sessions_and_watchers[session_id].emplace(zk_request->getPath());
        }
    }

    /// If this request was processed successfully we need to check watches
    if (response->error == Coordination::Error::ZOK)
    {
        auto watch_responses = request_processor->processWatches(watches, list_watches);
        results.insert(results.end(), watch_responses.begin(), watch_responses.end());
    }

    response->xid = zk_request->xid;
    response->zxid = getZXID();

    results.push_back(ResponseForSession{session_id, response});
    return results;
}
2022-05-10 13:04:35 +00:00
void KeeperStorage::rollbackRequest(int64_t rollback_zxid)
{
2022-05-13 13:43:42 +00:00
if (uncommitted_transactions.empty() || uncommitted_transactions.back().zxid != rollback_zxid)
2022-05-12 08:58:36 +00:00
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Trying to rollback invalid ZXID ({}). It should be the last preprocessed.", rollback_zxid);
2022-05-10 13:04:35 +00:00
// we can only rollback the last zxid (if there is any)
// if there is a delta with a larger zxid, we have invalid state
2022-05-11 09:08:39 +00:00
assert(uncommitted_state.deltas.empty() || uncommitted_state.deltas.back().zxid <= rollback_zxid);
std::erase_if(uncommitted_state.deltas, [rollback_zxid](const auto & delta) { return delta.zxid == rollback_zxid; });
2022-05-13 13:43:42 +00:00
uncommitted_transactions.pop_back();
}
2022-05-16 12:12:29 +00:00
/// Returns the nodes digest: the digest stored with the last uncommitted transaction when
/// `committed` is false and such a transaction exists, otherwise the digest of the storage
/// itself tagged with the current digest version.
KeeperStorage::Digest KeeperStorage::getNodesDigest(bool committed) const
{
    const bool use_uncommitted = !committed && !uncommitted_transactions.empty();
    if (use_uncommitted)
        return uncommitted_transactions.back().nodes_digest;

    return {CURRENT_DIGEST_VERSION, nodes_digest};
}
2021-01-19 14:22:28 +00:00
2021-03-29 08:24:56 +00:00
void KeeperStorage::clearDeadWatches(int64_t session_id)
2020-11-26 14:57:32 +00:00
{
2021-08-25 09:31:02 +00:00
/// Clear all watches for this session
2020-11-26 14:57:32 +00:00
auto watches_it = sessions_and_watchers.find(session_id);
if (watches_it != sessions_and_watchers.end())
{
for (const auto & watch_path : watches_it->second)
{
/// Maybe it's a normal watch
2020-11-26 14:57:32 +00:00
auto watch = watches.find(watch_path);
if (watch != watches.end())
{
auto & watches_for_path = watch->second;
for (auto w_it = watches_for_path.begin(); w_it != watches_for_path.end();)
{
2021-01-19 14:22:28 +00:00
if (*w_it == session_id)
2020-11-26 14:57:32 +00:00
w_it = watches_for_path.erase(w_it);
else
++w_it;
}
if (watches_for_path.empty())
watches.erase(watch);
}
2020-12-14 16:01:29 +00:00
/// Maybe it's a list watch
2020-12-14 16:01:29 +00:00
auto list_watch = list_watches.find(watch_path);
if (list_watch != list_watches.end())
{
auto & list_watches_for_path = list_watch->second;
for (auto w_it = list_watches_for_path.begin(); w_it != list_watches_for_path.end();)
{
2021-01-19 14:22:28 +00:00
if (*w_it == session_id)
2020-12-14 16:01:29 +00:00
w_it = list_watches_for_path.erase(w_it);
else
++w_it;
}
if (list_watches_for_path.empty())
list_watches.erase(list_watch);
}
2020-11-26 14:57:32 +00:00
}
2020-11-26 14:57:32 +00:00
sessions_and_watchers.erase(watches_it);
}
}
2021-11-05 10:21:34 +00:00
/// Writes, for every session in `sessions_and_watchers`, the session id in hex followed by
/// one tab-indented line per watched path.
void KeeperStorage::dumpWatches(WriteBufferFromOwnString & buf) const
{
    for (const auto & session_and_paths : sessions_and_watchers)
    {
        buf << "0x" << getHexUIntLowercase(session_and_paths.first) << "\n";

        for (const String & watched_path : session_and_paths.second)
            buf << "\t" << watched_path << "\n";
    }
}
/// Writes every watched path followed by the tab-indented hex ids of the sessions watching it,
/// first for `watches`, then for `list_watches`.
void KeeperStorage::dumpWatchesByPath(WriteBufferFromOwnString & buf) const
{
    auto dump_watch_map = [&buf](const auto & watch_map)
    {
        for (const auto & [watch_path, sessions] : watch_map)
        {
            buf << watch_path << "\n";

            for (int64_t session_id : sessions)
                buf << "\t0x" << getHexUIntLowercase(session_id) << "\n";
        }
    };

    dump_watch_map(watches);
    dump_watch_map(list_watches);
}
2021-11-18 20:17:22 +00:00
/// Writes the hex ids of all sessions in `session_and_timeout`, then every session that owns
/// ephemeral nodes together with the tab-indented paths of those nodes.
void KeeperStorage::dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const
{
    auto write_session_id = [&buf](const auto & id) { buf << "0x" << getHexUIntLowercase(id) << "\n"; };

    buf << "Sessions dump (" << session_and_timeout.size() << "):\n";
    for (const auto & session_entry : session_and_timeout)
        write_session_id(session_entry.first);

    buf << "Sessions with Ephemerals (" << getSessionWithEphemeralNodesCount() << "):\n";
    for (const auto & [session_id, ephemeral_paths] : ephemerals)
    {
        write_session_id(session_id);

        for (const String & path : ephemeral_paths)
            buf << "\t" << path << "\n";
    }
}
2021-11-18 20:17:22 +00:00
/// Returns the number of (path, session) watch subscriptions summed over `watches` and `list_watches`.
uint64_t KeeperStorage::getTotalWatchesCount() const
{
    auto count_subscriptions = [](const auto & watch_map)
    {
        uint64_t subscriptions = 0;
        for (const auto & entry : watch_map)
            subscriptions += entry.second.size();
        return subscriptions;
    };

    return count_subscriptions(watches) + count_subscriptions(list_watches);
}
/// Returns the number of distinct sessions that appear in `watches` or `list_watches`.
uint64_t KeeperStorage::getSessionsWithWatchesCount() const
{
    std::unordered_set<int64_t> distinct_sessions;

    auto collect_sessions = [&distinct_sessions](const auto & watch_map)
    {
        for (const auto & entry : watch_map)
            distinct_sessions.insert(entry.second.begin(), entry.second.end());
    };

    collect_sessions(watches);
    collect_sessions(list_watches);

    return distinct_sessions.size();
}
/// Returns the number of ephemeral node paths summed over all sessions in `ephemerals`.
uint64_t KeeperStorage::getTotalEphemeralNodesCount() const
{
    uint64_t total = 0;
    for (const auto & session_and_nodes : ephemerals)
        total += session_and_nodes.second.size();

    return total;
}
2020-10-30 14:16:47 +00:00
}