ClickHouse/dbms/src/Processors/Transforms/RollupTransform.cpp

50 lines
1.2 KiB
C++
Raw Normal View History

2019-03-19 13:36:19 +00:00
#include <Processors/Transforms/RollupTransform.h>
2019-04-08 13:02:19 +00:00
#include <Processors/Transforms/TotalsHavingTransform.h>
2019-03-19 11:08:33 +00:00
2019-03-19 13:36:19 +00:00
namespace DB
{
RollupTransform::RollupTransform(Block header, AggregatingTransformParamsPtr params_)
2019-04-18 12:43:13 +00:00
: IInflatingTransform(std::move(header), params_->getHeader())
2019-03-19 13:36:19 +00:00
, params(std::move(params_))
, keys(params->params.keys)
{
}
void RollupTransform::consume(Chunk chunk)
{
consumed_chunk = std::move(chunk);
2019-04-18 12:09:03 +00:00
last_removed_key = keys.size();
2019-03-19 13:36:19 +00:00
}
bool RollupTransform::canGenerate()
{
return consumed_chunk;
}
Chunk RollupTransform::generate()
{
auto gen_chunk = std::move(consumed_chunk);
if (last_removed_key)
{
--last_removed_key;
2019-04-18 12:09:03 +00:00
auto key = keys[last_removed_key];
2019-03-19 13:36:19 +00:00
auto num_rows = gen_chunk.getNumRows();
auto columns = gen_chunk.getColumns();
2019-04-18 12:09:03 +00:00
columns[key] = columns[key]->cloneEmpty()->cloneResized(num_rows);
2019-03-19 13:36:19 +00:00
BlocksList rollup_blocks = { getInputPort().getHeader().cloneWithColumns(columns) };
auto rollup_block = params->aggregator.mergeBlocks(rollup_blocks, false);
num_rows = rollup_block.rows();
consumed_chunk = Chunk(rollup_block.getColumns(), num_rows);
}
2019-04-08 13:02:19 +00:00
finalizeChunk(gen_chunk);
return gen_chunk;
2019-03-19 13:36:19 +00:00
}
}