Fix splitting impl of basic operations

This commit is contained in:
Antonio Andelic 2022-05-09 07:02:11 +00:00
parent cff68aa31f
commit 7f6fa9fe83
5 changed files with 128 additions and 125 deletions

View File

@ -505,28 +505,7 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
}
if (initialized_flag)
{
switch (type)
{
case nuraft::cb_func::PreAppendLog:
{
auto & entry = *static_cast<LogEntryPtr *>(param->ctx);
auto log_store = state_manager->load_log_store();
auto next_idx = log_store->next_slot();
auto maybe_digest = state_machine->preprocess(next_idx, entry->get_buf());
if (maybe_digest)
{
auto & buff = entry->get_buf();
DB::WriteBuffer buf(reinterpret_cast<BufferBase::Position>(buff.data_begin() + buff.size() - sizeof(int64_t)), buff.size());
DB::writeIntBinary(*maybe_digest, buf);
}
return nuraft::cb_func::ReturnCode::Ok;
}
default:
return nuraft::cb_func::ReturnCode::Ok;
}
}
return nuraft::cb_func::ReturnCode::Ok;
size_t last_commited = state_machine->last_commit_index();
size_t next_index = state_manager->getLogStore()->next_slot();

View File

@ -44,10 +44,6 @@ namespace
else /// backward compatibility
request_for_session.time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
int64_t digest;
readIntBinary(digest, buffer);
LOG_INFO(&Poco::Logger::get("STORAGEEE"), "Read digest {}", digest);
return request_for_session;
}
}
@ -123,14 +119,13 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::pre_commit(uint64_t log_idx, nur
return nullptr;
}
std::optional<int64_t> KeeperStateMachine::preprocess(const uint64_t log_idx, nuraft::buffer & data)
void KeeperStateMachine::preprocess(const uint64_t log_idx, nuraft::buffer & data)
{
LOG_INFO(&Poco::Logger::get("Storageeee"), "Preprocess called");
auto request_for_session = parseRequest(data);
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
return std::nullopt;
return;
std::lock_guard lock(storage_and_responses_lock);
return storage->preprocessRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, log_idx);
storage->preprocessRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, log_idx);
}
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data)

View File

@ -27,7 +27,7 @@ public:
/// Read state from the latest snapshot
void init();
std::optional<int64_t> preprocess(uint64_t log_idx, nuraft::buffer & data);
void preprocess(uint64_t log_idx, nuraft::buffer & data);
/// Currently not supported
nuraft::ptr<nuraft::buffer> pre_commit(uint64_t log_idx, nuraft::buffer & data) override;

View File

@ -285,6 +285,7 @@ Coordination::Error KeeperStorage::commit(int64_t commit_zxid, int64_t session_i
if (delta.zxid > commit_zxid)
break;
bool finish_subdelta = false;
auto result = std::visit(
Overloaded{
[&, &path = delta.path](KeeperStorage::CreateNodeDelta & create_delta)
@ -338,11 +339,24 @@ Coordination::Error KeeperStorage::commit(int64_t commit_zxid, int64_t session_i
return Coordination::Error::ZOK;
},
[&](KeeperStorage::ErrorDelta & error_delta) { return error_delta.error; }},
[&](KeeperStorage::ErrorDelta & error_delta) { return error_delta.error; },
[&](KeeperStorage::SubDeltaEnd &)
{
finish_subdelta = true;
return Coordination::Error::ZOK;
},
[&](KeeperStorage::FailedMultiDelta &) -> Coordination::Error
{
// this shouldn't be called in any process functions
fail();
}},
delta.operation);
if (result != Coordination::Error::ZOK)
return result;
if (finish_subdelta)
return Coordination::Error::ZOK;
}
return Coordination::Error::ZOK;
@ -421,8 +435,6 @@ bool KeeperStorage::removeNode(const std::string & path, int32_t version)
}
using Undo = std::function<void()>;
struct KeeperStorageRequestProcessor
{
Coordination::ZooKeeperRequestPtr zk_request;
@ -463,7 +475,7 @@ struct KeeperStorageSyncRequestProcessor final : public KeeperStorageRequestProc
auto response = zk_request->makeResponse();
dynamic_cast<Coordination::ZooKeeperSyncResponse &>(*response).path
= dynamic_cast<Coordination::ZooKeeperSyncRequest &>(*zk_request).path;
return {response, {}};
return response;
}
};
@ -508,10 +520,10 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
auto parent_path = parentPath(request.path);
auto parent_node = storage.current_nodes.getNode(std::string{parent_path});
if (parent_node == nullptr)
return {{zxid, KeeperStorage::ErrorDelta{Coordination::Error::ZNONODE}}};
return {{zxid, Coordination::Error::ZNONODE}};
else if (parent_node->stat.ephemeralOwner != 0)
return {{zxid, KeeperStorage::ErrorDelta{Coordination::Error::ZNOCHILDRENFOREPHEMERALS}}};
return {{zxid, Coordination::Error::ZNOCHILDRENFOREPHEMERALS}};
std::string path_created = request.path;
if (request.is_sequential)
@ -526,14 +538,14 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
}
if (storage.current_nodes.hasNode(path_created))
return {{zxid, KeeperStorage::ErrorDelta{Coordination::Error::ZNODEEXISTS}}};
return {{zxid, Coordination::Error::ZNODEEXISTS}};
if (getBaseName(path_created).size == 0)
return {{zxid, KeeperStorage::ErrorDelta{Coordination::Error::ZBADARGUMENTS}}};
return {{zxid, Coordination::Error::ZBADARGUMENTS}};
Coordination::ACLs node_acls;
if (!fixupACL(request.acls, storage.session_and_auth[session_id], node_acls))
return {{zxid, KeeperStorage::ErrorDelta{Coordination::Error::ZINVALIDACL}}};
return {{zxid, Coordination::Error::ZINVALIDACL}};
Coordination::Stat stat;
stat.czxid = zxid;
@ -573,14 +585,13 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /*time*/) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Undo undo;
Coordination::ZooKeeperCreateResponse & response = dynamic_cast<Coordination::ZooKeeperCreateResponse &>(*response_ptr);
auto result = storage.commit(zxid, session_id);
if (result != Coordination::Error::ZOK)
{
response.error = result;
return {response_ptr, {}};
return response_ptr;
}
const auto & deltas = storage.current_nodes.deltas;
@ -624,7 +635,7 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
Coordination::ZooKeeperGetRequest & request = dynamic_cast<Coordination::ZooKeeperGetRequest &>(*zk_request);
if (!storage.current_nodes.hasNode(request.path))
return {{zxid, KeeperStorage::ErrorDelta{Coordination::Error::ZNONODE}}};
return {{zxid, Coordination::Error::ZNONODE}};
return {};
}
@ -700,12 +711,12 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
{
if (request.restored_from_zookeeper_log)
update_parent_pzxid();
return {{zxid, KeeperStorage::ErrorDelta{Coordination::Error::ZNONODE}}};
return {{zxid, Coordination::Error::ZNONODE}};
}
else if (request.version != -1 && request.version != node->stat.version)
return {{zxid, KeeperStorage::ErrorDelta{Coordination::Error::ZBADVERSION}}};
return {{zxid, Coordination::Error::ZBADVERSION}};
else if (node->stat.numChildren)
return {{zxid, KeeperStorage::ErrorDelta{Coordination::Error::ZNOTEMPTY}}};
return {{zxid, Coordination::Error::ZNOTEMPTY}};
if (request.restored_from_zookeeper_log)
update_parent_pzxid();
@ -727,14 +738,13 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t /* time */) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Undo undo;
Coordination::ZooKeeperRemoveResponse & response = dynamic_cast<Coordination::ZooKeeperRemoveResponse &>(*response_ptr);
auto result = storage.commit(zxid, session_id);
if (result != Coordination::Error::ZOK)
{
response.error = result;
return {response_ptr, {}};
return response_ptr;
}
return response_ptr;
@ -757,7 +767,7 @@ struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestPr
Coordination::ZooKeeperExistsRequest & request = dynamic_cast<Coordination::ZooKeeperExistsRequest &>(*zk_request);
if (!storage.current_nodes.hasNode(request.path))
return {{zxid, KeeperStorage::ErrorDelta{Coordination::Error::ZNONODE}}};
return {{zxid, Coordination::Error::ZNONODE}};
return {};
}
@ -811,12 +821,12 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
std::vector<KeeperStorage::Delta> new_deltas;
if (!storage.current_nodes.hasNode(request.path))
return {{zxid, KeeperStorage::ErrorDelta{Coordination::Error::ZNONODE}}};
return {{zxid, Coordination::Error::ZNONODE}};
auto node = storage.current_nodes.getNode(request.path);
if (request.version != -1 && request.version != node->stat.version)
return {{zxid, KeeperStorage::ErrorDelta{Coordination::Error::ZBADVERSION}}};
return {{zxid, Coordination::Error::ZBADVERSION}};
new_deltas.emplace_back(
request.path,
@ -846,13 +856,12 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperSetResponse & response = dynamic_cast<Coordination::ZooKeeperSetResponse &>(*response_ptr);
Coordination::ZooKeeperSetRequest & request = dynamic_cast<Coordination::ZooKeeperSetRequest &>(*zk_request);
Undo undo;
auto result = storage.commit(zxid, session_id);
if (result != Coordination::Error::ZOK)
{
response.error = result;
return {response_ptr, {}};
return response_ptr;
}
auto it = container.find(request.path);
@ -896,7 +905,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
Coordination::ZooKeeperListRequest & request = dynamic_cast<Coordination::ZooKeeperListRequest &>(*zk_request);
if (!storage.current_nodes.hasNode(request.path))
return {{zxid, KeeperStorage::ErrorDelta{Coordination::Error::ZNONODE}}};
return {{zxid, Coordination::Error::ZNONODE}};
return {};
}
@ -958,11 +967,11 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
Coordination::ZooKeeperCheckRequest & request = dynamic_cast<Coordination::ZooKeeperCheckRequest &>(*zk_request);
if (!storage.current_nodes.hasNode(request.path))
return {{zxid, KeeperStorage::ErrorDelta{Coordination::Error::ZNONODE}}};
return {{zxid, Coordination::Error::ZNONODE}};
auto node = storage.current_nodes.getNode(request.path);
if (request.version != -1 && request.version != node->stat.version)
return {{zxid, KeeperStorage::ErrorDelta{Coordination::Error::ZBADVERSION}}};
return {{zxid, Coordination::Error::ZBADVERSION}};
return {};
}
@ -1019,19 +1028,19 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr
auto & current_nodes = storage.current_nodes;
if (!current_nodes.hasNode(request.path))
return {{zxid, KeeperStorage::ErrorDelta{Coordination::Error::ZNONODE}}};
return {{zxid, Coordination::Error::ZNONODE}};
auto node = current_nodes.getNode(request.path);
if (request.version != -1 && request.version != node->stat.aversion)
return {{zxid, KeeperStorage::ErrorDelta{Coordination::Error::ZBADVERSION}}};
return {{zxid, Coordination::Error::ZBADVERSION}};
auto & session_auth_ids = storage.session_and_auth[session_id];
Coordination::ACLs node_acls;
if (!fixupACL(request.acls, session_auth_ids, node_acls))
return {{zxid, KeeperStorage::ErrorDelta{Coordination::Error::ZINVALIDACL}}};
return {{zxid, Coordination::Error::ZINVALIDACL}};
return {
{request.path, zxid, KeeperStorage::SetACLDelta{std::move(node_acls), request.version}},
@ -1048,7 +1057,7 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr
if (result != Coordination::Error::ZOK)
{
response.error = result;
return {response_ptr, {}};
return response_ptr;
}
auto it = storage.container.find(request.path);
@ -1086,7 +1095,7 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr
Coordination::ZooKeeperGetACLRequest & request = dynamic_cast<Coordination::ZooKeeperGetACLRequest &>(*zk_request);
if (!storage.current_nodes.hasNode(request.path))
return {{zxid, KeeperStorage::ErrorDelta{Coordination::Error::ZNONODE}}};
return {{zxid, Coordination::Error::ZNONODE}};
return {};
}
@ -1154,56 +1163,77 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
}
}
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/) const override
std::vector<KeeperStorage::Delta> preprocess(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
for (const auto & concrete_request : concrete_requests)
auto & saved_deltas = storage.current_nodes.deltas;
std::vector<Coordination::Error> response_errors;
response_errors.reserve(concrete_requests.size());
for (size_t i = 0; i < concrete_requests.size(); ++i)
{
auto new_deltas = concrete_requests[i]->preprocess(storage, zxid, session_id, time);
if (auto * error = std::get_if<KeeperStorage::ErrorDelta>(&new_deltas.back().operation))
{
std::erase_if(saved_deltas, [zxid](const auto & delta) { return delta.zxid == zxid; });
response_errors.push_back(error->error);
for (size_t j = i + 1; j < concrete_requests.size(); ++j)
{
response_errors.push_back(Coordination::Error::ZRUNTIMEINCONSISTENCY);
}
return {{zxid, KeeperStorage::FailedMultiDelta{std::move(response_errors)}}};
}
new_deltas.emplace_back(zxid, KeeperStorage::SubDeltaEnd{});
response_errors.push_back(Coordination::Error::ZOK);
saved_deltas.insert(saved_deltas.end(), std::make_move_iterator(new_deltas.begin()), std::make_move_iterator(new_deltas.end()));
}
return {};
}
Coordination::ZooKeeperResponsePtr process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperMultiResponse & response = dynamic_cast<Coordination::ZooKeeperMultiResponse &>(*response_ptr);
Coordination::ZooKeeperMultiRequest & request = dynamic_cast<Coordination::ZooKeeperMultiRequest &>(*zk_request);
try
auto & deltas = storage.current_nodes.deltas;
if (auto * failed_multi = std::get_if<KeeperStorage::FailedMultiDelta>(&deltas.front().operation))
{
size_t i = 0;
for (const auto & concrete_request : concrete_requests)
for (size_t i = 0; i < concrete_requests.size(); ++i)
{
auto [cur_response, undo_action] = concrete_request->process(storage, zxid, session_id, time);
response.responses[i] = cur_response;
if (cur_response->error != Coordination::Error::ZOK)
{
for (size_t j = 0; j <= i; ++j)
{
auto response_error = response.responses[j]->error;
response.responses[j] = std::make_shared<Coordination::ZooKeeperErrorResponse>();
response.responses[j]->error = response_error;
}
for (size_t j = i + 1; j < response.responses.size(); ++j)
{
response.responses[j] = std::make_shared<Coordination::ZooKeeperErrorResponse>();
response.responses[j]->error = Coordination::Error::ZRUNTIMEINCONSISTENCY;
}
for (auto it = undo_actions.rbegin(); it != undo_actions.rend(); ++it)
if (*it)
(*it)();
return {response_ptr, {}};
}
else
undo_actions.emplace_back(std::move(undo_action));
++i;
response.responses[i] = std::make_shared<Coordination::ZooKeeperErrorResponse>();
response.responses[i]->error = failed_multi->error_codes[i];
}
response.error = Coordination::Error::ZOK;
return response_ptr;
}
size_t i = 0;
for (const auto & concrete_request : concrete_requests)
{
auto cur_response = concrete_request->process(storage, zxid, session_id, time);
while (!deltas.empty())
{
if (std::holds_alternative<KeeperStorage::SubDeltaEnd>(deltas.front().operation))
{
deltas.pop_front();
break;
}
deltas.pop_front();
}
response.responses[i] = cur_response;
++i;
}
response.error = Coordination::Error::ZOK;
return response_ptr;
}
KeeperStorage::ResponsesForSessions
@ -1260,7 +1290,7 @@ struct KeeperStorageAuthRequestProcessor final : public KeeperStorageRequestProc
}
}
return {response_ptr, {}};
return response_ptr;
}
};
@ -1344,25 +1374,14 @@ KeeperStorageRequestProcessorsFactory::KeeperStorageRequestProcessorsFactory()
}
std::optional<int64_t>
KeeperStorage::preprocessRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, int64_t time, int64_t new_last_zxid)
void KeeperStorage::preprocessRequest(
const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, int64_t time, int64_t new_last_zxid)
{
if (new_last_zxid <= current_nodes.current_zxid)
return std::nullopt;
current_nodes.current_zxid = new_last_zxid;
KeeperStorageRequestProcessorPtr request_processor = KeeperStorageRequestProcessorsFactory::instance().get(request);
auto new_deltas = request_processor->preprocess(*this, new_last_zxid, session_id, time);
current_nodes.deltas.insert(
current_nodes.deltas.end(), std::make_move_iterator(new_deltas.begin()), std::make_move_iterator(new_deltas.end()));
switch (request->getOpNum())
{
using enum Coordination::OpNum;
case Create:
return new_last_zxid;
default:
return std::nullopt;
}
}
KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
@ -1424,7 +1443,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat) /// Heartbeat request is also special
{
KeeperStorageRequestProcessorPtr storage_request = KeeperStorageRequestProcessorsFactory::instance().get(zk_request);
auto [response, _] = storage_request->process(*this, zxid, session_id, time);
auto response = storage_request->process(*this, zxid, session_id, time);
response->xid = zk_request->xid;
response->zxid = getZXID();
@ -1443,15 +1462,8 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
}
else
{
std::tie(response, std::ignore) = request_processor->process(*this, zxid, session_id, time);
auto & deltas = current_nodes.deltas;
while (!deltas.empty())
{
if (deltas.front().zxid > zxid)
break;
deltas.pop_front();
}
response = request_processor->process(*this, zxid, session_id, time);
std::erase_if(current_nodes.deltas, [this](const auto & delta) { return delta.zxid == zxid; });
}
/// Watches for this requests are added to the watches lists

View File

@ -143,7 +143,16 @@ public:
Coordination::Error error;
};
using Operation = std::variant<CreateNodeDelta, RemoveNodeDelta, UpdateNodeDelta, SetACLDelta, ErrorDelta>;
struct FailedMultiDelta
{
std::vector<Coordination::Error> error_codes;
};
struct SubDeltaEnd
{
};
using Operation = std::variant<CreateNodeDelta, RemoveNodeDelta, UpdateNodeDelta, SetACLDelta, ErrorDelta, SubDeltaEnd, FailedMultiDelta>;
struct Delta
{
@ -153,8 +162,16 @@ public:
, operation(std::move(operation_))
{}
Delta(int64_t zxid_, ErrorDelta error)
: Delta("", zxid_, error)
Delta(int64_t zxid_, Coordination::Error error)
: Delta("", zxid_, ErrorDelta{error})
{}
Delta(int64_t zxid_, SubDeltaEnd subdelta)
: Delta("", zxid_, subdelta)
{}
Delta(int64_t zxid_, FailedMultiDelta failed_multi)
: Delta("", zxid_, failed_multi)
{}
String path;
@ -244,7 +261,7 @@ public:
/// Process user request and return response.
/// check_acl = false only when converting data from ZooKeeper.
ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, int64_t time, std::optional<int64_t> new_last_zxid, bool check_acl = true);
std::optional<int64_t> preprocessRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, int64_t time, int64_t new_last_zxid);
void preprocessRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, int64_t time, int64_t new_last_zxid);
void finalize();