From 8839defb636430cf4f3066941f23f3318e280312 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 23 Jan 2020 14:39:42 +0300 Subject: [PATCH 1/6] Fix ConvertingAggregatedToChunksTransform. --- .../Transforms/AggregatingTransform.cpp | 43 ++++++++++++++----- 1 file changed, 32 insertions(+), 11 deletions(-) diff --git a/dbms/src/Processors/Transforms/AggregatingTransform.cpp b/dbms/src/Processors/Transforms/AggregatingTransform.cpp index c67c9119c9b..bac296f19bb 100644 --- a/dbms/src/Processors/Transforms/AggregatingTransform.cpp +++ b/dbms/src/Processors/Transforms/AggregatingTransform.cpp @@ -29,6 +29,19 @@ static Chunk convertToChunk(const Block & block) return chunk; } +static const AggregatedChunkInfo * getInfoFromChunk(const Chunk & chunk) +{ + auto & info = chunk.getChunkInfo(); + if (!info) + throw Exception("Chunk info was not set for chunk.", ErrorCodes::LOGICAL_ERROR); + + auto * agg_info = typeid_cast(info.get()); + if (!agg_info) + throw Exception("Chunk should have AggregatedChunkInfo.", ErrorCodes::LOGICAL_ERROR); + + return agg_info; +} + namespace { /// Reads chunks from file in native format. Provide chunks with aggregation info. @@ -77,13 +90,13 @@ public: struct SharedData { std::atomic next_bucket_to_merge = 0; - std::array, NUM_BUCKETS> source_for_bucket; + std::array, NUM_BUCKETS> is_bucket_processed; std::atomic is_cancelled = false; SharedData() { - for (auto & source : source_for_bucket) - source = -1; + for (auto & flag : is_bucket_processed) + flag = false; } }; @@ -116,7 +129,7 @@ protected: Block block = params->aggregator.mergeAndConvertOneBucketToBlock(*data, arena, params->final, bucket_num, &shared_data->is_cancelled); Chunk chunk = convertToChunk(block); - shared_data->source_for_bucket[bucket_num] = source_number; + shared_data->is_bucket_processed[bucket_num] = true; return chunk; } @@ -125,7 +138,7 @@ private: AggregatingTransformParamsPtr params; ManyAggregatedDataVariantsPtr data; SharedDataPtr shared_data; - Int32 source_number; + Int32 source_number [[maybe_unused]]; Arena * arena; }; @@ -249,16 +262,23 @@ private: { auto & output = outputs.front(); - Int32 next_input_num = shared_data->source_for_bucket[current_bucket_num]; - if (next_input_num < 0) + for (auto & input : inputs) + { + if (!input.isFinished() && input.hasData()) + { + auto chunk = input.pull(); + auto bucket = getInfoFromChunk(chunk)->bucket_num; + chunks[bucket] = std::move(chunk); + } + } + + if (!shared_data->is_bucket_processed[current_bucket_num]) return Status::NeedData; - auto next_input = std::next(inputs.begin(), next_input_num); - /// next_input can't be finished till data was not pulled. - if (!next_input->hasData()) + if (!chunks[current_bucket_num]) return Status::NeedData; - output.push(next_input->pull()); + output.push(std::move(chunks[current_bucket_num])); ++current_bucket_num; if (current_bucket_num == NUM_BUCKETS) @@ -286,6 +306,7 @@ private: UInt32 current_bucket_num = 0; static constexpr Int32 NUM_BUCKETS = 256; + std::array chunks; Processors processors; From bceb084512818437c829a376965039eee23ae6e3 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 23 Jan 2020 14:44:43 +0300 Subject: [PATCH 2/6] Fix ConvertingAggregatedToChunksTransform. --- dbms/src/Processors/Transforms/AggregatingTransform.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dbms/src/Processors/Transforms/AggregatingTransform.cpp b/dbms/src/Processors/Transforms/AggregatingTransform.cpp index bac296f19bb..ddd50cb60c5 100644 --- a/dbms/src/Processors/Transforms/AggregatingTransform.cpp +++ b/dbms/src/Processors/Transforms/AggregatingTransform.cpp @@ -112,7 +112,6 @@ public: , params(std::move(params_)) , data(std::move(data_)) , shared_data(std::move(shared_data_)) - , source_number(source_number_) , arena(arena_) {} @@ -138,7 +137,6 @@ private: AggregatingTransformParamsPtr params; ManyAggregatedDataVariantsPtr data; SharedDataPtr shared_data; - Int32 source_number [[maybe_unused]]; Arena * arena; }; @@ -380,7 +378,7 @@ private: { Arena * arena = first->aggregates_pools.at(thread).get(); auto source = std::make_shared( - params, data, shared_data, thread, arena); + params, data, shared_data, arena); processors.emplace_back(std::move(source)); } From b6a819c5d88fc66cbb289818e78669d7d13a13ef Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Thu, 23 Jan 2020 14:45:12 +0300 Subject: [PATCH 3/6] Fix ConvertingAggregatedToChunksTransform. --- dbms/src/Processors/Transforms/AggregatingTransform.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/dbms/src/Processors/Transforms/AggregatingTransform.cpp b/dbms/src/Processors/Transforms/AggregatingTransform.cpp index ddd50cb60c5..02fffc545db 100644 --- a/dbms/src/Processors/Transforms/AggregatingTransform.cpp +++ b/dbms/src/Processors/Transforms/AggregatingTransform.cpp @@ -106,7 +106,6 @@ public: AggregatingTransformParamsPtr params_, ManyAggregatedDataVariantsPtr data_, SharedDataPtr shared_data_, - Int32 source_number_, Arena * arena_) : ISource(params_->getHeader()) , params(std::move(params_)) From 1bf614b3a2b44344a0a976a8b17882d9770fc929 Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Sat, 25 Jan 2020 21:39:50 +0300 Subject: [PATCH 4/6] Trigger tests. --- dbms/src/Processors/Transforms/AggregatingTransform.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/dbms/src/Processors/Transforms/AggregatingTransform.cpp b/dbms/src/Processors/Transforms/AggregatingTransform.cpp index 02fffc545db..e05c6b40567 100644 --- a/dbms/src/Processors/Transforms/AggregatingTransform.cpp +++ b/dbms/src/Processors/Transforms/AggregatingTransform.cpp @@ -6,6 +6,7 @@ #include #include + namespace ProfileEvents { extern const Event ExternalAggregationMerge; From 328dc523c5a586daba3e2948f9007b439c0c597b Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Sat, 25 Jan 2020 23:36:12 +0300 Subject: [PATCH 5/6] Update AggregatingTransform.cpp --- .../Transforms/AggregatingTransform.cpp | 56 +++++++++---------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/dbms/src/Processors/Transforms/AggregatingTransform.cpp b/dbms/src/Processors/Transforms/AggregatingTransform.cpp index e05c6b40567..9e4d6030e23 100644 --- a/dbms/src/Processors/Transforms/AggregatingTransform.cpp +++ b/dbms/src/Processors/Transforms/AggregatingTransform.cpp @@ -15,36 +15,36 @@ namespace ProfileEvents namespace DB { -/// Convert block to chunk. -/// Adds additional info about aggregation. -static Chunk convertToChunk(const Block & block) -{ - auto info = std::make_shared(); - info->bucket_num = block.info.bucket_num; - info->is_overflows = block.info.is_overflows; - - UInt64 num_rows = block.rows(); - Chunk chunk(block.getColumns(), num_rows); - chunk.setChunkInfo(std::move(info)); - - return chunk; -} - -static const AggregatedChunkInfo * getInfoFromChunk(const Chunk & chunk) -{ - auto & info = chunk.getChunkInfo(); - if (!info) - throw Exception("Chunk info was not set for chunk.", ErrorCodes::LOGICAL_ERROR); - - auto * agg_info = typeid_cast(info.get()); - if (!agg_info) - throw Exception("Chunk should have AggregatedChunkInfo.", ErrorCodes::LOGICAL_ERROR); - - return agg_info; -} - namespace { + /// Convert block to chunk. + /// Adds additional info about aggregation. + static Chunk convertToChunk(const Block & block) + { + auto info = std::make_shared(); + info->bucket_num = block.info.bucket_num; + info->is_overflows = block.info.is_overflows; + + UInt64 num_rows = block.rows(); + Chunk chunk(block.getColumns(), num_rows); + chunk.setChunkInfo(std::move(info)); + + return chunk; + } + + static const AggregatedChunkInfo * getInfoFromChunk(const Chunk & chunk) + { + auto & info = chunk.getChunkInfo(); + if (!info) + throw Exception("Chunk info was not set for chunk.", ErrorCodes::LOGICAL_ERROR); + + auto * agg_info = typeid_cast(info.get()); + if (!agg_info) + throw Exception("Chunk should have AggregatedChunkInfo.", ErrorCodes::LOGICAL_ERROR); + + return agg_info; + } + /// Reads chunks from file in native format. Provide chunks with aggregation info. class SourceFromNativeStream : public ISource { From aff87dbd82e9c4e82a66adf3ae0e678163a77607 Mon Sep 17 00:00:00 2001 From: alexey-milovidov Date: Sat, 25 Jan 2020 23:36:25 +0300 Subject: [PATCH 6/6] Update AggregatingTransform.cpp --- dbms/src/Processors/Transforms/AggregatingTransform.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Processors/Transforms/AggregatingTransform.cpp b/dbms/src/Processors/Transforms/AggregatingTransform.cpp index 9e4d6030e23..5e13813d2a9 100644 --- a/dbms/src/Processors/Transforms/AggregatingTransform.cpp +++ b/dbms/src/Processors/Transforms/AggregatingTransform.cpp @@ -19,7 +19,7 @@ namespace { /// Convert block to chunk. /// Adds additional info about aggregation. - static Chunk convertToChunk(const Block & block) + Chunk convertToChunk(const Block & block) { auto info = std::make_shared(); info->bucket_num = block.info.bucket_num; @@ -32,7 +32,7 @@ namespace return chunk; } - static const AggregatedChunkInfo * getInfoFromChunk(const Chunk & chunk) + const AggregatedChunkInfo * getInfoFromChunk(const Chunk & chunk) { auto & info = chunk.getChunkInfo(); if (!info)