Merge pull request #28080 from ClickHouse/some_renames_in_keeper

Clickhouse-keeper: renames and comments
This commit is contained in:
alesapin 2021-08-26 10:16:21 +03:00 committed by GitHub
commit 47b995467e
16 changed files with 308 additions and 148 deletions

View File

@ -359,7 +359,7 @@ int Keeper::main(const std::vector<std::string> & /*args*/)
auto servers = std::make_shared<std::vector<ProtocolServerAdapter>>();
/// Initialize Keeper RAFT. Do nothing if there is no keeper_server section in the config.
global_context->initializeKeeperStorageDispatcher();
global_context->initializeKeeperDispatcher();
for (const auto & listen_host : listen_hosts)
{
/// TCP Keeper
@ -428,7 +428,7 @@ int Keeper::main(const std::vector<std::string> & /*args*/)
else
LOG_INFO(log, "Closed connections to Keeper.");
global_context->shutdownKeeperStorageDispatcher();
global_context->shutdownKeeperDispatcher();
/// Wait for the server pool to avoid use-after-free of the destroyed context in the handlers
server_pool.joinAll();

View File

@ -996,7 +996,7 @@ if (ThreadFuzzer::instance().isEffective())
{
#if USE_NURAFT
/// Initialize Keeper RAFT. Do nothing if there is no keeper_server section in the config.
global_context->initializeKeeperStorageDispatcher();
global_context->initializeKeeperDispatcher();
for (const auto & listen_host : listen_hosts)
{
/// TCP Keeper
@ -1079,7 +1079,7 @@ if (ThreadFuzzer::instance().isEffective())
else
LOG_INFO(log, "Closed connections to servers for tables.");
global_context->shutdownKeeperStorageDispatcher();
global_context->shutdownKeeperDispatcher();
}
/// Wait for the server pool to avoid use-after-free of the destroyed context in the handlers

View File

@ -165,10 +165,11 @@ public:
while (!read_buf.eof())
{
result.last_position = read_buf.count();
/// Read checksum
Checksum record_checksum;
readIntBinary(record_checksum, read_buf);
/// Initialization is required, otherwise checksums may fail
/// Read header
ChangelogRecord record;
readIntBinary(record.header.version, read_buf);
readIntBinary(record.header.index, read_buf);
@ -179,6 +180,7 @@ public:
if (record.header.version > CURRENT_CHANGELOG_VERSION)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported changelog version {} on path {}", record.header.version, filepath);
/// Read data
if (record.header.blob_size != 0)
{
auto buffer = nuraft::buffer::alloc(record.header.blob_size);
@ -189,11 +191,13 @@ public:
else
record.blob = nullptr;
/// Check changelog integrity
if (previous_index != 0 && previous_index + 1 != record.header.index)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Previous log entry {}, next log entry {}, it seems some entries were skipped", previous_index, record.header.index);
previous_index = record.header.index;
/// Compare checksums
Checksum checksum = computeRecordChecksum(record);
if (checksum != record_checksum)
{
@ -202,22 +206,25 @@ public:
filepath, record.header.version, record.header.index, record.header.blob_size);
}
/// Check for duplicated changelog ids
if (logs.count(record.header.index) != 0)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Duplicated index id {} in log {}", record.header.index, filepath);
result.entries_read += 1;
/// Read but skip this entry because our state is already fresher
if (record.header.index < start_log_index)
{
continue;
}
/// Create log entry for read data
auto log_entry = nuraft::cs_new<nuraft::log_entry>(record.header.term, record.blob, record.header.value_type);
if (result.first_read_index == 0)
result.first_read_index = record.header.index;
/// Put it into the in-memory structure
logs.emplace(record.header.index, log_entry);
index_to_offset[record.header.index] = result.last_position;
if (result.entries_read % 50000 == 0)
LOG_TRACE(log, "Reading changelog from path {}, entries {}", filepath, result.entries_read);
}
@ -235,6 +242,7 @@ public:
result.error = true;
tryLogCurrentException(log);
}
LOG_TRACE(log, "Totally read from changelog {} {} entries", filepath, result.entries_read);
return result;
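
The reader above walks a simple on-disk record layout: a checksum, then a fixed header (version, index, term, value_type, blob_size), then an optional blob. Below is a minimal standalone sketch of that loop body, assuming plain uint64_t field widths and using an std::istream stand-in for ClickHouse's ReadBuffer and readIntBinary (these are not the real types):

    #include <cstdint>
    #include <istream>
    #include <stdexcept>
    #include <vector>

    /// Hypothetical stand-in for readIntBinary(value, read_buf).
    template <typename T>
    void readInt(std::istream & in, T & value)
    {
        in.read(reinterpret_cast<char *>(&value), sizeof(value));
    }

    /// Reads one changelog record; previous_index enforces the
    /// consecutive-index invariant checked in the real reader above.
    void readOneRecord(std::istream & in, uint64_t & previous_index)
    {
        uint64_t checksum;
        readInt(in, checksum);

        uint64_t version, index, term, value_type, blob_size;
        readInt(in, version);
        readInt(in, index);
        readInt(in, term);
        readInt(in, value_type);
        readInt(in, blob_size);

        std::vector<char> blob(blob_size);
        if (blob_size != 0)
            in.read(blob.data(), blob_size);

        if (previous_index != 0 && previous_index + 1 != index)
            throw std::runtime_error("entries skipped in changelog");
        previous_index = index;
        /// The real code additionally recomputes the checksum over the
        /// record and compares it with the stored one before accepting it.
    }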
@ -255,6 +263,7 @@ Changelog::Changelog(
, force_sync(force_sync_)
, log(log_)
{
/// Load all files in changelog directory
namespace fs = std::filesystem;
if (!fs::exists(changelogs_dir))
fs::create_directories(changelogs_dir);
@ -264,23 +273,35 @@ Changelog::Changelog(
auto file_description = getChangelogFileDescription(p.path());
existing_changelogs[file_description.from_log_index] = file_description;
}
if (existing_changelogs.empty())
LOG_WARNING(log, "No logs exist in {}. It's OK if it's the first run of clickhouse-keeper.", changelogs_dir);
}
void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uint64_t logs_to_keep)
{
uint64_t total_read = 0;
/// Number of entries in the last processed changelog
uint64_t entries_in_last = 0;
uint64_t incomplete_log_index = 0;
/// Log index of the first incomplete log (key in existing_changelogs)
uint64_t first_incomplete_log_start_index = 0;
ChangelogReadResult result{};
/// First log index that was read across all changelogs
uint64_t first_read_index = 0;
uint64_t start_to_read_from = last_commited_log_index;
if (start_to_read_from > logs_to_keep)
start_to_read_from -= logs_to_keep;
else
start_to_read_from = 1;
/// Set to true once we have read at least something
bool started = false;
for (const auto & [changelog_start_index, changelog_description] : existing_changelogs)
{
entries_in_last = changelog_description.to_log_index - changelog_description.from_log_index + 1;
@ -292,7 +313,7 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
if (changelog_description.from_log_index > last_commited_log_index && (changelog_description.from_log_index - last_commited_log_index) > 1)
{
LOG_ERROR(log, "Some records were lost, last committed log index {}, smallest available log index on disk {}. Hopefully we will receive missing records from leader.", last_commited_log_index, changelog_description.from_log_index);
incomplete_log_index = changelog_start_index;
first_incomplete_log_start_index = changelog_start_index;
break;
}
else if (changelog_description.from_log_index > start_to_read_from)
@ -311,7 +332,7 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
/// May happen after a truncate, a crash, or simply an unfinished log
if (result.entries_read < entries_in_last)
{
incomplete_log_index = changelog_start_index;
first_incomplete_log_start_index = changelog_start_index;
break;
}
}
@ -322,11 +343,13 @@ void Changelog::readChangelogAndInitWriter(uint64_t last_commited_log_index, uin
else
start_index = last_commited_log_index;
if (incomplete_log_index != 0)
/// Found some broken or unfinished logs.
/// We have to remove the broken data and continue writing into the incomplete log.
if (first_incomplete_log_start_index != 0)
{
auto start_remove_from = existing_changelogs.begin();
if (started)
start_remove_from = existing_changelogs.upper_bound(incomplete_log_index);
start_remove_from = existing_changelogs.upper_bound(first_incomplete_log_start_index);
/// All subsequent logs shouldn't exist. But they may exist if we crashed after writeAt started. Remove them.
for (auto itr = start_remove_from; itr != existing_changelogs.end();)
@ -363,6 +386,7 @@ void Changelog::rotate(uint64_t new_start_log_index)
/// Flush previous log
flush();
/// Start new one
ChangelogFileDescription new_description;
new_description.prefix = DEFAULT_PREFIX;
new_description.from_log_index = new_start_log_index;
@ -378,7 +402,7 @@ void Changelog::rotate(uint64_t new_start_log_index)
ChangelogRecord Changelog::buildRecord(uint64_t index, const LogEntryPtr & log_entry)
{
ChangelogRecord record;
record.header.version = ChangelogVersion::V0;
record.header.version = ChangelogVersion::V1;
record.header.index = index;
record.header.term = log_entry->get_term();
record.header.value_type = log_entry->get_val_type();
@ -416,7 +440,9 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
if (index_to_start_pos.count(index) == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot write at index {} because changelog doesn't contain it", index);
/// Complex case when we need to override data from already rotated log
bool go_to_previous_file = index < current_writer->getStartIndex();
if (go_to_previous_file)
{
auto index_changelog = existing_changelogs.lower_bound(index);
@ -450,6 +476,7 @@ void Changelog::writeAt(uint64_t index, const LogEntryPtr & log_entry)
auto log_itr = logs.find(i);
if (log_itr == logs.end())
break;
logs.erase(log_itr);
index_to_start_pos.erase(i);
entries_written--;
@ -467,7 +494,6 @@ void Changelog::compact(uint64_t up_to_log_index)
/// Remove all completely outdated changelog files
if (itr->second.to_log_index <= up_to_log_index)
{
LOG_INFO(log, "Removing changelog {} because of compaction", itr->second.path);
std::erase_if(index_to_start_pos, [right_index = itr->second.to_log_index] (const auto & item) { return item.first <= right_index; });
std::filesystem::remove(itr->second.path);
@ -482,6 +508,7 @@ void Changelog::compact(uint64_t up_to_log_index)
LogEntryPtr Changelog::getLastEntry() const
{
/// This entry is treated in a special way by NuRaft
static LogEntryPtr fake_entry = nuraft::cs_new<nuraft::log_entry>(0, nuraft::buffer::alloc(sizeof(uint64_t)));
uint64_t next_index = getNextEntryIndex() - 1;

View File

@ -58,8 +58,8 @@ struct ChangelogFileDescription
class ChangelogWriter;
/// Simplest changelog with files rotation.
/// No compression, no metadata, just entries with headers one by one
/// Able to read broken files/entries and discard them.
/// No compression, no metadata, just entries with headers one by one.
/// Able to read broken files/entries and discard them. Not thread safe.
class Changelog
{

View File

@ -1,4 +1,4 @@
#include <Coordination/KeeperStorageDispatcher.h>
#include <Coordination/KeeperDispatcher.h>
#include <Common/setThreadName.h>
#include <Common/ZooKeeper/KeeperException.h>
#include <future>
@ -9,19 +9,18 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int TIMEOUT_EXCEEDED;
}
KeeperStorageDispatcher::KeeperStorageDispatcher()
KeeperDispatcher::KeeperDispatcher()
: coordination_settings(std::make_shared<CoordinationSettings>())
, log(&Poco::Logger::get("KeeperDispatcher"))
{
}
void KeeperStorageDispatcher::requestThread()
void KeeperDispatcher::requestThread()
{
setThreadName("KeeperReqT");
@ -133,7 +132,7 @@ void KeeperStorageDispatcher::requestThread()
}
}
void KeeperStorageDispatcher::responseThread()
void KeeperDispatcher::responseThread()
{
setThreadName("KeeperRspT");
while (!shutdown_called)
@ -159,7 +158,7 @@ void KeeperStorageDispatcher::responseThread()
}
}
void KeeperStorageDispatcher::snapshotThread()
void KeeperDispatcher::snapshotThread()
{
setThreadName("KeeperSnpT");
while (!shutdown_called)
@ -181,9 +180,11 @@ void KeeperStorageDispatcher::snapshotThread()
}
}
void KeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
void KeeperDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
{
std::lock_guard lock(session_to_response_callback_mutex);
/// Special new session response.
if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::SessionID)
{
const Coordination::ZooKeeperSessionIDResponse & session_id_resp = dynamic_cast<const Coordination::ZooKeeperSessionIDResponse &>(*response);
@ -196,25 +197,28 @@ void KeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination
callback(response);
new_session_id_response_callback.erase(session_id_resp.internal_id);
}
else
else /// Normal response, just write to client
{
auto session_writer = session_to_response_callback.find(session_id);
if (session_writer == session_to_response_callback.end())
auto session_response_callback = session_to_response_callback.find(session_id);
/// Session was disconnected, just skip this response
if (session_response_callback == session_to_response_callback.end())
return;
session_writer->second(response);
session_response_callback->second(response);
/// Session closed, no more writes
if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close)
{
session_to_response_callback.erase(session_writer);
session_to_response_callback.erase(session_response_callback);
}
}
}
bool KeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
{
{
/// If the session was already disconnected then we will ignore the request
std::lock_guard lock(session_to_response_callback_mutex);
if (session_to_response_callback.count(session_id) == 0)
return false;
@ -237,7 +241,7 @@ bool KeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr
return true;
}
void KeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper)
void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper)
{
LOG_DEBUG(log, "Initializing storage dispatcher");
int myid = config.getInt("keeper_server.server_id");
@ -251,6 +255,7 @@ void KeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfiguration
server = std::make_unique<KeeperServer>(
myid, coordination_settings, config, responses_queue, snapshots_queue, standalone_keeper);
try
{
LOG_DEBUG(log, "Waiting for server to initialize");
@ -266,13 +271,13 @@ void KeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfiguration
throw;
}
/// Start it after the keeper server has started
session_cleaner_thread = ThreadFromGlobalPool([this] { sessionCleanerTask(); });
LOG_DEBUG(log, "Dispatcher initialized");
}
void KeeperStorageDispatcher::shutdown()
void KeeperDispatcher::shutdown()
{
try
{
@ -306,6 +311,8 @@ void KeeperStorageDispatcher::shutdown()
server->shutdown();
KeeperStorage::RequestForSession request_for_session;
/// Set session expired for all pending requests
while (requests_queue->tryPop(request_for_session))
{
if (request_for_session.request)
@ -320,6 +327,7 @@ void KeeperStorageDispatcher::shutdown()
}
}
/// Clear all registered sessions
std::lock_guard lock(session_to_response_callback_mutex);
session_to_response_callback.clear();
}
@ -331,19 +339,19 @@ void KeeperStorageDispatcher::shutdown()
LOG_DEBUG(log, "Dispatcher shut down");
}
KeeperStorageDispatcher::~KeeperStorageDispatcher()
KeeperDispatcher::~KeeperDispatcher()
{
shutdown();
}
void KeeperStorageDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCallback callback)
void KeeperDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCallback callback)
{
std::lock_guard lock(session_to_response_callback_mutex);
if (!session_to_response_callback.try_emplace(session_id, callback).second)
throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session with id {} already registered in dispatcher", session_id);
}
void KeeperStorageDispatcher::sessionCleanerTask()
void KeeperDispatcher::sessionCleanerTask()
{
while (true)
{
@ -352,12 +360,16 @@ void KeeperStorageDispatcher::sessionCleanerTask()
try
{
/// Only leader node must check dead sessions
if (isLeader())
{
auto dead_sessions = server->getDeadSessions();
for (int64_t dead_session : dead_sessions)
{
LOG_INFO(log, "Found dead session {}, will try to close it", dead_session);
/// Close session == send close request to raft server
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
request->xid = Coordination::CLOSE_XID;
KeeperStorage::RequestForSession request_info;
@ -367,6 +379,8 @@ void KeeperStorageDispatcher::sessionCleanerTask()
std::lock_guard lock(push_request_mutex);
requests_queue->push(std::move(request_info));
}
/// Remove session from registered sessions
finishSession(dead_session);
LOG_INFO(log, "Dead session close request pushed");
}
@ -381,7 +395,7 @@ void KeeperStorageDispatcher::sessionCleanerTask()
}
}
void KeeperStorageDispatcher::finishSession(int64_t session_id)
void KeeperDispatcher::finishSession(int64_t session_id)
{
std::lock_guard lock(session_to_response_callback_mutex);
auto session_it = session_to_response_callback.find(session_id);
@ -389,7 +403,7 @@ void KeeperStorageDispatcher::finishSession(int64_t session_id)
session_to_response_callback.erase(session_it);
}
void KeeperStorageDispatcher::addErrorResponses(const KeeperStorage::RequestsForSessions & requests_for_sessions, Coordination::Error error)
void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSessions & requests_for_sessions, Coordination::Error error)
{
for (const auto & [session_id, request] : requests_for_sessions)
{
@ -402,7 +416,7 @@ void KeeperStorageDispatcher::addErrorResponses(const KeeperStorage::RequestsFor
}
}
void KeeperStorageDispatcher::forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions)
void KeeperDispatcher::forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions)
{
if (!result->has_result())
result->get();
@ -417,10 +431,14 @@ void KeeperStorageDispatcher::forceWaitAndProcessResult(RaftAppendResult & resul
requests_for_sessions.clear();
}
int64_t KeeperStorageDispatcher::getSessionID(int64_t session_timeout_ms)
int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
{
/// New session id allocation is a special request, because we cannot process it in the normal
/// way: get request -> put to raft -> set response for the registered callback.
KeeperStorage::RequestForSession request_info;
std::shared_ptr<Coordination::ZooKeeperSessionIDRequest> request = std::make_shared<Coordination::ZooKeeperSessionIDRequest>();
/// Internal session id. It's a temporary number which is unique for each client on this server
/// but can be the same on different servers.
request->internal_id = internal_session_id_counter.fetch_add(1);
request->session_timeout_ms = session_timeout_ms;
request->server_id = server->getServerID();
@ -430,6 +448,7 @@ int64_t KeeperStorageDispatcher::getSessionID(int64_t session_timeout_ms)
auto promise = std::make_shared<std::promise<int64_t>>();
auto future = promise->get_future();
{
std::lock_guard lock(session_to_response_callback_mutex);
new_session_id_response_callback[request->internal_id] = [promise, internal_id = request->internal_id] (const Coordination::ZooKeeperResponsePtr & response)
@ -452,6 +471,7 @@ int64_t KeeperStorageDispatcher::getSessionID(int64_t session_timeout_ms)
};
}
/// Push new session request to queue
{
std::lock_guard lock(push_request_mutex);
if (!requests_queue->tryPush(std::move(request_info), session_timeout_ms))
@ -461,6 +481,8 @@ int64_t KeeperStorageDispatcher::getSessionID(int64_t session_timeout_ms)
if (future.wait_for(std::chrono::milliseconds(session_timeout_ms)) != std::future_status::ready)
throw Exception("Cannot receive session id within session timeout", ErrorCodes::TIMEOUT_EXCEEDED);
/// Forcefully wait for request execution because we cannot process any other
/// requests for this client until it gets a new session id.
return future.get();
}
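
The session-id allocation above is a promise/future handshake: register a callback under the temporary internal_id, push the SessionID request into the raft queue, and block on the future with the session timeout. A reduced sketch of the same pattern, with a hypothetical registry standing in for new_session_id_response_callback and the queue push elided:

    #include <chrono>
    #include <functional>
    #include <future>
    #include <map>
    #include <memory>
    #include <mutex>
    #include <stdexcept>

    std::mutex registry_mutex;
    /// internal_id -> callback invoked by the response thread.
    std::map<int64_t, std::function<void(int64_t)>> registry;

    int64_t allocateSessionId(int64_t internal_id, int64_t session_timeout_ms)
    {
        auto promise = std::make_shared<std::promise<int64_t>>();
        auto future = promise->get_future();
        {
            std::lock_guard lock(registry_mutex);
            /// Fulfilled by the response thread when consensus completes.
            registry[internal_id] = [promise](int64_t session_id) { promise->set_value(session_id); };
        }

        /// ... push the SessionID request into the raft queue here ...

        if (future.wait_for(std::chrono::milliseconds(session_timeout_ms)) != std::future_status::ready)
            throw std::runtime_error("Cannot receive session id within session timeout");
        return future.get();
    }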

View File

@ -22,7 +22,9 @@ namespace DB
using ZooKeeperResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr & response)>;
class KeeperStorageDispatcher
/// High-level wrapper for ClickHouse Keeper.
/// Processes user requests via consensus and returns responses.
class KeeperDispatcher
{
private:
@ -45,6 +47,7 @@ private:
/// (get, set, list, etc.). Dispatcher determines callback for each response
/// using session id from this map.
SessionToResponseCallback session_to_response_callback;
/// But when a client connects to the server for the first time it doesn't
/// have a session_id. It requests one from the server. We give a temporary
/// internal id to such requests just to match the client with its response.
@ -60,7 +63,7 @@ private:
/// Dumping new snapshots to disk
ThreadFromGlobalPool snapshot_thread;
/// RAFT wrapper. Most important class.
/// RAFT wrapper.
std::unique_ptr<KeeperServer> server;
Poco::Logger * log;
@ -69,10 +72,15 @@ private:
std::atomic<int64_t> internal_session_id_counter{0};
private:
/// Thread that puts requests to raft
void requestThread();
/// Thread that puts responses for subscribed sessions
void responseThread();
/// Thread that cleans disconnected sessions from memory
void sessionCleanerTask();
/// Thread that creates snapshots in the background
void snapshotThread();
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
/// Add error responses for requests to responses queue.
@ -84,16 +92,23 @@ private:
void forceWaitAndProcessResult(RaftAppendResult & result, KeeperStorage::RequestsForSessions & requests_for_sessions);
public:
KeeperStorageDispatcher();
/// Just allocate some objects, real initialization is done by the `initialize` method
KeeperDispatcher();
/// Call shutdown
~KeeperDispatcher();
/// Initialization from config.
/// standalone_keeper -- whether we are a standalone keeper application (not inside clickhouse-server)
void initialize(const Poco::Util::AbstractConfiguration & config, bool standalone_keeper);
/// Shutdown internal keeper parts (server, state machine, log storage, etc)
void shutdown();
~KeeperStorageDispatcher();
/// Put request to ClickHouse Keeper
bool putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id);
/// Whether we are the leader
bool isLeader() const
{
return server->isLeader();
@ -104,9 +119,12 @@ public:
return server->isLeaderAlive();
}
/// Get new session ID
int64_t getSessionID(int64_t session_timeout_ms);
/// Register session and subscribe for responses with callback
void registerSession(int64_t session_id, ZooKeeperResponseCallback callback);
/// Call when we no longer need any responses for this session (the session has expired)
void finishSession(int64_t session_id);
};
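
Taken together, the header implies the following caller-side lifecycle. This is a hypothetical usage sketch, not code from the PR; the `config` and `request` objects are assumed to exist:

    auto dispatcher = std::make_shared<KeeperDispatcher>();
    dispatcher->initialize(config, /* standalone_keeper = */ true);

    int64_t session_id = dispatcher->getSessionID(/* session_timeout_ms = */ 30000);
    dispatcher->registerSession(session_id, [](const Coordination::ZooKeeperResponsePtr & response)
    {
        /// Called by the response thread for every response of this session.
    });

    if (!dispatcher->putRequest(request, session_id))
    {
        /// The session was already disconnected; the request was ignored.
    }

    dispatcher->finishSession(session_id);
    dispatcher->shutdown(); /// also called from the destructor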

View File

@ -9,39 +9,53 @@
namespace DB
{
/// Wrapper around Changelog class. Implements RAFT log storage.
class KeeperLogStore : public nuraft::log_store
{
public:
KeeperLogStore(const std::string & changelogs_path, uint64_t rotate_interval_, bool force_sync_);
/// Read log storage from filesystem starting from last_commited_log_index
void init(uint64_t last_commited_log_index, uint64_t logs_to_keep);
uint64_t start_index() const override;
uint64_t next_slot() const override;
/// Return the last entry from the log
nuraft::ptr<nuraft::log_entry> last_entry() const override;
/// Append new entry to log
uint64_t append(nuraft::ptr<nuraft::log_entry> & entry) override;
/// Remove all entries starting from index and write entry into index position
void write_at(uint64_t index, nuraft::ptr<nuraft::log_entry> & entry) override;
/// Return entries between [start, end)
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> log_entries(uint64_t start, uint64_t end) override;
/// Return entry at index
nuraft::ptr<nuraft::log_entry> entry_at(uint64_t index) override;
/// Term of the entry at the given index
uint64_t term_at(uint64_t index) override;
/// Serialize entries in interval [index, index + cnt)
nuraft::ptr<nuraft::buffer> pack(uint64_t index, int32_t cnt) override;
/// Apply serialized entries starting from index
void apply_pack(uint64_t index, nuraft::buffer & pack) override;
/// Entries up to and including last_log_index can be removed from memory and from disk
bool compact(uint64_t last_log_index) override;
/// Call fsync on the stored data
bool flush() override;
/// Current log storage size
uint64_t size() const;
/// Flush batch of appended entries
void end_of_append_batch(uint64_t start_index, uint64_t count) override;
private:
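
As a usage illustration of the interface documented above, a hypothetical init/append/flush/compact sequence (the path, the numbers, and the `buffer` payload are made up for the example):

    KeeperLogStore store("/path/to/changelogs", /* rotate_interval_ = */ 100000, /* force_sync_ = */ true);
    store.init(/* last_commited_log_index = */ 0, /* logs_to_keep = */ 100000);

    auto entry = nuraft::cs_new<nuraft::log_entry>(/* term = */ 1, buffer);
    uint64_t index = store.append(entry);  /// index assigned to the new entry
    store.flush();                         /// fsync the appended data
    store.compact(index);                  /// entries up to `index` may be dropped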

View File

@ -38,6 +38,8 @@ private:
Poco::Logger * log;
/// Callback func which is called by NuRaft on all internal events.
/// Used to determine the moment when raft is ready to serve new requests
nuraft::cb_func::ReturnCode callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * param);
/// Almost copy-paste from nuraft::launcher, but with separated server init and start
@ -57,18 +59,25 @@ public:
SnapshotsQueue & snapshots_queue_,
bool standalone_keeper);
/// Load state machine from the latest snapshot and load log storage. Start NuRaft with required settings.
void startup();
/// Put a local read request, execute it in the state machine directly, and put the response into
/// the responses queue
void putLocalReadRequest(const KeeperStorage::RequestForSession & request);
/// Put a batch of requests into Raft and get the result. Responses will be set separately into
/// responses_queue.
RaftAppendResult putRequestBatch(const KeeperStorage::RequestsForSessions & requests);
/// Return the set of non-active sessions
std::unordered_set<int64_t> getDeadSessions();
bool isLeader() const;
bool isLeaderAlive() const;
/// Wait for server initialization (see callbackFunc)
void waitInit();
void shutdown();

View File

@ -14,29 +14,32 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
KeeperStorage::RequestForSession parseRequest(nuraft::buffer & data)
namespace
{
ReadBufferFromNuraftBuffer buffer(data);
KeeperStorage::RequestForSession request_for_session;
readIntBinary(request_for_session.session_id, buffer);
KeeperStorage::RequestForSession parseRequest(nuraft::buffer & data)
{
ReadBufferFromNuraftBuffer buffer(data);
KeeperStorage::RequestForSession request_for_session;
readIntBinary(request_for_session.session_id, buffer);
int32_t length;
Coordination::read(length, buffer);
int32_t length;
Coordination::read(length, buffer);
int32_t xid;
Coordination::read(xid, buffer);
int32_t xid;
Coordination::read(xid, buffer);
Coordination::OpNum opnum;
Coordination::OpNum opnum;
Coordination::read(opnum, buffer);
Coordination::read(opnum, buffer);
request_for_session.request = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
request_for_session.request->xid = xid;
request_for_session.request->readImpl(buffer);
return request_for_session;
request_for_session.request = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
request_for_session.request->xid = xid;
request_for_session.request->readImpl(buffer);
return request_for_session;
}
}
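
The reads above pin down the serialized entry layout: an int64 session_id followed by the ZooKeeper wire format of the request (int32 length, int32 xid, OpNum, then the request body). A hypothetical writer-side mirror of parseRequest, assuming the request's write() emits the length/xid/opnum/body part (this helper is not in the diff):

    void serializeRequestForSession(WriteBufferFromNuraftBuffer & out,
                                    const KeeperStorage::RequestForSession & request_for_session)
    {
        /// Mirrors readIntBinary(request_for_session.session_id, buffer) above.
        writeIntBinary(request_for_session.session_id, out);
        /// Assumed to produce the length, xid, opnum and body consumed above.
        request_for_session.request->write(out);
    }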
KeeperStateMachine::KeeperStateMachine(
KeeperStateMachine::KeeperStateMachine(
ResponsesQueue & responses_queue_,
SnapshotsQueue & snapshots_queue_,
const std::string & snapshots_path_,
@ -58,6 +61,7 @@ void KeeperStateMachine::init()
LOG_DEBUG(log, "Totally have {} snapshots", snapshot_manager.totalSnapshots());
bool loaded = false;
bool has_snapshots = snapshot_manager.totalSnapshots() != 0;
/// Deserialize latest snapshot from disk
while (snapshot_manager.totalSnapshots() != 0)
{
uint64_t latest_log_index = snapshot_manager.getLatestSnapshotIndex();
@ -97,6 +101,7 @@ void KeeperStateMachine::init()
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, nuraft::buffer & data)
{
auto request_for_session = parseRequest(data);
/// Special processing of session_id request
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
{
const Coordination::ZooKeeperSessionIDRequest & session_id_request = dynamic_cast<const Coordination::ZooKeeperSessionIDRequest &>(*request_for_session.request);
@ -136,7 +141,7 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
{
LOG_DEBUG(log, "Applying snapshot {}", s.get_last_log_idx());
nuraft::ptr<nuraft::buffer> latest_snapshot_ptr;
{
{ /// save snapshot into memory
std::lock_guard lock(snapshots_lock);
if (s.get_last_log_idx() != latest_snapshot_meta->get_last_log_idx())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Required to apply snapshot with last log index {}, but our last log index is {}",
@ -144,10 +149,11 @@ bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
latest_snapshot_ptr = latest_snapshot_buf;
}
{
{ /// deserialize and apply snapshot to storage
std::lock_guard lock(storage_lock);
std::tie(latest_snapshot_meta, storage) = snapshot_manager.deserializeSnapshotFromBuffer(latest_snapshot_ptr);
}
last_committed_idx = s.get_last_log_idx();
return true;
}
@ -168,18 +174,19 @@ void KeeperStateMachine::create_snapshot(
nuraft::ptr<nuraft::buffer> snp_buf = s.serialize();
auto snapshot_meta_copy = nuraft::snapshot::deserialize(*snp_buf);
CreateSnapshotTask snapshot_task;
{
{ /// lock storage for a short period of time to turn on "snapshot mode". After that we can read a consistent storage state without locking.
std::lock_guard lock(storage_lock);
snapshot_task.snapshot = std::make_shared<KeeperStorageSnapshot>(storage.get(), snapshot_meta_copy);
}
/// create snapshot task for background execution (in snapshot thread)
snapshot_task.create_snapshot = [this, when_done] (KeeperStorageSnapshotPtr && snapshot)
{
nuraft::ptr<std::exception> exception(nullptr);
bool ret = true;
try
{
{
{ /// Read storage data without locks and create snapshot
std::lock_guard lock(snapshots_lock);
auto snapshot_buf = snapshot_manager.serializeSnapshotToBuffer(*snapshot);
auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*snapshot_buf, snapshot->snapshot_meta->get_last_log_idx());
@ -192,6 +199,7 @@ void KeeperStateMachine::create_snapshot(
{
/// Must do it with lock (clearing elements from list)
std::lock_guard lock(storage_lock);
/// Turn off "snapshot mode" and clear the outdated part of the storage state
storage->clearGarbageAfterSnapshot();
/// Destroy snapshot with lock
snapshot.reset();
@ -209,7 +217,9 @@ void KeeperStateMachine::create_snapshot(
when_done(ret, exception);
};
LOG_DEBUG(log, "In memory snapshot {} created, queueing task to flush to disk", s.get_last_log_idx());
/// Flush snapshot to disk in a separate thread.
snapshots_queue.push(std::move(snapshot_task));
}
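
The structure above is a two-phase snapshot: a short lock only to enter "snapshot mode" and capture a view, the expensive serialization without the lock in the snapshot thread, then a short lock again to drop the frozen garbage. A self-contained sketch of that shape, with a plain map standing in for KeeperStorage and a copied view standing in for snapshot mode (the real code avoids the copy via node versioning):

    #include <functional>
    #include <map>
    #include <mutex>
    #include <string>
    #include <utility>

    std::mutex storage_mutex;
    std::map<std::string, std::string> storage;

    std::function<void()> makeSnapshotTask()
    {
        std::map<std::string, std::string> view;
        {
            /// Phase 1: short lock, just capture a consistent view.
            std::lock_guard lock(storage_mutex);
            view = storage;
        }
        /// Phase 2: the returned task is executed by the snapshot thread and
        /// serializes the view without holding the storage lock.
        return [view = std::move(view)]
        {
            /// ... write `view` to disk here ...
        };
    }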
@ -224,7 +234,7 @@ void KeeperStateMachine::save_logical_snp_obj(
nuraft::ptr<nuraft::buffer> cloned_buffer;
nuraft::ptr<nuraft::snapshot> cloned_meta;
if (obj_id == 0)
if (obj_id == 0) /// Fake snapshot required by NuRaft at startup
{
std::lock_guard lock(storage_lock);
KeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx());
@ -232,15 +242,18 @@ void KeeperStateMachine::save_logical_snp_obj(
}
else
{
/// copy snapshot into memory
cloned_buffer = nuraft::buffer::clone(data);
}
/// copy snapshot meta into memory
nuraft::ptr<nuraft::buffer> snp_buf = s.serialize();
cloned_meta = nuraft::snapshot::deserialize(*snp_buf);
try
{
std::lock_guard lock(snapshots_lock);
/// Serialize snapshot to disk and switch in-memory pointers.
auto result_path = snapshot_manager.serializeSnapshotBufferToDisk(*cloned_buffer, s.get_last_log_idx());
latest_snapshot_buf = cloned_buffer;
latest_snapshot_meta = cloned_meta;
@ -262,7 +275,7 @@ int KeeperStateMachine::read_logical_snp_obj(
{
LOG_DEBUG(log, "Reading snapshot {} obj_id {}", s.get_last_log_idx(), obj_id);
if (obj_id == 0)
if (obj_id == 0) /// Fake snapshot required by NuRaft at startup
{
data_out = nuraft::buffer::alloc(sizeof(int32_t));
nuraft::buffer_serializer bs(data_out);
@ -272,6 +285,8 @@ int KeeperStateMachine::read_logical_snp_obj(
else
{
std::lock_guard lock(snapshots_lock);
/// Our snapshot is not equal to the required one. Maybe we are still creating it in the background.
/// Let's wait and NuRaft will retry this call.
if (s.get_last_log_idx() != latest_snapshot_meta->get_last_log_idx())
{
LOG_WARNING(log, "Required to apply snapshot with last log index {}, but our last log index is {}. Will ignore this one and retry",
@ -281,11 +296,13 @@ int KeeperStateMachine::read_logical_snp_obj(
data_out = nuraft::buffer::clone(*latest_snapshot_buf);
is_last_obj = true;
}
return 1;
}
void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session)
{
/// Pure local request, just process it with storage
KeeperStorage::ResponsesForSessions responses;
{
std::lock_guard lock(storage_lock);

View File

@ -13,6 +13,8 @@ namespace DB
using ResponsesQueue = ThreadSafeQueue<KeeperStorage::ResponseForSession>;
using SnapshotsQueue = ConcurrentBoundedQueue<CreateSnapshotTask>;
/// ClickHouse Keeper state machine. Wrapper for KeeperStorage.
/// Responsible for entry commits, snapshot creation and so on.
class KeeperStateMachine : public nuraft::state_machine
{
public:
@ -21,24 +23,30 @@ public:
const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_,
const std::string & superdigest_ = "");
/// Read state from the latest snapshot
void init();
/// Currently not supported
nuraft::ptr<nuraft::buffer> pre_commit(const uint64_t /*log_idx*/, nuraft::buffer & /*data*/) override { return nullptr; }
nuraft::ptr<nuraft::buffer> commit(const uint64_t log_idx, nuraft::buffer & data) override;
/// Currently not supported
void rollback(const uint64_t /*log_idx*/, nuraft::buffer & /*data*/) override {}
uint64_t last_commit_index() override { return last_committed_idx; }
/// Apply a snapshot previously saved with save_logical_snp_obj to our state.
bool apply_snapshot(nuraft::snapshot & s) override;
nuraft::ptr<nuraft::snapshot> last_snapshot() override;
/// Create new snapshot from current state.
void create_snapshot(
nuraft::snapshot & s,
nuraft::async_result<bool>::handler_type & when_done) override;
/// Save a snapshot which was sent to us by the leader. After that we will apply it in apply_snapshot.
void save_logical_snp_obj(
nuraft::snapshot & s,
uint64_t & obj_id,
@ -46,6 +54,8 @@ public:
bool is_first_obj,
bool is_last_obj) override;
/// A better name would be `serialize snapshot` -- save an existing snapshot (created by create_snapshot) into
/// in-memory buffer data_out.
int read_logical_snp_obj(
nuraft::snapshot & s,
void* & user_snp_ctx,
@ -58,6 +68,7 @@ public:
return *storage;
}
/// Process local read request
void processReadRequest(const KeeperStorage::RequestForSession & request_for_session);
std::unordered_set<int64_t> getDeadSessions();
@ -66,18 +77,25 @@ public:
private:
/// In our state machine we always have a single snapshot which is stored
/// in memory in compressed (serialized) format.
SnapshotMetadataPtr latest_snapshot_meta = nullptr;
nuraft::ptr<nuraft::buffer> latest_snapshot_buf = nullptr;
CoordinationSettingsPtr coordination_settings;
/// Main state machine logic
KeeperStoragePtr storage;
/// Save/Load and Serialize/Deserialize logic for snapshots.
KeeperSnapshotManager snapshot_manager;
/// Put processed responses into this queue
ResponsesQueue & responses_queue;
/// Snapshots to be created by the snapshot thread
SnapshotsQueue & snapshots_queue;
/// Mutex for snapshots
std::mutex snapshots_lock;
@ -88,6 +106,7 @@ private:
std::atomic<uint64_t> last_committed_idx;
Poco::Logger * log;
/// Special part of ACL system -- superdigest specified in server config.
const std::string superdigest;
};

View File

@ -177,32 +177,32 @@ KeeperStorage::KeeperStorage(int64_t tick_time_ms, const String & superdigest_)
using Undo = std::function<void()>;
struct KeeperStorageRequest
struct KeeperStorageRequestProcessor
{
Coordination::ZooKeeperRequestPtr zk_request;
explicit KeeperStorageRequest(const Coordination::ZooKeeperRequestPtr & zk_request_)
explicit KeeperStorageRequestProcessor(const Coordination::ZooKeeperRequestPtr & zk_request_)
: zk_request(zk_request_)
{}
virtual std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t session_id) const = 0;
virtual KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & /*watches*/, KeeperStorage::Watches & /*list_watches*/) const { return {}; }
virtual bool checkAuth(KeeperStorage & /*storage*/, int64_t /*session_id*/) const { return true; }
virtual ~KeeperStorageRequest() = default;
virtual ~KeeperStorageRequestProcessor() = default;
};
struct KeeperStorageHeartbeatRequest final : public KeeperStorageRequest
struct KeeperStorageHeartbeatRequestProcessor final : public KeeperStorageRequestProcessor
{
using KeeperStorageRequest::KeeperStorageRequest;
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */) const override
{
return {zk_request->makeResponse(), {}};
}
};
struct KeeperStorageSyncRequest final : public KeeperStorageRequest
struct KeeperStorageSyncRequestProcessor final : public KeeperStorageRequestProcessor
{
using KeeperStorageRequest::KeeperStorageRequest;
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */) const override
{
auto response = zk_request->makeResponse();
@ -212,9 +212,9 @@ struct KeeperStorageSyncRequest final : public KeeperStorageRequest
}
};
struct KeeperStorageCreateRequest final : public KeeperStorageRequest
struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestProcessor
{
using KeeperStorageRequest::KeeperStorageRequest;
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override
{
@ -363,7 +363,7 @@ struct KeeperStorageCreateRequest final : public KeeperStorageRequest
}
};
struct KeeperStorageGetRequest final : public KeeperStorageRequest
struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProcessor
{
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
@ -381,7 +381,7 @@ struct KeeperStorageGetRequest final : public KeeperStorageRequest
return checkACL(Coordination::ACL::Read, node_acls, session_auths);
}
using KeeperStorageRequest::KeeperStorageRequest;
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /* zxid */, int64_t /* session_id */) const override
{
auto & container = storage.container;
@ -423,7 +423,7 @@ namespace
}
}
struct KeeperStorageRemoveRequest final : public KeeperStorageRequest
struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestProcessor
{
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
{
@ -440,7 +440,7 @@ struct KeeperStorageRemoveRequest final : public KeeperStorageRequest
return checkACL(Coordination::ACL::Delete, node_acls, session_auths);
}
using KeeperStorageRequest::KeeperStorageRequest;
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/) const override
{
auto & container = storage.container;
@ -520,9 +520,9 @@ struct KeeperStorageRemoveRequest final : public KeeperStorageRequest
}
};
struct KeeperStorageExistsRequest final : public KeeperStorageRequest
struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestProcessor
{
using KeeperStorageRequest::KeeperStorageRequest;
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /* session_id */) const override
{
auto & container = storage.container;
@ -546,7 +546,7 @@ struct KeeperStorageExistsRequest final : public KeeperStorageRequest
}
};
struct KeeperStorageSetRequest final : public KeeperStorageRequest
struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProcessor
{
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
{
@ -563,7 +563,7 @@ struct KeeperStorageSetRequest final : public KeeperStorageRequest
return checkACL(Coordination::ACL::Write, node_acls, session_auths);
}
using KeeperStorageRequest::KeeperStorageRequest;
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t /* session_id */) const override
{
auto & container = storage.container;
@ -624,7 +624,7 @@ struct KeeperStorageSetRequest final : public KeeperStorageRequest
}
};
struct KeeperStorageListRequest final : public KeeperStorageRequest
struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProcessor
{
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
{
@ -641,7 +641,7 @@ struct KeeperStorageListRequest final : public KeeperStorageRequest
return checkACL(Coordination::ACL::Read, node_acls, session_auths);
}
using KeeperStorageRequest::KeeperStorageRequest;
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/) const override
{
auto & container = storage.container;
@ -669,7 +669,7 @@ struct KeeperStorageListRequest final : public KeeperStorageRequest
}
};
struct KeeperStorageCheckRequest final : public KeeperStorageRequest
struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestProcessor
{
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
{
@ -686,7 +686,7 @@ struct KeeperStorageCheckRequest final : public KeeperStorageRequest
return checkACL(Coordination::ACL::Read, node_acls, session_auths);
}
using KeeperStorageRequest::KeeperStorageRequest;
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/) const override
{
auto & container = storage.container;
@ -713,7 +713,7 @@ struct KeeperStorageCheckRequest final : public KeeperStorageRequest
};
struct KeeperStorageSetACLRequest final : public KeeperStorageRequest
struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestProcessor
{
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
{
@ -730,7 +730,7 @@ struct KeeperStorageSetACLRequest final : public KeeperStorageRequest
return checkACL(Coordination::ACL::Admin, node_acls, session_auths);
}
using KeeperStorageRequest::KeeperStorageRequest;
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t session_id) const override
{
@ -777,7 +777,7 @@ struct KeeperStorageSetACLRequest final : public KeeperStorageRequest
}
};
struct KeeperStorageGetACLRequest final : public KeeperStorageRequest
struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestProcessor
{
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
{
@ -794,7 +794,7 @@ struct KeeperStorageGetACLRequest final : public KeeperStorageRequest
/// LOL, GetACL requires more permissions than SetACL...
return checkACL(Coordination::ACL::Admin | Coordination::ACL::Read, node_acls, session_auths);
}
using KeeperStorageRequest::KeeperStorageRequest;
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/) const override
{
@ -817,7 +817,7 @@ struct KeeperStorageGetACLRequest final : public KeeperStorageRequest
}
};
struct KeeperStorageMultiRequest final : public KeeperStorageRequest
struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestProcessor
{
bool checkAuth(KeeperStorage & storage, int64_t session_id) const override
{
@ -827,9 +827,9 @@ struct KeeperStorageMultiRequest final : public KeeperStorageRequest
return true;
}
std::vector<KeeperStorageRequestPtr> concrete_requests;
explicit KeeperStorageMultiRequest(const Coordination::ZooKeeperRequestPtr & zk_request_)
: KeeperStorageRequest(zk_request_)
std::vector<KeeperStorageRequestProcessorPtr> concrete_requests;
explicit KeeperStorageMultiRequestProcessor(const Coordination::ZooKeeperRequestPtr & zk_request_)
: KeeperStorageRequestProcessor(zk_request_)
{
Coordination::ZooKeeperMultiRequest & request = dynamic_cast<Coordination::ZooKeeperMultiRequest &>(*zk_request);
concrete_requests.reserve(request.requests.size());
@ -839,19 +839,19 @@ struct KeeperStorageMultiRequest final : public KeeperStorageRequest
auto sub_zk_request = std::dynamic_pointer_cast<Coordination::ZooKeeperRequest>(sub_request);
if (sub_zk_request->getOpNum() == Coordination::OpNum::Create)
{
concrete_requests.push_back(std::make_shared<KeeperStorageCreateRequest>(sub_zk_request));
concrete_requests.push_back(std::make_shared<KeeperStorageCreateRequestProcessor>(sub_zk_request));
}
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Remove)
{
concrete_requests.push_back(std::make_shared<KeeperStorageRemoveRequest>(sub_zk_request));
concrete_requests.push_back(std::make_shared<KeeperStorageRemoveRequestProcessor>(sub_zk_request));
}
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Set)
{
concrete_requests.push_back(std::make_shared<KeeperStorageSetRequest>(sub_zk_request));
concrete_requests.push_back(std::make_shared<KeeperStorageSetRequestProcessor>(sub_zk_request));
}
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Check)
{
concrete_requests.push_back(std::make_shared<KeeperStorageCheckRequest>(sub_zk_request));
concrete_requests.push_back(std::make_shared<KeeperStorageCheckRequestProcessor>(sub_zk_request));
}
else
throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Illegal command as part of multi ZooKeeper request {}", sub_zk_request->getOpNum());
@ -923,18 +923,18 @@ struct KeeperStorageMultiRequest final : public KeeperStorageRequest
}
};
struct KeeperStorageCloseRequest final : public KeeperStorageRequest
struct KeeperStorageCloseRequestProcessor final : public KeeperStorageRequestProcessor
{
using KeeperStorageRequest::KeeperStorageRequest;
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage &, int64_t, int64_t) const override
{
throw DB::Exception("Called process on close request", ErrorCodes::LOGICAL_ERROR);
}
};
struct KeeperStorageAuthRequest final : public KeeperStorageRequest
struct KeeperStorageAuthRequestProcessor final : public KeeperStorageRequestProcessor
{
using KeeperStorageRequest::KeeperStorageRequest;
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t session_id) const override
{
Coordination::ZooKeeperAuthRequest & auth_request = dynamic_cast<Coordination::ZooKeeperAuthRequest &>(*zk_request);
@ -988,20 +988,20 @@ void KeeperStorage::finalize()
}
class KeeperWrapperFactory final : private boost::noncopyable
class KeeperStorageRequestProcessorsFactory final : private boost::noncopyable
{
public:
using Creator = std::function<KeeperStorageRequestPtr(const Coordination::ZooKeeperRequestPtr &)>;
using Creator = std::function<KeeperStorageRequestProcessorPtr(const Coordination::ZooKeeperRequestPtr &)>;
using OpNumToRequest = std::unordered_map<Coordination::OpNum, Creator>;
static KeeperWrapperFactory & instance()
static KeeperStorageRequestProcessorsFactory & instance()
{
static KeeperWrapperFactory factory;
static KeeperStorageRequestProcessorsFactory factory;
return factory;
}
KeeperStorageRequestPtr get(const Coordination::ZooKeeperRequestPtr & zk_request) const
KeeperStorageRequestProcessorPtr get(const Coordination::ZooKeeperRequestPtr & zk_request) const
{
auto it = op_num_to_request.find(zk_request->getOpNum());
if (it == op_num_to_request.end())
@ -1018,33 +1018,33 @@ public:
private:
OpNumToRequest op_num_to_request;
KeeperWrapperFactory();
KeeperStorageRequestProcessorsFactory();
};
template<Coordination::OpNum num, typename RequestT>
void registerKeeperRequestWrapper(KeeperWrapperFactory & factory)
void registerKeeperRequestProcessor(KeeperStorageRequestProcessorsFactory & factory)
{
factory.registerRequest(num, [] (const Coordination::ZooKeeperRequestPtr & zk_request) { return std::make_shared<RequestT>(zk_request); });
}
KeeperWrapperFactory::KeeperWrapperFactory()
KeeperStorageRequestProcessorsFactory::KeeperStorageRequestProcessorsFactory()
{
registerKeeperRequestWrapper<Coordination::OpNum::Heartbeat, KeeperStorageHeartbeatRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Sync, KeeperStorageSyncRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Auth, KeeperStorageAuthRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Close, KeeperStorageCloseRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Create, KeeperStorageCreateRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Remove, KeeperStorageRemoveRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Exists, KeeperStorageExistsRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Get, KeeperStorageGetRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Set, KeeperStorageSetRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::List, KeeperStorageListRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::SimpleList, KeeperStorageListRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Check, KeeperStorageCheckRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Multi, KeeperStorageMultiRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::SetACL, KeeperStorageSetACLRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::GetACL, KeeperStorageGetACLRequest>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Heartbeat, KeeperStorageHeartbeatRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Sync, KeeperStorageSyncRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Auth, KeeperStorageAuthRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Close, KeeperStorageCloseRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Create, KeeperStorageCreateRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Remove, KeeperStorageRemoveRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Exists, KeeperStorageExistsRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Get, KeeperStorageGetRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Set, KeeperStorageSetRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::List, KeeperStorageListRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::SimpleList, KeeperStorageListRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Check, KeeperStorageCheckRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::Multi, KeeperStorageMultiRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::SetACL, KeeperStorageSetACLRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::GetACL, KeeperStorageGetACLRequestProcessor>(*this);
}
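
The registration above is an OpNum-keyed factory: one template instantiation per opcode stores a creator lambda in a map. A reduced, self-contained sketch of the pattern with simplified types (not the real classes):

    #include <functional>
    #include <memory>
    #include <unordered_map>

    struct Processor { virtual ~Processor() = default; };
    struct GetProcessor final : Processor {};

    using Creator = std::function<std::shared_ptr<Processor>()>;
    std::unordered_map<int, Creator> op_num_to_creator; /// OpNum -> creator

    template <int op_num, typename ProcessorT>
    void registerProcessor()
    {
        op_num_to_creator[op_num] = [] { return std::make_shared<ProcessorT>(); };
    }

    /// registerProcessor<4, GetProcessor>();       // analogous to the calls above
    /// auto processor = op_num_to_creator.at(4)(); // factory lookup by opcode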
@ -1059,7 +1059,8 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina
}
session_expiry_queue.update(session_id, session_and_timeout[session_id]);
if (zk_request->getOpNum() == Coordination::OpNum::Close)
if (zk_request->getOpNum() == Coordination::OpNum::Close) /// Close request is special
{
auto it = ephemerals.find(session_id);
if (it != ephemerals.end())
@ -1092,21 +1093,21 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina
session_and_timeout.erase(session_id);
results.push_back(ResponseForSession{session_id, response});
}
else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat)
else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat) /// Heartbeat request is also special
{
KeeperStorageRequestPtr storage_request = KeeperWrapperFactory::instance().get(zk_request);
KeeperStorageRequestProcessorPtr storage_request = KeeperStorageRequestProcessorsFactory::instance().get(zk_request);
auto [response, _] = storage_request->process(*this, zxid, session_id);
response->xid = zk_request->xid;
response->zxid = getZXID();
results.push_back(ResponseForSession{session_id, response});
}
else
else /// normal request processing
{
KeeperStorageRequestPtr storage_request = KeeperWrapperFactory::instance().get(zk_request);
KeeperStorageRequestProcessorPtr request_processor = KeeperStorageRequestProcessorsFactory::instance().get(zk_request);
Coordination::ZooKeeperResponsePtr response;
if (check_acl && !storage_request->checkAuth(*this, session_id))
if (check_acl && !request_processor->checkAuth(*this, session_id))
{
response = zk_request->makeResponse();
/// Original ZooKeeper always throws no auth, even when user provided some credentials
@ -1114,9 +1115,10 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina
}
else
{
std::tie(response, std::ignore) = storage_request->process(*this, zxid, session_id);
std::tie(response, std::ignore) = request_processor->process(*this, zxid, session_id);
}
/// Watches for this request are added to the watch lists
if (zk_request->has_watch)
{
if (response->error == Coordination::Error::ZOK)
@ -1135,9 +1137,10 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina
}
}
/// If this request was processed successfully, we need to check watches
if (response->error == Coordination::Error::ZOK)
{
auto watch_responses = storage_request->processWatches(watches, list_watches);
auto watch_responses = request_processor->processWatches(watches, list_watches);
results.insert(results.end(), watch_responses.begin(), watch_responses.end());
}
@ -1153,11 +1156,13 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina
void KeeperStorage::clearDeadWatches(int64_t session_id)
{
/// Clear all watches for this session
auto watches_it = sessions_and_watchers.find(session_id);
if (watches_it != sessions_and_watchers.end())
{
for (const auto & watch_path : watches_it->second)
{
/// Maybe it's a normal watch
auto watch = watches.find(watch_path);
if (watch != watches.end())
{
@ -1173,6 +1178,7 @@ void KeeperStorage::clearDeadWatches(int64_t session_id)
watches.erase(watch);
}
/// Maybe it's a list watch
auto list_watch = list_watches.find(watch_path);
if (list_watch != list_watches.end())
{
@ -1188,6 +1194,7 @@ void KeeperStorage::clearDeadWatches(int64_t session_id)
list_watches.erase(list_watch);
}
}
sessions_and_watchers.erase(watches_it);
}
}

View File

@ -15,14 +15,17 @@ namespace DB
{
using namespace DB;
struct KeeperStorageRequest;
using KeeperStorageRequestPtr = std::shared_ptr<KeeperStorageRequest>;
struct KeeperStorageRequestProcessor;
using KeeperStorageRequestProcessorPtr = std::shared_ptr<KeeperStorageRequestProcessor>;
using ResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr &)>;
using ChildrenSet = std::unordered_set<std::string>;
using SessionAndTimeout = std::unordered_map<int64_t, int64_t>;
struct KeeperStorageSnapshot;
/// Keeper state machine, almost equivalent to ZooKeeper's state machine.
/// Implements all the logic of operations, data changes, and session allocation.
/// In-memory and not thread safe.
class KeeperStorage
{
public:
@ -77,21 +80,34 @@ public:
using Watches = std::map<String /* path, relative of root_path */, SessionIDs>;
/// Main hashtable with nodes. Contains all information about data.
/// All other structures except session_and_timeout can be restored from
/// the container.
Container container;
/// Mapping session_id -> set of ephemeral node paths
Ephemerals ephemerals;
/// Mapping session_id -> set of watched node paths
SessionAndWatcher sessions_and_watchers;
/// Expiration queue for sessions; allows getting dead sessions at some point in time
SessionExpiryQueue session_expiry_queue;
/// All active sessions with timeout
SessionAndTimeout session_and_timeout;
/// ACLMap for more compact ACL storage inside nodes.
ACLMap acl_map;
/// Global id of all requests applied to storage
int64_t zxid{0};
bool finalized{false};
/// Currently active watches (node_path -> subscribed sessions)
Watches watches;
Watches list_watches; /// Watches for 'list' request (watches on children).
void clearDeadWatches(int64_t session_id);
/// Get current zxid
int64_t getZXID() const
{
return zxid;
@ -102,6 +118,7 @@ public:
public:
KeeperStorage(int64_t tick_time_ms, const String & superdigest_);
/// Allocate a new session id with the specified timeout
int64_t getSessionID(int64_t session_timeout_ms)
{
auto result = session_id_counter++;
@ -110,21 +127,28 @@ public:
return result;
}
/// Add session id. Used when restoring KeeperStorage from snapshot.
void addSessionID(int64_t session_id, int64_t session_timeout_ms)
{
session_and_timeout.emplace(session_id, session_timeout_ms);
session_expiry_queue.update(session_id, session_timeout_ms);
}
/// Process user request and return response.
/// check_acl = false only when converting data from ZooKeeper.
ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, std::optional<int64_t> new_last_zxid, bool check_acl = true);
void finalize();
/// Set of methods for creating snapshots
/// Turn on snapshot mode, so data inside the Container is not deleted but replaced with a new version.
void enableSnapshotMode()
{
container.enableSnapshotMode();
}
/// Turn off snapshot mode.
void disableSnapshotMode()
{
container.disableSnapshotMode();
@ -135,16 +159,19 @@ public:
return container.begin();
}
/// Clear outdated data from internal container.
void clearGarbageAfterSnapshot()
{
container.clearOutdatedNodes();
}
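
Per the comments above, a snapshot over KeeperStorage is bracketed by the snapshot-mode switches. A hypothetical caller-side sequence (the serialization step in the middle is elided):

    storage.enableSnapshotMode();        /// nodes are now versioned instead of deleted
    /// ... serialize a consistent view while regular requests keep running ...
    storage.disableSnapshotMode();
    storage.clearGarbageAfterSnapshot(); /// drop the outdated node versions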
/// Get all active sessions
const SessionAndTimeout & getActiveSessions() const
{
return session_and_timeout;
}
/// Get all dead sessions
std::unordered_set<int64_t> getDeadSessions()
{
return session_expiry_queue.getExpiredSessions();

View File

@ -14,7 +14,7 @@
#include <Common/Throttler.h>
#include <Common/thread_local_rng.h>
#include <Common/FieldVisitorToString.h>
#include <Coordination/KeeperStorageDispatcher.h>
#include <Coordination/KeeperDispatcher.h>
#include <Compression/ICompressionCodec.h>
#include <Core/BackgroundSchedulePool.h>
#include <Formats/FormatFactory.h>
@ -146,7 +146,7 @@ struct ContextSharedPart
#if USE_NURAFT
mutable std::mutex keeper_storage_dispatcher_mutex;
mutable std::shared_ptr<KeeperStorageDispatcher> keeper_storage_dispatcher;
mutable std::shared_ptr<KeeperDispatcher> keeper_storage_dispatcher;
#endif
mutable std::mutex auxiliary_zookeepers_mutex;
mutable std::map<String, zkutil::ZooKeeperPtr> auxiliary_zookeepers; /// Map for auxiliary ZooKeeper clients.
@ -1649,7 +1649,7 @@ void Context::setSystemZooKeeperLogAfterInitializationIfNeeded()
zk.second->setZooKeeperLog(shared->system_logs->zookeeper_log);
}
void Context::initializeKeeperStorageDispatcher() const
void Context::initializeKeeperDispatcher() const
{
#if USE_NURAFT
std::lock_guard lock(shared->keeper_storage_dispatcher_mutex);
@ -1660,14 +1660,14 @@ void Context::initializeKeeperStorageDispatcher() const
const auto & config = getConfigRef();
if (config.has("keeper_server"))
{
shared->keeper_storage_dispatcher = std::make_shared<KeeperStorageDispatcher>();
shared->keeper_storage_dispatcher = std::make_shared<KeeperDispatcher>();
shared->keeper_storage_dispatcher->initialize(config, getApplicationType() == ApplicationType::KEEPER);
}
#endif
}
#if USE_NURAFT
std::shared_ptr<KeeperStorageDispatcher> & Context::getKeeperStorageDispatcher() const
std::shared_ptr<KeeperDispatcher> & Context::getKeeperDispatcher() const
{
std::lock_guard lock(shared->keeper_storage_dispatcher_mutex);
if (!shared->keeper_storage_dispatcher)
@ -1677,7 +1677,7 @@ std::shared_ptr<KeeperStorageDispatcher> & Context::getKeeperStorageDispatcher()
}
#endif
void Context::shutdownKeeperStorageDispatcher() const
void Context::shutdownKeeperDispatcher() const
{
#if USE_NURAFT
std::lock_guard lock(shared->keeper_storage_dispatcher_mutex);

View File

@ -102,7 +102,7 @@ class StoragePolicySelector;
using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;
struct PartUUIDs;
using PartUUIDsPtr = std::shared_ptr<PartUUIDs>;
class KeeperStorageDispatcher;
class KeeperDispatcher;
class Session;
class IOutputFormat;
@ -647,10 +647,10 @@ public:
std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const;
#if USE_NURAFT
std::shared_ptr<KeeperStorageDispatcher> & getKeeperStorageDispatcher() const;
std::shared_ptr<KeeperDispatcher> & getKeeperDispatcher() const;
#endif
void initializeKeeperStorageDispatcher() const;
void shutdownKeeperStorageDispatcher() const;
void initializeKeeperDispatcher() const;
void shutdownKeeperDispatcher() const;
/// Set auxiliary zookeepers configuration at server starting or configuration reloading.
void reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config);

View File

@ -194,7 +194,7 @@ KeeperTCPHandler::KeeperTCPHandler(IServer & server_, const Poco::Net::StreamSoc
, server(server_)
, log(&Poco::Logger::get("NuKeeperTCPHandler"))
, global_context(Context::createCopy(server.context()))
, keeper_dispatcher(global_context->getKeeperStorageDispatcher())
, keeper_dispatcher(global_context->getKeeperDispatcher())
, operation_timeout(0, global_context->getConfigRef().getUInt("keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000)
, session_timeout(0, global_context->getConfigRef().getUInt("keeper_server.session_timeout_ms", Coordination::DEFAULT_SESSION_TIMEOUT_MS) * 1000)
, poll_wrapper(std::make_unique<SocketInterruptablePollWrapper>(socket_))

View File

@ -13,7 +13,7 @@
#include <Interpreters/Context.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <Coordination/KeeperStorageDispatcher.h>
#include <Coordination/KeeperDispatcher.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <Coordination/ThreadSafeQueue.h>
@ -38,7 +38,7 @@ private:
IServer & server;
Poco::Logger * log;
ContextPtr global_context;
std::shared_ptr<KeeperStorageDispatcher> keeper_dispatcher;
std::shared_ptr<KeeperDispatcher> keeper_dispatcher;
Poco::Timespan operation_timeout;
Poco::Timespan session_timeout;
int64_t session_id{-1};