Added preserves_number_of_rows trait.

This commit is contained in:
Nikolai Kochetov 2020-07-27 15:48:33 +03:00
parent b4852e4c69
commit fb3a50024d
23 changed files with 52 additions and 10 deletions

View File

@ -11,6 +11,7 @@ static ITransformingStep::DataStreamTraits getTraits()
.preserves_distinct_columns = false,
.returns_single_stream = false,
.preserves_number_of_streams = false,
.preserves_number_of_rows = false, /// New rows are added from delayed stream
};
}

View File

@ -14,6 +14,7 @@ static ITransformingStep::DataStreamTraits getTraits()
.preserves_distinct_columns = false, /// Actually, we may check that distinct names are in aggregation keys
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_number_of_rows = false,
};
}

View File

@ -13,6 +13,7 @@ static ITransformingStep::DataStreamTraits getTraits()
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_number_of_rows = true,
};
}

View File

@ -13,6 +13,7 @@ static ITransformingStep::DataStreamTraits getTraits()
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_number_of_rows = true,
};
}

View File

@ -12,6 +12,7 @@ static ITransformingStep::DataStreamTraits getTraits()
.preserves_distinct_columns = false,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_number_of_rows = false,
};
}

View File

@ -23,6 +23,7 @@ static ITransformingStep::DataStreamTraits getTraits(bool pre_distinct, bool alr
.preserves_distinct_columns = already_distinct_columns, /// Will be calculated separately otherwise
.returns_single_stream = !pre_distinct && !already_distinct_columns,
.preserves_number_of_streams = pre_distinct || already_distinct_columns,
.preserves_number_of_rows = false,
};
}

View File

@ -15,6 +15,7 @@ static ITransformingStep::DataStreamTraits getTraits(const ExpressionActionsPtr
.preserves_distinct_columns = !expression->hasJoinOrArrayJoin(),
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_number_of_rows = !expression->hasJoinOrArrayJoin(),
};
}

View File

@ -11,6 +11,7 @@ static ITransformingStep::DataStreamTraits getTraits()
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_number_of_rows = true,
};
}

View File

@ -18,6 +18,7 @@ static ITransformingStep::DataStreamTraits getTraits()
.preserves_distinct_columns = false, /// TODO: it seem to actually be true. Check it later.
.returns_single_stream = true,
.preserves_number_of_streams = true,
.preserves_number_of_rows = false,
};
}

View File

@ -14,6 +14,7 @@ static ITransformingStep::DataStreamTraits getTraits(const ExpressionActionsPtr
.preserves_distinct_columns = !expression->hasJoinOrArrayJoin(), /// I suppose it actually never happens
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_number_of_rows = false,
};
}

View File

@ -9,13 +9,14 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
static ITransformingStep::DataStreamTraits getTraits(size_t limit)
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_number_of_rows = limit == 0,
};
}
@ -25,7 +26,7 @@ FinishSortingStep::FinishSortingStep(
SortDescription result_description_,
size_t max_block_size_,
UInt64 limit_)
: ITransformingStep(input_stream_, input_stream_.header, getTraits())
: ITransformingStep(input_stream_, input_stream_.header, getTraits(limit_))
, prefix_description(std::move(prefix_description_))
, result_description(std::move(result_description_))
, max_block_size(max_block_size_)

View File

@ -6,6 +6,7 @@ namespace DB
ITransformingStep::ITransformingStep(DataStream input_stream, Block output_header, DataStreamTraits traits, bool collect_processors_)
: collect_processors(collect_processors_)
, transform_traits(traits)
{
output_stream = DataStream{.header = std::move(output_header)};

View File

@ -23,6 +23,10 @@ public:
/// Won't change the number of ports for pipeline.
/// Examples: true for ExpressionStep, false for MergeSortingStep
bool preserves_number_of_streams;
/// Won't change the total number of rows.
/// Examples: true ExpressionStep (without join or array join), false for FilterStep
bool preserves_number_of_rows;
};
ITransformingStep(DataStream input_stream, Block output_header, DataStreamTraits traits, bool collect_processors_ = true);
@ -31,6 +35,8 @@ public:
virtual void transformPipeline(QueryPipeline & pipeline) = 0;
const DataStreamTraits & getTransformTraits() const { return transform_traits; }
void describePipeline(FormatSettings & settings) const override;
protected:
@ -41,6 +47,8 @@ private:
/// We collect processors got after pipeline transformation.
Processors processors;
bool collect_processors;
DataStreamTraits transform_traits;
};
}

View File

@ -13,6 +13,7 @@ static ITransformingStep::DataStreamTraits getTraits()
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_number_of_rows = false,
};
}

View File

@ -13,6 +13,7 @@ static ITransformingStep::DataStreamTraits getTraits()
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_number_of_rows = false,
};
}

View File

@ -6,13 +6,14 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
static ITransformingStep::DataStreamTraits getTraits(size_t limit)
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_number_of_rows = limit == 0,
};
}
@ -25,7 +26,7 @@ MergeSortingStep::MergeSortingStep(
size_t max_bytes_before_external_sort_,
VolumePtr tmp_volume_,
size_t min_free_disk_space_)
: ITransformingStep(input_stream, input_stream.header, getTraits())
: ITransformingStep(input_stream, input_stream.header, getTraits(limit_))
, description(description_)
, max_merged_block_size(max_merged_block_size_)
, limit(limit_)

View File

@ -14,6 +14,7 @@ static ITransformingStep::DataStreamTraits getTraits()
.preserves_distinct_columns = false,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_number_of_rows = false,
};
}

View File

@ -6,13 +6,14 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
static ITransformingStep::DataStreamTraits getTraits(size_t limit)
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_number_of_rows = limit == 0,
};
}
@ -21,7 +22,7 @@ MergingSortedStep::MergingSortedStep(
SortDescription sort_description_,
size_t max_block_size_,
UInt64 limit_)
: ITransformingStep(input_stream, input_stream.header, getTraits())
: ITransformingStep(input_stream, input_stream.header, getTraits(limit_))
, sort_description(std::move(sort_description_))
, max_block_size(max_block_size_)
, limit(limit_)

View File

@ -13,6 +13,7 @@ static ITransformingStep::DataStreamTraits getTraits()
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_number_of_rows = false,
};
}

View File

@ -7,13 +7,14 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
static ITransformingStep::DataStreamTraits getTraits(size_t limit)
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_number_of_rows = limit == 0,
};
}
@ -22,7 +23,7 @@ PartialSortingStep::PartialSortingStep(
SortDescription sort_description_,
UInt64 limit_,
SizeLimits size_limits_)
: ITransformingStep(input_stream, input_stream.header, getTraits())
: ITransformingStep(input_stream, input_stream.header, getTraits(limit_))
, sort_description(std::move(sort_description_))
, limit(limit_)
, size_limits(size_limits_)

View File

@ -315,7 +315,20 @@ static void tryPushDownLimit(QueryPlanStepPtr & parent, QueryPlanStepPtr & child
if (!limit)
return;
const auto * transforming = typeid_cast<const ITransformingStep *>(child.get());
/// Skip everything which is not transform.
if (!transforming)
return;
/// Now we should decide if pushing down limit possible for this step.
/// Cannot push down if child changes the number of rows.
if (!transforming->getTransformTraits().preserves_number_of_rows)
return;
/// ExtremesStep ? , FinishSorting, MergeSorting, MergingSorted, PartialSorting
}
void QueryPlan::optimize()

View File

@ -12,6 +12,7 @@ static ITransformingStep::DataStreamTraits getTraits()
.preserves_distinct_columns = false,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_number_of_rows = false,
};
}

View File

@ -8,13 +8,14 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
static ITransformingStep::DataStreamTraits getTraits(bool has_filter)
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_number_of_rows = !has_filter,
};
}
@ -29,7 +30,7 @@ TotalsHavingStep::TotalsHavingStep(
: ITransformingStep(
input_stream_,
TotalsHavingTransform::transformHeader(input_stream_.header, expression_, final_),
getTraits())
getTraits(!filter_column_.empty()))
, overflow_row(overflow_row_)
, expression(expression_)
, filter_column_name(filter_column_)