This commit is contained in:
morning-color 2024-07-04 19:58:22 +08:00
parent 355f144cda
commit 5f42e15182
2 changed files with 11 additions and 5 deletions

View File

@ -276,16 +276,22 @@ static void initRowsBeforeLimit(IOutputFormat * output_format)
} }
static void initRowsBeforeAggregation(std::shared_ptr<Processors> processors, IOutputFormat * output_format) static void initRowsBeforeAggregation(std::shared_ptr<Processors> processors, IOutputFormat * output_format)
{ {
bool has_aggregation = false;
if (!processors->empty()) if (!processors->empty())
{ {
RowsBeforeAggregationCounterPtr rows_before_aggregation_at_least = std::make_shared<RowsBeforeStepCounter>(); RowsBeforeAggregationCounterPtr rows_before_aggregation_at_least = std::make_shared<RowsBeforeStepCounter>();
for (auto & processor : *processors) for (auto processor : *processors)
{ {
if (auto transform = std::dynamic_pointer_cast<AggregatingTransform>(processor)) if (auto transform = std::dynamic_pointer_cast<AggregatingTransform>(processor))
{
transform->setRowsBeforeAggregationCounter(rows_before_aggregation_at_least); transform->setRowsBeforeAggregationCounter(rows_before_aggregation_at_least);
if (auto remote = std::dynamic_pointer_cast<RemoteSource>(processor)) has_aggregation = true;
remote->setRowsBeforeAggregationCounter(rows_before_aggregation_at_least);
} }
if (typeid_cast<RemoteSource *>(processor.get()) || typeid_cast<DelayedSource *>(processor.get()))
processor->setRowsBeforeAggregationCounter(rows_before_aggregation_at_least);
}
if (has_aggregation)
rows_before_aggregation_at_least->add(0); rows_before_aggregation_at_least->add(0);
output_format->setRowsBeforeAggregationCounter(rows_before_aggregation_at_least); output_format->setRowsBeforeAggregationCounter(rows_before_aggregation_at_least);
} }