Add new query processing stage WithMergeableStateAfterAggregation

Process query until the stage where the aggregate functions were
calculated and finalized.

It will be used for optimize_distributed_group_by_sharding_key.

v2: fix aliases
v3: Fix protocol ABI breakage due to WithMergeableStateAfterAggregation
    Conditions >= for QueryProcessingStage::Enum has been verified, and they
    are ok (in InterpreterSelectQuery).
This commit is contained in:
Azat Khuzhin 2020-08-15 16:57:17 +03:00
parent 8e2fba5be1
commit 4043be3121
4 changed files with 70 additions and 17 deletions

View File

@ -104,6 +104,8 @@ public:
query_processing_stage = QueryProcessingStage::FetchColumns; query_processing_stage = QueryProcessingStage::FetchColumns;
else if (stage == "with_mergeable_state") else if (stage == "with_mergeable_state")
query_processing_stage = QueryProcessingStage::WithMergeableState; query_processing_stage = QueryProcessingStage::WithMergeableState;
else if (stage == "with_mergeable_state_after_aggregation")
query_processing_stage = QueryProcessingStage::WithMergeableStateAfterAggregation;
else else
throw Exception("Unknown query processing stage: " + stage, ErrorCodes::BAD_ARGUMENTS); throw Exception("Unknown query processing stage: " + stage, ErrorCodes::BAD_ARGUMENTS);
@ -565,7 +567,7 @@ int mainEntryClickHouseBenchmark(int argc, char ** argv)
("help", "produce help message") ("help", "produce help message")
("concurrency,c", value<unsigned>()->default_value(1), "number of parallel queries") ("concurrency,c", value<unsigned>()->default_value(1), "number of parallel queries")
("delay,d", value<double>()->default_value(1), "delay between intermediate reports in seconds (set 0 to disable reports)") ("delay,d", value<double>()->default_value(1), "delay between intermediate reports in seconds (set 0 to disable reports)")
("stage", value<std::string>()->default_value("complete"), "request query processing up to specified stage: complete,fetch_columns,with_mergeable_state") ("stage", value<std::string>()->default_value("complete"), "request query processing up to specified stage: complete,fetch_columns,with_mergeable_state,with_mergeable_state_after_aggregation")
("iterations,i", value<size_t>()->default_value(0), "amount of queries to be executed") ("iterations,i", value<size_t>()->default_value(0), "amount of queries to be executed")
("timelimit,t", value<double>()->default_value(0.), "stop launch of queries after specified time limit") ("timelimit,t", value<double>()->default_value(0.), "stop launch of queries after specified time limit")
("randomize,r", value<bool>()->default_value(false), "randomize order of execution") ("randomize,r", value<bool>()->default_value(false), "randomize order of execution")

View File

@ -10,17 +10,36 @@ namespace DB
namespace QueryProcessingStage namespace QueryProcessingStage
{ {
/// Numbers matter - the later stage has a larger number. /// Numbers matter - the later stage has a larger number.
///
/// It is part of Protocol ABI, add values only to the end.
/// Also keep in mind that the code may depends on the order of fields, so be double aware when you will add new values.
enum Enum enum Enum
{ {
FetchColumns = 0, /// Only read/have been read the columns specified in the query. /// Only read/have been read the columns specified in the query.
WithMergeableState = 1, /// Until the stage where the results of processing on different servers can be combined. FetchColumns = 0,
Complete = 2, /// Completely. /// Until the stage where the results of processing on different servers can be combined.
WithMergeableState = 1,
/// Completely.
Complete = 2,
/// Until the stage where the aggregate functions were calculated and finalized.
///
/// It is used for auto distributed_group_by_no_merge optimization for distributed engine.
/// (See comments in StorageDistributed).
WithMergeableStateAfterAggregation = 3,
MAX = 4,
}; };
inline const char * toString(UInt64 stage) inline const char * toString(UInt64 stage)
{ {
static const char * data[] = { "FetchColumns", "WithMergeableState", "Complete" }; static const char * data[] =
return stage < 3 {
"FetchColumns",
"WithMergeableState",
"Complete",
"WithMergeableStateAfterAggregation",
};
return stage < MAX
? data[stage] ? data[stage]
: "Unknown stage"; : "Unknown stage";
} }

View File

@ -553,6 +553,11 @@ Block InterpreterSelectQuery::getSampleBlockImpl()
return res; return res;
} }
if (options.to_stage == QueryProcessingStage::Enum::WithMergeableStateAfterAggregation)
{
return analysis_result.before_order_and_select->getSampleBlock();
}
return analysis_result.final_projection->getSampleBlock(); return analysis_result.final_projection->getSampleBlock();
} }
@ -740,6 +745,8 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
auto & expressions = analysis_result; auto & expressions = analysis_result;
const auto & subqueries_for_sets = query_analyzer->getSubqueriesForSets(); const auto & subqueries_for_sets = query_analyzer->getSubqueriesForSets();
bool intermediate_stage = false; bool intermediate_stage = false;
bool to_aggregation_stage = false;
bool from_aggregation_stage = false;
if (options.only_analyze) if (options.only_analyze)
{ {
@ -788,6 +795,13 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
options.to_stage == QueryProcessingStage::WithMergeableState) options.to_stage == QueryProcessingStage::WithMergeableState)
intermediate_stage = true; intermediate_stage = true;
/// Is running on the initiating server during distributed processing?
if (from_stage == QueryProcessingStage::WithMergeableStateAfterAggregation)
from_aggregation_stage = true;
/// Is running on remote servers during distributed processing?
if (options.to_stage == QueryProcessingStage::WithMergeableStateAfterAggregation)
to_aggregation_stage = true;
if (storage && expressions.filter_info && expressions.prewhere_info) if (storage && expressions.filter_info && expressions.prewhere_info)
throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE); throw Exception("PREWHERE is not supported if the table is filtered by row-level security expression", ErrorCodes::ILLEGAL_PREWHERE);
@ -848,6 +862,12 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
if (expressions.need_aggregate) if (expressions.need_aggregate)
executeMergeAggregated(query_plan, aggregate_overflow_row, aggregate_final); executeMergeAggregated(query_plan, aggregate_overflow_row, aggregate_final);
} }
if (from_aggregation_stage)
{
if (intermediate_stage || expressions.first_stage || expressions.second_stage)
throw Exception("Query with after aggregation stage cannot have any other stages", ErrorCodes::LOGICAL_ERROR);
}
if (expressions.first_stage) if (expressions.first_stage)
{ {
@ -939,9 +959,13 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
executeSubqueriesInSetsAndJoins(query_plan, subqueries_for_sets); executeSubqueriesInSetsAndJoins(query_plan, subqueries_for_sets);
} }
if (expressions.second_stage) if (expressions.second_stage || from_aggregation_stage)
{ {
if (expressions.need_aggregate) if (from_aggregation_stage)
{
/// No need to aggregate anything, since this was done on remote shards.
}
else if (expressions.need_aggregate)
{ {
/// If you need to combine aggregated results from multiple servers /// If you need to combine aggregated results from multiple servers
if (!expressions.first_stage) if (!expressions.first_stage)
@ -994,7 +1018,8 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
* limiting the number of rows in each up to `offset + limit`. * limiting the number of rows in each up to `offset + limit`.
*/ */
bool has_prelimit = false; bool has_prelimit = false;
if (query.limitLength() && !query.limit_with_ties && !hasWithTotalsInAnySubqueryInFromClause(query) && if (!to_aggregation_stage &&
query.limitLength() && !query.limit_with_ties && !hasWithTotalsInAnySubqueryInFromClause(query) &&
!query.arrayJoinExpressionList() && !query.distinct && !expressions.hasLimitBy() && !settings.extremes) !query.arrayJoinExpressionList() && !query.distinct && !expressions.hasLimitBy() && !settings.extremes)
{ {
executePreLimit(query_plan, false); executePreLimit(query_plan, false);
@ -1023,17 +1048,22 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
has_prelimit = true; has_prelimit = true;
} }
/** We must do projection after DISTINCT because projection may remove some columns. /// Projection not be done on the shards, since then initiator will not find column in blocks.
*/ /// (significant only for WithMergeableStateAfterAggregation).
if (!to_aggregation_stage)
{
/// We must do projection after DISTINCT because projection may remove some columns.
executeProjection(query_plan, expressions.final_projection); executeProjection(query_plan, expressions.final_projection);
}
/** Extremes are calculated before LIMIT, but after LIMIT BY. This is Ok. /// Extremes are calculated before LIMIT, but after LIMIT BY. This is Ok.
*/
executeExtremes(query_plan); executeExtremes(query_plan);
if (!has_prelimit) /// Limit is no longer needed if there is prelimit. /// Limit is no longer needed if there is prelimit.
if (!to_aggregation_stage && !has_prelimit)
executeLimit(query_plan); executeLimit(query_plan);
if (!to_aggregation_stage)
executeOffset(query_plan); executeOffset(query_plan);
} }
} }

View File

@ -452,6 +452,8 @@ Block StorageMerge::getQueryHeader(
} }
case QueryProcessingStage::WithMergeableState: case QueryProcessingStage::WithMergeableState:
case QueryProcessingStage::Complete: case QueryProcessingStage::Complete:
case QueryProcessingStage::WithMergeableStateAfterAggregation:
case QueryProcessingStage::MAX:
{ {
auto query = query_info.query->clone(); auto query = query_info.query->clone();
removeJoin(*query->as<ASTSelectQuery>()); removeJoin(*query->as<ASTSelectQuery>());