From 57a16420353be05b84742c15c7ceef3ed91d4247 Mon Sep 17 00:00:00 2001
From: Anton Popov
Date: Tue, 22 Dec 2020 14:30:29 +0300
Subject: [PATCH] fix race in aggregation with combinator distinct

---
 src/Common/ThreadPool.cpp                    |  7 +++++
 src/Common/ThreadPool.h                      |  1 +
 src/Interpreters/Aggregator.cpp              | 31 ++++++++++++-------
 src/Interpreters/Aggregator.h                |  1 +
 .../01605_dictinct_two_level.reference       | 20 ++++++++++++
 .../0_stateless/01605_dictinct_two_level.sql | 25 +++++++++++++++
 6 files changed, 74 insertions(+), 11 deletions(-)
 create mode 100644 tests/queries/0_stateless/01605_dictinct_two_level.reference
 create mode 100644 tests/queries/0_stateless/01605_dictinct_two_level.sql

diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp
index 7b2c2108629..7fc0d65aa5b 100644
--- a/src/Common/ThreadPool.cpp
+++ b/src/Common/ThreadPool.cpp
@@ -55,6 +55,13 @@ void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
     max_threads = value;
 }
 
+template <typename Thread>
+size_t ThreadPoolImpl<Thread>::getMaxThreads() const
+{
+    std::lock_guard lock(mutex);
+    return max_threads;
+}
+
 template <typename Thread>
 void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value)
 {
diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h
index 8dd6cbbe02c..0ae023e4ebd 100644
--- a/src/Common/ThreadPool.h
+++ b/src/Common/ThreadPool.h
@@ -71,6 +71,7 @@ public:
     void setMaxThreads(size_t value);
     void setMaxFreeThreads(size_t value);
     void setQueueSize(size_t value);
+    size_t getMaxThreads() const;
 
 private:
     mutable std::mutex mutex;
diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp
index 87abca4d7cd..f6f1f7c8d53 100644
--- a/src/Interpreters/Aggregator.cpp
+++ b/src/Interpreters/Aggregator.cpp
@@ -913,15 +913,15 @@ template <typename Method>
 Block Aggregator::convertOneBucketToBlock(
     AggregatedDataVariants & data_variants,
     Method & method,
+    Arena * arena,
     bool final,
     size_t bucket) const
 {
     Block block = prepareBlockAndFill(data_variants, final, method.data.impls[bucket].size(),
-        [bucket, &method, this] (
+        [bucket, &method, arena, this] (
             MutableColumns & key_columns,
             AggregateColumnsData & aggregate_columns,
             MutableColumns & final_aggregate_columns,
-            Arena * arena,
             bool final_)
         {
             convertToBlockImpl(method, method.data.impls[bucket],
@@ -950,7 +950,7 @@ Block Aggregator::mergeAndConvertOneBucketToBlock(
         mergeBucketImpl(variants, bucket, arena); \
         if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst)) \
             return {}; \
-        block = convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket); \
+        block = convertOneBucketToBlock(merged_data, *merged_data.NAME, arena, final, bucket); \
     }
 
     APPLY_FOR_VARIANTS_TWO_LEVEL(M)
@@ -982,7 +982,7 @@ void Aggregator::writeToTemporaryFileImpl(
 
     for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
     {
-        Block block = convertOneBucketToBlock(data_variants, method, false, bucket);
+        Block block = convertOneBucketToBlock(data_variants, method, data_variants.aggregates_pool, false, bucket);
         out.write(block);
         update_max_sizes(block);
     }
@@ -1285,7 +1285,7 @@ Block Aggregator::prepareBlockAndFill(
         }
     }
 
-    filler(key_columns, aggregate_columns_data, final_aggregate_columns, data_variants.aggregates_pool, final);
+    filler(key_columns, aggregate_columns_data, final_aggregate_columns, final);
 
     Block res = header.cloneEmpty();
 
@@ -1352,7 +1352,6 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
         MutableColumns & key_columns,
         AggregateColumnsData & aggregate_columns,
         MutableColumns & final_aggregate_columns,
-        Arena * arena,
         bool final_)
     {
         if (data_variants.type == AggregatedDataVariants::Type::without_key || params.overflow_row)
@@ -1367,7 +1366,8 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
         }
         else
         {
-            insertAggregatesIntoColumns(data, final_aggregate_columns, arena);
+            /// Always single-thread. It's safe to pass current arena from 'aggregates_pool'.
+            insertAggregatesIntoColumns(data, final_aggregate_columns, data_variants.aggregates_pool);
         }
 
         if (params.overflow_row)
@@ -1395,13 +1395,12 @@ Block Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_v
         MutableColumns & key_columns,
         AggregateColumnsData & aggregate_columns,
         MutableColumns & final_aggregate_columns,
-        Arena * arena,
         bool final_)
     {
     #define M(NAME) \
         else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
             convertToBlockImpl(*data_variants.NAME, data_variants.NAME->data, \
-                key_columns, aggregate_columns, final_aggregate_columns, arena, final_);
+                key_columns, aggregate_columns, final_aggregate_columns, data_variants.aggregates_pool, final_);
 
         if (false) {} // NOLINT
         APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
@@ -1435,11 +1434,21 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
     bool final,
     ThreadPool * thread_pool) const
 {
+    size_t max_threads = thread_pool->getMaxThreads();
+    if (max_threads > data_variants.aggregates_pools.size())
+        for (size_t i = data_variants.aggregates_pools.size(); i < max_threads; ++i)
+            data_variants.aggregates_pools.push_back(std::make_shared<Arena>());
+
     auto converter = [&](size_t bucket, ThreadGroupStatusPtr thread_group)
     {
         if (thread_group)
             CurrentThread::attachToIfDetached(thread_group);
-        return convertOneBucketToBlock(data_variants, method, final, bucket);
+
+        /// Select Arena to avoid race conditions
+        size_t thread_number = static_cast<size_t>(bucket) % max_threads;
+        Arena * arena = data_variants.aggregates_pools.at(thread_number).get();
+
+        return convertOneBucketToBlock(data_variants, method, arena, final, bucket);
     };
 
     /// packaged_task is used to ensure that exceptions are automatically thrown into the main stream.
@@ -1949,7 +1958,7 @@ private:
         else if (method == AggregatedDataVariants::Type::NAME) \
         { \
             aggregator.mergeBucketImpl(data, bucket_num, arena); \
-            block = aggregator.convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket_num); \
+            block = aggregator.convertOneBucketToBlock(merged_data, *merged_data.NAME, arena, final, bucket_num); \
         }
 
         APPLY_FOR_VARIANTS_TWO_LEVEL(M)
diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h
index c688da9d32d..86806b7fbad 100644
--- a/src/Interpreters/Aggregator.h
+++ b/src/Interpreters/Aggregator.h
@@ -1212,6 +1212,7 @@ protected:
     Block convertOneBucketToBlock(
         AggregatedDataVariants & data_variants,
         Method & method,
+        Arena * arena,
         bool final,
         size_t bucket) const;
 
diff --git a/tests/queries/0_stateless/01605_dictinct_two_level.reference b/tests/queries/0_stateless/01605_dictinct_two_level.reference
new file mode 100644
index 00000000000..50d1615e1aa
--- /dev/null
+++ b/tests/queries/0_stateless/01605_dictinct_two_level.reference
@@ -0,0 +1,20 @@
+['0']
+['1']
+['2']
+['3']
+['4']
+['5']
+['6']
+['7']
+['8']
+['9']
+test.com ['foo3223','foo6455','foo382','foo5566','foo1037']
+test.com0 ['foo0']
+test.com0.0001 ['foo1']
+test.com0.0002 ['foo2']
+test.com0.0003 ['foo3']
+test.com0.0004 ['foo4']
+test.com0.0005 ['foo5']
+test.com0.0006 ['foo6']
+test.com0.0007 ['foo7']
+test.com0.0008 ['foo8']
diff --git a/tests/queries/0_stateless/01605_dictinct_two_level.sql b/tests/queries/0_stateless/01605_dictinct_two_level.sql
new file mode 100644
index 00000000000..5f20ae590c5
--- /dev/null
+++ b/tests/queries/0_stateless/01605_dictinct_two_level.sql
@@ -0,0 +1,25 @@
+SET group_by_two_level_threshold_bytes = 1;
+SET group_by_two_level_threshold = 1;
+
+SELECT groupArray(DISTINCT toString(number % 10)) FROM numbers_mt(50000)
+    GROUP BY number ORDER BY number LIMIT 10
+    SETTINGS max_threads = 2, max_block_size = 2000;
+
+DROP TABLE IF EXISTS dictinct_two_level;
+
+CREATE TABLE dictinct_two_level (
+    time DateTime64(3),
+    domain String,
+    subdomain String
+) ENGINE = MergeTree ORDER BY time;
+
+INSERT INTO dictinct_two_level SELECT 1546300800000, 'test.com', concat('foo', toString(number % 10000)) from numbers(10000);
+INSERT INTO dictinct_two_level SELECT 1546300800000, concat('test.com', toString(number / 10000)) , concat('foo', toString(number % 10000)) from numbers(10000);
+
+SELECT
+    domain, groupArraySample(5, 11111)(DISTINCT subdomain) AS example_subdomains
+FROM dictinct_two_level
+GROUP BY domain ORDER BY domain, example_subdomains
+LIMIT 10;
+
+DROP TABLE IF EXISTS dictinct_two_level;
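
Below the diff, a minimal, self-contained C++ sketch (not ClickHouse code; SimplePool, convert_bucket and NUM_BUCKETS are hypothetical names) of the pattern the patch applies: give each worker its own memory pool, selected as bucket % max_threads, so concurrent conversion of two-level buckets never writes into a shared, non-thread-safe allocator.

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>

/// Stand-in for an arena: a growable buffer with no internal synchronization.
struct SimplePool
{
    std::vector<std::string> data;
    void add(std::string value) { data.push_back(std::move(value)); }
};

int main()
{
    constexpr size_t NUM_BUCKETS = 256;
    const size_t max_threads = std::max<size_t>(1, std::thread::hardware_concurrency());

    /// One pool per worker, mirroring data_variants.aggregates_pools in the patch.
    std::vector<std::unique_ptr<SimplePool>> pools;
    for (size_t i = 0; i < max_threads; ++i)
        pools.push_back(std::make_unique<SimplePool>());

    /// Convert one bucket using the pool selected by bucket % max_threads.
    auto convert_bucket = [&](size_t bucket)
    {
        SimplePool & pool = *pools[bucket % max_threads];
        pool.add("bucket " + std::to_string(bucket));
    };

    /// Worker t handles exactly the buckets with bucket % max_threads == t,
    /// so every pool has a single writer and no locking is needed.
    std::vector<std::thread> workers;
    for (size_t t = 0; t < max_threads; ++t)
        workers.emplace_back([&, t]
        {
            for (size_t bucket = t; bucket < NUM_BUCKETS; bucket += max_threads)
                convert_bucket(bucket);
        });
    for (auto & worker : workers)
        worker.join();

    size_t converted = 0;
    for (const auto & pool : pools)
        converted += pool->data.size();
    std::cout << "converted " << converted << " buckets\n";
    return 0;
}

The sketch pins each residue class of buckets to one worker so every pool has exactly one writer; the patch achieves the analogous effect by passing convertOneBucketToBlock an explicit Arena chosen per bucket instead of always handing it the shared aggregates_pool.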