This commit is contained in:
alesapin 2020-06-08 14:25:30 +03:00
parent 174b687a44
commit 483c60fadd

View File

@ -61,7 +61,7 @@ def get_used_disks_for_table(node, table_name, partition=None):
def check_used_disks_with_retry(node, table_name, expected_disks, retries): def check_used_disks_with_retry(node, table_name, expected_disks, retries):
for _ in range(retries): for _ in range(retries):
used_disks = get_used_disks_for_table(node, table_name) used_disks = get_used_disks_for_table(node, table_name)
if set(used_disks) == expected_disks: if set(used_disks).issubset(expected_disks):
return True return True
time.sleep(0.5) time.sleep(0.5)
return False return False
@ -830,7 +830,8 @@ def test_concurrent_alter_with_ttl_move(started_cluster, name, engine):
def optimize_table(num): def optimize_table(num):
for i in range(num): for i in range(num):
try: # optimize may throw after concurrent alter try: # optimize may throw after concurrent alter
node1.query("OPTIMIZE TABLE {} FINAL".format(name)) node1.query("OPTIMIZE TABLE {} FINAL".format(name), settings={'optimize_throw_if_noop': '1'})
break
except: except:
pass pass
@ -903,3 +904,93 @@ def test_double_move_while_select(started_cluster, name, positive):
finally: finally:
node1.query("DROP TABLE IF EXISTS {name}".format(name=name)) node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
@pytest.mark.parametrize("name,engine,positive", [
("mt_test_alter_with_merge_do_not_work","MergeTree()",0),
("replicated_mt_test_alter_with_merge_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_alter_with_merge_do_not_work', '1')",0),
("mt_test_alter_with_merge_work","MergeTree()",1),
("replicated_mt_test_alter_with_merge_work","ReplicatedMergeTree('/clickhouse/replicated_test_alter_with_merge_work', '1')",1),
])
def test_alter_with_merge_work(started_cluster, name, engine, positive):
"""Copyright 2019, Altinity LTD
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""
"""Check that TTL expressions are re-evaluated for
existing parts after ALTER command changes TTL expressions
and parts are merged.
"""
try:
node1.query("""
CREATE TABLE {name} (
s1 String,
d1 DateTime
) ENGINE = {engine}
ORDER BY tuple()
TTL d1 + INTERVAL 3000 SECOND TO DISK 'jbod2',
d1 + INTERVAL 6000 SECOND TO VOLUME 'external'
SETTINGS storage_policy='jbods_with_external', merge_with_ttl_timeout=0
""".format(name=name, engine=engine))
def optimize_table(num):
for i in range(num):
try: # optimize may throw after concurrent alter
node1.query("OPTIMIZE TABLE {} FINAL".format(name), settings={'optimize_throw_if_noop': '1'})
break
except:
pass
for p in range(3):
data = [] # 6MB in total
now = time.time()
for i in range(2):
s1 = get_random_string(1024 * 1024) # 1MB
d1 = now - 1 if positive else now + 300
data.append("('{}', toDateTime({}))".format(s1, d1))
values = ",".join(data)
node1.query("INSERT INTO {name} (s1, d1) VALUES {values}".format(name=name, values=values))
used_disks = get_used_disks_for_table(node1, name)
assert set(used_disks) == {"jbod1", "jbod2"}
node1.query("SELECT count() FROM {name}".format(name=name)).splitlines() == ["6"]
node1.query("""
ALTER TABLE {name} MODIFY
TTL d1 + INTERVAL 0 SECOND TO DISK 'jbod2',
d1 + INTERVAL 5 SECOND TO VOLUME 'external',
d1 + INTERVAL 10 SECOND DELETE
""".format(name=name))
optimize_table(20)
assert node1.query("SELECT count() FROM system.parts WHERE table = '{name}' AND active = 1".format(name=name)) == "1\n"
time.sleep(5)
optimize_table(20)
if positive:
assert check_used_disks_with_retry(node1, name, set(["external"]), 50)
else:
assert check_used_disks_with_retry(node1, name, set(["jbod1", "jbod2"]), 50)
time.sleep(5)
optimize_table(20)
if positive:
assert node1.query("SELECT count() FROM {name}".format(name=name)) == "0\n"
else:
assert node1.query("SELECT count() FROM {name}".format(name=name)) == "6\n"
finally:
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))