mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
reduce an amount of trash in tests_system_merges
This commit is contained in:
parent
8b13b85ea0
commit
6f8c1424df
@ -3,6 +3,7 @@ import time
|
||||
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.test_tools import assert_eq_with_retry
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
@ -20,6 +21,11 @@ node2 = cluster.add_instance(
|
||||
macros={"shard": 0, "replica": 2},
|
||||
)
|
||||
|
||||
settings = {
|
||||
"mutations_sync": 2,
|
||||
"replication_alter_partitions_sync": 2,
|
||||
"optimize_throw_if_noop": 1,
|
||||
}
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
@ -45,38 +51,29 @@ def split_tsv(data):
|
||||
|
||||
@pytest.mark.parametrize("replicated", ["", "replicated"])
|
||||
def test_merge_simple(started_cluster, replicated):
|
||||
clickhouse_path = "/var/lib/clickhouse"
|
||||
db_name = "test"
|
||||
table_name = "merge_simple"
|
||||
name = db_name + "." + table_name
|
||||
table_path = "data/" + db_name + "/" + table_name
|
||||
nodes = [node1, node2] if replicated else [node1]
|
||||
engine = (
|
||||
"ReplicatedMergeTree('/clickhouse/test_merge_simple', '{replica}')"
|
||||
if replicated
|
||||
else "MergeTree()"
|
||||
)
|
||||
node_check = nodes[-1]
|
||||
starting_block = 0 if replicated else 1
|
||||
|
||||
try:
|
||||
clickhouse_path = "/var/lib/clickhouse"
|
||||
db_name = "test"
|
||||
table_name = "merge_simple"
|
||||
name = db_name + "." + table_name
|
||||
table_path = "data/" + db_name + "/" + table_name
|
||||
nodes = [node1, node2] if replicated else [node1]
|
||||
engine = (
|
||||
"ReplicatedMergeTree('/clickhouse/test_merge_simple', '{replica}')"
|
||||
if replicated
|
||||
else "MergeTree()"
|
||||
)
|
||||
node_check = nodes[-1]
|
||||
starting_block = 0 if replicated else 1
|
||||
|
||||
for node in nodes:
|
||||
node.query(
|
||||
"""
|
||||
CREATE TABLE {name}
|
||||
(
|
||||
`a` Int64
|
||||
)
|
||||
ENGINE = {engine}
|
||||
ORDER BY sleep(2)
|
||||
""".format(
|
||||
engine=engine, name=name
|
||||
)
|
||||
)
|
||||
node.query(f"create table {name} (a Int64) engine={engine} order by tuple()")
|
||||
|
||||
node1.query("INSERT INTO {name} VALUES (1)".format(name=name))
|
||||
node1.query("INSERT INTO {name} VALUES (2)".format(name=name))
|
||||
node1.query("INSERT INTO {name} VALUES (3)".format(name=name))
|
||||
node1.query(f"INSERT INTO {name} VALUES (1)")
|
||||
node1.query(f"INSERT INTO {name} VALUES (2)")
|
||||
node1.query(f"INSERT INTO {name} VALUES (3)")
|
||||
|
||||
node1.query(f"alter table {name} add column b int materialized sleepEachRow(3)", settings=settings)
|
||||
|
||||
parts = [
|
||||
"all_{}_{}_0".format(x, x)
|
||||
@ -84,15 +81,16 @@ def test_merge_simple(started_cluster, replicated):
|
||||
]
|
||||
result_part = "all_{}_{}_1".format(starting_block, starting_block + 2)
|
||||
|
||||
# OPTIMIZE will sleep for 3s * 3 (parts) = 9s
|
||||
def optimize():
|
||||
node1.query("OPTIMIZE TABLE {name}".format(name=name))
|
||||
node1.query("OPTIMIZE TABLE {name}".format(name=name), settings=settings)
|
||||
|
||||
wait = threading.Thread(target=time.sleep, args=(5,))
|
||||
wait.start()
|
||||
t = threading.Thread(target=optimize)
|
||||
t.start()
|
||||
|
||||
time.sleep(1)
|
||||
# Wait for OPTIMIZE to actually start
|
||||
assert_eq_with_retry(node1, f"select count() from system.merges where table='{table_name}'", "1\n", retry_count=30, sleep_time=0.1)
|
||||
|
||||
assert (
|
||||
split_tsv(
|
||||
node_check.query(
|
||||
@ -124,17 +122,17 @@ def test_merge_simple(started_cluster, replicated):
|
||||
]
|
||||
)
|
||||
t.join()
|
||||
wait.join()
|
||||
|
||||
# It still can show a row with progress=1, because OPTIMIZE returns before the entry is removed from MergeList
|
||||
assert (
|
||||
node_check.query(
|
||||
"SELECT * FROM system.merges WHERE table = '{name}' and progress < 1".format(
|
||||
name=table_name
|
||||
)
|
||||
f"SELECT * FROM system.merges WHERE table = '{table_name}' and progress < 1"
|
||||
)
|
||||
== ""
|
||||
)
|
||||
|
||||
# It will eventually disappear
|
||||
assert_eq_with_retry(node_check, f"SELECT * FROM system.merges WHERE table = '{table_name}' and progress < 1", "\n")
|
||||
finally:
|
||||
for node in nodes:
|
||||
node.query("DROP TABLE {name}".format(name=name))
|
||||
@ -142,55 +140,45 @@ def test_merge_simple(started_cluster, replicated):
|
||||
|
||||
@pytest.mark.parametrize("replicated", ["", "replicated"])
|
||||
def test_mutation_simple(started_cluster, replicated):
|
||||
clickhouse_path = "/var/lib/clickhouse"
|
||||
db_name = "test"
|
||||
table_name = "mutation_simple"
|
||||
name = db_name + "." + table_name
|
||||
table_path = "data/" + db_name + "/" + table_name
|
||||
nodes = [node1, node2] if replicated else [node1]
|
||||
engine = (
|
||||
"ReplicatedMergeTree('/clickhouse/test_mutation_simple', '{replica}')"
|
||||
if replicated
|
||||
else "MergeTree()"
|
||||
)
|
||||
node_check = nodes[-1]
|
||||
starting_block = 0 if replicated else 1
|
||||
|
||||
try:
|
||||
clickhouse_path = "/var/lib/clickhouse"
|
||||
db_name = "test"
|
||||
table_name = "mutation_simple"
|
||||
name = db_name + "." + table_name
|
||||
table_path = "data/" + db_name + "/" + table_name
|
||||
nodes = [node1, node2] if replicated else [node1]
|
||||
engine = (
|
||||
"ReplicatedMergeTree('/clickhouse/test_mutation_simple', '{replica}')"
|
||||
if replicated
|
||||
else "MergeTree()"
|
||||
)
|
||||
node_check = nodes[-1]
|
||||
starting_block = 0 if replicated else 1
|
||||
|
||||
for node in nodes:
|
||||
node.query(
|
||||
"""
|
||||
CREATE TABLE {name}
|
||||
(
|
||||
`a` Int64
|
||||
)
|
||||
ENGINE = {engine}
|
||||
ORDER BY tuple()
|
||||
""".format(
|
||||
engine=engine, name=name
|
||||
)
|
||||
)
|
||||
node.query(f"create table {name} (a Int64) engine={engine} order by tuple()")
|
||||
|
||||
node1.query(f"INSERT INTO {name} VALUES (1), (2), (3)")
|
||||
|
||||
node1.query("INSERT INTO {name} VALUES (1)".format(name=name))
|
||||
part = "all_{}_{}_0".format(starting_block, starting_block)
|
||||
result_part = "all_{}_{}_0_{}".format(
|
||||
starting_block, starting_block, starting_block + 1
|
||||
)
|
||||
|
||||
# ALTER will sleep for 3s * 3 (rows) = 9s
|
||||
def alter():
|
||||
node1.query(
|
||||
"ALTER TABLE {name} UPDATE a = 42 WHERE sleep(2) OR 1".format(
|
||||
name=name
|
||||
),
|
||||
settings={
|
||||
"mutations_sync": 1,
|
||||
},
|
||||
f"ALTER TABLE {name} UPDATE a = 42 WHERE sleep(3) OR 1",
|
||||
settings=settings
|
||||
)
|
||||
|
||||
t = threading.Thread(target=alter)
|
||||
t.start()
|
||||
|
||||
time.sleep(1)
|
||||
# Wait for the mutation to actually start
|
||||
assert_eq_with_retry(node1, f"select count() from system.merges where table='{table_name}'", "1\n", retry_count=30, sleep_time=0.1)
|
||||
|
||||
assert (
|
||||
split_tsv(
|
||||
node_check.query(
|
||||
@ -225,13 +213,14 @@ def test_mutation_simple(started_cluster, replicated):
|
||||
|
||||
assert (
|
||||
node_check.query(
|
||||
"SELECT * FROM system.merges WHERE table = '{name}' and progress < 1".format(
|
||||
name=table_name
|
||||
)
|
||||
f"SELECT * FROM system.merges WHERE table = '{table_name}' and progress < 1"
|
||||
)
|
||||
== ""
|
||||
)
|
||||
|
||||
# It will eventually disappear
|
||||
assert_eq_with_retry(node_check, f"SELECT * FROM system.merges WHERE table = '{table_name}' and progress < 1", "\n")
|
||||
|
||||
finally:
|
||||
for node in nodes:
|
||||
node.query("DROP TABLE {name}".format(name=name))
|
||||
|
Loading…
Reference in New Issue
Block a user