diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 8423f10f3a6..86e22b834d4 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -240,6 +240,8 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ KeeperStorage::RequestForSession request_info; request_info.request = request; + using namespace std::chrono; + request_info.time = duration_cast(system_clock::now().time_since_epoch()).count(); request_info.session_id = session_id; std::lock_guard lock(push_request_mutex); @@ -400,6 +402,8 @@ void KeeperDispatcher::sessionCleanerTask() request->xid = Coordination::CLOSE_XID; KeeperStorage::RequestForSession request_info; request_info.request = request; + using namespace std::chrono; + request_info.time = duration_cast(system_clock::now().time_since_epoch()).count(); request_info.session_id = dead_session; { std::lock_guard lock(push_request_mutex); @@ -433,7 +437,7 @@ void KeeperDispatcher::finishSession(int64_t session_id) void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSessions & requests_for_sessions, Coordination::Error error) { - for (const auto & [session_id, request] : requests_for_sessions) + for (const auto & [session_id, time, request] : requests_for_sessions) { KeeperStorage::ResponsesForSessions responses; auto response = request->makeResponse(); @@ -477,6 +481,8 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms) request->server_id = server->getServerID(); request_info.request = request; + using namespace std::chrono; + request_info.time = duration_cast(system_clock::now().time_since_epoch()).count(); request_info.session_id = -1; auto promise = std::make_shared>(); diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index 558b28f9d46..30db486dd1b 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -260,11 +260,12 @@ void KeeperServer::shutdown() namespace { -nuraft::ptr getZooKeeperLogEntry(int64_t session_id, const Coordination::ZooKeeperRequestPtr & request) +nuraft::ptr getZooKeeperLogEntry(int64_t session_id, int64_t time, const Coordination::ZooKeeperRequestPtr & request) { DB::WriteBufferFromNuraftBuffer buf; DB::writeIntBinary(session_id, buf); request->write(buf); + DB::writeIntBinary(time, buf); return buf.getBuffer(); } @@ -283,8 +284,8 @@ RaftAppendResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForS { std::vector> entries; - for (const auto & [session_id, request] : requests_for_sessions) - entries.push_back(getZooKeeperLogEntry(session_id, request)); + for (const auto & [session_id, time, request] : requests_for_sessions) + entries.push_back(getZooKeeperLogEntry(session_id, time, request)); return raft_instance->append_entries(entries); } diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 4c8be4abe24..e3d99d4775b 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -38,6 +38,8 @@ namespace request_for_session.request = Coordination::ZooKeeperRequestFactory::instance().get(opnum); request_for_session.request->xid = xid; request_for_session.request->readImpl(buffer); + + readIntBinary(request_for_session.time, buffer); return request_for_session; } } @@ -133,7 +135,7 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n else { std::lock_guard lock(storage_and_responses_lock); - KeeperStorage::ResponsesForSessions responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx); + KeeperStorage::ResponsesForSessions responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, log_idx); for (auto & response_for_session : responses_for_sessions) if (!responses_queue.push(response_for_session)) throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", response_for_session.session_id); @@ -358,7 +360,7 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi { /// Pure local request, just process it with storage std::lock_guard lock(storage_and_responses_lock); - auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt); + auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, std::nullopt); for (const auto & response : responses) if (!responses_queue.push(response)) throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", response.session_id); diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index f6992815a6c..1e6081a628f 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -191,7 +191,7 @@ struct KeeperStorageRequestProcessor explicit KeeperStorageRequestProcessor(const Coordination::ZooKeeperRequestPtr & zk_request_) : zk_request(zk_request_) {} - virtual std::pair process(KeeperStorage & storage, int64_t zxid, int64_t session_id) const = 0; + virtual std::pair process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const = 0; virtual KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & /*watches*/, KeeperStorage::Watches & /*list_watches*/) const { return {}; } virtual bool checkAuth(KeeperStorage & /*storage*/, int64_t /*session_id*/) const { return true; } @@ -201,7 +201,7 @@ struct KeeperStorageRequestProcessor struct KeeperStorageHeartbeatRequestProcessor final : public KeeperStorageRequestProcessor { using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */) const override + std::pair process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */, int64_t /* time */) const override { return {zk_request->makeResponse(), {}}; } @@ -210,7 +210,7 @@ struct KeeperStorageHeartbeatRequestProcessor final : public KeeperStorageReques struct KeeperStorageSyncRequestProcessor final : public KeeperStorageRequestProcessor { using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */) const override + std::pair process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */, int64_t /* time */) const override { auto response = zk_request->makeResponse(); dynamic_cast(*response).path @@ -246,7 +246,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr return checkACL(Coordination::ACL::Create, node_acls, session_auths); } - std::pair process(KeeperStorage & storage, int64_t zxid, int64_t session_id) const override + std::pair process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override { auto & container = storage.container; auto & ephemerals = storage.ephemerals; @@ -309,8 +309,8 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr created_node.stat.czxid = zxid; created_node.stat.mzxid = zxid; created_node.stat.pzxid = zxid; - created_node.stat.ctime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1); - created_node.stat.mtime = created_node.stat.ctime; + created_node.stat.ctime = time; + created_node.stat.mtime = time; created_node.stat.numChildren = 0; created_node.stat.dataLength = request.data.length(); created_node.stat.ephemeralOwner = request.is_ephemeral ? session_id : 0; @@ -394,7 +394,7 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce } using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & storage, int64_t /* zxid */, int64_t /* session_id */) const override + std::pair process(KeeperStorage & storage, int64_t /* zxid */, int64_t /* session_id */, int64_t /* time */) const override { auto & container = storage.container; Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); @@ -453,7 +453,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr } using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/) const override + std::pair process(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /* time */) const override { auto & container = storage.container; auto & ephemerals = storage.ephemerals; @@ -538,7 +538,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestProcessor { using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /* session_id */) const override + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /* session_id */, int64_t /* time */) const override { auto & container = storage.container; @@ -579,7 +579,7 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce } using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & storage, int64_t zxid, int64_t /* session_id */) const override + std::pair process(KeeperStorage & storage, int64_t zxid, int64_t /* session_id */, int64_t time) const override { auto & container = storage.container; @@ -598,11 +598,11 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce auto prev_node = it->value; - auto itr = container.updateValue(request.path, [zxid, request] (KeeperStorage::Node & value) + auto itr = container.updateValue(request.path, [zxid, request, time] (KeeperStorage::Node & value) { value.stat.version++; value.stat.mzxid = zxid; - value.stat.mtime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1); + value.stat.mtime = time; value.stat.dataLength = request.data.length(); value.size_bytes = value.size_bytes + request.data.size() - value.data.size(); value.data = request.data; @@ -657,7 +657,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc } using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/) const override + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override { auto & container = storage.container; Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); @@ -706,7 +706,7 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro } using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/) const override + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override { auto & container = storage.container; @@ -751,7 +751,7 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t session_id) const override + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t session_id, int64_t /* time */) const override { auto & container = storage.container; @@ -815,7 +815,7 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr } using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/) const override + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override { Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperGetACLResponse & response = dynamic_cast(*response_ptr); @@ -877,7 +877,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro } } - std::pair process(KeeperStorage & storage, int64_t zxid, int64_t session_id) const override + std::pair process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override { Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperMultiResponse & response = dynamic_cast(*response_ptr); @@ -888,7 +888,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro size_t i = 0; for (const auto & concrete_request : concrete_requests) { - auto [ cur_response, undo_action ] = concrete_request->process(storage, zxid, session_id); + auto [ cur_response, undo_action ] = concrete_request->process(storage, zxid, session_id, time); response.responses[i] = cur_response; if (cur_response->error != Coordination::Error::ZOK) @@ -945,7 +945,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro struct KeeperStorageCloseRequestProcessor final : public KeeperStorageRequestProcessor { using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage &, int64_t, int64_t) const override + std::pair process(KeeperStorage &, int64_t, int64_t, int64_t /* time */) const override { throw DB::Exception("Called process on close request", ErrorCodes::LOGICAL_ERROR); } @@ -954,7 +954,7 @@ struct KeeperStorageCloseRequestProcessor final : public KeeperStorageRequestPro struct KeeperStorageAuthRequestProcessor final : public KeeperStorageRequestProcessor { using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t session_id) const override + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t session_id, int64_t /* time */) const override { Coordination::ZooKeeperAuthRequest & auth_request = dynamic_cast(*zk_request); Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); @@ -1067,7 +1067,7 @@ KeeperStorageRequestProcessorsFactory::KeeperStorageRequestProcessorsFactory() } -KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id, std::optional new_last_zxid, bool check_acl) +KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id, int64_t time, std::optional new_last_zxid, bool check_acl) { KeeperStorage::ResponsesForSessions results; if (new_last_zxid) @@ -1119,7 +1119,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat) /// Heartbeat request is also special { KeeperStorageRequestProcessorPtr storage_request = KeeperStorageRequestProcessorsFactory::instance().get(zk_request); - auto [response, _] = storage_request->process(*this, zxid, session_id); + auto [response, _] = storage_request->process(*this, zxid, session_id, time); response->xid = zk_request->xid; response->zxid = getZXID(); @@ -1138,7 +1138,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina } else { - std::tie(response, std::ignore) = request_processor->process(*this, zxid, session_id); + std::tie(response, std::ignore) = request_processor->process(*this, zxid, session_id, time); } /// Watches for this requests are added to the watches lists diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index cbf33be61a0..eef53e4f6ca 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -66,6 +66,7 @@ public: struct RequestForSession { int64_t session_id; + int64_t time; Coordination::ZooKeeperRequestPtr request; }; @@ -153,7 +154,7 @@ public: /// Process user request and return response. /// check_acl = false only when converting data from ZooKeeper. - ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, std::optional new_last_zxid, bool check_acl = true); + ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, int64_t time, std::optional new_last_zxid, bool check_acl = true); void finalize(); diff --git a/src/Coordination/ZooKeeperDataReader.cpp b/src/Coordination/ZooKeeperDataReader.cpp index 5d7b78d6a28..8a3e177c507 100644 --- a/src/Coordination/ZooKeeperDataReader.cpp +++ b/src/Coordination/ZooKeeperDataReader.cpp @@ -518,7 +518,7 @@ bool deserializeTxn(KeeperStorage & storage, ReadBuffer & in, Poco::Logger * /*l if (request->getOpNum() == Coordination::OpNum::Multi && hasErrorsInMultiRequest(request)) return true; - storage.processRequest(request, session_id, zxid, /* check_acl = */ false); + storage.processRequest(request, session_id, time, zxid, /* check_acl = */ false); } } diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index 9c434ebb653..47facf7166b 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -1219,6 +1219,9 @@ nuraft::ptr getBufferFromZKRequest(int64_t session_id, const Coo DB::WriteBufferFromNuraftBuffer buf; DB::writeIntBinary(session_id, buf); request->write(buf); + using namespace std::chrono; + auto time = duration_cast(system_clock::now().time_since_epoch()).count(); + DB::writeIntBinary(time, buf); return buf.getBuffer(); } diff --git a/tests/integration/test_keeper_znode_time/__init__.py b/tests/integration/test_keeper_znode_time/__init__.py new file mode 100644 index 00000000000..e5a0d9b4834 --- /dev/null +++ b/tests/integration/test_keeper_znode_time/__init__.py @@ -0,0 +1 @@ +#!/usr/bin/env python3 diff --git a/tests/integration/test_keeper_znode_time/configs/enable_keeper1.xml b/tests/integration/test_keeper_znode_time/configs/enable_keeper1.xml new file mode 100644 index 00000000000..17455ed12f5 --- /dev/null +++ b/tests/integration/test_keeper_znode_time/configs/enable_keeper1.xml @@ -0,0 +1,41 @@ + + + 9181 + 1 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + + + 5000 + 10000 + 75 + trace + + + + + 1 + node1 + 9234 + true + 3 + + + 2 + node2 + 9234 + true + true + 2 + + + 3 + node3 + 9234 + true + true + 1 + + + + diff --git a/tests/integration/test_keeper_znode_time/configs/enable_keeper2.xml b/tests/integration/test_keeper_znode_time/configs/enable_keeper2.xml new file mode 100644 index 00000000000..03a23984cc2 --- /dev/null +++ b/tests/integration/test_keeper_znode_time/configs/enable_keeper2.xml @@ -0,0 +1,41 @@ + + + 9181 + 2 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + + + 5000 + 10000 + 75 + trace + + + + + 1 + node1 + 9234 + true + 3 + + + 2 + node2 + 9234 + true + true + 2 + + + 3 + node3 + 9234 + true + true + 1 + + + + diff --git a/tests/integration/test_keeper_znode_time/configs/enable_keeper3.xml b/tests/integration/test_keeper_znode_time/configs/enable_keeper3.xml new file mode 100644 index 00000000000..a3196ac3061 --- /dev/null +++ b/tests/integration/test_keeper_znode_time/configs/enable_keeper3.xml @@ -0,0 +1,41 @@ + + + 9181 + 3 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + + + 5000 + 10000 + 75 + trace + + + + + 1 + node1 + 9234 + true + 3 + + + 2 + node2 + 9234 + true + true + 2 + + + 3 + node3 + 9234 + true + true + 1 + + + + diff --git a/tests/integration/test_keeper_znode_time/configs/use_keeper.xml b/tests/integration/test_keeper_znode_time/configs/use_keeper.xml new file mode 100644 index 00000000000..384e984f210 --- /dev/null +++ b/tests/integration/test_keeper_znode_time/configs/use_keeper.xml @@ -0,0 +1,16 @@ + + + + node1 + 9181 + + + node2 + 9181 + + + node3 + 9181 + + + diff --git a/tests/integration/test_keeper_znode_time/test.py b/tests/integration/test_keeper_znode_time/test.py new file mode 100644 index 00000000000..cbe89970d31 --- /dev/null +++ b/tests/integration/test_keeper_znode_time/test.py @@ -0,0 +1,129 @@ +import pytest +from helpers.cluster import ClickHouseCluster +import random +import string +import os +import time +from multiprocessing.dummy import Pool +from helpers.network import PartitionManager +from helpers.test_tools import assert_eq_with_retry + +cluster = ClickHouseCluster(__file__) +node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml', 'configs/use_keeper.xml'], stay_alive=True) +node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml', 'configs/use_keeper.xml'], stay_alive=True) +node3 = cluster.add_instance('node3', main_configs=['configs/enable_keeper3.xml', 'configs/use_keeper.xml'], stay_alive=True) + +from kazoo.client import KazooClient, KazooState + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + + yield cluster + + finally: + cluster.shutdown() + +def smaller_exception(ex): + return '\n'.join(str(ex).split('\n')[0:2]) + +def wait_node(node): + for _ in range(100): + zk = None + try: + node.query("SELECT * FROM system.zookeeper WHERE path = '/'") + zk = get_fake_zk(node.name, timeout=30.0) + zk.create("/test", sequence=True) + print("node", node.name, "ready") + break + except Exception as ex: + time.sleep(0.2) + print("Waiting until", node.name, "will be ready, exception", ex) + finally: + if zk: + zk.stop() + zk.close() + else: + raise Exception("Can't wait node", node.name, "to become ready") + +def wait_nodes(): + for node in [node1, node2, node3]: + wait_node(node) + + +def get_fake_zk(nodename, timeout=30.0): + _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout) + _fake_zk_instance.start() + return _fake_zk_instance + +def assert_eq_stats(stat1, stat2): + assert stat1.version == stat2.version + assert stat1.cversion == stat2.cversion + assert stat1.aversion == stat2.aversion + assert stat1.aversion == stat2.aversion + assert stat1.dataLength == stat2.dataLength + assert stat1.numChildren == stat2.numChildren + assert stat1.ctime == stat2.ctime + assert stat1.mtime == stat2.mtime + +def test_between_servers(started_cluster): + try: + wait_nodes() + node1_zk = get_fake_zk("node1") + node2_zk = get_fake_zk("node2") + node3_zk = get_fake_zk("node3") + + node1_zk.create("/test_between_servers") + for child_node in range(1000): + node1_zk.create("/test_between_servers/" + str(child_node)) + + for child_node in range(1000): + node1_zk.set("/test_between_servers/" + str(child_node), b"somevalue") + + for child_node in range(1000): + stats1 = node1_zk.exists("/test_between_servers/" + str(child_node)) + stats2 = node2_zk.exists("/test_between_servers/" + str(child_node)) + stats3 = node3_zk.exists("/test_between_servers/" + str(child_node)) + assert_eq_stats(stats1, stats2) + assert_eq_stats(stats2, stats3) + + finally: + try: + for zk_conn in [node1_zk, node2_zk, node3_zk]: + zk_conn.stop() + zk_conn.close() + except: + pass + + +def test_server_restart(started_cluster): + try: + wait_nodes() + node1_zk = get_fake_zk("node1") + + node1_zk.create("/test_server_restart") + for child_node in range(1000): + node1_zk.create("/test_server_restart/" + str(child_node)) + + for child_node in range(1000): + node1_zk.set("/test_server_restart/" + str(child_node), b"somevalue") + + node3.restart_clickhouse(kill=True) + + node2_zk = get_fake_zk("node2") + node3_zk = get_fake_zk("node3") + for child_node in range(1000): + stats1 = node1_zk.exists("/test_between_servers/" + str(child_node)) + stats2 = node2_zk.exists("/test_between_servers/" + str(child_node)) + stats3 = node3_zk.exists("/test_between_servers/" + str(child_node)) + assert_eq_stats(stats1, stats2) + assert_eq_stats(stats2, stats3) + + finally: + try: + for zk_conn in [node1_zk, node2_zk, node3_zk]: + zk_conn.stop() + zk_conn.close() + except: + pass