From d7d334bf6f046788da3aaf669c4c358724c9b3d0 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Mon, 22 Jun 2020 13:18:28 +0300 Subject: [PATCH] Update query plan traits for DISTINCT. Add more comments. --- src/Interpreters/InterpreterSelectQuery.cpp | 4 +- src/Processors/LimitTransform.h | 2 +- src/Processors/OffsetTransform.h | 2 +- ...Step.cpp => AddingDelayedStreamSource.cpp} | 10 +-- ...reamStep.h => AddingDelayedStreamSource.h} | 8 ++- src/Processors/QueryPlan/AggregatingStep.cpp | 4 +- src/Processors/QueryPlan/AggregatingStep.h | 1 + src/Processors/QueryPlan/ConvertingStep.cpp | 28 ++------ src/Processors/QueryPlan/ConvertingStep.h | 1 + src/Processors/QueryPlan/CreatingSetsStep.cpp | 4 +- src/Processors/QueryPlan/CreatingSetsStep.h | 1 + src/Processors/QueryPlan/CubeStep.cpp | 4 +- src/Processors/QueryPlan/CubeStep.h | 1 + src/Processors/QueryPlan/DistinctStep.cpp | 69 ++++++++++--------- src/Processors/QueryPlan/DistinctStep.h | 1 + src/Processors/QueryPlan/ExpressionStep.cpp | 24 ++----- src/Processors/QueryPlan/ExpressionStep.h | 3 +- src/Processors/QueryPlan/ExtremesStep.cpp | 4 +- src/Processors/QueryPlan/ExtremesStep.h | 3 +- src/Processors/QueryPlan/FillingStep.cpp | 6 +- src/Processors/QueryPlan/FillingStep.h | 1 + src/Processors/QueryPlan/FilterStep.cpp | 21 ++---- src/Processors/QueryPlan/FilterStep.h | 1 + .../QueryPlan/FinishSortingStep.cpp | 7 +- src/Processors/QueryPlan/FinishSortingStep.h | 1 + src/Processors/QueryPlan/IQueryPlanStep.h | 8 ++- src/Processors/QueryPlan/ISourceStep.h | 1 + .../QueryPlan/ITransformingStep.cpp | 22 +++++- src/Processors/QueryPlan/ITransformingStep.h | 17 +++++ src/Processors/QueryPlan/LimitByStep.cpp | 4 +- src/Processors/QueryPlan/LimitByStep.h | 1 + src/Processors/QueryPlan/LimitStep.cpp | 5 +- src/Processors/QueryPlan/LimitStep.h | 5 +- src/Processors/QueryPlan/MergeSortingStep.cpp | 4 +- src/Processors/QueryPlan/MergeSortingStep.h | 1 + .../QueryPlan/MergingAggregatedStep.cpp | 6 +- .../QueryPlan/MergingAggregatedStep.h | 1 + .../QueryPlan/MergingSortedStep.cpp | 7 +- src/Processors/QueryPlan/MergingSortedStep.h | 1 + src/Processors/QueryPlan/OffsetsStep.cpp | 13 ++-- src/Processors/QueryPlan/OffsetsStep.h | 1 + .../QueryPlan/PartialSortingStep.cpp | 4 +- src/Processors/QueryPlan/PartialSortingStep.h | 1 + src/Processors/QueryPlan/QueryPlan.h | 5 +- .../QueryPlan/ReadFromPreparedSource.cpp | 2 +- .../QueryPlan/ReadFromPreparedSource.h | 1 + .../QueryPlan/ReadFromStorageStep.cpp | 2 +- src/Processors/QueryPlan/ReadNothingStep.cpp | 2 +- src/Processors/QueryPlan/ReadNothingStep.h | 1 + src/Processors/QueryPlan/RollupStep.cpp | 4 +- src/Processors/QueryPlan/RollupStep.h | 1 + src/Processors/QueryPlan/TotalsHavingStep.cpp | 4 +- src/Processors/QueryPlan/TotalsHavingStep.h | 1 + src/Processors/QueryPlan/UnionStep.cpp | 6 +- src/Processors/QueryPlan/UnionStep.h | 1 + .../Transforms/ExpressionTransform.h | 5 ++ src/Processors/Transforms/FilterTransform.h | 5 +- src/Processors/Transforms/LimitByTransform.h | 1 + .../Transforms/MergeSortingTransform.h | 2 + src/Processors/ya.make | 2 +- 60 files changed, 211 insertions(+), 147 deletions(-) rename src/Processors/QueryPlan/{AddingDelayedStreamStep.cpp => AddingDelayedStreamSource.cpp} (55%) rename src/Processors/QueryPlan/{AddingDelayedStreamStep.h => AddingDelayedStreamSource.h} (50%) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 042ab9e093c..88056d5cf49 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -45,7 +45,7 @@ #include #include #include -#include +#include #include #include #include @@ -891,7 +891,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu if (auto stream = join->createStreamWithNonJoinedRows(join_result_sample, settings.max_block_size)) { auto source = std::make_shared(std::move(stream)); - auto add_non_joined_rows_step = std::make_unique( + auto add_non_joined_rows_step = std::make_unique( query_plan.getCurrentDataStream(), std::move(source)); add_non_joined_rows_step->setStepDescription("Add non-joined rows after JOIN"); diff --git a/src/Processors/LimitTransform.h b/src/Processors/LimitTransform.h index a6989483c00..ffa151bc064 100644 --- a/src/Processors/LimitTransform.h +++ b/src/Processors/LimitTransform.h @@ -33,7 +33,7 @@ private: RowsBeforeLimitCounterPtr rows_before_limit_at_least; /// State of port's pair. - /// Chunks from different port pairs are not mixed for berret cache locality. + /// Chunks from different port pairs are not mixed for better cache locality. struct PortsData { Chunk current_chunk; diff --git a/src/Processors/OffsetTransform.h b/src/Processors/OffsetTransform.h index 3fee4e791a5..905e8298d15 100644 --- a/src/Processors/OffsetTransform.h +++ b/src/Processors/OffsetTransform.h @@ -20,7 +20,7 @@ private: RowsBeforeLimitCounterPtr rows_before_limit_at_least; /// State of port's pair. - /// Chunks from different port pairs are not mixed for berret cache locality. + /// Chunks from different port pairs are not mixed for better cache locality. struct PortsData { Chunk current_chunk; diff --git a/src/Processors/QueryPlan/AddingDelayedStreamStep.cpp b/src/Processors/QueryPlan/AddingDelayedStreamSource.cpp similarity index 55% rename from src/Processors/QueryPlan/AddingDelayedStreamStep.cpp rename to src/Processors/QueryPlan/AddingDelayedStreamSource.cpp index 522094cb790..b55bd265083 100644 --- a/src/Processors/QueryPlan/AddingDelayedStreamStep.cpp +++ b/src/Processors/QueryPlan/AddingDelayedStreamSource.cpp @@ -1,4 +1,4 @@ -#include +#include #include namespace DB @@ -8,11 +8,13 @@ static ITransformingStep::DataStreamTraits getTraits() { return ITransformingStep::DataStreamTraits { - .preserves_distinct_columns = false + .preserves_distinct_columns = false, + .returns_single_stream = false, + .preserves_number_of_streams = false, }; } -AddingDelayedStreamStep::AddingDelayedStreamStep( +AddingDelayedStreamSource::AddingDelayedStreamSource( const DataStream & input_stream_, ProcessorPtr source_) : ITransformingStep(input_stream_, input_stream_.header, getTraits()) @@ -20,7 +22,7 @@ AddingDelayedStreamStep::AddingDelayedStreamStep( { } -void AddingDelayedStreamStep::transformPipeline(QueryPipeline & pipeline) +void AddingDelayedStreamSource::transformPipeline(QueryPipeline & pipeline) { pipeline.addDelayedStream(source); } diff --git a/src/Processors/QueryPlan/AddingDelayedStreamStep.h b/src/Processors/QueryPlan/AddingDelayedStreamSource.h similarity index 50% rename from src/Processors/QueryPlan/AddingDelayedStreamStep.h rename to src/Processors/QueryPlan/AddingDelayedStreamSource.h index 4bf2c62c905..8b2d6673ab3 100644 --- a/src/Processors/QueryPlan/AddingDelayedStreamStep.h +++ b/src/Processors/QueryPlan/AddingDelayedStreamSource.h @@ -8,14 +8,16 @@ namespace DB class IProcessor; using ProcessorPtr = std::shared_ptr; -class AddingDelayedStreamStep : public ITransformingStep +/// Adds another source to pipeline. Data from this source will be read after data from all other sources. +/// NOTE: tis step is needed because of non-joined data from JOIN. Remove this step after adding JoinStep. +class AddingDelayedStreamSource : public ITransformingStep { public: - AddingDelayedStreamStep( + AddingDelayedStreamSource( const DataStream & input_stream_, ProcessorPtr source_); - String getName() const override { return "AddingDelayedStream"; } + String getName() const override { return "AddingDelayedSource"; } void transformPipeline(QueryPipeline & pipeline) override; diff --git a/src/Processors/QueryPlan/AggregatingStep.cpp b/src/Processors/QueryPlan/AggregatingStep.cpp index 47cb444654c..cc6e1833a72 100644 --- a/src/Processors/QueryPlan/AggregatingStep.cpp +++ b/src/Processors/QueryPlan/AggregatingStep.cpp @@ -11,7 +11,9 @@ static ITransformingStep::DataStreamTraits getTraits() { return ITransformingStep::DataStreamTraits { - .preserves_distinct_columns = false /// Actually, we may check that distinct names are in aggregation keys + .preserves_distinct_columns = false, /// Actually, we may check that distinct names are in aggregation keys + .returns_single_stream = true, + .preserves_number_of_streams = false, }; } diff --git a/src/Processors/QueryPlan/AggregatingStep.h b/src/Processors/QueryPlan/AggregatingStep.h index df75eee1b3f..e1be5ff0d34 100644 --- a/src/Processors/QueryPlan/AggregatingStep.h +++ b/src/Processors/QueryPlan/AggregatingStep.h @@ -10,6 +10,7 @@ namespace DB struct AggregatingTransformParams; using AggregatingTransformParamsPtr = std::shared_ptr; +/// Aggregation. See AggregatingTransform. class AggregatingStep : public ITransformingStep { public: diff --git a/src/Processors/QueryPlan/ConvertingStep.cpp b/src/Processors/QueryPlan/ConvertingStep.cpp index 99287c50b90..1c4c3529e80 100644 --- a/src/Processors/QueryPlan/ConvertingStep.cpp +++ b/src/Processors/QueryPlan/ConvertingStep.cpp @@ -9,33 +9,17 @@ static ITransformingStep::DataStreamTraits getTraits() { return ITransformingStep::DataStreamTraits { - .preserves_distinct_columns = true + .preserves_distinct_columns = true, + .returns_single_stream = false, + .preserves_number_of_streams = true, }; } -static void filterDistinctColumns(const Block & res_header, NameSet & distinct_columns) -{ - if (distinct_columns.empty()) - return; - - NameSet new_distinct_columns; - for (const auto & column : res_header) - if (distinct_columns.count(column.name)) - new_distinct_columns.insert(column.name); - - distinct_columns.swap(new_distinct_columns); -} - ConvertingStep::ConvertingStep(const DataStream & input_stream_, Block result_header_) - : ITransformingStep( - input_stream_, - result_header_, - getTraits()) - , result_header(std::move(result_header_)) + : ITransformingStep(input_stream_, result_header_, getTraits()) + , result_header(std::move(result_header_)) { - /// Some columns may be removed - filterDistinctColumns(output_stream->header, output_stream->distinct_columns); - filterDistinctColumns(output_stream->header, output_stream->local_distinct_columns); + updateDistinctColumns(output_stream->header, output_stream->distinct_columns); } void ConvertingStep::transformPipeline(QueryPipeline & pipeline) diff --git a/src/Processors/QueryPlan/ConvertingStep.h b/src/Processors/QueryPlan/ConvertingStep.h index 540deece246..cb53dfcbd5f 100644 --- a/src/Processors/QueryPlan/ConvertingStep.h +++ b/src/Processors/QueryPlan/ConvertingStep.h @@ -4,6 +4,7 @@ namespace DB { +/// Convert one block structure to another. See ConvertingTransform. class ConvertingStep : public ITransformingStep { public: diff --git a/src/Processors/QueryPlan/CreatingSetsStep.cpp b/src/Processors/QueryPlan/CreatingSetsStep.cpp index 4480fd53f32..e402318ad70 100644 --- a/src/Processors/QueryPlan/CreatingSetsStep.cpp +++ b/src/Processors/QueryPlan/CreatingSetsStep.cpp @@ -9,7 +9,9 @@ static ITransformingStep::DataStreamTraits getTraits() { return ITransformingStep::DataStreamTraits { - .preserves_distinct_columns = true + .preserves_distinct_columns = true, + .returns_single_stream = false, + .preserves_number_of_streams = true, }; } diff --git a/src/Processors/QueryPlan/CreatingSetsStep.h b/src/Processors/QueryPlan/CreatingSetsStep.h index d3c4db30502..e749b79c9a5 100644 --- a/src/Processors/QueryPlan/CreatingSetsStep.h +++ b/src/Processors/QueryPlan/CreatingSetsStep.h @@ -6,6 +6,7 @@ namespace DB { +/// Creates sets for subqueries and JOIN. See CreatingSetsTransform. class CreatingSetsStep : public ITransformingStep { public: diff --git a/src/Processors/QueryPlan/CubeStep.cpp b/src/Processors/QueryPlan/CubeStep.cpp index 1e95a608655..b9f826a5ac7 100644 --- a/src/Processors/QueryPlan/CubeStep.cpp +++ b/src/Processors/QueryPlan/CubeStep.cpp @@ -9,7 +9,9 @@ static ITransformingStep::DataStreamTraits getTraits() { return ITransformingStep::DataStreamTraits { - .preserves_distinct_columns = false + .preserves_distinct_columns = false, + .returns_single_stream = true, + .preserves_number_of_streams = false, }; } diff --git a/src/Processors/QueryPlan/CubeStep.h b/src/Processors/QueryPlan/CubeStep.h index c04f6a4f854..707f62ce7d6 100644 --- a/src/Processors/QueryPlan/CubeStep.h +++ b/src/Processors/QueryPlan/CubeStep.h @@ -8,6 +8,7 @@ namespace DB struct AggregatingTransformParams; using AggregatingTransformParamsPtr = std::shared_ptr; +/// WITH CUBE. See CubeTransform. class CubeStep : public ITransformingStep { public: diff --git a/src/Processors/QueryPlan/DistinctStep.cpp b/src/Processors/QueryPlan/DistinctStep.cpp index 0e1cab637fa..be810a5f5cd 100644 --- a/src/Processors/QueryPlan/DistinctStep.cpp +++ b/src/Processors/QueryPlan/DistinctStep.cpp @@ -5,35 +5,6 @@ namespace DB { -static ITransformingStep::DataStreamTraits getTraits() -{ - return ITransformingStep::DataStreamTraits - { - .preserves_distinct_columns = true - }; -} - - -DistinctStep::DistinctStep( - const DataStream & input_stream_, - const SizeLimits & set_size_limits_, - UInt64 limit_hint_, - const Names & columns_, - bool pre_distinct_) - : ITransformingStep(input_stream_, input_stream_.header, getTraits()) - , set_size_limits(set_size_limits_) - , limit_hint(limit_hint_) - , columns(columns_) - , pre_distinct(pre_distinct_) -{ - auto & distinct_columns = pre_distinct ? output_stream->local_distinct_columns - : output_stream->distinct_columns; - - /// Add more distinct columns. - for (const auto & name : columns) - distinct_columns.insert(name); -} - static bool checkColumnsAlreadyDistinct(const Names & columns, const NameSet & distinct_names) { bool columns_already_distinct = true; @@ -44,15 +15,47 @@ static bool checkColumnsAlreadyDistinct(const Names & columns, const NameSet & d return columns_already_distinct; } +static ITransformingStep::DataStreamTraits getTraits(bool pre_distinct, bool already_distinct_columns) +{ + return ITransformingStep::DataStreamTraits + { + .preserves_distinct_columns = already_distinct_columns, /// Will be calculated separately otherwise + .returns_single_stream = !pre_distinct && !already_distinct_columns, + .preserves_number_of_streams = pre_distinct || already_distinct_columns, + }; +} + + +DistinctStep::DistinctStep( + const DataStream & input_stream_, + const SizeLimits & set_size_limits_, + UInt64 limit_hint_, + const Names & columns_, + bool pre_distinct_) + : ITransformingStep( + input_stream_, + input_stream_.header, + getTraits(pre_distinct_, checkColumnsAlreadyDistinct(columns_, input_stream_.distinct_columns))) + , set_size_limits(set_size_limits_) + , limit_hint(limit_hint_) + , columns(columns_) + , pre_distinct(pre_distinct_) +{ + if (!output_stream->distinct_columns.empty() /// Columns already distinct, do nothing + && (!pre_distinct /// Main distinct + || input_stream_.has_single_port)) /// pre_distinct for single port works as usual one + { + /// Build distinct set. + for (const auto & name : columns) + output_stream->distinct_columns.insert(name); + } +} + void DistinctStep::transformPipeline(QueryPipeline & pipeline) { if (checkColumnsAlreadyDistinct(columns, input_streams.front().distinct_columns)) return; - if ((pre_distinct || pipeline.getNumStreams() <= 1) - && checkColumnsAlreadyDistinct(columns, input_streams.front().local_distinct_columns)) - return; - if (!pre_distinct) pipeline.resize(1); diff --git a/src/Processors/QueryPlan/DistinctStep.h b/src/Processors/QueryPlan/DistinctStep.h index 5ec6e683bba..4460b3a3764 100644 --- a/src/Processors/QueryPlan/DistinctStep.h +++ b/src/Processors/QueryPlan/DistinctStep.h @@ -5,6 +5,7 @@ namespace DB { +/// Execute DISTINCT for specified columns. class DistinctStep : public ITransformingStep { public: diff --git a/src/Processors/QueryPlan/ExpressionStep.cpp b/src/Processors/QueryPlan/ExpressionStep.cpp index 75c07554318..59027f7d3c6 100644 --- a/src/Processors/QueryPlan/ExpressionStep.cpp +++ b/src/Processors/QueryPlan/ExpressionStep.cpp @@ -11,23 +11,12 @@ static ITransformingStep::DataStreamTraits getTraits(const ExpressionActionsPtr { return ITransformingStep::DataStreamTraits { - .preserves_distinct_columns = !expression->hasJoinOrArrayJoin() + .preserves_distinct_columns = !expression->hasJoinOrArrayJoin(), + .returns_single_stream = false, + .preserves_number_of_streams = true, }; } -static void filterDistinctColumns(const Block & res_header, NameSet & distinct_columns) -{ - if (distinct_columns.empty()) - return; - - NameSet new_distinct_columns; - for (const auto & column : res_header) - if (distinct_columns.count(column.name)) - new_distinct_columns.insert(column.name); - - distinct_columns.swap(new_distinct_columns); -} - ExpressionStep::ExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_, bool default_totals_) : ITransformingStep( input_stream_, @@ -37,9 +26,7 @@ ExpressionStep::ExpressionStep(const DataStream & input_stream_, ExpressionActio , default_totals(default_totals_) { /// Some columns may be removed by expression. - /// TODO: also check aliases, functions and some types of join - filterDistinctColumns(output_stream->header, output_stream->distinct_columns); - filterDistinctColumns(output_stream->header, output_stream->local_distinct_columns); + updateDistinctColumns(output_stream->header, output_stream->distinct_columns); } void ExpressionStep::transformPipeline(QueryPipeline & pipeline) @@ -67,8 +54,7 @@ InflatingExpressionStep::InflatingExpressionStep(const DataStream & input_stream , expression(std::move(expression_)) , default_totals(default_totals_) { - filterDistinctColumns(output_stream->header, output_stream->distinct_columns); - filterDistinctColumns(output_stream->header, output_stream->local_distinct_columns); + updateDistinctColumns(output_stream->header, output_stream->distinct_columns); } void InflatingExpressionStep::transformPipeline(QueryPipeline & pipeline) diff --git a/src/Processors/QueryPlan/ExpressionStep.h b/src/Processors/QueryPlan/ExpressionStep.h index 4f268944c95..5ddbab2db8f 100644 --- a/src/Processors/QueryPlan/ExpressionStep.h +++ b/src/Processors/QueryPlan/ExpressionStep.h @@ -7,6 +7,7 @@ namespace DB class ExpressionActions; using ExpressionActionsPtr = std::shared_ptr; +/// Calculates specified expression. See ExpressionTransform. class ExpressionStep : public ITransformingStep { public: @@ -25,7 +26,7 @@ class InflatingExpressionStep : public ITransformingStep { public: explicit InflatingExpressionStep(const DataStream & input_stream_, ExpressionActionsPtr expression_, bool default_totals_ = false); - String getName() const override { return "Expression"; } + String getName() const override { return "InflatingExpression"; } void transformPipeline(QueryPipeline & pipeline) override; diff --git a/src/Processors/QueryPlan/ExtremesStep.cpp b/src/Processors/QueryPlan/ExtremesStep.cpp index 61777f4241a..15575f02776 100644 --- a/src/Processors/QueryPlan/ExtremesStep.cpp +++ b/src/Processors/QueryPlan/ExtremesStep.cpp @@ -8,7 +8,9 @@ static ITransformingStep::DataStreamTraits getTraits() { return ITransformingStep::DataStreamTraits { - .preserves_distinct_columns = true + .preserves_distinct_columns = true, + .returns_single_stream = false, + .preserves_number_of_streams = true, }; } diff --git a/src/Processors/QueryPlan/ExtremesStep.h b/src/Processors/QueryPlan/ExtremesStep.h index 1a691026196..cd6f27228f9 100644 --- a/src/Processors/QueryPlan/ExtremesStep.h +++ b/src/Processors/QueryPlan/ExtremesStep.h @@ -3,10 +3,11 @@ namespace DB { +/// Calculate extremes. Add special port for extremes. class ExtremesStep : public ITransformingStep { public: - ExtremesStep(const DataStream & input_stream_); + explicit ExtremesStep(const DataStream & input_stream_); String getName() const override { return "Extremes"; } diff --git a/src/Processors/QueryPlan/FillingStep.cpp b/src/Processors/QueryPlan/FillingStep.cpp index 80dba794943..0e1b095dac9 100644 --- a/src/Processors/QueryPlan/FillingStep.cpp +++ b/src/Processors/QueryPlan/FillingStep.cpp @@ -9,7 +9,9 @@ static ITransformingStep::DataStreamTraits getTraits() { return ITransformingStep::DataStreamTraits { - .preserves_distinct_columns = false /// TODO: it seem to actually be true. Check it later. + .preserves_distinct_columns = false, /// TODO: it seem to actually be true. Check it later. + .returns_single_stream = true, + .preserves_number_of_streams = true, }; } @@ -17,6 +19,8 @@ FillingStep::FillingStep(const DataStream & input_stream_, SortDescription sort_ : ITransformingStep(input_stream_, input_stream_.header, getTraits()) , sort_description(std::move(sort_description_)) { + if (!input_stream_.has_single_port) + throw Exception("FillingStep expects single input", ErrorCodes::LOGICAL_ERROR); } void FillingStep::transformPipeline(QueryPipeline & pipeline) diff --git a/src/Processors/QueryPlan/FillingStep.h b/src/Processors/QueryPlan/FillingStep.h index e7ec7ab17d7..fba63a3f0af 100644 --- a/src/Processors/QueryPlan/FillingStep.h +++ b/src/Processors/QueryPlan/FillingStep.h @@ -5,6 +5,7 @@ namespace DB { +/// Implements modifier WITH FILL of ORDER BY clause. See FillingTransform. class FillingStep : public ITransformingStep { public: diff --git a/src/Processors/QueryPlan/FilterStep.cpp b/src/Processors/QueryPlan/FilterStep.cpp index 38ab4471e53..4b9dd449b7c 100644 --- a/src/Processors/QueryPlan/FilterStep.cpp +++ b/src/Processors/QueryPlan/FilterStep.cpp @@ -10,24 +10,12 @@ static ITransformingStep::DataStreamTraits getTraits(const ExpressionActionsPtr { return ITransformingStep::DataStreamTraits { - .preserves_distinct_columns = !expression->hasJoinOrArrayJoin() /// I suppose it actually never happens + .preserves_distinct_columns = !expression->hasJoinOrArrayJoin(), /// I suppose it actually never happens + .returns_single_stream = false, + .preserves_number_of_streams = true, }; } -static void filterDistinctColumns(const Block & res_header, NameSet & distinct_columns) -{ - if (distinct_columns.empty()) - return; - - NameSet new_distinct_columns; - for (const auto & column : res_header) - if (distinct_columns.count(column.name)) - new_distinct_columns.insert(column.name); - - distinct_columns.swap(new_distinct_columns); -} - - FilterStep::FilterStep( const DataStream & input_stream_, ExpressionActionsPtr expression_, @@ -42,8 +30,7 @@ FilterStep::FilterStep( , remove_filter_column(remove_filter_column_) { /// TODO: it would be easier to remove all expressions from filter step. It should only filter by column name. - filterDistinctColumns(output_stream->header, output_stream->distinct_columns); - filterDistinctColumns(output_stream->header, output_stream->local_distinct_columns); + updateDistinctColumns(output_stream->header, output_stream->distinct_columns); } void FilterStep::transformPipeline(QueryPipeline & pipeline) diff --git a/src/Processors/QueryPlan/FilterStep.h b/src/Processors/QueryPlan/FilterStep.h index faadd41a58d..940253955ee 100644 --- a/src/Processors/QueryPlan/FilterStep.h +++ b/src/Processors/QueryPlan/FilterStep.h @@ -7,6 +7,7 @@ namespace DB class ExpressionActions; using ExpressionActionsPtr = std::shared_ptr; +/// Implements WHERE, HAVING operations. See FilterTransform. class FilterStep : public ITransformingStep { public: diff --git a/src/Processors/QueryPlan/FinishSortingStep.cpp b/src/Processors/QueryPlan/FinishSortingStep.cpp index 4b0e6d6a66d..a6a071b7848 100644 --- a/src/Processors/QueryPlan/FinishSortingStep.cpp +++ b/src/Processors/QueryPlan/FinishSortingStep.cpp @@ -12,7 +12,9 @@ static ITransformingStep::DataStreamTraits getTraits() { return ITransformingStep::DataStreamTraits { - .preserves_distinct_columns = true + .preserves_distinct_columns = true, + .returns_single_stream = true, + .preserves_number_of_streams = false, }; } @@ -28,9 +30,6 @@ FinishSortingStep::FinishSortingStep( , max_block_size(max_block_size_) , limit(limit_) { - /// Streams are merged together, only global distinct keys remain distinct. - /// Note: we can not clear it if know that there will be only one stream in pipeline. Should we add info about it? - output_stream->local_distinct_columns.clear(); } void FinishSortingStep::transformPipeline(QueryPipeline & pipeline) diff --git a/src/Processors/QueryPlan/FinishSortingStep.h b/src/Processors/QueryPlan/FinishSortingStep.h index 43bdf261e97..e5bba41d51e 100644 --- a/src/Processors/QueryPlan/FinishSortingStep.h +++ b/src/Processors/QueryPlan/FinishSortingStep.h @@ -5,6 +5,7 @@ namespace DB { +/// Finish sorting of pre-sorted data. See FinishSortingTransform. class FinishSortingStep : public ITransformingStep { public: diff --git a/src/Processors/QueryPlan/IQueryPlanStep.h b/src/Processors/QueryPlan/IQueryPlanStep.h index 4f9e4d3268c..6612f165188 100644 --- a/src/Processors/QueryPlan/IQueryPlanStep.h +++ b/src/Processors/QueryPlan/IQueryPlanStep.h @@ -9,13 +9,19 @@ using QueryPipelinePtr = std::unique_ptr; using QueryPipelines = std::vector; /// Description of data stream. +/// Single logical data stream may relate to many ports of pipeline. class DataStream { public: Block header; + /// Tuples with those columns are distinct. + /// It doesn't mean that columns are distinct separately. + /// Removing any column from this list brakes this invariant. NameSet distinct_columns = {}; - NameSet local_distinct_columns = {}; /// Those columns are distinct in separate thread, but not in general. + + /// QueryPipeline has single port. Totals or extremes ports are not counted. + bool has_single_port = false; /// Things which may be added: /// * sort description diff --git a/src/Processors/QueryPlan/ISourceStep.h b/src/Processors/QueryPlan/ISourceStep.h index 8373a34b57f..7fdfa3e0e38 100644 --- a/src/Processors/QueryPlan/ISourceStep.h +++ b/src/Processors/QueryPlan/ISourceStep.h @@ -4,6 +4,7 @@ namespace DB { +/// Step which takes empty pipeline and initializes it. Returns single logical DataStream. class ISourceStep : public IQueryPlanStep { public: diff --git a/src/Processors/QueryPlan/ITransformingStep.cpp b/src/Processors/QueryPlan/ITransformingStep.cpp index bf4afdfccd4..7884bc9a50a 100644 --- a/src/Processors/QueryPlan/ITransformingStep.cpp +++ b/src/Processors/QueryPlan/ITransformingStep.cpp @@ -6,14 +6,17 @@ namespace DB ITransformingStep::ITransformingStep(DataStream input_stream, Block output_header, DataStreamTraits traits) { - input_streams.emplace_back(std::move(input_stream)); output_stream = DataStream{.header = std::move(output_header)}; if (traits.preserves_distinct_columns) { output_stream->distinct_columns = input_streams.front().distinct_columns; - output_stream->local_distinct_columns = input_streams.front().local_distinct_columns; } + + output_stream->has_single_port = traits.returns_single_stream + || (input_stream.has_single_port && traits.preserves_number_of_streams); + + input_streams.emplace_back(std::move(input_stream)); } QueryPipelinePtr ITransformingStep::updatePipeline(QueryPipelines pipelines) @@ -22,4 +25,19 @@ QueryPipelinePtr ITransformingStep::updatePipeline(QueryPipelines pipelines) return std::move(pipelines.front()); } +void ITransformingStep::updateDistinctColumns(const Block & res_header, NameSet & distinct_columns) +{ + if (distinct_columns.empty()) + return; + + for (const auto & column : res_header) + { + if (distinct_columns.count(column.name) == 0) + { + distinct_columns.clear(); + break; + } + } +} + } diff --git a/src/Processors/QueryPlan/ITransformingStep.h b/src/Processors/QueryPlan/ITransformingStep.h index f18b6ba3c8d..3d5245bef0f 100644 --- a/src/Processors/QueryPlan/ITransformingStep.h +++ b/src/Processors/QueryPlan/ITransformingStep.h @@ -4,12 +4,25 @@ namespace DB { +/// Step which has single input and single output data stream. +/// It doesn't mean that pipeline has single port before or after such step. class ITransformingStep : public IQueryPlanStep { public: struct DataStreamTraits { + /// Keep distinct_columns unchanged. + /// Examples: true for LimitStep, false for ExpressionStep with ARRAY JOIN + /// It some columns may be removed from result header, call updateDistinctColumns bool preserves_distinct_columns; + + /// True if pipeline has single output port after this step. + /// Examples: MergeSortingStep, AggregatingStep + bool returns_single_stream; + + /// Won't change the number of ports for pipeline. + /// Examples: true for ExpressionStep, false for MergeSortingStep + bool preserves_number_of_streams; }; ITransformingStep(DataStream input_stream, Block output_header, DataStreamTraits traits); @@ -17,6 +30,10 @@ public: QueryPipelinePtr updatePipeline(QueryPipelines pipelines) override; virtual void transformPipeline(QueryPipeline & pipeline) = 0; + +protected: + /// Clear distinct_columns if res_header doesn't contain all af them. + static void updateDistinctColumns(const Block & res_header, NameSet & distinct_columns); }; } diff --git a/src/Processors/QueryPlan/LimitByStep.cpp b/src/Processors/QueryPlan/LimitByStep.cpp index e18df84258e..5eea4193666 100644 --- a/src/Processors/QueryPlan/LimitByStep.cpp +++ b/src/Processors/QueryPlan/LimitByStep.cpp @@ -9,7 +9,9 @@ static ITransformingStep::DataStreamTraits getTraits() { return ITransformingStep::DataStreamTraits { - .preserves_distinct_columns = true + .preserves_distinct_columns = true, + .returns_single_stream = true, + .preserves_number_of_streams = false, }; } diff --git a/src/Processors/QueryPlan/LimitByStep.h b/src/Processors/QueryPlan/LimitByStep.h index 744918cb836..bf3943a7f59 100644 --- a/src/Processors/QueryPlan/LimitByStep.h +++ b/src/Processors/QueryPlan/LimitByStep.h @@ -4,6 +4,7 @@ namespace DB { +/// Executes LIMIT BY for specified columns. See LimitByTransform. class LimitByStep : public ITransformingStep { public: diff --git a/src/Processors/QueryPlan/LimitStep.cpp b/src/Processors/QueryPlan/LimitStep.cpp index 4b7928bfc5a..9e11a820567 100644 --- a/src/Processors/QueryPlan/LimitStep.cpp +++ b/src/Processors/QueryPlan/LimitStep.cpp @@ -9,7 +9,9 @@ static ITransformingStep::DataStreamTraits getTraits() { return ITransformingStep::DataStreamTraits { - .preserves_distinct_columns = true + .preserves_distinct_columns = true, + .returns_single_stream = false, + .preserves_number_of_streams = true, }; } @@ -24,7 +26,6 @@ LimitStep::LimitStep( , always_read_till_end(always_read_till_end_) , with_ties(with_ties_), description(std::move(description_)) { - } void LimitStep::transformPipeline(QueryPipeline & pipeline) diff --git a/src/Processors/QueryPlan/LimitStep.h b/src/Processors/QueryPlan/LimitStep.h index 4a12e8f6705..3fd77bea22f 100644 --- a/src/Processors/QueryPlan/LimitStep.h +++ b/src/Processors/QueryPlan/LimitStep.h @@ -6,14 +6,15 @@ namespace DB { +/// Executes LIMIT. See LimitTransform. class LimitStep : public ITransformingStep { public: LimitStep( const DataStream & input_stream_, size_t limit_, size_t offset_, - bool always_read_till_end_ = false, - bool with_ties_ = false, + bool always_read_till_end_ = false, /// Read all data even if limit is reached. Needed for totals. + bool with_ties_ = false, /// Limit with ties. SortDescription description_ = {}); String getName() const override { return "Limit"; } diff --git a/src/Processors/QueryPlan/MergeSortingStep.cpp b/src/Processors/QueryPlan/MergeSortingStep.cpp index 4c8f265c2ca..8fa5f4782f6 100644 --- a/src/Processors/QueryPlan/MergeSortingStep.cpp +++ b/src/Processors/QueryPlan/MergeSortingStep.cpp @@ -9,7 +9,9 @@ static ITransformingStep::DataStreamTraits getTraits() { return ITransformingStep::DataStreamTraits { - .preserves_distinct_columns = true + .preserves_distinct_columns = true, + .returns_single_stream = false, + .preserves_number_of_streams = true, }; } diff --git a/src/Processors/QueryPlan/MergeSortingStep.h b/src/Processors/QueryPlan/MergeSortingStep.h index 3d12bda3139..a0e33a82690 100644 --- a/src/Processors/QueryPlan/MergeSortingStep.h +++ b/src/Processors/QueryPlan/MergeSortingStep.h @@ -7,6 +7,7 @@ namespace DB { +/// Sorts stream of data. See MergeSortingTransform. class MergeSortingStep : public ITransformingStep { public: diff --git a/src/Processors/QueryPlan/MergingAggregatedStep.cpp b/src/Processors/QueryPlan/MergingAggregatedStep.cpp index 459a0b90040..870203842a1 100644 --- a/src/Processors/QueryPlan/MergingAggregatedStep.cpp +++ b/src/Processors/QueryPlan/MergingAggregatedStep.cpp @@ -11,7 +11,9 @@ static ITransformingStep::DataStreamTraits getTraits() { return ITransformingStep::DataStreamTraits { - .preserves_distinct_columns = false + .preserves_distinct_columns = false, + .returns_single_stream = true, + .preserves_number_of_streams = false, }; } @@ -36,7 +38,7 @@ void MergingAggregatedStep::transformPipeline(QueryPipeline & pipeline) { if (!memory_efficient_aggregation) { - /// We union several sources into one, parallelizing the work. + /// We union several sources into one, paralleling the work. pipeline.resize(1); /// Now merge the aggregated blocks diff --git a/src/Processors/QueryPlan/MergingAggregatedStep.h b/src/Processors/QueryPlan/MergingAggregatedStep.h index 51a907285df..c0da5961c52 100644 --- a/src/Processors/QueryPlan/MergingAggregatedStep.h +++ b/src/Processors/QueryPlan/MergingAggregatedStep.h @@ -8,6 +8,7 @@ namespace DB struct AggregatingTransformParams; using AggregatingTransformParamsPtr = std::shared_ptr; +/// This step finishes aggregation. See AggregatingSortedTransform. class MergingAggregatedStep : public ITransformingStep { public: diff --git a/src/Processors/QueryPlan/MergingSortedStep.cpp b/src/Processors/QueryPlan/MergingSortedStep.cpp index 9c4bf874510..dc44a96e45e 100644 --- a/src/Processors/QueryPlan/MergingSortedStep.cpp +++ b/src/Processors/QueryPlan/MergingSortedStep.cpp @@ -9,7 +9,9 @@ static ITransformingStep::DataStreamTraits getTraits() { return ITransformingStep::DataStreamTraits { - .preserves_distinct_columns = true + .preserves_distinct_columns = true, + .returns_single_stream = true, + .preserves_number_of_streams = false, }; } @@ -23,9 +25,6 @@ MergingSortedStep::MergingSortedStep( , max_block_size(max_block_size_) , limit(limit_) { - /// Streams are merged together, only global distinct keys remain distinct. - /// Note: we can not clear it if know that there will be only one stream in pipeline. Should we add info about it? - output_stream->local_distinct_columns.clear(); } void MergingSortedStep::transformPipeline(QueryPipeline & pipeline) diff --git a/src/Processors/QueryPlan/MergingSortedStep.h b/src/Processors/QueryPlan/MergingSortedStep.h index 920073da8cb..19584303239 100644 --- a/src/Processors/QueryPlan/MergingSortedStep.h +++ b/src/Processors/QueryPlan/MergingSortedStep.h @@ -7,6 +7,7 @@ namespace DB { +/// Merge streams of data into single sorted stream. class MergingSortedStep : public ITransformingStep { public: diff --git a/src/Processors/QueryPlan/OffsetsStep.cpp b/src/Processors/QueryPlan/OffsetsStep.cpp index e09a169f4bd..fc6ef1e7c30 100644 --- a/src/Processors/QueryPlan/OffsetsStep.cpp +++ b/src/Processors/QueryPlan/OffsetsStep.cpp @@ -9,7 +9,9 @@ static ITransformingStep::DataStreamTraits getTraits() { return ITransformingStep::DataStreamTraits { - .preserves_distinct_columns = true + .preserves_distinct_columns = true, + .returns_single_stream = false, + .preserves_number_of_streams = true, }; } @@ -21,13 +23,10 @@ OffsetsStep::OffsetsStep(const DataStream & input_stream_, size_t offset_) void OffsetsStep::transformPipeline(QueryPipeline & pipeline) { - pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type) -> ProcessorPtr - { - if (stream_type != QueryPipeline::StreamType::Main) - return nullptr; + auto transform = std::make_shared( + pipeline.getHeader(), offset, pipeline.getNumStreams()); - return std::make_shared(header, offset, 1); - }); + pipeline.addPipe({std::move(transform)}); } } diff --git a/src/Processors/QueryPlan/OffsetsStep.h b/src/Processors/QueryPlan/OffsetsStep.h index 83f0c43dd7d..41efac1a422 100644 --- a/src/Processors/QueryPlan/OffsetsStep.h +++ b/src/Processors/QueryPlan/OffsetsStep.h @@ -5,6 +5,7 @@ namespace DB { +/// Executes OFFSET (without LIMIT). See OffsetTransform. class OffsetsStep : public ITransformingStep { public: diff --git a/src/Processors/QueryPlan/PartialSortingStep.cpp b/src/Processors/QueryPlan/PartialSortingStep.cpp index c8be58eb324..e9400492710 100644 --- a/src/Processors/QueryPlan/PartialSortingStep.cpp +++ b/src/Processors/QueryPlan/PartialSortingStep.cpp @@ -10,7 +10,9 @@ static ITransformingStep::DataStreamTraits getTraits() { return ITransformingStep::DataStreamTraits { - .preserves_distinct_columns = true + .preserves_distinct_columns = true, + .returns_single_stream = false, + .preserves_number_of_streams = true, }; } diff --git a/src/Processors/QueryPlan/PartialSortingStep.h b/src/Processors/QueryPlan/PartialSortingStep.h index c4967e8ec30..c4656b8c9f3 100644 --- a/src/Processors/QueryPlan/PartialSortingStep.h +++ b/src/Processors/QueryPlan/PartialSortingStep.h @@ -6,6 +6,7 @@ namespace DB { +/// Sort separate chunks of data. class PartialSortingStep : public ITransformingStep { public: diff --git a/src/Processors/QueryPlan/QueryPlan.h b/src/Processors/QueryPlan/QueryPlan.h index 45dfd6cf601..9fadc45c3b2 100644 --- a/src/Processors/QueryPlan/QueryPlan.h +++ b/src/Processors/QueryPlan/QueryPlan.h @@ -17,6 +17,8 @@ using QueryPipelinePtr = std::unique_ptr; class Context; /// A tree of query steps. +/// The goal of QueryPlan is to build QueryPipeline. +/// QueryPlan let delay pipeline creation which is helpful for pipeline-level optimisations. class QueryPlan { public: @@ -38,6 +40,7 @@ public: void addInterpreterContext(std::shared_ptr context); private: + /// Tree node. Step and it's children. struct Node { QueryPlanStepPtr step; @@ -52,8 +55,8 @@ private: void checkInitialized() const; void checkNotCompleted() const; + /// Those fields are passed to QueryPipeline. size_t max_threads = 0; - std::vector> interpreter_context; }; diff --git a/src/Processors/QueryPlan/ReadFromPreparedSource.cpp b/src/Processors/QueryPlan/ReadFromPreparedSource.cpp index 47393be1f10..6f0d1693ce0 100644 --- a/src/Processors/QueryPlan/ReadFromPreparedSource.cpp +++ b/src/Processors/QueryPlan/ReadFromPreparedSource.cpp @@ -5,7 +5,7 @@ namespace DB { ReadFromPreparedSource::ReadFromPreparedSource(Pipe pipe_, std::shared_ptr context_) - : ISourceStep(DataStream{.header = pipe_.getHeader()}) + : ISourceStep(DataStream{.header = pipe_.getHeader(), .has_single_port = true}) , pipe(std::move(pipe_)) , context(std::move(context_)) { diff --git a/src/Processors/QueryPlan/ReadFromPreparedSource.h b/src/Processors/QueryPlan/ReadFromPreparedSource.h index e4a4a39a7be..73703f74de2 100644 --- a/src/Processors/QueryPlan/ReadFromPreparedSource.h +++ b/src/Processors/QueryPlan/ReadFromPreparedSource.h @@ -5,6 +5,7 @@ namespace DB { +/// Create source from prepared pipe. class ReadFromPreparedSource : public ISourceStep { public: diff --git a/src/Processors/QueryPlan/ReadFromStorageStep.cpp b/src/Processors/QueryPlan/ReadFromStorageStep.cpp index 83b8682e09c..c28298490b8 100644 --- a/src/Processors/QueryPlan/ReadFromStorageStep.cpp +++ b/src/Processors/QueryPlan/ReadFromStorageStep.cpp @@ -122,7 +122,7 @@ ReadFromStorageStep::ReadFromStorageStep( pipeline->addInterpreterContext(std::move(context)); pipeline->addStorageHolder(std::move(storage)); - output_stream = DataStream{.header = pipeline->getHeader()}; + output_stream = DataStream{.header = pipeline->getHeader(), .has_single_port = pipeline->getNumStreams() == 1}; } ReadFromStorageStep::~ReadFromStorageStep() = default; diff --git a/src/Processors/QueryPlan/ReadNothingStep.cpp b/src/Processors/QueryPlan/ReadNothingStep.cpp index cdf1c248e57..9316ecab481 100644 --- a/src/Processors/QueryPlan/ReadNothingStep.cpp +++ b/src/Processors/QueryPlan/ReadNothingStep.cpp @@ -6,7 +6,7 @@ namespace DB { ReadNothingStep::ReadNothingStep(Block output_header) - : ISourceStep(DataStream{.header = std::move(output_header)}) + : ISourceStep(DataStream{.header = std::move(output_header), .has_single_port = true}) { } diff --git a/src/Processors/QueryPlan/ReadNothingStep.h b/src/Processors/QueryPlan/ReadNothingStep.h index 8580331f6b0..da4740574da 100644 --- a/src/Processors/QueryPlan/ReadNothingStep.h +++ b/src/Processors/QueryPlan/ReadNothingStep.h @@ -4,6 +4,7 @@ namespace DB { +/// Create NullSource with specified structure. class ReadNothingStep : public ISourceStep { public: diff --git a/src/Processors/QueryPlan/RollupStep.cpp b/src/Processors/QueryPlan/RollupStep.cpp index 5e940068ef8..593744f0253 100644 --- a/src/Processors/QueryPlan/RollupStep.cpp +++ b/src/Processors/QueryPlan/RollupStep.cpp @@ -9,7 +9,9 @@ static ITransformingStep::DataStreamTraits getTraits() { return ITransformingStep::DataStreamTraits { - .preserves_distinct_columns = false + .preserves_distinct_columns = false, + .returns_single_stream = true, + .preserves_number_of_streams = false, }; } diff --git a/src/Processors/QueryPlan/RollupStep.h b/src/Processors/QueryPlan/RollupStep.h index 56e8d81e37b..b6d9b2af5bf 100644 --- a/src/Processors/QueryPlan/RollupStep.h +++ b/src/Processors/QueryPlan/RollupStep.h @@ -8,6 +8,7 @@ namespace DB struct AggregatingTransformParams; using AggregatingTransformParamsPtr = std::shared_ptr; +/// WITH ROLLUP. See RollupTransform. class RollupStep : public ITransformingStep { public: diff --git a/src/Processors/QueryPlan/TotalsHavingStep.cpp b/src/Processors/QueryPlan/TotalsHavingStep.cpp index ec3788bc7d3..8464e6d2ba6 100644 --- a/src/Processors/QueryPlan/TotalsHavingStep.cpp +++ b/src/Processors/QueryPlan/TotalsHavingStep.cpp @@ -10,7 +10,9 @@ static ITransformingStep::DataStreamTraits getTraits() { return ITransformingStep::DataStreamTraits { - .preserves_distinct_columns = true + .preserves_distinct_columns = true, + .returns_single_stream = true, + .preserves_number_of_streams = false, }; } diff --git a/src/Processors/QueryPlan/TotalsHavingStep.h b/src/Processors/QueryPlan/TotalsHavingStep.h index 52cc936f622..76d793bff77 100644 --- a/src/Processors/QueryPlan/TotalsHavingStep.h +++ b/src/Processors/QueryPlan/TotalsHavingStep.h @@ -9,6 +9,7 @@ using ExpressionActionsPtr = std::shared_ptr; enum class TotalsMode; +/// Execute HAVING and calculate totals. See TotalsHavingTransform. class TotalsHavingStep : public ITransformingStep { public: diff --git a/src/Processors/QueryPlan/UnionStep.cpp b/src/Processors/QueryPlan/UnionStep.cpp index c39a2fcafda..14a43cac78b 100644 --- a/src/Processors/QueryPlan/UnionStep.cpp +++ b/src/Processors/QueryPlan/UnionStep.cpp @@ -12,8 +12,10 @@ UnionStep::UnionStep(DataStreams input_streams_, Block result_header, size_t max { input_streams = std::move(input_streams_); - /// TODO: update traits - output_stream = DataStream{.header = header}; + if (input_streams.size() == 1) + output_stream = input_streams.front(); + else + output_stream = DataStream{.header = header}; } QueryPipelinePtr UnionStep::updatePipeline(QueryPipelines pipelines) diff --git a/src/Processors/QueryPlan/UnionStep.h b/src/Processors/QueryPlan/UnionStep.h index 2c3d17b2e82..8209c95fa98 100644 --- a/src/Processors/QueryPlan/UnionStep.h +++ b/src/Processors/QueryPlan/UnionStep.h @@ -4,6 +4,7 @@ namespace DB { +/// Unite several logical streams of data into single logical stream with specified structure. class UnionStep : public IQueryPlanStep { public: diff --git a/src/Processors/Transforms/ExpressionTransform.h b/src/Processors/Transforms/ExpressionTransform.h index 60d6dc0f777..e3b82091add 100644 --- a/src/Processors/Transforms/ExpressionTransform.h +++ b/src/Processors/Transforms/ExpressionTransform.h @@ -7,6 +7,11 @@ namespace DB class ExpressionActions; using ExpressionActionsPtr = std::shared_ptr; +/** Executes a certain expression over the block. + * The expression consists of column identifiers from the block, constants, common functions. + * For example: hits * 2 + 3, url LIKE '%yandex%' + * The expression processes each row independently of the others. + */ class ExpressionTransform : public ISimpleTransform { public: diff --git a/src/Processors/Transforms/FilterTransform.h b/src/Processors/Transforms/FilterTransform.h index 0497a339c82..c0ccf0fd072 100644 --- a/src/Processors/Transforms/FilterTransform.h +++ b/src/Processors/Transforms/FilterTransform.h @@ -8,8 +8,9 @@ namespace DB class ExpressionActions; using ExpressionActionsPtr = std::shared_ptr; -/** Has one input and one output. - * Simply pull a block from input, transform it, and push it to output. +/** Implements WHERE, HAVING operations. + * Takes an expression, which adds to the block one ColumnUInt8 column containing the filtering conditions. + * The expression is evaluated and result chunks contain only the filtered rows. * If remove_filter_column is true, remove filter column from block. */ class FilterTransform : public ISimpleTransform diff --git a/src/Processors/Transforms/LimitByTransform.h b/src/Processors/Transforms/LimitByTransform.h index 344cf6feea4..815114946c8 100644 --- a/src/Processors/Transforms/LimitByTransform.h +++ b/src/Processors/Transforms/LimitByTransform.h @@ -6,6 +6,7 @@ namespace DB { +/// Executes LIMIT BY for specified columns. class LimitByTransform : public ISimpleTransform { public: diff --git a/src/Processors/Transforms/MergeSortingTransform.h b/src/Processors/Transforms/MergeSortingTransform.h index 22812e08b40..d328cb818d5 100644 --- a/src/Processors/Transforms/MergeSortingTransform.h +++ b/src/Processors/Transforms/MergeSortingTransform.h @@ -12,6 +12,8 @@ namespace DB class IVolume; using VolumePtr = std::shared_ptr; +/// Takes sorted separate chunks of data. Sorts them. +/// Returns stream with globally sorted data. class MergeSortingTransform : public SortingTransform { public: diff --git a/src/Processors/ya.make b/src/Processors/ya.make index 4e6ec2372da..89b48d2b04b 100644 --- a/src/Processors/ya.make +++ b/src/Processors/ya.make @@ -138,7 +138,7 @@ SRCS( Transforms/SortingTransform.cpp Transforms/TotalsHavingTransform.cpp Transforms/AggregatingInOrderTransform.cpp - QueryPlan/AddingDelayedStreamStep.cpp + QueryPlan/AddingDelayedStreamSource.cpp QueryPlan/AggregatingStep.cpp QueryPlan/ConvertingStep.cpp QueryPlan/CreatingSetsStep.cpp