fix race in aggregation with combinator distinct

This commit is contained in:
Anton Popov 2020-12-22 14:30:29 +03:00
parent 26637bd7ee
commit 57a1642035
6 changed files with 74 additions and 11 deletions

View File

@ -55,6 +55,13 @@ void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
max_threads = value; max_threads = value;
} }
template <typename Thread>
size_t ThreadPoolImpl<Thread>::getMaxThreads() const
{
std::lock_guard lock(mutex);
return max_threads;
}
template <typename Thread> template <typename Thread>
void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value) void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value)
{ {

View File

@ -71,6 +71,7 @@ public:
void setMaxThreads(size_t value); void setMaxThreads(size_t value);
void setMaxFreeThreads(size_t value); void setMaxFreeThreads(size_t value);
void setQueueSize(size_t value); void setQueueSize(size_t value);
size_t getMaxThreads() const;
private: private:
mutable std::mutex mutex; mutable std::mutex mutex;

View File

@ -913,15 +913,15 @@ template <typename Method>
Block Aggregator::convertOneBucketToBlock( Block Aggregator::convertOneBucketToBlock(
AggregatedDataVariants & data_variants, AggregatedDataVariants & data_variants,
Method & method, Method & method,
Arena * arena,
bool final, bool final,
size_t bucket) const size_t bucket) const
{ {
Block block = prepareBlockAndFill(data_variants, final, method.data.impls[bucket].size(), Block block = prepareBlockAndFill(data_variants, final, method.data.impls[bucket].size(),
[bucket, &method, this] ( [bucket, &method, arena, this] (
MutableColumns & key_columns, MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns, AggregateColumnsData & aggregate_columns,
MutableColumns & final_aggregate_columns, MutableColumns & final_aggregate_columns,
Arena * arena,
bool final_) bool final_)
{ {
convertToBlockImpl(method, method.data.impls[bucket], convertToBlockImpl(method, method.data.impls[bucket],
@ -950,7 +950,7 @@ Block Aggregator::mergeAndConvertOneBucketToBlock(
mergeBucketImpl<decltype(merged_data.NAME)::element_type>(variants, bucket, arena); \ mergeBucketImpl<decltype(merged_data.NAME)::element_type>(variants, bucket, arena); \
if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst)) \ if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst)) \
return {}; \ return {}; \
block = convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket); \ block = convertOneBucketToBlock(merged_data, *merged_data.NAME, arena, final, bucket); \
} }
APPLY_FOR_VARIANTS_TWO_LEVEL(M) APPLY_FOR_VARIANTS_TWO_LEVEL(M)
@ -982,7 +982,7 @@ void Aggregator::writeToTemporaryFileImpl(
for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket) for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket)
{ {
Block block = convertOneBucketToBlock(data_variants, method, false, bucket); Block block = convertOneBucketToBlock(data_variants, method, data_variants.aggregates_pool, false, bucket);
out.write(block); out.write(block);
update_max_sizes(block); update_max_sizes(block);
} }
@ -1285,7 +1285,7 @@ Block Aggregator::prepareBlockAndFill(
} }
} }
filler(key_columns, aggregate_columns_data, final_aggregate_columns, data_variants.aggregates_pool, final); filler(key_columns, aggregate_columns_data, final_aggregate_columns, final);
Block res = header.cloneEmpty(); Block res = header.cloneEmpty();
@ -1352,7 +1352,6 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
MutableColumns & key_columns, MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns, AggregateColumnsData & aggregate_columns,
MutableColumns & final_aggregate_columns, MutableColumns & final_aggregate_columns,
Arena * arena,
bool final_) bool final_)
{ {
if (data_variants.type == AggregatedDataVariants::Type::without_key || params.overflow_row) if (data_variants.type == AggregatedDataVariants::Type::without_key || params.overflow_row)
@ -1367,7 +1366,8 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va
} }
else else
{ {
insertAggregatesIntoColumns(data, final_aggregate_columns, arena); /// Always single-thread. It's safe to pass current arena from 'aggregates_pool'.
insertAggregatesIntoColumns(data, final_aggregate_columns, data_variants.aggregates_pool);
} }
if (params.overflow_row) if (params.overflow_row)
@ -1395,13 +1395,12 @@ Block Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_v
MutableColumns & key_columns, MutableColumns & key_columns,
AggregateColumnsData & aggregate_columns, AggregateColumnsData & aggregate_columns,
MutableColumns & final_aggregate_columns, MutableColumns & final_aggregate_columns,
Arena * arena,
bool final_) bool final_)
{ {
#define M(NAME) \ #define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \ else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
convertToBlockImpl(*data_variants.NAME, data_variants.NAME->data, \ convertToBlockImpl(*data_variants.NAME, data_variants.NAME->data, \
key_columns, aggregate_columns, final_aggregate_columns, arena, final_); key_columns, aggregate_columns, final_aggregate_columns, data_variants.aggregates_pool, final_);
if (false) {} // NOLINT if (false) {} // NOLINT
APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) APPLY_FOR_VARIANTS_SINGLE_LEVEL(M)
@ -1435,11 +1434,21 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl(
bool final, bool final,
ThreadPool * thread_pool) const ThreadPool * thread_pool) const
{ {
size_t max_threads = thread_pool->getMaxThreads();
if (max_threads > data_variants.aggregates_pools.size())
for (size_t i = data_variants.aggregates_pools.size(); i < max_threads; ++i)
data_variants.aggregates_pools.push_back(std::make_shared<Arena>());
auto converter = [&](size_t bucket, ThreadGroupStatusPtr thread_group) auto converter = [&](size_t bucket, ThreadGroupStatusPtr thread_group)
{ {
if (thread_group) if (thread_group)
CurrentThread::attachToIfDetached(thread_group); CurrentThread::attachToIfDetached(thread_group);
return convertOneBucketToBlock(data_variants, method, final, bucket);
/// Select Arena to avoid race conditions
size_t thread_number = static_cast<size_t>(bucket) % max_threads;
Arena * arena = data_variants.aggregates_pools.at(thread_number).get();
return convertOneBucketToBlock(data_variants, method, arena, final, bucket);
}; };
/// packaged_task is used to ensure that exceptions are automatically thrown into the main stream. /// packaged_task is used to ensure that exceptions are automatically thrown into the main stream.
@ -1949,7 +1958,7 @@ private:
else if (method == AggregatedDataVariants::Type::NAME) \ else if (method == AggregatedDataVariants::Type::NAME) \
{ \ { \
aggregator.mergeBucketImpl<decltype(merged_data.NAME)::element_type>(data, bucket_num, arena); \ aggregator.mergeBucketImpl<decltype(merged_data.NAME)::element_type>(data, bucket_num, arena); \
block = aggregator.convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket_num); \ block = aggregator.convertOneBucketToBlock(merged_data, *merged_data.NAME, arena, final, bucket_num); \
} }
APPLY_FOR_VARIANTS_TWO_LEVEL(M) APPLY_FOR_VARIANTS_TWO_LEVEL(M)

View File

@ -1212,6 +1212,7 @@ protected:
Block convertOneBucketToBlock( Block convertOneBucketToBlock(
AggregatedDataVariants & data_variants, AggregatedDataVariants & data_variants,
Method & method, Method & method,
Arena * arena,
bool final, bool final,
size_t bucket) const; size_t bucket) const;

View File

@ -0,0 +1,20 @@
['0']
['1']
['2']
['3']
['4']
['5']
['6']
['7']
['8']
['9']
test.com ['foo3223','foo6455','foo382','foo5566','foo1037']
test.com0 ['foo0']
test.com0.0001 ['foo1']
test.com0.0002 ['foo2']
test.com0.0003 ['foo3']
test.com0.0004 ['foo4']
test.com0.0005 ['foo5']
test.com0.0006 ['foo6']
test.com0.0007 ['foo7']
test.com0.0008 ['foo8']

View File

@ -0,0 +1,25 @@
SET group_by_two_level_threshold_bytes = 1;
SET group_by_two_level_threshold = 1;
SELECT groupArray(DISTINCT toString(number % 10)) FROM numbers_mt(50000)
GROUP BY number ORDER BY number LIMIT 10
SETTINGS max_threads = 2, max_block_size = 2000;
DROP TABLE IF EXISTS dictinct_two_level;
CREATE TABLE dictinct_two_level (
time DateTime64(3),
domain String,
subdomain String
) ENGINE = MergeTree ORDER BY time;
INSERT INTO dictinct_two_level SELECT 1546300800000, 'test.com', concat('foo', toString(number % 10000)) from numbers(10000);
INSERT INTO dictinct_two_level SELECT 1546300800000, concat('test.com', toString(number / 10000)) , concat('foo', toString(number % 10000)) from numbers(10000);
SELECT
domain, groupArraySample(5, 11111)(DISTINCT subdomain) AS example_subdomains
FROM dictinct_two_level
GROUP BY domain ORDER BY domain, example_subdomains
LIMIT 10;
DROP TABLE IF EXISTS dictinct_two_level;