Mirror of https://github.com/ClickHouse/ClickHouse.git
Merge pull request #69108 from ClickHouse/backport/24.8/68744
Backport #68744 to 24.8: Fix merging of aggregated data for grouping sets.
Commit: 8a1dfdee37
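
The change targets queries whose pre-aggregated GROUPING SETS data is merged on the initiator, e.g. distributed GROUP BY GROUPING SETS. A minimal illustration in ClickHouse SQL, adapted from the tests added below (the remote() addresses and the two-level-threshold setting only serve to force a distributed, two-level merge):

    SELECT count(), toString(number) AS k
    FROM remote('127.0.0.{1,2}', numbers(10))
    GROUP BY GROUPING SETS ((k), (k, k))
    ORDER BY k
    SETTINGS group_by_two_level_threshold = 9;

Per the commit title, merging of such per-set aggregated data could previously go wrong; the new reference tests below pin down the expected per-set counts.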
@@ -59,6 +59,18 @@ class CompiledAggregateFunctionsHolder;
class NativeWriter;
struct OutputBlockColumns;

struct GroupingSetsParams
{
    GroupingSetsParams() = default;

    GroupingSetsParams(Names used_keys_, Names missing_keys_) : used_keys(std::move(used_keys_)), missing_keys(std::move(missing_keys_)) { }

    Names used_keys;
    Names missing_keys;
};

using GroupingSetsParamsList = std::vector<GroupingSetsParams>;

/** How are "total" values calculated with WITH TOTALS?
  * (For more details, see TotalsHavingTransform.)
  *
@@ -347,6 +347,27 @@ bool shouldIgnoreQuotaAndLimits(const StorageID & table_id)
    return false;
}

GroupingSetsParamsList getAggregatorGroupingSetsParams(const NamesAndTypesLists & aggregation_keys_list, const Names & all_keys)
{
    GroupingSetsParamsList result;

    for (const auto & aggregation_keys : aggregation_keys_list)
    {
        NameSet keys;
        for (const auto & key : aggregation_keys)
            keys.insert(key.name);

        Names missing_keys;
        for (const auto & key : all_keys)
            if (!keys.contains(key))
                missing_keys.push_back(key);

        result.emplace_back(aggregation_keys.getNames(), std::move(missing_keys));
    }

    return result;
}

}

InterpreterSelectQuery::InterpreterSelectQuery(
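
To make the used_keys/missing_keys split concrete, a small hypothetical example (table t and columns a, b are illustrative, not from this commit): with all_keys = [a, b], the first grouping set below yields used_keys = [a], missing_keys = [b], while the second yields used_keys = [a, b] and an empty missing_keys:

    SELECT a, b, count() FROM t GROUP BY GROUPING SETS ((a), (a, b));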
@@ -2005,13 +2026,12 @@ static void executeMergeAggregatedImpl(
    bool has_grouping_sets,
    const Settings & settings,
    const NamesAndTypesList & aggregation_keys,
    const NamesAndTypesLists & aggregation_keys_list,
    const AggregateDescriptions & aggregates,
    bool should_produce_results_in_order_of_bucket_number,
    SortDescription group_by_sort_description)
{
    auto keys = aggregation_keys.getNames();
    if (has_grouping_sets)
        keys.insert(keys.begin(), "__grouping_set");

    /** There are two modes of distributed aggregation.
      *
@@ -2029,10 +2049,12 @@ static void executeMergeAggregatedImpl(
      */

    Aggregator::Params params(keys, aggregates, overflow_row, settings.max_threads, settings.max_block_size, settings.min_hit_rate_to_use_consecutive_keys_optimization);
    auto grouping_sets_params = getAggregatorGroupingSetsParams(aggregation_keys_list, keys);

    auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
        query_plan.getCurrentDataStream(),
        params,
        grouping_sets_params,
        final,
        /// Grouping sets don't work with distributed_aggregation_memory_efficient enabled (#43989)
        settings.distributed_aggregation_memory_efficient && is_remote_storage && !has_grouping_sets,
@@ -2653,30 +2675,6 @@ static Aggregator::Params getAggregatorParams(
    };
}

static GroupingSetsParamsList getAggregatorGroupingSetsParams(const SelectQueryExpressionAnalyzer & query_analyzer, const Names & all_keys)
{
    GroupingSetsParamsList result;
    if (query_analyzer.useGroupingSetKey())
    {
        auto const & aggregation_keys_list = query_analyzer.aggregationKeysList();

        for (const auto & aggregation_keys : aggregation_keys_list)
        {
            NameSet keys;
            for (const auto & key : aggregation_keys)
                keys.insert(key.name);

            Names missing_keys;
            for (const auto & key : all_keys)
                if (!keys.contains(key))
                    missing_keys.push_back(key);

            result.emplace_back(aggregation_keys.getNames(), std::move(missing_keys));
        }
    }
    return result;
}

void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const ActionsAndProjectInputsFlagPtr & expression, bool overflow_row, bool final, InputOrderInfoPtr group_by_info)
{
    executeExpression(query_plan, expression, "Before GROUP BY");
@@ -2696,7 +2694,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
        settings.group_by_two_level_threshold,
        settings.group_by_two_level_threshold_bytes);

    auto grouping_sets_params = getAggregatorGroupingSetsParams(*query_analyzer, keys);
    auto grouping_sets_params = getAggregatorGroupingSetsParams(query_analyzer->aggregationKeysList(), keys);

    SortDescription group_by_sort_description;
    SortDescription sort_description_for_merging;
@@ -2764,6 +2762,7 @@ void InterpreterSelectQuery::executeMergeAggregated(QueryPlan & query_plan, bool
        has_grouping_sets,
        context->getSettingsRef(),
        query_analyzer->aggregationKeys(),
        query_analyzer->aggregationKeysList(),
        query_analyzer->aggregates(),
        should_produce_results_in_order_of_bucket_number,
        std::move(group_by_sort_description));
@@ -504,8 +504,6 @@ void addMergingAggregatedStep(QueryPlan & query_plan,
      */

    auto keys = aggregation_analysis_result.aggregation_keys;
    if (!aggregation_analysis_result.grouping_sets_parameters_list.empty())
        keys.insert(keys.begin(), "__grouping_set");

    Aggregator::Params params(keys,
        aggregation_analysis_result.aggregate_descriptions,
@@ -530,6 +528,7 @@ void addMergingAggregatedStep(QueryPlan & query_plan,
    auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
        query_plan.getCurrentDataStream(),
        params,
        aggregation_analysis_result.grouping_sets_parameters_list,
        query_analysis_result.aggregate_final,
        /// Grouping sets don't work with distributed_aggregation_memory_efficient enabled (#43989)
        settings.distributed_aggregation_memory_efficient && (is_remote_storage || parallel_replicas_from_merge_tree) && !query_analysis_result.aggregation_with_rollup_or_cube_or_grouping_sets,
@@ -151,6 +151,61 @@ void AggregatingStep::applyOrder(SortDescription sort_description_for_merging_,
    explicit_sorting_required_for_aggregation_in_order = false;
}

ActionsDAG AggregatingStep::makeCreatingMissingKeysForGroupingSetDAG(
    const Block & in_header,
    const Block & out_header,
    const GroupingSetsParamsList & grouping_sets_params,
    UInt64 group,
    bool group_by_use_nulls)
{
    /// Here we create a DAG which fills missing keys and adds `__grouping_set` column
    ActionsDAG dag(in_header.getColumnsWithTypeAndName());
    ActionsDAG::NodeRawConstPtrs outputs;
    outputs.reserve(out_header.columns() + 1);

    auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, group), 0);
    const auto * grouping_node = &dag.addColumn(
        {ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});

    grouping_node = &dag.materializeNode(*grouping_node);
    outputs.push_back(grouping_node);

    const auto & missing_columns = grouping_sets_params[group].missing_keys;
    const auto & used_keys = grouping_sets_params[group].used_keys;

    auto to_nullable_function = FunctionFactory::instance().get("toNullable", nullptr);
    for (size_t i = 0; i < out_header.columns(); ++i)
    {
        const auto & col = out_header.getByPosition(i);
        const auto missing_it = std::find_if(
            missing_columns.begin(), missing_columns.end(), [&](const auto & missing_col) { return missing_col == col.name; });
        const auto used_it = std::find_if(
            used_keys.begin(), used_keys.end(), [&](const auto & used_col) { return used_col == col.name; });
        if (missing_it != missing_columns.end())
        {
            auto column_with_default = col.column->cloneEmpty();
            col.type->insertDefaultInto(*column_with_default);
            column_with_default->finalize();

            auto column = ColumnConst::create(std::move(column_with_default), 0);
            const auto * node = &dag.addColumn({ColumnPtr(std::move(column)), col.type, col.name});
            node = &dag.materializeNode(*node);
            outputs.push_back(node);
        }
        else
        {
            const auto * column_node = dag.getOutputs()[in_header.getPositionByName(col.name)];
            if (used_it != used_keys.end() && group_by_use_nulls && column_node->result_type->canBeInsideNullable())
                outputs.push_back(&dag.addFunction(to_nullable_function, { column_node }, col.name));
            else
                outputs.push_back(column_node);
        }
    }

    dag.getOutputs().swap(outputs);
    return dag;
}

void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
{
    QueryPipelineProcessorsCollector collector(pipeline, this);
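
For intuition, a rough sketch of what this DAG produces (hypothetical table t with columns a and b, not part of the commit): for the second grouping set below, b is a used key and a is a missing key, so blocks produced for that set get a added back as a constant holding the output type's default value (NULL when the output type is Nullable, as with group_by_use_nulls), plus a constant __grouping_set column identifying the set:

    SELECT a, b, count() FROM t GROUP BY GROUPING SETS ((a), (b));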
@@ -300,51 +355,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
        {
            const auto & header = ports[set_counter]->getHeader();

            /// Here we create a DAG which fills missing keys and adds `__grouping_set` column
            ActionsDAG dag(header.getColumnsWithTypeAndName());
            ActionsDAG::NodeRawConstPtrs outputs;
            outputs.reserve(output_header.columns() + 1);

            auto grouping_col = ColumnConst::create(ColumnUInt64::create(1, set_counter), 0);
            const auto * grouping_node = &dag.addColumn(
                {ColumnPtr(std::move(grouping_col)), std::make_shared<DataTypeUInt64>(), "__grouping_set"});

            grouping_node = &dag.materializeNode(*grouping_node);
            outputs.push_back(grouping_node);

            const auto & missing_columns = grouping_sets_params[set_counter].missing_keys;
            const auto & used_keys = grouping_sets_params[set_counter].used_keys;

            auto to_nullable_function = FunctionFactory::instance().get("toNullable", nullptr);
            for (size_t i = 0; i < output_header.columns(); ++i)
            {
                auto & col = output_header.getByPosition(i);
                const auto missing_it = std::find_if(
                    missing_columns.begin(), missing_columns.end(), [&](const auto & missing_col) { return missing_col == col.name; });
                const auto used_it = std::find_if(
                    used_keys.begin(), used_keys.end(), [&](const auto & used_col) { return used_col == col.name; });
                if (missing_it != missing_columns.end())
                {
                    auto column_with_default = col.column->cloneEmpty();
                    col.type->insertDefaultInto(*column_with_default);
                    column_with_default->finalize();

                    auto column = ColumnConst::create(std::move(column_with_default), 0);
                    const auto * node = &dag.addColumn({ColumnPtr(std::move(column)), col.type, col.name});
                    node = &dag.materializeNode(*node);
                    outputs.push_back(node);
                }
                else
                {
                    const auto * column_node = dag.getOutputs()[header.getPositionByName(col.name)];
                    if (used_it != used_keys.end() && group_by_use_nulls && column_node->result_type->canBeInsideNullable())
                        outputs.push_back(&dag.addFunction(to_nullable_function, { column_node }, col.name));
                    else
                        outputs.push_back(column_node);
                }
            }

            dag.getOutputs().swap(outputs);
            auto dag = makeCreatingMissingKeysForGroupingSetDAG(header, output_header, grouping_sets_params, set_counter, group_by_use_nulls);
            auto expression = std::make_shared<ExpressionActions>(std::move(dag), settings.getActionsSettings());
            auto transform = std::make_shared<ExpressionTransform>(header, expression);

@@ -7,18 +7,6 @@
namespace DB
{

struct GroupingSetsParams
{
    GroupingSetsParams() = default;

    GroupingSetsParams(Names used_keys_, Names missing_keys_) : used_keys(std::move(used_keys_)), missing_keys(std::move(missing_keys_)) { }

    Names used_keys;
    Names missing_keys;
};

using GroupingSetsParamsList = std::vector<GroupingSetsParams>;

Block appendGroupingSetColumn(Block header);
Block generateOutputHeader(const Block & input_header, const Names & keys, bool use_nulls);

@@ -77,6 +65,13 @@ public:
    /// Argument input_stream would be the second input (from projection).
    std::unique_ptr<AggregatingProjectionStep> convertToAggregatingProjection(const DataStream & input_stream) const;

    static ActionsDAG makeCreatingMissingKeysForGroupingSetDAG(
        const Block & in_header,
        const Block & out_header,
        const GroupingSetsParamsList & grouping_sets_params,
        UInt64 group,
        bool group_by_use_nulls);

private:
    void updateOutputStream() override;

@@ -10,6 +10,11 @@
namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

static bool memoryBoundMergingWillBeUsed(
    const DataStream & input_stream,
    bool memory_bound_merging_of_aggregation_results_enabled,
@@ -37,6 +42,7 @@ static ITransformingStep::Traits getTraits(bool should_produce_results_in_order_
MergingAggregatedStep::MergingAggregatedStep(
    const DataStream & input_stream_,
    Aggregator::Params params_,
    GroupingSetsParamsList grouping_sets_params_,
    bool final_,
    bool memory_efficient_aggregation_,
    size_t max_threads_,
@@ -48,9 +54,10 @@ MergingAggregatedStep::MergingAggregatedStep(
    bool memory_bound_merging_of_aggregation_results_enabled_)
    : ITransformingStep(
        input_stream_,
        params_.getHeader(input_stream_.header, final_),
        MergingAggregatedTransform::appendGroupingIfNeeded(input_stream_.header, params_.getHeader(input_stream_.header, final_)),
        getTraits(should_produce_results_in_order_of_bucket_number_))
    , params(std::move(params_))
    , grouping_sets_params(std::move(grouping_sets_params_))
    , final(final_)
    , memory_efficient_aggregation(memory_efficient_aggregation_)
    , max_threads(max_threads_)
@@ -89,10 +96,13 @@ void MergingAggregatedStep::applyOrder(SortDescription sort_description, DataStr

void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
    auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getHeader(), std::move(params), final);

    if (memoryBoundMergingWillBeUsed())
    {
        if (input_streams.front().header.has("__grouping_set") || !grouping_sets_params.empty())
            throw Exception(ErrorCodes::LOGICAL_ERROR,
                "Memory bound merging of aggregated results is not supported for grouping sets.");

        auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getHeader(), std::move(params), final);
        auto transform = std::make_shared<FinishAggregatingInOrderTransform>(
            pipeline.getHeader(),
            pipeline.getNumStreams(),
@@ -127,15 +137,19 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c
        pipeline.resize(1);

        /// Now merge the aggregated blocks
        pipeline.addSimpleTransform([&](const Block & header)
            { return std::make_shared<MergingAggregatedTransform>(header, transform_params, max_threads); });
        auto transform = std::make_shared<MergingAggregatedTransform>(pipeline.getHeader(), params, final, grouping_sets_params, max_threads);
        pipeline.addTransform(std::move(transform));
    }
    else
    {
        if (input_streams.front().header.has("__grouping_set") || !grouping_sets_params.empty())
            throw Exception(ErrorCodes::LOGICAL_ERROR,
                "Memory efficient merging of aggregated results is not supported for grouping sets.");
        auto num_merge_threads = memory_efficient_merge_threads
            ? memory_efficient_merge_threads
            : max_threads;

        auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getHeader(), std::move(params), final);
        pipeline.addMergingAggregatedMemoryEfficientTransform(transform_params, num_merge_threads);
    }

@@ -154,7 +168,9 @@ void MergingAggregatedStep::describeActions(JSONBuilder::JSONMap & map) const

void MergingAggregatedStep::updateOutputStream()
{
    output_stream = createOutputStream(input_streams.front(), params.getHeader(input_streams.front().header, final), getDataStreamTraits());
    const auto & in_header = input_streams.front().header;
    output_stream = createOutputStream(input_streams.front(),
        MergingAggregatedTransform::appendGroupingIfNeeded(in_header, params.getHeader(in_header, final)), getDataStreamTraits());
    if (is_order_overwritten) /// overwrite order again
        applyOrder(group_by_sort_description, overwritten_sort_scope);
}
@@ -16,6 +16,7 @@ public:
    MergingAggregatedStep(
        const DataStream & input_stream_,
        Aggregator::Params params_,
        GroupingSetsParamsList grouping_sets_params_,
        bool final_,
        bool memory_efficient_aggregation_,
        size_t max_threads_,
@@ -43,6 +44,7 @@ private:

    Aggregator::Params params;
    GroupingSetsParamsList grouping_sets_params;
    bool final;
    bool memory_efficient_aggregation;
    size_t max_threads;
@@ -1,7 +1,10 @@
#include <Processors/Transforms/MergingAggregatedTransform.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Common/logger_useful.h>
#include <Interpreters/ExpressionActions.h>
#include <DataTypes/DataTypesNumber.h>

namespace DB
{
@@ -10,11 +13,192 @@ namespace ErrorCodes
    extern const int LOGICAL_ERROR;
}

MergingAggregatedTransform::MergingAggregatedTransform(
    Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_)
    : IAccumulatingTransform(std::move(header_), params_->getHeader())
    , params(std::move(params_)), max_threads(max_threads_)
Block MergingAggregatedTransform::appendGroupingIfNeeded(const Block & in_header, Block out_header)
{
    /// __grouping_set is neither GROUP BY key nor an aggregate function.
    /// It behaves like a GROUP BY key, but we cannot append it to keys
    /// because it changes hashing method and buckets for two level aggregation.
    /// Now, this column is processed "manually" by merging each group separately.
    if (in_header.has("__grouping_set"))
        out_header.insert(0, in_header.getByName("__grouping_set"));

    return out_header;
}

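As a rough illustration (simplified from the new tests below), for a distributed query such as the following the initiator receives pre-aggregated rows tagged with a UInt64 __grouping_set value identifying which grouping set produced them (used as a 0-based index into the per-set state in this merging path); the function above only carries that column through to the merged header instead of treating it as a GROUP BY key:

    SELECT count(), toString(number) AS k
    FROM remote('127.0.0.{1,2}', numbers(10))
    GROUP BY GROUPING SETS ((k), (number + 1, k))
    ORDER BY k
    SETTINGS group_by_two_level_threshold = 9;
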
/// We should keep the order of GROUPING SET keys.
/// The initiator creates a separate Aggregator for every group, so we should do the same here.
/// Otherwise, two-level aggregation will split the data into different buckets,
/// and the result may contain duplicate rows.
static ActionsDAG makeReorderingActions(const Block & in_header, const GroupingSetsParams & params)
{
    ActionsDAG reordering(in_header.getColumnsWithTypeAndName());
    auto & outputs = reordering.getOutputs();
    ActionsDAG::NodeRawConstPtrs new_outputs;
    new_outputs.reserve(in_header.columns() + params.used_keys.size() - params.used_keys.size());

    std::unordered_map<std::string_view, size_t> index;
    for (size_t pos = 0; pos < outputs.size(); ++pos)
        index.emplace(outputs[pos]->result_name, pos);

    for (const auto & used_name : params.used_keys)
    {
        auto & idx = index[used_name];
        new_outputs.push_back(outputs[idx]);
    }

    for (const auto & used_name : params.used_keys)
        index[used_name] = outputs.size();
    for (const auto & missing_name : params.missing_keys)
        index[missing_name] = outputs.size();

    for (const auto * output : outputs)
    {
        if (index[output->result_name] != outputs.size())
            new_outputs.push_back(output);
    }

    outputs.swap(new_outputs);
    return reordering;
}

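A sketch of the failure mode this reordering guards against, under the assumptions stated in the comment above: if blocks belonging to different grouping sets were merged by one Aggregator with a single key order, two-level aggregation could place the same key into different buckets and the merged result could contain duplicate rows for a key. With a separate per-group Aggregator, a query like the following (a simplified variant of the new tests) returns each key exactly once per grouping set:

    SELECT count(), toString(number) AS k
    FROM remote('127.0.0.{3,2}', numbers(10))
    GROUP BY GROUPING SETS ((k), (k, k))
    ORDER BY k
    SETTINGS group_by_two_level_threshold = 9;
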
MergingAggregatedTransform::~MergingAggregatedTransform() = default;

MergingAggregatedTransform::MergingAggregatedTransform(
    Block header_,
    Aggregator::Params params,
    bool final,
    GroupingSetsParamsList grouping_sets_params,
    size_t max_threads_)
    : IAccumulatingTransform(header_, appendGroupingIfNeeded(header_, params.getHeader(header_, final)))
    , max_threads(max_threads_)
{
    if (!grouping_sets_params.empty())
    {
        if (!header_.has("__grouping_set"))
            throw Exception(ErrorCodes::LOGICAL_ERROR,
                "Cannot find __grouping_set column in header of MergingAggregatedTransform with grouping sets."
                "Header {}", header_.dumpStructure());

        auto in_header = header_;
        in_header.erase(header_.getPositionByName("__grouping_set"));
        auto out_header = params.getHeader(header_, final);

        grouping_sets.reserve(grouping_sets_params.size());
        for (const auto & grouping_set_params : grouping_sets_params)
        {
            size_t group = grouping_sets.size();

            auto reordering = makeReorderingActions(in_header, grouping_set_params);

            Aggregator::Params set_params(grouping_set_params.used_keys,
                params.aggregates,
                params.overflow_row,
                params.max_threads,
                params.max_block_size,
                params.min_hit_rate_to_use_consecutive_keys_optimization);

            auto transform_params = std::make_shared<AggregatingTransformParams>(reordering.updateHeader(in_header), std::move(set_params), final);

            auto creating = AggregatingStep::makeCreatingMissingKeysForGroupingSetDAG(
                transform_params->getHeader(),
                out_header,
                grouping_sets_params, group, false);

            auto & groupiung_set = grouping_sets.emplace_back();
            groupiung_set.reordering_key_columns_actions = std::make_shared<ExpressionActions>(std::move(reordering));
            groupiung_set.creating_missing_keys_actions = std::make_shared<ExpressionActions>(std::move(creating));
            groupiung_set.params = std::move(transform_params);
        }
    }
    else
    {
        auto & groupiung_set = grouping_sets.emplace_back();
        groupiung_set.params = std::make_shared<AggregatingTransformParams>(header_, std::move(params), final);
    }
}

void MergingAggregatedTransform::addBlock(Block block)
{
    if (grouping_sets.size() == 1)
    {
        auto bucket = block.info.bucket_num;
        if (grouping_sets[0].reordering_key_columns_actions)
            grouping_sets[0].reordering_key_columns_actions->execute(block);
        grouping_sets[0].bucket_to_blocks[bucket].emplace_back(std::move(block));
        return;
    }

    auto grouping_position = block.getPositionByName("__grouping_set");
    auto grouping_column = block.getByPosition(grouping_position).column;
    block.erase(grouping_position);

    /// Split a block by __grouping_set values.

    const auto * grouping_column_typed = typeid_cast<const ColumnUInt64 *>(grouping_column.get());
    if (!grouping_column_typed)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected UInt64 column for __grouping_set, got {}", grouping_column->getName());

    IColumn::Selector selector;

    const auto & grouping_data = grouping_column_typed->getData();
    size_t num_rows = grouping_data.size();
    UInt64 last_group = grouping_data[0];
    UInt64 max_group = last_group;
    for (size_t row = 1; row < num_rows; ++row)
    {
        auto group = grouping_data[row];

        /// Optimization for equal ranges.
        if (last_group == group)
            continue;

        /// Optimization for single group.
        if (selector.empty())
            selector.reserve(num_rows);

        /// Fill the last equal range.
        selector.resize_fill(row, last_group);
        last_group = group;
        max_group = std::max(last_group, max_group);
    }

    if (max_group >= grouping_sets.size())
        throw Exception(ErrorCodes::LOGICAL_ERROR,
            "Invalid group number {}. Number of groups {}.", last_group, grouping_sets.size());

    /// Optimization for single group.
    if (selector.empty())
    {
        auto bucket = block.info.bucket_num;
        grouping_sets[last_group].reordering_key_columns_actions->execute(block);
        grouping_sets[last_group].bucket_to_blocks[bucket].emplace_back(std::move(block));
        return;
    }

    /// Fill the last equal range.
    selector.resize_fill(num_rows, last_group);

    const size_t num_groups = max_group + 1;
    Blocks splitted_blocks(num_groups);

    for (size_t group_id = 0; group_id < num_groups; ++group_id)
        splitted_blocks[group_id] = block.cloneEmpty();

    size_t columns_in_block = block.columns();
    for (size_t col_idx_in_block = 0; col_idx_in_block < columns_in_block; ++col_idx_in_block)
    {
        MutableColumns splitted_columns = block.getByPosition(col_idx_in_block).column->scatter(num_groups, selector);
        for (size_t group_id = 0; group_id < num_groups; ++group_id)
            splitted_blocks[group_id].getByPosition(col_idx_in_block).column = std::move(splitted_columns[group_id]);
    }

    for (size_t group = 0; group < num_groups; ++group)
    {
        auto & splitted_block = splitted_blocks[group];
        splitted_block.info = block.info;
        grouping_sets[group].reordering_key_columns_actions->execute(splitted_block);
        grouping_sets[group].bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(splitted_block));
    }
}

void MergingAggregatedTransform::consume(Chunk chunk)
@@ -46,7 +230,7 @@ void MergingAggregatedTransform::consume(Chunk chunk)
        block.info.is_overflows = agg_info->is_overflows;
        block.info.bucket_num = agg_info->bucket_num;

        bucket_to_blocks[agg_info->bucket_num].emplace_back(std::move(block));
        addBlock(std::move(block));
    }
    else if (chunk.getChunkInfos().get<ChunkInfoWithAllocatedBytes>())
    {
@@ -54,7 +238,7 @@ void MergingAggregatedTransform::consume(Chunk chunk)
        block.info.is_overflows = false;
        block.info.bucket_num = -1;

        bucket_to_blocks[block.info.bucket_num].emplace_back(std::move(block));
        addBlock(std::move(block));
    }
    else
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk should have AggregatedChunkInfo in MergingAggregatedTransform.");
@@ -70,9 +254,23 @@ Chunk MergingAggregatedTransform::generate()
        /// Exception safety. Make iterator valid in case any method below throws.
        next_block = blocks.begin();

        /// TODO: this operation can be made async. Add async for IAccumulatingTransform.
        params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads, is_cancelled);
        blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads);
        for (auto & grouping_set : grouping_sets)
        {
            auto & params = grouping_set.params;
            auto & bucket_to_blocks = grouping_set.bucket_to_blocks;
            AggregatedDataVariants data_variants;

            /// TODO: this operation can be made async. Add async for IAccumulatingTransform.
            params->aggregator.mergeBlocks(std::move(bucket_to_blocks), data_variants, max_threads, is_cancelled);
            auto merged_blocks = params->aggregator.convertToBlocks(data_variants, params->final, max_threads);

            if (grouping_set.creating_missing_keys_actions)
                for (auto & block : merged_blocks)
                    grouping_set.creating_missing_keys_actions->execute(block);

            blocks.splice(blocks.end(), std::move(merged_blocks));
        }

        next_block = blocks.begin();
    }

@@ -6,26 +6,46 @@
namespace DB
{

class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;

/** A pre-aggregate stream of blocks in which each block is already aggregated.
  * Aggregate functions in blocks should not be finalized so that their states can be merged.
  */
class MergingAggregatedTransform : public IAccumulatingTransform
{
public:
    MergingAggregatedTransform(Block header_, AggregatingTransformParamsPtr params_, size_t max_threads_);
    MergingAggregatedTransform(
        Block header_,
        Aggregator::Params params_,
        bool final_,
        GroupingSetsParamsList grouping_sets_params,
        size_t max_threads_);

    ~MergingAggregatedTransform() override;

    String getName() const override { return "MergingAggregatedTransform"; }

    static Block appendGroupingIfNeeded(const Block & in_header, Block out_header);

protected:
    void consume(Chunk chunk) override;
    Chunk generate() override;

private:
    AggregatingTransformParamsPtr params;
    LoggerPtr log = getLogger("MergingAggregatedTransform");
    size_t max_threads;

    AggregatedDataVariants data_variants;
    Aggregator::BucketToBlocks bucket_to_blocks;
    struct GroupingSet
    {
        Aggregator::BucketToBlocks bucket_to_blocks;
        ExpressionActionsPtr reordering_key_columns_actions;
        ExpressionActionsPtr creating_missing_keys_actions;
        AggregatingTransformParamsPtr params;
    };

    using GroupingSets = std::vector<GroupingSet>;
    GroupingSets grouping_sets;

    UInt64 total_input_rows = 0;
    UInt64 total_input_blocks = 0;
@@ -35,6 +55,8 @@ private:

    bool consume_started = false;
    bool generate_started = false;

    void addBlock(Block block);
};

}
@@ -11,3 +11,215 @@
0 6 4
1 10 4
2 14 4
-- { echoOn }

SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 ['.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 ['.']
1 ['.']
2 ['.','.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
2 2
2 3
2 4
2 5
2 6
2 7
2 8
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
1 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
1 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
1 1
1 1
1 1
2 2
2 2
2 2
2 3
2 3
2 3
2 4
2 4
2 4
2 5
2 5
2 5
2 6
2 6
2 6
2 7
2 7
2 7
2 8
2 8
2 8
2 9
2 9
2 9
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 ['.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 ['.']
2 ['.']
2 ['.','.']
2 ['.','.']
2 ['.','.','.']
2 ['.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
2 ['.','.','.','.','.','.','.','.','.']
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 2
2 3
2 4
2 5
2 6
2 7
2 8
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 1
2 2
2 2
2 3
2 3
2 4
2 4
2 5
2 5
2 6
2 6
2 7
2 7
2 8
2 8
2 9
2 9
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
2 1
2 1
2 1
2 2
2 2
2 2
2 3
2 3
2 3
2 4
2 4
2 4
2 5
2 5
2 5
2 6
2 6
2 6
2 7
2 7
2 7
2 8
2 8
2 8
2 9
2 9
2 9
@@ -43,3 +43,23 @@ GROUP BY
ORDER BY
    sum_value ASC,
    count_value ASC;

set prefer_localhost_replica = 1;

-- { echoOn }

SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;

SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{1,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;

SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), arrayMap(x -> '.', range(number % 10)) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;

SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (k, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;
SELECT count(), toString(number) AS k FROM remote('127.0.0.{3,2}', numbers(10)) where number > ( queryID() = initialQueryID()) GROUP BY GROUPING SETS ((k), (number + 1, k), (k, number + 2)) ORDER BY k settings group_by_two_level_threshold=9, max_bytes_before_external_group_by=10000000000;