Aggregator added CompiledExpressionCache

This commit is contained in:
Maksim Kita 2021-06-13 15:38:57 +03:00
parent f17e212a72
commit da8c957167
17 changed files with 384 additions and 630 deletions

View File

@ -50,7 +50,7 @@
#include <Interpreters/DNSCacheUpdater.h>
#include <Interpreters/ExternalLoaderXMLConfigRepository.h>
#include <Interpreters/InterserverCredentials.h>
#include <Interpreters/ExpressionJIT.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Access/AccessControlManager.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/System/attachSystemTables.h>

View File

@ -108,7 +108,6 @@ class IColumn;
M(UInt64, min_count_to_compile_expression, 3, "The number of identical expressions before they are JIT-compiled", 0) \
M(Bool, compile_aggregate_expressions, true, "Compile aggregate functions to native code.", 0) \
M(UInt64, min_count_to_compile_aggregate_expression, 0, "The number of identical aggreagte expressions before they are JIT-compiled", 0) \
M(UInt64, aggregation_method, 0, "Aggregation method", 0) \
M(UInt64, group_by_two_level_threshold, 100000, "From what number of keys, a two-level aggregation starts. 0 - the threshold is not set.", 0) \
M(UInt64, group_by_two_level_threshold_bytes, 50000000, "From what size of the aggregation state in bytes, a two-level aggregation begins to be used. 0 - the threshold is not set. Two-level aggregation is used when at least one of the thresholds is triggered.", 0) \
M(Bool, distributed_aggregation_memory_efficient, true, "Is the memory-saving mode of distributed aggregation enabled.", 0) \

View File

@ -34,7 +34,7 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm(
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode, 0, 0,
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
storage_.getContext()->getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions, settings.min_count_to_compile_aggregate_expression, settings.aggregation_method);
settings.compile_aggregate_expressions, settings.min_count_to_compile_aggregate_expression);
aggregator = std::make_unique<Aggregator>(params);
}

View File

@ -8,7 +8,6 @@
#include <Functions/materialize.h>
#include <Functions/FunctionsLogical.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionJIT.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>

View File

@ -24,6 +24,7 @@
#include <AggregateFunctions/AggregateFunctionState.h>
#include <IO/Operators.h>
#include <Interpreters/JIT/compileFunction.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
namespace ProfileEvents
@ -222,6 +223,30 @@ static CHJIT & getJITInstance()
return jit;
}
class CompiledAggregateFunctionsHolder final : public CompiledExpressionCacheEntry
{
public:
explicit CompiledAggregateFunctionsHolder(CompiledAggregateFunctions compiled_function_)
: CompiledExpressionCacheEntry(compiled_function_.compiled_module.size)
, compiled_aggregate_functions(compiled_function_)
{}
~CompiledAggregateFunctionsHolder() override
{
std::string symbol_names;
for (const auto & [name, _] : compiled_aggregate_functions.compiled_module.function_name_to_symbol)
{
symbol_names += name;
symbol_names += ' ';
}
std::cerr << "CompiledAggregateFunctionsHolder::~CompiledAggregateFunctionsHolder " << symbol_names << std::endl;
getJITInstance().deleteCompiledModule(compiled_aggregate_functions.compiled_module);
}
CompiledAggregateFunctions compiled_aggregate_functions;
};
#endif
Aggregator::Aggregator(const Params & params_)
@ -287,7 +312,6 @@ Aggregator::Aggregator(const Params & params_)
void Aggregator::compileAggregateFunctions()
{
static std::unordered_map<UInt128, UInt64, UInt128Hash> aggregate_functions_description_to_count;
static std::unordered_map<std::string, CompiledAggregateFunctions> aggregation_functions_dump_to_add_compiled;
static std::mutex mtx;
if (!params.compile_aggregate_expressions || params.overflow_row)
@ -324,36 +348,37 @@ void Aggregator::compileAggregateFunctions()
if (functions_to_compile.empty() || functions_to_compile.size() != aggregate_functions.size())
return;
CompiledAggregateFunctions compiled_aggregate_functions;
SipHash aggregate_functions_description_hash;
aggregate_functions_description_hash.update(functions_description);
SipHash aggregate_function_description_hash;
aggregate_function_description_hash.update(functions_description);
UInt128 aggregate_function_description_hash_result;
aggregate_function_description_hash.get128(aggregate_function_description_hash_result);
UInt128 aggregate_functions_description_hash_key;
aggregate_functions_description_hash.get128(aggregate_functions_description_hash_key);
{
std::lock_guard<std::mutex> lock(mtx);
if (aggregate_functions_description_to_count[aggregate_function_description_hash_result]++ < params.min_count_to_compile_aggregate_expression)
if (aggregate_functions_description_to_count[aggregate_functions_description_hash_key]++ < params.min_count_to_compile_aggregate_expression)
return;
auto it = aggregation_functions_dump_to_add_compiled.find(functions_description);
if (it != aggregation_functions_dump_to_add_compiled.end())
if (auto * compilation_cache = CompiledExpressionCacheFactory::instance().tryGetCache())
{
compiled_aggregate_functions = it->second;
auto [compiled_function_cache_entry, _] = compilation_cache->getOrSet(aggregate_functions_description_hash_key, [&] ()
{
LOG_TRACE(log, "Compile expression {}", functions_description);
auto compiled_aggregate_functions = compileAggregateFunctons(getJITInstance(), functions_to_compile, functions_description);
return std::make_shared<CompiledAggregateFunctionsHolder>(std::move(compiled_aggregate_functions));
});
compiled_aggregate_functions_holder = std::static_pointer_cast<CompiledAggregateFunctionsHolder>(compiled_function_cache_entry);
}
else
{
LOG_TRACE(log, "Compile expression {}", functions_description);
compiled_aggregate_functions = compileAggregateFunctons(getJITInstance(), functions_to_compile, functions_description);
aggregation_functions_dump_to_add_compiled[functions_description] = compiled_aggregate_functions;
auto compiled_aggregate_functions = compileAggregateFunctons(getJITInstance(), functions_to_compile, functions_description);
compiled_aggregate_functions_holder = std::make_shared<CompiledAggregateFunctionsHolder>(std::move(compiled_aggregate_functions));
}
}
LOG_TRACE(log, "Use compiled expression {}", functions_description);
compiled_functions.emplace(std::move(compiled_aggregate_functions));
}
#endif
@ -568,232 +593,34 @@ void NO_INLINE Aggregator::executeImpl(
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
if (!no_more_keys)
executeImplBatch<false>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
else
executeImplBatch<true>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
}
{
#if USE_EMBEDDED_COMPILER
template <bool no_more_keys, typename Method>
void NO_INLINE Aggregator::handleAggregationJIT(
Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions) const
{
std::vector<ColumnData> columns_data;
columns_data.reserve(aggregate_functions.size());
/// Add values to the aggregate functions.
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
size_t arguments_size = inst->that->getArgumentTypes().size();
for (size_t i = 0; i < arguments_size; ++i)
columns_data.emplace_back(getColumnData(inst->batch_arguments[i]));
}
auto add_into_aggregate_states_function = compiled_functions->add_into_aggregate_states_function;
auto create_aggregate_states_function = compiled_functions->create_aggregate_states_function;
std::unique_ptr<AggregateDataPtr[]> places;
bool not_all_functions_compiled = compiled_functions->functions_count != offsets_of_aggregate_states.size();
if (not_all_functions_compiled)
places.reset(new AggregateDataPtr[rows]);
auto get_aggregate_data = [&](size_t row) -> AggregateDataPtr
{
AggregateDataPtr aggregate_data;
if constexpr (!no_more_keys)
if (compiled_aggregate_functions_holder)
{
auto emplace_result = state.emplaceKey(method.data, row, *aggregates_pool);
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
if (emplace_result.isInserted())
{
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
emplace_result.setMapped(nullptr);
aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
create_aggregate_states_function(aggregate_data);
emplace_result.setMapped(aggregate_data);
}
else
aggregate_data = emplace_result.getMapped();
assert(aggregate_data != nullptr);
executeImplBatch<false, true>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
}
else
{
/// Add only if the key already exists.
/// Overflow row is disabled for JIT.
auto find_result = state.findKey(method.data, row, *aggregates_pool);
assert(find_result.getMapped() != nullptr);
aggregate_data = find_result.getMapped();
}
if (not_all_functions_compiled)
places[row] = aggregate_data;
return aggregate_data;
};
GetAggregateDataFunction get_aggregate_data_function = FunctorToStaticMethodAdaptor<decltype(get_aggregate_data)>::unsafeCall;
GetAggregateDataContext get_aggregate_data_context = reinterpret_cast<char *>(&get_aggregate_data);
add_into_aggregate_states_function(rows, columns_data.data(), get_aggregate_data_function, get_aggregate_data_context);
/// Add values to the aggregate functions.
AggregateFunctionInstruction * inst = aggregate_instructions + compiled_functions->functions_count;
for (; inst->that; ++inst)
{
if (inst->offsets)
inst->batch_that->addBatchArray(rows, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
else
inst->batch_that->addBatch(rows, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
}
}
template <bool no_more_keys, typename Method>
void NO_INLINE Aggregator::handleAggregationJITV2(
Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions) const
{
std::vector<ColumnData> columns_data;
columns_data.reserve(aggregate_functions.size());
/// Add values to the aggregate functions.
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
{
size_t arguments_size = inst->that->getArgumentTypes().size();
for (size_t i = 0; i < arguments_size; ++i)
columns_data.emplace_back(getColumnData(inst->batch_arguments[i]));
}
auto add_into_aggregate_states_function = compiled_functions->add_into_aggregate_states_function_v2;
auto create_aggregate_states_function = compiled_functions->create_aggregate_states_function;
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[rows]);
/// For all rows.
for (size_t i = 0; i < rows; ++i)
{
AggregateDataPtr aggregate_data = nullptr;
if constexpr (!no_more_keys)
{
auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool);
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
if (emplace_result.isInserted())
{
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
emplace_result.setMapped(nullptr);
aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
create_aggregate_states_function(aggregate_data);
createAggregateStates(compiled_functions->functions_count, aggregate_data);
emplace_result.setMapped(aggregate_data);
}
else
aggregate_data = emplace_result.getMapped();
assert(aggregate_data != nullptr);
}
else
{
/// Add only if the key already exists.
auto find_result = state.findKey(method.data, i, *aggregates_pool);
if (find_result.isFound())
aggregate_data = find_result.getMapped();
}
places[i] = aggregate_data;
}
add_into_aggregate_states_function(rows, columns_data.data(), places.get());
/// Add values to the aggregate functions.
AggregateFunctionInstruction * inst = aggregate_instructions + compiled_functions->functions_count;
for (; inst->that; ++inst)
{
if (inst->offsets)
inst->batch_that->addBatchArray(rows, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
else
inst->batch_that->addBatch(rows, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
}
}
#endif
template <bool no_more_keys, typename Method>
void NO_INLINE Aggregator::handleAggregationDefault(
Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions,
AggregateDataPtr overflow_row) const
{
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[rows]);
/// For all rows.
for (size_t i = 0; i < rows; ++i)
{
AggregateDataPtr aggregate_data;
if constexpr (!no_more_keys)
{
auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool);
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
if (emplace_result.isInserted())
{
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
emplace_result.setMapped(nullptr);
aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(aggregate_data);
emplace_result.setMapped(aggregate_data);
}
else
aggregate_data = emplace_result.getMapped();
assert(aggregate_data != nullptr);
executeImplBatch<false, false>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
}
else
{
/// Add only if the key already exists.
auto find_result = state.findKey(method.data, i, *aggregates_pool);
if (find_result.isFound())
aggregate_data = find_result.getMapped();
else
aggregate_data = overflow_row;
}
places[i] = aggregate_data;
}
/// Add values to the aggregate functions.
for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst)
else
{
if (inst->offsets)
inst->batch_that->addBatchArray(rows, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
#if USE_EMBEDDED_COMPILER
if (compiled_aggregate_functions_holder)
{
executeImplBatch<false, true>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
}
else
inst->batch_that->addBatch(rows, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
#endif
{
executeImplBatch<true, false>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
}
}
}
template <bool no_more_keys, typename Method>
template <bool no_more_keys, bool use_compiled_functions, typename Method>
void NO_INLINE Aggregator::executeImplBatch(
Method & method,
typename Method::State & state,
@ -850,18 +677,86 @@ void NO_INLINE Aggregator::executeImplBatch(
}
}
size_t compiled_functions_count = 0;
#if USE_EMBEDDED_COMPILER
if (compiled_functions)
{
// if (params.aggregation_method == 0)
// handleAggregationJIT<no_more_keys>(method, state, aggregates_pool, rows, aggregate_instructions);
// else
handleAggregationJITV2<no_more_keys>(method, state, aggregates_pool, rows, aggregate_instructions);
}
else
if constexpr (use_compiled_functions)
compiled_functions_count = compiled_aggregate_functions_holder->compiled_aggregate_functions.functions_count;
#endif
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[rows]);
/// For all rows.
for (size_t i = 0; i < rows; ++i)
{
handleAggregationDefault<no_more_keys>(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row);
AggregateDataPtr aggregate_data = nullptr;
if constexpr (!no_more_keys)
{
auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool);
/// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key.
if (emplace_result.isInserted())
{
/// exception-safety - if you can not allocate memory or create states, then destructors will not be called.
emplace_result.setMapped(nullptr);
aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
#if USE_EMBEDDED_COMPILER
if constexpr (use_compiled_functions)
compiled_aggregate_functions_holder->compiled_aggregate_functions.create_aggregate_states_function(aggregate_data);
#endif
createAggregateStates(compiled_functions_count, aggregate_data);
emplace_result.setMapped(aggregate_data);
}
else
aggregate_data = emplace_result.getMapped();
assert(aggregate_data != nullptr);
}
else
{
/// Add only if the key already exists.
auto find_result = state.findKey(method.data, i, *aggregates_pool);
if (find_result.isFound())
aggregate_data = find_result.getMapped();
else
aggregate_data = overflow_row;
}
places[i] = aggregate_data;
}
#if USE_EMBEDDED_COMPILER
if constexpr (use_compiled_functions)
{
std::vector<ColumnData> columns_data;
columns_data.reserve(aggregate_functions.size());
for (size_t compiled_function_index = 0; compiled_function_index < compiled_functions_count; ++compiled_function_index)
{
AggregateFunctionInstruction * inst = aggregate_instructions + compiled_function_index;
size_t arguments_size = inst->that->getArgumentTypes().size();
for (size_t i = 0; i < arguments_size; ++i)
columns_data.emplace_back(getColumnData(inst->batch_arguments[i]));
}
auto add_into_aggregate_states_function = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function;
add_into_aggregate_states_function(rows, columns_data.data(), places.get());
}
#endif
/// Add values to the aggregate functions.
AggregateFunctionInstruction * inst = aggregate_instructions + compiled_functions_count;
for (; inst->that; ++inst)
{
if (inst->offsets)
inst->batch_that->addBatchArray(rows, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool);
else
inst->batch_that->addBatch(rows, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool);
}
}
@ -1304,9 +1199,22 @@ void Aggregator::convertToBlockImpl(
raw_key_columns.push_back(column.get());
if (final)
convertToBlockImplFinal(method, data, std::move(raw_key_columns), final_aggregate_columns, arena);
{
#if USE_EMBEDDED_COMPILER
if (compiled_aggregate_functions_holder)
{
convertToBlockImplFinal<Method, true>(method, data, std::move(raw_key_columns), final_aggregate_columns, arena);
}
else
#endif
{
convertToBlockImplFinal<Method, false>(method, data, std::move(raw_key_columns), final_aggregate_columns, arena);
}
}
else
{
convertToBlockImplNotFinal(method, data, std::move(raw_key_columns), aggregate_columns);
}
/// In order to release memory early.
data.clearAndShrink();
}
@ -1380,7 +1288,7 @@ inline void Aggregator::insertAggregatesIntoColumns(
}
template <typename Method, typename Table>
template <typename Method, bool use_compiled_functions, typename Table>
void NO_INLINE Aggregator::convertToBlockImplFinal(
Method & method,
Table & data,
@ -1400,85 +1308,81 @@ void NO_INLINE Aggregator::convertToBlockImplFinal(
auto shuffled_key_sizes = method.shuffleKeyColumns(key_columns, key_sizes);
const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes;
#if USE_EMBEDDED_COMPILER
if (compiled_functions)
PaddedPODArray<AggregateDataPtr> places;
places.reserve(data.size());
data.forEachValue([&](const auto & key, auto & mapped)
{
PaddedPODArray<AggregateDataPtr> places;
places.reserve(data.size());
method.insertKeyIntoColumns(key, key_columns, key_sizes_ref);
places.emplace_back(mapped);
data.forEachValue([&](const auto & key, auto & mapped)
{
method.insertKeyIntoColumns(key, key_columns, key_sizes_ref);
places.emplace_back(mapped);
/// Mark the cell as destroyed so it will not be destroyed in destructor.
mapped = nullptr;
});
/// Mark the cell as destroyed so it will not be destroyed in destructor.
mapped = nullptr;
});
std::exception_ptr exception;
size_t aggregate_functions_destroy_index = 0;
std::exception_ptr exception;
size_t aggregate_functions_destroy_index = 0;
try
try
{
#if USE_EMBEDDED_COMPILER
if constexpr (use_compiled_functions)
{
/** For JIT compiled functions we need to resize columns before pass them into compiled code.
* insert_aggregates_into_columns_function function does not throw exception.
*/
std::vector<ColumnData> columns_data;
auto compiled_functions = compiled_aggregate_functions_holder->compiled_aggregate_functions;
columns_data.reserve(final_aggregate_columns.size());
for (size_t i = 0; i < compiled_functions->functions_count; ++i)
for (size_t i = 0; i < compiled_functions.functions_count; ++i)
{
auto & final_aggregate_column = final_aggregate_columns[i];
final_aggregate_column = final_aggregate_column->cloneResized(places.size());
columns_data.emplace_back(getColumnData(final_aggregate_column.get()));
}
auto insert_aggregate_states_function = compiled_functions->insert_aggregates_into_columns_function;
insert_aggregate_states_function(places.size(), columns_data.data(), places.data());
auto insert_aggregates_into_columns_function = compiled_functions.insert_aggregates_into_columns_function;
insert_aggregates_into_columns_function(places.size(), columns_data.data(), places.data());
aggregate_functions_destroy_index = compiled_functions->functions_count;
aggregate_functions_destroy_index = compiled_functions.functions_count;
}
#endif
for (; aggregate_functions_destroy_index < params.aggregates_size;)
{
auto & final_aggregate_column = final_aggregate_columns[aggregate_functions_destroy_index];
size_t offset = offsets_of_aggregate_states[aggregate_functions_destroy_index];
for (; aggregate_functions_destroy_index < params.aggregates_size;)
{
auto & final_aggregate_column = final_aggregate_columns[aggregate_functions_destroy_index];
size_t offset = offsets_of_aggregate_states[aggregate_functions_destroy_index];
/** We increase aggregate_functions_destroy_index because by function contract if insertResultIntoAndDestroyBatch
/** We increase aggregate_functions_destroy_index because by function contract if insertResultIntoAndDestroyBatch
* throws exception, it also must destroy all necessary states.
* Then code need to continue to destroy other aggregate function states with next function index.
*/
size_t destroy_index = aggregate_functions_destroy_index;
++aggregate_functions_destroy_index;
size_t destroy_index = aggregate_functions_destroy_index;
++aggregate_functions_destroy_index;
bool is_state = aggregate_functions[destroy_index]->isState();
bool destroy_place_after_insert = !is_state;
bool is_state = aggregate_functions[destroy_index]->isState();
bool destroy_place_after_insert = !is_state;
aggregate_functions[destroy_index]->insertResultIntoAndDestroyBatch(places.size(), places.data(), offset, *final_aggregate_column, arena, destroy_place_after_insert);
}
aggregate_functions[destroy_index]->insertResultIntoAndDestroyBatch(
places.size(), places.data(), offset, *final_aggregate_column, arena, destroy_place_after_insert);
}
catch (...)
{
exception = std::current_exception();
}
for (; aggregate_functions_destroy_index < params.aggregates_size; ++aggregate_functions_destroy_index)
{
size_t offset = offsets_of_aggregate_states[aggregate_functions_destroy_index];
aggregate_functions[aggregate_functions_destroy_index]->destroyBatch(places.size(), places.data(), offset);
}
if (exception)
std::rethrow_exception(exception);
}
else
#endif
catch (...)
{
data.forEachValue([&](const auto & key, auto & mapped)
{
method.insertKeyIntoColumns(key, key_columns, key_sizes_ref);
insertAggregatesIntoColumns(mapped, final_aggregate_columns, arena);
});
exception = std::current_exception();
}
for (; aggregate_functions_destroy_index < params.aggregates_size; ++aggregate_functions_destroy_index)
{
size_t offset = offsets_of_aggregate_states[aggregate_functions_destroy_index];
aggregate_functions[aggregate_functions_destroy_index]->destroyBatch(places.size(), places.data(), offset);
}
if (exception)
std::rethrow_exception(exception);
}
template <typename Method, typename Table>
@ -1898,7 +1802,7 @@ void NO_INLINE Aggregator::mergeDataNullKey(
}
template <typename Method, typename Table>
template <typename Method, bool use_compiled_functions, typename Table>
void NO_INLINE Aggregator::mergeDataImpl(
Table & table_dst,
Table & table_src,
@ -1907,52 +1811,34 @@ void NO_INLINE Aggregator::mergeDataImpl(
if constexpr (Method::low_cardinality_optimization)
mergeDataNullKey<Method, Table>(table_dst, table_src, arena);
table_src.mergeToViaEmplace(table_dst, [&](AggregateDataPtr & __restrict dst, AggregateDataPtr & __restrict src, bool inserted)
{
if (!inserted)
{
size_t compiled_functions_count = 0;
#if USE_EMBEDDED_COMPILER
if (compiled_functions)
{
auto merge_aggregate_states_function_typed = compiled_functions->merge_aggregate_states_function;
table_src.mergeToViaEmplace(table_dst, [&](AggregateDataPtr & __restrict dst, AggregateDataPtr & __restrict src, bool inserted)
{
if (!inserted)
if constexpr (use_compiled_functions)
{
merge_aggregate_states_function_typed(dst, src);
for (size_t i = compiled_functions->functions_count; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(dst + offsets_of_aggregate_states[i], src + offsets_of_aggregate_states[i], arena);
for (size_t i = compiled_functions->functions_count; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(src + offsets_of_aggregate_states[i]);
const auto & compiled_functions = compiled_aggregate_functions_holder->compiled_aggregate_functions;
compiled_functions.merge_aggregate_states_function(dst, src);
compiled_functions_count = compiled_aggregate_functions_holder->compiled_aggregate_functions.functions_count;
}
else
{
dst = src;
}
src = nullptr;
});
}
else
#endif
{
table_src.mergeToViaEmplace(table_dst, [&](AggregateDataPtr & __restrict dst, AggregateDataPtr & __restrict src, bool inserted)
for (size_t i = compiled_functions_count; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(dst + offsets_of_aggregate_states[i], src + offsets_of_aggregate_states[i], arena);
for (size_t i = compiled_functions_count; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(src + offsets_of_aggregate_states[i]);
}
else
{
if (!inserted)
{
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(dst + offsets_of_aggregate_states[i], src + offsets_of_aggregate_states[i], arena);
dst = src;
}
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(src + offsets_of_aggregate_states[i]);
}
else
{
dst = src;
}
src = nullptr;
});
}
src = nullptr;
});
table_src.clearAndShrink();
}
@ -2056,21 +1942,41 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
AggregatedDataVariants & current = *non_empty_data[result_num];
if (!no_more_keys)
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data,
getDataVariant<Method>(current).data,
res->aggregates_pool);
{
#if USE_EMBEDDED_COMPILER
if (compiled_aggregate_functions_holder)
{
mergeDataImpl<Method, true>(
getDataVariant<Method>(*res).data,
getDataVariant<Method>(current).data,
res->aggregates_pool);
}
else
#endif
{
mergeDataImpl<Method, false>(
getDataVariant<Method>(*res).data,
getDataVariant<Method>(current).data,
res->aggregates_pool);
}
}
else if (res->without_key)
{
/// TODO: Use compile function
mergeDataNoMoreKeysImpl<Method>(
getDataVariant<Method>(*res).data,
res->without_key,
getDataVariant<Method>(current).data,
res->aggregates_pool);
}
else
{
/// TODO: Use compile function
mergeDataOnlyExistingKeysImpl<Method>(
getDataVariant<Method>(*res).data,
getDataVariant<Method>(current).data,
res->aggregates_pool);
}
/// `current` will not destroy the states of aggregate functions in the destructor
current.aggregator = nullptr;
@ -2095,11 +2001,22 @@ void NO_INLINE Aggregator::mergeBucketImpl(
return;
AggregatedDataVariants & current = *data[result_num];
mergeDataImpl<Method>(
getDataVariant<Method>(*res).data.impls[bucket],
getDataVariant<Method>(current).data.impls[bucket],
arena);
#if USE_EMBEDDED_COMPILER
if (compiled_aggregate_functions_holder)
{
mergeDataImpl<Method, true>(
getDataVariant<Method>(*res).data.impls[bucket],
getDataVariant<Method>(current).data.impls[bucket],
arena);
}
else
#endif
{
mergeDataImpl<Method, false>(
getDataVariant<Method>(*res).data.impls[bucket],
getDataVariant<Method>(current).data.impls[bucket],
arena);
}
}
}

View File

@ -852,6 +852,8 @@ using AggregatedDataVariantsPtr = std::shared_ptr<AggregatedDataVariants>;
using ManyAggregatedDataVariants = std::vector<AggregatedDataVariantsPtr>;
using ManyAggregatedDataVariantsPtr = std::shared_ptr<ManyAggregatedDataVariants>;
class CompiledAggregateFunctionsHolder;
/** How are "total" values calculated with WITH TOTALS?
* (For more details, see TotalsHavingTransform.)
*
@ -911,7 +913,6 @@ public:
bool compile_aggregate_expressions;
size_t min_count_to_compile_aggregate_expression;
size_t aggregation_method;
Params(
const Block & src_header_,
@ -924,7 +925,6 @@ public:
size_t min_free_disk_space_,
bool compile_aggregate_expressions_,
size_t min_count_to_compile_aggregate_expression_,
size_t aggregation_method_,
const Block & intermediate_header_ = {})
: src_header(src_header_),
intermediate_header(intermediate_header_),
@ -936,15 +936,14 @@ public:
tmp_volume(tmp_volume_), max_threads(max_threads_),
min_free_disk_space(min_free_disk_space_),
compile_aggregate_expressions(compile_aggregate_expressions_),
min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_),
aggregation_method(aggregation_method_)
min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_)
{
}
/// Only parameters that matter during merge.
Params(const Block & intermediate_header_,
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0, false, 0, 0)
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0, false, 0)
{
intermediate_header = intermediate_header_;
}
@ -1087,7 +1086,7 @@ private:
TemporaryFiles temporary_files;
#if USE_EMBEDDED_COMPILER
std::optional<CompiledAggregateFunctions> compiled_functions;
std::shared_ptr<CompiledAggregateFunctionsHolder> compiled_aggregate_functions_holder;
#endif
/** Try to compile aggregate functions.
@ -1121,7 +1120,7 @@ private:
AggregateDataPtr overflow_row) const;
/// Specialization for a particular value no_more_keys.
template <bool no_more_keys, typename Method>
template <bool no_more_keys, bool use_compiled_expressions, typename Method>
void executeImplBatch(
Method & method,
typename Method::State & state,
@ -1130,31 +1129,6 @@ private:
AggregateFunctionInstruction * aggregate_instructions,
AggregateDataPtr overflow_row) const;
template <bool no_more_keys, typename Method>
void handleAggregationJIT(
Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions) const;
template <bool no_more_keys, typename Method>
void handleAggregationJITV2(
Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions) const;
template <bool no_more_keys, typename Method>
void handleAggregationDefault(
Method & method,
typename Method::State & state,
Arena * aggregates_pool,
size_t rows,
AggregateFunctionInstruction * aggregate_instructions,
AggregateDataPtr overflow_row) const;
/// For case when there are no keys (all aggregate into one row).
static void executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
@ -1183,7 +1157,7 @@ private:
Arena * arena) const;
/// Merge data from hash table `src` into `dst`.
template <typename Method, typename Table>
template <typename Method, bool use_compiled_functions, typename Table>
void mergeDataImpl(
Table & table_dst,
Table & table_src,
@ -1227,7 +1201,7 @@ private:
MutableColumns & final_aggregate_columns,
Arena * arena) const;
template <typename Method, typename Table>
template <typename Method, bool use_compiled_functions, typename Table>
void convertToBlockImplFinal(
Method & method,
Table & data,

View File

@ -1,6 +1,6 @@
#include <Interpreters/AsynchronousMetrics.h>
#include <Interpreters/AsynchronousMetricLog.h>
#include <Interpreters/ExpressionJIT.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/Context.h>
#include <Common/Exception.h>

View File

@ -45,7 +45,6 @@
#include <Access/SettingsConstraints.h>
#include <Access/ExternalAuthenticators.h>
#include <Access/GSSAcceptor.h>
#include <Interpreters/ExpressionJIT.h>
#include <Dictionaries/Embedded/GeoDictionariesLoader.h>
#include <Interpreters/EmbeddedDictionaries.h>
#include <Interpreters/ExternalDictionariesLoader.h>

View File

@ -1,4 +1,6 @@
#include <Interpreters/ExpressionJIT.h>
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_EMBEDDED_COMPILER
@ -20,6 +22,7 @@
#include <Interpreters/JIT/CHJIT.h>
#include <Interpreters/JIT/CompileDAG.h>
#include <Interpreters/JIT/compileFunction.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Interpreters/ActionsDAG.h>
namespace DB
@ -42,15 +45,16 @@ static Poco::Logger * getLogger()
return &logger;
}
class CompiledFunctionHolder
class CompiledFunctionHolder : public CompiledExpressionCacheEntry
{
public:
explicit CompiledFunctionHolder(CompiledFunction compiled_function_)
: compiled_function(compiled_function_)
: CompiledExpressionCacheEntry(compiled_function_.compiled_module.size)
, compiled_function(compiled_function_)
{}
~CompiledFunctionHolder()
~CompiledFunctionHolder() override
{
getJITInstance().deleteCompiledModule(compiled_function.compiled_module);
}
@ -287,19 +291,18 @@ static FunctionBasePtr compile(
{
LOG_TRACE(getLogger(), "Compile expression {}", llvm_function->getName());
auto compiled_function = compileFunction(getJITInstance(), *llvm_function);
auto compiled_function_holder = std::make_shared<CompiledFunctionHolder>(compiled_function);
return std::make_shared<CompiledFunctionCacheEntry>(std::move(compiled_function_holder), compiled_function.compiled_module.size);
return std::make_shared<CompiledFunctionHolder>(compiled_function);
});
llvm_function->setCompiledFunction(compiled_function_cache_entry->getCompiledFunctionHolder());
std::shared_ptr<CompiledFunctionHolder> compiled_function_holder = std::static_pointer_cast<CompiledFunctionHolder>(compiled_function_cache_entry);
llvm_function->setCompiledFunction(std::move(compiled_function_holder));
}
else
{
auto compiled_function = compileFunction(getJITInstance(), *llvm_function);
auto compiled_function_ptr = std::make_shared<CompiledFunctionHolder>(compiled_function);
auto compiled_function_holder = std::make_shared<CompiledFunctionHolder>(compiled_function);
llvm_function->setCompiledFunction(compiled_function_ptr);
llvm_function->setCompiledFunction(std::move(compiled_function_holder));
}
return llvm_function;
@ -568,25 +571,6 @@ void ActionsDAG::compileFunctions(size_t min_count_to_compile_expression)
}
}
CompiledExpressionCacheFactory & CompiledExpressionCacheFactory::instance()
{
static CompiledExpressionCacheFactory factory;
return factory;
}
void CompiledExpressionCacheFactory::init(size_t cache_size)
{
if (cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "CompiledExpressionCache was already initialized");
cache = std::make_unique<CompiledExpressionCache>(cache_size);
}
CompiledExpressionCache * CompiledExpressionCacheFactory::tryGetCache()
{
return cache.get();
}
}
#endif

View File

@ -1,63 +0,0 @@
#pragma once
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_EMBEDDED_COMPILER
# include <Common/LRUCache.h>
# include <Common/HashTable/Hash.h>
namespace DB
{
class CompiledFunctionHolder;
class CompiledFunctionCacheEntry
{
public:
CompiledFunctionCacheEntry(std::shared_ptr<CompiledFunctionHolder> compiled_function_holder_, size_t compiled_function_size_)
: compiled_function_holder(std::move(compiled_function_holder_))
, compiled_function_size(compiled_function_size_)
{}
std::shared_ptr<CompiledFunctionHolder> getCompiledFunctionHolder() const { return compiled_function_holder; }
size_t getCompiledFunctionSize() const { return compiled_function_size; }
private:
std::shared_ptr<CompiledFunctionHolder> compiled_function_holder;
size_t compiled_function_size;
};
struct CompiledFunctionWeightFunction
{
size_t operator()(const CompiledFunctionCacheEntry & compiled_function) const
{
return compiled_function.getCompiledFunctionSize();
}
};
class CompiledExpressionCache : public LRUCache<UInt128, CompiledFunctionCacheEntry, UInt128Hash, CompiledFunctionWeightFunction>
{
public:
using Base = LRUCache<UInt128, CompiledFunctionCacheEntry, UInt128Hash, CompiledFunctionWeightFunction>;
using Base::Base;
};
class CompiledExpressionCacheFactory
{
private:
std::unique_ptr<CompiledExpressionCache> cache;
public:
static CompiledExpressionCacheFactory & instance();
void init(size_t cache_size);
CompiledExpressionCache * tryGetCache();
};
}
#endif

View File

@ -2040,8 +2040,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression,
settings.aggregation_method);
settings.min_count_to_compile_aggregate_expression);
SortDescription group_by_sort_description;
@ -2145,8 +2144,7 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modific
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression,
settings.aggregation_method);
settings.min_count_to_compile_aggregate_expression);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, true);

View File

@ -25,7 +25,7 @@
#include <Interpreters/MetricLog.h>
#include <Interpreters/AsynchronousMetricLog.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/ExpressionJIT.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Access/ContextAccess.h>
#include <Access/AllowedClientHosts.h>
#include <Databases/IDatabase.h>

View File

@ -0,0 +1,34 @@
#include "CompiledExpressionCache.h"
#if USE_EMBEDDED_COMPILER
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
CompiledExpressionCacheFactory & CompiledExpressionCacheFactory::instance()
{
static CompiledExpressionCacheFactory factory;
return factory;
}
void CompiledExpressionCacheFactory::init(size_t cache_size)
{
if (cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "CompiledExpressionCache was already initialized");
cache = std::make_unique<CompiledExpressionCache>(cache_size);
}
CompiledExpressionCache * CompiledExpressionCacheFactory::tryGetCache()
{
return cache.get();
}
}
#endif

View File

@ -0,0 +1,61 @@
#pragma once
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_EMBEDDED_COMPILER
# include <Common/LRUCache.h>
# include <Common/HashTable/Hash.h>
# include <Interpreters/JIT/CHJIT.h>
namespace DB
{
class CompiledExpressionCacheEntry
{
public:
explicit CompiledExpressionCacheEntry(size_t compiled_expression_size_)
: compiled_expression_size(compiled_expression_size_)
{}
size_t getCompiledExpressionSize() const { return compiled_expression_size; }
virtual ~CompiledExpressionCacheEntry() {}
private:
size_t compiled_expression_size = 0;
};
struct CompiledFunctionWeightFunction
{
size_t operator()(const CompiledExpressionCacheEntry & compiled_function) const
{
return compiled_function.getCompiledExpressionSize();
}
};
class CompiledExpressionCache : public LRUCache<UInt128, CompiledExpressionCacheEntry, UInt128Hash, CompiledFunctionWeightFunction>
{
public:
using Base = LRUCache<UInt128, CompiledExpressionCacheEntry, UInt128Hash, CompiledFunctionWeightFunction>;
using Base::Base;
};
class CompiledExpressionCacheFactory
{
private:
std::unique_ptr<CompiledExpressionCache> cache;
public:
static CompiledExpressionCacheFactory & instance();
void init(size_t cache_size);
CompiledExpressionCache * tryGetCache();
};
}
#endif

View File

@ -307,137 +307,6 @@ static void compileAddIntoAggregateStatesFunctions(llvm::Module & module, const
auto & context = module.getContext();
llvm::IRBuilder<> b(context);
auto * size_type = b.getIntNTy(sizeof(size_t) * 8);
auto * column_data_type = llvm::StructType::get(b.getInt8PtrTy(), b.getInt8PtrTy());
auto * get_place_func_declaration = llvm::FunctionType::get(b.getInt8Ty()->getPointerTo(), { b.getInt8Ty()->getPointerTo(), size_type }, /*isVarArg=*/false);
auto * aggregate_loop_func_declaration = llvm::FunctionType::get(b.getVoidTy(), { size_type, column_data_type->getPointerTo(), get_place_func_declaration->getPointerTo(), b.getInt8Ty()->getPointerTo() }, false);
auto * aggregate_loop_func_definition = llvm::Function::Create(aggregate_loop_func_declaration, llvm::Function::ExternalLinkage, name, module);
auto * arguments = aggregate_loop_func_definition->args().begin();
llvm::Value * rows_count_arg = arguments++;
llvm::Value * columns_arg = arguments++;
llvm::Value * get_place_function_arg = arguments++;
llvm::Value * get_place_function_context_arg = arguments++;
/// Initialize ColumnDataPlaceholder llvm representation of ColumnData
auto * entry = llvm::BasicBlock::Create(b.getContext(), "entry", aggregate_loop_func_definition);
b.SetInsertPoint(entry);
std::vector<ColumnDataPlaceholder> columns;
size_t previous_columns_size = 0;
for (const auto & function : functions)
{
auto argument_types = function.function->getArgumentTypes();
ColumnDataPlaceholder data_placeholder;
size_t function_arguments_size = argument_types.size();
for (size_t column_argument_index = 0; column_argument_index < function_arguments_size; ++column_argument_index)
{
const auto & argument_type = argument_types[column_argument_index];
auto * data = b.CreateLoad(column_data_type, b.CreateConstInBoundsGEP1_32(column_data_type, columns_arg, previous_columns_size + column_argument_index));
data_placeholder.data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(argument_type))->getPointerTo());
data_placeholder.null_init = argument_type->isNullable() ? b.CreateExtractValue(data, {1}) : nullptr;
columns.emplace_back(data_placeholder);
}
previous_columns_size += function_arguments_size;
}
/// Initialize loop
auto * end = llvm::BasicBlock::Create(b.getContext(), "end", aggregate_loop_func_definition);
auto * loop = llvm::BasicBlock::Create(b.getContext(), "loop", aggregate_loop_func_definition);
b.CreateCondBr(b.CreateICmpEQ(rows_count_arg, llvm::ConstantInt::get(size_type, 0)), end, loop);
b.SetInsertPoint(loop);
auto * counter_phi = b.CreatePHI(rows_count_arg->getType(), 2);
counter_phi->addIncoming(llvm::ConstantInt::get(size_type, 0), entry);
for (auto & col : columns)
{
col.data = b.CreatePHI(col.data_init->getType(), 2);
col.data->addIncoming(col.data_init, entry);
if (col.null_init)
{
col.null = b.CreatePHI(col.null_init->getType(), 2);
col.null->addIncoming(col.null_init, entry);
}
}
auto * aggregation_place = b.CreateCall(get_place_func_declaration, get_place_function_arg, { get_place_function_context_arg, counter_phi });
previous_columns_size = 0;
for (const auto & function : functions)
{
size_t aggregate_function_offset = function.aggregate_data_offset;
const auto * aggregate_function_ptr = function.function;
auto arguments_types = function.function->getArgumentTypes();
std::vector<llvm::Value *> arguments_values;
size_t function_arguments_size = arguments_types.size();
arguments_values.resize(function_arguments_size);
for (size_t column_argument_index = 0; column_argument_index < function_arguments_size; ++column_argument_index)
{
auto * column_argument_data = columns[previous_columns_size + column_argument_index].data;
auto * column_argument_null_data = columns[previous_columns_size + column_argument_index].null;
auto & argument_type = arguments_types[column_argument_index];
auto * value = b.CreateLoad(toNativeType(b, removeNullable(argument_type)), column_argument_data);
if (!argument_type->isNullable())
{
arguments_values[column_argument_index] = value;
continue;
}
auto * is_null = b.CreateICmpNE(b.CreateLoad(b.getInt8Ty(), column_argument_null_data), b.getInt8(0));
auto * nullable_unitilized = llvm::Constant::getNullValue(toNativeType(b, argument_type));
auto * nullable_value = b.CreateInsertValue(b.CreateInsertValue(nullable_unitilized, value, {0}), is_null, {1});
arguments_values[column_argument_index] = nullable_value;
}
auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_32(nullptr, aggregation_place, aggregate_function_offset);
aggregate_function_ptr->compileAdd(b, aggregation_place_with_offset, arguments_types, arguments_values);
previous_columns_size += function_arguments_size;
}
/// End of loop
auto * cur_block = b.GetInsertBlock();
for (auto & col : columns)
{
col.data->addIncoming(b.CreateConstInBoundsGEP1_32(nullptr, col.data, 1), cur_block);
if (col.null)
col.null->addIncoming(b.CreateConstInBoundsGEP1_32(nullptr, col.null, 1), cur_block);
}
auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1));
counter_phi->addIncoming(value, cur_block);
b.CreateCondBr(b.CreateICmpEQ(value, rows_count_arg), end, loop);
b.SetInsertPoint(end);
b.CreateRetVoid();
}
static void compileAddIntoAggregateStatesFunctionsV2(llvm::Module & module, const std::vector<AggregateFunctionWithOffset> & functions, const std::string & name)
{
auto & context = module.getContext();
llvm::IRBuilder<> b(context);
auto * size_type = b.getIntNTy(sizeof(size_t) * 8);
auto * places_type = b.getInt8Ty()->getPointerTo()->getPointerTo();
auto * column_data_type = llvm::StructType::get(b.getInt8PtrTy(), b.getInt8PtrTy());
@ -699,7 +568,6 @@ CompiledAggregateFunctions compileAggregateFunctons(CHJIT & jit, const std::vect
{
std::string create_aggregate_states_functions_name = functions_dump_name + "_create";
std::string add_aggregate_states_functions_name = functions_dump_name + "_add";
std::string add_aggregate_states_functions_name_v2 = functions_dump_name + "_add_v2";
std::string merge_aggregate_states_functions_name = functions_dump_name + "_merge";
std::string insert_aggregate_states_functions_name = functions_dump_name + "_insert";
@ -707,20 +575,17 @@ CompiledAggregateFunctions compileAggregateFunctons(CHJIT & jit, const std::vect
{
compileCreateAggregateStatesFunctions(module, functions, create_aggregate_states_functions_name);
compileAddIntoAggregateStatesFunctions(module, functions, add_aggregate_states_functions_name);
compileAddIntoAggregateStatesFunctionsV2(module, functions, add_aggregate_states_functions_name_v2);
compileMergeAggregatesStates(module, functions, merge_aggregate_states_functions_name);
compileInsertAggregatesIntoResultColumns(module, functions, insert_aggregate_states_functions_name);
});
auto create_aggregate_states_function = reinterpret_cast<JITCreateAggregateStatesFunction>(compiled_module.function_name_to_symbol[create_aggregate_states_functions_name]);
auto add_into_aggregate_states_function = reinterpret_cast<JITAddIntoAggregateStatesFunction>(compiled_module.function_name_to_symbol[add_aggregate_states_functions_name]);
auto add_into_aggregate_states_function_v2 = reinterpret_cast<JITAddIntoAggregateStatesFunctionV2>(compiled_module.function_name_to_symbol[add_aggregate_states_functions_name_v2]);
auto merge_aggregate_states_function = reinterpret_cast<JITMergeAggregateStatesFunction>(compiled_module.function_name_to_symbol[merge_aggregate_states_functions_name]);
auto insert_aggregate_states_function = reinterpret_cast<JITInsertAggregatesIntoColumnsFunction>(compiled_module.function_name_to_symbol[insert_aggregate_states_functions_name]);
assert(create_aggregate_states_function);
assert(add_into_aggregate_states_function);
assert(add_into_aggregate_states_function_v2);
assert(merge_aggregate_states_function);
assert(insert_aggregate_states_function);
@ -728,7 +593,6 @@ CompiledAggregateFunctions compileAggregateFunctons(CHJIT & jit, const std::vect
{
.create_aggregate_states_function = create_aggregate_states_function,
.add_into_aggregate_states_function = add_into_aggregate_states_function,
.add_into_aggregate_states_function_v2 = add_into_aggregate_states_function_v2,
.merge_aggregate_states_function = merge_aggregate_states_function,
.insert_aggregates_into_columns_function = insert_aggregate_states_function,

View File

@ -41,13 +41,8 @@ struct CompiledFunction
};
/** Compile function to native jit code using CHJIT instance.
* Function is compiled as single module.
* After this function execution, code for function will be compiled and can be queried using
* findCompiledFunction with function name.
* Compiled function can be safely casted to JITCompiledFunction type and must be called with
* valid ColumnData and ColumnDataRowsSize.
* It is important that ColumnData parameter of JITCompiledFunction is result column,
* and will be filled by compiled function.
* It is client responsibility to match ColumnData arguments size with
* function arguments size and additional ColumnData for result.
*/
CompiledFunction compileFunction(CHJIT & jit, const IFunctionBase & function);
@ -57,12 +52,8 @@ struct AggregateFunctionWithOffset
size_t aggregate_data_offset;
};
using GetAggregateDataContext = char *;
using GetAggregateDataFunction = AggregateDataPtr (*)(GetAggregateDataContext, size_t);
using JITCreateAggregateStatesFunction = void (*)(AggregateDataPtr);
using JITAddIntoAggregateStatesFunction = void (*)(ColumnDataRowsSize, ColumnData *, GetAggregateDataFunction, GetAggregateDataContext);
using JITAddIntoAggregateStatesFunctionV2 = void (*)(ColumnDataRowsSize, ColumnData *, AggregateDataPtr *);
using JITAddIntoAggregateStatesFunction = void (*)(ColumnDataRowsSize, ColumnData *, AggregateDataPtr *);
using JITMergeAggregateStatesFunction = void (*)(AggregateDataPtr, AggregateDataPtr);
using JITInsertAggregatesIntoColumnsFunction = void (*)(ColumnDataRowsSize, ColumnData *, AggregateDataPtr *);
@ -70,7 +61,6 @@ struct CompiledAggregateFunctions
{
JITCreateAggregateStatesFunction create_aggregate_states_function;
JITAddIntoAggregateStatesFunction add_into_aggregate_states_function;
JITAddIntoAggregateStatesFunctionV2 add_into_aggregate_states_function_v2;
JITMergeAggregateStatesFunction merge_aggregate_states_function;
JITInsertAggregatesIntoColumnsFunction insert_aggregates_into_columns_function;

View File

@ -303,7 +303,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
settings.min_free_disk_space_for_temporary_data,
settings.compile_expressions,
settings.min_count_to_compile_aggregate_expression,
settings.aggregation_method,
header_before_aggregation); // The source header is also an intermediate header
transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), query_info.projection->aggregate_final);
@ -334,8 +333,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
settings.min_count_to_compile_aggregate_expression,
settings.aggregation_method);
settings.min_count_to_compile_aggregate_expression);
transform_params = std::make_shared<AggregatingTransformParams>(std::move(params), query_info.projection->aggregate_final);
}