Added source steps.

This commit is contained in:
Nikolai Kochetov 2020-06-16 17:11:19 +03:00
parent be8029e54f
commit 4407bd7daa
10 changed files with 125 additions and 6 deletions

View File

@ -77,6 +77,8 @@
#include <Processors/QueryPlan/ReadFromStorageStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ReadNothingStep.h>
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
namespace DB
@ -706,7 +708,8 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn
if (options.only_analyze)
{
pipeline.init(Pipe(std::make_shared<NullSource>(source_header)));
ReadNothingStep read_nothing(DataStream{.header = source_header});
read_nothing.initializePipeline(pipeline);
if (expressions.prewhere_info)
{
@ -736,11 +739,13 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn
{
if (prepared_input)
{
pipeline.init(Pipe(std::make_shared<SourceFromInputStream>(prepared_input)));
ReadFromPreparedSource prepared_source_step(Pipe(std::make_shared<SourceFromInputStream>(prepared_input)));
prepared_source_step.initializePipeline(pipeline);
}
else if (prepared_pipe)
{
pipeline.init(std::move(*prepared_pipe));
ReadFromPreparedSource prepared_source_step(std::move(*prepared_pipe));
prepared_source_step.initializePipeline(pipeline);
}
if (from_stage == QueryProcessingStage::WithMergeableState &&
@ -1050,7 +1055,9 @@ void InterpreterSelectQuery::executeFetchColumns(
{std::move(column), std::make_shared<DataTypeAggregateFunction>(func, argument_types, desc.parameters), desc.column_name}};
auto istream = std::make_shared<OneBlockInputStream>(block_with_count);
pipeline.init(Pipe(std::make_shared<SourceFromInputStream>(istream)));
ReadFromPreparedSource prepared_count(Pipe(std::make_shared<SourceFromInputStream>(istream)));
prepared_count.setStepDescription("Optimized trivial count");
prepared_count.initializePipeline(pipeline);
from_stage = QueryProcessingStage::WithMergeableState;
analysis_result.first_stage = false;
return;

View File

@ -0,0 +1,19 @@
#include <Processors/QueryPlan/ISourceStep.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
ISourceStep::ISourceStep(DataStream output_stream_)
{
output_stream = std::move(output_stream_);
}
QueryPipelinePtr ISourceStep::updatePipeline(QueryPipelines)
{
auto pipeline = std::make_unique<QueryPipeline>();
initializePipeline(*pipeline);
return pipeline;
}
}

View File

@ -0,0 +1,17 @@
#pragma once
#include <Processors/QueryPlan/IQueryPlanStep.h>
namespace DB
{
class ISourceStep : public IQueryPlanStep
{
public:
explicit ISourceStep(DataStream output_stream_);
QueryPipelinePtr updatePipeline(QueryPipelines pipelines) override;
virtual void initializePipeline(QueryPipeline & pipeline) = 0;
};
}

View File

@ -82,7 +82,7 @@ QueryPipelinePtr QueryPlan::buildQueryPipeline()
struct Frame
{
Node * node;
QueryPipelines pipelines;
QueryPipelines pipelines = {};
};
QueryPipelinePtr last_pipeline;

View File

@ -30,7 +30,7 @@ private:
struct Node
{
QueryPlanStepPtr step;
std::vector<Node *> children;
std::vector<Node *> children = {};
};
using Nodes = std::list<Node>;

View File

@ -0,0 +1,18 @@
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
ReadFromPreparedSource::ReadFromPreparedSource(Pipe pipe_)
: ISourceStep(DataStream{.header = pipe_.getHeader()})
, pipe(std::move(pipe_))
{
}
void ReadFromPreparedSource::initializePipeline(QueryPipeline & pipeline)
{
pipeline.init(std::move(pipe));
}
}

View File

@ -0,0 +1,21 @@
#pragma once
#include <Processors/QueryPlan/ISourceStep.h>
#include <Processors/Pipe.h>
namespace DB
{
class ReadFromPreparedSource : public ISourceStep
{
public:
explicit ReadFromPreparedSource(Pipe pipe_);
String getName() const override { return "ReadNothing"; }
void initializePipeline(QueryPipeline & pipeline) override;
private:
Pipe pipe;
};
}

View File

@ -0,0 +1,18 @@
#include <Processors/QueryPlan/ReadNothingStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Sources/NullSource.h>
namespace DB
{
ReadNothingStep::ReadNothingStep(DataStream output_stream_)
: ISourceStep(std::move(output_stream_))
{
}
void ReadNothingStep::initializePipeline(QueryPipeline & pipeline)
{
pipeline.init(Pipe(std::make_shared<NullSource>(output_stream.header)));
}
}

View File

@ -0,0 +1,17 @@
#pragma once
#include <Processors/QueryPlan/ISourceStep.h>
namespace DB
{
class ReadNothingStep : public ISourceStep
{
public:
explicit ReadNothingStep(DataStream output_stream_);
String getName() const override { return "ReadNothing"; }
void initializePipeline(QueryPipeline & pipeline) override;
};
}

View File

@ -139,9 +139,11 @@ SRCS(
Transforms/AggregatingInOrderTransform.cpp
QueryPlan/ExpressionStep.cpp
QueryPlan/FilterStep.cpp
QueryPlan/ISourceStep.cpp
QueryPlan/ITransformingStep.cpp
QueryPlan/IQueryPlanStep.cpp
QueryPlan/ReadFromStorageStep.cpp
QueryPlan/ReadNothingStep.cpp
QueryPlan/QueryPlan.cpp
)