#include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int LOGICAL_ERROR; } QueryPlan::QueryPlan() = default; QueryPlan::~QueryPlan() = default; QueryPlan::QueryPlan(QueryPlan &&) = default; QueryPlan & QueryPlan::operator=(QueryPlan &&) = default; void QueryPlan::checkInitialized() const { if (!isInitialized()) throw Exception("QueryPlan was not initialized", ErrorCodes::LOGICAL_ERROR); } void QueryPlan::checkNotCompleted() const { if (isCompleted()) throw Exception("QueryPlan was already completed", ErrorCodes::LOGICAL_ERROR); } bool QueryPlan::isCompleted() const { return isInitialized() && !root->step->hasOutputStream(); } const DataStream & QueryPlan::getCurrentDataStream() const { checkInitialized(); checkNotCompleted(); return root->step->getOutputStream(); } void QueryPlan::unitePlans(QueryPlanStepPtr step, std::vector> plans) { if (isInitialized()) throw Exception("Cannot unite plans because current QueryPlan is already initialized", ErrorCodes::LOGICAL_ERROR); const auto & inputs = step->getInputStreams(); size_t num_inputs = step->getInputStreams().size(); if (num_inputs != plans.size()) { throw Exception("Cannot unite QueryPlans using " + step->getName() + " because step has different number of inputs. " "Has " + std::to_string(plans.size()) + " plans " "and " + std::to_string(num_inputs) + " inputs", ErrorCodes::LOGICAL_ERROR); } for (size_t i = 0; i < num_inputs; ++i) { const auto & step_header = inputs[i].header; const auto & plan_header = plans[i]->getCurrentDataStream().header; if (!blocksHaveEqualStructure(step_header, plan_header)) throw Exception("Cannot unite QueryPlans using " + step->getName() + " because " "it has incompatible header with plan " + root->step->getName() + " " "plan header: " + plan_header.dumpStructure() + "step header: " + step_header.dumpStructure(), ErrorCodes::LOGICAL_ERROR); } for (auto & plan : plans) nodes.splice(nodes.end(), std::move(plan->nodes)); nodes.emplace_back(Node{.step = std::move(step)}); root = &nodes.back(); for (auto & plan : plans) root->children.emplace_back(plan->root); for (auto & plan : plans) { max_threads = std::max(max_threads, plan->max_threads); interpreter_context.insert(interpreter_context.end(), plan->interpreter_context.begin(), plan->interpreter_context.end()); } } void QueryPlan::addStep(QueryPlanStepPtr step) { checkNotCompleted(); size_t num_input_streams = step->getInputStreams().size(); if (num_input_streams == 0) { if (isInitialized()) throw Exception("Cannot add step " + step->getName() + " to QueryPlan because " "step has no inputs, but QueryPlan is already initialized", ErrorCodes::LOGICAL_ERROR); nodes.emplace_back(Node{.step = std::move(step)}); root = &nodes.back(); return; } if (num_input_streams == 1) { if (!isInitialized()) throw Exception("Cannot add step " + step->getName() + " to QueryPlan because " "step has input, but QueryPlan is not initialized", ErrorCodes::LOGICAL_ERROR); const auto & root_header = root->step->getOutputStream().header; const auto & step_header = step->getInputStreams().front().header; if (!blocksHaveEqualStructure(root_header, step_header)) throw Exception("Cannot add step " + step->getName() + " to QueryPlan because " "it has incompatible header with root step " + root->step->getName() + " " "root header: " + root_header.dumpStructure() + "step header: " + step_header.dumpStructure(), ErrorCodes::LOGICAL_ERROR); nodes.emplace_back(Node{.step = std::move(step), .children = {root}}); root = &nodes.back(); return; } throw Exception("Cannot add step " + step->getName() + " to QueryPlan because it has " + std::to_string(num_input_streams) + " inputs but " + std::to_string(isInitialized() ? 1 : 0) + " input expected", ErrorCodes::LOGICAL_ERROR); } QueryPipelinePtr QueryPlan::buildQueryPipeline( const QueryPlanOptimizationSettings & optimization_settings, const BuildQueryPipelineSettings & build_pipeline_settings) { checkInitialized(); optimize(optimization_settings); struct Frame { Node * node; QueryPipelines pipelines = {}; }; QueryPipelinePtr last_pipeline; std::stack stack; stack.push(Frame{.node = root}); while (!stack.empty()) { auto & frame = stack.top(); if (last_pipeline) { frame.pipelines.emplace_back(std::move(last_pipeline)); last_pipeline = nullptr; } size_t next_child = frame.pipelines.size(); if (next_child == frame.node->children.size()) { bool limit_max_threads = frame.pipelines.empty(); last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines), build_pipeline_settings); if (limit_max_threads && max_threads) last_pipeline->limitMaxThreads(max_threads); stack.pop(); } else stack.push(Frame{.node = frame.node->children[next_child]}); } for (auto & context : interpreter_context) last_pipeline->addInterpreterContext(std::move(context)); return last_pipeline; } Pipe QueryPlan::convertToPipe( const QueryPlanOptimizationSettings & optimization_settings, const BuildQueryPipelineSettings & build_pipeline_settings) { if (!isInitialized()) return {}; if (isCompleted()) throw Exception("Cannot convert completed QueryPlan to Pipe", ErrorCodes::LOGICAL_ERROR); return QueryPipeline::getPipe(std::move(*buildQueryPipeline(optimization_settings, build_pipeline_settings))); } void QueryPlan::addInterpreterContext(std::shared_ptr context) { interpreter_context.emplace_back(std::move(context)); } static void explainStep( const IQueryPlanStep & step, IQueryPlanStep::FormatSettings & settings, const QueryPlan::ExplainPlanOptions & options) { std::string prefix(settings.offset, ' '); settings.out << prefix; settings.out << step.getName(); const auto & description = step.getStepDescription(); if (options.description && !description.empty()) settings.out <<" (" << description << ')'; settings.out.write('\n'); if (options.header) { settings.out << prefix; if (!step.hasOutputStream()) settings.out << "No header"; else if (!step.getOutputStream().header) settings.out << "Empty header"; else { settings.out << "Header: "; bool first = true; for (const auto & elem : step.getOutputStream().header) { if (!first) settings.out << "\n" << prefix << " "; first = false; elem.dumpNameAndType(settings.out); } } settings.out.write('\n'); } if (options.actions) step.describeActions(settings); } std::string debugExplainStep(const IQueryPlanStep & step) { WriteBufferFromOwnString out; IQueryPlanStep::FormatSettings settings{.out = out}; QueryPlan::ExplainPlanOptions options{.actions = true}; explainStep(step, settings, options); return out.str(); } void QueryPlan::explainPlan(WriteBuffer & buffer, const ExplainPlanOptions & options) { checkInitialized(); IQueryPlanStep::FormatSettings settings{.out = buffer, .write_header = options.header}; struct Frame { Node * node; bool is_description_printed = false; size_t next_child = 0; }; std::stack stack; stack.push(Frame{.node = root}); while (!stack.empty()) { auto & frame = stack.top(); if (!frame.is_description_printed) { settings.offset = (stack.size() - 1) * settings.indent; explainStep(*frame.node->step, settings, options); frame.is_description_printed = true; } if (frame.next_child < frame.node->children.size()) { stack.push(Frame{frame.node->children[frame.next_child]}); ++frame.next_child; } else stack.pop(); } } static void explainPipelineStep(IQueryPlanStep & step, IQueryPlanStep::FormatSettings & settings) { settings.out << String(settings.offset, settings.indent_char) << "(" << step.getName() << ")\n"; size_t current_offset = settings.offset; step.describePipeline(settings); if (current_offset == settings.offset) settings.offset += settings.indent; } void QueryPlan::explainPipeline(WriteBuffer & buffer, const ExplainPipelineOptions & options) { checkInitialized(); IQueryPlanStep::FormatSettings settings{.out = buffer, .write_header = options.header}; struct Frame { Node * node; size_t offset = 0; bool is_description_printed = false; size_t next_child = 0; }; std::stack stack; stack.push(Frame{.node = root}); while (!stack.empty()) { auto & frame = stack.top(); if (!frame.is_description_printed) { settings.offset = frame.offset; explainPipelineStep(*frame.node->step, settings); frame.offset = settings.offset; frame.is_description_printed = true; } if (frame.next_child < frame.node->children.size()) { stack.push(Frame{frame.node->children[frame.next_child], frame.offset}); ++frame.next_child; } else stack.pop(); } } void QueryPlan::optimize(const QueryPlanOptimizationSettings & optimization_settings) { QueryPlanOptimizations::optimizeTree(optimization_settings, *root, nodes); } }