mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-11 18:14:03 +00:00
clickhouse: fixed subqueries with Distributed tables [#CONV-6822].
This commit is contained in:
parent
d2241771bd
commit
1e338dbd6d
@ -34,14 +34,14 @@ public:
|
||||
* Заменяет узлы ASTSubquery на узлы ASTSet.
|
||||
* Следует вызывать перед execute, если в выражении могут быть множества.
|
||||
*/
|
||||
void makeSets(size_t subquery_depth = 0);
|
||||
void makeSets(size_t subquery_depth = 0, unsigned part_id = 0);
|
||||
|
||||
/** Выполнить подзапросы не в секциях IN и FROM и преобразовать их в константы.
|
||||
* Поддерживаются только независимые подзапросы.
|
||||
* Следует вызывать перед execute, если в выражении могут быть скалярные подзапросы.
|
||||
* Заменяет узлы ASTSubquery на узлы ASTLiteral или tuple.
|
||||
*/
|
||||
void resolveScalarSubqueries(size_t subquery_depth = 0);
|
||||
void resolveScalarSubqueries(size_t subquery_depth = 0, unsigned part_id = 0);
|
||||
|
||||
/** Выполнить выражение над блоком. Блок должен содержать все столбцы - идентификаторы.
|
||||
* Функция добавляет в блок новые столбцы - результаты вычислений.
|
||||
@ -145,9 +145,9 @@ private:
|
||||
|
||||
void markBeforeAggregationImpl(ASTPtr ast, unsigned before_part_id, bool below = false);
|
||||
|
||||
void makeSetsImpl(ASTPtr ast, size_t subquery_depth);
|
||||
void makeSetsImpl(ASTPtr ast, size_t subquery_depth, unsigned part_id);
|
||||
|
||||
void resolveScalarSubqueriesImpl(ASTPtr & ast, size_t subquery_depth);
|
||||
void resolveScalarSubqueriesImpl(ASTPtr & ast, size_t subquery_depth, unsigned part_id);
|
||||
|
||||
bool getArrayJoinInfoImpl(ASTPtr ast, String & column_name);
|
||||
|
||||
|
@ -723,13 +723,13 @@ void Expression::markBeforeArrayJoin(unsigned part_id)
|
||||
}
|
||||
|
||||
|
||||
void Expression::makeSets(size_t subquery_depth)
|
||||
void Expression::makeSets(size_t subquery_depth, unsigned part_id)
|
||||
{
|
||||
makeSetsImpl(ast, subquery_depth);
|
||||
makeSetsImpl(ast, subquery_depth, part_id);
|
||||
}
|
||||
|
||||
|
||||
void Expression::makeSetsImpl(ASTPtr ast, size_t subquery_depth)
|
||||
void Expression::makeSetsImpl(ASTPtr ast, size_t subquery_depth, unsigned part_id)
|
||||
{
|
||||
bool made = false;
|
||||
|
||||
@ -738,6 +738,10 @@ void Expression::makeSetsImpl(ASTPtr ast, size_t subquery_depth)
|
||||
{
|
||||
if (func->name == "in" || func->name == "notIn")
|
||||
{
|
||||
/// Проверим, что мы в правильной части дерева.
|
||||
if (!((ast->part_id & part_id) || (ast->part_id == 0 && part_id == 0)))
|
||||
return;
|
||||
|
||||
made = true;
|
||||
|
||||
if (func->children.size() != 1)
|
||||
@ -807,21 +811,25 @@ void Expression::makeSetsImpl(ASTPtr ast, size_t subquery_depth)
|
||||
if (!made)
|
||||
for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it)
|
||||
if (!dynamic_cast<ASTSelectQuery *>(&**it))
|
||||
makeSetsImpl(*it, subquery_depth);
|
||||
makeSetsImpl(*it, subquery_depth, part_id);
|
||||
}
|
||||
|
||||
|
||||
void Expression::resolveScalarSubqueries(size_t subquery_depth)
|
||||
void Expression::resolveScalarSubqueries(size_t subquery_depth, unsigned part_id)
|
||||
{
|
||||
resolveScalarSubqueriesImpl(ast, subquery_depth);
|
||||
resolveScalarSubqueriesImpl(ast, subquery_depth, part_id);
|
||||
}
|
||||
|
||||
|
||||
void Expression::resolveScalarSubqueriesImpl(ASTPtr & ast, size_t subquery_depth)
|
||||
void Expression::resolveScalarSubqueriesImpl(ASTPtr & ast, size_t subquery_depth, unsigned part_id)
|
||||
{
|
||||
/// Обход в глубину. Ищем подзапросы.
|
||||
if (ASTSubquery * subquery = dynamic_cast<ASTSubquery *>(&*ast))
|
||||
{
|
||||
/// Проверим, что мы в правильной части дерева.
|
||||
if (!((ast->part_id & part_id) || (ast->part_id == 0 && part_id == 0)))
|
||||
return;
|
||||
|
||||
/// Исполняем подзапрос, превращаем результат в множество, и кладём это множество на место подзапроса.
|
||||
InterpreterSelectQuery interpreter(subquery->children[0], context, QueryProcessingStage::Complete, subquery_depth + 1);
|
||||
BlockInputStreamPtr res_stream = interpreter.execute();
|
||||
@ -867,7 +875,7 @@ void Expression::resolveScalarSubqueriesImpl(ASTPtr & ast, size_t subquery_depth
|
||||
if (recurse)
|
||||
for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it)
|
||||
if (!dynamic_cast<ASTSelectQuery *>(&**it)) /// А также не опускаемся в подзапросы в секции FROM.
|
||||
resolveScalarSubqueriesImpl(*it, subquery_depth);
|
||||
resolveScalarSubqueriesImpl(*it, subquery_depth, part_id);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -147,22 +147,36 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
|
||||
|
||||
if (to_stage > QueryProcessingStage::FetchColumns)
|
||||
{
|
||||
/// Вычислим подзапросы в секции IN.
|
||||
expression->makeSets(subquery_depth);
|
||||
/// А также скалярные подзапросы.
|
||||
expression->resolveScalarSubqueries(subquery_depth);
|
||||
|
||||
/// Нужно ли агрегировать.
|
||||
bool need_aggregate = expression->hasAggregates() || query.group_expression_list;
|
||||
|
||||
if (from_stage < QueryProcessingStage::WithMergeableState)
|
||||
{
|
||||
/// Вычислим подзапросы в секции IN.
|
||||
expression->makeSets(subquery_depth);
|
||||
/// А также скалярные подзапросы.
|
||||
expression->resolveScalarSubqueries(subquery_depth);
|
||||
|
||||
executeArrayJoin(streams, expression);
|
||||
executeWhere(streams, expression);
|
||||
|
||||
if (need_aggregate)
|
||||
executeAggregation(streams, expression);
|
||||
}
|
||||
else if (from_stage <= QueryProcessingStage::WithMergeableState && to_stage > QueryProcessingStage::WithMergeableState)
|
||||
{
|
||||
/// Части могут пересекаться из-за склеивания поддеревьев. Здесь это не мешает.
|
||||
setPartID(query.select_expression_list, PART_SELECT);
|
||||
if (query.order_expression_list)
|
||||
setPartID(query.order_expression_list, PART_ORDER);
|
||||
if (query.having_expression)
|
||||
setPartID(query.having_expression, PART_HAVING);
|
||||
|
||||
/// Вычислим подзапросы в секции IN.
|
||||
expression->makeSets(subquery_depth, PART_SELECT | PART_HAVING | PART_ORDER);
|
||||
/// А также скалярные подзапросы.
|
||||
expression->resolveScalarSubqueries(subquery_depth, PART_SELECT | PART_HAVING | PART_ORDER);
|
||||
}
|
||||
|
||||
if (from_stage <= QueryProcessingStage::WithMergeableState
|
||||
&& to_stage > QueryProcessingStage::WithMergeableState)
|
||||
|
Loading…
Reference in New Issue
Block a user