# ClickHouse/tests/integration/test_part_moves_between_shards/test.py
#
# 163 lines
# 5.1 KiB
# Python

import random
import time
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)


def _add_replica(name):
    """Register one ZooKeeper-backed instance called *name* on ``cluster``.

    A fresh ``main_configs`` list is built on every call, so the instances
    never share a list object.
    """
    return cluster.add_instance(
        name,
        main_configs=['configs/remote_servers.xml', 'configs/merge_tree.xml'],
        with_zookeeper=True,
    )


# Instance names follow the pattern s<shard>r<replica>: the tests in this file
# group them as shard 0 = [s0r0, s0r1] and shard 1 = [s1r0, s1r1].
s0r0 = _add_replica('s0r0')
s0r1 = _add_replica('s0r1')
s1r0 = _add_replica('s1r0')
s1r1 = _add_replica('s1r1')
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture that starts ``cluster`` and yields it to the tests.

    ``cluster.shutdown()`` sits in the ``finally`` clause, so it runs after the
    last test of the module and also when ``cluster.start()`` itself raises.
    """
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
def _wait_for_part_move_done(node, timeout=300, poll_interval=3):
    """Block until ``system.part_moves_between_shards`` on *node* reports DONE.

    Args:
        node: Cluster instance to poll; only its ``query`` method is used.
        timeout: Maximum number of seconds to keep polling.
        poll_interval: Seconds to sleep before each poll.

    Raises:
        AssertionError: If the state is still not ``DONE`` once *timeout*
            seconds have elapsed, so a stuck move fails the test instead of
            hanging the whole run.
    """
    deadline = time.monotonic() + timeout
    while True:
        time.sleep(poll_interval)

        # Dump the whole task table so a stuck move can be diagnosed from the test log.
        print(node.query("SELECT * FROM system.part_moves_between_shards"))

        # Eventually.
        state = node.query("SELECT state FROM system.part_moves_between_shards").strip()
        if state == "DONE":
            return

        if time.monotonic() >= deadline:
            raise AssertionError(
                "Part move did not reach DONE within {} seconds (last state: {!r})".format(timeout, state))


def test_move(started_cluster):
    """Move part 'all_0_0_0' from shard 0 to shard 1 and back, checking row counts.

    After the first move every replica of both shards must hold one row; after
    the move back shard 0 must hold two rows and shard 1 none. Table ``t`` is
    dropped on every instance afterwards, whether or not the test passed, so
    that a failure here does not leave a stale table behind for other tests.
    """
    try:
        for shard_ix, rs in enumerate([[s0r0, s0r1], [s1r0, s1r1]]):
            for replica_ix, r in enumerate(rs):
                r.query("""
CREATE TABLE t(v UInt64)
ENGINE ReplicatedMergeTree('/clickhouse/shard_{}/tables/t', '{}')
ORDER BY tuple()
""".format(shard_ix, replica_ix))

        # NOTE(review): merges are presumably stopped so that the first insert
        # stays addressable as part 'all_0_0_0' until the move is issued.
        s0r0.query("SYSTEM STOP MERGES t")

        s0r0.query("INSERT INTO t VALUES (1)")
        s0r0.query("INSERT INTO t VALUES (2)")

        assert "2" == s0r0.query("SELECT count() FROM t").strip()
        assert "0" == s1r0.query("SELECT count() FROM t").strip()

        s0r0.query("ALTER TABLE t MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/t'")

        print(s0r0.query("SELECT * FROM system.part_moves_between_shards"))

        s0r0.query("SYSTEM START MERGES t")
        s0r0.query("OPTIMIZE TABLE t FINAL")

        _wait_for_part_move_done(s0r0)

        for n in [s0r0, s0r1]:
            assert "1" == n.query("SELECT count() FROM t").strip()

        for n in [s1r0, s1r1]:
            assert "1" == n.query("SELECT count() FROM t").strip()

        # Move part back
        s1r0.query("ALTER TABLE t MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_0/tables/t'")

        _wait_for_part_move_done(s1r0)

        for n in [s0r0, s0r1]:
            assert "2" == n.query("SELECT count() FROM t").strip()

        for n in [s1r0, s1r1]:
            assert "0" == n.query("SELECT count() FROM t").strip()
    finally:
        # Cleanup. IF EXISTS keeps this from raising (and hiding the real
        # failure) when the test broke before the table was created everywhere.
        for n in started_cluster.instances.values():
            n.query("DROP TABLE IF EXISTS t SYNC")
def test_deduplication_while_move(started_cluster):
    """Check that reads through ``t_d`` stay correct while a part is being moved.

    Two rows are inserted on shard 0, part 'all_0_0_0' is moved to shard 1, and
    until the move is reported as DONE the Distributed table ``t_d`` is queried
    on randomly chosen instances with ``allow_experimental_query_deduplication``
    enabled. Every such query must return exactly the rows 1 and 2.

    Raises:
        AssertionError: If a query returns an unexpected result, or if the move
            has not reached DONE within the timeout (so a stuck move fails the
            test instead of spinning forever).
    """
    for shard_ix, rs in enumerate([[s0r0, s0r1], [s1r0, s1r1]]):
        for replica_ix, r in enumerate(rs):
            r.query("""
CREATE TABLE t(v UInt64)
ENGINE ReplicatedMergeTree('/clickhouse/shard_{}/tables/t', '{}')
ORDER BY tuple()
""".format(shard_ix, replica_ix))

            # t_d is queried on arbitrary instances below, so every instance needs it.
            r.query("""
CREATE TABLE t_d AS t
ENGINE Distributed('test_cluster', '', t)
""")

    s0r0.query("SYSTEM STOP MERGES t")

    s0r0.query("INSERT INTO t VALUES (1)")
    s0r0.query("INSERT INTO t VALUES (2)")
    s0r1.query("SYSTEM SYNC REPLICA t")

    assert "2" == s0r0.query("SELECT count() FROM t").strip()
    assert "0" == s1r0.query("SELECT count() FROM t").strip()

    s0r0.query("ALTER TABLE t MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/t'")
    s0r0.query("SYSTEM START MERGES t")

    expected = """
1
2
"""

    # The set of instances does not change during the loop; build the list once.
    nodes = list(started_cluster.instances.values())

    move_timeout = 300
    deadline = time.monotonic() + move_timeout

    # Verify that we get consistent result at all times while the part is moving from one shard to another.
    # There is deliberately no sleep here: the table is queried as often as possible during the move.
    while "DONE" != s0r0.query("SELECT state FROM system.part_moves_between_shards ORDER BY create_time DESC LIMIT 1").strip():
        assert time.monotonic() < deadline, \
            "Part move did not reach DONE within {} seconds".format(move_timeout)

        n = random.choice(nodes)

        assert TSV(n.query("SELECT * FROM t_d ORDER BY v", settings={
            "allow_experimental_query_deduplication": 1
        })) == TSV(expected)
def test_move_not_permitted(started_cluster):
    """Check that MOVE PART ... TO SHARD is rejected for unsuitable destinations.

    Two cases are exercised from ``s0r0``: a destination table whose column
    structure differs from the local one, and a destination that is the source
    table's own ZooKeeper path. Both must raise ``QueryRuntimeException`` with
    the expected server message.
    """
    create_template = """
CREATE TABLE not_permitted(v_{} UInt64)
ENGINE ReplicatedMergeTree('/clickhouse/shard_{}/tables/not_permitted', 'r')
ORDER BY tuple()
"""
    # The column is named v_0 on s0r0 and v_1 on s1r0, so the two tables differ in structure.
    for shard_ix, node in enumerate((s0r0, s1r0)):
        node.query(create_template.format(shard_ix, shard_ix))

    s0r0.query("INSERT INTO not_permitted VALUES (1)")

    move_to_other_shard = "ALTER TABLE not_permitted MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_1/tables/not_permitted'"
    with pytest.raises(QueryRuntimeException) as structure_error:
        s0r0.query(move_to_other_shard)

    structure_message = str(structure_error.value)
    assert "DB::Exception: Table columns structure in ZooKeeper is different from local table structure." in structure_message

    move_to_same_shard = "ALTER TABLE not_permitted MOVE PART 'all_0_0_0' TO SHARD '/clickhouse/shard_0/tables/not_permitted'"
    with pytest.raises(QueryRuntimeException) as same_shard_error:
        s0r0.query(move_to_same_shard)

    same_shard_message = str(same_shard_error.value)
    assert "DB::Exception: Source and destination are the same" in same_shard_message