Improve performance of SELECTs with active mutations (#59531)

* Configure keeper for perf tests

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>

* Improve performance of SELECTs with active mutations

getAlterMutationCommandsForPart() can be a hot path for query execution
when there are pending mutations.

- Remove LOG_TEST - it does not check just one bool, but actually a
  bunch of atomics as well.

- Return std::vector instead of std::map (the map is not required
  there) - no change in performance.

- Copy only RENAME_COLUMN (since only this mutation is required by
  AlterConversions).

And here are results:

run|result
-|-
SELECT w/o ALTER|queries: 1565, QPS: 355.259, RPS: 355.259
SELECT w/ ALTER unpatched|queries: 2099, QPS: 220.623, RPS: 220.623
SELECT w/ ALTER and w/o LOG_TEST|queries: 2730, QPS: 235.859, RPS: 235.859
SELECT w/ ALTER and w/o LOG_TEST and w/ RENAME_COLUMN only|queries: 2995, QPS: 290.982, RPS: 290.982

But there is still room for improvement, at least MergeTree engines
could implement getStorageSnapshotForQuery().

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>

* Add AlterConversions::supportsMutationCommandType(), flatten vector<vector<MutationCommand>>

* Work around what appears to be a clang static analysis bug

---------

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
Co-authored-by: Michael Kolupaev <michael.kolupaev@clickhouse.com>
This commit is contained in:
Azat Khuzhin 2024-02-22 09:51:10 +01:00 committed by GitHub
parent 1b238d1180
commit a4f765cae7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 90 additions and 31 deletions

View File

@ -9,6 +9,11 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
/// Tells whether a mutation command of type `t` is one that AlterConversions handles.
/// Only RENAME_COLUMN is accepted; every other type yields false.
bool AlterConversions::supportsMutationCommandType(MutationCommand::Type t)
{
    const bool is_rename_column = (MutationCommand::Type::RENAME_COLUMN == t);
    return is_rename_column;
}
void AlterConversions::addMutationCommand(const MutationCommand & command)
{
/// Currently only RENAME_COLUMN is applied on-fly.

View File

@ -35,6 +35,8 @@ public:
/// Get column old name before rename (lookup by key in rename_map)
std::string getColumnOldName(const std::string & new_name) const;
static bool supportsMutationCommandType(MutationCommand::Type);
private:
/// Rename map new_name -> old_name.
std::vector<RenamePair> rename_map;

View File

@ -7950,11 +7950,10 @@ bool MergeTreeData::canUsePolymorphicParts(const MergeTreeSettings & settings, S
AlterConversionsPtr MergeTreeData::getAlterConversionsForPart(MergeTreeDataPartPtr part) const
{
auto commands_map = getAlterMutationCommandsForPart(part);
auto commands = getAlterMutationCommandsForPart(part);
auto result = std::make_shared<AlterConversions>();
for (const auto & [_, commands] : commands_map)
for (const auto & command : commands)
for (const auto & command : commands | std::views::reverse)
result->addMutationCommand(command);
return result;

View File

@ -1356,11 +1356,12 @@ protected:
/// mechanisms for parts locking
virtual bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const = 0;
/// Return most recent mutations commands for part which weren't applied
/// Used to receive AlterConversions for part and apply them on fly. This
/// method has different implementations for replicated and non replicated
/// MergeTree because they store mutations in different way.
virtual std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const DataPartPtr & part) const = 0;
/// Return pending mutations that weren't applied to `part` yet and should be applied on the fly
/// (i.e. when reading from the part). Mutations not supported by AlterConversions
/// (supportsMutationCommandType()) can be omitted.
///
/// @return list of mutations, in *reverse* order (newest to oldest)
virtual MutationCommands getAlterMutationCommandsForPart(const DataPartPtr & part) const = 0;
struct PartBackupEntries
{

View File

@ -1789,7 +1789,7 @@ ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zk
}
std::map<int64_t, MutationCommands> ReplicatedMergeTreeQueue::getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const
MutationCommands ReplicatedMergeTreeQueue::getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const
{
std::unique_lock lock(state_mutex);
@ -1799,9 +1799,8 @@ std::map<int64_t, MutationCommands> ReplicatedMergeTreeQueue::getAlterMutationCo
Int64 part_data_version = part->info.getDataVersion();
Int64 part_metadata_version = part->getMetadataVersion();
LOG_TEST(log, "Looking for mutations for part {} (part data version {}, part metadata version {})", part->name, part_data_version, part_metadata_version);
std::map<int64_t, MutationCommands> result;
MutationCommands result;
bool seen_all_data_mutations = false;
bool seen_all_metadata_mutations = false;
@ -1814,7 +1813,15 @@ std::map<int64_t, MutationCommands> ReplicatedMergeTreeQueue::getAlterMutationCo
if (seen_all_data_mutations && seen_all_metadata_mutations)
break;
auto alter_version = mutation_status->entry->alter_version;
auto & entry = mutation_status->entry;
auto add_to_result = [&] {
for (const auto & command : entry->commands | std::views::reverse)
if (AlterConversions::supportsMutationCommandType(command.type))
result.emplace_back(command);
};
auto alter_version = entry->alter_version;
if (alter_version != -1)
{
if (alter_version > storage.getInMemoryMetadataPtr()->getMetadataVersion())
@ -1822,22 +1829,19 @@ std::map<int64_t, MutationCommands> ReplicatedMergeTreeQueue::getAlterMutationCo
/// We take commands with bigger metadata version
if (alter_version > part_metadata_version)
result[mutation_version] = mutation_status->entry->commands;
add_to_result();
else
seen_all_metadata_mutations = true;
}
else
{
if (mutation_version > part_data_version)
result[mutation_version] = mutation_status->entry->commands;
add_to_result();
else
seen_all_data_mutations = true;
}
}
LOG_TEST(log, "Got {} commands for part {} (part data version {}, part metadata version {})",
result.size(), part->name, part_data_version, part_metadata_version);
return result;
}

View File

@ -401,7 +401,7 @@ public:
/// Return mutation commands that may not have been applied to the part yet,
/// judging by the part's mutation version. Used when we apply alter commands on the fly,
/// without actual data modification on disk.
std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const;
MutationCommands getAlterMutationCommandsForPart(const MergeTreeData::DataPartPtr & part) const;
/// Mark finished mutations as done. If the function needs to be called again at some later time
/// (because some mutations are probably done but we are not sure yet), returns true.

View File

@ -2394,19 +2394,21 @@ void StorageMergeTree::attachRestoredParts(MutableDataPartsVector && parts)
}
std::map<int64_t, MutationCommands> StorageMergeTree::getAlterMutationCommandsForPart(const DataPartPtr & part) const
MutationCommands StorageMergeTree::getAlterMutationCommandsForPart(const DataPartPtr & part) const
{
std::lock_guard lock(currently_processing_in_background_mutex);
UInt64 part_data_version = part->info.getDataVersion();
std::map<int64_t, MutationCommands> result;
MutationCommands result;
for (const auto & [mutation_version, entry] : current_mutations_by_version | std::views::reverse)
{
if (mutation_version > part_data_version)
result[mutation_version] = entry.commands;
else
if (mutation_version <= part_data_version)
break;
for (const auto & command : entry.commands | std::views::reverse)
if (AlterConversions::supportsMutationCommandType(command.type))
result.emplace_back(command);
}
return result;

View File

@ -308,7 +308,7 @@ private:
};
protected:
std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const DataPartPtr & part) const override;
MutationCommands getAlterMutationCommandsForPart(const DataPartPtr & part) const override;
};
}

View File

@ -8957,7 +8957,7 @@ bool StorageReplicatedMergeTree::canUseAdaptiveGranularity() const
}
std::map<int64_t, MutationCommands> StorageReplicatedMergeTree::getAlterMutationCommandsForPart(const DataPartPtr & part) const
MutationCommands StorageReplicatedMergeTree::getAlterMutationCommandsForPart(const DataPartPtr & part) const
{
return queue.getAlterMutationCommandsForPart(part);
}

View File

@ -938,7 +938,7 @@ private:
void waitMutationToFinishOnReplicas(
const Strings & replicas, const String & mutation_id) const;
std::map<int64_t, MutationCommands> getAlterMutationCommandsForPart(const DataPartPtr & part) const override;
MutationCommands getAlterMutationCommandsForPart(const DataPartPtr & part) const override;
void startBackgroundMovesIfNeeded() override;

View File

@ -0,0 +1,35 @@
<test>
<!-- Performance test: SELECT from a table with many parts after an ALTER was submitted with alter_sync = 0. -->
<!-- The `engine` substitution makes every {engine} query run once for `mt` (MergeTree) and once for `rmt` (ReplicatedMergeTree). -->
<substitutions>
<substitution>
<name>engine</name>
<values>
<value>mt</value>
<value>rmt</value>
</values>
</substitution>
</substitutions>
<!-- Both tables are always created; only the one matching {engine} is filled, queried and dropped below. -->
<!-- NOTE(review): the doubled braces in the rmt path are presumably an escape so that {database} and {table} are not treated as test substitutions. TODO confirm. -->
<create_query>create table alter_select_mt (part_id String, col_0 String) engine=MergeTree() partition by part_id order by tuple() settings max_parts_to_merge_at_once=1</create_query>
<create_query>create table alter_select_rmt (part_id String, col_0 String) engine=ReplicatedMergeTree('/tables/{{database}}', '{{table}}') partition by part_id order by tuple() settings max_parts_to_merge_at_once=1</create_query>
<!-- NOTE(review): merges are stopped before the fill, presumably so the inserted parts stay unmerged and the ALTER below stays pending. TODO confirm. -->
<create_query>system stop merges alter_select_{engine}</create_query>
<!-- Inserts 10000 rows spread over 5000 distinct part_id values (the partition key), using one-row blocks. -->
<!-- parts_to_delay_insert and parts_to_throw_insert are raised to 100000 for this insert. -->
<fill_query>
insert into alter_select_{engine} (part_id, col_0)
select toString(number % 5000), 0 from numbers(10000)
settings
max_block_size=1,
max_insert_threads=32,
min_insert_block_size_rows=1,
insert_deduplicate=false,
parts_to_delay_insert=100000,
parts_to_throw_insert=100000
</fill_query>
<!-- NOTE(review): alter_sync = 0 presumably returns without waiting for the ALTER to be applied to the parts. TODO confirm. -->
<fill_query>alter table alter_select_{engine} drop column col_0 settings alter_sync = 0</fill_query>
<!-- Measured queries, each limited to a single thread. -->
<query>select count() from alter_select_{engine} format Null settings max_threads=1</query>
<query>select * from alter_select_{engine} format Null settings max_threads=1</query>
<drop_query>drop table alter_select_{engine}</drop_query>
</test>

View File

@ -11,8 +11,14 @@ script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
# upstream/master
LEFT_SERVER_PORT=9001
LEFT_SERVER_KEEPER_PORT=9181
LEFT_SERVER_KEEPER_RAFT_PORT=9234
LEFT_SERVER_INTERSERVER_PORT=9009
# patched version
RIGHT_SERVER_PORT=9002
RIGHT_SERVER_PORT=19001
RIGHT_SERVER_KEEPER_PORT=19181
RIGHT_SERVER_KEEPER_RAFT_PORT=19234
RIGHT_SERVER_INTERSERVER_PORT=19009
# abort_conf -- abort if some option is not recognized
# abort -- abort if something is not right in the env (i.e. per-cpu arenas do not work)
@ -127,6 +133,10 @@ function restart
--user_files_path left/db/user_files
--top_level_domains_path "$(left_or_right left top_level_domains)"
--tcp_port $LEFT_SERVER_PORT
--keeper_server.tcp_port $LEFT_SERVER_KEEPER_PORT
--keeper_server.raft_configuration.server.port $LEFT_SERVER_KEEPER_RAFT_PORT
--zookeeper.node.port $LEFT_SERVER_KEEPER_PORT
--interserver_http_port $LEFT_SERVER_INTERSERVER_PORT
)
left/clickhouse-server "${left_server_opts[@]}" &>> left-server-log.log &
left_pid=$!
@ -142,6 +152,10 @@ function restart
--user_files_path right/db/user_files
--top_level_domains_path "$(left_or_right right top_level_domains)"
--tcp_port $RIGHT_SERVER_PORT
--keeper_server.tcp_port $RIGHT_SERVER_KEEPER_PORT
--keeper_server.raft_configuration.server.port $RIGHT_SERVER_KEEPER_RAFT_PORT
--zookeeper.node.port $RIGHT_SERVER_KEEPER_PORT
--interserver_http_port $RIGHT_SERVER_INTERSERVER_PORT
)
right/clickhouse-server "${right_server_opts[@]}" &>> right-server-log.log &
right_pid=$!

View File

@ -2,10 +2,7 @@
<http_port remove="remove"/>
<mysql_port remove="remove"/>
<postgresql_port remove="remove"/>
<interserver_http_port remove="remove"/>
<tcp_with_proxy_port remove="remove"/>
<keeper_server remove="remove"/>
<zookeeper remove="remove"/>
<listen_host>::</listen_host>
<logger>