fix for projections

This commit is contained in:
yariks5s 2024-05-08 19:43:22 +00:00
parent e1ed0af3d2
commit 8c0786bd80
3 changed files with 31 additions and 41 deletions

View File

@ -142,11 +142,7 @@ Block ApplySquashing::add(Chunk && input_chunk)
Block ApplySquashing::addImpl(Chunk && input_chunk)
{
if (!input_chunk.hasChunkInfo())
{
Block to_return;
std::swap(to_return, accumulated_block);
return to_return;
}
return Block();
const auto *info = getInfoFromChunk(input_chunk);
for (auto & chunk : info->chunks)
@ -225,7 +221,7 @@ Chunk PlanSquashing::addImpl(Chunk && input_chunk)
Chunk res_chunk = convertToChunk(chunks_to_merge_vec);
return res_chunk;
}
return input_chunk;
return {};
}
Chunk PlanSquashing::convertToChunk(std::vector<Chunk> &chunks)
@ -237,7 +233,7 @@ Chunk PlanSquashing::convertToChunk(std::vector<Chunk> &chunks)
for (auto &chunk : chunks)
info->chunks.push_back(std::move(chunk));
chunks.clear(); // we can remove this
chunks.clear();
return Chunk(header.cloneEmptyColumns(), 0, info);
}

View File

@ -41,34 +41,32 @@ IProcessor::Status PlanSquashingTransform::prepareConsume()
return Status::Finished;
}
all_finished = true;
for (auto & input : inputs)
{
if (input.isFinished())
continue;
all_finished = false;
}
if (all_finished) /// If all inputs are closed, we check if we have data in balancing
{
if (balance.isDataLeft()) /// If we have data in balancing, we process this data
{
finished = false;
transform(chunk);
has_data = true;
}
else /// If we don't have data, We send FINISHED
{
for (auto & output : outputs)
output.finish();
return Status::Finished;
}
}
while (!chunk.hasChunkInfo())
{
all_finished = true;
for (auto & input : inputs)
{
if (!input.isFinished())
all_finished = false;
}
if (all_finished) /// If all inputs are closed, we check if we have data in balancing
{
if (balance.isDataLeft()) /// If we have data in balancing, we process this data
{
finished = false;
transform(chunk);
has_data = true;
}
else /// If we don't have data, We send FINISHED
{
for (auto & output : outputs)
output.finish();
return Status::Finished;
}
}
for (auto & input : inputs)
{
if (input.isFinished())
@ -80,12 +78,7 @@ IProcessor::Status PlanSquashingTransform::prepareConsume()
if (!balance.isDataLeft())
return Status::NeedData;
else
{
finished = true;
transform(chunk);
has_data = true;
return Status::Ready;
}
continue;
}
chunk = input.pull();
@ -96,7 +89,8 @@ IProcessor::Status PlanSquashingTransform::prepareConsume()
has_data = true;
return Status::Ready;
}
else
return Status::NeedData;
}
}
return Status::Ready;

View File

@ -311,7 +311,7 @@ Block ProjectionDescription::calculate(const Block & block, ContextPtr context)
builder.resize(1);
// Generate aggregated blocks with rows less or equal than the original block.
// There should be only one output block after this transformation.
builder.addTransform(std::make_shared<PlanSquashingTransform>(builder.getHeader(), block.rows(), 0, builder.getNumStreams()));
builder.addTransform(std::make_shared<PlanSquashingTransform>(builder.getHeader(), block.rows(), 0, 1));
builder.addTransform(std::make_shared<ApplySquashingTransform>(builder.getHeader(), block.rows(), 0));
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));