Add QueryPlan.

This commit is contained in:
Nikolai Kochetov 2020-06-08 12:14:58 +03:00
parent 108575c8ad
commit 4c179e454a
4 changed files with 159 additions and 0 deletions

View File

@ -51,4 +51,5 @@ protected:
std::optional<DataStream> output_stream;
};
using QueryPlanStepPtr = std::unique_ptr<IQueryPlanStep>;
}

View File

@ -0,0 +1,112 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <stack>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
void QueryPlan::checkInitialized() const
{
if (!isInitialized())
throw Exception("QueryPlan was not initialized", ErrorCodes::LOGICAL_ERROR);
}
void QueryPlan::checkNotCompleted() const
{
if (isCompleted())
throw Exception("QueryPlan was already completed", ErrorCodes::LOGICAL_ERROR);
}
bool QueryPlan::isCompleted() const
{
return isInitialized() && !root->step->hasOutputStream();
}
const DataStream & QueryPlan::getCurrentDataStream() const
{
checkInitialized();
checkNotCompleted();
return root->step->getOutputStream();
}
void QueryPlan::addStep(QueryPlanStepPtr step)
{
checkNotCompleted();
size_t num_input_streams = step->getInputStreams().size();
if (num_input_streams == 0)
{
if (isInitialized())
throw Exception("Cannot add step " + step->getName() + " to QueryPlan because "
"step has no inputs, but QueryPlan is already initialised", ErrorCodes::LOGICAL_ERROR);
nodes.emplace_back(Node{.step = std::move(step)});
return;
}
if (num_input_streams == 1)
{
if (!isInitialized())
throw Exception("Cannot add step " + step->getName() + " to QueryPlan because "
"step has input, but QueryPlan is not initialised", ErrorCodes::LOGICAL_ERROR);
const auto & root_header = root->step->getOutputStream().header;
const auto & step_header = step->getInputStreams().front().header;
if (!blocksHaveEqualStructure(root_header, step_header))
throw Exception("Cannot add step " + step->getName() + " to QueryPlan because "
"it has incompatible header with root step " + root->step->getName() + " "
"root header: " + root_header.dumpStructure() +
"step header: " + step_header.dumpStructure(), ErrorCodes::LOGICAL_ERROR);
nodes.emplace_back(Node{.step = std::move(step), .children = {root}});
root = &nodes.back();
return;
}
throw Exception("Cannot add step " + step->getName() + " to QueryPlan because it has " +
std::to_string(num_input_streams) + " inputs but " + std::to_string(isInitialized() ? 1 : 0) +
" input expected", ErrorCodes::LOGICAL_ERROR);
}
QueryPipelinePtr QueryPlan::buildQueryPipeline()
{
checkInitialized();
struct Frame
{
Node * node;
QueryPipelines pipelines;
};
QueryPipelinePtr last_pipeline;
std::stack<Frame> stack;
stack.push({.node = root});
while (!stack.empty())
{
auto & frame = stack.top();
if (last_pipeline)
frame.pipelines.emplace_back(std::move(last_pipeline));
size_t next_child = frame.pipelines.size();
if (next_child == frame.node->children.size())
{
last_pipeline = frame.node->step->updatePipeline(std::move(frame.pipelines));
stack.pop();
}
else
stack.push({.node = frame.node->children[next_child]});
}
return last_pipeline;
}
}

View File

@ -0,0 +1,45 @@
#pragma once
#include <memory>
#include <list>
#include <vector>
namespace DB
{
class DataStream;
class IQueryPlanStep;
using QueryPlanStepPtr = std::unique_ptr<IQueryPlanStep>;
class QueryPipeline;
using QueryPipelinePtr = std::unique_ptr<QueryPipeline>;
/// A tree of query steps.
class QueryPlan
{
public:
void addStep(QueryPlanStepPtr step);
bool isInitialized() const { return root != nullptr; } /// Tree is not empty
bool isCompleted() const; /// Tree is not empty and root hasOutputStream()
const DataStream & getCurrentDataStream() const; /// Checks that (isInitialized() && !isCompleted())
QueryPipelinePtr buildQueryPipeline();
private:
struct Node
{
QueryPlanStepPtr step;
std::vector<Node *> children;
};
using Nodes = std::list<Node>;
Nodes nodes;
Node * root = nullptr;
void checkInitialized() const;
void checkNotCompleted() const;
};
}

View File

@ -135,6 +135,7 @@ SRCS(
Transforms/SortingTransform.cpp
Transforms/TotalsHavingTransform.cpp
QueryPlan/IQueryPlanStep.cpp
QueryPlan/QueryPlan.cpp
)
END()