From 3a6b37691a511ad912e8a452b7e3212e589a9ffd Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Tue, 27 Jul 2021 19:50:57 +0300 Subject: [PATCH 1/4] Compile aggregate functions without key --- src/Interpreters/Aggregator.cpp | 48 ++++- src/Interpreters/Aggregator.h | 5 +- src/Interpreters/JIT/compileFunction.cpp | 132 ++++++++++++++ src/Interpreters/JIT/compileFunction.h | 4 + tests/performance/jit_aggregate_functions.xml | 171 ++++++++++++++++++ .../00165_jit_aggregate_functions.reference | 12 ++ .../00165_jit_aggregate_functions.sql | 96 ++++++++++ 7 files changed, 463 insertions(+), 5 deletions(-) diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index 7ffae761c0c..1327fedcd6d 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -781,15 +781,48 @@ void NO_INLINE Aggregator::executeImplBatch( } +template void NO_INLINE Aggregator::executeWithoutKeyImpl( AggregatedDataWithoutKey & res, size_t rows, AggregateFunctionInstruction * aggregate_instructions, Arena * arena) { - /// Adding values - for (AggregateFunctionInstruction * inst = aggregate_instructions; inst->that; ++inst) +#if USE_EMBEDDED_COMPILER + if constexpr (use_compiled_functions) { + std::vector columns_data; + + for (size_t i = 0; i < aggregate_functions.size(); ++i) + { + if (!is_aggregate_function_compiled[i]) + continue; + + AggregateFunctionInstruction * inst = aggregate_instructions + i; + size_t arguments_size = inst->that->getArgumentTypes().size(); + + for (size_t argument_index = 0; argument_index < arguments_size; ++argument_index) + { + columns_data.emplace_back(getColumnData(inst->batch_arguments[argument_index])); + } + } + + auto add_into_aggregate_states_function_single_place = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function_single_place; + add_into_aggregate_states_function_single_place(rows, columns_data.data(), res); + } +#endif + + /// Adding values + for (size_t i = 0; i < aggregate_functions.size(); ++i) + { + AggregateFunctionInstruction * inst = aggregate_instructions + i; + +#if USE_EMBEDDED_COMPILER + if constexpr (use_compiled_functions) + if (is_aggregate_function_compiled[i]) + continue; +#endif + if (inst->offsets) inst->batch_that->addBatchSinglePlace( inst->offsets[static_cast(rows - 1)], res + inst->state_offset, inst->batch_arguments, arena); @@ -930,7 +963,16 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData /// For the case when there are no keys (all aggregate into one row). if (result.type == AggregatedDataVariants::Type::without_key) { - executeWithoutKeyImpl(result.without_key, num_rows, aggregate_functions_instructions.data(), result.aggregates_pool); +#if USE_EMBEDDED_COMPILER + if (compiled_aggregate_functions_holder) + { + executeWithoutKeyImpl(result.without_key, num_rows, aggregate_functions_instructions.data(), result.aggregates_pool); + } + else +#endif + { + executeWithoutKeyImpl(result.without_key, num_rows, aggregate_functions_instructions.data(), result.aggregates_pool); + } } else { diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index 265ef912794..fde6ba219df 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -1121,7 +1121,7 @@ private: AggregateDataPtr overflow_row) const; /// Specialization for a particular value no_more_keys. - template + template void executeImplBatch( Method & method, typename Method::State & state, @@ -1131,7 +1131,8 @@ private: AggregateDataPtr overflow_row) const; /// For case when there are no keys (all aggregate into one row). - static void executeWithoutKeyImpl( + template + void executeWithoutKeyImpl( AggregatedDataWithoutKey & res, size_t rows, AggregateFunctionInstruction * aggregate_instructions, diff --git a/src/Interpreters/JIT/compileFunction.cpp b/src/Interpreters/JIT/compileFunction.cpp index 03ef0500757..5e1d4ca0375 100644 --- a/src/Interpreters/JIT/compileFunction.cpp +++ b/src/Interpreters/JIT/compileFunction.cpp @@ -436,6 +436,133 @@ static void compileAddIntoAggregateStatesFunctions(llvm::Module & module, const b.CreateRetVoid(); } +static void compileAddIntoAggregateStatesFunctionsSinglePlace(llvm::Module & module, const std::vector & functions, const std::string & name) +{ + auto & context = module.getContext(); + llvm::IRBuilder<> b(context); + + auto * size_type = b.getIntNTy(sizeof(size_t) * 8); + auto * places_type = b.getInt8Ty()->getPointerTo(); + auto * column_data_type = llvm::StructType::get(b.getInt8PtrTy(), b.getInt8PtrTy()); + + auto * aggregate_loop_func_declaration = llvm::FunctionType::get(b.getVoidTy(), { size_type, column_data_type->getPointerTo(), places_type }, false); + auto * aggregate_loop_func_definition = llvm::Function::Create(aggregate_loop_func_declaration, llvm::Function::ExternalLinkage, name, module); + + auto * arguments = aggregate_loop_func_definition->args().begin(); + llvm::Value * rows_count_arg = arguments++; + llvm::Value * columns_arg = arguments++; + llvm::Value * place_arg = arguments++; + + /// Initialize ColumnDataPlaceholder llvm representation of ColumnData + + auto * entry = llvm::BasicBlock::Create(b.getContext(), "entry", aggregate_loop_func_definition); + b.SetInsertPoint(entry); + + std::vector columns; + size_t previous_columns_size = 0; + + for (const auto & function : functions) + { + auto argument_types = function.function->getArgumentTypes(); + + ColumnDataPlaceholder data_placeholder; + + size_t function_arguments_size = argument_types.size(); + + for (size_t column_argument_index = 0; column_argument_index < function_arguments_size; ++column_argument_index) + { + const auto & argument_type = argument_types[column_argument_index]; + auto * data = b.CreateLoad(column_data_type, b.CreateConstInBoundsGEP1_64(column_data_type, columns_arg, previous_columns_size + column_argument_index)); + data_placeholder.data_init = b.CreatePointerCast(b.CreateExtractValue(data, {0}), toNativeType(b, removeNullable(argument_type))->getPointerTo()); + data_placeholder.null_init = argument_type->isNullable() ? b.CreateExtractValue(data, {1}) : nullptr; + columns.emplace_back(data_placeholder); + } + + previous_columns_size += function_arguments_size; + } + + /// Initialize loop + + auto * end = llvm::BasicBlock::Create(b.getContext(), "end", aggregate_loop_func_definition); + auto * loop = llvm::BasicBlock::Create(b.getContext(), "loop", aggregate_loop_func_definition); + + b.CreateCondBr(b.CreateICmpEQ(rows_count_arg, llvm::ConstantInt::get(size_type, 0)), end, loop); + + b.SetInsertPoint(loop); + + auto * counter_phi = b.CreatePHI(rows_count_arg->getType(), 2); + counter_phi->addIncoming(llvm::ConstantInt::get(size_type, 0), entry); + + for (auto & col : columns) + { + col.data = b.CreatePHI(col.data_init->getType(), 2); + col.data->addIncoming(col.data_init, entry); + + if (col.null_init) + { + col.null = b.CreatePHI(col.null_init->getType(), 2); + col.null->addIncoming(col.null_init, entry); + } + } + + previous_columns_size = 0; + for (const auto & function : functions) + { + size_t aggregate_function_offset = function.aggregate_data_offset; + const auto * aggregate_function_ptr = function.function; + + auto arguments_types = function.function->getArgumentTypes(); + std::vector arguments_values; + + size_t function_arguments_size = arguments_types.size(); + arguments_values.resize(function_arguments_size); + + for (size_t column_argument_index = 0; column_argument_index < function_arguments_size; ++column_argument_index) + { + auto * column_argument_data = columns[previous_columns_size + column_argument_index].data; + auto * column_argument_null_data = columns[previous_columns_size + column_argument_index].null; + + auto & argument_type = arguments_types[column_argument_index]; + + auto * value = b.CreateLoad(toNativeType(b, removeNullable(argument_type)), column_argument_data); + if (!argument_type->isNullable()) + { + arguments_values[column_argument_index] = value; + continue; + } + + auto * is_null = b.CreateICmpNE(b.CreateLoad(b.getInt8Ty(), column_argument_null_data), b.getInt8(0)); + auto * nullable_unitilized = llvm::Constant::getNullValue(toNativeType(b, argument_type)); + auto * nullable_value = b.CreateInsertValue(b.CreateInsertValue(nullable_unitilized, value, {0}), is_null, {1}); + arguments_values[column_argument_index] = nullable_value; + } + + auto * aggregation_place_with_offset = b.CreateConstInBoundsGEP1_64(nullptr, place_arg, aggregate_function_offset); + aggregate_function_ptr->compileAdd(b, aggregation_place_with_offset, arguments_types, arguments_values); + + previous_columns_size += function_arguments_size; + } + + /// End of loop + + auto * cur_block = b.GetInsertBlock(); + for (auto & col : columns) + { + col.data->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, col.data, 1), cur_block); + + if (col.null) + col.null->addIncoming(b.CreateConstInBoundsGEP1_64(nullptr, col.null, 1), cur_block); + } + + auto * value = b.CreateAdd(counter_phi, llvm::ConstantInt::get(size_type, 1)); + counter_phi->addIncoming(value, cur_block); + + b.CreateCondBr(b.CreateICmpEQ(value, rows_count_arg), end, loop); + + b.SetInsertPoint(end); + b.CreateRetVoid(); +} + static void compileMergeAggregatesStates(llvm::Module & module, const std::vector & functions, const std::string & name) { auto & context = module.getContext(); @@ -569,6 +696,7 @@ CompiledAggregateFunctions compileAggregateFunctions(CHJIT & jit, const std::vec std::string create_aggregate_states_functions_name = functions_dump_name + "_create"; std::string add_aggregate_states_functions_name = functions_dump_name + "_add"; + std::string add_aggregate_states_functions_name_single_place = functions_dump_name + "_add_single_place"; std::string merge_aggregate_states_functions_name = functions_dump_name + "_merge"; std::string insert_aggregate_states_functions_name = functions_dump_name + "_insert"; @@ -576,17 +704,20 @@ CompiledAggregateFunctions compileAggregateFunctions(CHJIT & jit, const std::vec { compileCreateAggregateStatesFunctions(module, functions, create_aggregate_states_functions_name); compileAddIntoAggregateStatesFunctions(module, functions, add_aggregate_states_functions_name); + compileAddIntoAggregateStatesFunctionsSinglePlace(module, functions, add_aggregate_states_functions_name_single_place); compileMergeAggregatesStates(module, functions, merge_aggregate_states_functions_name); compileInsertAggregatesIntoResultColumns(module, functions, insert_aggregate_states_functions_name); }); auto create_aggregate_states_function = reinterpret_cast(compiled_module.function_name_to_symbol[create_aggregate_states_functions_name]); auto add_into_aggregate_states_function = reinterpret_cast(compiled_module.function_name_to_symbol[add_aggregate_states_functions_name]); + auto add_into_aggregate_states_function_single_place = reinterpret_cast(compiled_module.function_name_to_symbol[add_aggregate_states_functions_name_single_place]); auto merge_aggregate_states_function = reinterpret_cast(compiled_module.function_name_to_symbol[merge_aggregate_states_functions_name]); auto insert_aggregate_states_function = reinterpret_cast(compiled_module.function_name_to_symbol[insert_aggregate_states_functions_name]); assert(create_aggregate_states_function); assert(add_into_aggregate_states_function); + assert(add_into_aggregate_states_function_single_place); assert(merge_aggregate_states_function); assert(insert_aggregate_states_function); @@ -598,6 +729,7 @@ CompiledAggregateFunctions compileAggregateFunctions(CHJIT & jit, const std::vec { .create_aggregate_states_function = create_aggregate_states_function, .add_into_aggregate_states_function = add_into_aggregate_states_function, + .add_into_aggregate_states_function_single_place = add_into_aggregate_states_function_single_place, .merge_aggregate_states_function = merge_aggregate_states_function, .insert_aggregates_into_columns_function = insert_aggregate_states_function, diff --git a/src/Interpreters/JIT/compileFunction.h b/src/Interpreters/JIT/compileFunction.h index 1cf15e15201..ae91c2a7650 100644 --- a/src/Interpreters/JIT/compileFunction.h +++ b/src/Interpreters/JIT/compileFunction.h @@ -54,6 +54,7 @@ struct AggregateFunctionWithOffset using JITCreateAggregateStatesFunction = void (*)(AggregateDataPtr); using JITAddIntoAggregateStatesFunction = void (*)(ColumnDataRowsSize, ColumnData *, AggregateDataPtr *); +using JITAddIntoAggregateStatesFunctionSinglePlace = void (*)(ColumnDataRowsSize, ColumnData *, AggregateDataPtr); using JITMergeAggregateStatesFunction = void (*)(AggregateDataPtr, AggregateDataPtr); using JITInsertAggregateStatesIntoColumnsFunction = void (*)(ColumnDataRowsSize, ColumnData *, AggregateDataPtr *); @@ -61,6 +62,8 @@ struct CompiledAggregateFunctions { JITCreateAggregateStatesFunction create_aggregate_states_function; JITAddIntoAggregateStatesFunction add_into_aggregate_states_function; + JITAddIntoAggregateStatesFunctionSinglePlace add_into_aggregate_states_function_single_place; + JITMergeAggregateStatesFunction merge_aggregate_states_function; JITInsertAggregateStatesIntoColumnsFunction insert_aggregates_into_columns_function; @@ -75,6 +78,7 @@ struct CompiledAggregateFunctions * * JITCreateAggregateStatesFunction will initialize aggregate data ptr with initial aggregate states values. * JITAddIntoAggregateStatesFunction will update aggregate states for aggregate functions with specified ColumnData. + * JITAddIntoAggregateStatesFunctionSinglePlace will update single aggregate state for aggregate functions with specified ColumnData. * JITMergeAggregateStatesFunction will merge aggregate states for aggregate functions. * JITInsertAggregateStatesIntoColumnsFunction will insert aggregate states for aggregate functions into result columns. */ diff --git a/tests/performance/jit_aggregate_functions.xml b/tests/performance/jit_aggregate_functions.xml index 3e99f6d9615..b1a937dfae9 100644 --- a/tests/performance/jit_aggregate_functions.xml +++ b/tests/performance/jit_aggregate_functions.xml @@ -150,6 +150,44 @@ FORMAT Null + + SELECT + {function}(value_1), + {function}(value_2), + {function}(value_3) + FROM {table} + FORMAT Null + + + + SELECT + {function}(value_1), + {function}(value_2), + sum(toUInt256(value_3)), + {function}(value_3) + FROM {table} + FORMAT Null + + + + SELECT + {function}If(value_1, predicate), + {function}If(value_2, predicate), + {function}If(value_3, predicate) + FROM {table} + FORMAT Null + + + + SELECT + {function}If(value_1, predicate), + {function}If(value_2, predicate), + sumIf(toUInt256(value_3), predicate), + {function}If(value_3, predicate) + FROM {table} + FORMAT Null + + SELECT {function}(value_1), @@ -200,6 +238,51 @@ FORMAT Null + + SELECT + {function}(value_1), + {function}(value_2), + {function}(value_3), + {function}(value_4), + {function}(value_5) + FROM {table} + FORMAT Null + + + + SELECT + {function}(value_1), + {function}(value_2), + sum(toUInt256(value_3)), + {function}(value_3), + {function}(value_4), + {function}(value_5) + FROM {table} + FORMAT Null + + + + SELECT + {function}If(value_1, predicate), + {function}If(value_2, predicate), + {function}If(value_3, predicate), + {function}If(value_4, predicate), + {function}If(value_5, predicate) + FROM {table} + FORMAT Null + + + + SELECT + {function}If(value_1, predicate), + {function}If(value_2, predicate), + sumIf(toUInt256(value_3), predicate), + {function}If(value_3, predicate), + {function}If(value_4, predicate), + {function}If(value_5, predicate) + FROM {table} + FORMAT Null + SELECT @@ -247,6 +330,48 @@ FORMAT Null + + SELECT + {function}(WatchID), + {function}(CounterID), + {function}(ClientIP) + FROM hits_100m_single + FORMAT Null + + + + SELECT + {function}(WatchID), + {function}(CounterID), + sum(toUInt256(ClientIP)), + {function}(ClientIP) + FROM hits_100m_single + FORMAT Null + + + + SELECT + {function}(WatchID), + {function}(CounterID), + {function}(ClientIP), + {function}(IPNetworkID), + {function}(SearchEngineID) + FROM hits_100m_single + FORMAT Null + + + + SELECT + {function}(WatchID), + {function}(CounterID), + sum(toUInt256(ClientIP)), + {function}(ClientIP), + {function}(IPNetworkID), + {function}(SearchEngineID) + FROM hits_100m_single + FORMAT Null + + WITH (WatchID % 2 == 0) AS predicate SELECT @@ -297,5 +422,51 @@ FORMAT Null + + WITH (WatchID % 2 == 0) AS predicate + SELECT + {function}If(WatchID, predicate), + {function}If(CounterID, predicate), + {function}If(ClientIP, predicate) + FROM hits_100m_single + FORMAT Null + + + + WITH (WatchID % 2 == 0) AS predicate + SELECT + {function}If(WatchID, predicate), + {function}If(CounterID, predicate), + sumIf(toUInt256(ClientIP), predicate), + {function}If(ClientIP, predicate) + FROM hits_100m_single + FORMAT Null + + + + WITH (WatchID % 2 == 0) AS predicate + SELECT + {function}If(WatchID, predicate), + {function}If(CounterID, predicate), + {function}If(ClientIP, predicate), + {function}If(IPNetworkID, predicate), + {function}If(SearchEngineID, predicate) + FROM hits_100m_single + FORMAT Null + + + + WITH (WatchID % 2 == 0) AS predicate + SELECT + {function}If(WatchID, predicate), + {function}If(CounterID, predicate), + sumIf(toUInt256(ClientIP), predicate), + {function}If(ClientIP, predicate), + {function}If(IPNetworkID, predicate), + {function}If(SearchEngineID, predicate) + FROM hits_100m_single + FORMAT Null + + DROP TABLE IF EXISTS {table} diff --git a/tests/queries/1_stateful/00165_jit_aggregate_functions.reference b/tests/queries/1_stateful/00165_jit_aggregate_functions.reference index 451a676754c..fa084170f53 100644 --- a/tests/queries/1_stateful/00165_jit_aggregate_functions.reference +++ b/tests/queries/1_stateful/00165_jit_aggregate_functions.reference @@ -62,6 +62,12 @@ Simple functions if combinator 11482817 4611990575414646848 9223302669582414438 9828522700609834800 378121905921203.2 34845264.2080656 25993 9223372036854775806 4611686018427387904 4689180182672571856 63469 4612175339998036670 9222961628400798084 17239621485933250238 663164390134376.5 7825349797.6059 25996 9223372036854775806 4611686018427387904 2067736879306995526 29103473 4611744585914335132 9223035551850347954 12590190375872647672 525927999326314.7 26049107.15514301 23939 9223372036854775806 4611686018427387904 8318055464870862444 +Simple functions without key +4611686725751467379 9223371678237104442 3626326766789368100 408650940859.2896 104735.01095549858 8873898 9223372036854775807 4611686018427387904 3818489297630359920 +Simple functions with non compilable function without key +4611686725751467379 9223371678237104442 3626326766789368100 61384643584599682996279588 408650940859.2896 104735.01095549858 8873898 9223372036854775807 4611686018427387904 3818489297630359920 +Simple functions if combinator without key +4611687533683519016 9223371678237104442 4124667747700004330 930178817930.5122 321189.2280948817 4434274 9223372036854775806 4611686018427387904 2265422677606390266 Aggregation without JIT compilation Simple functions 1704509 4611700827100483880 9223360787015464643 10441337359398154812 19954243669348.844 9648741.579254271 523264 9223372036854775807 4611686018427387904 4544239379628300646 @@ -126,3 +132,9 @@ Simple functions if combinator 11482817 4611990575414646848 9223302669582414438 9828522700609834800 378121905921203.2 34845264.2080656 25993 9223372036854775806 4611686018427387904 4689180182672571856 63469 4612175339998036670 9222961628400798084 17239621485933250238 663164390134376.5 7825349797.6059 25996 9223372036854775806 4611686018427387904 2067736879306995526 29103473 4611744585914335132 9223035551850347954 12590190375872647672 525927999326314.7 26049107.15514301 23939 9223372036854775806 4611686018427387904 8318055464870862444 +Simple functions without key +4611686725751467379 9223371678237104442 3626326766789368100 408650940859.2896 104735.01095549858 8873898 9223372036854775807 4611686018427387904 3818489297630359920 +Simple functions with non compilable function without key +4611686725751467379 9223371678237104442 3626326766789368100 61384643584599682996279588 408650940859.2896 104735.01095549858 8873898 9223372036854775807 4611686018427387904 3818489297630359920 +Simple functions if combinator without key +4611687533683519016 9223371678237104442 4124667747700004330 930178817930.5122 321189.2280948817 4434274 9223372036854775806 4611686018427387904 2265422677606390266 diff --git a/tests/queries/1_stateful/00165_jit_aggregate_functions.sql b/tests/queries/1_stateful/00165_jit_aggregate_functions.sql index 6c13c6e4d42..c826a129b2a 100644 --- a/tests/queries/1_stateful/00165_jit_aggregate_functions.sql +++ b/tests/queries/1_stateful/00165_jit_aggregate_functions.sql @@ -53,6 +53,54 @@ SELECT FROM test.hits GROUP BY CounterID ORDER BY count() DESC LIMIT 20; +SELECT 'Simple functions without key'; + +SELECT + min(WatchID) AS min_watch_id, + max(WatchID), + sum(WatchID), + avg(WatchID), + avgWeighted(WatchID, CounterID), + count(WatchID), + groupBitOr(WatchID), + groupBitAnd(WatchID), + groupBitXor(WatchID) +FROM test.hits +ORDER BY min_watch_id DESC LIMIT 20; + +SELECT 'Simple functions with non compilable function without key'; + +SELECT + min(WatchID) AS min_watch_id, + max(WatchID), + sum(WatchID), + sum(toUInt128(WatchID)), + avg(WatchID), + avgWeighted(WatchID, CounterID), + count(WatchID), + groupBitOr(WatchID), + groupBitAnd(WatchID), + groupBitXor(WatchID) +FROM test.hits +ORDER BY min_watch_id DESC LIMIT 20; + +SELECT 'Simple functions if combinator without key'; + +WITH (WatchID % 2 == 0) AS predicate +SELECT + minIf(WatchID, predicate) as min_watch_id, + maxIf(WatchID, predicate), + sumIf(WatchID, predicate), + avgIf(WatchID, predicate), + avgWeightedIf(WatchID, CounterID, predicate), + countIf(WatchID, predicate), + groupBitOrIf(WatchID, predicate), + groupBitAndIf(WatchID, predicate), + groupBitXorIf(WatchID, predicate) +FROM test.hits +ORDER BY min_watch_id +DESC LIMIT 20; + SET compile_aggregate_expressions = 0; SELECT 'Aggregation without JIT compilation'; @@ -105,3 +153,51 @@ SELECT groupBitXorIf(WatchID, predicate) FROM test.hits GROUP BY CounterID ORDER BY count() DESC LIMIT 20; + +SELECT 'Simple functions without key'; + +SELECT + min(WatchID) AS min_watch_id, + max(WatchID), + sum(WatchID), + avg(WatchID), + avgWeighted(WatchID, CounterID), + count(WatchID), + groupBitOr(WatchID), + groupBitAnd(WatchID), + groupBitXor(WatchID) +FROM test.hits +ORDER BY min_watch_id DESC LIMIT 20; + +SELECT 'Simple functions with non compilable function without key'; + +SELECT + min(WatchID) AS min_watch_id, + max(WatchID), + sum(WatchID), + sum(toUInt128(WatchID)), + avg(WatchID), + avgWeighted(WatchID, CounterID), + count(WatchID), + groupBitOr(WatchID), + groupBitAnd(WatchID), + groupBitXor(WatchID) +FROM test.hits +ORDER BY min_watch_id DESC LIMIT 20; + +SELECT 'Simple functions if combinator without key'; + +WITH (WatchID % 2 == 0) AS predicate +SELECT + minIf(WatchID, predicate) as min_watch_id, + maxIf(WatchID, predicate), + sumIf(WatchID, predicate), + avgIf(WatchID, predicate), + avgWeightedIf(WatchID, CounterID, predicate), + countIf(WatchID, predicate), + groupBitOrIf(WatchID, predicate), + groupBitAndIf(WatchID, predicate), + groupBitXorIf(WatchID, predicate) +FROM test.hits +ORDER BY min_watch_id +DESC LIMIT 20; From fd19f473115b97bb66c4cc65751fc631b0e8c413 Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Wed, 28 Jul 2021 19:47:36 +0300 Subject: [PATCH 2/4] Fixed MSan tests --- src/Interpreters/Aggregator.cpp | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index 1327fedcd6d..709faac9057 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -809,6 +809,20 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl( auto add_into_aggregate_states_function_single_place = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function_single_place; add_into_aggregate_states_function_single_place(rows, columns_data.data(), res); + +#if defined(MEMORY_SANITIZER) + + /// We compile only functions that do not allocate some data in Arena. Only store necessary state in AggregateData place. + for (size_t aggregate_function_index = 0; aggregate_function_index < aggregate_functions.size(); ++aggregate_function_index) + { + if (!is_aggregate_function_compiled[aggregate_function_index]) + continue; + + auto aggregate_data_with_offset = res + offsets_of_aggregate_states[aggregate_function_index]; + auto data_size = params.aggregates[aggregate_function_index].function->sizeOfData(); + __msan_unpoison(aggregate_data_with_offset, data_size); + } +#endif } #endif From 8d061390f3e22e6e464d6e75e4340d69f7faa89e Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Tue, 3 Aug 2021 16:19:56 +0300 Subject: [PATCH 3/4] Updated performance tests --- tests/performance/jit_aggregate_functions.xml | 171 ----------- .../jit_aggregate_functions_no_key.xml | 284 ++++++++++++++++++ 2 files changed, 284 insertions(+), 171 deletions(-) create mode 100644 tests/performance/jit_aggregate_functions_no_key.xml diff --git a/tests/performance/jit_aggregate_functions.xml b/tests/performance/jit_aggregate_functions.xml index b1a937dfae9..3e99f6d9615 100644 --- a/tests/performance/jit_aggregate_functions.xml +++ b/tests/performance/jit_aggregate_functions.xml @@ -150,44 +150,6 @@ FORMAT Null - - SELECT - {function}(value_1), - {function}(value_2), - {function}(value_3) - FROM {table} - FORMAT Null - - - - SELECT - {function}(value_1), - {function}(value_2), - sum(toUInt256(value_3)), - {function}(value_3) - FROM {table} - FORMAT Null - - - - SELECT - {function}If(value_1, predicate), - {function}If(value_2, predicate), - {function}If(value_3, predicate) - FROM {table} - FORMAT Null - - - - SELECT - {function}If(value_1, predicate), - {function}If(value_2, predicate), - sumIf(toUInt256(value_3), predicate), - {function}If(value_3, predicate) - FROM {table} - FORMAT Null - - SELECT {function}(value_1), @@ -238,51 +200,6 @@ FORMAT Null - - SELECT - {function}(value_1), - {function}(value_2), - {function}(value_3), - {function}(value_4), - {function}(value_5) - FROM {table} - FORMAT Null - - - - SELECT - {function}(value_1), - {function}(value_2), - sum(toUInt256(value_3)), - {function}(value_3), - {function}(value_4), - {function}(value_5) - FROM {table} - FORMAT Null - - - - SELECT - {function}If(value_1, predicate), - {function}If(value_2, predicate), - {function}If(value_3, predicate), - {function}If(value_4, predicate), - {function}If(value_5, predicate) - FROM {table} - FORMAT Null - - - - SELECT - {function}If(value_1, predicate), - {function}If(value_2, predicate), - sumIf(toUInt256(value_3), predicate), - {function}If(value_3, predicate), - {function}If(value_4, predicate), - {function}If(value_5, predicate) - FROM {table} - FORMAT Null - SELECT @@ -330,48 +247,6 @@ FORMAT Null - - SELECT - {function}(WatchID), - {function}(CounterID), - {function}(ClientIP) - FROM hits_100m_single - FORMAT Null - - - - SELECT - {function}(WatchID), - {function}(CounterID), - sum(toUInt256(ClientIP)), - {function}(ClientIP) - FROM hits_100m_single - FORMAT Null - - - - SELECT - {function}(WatchID), - {function}(CounterID), - {function}(ClientIP), - {function}(IPNetworkID), - {function}(SearchEngineID) - FROM hits_100m_single - FORMAT Null - - - - SELECT - {function}(WatchID), - {function}(CounterID), - sum(toUInt256(ClientIP)), - {function}(ClientIP), - {function}(IPNetworkID), - {function}(SearchEngineID) - FROM hits_100m_single - FORMAT Null - - WITH (WatchID % 2 == 0) AS predicate SELECT @@ -422,51 +297,5 @@ FORMAT Null - - WITH (WatchID % 2 == 0) AS predicate - SELECT - {function}If(WatchID, predicate), - {function}If(CounterID, predicate), - {function}If(ClientIP, predicate) - FROM hits_100m_single - FORMAT Null - - - - WITH (WatchID % 2 == 0) AS predicate - SELECT - {function}If(WatchID, predicate), - {function}If(CounterID, predicate), - sumIf(toUInt256(ClientIP), predicate), - {function}If(ClientIP, predicate) - FROM hits_100m_single - FORMAT Null - - - - WITH (WatchID % 2 == 0) AS predicate - SELECT - {function}If(WatchID, predicate), - {function}If(CounterID, predicate), - {function}If(ClientIP, predicate), - {function}If(IPNetworkID, predicate), - {function}If(SearchEngineID, predicate) - FROM hits_100m_single - FORMAT Null - - - - WITH (WatchID % 2 == 0) AS predicate - SELECT - {function}If(WatchID, predicate), - {function}If(CounterID, predicate), - sumIf(toUInt256(ClientIP), predicate), - {function}If(ClientIP, predicate), - {function}If(IPNetworkID, predicate), - {function}If(SearchEngineID, predicate) - FROM hits_100m_single - FORMAT Null - - DROP TABLE IF EXISTS {table} diff --git a/tests/performance/jit_aggregate_functions_no_key.xml b/tests/performance/jit_aggregate_functions_no_key.xml new file mode 100644 index 00000000000..0d2577af97c --- /dev/null +++ b/tests/performance/jit_aggregate_functions_no_key.xml @@ -0,0 +1,284 @@ + + + hits_100m_single + + + + 1 + 0 + + + + CREATE TABLE jit_test_memory ( + key UInt64, + value_1 UInt64, + value_2 UInt64, + value_3 UInt64, + value_4 UInt64, + value_5 UInt64, + predicate UInt8 + ) Engine = Memory + + + + CREATE TABLE jit_test_merge_tree ( + key UInt64, + value_1 UInt64, + value_2 UInt64, + value_3 UInt64, + value_4 UInt64, + value_5 UInt64, + predicate UInt8 + ) Engine = MergeTree + ORDER BY key + + + + CREATE TABLE jit_test_merge_tree_nullable ( + key UInt64, + value_1 Nullable(UInt64), + value_2 Nullable(UInt64), + value_3 Nullable(UInt64), + value_4 Nullable(UInt64), + value_5 Nullable(UInt64), + predicate UInt8 + ) Engine = Memory + + + + CREATE TABLE jit_test_memory_nullable ( + key UInt64, + value_1 Nullable(UInt64), + value_2 Nullable(UInt64), + value_3 Nullable(UInt64), + value_4 Nullable(UInt64), + value_5 Nullable(UInt64), + predicate UInt8 + ) Engine = MergeTree + ORDER BY key + + + + + function + + sum + min + max + avg + any + anyLast + count + groupBitOr + groupBitAnd + groupBitXor + + + + + table + + jit_test_memory + jit_test_merge_tree + jit_test_memory_nullable + jit_test_merge_tree_nullable + + + + + group_scale + + 1000000 + + + + + + INSERT INTO {table} + SELECT + number % 1000000, + number, + number, + number, + number, + number, + if (number % 2 == 0, 1, 0) + FROM + system.numbers_mt + LIMIT 100000000 + + + + SELECT + {function}(value_1), + {function}(value_2), + {function}(value_3) + FROM {table} + FORMAT Null + + + + SELECT + {function}(value_1), + {function}(value_2), + sum(toUInt256(value_3)), + {function}(value_3) + FROM {table} + FORMAT Null + + + + SELECT + {function}If(value_1, predicate), + {function}If(value_2, predicate), + {function}If(value_3, predicate) + FROM {table} + FORMAT Null + + + + SELECT + {function}If(value_1, predicate), + {function}If(value_2, predicate), + sumIf(toUInt256(value_3), predicate), + {function}If(value_3, predicate) + FROM {table} + FORMAT Null + + + + SELECT + {function}(value_1), + {function}(value_2), + {function}(value_3), + {function}(value_4), + {function}(value_5) + FROM {table} + FORMAT Null + + + + SELECT + {function}(value_1), + {function}(value_2), + sum(toUInt256(value_3)), + {function}(value_3), + {function}(value_4), + {function}(value_5) + FROM {table} + FORMAT Null + + + + SELECT + {function}If(value_1, predicate), + {function}If(value_2, predicate), + {function}If(value_3, predicate), + {function}If(value_4, predicate), + {function}If(value_5, predicate) + FROM {table} + FORMAT Null + + + + SELECT + {function}If(value_1, predicate), + {function}If(value_2, predicate), + sumIf(toUInt256(value_3), predicate), + {function}If(value_3, predicate), + {function}If(value_4, predicate), + {function}If(value_5, predicate) + FROM {table} + FORMAT Null + + + + SELECT + {function}(WatchID), + {function}(CounterID), + {function}(ClientIP) + FROM hits_100m_single + FORMAT Null + + + + SELECT + {function}(WatchID), + {function}(CounterID), + sum(toUInt256(ClientIP)), + {function}(ClientIP) + FROM hits_100m_single + FORMAT Null + + + + SELECT + {function}(WatchID), + {function}(CounterID), + {function}(ClientIP), + {function}(IPNetworkID), + {function}(SearchEngineID) + FROM hits_100m_single + FORMAT Null + + + + SELECT + {function}(WatchID), + {function}(CounterID), + sum(toUInt256(ClientIP)), + {function}(ClientIP), + {function}(IPNetworkID), + {function}(SearchEngineID) + FROM hits_100m_single + FORMAT Null + + + + WITH (WatchID % 2 == 0) AS predicate + SELECT + {function}If(WatchID, predicate), + {function}If(CounterID, predicate), + {function}If(ClientIP, predicate) + FROM hits_100m_single + FORMAT Null + + + + WITH (WatchID % 2 == 0) AS predicate + SELECT + {function}If(WatchID, predicate), + {function}If(CounterID, predicate), + sumIf(toUInt256(ClientIP), predicate), + {function}If(ClientIP, predicate) + FROM hits_100m_single + FORMAT Null + + + + WITH (WatchID % 2 == 0) AS predicate + SELECT + {function}If(WatchID, predicate), + {function}If(CounterID, predicate), + {function}If(ClientIP, predicate), + {function}If(IPNetworkID, predicate), + {function}If(SearchEngineID, predicate) + FROM hits_100m_single + FORMAT Null + + + + WITH (WatchID % 2 == 0) AS predicate + SELECT + {function}If(WatchID, predicate), + {function}If(CounterID, predicate), + sumIf(toUInt256(ClientIP), predicate), + {function}If(ClientIP, predicate), + {function}If(IPNetworkID, predicate), + {function}If(SearchEngineID, predicate) + FROM hits_100m_single + FORMAT Null + + + DROP TABLE IF EXISTS {table} + From 7264c7bf4b17e4875a564d7b25ec8f38f7653bdc Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Thu, 5 Aug 2021 12:37:15 +0300 Subject: [PATCH 4/4] Fixed performance tests --- tests/performance/jit_aggregate_functions_no_key.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/performance/jit_aggregate_functions_no_key.xml b/tests/performance/jit_aggregate_functions_no_key.xml index 0d2577af97c..2d8f390059a 100644 --- a/tests/performance/jit_aggregate_functions_no_key.xml +++ b/tests/performance/jit_aggregate_functions_no_key.xml @@ -105,7 +105,7 @@ if (number % 2 == 0, 1, 0) FROM system.numbers_mt - LIMIT 100000000 + LIMIT 10000000