Refactor ROLLUP and CUBE

This commit is contained in:
Dmitry Novik 2022-06-30 10:13:58 +00:00
parent 1d15d72211
commit 98e9bc84d5
12 changed files with 221 additions and 103 deletions

View File

@ -1618,7 +1618,7 @@ static void executeMergeAggregatedImpl(
Aggregator::Params params(header_before_merge, keys, aggregates, overflow_row, settings.max_threads);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, final, false);
auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
query_plan.getCurrentDataStream(),
@ -2363,7 +2363,7 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modific
keys.push_back(header_before_transform.getPositionByName(key.name));
auto params = getAggregatorParams(query_ptr, *query_analyzer, *context, header_before_transform, keys, query_analyzer->aggregates(), false, settings, 0, 0);
auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), true);
auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), true, settings.group_by_use_nulls);
QueryPlanStepPtr step;
if (modificator == Modificator::ROLLUP)

View File

@ -65,17 +65,20 @@ Block generateOutputHeader(const Block & input_header)
return header;
}
Block generateOutputHeader(const Block & input_header, const ColumnNumbers & keys)
Block generateOutputHeader(const Block & input_header, const ColumnNumbers & keys, bool use_nulls)
{
auto header = appendGroupingSetColumn(input_header);
for (auto key : keys)
if (use_nulls)
{
auto & column = header.getByPosition(key + 1);
if (!isAggregateFunction(column.type))
for (auto key : keys)
{
column.type = makeNullable(column.type);
column.column = makeNullable(column.column);
auto & column = header.getByPosition(key + 1);
if (!isAggregateFunction(column.type))
{
column.type = makeNullable(column.type);
column.column = makeNullable(column.column);
}
}
}
return header;
@ -144,7 +147,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
* 1. Parallel aggregation is done, and the results should be merged in parallel.
* 2. An aggregation is done with store of temporary data on the disk, and they need to be merged in a memory efficient way.
*/
auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), final);
auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), final, false);
if (!grouping_sets_params.empty())
{
@ -194,7 +197,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
transform_params->params.intermediate_header,
transform_params->params.stats_collecting_params
};
auto transform_params_for_set = std::make_shared<AggregatingTransformParams>(std::move(params_for_set), final);
auto transform_params_for_set = std::make_shared<AggregatingTransformParams>(std::move(params_for_set), final, false);
if (streams > 1)
{

View File

@ -28,7 +28,7 @@ using GroupingSetsParamsList = std::vector<GroupingSetsParams>;
Block appendGroupingSetColumn(Block header);
Block generateOutputHeader(const Block & input_header);
Block generateOutputHeader(const Block & input_header, const ColumnNumbers & keys);
Block generateOutputHeader(const Block & input_header, const ColumnNumbers & keys, bool use_nulls);
/// Aggregation. See AggregatingTransform.
class AggregatingStep : public ITransformingStep

View File

@ -25,7 +25,7 @@ static ITransformingStep::Traits getTraits()
}
CubeStep::CubeStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_)
: ITransformingStep(input_stream_, appendGroupingSetColumn(params_->getHeader()), getTraits())
: ITransformingStep(input_stream_, generateOutputHeader(params_->getHeader(), params_->params.keys, params_->use_nulls), getTraits())
, keys_size(params_->params.keys_size)
, params(std::move(params_))
{

View File

@ -23,7 +23,7 @@ static ITransformingStep::Traits getTraits()
}
RollupStep::RollupStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_)
: ITransformingStep(input_stream_, generateOutputHeader(params_->getHeader(), params_->params.keys), getTraits())
: ITransformingStep(input_stream_, generateOutputHeader(params_->getHeader(), params_->params.keys, params_->use_nulls), getTraits())
, params(std::move(params_))
, keys_size(params->params.keys_size)
{

View File

@ -35,12 +35,14 @@ struct AggregatingTransformParams
Aggregator & aggregator;
bool final;
bool only_merge = false;
bool use_nulls = false;
AggregatingTransformParams(const Aggregator::Params & params_, bool final_)
AggregatingTransformParams(const Aggregator::Params & params_, bool final_, bool use_nulls_)
: params(params_)
, aggregator_list_ptr(std::make_shared<AggregatorList>())
, aggregator(*aggregator_list_ptr->emplace(aggregator_list_ptr->end(), params))
, final(final_)
, use_nulls(use_nulls_)
{
}

View File

@ -1,6 +1,7 @@
#include <Processors/Transforms/CubeTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include "Processors/Transforms/RollupTransform.h"
namespace DB
{
@ -10,57 +11,31 @@ namespace ErrorCodes
}
CubeTransform::CubeTransform(Block header, AggregatingTransformParamsPtr params_)
: IAccumulatingTransform(std::move(header), appendGroupingSetColumn(params_->getHeader()))
, params(std::move(params_))
, keys(params->params.keys)
: GroupByModifierTransform(std::move(header), params_)
, aggregates_mask(getAggregatesMask(params->getHeader(), params->params.aggregates))
{
if (keys.size() >= 8 * sizeof(mask))
throw Exception("Too many keys are used for CubeTransform.", ErrorCodes::LOGICAL_ERROR);
}
Chunk CubeTransform::merge(Chunks && chunks, bool final)
{
BlocksList rollup_blocks;
for (auto & chunk : chunks)
rollup_blocks.emplace_back(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
auto rollup_block = params->aggregator.mergeBlocks(rollup_blocks, final);
auto num_rows = rollup_block.rows();
return Chunk(rollup_block.getColumns(), num_rows);
}
void CubeTransform::consume(Chunk chunk)
{
consumed_chunks.emplace_back(std::move(chunk));
}
MutableColumnPtr getColumnWithDefaults(Block const & header, size_t key, size_t n);
Chunk CubeTransform::generate()
{
if (!consumed_chunks.empty())
{
if (consumed_chunks.size() > 1)
cube_chunk = merge(std::move(consumed_chunks), false);
else
cube_chunk = std::move(consumed_chunks.front());
mergeConsumed();
consumed_chunks.clear();
auto num_rows = cube_chunk.getNumRows();
auto num_rows = current_chunk.getNumRows();
mask = (static_cast<UInt64>(1) << keys.size()) - 1;
current_columns = cube_chunk.getColumns();
current_columns = current_chunk.getColumns();
current_zero_columns.clear();
current_zero_columns.reserve(keys.size());
auto const & input_header = getInputPort().getHeader();
for (auto key : keys)
current_zero_columns.emplace_back(getColumnWithDefaults(input_header, key, num_rows));
current_zero_columns.emplace_back(getColumnWithDefaults(key, num_rows));
}
auto gen_chunk = std::move(cube_chunk);
auto gen_chunk = std::move(current_chunk);
if (mask)
{
@ -75,7 +50,7 @@ Chunk CubeTransform::generate()
Chunks chunks;
chunks.emplace_back(std::move(columns), current_columns.front()->size());
cube_chunk = merge(std::move(chunks), false);
current_chunk = merge(std::move(chunks), !params->use_nulls, false);
}
finalizeChunk(gen_chunk, aggregates_mask);

View File

@ -1,6 +1,7 @@
#pragma once
#include <Processors/IInflatingTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/RollupTransform.h>
#include <Processors/Transforms/finalizeChunk.h>
@ -9,30 +10,23 @@ namespace DB
/// Takes blocks after grouping, with non-finalized aggregate functions.
/// Calculates all subsets of columns and aggregates over them.
class CubeTransform : public IAccumulatingTransform
class CubeTransform : public GroupByModifierTransform
{
public:
CubeTransform(Block header, AggregatingTransformParamsPtr params);
String getName() const override { return "CubeTransform"; }
protected:
void consume(Chunk chunk) override;
Chunk generate() override;
private:
AggregatingTransformParamsPtr params;
const ColumnNumbers keys;
const ColumnsMask aggregates_mask;
Chunks consumed_chunks;
Chunk cube_chunk;
Columns current_columns;
Columns current_zero_columns;
UInt64 mask = 0;
UInt64 grouping_set = 0;
Chunk merge(Chunks && chunks, bool final);
};
}

View File

@ -8,36 +8,71 @@
namespace DB
{
RollupTransform::RollupTransform(Block header, AggregatingTransformParamsPtr params_)
: IAccumulatingTransform(std::move(header), generateOutputHeader(params_->getHeader(), params_->params.keys))
GroupByModifierTransform::GroupByModifierTransform(Block header, AggregatingTransformParamsPtr params_)
: IAccumulatingTransform(std::move(header), generateOutputHeader(params_->getHeader(), params_->params.keys, params_->use_nulls))
, params(std::move(params_))
, keys(params->params.keys)
, aggregates_mask(getAggregatesMask(params->getHeader(), params->params.aggregates))
{
auto output_aggregator_params = params->params;
intermediate_header = getOutputPort().getHeader();
intermediate_header.erase(0);
output_aggregator_params.src_header = intermediate_header;
output_aggregator = std::make_unique<Aggregator>(output_aggregator_params);
if (params->use_nulls)
{
auto output_aggregator_params = params->params;
intermediate_header.erase(0);
output_aggregator_params.src_header = intermediate_header;
output_aggregator = std::make_unique<Aggregator>(output_aggregator_params);
}
}
void RollupTransform::consume(Chunk chunk)
void GroupByModifierTransform::consume(Chunk chunk)
{
consumed_chunks.emplace_back(std::move(chunk));
}
Chunk RollupTransform::merge(Chunks && chunks, bool is_input, bool final)
void GroupByModifierTransform::mergeConsumed()
{
BlocksList rollup_blocks;
auto header = is_input ? getInputPort().getHeader() : intermediate_header;
for (auto & chunk : chunks)
rollup_blocks.emplace_back(header.cloneWithColumns(chunk.detachColumns()));
if (consumed_chunks.size() > 1)
current_chunk = merge(std::move(consumed_chunks), true, false);
else
current_chunk = std::move(consumed_chunks.front());
auto rollup_block = is_input ? params->aggregator.mergeBlocks(rollup_blocks, final) : output_aggregator->mergeBlocks(rollup_blocks, final);
auto num_rows = rollup_block.rows();
return Chunk(rollup_block.getColumns(), num_rows);
size_t rows = current_chunk.getNumRows();
auto columns = current_chunk.getColumns();
if (params->use_nulls)
{
for (auto key : keys)
columns[key] = makeNullable(columns[key]);
}
current_chunk = Chunk{ columns, rows };
consumed_chunks.clear();
}
Chunk GroupByModifierTransform::merge(Chunks && chunks, bool is_input, bool final)
{
auto header = is_input ? getInputPort().getHeader() : intermediate_header;
BlocksList blocks;
for (auto & chunk : chunks)
blocks.emplace_back(header.cloneWithColumns(chunk.detachColumns()));
auto current_block = is_input ? params->aggregator.mergeBlocks(blocks, final) : output_aggregator->mergeBlocks(blocks, final);
auto num_rows = current_block.rows();
return Chunk(current_block.getColumns(), num_rows);
}
MutableColumnPtr GroupByModifierTransform::getColumnWithDefaults(size_t key, size_t n) const
{
auto const & col = intermediate_header.getByPosition(key);
auto result_column = col.column->cloneEmpty();
col.type->insertManyDefaultsInto(*result_column, n);
return result_column;
}
RollupTransform::RollupTransform(Block header, AggregatingTransformParamsPtr params_)
: GroupByModifierTransform(std::move(header), params_)
, aggregates_mask(getAggregatesMask(params->getHeader(), params->params.aggregates))
{}
MutableColumnPtr getColumnWithDefaults(Block const & header, size_t key, size_t n)
{
auto const & col = header.getByPosition(key);
@ -50,23 +85,11 @@ Chunk RollupTransform::generate()
{
if (!consumed_chunks.empty())
{
if (consumed_chunks.size() > 1)
rollup_chunk = merge(std::move(consumed_chunks), true, false);
else
rollup_chunk = std::move(consumed_chunks.front());
size_t rows = rollup_chunk.getNumRows();
auto columns = rollup_chunk.getColumns();
for (auto key : keys)
columns[key] = makeNullable(columns[key]);
rollup_chunk = Chunk{ columns, rows };
LOG_DEBUG(&Poco::Logger::get("RollupTransform"), "Chunk source: {}", rollup_chunk.dumpStructure());
consumed_chunks.clear();
mergeConsumed();
last_removed_key = keys.size();
}
auto gen_chunk = std::move(rollup_chunk);
auto gen_chunk = std::move(current_chunk);
if (last_removed_key)
{
@ -75,12 +98,11 @@ Chunk RollupTransform::generate()
auto num_rows = gen_chunk.getNumRows();
auto columns = gen_chunk.getColumns();
columns[key] = getColumnWithDefaults(intermediate_header, key, num_rows);
columns[key] = getColumnWithDefaults(key, num_rows);
Chunks chunks;
chunks.emplace_back(std::move(columns), num_rows);
rollup_chunk = merge(std::move(chunks), false, false);
LOG_DEBUG(&Poco::Logger::get("RollupTransform"), "Chunk generated: {}", rollup_chunk.dumpStructure());
current_chunk = merge(std::move(chunks), !params->use_nulls, false);
}
finalizeChunk(gen_chunk, aggregates_mask);

View File

@ -7,33 +7,47 @@
namespace DB
{
/// Takes blocks after grouping, with non-finalized aggregate functions.
/// Calculates subtotals and grand totals values for a set of columns.
class RollupTransform : public IAccumulatingTransform
struct GroupByModifierTransform : public IAccumulatingTransform
{
public:
RollupTransform(Block header, AggregatingTransformParamsPtr params);
String getName() const override { return "RollupTransform"; }
GroupByModifierTransform(Block header, AggregatingTransformParamsPtr params_);
protected:
void consume(Chunk chunk) override;
Chunk generate() override;
private:
void mergeConsumed();
Chunk merge(Chunks && chunks, bool is_input, bool final);
MutableColumnPtr getColumnWithDefaults(size_t key, size_t n) const;
AggregatingTransformParamsPtr params;
const ColumnNumbers keys;
const ColumnsMask aggregates_mask;
const ColumnNumbers & keys;
std::unique_ptr<Aggregator> output_aggregator;
Block intermediate_header;
Chunks consumed_chunks;
Chunk rollup_chunk;
Chunk current_chunk;
};
/// Takes blocks after grouping, with non-finalized aggregate functions.
/// Calculates subtotals and grand totals values for a set of columns.
class RollupTransform : public GroupByModifierTransform
{
public:
RollupTransform(Block header, AggregatingTransformParamsPtr params);
String getName() const override { return "RollupTransform"; }
protected:
Chunk generate() override;
private:
const ColumnsMask aggregates_mask;
size_t last_removed_key = 0;
size_t set_counter = 0;
Chunk merge(Chunks && chunks, bool is_input, bool final);
};
}

View File

@ -1,3 +1,9 @@
-- { echoOn }
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY ROLLUP(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=1;
0 0 0
0 \N 0
1 1 1
@ -19,3 +25,85 @@
9 1 9
9 \N 9
\N \N 45
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY ROLLUP(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=0;
0 0 0
0 0 0
0 0 45
1 0 1
1 1 1
2 0 2
2 0 2
3 0 3
3 1 3
4 0 4
4 0 4
5 0 5
5 1 5
6 0 6
6 0 6
7 0 7
7 1 7
8 0 8
8 0 8
9 0 9
9 1 9
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY CUBE(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=1;
0 0 0
0 \N 0
1 1 1
1 \N 1
2 0 2
2 \N 2
3 1 3
3 \N 3
4 0 4
4 \N 4
5 1 5
5 \N 5
6 0 6
6 \N 6
7 1 7
7 \N 7
8 0 8
8 \N 8
9 1 9
9 \N 9
\N 0 20
\N 1 25
\N \N 45
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY CUBE(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=0;
0 0 0
0 0 0
0 0 20
0 0 45
0 1 25
1 0 1
1 1 1
2 0 2
2 0 2
3 0 3
3 1 3
4 0 4
4 0 4
5 0 5
5 1 5
6 0 6
6 0 6
7 0 7
7 1 7
8 0 8
8 0 8
9 0 9
9 1 9

View File

@ -1,4 +1,24 @@
-- { echoOn }
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY ROLLUP(number, number % 2)
ORDER BY (number, number % 2, val);
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=1;
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY ROLLUP(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=0;
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY CUBE(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=1;
SELECT number, number % 2, sum(number) AS val
FROM numbers(10)
GROUP BY CUBE(number, number % 2)
ORDER BY (number, number % 2, val)
SETTINGS group_by_use_nulls=0;