From 8c0786bd80a2aad2934395124d9c1213fe79e0cc Mon Sep 17 00:00:00 2001 From: yariks5s Date: Wed, 8 May 2024 19:43:22 +0000 Subject: [PATCH] fix for projections --- src/Interpreters/Squashing.cpp | 10 +--- .../Transforms/PlanSquashingTransform.cpp | 60 +++++++++---------- src/Storages/ProjectionsDescription.cpp | 2 +- 3 files changed, 31 insertions(+), 41 deletions(-) diff --git a/src/Interpreters/Squashing.cpp b/src/Interpreters/Squashing.cpp index 6063714e8db..ece124e8a15 100644 --- a/src/Interpreters/Squashing.cpp +++ b/src/Interpreters/Squashing.cpp @@ -142,11 +142,7 @@ Block ApplySquashing::add(Chunk && input_chunk) Block ApplySquashing::addImpl(Chunk && input_chunk) { if (!input_chunk.hasChunkInfo()) - { - Block to_return; - std::swap(to_return, accumulated_block); - return to_return; - } + return Block(); const auto *info = getInfoFromChunk(input_chunk); for (auto & chunk : info->chunks) @@ -225,7 +221,7 @@ Chunk PlanSquashing::addImpl(Chunk && input_chunk) Chunk res_chunk = convertToChunk(chunks_to_merge_vec); return res_chunk; } - return input_chunk; + return {}; } Chunk PlanSquashing::convertToChunk(std::vector &chunks) @@ -237,7 +233,7 @@ Chunk PlanSquashing::convertToChunk(std::vector &chunks) for (auto &chunk : chunks) info->chunks.push_back(std::move(chunk)); - chunks.clear(); // we can remove this + chunks.clear(); return Chunk(header.cloneEmptyColumns(), 0, info); } diff --git a/src/Processors/Transforms/PlanSquashingTransform.cpp b/src/Processors/Transforms/PlanSquashingTransform.cpp index 62ff3a0bf39..fe0f6ed39f5 100644 --- a/src/Processors/Transforms/PlanSquashingTransform.cpp +++ b/src/Processors/Transforms/PlanSquashingTransform.cpp @@ -41,34 +41,32 @@ IProcessor::Status PlanSquashingTransform::prepareConsume() return Status::Finished; } - all_finished = true; - for (auto & input : inputs) - { - if (input.isFinished()) - continue; - - all_finished = false; - } - - if (all_finished) /// If all inputs are closed, we check if we have data in balancing - { - if (balance.isDataLeft()) /// If we have data in balancing, we process this data - { - finished = false; - transform(chunk); - has_data = true; - } - else /// If we don't have data, We send FINISHED - { - for (auto & output : outputs) - output.finish(); - - return Status::Finished; - } - } - while (!chunk.hasChunkInfo()) { + all_finished = true; + for (auto & input : inputs) + { + if (!input.isFinished()) + all_finished = false; + } + + if (all_finished) /// If all inputs are closed, we check if we have data in balancing + { + if (balance.isDataLeft()) /// If we have data in balancing, we process this data + { + finished = false; + transform(chunk); + has_data = true; + } + else /// If we don't have data, We send FINISHED + { + for (auto & output : outputs) + output.finish(); + + return Status::Finished; + } + } + for (auto & input : inputs) { if (input.isFinished()) @@ -80,12 +78,7 @@ IProcessor::Status PlanSquashingTransform::prepareConsume() if (!balance.isDataLeft()) return Status::NeedData; else - { - finished = true; - transform(chunk); - has_data = true; - return Status::Ready; - } + continue; } chunk = input.pull(); @@ -96,7 +89,8 @@ IProcessor::Status PlanSquashingTransform::prepareConsume() has_data = true; return Status::Ready; } - + else + return Status::NeedData; } } return Status::Ready; diff --git a/src/Storages/ProjectionsDescription.cpp b/src/Storages/ProjectionsDescription.cpp index d1bcc89cbe0..87e203e8665 100644 --- a/src/Storages/ProjectionsDescription.cpp +++ b/src/Storages/ProjectionsDescription.cpp @@ -311,7 +311,7 @@ Block ProjectionDescription::calculate(const Block & block, ContextPtr context) builder.resize(1); // Generate aggregated blocks with rows less or equal than the original block. // There should be only one output block after this transformation. - builder.addTransform(std::make_shared(builder.getHeader(), block.rows(), 0, builder.getNumStreams())); + builder.addTransform(std::make_shared(builder.getHeader(), block.rows(), 0, 1)); builder.addTransform(std::make_shared(builder.getHeader(), block.rows(), 0)); auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));