Implement log_processors_profiles (write to processors_profile_log, OFF by default)

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2022-02-06 14:21:05 +03:00
parent 5fd402eaba
commit 765b4bc45a
3 changed files with 29 additions and 3 deletions

View File

@@ -1062,6 +1062,15 @@ Result:
└─────────────┴───────────┘
```
## log_processors_profiles {#settings-log_processors_profiles}
Write the time that each processor spent during execution/waiting for data to the `system.processors_profile_log` table.
See also:
- [`system.processors_profile_log`](../../operations/system-tables/processors_profile_log.md#system-processors_profile_log)
- [`EXPLAIN PIPELINE`](../../sql-reference/statements/explain.md#explain-pipeline)
## max_insert_block_size {#settings-max_insert_block_size}
The size of blocks (in a count of rows) to form for insertion into a table.

View File

@@ -195,6 +195,7 @@ class IColumn;
M(UInt64, log_queries_cut_to_length, 100000, "If query length is greater than specified threshold (in bytes), then cut query when writing to query log. Also limit length of printed query in ordinary text log.", 0) \
M(Float, log_queries_probability, 1., "Log queries with the specified probabality.", 0) \
\
M(Bool, log_processors_profiles, false, "Log Processors profile events.", 0) \
M(DistributedProductMode, distributed_product_mode, DistributedProductMode::DENY, "How are distributed subqueries performed inside IN or JOIN sections?", IMPORTANT) \
\
M(UInt64, max_concurrent_queries_for_all_users, 0, "The maximum number of concurrent requests for all users.", 0) \

View File

@@ -45,6 +45,7 @@
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/QueryLog.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/executeQuery.h>
@@ -811,6 +812,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
log_queries,
log_queries_min_type = settings.log_queries_min_type,
log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(),
log_processors_profiles = settings.log_processors_profiles,
status_info_to_query_log,
pulling_pipeline = pipeline.pulling()
]
@@ -866,10 +868,24 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (auto query_log = context->getQueryLog())
query_log->add(elem);
}
/// Dump per-processor timings to system.processors_profile_log when the
/// log_processors_profiles setting is enabled for this query.
/// NOTE(review): the original branch re-added `elem` to the query log here,
/// duplicating the query_log->add(elem) already performed by the preceding
/// `if (log_queries ...)` block — that second write is removed; this branch
/// must only touch the processors profile log.
if (log_processors_profiles)
{
    if (auto processors_profile_log = context->getProcessorsProfileLog())
    {
        ProcessorProfileLogElement processor_elem;
        /// All processors of one query share the same event time and query id;
        /// only the per-processor fields vary inside the loop.
        processor_elem.event_time = time_in_seconds(finish_time);
        processor_elem.event_time_microseconds = time_in_microseconds(finish_time);
        processor_elem.query_id = elem.client_info.current_query_id;
        for (const auto & processor : query_pipeline.getProcessors())
        {
            processor_elem.processor_name = processor->getName();
            processor_elem.elapsed_us = processor->getElapsedUs();
            processor_elem.need_data_elapsed_us = processor->getNeedDataElapsedUs();
            processor_elem.port_full_elapsed_us = processor->getPortFullElapsedUs();
            processors_profile_log->add(processor_elem);
        }
    }
}
if (auto opentelemetry_span_log = context->getOpenTelemetrySpanLog();