ClickHouse/tests/integration/test_replicated_mutations/test.py
2024-09-27 10:19:49 +00:00

303 lines
9.2 KiB
Python

import logging
import random
import threading
import time
from collections import Counter
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance("node1", macros={"cluster": "test1"}, with_zookeeper=True)
# Check, that limits on max part size for merges doesn`t affect mutations
node2 = cluster.add_instance(
"node2",
macros={"cluster": "test1"},
main_configs=["configs/merge_tree.xml"],
user_configs=["configs/users.xml"],
with_zookeeper=True,
)
node3 = cluster.add_instance(
"node3",
macros={"cluster": "test2"},
main_configs=["configs/merge_tree_max_parts.xml"],
user_configs=["configs/users.xml"],
with_zookeeper=True,
)
node4 = cluster.add_instance(
"node4",
macros={"cluster": "test2"},
main_configs=["configs/merge_tree_max_parts.xml"],
user_configs=["configs/users.xml"],
with_zookeeper=True,
)
node5 = cluster.add_instance(
"node5",
macros={"cluster": "test3"},
main_configs=["configs/merge_tree_max_parts.xml"],
user_configs=["configs/users.xml"],
)
all_nodes = [node1, node2, node3, node4, node5]
def prepare_cluster():
for node in all_nodes:
node.query("DROP TABLE IF EXISTS test_mutations SYNC")
for node in [node1, node2, node3, node4]:
node.query(
"""
CREATE TABLE test_mutations(d Date, x UInt32, i UInt32)
ENGINE ReplicatedMergeTree('/clickhouse/{cluster}/tables/test/test_mutations', '{instance}')
ORDER BY x
PARTITION BY toYYYYMM(d)
SETTINGS number_of_free_entries_in_pool_to_execute_mutation=0
"""
)
node5.query(
"CREATE TABLE test_mutations(d Date, x UInt32, i UInt32) ENGINE MergeTree() ORDER BY x PARTITION BY toYYYYMM(d)"
)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
class Runner:
def __init__(self, nodes):
self.nodes = nodes
self.mtx = threading.Lock()
self.total_inserted_xs = 0
self.total_inserted_rows = 0
self.total_mutations = 0
self.total_deleted_xs = 0
self.total_deleted_rows = 0
self.current_xs = Counter()
self.currently_inserting_xs = Counter()
self.currently_deleting_xs = set()
self.stop_ev = threading.Event()
self.exceptions = []
def do_insert(self, thread_num, partitions_num):
self.stop_ev.wait(random.random())
# Each thread inserts a small random number of rows with random year, month 01 and day determined
# by the thread number. The idea is to avoid spurious duplicates and to insert into a
# nontrivial number of partitions.
month = "01"
day = str(thread_num + 1).zfill(2)
i = 1
while not self.stop_ev.is_set():
xs = [random.randint(1, 10) for _ in range(random.randint(1, 10))]
with self.mtx:
xs = [x for x in xs if x not in self.currently_deleting_xs]
if len(xs) == 0:
continue
for x in xs:
self.currently_inserting_xs[x] += 1
year = 2000 + random.randint(0, partitions_num)
date_str = "{year}-{month}-{day}".format(year=year, month=month, day=day)
payload = ""
for x in xs:
payload += "{date_str} {x} {i}\n".format(date_str=date_str, x=x, i=i)
i += 1
try:
logging.debug(f"thread {thread_num}: insert for {date_str}: {xs}")
random.choice(self.nodes).query(
"INSERT INTO test_mutations FORMAT TSV", payload
)
with self.mtx:
for x in xs:
self.current_xs[x] += 1
self.total_inserted_xs += sum(xs)
self.total_inserted_rows += len(xs)
except Exception as e:
logging.debug(f"Exception while inserting: {e}")
self.exceptions.append(e)
finally:
with self.mtx:
for x in xs:
self.currently_inserting_xs[x] -= 1
self.stop_ev.wait(0.2 + random.random() / 5)
def do_delete(self, thread_num):
self.stop_ev.wait(1.0 + random.random())
while not self.stop_ev.is_set():
chosen = False
with self.mtx:
if self.current_xs:
x = random.choice(list(self.current_xs.elements()))
if (
self.currently_inserting_xs[x] == 0
and x not in self.currently_deleting_xs
):
chosen = True
self.currently_deleting_xs.add(x)
to_delete_count = self.current_xs[x]
if not chosen:
self.stop_ev.wait(0.1 * random.random())
continue
try:
logging.debug(f"thread {thread_num}: delete {to_delete_count} * {x}")
random.choice(self.nodes).query(
"ALTER TABLE test_mutations DELETE WHERE x = {}".format(x)
)
with self.mtx:
self.total_mutations += 1
self.current_xs[x] -= to_delete_count
self.total_deleted_xs += to_delete_count * x
self.total_deleted_rows += to_delete_count
except Exception as e:
logging.debug(f"Exception while deleting: {e}")
finally:
with self.mtx:
self.currently_deleting_xs.remove(x)
self.stop_ev.wait(1.0 + random.random() * 2)
def wait_for_mutations(nodes, number_of_mutations):
for i in range(100): # wait for replication 80 seconds max
time.sleep(0.8)
def get_done_mutations(node):
return int(
node.query(
"SELECT sum(is_done) FROM system.mutations WHERE table = 'test_mutations'"
).rstrip()
)
if all([get_done_mutations(n) == number_of_mutations for n in nodes]):
return True
return False
def test_mutations(started_cluster):
prepare_cluster()
DURATION_SECONDS = 30
nodes = [node1, node2]
runner = Runner(nodes)
threads = []
for thread_num in range(5):
threads.append(threading.Thread(target=runner.do_insert, args=(thread_num, 10)))
for thread_num in (11, 12, 13):
threads.append(threading.Thread(target=runner.do_delete, args=(thread_num,)))
for t in threads:
t.start()
time.sleep(DURATION_SECONDS)
runner.stop_ev.set()
for t in threads:
t.join()
# Sanity check: at least something was inserted and something was deleted
assert runner.total_inserted_rows > 0
assert runner.total_mutations > 0
all_done = wait_for_mutations(nodes, runner.total_mutations)
logging.debug(f"Total mutations: {runner.total_mutations}")
for node in nodes:
logging.debug(
node.query(
"SELECT mutation_id, command, parts_to_do, is_done FROM system.mutations WHERE table = 'test_mutations' FORMAT TSVWithNames"
)
)
assert all_done
expected_sum = runner.total_inserted_xs - runner.total_deleted_xs
actual_sums = []
for i, node in enumerate(nodes):
actual_sums.append(
int(node.query("SELECT sum(x) FROM test_mutations").rstrip())
)
assert actual_sums[i] == expected_sum
@pytest.mark.parametrize(
("nodes",),
[
(
[
node5,
],
), # MergeTree
([node3, node4],), # ReplicatedMergeTree
],
)
def test_mutations_dont_prevent_merges(started_cluster, nodes):
prepare_cluster()
for year in range(2000, 2016):
rows = ""
date_str = "{}-01-{}".format(year, random.randint(1, 10))
for i in range(10):
rows += "{} {} {}\n".format(date_str, random.randint(1, 10), i)
nodes[0].query("INSERT INTO test_mutations FORMAT TSV", rows)
# will run mutations of 16 parts in parallel, mutations will sleep for about 20 seconds
nodes[0].query("ALTER TABLE test_mutations UPDATE i = sleepEachRow(2) WHERE 1")
runner = Runner(nodes)
threads = []
for thread_num in range(2):
threads.append(threading.Thread(target=runner.do_insert, args=(thread_num, 0)))
# will insert approx 8-10 new parts per 1 second into one partition
for t in threads:
t.start()
all_done = wait_for_mutations(nodes, 1)
runner.stop_ev.set()
for t in threads:
t.join()
for node in nodes:
logging.debug(
node.query(
"SELECT mutation_id, command, parts_to_do, is_done FROM system.mutations WHERE table = 'test_mutations' FORMAT TSVWithNames"
)
)
logging.debug(
node.query(
"SELECT partition, count(name), sum(active), sum(active*rows) FROM system.parts WHERE table ='test_mutations' GROUP BY partition FORMAT TSVWithNames"
)
)
assert all_done, "All done"
assert all([str(e).find("Too many parts") < 0 for e in runner.exceptions])