ClickHouse/tests/integration/test_ttl_replicated/test.py
2021-06-27 19:18:15 +03:00

432 lines
20 KiB
Python

import time
import helpers.client as client
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV, exec_query_with_retry
# Shared cluster for every test in this module.
# node1..node3 use the default instance settings; node4..node6 are started from
# the 'yandex/clickhouse-server' image at tag 20.12.4.5 with an installed binary
# and stay_alive=True (test_ttl_compatibility later calls
# restart_with_latest_version() on the nodes that have with_installed_binary set).
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance(
    'node1',
    with_zookeeper=True,
)
node2 = cluster.add_instance(
    'node2',
    with_zookeeper=True,
)
node3 = cluster.add_instance(
    'node3',
    with_zookeeper=True,
)
node4 = cluster.add_instance(
    'node4',
    with_zookeeper=True,
    image='yandex/clickhouse-server',
    tag='20.12.4.5',
    stay_alive=True,
    with_installed_binary=True,
)
node5 = cluster.add_instance(
    'node5',
    with_zookeeper=True,
    image='yandex/clickhouse-server',
    tag='20.12.4.5',
    stay_alive=True,
    with_installed_binary=True,
)
node6 = cluster.add_instance(
    'node6',
    with_zookeeper=True,
    image='yandex/clickhouse-server',
    tag='20.12.4.5',
    stay_alive=True,
    with_installed_binary=True,
)
@pytest.fixture(scope="module")
def started_cluster():
    """Start the shared cluster once per module and shut it down afterwards.

    Yields:
        The module-level ``cluster`` object after ``cluster.start()`` returned.

    Raises:
        Exception: any error caught by the ``try`` block is printed and then
            re-raised, so pytest reports the real cause instead of a generic
            "fixture did not yield" failure.
    """
    try:
        cluster.start()
        yield cluster
    except Exception as ex:
        print(ex)
        # Do not swallow the failure: without re-raising, the generator would
        # finish without yielding and hide why the cluster could not start.
        raise
    finally:
        # Always release the cluster, whether startup or the tests failed.
        cluster.shutdown()
def drop_table(nodes, table_name):
    """Issue ``DROP TABLE IF EXISTS <table_name> NO DELAY`` on every node.

    Args:
        nodes: iterable of objects exposing ``query(sql)``.
        table_name: name substituted into the DROP statement.
    """
    statement = "DROP TABLE IF EXISTS {} NO DELAY".format(table_name)
    for instance in nodes:
        instance.query(statement)
# Column TTL works only with wide parts, because it's very expensive to apply it for compact parts
def test_ttl_columns(started_cluster):
    """Column-level TTLs on `a` and `b` must zero both columns on both replicas.

    Both inserted rows are dated in the year 2000, so after OPTIMIZE ... FINAL
    the expected output has 0 in `a` and `b` for each id.
    """
    replicas = [node1, node2]
    drop_table(replicas, "test_ttl")

    create_query = '''
CREATE TABLE test_ttl(date DateTime, id UInt32, a Int32 TTL date + INTERVAL 1 DAY, b Int32 TTL date + INTERVAL 1 MONTH)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_columns', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date) SETTINGS merge_with_ttl_timeout=0, min_bytes_for_wide_part=0;
'''
    for replica in replicas:
        replica.query(create_query.format(replica=replica.name))

    for insert_query in (
        "INSERT INTO test_ttl VALUES (toDateTime('2000-10-10 00:00:00'), 1, 1, 3)",
        "INSERT INTO test_ttl VALUES (toDateTime('2000-10-11 10:00:00'), 2, 2, 4)",
    ):
        node1.query(insert_query)

    time.sleep(1)  # sleep to allow use ttl merge selector for second time
    node1.query("OPTIMIZE TABLE test_ttl FINAL")

    expected = "1\t0\t0\n2\t0\t0\n"
    select_query = "SELECT id, a, b FROM test_ttl ORDER BY id"
    for replica in (node1, node2):
        assert TSV(replica.query(select_query)) == TSV(expected)
def test_merge_with_ttl_timeout(started_cluster):
    """Check how often TTL merges are applied with the default timeout.

    The first batch of three rows is expected to have column `a` zeroed once
    TTL merges are started; the second batch, inserted right afterwards, is
    expected to keep its values, so exactly 3 rows have ``a = 0`` at the end.
    """
    table = "test_merge_with_ttl_timeout"
    replicas = [node1, node2]
    drop_table(replicas, table)

    for replica in replicas:
        replica.query(
            '''
CREATE TABLE {table}(date DateTime, id UInt32, a Int32 TTL date + INTERVAL 1 DAY, b Int32 TTL date + INTERVAL 1 MONTH)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/{table}', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date)
SETTINGS min_bytes_for_wide_part=0;
'''.format(replica=replica.name, table=table))

    def run_on_replicas(template):
        # Same statement on node1 first, then node2.
        for replica in (node1, node2):
            replica.query(template.format(table=table))

    def insert_three_days():
        # One row per day for 2000-10-01 .. 2000-10-03, always through node1.
        for day in range(1, 4):
            node1.query(
                "INSERT INTO {table} VALUES (toDateTime('2000-10-{day:02d} 10:00:00'), 1, 2, 3)".format(day=day, table=table))

    def assert_zeroed_rows(expected):
        for replica in (node1, node2):
            assert replica.query("SELECT countIf(a = 0) FROM {table}".format(table=table)) == expected

    run_on_replicas("SYSTEM STOP TTL MERGES {table}")
    insert_three_days()
    assert_zeroed_rows("0\n")

    run_on_replicas("SYSTEM START TTL MERGES {table}")
    time.sleep(15)  # TTL merges shall happen.

    insert_three_days()
    time.sleep(15)  # TTL merges shall not happen.

    assert_zeroed_rows("3\n")
def test_ttl_many_columns(started_cluster):
    """Column TTL on several columns, including names starting with an underscore.

    The row dated 2000 must come back with all four TTL columns zeroed, while
    the row dated 2100 must keep its values, on both replicas.
    """
    replicas = [node1, node2]
    drop_table(replicas, "test_ttl_2")

    for replica in replicas:
        replica.query(
            '''
CREATE TABLE test_ttl_2(date DateTime, id UInt32,
a Int32 TTL date,
_idx Int32 TTL date,
_offset Int32 TTL date,
_partition Int32 TTL date)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_2', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date) SETTINGS merge_with_ttl_timeout=0;
'''.format(replica=replica.name))

    for replica in replicas:
        replica.query("SYSTEM STOP TTL MERGES test_ttl_2")

    node1.query("INSERT INTO test_ttl_2 VALUES (toDateTime('2000-10-10 00:00:00'), 1, 2, 3, 4, 5)")
    node1.query("INSERT INTO test_ttl_2 VALUES (toDateTime('2100-10-10 10:00:00'), 6, 7, 8, 9, 10)")

    node2.query("SYSTEM SYNC REPLICA test_ttl_2", timeout=5)

    # Check that part will appear in result of merge
    for replica in replicas:
        replica.query("SYSTEM STOP FETCHES test_ttl_2")

    for replica in replicas:
        replica.query("SYSTEM START TTL MERGES test_ttl_2")

    time.sleep(1)  # sleep to allow use ttl merge selector for second time
    node1.query("OPTIMIZE TABLE test_ttl_2 FINAL", timeout=5)

    node2.query("SYSTEM SYNC REPLICA test_ttl_2", timeout=5)

    expected = "1\t0\t0\t0\t0\n6\t7\t8\t9\t10\n"
    select_query = "SELECT id, a, _idx, _offset, _partition FROM test_ttl_2 ORDER BY id"
    for replica in (node1, node2):
        assert TSV(replica.query(select_query)) == TSV(expected)
@pytest.mark.parametrize("delete_suffix", [
    "",
    "DELETE",
])
def test_ttl_table(started_cluster, delete_suffix):
    """Table-level TTL, with and without the explicit DELETE keyword.

    Both inserted rows are dated in the year 2000, so after OPTIMIZE ... FINAL
    the table must be empty on both replicas.
    """
    replicas = [node1, node2]
    drop_table(replicas, "test_ttl")

    create_query = '''
CREATE TABLE test_ttl(date DateTime, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date)
TTL date + INTERVAL 1 DAY {delete_suffix} SETTINGS merge_with_ttl_timeout=0;
'''
    for replica in replicas:
        replica.query(create_query.format(replica=replica.name, delete_suffix=delete_suffix))

    for insert_query in (
        "INSERT INTO test_ttl VALUES (toDateTime('2000-10-10 00:00:00'), 1)",
        "INSERT INTO test_ttl VALUES (toDateTime('2000-10-11 10:00:00'), 2)",
    ):
        node1.query(insert_query)

    time.sleep(1)  # sleep to allow use ttl merge selector for second time
    node1.query("OPTIMIZE TABLE test_ttl FINAL")

    for replica in (node1, node2):
        assert TSV(replica.query("SELECT * FROM test_ttl")) == TSV("")
def test_modify_ttl(started_cluster):
    """ALTER ... MODIFY TTL with progressively shorter intervals.

    Rows are 5, 3 and 1 hours old; each synchronous ALTER (mutations_sync = 2)
    is followed by a check of the surviving ids on one of the replicas.
    """
    replicas = [node1, node2]
    drop_table(replicas, "test_ttl")

    for replica in replicas:
        replica.query(
            '''
CREATE TABLE test_ttl(d DateTime, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_modify', '{replica}')
ORDER BY id
'''.format(replica=replica.name))

    node1.query(
        "INSERT INTO test_ttl VALUES (now() - INTERVAL 5 HOUR, 1), (now() - INTERVAL 3 HOUR, 2), (now() - INTERVAL 1 HOUR, 3)")
    node2.query("SYSTEM SYNC REPLICA test_ttl", timeout=20)

    # (node running the ALTER, ALTER statement, node that is checked, expected ids)
    steps = [
        (node1, "ALTER TABLE test_ttl MODIFY TTL d + INTERVAL 4 HOUR SETTINGS mutations_sync = 2", node2, "2\n3\n"),
        (node2, "ALTER TABLE test_ttl MODIFY TTL d + INTERVAL 2 HOUR SETTINGS mutations_sync = 2", node1, "3\n"),
        (node1, "ALTER TABLE test_ttl MODIFY TTL d + INTERVAL 30 MINUTE SETTINGS mutations_sync = 2", node2, ""),
    ]
    for alter_node, alter_query, check_node, expected_ids in steps:
        alter_node.query(alter_query)
        assert check_node.query("SELECT id FROM test_ttl") == expected_ids
def test_modify_column_ttl(started_cluster):
    """ALTER ... MODIFY COLUMN ... TTL with progressively shorter intervals.

    Rows are 5, 3 and 1 hours old; an expired `id` is expected to read back as
    the column default 42.
    """
    replicas = [node1, node2]
    drop_table(replicas, "test_ttl")

    for replica in replicas:
        replica.query(
            '''
CREATE TABLE test_ttl(d DateTime, id UInt32 DEFAULT 42)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_column', '{replica}')
ORDER BY d
'''.format(replica=replica.name))

    node1.query(
        "INSERT INTO test_ttl VALUES (now() - INTERVAL 5 HOUR, 1), (now() - INTERVAL 3 HOUR, 2), (now() - INTERVAL 1 HOUR, 3)")
    node2.query("SYSTEM SYNC REPLICA test_ttl", timeout=20)

    # (ALTER statement run on node1, node that is checked, expected ids)
    steps = [
        ("ALTER TABLE test_ttl MODIFY COLUMN id UInt32 TTL d + INTERVAL 4 HOUR SETTINGS mutations_sync = 2",
         node2, "42\n2\n3\n"),
        ("ALTER TABLE test_ttl MODIFY COLUMN id UInt32 TTL d + INTERVAL 2 HOUR SETTINGS mutations_sync = 2",
         node1, "42\n42\n3\n"),
        ("ALTER TABLE test_ttl MODIFY COLUMN id UInt32 TTL d + INTERVAL 30 MINUTE SETTINGS mutations_sync = 2",
         node2, "42\n42\n42\n"),
    ]
    for alter_query, check_node, expected_ids in steps:
        node1.query(alter_query)
        assert check_node.query("SELECT id FROM test_ttl") == expected_ids
def test_ttl_double_delete_rule_returns_error(started_cluster):
    """CREATE TABLE with two expressions in the table TTL clause must fail.

    The test passes only if ``node1.query`` raises
    ``client.QueryRuntimeException``.
    """
    drop_table([node1, node2], "test_ttl")
    # pytest.raises fails the test when the query succeeds and lets any other
    # exception type propagate with its own traceback. The previous
    # try/except with a bare `except:` also caught the AssertionError raised
    # by its own `assert False`, as well as KeyboardInterrupt/SystemExit.
    with pytest.raises(client.QueryRuntimeException):
        node1.query('''
CREATE TABLE test_ttl(date DateTime, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_double_delete', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date)
TTL date + INTERVAL 1 DAY, date + INTERVAL 2 DAY SETTINGS merge_with_ttl_timeout=0
'''.format(replica=node1.name))
def optimize_with_retry(node, table_name, retry=20, delay=0.5):
    """Run ``OPTIMIZE TABLE <table_name> FINAL`` until it succeeds or attempts run out.

    The query is sent with ``optimize_throw_if_noop = 1`` (both in the SQL text
    and in the ``settings`` argument), presumably so that an OPTIMIZE which
    did nothing raises and is retried.

    This is best effort: if every attempt fails the function returns silently
    and the caller's subsequent assertions decide whether the test passes.

    Args:
        node: object exposing ``query(sql, settings=...)``.
        table_name: table to optimize.
        retry: maximum number of attempts.
        delay: seconds to sleep after a failed attempt.
    """
    query = "OPTIMIZE TABLE {name} FINAL SETTINGS optimize_throw_if_noop = 1".format(name=table_name)
    for _ in range(retry):
        try:
            node.query(query, settings={"optimize_throw_if_noop": "1"})
            break
        except Exception:
            # Previously `except e:` referenced an undefined name, so the first
            # failure raised NameError instead of being retried.
            time.sleep(delay)
@pytest.mark.parametrize("name,engine", [
    pytest.param("test_ttl_alter_delete", "MergeTree()", id="test_ttl_alter_delete"),
    pytest.param("test_replicated_ttl_alter_delete", "ReplicatedMergeTree('/clickhouse/test_replicated_ttl_alter_delete', '1')", id="test_ttl_alter_delete_replicated"),
])
def test_ttl_alter_delete(started_cluster, name, engine):
    """Check compatibility with old TTL delete expressions.

    Makes sure that:
    * alter modify of column's TTL delete expression works
    * alter to add new columns works
    * alter modify to add TTL delete expression to a new column works
    for a table that has TTL delete expression defined but
    no explicit storage policy assigned.

    Copyright 2019, Altinity LTD
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
    """
    def fetch_rows():
        # Current (s1, b1) pairs as tab-separated lines, ordered by b1 then s1.
        output = node1.query("SELECT s1, b1 FROM {name} ORDER BY b1, s1".format(name=name))
        return output.splitlines()

    drop_table([node1], name)

    create_query = """
CREATE TABLE {name} (
s1 String,
d1 DateTime
) ENGINE = {engine}
ORDER BY tuple()
TTL d1 + INTERVAL 1 DAY DELETE
SETTINGS min_bytes_for_wide_part=0
"""
    node1.query(create_query.format(name=name, engine=engine))

    node1.query("""ALTER TABLE {name} MODIFY COLUMN s1 String TTL d1 + INTERVAL 1 SECOND""".format(name=name))
    node1.query("""ALTER TABLE {name} ADD COLUMN b1 Int32""".format(name=name))

    # 'hello1' is stamped with the current time, 'hello2' 360 seconds ahead.
    node1.query(
        """INSERT INTO {name} (s1, b1, d1) VALUES ('hello1', 1, toDateTime({time}))""".format(
            name=name, time=time.time()))
    node1.query(
        """INSERT INTO {name} (s1, b1, d1) VALUES ('hello2', 2, toDateTime({time}))""".format(
            name=name, time=time.time() + 360))

    time.sleep(1)
    optimize_with_retry(node1, name)
    rows = fetch_rows()
    assert rows == ["\t1", "hello2\t2"]

    node1.query("""ALTER TABLE {name} MODIFY COLUMN b1 Int32 TTL d1""".format(name=name))
    node1.query(
        """INSERT INTO {name} (s1, b1, d1) VALUES ('hello3', 3, toDateTime({time}))""".format(
            name=name, time=time.time()))

    time.sleep(1)
    optimize_with_retry(node1, name)
    rows = fetch_rows()
    assert rows == ["\t0", "\t0", "hello2\t2"]
def test_ttl_empty_parts(started_cluster):
    """Parts emptied by TTL must disappear without breaking later merges and mutations.

    Six 1000-row parts are inserted, alternating between dates in 2100 and
    2000. After ``MODIFY TTL date`` only 3000 rows and three active parts are
    expected; later steps check that OPTIMIZE, INSERT and DELETE still work
    and that neither replica logged a ReplicatedMergeTreeCleanupThread error.
    """
    parts_query = "SELECT name FROM system.parts WHERE table = 'test_ttl_empty_parts' AND active ORDER BY name"
    count_query = "SELECT count() FROM test_ttl_empty_parts"
    replicas = [node1, node2]

    drop_table(replicas, "test_ttl_empty_parts")
    for replica in replicas:
        replica.query(
            '''
CREATE TABLE test_ttl_empty_parts(date Date, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_empty_parts', '{replica}')
ORDER BY id
SETTINGS max_bytes_to_merge_at_min_space_in_pool = 1, max_bytes_to_merge_at_max_space_in_pool = 1,
cleanup_delay_period = 1, cleanup_delay_period_random_add = 0
'''.format(replica=replica.name))

    for day in range(1, 7):
        # Odd days land in year 2100, even days in year 2000.
        node1.query(
            "INSERT INTO test_ttl_empty_parts SELECT '2{}00-01-0{}', number FROM numbers(1000)".format(day % 2, day))

    assert node1.query(count_query) == "6000\n"
    assert node1.query(parts_query) == \
        "all_0_0_0\nall_1_1_0\nall_2_2_0\nall_3_3_0\nall_4_4_0\nall_5_5_0\n"

    node1.query("ALTER TABLE test_ttl_empty_parts MODIFY TTL date")

    assert node1.query(count_query) == "3000\n"

    time.sleep(3)  # Wait for cleanup thread
    assert node1.query(parts_query) == \
        "all_0_0_0_6\nall_2_2_0_6\nall_4_4_0_6\n"

    # Lift the 1-byte merge limits set at table creation on both replicas.
    for replica in replicas:
        replica.query("ALTER TABLE test_ttl_empty_parts MODIFY SETTING max_bytes_to_merge_at_min_space_in_pool = 1000000000")
        replica.query("ALTER TABLE test_ttl_empty_parts MODIFY SETTING max_bytes_to_merge_at_max_space_in_pool = 1000000000")

    optimize_with_retry(node1, 'test_ttl_empty_parts')
    assert node1.query(parts_query) == "all_0_4_1_6\n"

    # Check that after removing empty parts mutations and merges works
    node1.query("INSERT INTO test_ttl_empty_parts SELECT '2100-01-20', number FROM numbers(1000)")
    node1.query("ALTER TABLE test_ttl_empty_parts DELETE WHERE id % 2 = 0 SETTINGS mutations_sync = 2")
    assert node1.query(count_query) == "2000\n"

    optimize_with_retry(node1, 'test_ttl_empty_parts')
    assert node1.query(parts_query) == "all_0_7_2_8\n"

    node2.query('SYSTEM SYNC REPLICA test_ttl_empty_parts', timeout=20)

    error_msg = '<Error> default.test_ttl_empty_parts (ReplicatedMergeTreeCleanupThread)'
    for replica in (node1, node2):
        assert not replica.contains_in_log(error_msg)
@pytest.mark.parametrize(
    ('node_left', 'node_right', 'num_run'),
    [(node1, node2, 0), (node3, node4, 1), (node5, node6, 2)]
)
def test_ttl_compatibility(started_cluster, node_left, node_right, num_run):
    """TTL DELETE, GROUP BY and DELETE WHERE rules across a pair of replicas.

    Data is inserted through both nodes, nodes that have
    ``with_installed_binary`` set are restarted with the latest version, and
    after OPTIMIZE FINAL plus replica sync both nodes must return the same
    TTL-processed data. ``num_run`` keeps the ZooKeeper paths of the
    parametrized runs apart.
    """
    pair = [node_left, node_right]
    for table in ("test_ttl_delete", "test_ttl_group_by", "test_ttl_where"):
        drop_table(pair, table)

    create_queries = (
        '''
CREATE TABLE test_ttl_delete(date DateTime, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_delete_{suff}', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date)
TTL date + INTERVAL 3 SECOND
SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100
''',
        '''
CREATE TABLE test_ttl_group_by(date DateTime, id UInt32, val UInt64)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_group_by_{suff}', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date)
TTL date + INTERVAL 3 SECOND GROUP BY id SET val = sum(val)
SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100
''',
        '''
CREATE TABLE test_ttl_where(date DateTime, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl_where_{suff}', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date)
TTL date + INTERVAL 3 SECOND DELETE WHERE id % 2 = 1
SETTINGS max_number_of_merges_with_ttl_in_pool=100, max_replicated_merges_with_ttl_in_queue=100
''',
    )
    for node in pair:
        for create_query in create_queries:
            node.query(create_query.format(suff=num_run, replica=node.name))

    # (node, statement) pairs, executed strictly in this order.
    inserts = [
        (node_left, "INSERT INTO test_ttl_delete VALUES (now(), 1)"),
        (node_left, "INSERT INTO test_ttl_delete VALUES (toDateTime('2100-10-11 10:00:00'), 2)"),
        (node_right, "INSERT INTO test_ttl_delete VALUES (now(), 3)"),
        (node_right, "INSERT INTO test_ttl_delete VALUES (toDateTime('2100-10-11 10:00:00'), 4)"),
        (node_left, "INSERT INTO test_ttl_group_by VALUES (now(), 0, 1)"),
        (node_left, "INSERT INTO test_ttl_group_by VALUES (now(), 0, 2)"),
        (node_right, "INSERT INTO test_ttl_group_by VALUES (now(), 0, 3)"),
        (node_right, "INSERT INTO test_ttl_group_by VALUES (now(), 0, 4)"),
        (node_left, "INSERT INTO test_ttl_where VALUES (now(), 1)"),
        (node_left, "INSERT INTO test_ttl_where VALUES (now(), 2)"),
        (node_right, "INSERT INTO test_ttl_where VALUES (now(), 3)"),
        (node_right, "INSERT INTO test_ttl_where VALUES (now(), 4)"),
    ]
    for node, insert_query in inserts:
        node.query(insert_query)

    for node in pair:
        if node.with_installed_binary:
            node.restart_with_latest_version()

    time.sleep(5)  # Wait for TTL

    # after restart table can be in readonly mode
    exec_query_with_retry(node_right, "OPTIMIZE TABLE test_ttl_delete FINAL")
    node_right.query("OPTIMIZE TABLE test_ttl_group_by FINAL")
    node_right.query("OPTIMIZE TABLE test_ttl_where FINAL")

    exec_query_with_retry(node_left, "OPTIMIZE TABLE test_ttl_delete FINAL")
    node_left.query("OPTIMIZE TABLE test_ttl_group_by FINAL", timeout=20)
    node_left.query("OPTIMIZE TABLE test_ttl_where FINAL", timeout=20)

    # After OPTIMIZE TABLE, it is not guaranteed that everything is merged.
    # Possible scenario (for test_ttl_group_by):
    # 1. Two independent merges assigned: [0_0, 1_1] -> 0_1 and [2_2, 3_3] -> 2_3
    # 2. Another one merge assigned: [0_1, 2_3] -> 0_3
    # 3. Merge to 0_3 is delayed:
    #    `Not executing log entry for part 0_3 because 2 merges with TTL already executing, maximum 2
    # 4. OPTIMIZE FINAL does nothing, cause there is an entry for 0_3
    #
    # So, let's also sync replicas for node_right (for now).
    for node in (node_right, node_left):
        exec_query_with_retry(node, "SYSTEM SYNC REPLICA test_ttl_delete")
        node.query("SYSTEM SYNC REPLICA test_ttl_group_by", timeout=20)
        node.query("SYSTEM SYNC REPLICA test_ttl_where", timeout=20)

    checks = [
        ("SELECT id FROM test_ttl_delete ORDER BY id", "2\n4\n"),
        ("SELECT val FROM test_ttl_group_by ORDER BY id", "10\n"),
        ("SELECT id FROM test_ttl_where ORDER BY id", "2\n4\n"),
    ]
    for select_query, expected in checks:
        for node in (node_left, node_right):
            assert node.query(select_query) == expected