squash small blocks before join transforms

This commit is contained in:
Nikita Taranov 2024-11-02 15:29:09 +01:00
parent fce3a0463b
commit 4e8a96e9c1
6 changed files with 21 additions and 9 deletions

View File

@ -26,6 +26,7 @@
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Transforms/PasteJoinTransform.h>
#include <Processors/Transforms/SquashingTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
#include <QueryPipeline/narrowPipe.h>
#include <Common/CurrentThread.h>
@ -441,9 +442,12 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
Processors processors;
for (auto & outport : outports)
{
auto squashing = std::make_shared<SimpleSquashingChunksTransform>(right->getHeader(), max_block_size / 2, 0);
connect(*outport, squashing->getInputs().front());
processors.emplace_back(squashing);
auto adding_joined = std::make_shared<FillingRightJoinSideTransform>(right->getHeader(), join);
connect(*outport, adding_joined->getInputs().front());
processors.emplace_back(adding_joined);
connect(squashing->getOutputPort(), adding_joined->getInputs().front());
processors.emplace_back(std::move(adding_joined));
}
return processors;
};
@ -497,10 +501,13 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
Block left_header = left->getHeader();
for (size_t i = 0; i < num_streams; ++i)
{
auto squashing = std::make_shared<SimpleSquashingChunksTransform>(left->getHeader(), max_block_size / 2, 0);
connect(**lit, squashing->getInputs().front());
auto joining = std::make_shared<JoiningTransform>(
left_header, output_header, join, max_block_size, false, default_totals, finish_counter);
connect(**lit, joining->getInputs().front());
connect(squashing->getOutputPort(), joining->getInputs().front());
connect(**rit, joining->getInputs().back());
if (delayed_root)
{
@ -532,6 +539,7 @@ std::unique_ptr<QueryPipelineBuilder> QueryPipelineBuilder::joinPipelinesRightLe
if (collected_processors)
collected_processors->emplace_back(joining);
left->pipe.processors->emplace_back(std::move(squashing));
left->pipe.processors->emplace_back(std::move(joining));
}

View File

@ -5,11 +5,12 @@
<fill_query>INSERT INTO test SELECT number % 10000, number % 10000, number % 10000 FROM numbers(10000000)</fill_query>
<fill_query>INSERT INTO test1 SELECT number % 1000 , number % 1000, number % 1000 FROM numbers(100000)</fill_query>
<query tag='INNER'>SELECT MAX(test1.a) FROM test INNER JOIN test1 on test.b = test1.b</query>
<query tag='INNER'>SELECT MAX(test1.a) FROM test INNER JOIN test1 on test.b = test1.b settings join_algorithm='hash'</query>
<query tag='INNER'>SELECT MAX(test1.a) FROM test INNER JOIN test1 on test.b = test1.b settings join_algorithm='parallel_hash'</query>
<query tag='LEFT'>SELECT MAX(test1.a) FROM test LEFT JOIN test1 on test.b = test1.b</query>
<query tag='RIGHT'>SELECT MAX(test1.a) FROM test RIGHT JOIN test1 on test.b = test1.b</query>
<query tag='FULL'>SELECT MAX(test1.a) FROM test FULL JOIN test1 on test.b = test1.b</query>
<drop_query>DROP TABLE IF EXISTS test</drop_query>
<drop_query>DROP TABLE IF EXISTS test1</drop_query>
</test>
</test>

View File

@ -1,3 +1,4 @@
<test>
<query tag='INNER'>SELECT count(c) FROM numbers_mt(100000000) AS a INNER JOIN (SELECT number, toString(number) AS c FROM numbers(2000000)) AS b ON (a.number % 10000000) = b.number</query>
<query tag='INNER'>SELECT count(c) FROM numbers_mt(100000000) AS a INNER JOIN (SELECT number, toString(number) AS c FROM numbers(2000000)) AS b ON (a.number % 10000000) = b.number settings join_algorithm='hash'</query>
<query tag='INNER'>SELECT count(c) FROM numbers_mt(100000000) AS a INNER JOIN (SELECT number, toString(number) AS c FROM numbers(2000000)) AS b ON (a.number % 10000000) = b.number settings join_algorithm='parallel_hash'</query>
</test>

View File

@ -13,7 +13,8 @@
<query short='1' tag='ANY LEFT IN'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>
<query tag='INNER'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 = 20042</query>
<query tag='INNER KEY'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0</query>
<query tag='INNER KEY'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0 settings join_algorithm='hash'</query>
<query tag='INNER KEY'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 20042 settings query_plan_filter_push_down = 0 settings join_algorithm='parallel_hash'</query>
<query tag='INNER ON'>SELECT COUNT() FROM ints l INNER JOIN ints r ON l.i64 = r.i64 WHERE i32 = 20042</query>
<query tag='INNER IN'>SELECT COUNT() FROM ints l INNER JOIN ints r USING i64 WHERE i32 IN(42, 10042, 20042, 30042, 40042)</query>

File diff suppressed because one or more lines are too long

View File

@ -9,7 +9,8 @@
<max_threads>1</max_threads>
</settings>
<query>SELECT 1 FROM hits_10m_words AS l ANY LEFT JOIN hits_10m_words AS r USING (word) FORMAT Null</query>
<query>SELECT 1 FROM hits_10m_words AS l ANY LEFT JOIN hits_10m_words AS r USING (word) FORMAT Null settings join_algorithm='hash'</query>
<query>SELECT 1 FROM hits_10m_words AS l ANY LEFT JOIN hits_10m_words AS r USING (word) FORMAT Null settings join_algorithm='parallel_hash'</query>
<query>SELECT 1 FROM strings AS l ANY LEFT JOIN strings AS r USING (short) FORMAT Null</query>
<query>SELECT 1 FROM strings AS l ANY LEFT JOIN strings AS r USING (long) FORMAT Null</query>