From 54ad0867530b71447c3e916dec77fefcd9bed71b Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov Date: Fri, 23 Apr 2021 13:58:54 +0300 Subject: [PATCH] fix for distributed --- src/Interpreters/InterpreterSelectQuery.cpp | 95 +++++++++++++++++-- src/Processors/QueryPlan/QueryPlan.cpp | 4 + src/Storages/StorageDistributed.cpp | 3 + ...568_window_functions_distributed.reference | 42 ++++++++ .../01568_window_functions_distributed.sql | 16 ++++ 5 files changed, 151 insertions(+), 9 deletions(-) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 06922527eaa..870c41fc24e 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -279,6 +279,9 @@ InterpreterSelectQuery::InterpreterSelectQuery( , log(&Poco::Logger::get("InterpreterSelectQuery")) , metadata_snapshot(metadata_snapshot_) { + fmt::print(stderr, "InterpreterSelectQuery @ {} created at \n{}\n", + static_cast(this), StackTrace().toString()); + checkStackSize(); initSettings(); @@ -627,6 +630,10 @@ Block InterpreterSelectQuery::getSampleBlockImpl() if (!analysis_result.need_aggregate) { // What's the difference with selected_columns? + if (analysis_result.before_window) + { + return analysis_result.before_window->getResultColumns(); + } return analysis_result.before_order_by->getResultColumns(); } @@ -655,6 +662,11 @@ Block InterpreterSelectQuery::getSampleBlockImpl() if (options.to_stage == QueryProcessingStage::Enum::WithMergeableStateAfterAggregation) { // What's the difference with selected_columns? + if (analysis_result.before_window) + { + return analysis_result.before_window->getResultColumns(); + } + return analysis_result.before_order_by->getResultColumns(); } @@ -995,7 +1007,10 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu * but there is an ORDER or LIMIT, * then we will perform the preliminary sorting and LIMIT on the remote server. */ - if (!expressions.second_stage && !expressions.need_aggregate && !expressions.hasHaving()) + if (!expressions.second_stage + && !expressions.need_aggregate + && !expressions.hasHaving() + && !expressions.has_window) { if (expressions.has_order_by) executeOrder(query_plan, query_info.input_order_info); @@ -1105,12 +1120,41 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu /// We need to reset input order info, so that executeOrder can't use it query_info.input_order_info.reset(); } + + // Now we must execute: + // 1) expressions before window functions, + // 2) window functions, + // 3) expressions after window functions, + // 4) preliminary distinct. + // This code decides which part we execute on shard (first_stage) + // and which part on initiator (second_stage). See also the counterpart + // code for "second_stage" that has to execute the rest. + if (expressions.need_aggregate) + { + // We have aggregation, so we can't execute any later-stage + // expressions on shards, neither "before window functions" nor + // "before ORDER BY". + } else { - executeExpression(query_plan, expressions.before_window, "Before window functions"); - executeWindow(query_plan); - executeExpression(query_plan, expressions.before_order_by, "Before ORDER BY"); - executeDistinct(query_plan, true, expressions.selected_columns, true); + // We don't have aggregation. + // Window functions must be executed on initiator (second_stage). + // ORDER BY and DISTINCT might depend on them, so if we have + // window functions, we can't execute ORDER BY and DISTINCT + // now, on shard (first_stage). + if (query_analyzer->hasWindow()) + { + executeExpression(query_plan, expressions.before_window, "Before window functions"); + } + else + { + // We don't have window functions, so we can execute the + // expressions before ORDER BY and the preliminary DISTINCT + // now, on shards (first_stage). + assert(!expressions.before_window); + executeExpression(query_plan, expressions.before_order_by, "Before ORDER BY"); + executeDistinct(query_plan, true, expressions.selected_columns, true); + } } preliminary_sort(); @@ -1154,16 +1198,38 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu } else if (expressions.hasHaving()) executeHaving(query_plan, expressions.before_having); + } + else if (query.group_by_with_totals || query.group_by_with_rollup || query.group_by_with_cube) + throw Exception("WITH TOTALS, ROLLUP or CUBE are not supported without aggregation", ErrorCodes::NOT_IMPLEMENTED); + // Now we must execute: + // 1) expressions before window functions + // 2) window functions + // 3) expressions after window functions. + // Some of these were already executed at the shards (first_stage), + // see the counterpart code and comments there. + if (expressions.need_aggregate) + { executeExpression(query_plan, expressions.before_window, "Before window functions"); executeWindow(query_plan); executeExpression(query_plan, expressions.before_order_by, "Before ORDER BY"); - executeDistinct(query_plan, true, expressions.selected_columns, true); - } - else if (query.group_by_with_totals || query.group_by_with_rollup || query.group_by_with_cube) - throw Exception("WITH TOTALS, ROLLUP or CUBE are not supported without aggregation", ErrorCodes::NOT_IMPLEMENTED); + else + { + if (query_analyzer->hasWindow()) + { + executeWindow(query_plan); + executeExpression(query_plan, expressions.before_order_by, "Before ORDER BY"); + } + else + { + // Neither aggregation nor windows, all expressions before + // ORDER BY executed on shards. + } + } + + executeDistinct(query_plan, true, expressions.selected_columns, true); if (expressions.has_order_by) { @@ -1720,9 +1786,20 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc if (!options.ignore_quota && (options.to_stage == QueryProcessingStage::Complete)) quota = context->getQuota(); + fmt::print(stderr, "required columns for storage::read:\n"); + for (const auto & c : required_columns) + { + fmt::print(stderr, "{}\n", c); + } + fmt::print(stderr, "processing stage for storage::read is {}\n", + processing_stage); storage->read(query_plan, required_columns, metadata_snapshot, query_info, context, processing_stage, max_block_size, max_streams); + WriteBufferFromOwnString ss; + query_plan.explainPlan(ss, {true}); + fmt::print(stderr, "query plan after storage::read:'\n{}'\n", ss.str()); + if (context->hasQueryContext() && !options.is_internal) { auto local_storage_id = storage->getStorageID(); diff --git a/src/Processors/QueryPlan/QueryPlan.cpp b/src/Processors/QueryPlan/QueryPlan.cpp index ad3649385fd..36b737dc091 100644 --- a/src/Processors/QueryPlan/QueryPlan.cpp +++ b/src/Processors/QueryPlan/QueryPlan.cpp @@ -136,6 +136,10 @@ QueryPipelinePtr QueryPlan::buildQueryPipeline( const QueryPlanOptimizationSettings & optimization_settings, const BuildQueryPipelineSettings & build_pipeline_settings) { + WriteBufferFromOwnString ss; + explainPlan(ss, {true}); + fmt::print(stderr, "plan at buildQueryPipeline:'\n{}'\n", ss.str()); + checkInitialized(); optimize(optimization_settings); diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 3d96796a79b..c0d00b268bc 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -553,6 +553,9 @@ void StorageDistributed::read( Block header = InterpreterSelectQuery(query_info.query, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock(); + fmt::print(stderr, "header in storage::read is '{}'\n", + header.dumpStructure()); + /// Return directly (with correct header) if no shard to query. if (query_info.getCluster()->getShardsInfo().empty()) { diff --git a/tests/queries/0_stateless/01568_window_functions_distributed.reference b/tests/queries/0_stateless/01568_window_functions_distributed.reference index b441189303d..7355cfc8dd6 100644 --- a/tests/queries/0_stateless/01568_window_functions_distributed.reference +++ b/tests/queries/0_stateless/01568_window_functions_distributed.reference @@ -3,3 +3,45 @@ set allow_experimental_window_functions = 1; select row_number() over (order by dummy) from (select * from remote('127.0.0.{1,2}', system, one)); 1 2 +select row_number() over (order by dummy) from remote('127.0.0.{1,2}', system, one); +1 +2 +select max(identity(dummy + 1)) over () from remote('127.0.0.{1,2}', system, one); +1 +1 +drop table if exists t_01568; +create table t_01568 engine Log as select intDiv(number, 3) p, number from numbers(9); +select sum(number) over w, max(number) over w from t_01568 window w as (partition by p); +3 2 +3 2 +3 2 +12 5 +12 5 +12 5 +21 8 +21 8 +21 8 +select sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p); +6 2 +6 2 +6 2 +6 2 +6 2 +6 2 +24 5 +24 5 +24 5 +24 5 +24 5 +24 5 +42 8 +42 8 +42 8 +42 8 +42 8 +42 8 +select distinct sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p); +6 2 +24 5 +42 8 +drop table t_01568; diff --git a/tests/queries/0_stateless/01568_window_functions_distributed.sql b/tests/queries/0_stateless/01568_window_functions_distributed.sql index 754b996e00c..277c951e898 100644 --- a/tests/queries/0_stateless/01568_window_functions_distributed.sql +++ b/tests/queries/0_stateless/01568_window_functions_distributed.sql @@ -2,3 +2,19 @@ set allow_experimental_window_functions = 1; select row_number() over (order by dummy) from (select * from remote('127.0.0.{1,2}', system, one)); + +select row_number() over (order by dummy) from remote('127.0.0.{1,2}', system, one); + +select max(identity(dummy + 1)) over () from remote('127.0.0.{1,2}', system, one); + +drop table if exists t_01568; + +create table t_01568 engine Log as select intDiv(number, 3) p, number from numbers(9); + +select sum(number) over w, max(number) over w from t_01568 window w as (partition by p); + +select sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p); + +select distinct sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p); + +drop table t_01568;