Mirror of https://github.com/ClickHouse/ClickHouse.git
Fix outdated session kill
commit 51c221f993
parent 9d414d215e
@@ -274,7 +274,13 @@ void NuKeeperStorageDispatcher::sessionCleanerTask()
                     LOG_INFO(log, "Found dead session {}, will try to close it", dead_session);
                     Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
                     request->xid = Coordination::CLOSE_XID;
-                    putRequest(request, dead_session);
+                    NuKeeperStorage::RequestForSession request_info;
+                    request_info.request = request;
+                    request_info.session_id = dead_session;
+                    {
+                        std::lock_guard lock(push_request_mutex);
+                        requests_queue.push(std::move(request_info));
+                    }
                     finishSession(dead_session);
                 }
             }
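The shape of the fix: instead of handing the Close to putRequest(), the session cleaner now builds a NuKeeperStorage::RequestForSession itself, pushes it onto requests_queue under push_request_mutex, and only then calls finishSession(). The sketch below is a toy Python model, not the actual C++ dispatcher; the assumption that the normal entry point rejects requests for sessions it already considers expired is illustrative, not taken from the source.

import queue
import threading

requests_queue = queue.Queue()
push_request_mutex = threading.Lock()
alive_sessions = {1, 2, 3}  # toy bookkeeping, not the real dispatcher state

def put_request(request, session_id):
    # Hypothetical model of the old path: requests for sessions the
    # dispatcher no longer tracks are rejected, so a Close for an
    # already-expired ("outdated") session would be lost.
    if session_id not in alive_sessions:
        raise KeyError("session {} already expired".format(session_id))
    with push_request_mutex:
        requests_queue.put((request, session_id))

def close_dead_session(dead_session):
    # Model of the fixed path: enqueue the Close directly, bypassing
    # the liveness check, then drop the session (finishSession analogue).
    with push_request_mutex:
        requests_queue.put(("close", dead_session))
    alive_sessions.discard(dead_session)

Under that assumption, the old path could silently drop the Close request for an already-dead session, which is what the commit title refers to.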
@@ -27,23 +27,8 @@ def started_cluster():
 def smaller_exception(ex):
     return '\n'.join(str(ex).split('\n')[0:2])
 
-def test_simple_replicated_table(started_cluster):
-
-    for i, node in enumerate([node1, node2, node3]):
-        node.query("CREATE TABLE t (value UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/t', '{}') ORDER BY tuple()".format(i + 1))
-
-    node2.query("INSERT INTO t SELECT number FROM numbers(10)")
-
-    node1.query("SYSTEM SYNC REPLICA t", timeout=10)
-    node3.query("SYSTEM SYNC REPLICA t", timeout=10)
-
-    assert node1.query("SELECT COUNT() FROM t") == "10\n"
-    assert node2.query("SELECT COUNT() FROM t") == "10\n"
-    assert node3.query("SELECT COUNT() FROM t") == "10\n"
-
-
-def get_fake_zk(nodename):
-    _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=30.0)
+def get_fake_zk(nodename, timeout=30.0):
+    _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout)
     def reset_last_zxid_listener(state):
         print("Fake zk callback called for state", state)
         _fake_zk_instance.last_zxid = 0
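get_fake_zk() now accepts the session timeout as a parameter instead of hard-coding 30 seconds; the new test below passes timeout=3.0 to get a client whose session the server should expire quickly once the connection drops. The same pattern as a standalone kazoo sketch, with a placeholder host address:

from kazoo.client import KazooClient

def make_fake_zk(host, timeout=30.0):
    # timeout is the requested ZooKeeper session timeout in seconds;
    # a small value lets the server declare the session dead soon
    # after the client stops heartbeating.
    client = KazooClient(hosts=host + ":9181", timeout=timeout)
    client.start()
    return client

# A client whose session should expire within a few seconds of
# losing connectivity:
# zk = make_fake_zk("10.0.0.3", timeout=3.0)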
@@ -146,6 +131,56 @@ def test_watch_on_follower(started_cluster):
         pass
 
 
+def test_session_expiration(started_cluster):
+    try:
+        node1_zk = get_fake_zk("node1")
+        node2_zk = get_fake_zk("node2")
+        node3_zk = get_fake_zk("node3", timeout=3.0)
+
+        node3_zk.create("/test_ephemeral_node", b"world", ephemeral=True)
+
+        with PartitionManager() as pm:
+            pm.partition_instances(node3, node2)
+            pm.partition_instances(node3, node1)
+            node3_zk.stop()
+            node3_zk.close()
+            time.sleep(5)
+
+        assert node1_zk.exists("/test_ephemeral_node") is None
+        assert node2_zk.exists("/test_ephemeral_node") is None
+
+    finally:
+        try:
+            for zk_conn in [node1_zk, node2_zk, node3_zk]:
+                try:
+                    zk_conn.stop()
+                    zk_conn.close()
+                except:
+                    pass
+        except:
+            pass
+
+def test_simple_replicated_table(started_cluster):
+    # something may be wrong after partition in other tests
+    # so create with retry; the retry counter must not shadow the
+    # node index used for the replica name
+    for i, node in enumerate([node1, node2, node3]):
+        for attempt in range(100):
+            try:
+                node.query("CREATE TABLE t (value UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/t', '{}') ORDER BY tuple()".format(i + 1))
+                break
+            except:
+                time.sleep(0.1)
+
+    node2.query("INSERT INTO t SELECT number FROM numbers(10)")
+
+    node1.query("SYSTEM SYNC REPLICA t", timeout=10)
+    node3.query("SYSTEM SYNC REPLICA t", timeout=10)
+
+    assert node1.query("SELECT COUNT() FROM t") == "10\n"
+    assert node2.query("SELECT COUNT() FROM t") == "10\n"
+    assert node3.query("SELECT COUNT() FROM t") == "10\n"
+
+
 # in extremely rare case it can take more than 5 minutes in debug build with sanitizer
 @pytest.mark.timeout(600)
 def test_blocade_leader(started_cluster):
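The recreated test_simple_replicated_table wraps CREATE TABLE in a retry loop because a preceding partition test can leave the cluster briefly unhealthy. The same idea as a small standalone helper; the names here are illustrative and not from the test file:

import time

def retry(fn, attempts=100, delay=0.1):
    # Call fn until it succeeds or attempts run out; re-raise the last
    # error so a persistent failure still fails the test loudly.
    last_exc = Exception("retry: no attempts were made")
    for _ in range(attempts):
        try:
            return fn()
        except Exception as exc:
            last_exc = exc
            time.sleep(delay)
    raise last_exc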