Update query plan traits for DISTINCT. Add more comments.

Nikolai Kochetov 2020-06-22 13:18:28 +03:00
parent 5decc73b5d
commit d7d334bf6f
60 changed files with 211 additions and 147 deletions
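
In short: ITransformingStep::DataStreamTraits grows from a single flag to three, DataStream gains has_single_port, and the per-thread local_distinct_columns set is removed. A minimal sketch of the new trait set and of the port-count rule added to the ITransformingStep constructor (the rule is verbatim from the diff below; the three-step chain at the end is illustrative only):

struct DataStreamTraits
{
    /// Keep the input's distinct_columns valid for the output stream.
    bool preserves_distinct_columns;
    /// The pipeline has exactly one output port after this step.
    bool returns_single_stream;
    /// The step does not change the number of ports.
    bool preserves_number_of_streams;
};

/// Propagation, as added to the ITransformingStep constructor:
///   output.has_single_port = traits.returns_single_stream
///       || (input.has_single_port && traits.preserves_number_of_streams);
///
/// E.g. a four-port read -> ExpressionStep (preserves ports: still four)
///      -> MergingSortedStep (returns single stream: one port onwards).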

View File

@@ -45,7 +45,7 @@
#include <Processors/QueryPlan/LimitByStep.h>
#include <Processors/QueryPlan/LimitStep.h>
#include <Processors/QueryPlan/MergingAggregatedStep.h>
#include <Processors/QueryPlan/AddingDelayedStreamStep.h>
#include <Processors/QueryPlan/AddingDelayedStreamSource.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/TotalsHavingStep.h>
@@ -891,7 +891,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
if (auto stream = join->createStreamWithNonJoinedRows(join_result_sample, settings.max_block_size))
{
auto source = std::make_shared<SourceFromInputStream>(std::move(stream));
auto add_non_joined_rows_step = std::make_unique<AddingDelayedStreamStep>(
auto add_non_joined_rows_step = std::make_unique<AddingDelayedStreamSource>(
query_plan.getCurrentDataStream(), std::move(source));
add_non_joined_rows_step->setStepDescription("Add non-joined rows after JOIN");

View File

@@ -33,7 +33,7 @@ private:
RowsBeforeLimitCounterPtr rows_before_limit_at_least;
/// State of port's pair.
/// Chunks from different port pairs are not mixed for berret cache locality.
/// Chunks from different port pairs are not mixed for better cache locality.
struct PortsData
{
Chunk current_chunk;

View File

@@ -20,7 +20,7 @@ private:
RowsBeforeLimitCounterPtr rows_before_limit_at_least;
/// State of port's pair.
/// Chunks from different port pairs are not mixed for berret cache locality.
/// Chunks from different port pairs are not mixed for better cache locality.
struct PortsData
{
Chunk current_chunk;

View File

@@ -1,4 +1,4 @@
#include <Processors/QueryPlan/AddingDelayedStreamStep.h>
#include <Processors/QueryPlan/AddingDelayedStreamSource.h>
#include <Processors/QueryPipeline.h>
namespace DB
@@ -8,11 +8,13 @@ static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = false
.preserves_distinct_columns = false,
.returns_single_stream = false,
.preserves_number_of_streams = false,
};
}
AddingDelayedStreamStep::AddingDelayedStreamStep(
AddingDelayedStreamSource::AddingDelayedStreamSource(
const DataStream & input_stream_,
ProcessorPtr source_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
@@ -20,7 +22,7 @@ AddingDelayedStreamStep::AddingDelayedStreamStep(
{
}
void AddingDelayedStreamStep::transformPipeline(QueryPipeline & pipeline)
void AddingDelayedStreamSource::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addDelayedStream(source);
}

View File

@@ -8,14 +8,16 @@ namespace DB
class IProcessor;
using ProcessorPtr = std::shared_ptr<IProcessor>;
class AddingDelayedStreamStep : public ITransformingStep
/// Adds another source to pipeline. Data from this source will be read after data from all other sources.
/// NOTE: this step is needed because of non-joined data from JOIN. Remove this step after adding JoinStep.
class AddingDelayedStreamSource : public ITransformingStep
{
public:
AddingDelayedStreamStep(
AddingDelayedStreamSource(
const DataStream & input_stream_,
ProcessorPtr source_);
String getName() const override { return "AddingDelayedStream"; }
String getName() const override { return "AddingDelayedSource"; }
void transformPipeline(QueryPipeline & pipeline) override;

View File

@@ -11,7 +11,9 @@ static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = false /// Actually, we may check that distinct names are in aggregation keys
.preserves_distinct_columns = false, /// Actually, we may check that distinct names are in aggregation keys
.returns_single_stream = true,
.preserves_number_of_streams = false,
};
}

View File

@@ -10,6 +10,7 @@ namespace DB
struct AggregatingTransformParams;
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
/// Aggregation. See AggregatingTransform.
class AggregatingStep : public ITransformingStep
{
public:

View File

@@ -9,33 +9,17 @@ static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
};
}
static void filterDistinctColumns(const Block & res_header, NameSet & distinct_columns)
{
if (distinct_columns.empty())
return;
NameSet new_distinct_columns;
for (const auto & column : res_header)
if (distinct_columns.count(column.name))
new_distinct_columns.insert(column.name);
distinct_columns.swap(new_distinct_columns);
}
ConvertingStep::ConvertingStep(const DataStream & input_stream_, Block result_header_)
: ITransformingStep(
input_stream_,
result_header_,
getTraits())
, result_header(std::move(result_header_))
: ITransformingStep(input_stream_, result_header_, getTraits())
, result_header(std::move(result_header_))
{
/// Some columns may be removed
filterDistinctColumns(output_stream->header, output_stream->distinct_columns);
filterDistinctColumns(output_stream->header, output_stream->local_distinct_columns);
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
}
void ConvertingStep::transformPipeline(QueryPipeline & pipeline)

View File

@@ -4,6 +4,7 @@
namespace DB
{
/// Convert one block structure to another. See ConvertingTransform.
class ConvertingStep : public ITransformingStep
{
public:

View File

@@ -9,7 +9,9 @@ static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
};
}

View File

@@ -6,6 +6,7 @@
namespace DB
{
/// Creates sets for subqueries and JOIN. See CreatingSetsTransform.
class CreatingSetsStep : public ITransformingStep
{
public:

View File

@@ -9,7 +9,9 @@ static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = false
.preserves_distinct_columns = false,
.returns_single_stream = true,
.preserves_number_of_streams = false,
};
}

View File

@@ -8,6 +8,7 @@ namespace DB
struct AggregatingTransformParams;
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
/// WITH CUBE. See CubeTransform.
class CubeStep : public ITransformingStep
{
public:

View File

@@ -5,35 +5,6 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
};
}
DistinctStep::DistinctStep(
const DataStream & input_stream_,
const SizeLimits & set_size_limits_,
UInt64 limit_hint_,
const Names & columns_,
bool pre_distinct_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, set_size_limits(set_size_limits_)
, limit_hint(limit_hint_)
, columns(columns_)
, pre_distinct(pre_distinct_)
{
auto & distinct_columns = pre_distinct ? output_stream->local_distinct_columns
: output_stream->distinct_columns;
/// Add more distinct columns.
for (const auto & name : columns)
distinct_columns.insert(name);
}
static bool checkColumnsAlreadyDistinct(const Names & columns, const NameSet & distinct_names)
{
bool columns_already_distinct = true;
@@ -44,15 +15,47 @@ static bool checkColumnsAlreadyDistinct(const Names & columns, const NameSet & d
return columns_already_distinct;
}
static ITransformingStep::DataStreamTraits getTraits(bool pre_distinct, bool already_distinct_columns)
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = already_distinct_columns, /// Will be calculated separately otherwise
.returns_single_stream = !pre_distinct && !already_distinct_columns,
.preserves_number_of_streams = pre_distinct || already_distinct_columns,
};
}
DistinctStep::DistinctStep(
const DataStream & input_stream_,
const SizeLimits & set_size_limits_,
UInt64 limit_hint_,
const Names & columns_,
bool pre_distinct_)
: ITransformingStep(
input_stream_,
input_stream_.header,
getTraits(pre_distinct_, checkColumnsAlreadyDistinct(columns_, input_stream_.distinct_columns)))
, set_size_limits(set_size_limits_)
, limit_hint(limit_hint_)
, columns(columns_)
, pre_distinct(pre_distinct_)
{
if (!output_stream->distinct_columns.empty() /// Columns already distinct, do nothing
&& (!pre_distinct /// Main distinct
|| input_stream_.has_single_port)) /// pre_distinct for single port works as usual one
{
/// Build distinct set.
for (const auto & name : columns)
output_stream->distinct_columns.insert(name);
}
}
void DistinctStep::transformPipeline(QueryPipeline & pipeline)
{
if (checkColumnsAlreadyDistinct(columns, input_streams.front().distinct_columns))
return;
if ((pre_distinct || pipeline.getNumStreams() <= 1)
&& checkColumnsAlreadyDistinct(columns, input_streams.front().local_distinct_columns))
return;
if (!pre_distinct)
pipeline.resize(1);
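
For readability, here is the same getTraits overload as above with each field annotated; the annotations are my reading of the constructor and of transformPipeline, not text from the commit:

static ITransformingStep::DataStreamTraits getTraits(bool pre_distinct, bool already_distinct_columns)
{
    return ITransformingStep::DataStreamTraits
    {
        /// A no-op DISTINCT keeps the input set valid as-is; otherwise the
        /// constructor rebuilds distinct_columns by hand.
        .preserves_distinct_columns = already_distinct_columns,
        /// The final DISTINCT merges all ports via pipeline.resize(1),
        /// unless it turns out to be a no-op.
        .returns_single_stream = !pre_distinct && !already_distinct_columns,
        /// pre_distinct runs per stream; a no-op leaves the ports untouched too.
        .preserves_number_of_streams = pre_distinct || already_distinct_columns,
    };
}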

View File

@@ -5,6 +5,7 @@
namespace DB
{
/// Execute DISTINCT for specified columns.
class DistinctStep : public ITransformingStep
{
public:

View File

@@ -11,23 +11,12 @@ static ITransformingStep::DataStreamTraits getTraits(const ExpressionActionsPtr
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = !expression->hasJoinOrArrayJoin()
.preserves_distinct_columns = !expression->hasJoinOrArrayJoin(),
.returns_single_stream = false,
.preserves_number_of_streams = true,
};
}
static void filterDistinctColumns(const Block & res_header, NameSet & distinct_columns)
{
if (distinct_columns.empty())
return;
NameSet new_distinct_columns;
for (const auto & column : res_header)
if (distinct_columns.count(column.name))
new_distinct_columns.insert(column.name);
distinct_columns.swap(new_distinct_columns);
}
ExpressionStep::ExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_, bool default_totals_)
: ITransformingStep(
input_stream_,
@@ -37,9 +26,7 @@ ExpressionStep::ExpressionStep(const DataStream & input_stream_, ExpressionActio
, default_totals(default_totals_)
{
/// Some columns may be removed by expression.
/// TODO: also check aliases, functions and some types of join
filterDistinctColumns(output_stream->header, output_stream->distinct_columns);
filterDistinctColumns(output_stream->header, output_stream->local_distinct_columns);
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
}
void ExpressionStep::transformPipeline(QueryPipeline & pipeline)
@@ -67,8 +54,7 @@ InflatingExpressionStep::InflatingExpressionStep(const DataStream & input_stream
, expression(std::move(expression_))
, default_totals(default_totals_)
{
filterDistinctColumns(output_stream->header, output_stream->distinct_columns);
filterDistinctColumns(output_stream->header, output_stream->local_distinct_columns);
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
}
void InflatingExpressionStep::transformPipeline(QueryPipeline & pipeline)

View File

@@ -7,6 +7,7 @@ namespace DB
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/// Calculates specified expression. See ExpressionTransform.
class ExpressionStep : public ITransformingStep
{
public:
@@ -25,7 +26,7 @@ class InflatingExpressionStep : public ITransformingStep
{
public:
explicit InflatingExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_, bool default_totals_ = false);
String getName() const override { return "Expression"; }
String getName() const override { return "InflatingExpression"; }
void transformPipeline(QueryPipeline & pipeline) override;

View File

@@ -8,7 +8,9 @@ static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
};
}

View File

@@ -3,10 +3,11 @@
namespace DB
{
/// Calculate extremes. Add special port for extremes.
class ExtremesStep : public ITransformingStep
{
public:
ExtremesStep(const DataStream & input_stream_);
explicit ExtremesStep(const DataStream & input_stream_);
String getName() const override { return "Extremes"; }

View File

@@ -9,7 +9,9 @@ static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = false /// TODO: it seems to actually be true. Check it later.
.preserves_distinct_columns = false, /// TODO: it seems to actually be true. Check it later.
.returns_single_stream = true,
.preserves_number_of_streams = true,
};
}
@@ -17,6 +19,8 @@ FillingStep::FillingStep(const DataStream & input_stream_, SortDescription sort_
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
, sort_description(std::move(sort_description_))
{
if (!input_stream_.has_single_port)
throw Exception("FillingStep expects single input", ErrorCodes::LOGICAL_ERROR);
}
void FillingStep::transformPipeline(QueryPipeline & pipeline)

View File

@@ -5,6 +5,7 @@
namespace DB
{
/// Implements modifier WITH FILL of ORDER BY clause. See FillingTransform.
class FillingStep : public ITransformingStep
{
public:

View File

@@ -10,24 +10,12 @@ static ITransformingStep::DataStreamTraits getTraits(const ExpressionActionsPtr
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = !expression->hasJoinOrArrayJoin() /// I suppose it actually never happens
.preserves_distinct_columns = !expression->hasJoinOrArrayJoin(), /// I suppose it actually never happens
.returns_single_stream = false,
.preserves_number_of_streams = true,
};
}
static void filterDistinctColumns(const Block & res_header, NameSet & distinct_columns)
{
if (distinct_columns.empty())
return;
NameSet new_distinct_columns;
for (const auto & column : res_header)
if (distinct_columns.count(column.name))
new_distinct_columns.insert(column.name);
distinct_columns.swap(new_distinct_columns);
}
FilterStep::FilterStep(
const DataStream & input_stream_,
ExpressionActionsPtr expression_,
@@ -42,8 +30,7 @@ FilterStep::FilterStep(
, remove_filter_column(remove_filter_column_)
{
/// TODO: it would be easier to remove all expressions from filter step. It should only filter by column name.
filterDistinctColumns(output_stream->header, output_stream->distinct_columns);
filterDistinctColumns(output_stream->header, output_stream->local_distinct_columns);
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
}
void FilterStep::transformPipeline(QueryPipeline & pipeline)

View File

@@ -7,6 +7,7 @@ namespace DB
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/// Implements WHERE, HAVING operations. See FilterTransform.
class FilterStep : public ITransformingStep
{
public:

View File

@@ -12,7 +12,9 @@ static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
};
}
@@ -28,9 +30,6 @@ FinishSortingStep::FinishSortingStep(
, max_block_size(max_block_size_)
, limit(limit_)
{
/// Streams are merged together, only global distinct keys remain distinct.
/// Note: we can not clear it if know that there will be only one stream in pipeline. Should we add info about it?
output_stream->local_distinct_columns.clear();
}
void FinishSortingStep::transformPipeline(QueryPipeline & pipeline)

View File

@@ -5,6 +5,7 @@
namespace DB
{
/// Finish sorting of pre-sorted data. See FinishSortingTransform.
class FinishSortingStep : public ITransformingStep
{
public:

View File

@@ -9,13 +9,19 @@ using QueryPipelinePtr = std::unique_ptr<QueryPipeline>;
using QueryPipelines = std::vector<QueryPipelinePtr>;
/// Description of data stream.
/// Single logical data stream may relate to many ports of pipeline.
class DataStream
{
public:
Block header;
/// Tuples with those columns are distinct.
/// It doesn't mean that columns are distinct separately.
/// Removing any column from this list breaks this invariant.
NameSet distinct_columns = {};
NameSet local_distinct_columns = {}; /// Those columns are distinct in separate thread, but not in general.
/// QueryPipeline has single port. Totals or extremes ports are not counted.
bool has_single_port = false;
/// Things which may be added:
/// * sort description
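
A concrete reading of the distinct_columns invariant above, with made-up data:

/// distinct_columns = {a, b} promises uniqueness of the (a, b) tuple only:
///
///   a | b
///   1 | 1
///   1 | 2   <- allowed: (1, 2) != (1, 1), although column a repeats
///
/// Keeping {a} after dropping b would wrongly claim that column a alone
/// has no duplicates, which the data above violates; hence the set must
/// be kept or dropped as a whole.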

View File

@@ -4,6 +4,7 @@
namespace DB
{
/// Step which takes empty pipeline and initializes it. Returns single logical DataStream.
class ISourceStep : public IQueryPlanStep
{
public:

View File

@@ -6,14 +6,17 @@ namespace DB
ITransformingStep::ITransformingStep(DataStream input_stream, Block output_header, DataStreamTraits traits)
{
input_streams.emplace_back(std::move(input_stream));
output_stream = DataStream{.header = std::move(output_header)};
if (traits.preserves_distinct_columns)
{
output_stream->distinct_columns = input_streams.front().distinct_columns;
output_stream->local_distinct_columns = input_streams.front().local_distinct_columns;
}
output_stream->has_single_port = traits.returns_single_stream
|| (input_stream.has_single_port && traits.preserves_number_of_streams);
input_streams.emplace_back(std::move(input_stream));
}
QueryPipelinePtr ITransformingStep::updatePipeline(QueryPipelines pipelines)
@@ -22,4 +25,19 @@ QueryPipelinePtr ITransformingStep::updatePipeline(QueryPipelines pipelines)
return std::move(pipelines.front());
}
void ITransformingStep::updateDistinctColumns(const Block & res_header, NameSet & distinct_columns)
{
if (distinct_columns.empty())
return;
for (const auto & column : res_header)
{
if (distinct_columns.count(column.name) == 0)
{
distinct_columns.clear();
break;
}
}
}
}
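
And a standalone model of the new updateDistinctColumns helper, with DB::Block and NameSet swapped for std types so the sketch compiles on its own; the loop mirrors the one above:

#include <cassert>
#include <set>
#include <string>
#include <vector>

using NameSet = std::set<std::string>;
using HeaderNames = std::vector<std::string>; /// stand-in for Block

static void updateDistinctColumns(const HeaderNames & res_header, NameSet & distinct_columns)
{
    if (distinct_columns.empty())
        return;
    /// All-or-nothing: the set is dropped wholesale as soon as the result
    /// header carries a column outside it; it is never filtered per column
    /// the way the removed filterDistinctColumns did.
    for (const auto & column : res_header)
    {
        if (distinct_columns.count(column) == 0)
        {
            distinct_columns.clear();
            break;
        }
    }
}

int main()
{
    NameSet distinct{"a", "b"};
    updateDistinctColumns({"a", "b"}, distinct);
    assert(distinct.size() == 2); /// header matches the set: kept

    updateDistinctColumns({"a", "b", "c"}, distinct);
    assert(distinct.empty()); /// extra column "c": set cleared
}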

View File

@@ -4,12 +4,25 @@
namespace DB
{
/// Step which has single input and single output data stream.
/// It doesn't mean that pipeline has single port before or after such step.
class ITransformingStep : public IQueryPlanStep
{
public:
struct DataStreamTraits
{
/// Keep distinct_columns unchanged.
/// Examples: true for LimitStep, false for ExpressionStep with ARRAY JOIN
/// If some columns may be removed from the result header, call updateDistinctColumns.
bool preserves_distinct_columns;
/// True if pipeline has single output port after this step.
/// Examples: MergeSortingStep, AggregatingStep
bool returns_single_stream;
/// Won't change the number of ports for pipeline.
/// Examples: true for ExpressionStep, false for MergeSortingStep
bool preserves_number_of_streams;
};
ITransformingStep(DataStream input_stream, Block output_header, DataStreamTraits traits);
@@ -17,6 +30,10 @@ public:
QueryPipelinePtr updatePipeline(QueryPipelines pipelines) override;
virtual void transformPipeline(QueryPipeline & pipeline) = 0;
protected:
/// Clear distinct_columns if res_header doesn't contain all of them.
static void updateDistinctColumns(const Block & res_header, NameSet & distinct_columns);
};
}

View File

@@ -9,7 +9,9 @@ static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
};
}

View File

@@ -4,6 +4,7 @@
namespace DB
{
/// Executes LIMIT BY for specified columns. See LimitByTransform.
class LimitByStep : public ITransformingStep
{
public:

View File

@@ -9,7 +9,9 @@ static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
};
}
@@ -24,7 +26,6 @@ LimitStep::LimitStep(
, always_read_till_end(always_read_till_end_)
, with_ties(with_ties_), description(std::move(description_))
{
}
void LimitStep::transformPipeline(QueryPipeline & pipeline)

View File

@@ -6,14 +6,15 @@
namespace DB
{
/// Executes LIMIT. See LimitTransform.
class LimitStep : public ITransformingStep
{
public:
LimitStep(
const DataStream & input_stream_,
size_t limit_, size_t offset_,
bool always_read_till_end_ = false,
bool with_ties_ = false,
bool always_read_till_end_ = false, /// Read all data even if limit is reached. Needed for totals.
bool with_ties_ = false, /// Limit with ties.
SortDescription description_ = {});
String getName() const override { return "Limit"; }

View File

@@ -9,7 +9,9 @@ static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
};
}

View File

@@ -7,6 +7,7 @@
namespace DB
{
/// Sorts stream of data. See MergeSortingTransform.
class MergeSortingStep : public ITransformingStep
{
public:

View File

@@ -11,7 +11,9 @@ static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = false
.preserves_distinct_columns = false,
.returns_single_stream = true,
.preserves_number_of_streams = false,
};
}
@@ -36,7 +38,7 @@ void MergingAggregatedStep::transformPipeline(QueryPipeline & pipeline)
{
if (!memory_efficient_aggregation)
{
/// We union several sources into one, parallelizing the work.
/// We union several sources into one, paralleling the work.
pipeline.resize(1);
/// Now merge the aggregated blocks

View File

@@ -8,6 +8,7 @@ namespace DB
struct AggregatingTransformParams;
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
/// This step finishes aggregation. See AggregatingSortedTransform.
class MergingAggregatedStep : public ITransformingStep
{
public:

View File

@@ -9,7 +9,9 @@ static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
};
}
@@ -23,9 +25,6 @@ MergingSortedStep::MergingSortedStep(
, max_block_size(max_block_size_)
, limit(limit_)
{
/// Streams are merged together, only global distinct keys remain distinct.
/// Note: we can not clear it if know that there will be only one stream in pipeline. Should we add info about it?
output_stream->local_distinct_columns.clear();
}
void MergingSortedStep::transformPipeline(QueryPipeline & pipeline)

View File

@@ -7,6 +7,7 @@
namespace DB
{
/// Merge streams of data into single sorted stream.
class MergingSortedStep : public ITransformingStep
{
public:

View File

@@ -9,7 +9,9 @@ static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
};
}
@@ -21,13 +23,10 @@ OffsetsStep::OffsetsStep(const DataStream & input_stream_, size_t offset_)
void OffsetsStep::transformPipeline(QueryPipeline & pipeline)
{
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr
{
if (stream_type != QueryPipeline::StreamType::Main)
return nullptr;
auto transform = std::make_shared<OffsetTransform>(
pipeline.getHeader(), offset, pipeline.getNumStreams());
return std::make_shared<OffsetTransform>(header, offset, 1);
});
pipeline.addPipe({std::move(transform)});
}
}

View File

@@ -5,6 +5,7 @@
namespace DB
{
/// Executes OFFSET (without LIMIT). See OffsetTransform.
class OffsetsStep : public ITransformingStep
{
public:

View File

@@ -10,7 +10,9 @@ static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
};
}

View File

@@ -6,6 +6,7 @@
namespace DB
{
/// Sort separate chunks of data.
class PartialSortingStep : public ITransformingStep
{
public:

View File

@@ -17,6 +17,8 @@ using QueryPipelinePtr = std::unique_ptr<QueryPipeline>;
class Context;
/// A tree of query steps.
/// The goal of QueryPlan is to build QueryPipeline.
/// QueryPlan lets us delay pipeline creation, which is helpful for pipeline-level optimisations.
class QueryPlan
{
public:
@@ -38,6 +40,7 @@ public:
void addInterpreterContext(std::shared_ptr<Context> context);
private:
/// Tree node. Step and its children.
struct Node
{
QueryPlanStepPtr step;
@@ -52,8 +55,8 @@ private:
void checkInitialized() const;
void checkNotCompleted() const;
/// Those fields are passed to QueryPipeline.
size_t max_threads = 0;
std::vector<std::shared_ptr<Context>> interpreter_context;
};

View File

@@ -5,7 +5,7 @@ namespace DB
{
ReadFromPreparedSource::ReadFromPreparedSource(Pipe pipe_, std::shared_ptr<Context> context_)
: ISourceStep(DataStream{.header = pipe_.getHeader()})
: ISourceStep(DataStream{.header = pipe_.getHeader(), .has_single_port = true})
, pipe(std::move(pipe_))
, context(std::move(context_))
{

View File

@@ -5,6 +5,7 @@
namespace DB
{
/// Create source from prepared pipe.
class ReadFromPreparedSource : public ISourceStep
{
public:

View File

@@ -122,7 +122,7 @@ ReadFromStorageStep::ReadFromStorageStep(
pipeline->addInterpreterContext(std::move(context));
pipeline->addStorageHolder(std::move(storage));
output_stream = DataStream{.header = pipeline->getHeader()};
output_stream = DataStream{.header = pipeline->getHeader(), .has_single_port = pipeline->getNumStreams() == 1};
}
ReadFromStorageStep::~ReadFromStorageStep() = default;

View File

@@ -6,7 +6,7 @@ namespace DB
{
ReadNothingStep::ReadNothingStep(Block output_header)
: ISourceStep(DataStream{.header = std::move(output_header)})
: ISourceStep(DataStream{.header = std::move(output_header), .has_single_port = true})
{
}

View File

@@ -4,6 +4,7 @@
namespace DB
{
/// Create NullSource with specified structure.
class ReadNothingStep : public ISourceStep
{
public:

View File

@@ -9,7 +9,9 @@ static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = false
.preserves_distinct_columns = false,
.returns_single_stream = true,
.preserves_number_of_streams = false,
};
}

View File

@@ -8,6 +8,7 @@ namespace DB
struct AggregatingTransformParams;
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
/// WITH ROLLUP. See RollupTransform.
class RollupStep : public ITransformingStep
{
public:

View File

@@ -10,7 +10,9 @@ static ITransformingStep::DataStreamTraits getTraits()
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
};
}

View File

@@ -9,6 +9,7 @@ using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
enum class TotalsMode;
/// Execute HAVING and calculate totals. See TotalsHavingTransform.
class TotalsHavingStep : public ITransformingStep
{
public:

View File

@@ -12,8 +12,10 @@ UnionStep::UnionStep(DataStreams input_streams_, Block result_header, size_t max
{
input_streams = std::move(input_streams_);
/// TODO: update traits
output_stream = DataStream{.header = header};
if (input_streams.size() == 1)
output_stream = input_streams.front();
else
output_stream = DataStream{.header = header};
}
QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines)

View File

@@ -4,6 +4,7 @@
namespace DB
{
/// Unite several logical streams of data into single logical stream with specified structure.
class UnionStep : public IQueryPlanStep
{
public:

View File

@@ -7,6 +7,11 @@ namespace DB
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/** Executes a certain expression over the block.
* The expression consists of column identifiers from the block, constants, common functions.
* For example: hits * 2 + 3, url LIKE '%yandex%'
* The expression processes each row independently of the others.
*/
class ExpressionTransform : public ISimpleTransform
{
public:

View File

@@ -8,8 +8,9 @@ namespace DB
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/** Has one input and one output.
* Simply pull a block from input, transform it, and push it to output.
/** Implements WHERE, HAVING operations.
* Takes an expression, which adds to the block one ColumnUInt8 column containing the filtering conditions.
* The expression is evaluated and result chunks contain only the filtered rows.
* If remove_filter_column is true, remove filter column from block.
*/
class FilterTransform : public ISimpleTransform

View File

@@ -6,6 +6,7 @@
namespace DB
{
/// Executes LIMIT BY for specified columns.
class LimitByTransform : public ISimpleTransform
{
public:

View File

@@ -12,6 +12,8 @@ namespace DB
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
/// Takes sorted separate chunks of data. Sorts them.
/// Returns stream with globally sorted data.
class MergeSortingTransform : public SortingTransform
{
public:

View File

@@ -138,7 +138,7 @@ SRCS(
Transforms/SortingTransform.cpp
Transforms/TotalsHavingTransform.cpp
Transforms/AggregatingInOrderTransform.cpp
QueryPlan/AddingDelayedStreamStep.cpp
QueryPlan/AddingDelayedStreamSource.cpp
QueryPlan/AggregatingStep.cpp
QueryPlan/ConvertingStep.cpp
QueryPlan/CreatingSetsStep.cpp