This commit is contained in:
ttanay 2024-11-20 16:26:10 -08:00 committed by GitHub
commit e0c4b07b54
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 158 additions and 13 deletions

View File

@ -23,6 +23,7 @@
#include <Parsers/ASTSetQuery.h>
#include <Storages/StorageView.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
@ -238,6 +239,19 @@ struct QueryPipelineSettings
std::unordered_map<std::string, std::reference_wrapper<Int64>> integer_settings;
};
struct ExecutionAnalysisSettings
{
bool graph = true;
constexpr static char name[] = "ANALYZE";
std::unordered_map<std::string, std::reference_wrapper<bool>> boolean_settings = {
{"graph", graph},
};
std::unordered_map<std::string, std::reference_wrapper<Int64>> integer_settings;
};
template <typename Settings>
struct ExplainSettings : public Settings
{
@ -618,6 +632,45 @@ QueryPipeline InterpreterExplainQuery::executeImpl()
break;
}
case ASTExplainQuery::ExecutionAnalysis: {
if (dynamic_cast<const ASTSelectWithUnionQuery *>(ast.getExplainedQuery().get()))
{
auto settings = checkAndGetSettings<ExecutionAnalysisSettings>(ast.getSettings());
BlockIO res;
//Build Query Plan
if (getContext()->getSettingsRef()[Setting::allow_experimental_analyzer])
{
InterpreterSelectQueryAnalyzer interpreter(ast.getExplainedQuery(), getContext(), options);
res = interpreter.execute();
}
else
{
InterpreterSelectWithUnionQuery interpreter(ast.getExplainedQuery(), getContext(), options);
res = interpreter.execute();
}
auto & pipeline = res.pipeline;
const auto & processors = pipeline.getProcessors();
PullingPipelineExecutor pulling_executor(pipeline, true);
while (true)
{
Block block;
if (!pulling_executor.pull(block))
break;
}
if (settings.graph)
{
printExecutionAnalysis(processors, buf);
}
else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Text mode is not supported yet");
}
else
throw Exception(ErrorCodes::INCORRECT_QUERY, "Only SELECT is supposed for EXPLAIN ANALYZE query");
}
}
buf.finalize();
if (insert_buf)

View File

@ -8,7 +8,7 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int BAD_ARGUMENTS;
}
/// AST, EXPLAIN or other query with meaning of explanation query instead of execution
@ -25,20 +25,31 @@ public:
QueryEstimates, /// 'EXPLAIN ESTIMATE ...'
TableOverride, /// 'EXPLAIN TABLE OVERRIDE ...'
CurrentTransaction, /// 'EXPLAIN CURRENT TRANSACTION'
ExecutionAnalysis, /// 'EXPLAIN ANALYZE'
};
static String toString(ExplainKind kind)
{
switch (kind)
{
case ParsedAST: return "EXPLAIN AST";
case AnalyzedSyntax: return "EXPLAIN SYNTAX";
case QueryTree: return "EXPLAIN QUERY TREE";
case QueryPlan: return "EXPLAIN";
case QueryPipeline: return "EXPLAIN PIPELINE";
case QueryEstimates: return "EXPLAIN ESTIMATE";
case TableOverride: return "EXPLAIN TABLE OVERRIDE";
case CurrentTransaction: return "EXPLAIN CURRENT TRANSACTION";
case ParsedAST:
return "EXPLAIN AST";
case AnalyzedSyntax:
return "EXPLAIN SYNTAX";
case QueryTree:
return "EXPLAIN QUERY TREE";
case QueryPlan:
return "EXPLAIN";
case QueryPipeline:
return "EXPLAIN PIPELINE";
case QueryEstimates:
return "EXPLAIN ESTIMATE";
case TableOverride:
return "EXPLAIN TABLE OVERRIDE";
case CurrentTransaction:
return "EXPLAIN CURRENT TRANSACTION";
case ExecutionAnalysis:
return "EXPLAIN ANALYZE";
}
}
@ -60,11 +71,13 @@ public:
return TableOverride;
if (str == "EXPLAIN CURRENT TRANSACTION")
return CurrentTransaction;
if (str == "EXPLAIN ANALYZE")
return ExecutionAnalysis;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown explain kind '{}'", str);
}
explicit ASTExplainQuery(ExplainKind kind_) : kind(kind_) {}
explicit ASTExplainQuery(ExplainKind kind_) : kind(kind_) { }
String getID(char delim) const override { return "Explain" + (delim + toString(kind)); }
ExplainKind getKind() const { return kind; }

View File

@ -35,6 +35,7 @@ namespace DB
MR_MACROS(ALTER_USER, "ALTER USER") \
MR_MACROS(ALTER, "ALTER") \
MR_MACROS(AND_STDOUT, "AND STDOUT") \
MR_MACROS(ANALYZE, "ANALYZE") \
MR_MACROS(AND, "AND") \
MR_MACROS(ANTI, "ANTI") \
MR_MACROS(ANY, "ANY") \

View File

@ -25,6 +25,7 @@ bool ParserExplainQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_estimates(Keyword::ESTIMATE);
ParserKeyword s_table_override(Keyword::TABLE_OVERRIDE);
ParserKeyword s_current_transaction(Keyword::CURRENT_TRANSACTION);
ParserKeyword s_analyze(Keyword::ANALYZE);
if (s_explain.ignore(pos, expected))
{
@ -46,6 +47,8 @@ bool ParserExplainQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
kind = ASTExplainQuery::ExplainKind::TableOverride;
else if (s_current_transaction.ignore(pos, expected))
kind = ASTExplainQuery::ExplainKind::CurrentTransaction;
else if (s_analyze.ignore(pos, expected))
kind = ASTExplainQuery::ExplainKind::ExecutionAnalysis;
}
else
return false;

View File

@ -72,6 +72,12 @@ PipelineExecutor::PipelineExecutor(std::shared_ptr<Processors> & processors, Que
}
}
PipelineExecutor::PipelineExecutor(std::shared_ptr<Processors> & processors, QueryStatusPtr elem, bool profile_processors_)
: PipelineExecutor(processors, elem)
{
profile_processors = profile_processors || profile_processors_;
}
PipelineExecutor::~PipelineExecutor()
{
if (process_list_element)

View File

@ -35,6 +35,7 @@ public:
///
/// Explicit graph representation is built in constructor. Throws if graph is not correct.
explicit PipelineExecutor(std::shared_ptr<Processors> & processors, QueryStatusPtr elem);
explicit PipelineExecutor(std::shared_ptr<Processors> & processors, QueryStatusPtr elem, bool profile_processors_);
~PipelineExecutor();
/// Execute pipeline in multiple threads. Must be called once.
@ -88,7 +89,8 @@ private:
/// Flag that checks that initializeExecution was called.
bool is_execution_initialized = false;
/// system.processors_profile_log
/// Whether execution statistics need to br written to
/// system.processors_profile_log or reported as part of `EXPLAIN ANALYZE`
bool profile_processors = false;
/// system.opentelemetry_span_log
bool trace_processors = false;

View File

@ -23,6 +23,11 @@ PullingPipelineExecutor::PullingPipelineExecutor(QueryPipeline & pipeline_) : pi
pipeline.complete(pulling_format);
}
PullingPipelineExecutor::PullingPipelineExecutor(QueryPipeline & pipeline_, bool profile_processors_): PullingPipelineExecutor(pipeline_)
{
profile_processors = profile_processors_;
}
PullingPipelineExecutor::~PullingPipelineExecutor()
{
try
@ -44,7 +49,7 @@ bool PullingPipelineExecutor::pull(Chunk & chunk)
{
if (!executor)
{
executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element);
executor = std::make_shared<PipelineExecutor>(pipeline.processors, pipeline.process_list_element, profile_processors);
executor->setReadProgressCallback(pipeline.getReadProgressCallback());
}

View File

@ -24,6 +24,7 @@ class PullingPipelineExecutor
{
public:
explicit PullingPipelineExecutor(QueryPipeline & pipeline_);
explicit PullingPipelineExecutor(QueryPipeline & pipeline_, bool profile_processors_);
~PullingPipelineExecutor();
/// Get structure of returned block or chunk.
@ -53,6 +54,10 @@ private:
QueryPipeline & pipeline;
std::shared_ptr<PullingOutputFormat> pulling_format;
PipelineExecutorPtr executor;
// Used for EXPLAIN ANALYZE
// TODO: Refactor to make it applicable to all Pipeline types
bool profile_processors = false;
};
}

View File

@ -174,4 +174,59 @@ void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool
out << "}\n";
}
void printExecutionAnalysis(const Processors & processors, WriteBuffer & out)
{
out << "digraph\n";
out << "{\n";
out << " rankdir=\"LR\";\n";
out << " { node [shape=rect margin=0]\n";
std::unordered_map<const void *, std::size_t> pointer_to_id;
auto get_proc_id = [&](const IProcessor & proc) -> std::size_t
{
auto [it, inserted] = pointer_to_id.try_emplace(&proc, pointer_to_id.size());
return it->second;
};
for (const auto & processor : processors)
{
const auto & description = processor->getDescription();
out << " n" << get_proc_id(*processor) << "[label=<\n";
out << " <table border=\"0\" cellborder=\"1\" cellspacing=\"0\">\n";
out << " <tr><td>" << processor->getName() << (description.empty() ? "" : ":") << description << "</td></tr>\n";
out << " <tr>\n";
out << " <td>\n";
out << " <table border=\"0\" cellborder=\"0\" cellspacing=\"0\">\n";
out << " <tr><td>Input wait time(us): " << processor->getInputWaitElapsedNs() / 1000U << "</td></tr>\n";
out << " <tr><td>Execution time(us): " << processor->getElapsedNs() / 1000U << "</td></tr>\n";
out << " <tr><td>Output wait time(us): " << processor->getOutputWaitElapsedNs() / 1000U
<< "</td></tr>\n";
out << " </table>\n";
out << " </td>\n";
out << " </tr>\n";
out << " </table>\n";
out << " >];\n";
}
out << " }\n";
/// Edges
for (const auto & processor : processors)
{
for (const auto & port : processor->getOutputs())
{
if (!port.isConnected())
continue;
const IProcessor & curr = *processor;
const IProcessor & next = port.getInputPort().getProcessor();
const auto curr_stats = curr.getProcessorDataStats();
out << " n" << get_proc_id(curr) << " -> n" << get_proc_id(next) << " [label=\"" << curr_stats.output_rows << " rows\n "
<< curr_stats.output_bytes << " bytes\"];\n";
}
}
out << "}\n";
}
}

View File

@ -65,10 +65,12 @@ void printPipeline(const Processors & processors, WriteBuffer & out)
{
printPipeline(processors, std::vector<IProcessor::Status>(), out);
}
/// Prints pipeline in compact representation.
/// Group processors by it's name, QueryPlanStep and QueryPlanStepGroup.
/// If QueryPlanStep wasn't set for processor, representation may be not correct.
/// If with_header is set, prints block header for each edge.
void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool with_header);
// Prints the execution analysis in graph Format
void printExecutionAnalysis(const Processors & processors, WriteBuffer & out);
}