remove wrong code for skipping mutations in MergeTree

This commit is contained in:
Alexander Tokmakov 2022-08-24 20:58:59 +02:00
parent e6e7f5db93
commit 3cd26aafe4
5 changed files with 68 additions and 117 deletions

View File

@ -569,9 +569,25 @@ bool StorageMergeTree::hasLightweightDeletedMask() const
return has_lightweight_delete_parts.load(std::memory_order_relaxed);
}
namespace
{

/// A part name paired with a version number.
struct PartVersionWithName
{
    Int64 version;
    String name;
};

/// Ordering predicate for PartVersionWithName: compares the `version` fields only,
/// so two entries with equal versions are equivalent regardless of their names.
bool comparator(const PartVersionWithName & f, const PartVersionWithName & s)
{
    return f.version < s.version;
}

}
std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsStatus(Int64 mutation_version, std::set<String> * mutation_ids) const
{
std::unique_lock lock(currently_processing_in_background_mutex);
std::lock_guard lock(currently_processing_in_background_mutex);
auto current_mutation_it = current_mutations_by_version.find(mutation_version);
/// Killed
@ -587,7 +603,7 @@ std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsS
auto data_parts = getVisibleDataPartsVector(txn);
for (const auto & data_part : data_parts)
{
Int64 data_version = getUpdatedDataVersion(data_part, lock);
Int64 data_version = data_part->info.getDataVersion();
if (data_version < mutation_version)
{
if (!mutation_entry.latest_fail_reason.empty())
@ -630,9 +646,14 @@ std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsS
std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() const
{
std::unique_lock lock(currently_processing_in_background_mutex);
std::lock_guard lock(currently_processing_in_background_mutex);
auto part_versions_with_names = getSortedPartVersionsWithNames(lock);
std::vector<PartVersionWithName> part_versions_with_names;
auto data_parts = getDataPartsVectorForInternalUsage();
part_versions_with_names.reserve(data_parts.size());
for (const auto & part : data_parts)
part_versions_with_names.emplace_back(PartVersionWithName{part->info.getDataVersion(), part->name});
std::sort(part_versions_with_names.begin(), part_versions_with_names.end(), comparator);
std::vector<MergeTreeMutationStatus> result;
for (const auto & kv : current_mutations_by_version)
@ -641,7 +662,7 @@ std::vector<MergeTreeMutationStatus> StorageMergeTree::getMutationsStatus() cons
const MergeTreeMutationEntry & entry = kv.second;
const PartVersionWithName needle{mutation_version, ""};
auto versions_it = std::lower_bound(
part_versions_with_names.begin(), part_versions_with_names.end(), needle);
part_versions_with_names.begin(), part_versions_with_names.end(), needle, comparator);
size_t parts_to_do = versions_it - part_versions_with_names.begin();
Names parts_to_do_names;
@ -963,8 +984,7 @@ bool StorageMergeTree::partIsAssignedToBackgroundOperation(const DataPartPtr & p
std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
const StorageMetadataPtr & metadata_snapshot, String * /* disable_reason */, TableLockHolder & /* table_lock_holder */,
std::unique_lock<std::mutex> & currently_processing_in_background_mutex_lock,
bool & were_some_mutations_for_some_parts_skipped)
std::unique_lock<std::mutex> & /*currently_processing_in_background_mutex_lock*/)
{
size_t max_ast_elements = getContext()->getSettingsRef().max_expanded_ast_elements;
@ -993,7 +1013,7 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
if (currently_merging_mutating_parts.contains(part))
continue;
auto mutations_begin_it = current_mutations_by_version.upper_bound(getUpdatedDataVersion(part, currently_processing_in_background_mutex_lock));
auto mutations_begin_it = current_mutations_by_version.upper_bound(part->info.getDataVersion());
if (mutations_begin_it == mutations_end_it)
continue;
@ -1080,33 +1100,6 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
assert(commands->empty() == (last_mutation_to_apply == mutations_end_it));
if (!commands->empty())
{
bool is_partition_affected = false;
for (const auto & command : *commands)
{
if (command.partition == nullptr)
{
is_partition_affected = true;
break;
}
const String partition_id = part->storage.getPartitionIDFromQuery(command.partition, getContext());
if (partition_id == part->info.partition_id)
{
is_partition_affected = true;
break;
}
}
if (!is_partition_affected)
{
/// Shall not create a new part, but will do that later if mutation with higher version appear.
/// This is needed in order to not produce excessive mutations of non-related parts.
auto block_range = std::make_pair(part->info.min_block, part->info.max_block);
updated_version_by_block_range[block_range] = last_mutation_to_apply->first;
were_some_mutations_for_some_parts_skipped = true;
continue;
}
auto new_part_info = part->info;
new_part_info.mutation = last_mutation_to_apply->first;
@ -1133,7 +1126,6 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
auto metadata_snapshot = getInMemoryMetadataPtr();
std::shared_ptr<MergeMutateSelectedEntry> merge_entry, mutate_entry;
bool were_some_mutations_skipped = false;
auto share_lock = lockForShare(RWLockImpl::NO_QUERY, getSettings()->lock_acquire_timeout_for_background_operations);
@ -1154,19 +1146,11 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
merge_entry = selectPartsToMerge(metadata_snapshot, false, {}, false, nullptr, share_lock, lock, txn);
if (!merge_entry)
mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr, share_lock, lock, were_some_mutations_skipped);
mutate_entry = selectPartsToMutate(metadata_snapshot, nullptr, share_lock, lock);
has_mutations = !current_mutations_by_version.empty();
}
if ((!mutate_entry && has_mutations) || were_some_mutations_skipped)
{
/// Notify in case of errors or if some mutation was skipped (because it has no effect on the part).
/// TODO @azat: we can also spot some selection errors when `mutate_entry` is true.
std::lock_guard lock(mutation_wait_mutex);
mutation_wait_event.notify_all();
}
if (merge_entry)
{
auto task = std::make_shared<MergePlainMergeTreeTask>(*this, metadata_snapshot, false, Names{}, merge_entry, share_lock, common_assignee_trigger);
@ -1228,22 +1212,11 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
return scheduled;
}
/// Returns the version to use for `part`: its own data version, raised to the value stored in
/// `updated_version_by_block_range` for the part's (min_block, max_block) range when such an
/// entry exists and is larger.
/// The lock argument is not used in the body.
/// NOTE(review): presumably it is taken so that callers must hold
/// currently_processing_in_background_mutex — confirm.
Int64 StorageMergeTree::getUpdatedDataVersion(
    const DataPartPtr & part,
    std::unique_lock<std::mutex> & /* currently_processing_in_background_mutex_lock */) const
{
    const Int64 own_version = part->info.getDataVersion();

    const auto recorded = updated_version_by_block_range.find(
        std::make_pair(part->info.min_block, part->info.max_block));

    /// Nothing recorded for this block range: the part's own version stands.
    if (recorded == updated_version_by_block_range.end())
        return own_version;

    return std::max(own_version, static_cast<Int64>(recorded->second));
}
UInt64 StorageMergeTree::getCurrentMutationVersion(
const DataPartPtr & part,
std::unique_lock<std::mutex> & currently_processing_in_background_mutex_lock) const
std::unique_lock<std::mutex> & /*currently_processing_in_background_mutex_lock*/) const
{
auto it = current_mutations_by_version.upper_bound(getUpdatedDataVersion(part, currently_processing_in_background_mutex_lock));
auto it = current_mutations_by_version.upper_bound(part->info.getDataVersion());
if (it == current_mutations_by_version.begin())
return 0;
--it;
@ -1256,7 +1229,7 @@ size_t StorageMergeTree::clearOldMutations(bool truncate)
std::vector<MergeTreeMutationEntry> mutations_to_delete;
{
std::unique_lock<std::mutex> lock(currently_processing_in_background_mutex);
std::lock_guard<std::mutex> lock(currently_processing_in_background_mutex);
if (current_mutations_by_version.size() <= finished_mutations_to_keep)
return 0;
@ -1268,23 +1241,6 @@ size_t StorageMergeTree::clearOldMutations(bool truncate)
end_it = current_mutations_by_version.upper_bound(*min_version);
size_t done_count = std::distance(begin_it, end_it);
if (done_count <= finished_mutations_to_keep)
return 0;
auto part_versions_with_names = getSortedPartVersionsWithNames(lock);
for (auto it = begin_it; it != end_it; ++it)
{
const PartVersionWithName needle{static_cast<Int64>(it->first), ""};
auto versions_it = std::lower_bound(
part_versions_with_names.begin(), part_versions_with_names.end(), needle);
if (versions_it != part_versions_with_names.begin() || !it->second.tid.isPrehistoric())
{
done_count = std::distance(begin_it, it);
break;
}
}
if (done_count <= finished_mutations_to_keep)
return 0;
@ -1312,21 +1268,6 @@ size_t StorageMergeTree::clearOldMutations(bool truncate)
return mutations_to_delete.size();
}
std::vector<StorageMergeTree::PartVersionWithName> StorageMergeTree::getSortedPartVersionsWithNames(
std::unique_lock<std::mutex> & currently_processing_in_background_mutex_lock) const
{
std::vector<PartVersionWithName> part_versions_with_names;
auto data_parts = getDataPartsVectorForInternalUsage();
part_versions_with_names.reserve(data_parts.size());
for (const auto & part : data_parts)
part_versions_with_names.emplace_back(PartVersionWithName{
getUpdatedDataVersion(part, currently_processing_in_background_mutex_lock),
part->name
});
::sort(part_versions_with_names.begin(), part_versions_with_names.end());
return part_versions_with_names;
}
bool StorageMergeTree::optimize(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
@ -1868,9 +1809,9 @@ void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
MutationCommands StorageMergeTree::getFirstAlterMutationCommandsForPart(const DataPartPtr & part) const
{
std::unique_lock lock(currently_processing_in_background_mutex);
std::lock_guard lock(currently_processing_in_background_mutex);
auto it = current_mutations_by_version.upper_bound(getUpdatedDataVersion(part, lock));
auto it = current_mutations_by_version.upper_bound(part->info.getDataVersion());
if (it == current_mutations_by_version.end())
return {};
return it->second.commands;

View File

@ -147,11 +147,6 @@ private:
std::map<UInt64, MergeTreeMutationEntry> current_mutations_by_version;
/// We store information about mutations which are not applicable to the partition of each part.
/// The value is the maximum version for a part that will be the same as its current version,
/// that is, the version to which it can be upgraded without any change.
std::map<std::pair<Int64, Int64>, UInt64> updated_version_by_block_range;
std::atomic<bool> shutdown_called {false};
std::atomic<bool> flush_called {false};
@ -192,17 +187,6 @@ private:
friend struct CurrentlyMergingPartsTagger;
/// A part name paired with a version number; ordered by version alone.
struct PartVersionWithName
{
    Int64 version;
    String name;

    /// Compares the `version` fields only; `name` does not take part in the ordering.
    bool operator <(const PartVersionWithName & s) const
    {
        return version < s.version;
    }
};
std::shared_ptr<MergeMutateSelectedEntry> selectPartsToMerge(
const StorageMetadataPtr & metadata_snapshot,
bool aggressive,
@ -216,7 +200,9 @@ private:
SelectPartsDecision * select_decision_out = nullptr);
std::shared_ptr<MergeMutateSelectedEntry> selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String * disable_reason, TableLockHolder & table_lock_holder, std::unique_lock<std::mutex> & currently_processing_in_background_mutex_lock, bool & were_some_mutations_for_some_parts_skipped);
std::shared_ptr<MergeMutateSelectedEntry> selectPartsToMutate(
const StorageMetadataPtr & metadata_snapshot, String * disable_reason,
TableLockHolder & table_lock_holder, std::unique_lock<std::mutex> & currently_processing_in_background_mutex_lock);
/// For the current mutations queue, returns the maximum mutation version for a part,
/// taking into account mutations which would not change it.
@ -225,15 +211,8 @@ private:
const DataPartPtr & part,
std::unique_lock<std::mutex> & /* currently_processing_in_background_mutex_lock */) const;
/// Returns the maximum version of a part, taking into account mutations which would not change it.
Int64 getUpdatedDataVersion(
const DataPartPtr & part,
std::unique_lock<std::mutex> & /* currently_processing_in_background_mutex_lock */) const;
size_t clearOldMutations(bool truncate = false);
std::vector<PartVersionWithName> getSortedPartVersionsWithNames(std::unique_lock<std::mutex> & /* currently_processing_in_background_mutex_lock */) const;
// Partition helpers
void dropPartNoWaitNoThrow(const String & part_name) override;
void dropPart(const String & part_name, bool detach, ContextPtr context) override;

View File

@ -27,6 +27,7 @@ def started_cluster():
cluster.shutdown()
@pytest.mark.skip(reason="Skipping mutations in partition does not work")
def test_mutations_in_partition_background(started_cluster):
try:
numbers = 100
@ -79,6 +80,7 @@ def test_mutations_in_partition_background(started_cluster):
instance_test_mutations.query(f"""DROP TABLE {name}""")
@pytest.mark.skip(reason="Skipping mutations in partition does not work")
@pytest.mark.parametrize("sync", [("last",), ("all",)])
def test_mutations_in_partition_sync(started_cluster, sync):
try:
@ -190,6 +192,7 @@ def test_mutations_with_truncate_table(started_cluster):
)
@pytest.mark.skip(reason="Skipping mutations in partition does not work")
def test_mutations_will_not_hang_for_non_existing_parts_sync(started_cluster):
try:
numbers = 100
@ -227,6 +230,7 @@ def test_mutations_will_not_hang_for_non_existing_parts_sync(started_cluster):
instance_test_mutations.query(f"""DROP TABLE {name}""")
@pytest.mark.skip(reason="Skipping mutations in partition does not work")
def test_mutations_will_not_hang_for_non_existing_parts_async(started_cluster):
try:
numbers = 100

View File

@ -0,0 +1,9 @@
1 1 1_1_1_0
1 2 1_3_3_0
2 1 2_2_2_0
2 2 2_4_4_0
mutation_5.txt UPDATE n = n + (n NOT IN (default.m)) IN PARTITION ID \'1\' WHERE 1 [] 0 1
1 2
1 3
2 1
2 2

View File

@ -0,0 +1,18 @@
-- Test for a mutation that is restricted to a single partition (IN PARTITION ID)
-- and whose expression reads from another table.
drop table if exists mt;
drop table if exists m;
-- `mt` is partitioned by p, so rows with different p values belong to different partitions.
create table mt (p int, n int) engine=MergeTree order by tuple() partition by p;
-- `m` is only referenced by the mutation expression below and is dropped right after it.
create table m (n int) engine=Memory;
-- Two inserts, each carrying one row for p=1 and one row for p=2.
insert into mt values (1, 1), (2, 1);
insert into mt values (1, 2), (2, 2);
select *, _part from mt order by _part;
-- Synchronous mutation (mutations_sync=1) that targets only partition id '1' and reads from `m`.
alter table mt update n = n + (n not in m) in partition id '1' where 1 settings mutations_sync=1;
-- NOTE(review): presumably `m` is dropped before the merge to check that nothing needs to
-- evaluate the mutation expression again afterwards — confirm.
drop table m;
optimize table mt final;
-- Shows the recorded state of the mutation (remaining parts and is_done) for the current database.
select mutation_id, command, parts_to_do_names, parts_to_do, is_done from system.mutations where database=currentDatabase();
select * from mt order by p, n;
drop table mt;