Merge pull request #33441 from nicelulu/znode_time

Make the znode ctime and mtime consistent between servers.
This commit is contained in:
Dmitry Novik 2022-02-22 06:55:46 -08:00 committed by GitHub
commit f85d8cd3b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 313 additions and 31 deletions

View File

@ -240,6 +240,8 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
KeeperStorage::RequestForSession request_info;
request_info.request = request;
using namespace std::chrono;
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
request_info.session_id = session_id;
std::lock_guard lock(push_request_mutex);
@ -400,6 +402,8 @@ void KeeperDispatcher::sessionCleanerTask()
request->xid = Coordination::CLOSE_XID;
KeeperStorage::RequestForSession request_info;
request_info.request = request;
using namespace std::chrono;
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
request_info.session_id = dead_session;
{
std::lock_guard lock(push_request_mutex);
@ -433,7 +437,7 @@ void KeeperDispatcher::finishSession(int64_t session_id)
void KeeperDispatcher::addErrorResponses(const KeeperStorage::RequestsForSessions & requests_for_sessions, Coordination::Error error)
{
for (const auto & [session_id, request] : requests_for_sessions)
for (const auto & [session_id, time, request] : requests_for_sessions)
{
KeeperStorage::ResponsesForSessions responses;
auto response = request->makeResponse();
@ -477,6 +481,8 @@ int64_t KeeperDispatcher::getSessionID(int64_t session_timeout_ms)
request->server_id = server->getServerID();
request_info.request = request;
using namespace std::chrono;
request_info.time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
request_info.session_id = -1;
auto promise = std::make_shared<std::promise<int64_t>>();

View File

@ -260,11 +260,12 @@ void KeeperServer::shutdown()
namespace
{
nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, const Coordination::ZooKeeperRequestPtr & request)
nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, int64_t time, const Coordination::ZooKeeperRequestPtr & request)
{
DB::WriteBufferFromNuraftBuffer buf;
DB::writeIntBinary(session_id, buf);
request->write(buf);
DB::writeIntBinary(time, buf);
return buf.getBuffer();
}
@ -283,8 +284,8 @@ RaftAppendResult KeeperServer::putRequestBatch(const KeeperStorage::RequestsForS
{
std::vector<nuraft::ptr<nuraft::buffer>> entries;
for (const auto & [session_id, request] : requests_for_sessions)
entries.push_back(getZooKeeperLogEntry(session_id, request));
for (const auto & [session_id, time, request] : requests_for_sessions)
entries.push_back(getZooKeeperLogEntry(session_id, time, request));
return raft_instance->append_entries(entries);
}

View File

@ -38,6 +38,8 @@ namespace
request_for_session.request = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
request_for_session.request->xid = xid;
request_for_session.request->readImpl(buffer);
readIntBinary(request_for_session.time, buffer);
return request_for_session;
}
}
@ -133,7 +135,7 @@ nuraft::ptr<nuraft::buffer> KeeperStateMachine::commit(const uint64_t log_idx, n
else
{
std::lock_guard lock(storage_and_responses_lock);
KeeperStorage::ResponsesForSessions responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, log_idx);
KeeperStorage::ResponsesForSessions responses_for_sessions = storage->processRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, log_idx);
for (auto & response_for_session : responses_for_sessions)
if (!responses_queue.push(response_for_session))
throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", response_for_session.session_id);
@ -358,7 +360,7 @@ void KeeperStateMachine::processReadRequest(const KeeperStorage::RequestForSessi
{
/// Pure local request, just process it with storage
std::lock_guard lock(storage_and_responses_lock);
auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, std::nullopt);
auto responses = storage->processRequest(request_for_session.request, request_for_session.session_id, request_for_session.time, std::nullopt);
for (const auto & response : responses)
if (!responses_queue.push(response))
throw Exception(ErrorCodes::SYSTEM_ERROR, "Could not push response with session id {} into responses queue", response.session_id);

View File

@ -191,7 +191,7 @@ struct KeeperStorageRequestProcessor
explicit KeeperStorageRequestProcessor(const Coordination::ZooKeeperRequestPtr & zk_request_)
: zk_request(zk_request_)
{}
virtual std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t session_id) const = 0;
virtual std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const = 0;
virtual KeeperStorage::ResponsesForSessions processWatches(KeeperStorage::Watches & /*watches*/, KeeperStorage::Watches & /*list_watches*/) const { return {}; }
virtual bool checkAuth(KeeperStorage & /*storage*/, int64_t /*session_id*/) const { return true; }
@ -201,7 +201,7 @@ struct KeeperStorageRequestProcessor
struct KeeperStorageHeartbeatRequestProcessor final : public KeeperStorageRequestProcessor
{
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */, int64_t /* time */) const override
{
return {zk_request->makeResponse(), {}};
}
@ -210,7 +210,7 @@ struct KeeperStorageHeartbeatRequestProcessor final : public KeeperStorageReques
struct KeeperStorageSyncRequestProcessor final : public KeeperStorageRequestProcessor
{
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & /* storage */, int64_t /* zxid */, int64_t /* session_id */, int64_t /* time */) const override
{
auto response = zk_request->makeResponse();
dynamic_cast<Coordination::ZooKeeperSyncResponse &>(*response).path
@ -246,7 +246,7 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
return checkACL(Coordination::ACL::Create, node_acls, session_auths);
}
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t session_id) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
auto & container = storage.container;
auto & ephemerals = storage.ephemerals;
@ -309,8 +309,8 @@ struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestPr
created_node.stat.czxid = zxid;
created_node.stat.mzxid = zxid;
created_node.stat.pzxid = zxid;
created_node.stat.ctime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
created_node.stat.mtime = created_node.stat.ctime;
created_node.stat.ctime = time;
created_node.stat.mtime = time;
created_node.stat.numChildren = 0;
created_node.stat.dataLength = request.data.length();
created_node.stat.ephemeralOwner = request.is_ephemeral ? session_id : 0;
@ -394,7 +394,7 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
}
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /* zxid */, int64_t /* session_id */) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /* zxid */, int64_t /* session_id */, int64_t /* time */) const override
{
auto & container = storage.container;
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
@ -453,7 +453,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
}
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /* time */) const override
{
auto & container = storage.container;
auto & ephemerals = storage.ephemerals;
@ -538,7 +538,7 @@ struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestPr
struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestProcessor
{
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /* session_id */) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /* session_id */, int64_t /* time */) const override
{
auto & container = storage.container;
@ -579,7 +579,7 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
}
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t /* session_id */) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t /* session_id */, int64_t time) const override
{
auto & container = storage.container;
@ -598,11 +598,11 @@ struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProce
auto prev_node = it->value;
auto itr = container.updateValue(request.path, [zxid, request] (KeeperStorage::Node & value)
auto itr = container.updateValue(request.path, [zxid, request, time] (KeeperStorage::Node & value)
{
value.stat.version++;
value.stat.mzxid = zxid;
value.stat.mtime = std::chrono::system_clock::now().time_since_epoch() / std::chrono::milliseconds(1);
value.stat.mtime = time;
value.stat.dataLength = request.data.length();
value.size_bytes = value.size_bytes + request.data.size() - value.data.size();
value.data = request.data;
@ -657,7 +657,7 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
}
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override
{
auto & container = storage.container;
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
@ -706,7 +706,7 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
}
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override
{
auto & container = storage.container;
@ -751,7 +751,7 @@ struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestPr
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t session_id) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t session_id, int64_t /* time */) const override
{
auto & container = storage.container;
@ -815,7 +815,7 @@ struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestPr
}
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t /*session_id*/, int64_t /* time */) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperGetACLResponse & response = dynamic_cast<Coordination::ZooKeeperGetACLResponse &>(*response_ptr);
@ -877,7 +877,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
}
}
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t session_id) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t zxid, int64_t session_id, int64_t time) const override
{
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
Coordination::ZooKeeperMultiResponse & response = dynamic_cast<Coordination::ZooKeeperMultiResponse &>(*response_ptr);
@ -888,7 +888,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
size_t i = 0;
for (const auto & concrete_request : concrete_requests)
{
auto [ cur_response, undo_action ] = concrete_request->process(storage, zxid, session_id);
auto [ cur_response, undo_action ] = concrete_request->process(storage, zxid, session_id, time);
response.responses[i] = cur_response;
if (cur_response->error != Coordination::Error::ZOK)
@ -945,7 +945,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
struct KeeperStorageCloseRequestProcessor final : public KeeperStorageRequestProcessor
{
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage &, int64_t, int64_t) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage &, int64_t, int64_t, int64_t /* time */) const override
{
throw DB::Exception("Called process on close request", ErrorCodes::LOGICAL_ERROR);
}
@ -954,7 +954,7 @@ struct KeeperStorageCloseRequestProcessor final : public KeeperStorageRequestPro
struct KeeperStorageAuthRequestProcessor final : public KeeperStorageRequestProcessor
{
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t session_id) const override
std::pair<Coordination::ZooKeeperResponsePtr, Undo> process(KeeperStorage & storage, int64_t /*zxid*/, int64_t session_id, int64_t /* time */) const override
{
Coordination::ZooKeeperAuthRequest & auth_request = dynamic_cast<Coordination::ZooKeeperAuthRequest &>(*zk_request);
Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse();
@ -1067,7 +1067,7 @@ KeeperStorageRequestProcessorsFactory::KeeperStorageRequestProcessorsFactory()
}
KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id, std::optional<int64_t> new_last_zxid, bool check_acl)
KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordination::ZooKeeperRequestPtr & zk_request, int64_t session_id, int64_t time, std::optional<int64_t> new_last_zxid, bool check_acl)
{
KeeperStorage::ResponsesForSessions results;
if (new_last_zxid)
@ -1119,7 +1119,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina
else if (zk_request->getOpNum() == Coordination::OpNum::Heartbeat) /// Heartbeat request is also special
{
KeeperStorageRequestProcessorPtr storage_request = KeeperStorageRequestProcessorsFactory::instance().get(zk_request);
auto [response, _] = storage_request->process(*this, zxid, session_id);
auto [response, _] = storage_request->process(*this, zxid, session_id, time);
response->xid = zk_request->xid;
response->zxid = getZXID();
@ -1138,7 +1138,7 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina
}
else
{
std::tie(response, std::ignore) = request_processor->process(*this, zxid, session_id);
std::tie(response, std::ignore) = request_processor->process(*this, zxid, session_id, time);
}
/// Watches for this requests are added to the watches lists

View File

@ -66,6 +66,7 @@ public:
struct RequestForSession
{
int64_t session_id;
int64_t time;
Coordination::ZooKeeperRequestPtr request;
};
@ -153,7 +154,7 @@ public:
/// Process user request and return response.
/// check_acl = false only when converting data from ZooKeeper.
ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, std::optional<int64_t> new_last_zxid, bool check_acl = true);
ResponsesForSessions processRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t session_id, int64_t time, std::optional<int64_t> new_last_zxid, bool check_acl = true);
void finalize();

View File

@ -518,7 +518,7 @@ bool deserializeTxn(KeeperStorage & storage, ReadBuffer & in, Poco::Logger * /*l
if (request->getOpNum() == Coordination::OpNum::Multi && hasErrorsInMultiRequest(request))
return true;
storage.processRequest(request, session_id, zxid, /* check_acl = */ false);
storage.processRequest(request, session_id, time, zxid, /* check_acl = */ false);
}
}

View File

@ -1219,6 +1219,9 @@ nuraft::ptr<nuraft::buffer> getBufferFromZKRequest(int64_t session_id, const Coo
DB::WriteBufferFromNuraftBuffer buf;
DB::writeIntBinary(session_id, buf);
request->write(buf);
using namespace std::chrono;
auto time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
DB::writeIntBinary(time, buf);
return buf.getBuffer();
}

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,41 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>1</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>2</priority>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>1</priority>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,41 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>2</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>2</priority>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>1</priority>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,41 @@
<clickhouse>
<keeper_server>
<tcp_port>9181</tcp_port>
<server_id>3</server_id>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<coordination_settings>
<operation_timeout_ms>5000</operation_timeout_ms>
<session_timeout_ms>10000</session_timeout_ms>
<snapshot_distance>75</snapshot_distance>
<raft_logs_level>trace</raft_logs_level>
</coordination_settings>
<raft_configuration>
<server>
<id>1</id>
<hostname>node1</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<priority>3</priority>
</server>
<server>
<id>2</id>
<hostname>node2</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>2</priority>
</server>
<server>
<id>3</id>
<hostname>node3</hostname>
<port>9234</port>
<can_become_leader>true</can_become_leader>
<start_as_follower>true</start_as_follower>
<priority>1</priority>
</server>
</raft_configuration>
</keeper_server>
</clickhouse>

View File

@ -0,0 +1,16 @@
<clickhouse>
<zookeeper>
<node index="1">
<host>node1</host>
<port>9181</port>
</node>
<node index="2">
<host>node2</host>
<port>9181</port>
</node>
<node index="3">
<host>node3</host>
<port>9181</port>
</node>
</zookeeper>
</clickhouse>

View File

@ -0,0 +1,129 @@
import pytest
from helpers.cluster import ClickHouseCluster
import random
import string
import os
import time
from multiprocessing.dummy import Pool
from helpers.network import PartitionManager
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/enable_keeper1.xml', 'configs/use_keeper.xml'], stay_alive=True)
node2 = cluster.add_instance('node2', main_configs=['configs/enable_keeper2.xml', 'configs/use_keeper.xml'], stay_alive=True)
node3 = cluster.add_instance('node3', main_configs=['configs/enable_keeper3.xml', 'configs/use_keeper.xml'], stay_alive=True)
from kazoo.client import KazooClient, KazooState
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def smaller_exception(ex):
return '\n'.join(str(ex).split('\n')[0:2])
def wait_node(node):
for _ in range(100):
zk = None
try:
node.query("SELECT * FROM system.zookeeper WHERE path = '/'")
zk = get_fake_zk(node.name, timeout=30.0)
zk.create("/test", sequence=True)
print("node", node.name, "ready")
break
except Exception as ex:
time.sleep(0.2)
print("Waiting until", node.name, "will be ready, exception", ex)
finally:
if zk:
zk.stop()
zk.close()
else:
raise Exception("Can't wait node", node.name, "to become ready")
def wait_nodes():
for node in [node1, node2, node3]:
wait_node(node)
def get_fake_zk(nodename, timeout=30.0):
_fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
_fake_zk_instance.start()
return _fake_zk_instance
def assert_eq_stats(stat1, stat2):
assert stat1.version == stat2.version
assert stat1.cversion == stat2.cversion
assert stat1.aversion == stat2.aversion
assert stat1.aversion == stat2.aversion
assert stat1.dataLength == stat2.dataLength
assert stat1.numChildren == stat2.numChildren
assert stat1.ctime == stat2.ctime
assert stat1.mtime == stat2.mtime
def test_between_servers(started_cluster):
try:
wait_nodes()
node1_zk = get_fake_zk("node1")
node2_zk = get_fake_zk("node2")
node3_zk = get_fake_zk("node3")
node1_zk.create("/test_between_servers")
for child_node in range(1000):
node1_zk.create("/test_between_servers/" + str(child_node))
for child_node in range(1000):
node1_zk.set("/test_between_servers/" + str(child_node), b"somevalue")
for child_node in range(1000):
stats1 = node1_zk.exists("/test_between_servers/" + str(child_node))
stats2 = node2_zk.exists("/test_between_servers/" + str(child_node))
stats3 = node3_zk.exists("/test_between_servers/" + str(child_node))
assert_eq_stats(stats1, stats2)
assert_eq_stats(stats2, stats3)
finally:
try:
for zk_conn in [node1_zk, node2_zk, node3_zk]:
zk_conn.stop()
zk_conn.close()
except:
pass
def test_server_restart(started_cluster):
try:
wait_nodes()
node1_zk = get_fake_zk("node1")
node1_zk.create("/test_server_restart")
for child_node in range(1000):
node1_zk.create("/test_server_restart/" + str(child_node))
for child_node in range(1000):
node1_zk.set("/test_server_restart/" + str(child_node), b"somevalue")
node3.restart_clickhouse(kill=True)
node2_zk = get_fake_zk("node2")
node3_zk = get_fake_zk("node3")
for child_node in range(1000):
stats1 = node1_zk.exists("/test_between_servers/" + str(child_node))
stats2 = node2_zk.exists("/test_between_servers/" + str(child_node))
stats3 = node3_zk.exists("/test_between_servers/" + str(child_node))
assert_eq_stats(stats1, stats2)
assert_eq_stats(stats2, stats3)
finally:
try:
for zk_conn in [node1_zk, node2_zk, node3_zk]:
zk_conn.stop()
zk_conn.close()
except:
pass