Attempt to skip mutations of different partitions in StorageMergeTree.

This commit is contained in:
Vladimir Chebotarev 2021-02-04 09:00:58 +03:00
parent b39c19399f
commit 6599124dba
2 changed files with 95 additions and 0 deletions

View File

@ -954,6 +954,29 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
if (!commands->empty()) if (!commands->empty())
{ {
bool affected = false;
for (const auto & command : commands)
{
if (command.partition == nullptr)
{
affected = true;
break;
}
const String partition_id = part->storage.getPartitionIDFromQuery(command.partition, global_context);
if (partition_id == part->info.partition_id)
{
affected = true;
break;
}
}
if (!affected)
{
/// Shall not create a new part, but will do that later if mutation with higher version appear.
continue;
}
auto new_part_info = part->info; auto new_part_info = part->info;
new_part_info.mutation = current_mutations_by_version.rbegin()->first; new_part_info.mutation = current_mutations_by_version.rbegin()->first;

View File

@ -22,6 +22,78 @@ def started_cluster():
cluster.shutdown() cluster.shutdown()
def test_mutations_in_partition_background(started_cluster):
try:
numbers = 10 # FIXME too many mutations (66) simultaneously will stuck ClickHouse
name = "test_mutations_in_partition"
instance_test_mutations.query(
f'''CREATE TABLE {name} (date Date, a UInt64, b String) ENGINE = MergeTree() ORDER BY tuple() PARTITION BY a''')
instance_test_mutations.query(
f'''INSERT INTO {name} SELECT '2019-07-29' AS date, number, toString(number) FROM numbers({numbers})''')
for i in range(0, numbers, 3):
instance_test_mutations.query(f'''ALTER TABLE {name} DELETE IN PARTITION {i} WHERE a = {i}''')
for i in range(1, numbers, 3):
instance_test_mutations.query(f'''ALTER TABLE {name} UPDATE b = 'changed' IN PARTITION {i} WHERE a = {i} ''')
def count_and_changed():
return instance_test_mutations.query(f"SELECT count(), countIf(b == 'changed') FROM {name} SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV").splitlines()
all_done = False
for wait_times_for_mutation in range(100): # wait for replication 80 seconds max
time.sleep(0.8)
if count_and_changed() == ["6,3"]:
all_done = True
break
print(instance_test_mutations.query(
f"SELECT mutation_id, command, parts_to_do, is_done, latest_failed_part, latest_fail_reason FROM system.mutations WHERE table = '{name}' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT TSVWithNames"))
assert (count_and_changed(), all_done) == (["6,3"], True)
assert instance_test_mutations.query(f"SELECT count(), sum(is_done) FROM system.mutations WHERE table = '{name}' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV").splitlines() == ["7,7"]
finally:
instance_test_mutations.query(f'''DROP TABLE {name}''')
@pytest.mark.parametrize("sync", [
("last",),
("all",)
])
def test_mutations_in_partition_sync(started_cluster, sync):
try:
numbers = 10 # FIXME too many mutations (66) simultaneously will stuck ClickHouse
name = "test_mutations_in_partition_sync"
instance_test_mutations.query(
f'''CREATE TABLE {name} (date Date, a UInt64, b String) ENGINE = MergeTree() ORDER BY tuple() PARTITION BY a''')
instance_test_mutations.query(
f'''INSERT INTO {name} SELECT '2019-07-29' AS date, number, toString(number) FROM numbers({numbers})''')
for i in range(0, numbers, 3):
instance_test_mutations.query(f'''ALTER TABLE {name} DELETE IN PARTITION {i} WHERE a = {i}'''
+ (' SETTINGS mutations_sync = 1' if sync == 'all' else ''))
for reverse_index, i in reversed(list(enumerate(reversed(range(1, numbers, 3))))):
instance_test_mutations.query(f'''ALTER TABLE {name} UPDATE b = 'changed' IN PARTITION {i} WHERE a = {i}'''
+ (' SETTINGS mutations_sync = 1' if not reverse_index or sync == 'all' else ''))
def count_and_changed():
return instance_test_mutations.query(f"SELECT count(), countIf(b == 'changed') FROM {name} SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV").splitlines()
print(instance_test_mutations.query(
f"SELECT mutation_id, command, parts_to_do, is_done, latest_failed_part, latest_fail_reason FROM system.mutations WHERE table = '{name}' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT TSVWithNames"))
assert count_and_changed() == ["6,3"]
assert instance_test_mutations.query(f"SELECT count(), sum(is_done) FROM system.mutations WHERE table = '{name}' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV").splitlines() == ["7,7"]
finally:
instance_test_mutations.query(f'''DROP TABLE {name}''')
def test_mutations_with_merge_background_task(started_cluster): def test_mutations_with_merge_background_task(started_cluster):
instance_test_mutations.query('''SYSTEM STOP MERGES test_mutations_with_ast_elements''') instance_test_mutations.query('''SYSTEM STOP MERGES test_mutations_with_ast_elements''')