From 377cc208d08160183b58ca951ffab2cd4d40d2e5 Mon Sep 17 00:00:00 2001 From: zhangxiao871 <821008736@qq.com> Date: Thu, 6 Jan 2022 21:14:45 +0800 Subject: [PATCH 1/6] add time --- src/Coordination/KeeperServer.cpp | 3 + src/Coordination/KeeperStateMachine.cpp | 3 +- src/Coordination/KeeperStorage.cpp | 46 +++---- src/Coordination/KeeperStorage.h | 3 +- .../test_keeper_znode_time/__init__.py | 1 + .../configs/enable_keeper1.xml | 41 ++++++ .../configs/enable_keeper2.xml | 41 ++++++ .../configs/enable_keeper3.xml | 41 ++++++ .../configs/use_keeper.xml | 16 +++ .../test_keeper_znode_time/test.py | 129 ++++++++++++++++++ 10 files changed, 299 insertions(+), 25 deletions(-) create mode 100644 tests/integration/test_keeper_znode_time/__init__.py create mode 100644 tests/integration/test_keeper_znode_time/configs/enable_keeper1.xml create mode 100644 tests/integration/test_keeper_znode_time/configs/enable_keeper2.xml create mode 100644 tests/integration/test_keeper_znode_time/configs/enable_keeper3.xml create mode 100644 tests/integration/test_keeper_znode_time/configs/use_keeper.xml create mode 100644 tests/integration/test_keeper_znode_time/test.py diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index 82ea100bccb..27c385d0e93 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -263,6 +263,9 @@ nuraft::ptr getZooKeeperLogEntry(int64_t session_id, const Coord { DB::WriteBufferFromNuraftBuffer buf; DB::writeIntBinary(session_id, buf); + using namespace std::chrono; + int64_t time = duration_cast(system_clock::now().time_since_epoch()).count(); + DB::writeIntBinary(time, buf); request->write(buf); return buf.getBuffer(); } diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 1ac1a584451..2d8c07e1ab8 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -22,6 +22,7 @@ namespace ReadBufferFromNuraftBuffer buffer(data); KeeperStorage::RequestForSession request_for_session; readIntBinary(request_for_session.session_id, buffer); + readIntBinary(request_for_session.time, buffer); int32_t length; Coordination::read(length, buffer); @@ -131,7 +132,7 @@ nuraft::ptr KeeperStateMachine::commit(const uint64_t log_idx, n else { std::lock_guard lock(storage_and_responses_lock); - KeeperStorage::ResponsesForSessions responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx); + KeeperStorage::ResponsesForSessions responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, log_idx); for (auto & response_for_session : responses_for_sessions) if (!responses_queue.push(response_for_session)) throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", response_for_session.session_id); diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index a64a7d425f6..1a19d4b11a0 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -202,7 +202,7 @@ struct KeeperStorageRequestProcessor explicit KeeperStorageRequestProcessor(const Coordination::ZooKeeperRequestPtr & zk_request_) : zk_request(zk_request_) {} - virtual std::pair process(KeeperStorage & storage, int64_t zxid, int64_t session_id) const = 0; + virtual std::pair process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const = 0; virtual KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & /*watches*/, KeeperStorage::Watches & /*list_watches*/) const { return {}; } virtual bool checkAuth(KeeperStorage & /*storage*/, int64_t /*session_id*/) const { return true; } @@ -212,7 +212,7 @@ struct KeeperStorageRequestProcessor struct KeeperStorageHeartbeatRequestProcessor final : public KeeperStorageRequestProcessor { using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */) const override + std::pair process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */, int64_t /* time */) const override { return {zk_request->makeResponse(), {}}; } @@ -221,7 +221,7 @@ struct KeeperStorageHeartbeatRequestProcessor final : public KeeperStorageReques struct KeeperStorageSyncRequestProcessor final : public KeeperStorageRequestProcessor { using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */) const override + std::pair process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */, int64_t /* time */) const override { auto response = zk_request->makeResponse(); dynamic_cast(*response).path @@ -256,7 +256,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr return checkACL(Coordination::ACL::Create, node_acls, session_auths); } - std::pair process(KeeperStorage & storage, int64_t zxid, int64_t session_id) const override + std::pair process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override { auto & container = storage.container; auto & ephemerals = storage.ephemerals; @@ -320,8 +320,8 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr created_node.stat.czxid = zxid; created_node.stat.mzxid = zxid; created_node.stat.pzxid = zxid; - created_node.stat.ctime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1); - created_node.stat.mtime = created_node.stat.ctime; + created_node.stat.ctime = time; + created_node.stat.mtime = time; created_node.stat.numChildren = 0; created_node.stat.dataLength = request.data.length(); created_node.stat.ephemeralOwner = request.is_ephemeral ? session_id : 0; @@ -402,7 +402,7 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce } using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & storage, int64_t /* zxid */, int64_t /* session_id */) const override + std::pair process(KeeperStorage & storage, int64_t /* zxid */, int64_t /* session_id */, int64_t /* time */) const override { auto & container = storage.container; Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); @@ -461,7 +461,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr } using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/) const override + std::pair process(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /* time */) const override { auto & container = storage.container; auto & ephemerals = storage.ephemerals; @@ -545,7 +545,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestProcessor { using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /* session_id */) const override + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /* session_id */, int64_t /* time */) const override { auto & container = storage.container; @@ -586,7 +586,7 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce } using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & storage, int64_t zxid, int64_t /* session_id */) const override + std::pair process(KeeperStorage & storage, int64_t zxid, int64_t /* session_id */, int64_t time) const override { auto & container = storage.container; @@ -605,11 +605,11 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce auto prev_node = it->value; - auto itr = container.updateValue(request.path, [zxid, request] (KeeperStorage::Node & value) + auto itr = container.updateValue(request.path, [zxid, request, time] (KeeperStorage::Node & value) { value.stat.version++; value.stat.mzxid = zxid; - value.stat.mtime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1); + value.stat.mtime = time; value.stat.dataLength = request.data.length(); value.size_bytes = value.size_bytes + request.data.size() - value.data.size(); value.data = request.data; @@ -664,7 +664,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc } using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/) const override + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override { auto & container = storage.container; Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); @@ -709,7 +709,7 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro } using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/) const override + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override { auto & container = storage.container; @@ -754,7 +754,7 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t session_id) const override + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t session_id, int64_t /* time */) const override { auto & container = storage.container; @@ -818,7 +818,7 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr } using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/) const override + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override { Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperGetACLResponse & response = dynamic_cast(*response_ptr); @@ -880,7 +880,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro } } - std::pair process(KeeperStorage & storage, int64_t zxid, int64_t session_id) const override + std::pair process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override { Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Coordination::ZooKeeperMultiResponse & response = dynamic_cast(*response_ptr); @@ -891,7 +891,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro size_t i = 0; for (const auto & concrete_request : concrete_requests) { - auto [ cur_response, undo_action ] = concrete_request->process(storage, zxid, session_id); + auto [ cur_response, undo_action ] = concrete_request->process(storage, zxid, session_id, time); response.responses[i] = cur_response; if (cur_response->error != Coordination::Error::ZOK) @@ -948,7 +948,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro struct KeeperStorageCloseRequestProcessor final : public KeeperStorageRequestProcessor { using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage &, int64_t, int64_t) const override + std::pair process(KeeperStorage &, int64_t, int64_t, int64_t /* time */) const override { throw DB::Exception("Called process on close request", ErrorCodes::LOGICAL_ERROR); } @@ -957,7 +957,7 @@ struct KeeperStorageCloseRequestProcessor final : public KeeperStorageRequestPro struct KeeperStorageAuthRequestProcessor final : public KeeperStorageRequestProcessor { using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor; - std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t session_id) const override + std::pair process(KeeperStorage & storage, int64_t /*zxid*/, int64_t session_id, int64_t /* time */) const override { Coordination::ZooKeeperAuthRequest & auth_request = dynamic_cast(*zk_request); Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); @@ -1070,7 +1070,7 @@ KeeperStorageRequestProcessorsFactory::KeeperStorageRequestProcessorsFactory() } -KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id, std::optional new_last_zxid, bool check_acl) +KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id, int64_t time, std::optional new_last_zxid, bool check_acl) { KeeperStorage::ResponsesForSessions results; if (new_last_zxid) @@ -1120,7 +1120,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat) /// Heartbeat request is also special { KeeperStorageRequestProcessorPtr storage_request = KeeperStorageRequestProcessorsFactory::instance().get(zk_request); - auto [response, _] = storage_request->process(*this, zxid, session_id); + auto [response, _] = storage_request->process(*this, zxid, session_id, time); response->xid = zk_request->xid; response->zxid = getZXID(); @@ -1139,7 +1139,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina } else { - std::tie(response, std::ignore) = request_processor->process(*this, zxid, session_id); + std::tie(response, std::ignore) = request_processor->process(*this, zxid, session_id, time); } /// Watches for this requests are added to the watches lists diff --git a/src/Coordination/KeeperStorage.h b/src/Coordination/KeeperStorage.h index f61b17a88a6..c82591ac8e2 100644 --- a/src/Coordination/KeeperStorage.h +++ b/src/Coordination/KeeperStorage.h @@ -63,6 +63,7 @@ public: struct RequestForSession { int64_t session_id; + int64_t time; Coordination::ZooKeeperRequestPtr request; }; @@ -150,7 +151,7 @@ public: /// Process user request and return response. /// check_acl = false only when converting data from ZooKeeper. - ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, std::optional new_last_zxid, bool check_acl = true); + ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, int64_t time, std::optional new_last_zxid, bool check_acl = true); void finalize(); diff --git a/tests/integration/test_keeper_znode_time/__init__.py b/tests/integration/test_keeper_znode_time/__init__.py new file mode 100644 index 00000000000..e5a0d9b4834 --- /dev/null +++ b/tests/integration/test_keeper_znode_time/__init__.py @@ -0,0 +1 @@ +#!/usr/bin/env python3 diff --git a/tests/integration/test_keeper_znode_time/configs/enable_keeper1.xml b/tests/integration/test_keeper_znode_time/configs/enable_keeper1.xml new file mode 100644 index 00000000000..17455ed12f5 --- /dev/null +++ b/tests/integration/test_keeper_znode_time/configs/enable_keeper1.xml @@ -0,0 +1,41 @@ + + + 9181 + 1 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + + + 5000 + 10000 + 75 + trace + + + + + 1 + node1 + 9234 + true + 3 + + + 2 + node2 + 9234 + true + true + 2 + + + 3 + node3 + 9234 + true + true + 1 + + + + diff --git a/tests/integration/test_keeper_znode_time/configs/enable_keeper2.xml b/tests/integration/test_keeper_znode_time/configs/enable_keeper2.xml new file mode 100644 index 00000000000..03a23984cc2 --- /dev/null +++ b/tests/integration/test_keeper_znode_time/configs/enable_keeper2.xml @@ -0,0 +1,41 @@ + + + 9181 + 2 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + + + 5000 + 10000 + 75 + trace + + + + + 1 + node1 + 9234 + true + 3 + + + 2 + node2 + 9234 + true + true + 2 + + + 3 + node3 + 9234 + true + true + 1 + + + + diff --git a/tests/integration/test_keeper_znode_time/configs/enable_keeper3.xml b/tests/integration/test_keeper_znode_time/configs/enable_keeper3.xml new file mode 100644 index 00000000000..a3196ac3061 --- /dev/null +++ b/tests/integration/test_keeper_znode_time/configs/enable_keeper3.xml @@ -0,0 +1,41 @@ + + + 9181 + 3 + /var/lib/clickhouse/coordination/log + /var/lib/clickhouse/coordination/snapshots + + + 5000 + 10000 + 75 + trace + + + + + 1 + node1 + 9234 + true + 3 + + + 2 + node2 + 9234 + true + true + 2 + + + 3 + node3 + 9234 + true + true + 1 + + + + diff --git a/tests/integration/test_keeper_znode_time/configs/use_keeper.xml b/tests/integration/test_keeper_znode_time/configs/use_keeper.xml new file mode 100644 index 00000000000..384e984f210 --- /dev/null +++ b/tests/integration/test_keeper_znode_time/configs/use_keeper.xml @@ -0,0 +1,16 @@ + + + + node1 + 9181 + + + node2 + 9181 + + + node3 + 9181 + + + diff --git a/tests/integration/test_keeper_znode_time/test.py b/tests/integration/test_keeper_znode_time/test.py new file mode 100644 index 00000000000..cbe89970d31 --- /dev/null +++ b/tests/integration/test_keeper_znode_time/test.py @@ -0,0 +1,129 @@ +import pytest +from helpers.cluster import ClickHouseCluster +import random +import string +import os +import time +from multiprocessing.dummy import Pool +from helpers.network import PartitionManager +from helpers.test_tools import assert_eq_with_retry + +cluster = ClickHouseCluster(__file__) +node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml', 'configs/use_keeper.xml'], stay_alive=True) +node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml', 'configs/use_keeper.xml'], stay_alive=True) +node3 = cluster.add_instance('node3', main_configs=['configs/enable_keeper3.xml', 'configs/use_keeper.xml'], stay_alive=True) + +from kazoo.client import KazooClient, KazooState + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + + yield cluster + + finally: + cluster.shutdown() + +def smaller_exception(ex): + return '\n'.join(str(ex).split('\n')[0:2]) + +def wait_node(node): + for _ in range(100): + zk = None + try: + node.query("SELECT * FROM system.zookeeper WHERE path = '/'") + zk = get_fake_zk(node.name, timeout=30.0) + zk.create("/test", sequence=True) + print("node", node.name, "ready") + break + except Exception as ex: + time.sleep(0.2) + print("Waiting until", node.name, "will be ready, exception", ex) + finally: + if zk: + zk.stop() + zk.close() + else: + raise Exception("Can't wait node", node.name, "to become ready") + +def wait_nodes(): + for node in [node1, node2, node3]: + wait_node(node) + + +def get_fake_zk(nodename, timeout=30.0): + _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout) + _fake_zk_instance.start() + return _fake_zk_instance + +def assert_eq_stats(stat1, stat2): + assert stat1.version == stat2.version + assert stat1.cversion == stat2.cversion + assert stat1.aversion == stat2.aversion + assert stat1.aversion == stat2.aversion + assert stat1.dataLength == stat2.dataLength + assert stat1.numChildren == stat2.numChildren + assert stat1.ctime == stat2.ctime + assert stat1.mtime == stat2.mtime + +def test_between_servers(started_cluster): + try: + wait_nodes() + node1_zk = get_fake_zk("node1") + node2_zk = get_fake_zk("node2") + node3_zk = get_fake_zk("node3") + + node1_zk.create("/test_between_servers") + for child_node in range(1000): + node1_zk.create("/test_between_servers/" + str(child_node)) + + for child_node in range(1000): + node1_zk.set("/test_between_servers/" + str(child_node), b"somevalue") + + for child_node in range(1000): + stats1 = node1_zk.exists("/test_between_servers/" + str(child_node)) + stats2 = node2_zk.exists("/test_between_servers/" + str(child_node)) + stats3 = node3_zk.exists("/test_between_servers/" + str(child_node)) + assert_eq_stats(stats1, stats2) + assert_eq_stats(stats2, stats3) + + finally: + try: + for zk_conn in [node1_zk, node2_zk, node3_zk]: + zk_conn.stop() + zk_conn.close() + except: + pass + + +def test_server_restart(started_cluster): + try: + wait_nodes() + node1_zk = get_fake_zk("node1") + + node1_zk.create("/test_server_restart") + for child_node in range(1000): + node1_zk.create("/test_server_restart/" + str(child_node)) + + for child_node in range(1000): + node1_zk.set("/test_server_restart/" + str(child_node), b"somevalue") + + node3.restart_clickhouse(kill=True) + + node2_zk = get_fake_zk("node2") + node3_zk = get_fake_zk("node3") + for child_node in range(1000): + stats1 = node1_zk.exists("/test_between_servers/" + str(child_node)) + stats2 = node2_zk.exists("/test_between_servers/" + str(child_node)) + stats3 = node3_zk.exists("/test_between_servers/" + str(child_node)) + assert_eq_stats(stats1, stats2) + assert_eq_stats(stats2, stats3) + + finally: + try: + for zk_conn in [node1_zk, node2_zk, node3_zk]: + zk_conn.stop() + zk_conn.close() + except: + pass From 06a6e7f583bc7b89f8dc9877f01099d82f0abe8a Mon Sep 17 00:00:00 2001 From: zhangxiao871 <821008736@qq.com> Date: Thu, 6 Jan 2022 22:44:01 +0800 Subject: [PATCH 2/6] fix build --- src/Coordination/KeeperDispatcher.cpp | 4 +++- src/Coordination/KeeperServer.cpp | 8 +++----- src/Coordination/KeeperStateMachine.cpp | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 438e337b64f..0e19f6934b7 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -240,6 +240,8 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ KeeperStorage::RequestForSession request_info; request_info.request = request; + using namespace std::chrono; + request_info.time = duration_cast(system_clock::now().time_since_epoch()).count(); request_info.session_id = session_id; std::lock_guard lock(push_request_mutex); @@ -433,7 +435,7 @@ void KeeperDispatcher::finishSession(int64_t session_id) void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSessions & requests_for_sessions, Coordination::Error error) { - for (const auto & [session_id, request] : requests_for_sessions) + for (const auto & [session_id, time, request] : requests_for_sessions) { KeeperStorage::ResponsesForSessions responses; auto response = request->makeResponse(); diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index 27c385d0e93..035239ac1fe 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -259,12 +259,10 @@ void KeeperServer::shutdown() namespace { -nuraft::ptr getZooKeeperLogEntry(int64_t session_id, const Coordination::ZooKeeperRequestPtr & request) +nuraft::ptr getZooKeeperLogEntry(int64_t session_id, int64_t time, const Coordination::ZooKeeperRequestPtr & request) { DB::WriteBufferFromNuraftBuffer buf; DB::writeIntBinary(session_id, buf); - using namespace std::chrono; - int64_t time = duration_cast(system_clock::now().time_since_epoch()).count(); DB::writeIntBinary(time, buf); request->write(buf); return buf.getBuffer(); @@ -285,8 +283,8 @@ RaftAppendResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForS { std::vector> entries; - for (const auto & [session_id, request] : requests_for_sessions) - entries.push_back(getZooKeeperLogEntry(session_id, request)); + for (const auto & [session_id, time, request] : requests_for_sessions) + entries.push_back(getZooKeeperLogEntry(session_id, time, request)); { std::lock_guard lock(append_entries_mutex); diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 2d8c07e1ab8..f9a4898d606 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -321,7 +321,7 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi { /// Pure local request, just process it with storage std::lock_guard lock(storage_and_responses_lock); - auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt); + auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, std::nullopt); for (const auto & response : responses) if (!responses_queue.push(response)) throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", response.session_id); From f9342ee2be501fc16708b5b8e941a1909c0107d9 Mon Sep 17 00:00:00 2001 From: zhangxiao871 <821008736@qq.com> Date: Wed, 12 Jan 2022 17:51:18 +0800 Subject: [PATCH 3/6] try fix build and test --- src/Coordination/ZooKeeperDataReader.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Coordination/ZooKeeperDataReader.cpp b/src/Coordination/ZooKeeperDataReader.cpp index 4f71274291b..cc6bbad6c60 100644 --- a/src/Coordination/ZooKeeperDataReader.cpp +++ b/src/Coordination/ZooKeeperDataReader.cpp @@ -529,7 +529,7 @@ bool deserializeTxn(KeeperStorage & storage, ReadBuffer & in, Poco::Logger * /*l if (request->getOpNum() == Coordination::OpNum::Multi && hasErrorsInMultiRequest(request)) return true; - storage.processRequest(request, session_id, zxid, /* check_acl = */ false); + storage.processRequest(request, session_id, time, zxid, /* check_acl = */ false); } } From 264cb193bdfbbbed429f56e7f22a69bb719777a0 Mon Sep 17 00:00:00 2001 From: zhangxiao871 <821008736@qq.com> Date: Thu, 13 Jan 2022 17:49:39 +0800 Subject: [PATCH 4/6] fix use-of-uninitialized-value --- src/Coordination/KeeperDispatcher.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 0e19f6934b7..91992364701 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -402,6 +402,8 @@ void KeeperDispatcher::sessionCleanerTask() request->xid = Coordination::CLOSE_XID; KeeperStorage::RequestForSession request_info; request_info.request = request; + using namespace std::chrono; + request_info.time = duration_cast(system_clock::now().time_since_epoch()).count(); request_info.session_id = dead_session; { std::lock_guard lock(push_request_mutex); @@ -479,6 +481,8 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms) request->server_id = server->getServerID(); request_info.request = request; + using namespace std::chrono; + request_info.time = duration_cast(system_clock::now().time_since_epoch()).count(); request_info.session_id = -1; auto promise = std::make_shared>(); From be3d3886b1ff5de2b8443c3cb5b66702516bd460 Mon Sep 17 00:00:00 2001 From: zhangxiao871 <821008736@qq.com> Date: Thu, 20 Jan 2022 10:49:16 +0800 Subject: [PATCH 5/6] Fix tests --- src/Coordination/tests/gtest_coordination.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index d274ee34a88..41bf3e70642 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -1218,6 +1218,9 @@ nuraft::ptr getBufferFromZKRequest(int64_t session_id, const Coo { DB::WriteBufferFromNuraftBuffer buf; DB::writeIntBinary(session_id, buf); + using namespace std::chrono; + auto time = duration_cast(system_clock::now().time_since_epoch()).count(); + DB::writeIntBinary(time, buf); request->write(buf); return buf.getBuffer(); } From ad124a4d10b023374c0fd5c1560ccaa92e2232ce Mon Sep 17 00:00:00 2001 From: zhangxiao871 <821008736@qq.com> Date: Tue, 22 Feb 2022 11:10:14 +0800 Subject: [PATCH 6/6] compatible. --- src/Coordination/KeeperServer.cpp | 2 +- src/Coordination/KeeperStateMachine.cpp | 3 ++- src/Coordination/tests/gtest_coordination.cpp | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/Coordination/KeeperServer.cpp b/src/Coordination/KeeperServer.cpp index 925f4fd6087..3647cad4333 100644 --- a/src/Coordination/KeeperServer.cpp +++ b/src/Coordination/KeeperServer.cpp @@ -264,8 +264,8 @@ nuraft::ptr getZooKeeperLogEntry(int64_t session_id, int64_t tim { DB::WriteBufferFromNuraftBuffer buf; DB::writeIntBinary(session_id, buf); - DB::writeIntBinary(time, buf); request->write(buf); + DB::writeIntBinary(time, buf); return buf.getBuffer(); } diff --git a/src/Coordination/KeeperStateMachine.cpp b/src/Coordination/KeeperStateMachine.cpp index 8928fdb4575..bd44b8fbd5a 100644 --- a/src/Coordination/KeeperStateMachine.cpp +++ b/src/Coordination/KeeperStateMachine.cpp @@ -22,7 +22,6 @@ namespace ReadBufferFromNuraftBuffer buffer(data); KeeperStorage::RequestForSession request_for_session; readIntBinary(request_for_session.session_id, buffer); - readIntBinary(request_for_session.time, buffer); int32_t length; Coordination::read(length, buffer); @@ -37,6 +36,8 @@ namespace request_for_session.request = Coordination::ZooKeeperRequestFactory::instance().get(opnum); request_for_session.request->xid = xid; request_for_session.request->readImpl(buffer); + + readIntBinary(request_for_session.time, buffer); return request_for_session; } } diff --git a/src/Coordination/tests/gtest_coordination.cpp b/src/Coordination/tests/gtest_coordination.cpp index 820e522cf84..47facf7166b 100644 --- a/src/Coordination/tests/gtest_coordination.cpp +++ b/src/Coordination/tests/gtest_coordination.cpp @@ -1218,10 +1218,10 @@ nuraft::ptr getBufferFromZKRequest(int64_t session_id, const Coo { DB::WriteBufferFromNuraftBuffer buf; DB::writeIntBinary(session_id, buf); + request->write(buf); using namespace std::chrono; auto time = duration_cast(system_clock::now().time_since_epoch()).count(); DB::writeIntBinary(time, buf); - request->write(buf); return buf.getBuffer(); }