diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index 88f7564a7f2..9d7a4275dc1 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -50,7 +50,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/src/Core/Settings.h b/src/Core/Settings.h index feebaad3ce7..c260aa41230 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -108,7 +108,6 @@ class IColumn; M(UInt64, min_count_to_compile_expression, 3, "The number of identical expressions before they are JIT-compiled", 0) \ M(Bool, compile_aggregate_expressions, true, "Compile aggregate functions to native code.", 0) \ M(UInt64, min_count_to_compile_aggregate_expression, 0, "The number of identical aggreagte expressions before they are JIT-compiled", 0) \ - M(UInt64, aggregation_method, 0, "Aggregation method", 0) \ M(UInt64, group_by_two_level_threshold, 100000, "From what number of keys, a two-level aggregation starts. 0 - the threshold is not set.", 0) \ M(UInt64, group_by_two_level_threshold_bytes, 50000000, "From what size of the aggregation state in bytes, a two-level aggregation begins to be used. 0 - the threshold is not set. Two-level aggregation is used when at least one of the thresholds is triggered.", 0) \ M(Bool, distributed_aggregation_memory_efficient, true, "Is the memory-saving mode of distributed aggregation enabled.", 0) \ diff --git a/src/DataStreams/TTLAggregationAlgorithm.cpp b/src/DataStreams/TTLAggregationAlgorithm.cpp index ceebae1ab1c..66792dcfdb2 100644 --- a/src/DataStreams/TTLAggregationAlgorithm.cpp +++ b/src/DataStreams/TTLAggregationAlgorithm.cpp @@ -34,7 +34,7 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm( false, settings.max_rows_to_group_by, settings.group_by_overflow_mode, 0, 0, settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set, storage_.getContext()->getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data, - settings.compile_aggregate_expressions, settings.min_count_to_compile_aggregate_expression, settings.aggregation_method); + settings.compile_aggregate_expressions, settings.min_count_to_compile_aggregate_expression); aggregator = std::make_unique(params); } diff --git a/src/Interpreters/ActionsDAG.cpp b/src/Interpreters/ActionsDAG.cpp index 9fa48f6ceab..1518706f0a6 100644 --- a/src/Interpreters/ActionsDAG.cpp +++ b/src/Interpreters/ActionsDAG.cpp @@ -8,7 +8,6 @@ #include #include #include -#include #include #include diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index 673b9d600f4..3543783494b 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -24,6 +24,7 @@ #include #include #include +#include namespace ProfileEvents @@ -222,6 +223,30 @@ static CHJIT & getJITInstance() return jit; } +class CompiledAggregateFunctionsHolder final : public CompiledExpressionCacheEntry +{ +public: + explicit CompiledAggregateFunctionsHolder(CompiledAggregateFunctions compiled_function_) + : CompiledExpressionCacheEntry(compiled_function_.compiled_module.size) + , compiled_aggregate_functions(compiled_function_) + {} + + ~CompiledAggregateFunctionsHolder() override + { + std::string symbol_names; + for (const auto & [name, _] : compiled_aggregate_functions.compiled_module.function_name_to_symbol) + { + symbol_names += name; + symbol_names += ' '; + } + + std::cerr << "CompiledAggregateFunctionsHolder::~CompiledAggregateFunctionsHolder " << symbol_names << std::endl; + getJITInstance().deleteCompiledModule(compiled_aggregate_functions.compiled_module); + } + + CompiledAggregateFunctions compiled_aggregate_functions; +}; + #endif Aggregator::Aggregator(const Params & params_) @@ -287,7 +312,6 @@ Aggregator::Aggregator(const Params & params_) void Aggregator::compileAggregateFunctions() { static std::unordered_map aggregate_functions_description_to_count; - static std::unordered_map aggregation_functions_dump_to_add_compiled; static std::mutex mtx; if (!params.compile_aggregate_expressions || params.overflow_row) @@ -324,36 +348,37 @@ void Aggregator::compileAggregateFunctions() if (functions_to_compile.empty() || functions_to_compile.size() != aggregate_functions.size()) return; - CompiledAggregateFunctions compiled_aggregate_functions; + SipHash aggregate_functions_description_hash; + aggregate_functions_description_hash.update(functions_description); - SipHash aggregate_function_description_hash; - aggregate_function_description_hash.update(functions_description); - - UInt128 aggregate_function_description_hash_result; - aggregate_function_description_hash.get128(aggregate_function_description_hash_result); + UInt128 aggregate_functions_description_hash_key; + aggregate_functions_description_hash.get128(aggregate_functions_description_hash_key); { std::lock_guard lock(mtx); - if (aggregate_functions_description_to_count[aggregate_function_description_hash_result]++ < params.min_count_to_compile_aggregate_expression) + if (aggregate_functions_description_to_count[aggregate_functions_description_hash_key]++ < params.min_count_to_compile_aggregate_expression) return; - auto it = aggregation_functions_dump_to_add_compiled.find(functions_description); - if (it != aggregation_functions_dump_to_add_compiled.end()) + if (auto * compilation_cache = CompiledExpressionCacheFactory::instance().tryGetCache()) { - compiled_aggregate_functions = it->second; + auto [compiled_function_cache_entry, _] = compilation_cache->getOrSet(aggregate_functions_description_hash_key, [&] () + { + LOG_TRACE(log, "Compile expression {}", functions_description); + + auto compiled_aggregate_functions = compileAggregateFunctons(getJITInstance(), functions_to_compile, functions_description); + return std::make_shared(std::move(compiled_aggregate_functions)); + }); + + compiled_aggregate_functions_holder = std::static_pointer_cast(compiled_function_cache_entry); } else { LOG_TRACE(log, "Compile expression {}", functions_description); - - compiled_aggregate_functions = compileAggregateFunctons(getJITInstance(), functions_to_compile, functions_description); - aggregation_functions_dump_to_add_compiled[functions_description] = compiled_aggregate_functions; + auto compiled_aggregate_functions = compileAggregateFunctons(getJITInstance(), functions_to_compile, functions_description); + compiled_aggregate_functions_holder = std::make_shared(std::move(compiled_aggregate_functions)); } } - - LOG_TRACE(log, "Use compiled expression {}", functions_description); - compiled_functions.emplace(std::move(compiled_aggregate_functions)); } #endif @@ -568,232 +593,34 @@ void NO_INLINE Aggregator::executeImpl( typename Method::State state(key_columns, key_sizes, aggregation_state_cache); if (!no_more_keys) - executeImplBatch(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row); - else - executeImplBatch(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row); -} - + { #if USE_EMBEDDED_COMPILER - -template -void NO_INLINE Aggregator::handleAggregationJIT( - Method & method, - typename Method::State & state, - Arena * aggregates_pool, - size_t rows, - AggregateFunctionInstruction * aggregate_instructions) const -{ - std::vector columns_data; - columns_data.reserve(aggregate_functions.size()); - - /// Add values to the aggregate functions. - for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst) - { - size_t arguments_size = inst->that->getArgumentTypes().size(); - for (size_t i = 0; i < arguments_size; ++i) - columns_data.emplace_back(getColumnData(inst->batch_arguments[i])); - } - - auto add_into_aggregate_states_function = compiled_functions->add_into_aggregate_states_function; - auto create_aggregate_states_function = compiled_functions->create_aggregate_states_function; - - std::unique_ptr places; - - bool not_all_functions_compiled = compiled_functions->functions_count != offsets_of_aggregate_states.size(); - if (not_all_functions_compiled) - places.reset(new AggregateDataPtr[rows]); - - auto get_aggregate_data = [&](size_t row) -> AggregateDataPtr - { - AggregateDataPtr aggregate_data; - - if constexpr (!no_more_keys) + if (compiled_aggregate_functions_holder) { - auto emplace_result = state.emplaceKey(method.data, row, *aggregates_pool); - - /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. - if (emplace_result.isInserted()) - { - /// exception-safety - if you can not allocate memory or create states, then destructors will not be called. - emplace_result.setMapped(nullptr); - - aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states); - create_aggregate_states_function(aggregate_data); - - emplace_result.setMapped(aggregate_data); - } - else - aggregate_data = emplace_result.getMapped(); - - assert(aggregate_data != nullptr); + executeImplBatch(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row); } else - { - /// Add only if the key already exists. - /// Overflow row is disabled for JIT. - auto find_result = state.findKey(method.data, row, *aggregates_pool); - assert(find_result.getMapped() != nullptr); - - aggregate_data = find_result.getMapped(); - } - - if (not_all_functions_compiled) - places[row] = aggregate_data; - - return aggregate_data; - }; - - GetAggregateDataFunction get_aggregate_data_function = FunctorToStaticMethodAdaptor::unsafeCall; - GetAggregateDataContext get_aggregate_data_context = reinterpret_cast(&get_aggregate_data); - - add_into_aggregate_states_function(rows, columns_data.data(), get_aggregate_data_function, get_aggregate_data_context); - - /// Add values to the aggregate functions. - AggregateFunctionInstruction * inst = aggregate_instructions + compiled_functions->functions_count; - for (; inst->that; ++inst) - { - if (inst->offsets) - inst->batch_that->addBatchArray(rows, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool); - else - inst->batch_that->addBatch(rows, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool); - } -} - -template -void NO_INLINE Aggregator::handleAggregationJITV2( - Method & method, - typename Method::State & state, - Arena * aggregates_pool, - size_t rows, - AggregateFunctionInstruction * aggregate_instructions) const -{ - std::vector columns_data; - columns_data.reserve(aggregate_functions.size()); - - /// Add values to the aggregate functions. - for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst) - { - size_t arguments_size = inst->that->getArgumentTypes().size(); - for (size_t i = 0; i < arguments_size; ++i) - columns_data.emplace_back(getColumnData(inst->batch_arguments[i])); - } - - auto add_into_aggregate_states_function = compiled_functions->add_into_aggregate_states_function_v2; - auto create_aggregate_states_function = compiled_functions->create_aggregate_states_function; - - std::unique_ptr places(new AggregateDataPtr[rows]); - - /// For all rows. - for (size_t i = 0; i < rows; ++i) - { - AggregateDataPtr aggregate_data = nullptr; - - if constexpr (!no_more_keys) - { - auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool); - - /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. - if (emplace_result.isInserted()) - { - /// exception-safety - if you can not allocate memory or create states, then destructors will not be called. - emplace_result.setMapped(nullptr); - - aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states); - create_aggregate_states_function(aggregate_data); - createAggregateStates(compiled_functions->functions_count, aggregate_data); - emplace_result.setMapped(aggregate_data); - } - else - aggregate_data = emplace_result.getMapped(); - - assert(aggregate_data != nullptr); - } - else - { - /// Add only if the key already exists. - auto find_result = state.findKey(method.data, i, *aggregates_pool); - if (find_result.isFound()) - aggregate_data = find_result.getMapped(); - } - - places[i] = aggregate_data; - } - - add_into_aggregate_states_function(rows, columns_data.data(), places.get()); - - /// Add values to the aggregate functions. - AggregateFunctionInstruction * inst = aggregate_instructions + compiled_functions->functions_count; - for (; inst->that; ++inst) - { - if (inst->offsets) - inst->batch_that->addBatchArray(rows, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool); - else - inst->batch_that->addBatch(rows, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool); - } -} - #endif - -template -void NO_INLINE Aggregator::handleAggregationDefault( - Method & method, - typename Method::State & state, - Arena * aggregates_pool, - size_t rows, - AggregateFunctionInstruction * aggregate_instructions, - AggregateDataPtr overflow_row) const -{ - std::unique_ptr places(new AggregateDataPtr[rows]); - - /// For all rows. - for (size_t i = 0; i < rows; ++i) - { - AggregateDataPtr aggregate_data; - - if constexpr (!no_more_keys) { - auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool); - - /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. - if (emplace_result.isInserted()) - { - /// exception-safety - if you can not allocate memory or create states, then destructors will not be called. - emplace_result.setMapped(nullptr); - - aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states); - createAggregateStates(aggregate_data); - - emplace_result.setMapped(aggregate_data); - } - else - aggregate_data = emplace_result.getMapped(); - - assert(aggregate_data != nullptr); + executeImplBatch(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row); } - else - { - /// Add only if the key already exists. - auto find_result = state.findKey(method.data, i, *aggregates_pool); - if (find_result.isFound()) - aggregate_data = find_result.getMapped(); - else - aggregate_data = overflow_row; - } - - places[i] = aggregate_data; } - - /// Add values to the aggregate functions. - for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst) + else { - if (inst->offsets) - inst->batch_that->addBatchArray(rows, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool); +#if USE_EMBEDDED_COMPILER + if (compiled_aggregate_functions_holder) + { + executeImplBatch(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row); + } else - inst->batch_that->addBatch(rows, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool); +#endif + { + executeImplBatch(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row); + } } } -template +template void NO_INLINE Aggregator::executeImplBatch( Method & method, typename Method::State & state, @@ -850,18 +677,86 @@ void NO_INLINE Aggregator::executeImplBatch( } } + size_t compiled_functions_count = 0; + #if USE_EMBEDDED_COMPILER - if (compiled_functions) - { - // if (params.aggregation_method == 0) - // handleAggregationJIT(method, state, aggregates_pool, rows, aggregate_instructions); - // else - handleAggregationJITV2(method, state, aggregates_pool, rows, aggregate_instructions); - } - else + if constexpr (use_compiled_functions) + compiled_functions_count = compiled_aggregate_functions_holder->compiled_aggregate_functions.functions_count; #endif + + std::unique_ptr places(new AggregateDataPtr[rows]); + + /// For all rows. + for (size_t i = 0; i < rows; ++i) { - handleAggregationDefault(method, state, aggregates_pool, rows, aggregate_instructions, overflow_row); + AggregateDataPtr aggregate_data = nullptr; + + if constexpr (!no_more_keys) + { + auto emplace_result = state.emplaceKey(method.data, i, *aggregates_pool); + + /// If a new key is inserted, initialize the states of the aggregate functions, and possibly something related to the key. + if (emplace_result.isInserted()) + { + /// exception-safety - if you can not allocate memory or create states, then destructors will not be called. + emplace_result.setMapped(nullptr); + + aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states); + +#if USE_EMBEDDED_COMPILER + if constexpr (use_compiled_functions) + compiled_aggregate_functions_holder->compiled_aggregate_functions.create_aggregate_states_function(aggregate_data); +#endif + createAggregateStates(compiled_functions_count, aggregate_data); + + emplace_result.setMapped(aggregate_data); + } + else + aggregate_data = emplace_result.getMapped(); + + assert(aggregate_data != nullptr); + } + else + { + /// Add only if the key already exists. + auto find_result = state.findKey(method.data, i, *aggregates_pool); + if (find_result.isFound()) + aggregate_data = find_result.getMapped(); + else + aggregate_data = overflow_row; + } + + places[i] = aggregate_data; + } + +#if USE_EMBEDDED_COMPILER + if constexpr (use_compiled_functions) + { + std::vector columns_data; + columns_data.reserve(aggregate_functions.size()); + + for (size_t compiled_function_index = 0; compiled_function_index < compiled_functions_count; ++compiled_function_index) + { + AggregateFunctionInstruction * inst = aggregate_instructions + compiled_function_index; + size_t arguments_size = inst->that->getArgumentTypes().size(); + + for (size_t i = 0; i < arguments_size; ++i) + columns_data.emplace_back(getColumnData(inst->batch_arguments[i])); + } + + auto add_into_aggregate_states_function = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function; + add_into_aggregate_states_function(rows, columns_data.data(), places.get()); + } +#endif + + /// Add values to the aggregate functions. + AggregateFunctionInstruction * inst = aggregate_instructions + compiled_functions_count; + for (; inst->that; ++inst) + { + if (inst->offsets) + inst->batch_that->addBatchArray(rows, places.get(), inst->state_offset, inst->batch_arguments, inst->offsets, aggregates_pool); + else + inst->batch_that->addBatch(rows, places.get(), inst->state_offset, inst->batch_arguments, aggregates_pool); } } @@ -1304,9 +1199,22 @@ void Aggregator::convertToBlockImpl( raw_key_columns.push_back(column.get()); if (final) - convertToBlockImplFinal(method, data, std::move(raw_key_columns), final_aggregate_columns, arena); + { +#if USE_EMBEDDED_COMPILER + if (compiled_aggregate_functions_holder) + { + convertToBlockImplFinal(method, data, std::move(raw_key_columns), final_aggregate_columns, arena); + } + else +#endif + { + convertToBlockImplFinal(method, data, std::move(raw_key_columns), final_aggregate_columns, arena); + } + } else + { convertToBlockImplNotFinal(method, data, std::move(raw_key_columns), aggregate_columns); + } /// In order to release memory early. data.clearAndShrink(); } @@ -1380,7 +1288,7 @@ inline void Aggregator::insertAggregatesIntoColumns( } -template +template void NO_INLINE Aggregator::convertToBlockImplFinal( Method & method, Table & data, @@ -1400,85 +1308,81 @@ void NO_INLINE Aggregator::convertToBlockImplFinal( auto shuffled_key_sizes = method.shuffleKeyColumns(key_columns, key_sizes); const auto & key_sizes_ref = shuffled_key_sizes ? *shuffled_key_sizes : key_sizes; -#if USE_EMBEDDED_COMPILER - if (compiled_functions) + PaddedPODArray places; + places.reserve(data.size()); + + data.forEachValue([&](const auto & key, auto & mapped) { - PaddedPODArray places; - places.reserve(data.size()); + method.insertKeyIntoColumns(key, key_columns, key_sizes_ref); + places.emplace_back(mapped); - data.forEachValue([&](const auto & key, auto & mapped) - { - method.insertKeyIntoColumns(key, key_columns, key_sizes_ref); - places.emplace_back(mapped); + /// Mark the cell as destroyed so it will not be destroyed in destructor. + mapped = nullptr; + }); - /// Mark the cell as destroyed so it will not be destroyed in destructor. - mapped = nullptr; - }); + std::exception_ptr exception; + size_t aggregate_functions_destroy_index = 0; - std::exception_ptr exception; - size_t aggregate_functions_destroy_index = 0; - - try + try + { +#if USE_EMBEDDED_COMPILER + if constexpr (use_compiled_functions) { /** For JIT compiled functions we need to resize columns before pass them into compiled code. * insert_aggregates_into_columns_function function does not throw exception. */ std::vector columns_data; + + auto compiled_functions = compiled_aggregate_functions_holder->compiled_aggregate_functions; + columns_data.reserve(final_aggregate_columns.size()); - for (size_t i = 0; i < compiled_functions->functions_count; ++i) + for (size_t i = 0; i < compiled_functions.functions_count; ++i) { auto & final_aggregate_column = final_aggregate_columns[i]; final_aggregate_column = final_aggregate_column->cloneResized(places.size()); columns_data.emplace_back(getColumnData(final_aggregate_column.get())); } - auto insert_aggregate_states_function = compiled_functions->insert_aggregates_into_columns_function; - insert_aggregate_states_function(places.size(), columns_data.data(), places.data()); + auto insert_aggregates_into_columns_function = compiled_functions.insert_aggregates_into_columns_function; + insert_aggregates_into_columns_function(places.size(), columns_data.data(), places.data()); - aggregate_functions_destroy_index = compiled_functions->functions_count; + aggregate_functions_destroy_index = compiled_functions.functions_count; + } +#endif - for (; aggregate_functions_destroy_index < params.aggregates_size;) - { - auto & final_aggregate_column = final_aggregate_columns[aggregate_functions_destroy_index]; - size_t offset = offsets_of_aggregate_states[aggregate_functions_destroy_index]; + for (; aggregate_functions_destroy_index < params.aggregates_size;) + { + auto & final_aggregate_column = final_aggregate_columns[aggregate_functions_destroy_index]; + size_t offset = offsets_of_aggregate_states[aggregate_functions_destroy_index]; - /** We increase aggregate_functions_destroy_index because by function contract if insertResultIntoAndDestroyBatch + /** We increase aggregate_functions_destroy_index because by function contract if insertResultIntoAndDestroyBatch * throws exception, it also must destroy all necessary states. * Then code need to continue to destroy other aggregate function states with next function index. */ - size_t destroy_index = aggregate_functions_destroy_index; - ++aggregate_functions_destroy_index; + size_t destroy_index = aggregate_functions_destroy_index; + ++aggregate_functions_destroy_index; - bool is_state = aggregate_functions[destroy_index]->isState(); - bool destroy_place_after_insert = !is_state; + bool is_state = aggregate_functions[destroy_index]->isState(); + bool destroy_place_after_insert = !is_state; - aggregate_functions[destroy_index]->insertResultIntoAndDestroyBatch(places.size(), places.data(), offset, *final_aggregate_column, arena, destroy_place_after_insert); - } + aggregate_functions[destroy_index]->insertResultIntoAndDestroyBatch( + places.size(), places.data(), offset, *final_aggregate_column, arena, destroy_place_after_insert); } - catch (...) - { - exception = std::current_exception(); - } - - for (; aggregate_functions_destroy_index < params.aggregates_size; ++aggregate_functions_destroy_index) - { - size_t offset = offsets_of_aggregate_states[aggregate_functions_destroy_index]; - aggregate_functions[aggregate_functions_destroy_index]->destroyBatch(places.size(), places.data(), offset); - } - - if (exception) - std::rethrow_exception(exception); } - else -#endif + catch (...) { - data.forEachValue([&](const auto & key, auto & mapped) - { - method.insertKeyIntoColumns(key, key_columns, key_sizes_ref); - insertAggregatesIntoColumns(mapped, final_aggregate_columns, arena); - }); + exception = std::current_exception(); } + + for (; aggregate_functions_destroy_index < params.aggregates_size; ++aggregate_functions_destroy_index) + { + size_t offset = offsets_of_aggregate_states[aggregate_functions_destroy_index]; + aggregate_functions[aggregate_functions_destroy_index]->destroyBatch(places.size(), places.data(), offset); + } + + if (exception) + std::rethrow_exception(exception); } template @@ -1898,7 +1802,7 @@ void NO_INLINE Aggregator::mergeDataNullKey( } -template +template void NO_INLINE Aggregator::mergeDataImpl( Table & table_dst, Table & table_src, @@ -1907,52 +1811,34 @@ void NO_INLINE Aggregator::mergeDataImpl( if constexpr (Method::low_cardinality_optimization) mergeDataNullKey(table_dst, table_src, arena); + table_src.mergeToViaEmplace(table_dst, [&](AggregateDataPtr & __restrict dst, AggregateDataPtr & __restrict src, bool inserted) + { + if (!inserted) + { + size_t compiled_functions_count = 0; + #if USE_EMBEDDED_COMPILER - if (compiled_functions) - { - auto merge_aggregate_states_function_typed = compiled_functions->merge_aggregate_states_function; - - table_src.mergeToViaEmplace(table_dst, [&](AggregateDataPtr & __restrict dst, AggregateDataPtr & __restrict src, bool inserted) - { - if (!inserted) + if constexpr (use_compiled_functions) { - merge_aggregate_states_function_typed(dst, src); - - for (size_t i = compiled_functions->functions_count; i < params.aggregates_size; ++i) - aggregate_functions[i]->merge(dst + offsets_of_aggregate_states[i], src + offsets_of_aggregate_states[i], arena); - - for (size_t i = compiled_functions->functions_count; i < params.aggregates_size; ++i) - aggregate_functions[i]->destroy(src + offsets_of_aggregate_states[i]); + const auto & compiled_functions = compiled_aggregate_functions_holder->compiled_aggregate_functions; + compiled_functions.merge_aggregate_states_function(dst, src); + compiled_functions_count = compiled_aggregate_functions_holder->compiled_aggregate_functions.functions_count; } - else - { - dst = src; - } - - src = nullptr; - }); - } - else #endif - { - table_src.mergeToViaEmplace(table_dst, [&](AggregateDataPtr & __restrict dst, AggregateDataPtr & __restrict src, bool inserted) + + for (size_t i = compiled_functions_count; i < params.aggregates_size; ++i) + aggregate_functions[i]->merge(dst + offsets_of_aggregate_states[i], src + offsets_of_aggregate_states[i], arena); + + for (size_t i = compiled_functions_count; i < params.aggregates_size; ++i) + aggregate_functions[i]->destroy(src + offsets_of_aggregate_states[i]); + } + else { - if (!inserted) - { - for (size_t i = 0; i < params.aggregates_size; ++i) - aggregate_functions[i]->merge(dst + offsets_of_aggregate_states[i], src + offsets_of_aggregate_states[i], arena); + dst = src; + } - for (size_t i = 0; i < params.aggregates_size; ++i) - aggregate_functions[i]->destroy(src + offsets_of_aggregate_states[i]); - } - else - { - dst = src; - } - - src = nullptr; - }); - } + src = nullptr; + }); table_src.clearAndShrink(); } @@ -2056,21 +1942,41 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl( AggregatedDataVariants & current = *non_empty_data[result_num]; if (!no_more_keys) - mergeDataImpl( - getDataVariant(*res).data, - getDataVariant(current).data, - res->aggregates_pool); + { +#if USE_EMBEDDED_COMPILER + if (compiled_aggregate_functions_holder) + { + mergeDataImpl( + getDataVariant(*res).data, + getDataVariant(current).data, + res->aggregates_pool); + } + else +#endif + { + mergeDataImpl( + getDataVariant(*res).data, + getDataVariant(current).data, + res->aggregates_pool); + } + } else if (res->without_key) + { + /// TODO: Use compile function mergeDataNoMoreKeysImpl( getDataVariant(*res).data, res->without_key, getDataVariant(current).data, res->aggregates_pool); + } else + { + /// TODO: Use compile function mergeDataOnlyExistingKeysImpl( getDataVariant(*res).data, getDataVariant(current).data, res->aggregates_pool); + } /// `current` will not destroy the states of aggregate functions in the destructor current.aggregator = nullptr; @@ -2095,11 +2001,22 @@ void NO_INLINE Aggregator::mergeBucketImpl( return; AggregatedDataVariants & current = *data[result_num]; - - mergeDataImpl( - getDataVariant(*res).data.impls[bucket], - getDataVariant(current).data.impls[bucket], - arena); +#if USE_EMBEDDED_COMPILER + if (compiled_aggregate_functions_holder) + { + mergeDataImpl( + getDataVariant(*res).data.impls[bucket], + getDataVariant(current).data.impls[bucket], + arena); + } + else +#endif + { + mergeDataImpl( + getDataVariant(*res).data.impls[bucket], + getDataVariant(current).data.impls[bucket], + arena); + } } } diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index dd9a11cf4ae..19600d6aeb9 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -852,6 +852,8 @@ using AggregatedDataVariantsPtr = std::shared_ptr; using ManyAggregatedDataVariants = std::vector; using ManyAggregatedDataVariantsPtr = std::shared_ptr; +class CompiledAggregateFunctionsHolder; + /** How are "total" values calculated with WITH TOTALS? * (For more details, see TotalsHavingTransform.) * @@ -911,7 +913,6 @@ public: bool compile_aggregate_expressions; size_t min_count_to_compile_aggregate_expression; - size_t aggregation_method; Params( const Block & src_header_, @@ -924,7 +925,6 @@ public: size_t min_free_disk_space_, bool compile_aggregate_expressions_, size_t min_count_to_compile_aggregate_expression_, - size_t aggregation_method_, const Block & intermediate_header_ = {}) : src_header(src_header_), intermediate_header(intermediate_header_), @@ -936,15 +936,14 @@ public: tmp_volume(tmp_volume_), max_threads(max_threads_), min_free_disk_space(min_free_disk_space_), compile_aggregate_expressions(compile_aggregate_expressions_), - min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_), - aggregation_method(aggregation_method_) + min_count_to_compile_aggregate_expression(min_count_to_compile_aggregate_expression_) { } /// Only parameters that matter during merge. Params(const Block & intermediate_header_, const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_) - : Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0, false, 0, 0) + : Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0, false, 0) { intermediate_header = intermediate_header_; } @@ -1087,7 +1086,7 @@ private: TemporaryFiles temporary_files; #if USE_EMBEDDED_COMPILER - std::optional compiled_functions; + std::shared_ptr compiled_aggregate_functions_holder; #endif /** Try to compile aggregate functions. @@ -1121,7 +1120,7 @@ private: AggregateDataPtr overflow_row) const; /// Specialization for a particular value no_more_keys. - template + template void executeImplBatch( Method & method, typename Method::State & state, @@ -1130,31 +1129,6 @@ private: AggregateFunctionInstruction * aggregate_instructions, AggregateDataPtr overflow_row) const; - template - void handleAggregationJIT( - Method & method, - typename Method::State & state, - Arena * aggregates_pool, - size_t rows, - AggregateFunctionInstruction * aggregate_instructions) const; - - template - void handleAggregationJITV2( - Method & method, - typename Method::State & state, - Arena * aggregates_pool, - size_t rows, - AggregateFunctionInstruction * aggregate_instructions) const; - - template - void handleAggregationDefault( - Method & method, - typename Method::State & state, - Arena * aggregates_pool, - size_t rows, - AggregateFunctionInstruction * aggregate_instructions, - AggregateDataPtr overflow_row) const; - /// For case when there are no keys (all aggregate into one row). static void executeWithoutKeyImpl( AggregatedDataWithoutKey & res, @@ -1183,7 +1157,7 @@ private: Arena * arena) const; /// Merge data from hash table `src` into `dst`. - template + template void mergeDataImpl( Table & table_dst, Table & table_src, @@ -1227,7 +1201,7 @@ private: MutableColumns & final_aggregate_columns, Arena * arena) const; - template + template void convertToBlockImplFinal( Method & method, Table & data, diff --git a/src/Interpreters/AsynchronousMetrics.cpp b/src/Interpreters/AsynchronousMetrics.cpp index 2216be4390c..9803a2d8e9b 100644 --- a/src/Interpreters/AsynchronousMetrics.cpp +++ b/src/Interpreters/AsynchronousMetrics.cpp @@ -1,6 +1,6 @@ #include #include -#include +#include #include #include #include diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index a0fce58b472..2992bc010ab 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -45,7 +45,6 @@ #include #include #include -#include #include #include #include diff --git a/src/Interpreters/ExpressionJIT.cpp b/src/Interpreters/ExpressionJIT.cpp index d30bab0e6df..497aa56ab13 100644 --- a/src/Interpreters/ExpressionJIT.cpp +++ b/src/Interpreters/ExpressionJIT.cpp @@ -1,4 +1,6 @@ -#include +#if !defined(ARCADIA_BUILD) +# include "config_core.h" +#endif #if USE_EMBEDDED_COMPILER @@ -20,6 +22,7 @@ #include #include #include +#include #include namespace DB @@ -42,15 +45,16 @@ static Poco::Logger * getLogger() return &logger; } -class CompiledFunctionHolder +class CompiledFunctionHolder : public CompiledExpressionCacheEntry { public: explicit CompiledFunctionHolder(CompiledFunction compiled_function_) - : compiled_function(compiled_function_) + : CompiledExpressionCacheEntry(compiled_function_.compiled_module.size) + , compiled_function(compiled_function_) {} - ~CompiledFunctionHolder() + ~CompiledFunctionHolder() override { getJITInstance().deleteCompiledModule(compiled_function.compiled_module); } @@ -287,19 +291,18 @@ static FunctionBasePtr compile( { LOG_TRACE(getLogger(), "Compile expression {}", llvm_function->getName()); auto compiled_function = compileFunction(getJITInstance(), *llvm_function); - auto compiled_function_holder = std::make_shared(compiled_function); - - return std::make_shared(std::move(compiled_function_holder), compiled_function.compiled_module.size); + return std::make_shared(compiled_function); }); - llvm_function->setCompiledFunction(compiled_function_cache_entry->getCompiledFunctionHolder()); + std::shared_ptr compiled_function_holder = std::static_pointer_cast(compiled_function_cache_entry); + llvm_function->setCompiledFunction(std::move(compiled_function_holder)); } else { auto compiled_function = compileFunction(getJITInstance(), *llvm_function); - auto compiled_function_ptr = std::make_shared(compiled_function); + auto compiled_function_holder = std::make_shared(compiled_function); - llvm_function->setCompiledFunction(compiled_function_ptr); + llvm_function->setCompiledFunction(std::move(compiled_function_holder)); } return llvm_function; @@ -568,25 +571,6 @@ void ActionsDAG::compileFunctions(size_t min_count_to_compile_expression) } } -CompiledExpressionCacheFactory & CompiledExpressionCacheFactory::instance() -{ - static CompiledExpressionCacheFactory factory; - return factory; -} - -void CompiledExpressionCacheFactory::init(size_t cache_size) -{ - if (cache) - throw Exception(ErrorCodes::LOGICAL_ERROR, "CompiledExpressionCache was already initialized"); - - cache = std::make_unique(cache_size); -} - -CompiledExpressionCache * CompiledExpressionCacheFactory::tryGetCache() -{ - return cache.get(); -} - } #endif diff --git a/src/Interpreters/ExpressionJIT.h b/src/Interpreters/ExpressionJIT.h deleted file mode 100644 index ab78346cf27..00000000000 --- a/src/Interpreters/ExpressionJIT.h +++ /dev/null @@ -1,63 +0,0 @@ -#pragma once - -#if !defined(ARCADIA_BUILD) -# include "config_core.h" -#endif - -#if USE_EMBEDDED_COMPILER -# include -# include - -namespace DB -{ - -class CompiledFunctionHolder; - -class CompiledFunctionCacheEntry -{ -public: - CompiledFunctionCacheEntry(std::shared_ptr compiled_function_holder_, size_t compiled_function_size_) - : compiled_function_holder(std::move(compiled_function_holder_)) - , compiled_function_size(compiled_function_size_) - {} - - std::shared_ptr getCompiledFunctionHolder() const { return compiled_function_holder; } - - size_t getCompiledFunctionSize() const { return compiled_function_size; } - -private: - std::shared_ptr compiled_function_holder; - - size_t compiled_function_size; -}; - -struct CompiledFunctionWeightFunction -{ - size_t operator()(const CompiledFunctionCacheEntry & compiled_function) const - { - return compiled_function.getCompiledFunctionSize(); - } -}; - -class CompiledExpressionCache : public LRUCache -{ -public: - using Base = LRUCache; - using Base::Base; -}; - -class CompiledExpressionCacheFactory -{ -private: - std::unique_ptr cache; - -public: - static CompiledExpressionCacheFactory & instance(); - - void init(size_t cache_size); - CompiledExpressionCache * tryGetCache(); -}; - -} - -#endif diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index aae69426391..900820fb209 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -2040,8 +2040,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac settings.max_threads, settings.min_free_disk_space_for_temporary_data, settings.compile_aggregate_expressions, - settings.min_count_to_compile_aggregate_expression, - settings.aggregation_method); + settings.min_count_to_compile_aggregate_expression); SortDescription group_by_sort_description; @@ -2145,8 +2144,7 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPlan & query_plan, Modific settings.max_threads, settings.min_free_disk_space_for_temporary_data, settings.compile_aggregate_expressions, - settings.min_count_to_compile_aggregate_expression, - settings.aggregation_method); + settings.min_count_to_compile_aggregate_expression); auto transform_params = std::make_shared(params, true); diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index f76d51e765b..bdeb4a30e9e 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -25,7 +25,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/src/Interpreters/JIT/CompiledExpressionCache.cpp b/src/Interpreters/JIT/CompiledExpressionCache.cpp new file mode 100644 index 00000000000..98f4eec982d --- /dev/null +++ b/src/Interpreters/JIT/CompiledExpressionCache.cpp @@ -0,0 +1,34 @@ +#include "CompiledExpressionCache.h" + +#if USE_EMBEDDED_COMPILER + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +CompiledExpressionCacheFactory & CompiledExpressionCacheFactory::instance() +{ + static CompiledExpressionCacheFactory factory; + return factory; +} + +void CompiledExpressionCacheFactory::init(size_t cache_size) +{ + if (cache) + throw Exception(ErrorCodes::LOGICAL_ERROR, "CompiledExpressionCache was already initialized"); + + cache = std::make_unique(cache_size); +} + +CompiledExpressionCache * CompiledExpressionCacheFactory::tryGetCache() +{ + return cache.get(); +} + +} + +#endif diff --git a/src/Interpreters/JIT/CompiledExpressionCache.h b/src/Interpreters/JIT/CompiledExpressionCache.h new file mode 100644 index 00000000000..5182a77d77a --- /dev/null +++ b/src/Interpreters/JIT/CompiledExpressionCache.h @@ -0,0 +1,61 @@ +#pragma once + +#if !defined(ARCADIA_BUILD) +# include "config_core.h" +#endif + +#if USE_EMBEDDED_COMPILER +# include +# include +# include + +namespace DB +{ + +class CompiledExpressionCacheEntry +{ +public: + explicit CompiledExpressionCacheEntry(size_t compiled_expression_size_) + : compiled_expression_size(compiled_expression_size_) + {} + + size_t getCompiledExpressionSize() const { return compiled_expression_size; } + + virtual ~CompiledExpressionCacheEntry() {} + +private: + + size_t compiled_expression_size = 0; + +}; + +struct CompiledFunctionWeightFunction +{ + size_t operator()(const CompiledExpressionCacheEntry & compiled_function) const + { + return compiled_function.getCompiledExpressionSize(); + } +}; + +class CompiledExpressionCache : public LRUCache +{ +public: + using Base = LRUCache; + using Base::Base; +}; + +class CompiledExpressionCacheFactory +{ +private: + std::unique_ptr cache; + +public: + static CompiledExpressionCacheFactory & instance(); + + void init(size_t cache_size); + CompiledExpressionCache * tryGetCache(); +}; + +} + +#endif diff --git a/src/Interpreters/JIT/compileFunction.cpp b/src/Interpreters/JIT/compileFunction.cpp index 3e326e82246..25198bebca6 100644 --- a/src/Interpreters/JIT/compileFunction.cpp +++ b/src/Interpreters/JIT/compileFunction.cpp @@ -307,137 +307,6 @@ static void compileAddIntoAggregateStatesFunctions(llvm::Module & module, const auto & context = module.getContext(); llvm::IRBuilder<> b(context); - auto * size_type = b.getIntNTy(sizeof(size_t) * 8); - - auto * column_data_type = llvm::StructType::get(b.getInt8PtrTy(), b.getInt8PtrTy()); - auto * get_place_func_declaration = llvm::FunctionType::get(b.getInt8Ty()->getPointerTo(), { b.getInt8Ty()->getPointerTo(), size_type }, /*isVarArg=*/false); - auto * aggregate_loop_func_declaration = llvm::FunctionType::get(b.getVoidTy(), { size_type, column_data_type->getPointerTo(), get_place_func_declaration->getPointerTo(), b.getInt8Ty()->getPointerTo() }, false); - - auto * aggregate_loop_func_definition = llvm::Function::Create(aggregate_loop_func_declaration, llvm::Function::ExternalLinkage, name, module); - - auto * arguments = aggregate_loop_func_definition->args().begin(); - llvm::Value * rows_count_arg = arguments++; - llvm::Value * columns_arg = arguments++; - llvm::Value * get_place_function_arg = arguments++; - llvm::Value * get_place_function_context_arg = arguments++; - - /// Initialize ColumnDataPlaceholder llvm representation of ColumnData - - auto * entry = llvm::BasicBlock::Create(b.getContext(), "entry", aggregate_loop_func_definition); - b.SetInsertPoint(entry); - - std::vector columns; - size_t previous_columns_size = 0; - - for (const auto & function : functions) - { - auto argument_types = function.function->getArgumentTypes(); - - ColumnDataPlaceholder data_placeholder; - - size_t function_arguments_size = argument_types.size(); - - for (size_t column_argument_index = 0; column_argument_index < function_arguments_size; ++column_argument_index) - { - const auto & argument_type = argument_types[column_argument_index]; - auto * data = b.CreateLoad(column_data_type, b.CreateConstInBoundsGEP1_32(column_data_type, columns_arg, previous_columns_size + column_argument_index)); - data_placeholder.data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(argument_type))->getPointerTo()); - data_placeholder.null_init = argument_type->isNullable() ? b.CreateExtractValue(data, {1}) : nullptr; - columns.emplace_back(data_placeholder); - } - - previous_columns_size += function_arguments_size; - } - - /// Initialize loop - - auto * end = llvm::BasicBlock::Create(b.getContext(), "end", aggregate_loop_func_definition); - auto * loop = llvm::BasicBlock::Create(b.getContext(), "loop", aggregate_loop_func_definition); - - b.CreateCondBr(b.CreateICmpEQ(rows_count_arg, llvm::ConstantInt::get(size_type, 0)), end, loop); - - b.SetInsertPoint(loop); - - auto * counter_phi = b.CreatePHI(rows_count_arg->getType(), 2); - counter_phi->addIncoming(llvm::ConstantInt::get(size_type, 0), entry); - - for (auto & col : columns) - { - col.data = b.CreatePHI(col.data_init->getType(), 2); - col.data->addIncoming(col.data_init, entry); - - if (col.null_init) - { - col.null = b.CreatePHI(col.null_init->getType(), 2); - col.null->addIncoming(col.null_init, entry); - } - } - - auto * aggregation_place = b.CreateCall(get_place_func_declaration, get_place_function_arg, { get_place_function_context_arg, counter_phi }); - - previous_columns_size = 0; - for (const auto & function : functions) - { - size_t aggregate_function_offset = function.aggregate_data_offset; - const auto * aggregate_function_ptr = function.function; - - auto arguments_types = function.function->getArgumentTypes(); - std::vector arguments_values; - - size_t function_arguments_size = arguments_types.size(); - arguments_values.resize(function_arguments_size); - - for (size_t column_argument_index = 0; column_argument_index < function_arguments_size; ++column_argument_index) - { - auto * column_argument_data = columns[previous_columns_size + column_argument_index].data; - auto * column_argument_null_data = columns[previous_columns_size + column_argument_index].null; - - auto & argument_type = arguments_types[column_argument_index]; - - auto * value = b.CreateLoad(toNativeType(b, removeNullable(argument_type)), column_argument_data); - if (!argument_type->isNullable()) - { - arguments_values[column_argument_index] = value; - continue; - } - - auto * is_null = b.CreateICmpNE(b.CreateLoad(b.getInt8Ty(), column_argument_null_data), b.getInt8(0)); - auto * nullable_unitilized = llvm::Constant::getNullValue(toNativeType(b, argument_type)); - auto * nullable_value = b.CreateInsertValue(b.CreateInsertValue(nullable_unitilized, value, {0}), is_null, {1}); - arguments_values[column_argument_index] = nullable_value; - } - - auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_32(nullptr, aggregation_place, aggregate_function_offset); - aggregate_function_ptr->compileAdd(b, aggregation_place_with_offset, arguments_types, arguments_values); - - previous_columns_size += function_arguments_size; - } - - /// End of loop - - auto * cur_block = b.GetInsertBlock(); - for (auto & col : columns) - { - col.data->addIncoming(b.CreateConstInBoundsGEP1_32(nullptr, col.data, 1), cur_block); - - if (col.null) - col.null->addIncoming(b.CreateConstInBoundsGEP1_32(nullptr, col.null, 1), cur_block); - } - - auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1)); - counter_phi->addIncoming(value, cur_block); - - b.CreateCondBr(b.CreateICmpEQ(value, rows_count_arg), end, loop); - - b.SetInsertPoint(end); - b.CreateRetVoid(); -} - -static void compileAddIntoAggregateStatesFunctionsV2(llvm::Module & module, const std::vector & functions, const std::string & name) -{ - auto & context = module.getContext(); - llvm::IRBuilder<> b(context); - auto * size_type = b.getIntNTy(sizeof(size_t) * 8); auto * places_type = b.getInt8Ty()->getPointerTo()->getPointerTo(); auto * column_data_type = llvm::StructType::get(b.getInt8PtrTy(), b.getInt8PtrTy()); @@ -699,7 +568,6 @@ CompiledAggregateFunctions compileAggregateFunctons(CHJIT & jit, const std::vect { std::string create_aggregate_states_functions_name = functions_dump_name + "_create"; std::string add_aggregate_states_functions_name = functions_dump_name + "_add"; - std::string add_aggregate_states_functions_name_v2 = functions_dump_name + "_add_v2"; std::string merge_aggregate_states_functions_name = functions_dump_name + "_merge"; std::string insert_aggregate_states_functions_name = functions_dump_name + "_insert"; @@ -707,20 +575,17 @@ CompiledAggregateFunctions compileAggregateFunctons(CHJIT & jit, const std::vect { compileCreateAggregateStatesFunctions(module, functions, create_aggregate_states_functions_name); compileAddIntoAggregateStatesFunctions(module, functions, add_aggregate_states_functions_name); - compileAddIntoAggregateStatesFunctionsV2(module, functions, add_aggregate_states_functions_name_v2); compileMergeAggregatesStates(module, functions, merge_aggregate_states_functions_name); compileInsertAggregatesIntoResultColumns(module, functions, insert_aggregate_states_functions_name); }); auto create_aggregate_states_function = reinterpret_cast(compiled_module.function_name_to_symbol[create_aggregate_states_functions_name]); auto add_into_aggregate_states_function = reinterpret_cast(compiled_module.function_name_to_symbol[add_aggregate_states_functions_name]); - auto add_into_aggregate_states_function_v2 = reinterpret_cast(compiled_module.function_name_to_symbol[add_aggregate_states_functions_name_v2]); auto merge_aggregate_states_function = reinterpret_cast(compiled_module.function_name_to_symbol[merge_aggregate_states_functions_name]); auto insert_aggregate_states_function = reinterpret_cast(compiled_module.function_name_to_symbol[insert_aggregate_states_functions_name]); assert(create_aggregate_states_function); assert(add_into_aggregate_states_function); - assert(add_into_aggregate_states_function_v2); assert(merge_aggregate_states_function); assert(insert_aggregate_states_function); @@ -728,7 +593,6 @@ CompiledAggregateFunctions compileAggregateFunctons(CHJIT & jit, const std::vect { .create_aggregate_states_function = create_aggregate_states_function, .add_into_aggregate_states_function = add_into_aggregate_states_function, - .add_into_aggregate_states_function_v2 = add_into_aggregate_states_function_v2, .merge_aggregate_states_function = merge_aggregate_states_function, .insert_aggregates_into_columns_function = insert_aggregate_states_function, diff --git a/src/Interpreters/JIT/compileFunction.h b/src/Interpreters/JIT/compileFunction.h index 3a0435b098c..2a2f2a0d20a 100644 --- a/src/Interpreters/JIT/compileFunction.h +++ b/src/Interpreters/JIT/compileFunction.h @@ -41,13 +41,8 @@ struct CompiledFunction }; /** Compile function to native jit code using CHJIT instance. - * Function is compiled as single module. - * After this function execution, code for function will be compiled and can be queried using - * findCompiledFunction with function name. - * Compiled function can be safely casted to JITCompiledFunction type and must be called with - * valid ColumnData and ColumnDataRowsSize. - * It is important that ColumnData parameter of JITCompiledFunction is result column, - * and will be filled by compiled function. + * It is client responsibility to match ColumnData arguments size with + * function arguments size and additional ColumnData for result. */ CompiledFunction compileFunction(CHJIT & jit, const IFunctionBase & function); @@ -57,12 +52,8 @@ struct AggregateFunctionWithOffset size_t aggregate_data_offset; }; -using GetAggregateDataContext = char *; -using GetAggregateDataFunction = AggregateDataPtr (*)(GetAggregateDataContext, size_t); - using JITCreateAggregateStatesFunction = void (*)(AggregateDataPtr); -using JITAddIntoAggregateStatesFunction = void (*)(ColumnDataRowsSize, ColumnData *, GetAggregateDataFunction, GetAggregateDataContext); -using JITAddIntoAggregateStatesFunctionV2 = void (*)(ColumnDataRowsSize, ColumnData *, AggregateDataPtr *); +using JITAddIntoAggregateStatesFunction = void (*)(ColumnDataRowsSize, ColumnData *, AggregateDataPtr *); using JITMergeAggregateStatesFunction = void (*)(AggregateDataPtr, AggregateDataPtr); using JITInsertAggregatesIntoColumnsFunction = void (*)(ColumnDataRowsSize, ColumnData *, AggregateDataPtr *); @@ -70,7 +61,6 @@ struct CompiledAggregateFunctions { JITCreateAggregateStatesFunction create_aggregate_states_function; JITAddIntoAggregateStatesFunction add_into_aggregate_states_function; - JITAddIntoAggregateStatesFunctionV2 add_into_aggregate_states_function_v2; JITMergeAggregateStatesFunction merge_aggregate_states_function; JITInsertAggregatesIntoColumnsFunction insert_aggregates_into_columns_function; diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 1d781dbc08e..cffedf44823 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -303,7 +303,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( settings.min_free_disk_space_for_temporary_data, settings.compile_expressions, settings.min_count_to_compile_aggregate_expression, - settings.aggregation_method, header_before_aggregation); // The source header is also an intermediate header transform_params = std::make_shared(std::move(params), query_info.projection->aggregate_final); @@ -334,8 +333,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( settings.max_threads, settings.min_free_disk_space_for_temporary_data, settings.compile_aggregate_expressions, - settings.min_count_to_compile_aggregate_expression, - settings.aggregation_method); + settings.min_count_to_compile_aggregate_expression); transform_params = std::make_shared(std::move(params), query_info.projection->aggregate_final); }