Merge pull request #55836 from azat/dist/limit-by-fix

RFC: Fix "Cannot find column X in source stream" for Distributed queries with LIMIT BY
This commit is contained in:
Alexey Milovidov 2023-11-26 04:03:41 +01:00 committed by GitHub
commit f8ebe5134d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 115 additions and 10 deletions

View File

@ -3,6 +3,7 @@
from argparse import ArgumentParser
import os
import jinja2
import itertools
def removesuffix(text, suffix):
@ -47,6 +48,7 @@ def main(args):
loader=jinja2.FileSystemLoader(suite_dir),
keep_trailing_newline=True,
)
j2env.globals.update(product=itertools.product)
test_names = os.listdir(suite_dir)
for test_name in test_names:

View File

@ -1050,6 +1050,9 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
if (analysis_result.before_window)
return analysis_result.before_window->getResultColumns();
// NOTE: should not handle before_limit_by specially since
// WithMergeableState does not process LIMIT BY
return analysis_result.before_order_by->getResultColumns();
}
@ -1093,6 +1096,12 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
if (analysis_result.before_window)
return analysis_result.before_window->getResultColumns();
// In case of query on remote shards executed up to
// WithMergeableStateAfterAggregation*, they can process LIMIT BY,
// since the initiator will not apply LIMIT BY again.
if (analysis_result.before_limit_by)
return analysis_result.before_limit_by->getResultColumns();
return analysis_result.before_order_by->getResultColumns();
}
@ -1539,7 +1548,11 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
if (query.limitLength())
executeDistinct(query_plan, false, expressions.selected_columns, false);
if (expressions.hasLimitBy())
/// In case of query executed on remote shards (up to
/// WithMergeableState*) LIMIT BY cannot be applied, since it
/// will be applied on the initiator as well, and the header
/// may not match in some obscure cases.
if (options.to_stage == QueryProcessingStage::FetchColumns && expressions.hasLimitBy())
{
executeExpression(query_plan, expressions.before_limit_by, "Before LIMIT BY");
executeLimitBy(query_plan);

View File

@ -8,6 +8,7 @@
import enum
from queue import Full
import shutil
import itertools
import sys
import os
import os.path
@ -1640,6 +1641,8 @@ class TestSuite:
if USE_JINJA
else None
)
if j2env is not None:
j2env.globals.update(product=itertools.product)
for test_name in os.listdir(self.suite_path):
if not is_test_from_dir(self.suite_path, test_name):

View File

@ -19,10 +19,8 @@ explain select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2
Expression (Projection)
LimitBy
Union
Expression (Before LIMIT BY)
LimitBy
Expression ((Before LIMIT BY + (Before ORDER BY + (Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))))
ReadFromStorage (SystemNumbers)
Expression ((Before LIMIT BY + (Before ORDER BY + (Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))))
ReadFromStorage (SystemNumbers)
Expression
ReadFromRemote (Read from remote replica)
explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- optimized
@ -58,11 +56,10 @@ Expression (Projection)
Expression (Before LIMIT BY)
Sorting (Merge sorted streams for ORDER BY, without aggregation)
Union
LimitBy
Expression ((Before LIMIT BY + (Before ORDER BY + (Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))) [lifted up part]))
Sorting (Sorting for ORDER BY)
Expression ((Before ORDER BY + (Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))))
ReadFromStorage (SystemNumbers)
Expression ((Before ORDER BY + (Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))) [lifted up part])
Sorting (Sorting for ORDER BY)
Expression ((Before ORDER BY + (Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY)))))
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized
Expression (Projection)

View File

@ -0,0 +1,52 @@
Used settings: prefer_localhost_replica=0,distributed_group_by_no_merge=0,distributed_push_down_limit=1
0
0
Used settings: prefer_localhost_replica=0,distributed_group_by_no_merge=0,distributed_push_down_limit=0
0
0
Used settings: prefer_localhost_replica=0,distributed_group_by_no_merge=1,distributed_push_down_limit=1
0
0
0
0
Used settings: prefer_localhost_replica=0,distributed_group_by_no_merge=1,distributed_push_down_limit=0
0
0
0
0
Used settings: prefer_localhost_replica=0,distributed_group_by_no_merge=2,distributed_push_down_limit=1
0
0
0
0
Used settings: prefer_localhost_replica=0,distributed_group_by_no_merge=2,distributed_push_down_limit=0
0
0
0
0
Used settings: prefer_localhost_replica=1,distributed_group_by_no_merge=0,distributed_push_down_limit=1
0
0
Used settings: prefer_localhost_replica=1,distributed_group_by_no_merge=0,distributed_push_down_limit=0
0
0
Used settings: prefer_localhost_replica=1,distributed_group_by_no_merge=1,distributed_push_down_limit=1
0
0
0
0
Used settings: prefer_localhost_replica=1,distributed_group_by_no_merge=1,distributed_push_down_limit=0
0
0
0
0
Used settings: prefer_localhost_replica=1,distributed_group_by_no_merge=2,distributed_push_down_limit=1
0
0
0
0
Used settings: prefer_localhost_replica=1,distributed_group_by_no_merge=2,distributed_push_down_limit=0
0
0
0
0

View File

@ -0,0 +1,26 @@
{#
Randomize settings:
- prefer_localhost_replica
- distributed_group_by_no_merge (0 = WithMergeableState, 1 = Complete, 2 = WithMergeableStateAfterAggregation/WithMergeableStateAfterAggregationAndLimit)
- distributed_push_down_limit (0/1 = dis/allows WithMergeableStateAfterAggregationAndLimit
#}
{% for settings in product(
[
'prefer_localhost_replica=0',
'prefer_localhost_replica=1',
],
[
'distributed_group_by_no_merge=0',
'distributed_group_by_no_merge=1',
'distributed_group_by_no_merge=2',
],
[
'distributed_push_down_limit=1',
'distributed_push_down_limit=0',
],
) %}
{% set settings = settings | join(',') %}
select 'Used settings: {{ settings }}';
select dummy from remote('127.{1,1}', system.one) where dummy + dummy >= 0 limit 1 by dummy + dummy + 0 as l settings {{ settings }};
select dummy from (select dummy + dummy + 0 as l, dummy from remote('127.{1,1}', system.one) where dummy + dummy >= 0 limit 1 by l) settings {{ settings }};
{% endfor %}

View File

@ -0,0 +1,3 @@
0 0
0 0
0 0

View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT --stage with_mergeable_state --query 'SELECT dummy FROM system.one WHERE (dummy + dummy) >= 0 LIMIT 1 BY (dummy + dummy) + 0 AS l'
$CLICKHOUSE_CLIENT --stage with_mergeable_state_after_aggregation --query 'SELECT dummy FROM system.one WHERE (dummy + dummy) >= 0 LIMIT 1 BY (dummy + dummy) + 0 AS l'
$CLICKHOUSE_CLIENT --stage with_mergeable_state_after_aggregation_and_limit --query 'SELECT dummy FROM system.one WHERE (dummy + dummy) >= 0 LIMIT 1 BY (dummy + dummy) + 0 AS l'