From 6976b65b2deb2ba701873b753f7376b5ee00a7c1 Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Mon, 4 Mar 2019 19:06:28 +0300
Subject: [PATCH] Added MergingAggregatedTransform.

---
 dbms/src/Interpreters/Aggregator.cpp          | 15 +++-
 dbms/src/Interpreters/Aggregator.h            |  5 ++
 .../Transforms/AggregatingTransform.cpp       | 10 ++-
 .../Transforms/AggregatingTransform.h         | 10 ++-
 .../Transforms/MergingAggregatedTransform.cpp | 82 +++++++++++++++++++
 .../Transforms/MergingAggregatedTransform.h   | 59 +++++++++++++
 6 files changed, 178 insertions(+), 3 deletions(-)
 create mode 100644 dbms/src/Processors/Transforms/MergingAggregatedTransform.cpp
 create mode 100644 dbms/src/Processors/Transforms/MergingAggregatedTransform.h

diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp
index 222b6cc796e..b17db91aefa 100644
--- a/dbms/src/Interpreters/Aggregator.cpp
+++ b/dbms/src/Interpreters/Aggregator.cpp
@@ -2069,11 +2069,24 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
         bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block));
     }
 
-    LOG_TRACE(log, "Read " << total_input_blocks << " blocks of partially aggregated data, total " << total_input_rows << " rows.");
+    LOG_TRACE(log, "Read " << total_input_blocks << " blocks of partially aggregated data, total " << total_input_rows
+                           << " rows.");
 
+    mergeBlocks(std::move(bucket_to_blocks), result, max_threads);
+}
+
+void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads)
+{
     if (bucket_to_blocks.empty())
         return;
 
+    /// NOTE(review): this sums block.rows(), i.e. it counts rows although the name says "blocks".
+    /// Confirm the intended name against the remainder of this function (outside this hunk).
+    UInt64 total_input_blocks = 0;
+    for (auto & bucket : bucket_to_blocks)
+        for (auto & block : bucket.second)
+            total_input_blocks += block.rows();
+
     /** `minus one` means the absence of information about the bucket
       * - in the case of single-level aggregation, as well as for blocks with "overflowing" values.
       * If there is at least one block with a bucket number greater or equal than zero, then there was a two-level aggregation.
diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h
index 87febbc77e8..b55dceed53f 100644
--- a/dbms/src/Interpreters/Aggregator.h
+++ b/dbms/src/Interpreters/Aggregator.h
@@ -854,6 +854,11 @@ public:
       */
     void mergeStream(const BlockInputStreamPtr & stream, AggregatedDataVariants & result, size_t max_threads);
 
+    /// Partially aggregated blocks grouped by bucket number (block.info.bucket_num).
+    using BucketToBlocks = std::map<Int32, BlocksList>;
+    /// Merge partially aggregated blocks separated to buckets into one data structure.
+    void mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads);
+
     /// Merge several partially aggregated blocks into one.
     /// Precondition: for all blocks block.info.is_overflows flag must be the same.
     /// (either all blocks are from overflow data or none blocks are).
diff --git a/dbms/src/Processors/Transforms/AggregatingTransform.cpp b/dbms/src/Processors/Transforms/AggregatingTransform.cpp
index d0e8586f1c0..e9fd0720e5c 100644
--- a/dbms/src/Processors/Transforms/AggregatingTransform.cpp
+++ b/dbms/src/Processors/Transforms/AggregatingTransform.cpp
@@ -101,8 +101,16 @@ Chunk AggregatingTransform::generate()
     if (!block)
         return {};
 
+    /// Pass bucket_num and is_overflows of the block along with the chunk.
+    auto info = std::make_shared<AggregatedChunkInfo>();
+    info->bucket_num = block.info.bucket_num;
+    info->is_overflows = block.info.is_overflows;
+
     UInt64 num_rows = block.rows();
-    return Chunk(block.getColumns(), num_rows);
+    Chunk chunk(block.getColumns(), num_rows);
+    chunk.setChunkInfo(std::move(info));
+
+    return chunk;
 }
 
 AggregatingTransform::TemporaryFileStream::TemporaryFileStream(const std::string & path)
diff --git a/dbms/src/Processors/Transforms/AggregatingTransform.h b/dbms/src/Processors/Transforms/AggregatingTransform.h
index 8950288ecd8..dbc384ee761 100644
--- a/dbms/src/Processors/Transforms/AggregatingTransform.h
+++ b/dbms/src/Processors/Transforms/AggregatingTransform.h
@@ -7,6 +7,14 @@
 namespace DB
 {
 
+/// Chunk info that carries the bucket number and the overflow flag of a block (mirrors block.info).
+class AggregatedChunkInfo : public ChunkInfo
+{
+public:
+    bool is_overflows = false;
+    Int32 bucket_num = -1;    /// -1 means there is no information about the bucket.
+};
+
 class IBlockInputStream;
 using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
 
@@ -48,7 +56,7 @@ private:
     };
 
     AggregatingTransformParamsPtr params;
-    Logger * log = &Logger::get("AggregatingBlockInputStream");
+    Logger * log = &Logger::get("AggregatingTransform");
 
     StringRefs key;
     ColumnRawPtrs key_columns;
diff --git a/dbms/src/Processors/Transforms/MergingAggregatedTransform.cpp b/dbms/src/Processors/Transforms/MergingAggregatedTransform.cpp
new file mode 100644
index 00000000000..e4d3fc5c61f
--- /dev/null
+++ b/dbms/src/Processors/Transforms/MergingAggregatedTransform.cpp
@@ -0,0 +1,82 @@
+#include <Processors/Transforms/MergingAggregatedTransform.h>
+#include <Processors/Transforms/AggregatingTransform.h>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+
+MergingAggregatedTransform::MergingAggregatedTransform(
+    Block header, MergingAggregatedTransformParamsPtr params, size_t max_threads)
+    : IAccumulatingTransform(std::move(header), params->getHeader())
+    /// The base class is initialized before `params` is moved from, so the call above is safe.
+    , params(std::move(params)), max_threads(max_threads)
+{
+}
+
+/// Rebuilds a Block from the chunk, restores bucket_num / is_overflows from its AggregatedChunkInfo
+/// and stores the block in bucket_to_blocks under its bucket number.
+/// Throws LOGICAL_ERROR if the chunk has no info or the info is not an AggregatedChunkInfo.
+void MergingAggregatedTransform::consume(Chunk chunk)
+{
+    if (!consume_started)
+    {
+        consume_started = true;
+        LOG_TRACE(log, "Reading blocks of partially aggregated data.");
+    }
+
+    total_input_rows += chunk.getNumRows();
+    ++total_input_blocks;
+
+    auto & info = chunk.getChunkInfo();
+    if (!info)
+        throw Exception("Chunk info was not set for chunk in MergingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR);
+
+    auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get());
+    if (!agg_info)
+        throw Exception("Chunk should have AggregatedChunkInfo in MergingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR);
+
+    auto block = getInputPort().getHeader().cloneWithColumns(chunk.getColumns());
+    block.info.is_overflows = agg_info->is_overflows;
+    block.info.bucket_num = agg_info->bucket_num;
+
+    bucket_to_blocks[agg_info->bucket_num].emplace_back(std::move(block));
+}
+
+/// On the first call merges all accumulated blocks and converts the result to a list of blocks.
+/// Every call returns the next block of that list as a chunk, or an empty chunk when the list is exhausted.
+Chunk MergingAggregatedTransform::generate()
+{
+    if (!generate_started)
+    {
+        generate_started = true;
+        LOG_TRACE(log, "Read " << total_input_blocks << " blocks of partially aggregated data, total " << total_input_rows
+                               << " rows.");
+
+        /// TODO: this operation can be made async. Add async for IAccumulatingTransform.
+        params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads);
+        blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads);
+        next_block = blocks.begin();
+    }
+
+    if (next_block == blocks.end())
+        return {};
+
+    auto block = std::move(*next_block);
+    ++next_block;
+
+    auto info = std::make_shared<AggregatedChunkInfo>();
+    info->bucket_num = block.info.bucket_num;
+    info->is_overflows = block.info.is_overflows;
+
+    UInt64 num_rows = block.rows();
+    Chunk chunk(block.getColumns(), num_rows);
+    chunk.setChunkInfo(std::move(info));
+
+    return chunk;
+}
+
+}
diff --git a/dbms/src/Processors/Transforms/MergingAggregatedTransform.h b/dbms/src/Processors/Transforms/MergingAggregatedTransform.h
new file mode 100644
index 00000000000..79befc629bf
--- /dev/null
+++ b/dbms/src/Processors/Transforms/MergingAggregatedTransform.h
@@ -0,0 +1,59 @@
+#pragma once
+#include <Processors/IAccumulatingTransform.h>
+#include <Interpreters/Aggregator.h>
+
+namespace DB
+{
+
+/// Bundles the Aggregator used by MergingAggregatedTransform with the `final` flag forwarded to it.
+struct MergingAggregatedTransformParams
+{
+    Aggregator::Params params;
+    Aggregator aggregator;
+    bool final;
+
+    MergingAggregatedTransformParams(const Aggregator::Params & params, bool final)
+        : params(params), aggregator(params), final(final) {}
+
+    /// Header reported by the aggregator for the current `final` flag.
+    Block getHeader() const { return aggregator.getHeader(final); }
+};
+
+using MergingAggregatedTransformParamsPtr = std::unique_ptr<MergingAggregatedTransformParams>;
+
+/** A pre-aggregate stream of blocks in which each block is already aggregated.
+  * Aggregate functions in blocks should not be finalized so that their states can be merged.
+  */
+class MergingAggregatedTransform : public IAccumulatingTransform
+{
+public:
+    MergingAggregatedTransform(Block header, MergingAggregatedTransformParamsPtr params, size_t max_threads);
+    String getName() const override { return "MergingAggregatedTransform"; }
+
+protected:
+    void consume(Chunk chunk) override;
+    Chunk generate() override;
+
+private:
+    MergingAggregatedTransformParamsPtr params;
+    Logger * log = &Logger::get("MergingAggregatedTransform");
+    size_t max_threads;
+
+    AggregatedDataVariants data_variants;
+    /// Blocks rebuilt from consumed chunks, grouped by bucket number.
+    Aggregator::BucketToBlocks bucket_to_blocks;
+
+    /// Counters reported in the trace log on the first generate() call.
+    UInt64 total_input_rows = 0;
+    UInt64 total_input_blocks = 0;
+
+    /// Merge result, filled on the first generate() call; next_block is the block to return next.
+    BlocksList blocks;
+    BlocksList::iterator next_block;
+
+    /// Set on the first consume() / generate() call respectively.
+    bool consume_started = false;
+    bool generate_started = false;
+};
+
+}