ClickHouse/tests/integration/test_mutations_with_merge_tree/test.py
Vladimir Chebotarev 863f3565ad Test fixes.
2021-11-25 17:11:28 +03:00

198 lines
9.8 KiB
Python

import time
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
instance_test_mutations = cluster.add_instance('test_mutations_with_merge_tree', main_configs=['configs/config.xml'],
user_configs=['configs/users.xml'])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
instance_test_mutations.query(
'''CREATE TABLE test_mutations_with_ast_elements(date Date, a UInt64, b String) ENGINE = MergeTree(date, (a, date), 8192)''')
instance_test_mutations.query(
'''INSERT INTO test_mutations_with_ast_elements SELECT '2019-07-29' AS date, 1, toString(number) FROM numbers(1) SETTINGS force_index_by_date = 0, force_primary_key = 0''')
yield cluster
finally:
cluster.shutdown()
def test_mutations_in_partition_background(started_cluster):
try:
numbers = 100
name = "test_mutations_in_partition"
instance_test_mutations.query(
f'''CREATE TABLE {name} (date Date, a UInt64, b String) ENGINE = MergeTree() ORDER BY tuple() PARTITION BY a''')
instance_test_mutations.query(
f'''INSERT INTO {name} SELECT '2019-07-29' AS date, number, toString(number) FROM numbers({numbers})''')
for i in range(0, numbers, 3):
instance_test_mutations.query(f'''ALTER TABLE {name} DELETE IN PARTITION {i} WHERE a = {i}''')
for i in range(1, numbers, 3):
instance_test_mutations.query(f'''ALTER TABLE {name} UPDATE b = 'changed' IN PARTITION {i} WHERE a = {i} ''')
def count_and_changed():
return instance_test_mutations.query(f"SELECT count(), countIf(b == 'changed') FROM {name} SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV").splitlines()
all_done = False
for wait_times_for_mutation in range(100): # wait for replication 80 seconds max
time.sleep(0.8)
if count_and_changed() == ["66,33"]:
all_done = True
break
print(instance_test_mutations.query(
f"SELECT mutation_id, command, parts_to_do, is_done, latest_failed_part, latest_fail_reason, parts_to_do_names FROM system.mutations WHERE table = '{name}' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT TSVWithNames"))
assert (count_and_changed(), all_done) == (["66,33"], True)
assert instance_test_mutations.query(f"SELECT count(), sum(is_done) FROM system.mutations WHERE table = '{name}' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV").splitlines() == ["67,67"]
finally:
instance_test_mutations.query(f'''DROP TABLE {name}''')
@pytest.mark.parametrize("sync", [
("last",),
("all",)
])
def test_mutations_in_partition_sync(started_cluster, sync):
try:
numbers = 10
name = "test_mutations_in_partition_sync"
instance_test_mutations.query(
f'''CREATE TABLE {name} (date Date, a UInt64, b String) ENGINE = MergeTree() ORDER BY tuple() PARTITION BY a''')
instance_test_mutations.query(
f'''INSERT INTO {name} SELECT '2019-07-29' AS date, number, toString(number) FROM numbers({numbers})''')
for i in range(0, numbers, 3):
instance_test_mutations.query(f'''ALTER TABLE {name} DELETE IN PARTITION {i} WHERE a = {i}'''
+ (' SETTINGS mutations_sync = 1' if sync == 'all' else ''))
for reverse_index, i in reversed(list(enumerate(reversed(range(1, numbers, 3))))):
instance_test_mutations.query(f'''ALTER TABLE {name} UPDATE b = 'changed' IN PARTITION {i} WHERE a = {i}'''
+ (' SETTINGS mutations_sync = 1' if not reverse_index or sync == 'all' else ''))
def count_and_changed():
return instance_test_mutations.query(f"SELECT count(), countIf(b == 'changed') FROM {name} SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV").splitlines()
print(instance_test_mutations.query(
f"SELECT mutation_id, command, parts_to_do, is_done, latest_failed_part, latest_fail_reason FROM system.mutations WHERE table = '{name}' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT TSVWithNames"))
assert count_and_changed() == ["6,3"]
assert instance_test_mutations.query(f"SELECT count(), sum(is_done) FROM system.mutations WHERE table = '{name}' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV").splitlines() == ["7,7"]
finally:
instance_test_mutations.query(f'''DROP TABLE {name}''')
def test_mutations_with_merge_background_task(started_cluster):
instance_test_mutations.query('''SYSTEM STOP MERGES test_mutations_with_ast_elements''')
## The number of asts per query is 15
for execution_times_for_mutation in range(100):
instance_test_mutations.query(
'''ALTER TABLE test_mutations_with_ast_elements DELETE WHERE 1 = 1 AND toUInt32(b) IN (1)''')
all_done = False
for wait_times_for_mutation in range(100): # wait for replication 80 seconds max
time.sleep(0.8)
def get_done_mutations(instance):
instance_test_mutations.query('''DETACH TABLE test_mutations_with_ast_elements''')
instance_test_mutations.query('''ATTACH TABLE test_mutations_with_ast_elements''')
return int(instance.query(
"SELECT sum(is_done) FROM system.mutations WHERE table = 'test_mutations_with_ast_elements' SETTINGS force_index_by_date = 0, force_primary_key = 0").rstrip())
if get_done_mutations(instance_test_mutations) == 100:
all_done = True
break
print(instance_test_mutations.query(
"SELECT mutation_id, command, parts_to_do, is_done FROM system.mutations WHERE table = 'test_mutations_with_ast_elements' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT TSVWithNames"))
assert all_done
def test_mutations_with_truncate_table(started_cluster):
instance_test_mutations.query('''SYSTEM STOP MERGES test_mutations_with_ast_elements''')
## The number of asts per query is 15
for execute_number in range(100):
instance_test_mutations.query(
'''ALTER TABLE test_mutations_with_ast_elements DELETE WHERE 1 = 1 AND toUInt32(b) IN (1)''')
instance_test_mutations.query("TRUNCATE TABLE test_mutations_with_ast_elements")
assert instance_test_mutations.query(
"SELECT COUNT() FROM system.mutations WHERE table = 'test_mutations_with_ast_elements SETTINGS force_index_by_date = 0, force_primary_key = 0'").rstrip() == '0'
def test_mutations_will_not_hang_for_non_existing_parts_sync(started_cluster):
try:
numbers = 100
name = "test_mutations_will_not_hang_for_non_existing_parts_sync"
instance_test_mutations.query(
f"""CREATE TABLE {name} (date Date, a UInt64, b String) ENGINE = MergeTree() ORDER BY tuple() PARTITION BY a""")
instance_test_mutations.query(
f"""INSERT INTO {name} SELECT '2019-07-29' AS date, number, toString(number) FROM numbers({numbers})""")
for i in range(0, numbers, 3):
instance_test_mutations.query(f"""ALTER TABLE {name} DELETE IN PARTITION {i+1000} WHERE a = {i} SETTINGS mutations_sync = 1""")
def count():
return instance_test_mutations.query(f"SELECT count() FROM {name} SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV").splitlines()
print(instance_test_mutations.query(
f"SELECT mutation_id, command, parts_to_do, is_done, latest_failed_part, latest_fail_reason, parts_to_do_names FROM system.mutations WHERE table = '{name}' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT TSVWithNames"))
assert count() == [f"{numbers}"]
assert instance_test_mutations.query(f"SELECT count(), sum(is_done) FROM system.mutations WHERE table = '{name}' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV").splitlines() == [f"34,34"]
finally:
instance_test_mutations.query(f"""DROP TABLE {name}""")
def test_mutations_will_not_hang_for_non_existing_parts_async(started_cluster):
try:
numbers = 100
name = "test_mutations_will_not_hang_for_non_existing_parts_async"
instance_test_mutations.query(
f"""CREATE TABLE {name} (date Date, a UInt64, b String) ENGINE = MergeTree() ORDER BY tuple() PARTITION BY a""")
instance_test_mutations.query(
f"""INSERT INTO {name} SELECT '2019-07-29' AS date, number, toString(number) FROM numbers({numbers})""")
for i in range(0, numbers, 3):
instance_test_mutations.query(f"""ALTER TABLE {name} DELETE IN PARTITION {i+1000} WHERE a = {i}""")
def count():
return instance_test_mutations.query(f"SELECT count() FROM {name} SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV").splitlines()
def count_and_sum_is_done():
return instance_test_mutations.query(f"SELECT count(), sum(is_done) FROM system.mutations WHERE table = '{name}' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV").splitlines()
all_done = False
for wait_times_for_mutation in range(100): # wait for replication 80 seconds max
time.sleep(0.8)
if count_and_sum_is_done() == ["34,34"]:
all_done = True
break
print(instance_test_mutations.query(
f"SELECT mutation_id, command, parts_to_do, is_done, latest_failed_part, latest_fail_reason, parts_to_do_names FROM system.mutations WHERE table = '{name}' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT TSVWithNames"))
assert count() == [f"{numbers}"]
assert count_and_sum_is_done() == ["34,34"]
finally:
instance_test_mutations.query(f"""DROP TABLE {name}""")