Merge pull request #54335 from arenadata/ADQM-1109

Added peak_threads_usage to query_log table
This commit is contained in:
Dmitry Novik 2023-09-12 16:03:27 +02:00 committed by GitHub
commit 7edc0a30a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 147 additions and 10 deletions

View File

@ -101,7 +101,8 @@ Columns:
- `ProfileEvents` ([Map(String, UInt64)](../../sql-reference/data-types/map.md)) — ProfileEvents that measure different metrics. The description of them could be found in the table [system.events](../../operations/system-tables/events.md#system_tables-events) - `ProfileEvents` ([Map(String, UInt64)](../../sql-reference/data-types/map.md)) — ProfileEvents that measure different metrics. The description of them could be found in the table [system.events](../../operations/system-tables/events.md#system_tables-events)
- `Settings` ([Map(String, String)](../../sql-reference/data-types/map.md)) — Settings that were changed when the client ran the query. To enable logging changes to settings, set the `log_query_settings` parameter to 1. - `Settings` ([Map(String, String)](../../sql-reference/data-types/map.md)) — Settings that were changed when the client ran the query. To enable logging changes to settings, set the `log_query_settings` parameter to 1.
- `log_comment` ([String](../../sql-reference/data-types/string.md)) — Log comment. It can be set to arbitrary string no longer than [max_query_size](../../operations/settings/settings.md#settings-max_query_size). An empty string if it is not defined. - `log_comment` ([String](../../sql-reference/data-types/string.md)) — Log comment. It can be set to arbitrary string no longer than [max_query_size](../../operations/settings/settings.md#settings-max_query_size). An empty string if it is not defined.
- `thread_ids` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — Thread ids that are participating in query execution. - `thread_ids` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — Thread ids that are participating in query execution. These threads may not have run simultaneously.
- `peak_threads_usage` ([UInt64)](../../sql-reference/data-types/int-uint.md)) — Maximum count of simultaneous threads executing the query.
- `used_aggregate_functions` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `aggregate functions`, which were used during query execution. - `used_aggregate_functions` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `aggregate functions`, which were used during query execution.
- `used_aggregate_function_combinators` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `aggregate functions combinators`, which were used during query execution. - `used_aggregate_function_combinators` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `aggregate functions combinators`, which were used during query execution.
- `used_database_engines` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `database engines`, which were used during query execution. - `used_database_engines` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `database engines`, which were used during query execution.

View File

@ -99,7 +99,8 @@ ClickHouse не удаляет данные из таблица автомати
- `ProfileEvents` ([Map(String, UInt64)](../../sql-reference/data-types/array.md)) — счетчики для изменения различных метрик. Описание метрик можно получить из таблицы [system.events](#system_tables-events)(#system_tables-events - `ProfileEvents` ([Map(String, UInt64)](../../sql-reference/data-types/array.md)) — счетчики для изменения различных метрик. Описание метрик можно получить из таблицы [system.events](#system_tables-events)(#system_tables-events
- `Settings` ([Map(String, String)](../../sql-reference/data-types/array.md)) — имена настроек, которые меняются, когда клиент выполняет запрос. Чтобы разрешить логирование изменений настроек, установите параметр `log_query_settings` равным 1. - `Settings` ([Map(String, String)](../../sql-reference/data-types/array.md)) — имена настроек, которые меняются, когда клиент выполняет запрос. Чтобы разрешить логирование изменений настроек, установите параметр `log_query_settings` равным 1.
- `log_comment` ([String](../../sql-reference/data-types/string.md)) — комментарий к записи в логе. Представляет собой произвольную строку, длина которой должна быть не больше, чем [max_query_size](../../operations/settings/settings.md#settings-max_query_size). Если нет комментария, то пустая строка. - `log_comment` ([String](../../sql-reference/data-types/string.md)) — комментарий к записи в логе. Представляет собой произвольную строку, длина которой должна быть не больше, чем [max_query_size](../../operations/settings/settings.md#settings-max_query_size). Если нет комментария, то пустая строка.
- `thread_ids` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — идентификаторы потоков, участвующих в обработке запросов. - `thread_ids` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — идентификаторы потоков, участвующих в обработке запросов, эти потоки не обязательно выполняются одновременно.
- `peak_threads_usage` ([UInt64)](../../sql-reference/data-types/int-uint.md)) — максимальное количество одновременно работавших потоков, участвоваших в обработке запроса.
- `used_aggregate_functions` ([Array(String)](../../sql-reference/data-types/array.md)) — канонические имена `агрегатных функций`, использованных при выполнении запроса. - `used_aggregate_functions` ([Array(String)](../../sql-reference/data-types/array.md)) — канонические имена `агрегатных функций`, использованных при выполнении запроса.
- `used_aggregate_function_combinators` ([Array(String)](../../sql-reference/data-types/array.md)) — канонические имена `комбинаторов агрегатных функций`, использованных при выполнении запроса. - `used_aggregate_function_combinators` ([Array(String)](../../sql-reference/data-types/array.md)) — канонические имена `комбинаторов агрегатных функций`, использованных при выполнении запроса.
- `used_database_engines` ([Array(String)](../../sql-reference/data-types/array.md)) — канонические имена `движков баз данных`, использованных при выполнении запроса. - `used_database_engines` ([Array(String)](../../sql-reference/data-types/array.md)) — канонические имена `движков баз данных`, использованных при выполнении запроса.

View File

@ -57,7 +57,7 @@ ConcurrencyControl::Allocation::Allocation(ConcurrencyControl & parent_, SlotCou
*waiter = this; *waiter = this;
} }
// Grant single slot to allocation, returns true iff more slot(s) are required // Grant single slot to allocation returns true iff more slot(s) are required
bool ConcurrencyControl::Allocation::grant() bool ConcurrencyControl::Allocation::grant()
{ {
std::unique_lock lock{mutex}; std::unique_lock lock{mutex};

View File

@ -107,15 +107,25 @@ public:
static ThreadGroupPtr createForBackgroundProcess(ContextPtr storage_context); static ThreadGroupPtr createForBackgroundProcess(ContextPtr storage_context);
std::vector<UInt64> getInvolvedThreadIds() const; std::vector<UInt64> getInvolvedThreadIds() const;
void linkThread(UInt64 thread_it); size_t getPeakThreadsUsage() const;
void linkThread(UInt64 thread_id);
void unlinkThread();
private: private:
mutable std::mutex mutex; mutable std::mutex mutex;
/// Set up at creation, no race when reading /// Set up at creation, no race when reading
SharedData shared_data; SharedData shared_data TSA_GUARDED_BY(mutex);
/// Set of all thread ids which has been attached to the group /// Set of all thread ids which has been attached to the group
std::unordered_set<UInt64> thread_ids; std::unordered_set<UInt64> thread_ids TSA_GUARDED_BY(mutex);
/// Count of simultaneously working threads
size_t active_thread_count TSA_GUARDED_BY(mutex) = 0;
/// Peak threads count in the group
size_t peak_threads_usage TSA_GUARDED_BY(mutex) = 0;
}; };
/** /**

View File

@ -590,8 +590,10 @@ QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_even
res.peak_memory_usage = thread_group->memory_tracker.getPeak(); res.peak_memory_usage = thread_group->memory_tracker.getPeak();
if (get_thread_list) if (get_thread_list)
{
res.thread_ids = thread_group->getInvolvedThreadIds(); res.thread_ids = thread_group->getInvolvedThreadIds();
res.peak_threads_usage = thread_group->getPeakThreadsUsage();
}
if (get_profile_events) if (get_profile_events)
res.profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_group->performance_counters.getPartiallyAtomicSnapshot()); res.profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(thread_group->performance_counters.getPartiallyAtomicSnapshot());
} }

View File

@ -67,6 +67,7 @@ struct QueryStatusInfo
/// Optional fields, filled by query /// Optional fields, filled by query
std::vector<UInt64> thread_ids; std::vector<UInt64> thread_ids;
size_t peak_threads_usage;
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters; std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters;
std::shared_ptr<Settings> query_settings; std::shared_ptr<Settings> query_settings;
std::string current_database; std::string current_database;

View File

@ -118,6 +118,7 @@ NamesAndTypesList QueryLogElement::getNamesAndTypes()
{"log_comment", std::make_shared<DataTypeString>()}, {"log_comment", std::make_shared<DataTypeString>()},
{"thread_ids", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>())}, {"thread_ids", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>())},
{"peak_threads_usage", std::make_shared<DataTypeUInt64>()},
{"ProfileEvents", std::make_shared<DataTypeMap>(low_cardinality_string, std::make_shared<DataTypeUInt64>())}, {"ProfileEvents", std::make_shared<DataTypeMap>(low_cardinality_string, std::make_shared<DataTypeUInt64>())},
{"Settings", std::make_shared<DataTypeMap>(low_cardinality_string, low_cardinality_string)}, {"Settings", std::make_shared<DataTypeMap>(low_cardinality_string, low_cardinality_string)},
@ -230,6 +231,8 @@ void QueryLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(threads_array); columns[i++]->insert(threads_array);
} }
columns[i++]->insert(peak_threads_usage);
if (profile_counters) if (profile_counters)
{ {
auto * column = columns[i++].get(); auto * column = columns[i++].get();

View File

@ -91,6 +91,7 @@ struct QueryLogElement
String log_comment; String log_comment;
std::vector<UInt64> thread_ids; std::vector<UInt64> thread_ids;
UInt64 peak_threads_usage = 0;
std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters; std::shared_ptr<ProfileEvents::Counters::Snapshot> profile_counters;
std::shared_ptr<AsyncReadCounters> async_read_counters; std::shared_ptr<AsyncReadCounters> async_read_counters;
std::shared_ptr<Settings> query_settings; std::shared_ptr<Settings> query_settings;

View File

@ -61,10 +61,27 @@ std::vector<UInt64> ThreadGroup::getInvolvedThreadIds() const
return res; return res;
} }
void ThreadGroup::linkThread(UInt64 thread_it) size_t ThreadGroup::getPeakThreadsUsage() const
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
thread_ids.insert(thread_it); return peak_threads_usage;
}
void ThreadGroup::linkThread(UInt64 thread_id)
{
std::lock_guard lock(mutex);
thread_ids.insert(thread_id);
++active_thread_count;
peak_threads_usage = std::max(peak_threads_usage, active_thread_count);
}
void ThreadGroup::unlinkThread()
{
std::lock_guard lock(mutex);
chassert(active_thread_count > 0);
--active_thread_count;
} }
ThreadGroupPtr ThreadGroup::createForQuery(ContextPtr query_context_, std::function<void()> fatal_error_callback_) ThreadGroupPtr ThreadGroup::createForQuery(ContextPtr query_context_, std::function<void()> fatal_error_callback_)
@ -243,6 +260,8 @@ void ThreadStatus::detachFromGroup()
/// Extract MemoryTracker out from query and user context /// Extract MemoryTracker out from query and user context
memory_tracker.setParent(&total_memory_tracker); memory_tracker.setParent(&total_memory_tracker);
thread_group->unlinkThread();
thread_group.reset(); thread_group.reset();
query_id_from_query_context.clear(); query_id_from_query_context.clear();

View File

@ -241,6 +241,7 @@ addStatusInfoToQueryLogElement(QueryLogElement & element, const QueryStatusInfo
element.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0; element.memory_usage = info.peak_memory_usage > 0 ? info.peak_memory_usage : 0;
element.thread_ids = info.thread_ids; element.thread_ids = info.thread_ids;
element.peak_threads_usage = info.peak_threads_usage;
element.profile_counters = info.profile_counters; element.profile_counters = info.profile_counters;
/// We need to refresh the access info since dependent views might have added extra information, either during /// We need to refresh the access info since dependent views might have added extra information, either during

View File

@ -40,7 +40,6 @@ bool MergePlainMergeTreeTask::executeStep()
if (merge_list_entry) if (merge_list_entry)
{ {
switcher.emplace((*merge_list_entry)->thread_group); switcher.emplace((*merge_list_entry)->thread_group);
} }
switch (state) switch (state)

View File

@ -0,0 +1,14 @@
1 2 1 1
2 2 1 1
3 2 1 1
4 2 1 1
5 4 1 1
6 6 1 1
7 2 1 1
8 2 1 1
9 2 1 1
10 6 1 1
11 6 1 1
12 6 1 1
13 2 1 1
14 2 1 1

View File

@ -0,0 +1,85 @@
#!/usr/bin/env bash
# Tags: no-parallel
# Tag no-parallel: Avoid using threads in other parallel queries.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
QUERY_OPTIONS=(
"--log_query_threads=1"
"--log_queries_min_type=QUERY_FINISH"
"--log_queries=1"
"--format=Null"
"--use_concurrency_control=0"
)
UNIQUE_QUERY_ID="02871_1_$$"
# TCPHandler and QueryPullPipeEx threads are always part of the query thread group, but those threads are not within the max_threads limit.
${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_1" --query='SELECT 1' "${QUERY_OPTIONS[@]}"
${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_2" --query='SELECT 1 SETTINGS max_threads = 1' "${QUERY_OPTIONS[@]}"
${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_3" --query='SELECT 1 SETTINGS max_threads = 8' "${QUERY_OPTIONS[@]}"
${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_4" --query='SELECT * FROM numbers_mt(500000) SETTINGS max_threads = 1' "${QUERY_OPTIONS[@]}"
${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_5" --query='SELECT * FROM numbers_mt(500000) SETTINGS max_threads = 2' "${QUERY_OPTIONS[@]}"
${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_6" --query='SELECT * FROM numbers_mt(500000) SETTINGS max_threads = 4' "${QUERY_OPTIONS[@]}"
${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_7" --query='SELECT * FROM numbers_mt(5000), numbers(5000) SETTINGS max_threads = 1, joined_subquery_requires_alias=0' "${QUERY_OPTIONS[@]}"
${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_8" --query='SELECT * FROM numbers_mt(5000), numbers(5000) SETTINGS max_threads = 4, joined_subquery_requires_alias=0' "${QUERY_OPTIONS[@]}"
${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_9" -mn --query="""
SELECT count() FROM
(SELECT number FROM numbers_mt(1,100000)
UNION ALL SELECT number FROM numbers_mt(10000, 200000)
UNION ALL SELECT number FROM numbers_mt(30000, 40000)
UNION ALL SELECT number FROM numbers_mt(30000, 40000)
UNION ALL SELECT number FROM numbers_mt(300000, 400000)
UNION ALL SELECT number FROM numbers_mt(300000, 400000)
UNION ALL SELECT number FROM numbers_mt(300000, 4000000)
UNION ALL SELECT number FROM numbers_mt(300000, 4000000)
) SETTINGS max_threads = 1""" "${QUERY_OPTIONS[@]}"
${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_10" -mn --query="""
SELECT count() FROM
(SELECT number FROM numbers_mt(1,100000)
UNION ALL SELECT number FROM numbers_mt(10000, 2000)
UNION ALL SELECT number FROM numbers_mt(30000, 40000)
UNION ALL SELECT number FROM numbers_mt(30000, 40)
UNION ALL SELECT number FROM numbers_mt(300000, 400)
UNION ALL SELECT number FROM numbers_mt(300000, 4000)
UNION ALL SELECT number FROM numbers_mt(300000, 40000)
UNION ALL SELECT number FROM numbers_mt(300000, 4000000)
) SETTINGS max_threads = 4""" "${QUERY_OPTIONS[@]}"
${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_11" -mn --query="""
SELECT count() FROM
(SELECT number FROM numbers_mt(1,100000)
UNION ALL SELECT number FROM numbers_mt(1, 1)
UNION ALL SELECT number FROM numbers_mt(1, 1)
UNION ALL SELECT number FROM numbers_mt(1, 1)
UNION ALL SELECT number FROM numbers_mt(1, 1)
UNION ALL SELECT number FROM numbers_mt(1, 1)
UNION ALL SELECT number FROM numbers_mt(1, 1)
UNION ALL SELECT number FROM numbers_mt(1, 4000000)
) SETTINGS max_threads = 4""" "${QUERY_OPTIONS[@]}"
${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_12" -mn --query="""
SELECT sum(number) FROM numbers_mt(100000)
GROUP BY number % 2
WITH TOTALS ORDER BY number % 2
SETTINGS max_threads = 4""" "${QUERY_OPTIONS[@]}"
${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_13" -mn --query="SELECT * FROM numbers(100000) SETTINGS max_threads = 1" "${QUERY_OPTIONS[@]}"
${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_14" -mn --query="SELECT * FROM numbers(100000) SETTINGS max_threads = 4" "${QUERY_OPTIONS[@]}"
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
for i in {1..14}
do
${CLICKHOUSE_CLIENT} -mn --query="""
SELECT '${i}',
peak_threads_usage,
(select count() from system.query_thread_log WHERE system.query_thread_log.query_id = '${UNIQUE_QUERY_ID}_${i}' AND current_database = currentDatabase()) = length(thread_ids),
length(thread_ids) >= peak_threads_usage
FROM system.query_log
WHERE type = 'QueryFinish' AND query_id = '${UNIQUE_QUERY_ID}_${i}' AND current_database = currentDatabase()"
done