mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Test fixes.
This commit is contained in:
parent
390416b24d
commit
863f3565ad
@ -868,8 +868,6 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
|
|||||||
if (storage_settings.get()->assign_part_uuids)
|
if (storage_settings.get()->assign_part_uuids)
|
||||||
future_part->uuid = UUIDHelpers::generateV4();
|
future_part->uuid = UUIDHelpers::generateV4();
|
||||||
|
|
||||||
auto commands = MutationCommands::create();
|
|
||||||
|
|
||||||
CurrentlyMergingPartsTaggerPtr tagger;
|
CurrentlyMergingPartsTaggerPtr tagger;
|
||||||
|
|
||||||
if (current_mutations_by_version.empty())
|
if (current_mutations_by_version.empty())
|
||||||
@ -885,6 +883,7 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
|
|||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool are_some_mutations_for_some_parts_skipped = false;
|
||||||
auto mutations_end_it = current_mutations_by_version.end();
|
auto mutations_end_it = current_mutations_by_version.end();
|
||||||
for (const auto & part : getDataPartsVector())
|
for (const auto & part : getDataPartsVector())
|
||||||
{
|
{
|
||||||
@ -906,6 +905,8 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto commands = MutationCommands::create();
|
||||||
|
|
||||||
size_t current_ast_elements = 0;
|
size_t current_ast_elements = 0;
|
||||||
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
|
for (auto it = mutations_begin_it; it != mutations_end_it; ++it)
|
||||||
{
|
{
|
||||||
@ -978,7 +979,7 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
|
|||||||
/// Shall not create a new part, but will do that later if mutation with higher version appear.
|
/// Shall not create a new part, but will do that later if mutation with higher version appear.
|
||||||
auto block_range = std::make_pair(part->info.min_block, part->info.max_block);
|
auto block_range = std::make_pair(part->info.min_block, part->info.max_block);
|
||||||
updated_version_by_block_range[block_range] = current_mutations_by_version.rbegin()->first;
|
updated_version_by_block_range[block_range] = current_mutations_by_version.rbegin()->first;
|
||||||
commands.clear();
|
are_some_mutations_for_some_parts_skipped = true;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -995,6 +996,13 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (are_some_mutations_for_some_parts_skipped)
|
||||||
|
{
|
||||||
|
currently_processing_in_background_mutex.unlock();
|
||||||
|
std::lock_guard<std::mutex> mutation_wait_mutex_lock(mutation_wait_mutex);
|
||||||
|
mutation_wait_event.notify_all();
|
||||||
|
}
|
||||||
|
|
||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -134,35 +134,64 @@ def test_mutations_with_truncate_table(started_cluster):
|
|||||||
"SELECT COUNT() FROM system.mutations WHERE table = 'test_mutations_with_ast_elements SETTINGS force_index_by_date = 0, force_primary_key = 0'").rstrip() == '0'
|
"SELECT COUNT() FROM system.mutations WHERE table = 'test_mutations_with_ast_elements SETTINGS force_index_by_date = 0, force_primary_key = 0'").rstrip() == '0'
|
||||||
|
|
||||||
|
|
||||||
def test_mutations_will_not_hang_for_non_existing_parts(started_cluster):
|
def test_mutations_will_not_hang_for_non_existing_parts_sync(started_cluster):
|
||||||
try:
|
try:
|
||||||
numbers = 100
|
numbers = 100
|
||||||
|
|
||||||
name = "test_mutations_will_not_hang_for_non_existing_parts"
|
name = "test_mutations_will_not_hang_for_non_existing_parts_sync"
|
||||||
instance_test_mutations.query(
|
instance_test_mutations.query(
|
||||||
f"""CREATE TABLE {name} (date Date, a UInt64, b String) ENGINE = MergeTree() ORDER BY tuple() PARTITION BY a""")
|
f"""CREATE TABLE {name} (date Date, a UInt64, b String) ENGINE = MergeTree() ORDER BY tuple() PARTITION BY a""")
|
||||||
instance_test_mutations.query(
|
instance_test_mutations.query(
|
||||||
f"""INSERT INTO {name} SELECT '2019-07-29' AS date, number, toString(number) FROM numbers({numbers})""")
|
f"""INSERT INTO {name} SELECT '2019-07-29' AS date, number, toString(number) FROM numbers({numbers})""")
|
||||||
|
|
||||||
for i in range(0, numbers, 3):
|
for i in range(0, numbers, 3):
|
||||||
instance_test_mutations.query(f"""ALTER TABLE {name} DELETE IN PARTITION {i} WHERE a = {i} SETTINGS mutations_sync = 1""")
|
instance_test_mutations.query(f"""ALTER TABLE {name} DELETE IN PARTITION {i+1000} WHERE a = {i} SETTINGS mutations_sync = 1""")
|
||||||
|
|
||||||
def count():
|
def count():
|
||||||
return instance_test_mutations.query(f"SELECT count() FROM {name} SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV").splitlines()
|
return instance_test_mutations.query(f"SELECT count() FROM {name} SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV").splitlines()
|
||||||
|
|
||||||
|
print(instance_test_mutations.query(
|
||||||
|
f"SELECT mutation_id, command, parts_to_do, is_done, latest_failed_part, latest_fail_reason, parts_to_do_names FROM system.mutations WHERE table = '{name}' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT TSVWithNames"))
|
||||||
|
|
||||||
|
assert count() == [f"{numbers}"]
|
||||||
|
assert instance_test_mutations.query(f"SELECT count(), sum(is_done) FROM system.mutations WHERE table = '{name}' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV").splitlines() == [f"34,34"]
|
||||||
|
|
||||||
|
finally:
|
||||||
|
instance_test_mutations.query(f"""DROP TABLE {name}""")
|
||||||
|
|
||||||
|
|
||||||
|
def test_mutations_will_not_hang_for_non_existing_parts_async(started_cluster):
|
||||||
|
try:
|
||||||
|
numbers = 100
|
||||||
|
|
||||||
|
name = "test_mutations_will_not_hang_for_non_existing_parts_async"
|
||||||
|
instance_test_mutations.query(
|
||||||
|
f"""CREATE TABLE {name} (date Date, a UInt64, b String) ENGINE = MergeTree() ORDER BY tuple() PARTITION BY a""")
|
||||||
|
instance_test_mutations.query(
|
||||||
|
f"""INSERT INTO {name} SELECT '2019-07-29' AS date, number, toString(number) FROM numbers({numbers})""")
|
||||||
|
|
||||||
|
for i in range(0, numbers, 3):
|
||||||
|
instance_test_mutations.query(f"""ALTER TABLE {name} DELETE IN PARTITION {i+1000} WHERE a = {i}""")
|
||||||
|
|
||||||
|
def count():
|
||||||
|
return instance_test_mutations.query(f"SELECT count() FROM {name} SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV").splitlines()
|
||||||
|
|
||||||
|
def count_and_sum_is_done():
|
||||||
|
return instance_test_mutations.query(f"SELECT count(), sum(is_done) FROM system.mutations WHERE table = '{name}' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV").splitlines()
|
||||||
|
|
||||||
all_done = False
|
all_done = False
|
||||||
for wait_times_for_mutation in range(100): # wait for replication 80 seconds max
|
for wait_times_for_mutation in range(100): # wait for replication 80 seconds max
|
||||||
time.sleep(0.8)
|
time.sleep(0.8)
|
||||||
|
|
||||||
if count == ["66"]:
|
if count_and_sum_is_done() == ["34,34"]:
|
||||||
all_done = True
|
all_done = True
|
||||||
break
|
break
|
||||||
|
|
||||||
print(instance_test_mutations.query(
|
print(instance_test_mutations.query(
|
||||||
f"SELECT mutation_id, command, parts_to_do, is_done, latest_failed_part, latest_fail_reason, parts_to_do_names FROM system.mutations WHERE table = '{name}' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT TSVWithNames"))
|
f"SELECT mutation_id, command, parts_to_do, is_done, latest_failed_part, latest_fail_reason, parts_to_do_names FROM system.mutations WHERE table = '{name}' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT TSVWithNames"))
|
||||||
|
|
||||||
assert (count(), all_done) == (["66"], True)
|
assert count() == [f"{numbers}"]
|
||||||
assert instance_test_mutations.query(f"SELECT count(), sum(is_done) FROM system.mutations WHERE table = '{name}' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV").splitlines() == ["34,34"]
|
assert count_and_sum_is_done() == ["34,34"]
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
instance_test_mutations.query(f"""DROP TABLE {name}""")
|
instance_test_mutations.query(f"""DROP TABLE {name}""")
|
||||||
|
Loading…
Reference in New Issue
Block a user