From 472a41b2870d68135da3a1460a571c469214a145 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sun, 23 May 2021 20:54:42 +0300 Subject: [PATCH] Superdigest support --- src/Coordination/KeeperServer.cpp | 19 ++++++++++++++++++- src/Coordination/KeeperServer.h | 1 - src/Coordination/KeeperSnapshotManager.cpp | 5 +++-- src/Coordination/KeeperSnapshotManager.h | 3 ++- src/Coordination/KeeperStateMachine.cpp | 12 +++++++++--- src/Coordination/KeeperStateMachine.h | 7 ++++++- src/Coordination/KeeperStorage.cpp | 3 ++- src/Coordination/KeeperStorage.h | 2 +- 8 files changed, 41 insertions(+), 11 deletions(-) diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index ba904a535d0..eca0d05a3f1 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -16,6 +16,7 @@ #include #include #include +#include namespace DB { @@ -26,6 +27,7 @@ namespace ErrorCodes extern const int NO_ELEMENTS_IN_CONFIG; extern const int SUPPORT_IS_DISABLED; extern const int LOGICAL_ERROR; + extern const int INVALID_CONFIG_PARAMETER; } namespace @@ -75,6 +77,20 @@ std::string getSnapshotsPathFromConfig(const Poco::Util::AbstractConfiguration & return std::filesystem::path{config.getString("path", DBMS_DEFAULT_PATH)} / "coordination/snapshots"; } +std::string checkAndGetSuperdigest(const Poco::Util::AbstractConfiguration & config) +{ + if (!config.has("keeper_server.superdigest")) + return ""; + + auto user_and_digest = config.getString("keeper_server.superdigest"); + std::vector scheme_and_id; + boost::split(scheme_and_id, user_and_digest, [](char c) { return c == ':'; }); + if (scheme_and_id.size() != 2 || scheme_and_id[0] != "super") + throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Incorrect superdigest in keeper_server config. Must be 'super:base64string'"); + + return scheme_and_id[1]; +} + } KeeperServer::KeeperServer( @@ -89,7 +105,8 @@ KeeperServer::KeeperServer( , state_machine(nuraft::cs_new( responses_queue_, snapshots_queue_, getSnapshotsPathFromConfig(config, standalone_keeper), - coordination_settings)) + coordination_settings, + checkAndGetSuperdigest(config))) , state_manager(nuraft::cs_new(server_id, "keeper_server", config, coordination_settings, standalone_keeper)) , log(&Poco::Logger::get("KeeperServer")) { diff --git a/src/Coordination/KeeperServer.h b/src/Coordination/KeeperServer.h index 421be331537..282a7b48dfb 100644 --- a/src/Coordination/KeeperServer.h +++ b/src/Coordination/KeeperServer.h @@ -48,7 +48,6 @@ private: void shutdownRaftServer(); - public: KeeperServer( int server_id_, diff --git a/src/Coordination/KeeperSnapshotManager.cpp b/src/Coordination/KeeperSnapshotManager.cpp index 3cfa07fb542..13c8eecbad7 100644 --- a/src/Coordination/KeeperSnapshotManager.cpp +++ b/src/Coordination/KeeperSnapshotManager.cpp @@ -241,9 +241,10 @@ KeeperStorageSnapshot::~KeeperStorageSnapshot() storage->disableSnapshotMode(); } -KeeperSnapshotManager::KeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, size_t storage_tick_time_) +KeeperSnapshotManager::KeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, const std::string & superdigest_, size_t storage_tick_time_) : snapshots_path(snapshots_path_) , snapshots_to_keep(snapshots_to_keep_) + , superdigest(superdigest_) , storage_tick_time(storage_tick_time_) { namespace fs = std::filesystem; @@ -330,7 +331,7 @@ SnapshotMetaAndStorage KeeperSnapshotManager::deserializeSnapshotFromBuffer(nura { ReadBufferFromNuraftBuffer reader(buffer); CompressedReadBuffer compressed_reader(reader); - auto storage = std::make_unique(storage_tick_time); + auto storage = std::make_unique(storage_tick_time, superdigest); auto snapshot_metadata = KeeperStorageSnapshot::deserialize(*storage, compressed_reader); return std::make_pair(snapshot_metadata, std::move(storage)); } diff --git a/src/Coordination/KeeperSnapshotManager.h b/src/Coordination/KeeperSnapshotManager.h index 95d1ce831d4..adc369fd498 100644 --- a/src/Coordination/KeeperSnapshotManager.h +++ b/src/Coordination/KeeperSnapshotManager.h @@ -46,7 +46,7 @@ using SnapshotMetaAndStorage = std::pair; class KeeperSnapshotManager { public: - KeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, size_t storage_tick_time_ = 500); + KeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, const std::string & superdigest_ = "", size_t storage_tick_time_ = 500); SnapshotMetaAndStorage restoreFromLatestSnapshot(); @@ -77,6 +77,7 @@ private: const std::string snapshots_path; const size_t snapshots_to_keep; std::map existing_snapshots; + const std::string superdigest; size_t storage_tick_time; }; diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index b5b393523bf..a76b86a8171 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -36,13 +36,19 @@ KeeperStorage::RequestForSession parseRequest(nuraft::buffer & data) return request_for_session; } -KeeperStateMachine::KeeperStateMachine(ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_) + KeeperStateMachine::KeeperStateMachine( + ResponsesQueue & responses_queue_, + SnapshotsQueue & snapshots_queue_, + const std::string & snapshots_path_, + const CoordinationSettingsPtr & coordination_settings_, + const std::string & superdigest_) : coordination_settings(coordination_settings_) - , snapshot_manager(snapshots_path_, coordination_settings->snapshots_to_keep, coordination_settings->dead_session_check_period_ms.totalMicroseconds()) + , snapshot_manager(snapshots_path_, coordination_settings->snapshots_to_keep, superdigest_, coordination_settings->dead_session_check_period_ms.totalMicroseconds()) , responses_queue(responses_queue_) , snapshots_queue(snapshots_queue_) , last_committed_idx(0) , log(&Poco::Logger::get("KeeperStateMachine")) + , superdigest(superdigest_) { } @@ -85,7 +91,7 @@ void KeeperStateMachine::init() } if (!storage) - storage = std::make_unique(coordination_settings->dead_session_check_period_ms.totalMilliseconds()); + storage = std::make_unique(coordination_settings->dead_session_check_period_ms.totalMilliseconds(), superdigest); } nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data) diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index 8712adaf4b1..fb46f507baf 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -16,7 +16,10 @@ using SnapshotsQueue = ConcurrentBoundedQueue; class KeeperStateMachine : public nuraft::state_machine { public: - KeeperStateMachine(ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_); + KeeperStateMachine( + ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, + const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_, + const std::string & superdigest_ = ""); void init(); @@ -84,6 +87,8 @@ private: /// Last committed Raft log number. std::atomic last_committed_idx; Poco::Logger * log; + + const std::string superdigest; }; } diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index 63272b14a4a..8cc9e157a69 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -148,8 +148,9 @@ static KeeperStorage::ResponsesForSessions processWatchesImpl(const String & pat return result; } -KeeperStorage::KeeperStorage(int64_t tick_time_ms) +KeeperStorage::KeeperStorage(int64_t tick_time_ms, const String & superdigest_) : session_expiry_queue(tick_time_ms) + , superdigest(superdigest_) { container.insert("/", Node()); } diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index f247364f611..7990574c8f1 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -98,7 +98,7 @@ public: const String superdigest; public: - KeeperStorage(int64_t tick_time_ms); + KeeperStorage(int64_t tick_time_ms, const String & superdigest_); int64_t getSessionID(int64_t session_timeout_ms) {