Function execution with dry runs

This commit prevents getHeader() calls from modifying the internal state of stateful functions such as rowNumberInAllBlocks.
This commit is contained in:
Amos Bird 2018-12-02 19:00:23 +08:00
parent 1cc69100f1
commit 3cc0829cc1
9 changed files with 96 additions and 31 deletions

View File

@ -27,7 +27,7 @@ Block ExpressionBlockInputStream::getTotals()
Block ExpressionBlockInputStream::getHeader() const
{
Block res = children.back()->getHeader();
expression->execute(res);
expression->execute(res, true);
return res;
}

View File

@ -34,7 +34,7 @@ public:
return std::const_pointer_cast<FunctionExpression>(shared_from_this());
}
void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) override
void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/, bool) override
{
Block expr_block;
for (size_t i = 0; i < arguments.size(); ++i)
@ -118,7 +118,7 @@ public:
return std::const_pointer_cast<FunctionCapture>(shared_from_this());
}
void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count, bool) override
{
ColumnsWithTypeAndName columns;
columns.reserve(arguments.size());

View File

@ -267,7 +267,7 @@ bool allArgumentsAreConstants(const Block & block, const ColumnNumbers & args)
}
bool PreparedFunctionImpl::defaultImplementationForConstantArguments(Block & block, const ColumnNumbers & args, size_t result,
size_t input_rows_count)
size_t input_rows_count, bool dry_run)
{
ColumnNumbers arguments_to_remain_constants = getArgumentsThatAreAlwaysConstant();
@ -312,7 +312,7 @@ bool PreparedFunctionImpl::defaultImplementationForConstantArguments(Block & blo
for (size_t i = 0; i < arguments_size; ++i)
temporary_argument_numbers[i] = i;
executeWithoutLowCardinalityColumns(temporary_block, temporary_argument_numbers, arguments_size, temporary_block.rows());
executeWithoutLowCardinalityColumns(temporary_block, temporary_argument_numbers, arguments_size, temporary_block.rows(), dry_run);
ColumnPtr result_column;
/// extremely rare case, when we have function with completely const arguments
@ -328,7 +328,7 @@ bool PreparedFunctionImpl::defaultImplementationForConstantArguments(Block & blo
bool PreparedFunctionImpl::defaultImplementationForNulls(Block & block, const ColumnNumbers & args, size_t result,
size_t input_rows_count)
size_t input_rows_count, bool dry_run)
{
if (args.empty() || !useDefaultImplementationForNulls())
return false;
@ -344,7 +344,7 @@ bool PreparedFunctionImpl::defaultImplementationForNulls(Block & block, const Co
if (null_presence.has_nullable)
{
Block temporary_block = createBlockWithNestedColumns(block, args, result);
executeWithoutLowCardinalityColumns(temporary_block, args, result, temporary_block.rows());
executeWithoutLowCardinalityColumns(temporary_block, args, result, temporary_block.rows(), dry_run);
block.getByPosition(result).column = wrapInNullable(temporary_block.getByPosition(result).column, block, args,
result, input_rows_count);
return true;
@ -353,15 +353,19 @@ bool PreparedFunctionImpl::defaultImplementationForNulls(Block & block, const Co
return false;
}
void PreparedFunctionImpl::executeWithoutLowCardinalityColumns(Block & block, const ColumnNumbers & args, size_t result, size_t input_rows_count)
void PreparedFunctionImpl::executeWithoutLowCardinalityColumns(Block & block, const ColumnNumbers & args, size_t result,
size_t input_rows_count, bool dry_run)
{
if (defaultImplementationForConstantArguments(block, args, result, input_rows_count))
if (defaultImplementationForConstantArguments(block, args, result, input_rows_count, dry_run))
return;
if (defaultImplementationForNulls(block, args, result, input_rows_count))
if (defaultImplementationForNulls(block, args, result, input_rows_count, dry_run))
return;
executeImpl(block, args, result, input_rows_count);
if (dry_run)
executeImplDryRun(block, args, result, input_rows_count);
else
executeImpl(block, args, result, input_rows_count);
}
static const ColumnLowCardinality * findLowCardinalityArgument(const Block & block, const ColumnNumbers & args)
@ -441,7 +445,7 @@ static void convertLowCardinalityColumnsToFull(Block & block, const ColumnNumber
}
}
void PreparedFunctionImpl::execute(Block & block, const ColumnNumbers & args, size_t result, size_t input_rows_count)
void PreparedFunctionImpl::execute(Block & block, const ColumnNumbers & args, size_t result, size_t input_rows_count, bool dry_run)
{
if (useDefaultImplementationForLowCardinalityColumns())
{
@ -477,7 +481,7 @@ void PreparedFunctionImpl::execute(Block & block, const ColumnNumbers & args, si
ColumnPtr indexes = replaceLowCardinalityColumnsByNestedAndGetDictionaryIndexes(
block_without_low_cardinality, args, can_be_executed_on_default_arguments);
executeWithoutLowCardinalityColumns(block_without_low_cardinality, args, result, block_without_low_cardinality.rows());
executeWithoutLowCardinalityColumns(block_without_low_cardinality, args, result, block_without_low_cardinality.rows(), dry_run);
auto & keys = block_without_low_cardinality.safeGetByPosition(result).column;
if (auto full_column = keys->convertToFullColumnIfConst())
@ -511,12 +515,12 @@ void PreparedFunctionImpl::execute(Block & block, const ColumnNumbers & args, si
else
{
convertLowCardinalityColumnsToFull(block_without_low_cardinality, args);
executeWithoutLowCardinalityColumns(block_without_low_cardinality, args, result, input_rows_count);
executeWithoutLowCardinalityColumns(block_without_low_cardinality, args, result, input_rows_count, dry_run);
res.column = block_without_low_cardinality.safeGetByPosition(result).column;
}
}
else
executeWithoutLowCardinalityColumns(block, args, result, input_rows_count);
executeWithoutLowCardinalityColumns(block, args, result, input_rows_count, dry_run);
}
void FunctionBuilderImpl::checkNumberOfArguments(size_t number_of_arguments) const

View File

@ -40,7 +40,7 @@ public:
/// Get the main function name.
virtual String getName() const = 0;
virtual void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) = 0;
virtual void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count, bool dry_run) = 0;
};
using PreparedFunctionPtr = std::shared_ptr<IPreparedFunction>;
@ -52,7 +52,7 @@ using PreparedFunctionLowCardinalityResultCachePtr = std::shared_ptr<PreparedFun
class PreparedFunctionImpl : public IPreparedFunction
{
public:
void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) final;
void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count, bool dry_run = false) final;
/// Create cache which will be used to store result of function executed on LowCardinality column.
/// Only for default LowCardinality implementation.
@ -61,6 +61,10 @@ public:
protected:
virtual void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) = 0;
/// Dry-run counterpart of executeImpl: executeWithoutLowCardinalityColumns calls it instead of executeImpl when dry_run is set.
/// The default simply forwards to executeImpl with the same arguments; it is virtual so a subclass can supply
/// a different dry-run body (presumably one that leaves the function's internal state untouched).
virtual void executeImplDryRun(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count)
{
executeImpl(block, arguments, result, input_rows_count);
}
/** Default implementation in presence of Nullable arguments or NULL constants as arguments is the following:
* if some of arguments are NULL constants then return NULL constant,
@ -94,11 +98,11 @@ protected:
private:
bool defaultImplementationForNulls(Block & block, const ColumnNumbers & args, size_t result,
size_t input_rows_count);
size_t input_rows_count, bool dry_run);
bool defaultImplementationForConstantArguments(Block & block, const ColumnNumbers & args, size_t result,
size_t input_rows_count);
size_t input_rows_count, bool dry_run);
void executeWithoutLowCardinalityColumns(Block & block, const ColumnNumbers & arguments, size_t result,
size_t input_rows_count);
size_t input_rows_count, bool dry_run);
/// Cache is created by function createLowCardinalityResultCache()
PreparedFunctionLowCardinalityResultCachePtr low_cardinality_result_cache;
@ -123,9 +127,9 @@ public:
virtual PreparedFunctionPtr prepare(const Block & sample_block, const ColumnNumbers & arguments, size_t result) const = 0;
/// TODO: make const
virtual void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count)
virtual void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count, bool dry_run = false)
{
return prepare(block, arguments, result)->execute(block, arguments, result, input_rows_count);
return prepare(block, arguments, result)->execute(block, arguments, result, input_rows_count, dry_run);
}
#if USE_EMBEDDED_COMPILER
@ -330,6 +334,7 @@ public:
bool canBeExecutedOnLowCardinalityDictionary() const override { return isDeterministicInScopeOfQuery(); }
using PreparedFunctionImpl::execute;
using PreparedFunctionImpl::executeImplDryRun;
using FunctionBuilderImpl::getReturnTypeImpl;
using FunctionBuilderImpl::getLambdaArgumentTypesImpl;
using FunctionBuilderImpl::getReturnType;
@ -404,6 +409,10 @@ protected:
{
return function->executeImpl(block, arguments, result, input_rows_count);
}
/// Forwards the dry-run request to the wrapped `function`, so that its own executeImplDryRun is the one that runs.
void executeImplDryRun(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) final
{
return function->executeImplDryRun(block, arguments, result, input_rows_count);
}
bool useDefaultImplementationForNulls() const final { return function->useDefaultImplementationForNulls(); }
bool useDefaultImplementationForConstants() const final { return function->useDefaultImplementationForConstants(); }
bool useDefaultImplementationForLowCardinalityColumns() const final { return function->useDefaultImplementationForLowCardinalityColumns(); }

View File

@ -44,6 +44,12 @@ public:
return std::make_shared<DataTypeUInt64>();
}
/// Dry-run version: puts a UInt64 column created for input_rows_count rows into the result position
/// without advancing the `rows` counter that executeImpl increments.
/// NOTE(review): the values stored in the created column are presumably meaningless here (only size/type matter) - confirm.
void executeImplDryRun(Block & block, const ColumnNumbers &, size_t result, size_t input_rows_count) override
{
auto column = ColumnUInt64::create(input_rows_count);
block.getByPosition(result).column = std::move(column);
}
void executeImpl(Block & block, const ColumnNumbers &, size_t result, size_t input_rows_count) override
{
size_t current_row_number = rows.fetch_add(input_rows_count);

View File

@ -204,7 +204,7 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings)
/// so we don't want to unfold non deterministic functions
if (all_const && function_base->isSuitableForConstantFolding() && (!compile_expressions || function_base->isDeterministic()))
{
function->execute(sample_block, arguments, result_position, sample_block.rows());
function->execute(sample_block, arguments, result_position, sample_block.rows(), true);
/// If the result is not a constant, just in case, we will consider the result as unknown.
ColumnWithTypeAndName & col = sample_block.safeGetByPosition(result_position);
@ -325,7 +325,7 @@ void ExpressionAction::prepare(Block & sample_block, const Settings & settings)
}
void ExpressionAction::execute(Block & block) const
void ExpressionAction::execute(Block & block, bool dry_run) const
{
size_t input_rows_count = block.rows();
@ -355,7 +355,7 @@ void ExpressionAction::execute(Block & block) const
ProfileEvents::increment(ProfileEvents::FunctionExecute);
if (is_function_compiled)
ProfileEvents::increment(ProfileEvents::CompiledFunctionExecute);
function->execute(block, arguments, num_columns_without_result, input_rows_count);
function->execute(block, arguments, num_columns_without_result, input_rows_count, dry_run);
break;
}
@ -383,7 +383,7 @@ void ExpressionAction::execute(Block & block) const
Block tmp_block{src_col, {{}, src_col.type, {}}};
function_builder->build({src_col})->execute(tmp_block, {0}, 1, src_col.column->size());
function_builder->build({src_col})->execute(tmp_block, {0}, 1, src_col.column->size(), dry_run);
non_empty_array_columns[name] = tmp_block.safeGetByPosition(1).column;
}
@ -492,7 +492,7 @@ void ExpressionAction::execute(Block & block) const
void ExpressionAction::executeOnTotals(Block & block) const
{
if (type != JOIN)
execute(block);
execute(block, false);
else
join->joinTotals(block);
}
@ -704,11 +704,11 @@ bool ExpressionActions::popUnusedArrayJoin(const Names & required_columns, Expre
return true;
}
void ExpressionActions::execute(Block & block) const
void ExpressionActions::execute(Block & block, bool dry_run) const
{
for (const auto & action : actions)
{
action.execute(block);
action.execute(block, dry_run);
checkLimits(block);
}
}

View File

@ -136,7 +136,7 @@ private:
friend class ExpressionActions;
void prepare(Block & sample_block, const Settings & settings);
void execute(Block & block) const;
void execute(Block & block, bool dry_run) const;
void executeOnTotals(Block & block) const;
};
@ -217,7 +217,7 @@ public:
const NamesAndTypesList & getRequiredColumnsWithTypes() const { return input_columns; }
/// Execute the expression on the block. The block must contain all the columns returned by getRequiredColumns.
void execute(Block & block) const;
void execute(Block & block, bool dry_run = false) const;
/** Execute the expression on the block of total values.
* Almost the same as `execute`. The difference is only when JOIN is executed.

View File

@ -0,0 +1,9 @@
0.3 2018-11-19 13:00:00 \N
0.3 2018-11-19 13:05:00 \N
0.4 2018-11-19 13:10:00 1
0.5 2018-11-19 13:15:00 1.2
0.6 2018-11-19 13:15:00 1.5
0.7 2018-11-19 13:20:00 1.8
0.8 2018-11-19 13:25:00 2.1
0.9 2018-11-19 13:25:00 2.4
0.5 2018-11-19 13:30:00 2.2

View File

@ -0,0 +1,37 @@
-- https://stackoverflow.com/questions/53416531/clickhouse-moving-average
-- Test of a moving sum built on rowNumberInAllBlocks(): the query below relies on the row counter
-- advancing only while rows are actually processed.
-- Remove any leftover table before creating it.
DROP TABLE IF EXISTS test.bm;
USE test;
CREATE TABLE bm (amount float, business_dttm DateTime) engine Log;
INSERT INTO bm VALUES (0.3,'2018-11-19 13:00:00'), (0.3,'2018-11-19 13:05:00'), (0.4,'2018-11-19 13:10:00'), (0.5,'2018-11-19 13:15:00'), (0.6,'2018-11-19 13:15:00'), (0.7,'2018-11-19 13:20:00'), (0.8,'2018-11-19 13:25:00'), (0.9,'2018-11-19 13:25:00'), (0.5,'2018-11-19 13:30:00');
-- `arr` is the array of cumulative sums of `amount` taken in business_dttm order,
-- `id` is 1 + rowNumberInAllBlocks() (presumably a 1-based row number - confirm), and `window_size` is the window length.
-- moving_sum is NULL while id < window_size, otherwise the difference of two cumulative sums rounded to 4 digits.
WITH
(
SELECT arrayCumSum(groupArray(amount))
FROM
(
SELECT
amount
FROM bm
ORDER BY business_dttm
)
) AS arr,
1 + rowNumberInAllBlocks() AS id,
3 AS window_size
SELECT
amount,
business_dttm,
if(id < window_size, NULL, round(arr[id] - arr[id - window_size], 4)) AS moving_sum
FROM
(
SELECT
amount,
business_dttm
FROM bm
ORDER BY business_dttm
);
-- Clean up the table created by this test.
DROP TABLE test.bm;