more parallelism

This commit is contained in:
Nikita Taranov 2022-07-13 12:36:04 +02:00
parent f650b23ee3
commit 63bc894a42
6 changed files with 32 additions and 7 deletions

View File

@ -1,5 +1,4 @@
#include <algorithm>
#include <functional>
#include <future>
#include <numeric>
#include <Poco/Util/Application.h>
@ -1579,7 +1578,9 @@ Block Aggregator::convertOneBucketToBlock(
bool final,
size_t bucket) const
{
Block block = convertToBlockImpl</* return_single_block */ true>(
// Used in ConvertingAggregatedToChunksSource -> ConvertingAggregatedToChunksTransform (expects single chunk for each bucket_id).
constexpr bool return_single_block = true;
Block block = convertToBlockImpl<return_single_block>(
method, method.data.impls[bucket], arena, data_variants.aggregates_pools, final, method.data.impls[bucket].size());
block.info.bucket_num = bucket;

View File

@ -376,16 +376,15 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
});
/// We add the explicit resize here, but not in case of aggregating in order, since AIO doesn't use two-level hash tables and thus returns only buckets with bucket_number = -1.
pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : pipeline.getNumStreams(), true /* force */);
pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : params.max_threads, true /* force */);
aggregating = collector.detachProcessors(0);
}
else
{
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingTransform>(header, transform_params);
});
pipeline.addSimpleTransform([&](const Block & header) { return std::make_shared<AggregatingTransform>(header, transform_params); });
pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : params.max_threads, false /* force */);
aggregating = collector.detachProcessors(0);
}

View File

@ -161,6 +161,8 @@ public:
return std::max<size_t>(1, num_threads);
}
/// Returns the stored max_threads value.
size_t getMaxThreads() const
{
    return max_threads;
}
/// Set the upper limit for the recommended number of threads.
void setMaxThreads(size_t max_threads_)
{
    max_threads = max_threads_;
}

View File

@ -1,4 +1,8 @@
<test>
<query>select sipHash64(number) from numbers(1e7) group by number format Null</query>
<query>select * from (select * from numbers(1e7) group by number) group by number format Null</query>
<query>select * from (select * from numbers(1e7) group by number) order by number format Null</query>
<query>select * from (select * from numbers_mt(1e7) group by number) group by number format Null</query>
<query>select * from (select * from numbers_mt(1e7) group by number) order by number format Null</query>
<query>select * from (select * from numbers_mt(1e7) group by number) group by number format Null settings max_bytes_before_external_group_by = 1</query>

View File

@ -1,5 +1,22 @@
-- { echoOn }
explain pipeline select * from (select * from numbers(1e8) group by number) group by number;
(Expression)
ExpressionTransform × 16
(Aggregating)
Resize 16 → 16
AggregatingTransform × 16
StrictResize 16 → 16
(Expression)
ExpressionTransform × 16
(Aggregating)
Resize 1 → 16
AggregatingTransform
(Expression)
ExpressionTransform
(ReadFromStorage)
Limit
Numbers 0 → 1
explain pipeline select * from (select * from numbers_mt(1e8) group by number) group by number;
(Expression)
ExpressionTransform × 16

View File

@ -4,6 +4,8 @@ set optimize_aggregation_in_order = 0;
-- { echoOn }
explain pipeline select * from (select * from numbers(1e8) group by number) group by number;
explain pipeline select * from (select * from numbers_mt(1e8) group by number) group by number;
explain pipeline select * from (select * from numbers_mt(1e8) group by number) order by number;