fix: Use correct header for the totals

This commit is contained in:
Sergey Skvortsov 2022-06-20 22:30:56 +03:00
parent 6423e5f2f9
commit 52164efd64
No known key found for this signature in database
GPG Key ID: 120217CE540C3670
2 changed files with 9 additions and 9 deletions

View File

@ -393,11 +393,12 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelines(
DelayedPortsProcessor::PortNumbers delayed_ports_numbers; DelayedPortsProcessor::PortNumbers delayed_ports_numbers;
std::vector<OutputPort*> joined_output_ports; std::vector<OutputPort*> joined_output_ports;
const Block left_header = left->getHeader();
for (size_t i = 0; i < num_streams; ++i) for (size_t i = 0; i < num_streams; ++i)
{ {
auto joining = std::make_shared<JoiningTransform>(left->getHeader(), join, max_block_size, false, default_totals, finish_counter); auto joining = std::make_shared<JoiningTransform>(left_header, join, max_block_size, false, default_totals, finish_counter);
// Process delayed joined blocks when all JoiningTransform are finished. // Process delayed joined blocks when all JoiningTransform are finished.
auto delayed = std::make_shared<DelayedJoinedBlocksTransform>(left->getHeader(), join); auto delayed = std::make_shared<DelayedJoinedBlocksTransform>(left_header, join);
connect(**lit, joining->getInputs().front()); connect(**lit, joining->getInputs().front());
connect(**rit, joining->getInputs().back()); connect(**rit, joining->getInputs().back());
@ -418,7 +419,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelines(
} }
// Process DelayedJoinedBlocksTransform after all JoiningTransforms. // Process DelayedJoinedBlocksTransform after all JoiningTransforms.
auto joined_header = JoiningTransform::transformHeader(left->getHeader(), join); auto joined_header = JoiningTransform::transformHeader(left_header, join);
auto delayed_processor = std::make_shared<DelayedPortsProcessor>(joined_header, num_streams + num_streams, delayed_ports_numbers); auto delayed_processor = std::make_shared<DelayedPortsProcessor>(joined_header, num_streams + num_streams, delayed_ports_numbers);
if (collected_processors) if (collected_processors)
collected_processors->emplace_back(delayed_processor); collected_processors->emplace_back(delayed_processor);
@ -436,7 +437,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelines(
if (left->hasTotals()) if (left->hasTotals())
{ {
auto joining = std::make_shared<JoiningTransform>(left->getHeader(), join, max_block_size, true, default_totals); auto joining = std::make_shared<JoiningTransform>(left_header, join, max_block_size, true, default_totals);
connect(*left->pipe.totals_port, joining->getInputs().front()); connect(*left->pipe.totals_port, joining->getInputs().front());
connect(**rit, joining->getInputs().back()); connect(**rit, joining->getInputs().back());
left->pipe.totals_port = &joining->getOutputs().front(); left->pipe.totals_port = &joining->getOutputs().front();
@ -455,7 +456,6 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelines(
left->pipe.processors.insert(left->pipe.processors.end(), right->pipe.processors.begin(), right->pipe.processors.end()); left->pipe.processors.insert(left->pipe.processors.end(), right->pipe.processors.begin(), right->pipe.processors.end());
left->resources = std::move(right->resources); left->resources = std::move(right->resources);
left->pipe.header = left->pipe.output_ports.front()->getHeader();
left->pipe.max_parallel_streams = std::max(left->pipe.max_parallel_streams, right->pipe.max_parallel_streams); left->pipe.max_parallel_streams = std::max(left->pipe.max_parallel_streams, right->pipe.max_parallel_streams);
return left; return left;
} }

View File

@ -1,10 +1,10 @@
(Expression) (Expression)
ExpressionTransform ExpressionTransform
(Join) (Join)
Concat 2 → 1 Resize 2 → 1
DelayedJoinedBlocksTransform DelayedPorts 2 → 2
JoiningTransform 2 → 2 DelayedJoinedBlocksTransform 0 → 1
Barrier JoiningTransform 2 → 1
(Expression) (Expression)
ExpressionTransform ExpressionTransform
(Limit) (Limit)