Superdigest support

This commit is contained in:
alesapin 2021-05-23 20:54:42 +03:00
parent bdb52bb643
commit 472a41b287
8 changed files with 41 additions and 11 deletions

View File

@ -16,6 +16,7 @@
#include <string>
#include <filesystem>
#include <Poco/Util/Application.h>
#include <boost/algorithm/string.hpp>
namespace DB
{
@ -26,6 +27,7 @@ namespace ErrorCodes
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int SUPPORT_IS_DISABLED;
extern const int LOGICAL_ERROR;
extern const int INVALID_CONFIG_PARAMETER;
}
namespace
@ -75,6 +77,20 @@ std::string getSnapshotsPathFromConfig(const Poco::Util::AbstractConfiguration &
return std::filesystem::path{config.getString("path", DBMS_DEFAULT_PATH)} / "coordination/snapshots";
}
std::string checkAndGetSuperdigest(const Poco::Util::AbstractConfiguration & config)
{
if (!config.has("keeper_server.superdigest"))
return "";
auto user_and_digest = config.getString("keeper_server.superdigest");
std::vector<std::string> scheme_and_id;
boost::split(scheme_and_id, user_and_digest, [](char c) { return c == ':'; });
if (scheme_and_id.size() != 2 || scheme_and_id[0] != "super")
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Incorrect superdigest in keeper_server config. Must be 'super:base64string'");
return scheme_and_id[1];
}
}
KeeperServer::KeeperServer(
@ -89,7 +105,8 @@ KeeperServer::KeeperServer(
, state_machine(nuraft::cs_new<KeeperStateMachine>(
responses_queue_, snapshots_queue_,
getSnapshotsPathFromConfig(config, standalone_keeper),
coordination_settings))
coordination_settings,
checkAndGetSuperdigest(config)))
, state_manager(nuraft::cs_new<KeeperStateManager>(server_id, "keeper_server", config, coordination_settings, standalone_keeper))
, log(&Poco::Logger::get("KeeperServer"))
{

View File

@ -48,7 +48,6 @@ private:
void shutdownRaftServer();
public:
KeeperServer(
int server_id_,

View File

@ -241,9 +241,10 @@ KeeperStorageSnapshot::~KeeperStorageSnapshot()
storage->disableSnapshotMode();
}
KeeperSnapshotManager::KeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, size_t storage_tick_time_)
KeeperSnapshotManager::KeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, const std::string & superdigest_, size_t storage_tick_time_)
: snapshots_path(snapshots_path_)
, snapshots_to_keep(snapshots_to_keep_)
, superdigest(superdigest_)
, storage_tick_time(storage_tick_time_)
{
namespace fs = std::filesystem;
@ -330,7 +331,7 @@ SnapshotMetaAndStorage KeeperSnapshotManager::deserializeSnapshotFromBuffer(nura
{
ReadBufferFromNuraftBuffer reader(buffer);
CompressedReadBuffer compressed_reader(reader);
auto storage = std::make_unique<KeeperStorage>(storage_tick_time);
auto storage = std::make_unique<KeeperStorage>(storage_tick_time, superdigest);
auto snapshot_metadata = KeeperStorageSnapshot::deserialize(*storage, compressed_reader);
return std::make_pair(snapshot_metadata, std::move(storage));
}

View File

@ -46,7 +46,7 @@ using SnapshotMetaAndStorage = std::pair<SnapshotMetadataPtr, KeeperStoragePtr>;
class KeeperSnapshotManager
{
public:
KeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, size_t storage_tick_time_ = 500);
KeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, const std::string & superdigest_ = "", size_t storage_tick_time_ = 500);
SnapshotMetaAndStorage restoreFromLatestSnapshot();
@ -77,6 +77,7 @@ private:
const std::string snapshots_path;
const size_t snapshots_to_keep;
std::map<uint64_t, std::string> existing_snapshots;
const std::string superdigest;
size_t storage_tick_time;
};

View File

@ -36,13 +36,19 @@ KeeperStorage::RequestForSession parseRequest(nuraft::buffer & data)
return request_for_session;
}
KeeperStateMachine::KeeperStateMachine(ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_)
KeeperStateMachine::KeeperStateMachine(
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_,
const std::string & snapshots_path_,
const CoordinationSettingsPtr & coordination_settings_,
const std::string & superdigest_)
: coordination_settings(coordination_settings_)
, snapshot_manager(snapshots_path_, coordination_settings->snapshots_to_keep, coordination_settings->dead_session_check_period_ms.totalMicroseconds())
, snapshot_manager(snapshots_path_, coordination_settings->snapshots_to_keep, superdigest_, coordination_settings->dead_session_check_period_ms.totalMicroseconds())
, responses_queue(responses_queue_)
, snapshots_queue(snapshots_queue_)
, last_committed_idx(0)
, log(&Poco::Logger::get("KeeperStateMachine"))
, superdigest(superdigest_)
{
}
@ -85,7 +91,7 @@ void KeeperStateMachine::init()
}
if (!storage)
storage = std::make_unique<KeeperStorage>(coordination_settings->dead_session_check_period_ms.totalMilliseconds());
storage = std::make_unique<KeeperStorage>(coordination_settings->dead_session_check_period_ms.totalMilliseconds(), superdigest);
}
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data)

View File

@ -16,7 +16,10 @@ using SnapshotsQueue = ConcurrentBoundedQueue<CreateSnapshotTask>;
class KeeperStateMachine : public nuraft::state_machine
{
public:
KeeperStateMachine(ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_);
KeeperStateMachine(
ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_,
const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_,
const std::string & superdigest_ = "");
void init();
@ -84,6 +87,8 @@ private:
/// Last committed Raft log number.
std::atomic<uint64_t> last_committed_idx;
Poco::Logger * log;
const std::string superdigest;
};
}

View File

@ -148,8 +148,9 @@ static KeeperStorage::ResponsesForSessions processWatchesImpl(const String & pat
return result;
}
KeeperStorage::KeeperStorage(int64_t tick_time_ms)
KeeperStorage::KeeperStorage(int64_t tick_time_ms, const String & superdigest_)
: session_expiry_queue(tick_time_ms)
, superdigest(superdigest_)
{
container.insert("/", Node());
}

View File

@ -98,7 +98,7 @@ public:
const String superdigest;
public:
KeeperStorage(int64_t tick_time_ms);
KeeperStorage(int64_t tick_time_ms, const String & superdigest_);
int64_t getSessionID(int64_t session_timeout_ms)
{