Added second variant of compilation

This commit is contained in:
Maksim Kita 2021-06-09 10:40:39 +03:00
parent 1e2f22a183
commit d24d3ae992
8 changed files with 250 additions and 9 deletions

View File

@ -108,6 +108,7 @@ class IColumn;
M(UInt64, min_count_to_compile_expression, 3, "The number of identical expressions before they are JIT-compiled", 0) \
M(Bool, compile_aggregate_expressions, true, "Compile aggregate functions to native code.", 0) \
M(UInt64, min_count_to_compile_aggregate_expression, 0, "The number of identical aggreagte expressions before they are JIT-compiled", 0) \
M(UInt64, aggregation_method, 0, "Aggregation method", 0) \
M(UInt64, group_by_two_level_threshold, 100000, "From what number of keys, a two-level aggregation starts. 0 - the threshold is not set.", 0) \
M(UInt64, group_by_two_level_threshold_bytes, 50000000, "From what size of the aggregation state in bytes, a two-level aggregation begins to be used. 0 - the threshold is not set. Two-level aggregation is used when at least one of the thresholds is triggered.", 0) \
M(Bool, distributed_aggregation_memory_efficient, true, "Is the memory-saving mode of distributed aggregation enabled.", 0) \

View File

@ -34,7 +34,7 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm(
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode, 0, 0,
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
storage_.getContext()->getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions, settings.min_count_to_compile_aggregate_expression);
settings.compile_aggregate_expressions, settings.min_count_to_compile_aggregate_expression, settings.aggregation_method);
aggregator = std::make_unique<Aggregator>(params);
}

View File

@ -321,7 +321,7 @@ void Aggregator::compileAggregateFunctions()
}
/// TODO: Probably better to compile more than 2 functions
if (functions_to_compile.empty())
if (functions_to_compile.empty() || functions_to_compile.size() != aggregate_functions.size())
return;
CompiledAggregateFunctions compiled_aggregate_functions;
@ -597,6 +597,90 @@ void NO_INLINE Aggregator::handleAggregationJIT(
auto add_into_aggregate_states_function = compiled_functions->add_into_aggregate_states_function;
auto create_aggregate_states_function = compiled_functions->create_aggregate_states_function;
std::unique_ptr<AggregateDataPtr[]> places;
bool not_all_functions_compiled = compiled_functions->functions_count != offsets_of_aggregate_states.size();
if (not_all_functions_compiled)
places.reset(new AggregateDataPtr[rows]);
auto get_aggregate_data = [&](size_t row) -> AggregateDataPtr
{
AggregateDataPtr aggregate_data;
if constexpr (!no_more_keys)
{
auto emplace_result = state.emplaceKey(method.data, row, *aggregates_pool);
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
if (emplace_result.isInserted())
{
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
emplace_result.setMapped(nullptr);
aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
create_aggregate_states_function(aggregate_data);
emplace_result.setMapped(aggregate_data);
}
else
aggregate_data = emplace_result.getMapped();
assert(aggregate_data != nullptr);
}
else
{
/// Add only if the key already exists.
/// Overflow row is disabled for JIT.
auto find_result = state.findKey(method.data, row, *aggregates_pool);
assert(find_result.getMapped() != nullptr);
aggregate_data = find_result.getMapped();
}
if (not_all_functions_compiled)
places[row] = aggregate_data;
return aggregate_data;
};
GetAggregateDataFunction get_aggregate_data_function = FunctorToStaticMethodAdaptor<decltype(get_aggregate_data)>::unsafeCall;
GetAggregateDataContext get_aggregate_data_context = reinterpret_cast<char *>(&get_aggregate_data);
add_into_aggregate_states_function(rows, columns_data.data(), get_aggregate_data_function, get_aggregate_data_context);
/// Add values to the aggregate functions.
AggregateFunctionInstruction * inst = aggregate_instructions + compiled_functions->functions_count;
for (; inst->that; ++inst)
{
if (inst->offsets)
inst->batch_that->addBatchArray(rows, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
else
inst->batch_that->addBatch(rows, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
}
}
template <bool no_more_keys, typename Method>
void NO_INLINE Aggregator::handleAggregationJITV2(
Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions) const
{
std::vector<ColumnData> columns_data;
columns_data.reserve(aggregate_functions.size());
/// Add values to the aggregate functions.
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
size_t arguments_size = inst->that->getArgumentTypes().size();
for (size_t i = 0; i < arguments_size; ++i)
columns_data.emplace_back(getColumnData(inst->batch_arguments[i]));
}
auto add_into_aggregate_states_function = compiled_functions->add_into_aggregate_states_function_v2;
auto create_aggregate_states_function = compiled_functions->create_aggregate_states_function;
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[rows]);
/// For all rows.
@ -772,7 +856,10 @@ void NO_INLINE Aggregator::executeImplBatch(
#if USE_EMBEDDED_COMPILER
if (compiled_functions)
{
handleAggregationJIT<no_more_keys>(method, state, aggregates_pool, rows, aggregate_instructions);
if (params.aggregation_method == 0)
handleAggregationJIT<no_more_keys>(method, state, aggregates_pool, rows, aggregate_instructions);
else
handleAggregationJITV2<no_more_keys>(method, state, aggregates_pool, rows, aggregate_instructions);
}
else
#endif

View File

@ -911,6 +911,7 @@ public:
bool compile_aggregate_expressions;
size_t min_count_to_compile_aggregate_expression;
size_t aggregation_method;
Params(
const Block & src_header_,
@ -923,6 +924,7 @@ public:
size_t min_free_disk_space_,
bool compile_aggregate_expressions_,
size_t min_count_to_compile_aggregate_expression_,
size_t aggregation_method_,
const Block & intermediate_header_ = {})
: src_header(src_header_),
intermediate_header(intermediate_header_),
@ -934,14 +936,15 @@ public:
tmp_volume(tmp_volume_), max_threads(max_threads_),
min_free_disk_space(min_free_disk_space_),
compile_aggregate_expressions(compile_aggregate_expressions_),
min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_)
min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_),
aggregation_method(aggregation_method_)
{
}
/// Only parameters that matter during merge.
Params(const Block & intermediate_header_,
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0, false, 0)
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0, false, 0, 0)
{
intermediate_header = intermediate_header_;
}
@ -1135,6 +1138,14 @@ private:
size_t rows,
AggregateFunctionInstruction * aggregate_instructions) const;
template <bool no_more_keys, typename Method>
void handleAggregationJITV2(
Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions) const;
template <bool no_more_keys, typename Method>
void handleAggregationDefault(
Method & method,

View File

@ -2040,7 +2040,8 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression);
settings.min_count_to_compile_aggregate_expression,
settings.aggregation_method);
SortDescription group_by_sort_description;
@ -2144,7 +2145,8 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modific
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression);
settings.min_count_to_compile_aggregate_expression,
settings.aggregation_method);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, true);

View File

@ -307,6 +307,137 @@ static void compileAddIntoAggregateStatesFunctions(llvm::Module & module, const
auto & context = module.getContext();
llvm::IRBuilder<> b(context);
auto * size_type = b.getIntNTy(sizeof(size_t) * 8);
auto * column_data_type = llvm::StructType::get(b.getInt8PtrTy(), b.getInt8PtrTy());
auto * get_place_func_declaration = llvm::FunctionType::get(b.getInt8Ty()->getPointerTo(), { b.getInt8Ty()->getPointerTo(), size_type }, /*isVarArg=*/false);
auto * aggregate_loop_func_declaration = llvm::FunctionType::get(b.getVoidTy(), { size_type, column_data_type->getPointerTo(), get_place_func_declaration->getPointerTo(), b.getInt8Ty()->getPointerTo() }, false);
auto * aggregate_loop_func_definition = llvm::Function::Create(aggregate_loop_func_declaration, llvm::Function::ExternalLinkage, name, module);
auto * arguments = aggregate_loop_func_definition->args().begin();
llvm::Value * rows_count_arg = arguments++;
llvm::Value * columns_arg = arguments++;
llvm::Value * get_place_function_arg = arguments++;
llvm::Value * get_place_function_context_arg = arguments++;
/// Initialize ColumnDataPlaceholder llvm representation of ColumnData
auto * entry = llvm::BasicBlock::Create(b.getContext(), "entry", aggregate_loop_func_definition);
b.SetInsertPoint(entry);
std::vector<ColumnDataPlaceholder> columns;
size_t previous_columns_size = 0;
for (const auto & function : functions)
{
auto argument_types = function.function->getArgumentTypes();
ColumnDataPlaceholder data_placeholder;
size_t function_arguments_size = argument_types.size();
for (size_t column_argument_index = 0; column_argument_index < function_arguments_size; ++column_argument_index)
{
const auto & argument_type = argument_types[column_argument_index];
auto * data = b.CreateLoad(column_data_type, b.CreateConstInBoundsGEP1_32(column_data_type, columns_arg, previous_columns_size + column_argument_index));
data_placeholder.data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(argument_type))->getPointerTo());
data_placeholder.null_init = argument_type->isNullable() ? b.CreateExtractValue(data, {1}) : nullptr;
columns.emplace_back(data_placeholder);
}
previous_columns_size += function_arguments_size;
}
/// Initialize loop
auto * end = llvm::BasicBlock::Create(b.getContext(), "end", aggregate_loop_func_definition);
auto * loop = llvm::BasicBlock::Create(b.getContext(), "loop", aggregate_loop_func_definition);
b.CreateCondBr(b.CreateICmpEQ(rows_count_arg, llvm::ConstantInt::get(size_type, 0)), end, loop);
b.SetInsertPoint(loop);
auto * counter_phi = b.CreatePHI(rows_count_arg->getType(), 2);
counter_phi->addIncoming(llvm::ConstantInt::get(size_type, 0), entry);
for (auto & col : columns)
{
col.data = b.CreatePHI(col.data_init->getType(), 2);
col.data->addIncoming(col.data_init, entry);
if (col.null_init)
{
col.null = b.CreatePHI(col.null_init->getType(), 2);
col.null->addIncoming(col.null_init, entry);
}
}
auto * aggregation_place = b.CreateCall(get_place_func_declaration, get_place_function_arg, { get_place_function_context_arg, counter_phi });
previous_columns_size = 0;
for (const auto & function : functions)
{
size_t aggregate_function_offset = function.aggregate_data_offset;
const auto * aggregate_function_ptr = function.function;
auto arguments_types = function.function->getArgumentTypes();
std::vector<llvm::Value *> arguments_values;
size_t function_arguments_size = arguments_types.size();
arguments_values.resize(function_arguments_size);
for (size_t column_argument_index = 0; column_argument_index < function_arguments_size; ++column_argument_index)
{
auto * column_argument_data = columns[previous_columns_size + column_argument_index].data;
auto * column_argument_null_data = columns[previous_columns_size + column_argument_index].null;
auto & argument_type = arguments_types[column_argument_index];
auto * value = b.CreateLoad(toNativeType(b, removeNullable(argument_type)), column_argument_data);
if (!argument_type->isNullable())
{
arguments_values[column_argument_index] = value;
continue;
}
auto * is_null = b.CreateICmpNE(b.CreateLoad(b.getInt8Ty(), column_argument_null_data), b.getInt8(0));
auto * nullable_unitilized = llvm::Constant::getNullValue(toNativeType(b, argument_type));
auto * nullable_value = b.CreateInsertValue(b.CreateInsertValue(nullable_unitilized, value, {0}), is_null, {1});
arguments_values[column_argument_index] = nullable_value;
}
auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_32(nullptr, aggregation_place, aggregate_function_offset);
aggregate_function_ptr->compileAdd(b, aggregation_place_with_offset, arguments_types, arguments_values);
previous_columns_size += function_arguments_size;
}
/// End of loop
auto * cur_block = b.GetInsertBlock();
for (auto & col : columns)
{
col.data->addIncoming(b.CreateConstInBoundsGEP1_32(nullptr, col.data, 1), cur_block);
if (col.null)
col.null->addIncoming(b.CreateConstInBoundsGEP1_32(nullptr, col.null, 1), cur_block);
}
auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1));
counter_phi->addIncoming(value, cur_block);
b.CreateCondBr(b.CreateICmpEQ(value, rows_count_arg), end, loop);
b.SetInsertPoint(end);
b.CreateRetVoid();
}
static void compileAddIntoAggregateStatesFunctionsV2(llvm::Module & module, const std::vector<AggregateFunctionWithOffset> & functions, const std::string & name)
{
auto & context = module.getContext();
llvm::IRBuilder<> b(context);
auto * size_type = b.getIntNTy(sizeof(size_t) * 8);
auto * places_type = b.getInt8Ty()->getPointerTo()->getPointerTo();
auto * column_data_type = llvm::StructType::get(b.getInt8PtrTy(), b.getInt8PtrTy());
@ -568,6 +699,7 @@ CompiledAggregateFunctions compileAggregateFunctons(CHJIT & jit, const std::vect
{
std::string create_aggregate_states_functions_name = functions_dump_name + "_create";
std::string add_aggregate_states_functions_name = functions_dump_name + "_add";
std::string add_aggregate_states_functions_name_v2 = functions_dump_name + "_add_v2";
std::string merge_aggregate_states_functions_name = functions_dump_name + "_merge";
std::string insert_aggregate_states_functions_name = functions_dump_name + "_insert";
@ -575,17 +707,20 @@ CompiledAggregateFunctions compileAggregateFunctons(CHJIT & jit, const std::vect
{
compileCreateAggregateStatesFunctions(module, functions, create_aggregate_states_functions_name);
compileAddIntoAggregateStatesFunctions(module, functions, add_aggregate_states_functions_name);
compileAddIntoAggregateStatesFunctionsV2(module, functions, add_aggregate_states_functions_name_v2);
compileMergeAggregatesStates(module, functions, merge_aggregate_states_functions_name);
compileInsertAggregatesIntoResultColumns(module, functions, insert_aggregate_states_functions_name);
});
auto create_aggregate_states_function = reinterpret_cast<JITCreateAggregateStatesFunction>(compiled_module.function_name_to_symbol[create_aggregate_states_functions_name]);
auto add_into_aggregate_states_function = reinterpret_cast<JITAddIntoAggregateStatesFunction>(compiled_module.function_name_to_symbol[add_aggregate_states_functions_name]);
auto add_into_aggregate_states_function_v2 = reinterpret_cast<JITAddIntoAggregateStatesFunctionV2>(compiled_module.function_name_to_symbol[add_aggregate_states_functions_name_v2]);
auto merge_aggregate_states_function = reinterpret_cast<JITMergeAggregateStatesFunction>(compiled_module.function_name_to_symbol[merge_aggregate_states_functions_name]);
auto insert_aggregate_states_function = reinterpret_cast<JITInsertAggregatesIntoColumnsFunction>(compiled_module.function_name_to_symbol[insert_aggregate_states_functions_name]);
assert(create_aggregate_states_function);
assert(add_into_aggregate_states_function);
assert(add_into_aggregate_states_function_v2);
assert(merge_aggregate_states_function);
assert(insert_aggregate_states_function);
@ -593,6 +728,7 @@ CompiledAggregateFunctions compileAggregateFunctons(CHJIT & jit, const std::vect
{
.create_aggregate_states_function = create_aggregate_states_function,
.add_into_aggregate_states_function = add_into_aggregate_states_function,
.add_into_aggregate_states_function_v2 = add_into_aggregate_states_function_v2,
.merge_aggregate_states_function = merge_aggregate_states_function,
.insert_aggregates_into_columns_function = insert_aggregate_states_function,

View File

@ -61,7 +61,8 @@ using GetAggregateDataContext = char *;
using GetAggregateDataFunction = AggregateDataPtr (*)(GetAggregateDataContext, size_t);
using JITCreateAggregateStatesFunction = void (*)(AggregateDataPtr);
using JITAddIntoAggregateStatesFunction = void (*)(ColumnDataRowsSize, ColumnData *, AggregateDataPtr *);
using JITAddIntoAggregateStatesFunction = void (*)(ColumnDataRowsSize, ColumnData *, GetAggregateDataFunction, GetAggregateDataContext);
using JITAddIntoAggregateStatesFunctionV2 = void (*)(ColumnDataRowsSize, ColumnData *, AggregateDataPtr *);
using JITMergeAggregateStatesFunction = void (*)(AggregateDataPtr, AggregateDataPtr);
using JITInsertAggregatesIntoColumnsFunction = void (*)(ColumnDataRowsSize, ColumnData *, AggregateDataPtr *);
@ -69,6 +70,7 @@ struct CompiledAggregateFunctions
{
JITCreateAggregateStatesFunction create_aggregate_states_function;
JITAddIntoAggregateStatesFunction add_into_aggregate_states_function;
JITAddIntoAggregateStatesFunctionV2 add_into_aggregate_states_function_v2;
JITMergeAggregateStatesFunction merge_aggregate_states_function;
JITInsertAggregatesIntoColumnsFunction insert_aggregates_into_columns_function;

View File

@ -303,6 +303,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
settings.min_free_disk_space_for_temporary_data,
settings.compile_expressions,
settings.min_count_to_compile_aggregate_expression,
settings.aggregation_method,
header_before_aggregation); // The source header is also an intermediate header
transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), query_info.projection->aggregate_final);
@ -333,7 +334,8 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression);
settings.min_count_to_compile_aggregate_expression,
settings.aggregation_method);
transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), query_info.projection->aggregate_final);
}