mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Merge pull request #16235 from excitoon-favorites/fixflappyafterasyncdrops
Fixed flappy `test_multiple_disks`
This commit is contained in:
commit
cc5f15da29
@ -286,7 +286,7 @@ def test_query_parser(start_cluster):
|
|||||||
node1.query(
|
node1.query(
|
||||||
"ALTER TABLE table_with_normal_policy MODIFY SETTING storage_policy='moving_jbod_with_external'")
|
"ALTER TABLE table_with_normal_policy MODIFY SETTING storage_policy='moving_jbod_with_external'")
|
||||||
finally:
|
finally:
|
||||||
node1.query("DROP TABLE IF EXISTS table_with_normal_policy")
|
node1.query("DROP TABLE IF EXISTS table_with_normal_policy SYNC")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("name,engine", [
|
@pytest.mark.parametrize("name,engine", [
|
||||||
@ -327,7 +327,7 @@ def test_alter_policy(start_cluster, name, engine):
|
|||||||
name=name)) == "jbods_with_external\n"
|
name=name)) == "jbods_with_external\n"
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
|
||||||
|
|
||||||
|
|
||||||
def get_random_string(length):
|
def get_random_string(length):
|
||||||
@ -355,9 +355,7 @@ def test_no_warning_about_zero_max_data_part_size(start_cluster):
|
|||||||
ORDER BY tuple()
|
ORDER BY tuple()
|
||||||
SETTINGS storage_policy='small_jbod_with_external'
|
SETTINGS storage_policy='small_jbod_with_external'
|
||||||
""")
|
""")
|
||||||
node.query("""
|
node.query("DROP TABLE default.test_warning_table SYNC")
|
||||||
DROP TABLE default.test_warning_table
|
|
||||||
""")
|
|
||||||
log = get_log(node)
|
log = get_log(node)
|
||||||
assert not re.search("Warning.*Volume.*special_warning_zero_volume", log)
|
assert not re.search("Warning.*Volume.*special_warning_zero_volume", log)
|
||||||
assert not re.search("Warning.*Volume.*special_warning_default_volume", log)
|
assert not re.search("Warning.*Volume.*special_warning_default_volume", log)
|
||||||
@ -398,7 +396,7 @@ def test_round_robin(start_cluster, name, engine):
|
|||||||
assert used_disks[0] != used_disks[1]
|
assert used_disks[0] != used_disks[1]
|
||||||
assert used_disks[2] == used_disks[0]
|
assert used_disks[2] == used_disks[0]
|
||||||
finally:
|
finally:
|
||||||
node1.query("DROP TABLE IF EXISTS {}".format(name))
|
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("name,engine", [
|
@pytest.mark.parametrize("name,engine", [
|
||||||
@ -425,7 +423,7 @@ def test_max_data_part_size(start_cluster, name, engine):
|
|||||||
assert len(used_disks) == 1
|
assert len(used_disks) == 1
|
||||||
assert used_disks[0] == 'external'
|
assert used_disks[0] == 'external'
|
||||||
finally:
|
finally:
|
||||||
node1.query("DROP TABLE IF EXISTS {}".format(name))
|
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("name,engine", [
|
@pytest.mark.parametrize("name,engine", [
|
||||||
@ -451,8 +449,11 @@ def test_jbod_overflow(start_cluster, name, engine):
|
|||||||
data.append(get_random_string(1024 * 1024)) # 1MB row
|
data.append(get_random_string(1024 * 1024)) # 1MB row
|
||||||
node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
|
node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
|
||||||
|
|
||||||
|
for p in ("/jbod1", "/jbod2", "/external"):
|
||||||
|
print(node1.exec_in_container([f"bash", "-c", f"find {p} | xargs -n1 du -sh"]))
|
||||||
|
|
||||||
used_disks = get_used_disks_for_table(node1, name)
|
used_disks = get_used_disks_for_table(node1, name)
|
||||||
assert all(disk == 'jbod1' for disk in used_disks)
|
assert set(used_disks) == {'jbod1'}
|
||||||
|
|
||||||
# should go to the external disk (jbod is overflown)
|
# should go to the external disk (jbod is overflown)
|
||||||
data = [] # 10MB in total
|
data = [] # 10MB in total
|
||||||
@ -461,6 +462,9 @@ def test_jbod_overflow(start_cluster, name, engine):
|
|||||||
|
|
||||||
node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
|
node1.query("INSERT INTO {} VALUES {}".format(name, ','.join(["('" + x + "')" for x in data])))
|
||||||
|
|
||||||
|
for p in ("/jbod1", "/jbod2", "/external"):
|
||||||
|
print(node1.exec_in_container([f"bash", "-c", f"find {p} | xargs -n1 du -sh"]))
|
||||||
|
|
||||||
used_disks = get_used_disks_for_table(node1, name)
|
used_disks = get_used_disks_for_table(node1, name)
|
||||||
|
|
||||||
assert used_disks[-1] == 'external'
|
assert used_disks[-1] == 'external'
|
||||||
@ -478,7 +482,7 @@ def test_jbod_overflow(start_cluster, name, engine):
|
|||||||
assert all(disk == 'external' for disk in disks_for_merges)
|
assert all(disk == 'external' for disk in disks_for_merges)
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
node1.query("DROP TABLE IF EXISTS {}".format(name))
|
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("name,engine", [
|
@pytest.mark.parametrize("name,engine", [
|
||||||
@ -495,6 +499,8 @@ def test_background_move(start_cluster, name, engine):
|
|||||||
SETTINGS storage_policy='moving_jbod_with_external'
|
SETTINGS storage_policy='moving_jbod_with_external'
|
||||||
""".format(name=name, engine=engine))
|
""".format(name=name, engine=engine))
|
||||||
|
|
||||||
|
node1.query(f"SYSTEM START MERGES {name}")
|
||||||
|
|
||||||
for i in range(5):
|
for i in range(5):
|
||||||
data = [] # 5MB in total
|
data = [] # 5MB in total
|
||||||
for i in range(5):
|
for i in range(5):
|
||||||
@ -523,8 +529,10 @@ def test_background_move(start_cluster, name, engine):
|
|||||||
# first (oldest) part was moved to external
|
# first (oldest) part was moved to external
|
||||||
assert path.startswith("/external")
|
assert path.startswith("/external")
|
||||||
|
|
||||||
|
node1.query(f"SYSTEM START MERGES {name}")
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("name,engine", [
|
@pytest.mark.parametrize("name,engine", [
|
||||||
@ -611,7 +619,7 @@ def test_start_stop_moves(start_cluster, name, engine):
|
|||||||
assert used_disks[0] == 'external'
|
assert used_disks[0] == 'external'
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
|
||||||
|
|
||||||
|
|
||||||
def get_path_for_part_from_part_log(node, table, part_name):
|
def get_path_for_part_from_part_log(node, table, part_name):
|
||||||
@ -699,7 +707,7 @@ def test_alter_move(start_cluster, name, engine):
|
|||||||
assert node1.query("SELECT COUNT() FROM {}".format(name)) == "4\n"
|
assert node1.query("SELECT COUNT() FROM {}".format(name)) == "4\n"
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("volume_or_disk", [
|
@pytest.mark.parametrize("volume_or_disk", [
|
||||||
@ -748,7 +756,7 @@ def test_alter_move_half_of_partition(start_cluster, volume_or_disk):
|
|||||||
assert node1.query("SELECT COUNT() FROM {}".format(name)) == "2\n"
|
assert node1.query("SELECT COUNT() FROM {}".format(name)) == "2\n"
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("volume_or_disk", [
|
@pytest.mark.parametrize("volume_or_disk", [
|
||||||
@ -792,7 +800,7 @@ def test_alter_double_move_partition(start_cluster, volume_or_disk):
|
|||||||
volume_or_disk=volume_or_disk))
|
volume_or_disk=volume_or_disk))
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
|
||||||
|
|
||||||
|
|
||||||
def produce_alter_move(node, name):
|
def produce_alter_move(node, name):
|
||||||
@ -876,7 +884,7 @@ def test_concurrent_alter_move(start_cluster, name, engine):
|
|||||||
assert node1.query("SELECT 1") == "1\n"
|
assert node1.query("SELECT 1") == "1\n"
|
||||||
assert node1.query("SELECT COUNT() FROM {}".format(name)) == "500\n"
|
assert node1.query("SELECT COUNT() FROM {}".format(name)) == "500\n"
|
||||||
finally:
|
finally:
|
||||||
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("name,engine", [
|
@pytest.mark.parametrize("name,engine", [
|
||||||
@ -929,7 +937,7 @@ def test_concurrent_alter_move_and_drop(start_cluster, name, engine):
|
|||||||
assert node1.query("SELECT 1") == "1\n"
|
assert node1.query("SELECT 1") == "1\n"
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("name,engine", [
|
@pytest.mark.parametrize("name,engine", [
|
||||||
@ -960,7 +968,7 @@ def test_detach_attach(start_cluster, name, engine):
|
|||||||
assert node1.query("SELECT count() FROM {}".format(name)).strip() == "5"
|
assert node1.query("SELECT count() FROM {}".format(name)).strip() == "5"
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("name,engine", [
|
@pytest.mark.parametrize("name,engine", [
|
||||||
@ -1006,7 +1014,7 @@ def test_mutate_to_another_disk(start_cluster, name, engine):
|
|||||||
|
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("name,engine", [
|
@pytest.mark.parametrize("name,engine", [
|
||||||
@ -1064,7 +1072,7 @@ def test_concurrent_alter_modify(start_cluster, name, engine):
|
|||||||
assert node1.query("SELECT COUNT() FROM {}".format(name)) == "100\n"
|
assert node1.query("SELECT COUNT() FROM {}".format(name)) == "100\n"
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
|
||||||
|
|
||||||
|
|
||||||
def test_simple_replication_and_moves(start_cluster):
|
def test_simple_replication_and_moves(start_cluster):
|
||||||
@ -1131,7 +1139,7 @@ def test_simple_replication_and_moves(start_cluster):
|
|||||||
set(disks2) == set(["jbod1", "external"])
|
set(disks2) == set(["jbod1", "external"])
|
||||||
finally:
|
finally:
|
||||||
for node in [node1, node2]:
|
for node in [node1, node2]:
|
||||||
node.query("DROP TABLE IF EXISTS replicated_table_for_moves")
|
node.query("DROP TABLE IF EXISTS replicated_table_for_moves SYNC")
|
||||||
|
|
||||||
|
|
||||||
def test_download_appropriate_disk(start_cluster):
|
def test_download_appropriate_disk(start_cluster):
|
||||||
@ -1165,7 +1173,7 @@ def test_download_appropriate_disk(start_cluster):
|
|||||||
|
|
||||||
finally:
|
finally:
|
||||||
for node in [node1, node2]:
|
for node in [node1, node2]:
|
||||||
node.query("DROP TABLE IF EXISTS replicated_table_for_download")
|
node.query("DROP TABLE IF EXISTS replicated_table_for_download SYNC")
|
||||||
|
|
||||||
|
|
||||||
def test_rename(start_cluster):
|
def test_rename(start_cluster):
|
||||||
@ -1202,9 +1210,9 @@ def test_rename(start_cluster):
|
|||||||
node1.query("SELECT COUNT() FROM default.renaming_table1")
|
node1.query("SELECT COUNT() FROM default.renaming_table1")
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
node1.query("DROP TABLE IF EXISTS default.renaming_table")
|
node1.query("DROP TABLE IF EXISTS default.renaming_table SYNC")
|
||||||
node1.query("DROP TABLE IF EXISTS default.renaming_table1")
|
node1.query("DROP TABLE IF EXISTS default.renaming_table1 SYNC")
|
||||||
node1.query("DROP TABLE IF EXISTS test.renaming_table2")
|
node1.query("DROP TABLE IF EXISTS test.renaming_table2 SYNC")
|
||||||
|
|
||||||
|
|
||||||
def test_freeze(start_cluster):
|
def test_freeze(start_cluster):
|
||||||
@ -1238,7 +1246,7 @@ def test_freeze(start_cluster):
|
|||||||
node1.exec_in_container(["bash", "-c", "find /external/shadow -name '*.mrk2' | grep '.*'"])
|
node1.exec_in_container(["bash", "-c", "find /external/shadow -name '*.mrk2' | grep '.*'"])
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
node1.query("DROP TABLE IF EXISTS default.freezing_table")
|
node1.query("DROP TABLE IF EXISTS default.freezing_table SYNC")
|
||||||
node1.exec_in_container(["rm", "-rf", "/jbod1/shadow", "/external/shadow"])
|
node1.exec_in_container(["rm", "-rf", "/jbod1/shadow", "/external/shadow"])
|
||||||
|
|
||||||
|
|
||||||
@ -1282,7 +1290,7 @@ def test_kill_while_insert(start_cluster):
|
|||||||
|
|
||||||
finally:
|
finally:
|
||||||
try:
|
try:
|
||||||
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
|
||||||
except:
|
except:
|
||||||
"""ClickHouse may be inactive at this moment and we don't want to mask a meaningful exception."""
|
"""ClickHouse may be inactive at this moment and we don't want to mask a meaningful exception."""
|
||||||
|
|
||||||
@ -1343,7 +1351,7 @@ def test_move_while_merge(start_cluster):
|
|||||||
assert node1.query("SELECT count() FROM {name}".format(name=name)).splitlines() == ["2"]
|
assert node1.query("SELECT count() FROM {name}".format(name=name)).splitlines() == ["2"]
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
|
||||||
|
|
||||||
|
|
||||||
def test_move_across_policies_does_not_work(start_cluster):
|
def test_move_across_policies_does_not_work(start_cluster):
|
||||||
@ -1384,8 +1392,8 @@ def test_move_across_policies_does_not_work(start_cluster):
|
|||||||
assert node1.query("""SELECT * FROM {name}""".format(name=name)).splitlines() == ["1"]
|
assert node1.query("""SELECT * FROM {name}""".format(name=name)).splitlines() == ["1"]
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
node1.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
node1.query(f"DROP TABLE IF EXISTS {name} SYNC")
|
||||||
node1.query("DROP TABLE IF EXISTS {name}2".format(name=name))
|
node1.query(f"DROP TABLE IF EXISTS {name}2 SYNC")
|
||||||
|
|
||||||
|
|
||||||
def _insert_merge_execute(node, name, policy, parts, cmds, parts_before_cmds, parts_after_cmds):
|
def _insert_merge_execute(node, name, policy, parts, cmds, parts_before_cmds, parts_after_cmds):
|
||||||
@ -1420,7 +1428,7 @@ def _insert_merge_execute(node, name, policy, parts, cmds, parts_before_cmds, pa
|
|||||||
assert len(parts) == parts_after_cmds
|
assert len(parts) == parts_after_cmds
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
node.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
node.query(f"DROP TABLE IF EXISTS {name} SYNC")
|
||||||
|
|
||||||
|
|
||||||
def _check_merges_are_working(node, storage_policy, volume, shall_work):
|
def _check_merges_are_working(node, storage_policy, volume, shall_work):
|
||||||
@ -1458,7 +1466,7 @@ def _check_merges_are_working(node, storage_policy, volume, shall_work):
|
|||||||
assert len(parts) == 1 if shall_work else created_parts
|
assert len(parts) == 1 if shall_work else created_parts
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
node.query("DROP TABLE IF EXISTS {name}".format(name=name))
|
node.query(f"DROP TABLE IF EXISTS {name} SYNC")
|
||||||
|
|
||||||
|
|
||||||
def _get_prefer_not_to_merge_for_storage_policy(node, storage_policy):
|
def _get_prefer_not_to_merge_for_storage_policy(node, storage_policy):
|
||||||
|
Loading…
Reference in New Issue
Block a user