From be132a32a22872420ef5b32722e1235b3a7448cf Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 29 Mar 2021 11:24:56 +0300 Subject: [PATCH] More renames --- docker/test/fasttest/run.sh | 2 +- programs/server/Server.cpp | 16 +- src/Coordination/KeeperLogStore.cpp | 34 ++-- src/Coordination/KeeperLogStore.h | 4 +- src/Coordination/KeeperServer.cpp | 45 +++-- src/Coordination/KeeperServer.h | 16 +- src/Coordination/KeeperSnapshotManager.cpp | 44 ++--- src/Coordination/KeeperSnapshotManager.h | 32 +-- src/Coordination/KeeperStateMachine.cpp | 46 ++--- src/Coordination/KeeperStateMachine.h | 18 +- src/Coordination/KeeperStateManager.cpp | 18 +- src/Coordination/KeeperStateManager.h | 12 +- src/Coordination/KeeperStorage.cpp | 184 +++++++++--------- src/Coordination/KeeperStorage.h | 12 +- src/Coordination/KeeperStorageDispatcher.cpp | 50 ++--- src/Coordination/KeeperStorageDispatcher.h | 12 +- src/Coordination/tests/gtest_for_build.cpp | 116 +++++------ src/Interpreters/Context.cpp | 18 +- src/Interpreters/Context.h | 8 +- ...perTCPHandler.cpp => KeeperTCPHandler.cpp} | 22 +-- ...uKeeperTCPHandler.h => KeeperTCPHandler.h} | 8 +- ...lerFactory.h => KeeperTCPHandlerFactory.h} | 12 +- src/Server/ya.make | 2 +- .../{test_keeper_port.xml => keeper_port.xml} | 4 +- tests/config/install.sh | 2 +- .../.gitignore | 0 .../LICENSE | 0 .../README.md | 0 .../doc/intro.md | 0 .../project.clj | 6 +- .../resources/config.xml | 0 .../resources/listen.xml | 0 .../resources/test_keeper_config.xml | 4 +- .../resources/users.xml | 0 .../jepsen/clickhouse-keeper}/constants.clj | 2 +- .../src/jepsen/clickhouse-keeper}/counter.clj | 4 +- .../src/jepsen/clickhouse-keeper}/db.clj | 8 +- .../src/jepsen/clickhouse-keeper}/main.clj | 26 +-- .../src/jepsen/clickhouse-keeper}/nemesis.clj | 6 +- .../src/jepsen/clickhouse-keeper}/queue.clj | 4 +- .../jepsen/clickhouse-keeper}/register.clj | 4 +- .../src/jepsen/clickhouse-keeper}/set.clj | 4 +- .../src/jepsen/clickhouse-keeper}/unique.clj | 4 +- .../src/jepsen/clickhouse-keeper}/utils.clj | 8 +- .../test/jepsen/nukeeper_test.clj | 0 45 files changed, 410 insertions(+), 407 deletions(-) rename src/Server/{NuKeeperTCPHandler.cpp => KeeperTCPHandler.cpp} (94%) rename src/Server/{NuKeeperTCPHandler.h => KeeperTCPHandler.h} (85%) rename src/Server/{NuKeeperTCPHandlerFactory.h => KeeperTCPHandlerFactory.h} (68%) rename tests/config/config.d/{test_keeper_port.xml => keeper_port.xml} (91%) rename tests/{jepsen.nukeeper => jepsen.clickhouse-keeper}/.gitignore (100%) rename tests/{jepsen.nukeeper => jepsen.clickhouse-keeper}/LICENSE (100%) rename tests/{jepsen.nukeeper => jepsen.clickhouse-keeper}/README.md (100%) rename tests/{jepsen.nukeeper => jepsen.clickhouse-keeper}/doc/intro.md (100%) rename tests/{jepsen.nukeeper => jepsen.clickhouse-keeper}/project.clj (76%) rename tests/{jepsen.nukeeper => jepsen.clickhouse-keeper}/resources/config.xml (100%) rename tests/{jepsen.nukeeper => jepsen.clickhouse-keeper}/resources/listen.xml (100%) rename tests/{jepsen.nukeeper => jepsen.clickhouse-keeper}/resources/test_keeper_config.xml (95%) rename tests/{jepsen.nukeeper => jepsen.clickhouse-keeper}/resources/users.xml (100%) rename tests/{jepsen.nukeeper/src/jepsen/nukeeper => jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper}/constants.clj (93%) rename tests/{jepsen.nukeeper/src/jepsen/nukeeper => jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper}/counter.clj (94%) rename tests/{jepsen.nukeeper/src/jepsen/nukeeper => jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper}/db.clj (94%) rename tests/{jepsen.nukeeper/src/jepsen/nukeeper => jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper}/main.clj (90%) rename tests/{jepsen.nukeeper/src/jepsen/nukeeper => jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper}/nemesis.clj (97%) rename tests/{jepsen.nukeeper/src/jepsen/nukeeper => jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper}/queue.clj (97%) rename tests/{jepsen.nukeeper/src/jepsen/nukeeper => jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper}/register.clj (96%) rename tests/{jepsen.nukeeper/src/jepsen/nukeeper => jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper}/set.clj (94%) rename tests/{jepsen.nukeeper/src/jepsen/nukeeper => jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper}/unique.clj (92%) rename tests/{jepsen.nukeeper/src/jepsen/nukeeper => jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper}/utils.clj (96%) rename tests/{jepsen.nukeeper => jepsen.clickhouse-keeper}/test/jepsen/nukeeper_test.clj (100%) diff --git a/docker/test/fasttest/run.sh b/docker/test/fasttest/run.sh index bbd5443ffb6..c8bfce3848d 100755 --- a/docker/test/fasttest/run.sh +++ b/docker/test/fasttest/run.sh @@ -70,7 +70,7 @@ function start_server --path "$FASTTEST_DATA" --user_files_path "$FASTTEST_DATA/user_files" --top_level_domains_path "$FASTTEST_DATA/top_level_domains" - --test_keeper_server.log_storage_path "$FASTTEST_DATA/coordination" + --keeper_server.log_storage_path "$FASTTEST_DATA/coordination" ) clickhouse-server "${opts[@]}" &>> "$FASTTEST_OUTPUT/server.log" & server_pid=$! diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index b54e882c699..0aaf47989b9 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -97,7 +97,7 @@ #endif #if USE_NURAFT -# include +# include #endif namespace CurrentMetrics @@ -867,15 +867,15 @@ int Server::main(const std::vector & /*args*/) listen_try = true; } - if (config().has("test_keeper_server")) + if (config().has("keeper_server")) { #if USE_NURAFT /// Initialize test keeper RAFT. Do nothing if no nu_keeper_server in config. - global_context->initializeNuKeeperStorageDispatcher(); + global_context->initializeKeeperStorageDispatcher(); for (const auto & listen_host : listen_hosts) { - /// TCP NuKeeper - const char * port_name = "test_keeper_server.tcp_port"; + /// TCP Keeper + const char * port_name = "keeper_server.tcp_port"; createServer(listen_host, port_name, listen_try, [&](UInt16 port) { Poco::Net::ServerSocket socket; @@ -885,9 +885,9 @@ int Server::main(const std::vector & /*args*/) servers_to_start_before_tables->emplace_back( port_name, std::make_unique( - new NuKeeperTCPHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams)); + new KeeperTCPHandlerFactory(*this), server_pool, socket, new Poco::Net::TCPServerParams)); - LOG_INFO(log, "Listening for connections to NuKeeper (tcp): {}", address.toString()); + LOG_INFO(log, "Listening for connections to Keeper (tcp): {}", address.toString()); }); } #else @@ -934,7 +934,7 @@ int Server::main(const std::vector & /*args*/) else LOG_INFO(log, "Closed connections to servers for tables."); - global_context->shutdownNuKeeperStorageDispatcher(); + global_context->shutdownKeeperStorageDispatcher(); } /** Explicitly destroy Context. It is more convenient than in destructor of Server, because logger is still available. diff --git a/src/Coordination/KeeperLogStore.cpp b/src/Coordination/KeeperLogStore.cpp index 586dbabefa2..2b1306bffb9 100644 --- a/src/Coordination/KeeperLogStore.cpp +++ b/src/Coordination/KeeperLogStore.cpp @@ -1,40 +1,40 @@ -#include +#include namespace DB { -NuKeeperLogStore::NuKeeperLogStore(const std::string & changelogs_path, size_t rotate_interval_, bool force_sync_) - : log(&Poco::Logger::get("NuKeeperLogStore")) +KeeperLogStore::KeeperLogStore(const std::string & changelogs_path, size_t rotate_interval_, bool force_sync_) + : log(&Poco::Logger::get("KeeperLogStore")) , changelog(changelogs_path, rotate_interval_, log) , force_sync(force_sync_) { } -size_t NuKeeperLogStore::start_index() const +size_t KeeperLogStore::start_index() const { std::lock_guard lock(changelog_lock); return changelog.getStartIndex(); } -void NuKeeperLogStore::init(size_t last_commited_log_index, size_t logs_to_keep) +void KeeperLogStore::init(size_t last_commited_log_index, size_t logs_to_keep) { std::lock_guard lock(changelog_lock); changelog.readChangelogAndInitWriter(last_commited_log_index, logs_to_keep); } -size_t NuKeeperLogStore::next_slot() const +size_t KeeperLogStore::next_slot() const { std::lock_guard lock(changelog_lock); return changelog.getNextEntryIndex(); } -nuraft::ptr NuKeeperLogStore::last_entry() const +nuraft::ptr KeeperLogStore::last_entry() const { std::lock_guard lock(changelog_lock); return changelog.getLastEntry(); } -size_t NuKeeperLogStore::append(nuraft::ptr & entry) +size_t KeeperLogStore::append(nuraft::ptr & entry) { std::lock_guard lock(changelog_lock); size_t idx = changelog.getNextEntryIndex(); @@ -43,25 +43,25 @@ size_t NuKeeperLogStore::append(nuraft::ptr & entry) } -void NuKeeperLogStore::write_at(size_t index, nuraft::ptr & entry) +void KeeperLogStore::write_at(size_t index, nuraft::ptr & entry) { std::lock_guard lock(changelog_lock); changelog.writeAt(index, entry, force_sync); } -nuraft::ptr>> NuKeeperLogStore::log_entries(size_t start, size_t end) +nuraft::ptr>> KeeperLogStore::log_entries(size_t start, size_t end) { std::lock_guard lock(changelog_lock); return changelog.getLogEntriesBetween(start, end); } -nuraft::ptr NuKeeperLogStore::entry_at(size_t index) +nuraft::ptr KeeperLogStore::entry_at(size_t index) { std::lock_guard lock(changelog_lock); return changelog.entryAt(index); } -size_t NuKeeperLogStore::term_at(size_t index) +size_t KeeperLogStore::term_at(size_t index) { std::lock_guard lock(changelog_lock); auto entry = changelog.entryAt(index); @@ -70,33 +70,33 @@ size_t NuKeeperLogStore::term_at(size_t index) return 0; } -nuraft::ptr NuKeeperLogStore::pack(size_t index, int32_t cnt) +nuraft::ptr KeeperLogStore::pack(size_t index, int32_t cnt) { std::lock_guard lock(changelog_lock); return changelog.serializeEntriesToBuffer(index, cnt); } -bool NuKeeperLogStore::compact(size_t last_log_index) +bool KeeperLogStore::compact(size_t last_log_index) { std::lock_guard lock(changelog_lock); changelog.compact(last_log_index); return true; } -bool NuKeeperLogStore::flush() +bool KeeperLogStore::flush() { std::lock_guard lock(changelog_lock); changelog.flush(); return true; } -void NuKeeperLogStore::apply_pack(size_t index, nuraft::buffer & pack) +void KeeperLogStore::apply_pack(size_t index, nuraft::buffer & pack) { std::lock_guard lock(changelog_lock); changelog.applyEntriesFromBuffer(index, pack, force_sync); } -size_t NuKeeperLogStore::size() const +size_t KeeperLogStore::size() const { std::lock_guard lock(changelog_lock); return changelog.size(); diff --git a/src/Coordination/KeeperLogStore.h b/src/Coordination/KeeperLogStore.h index 33f021baf48..c9c20e144ba 100644 --- a/src/Coordination/KeeperLogStore.h +++ b/src/Coordination/KeeperLogStore.h @@ -9,10 +9,10 @@ namespace DB { -class NuKeeperLogStore : public nuraft::log_store +class KeeperLogStore : public nuraft::log_store { public: - NuKeeperLogStore(const std::string & changelogs_path, size_t rotate_interval_, bool force_sync_); + KeeperLogStore(const std::string & changelogs_path, size_t rotate_interval_, bool force_sync_); void init(size_t last_commited_log_index, size_t logs_to_keep); diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index 7e6c10ca125..fef373a399e 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -1,7 +1,7 @@ -#include +#include #include -#include -#include +#include +#include #include #include #include @@ -18,7 +18,7 @@ namespace ErrorCodes extern const int RAFT_ERROR; } -NuKeeperServer::NuKeeperServer( +KeeperServer::KeeperServer( int server_id_, const CoordinationSettingsPtr & coordination_settings_, const Poco::Util::AbstractConfiguration & config, @@ -26,15 +26,18 @@ NuKeeperServer::NuKeeperServer( SnapshotsQueue & snapshots_queue_) : server_id(server_id_) , coordination_settings(coordination_settings_) - , state_machine(nuraft::cs_new(responses_queue_, snapshots_queue_, config.getString("test_keeper_server.snapshot_storage_path", config.getString("path", DBMS_DEFAULT_PATH) + "coordination/snapshots"), coordination_settings)) - , state_manager(nuraft::cs_new(server_id, "test_keeper_server", config, coordination_settings)) + , state_machine(nuraft::cs_new( + responses_queue_, snapshots_queue_, + config.getString("keeper_server.snapshot_storage_path", config.getString("path", DBMS_DEFAULT_PATH) + "coordination/snapshots"), + coordination_settings)) + , state_manager(nuraft::cs_new(server_id, "keeper_server", config, coordination_settings)) , responses_queue(responses_queue_) { if (coordination_settings->quorum_reads) - LOG_WARNING(&Poco::Logger::get("NuKeeperServer"), "Quorum reads enabled, NuKeeper will work slower."); + LOG_WARNING(&Poco::Logger::get("KeeperServer"), "Quorum reads enabled, Keeper will work slower."); } -void NuKeeperServer::startup() +void KeeperServer::startup() { state_machine->init(); @@ -84,13 +87,13 @@ void NuKeeperServer::startup() throw Exception(ErrorCodes::RAFT_ERROR, "Cannot allocate RAFT instance"); } -void NuKeeperServer::shutdown() +void KeeperServer::shutdown() { state_machine->shutdownStorage(); state_manager->flushLogStore(); auto timeout = coordination_settings->shutdown_timeout.totalSeconds(); if (!launcher.shutdown(timeout)) - LOG_WARNING(&Poco::Logger::get("NuKeeperServer"), "Failed to shutdown RAFT server in {} seconds", timeout); + LOG_WARNING(&Poco::Logger::get("KeeperServer"), "Failed to shutdown RAFT server in {} seconds", timeout); } namespace @@ -106,7 +109,7 @@ nuraft::ptr getZooKeeperLogEntry(int64_t session_id, const Coord } -void NuKeeperServer::putRequest(const NuKeeperStorage::RequestForSession & request_for_session) +void KeeperServer::putRequest(const KeeperStorage::RequestForSession & request_for_session) { auto [session_id, request] = request_for_session; if (!coordination_settings->quorum_reads && isLeaderAlive() && request->isReadRequest()) @@ -123,29 +126,29 @@ void NuKeeperServer::putRequest(const NuKeeperStorage::RequestForSession & reque auto result = raft_instance->append_entries(entries); if (!result->get_accepted()) { - NuKeeperStorage::ResponsesForSessions responses; + KeeperStorage::ResponsesForSessions responses; auto response = request->makeResponse(); response->xid = request->xid; response->zxid = 0; response->error = Coordination::Error::ZOPERATIONTIMEOUT; - responses_queue.push(DB::NuKeeperStorage::ResponseForSession{session_id, response}); + responses_queue.push(DB::KeeperStorage::ResponseForSession{session_id, response}); } if (result->get_result_code() == nuraft::cmd_result_code::TIMEOUT) { - NuKeeperStorage::ResponsesForSessions responses; + KeeperStorage::ResponsesForSessions responses; auto response = request->makeResponse(); response->xid = request->xid; response->zxid = 0; response->error = Coordination::Error::ZOPERATIONTIMEOUT; - responses_queue.push(DB::NuKeeperStorage::ResponseForSession{session_id, response}); + responses_queue.push(DB::KeeperStorage::ResponseForSession{session_id, response}); } else if (result->get_result_code() != nuraft::cmd_result_code::OK) throw Exception(ErrorCodes::RAFT_ERROR, "Requests result failed with code {} and message: '{}'", result->get_result_code(), result->get_result_str()); } } -int64_t NuKeeperServer::getSessionID(int64_t session_timeout_ms) +int64_t KeeperServer::getSessionID(int64_t session_timeout_ms) { auto entry = nuraft::buffer::alloc(sizeof(int64_t)); /// Just special session request @@ -170,17 +173,17 @@ int64_t NuKeeperServer::getSessionID(int64_t session_timeout_ms) return bs_resp.get_i64(); } -bool NuKeeperServer::isLeader() const +bool KeeperServer::isLeader() const { return raft_instance->is_leader(); } -bool NuKeeperServer::isLeaderAlive() const +bool KeeperServer::isLeaderAlive() const { return raft_instance->is_leader_alive(); } -nuraft::cb_func::ReturnCode NuKeeperServer::callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * /* param */) +nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * /* param */) { size_t last_commited = state_machine->last_commit_index(); size_t next_index = state_manager->getLogStore()->next_slot(); @@ -240,7 +243,7 @@ nuraft::cb_func::ReturnCode NuKeeperServer::callbackFunc(nuraft::cb_func::Type t } } -void NuKeeperServer::waitInit() +void KeeperServer::waitInit() { std::unique_lock lock(initialized_mutex); int64_t timeout = coordination_settings->startup_timeout.totalMilliseconds(); @@ -248,7 +251,7 @@ void NuKeeperServer::waitInit() throw Exception(ErrorCodes::RAFT_ERROR, "Failed to wait RAFT initialization"); } -std::unordered_set NuKeeperServer::getDeadSessions() +std::unordered_set KeeperServer::getDeadSessions() { return state_machine->getDeadSessions(); } diff --git a/src/Coordination/KeeperServer.h b/src/Coordination/KeeperServer.h index b5c13e62212..091a2b0150f 100644 --- a/src/Coordination/KeeperServer.h +++ b/src/Coordination/KeeperServer.h @@ -2,25 +2,25 @@ #include // Y_IGNORE #include -#include -#include -#include +#include +#include +#include #include #include namespace DB { -class NuKeeperServer +class KeeperServer { private: int server_id; CoordinationSettingsPtr coordination_settings; - nuraft::ptr state_machine; + nuraft::ptr state_machine; - nuraft::ptr state_manager; + nuraft::ptr state_manager; nuraft::raft_launcher launcher; @@ -38,7 +38,7 @@ private: nuraft::cb_func::ReturnCode callbackFunc(nuraft::cb_func::Type type, nuraft::cb_func::Param * param); public: - NuKeeperServer( + KeeperServer( int server_id_, const CoordinationSettingsPtr & coordination_settings_, const Poco::Util::AbstractConfiguration & config, @@ -47,7 +47,7 @@ public: void startup(); - void putRequest(const NuKeeperStorage::RequestForSession & request); + void putRequest(const KeeperStorage::RequestForSession & request); int64_t getSessionID(int64_t session_timeout_ms); diff --git a/src/Coordination/KeeperSnapshotManager.cpp b/src/Coordination/KeeperSnapshotManager.cpp index 1caa1ea94b8..70d540ca06b 100644 --- a/src/Coordination/KeeperSnapshotManager.cpp +++ b/src/Coordination/KeeperSnapshotManager.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include #include @@ -51,7 +51,7 @@ namespace return "/"; } - void writeNode(const NuKeeperStorage::Node & node, WriteBuffer & out) + void writeNode(const KeeperStorage::Node & node, WriteBuffer & out) { writeBinary(node.data, out); @@ -81,7 +81,7 @@ namespace writeBinary(node.seq_num, out); } - void readNode(NuKeeperStorage::Node & node, ReadBuffer & in) + void readNode(KeeperStorage::Node & node, ReadBuffer & in) { readBinary(node.data, in); @@ -132,7 +132,7 @@ namespace } -void NuKeeperStorageSnapshot::serialize(const NuKeeperStorageSnapshot & snapshot, WriteBuffer & out) +void KeeperStorageSnapshot::serialize(const KeeperStorageSnapshot & snapshot, WriteBuffer & out) { writeBinary(static_cast(snapshot.version), out); serializeSnapshotMetadata(snapshot.snapshot_meta, out); @@ -159,7 +159,7 @@ void NuKeeperStorageSnapshot::serialize(const NuKeeperStorageSnapshot & snapshot } } -SnapshotMetadataPtr NuKeeperStorageSnapshot::deserialize(NuKeeperStorage & storage, ReadBuffer & in) +SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage, ReadBuffer & in) { uint8_t version; readBinary(version, in); @@ -180,7 +180,7 @@ SnapshotMetadataPtr NuKeeperStorageSnapshot::deserialize(NuKeeperStorage & stora { std::string path; readBinary(path, in); - NuKeeperStorage::Node node; + KeeperStorage::Node node; readNode(node, in); storage.container.insertOrReplace(path, node); if (node.stat.ephemeralOwner != 0) @@ -194,7 +194,7 @@ SnapshotMetadataPtr NuKeeperStorageSnapshot::deserialize(NuKeeperStorage & stora if (itr.key != "/") { auto parent_path = parentPath(itr.key); - storage.container.updateValue(parent_path, [&path = itr.key] (NuKeeperStorage::Node & value) { value.children.insert(getBaseName(path)); }); + storage.container.updateValue(parent_path, [&path = itr.key] (KeeperStorage::Node & value) { value.children.insert(getBaseName(path)); }); } } @@ -214,7 +214,7 @@ SnapshotMetadataPtr NuKeeperStorageSnapshot::deserialize(NuKeeperStorage & stora return result; } -NuKeeperStorageSnapshot::NuKeeperStorageSnapshot(NuKeeperStorage * storage_, size_t up_to_log_idx_) +KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, size_t up_to_log_idx_) : storage(storage_) , snapshot_meta(std::make_shared(up_to_log_idx_, 0, std::make_shared())) , session_id(storage->session_id_counter) @@ -225,7 +225,7 @@ NuKeeperStorageSnapshot::NuKeeperStorageSnapshot(NuKeeperStorage * storage_, siz session_and_timeout = storage->getActiveSessions(); } -NuKeeperStorageSnapshot::NuKeeperStorageSnapshot(NuKeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_) +KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_) : storage(storage_) , snapshot_meta(snapshot_meta_) , session_id(storage->session_id_counter) @@ -236,12 +236,12 @@ NuKeeperStorageSnapshot::NuKeeperStorageSnapshot(NuKeeperStorage * storage_, con session_and_timeout = storage->getActiveSessions(); } -NuKeeperStorageSnapshot::~NuKeeperStorageSnapshot() +KeeperStorageSnapshot::~KeeperStorageSnapshot() { storage->disableSnapshotMode(); } -NuKeeperSnapshotManager::NuKeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, size_t storage_tick_time_) +KeeperSnapshotManager::KeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, size_t storage_tick_time_) : snapshots_path(snapshots_path_) , snapshots_to_keep(snapshots_to_keep_) , storage_tick_time(storage_tick_time_) @@ -266,7 +266,7 @@ NuKeeperSnapshotManager::NuKeeperSnapshotManager(const std::string & snapshots_p } -std::string NuKeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer & buffer, size_t up_to_log_idx) +std::string KeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffer & buffer, size_t up_to_log_idx) { ReadBufferFromNuraftBuffer reader(buffer); @@ -287,7 +287,7 @@ std::string NuKeeperSnapshotManager::serializeSnapshotBufferToDisk(nuraft::buffe return new_snapshot_path; } -nuraft::ptr NuKeeperSnapshotManager::deserializeLatestSnapshotBufferFromDisk() +nuraft::ptr KeeperSnapshotManager::deserializeLatestSnapshotBufferFromDisk() { while (!existing_snapshots.empty()) { @@ -307,7 +307,7 @@ nuraft::ptr NuKeeperSnapshotManager::deserializeLatestSnapshotBu return nullptr; } -nuraft::ptr NuKeeperSnapshotManager::deserializeSnapshotBufferFromDisk(size_t up_to_log_idx) const +nuraft::ptr KeeperSnapshotManager::deserializeSnapshotBufferFromDisk(size_t up_to_log_idx) const { const std::string & snapshot_path = existing_snapshots.at(up_to_log_idx); WriteBufferFromNuraftBuffer writer; @@ -316,26 +316,26 @@ nuraft::ptr NuKeeperSnapshotManager::deserializeSnapshotBufferFr return writer.getBuffer(); } -nuraft::ptr NuKeeperSnapshotManager::serializeSnapshotToBuffer(const NuKeeperStorageSnapshot & snapshot) +nuraft::ptr KeeperSnapshotManager::serializeSnapshotToBuffer(const KeeperStorageSnapshot & snapshot) { WriteBufferFromNuraftBuffer writer; CompressedWriteBuffer compressed_writer(writer); - NuKeeperStorageSnapshot::serialize(snapshot, compressed_writer); + KeeperStorageSnapshot::serialize(snapshot, compressed_writer); compressed_writer.finalize(); return writer.getBuffer(); } -SnapshotMetaAndStorage NuKeeperSnapshotManager::deserializeSnapshotFromBuffer(nuraft::ptr buffer) const +SnapshotMetaAndStorage KeeperSnapshotManager::deserializeSnapshotFromBuffer(nuraft::ptr buffer) const { ReadBufferFromNuraftBuffer reader(buffer); CompressedReadBuffer compressed_reader(reader); - auto storage = std::make_unique(storage_tick_time); - auto snapshot_metadata = NuKeeperStorageSnapshot::deserialize(*storage, compressed_reader); + auto storage = std::make_unique(storage_tick_time); + auto snapshot_metadata = KeeperStorageSnapshot::deserialize(*storage, compressed_reader); return std::make_pair(snapshot_metadata, std::move(storage)); } -SnapshotMetaAndStorage NuKeeperSnapshotManager::restoreFromLatestSnapshot() +SnapshotMetaAndStorage KeeperSnapshotManager::restoreFromLatestSnapshot() { if (existing_snapshots.empty()) return {}; @@ -346,13 +346,13 @@ SnapshotMetaAndStorage NuKeeperSnapshotManager::restoreFromLatestSnapshot() return deserializeSnapshotFromBuffer(buffer); } -void NuKeeperSnapshotManager::removeOutdatedSnapshotsIfNeeded() +void KeeperSnapshotManager::removeOutdatedSnapshotsIfNeeded() { while (existing_snapshots.size() > snapshots_to_keep) removeSnapshot(existing_snapshots.begin()->first); } -void NuKeeperSnapshotManager::removeSnapshot(size_t log_idx) +void KeeperSnapshotManager::removeSnapshot(size_t log_idx) { auto itr = existing_snapshots.find(log_idx); if (itr == existing_snapshots.end()) diff --git a/src/Coordination/KeeperSnapshotManager.h b/src/Coordination/KeeperSnapshotManager.h index d844a52eaf4..e53473d71c7 100644 --- a/src/Coordination/KeeperSnapshotManager.h +++ b/src/Coordination/KeeperSnapshotManager.h @@ -1,6 +1,6 @@ #pragma once #include // Y_IGNORE -#include +#include #include #include @@ -15,42 +15,42 @@ enum SnapshotVersion : uint8_t V0 = 0, }; -struct NuKeeperStorageSnapshot +struct KeeperStorageSnapshot { public: - NuKeeperStorageSnapshot(NuKeeperStorage * storage_, size_t up_to_log_idx_); + KeeperStorageSnapshot(KeeperStorage * storage_, size_t up_to_log_idx_); - NuKeeperStorageSnapshot(NuKeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_); - ~NuKeeperStorageSnapshot(); + KeeperStorageSnapshot(KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_); + ~KeeperStorageSnapshot(); - static void serialize(const NuKeeperStorageSnapshot & snapshot, WriteBuffer & out); + static void serialize(const KeeperStorageSnapshot & snapshot, WriteBuffer & out); - static SnapshotMetadataPtr deserialize(NuKeeperStorage & storage, ReadBuffer & in); + static SnapshotMetadataPtr deserialize(KeeperStorage & storage, ReadBuffer & in); - NuKeeperStorage * storage; + KeeperStorage * storage; SnapshotVersion version = SnapshotVersion::V0; SnapshotMetadataPtr snapshot_meta; int64_t session_id; size_t snapshot_container_size; - NuKeeperStorage::Container::const_iterator begin; + KeeperStorage::Container::const_iterator begin; SessionAndTimeout session_and_timeout; }; -using NuKeeperStorageSnapshotPtr = std::shared_ptr; -using CreateSnapshotCallback = std::function; +using KeeperStorageSnapshotPtr = std::shared_ptr; +using CreateSnapshotCallback = std::function; -using SnapshotMetaAndStorage = std::pair; +using SnapshotMetaAndStorage = std::pair; -class NuKeeperSnapshotManager +class KeeperSnapshotManager { public: - NuKeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, size_t storage_tick_time_ = 500); + KeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, size_t storage_tick_time_ = 500); SnapshotMetaAndStorage restoreFromLatestSnapshot(); - static nuraft::ptr serializeSnapshotToBuffer(const NuKeeperStorageSnapshot & snapshot); + static nuraft::ptr serializeSnapshotToBuffer(const KeeperStorageSnapshot & snapshot); std::string serializeSnapshotBufferToDisk(nuraft::buffer & buffer, size_t up_to_log_idx); SnapshotMetaAndStorage deserializeSnapshotFromBuffer(nuraft::ptr buffer) const; @@ -82,7 +82,7 @@ private: struct CreateSnapshotTask { - NuKeeperStorageSnapshotPtr snapshot; + KeeperStorageSnapshotPtr snapshot; CreateSnapshotCallback create_snapshot; }; diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index a7037b8d644..1a1957b1979 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -1,9 +1,9 @@ -#include +#include #include #include #include #include -#include +#include #include namespace DB @@ -14,10 +14,10 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -NuKeeperStorage::RequestForSession parseRequest(nuraft::buffer & data) +KeeperStorage::RequestForSession parseRequest(nuraft::buffer & data) { ReadBufferFromNuraftBuffer buffer(data); - NuKeeperStorage::RequestForSession request_for_session; + KeeperStorage::RequestForSession request_for_session; readIntBinary(request_for_session.session_id, buffer); int32_t length; @@ -36,17 +36,17 @@ NuKeeperStorage::RequestForSession parseRequest(nuraft::buffer & data) return request_for_session; } -NuKeeperStateMachine::NuKeeperStateMachine(ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_) +KeeperStateMachine::KeeperStateMachine(ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_) : coordination_settings(coordination_settings_) , snapshot_manager(snapshots_path_, coordination_settings->snapshots_to_keep, coordination_settings->dead_session_check_period_ms.totalMicroseconds()) , responses_queue(responses_queue_) , snapshots_queue(snapshots_queue_) , last_committed_idx(0) - , log(&Poco::Logger::get("NuKeeperStateMachine")) + , log(&Poco::Logger::get("KeeperStateMachine")) { } -void NuKeeperStateMachine::init() +void KeeperStateMachine::init() { /// Do everything without mutexes, no other threads exist. LOG_DEBUG(log, "Totally have {} snapshots", snapshot_manager.totalSnapshots()); @@ -85,10 +85,10 @@ void NuKeeperStateMachine::init() } if (!storage) - storage = std::make_unique(coordination_settings->dead_session_check_period_ms.totalMilliseconds()); + storage = std::make_unique(coordination_settings->dead_session_check_period_ms.totalMilliseconds()); } -nuraft::ptr NuKeeperStateMachine::commit(const size_t log_idx, nuraft::buffer & data) +nuraft::ptr KeeperStateMachine::commit(const size_t log_idx, nuraft::buffer & data) { if (data.size() == sizeof(int64_t)) { @@ -109,7 +109,7 @@ nuraft::ptr NuKeeperStateMachine::commit(const size_t log_idx, n else { auto request_for_session = parseRequest(data); - NuKeeperStorage::ResponsesForSessions responses_for_sessions; + KeeperStorage::ResponsesForSessions responses_for_sessions; { std::lock_guard lock(storage_lock); responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx); @@ -122,7 +122,7 @@ nuraft::ptr NuKeeperStateMachine::commit(const size_t log_idx, n } } -bool NuKeeperStateMachine::apply_snapshot(nuraft::snapshot & s) +bool KeeperStateMachine::apply_snapshot(nuraft::snapshot & s) { LOG_DEBUG(log, "Applying snapshot {}", s.get_last_log_idx()); nuraft::ptr latest_snapshot_ptr; @@ -142,14 +142,14 @@ bool NuKeeperStateMachine::apply_snapshot(nuraft::snapshot & s) return true; } -nuraft::ptr NuKeeperStateMachine::last_snapshot() +nuraft::ptr KeeperStateMachine::last_snapshot() { /// Just return the latest snapshot. std::lock_guard lock(snapshots_lock); return latest_snapshot_meta; } -void NuKeeperStateMachine::create_snapshot( +void KeeperStateMachine::create_snapshot( nuraft::snapshot & s, nuraft::async_result::handler_type & when_done) { @@ -160,10 +160,10 @@ void NuKeeperStateMachine::create_snapshot( CreateSnapshotTask snapshot_task; { std::lock_guard lock(storage_lock); - snapshot_task.snapshot = std::make_shared(storage.get(), snapshot_meta_copy); + snapshot_task.snapshot = std::make_shared(storage.get(), snapshot_meta_copy); } - snapshot_task.create_snapshot = [this, when_done] (NuKeeperStorageSnapshotPtr && snapshot) + snapshot_task.create_snapshot = [this, when_done] (KeeperStorageSnapshotPtr && snapshot) { nuraft::ptr exception(nullptr); bool ret = true; @@ -203,7 +203,7 @@ void NuKeeperStateMachine::create_snapshot( snapshots_queue.push(std::move(snapshot_task)); } -void NuKeeperStateMachine::save_logical_snp_obj( +void KeeperStateMachine::save_logical_snp_obj( nuraft::snapshot & s, size_t & obj_id, nuraft::buffer & data, @@ -217,7 +217,7 @@ void NuKeeperStateMachine::save_logical_snp_obj( if (obj_id == 0) { std::lock_guard lock(storage_lock); - NuKeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx()); + KeeperStorageSnapshot snapshot(storage.get(), s.get_last_log_idx()); cloned_buffer = snapshot_manager.serializeSnapshotToBuffer(snapshot); } else @@ -235,7 +235,7 @@ void NuKeeperStateMachine::save_logical_snp_obj( std::shared_ptr> waiter = std::make_shared>(); auto future = waiter->get_future(); snapshot_task.snapshot = nullptr; - snapshot_task.create_snapshot = [this, waiter, cloned_buffer, log_idx = s.get_last_log_idx()] (NuKeeperStorageSnapshotPtr &&) + snapshot_task.create_snapshot = [this, waiter, cloned_buffer, log_idx = s.get_last_log_idx()] (KeeperStorageSnapshotPtr &&) { try { @@ -261,7 +261,7 @@ void NuKeeperStateMachine::save_logical_snp_obj( obj_id++; } -int NuKeeperStateMachine::read_logical_snp_obj( +int KeeperStateMachine::read_logical_snp_obj( nuraft::snapshot & s, void* & /*user_snp_ctx*/, ulong obj_id, @@ -289,9 +289,9 @@ int NuKeeperStateMachine::read_logical_snp_obj( return 0; } -void NuKeeperStateMachine::processReadRequest(const NuKeeperStorage::RequestForSession & request_for_session) +void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSession & request_for_session) { - NuKeeperStorage::ResponsesForSessions responses; + KeeperStorage::ResponsesForSessions responses; { std::lock_guard lock(storage_lock); responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt); @@ -300,13 +300,13 @@ void NuKeeperStateMachine::processReadRequest(const NuKeeperStorage::RequestForS responses_queue.push(response); } -std::unordered_set NuKeeperStateMachine::getDeadSessions() +std::unordered_set KeeperStateMachine::getDeadSessions() { std::lock_guard lock(storage_lock); return storage->getDeadSessions(); } -void NuKeeperStateMachine::shutdownStorage() +void KeeperStateMachine::shutdownStorage() { std::lock_guard lock(storage_lock); storage->finalize(); diff --git a/src/Coordination/KeeperStateMachine.h b/src/Coordination/KeeperStateMachine.h index af9ad6de4d2..aefdbe3f9f3 100644 --- a/src/Coordination/KeeperStateMachine.h +++ b/src/Coordination/KeeperStateMachine.h @@ -1,22 +1,22 @@ #pragma once -#include +#include #include // Y_IGNORE #include #include #include -#include +#include namespace DB { -using ResponsesQueue = ThreadSafeQueue; +using ResponsesQueue = ThreadSafeQueue; using SnapshotsQueue = ConcurrentBoundedQueue; -class NuKeeperStateMachine : public nuraft::state_machine +class KeeperStateMachine : public nuraft::state_machine { public: - NuKeeperStateMachine(ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_); + KeeperStateMachine(ResponsesQueue & responses_queue_, SnapshotsQueue & snapshots_queue_, const std::string & snapshots_path_, const CoordinationSettingsPtr & coordination_settings_); void init(); @@ -50,12 +50,12 @@ public: nuraft::ptr & data_out, bool & is_last_obj) override; - NuKeeperStorage & getStorage() + KeeperStorage & getStorage() { return *storage; } - void processReadRequest(const NuKeeperStorage::RequestForSession & request_for_session); + void processReadRequest(const KeeperStorage::RequestForSession & request_for_session); std::unordered_set getDeadSessions(); @@ -68,9 +68,9 @@ private: CoordinationSettingsPtr coordination_settings; - NuKeeperStoragePtr storage; + KeeperStoragePtr storage; - NuKeeperSnapshotManager snapshot_manager; + KeeperSnapshotManager snapshot_manager; ResponsesQueue & responses_queue; diff --git a/src/Coordination/KeeperStateManager.cpp b/src/Coordination/KeeperStateManager.cpp index 7b39dd160a5..ad656cd1f95 100644 --- a/src/Coordination/KeeperStateManager.cpp +++ b/src/Coordination/KeeperStateManager.cpp @@ -1,4 +1,4 @@ -#include +#include #include namespace DB @@ -9,23 +9,23 @@ namespace ErrorCodes extern const int RAFT_ERROR; } -NuKeeperStateManager::NuKeeperStateManager(int server_id_, const std::string & host, int port, const std::string & logs_path) +KeeperStateManager::KeeperStateManager(int server_id_, const std::string & host, int port, const std::string & logs_path) : my_server_id(server_id_) , my_port(port) - , log_store(nuraft::cs_new(logs_path, 5000, false)) + , log_store(nuraft::cs_new(logs_path, 5000, false)) , cluster_config(nuraft::cs_new()) { auto peer_config = nuraft::cs_new(my_server_id, host + ":" + std::to_string(port)); cluster_config->get_servers().push_back(peer_config); } -NuKeeperStateManager::NuKeeperStateManager( +KeeperStateManager::KeeperStateManager( int my_server_id_, const std::string & config_prefix, const Poco::Util::AbstractConfiguration & config, const CoordinationSettingsPtr & coordination_settings) : my_server_id(my_server_id_) - , log_store(nuraft::cs_new( + , log_store(nuraft::cs_new( config.getString(config_prefix + ".log_storage_path", config.getString("path", DBMS_DEFAULT_PATH) + "coordination/logs"), coordination_settings->rotate_log_storage_interval, coordination_settings->force_sync)) , cluster_config(nuraft::cs_new()) @@ -64,17 +64,17 @@ NuKeeperStateManager::NuKeeperStateManager( throw Exception(ErrorCodes::RAFT_ERROR, "At least one of servers should be able to start as leader (without )"); } -void NuKeeperStateManager::loadLogStore(size_t last_commited_index, size_t logs_to_keep) +void KeeperStateManager::loadLogStore(size_t last_commited_index, size_t logs_to_keep) { log_store->init(last_commited_index, logs_to_keep); } -void NuKeeperStateManager::flushLogStore() +void KeeperStateManager::flushLogStore() { log_store->flush(); } -void NuKeeperStateManager::save_config(const nuraft::cluster_config & config) +void KeeperStateManager::save_config(const nuraft::cluster_config & config) { // Just keep in memory in this example. // Need to write to disk here, if want to make it durable. @@ -82,7 +82,7 @@ void NuKeeperStateManager::save_config(const nuraft::cluster_config & config) cluster_config = nuraft::cluster_config::deserialize(*buf); } -void NuKeeperStateManager::save_state(const nuraft::srv_state & state) +void KeeperStateManager::save_state(const nuraft::srv_state & state) { // Just keep in memory in this example. // Need to write to disk here, if want to make it durable. diff --git a/src/Coordination/KeeperStateManager.h b/src/Coordination/KeeperStateManager.h index 82dd0072b9c..f0575082b02 100644 --- a/src/Coordination/KeeperStateManager.h +++ b/src/Coordination/KeeperStateManager.h @@ -2,7 +2,7 @@ #include #include -#include +#include #include #include // Y_IGNORE #include @@ -10,16 +10,16 @@ namespace DB { -class NuKeeperStateManager : public nuraft::state_mgr +class KeeperStateManager : public nuraft::state_mgr { public: - NuKeeperStateManager( + KeeperStateManager( int server_id_, const std::string & config_prefix, const Poco::Util::AbstractConfiguration & config, const CoordinationSettingsPtr & coordination_settings); - NuKeeperStateManager( + KeeperStateManager( int server_id_, const std::string & host, int port, @@ -52,7 +52,7 @@ public: return start_as_follower_servers.count(my_server_id); } - nuraft::ptr getLogStore() const { return log_store; } + nuraft::ptr getLogStore() const { return log_store; } size_t getTotalServers() const { return total_servers; } @@ -61,7 +61,7 @@ private: int my_port; size_t total_servers{0}; std::unordered_set start_as_follower_servers; - nuraft::ptr log_store; + nuraft::ptr log_store; nuraft::ptr my_server_config; nuraft::ptr cluster_config; nuraft::ptr server_state; diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index c1a8ebdfb44..3e9b64cd0a4 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1,4 +1,4 @@ -#include +#include #include #include #include @@ -31,9 +31,9 @@ static std::string getBaseName(const String & path) return std::string{&path[basename_start + 1], path.length() - basename_start - 1}; } -static NuKeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, NuKeeperStorage::Watches & watches, NuKeeperStorage::Watches & list_watches, Coordination::Event event_type) +static KeeperStorage::ResponsesForSessions processWatchesImpl(const String & path, KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches, Coordination::Event event_type) { - NuKeeperStorage::ResponsesForSessions result; + KeeperStorage::ResponsesForSessions result; auto it = watches.find(path); if (it != watches.end()) { @@ -44,7 +44,7 @@ static NuKeeperStorage::ResponsesForSessions processWatchesImpl(const String & p watch_response->type = event_type; watch_response->state = Coordination::State::CONNECTED; for (auto watcher_session : it->second) - result.push_back(NuKeeperStorage::ResponseForSession{watcher_session, watch_response}); + result.push_back(KeeperStorage::ResponseForSession{watcher_session, watch_response}); watches.erase(it); } @@ -60,14 +60,14 @@ static NuKeeperStorage::ResponsesForSessions processWatchesImpl(const String & p watch_list_response->type = Coordination::Event::CHILD; watch_list_response->state = Coordination::State::CONNECTED; for (auto watcher_session : it->second) - result.push_back(NuKeeperStorage::ResponseForSession{watcher_session, watch_list_response}); + result.push_back(KeeperStorage::ResponseForSession{watcher_session, watch_list_response}); list_watches.erase(it); } return result; } -NuKeeperStorage::NuKeeperStorage(int64_t tick_time_ms) +KeeperStorage::KeeperStorage(int64_t tick_time_ms) : session_expiry_queue(tick_time_ms) { container.insert("/", Node()); @@ -75,32 +75,32 @@ NuKeeperStorage::NuKeeperStorage(int64_t tick_time_ms) using Undo = std::function; -struct NuKeeperStorageRequest +struct KeeperStorageRequest { Coordination::ZooKeeperRequestPtr zk_request; - explicit NuKeeperStorageRequest(const Coordination::ZooKeeperRequestPtr & zk_request_) + explicit KeeperStorageRequest(const Coordination::ZooKeeperRequestPtr & zk_request_) : zk_request(zk_request_) {} - virtual std::pair process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const = 0; - virtual NuKeeperStorage::ResponsesForSessions processWatches(NuKeeperStorage::Watches & /*watches*/, NuKeeperStorage::Watches & /*list_watches*/) const { return {}; } + virtual std::pair process(KeeperStorage::Container & container, KeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const = 0; + virtual KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & /*watches*/, KeeperStorage::Watches & /*list_watches*/) const { return {}; } - virtual ~NuKeeperStorageRequest() = default; + virtual ~KeeperStorageRequest() = default; }; -struct NuKeeperStorageHeartbeatRequest final : public NuKeeperStorageRequest +struct KeeperStorageHeartbeatRequest final : public KeeperStorageRequest { - using NuKeeperStorageRequest::NuKeeperStorageRequest; - std::pair process(NuKeeperStorage::Container & /* container */, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t /* zxid */, int64_t /* session_id */) const override + using KeeperStorageRequest::KeeperStorageRequest; + std::pair process(KeeperStorage::Container & /* container */, KeeperStorage::Ephemerals & /* ephemerals */, int64_t /* zxid */, int64_t /* session_id */) const override { return {zk_request->makeResponse(), {}}; } }; -struct NuKeeperStorageSyncRequest final : public NuKeeperStorageRequest +struct KeeperStorageSyncRequest final : public KeeperStorageRequest { - using NuKeeperStorageRequest::NuKeeperStorageRequest; - std::pair process(NuKeeperStorage::Container & /* container */, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t /* zxid */, int64_t /* session_id */) const override + using KeeperStorageRequest::KeeperStorageRequest; + std::pair process(KeeperStorage::Container & /* container */, KeeperStorage::Ephemerals & /* ephemerals */, int64_t /* zxid */, int64_t /* session_id */) const override { auto response = zk_request->makeResponse(); dynamic_cast(response.get())->path = dynamic_cast(zk_request.get())->path; @@ -108,16 +108,16 @@ struct NuKeeperStorageSyncRequest final : public NuKeeperStorageRequest } }; -struct NuKeeperStorageCreateRequest final : public NuKeeperStorageRequest +struct KeeperStorageCreateRequest final : public KeeperStorageRequest { - using NuKeeperStorageRequest::NuKeeperStorageRequest; + using KeeperStorageRequest::KeeperStorageRequest; - NuKeeperStorage::ResponsesForSessions processWatches(NuKeeperStorage::Watches & watches, NuKeeperStorage::Watches & list_watches) const override + KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override { return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CREATED); } - std::pair process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const override + std::pair process(KeeperStorage::Container & container, KeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const override { Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Undo undo; @@ -143,7 +143,7 @@ struct NuKeeperStorageCreateRequest final : public NuKeeperStorageRequest } else { - NuKeeperStorage::Node created_node; + KeeperStorage::Node created_node; created_node.stat.czxid = zxid; created_node.stat.mzxid = zxid; created_node.stat.ctime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1); @@ -167,7 +167,7 @@ struct NuKeeperStorageCreateRequest final : public NuKeeperStorageRequest } auto child_path = getBaseName(path_created); - container.updateValue(parent_path, [child_path] (NuKeeperStorage::Node & parent) + container.updateValue(parent_path, [child_path] (KeeperStorage::Node & parent) { /// Increment sequential number even if node is not sequential ++parent.seq_num; @@ -188,7 +188,7 @@ struct NuKeeperStorageCreateRequest final : public NuKeeperStorageRequest if (is_ephemeral) ephemerals[session_id].erase(path_created); - container.updateValue(parent_path, [child_path] (NuKeeperStorage::Node & undo_parent) + container.updateValue(parent_path, [child_path] (KeeperStorage::Node & undo_parent) { --undo_parent.stat.cversion; --undo_parent.stat.numChildren; @@ -205,10 +205,10 @@ struct NuKeeperStorageCreateRequest final : public NuKeeperStorageRequest } }; -struct NuKeeperStorageGetRequest final : public NuKeeperStorageRequest +struct KeeperStorageGetRequest final : public KeeperStorageRequest { - using NuKeeperStorageRequest::NuKeeperStorageRequest; - std::pair process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t /* zxid */, int64_t /* session_id */) const override + using KeeperStorageRequest::KeeperStorageRequest; + std::pair process(KeeperStorage::Container & container, KeeperStorage::Ephemerals & /* ephemerals */, int64_t /* zxid */, int64_t /* session_id */) const override { Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperGetResponse & response = dynamic_cast(*response_ptr); @@ -230,10 +230,10 @@ struct NuKeeperStorageGetRequest final : public NuKeeperStorageRequest } }; -struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest +struct KeeperStorageRemoveRequest final : public KeeperStorageRequest { - using NuKeeperStorageRequest::NuKeeperStorageRequest; - std::pair process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & ephemerals, int64_t /*zxid*/, int64_t /*session_id*/) const override + using KeeperStorageRequest::KeeperStorageRequest; + std::pair process(KeeperStorage::Container & container, KeeperStorage::Ephemerals & ephemerals, int64_t /*zxid*/, int64_t /*session_id*/) const override { Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperRemoveResponse & response = dynamic_cast(*response_ptr); @@ -265,7 +265,7 @@ struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest } auto child_basename = getBaseName(it->key); - container.updateValue(parentPath(request.path), [&child_basename] (NuKeeperStorage::Node & parent) + container.updateValue(parentPath(request.path), [&child_basename] (KeeperStorage::Node & parent) { --parent.stat.numChildren; ++parent.stat.cversion; @@ -282,7 +282,7 @@ struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest ephemerals[prev_node.stat.ephemeralOwner].emplace(path); container.insert(path, prev_node); - container.updateValue(parentPath(path), [&child_basename] (NuKeeperStorage::Node & parent) + container.updateValue(parentPath(path), [&child_basename] (KeeperStorage::Node & parent) { ++parent.stat.numChildren; --parent.stat.cversion; @@ -294,16 +294,16 @@ struct NuKeeperStorageRemoveRequest final : public NuKeeperStorageRequest return { response_ptr, undo }; } - NuKeeperStorage::ResponsesForSessions processWatches(NuKeeperStorage::Watches & watches, NuKeeperStorage::Watches & list_watches) const override + KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override { return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::DELETED); } }; -struct NuKeeperStorageExistsRequest final : public NuKeeperStorageRequest +struct KeeperStorageExistsRequest final : public KeeperStorageRequest { - using NuKeeperStorageRequest::NuKeeperStorageRequest; - std::pair process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t /*zxid*/, int64_t /* session_id */) const override + using KeeperStorageRequest::KeeperStorageRequest; + std::pair process(KeeperStorage::Container & container, KeeperStorage::Ephemerals & /* ephemerals */, int64_t /*zxid*/, int64_t /* session_id */) const override { Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperExistsResponse & response = dynamic_cast(*response_ptr); @@ -324,10 +324,10 @@ struct NuKeeperStorageExistsRequest final : public NuKeeperStorageRequest } }; -struct NuKeeperStorageSetRequest final : public NuKeeperStorageRequest +struct KeeperStorageSetRequest final : public KeeperStorageRequest { - using NuKeeperStorageRequest::NuKeeperStorageRequest; - std::pair process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t zxid, int64_t /* session_id */) const override + using KeeperStorageRequest::KeeperStorageRequest; + std::pair process(KeeperStorage::Container & container, KeeperStorage::Ephemerals & /* ephemerals */, int64_t zxid, int64_t /* session_id */) const override { Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperSetResponse & response = dynamic_cast(*response_ptr); @@ -343,7 +343,7 @@ struct NuKeeperStorageSetRequest final : public NuKeeperStorageRequest { auto prev_node = it->value; - auto itr = container.updateValue(request.path, [zxid, request] (NuKeeperStorage::Node & value) + auto itr = container.updateValue(request.path, [zxid, request] (KeeperStorage::Node & value) { value.data = request.data; value.stat.version++; @@ -353,7 +353,7 @@ struct NuKeeperStorageSetRequest final : public NuKeeperStorageRequest value.data = request.data; }); - container.updateValue(parentPath(request.path), [] (NuKeeperStorage::Node & parent) + container.updateValue(parentPath(request.path), [] (KeeperStorage::Node & parent) { parent.stat.cversion++; }); @@ -363,8 +363,8 @@ struct NuKeeperStorageSetRequest final : public NuKeeperStorageRequest undo = [prev_node, &container, path = request.path] { - container.updateValue(path, [&prev_node] (NuKeeperStorage::Node & value) { value = prev_node; }); - container.updateValue(parentPath(path), [] (NuKeeperStorage::Node & parent) + container.updateValue(path, [&prev_node] (KeeperStorage::Node & value) { value = prev_node; }); + container.updateValue(parentPath(path), [] (KeeperStorage::Node & parent) { parent.stat.cversion--; }); @@ -378,16 +378,16 @@ struct NuKeeperStorageSetRequest final : public NuKeeperStorageRequest return { response_ptr, undo }; } - NuKeeperStorage::ResponsesForSessions processWatches(NuKeeperStorage::Watches & watches, NuKeeperStorage::Watches & list_watches) const override + KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override { return processWatchesImpl(zk_request->getPath(), watches, list_watches, Coordination::Event::CHANGED); } }; -struct NuKeeperStorageListRequest final : public NuKeeperStorageRequest +struct KeeperStorageListRequest final : public KeeperStorageRequest { - using NuKeeperStorageRequest::NuKeeperStorageRequest; - std::pair process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t /*zxid*/, int64_t /*session_id*/) const override + using KeeperStorageRequest::KeeperStorageRequest; + std::pair process(KeeperStorage::Container & container, KeeperStorage::Ephemerals & /* ephemerals */, int64_t /*zxid*/, int64_t /*session_id*/) const override { Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperListResponse & response = dynamic_cast(*response_ptr); @@ -415,10 +415,10 @@ struct NuKeeperStorageListRequest final : public NuKeeperStorageRequest } }; -struct NuKeeperStorageCheckRequest final : public NuKeeperStorageRequest +struct KeeperStorageCheckRequest final : public KeeperStorageRequest { - using NuKeeperStorageRequest::NuKeeperStorageRequest; - std::pair process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & /* ephemerals */, int64_t /*zxid*/, int64_t /*session_id*/) const override + using KeeperStorageRequest::KeeperStorageRequest; + std::pair process(KeeperStorage::Container & container, KeeperStorage::Ephemerals & /* ephemerals */, int64_t /*zxid*/, int64_t /*session_id*/) const override { Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperCheckResponse & response = dynamic_cast(*response_ptr); @@ -441,11 +441,11 @@ struct NuKeeperStorageCheckRequest final : public NuKeeperStorageRequest } }; -struct NuKeeperStorageMultiRequest final : public NuKeeperStorageRequest +struct KeeperStorageMultiRequest final : public KeeperStorageRequest { - std::vector concrete_requests; - explicit NuKeeperStorageMultiRequest(const Coordination::ZooKeeperRequestPtr & zk_request_) - : NuKeeperStorageRequest(zk_request_) + std::vector concrete_requests; + explicit KeeperStorageMultiRequest(const Coordination::ZooKeeperRequestPtr & zk_request_) + : KeeperStorageRequest(zk_request_) { Coordination::ZooKeeperMultiRequest & request = dynamic_cast(*zk_request); concrete_requests.reserve(request.requests.size()); @@ -455,26 +455,26 @@ struct NuKeeperStorageMultiRequest final : public NuKeeperStorageRequest auto sub_zk_request = std::dynamic_pointer_cast(sub_request); if (sub_zk_request->getOpNum() == Coordination::OpNum::Create) { - concrete_requests.push_back(std::make_shared(sub_zk_request)); + concrete_requests.push_back(std::make_shared(sub_zk_request)); } else if (sub_zk_request->getOpNum() == Coordination::OpNum::Remove) { - concrete_requests.push_back(std::make_shared(sub_zk_request)); + concrete_requests.push_back(std::make_shared(sub_zk_request)); } else if (sub_zk_request->getOpNum() == Coordination::OpNum::Set) { - concrete_requests.push_back(std::make_shared(sub_zk_request)); + concrete_requests.push_back(std::make_shared(sub_zk_request)); } else if (sub_zk_request->getOpNum() == Coordination::OpNum::Check) { - concrete_requests.push_back(std::make_shared(sub_zk_request)); + concrete_requests.push_back(std::make_shared(sub_zk_request)); } else throw DB::Exception(ErrorCodes::BAD_ARGUMENTS, "Illegal command as part of multi ZooKeeper request {}", sub_zk_request->getOpNum()); } } - std::pair process(NuKeeperStorage::Container & container, NuKeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const override + std::pair process(KeeperStorage::Container & container, KeeperStorage::Ephemerals & ephemerals, int64_t zxid, int64_t session_id) const override { Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperMultiResponse & response = dynamic_cast(*response_ptr); @@ -527,9 +527,9 @@ struct NuKeeperStorageMultiRequest final : public NuKeeperStorageRequest } } - NuKeeperStorage::ResponsesForSessions processWatches(NuKeeperStorage::Watches & watches, NuKeeperStorage::Watches & list_watches) const override + KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & watches, KeeperStorage::Watches & list_watches) const override { - NuKeeperStorage::ResponsesForSessions result; + KeeperStorage::ResponsesForSessions result; for (const auto & generic_request : concrete_requests) { auto responses = generic_request->processWatches(watches, list_watches); @@ -539,16 +539,16 @@ struct NuKeeperStorageMultiRequest final : public NuKeeperStorageRequest } }; -struct NuKeeperStorageCloseRequest final : public NuKeeperStorageRequest +struct KeeperStorageCloseRequest final : public KeeperStorageRequest { - using NuKeeperStorageRequest::NuKeeperStorageRequest; - std::pair process(NuKeeperStorage::Container &, NuKeeperStorage::Ephemerals &, int64_t, int64_t) const override + using KeeperStorageRequest::KeeperStorageRequest; + std::pair process(KeeperStorage::Container &, KeeperStorage::Ephemerals &, int64_t, int64_t) const override { throw DB::Exception("Called process on close request", ErrorCodes::LOGICAL_ERROR); } }; -void NuKeeperStorage::finalize() +void KeeperStorage::finalize() { if (finalized) throw DB::Exception("Testkeeper storage already finalized", ErrorCodes::LOGICAL_ERROR); @@ -568,20 +568,20 @@ void NuKeeperStorage::finalize() } -class NuKeeperWrapperFactory final : private boost::noncopyable +class KeeperWrapperFactory final : private boost::noncopyable { public: - using Creator = std::function; + using Creator = std::function; using OpNumToRequest = std::unordered_map; - static NuKeeperWrapperFactory & instance() + static KeeperWrapperFactory & instance() { - static NuKeeperWrapperFactory factory; + static KeeperWrapperFactory factory; return factory; } - NuKeeperStorageRequestPtr get(const Coordination::ZooKeeperRequestPtr & zk_request) const + KeeperStorageRequestPtr get(const Coordination::ZooKeeperRequestPtr & zk_request) const { auto it = op_num_to_request.find(zk_request->getOpNum()); if (it == op_num_to_request.end()) @@ -598,37 +598,37 @@ public: private: OpNumToRequest op_num_to_request; - NuKeeperWrapperFactory(); + KeeperWrapperFactory(); }; template -void registerNuKeeperRequestWrapper(NuKeeperWrapperFactory & factory) +void registerKeeperRequestWrapper(KeeperWrapperFactory & factory) { factory.registerRequest(num, [] (const Coordination::ZooKeeperRequestPtr & zk_request) { return std::make_shared(zk_request); }); } -NuKeeperWrapperFactory::NuKeeperWrapperFactory() +KeeperWrapperFactory::KeeperWrapperFactory() { - registerNuKeeperRequestWrapper(*this); - registerNuKeeperRequestWrapper(*this); - //registerNuKeeperRequestWrapper(*this); - registerNuKeeperRequestWrapper(*this); - registerNuKeeperRequestWrapper(*this); - registerNuKeeperRequestWrapper(*this); - registerNuKeeperRequestWrapper(*this); - registerNuKeeperRequestWrapper(*this); - registerNuKeeperRequestWrapper(*this); - registerNuKeeperRequestWrapper(*this); - registerNuKeeperRequestWrapper(*this); - registerNuKeeperRequestWrapper(*this); - registerNuKeeperRequestWrapper(*this); + registerKeeperRequestWrapper(*this); + registerKeeperRequestWrapper(*this); + //registerKeeperRequestWrapper(*this); + registerKeeperRequestWrapper(*this); + registerKeeperRequestWrapper(*this); + registerKeeperRequestWrapper(*this); + registerKeeperRequestWrapper(*this); + registerKeeperRequestWrapper(*this); + registerKeeperRequestWrapper(*this); + registerKeeperRequestWrapper(*this); + registerKeeperRequestWrapper(*this); + registerKeeperRequestWrapper(*this); + registerKeeperRequestWrapper(*this); } -NuKeeperStorage::ResponsesForSessions NuKeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id, std::optional new_last_zxid) +KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id, std::optional new_last_zxid) { - NuKeeperStorage::ResponsesForSessions results; + KeeperStorage::ResponsesForSessions results; if (new_last_zxid) { if (zxid >= *new_last_zxid) @@ -645,7 +645,7 @@ NuKeeperStorage::ResponsesForSessions NuKeeperStorage::processRequest(const Coor for (const auto & ephemeral_path : it->second) { container.erase(ephemeral_path); - container.updateValue(parentPath(ephemeral_path), [&ephemeral_path] (NuKeeperStorage::Node & parent) + container.updateValue(parentPath(ephemeral_path), [&ephemeral_path] (KeeperStorage::Node & parent) { --parent.stat.numChildren; ++parent.stat.cversion; @@ -669,7 +669,7 @@ NuKeeperStorage::ResponsesForSessions NuKeeperStorage::processRequest(const Coor } else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat) { - NuKeeperStorageRequestPtr storage_request = NuKeeperWrapperFactory::instance().get(zk_request); + KeeperStorageRequestPtr storage_request = KeeperWrapperFactory::instance().get(zk_request); auto [response, _] = storage_request->process(container, ephemerals, zxid, session_id); response->xid = zk_request->xid; response->zxid = getZXID(); @@ -678,7 +678,7 @@ NuKeeperStorage::ResponsesForSessions NuKeeperStorage::processRequest(const Coor } else { - NuKeeperStorageRequestPtr storage_request = NuKeeperWrapperFactory::instance().get(zk_request); + KeeperStorageRequestPtr storage_request = KeeperWrapperFactory::instance().get(zk_request); auto [response, _] = storage_request->process(container, ephemerals, zxid, session_id); if (zk_request->has_watch) @@ -715,7 +715,7 @@ NuKeeperStorage::ResponsesForSessions NuKeeperStorage::processRequest(const Coor } -void NuKeeperStorage::clearDeadWatches(int64_t session_id) +void KeeperStorage::clearDeadWatches(int64_t session_id) { auto watches_it = sessions_and_watchers.find(session_id); if (watches_it != sessions_and_watchers.end()) diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index 058eed55cab..585426a7441 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -14,15 +14,15 @@ namespace DB { using namespace DB; -struct NuKeeperStorageRequest; -using NuKeeperStorageRequestPtr = std::shared_ptr; +struct KeeperStorageRequest; +using KeeperStorageRequestPtr = std::shared_ptr; using ResponseCallback = std::function; using ChildrenSet = std::unordered_set; using SessionAndTimeout = std::unordered_map; -struct NuKeeperStorageSnapshot; +struct KeeperStorageSnapshot; -class NuKeeperStorage +class KeeperStorage { public: int64_t session_id_counter{1}; @@ -80,7 +80,7 @@ public: } public: - NuKeeperStorage(int64_t tick_time_ms); + KeeperStorage(int64_t tick_time_ms); int64_t getSessionID(int64_t session_timeout_ms) { @@ -131,6 +131,6 @@ public: } }; -using NuKeeperStoragePtr = std::unique_ptr; +using KeeperStoragePtr = std::unique_ptr; } diff --git a/src/Coordination/KeeperStorageDispatcher.cpp b/src/Coordination/KeeperStorageDispatcher.cpp index 5b35b9c4829..7f9f9170dc2 100644 --- a/src/Coordination/KeeperStorageDispatcher.cpp +++ b/src/Coordination/KeeperStorageDispatcher.cpp @@ -1,4 +1,4 @@ -#include +#include #include namespace DB @@ -11,18 +11,18 @@ namespace ErrorCodes extern const int TIMEOUT_EXCEEDED; } -NuKeeperStorageDispatcher::NuKeeperStorageDispatcher() +KeeperStorageDispatcher::KeeperStorageDispatcher() : coordination_settings(std::make_shared()) - , log(&Poco::Logger::get("NuKeeperDispatcher")) + , log(&Poco::Logger::get("KeeperDispatcher")) { } -void NuKeeperStorageDispatcher::requestThread() +void KeeperStorageDispatcher::requestThread() { - setThreadName("NuKeeperReqT"); + setThreadName("KeeperReqT"); while (!shutdown_called) { - NuKeeperStorage::RequestForSession request; + KeeperStorage::RequestForSession request; UInt64 max_wait = UInt64(coordination_settings->operation_timeout_ms.totalMilliseconds()); @@ -43,12 +43,12 @@ void NuKeeperStorageDispatcher::requestThread() } } -void NuKeeperStorageDispatcher::responseThread() +void KeeperStorageDispatcher::responseThread() { - setThreadName("NuKeeperRspT"); + setThreadName("KeeperRspT"); while (!shutdown_called) { - NuKeeperStorage::ResponseForSession response_for_session; + KeeperStorage::ResponseForSession response_for_session; UInt64 max_wait = UInt64(coordination_settings->operation_timeout_ms.totalMilliseconds()); @@ -69,9 +69,9 @@ void NuKeeperStorageDispatcher::responseThread() } } -void NuKeeperStorageDispatcher::snapshotThread() +void KeeperStorageDispatcher::snapshotThread() { - setThreadName("NuKeeperSnpT"); + setThreadName("KeeperSnpT"); while (!shutdown_called) { CreateSnapshotTask task; @@ -91,7 +91,7 @@ void NuKeeperStorageDispatcher::snapshotThread() } } -void NuKeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response) +void KeeperStorageDispatcher::setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response) { std::lock_guard lock(session_to_response_callback_mutex); auto session_writer = session_to_response_callback.find(session_id); @@ -104,7 +104,7 @@ void NuKeeperStorageDispatcher::setResponse(int64_t session_id, const Coordinati session_to_response_callback.erase(session_writer); } -bool NuKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id) +bool KeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id) { { std::lock_guard lock(session_to_response_callback_mutex); @@ -112,7 +112,7 @@ bool NuKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestP return false; } - NuKeeperStorage::RequestForSession request_info; + KeeperStorage::RequestForSession request_info; request_info.request = request; request_info.session_id = session_id; @@ -125,18 +125,18 @@ bool NuKeeperStorageDispatcher::putRequest(const Coordination::ZooKeeperRequestP return true; } -void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfiguration & config) +void KeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfiguration & config) { LOG_DEBUG(log, "Initializing storage dispatcher"); - int myid = config.getInt("test_keeper_server.server_id"); + int myid = config.getInt("keeper_server.server_id"); - coordination_settings->loadFromConfig("test_keeper_server.coordination_settings", config); + coordination_settings->loadFromConfig("keeper_server.coordination_settings", config); request_thread = ThreadFromGlobalPool([this] { requestThread(); }); responses_thread = ThreadFromGlobalPool([this] { responseThread(); }); snapshot_thread = ThreadFromGlobalPool([this] { snapshotThread(); }); - server = std::make_unique(myid, coordination_settings, config, responses_queue, snapshots_queue); + server = std::make_unique(myid, coordination_settings, config, responses_queue, snapshots_queue); try { LOG_DEBUG(log, "Waiting server to initialize"); @@ -158,7 +158,7 @@ void NuKeeperStorageDispatcher::initialize(const Poco::Util::AbstractConfigurati LOG_DEBUG(log, "Dispatcher initialized"); } -void NuKeeperStorageDispatcher::shutdown() +void KeeperStorageDispatcher::shutdown() { try { @@ -191,7 +191,7 @@ void NuKeeperStorageDispatcher::shutdown() if (server) server->shutdown(); - NuKeeperStorage::RequestForSession request_for_session; + KeeperStorage::RequestForSession request_for_session; while (requests_queue.tryPop(request_for_session)) { if (request_for_session.request) @@ -215,19 +215,19 @@ void NuKeeperStorageDispatcher::shutdown() LOG_DEBUG(log, "Dispatcher shut down"); } -NuKeeperStorageDispatcher::~NuKeeperStorageDispatcher() +KeeperStorageDispatcher::~KeeperStorageDispatcher() { shutdown(); } -void NuKeeperStorageDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCallback callback) +void KeeperStorageDispatcher::registerSession(int64_t session_id, ZooKeeperResponseCallback callback) { std::lock_guard lock(session_to_response_callback_mutex); if (!session_to_response_callback.try_emplace(session_id, callback).second) throw Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session with id {} already registered in dispatcher", session_id); } -void NuKeeperStorageDispatcher::sessionCleanerTask() +void KeeperStorageDispatcher::sessionCleanerTask() { while (true) { @@ -244,7 +244,7 @@ void NuKeeperStorageDispatcher::sessionCleanerTask() LOG_INFO(log, "Found dead session {}, will try to close it", dead_session); Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close); request->xid = Coordination::CLOSE_XID; - NuKeeperStorage::RequestForSession request_info; + KeeperStorage::RequestForSession request_info; request_info.request = request; request_info.session_id = dead_session; { @@ -265,7 +265,7 @@ void NuKeeperStorageDispatcher::sessionCleanerTask() } } -void NuKeeperStorageDispatcher::finishSession(int64_t session_id) +void KeeperStorageDispatcher::finishSession(int64_t session_id) { std::lock_guard lock(session_to_response_callback_mutex); auto session_it = session_to_response_callback.find(session_id); diff --git a/src/Coordination/KeeperStorageDispatcher.h b/src/Coordination/KeeperStorageDispatcher.h index 1b15bd149e5..622b63be800 100644 --- a/src/Coordination/KeeperStorageDispatcher.h +++ b/src/Coordination/KeeperStorageDispatcher.h @@ -13,7 +13,7 @@ #include #include #include -#include +#include #include @@ -22,14 +22,14 @@ namespace DB using ZooKeeperResponseCallback = std::function; -class NuKeeperStorageDispatcher +class KeeperStorageDispatcher { private: std::mutex push_request_mutex; CoordinationSettingsPtr coordination_settings; - using RequestsQueue = ConcurrentBoundedQueue; + using RequestsQueue = ConcurrentBoundedQueue; using SessionToResponseCallback = std::unordered_map; RequestsQueue requests_queue{1}; @@ -46,7 +46,7 @@ private: ThreadFromGlobalPool session_cleaner_thread; ThreadFromGlobalPool snapshot_thread; - std::unique_ptr server; + std::unique_ptr server; Poco::Logger * log; @@ -58,13 +58,13 @@ private: void setResponse(int64_t session_id, const Coordination::ZooKeeperResponsePtr & response); public: - NuKeeperStorageDispatcher(); + KeeperStorageDispatcher(); void initialize(const Poco::Util::AbstractConfiguration & config); void shutdown(); - ~NuKeeperStorageDispatcher(); + ~KeeperStorageDispatcher(); bool putRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id); diff --git a/src/Coordination/tests/gtest_for_build.cpp b/src/Coordination/tests/gtest_for_build.cpp index cc3dcc04e53..06e60e5592f 100644 --- a/src/Coordination/tests/gtest_for_build.cpp +++ b/src/Coordination/tests/gtest_for_build.cpp @@ -9,10 +9,10 @@ #include #include #include -#include -#include +#include +#include #include -#include +#include #include #include #include @@ -24,7 +24,7 @@ #include #include // Y_IGNORE #include -#include +#include #include #include @@ -102,7 +102,7 @@ struct SimpliestRaftServer , port(port_) , endpoint(hostname + ":" + std::to_string(port)) , state_machine(nuraft::cs_new()) - , state_manager(nuraft::cs_new(server_id, hostname, port, logs_path)) + , state_manager(nuraft::cs_new(server_id, hostname, port, logs_path)) { state_manager->loadLogStore(1, 0); nuraft::raft_params params; @@ -153,7 +153,7 @@ struct SimpliestRaftServer nuraft::ptr state_machine; // State manager. - nuraft::ptr state_manager; + nuraft::ptr state_manager; // Raft launcher. nuraft::raft_launcher launcher; @@ -207,7 +207,7 @@ DB::LogEntryPtr getLogEntry(const std::string & s, size_t term) TEST(CoordinationTest, ChangelogTestSimple) { ChangelogDirTest test("./logs"); - DB::NuKeeperLogStore changelog("./logs", 5, true); + DB::KeeperLogStore changelog("./logs", 5, true); changelog.init(1, 0); auto entry = getLogEntry("hello world", 77); changelog.append(entry); @@ -221,7 +221,7 @@ TEST(CoordinationTest, ChangelogTestSimple) TEST(CoordinationTest, ChangelogTestFile) { ChangelogDirTest test("./logs"); - DB::NuKeeperLogStore changelog("./logs", 5, true); + DB::KeeperLogStore changelog("./logs", 5, true); changelog.init(1, 0); auto entry = getLogEntry("hello world", 77); changelog.append(entry); @@ -242,7 +242,7 @@ TEST(CoordinationTest, ChangelogTestFile) TEST(CoordinationTest, ChangelogReadWrite) { ChangelogDirTest test("./logs"); - DB::NuKeeperLogStore changelog("./logs", 1000, true); + DB::KeeperLogStore changelog("./logs", 1000, true); changelog.init(1, 0); for (size_t i = 0; i < 10; ++i) { @@ -250,7 +250,7 @@ TEST(CoordinationTest, ChangelogReadWrite) changelog.append(entry); } EXPECT_EQ(changelog.size(), 10); - DB::NuKeeperLogStore changelog_reader("./logs", 1000, true); + DB::KeeperLogStore changelog_reader("./logs", 1000, true); changelog_reader.init(1, 0); EXPECT_EQ(changelog_reader.size(), 10); EXPECT_EQ(changelog_reader.last_entry()->get_term(), changelog.last_entry()->get_term()); @@ -269,7 +269,7 @@ TEST(CoordinationTest, ChangelogReadWrite) TEST(CoordinationTest, ChangelogWriteAt) { ChangelogDirTest test("./logs"); - DB::NuKeeperLogStore changelog("./logs", 1000, true); + DB::KeeperLogStore changelog("./logs", 1000, true); changelog.init(1, 0); for (size_t i = 0; i < 10; ++i) { @@ -285,7 +285,7 @@ TEST(CoordinationTest, ChangelogWriteAt) EXPECT_EQ(changelog.entry_at(7)->get_term(), 77); EXPECT_EQ(changelog.next_slot(), 8); - DB::NuKeeperLogStore changelog_reader("./logs", 1000, true); + DB::KeeperLogStore changelog_reader("./logs", 1000, true); changelog_reader.init(1, 0); EXPECT_EQ(changelog_reader.size(), changelog.size()); @@ -298,7 +298,7 @@ TEST(CoordinationTest, ChangelogWriteAt) TEST(CoordinationTest, ChangelogTestAppendAfterRead) { ChangelogDirTest test("./logs"); - DB::NuKeeperLogStore changelog("./logs", 5, true); + DB::KeeperLogStore changelog("./logs", 5, true); changelog.init(1, 0); for (size_t i = 0; i < 7; ++i) { @@ -310,7 +310,7 @@ TEST(CoordinationTest, ChangelogTestAppendAfterRead) EXPECT_TRUE(fs::exists("./logs/changelog_1_5.bin")); EXPECT_TRUE(fs::exists("./logs/changelog_6_10.bin")); - DB::NuKeeperLogStore changelog_reader("./logs", 5, true); + DB::KeeperLogStore changelog_reader("./logs", 5, true); changelog_reader.init(1, 0); EXPECT_EQ(changelog_reader.size(), 7); @@ -346,7 +346,7 @@ TEST(CoordinationTest, ChangelogTestAppendAfterRead) TEST(CoordinationTest, ChangelogTestCompaction) { ChangelogDirTest test("./logs"); - DB::NuKeeperLogStore changelog("./logs", 5, true); + DB::KeeperLogStore changelog("./logs", 5, true); changelog.init(1, 0); for (size_t i = 0; i < 3; ++i) @@ -387,7 +387,7 @@ TEST(CoordinationTest, ChangelogTestCompaction) EXPECT_EQ(changelog.next_slot(), 8); EXPECT_EQ(changelog.last_entry()->get_term(), 60); /// And we able to read it - DB::NuKeeperLogStore changelog_reader("./logs", 5, true); + DB::KeeperLogStore changelog_reader("./logs", 5, true); changelog_reader.init(7, 0); EXPECT_EQ(changelog_reader.size(), 1); EXPECT_EQ(changelog_reader.start_index(), 7); @@ -398,7 +398,7 @@ TEST(CoordinationTest, ChangelogTestCompaction) TEST(CoordinationTest, ChangelogTestBatchOperations) { ChangelogDirTest test("./logs"); - DB::NuKeeperLogStore changelog("./logs", 100, true); + DB::KeeperLogStore changelog("./logs", 100, true); changelog.init(1, 0); for (size_t i = 0; i < 10; ++i) { @@ -410,7 +410,7 @@ TEST(CoordinationTest, ChangelogTestBatchOperations) auto entries = changelog.pack(1, 5); - DB::NuKeeperLogStore apply_changelog("./logs", 100, true); + DB::KeeperLogStore apply_changelog("./logs", 100, true); apply_changelog.init(1, 0); for (size_t i = 0; i < 10; ++i) @@ -440,7 +440,7 @@ TEST(CoordinationTest, ChangelogTestBatchOperations) TEST(CoordinationTest, ChangelogTestBatchOperationsEmpty) { ChangelogDirTest test("./logs"); - DB::NuKeeperLogStore changelog("./logs", 100, true); + DB::KeeperLogStore changelog("./logs", 100, true); changelog.init(1, 0); for (size_t i = 0; i < 10; ++i) { @@ -453,7 +453,7 @@ TEST(CoordinationTest, ChangelogTestBatchOperationsEmpty) auto entries = changelog.pack(5, 5); ChangelogDirTest test1("./logs1"); - DB::NuKeeperLogStore changelog_new("./logs1", 100, true); + DB::KeeperLogStore changelog_new("./logs1", 100, true); changelog_new.init(1, 0); EXPECT_EQ(changelog_new.size(), 0); @@ -472,7 +472,7 @@ TEST(CoordinationTest, ChangelogTestBatchOperationsEmpty) EXPECT_EQ(changelog_new.start_index(), 5); EXPECT_EQ(changelog_new.next_slot(), 11); - DB::NuKeeperLogStore changelog_reader("./logs1", 100, true); + DB::KeeperLogStore changelog_reader("./logs1", 100, true); changelog_reader.init(5, 0); } @@ -480,7 +480,7 @@ TEST(CoordinationTest, ChangelogTestBatchOperationsEmpty) TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile) { ChangelogDirTest test("./logs"); - DB::NuKeeperLogStore changelog("./logs", 5, true); + DB::KeeperLogStore changelog("./logs", 5, true); changelog.init(1, 0); for (size_t i = 0; i < 33; ++i) @@ -515,7 +515,7 @@ TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile) EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin")); EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin")); - DB::NuKeeperLogStore changelog_read("./logs", 5, true); + DB::KeeperLogStore changelog_read("./logs", 5, true); changelog_read.init(1, 0); EXPECT_EQ(changelog_read.size(), 7); EXPECT_EQ(changelog_read.start_index(), 1); @@ -526,7 +526,7 @@ TEST(CoordinationTest, ChangelogTestWriteAtPreviousFile) TEST(CoordinationTest, ChangelogTestWriteAtFileBorder) { ChangelogDirTest test("./logs"); - DB::NuKeeperLogStore changelog("./logs", 5, true); + DB::KeeperLogStore changelog("./logs", 5, true); changelog.init(1, 0); for (size_t i = 0; i < 33; ++i) @@ -561,7 +561,7 @@ TEST(CoordinationTest, ChangelogTestWriteAtFileBorder) EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin")); EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin")); - DB::NuKeeperLogStore changelog_read("./logs", 5, true); + DB::KeeperLogStore changelog_read("./logs", 5, true); changelog_read.init(1, 0); EXPECT_EQ(changelog_read.size(), 11); EXPECT_EQ(changelog_read.start_index(), 1); @@ -572,7 +572,7 @@ TEST(CoordinationTest, ChangelogTestWriteAtFileBorder) TEST(CoordinationTest, ChangelogTestWriteAtAllFiles) { ChangelogDirTest test("./logs"); - DB::NuKeeperLogStore changelog("./logs", 5, true); + DB::KeeperLogStore changelog("./logs", 5, true); changelog.init(1, 0); for (size_t i = 0; i < 33; ++i) @@ -611,7 +611,7 @@ TEST(CoordinationTest, ChangelogTestWriteAtAllFiles) TEST(CoordinationTest, ChangelogTestStartNewLogAfterRead) { ChangelogDirTest test("./logs"); - DB::NuKeeperLogStore changelog("./logs", 5, true); + DB::KeeperLogStore changelog("./logs", 5, true); changelog.init(1, 0); for (size_t i = 0; i < 35; ++i) @@ -630,7 +630,7 @@ TEST(CoordinationTest, ChangelogTestStartNewLogAfterRead) EXPECT_FALSE(fs::exists("./logs/changelog_36_40.bin")); - DB::NuKeeperLogStore changelog_reader("./logs", 5, true); + DB::KeeperLogStore changelog_reader("./logs", 5, true); changelog_reader.init(1, 0); auto entry = getLogEntry("36_hello_world", 360); @@ -652,7 +652,7 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate) { ChangelogDirTest test("./logs"); - DB::NuKeeperLogStore changelog("./logs", 5, true); + DB::KeeperLogStore changelog("./logs", 5, true); changelog.init(1, 0); for (size_t i = 0; i < 35; ++i) @@ -672,7 +672,7 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate) DB::WriteBufferFromFile plain_buf("./logs/changelog_11_15.bin", DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY); plain_buf.truncate(0); - DB::NuKeeperLogStore changelog_reader("./logs", 5, true); + DB::KeeperLogStore changelog_reader("./logs", 5, true); changelog_reader.init(1, 0); EXPECT_EQ(changelog_reader.size(), 10); @@ -701,7 +701,7 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate) EXPECT_FALSE(fs::exists("./logs/changelog_26_30.bin")); EXPECT_FALSE(fs::exists("./logs/changelog_31_35.bin")); - DB::NuKeeperLogStore changelog_reader2("./logs", 5, true); + DB::KeeperLogStore changelog_reader2("./logs", 5, true); changelog_reader2.init(1, 0); EXPECT_EQ(changelog_reader2.size(), 11); EXPECT_EQ(changelog_reader2.last_entry()->get_term(), 7777); @@ -711,7 +711,7 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2) { ChangelogDirTest test("./logs"); - DB::NuKeeperLogStore changelog("./logs", 20, true); + DB::KeeperLogStore changelog("./logs", 20, true); changelog.init(1, 0); for (size_t i = 0; i < 35; ++i) @@ -726,7 +726,7 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2) DB::WriteBufferFromFile plain_buf("./logs/changelog_1_20.bin", DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY); plain_buf.truncate(140); - DB::NuKeeperLogStore changelog_reader("./logs", 20, true); + DB::KeeperLogStore changelog_reader("./logs", 20, true); changelog_reader.init(1, 0); EXPECT_EQ(changelog_reader.size(), 2); @@ -739,7 +739,7 @@ TEST(CoordinationTest, ChangelogTestReadAfterBrokenTruncate2) EXPECT_EQ(changelog_reader.last_entry()->get_term(), 7777); - DB::NuKeeperLogStore changelog_reader2("./logs", 20, true); + DB::KeeperLogStore changelog_reader2("./logs", 20, true); changelog_reader2.init(1, 0); EXPECT_EQ(changelog_reader2.size(), 3); EXPECT_EQ(changelog_reader2.last_entry()->get_term(), 7777); @@ -749,7 +749,7 @@ TEST(CoordinationTest, ChangelogTestLostFiles) { ChangelogDirTest test("./logs"); - DB::NuKeeperLogStore changelog("./logs", 20, true); + DB::KeeperLogStore changelog("./logs", 20, true); changelog.init(1, 0); for (size_t i = 0; i < 35; ++i) @@ -763,7 +763,7 @@ TEST(CoordinationTest, ChangelogTestLostFiles) fs::remove("./logs/changelog_1_20.bin"); - DB::NuKeeperLogStore changelog_reader("./logs", 20, true); + DB::KeeperLogStore changelog_reader("./logs", 20, true); /// It should print error message, but still able to start changelog_reader.init(5, 0); EXPECT_FALSE(fs::exists("./logs/changelog_1_20.bin")); @@ -862,9 +862,9 @@ TEST(CoordinationTest, SnapshotableHashMapTrySnapshot) map_snp.disableSnapshotMode(); } -void addNode(DB::NuKeeperStorage & storage, const std::string & path, const std::string & data, int64_t ephemeral_owner=0) +void addNode(DB::KeeperStorage & storage, const std::string & path, const std::string & data, int64_t ephemeral_owner=0) { - using Node = DB::NuKeeperStorage::Node; + using Node = DB::KeeperStorage::Node; Node node{}; node.data = data; node.stat.ephemeralOwner = ephemeral_owner; @@ -874,9 +874,9 @@ void addNode(DB::NuKeeperStorage & storage, const std::string & path, const std: TEST(CoordinationTest, TestStorageSnapshotSimple) { ChangelogDirTest test("./snapshots"); - DB::NuKeeperSnapshotManager manager("./snapshots", 3); + DB::KeeperSnapshotManager manager("./snapshots", 3); - DB::NuKeeperStorage storage(500); + DB::KeeperStorage storage(500); addNode(storage, "/hello", "world", 1); addNode(storage, "/hello/somepath", "somedata", 3); storage.session_id_counter = 5; @@ -886,7 +886,7 @@ TEST(CoordinationTest, TestStorageSnapshotSimple) storage.getSessionID(130); storage.getSessionID(130); - DB::NuKeeperStorageSnapshot snapshot(&storage, 2); + DB::KeeperStorageSnapshot snapshot(&storage, 2); EXPECT_EQ(snapshot.snapshot_meta->get_last_log_idx(), 2); EXPECT_EQ(snapshot.session_id, 7); @@ -921,9 +921,9 @@ TEST(CoordinationTest, TestStorageSnapshotSimple) TEST(CoordinationTest, TestStorageSnapshotMoreWrites) { ChangelogDirTest test("./snapshots"); - DB::NuKeeperSnapshotManager manager("./snapshots", 3); + DB::KeeperSnapshotManager manager("./snapshots", 3); - DB::NuKeeperStorage storage(500); + DB::KeeperStorage storage(500); storage.getSessionID(130); for (size_t i = 0; i < 50; ++i) @@ -931,7 +931,7 @@ TEST(CoordinationTest, TestStorageSnapshotMoreWrites) addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i)); } - DB::NuKeeperStorageSnapshot snapshot(&storage, 50); + DB::KeeperStorageSnapshot snapshot(&storage, 50); EXPECT_EQ(snapshot.snapshot_meta->get_last_log_idx(), 50); EXPECT_EQ(snapshot.snapshot_container_size, 51); @@ -961,9 +961,9 @@ TEST(CoordinationTest, TestStorageSnapshotMoreWrites) TEST(CoordinationTest, TestStorageSnapshotManySnapshots) { ChangelogDirTest test("./snapshots"); - DB::NuKeeperSnapshotManager manager("./snapshots", 3); + DB::KeeperSnapshotManager manager("./snapshots", 3); - DB::NuKeeperStorage storage(500); + DB::KeeperStorage storage(500); storage.getSessionID(130); for (size_t j = 1; j <= 5; ++j) @@ -973,7 +973,7 @@ TEST(CoordinationTest, TestStorageSnapshotManySnapshots) addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i)); } - DB::NuKeeperStorageSnapshot snapshot(&storage, j * 50); + DB::KeeperStorageSnapshot snapshot(&storage, j * 50); auto buf = manager.serializeSnapshotToBuffer(snapshot); manager.serializeSnapshotBufferToDisk(*buf, j * 50); EXPECT_TRUE(fs::exists(std::string{"./snapshots/snapshot_"} + std::to_string(j * 50) + ".bin")); @@ -999,15 +999,15 @@ TEST(CoordinationTest, TestStorageSnapshotManySnapshots) TEST(CoordinationTest, TestStorageSnapshotMode) { ChangelogDirTest test("./snapshots"); - DB::NuKeeperSnapshotManager manager("./snapshots", 3); - DB::NuKeeperStorage storage(500); + DB::KeeperSnapshotManager manager("./snapshots", 3); + DB::KeeperStorage storage(500); for (size_t i = 0; i < 50; ++i) { addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i)); } { - DB::NuKeeperStorageSnapshot snapshot(&storage, 50); + DB::KeeperStorageSnapshot snapshot(&storage, 50); for (size_t i = 0; i < 50; ++i) { addNode(storage, "/hello_" + std::to_string(i), "wlrd_" + std::to_string(i)); @@ -1050,14 +1050,14 @@ TEST(CoordinationTest, TestStorageSnapshotMode) TEST(CoordinationTest, TestStorageSnapshotBroken) { ChangelogDirTest test("./snapshots"); - DB::NuKeeperSnapshotManager manager("./snapshots", 3); - DB::NuKeeperStorage storage(500); + DB::KeeperSnapshotManager manager("./snapshots", 3); + DB::KeeperStorage storage(500); for (size_t i = 0; i < 50; ++i) { addNode(storage, "/hello_" + std::to_string(i), "world_" + std::to_string(i)); } { - DB::NuKeeperStorageSnapshot snapshot(&storage, 50); + DB::KeeperStorageSnapshot snapshot(&storage, 50); auto buf = manager.serializeSnapshotToBuffer(snapshot); manager.serializeSnapshotBufferToDisk(*buf, 50); } @@ -1095,9 +1095,9 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, size ResponsesQueue queue; SnapshotsQueue snapshots_queue{1}; - auto state_machine = std::make_shared(queue, snapshots_queue, "./snapshots", settings); + auto state_machine = std::make_shared(queue, snapshots_queue, "./snapshots", settings); state_machine->init(); - DB::NuKeeperLogStore changelog("./logs", settings->rotate_log_storage_interval, true); + DB::KeeperLogStore changelog("./logs", settings->rotate_log_storage_interval, true); changelog.init(state_machine->last_commit_index() + 1, settings->reserved_log_items); for (size_t i = 1; i < total_logs + 1; ++i) { @@ -1132,11 +1132,11 @@ void testLogAndStateMachine(Coordination::CoordinationSettingsPtr settings, size } SnapshotsQueue snapshots_queue1{1}; - auto restore_machine = std::make_shared(queue, snapshots_queue1, "./snapshots", settings); + auto restore_machine = std::make_shared(queue, snapshots_queue1, "./snapshots", settings); restore_machine->init(); EXPECT_EQ(restore_machine->last_commit_index(), total_logs - total_logs % settings->snapshot_distance); - DB::NuKeeperLogStore restore_changelog("./logs", settings->rotate_log_storage_interval, true); + DB::KeeperLogStore restore_changelog("./logs", settings->rotate_log_storage_interval, true); restore_changelog.init(restore_machine->last_commit_index() + 1, settings->reserved_log_items); EXPECT_EQ(restore_changelog.size(), std::min(settings->reserved_log_items + total_logs % settings->snapshot_distance, total_logs)); @@ -1242,7 +1242,7 @@ TEST(CoordinationTest, TestEphemeralNodeRemove) ResponsesQueue queue; SnapshotsQueue snapshots_queue{1}; - auto state_machine = std::make_shared(queue, snapshots_queue, "./snapshots", settings); + auto state_machine = std::make_shared(queue, snapshots_queue, "./snapshots", settings); state_machine->init(); std::shared_ptr request_c = std::make_shared(); diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 7989ababac9..d758173998d 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -12,7 +12,7 @@ #include #include #include -#include +#include #include #include #include @@ -313,7 +313,7 @@ struct ContextShared #if USE_NURAFT mutable std::mutex nu_keeper_storage_dispatcher_mutex; - mutable std::shared_ptr nu_keeper_storage_dispatcher; + mutable std::shared_ptr nu_keeper_storage_dispatcher; #endif mutable std::mutex auxiliary_zookeepers_mutex; mutable std::map auxiliary_zookeepers; /// Map for auxiliary ZooKeeper clients. @@ -1615,35 +1615,35 @@ zkutil::ZooKeeperPtr Context::getZooKeeper() const } -void Context::initializeNuKeeperStorageDispatcher() const +void Context::initializeKeeperStorageDispatcher() const { #if USE_NURAFT std::lock_guard lock(shared->nu_keeper_storage_dispatcher_mutex); if (shared->nu_keeper_storage_dispatcher) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to initialize NuKeeper multiple times"); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to initialize Keeper multiple times"); const auto & config = getConfigRef(); - if (config.has("test_keeper_server")) + if (config.has("keeper_server")) { - shared->nu_keeper_storage_dispatcher = std::make_shared(); + shared->nu_keeper_storage_dispatcher = std::make_shared(); shared->nu_keeper_storage_dispatcher->initialize(config); } #endif } #if USE_NURAFT -std::shared_ptr & Context::getNuKeeperStorageDispatcher() const +std::shared_ptr & Context::getKeeperStorageDispatcher() const { std::lock_guard lock(shared->nu_keeper_storage_dispatcher_mutex); if (!shared->nu_keeper_storage_dispatcher) - throw Exception(ErrorCodes::LOGICAL_ERROR, "NuKeeper must be initialized before requests"); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Keeper must be initialized before requests"); return shared->nu_keeper_storage_dispatcher; } #endif -void Context::shutdownNuKeeperStorageDispatcher() const +void Context::shutdownKeeperStorageDispatcher() const { #if USE_NURAFT std::lock_guard lock(shared->nu_keeper_storage_dispatcher_mutex); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 3e24c8520a4..5c98a2ba64a 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -111,7 +111,7 @@ class StoragePolicySelector; using StoragePolicySelectorPtr = std::shared_ptr; struct PartUUIDs; using PartUUIDsPtr = std::shared_ptr; -class NuKeeperStorageDispatcher; +class KeeperStorageDispatcher; class IOutputFormat; using OutputFormatPtr = std::shared_ptr; @@ -598,10 +598,10 @@ public: std::shared_ptr getAuxiliaryZooKeeper(const String & name) const; #if USE_NURAFT - std::shared_ptr & getNuKeeperStorageDispatcher() const; + std::shared_ptr & getKeeperStorageDispatcher() const; #endif - void initializeNuKeeperStorageDispatcher() const; - void shutdownNuKeeperStorageDispatcher() const; + void initializeKeeperStorageDispatcher() const; + void shutdownKeeperStorageDispatcher() const; /// Set auxiliary zookeepers configuration at server starting or configuration reloading. void reloadAuxiliaryZooKeepersConfigIfChanged(const ConfigurationPtr & config); diff --git a/src/Server/NuKeeperTCPHandler.cpp b/src/Server/KeeperTCPHandler.cpp similarity index 94% rename from src/Server/NuKeeperTCPHandler.cpp rename to src/Server/KeeperTCPHandler.cpp index b676331f6c0..bf725581a29 100644 --- a/src/Server/NuKeeperTCPHandler.cpp +++ b/src/Server/KeeperTCPHandler.cpp @@ -1,4 +1,4 @@ -#include +#include #if USE_NURAFT @@ -189,20 +189,20 @@ struct SocketInterruptablePollWrapper #endif }; -NuKeeperTCPHandler::NuKeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_) +KeeperTCPHandler::KeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_) : Poco::Net::TCPServerConnection(socket_) , server(server_) - , log(&Poco::Logger::get("NuKeeperTCPHandler")) + , log(&Poco::Logger::get("KeeperTCPHandler")) , global_context(server.context()) - , nu_keeper_storage_dispatcher(global_context.getNuKeeperStorageDispatcher()) - , operation_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000) - , session_timeout(0, global_context.getConfigRef().getUInt("test_keeper_server.session_timeout_ms", Coordination::DEFAULT_SESSION_TIMEOUT_MS) * 1000) + , nu_keeper_storage_dispatcher(global_context.getKeeperStorageDispatcher()) + , operation_timeout(0, global_context.getConfigRef().getUInt("keeper_server.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS) * 1000) + , session_timeout(0, global_context.getConfigRef().getUInt("keeper_server.session_timeout_ms", Coordination::DEFAULT_SESSION_TIMEOUT_MS) * 1000) , poll_wrapper(std::make_unique(socket_)) , responses(std::make_unique()) { } -void NuKeeperTCPHandler::sendHandshake(bool has_leader) +void KeeperTCPHandler::sendHandshake(bool has_leader) { Coordination::write(Coordination::SERVER_HANDSHAKE_LENGTH, *out); if (has_leader) @@ -217,12 +217,12 @@ void NuKeeperTCPHandler::sendHandshake(bool has_leader) out->next(); } -void NuKeeperTCPHandler::run() +void KeeperTCPHandler::run() { runImpl(); } -Poco::Timespan NuKeeperTCPHandler::receiveHandshake() +Poco::Timespan KeeperTCPHandler::receiveHandshake() { int32_t handshake_length; int32_t protocol_version; @@ -254,7 +254,7 @@ Poco::Timespan NuKeeperTCPHandler::receiveHandshake() } -void NuKeeperTCPHandler::runImpl() +void KeeperTCPHandler::runImpl() { setThreadName("TstKprHandler"); ThreadStatus thread_status; @@ -393,7 +393,7 @@ void NuKeeperTCPHandler::runImpl() } } -std::pair NuKeeperTCPHandler::receiveRequest() +std::pair KeeperTCPHandler::receiveRequest() { int32_t length; Coordination::read(length, *in); diff --git a/src/Server/NuKeeperTCPHandler.h b/src/Server/KeeperTCPHandler.h similarity index 85% rename from src/Server/NuKeeperTCPHandler.h rename to src/Server/KeeperTCPHandler.h index 03a857ad1d7..fecaf1cc38f 100644 --- a/src/Server/NuKeeperTCPHandler.h +++ b/src/Server/KeeperTCPHandler.h @@ -13,7 +13,7 @@ #include #include #include -#include +#include #include #include #include @@ -29,16 +29,16 @@ using ThreadSafeResponseQueue = ThreadSafeQueue; -class NuKeeperTCPHandler : public Poco::Net::TCPServerConnection +class KeeperTCPHandler : public Poco::Net::TCPServerConnection { public: - NuKeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_); + KeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_); void run() override; private: IServer & server; Poco::Logger * log; Context global_context; - std::shared_ptr nu_keeper_storage_dispatcher; + std::shared_ptr nu_keeper_storage_dispatcher; Poco::Timespan operation_timeout; Poco::Timespan session_timeout; int64_t session_id{-1}; diff --git a/src/Server/NuKeeperTCPHandlerFactory.h b/src/Server/KeeperTCPHandlerFactory.h similarity index 68% rename from src/Server/NuKeeperTCPHandlerFactory.h rename to src/Server/KeeperTCPHandlerFactory.h index 0fd86ebc21f..adeb829b4c3 100644 --- a/src/Server/NuKeeperTCPHandlerFactory.h +++ b/src/Server/KeeperTCPHandlerFactory.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include #include #include @@ -9,7 +9,7 @@ namespace DB { -class NuKeeperTCPHandlerFactory : public Poco::Net::TCPServerConnectionFactory +class KeeperTCPHandlerFactory : public Poco::Net::TCPServerConnectionFactory { private: IServer & server; @@ -21,9 +21,9 @@ private: void run() override {} }; public: - NuKeeperTCPHandlerFactory(IServer & server_) + KeeperTCPHandlerFactory(IServer & server_) : server(server_) - , log(&Poco::Logger::get("NuKeeperTCPHandlerFactory")) + , log(&Poco::Logger::get("KeeperTCPHandlerFactory")) { } @@ -31,8 +31,8 @@ public: { try { - LOG_TRACE(log, "NuKeeper request. Address: {}", socket.peerAddress().toString()); - return new NuKeeperTCPHandler(server, socket); + LOG_TRACE(log, "Keeper request. Address: {}", socket.peerAddress().toString()); + return new KeeperTCPHandler(server, socket); } catch (const Poco::Net::NetException &) { diff --git a/src/Server/ya.make b/src/Server/ya.make index ef5ef6d5f57..91a6414fe24 100644 --- a/src/Server/ya.make +++ b/src/Server/ya.make @@ -25,7 +25,7 @@ SRCS( MySQLHandler.cpp MySQLHandlerFactory.cpp NotFoundHandler.cpp - NuKeeperTCPHandler.cpp + KeeperTCPHandler.cpp PostgreSQLHandler.cpp PostgreSQLHandlerFactory.cpp PrometheusMetricsWriter.cpp diff --git a/tests/config/config.d/test_keeper_port.xml b/tests/config/config.d/keeper_port.xml similarity index 91% rename from tests/config/config.d/test_keeper_port.xml rename to tests/config/config.d/keeper_port.xml index 438ecef60ea..c41040f1613 100644 --- a/tests/config/config.d/test_keeper_port.xml +++ b/tests/config/config.d/keeper_port.xml @@ -1,5 +1,5 @@ - + 9181 1 @@ -17,5 +17,5 @@ 44444 - + diff --git a/tests/config/install.sh b/tests/config/install.sh index 1fca2b11e04..75f6f2b8c46 100755 --- a/tests/config/install.sh +++ b/tests/config/install.sh @@ -29,7 +29,7 @@ ln -sf $SRC_PATH/config.d/graphite.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/database_atomic.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/max_concurrent_queries.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/test_cluster_with_incorrect_pw.xml $DEST_SERVER_PATH/config.d/ -ln -sf $SRC_PATH/config.d/test_keeper_port.xml $DEST_SERVER_PATH/config.d/ +ln -sf $SRC_PATH/config.d/keeper_port.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/logging_no_rotate.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/tcp_with_proxy.xml $DEST_SERVER_PATH/config.d/ ln -sf $SRC_PATH/config.d/top_level_domains_lists.xml $DEST_SERVER_PATH/config.d/ diff --git a/tests/jepsen.nukeeper/.gitignore b/tests/jepsen.clickhouse-keeper/.gitignore similarity index 100% rename from tests/jepsen.nukeeper/.gitignore rename to tests/jepsen.clickhouse-keeper/.gitignore diff --git a/tests/jepsen.nukeeper/LICENSE b/tests/jepsen.clickhouse-keeper/LICENSE similarity index 100% rename from tests/jepsen.nukeeper/LICENSE rename to tests/jepsen.clickhouse-keeper/LICENSE diff --git a/tests/jepsen.nukeeper/README.md b/tests/jepsen.clickhouse-keeper/README.md similarity index 100% rename from tests/jepsen.nukeeper/README.md rename to tests/jepsen.clickhouse-keeper/README.md diff --git a/tests/jepsen.nukeeper/doc/intro.md b/tests/jepsen.clickhouse-keeper/doc/intro.md similarity index 100% rename from tests/jepsen.nukeeper/doc/intro.md rename to tests/jepsen.clickhouse-keeper/doc/intro.md diff --git a/tests/jepsen.nukeeper/project.clj b/tests/jepsen.clickhouse-keeper/project.clj similarity index 76% rename from tests/jepsen.nukeeper/project.clj rename to tests/jepsen.clickhouse-keeper/project.clj index e7150c9e5d4..c9b24e0ce2c 100644 --- a/tests/jepsen.nukeeper/project.clj +++ b/tests/jepsen.clickhouse-keeper/project.clj @@ -1,13 +1,13 @@ (defproject jepsen.nukeeper "0.1.0-SNAPSHOT" :injections [(.. System (setProperty "zookeeper.request.timeout" "10000"))] - :description "A jepsen tests for ClickHouse NuKeeper" + :description "A jepsen tests for ClickHouse Keeper" :url "https://clickhouse.tech/" :license {:name "EPL-2.0" :url "https://www.eclipse.org/legal/epl-2.0/"} - :main jepsen.nukeeper.main + :main jepsen.clickhouse-keeper.main :plugins [[lein-cljfmt "0.7.0"]] :dependencies [[org.clojure/clojure "1.10.1"] [jepsen "0.2.3"] [zookeeper-clj "0.9.4"] [org.apache.zookeeper/zookeeper "3.6.1" :exclusions [org.slf4j/slf4j-log4j12]]] - :repl-options {:init-ns jepsen.nukeeper.main}) + :repl-options {:init-ns jepsen.clickhouse-keeper.main}) diff --git a/tests/jepsen.nukeeper/resources/config.xml b/tests/jepsen.clickhouse-keeper/resources/config.xml similarity index 100% rename from tests/jepsen.nukeeper/resources/config.xml rename to tests/jepsen.clickhouse-keeper/resources/config.xml diff --git a/tests/jepsen.nukeeper/resources/listen.xml b/tests/jepsen.clickhouse-keeper/resources/listen.xml similarity index 100% rename from tests/jepsen.nukeeper/resources/listen.xml rename to tests/jepsen.clickhouse-keeper/resources/listen.xml diff --git a/tests/jepsen.nukeeper/resources/test_keeper_config.xml b/tests/jepsen.clickhouse-keeper/resources/test_keeper_config.xml similarity index 95% rename from tests/jepsen.nukeeper/resources/test_keeper_config.xml rename to tests/jepsen.clickhouse-keeper/resources/test_keeper_config.xml index c69fb0f228c..223481bdaea 100644 --- a/tests/jepsen.nukeeper/resources/test_keeper_config.xml +++ b/tests/jepsen.clickhouse-keeper/resources/test_keeper_config.xml @@ -1,5 +1,5 @@ - + 9181 {id} @@ -32,5 +32,5 @@ 9444 - + diff --git a/tests/jepsen.nukeeper/resources/users.xml b/tests/jepsen.clickhouse-keeper/resources/users.xml similarity index 100% rename from tests/jepsen.nukeeper/resources/users.xml rename to tests/jepsen.clickhouse-keeper/resources/users.xml diff --git a/tests/jepsen.nukeeper/src/jepsen/nukeeper/constants.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/constants.clj similarity index 93% rename from tests/jepsen.nukeeper/src/jepsen/nukeeper/constants.clj rename to tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/constants.clj index d6245d450f5..15dafa1a514 100644 --- a/tests/jepsen.nukeeper/src/jepsen/nukeeper/constants.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/constants.clj @@ -1,4 +1,4 @@ -(ns jepsen.nukeeper.constants) +(ns jepsen.clickhouse-keeper.constants) (def common-prefix "/home/robot-clickhouse") diff --git a/tests/jepsen.nukeeper/src/jepsen/nukeeper/counter.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/counter.clj similarity index 94% rename from tests/jepsen.nukeeper/src/jepsen/nukeeper/counter.clj rename to tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/counter.clj index b426a8ea90d..dfccf7dd635 100644 --- a/tests/jepsen.nukeeper/src/jepsen/nukeeper/counter.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/counter.clj @@ -1,11 +1,11 @@ -(ns jepsen.nukeeper.counter +(ns jepsen.clickhouse-keeper.counter (:require [clojure.tools.logging :refer :all] [jepsen [checker :as checker] [client :as client] [generator :as gen]] - [jepsen.nukeeper.utils :refer :all] + [jepsen.clickhouse-keeper.utils :refer :all] [zookeeper :as zk]) (:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException))) diff --git a/tests/jepsen.nukeeper/src/jepsen/nukeeper/db.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/db.clj similarity index 94% rename from tests/jepsen.nukeeper/src/jepsen/nukeeper/db.clj rename to tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/db.clj index d82d628cc95..7e93a6e8a81 100644 --- a/tests/jepsen.nukeeper/src/jepsen/nukeeper/db.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/db.clj @@ -1,11 +1,11 @@ -(ns jepsen.nukeeper.db +(ns jepsen.clickhouse-keeper.db (:require [clojure.tools.logging :refer :all] [jepsen [control :as c] [db :as db] [util :as util :refer [meh]]] - [jepsen.nukeeper.constants :refer :all] - [jepsen.nukeeper.utils :refer :all] + [jepsen.clickhouse-keeper.constants :refer :all] + [jepsen.clickhouse-keeper.utils :refer :all] [clojure.java.io :as io] [jepsen.control.util :as cu] [jepsen.os.ubuntu :as ubuntu])) @@ -88,7 +88,7 @@ (c/exec :echo (slurp (io/resource "config.xml")) :> (str configs-dir "/config.xml")) (c/exec :echo (slurp (io/resource "users.xml")) :> (str configs-dir "/users.xml")) (c/exec :echo (slurp (io/resource "listen.xml")) :> (str sub-configs-dir "/listen.xml")) - (c/exec :echo (cluster-config test node (slurp (io/resource "test_keeper_config.xml"))) :> (str sub-configs-dir "/test_keeper_config.xml"))) + (c/exec :echo (cluster-config test node (slurp (io/resource "keeper_config.xml"))) :> (str sub-configs-dir "/keeper_config.xml"))) (defn db [version reuse-binary] diff --git a/tests/jepsen.nukeeper/src/jepsen/nukeeper/main.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/main.clj similarity index 90% rename from tests/jepsen.nukeeper/src/jepsen/nukeeper/main.clj rename to tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/main.clj index b9439097e85..f88026500e6 100644 --- a/tests/jepsen.nukeeper/src/jepsen/nukeeper/main.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/main.clj @@ -1,15 +1,15 @@ -(ns jepsen.nukeeper.main +(ns jepsen.clickhouse-keeper.main (:require [clojure.tools.logging :refer :all] - [jepsen.nukeeper.utils :refer :all] + [jepsen.clickhouse-keeper.utils :refer :all] [clojure.pprint :refer [pprint]] - [jepsen.nukeeper.set :as set] - [jepsen.nukeeper.db :refer :all] - [jepsen.nukeeper.nemesis :as custom-nemesis] - [jepsen.nukeeper.register :as register] - [jepsen.nukeeper.unique :as unique] - [jepsen.nukeeper.queue :as queue] - [jepsen.nukeeper.counter :as counter] - [jepsen.nukeeper.constants :refer :all] + [jepsen.clickhouse-keeper.set :as set] + [jepsen.clickhouse-keeper.db :refer :all] + [jepsen.clickhouse-keeper.nemesis :as custom-nemesis] + [jepsen.clickhouse-keeper.register :as register] + [jepsen.clickhouse-keeper.unique :as unique] + [jepsen.clickhouse-keeper.queue :as queue] + [jepsen.clickhouse-keeper.counter :as counter] + [jepsen.clickhouse-keeper.constants :refer :all] [clojure.string :as str] [jepsen [checker :as checker] @@ -75,7 +75,7 @@ ["-c" "--clickhouse-source URL" "URL for clickhouse deb or tgz package" :default "https://clickhouse-builds.s3.yandex.net/21677/ef82333089156907a0979669d9374c2e18daabe5/clickhouse_build_check/clang-11_relwithdebuginfo_none_bundled_unsplitted_disable_False_deb/clickhouse-common-static_21.4.1.6313_amd64.deb"]]) -(defn nukeeper-test +(defn clickhouse-keeper-test "Given an options map from the command line runner (e.g. :nodes, :ssh, :concurrency, ...), constructs a test map." [opts] @@ -151,9 +151,9 @@ [& args] (.setLevel (LoggerFactory/getLogger "org.apache.zookeeper") Level/OFF) - (cli/run! (merge (cli/single-test-cmd {:test-fn nukeeper-test + (cli/run! (merge (cli/single-test-cmd {:test-fn clickhouse-keeper-test :opt-spec cli-opts}) - (cli/test-all-cmd {:tests-fn (partial all-tests nukeeper-test) + (cli/test-all-cmd {:tests-fn (partial all-tests clickhouse-keeper-test) :opt-spec cli-opts}) (cli/serve-cmd)) args)) diff --git a/tests/jepsen.nukeeper/src/jepsen/nukeeper/nemesis.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/nemesis.clj similarity index 97% rename from tests/jepsen.nukeeper/src/jepsen/nukeeper/nemesis.clj rename to tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/nemesis.clj index 7d4941cdc8e..caf59d3a25f 100644 --- a/tests/jepsen.nukeeper/src/jepsen/nukeeper/nemesis.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/nemesis.clj @@ -1,12 +1,12 @@ -(ns jepsen.nukeeper.nemesis +(ns jepsen.clickhouse-keeper.nemesis (:require [clojure.tools.logging :refer :all] [jepsen [nemesis :as nemesis] [control :as c] [generator :as gen]] - [jepsen.nukeeper.constants :refer :all] - [jepsen.nukeeper.utils :refer :all])) + [jepsen.clickhouse-keeper.constants :refer :all] + [jepsen.clickhouse-keeper.utils :refer :all])) (defn random-node-killer-nemesis [] diff --git a/tests/jepsen.nukeeper/src/jepsen/nukeeper/queue.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/queue.clj similarity index 97% rename from tests/jepsen.nukeeper/src/jepsen/nukeeper/queue.clj rename to tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/queue.clj index 308778983aa..30ff7c01ec4 100644 --- a/tests/jepsen.nukeeper/src/jepsen/nukeeper/queue.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/queue.clj @@ -1,4 +1,4 @@ -(ns jepsen.nukeeper.queue +(ns jepsen.clickhouse-keeper.queue (:require [clojure.tools.logging :refer :all] [jepsen @@ -7,7 +7,7 @@ [generator :as gen]] [knossos.model :as model] [jepsen.checker.timeline :as timeline] - [jepsen.nukeeper.utils :refer :all] + [jepsen.clickhouse-keeper.utils :refer :all] [zookeeper :as zk]) (:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException))) diff --git a/tests/jepsen.nukeeper/src/jepsen/nukeeper/register.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/register.clj similarity index 96% rename from tests/jepsen.nukeeper/src/jepsen/nukeeper/register.clj rename to tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/register.clj index 98322845346..b2f381168bd 100644 --- a/tests/jepsen.nukeeper/src/jepsen/nukeeper/register.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/register.clj @@ -1,4 +1,4 @@ -(ns jepsen.nukeeper.register +(ns jepsen.clickhouse-keeper.register (:require [jepsen [checker :as checker] [client :as client] @@ -6,7 +6,7 @@ [generator :as gen]] [jepsen.checker.timeline :as timeline] [knossos.model :as model] - [jepsen.nukeeper.utils :refer :all] + [jepsen.clickhouse-keeper.utils :refer :all] [zookeeper :as zk]) (:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException))) diff --git a/tests/jepsen.nukeeper/src/jepsen/nukeeper/set.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/set.clj similarity index 94% rename from tests/jepsen.nukeeper/src/jepsen/nukeeper/set.clj rename to tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/set.clj index f9d21a8dc62..a05338a7bc4 100644 --- a/tests/jepsen.nukeeper/src/jepsen/nukeeper/set.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/set.clj @@ -1,11 +1,11 @@ -(ns jepsen.nukeeper.set +(ns jepsen.clickhouse-keeper.set (:require [clojure.tools.logging :refer :all] [jepsen [checker :as checker] [client :as client] [generator :as gen]] - [jepsen.nukeeper.utils :refer :all] + [jepsen.clickhouse-keeper.utils :refer :all] [zookeeper :as zk]) (:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException))) diff --git a/tests/jepsen.nukeeper/src/jepsen/nukeeper/unique.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/unique.clj similarity index 92% rename from tests/jepsen.nukeeper/src/jepsen/nukeeper/unique.clj rename to tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/unique.clj index 9dfb906bc17..c50f33924e0 100644 --- a/tests/jepsen.nukeeper/src/jepsen/nukeeper/unique.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/unique.clj @@ -1,11 +1,11 @@ -(ns jepsen.nukeeper.unique +(ns jepsen.clickhouse-keeper.unique (:require [clojure.tools.logging :refer :all] [jepsen [checker :as checker] [client :as client] [generator :as gen]] - [jepsen.nukeeper.utils :refer :all] + [jepsen.clickhouse-keeper.utils :refer :all] [zookeeper :as zk]) (:import (org.apache.zookeeper ZooKeeper KeeperException KeeperException$BadVersionException))) diff --git a/tests/jepsen.nukeeper/src/jepsen/nukeeper/utils.clj b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/utils.clj similarity index 96% rename from tests/jepsen.nukeeper/src/jepsen/nukeeper/utils.clj rename to tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/utils.clj index cfe9add238b..ffb948041d1 100644 --- a/tests/jepsen.nukeeper/src/jepsen/nukeeper/utils.clj +++ b/tests/jepsen.clickhouse-keeper/src/jepsen/clickhouse-keeper/utils.clj @@ -1,10 +1,10 @@ -(ns jepsen.nukeeper.utils +(ns jepsen.clickhouse-keeper.utils (:require [clojure.string :as str] [zookeeper.data :as data] [zookeeper :as zk] [zookeeper.internal :as zi] [jepsen.control.util :as cu] - [jepsen.nukeeper.constants :refer :all] + [jepsen.clickhouse-keeper.constants :refer :all] [jepsen.control :as c] [clojure.tools.logging :refer :all]) (:import (org.apache.zookeeper.data Stat) @@ -164,8 +164,8 @@ :--top_level_domains_path (str data-dir "/top_level_domains") :--logger.log (str logs-dir "/clickhouse-server.log") :--logger.errorlog (str logs-dir "/clickhouse-server.err.log") - :--test_keeper_server.snapshot_storage_path coordination-snapshots-dir - :--test_keeper_server.logs_storage_path coordination-logs-dir) + :--keeper_server.snapshot_storage_path coordination-snapshots-dir + :--keeper_server.logs_storage_path coordination-logs-dir) (wait-clickhouse-alive! node test))) (defn exec-with-retries diff --git a/tests/jepsen.nukeeper/test/jepsen/nukeeper_test.clj b/tests/jepsen.clickhouse-keeper/test/jepsen/nukeeper_test.clj similarity index 100% rename from tests/jepsen.nukeeper/test/jepsen/nukeeper_test.clj rename to tests/jepsen.clickhouse-keeper/test/jepsen/nukeeper_test.clj