Merge pull request #37469 from azat/projections-optimize_aggregation_in_order

Implement in-order aggregation (optimize_aggregation_in_order) for projections, for tables whose projections are fully materialized
Nikolai Kochetov 2022-06-21 12:17:35 +02:00 committed by GitHub
commit b8d27aa8dd
16 changed files with 589 additions and 182 deletions
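
For context, a minimal sketch of the scenario this change targets, adapted from the new 01710 tests added below (the table name here is illustrative; the tests use in_order_agg_01710):

CREATE TABLE in_order_agg
(
    k1 UInt32,
    k2 UInt32,
    k3 UInt32,
    value UInt32,
    -- Fully materialized aggregate projection over the GROUP BY keys.
    PROJECTION aaaa
    (
        SELECT k1, k2, k3, sum(value)
        GROUP BY k1, k2, k3
    )
)
ENGINE = MergeTree
ORDER BY tuple();

INSERT INTO in_order_agg SELECT 1, number % 2, number % 4, number FROM numbers(100000);

-- With this change, reading from the projection can aggregate in order of the GROUP BY keys,
-- so the query below is expected to use AggregatingInOrderTransform instead of AggregatingTransform.
SELECT k1, k2, k3, sum(value) AS v
FROM in_order_agg
GROUP BY k1, k2, k3
ORDER BY k1, k2, k3
SETTINGS allow_experimental_projection_optimization = 1, optimize_aggregation_in_order = 1;

The second new test covers partial projections (only some parts have the projection); its reference output shows that plain AggregatingTransform is still used in that case.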


@ -408,6 +408,29 @@ Block Aggregator::Params::getHeader(
return materializeBlock(res);
}
ColumnRawPtrs Aggregator::Params::makeRawKeyColumns(const Block & block) const
{
ColumnRawPtrs key_columns(keys_size);
for (size_t i = 0; i < keys_size; ++i)
key_columns[i] = block.safeGetByPosition(i).column.get();
return key_columns;
}
Aggregator::AggregateColumnsConstData Aggregator::Params::makeAggregateColumnsData(const Block & block) const
{
AggregateColumnsConstData aggregate_columns(aggregates_size);
for (size_t i = 0; i < aggregates_size; ++i)
{
const auto & aggregate_column_name = aggregates[i].column_name;
aggregate_columns[i] = &typeid_cast<const ColumnAggregateFunction &>(*block.getByName(aggregate_column_name).column).getData();
}
return aggregate_columns;
}
void Aggregator::Params::explain(WriteBuffer & out, size_t indent) const
{
Strings res;
@ -865,6 +888,38 @@ void Aggregator::executeOnBlockSmall(
executeImpl(result, row_begin, row_end, key_columns, aggregate_instructions);
}
void Aggregator::mergeOnBlockSmall(
AggregatedDataVariants & result,
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data,
const ColumnRawPtrs & key_columns) const
{
/// `result` will destroy the states of aggregate functions in the destructor
result.aggregator = this;
/// How to perform the aggregation?
if (result.empty())
{
initDataVariantsWithSizeHint(result, method_chosen, params);
result.keys_size = params.keys_size;
result.key_sizes = key_sizes;
}
if (false) {} // NOLINT
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(result.aggregates_pool, *result.NAME, result.NAME->data, \
result.without_key, /* no_more_keys= */ false, \
row_begin, row_end, \
aggregate_columns_data, key_columns);
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
}
void Aggregator::executeImpl(
AggregatedDataVariants & result,
size_t row_begin,
@ -1181,8 +1236,7 @@ void NO_INLINE Aggregator::executeOnIntervalWithoutKeyImpl(
AggregatedDataVariants & data_variants,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const
AggregateFunctionInstruction * aggregate_instructions) const
{
/// `data_variants` will destroy the states of aggregate functions in the destructor
data_variants.aggregator = this;
@ -1198,17 +1252,30 @@ void NO_INLINE Aggregator::executeOnIntervalWithoutKeyImpl(
inst->offsets[static_cast<ssize_t>(row_begin) - 1],
inst->offsets[row_end - 1],
res + inst->state_offset,
inst->batch_arguments, arena);
inst->batch_arguments, data_variants.aggregates_pool);
else
inst->batch_that->addBatchSinglePlaceFromInterval(
row_begin,
row_end,
res + inst->state_offset,
inst->batch_arguments,
arena);
data_variants.aggregates_pool);
}
}
void NO_INLINE Aggregator::mergeOnIntervalWithoutKeyImpl(
AggregatedDataVariants & data_variants,
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data) const
{
/// `data_variants` will destroy the states of aggregate functions in the destructor
data_variants.aggregator = this;
data_variants.init(AggregatedDataVariants::Type::without_key);
mergeWithoutKeyStreamsImpl(data_variants, row_begin, row_end, aggregate_columns_data);
}
void Aggregator::prepareAggregateInstructions(Columns columns, AggregateColumns & aggregate_columns, Columns & materialized_columns,
AggregateFunctionInstructions & aggregate_functions_instructions, NestedColumnsHolder & nested_columns_holder) const
@ -2569,33 +2636,20 @@ ManyAggregatedDataVariants Aggregator::prepareVariantsToMerge(ManyAggregatedData
template <bool no_more_keys, typename Method, typename Table>
void NO_INLINE Aggregator::mergeStreamsImplCase(
Block & block,
Arena * aggregates_pool,
Method & method [[maybe_unused]],
Table & data,
AggregateDataPtr overflow_row) const
AggregateDataPtr overflow_row,
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data,
const ColumnRawPtrs & key_columns) const
{
ColumnRawPtrs key_columns(params.keys_size);
AggregateColumnsConstData aggregate_columns(params.aggregates_size);
/// Remember the columns we will work with
for (size_t i = 0; i < params.keys_size; ++i)
key_columns[i] = block.safeGetByPosition(i).column.get();
for (size_t i = 0; i < params.aggregates_size; ++i)
{
const auto & aggregate_column_name = params.aggregates[i].column_name;
aggregate_columns[i] = &typeid_cast<const ColumnAggregateFunction &>(*block.getByName(aggregate_column_name).column).getData();
}
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
/// For all rows.
size_t rows = block.rows();
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[row_end]);
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[rows]);
for (size_t i = 0; i < rows; ++i)
for (size_t i = row_begin; i < row_end; ++i)
{
AggregateDataPtr aggregate_data = nullptr;
@ -2631,45 +2685,69 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
{
/// Merge state of aggregate functions.
aggregate_functions[j]->mergeBatch(
0, rows,
row_begin, row_end,
places.get(), offsets_of_aggregate_states[j],
aggregate_columns[j]->data(),
aggregate_columns_data[j]->data(),
aggregates_pool);
}
/// Early release memory.
block.clear();
}
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeStreamsImpl(
Block & block,
Block block,
Arena * aggregates_pool,
Method & method,
Table & data,
AggregateDataPtr overflow_row,
bool no_more_keys) const
{
const AggregateColumnsConstData & aggregate_columns_data = params.makeAggregateColumnsData(block);
const ColumnRawPtrs & key_columns = params.makeRawKeyColumns(block);
mergeStreamsImpl<Method, Table>(
aggregates_pool,
method,
data,
overflow_row,
no_more_keys,
0,
block.rows(),
aggregate_columns_data,
key_columns);
}
template <typename Method, typename Table>
void NO_INLINE Aggregator::mergeStreamsImpl(
Arena * aggregates_pool,
Method & method,
Table & data,
AggregateDataPtr overflow_row,
bool no_more_keys,
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data,
const ColumnRawPtrs & key_columns) const
{
if (!no_more_keys)
mergeStreamsImplCase<false>(block, aggregates_pool, method, data, overflow_row);
mergeStreamsImplCase<false>(aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns);
else
mergeStreamsImplCase<true>(block, aggregates_pool, method, data, overflow_row);
mergeStreamsImplCase<true>(aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns);
}
void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
Block & block,
void NO_INLINE Aggregator::mergeBlockWithoutKeyStreamsImpl(
Block block,
AggregatedDataVariants & result) const
{
AggregateColumnsConstData aggregate_columns(params.aggregates_size);
/// Remember the columns we will work with
for (size_t i = 0; i < params.aggregates_size; ++i)
{
const auto & aggregate_column_name = params.aggregates[i].column_name;
aggregate_columns[i] = &typeid_cast<const ColumnAggregateFunction &>(*block.getByName(aggregate_column_name).column).getData();
}
AggregateColumnsConstData aggregate_columns = params.makeAggregateColumnsData(block);
mergeWithoutKeyStreamsImpl(result, 0, block.rows(), aggregate_columns);
}
void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
AggregatedDataVariants & result,
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data) const
{
AggregatedDataWithoutKey & res = result.without_key;
if (!res)
{
@ -2678,17 +2756,15 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
res = place;
}
for (size_t row = 0, rows = block.rows(); row < rows; ++row)
for (size_t row = row_begin; row < row_end; ++row)
{
/// Adding Values
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(res + offsets_of_aggregate_states[i], (*aggregate_columns[i])[row], result.aggregates_pool);
aggregate_functions[i]->merge(res + offsets_of_aggregate_states[i], (*aggregate_columns_data[i])[row], result.aggregates_pool);
}
/// Early release memory.
block.clear();
}
bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool & no_more_keys) const
{
/// `result` will destroy the states of aggregate functions in the destructor
@ -2704,11 +2780,10 @@ bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool
}
if (result.type == AggregatedDataVariants::Type::without_key || block.info.is_overflows)
mergeWithoutKeyStreamsImpl(block, result);
mergeBlockWithoutKeyStreamsImpl(std::move(block), result);
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(block, result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
@ -2824,7 +2899,7 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
{
#define M(NAME) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(block, aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);
mergeStreamsImpl(std::move(block), aggregates_pool, *result.NAME, result.NAME->data.impls[bucket], nullptr, false);
if (false) {} // NOLINT
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
@ -2875,11 +2950,11 @@ void Aggregator::mergeBlocks(BucketToBlocks bucket_to_blocks, AggregatedDataVari
break;
if (result.type == AggregatedDataVariants::Type::without_key || block.info.is_overflows)
mergeWithoutKeyStreamsImpl(block, result);
mergeBlockWithoutKeyStreamsImpl(std::move(block), result);
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(block, result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, result.without_key, no_more_keys);
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
@ -2942,11 +3017,11 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
bucket_num = -1;
if (result.type == AggregatedDataVariants::Type::without_key || is_overflows)
mergeWithoutKeyStreamsImpl(block, result);
mergeBlockWithoutKeyStreamsImpl(std::move(block), result);
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(block, result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, false);
mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, false);
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M


@ -890,6 +890,11 @@ class NativeWriter;
class Aggregator final
{
public:
using AggregateColumns = std::vector<ColumnRawPtrs>;
using AggregateColumnsData = std::vector<ColumnAggregateFunction::Container *>;
using AggregateColumnsConstData = std::vector<const ColumnAggregateFunction::Container *>;
using AggregateFunctionsPlainPtrs = std::vector<const IAggregateFunction *>;
struct Params
{
/// Data structure of source blocks.
@ -1015,6 +1020,10 @@ public:
return getHeader(src_header, intermediate_header, keys, aggregates, final);
}
/// Remember the columns we will work with
ColumnRawPtrs makeRawKeyColumns(const Block & block) const;
AggregateColumnsConstData makeAggregateColumnsData(const Block & block) const;
/// Returns keys and aggregates for the EXPLAIN query
void explain(WriteBuffer & out, size_t indent) const;
void explain(JSONBuilder::JSONMap & map) const;
@ -1022,11 +1031,6 @@ public:
explicit Aggregator(const Params & params_);
using AggregateColumns = std::vector<ColumnRawPtrs>;
using AggregateColumnsData = std::vector<ColumnAggregateFunction::Container *>;
using AggregateColumnsConstData = std::vector<const ColumnAggregateFunction::Container *>;
using AggregateFunctionsPlainPtrs = std::vector<const IAggregateFunction *>;
/// Process one block. Return false if the processing should be aborted (with group_by_overflow_mode = 'break').
bool executeOnBlock(const Block & block,
AggregatedDataVariants & result,
@ -1181,6 +1185,14 @@ private:
size_t row_end,
ColumnRawPtrs & key_columns,
AggregateFunctionInstruction * aggregate_instructions) const;
void mergeOnBlockSmall(
AggregatedDataVariants & result,
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data,
const ColumnRawPtrs & key_columns) const;
void mergeOnBlockImpl(Block block, AggregatedDataVariants & result, bool no_more_keys) const;
void executeImpl(
AggregatedDataVariants & result,
@ -1227,8 +1239,12 @@ private:
AggregatedDataVariants & data_variants,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const;
AggregateFunctionInstruction * aggregate_instructions) const;
void mergeOnIntervalWithoutKeyImpl(
AggregatedDataVariants & data_variants,
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data) const;
template <typename Method>
void writeToTemporaryFileImpl(
@ -1338,24 +1354,43 @@ private:
template <bool no_more_keys, typename Method, typename Table>
void mergeStreamsImplCase(
Block & block,
Arena * aggregates_pool,
Method & method,
Table & data,
AggregateDataPtr overflow_row) const;
AggregateDataPtr overflow_row,
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data,
const ColumnRawPtrs & key_columns) const;
template <typename Method, typename Table>
void mergeStreamsImpl(
Block & block,
Block block,
Arena * aggregates_pool,
Method & method,
Table & data,
AggregateDataPtr overflow_row,
bool no_more_keys) const;
template <typename Method, typename Table>
void mergeStreamsImpl(
Arena * aggregates_pool,
Method & method,
Table & data,
AggregateDataPtr overflow_row,
bool no_more_keys,
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data,
const ColumnRawPtrs & key_columns) const;
void mergeWithoutKeyStreamsImpl(
Block & block,
void mergeBlockWithoutKeyStreamsImpl(
Block block,
AggregatedDataVariants & result) const;
void mergeWithoutKeyStreamsImpl(
AggregatedDataVariants & result,
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data) const;
template <typename Method>
void mergeBucketImpl(


@ -1617,7 +1617,10 @@ static void executeMergeAggregatedImpl(
Aggregator::Params params(header_before_merge, keys, aggregates, overflow_row, settings.max_threads);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);
auto transform_params = std::make_shared<AggregatingTransformParams>(
params,
final,
/* only_merge_= */ false);
auto merging_aggregated = std::make_unique<MergingAggregatedStep>(
query_plan.getCurrentDataStream(),
@ -2289,6 +2292,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
std::move(aggregator_params),
std::move(grouping_sets_params),
final,
/* only_merge_= */ false,
settings.max_block_size,
settings.aggregation_in_order_max_block_bytes,
merge_threads,
@ -2362,7 +2366,10 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modific
keys.push_back(header_before_transform.getPositionByName(key.name));
auto params = getAggregatorParams(query_ptr, *query_analyzer, *context, header_before_transform, keys, query_analyzer->aggregates(), false, settings, 0, 0);
auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), true);
auto transform_params = std::make_shared<AggregatingTransformParams>(
std::move(params),
/* final_= */ true,
/* only_merge_= */ false);
QueryPlanStepPtr step;
if (modificator == Modificator::ROLLUP)


@ -69,6 +69,7 @@ AggregatingStep::AggregatingStep(
Aggregator::Params params_,
GroupingSetsParamsList grouping_sets_params_,
bool final_,
bool only_merge_,
size_t max_block_size_,
size_t aggregation_in_order_max_block_bytes_,
size_t merge_threads_,
@ -79,7 +80,8 @@ AggregatingStep::AggregatingStep(
: ITransformingStep(input_stream_, appendGroupingColumn(params_.getHeader(final_), grouping_sets_params_), getTraits(), false)
, params(std::move(params_))
, grouping_sets_params(std::move(grouping_sets_params_))
, final(std::move(final_))
, final(final_)
, only_merge(only_merge_)
, max_block_size(max_block_size_)
, aggregation_in_order_max_block_bytes(aggregation_in_order_max_block_bytes_)
, merge_threads(merge_threads_)
@ -119,7 +121,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
* 1. Parallel aggregation is done, and the results should be merged in parallel.
* 2. An aggregation is done with store of temporary data on the disk, and they need to be merged in a memory efficient way.
*/
auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), final);
auto transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), final, only_merge);
if (!grouping_sets_params.empty())
{
@ -169,7 +171,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
transform_params->params.intermediate_header,
transform_params->params.stats_collecting_params
};
auto transform_params_for_set = std::make_shared<AggregatingTransformParams>(std::move(params_for_set), final);
auto transform_params_for_set = std::make_shared<AggregatingTransformParams>(std::move(params_for_set), final, only_merge);
if (streams > 1)
{


@ -7,9 +7,6 @@
namespace DB
{
struct AggregatingTransformParams;
using AggregatingTransformParamsPtr = std::shared_ptr<AggregatingTransformParams>;
struct GroupingSetsParams
{
GroupingSetsParams() = default;
@ -36,6 +33,7 @@ public:
Aggregator::Params params_,
GroupingSetsParamsList grouping_sets_params_,
bool final_,
bool only_merge_,
size_t max_block_size_,
size_t aggregation_in_order_max_block_bytes_,
size_t merge_threads_,
@ -59,6 +57,7 @@ private:
Aggregator::Params params;
GroupingSetsParamsList grouping_sets_params;
bool final;
bool only_merge;
size_t max_block_size;
size_t aggregation_in_order_max_block_bytes;
size_t merge_threads;


@ -31,6 +31,7 @@ AggregatingInOrderTransform::AggregatingInOrderTransform(
, max_block_size(max_block_size_)
, max_block_bytes(max_block_bytes_)
, params(std::move(params_))
, aggregates_mask(getAggregatesMask(params->getHeader(), params->params.aggregates))
, group_by_info(group_by_info_)
, sort_description(group_by_description_)
, aggregate_columns(params->params.aggregates_size)
@ -66,6 +67,7 @@ static Int64 getCurrentMemoryUsage()
void AggregatingInOrderTransform::consume(Chunk chunk)
{
const Columns & columns = chunk.getColumns();
Int64 initial_memory_usage = getCurrentMemoryUsage();
size_t rows = chunk.getNumRows();
@ -85,7 +87,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
Columns key_columns(params->params.keys_size);
for (size_t i = 0; i < params->params.keys_size; ++i)
{
materialized_columns.push_back(chunk.getColumns().at(params->params.keys[i])->convertToFullColumnIfConst());
materialized_columns.push_back(columns.at(params->params.keys[i])->convertToFullColumnIfConst());
key_columns[i] = materialized_columns.back();
if (group_by_key)
key_columns_raw[i] = materialized_columns.back().get();
@ -93,7 +95,7 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
Aggregator::NestedColumnsHolder nested_columns_holder;
Aggregator::AggregateFunctionInstructions aggregate_function_instructions;
params->aggregator.prepareAggregateInstructions(chunk.getColumns(), aggregate_columns, materialized_columns, aggregate_function_instructions, nested_columns_holder);
params->aggregator.prepareAggregateInstructions(columns, aggregate_columns, materialized_columns, aggregate_function_instructions, nested_columns_holder);
size_t key_end = 0;
size_t key_begin = 0;
@ -120,6 +122,17 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
Int64 current_memory_usage = 0;
Aggregator::AggregateColumnsConstData aggregate_columns_data(params->params.aggregates_size);
if (params->only_merge)
{
for (size_t i = 0, j = 0; i < columns.size(); ++i)
{
if (!aggregates_mask[i])
continue;
aggregate_columns_data[j++] = &typeid_cast<const ColumnAggregateFunction &>(*columns[i]).getData();
}
}
/// Will split block into segments with the same key
while (key_end != rows)
{
@ -136,10 +149,20 @@ void AggregatingInOrderTransform::consume(Chunk chunk)
/// Add data to the aggregation state if the interval is not empty. It is empty when the current key was not found in the new block.
if (key_begin != key_end)
{
if (group_by_key)
params->aggregator.executeOnBlockSmall(variants, key_begin, key_end, key_columns_raw, aggregate_function_instructions.data());
if (params->only_merge)
{
if (group_by_key)
params->aggregator.mergeOnBlockSmall(variants, key_begin, key_end, aggregate_columns_data, key_columns_raw);
else
params->aggregator.mergeOnIntervalWithoutKeyImpl(variants, key_begin, key_end, aggregate_columns_data);
}
else
params->aggregator.executeOnIntervalWithoutKeyImpl(variants, key_begin, key_end, aggregate_function_instructions.data(), variants.aggregates_pool);
{
if (group_by_key)
params->aggregator.executeOnBlockSmall(variants, key_begin, key_end, key_columns_raw, aggregate_function_instructions.data());
else
params->aggregator.executeOnIntervalWithoutKeyImpl(variants, key_begin, key_end, aggregate_function_instructions.data());
}
}
current_memory_usage = getCurrentMemoryUsage() - initial_memory_usage;


@ -56,6 +56,7 @@ private:
MutableColumns res_aggregate_columns;
AggregatingTransformParamsPtr params;
ColumnsMask aggregates_mask;
InputOrderInfoPtr group_by_info;
/// For sortBlock()


@ -34,21 +34,24 @@ struct AggregatingTransformParams
AggregatorListPtr aggregator_list_ptr;
Aggregator & aggregator;
bool final;
/// Merge data for aggregate projections.
bool only_merge = false;
AggregatingTransformParams(const Aggregator::Params & params_, bool final_)
AggregatingTransformParams(const Aggregator::Params & params_, bool final_, bool only_merge_)
: params(params_)
, aggregator_list_ptr(std::make_shared<AggregatorList>())
, aggregator(*aggregator_list_ptr->emplace(aggregator_list_ptr->end(), params))
, final(final_)
, only_merge(only_merge_)
{
}
AggregatingTransformParams(const Aggregator::Params & params_, const AggregatorListPtr & aggregator_list_ptr_, bool final_)
AggregatingTransformParams(const Aggregator::Params & params_, const AggregatorListPtr & aggregator_list_ptr_, bool final_, bool only_merge_)
: params(params_)
, aggregator_list_ptr(aggregator_list_ptr_)
, aggregator(*aggregator_list_ptr->emplace(aggregator_list_ptr->end(), params))
, final(final_)
, only_merge(only_merge_)
{
}


@ -5593,8 +5593,9 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
// If optimize_aggregation_in_order = true, we need additional information to transform the projection's pipeline.
auto attach_aggregation_in_order_info = [&]()
{
for (const auto & key : keys)
for (const auto & desc : select.getQueryAnalyzer()->aggregationKeys())
{
const String & key = desc.name;
auto actions_dag = analysis_result.before_aggregation->clone();
actions_dag->foldActionsByProjection({key}, sample_block_for_keys);
candidate.group_by_elements_actions.emplace_back(std::make_shared<ExpressionActions>(actions_dag, actions_settings));


@ -25,6 +25,7 @@
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Processors/QueryPlan/UnionStep.h>
#include <Processors/QueryPlan/QueryIdHolder.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Core/UUID.h>
@ -115,6 +116,21 @@ static RelativeSize convertAbsoluteSampleSizeToRelative(const ASTPtr & node, siz
return std::min(RelativeSize(1), RelativeSize(absolute_sample_size) / RelativeSize(approx_total_rows));
}
static SortDescription getSortDescriptionFromGroupBy(const ASTSelectQuery & query)
{
SortDescription order_descr;
order_descr.reserve(query.groupBy()->children.size());
for (const auto & elem : query.groupBy()->children)
{
/// Note, here aliases should not be used, since there will be no such column in a block.
String name = elem->getColumnNameWithoutAlias();
order_descr.emplace_back(name, 1, 1);
}
return order_descr;
}
QueryPlanPtr MergeTreeDataSelectExecutor::read(
const Names & column_names_to_return,
@ -168,9 +184,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
query_info.projection->desc->type,
query_info.projection->desc->name);
Pipes pipes;
Pipe projection_pipe;
Pipe ordinary_pipe;
QueryPlanResourceHolder resources;
auto projection_plan = std::make_unique<QueryPlan>();
@ -217,12 +230,9 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
expression_before_aggregation->setStepDescription("Before GROUP BY");
projection_plan->addStep(std::move(expression_before_aggregation));
}
auto builder = projection_plan->buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
projection_pipe = QueryPipelineBuilder::getPipe(std::move(*builder), resources);
}
auto ordinary_query_plan = std::make_unique<QueryPlan>();
if (query_info.projection->merge_tree_normal_select_result_ptr)
{
auto storage_from_base_parts_of_projection
@ -234,49 +244,27 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
nullptr,
SelectQueryOptions{processed_stage}.projectionQuery());
QueryPlan ordinary_query_plan;
interpreter.buildQueryPlan(ordinary_query_plan);
interpreter.buildQueryPlan(*ordinary_query_plan);
const auto & expressions = interpreter.getAnalysisResult();
if (processed_stage == QueryProcessingStage::Enum::FetchColumns && expressions.before_where)
{
auto where_step = std::make_unique<FilterStep>(
ordinary_query_plan.getCurrentDataStream(),
ordinary_query_plan->getCurrentDataStream(),
expressions.before_where,
expressions.where_column_name,
expressions.remove_where_filter);
where_step->setStepDescription("WHERE");
ordinary_query_plan.addStep(std::move(where_step));
ordinary_query_plan->addStep(std::move(where_step));
}
auto builder = ordinary_query_plan.buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
ordinary_pipe = QueryPipelineBuilder::getPipe(std::move(*builder), resources);
}
Pipe projection_pipe;
Pipe ordinary_pipe;
if (query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
{
/// Here we create shared ManyAggregatedData for both projection and ordinary data.
/// For ordinary data, AggregatedData is filled in a usual way.
/// For projection data, AggregatedData is filled by merging aggregation states.
/// When all AggregatedData is filled, we merge aggregation states together in a usual way.
/// Pipeline will look like:
/// ReadFromProjection -> Aggregating (only merge states) ->
/// ReadFromProjection -> Aggregating (only merge states) ->
/// ... -> Resize -> ConvertingAggregatedToChunks
/// ReadFromOrdinaryPart -> Aggregating (usual) -> (added by last Aggregating)
/// ReadFromOrdinaryPart -> Aggregating (usual) ->
/// ...
auto many_data = std::make_shared<ManyAggregatedData>(projection_pipe.numOutputPorts() + ordinary_pipe.numOutputPorts());
size_t counter = 0;
AggregatorListPtr aggregator_list_ptr = std::make_shared<AggregatorList>();
// TODO apply in_order_optimization here
auto build_aggregate_pipe = [&](Pipe & pipe, bool projection)
auto make_aggregator_params = [&](const Block & header_before_aggregation, bool projection)
{
const auto & header_before_aggregation = pipe.getHeader();
ColumnNumbers keys;
for (const auto & key : query_info.projection->aggregation_keys)
keys.push_back(header_before_aggregation.getPositionByName(key.name));
@ -290,29 +278,28 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
descr.arguments.push_back(header_before_aggregation.getPositionByName(name));
}
AggregatingTransformParamsPtr transform_params;
Aggregator::Params params(
header_before_aggregation,
keys,
aggregates,
query_info.projection->aggregate_overflow_row,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression);
bool only_merge = false;
if (projection)
{
Aggregator::Params params(
header_before_aggregation,
keys,
aggregates,
query_info.projection->aggregate_overflow_row,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_expressions,
settings.min_count_to_compile_aggregate_expression,
header_before_aggregation); // The source header is also an intermediate header
transform_params = std::make_shared<AggregatingTransformParams>(
std::move(params), aggregator_list_ptr, query_info.projection->aggregate_final);
/// The source header is also an intermediate header
params.intermediate_header = header_before_aggregation;
/// This part is hacky.
/// We want AggregatingTransform to work with aggregate states instead of normal columns.
@ -321,51 +308,135 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
/// It is needed because data in projection:
/// * is not merged completely (we may have states with the same key in different parts)
/// * is not split into buckets (so if we just use MergingAggregated, it will use single thread)
transform_params->only_merge = true;
}
else
{
Aggregator::Params params(
header_before_aggregation,
keys,
aggregates,
query_info.projection->aggregate_overflow_row,
settings.max_rows_to_group_by,
settings.group_by_overflow_mode,
settings.group_by_two_level_threshold,
settings.group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryVolume(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression);
transform_params = std::make_shared<AggregatingTransformParams>(
std::move(params), aggregator_list_ptr, query_info.projection->aggregate_final);
only_merge = true;
}
pipe.resize(pipe.numOutputPorts(), true, true);
auto merge_threads = num_streams;
auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
: static_cast<size_t>(settings.max_threads);
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingTransform>(
header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads);
});
return std::make_pair(params, only_merge);
};
if (!projection_pipe.empty())
build_aggregate_pipe(projection_pipe, true);
if (!ordinary_pipe.empty())
build_aggregate_pipe(ordinary_pipe, false);
if (ordinary_query_plan->isInitialized() && projection_plan->isInitialized())
{
auto projection_builder = projection_plan->buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
projection_pipe = QueryPipelineBuilder::getPipe(std::move(*projection_builder), resources);
auto ordinary_builder = ordinary_query_plan->buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
ordinary_pipe = QueryPipelineBuilder::getPipe(std::move(*ordinary_builder), resources);
/// Here we create shared ManyAggregatedData for both projection and ordinary data.
/// For ordinary data, AggregatedData is filled in a usual way.
/// For projection data, AggregatedData is filled by merging aggregation states.
/// When all AggregatedData is filled, we merge aggregation states together in a usual way.
/// Pipeline will look like:
/// ReadFromProjection -> Aggregating (only merge states) ->
/// ReadFromProjection -> Aggregating (only merge states) ->
/// ... -> Resize -> ConvertingAggregatedToChunks
/// ReadFromOrdinaryPart -> Aggregating (usual) -> (added by last Aggregating)
/// ReadFromOrdinaryPart -> Aggregating (usual) ->
/// ...
auto many_data = std::make_shared<ManyAggregatedData>(projection_pipe.numOutputPorts() + ordinary_pipe.numOutputPorts());
size_t counter = 0;
AggregatorListPtr aggregator_list_ptr = std::make_shared<AggregatorList>();
/// TODO apply optimize_aggregation_in_order here too (like below)
auto build_aggregate_pipe = [&](Pipe & pipe, bool projection)
{
auto [params, only_merge] = make_aggregator_params(pipe.getHeader(), projection);
AggregatingTransformParamsPtr transform_params = std::make_shared<AggregatingTransformParams>(
std::move(params), aggregator_list_ptr, query_info.projection->aggregate_final, only_merge);
pipe.resize(pipe.numOutputPorts(), true, true);
auto merge_threads = num_streams;
auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
: static_cast<size_t>(settings.max_threads);
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<AggregatingTransform>(
header, transform_params, many_data, counter++, merge_threads, temporary_data_merge_threads);
});
};
if (!projection_pipe.empty())
build_aggregate_pipe(projection_pipe, true);
if (!ordinary_pipe.empty())
build_aggregate_pipe(ordinary_pipe, false);
}
else
{
auto add_aggregating_step = [&](QueryPlanPtr & query_plan, bool projection)
{
auto [params, only_merge] = make_aggregator_params(query_plan->getCurrentDataStream().header, projection);
auto merge_threads = num_streams;
auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
? static_cast<size_t>(settings.aggregation_memory_efficient_merge_threads)
: static_cast<size_t>(settings.max_threads);
InputOrderInfoPtr group_by_info = query_info.projection->input_order_info;
SortDescription group_by_sort_description;
if (group_by_info && settings.optimize_aggregation_in_order)
group_by_sort_description = getSortDescriptionFromGroupBy(query_info.query->as<ASTSelectQuery &>());
else
group_by_info = nullptr;
auto aggregating_step = std::make_unique<AggregatingStep>(
query_plan->getCurrentDataStream(),
std::move(params),
/* grouping_sets_params_= */ GroupingSetsParamsList{},
query_info.projection->aggregate_final,
only_merge,
settings.max_block_size,
settings.aggregation_in_order_max_block_bytes,
merge_threads,
temporary_data_merge_threads,
/* storage_has_evenly_distributed_read_= */ false,
std::move(group_by_info),
std::move(group_by_sort_description));
query_plan->addStep(std::move(aggregating_step));
};
if (projection_plan->isInitialized())
{
add_aggregating_step(projection_plan, true);
auto projection_builder = projection_plan->buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
projection_pipe = QueryPipelineBuilder::getPipe(std::move(*projection_builder), resources);
}
if (ordinary_query_plan->isInitialized())
{
add_aggregating_step(ordinary_query_plan, false);
auto ordinary_builder = ordinary_query_plan->buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
ordinary_pipe = QueryPipelineBuilder::getPipe(std::move(*ordinary_builder), resources);
}
}
}
else
{
if (projection_plan->isInitialized())
{
auto projection_builder = projection_plan->buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
projection_pipe = QueryPipelineBuilder::getPipe(std::move(*projection_builder), resources);
}
if (ordinary_query_plan->isInitialized())
{
auto ordinary_builder = ordinary_query_plan->buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context), BuildQueryPipelineSettings::fromContext(context));
ordinary_pipe = QueryPipelineBuilder::getPipe(std::move(*ordinary_builder), resources);
}
}
Pipes pipes;
pipes.emplace_back(std::move(projection_pipe));
pipes.emplace_back(std::move(ordinary_pipe));
auto pipe = Pipe::unitePipes(std::move(pipes));


@ -1,3 +1,6 @@
-- Test that checks the correctness of the result for optimize_aggregation_in_order with projections,
-- not that this optimization actually takes place.
DROP TABLE IF EXISTS normal;
CREATE TABLE normal


@ -0,0 +1,28 @@
SELECT k1, k2, k3, sum(value) v FROM in_order_agg_01710 GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order=0
1 0 0 1249950000
1 0 2 1250000000
1 1 1 1249975000
1 1 3 1250025000
Used processors:
AggregatingTransform
SELECT k1, k2, k3, sum(value) v FROM in_order_agg_01710 GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order=1
1 0 0 1249950000
1 0 2 1250000000
1 1 1 1249975000
1 1 3 1250025000
Used processors:
AggregatingInOrderTransform
SELECT k1, k3, sum(value) v FROM in_order_agg_01710 GROUP BY k1, k3 ORDER BY k1, k3 SETTINGS optimize_aggregation_in_order=0
1 0 1249950000
1 1 1249975000
1 2 1250000000
1 3 1250025000
Used processors:
AggregatingTransform
SELECT k1, k3, sum(value) v FROM in_order_agg_01710 GROUP BY k1, k3 ORDER BY k1, k3 SETTINGS optimize_aggregation_in_order=1
1 0 1249950000
1 1 1249975000
1 2 1250000000
1 3 1250025000
Used processors:
AggregatingInOrderTransform


@ -0,0 +1,63 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -nm -q "
DROP TABLE IF EXISTS in_order_agg_01710;
CREATE TABLE in_order_agg_01710
(
k1 UInt32,
k2 UInt32,
k3 UInt32,
value UInt32,
PROJECTION aaaa
(
SELECT
k1,
k2,
k3,
sum(value)
GROUP BY k1, k2, k3
)
)
ENGINE = MergeTree
ORDER BY tuple();
INSERT INTO in_order_agg_01710 SELECT 1, number%2, number%4, number FROM numbers(100000);
"
function random_str()
{
local n=$1 && shift
tr -cd '[:lower:]' < /dev/urandom | head -c"$n"
}
function run_query()
{
local query=$1 && shift
local query_id
query_id="$CLICKHOUSE_TEST_UNIQUE_NAME-$(random_str 6)"
echo "$query"
local opts=(
--allow_experimental_projection_optimization 1
--force_optimize_projection 1
--log_processors_profiles 1
--query_id "$query_id"
)
$CLICKHOUSE_CLIENT "${opts[@]}" "$@" -q "$query"
$CLICKHOUSE_CLIENT -q "SYSTEM FLUSH LOGS"
echo "Used processors:"
$CLICKHOUSE_CLIENT --param_query_id "$query_id" -q "SELECT DISTINCT name FROM system.processors_profile_log WHERE query_id = {query_id:String} AND name LIKE 'Aggregating%'"
}
run_query "SELECT k1, k2, k3, sum(value) v FROM in_order_agg_01710 GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order=0"
run_query "SELECT k1, k2, k3, sum(value) v FROM in_order_agg_01710 GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order=1"
run_query "SELECT k1, k3, sum(value) v FROM in_order_agg_01710 GROUP BY k1, k3 ORDER BY k1, k3 SETTINGS optimize_aggregation_in_order=0"
run_query "SELECT k1, k3, sum(value) v FROM in_order_agg_01710 GROUP BY k1, k3 ORDER BY k1, k3 SETTINGS optimize_aggregation_in_order=1"


@ -0,0 +1,28 @@
SELECT k1, k2, k3, sum(value) v FROM in_order_agg_partial_01710 GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order=0
1 0 0 1249950000
1 0 2 1250000000
1 1 1 1249975000
1 1 3 1250025000
Used processors:
AggregatingTransform
SELECT k1, k2, k3, sum(value) v FROM in_order_agg_partial_01710 GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order=1
1 0 0 1249950000
1 0 2 1250000000
1 1 1 1249975000
1 1 3 1250025000
Used processors:
AggregatingTransform
SELECT k1, k3, sum(value) v FROM in_order_agg_partial_01710 GROUP BY k1, k3 ORDER BY k1, k3 SETTINGS optimize_aggregation_in_order=0
1 0 1249950000
1 1 1249975000
1 2 1250000000
1 3 1250025000
Used processors:
AggregatingTransform
SELECT k1, k3, sum(value) v FROM in_order_agg_partial_01710 GROUP BY k1, k3 ORDER BY k1, k3 SETTINGS optimize_aggregation_in_order=1
1 0 1249950000
1 1 1249975000
1 2 1250000000
1 3 1250025000
Used processors:
AggregatingTransform


@ -0,0 +1,69 @@
#!/usr/bin/env bash
# Test for optimize_aggregation_in_order with partial projections, i.e.:
# - the first part will not have the projection
# - the second part will have the projection
# And so two different aggregations should be done.
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -nm -q "
DROP TABLE IF EXISTS in_order_agg_partial_01710;
CREATE TABLE in_order_agg_partial_01710
(
k1 UInt32,
k2 UInt32,
k3 UInt32,
value UInt32
)
ENGINE = MergeTree
ORDER BY tuple();
INSERT INTO in_order_agg_partial_01710 SELECT 1, number%2, number%4, number FROM numbers(50000);
SYSTEM STOP MERGES in_order_agg_partial_01710;
ALTER TABLE in_order_agg_partial_01710 ADD PROJECTION aaaa (
SELECT
k1,
k2,
k3,
sum(value)
GROUP BY k1, k2, k3
);
INSERT INTO in_order_agg_partial_01710 SELECT 1, number%2, number%4, number FROM numbers(100000) LIMIT 50000, 100000;
"
function random_str()
{
local n=$1 && shift
tr -cd '[:lower:]' < /dev/urandom | head -c"$n"
}
function run_query()
{
local query=$1 && shift
local query_id
query_id="$CLICKHOUSE_TEST_UNIQUE_NAME-$(random_str 6)"
echo "$query"
local opts=(
--allow_experimental_projection_optimization 1
--force_optimize_projection 1
--log_processors_profiles 1
--query_id "$query_id"
)
$CLICKHOUSE_CLIENT "${opts[@]}" "$@" -q "$query"
$CLICKHOUSE_CLIENT -q "SYSTEM FLUSH LOGS"
echo "Used processors:"
$CLICKHOUSE_CLIENT --param_query_id "$query_id" -q "SELECT DISTINCT name FROM system.processors_profile_log WHERE query_id = {query_id:String} AND name LIKE 'Aggregating%'"
}
run_query "SELECT k1, k2, k3, sum(value) v FROM in_order_agg_partial_01710 GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order=0"
run_query "SELECT k1, k2, k3, sum(value) v FROM in_order_agg_partial_01710 GROUP BY k1, k2, k3 ORDER BY k1, k2, k3 SETTINGS optimize_aggregation_in_order=1"
run_query "SELECT k1, k3, sum(value) v FROM in_order_agg_partial_01710 GROUP BY k1, k3 ORDER BY k1, k3 SETTINGS optimize_aggregation_in_order=0"
run_query "SELECT k1, k3, sum(value) v FROM in_order_agg_partial_01710 GROUP BY k1, k3 ORDER BY k1, k3 SETTINGS optimize_aggregation_in_order=1"


@ -2,6 +2,5 @@
ExpressionTransform
(ReadFromStorage)
AggregatingTransform
StrictResize
ExpressionTransform
SourceFromSingleChunk 0 → 1
ExpressionTransform
SourceFromSingleChunk 0 → 1
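
For reference, a sketch of the verification approach used by the new tests above, assuming the in_order_agg_01710 table they create; the query_id value is illustrative and is normally set on the client (e.g. clickhouse-client --query_id ...):

-- Run the query with processor profiling enabled (the tests pass
-- --log_processors_profiles 1 and --query_id on the client).
SELECT k1, k2, k3, sum(value) AS v
FROM in_order_agg_01710
GROUP BY k1, k2, k3
ORDER BY k1, k2, k3
SETTINGS allow_experimental_projection_optimization = 1,
         force_optimize_projection = 1,
         optimize_aggregation_in_order = 1,
         log_processors_profiles = 1;

SYSTEM FLUSH LOGS;

-- AggregatingInOrderTransform is expected when the projection is fully materialized,
-- AggregatingTransform otherwise (see the reference files above).
SELECT DISTINCT name
FROM system.processors_profile_log
WHERE query_id = 'my-query-id' AND name LIKE 'Aggregating%';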