Merge pull request #24462 from azat/total_rows_approx-fix

Account total_rows_approx as soon as possible
This commit is contained in:
Nikolai Kochetov 2021-05-27 11:34:50 +03:00 committed by GitHub
commit b3d30ed9cb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 89 additions and 2 deletions

View File

@ -3,7 +3,9 @@
#include <Processors/IProcessor.h> #include <Processors/IProcessor.h>
#include <Processors/QueryPlan/QueryIdHolder.h> #include <Processors/QueryPlan/QueryIdHolder.h>
#include <Processors/QueryPlan/QueryPlan.h> #include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/Sources/SourceWithProgress.h> #include <Access/EnabledQuota.h>
#include <DataStreams/SizeLimits.h>
#include <Storages/TableLockHolder.h>
namespace DB namespace DB
{ {

View File

@ -23,6 +23,30 @@ SourceWithProgress::SourceWithProgress(Block header, bool enable_auto_progress)
{ {
} }
void SourceWithProgress::setProcessListElement(QueryStatus * elem)
{
process_list_elem = elem;
/// Update total_rows_approx as soon as possible.
///
/// It is important to do this, since you will not get correct
/// total_rows_approx until the query will start reading all parts (in case
/// of query needs to read from multiple parts), and this is especially a
/// problem in case of max_threads=1.
///
/// NOTE: This can be done only if progress callback already set, since
/// otherwise total_rows_approx will lost.
if (total_rows_approx != 0 && progress_callback)
{
Progress total_rows_progress = {0, 0, total_rows_approx};
progress_callback(total_rows_progress);
process_list_elem->updateProgressIn(total_rows_progress);
total_rows_approx = 0;
}
}
void SourceWithProgress::work() void SourceWithProgress::work()
{ {
if (!limits.speed_limits.checkTimeLimit(total_stopwatch.elapsed(), limits.timeout_overflow_mode)) if (!limits.speed_limits.checkTimeLimit(total_stopwatch.elapsed(), limits.timeout_overflow_mode))

View File

@ -51,7 +51,7 @@ public:
void setLimits(const StreamLocalLimits & limits_) final { limits = limits_; } void setLimits(const StreamLocalLimits & limits_) final { limits = limits_; }
void setLeafLimits(const SizeLimits & leaf_limits_) final {leaf_limits = leaf_limits_; } void setLeafLimits(const SizeLimits & leaf_limits_) final {leaf_limits = leaf_limits_; }
void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) final { quota = quota_; } void setQuota(const std::shared_ptr<const EnabledQuota> & quota_) final { quota = quota_; }
void setProcessListElement(QueryStatus * elem) final { process_list_elem = elem; } void setProcessListElement(QueryStatus * elem) final;
void setProgressCallback(const ProgressCallback & callback) final { progress_callback = callback; } void setProgressCallback(const ProgressCallback & callback) final { progress_callback = callback; }
void addTotalRowsApprox(size_t value) final { total_rows_approx += value; } void addTotalRowsApprox(size_t value) final { total_rows_approx += value; }

View File

@ -0,0 +1,8 @@
Waiting for query to be started...
Query started.
Checking total_rows_approx.
10
10
10
10
10

View File

@ -0,0 +1,53 @@
#!/usr/bin/env bash
# Check that total_rows_approx (via system.processes) includes all rows from
# all parts at the query start.
#
# At some point total_rows_approx was accounted only when the query starts
# reading the part, and so total_rows_approx wasn't reliable, even for simple
# SELECT FROM MergeTree()
# It was fixed by take total_rows_approx into account as soon as possible.
#
# To check total_rows_approx this query starts the query in background,
# that sleep's 1 second for each part, and by using max_threads=1 the query
# reads parts sequentially and sleeps 1 second between parts.
# Also the test spawns background process to check total_rows_approx for this
# query.
# It checks multiple times since at first few iterations the query may not
# start yet (since there are 3 excessive sleep calls - 1 for primary key
# analysis and 2 for partition pruning), and get only last 5 total_rows_approx
# rows (one row is not enough since when the query finishes total_rows_approx
# will be set to 10 anyway, regardless proper accounting).
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "drop table if exists data_01882"
$CLICKHOUSE_CLIENT -q "create table data_01882 (key Int) Engine=MergeTree() partition by key order by key as select * from numbers(10)"
QUERY_ID="$CLICKHOUSE_TEST_NAME-$(tr -cd '[:lower:]' < /dev/urandom | head -c10)"
function check_background_query()
{
echo "Waiting for query to be started..."
while [[ $($CLICKHOUSE_CLIENT --param_query_id="$QUERY_ID" -q 'select count() from system.processes where query_id = {query_id:String}') != 1 ]]; do
sleep 0.01
done
echo "Query started."
echo "Checking total_rows_approx."
# check total_rows_approx multiple times
# (to make test more reliable to what it covers)
local i=0
for ((i = 0; i < 20; ++i)); do
$CLICKHOUSE_CLIENT --param_query_id="$QUERY_ID" -q 'select total_rows_approx from system.processes where query_id = {query_id:String}'
(( ++i ))
sleep 1
done | tail -n5
}
check_background_query &
# this query will sleep 10 seconds in total, 1 seconds for each part (10 parts).
$CLICKHOUSE_CLIENT -q "select *, sleepEachRow(1) from data_01882" --max_threads=1 --format Null --query_id="$QUERY_ID" --max_block_size=1
wait