Remove distinct_columns

This commit is contained in:
Igor Nikonov 2023-03-10 20:32:09 +00:00
parent acffc98bd1
commit 73119b920b
22 changed files with 7 additions and 120 deletions

View File

@ -38,7 +38,6 @@ static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false, /// Actually, we may check that distinct names are in aggregation keys
.returns_single_stream = should_produce_results_in_order_of_bucket_number,
.preserves_number_of_streams = false,
.preserves_sorting = false,

View File

@ -14,7 +14,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = false,

View File

@ -40,7 +40,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,

View File

@ -21,7 +21,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,

View File

@ -14,7 +14,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_sorting = false,
@ -32,9 +31,6 @@ CubeStep::CubeStep(const DataStream & input_stream_, Aggregator::Params params_,
, final(final_)
, use_nulls(use_nulls_)
{
/// Aggregation keys are distinct
for (const auto & key : params.keys)
output_stream->distinct_columns.insert(key);
}
ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, bool use_nulls, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number)
@ -89,9 +85,5 @@ void CubeStep::updateOutputStream()
{
output_stream = createOutputStream(
input_streams.front(), generateOutputHeader(params.getHeader(input_streams.front().header, final), params.keys, use_nulls), getDataStreamTraits());
/// Aggregation keys are distinct
for (const auto & key : params.keys)
output_stream->distinct_columns.insert(key);
}
}

View File

@ -10,28 +10,13 @@
namespace DB
{
static bool checkColumnsAlreadyDistinct(const Names & columns, const NameSet & distinct_names)
{
if (distinct_names.empty())
return false;
/// Now we need to check that distinct_names is a subset of columns.
std::unordered_set<std::string_view> columns_set(columns.begin(), columns.end());
for (const auto & name : distinct_names)
if (!columns_set.contains(name))
return false;
return true;
}
static ITransformingStep::Traits getTraits(bool pre_distinct, bool already_distinct_columns)
static ITransformingStep::Traits getTraits(bool pre_distinct)
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = already_distinct_columns, /// Will be calculated separately otherwise
.returns_single_stream = !pre_distinct && !already_distinct_columns,
.preserves_number_of_streams = pre_distinct || already_distinct_columns,
.returns_single_stream = !pre_distinct,
.preserves_number_of_streams = pre_distinct,
.preserves_sorting = true, /// Sorting is preserved indeed because of implementation.
},
{
@ -62,34 +47,23 @@ DistinctStep::DistinctStep(
: ITransformingStep(
input_stream_,
input_stream_.header,
getTraits(pre_distinct_, checkColumnsAlreadyDistinct(columns_, input_stream_.distinct_columns)))
getTraits(pre_distinct_))
, set_size_limits(set_size_limits_)
, limit_hint(limit_hint_)
, columns(columns_)
, pre_distinct(pre_distinct_)
, optimize_distinct_in_order(optimize_distinct_in_order_)
{
if (!output_stream->distinct_columns.empty() /// Columns already distinct, do nothing
&& (!pre_distinct /// Main distinct
|| input_stream_.has_single_port)) /// pre_distinct for single port works as usual one
{
/// Build distinct set.
for (const auto & name : columns)
output_stream->distinct_columns.insert(name);
}
}
void DistinctStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
const auto & input_stream = input_streams.back();
if (checkColumnsAlreadyDistinct(columns, input_stream.distinct_columns))
return;
if (!pre_distinct)
pipeline.resize(1);
if (optimize_distinct_in_order)
{
const auto & input_stream = input_streams.back();
const SortDescription distinct_sort_desc = getSortDescription(input_stream.sort_description, columns);
if (!distinct_sort_desc.empty())
{
@ -197,16 +171,7 @@ void DistinctStep::updateOutputStream()
output_stream = createOutputStream(
input_streams.front(),
input_streams.front().header,
getTraits(pre_distinct, checkColumnsAlreadyDistinct(columns, input_streams.front().distinct_columns)).data_stream_traits);
if (!output_stream->distinct_columns.empty() /// Columns already distinct, do nothing
&& (!pre_distinct /// Main distinct
|| input_streams.front().has_single_port)) /// pre_distinct for single port works as usual one
{
/// Build distinct set.
for (const auto & name : columns)
output_stream->distinct_columns.insert(name);
}
getTraits(pre_distinct).data_stream_traits);
}
}

View File

@ -15,7 +15,6 @@ static ITransformingStep::Traits getTraits(const ActionsDAGPtr & actions, const
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = !actions->hasArrayJoin(),
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = actions->isSortingPreserved(header, sort_description),
@ -33,8 +32,6 @@ ExpressionStep::ExpressionStep(const DataStream & input_stream_, const ActionsDA
getTraits(actions_dag_, input_stream_.header, input_stream_.sort_description))
, actions_dag(actions_dag_)
{
/// Some columns may be removed by expression.
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
}
void ExpressionStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)

View File

@ -9,7 +9,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,

View File

@ -17,7 +17,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false, /// TODO: it seem to actually be true. Check it later.
.returns_single_stream = true,
.preserves_number_of_streams = true,
.preserves_sorting = true,

View File

@ -23,7 +23,6 @@ static ITransformingStep::Traits getTraits(const ActionsDAGPtr & expression, con
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = !expression->hasArrayJoin(), /// I suppose it actually never happens
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = preserves_sorting,
@ -51,8 +50,6 @@ FilterStep::FilterStep(
, filter_column_name(std::move(filter_column_name_))
, remove_filter_column(remove_filter_column_)
{
/// TODO: it would be easier to remove all expressions from filter step. It should only filter by column name.
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
}
void FilterStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)

View File

@ -23,11 +23,6 @@ class DataStream
public:
Block header;
/// Tuples with those columns are distinct.
/// It doesn't mean that columns are distinct separately.
/// Removing any column from this list breaks this invariant.
NameSet distinct_columns = {};
/// QueryPipeline has single port. Totals or extremes ports are not counted.
bool has_single_port = false;
@ -51,8 +46,7 @@ public:
bool hasEqualPropertiesWith(const DataStream & other) const
{
return distinct_columns == other.distinct_columns
&& has_single_port == other.has_single_port
return has_single_port == other.has_single_port
&& sort_description == other.sort_description
&& (sort_description.empty() || sort_scope == other.sort_scope);
}

View File

@ -20,9 +20,6 @@ DataStream ITransformingStep::createOutputStream(
{
DataStream output_stream{.header = std::move(output_header)};
if (stream_traits.preserves_distinct_columns)
output_stream.distinct_columns = input_stream.distinct_columns;
output_stream.has_single_port = stream_traits.returns_single_stream
|| (input_stream.has_single_port && stream_traits.preserves_number_of_streams);
@ -50,21 +47,6 @@ QueryPipelineBuilderPtr ITransformingStep::updatePipeline(QueryPipelineBuilders
return std::move(pipelines.front());
}
void ITransformingStep::updateDistinctColumns(const Block & res_header, NameSet & distinct_columns)
{
if (distinct_columns.empty())
return;
for (const auto & column : distinct_columns)
{
if (!res_header.has(column))
{
distinct_columns.clear();
break;
}
}
}
void ITransformingStep::describePipeline(FormatSettings & settings) const
{
IQueryPlanStep::describePipeline(processors, settings);

View File

@ -18,11 +18,6 @@ public:
/// They are specified in constructor and cannot be changed.
struct DataStreamTraits
{
/// Keep distinct_columns unchanged.
/// Examples: true for LimitStep, false for ExpressionStep with ARRAY JOIN
/// It some columns may be removed from result header, call updateDistinctColumns
bool preserves_distinct_columns;
/// True if pipeline has single output port after this step.
/// Examples: MergeSortingStep, AggregatingStep
bool returns_single_stream;
@ -69,8 +64,6 @@ public:
input_streams.emplace_back(std::move(input_stream));
updateOutputStream();
updateDistinctColumns(output_stream->header, output_stream->distinct_columns);
}
void describePipeline(FormatSettings & settings) const override;
@ -83,9 +76,6 @@ public:
}
protected:
/// Clear distinct_columns if res_header doesn't contain all of them.
static void updateDistinctColumns(const Block & res_header, NameSet & distinct_columns);
/// Create output stream from header and traits.
static DataStream createOutputStream(
const DataStream & input_stream,

View File

@ -83,7 +83,6 @@ static ITransformingStep::Traits getStorageJoinTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = false,

View File

@ -12,7 +12,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_sorting = true,

View File

@ -12,7 +12,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,

View File

@ -24,7 +24,6 @@ static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false,
.returns_single_stream = should_produce_results_in_order_of_bucket_number,
.preserves_number_of_streams = false,
.preserves_sorting = false,
@ -62,10 +61,6 @@ MergingAggregatedStep::MergingAggregatedStep(
, should_produce_results_in_order_of_bucket_number(should_produce_results_in_order_of_bucket_number_)
, memory_bound_merging_of_aggregation_results_enabled(memory_bound_merging_of_aggregation_results_enabled_)
{
/// Aggregation keys are distinct
for (const auto & key : params.keys)
output_stream->distinct_columns.insert(key);
if (memoryBoundMergingWillBeUsed() && should_produce_results_in_order_of_bucket_number)
{
output_stream->sort_description = group_by_sort_description;
@ -157,10 +152,6 @@ void MergingAggregatedStep::describeActions(JSONBuilder::JSONMap & map) const
void MergingAggregatedStep::updateOutputStream()
{
output_stream = createOutputStream(input_streams.front(), params.getHeader(input_streams.front().header, final), getDataStreamTraits());
/// Aggregation keys are distinct
for (const auto & key : params.keys)
output_stream->distinct_columns.insert(key);
}
bool MergingAggregatedStep::memoryBoundMergingWillBeUsed() const

View File

@ -12,7 +12,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,

View File

@ -11,7 +11,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_sorting = false,
@ -29,9 +28,6 @@ RollupStep::RollupStep(const DataStream & input_stream_, Aggregator::Params para
, final(final_)
, use_nulls(use_nulls_)
{
/// Aggregation keys are distinct
for (const auto & key : params.keys)
output_stream->distinct_columns.insert(key);
}
ProcessorPtr addGroupingSetForTotals(const Block & header, const Names & keys, bool use_nulls, const BuildQueryPipelineSettings & settings, UInt64 grouping_set_number);
@ -54,10 +50,6 @@ void RollupStep::updateOutputStream()
{
output_stream = createOutputStream(
input_streams.front(), appendGroupingSetColumn(params.getHeader(input_streams.front().header, final)), getDataStreamTraits());
/// Aggregation keys are distinct
for (const auto & key : params.keys)
output_stream->distinct_columns.insert(key);
}

View File

@ -45,7 +45,6 @@ static ITransformingStep::Traits getTraits(size_t limit)
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_sorting = false,

View File

@ -14,7 +14,6 @@ static ITransformingStep::Traits getTraits(bool has_filter)
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_sorting = true,

View File

@ -15,7 +15,6 @@ static ITransformingStep::Traits getTraits()
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,