Reduce overhead of the mutations for SELECTs (v2)

SELECTs are affected by the mutations, since it tries to apply them on
fly, and scanning over existing mutations can take significant amount of
time (for simple queries, i.e. count())

And also even after mutation had been finished, it still a problem,
because mutations are not removed instantly.

So instead introduce an atomic counter alter_conversions_mutations, that
is incremented for new mutations and decremented once mutation
finished/killed, that way once the mutation finished they will not
affect queries.

Here are some numbers for non-RENAME mutations:

    rmt vanilla w/o mutations | queries: 3693, QPS: 494.813
    rmt vanilla w/ mutations  | queries: 2190, QPS: 388.256
    rmt patched w/o mutations | queries: 3168, QPS: 620.061
    rmt patched w/ mutations  | queries: 3155, QPS: 614.424
    mt vanilla w/o mutations  | queries: 3498, QPS: 656.399
    mt vanilla w/ mutations   | queries: 3821, QPS: 600.425
    mt patched w/o mutations  | queries: 5732, QPS: 745.585
    mt patched w/ mutations   | queries: 4719, QPS: 715.034

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2024-03-05 12:20:05 +01:00
parent de2e9f8983
commit be6777bc86
7 changed files with 91 additions and 9 deletions

View File

@ -8371,4 +8371,29 @@ bool MergeTreeData::initializeDiskOnConfigChange(const std::set<String> & new_ad
}
return true;
}
bool updateAlterConversionsMutations(const MutationCommands & commands, std::atomic<ssize_t> & alter_conversions_mutations, bool remove)
{
for (const auto & command : commands)
{
if (AlterConversions::supportsMutationCommandType(command.type))
{
if (remove)
{
--alter_conversions_mutations;
if (alter_conversions_mutations < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly mutations counter is negative ({})", alter_conversions_mutations);
}
else
{
if (alter_conversions_mutations < 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "On-fly mutations counter is negative ({})", alter_conversions_mutations);
++alter_conversions_mutations;
}
return true;
}
}
return false;
}
}

View File

@ -1708,4 +1708,8 @@ struct CurrentlySubmergingEmergingTagger
|| (settings.min_compressed_bytes_to_fsync_after_merge && input_bytes >= settings.min_compressed_bytes_to_fsync_after_merge));
}
/// Look at MutationCommands if it contains mutations for AlterConversions, update the counter.
/// Return true if the counter had been updated
bool updateAlterConversionsMutations(const MutationCommands & commands, std::atomic<ssize_t> & alter_conversions_mutations, bool remove);
}

View File

@ -6,8 +6,11 @@
#include <Storages/MergeTree/ReplicatedMergeTreeMergeStrategyPicker.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/noexcept_scope.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/CurrentMetrics.h>
#include <Storages/MutationCommands.h>
#include <base/defines.h>
#include <Parsers/formatAST.h>
#include <base/sort.h>
#include <ranges>
@ -942,7 +945,14 @@ int32_t ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper
mutations_by_partition.erase(partition_and_block_num.first);
}
it = mutations_by_znode.erase(it);
if (!it->second.is_done)
{
const auto commands = entry.commands;
it = mutations_by_znode.erase(it);
updateAlterConversionsMutations(commands, alter_conversions_mutations, /* remove= */ true);
}
else
it = mutations_by_znode.erase(it);
}
else
++it;
@ -991,12 +1001,15 @@ int32_t ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper
auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry, format_version))
.first->second;
for (const auto & pair : entry->block_numbers)
{
const String & partition_id = pair.first;
Int64 block_num = pair.second;
mutations_by_partition[partition_id].emplace(block_num, &mutation);
}
updateAlterConversionsMutations(entry->commands, alter_conversions_mutations, /* remove= */ false);
NOEXCEPT_SCOPE({
for (const auto & pair : entry->block_numbers)
{
const String & partition_id = pair.first;
Int64 block_num = pair.second;
mutations_by_partition[partition_id].emplace(block_num, &mutation);
}
});
LOG_TRACE(log, "Adding mutation {} for {} partitions (data versions: {})",
entry->znode_name, entry->block_numbers.size(), entry->getBlockNumbersForLogs());
@ -1062,6 +1075,8 @@ ReplicatedMergeTreeMutationEntryPtr ReplicatedMergeTreeQueue::removeMutation(
}
mutations_by_znode.erase(it);
/// updateAlterConversionsMutations() will be called in updateMutations()
LOG_DEBUG(log, "Removed mutation {} from local state.", entry->znode_name);
}
@ -1887,6 +1902,10 @@ ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zk
MutationCommands ReplicatedMergeTreeQueue::getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const
{
chassert(alter_conversions_mutations >= 0);
if (alter_conversions_mutations == 0)
return {};
std::unique_lock lock(state_mutex);
auto in_partition = mutations_by_partition.find(part->info.partition_id);
@ -2019,6 +2038,8 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
"were executed concurrently on different replicas.", znode);
mutation.parts_to_do.clear();
}
updateAlterConversionsMutations(mutation.entry->commands, alter_conversions_mutations, /* remove= */ true);
}
else if (mutation.parts_to_do.size() == 0)
{
@ -2075,6 +2096,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
LOG_TRACE(log, "Finishing data alter with version {} for entry {}", entry->alter_version, entry->znode_name);
alter_sequence.finishDataAlter(entry->alter_version, lock);
}
updateAlterConversionsMutations(entry->commands, alter_conversions_mutations, /* remove= */ true);
}
}
}

View File

@ -151,6 +151,8 @@ private:
/// Mapping from znode path to Mutations Status
std::map<String, MutationStatus> mutations_by_znode;
/// Unfinished mutations that is required AlterConversions (see getAlterMutationCommandsForPart())
std::atomic<ssize_t> alter_conversions_mutations = 0;
/// Partition -> (block_number -> MutationStatus)
std::unordered_map<String, std::map<Int64, MutationStatus *>> mutations_by_partition;
/// Znode ID of the latest mutation that is done.

View File

@ -521,9 +521,18 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, Context
String mutation_id = entry.file_name;
if (txn)
txn->addMutation(shared_from_this(), mutation_id);
bool alter_conversions_mutations_updated = updateAlterConversionsMutations(entry.commands, alter_conversions_mutations, /* remove= */ false);
bool inserted = current_mutations_by_version.try_emplace(version, std::move(entry)).second;
if (!inserted)
{
if (alter_conversions_mutations_updated)
{
--alter_conversions_mutations;
chassert(alter_conversions_mutations >= 0);
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", version);
}
LOG_INFO(log, "Added mutation: {}{}", mutation_id, additional_info);
}
@ -559,6 +568,8 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
entry.latest_fail_reason.clear();
if (static_cast<UInt64>(result_part->part_info.mutation) == it->first)
mutation_backoff_policy.removePartFromFailed(failed_part->name);
updateAlterConversionsMutations(it->second.commands, alter_conversions_mutations, /* remove= */ true);
}
}
else
@ -837,8 +848,20 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
auto it = current_mutations_by_version.find(mutation_version);
if (it != current_mutations_by_version.end())
{
bool mutation_finished = true;
if (std::optional<Int64> min_version = getMinPartDataVersion())
mutation_finished = *min_version > static_cast<Int64>(mutation_version);
to_kill.emplace(std::move(it->second));
current_mutations_by_version.erase(it);
if (!mutation_finished)
{
const auto commands = it->second.commands;
current_mutations_by_version.erase(it);
updateAlterConversionsMutations(commands, alter_conversions_mutations, /* remove= */ true);
}
else
current_mutations_by_version.erase(it);
}
}
@ -916,6 +939,7 @@ void StorageMergeTree::loadMutations()
auto inserted = current_mutations_by_version.try_emplace(block_number, std::move(entry)).second;
if (!inserted)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation {} already exists, it's a bug", block_number);
updateAlterConversionsMutations(entry.commands, alter_conversions_mutations, /* remove= */ false);
}
else if (startsWith(it->name(), "tmp_mutation_"))
{
@ -2409,6 +2433,10 @@ void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
MutationCommands StorageMergeTree::getAlterMutationCommandsForPart(const DataPartPtr & part) const
{
chassert(alter_conversions_mutations >= 0);
if (alter_conversions_mutations == 0)
return {};
std::lock_guard lock(currently_processing_in_background_mutex);
UInt64 part_data_version = part->info.getDataVersion();

View File

@ -147,6 +147,8 @@ private:
DataParts currently_merging_mutating_parts;
std::map<UInt64, MergeTreeMutationEntry> current_mutations_by_version;
/// Unfinished mutations that is required AlterConversions (see getAlterMutationCommandsForPart())
std::atomic<ssize_t> alter_conversions_mutations = 0;
std::atomic<bool> shutdown_called {false};
std::atomic<bool> flush_called {false};

View File

@ -32,4 +32,3 @@
<drop_query>drop table alter_select_{engine}</drop_query>
</test>