From 3cc0829cc1b9eef07af23c06fabb7256651167d6 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Sun, 2 Dec 2018 19:00:23 +0800 Subject: [PATCH] Function execution with dry runs This commit prevents stateful functions like rowNumberInAllBlocks from being modified in getHeader() calls. --- .../ExpressionBlockInputStream.cpp | 2 +- dbms/src/Functions/FunctionsMiscellaneous.h | 4 +- dbms/src/Functions/IFunction.cpp | 28 ++++++++------ dbms/src/Functions/IFunction.h | 23 ++++++++---- dbms/src/Functions/rowNumberInAllBlocks.cpp | 6 +++ dbms/src/Interpreters/ExpressionActions.cpp | 14 +++---- dbms/src/Interpreters/ExpressionActions.h | 4 +- .../00799_function_dry_run.reference | 9 +++++ .../0_stateless/00799_function_dry_run.sql | 37 +++++++++++++++++++ 9 files changed, 96 insertions(+), 31 deletions(-) create mode 100644 dbms/tests/queries/0_stateless/00799_function_dry_run.reference create mode 100644 dbms/tests/queries/0_stateless/00799_function_dry_run.sql diff --git a/dbms/src/DataStreams/ExpressionBlockInputStream.cpp b/dbms/src/DataStreams/ExpressionBlockInputStream.cpp index f1840acd023..3eb559abaad 100644 --- a/dbms/src/DataStreams/ExpressionBlockInputStream.cpp +++ b/dbms/src/DataStreams/ExpressionBlockInputStream.cpp @@ -27,7 +27,7 @@ Block ExpressionBlockInputStream::getTotals() Block ExpressionBlockInputStream::getHeader() const { Block res = children.back()->getHeader(); - expression->execute(res); + expression->execute(res, true); return res; } diff --git a/dbms/src/Functions/FunctionsMiscellaneous.h b/dbms/src/Functions/FunctionsMiscellaneous.h index ee3a92bb6b6..6803e16abbe 100644 --- a/dbms/src/Functions/FunctionsMiscellaneous.h +++ b/dbms/src/Functions/FunctionsMiscellaneous.h @@ -34,7 +34,7 @@ public: return std::const_pointer_cast(shared_from_this()); } - void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override + void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/, bool) override { Block expr_block; for (size_t i = 0; i < arguments.size(); ++i) @@ -118,7 +118,7 @@ public: return std::const_pointer_cast(shared_from_this()); } - void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override + void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count, bool) override { ColumnsWithTypeAndName columns; columns.reserve(arguments.size()); diff --git a/dbms/src/Functions/IFunction.cpp b/dbms/src/Functions/IFunction.cpp index 7d6d09624d2..358fbfc425b 100644 --- a/dbms/src/Functions/IFunction.cpp +++ b/dbms/src/Functions/IFunction.cpp @@ -267,7 +267,7 @@ bool allArgumentsAreConstants(const Block & block, const ColumnNumbers & args) } bool PreparedFunctionImpl::defaultImplementationForConstantArguments(Block & block, const ColumnNumbers & args, size_t result, - size_t input_rows_count) + size_t input_rows_count, bool dry_run) { ColumnNumbers arguments_to_remain_constants = getArgumentsThatAreAlwaysConstant(); @@ -312,7 +312,7 @@ bool PreparedFunctionImpl::defaultImplementationForConstantArguments(Block & blo for (size_t i = 0; i < arguments_size; ++i) temporary_argument_numbers[i] = i; - executeWithoutLowCardinalityColumns(temporary_block, temporary_argument_numbers, arguments_size, temporary_block.rows()); + executeWithoutLowCardinalityColumns(temporary_block, temporary_argument_numbers, arguments_size, temporary_block.rows(), dry_run); ColumnPtr result_column; /// extremely rare case, when we have function with completely const arguments @@ -328,7 +328,7 @@ bool PreparedFunctionImpl::defaultImplementationForConstantArguments(Block & blo bool PreparedFunctionImpl::defaultImplementationForNulls(Block & block, const ColumnNumbers & args, size_t result, - size_t input_rows_count) + size_t input_rows_count, bool dry_run) { if (args.empty() || !useDefaultImplementationForNulls()) return false; @@ -344,7 +344,7 @@ bool PreparedFunctionImpl::defaultImplementationForNulls(Block & block, const Co if (null_presence.has_nullable) { Block temporary_block = createBlockWithNestedColumns(block, args, result); - executeWithoutLowCardinalityColumns(temporary_block, args, result, temporary_block.rows()); + executeWithoutLowCardinalityColumns(temporary_block, args, result, temporary_block.rows(), dry_run); block.getByPosition(result).column = wrapInNullable(temporary_block.getByPosition(result).column, block, args, result, input_rows_count); return true; @@ -353,15 +353,19 @@ bool PreparedFunctionImpl::defaultImplementationForNulls(Block & block, const Co return false; } -void PreparedFunctionImpl::executeWithoutLowCardinalityColumns(Block & block, const ColumnNumbers & args, size_t result, size_t input_rows_count) +void PreparedFunctionImpl::executeWithoutLowCardinalityColumns(Block & block, const ColumnNumbers & args, size_t result, + size_t input_rows_count, bool dry_run) { - if (defaultImplementationForConstantArguments(block, args, result, input_rows_count)) + if (defaultImplementationForConstantArguments(block, args, result, input_rows_count, dry_run)) return; - if (defaultImplementationForNulls(block, args, result, input_rows_count)) + if (defaultImplementationForNulls(block, args, result, input_rows_count, dry_run)) return; - executeImpl(block, args, result, input_rows_count); + if (dry_run) + executeImplDryRun(block, args, result, input_rows_count); + else + executeImpl(block, args, result, input_rows_count); } static const ColumnLowCardinality * findLowCardinalityArgument(const Block & block, const ColumnNumbers & args) @@ -441,7 +445,7 @@ static void convertLowCardinalityColumnsToFull(Block & block, const ColumnNumber } } -void PreparedFunctionImpl::execute(Block & block, const ColumnNumbers & args, size_t result, size_t input_rows_count) +void PreparedFunctionImpl::execute(Block & block, const ColumnNumbers & args, size_t result, size_t input_rows_count, bool dry_run) { if (useDefaultImplementationForLowCardinalityColumns()) { @@ -477,7 +481,7 @@ void PreparedFunctionImpl::execute(Block & block, const ColumnNumbers & args, si ColumnPtr indexes = replaceLowCardinalityColumnsByNestedAndGetDictionaryIndexes( block_without_low_cardinality, args, can_be_executed_on_default_arguments); - executeWithoutLowCardinalityColumns(block_without_low_cardinality, args, result, block_without_low_cardinality.rows()); + executeWithoutLowCardinalityColumns(block_without_low_cardinality, args, result, block_without_low_cardinality.rows(), dry_run); auto & keys = block_without_low_cardinality.safeGetByPosition(result).column; if (auto full_column = keys->convertToFullColumnIfConst()) @@ -511,12 +515,12 @@ void PreparedFunctionImpl::execute(Block & block, const ColumnNumbers & args, si else { convertLowCardinalityColumnsToFull(block_without_low_cardinality, args); - executeWithoutLowCardinalityColumns(block_without_low_cardinality, args, result, input_rows_count); + executeWithoutLowCardinalityColumns(block_without_low_cardinality, args, result, input_rows_count, dry_run); res.column = block_without_low_cardinality.safeGetByPosition(result).column; } } else - executeWithoutLowCardinalityColumns(block, args, result, input_rows_count); + executeWithoutLowCardinalityColumns(block, args, result, input_rows_count, dry_run); } void FunctionBuilderImpl::checkNumberOfArguments(size_t number_of_arguments) const diff --git a/dbms/src/Functions/IFunction.h b/dbms/src/Functions/IFunction.h index 4028a61ef7b..547229ecae1 100644 --- a/dbms/src/Functions/IFunction.h +++ b/dbms/src/Functions/IFunction.h @@ -40,7 +40,7 @@ public: /// Get the main function name. virtual String getName() const = 0; - virtual void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) = 0; + virtual void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count, bool dry_run) = 0; }; using PreparedFunctionPtr = std::shared_ptr; @@ -52,7 +52,7 @@ using PreparedFunctionLowCardinalityResultCachePtr = std::shared_ptrexecute(block, arguments, result, input_rows_count); + return prepare(block, arguments, result)->execute(block, arguments, result, input_rows_count, dry_run); } #if USE_EMBEDDED_COMPILER @@ -330,6 +334,7 @@ public: bool canBeExecutedOnLowCardinalityDictionary() const override { return isDeterministicInScopeOfQuery(); } using PreparedFunctionImpl::execute; + using PreparedFunctionImpl::executeImplDryRun; using FunctionBuilderImpl::getReturnTypeImpl; using FunctionBuilderImpl::getLambdaArgumentTypesImpl; using FunctionBuilderImpl::getReturnType; @@ -404,6 +409,10 @@ protected: { return function->executeImpl(block, arguments, result, input_rows_count); } + void executeImplDryRun(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) final + { + return function->executeImplDryRun(block, arguments, result, input_rows_count); + } bool useDefaultImplementationForNulls() const final { return function->useDefaultImplementationForNulls(); } bool useDefaultImplementationForConstants() const final { return function->useDefaultImplementationForConstants(); } bool useDefaultImplementationForLowCardinalityColumns() const final { return function->useDefaultImplementationForLowCardinalityColumns(); } diff --git a/dbms/src/Functions/rowNumberInAllBlocks.cpp b/dbms/src/Functions/rowNumberInAllBlocks.cpp index 0ee2ece13d1..cce7681cf9c 100644 --- a/dbms/src/Functions/rowNumberInAllBlocks.cpp +++ b/dbms/src/Functions/rowNumberInAllBlocks.cpp @@ -44,6 +44,12 @@ public: return std::make_shared(); } + void executeImplDryRun(Block & block, const ColumnNumbers &, size_t result, size_t input_rows_count) override + { + auto column = ColumnUInt64::create(input_rows_count); + block.getByPosition(result).column = std::move(column); + } + void executeImpl(Block & block, const ColumnNumbers &, size_t result, size_t input_rows_count) override { size_t current_row_number = rows.fetch_add(input_rows_count); diff --git a/dbms/src/Interpreters/ExpressionActions.cpp b/dbms/src/Interpreters/ExpressionActions.cpp index b1fab40a654..a33261ef385 100644 --- a/dbms/src/Interpreters/ExpressionActions.cpp +++ b/dbms/src/Interpreters/ExpressionActions.cpp @@ -204,7 +204,7 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings) /// so we don't want to unfold non deterministic functions if (all_const && function_base->isSuitableForConstantFolding() && (!compile_expressions || function_base->isDeterministic())) { - function->execute(sample_block, arguments, result_position, sample_block.rows()); + function->execute(sample_block, arguments, result_position, sample_block.rows(), true); /// If the result is not a constant, just in case, we will consider the result as unknown. ColumnWithTypeAndName & col = sample_block.safeGetByPosition(result_position); @@ -325,7 +325,7 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings) } -void ExpressionAction::execute(Block & block) const +void ExpressionAction::execute(Block & block, bool dry_run) const { size_t input_rows_count = block.rows(); @@ -355,7 +355,7 @@ void ExpressionAction::execute(Block & block) const ProfileEvents::increment(ProfileEvents::FunctionExecute); if (is_function_compiled) ProfileEvents::increment(ProfileEvents::CompiledFunctionExecute); - function->execute(block, arguments, num_columns_without_result, input_rows_count); + function->execute(block, arguments, num_columns_without_result, input_rows_count, dry_run); break; } @@ -383,7 +383,7 @@ void ExpressionAction::execute(Block & block) const Block tmp_block{src_col, {{}, src_col.type, {}}}; - function_builder->build({src_col})->execute(tmp_block, {0}, 1, src_col.column->size()); + function_builder->build({src_col})->execute(tmp_block, {0}, 1, src_col.column->size(), dry_run); non_empty_array_columns[name] = tmp_block.safeGetByPosition(1).column; } @@ -492,7 +492,7 @@ void ExpressionAction::execute(Block & block) const void ExpressionAction::executeOnTotals(Block & block) const { if (type != JOIN) - execute(block); + execute(block, false); else join->joinTotals(block); } @@ -704,11 +704,11 @@ bool ExpressionActions::popUnusedArrayJoin(const Names & required_columns, Expre return true; } -void ExpressionActions::execute(Block & block) const +void ExpressionActions::execute(Block & block, bool dry_run) const { for (const auto & action : actions) { - action.execute(block); + action.execute(block, dry_run); checkLimits(block); } } diff --git a/dbms/src/Interpreters/ExpressionActions.h b/dbms/src/Interpreters/ExpressionActions.h index 781134dbeb2..67b42febe16 100644 --- a/dbms/src/Interpreters/ExpressionActions.h +++ b/dbms/src/Interpreters/ExpressionActions.h @@ -136,7 +136,7 @@ private: friend class ExpressionActions; void prepare(Block & sample_block, const Settings & settings); - void execute(Block & block) const; + void execute(Block & block, bool dry_run) const; void executeOnTotals(Block & block) const; }; @@ -217,7 +217,7 @@ public: const NamesAndTypesList & getRequiredColumnsWithTypes() const { return input_columns; } /// Execute the expression on the block. The block must contain all the columns returned by getRequiredColumns. - void execute(Block & block) const; + void execute(Block & block, bool dry_run = false) const; /** Execute the expression on the block of total values. * Almost the same as `execute`. The difference is only when JOIN is executed. diff --git a/dbms/tests/queries/0_stateless/00799_function_dry_run.reference b/dbms/tests/queries/0_stateless/00799_function_dry_run.reference new file mode 100644 index 00000000000..35cebe7569a --- /dev/null +++ b/dbms/tests/queries/0_stateless/00799_function_dry_run.reference @@ -0,0 +1,9 @@ +0.3 2018-11-19 13:00:00 \N +0.3 2018-11-19 13:05:00 \N +0.4 2018-11-19 13:10:00 1 +0.5 2018-11-19 13:15:00 1.2 +0.6 2018-11-19 13:15:00 1.5 +0.7 2018-11-19 13:20:00 1.8 +0.8 2018-11-19 13:25:00 2.1 +0.9 2018-11-19 13:25:00 2.4 +0.5 2018-11-19 13:30:00 2.2 diff --git a/dbms/tests/queries/0_stateless/00799_function_dry_run.sql b/dbms/tests/queries/0_stateless/00799_function_dry_run.sql new file mode 100644 index 00000000000..ac472c317d0 --- /dev/null +++ b/dbms/tests/queries/0_stateless/00799_function_dry_run.sql @@ -0,0 +1,37 @@ +-- https://stackoverflow.com/questions/53416531/clickhouse-moving-average + +DROP TABLE IF EXISTS test.bm; + +USE test; + +CREATE TABLE bm (amount float, business_dttm DateTime) engine Log; + +INSERT INTO bm VALUES (0.3,'2018-11-19 13:00:00'), (0.3,'2018-11-19 13:05:00'), (0.4,'2018-11-19 13:10:00'), (0.5,'2018-11-19 13:15:00'), (0.6,'2018-11-19 13:15:00'), (0.7,'2018-11-19 13:20:00'), (0.8,'2018-11-19 13:25:00'), (0.9,'2018-11-19 13:25:00'), (0.5,'2018-11-19 13:30:00'); + +WITH + ( + SELECT arrayCumSum(groupArray(amount)) + FROM + ( + SELECT + amount + FROM bm + ORDER BY business_dttm + ) + ) AS arr, + 1 + rowNumberInAllBlocks() AS id, + 3 AS window_size +SELECT + amount, + business_dttm, + if(id < window_size, NULL, round(arr[id] - arr[id - window_size], 4)) AS moving_sum +FROM +( + SELECT + amount, + business_dttm + FROM bm + ORDER BY business_dttm +); + +DROP TABLE test.bm;