Merge pull request #9923 from azat/dist-on-dist

[RFC] Distributed over distributed (v2)
This commit is contained in:
alexey-milovidov 2020-04-01 05:25:21 +03:00 committed by GitHub
commit c7afc51a52
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 208 additions and 32 deletions

View File

@ -716,6 +716,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
const Settings & settings = context->getSettingsRef();
auto & expressions = analysis_result;
auto & subqueries_for_sets = query_analyzer->getSubqueriesForSets();
bool intermediate_stage = false;
if (options.only_analyze)
{
@ -775,7 +776,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
if (from_stage == QueryProcessingStage::WithMergeableState &&
options.to_stage == QueryProcessingStage::WithMergeableState)
throw Exception("Distributed on Distributed is not supported", ErrorCodes::NOT_IMPLEMENTED);
intermediate_stage = true;
if (storage && expressions.filter_info && expressions.prewhere_info)
throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE);
@ -802,6 +803,47 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
options.to_stage > QueryProcessingStage::WithMergeableState &&
!query.group_by_with_totals && !query.group_by_with_rollup && !query.group_by_with_cube;
auto preliminary_sort = [&]()
{
/** For distributed query processing,
* if no GROUP, HAVING set,
* but there is an ORDER or LIMIT,
* then we will perform the preliminary sorting and LIMIT on the remote server.
*/
if (!expressions.second_stage && !expressions.need_aggregate && !expressions.hasHaving())
{
if (expressions.has_order_by)
executeOrder(pipeline, query_info.input_sorting_info);
if (expressions.has_order_by && query.limitLength())
executeDistinct(pipeline, false, expressions.selected_columns);
if (expressions.hasLimitBy())
{
executeExpression(pipeline, expressions.before_limit_by);
executeLimitBy(pipeline);
}
if (query.limitLength())
{
if constexpr (pipeline_with_processors)
executePreLimit(pipeline, true);
else
executePreLimit(pipeline);
}
}
};
if (intermediate_stage)
{
if (expressions.first_stage || expressions.second_stage)
throw Exception("Query with intermediate stage cannot have any other stages", ErrorCodes::LOGICAL_ERROR);
preliminary_sort();
if (expressions.need_aggregate)
executeMergeAggregated(pipeline, aggregate_overflow_row, aggregate_final);
}
if (expressions.first_stage)
{
if (expressions.hasFilter())
@ -900,33 +942,7 @@ void InterpreterSelectQuery::executeImpl(TPipeline & pipeline, const BlockInputS
executeDistinct(pipeline, true, expressions.selected_columns);
}
/** For distributed query processing,
* if no GROUP, HAVING set,
* but there is an ORDER or LIMIT,
* then we will perform the preliminary sorting and LIMIT on the remote server.
*/
if (!expressions.second_stage && !expressions.need_aggregate && !expressions.hasHaving())
{
if (expressions.has_order_by)
executeOrder(pipeline, query_info.input_sorting_info);
if (expressions.has_order_by && query.limitLength())
executeDistinct(pipeline, false, expressions.selected_columns);
if (expressions.hasLimitBy())
{
executeExpression(pipeline, expressions.before_limit_by);
executeLimitBy(pipeline);
}
if (query.limitLength())
{
if constexpr (pipeline_with_processors)
executePreLimit(pipeline, true);
else
executePreLimit(pipeline);
}
}
preliminary_sort();
// If there is no global subqueries, we can run subqueries only when receive them on server.
if (!query_analyzer->hasGlobalSubqueries() && !subqueries_for_sets.empty())

View File

@ -0,0 +1,84 @@
DISTINCT ORDER BY
0
1
2
GROUP BY ORDER BY
0
1
2
GROUP BY ORDER BY LIMIT
0
HAVING
1
1
1
1
GROUP BY HAVING
1
ORDER BY
0
0
0
0
1
1
1
1
2
2
2
2
ORDER BY LIMIT
0
ORDER BY LIMIT BY
0
1
2
cluster() ORDER BY
0
0
0
0
0
0
0
0
1
1
1
1
1
1
1
1
2
2
2
2
2
2
2
2
cluster() GROUP BY ORDER BY
0
1
2
LEFT JOIN
0 0
1 1
RIGHT JOIN
0 0
1 1
0 2
GROUP BY ORDER BY group_by_two_level_threshold
0
1
2
GROUP BY ORDER BY distributed_aggregation_memory_efficient
0
1
2
GROUP BY ORDER BY distributed_aggregation_memory_efficient/group_by_two_level_threshold
0
1
2

View File

@ -0,0 +1,58 @@
-- Test for "Distributed over Distributed" (a Distributed table whose
-- underlying table is itself Distributed), enabled by PR #9923.
-- Layout: dist_01223 -> dist_layer_01223 -> data_01223 (Memory),
-- both Distributed layers use test_cluster_two_shards.
create table if not exists data_01223 (key Int) Engine=Memory();
create table if not exists dist_layer_01223 as data_01223 Engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01223);
create table if not exists dist_01223 as data_01223 Engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01223);
-- sanity check: reading the empty two-level Distributed table must not fail
select * from dist_01223;
insert into data_01223 select * from numbers(3);
-- Each query below exercises a different shape of the intermediate
-- (WithMergeableState -> WithMergeableState) processing stage.
select 'DISTINCT ORDER BY';
select distinct * from dist_01223 order by key;
select 'GROUP BY ORDER BY';
select * from dist_01223 group by key order by key;
select 'GROUP BY ORDER BY LIMIT';
select * from dist_01223 group by key order by key limit 1;
select 'HAVING';
select * from dist_01223 having key = 1;
select 'GROUP BY HAVING';
select * from dist_01223 group by key having key = 1;
-- NOTE(review): plain ORDER BY yields each row multiple times since both
-- shards of both layers read the same local table — confirm against the
-- .reference file (each key appears 4 times).
select 'ORDER BY';
select * from dist_01223 order by key;
select 'ORDER BY LIMIT';
select * from dist_01223 order by key limit 1;
select 'ORDER BY LIMIT BY';
select * from dist_01223 order by key limit 1 by key;
-- cluster() adds yet another remote layer on top of dist_01223
select 'cluster() ORDER BY';
select * from cluster(test_cluster_two_shards, currentDatabase(), dist_01223) order by key;
select 'cluster() GROUP BY ORDER BY';
select * from cluster(test_cluster_two_shards, currentDatabase(), dist_01223) group by key order by key;
-- joins with a distributed subquery on the right-hand side
select 'LEFT JOIN';
select toInt32(number) key, b.key from numbers(2) a left join (select distinct * from dist_01223) b using key order by b.key;
select 'RIGHT JOIN';
select toInt32(number) key, b.key from numbers(2) a right join (select distinct * from dist_01223) b using key order by b.key;
-- more data for GROUP BY
insert into data_01223 select number%3 from numbers(30);
-- group_by_two_level_threshold
select 'GROUP BY ORDER BY group_by_two_level_threshold';
select * from dist_01223 group by key order by key settings
group_by_two_level_threshold=1,
group_by_two_level_threshold_bytes=1;
-- distributed_aggregation_memory_efficient
select 'GROUP BY ORDER BY distributed_aggregation_memory_efficient';
select * from dist_01223 group by key order by key settings
distributed_aggregation_memory_efficient=1;
-- distributed_aggregation_memory_efficient/group_by_two_level_threshold
select 'GROUP BY ORDER BY distributed_aggregation_memory_efficient/group_by_two_level_threshold';
select * from dist_01223 group by key order by key settings
group_by_two_level_threshold=1,
group_by_two_level_threshold_bytes=1,
distributed_aggregation_memory_efficient=1;
-- clean up in dependency order: outer Distributed first, local data last
drop table dist_01223;
drop table dist_layer_01223;
drop table data_01223;

View File

@ -0,0 +1,4 @@
GLOBAL IN distributed_group_by_no_merge
1
GLOBAL IN
1

View File

@ -0,0 +1,18 @@
-- Test GLOBAL IN over a two-level Distributed table
-- (dist_01224 -> dist_layer_01224 -> data_01224, Memory engine underneath).
create table if not exists data_01224 (key Int) Engine=Memory();
create table if not exists dist_layer_01224 as data_01224 Engine=Distributed(test_cluster_two_shards, currentDatabase(), data_01224);
create table if not exists dist_01224 as data_01224 Engine=Distributed(test_cluster_two_shards, currentDatabase(), dist_layer_01224);
-- sanity check: empty table must be readable through both Distributed layers
select * from dist_01224;
insert into data_01224 select * from numbers(3);
-- "Table expression is undefined, Method: ExpressionAnalyzer::interpretSubquery"
select 'GLOBAL IN distributed_group_by_no_merge';
select distinct * from dist_01224 where key global in (1) settings distributed_group_by_no_merge=1;
-- requires #9923
select 'GLOBAL IN';
select distinct * from dist_01224 where key global in (1);
-- clean up in dependency order: outer Distributed first, local data last
drop table dist_01224;
drop table dist_layer_01224;
drop table data_01224;

View File

@ -96,8 +96,6 @@ To view your clusters, use the system.clusters table.
The Distributed engine allows working with a cluster like a local server. However, the cluster is inextensible: you must write its configuration in the server config file (even better, for all the clusters servers).
There is no support for Distributed tables that look at other Distributed tables (except in cases when a Distributed table only has one shard). As an alternative, make the Distributed table look at the “final” tables.
The Distributed engine requires writing clusters to the config file. Clusters from the config file are updated on the fly, without restarting the server. If you need to send a query to an unknown set of shards and replicas each time, you don't need to create a Distributed table — use the remote table function instead. See the section [Table functions](../../query_language/table_functions/index.md).
There are two methods for writing data to a cluster:

View File

@ -78,8 +78,6 @@ logs - имя кластера в конфигурационном файле с
Движок Distributed позволяет работать с кластером, как с локальным сервером. При этом, кластер является неэластичным: вы должны прописать его конфигурацию в конфигурационный файл сервера (лучше всех серверов кластера).
Не поддерживаются Distributed таблицы, смотрящие на другие Distributed таблицы (за исключением случаев, когда у Distributed таблицы всего один шард). Вместо этого, сделайте так, чтобы Distributed таблица смотрела на «конечные» таблицы.
Как видно, движок Distributed требует прописывания кластера в конфигурационный файл; кластера из конфигурационного файла обновляются налету, без перезапуска сервера. Если вам необходимо каждый раз отправлять запрос на неизвестный набор шардов и реплик, вы можете не создавать Distributed таблицу, а воспользоваться табличной функцией remote. Смотрите раздел [Табличные функции](../../query_language/table_functions/index.md).
Есть два способа записывать данные на кластер: