Added MergingAggregatedTransform.

This commit is contained in:
Nikolai Kochetov 2019-03-04 19:06:28 +03:00
parent dd74e82d47
commit 6976b65b2d
6 changed files with 157 additions and 3 deletions

View File

@ -2069,11 +2069,22 @@ void Aggregator::mergeStream(const BlockInputStreamPtr & stream, AggregatedDataV
bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block));
}
LOG_TRACE(log, "Read " << total_input_blocks << " blocks of partially aggregated data, total " << total_input_rows << " rows.");
LOG_TRACE(log, "Read " << total_input_blocks << " blocks of partially aggregated data, total " << total_input_rows
<< " rows.");
mergeBlocks(bucket_to_blocks, result, max_threads);
}
void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads)
{
if (bucket_to_blocks.empty())
return;
UInt64 total_input_blocks = 0;
for (auto & bucket : bucket_to_blocks)
for (auto & block : bucket.second)
total_input_blocks += block.rows();
/** `minus one` means the absence of information about the bucket
* - in the case of single-level aggregation, as well as for blocks with "overflowing" values.
* If there is at least one block with a bucket number greater or equal than zero, then there was a two-level aggregation.

View File

@ -854,6 +854,10 @@ public:
*/
void mergeStream(const BlockInputStreamPtr & stream, AggregatedDataVariants & result, size_t max_threads);
using BucketToBlocks = std::map<Int32, BlocksList>;
/// Merge partially aggregated blocks separated to buckets into one data structure.
void mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVariants & result, size_t max_threads);
/// Merge several partially aggregated blocks into one.
/// Precondition: for all blocks block.info.is_overflows flag must be the same.
/// (either all blocks are from overflow data or none blocks are).

View File

@ -101,8 +101,16 @@ Chunk AggregatingTransform::generate()
if (!block)
return {};
auto info = std::make_shared<AggregatedChunkInfo>();
info->bucket_num = block.info.bucket_num;
info->is_overflows = block.info.is_overflows;
UInt64 num_rows = block.rows();
return Chunk(block.getColumns(), num_rows);
Chunk chunk(block.getColumns(), num_rows);
chunk.setChunkInfo(std::move(info));
return chunk;
}
AggregatingTransform::TemporaryFileStream::TemporaryFileStream(const std::string & path)

View File

@ -7,6 +7,13 @@
namespace DB
{
class AggregatedChunkInfo : public ChunkInfo
{
public:
bool is_overflows = false;
Int32 bucket_num = -1;
};
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
@ -48,7 +55,7 @@ private:
};
AggregatingTransformParamsPtr params;
Logger * log = &Logger::get("AggregatingBlockInputStream");
Logger * log = &Logger::get("AggregatingTransform");
StringRefs key;
ColumnRawPtrs key_columns;

View File

@ -0,0 +1,71 @@
#include <Processors/Transforms/MergingAggregatedTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
namespace DB
{
MergingAggregatedTransform::MergingAggregatedTransform(
Block header, MergingAggregatedTransformParamsPtr params, size_t max_threads)
: IAccumulatingTransform(std::move(header), params->getHeader())
, params(std::move(params)), max_threads(max_threads)
{
}
void MergingAggregatedTransform::consume(Chunk chunk)
{
if (!consume_started)
{
consume_started = true;
LOG_TRACE(log, "Reading blocks of partially aggregated data.");
}
total_input_rows += chunk.getNumRows();
++total_input_blocks;
auto & info = chunk.getChunkInfo();
if (!info)
throw Exception("Chunk info was not set for chunk in MergingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR);
auto * agg_info = typeid_cast<const AggregatedChunkInfo *>(info.get());
if (!agg_info)
throw Exception("Chunk should have AggregatedChunkInfo in MergingAggregatedTransform.", ErrorCodes::LOGICAL_ERROR);
auto block = getInputPort().getHeader().cloneWithColumns(chunk.getColumns());
block.info.is_overflows = agg_info->is_overflows;
block.info.bucket_num = agg_info->bucket_num;
bucket_to_blocks[agg_info->bucket_num].emplace_back(std::move(block));
}
Chunk MergingAggregatedTransform::generate()
{
if (!generate_started)
{
generate_started = true;
LOG_TRACE(log, "Read " << total_input_blocks << " blocks of partially aggregated data, total " << total_input_rows
<< " rows.");
/// TODO: this operation can be made async. Add async for IAccumulatingTransform.
params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads);
blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads);
next_block = blocks.begin();
}
if (next_block == blocks.end())
return {};
auto block = std::move(*next_block);
++next_block;
auto info = std::make_shared<AggregatedChunkInfo>();
info->bucket_num = block.info.bucket_num;
info->is_overflows = block.info.is_overflows;
UInt64 num_rows = block.rows();
Chunk chunk(block.getColumns(), num_rows);
chunk.setChunkInfo(std::move(info));
return chunk;
}
}

View File

@ -0,0 +1,53 @@
#pragma once
#include <Processors/IAccumulatingTransform.h>
#include <Interpreters/Aggregator.h>
namespace DB
{
struct MergingAggregatedTransformParams
{
Aggregator::Params params;
Aggregator aggregator;
bool final;
MergingAggregatedTransformParams(const Aggregator::Params & params, bool final)
: params(params), aggregator(params), final(final) {}
Block getHeader() const { return aggregator.getHeader(final); }
};
using MergingAggregatedTransformParamsPtr = std::unique_ptr<MergingAggregatedTransformParams>;
/** A pre-aggregate stream of blocks in which each block is already aggregated.
* Aggregate functions in blocks should not be finalized so that their states can be merged.
*/
class MergingAggregatedTransform : public IAccumulatingTransform
{
public:
MergingAggregatedTransform(Block header, MergingAggregatedTransformParamsPtr params, size_t max_threads);
String getName() const override { return "MergingAggregatedTransform"; }
protected:
void consume(Chunk chunk) override;
Chunk generate() override;
private:
MergingAggregatedTransformParamsPtr params;
Logger * log = &Logger::get("MergingAggregatedTransform");
size_t max_threads;
AggregatedDataVariants data_variants;
Aggregator::BucketToBlocks bucket_to_blocks;
UInt64 total_input_rows = 0;
UInt64 total_input_blocks = 0;
BlocksList blocks;
BlocksList::iterator next_block;
bool consume_started = false;
bool generate_started = false;
};
}