Added AddingDelayedStreamStep

This commit is contained in:
Nikolai Kochetov 2020-06-17 16:38:17 +03:00
parent 0829d1d16d
commit 582ea24469
4 changed files with 53 additions and 1 deletions

View File

@ -86,6 +86,7 @@
#include <Processors/QueryPlan/LimitByStep.h>
#include <Processors/QueryPlan/LimitStep.h>
#include <Processors/QueryPlan/MergingAggregatedStep.h>
#include <Processors/QueryPlan/AddingDelayedStreamStep.h>
namespace DB
@ -883,7 +884,11 @@ void InterpreterSelectQuery::executeImpl(QueryPipeline & pipeline, const BlockIn
if (auto stream = join->createStreamWithNonJoinedRows(join_result_sample, settings.max_block_size))
{
auto source = std::make_shared<SourceFromInputStream>(std::move(stream));
pipeline.addDelayedStream(source);
AddingDelayedStreamStep add_non_joined_rows_step(
DataStream{.header = pipeline.getHeader()}, std::move(source));
add_non_joined_rows_step.setStepDescription("Add non-joined rows after JOIN");
add_non_joined_rows_step.transformPipeline(pipeline);
}
}
}

View File

@ -0,0 +1,20 @@
#include <Processors/QueryPlan/AddingDelayedStreamStep.h>
#include <Processors/QueryPipeline.h>
namespace DB
{
AddingDelayedStreamStep::AddingDelayedStreamStep(
const DataStream & input_stream_,
ProcessorPtr source_)
: ITransformingStep(input_stream_, input_stream_)
, source(std::move(source_))
{
}
void AddingDelayedStreamStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addDelayedStream(source);
}
}

View File

@ -0,0 +1,26 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
namespace DB
{
class IProcessor;
using ProcessorPtr = std::shared_ptr<IProcessor>;
class AddingDelayedStreamStep : public ITransformingStep
{
public:
explicit AddingDelayedStreamStep(
const DataStream & input_stream_,
ProcessorPtr source_);
String getName() const override { return "AddingDelayedStream"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
ProcessorPtr source;
};
}

View File

@ -137,6 +137,7 @@ SRCS(
Transforms/SortingTransform.cpp
Transforms/TotalsHavingTransform.cpp
Transforms/AggregatingInOrderTransform.cpp
QueryPlan/AddingDelayedStreamStep.cpp
QueryPlan/DistinctStep.cpp
QueryPlan/ExpressionStep.cpp
QueryPlan/FilterStep.cpp