Split traits of ITransformingStep to DataStreamTraits and TransformTraits.

This commit is contained in:
Nikolai Kochetov 2020-07-28 16:10:48 +03:00
parent 09b8cb0e5f
commit e6508db4f0
23 changed files with 180 additions and 82 deletions

View File

@ -4,15 +4,19 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = false,
.returns_single_stream = false,
.preserves_number_of_streams = false,
.preserves_number_of_rows = false, /// New rows are added from delayed stream
.preserves_sorting = false,
},
{
.preserves_number_of_rows = false, /// New rows are added from delayed stream
}
};
}

View File

@ -7,15 +7,19 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = false, /// Actually, we may check that distinct names are in aggregation keys
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_number_of_rows = false,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = false,
}
};
}

View File

@ -6,15 +6,19 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_number_of_rows = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = true,
}
};
}

View File

@ -6,15 +6,19 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_number_of_rows = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = true,
}
};
}

View File

@ -5,15 +5,19 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = false,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_number_of_rows = false,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = false,
}
};
}

View File

@ -16,15 +16,19 @@ static bool checkColumnsAlreadyDistinct(const Names & columns, const NameSet & d
return columns_already_distinct;
}
static ITransformingStep::DataStreamTraits getTraits(bool pre_distinct, bool already_distinct_columns)
static ITransformingStep::Traits getTraits(bool pre_distinct, bool already_distinct_columns)
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = already_distinct_columns, /// Will be calculated separately otherwise
.returns_single_stream = !pre_distinct && !already_distinct_columns,
.preserves_number_of_streams = pre_distinct || already_distinct_columns,
.preserves_number_of_rows = false,
.preserves_sorting = true, /// Sorting is preserved indeed because of implementation.
},
{
.preserves_number_of_rows = false,
}
};
}

View File

@ -8,15 +8,19 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits(const ExpressionActionsPtr & expression)
static ITransformingStep::Traits getTraits(const ExpressionActionsPtr & expression)
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = !expression->hasJoinOrArrayJoin(),
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_number_of_rows = !expression->hasJoinOrArrayJoin(),
.preserves_sorting = !expression->hasJoinOrArrayJoin(),
},
{
.preserves_number_of_rows = !expression->hasJoinOrArrayJoin(),
}
};
}

View File

@ -4,15 +4,19 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_number_of_rows = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = true,
}
};
}

View File

@ -11,15 +11,19 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
static ITransformingStep::DataStreamTraits getTraits()
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = false, /// TODO: it seem to actually be true. Check it later.
.returns_single_stream = true,
.preserves_number_of_streams = true,
.preserves_number_of_rows = false,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = false,
}
};
}

View File

@ -7,15 +7,19 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits(const ExpressionActionsPtr & expression)
static ITransformingStep::Traits getTraits(const ExpressionActionsPtr & expression)
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = !expression->hasJoinOrArrayJoin(), /// I suppose it actually never happens
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_number_of_rows = false,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = false,
}
};
}

View File

@ -9,15 +9,19 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits(size_t limit)
static ITransformingStep::Traits getTraits(size_t limit)
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_number_of_rows = limit == 0,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = limit == 0,
}
};
}

View File

@ -4,19 +4,20 @@
namespace DB
{
ITransformingStep::ITransformingStep(DataStream input_stream, Block output_header, DataStreamTraits traits, bool collect_processors_)
: collect_processors(collect_processors_)
, transform_traits(traits)
ITransformingStep::ITransformingStep(DataStream input_stream, Block output_header, Traits traits, bool collect_processors_)
: transform_traits(std::move(traits.transform_traits))
, collect_processors(collect_processors_)
, data_stream_traits(std::move(traits.data_stream_traits))
{
output_stream = DataStream{.header = std::move(output_header)};
if (traits.preserves_distinct_columns)
if (data_stream_traits.preserves_distinct_columns)
output_stream->distinct_columns = input_stream.distinct_columns;
output_stream->has_single_port = traits.returns_single_stream
|| (input_stream.has_single_port && traits.preserves_number_of_streams);
output_stream->has_single_port = data_stream_traits.returns_single_stream
|| (input_stream.has_single_port && data_stream_traits.preserves_number_of_streams);
if (traits.preserves_sorting)
if (data_stream_traits.preserves_sorting)
{
output_stream->sort_description = input_stream.sort_description;
output_stream->sort_mode = input_stream.sort_mode;

View File

@ -9,6 +9,8 @@ namespace DB
class ITransformingStep : public IQueryPlanStep
{
public:
/// This flags are used to automatically set properties for output stream.
/// They are specified in constructor and cannot be changed.
struct DataStreamTraits
{
/// Keep distinct_columns unchanged.
@ -24,22 +26,34 @@ public:
/// Examples: true for ExpressionStep, false for MergeSortingStep
bool preserves_number_of_streams;
/// Won't change the total number of rows.
/// Examples: true ExpressionStep (without join or array join), false for FilterStep
bool preserves_number_of_rows;
/// Doesn't change row order.
/// Examples: true FilterStep, false for PartialSortingStep
bool preserves_sorting;
};
ITransformingStep(DataStream input_stream, Block output_header, DataStreamTraits traits, bool collect_processors_ = true);
/// This flags are used by QueryPlan optimizers.
/// They can be changed after some optimizations.
struct TransformTraits
{
/// Won't change the total number of rows.
/// Examples: true ExpressionStep (without join or array join), false for FilterStep
bool preserves_number_of_rows;
};
struct Traits
{
DataStreamTraits data_stream_traits;
TransformTraits transform_traits;
};
ITransformingStep(DataStream input_stream, Block output_header, Traits traits, bool collect_processors_ = true);
QueryPipelinePtr updatePipeline(QueryPipelines pipelines) override;
virtual void transformPipeline(QueryPipeline & pipeline) = 0;
const DataStreamTraits & getTransformTraits() const { return transform_traits; }
const TransformTraits & getTransformTraits() const { return transform_traits; }
const DataStreamTraits & getDataStreamTraits() const { return data_stream_traits; }
void describePipeline(FormatSettings & settings) const override;
@ -47,12 +61,14 @@ protected:
/// Clear distinct_columns if res_header doesn't contain all of them.
static void updateDistinctColumns(const Block & res_header, NameSet & distinct_columns);
TransformTraits transform_traits;
private:
/// We collect processors got after pipeline transformation.
Processors processors;
bool collect_processors;
DataStreamTraits transform_traits;
const DataStreamTraits data_stream_traits;
};
}

View File

@ -6,15 +6,19 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_number_of_rows = false,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = false,
}
};
}

View File

@ -6,15 +6,19 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_number_of_rows = false,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = false,
}
};
}

View File

@ -6,15 +6,19 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits(size_t limit)
static ITransformingStep::Traits getTraits(size_t limit)
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_number_of_rows = limit == 0,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = limit == 0,
}
};
}

View File

@ -7,15 +7,19 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = false,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_number_of_rows = false,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = false,
}
};
}

View File

@ -6,15 +6,19 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits(size_t limit)
static ITransformingStep::Traits getTraits(size_t limit)
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_number_of_rows = limit == 0,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = limit == 0,
}
};
}

View File

@ -6,15 +6,19 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_number_of_rows = false,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = false,
}
};
}

View File

@ -7,15 +7,19 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits(size_t limit)
static ITransformingStep::Traits getTraits(size_t limit)
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_number_of_rows = limit == 0,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = limit == 0,
}
};
}

View File

@ -315,7 +315,7 @@ static void tryPushDownLimit(QueryPlanStepPtr & parent, QueryPlanStepPtr & child
if (!limit)
return;
const auto * transforming = typeid_cast<const ITransformingStep *>(child.get());
const auto * transforming = dynamic_cast<const ITransformingStep *>(child.get());
/// Skip everything which is not transform.
if (!transforming)
@ -323,14 +323,15 @@ static void tryPushDownLimit(QueryPlanStepPtr & parent, QueryPlanStepPtr & child
/// Now we should decide if pushing down limit possible for this step.
const auto & traits = transforming->getTransformTraits();
const auto & transform_traits = transforming->getTransformTraits();
const auto & data_stream_traits = transforming->getDataStreamTraits();
/// Cannot push down if child changes the number of rows.
if (!traits.preserves_number_of_rows)
if (!transform_traits.preserves_number_of_rows)
return;
/// Cannot push down if data was sorted exactly by child stream.
if (!child->getOutputStream().sort_description.empty() && !traits.preserves_sorting)
if (!child->getOutputStream().sort_description.empty() && !data_stream_traits.preserves_sorting)
return;
parent.swap(child);

View File

@ -5,15 +5,19 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits()
static ITransformingStep::Traits getTraits()
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = false,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_number_of_rows = false,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = false,
}
};
}

View File

@ -8,15 +8,19 @@
namespace DB
{
static ITransformingStep::DataStreamTraits getTraits(bool has_filter)
static ITransformingStep::Traits getTraits(bool has_filter)
{
return ITransformingStep::Traits
{
return ITransformingStep::DataStreamTraits
{
.preserves_distinct_columns = true,
.returns_single_stream = true,
.preserves_number_of_streams = false,
.preserves_number_of_rows = !has_filter,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = !has_filter,
}
};
}