Take better care of grouping key order for GROUPING SETS.

Nikolai Kochetov 2024-08-27 15:06:43 +00:00
parent 9d9ef69196
commit 90cc619966
11 changed files with 517 additions and 194 deletions

View File

@@ -59,6 +59,18 @@ class CompiledAggregateFunctionsHolder;
class NativeWriter;
struct OutputBlockColumns;
struct GroupingSetsParams
{
GroupingSetsParams() = default;
GroupingSetsParams(Names used_keys_, Names missing_keys_) : used_keys(std::move(used_keys_)), missing_keys(std::move(missing_keys_)) { }
Names used_keys;
Names missing_keys;
};
using GroupingSetsParamsList = std::vector<GroupingSetsParams>;
/** How are "total" values calculated with WITH TOTALS?
* (For more details, see TotalsHavingTransform.)
*

View File

@@ -347,6 +347,27 @@ bool shouldIgnoreQuotaAndLimits(const StorageID & table_id)
return false;
}
GroupingSetsParamsList getAggregatorGroupingSetsParams(const NamesAndTypesLists & aggregation_keys_list, const Names & all_keys)
{
GroupingSetsParamsList result;
for (const auto & aggregation_keys : aggregation_keys_list)
{
NameSet keys;
for (const auto & key : aggregation_keys)
keys.insert(key.name);
Names missing_keys;
for (const auto & key : all_keys)
if (!keys.contains(key))
missing_keys.push_back(key);
result.emplace_back(aggregation_keys.getNames(), std::move(missing_keys));
}
return result;
}
}
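
For illustration, a self-contained sketch of the computation above: given the union of all GROUP BY keys, each grouping set keeps its own used keys (in the set's declared order) and records the remaining keys as missing. Toy types only; in ClickHouse, DB::Names is an alias for std::vector<String>.

    #include <iostream>
    #include <set>
    #include <string>
    #include <vector>

    int main()
    {
        // GROUP BY GROUPING SETS ((b), (a, b)) with all_keys = {a, b}.
        std::vector<std::string> all_keys = {"a", "b"};
        std::vector<std::vector<std::string>> sets = {{"b"}, {"a", "b"}};
        for (const auto & used : sets)
        {
            std::set<std::string> used_set(used.begin(), used.end());
            std::vector<std::string> missing;
            for (const auto & key : all_keys)
                if (!used_set.count(key))
                    missing.push_back(key);
            // Set (b) -> used: [b], missing: [a]; set (a, b) -> used: [a, b], missing: [].
            std::cout << "used: " << used.size() << ", missing: " << missing.size() << '\n';
        }
    }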
InterpreterSelectQuery::InterpreterSelectQuery(
@@ -2005,6 +2026,7 @@ static void executeMergeAggregatedImpl(
bool has_grouping_sets,
const Settings & settings,
const NamesAndTypesList & aggregation_keys,
const NamesAndTypesLists & aggregation_keys_list,
const AggregateDescriptions & aggregates,
bool should_produce_results_in_order_of_bucket_number,
SortDescription group_by_sort_description)
@@ -2027,10 +2049,12 @@
*/
Aggregator::Params params(keys, aggregates, overflow_row, settings.max_threads, settings.max_block_size, settings.min_hit_rate_to_use_consecutive_keys_optimization);
auto grouping_sets_params = getAggregatorGroupingSetsParams(aggregation_keys_list, keys);
auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
query_plan.getCurrentDataStream(),
params,
grouping_sets_params,
final,
/// Grouping sets don't work with distributed_aggregation_memory_efficient enabled (#43989)
settings.distributed_aggregation_memory_efficient && is_remote_storage && !has_grouping_sets,
@@ -2651,30 +2675,6 @@ static Aggregator::Params getAggregatorParams(
};
}
static GroupingSetsParamsList getAggregatorGroupingSetsParams(const SelectQueryExpressionAnalyzer & query_analyzer, const Names & all_keys)
{
GroupingSetsParamsList result;
if (query_analyzer.useGroupingSetKey())
{
auto const & aggregation_keys_list = query_analyzer.aggregationKeysList();
for (const auto & aggregation_keys : aggregation_keys_list)
{
NameSet keys;
for (const auto & key : aggregation_keys)
keys.insert(key.name);
Names missing_keys;
for (const auto & key : all_keys)
if (!keys.contains(key))
missing_keys.push_back(key);
result.emplace_back(aggregation_keys.getNames(), std::move(missing_keys));
}
}
return result;
}
void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const ActionsAndProjectInputsFlagPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info)
{
executeExpression(query_plan, expression, "Before GROUP BY");
@@ -2694,7 +2694,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes);
auto grouping_sets_params = getAggregatorGroupingSetsParams(*query_analyzer, keys);
auto grouping_sets_params = getAggregatorGroupingSetsParams(query_analyzer->aggregationKeysList(), keys);
SortDescription group_by_sort_description;
SortDescription sort_description_for_merging;
@@ -2762,6 +2762,7 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool
has_grouping_sets,
context->getSettingsRef(),
query_analyzer->aggregationKeys(),
query_analyzer->aggregationKeysList(),
query_analyzer->aggregates(),
should_produce_results_in_order_of_bucket_number,
std::move(group_by_sort_description));

View File

@@ -528,6 +528,7 @@ void addMergingAggregatedStep(QueryPlan & query_plan,
auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
query_plan.getCurrentDataStream(),
params,
aggregation_analysis_result.grouping_sets_parameters_list,
query_analysis_result.aggregate_final,
/// Grouping sets don't work with distributed_aggregation_memory_efficient enabled (#43989)
settings.distributed_aggregation_memory_efficient && (is_remote_storage || parallel_replicas_from_merge_tree) && !query_analysis_result.aggregation_with_rollup_or_cube_or_grouping_sets,

View File

@@ -151,6 +151,61 @@ void AggregatingStep::applyOrder(SortDescription sort_description_for_merging_,
explicit_sorting_required_for_aggregation_in_order = false;
}
ActionsDAG AggregatingStep::makeCreatingMissingKeysForGroupingSetDAG(
const Block & in_header,
const Block & out_header,
const GroupingSetsParamsList & grouping_sets_params,
UInt64 group,
bool group_by_use_nulls)
{
/// Here we create a DAG which fills missing keys and adds `__grouping_set` column
ActionsDAG dag(in_header.getColumnsWithTypeAndName());
ActionsDAG::NodeRawConstPtrs outputs;
outputs.reserve(out_header.columns() + 1);
auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, group), 0);
const auto * grouping_node = &dag.addColumn(
{ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
grouping_node = &dag.materializeNode(*grouping_node);
outputs.push_back(grouping_node);
const auto & missing_columns = grouping_sets_params[group].missing_keys;
const auto & used_keys = grouping_sets_params[group].used_keys;
auto to_nullable_function = FunctionFactory::instance().get("toNullable", nullptr);
for (size_t i = 0; i < out_header.columns(); ++i)
{
const auto & col = out_header.getByPosition(i);
const auto missing_it = std::find_if(
missing_columns.begin(), missing_columns.end(), [&](const auto & missing_col) { return missing_col == col.name; });
const auto used_it = std::find_if(
used_keys.begin(), used_keys.end(), [&](const auto & used_col) { return used_col == col.name; });
if (missing_it != missing_columns.end())
{
auto column_with_default = col.column->cloneEmpty();
col.type->insertDefaultInto(*column_with_default);
column_with_default->finalize();
auto column = ColumnConst::create(std::move(column_with_default), 0);
const auto * node = &dag.addColumn({ColumnPtr(std::move(column)), col.type, col.name});
node = &dag.materializeNode(*node);
outputs.push_back(node);
}
else
{
const auto * column_node = dag.getOutputs()[in_header.getPositionByName(col.name)];
if (used_it != used_keys.end() && group_by_use_nulls && column_node->result_type->canBeInsideNullable())
outputs.push_back(&dag.addFunction(to_nullable_function, { column_node }, col.name));
else
outputs.push_back(column_node);
}
}
dag.getOutputs().swap(outputs);
return dag;
}
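
As a behavioral sketch of the DAG built above (plain containers instead of ActionsDAG, and an integer zero standing in for the column type's default), each grouping set is brought to the full output header: a constant __grouping_set id first, then every key, with this set's missing keys filled with default values.

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    using Row = std::map<std::string, int64_t>;

    // Fill this grouping set's missing keys with defaults and prepend the set id.
    std::vector<Row> fillMissingKeys(const std::vector<Row> & rows,
                                     const std::vector<std::string> & all_keys,
                                     uint64_t group)
    {
        std::vector<Row> out;
        for (const auto & row : rows)
        {
            Row r;
            r["__grouping_set"] = static_cast<int64_t>(group);
            for (const auto & key : all_keys)
                r[key] = row.count(key) ? row.at(key) : 0; // 0 stands in for the type default
            out.push_back(std::move(r));
        }
        return out;
    }

    int main()
    {
        // Set 0 of GROUPING SETS ((a), (a, b)): only 'a' was aggregated, 'b' is missing.
        std::vector<Row> rows = {{{"a", 1}}, {{"a", 2}}};
        for (const auto & r : fillMissingKeys(rows, {"a", "b"}, 0))
            std::cout << r.at("__grouping_set") << ' ' << r.at("a") << ' ' << r.at("b") << '\n';
    }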
void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
{
QueryPipelineProcessorsCollector collector(pipeline, this);
@@ -300,51 +355,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
{
const auto & header = ports[set_counter]->getHeader();
/// Here we create a DAG which fills missing keys and adds `__grouping_set` column
ActionsDAG dag(header.getColumnsWithTypeAndName());
ActionsDAG::NodeRawConstPtrs outputs;
outputs.reserve(output_header.columns() + 1);
auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, set_counter), 0);
const auto * grouping_node = &dag.addColumn(
{ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});
grouping_node = &dag.materializeNode(*grouping_node);
outputs.push_back(grouping_node);
const auto & missing_columns = grouping_sets_params[set_counter].missing_keys;
const auto & used_keys = grouping_sets_params[set_counter].used_keys;
auto to_nullable_function = FunctionFactory::instance().get("toNullable", nullptr);
for (size_t i = 0; i < output_header.columns(); ++i)
{
auto & col = output_header.getByPosition(i);
const auto missing_it = std::find_if(
missing_columns.begin(), missing_columns.end(), [&](const auto & missing_col) { return missing_col == col.name; });
const auto used_it = std::find_if(
used_keys.begin(), used_keys.end(), [&](const auto & used_col) { return used_col == col.name; });
if (missing_it != missing_columns.end())
{
auto column_with_default = col.column->cloneEmpty();
col.type->insertDefaultInto(*column_with_default);
column_with_default->finalize();
auto column = ColumnConst::create(std::move(column_with_default), 0);
const auto * node = &dag.addColumn({ColumnPtr(std::move(column)), col.type, col.name});
node = &dag.materializeNode(*node);
outputs.push_back(node);
}
else
{
const auto * column_node = dag.getOutputs()[header.getPositionByName(col.name)];
if (used_it != used_keys.end() && group_by_use_nulls && column_node->result_type->canBeInsideNullable())
outputs.push_back(&dag.addFunction(to_nullable_function, { column_node }, col.name));
else
outputs.push_back(column_node);
}
}
dag.getOutputs().swap(outputs);
auto dag = makeCreatingMissingKeysForGroupingSetDAG(header, output_header, grouping_sets_params, set_counter, group_by_use_nulls);
auto expression = std::make_shared<ExpressionActions>(std::move(dag), settings.getActionsSettings());
auto transform = std::make_shared<ExpressionTransform>(header, expression);

View File

@@ -7,18 +7,6 @@
namespace DB
{
struct GroupingSetsParams
{
GroupingSetsParams() = default;
GroupingSetsParams(Names used_keys_, Names missing_keys_) : used_keys(std::move(used_keys_)), missing_keys(std::move(missing_keys_)) { }
Names used_keys;
Names missing_keys;
};
using GroupingSetsParamsList = std::vector<GroupingSetsParams>;
Block appendGroupingSetColumn(Block header);
Block generateOutputHeader(const Block & input_header, const Names & keys, bool use_nulls);
@@ -77,6 +65,13 @@ public:
/// Argument input_stream would be the second input (from projection).
std::unique_ptr<AggregatingProjectionStep> convertToAggregatingProjection(const DataStream & input_stream) const;
static ActionsDAG makeCreatingMissingKeysForGroupingSetDAG(
const Block & in_header,
const Block & out_header,
const GroupingSetsParamsList & grouping_sets_params,
UInt64 group,
bool group_by_use_nulls);
private:
void updateOutputStream() override;

View File

@@ -42,6 +42,7 @@ static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_
MergingAggregatedStep::MergingAggregatedStep(
const DataStream & input_stream_,
Aggregator::Params params_,
GroupingSetsParamsList grouping_sets_params_,
bool final_,
bool memory_efficient_aggregation_,
size_t max_threads_,
@@ -56,6 +57,7 @@ MergingAggregatedStep::MergingAggregatedStep(
MergingAggregatedTransform::appendGroupingIfNeeded(input_stream_.header, params_.getHeader(input_stream_.header, final_)),
getTraits(should_produce_results_in_order_of_bucket_number_))
, params(std::move(params_))
, grouping_sets_params(std::move(grouping_sets_params_))
, final(final_)
, memory_efficient_aggregation(memory_efficient_aggregation_)
, max_threads(max_threads_)
@@ -94,14 +96,13 @@ void MergingAggregatedStep::applyOrder(SortDescription sort_description, DataStr
void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getHeader(), std::move(params), final);
if (memoryBoundMergingWillBeUsed())
{
if (input_streams.front().header.has("__grouping_set"))
if (input_streams.front().header.has("__grouping_set") || !grouping_sets_params.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Memory bound merging of aggregated results is not supported for grouping sets.");
auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getHeader(), std::move(params), final);
auto transform = std::make_shared<FinishAggregatingInOrderTransform>(
pipeline.getHeader(),
pipeline.getNumStreams(),
@@ -136,18 +137,19 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c
pipeline.resize(1);
/// Now merge the aggregated blocks
pipeline.addSimpleTransform([&](const Block & header)
{ return std::make_shared<MergingAggregatedTransform>(header, transform_params, max_threads); });
auto transform = std::make_shared<MergingAggregatedTransform>(pipeline.getHeader(), params, final, grouping_sets_params, max_threads);
pipeline.addTransform(std::move(transform));
}
else
{
if (input_streams.front().header.has("__grouping_set"))
if (input_streams.front().header.has("__grouping_set") || !grouping_sets_params.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Memory efficient merging of aggregated results is not supported for grouping sets.");
auto num_merge_threads = memory_efficient_merge_threads
? memory_efficient_merge_threads
: max_threads;
auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getHeader(), std::move(params), final);
pipeline.addMergingAggregatedMemoryEfficientTransform(transform_params, num_merge_threads);
}

View File

@@ -16,6 +16,7 @@ public:
MergingAggregatedStep(
const DataStream & input_stream_,
Aggregator::Params params_,
GroupingSetsParamsList grouping_sets_params_,
bool final_,
bool memory_efficient_aggregation_,
size_t max_threads_,
@@ -43,6 +44,7 @@ private:
Aggregator::Params params;
GroupingSetsParamsList grouping_sets_params;
bool final;
bool memory_efficient_aggregation;
size_t max_threads;

View File

@@ -1,7 +1,9 @@
#include <Processors/Transforms/MergingAggregatedTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Common/logger_useful.h>
#include <Interpreters/ExpressionActions.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
@@ -23,19 +25,93 @@ Block MergingAggregatedTransform::appendGroupingIfNeeded(const Block & in_header
return out_header;
}
MergingAggregatedTransform::~MergingAggregatedTransform() = default;
MergingAggregatedTransform::MergingAggregatedTransform(
Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_)
: IAccumulatingTransform(header_, appendGroupingIfNeeded(header_, params_->getHeader()))
, params(std::move(params_)), max_threads(max_threads_), has_grouping_sets(header_.has("__grouping_set"))
Block header_,
Aggregator::Params params,
bool final,
GroupingSetsParamsList grouping_sets_params,
size_t max_threads_)
: IAccumulatingTransform(header_, appendGroupingIfNeeded(header_, params.getHeader(header_, final)))
, max_threads(max_threads_)
{
if (!grouping_sets_params.empty())
{
if (!header_.has("__grouping_set"))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Cannot find __grouping_set column in header of MergingAggregatedTransform with grouping sets."
"Header {}", header_.dumpStructure());
auto in_header = header_;
in_header.erase(header_.getPositionByName("__grouping_set"));
auto out_header = params.getHeader(header_, final);
grouping_sets.reserve(grouping_sets_params.size());
for (const auto & grouping_set_params : grouping_sets_params)
{
size_t group = grouping_sets.size();
ActionsDAG reordering(in_header.getColumnsWithTypeAndName());
auto & outputs = reordering.getOutputs();
ActionsDAG::NodeRawConstPtrs new_outputs;
/// The used keys and the remaining (aggregate) columns survive; this set's missing keys are dropped.
new_outputs.reserve(in_header.columns() - grouping_set_params.missing_keys.size());
std::unordered_map<std::string_view, size_t> index;
for (size_t pos = 0; pos < outputs.size(); ++pos)
index.emplace(outputs[pos]->result_name, pos);
for (const auto & used_name : grouping_set_params.used_keys)
{
auto & idx = index[used_name];
new_outputs.push_back(outputs[idx]);
}
for (const auto & used_name : grouping_set_params.used_keys)
index[used_name] = outputs.size();
for (const auto & missing_name : grouping_set_params.missing_keys)
index[missing_name] = outputs.size();
for (const auto * output : outputs)
{
if (index[output->result_name] != outputs.size())
new_outputs.push_back(output);
}
outputs.swap(new_outputs);
Aggregator::Params set_params(grouping_set_params.used_keys,
params.aggregates,
params.overflow_row,
params.max_threads,
params.max_block_size,
params.min_hit_rate_to_use_consecutive_keys_optimization);
auto transform_params = std::make_shared<AggregatingTransformParams>(reordering.updateHeader(in_header), std::move(set_params), final);
auto creating = AggregatingStep::makeCreatingMissingKeysForGroupingSetDAG(
transform_params->getHeader(),
out_header,
grouping_sets_params, group, false);
auto & groupiung_set = grouping_sets.emplace_back();
groupiung_set.reordering_key_columns_actions = std::make_shared<ExpressionActions>(std::move(reordering));
groupiung_set.creating_missing_keys_actions = std::make_shared<ExpressionActions>(std::move(creating));
groupiung_set.params = std::move(transform_params);
}
}
else
{
auto & groupiung_set = grouping_sets.emplace_back();
groupiung_set.params = std::make_shared<AggregatingTransformParams>(header_, std::move(params), final);
}
}
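
The reordering DAG built in this constructor is the core of the change: the initiator receives blocks whose key columns follow the global GROUP BY key order, while each set's Aggregator expects that set's own key order. A toy sketch of the column reordering, names only, with no ActionsDAG:

    #include <algorithm>
    #include <iostream>
    #include <string>
    #include <vector>

    // Put this set's used keys first (in the set's own order), keep the other
    // columns, and drop the set's missing keys (they only hold defaults here).
    std::vector<std::string> reorderForSet(const std::vector<std::string> & in_columns,
                                           const std::vector<std::string> & used_keys,
                                           const std::vector<std::string> & missing_keys)
    {
        std::vector<std::string> out(used_keys.begin(), used_keys.end());
        for (const auto & col : in_columns)
        {
            bool used = std::find(used_keys.begin(), used_keys.end(), col) != used_keys.end();
            bool missing = std::find(missing_keys.begin(), missing_keys.end(), col) != missing_keys.end();
            if (!used && !missing)
                out.push_back(col);
        }
        return out;
    }

    int main()
    {
        // Input header (without __grouping_set): keys in global order plus an aggregate.
        std::vector<std::string> in = {"a", "b", "count()"};
        // Grouping set (b): used = {b}, missing = {a} -> prints b, count().
        for (const auto & name : reorderForSet(in, {"b"}, {"a"}))
            std::cout << name << '\n';
    }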
void MergingAggregatedTransform::addBlock(Block block)
{
if (!has_grouping_sets)
if (grouping_sets.size() == 1)
{
auto & bucket_to_blocks = grouping_sets[0];
bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block));
grouping_sets[0].bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block));
return;
}
@@ -49,13 +125,12 @@ void MergingAggregatedTransform::addBlock(Block block)
if (!grouping_column_typed)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected UInt64 column for __grouping_set, got {}", grouping_column->getName());
/// Enumerate groups and fill the selector.
std::map<UInt64, size_t> enumerated_groups;
IColumn::Selector selector;
const auto & grouping_data = grouping_column_typed->getData();
size_t num_rows = grouping_data.size();
UInt64 last_group = grouping_data[0];
UInt64 max_group = last_group;
for (size_t row = 1; row < num_rows; ++row)
{
auto group = grouping_data[row];
@@ -65,32 +140,32 @@ void MergingAggregatedTransform::addBlock(Block block)
continue;
/// Optimization for single group.
if (enumerated_groups.empty())
{
if (selector.empty())
selector.reserve(num_rows);
enumerated_groups.emplace(last_group, enumerated_groups.size());
}
/// Fill the last equal range.
selector.resize_fill(row, enumerated_groups[last_group]);
/// Enumerate new group if did not see it before.
enumerated_groups.emplace(group, enumerated_groups.size());
selector.resize_fill(row, last_group);
last_group = group;
max_group = std::max(last_group, max_group);
}
if (max_group >= grouping_sets.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Invalid group number {}. Number of groups {}.", last_group, grouping_sets.size());
/// Optimization for single group.
if (enumerated_groups.empty())
if (selector.empty())
{
auto & bucket_to_blocks = grouping_sets[last_group];
bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block));
auto bucket = block.info.bucket_num;
grouping_sets[last_group].reordering_key_columns_actions->execute(block);
grouping_sets[last_group].bucket_to_blocks[bucket].emplace_back(std::move(block));
return;
}
/// Fill the last equal range.
selector.resize_fill(num_rows, enumerated_groups[last_group]);
selector.resize_fill(num_rows, last_group);
const size_t num_groups = enumerated_groups.size();
const size_t num_groups = max_group + 1;
Blocks splitted_blocks(num_groups);
for (size_t group_id = 0; group_id < num_groups; ++group_id)
@@ -104,28 +179,28 @@ void MergingAggregatedTransform::addBlock(Block block)
splitted_blocks[group_id].getByPosition(col_idx_in_block).column = std::move(splitted_columns[group_id]);
}
for (auto [group, group_id] : enumerated_groups)
for (size_t group = 0; group < num_groups; ++group)
{
auto & bucket_to_blocks = grouping_sets[group];
auto & splitted_block = splitted_blocks[group_id];
auto & splitted_block = splitted_blocks[group];
splitted_block.info = block.info;
bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(splitted_block));
grouping_sets[group].reordering_key_columns_actions->execute(splitted_block);
grouping_sets[group].bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(splitted_block));
}
}
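
addBlock splits every incoming block into per-set blocks, using the __grouping_set column as the selector. A minimal model of that scatter (ClickHouse does it column-wise via IColumn::scatter with the IColumn::Selector built above):

    #include <cstdint>
    #include <iostream>
    #include <vector>

    // Route every row to the block of its grouping set.
    std::vector<std::vector<int64_t>> splitByGroup(const std::vector<uint64_t> & grouping_column,
                                                   const std::vector<int64_t> & data_column,
                                                   size_t num_groups)
    {
        std::vector<std::vector<int64_t>> parts(num_groups);
        for (size_t row = 0; row < grouping_column.size(); ++row)
            parts[grouping_column[row]].push_back(data_column[row]);
        return parts;
    }

    int main()
    {
        std::vector<uint64_t> groups = {0, 0, 1, 0, 1}; // __grouping_set values
        std::vector<int64_t> values = {10, 20, 30, 40, 50};
        auto parts = splitByGroup(groups, values, 2);
        for (size_t g = 0; g < parts.size(); ++g)
        {
            std::cout << "set " << g << ':';
            for (auto v : parts[g])
                std::cout << ' ' << v;
            std::cout << '\n';
        }
    }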
void MergingAggregatedTransform::appendGroupingColumn(UInt64 group, BlocksList & block_list)
{
auto grouping_position = getOutputPort().getHeader().getPositionByName("__grouping_set");
for (auto & block : block_list)
{
auto num_rows = block.rows();
ColumnWithTypeAndName col;
col.type = std::make_shared<DataTypeUInt64>();
col.name = "__grouping_set";
col.column = ColumnUInt64::create(num_rows, group);
block.insert(grouping_position, std::move(col));
}
}
void MergingAggregatedTransform::consume(Chunk chunk)
{
@@ -170,6 +245,25 @@ void MergingAggregatedTransform::consume(Chunk chunk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk should have AggregatedChunkInfo in MergingAggregatedTransform.");
}
Chunk MergingAggregatedTransform::generate()
{
if (!generate_started)
@@ -180,15 +274,31 @@ Chunk MergingAggregatedTransform::generate()
/// Exception safety. Make iterator valid in case any method below throws.
next_block = blocks.begin();
for (auto & [group, group_blocks] : grouping_sets)
for (auto & grouping_set : grouping_sets)
{
/// TODO: this operation can be made async. Add async for IAccumulatingTransform.
auto & params = grouping_set.params;
auto & bucket_to_blocks = grouping_set.bucket_to_blocks;
AggregatedDataVariants data_variants;
params->aggregator.mergeBlocks(std::move(group_blocks), data_variants, max_threads, is_cancelled);
/// TODO: this operation can be made async. Add async for IAccumulatingTransform.
params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads, is_cancelled);
auto merged_blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads);
if (has_grouping_sets)
appendGroupingColumn(group, merged_blocks);
if (grouping_set.creating_missing_keys_actions)
for (auto & block : merged_blocks)
grouping_set.creating_missing_keys_actions->execute(block);
blocks.splice(blocks.end(), std::move(merged_blocks));
}
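
Taken together, generate() now runs one independent merge per grouping set: the blocks were already reordered into the set's own key order in addBlock(), each set merges with its own Aggregator::Params, and creating_missing_keys_actions brings the result back to the common output header. A control-flow sketch with stub types, not the real Aggregator API:

    #include <list>
    #include <memory>
    #include <vector>

    // Stub types; only the control flow mirrors the loop above.
    struct Block {};
    using BlocksList = std::list<Block>;
    struct Actions { void execute(Block &) const { /* fill defaults, add __grouping_set */ } };

    struct GroupingSet
    {
        BlocksList blocks;                              // already reordered in addBlock()
        std::shared_ptr<Actions> creating_missing_keys; // null for the single-set case
    };

    BlocksList mergeAll(std::vector<GroupingSet> & sets)
    {
        BlocksList result;
        for (auto & set : sets)
        {
            // Stand-in for aggregator.mergeBlocks(...) + convertToBlocks(...).
            BlocksList merged = std::move(set.blocks);
            if (set.creating_missing_keys)
                for (auto & block : merged)
                    set.creating_missing_keys->execute(block);
            result.splice(result.end(), std::move(merged));
        }
        return result;
    }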

View File

@@ -6,13 +6,24 @@
namespace DB
{
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
/** A pre-aggregate stream of blocks in which each block is already aggregated.
* Aggregate functions in blocks should not be finalized so that their states can be merged.
*/
class MergingAggregatedTransform : public IAccumulatingTransform
{
public:
MergingAggregatedTransform(Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_);
MergingAggregatedTransform(
Block header_,
Aggregator::Params params_,
bool final_,
GroupingSetsParamsList grouping_sets_params,
size_t max_threads_);
~MergingAggregatedTransform() override;
String getName() const override { return "MergingAggregatedTransform"; }
static Block appendGroupingIfNeeded(const Block & in_header, Block out_header);
@@ -22,13 +33,19 @@ protected:
Chunk generate() override;
private:
AggregatingTransformParamsPtr params;
LoggerPtr log = getLogger("MergingAggregatedTransform");
size_t max_threads;
using GroupingSets = std::unordered_map<UInt64, Aggregator::BucketToBlocks>;
struct GroupingSet
{
/// Blocks of this grouping set, bucketed for two-level aggregation.
Aggregator::BucketToBlocks bucket_to_blocks;
/// Reorders key columns into this set's own key order before merging.
ExpressionActionsPtr reordering_key_columns_actions;
/// After merging, fills the set's missing keys with defaults and adds __grouping_set.
ExpressionActionsPtr creating_missing_keys_actions;
AggregatingTransformParamsPtr params;
};
using GroupingSets = std::vector<GroupingSet>;
GroupingSets grouping_sets;
const bool has_grouping_sets;
UInt64 total_input_rows = 0;
UInt64 total_input_blocks = 0;
@@ -40,7 +57,6 @@ private:
bool generate_started = false;
void addBlock(Block block);
void appendGroupingColumn(UInt64 group, BlocksList & block_list);
};
}

View File

@@ -11,57 +11,215 @@
0 6 4
1 10 4
2 14 4
['.']
['.','.']
['.','.','.']
['.','.','.','.']
['.','.','.','.','.']
['.','.','.','.','.','.']
['.','.','.','.','.','.','.']
['.','.','.','.','.','.','.','.']
['.','.','.','.','.','.','.','.','.']
['.']
['.']
['.','.']
['.','.']
['.','.','.']
['.','.','.']
['.','.','.','.']
['.','.','.','.']
['.','.','.','.','.']
['.','.','.','.','.']
['.','.','.','.','.','.']
['.','.','.','.','.','.']
['.','.','.','.','.','.','.']
['.','.','.','.','.','.','.']
['.','.','.','.','.','.','.','.']
['.','.','.','.','.','.','.','.']
['.','.','.','.','.','.','.','.','.']
['.','.','.','.','.','.','.','.','.']
1
2
3
4
5
6
7
8
9
1
1
2
2
3
3
4
4
5
5
6
6
7
7
8
8
9
9
-- { echoOn }
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 ['.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 ['.']
1 ['.']
2 ['.','.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
2 2
2 3
2 4
2 5
2 6
2 7
2 8
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
1 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
1 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
1 1
1 1
2 2
2 2
2 2
2 3
2 3
2 3
2 4
2 4
2 4
2 5
2 5
2 5
2 6
2 6
2 6
2 7
2 7
2 7
2 8
2 8
2 8
2 9
2 9
2 9
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 ['.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 ['.']
2 ['.']
2 ['.','.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 2
2 3
2 4
2 5
2 6
2 7
2 8
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 1
2 1
2 2
2 2
2 2
2 3
2 3
2 3
2 4
2 4
2 4
2 5
2 5
2 5
2 6
2 6
2 6
2 7
2 7
2 7
2 8
2 8
2 8
2 9
2 9
2 9

View File

@@ -44,7 +44,22 @@ ORDER BY
sum_value ASC,
count_value ASC;
SELECT arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
set prefer_localhost_replica = 1;
-- { echoOn }
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;