This commit is contained in:
Nikita Taranov 2024-11-11 22:43:03 +01:00
parent d0405135a7
commit b05d3ed6df
6 changed files with 80 additions and 60 deletions

View File

@ -1,3 +1,4 @@
#include <Interpreters/ProcessorsProfileLog.h>
#include <Common/FieldVisitorToString.h>
#include <DataTypes/DataTypesNumber.h>
@ -676,6 +677,8 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
"tuple"});
}
}
logProcessorProfile(context, io.pipeline.getProcessors());
}
scalars_cache.emplace(node_with_hash, scalar_block);

View File

@ -5,9 +5,11 @@
#include <Core/Settings.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/addTypeConversionToAST.h>
#include <Interpreters/misc.h>
#include <Parsers/ASTExpressionList.h>
@ -19,9 +21,8 @@
#include <Parsers/ASTWithElement.h>
#include <Parsers/queryToString.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Common/ProfileEvents.h>
#include <Common/FieldVisitorToString.h>
#include <IO/WriteBufferFromString.h>
#include <Common/ProfileEvents.h>
namespace ProfileEvents
{
@ -246,6 +247,8 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
if (tmp_block.rows() != 0)
throw Exception(ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY, "Scalar subquery returned more than one row");
logProcessorProfile(data.getContext(), io.pipeline.getProcessors());
}
block = materializeBlock(block);

View File

@ -1,21 +1,22 @@
#include <chrono>
#include <variant>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/Set.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <IO/Operators.h>
#include <Common/logger_useful.h>
#include <Core/Block.h>
#include <Core/Settings.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <IO/Operators.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/Set.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/Sinks/EmptySink.h>
#include <Processors/Sinks/NullSink.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Core/Block.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/SizeLimits.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -239,6 +240,8 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context)
if (!set_and_key->set->isCreated())
return nullptr;
logProcessorProfile(context, pipeline.getProcessors());
return set_and_key->set;
}

View File

@ -1,5 +1,6 @@
#include <Interpreters/ProcessorsProfileLog.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
@ -8,16 +9,19 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <base/getFQDNOrHostName.h>
#include <Common/ClickHouseRevision.h>
#include <Common/DateLUTImpl.h>
#include <Common/logger_useful.h>
#include <array>
namespace DB
{
namespace Setting
{
extern const SettingsBool log_processors_profiles;
}
ColumnsDescription ProcessorProfileLogElement::getColumnsDescription()
{
return ColumnsDescription
@ -81,5 +85,57 @@ void ProcessorProfileLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(output_bytes);
}
void logProcessorProfile(ContextPtr context, const Processors & processors)
{
const Settings & settings = context->getSettingsRef();
if (settings[Setting::log_processors_profiles])
{
if (auto processors_profile_log = context->getProcessorsProfileLog())
{
ProcessorProfileLogElement processor_elem;
const auto time_now = std::chrono::system_clock::now();
processor_elem.event_time = timeInSeconds(time_now);
processor_elem.event_time_microseconds = timeInMicroseconds(time_now);
processor_elem.initial_query_id = context->getInitialQueryId();
processor_elem.query_id = context->getCurrentQueryId();
auto get_proc_id = [](const IProcessor & proc) -> UInt64 { return reinterpret_cast<std::uintptr_t>(&proc); };
for (const auto & processor : processors)
{
std::vector<UInt64> parents;
for (const auto & port : processor->getOutputs())
{
if (!port.isConnected())
continue;
const IProcessor & next = port.getInputPort().getProcessor();
parents.push_back(get_proc_id(next));
}
processor_elem.id = get_proc_id(*processor);
processor_elem.parent_ids = std::move(parents);
processor_elem.plan_step = reinterpret_cast<std::uintptr_t>(processor->getQueryPlanStep());
processor_elem.plan_step_name = processor->getPlanStepName();
processor_elem.plan_step_description = processor->getPlanStepDescription();
processor_elem.plan_group = processor->getQueryPlanStepGroup();
processor_elem.processor_name = processor->getName();
processor_elem.elapsed_us = static_cast<UInt64>(processor->getElapsedNs() / 1000U);
processor_elem.input_wait_elapsed_us = static_cast<UInt64>(processor->getInputWaitElapsedNs() / 1000U);
processor_elem.output_wait_elapsed_us = static_cast<UInt64>(processor->getOutputWaitElapsedNs() / 1000U);
auto stats = processor->getProcessorDataStats();
processor_elem.input_rows = stats.input_rows;
processor_elem.input_bytes = stats.input_bytes;
processor_elem.output_rows = stats.output_rows;
processor_elem.output_bytes = stats.output_bytes;
processors_profile_log->add(processor_elem);
}
}
}
}
}

View File

@ -50,4 +50,5 @@ public:
using SystemLog<ProcessorProfileLogElement>::SystemLog;
};
void logProcessorProfile(ContextPtr context, const Processors & processors);
}

View File

@ -117,7 +117,6 @@ namespace Setting
extern const SettingsOverflowMode join_overflow_mode;
extern const SettingsString log_comment;
extern const SettingsBool log_formatted_queries;
extern const SettingsBool log_processors_profiles;
extern const SettingsBool log_profile_events;
extern const SettingsUInt64 log_queries_cut_to_length;
extern const SettingsBool log_queries;
@ -551,53 +550,8 @@ void logQueryFinish(
if (auto query_log = context->getQueryLog())
query_log->add(elem);
}
if (settings[Setting::log_processors_profiles])
{
if (auto processors_profile_log = context->getProcessorsProfileLog())
{
ProcessorProfileLogElement processor_elem;
processor_elem.event_time = elem.event_time;
processor_elem.event_time_microseconds = elem.event_time_microseconds;
processor_elem.initial_query_id = elem.client_info.initial_query_id;
processor_elem.query_id = elem.client_info.current_query_id;
auto get_proc_id = [](const IProcessor & proc) -> UInt64 { return reinterpret_cast<std::uintptr_t>(&proc); };
for (const auto & processor : query_pipeline.getProcessors())
{
std::vector<UInt64> parents;
for (const auto & port : processor->getOutputs())
{
if (!port.isConnected())
continue;
const IProcessor & next = port.getInputPort().getProcessor();
parents.push_back(get_proc_id(next));
}
processor_elem.id = get_proc_id(*processor);
processor_elem.parent_ids = std::move(parents);
processor_elem.plan_step = reinterpret_cast<std::uintptr_t>(processor->getQueryPlanStep());
processor_elem.plan_step_name = processor->getPlanStepName();
processor_elem.plan_step_description = processor->getPlanStepDescription();
processor_elem.plan_group = processor->getQueryPlanStepGroup();
processor_elem.processor_name = processor->getName();
processor_elem.elapsed_us = static_cast<UInt64>(processor->getElapsedNs() / 1000U);
processor_elem.input_wait_elapsed_us = static_cast<UInt64>(processor->getInputWaitElapsedNs() / 1000U);
processor_elem.output_wait_elapsed_us = static_cast<UInt64>(processor->getOutputWaitElapsedNs() / 1000U);
auto stats = processor->getProcessorDataStats();
processor_elem.input_rows = stats.input_rows;
processor_elem.input_bytes = stats.input_bytes;
processor_elem.output_rows = stats.output_rows;
processor_elem.output_bytes = stats.output_bytes;
processors_profile_log->add(processor_elem);
}
}
}
logProcessorProfile(context, query_pipeline.getProcessors());
logQueryMetricLogFinish(context, internal, elem.client_info.current_query_id, time_now, std::make_shared<QueryStatusInfo>(info));
}