mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 11:02:08 +00:00
342 lines
10 KiB
C++
342 lines
10 KiB
C++
#include <Processors/QueryPlan/QueryPlan.h>
|
|
#include <Processors/QueryPlan/IQueryPlanStep.h>
|
|
#include <Processors/QueryPipeline.h>
|
|
#include <IO/WriteBuffer.h>
|
|
#include <IO/Operators.h>
|
|
#include <Interpreters/ActionsDAG.h>
|
|
#include <Interpreters/ArrayJoinAction.h>
|
|
#include <stack>
|
|
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
|
|
|
|
namespace DB
|
|
{
|
|
|
|
/// Error codes used by this file. Only declared here (extern); the definition is provided elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
|
|
|
|
/// Special member functions of QueryPlan. All of them are compiler-generated, but they are
/// defaulted in this file rather than in the class definition
/// (presumably so that member types only have to be complete here — TODO confirm against the header).

/// Construction: default and move.
QueryPlan::QueryPlan() = default;
QueryPlan::QueryPlan(QueryPlan &&) = default;

/// Move assignment.
QueryPlan & QueryPlan::operator=(QueryPlan &&) = default;

/// Destruction.
QueryPlan::~QueryPlan() = default;
|
|
|
|
void QueryPlan::checkInitialized() const
|
|
{
|
|
if (!isInitialized())
|
|
throw Exception("QueryPlan was not initialized", ErrorCodes::LOGICAL_ERROR);
|
|
}
|
|
|
|
void QueryPlan::checkNotCompleted() const
|
|
{
|
|
if (isCompleted())
|
|
throw Exception("QueryPlan was already completed", ErrorCodes::LOGICAL_ERROR);
|
|
}
|
|
|
|
bool QueryPlan::isCompleted() const
|
|
{
|
|
return isInitialized() && !root->step->hasOutputStream();
|
|
}
|
|
|
|
/// Returns the output stream of the current root step.
/// Throws LOGICAL_ERROR if the plan is not initialized or is already completed.
const DataStream & QueryPlan::getCurrentDataStream() const
{
    /// Both preconditions must hold before the root step's output may be read.
    checkInitialized();
    checkNotCompleted();

    const auto & root_step = root->step;
    return root_step->getOutputStream();
}
|
|
|
|
/// Turns this (not yet initialized) plan into the union of `plans`, with `step` as the new root.
///
/// The i-th plan becomes the i-th child of the new root, so `step` must declare exactly one
/// input stream per plan and the header of each input must have the same structure as the
/// current header of the corresponding plan.
/// The nodes and interpreter contexts of all sub-plans are taken over by this plan, and
/// max_threads becomes the maximum over this plan and all sub-plans.
///
/// Throws LOGICAL_ERROR if this plan is already initialized, if the number of plans differs
/// from the number of inputs of `step`, or if some header is incompatible.
/// (getCurrentDataStream() of a sub-plan may also throw if that sub-plan is not initialized
/// or is already completed.)
void QueryPlan::unitePlans(QueryPlanStepPtr step, std::vector<std::unique_ptr<QueryPlan>> plans)
{
    if (isInitialized())
        throw Exception("Cannot unite plans because current QueryPlan is already initialized",
                        ErrorCodes::LOGICAL_ERROR);

    const auto & inputs = step->getInputStreams();
    size_t num_inputs = inputs.size();
    if (num_inputs != plans.size())
    {
        throw Exception("Cannot unite QueryPlans using " + step->getName() +
                        " because step has different number of inputs. "
                        "Has " + std::to_string(plans.size()) + " plans "
                        "and " + std::to_string(num_inputs) + " inputs", ErrorCodes::LOGICAL_ERROR);
    }

    for (size_t i = 0; i < num_inputs; ++i)
    {
        const auto & step_header = inputs[i].header;
        const auto & plan_header = plans[i]->getCurrentDataStream().header;
        if (!blocksHaveEqualStructure(step_header, plan_header))
        {
            /// Name the root step of the offending sub-plan. This plan's own `root` must not be
            /// used here: the check at the top of the function guarantees this plan is not
            /// initialized, so it has no root step to name. The sub-plan's root is valid because
            /// getCurrentDataStream() above has already verified that the sub-plan is initialized.
            throw Exception("Cannot unite QueryPlans using " + step->getName() + " because "
                            "it has incompatible header with plan " + plans[i]->root->step->getName() + " "
                            "plan header: " + plan_header.dumpStructure() +
                            "step header: " + step_header.dumpStructure(), ErrorCodes::LOGICAL_ERROR);
        }
    }

    /// Take over the nodes of all sub-plans; the new root is appended after them.
    for (auto & plan : plans)
        nodes.splice(nodes.end(), std::move(plan->nodes));

    nodes.emplace_back(Node{.step = std::move(step)});
    root = &nodes.back();

    /// Children are attached in the same order as the plans (and hence as the step inputs).
    for (auto & plan : plans)
        root->children.emplace_back(plan->root);

    for (auto & plan : plans)
    {
        max_threads = std::max(max_threads, plan->max_threads);
        interpreter_context.insert(interpreter_context.end(),
                                   plan->interpreter_context.begin(), plan->interpreter_context.end());
    }
}
|
|
|
|
/// Appends `step` on top of the plan and makes it the new root.
///
/// - A step with no inputs can only start a plan, so the plan must not be initialized yet.
/// - A step with one input is put above the current root; the plan must be initialized and
///   the step's input header must have the same structure as the root's output header.
/// - A step with more than one input cannot be added this way.
///
/// Throws LOGICAL_ERROR if the plan is already completed or if any rule above is violated.
void QueryPlan::addStep(QueryPlanStepPtr step)
{
    checkNotCompleted();

    const size_t inputs_count = step->getInputStreams().size();

    /// Anything other than zero or one input is rejected up front.
    if (inputs_count > 1)
        throw Exception("Cannot add step " + step->getName() + " to QueryPlan because it has " +
                        std::to_string(inputs_count) + " inputs but " + std::to_string(isInitialized() ? 1 : 0) +
                        " input expected", ErrorCodes::LOGICAL_ERROR);

    if (inputs_count == 0)
    {
        /// A source step: only valid as the very first step of the plan.
        if (isInitialized())
            throw Exception("Cannot add step " + step->getName() + " to QueryPlan because "
                            "step has no inputs, but QueryPlan is already initialized", ErrorCodes::LOGICAL_ERROR);

        nodes.emplace_back(Node{.step = std::move(step)});
        root = &nodes.back();
        return;
    }

    /// Exactly one input from here on: the step consumes the output of the current root.
    if (!isInitialized())
        throw Exception("Cannot add step " + step->getName() + " to QueryPlan because "
                        "step has input, but QueryPlan is not initialized", ErrorCodes::LOGICAL_ERROR);

    const auto & current_header = root->step->getOutputStream().header;
    const auto & incoming_header = step->getInputStreams().front().header;
    if (!blocksHaveEqualStructure(current_header, incoming_header))
        throw Exception("Cannot add step " + step->getName() + " to QueryPlan because "
                        "it has incompatible header with root step " + root->step->getName() + " "
                        "root header: " + current_header.dumpStructure() +
                        "step header: " + incoming_header.dumpStructure(), ErrorCodes::LOGICAL_ERROR);

    /// The previous root becomes the only child of the new node.
    nodes.emplace_back(Node{.step = std::move(step), .children = {root}});
    root = &nodes.back();
}
|
|
|
|
/// Optimizes the plan and builds a single pipeline for it.
///
/// The tree is walked iteratively with an explicit stack. The pipelines of all children of a
/// node are built first (in child order) and then passed to the node's step via updatePipeline().
/// For nodes that received no child pipelines, the thread limit is applied when max_threads is
/// non-zero. Finally, the stored interpreter contexts are moved into the resulting pipeline.
///
/// Throws LOGICAL_ERROR if the plan is not initialized.
QueryPipelinePtr QueryPlan::buildQueryPipeline()
{
    checkInitialized();
    optimize();

    /// Per-node traversal state: the node and the pipelines already built for its children.
    struct Frame
    {
        Node * node;
        QueryPipelines pipelines = {};
    };

    /// Pipeline produced by the most recently finished node; consumed by its parent frame.
    QueryPipelinePtr finished_pipeline;

    std::stack<Frame> pending;
    pending.push(Frame{.node = root});

    while (!pending.empty())
    {
        auto & current = pending.top();

        /// A child has just been finished: hand its pipeline over to the parent.
        if (finished_pipeline)
        {
            current.pipelines.emplace_back(std::move(finished_pipeline));
            finished_pipeline = nullptr;
        }

        /// The number of collected pipelines is also the index of the next child to build.
        const size_t built_children = current.pipelines.size();
        if (built_children != current.node->children.size())
        {
            pending.push(Frame{.node = current.node->children[built_children]});
            continue;
        }

        /// All children are done (or there are none): build the pipeline of this node.
        const bool has_no_child_pipelines = current.pipelines.empty();
        finished_pipeline = current.node->step->updatePipeline(std::move(current.pipelines));

        if (has_no_child_pipelines && max_threads)
            finished_pipeline->limitMaxThreads(max_threads);

        pending.pop();
    }

    for (auto & context : interpreter_context)
        finished_pipeline->addInterpreterContext(std::move(context));

    return finished_pipeline;
}
|
|
|
|
/// Builds the pipeline of this plan and converts it to a Pipe.
/// Returns an empty Pipe for an uninitialized plan.
/// Throws LOGICAL_ERROR if the plan is already completed.
Pipe QueryPlan::convertToPipe()
{
    if (!isInitialized())
        return {};

    if (isCompleted())
        throw Exception("Cannot convert completed QueryPlan to Pipe", ErrorCodes::LOGICAL_ERROR);

    auto pipeline = buildQueryPipeline();
    return QueryPipeline::getPipe(std::move(*pipeline));
}
|
|
|
|
void QueryPlan::addInterpreterContext(std::shared_ptr<Context> context)
|
|
{
|
|
interpreter_context.emplace_back(std::move(context));
|
|
}
|
|
|
|
|
|
static void explainStep(
|
|
const IQueryPlanStep & step,
|
|
IQueryPlanStep::FormatSettings & settings,
|
|
const QueryPlan::ExplainPlanOptions & options)
|
|
{
|
|
std::string prefix(settings.offset, ' ');
|
|
settings.out << prefix;
|
|
settings.out << step.getName();
|
|
|
|
const auto & description = step.getStepDescription();
|
|
if (options.description && !description.empty())
|
|
settings.out <<" (" << description << ')';
|
|
|
|
settings.out.write('\n');
|
|
|
|
if (options.header)
|
|
{
|
|
settings.out << prefix;
|
|
|
|
if (!step.hasOutputStream())
|
|
settings.out << "No header";
|
|
else if (!step.getOutputStream().header)
|
|
settings.out << "Empty header";
|
|
else
|
|
{
|
|
settings.out << "Header: ";
|
|
bool first = true;
|
|
|
|
for (const auto & elem : step.getOutputStream().header)
|
|
{
|
|
if (!first)
|
|
settings.out << "\n" << prefix << " ";
|
|
|
|
first = false;
|
|
elem.dumpNameAndType(settings.out);
|
|
}
|
|
}
|
|
|
|
settings.out.write('\n');
|
|
}
|
|
|
|
if (options.actions)
|
|
step.describeActions(settings);
|
|
}
|
|
|
|
std::string debugExplainStep(const IQueryPlanStep & step)
|
|
{
|
|
WriteBufferFromOwnString out;
|
|
IQueryPlanStep::FormatSettings settings{.out = out};
|
|
QueryPlan::ExplainPlanOptions options{.actions = true};
|
|
explainStep(step, settings, options);
|
|
return out.str();
|
|
}
|
|
|
|
/// Writes a description of the whole plan to `buffer`, starting from the root.
///
/// The tree is walked depth-first with an explicit stack: a node is described when it is first
/// reached, then its children are visited in order. The offset of a node is its depth in the
/// tree multiplied by settings.indent.
///
/// Throws LOGICAL_ERROR if the plan is not initialized.
void QueryPlan::explainPlan(WriteBuffer & buffer, const ExplainPlanOptions & options)
{
    checkInitialized();

    IQueryPlanStep::FormatSettings settings{.out = buffer, .write_header = options.header};

    /// Per-node traversal state.
    struct Frame
    {
        Node * node;
        bool is_description_printed = false;
        size_t next_child = 0;
    };

    std::stack<Frame> path;
    path.push(Frame{.node = root});

    while (!path.empty())
    {
        auto & current = path.top();

        if (!current.is_description_printed)
        {
            /// The stack holds the path from the root to this node, so its size minus one is the depth.
            const size_t depth = path.size() - 1;
            settings.offset = depth * settings.indent;
            explainStep(*current.node->step, settings, options);
            current.is_description_printed = true;
        }

        /// All children visited: this node is done.
        if (current.next_child >= current.node->children.size())
        {
            path.pop();
            continue;
        }

        Node * child = current.node->children[current.next_child];
        path.push(Frame{child});
        ++current.next_child;
    }
}
|
|
|
|
/// Writes "(<step name>)" at the current offset and lets the step describe its pipeline.
/// If the step left settings.offset unchanged, the offset is advanced by settings.indent.
static void explainPipelineStep(IQueryPlanStep & step, IQueryPlanStep::FormatSettings & settings)
{
    settings.out << String(settings.offset, settings.indent_char);
    settings.out << "(" << step.getName() << ")\n";

    const size_t offset_before = settings.offset;
    step.describePipeline(settings);

    const bool offset_unchanged = (settings.offset == offset_before);
    if (offset_unchanged)
        settings.offset += settings.indent;
}
|
|
|
|
/// Writes a description of the pipeline of every step to `buffer`, starting from the root.
///
/// The tree is walked depth-first with an explicit stack. Each node is described at the offset
/// stored in its frame; the offset that results from describing the node is stored back and
/// used as the starting offset for all of its children.
///
/// Throws LOGICAL_ERROR if the plan is not initialized.
void QueryPlan::explainPipeline(WriteBuffer & buffer, const ExplainPipelineOptions & options)
{
    checkInitialized();

    IQueryPlanStep::FormatSettings settings{.out = buffer, .write_header = options.header};

    /// Per-node traversal state.
    struct Frame
    {
        Node * node;
        size_t offset = 0;
        bool is_description_printed = false;
        size_t next_child = 0;
    };

    std::stack<Frame> path;
    path.push(Frame{.node = root});

    while (!path.empty())
    {
        auto & current = path.top();

        if (!current.is_description_printed)
        {
            settings.offset = current.offset;
            explainPipelineStep(*current.node->step, settings);

            /// Remember the offset after this step; children start from it.
            current.offset = settings.offset;
            current.is_description_printed = true;
        }

        /// All children visited: this node is done.
        if (current.next_child >= current.node->children.size())
        {
            path.pop();
            continue;
        }

        Node * child = current.node->children[current.next_child];
        path.push(Frame{child, current.offset});
        ++current.next_child;
    }
}
|
|
|
|
void QueryPlan::optimize()
|
|
{
|
|
QueryPlanOptimizations::optimizeTree(*root, nodes);
|
|
}
|
|
|
|
}
|