Some preparation to support RocksDB as an alternative Keeper storage

This commit is contained in:
Han Fei 2023-11-11 19:37:30 +01:00
parent a5e17dc919
commit 3bccce2eae
15 changed files with 900 additions and 669 deletions

View File

@ -44,16 +44,17 @@ int mainEntryClickHouseKeeperConverter(int argc, char ** argv)
keeper_context->setDigestEnabled(true); keeper_context->setDigestEnabled(true);
keeper_context->setSnapshotDisk(std::make_shared<DiskLocal>("Keeper-snapshots", options["output-dir"].as<std::string>())); keeper_context->setSnapshotDisk(std::make_shared<DiskLocal>("Keeper-snapshots", options["output-dir"].as<std::string>()));
DB::KeeperStorage storage(/* tick_time_ms */ 500, /* superdigest */ "", keeper_context, /* initialize_system_nodes */ false); /// TODO(hanfei): support rocksdb here
DB::KeeperMemoryStorage storage(/* tick_time_ms */ 500, /* superdigest */ "", keeper_context, /* initialize_system_nodes */ false);
DB::deserializeKeeperStorageFromSnapshotsDir(storage, options["zookeeper-snapshots-dir"].as<std::string>(), logger); DB::deserializeKeeperStorageFromSnapshotsDir(storage, options["zookeeper-snapshots-dir"].as<std::string>(), logger);
storage.initializeSystemNodes(); storage.initializeSystemNodes();
DB::deserializeLogsAndApplyToStorage(storage, options["zookeeper-logs-dir"].as<std::string>(), logger); DB::deserializeLogsAndApplyToStorage(storage, options["zookeeper-logs-dir"].as<std::string>(), logger);
DB::SnapshotMetadataPtr snapshot_meta = std::make_shared<DB::SnapshotMetadata>(storage.getZXID(), 1, std::make_shared<nuraft::cluster_config>()); DB::SnapshotMetadataPtr snapshot_meta = std::make_shared<DB::SnapshotMetadata>(storage.getZXID(), 1, std::make_shared<nuraft::cluster_config>());
DB::KeeperStorageSnapshot snapshot(&storage, snapshot_meta); DB::KeeperStorageSnapshot<DB::KeeperMemoryStorage> snapshot(&storage, snapshot_meta);
DB::KeeperSnapshotManager manager(1, keeper_context); DB::KeeperSnapshotManager<DB::KeeperMemoryStorage> manager(1, keeper_context);
auto snp = manager.serializeSnapshotToBuffer(snapshot); auto snp = manager.serializeSnapshotToBuffer(snapshot);
auto file_info = manager.serializeSnapshotBufferToDisk(*snp, storage.getZXID()); auto file_info = manager.serializeSnapshotBufferToDisk(*snp, storage.getZXID());
std::cout << "Snapshot serialized to path:" << fs::path(file_info.disk->getPath()) / file_info.path << std::endl; std::cout << "Snapshot serialized to path:" << fs::path(file_info.disk->getPath()) / file_info.path << std::endl;

View File

@ -66,13 +66,13 @@ void KeeperDispatcher::requestThread()
RaftAppendResult prev_result = nullptr; RaftAppendResult prev_result = nullptr;
/// Requests from previous iteration. We store them to be able /// Requests from previous iteration. We store them to be able
/// to send errors to the client. /// to send errors to the client.
KeeperStorage::RequestsForSessions prev_batch; KeeperStorageBase::RequestsForSessions prev_batch;
auto & shutdown_called = keeper_context->shutdown_called; auto & shutdown_called = keeper_context->shutdown_called;
while (!shutdown_called) while (!shutdown_called)
{ {
KeeperStorage::RequestForSession request; KeeperStorageBase::RequestForSession request;
auto coordination_settings = configuration_and_settings->coordination_settings; auto coordination_settings = configuration_and_settings->coordination_settings;
uint64_t max_wait = coordination_settings->operation_timeout_ms.totalMilliseconds(); uint64_t max_wait = coordination_settings->operation_timeout_ms.totalMilliseconds();
@ -93,7 +93,7 @@ void KeeperDispatcher::requestThread()
if (shutdown_called) if (shutdown_called)
break; break;
KeeperStorage::RequestsForSessions current_batch; KeeperStorageBase::RequestsForSessions current_batch;
size_t current_batch_bytes_size = 0; size_t current_batch_bytes_size = 0;
bool has_read_request = false; bool has_read_request = false;
@ -244,7 +244,7 @@ void KeeperDispatcher::responseThread()
auto & shutdown_called = keeper_context->shutdown_called; auto & shutdown_called = keeper_context->shutdown_called;
while (!shutdown_called) while (!shutdown_called)
{ {
KeeperStorage::ResponseForSession response_for_session; KeeperStorageBase::ResponseForSession response_for_session;
uint64_t max_wait = configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds(); uint64_t max_wait = configuration_and_settings->coordination_settings->operation_timeout_ms.totalMilliseconds();
@ -340,7 +340,7 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
return false; return false;
} }
KeeperStorage::RequestForSession request_info; KeeperStorageBase::RequestForSession request_info;
request_info.request = request; request_info.request = request;
using namespace std::chrono; using namespace std::chrono;
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
@ -386,7 +386,7 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
snapshots_queue, snapshots_queue,
keeper_context, keeper_context,
snapshot_s3, snapshot_s3,
[this](uint64_t log_idx, const KeeperStorage::RequestForSession & request_for_session) [this](uint64_t log_idx, const KeeperStorageBase::RequestForSession & request_for_session)
{ {
{ {
/// check if we have queue of read requests depending on this request to be committed /// check if we have queue of read requests depending on this request to be committed
@ -486,7 +486,7 @@ void KeeperDispatcher::shutdown()
update_configuration_thread.join(); update_configuration_thread.join();
} }
KeeperStorage::RequestForSession request_for_session; KeeperStorageBase::RequestForSession request_for_session;
/// Set session expired for all pending requests /// Set session expired for all pending requests
while (requests_queue && requests_queue->tryPop(request_for_session)) while (requests_queue && requests_queue->tryPop(request_for_session))
@ -497,7 +497,7 @@ void KeeperDispatcher::shutdown()
setResponse(request_for_session.session_id, response); setResponse(request_for_session.session_id, response);
} }
KeeperStorage::RequestsForSessions close_requests; KeeperStorageBase::RequestsForSessions close_requests;
{ {
/// Clear all registered sessions /// Clear all registered sessions
std::lock_guard lock(session_to_response_callback_mutex); std::lock_guard lock(session_to_response_callback_mutex);
@ -511,7 +511,7 @@ void KeeperDispatcher::shutdown()
auto request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close); auto request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
request->xid = Coordination::CLOSE_XID; request->xid = Coordination::CLOSE_XID;
using namespace std::chrono; using namespace std::chrono;
KeeperStorage::RequestForSession request_info KeeperStorageBase::RequestForSession request_info
{ {
.session_id = session, .session_id = session,
.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(), .time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
@ -609,7 +609,7 @@ void KeeperDispatcher::sessionCleanerTask()
auto request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close); auto request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
request->xid = Coordination::CLOSE_XID; request->xid = Coordination::CLOSE_XID;
using namespace std::chrono; using namespace std::chrono;
KeeperStorage::RequestForSession request_info KeeperStorageBase::RequestForSession request_info
{ {
.session_id = dead_session, .session_id = dead_session,
.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(), .time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(),
@ -657,16 +657,16 @@ void KeeperDispatcher::finishSession(int64_t session_id)
} }
} }
void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSessions & requests_for_sessions, Coordination::Error error) void KeeperDispatcher::addErrorResponses(const KeeperStorageBase::RequestsForSessions & requests_for_sessions, Coordination::Error error)
{ {
for (const auto & request_for_session : requests_for_sessions) for (const auto & request_for_session : requests_for_sessions)
{ {
KeeperStorage::ResponsesForSessions responses; KeeperStorageBase::ResponsesForSessions responses;
auto response = request_for_session.request->makeResponse(); auto response = request_for_session.request->makeResponse();
response->xid = request_for_session.request->xid; response->xid = request_for_session.request->xid;
response->zxid = 0; response->zxid = 0;
response->error = error; response->error = error;
if (!responses_queue.push(DB::KeeperStorage::ResponseForSession{request_for_session.session_id, response})) if (!responses_queue.push(DB::KeeperStorageBase::ResponseForSession{request_for_session.session_id, response}))
throw Exception(ErrorCodes::SYSTEM_ERROR, throw Exception(ErrorCodes::SYSTEM_ERROR,
"Could not push error response xid {} zxid {} error message {} to responses queue", "Could not push error response xid {} zxid {} error message {} to responses queue",
response->xid, response->xid,
@ -675,7 +675,7 @@ void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSession
} }
} }
nuraft::ptr<nuraft::buffer> KeeperDispatcher::forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions) nuraft::ptr<nuraft::buffer> KeeperDispatcher::forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorageBase::RequestsForSessions & requests_for_sessions)
{ {
if (!result->has_result()) if (!result->has_result())
result->get(); result->get();
@ -697,7 +697,7 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
{ {
/// New session id allocation is a special request, because we cannot process it in normal /// New session id allocation is a special request, because we cannot process it in normal
/// way: get request -> put to raft -> set response for registered callback. /// way: get request -> put to raft -> set response for registered callback.
KeeperStorage::RequestForSession request_info; KeeperStorageBase::RequestForSession request_info;
std::shared_ptr<Coordination::ZooKeeperSessionIDRequest> request = std::make_shared<Coordination::ZooKeeperSessionIDRequest>(); std::shared_ptr<Coordination::ZooKeeperSessionIDRequest> request = std::make_shared<Coordination::ZooKeeperSessionIDRequest>();
/// Internal session id. It's a temporary number which is unique for each client on this server /// Internal session id. It's a temporary number which is unique for each client on this server
/// but can be the same on different servers. /// but can be the same on different servers.

View File

@ -27,7 +27,7 @@ using ZooKeeperResponseCallback = std::function<void(const Coordination::ZooKeep
class KeeperDispatcher class KeeperDispatcher
{ {
private: private:
using RequestsQueue = ConcurrentBoundedQueue<KeeperStorage::RequestForSession>; using RequestsQueue = ConcurrentBoundedQueue<KeeperStorageBase::RequestForSession>;
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>; using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
using ClusterUpdateQueue = ConcurrentBoundedQueue<ClusterUpdateAction>; using ClusterUpdateQueue = ConcurrentBoundedQueue<ClusterUpdateAction>;
@ -96,11 +96,11 @@ private:
/// Add error responses for requests to responses queue. /// Add error responses for requests to responses queue.
/// Clears requests. /// Clears requests.
void addErrorResponses(const KeeperStorage::RequestsForSessions & requests_for_sessions, Coordination::Error error); void addErrorResponses(const KeeperStorageBase::RequestsForSessions & requests_for_sessions, Coordination::Error error);
/// Forcefully wait for the result and set errors if something went wrong. /// Forcefully wait for the result and set errors if something went wrong.
/// Clears both arguments /// Clears both arguments
nuraft::ptr<nuraft::buffer> forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions); nuraft::ptr<nuraft::buffer> forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorageBase::RequestsForSessions & requests_for_sessions);
public: public:
std::mutex read_request_queue_mutex; std::mutex read_request_queue_mutex;
@ -108,7 +108,7 @@ public:
std::atomic<uint64_t> our_last_committed_log_idx = 0; std::atomic<uint64_t> our_last_committed_log_idx = 0;
/// queue of read requests that can be processed after a request with specific session ID and XID is committed /// queue of read requests that can be processed after a request with specific session ID and XID is committed
std::unordered_map<int64_t, std::unordered_map<Coordination::XID, KeeperStorage::RequestsForSessions>> read_request_queue; std::unordered_map<int64_t, std::unordered_map<Coordination::XID, KeeperStorageBase::RequestsForSessions>> read_request_queue;
/// Just allocate some objects, real initialization is done by the `initialize` method /// Just allocate some objects, real initialization is done by the `initialize` method
KeeperDispatcher(); KeeperDispatcher();
@ -189,7 +189,7 @@ public:
Keeper4LWInfo getKeeper4LWInfo() const; Keeper4LWInfo getKeeper4LWInfo() const;
const KeeperStateMachine & getStateMachine() const const IKeeperStateMachine & getStateMachine() const
{ {
return *server->getKeeperStateMachine(); return *server->getKeeperStateMachine();
} }

View File

@ -116,7 +116,7 @@ KeeperServer::KeeperServer(
SnapshotsQueue & snapshots_queue_, SnapshotsQueue & snapshots_queue_,
KeeperContextPtr keeper_context_, KeeperContextPtr keeper_context_,
KeeperSnapshotManagerS3 & snapshot_manager_s3, KeeperSnapshotManagerS3 & snapshot_manager_s3,
KeeperStateMachine::CommitCallback commit_callback) IKeeperStateMachine::CommitCallback commit_callback)
: server_id(configuration_and_settings_->server_id) : server_id(configuration_and_settings_->server_id)
, coordination_settings(configuration_and_settings_->coordination_settings) , coordination_settings(configuration_and_settings_->coordination_settings)
, log(&Poco::Logger::get("KeeperServer")) , log(&Poco::Logger::get("KeeperServer"))
@ -128,7 +128,7 @@ KeeperServer::KeeperServer(
if (coordination_settings->quorum_reads) if (coordination_settings->quorum_reads)
LOG_WARNING(log, "Quorum reads enabled, Keeper will work slower."); LOG_WARNING(log, "Quorum reads enabled, Keeper will work slower.");
state_machine = nuraft::cs_new<KeeperStateMachine>( state_machine = nuraft::cs_new<KeeperStateMachine<KeeperMemoryStorage>>(
responses_queue_, responses_queue_,
snapshots_queue_, snapshots_queue_,
coordination_settings, coordination_settings,
@ -478,7 +478,7 @@ namespace
{ {
// Serialize the request for the log entry // Serialize the request for the log entry
nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(const KeeperStorage::RequestForSession & request_for_session) nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(const KeeperStorageBase::RequestForSession & request_for_session)
{ {
DB::WriteBufferFromNuraftBuffer write_buf; DB::WriteBufferFromNuraftBuffer write_buf;
DB::writeIntBinary(request_for_session.session_id, write_buf); DB::writeIntBinary(request_for_session.session_id, write_buf);
@ -486,7 +486,7 @@ nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(const KeeperStorage::RequestFor
DB::writeIntBinary(request_for_session.time, write_buf); DB::writeIntBinary(request_for_session.time, write_buf);
/// we fill with dummy values to eliminate unnecessary copy later on when we will write correct values /// we fill with dummy values to eliminate unnecessary copy later on when we will write correct values
DB::writeIntBinary(static_cast<int64_t>(0), write_buf); /// zxid DB::writeIntBinary(static_cast<int64_t>(0), write_buf); /// zxid
DB::writeIntBinary(KeeperStorage::DigestVersion::NO_DIGEST, write_buf); /// digest version or NO_DIGEST flag DB::writeIntBinary(KeeperStorageBase::DigestVersion::NO_DIGEST, write_buf); /// digest version or NO_DIGEST flag
DB::writeIntBinary(static_cast<uint64_t>(0), write_buf); /// digest value DB::writeIntBinary(static_cast<uint64_t>(0), write_buf); /// digest value
/// if new fields are added, update KeeperStateMachine::ZooKeeperLogSerializationVersion along with parseRequest function and PreAppendLog callback handler /// if new fields are added, update KeeperStateMachine::ZooKeeperLogSerializationVersion along with parseRequest function and PreAppendLog callback handler
return write_buf.getBuffer(); return write_buf.getBuffer();
@ -494,7 +494,7 @@ nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(const KeeperStorage::RequestFor
} }
void KeeperServer::putLocalReadRequest(const KeeperStorage::RequestForSession & request_for_session) void KeeperServer::putLocalReadRequest(const KeeperStorageBase::RequestForSession & request_for_session)
{ {
if (!request_for_session.request->isReadRequest()) if (!request_for_session.request->isReadRequest())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot process non-read request locally"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot process non-read request locally");
@ -502,7 +502,7 @@ void KeeperServer::putLocalReadRequest(const KeeperStorage::RequestForSession &
state_machine->processReadRequest(request_for_session); state_machine->processReadRequest(request_for_session);
} }
RaftAppendResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForSessions & requests_for_sessions) RaftAppendResult KeeperServer::putRequestBatch(const KeeperStorageBase::RequestsForSessions & requests_for_sessions)
{ {
std::vector<nuraft::ptr<nuraft::buffer>> entries; std::vector<nuraft::ptr<nuraft::buffer>> entries;
for (const auto & request_for_session : requests_for_sessions) for (const auto & request_for_session : requests_for_sessions)
@ -725,7 +725,7 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
auto entry_buf = entry->get_buf_ptr(); auto entry_buf = entry->get_buf_ptr();
KeeperStateMachine::ZooKeeperLogSerializationVersion serialization_version; IKeeperStateMachine::ZooKeeperLogSerializationVersion serialization_version;
auto request_for_session = state_machine->parseRequest(*entry_buf, /*final=*/false, &serialization_version); auto request_for_session = state_machine->parseRequest(*entry_buf, /*final=*/false, &serialization_version);
request_for_session->zxid = next_zxid; request_for_session->zxid = next_zxid;
if (!state_machine->preprocess(*request_for_session)) if (!state_machine->preprocess(*request_for_session))
@ -735,10 +735,10 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
/// older versions of Keeper can send logs that are missing some fields /// older versions of Keeper can send logs that are missing some fields
size_t bytes_missing = 0; size_t bytes_missing = 0;
if (serialization_version < KeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_TIME) if (serialization_version < IKeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_TIME)
bytes_missing += sizeof(request_for_session->time); bytes_missing += sizeof(request_for_session->time);
if (serialization_version < KeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_ZXID_DIGEST) if (serialization_version < IKeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_ZXID_DIGEST)
bytes_missing += sizeof(request_for_session->zxid) + sizeof(request_for_session->digest->version) + sizeof(request_for_session->digest->value); bytes_missing += sizeof(request_for_session->zxid) + sizeof(request_for_session->digest->version) + sizeof(request_for_session->digest->value);
if (bytes_missing != 0) if (bytes_missing != 0)
@ -752,19 +752,19 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
size_t write_buffer_header_size size_t write_buffer_header_size
= sizeof(request_for_session->zxid) + sizeof(request_for_session->digest->version) + sizeof(request_for_session->digest->value); = sizeof(request_for_session->zxid) + sizeof(request_for_session->digest->version) + sizeof(request_for_session->digest->value);
if (serialization_version < KeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_TIME) if (serialization_version < IKeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_TIME)
write_buffer_header_size += sizeof(request_for_session->time); write_buffer_header_size += sizeof(request_for_session->time);
auto * buffer_start = reinterpret_cast<BufferBase::Position>(entry_buf->data_begin() + entry_buf->size() - write_buffer_header_size); auto * buffer_start = reinterpret_cast<BufferBase::Position>(entry_buf->data_begin() + entry_buf->size() - write_buffer_header_size);
WriteBufferFromPointer write_buf(buffer_start, write_buffer_header_size); WriteBufferFromPointer write_buf(buffer_start, write_buffer_header_size);
if (serialization_version < KeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_TIME) if (serialization_version < IKeeperStateMachine::ZooKeeperLogSerializationVersion::WITH_TIME)
writeIntBinary(request_for_session->time, write_buf); writeIntBinary(request_for_session->time, write_buf);
writeIntBinary(request_for_session->zxid, write_buf); writeIntBinary(request_for_session->zxid, write_buf);
writeIntBinary(request_for_session->digest->version, write_buf); writeIntBinary(request_for_session->digest->version, write_buf);
if (request_for_session->digest->version != KeeperStorage::NO_DIGEST) if (request_for_session->digest->version != KeeperStorageBase::NO_DIGEST)
writeIntBinary(request_for_session->digest->value, write_buf); writeIntBinary(request_for_session->digest->value, write_buf);
write_buf.finalize(); write_buf.finalize();

View File

@ -24,7 +24,7 @@ private:
CoordinationSettingsPtr coordination_settings; CoordinationSettingsPtr coordination_settings;
nuraft::ptr<KeeperStateMachine> state_machine; nuraft::ptr<IKeeperStateMachine> state_machine;
nuraft::ptr<KeeperStateManager> state_manager; nuraft::ptr<KeeperStateManager> state_manager;
@ -79,26 +79,26 @@ public:
SnapshotsQueue & snapshots_queue_, SnapshotsQueue & snapshots_queue_,
KeeperContextPtr keeper_context_, KeeperContextPtr keeper_context_,
KeeperSnapshotManagerS3 & snapshot_manager_s3, KeeperSnapshotManagerS3 & snapshot_manager_s3,
KeeperStateMachine::CommitCallback commit_callback); IKeeperStateMachine::CommitCallback commit_callback);
/// Load state machine from the latest snapshot and load log storage. Start NuRaft with required settings. /// Load state machine from the latest snapshot and load log storage. Start NuRaft with required settings.
void startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6 = true); void startup(const Poco::Util::AbstractConfiguration & config, bool enable_ipv6 = true);
/// Put a local read request, execute it in the state machine directly, and put the response into the /// Put a local read request, execute it in the state machine directly, and put the response into the
/// responses queue. /// responses queue.
void putLocalReadRequest(const KeeperStorage::RequestForSession & request); void putLocalReadRequest(const KeeperStorageBase::RequestForSession & request);
bool isRecovering() const { return is_recovering; } bool isRecovering() const { return is_recovering; }
bool reconfigEnabled() const { return enable_reconfiguration; } bool reconfigEnabled() const { return enable_reconfiguration; }
/// Put batch of requests into Raft and get result of put. Responses will be set separately into /// Put batch of requests into Raft and get result of put. Responses will be set separately into
/// responses_queue. /// responses_queue.
RaftAppendResult putRequestBatch(const KeeperStorage::RequestsForSessions & requests); RaftAppendResult putRequestBatch(const KeeperStorageBase::RequestsForSessions & requests);
/// Return set of the non-active sessions /// Return set of the non-active sessions
std::vector<int64_t> getDeadSessions(); std::vector<int64_t> getDeadSessions();
nuraft::ptr<KeeperStateMachine> getKeeperStateMachine() const { return state_machine; } nuraft::ptr<IKeeperStateMachine> getKeeperStateMachine() const { return state_machine; }
void forceRecovery(); void forceRecovery();

View File

@ -68,7 +68,8 @@ namespace
return base; return base;
} }
void writeNode(const KeeperStorage::Node & node, SnapshotVersion version, WriteBuffer & out) template<typename Node>
void writeNode(const Node & node, SnapshotVersion version, WriteBuffer & out)
{ {
writeBinary(node.getData(), out); writeBinary(node.getData(), out);
@ -96,7 +97,8 @@ namespace
} }
} }
void readNode(KeeperStorage::Node & node, ReadBuffer & in, SnapshotVersion version, ACLMap & acl_map) template<typename Node>
void readNode(Node & node, ReadBuffer & in, SnapshotVersion version, ACLMap & acl_map)
{ {
String new_data; String new_data;
readBinary(new_data, in); readBinary(new_data, in);
@ -169,7 +171,8 @@ namespace
} }
} }
void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, WriteBuffer & out, KeeperContextPtr keeper_context) template<typename Storage>
void KeeperStorageSnapshot<Storage>::serialize(const KeeperStorageSnapshot<Storage> & snapshot, WriteBuffer & out, KeeperContextPtr keeper_context)
{ {
writeBinary(static_cast<uint8_t>(snapshot.version), out); writeBinary(static_cast<uint8_t>(snapshot.version), out);
serializeSnapshotMetadata(snapshot.snapshot_meta, out); serializeSnapshotMetadata(snapshot.snapshot_meta, out);
@ -179,11 +182,11 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr
writeBinary(snapshot.zxid, out); writeBinary(snapshot.zxid, out);
if (keeper_context->digestEnabled()) if (keeper_context->digestEnabled())
{ {
writeBinary(static_cast<uint8_t>(KeeperStorage::CURRENT_DIGEST_VERSION), out); writeBinary(static_cast<uint8_t>(Storage::CURRENT_DIGEST_VERSION), out);
writeBinary(snapshot.nodes_digest, out); writeBinary(snapshot.nodes_digest, out);
} }
else else
writeBinary(static_cast<uint8_t>(KeeperStorage::NO_DIGEST), out); writeBinary(static_cast<uint8_t>(Storage::NO_DIGEST), out);
} }
writeBinary(snapshot.session_id, out); writeBinary(snapshot.session_id, out);
@ -256,7 +259,7 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr
writeBinary(session_id, out); writeBinary(session_id, out);
writeBinary(timeout, out); writeBinary(timeout, out);
KeeperStorage::AuthIDs ids; KeeperStorageBase::AuthIDs ids;
if (snapshot.session_and_auth.contains(session_id)) if (snapshot.session_and_auth.contains(session_id))
ids = snapshot.session_and_auth.at(session_id); ids = snapshot.session_and_auth.at(session_id);
@ -277,7 +280,8 @@ void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, Wr
} }
} }
void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserialization_result, ReadBuffer & in, KeeperContextPtr keeper_context) template<typename Storage>
void KeeperStorageSnapshot<Storage>::deserialize(SnapshotDeserializationResult<Storage> & deserialization_result, ReadBuffer & in, KeeperContextPtr keeper_context)
{ {
uint8_t version; uint8_t version;
readBinary(version, in); readBinary(version, in);
@ -286,7 +290,7 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version); throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version);
deserialization_result.snapshot_meta = deserializeSnapshotMetadata(in); deserialization_result.snapshot_meta = deserializeSnapshotMetadata(in);
KeeperStorage & storage = *deserialization_result.storage; Storage & storage = *deserialization_result.storage;
bool recalculate_digest = keeper_context->digestEnabled(); bool recalculate_digest = keeper_context->digestEnabled();
if (version >= SnapshotVersion::V5) if (version >= SnapshotVersion::V5)
@ -294,11 +298,11 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
readBinary(storage.zxid, in); readBinary(storage.zxid, in);
uint8_t digest_version; uint8_t digest_version;
readBinary(digest_version, in); readBinary(digest_version, in);
if (digest_version != KeeperStorage::DigestVersion::NO_DIGEST) if (digest_version != Storage::DigestVersion::NO_DIGEST)
{ {
uint64_t nodes_digest; uint64_t nodes_digest;
readBinary(nodes_digest, in); readBinary(nodes_digest, in);
if (digest_version == KeeperStorage::CURRENT_DIGEST_VERSION) if (digest_version == Storage::CURRENT_DIGEST_VERSION)
{ {
storage.nodes_digest = nodes_digest; storage.nodes_digest = nodes_digest;
recalculate_digest = false; recalculate_digest = false;
@ -361,7 +365,7 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
{ {
std::string path; std::string path;
readBinary(path, in); readBinary(path, in);
KeeperStorage::Node node{}; typename Storage::Node node{};
readNode(node, in, current_version, storage.acl_map); readNode(node, in, current_version, storage.acl_map);
using enum Coordination::PathMatchResult; using enum Coordination::PathMatchResult;
@ -389,7 +393,7 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
if (keeper_context->ignoreSystemPathOnStartup() || keeper_context->getServerState() != KeeperContext::Phase::INIT) if (keeper_context->ignoreSystemPathOnStartup() || keeper_context->getServerState() != KeeperContext::Phase::INIT)
{ {
LOG_ERROR(&Poco::Logger::get("KeeperSnapshotManager"), "{}. Ignoring it", error_msg); LOG_ERROR(&Poco::Logger::get("KeeperSnapshotManager"), "{}. Ignoring it", error_msg);
node = KeeperStorage::Node{}; node = typename Storage::Node{};
} }
else else
throw Exception( throw Exception(
@ -417,7 +421,7 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
{ {
auto parent_path = parentNodePath(itr.key); auto parent_path = parentNodePath(itr.key);
storage.container.updateValue( storage.container.updateValue(
parent_path, [version, path = itr.key](KeeperStorage::Node & value) { value.addChild(getBaseNodeName(path), /*update_size*/ version < SnapshotVersion::V4); }); parent_path, [version, path = itr.key](typename Storage::Node & value) { value.addChild(getBaseNodeName(path), /*update_size*/ version < SnapshotVersion::V4); });
} }
} }
@ -457,14 +461,14 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
size_t session_auths_size; size_t session_auths_size;
readBinary(session_auths_size, in); readBinary(session_auths_size, in);
KeeperStorage::AuthIDs ids; typename Storage::AuthIDs ids;
size_t session_auth_counter = 0; size_t session_auth_counter = 0;
while (session_auth_counter < session_auths_size) while (session_auth_counter < session_auths_size)
{ {
String scheme, id; String scheme, id;
readBinary(scheme, in); readBinary(scheme, in);
readBinary(id, in); readBinary(id, in);
ids.emplace_back(KeeperStorage::AuthID{scheme, id}); ids.emplace_back(typename Storage::AuthID{scheme, id});
session_auth_counter++; session_auth_counter++;
} }
@ -487,7 +491,8 @@ void KeeperStorageSnapshot::deserialize(SnapshotDeserializationResult & deserial
} }
} }
KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t up_to_log_idx_, const ClusterConfigPtr & cluster_config_) template<typename Storage>
KeeperStorageSnapshot<Storage>::KeeperStorageSnapshot(Storage * storage_, uint64_t up_to_log_idx_, const ClusterConfigPtr & cluster_config_)
: storage(storage_) : storage(storage_)
, snapshot_meta(std::make_shared<SnapshotMetadata>(up_to_log_idx_, 0, std::make_shared<nuraft::cluster_config>())) , snapshot_meta(std::make_shared<SnapshotMetadata>(up_to_log_idx_, 0, std::make_shared<nuraft::cluster_config>()))
, session_id(storage->session_id_counter) , session_id(storage->session_id_counter)
@ -504,8 +509,9 @@ KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t
session_and_auth = storage->session_and_auth; session_and_auth = storage->session_and_auth;
} }
KeeperStorageSnapshot::KeeperStorageSnapshot( template<typename Storage>
KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_, const ClusterConfigPtr & cluster_config_) KeeperStorageSnapshot<Storage>::KeeperStorageSnapshot(
Storage * storage_, const SnapshotMetadataPtr & snapshot_meta_, const ClusterConfigPtr & cluster_config_)
: storage(storage_) : storage(storage_)
, snapshot_meta(snapshot_meta_) , snapshot_meta(snapshot_meta_)
, session_id(storage->session_id_counter) , session_id(storage->session_id_counter)
@ -522,12 +528,14 @@ KeeperStorageSnapshot::KeeperStorageSnapshot(
session_and_auth = storage->session_and_auth; session_and_auth = storage->session_and_auth;
} }
/// Destroying a snapshot switches the referenced storage out of snapshot mode
/// by calling disableSnapshotMode() on it.
KeeperStorageSnapshot::~KeeperStorageSnapshot() template<typename Storage>
KeeperStorageSnapshot<Storage>::~KeeperStorageSnapshot()
{ {
storage->disableSnapshotMode(); storage->disableSnapshotMode();
} }
KeeperSnapshotManager::KeeperSnapshotManager( template<typename Storage>
KeeperSnapshotManager<Storage>::KeeperSnapshotManager(
size_t snapshots_to_keep_, size_t snapshots_to_keep_,
const KeeperContextPtr & keeper_context_, const KeeperContextPtr & keeper_context_,
bool compress_snapshots_zstd_, bool compress_snapshots_zstd_,
@ -606,7 +614,8 @@ KeeperSnapshotManager::KeeperSnapshotManager(
moveSnapshotsIfNeeded(); moveSnapshotsIfNeeded();
} }
SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx) template<typename Storage>
SnapshotFileInfo KeeperSnapshotManager<Storage>::serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx)
{ {
ReadBufferFromNuraftBuffer reader(buffer); ReadBufferFromNuraftBuffer reader(buffer);
@ -634,7 +643,8 @@ SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::bu
return {snapshot_file_name, disk}; return {snapshot_file_name, disk};
} }
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeLatestSnapshotBufferFromDisk() template<typename Storage>
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager<Storage>::deserializeLatestSnapshotBufferFromDisk()
{ {
while (!existing_snapshots.empty()) while (!existing_snapshots.empty())
{ {
@ -655,7 +665,8 @@ nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeLatestSnapshotBuff
return nullptr; return nullptr;
} }
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeSnapshotBufferFromDisk(uint64_t up_to_log_idx) const template<typename Storage>
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager<Storage>::deserializeSnapshotBufferFromDisk(uint64_t up_to_log_idx) const
{ {
const auto & [snapshot_path, snapshot_disk] = existing_snapshots.at(up_to_log_idx); const auto & [snapshot_path, snapshot_disk] = existing_snapshots.at(up_to_log_idx);
WriteBufferFromNuraftBuffer writer; WriteBufferFromNuraftBuffer writer;
@ -664,7 +675,8 @@ nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeSnapshotBufferFrom
return writer.getBuffer(); return writer.getBuffer();
} }
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::serializeSnapshotToBuffer(const KeeperStorageSnapshot & snapshot) const template<typename Storage>
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager<Storage>::serializeSnapshotToBuffer(const KeeperStorageSnapshot<Storage> & snapshot) const
{ {
std::unique_ptr<WriteBufferFromNuraftBuffer> writer = std::make_unique<WriteBufferFromNuraftBuffer>(); std::unique_ptr<WriteBufferFromNuraftBuffer> writer = std::make_unique<WriteBufferFromNuraftBuffer>();
auto * buffer_raw_ptr = writer.get(); auto * buffer_raw_ptr = writer.get();
@ -674,13 +686,13 @@ nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::serializeSnapshotToBuffer(con
else else
compressed_writer = std::make_unique<CompressedWriteBuffer>(*writer); compressed_writer = std::make_unique<CompressedWriteBuffer>(*writer);
KeeperStorageSnapshot::serialize(snapshot, *compressed_writer, keeper_context); KeeperStorageSnapshot<Storage>::serialize(snapshot, *compressed_writer, keeper_context);
compressed_writer->finalize(); compressed_writer->finalize();
return buffer_raw_ptr->getBuffer(); return buffer_raw_ptr->getBuffer();
} }
template<typename Storage>
bool KeeperSnapshotManager::isZstdCompressed(nuraft::ptr<nuraft::buffer> buffer) bool KeeperSnapshotManager<Storage>::isZstdCompressed(nuraft::ptr<nuraft::buffer> buffer)
{ {
static constexpr unsigned char ZSTD_COMPRESSED_MAGIC[4] = {0x28, 0xB5, 0x2F, 0xFD}; static constexpr unsigned char ZSTD_COMPRESSED_MAGIC[4] = {0x28, 0xB5, 0x2F, 0xFD};
@ -691,7 +703,8 @@ bool KeeperSnapshotManager::isZstdCompressed(nuraft::ptr<nuraft::buffer> buffer)
return memcmp(magic_from_buffer, ZSTD_COMPRESSED_MAGIC, 4) == 0; return memcmp(magic_from_buffer, ZSTD_COMPRESSED_MAGIC, 4) == 0;
} }
SnapshotDeserializationResult KeeperSnapshotManager::deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const template<typename Storage>
SnapshotDeserializationResult<Storage> KeeperSnapshotManager<Storage>::deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const
{ {
bool is_zstd_compressed = isZstdCompressed(buffer); bool is_zstd_compressed = isZstdCompressed(buffer);
@ -703,14 +716,15 @@ SnapshotDeserializationResult KeeperSnapshotManager::deserializeSnapshotFromBuff
else else
compressed_reader = std::make_unique<CompressedReadBuffer>(*reader); compressed_reader = std::make_unique<CompressedReadBuffer>(*reader);
SnapshotDeserializationResult result; SnapshotDeserializationResult<Storage> result;
result.storage = std::make_unique<KeeperStorage>(storage_tick_time, superdigest, keeper_context, /* initialize_system_nodes */ false); result.storage = std::make_unique<Storage>(storage_tick_time, superdigest, keeper_context, /* initialize_system_nodes */ false);
KeeperStorageSnapshot::deserialize(result, *compressed_reader, keeper_context); KeeperStorageSnapshot<Storage>::deserialize(result, *compressed_reader, keeper_context);
result.storage->initializeSystemNodes(); result.storage->initializeSystemNodes();
return result; return result;
} }
SnapshotDeserializationResult KeeperSnapshotManager::restoreFromLatestSnapshot() template<typename Storage>
SnapshotDeserializationResult<Storage> KeeperSnapshotManager<Storage>::restoreFromLatestSnapshot()
{ {
if (existing_snapshots.empty()) if (existing_snapshots.empty())
return {}; return {};
@ -721,23 +735,27 @@ SnapshotDeserializationResult KeeperSnapshotManager::restoreFromLatestSnapshot()
return deserializeSnapshotFromBuffer(buffer); return deserializeSnapshotFromBuffer(buffer);
} }
/// Returns the disk that keeper_context reports via getSnapshotDisk().
DiskPtr KeeperSnapshotManager::getDisk() const template<typename Storage>
DiskPtr KeeperSnapshotManager<Storage>::getDisk() const
{ {
return keeper_context->getSnapshotDisk(); return keeper_context->getSnapshotDisk();
} }
/// Returns the disk that keeper_context reports via getLatestSnapshotDisk().
DiskPtr KeeperSnapshotManager::getLatestSnapshotDisk() const template<typename Storage>
DiskPtr KeeperSnapshotManager<Storage>::getLatestSnapshotDisk() const
{ {
return keeper_context->getLatestSnapshotDisk(); return keeper_context->getLatestSnapshotDisk();
} }
/// Keeps calling removeSnapshot() on the first entry of existing_snapshots until
/// no more than snapshots_to_keep entries are left.
/// NOTE(review): the first entry is presumably the oldest snapshot (smallest log index),
/// which depends on the ordering of existing_snapshots -- confirm against its declaration.
void KeeperSnapshotManager::removeOutdatedSnapshotsIfNeeded() template<typename Storage>
void KeeperSnapshotManager<Storage>::removeOutdatedSnapshotsIfNeeded()
{ {
while (existing_snapshots.size() > snapshots_to_keep) while (existing_snapshots.size() > snapshots_to_keep)
removeSnapshot(existing_snapshots.begin()->first); removeSnapshot(existing_snapshots.begin()->first);
} }
void KeeperSnapshotManager::moveSnapshotsIfNeeded() template<typename Storage>
void KeeperSnapshotManager<Storage>::moveSnapshotsIfNeeded()
{ {
/// move snapshots to correct disks /// move snapshots to correct disks
@ -767,7 +785,8 @@ void KeeperSnapshotManager::moveSnapshotsIfNeeded()
} }
void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx) template<typename Storage>
void KeeperSnapshotManager<Storage>::removeSnapshot(uint64_t log_idx)
{ {
auto itr = existing_snapshots.find(log_idx); auto itr = existing_snapshots.find(log_idx);
if (itr == existing_snapshots.end()) if (itr == existing_snapshots.end())
@ -777,7 +796,8 @@ void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx)
existing_snapshots.erase(itr); existing_snapshots.erase(itr);
} }
SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotToDisk(const KeeperStorageSnapshot & snapshot) template<typename Storage>
SnapshotFileInfo KeeperSnapshotManager<Storage>::serializeSnapshotToDisk(const KeeperStorageSnapshot<Storage> & snapshot)
{ {
auto up_to_log_idx = snapshot.snapshot_meta->get_last_log_idx(); auto up_to_log_idx = snapshot.snapshot_meta->get_last_log_idx();
auto snapshot_file_name = getSnapshotFileName(up_to_log_idx, compress_snapshots_zstd); auto snapshot_file_name = getSnapshotFileName(up_to_log_idx, compress_snapshots_zstd);
@ -796,7 +816,7 @@ SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotToDisk(const KeeperStor
else else
compressed_writer = std::make_unique<CompressedWriteBuffer>(*writer); compressed_writer = std::make_unique<CompressedWriteBuffer>(*writer);
KeeperStorageSnapshot::serialize(snapshot, *compressed_writer, keeper_context); KeeperStorageSnapshot<Storage>::serialize(snapshot, *compressed_writer, keeper_context);
compressed_writer->finalize(); compressed_writer->finalize();
compressed_writer->sync(); compressed_writer->sync();
@ -809,4 +829,7 @@ SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotToDisk(const KeeperStor
return {snapshot_file_name, disk}; return {snapshot_file_name, disk};
} }
/// Explicit instantiations of the snapshot templates for KeeperMemoryStorage.
template struct KeeperStorageSnapshot<KeeperMemoryStorage>;
template class KeeperSnapshotManager<KeeperMemoryStorage>;
} }

View File

@ -29,16 +29,19 @@ enum SnapshotVersion : uint8_t
static constexpr auto CURRENT_SNAPSHOT_VERSION = SnapshotVersion::V5; static constexpr auto CURRENT_SNAPSHOT_VERSION = SnapshotVersion::V5;
/// What is stored in binary snapshot /// What is stored in binary snapshot
/// Bundles the three pieces read back from a snapshot: the storage itself
/// (held through a std::unique_ptr), the snapshot metadata and the cluster config.
template<typename Storage>
struct SnapshotDeserializationResult struct SnapshotDeserializationResult
{ {
/// Storage /// Storage
KeeperStoragePtr storage; std::unique_ptr<Storage> storage;
/// Snapshot metadata (up_to_log_idx and so on) /// Snapshot metadata (up_to_log_idx and so on)
SnapshotMetadataPtr snapshot_meta; SnapshotMetadataPtr snapshot_meta;
/// Cluster config /// Cluster config
ClusterConfigPtr cluster_config; ClusterConfigPtr cluster_config;
}; };
/// Storage-agnostic base of KeeperStorageSnapshot<Storage>.
///
/// Snapshots are handed around as std::shared_ptr<IKeeperStorageSnapshot> and later cast back
/// to the concrete KeeperStorageSnapshot<Storage> according to their dynamic type. That only
/// works if this base is polymorphic: for a class without virtual functions typeid(*ptr)
/// reports the static (base) type, so the concrete snapshot type could never be recognised.
/// The virtual destructor provides the required run-time type information and also makes
/// destruction through a base pointer well-defined.
struct IKeeperStorageSnapshot
{
    virtual ~IKeeperStorageSnapshot() = default;
};
/// In memory keeper snapshot. Keeper Storage based on a hash map which can be /// In memory keeper snapshot. Keeper Storage based on a hash map which can be
/// turned into snapshot mode. This operation is fast and KeeperStorageSnapshot /// turned into snapshot mode. This operation is fast and KeeperStorageSnapshot
/// class do it in constructor. It also copies iterators from storage hash table /// class do it in constructor. It also copies iterators from storage hash table
@ -47,21 +50,22 @@ struct SnapshotDeserializationResult
/// ///
/// This representation of snapshot have to be serialized into NuRaft /// This representation of snapshot have to be serialized into NuRaft
/// buffer and send over network or saved to file. /// buffer and send over network or saved to file.
struct KeeperStorageSnapshot template<typename Storage>
struct KeeperStorageSnapshot : IKeeperStorageSnapshot
{ {
public: public:
KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t up_to_log_idx_, const ClusterConfigPtr & cluster_config_ = nullptr); KeeperStorageSnapshot(Storage * storage_, uint64_t up_to_log_idx_, const ClusterConfigPtr & cluster_config_ = nullptr);
KeeperStorageSnapshot( KeeperStorageSnapshot(
KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_, const ClusterConfigPtr & cluster_config_ = nullptr); Storage * storage_, const SnapshotMetadataPtr & snapshot_meta_, const ClusterConfigPtr & cluster_config_ = nullptr);
~KeeperStorageSnapshot(); ~KeeperStorageSnapshot();
static void serialize(const KeeperStorageSnapshot & snapshot, WriteBuffer & out, KeeperContextPtr keeper_context); static void serialize(const KeeperStorageSnapshot<Storage> & snapshot, WriteBuffer & out, KeeperContextPtr keeper_context);
static void deserialize(SnapshotDeserializationResult & deserialization_result, ReadBuffer & in, KeeperContextPtr keeper_context); static void deserialize(SnapshotDeserializationResult<Storage> & deserialization_result, ReadBuffer & in, KeeperContextPtr keeper_context);
KeeperStorage * storage; Storage * storage;
SnapshotVersion version = CURRENT_SNAPSHOT_VERSION; SnapshotVersion version = CURRENT_SNAPSHOT_VERSION;
/// Snapshot metadata /// Snapshot metadata
@ -72,11 +76,11 @@ public:
/// so we have for loop for (i = 0; i < snapshot_container_size; ++i) { doSmth(begin + i); } /// so we have for loop for (i = 0; i < snapshot_container_size; ++i) { doSmth(begin + i); }
size_t snapshot_container_size; size_t snapshot_container_size;
/// Iterator to the start of the storage /// Iterator to the start of the storage
KeeperStorage::Container::const_iterator begin; Storage::Container::const_iterator begin;
/// Active sessions and their timeouts /// Active sessions and their timeouts
SessionAndTimeout session_and_timeout; SessionAndTimeout session_and_timeout;
/// Sessions credentials /// Sessions credentials
KeeperStorage::SessionAndAuth session_and_auth; Storage::SessionAndAuth session_and_auth;
/// ACLs cache for better performance. Without we cannot deserialize storage. /// ACLs cache for better performance. Without we cannot deserialize storage.
std::unordered_map<uint64_t, Coordination::ACLs> acl_map; std::unordered_map<uint64_t, Coordination::ACLs> acl_map;
/// Cluster config from snapshot, can be empty /// Cluster config from snapshot, can be empty
@ -93,14 +97,12 @@ struct SnapshotFileInfo
DiskPtr disk; DiskPtr disk;
}; };
using KeeperStorageSnapshotPtr = std::shared_ptr<KeeperStorageSnapshot>; using KeeperStorageSnapshotPtr = std::shared_ptr<IKeeperStorageSnapshot>;
using CreateSnapshotCallback = std::function<SnapshotFileInfo(KeeperStorageSnapshotPtr &&)>; using CreateSnapshotCallback = std::function<SnapshotFileInfo(KeeperStorageSnapshotPtr &&)>;
using SnapshotMetaAndStorage = std::pair<SnapshotMetadataPtr, KeeperStoragePtr>;
/// Class responsible for snapshots serialization and deserialization. Each snapshot /// Class responsible for snapshots serialization and deserialization. Each snapshot
/// has it's path on disk and log index. /// has it's path on disk and log index.
template<typename Storage>
class KeeperSnapshotManager class KeeperSnapshotManager
{ {
public: public:
@ -112,18 +114,18 @@ public:
size_t storage_tick_time_ = 500); size_t storage_tick_time_ = 500);
/// Restore storage from latest available snapshot /// Restore storage from latest available snapshot
SnapshotDeserializationResult restoreFromLatestSnapshot(); SnapshotDeserializationResult<Storage> restoreFromLatestSnapshot();
/// Compress snapshot and serialize it to buffer /// Compress snapshot and serialize it to buffer
nuraft::ptr<nuraft::buffer> serializeSnapshotToBuffer(const KeeperStorageSnapshot & snapshot) const; nuraft::ptr<nuraft::buffer> serializeSnapshotToBuffer(const KeeperStorageSnapshot<Storage> & snapshot) const;
/// Serialize already compressed snapshot to disk (return path) /// Serialize already compressed snapshot to disk (return path)
SnapshotFileInfo serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx); SnapshotFileInfo serializeSnapshotBufferToDisk(nuraft::buffer & buffer, uint64_t up_to_log_idx);
/// Serialize snapshot directly to disk /// Serialize snapshot directly to disk
SnapshotFileInfo serializeSnapshotToDisk(const KeeperStorageSnapshot & snapshot); SnapshotFileInfo serializeSnapshotToDisk(const KeeperStorageSnapshot<Storage> & snapshot);
SnapshotDeserializationResult deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const; SnapshotDeserializationResult<Storage> deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const;
/// Deserialize snapshot with log index up_to_log_idx from disk into compressed nuraft buffer. /// Deserialize snapshot with log index up_to_log_idx from disk into compressed nuraft buffer.
nuraft::ptr<nuraft::buffer> deserializeSnapshotBufferFromDisk(uint64_t up_to_log_idx) const; nuraft::ptr<nuraft::buffer> deserializeSnapshotBufferFromDisk(uint64_t up_to_log_idx) const;

View File

@ -40,7 +40,7 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
KeeperStateMachine::KeeperStateMachine( IKeeperStateMachine::IKeeperStateMachine(
ResponsesQueue & responses_queue_, ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_, SnapshotsQueue & snapshots_queue_,
const CoordinationSettingsPtr & coordination_settings_, const CoordinationSettingsPtr & coordination_settings_,
@ -50,12 +50,6 @@ KeeperStateMachine::KeeperStateMachine(
const std::string & superdigest_) const std::string & superdigest_)
: commit_callback(commit_callback_) : commit_callback(commit_callback_)
, coordination_settings(coordination_settings_) , coordination_settings(coordination_settings_)
, snapshot_manager(
coordination_settings->snapshots_to_keep,
keeper_context_,
coordination_settings->compress_snapshots_with_zstd_format,
superdigest_,
coordination_settings->dead_session_check_period_ms.totalMilliseconds())
, responses_queue(responses_queue_) , responses_queue(responses_queue_)
, snapshots_queue(snapshots_queue_) , snapshots_queue(snapshots_queue_)
, min_request_size_to_cache(coordination_settings_->min_request_size_for_cache) , min_request_size_to_cache(coordination_settings_->min_request_size_for_cache)
@ -67,6 +61,32 @@ KeeperStateMachine::KeeperStateMachine(
{ {
} }
/// Storage-specific part of the state machine construction.
/// Every argument is forwarded unchanged to the IKeeperStateMachine base; the only thing
/// built here is the snapshot manager, configured from the coordination settings
/// (snapshots to keep, zstd compression flag, dead-session check period) together with
/// the keeper context and the superdigest.
template<typename Storage>
KeeperStateMachine<Storage>::KeeperStateMachine(
    ResponsesQueue & responses_queue_,
    SnapshotsQueue & snapshots_queue_,
    const CoordinationSettingsPtr & coordination_settings_,
    const KeeperContextPtr & keeper_context_,
    KeeperSnapshotManagerS3 * snapshot_manager_s3_,
    IKeeperStateMachine::CommitCallback commit_callback_,
    const std::string & superdigest_)
    : IKeeperStateMachine(
        responses_queue_, snapshots_queue_, coordination_settings_, keeper_context_, snapshot_manager_s3_, commit_callback_, superdigest_)
    /// coordination_settings is the base-class member, already initialized by the base constructor above.
    , snapshot_manager(
          coordination_settings->snapshots_to_keep,
          keeper_context_,
          coordination_settings->compress_snapshots_with_zstd_format,
          superdigest_,
          coordination_settings->dead_session_check_period_ms.totalMilliseconds())
{
}
namespace namespace
{ {
@ -77,7 +97,8 @@ bool isLocalDisk(const IDisk & disk)
} }
void KeeperStateMachine::init() template<typename Storage>
void KeeperStateMachine<Storage>::init()
{ {
/// Do everything without mutexes, no other threads exist. /// Do everything without mutexes, no other threads exist.
LOG_DEBUG(log, "Totally have {} snapshots", snapshot_manager.totalSnapshots()); LOG_DEBUG(log, "Totally have {} snapshots", snapshot_manager.totalSnapshots());
@ -129,7 +150,7 @@ void KeeperStateMachine::init()
} }
if (!storage) if (!storage)
storage = std::make_unique<KeeperStorage>( storage = std::make_unique<Storage>(
coordination_settings->dead_session_check_period_ms.totalMilliseconds(), superdigest, keeper_context); coordination_settings->dead_session_check_period_ms.totalMilliseconds(), superdigest, keeper_context);
} }
@ -137,12 +158,12 @@ namespace
{ {
void assertDigest( void assertDigest(
const KeeperStorage::Digest & first, const KeeperStorageBase::Digest & first,
const KeeperStorage::Digest & second, const KeeperStorageBase::Digest & second,
const Coordination::ZooKeeperRequest & request, const Coordination::ZooKeeperRequest & request,
bool committing) bool committing)
{ {
if (!KeeperStorage::checkDigest(first, second)) if (!KeeperStorageBase::checkDigest(first, second))
{ {
LOG_FATAL( LOG_FATAL(
&Poco::Logger::get("KeeperStateMachine"), &Poco::Logger::get("KeeperStateMachine"),
@ -160,7 +181,8 @@ void assertDigest(
} }
nuraft::ptr<nuraft::buffer> KeeperStateMachine::pre_commit(uint64_t log_idx, nuraft::buffer & data) template<typename Storage>
nuraft::ptr<nuraft::buffer> KeeperStateMachine<Storage>::pre_commit(uint64_t log_idx, nuraft::buffer & data)
{ {
auto request_for_session = parseRequest(data, /*final=*/false); auto request_for_session = parseRequest(data, /*final=*/false);
if (!request_for_session->zxid) if (!request_for_session->zxid)
@ -175,10 +197,10 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::pre_commit(uint64_t log_idx, nur
return result; return result;
} }
std::shared_ptr<KeeperStorage::RequestForSession> KeeperStateMachine::parseRequest(nuraft::buffer & data, bool final, ZooKeeperLogSerializationVersion * serialization_version) std::shared_ptr<KeeperStorageBase::RequestForSession> IKeeperStateMachine::parseRequest(nuraft::buffer & data, bool final, ZooKeeperLogSerializationVersion * serialization_version)
{ {
ReadBufferFromNuraftBuffer buffer(data); ReadBufferFromNuraftBuffer buffer(data);
auto request_for_session = std::make_shared<KeeperStorage::RequestForSession>(); auto request_for_session = std::make_shared<KeeperStorageBase::RequestForSession>();
readIntBinary(request_for_session->session_id, buffer); readIntBinary(request_for_session->session_id, buffer);
int32_t length; int32_t length;
@ -251,7 +273,7 @@ std::shared_ptr<KeeperStorage::RequestForSession> KeeperStateMachine::parseReque
request_for_session->digest.emplace(); request_for_session->digest.emplace();
readIntBinary(request_for_session->digest->version, buffer); readIntBinary(request_for_session->digest->version, buffer);
if (request_for_session->digest->version != KeeperStorage::DigestVersion::NO_DIGEST || !buffer.eof()) if (request_for_session->digest->version != KeeperStorageBase::DigestVersion::NO_DIGEST || !buffer.eof())
readIntBinary(request_for_session->digest->value, buffer); readIntBinary(request_for_session->digest->value, buffer);
} }
@ -267,7 +289,8 @@ std::shared_ptr<KeeperStorage::RequestForSession> KeeperStateMachine::parseReque
return request_for_session; return request_for_session;
} }
bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & request_for_session) template<typename Storage>
bool KeeperStateMachine<Storage>::preprocess(const KeeperStorageBase::RequestForSession & request_for_session)
{ {
const auto op_num = request_for_session.request->getOpNum(); const auto op_num = request_for_session.request->getOpNum();
if (op_num == Coordination::OpNum::SessionID || op_num == Coordination::OpNum::Reconfig) if (op_num == Coordination::OpNum::SessionID || op_num == Coordination::OpNum::Reconfig)
@ -301,10 +324,11 @@ bool KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
return true; return true;
} }
void KeeperStateMachine::reconfigure(const KeeperStorage::RequestForSession& request_for_session) template<typename Storage>
void KeeperStateMachine<Storage>::reconfigure(const KeeperStorageBase::RequestForSession& request_for_session)
{ {
std::lock_guard _(storage_and_responses_lock); std::lock_guard _(storage_and_responses_lock);
KeeperStorage::ResponseForSession response = processReconfiguration(request_for_session); KeeperStorageBase::ResponseForSession response = processReconfiguration(request_for_session);
if (!responses_queue.push(response)) if (!responses_queue.push(response))
{ {
ProfileEvents::increment(ProfileEvents::KeeperCommitsFailed); ProfileEvents::increment(ProfileEvents::KeeperCommitsFailed);
@ -314,8 +338,9 @@ void KeeperStateMachine::reconfigure(const KeeperStorage::RequestForSession& req
} }
} }
KeeperStorage::ResponseForSession KeeperStateMachine::processReconfiguration( template<typename Storage>
const KeeperStorage::RequestForSession & request_for_session) KeeperStorageBase::ResponseForSession KeeperStateMachine<Storage>::processReconfiguration(
const KeeperStorageBase::RequestForSession & request_for_session)
{ {
ProfileEvents::increment(ProfileEvents::KeeperReconfigRequest); ProfileEvents::increment(ProfileEvents::KeeperReconfigRequest);
@ -324,7 +349,7 @@ KeeperStorage::ResponseForSession KeeperStateMachine::processReconfiguration(
const int64_t zxid = request_for_session.zxid; const int64_t zxid = request_for_session.zxid;
using enum Coordination::Error; using enum Coordination::Error;
auto bad_request = [&](Coordination::Error code = ZBADARGUMENTS) -> KeeperStorage::ResponseForSession auto bad_request = [&](Coordination::Error code = ZBADARGUMENTS) -> KeeperStorageBase::ResponseForSession
{ {
auto res = std::make_shared<Coordination::ZooKeeperReconfigResponse>(); auto res = std::make_shared<Coordination::ZooKeeperReconfigResponse>();
res->xid = request.xid; res->xid = request.xid;
@ -381,7 +406,8 @@ KeeperStorage::ResponseForSession KeeperStateMachine::processReconfiguration(
return { session_id, std::move(response) }; return { session_id, std::move(response) };
} }
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data) template<typename Storage>
nuraft::ptr<nuraft::buffer> KeeperStateMachine<Storage>::commit(const uint64_t log_idx, nuraft::buffer & data)
{ {
auto request_for_session = parseRequest(data, true); auto request_for_session = parseRequest(data, true);
if (!request_for_session->zxid) if (!request_for_session->zxid)
@ -392,7 +418,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
if (!keeper_context->local_logs_preprocessed) if (!keeper_context->local_logs_preprocessed)
preprocess(*request_for_session); preprocess(*request_for_session);
auto try_push = [this](const KeeperStorage::ResponseForSession& response) auto try_push = [this](const KeeperStorageBase::ResponseForSession& response)
{ {
if (!responses_queue.push(response)) if (!responses_queue.push(response))
{ {
@ -412,7 +438,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
std::shared_ptr<Coordination::ZooKeeperSessionIDResponse> response = std::make_shared<Coordination::ZooKeeperSessionIDResponse>(); std::shared_ptr<Coordination::ZooKeeperSessionIDResponse> response = std::make_shared<Coordination::ZooKeeperSessionIDResponse>();
response->internal_id = session_id_request.internal_id; response->internal_id = session_id_request.internal_id;
response->server_id = session_id_request.server_id; response->server_id = session_id_request.server_id;
KeeperStorage::ResponseForSession response_for_session; KeeperStorageBase::ResponseForSession response_for_session;
response_for_session.session_id = -1; response_for_session.session_id = -1;
response_for_session.response = response; response_for_session.response = response;
@ -431,7 +457,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
} }
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
KeeperStorage::ResponsesForSessions responses_for_sessions KeeperStorageBase::ResponsesForSessions responses_for_sessions
= storage->processRequest(request_for_session->request, request_for_session->session_id, request_for_session->zxid); = storage->processRequest(request_for_session->request, request_for_session->session_id, request_for_session->zxid);
for (auto & response_for_session : responses_for_sessions) for (auto & response_for_session : responses_for_sessions)
try_push(response_for_session); try_push(response_for_session);
@ -448,7 +474,8 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
return nullptr; return nullptr;
} }
bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s) template<typename Storage>
bool KeeperStateMachine<Storage>::apply_snapshot(nuraft::snapshot & s)
{ {
LOG_DEBUG(log, "Applying snapshot {}", s.get_last_log_idx()); LOG_DEBUG(log, "Applying snapshot {}", s.get_last_log_idx());
nuraft::ptr<nuraft::buffer> latest_snapshot_ptr; nuraft::ptr<nuraft::buffer> latest_snapshot_ptr;
@ -475,7 +502,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
{ /// deserialize and apply snapshot to storage { /// deserialize and apply snapshot to storage
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
SnapshotDeserializationResult snapshot_deserialization_result; SnapshotDeserializationResult<Storage> snapshot_deserialization_result;
if (latest_snapshot_ptr) if (latest_snapshot_ptr)
snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_ptr); snapshot_deserialization_result = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_ptr);
else else
@ -496,7 +523,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
} }
void KeeperStateMachine::commit_config(const uint64_t log_idx, nuraft::ptr<nuraft::cluster_config> & new_conf) void IKeeperStateMachine::commit_config(const uint64_t log_idx, nuraft::ptr<nuraft::cluster_config> & new_conf)
{ {
std::lock_guard lock(cluster_config_lock); std::lock_guard lock(cluster_config_lock);
auto tmp = new_conf->serialize(); auto tmp = new_conf->serialize();
@ -504,7 +531,7 @@ void KeeperStateMachine::commit_config(const uint64_t log_idx, nuraft::ptr<nuraf
last_committed_idx = log_idx; last_committed_idx = log_idx;
} }
void KeeperStateMachine::rollback(uint64_t log_idx, nuraft::buffer & data) void IKeeperStateMachine::rollback(uint64_t log_idx, nuraft::buffer & data)
{ {
auto request_for_session = parseRequest(data, true); auto request_for_session = parseRequest(data, true);
// If we received a log from an older node, use the log_idx as the zxid // If we received a log from an older node, use the log_idx as the zxid
@ -516,7 +543,8 @@ void KeeperStateMachine::rollback(uint64_t log_idx, nuraft::buffer & data)
rollbackRequest(*request_for_session, false); rollbackRequest(*request_for_session, false);
} }
void KeeperStateMachine::rollbackRequest(const KeeperStorage::RequestForSession & request_for_session, bool allow_missing) template<typename Storage>
void KeeperStateMachine<Storage>::rollbackRequest(const KeeperStorageBase::RequestForSession & request_for_session, bool allow_missing)
{ {
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID) if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
return; return;
@ -525,7 +553,8 @@ void KeeperStateMachine::rollbackRequest(const KeeperStorage::RequestForSession
storage->rollbackRequest(request_for_session.zxid, allow_missing); storage->rollbackRequest(request_for_session.zxid, allow_missing);
} }
void KeeperStateMachine::rollbackRequestNoLock(const KeeperStorage::RequestForSession & request_for_session, bool allow_missing) template<typename Storage>
void KeeperStateMachine<Storage>::rollbackRequestNoLock(const KeeperStorageBase::RequestForSession & request_for_session, bool allow_missing)
{ {
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID) if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
return; return;
@ -533,14 +562,15 @@ void KeeperStateMachine::rollbackRequestNoLock(const KeeperStorage::RequestForSe
storage->rollbackRequest(request_for_session.zxid, allow_missing); storage->rollbackRequest(request_for_session.zxid, allow_missing);
} }
/// Returns latest_snapshot_meta; the read happens while snapshots_lock is held.
nuraft::ptr<nuraft::snapshot> KeeperStateMachine::last_snapshot() nuraft::ptr<nuraft::snapshot> IKeeperStateMachine::last_snapshot()
{ {
/// Just return the latest snapshot. /// Just return the latest snapshot.
std::lock_guard lock(snapshots_lock); std::lock_guard lock(snapshots_lock);
return latest_snapshot_meta; return latest_snapshot_meta;
} }
void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_result<bool>::handler_type & when_done) template<typename Storage>
void KeeperStateMachine<Storage>::create_snapshot(nuraft::snapshot & s, nuraft::async_result<bool>::handler_type & when_done)
{ {
LOG_DEBUG(log, "Creating snapshot {}", s.get_last_log_idx()); LOG_DEBUG(log, "Creating snapshot {}", s.get_last_log_idx());
@ -549,12 +579,13 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
CreateSnapshotTask snapshot_task; CreateSnapshotTask snapshot_task;
{ /// lock storage for a short period time to turn on "snapshot mode". After that we can read consistent storage state without locking. { /// lock storage for a short period time to turn on "snapshot mode". After that we can read consistent storage state without locking.
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
snapshot_task.snapshot = std::make_shared<KeeperStorageSnapshot>(storage.get(), snapshot_meta_copy, getClusterConfig()); snapshot_task.snapshot = std::make_shared<KeeperStorageSnapshot<Storage>>(storage.get(), snapshot_meta_copy, getClusterConfig());
} }
/// create snapshot task for background execution (in snapshot thread) /// create snapshot task for background execution (in snapshot thread)
snapshot_task.create_snapshot = [this, when_done](KeeperStorageSnapshotPtr && snapshot) snapshot_task.create_snapshot = [this, when_done](KeeperStorageSnapshotPtr && snapshot_)
{ {
auto * snapshot = typeid_cast<KeeperStorageSnapshot<Storage> *>(snapshot_.get());
nuraft::ptr<std::exception> exception(nullptr); nuraft::ptr<std::exception> exception(nullptr);
bool ret = true; bool ret = true;
try try
@ -601,7 +632,7 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
/// Turn off "snapshot mode" and clear outdated part of storage state /// Turn off "snapshot mode" and clear outdated part of storage state
storage->clearGarbageAfterSnapshot(); storage->clearGarbageAfterSnapshot();
LOG_TRACE(log, "Cleared garbage after snapshot"); LOG_TRACE(log, "Cleared garbage after snapshot");
snapshot.reset(); snapshot_.reset();
} }
} }
catch (...) catch (...)
@ -638,7 +669,8 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
LOG_WARNING(log, "Cannot push snapshot task into queue"); LOG_WARNING(log, "Cannot push snapshot task into queue");
} }
void KeeperStateMachine::save_logical_snp_obj( template<typename Storage>
void KeeperStateMachine<Storage>::save_logical_snp_obj(
nuraft::snapshot & s, uint64_t & obj_id, nuraft::buffer & data, bool /*is_first_obj*/, bool /*is_last_obj*/) nuraft::snapshot & s, uint64_t & obj_id, nuraft::buffer & data, bool /*is_first_obj*/, bool /*is_last_obj*/)
{ {
LOG_DEBUG(log, "Saving snapshot {} obj_id {}", s.get_last_log_idx(), obj_id); LOG_DEBUG(log, "Saving snapshot {} obj_id {}", s.get_last_log_idx(), obj_id);
@ -703,7 +735,7 @@ static int bufferFromFile(Poco::Logger * log, const std::string & path, nuraft::
return 0; return 0;
} }
int KeeperStateMachine::read_logical_snp_obj( int IKeeperStateMachine::read_logical_snp_obj(
nuraft::snapshot & s, void *& /*user_snp_ctx*/, uint64_t obj_id, nuraft::ptr<nuraft::buffer> & data_out, bool & is_last_obj) nuraft::snapshot & s, void *& /*user_snp_ctx*/, uint64_t obj_id, nuraft::ptr<nuraft::buffer> & data_out, bool & is_last_obj)
{ {
LOG_DEBUG(log, "Reading snapshot {} obj_id {}", s.get_last_log_idx(), obj_id); LOG_DEBUG(log, "Reading snapshot {} obj_id {}", s.get_last_log_idx(), obj_id);
@ -743,7 +775,8 @@ int KeeperStateMachine::read_logical_snp_obj(
return 1; return 1;
} }
void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session) template<typename Storage>
void KeeperStateMachine<Storage>::processReadRequest(const KeeperStorageBase::RequestForSession & request_for_session)
{ {
/// Pure local request, just process it with storage /// Pure local request, just process it with storage
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
@ -754,103 +787,120 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi
LOG_WARNING(log, "Failed to push response with session id {} to the queue, probably because of shutdown", response.session_id); LOG_WARNING(log, "Failed to push response with session id {} to the queue, probably because of shutdown", response.session_id);
} }
void KeeperStateMachine::shutdownStorage() template<typename Storage>
void KeeperStateMachine<Storage>::shutdownStorage()
{ {
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
storage->finalize(); storage->finalize();
} }
std::vector<int64_t> KeeperStateMachine::getDeadSessions() template<typename Storage>
std::vector<int64_t> KeeperStateMachine<Storage>::getDeadSessions()
{ {
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
return storage->getDeadSessions(); return storage->getDeadSessions();
} }
int64_t KeeperStateMachine::getNextZxid() const template<typename Storage>
int64_t KeeperStateMachine<Storage>::getNextZxid() const
{ {
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
return storage->getNextZXID(); return storage->getNextZXID();
} }
KeeperStorage::Digest KeeperStateMachine::getNodesDigest() const template<typename Storage>
KeeperStorageBase::Digest KeeperStateMachine<Storage>::getNodesDigest() const
{ {
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
return storage->getNodesDigest(false); return storage->getNodesDigest(false);
} }
uint64_t KeeperStateMachine::getLastProcessedZxid() const template<typename Storage>
uint64_t KeeperStateMachine<Storage>::getLastProcessedZxid() const
{ {
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
return storage->getZXID(); return storage->getZXID();
} }
uint64_t KeeperStateMachine::getNodesCount() const template<typename Storage>
uint64_t KeeperStateMachine<Storage>::getNodesCount() const
{ {
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
return storage->getNodesCount(); return storage->getNodesCount();
} }
uint64_t KeeperStateMachine::getTotalWatchesCount() const template<typename Storage>
uint64_t KeeperStateMachine<Storage>::getTotalWatchesCount() const
{ {
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
return storage->getTotalWatchesCount(); return storage->getTotalWatchesCount();
} }
uint64_t KeeperStateMachine::getWatchedPathsCount() const template<typename Storage>
uint64_t KeeperStateMachine<Storage>::getWatchedPathsCount() const
{ {
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
return storage->getWatchedPathsCount(); return storage->getWatchedPathsCount();
} }
uint64_t KeeperStateMachine::getSessionsWithWatchesCount() const template<typename Storage>
uint64_t KeeperStateMachine<Storage>::getSessionsWithWatchesCount() const
{ {
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
return storage->getSessionsWithWatchesCount(); return storage->getSessionsWithWatchesCount();
} }
uint64_t KeeperStateMachine::getTotalEphemeralNodesCount() const template<typename Storage>
uint64_t KeeperStateMachine<Storage>::getTotalEphemeralNodesCount() const
{ {
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
return storage->getTotalEphemeralNodesCount(); return storage->getTotalEphemeralNodesCount();
} }
uint64_t KeeperStateMachine::getSessionWithEphemeralNodesCount() const template<typename Storage>
uint64_t KeeperStateMachine<Storage>::getSessionWithEphemeralNodesCount() const
{ {
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
return storage->getSessionWithEphemeralNodesCount(); return storage->getSessionWithEphemeralNodesCount();
} }
void KeeperStateMachine::dumpWatches(WriteBufferFromOwnString & buf) const template<typename Storage>
void KeeperStateMachine<Storage>::dumpWatches(WriteBufferFromOwnString & buf) const
{ {
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
storage->dumpWatches(buf); storage->dumpWatches(buf);
} }
void KeeperStateMachine::dumpWatchesByPath(WriteBufferFromOwnString & buf) const template<typename Storage>
void KeeperStateMachine<Storage>::dumpWatchesByPath(WriteBufferFromOwnString & buf) const
{ {
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
storage->dumpWatchesByPath(buf); storage->dumpWatchesByPath(buf);
} }
void KeeperStateMachine::dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const template<typename Storage>
void KeeperStateMachine<Storage>::dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const
{ {
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
storage->dumpSessionsAndEphemerals(buf); storage->dumpSessionsAndEphemerals(buf);
} }
uint64_t KeeperStateMachine::getApproximateDataSize() const template<typename Storage>
uint64_t KeeperStateMachine<Storage>::getApproximateDataSize() const
{ {
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
return storage->getApproximateDataSize(); return storage->getApproximateDataSize();
} }
uint64_t KeeperStateMachine::getKeyArenaSize() const template<typename Storage>
uint64_t KeeperStateMachine<Storage>::getKeyArenaSize() const
{ {
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
return storage->getArenaDataSize(); return storage->getArenaDataSize();
} }
uint64_t KeeperStateMachine::getLatestSnapshotBufSize() const template<typename Storage>
uint64_t KeeperStateMachine<Storage>::getLatestSnapshotBufSize() const
{ {
std::lock_guard lock(snapshots_lock); std::lock_guard lock(snapshots_lock);
if (latest_snapshot_buf) if (latest_snapshot_buf)
@ -858,7 +908,7 @@ uint64_t KeeperStateMachine::getLatestSnapshotBufSize() const
return 0; return 0;
} }
ClusterConfigPtr KeeperStateMachine::getClusterConfig() const ClusterConfigPtr IKeeperStateMachine::getClusterConfig() const
{ {
std::lock_guard lock(cluster_config_lock); std::lock_guard lock(cluster_config_lock);
if (cluster_config) if (cluster_config)
@ -870,11 +920,15 @@ ClusterConfigPtr KeeperStateMachine::getClusterConfig() const
return nullptr; return nullptr;
} }
void KeeperStateMachine::recalculateStorageStats() template<typename Storage>
void KeeperStateMachine<Storage>::recalculateStorageStats()
{ {
std::lock_guard lock(storage_and_responses_lock); std::lock_guard lock(storage_and_responses_lock);
LOG_INFO(log, "Recalculating storage stats"); LOG_INFO(log, "Recalculating storage stats");
storage->recalculateStats(); storage->recalculateStats();
LOG_INFO(log, "Done recalculating storage stats"); LOG_INFO(log, "Done recalculating storage stats");
} }
template class KeeperStateMachine<KeeperMemoryStorage>;
} }

View File

@ -12,27 +12,25 @@
namespace DB namespace DB
{ {
using ResponsesQueue = ConcurrentBoundedQueue<KeeperStorage::ResponseForSession>; using ResponsesQueue = ConcurrentBoundedQueue<KeeperStorageBase::ResponseForSession>;
using SnapshotsQueue = ConcurrentBoundedQueue<CreateSnapshotTask>; using SnapshotsQueue = ConcurrentBoundedQueue<CreateSnapshotTask>;
/// ClickHouse Keeper state machine. Wrapper for KeeperStorage. class IKeeperStateMachine : public nuraft::state_machine
/// Responsible for entries commit, snapshots creation and so on.
class KeeperStateMachine : public nuraft::state_machine
{ {
public: public:
using CommitCallback = std::function<void(uint64_t, const KeeperStorage::RequestForSession &)>; using CommitCallback = std::function<void(uint64_t, const KeeperStorageBase::RequestForSession &)>;
KeeperStateMachine( IKeeperStateMachine(
ResponsesQueue & responses_queue_, ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_, SnapshotsQueue & snapshots_queue_,
const CoordinationSettingsPtr & coordination_settings_, const CoordinationSettingsPtr & coordination_settings_,
const KeeperContextPtr & keeper_context_, const KeeperContextPtr & keeper_context_,
KeeperSnapshotManagerS3 * snapshot_manager_s3_, KeeperSnapshotManagerS3 * snapshot_manager_s3_,
CommitCallback commit_callback_ = {}, CommitCallback commit_callback_,
const std::string & superdigest_ = ""); const std::string & superdisgest_);
/// Read state from the latest snapshot /// Read state from the latest snapshot
void init(); virtual void init() = 0;
enum ZooKeeperLogSerializationVersion enum ZooKeeperLogSerializationVersion
{ {
@ -49,89 +47,66 @@ public:
/// ///
/// final - whether it's the final time we will fetch the request so we can safely remove it from cache /// final - whether it's the final time we will fetch the request so we can safely remove it from cache
/// serialization_version - information about which fields were parsed from the buffer so we can modify the buffer accordingly /// serialization_version - information about which fields were parsed from the buffer so we can modify the buffer accordingly
std::shared_ptr<KeeperStorage::RequestForSession> parseRequest(nuraft::buffer & data, bool final, ZooKeeperLogSerializationVersion * serialization_version = nullptr); std::shared_ptr<KeeperStorageBase::RequestForSession> parseRequest(nuraft::buffer & data, bool final, ZooKeeperLogSerializationVersion * serialization_version = nullptr);
bool preprocess(const KeeperStorage::RequestForSession & request_for_session); virtual bool preprocess(const KeeperStorageBase::RequestForSession & request_for_session) = 0;
nuraft::ptr<nuraft::buffer> pre_commit(uint64_t log_idx, nuraft::buffer & data) override;
nuraft::ptr<nuraft::buffer> commit(const uint64_t log_idx, nuraft::buffer & data) override; /// NOLINT
/// Save new cluster config to our snapshot (copy of the config stored in StateManager)
void commit_config(const uint64_t log_idx, nuraft::ptr<nuraft::cluster_config> & new_conf) override; /// NOLINT void commit_config(const uint64_t log_idx, nuraft::ptr<nuraft::cluster_config> & new_conf) override; /// NOLINT
void rollback(uint64_t log_idx, nuraft::buffer & data) override; void rollback(uint64_t log_idx, nuraft::buffer & data) override;
// allow_missing - whether the transaction we want to rollback can be missing from storage // allow_missing - whether the transaction we want to rollback can be missing from storage
// (can happen in case of exception during preprocessing) // (can happen in case of exception during preprocessing)
void rollbackRequest(const KeeperStorage::RequestForSession & request_for_session, bool allow_missing); virtual void rollbackRequest(const KeeperStorageBase::RequestForSession & request_for_session, bool allow_missing) = 0;
void rollbackRequestNoLock(
const KeeperStorage::RequestForSession & request_for_session,
bool allow_missing) TSA_NO_THREAD_SAFETY_ANALYSIS;
uint64_t last_commit_index() override { return last_committed_idx; } uint64_t last_commit_index() override { return last_committed_idx; }
/// Apply preliminarily saved (save_logical_snp_obj) snapshot to our state.
bool apply_snapshot(nuraft::snapshot & s) override;
nuraft::ptr<nuraft::snapshot> last_snapshot() override; nuraft::ptr<nuraft::snapshot> last_snapshot() override;
/// Create new snapshot from current state. /// Create new snapshot from current state.
void create_snapshot(nuraft::snapshot & s, nuraft::async_result<bool>::handler_type & when_done) override; void create_snapshot(nuraft::snapshot & s, nuraft::async_result<bool>::handler_type & when_done) override = 0;
/// Save snapshot which was sent by leader to us. After that we will apply it in apply_snapshot. /// Save snapshot which was sent by leader to us. After that we will apply it in apply_snapshot.
void save_logical_snp_obj(nuraft::snapshot & s, uint64_t & obj_id, nuraft::buffer & data, bool is_first_obj, bool is_last_obj) override; void save_logical_snp_obj(nuraft::snapshot & s, uint64_t & obj_id, nuraft::buffer & data, bool is_first_obj, bool is_last_obj) override = 0;
/// Better name is `serialize snapshot` -- save existing snapshot (created by create_snapshot) into
/// in-memory buffer data_out.
int read_logical_snp_obj( int read_logical_snp_obj(
nuraft::snapshot & s, void *& user_snp_ctx, uint64_t obj_id, nuraft::ptr<nuraft::buffer> & data_out, bool & is_last_obj) override; nuraft::snapshot & s, void *& user_snp_ctx, uint64_t obj_id, nuraft::ptr<nuraft::buffer> & data_out, bool & is_last_obj) override;
// This should be used only for tests or keeper-data-dumper because it violates virtual void shutdownStorage() = 0;
// TSA -- we can't acquire the lock outside of this class or return a storage under lock
// in a reasonable way.
KeeperStorage & getStorageUnsafe() TSA_NO_THREAD_SAFETY_ANALYSIS
{
return *storage;
}
void shutdownStorage();
ClusterConfigPtr getClusterConfig() const; ClusterConfigPtr getClusterConfig() const;
/// Process local read request virtual void processReadRequest(const KeeperStorageBase::RequestForSession & request_for_session) = 0;
void processReadRequest(const KeeperStorage::RequestForSession & request_for_session);
std::vector<int64_t> getDeadSessions(); virtual std::vector<int64_t> getDeadSessions() = 0;
int64_t getNextZxid() const; virtual int64_t getNextZxid() const = 0;
KeeperStorage::Digest getNodesDigest() const; virtual KeeperStorageBase::Digest getNodesDigest() const = 0;
/// Introspection functions for 4lw commands /// Introspection functions for 4lw commands
uint64_t getLastProcessedZxid() const; virtual uint64_t getLastProcessedZxid() const = 0;
uint64_t getNodesCount() const; virtual uint64_t getNodesCount() const = 0;
uint64_t getTotalWatchesCount() const; virtual uint64_t getTotalWatchesCount() const = 0;
uint64_t getWatchedPathsCount() const; virtual uint64_t getWatchedPathsCount() const = 0;
uint64_t getSessionsWithWatchesCount() const; virtual uint64_t getSessionsWithWatchesCount() const = 0;
void dumpWatches(WriteBufferFromOwnString & buf) const; virtual void dumpWatches(WriteBufferFromOwnString & buf) const = 0;
void dumpWatchesByPath(WriteBufferFromOwnString & buf) const; virtual void dumpWatchesByPath(WriteBufferFromOwnString & buf) const = 0;
void dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const; virtual void dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const = 0;
uint64_t getSessionWithEphemeralNodesCount() const; virtual uint64_t getSessionWithEphemeralNodesCount() const = 0;
uint64_t getTotalEphemeralNodesCount() const; virtual uint64_t getTotalEphemeralNodesCount() const = 0;
uint64_t getApproximateDataSize() const; virtual uint64_t getApproximateDataSize() const = 0;
uint64_t getKeyArenaSize() const; virtual uint64_t getKeyArenaSize() const = 0;
uint64_t getLatestSnapshotBufSize() const; virtual uint64_t getLatestSnapshotBufSize() const = 0;
void recalculateStorageStats(); virtual void recalculateStorageStats() = 0;
void reconfigure(const KeeperStorage::RequestForSession& request_for_session); virtual void reconfigure(const KeeperStorageBase::RequestForSession& request_for_session) = 0;
private: protected:
CommitCallback commit_callback; CommitCallback commit_callback;
/// In our state machine we always have a single snapshot which is stored /// In our state machine we always have a single snapshot which is stored
/// in memory in compressed (serialized) format. /// in memory in compressed (serialized) format.
@ -141,12 +116,7 @@ private:
CoordinationSettingsPtr coordination_settings; CoordinationSettingsPtr coordination_settings;
/// Main state machine logic
KeeperStoragePtr storage TSA_PT_GUARDED_BY(storage_and_responses_lock);
/// Save/Load and Serialize/Deserialize logic for snapshots. /// Save/Load and Serialize/Deserialize logic for snapshots.
KeeperSnapshotManager snapshot_manager;
/// Put processed responses into this queue /// Put processed responses into this queue
ResponsesQueue & responses_queue; ResponsesQueue & responses_queue;
@ -163,7 +133,7 @@ private:
/// for request. /// for request.
mutable std::mutex storage_and_responses_lock; mutable std::mutex storage_and_responses_lock;
std::unordered_map<int64_t, std::unordered_map<Coordination::XID, std::shared_ptr<KeeperStorage::RequestForSession>>> parsed_request_cache; std::unordered_map<int64_t, std::unordered_map<Coordination::XID, std::shared_ptr<KeeperStorageBase::RequestForSession>>> parsed_request_cache;
uint64_t min_request_size_to_cache{0}; uint64_t min_request_size_to_cache{0};
/// we only need to protect the access to the map itself /// we only need to protect the access to the map itself
/// requests can be modified from anywhere without lock because a single request /// requests can be modified from anywhere without lock because a single request
@ -188,8 +158,105 @@ private:
KeeperSnapshotManagerS3 * snapshot_manager_s3; KeeperSnapshotManagerS3 * snapshot_manager_s3;
KeeperStorage::ResponseForSession processReconfiguration( virtual KeeperStorageBase::ResponseForSession processReconfiguration(
const KeeperStorage::RequestForSession& request_for_session) const KeeperStorageBase::RequestForSession& request_for_session)
TSA_REQUIRES(storage_and_responses_lock); TSA_REQUIRES(storage_and_responses_lock) = 0;
};
/// ClickHouse Keeper state machine. Wrapper for KeeperStorage.
/// Responsible for entries commit, snapshots creation and so on.
template<typename Storage>
class KeeperStateMachine : public IKeeperStateMachine
{
public:
/// using CommitCallback = std::function<void(uint64_t, const KeeperStorage::RequestForSession &)>;
KeeperStateMachine(
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_,
const CoordinationSettingsPtr & coordination_settings_,
const KeeperContextPtr & keeper_context_,
KeeperSnapshotManagerS3 * snapshot_manager_s3_,
CommitCallback commit_callback_ = {},
const std::string & superdigest_ = "");
/// Read state from the latest snapshot
void init() override;
bool preprocess(const KeeperStorageBase::RequestForSession & request_for_session) override;
nuraft::ptr<nuraft::buffer> pre_commit(uint64_t log_idx, nuraft::buffer & data) override;
nuraft::ptr<nuraft::buffer> commit(const uint64_t log_idx, nuraft::buffer & data) override; /// NOLINT
// allow_missing - whether the transaction we want to rollback can be missing from storage
// (can happen in case of exception during preprocessing)
void rollbackRequest(const KeeperStorageBase::RequestForSession & request_for_session, bool allow_missing) override;
void rollbackRequestNoLock(
const KeeperStorageBase::RequestForSession & request_for_session,
bool allow_missing) TSA_NO_THREAD_SAFETY_ANALYSIS;
/// Apply preliminarily saved (save_logical_snp_obj) snapshot to our state.
bool apply_snapshot(nuraft::snapshot & s) override;
/// Create new snapshot from current state.
void create_snapshot(nuraft::snapshot & s, nuraft::async_result<bool>::handler_type & when_done) override;
/// Save snapshot which was sent by leader to us. After that we will apply it in apply_snapshot.
void save_logical_snp_obj(nuraft::snapshot & s, uint64_t & obj_id, nuraft::buffer & data, bool is_first_obj, bool is_last_obj) override;
// This should be used only for tests or keeper-data-dumper because it violates
// TSA -- we can't acquire the lock outside of this class or return a storage under lock
// in a reasonable way.
Storage & getStorageUnsafe() TSA_NO_THREAD_SAFETY_ANALYSIS
{
return *storage;
}
void shutdownStorage() override;
/// Process local read request
void processReadRequest(const KeeperStorageBase::RequestForSession & request_for_session) override;
std::vector<int64_t> getDeadSessions() override;
int64_t getNextZxid() const override;
KeeperStorageBase::Digest getNodesDigest() const override;
/// Introspection functions for 4lw commands
uint64_t getLastProcessedZxid() const override;
uint64_t getNodesCount() const override;
uint64_t getTotalWatchesCount() const override;
uint64_t getWatchedPathsCount() const override;
uint64_t getSessionsWithWatchesCount() const override;
void dumpWatches(WriteBufferFromOwnString & buf) const override;
void dumpWatchesByPath(WriteBufferFromOwnString & buf) const override;
void dumpSessionsAndEphemerals(WriteBufferFromOwnString & buf) const override;
uint64_t getSessionWithEphemeralNodesCount() const override;
uint64_t getTotalEphemeralNodesCount() const override;
uint64_t getApproximateDataSize() const override;
uint64_t getKeyArenaSize() const override;
uint64_t getLatestSnapshotBufSize() const override;
void recalculateStorageStats() override;
void reconfigure(const KeeperStorageBase::RequestForSession& request_for_session) override;
private:
/// Main state machine logic
std::unique_ptr<Storage> storage; //TSA_PT_GUARDED_BY(storage_and_responses_lock);
/// Save/Load and Serialize/Deserialize logic for snapshots.
KeeperSnapshotManager<Storage> snapshot_manager;
KeeperStorageBase::ResponseForSession processReconfiguration(
const KeeperStorageBase::RequestForSession& request_for_session)
TSA_REQUIRES(storage_and_responses_lock) override;
}; };
} }

File diff suppressed because it is too large Load Diff

View File

@ -16,62 +16,55 @@
namespace DB namespace DB
{ {
struct KeeperStorageRequestProcessor;
using KeeperStorageRequestProcessorPtr = std::shared_ptr<KeeperStorageRequestProcessor>;
using ResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr &)>; using ResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr &)>;
using ChildrenSet = absl::flat_hash_set<StringRef, StringRefHash>; using ChildrenSet = absl::flat_hash_set<StringRef, StringRefHash>;
using SessionAndTimeout = std::unordered_map<int64_t, int64_t>; using SessionAndTimeout = std::unordered_map<int64_t, int64_t>;
struct KeeperStorageSnapshot; struct KeeperMemNode
{
uint64_t acl_id = 0; /// 0 -- no ACL by default
bool is_sequental = false;
Coordination::Stat stat{};
int32_t seq_num = 0;
uint64_t size_bytes; // save size to avoid calculate every time
/// Keeper state machine almost equal to the ZooKeeper's state machine. KeeperMemNode() : size_bytes(sizeof(KeeperMemNode)) { }
/// Implements all logic of operations, data changes, sessions allocation.
/// In-memory and not thread safe. /// Object memory size
class KeeperStorage uint64_t sizeInBytes() const { return size_bytes; }
void setData(String new_data);
const auto & getData() const noexcept { return data; }
void addChild(StringRef child_path, bool update_size = true);
void removeChild(StringRef child_path);
const auto & getChildren() const noexcept { return children; }
// Invalidate the calculated digest so it's recalculated again on the next
// getDigest call
void invalidateDigestCache() const;
// get the calculated digest of the node
UInt64 getDigest(std::string_view path) const;
// copy only necessary information for preprocessing and digest calculation
// (e.g. we don't need to copy list of children)
void shallowCopy(const KeeperMemNode & other);
void recalculateSize();
private:
String data;
ChildrenSet children{};
mutable std::optional<UInt64> cached_digest;
};
class KeeperStorageBase
{ {
public: public:
struct Node
{
uint64_t acl_id = 0; /// 0 -- no ACL by default
bool is_sequental = false;
Coordination::Stat stat{};
int32_t seq_num = 0;
uint64_t size_bytes; // save size to avoid calculate every time
Node() : size_bytes(sizeof(Node)) { }
/// Object memory size
uint64_t sizeInBytes() const { return size_bytes; }
void setData(String new_data);
const auto & getData() const noexcept { return data; }
void addChild(StringRef child_path, bool update_size = true);
void removeChild(StringRef child_path);
const auto & getChildren() const noexcept { return children; }
// Invalidate the calculated digest so it's recalculated again on the next
// getDigest call
void invalidateDigestCache() const;
// get the calculated digest of the node
UInt64 getDigest(std::string_view path) const;
// copy only necessary information for preprocessing and digest calculation
// (e.g. we don't need to copy list of children)
void shallowCopy(const Node & other);
void recalculateSize();
private:
String data;
ChildrenSet children{};
mutable std::optional<UInt64> cached_digest;
};
enum DigestVersion : uint8_t enum DigestVersion : uint8_t
{ {
NO_DIGEST = 0, NO_DIGEST = 0,
@ -79,7 +72,11 @@ public:
V2 = 2 // added system nodes that modify the digest on startup so digest from V0 is invalid V2 = 2 // added system nodes that modify the digest on startup so digest from V0 is invalid
}; };
static constexpr auto CURRENT_DIGEST_VERSION = DigestVersion::V2; struct Digest
{
DigestVersion version{DigestVersion::NO_DIGEST};
uint64_t value{0};
};
struct ResponseForSession struct ResponseForSession
{ {
@ -88,11 +85,34 @@ public:
}; };
using ResponsesForSessions = std::vector<ResponseForSession>; using ResponsesForSessions = std::vector<ResponseForSession>;
struct Digest struct RequestForSession
{ {
DigestVersion version{DigestVersion::NO_DIGEST}; int64_t session_id;
uint64_t value{0}; int64_t time{0};
Coordination::ZooKeeperRequestPtr request;
int64_t zxid{0};
std::optional<Digest> digest;
int64_t log_idx{0};
}; };
using RequestsForSessions = std::vector<RequestForSession>;
struct AuthID
{
std::string scheme;
std::string id;
bool operator==(const AuthID & other) const { return scheme == other.scheme && id == other.id; }
};
// using Container = SnapshotableHashTable<Node>;
using Ephemerals = std::unordered_map<int64_t, std::unordered_set<std::string>>;
using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<std::string>>;
using SessionIDs = std::unordered_set<int64_t>;
/// Just vector of SHA1 from user:password
using AuthIDs = std::vector<AuthID>;
using SessionAndAuth = std::unordered_map<int64_t, AuthIDs>;
using Watches = std::unordered_map<String /* path, relative of root_path */, SessionIDs>;
static bool checkDigest(const Digest & first, const Digest & second) static bool checkDigest(const Digest & first, const Digest & second)
{ {
@ -105,38 +125,22 @@ public:
return first.value == second.value; return first.value == second.value;
} }
};
/// Keeper state machine almost equal to the ZooKeeper's state machine.
/// Implements all logic of operations, data changes, sessions allocation.
/// In-memory and not thread safe.
template<typename Container_>
class KeeperStorage : public KeeperStorageBase
{
public:
using Container = Container_;
using Node = Container::Node;
static constexpr auto CURRENT_DIGEST_VERSION = DigestVersion::V2;
static String generateDigest(const String & userdata); static String generateDigest(const String & userdata);
struct RequestForSession
{
int64_t session_id;
int64_t time{0};
Coordination::ZooKeeperRequestPtr request;
int64_t zxid{0};
std::optional<Digest> digest;
int64_t log_idx{0};
};
struct AuthID
{
std::string scheme;
std::string id;
bool operator==(const AuthID & other) const { return scheme == other.scheme && id == other.id; }
};
using RequestsForSessions = std::vector<RequestForSession>;
using Container = SnapshotableHashTable<Node>;
using Ephemerals = std::unordered_map<int64_t, std::unordered_set<std::string>>;
using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<std::string>>;
using SessionIDs = std::unordered_set<int64_t>;
/// Just vector of SHA1 from user:password
using AuthIDs = std::vector<AuthID>;
using SessionAndAuth = std::unordered_map<int64_t, AuthIDs>;
using Watches = std::unordered_map<String /* path, relative of root_path */, SessionIDs>;
int64_t session_id_counter{1}; int64_t session_id_counter{1};
SessionAndAuth session_and_auth; SessionAndAuth session_and_auth;
@ -307,7 +311,7 @@ public:
std::unordered_map<std::string, std::list<const Delta *>, Hash, Equal> deltas_for_path; std::unordered_map<std::string, std::list<const Delta *>, Hash, Equal> deltas_for_path;
std::list<Delta> deltas; std::list<Delta> deltas;
KeeperStorage & storage; KeeperStorage<Container> & storage;
}; };
UncommittedState uncommitted_state{*this}; UncommittedState uncommitted_state{*this};
@ -487,6 +491,6 @@ private:
void addDigest(const Node & node, std::string_view path); void addDigest(const Node & node, std::string_view path);
}; };
using KeeperStoragePtr = std::unique_ptr<KeeperStorage>; using KeeperMemoryStorage = KeeperStorage<SnapshotableHashTable<KeeperMemNode>>;
} }

View File

@ -121,6 +121,7 @@ private:
public: public:
using Node = V;
using iterator = typename List::iterator; using iterator = typename List::iterator;
using const_iterator = typename List::const_iterator; using const_iterator = typename List::const_iterator;
using ValueUpdater = std::function<void(V & value)>; using ValueUpdater = std::function<void(V & value)>;

View File

@ -43,7 +43,8 @@ void deserializeSnapshotMagic(ReadBuffer & in)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Incorrect magic header in file, expected {}, got {}", SNP_HEADER, magic_header); throw Exception(ErrorCodes::CORRUPTED_DATA, "Incorrect magic header in file, expected {}, got {}", SNP_HEADER, magic_header);
} }
int64_t deserializeSessionAndTimeout(KeeperStorage & storage, ReadBuffer & in) template<typename Storage>
int64_t deserializeSessionAndTimeout(Storage & storage, ReadBuffer & in)
{ {
int32_t count; int32_t count;
Coordination::read(count, in); Coordination::read(count, in);
@ -62,7 +63,8 @@ int64_t deserializeSessionAndTimeout(KeeperStorage & storage, ReadBuffer & in)
return max_session_id; return max_session_id;
} }
void deserializeACLMap(KeeperStorage & storage, ReadBuffer & in) template<typename Storage>
void deserializeACLMap(Storage & storage, ReadBuffer & in)
{ {
int32_t count; int32_t count;
Coordination::read(count, in); Coordination::read(count, in);
@ -90,7 +92,8 @@ void deserializeACLMap(KeeperStorage & storage, ReadBuffer & in)
} }
} }
int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::Logger * log) template<typename Storage>
int64_t deserializeStorageData(Storage & storage, ReadBuffer & in, Poco::Logger * log)
{ {
int64_t max_zxid = 0; int64_t max_zxid = 0;
std::string path; std::string path;
@ -98,7 +101,7 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::L
size_t count = 0; size_t count = 0;
while (path != "/") while (path != "/")
{ {
KeeperStorage::Node node{}; typename Storage::Node node{};
String data; String data;
Coordination::read(data, in); Coordination::read(data, in);
node.setData(std::move(data)); node.setData(std::move(data));
@ -140,14 +143,15 @@ int64_t deserializeStorageData(KeeperStorage & storage, ReadBuffer & in, Poco::L
if (itr.key != "/") if (itr.key != "/")
{ {
auto parent_path = parentNodePath(itr.key); auto parent_path = parentNodePath(itr.key);
storage.container.updateValue(parent_path, [my_path = itr.key] (KeeperStorage::Node & value) { value.addChild(getBaseNodeName(my_path)); ++value.stat.numChildren; }); storage.container.updateValue(parent_path, [my_path = itr.key] (typename Storage::Node & value) { value.addChild(getBaseNodeName(my_path)); ++value.stat.numChildren; });
} }
} }
return max_zxid; return max_zxid;
} }
void deserializeKeeperStorageFromSnapshot(KeeperStorage & storage, const std::string & snapshot_path, Poco::Logger * log) template<typename Storage>
void deserializeKeeperStorageFromSnapshot(Storage & storage, const std::string & snapshot_path, Poco::Logger * log)
{ {
LOG_INFO(log, "Deserializing storage snapshot {}", snapshot_path); LOG_INFO(log, "Deserializing storage snapshot {}", snapshot_path);
int64_t zxid = getZxidFromName(snapshot_path); int64_t zxid = getZxidFromName(snapshot_path);
@ -186,7 +190,8 @@ void deserializeKeeperStorageFromSnapshot(KeeperStorage & storage, const std::st
LOG_INFO(log, "Finished, snapshot ZXID {}", storage.zxid); LOG_INFO(log, "Finished, snapshot ZXID {}", storage.zxid);
} }
void deserializeKeeperStorageFromSnapshotsDir(KeeperStorage & storage, const std::string & path, Poco::Logger * log) template<typename Storage>
void deserializeKeeperStorageFromSnapshotsDir(Storage & storage, const std::string & path, Poco::Logger * log)
{ {
namespace fs = std::filesystem; namespace fs = std::filesystem;
std::map<int64_t, std::string> existing_snapshots; std::map<int64_t, std::string> existing_snapshots;
@ -474,7 +479,8 @@ bool hasErrorsInMultiRequest(Coordination::ZooKeeperRequestPtr request)
} }
bool deserializeTxn(KeeperStorage & storage, ReadBuffer & in, Poco::Logger * /*log*/) template<typename Storage>
bool deserializeTxn(Storage & storage, ReadBuffer & in, Poco::Logger * /*log*/)
{ {
int64_t checksum; int64_t checksum;
Coordination::read(checksum, in); Coordination::read(checksum, in);
@ -529,7 +535,8 @@ bool deserializeTxn(KeeperStorage & storage, ReadBuffer & in, Poco::Logger * /*l
return true; return true;
} }
void deserializeLogAndApplyToStorage(KeeperStorage & storage, const std::string & log_path, Poco::Logger * log) template<typename Storage>
void deserializeLogAndApplyToStorage(Storage & storage, const std::string & log_path, Poco::Logger * log)
{ {
ReadBufferFromFile reader(log_path); ReadBufferFromFile reader(log_path);
@ -553,7 +560,8 @@ void deserializeLogAndApplyToStorage(KeeperStorage & storage, const std::string
LOG_INFO(log, "Finished {} deserialization, totally read {} records", log_path, counter); LOG_INFO(log, "Finished {} deserialization, totally read {} records", log_path, counter);
} }
void deserializeLogsAndApplyToStorage(KeeperStorage & storage, const std::string & path, Poco::Logger * log) template<typename Storage>
void deserializeLogsAndApplyToStorage(Storage & storage, const std::string & path, Poco::Logger * log)
{ {
namespace fs = std::filesystem; namespace fs = std::filesystem;
std::map<int64_t, std::string> existing_logs; std::map<int64_t, std::string> existing_logs;
@ -589,4 +597,9 @@ void deserializeLogsAndApplyToStorage(KeeperStorage & storage, const std::string
} }
} }
/// Explicit instantiations for the in-memory storage.
/// The function templates above are defined in this translation unit, while the
/// header only declares them, so every storage type that callers pass in has to
/// be instantiated here. A new storage type needs its own set of lines.
template void deserializeKeeperStorageFromSnapshot<KeeperMemoryStorage>(KeeperMemoryStorage & storage, const std::string & snapshot_path, Poco::Logger * log);
template void deserializeKeeperStorageFromSnapshotsDir<KeeperMemoryStorage>(KeeperMemoryStorage & storage, const std::string & path, Poco::Logger * log);
template void deserializeLogAndApplyToStorage<KeeperMemoryStorage>(KeeperMemoryStorage & storage, const std::string & log_path, Poco::Logger * log);
template void deserializeLogsAndApplyToStorage<KeeperMemoryStorage>(KeeperMemoryStorage & storage, const std::string & path, Poco::Logger * log);
} }

View File

@ -5,12 +5,16 @@
namespace DB namespace DB
{ {
void deserializeKeeperStorageFromSnapshot(KeeperStorage & storage, const std::string & snapshot_path, Poco::Logger * log); template<typename Storage>
void deserializeKeeperStorageFromSnapshot(Storage & storage, const std::string & snapshot_path, Poco::Logger * log);
void deserializeKeeperStorageFromSnapshotsDir(KeeperStorage & storage, const std::string & path, Poco::Logger * log); template<typename Storage>
void deserializeKeeperStorageFromSnapshotsDir(Storage & storage, const std::string & path, Poco::Logger * log);
void deserializeLogAndApplyToStorage(KeeperStorage & storage, const std::string & log_path, Poco::Logger * log); template<typename Storage>
void deserializeLogAndApplyToStorage(Storage & storage, const std::string & log_path, Poco::Logger * log);
void deserializeLogsAndApplyToStorage(KeeperStorage & storage, const std::string & path, Poco::Logger * log); template<typename Storage>
void deserializeLogsAndApplyToStorage(Storage & storage, const std::string & path, Poco::Logger * log);
} }

View File

@ -1320,7 +1320,7 @@ TEST_P(CoordinationTest, SnapshotableHashMapDataSize)
EXPECT_EQ(hello.getApproximateDataSize(), 0); EXPECT_EQ(hello.getApproximateDataSize(), 0);
/// Node /// Node
using Node = DB::KeeperStorage::Node; using Node = DB::KeeperMemoryStorage::Node;
DB::SnapshotableHashTable<Node> world; DB::SnapshotableHashTable<Node> world;
Node n1; Node n1;
n1.setData("1234"); n1.setData("1234");
@ -1359,9 +1359,9 @@ TEST_P(CoordinationTest, SnapshotableHashMapDataSize)
EXPECT_EQ(world.getApproximateDataSize(), 0); EXPECT_EQ(world.getApproximateDataSize(), 0);
} }
void addNode(DB::KeeperStorage & storage, const std::string & path, const std::string & data, int64_t ephemeral_owner = 0) void addNode(DB::KeeperMemoryStorage & storage, const std::string & path, const std::string & data, int64_t ephemeral_owner = 0)
{ {
using Node = DB::KeeperStorage::Node; using Node = DB::KeeperMemoryStorage::Node;
Node node{}; Node node{};
node.setData(data); node.setData(data);
node.stat.ephemeralOwner = ephemeral_owner; node.stat.ephemeralOwner = ephemeral_owner;
@ -1383,9 +1383,9 @@ TEST_P(CoordinationTest, TestStorageSnapshotSimple)
ChangelogDirTest test("./snapshots"); ChangelogDirTest test("./snapshots");
setSnapshotDirectory("./snapshots"); setSnapshotDirectory("./snapshots");
DB::KeeperSnapshotManager manager(3, keeper_context, params.enable_compression); DB::KeeperSnapshotManager<DB::KeeperMemoryStorage> manager(3, keeper_context, params.enable_compression);
DB::KeeperStorage storage(500, "", keeper_context); DB::KeeperMemoryStorage storage(500, "", keeper_context);
addNode(storage, "/hello", "world", 1); addNode(storage, "/hello", "world", 1);
addNode(storage, "/hello/somepath", "somedata", 3); addNode(storage, "/hello/somepath", "somedata", 3);
storage.session_id_counter = 5; storage.session_id_counter = 5;
@ -1395,7 +1395,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotSimple)
storage.getSessionID(130); storage.getSessionID(130);
storage.getSessionID(130); storage.getSessionID(130);
DB::KeeperStorageSnapshot snapshot(&storage, 2); DB::KeeperStorageSnapshot<DB::KeeperMemoryStorage> snapshot(&storage, 2);
EXPECT_EQ(snapshot.snapshot_meta->get_last_log_idx(), 2); EXPECT_EQ(snapshot.snapshot_meta->get_last_log_idx(), 2);
EXPECT_EQ(snapshot.session_id, 7); EXPECT_EQ(snapshot.session_id, 7);
@ -1433,9 +1433,9 @@ TEST_P(CoordinationTest, TestStorageSnapshotMoreWrites)
ChangelogDirTest test("./snapshots"); ChangelogDirTest test("./snapshots");
setSnapshotDirectory("./snapshots"); setSnapshotDirectory("./snapshots");
DB::KeeperSnapshotManager manager(3, keeper_context, params.enable_compression); DB::KeeperSnapshotManager<DB::KeeperMemoryStorage> manager(3, keeper_context, params.enable_compression);
DB::KeeperStorage storage(500, "", keeper_context); DB::KeeperMemoryStorage storage(500, "", keeper_context);
storage.getSessionID(130); storage.getSessionID(130);
for (size_t i = 0; i < 50; ++i) for (size_t i = 0; i < 50; ++i)
@ -1443,7 +1443,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotMoreWrites)
addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i)); addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i));
} }
DB::KeeperStorageSnapshot snapshot(&storage, 50); DB::KeeperStorageSnapshot<DB::KeeperMemoryStorage> snapshot(&storage, 50);
EXPECT_EQ(snapshot.snapshot_meta->get_last_log_idx(), 50); EXPECT_EQ(snapshot.snapshot_meta->get_last_log_idx(), 50);
EXPECT_EQ(snapshot.snapshot_container_size, 54); EXPECT_EQ(snapshot.snapshot_container_size, 54);
@ -1476,9 +1476,9 @@ TEST_P(CoordinationTest, TestStorageSnapshotManySnapshots)
ChangelogDirTest test("./snapshots"); ChangelogDirTest test("./snapshots");
setSnapshotDirectory("./snapshots"); setSnapshotDirectory("./snapshots");
DB::KeeperSnapshotManager manager(3, keeper_context, params.enable_compression); DB::KeeperSnapshotManager<DB::KeeperMemoryStorage> manager(3, keeper_context, params.enable_compression);
DB::KeeperStorage storage(500, "", keeper_context); DB::KeeperMemoryStorage storage(500, "", keeper_context);
storage.getSessionID(130); storage.getSessionID(130);
for (size_t j = 1; j <= 5; ++j) for (size_t j = 1; j <= 5; ++j)
@ -1488,7 +1488,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotManySnapshots)
addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i)); addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i));
} }
DB::KeeperStorageSnapshot snapshot(&storage, j * 50); DB::KeeperStorageSnapshot<DB::KeeperMemoryStorage> snapshot(&storage, j * 50);
auto buf = manager.serializeSnapshotToBuffer(snapshot); auto buf = manager.serializeSnapshotToBuffer(snapshot);
manager.serializeSnapshotBufferToDisk(*buf, j * 50); manager.serializeSnapshotBufferToDisk(*buf, j * 50);
EXPECT_TRUE(fs::exists(std::string{"./snapshots/snapshot_"} + std::to_string(j * 50) + ".bin" + params.extension)); EXPECT_TRUE(fs::exists(std::string{"./snapshots/snapshot_"} + std::to_string(j * 50) + ".bin" + params.extension));
@ -1517,15 +1517,15 @@ TEST_P(CoordinationTest, TestStorageSnapshotMode)
ChangelogDirTest test("./snapshots"); ChangelogDirTest test("./snapshots");
setSnapshotDirectory("./snapshots"); setSnapshotDirectory("./snapshots");
DB::KeeperSnapshotManager manager(3, keeper_context, params.enable_compression); DB::KeeperSnapshotManager<DB::KeeperMemoryStorage> manager(3, keeper_context, params.enable_compression);
DB::KeeperStorage storage(500, "", keeper_context); DB::KeeperMemoryStorage storage(500, "", keeper_context);
for (size_t i = 0; i < 50; ++i) for (size_t i = 0; i < 50; ++i)
{ {
addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i)); addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i));
} }
{ {
DB::KeeperStorageSnapshot snapshot(&storage, 50); DB::KeeperStorageSnapshot<DB::KeeperMemoryStorage> snapshot(&storage, 50);
for (size_t i = 0; i < 50; ++i) for (size_t i = 0; i < 50; ++i)
{ {
addNode(storage, "/hello_" + std::to_string(i), "wlrd_" + std::to_string(i)); addNode(storage, "/hello_" + std::to_string(i), "wlrd_" + std::to_string(i));
@ -1571,14 +1571,14 @@ TEST_P(CoordinationTest, TestStorageSnapshotBroken)
ChangelogDirTest test("./snapshots"); ChangelogDirTest test("./snapshots");
setSnapshotDirectory("./snapshots"); setSnapshotDirectory("./snapshots");
DB::KeeperSnapshotManager manager(3, keeper_context, params.enable_compression); DB::KeeperSnapshotManager<DB::KeeperMemoryStorage> manager(3, keeper_context, params.enable_compression);
DB::KeeperStorage storage(500, "", keeper_context); DB::KeeperMemoryStorage storage(500, "", keeper_context);
for (size_t i = 0; i < 50; ++i) for (size_t i = 0; i < 50; ++i)
{ {
addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i)); addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i));
} }
{ {
DB::KeeperStorageSnapshot snapshot(&storage, 50); DB::KeeperStorageSnapshot<DB::KeeperMemoryStorage> snapshot(&storage, 50);
auto buf = manager.serializeSnapshotToBuffer(snapshot); auto buf = manager.serializeSnapshotToBuffer(snapshot);
manager.serializeSnapshotBufferToDisk(*buf, 50); manager.serializeSnapshotBufferToDisk(*buf, 50);
} }
@ -1602,7 +1602,7 @@ nuraft::ptr<nuraft::buffer> getBufferFromZKRequest(int64_t session_id, int64_t z
auto time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); auto time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
DB::writeIntBinary(time, buf); DB::writeIntBinary(time, buf);
DB::writeIntBinary(zxid, buf); DB::writeIntBinary(zxid, buf);
DB::writeIntBinary(DB::KeeperStorage::DigestVersion::NO_DIGEST, buf); DB::writeIntBinary(DB::KeeperMemoryStorage::DigestVersion::NO_DIGEST, buf);
return buf.getBuffer(); return buf.getBuffer();
} }
@ -1629,7 +1629,7 @@ void testLogAndStateMachine(
ResponsesQueue queue(std::numeric_limits<size_t>::max()); ResponsesQueue queue(std::numeric_limits<size_t>::max());
SnapshotsQueue snapshots_queue{1}; SnapshotsQueue snapshots_queue{1};
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, settings, keeper_context, nullptr); auto state_machine = std::make_shared<KeeperStateMachine<DB::KeeperMemoryStorage>>(queue, snapshots_queue, settings, keeper_context, nullptr);
state_machine->init(); state_machine->init();
DB::KeeperLogStore changelog( DB::KeeperLogStore changelog(
DB::LogFileSettings{ DB::LogFileSettings{
@ -1672,7 +1672,7 @@ void testLogAndStateMachine(
} }
SnapshotsQueue snapshots_queue1{1}; SnapshotsQueue snapshots_queue1{1};
auto restore_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue1, settings, keeper_context, nullptr); auto restore_machine = std::make_shared<KeeperStateMachine<DB::KeeperMemoryStorage>>(queue, snapshots_queue1, settings, keeper_context, nullptr);
restore_machine->init(); restore_machine->init();
EXPECT_EQ(restore_machine->last_commit_index(), total_logs - total_logs % settings->snapshot_distance); EXPECT_EQ(restore_machine->last_commit_index(), total_logs - total_logs % settings->snapshot_distance);
@ -1791,7 +1791,7 @@ TEST_P(CoordinationTest, TestEphemeralNodeRemove)
ResponsesQueue queue(std::numeric_limits<size_t>::max()); ResponsesQueue queue(std::numeric_limits<size_t>::max());
SnapshotsQueue snapshots_queue{1}; SnapshotsQueue snapshots_queue{1};
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, settings, keeper_context, nullptr); auto state_machine = std::make_shared<KeeperStateMachine<DB::KeeperMemoryStorage>>(queue, snapshots_queue, settings, keeper_context, nullptr);
state_machine->init(); state_machine->init();
std::shared_ptr<ZooKeeperCreateRequest> request_c = std::make_shared<ZooKeeperCreateRequest>(); std::shared_ptr<ZooKeeperCreateRequest> request_c = std::make_shared<ZooKeeperCreateRequest>();
@ -1825,11 +1825,11 @@ TEST_P(CoordinationTest, TestCreateNodeWithAuthSchemeForAclWhenAuthIsPrecommitte
ResponsesQueue queue(std::numeric_limits<size_t>::max()); ResponsesQueue queue(std::numeric_limits<size_t>::max());
SnapshotsQueue snapshots_queue{1}; SnapshotsQueue snapshots_queue{1};
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, settings, keeper_context, nullptr); auto state_machine = std::make_shared<KeeperStateMachine<DB::KeeperMemoryStorage>>(queue, snapshots_queue, settings, keeper_context, nullptr);
state_machine->init(); state_machine->init();
String user_auth_data = "test_user:test_password"; String user_auth_data = "test_user:test_password";
String digest = KeeperStorage::generateDigest(user_auth_data); String digest = KeeperMemoryStorage::generateDigest(user_auth_data);
std::shared_ptr<ZooKeeperAuthRequest> auth_req = std::make_shared<ZooKeeperAuthRequest>(); std::shared_ptr<ZooKeeperAuthRequest> auth_req = std::make_shared<ZooKeeperAuthRequest>();
auth_req->scheme = "digest"; auth_req->scheme = "digest";
@ -1877,11 +1877,11 @@ TEST_P(CoordinationTest, TestSetACLWithAuthSchemeForAclWhenAuthIsPrecommitted)
ResponsesQueue queue(std::numeric_limits<size_t>::max()); ResponsesQueue queue(std::numeric_limits<size_t>::max());
SnapshotsQueue snapshots_queue{1}; SnapshotsQueue snapshots_queue{1};
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, settings, keeper_context, nullptr); auto state_machine = std::make_shared<KeeperStateMachine<DB::KeeperMemoryStorage>>(queue, snapshots_queue, settings, keeper_context, nullptr);
state_machine->init(); state_machine->init();
String user_auth_data = "test_user:test_password"; String user_auth_data = "test_user:test_password";
String digest = KeeperStorage::generateDigest(user_auth_data); String digest = KeeperMemoryStorage::generateDigest(user_auth_data);
std::shared_ptr<ZooKeeperAuthRequest> auth_req = std::make_shared<ZooKeeperAuthRequest>(); std::shared_ptr<ZooKeeperAuthRequest> auth_req = std::make_shared<ZooKeeperAuthRequest>();
auth_req->scheme = "digest"; auth_req->scheme = "digest";
@ -2104,9 +2104,9 @@ TEST_P(CoordinationTest, TestStorageSnapshotDifferentCompressions)
ChangelogDirTest test("./snapshots"); ChangelogDirTest test("./snapshots");
setSnapshotDirectory("./snapshots"); setSnapshotDirectory("./snapshots");
DB::KeeperSnapshotManager manager(3, keeper_context, params.enable_compression); DB::KeeperSnapshotManager<DB::KeeperMemoryStorage> manager(3, keeper_context, params.enable_compression);
DB::KeeperStorage storage(500, "", keeper_context); DB::KeeperMemoryStorage storage(500, "", keeper_context);
addNode(storage, "/hello", "world", 1); addNode(storage, "/hello", "world", 1);
addNode(storage, "/hello/somepath", "somedata", 3); addNode(storage, "/hello/somepath", "somedata", 3);
storage.session_id_counter = 5; storage.session_id_counter = 5;
@ -2116,13 +2116,13 @@ TEST_P(CoordinationTest, TestStorageSnapshotDifferentCompressions)
storage.getSessionID(130); storage.getSessionID(130);
storage.getSessionID(130); storage.getSessionID(130);
DB::KeeperStorageSnapshot snapshot(&storage, 2); DB::KeeperStorageSnapshot<DB::KeeperMemoryStorage> snapshot(&storage, 2);
auto buf = manager.serializeSnapshotToBuffer(snapshot); auto buf = manager.serializeSnapshotToBuffer(snapshot);
manager.serializeSnapshotBufferToDisk(*buf, 2); manager.serializeSnapshotBufferToDisk(*buf, 2);
EXPECT_TRUE(fs::exists("./snapshots/snapshot_2.bin" + params.extension)); EXPECT_TRUE(fs::exists("./snapshots/snapshot_2.bin" + params.extension));
DB::KeeperSnapshotManager new_manager(3, keeper_context, !params.enable_compression); DB::KeeperSnapshotManager<DB::KeeperMemoryStorage> new_manager(3, keeper_context, !params.enable_compression);
auto debuf = new_manager.deserializeSnapshotBufferFromDisk(2); auto debuf = new_manager.deserializeSnapshotBufferFromDisk(2);
@ -2308,9 +2308,9 @@ TEST_P(CoordinationTest, TestStorageSnapshotEqual)
std::optional<UInt128> snapshot_hash; std::optional<UInt128> snapshot_hash;
for (size_t i = 0; i < 15; ++i) for (size_t i = 0; i < 15; ++i)
{ {
DB::KeeperSnapshotManager manager(3, keeper_context, params.enable_compression); DB::KeeperSnapshotManager<DB::KeeperMemoryStorage> manager(3, keeper_context, params.enable_compression);
DB::KeeperStorage storage(500, "", keeper_context); DB::KeeperMemoryStorage storage(500, "", keeper_context);
addNode(storage, "/hello", ""); addNode(storage, "/hello", "");
for (size_t j = 0; j < 5000; ++j) for (size_t j = 0; j < 5000; ++j)
{ {
@ -2326,7 +2326,7 @@ TEST_P(CoordinationTest, TestStorageSnapshotEqual)
for (size_t j = 0; j < 3333; ++j) for (size_t j = 0; j < 3333; ++j)
storage.getSessionID(130 * j); storage.getSessionID(130 * j);
DB::KeeperStorageSnapshot snapshot(&storage, storage.zxid); DB::KeeperStorageSnapshot<DB::KeeperMemoryStorage> snapshot(&storage, storage.zxid);
auto buf = manager.serializeSnapshotToBuffer(snapshot); auto buf = manager.serializeSnapshotToBuffer(snapshot);
@ -2389,7 +2389,7 @@ TEST_P(CoordinationTest, TestUncommittedStateBasicCrud)
using namespace DB; using namespace DB;
using namespace Coordination; using namespace Coordination;
DB::KeeperStorage storage{500, "", keeper_context}; DB::KeeperMemoryStorage storage{500, "", keeper_context};
constexpr std::string_view path = "/test"; constexpr std::string_view path = "/test";
@ -2506,7 +2506,7 @@ TEST_P(CoordinationTest, TestListRequestTypes)
using namespace DB; using namespace DB;
using namespace Coordination; using namespace Coordination;
KeeperStorage storage{500, "", keeper_context}; KeeperMemoryStorage storage{500, "", keeper_context};
int32_t zxid = 0; int32_t zxid = 0;
@ -2660,7 +2660,7 @@ TEST_P(CoordinationTest, TestDurableState)
TEST_P(CoordinationTest, TestFeatureFlags) TEST_P(CoordinationTest, TestFeatureFlags)
{ {
using namespace Coordination; using namespace Coordination;
KeeperStorage storage{500, "", keeper_context}; KeeperMemoryStorage storage{500, "", keeper_context};
auto request = std::make_shared<ZooKeeperGetRequest>(); auto request = std::make_shared<ZooKeeperGetRequest>();
request->path = DB::keeper_api_feature_flags_path; request->path = DB::keeper_api_feature_flags_path;
auto responses = storage.processRequest(request, 0, std::nullopt, true, true); auto responses = storage.processRequest(request, 0, std::nullopt, true, true);
@ -2679,7 +2679,7 @@ TEST_P(CoordinationTest, TestSystemNodeModify)
// On INIT we abort when a system path is modified // On INIT we abort when a system path is modified
keeper_context->setServerState(KeeperContext::Phase::RUNNING); keeper_context->setServerState(KeeperContext::Phase::RUNNING);
KeeperStorage storage{500, "", keeper_context}; KeeperMemoryStorage storage{500, "", keeper_context};
const auto assert_create = [&](const std::string_view path, const auto expected_code) const auto assert_create = [&](const std::string_view path, const auto expected_code)
{ {
auto request = std::make_shared<ZooKeeperCreateRequest>(); auto request = std::make_shared<ZooKeeperCreateRequest>();
@ -2771,7 +2771,7 @@ TEST_P(CoordinationTest, TestCheckNotExistsRequest)
using namespace DB; using namespace DB;
using namespace Coordination; using namespace Coordination;
KeeperStorage storage{500, "", keeper_context}; KeeperMemoryStorage storage{500, "", keeper_context};
int32_t zxid = 0; int32_t zxid = 0;
@ -2850,7 +2850,7 @@ TEST_P(CoordinationTest, TestReapplyingDeltas)
create_request->path = "/test/data"; create_request->path = "/test/data";
create_request->is_sequential = true; create_request->is_sequential = true;
const auto process_create = [](KeeperStorage & storage, const auto & request, int64_t zxid) const auto process_create = [](KeeperMemoryStorage & storage, const auto & request, int64_t zxid)
{ {
storage.preprocessRequest(request, 1, 0, zxid); storage.preprocessRequest(request, 1, 0, zxid);
auto responses = storage.processRequest(request, 1, zxid); auto responses = storage.processRequest(request, 1, zxid);
@ -2871,19 +2871,19 @@ TEST_P(CoordinationTest, TestReapplyingDeltas)
process_create(storage, create_request, zxid); process_create(storage, create_request, zxid);
}; };
KeeperStorage storage1{500, "", keeper_context}; KeeperMemoryStorage storage1{500, "", keeper_context};
commit_initial_data(storage1); commit_initial_data(storage1);
for (int64_t zxid = initial_zxid + 1; zxid < initial_zxid + 50; ++zxid) for (int64_t zxid = initial_zxid + 1; zxid < initial_zxid + 50; ++zxid)
storage1.preprocessRequest(create_request, 1, 0, zxid, /*check_acl=*/true, /*digest=*/std::nullopt, /*log_idx=*/zxid); storage1.preprocessRequest(create_request, 1, 0, zxid, /*check_acl=*/true, /*digest=*/std::nullopt, /*log_idx=*/zxid);
/// create identical new storage /// create identical new storage
KeeperStorage storage2{500, "", keeper_context}; KeeperMemoryStorage storage2{500, "", keeper_context};
commit_initial_data(storage2); commit_initial_data(storage2);
storage1.applyUncommittedState(storage2, initial_zxid); storage1.applyUncommittedState(storage2, initial_zxid);
const auto commit_unprocessed = [&](KeeperStorage & storage) const auto commit_unprocessed = [&](KeeperMemoryStorage & storage)
{ {
for (int64_t zxid = initial_zxid + 1; zxid < initial_zxid + 50; ++zxid) for (int64_t zxid = initial_zxid + 1; zxid < initial_zxid + 50; ++zxid)
{ {
@ -2896,7 +2896,7 @@ TEST_P(CoordinationTest, TestReapplyingDeltas)
commit_unprocessed(storage1); commit_unprocessed(storage1);
commit_unprocessed(storage2); commit_unprocessed(storage2);
const auto get_children = [&](KeeperStorage & storage) const auto get_children = [&](KeeperMemoryStorage & storage)
{ {
const auto list_request = std::make_shared<ZooKeeperListRequest>(); const auto list_request = std::make_shared<ZooKeeperListRequest>();
list_request->path = "/test"; list_request->path = "/test";