2020-09-03 13:02:24 +00:00
|
|
|
import time
|
2021-10-19 17:39:44 +00:00
|
|
|
import logging
|
2020-09-03 13:02:24 +00:00
|
|
|
|
2020-09-16 04:26:10 +00:00
|
|
|
import pytest
|
2020-09-03 13:02:24 +00:00
|
|
|
from helpers.cluster import ClickHouseCluster
|
2021-01-26 15:12:08 +00:00
|
|
|
from helpers.test_tools import assert_eq_with_retry, TSV
|
2020-09-03 13:02:24 +00:00
|
|
|
|
|
|
|
# Cluster shared by every test in this module. Both instances load the same
# background-pool config and are created with ZooKeeper enabled; the
# ReplicatedMergeTree tables created by the tests below register under
# ZooKeeper paths.
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
    'node1',
    main_configs=['configs/fast_background_pool.xml'],
    with_zookeeper=True,
)
node2 = cluster.add_instance(
    'node2',
    main_configs=['configs/fast_background_pool.xml'],
    with_zookeeper=True,
)
|
2020-09-03 13:02:24 +00:00
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture(scope="module")
def started_cluster():
    """Start the module-level cluster and hand it to the tests.

    Yields:
        The module-level ``cluster`` object, after ``cluster.start()`` returned.
    """
    try:
        cluster.start()
        yield cluster
    finally:
        # Reached on regular teardown and also when start() itself raises.
        cluster.shutdown()
|
|
|
|
|
|
|
|
|
|
|
|
def count_ttl_merges_in_queue(node, table):
    """Return how many TTL_DELETE merges are queued for *table* on *node*.

    Args:
        node: object exposing ``query(sql)`` that returns the reply as text.
        table: table name matched against ``system.replication_queue.table``.

    Returns:
        The count as an ``int``; ``0`` when the reply is empty or contains
        only whitespace.
    """
    reply = node.query(
        f"SELECT count() FROM system.replication_queue WHERE merge_type = 'TTL_DELETE' and table = '{table}'")
    # Strip *before* testing for emptiness: a reply such as "\n" is truthy,
    # so checking the raw text first would let it through to int("").
    text = reply.strip() if reply else ""
    return int(text) if text else 0
|
|
|
|
|
|
|
|
|
2021-01-26 15:12:08 +00:00
|
|
|
def count_ttl_merges_in_background_pool(node, table, level):
    """Count the TTL_DELETE merges listed in ``system.merges`` for *table*.

    The query output is wrapped in ``TSV`` and the count is ``len()`` of that
    wrapper. When the count reaches *level*, the wrapped rows are written to
    the debug log so an unexpected number of merges can be inspected.

    Args:
        node: instance to query.
        table: table name matched against ``system.merges.table``.
        level: count at or above which the rows are logged.

    Returns:
        The number of matching merges.
    """
    running = TSV(node.query(
        f"SELECT * FROM system.merges WHERE merge_type = 'TTL_DELETE' and table = '{table}'"))
    total = len(running)
    if total < level:
        return total

    logging.debug(f"count_ttl_merges_in_background_pool: merges more than warn level:\n{running}")
    return total
|
2020-09-03 13:02:24 +00:00
|
|
|
|
|
|
|
|
|
|
|
def count_regular_merges_in_background_pool(node, table):
    """Return how many REGULAR merges ``system.merges`` lists for *table*.

    Args:
        node: object exposing ``query(sql)`` that returns the reply as text.
        table: table name matched against ``system.merges.table``.

    Returns:
        The count as an ``int``; ``0`` when the reply is empty or contains
        only whitespace.
    """
    reply = node.query(f"SELECT count() FROM system.merges WHERE merge_type = 'REGULAR' and table = '{table}'")
    # Strip *before* testing for emptiness: a reply such as "\n" is truthy,
    # so checking the raw text first would let it through to int("").
    text = reply.strip() if reply else ""
    return int(text) if text else 0
|
|
|
|
|
|
|
|
|
|
|
|
def count_running_mutations(node, table):
    """Return how many ``system.merges`` rows for *table* have ``is_mutation=1``.

    Args:
        node: object exposing ``query(sql)`` that returns the reply as text.
        table: table name matched against ``system.merges.table``.

    Returns:
        The count as an ``int``; ``0`` when the reply is empty or contains
        only whitespace.
    """
    reply = node.query(f"SELECT count() FROM system.merges WHERE table = '{table}' and is_mutation=1")
    # Strip *before* testing for emptiness: a reply such as "\n" is truthy,
    # so checking the raw text first would let it through to int("").
    text = reply.strip() if reply else ""
    return int(text) if text else 0
|
|
|
|
|
|
|
|
|
2020-09-03 13:29:18 +00:00
|
|
|
# This test was introduced to check concurrency for TTL merges and mutations,
|
|
|
|
# but it revealed a bug when we assign different merges to the same part
|
|
|
|
# on the borders of partitions.
|
2020-09-03 13:02:24 +00:00
|
|
|
def test_no_ttl_merges_in_busy_pool(started_cluster):
    """TTL merges must not run while six mutations are in flight.

    Six partitions of five rows each (30 rows) are inserted with TTL merges
    stopped, then a deliberately slow mutation (``sleepEachRow(1)``) is
    started. While fewer than six mutations are running, no TTL_DELETE merge
    may show up in ``system.merges``. After TTL merges are re-enabled, the
    row count is sampled while six mutations are still running and must stay
    at 30 for more than four samples. Finally the table has to drain to zero.
    """
    create_sql = "CREATE TABLE test_ttl (d DateTime, key UInt64, data UInt64) ENGINE = MergeTree() ORDER BY tuple() PARTITION BY key TTL d + INTERVAL 1 MONTH SETTINGS merge_with_ttl_timeout = 0, number_of_free_entries_in_pool_to_execute_mutation = 0"
    node1.query(create_sql)

    node1.query("SYSTEM STOP TTL MERGES")

    # One insert per partition key 1..6, five rows each.
    for i in range(1, 7):
        node1.query(
            f"INSERT INTO test_ttl SELECT now() - INTERVAL 1 MONTH + number - 1, {i}, number FROM numbers(5)")

    node1.query("ALTER TABLE test_ttl UPDATE data = data + 1 WHERE sleepEachRow(1) = 0")

    # Until all six mutations are running, no TTL merge may be observed.
    while count_running_mutations(node1, "test_ttl") < 6:
        logging.debug(f"Mutations count {count_running_mutations(node1, 'test_ttl')}")
        ttl_merges_now = count_ttl_merges_in_background_pool(node1, "test_ttl", 1)
        assert ttl_merges_now == 0
        time.sleep(0.5)

    node1.query("SYSTEM START TTL MERGES")

    # Sample the row count for as long as all six mutations keep running.
    row_samples = []
    while count_running_mutations(node1, "test_ttl") == 6:
        logging.debug(f"Mutations count after start TTL{count_running_mutations(node1, 'test_ttl')}")
        current_rows = node1.query("SELECT count() FROM test_ttl").strip()
        row_samples.append(int(current_rows))
        time.sleep(0.5)

    # For several seconds no TTL merge ran, so the row count stayed at its
    # original value of 30.
    untouched_samples = row_samples.count(30)
    assert untouched_samples > 4

    assert_eq_with_retry(node1, "SELECT COUNT() FROM test_ttl", "0")
    node1.query("DROP TABLE test_ttl SYNC")
|
2020-09-04 10:08:09 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_limited_ttl_merges_in_empty_pool(started_cluster):
    """At most two TTL merges may run at once on an otherwise idle pool.

    One hundred single-row partitions are inserted with TTL merges stopped.
    After TTL merges are started, the number of running TTL_DELETE merges is
    sampled until the table is empty; the largest sample must not exceed 2.
    """
    create_sql = "CREATE TABLE test_ttl_v2 (d DateTime, key UInt64, data UInt64) ENGINE = MergeTree() ORDER BY tuple() PARTITION BY key TTL d + INTERVAL 1 MONTH SETTINGS merge_with_ttl_timeout = 0"
    node1.query(create_sql)

    node1.query("SYSTEM STOP TTL MERGES")

    for i in range(100):
        node1.query(f"INSERT INTO test_ttl_v2 SELECT now() - INTERVAL 1 MONTH, {i}, number FROM numbers(1)")

    inserted = node1.query("SELECT COUNT() FROM test_ttl_v2")
    assert inserted == "100\n"

    node1.query("SYSTEM START TTL MERGES")

    # Distinct values seen for the number of concurrently running TTL merges.
    observed_merge_counts = set()
    table_is_empty = False
    while not table_is_empty:
        observed_merge_counts.add(count_ttl_merges_in_background_pool(node1, "test_ttl_v2", 3))
        time.sleep(0.01)
        table_is_empty = node1.query("SELECT COUNT() FROM test_ttl_v2") == "0\n"

    assert max(observed_merge_counts) <= 2
    node1.query("DROP TABLE test_ttl_v2 SYNC")
|
2020-09-04 10:08:09 +00:00
|
|
|
|
|
|
|
|
|
|
|
def test_limited_ttl_merges_in_empty_pool_replicated(started_cluster):
    """Replicated variant: bound both running and queued TTL merges.

    One hundred single-row partitions are inserted into a ReplicatedMergeTree
    table with TTL merges stopped. After they are started, both the running
    TTL_DELETE merges and the TTL_DELETE entries in the replication queue are
    sampled until the table is empty. Running merges must never exceed 2 and
    queued entries must never exceed 1.
    """
    create_sql = "CREATE TABLE replicated_ttl (d DateTime, key UInt64, data UInt64) ENGINE = ReplicatedMergeTree('/test/t', '1') ORDER BY tuple() PARTITION BY key TTL d + INTERVAL 1 MONTH SETTINGS merge_with_ttl_timeout = 0"
    node1.query(create_sql)

    node1.query("SYSTEM STOP TTL MERGES")

    for i in range(100):
        node1.query_with_retry(f"INSERT INTO replicated_ttl SELECT now() - INTERVAL 1 MONTH, {i}, number FROM numbers(1)")

    inserted = node1.query("SELECT COUNT() FROM replicated_ttl")
    assert inserted == "100\n"

    node1.query("SYSTEM START TTL MERGES")

    # Distinct values seen for running merges and for queued entries.
    observed_merge_counts = set()
    observed_queue_counts = set()
    table_is_empty = False
    while not table_is_empty:
        observed_merge_counts.add(count_ttl_merges_in_background_pool(node1, "replicated_ttl", 3))
        observed_queue_counts.add(count_ttl_merges_in_queue(node1, "replicated_ttl"))
        time.sleep(0.01)
        table_is_empty = node1.query("SELECT COUNT() FROM replicated_ttl") == "0\n"

    assert max(observed_merge_counts) <= 2
    assert max(observed_queue_counts) <= 1

    node1.query("DROP TABLE replicated_ttl SYNC")
|
|
|
|
|
2020-09-04 10:08:09 +00:00
|
|
|
|
|
|
|
def test_limited_ttl_merges_two_replicas(started_cluster):
    """With two replicas, each one runs at most two TTL merges at a time.

    One hundred partitions of 10000 rows each are inserted through node1 and
    synced to node2 with TTL merges stopped on both. After TTL merges are
    started, the running TTL_DELETE merges on each replica are sampled until
    both replicas report an empty table.
    """
    # This test is quite fast, so often no merges are caught at all.
    create_on_node1 = "CREATE TABLE replicated_ttl_2 (d DateTime, key UInt64, data UInt64) ENGINE = ReplicatedMergeTree('/test/t2', '1') ORDER BY tuple() PARTITION BY key TTL d + INTERVAL 1 MONTH SETTINGS merge_with_ttl_timeout = 0"
    create_on_node2 = "CREATE TABLE replicated_ttl_2 (d DateTime, key UInt64, data UInt64) ENGINE = ReplicatedMergeTree('/test/t2', '2') ORDER BY tuple() PARTITION BY key TTL d + INTERVAL 1 MONTH SETTINGS merge_with_ttl_timeout = 0"
    node1.query(create_on_node1)
    node2.query(create_on_node2)

    node1.query("SYSTEM STOP TTL MERGES")
    node2.query("SYSTEM STOP TTL MERGES")

    for i in range(100):
        node1.query_with_retry(
            f"INSERT INTO replicated_ttl_2 SELECT now() - INTERVAL 1 MONTH, {i}, number FROM numbers(10000)")

    node2.query("SYSTEM SYNC REPLICA replicated_ttl_2", timeout=10)
    rows_on_node1 = node1.query("SELECT COUNT() FROM replicated_ttl_2")
    assert rows_on_node1 == "1000000\n"
    rows_on_node2 = node2.query("SELECT COUNT() FROM replicated_ttl_2")
    assert rows_on_node2 == "1000000\n"

    node1.query("SYSTEM START TTL MERGES")
    node2.query("SYSTEM START TTL MERGES")

    # Distinct values seen for running TTL merges, tracked per replica.
    observed_on_node1 = set()
    observed_on_node2 = set()
    both_empty = False
    while not both_empty:
        observed_on_node1.add(count_ttl_merges_in_background_pool(node1, "replicated_ttl_2", 3))
        observed_on_node2.add(count_ttl_merges_in_background_pool(node2, "replicated_ttl_2", 3))
        # `and` short-circuits: node2 is only asked once node1 is empty.
        both_empty = (
            node1.query("SELECT COUNT() FROM replicated_ttl_2") == "0\n"
            and node2.query("SELECT COUNT() FROM replicated_ttl_2") == "0\n"
        )

    # Both replicas can assign TTL merges. If one replica is faster, the
    # slower one may have several merges in its queue, so queue entries are
    # not checked here.
    assert max(observed_on_node1) <= 2
    assert max(observed_on_node2) <= 2

    node1.query("DROP TABLE replicated_ttl_2 SYNC")
    node2.query("DROP TABLE replicated_ttl_2 SYNC")
|