Merge pull request #26466 from azat/optimize-dist-select

Rework SELECT from Distributed optimizations
alexey-milovidov 2021-08-08 03:59:32 +03:00 committed by GitHub
commit c5207fc237
14 changed files with 311 additions and 136 deletions

View File

@@ -1346,10 +1346,10 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
/** If there was more than one stream,
* then DISTINCT needs to be performed once again after merging all streams.
*/
if (query.distinct)
if (!from_aggregation_stage && query.distinct)
executeDistinct(query_plan, false, expressions.selected_columns, false);
if (expressions.hasLimitBy())
if (!from_aggregation_stage && expressions.hasLimitBy())
{
executeExpression(query_plan, expressions.before_limit_by, "Before LIMIT BY");
executeLimitBy(query_plan);
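The guard added above keys off from_aggregation_stage: when the streams arriving at the initiator were already processed by the shards up to one of the after-aggregation stages, DISTINCT and LIMIT BY have already been applied remotely and must not run a second time. A minimal standalone sketch of the rule (the enum and helper below are simplified stand-ins, not the real ClickHouse types):

#include <cassert>

// Simplified stand-in for QueryProcessingStage (same relative order).
enum class Stage
{
    FetchColumns,
    WithMergeableState,
    Complete,
    WithMergeableStateAfterAggregation,
    WithMergeableStateAfterAggregationAndLimit,
};

// Mirrors the new `!from_aggregation_stage && query.distinct` guard.
bool initiatorRunsFinalDistinct(Stage from_stage, bool query_has_distinct)
{
    const bool from_aggregation_stage =
        from_stage >= Stage::WithMergeableStateAfterAggregation;
    return !from_aggregation_stage && query_has_distinct;
}

int main()
{
    // Shards stopped at WithMergeableState: the initiator still owes the final DISTINCT.
    assert(initiatorRunsFinalDistinct(Stage::WithMergeableState, true));
    // Shards already ran the after-aggregation pipeline: no second pass.
    assert(!initiatorRunsFinalDistinct(Stage::WithMergeableStateAfterAggregation, true));
}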

View File

@@ -284,86 +284,6 @@ void replaceConstantExpressions(
visitor.visit(node);
}
/// This is the implementation of optimize_distributed_group_by_sharding_key.
/// It returns up to which stage the query can be processed on a shard, which
/// is one of the following:
/// - QueryProcessingStage::Complete
/// - QueryProcessingStage::WithMergeableStateAfterAggregation
/// - QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit
/// - none (in this case regular WithMergeableState should be used)
std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, bool extremes, const Names & sharding_key_columns)
{
const auto & select = query_info.query->as<ASTSelectQuery &>();
auto sharding_block_has = [&](const auto & exprs) -> bool
{
std::unordered_set<std::string> expr_columns;
for (auto & expr : exprs)
{
auto id = expr->template as<ASTIdentifier>();
if (!id)
continue;
expr_columns.emplace(id->name());
}
for (const auto & column : sharding_key_columns)
{
if (!expr_columns.contains(column))
return false;
}
return true;
};
// GROUP BY qualifiers
// - TODO: WITH TOTALS can be implemented
// - TODO: WITH ROLLUP can be implemented (I guess)
if (select.group_by_with_totals || select.group_by_with_rollup || select.group_by_with_cube)
return {};
// Window functions are not supported.
if (query_info.has_window)
return {};
// TODO: extremes support can be implemented
if (extremes)
return {};
// DISTINCT
if (select.distinct)
{
if (!sharding_block_has(select.select()->children))
return {};
}
// GROUP BY
const ASTPtr group_by = select.groupBy();
if (!group_by)
{
if (!select.distinct)
return {};
}
else
{
if (!sharding_block_has(group_by->children))
return {};
}
// ORDER BY
const ASTPtr order_by = select.orderBy();
if (order_by)
return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
// LIMIT BY
// LIMIT
// OFFSET
if (select.limitBy() || select.limitLength() || select.limitOffset())
return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
// Only simple SELECT FROM GROUP BY sharding_key can use Complete state.
return QueryProcessingStage::Complete;
}
size_t getClusterQueriedNodes(const Settings & settings, const ClusterPtr & cluster)
{
size_t num_local_shards = cluster->getLocalShardCount();
@@ -527,13 +447,12 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
{
/// NOTE: distributed_group_by_no_merge=1 does not respect distributed_push_down_limit
/// (since then the queries are processed separately on each shard and the initiator is just a proxy).
if (to_stage != QueryProcessingStage::Complete)
throw Exception("Queries with distributed_group_by_no_merge=1 should be processed to Complete stage", ErrorCodes::LOGICAL_ERROR);
return QueryProcessingStage::Complete;
}
}
if (settings.distributed_push_down_limit)
return QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
/// Nested distributed query cannot return Complete stage,
/// since the parent query needs to aggregate the results afterwards.
if (to_stage == QueryProcessingStage::WithMergeableState)
@@ -542,24 +461,108 @@ QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
/// If there is only one node, the query can be fully processed by the
/// shard, initiator will work as a proxy only.
if (getClusterQueriedNodes(settings, cluster) == 1)
return QueryProcessingStage::Complete;
if (settings.optimize_skip_unused_shards &&
settings.optimize_distributed_group_by_sharding_key &&
has_sharding_key &&
(settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic))
{
auto stage = getOptimizedQueryProcessingStage(query_info, settings.extremes, sharding_key_expr->getRequiredColumns());
if (stage)
{
LOG_DEBUG(log, "Force processing stage to {}", QueryProcessingStage::toString(*stage));
return *stage;
}
/// In case the query was processed to
/// WithMergeableStateAfterAggregation/WithMergeableStateAfterAggregationAndLimit
/// (which are greater than the Complete stage)
/// we cannot return Complete (will break aliases and similar),
/// relevant for Distributed over Distributed
return std::max(to_stage, QueryProcessingStage::Complete);
}
auto optimized_stage = getOptimizedQueryProcessingStage(query_info, settings);
if (optimized_stage)
{
if (*optimized_stage == QueryProcessingStage::Complete)
return std::min(to_stage, *optimized_stage);
return *optimized_stage;
}
return QueryProcessingStage::WithMergeableState;
}
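/// A reference note on the two clamps in getQueryProcessingStage() above: in
/// QueryProcessingStage the stages are ordered numerically as
///   FetchColumns(0) < WithMergeableState(1) < Complete(2)
///     < WithMergeableStateAfterAggregation(3)
///     < WithMergeableStateAfterAggregationAndLimit(4)
/// so std::max(to_stage, Complete) preserves a parent Distributed query's
/// after-aggregation request on a single-shard cluster, while
/// std::min(to_stage, Complete) keeps an optimized Complete from exceeding
/// the stage the caller asked for.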
std::optional<QueryProcessingStage::Enum> StorageDistributed::getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, const Settings & settings) const
{
bool optimize_sharding_key_aggregation =
settings.optimize_skip_unused_shards &&
settings.optimize_distributed_group_by_sharding_key &&
has_sharding_key &&
(settings.allow_nondeterministic_optimize_skip_unused_shards || sharding_key_is_deterministic);
QueryProcessingStage::Enum default_stage = QueryProcessingStage::WithMergeableStateAfterAggregation;
if (settings.distributed_push_down_limit)
default_stage = QueryProcessingStage::WithMergeableStateAfterAggregationAndLimit;
const auto & select = query_info.query->as<ASTSelectQuery &>();
auto expr_contains_sharding_key = [&](const auto & exprs) -> bool
{
std::unordered_set<std::string> expr_columns;
for (auto & expr : exprs)
{
auto id = expr->template as<ASTIdentifier>();
if (!id)
continue;
expr_columns.emplace(id->name());
}
for (const auto & column : sharding_key_expr->getRequiredColumns())
{
if (!expr_columns.contains(column))
return false;
}
return true;
};
// GROUP BY qualifiers
// - TODO: WITH TOTALS can be implemented
// - TODO: WITH ROLLUP can be implemented (I guess)
if (select.group_by_with_totals || select.group_by_with_rollup || select.group_by_with_cube)
return {};
// Window functions are not supported.
if (query_info.has_window)
return {};
// TODO: extremes support can be implemented
if (settings.extremes)
return {};
// DISTINCT
if (select.distinct)
{
if (!optimize_sharding_key_aggregation || !expr_contains_sharding_key(select.select()->children))
return {};
}
// GROUP BY
const ASTPtr group_by = select.groupBy();
if (!query_info.syntax_analyzer_result->aggregates.empty() || group_by)
{
if (!optimize_sharding_key_aggregation || !group_by || !expr_contains_sharding_key(group_by->children))
return {};
}
// LIMIT BY
if (const ASTPtr limit_by = select.limitBy())
{
if (!optimize_sharding_key_aggregation || !expr_contains_sharding_key(limit_by->children))
return {};
}
// ORDER BY
if (const ASTPtr order_by = select.orderBy())
return default_stage;
// LIMIT
// OFFSET
if (select.limitLength() || select.limitOffset())
return default_stage;
// Only simple SELECT FROM GROUP BY sharding_key can use Complete state.
return QueryProcessingStage::Complete;
}
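/// The ladder above condenses to the following standalone sketch (illustrative
/// names and simplified flags; it compiles on its own and is not part of this file):

#include <cassert>
#include <optional>

enum class Stage
{
    WithMergeableState,   // what an empty result of optimizedStage() falls back to
    Complete,
    WithMergeableStateAfterAggregation,
    WithMergeableStateAfterAggregationAndLimit,
};

struct QueryShape
{
    bool totals_rollup_or_cube = false;
    bool has_window = false;
    bool extremes = false;
    bool distinct = false;
    bool select_covers_sharding_key = false;    // DISTINCT columns include the whole sharding key
    bool has_aggregation = false;               // aggregate functions and/or GROUP BY
    bool group_by_covers_sharding_key = false;  // GROUP BY present and includes the whole sharding key
    bool has_limit_by = false;
    bool limit_by_covers_sharding_key = false;
    bool has_order_by = false;
    bool has_limit_or_offset = false;
};

std::optional<Stage> optimizedStage(const QueryShape & q,
                                    bool sharding_key_aggregation, // the optimize_* settings plus a deterministic key
                                    bool push_down_limit)          // distributed_push_down_limit
{
    const Stage default_stage = push_down_limit
        ? Stage::WithMergeableStateAfterAggregationAndLimit
        : Stage::WithMergeableStateAfterAggregation;

    if (q.totals_rollup_or_cube || q.has_window || q.extremes)
        return {};
    if (q.distinct && !(sharding_key_aggregation && q.select_covers_sharding_key))
        return {};
    if (q.has_aggregation && !(sharding_key_aggregation && q.group_by_covers_sharding_key))
        return {};
    if (q.has_limit_by && !(sharding_key_aggregation && q.limit_by_covers_sharding_key))
        return {};
    if (q.has_order_by || q.has_limit_or_offset)
        return default_stage;
    return Stage::Complete;
}

int main()
{
    // SELECT * FROM dist LIMIT 10: the LIMIT is pushed down even without sharding-key settings.
    assert(optimizedStage({.has_limit_or_offset = true}, false, true)
           == Stage::WithMergeableStateAfterAggregationAndLimit);
    // SELECT key FROM dist GROUP BY key (key = sharding key): shards finish the query.
    assert(optimizedStage({.has_aggregation = true, .group_by_covers_sharding_key = true}, true, true)
           == Stage::Complete);
    // GROUP BY over a non-key column: no optimization, regular WithMergeableState.
    assert(!optimizedStage({.has_aggregation = true}, true, true).has_value());
}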
Pipe StorageDistributed::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,

View File

@@ -177,6 +177,24 @@ private:
ClusterPtr
skipUnusedShards(ClusterPtr cluster, const ASTPtr & query_ptr, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) const;
/// This method returns the optimal query processing stage.
///
/// Here is the list of stages (from the least optimal to the most optimal):
/// - WithMergeableState
/// - WithMergeableStateAfterAggregation
/// - WithMergeableStateAfterAggregationAndLimit
/// - Complete
///
/// Some simple queries w/o GROUP BY/DISTINCT can use a more optimal stage.
///
/// Also, in case of optimize_distributed_group_by_sharding_key=1, queries
/// with GROUP BY/DISTINCT over the sharding_key can use a more optimal stage.
/// (see also optimize_skip_unused_shards/allow_nondeterministic_optimize_skip_unused_shards)
///
/// @return QueryProcessingStage or an empty std::optional
/// (in the latter case regular WithMergeableState should be used)
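/// For illustration (a sketch; `dist` is a hypothetical table sharded by `key`,
/// and the settings above are enabled where noted):
///   SELECT * FROM dist LIMIT 10
///     -> WithMergeableStateAfterAggregationAndLimit (distributed_push_down_limit=1)
///   SELECT key FROM dist GROUP BY key
///     -> Complete
///   SELECT key FROM dist GROUP BY key ORDER BY key
///     -> WithMergeableStateAfterAggregation(AndLimit)
///   SELECT v FROM dist GROUP BY v  -- v is not the sharding key
///     -> empty optional (regular WithMergeableState)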
std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, const Settings & settings) const;
size_t getRandomShardIndex(const Cluster::ShardsInfo & shards);
const DistributedSettings & getDistributedSettingsRef() const { return distributed_settings; }

View File

@@ -25,6 +25,8 @@ ORDER BY LIMIT
LIMIT BY
0
1
0
1
LIMIT BY LIMIT
0
GROUP BY ORDER BY

View File

@@ -57,7 +57,9 @@ LIMIT
1 0
LIMIT OFFSET
1 1
OFFSET
OFFSET distributed_push_down_limit=0
1 1
OFFSET distributed_push_down_limit=1
1 1
1 0
1 1
@@ -65,6 +67,8 @@ WHERE LIMIT OFFSET
1 1
LIMIT BY 1
1 0
1 0
1 1
1 1
GROUP BY (Distributed-over-Distributed)
4 0

View File

@@ -60,8 +60,10 @@ select 'LIMIT';
select count(), * from dist_01247 group by number limit 1;
select 'LIMIT OFFSET';
select count(), * from dist_01247 group by number limit 1 offset 1;
select 'OFFSET';
select count(), * from dist_01247 group by number offset 1;
select 'OFFSET distributed_push_down_limit=0';
select count(), * from dist_01247 group by number offset 1 settings distributed_push_down_limit=0;
select 'OFFSET distributed_push_down_limit=1';
select count(), * from dist_01247 group by number offset 1 settings distributed_push_down_limit=1;
-- this will emulate different data for different shards
select 'WHERE LIMIT OFFSET';
select count(), * from dist_01247 where number = _shard_num-1 group by number order by number limit 1 offset 1;

View File

@@ -12,18 +12,6 @@ distributed_push_down_limit=1
8
9
40 40
auto-distributed_push_down_limit
0
1
2
3
4
5
6
7
8
9
40 40
distributed_push_down_limit=1 with OFFSET
97
96

View File

@@ -86,9 +86,11 @@ function test_distributed_push_down_limit_0()
function test_distributed_push_down_limit_1()
{
local args=(
"remote('127.{2,3}', $CLICKHOUSE_DATABASE, data_01814)"
"remote('127.{2,3}', $CLICKHOUSE_DATABASE, data_01814, key)"
0 # offset
--distributed_push_down_limit 1
--optimize_skip_unused_shards 1
--optimize_distributed_group_by_sharding_key 1
)
test_distributed_push_down_limit_with_query_log "${args[@]}"
}
@@ -97,22 +99,11 @@ function test_distributed_push_down_limit_1_offset()
{
local settings_and_opts=(
--distributed_push_down_limit 1
)
$CLICKHOUSE_CLIENT "${settings_and_opts[@]}" -q "select * from remote('127.{2,3}', $CLICKHOUSE_DATABASE, data_01814) group by key order by key desc limit 5, 10"
}
function test_auto_distributed_push_down_limit()
{
local args=(
dist_01814
0 # offset
--optimize_skip_unused_shards 1
--optimize_distributed_group_by_sharding_key 1
--prefer_localhost_replica 0
--distributed_push_down_limit 0
)
test_distributed_push_down_limit_with_query_log "${args[@]}"
$CLICKHOUSE_CLIENT "${settings_and_opts[@]}" -q "select * from remote('127.{2,3}', $CLICKHOUSE_DATABASE, data_01814, key) group by key order by key desc limit 5, 10"
}
function main()
@@ -151,16 +142,6 @@ function main()
done
echo "$out"
echo 'auto-distributed_push_down_limit'
for ((i = 0; i < max_tries; ++i)); do
out=$(test_auto_distributed_push_down_limit)
out_lines=( $out )
if [[ ${#out_lines[@]} -gt 2 ]] && [[ ${out_lines[-1]} = 40 ]] && [[ ${out_lines[-2]} = 40 ]]; then
break
fi
done
echo "$out"
echo 'distributed_push_down_limit=1 with OFFSET'
test_distributed_push_down_limit_1_offset
}

View File

@@ -0,0 +1,32 @@
-- { echo }
explain select * from remote('127.{1,2}', view(select * from numbers(1e6))) order by number limit 10 settings distributed_push_down_limit=0;
Expression (Projection)
Limit (preliminary LIMIT)
MergingSorted (Merge sorted streams after aggregation stage for ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
MergingSorted (Merge sorted streams for ORDER BY)
MergeSorting (Merge sorted blocks for ORDER BY)
PartialSorting (Sort each block for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select * from remote('127.{1,2}', view(select * from numbers(1e6))) order by number limit 10 settings distributed_push_down_limit=1;
Expression (Projection)
Limit (preliminary LIMIT)
MergingSorted (Merge sorted streams after aggregation stage for ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
Limit (preliminary LIMIT)
MergingSorted (Merge sorted streams for ORDER BY)
MergeSorting (Merge sorted blocks for ORDER BY)
PartialSorting (Sort each block for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)

View File

@@ -0,0 +1,3 @@
-- { echo }
explain select * from remote('127.{1,2}', view(select * from numbers(1e6))) order by number limit 10 settings distributed_push_down_limit=0;
explain select * from remote('127.{1,2}', view(select * from numbers(1e6))) order by number limit 10 settings distributed_push_down_limit=1;

View File

@@ -0,0 +1,115 @@
-- { echo }
explain select distinct k1 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- not optimized
Expression (Projection)
Distinct
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
Distinct (Preliminary DISTINCT)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- optimized
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
Expression (Projection)
Distinct
Distinct (Preliminary DISTINCT)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- not optimized
Expression (Projection)
LimitBy
Expression (Before LIMIT BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
LimitBy
Expression ((Before LIMIT BY + Before ORDER BY))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- optimized
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
Expression (Projection)
LimitBy
Expression ((Before LIMIT BY + Before ORDER BY))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select distinct k1 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- not optimized
Expression (Projection)
Distinct
MergingSorted (Merge sorted streams for ORDER BY, without aggregation)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
MergingSorted (Merge sorted streams for ORDER BY)
MergeSorting (Merge sorted blocks for ORDER BY)
PartialSorting (Sort each block for ORDER BY)
Distinct (Preliminary DISTINCT)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized
Expression (Projection)
MergingSorted (Merge sorted streams after aggregation stage for ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
Distinct
MergingSorted (Merge sorted streams for ORDER BY)
MergeSorting (Merge sorted blocks for ORDER BY)
PartialSorting (Sort each block for ORDER BY)
Distinct (Preliminary DISTINCT)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- not optimized
Expression (Projection)
LimitBy
Expression (Before LIMIT BY)
MergingSorted (Merge sorted streams for ORDER BY, without aggregation)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
LimitBy
Expression (Before LIMIT BY)
MergingSorted (Merge sorted streams for ORDER BY)
MergeSorting (Merge sorted blocks for ORDER BY)
PartialSorting (Sort each block for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)
explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized
Expression (Projection)
MergingSorted (Merge sorted streams after aggregation stage for ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Union
LimitBy
Expression (Before LIMIT BY)
MergingSorted (Merge sorted streams for ORDER BY)
MergeSorting (Merge sorted blocks for ORDER BY)
PartialSorting (Sort each block for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
Expression ((Convert VIEW subquery result to VIEW table structure + (Materialize constants after VIEW subquery + (Projection + Before ORDER BY))))
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromStorage (SystemNumbers)
ReadFromRemote (Read from remote replica)

View File

@@ -0,0 +1,13 @@
set optimize_skip_unused_shards=1;
set optimize_distributed_group_by_sharding_key=1;
-- { echo }
explain select distinct k1 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- not optimized
explain select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- optimized
explain select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- not optimized
explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)); -- optimized
explain select distinct k1 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- not optimized
explain select distinct k1, k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized
explain select distinct on (k1) k2 from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- not optimized
explain select distinct on (k1, k2) v from remote('127.{1,2}', view(select 1 k1, 2 k2, 3 v from numbers(2)), cityHash64(k1, k2)) order by v; -- optimized

View File

@@ -0,0 +1,8 @@
-- { echo }
select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=0;
0
select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=1;
0
select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_group_by_no_merge=1;
0
0

View File

@@ -0,0 +1,6 @@
drop table if exists dist;
create table dist as system.one engine=Distributed('test_shard_localhost', system, one);
-- { echo }
select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=0;
select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_push_down_limit=1;
select dummy as foo from remote('127.{2,3}', currentDatabase(), dist) limit 1 settings prefer_localhost_replica=0, distributed_group_by_no_merge=1;