squash follow up

This commit is contained in:
Nikita Taranov 2024-11-03 16:42:07 +01:00
parent 4e8a96e9c1
commit 2892aa11e5
3 changed files with 14 additions and 8 deletions

View File

@ -1,9 +1,10 @@
#include <Processors/QueryPlan/JoinStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <IO/Operators.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/TableJoin.h>
#include <IO/Operators.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <Processors/Transforms/SquashingTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/JSONBuilder.h>
#include <Common/typeid_cast.h>
@ -63,7 +64,7 @@ QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines
return joined_pipeline;
}
return QueryPipelineBuilder::joinPipelinesRightLeft(
auto ppl = QueryPipelineBuilder::joinPipelinesRightLeft(
std::move(pipelines[0]),
std::move(pipelines[1]),
join,
@ -72,6 +73,11 @@ QueryPipelineBuilderPtr JoinStep::updatePipeline(QueryPipelineBuilders pipelines
max_streams,
keep_left_read_in_order,
&processors);
ppl->addSimpleTransform([&](const Block & header)
{ return std::make_shared<SimpleSquashingChunksTransform>(header, max_block_size / 2, 1_MiB / 2); });
return ppl;
}
bool JoinStep::allowPushDownToRight() const

View File

@ -78,7 +78,7 @@ Chunk SimpleSquashingChunksTransform::generate()
bool SimpleSquashingChunksTransform::canGenerate()
{
return !squashed_chunk.empty();
return squashed_chunk.hasRows();
}
Chunk SimpleSquashingChunksTransform::getRemaining()

View File

@ -442,7 +442,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
Processors processors;
for (auto & outport : outports)
{
auto squashing = std::make_shared<SimpleSquashingChunksTransform>(right->getHeader(), max_block_size / 2, 0);
auto squashing = std::make_shared<SimpleSquashingChunksTransform>(right->getHeader(), max_block_size / 2, 1_MiB / 2);
connect(*outport, squashing->getInputs().front());
processors.emplace_back(squashing);
auto adding_joined = std::make_shared<FillingRightJoinSideTransform>(right->getHeader(), join);
@ -501,7 +501,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
Block left_header = left->getHeader();
for (size_t i = 0; i < num_streams; ++i)
{
auto squashing = std::make_shared<SimpleSquashingChunksTransform>(left->getHeader(), max_block_size / 2, 0);
auto squashing = std::make_shared<SimpleSquashingChunksTransform>(left->getHeader(), max_block_size / 2, 1_MiB / 2);
connect(**lit, squashing->getInputs().front());
auto joining = std::make_shared<JoiningTransform>(