Add CreatingSets step.

This commit is contained in:
Nikolai Kochetov 2020-06-17 18:50:21 +03:00
parent f279df6015
commit 1ad4f2c0fd
6 changed files with 71 additions and 9 deletions

View File

@ -88,6 +88,7 @@
#include <Processors/QueryPlan/MergingAggregatedStep.h>
#include <Processors/QueryPlan/AddingDelayedStreamStep.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
namespace DB
@ -1873,12 +1874,14 @@ void InterpreterSelectQuery::executeSubqueriesInSetsAndJoins(QueryPipeline & pip
const Settings & settings = context->getSettingsRef();
auto creating_sets = std::make_shared<CreatingSetsTransform>(
pipeline.getHeader(), subqueries_for_sets,
CreatingSetsStep creating_sets(
DataStream{.header = pipeline.getHeader()},
subqueries_for_sets,
SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode),
*context);
pipeline.addCreatingSetsTransform(std::move(creating_sets));
creating_sets.setStepDescription("Create sets for subqueries and joins");
creating_sets.transformPipeline(pipeline);
}

View File

@ -0,0 +1,30 @@
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/CreatingSetsTransform.h>
namespace DB
{
CreatingSetsStep::CreatingSetsStep(
const DataStream & input_stream_,
SubqueriesForSets subqueries_for_sets_,
SizeLimits network_transfer_limits_,
const Context & context_)
: ITransformingStep(input_stream_, input_stream_)
, subqueries_for_sets(std::move(subqueries_for_sets_))
, network_transfer_limits(std::move(network_transfer_limits_))
, context(context_)
{
}
void CreatingSetsStep::transformPipeline(QueryPipeline & pipeline)
{
auto creating_sets = std::make_shared<CreatingSetsTransform>(
pipeline.getHeader(), subqueries_for_sets,
network_transfer_limits,
context);
pipeline.addCreatingSetsTransform(std::move(creating_sets));
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <Processors/QueryPlan/ITransformingStep.h>
#include <DataStreams/SizeLimits.h>
#include <Interpreters/SubqueryForSet.h>
namespace DB
{
class CreatingSetsStep : public ITransformingStep
{
public:
CreatingSetsStep(
const DataStream & input_stream_,
SubqueriesForSets subqueries_for_sets_,
SizeLimits network_transfer_limits_,
const Context & context_);
String getName() const override { return "CreatingSets"; }
void transformPipeline(QueryPipeline & pipeline) override;
private:
SubqueriesForSets subqueries_for_sets;
SizeLimits network_transfer_limits;
const Context & context;
};
}

View File

@ -23,13 +23,13 @@ namespace ErrorCodes
CreatingSetsTransform::CreatingSetsTransform(
Block out_header_,
const SubqueriesForSets & subqueries_for_sets_,
const SizeLimits & network_transfer_limits_,
SubqueriesForSets subqueries_for_sets_,
SizeLimits network_transfer_limits_,
const Context & context_)
: IProcessor({}, {std::move(out_header_)})
, subqueries_for_sets(subqueries_for_sets_)
, subqueries_for_sets(std::move(subqueries_for_sets_))
, cur_subquery(subqueries_for_sets.begin())
, network_transfer_limits(network_transfer_limits_)
, network_transfer_limits(std::move(network_transfer_limits_))
, context(context_)
{
}

View File

@ -21,8 +21,8 @@ class CreatingSetsTransform : public IProcessor
public:
CreatingSetsTransform(
Block out_header_,
const SubqueriesForSets & subqueries_for_sets_,
const SizeLimits & network_transfer_limits_,
SubqueriesForSets subqueries_for_sets_,
SizeLimits network_transfer_limits_,
const Context & context_);
String getName() const override { return "CreatingSetsTransform"; }

View File

@ -139,6 +139,7 @@ SRCS(
Transforms/AggregatingInOrderTransform.cpp
QueryPlan/AddingDelayedStreamStep.cpp
QueryPlan/AggregatingStep.cpp
QueryPlan/CreatingSetsStep.cpp
QueryPlan/DistinctStep.cpp
QueryPlan/ExpressionStep.cpp
QueryPlan/FilterStep.cpp