fixup! Add support for EXPLAIN ANALYZE in graph mode

Tanay Tummalapalli 2024-10-15 23:40:39 +05:30
parent 29eb8281cb
commit 88eeba57f8
4 changed files with 25 additions and 17 deletions

View File

@@ -245,8 +245,7 @@ struct ExecutionAnalysisSettings
     constexpr static char name[] = "ANALYZE";
-    std::unordered_map<std::string, std::reference_wrapper<bool>> boolean_settings =
-    {
+    std::unordered_map<std::string, std::reference_wrapper<bool>> boolean_settings = {
         {"graph", graph},
     };
@@ -631,9 +630,9 @@ QueryPipeline InterpreterExplainQuery::executeImpl()
             break;
         }
-        case ASTExplainQuery::ExecutionAnalysis:
+        case ASTExplainQuery::ExecutionAnalysis: {
+            if (dynamic_cast<const ASTSelectWithUnionQuery *>(ast.getExplainedQuery().get()))
             {
-            if(dynamic_cast<const ASTSelectWithUnionQuery *>(ast.getExplainedQuery().get())) {
                 auto settings = checkAndGetSettings<ExecutionAnalysisSettings>(ast.getSettings());
                 BlockIO res;
@@ -643,7 +642,8 @@ QueryPipeline InterpreterExplainQuery::executeImpl()
                 InterpreterSelectQueryAnalyzer interpreter(ast.getExplainedQuery(), getContext(), options);
                 res = interpreter.execute();
             }
-            else {
+            else
+            {
                 InterpreterSelectWithUnionQuery interpreter(ast.getExplainedQuery(), getContext(), options);
                 res = interpreter.execute();
             }
@@ -652,13 +652,15 @@ QueryPipeline InterpreterExplainQuery::executeImpl()
             const auto & processors = pipeline.getProcessors();
             PullingPipelineExecutor pulling_executor(pipeline, true);
-            while(true) {
+            while (true)
+            {
                 Block block;
                 if (!pulling_executor.pull(block))
                     break;
             }
-            if(settings.graph) {
+            if (settings.graph)
+            {
                 printExecutionAnalysis(processors, buf);
             }
             else

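The ExecutionAnalysisSettings hunk above relies on a map from setting names to std::reference_wrapper<bool>, so a parsed boolean setting such as graph writes straight into the struct's flag. A minimal standalone sketch of that pattern, assuming only the standard library; ExampleAnalysisSettings and applyBooleanSetting are illustrative names, not ClickHouse code:

#include <functional>
#include <iostream>
#include <stdexcept>
#include <string>
#include <unordered_map>

struct ExampleAnalysisSettings
{
    bool graph = false;

    /// Each entry aliases a member above, so writing through the map updates the struct.
    std::unordered_map<std::string, std::reference_wrapper<bool>> boolean_settings = {
        {"graph", graph},
    };
};

/// Hypothetical helper: apply one "name = value" pair to whichever flag it names.
void applyBooleanSetting(ExampleAnalysisSettings & settings, const std::string & name, bool value)
{
    auto it = settings.boolean_settings.find(name);
    if (it == settings.boolean_settings.end())
        throw std::invalid_argument("Unknown setting: " + name);
    it->second.get() = value;
}

int main()
{
    ExampleAnalysisSettings settings;
    applyBooleanSetting(settings, "graph", true);
    std::cout << "graph = " << settings.graph << "\n"; /// prints "graph = 1"
}
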
View File

@@ -71,7 +71,9 @@ PipelineExecutor::PipelineExecutor(std::shared_ptr<Processors> & processors, Que
     }
 }
 
-PipelineExecutor::PipelineExecutor(std::shared_ptr<Processors> & processors, QueryStatusPtr elem, bool profile_processors_): PipelineExecutor(processors, elem) {
+PipelineExecutor::PipelineExecutor(std::shared_ptr<Processors> & processors, QueryStatusPtr elem, bool profile_processors_)
+    : PipelineExecutor(processors, elem)
+{
     profile_processors = profile_processors || profile_processors_;
 }

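The PipelineExecutor hunk above only reformats an overload that delegates to the two-argument constructor and then ORs in the extra profile_processors_ flag. A compilable sketch of that delegating-constructor pattern, using an invented Executor class rather than the real PipelineExecutor:

#include <iostream>

/// Invented stand-in for the real executor, to show the delegating-constructor shape.
class Executor
{
public:
    explicit Executor(int num_threads) : num_threads_(num_threads) {}

    /// Extra overload: reuse the constructor above, then merge in the flag,
    /// mirroring how the diff ORs profile_processors with profile_processors_.
    Executor(int num_threads, bool profile_processors_)
        : Executor(num_threads)
    {
        profile_processors = profile_processors || profile_processors_;
    }

    bool profilingEnabled() const { return profile_processors; }

private:
    int num_threads_;
    bool profile_processors = false;
};

int main()
{
    Executor executor(4, /*profile_processors_=*/true);
    std::cout << std::boolalpha << executor.profilingEnabled() << "\n"; /// prints "true"
}
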
View File

@@ -174,7 +174,8 @@ void printPipelineCompact(const Processors & processors, WriteBuffer & out, bool
     out << "}\n";
 }
 
-void printExecutionAnalysis(const Processors & processors, WriteBuffer & out) {
+void printExecutionAnalysis(const Processors & processors, WriteBuffer & out)
+{
     out << "digraph\n";
     out << "{\n";
     out << " rankdir=\"LR\";\n";
@@ -188,7 +189,8 @@ void printExecutionAnalysis(const Processors & processors, WriteBuffer & out) {
     };
 
-    for(const auto & processor : processors) {
+    for (const auto & processor : processors)
+    {
         const auto & description = processor->getDescription();
         out << " n" << get_proc_id(*processor) << "[label=<\n";
         out << " <table border=\"0\" cellborder=\"1\" cellspacing=\"0\">\n";
@@ -198,7 +200,8 @@ void printExecutionAnalysis(const Processors & processors, WriteBuffer & out) {
         out << " <table border=\"0\" cellborder=\"0\" cellspacing=\"0\">\n";
         out << " <tr><td>Input wait time(us): " << processor->getInputWaitElapsedNs() / 1000U << "</td></tr>\n";
         out << " <tr><td>Execution time(us): " << processor->getElapsedNs() / 1000U << "</td></tr>\n";
-        out << " <tr><td>Output wait time(us): " << processor->getOutputWaitElapsedNs() / 1000U << "</td></tr>\n";
+        out << " <tr><td>Output wait time(us): " << processor->getOutputWaitElapsedNs() / 1000U
+            << "</td></tr>\n";
         out << " </table>\n";
         out << " </td>\n";
         out << " </tr>\n";
@@ -220,7 +223,8 @@ void printExecutionAnalysis(const Processors & processors, WriteBuffer & out) {
             const IProcessor & next = port.getInputPort().getProcessor();
             const auto curr_stats = curr.getProcessorDataStats();
-            out << " n" << get_proc_id(curr) << " -> n" << get_proc_id(next) << " [label=\"" << curr_stats.output_rows << " rows\n " << curr_stats.output_bytes << " bytes\"];\n";
+            out << " n" << get_proc_id(curr) << " -> n" << get_proc_id(next) << " [label=\"" << curr_stats.output_rows << " rows\n "
+                << curr_stats.output_bytes << " bytes\"];\n";
         }
     }
     out << "}\n";