diff --git a/src/Coordination/NuKeeperStorageDispatcher.cpp b/src/Coordination/NuKeeperStorageDispatcher.cpp index 914985ee534..8ca5d3fff13 100644 --- a/src/Coordination/NuKeeperStorageDispatcher.cpp +++ b/src/Coordination/NuKeeperStorageDispatcher.cpp @@ -274,7 +274,13 @@ void NuKeeperStorageDispatcher::sessionCleanerTask() LOG_INFO(log, "Found dead session {}, will try to close it", dead_session); Coordination::ZooKeeperRequestPtr request = Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close); request->xid = Coordination::CLOSE_XID; - putRequest(request, dead_session); + NuKeeperStorage::RequestForSession request_info; + request_info.request = request; + request_info.session_id = dead_session; + { + std::lock_guard lock(push_request_mutex); + requests_queue.push(std::move(request_info)); + } finishSession(dead_session); } } diff --git a/tests/integration/test_testkeeper_multinode/test.py b/tests/integration/test_testkeeper_multinode/test.py index 05879613ba6..51f60df7719 100644 --- a/tests/integration/test_testkeeper_multinode/test.py +++ b/tests/integration/test_testkeeper_multinode/test.py @@ -27,23 +27,8 @@ def started_cluster(): def smaller_exception(ex): return '\n'.join(str(ex).split('\n')[0:2]) -def test_simple_replicated_table(started_cluster): - - for i, node in enumerate([node1, node2, node3]): - node.query("CREATE TABLE t (value UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/t', '{}') ORDER BY tuple()".format(i + 1)) - - node2.query("INSERT INTO t SELECT number FROM numbers(10)") - - node1.query("SYSTEM SYNC REPLICA t", timeout=10) - node3.query("SYSTEM SYNC REPLICA t", timeout=10) - - assert node1.query("SELECT COUNT() FROM t") == "10\n" - assert node2.query("SELECT COUNT() FROM t") == "10\n" - assert node3.query("SELECT COUNT() FROM t") == "10\n" - - -def get_fake_zk(nodename): - _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=30.0) +def get_fake_zk(nodename, timeout=30.0): + _fake_zk_instance = KazooClient(hosts=cluster.get_instance_ip(nodename) + ":9181", timeout=timeout) def reset_last_zxid_listener(state): print("Fake zk callback called for state", state) _fake_zk_instance.last_zxid = 0 @@ -146,6 +131,56 @@ def test_watch_on_follower(started_cluster): pass +def test_session_expiration(started_cluster): + try: + node1_zk = get_fake_zk("node1") + node2_zk = get_fake_zk("node2") + node3_zk = get_fake_zk("node3", timeout=3.0) + + node3_zk.create("/test_ephemeral_node", b"world", ephemeral=True) + + with PartitionManager() as pm: + pm.partition_instances(node3, node2) + pm.partition_instances(node3, node1) + node3_zk.stop() + node3_zk.close() + time.sleep(5) + + assert node1_zk.exists("/test_ephemeral_node") is None + assert node2_zk.exists("/test_ephemeral_node") is None + + finally: + try: + for zk_conn in [node1_zk, node2_zk, node3_zk]: + try: + zk_conn.stop() + zk_conn.close() + except: + pass + except: + pass + +def test_simple_replicated_table(started_cluster): + # something may be wrong after partition in other tests + # so create with retry + for i, node in enumerate([node1, node2, node3]): + for i in range(100): + try: + node.query("CREATE TABLE t (value UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/t', '{}') ORDER BY tuple()".format(i + 1)) + break + except: + time.sleep(0.1) + + node2.query("INSERT INTO t SELECT number FROM numbers(10)") + + node1.query("SYSTEM SYNC REPLICA t", timeout=10) + node3.query("SYSTEM SYNC REPLICA t", timeout=10) + + assert node1.query("SELECT COUNT() FROM t") == "10\n" + assert node2.query("SELECT COUNT() FROM t") == "10\n" + assert node3.query("SELECT COUNT() FROM t") == "10\n" + + # in extremely rare case it can take more than 5 minutes in debug build with sanitizer @pytest.mark.timeout(600) def test_blocade_leader(started_cluster):