Merge pull request #34355 from azat/processors-profiling

Profiling on Processors level
This commit is contained in:
Nikolai Kochetov 2022-04-07 12:13:14 +02:00 committed by GitHub
commit 3e1b3f14c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 430 additions and 21 deletions

View File

@ -1062,6 +1062,15 @@ Result:
└─────────────┴───────────┘
```
## log_processors_profiles {#settings-log_processors_profiles}
Write time that processor spent during execution/waiting for data to `system.processors_profile_log` table.
See also:
- [`system.processors_profile_log`](../../operations/system-tables/processors_profile_log.md#system-processors_profile_log)
- [`EXPLAIN PIPELINE`](../../sql-reference/statements/explain.md#explain-pipeline)
## max_insert_block_size {#settings-max_insert_block_size}
The size of blocks (in a count of rows) to form for insertion into a table.

View File

@ -0,0 +1,75 @@
# system.processors_profile_log {#system-processors_profile_log}
This table contains profiling on processors level (that you can find in [`EXPLAIN PIPELINE`](../../sql-reference/statements/explain.md#explain-pipeline)).
Columns:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — The date when the event happened.
- `event_time` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — The date and time when the event happened.
- `id` ([UInt64](../../sql-reference/data-types/int-uint.md)) — ID of processor
- `parent_ids` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — Parent processors IDs
- `query_id` ([String](../../sql-reference/data-types/string.md)) — ID of the query
- `name` ([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md)) — Name of the processor.
- `elapsed_us` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of microseconds this processor was executed.
- `input_wait_elapsed_us` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of microseconds this processor was waiting for data (from other processor).
- `output_wait_elapsed_us` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of microseconds this processor was waiting because output port was full.
**Example**
Query:
``` sql
EXPLAIN PIPELINE
SELECT sleep(1)
┌─explain─────────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ (SettingQuotaAndLimits) │
│ (ReadFromStorage) │
│ SourceFromSingleChunk 0 → 1 │
└─────────────────────────────────┘
SELECT sleep(1)
SETTINGS log_processors_profiles = 1
Query id: feb5ed16-1c24-4227-aa54-78c02b3b27d4
┌─sleep(1)─┐
│ 0 │
└──────────┘
1 rows in set. Elapsed: 1.018 sec.
SELECT
name,
elapsed_us,
input_wait_elapsed_us,
output_wait_elapsed_us
FROM system.processors_profile_log
WHERE query_id = 'feb5ed16-1c24-4227-aa54-78c02b3b27d4'
ORDER BY name ASC
```
Result:
``` text
┌─name────────────────────┬─elapsed_us─┬─input_wait_elapsed_us─┬─output_wait_elapsed_us─┐
│ ExpressionTransform │ 1000497 │ 2823 │ 197 │
│ LazyOutputFormat │ 36 │ 1002188 │ 0 │
│ LimitsCheckingTransform │ 10 │ 1002994 │ 106 │
│ NullSource │ 5 │ 1002074 │ 0 │
│ NullSource │ 1 │ 1002084 │ 0 │
│ SourceFromSingleChunk │ 45 │ 4736 │ 1000819 │
└─────────────────────────┴────────────┴───────────────────────┴────────────────────────┘
```
Here you can see:
- `ExpressionTransform` was executing `sleep(1)` function, so it `work` will takes 1e6, and so `elapsed_us` > 1e6.
- `SourceFromSingleChunk` need to wait, because `ExpressionTransform` does not accept any data during execution of `sleep(1)`, so it will be in `PortFull` state for 1e6 us, and so `output_wait_elapsed_us` > 1e6.
- `LimitsCheckingTransform`/`NullSource`/`LazyOutputFormat` need to wait until `ExpressionTransform` will execute `sleep(1)` to process the result, so `input_wait_elapsed_us` > 1e6.
**See Also**
- [`EXPLAIN PIPELINE`](../../sql-reference/statements/explain.md#explain-pipeline)

View File

@ -1042,6 +1042,15 @@
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</session_log> -->
<!-- Profiling on Processors level. -->
<processors_profile_log>
<database>system</database>
<table>processors_profile_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
</processors_profile_log>
<!-- <top_level_domains_path>/var/lib/clickhouse/top_level_domains/</top_level_domains_path> -->
<!-- Custom TLD lists.
Format: <name>/path/to/file</name>

View File

@ -9,6 +9,7 @@
#include <Interpreters/SessionLog.h>
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Common/MemoryTrackerBlockerInThread.h>

View File

@ -24,6 +24,7 @@
M(SessionLogElement) \
M(TraceLogElement) \
M(ZooKeeperLogElement) \
M(ProcessorProfileLogElement) \
M(TextLogElement)
namespace Poco

View File

@ -195,6 +195,7 @@ class IColumn;
M(UInt64, log_queries_cut_to_length, 100000, "If query length is greater than specified threshold (in bytes), then cut query when writing to query log. Also limit length of printed query in ordinary text log.", 0) \
M(Float, log_queries_probability, 1., "Log queries with the specified probabality.", 0) \
\
M(Bool, log_processors_profiles, false, "Log Processors profile events.", 0) \
M(DistributedProductMode, distributed_product_mode, DistributedProductMode::DENY, "How are distributed subqueries performed inside IN or JOIN sections?", IMPORTANT) \
\
M(UInt64, max_concurrent_queries_for_all_users, 0, "The maximum number of concurrent requests for all users.", 0) \

View File

@ -2474,6 +2474,17 @@ std::shared_ptr<ZooKeeperLog> Context::getZooKeeperLog() const
}
std::shared_ptr<ProcessorsProfileLog> Context::getProcessorsProfileLog() const
{
auto lock = getLock();
if (!shared->system_logs)
return {};
return shared->system_logs->processors_profile_log;
}
CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double part_size_ratio) const
{
auto lock = getLock();

View File

@ -80,6 +80,7 @@ class AsynchronousMetricLog;
class OpenTelemetrySpanLog;
class ZooKeeperLog;
class SessionLog;
class ProcessorsProfileLog;
struct MergeTreeSettings;
class StorageS3Settings;
class IDatabase;
@ -800,6 +801,7 @@ public:
std::shared_ptr<OpenTelemetrySpanLog> getOpenTelemetrySpanLog() const;
std::shared_ptr<ZooKeeperLog> getZooKeeperLog() const;
std::shared_ptr<SessionLog> getSessionLog() const;
std::shared_ptr<ProcessorsProfileLog> getProcessorsProfileLog() const;
/// Returns an object used to log operations with parts if it possible.
/// Provide table name to make required checks.

View File

@ -29,6 +29,7 @@
#include <Interpreters/AsynchronousMetricLog.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Access/ContextAccess.h>
#include <Access/Common/AllowedClientHosts.h>
@ -443,7 +444,8 @@ BlockIO InterpreterSystemQuery::execute()
[&] { if (auto opentelemetry_span_log = getContext()->getOpenTelemetrySpanLog()) opentelemetry_span_log->flush(true); },
[&] { if (auto query_views_log = getContext()->getQueryViewsLog()) query_views_log->flush(true); },
[&] { if (auto zookeeper_log = getContext()->getZooKeeperLog()) zookeeper_log->flush(true); },
[&] { if (auto session_log = getContext()->getSessionLog()) session_log->flush(true); }
[&] { if (auto session_log = getContext()->getSessionLog()) session_log->flush(true); },
[&] { if (auto processors_profile_log = getContext()->getProcessorsProfileLog()) processors_profile_log->flush(true); }
);
break;
}

View File

@ -0,0 +1,70 @@
#include <Interpreters/ProcessorsProfileLog.h>
#include <Common/ClickHouseRevision.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeArray.h>
#include <base/logger_useful.h>
#include <array>
namespace DB
{
NamesAndTypesList ProcessorProfileLogElement::getNamesAndTypes()
{
return
{
{"event_date", std::make_shared<DataTypeDate>()},
{"event_time", std::make_shared<DataTypeDateTime>()},
{"event_time_microseconds", std::make_shared<DataTypeDateTime64>(6)},
{"id", std::make_shared<DataTypeUInt64>()},
{"parent_ids", std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt64>())},
{"query_id", std::make_shared<DataTypeString>()},
{"name", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"elapsed_us", std::make_shared<DataTypeUInt64>()},
{"input_wait_elapsed_us", std::make_shared<DataTypeUInt64>()},
{"output_wait_elapsed_us", std::make_shared<DataTypeUInt64>()},
};
}
void ProcessorProfileLogElement::appendToBlock(MutableColumns & columns) const
{
size_t i = 0;
columns[i++]->insert(DateLUT::instance().toDayNum(event_time).toUnderType());
columns[i++]->insert(event_time);
columns[i++]->insert(event_time_microseconds);
columns[i++]->insert(id);
{
Array parent_ids_array;
parent_ids_array.reserve(parent_ids.size());
for (const UInt64 parent : parent_ids)
parent_ids_array.emplace_back(parent);
columns[i++]->insert(parent_ids_array);
}
columns[i++]->insertData(query_id.data(), query_id.size());
columns[i++]->insertData(processor_name.data(), processor_name.size());
columns[i++]->insert(elapsed_us);
columns[i++]->insert(input_wait_elapsed_us);
columns[i++]->insert(output_wait_elapsed_us);
}
ProcessorsProfileLog::ProcessorsProfileLog(ContextPtr context_, const String & database_name_,
const String & table_name_, const String & storage_def_,
size_t flush_interval_milliseconds_)
: SystemLog<ProcessorProfileLogElement>(context_, database_name_, table_name_,
storage_def_, flush_interval_milliseconds_)
{
}
}

View File

@ -0,0 +1,46 @@
#pragma once
#include <Interpreters/SystemLog.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <Processors/IProcessor.h>
namespace DB
{
struct ProcessorProfileLogElement
{
time_t event_time{};
Decimal64 event_time_microseconds{};
UInt64 id;
std::vector<UInt64> parent_ids;
String query_id;
String processor_name;
/// Milliseconds spend in IProcessor::work()
UInt32 elapsed_us{};
/// IProcessor::NeedData
UInt32 input_wait_elapsed_us{};
/// IProcessor::PortFull
UInt32 output_wait_elapsed_us{};
static std::string name() { return "ProcessorsProfileLog"; }
static NamesAndTypesList getNamesAndTypes();
static NamesAndAliases getNamesAndAliases() { return {}; }
void appendToBlock(MutableColumns & columns) const;
};
class ProcessorsProfileLog : public SystemLog<ProcessorProfileLogElement>
{
public:
ProcessorsProfileLog(
ContextPtr context_,
const String & database_name_,
const String & table_name_,
const String & storage_def_,
size_t flush_interval_milliseconds_);
};
}

View File

@ -9,6 +9,7 @@
#include <Interpreters/SessionLog.h>
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/InterpreterRenameQuery.h>
@ -201,6 +202,7 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
query_views_log = createSystemLog<QueryViewsLog>(global_context, "system", "query_views_log", config, "query_views_log");
zookeeper_log = createSystemLog<ZooKeeperLog>(global_context, "system", "zookeeper_log", config, "zookeeper_log");
session_log = createSystemLog<SessionLog>(global_context, "system", "session_log", config, "session_log");
processors_profile_log = createSystemLog<ProcessorsProfileLog>(global_context, "system", "processors_profile_log", config, "processors_profile_log");
if (query_log)
logs.emplace_back(query_log.get());
@ -226,6 +228,8 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
logs.emplace_back(zookeeper_log.get());
if (session_log)
logs.emplace_back(session_log.get());
if (processors_profile_log)
logs.emplace_back(processors_profile_log.get());
try
{

View File

@ -43,6 +43,7 @@ class OpenTelemetrySpanLog;
class QueryViewsLog;
class ZooKeeperLog;
class SessionLog;
class ProcessorsProfileLog;
/// System logs should be destroyed in destructor of the last Context and before tables,
/// because SystemLog destruction makes insert query while flushing data into underlying tables
@ -70,6 +71,8 @@ struct SystemLogs
std::shared_ptr<ZooKeeperLog> zookeeper_log;
/// Login, LogOut and Login failure events
std::shared_ptr<SessionLog> session_log;
/// Used to log processors profiling
std::shared_ptr<ProcessorsProfileLog> processors_profile_log;
std::vector<ISystemLog *> logs;
};

View File

@ -45,6 +45,7 @@
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/QueryLog.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/executeQuery.h>
@ -811,6 +812,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
log_queries,
log_queries_min_type = settings.log_queries_min_type,
log_queries_min_query_duration_ms = settings.log_queries_min_query_duration_ms.totalMilliseconds(),
log_processors_profiles = settings.log_processors_profiles,
status_info_to_query_log,
pulling_pipeline = pipeline.pulling()
]
@ -866,6 +868,44 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (auto query_log = context->getQueryLog())
query_log->add(elem);
}
if (log_processors_profiles)
{
if (auto processors_profile_log = context->getProcessorsProfileLog())
{
ProcessorProfileLogElement processor_elem;
processor_elem.event_time = time_in_seconds(finish_time);
processor_elem.event_time_microseconds = time_in_microseconds(finish_time);
processor_elem.query_id = elem.client_info.current_query_id;
auto get_proc_id = [](const IProcessor & proc) -> UInt64
{
return reinterpret_cast<std::uintptr_t>(&proc);
};
for (const auto & processor : query_pipeline.getProcessors())
{
std::vector<UInt64> parents;
for (const auto & port : processor->getOutputs())
{
if (!port.isConnected())
continue;
const IProcessor & next = port.getInputPort().getProcessor();
parents.push_back(get_proc_id(next));
}
processor_elem.id = get_proc_id(*processor);
processor_elem.parent_ids = std::move(parents);
processor_elem.processor_name = processor->getName();
processor_elem.elapsed_us = processor->getElapsedUs();
processor_elem.input_wait_elapsed_us = processor->getInputWaitElapsedUs();
processor_elem.output_wait_elapsed_us = processor->getOutputWaitElapsedUs();
processors_profile_log->add(processor_elem);
}
}
}
if (auto opentelemetry_span_log = context->getOpenTelemetrySpanLog();
context->query_trace_context.trace_id != UUID()

View File

@ -10,7 +10,9 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
ExecutingGraph::ExecutingGraph(Processors & processors_) : processors(processors_)
ExecutingGraph::ExecutingGraph(Processors & processors_, bool profile_processors_)
: processors(processors_)
, profile_processors(profile_processors_)
{
uint64_t num_processors = processors.size();
nodes.reserve(num_processors);
@ -263,7 +265,33 @@ bool ExecutingGraph::updateNode(uint64_t pid, Queue & queue, Queue & async_queue
try
{
node.last_processor_status = node.processor->prepare(node.updated_input_ports, node.updated_output_ports);
auto & processor = *node.processor;
IProcessor::Status last_status = node.last_processor_status;
IProcessor::Status status = processor.prepare(node.updated_input_ports, node.updated_output_ports);
node.last_processor_status = status;
if (profile_processors)
{
/// NeedData
if (last_status != IProcessor::Status::NeedData && status == IProcessor::Status::NeedData)
{
processor.input_wait_watch.restart();
}
else if (last_status == IProcessor::Status::NeedData && status != IProcessor::Status::NeedData)
{
processor.input_wait_elapsed_us += processor.input_wait_watch.elapsedMicroseconds();
}
/// PortFull
if (last_status != IProcessor::Status::PortFull && status == IProcessor::Status::PortFull)
{
processor.output_wait_watch.restart();
}
else if (last_status == IProcessor::Status::PortFull && status != IProcessor::Status::PortFull)
{
processor.output_wait_elapsed_us += processor.output_wait_watch.elapsedMicroseconds();
}
}
}
catch (...)
{

View File

@ -123,7 +123,7 @@ public:
using ProcessorsMap = std::unordered_map<const IProcessor *, uint64_t>;
ProcessorsMap processors_map;
explicit ExecutingGraph(Processors & processors_);
explicit ExecutingGraph(Processors & processors_, bool profile_processors_);
const Processors & getProcessors() const { return processors; }
@ -153,6 +153,8 @@ private:
std::mutex processors_mutex;
UpgradableMutex nodes_mutex;
const bool profile_processors;
};
}

View File

@ -54,8 +54,13 @@ static void executeJob(IProcessor * processor)
bool ExecutionThreadContext::executeTask()
{
std::optional<Stopwatch> execution_time_watch;
#ifndef NDEBUG
Stopwatch execution_time_watch;
execution_time_watch.emplace();
#else
if (profile_processors)
execution_time_watch.emplace();
#endif
try
@ -69,8 +74,11 @@ bool ExecutionThreadContext::executeTask()
node->exception = std::current_exception();
}
if (profile_processors)
node->processor->elapsed_us += execution_time_watch->elapsedMicroseconds();
#ifndef NDEBUG
execution_time_ns += execution_time_watch.elapsed();
execution_time_ns += execution_time_watch->elapsed();
#endif
return node->exception == nullptr;

View File

@ -35,6 +35,7 @@ public:
#endif
const size_t thread_number;
const bool profile_processors;
void wait(std::atomic_bool & finished);
void wakeUp();
@ -55,7 +56,10 @@ public:
void setException(std::exception_ptr exception_) { exception = std::move(exception_); }
void rethrowExceptionIfHas();
explicit ExecutionThreadContext(size_t thread_number_) : thread_number(thread_number_) {}
explicit ExecutionThreadContext(size_t thread_number_, bool profile_processors_)
: thread_number(thread_number_)
, profile_processors(profile_processors_)
{}
};
}

View File

@ -137,7 +137,7 @@ void ExecutorTasks::pushTasks(Queue & queue, Queue & async_queue, ExecutionThrea
}
}
void ExecutorTasks::init(size_t num_threads_)
void ExecutorTasks::init(size_t num_threads_, bool profile_processors)
{
num_threads = num_threads_;
threads_queue.init(num_threads);
@ -148,7 +148,7 @@ void ExecutorTasks::init(size_t num_threads_)
executor_contexts.reserve(num_threads);
for (size_t i = 0; i < num_threads; ++i)
executor_contexts.emplace_back(std::make_unique<ExecutionThreadContext>(i));
executor_contexts.emplace_back(std::make_unique<ExecutionThreadContext>(i, profile_processors));
}
}

View File

@ -53,7 +53,7 @@ public:
void tryGetTask(ExecutionThreadContext & context);
void pushTasks(Queue & queue, Queue & async_queue, ExecutionThreadContext & context);
void init(size_t num_threads_);
void init(size_t num_threads_, bool profile_processors);
void fill(Queue & queue);
void processAsyncTasks();

View File

@ -8,6 +8,7 @@
#include <QueryPipeline/printPipeline.h>
#include <Processors/ISource.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/Context.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <base/scope_guard_safe.h>
@ -27,9 +28,12 @@ namespace ErrorCodes
PipelineExecutor::PipelineExecutor(Processors & processors, QueryStatus * elem)
: process_list_element(elem)
{
if (process_list_element)
profile_processors = process_list_element->getContext()->getSettingsRef().log_processors_profiles;
try
{
graph = std::make_unique<ExecutingGraph>(processors);
graph = std::make_unique<ExecutingGraph>(processors, profile_processors);
}
catch (Exception & exception)
{
@ -259,7 +263,7 @@ void PipelineExecutor::initializeExecution(size_t num_threads)
Queue queue;
graph->initializeExecution(queue);
tasks.init(num_threads);
tasks.init(num_threads, profile_processors);
tasks.fill(queue);
}

View File

@ -56,6 +56,8 @@ private:
/// Flag that checks that initializeExecution was called.
bool is_execution_initialized = false;
/// system.processors_profile_log
bool profile_processors = false;
std::atomic_bool cancelled = false;

View File

@ -15,16 +15,16 @@ namespace ErrorCodes
class PushingSource : public ISource
{
public:
explicit PushingSource(const Block & header, std::atomic_bool & need_data_flag_)
explicit PushingSource(const Block & header, std::atomic_bool & input_wait_flag_)
: ISource(header)
, need_data_flag(need_data_flag_)
, input_wait_flag(input_wait_flag_)
{}
String getName() const override { return "PushingSource"; }
void setData(Chunk chunk)
{
need_data_flag = false;
input_wait_flag = false;
data = std::move(chunk);
}
@ -34,7 +34,7 @@ protected:
{
auto status = ISource::prepare();
if (status == Status::Ready)
need_data_flag = true;
input_wait_flag = true;
return status;
}
@ -46,7 +46,7 @@ protected:
private:
Chunk data;
std::atomic_bool & need_data_flag;
std::atomic_bool & input_wait_flag;
};
@ -55,7 +55,7 @@ PushingPipelineExecutor::PushingPipelineExecutor(QueryPipeline & pipeline_) : pi
if (!pipeline.pushing())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline for PushingPipelineExecutor must be pushing");
pushing_source = std::make_shared<PushingSource>(pipeline.input->getHeader(), need_data_flag);
pushing_source = std::make_shared<PushingSource>(pipeline.input->getHeader(), input_wait_flag);
connect(pushing_source->getPort(), *pipeline.input);
pipeline.processors.emplace_back(pushing_source);
}
@ -86,7 +86,7 @@ void PushingPipelineExecutor::start()
started = true;
executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
if (!executor->executeStep(&need_data_flag))
if (!executor->executeStep(&input_wait_flag))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Pipeline for PushingPipelineExecutor was finished before all data was inserted");
}
@ -98,7 +98,7 @@ void PushingPipelineExecutor::push(Chunk chunk)
pushing_source->setData(std::move(chunk));
if (!executor->executeStep(&need_data_flag))
if (!executor->executeStep(&input_wait_flag))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Pipeline for PushingPipelineExecutor was finished before all data was inserted");
}

View File

@ -47,7 +47,7 @@ public:
private:
QueryPipeline & pipeline;
std::atomic_bool need_data_flag = false;
std::atomic_bool input_wait_flag = false;
std::shared_ptr<PushingSource> pushing_source;
PipelineExecutorPtr executor;

View File

@ -2,6 +2,7 @@
#include <memory>
#include <Processors/Port.h>
#include <Common/Stopwatch.h>
class EventCounter;
@ -299,14 +300,33 @@ public:
IQueryPlanStep * getQueryPlanStep() const { return query_plan_step; }
size_t getQueryPlanStepGroup() const { return query_plan_step_group; }
uint64_t getElapsedUs() const { return elapsed_us; }
uint64_t getInputWaitElapsedUs() const { return input_wait_elapsed_us; }
uint64_t getOutputWaitElapsedUs() const { return output_wait_elapsed_us; }
protected:
virtual void onCancel() {}
private:
/// For:
/// - elapsed_us
friend class ExecutionThreadContext;
/// For
/// - input_wait_elapsed_us
/// - output_wait_elapsed_us
friend class ExecutingGraph;
std::atomic<bool> is_cancelled{false};
std::string processor_description;
/// For processors_profile_log
uint64_t elapsed_us = 0;
Stopwatch input_wait_watch;
uint64_t input_wait_elapsed_us = 0;
Stopwatch output_wait_watch;
uint64_t output_wait_elapsed_us = 0;
size_t stream_number = NO_STREAM;
IQueryPlanStep * query_plan_step = nullptr;

View File

@ -18,6 +18,7 @@
<part_log remove="remove" />
<crash_log remove="remove" />
<opentelemetry_span_log remove="remove" />
<processors_profile_log remove="remove" />
<!-- just in case it will be enabled by default -->
<zookeeper_log remove="remove" />
</clickhouse>

View File

@ -0,0 +1,38 @@
-- { echo }
EXPLAIN PIPELINE SELECT sleep(1);
(Expression)
ExpressionTransform
(SettingQuotaAndLimits)
(ReadFromStorage)
SourceFromSingleChunk 0 → 1
SELECT sleep(1) SETTINGS log_processors_profiles=true, log_queries=1, log_queries_min_type='QUERY_FINISH';
0
SYSTEM FLUSH LOGS;
WITH
(
SELECT query_id
FROM system.query_log
WHERE current_database = currentDatabase() AND Settings['log_processors_profiles']='1'
) AS query_id_
SELECT
name,
multiIf(
-- ExpressionTransform executes sleep(),
-- so IProcessor::work() will spend 1 sec.
name = 'ExpressionTransform', elapsed_us>1e6,
-- SourceFromSingleChunk, that feed data to ExpressionTransform,
-- will feed first block and then wait in PortFull.
name = 'SourceFromSingleChunk', output_wait_elapsed_us>1e6,
-- NullSource/LazyOutputFormatLazyOutputFormat are the outputs
-- so they cannot starts to execute before sleep(1) will be executed.
input_wait_elapsed_us>1e6)
elapsed
FROM system.processors_profile_log
WHERE query_id = query_id_
ORDER BY name;
ExpressionTransform 1
LazyOutputFormat 1
LimitsCheckingTransform 1
NullSource 1
NullSource 1
SourceFromSingleChunk 1

View File

@ -0,0 +1,28 @@
-- { echo }
EXPLAIN PIPELINE SELECT sleep(1);
SELECT sleep(1) SETTINGS log_processors_profiles=true, log_queries=1, log_queries_min_type='QUERY_FINISH';
SYSTEM FLUSH LOGS;
WITH
(
SELECT query_id
FROM system.query_log
WHERE current_database = currentDatabase() AND Settings['log_processors_profiles']='1'
) AS query_id_
SELECT
name,
multiIf(
-- ExpressionTransform executes sleep(),
-- so IProcessor::work() will spend 1 sec.
name = 'ExpressionTransform', elapsed_us>1e6,
-- SourceFromSingleChunk, that feed data to ExpressionTransform,
-- will feed first block and then wait in PortFull.
name = 'SourceFromSingleChunk', output_wait_elapsed_us>1e6,
-- NullSource/LazyOutputFormatLazyOutputFormat are the outputs
-- so they cannot starts to execute before sleep(1) will be executed.
input_wait_elapsed_us>1e6)
elapsed
FROM system.processors_profile_log
WHERE query_id = query_id_
ORDER BY name;