Merge pull request #56120 from kitaisreal/window-functions-decrease-amount-of-virtual-function-calls

WindowTransform: decrease the number of virtual function calls
This commit is contained in:
Raúl Marín 2023-11-17 13:24:38 +01:00 committed by GitHub
commit fc793fe31f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 48 additions and 23 deletions

View File

@ -182,10 +182,19 @@ ColumnPtr FunctionArrayReduce::executeImpl(const ColumnsWithTypeAndName & argume
that->addBatchArray(0, input_rows_count, places.data(), 0, aggregate_arguments, offsets->data(), arena.get());
}
for (size_t i = 0; i < input_rows_count; ++i)
/// We should use insertMergeResultInto to insert result into ColumnAggregateFunction
/// correctly if result contains AggregateFunction's states
agg_func.insertMergeResultInto(places[i], res_col, arena.get());
if (agg_func.isState())
{
for (size_t i = 0; i < input_rows_count; ++i)
/// We should use insertMergeResultInto to insert result into ColumnAggregateFunction
/// correctly if result contains AggregateFunction's states
agg_func.insertMergeResultInto(places[i], res_col, arena.get());
}
else
{
for (size_t i = 0; i < input_rows_count; ++i)
agg_func.insertResultInto(places[i], res_col, arena.get());
}
return result_holder;
}

View File

@ -141,10 +141,19 @@ ColumnPtr FunctionInitializeAggregation::executeImpl(const ColumnsWithTypeAndNam
that->addBatch(0, input_rows_count, places.data(), 0, aggregate_arguments, arena.get());
}
for (size_t i = 0; i < input_rows_count; ++i)
if (agg_func.isState())
{
/// We should use insertMergeResultInto to insert result into ColumnAggregateFunction
/// correctly if result contains AggregateFunction's states
agg_func.insertMergeResultInto(places[i], res_col, arena.get());
for (size_t i = 0; i < input_rows_count; ++i)
agg_func.insertMergeResultInto(places[i], res_col, arena.get());
}
else
{
for (size_t i = 0; i < input_rows_count; ++i)
agg_func.insertResultInto(places[i], res_col, arena.get());
}
return result_holder;
}

View File

@ -257,6 +257,7 @@ WindowTransform::WindowTransform(const Block & input_header_,
window_description.frame = *custom_default_frame;
}
workspace.is_aggregate_function_state = workspace.aggregate_function->isState();
workspace.aggregate_function_state.reset(
aggregate_function->sizeOfData(),
aggregate_function->alignOfData());
@ -957,10 +958,7 @@ void WindowTransform::updateAggregationState()
auto * columns = ws.argument_columns.data();
// Removing arena.get() from the loop makes it faster somehow...
auto * arena_ptr = arena.get();
for (auto row = first_row; row < past_the_end_row; ++row)
{
a->add(buf, columns, row, arena_ptr);
}
a->addBatchSinglePlaceFromInterval(first_row, past_the_end_row, buf, columns, arena_ptr);
}
}
}
@ -987,9 +985,16 @@ void WindowTransform::writeOutCurrentRow()
// FIXME does it also allocate the result on the arena?
// We'll have to pass it out with blocks then...
/// We should use insertMergeResultInto to insert result into ColumnAggregateFunction
/// correctly if result contains AggregateFunction's states
a->insertMergeResultInto(buf, *result_column, arena.get());
if (ws.is_aggregate_function_state)
{
/// We should use insertMergeResultInto to insert result into ColumnAggregateFunction
/// correctly if result contains AggregateFunction's states
a->insertMergeResultInto(buf, *result_column, arena.get());
}
else
{
a->insertResultInto(buf, *result_column, arena.get());
}
}
}
}

View File

@ -26,6 +26,9 @@ struct WindowFunctionWorkspace
{
AggregateFunctionPtr aggregate_function;
// Cached value of aggregate function isState virtual method
bool is_aggregate_function_state = false;
// This field is set for pure window functions. When set, we ignore the
// window_function.aggregate_function, and work through this interface
// instead.

View File

@ -6,21 +6,20 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
set -e -o pipefail
# Block until system.processes reports a non-zero count for the given query id.
#   $1 - query_id to look for
# Polls over HTTP via $CLICKHOUSE_CURL against $CLICKHOUSE_URL, sleeping 0.1s
# between attempts. There is no timeout: the loop ends only when the returned
# count is something other than "0".
function wait_for_query_to_start()
{
    local awaited_query_id=$1

    until [[ $($CLICKHOUSE_CURL -sS "$CLICKHOUSE_URL" -d "SELECT count() FROM system.processes WHERE query_id = '$awaited_query_id'") != 0 ]]
    do
        sleep 0.1
    done
}
# Run a test query that takes very long to run.
query_id="01572_kill_window_function-$CLICKHOUSE_DATABASE"
$CLICKHOUSE_CLIENT --query_id="$query_id" --query "SELECT count(1048575) OVER (PARTITION BY intDiv(NULL, number) ORDER BY number DESC NULLS FIRST ROWS BETWEEN CURRENT ROW AND 1048575 FOLLOWING) FROM numbers(255, 1048575)" >/dev/null 2>&1 &
$CLICKHOUSE_CLIENT --query_id="$query_id" --query "SELECT sum(number) OVER (PARTITION BY number % 10 ORDER BY number DESC NULLS FIRST ROWS BETWEEN CURRENT ROW AND 99999 FOLLOWING) FROM numbers(0, 10000000) format Null;" >/dev/null 2>&1 &
client_pid=$!
echo Started
# Use one query to both kill the test query and verify that it has started,
# because if we try to kill it before it starts, the test will fail.
while [ -z "$($CLICKHOUSE_CLIENT --query "kill query where query_id = '$query_id' and current_database = currentDatabase()")" ]
do
# If we don't yet see the query in the process list, the client should still
# be running. The query is very long.
kill -0 -- $client_pid
sleep 1
done
wait_for_query_to_start $query_id
$CLICKHOUSE_CLIENT --query "kill query where query_id = '$query_id' and current_database = currentDatabase() format Null"
echo Sent kill request
# Wait for the client to terminate.