Merge pull request #67398 from ClickHouse/remove-has_single_port-property

Remove has_single_port property from plan stream.
This commit is contained in:
Nikolai Kochetov 2024-07-31 09:45:04 +00:00 committed by GitHub
commit ad270ccb6e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 12 additions and 17 deletions

View File

@ -134,7 +134,6 @@ AggregatingStep::AggregatingStep(
{
output_stream->sort_description = group_by_sort_description;
output_stream->sort_scope = DataStream::SortScope::Global;
output_stream->has_single_port = true;
}
}
@ -147,7 +146,6 @@ void AggregatingStep::applyOrder(SortDescription sort_description_for_merging_,
{
output_stream->sort_description = group_by_sort_description;
output_stream->sort_scope = DataStream::SortScope::Global;
output_stream->has_single_port = true;
}
explicit_sorting_required_for_aggregation_in_order = false;

View File

@ -10,6 +10,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static ITransformingStep::Traits getTraits(bool pre_distinct)
{
const bool preserves_number_of_streams = pre_distinct;
@ -90,7 +95,8 @@ void DistinctStep::transformPipeline(QueryPipelineBuilder & pipeline, const Buil
/// final distinct for sorted stream (sorting inside and among chunks)
if (input_stream.sort_scope == DataStream::SortScope::Global)
{
assert(input_stream.has_single_port);
if (pipeline.getNumStreams() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "DistinctStep with in-order expects single input");
if (distinct_sort_desc.size() < columns.size())
{

View File

@ -39,12 +39,13 @@ FillingStep::FillingStep(
, interpolate_description(interpolate_description_)
, use_with_fill_by_sorting_prefix(use_with_fill_by_sorting_prefix_)
{
if (!input_stream_.has_single_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FillingStep expects single input");
}
void FillingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
if (pipeline.getNumStreams() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FillingStep expects single input");
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
{
if (stream_type == QueryPipelineBuilder::StreamType::Totals)
@ -69,9 +70,6 @@ void FillingStep::describeActions(JSONBuilder::JSONMap & map) const
void FillingStep::updateOutputStream()
{
if (!input_streams.front().has_single_port)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FillingStep expects single input");
output_stream = createOutputStream(
input_streams.front(), FillingTransform::transformHeader(input_streams.front().header, sort_description), getDataStreamTraits());
}

View File

@ -28,9 +28,6 @@ class DataStream
public:
Block header;
/// QueryPipeline has single port. Totals or extremes ports are not counted.
bool has_single_port = false;
/// Sorting scope. Please keep the mutual order (more strong mode should have greater value).
enum class SortScope : uint8_t
{
@ -51,8 +48,7 @@ public:
bool hasEqualPropertiesWith(const DataStream & other) const
{
return has_single_port == other.has_single_port
&& sort_description == other.sort_description
return sort_description == other.sort_description
&& (sort_description.empty() || sort_scope == other.sort_scope);
}

View File

@ -20,9 +20,6 @@ DataStream ITransformingStep::createOutputStream(
{
DataStream output_stream{.header = std::move(output_header)};
output_stream.has_single_port = stream_traits.returns_single_stream
|| (input_stream.has_single_port && stream_traits.preserves_number_of_streams);
if (stream_traits.preserves_sorting)
{
output_stream.sort_description = input_stream.sort_description;

View File

@ -6,7 +6,7 @@ namespace DB
{
ReadNothingStep::ReadNothingStep(Block output_header)
: ISourceStep(DataStream{.header = std::move(output_header), .has_single_port = true})
: ISourceStep(DataStream{.header = std::move(output_header)})
{
}