Add ability to push down LIMIT for distributed queries

This way the remote nodes will not need to send all the rows, so this
will decrease network io and also this will make queries w/
optimize_aggregation_in_order=1/LIMIT X and w/o ORDER BY faster since it
initiator will not need to read all the rows, only first X (but note
that for this you need to your data to be sharded correctly or you may
get inaccurate results).

Note, that having lots of processing stages will increase the complexity
of interpreter (it is already not that clean and simple right now).

Although using separate QueryProcessingStage looks pretty natural.

Another option is to make WithMergeableStateAfterAggregation always, but
in this case you will not be able to disable only this optimization,
i.e. if there will be some issue with it.

v2: fix OFFSET
v3: convert 01814_distributed_push_down_limit test to .sh and add retries
v4: add test with OFFSET
v5: add new query stage into the bash completion
v6/tests: use LIMIT O,L syntax over LIMIT L OFFSET O since it is broken in ANTLR parser
          https://clickhouse-test-reports.s3.yandex.net/23027/a18a06399b7aeacba7c50b5d1e981ada5df19745/functional_stateless_tests_(antlr_debug).html#fail1
v7/tests: set use_hedged_requests to 0, to avoid excessive log entries on retries
          https://clickhouse-test-reports.s3.yandex.net/23027/a18a06399b7aeacba7c50b5d1e981ada5df19745/functional_stateless_tests_flaky_check_(address).html#fail1
This commit is contained in:
Azat Khuzhin 2021-06-04 09:43:56 +03:00
parent eb1a33827a
commit 18e8f0eb5e
15 changed files with 270 additions and 16 deletions

View File

@ -1578,6 +1578,18 @@ FORMAT PrettyCompactMonoBlock
Default value: 0
## distributed_push_down_limit (#distributed-push-down-limit}
LIMIT will be applied on each shard separatelly. Usually you don't need to use it, since this will be done automatically if it is possible, i.e. for simple query SELECT FROM LIMIT.
Possible values:
- 0 - Disabled
- 1 - Enabled
!!! note "Note"
That with this setting the result of the query may be inaccurate.
## optimize_skip_unused_shards_limit {#optimize-skip-unused-shards-limit}
Limit for number of sharding key values, turns off `optimize_skip_unused_shards` if the limit is reached.

View File

@ -20,6 +20,7 @@ CLICKHOUSE_QueryProcessingStage=(
fetch_columns
with_mergeable_state
with_mergeable_state_after_aggregation
with_mergeable_state_after_aggregation_and_limit
)
CLICKHOUSE_Format=(

View File

@ -580,7 +580,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
("query", value<std::string>()->default_value(""), "query to execute")
("concurrency,c", value<unsigned>()->default_value(1), "number of parallel queries")
("delay,d", value<double>()->default_value(1), "delay between intermediate reports in seconds (set 0 to disable reports)")
("stage", value<std::string>()->default_value("complete"), "request query processing up to specified stage: complete,fetch_columns,with_mergeable_state,with_mergeable_state_after_aggregation")
("stage", value<std::string>()->default_value("complete"), "request query processing up to specified stage: complete,fetch_columns,with_mergeable_state,with_mergeable_state_after_aggregation,with_mergeable_state_after_aggregation_and_limit")
("iterations,i", value<size_t>()->default_value(0), "amount of queries to be executed")
("timelimit,t", value<double>()->default_value(0.), "stop launch of queries after specified time limit")
("randomize,r", value<bool>()->default_value(false), "randomize order of execution")

View File

@ -2463,7 +2463,7 @@ public:
("password", po::value<std::string>()->implicit_value("\n", ""), "password")
("ask-password", "ask-password")
("quota_key", po::value<std::string>(), "A string to differentiate quotas when the user have keyed quotas configured on server")
("stage", po::value<std::string>()->default_value("complete"), "Request query processing up to specified stage: complete,fetch_columns,with_mergeable_state,with_mergeable_state_after_aggregation")
("stage", po::value<std::string>()->default_value("complete"), "Request query processing up to specified stage: complete,fetch_columns,with_mergeable_state,with_mergeable_state_after_aggregation,with_mergeable_state_after_aggregation_and_limit")
("query_id", po::value<std::string>(), "query_id")
("query,q", po::value<std::string>(), "query")
("database,d", po::value<std::string>(), "database")

View File

@ -24,6 +24,8 @@ namespace QueryProcessingStage
stage = WithMergeableState;
else if (stage_string == "with_mergeable_state_after_aggregation")
stage = WithMergeableStateAfterAggregation;
else if (stage_string == "with_mergeable_state_after_aggregation_and_limit")
stage = WithMergeableStateAfterAggregationAndLimit;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown query processing stage: {}", stage_string);

View File

@ -26,8 +26,15 @@ namespace QueryProcessingStage
/// It is used for auto distributed_group_by_no_merge optimization for distributed engine.
/// (See comments in StorageDistributed).
WithMergeableStateAfterAggregation = 3,
/// Same as WithMergeableStateAfterAggregation but also will apply limit on each shard.
///
/// This query stage will be used for auto
/// distributed_group_by_no_merge/distributed_push_down_limit
/// optimization.
/// (See comments in StorageDistributed).
WithMergeableStateAfterAggregationAndLimit = 4,
MAX = 4,
MAX = 5,
};
inline const char * toString(UInt64 stage)
@ -38,6 +45,7 @@ namespace QueryProcessingStage
"WithMergeableState",
"Complete",
"WithMergeableStateAfterAggregation",
"WithMergeableStateAfterAggregationAndLimit",
};
return stage < MAX
? data[stage]

View File

@ -118,6 +118,7 @@ class IColumn;
\
M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard, if 1 SELECT is executed on each shard, if 2 SELECT and INSERT is executed on each shard", 0) \
M(UInt64, distributed_group_by_no_merge, 0, "If 1, Do not merge aggregation states from different servers for distributed queries (shards will process query up to the Complete stage, initiator just proxies the data from the shards). If 2 the initiator will apply ORDER BY and LIMIT stages (it is not in case when shard process query up to the Complete stage)", 0) \
M(UInt64, distributed_push_down_limit, 0, "If 1, LIMIT will be applied on each shard separatelly. Usually you don't need to use it, since this will be done automatically if it is possible, i.e. for simple query SELECT FROM LIMIT.", 0) \
M(Bool, optimize_distributed_group_by_sharding_key, false, "Optimize GROUP BY sharding_key queries (by avoiding costly aggregation on the initiator server).", 0) \
M(UInt64, optimize_skip_unused_shards_limit, 1000, "Limit for number of sharding key values, turns off optimize_skip_unused_shards if the limit is reached", 0) \
M(Bool, optimize_skip_unused_shards, false, "Assumes that data is distributed by sharding_key. Optimization to skip unused shards if SELECT query filters by sharding_key.", 0) \

View File

@ -632,7 +632,7 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
/// Running on the initiating server during distributed processing or if query is not distributed.
///
/// Also note that with distributed_group_by_no_merge=2 (i.e. when optimize_distributed_group_by_sharding_key takes place)
/// the query on the remote server will be processed up to WithMergeableStateAfterAggregation,
/// the query on the remote server will be processed up to WithMergeableStateAfterAggregationAndLimit,
/// So it will do partial second stage (second_stage=true), and initiator will do the final part.
bool second_stage = from_stage <= QueryProcessingStage::WithMergeableState
&& options.to_stage > QueryProcessingStage::WithMergeableState;
@ -704,7 +704,7 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
return res;
}
if (options.to_stage == QueryProcessingStage::Enum::WithMergeableStateAfterAggregation)
if (options.to_stage >= QueryProcessingStage::Enum::WithMergeableStateAfterAggregation)
{
// It's different from selected_columns, see the comment above for
// WithMergeableState stage.
@ -1011,10 +1011,10 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
/// Support optimize_distributed_group_by_sharding_key
/// Is running on the initiating server during distributed processing?
if (from_stage == QueryProcessingStage::WithMergeableStateAfterAggregation)
if (from_stage >= QueryProcessingStage::WithMergeableStateAfterAggregation)
from_aggregation_stage = true;
/// Is running on remote servers during distributed processing?
if (options.to_stage == QueryProcessingStage::WithMergeableStateAfterAggregation)
if (options.to_stage >= QueryProcessingStage::WithMergeableStateAfterAggregation)
to_aggregation_stage = true;
/// Read the data from Storage. from_stage - to what stage the request was completed in Storage.
@ -1300,7 +1300,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
* but there is no aggregation, then on the remote servers ORDER BY was made
* - therefore, we merge the sorted streams from remote servers.
*
* Also in case of remote servers was process the query up to WithMergeableStateAfterAggregation
* Also in case of remote servers was process the query up to WithMergeableStateAfterAggregationAndLimit
* (distributed_group_by_no_merge=2 or optimize_distributed_group_by_sharding_key=1 takes place),
* then merge the sorted streams is enough, since remote servers already did full ORDER BY.
*/
@ -1334,13 +1334,15 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
}
}
bool apply_limit = options.to_stage != QueryProcessingStage::WithMergeableStateAfterAggregation;
bool apply_offset = options.to_stage != QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
bool has_prelimit = false;
if (!to_aggregation_stage &&
if (apply_limit &&
query.limitLength() && !query.limit_with_ties && !hasWithTotalsInAnySubqueryInFromClause(query) &&
!query.arrayJoinExpressionList() && !query.distinct && !expressions.hasLimitBy() && !settings.extremes &&
!has_withfill)
{
executePreLimit(query_plan, false);
executePreLimit(query_plan, /* do_not_skip_offset= */!apply_offset);
has_prelimit = true;
}
@ -1367,7 +1369,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
}
/// Projection not be done on the shards, since then initiator will not find column in blocks.
/// (significant only for WithMergeableStateAfterAggregation).
/// (significant only for WithMergeableStateAfterAggregation/WithMergeableStateAfterAggregationAndLimit).
if (!to_aggregation_stage)
{
/// We must do projection after DISTINCT because projection may remove some columns.
@ -1378,10 +1380,10 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
executeExtremes(query_plan);
/// Limit is no longer needed if there is prelimit.
if (!to_aggregation_stage && !has_prelimit)
if (apply_limit && !has_prelimit)
executeLimit(query_plan);
if (!to_aggregation_stage)
if (apply_offset)
executeOffset(query_plan);
}
}

View File

@ -69,6 +69,7 @@ Block getHeaderForProcessingStage(
case QueryProcessingStage::WithMergeableState:
case QueryProcessingStage::Complete:
case QueryProcessingStage::WithMergeableStateAfterAggregation:
case QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit:
case QueryProcessingStage::MAX:
{
auto query = query_info.query->clone();

View File

@ -288,6 +288,7 @@ void replaceConstantExpressions(
/// is one of the following:
/// - QueryProcessingStage::Complete
/// - QueryProcessingStage::WithMergeableStateAfterAggregation
/// - QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit
/// - none (in this case regular WithMergeableState should be used)
std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, bool extremes, const Block & sharding_key_block)
{
@ -349,13 +350,13 @@ std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const
// ORDER BY
const ASTPtr order_by = select.orderBy();
if (order_by)
return QueryProcessingStage::WithMergeableStateAfterAggregation;
return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
// LIMIT BY
// LIMIT
// OFFSET
if (select.limitBy() || select.limitLength() || select.limitOffset())
return QueryProcessingStage::WithMergeableStateAfterAggregation;
return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
// Only simple SELECT FROM GROUP BY sharding_key can use Complete state.
return QueryProcessingStage::Complete;
@ -514,10 +515,22 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
if (settings.distributed_group_by_no_merge)
{
if (settings.distributed_group_by_no_merge == DISTRIBUTED_GROUP_BY_NO_MERGE_AFTER_AGGREGATION)
return QueryProcessingStage::WithMergeableStateAfterAggregation;
{
if (settings.distributed_push_down_limit)
return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
else
return QueryProcessingStage::WithMergeableStateAfterAggregation;
}
else
{
/// NOTE: distributed_group_by_no_merge=1 does not respect distributed_push_down_limit
/// (since in this case queries processed separatelly and the initiator is just a proxy in this case).
return QueryProcessingStage::Complete;
}
}
if (settings.distributed_push_down_limit)
return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
/// Nested distributed query cannot return Complete stage,
/// since the parent query need to aggregate the results after.

View File

@ -0,0 +1,37 @@
distributed_push_down_limit=0
100 100
distributed_push_down_limit=1
0
1
2
3
4
5
6
7
8
9
40 40
auto-distributed_push_down_limit
0
1
2
3
4
5
6
7
8
9
40 40
distributed_push_down_limit=1 with OFFSET
97
96
96
95
95
94
94
93
93
92

View File

@ -0,0 +1,167 @@
#!/usr/bin/env bash
# shellcheck disable=SC2206
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# -- NOTE: this test cannot use 'current_database = $CLICKHOUSE_DATABASE',
# -- because it does not propagated via remote queries,
# -- hence it uses query_id/initial_query_id.
function setup()
{
$CLICKHOUSE_CLIENT -nm -q "
drop table if exists data_01814;
drop table if exists dist_01814;
create table data_01814 (key Int) Engine=MergeTree() order by key settings index_granularity=10 as select * from numbers(100);
create table dist_01814 as data_01814 engine=Distributed('test_cluster_two_shards', $CLICKHOUSE_DATABASE, data_01814, key);
"
}
function cleanup()
{
$CLICKHOUSE_CLIENT -nm -q "
drop table data_01814;
drop table dist_01814;
"
}
function make_query_id()
{
echo "$(tr -cd '[:lower:]' < /dev/urandom | head -c10)-$CLICKHOUSE_DATABASE"
}
function test_distributed_push_down_limit_with_query_log()
{
local table=$1 && shift
local offset=$1 && shift
local query_id
query_id="$(make_query_id)"
# NOTES:
# - max_rows_to_read_leaf cannot be used since it does not know anything
# about optimize_aggregation_in_order,
# - limit push down can be checked only with optimize_aggregation_in_order,
# since otherwise the query will be canceled too early, and read_rows will be
# small.
local settings_and_opts=(
--query_id "$query_id"
--max_block_size 20
--optimize_aggregation_in_order 1
--log_queries 1
--log_queries_min_type 'QUERY_FINISH'
# disable hedged requests to avoid excessive log entries
--use_hedged_requests 0
"$@"
)
$CLICKHOUSE_CLIENT "${settings_and_opts[@]}" -q "select * from $table group by key limit $offset, 10"
$CLICKHOUSE_CLIENT -nm -q "
system flush logs;
select read_rows from system.query_log
where
event_date = today()
and query_kind = 'Select' /* exclude DESC TABLE */
and initial_query_id = '$query_id' and initial_query_id != query_id;
" | xargs # convert new lines to spaces
}
function test_distributed_push_down_limit_0()
{
local args=(
"remote('127.{2,3}', $CLICKHOUSE_DATABASE, data_01814)"
0 # offset
--distributed_push_down_limit 0
)
test_distributed_push_down_limit_with_query_log "${args[@]}" "$@"
}
function test_distributed_push_down_limit_1()
{
local args=(
"remote('127.{2,3}', $CLICKHOUSE_DATABASE, data_01814)"
0 # offset
--distributed_push_down_limit 1
)
test_distributed_push_down_limit_with_query_log "${args[@]}"
}
function test_distributed_push_down_limit_1_offset()
{
local settings_and_opts=(
--distributed_push_down_limit 1
)
$CLICKHOUSE_CLIENT "${settings_and_opts[@]}" -q "select * from remote('127.{2,3}', $CLICKHOUSE_DATABASE, data_01814) group by key order by key desc limit 5, 10"
}
function test_auto_distributed_push_down_limit()
{
local args=(
dist_01814
0 # offset
--optimize_skip_unused_shards 1
--optimize_distributed_group_by_sharding_key 1
--prefer_localhost_replica 0
--distributed_push_down_limit 0
)
test_distributed_push_down_limit_with_query_log "${args[@]}"
}
function main()
{
setup
trap cleanup EXIT
echo 'distributed_push_down_limit=0'
test_distributed_push_down_limit_0 --format Null
#
# The following tests (tests with distributed_push_down_limit=1) requires
# retries, since the query may be canceled earlier due to LIMIT, and so
# only one shard will be processed, and it will get not 40 but 20 rows:
#
# 1.160920 [ 291 ] {7ac5de70-c26c-4e3b-bdee-3873ad1b84f1} <Debug> executeQuery: (from [::ffff:127.0.0.1]:42778, initial_query_id: 66cf643c-b1b4-4f7e-942a-c4c3493029f6, using production parser) (comment: /usr/share/clickhouse-test/queries/0_stateless/01814_distributed_push_down_limit.sql) WITH CAST('test_31uut9', 'String') AS id_distributed_push_down_limit_1 SELECT key FROM test_31uut9.data_01814 GROUP BY key LIMIT 10
# 1.214964 [ 291 ] {7ac5de70-c26c-4e3b-bdee-3873ad1b84f1} <Trace> ContextAccess (default): Access granted: SELECT(key) ON test_31uut9.data_01814
# 1.216790 [ 291 ] {7ac5de70-c26c-4e3b-bdee-3873ad1b84f1} <Debug> test_31uut9.data_01814 (b484ad2e-0591-4faf-8110-1dcbd7cdd0db) (SelectExecutor): Key condition: unknown
# 1.227245 [ 291 ] {7ac5de70-c26c-4e3b-bdee-3873ad1b84f1} <Debug> test_31uut9.data_01814 (b484ad2e-0591-4faf-8110-1dcbd7cdd0db) (SelectExecutor): Selected 1/1 parts by partition key, 1 parts by primary key, 10/11 marks by primary key, 10 marks to read from 1 ranges
# 1.228452 [ 291 ] {7ac5de70-c26c-4e3b-bdee-3873ad1b84f1} <Trace> MergeTreeSelectProcessor: Reading 3 ranges from part all_1_1_0, approx. 100 rows starting from 0
# 1.229104 [ 291 ] {7ac5de70-c26c-4e3b-bdee-3873ad1b84f1} <Trace> InterpreterSelectQuery: FetchColumns -> WithMergeableStateAfterAggregationAndLimit
# 1.339085 [ 291 ] {7ac5de70-c26c-4e3b-bdee-3873ad1b84f1} <Information> TCPHandler: Query was cancelled.
# 1.416573 [ 291 ] {7ac5de70-c26c-4e3b-bdee-3873ad1b84f1} <Information> executeQuery: Read 20 rows, 80.00 B in 0.254374666 sec., 78 rows/sec., 314.50 B/sec.
# 1.419006 [ 291 ] {7ac5de70-c26c-4e3b-bdee-3873ad1b84f1} <Debug> MemoryTracker: Peak memory usage (for query): 0.00 B.
#
local out out_lines max_tries=20
echo 'distributed_push_down_limit=1'
for ((i = 0; i < max_tries; ++i)); do
out=$(test_distributed_push_down_limit_1)
out_lines=( $out )
if [[ ${#out_lines[@]} -gt 2 ]] && [[ ${out_lines[-1]} = 40 ]] && [[ ${out_lines[-2]} = 40 ]]; then
break
fi
done
echo "$out"
echo 'auto-distributed_push_down_limit'
for ((i = 0; i < max_tries; ++i)); do
out=$(test_auto_distributed_push_down_limit)
out_lines=( $out )
if [[ ${#out_lines[@]} -gt 2 ]] && [[ ${out_lines[-1]} = 40 ]] && [[ ${out_lines[-2]} = 40 ]]; then
break
fi
done
echo "$out"
echo 'distributed_push_down_limit=1 with OFFSET'
test_distributed_push_down_limit_1_offset
}
main "$@"

View File

@ -0,0 +1,8 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
# with_mergeable_state_after_aggregation will not stop after 1 row, while with_mergeable_state_after_aggregation_and_limit should
$CLICKHOUSE_CLIENT -q 'select * from system.numbers limit 1' --stage with_mergeable_state_after_aggregation_and_limit

View File

@ -234,6 +234,7 @@
01801_distinct_group_by_shard
01804_dictionary_decimal256_type
01801_s3_distributed
01814_distributed_push_down_limit
01833_test_collation_alvarotuso
01850_dist_INSERT_preserve_error
01870_modulo_partition_key