diff --git a/src/Analyzer/Resolve/QueryAnalyzer.cpp b/src/Analyzer/Resolve/QueryAnalyzer.cpp index 390418494e7..03ebd893c47 100644 --- a/src/Analyzer/Resolve/QueryAnalyzer.cpp +++ b/src/Analyzer/Resolve/QueryAnalyzer.cpp @@ -1,3 +1,4 @@ +#include #include #include @@ -676,6 +677,8 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden "tuple"}); } } + + logProcessorProfile(context, io.pipeline.getProcessors()); } scalars_cache.emplace(node_with_hash, scalar_block); diff --git a/src/Interpreters/ExecuteScalarSubqueriesVisitor.cpp b/src/Interpreters/ExecuteScalarSubqueriesVisitor.cpp index d4da038c089..9ae2ffc208d 100644 --- a/src/Interpreters/ExecuteScalarSubqueriesVisitor.cpp +++ b/src/Interpreters/ExecuteScalarSubqueriesVisitor.cpp @@ -5,9 +5,11 @@ #include #include #include +#include #include #include #include +#include #include #include #include @@ -19,9 +21,8 @@ #include #include #include -#include #include -#include +#include namespace ProfileEvents { @@ -246,6 +247,8 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr if (tmp_block.rows() != 0) throw Exception(ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY, "Scalar subquery returned more than one row"); + + logProcessorProfile(data.getContext(), io.pipeline.getProcessors()); } block = materializeBlock(block); diff --git a/src/Interpreters/PreparedSets.cpp b/src/Interpreters/PreparedSets.cpp index 538108165fb..c69e2f84d42 100644 --- a/src/Interpreters/PreparedSets.cpp +++ b/src/Interpreters/PreparedSets.cpp @@ -1,21 +1,22 @@ #include #include -#include -#include -#include -#include -#include +#include #include -#include +#include +#include +#include +#include +#include #include #include +#include #include +#include #include #include -#include -#include #include #include +#include namespace DB { @@ -239,6 +240,8 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context) if (!set_and_key->set->isCreated()) return nullptr; + logProcessorProfile(context, pipeline.getProcessors()); + return set_and_key->set; } diff --git a/src/Interpreters/ProcessorsProfileLog.cpp b/src/Interpreters/ProcessorsProfileLog.cpp index 8a646b5d0e7..d7811e5e9e2 100644 --- a/src/Interpreters/ProcessorsProfileLog.cpp +++ b/src/Interpreters/ProcessorsProfileLog.cpp @@ -1,5 +1,6 @@ #include +#include #include #include #include @@ -8,16 +9,19 @@ #include #include #include +#include #include #include #include -#include - -#include namespace DB { +namespace Setting +{ +extern const SettingsBool log_processors_profiles; +} + ColumnsDescription ProcessorProfileLogElement::getColumnsDescription() { return ColumnsDescription @@ -81,5 +85,57 @@ void ProcessorProfileLogElement::appendToBlock(MutableColumns & columns) const columns[i++]->insert(output_bytes); } +void logProcessorProfile(ContextPtr context, const Processors & processors) +{ + const Settings & settings = context->getSettingsRef(); + if (settings[Setting::log_processors_profiles]) + { + if (auto processors_profile_log = context->getProcessorsProfileLog()) + { + ProcessorProfileLogElement processor_elem; + const auto time_now = std::chrono::system_clock::now(); + processor_elem.event_time = timeInSeconds(time_now); + processor_elem.event_time_microseconds = timeInMicroseconds(time_now); + processor_elem.initial_query_id = context->getInitialQueryId(); + processor_elem.query_id = context->getCurrentQueryId(); + + auto get_proc_id = [](const IProcessor & proc) -> UInt64 { return reinterpret_cast(&proc); }; + + for (const auto & processor : processors) + { + std::vector parents; + for (const auto & port : processor->getOutputs()) + { + if (!port.isConnected()) + continue; + const IProcessor & next = port.getInputPort().getProcessor(); + parents.push_back(get_proc_id(next)); + } + + processor_elem.id = get_proc_id(*processor); + processor_elem.parent_ids = std::move(parents); + + processor_elem.plan_step = reinterpret_cast(processor->getQueryPlanStep()); + processor_elem.plan_step_name = processor->getPlanStepName(); + processor_elem.plan_step_description = processor->getPlanStepDescription(); + processor_elem.plan_group = processor->getQueryPlanStepGroup(); + + processor_elem.processor_name = processor->getName(); + + processor_elem.elapsed_us = static_cast(processor->getElapsedNs() / 1000U); + processor_elem.input_wait_elapsed_us = static_cast(processor->getInputWaitElapsedNs() / 1000U); + processor_elem.output_wait_elapsed_us = static_cast(processor->getOutputWaitElapsedNs() / 1000U); + + auto stats = processor->getProcessorDataStats(); + processor_elem.input_rows = stats.input_rows; + processor_elem.input_bytes = stats.input_bytes; + processor_elem.output_rows = stats.output_rows; + processor_elem.output_bytes = stats.output_bytes; + + processors_profile_log->add(processor_elem); + } + } + } +} } diff --git a/src/Interpreters/ProcessorsProfileLog.h b/src/Interpreters/ProcessorsProfileLog.h index fbf52f45f56..9cc2ab6c7f0 100644 --- a/src/Interpreters/ProcessorsProfileLog.h +++ b/src/Interpreters/ProcessorsProfileLog.h @@ -50,4 +50,5 @@ public: using SystemLog::SystemLog; }; +void logProcessorProfile(ContextPtr context, const Processors & processors); } diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index 9250c069283..fa28fa04ab1 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -117,7 +117,6 @@ namespace Setting extern const SettingsOverflowMode join_overflow_mode; extern const SettingsString log_comment; extern const SettingsBool log_formatted_queries; - extern const SettingsBool log_processors_profiles; extern const SettingsBool log_profile_events; extern const SettingsUInt64 log_queries_cut_to_length; extern const SettingsBool log_queries; @@ -551,53 +550,8 @@ void logQueryFinish( if (auto query_log = context->getQueryLog()) query_log->add(elem); } - if (settings[Setting::log_processors_profiles]) - { - if (auto processors_profile_log = context->getProcessorsProfileLog()) - { - ProcessorProfileLogElement processor_elem; - processor_elem.event_time = elem.event_time; - processor_elem.event_time_microseconds = elem.event_time_microseconds; - processor_elem.initial_query_id = elem.client_info.initial_query_id; - processor_elem.query_id = elem.client_info.current_query_id; - auto get_proc_id = [](const IProcessor & proc) -> UInt64 { return reinterpret_cast(&proc); }; - - for (const auto & processor : query_pipeline.getProcessors()) - { - std::vector parents; - for (const auto & port : processor->getOutputs()) - { - if (!port.isConnected()) - continue; - const IProcessor & next = port.getInputPort().getProcessor(); - parents.push_back(get_proc_id(next)); - } - - processor_elem.id = get_proc_id(*processor); - processor_elem.parent_ids = std::move(parents); - - processor_elem.plan_step = reinterpret_cast(processor->getQueryPlanStep()); - processor_elem.plan_step_name = processor->getPlanStepName(); - processor_elem.plan_step_description = processor->getPlanStepDescription(); - processor_elem.plan_group = processor->getQueryPlanStepGroup(); - - processor_elem.processor_name = processor->getName(); - - processor_elem.elapsed_us = static_cast(processor->getElapsedNs() / 1000U); - processor_elem.input_wait_elapsed_us = static_cast(processor->getInputWaitElapsedNs() / 1000U); - processor_elem.output_wait_elapsed_us = static_cast(processor->getOutputWaitElapsedNs() / 1000U); - - auto stats = processor->getProcessorDataStats(); - processor_elem.input_rows = stats.input_rows; - processor_elem.input_bytes = stats.input_bytes; - processor_elem.output_rows = stats.output_rows; - processor_elem.output_bytes = stats.output_bytes; - - processors_profile_log->add(processor_elem); - } - } - } + logProcessorProfile(context, query_pipeline.getProcessors()); logQueryMetricLogFinish(context, internal, elem.client_info.current_query_id, time_now, std::make_shared(info)); }