diff --git a/docs/en/operations/system-tables/query_log.md b/docs/en/operations/system-tables/query_log.md index 7143520835f..ced97166702 100644 --- a/docs/en/operations/system-tables/query_log.md +++ b/docs/en/operations/system-tables/query_log.md @@ -101,7 +101,8 @@ Columns: - `ProfileEvents` ([Map(String, UInt64)](../../sql-reference/data-types/map.md)) — ProfileEvents that measure different metrics. The description of them could be found in the table [system.events](../../operations/system-tables/events.md#system_tables-events) - `Settings` ([Map(String, String)](../../sql-reference/data-types/map.md)) — Settings that were changed when the client ran the query. To enable logging changes to settings, set the `log_query_settings` parameter to 1. - `log_comment` ([String](../../sql-reference/data-types/string.md)) — Log comment. It can be set to arbitrary string no longer than [max_query_size](../../operations/settings/settings.md#settings-max_query_size). An empty string if it is not defined. -- `thread_ids` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — Thread ids that are participating in query execution. +- `thread_ids` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — Thread ids that are participating in query execution. These threads may not have run simultaneously. +- `peak_threads_usage` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Maximum count of simultaneous threads executing the query. - `used_aggregate_functions` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `aggregate functions`, which were used during query execution. - `used_aggregate_function_combinators` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `aggregate functions combinators`, which were used during query execution. - `used_database_engines` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `database engines`, which were used during query execution. 
diff --git a/docs/ru/operations/system-tables/query_log.md b/docs/ru/operations/system-tables/query_log.md index 29793188d3d..8d79aa0eef7 100644 --- a/docs/ru/operations/system-tables/query_log.md +++ b/docs/ru/operations/system-tables/query_log.md @@ -99,7 +99,8 @@ ClickHouse не удаляет данные из таблица автомати - `ProfileEvents` ([Map(String, UInt64)](../../sql-reference/data-types/array.md)) — счетчики для изменения различных метрик. Описание метрик можно получить из таблицы [system.events](#system_tables-events)(#system_tables-events - `Settings` ([Map(String, String)](../../sql-reference/data-types/array.md)) — имена настроек, которые меняются, когда клиент выполняет запрос. Чтобы разрешить логирование изменений настроек, установите параметр `log_query_settings` равным 1. - `log_comment` ([String](../../sql-reference/data-types/string.md)) — комментарий к записи в логе. Представляет собой произвольную строку, длина которой должна быть не больше, чем [max_query_size](../../operations/settings/settings.md#settings-max_query_size). Если нет комментария, то пустая строка. -- `thread_ids` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — идентификаторы потоков, участвующих в обработке запросов. +- `thread_ids` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — идентификаторы потоков, участвующих в обработке запросов. Эти потоки не обязательно выполнялись одновременно. +- `peak_threads_usage` ([UInt64](../../sql-reference/data-types/int-uint.md)) — максимальное количество одновременно работавших потоков, участвовавших в обработке запроса. - `used_aggregate_functions` ([Array(String)](../../sql-reference/data-types/array.md)) — канонические имена `агрегатных функций`, использованных при выполнении запроса. - `used_aggregate_function_combinators` ([Array(String)](../../sql-reference/data-types/array.md)) — канонические имена `комбинаторов агрегатных функций`, использованных при выполнении запроса. 
- `used_database_engines` ([Array(String)](../../sql-reference/data-types/array.md)) — канонические имена `движков баз данных`, использованных при выполнении запроса. diff --git a/src/Common/ConcurrencyControl.cpp b/src/Common/ConcurrencyControl.cpp index de46f0e90ca..c9fe51550dc 100644 --- a/src/Common/ConcurrencyControl.cpp +++ b/src/Common/ConcurrencyControl.cpp @@ -57,7 +57,7 @@ ConcurrencyControl::Allocation::Allocation(ConcurrencyControl & parent_, SlotCou *waiter = this; } -// Grant single slot to allocation, returns true iff more slot(s) are required +// Grant single slot to allocation returns true iff more slot(s) are required bool ConcurrencyControl::Allocation::grant() { std::unique_lock lock{mutex}; diff --git a/src/Common/ThreadStatus.h b/src/Common/ThreadStatus.h index 7c8dbdb68bd..970bb5cd6e6 100644 --- a/src/Common/ThreadStatus.h +++ b/src/Common/ThreadStatus.h @@ -107,15 +107,25 @@ public: static ThreadGroupPtr createForBackgroundProcess(ContextPtr storage_context); std::vector getInvolvedThreadIds() const; - void linkThread(UInt64 thread_it); + size_t getPeakThreadsUsage() const; + + void linkThread(UInt64 thread_id); + void unlinkThread(); private: mutable std::mutex mutex; /// Set up at creation, no race when reading - SharedData shared_data; + SharedData shared_data TSA_GUARDED_BY(mutex); + /// Set of all thread ids which has been attached to the group - std::unordered_set thread_ids; + std::unordered_set thread_ids TSA_GUARDED_BY(mutex); + + /// Count of simultaneously working threads + size_t active_thread_count TSA_GUARDED_BY(mutex) = 0; + + /// Peak threads count in the group + size_t peak_threads_usage TSA_GUARDED_BY(mutex) = 0; }; /** diff --git a/src/Interpreters/ProcessList.cpp b/src/Interpreters/ProcessList.cpp index c299572ef41..8572470abc1 100644 --- a/src/Interpreters/ProcessList.cpp +++ b/src/Interpreters/ProcessList.cpp @@ -590,8 +590,10 @@ QueryStatusInfo QueryStatus::getInfo(bool get_thread_list, bool get_profile_even 
res.peak_memory_usage = thread_group->memory_tracker.getPeak(); if (get_thread_list) + { res.thread_ids = thread_group->getInvolvedThreadIds(); - + res.peak_threads_usage = thread_group->getPeakThreadsUsage(); + } if (get_profile_events) res.profile_counters = std::make_shared(thread_group->performance_counters.getPartiallyAtomicSnapshot()); } diff --git a/src/Interpreters/ProcessList.h b/src/Interpreters/ProcessList.h index 2eea49e1267..75a0eaa34bc 100644 --- a/src/Interpreters/ProcessList.h +++ b/src/Interpreters/ProcessList.h @@ -67,6 +67,7 @@ struct QueryStatusInfo /// Optional fields, filled by query std::vector thread_ids; + size_t peak_threads_usage; std::shared_ptr profile_counters; std::shared_ptr query_settings; std::string current_database; diff --git a/src/Interpreters/QueryLog.cpp b/src/Interpreters/QueryLog.cpp index df21e82305a..57f3968fba1 100644 --- a/src/Interpreters/QueryLog.cpp +++ b/src/Interpreters/QueryLog.cpp @@ -118,6 +118,7 @@ NamesAndTypesList QueryLogElement::getNamesAndTypes() {"log_comment", std::make_shared()}, {"thread_ids", std::make_shared(std::make_shared())}, + {"peak_threads_usage", std::make_shared()}, {"ProfileEvents", std::make_shared(low_cardinality_string, std::make_shared())}, {"Settings", std::make_shared(low_cardinality_string, low_cardinality_string)}, @@ -230,6 +231,8 @@ void QueryLogElement::appendToBlock(MutableColumns & columns) const columns[i++]->insert(threads_array); } + columns[i++]->insert(peak_threads_usage); + if (profile_counters) { auto * column = columns[i++].get(); diff --git a/src/Interpreters/QueryLog.h b/src/Interpreters/QueryLog.h index 5bc80280eac..fe9b7cbdbc8 100644 --- a/src/Interpreters/QueryLog.h +++ b/src/Interpreters/QueryLog.h @@ -91,6 +91,7 @@ struct QueryLogElement String log_comment; std::vector thread_ids; + UInt64 peak_threads_usage = 0; std::shared_ptr profile_counters; std::shared_ptr async_read_counters; std::shared_ptr query_settings; diff --git a/src/Interpreters/ThreadStatusExt.cpp 
b/src/Interpreters/ThreadStatusExt.cpp index 6ee8ec987db..efb8c6792bc 100644 --- a/src/Interpreters/ThreadStatusExt.cpp +++ b/src/Interpreters/ThreadStatusExt.cpp @@ -61,10 +61,27 @@ std::vector ThreadGroup::getInvolvedThreadIds() const return res; } -void ThreadGroup::linkThread(UInt64 thread_it) +size_t ThreadGroup::getPeakThreadsUsage() const { std::lock_guard lock(mutex); - thread_ids.insert(thread_it); + return peak_threads_usage; +} + + +void ThreadGroup::linkThread(UInt64 thread_id) +{ + std::lock_guard lock(mutex); + thread_ids.insert(thread_id); + + ++active_thread_count; + peak_threads_usage = std::max(peak_threads_usage, active_thread_count); +} + +void ThreadGroup::unlinkThread() +{ + std::lock_guard lock(mutex); + chassert(active_thread_count > 0); + --active_thread_count; } ThreadGroupPtr ThreadGroup::createForQuery(ContextPtr query_context_, std::function fatal_error_callback_) @@ -243,6 +260,8 @@ void ThreadStatus::detachFromGroup() /// Extract MemoryTracker out from query and user context memory_tracker.setParent(&total_memory_tracker); + thread_group->unlinkThread(); + thread_group.reset(); query_id_from_query_context.clear(); diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index bac6807b682..1bfeeaa8ad4 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -241,6 +241,7 @@ addStatusInfoToQueryLogElement(QueryLogElement & element, const QueryStatusInfo element.memory_usage = info.peak_memory_usage > 0 ? 
info.peak_memory_usage : 0; element.thread_ids = info.thread_ids; + element.peak_threads_usage = info.peak_threads_usage; element.profile_counters = info.profile_counters; /// We need to refresh the access info since dependent views might have added extra information, either during diff --git a/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp b/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp index 3f5753a0c95..c218acce903 100644 --- a/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp +++ b/src/Storages/MergeTree/MergePlainMergeTreeTask.cpp @@ -40,7 +40,6 @@ bool MergePlainMergeTreeTask::executeStep() if (merge_list_entry) { switcher.emplace((*merge_list_entry)->thread_group); - } switch (state) diff --git a/tests/queries/0_stateless/02871_peak_threads_usage.reference b/tests/queries/0_stateless/02871_peak_threads_usage.reference new file mode 100644 index 00000000000..d772a2c4b4e --- /dev/null +++ b/tests/queries/0_stateless/02871_peak_threads_usage.reference @@ -0,0 +1,14 @@ +1 2 1 1 +2 2 1 1 +3 2 1 1 +4 2 1 1 +5 4 1 1 +6 6 1 1 +7 2 1 1 +8 2 1 1 +9 2 1 1 +10 6 1 1 +11 6 1 1 +12 6 1 1 +13 2 1 1 +14 2 1 1 diff --git a/tests/queries/0_stateless/02871_peak_threads_usage.sh b/tests/queries/0_stateless/02871_peak_threads_usage.sh new file mode 100755 index 00000000000..dfb3e665020 --- /dev/null +++ b/tests/queries/0_stateless/02871_peak_threads_usage.sh @@ -0,0 +1,85 @@ +#!/usr/bin/env bash +# Tags: no-parallel +# Tag no-parallel: Avoid using threads in other parallel queries. + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +QUERY_OPTIONS=( + "--log_query_threads=1" + "--log_queries_min_type=QUERY_FINISH" + "--log_queries=1" + "--format=Null" + "--use_concurrency_control=0" +) + +UNIQUE_QUERY_ID="02871_1_$$" + +# TCPHandler and QueryPullPipeEx threads are always part of the query thread group, but those threads are not within the max_threads limit. 
+${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_1" --query='SELECT 1' "${QUERY_OPTIONS[@]}" +${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_2" --query='SELECT 1 SETTINGS max_threads = 1' "${QUERY_OPTIONS[@]}" +${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_3" --query='SELECT 1 SETTINGS max_threads = 8' "${QUERY_OPTIONS[@]}" +${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_4" --query='SELECT * FROM numbers_mt(500000) SETTINGS max_threads = 1' "${QUERY_OPTIONS[@]}" +${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_5" --query='SELECT * FROM numbers_mt(500000) SETTINGS max_threads = 2' "${QUERY_OPTIONS[@]}" +${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_6" --query='SELECT * FROM numbers_mt(500000) SETTINGS max_threads = 4' "${QUERY_OPTIONS[@]}" +${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_7" --query='SELECT * FROM numbers_mt(5000), numbers(5000) SETTINGS max_threads = 1, joined_subquery_requires_alias=0' "${QUERY_OPTIONS[@]}" +${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_8" --query='SELECT * FROM numbers_mt(5000), numbers(5000) SETTINGS max_threads = 4, joined_subquery_requires_alias=0' "${QUERY_OPTIONS[@]}" + +${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_9" -mn --query=""" +SELECT count() FROM + (SELECT number FROM numbers_mt(1,100000) + UNION ALL SELECT number FROM numbers_mt(10000, 200000) + UNION ALL SELECT number FROM numbers_mt(30000, 40000) + UNION ALL SELECT number FROM numbers_mt(30000, 40000) + UNION ALL SELECT number FROM numbers_mt(300000, 400000) + UNION ALL SELECT number FROM numbers_mt(300000, 400000) + UNION ALL SELECT number FROM numbers_mt(300000, 4000000) + UNION ALL SELECT number FROM numbers_mt(300000, 4000000) + ) SETTINGS max_threads = 1""" "${QUERY_OPTIONS[@]}" + +${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_10" -mn --query=""" +SELECT count() FROM + (SELECT number FROM numbers_mt(1,100000) + UNION ALL SELECT number FROM numbers_mt(10000, 2000) + UNION ALL SELECT number FROM 
numbers_mt(30000, 40000) + UNION ALL SELECT number FROM numbers_mt(30000, 40) + UNION ALL SELECT number FROM numbers_mt(300000, 400) + UNION ALL SELECT number FROM numbers_mt(300000, 4000) + UNION ALL SELECT number FROM numbers_mt(300000, 40000) + UNION ALL SELECT number FROM numbers_mt(300000, 4000000) + ) SETTINGS max_threads = 4""" "${QUERY_OPTIONS[@]}" + +${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_11" -mn --query=""" +SELECT count() FROM + (SELECT number FROM numbers_mt(1,100000) + UNION ALL SELECT number FROM numbers_mt(1, 1) + UNION ALL SELECT number FROM numbers_mt(1, 1) + UNION ALL SELECT number FROM numbers_mt(1, 1) + UNION ALL SELECT number FROM numbers_mt(1, 1) + UNION ALL SELECT number FROM numbers_mt(1, 1) + UNION ALL SELECT number FROM numbers_mt(1, 1) + UNION ALL SELECT number FROM numbers_mt(1, 4000000) + ) SETTINGS max_threads = 4""" "${QUERY_OPTIONS[@]}" + +${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_12" -mn --query=""" +SELECT sum(number) FROM numbers_mt(100000) +GROUP BY number % 2 +WITH TOTALS ORDER BY number % 2 +SETTINGS max_threads = 4""" "${QUERY_OPTIONS[@]}" + +${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_13" -mn --query="SELECT * FROM numbers(100000) SETTINGS max_threads = 1" "${QUERY_OPTIONS[@]}" + +${CLICKHOUSE_CLIENT} --query_id="${UNIQUE_QUERY_ID}_14" -mn --query="SELECT * FROM numbers(100000) SETTINGS max_threads = 4" "${QUERY_OPTIONS[@]}" + +${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS" +for i in {1..14} +do + ${CLICKHOUSE_CLIENT} -mn --query=""" + SELECT '${i}', + peak_threads_usage, + (select count() from system.query_thread_log WHERE system.query_thread_log.query_id = '${UNIQUE_QUERY_ID}_${i}' AND current_database = currentDatabase()) = length(thread_ids), + length(thread_ids) >= peak_threads_usage + FROM system.query_log + WHERE type = 'QueryFinish' AND query_id = '${UNIQUE_QUERY_ID}_${i}' AND current_database = currentDatabase()" +done