Fix window functions for distributed queries

This commit is contained in:
Alexander Kuzmenkov 2021-04-23 13:58:54 +03:00
parent 0b990c9519
commit 54ad086753
5 changed files with 151 additions and 9 deletions

View File

@ -279,6 +279,9 @@ InterpreterSelectQuery::InterpreterSelectQuery(
, log(&Poco::Logger::get("InterpreterSelectQuery"))
, metadata_snapshot(metadata_snapshot_)
{
fmt::print(stderr, "InterpreterSelectQuery @ {} created at \n{}\n",
static_cast<const void *>(this), StackTrace().toString());
checkStackSize();
initSettings();
@ -627,6 +630,10 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
if (!analysis_result.need_aggregate)
{
// What's the difference with selected_columns?
if (analysis_result.before_window)
{
return analysis_result.before_window->getResultColumns();
}
return analysis_result.before_order_by->getResultColumns();
}
@ -655,6 +662,11 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
if (options.to_stage == QueryProcessingStage::Enum::WithMergeableStateAfterAggregation)
{
// What's the difference with selected_columns?
if (analysis_result.before_window)
{
return analysis_result.before_window->getResultColumns();
}
return analysis_result.before_order_by->getResultColumns();
}
@ -995,7 +1007,10 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
* but there is an ORDER or LIMIT,
* then we will perform the preliminary sorting and LIMIT on the remote server.
*/
if (!expressions.second_stage && !expressions.need_aggregate && !expressions.hasHaving())
if (!expressions.second_stage
&& !expressions.need_aggregate
&& !expressions.hasHaving()
&& !expressions.has_window)
{
if (expressions.has_order_by)
executeOrder(query_plan, query_info.input_order_info);
@ -1105,12 +1120,41 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
/// We need to reset input order info, so that executeOrder can't use it
query_info.input_order_info.reset();
}
// Now we must execute:
// 1) expressions before window functions,
// 2) window functions,
// 3) expressions after window functions,
// 4) preliminary distinct.
// This code decides which part we execute on shard (first_stage)
// and which part on initiator (second_stage). See also the counterpart
// code for "second_stage" that has to execute the rest.
if (expressions.need_aggregate)
{
// We have aggregation, so we can't execute any of the later-stage
// expressions on shards: neither "before window functions" nor
// "before ORDER BY".
}
else
{
executeExpression(query_plan, expressions.before_window, "Before window functions");
executeWindow(query_plan);
executeExpression(query_plan, expressions.before_order_by, "Before ORDER BY");
executeDistinct(query_plan, true, expressions.selected_columns, true);
// We don't have aggregation.
// Window functions must be executed on initiator (second_stage).
// ORDER BY and DISTINCT might depend on them, so if we have
// window functions, we can't execute ORDER BY and DISTINCT
// now, on shard (first_stage).
if (query_analyzer->hasWindow())
{
executeExpression(query_plan, expressions.before_window, "Before window functions");
}
else
{
// We don't have window functions, so we can execute the
// expressions before ORDER BY and the preliminary DISTINCT
// now, on shards (first_stage).
assert(!expressions.before_window);
executeExpression(query_plan, expressions.before_order_by, "Before ORDER BY");
executeDistinct(query_plan, true, expressions.selected_columns, true);
}
}
preliminary_sort();
@ -1154,16 +1198,38 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
}
else if (expressions.hasHaving())
executeHaving(query_plan, expressions.before_having);
}
else if (query.group_by_with_totals || query.group_by_with_rollup || query.group_by_with_cube)
throw Exception("WITH TOTALS, ROLLUP or CUBE are not supported without aggregation", ErrorCodes::NOT_IMPLEMENTED);
// Now we must execute:
// 1) expressions before window functions
// 2) window functions
// 3) expressions after window functions.
// Some of these were already executed at the shards (first_stage),
// see the counterpart code and comments there.
if (expressions.need_aggregate)
{
executeExpression(query_plan, expressions.before_window,
"Before window functions");
executeWindow(query_plan);
executeExpression(query_plan, expressions.before_order_by, "Before ORDER BY");
executeDistinct(query_plan, true, expressions.selected_columns, true);
}
else if (query.group_by_with_totals || query.group_by_with_rollup || query.group_by_with_cube)
throw Exception("WITH TOTALS, ROLLUP or CUBE are not supported without aggregation", ErrorCodes::NOT_IMPLEMENTED);
else
{
if (query_analyzer->hasWindow())
{
executeWindow(query_plan);
executeExpression(query_plan, expressions.before_order_by, "Before ORDER BY");
}
else
{
// Neither aggregation nor window functions: all expressions before
// ORDER BY were already executed on shards.
}
}
executeDistinct(query_plan, true, expressions.selected_columns, true);
if (expressions.has_order_by)
{
@ -1720,9 +1786,20 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
if (!options.ignore_quota && (options.to_stage == QueryProcessingStage::Complete))
quota = context->getQuota();
fmt::print(stderr, "required columns for storage::read:\n");
for (const auto & c : required_columns)
{
fmt::print(stderr, "{}\n", c);
}
fmt::print(stderr, "processing stage for storage::read is {}\n",
processing_stage);
storage->read(query_plan, required_columns, metadata_snapshot,
query_info, context, processing_stage, max_block_size, max_streams);
WriteBufferFromOwnString ss;
query_plan.explainPlan(ss, {true});
fmt::print(stderr, "query plan after storage::read:'\n{}'\n", ss.str());
if (context->hasQueryContext() && !options.is_internal)
{
auto local_storage_id = storage->getStorageID();

View File

@ -136,6 +136,10 @@ QueryPipelinePtr QueryPlan::buildQueryPipeline(
const QueryPlanOptimizationSettings & optimization_settings,
const BuildQueryPipelineSettings & build_pipeline_settings)
{
WriteBufferFromOwnString ss;
explainPlan(ss, {true});
fmt::print(stderr, "plan at buildQueryPipeline:'\n{}'\n", ss.str());
checkInitialized();
optimize(optimization_settings);

View File

@ -553,6 +553,9 @@ void StorageDistributed::read(
Block header =
InterpreterSelectQuery(query_info.query, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
fmt::print(stderr, "header in storage::read is '{}'\n",
header.dumpStructure());
/// Return directly (with correct header) if no shard to query.
if (query_info.getCluster()->getShardsInfo().empty())
{

View File

@ -3,3 +3,45 @@ set allow_experimental_window_functions = 1;
select row_number() over (order by dummy) from (select * from remote('127.0.0.{1,2}', system, one));
1
2
select row_number() over (order by dummy) from remote('127.0.0.{1,2}', system, one);
1
2
select max(identity(dummy + 1)) over () from remote('127.0.0.{1,2}', system, one);
1
1
drop table if exists t_01568;
create table t_01568 engine Log as select intDiv(number, 3) p, number from numbers(9);
select sum(number) over w, max(number) over w from t_01568 window w as (partition by p);
3 2
3 2
3 2
12 5
12 5
12 5
21 8
21 8
21 8
select sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p);
6 2
6 2
6 2
6 2
6 2
6 2
24 5
24 5
24 5
24 5
24 5
24 5
42 8
42 8
42 8
42 8
42 8
42 8
select distinct sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p);
6 2
24 5
42 8
drop table t_01568;

View File

@ -2,3 +2,19 @@
set allow_experimental_window_functions = 1;
select row_number() over (order by dummy) from (select * from remote('127.0.0.{1,2}', system, one));
select row_number() over (order by dummy) from remote('127.0.0.{1,2}', system, one);
select max(identity(dummy + 1)) over () from remote('127.0.0.{1,2}', system, one);
drop table if exists t_01568;
create table t_01568 engine Log as select intDiv(number, 3) p, number from numbers(9);
select sum(number) over w, max(number) over w from t_01568 window w as (partition by p);
select sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p);
select distinct sum(number) over w, max(number) over w from remote('127.0.0.{1,2}', '', t_01568) window w as (partition by p);
drop table t_01568;