mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-22 17:50:47 +00:00
clickhouse: fixed ARRAY JOIN in distributed queries without WHERE and aggregation. [#METR-10349]
This commit is contained in:
parent
ccc3da3148
commit
420d5696ea
@ -65,7 +65,7 @@ private:
|
||||
void executeTotalsAndHaving( BlockInputStreams & streams, bool has_having, ExpressionActionsPtr expression,
|
||||
bool overflow_row);
|
||||
void executeHaving( BlockInputStreams & streams, ExpressionActionsPtr expression);
|
||||
void executeOuterExpression( BlockInputStreams & streams, ExpressionActionsPtr expression);
|
||||
void executeExpression( BlockInputStreams & streams, ExpressionActionsPtr expression);
|
||||
void executeOrder( BlockInputStreams & streams);
|
||||
void executePreLimit( BlockInputStreams & streams);
|
||||
void executeUnion( BlockInputStreams & streams);
|
||||
|
@ -210,6 +210,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
|
||||
bool has_having = false;
|
||||
bool has_order_by = !query.order_expression_list.isNull();
|
||||
|
||||
ExpressionActionsPtr array_join;
|
||||
ExpressionActionsPtr before_where;
|
||||
ExpressionActionsPtr before_aggregation;
|
||||
ExpressionActionsPtr before_having;
|
||||
@ -225,7 +226,8 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
|
||||
if (from_stage < QueryProcessingStage::WithMergeableState
|
||||
&& to_stage >= QueryProcessingStage::WithMergeableState)
|
||||
{
|
||||
query_analyzer->appendArrayJoin(chain);
|
||||
if (query_analyzer->appendArrayJoin(chain))
|
||||
array_join = chain.getLastActions();
|
||||
|
||||
if (query_analyzer->appendWhere(chain))
|
||||
{
|
||||
@ -312,6 +314,16 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
|
||||
if (need_aggregate)
|
||||
executeAggregation(streams, before_aggregation, aggregate_overflow_row, aggregate_final);
|
||||
|
||||
if (array_join && !has_where && !need_aggregate && to_stage == QueryProcessingStage::WithMergeableState)
|
||||
{
|
||||
/** Если есть ARRAY JOIN, его действие сначала старается оказаться в
|
||||
* before_where, before_aggregation или before_order_and_select.
|
||||
* Если ни одного из них нет, array_join нужно выполнить отдельно.
|
||||
*/
|
||||
|
||||
executeExpression(streams, array_join);
|
||||
}
|
||||
|
||||
/** Оптимизация - при распределённой обработке запроса,
|
||||
* если не указаны DISTINCT, GROUP, HAVING, ORDER, но указан LIMIT,
|
||||
* то выполним предварительный LIMIT на удалёном сервере.
|
||||
@ -339,7 +351,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
|
||||
executeHaving(streams, before_having);
|
||||
}
|
||||
|
||||
executeOuterExpression(streams, before_order_and_select);
|
||||
executeExpression(streams, before_order_and_select);
|
||||
|
||||
if (has_order_by)
|
||||
executeOrder(streams);
|
||||
@ -667,7 +679,7 @@ void InterpreterSelectQuery::executeTotalsAndHaving(BlockInputStreams & streams,
|
||||
}
|
||||
|
||||
|
||||
void InterpreterSelectQuery::executeOuterExpression(BlockInputStreams & streams, ExpressionActionsPtr expression)
|
||||
void InterpreterSelectQuery::executeExpression(BlockInputStreams & streams, ExpressionActionsPtr expression)
|
||||
{
|
||||
bool is_async = settings.asynchronous && streams.size() <= settings.max_threads;
|
||||
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
|
||||
|
Loading…
Reference in New Issue
Block a user