2021-03-29 08:24:56 +00:00
|
|
|
#include <Coordination/KeeperStorage.h>
|
2020-10-30 19:57:30 +00:00
|
|
|
#include <Common/ZooKeeper/IKeeper.h>
|
2020-11-03 14:49:30 +00:00
|
|
|
#include <Common/setThreadName.h>
|
|
|
|
#include <mutex>
|
|
|
|
#include <functional>
|
|
|
|
#include <common/logger_useful.h>
|
2020-11-04 18:54:55 +00:00
|
|
|
#include <Common/StringUtils/StringUtils.h>
|
2020-11-18 15:25:59 +00:00
|
|
|
#include <sstream>
|
|
|
|
#include <iomanip>
|
2021-05-21 21:19:22 +00:00
|
|
|
#include <Poco/SHA1Engine.h>
|
2021-05-22 16:07:47 +00:00
|
|
|
#include <Poco/Base64Encoder.h>
|
|
|
|
#include <boost/algorithm/string.hpp>
|
2020-10-30 14:16:47 +00:00
|
|
|
|
2020-11-11 13:55:28 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
2020-11-25 13:19:09 +00:00
|
|
|
|
|
|
|
/// Error codes referenced from this file. They are only declared here (extern);
/// the definitions are provided by another translation unit.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int LOGICAL_ERROR;
}
|
|
|
|
|
2020-10-30 19:57:30 +00:00
|
|
|
/// Returns the path of the parent node: everything before the last '/'.
/// A node directly under the root (and the root itself) has "/" as its parent.
/// A path without any '/' is returned unchanged, because substr(0, npos) yields the whole string.
static String parentPath(const String & path)
{
    const auto last_slash = path.rfind('/');

    /// The only slash is the leading one, so the parent is the root.
    if (last_slash == 0)
        return "/";

    return path.substr(0, last_slash);
}
|
|
|
|
|
2021-02-19 07:25:55 +00:00
|
|
|
/// Returns the last component of the path: everything after the last '/'.
/// If the path contains no '/', the whole path is returned (npos + 1 wraps around to 0).
static std::string getBaseName(const String & path)
{
    const size_t last_slash = path.rfind('/');
    return path.substr(last_slash + 1);
}
|
2020-11-03 14:49:30 +00:00
|
|
|
|
2021-05-22 16:07:47 +00:00
|
|
|
/// Encodes `decoded` with Base64 and returns the encoded text.
static String base64Encode(const String & decoded)
{
    std::ostringstream sink; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
    sink.exceptions(std::ios::failbit);

    Poco::Base64Encoder base64_stream(sink);
    /// Line length 0: the encoder must not split the output into multiple lines.
    base64_stream.rdbuf()->setLineLength(0);
    base64_stream << decoded;
    /// Finish the encoding before the result is read from the underlying stream.
    base64_stream.close();

    return sink.str();
}
|
|
|
|
|
2021-05-21 21:19:22 +00:00
|
|
|
/// Returns the SHA1 digest of `userdata` as raw bytes stored in a string (not hex-encoded).
static String getSHA1(const String & userdata)
{
    Poco::SHA1Engine sha1;
    sha1.update(userdata);

    const auto & raw_digest = sha1.digest();
    return String(raw_digest.begin(), raw_digest.end());
}
|
|
|
|
|
2021-05-22 16:07:47 +00:00
|
|
|
/// Builds the identifier used by the "digest" auth scheme from "user:password" credentials.
/// The result has the ZooKeeper format: "<user>:<base64(sha1("<user>:<password>"))>".
/// The user name is everything before the first ':'; if there is no ':' the whole
/// string is treated as the user name (no out-of-bounds access on malformed input).
static String generateDigest(const String & userdata)
{
    const String user = userdata.substr(0, userdata.find(':'));
    /// The hash is taken over the complete "user:password" string, not only over the password.
    return user + ":" + base64Encode(getSHA1(userdata));
}
|
|
|
|
|
|
|
|
/// Checks whether a session is allowed to perform an operation on a node.
///   permission    - bit mask of the required permission(s);
///   node_acls     - ACL entries of the node the permission is checked against;
///   session_auths - identities the session has authenticated with.
/// Access is granted when:
///   - the node has no ACLs at all;
///   - the session holds an identity with the "super" scheme;
///   - some ACL entry that grants the permission is either world:anyone
///     or matches (scheme and id) any of the session identities.
static bool checkACL(int32_t permission, const Coordination::ACLs & node_acls, const std::vector<KeeperStorage::AuthID> & session_auths)
{
    if (node_acls.empty())
        return true;

    for (const auto & session_auth : session_auths)
        if (session_auth.scheme == "super")
            return true;

    for (const auto & node_acl : node_acls)
    {
        /// Entries which do not grant the requested permission are irrelevant.
        if (!(node_acl.permissions & permission))
            continue;

        if (node_acl.scheme == "world" && node_acl.id == "anyone")
            return true;

        /// Every session identity has to be tried against the entry: the two lists are
        /// unrelated in size and order, so they must not be indexed in lockstep.
        for (const auto & session_auth : session_auths)
            if (node_acl.scheme == session_auth.scheme && node_acl.id == session_auth.id)
                return true;
    }

    return false;
}
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
/// Collects the watch notifications caused by a change of the node at `path`.
/// Sessions watching `path` itself (in `watches`) receive an event of type `event_type`;
/// sessions watching the children of its parent (in `list_watches`) receive a CHILD event.
/// Every triggered watch entry is removed from its map.
static KeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches, Coordination::Event event_type)
{
    /// Creates the notification which is shared by all sessions watching the same path.
    const auto make_notification = [](const String & watched_path, Coordination::Event type)
    {
        auto notification = std::make_shared<Coordination::ZooKeeperWatchResponse>();
        notification->path = watched_path;
        notification->xid = Coordination::WATCH_XID;
        notification->zxid = -1;
        notification->type = type;
        notification->state = Coordination::State::CONNECTED;
        return notification;
    };

    KeeperStorage::ResponsesForSessions result;

    auto node_watch = watches.find(path);
    if (node_watch != watches.end())
    {
        auto watch_response = make_notification(path, event_type);
        for (auto watcher_session : node_watch->second)
            result.push_back(KeeperStorage::ResponseForSession{watcher_session, watch_response});

        watches.erase(node_watch);
    }

    auto parent_path = parentPath(path);
    auto children_watch = list_watches.find(parent_path);
    if (children_watch != list_watches.end())
    {
        auto watch_list_response = make_notification(parent_path, Coordination::Event::CHILD);
        for (auto watcher_session : children_watch->second)
            result.push_back(KeeperStorage::ResponseForSession{watcher_session, watch_list_response});

        list_watches.erase(children_watch);
    }

    return result;
}
|
|
|
|
|
2021-05-22 16:07:47 +00:00
|
|
|
static bool fixupACL(
|
|
|
|
const std::vector<Coordination::ACL> & request_acls,
|
|
|
|
const std::vector<KeeperStorage::AuthID> & current_ids,
|
|
|
|
std::vector<Coordination::ACL> & result_acls)
|
|
|
|
{
|
|
|
|
if (request_acls.empty())
|
|
|
|
return false;
|
|
|
|
|
|
|
|
for (const auto & request_acl : request_acls)
|
|
|
|
{
|
|
|
|
if (request_acl.scheme == "world" && request_acl.id == "anyone")
|
|
|
|
{
|
|
|
|
result_acls.push_back(request_acl);
|
|
|
|
}
|
|
|
|
else if (request_acl.scheme == "auth")
|
|
|
|
{
|
|
|
|
for (const auto & current_id : current_ids)
|
|
|
|
{
|
|
|
|
Coordination::ACL new_acl = request_acl;
|
|
|
|
new_acl.scheme = current_id.scheme;
|
|
|
|
new_acl.id = current_id.id;
|
|
|
|
result_acls.push_back(new_acl);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return !result_acls.empty();
|
|
|
|
}
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
/// Creates a storage which contains only the root node "/".
/// `tick_time_ms` is forwarded to the session expiry queue.
KeeperStorage::KeeperStorage(int64_t tick_time_ms)
    : session_expiry_queue(tick_time_ms)
{
    container.insert("/", Node{});
}
|
|
|
|
|
2020-10-30 19:57:30 +00:00
|
|
|
/// Callback returned by request processing which reverts the changes the request made to the storage.
/// It is empty for requests that did not change anything.
using Undo = std::function<void()>;
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
struct KeeperStorageRequest
|
2020-10-30 19:57:30 +00:00
|
|
|
{
|
|
|
|
Coordination::ZooKeeperRequestPtr zk_request;
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
explicit KeeperStorageRequest(const Coordination::ZooKeeperRequestPtr & zk_request_)
|
2020-10-30 19:57:30 +00:00
|
|
|
: zk_request(zk_request_)
|
|
|
|
{}
|
2021-05-21 21:19:22 +00:00
|
|
|
virtual std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t session_id) const = 0;
|
2021-03-29 08:24:56 +00:00
|
|
|
virtual KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & /*watches*/, KeeperStorage::Watches & /*list_watches*/) const { return {}; }
|
2021-05-21 21:19:22 +00:00
|
|
|
virtual bool checkAuth(KeeperStorage & /*storage*/, int64_t /*session_id*/) const { return true; }
|
2020-11-10 13:43:10 +00:00
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
virtual ~KeeperStorageRequest() = default;
|
2020-11-20 12:36:10 +00:00
|
|
|
};
|
2020-10-30 19:57:30 +00:00
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
struct KeeperStorageHeartbeatRequest final : public KeeperStorageRequest
|
2020-11-03 14:49:30 +00:00
|
|
|
{
|
2021-03-29 08:24:56 +00:00
|
|
|
using KeeperStorageRequest::KeeperStorageRequest;
|
2021-05-21 21:19:22 +00:00
|
|
|
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */) const override
|
2020-11-03 14:49:30 +00:00
|
|
|
{
|
|
|
|
return {zk_request->makeResponse(), {}};
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
struct KeeperStorageSyncRequest final : public KeeperStorageRequest
|
2021-02-10 13:01:05 +00:00
|
|
|
{
|
2021-03-29 08:24:56 +00:00
|
|
|
using KeeperStorageRequest::KeeperStorageRequest;
|
2021-05-21 21:19:22 +00:00
|
|
|
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */) const override
|
2021-02-10 13:01:05 +00:00
|
|
|
{
|
|
|
|
auto response = zk_request->makeResponse();
|
2021-05-08 14:10:06 +00:00
|
|
|
dynamic_cast<Coordination::ZooKeeperSyncResponse &>(*response).path
|
|
|
|
= dynamic_cast<Coordination::ZooKeeperSyncRequest &>(*zk_request).path;
|
2021-02-10 13:01:05 +00:00
|
|
|
return {response, {}};
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
struct KeeperStorageCreateRequest final : public KeeperStorageRequest
|
2020-10-30 19:57:30 +00:00
|
|
|
{
|
2021-03-29 08:24:56 +00:00
|
|
|
using KeeperStorageRequest::KeeperStorageRequest;
|
2020-11-10 13:43:10 +00:00
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override
|
2020-11-10 13:43:10 +00:00
|
|
|
{
|
2021-01-19 14:22:28 +00:00
|
|
|
return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CREATED);
|
2020-11-10 13:43:10 +00:00
|
|
|
}
|
|
|
|
|
2021-05-21 21:19:22 +00:00
|
|
|
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
|
2020-10-30 19:57:30 +00:00
|
|
|
{
|
2021-05-21 21:19:22 +00:00
|
|
|
auto & container = storage.container;
|
2021-05-22 07:38:50 +00:00
|
|
|
auto parent_path = parentPath(zk_request->getPath());
|
2021-05-21 21:19:22 +00:00
|
|
|
|
|
|
|
auto it = container.find(parent_path);
|
|
|
|
if (it == container.end())
|
|
|
|
return true;
|
|
|
|
|
|
|
|
const auto & node_acls = it->value.acls;
|
|
|
|
const auto & session_auths = storage.session_and_auth[session_id];
|
|
|
|
return checkACL(Coordination::ACL::Create, node_acls, session_auths);
|
|
|
|
}
|
|
|
|
|
|
|
|
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t session_id) const override
|
|
|
|
{
|
|
|
|
auto & container = storage.container;
|
|
|
|
auto & ephemerals = storage.ephemerals;
|
|
|
|
|
2020-10-30 19:57:30 +00:00
|
|
|
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
|
|
|
|
Undo undo;
|
|
|
|
Coordination::ZooKeeperCreateResponse & response = dynamic_cast<Coordination::ZooKeeperCreateResponse &>(*response_ptr);
|
|
|
|
Coordination::ZooKeeperCreateRequest & request = dynamic_cast<Coordination::ZooKeeperCreateRequest &>(*zk_request);
|
|
|
|
|
2021-02-26 13:53:34 +00:00
|
|
|
if (container.contains(request.path))
|
2020-10-30 19:57:30 +00:00
|
|
|
{
|
|
|
|
response.error = Coordination::Error::ZNODEEXISTS;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2021-02-26 13:53:34 +00:00
|
|
|
auto parent_path = parentPath(request.path);
|
|
|
|
auto it = container.find(parent_path);
|
2020-10-30 19:57:30 +00:00
|
|
|
|
|
|
|
if (it == container.end())
|
|
|
|
{
|
|
|
|
response.error = Coordination::Error::ZNONODE;
|
|
|
|
}
|
2021-03-03 12:29:00 +00:00
|
|
|
else if (it->value.stat.ephemeralOwner != 0)
|
2020-10-30 19:57:30 +00:00
|
|
|
{
|
|
|
|
response.error = Coordination::Error::ZNOCHILDRENFOREPHEMERALS;
|
|
|
|
}
|
2021-05-21 21:19:22 +00:00
|
|
|
else if (request.acls.size() != storage.session_and_auth[session_id].size())
|
|
|
|
{
|
|
|
|
response.error = Coordination::Error::ZBADARGUMENTS;
|
|
|
|
}
|
2020-10-30 19:57:30 +00:00
|
|
|
else
|
|
|
|
{
|
2021-05-21 21:19:22 +00:00
|
|
|
auto & session_auth_ids = storage.session_and_auth[session_id];
|
2021-05-22 16:07:47 +00:00
|
|
|
|
|
|
|
KeeperStorage::Node created_node;
|
|
|
|
|
|
|
|
if (!fixupACL(request.acls, session_auth_ids, created_node.acls))
|
2021-05-21 21:19:22 +00:00
|
|
|
{
|
2021-05-22 16:07:47 +00:00
|
|
|
response.error = Coordination::Error::ZINVALIDACL;
|
|
|
|
return {response_ptr, {}};
|
2021-05-21 21:19:22 +00:00
|
|
|
}
|
|
|
|
|
2020-10-30 19:57:30 +00:00
|
|
|
created_node.stat.czxid = zxid;
|
|
|
|
created_node.stat.mzxid = zxid;
|
|
|
|
created_node.stat.ctime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
|
|
|
|
created_node.stat.mtime = created_node.stat.ctime;
|
|
|
|
created_node.stat.numChildren = 0;
|
|
|
|
created_node.stat.dataLength = request.data.length();
|
2021-03-03 12:29:00 +00:00
|
|
|
created_node.stat.ephemeralOwner = request.is_ephemeral ? session_id : 0;
|
2020-10-30 19:57:30 +00:00
|
|
|
created_node.data = request.data;
|
|
|
|
created_node.is_sequental = request.is_sequential;
|
2021-05-21 21:19:22 +00:00
|
|
|
|
2020-10-30 19:57:30 +00:00
|
|
|
std::string path_created = request.path;
|
|
|
|
|
|
|
|
if (request.is_sequential)
|
|
|
|
{
|
2021-02-26 13:53:34 +00:00
|
|
|
auto seq_num = it->value.seq_num;
|
2020-10-30 19:57:30 +00:00
|
|
|
|
2020-11-18 15:25:59 +00:00
|
|
|
std::stringstream seq_num_str; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
|
|
|
seq_num_str.exceptions(std::ios::failbit);
|
2020-10-30 19:57:30 +00:00
|
|
|
seq_num_str << std::setw(10) << std::setfill('0') << seq_num;
|
|
|
|
|
|
|
|
path_created += seq_num_str.str();
|
|
|
|
}
|
|
|
|
|
2021-02-19 07:25:55 +00:00
|
|
|
auto child_path = getBaseName(path_created);
|
2021-03-29 08:24:56 +00:00
|
|
|
container.updateValue(parent_path, [child_path] (KeeperStorage::Node & parent)
|
2021-02-26 13:53:34 +00:00
|
|
|
{
|
|
|
|
/// Increment sequential number even if node is not sequential
|
|
|
|
++parent.seq_num;
|
|
|
|
parent.children.insert(child_path);
|
|
|
|
++parent.stat.cversion;
|
|
|
|
++parent.stat.numChildren;
|
|
|
|
});
|
|
|
|
|
|
|
|
response.path_created = path_created;
|
|
|
|
container.insert(path_created, std::move(created_node));
|
2020-10-30 19:57:30 +00:00
|
|
|
|
2020-11-19 16:06:19 +00:00
|
|
|
if (request.is_ephemeral)
|
|
|
|
ephemerals[session_id].emplace(path_created);
|
|
|
|
|
2021-02-26 13:53:34 +00:00
|
|
|
undo = [&container, &ephemerals, session_id, path_created, is_ephemeral = request.is_ephemeral, parent_path, child_path]
|
2020-10-30 19:57:30 +00:00
|
|
|
{
|
|
|
|
container.erase(path_created);
|
2020-11-19 16:48:49 +00:00
|
|
|
if (is_ephemeral)
|
|
|
|
ephemerals[session_id].erase(path_created);
|
2020-10-30 19:57:30 +00:00
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
container.updateValue(parent_path, [child_path] (KeeperStorage::Node & undo_parent)
|
2021-02-26 13:53:34 +00:00
|
|
|
{
|
|
|
|
--undo_parent.stat.cversion;
|
|
|
|
--undo_parent.stat.numChildren;
|
|
|
|
--undo_parent.seq_num;
|
|
|
|
undo_parent.children.erase(child_path);
|
|
|
|
});
|
|
|
|
};
|
2020-10-30 19:57:30 +00:00
|
|
|
|
|
|
|
response.error = Coordination::Error::ZOK;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return { response_ptr, undo };
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
struct KeeperStorageGetRequest final : public KeeperStorageRequest
|
2020-11-03 14:49:30 +00:00
|
|
|
{
|
2021-05-21 21:19:22 +00:00
|
|
|
|
|
|
|
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
|
|
|
|
{
|
|
|
|
auto & container = storage.container;
|
2021-05-22 07:38:50 +00:00
|
|
|
auto it = container.find(zk_request->getPath());
|
2021-05-21 21:19:22 +00:00
|
|
|
if (it == container.end())
|
|
|
|
return true;
|
|
|
|
|
|
|
|
const auto & node_acls = it->value.acls;
|
|
|
|
const auto & session_auths = storage.session_and_auth[session_id];
|
|
|
|
return checkACL(Coordination::ACL::Read, node_acls, session_auths);
|
|
|
|
}
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
using KeeperStorageRequest::KeeperStorageRequest;
|
2021-05-21 21:19:22 +00:00
|
|
|
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /* zxid */, int64_t /* session_id */) const override
|
2020-11-03 14:49:30 +00:00
|
|
|
{
|
2021-05-21 21:19:22 +00:00
|
|
|
auto & container = storage.container;
|
2020-11-03 14:49:30 +00:00
|
|
|
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
|
|
|
|
Coordination::ZooKeeperGetResponse & response = dynamic_cast<Coordination::ZooKeeperGetResponse &>(*response_ptr);
|
|
|
|
Coordination::ZooKeeperGetRequest & request = dynamic_cast<Coordination::ZooKeeperGetRequest &>(*zk_request);
|
|
|
|
|
|
|
|
auto it = container.find(request.path);
|
|
|
|
if (it == container.end())
|
|
|
|
{
|
|
|
|
response.error = Coordination::Error::ZNONODE;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2021-02-26 13:53:34 +00:00
|
|
|
response.stat = it->value.stat;
|
|
|
|
response.data = it->value.data;
|
2020-11-03 14:49:30 +00:00
|
|
|
response.error = Coordination::Error::ZOK;
|
|
|
|
}
|
|
|
|
|
|
|
|
return { response_ptr, {} };
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
struct KeeperStorageRemoveRequest final : public KeeperStorageRequest
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
2021-05-21 21:19:22 +00:00
|
|
|
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
|
|
|
|
{
|
|
|
|
auto & container = storage.container;
|
2021-05-22 07:38:50 +00:00
|
|
|
auto it = container.find(zk_request->getPath());
|
2021-05-21 21:19:22 +00:00
|
|
|
if (it == container.end())
|
|
|
|
return true;
|
|
|
|
|
|
|
|
const auto & node_acls = it->value.acls;
|
|
|
|
const auto & session_auths = storage.session_and_auth[session_id];
|
|
|
|
return checkACL(Coordination::ACL::Delete, node_acls, session_auths);
|
|
|
|
}
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
using KeeperStorageRequest::KeeperStorageRequest;
|
2021-05-21 21:19:22 +00:00
|
|
|
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/) const override
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
2021-05-21 21:19:22 +00:00
|
|
|
auto & container = storage.container;
|
|
|
|
auto & ephemerals = storage.ephemerals;
|
|
|
|
|
2020-11-04 18:54:55 +00:00
|
|
|
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
|
|
|
|
Coordination::ZooKeeperRemoveResponse & response = dynamic_cast<Coordination::ZooKeeperRemoveResponse &>(*response_ptr);
|
|
|
|
Coordination::ZooKeeperRemoveRequest & request = dynamic_cast<Coordination::ZooKeeperRemoveRequest &>(*zk_request);
|
|
|
|
Undo undo;
|
|
|
|
|
|
|
|
auto it = container.find(request.path);
|
|
|
|
if (it == container.end())
|
|
|
|
{
|
|
|
|
response.error = Coordination::Error::ZNONODE;
|
|
|
|
}
|
2021-02-26 13:53:34 +00:00
|
|
|
else if (request.version != -1 && request.version != it->value.stat.version)
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
|
|
|
response.error = Coordination::Error::ZBADVERSION;
|
|
|
|
}
|
2021-02-26 13:53:34 +00:00
|
|
|
else if (it->value.stat.numChildren)
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
|
|
|
response.error = Coordination::Error::ZNOTEMPTY;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2021-02-26 13:53:34 +00:00
|
|
|
auto prev_node = it->value;
|
2021-03-03 12:29:00 +00:00
|
|
|
if (prev_node.stat.ephemeralOwner != 0)
|
2021-03-24 08:12:37 +00:00
|
|
|
{
|
|
|
|
auto ephemerals_it = ephemerals.find(prev_node.stat.ephemeralOwner);
|
|
|
|
ephemerals_it->second.erase(request.path);
|
|
|
|
if (ephemerals_it->second.empty())
|
|
|
|
ephemerals.erase(ephemerals_it);
|
|
|
|
}
|
2020-11-19 16:06:19 +00:00
|
|
|
|
2021-02-26 13:53:34 +00:00
|
|
|
auto child_basename = getBaseName(it->key);
|
2021-03-29 08:24:56 +00:00
|
|
|
container.updateValue(parentPath(request.path), [&child_basename] (KeeperStorage::Node & parent)
|
2021-02-26 13:53:34 +00:00
|
|
|
{
|
|
|
|
--parent.stat.numChildren;
|
|
|
|
++parent.stat.cversion;
|
|
|
|
parent.children.erase(child_basename);
|
|
|
|
});
|
|
|
|
|
2020-11-04 18:54:55 +00:00
|
|
|
response.error = Coordination::Error::ZOK;
|
|
|
|
|
2021-02-26 13:53:34 +00:00
|
|
|
container.erase(request.path);
|
2021-02-19 07:05:52 +00:00
|
|
|
|
2021-03-24 08:12:37 +00:00
|
|
|
undo = [prev_node, &container, &ephemerals, path = request.path, child_basename]
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
2021-03-03 12:29:00 +00:00
|
|
|
if (prev_node.stat.ephemeralOwner != 0)
|
2021-03-24 08:12:37 +00:00
|
|
|
ephemerals[prev_node.stat.ephemeralOwner].emplace(path);
|
2020-11-20 08:37:16 +00:00
|
|
|
|
2021-02-26 13:53:34 +00:00
|
|
|
container.insert(path, prev_node);
|
2021-03-29 08:24:56 +00:00
|
|
|
container.updateValue(parentPath(path), [&child_basename] (KeeperStorage::Node & parent)
|
2021-02-26 13:53:34 +00:00
|
|
|
{
|
|
|
|
++parent.stat.numChildren;
|
|
|
|
--parent.stat.cversion;
|
|
|
|
parent.children.insert(child_basename);
|
|
|
|
});
|
2020-11-04 18:54:55 +00:00
|
|
|
};
|
|
|
|
}
|
|
|
|
|
|
|
|
return { response_ptr, undo };
|
|
|
|
}
|
2020-11-10 13:43:10 +00:00
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override
|
2020-11-10 13:43:10 +00:00
|
|
|
{
|
2021-01-19 14:22:28 +00:00
|
|
|
return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED);
|
2020-11-10 13:43:10 +00:00
|
|
|
}
|
2020-11-04 18:54:55 +00:00
|
|
|
};
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
struct KeeperStorageExistsRequest final : public KeeperStorageRequest
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
2021-05-21 21:19:22 +00:00
|
|
|
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
|
|
|
|
{
|
|
|
|
auto & container = storage.container;
|
2021-05-22 07:38:50 +00:00
|
|
|
auto it = container.find(zk_request->getPath());
|
2021-05-21 21:19:22 +00:00
|
|
|
if (it == container.end())
|
|
|
|
return true;
|
|
|
|
|
|
|
|
const auto & node_acls = it->value.acls;
|
|
|
|
const auto & session_auths = storage.session_and_auth[session_id];
|
|
|
|
return checkACL(Coordination::ACL::Read, node_acls, session_auths);
|
|
|
|
}
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
using KeeperStorageRequest::KeeperStorageRequest;
|
2021-05-21 21:19:22 +00:00
|
|
|
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /* session_id */) const override
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
2021-05-21 21:19:22 +00:00
|
|
|
auto & container = storage.container;
|
|
|
|
|
2020-11-04 18:54:55 +00:00
|
|
|
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
|
|
|
|
Coordination::ZooKeeperExistsResponse & response = dynamic_cast<Coordination::ZooKeeperExistsResponse &>(*response_ptr);
|
|
|
|
Coordination::ZooKeeperExistsRequest & request = dynamic_cast<Coordination::ZooKeeperExistsRequest &>(*zk_request);
|
|
|
|
|
|
|
|
auto it = container.find(request.path);
|
|
|
|
if (it != container.end())
|
|
|
|
{
|
2021-02-26 13:53:34 +00:00
|
|
|
response.stat = it->value.stat;
|
2020-11-04 18:54:55 +00:00
|
|
|
response.error = Coordination::Error::ZOK;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
response.error = Coordination::Error::ZNONODE;
|
|
|
|
}
|
|
|
|
|
|
|
|
return { response_ptr, {} };
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
struct KeeperStorageSetRequest final : public KeeperStorageRequest
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
2021-05-21 21:19:22 +00:00
|
|
|
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
|
|
|
|
{
|
|
|
|
auto & container = storage.container;
|
2021-05-22 07:38:50 +00:00
|
|
|
auto it = container.find(zk_request->getPath());
|
2021-05-21 21:19:22 +00:00
|
|
|
if (it == container.end())
|
|
|
|
return true;
|
|
|
|
|
|
|
|
const auto & node_acls = it->value.acls;
|
|
|
|
const auto & session_auths = storage.session_and_auth[session_id];
|
|
|
|
return checkACL(Coordination::ACL::Write, node_acls, session_auths);
|
|
|
|
}
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
using KeeperStorageRequest::KeeperStorageRequest;
|
2021-05-21 21:19:22 +00:00
|
|
|
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t /* session_id */) const override
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
2021-05-21 21:19:22 +00:00
|
|
|
auto & container = storage.container;
|
|
|
|
|
2020-11-04 18:54:55 +00:00
|
|
|
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
|
|
|
|
Coordination::ZooKeeperSetResponse & response = dynamic_cast<Coordination::ZooKeeperSetResponse &>(*response_ptr);
|
|
|
|
Coordination::ZooKeeperSetRequest & request = dynamic_cast<Coordination::ZooKeeperSetRequest &>(*zk_request);
|
|
|
|
Undo undo;
|
|
|
|
|
|
|
|
auto it = container.find(request.path);
|
|
|
|
if (it == container.end())
|
|
|
|
{
|
|
|
|
response.error = Coordination::Error::ZNONODE;
|
|
|
|
}
|
2021-02-26 13:53:34 +00:00
|
|
|
else if (request.version == -1 || request.version == it->value.stat.version)
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
2021-02-26 13:53:34 +00:00
|
|
|
auto prev_node = it->value;
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
auto itr = container.updateValue(request.path, [zxid, request] (KeeperStorage::Node & value)
|
2021-02-26 13:53:34 +00:00
|
|
|
{
|
|
|
|
value.data = request.data;
|
|
|
|
value.stat.version++;
|
|
|
|
value.stat.mzxid = zxid;
|
|
|
|
value.stat.mtime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
|
|
|
|
value.stat.dataLength = request.data.length();
|
|
|
|
value.data = request.data;
|
|
|
|
});
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
container.updateValue(parentPath(request.path), [] (KeeperStorage::Node & parent)
|
2021-02-26 13:53:34 +00:00
|
|
|
{
|
|
|
|
parent.stat.cversion++;
|
|
|
|
});
|
|
|
|
|
|
|
|
response.stat = itr->value.stat;
|
2020-11-04 18:54:55 +00:00
|
|
|
response.error = Coordination::Error::ZOK;
|
|
|
|
|
|
|
|
undo = [prev_node, &container, path = request.path]
|
|
|
|
{
|
2021-03-29 08:24:56 +00:00
|
|
|
container.updateValue(path, [&prev_node] (KeeperStorage::Node & value) { value = prev_node; });
|
|
|
|
container.updateValue(parentPath(path), [] (KeeperStorage::Node & parent)
|
2021-02-26 13:53:34 +00:00
|
|
|
{
|
|
|
|
parent.stat.cversion--;
|
|
|
|
});
|
2020-11-04 18:54:55 +00:00
|
|
|
};
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
response.error = Coordination::Error::ZBADVERSION;
|
|
|
|
}
|
|
|
|
|
2020-11-11 13:07:06 +00:00
|
|
|
return { response_ptr, undo };
|
2020-11-04 18:54:55 +00:00
|
|
|
}
|
2020-11-10 13:43:10 +00:00
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override
|
2020-11-10 13:43:10 +00:00
|
|
|
{
|
2021-01-19 14:22:28 +00:00
|
|
|
return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CHANGED);
|
2020-11-10 13:43:10 +00:00
|
|
|
}
|
2020-11-04 18:54:55 +00:00
|
|
|
};
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
struct KeeperStorageListRequest final : public KeeperStorageRequest
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
2021-05-21 21:19:22 +00:00
|
|
|
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
|
|
|
|
{
|
|
|
|
auto & container = storage.container;
|
2021-05-22 07:38:50 +00:00
|
|
|
auto it = container.find(zk_request->getPath());
|
2021-05-21 21:19:22 +00:00
|
|
|
if (it == container.end())
|
|
|
|
return true;
|
|
|
|
|
|
|
|
const auto & node_acls = it->value.acls;
|
|
|
|
const auto & session_auths = storage.session_and_auth[session_id];
|
|
|
|
return checkACL(Coordination::ACL::Read, node_acls, session_auths);
|
|
|
|
}
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
using KeeperStorageRequest::KeeperStorageRequest;
|
2021-05-21 21:19:22 +00:00
|
|
|
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/) const override
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
2021-05-21 21:19:22 +00:00
|
|
|
auto & container = storage.container;
|
2020-11-04 18:54:55 +00:00
|
|
|
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
|
|
|
|
Coordination::ZooKeeperListResponse & response = dynamic_cast<Coordination::ZooKeeperListResponse &>(*response_ptr);
|
|
|
|
Coordination::ZooKeeperListRequest & request = dynamic_cast<Coordination::ZooKeeperListRequest &>(*zk_request);
|
|
|
|
auto it = container.find(request.path);
|
|
|
|
if (it == container.end())
|
|
|
|
{
|
|
|
|
response.error = Coordination::Error::ZNONODE;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
auto path_prefix = request.path;
|
|
|
|
if (path_prefix.empty())
|
2020-11-25 13:19:09 +00:00
|
|
|
throw DB::Exception("Logical error: path cannot be empty", ErrorCodes::LOGICAL_ERROR);
|
2020-11-04 18:54:55 +00:00
|
|
|
|
2021-02-26 13:53:34 +00:00
|
|
|
response.names.insert(response.names.end(), it->value.children.begin(), it->value.children.end());
|
2020-11-04 18:54:55 +00:00
|
|
|
|
2021-02-26 13:53:34 +00:00
|
|
|
response.stat = it->value.stat;
|
2020-11-04 18:54:55 +00:00
|
|
|
response.error = Coordination::Error::ZOK;
|
|
|
|
}
|
|
|
|
|
|
|
|
return { response_ptr, {} };
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
struct KeeperStorageCheckRequest final : public KeeperStorageRequest
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
2021-05-21 21:19:22 +00:00
|
|
|
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
|
|
|
|
{
|
|
|
|
auto & container = storage.container;
|
2021-05-22 07:38:50 +00:00
|
|
|
auto it = container.find(zk_request->getPath());
|
2021-05-21 21:19:22 +00:00
|
|
|
if (it == container.end())
|
|
|
|
return true;
|
|
|
|
|
|
|
|
const auto & node_acls = it->value.acls;
|
|
|
|
const auto & session_auths = storage.session_and_auth[session_id];
|
|
|
|
return checkACL(Coordination::ACL::Read, node_acls, session_auths);
|
|
|
|
}
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
using KeeperStorageRequest::KeeperStorageRequest;
|
2021-05-21 21:19:22 +00:00
|
|
|
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/) const override
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
2021-05-21 21:19:22 +00:00
|
|
|
auto & container = storage.container;
|
|
|
|
|
2020-11-04 18:54:55 +00:00
|
|
|
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
|
|
|
|
Coordination::ZooKeeperCheckResponse & response = dynamic_cast<Coordination::ZooKeeperCheckResponse &>(*response_ptr);
|
|
|
|
Coordination::ZooKeeperCheckRequest & request = dynamic_cast<Coordination::ZooKeeperCheckRequest &>(*zk_request);
|
|
|
|
auto it = container.find(request.path);
|
|
|
|
if (it == container.end())
|
|
|
|
{
|
|
|
|
response.error = Coordination::Error::ZNONODE;
|
|
|
|
}
|
2021-02-26 13:53:34 +00:00
|
|
|
else if (request.version != -1 && request.version != it->value.stat.version)
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
|
|
|
response.error = Coordination::Error::ZBADVERSION;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
response.error = Coordination::Error::ZOK;
|
|
|
|
}
|
|
|
|
|
|
|
|
return { response_ptr, {} };
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
struct KeeperStorageMultiRequest final : public KeeperStorageRequest
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
2021-05-21 21:19:22 +00:00
|
|
|
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
|
|
|
|
{
|
|
|
|
for (const auto & concrete_request : concrete_requests)
|
|
|
|
if (!concrete_request->checkAuth(storage, session_id))
|
|
|
|
return false;
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
std::vector<KeeperStorageRequestPtr> concrete_requests;
|
|
|
|
explicit KeeperStorageMultiRequest(const Coordination::ZooKeeperRequestPtr & zk_request_)
|
|
|
|
: KeeperStorageRequest(zk_request_)
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
|
|
|
Coordination::ZooKeeperMultiRequest & request = dynamic_cast<Coordination::ZooKeeperMultiRequest &>(*zk_request);
|
|
|
|
concrete_requests.reserve(request.requests.size());
|
|
|
|
|
2020-11-25 13:19:09 +00:00
|
|
|
for (const auto & sub_request : request.requests)
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
2020-12-10 21:33:13 +00:00
|
|
|
auto sub_zk_request = std::dynamic_pointer_cast<Coordination::ZooKeeperRequest>(sub_request);
|
2020-11-25 13:19:09 +00:00
|
|
|
if (sub_zk_request->getOpNum() == Coordination::OpNum::Create)
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
2021-03-29 08:24:56 +00:00
|
|
|
concrete_requests.push_back(std::make_shared<KeeperStorageCreateRequest>(sub_zk_request));
|
2020-11-04 18:54:55 +00:00
|
|
|
}
|
2020-11-25 13:19:09 +00:00
|
|
|
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Remove)
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
2021-03-29 08:24:56 +00:00
|
|
|
concrete_requests.push_back(std::make_shared<KeeperStorageRemoveRequest>(sub_zk_request));
|
2020-11-04 18:54:55 +00:00
|
|
|
}
|
2020-11-25 13:19:09 +00:00
|
|
|
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Set)
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
2021-03-29 08:24:56 +00:00
|
|
|
concrete_requests.push_back(std::make_shared<KeeperStorageSetRequest>(sub_zk_request));
|
2020-11-04 18:54:55 +00:00
|
|
|
}
|
2020-11-25 13:19:09 +00:00
|
|
|
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Check)
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
2021-03-29 08:24:56 +00:00
|
|
|
concrete_requests.push_back(std::make_shared<KeeperStorageCheckRequest>(sub_zk_request));
|
2020-11-04 18:54:55 +00:00
|
|
|
}
|
|
|
|
else
|
2020-11-25 13:19:09 +00:00
|
|
|
throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Illegal command as part of multi ZooKeeper request {}", sub_zk_request->getOpNum());
|
2020-11-04 18:54:55 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-05-21 21:19:22 +00:00
|
|
|
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t session_id) const override
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
|
|
|
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
|
|
|
|
Coordination::ZooKeeperMultiResponse & response = dynamic_cast<Coordination::ZooKeeperMultiResponse &>(*response_ptr);
|
|
|
|
std::vector<Undo> undo_actions;
|
|
|
|
|
|
|
|
try
|
|
|
|
{
|
2020-11-09 17:50:39 +00:00
|
|
|
size_t i = 0;
|
2020-11-04 18:54:55 +00:00
|
|
|
for (const auto & concrete_request : concrete_requests)
|
|
|
|
{
|
2021-05-21 21:19:22 +00:00
|
|
|
auto [ cur_response, undo_action ] = concrete_request->process(storage, zxid, session_id);
|
2020-12-08 13:28:39 +00:00
|
|
|
|
2020-11-09 17:50:39 +00:00
|
|
|
response.responses[i] = cur_response;
|
2020-11-04 18:54:55 +00:00
|
|
|
if (cur_response->error != Coordination::Error::ZOK)
|
|
|
|
{
|
2020-12-08 13:28:39 +00:00
|
|
|
for (size_t j = 0; j <= i; ++j)
|
|
|
|
{
|
|
|
|
auto response_error = response.responses[j]->error;
|
|
|
|
response.responses[j] = std::make_shared<Coordination::ZooKeeperErrorResponse>();
|
|
|
|
response.responses[j]->error = response_error;
|
|
|
|
}
|
|
|
|
|
|
|
|
for (size_t j = i + 1; j < response.responses.size(); ++j)
|
|
|
|
{
|
|
|
|
response.responses[j] = std::make_shared<Coordination::ZooKeeperErrorResponse>();
|
|
|
|
response.responses[j]->error = Coordination::Error::ZRUNTIMEINCONSISTENCY;
|
|
|
|
}
|
|
|
|
|
2020-11-04 18:54:55 +00:00
|
|
|
for (auto it = undo_actions.rbegin(); it != undo_actions.rend(); ++it)
|
|
|
|
if (*it)
|
|
|
|
(*it)();
|
|
|
|
|
|
|
|
return { response_ptr, {} };
|
|
|
|
}
|
|
|
|
else
|
|
|
|
undo_actions.emplace_back(std::move(undo_action));
|
2020-12-08 13:28:39 +00:00
|
|
|
|
2020-11-09 17:50:39 +00:00
|
|
|
++i;
|
2020-11-04 18:54:55 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
response.error = Coordination::Error::ZOK;
|
|
|
|
return { response_ptr, {} };
|
|
|
|
}
|
|
|
|
catch (...)
|
|
|
|
{
|
|
|
|
for (auto it = undo_actions.rbegin(); it != undo_actions.rend(); ++it)
|
|
|
|
if (*it)
|
|
|
|
(*it)();
|
|
|
|
throw;
|
|
|
|
}
|
|
|
|
}
|
2020-11-10 13:43:10 +00:00
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override
|
2020-11-10 13:43:10 +00:00
|
|
|
{
|
2021-03-29 08:24:56 +00:00
|
|
|
KeeperStorage::ResponsesForSessions result;
|
2020-11-10 13:43:10 +00:00
|
|
|
for (const auto & generic_request : concrete_requests)
|
2021-01-19 14:22:28 +00:00
|
|
|
{
|
|
|
|
auto responses = generic_request->processWatches(watches, list_watches);
|
|
|
|
result.insert(result.end(), responses.begin(), responses.end());
|
|
|
|
}
|
|
|
|
return result;
|
2020-11-10 13:43:10 +00:00
|
|
|
}
|
2020-11-04 18:54:55 +00:00
|
|
|
};
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
struct KeeperStorageCloseRequest final : public KeeperStorageRequest
|
2020-11-19 16:06:19 +00:00
|
|
|
{
|
2021-03-29 08:24:56 +00:00
|
|
|
using KeeperStorageRequest::KeeperStorageRequest;
|
2021-05-21 21:19:22 +00:00
|
|
|
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage &, int64_t, int64_t) const override
|
2020-11-19 16:06:19 +00:00
|
|
|
{
|
2020-11-25 13:19:09 +00:00
|
|
|
throw DB::Exception("Called process on close request", ErrorCodes::LOGICAL_ERROR);
|
2020-11-19 16:06:19 +00:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2021-05-15 15:01:00 +00:00
|
|
|
struct KeeperStorageAuthRequest final : public KeeperStorageRequest
|
|
|
|
{
|
|
|
|
using KeeperStorageRequest::KeeperStorageRequest;
|
2021-05-21 21:19:22 +00:00
|
|
|
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t session_id) const override
|
2021-05-15 15:01:00 +00:00
|
|
|
{
|
2021-05-21 21:19:22 +00:00
|
|
|
Coordination::ZooKeeperAuthRequest & auth_request = dynamic_cast<Coordination::ZooKeeperAuthRequest &>(*zk_request);
|
2021-05-15 15:01:00 +00:00
|
|
|
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
|
2021-05-21 21:19:22 +00:00
|
|
|
Coordination::ZooKeeperAuthResponse & auth_response = dynamic_cast<Coordination::ZooKeeperAuthResponse &>(*response_ptr);
|
2021-05-22 16:07:47 +00:00
|
|
|
auto & sessions_and_auth = storage.session_and_auth;
|
2021-05-21 21:19:22 +00:00
|
|
|
|
2021-05-22 16:07:47 +00:00
|
|
|
if (auth_request.scheme == "super")
|
|
|
|
{
|
|
|
|
if (generateDigest(auth_request.data) == storage.superdigest)
|
|
|
|
{
|
|
|
|
KeeperStorage::AuthID auth{"super", ""};
|
|
|
|
sessions_and_auth[session_id].emplace_back(auth);
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
|
|
|
auth_response.error = Coordination::Error::ZAUTHFAILED;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else if (auth_request.scheme == "world" && auth_request.data == "anyone")
|
|
|
|
{
|
|
|
|
KeeperStorage::AuthID auth{"world", "anyone"};
|
|
|
|
sessions_and_auth[session_id].emplace_back(auth);
|
|
|
|
}
|
|
|
|
else if (auth_request.scheme != "digest")
|
2021-05-21 21:19:22 +00:00
|
|
|
{
|
|
|
|
auth_response.error = Coordination::Error::ZAUTHFAILED;
|
|
|
|
}
|
|
|
|
else
|
|
|
|
{
|
2021-05-22 16:07:47 +00:00
|
|
|
KeeperStorage::AuthID auth{auth_request.scheme, generateDigest(auth_request.data)};
|
2021-05-21 21:19:22 +00:00
|
|
|
auto & session_ids = sessions_and_auth[session_id];
|
2021-05-22 16:07:47 +00:00
|
|
|
if (std::find(session_ids.begin(), session_ids.end(), auth) == session_ids.end())
|
|
|
|
sessions_and_auth[session_id].emplace_back(auth);
|
2021-05-21 21:19:22 +00:00
|
|
|
}
|
|
|
|
|
2021-05-15 15:01:00 +00:00
|
|
|
return { response_ptr, {} };
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
void KeeperStorage::finalize()
|
2020-11-03 14:49:30 +00:00
|
|
|
{
|
2021-01-19 14:22:28 +00:00
|
|
|
if (finalized)
|
|
|
|
throw DB::Exception("Testkeeper storage already finalized", ErrorCodes::LOGICAL_ERROR);
|
2020-11-10 13:43:10 +00:00
|
|
|
|
2021-01-19 14:22:28 +00:00
|
|
|
finalized = true;
|
2020-11-03 14:49:30 +00:00
|
|
|
|
2021-02-04 13:22:30 +00:00
|
|
|
for (const auto & [session_id, ephemerals_paths] : ephemerals)
|
|
|
|
for (const String & ephemeral_path : ephemerals_paths)
|
2021-02-04 12:07:41 +00:00
|
|
|
container.erase(ephemeral_path);
|
|
|
|
|
|
|
|
ephemerals.clear();
|
2020-11-18 20:36:25 +00:00
|
|
|
|
2021-01-19 14:22:28 +00:00
|
|
|
watches.clear();
|
|
|
|
list_watches.clear();
|
|
|
|
sessions_and_watchers.clear();
|
2021-02-04 12:07:41 +00:00
|
|
|
session_expiry_queue.clear();
|
2020-11-04 18:54:55 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
class KeeperWrapperFactory final : private boost::noncopyable
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
|
|
|
|
|
|
|
public:
|
2021-03-29 08:24:56 +00:00
|
|
|
using Creator = std::function<KeeperStorageRequestPtr(const Coordination::ZooKeeperRequestPtr &)>;
|
2020-11-11 13:07:06 +00:00
|
|
|
using OpNumToRequest = std::unordered_map<Coordination::OpNum, Creator>;
|
2020-11-04 18:54:55 +00:00
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
static KeeperWrapperFactory & instance()
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
2021-03-29 08:24:56 +00:00
|
|
|
static KeeperWrapperFactory factory;
|
2020-11-04 18:54:55 +00:00
|
|
|
return factory;
|
|
|
|
}
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
KeeperStorageRequestPtr get(const Coordination::ZooKeeperRequestPtr & zk_request) const
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
|
|
|
auto it = op_num_to_request.find(zk_request->getOpNum());
|
|
|
|
if (it == op_num_to_request.end())
|
2020-11-25 13:19:09 +00:00
|
|
|
throw DB::Exception("Unknown operation type " + toString(zk_request->getOpNum()), ErrorCodes::LOGICAL_ERROR);
|
2020-11-04 18:54:55 +00:00
|
|
|
|
|
|
|
return it->second(zk_request);
|
|
|
|
}
|
2020-11-03 14:49:30 +00:00
|
|
|
|
2020-11-11 13:07:06 +00:00
|
|
|
void registerRequest(Coordination::OpNum op_num, Creator creator)
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
|
|
|
if (!op_num_to_request.try_emplace(op_num, creator).second)
|
|
|
|
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Request with op num {} already registered", op_num);
|
|
|
|
}
|
|
|
|
|
|
|
|
private:
|
|
|
|
OpNumToRequest op_num_to_request;
|
2021-03-29 08:24:56 +00:00
|
|
|
KeeperWrapperFactory();
|
2020-11-04 18:54:55 +00:00
|
|
|
};
|
|
|
|
|
2020-11-11 13:07:06 +00:00
|
|
|
template<Coordination::OpNum num, typename RequestT>
|
2021-03-29 08:24:56 +00:00
|
|
|
void registerKeeperRequestWrapper(KeeperWrapperFactory & factory)
|
2020-11-04 18:54:55 +00:00
|
|
|
{
|
|
|
|
factory.registerRequest(num, [] (const Coordination::ZooKeeperRequestPtr & zk_request) { return std::make_shared<RequestT>(zk_request); });
|
|
|
|
}
|
|
|
|
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
/// Registers the wrapper type for every supported operation.
KeeperWrapperFactory::KeeperWrapperFactory()
{
    auto & self = *this;

    /// Session-level operations.
    registerKeeperRequestWrapper<Coordination::OpNum::Heartbeat, KeeperStorageHeartbeatRequest>(self);
    registerKeeperRequestWrapper<Coordination::OpNum::Sync, KeeperStorageSyncRequest>(self);
    registerKeeperRequestWrapper<Coordination::OpNum::Auth, KeeperStorageAuthRequest>(self);
    registerKeeperRequestWrapper<Coordination::OpNum::Close, KeeperStorageCloseRequest>(self);

    /// Node-level operations.
    registerKeeperRequestWrapper<Coordination::OpNum::Create, KeeperStorageCreateRequest>(self);
    registerKeeperRequestWrapper<Coordination::OpNum::Remove, KeeperStorageRemoveRequest>(self);
    registerKeeperRequestWrapper<Coordination::OpNum::Exists, KeeperStorageExistsRequest>(self);
    registerKeeperRequestWrapper<Coordination::OpNum::Get, KeeperStorageGetRequest>(self);
    registerKeeperRequestWrapper<Coordination::OpNum::Set, KeeperStorageSetRequest>(self);

    /// Both list operations share one wrapper type.
    registerKeeperRequestWrapper<Coordination::OpNum::List, KeeperStorageListRequest>(self);
    registerKeeperRequestWrapper<Coordination::OpNum::SimpleList, KeeperStorageListRequest>(self);

    registerKeeperRequestWrapper<Coordination::OpNum::Check, KeeperStorageCheckRequest>(self);
    registerKeeperRequestWrapper<Coordination::OpNum::Multi, KeeperStorageMultiRequest>(self);
}
|
|
|
|
|
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
/// Applies one client request to the storage.
///
/// @param zk_request     the request to apply.
/// @param session_id     the session that issued the request.
/// @param new_last_zxid  if set, becomes the current zxid; it must be strictly greater than the
///                       current one, otherwise LOGICAL_ERROR is thrown.
/// @return the response for the requesting session together with the watch notifications
///         triggered by the request for other sessions.
///
/// Close and Heartbeat are handled separately from other operations:
///  - Close removes everything bound to the session (ephemeral nodes, watches, auth identities,
///    expiry queue entry, timeout) and notifies watchers of the removed ephemeral nodes;
///  - Heartbeat is processed without the authorization check and never touches watches.
/// Any other request is first checked with checkAuth(); on failure the response carries
/// ZAUTHFAILED and the request is not processed.
KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id, std::optional<int64_t> new_last_zxid)
{
    KeeperStorage::ResponsesForSessions results;
    if (new_last_zxid)
    {
        if (zxid >= *new_last_zxid)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Got new ZXID {} smaller or equal than current {}. It's a bug", *new_last_zxid, zxid);
        zxid = *new_last_zxid;
    }

    /// Every request updates the session's entry in the expiry queue using its stored timeout.
    session_expiry_queue.update(session_id, session_and_timeout[session_id]);
    if (zk_request->getOpNum() == Coordination::OpNum::Close)
    {
        auto it = ephemerals.find(session_id);
        if (it != ephemerals.end())
        {
            for (const auto & ephemeral_path : it->second)
            {
                container.erase(ephemeral_path);
                /// Keep the parent's bookkeeping in sync with the removed child.
                container.updateValue(parentPath(ephemeral_path), [&ephemeral_path] (KeeperStorage::Node & parent)
                {
                    --parent.stat.numChildren;
                    ++parent.stat.cversion;
                    parent.children.erase(getBaseName(ephemeral_path));
                });

                auto responses = processWatchesImpl(ephemeral_path, watches, list_watches, Coordination::Event::DELETED);
                results.insert(results.end(), responses.begin(), responses.end());
            }
            ephemerals.erase(it);
        }
        clearDeadWatches(session_id);

        /// Forget the identities of the closed session, otherwise they would be kept forever.
        session_and_auth.erase(session_id);

        /// Finish connection
        auto response = std::make_shared<Coordination::ZooKeeperCloseResponse>();
        response->xid = zk_request->xid;
        response->zxid = getZXID();
        session_expiry_queue.remove(session_id);
        session_and_timeout.erase(session_id);
        results.push_back(ResponseForSession{session_id, response});
    }
    else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat)
    {
        KeeperStorageRequestPtr storage_request = KeeperWrapperFactory::instance().get(zk_request);
        auto [response, _] = storage_request->process(*this, zxid, session_id);
        response->xid = zk_request->xid;
        response->zxid = getZXID();

        results.push_back(ResponseForSession{session_id, response});
    }
    else
    {
        KeeperStorageRequestPtr storage_request = KeeperWrapperFactory::instance().get(zk_request);
        Coordination::ZooKeeperResponsePtr response;

        if (!storage_request->checkAuth(*this, session_id))
        {
            response = zk_request->makeResponse();
            response->error = Coordination::Error::ZAUTHFAILED;
        }
        else
        {
            std::tie(response, std::ignore) = storage_request->process(*this, zxid, session_id);
        }

        if (zk_request->has_watch)
        {
            if (response->error == Coordination::Error::ZOK)
            {
                /// Watches set by list requests are kept apart from all other watches.
                auto & watches_type = zk_request->getOpNum() == Coordination::OpNum::List || zk_request->getOpNum() == Coordination::OpNum::SimpleList
                    ? list_watches
                    : watches;

                watches_type[zk_request->getPath()].emplace_back(session_id);
                sessions_and_watchers[session_id].emplace(zk_request->getPath());
            }
            else if (response->error == Coordination::Error::ZNONODE && zk_request->getOpNum() == Coordination::OpNum::Exists)
            {
                /// An Exists request may watch a node which does not exist yet.
                watches[zk_request->getPath()].emplace_back(session_id);
                sessions_and_watchers[session_id].emplace(zk_request->getPath());
            }
        }

        /// Only successful requests can trigger watches.
        if (response->error == Coordination::Error::ZOK)
        {
            auto watch_responses = storage_request->processWatches(watches, list_watches);
            results.insert(results.end(), watch_responses.begin(), watch_responses.end());
        }

        response->xid = zk_request->xid;
        response->zxid = getZXID();

        results.push_back(ResponseForSession{session_id, response});
    }

    return results;
}
|
|
|
|
|
2021-01-19 14:22:28 +00:00
|
|
|
|
2021-03-29 08:24:56 +00:00
|
|
|
void KeeperStorage::clearDeadWatches(int64_t session_id)
|
2020-11-26 14:57:32 +00:00
|
|
|
{
|
|
|
|
auto watches_it = sessions_and_watchers.find(session_id);
|
|
|
|
if (watches_it != sessions_and_watchers.end())
|
|
|
|
{
|
|
|
|
for (const auto & watch_path : watches_it->second)
|
|
|
|
{
|
|
|
|
auto watch = watches.find(watch_path);
|
|
|
|
if (watch != watches.end())
|
|
|
|
{
|
|
|
|
auto & watches_for_path = watch->second;
|
|
|
|
for (auto w_it = watches_for_path.begin(); w_it != watches_for_path.end();)
|
|
|
|
{
|
2021-01-19 14:22:28 +00:00
|
|
|
if (*w_it == session_id)
|
2020-11-26 14:57:32 +00:00
|
|
|
w_it = watches_for_path.erase(w_it);
|
|
|
|
else
|
|
|
|
++w_it;
|
|
|
|
}
|
|
|
|
if (watches_for_path.empty())
|
|
|
|
watches.erase(watch);
|
|
|
|
}
|
2020-12-14 16:01:29 +00:00
|
|
|
|
|
|
|
auto list_watch = list_watches.find(watch_path);
|
|
|
|
if (list_watch != list_watches.end())
|
|
|
|
{
|
|
|
|
auto & list_watches_for_path = list_watch->second;
|
|
|
|
for (auto w_it = list_watches_for_path.begin(); w_it != list_watches_for_path.end();)
|
|
|
|
{
|
2021-01-19 14:22:28 +00:00
|
|
|
if (*w_it == session_id)
|
2020-12-14 16:01:29 +00:00
|
|
|
w_it = list_watches_for_path.erase(w_it);
|
|
|
|
else
|
|
|
|
++w_it;
|
|
|
|
}
|
|
|
|
if (list_watches_for_path.empty())
|
|
|
|
list_watches.erase(list_watch);
|
|
|
|
}
|
2020-11-26 14:57:32 +00:00
|
|
|
}
|
|
|
|
sessions_and_watchers.erase(watches_it);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-10-30 14:16:47 +00:00
|
|
|
}
|