Merge pull request #53549 from Avogar/group-by-constant-keys

Optimize group by constant keys
This commit is contained in:
Kruglov Pavel 2023-09-18 12:12:40 +02:00 committed by GitHub
commit 9c888ea42b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 176 additions and 32 deletions

View File

@ -663,6 +663,7 @@ class IColumn;
M(SetOperationMode, except_default_mode, SetOperationMode::ALL, "Set default mode in EXCEPT query. Possible values: empty string, 'ALL', 'DISTINCT'. If empty, query without mode will throw exception.", 0) \
M(Bool, optimize_aggregators_of_group_by_keys, true, "Eliminates min/max/any/anyLast aggregators of GROUP BY keys in SELECT section", 0) \
M(Bool, optimize_group_by_function_keys, true, "Eliminates functions of other keys in GROUP BY section", 0) \
M(Bool, optimize_group_by_constant_keys, true, "Optimize GROUP BY when all keys in block are constant", 0) \
M(Bool, legacy_column_name_of_tuple_literal, false, "List all names of element of large tuple literals in their column names instead of hash. This settings exists only for compatibility reasons. It makes sense to set to 'true', while doing rolling update of cluster from version lower than 21.7 to higher.", 0) \
\
M(Bool, query_plan_enable_optimizations, true, "Apply optimizations to query plan", 0) \

View File

@ -80,6 +80,7 @@ namespace SettingsChangesHistory
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
{
{"23.9", {{"optimize_group_by_constant_keys", false, true, "Optimize group by constant keys by default"}}},
{"23.8", {{"rewrite_count_distinct_if_with_count_distinct_implementation", false, true, "Rewrite countDistinctIf with count_distinct_implementation configuration"}}},
{"23.7", {{"function_sleep_max_microseconds_per_block", 0, 3000000, "In previous versions, the maximum sleep time of 3 seconds was applied only for `sleep`, but not for `sleepEachRow` function. In the new version, we introduce this setting. If you set compatibility with the previous versions, we will disable the limit altogether."}}},
{"23.6", {{"http_send_timeout", 180, 30, "3 minutes seems crazy long. Note that this is timeout for a single network write call, not for the whole upload operation."},

View File

@ -1035,11 +1035,12 @@ void Aggregator::executeImpl(
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys,
bool all_keys_are_const,
AggregateDataPtr overflow_row) const
{
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
executeImpl(*result.NAME, result.aggregates_pool, row_begin, row_end, key_columns, aggregate_instructions, no_more_keys, overflow_row);
executeImpl(*result.NAME, result.aggregates_pool, row_begin, row_end, key_columns, aggregate_instructions, no_more_keys, all_keys_are_const, overflow_row);
if (false) {} // NOLINT
APPLY_FOR_AGGREGATED_VARIANTS(M)
@ -1059,6 +1060,7 @@ void NO_INLINE Aggregator::executeImpl(
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys,
bool all_keys_are_const,
AggregateDataPtr overflow_row) const
{
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
@ -1074,25 +1076,25 @@ void NO_INLINE Aggregator::executeImpl(
{
if (prefetch)
executeImplBatch<false, true, true>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
else
executeImplBatch<false, true, false>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
}
else
#endif
{
if (prefetch)
executeImplBatch<false, false, true>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
else
executeImplBatch<false, false, false>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
}
}
else
{
executeImplBatch<true, false, false>(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, overflow_row);
executeImplBatch<true, false, false>(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
}
}
@ -1104,6 +1106,7 @@ void NO_INLINE Aggregator::executeImplBatch(
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
bool all_keys_are_const,
AggregateDataPtr overflow_row) const
{
using KeyHolder = decltype(state.getKeyHolder(0, std::declval<Arena &>()));
@ -1120,21 +1123,28 @@ void NO_INLINE Aggregator::executeImplBatch(
/// For all rows.
AggregateDataPtr place = aggregates_pool->alloc(0);
for (size_t i = row_begin; i < row_end; ++i)
if (all_keys_are_const)
{
if constexpr (prefetch && HasPrefetchMemberFunc<decltype(method.data), KeyHolder>)
state.emplaceKey(method.data, 0, *aggregates_pool).setMapped(place);
}
else
{
for (size_t i = row_begin; i < row_end; ++i)
{
if (i == row_begin + prefetching.iterationsToMeasure())
prefetch_look_ahead = prefetching.calcPrefetchLookAhead();
if (i + prefetch_look_ahead < row_end)
if constexpr (prefetch && HasPrefetchMemberFunc<decltype(method.data), KeyHolder>)
{
auto && key_holder = state.getKeyHolder(i + prefetch_look_ahead, *aggregates_pool);
method.data.prefetch(std::move(key_holder));
}
}
if (i == row_begin + prefetching.iterationsToMeasure())
prefetch_look_ahead = prefetching.calcPrefetchLookAhead();
state.emplaceKey(method.data, i, *aggregates_pool).setMapped(place);
if (i + prefetch_look_ahead < row_end)
{
auto && key_holder = state.getKeyHolder(i + prefetch_look_ahead, *aggregates_pool);
method.data.prefetch(std::move(key_holder));
}
}
state.emplaceKey(method.data, i, *aggregates_pool).setMapped(place);
}
}
return;
}
@ -1153,7 +1163,7 @@ void NO_INLINE Aggregator::executeImplBatch(
}
}
if (!has_arrays && !hasSparseArguments(aggregate_instructions))
if (!has_arrays && !hasSparseArguments(aggregate_instructions) && !all_keys_are_const)
{
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
@ -1179,10 +1189,23 @@ void NO_INLINE Aggregator::executeImplBatch(
/// - this affects only optimize_aggregation_in_order,
/// - this is just a pointer, so it should not be significant,
/// - and plus this will require other changes in the interface.
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[row_end]);
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[all_keys_are_const ? 1 : row_end]);
/// For all rows.
for (size_t i = row_begin; i < row_end; ++i)
size_t start, end;
/// If all keys are const, key columns contain only 1 row.
if (all_keys_are_const)
{
start = 0;
end = 1;
}
else
{
start = row_begin;
end = row_end;
}
for (size_t i = start; i < end; ++i)
{
AggregateDataPtr aggregate_data = nullptr;
@ -1253,9 +1276,13 @@ void NO_INLINE Aggregator::executeImplBatch(
/// Add only if the key already exists.
auto find_result = state.findKey(method.data, i, *aggregates_pool);
if (find_result.isFound())
{
aggregate_data = find_result.getMapped();
}
else
{
aggregate_data = overflow_row;
}
}
places[i] = aggregate_data;
@ -1278,8 +1305,16 @@ void NO_INLINE Aggregator::executeImplBatch(
columns_data.emplace_back(getColumnData(inst->batch_arguments[argument_index]));
}
auto add_into_aggregate_states_function = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function;
add_into_aggregate_states_function(row_begin, row_end, columns_data.data(), places.get());
if (all_keys_are_const)
{
auto add_into_aggregate_states_function_single_place = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function_single_place;
add_into_aggregate_states_function_single_place(row_begin, row_end, columns_data.data(), places[0]);
}
else
{
auto add_into_aggregate_states_function = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function;
add_into_aggregate_states_function(row_begin, row_end, columns_data.data(), places.get());
}
}
#endif
@ -1294,12 +1329,24 @@ void NO_INLINE Aggregator::executeImplBatch(
AggregateFunctionInstruction * inst = aggregate_instructions + i;
if (inst->offsets)
inst->batch_that->addBatchArray(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
else if (inst->has_sparse_arguments)
inst->batch_that->addBatchSparse(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
if (all_keys_are_const)
{
if (inst->offsets)
inst->batch_that->addBatchSinglePlace(inst->offsets[static_cast<ssize_t>(row_begin) - 1], inst->offsets[row_end - 1], places[0] + inst->state_offset, inst->batch_arguments, aggregates_pool);
else if (inst->has_sparse_arguments)
inst->batch_that->addBatchSparseSinglePlace(row_begin, row_end, places[0] + inst->state_offset, inst->batch_arguments, aggregates_pool);
else
inst->batch_that->addBatchSinglePlace(row_begin, row_end, places[0] + inst->state_offset, inst->batch_arguments, aggregates_pool);
}
else
inst->batch_that->addBatch(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
{
if (inst->offsets)
inst->batch_that->addBatchArray(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
else if (inst->has_sparse_arguments)
inst->batch_that->addBatchSparse(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
else
inst->batch_that->addBatch(row_begin, row_end, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
}
}
}
@ -1539,12 +1586,27 @@ bool Aggregator::executeOnBlock(Columns columns,
* To make them work anyway, we materialize them.
*/
Columns materialized_columns;
bool all_keys_are_const = false;
if (params.optimize_group_by_constant_keys)
{
all_keys_are_const = true;
for (size_t i = 0; i < params.keys_size; ++i)
all_keys_are_const &= isColumnConst(*columns.at(keys_positions[i]));
}
/// Remember the columns we will work with
for (size_t i = 0; i < params.keys_size; ++i)
{
materialized_columns.push_back(recursiveRemoveSparse(columns.at(keys_positions[i]))->convertToFullColumnIfConst());
key_columns[i] = materialized_columns.back().get();
if (all_keys_are_const)
{
key_columns[i] = assert_cast<const ColumnConst &>(*columns.at(keys_positions[i])).getDataColumnPtr().get();
}
else
{
materialized_columns.push_back(recursiveRemoveSparse(columns.at(keys_positions[i]))->convertToFullColumnIfConst());
key_columns[i] = materialized_columns.back().get();
}
if (!result.isLowCardinality())
{
@ -1589,7 +1651,7 @@ bool Aggregator::executeOnBlock(Columns columns,
{
/// This is where data is written that does not fit in `max_rows_to_group_by` with `group_by_overflow_mode = any`.
AggregateDataPtr overflow_row_ptr = params.overflow_row ? result.without_key : nullptr;
executeImpl(result, row_begin, row_end, key_columns, aggregate_functions_instructions.data(), no_more_keys, overflow_row_ptr);
executeImpl(result, row_begin, row_end, key_columns, aggregate_functions_instructions.data(), no_more_keys, all_keys_are_const, overflow_row_ptr);
}
size_t result_size = result.sizeWithoutOverflowRow();

View File

@ -1023,6 +1023,8 @@ public:
bool enable_prefetch;
bool optimize_group_by_constant_keys;
struct StatsCollectingParams
{
StatsCollectingParams();
@ -1060,6 +1062,7 @@ public:
size_t max_block_size_,
bool enable_prefetch_,
bool only_merge_, // true for projections
bool optimize_group_by_constant_keys_,
const StatsCollectingParams & stats_collecting_params_ = {})
: keys(keys_)
, aggregates(aggregates_)
@ -1080,6 +1083,7 @@ public:
, max_block_size(max_block_size_)
, only_merge(only_merge_)
, enable_prefetch(enable_prefetch_)
, optimize_group_by_constant_keys(optimize_group_by_constant_keys_)
, stats_collecting_params(stats_collecting_params_)
{
}
@ -1280,6 +1284,7 @@ private:
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys = false,
bool all_keys_are_const = false,
AggregateDataPtr overflow_row = nullptr) const;
/// Process one data block, aggregate the data into a hash table.
@ -1292,6 +1297,7 @@ private:
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions,
bool no_more_keys,
bool all_keys_are_const,
AggregateDataPtr overflow_row) const;
/// Specialization for a particular value no_more_keys.
@ -1303,6 +1309,7 @@ private:
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
bool all_keys_are_const,
AggregateDataPtr overflow_row) const;
/// For case when there are no keys (all aggregate into one row).

View File

@ -2574,6 +2574,7 @@ static Aggregator::Params getAggregatorParams(
settings.max_block_size,
settings.enable_software_prefetch_in_aggregation,
/* only_merge */ false,
settings.optimize_group_by_constant_keys,
stats_collecting_params
};
}

View File

@ -290,6 +290,7 @@ Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context
settings.max_block_size,
settings.enable_software_prefetch_in_aggregation,
/* only_merge */ false,
settings.optimize_group_by_constant_keys,
stats_collecting_params);
return aggregator_params;

View File

@ -230,6 +230,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
transform_params->params.max_block_size,
transform_params->params.enable_prefetch,
/* only_merge */ false,
transform_params->params.optimize_group_by_constant_keys,
transform_params->params.stats_collecting_params};
auto transform_params_for_set = std::make_shared<AggregatingTransformParams>(src_header, std::move(params_for_set), final);

View File

@ -41,7 +41,8 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm(
settings.min_count_to_compile_aggregate_expression,
settings.max_block_size,
settings.enable_software_prefetch_in_aggregation,
false /* only_merge */);
false /* only_merge */,
settings.optimize_group_by_constant_keys);
aggregator = std::make_unique<Aggregator>(header, params);

View File

@ -329,7 +329,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
settings.min_count_to_compile_aggregate_expression,
settings.max_block_size,
settings.enable_software_prefetch_in_aggregation,
only_merge);
only_merge,
settings.optimize_group_by_constant_keys);
return std::make_pair(params, only_merge);
};

View File

@ -0,0 +1,40 @@
10000000 1 2 3
10000000 1 2 3
10000000 1 2 3
10000000 1 2 3
10
10
10
10
10
10
10
10
10
10
10
10
10 data.1.JSON
10 data.2.JSON
10 data.JSON
10 data.1.JSON
10 data.2.JSON
10 data.JSON
10 data.1.JSON
10 data.2.JSON
10 data.JSON
10 data.1.JSON
10 data.2.JSON
10 data.JSON
10
10
10
10
10
10
10
10
10
10
10
10

View File

@ -0,0 +1,28 @@
select count(number), 1 AS k1, 2 as k2, 3 as k3 from numbers_mt(10000000) group by k1, k2, k3 settings optimize_group_by_constant_keys=1, enable_software_prefetch_in_aggregation=0, compile_aggregate_expressions=0;
select count(number), 1 AS k1, 2 as k2, 3 as k3 from numbers_mt(10000000) group by k1, k2, k3 settings optimize_group_by_constant_keys=1, enable_software_prefetch_in_aggregation=1, compile_aggregate_expressions = 0;
select count(number), 1 AS k1, 2 as k2, 3 as k3 from numbers_mt(10000000) group by k1, k2, k3 settings optimize_group_by_constant_keys=1, enable_software_prefetch_in_aggregation=0, compile_aggregate_expressions = 1;
select count(number), 1 AS k1, 2 as k2, 3 as k3 from numbers_mt(10000000) group by k1, k2, k3 settings optimize_group_by_constant_keys=1, enable_software_prefetch_in_aggregation=1, compile_aggregate_expressions = 1;
drop table if exists test;
create table test (x UInt64) engine=File(JSON);
set engine_file_allow_create_multiple_files = 1;
insert into test select * from numbers(10);
insert into test select * from numbers(10);
insert into test select * from numbers(10);
select count() from test group by _file order by _file settings optimize_group_by_constant_keys=1, enable_software_prefetch_in_aggregation=0, compile_aggregate_expressions=0;
select count() from test group by _file order by _file settings optimize_group_by_constant_keys=1, enable_software_prefetch_in_aggregation=1, compile_aggregate_expressions=0;
select count() from test group by _file order by _file settings optimize_group_by_constant_keys=1, enable_software_prefetch_in_aggregation=0, compile_aggregate_expressions=1;
select count() from test group by _file order by _file settings optimize_group_by_constant_keys=1, enable_software_prefetch_in_aggregation=1, compile_aggregate_expressions=1;
select count(), _file from test group by _file order by _file settings optimize_group_by_constant_keys=1, enable_software_prefetch_in_aggregation=0, compile_aggregate_expressions=0;
select count(), _file from test group by _file order by _file settings optimize_group_by_constant_keys=1, enable_software_prefetch_in_aggregation=1, compile_aggregate_expressions=0;
select count(), _file from test group by _file order by _file settings optimize_group_by_constant_keys=1, enable_software_prefetch_in_aggregation=0, compile_aggregate_expressions=1;
select count(), _file from test group by _file order by _file settings optimize_group_by_constant_keys=1, enable_software_prefetch_in_aggregation=1, compile_aggregate_expressions=1;
select count() from test group by _file, _path order by _file, _path settings optimize_group_by_constant_keys=1, enable_software_prefetch_in_aggregation=0, compile_aggregate_expressions=0;
select count() from test group by _file, _path order by _file, _path settings optimize_group_by_constant_keys=1, enable_software_prefetch_in_aggregation=1, compile_aggregate_expressions=0;
select count() from test group by _file, _path order by _file, _path settings optimize_group_by_constant_keys=1, enable_software_prefetch_in_aggregation=0, compile_aggregate_expressions=1;
select count() from test group by _file, _path order by _file, _path settings optimize_group_by_constant_keys=1, enable_software_prefetch_in_aggregation=1, compile_aggregate_expressions=1;
drop table test;