Upd test_alter_moving_garbage

parent 44c68ffdab
commit ac638615ae
@@ -11,6 +11,7 @@ from helpers.cluster import ClickHouseCluster
 # two replicas in remote_servers.xml
 REPLICA_COUNT = 2
 
+
 @pytest.fixture(scope="module")
 def cluster():
     try:
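For readers without the full file: the context lines above follow the standard ClickHouse integration-test fixture pattern. A minimal sketch of what that setup plausibly looks like — the instance names, config path, and minio/zookeeper flags are assumptions, not taken from this diff:

```python
import pytest
from helpers.cluster import ClickHouseCluster

# two replicas in remote_servers.xml
REPLICA_COUNT = 2


@pytest.fixture(scope="module")
def cluster():
    cluster = ClickHouseCluster(__file__)
    for i in range(1, REPLICA_COUNT + 1):
        # Config path and flags are illustrative; the real test ships its own configs.
        cluster.add_instance(
            f"node{i}",
            main_configs=["configs/config.d/storage_conf.xml"],
            with_minio=True,
            with_zookeeper=True,
        )
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()
```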
@@ -82,7 +83,7 @@ def test_create_table(
 
     additional_settings = {}
 
-    # different names for logs readability
+    # Different names for logs readability
     table_name = "test_table"
     if allow_remote_fs_zero_copy_replication:
         table_name = "test_table_zero_copy"
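This hunk only renames the table for log readability; the creation itself is unchanged and therefore not shown. A hedged sketch of how such a helper typically looks — `create_table`, the schema, and the `'s3'` storage policy name are assumptions, though the column types match the INSERT in the next hunk:

```python
def create_table(node, table_name, replicated_engine, additional_settings):
    # Merge per-test MergeTree settings (e.g. allow_remote_fs_zero_copy_replication=1)
    # on top of an assumed baseline storage policy.
    settings = {"storage_policy": "'s3'"}
    settings.update(additional_settings)
    settings_clause = ", ".join(f"{k}={v}" for k, v in settings.items())

    engine = (
        f"ReplicatedMergeTree('/clickhouse/tables/{table_name}', '{{replica}}')"
        if replicated_engine
        else "MergeTree()"
    )
    node.query(
        f"CREATE TABLE {table_name} (dt Date, id Int64, data String) "
        f"ENGINE = {engine} PARTITION BY dt ORDER BY id "
        f"SETTINGS {settings_clause}"
    )
```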
@@ -99,17 +100,7 @@ def test_create_table(
             f"INSERT INTO {table_name} SELECT toDate('{partition}'), number as id, toString(sipHash64(number, {i})) FROM numbers(10_000)"
         )
 
-    def check_count():
-        if replicated_engine:
-            return random.choice(nodes).query_with_retry(
-                f"SELECT countDistinct(dt, data) FROM clusterAllReplicas(test_cluster, default.{table_name}) WHERE id % 100 = 0"
-            )
-        else:
-            return random.choice(nodes).query(
-                f"SELECT countDistinct(dt, data) FROM {table_name} WHERE id % 100 = 0"
-            )
-
-    assert check_count() == "1000\n"
-
+    # Run ALTER in parallel with moving parts
     stop_alter = False
 
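The new comment replaces the removed `check_count` helper with a pointer to what actually matters here: the ALTER runs on a separate thread while parts are being moved. The stop-flag pattern it relies on is plain Python threading and closures; a self-contained sketch of that pattern (the names here are generic — the test's are `stop_alter` and `alter`):

```python
import threading
import time

stop_flag = False  # the test's equivalent is stop_alter


def worker():
    # The thread re-reads the flag on every iteration, so reassigning it
    # from the main thread (below) cleanly stops the loop.
    while not stop_flag:
        time.sleep(0.1)  # stand-in for one lightweight mutation


t = threading.Thread(target=worker)
t.start()
time.sleep(1)
stop_flag = True
t.join()
```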
@@ -118,9 +109,14 @@ def test_create_table(
         for d in range(1, 100):
             if stop_alter:
                 break
-            # I managed to reproduce issue with DELETE, but it can be any other lightweight mutation
-            # Do not delete rows with id % 100 = 0, because they are used in check_count to check that data is not corrupted
+            # Some lightweight mutation should change the moving part before it is swapped; then we will have to clean it up.
+            # Messages `Failed to swap {}. Active part doesn't exist` should appear in logs.
+            #
+            # I managed to reproduce the issue with DELETE (`ALTER TABLE {table_name} ADD/DROP COLUMN` also works on real s3 instead of minio)
+            # Note: do not delete rows with id % 100 = 0, because they are used in `check_count` to verify that data is not corrupted
             random.choice(nodes).query(f"DELETE FROM {table_name} WHERE id % 100 = {d}")
 
             time.sleep(0.1)
 
     alter_thread = threading.Thread(target=alter)
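For the race these comments describe, parts have to be moving while the DELETEs land. The moving side is outside this hunk, but in ClickHouse it boils down to `ALTER TABLE ... MOVE PARTITION ... TO DISK`; a sketch under the assumption that the target disk is the minio-backed `s3` disk and that `partitions` holds the inserted dates:

```python
import random

# Assumed counterpart to the alter thread: move every partition to the
# remote disk while the DELETE loop above mutates the table.
# `nodes`, `table_name`, and `partitions` are the test's own variables.
for partition in partitions:  # e.g. the toDate('...') values inserted earlier
    random.choice(nodes).query(
        f"ALTER TABLE {table_name} MOVE PARTITION '{partition}' TO DISK 's3'"
    )
```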
@@ -143,4 +139,17 @@ def test_create_table(
     stop_alter = True
     alter_thread.join()
 
-    assert check_count() == "1000\n"
+    # Check that no data was lost
+
+    data_digest = None
+    if replicated_engine:
+        # We don't know what data was replicated, so we need to check all replicas and take unique values
+        data_digest = random.choice(nodes).query_with_retry(
+            f"SELECT countDistinct(dt, data) FROM clusterAllReplicas(test_cluster, default.{table_name}) WHERE id % 100 == 0"
+        )
+    else:
+        data_digest = random.choice(nodes).query(
+            f"SELECT countDistinct(dt, data) FROM {table_name} WHERE id % 100 == 0"
+        )
+
+    assert data_digest == "1000\n"
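Why "1000\n": each INSERT in the setup adds 10 000 rows, of which exactly 100 satisfy `id % 100 == 0`, and `data = sipHash64(number, {i})` differs per insert, so the surviving `(dt, data)` pairs are distinct across inserts. The expected digest therefore implies ten inserts in total — an inference from the number, not stated in the hunk:

```python
# Digest arithmetic (sketch): any lost or corrupted part lowers the count.
inserts = 10                # inferred from the expected "1000\n"
rows_kept = 10_000 // 100   # rows per insert with id % 100 == 0
assert inserts * rows_kept == 1000
```

Two side notes: the switch from `= 0` to `== 0` in the WHERE clause is cosmetic, since ClickHouse SQL accepts both equality spellings; and `query_with_retry` is used only on the replicated path because, per the comment above, replicas may still be syncing when the check runs.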