Merge remote-tracking branch 'ch/rename_keeper' into test_multiple_nodes

This commit is contained in:
Alexander Tokmakov 2021-03-29 22:44:40 +03:00
commit ccd31d1b72
96 changed files with 471 additions and 492 deletions

View File

@ -70,7 +70,7 @@ function start_server
--path "$FASTTEST_DATA"
--user_files_path "$FASTTEST_DATA/user_files"
--top_level_domains_path "$FASTTEST_DATA/top_level_domains"
--test_keeper_server.log_storage_path "$FASTTEST_DATA/coordination"
--keeper_server.log_storage_path "$FASTTEST_DATA/coordination"
)
clickhouse-server "${opts[@]}" &>> "$FASTTEST_OUTPUT/server.log" &
server_pid=$!

View File

@ -97,7 +97,7 @@
#endif
#if USE_NURAFT
# include <Server/NuKeeperTCPHandlerFactory.h>
# include <Server/KeeperTCPHandlerFactory.h>
#endif
namespace CurrentMetrics
@ -867,15 +867,15 @@ int Server::main(const std::vector<std::string> & /*args*/)
listen_try = true;
}
if (config().has("test_keeper_server"))
if (config().has("keeper_server"))
{
#if USE_NURAFT
/// Initialize test keeper RAFT. Do nothing if no nu_keeper_server in config.
global_context->initializeNuKeeperStorageDispatcher();
global_context->initializeKeeperStorageDispatcher();
for (const auto & listen_host : listen_hosts)
{
/// TCP NuKeeper
const char * port_name = "test_keeper_server.tcp_port";
/// TCP Keeper
const char * port_name = "keeper_server.tcp_port";
createServer(listen_host, port_name, listen_try, [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
@ -885,9 +885,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
servers_to_start_before_tables->emplace_back(
port_name,
std::make_unique<Poco::Net::TCPServer>(
new NuKeeperTCPHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams));
new KeeperTCPHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening for connections to NuKeeper (tcp): {}", address.toString());
LOG_INFO(log, "Listening for connections to Keeper (tcp): {}", address.toString());
});
}
#else
@ -934,7 +934,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
else
LOG_INFO(log, "Closed connections to servers for tables.");
global_context->shutdownNuKeeperStorageDispatcher();
global_context->shutdownKeeperStorageDispatcher();
}
/** Explicitly destroy Context. It is more convenient than in destructor of Server, because logger is still available.

View File

@ -0,0 +1 @@
../../../tests/config/config.d/keeper_port.xml

View File

@ -1 +0,0 @@
../../../tests/config/config.d/test_keeper_port.xml

View File

@ -1,40 +1,40 @@
#include <Coordination/NuKeeperLogStore.h>
#include <Coordination/KeeperLogStore.h>
namespace DB
{
NuKeeperLogStore::NuKeeperLogStore(const std::string & changelogs_path, size_t rotate_interval_, bool force_sync_)
: log(&Poco::Logger::get("NuKeeperLogStore"))
KeeperLogStore::KeeperLogStore(const std::string & changelogs_path, size_t rotate_interval_, bool force_sync_)
: log(&Poco::Logger::get("KeeperLogStore"))
, changelog(changelogs_path, rotate_interval_, log)
, force_sync(force_sync_)
{
}
size_t NuKeeperLogStore::start_index() const
size_t KeeperLogStore::start_index() const
{
std::lock_guard lock(changelog_lock);
return changelog.getStartIndex();
}
void NuKeeperLogStore::init(size_t last_commited_log_index, size_t logs_to_keep)
void KeeperLogStore::init(size_t last_commited_log_index, size_t logs_to_keep)
{
std::lock_guard lock(changelog_lock);
changelog.readChangelogAndInitWriter(last_commited_log_index, logs_to_keep);
}
size_t NuKeeperLogStore::next_slot() const
size_t KeeperLogStore::next_slot() const
{
std::lock_guard lock(changelog_lock);
return changelog.getNextEntryIndex();
}
nuraft::ptr<nuraft::log_entry> NuKeeperLogStore::last_entry() const
nuraft::ptr<nuraft::log_entry> KeeperLogStore::last_entry() const
{
std::lock_guard lock(changelog_lock);
return changelog.getLastEntry();
}
size_t NuKeeperLogStore::append(nuraft::ptr<nuraft::log_entry> & entry)
size_t KeeperLogStore::append(nuraft::ptr<nuraft::log_entry> & entry)
{
std::lock_guard lock(changelog_lock);
size_t idx = changelog.getNextEntryIndex();
@ -43,25 +43,25 @@ size_t NuKeeperLogStore::append(nuraft::ptr<nuraft::log_entry> & entry)
}
void NuKeeperLogStore::write_at(size_t index, nuraft::ptr<nuraft::log_entry> & entry)
void KeeperLogStore::write_at(size_t index, nuraft::ptr<nuraft::log_entry> & entry)
{
std::lock_guard lock(changelog_lock);
changelog.writeAt(index, entry, force_sync);
}
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> NuKeeperLogStore::log_entries(size_t start, size_t end)
nuraft::ptr<std::vector<nuraft::ptr<nuraft::log_entry>>> KeeperLogStore::log_entries(size_t start, size_t end)
{
std::lock_guard lock(changelog_lock);
return changelog.getLogEntriesBetween(start, end);
}
nuraft::ptr<nuraft::log_entry> NuKeeperLogStore::entry_at(size_t index)
nuraft::ptr<nuraft::log_entry> KeeperLogStore::entry_at(size_t index)
{
std::lock_guard lock(changelog_lock);
return changelog.entryAt(index);
}
size_t NuKeeperLogStore::term_at(size_t index)
size_t KeeperLogStore::term_at(size_t index)
{
std::lock_guard lock(changelog_lock);
auto entry = changelog.entryAt(index);
@ -70,33 +70,33 @@ size_t NuKeeperLogStore::term_at(size_t index)
return 0;
}
nuraft::ptr<nuraft::buffer> NuKeeperLogStore::pack(size_t index, int32_t cnt)
nuraft::ptr<nuraft::buffer> KeeperLogStore::pack(size_t index, int32_t cnt)
{
std::lock_guard lock(changelog_lock);
return changelog.serializeEntriesToBuffer(index, cnt);
}
bool NuKeeperLogStore::compact(size_t last_log_index)
bool KeeperLogStore::compact(size_t last_log_index)
{
std::lock_guard lock(changelog_lock);
changelog.compact(last_log_index);
return true;
}
bool NuKeeperLogStore::flush()
bool KeeperLogStore::flush()
{
std::lock_guard lock(changelog_lock);
changelog.flush();
return true;
}
void NuKeeperLogStore::apply_pack(size_t index, nuraft::buffer & pack)
void KeeperLogStore::apply_pack(size_t index, nuraft::buffer & pack)
{
std::lock_guard lock(changelog_lock);
changelog.applyEntriesFromBuffer(index, pack, force_sync);
}
size_t NuKeeperLogStore::size() const
size_t KeeperLogStore::size() const
{
std::lock_guard lock(changelog_lock);
return changelog.size();

View File

@ -9,10 +9,10 @@
namespace DB
{
class NuKeeperLogStore : public nuraft::log_store
class KeeperLogStore : public nuraft::log_store
{
public:
NuKeeperLogStore(const std::string & changelogs_path, size_t rotate_interval_, bool force_sync_);
KeeperLogStore(const std::string & changelogs_path, size_t rotate_interval_, bool force_sync_);
void init(size_t last_commited_log_index, size_t logs_to_keep);

View File

@ -1,7 +1,7 @@
#include <Coordination/NuKeeperServer.h>
#include <Coordination/KeeperServer.h>
#include <Coordination/LoggerWrapper.h>
#include <Coordination/NuKeeperStateMachine.h>
#include <Coordination/NuKeeperStateManager.h>
#include <Coordination/KeeperStateMachine.h>
#include <Coordination/KeeperStateManager.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <IO/ReadHelpers.h>
@ -18,7 +18,7 @@ namespace ErrorCodes
extern const int RAFT_ERROR;
}
NuKeeperServer::NuKeeperServer(
KeeperServer::KeeperServer(
int server_id_,
const CoordinationSettingsPtr & coordination_settings_,
const Poco::Util::AbstractConfiguration & config,
@ -26,15 +26,18 @@ NuKeeperServer::NuKeeperServer(
SnapshotsQueue & snapshots_queue_)
: server_id(server_id_)
, coordination_settings(coordination_settings_)
, state_machine(nuraft::cs_new<NuKeeperStateMachine>(responses_queue_, snapshots_queue_, config.getString("test_keeper_server.snapshot_storage_path", config.getString("path", DBMS_DEFAULT_PATH) + "coordination/snapshots"), coordination_settings))
, state_manager(nuraft::cs_new<NuKeeperStateManager>(server_id, "test_keeper_server", config, coordination_settings))
, state_machine(nuraft::cs_new<KeeperStateMachine>(
responses_queue_, snapshots_queue_,
config.getString("keeper_server.snapshot_storage_path", config.getString("path", DBMS_DEFAULT_PATH) + "coordination/snapshots"),
coordination_settings))
, state_manager(nuraft::cs_new<KeeperStateManager>(server_id, "keeper_server", config, coordination_settings))
, responses_queue(responses_queue_)
{
if (coordination_settings->quorum_reads)
LOG_WARNING(&Poco::Logger::get("NuKeeperServer"), "Quorum reads enabled, NuKeeper will work slower.");
LOG_WARNING(&Poco::Logger::get("KeeperServer"), "Quorum reads enabled, Keeper will work slower.");
}
void NuKeeperServer::startup()
void KeeperServer::startup()
{
state_machine->init();
@ -84,13 +87,13 @@ void NuKeeperServer::startup()
throw Exception(ErrorCodes::RAFT_ERROR, "Cannot allocate RAFT instance");
}
void NuKeeperServer::shutdown()
void KeeperServer::shutdown()
{
state_machine->shutdownStorage();
state_manager->flushLogStore();
auto timeout = coordination_settings->shutdown_timeout.totalSeconds();
if (!launcher.shutdown(timeout))
LOG_WARNING(&Poco::Logger::get("NuKeeperServer"), "Failed to shutdown RAFT server in {} seconds", timeout);
LOG_WARNING(&Poco::Logger::get("KeeperServer"), "Failed to shutdown RAFT server in {} seconds", timeout);
}
namespace
@ -106,7 +109,7 @@ nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, const Coord
}
void NuKeeperServer::putRequest(const NuKeeperStorage::RequestForSession & request_for_session)
void KeeperServer::putRequest(const KeeperStorage::RequestForSession & request_for_session)
{
auto [session_id, request] = request_for_session;
if (!coordination_settings->quorum_reads && isLeaderAlive() && request->isReadRequest())
@ -123,29 +126,29 @@ void NuKeeperServer::putRequest(const NuKeeperStorage::RequestForSession & reque
auto result = raft_instance->append_entries(entries);
if (!result->get_accepted())
{
NuKeeperStorage::ResponsesForSessions responses;
KeeperStorage::ResponsesForSessions responses;
auto response = request->makeResponse();
response->xid = request->xid;
response->zxid = 0;
response->error = Coordination::Error::ZOPERATIONTIMEOUT;
responses_queue.push(DB::NuKeeperStorage::ResponseForSession{session_id, response});
responses_queue.push(DB::KeeperStorage::ResponseForSession{session_id, response});
}
if (result->get_result_code() == nuraft::cmd_result_code::TIMEOUT)
{
NuKeeperStorage::ResponsesForSessions responses;
KeeperStorage::ResponsesForSessions responses;
auto response = request->makeResponse();
response->xid = request->xid;
response->zxid = 0;
response->error = Coordination::Error::ZOPERATIONTIMEOUT;
responses_queue.push(DB::NuKeeperStorage::ResponseForSession{session_id, response});
responses_queue.push(DB::KeeperStorage::ResponseForSession{session_id, response});
}
else if (result->get_result_code() != nuraft::cmd_result_code::OK)
throw Exception(ErrorCodes::RAFT_ERROR, "Requests result failed with code {} and message: '{}'", result->get_result_code(), result->get_result_str());
}
}
int64_t NuKeeperServer::getSessionID(int64_t session_timeout_ms)
int64_t KeeperServer::getSessionID(int64_t session_timeout_ms)
{
auto entry = nuraft::buffer::alloc(sizeof(int64_t));
/// Just special session request
@ -170,17 +173,17 @@ int64_t NuKeeperServer::getSessionID(int64_t session_timeout_ms)
return bs_resp.get_i64();
}
bool NuKeeperServer::isLeader() const
bool KeeperServer::isLeader() const
{
return raft_instance->is_leader();
}
bool NuKeeperServer::isLeaderAlive() const
bool KeeperServer::isLeaderAlive() const
{
return raft_instance->is_leader_alive();
}
nuraft::cb_func::ReturnCode NuKeeperServer::callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * /* param */)
nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * /* param */)
{
size_t last_commited = state_machine->last_commit_index();
size_t next_index = state_manager->getLogStore()->next_slot();
@ -240,7 +243,7 @@ nuraft::cb_func::ReturnCode NuKeeperServer::callbackFunc(nuraft::cb_func::Type t
}
}
void NuKeeperServer::waitInit()
void KeeperServer::waitInit()
{
std::unique_lock lock(initialized_mutex);
int64_t timeout = coordination_settings->startup_timeout.totalMilliseconds();
@ -248,7 +251,7 @@ void NuKeeperServer::waitInit()
throw Exception(ErrorCodes::RAFT_ERROR, "Failed to wait RAFT initialization");
}
std::unordered_set<int64_t> NuKeeperServer::getDeadSessions()
std::unordered_set<int64_t> KeeperServer::getDeadSessions()
{
return state_machine->getDeadSessions();
}

View File

@ -2,25 +2,25 @@
#include <libnuraft/nuraft.hxx> // Y_IGNORE
#include <Coordination/InMemoryLogStore.h>
#include <Coordination/NuKeeperStateManager.h>
#include <Coordination/NuKeeperStateMachine.h>
#include <Coordination/NuKeeperStorage.h>
#include <Coordination/KeeperStateManager.h>
#include <Coordination/KeeperStateMachine.h>
#include <Coordination/KeeperStorage.h>
#include <Coordination/CoordinationSettings.h>
#include <unordered_map>
namespace DB
{
class NuKeeperServer
class KeeperServer
{
private:
int server_id;
CoordinationSettingsPtr coordination_settings;
nuraft::ptr<NuKeeperStateMachine> state_machine;
nuraft::ptr<KeeperStateMachine> state_machine;
nuraft::ptr<NuKeeperStateManager> state_manager;
nuraft::ptr<KeeperStateManager> state_manager;
nuraft::raft_launcher launcher;
@ -38,7 +38,7 @@ private:
nuraft::cb_func::ReturnCode callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * param);
public:
NuKeeperServer(
KeeperServer(
int server_id_,
const CoordinationSettingsPtr & coordination_settings_,
const Poco::Util::AbstractConfiguration & config,
@ -47,7 +47,7 @@ public:
void startup();
void putRequest(const NuKeeperStorage::RequestForSession & request);
void putRequest(const KeeperStorage::RequestForSession & request);
int64_t getSessionID(int64_t session_timeout_ms);

View File

@ -1,4 +1,4 @@
#include <Coordination/NuKeeperSnapshotManager.h>
#include <Coordination/KeeperSnapshotManager.h>
#include <IO/WriteHelpers.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
@ -51,7 +51,7 @@ namespace
return "/";
}
void writeNode(const NuKeeperStorage::Node & node, WriteBuffer & out)
void writeNode(const KeeperStorage::Node & node, WriteBuffer & out)
{
writeBinary(node.data, out);
@ -81,7 +81,7 @@ namespace
writeBinary(node.seq_num, out);
}
void readNode(NuKeeperStorage::Node & node, ReadBuffer & in)
void readNode(KeeperStorage::Node & node, ReadBuffer & in)
{
readBinary(node.data, in);
@ -132,7 +132,7 @@ namespace
}
void NuKeeperStorageSnapshot::serialize(const NuKeeperStorageSnapshot & snapshot, WriteBuffer & out)
void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, WriteBuffer & out)
{
writeBinary(static_cast<uint8_t>(snapshot.version), out);
serializeSnapshotMetadata(snapshot.snapshot_meta, out);
@ -159,7 +159,7 @@ void NuKeeperStorageSnapshot::serialize(const NuKeeperStorageSnapshot & snapshot
}
}
SnapshotMetadataPtr NuKeeperStorageSnapshot::deserialize(NuKeeperStorage & storage, ReadBuffer & in)
SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage, ReadBuffer & in)
{
uint8_t version;
readBinary(version, in);
@ -180,7 +180,7 @@ SnapshotMetadataPtr NuKeeperStorageSnapshot::deserialize(NuKeeperStorage & stora
{
std::string path;
readBinary(path, in);
NuKeeperStorage::Node node;
KeeperStorage::Node node;
readNode(node, in);
storage.container.insertOrReplace(path, node);
if (node.stat.ephemeralOwner != 0)
@ -194,7 +194,7 @@ SnapshotMetadataPtr NuKeeperStorageSnapshot::deserialize(NuKeeperStorage & stora
if (itr.key != "/")
{
auto parent_path = parentPath(itr.key);
storage.container.updateValue(parent_path, [&path = itr.key] (NuKeeperStorage::Node & value) { value.children.insert(getBaseName(path)); });
storage.container.updateValue(parent_path, [&path = itr.key] (KeeperStorage::Node & value) { value.children.insert(getBaseName(path)); });
}
}
@ -214,7 +214,7 @@ SnapshotMetadataPtr NuKeeperStorageSnapshot::deserialize(NuKeeperStorage & stora
return result;
}
NuKeeperStorageSnapshot::NuKeeperStorageSnapshot(NuKeeperStorage * storage_, size_t up_to_log_idx_)
KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, size_t up_to_log_idx_)
: storage(storage_)
, snapshot_meta(std::make_shared<SnapshotMetadata>(up_to_log_idx_, 0, std::make_shared<nuraft::cluster_config>()))
, session_id(storage->session_id_counter)
@ -225,7 +225,7 @@ NuKeeperStorageSnapshot::NuKeeperStorageSnapshot(NuKeeperStorage * storage_, siz
session_and_timeout = storage->getActiveSessions();
}
NuKeeperStorageSnapshot::NuKeeperStorageSnapshot(NuKeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_)
KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_)
: storage(storage_)
, snapshot_meta(snapshot_meta_)
, session_id(storage->session_id_counter)
@ -236,12 +236,12 @@ NuKeeperStorageSnapshot::NuKeeperStorageSnapshot(NuKeeperStorage * storage_, con
session_and_timeout = storage->getActiveSessions();
}
NuKeeperStorageSnapshot::~NuKeeperStorageSnapshot()
KeeperStorageSnapshot::~KeeperStorageSnapshot()
{
storage->disableSnapshotMode();
}
NuKeeperSnapshotManager::NuKeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, size_t storage_tick_time_)
KeeperSnapshotManager::KeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, size_t storage_tick_time_)
: snapshots_path(snapshots_path_)
, snapshots_to_keep(snapshots_to_keep_)
, storage_tick_time(storage_tick_time_)
@ -266,7 +266,7 @@ NuKeeperSnapshotManager::NuKeeperSnapshotManager(const std::string & snapshots_p
}
std::string NuKeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer & buffer, size_t up_to_log_idx)
std::string KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer & buffer, size_t up_to_log_idx)
{
ReadBufferFromNuraftBuffer reader(buffer);
@ -287,7 +287,7 @@ std::string NuKeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffe
return new_snapshot_path;
}
nuraft::ptr<nuraft::buffer> NuKeeperSnapshotManager::deserializeLatestSnapshotBufferFromDisk()
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeLatestSnapshotBufferFromDisk()
{
while (!existing_snapshots.empty())
{
@ -307,7 +307,7 @@ nuraft::ptr<nuraft::buffer> NuKeeperSnapshotManager::deserializeLatestSnapshotBu
return nullptr;
}
nuraft::ptr<nuraft::buffer> NuKeeperSnapshotManager::deserializeSnapshotBufferFromDisk(size_t up_to_log_idx) const
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeSnapshotBufferFromDisk(size_t up_to_log_idx) const
{
const std::string & snapshot_path = existing_snapshots.at(up_to_log_idx);
WriteBufferFromNuraftBuffer writer;
@ -316,26 +316,26 @@ nuraft::ptr<nuraft::buffer> NuKeeperSnapshotManager::deserializeSnapshotBufferFr
return writer.getBuffer();
}
nuraft::ptr<nuraft::buffer> NuKeeperSnapshotManager::serializeSnapshotToBuffer(const NuKeeperStorageSnapshot & snapshot)
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::serializeSnapshotToBuffer(const KeeperStorageSnapshot & snapshot)
{
WriteBufferFromNuraftBuffer writer;
CompressedWriteBuffer compressed_writer(writer);
NuKeeperStorageSnapshot::serialize(snapshot, compressed_writer);
KeeperStorageSnapshot::serialize(snapshot, compressed_writer);
compressed_writer.finalize();
return writer.getBuffer();
}
SnapshotMetaAndStorage NuKeeperSnapshotManager::deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const
SnapshotMetaAndStorage KeeperSnapshotManager::deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const
{
ReadBufferFromNuraftBuffer reader(buffer);
CompressedReadBuffer compressed_reader(reader);
auto storage = std::make_unique<NuKeeperStorage>(storage_tick_time);
auto snapshot_metadata = NuKeeperStorageSnapshot::deserialize(*storage, compressed_reader);
auto storage = std::make_unique<KeeperStorage>(storage_tick_time);
auto snapshot_metadata = KeeperStorageSnapshot::deserialize(*storage, compressed_reader);
return std::make_pair(snapshot_metadata, std::move(storage));
}
SnapshotMetaAndStorage NuKeeperSnapshotManager::restoreFromLatestSnapshot()
SnapshotMetaAndStorage KeeperSnapshotManager::restoreFromLatestSnapshot()
{
if (existing_snapshots.empty())
return {};
@ -346,13 +346,13 @@ SnapshotMetaAndStorage NuKeeperSnapshotManager::restoreFromLatestSnapshot()
return deserializeSnapshotFromBuffer(buffer);
}
void NuKeeperSnapshotManager::removeOutdatedSnapshotsIfNeeded()
void KeeperSnapshotManager::removeOutdatedSnapshotsIfNeeded()
{
while (existing_snapshots.size() > snapshots_to_keep)
removeSnapshot(existing_snapshots.begin()->first);
}
void NuKeeperSnapshotManager::removeSnapshot(size_t log_idx)
void KeeperSnapshotManager::removeSnapshot(size_t log_idx)
{
auto itr = existing_snapshots.find(log_idx);
if (itr == existing_snapshots.end())

View File

@ -1,6 +1,6 @@
#pragma once
#include <libnuraft/nuraft.hxx> // Y_IGNORE
#include <Coordination/NuKeeperStorage.h>
#include <Coordination/KeeperStorage.h>
#include <IO/WriteBuffer.h>
#include <IO/ReadBuffer.h>
@ -15,42 +15,42 @@ enum SnapshotVersion : uint8_t
V0 = 0,
};
struct NuKeeperStorageSnapshot
struct KeeperStorageSnapshot
{
public:
NuKeeperStorageSnapshot(NuKeeperStorage * storage_, size_t up_to_log_idx_);
KeeperStorageSnapshot(KeeperStorage * storage_, size_t up_to_log_idx_);
NuKeeperStorageSnapshot(NuKeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_);
~NuKeeperStorageSnapshot();
KeeperStorageSnapshot(KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_);
~KeeperStorageSnapshot();
static void serialize(const NuKeeperStorageSnapshot & snapshot, WriteBuffer & out);
static void serialize(const KeeperStorageSnapshot & snapshot, WriteBuffer & out);
static SnapshotMetadataPtr deserialize(NuKeeperStorage & storage, ReadBuffer & in);
static SnapshotMetadataPtr deserialize(KeeperStorage & storage, ReadBuffer & in);
NuKeeperStorage * storage;
KeeperStorage * storage;
SnapshotVersion version = SnapshotVersion::V0;
SnapshotMetadataPtr snapshot_meta;
int64_t session_id;
size_t snapshot_container_size;
NuKeeperStorage::Container::const_iterator begin;
KeeperStorage::Container::const_iterator begin;
SessionAndTimeout session_and_timeout;
};
using NuKeeperStorageSnapshotPtr = std::shared_ptr<NuKeeperStorageSnapshot>;
using CreateSnapshotCallback = std::function<void(NuKeeperStorageSnapshotPtr &&)>;
using KeeperStorageSnapshotPtr = std::shared_ptr<KeeperStorageSnapshot>;
using CreateSnapshotCallback = std::function<void(KeeperStorageSnapshotPtr &&)>;
using SnapshotMetaAndStorage = std::pair<SnapshotMetadataPtr, NuKeeperStoragePtr>;
using SnapshotMetaAndStorage = std::pair<SnapshotMetadataPtr, KeeperStoragePtr>;
class NuKeeperSnapshotManager
class KeeperSnapshotManager
{
public:
NuKeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, size_t storage_tick_time_ = 500);
KeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, size_t storage_tick_time_ = 500);
SnapshotMetaAndStorage restoreFromLatestSnapshot();
static nuraft::ptr<nuraft::buffer> serializeSnapshotToBuffer(const NuKeeperStorageSnapshot & snapshot);
static nuraft::ptr<nuraft::buffer> serializeSnapshotToBuffer(const KeeperStorageSnapshot & snapshot);
std::string serializeSnapshotBufferToDisk(nuraft::buffer & buffer, size_t up_to_log_idx);
SnapshotMetaAndStorage deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const;
@ -82,7 +82,7 @@ private:
struct CreateSnapshotTask
{
NuKeeperStorageSnapshotPtr snapshot;
KeeperStorageSnapshotPtr snapshot;
CreateSnapshotCallback create_snapshot;
};

View File

@ -1,9 +1,9 @@
#include <Coordination/NuKeeperStateMachine.h>
#include <Coordination/KeeperStateMachine.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <IO/ReadHelpers.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Coordination/NuKeeperSnapshotManager.h>
#include <Coordination/KeeperSnapshotManager.h>
#include <future>
namespace DB
@ -14,10 +14,10 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
NuKeeperStorage::RequestForSession parseRequest(nuraft::buffer & data)
KeeperStorage::RequestForSession parseRequest(nuraft::buffer & data)
{
ReadBufferFromNuraftBuffer buffer(data);
NuKeeperStorage::RequestForSession request_for_session;
KeeperStorage::RequestForSession request_for_session;
readIntBinary(request_for_session.session_id, buffer);
int32_t length;
@ -36,17 +36,17 @@ NuKeeperStorage::RequestForSession parseRequest(nuraft::buffer & data)
return request_for_session;
}
NuKeeperStateMachine::NuKeeperStateMachine(ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_)
KeeperStateMachine::KeeperStateMachine(ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_)
: coordination_settings(coordination_settings_)
, snapshot_manager(snapshots_path_, coordination_settings->snapshots_to_keep, coordination_settings->dead_session_check_period_ms.totalMicroseconds())
, responses_queue(responses_queue_)
, snapshots_queue(snapshots_queue_)
, last_committed_idx(0)
, log(&Poco::Logger::get("NuKeeperStateMachine"))
, log(&Poco::Logger::get("KeeperStateMachine"))
{
}
void NuKeeperStateMachine::init()
void KeeperStateMachine::init()
{
/// Do everything without mutexes, no other threads exist.
LOG_DEBUG(log, "Totally have {} snapshots", snapshot_manager.totalSnapshots());
@ -85,10 +85,10 @@ void NuKeeperStateMachine::init()
}
if (!storage)
storage = std::make_unique<NuKeeperStorage>(coordination_settings->dead_session_check_period_ms.totalMilliseconds());
storage = std::make_unique<KeeperStorage>(coordination_settings->dead_session_check_period_ms.totalMilliseconds());
}
nuraft::ptr<nuraft::buffer> NuKeeperStateMachine::commit(const size_t log_idx, nuraft::buffer & data)
nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const size_t log_idx, nuraft::buffer & data)
{
if (data.size() == sizeof(int64_t))
{
@ -109,7 +109,7 @@ nuraft::ptr<nuraft::buffer> NuKeeperStateMachine::commit(const size_t log_idx, n
else
{
auto request_for_session = parseRequest(data);
NuKeeperStorage::ResponsesForSessions responses_for_sessions;
KeeperStorage::ResponsesForSessions responses_for_sessions;
{
std::lock_guard lock(storage_lock);
responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx);
@ -122,7 +122,7 @@ nuraft::ptr<nuraft::buffer> NuKeeperStateMachine::commit(const size_t log_idx, n
}
}
bool NuKeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
{
LOG_DEBUG(log, "Applying snapshot {}", s.get_last_log_idx());
nuraft::ptr<nuraft::buffer> latest_snapshot_ptr;
@ -142,14 +142,14 @@ bool NuKeeperStateMachine::apply_snapshot(nuraft::snapshot & s)
return true;
}
nuraft::ptr<nuraft::snapshot> NuKeeperStateMachine::last_snapshot()
nuraft::ptr<nuraft::snapshot> KeeperStateMachine::last_snapshot()
{
/// Just return the latest snapshot.
std::lock_guard<std::mutex> lock(snapshots_lock);
return latest_snapshot_meta;
}
void NuKeeperStateMachine::create_snapshot(
void KeeperStateMachine::create_snapshot(
nuraft::snapshot & s,
nuraft::async_result<bool>::handler_type & when_done)
{
@ -160,10 +160,10 @@ void NuKeeperStateMachine::create_snapshot(
CreateSnapshotTask snapshot_task;
{
std::lock_guard lock(storage_lock);
snapshot_task.snapshot = std::make_shared<NuKeeperStorageSnapshot>(storage.get(), snapshot_meta_copy);
snapshot_task.snapshot = std::make_shared<KeeperStorageSnapshot>(storage.get(), snapshot_meta_copy);
}
snapshot_task.create_snapshot = [this, when_done] (NuKeeperStorageSnapshotPtr && snapshot)
snapshot_task.create_snapshot = [this, when_done] (KeeperStorageSnapshotPtr && snapshot)
{
nuraft::ptr<std::exception> exception(nullptr);
bool ret = true;
@ -203,7 +203,7 @@ void NuKeeperStateMachine::create_snapshot(
snapshots_queue.push(std::move(snapshot_task));
}
void NuKeeperStateMachine::save_logical_snp_obj(
void KeeperStateMachine::save_logical_snp_obj(
nuraft::snapshot & s,
size_t & obj_id,
nuraft::buffer & data,
@ -217,7 +217,7 @@ void NuKeeperStateMachine::save_logical_snp_obj(
if (obj_id == 0)
{
std::lock_guard lock(storage_lock);
NuKeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx());
KeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx());
cloned_buffer = snapshot_manager.serializeSnapshotToBuffer(snapshot);
}
else
@ -235,7 +235,7 @@ void NuKeeperStateMachine::save_logical_snp_obj(
std::shared_ptr<std::promise<void>> waiter = std::make_shared<std::promise<void>>();
auto future = waiter->get_future();
snapshot_task.snapshot = nullptr;
snapshot_task.create_snapshot = [this, waiter, cloned_buffer, log_idx = s.get_last_log_idx()] (NuKeeperStorageSnapshotPtr &&)
snapshot_task.create_snapshot = [this, waiter, cloned_buffer, log_idx = s.get_last_log_idx()] (KeeperStorageSnapshotPtr &&)
{
try
{
@ -261,7 +261,7 @@ void NuKeeperStateMachine::save_logical_snp_obj(
obj_id++;
}
int NuKeeperStateMachine::read_logical_snp_obj(
int KeeperStateMachine::read_logical_snp_obj(
nuraft::snapshot & s,
void* & /*user_snp_ctx*/,
ulong obj_id,
@ -289,9 +289,9 @@ int NuKeeperStateMachine::read_logical_snp_obj(
return 0;
}
void NuKeeperStateMachine::processReadRequest(const NuKeeperStorage::RequestForSession & request_for_session)
void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session)
{
NuKeeperStorage::ResponsesForSessions responses;
KeeperStorage::ResponsesForSessions responses;
{
std::lock_guard lock(storage_lock);
responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt);
@ -300,13 +300,13 @@ void NuKeeperStateMachine::processReadRequest(const NuKeeperStorage::RequestForS
responses_queue.push(response);
}
std::unordered_set<int64_t> NuKeeperStateMachine::getDeadSessions()
std::unordered_set<int64_t> KeeperStateMachine::getDeadSessions()
{
std::lock_guard lock(storage_lock);
return storage->getDeadSessions();
}
void NuKeeperStateMachine::shutdownStorage()
void KeeperStateMachine::shutdownStorage()
{
std::lock_guard lock(storage_lock);
storage->finalize();

View File

@ -1,22 +1,22 @@
#pragma once
#include <Coordination/NuKeeperStorage.h>
#include <Coordination/KeeperStorage.h>
#include <libnuraft/nuraft.hxx> // Y_IGNORE
#include <common/logger_useful.h>
#include <Coordination/ThreadSafeQueue.h>
#include <Coordination/CoordinationSettings.h>
#include <Coordination/NuKeeperSnapshotManager.h>
#include <Coordination/KeeperSnapshotManager.h>
namespace DB
{
using ResponsesQueue = ThreadSafeQueue<NuKeeperStorage::ResponseForSession>;
using ResponsesQueue = ThreadSafeQueue<KeeperStorage::ResponseForSession>;
using SnapshotsQueue = ConcurrentBoundedQueue<CreateSnapshotTask>;
class NuKeeperStateMachine : public nuraft::state_machine
class KeeperStateMachine : public nuraft::state_machine
{
public:
NuKeeperStateMachine(ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_);
KeeperStateMachine(ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_);
void init();
@ -50,12 +50,12 @@ public:
nuraft::ptr<nuraft::buffer> & data_out,
bool & is_last_obj) override;
NuKeeperStorage & getStorage()
KeeperStorage & getStorage()
{
return *storage;
}
void processReadRequest(const NuKeeperStorage::RequestForSession & request_for_session);
void processReadRequest(const KeeperStorage::RequestForSession & request_for_session);
std::unordered_set<int64_t> getDeadSessions();
@ -68,9 +68,9 @@ private:
CoordinationSettingsPtr coordination_settings;
NuKeeperStoragePtr storage;
KeeperStoragePtr storage;
NuKeeperSnapshotManager snapshot_manager;
KeeperSnapshotManager snapshot_manager;
ResponsesQueue & responses_queue;

View File

@ -1,4 +1,4 @@
#include <Coordination/NuKeeperStateManager.h>
#include <Coordination/KeeperStateManager.h>
#include <Common/Exception.h>
namespace DB
@ -9,23 +9,23 @@ namespace ErrorCodes
extern const int RAFT_ERROR;
}
NuKeeperStateManager::NuKeeperStateManager(int server_id_, const std::string & host, int port, const std::string & logs_path)
KeeperStateManager::KeeperStateManager(int server_id_, const std::string & host, int port, const std::string & logs_path)
: my_server_id(server_id_)
, my_port(port)
, log_store(nuraft::cs_new<NuKeeperLogStore>(logs_path, 5000, false))
, log_store(nuraft::cs_new<KeeperLogStore>(logs_path, 5000, false))
, cluster_config(nuraft::cs_new<nuraft::cluster_config>())
{
auto peer_config = nuraft::cs_new<nuraft::srv_config>(my_server_id, host + ":" + std::to_string(port));
cluster_config->get_servers().push_back(peer_config);
}
NuKeeperStateManager::NuKeeperStateManager(
KeeperStateManager::KeeperStateManager(
int my_server_id_,
const std::string & config_prefix,
const Poco::Util::AbstractConfiguration & config,
const CoordinationSettingsPtr & coordination_settings)
: my_server_id(my_server_id_)
, log_store(nuraft::cs_new<NuKeeperLogStore>(
, log_store(nuraft::cs_new<KeeperLogStore>(
config.getString(config_prefix + ".log_storage_path", config.getString("path", DBMS_DEFAULT_PATH) + "coordination/logs"),
coordination_settings->rotate_log_storage_interval, coordination_settings->force_sync))
, cluster_config(nuraft::cs_new<nuraft::cluster_config>())
@ -64,17 +64,17 @@ NuKeeperStateManager::NuKeeperStateManager(
throw Exception(ErrorCodes::RAFT_ERROR, "At least one of servers should be able to start as leader (without <start_as_follower>)");
}
void NuKeeperStateManager::loadLogStore(size_t last_commited_index, size_t logs_to_keep)
void KeeperStateManager::loadLogStore(size_t last_commited_index, size_t logs_to_keep)
{
log_store->init(last_commited_index, logs_to_keep);
}
void NuKeeperStateManager::flushLogStore()
void KeeperStateManager::flushLogStore()
{
log_store->flush();
}
void NuKeeperStateManager::save_config(const nuraft::cluster_config & config)
void KeeperStateManager::save_config(const nuraft::cluster_config & config)
{
// Just keep in memory in this example.
// Need to write to disk here, if want to make it durable.
@ -82,7 +82,7 @@ void NuKeeperStateManager::save_config(const nuraft::cluster_config & config)
cluster_config = nuraft::cluster_config::deserialize(*buf);
}
void NuKeeperStateManager::save_state(const nuraft::srv_state & state)
void KeeperStateManager::save_state(const nuraft::srv_state & state)
{
// Just keep in memory in this example.
// Need to write to disk here, if want to make it durable.

View File

@ -2,7 +2,7 @@
#include <Core/Types.h>
#include <string>
#include <Coordination/NuKeeperLogStore.h>
#include <Coordination/KeeperLogStore.h>
#include <Coordination/CoordinationSettings.h>
#include <libnuraft/nuraft.hxx> // Y_IGNORE
#include <Poco/Util/AbstractConfiguration.h>
@ -10,16 +10,16 @@
namespace DB
{
class NuKeeperStateManager : public nuraft::state_mgr
class KeeperStateManager : public nuraft::state_mgr
{
public:
NuKeeperStateManager(
KeeperStateManager(
int server_id_,
const std::string & config_prefix,
const Poco::Util::AbstractConfiguration & config,
const CoordinationSettingsPtr & coordination_settings);
NuKeeperStateManager(
KeeperStateManager(
int server_id_,
const std::string & host,
int port,
@ -52,7 +52,7 @@ public:
return start_as_follower_servers.count(my_server_id);
}
nuraft::ptr<NuKeeperLogStore> getLogStore() const { return log_store; }
nuraft::ptr<KeeperLogStore> getLogStore() const { return log_store; }
size_t getTotalServers() const { return total_servers; }
@ -61,7 +61,7 @@ private:
int my_port;
size_t total_servers{0};
std::unordered_set<int> start_as_follower_servers;
nuraft::ptr<NuKeeperLogStore> log_store;
nuraft::ptr<KeeperLogStore> log_store;
nuraft::ptr<nuraft::srv_config> my_server_config;
nuraft::ptr<nuraft::cluster_config> cluster_config;
nuraft::ptr<nuraft::srv_state> server_state;

View File

@ -1,4 +1,4 @@
#include <Coordination/NuKeeperStorage.h>
#include <Coordination/KeeperStorage.h>
#include <Common/ZooKeeper/IKeeper.h>
#include <Common/setThreadName.h>
#include <mutex>
@ -31,9 +31,9 @@ static std::string getBaseName(const String & path)
return std::string{&path[basename_start + 1], path.length() - basename_start - 1};
}
static NuKeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, NuKeeperStorage::Watches & watches, NuKeeperStorage::Watches & list_watches, Coordination::Event event_type)
static KeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches, Coordination::Event event_type)
{
NuKeeperStorage::ResponsesForSessions result;
KeeperStorage::ResponsesForSessions result;
auto it = watches.find(path);
if (it != watches.end())
{
@ -44,7 +44,7 @@ static NuKeeperStorage::ResponsesForSessions processWatchesImpl(const String & p
watch_response->type = event_type;
watch_response->state = Coordination::State::CONNECTED;
for (auto watcher_session : it->second)
result.push_back(NuKeeperStorage::ResponseForSession{watcher_session, watch_response});
result.push_back(KeeperStorage::ResponseForSession{watcher_session, watch_response});
watches.erase(it);
}
@ -60,14 +60,14 @@ static NuKeeperStorage::ResponsesForSessions processWatchesImpl(const String & p
watch_list_response->type = Coordination::Event::CHILD;
watch_list_response->state = Coordination::State::CONNECTED;
for (auto watcher_session : it->second)
result.push_back(NuKeeperStorage::ResponseForSession{watcher_session, watch_list_response});
result.push_back(KeeperStorage::ResponseForSession{watcher_session, watch_list_response});
list_watches.erase(it);
}
return result;
}
NuKeeperStorage::NuKeeperStorage(int64_t tick_time_ms)
KeeperStorage::KeeperStorage(int64_t tick_time_ms)
: session_expiry_queue(tick_time_ms)
{
container.insert("/", Node());
@ -75,32 +75,32 @@ NuKeeperStorage::NuKeeperStorage(int64_t tick_time_ms)
using Undo = std::function<void()>;
struct NuKeeperStorageRequest
struct KeeperStorageRequest
{
Coordination::ZooKeeperRequestPtr zk_request;
explicit NuKeeperStorageRequest(const Coordination::ZooKeeperRequestPtr & zk_request_)
explicit KeeperStorageRequest(const Coordination::ZooKeeperRequestPtr & zk_request_)
: zk_request(zk_request_)
{}
virtual std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const = 0;
virtual NuKeeperStorage::ResponsesForSessions processWatches(NuKeeperStorage::Watches & /*watches*/, NuKeeperStorage::Watches & /*list_watches*/) const { return {}; }
virtual std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage::Container & container, KeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const = 0;
virtual KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & /*watches*/, KeeperStorage::Watches & /*list_watches*/) const { return {}; }
virtual ~NuKeeperStorageRequest() = default;
virtual ~KeeperStorageRequest() = default;
};
struct NuKeeperStorageHeartbeatRequest final : public NuKeeperStorageRequest
struct KeeperStorageHeartbeatRequest final : public KeeperStorageRequest
{
using NuKeeperStorageRequest::NuKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & /* container */, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t /* zxid */, int64_t /* session_id */) const override
using KeeperStorageRequest::KeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage::Container & /* container */, KeeperStorage::Ephemerals & /* ephemerals */, int64_t /* zxid */, int64_t /* session_id */) const override
{
return {zk_request->makeResponse(), {}};
}
};
struct NuKeeperStorageSyncRequest final : public NuKeeperStorageRequest
struct KeeperStorageSyncRequest final : public KeeperStorageRequest
{
using NuKeeperStorageRequest::NuKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & /* container */, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t /* zxid */, int64_t /* session_id */) const override
using KeeperStorageRequest::KeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage::Container & /* container */, KeeperStorage::Ephemerals & /* ephemerals */, int64_t /* zxid */, int64_t /* session_id */) const override
{
auto response = zk_request->makeResponse();
dynamic_cast<Coordination::ZooKeeperSyncResponse *>(response.get())->path = dynamic_cast<Coordination::ZooKeeperSyncRequest *>(zk_request.get())->path;
@ -108,16 +108,16 @@ struct NuKeeperStorageSyncRequest final : public NuKeeperStorageRequest
}
};
struct NuKeeperStorageCreateRequest final : public NuKeeperStorageRequest
struct KeeperStorageCreateRequest final : public KeeperStorageRequest
{
using NuKeeperStorageRequest::NuKeeperStorageRequest;
using KeeperStorageRequest::KeeperStorageRequest;
NuKeeperStorage::ResponsesForSessions processWatches(NuKeeperStorage::Watches & watches, NuKeeperStorage::Watches & list_watches) const override
KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override
{
return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CREATED);
}
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage::Container & container, KeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Undo undo;
@ -143,7 +143,7 @@ struct NuKeeperStorageCreateRequest final : public NuKeeperStorageRequest
}
else
{
NuKeeperStorage::Node created_node;
KeeperStorage::Node created_node;
created_node.stat.czxid = zxid;
created_node.stat.mzxid = zxid;
created_node.stat.ctime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
@ -167,7 +167,7 @@ struct NuKeeperStorageCreateRequest final : public NuKeeperStorageRequest
}
auto child_path = getBaseName(path_created);
container.updateValue(parent_path, [child_path] (NuKeeperStorage::Node & parent)
container.updateValue(parent_path, [child_path] (KeeperStorage::Node & parent)
{
/// Increment sequential number even if node is not sequential
++parent.seq_num;
@ -188,7 +188,7 @@ struct NuKeeperStorageCreateRequest final : public NuKeeperStorageRequest
if (is_ephemeral)
ephemerals[session_id].erase(path_created);
container.updateValue(parent_path, [child_path] (NuKeeperStorage::Node & undo_parent)
container.updateValue(parent_path, [child_path] (KeeperStorage::Node & undo_parent)
{
--undo_parent.stat.cversion;
--undo_parent.stat.numChildren;
@ -205,10 +205,10 @@ struct NuKeeperStorageCreateRequest final : public NuKeeperStorageRequest
}
};
struct NuKeeperStorageGetRequest final : public NuKeeperStorageRequest
struct KeeperStorageGetRequest final : public KeeperStorageRequest
{
using NuKeeperStorageRequest::NuKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t /* zxid */, int64_t /* session_id */) const override
using KeeperStorageRequest::KeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage::Container & container, KeeperStorage::Ephemerals & /* ephemerals */, int64_t /* zxid */, int64_t /* session_id */) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperGetResponse & response = dynamic_cast<Coordination::ZooKeeperGetResponse &>(*response_ptr);
@ -230,10 +230,10 @@ struct NuKeeperStorageGetRequest final : public NuKeeperStorageRequest
}
};
struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest
struct KeeperStorageRemoveRequest final : public KeeperStorageRequest
{
using NuKeeperStorageRequest::NuKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & ephemerals, int64_t /*zxid*/, int64_t /*session_id*/) const override
using KeeperStorageRequest::KeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage::Container & container, KeeperStorage::Ephemerals & ephemerals, int64_t /*zxid*/, int64_t /*session_id*/) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperRemoveResponse & response = dynamic_cast<Coordination::ZooKeeperRemoveResponse &>(*response_ptr);
@ -265,7 +265,7 @@ struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest
}
auto child_basename = getBaseName(it->key);
container.updateValue(parentPath(request.path), [&child_basename] (NuKeeperStorage::Node & parent)
container.updateValue(parentPath(request.path), [&child_basename] (KeeperStorage::Node & parent)
{
--parent.stat.numChildren;
++parent.stat.cversion;
@ -282,7 +282,7 @@ struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest
ephemerals[prev_node.stat.ephemeralOwner].emplace(path);
container.insert(path, prev_node);
container.updateValue(parentPath(path), [&child_basename] (NuKeeperStorage::Node & parent)
container.updateValue(parentPath(path), [&child_basename] (KeeperStorage::Node & parent)
{
++parent.stat.numChildren;
--parent.stat.cversion;
@ -294,16 +294,16 @@ struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest
return { response_ptr, undo };
}
NuKeeperStorage::ResponsesForSessions processWatches(NuKeeperStorage::Watches & watches, NuKeeperStorage::Watches & list_watches) const override
KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override
{
return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED);
}
};
struct NuKeeperStorageExistsRequest final : public NuKeeperStorageRequest
struct KeeperStorageExistsRequest final : public KeeperStorageRequest
{
using NuKeeperStorageRequest::NuKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t /*zxid*/, int64_t /* session_id */) const override
using KeeperStorageRequest::KeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage::Container & container, KeeperStorage::Ephemerals & /* ephemerals */, int64_t /*zxid*/, int64_t /* session_id */) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperExistsResponse & response = dynamic_cast<Coordination::ZooKeeperExistsResponse &>(*response_ptr);
@ -324,10 +324,10 @@ struct NuKeeperStorageExistsRequest final : public NuKeeperStorageRequest
}
};
struct NuKeeperStorageSetRequest final : public NuKeeperStorageRequest
struct KeeperStorageSetRequest final : public KeeperStorageRequest
{
using NuKeeperStorageRequest::NuKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t zxid, int64_t /* session_id */) const override
using KeeperStorageRequest::KeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage::Container & container, KeeperStorage::Ephemerals & /* ephemerals */, int64_t zxid, int64_t /* session_id */) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperSetResponse & response = dynamic_cast<Coordination::ZooKeeperSetResponse &>(*response_ptr);
@ -343,7 +343,7 @@ struct NuKeeperStorageSetRequest final : public NuKeeperStorageRequest
{
auto prev_node = it->value;
auto itr = container.updateValue(request.path, [zxid, request] (NuKeeperStorage::Node & value)
auto itr = container.updateValue(request.path, [zxid, request] (KeeperStorage::Node & value)
{
value.data = request.data;
value.stat.version++;
@ -353,7 +353,7 @@ struct NuKeeperStorageSetRequest final : public NuKeeperStorageRequest
value.data = request.data;
});
container.updateValue(parentPath(request.path), [] (NuKeeperStorage::Node & parent)
container.updateValue(parentPath(request.path), [] (KeeperStorage::Node & parent)
{
parent.stat.cversion++;
});
@ -363,8 +363,8 @@ struct NuKeeperStorageSetRequest final : public NuKeeperStorageRequest
undo = [prev_node, &container, path = request.path]
{
container.updateValue(path, [&prev_node] (NuKeeperStorage::Node & value) { value = prev_node; });
container.updateValue(parentPath(path), [] (NuKeeperStorage::Node & parent)
container.updateValue(path, [&prev_node] (KeeperStorage::Node & value) { value = prev_node; });
container.updateValue(parentPath(path), [] (KeeperStorage::Node & parent)
{
parent.stat.cversion--;
});
@ -378,16 +378,16 @@ struct NuKeeperStorageSetRequest final : public NuKeeperStorageRequest
return { response_ptr, undo };
}
NuKeeperStorage::ResponsesForSessions processWatches(NuKeeperStorage::Watches & watches, NuKeeperStorage::Watches & list_watches) const override
KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override
{
return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CHANGED);
}
};
struct NuKeeperStorageListRequest final : public NuKeeperStorageRequest
struct KeeperStorageListRequest final : public KeeperStorageRequest
{
using NuKeeperStorageRequest::NuKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t /*zxid*/, int64_t /*session_id*/) const override
using KeeperStorageRequest::KeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage::Container & container, KeeperStorage::Ephemerals & /* ephemerals */, int64_t /*zxid*/, int64_t /*session_id*/) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperListResponse & response = dynamic_cast<Coordination::ZooKeeperListResponse &>(*response_ptr);
@ -415,10 +415,10 @@ struct NuKeeperStorageListRequest final : public NuKeeperStorageRequest
}
};
struct NuKeeperStorageCheckRequest final : public NuKeeperStorageRequest
struct KeeperStorageCheckRequest final : public KeeperStorageRequest
{
using NuKeeperStorageRequest::NuKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t /*zxid*/, int64_t /*session_id*/) const override
using KeeperStorageRequest::KeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage::Container & container, KeeperStorage::Ephemerals & /* ephemerals */, int64_t /*zxid*/, int64_t /*session_id*/) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperCheckResponse & response = dynamic_cast<Coordination::ZooKeeperCheckResponse &>(*response_ptr);
@ -441,11 +441,11 @@ struct NuKeeperStorageCheckRequest final : public NuKeeperStorageRequest
}
};
struct NuKeeperStorageMultiRequest final : public NuKeeperStorageRequest
struct KeeperStorageMultiRequest final : public KeeperStorageRequest
{
std::vector<NuKeeperStorageRequestPtr> concrete_requests;
explicit NuKeeperStorageMultiRequest(const Coordination::ZooKeeperRequestPtr & zk_request_)
: NuKeeperStorageRequest(zk_request_)
std::vector<KeeperStorageRequestPtr> concrete_requests;
explicit KeeperStorageMultiRequest(const Coordination::ZooKeeperRequestPtr & zk_request_)
: KeeperStorageRequest(zk_request_)
{
Coordination::ZooKeeperMultiRequest & request = dynamic_cast<Coordination::ZooKeeperMultiRequest &>(*zk_request);
concrete_requests.reserve(request.requests.size());
@ -455,26 +455,26 @@ struct NuKeeperStorageMultiRequest final : public NuKeeperStorageRequest
auto sub_zk_request = std::dynamic_pointer_cast<Coordination::ZooKeeperRequest>(sub_request);
if (sub_zk_request->getOpNum() == Coordination::OpNum::Create)
{
concrete_requests.push_back(std::make_shared<NuKeeperStorageCreateRequest>(sub_zk_request));
concrete_requests.push_back(std::make_shared<KeeperStorageCreateRequest>(sub_zk_request));
}
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Remove)
{
concrete_requests.push_back(std::make_shared<NuKeeperStorageRemoveRequest>(sub_zk_request));
concrete_requests.push_back(std::make_shared<KeeperStorageRemoveRequest>(sub_zk_request));
}
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Set)
{
concrete_requests.push_back(std::make_shared<NuKeeperStorageSetRequest>(sub_zk_request));
concrete_requests.push_back(std::make_shared<KeeperStorageSetRequest>(sub_zk_request));
}
else if (sub_zk_request->getOpNum() == Coordination::OpNum::Check)
{
concrete_requests.push_back(std::make_shared<NuKeeperStorageCheckRequest>(sub_zk_request));
concrete_requests.push_back(std::make_shared<KeeperStorageCheckRequest>(sub_zk_request));
}
else
throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Illegal command as part of multi ZooKeeper request {}", sub_zk_request->getOpNum());
}
}
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage::Container & container, KeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperMultiResponse & response = dynamic_cast<Coordination::ZooKeeperMultiResponse &>(*response_ptr);
@ -527,9 +527,9 @@ struct NuKeeperStorageMultiRequest final : public NuKeeperStorageRequest
}
}
NuKeeperStorage::ResponsesForSessions processWatches(NuKeeperStorage::Watches & watches, NuKeeperStorage::Watches & list_watches) const override
KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override
{
NuKeeperStorage::ResponsesForSessions result;
KeeperStorage::ResponsesForSessions result;
for (const auto & generic_request : concrete_requests)
{
auto responses = generic_request->processWatches(watches, list_watches);
@ -539,16 +539,16 @@ struct NuKeeperStorageMultiRequest final : public NuKeeperStorageRequest
}
};
struct NuKeeperStorageCloseRequest final : public NuKeeperStorageRequest
struct KeeperStorageCloseRequest final : public KeeperStorageRequest
{
using NuKeeperStorageRequest::NuKeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(NuKeeperStorage::Container &, NuKeeperStorage::Ephemerals &, int64_t, int64_t) const override
using KeeperStorageRequest::KeeperStorageRequest;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage::Container &, KeeperStorage::Ephemerals &, int64_t, int64_t) const override
{
throw DB::Exception("Called process on close request", ErrorCodes::LOGICAL_ERROR);
}
};
void NuKeeperStorage::finalize()
void KeeperStorage::finalize()
{
if (finalized)
throw DB::Exception("Testkeeper storage already finalized", ErrorCodes::LOGICAL_ERROR);
@ -568,20 +568,20 @@ void NuKeeperStorage::finalize()
}
class NuKeeperWrapperFactory final : private boost::noncopyable
class KeeperWrapperFactory final : private boost::noncopyable
{
public:
using Creator = std::function<NuKeeperStorageRequestPtr(const Coordination::ZooKeeperRequestPtr &)>;
using Creator = std::function<KeeperStorageRequestPtr(const Coordination::ZooKeeperRequestPtr &)>;
using OpNumToRequest = std::unordered_map<Coordination::OpNum, Creator>;
static NuKeeperWrapperFactory & instance()
static KeeperWrapperFactory & instance()
{
static NuKeeperWrapperFactory factory;
static KeeperWrapperFactory factory;
return factory;
}
NuKeeperStorageRequestPtr get(const Coordination::ZooKeeperRequestPtr & zk_request) const
KeeperStorageRequestPtr get(const Coordination::ZooKeeperRequestPtr & zk_request) const
{
auto it = op_num_to_request.find(zk_request->getOpNum());
if (it == op_num_to_request.end())
@ -598,37 +598,37 @@ public:
private:
OpNumToRequest op_num_to_request;
NuKeeperWrapperFactory();
KeeperWrapperFactory();
};
template<Coordination::OpNum num, typename RequestT>
void registerNuKeeperRequestWrapper(NuKeeperWrapperFactory & factory)
void registerKeeperRequestWrapper(KeeperWrapperFactory & factory)
{
factory.registerRequest(num, [] (const Coordination::ZooKeeperRequestPtr & zk_request) { return std::make_shared<RequestT>(zk_request); });
}
NuKeeperWrapperFactory::NuKeeperWrapperFactory()
KeeperWrapperFactory::KeeperWrapperFactory()
{
registerNuKeeperRequestWrapper<Coordination::OpNum::Heartbeat, NuKeeperStorageHeartbeatRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Sync, NuKeeperStorageSyncRequest>(*this);
//registerNuKeeperRequestWrapper<Coordination::OpNum::Auth, NuKeeperStorageAuthRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Close, NuKeeperStorageCloseRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Create, NuKeeperStorageCreateRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Remove, NuKeeperStorageRemoveRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Exists, NuKeeperStorageExistsRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Get, NuKeeperStorageGetRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Set, NuKeeperStorageSetRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::List, NuKeeperStorageListRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::SimpleList, NuKeeperStorageListRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Check, NuKeeperStorageCheckRequest>(*this);
registerNuKeeperRequestWrapper<Coordination::OpNum::Multi, NuKeeperStorageMultiRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Heartbeat, KeeperStorageHeartbeatRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Sync, KeeperStorageSyncRequest>(*this);
//registerKeeperRequestWrapper<Coordination::OpNum::Auth, KeeperStorageAuthRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Close, KeeperStorageCloseRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Create, KeeperStorageCreateRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Remove, KeeperStorageRemoveRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Exists, KeeperStorageExistsRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Get, KeeperStorageGetRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Set, KeeperStorageSetRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::List, KeeperStorageListRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::SimpleList, KeeperStorageListRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Check, KeeperStorageCheckRequest>(*this);
registerKeeperRequestWrapper<Coordination::OpNum::Multi, KeeperStorageMultiRequest>(*this);
}
NuKeeperStorage::ResponsesForSessions NuKeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id, std::optional<int64_t> new_last_zxid)
KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id, std::optional<int64_t> new_last_zxid)
{
NuKeeperStorage::ResponsesForSessions results;
KeeperStorage::ResponsesForSessions results;
if (new_last_zxid)
{
if (zxid >= *new_last_zxid)
@ -645,7 +645,7 @@ NuKeeperStorage::ResponsesForSessions NuKeeperStorage::processRequest(const Coor
for (const auto & ephemeral_path : it->second)
{
container.erase(ephemeral_path);
container.updateValue(parentPath(ephemeral_path), [&ephemeral_path] (NuKeeperStorage::Node & parent)
container.updateValue(parentPath(ephemeral_path), [&ephemeral_path] (KeeperStorage::Node & parent)
{
--parent.stat.numChildren;
++parent.stat.cversion;
@ -669,7 +669,7 @@ NuKeeperStorage::ResponsesForSessions NuKeeperStorage::processRequest(const Coor
}
else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat)
{
NuKeeperStorageRequestPtr storage_request = NuKeeperWrapperFactory::instance().get(zk_request);
KeeperStorageRequestPtr storage_request = KeeperWrapperFactory::instance().get(zk_request);
auto [response, _] = storage_request->process(container, ephemerals, zxid, session_id);
response->xid = zk_request->xid;
response->zxid = getZXID();
@ -678,7 +678,7 @@ NuKeeperStorage::ResponsesForSessions NuKeeperStorage::processRequest(const Coor
}
else
{
NuKeeperStorageRequestPtr storage_request = NuKeeperWrapperFactory::instance().get(zk_request);
KeeperStorageRequestPtr storage_request = KeeperWrapperFactory::instance().get(zk_request);
auto [response, _] = storage_request->process(container, ephemerals, zxid, session_id);
if (zk_request->has_watch)
@ -715,7 +715,7 @@ NuKeeperStorage::ResponsesForSessions NuKeeperStorage::processRequest(const Coor
}
void NuKeeperStorage::clearDeadWatches(int64_t session_id)
void KeeperStorage::clearDeadWatches(int64_t session_id)
{
auto watches_it = sessions_and_watchers.find(session_id);
if (watches_it != sessions_and_watchers.end())

View File

@ -14,15 +14,15 @@ namespace DB
{
using namespace DB;
struct NuKeeperStorageRequest;
using NuKeeperStorageRequestPtr = std::shared_ptr<NuKeeperStorageRequest>;
struct KeeperStorageRequest;
using KeeperStorageRequestPtr = std::shared_ptr<KeeperStorageRequest>;
using ResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr &)>;
using ChildrenSet = std::unordered_set<std::string>;
using SessionAndTimeout = std::unordered_map<int64_t, int64_t>;
struct NuKeeperStorageSnapshot;
struct KeeperStorageSnapshot;
class NuKeeperStorage
class KeeperStorage
{
public:
int64_t session_id_counter{1};
@ -80,7 +80,7 @@ public:
}
public:
NuKeeperStorage(int64_t tick_time_ms);
KeeperStorage(int64_t tick_time_ms);
int64_t getSessionID(int64_t session_timeout_ms)
{
@ -131,6 +131,6 @@ public:
}
};
using NuKeeperStoragePtr = std::unique_ptr<NuKeeperStorage>;
using KeeperStoragePtr = std::unique_ptr<KeeperStorage>;
}

View File

@ -1,4 +1,4 @@
#include <Coordination/NuKeeperStorageDispatcher.h>
#include <Coordination/KeeperStorageDispatcher.h>
#include <Common/setThreadName.h>
namespace DB
@ -11,18 +11,18 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
}
NuKeeperStorageDispatcher::NuKeeperStorageDispatcher()
KeeperStorageDispatcher::KeeperStorageDispatcher()
: coordination_settings(std::make_shared<CoordinationSettings>())
, log(&Poco::Logger::get("NuKeeperDispatcher"))
, log(&Poco::Logger::get("KeeperDispatcher"))
{
}
void NuKeeperStorageDispatcher::requestThread()
void KeeperStorageDispatcher::requestThread()
{
setThreadName("NuKeeperReqT");
setThreadName("KeeperReqT");
while (!shutdown_called)
{
NuKeeperStorage::RequestForSession request;
KeeperStorage::RequestForSession request;
UInt64 max_wait = UInt64(coordination_settings->operation_timeout_ms.totalMilliseconds());
@ -43,12 +43,12 @@ void NuKeeperStorageDispatcher::requestThread()
}
}
void NuKeeperStorageDispatcher::responseThread()
void KeeperStorageDispatcher::responseThread()
{
setThreadName("NuKeeperRspT");
setThreadName("KeeperRspT");
while (!shutdown_called)
{
NuKeeperStorage::ResponseForSession response_for_session;
KeeperStorage::ResponseForSession response_for_session;
UInt64 max_wait = UInt64(coordination_settings->operation_timeout_ms.totalMilliseconds());
@ -69,9 +69,9 @@ void NuKeeperStorageDispatcher::responseThread()
}
}
void NuKeeperStorageDispatcher::snapshotThread()
void KeeperStorageDispatcher::snapshotThread()
{
setThreadName("NuKeeperSnpT");
setThreadName("KeeperSnpT");
while (!shutdown_called)
{
CreateSnapshotTask task;
@ -91,7 +91,7 @@ void NuKeeperStorageDispatcher::snapshotThread()
}
}
void NuKeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
void KeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response)
{
std::lock_guard lock(session_to_response_callback_mutex);
auto session_writer = session_to_response_callback.find(session_id);
@ -104,7 +104,7 @@ void NuKeeperStorageDispatcher::setResponse(int64_t session_id, const Coordinati
session_to_response_callback.erase(session_writer);
}
bool NuKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
bool KeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id)
{
{
std::lock_guard lock(session_to_response_callback_mutex);
@ -112,7 +112,7 @@ bool NuKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestP
return false;
}
NuKeeperStorage::RequestForSession request_info;
KeeperStorage::RequestForSession request_info;
request_info.request = request;
request_info.session_id = session_id;
@ -125,18 +125,18 @@ bool NuKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestP
return true;
}
void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfiguration & config)
void KeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfiguration & config)
{
LOG_DEBUG(log, "Initializing storage dispatcher");
int myid = config.getInt("test_keeper_server.server_id");
int myid = config.getInt("keeper_server.server_id");
coordination_settings->loadFromConfig("test_keeper_server.coordination_settings", config);
coordination_settings->loadFromConfig("keeper_server.coordination_settings", config);
request_thread = ThreadFromGlobalPool([this] { requestThread(); });
responses_thread = ThreadFromGlobalPool([this] { responseThread(); });
snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); });
server = std::make_unique<NuKeeperServer>(myid, coordination_settings, config, responses_queue, snapshots_queue);
server = std::make_unique<KeeperServer>(myid, coordination_settings, config, responses_queue, snapshots_queue);
try
{
LOG_DEBUG(log, "Waiting server to initialize");
@ -158,7 +158,7 @@ void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigurati
LOG_DEBUG(log, "Dispatcher initialized");
}
void NuKeeperStorageDispatcher::shutdown()
void KeeperStorageDispatcher::shutdown()
{
try
{
@ -191,7 +191,7 @@ void NuKeeperStorageDispatcher::shutdown()
if (server)
server->shutdown();
NuKeeperStorage::RequestForSession request_for_session;
KeeperStorage::RequestForSession request_for_session;
while (requests_queue.tryPop(request_for_session))
{
if (request_for_session.request)
@ -215,19 +215,19 @@ void NuKeeperStorageDispatcher::shutdown()
LOG_DEBUG(log, "Dispatcher shut down");
}
NuKeeperStorageDispatcher::~NuKeeperStorageDispatcher()
KeeperStorageDispatcher::~KeeperStorageDispatcher()
{
shutdown();
}
void NuKeeperStorageDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCallback callback)
void KeeperStorageDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCallback callback)
{
std::lock_guard lock(session_to_response_callback_mutex);
if (!session_to_response_callback.try_emplace(session_id, callback).second)
throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session with id {} already registered in dispatcher", session_id);
}
void NuKeeperStorageDispatcher::sessionCleanerTask()
void KeeperStorageDispatcher::sessionCleanerTask()
{
while (true)
{
@ -244,7 +244,7 @@ void NuKeeperStorageDispatcher::sessionCleanerTask()
LOG_INFO(log, "Found dead session {}, will try to close it", dead_session);
Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
request->xid = Coordination::CLOSE_XID;
NuKeeperStorage::RequestForSession request_info;
KeeperStorage::RequestForSession request_info;
request_info.request = request;
request_info.session_id = dead_session;
{
@ -265,7 +265,7 @@ void NuKeeperStorageDispatcher::sessionCleanerTask()
}
}
void NuKeeperStorageDispatcher::finishSession(int64_t session_id)
void KeeperStorageDispatcher::finishSession(int64_t session_id)
{
std::lock_guard lock(session_to_response_callback_mutex);
auto session_it = session_to_response_callback.find(session_id);

View File

@ -13,7 +13,7 @@
#include <Common/Exception.h>
#include <common/logger_useful.h>
#include <functional>
#include <Coordination/NuKeeperServer.h>
#include <Coordination/KeeperServer.h>
#include <Coordination/CoordinationSettings.h>
@ -22,14 +22,14 @@ namespace DB
using ZooKeeperResponseCallback = std::function<void(const Coordination::ZooKeeperResponsePtr & response)>;
class NuKeeperStorageDispatcher
class KeeperStorageDispatcher
{
private:
std::mutex push_request_mutex;
CoordinationSettingsPtr coordination_settings;
using RequestsQueue = ConcurrentBoundedQueue<NuKeeperStorage::RequestForSession>;
using RequestsQueue = ConcurrentBoundedQueue<KeeperStorage::RequestForSession>;
using SessionToResponseCallback = std::unordered_map<int64_t, ZooKeeperResponseCallback>;
RequestsQueue requests_queue{1};
@ -46,7 +46,7 @@ private:
ThreadFromGlobalPool session_cleaner_thread;
ThreadFromGlobalPool snapshot_thread;
std::unique_ptr<NuKeeperServer> server;
std::unique_ptr<KeeperServer> server;
Poco::Logger * log;
@ -58,13 +58,13 @@ private:
void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response);
public:
NuKeeperStorageDispatcher();
KeeperStorageDispatcher();
void initialize(const Poco::Util::AbstractConfiguration & config);
void shutdown();
~NuKeeperStorageDispatcher();
~KeeperStorageDispatcher();
bool putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id);

View File

@ -1,24 +0,0 @@
#pragma once
#include <Common/ZooKeeper/ZooKeeperCommon.h>
namespace DB
{
struct NuKeeperRequest
{
int64_t session_id;
Coordination::ZooKeeperRequestPtr request;
};
using NuKeeperRequests = std::vector<NuKeeperRequest>;
struct NuKeeperResponse
{
int64_t session_id;
Coordination::ZooKeeperRequestPtr response;
};
using NuKeeperResponses = std::vector<NuKeeperResponse>;
}

View File

@ -9,10 +9,10 @@
#include <Poco/ConsoleChannel.h>
#include <Poco/Logger.h>
#include <Coordination/InMemoryLogStore.h>
#include <Coordination/NuKeeperStateManager.h>
#include <Coordination/NuKeeperSnapshotManager.h>
#include <Coordination/KeeperStateManager.h>
#include <Coordination/KeeperSnapshotManager.h>
#include <Coordination/SummingStateMachine.h>
#include <Coordination/NuKeeperStateMachine.h>
#include <Coordination/KeeperStateMachine.h>
#include <Coordination/LoggerWrapper.h>
#include <Coordination/WriteBufferFromNuraftBuffer.h>
#include <Coordination/ReadBufferFromNuraftBuffer.h>
@ -24,7 +24,7 @@
#include <common/logger_useful.h>
#include <libnuraft/nuraft.hxx> // Y_IGNORE
#include <thread>
#include <Coordination/NuKeeperLogStore.h>
#include <Coordination/KeeperLogStore.h>
#include <Coordination/Changelog.h>
#include <filesystem>
@ -102,7 +102,7 @@ struct SimpliestRaftServer
, port(port_)
, endpoint(hostname + ":" + std::to_string(port))
, state_machine(nuraft::cs_new<StateMachine>())
, state_manager(nuraft::cs_new<DB::NuKeeperStateManager>(server_id, hostname, port, logs_path))
, state_manager(nuraft::cs_new<DB::KeeperStateManager>(server_id, hostname, port, logs_path))
{
state_manager->loadLogStore(1, 0);
nuraft::raft_params params;
@ -153,7 +153,7 @@ struct SimpliestRaftServer
nuraft::ptr<StateMachine> state_machine;
// State manager.
nuraft::ptr<DB::NuKeeperStateManager> state_manager;
nuraft::ptr<DB::KeeperStateManager> state_manager;
// Raft launcher.
nuraft::raft_launcher launcher;
@ -207,7 +207,7 @@ DB::LogEntryPtr getLogEntry(const std::string & s, size_t term)
TEST(CoordinationTest, ChangelogTestSimple)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 5, true);
DB::KeeperLogStore changelog("./logs", 5, true);
changelog.init(1, 0);
auto entry = getLogEntry("hello world", 77);
changelog.append(entry);
@ -221,7 +221,7 @@ TEST(CoordinationTest, ChangelogTestSimple)
TEST(CoordinationTest, ChangelogTestFile)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 5, true);
DB::KeeperLogStore changelog("./logs", 5, true);
changelog.init(1, 0);
auto entry = getLogEntry("hello world", 77);
changelog.append(entry);
@ -242,7 +242,7 @@ TEST(CoordinationTest, ChangelogTestFile)
TEST(CoordinationTest, ChangelogReadWrite)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 1000, true);
DB::KeeperLogStore changelog("./logs", 1000, true);
changelog.init(1, 0);
for (size_t i = 0; i < 10; ++i)
{
@ -250,7 +250,7 @@ TEST(CoordinationTest, ChangelogReadWrite)
changelog.append(entry);
}
EXPECT_EQ(changelog.size(), 10);
DB::NuKeeperLogStore changelog_reader("./logs", 1000, true);
DB::KeeperLogStore changelog_reader("./logs", 1000, true);
changelog_reader.init(1, 0);
EXPECT_EQ(changelog_reader.size(), 10);
EXPECT_EQ(changelog_reader.last_entry()->get_term(), changelog.last_entry()->get_term());
@ -269,7 +269,7 @@ TEST(CoordinationTest, ChangelogReadWrite)
TEST(CoordinationTest, ChangelogWriteAt)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 1000, true);
DB::KeeperLogStore changelog("./logs", 1000, true);
changelog.init(1, 0);
for (size_t i = 0; i < 10; ++i)
{
@ -285,7 +285,7 @@ TEST(CoordinationTest, ChangelogWriteAt)
EXPECT_EQ(changelog.entry_at(7)->get_term(), 77);
EXPECT_EQ(changelog.next_slot(), 8);
DB::NuKeeperLogStore changelog_reader("./logs", 1000, true);
DB::KeeperLogStore changelog_reader("./logs", 1000, true);
changelog_reader.init(1, 0);
EXPECT_EQ(changelog_reader.size(), changelog.size());
@ -298,7 +298,7 @@ TEST(CoordinationTest, ChangelogWriteAt)
TEST(CoordinationTest, ChangelogTestAppendAfterRead)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 5, true);
DB::KeeperLogStore changelog("./logs", 5, true);
changelog.init(1, 0);
for (size_t i = 0; i < 7; ++i)
{
@ -310,7 +310,7 @@ TEST(CoordinationTest, ChangelogTestAppendAfterRead)
EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin"));
EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin"));
DB::NuKeeperLogStore changelog_reader("./logs", 5, true);
DB::KeeperLogStore changelog_reader("./logs", 5, true);
changelog_reader.init(1, 0);
EXPECT_EQ(changelog_reader.size(), 7);
@ -346,7 +346,7 @@ TEST(CoordinationTest, ChangelogTestAppendAfterRead)
TEST(CoordinationTest, ChangelogTestCompaction)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 5, true);
DB::KeeperLogStore changelog("./logs", 5, true);
changelog.init(1, 0);
for (size_t i = 0; i < 3; ++i)
@ -387,7 +387,7 @@ TEST(CoordinationTest, ChangelogTestCompaction)
EXPECT_EQ(changelog.next_slot(), 8);
EXPECT_EQ(changelog.last_entry()->get_term(), 60);
/// And we able to read it
DB::NuKeeperLogStore changelog_reader("./logs", 5, true);
DB::KeeperLogStore changelog_reader("./logs", 5, true);
changelog_reader.init(7, 0);
EXPECT_EQ(changelog_reader.size(), 1);
EXPECT_EQ(changelog_reader.start_index(), 7);
@ -398,7 +398,7 @@ TEST(CoordinationTest, ChangelogTestCompaction)
TEST(CoordinationTest, ChangelogTestBatchOperations)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 100, true);
DB::KeeperLogStore changelog("./logs", 100, true);
changelog.init(1, 0);
for (size_t i = 0; i < 10; ++i)
{
@ -410,7 +410,7 @@ TEST(CoordinationTest, ChangelogTestBatchOperations)
auto entries = changelog.pack(1, 5);
DB::NuKeeperLogStore apply_changelog("./logs", 100, true);
DB::KeeperLogStore apply_changelog("./logs", 100, true);
apply_changelog.init(1, 0);
for (size_t i = 0; i < 10; ++i)
@ -440,7 +440,7 @@ TEST(CoordinationTest, ChangelogTestBatchOperations)
TEST(CoordinationTest, ChangelogTestBatchOperationsEmpty)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 100, true);
DB::KeeperLogStore changelog("./logs", 100, true);
changelog.init(1, 0);
for (size_t i = 0; i < 10; ++i)
{
@ -453,7 +453,7 @@ TEST(CoordinationTest, ChangelogTestBatchOperationsEmpty)
auto entries = changelog.pack(5, 5);
ChangelogDirTest test1("./logs1");
DB::NuKeeperLogStore changelog_new("./logs1", 100, true);
DB::KeeperLogStore changelog_new("./logs1", 100, true);
changelog_new.init(1, 0);
EXPECT_EQ(changelog_new.size(), 0);
@ -472,7 +472,7 @@ TEST(CoordinationTest, ChangelogTestBatchOperationsEmpty)
EXPECT_EQ(changelog_new.start_index(), 5);
EXPECT_EQ(changelog_new.next_slot(), 11);
DB::NuKeeperLogStore changelog_reader("./logs1", 100, true);
DB::KeeperLogStore changelog_reader("./logs1", 100, true);
changelog_reader.init(5, 0);
}
@ -480,7 +480,7 @@ TEST(CoordinationTest, ChangelogTestBatchOperationsEmpty)
TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 5, true);
DB::KeeperLogStore changelog("./logs", 5, true);
changelog.init(1, 0);
for (size_t i = 0; i < 33; ++i)
@ -515,7 +515,7 @@ TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile)
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin"));
DB::NuKeeperLogStore changelog_read("./logs", 5, true);
DB::KeeperLogStore changelog_read("./logs", 5, true);
changelog_read.init(1, 0);
EXPECT_EQ(changelog_read.size(), 7);
EXPECT_EQ(changelog_read.start_index(), 1);
@ -526,7 +526,7 @@ TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile)
TEST(CoordinationTest, ChangelogTestWriteAtFileBorder)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 5, true);
DB::KeeperLogStore changelog("./logs", 5, true);
changelog.init(1, 0);
for (size_t i = 0; i < 33; ++i)
@ -561,7 +561,7 @@ TEST(CoordinationTest, ChangelogTestWriteAtFileBorder)
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin"));
DB::NuKeeperLogStore changelog_read("./logs", 5, true);
DB::KeeperLogStore changelog_read("./logs", 5, true);
changelog_read.init(1, 0);
EXPECT_EQ(changelog_read.size(), 11);
EXPECT_EQ(changelog_read.start_index(), 1);
@ -572,7 +572,7 @@ TEST(CoordinationTest, ChangelogTestWriteAtFileBorder)
TEST(CoordinationTest, ChangelogTestWriteAtAllFiles)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 5, true);
DB::KeeperLogStore changelog("./logs", 5, true);
changelog.init(1, 0);
for (size_t i = 0; i < 33; ++i)
@ -611,7 +611,7 @@ TEST(CoordinationTest, ChangelogTestWriteAtAllFiles)
TEST(CoordinationTest, ChangelogTestStartNewLogAfterRead)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 5, true);
DB::KeeperLogStore changelog("./logs", 5, true);
changelog.init(1, 0);
for (size_t i = 0; i < 35; ++i)
@ -630,7 +630,7 @@ TEST(CoordinationTest, ChangelogTestStartNewLogAfterRead)
EXPECT_FALSE(fs::exists("./logs/changelog_36_40.bin"));
DB::NuKeeperLogStore changelog_reader("./logs", 5, true);
DB::KeeperLogStore changelog_reader("./logs", 5, true);
changelog_reader.init(1, 0);
auto entry = getLogEntry("36_hello_world", 360);
@ -652,7 +652,7 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 5, true);
DB::KeeperLogStore changelog("./logs", 5, true);
changelog.init(1, 0);
for (size_t i = 0; i < 35; ++i)
@ -672,7 +672,7 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
DB::WriteBufferFromFile plain_buf("./logs/changelog_11_15.bin", DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
plain_buf.truncate(0);
DB::NuKeeperLogStore changelog_reader("./logs", 5, true);
DB::KeeperLogStore changelog_reader("./logs", 5, true);
changelog_reader.init(1, 0);
EXPECT_EQ(changelog_reader.size(), 10);
@ -701,7 +701,7 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate)
EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin"));
EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin"));
DB::NuKeeperLogStore changelog_reader2("./logs", 5, true);
DB::KeeperLogStore changelog_reader2("./logs", 5, true);
changelog_reader2.init(1, 0);
EXPECT_EQ(changelog_reader2.size(), 11);
EXPECT_EQ(changelog_reader2.last_entry()->get_term(), 7777);
@ -711,7 +711,7 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 20, true);
DB::KeeperLogStore changelog("./logs", 20, true);
changelog.init(1, 0);
for (size_t i = 0; i < 35; ++i)
@ -726,7 +726,7 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
DB::WriteBufferFromFile plain_buf("./logs/changelog_1_20.bin", DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
plain_buf.truncate(140);
DB::NuKeeperLogStore changelog_reader("./logs", 20, true);
DB::KeeperLogStore changelog_reader("./logs", 20, true);
changelog_reader.init(1, 0);
EXPECT_EQ(changelog_reader.size(), 2);
@ -739,7 +739,7 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2)
EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777);
DB::NuKeeperLogStore changelog_reader2("./logs", 20, true);
DB::KeeperLogStore changelog_reader2("./logs", 20, true);
changelog_reader2.init(1, 0);
EXPECT_EQ(changelog_reader2.size(), 3);
EXPECT_EQ(changelog_reader2.last_entry()->get_term(), 7777);
@ -749,7 +749,7 @@ TEST(CoordinationTest, ChangelogTestLostFiles)
{
ChangelogDirTest test("./logs");
DB::NuKeeperLogStore changelog("./logs", 20, true);
DB::KeeperLogStore changelog("./logs", 20, true);
changelog.init(1, 0);
for (size_t i = 0; i < 35; ++i)
@ -763,7 +763,7 @@ TEST(CoordinationTest, ChangelogTestLostFiles)
fs::remove("./logs/changelog_1_20.bin");
DB::NuKeeperLogStore changelog_reader("./logs", 20, true);
DB::KeeperLogStore changelog_reader("./logs", 20, true);
/// It should print error message, but still able to start
changelog_reader.init(5, 0);
EXPECT_FALSE(fs::exists("./logs/changelog_1_20.bin"));
@ -862,9 +862,9 @@ TEST(CoordinationTest, SnapshotableHashMapTrySnapshot)
map_snp.disableSnapshotMode();
}
void addNode(DB::NuKeeperStorage & storage, const std::string & path, const std::string & data, int64_t ephemeral_owner=0)
void addNode(DB::KeeperStorage & storage, const std::string & path, const std::string & data, int64_t ephemeral_owner=0)
{
using Node = DB::NuKeeperStorage::Node;
using Node = DB::KeeperStorage::Node;
Node node{};
node.data = data;
node.stat.ephemeralOwner = ephemeral_owner;
@ -874,9 +874,9 @@ void addNode(DB::NuKeeperStorage & storage, const std::string & path, const std:
TEST(CoordinationTest, TestStorageSnapshotSimple)
{
ChangelogDirTest test("./snapshots");
DB::NuKeeperSnapshotManager manager("./snapshots", 3);
DB::KeeperSnapshotManager manager("./snapshots", 3);
DB::NuKeeperStorage storage(500);
DB::KeeperStorage storage(500);
addNode(storage, "/hello", "world", 1);
addNode(storage, "/hello/somepath", "somedata", 3);
storage.session_id_counter = 5;
@ -886,7 +886,7 @@ TEST(CoordinationTest, TestStorageSnapshotSimple)
storage.getSessionID(130);
storage.getSessionID(130);
DB::NuKeeperStorageSnapshot snapshot(&storage, 2);
DB::KeeperStorageSnapshot snapshot(&storage, 2);
EXPECT_EQ(snapshot.snapshot_meta->get_last_log_idx(), 2);
EXPECT_EQ(snapshot.session_id, 7);
@ -921,9 +921,9 @@ TEST(CoordinationTest, TestStorageSnapshotSimple)
TEST(CoordinationTest, TestStorageSnapshotMoreWrites)
{
ChangelogDirTest test("./snapshots");
DB::NuKeeperSnapshotManager manager("./snapshots", 3);
DB::KeeperSnapshotManager manager("./snapshots", 3);
DB::NuKeeperStorage storage(500);
DB::KeeperStorage storage(500);
storage.getSessionID(130);
for (size_t i = 0; i < 50; ++i)
@ -931,7 +931,7 @@ TEST(CoordinationTest, TestStorageSnapshotMoreWrites)
addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i));
}
DB::NuKeeperStorageSnapshot snapshot(&storage, 50);
DB::KeeperStorageSnapshot snapshot(&storage, 50);
EXPECT_EQ(snapshot.snapshot_meta->get_last_log_idx(), 50);
EXPECT_EQ(snapshot.snapshot_container_size, 51);
@ -961,9 +961,9 @@ TEST(CoordinationTest, TestStorageSnapshotMoreWrites)
TEST(CoordinationTest, TestStorageSnapshotManySnapshots)
{
ChangelogDirTest test("./snapshots");
DB::NuKeeperSnapshotManager manager("./snapshots", 3);
DB::KeeperSnapshotManager manager("./snapshots", 3);
DB::NuKeeperStorage storage(500);
DB::KeeperStorage storage(500);
storage.getSessionID(130);
for (size_t j = 1; j <= 5; ++j)
@ -973,7 +973,7 @@ TEST(CoordinationTest, TestStorageSnapshotManySnapshots)
addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i));
}
DB::NuKeeperStorageSnapshot snapshot(&storage, j * 50);
DB::KeeperStorageSnapshot snapshot(&storage, j * 50);
auto buf = manager.serializeSnapshotToBuffer(snapshot);
manager.serializeSnapshotBufferToDisk(*buf, j * 50);
EXPECT_TRUE(fs::exists(std::string{"./snapshots/snapshot_"} + std::to_string(j * 50) + ".bin"));
@ -999,15 +999,15 @@ TEST(CoordinationTest, TestStorageSnapshotManySnapshots)
TEST(CoordinationTest, TestStorageSnapshotMode)
{
ChangelogDirTest test("./snapshots");
DB::NuKeeperSnapshotManager manager("./snapshots", 3);
DB::NuKeeperStorage storage(500);
DB::KeeperSnapshotManager manager("./snapshots", 3);
DB::KeeperStorage storage(500);
for (size_t i = 0; i < 50; ++i)
{
addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i));
}
{
DB::NuKeeperStorageSnapshot snapshot(&storage, 50);
DB::KeeperStorageSnapshot snapshot(&storage, 50);
for (size_t i = 0; i < 50; ++i)
{
addNode(storage, "/hello_" + std::to_string(i), "wlrd_" + std::to_string(i));
@ -1050,14 +1050,14 @@ TEST(CoordinationTest, TestStorageSnapshotMode)
TEST(CoordinationTest, TestStorageSnapshotBroken)
{
ChangelogDirTest test("./snapshots");
DB::NuKeeperSnapshotManager manager("./snapshots", 3);
DB::NuKeeperStorage storage(500);
DB::KeeperSnapshotManager manager("./snapshots", 3);
DB::KeeperStorage storage(500);
for (size_t i = 0; i < 50; ++i)
{
addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i));
}
{
DB::NuKeeperStorageSnapshot snapshot(&storage, 50);
DB::KeeperStorageSnapshot snapshot(&storage, 50);
auto buf = manager.serializeSnapshotToBuffer(snapshot);
manager.serializeSnapshotBufferToDisk(*buf, 50);
}
@ -1095,9 +1095,9 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, size
ResponsesQueue queue;
SnapshotsQueue snapshots_queue{1};
auto state_machine = std::make_shared<NuKeeperStateMachine>(queue, snapshots_queue, "./snapshots", settings);
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, "./snapshots", settings);
state_machine->init();
DB::NuKeeperLogStore changelog("./logs", settings->rotate_log_storage_interval, true);
DB::KeeperLogStore changelog("./logs", settings->rotate_log_storage_interval, true);
changelog.init(state_machine->last_commit_index() + 1, settings->reserved_log_items);
for (size_t i = 1; i < total_logs + 1; ++i)
{
@ -1132,11 +1132,11 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, size
}
SnapshotsQueue snapshots_queue1{1};
auto restore_machine = std::make_shared<NuKeeperStateMachine>(queue, snapshots_queue1, "./snapshots", settings);
auto restore_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue1, "./snapshots", settings);
restore_machine->init();
EXPECT_EQ(restore_machine->last_commit_index(), total_logs - total_logs % settings->snapshot_distance);
DB::NuKeeperLogStore restore_changelog("./logs", settings->rotate_log_storage_interval, true);
DB::KeeperLogStore restore_changelog("./logs", settings->rotate_log_storage_interval, true);
restore_changelog.init(restore_machine->last_commit_index() + 1, settings->reserved_log_items);
EXPECT_EQ(restore_changelog.size(), std::min(settings->reserved_log_items + total_logs % settings->snapshot_distance, total_logs));
@ -1242,7 +1242,7 @@ TEST(CoordinationTest, TestEphemeralNodeRemove)
ResponsesQueue queue;
SnapshotsQueue snapshots_queue{1};
auto state_machine = std::make_shared<NuKeeperStateMachine>(queue, snapshots_queue, "./snapshots", settings);
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, "./snapshots", settings);
state_machine->init();
std::shared_ptr<ZooKeeperCreateRequest> request_c = std::make_shared<ZooKeeperCreateRequest>();

View File

@ -12,7 +12,7 @@
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/thread_local_rng.h>
#include <Coordination/NuKeeperStorageDispatcher.h>
#include <Coordination/KeeperStorageDispatcher.h>
#include <Compression/ICompressionCodec.h>
#include <Core/BackgroundSchedulePool.h>
#include <Formats/FormatFactory.h>
@ -314,7 +314,7 @@ struct ContextShared
#if USE_NURAFT
mutable std::mutex nu_keeper_storage_dispatcher_mutex;
mutable std::shared_ptr<NuKeeperStorageDispatcher> nu_keeper_storage_dispatcher;
mutable std::shared_ptr<KeeperStorageDispatcher> nu_keeper_storage_dispatcher;
#endif
mutable std::mutex auxiliary_zookeepers_mutex;
mutable std::map<String, zkutil::ZooKeeperPtr> auxiliary_zookeepers; /// Map for auxiliary ZooKeeper clients.
@ -1616,35 +1616,35 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const
}
void Context::initializeNuKeeperStorageDispatcher() const
void Context::initializeKeeperStorageDispatcher() const
{
#if USE_NURAFT
std::lock_guard lock(shared->nu_keeper_storage_dispatcher_mutex);
if (shared->nu_keeper_storage_dispatcher)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to initialize NuKeeper multiple times");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to initialize Keeper multiple times");
const auto & config = getConfigRef();
if (config.has("test_keeper_server"))
if (config.has("keeper_server"))
{
shared->nu_keeper_storage_dispatcher = std::make_shared<NuKeeperStorageDispatcher>();
shared->nu_keeper_storage_dispatcher = std::make_shared<KeeperStorageDispatcher>();
shared->nu_keeper_storage_dispatcher->initialize(config);
}
#endif
}
#if USE_NURAFT
std::shared_ptr<NuKeeperStorageDispatcher> & Context::getNuKeeperStorageDispatcher() const
std::shared_ptr<KeeperStorageDispatcher> & Context::getKeeperStorageDispatcher() const
{
std::lock_guard lock(shared->nu_keeper_storage_dispatcher_mutex);
if (!shared->nu_keeper_storage_dispatcher)
throw Exception(ErrorCodes::LOGICAL_ERROR, "NuKeeper must be initialized before requests");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Keeper must be initialized before requests");
return shared->nu_keeper_storage_dispatcher;
}
#endif
void Context::shutdownNuKeeperStorageDispatcher() const
void Context::shutdownKeeperStorageDispatcher() const
{
#if USE_NURAFT
std::lock_guard lock(shared->nu_keeper_storage_dispatcher_mutex);

View File

@ -111,7 +111,7 @@ class StoragePolicySelector;
using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;
struct PartUUIDs;
using PartUUIDsPtr = std::shared_ptr<PartUUIDs>;
class NuKeeperStorageDispatcher;
class KeeperStorageDispatcher;
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
@ -598,10 +598,10 @@ public:
std::shared_ptr<zkutil::ZooKeeper> getAuxiliaryZooKeeper(const String & name) const;
#if USE_NURAFT
std::shared_ptr<NuKeeperStorageDispatcher> & getNuKeeperStorageDispatcher() const;
std::shared_ptr<KeeperStorageDispatcher> & getKeeperStorageDispatcher() const;
#endif
void initializeNuKeeperStorageDispatcher() const;
void shutdownNuKeeperStorageDispatcher() const;
void initializeKeeperStorageDispatcher() const;
void shutdownKeeperStorageDispatcher() const;
/// Set auxiliary zookeepers configuration at server starting or configuration reloading.
void reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config);

View File

@ -1,4 +1,4 @@
#include <Server/NuKeeperTCPHandler.h>
#include <Server/KeeperTCPHandler.h>
#if USE_NURAFT
@ -189,20 +189,20 @@ struct SocketInterruptablePollWrapper
#endif
};
NuKeeperTCPHandler::NuKeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_)
KeeperTCPHandler::KeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, log(&Poco::Logger::get("NuKeeperTCPHandler"))
, log(&Poco::Logger::get("KeeperTCPHandler"))
, global_context(server.context())
, nu_keeper_storage_dispatcher(global_context.getNuKeeperStorageDispatcher())
, operation_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000)
, session_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.session_timeout_ms", Coordination::DEFAULT_SESSION_TIMEOUT_MS) * 1000)
, nu_keeper_storage_dispatcher(global_context.getKeeperStorageDispatcher())
, operation_timeout(0, global_context.getConfigRef().getUInt("keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000)
, session_timeout(0, global_context.getConfigRef().getUInt("keeper_server.session_timeout_ms", Coordination::DEFAULT_SESSION_TIMEOUT_MS) * 1000)
, poll_wrapper(std::make_unique<SocketInterruptablePollWrapper>(socket_))
, responses(std::make_unique<ThreadSafeResponseQueue>())
{
}
void NuKeeperTCPHandler::sendHandshake(bool has_leader)
void KeeperTCPHandler::sendHandshake(bool has_leader)
{
Coordination::write(Coordination::SERVER_HANDSHAKE_LENGTH, *out);
if (has_leader)
@ -217,12 +217,12 @@ void NuKeeperTCPHandler::sendHandshake(bool has_leader)
out->next();
}
void NuKeeperTCPHandler::run()
void KeeperTCPHandler::run()
{
runImpl();
}
Poco::Timespan NuKeeperTCPHandler::receiveHandshake()
Poco::Timespan KeeperTCPHandler::receiveHandshake()
{
int32_t handshake_length;
int32_t protocol_version;
@ -254,7 +254,7 @@ Poco::Timespan NuKeeperTCPHandler::receiveHandshake()
}
void NuKeeperTCPHandler::runImpl()
void KeeperTCPHandler::runImpl()
{
setThreadName("TstKprHandler");
ThreadStatus thread_status;
@ -393,7 +393,7 @@ void NuKeeperTCPHandler::runImpl()
}
}
std::pair<Coordination::OpNum, Coordination::XID> NuKeeperTCPHandler::receiveRequest()
std::pair<Coordination::OpNum, Coordination::XID> KeeperTCPHandler::receiveRequest()
{
int32_t length;
Coordination::read(length, *in);

View File

@ -13,7 +13,7 @@
#include <Interpreters/Context.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <Coordination/NuKeeperStorageDispatcher.h>
#include <Coordination/KeeperStorageDispatcher.h>
#include <IO/WriteBufferFromPocoSocket.h>
#include <IO/ReadBufferFromPocoSocket.h>
#include <Coordination/ThreadSafeQueue.h>
@ -29,16 +29,16 @@ using ThreadSafeResponseQueue = ThreadSafeQueue<Coordination::ZooKeeperResponseP
using ThreadSafeResponseQueuePtr = std::unique_ptr<ThreadSafeResponseQueue>;
class NuKeeperTCPHandler : public Poco::Net::TCPServerConnection
class KeeperTCPHandler : public Poco::Net::TCPServerConnection
{
public:
NuKeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_);
KeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_);
void run() override;
private:
IServer & server;
Poco::Logger * log;
Context global_context;
std::shared_ptr<NuKeeperStorageDispatcher> nu_keeper_storage_dispatcher;
std::shared_ptr<KeeperStorageDispatcher> nu_keeper_storage_dispatcher;
Poco::Timespan operation_timeout;
Poco::Timespan session_timeout;
int64_t session_id{-1};

View File

@ -1,6 +1,6 @@
#pragma once
#include <Server/NuKeeperTCPHandler.h>
#include <Server/KeeperTCPHandler.h>
#include <Poco/Net/TCPServerConnectionFactory.h>
#include <Poco/Net/NetException.h>
#include <common/logger_useful.h>
@ -9,7 +9,7 @@
namespace DB
{
class NuKeeperTCPHandlerFactory : public Poco::Net::TCPServerConnectionFactory
class KeeperTCPHandlerFactory : public Poco::Net::TCPServerConnectionFactory
{
private:
IServer & server;
@ -21,9 +21,9 @@ private:
void run() override {}
};
public:
NuKeeperTCPHandlerFactory(IServer & server_)
KeeperTCPHandlerFactory(IServer & server_)
: server(server_)
, log(&Poco::Logger::get("NuKeeperTCPHandlerFactory"))
, log(&Poco::Logger::get("KeeperTCPHandlerFactory"))
{
}
@ -31,8 +31,8 @@ public:
{
try
{
LOG_TRACE(log, "NuKeeper request. Address: {}", socket.peerAddress().toString());
return new NuKeeperTCPHandler(server, socket);
LOG_TRACE(log, "Keeper request. Address: {}", socket.peerAddress().toString());
return new KeeperTCPHandler(server, socket);
}
catch (const Poco::Net::NetException &)
{

View File

@ -22,10 +22,10 @@ SRCS(
HTTPHandler.cpp
HTTPHandlerFactory.cpp
InterserverIOHTTPHandler.cpp
KeeperTCPHandler.cpp
MySQLHandler.cpp
MySQLHandlerFactory.cpp
NotFoundHandler.cpp
NuKeeperTCPHandler.cpp
PostgreSQLHandler.cpp
PostgreSQLHandlerFactory.cpp
PrometheusMetricsWriter.cpp

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
@ -17,5 +17,5 @@
<port>44444</port>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -29,7 +29,7 @@ ln -sf $SRC_PATH/config.d/graphite.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/test_cluster_with_incorrect_pw.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/test_keeper_port.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/keeper_port.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/logging_no_rotate.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/tcp_with_proxy.xml $DEST_SERVER_PATH/config.d/
ln -sf $SRC_PATH/config.d/top_level_domains_lists.xml $DEST_SERVER_PATH/config.d/

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -19,5 +19,5 @@
<port>44444</port>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -7,7 +7,7 @@ import time
from multiprocessing.dummy import Pool
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', main_configs=['configs/enable_test_keeper.xml', 'configs/logs_conf.xml'], with_zookeeper=True)
node = cluster.add_instance('node', main_configs=['configs/enable_keeper.xml', 'configs/logs_conf.xml'], with_zookeeper=True)
from kazoo.client import KazooClient, KazooState, KeeperState
def get_genuine_zk():

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -37,5 +37,5 @@
<priority>1</priority>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -37,5 +37,5 @@
<priority>1</priority>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -37,5 +37,5 @@
<priority>1</priority>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -9,9 +9,9 @@ from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_test_keeper1.xml', 'configs/log_conf.xml', 'configs/use_test_keeper.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_test_keeper2.xml', 'configs/log_conf.xml', 'configs/use_test_keeper.xml'], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=['configs/enable_test_keeper3.xml', 'configs/log_conf.xml', 'configs/use_test_keeper.xml'], stay_alive=True)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml', 'configs/log_conf.xml', 'configs/use_keeper.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml', 'configs/log_conf.xml', 'configs/use_keeper.xml'], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=['configs/enable_keeper3.xml', 'configs/log_conf.xml', 'configs/use_keeper.xml'], stay_alive=True)
from kazoo.client import KazooClient, KazooState

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -37,5 +37,5 @@
<priority>1</priority>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -37,5 +37,5 @@
<priority>1</priority>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -37,5 +37,5 @@
<priority>1</priority>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -9,9 +9,9 @@ from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_test_keeper1.xml', 'configs/log_conf.xml', 'configs/use_test_keeper.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_test_keeper2.xml', 'configs/log_conf.xml', 'configs/use_test_keeper.xml'], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=['configs/enable_test_keeper3.xml', 'configs/log_conf.xml', 'configs/use_test_keeper.xml'], stay_alive=True)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml', 'configs/log_conf.xml', 'configs/use_keeper.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml', 'configs/log_conf.xml', 'configs/use_keeper.xml'], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=['configs/enable_keeper3.xml', 'configs/log_conf.xml', 'configs/use_keeper.xml'], stay_alive=True)
from kazoo.client import KazooClient, KazooState

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -18,5 +18,5 @@
<port>44444</port>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -10,7 +10,7 @@ from kazoo.client import KazooClient, KazooState
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', main_configs=['configs/enable_test_keeper.xml', 'configs/logs_conf.xml', 'configs/use_test_keeper.xml'], stay_alive=True)
node = cluster.add_instance('node', main_configs=['configs/enable_keeper.xml', 'configs/logs_conf.xml', 'configs/use_keeper.xml'], stay_alive=True)
def random_string(length):

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -36,5 +36,5 @@
<priority>1</priority>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -36,5 +36,5 @@
<priority>1</priority>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -36,5 +36,5 @@
<priority>1</priority>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -7,9 +7,9 @@ import os
import time
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_test_keeper1.xml', 'configs/log_conf.xml', 'configs/use_test_keeper.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_test_keeper2.xml', 'configs/log_conf.xml', 'configs/use_test_keeper.xml'], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=['configs/enable_test_keeper3.xml', 'configs/log_conf.xml', 'configs/use_test_keeper.xml'], stay_alive=True)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml', 'configs/log_conf.xml', 'configs/use_keeper.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml', 'configs/log_conf.xml', 'configs/use_keeper.xml'], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=['configs/enable_keeper3.xml', 'configs/log_conf.xml', 'configs/use_keeper.xml'], stay_alive=True)
from kazoo.client import KazooClient, KazooState

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -39,5 +39,5 @@
<priority>10</priority>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -39,5 +39,5 @@
<priority>10</priority>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -39,5 +39,5 @@
<priority>10</priority>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -7,9 +7,9 @@ import os
import time
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_test_keeper1.xml', 'configs/log_conf.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_test_keeper2.xml', 'configs/log_conf.xml'], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=['configs/enable_test_keeper3.xml', 'configs/log_conf.xml'], stay_alive=True)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml', 'configs/log_conf.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml', 'configs/log_conf.xml'], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=['configs/enable_keeper3.xml', 'configs/log_conf.xml'], stay_alive=True)
from kazoo.client import KazooClient, KazooState

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -20,5 +20,5 @@
<port>44444</port>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -13,7 +13,7 @@ from kazoo.client import KazooClient, KazooState
cluster = ClickHouseCluster(__file__)
# clickhouse itself will use external zookeeper
node = cluster.add_instance('node', main_configs=['configs/enable_test_keeper.xml', 'configs/logs_conf.xml'], stay_alive=True, with_zookeeper=True)
node = cluster.add_instance('node', main_configs=['configs/enable_keeper.xml', 'configs/logs_conf.xml'], stay_alive=True, with_zookeeper=True)
def random_string(length):
return ''.join(random.choices(string.ascii_lowercase + string.digits, k=length))

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -38,5 +38,5 @@
<priority>1</priority>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -38,5 +38,5 @@
<priority>1</priority>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
@ -38,5 +38,5 @@
<priority>1</priority>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -7,9 +7,9 @@ import os
import time
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_test_keeper1.xml', 'configs/log_conf.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_test_keeper2.xml', 'configs/log_conf.xml'], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=['configs/enable_test_keeper3.xml', 'configs/log_conf.xml'], stay_alive=True)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml', 'configs/log_conf.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml', 'configs/log_conf.xml'], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=['configs/enable_keeper3.xml', 'configs/log_conf.xml'], stay_alive=True)
from kazoo.client import KazooClient, KazooState

View File

@ -1,13 +1,13 @@
(defproject jepsen.nukeeper "0.1.0-SNAPSHOT"
:injections [(.. System (setProperty "zookeeper.request.timeout" "10000"))]
:description "A jepsen tests for ClickHouse NuKeeper"
:description "A jepsen tests for ClickHouse Keeper"
:url "https://clickhouse.tech/"
:license {:name "EPL-2.0"
:url "https://www.eclipse.org/legal/epl-2.0/"}
:main jepsen.nukeeper.main
:main jepsen.clickhouse-keeper.main
:plugins [[lein-cljfmt "0.7.0"]]
:dependencies [[org.clojure/clojure "1.10.1"]
[jepsen "0.2.3"]
[zookeeper-clj "0.9.4"]
[org.apache.zookeeper/zookeeper "3.6.1" :exclusions [org.slf4j/slf4j-log4j12]]]
:repl-options {:init-ns jepsen.nukeeper.main})
:repl-options {:init-ns jepsen.clickhouse-keeper.main})

View File

@ -1,5 +1,5 @@
<yandex>
<test_keeper_server>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>{id}</server_id>
@ -32,5 +32,5 @@
<port>9444</port>
</server>
</raft_configuration>
</test_keeper_server>
</keeper_server>
</yandex>

View File

@ -1,4 +1,4 @@
(ns jepsen.nukeeper.constants)
(ns jepsen.clickhouse-keeper.constants)
(def common-prefix "/home/robot-clickhouse")

View File

@ -1,11 +1,11 @@
(ns jepsen.nukeeper.counter
(ns jepsen.clickhouse-keeper.counter
(:require
[clojure.tools.logging :refer :all]
[jepsen
[checker :as checker]
[client :as client]
[generator :as gen]]
[jepsen.nukeeper.utils :refer :all]
[jepsen.clickhouse-keeper.utils :refer :all]
[zookeeper :as zk])
(:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)))

View File

@ -1,11 +1,11 @@
(ns jepsen.nukeeper.db
(ns jepsen.clickhouse-keeper.db
(:require [clojure.tools.logging :refer :all]
[jepsen
[control :as c]
[db :as db]
[util :as util :refer [meh]]]
[jepsen.nukeeper.constants :refer :all]
[jepsen.nukeeper.utils :refer :all]
[jepsen.clickhouse-keeper.constants :refer :all]
[jepsen.clickhouse-keeper.utils :refer :all]
[clojure.java.io :as io]
[jepsen.control.util :as cu]
[jepsen.os.ubuntu :as ubuntu]))
@ -88,7 +88,7 @@
(c/exec :echo (slurp (io/resource "config.xml")) :> (str configs-dir "/config.xml"))
(c/exec :echo (slurp (io/resource "users.xml")) :> (str configs-dir "/users.xml"))
(c/exec :echo (slurp (io/resource "listen.xml")) :> (str sub-configs-dir "/listen.xml"))
(c/exec :echo (cluster-config test node (slurp (io/resource "test_keeper_config.xml"))) :> (str sub-configs-dir "/test_keeper_config.xml")))
(c/exec :echo (cluster-config test node (slurp (io/resource "keeper_config.xml"))) :> (str sub-configs-dir "/keeper_config.xml")))
(defn db
[version reuse-binary]
@ -116,7 +116,7 @@
(c/exec :rm :-rf binary-path))
(c/exec :rm :-rf pid-file-path)
(c/exec :rm :-rf data-dir)
(c/exec :rm :-rf logs-dir)
;(c/exec :rm :-rf logs-dir)
(c/exec :rm :-rf configs-dir)))
db/LogFiles

View File

@ -1,15 +1,15 @@
(ns jepsen.nukeeper.main
(ns jepsen.clickhouse-keeper.main
(:require [clojure.tools.logging :refer :all]
[jepsen.nukeeper.utils :refer :all]
[jepsen.clickhouse-keeper.utils :refer :all]
[clojure.pprint :refer [pprint]]
[jepsen.nukeeper.set :as set]
[jepsen.nukeeper.db :refer :all]
[jepsen.nukeeper.nemesis :as custom-nemesis]
[jepsen.nukeeper.register :as register]
[jepsen.nukeeper.unique :as unique]
[jepsen.nukeeper.queue :as queue]
[jepsen.nukeeper.counter :as counter]
[jepsen.nukeeper.constants :refer :all]
[jepsen.clickhouse-keeper.set :as set]
[jepsen.clickhouse-keeper.db :refer :all]
[jepsen.clickhouse-keeper.nemesis :as custom-nemesis]
[jepsen.clickhouse-keeper.register :as register]
[jepsen.clickhouse-keeper.unique :as unique]
[jepsen.clickhouse-keeper.queue :as queue]
[jepsen.clickhouse-keeper.counter :as counter]
[jepsen.clickhouse-keeper.constants :refer :all]
[clojure.string :as str]
[jepsen
[checker :as checker]
@ -75,7 +75,7 @@
["-c" "--clickhouse-source URL" "URL for clickhouse deb or tgz package"
:default "https://clickhouse-builds.s3.yandex.net/21677/ef82333089156907a0979669d9374c2e18daabe5/clickhouse_build_check/clang-11_relwithdebuginfo_none_bundled_unsplitted_disable_False_deb/clickhouse-common-static_21.4.1.6313_amd64.deb"]])
(defn nukeeper-test
(defn clickhouse-keeper-test
"Given an options map from the command line runner (e.g. :nodes, :ssh,
:concurrency, ...), constructs a test map."
[opts]
@ -151,9 +151,9 @@
[& args]
(.setLevel
(LoggerFactory/getLogger "org.apache.zookeeper") Level/OFF)
(cli/run! (merge (cli/single-test-cmd {:test-fn nukeeper-test
(cli/run! (merge (cli/single-test-cmd {:test-fn clickhouse-keeper-test
:opt-spec cli-opts})
(cli/test-all-cmd {:tests-fn (partial all-tests nukeeper-test)
(cli/test-all-cmd {:tests-fn (partial all-tests clickhouse-keeper-test)
:opt-spec cli-opts})
(cli/serve-cmd))
args))

View File

@ -1,12 +1,12 @@
(ns jepsen.nukeeper.nemesis
(ns jepsen.clickhouse-keeper.nemesis
(:require
[clojure.tools.logging :refer :all]
[jepsen
[nemesis :as nemesis]
[control :as c]
[generator :as gen]]
[jepsen.nukeeper.constants :refer :all]
[jepsen.nukeeper.utils :refer :all]))
[jepsen.clickhouse-keeper.constants :refer :all]
[jepsen.clickhouse-keeper.utils :refer :all]))
(defn random-node-killer-nemesis
[]

View File

@ -1,4 +1,4 @@
(ns jepsen.nukeeper.queue
(ns jepsen.clickhouse-keeper.queue
(:require
[clojure.tools.logging :refer :all]
[jepsen
@ -7,7 +7,7 @@
[generator :as gen]]
[knossos.model :as model]
[jepsen.checker.timeline :as timeline]
[jepsen.nukeeper.utils :refer :all]
[jepsen.clickhouse-keeper.utils :refer :all]
[zookeeper :as zk])
(:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)))

View File

@ -1,4 +1,4 @@
(ns jepsen.nukeeper.register
(ns jepsen.clickhouse-keeper.register
(:require [jepsen
[checker :as checker]
[client :as client]
@ -6,7 +6,7 @@
[generator :as gen]]
[jepsen.checker.timeline :as timeline]
[knossos.model :as model]
[jepsen.nukeeper.utils :refer :all]
[jepsen.clickhouse-keeper.utils :refer :all]
[zookeeper :as zk])
(:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)))

View File

@ -1,11 +1,11 @@
(ns jepsen.nukeeper.set
(ns jepsen.clickhouse-keeper.set
(:require
[clojure.tools.logging :refer :all]
[jepsen
[checker :as checker]
[client :as client]
[generator :as gen]]
[jepsen.nukeeper.utils :refer :all]
[jepsen.clickhouse-keeper.utils :refer :all]
[zookeeper :as zk])
(:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)))

View File

@ -1,11 +1,11 @@
(ns jepsen.nukeeper.unique
(ns jepsen.clickhouse-keeper.unique
(:require
[clojure.tools.logging :refer :all]
[jepsen
[checker :as checker]
[client :as client]
[generator :as gen]]
[jepsen.nukeeper.utils :refer :all]
[jepsen.clickhouse-keeper.utils :refer :all]
[zookeeper :as zk])
(:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException)))

View File

@ -1,10 +1,10 @@
(ns jepsen.nukeeper.utils
(ns jepsen.clickhouse-keeper.utils
(:require [clojure.string :as str]
[zookeeper.data :as data]
[zookeeper :as zk]
[zookeeper.internal :as zi]
[jepsen.control.util :as cu]
[jepsen.nukeeper.constants :refer :all]
[jepsen.clickhouse-keeper.constants :refer :all]
[jepsen.control :as c]
[clojure.tools.logging :refer :all])
(:import (org.apache.zookeeper.data Stat)
@ -164,8 +164,8 @@
:--top_level_domains_path (str data-dir "/top_level_domains")
:--logger.log (str logs-dir "/clickhouse-server.log")
:--logger.errorlog (str logs-dir "/clickhouse-server.err.log")
:--test_keeper_server.snapshot_storage_path coordination-snapshots-dir
:--test_keeper_server.logs_storage_path coordination-logs-dir)
:--keeper_server.snapshot_storage_path coordination-snapshots-dir
:--keeper_server.logs_storage_path coordination-logs-dir)
(wait-clickhouse-alive! node test)))
(defn exec-with-retries

View File

@ -1,18 +1,18 @@
#include <Poco/ConsoleChannel.h>
#include <Poco/Logger.h>
#include <Coordination/NuKeeperStateMachine.h>
#include <Coordination/KeeperStateMachine.h>
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Exception.h>
#include <libnuraft/nuraft.hxx> // Y_IGNORE
#include <Coordination/NuKeeperLogStore.h>
#include <Coordination/KeeperLogStore.h>
#include <Coordination/Changelog.h>
#include <common/logger_useful.h>
using namespace Coordination;
using namespace DB;
void dumpMachine(std::shared_ptr<NuKeeperStateMachine> machine)
void dumpMachine(std::shared_ptr<KeeperStateMachine> machine)
{
auto & storage = machine->getStorage();
std::queue<std::string> keys;
@ -62,13 +62,13 @@ int main(int argc, char *argv[])
ResponsesQueue queue;
SnapshotsQueue snapshots_queue{1};
CoordinationSettingsPtr settings = std::make_shared<CoordinationSettings>();
auto state_machine = std::make_shared<NuKeeperStateMachine>(queue, snapshots_queue, argv[1], settings);
auto state_machine = std::make_shared<KeeperStateMachine>(queue, snapshots_queue, argv[1], settings);
state_machine->init();
size_t last_commited_index = state_machine->last_commit_index();
LOG_INFO(logger, "Last committed index: {}", last_commited_index);
DB::NuKeeperLogStore changelog(argv[2], 10000000, true);
DB::KeeperLogStore changelog(argv[2], 10000000, true);
changelog.init(last_commited_index, 10000000000UL); /// collect all logs
if (changelog.size() == 0)
LOG_INFO(logger, "Changelog empty");