grouping sets transform instead of fork processor in aggregating transform

fanzhou 2021-08-23 14:28:49 +08:00 committed by Dmitry Novik
parent 912338629e
commit 20d8614dc7
8 changed files with 365 additions and 53 deletions
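
For context: GROUPING SETS evaluates one GROUP BY as several aggregations in a single pass, and key columns that do not belong to the current set come back filled with default values (0 for numeric keys, '' for String keys). A minimal sketch of the semantics this commit implements, using a hypothetical table that is not part of the commit:

    CREATE TABLE t (a String, b Int32, v Int32) ENGINE = Memory;
    INSERT INTO t VALUES ('x', 1, 10), ('x', 2, 20), ('y', 1, 30);

    SELECT a, b, sum(v) FROM t GROUP BY GROUPING SETS ((a), (b));
    -- set (a) yields ('x', 0, 30), ('y', 0, 30)
    -- set (b) yields ('', 1, 40), ('', 2, 20)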

View File: src/Interpreters/InterpreterSelectQuery.cpp

@@ -44,6 +44,7 @@
 #include <Processors/QueryPlan/ExtremesStep.h>
 #include <Processors/QueryPlan/FillingStep.h>
 #include <Processors/QueryPlan/FilterStep.h>
+#include <Processors/QueryPlan/GroupingSetsStep.h>
 #include <Processors/QueryPlan/JoinStep.h>
 #include <Processors/QueryPlan/LimitByStep.h>
 #include <Processors/QueryPlan/LimitStep.h>
@@ -967,7 +968,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
     bool aggregate_final =
         expressions.need_aggregate &&
         options.to_stage > QueryProcessingStage::WithMergeableState &&
-        !query.group_by_with_totals && !query.group_by_with_rollup && !query.group_by_with_cube;
+        !query.group_by_with_totals && !query.group_by_with_rollup && !query.group_by_with_cube && !query.group_by_with_grouping_sets;
 
     // if (query.group_by_with_grouping_sets && query.group_by_with_totals)
     //     throw Exception("WITH TOTALS and GROUPING SETS are not supported together", ErrorCodes::NOT_IMPLEMENTED);
@@ -1262,6 +1263,8 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
                 executeRollupOrCube(query_plan, Modificator::ROLLUP);
             else if (query.group_by_with_cube)
                 executeRollupOrCube(query_plan, Modificator::CUBE);
+            else if (query.group_by_with_grouping_sets)
+                executeRollupOrCube(query_plan, Modificator::GROUPING_SETS);
 
             if ((query.group_by_with_rollup || query.group_by_with_cube || query.group_by_with_grouping_sets) && expressions.hasHaving())
             {
@@ -2240,15 +2243,71 @@ void InterpreterSelectQuery::executeTotalsAndHaving(
 void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modificator modificator)
 {
     const auto & header_before_transform = query_plan.getCurrentDataStream().header;
 
     ColumnNumbers keys;
-    for (const auto & key : query_analyzer->aggregationKeys())
-        keys.push_back(header_before_transform.getPositionByName(key.name));
+    ColumnNumbers all_keys;
+    ColumnNumbersTwoDimension keys_vector;
+    auto & query = getSelectQuery();
+    if (query.group_by_with_grouping_sets)
+    {
+        std::set<size_t> keys_set;
+        for (const auto & aggregation_keys : query_analyzer->aggregationKeysList())
+        {
+            keys.clear();
+            for (const auto & key : aggregation_keys)
+            {
+                size_t key_name_pos = header_before_transform.getPositionByName(key.name);
+                if (!keys_set.count(key_name_pos))
+                {
+                    keys_set.insert(key_name_pos);
+                }
+                keys.push_back(key_name_pos);
+            }
+            keys_vector.push_back(keys);
+            LOG_DEBUG(
+                log,
+                "execute aggregation with grouping sets add keys set of size {}",
+                keys.size());
+        }
+        all_keys.assign(keys_set.begin(), keys_set.end());
+        LOG_DEBUG(
+            log,
+            "execute aggregation with grouping sets add all keys of size {}",
+            all_keys.size());
+    }
+    else
+    {
+        for (const auto & key : query_analyzer->aggregationKeys())
+        {
+            keys.push_back(header_before_transform.getPositionByName(key.name));
+        }
+    }
 
     const Settings & settings = context->getSettingsRef();
 
-    Aggregator::Params params(
+    std::shared_ptr<Aggregator::Params> params_ptr;
+    if (query.group_by_with_grouping_sets)
+    {
+        params_ptr = std::make_shared<Aggregator::Params>(
+            header_before_transform,
+            all_keys,
+            keys_vector,
+            query_analyzer->aggregates(),
+            false,
+            settings.max_rows_to_group_by,
+            settings.group_by_overflow_mode,
+            0,
+            0,
+            settings.max_bytes_before_external_group_by,
+            settings.empty_result_for_aggregation_by_empty_set,
+            context->getTemporaryVolume(),
+            settings.max_threads,
+            settings.min_free_disk_space_for_temporary_data,
+            settings.compile_aggregate_expressions,
+            settings.min_count_to_compile_aggregate_expression);
+    }
+    else
+    {
+        params_ptr = std::make_shared<Aggregator::Params>(
         header_before_transform,
         keys,
         query_analyzer->aggregates(),
@@ -2264,14 +2323,17 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modific
         settings.min_free_disk_space_for_temporary_data,
         settings.compile_aggregate_expressions,
         settings.min_count_to_compile_aggregate_expression);
+    }
 
-    auto transform_params = std::make_shared<AggregatingTransformParams>(params, true);
+    auto transform_params = std::make_shared<AggregatingTransformParams>(*params_ptr, true);
 
     QueryPlanStepPtr step;
     if (modificator == Modificator::ROLLUP)
         step = std::make_unique<RollupStep>(query_plan.getCurrentDataStream(), std::move(transform_params));
-    else
+    else if (modificator == Modificator::CUBE)
         step = std::make_unique<CubeStep>(query_plan.getCurrentDataStream(), std::move(transform_params));
+    else
+        step = std::make_unique<GroupingSetsStep>(query_plan.getCurrentDataStream(), std::move(transform_params));
 
     query_plan.addStep(std::move(step));
 }
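
A note on the two key containers introduced above (my reading of the diff, not commit text): keys_vector stores one list of key positions per grouping set and is what GroupingSetsTransform later iterates over, while all_keys becomes params.keys, the deduplicated union of all sets' keys that the transform's merge step aggregates across. For the first query of the updated test further down:

    SELECT fact_1_id, fact_2_id, fact_3_id, SUM(sales_value) AS sales_value
    FROM grouping_sets
    GROUP BY GROUPING SETS ((fact_1_id, fact_2_id), (fact_1_id, fact_3_id));
    -- keys_vector: {fact_1_id, fact_2_id}, {fact_1_id, fact_3_id}   (as column positions)
    -- all_keys:    {fact_1_id, fact_2_id, fact_3_id}                (fact_1_id deduplicated via keys_set)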

View File: src/Interpreters/InterpreterSelectQuery.h

@@ -157,7 +157,8 @@ private:
     enum class Modificator
     {
         ROLLUP = 0,
-        CUBE = 1
+        CUBE = 1,
+        GROUPING_SETS = 2
     };
 
     void executeRollupOrCube(QueryPlan & query_plan, Modificator modificator);

View File: src/Processors/QueryPlan/GroupingSetsStep.cpp (new file)

@@ -0,0 +1,46 @@
#include <Processors/QueryPlan/GroupingSetsStep.h>
#include <Processors/Transforms/GroupingSetsTransform.h>
#include "QueryPipeline/QueryPipelineBuilder.h"

namespace DB
{

static ITransformingStep::Traits getTraits()
{
    return ITransformingStep::Traits
    {
        {
            .preserves_distinct_columns = false,
            .returns_single_stream = true,
            .preserves_number_of_streams = false,
            .preserves_sorting = false,
        },
        {
            .preserves_number_of_rows = false,
        }
    };
}

GroupingSetsStep::GroupingSetsStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_)
    : ITransformingStep(input_stream_, params_->getHeader(), getTraits())
    , params(std::move(params_))
{
    /// Aggregation keys are distinct
    for (auto key : params->params.keys)
        output_stream->distinct_columns.insert(params->params.src_header.getByPosition(key).name);
}

void GroupingSetsStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
    pipeline.resize(1);

    pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type) -> ProcessorPtr
    {
        if (stream_type == QueryPipelineBuilder::StreamType::Totals)
            return nullptr;

        return std::make_shared<GroupingSetsTransform>(header, std::move(params));
    });
}

}
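
Two reviewer notes on this step (not commit text): pipeline.resize(1) collapses the plan to a single stream because the accumulating transform below has to see the whole aggregation result before it can replay it once per grouping set, and returning nullptr for StreamType::Totals leaves the totals stream untouched, which the WITH TOTALS queries in the test rely on:

    SELECT fact_1_id, fact_3_id, SUM(sales_value) AS sales_value
    FROM grouping_sets
    GROUP BY GROUPING SETS ((fact_1_id), (fact_1_id, fact_3_id)) WITH TOTALS
    ORDER BY fact_1_id, fact_3_id;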

View File: src/Processors/QueryPlan/GroupingSetsStep.h (new file)

@@ -0,0 +1,25 @@
#pragma once

#include <Processors/QueryPlan/ITransformingStep.h>
#include <QueryPipeline/SizeLimits.h>
#include "QueryPipeline/QueryPipelineBuilder.h"

namespace DB
{
struct AggregatingTransformParams;
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;

class GroupingSetsStep : public ITransformingStep
{
public:
    GroupingSetsStep(const DataStream & input_stream_, AggregatingTransformParamsPtr params_);

    String getName() const override { return "GroupingSets"; }

    void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;

private:
    AggregatingTransformParamsPtr params;
};

}

View File: src/Processors/Transforms/GroupingSetsTransform.cpp (new file)

@@ -0,0 +1,76 @@
#include <Processors/Transforms/GroupingSetsTransform.h>
#include <Processors/Transforms/TotalsHavingTransform.h>

namespace DB
{

GroupingSetsTransform::GroupingSetsTransform(Block header, AggregatingTransformParamsPtr params_)
    : IAccumulatingTransform(std::move(header), params_->getHeader())
    , params(std::move(params_))
    , keys(params->params.keys)
    , keys_vector(params->params.keys_vector)
    , keys_vector_idx(0)
{
}

void GroupingSetsTransform::consume(Chunk chunk)
{
    consumed_chunks.emplace_back(std::move(chunk));
}

Chunk GroupingSetsTransform::merge(Chunks && chunks, bool final)
{
    BlocksList grouping_sets_blocks;
    for (auto & chunk : chunks)
        grouping_sets_blocks.emplace_back(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));

    auto grouping_sets_block = params->aggregator.mergeBlocks(grouping_sets_blocks, final);
    auto num_rows = grouping_sets_block.rows();
    return Chunk(grouping_sets_block.getColumns(), num_rows);
}

Chunk GroupingSetsTransform::generate()
{
    if (!consumed_chunks.empty())
    {
        if (consumed_chunks.size() > 1)
            grouping_sets_chunk = merge(std::move(consumed_chunks), false);
        else
            grouping_sets_chunk = std::move(consumed_chunks.front());

        consumed_chunks.clear();

        auto num_rows = grouping_sets_chunk.getNumRows();
        current_columns = grouping_sets_chunk.getColumns();
        current_zero_columns.clear();

        for (auto key : keys)
            current_zero_columns.emplace(key, current_columns[key]->cloneEmpty()->cloneResized(num_rows));
    }

    Chunk gen_chunk;

    if (keys_vector_idx < keys_vector.size())
    {
        auto columns = current_columns;
        std::set<size_t> key_vector(keys_vector[keys_vector_idx].begin(), keys_vector[keys_vector_idx].end());

        for (auto key : keys)
        {
            if (!key_vector.contains(key))
                columns[key] = current_zero_columns[key];
        }

        Chunks chunks;
        chunks.emplace_back(std::move(columns), current_columns.front()->size());
        gen_chunk = merge(std::move(chunks), false);

        ++keys_vector_idx;
    }

    finalizeChunk(gen_chunk);
    return gen_chunk;
}

}
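
In data terms, generate() works roughly as follows (a hedged walkthrough of the code above). The accumulated chunk is keyed by the union of all grouping-set keys. On each call, the columns of keys outside the current set are swapped for default-valued columns of the same height (current_zero_columns), the chunk is re-run through Aggregator::mergeBlocks so rows that now coincide on the reduced key collapse, and keys_vector_idx advances; each call therefore emits the result of exactly one grouping set. Traced on the first test query:

    -- accumulated keys: fact_1_id, fact_2_id, fact_3_id
    -- set 1 = (fact_1_id, fact_2_id): zero out fact_3_id, re-merge  -> rows like (1, 1, 0, 4500)
    -- set 2 = (fact_1_id, fact_3_id): zero out fact_2_id, re-merge  -> rows like (1, 0, 1, 4500)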

View File: src/Processors/Transforms/GroupingSetsTransform.h (new file)

@@ -0,0 +1,35 @@
#pragma once

#include <Processors/IAccumulatingTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>

namespace DB
{
class GroupingSetsTransform : public IAccumulatingTransform
{
public:
    GroupingSetsTransform(Block header, AggregatingTransformParamsPtr params);
    String getName() const override { return "GroupingSetsTransform"; }

protected:
    void consume(Chunk chunk) override;
    Chunk generate() override;

private:
    AggregatingTransformParamsPtr params;
    ColumnNumbers keys;
    ColumnNumbersTwoDimension keys_vector;

    Chunks consumed_chunks;
    Chunk grouping_sets_chunk;
    Columns current_columns;
    std::unordered_map<size_t, ColumnPtr> current_zero_columns;

    UInt64 keys_vector_idx = 0;

    Poco::Logger * log = &Poco::Logger::get("GroupingSetsTransform");

    Chunk merge(Chunks && chunks, bool final);
};

}

View File (grouping sets test, .reference output)

@@ -1,20 +1,69 @@
-1 40 4
-2 80 4
-a 0 70 4
-b 0 50 4
-1 40 4
-2 80 4
-a 0 70 4
-b 0 50 4
-
-0 240 16
-1 40 4
-2 80 4
-a 0 70 4
-b 0 50 4
-1 40 4
-2 80 4
-a 0 70 4
-b 0 50 4
-
-0 240 16
+1 0 1 4500
+1 0 3 4700
+1 0 5 4900
+1 0 7 5100
+1 0 9 5300
+1 1 0 4500
+1 2 0 5100
+1 3 0 4700
+1 4 0 5300
+1 5 0 4900
+2 0 2 4600
+2 0 4 4800
+2 0 6 5000
+2 0 8 5200
+2 0 10 5400
+2 1 0 5000
+2 2 0 4600
+2 3 0 5200
+2 4 0 4800
+2 5 0 5400
+0 0 1 1 4500
+0 0 2 2 4600
+0 0 3 3 4700
+0 0 4 4 4800
+0 0 5 5 4900
+0 0 6 6 5000
+0 0 7 7 5100
+0 0 8 8 5200
+0 0 9 9 5300
+0 0 10 10 5400
+1 1 0 0 4500
+1 2 0 0 5100
+1 3 0 0 4700
+1 4 0 0 5300
+1 5 0 0 4900
+2 1 0 0 5000
+2 2 0 0 4600
+2 3 0 0 5200
+2 4 0 0 4800
+2 5 0 0 5400
+1 0 24500
+1 1 4500
+1 3 4700
+1 5 4900
+1 7 5100
+1 9 5300
+2 0 25000
+2 2 4600
+2 4 4800
+2 6 5000
+2 8 5200
+2 10 5400
+
+0 0 49500
+1 0 24500
+1 1 4500
+1 3 4700
+1 5 4900
+1 7 5100
+1 9 5300
+2 0 25000
+2 2 4600
+2 4 4800
+2 6 5000
+2 8 5200
+2 10 5400
+
+0 0 49500
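
The expected sums can be sanity-checked by hand (my derivation, not part of the commit): among the 1000 generated rows, each residue number % 10 = k occurs 100 times with sales_value cycling through k, k + 10, ..., k + 90 ten times each, so the group sums to 10 * (10k + 450) = 100k + 4500. That is where reference lines such as 1 0 1 4500 (k = 0) and 2 0 10 5400 (k = 9) come from, e.g.:

    SELECT SUM(number % 100) FROM numbers(1000) WHERE number % 10 = 0;
    -- 4500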

View File (grouping sets test, .sql)

@@ -1,20 +1,38 @@
 DROP TABLE IF EXISTS grouping_sets;
 
-CREATE TABLE grouping_sets(a String, b Int32, s Int32) ENGINE = Memory;
-INSERT INTO grouping_sets VALUES ('a', 1, 10), ('a', 1, 15), ('a', 2, 20);
-INSERT INTO grouping_sets VALUES ('a', 2, 25), ('b', 1, 10), ('b', 1, 5);
-INSERT INTO grouping_sets VALUES ('b', 2, 20), ('b', 2, 15);
-
-SELECT a, b, sum(s), count() from grouping_sets GROUP BY GROUPING SETS(a, b) ORDER BY a, b;
-
-SELECT a, b, sum(s), count() from grouping_sets GROUP BY GROUPING SETS(a, b) WITH TOTALS ORDER BY a, b;
-
-SELECT a, b, sum(s), count() from grouping_sets GROUP BY a, b WITH GROUPING SETS ORDER BY a, b;
-
-SELECT a, b, sum(s), count() from grouping_sets GROUP BY a, b WITH GROUPING SETS WITH TOTALS ORDER BY a, b;
-
--- not sure that always works
--- SET group_by_two_level_threshold = 1;
--- SELECT a, b, sum(s), count() from grouping_sets GROUP BY a, b WITH GROUPING SETS ORDER BY a, b;
+CREATE TABLE grouping_sets(fact_1_id Int32, fact_2_id Int32, fact_3_id Int32, fact_4_id Int32, sales_value Int32) ENGINE = Memory;
+
+INSERT INTO grouping_sets
+SELECT
+    number % 2 + 1 AS fact_1_id,
+    number % 5 + 1 AS fact_2_id,
+    number % 10 + 1 AS fact_3_id,
+    number % 10 + 1 AS fact_4_id,
+    number % 100 AS sales_value
+FROM system.numbers limit 1000;
+
+SELECT fact_1_id, fact_2_id, fact_3_id, SUM(sales_value) AS sales_value from grouping_sets
+GROUP BY GROUPING SETS((fact_1_id, fact_2_id), (fact_1_id, fact_3_id))
+ORDER BY fact_1_id, fact_2_id, fact_3_id;
+
+SELECT fact_1_id, fact_2_id, fact_3_id, fact_4_id, SUM(sales_value) AS sales_value from grouping_sets
+GROUP BY GROUPING SETS((fact_1_id, fact_2_id), (fact_3_id, fact_4_id))
+ORDER BY fact_1_id, fact_2_id, fact_3_id, fact_4_id;
+
+SELECT
+    fact_1_id,
+    fact_3_id,
+    SUM(sales_value) AS sales_value
+FROM grouping_sets
+GROUP BY grouping sets((fact_1_id), (fact_1_id, fact_3_id)) WITH TOTALS
+ORDER BY fact_1_id, fact_3_id;
+
+SELECT
+    fact_1_id,
+    fact_3_id,
+    SUM(sales_value) AS sales_value
+FROM grouping_sets
+GROUP BY grouping sets(fact_1_id, (fact_1_id, fact_3_id)) WITH TOTALS
+ORDER BY fact_1_id, fact_3_id;
 
 DROP TABLE grouping_sets;