From 1e338dbd6de132f4b5adf38c7ed6303780359d14 Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Wed, 27 Mar 2013 12:04:48 +0000 Subject: [PATCH] clickhouse: fixed subqueries with Distributed tables [#CONV-6822]. --- dbms/include/DB/Interpreters/Expression.h | 8 +++---- dbms/src/Interpreters/Expression.cpp | 24 ++++++++++++------- .../Interpreters/InterpreterSelectQuery.cpp | 24 +++++++++++++++---- 3 files changed, 39 insertions(+), 17 deletions(-) diff --git a/dbms/include/DB/Interpreters/Expression.h b/dbms/include/DB/Interpreters/Expression.h index 1985e3eb729..d1b726140b6 100644 --- a/dbms/include/DB/Interpreters/Expression.h +++ b/dbms/include/DB/Interpreters/Expression.h @@ -34,14 +34,14 @@ public: * Заменяет узлы ASTSubquery на узлы ASTSet. * Следует вызывать перед execute, если в выражении могут быть множества. */ - void makeSets(size_t subquery_depth = 0); + void makeSets(size_t subquery_depth = 0, unsigned part_id = 0); /** Выполнить подзапросы не в секциях IN и FROM и преобразовать их в константы. * Поддерживаются только независимые подзапросы. * Следует вызывать перед execute, если в выражении могут быть скалярные подзапросы. * Заменяет узлы ASTSubquery на узлы ASTLiteral или tuple. */ - void resolveScalarSubqueries(size_t subquery_depth = 0); + void resolveScalarSubqueries(size_t subquery_depth = 0, unsigned part_id = 0); /** Выполнить выражение над блоком. Блок должен содержать все столбцы - идентификаторы. * Функция добавляет в блок новые столбцы - результаты вычислений. @@ -145,9 +145,9 @@ private: void markBeforeAggregationImpl(ASTPtr ast, unsigned before_part_id, bool below = false); - void makeSetsImpl(ASTPtr ast, size_t subquery_depth); + void makeSetsImpl(ASTPtr ast, size_t subquery_depth, unsigned part_id); - void resolveScalarSubqueriesImpl(ASTPtr & ast, size_t subquery_depth); + void resolveScalarSubqueriesImpl(ASTPtr & ast, size_t subquery_depth, unsigned part_id); bool getArrayJoinInfoImpl(ASTPtr ast, String & column_name); diff --git a/dbms/src/Interpreters/Expression.cpp b/dbms/src/Interpreters/Expression.cpp index 37ffe371f54..03937702ea9 100644 --- a/dbms/src/Interpreters/Expression.cpp +++ b/dbms/src/Interpreters/Expression.cpp @@ -723,13 +723,13 @@ void Expression::markBeforeArrayJoin(unsigned part_id) } -void Expression::makeSets(size_t subquery_depth) +void Expression::makeSets(size_t subquery_depth, unsigned part_id) { - makeSetsImpl(ast, subquery_depth); + makeSetsImpl(ast, subquery_depth, part_id); } -void Expression::makeSetsImpl(ASTPtr ast, size_t subquery_depth) +void Expression::makeSetsImpl(ASTPtr ast, size_t subquery_depth, unsigned part_id) { bool made = false; @@ -738,6 +738,10 @@ void Expression::makeSetsImpl(ASTPtr ast, size_t subquery_depth) { if (func->name == "in" || func->name == "notIn") { + /// Проверим, что мы в правильной части дерева. + if (!((ast->part_id & part_id) || (ast->part_id == 0 && part_id == 0))) + return; + made = true; if (func->children.size() != 1) @@ -807,21 +811,25 @@ void Expression::makeSetsImpl(ASTPtr ast, size_t subquery_depth) if (!made) for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it) if (!dynamic_cast(&**it)) - makeSetsImpl(*it, subquery_depth); + makeSetsImpl(*it, subquery_depth, part_id); } -void Expression::resolveScalarSubqueries(size_t subquery_depth) +void Expression::resolveScalarSubqueries(size_t subquery_depth, unsigned part_id) { - resolveScalarSubqueriesImpl(ast, subquery_depth); + resolveScalarSubqueriesImpl(ast, subquery_depth, part_id); } -void Expression::resolveScalarSubqueriesImpl(ASTPtr & ast, size_t subquery_depth) +void Expression::resolveScalarSubqueriesImpl(ASTPtr & ast, size_t subquery_depth, unsigned part_id) { /// Обход в глубину. Ищем подзапросы. if (ASTSubquery * subquery = dynamic_cast(&*ast)) { + /// Проверим, что мы в правильной части дерева. + if (!((ast->part_id & part_id) || (ast->part_id == 0 && part_id == 0))) + return; + /// Исполняем подзапрос, превращаем результат в множество, и кладём это множество на место подзапроса. InterpreterSelectQuery interpreter(subquery->children[0], context, QueryProcessingStage::Complete, subquery_depth + 1); BlockInputStreamPtr res_stream = interpreter.execute(); @@ -867,7 +875,7 @@ void Expression::resolveScalarSubqueriesImpl(ASTPtr & ast, size_t subquery_depth if (recurse) for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it) if (!dynamic_cast(&**it)) /// А также не опускаемся в подзапросы в секции FROM. - resolveScalarSubqueriesImpl(*it, subquery_depth); + resolveScalarSubqueriesImpl(*it, subquery_depth, part_id); } } diff --git a/dbms/src/Interpreters/InterpreterSelectQuery.cpp b/dbms/src/Interpreters/InterpreterSelectQuery.cpp index 4e7289c1896..b38db749743 100644 --- a/dbms/src/Interpreters/InterpreterSelectQuery.cpp +++ b/dbms/src/Interpreters/InterpreterSelectQuery.cpp @@ -147,22 +147,36 @@ BlockInputStreamPtr InterpreterSelectQuery::execute() if (to_stage > QueryProcessingStage::FetchColumns) { - /// Вычислим подзапросы в секции IN. - expression->makeSets(subquery_depth); - /// А также скалярные подзапросы. - expression->resolveScalarSubqueries(subquery_depth); - /// Нужно ли агрегировать. bool need_aggregate = expression->hasAggregates() || query.group_expression_list; if (from_stage < QueryProcessingStage::WithMergeableState) { + /// Вычислим подзапросы в секции IN. + expression->makeSets(subquery_depth); + /// А также скалярные подзапросы. + expression->resolveScalarSubqueries(subquery_depth); + executeArrayJoin(streams, expression); executeWhere(streams, expression); if (need_aggregate) executeAggregation(streams, expression); } + else if (from_stage <= QueryProcessingStage::WithMergeableState && to_stage > QueryProcessingStage::WithMergeableState) + { + /// Части могут пересекаться из-за склеивания поддеревьев. Здесь это не мешает. + setPartID(query.select_expression_list, PART_SELECT); + if (query.order_expression_list) + setPartID(query.order_expression_list, PART_ORDER); + if (query.having_expression) + setPartID(query.having_expression, PART_HAVING); + + /// Вычислим подзапросы в секции IN. + expression->makeSets(subquery_depth, PART_SELECT | PART_HAVING | PART_ORDER); + /// А также скалярные подзапросы. + expression->resolveScalarSubqueries(subquery_depth, PART_SELECT | PART_HAVING | PART_ORDER); + } if (from_stage <= QueryProcessingStage::WithMergeableState && to_stage > QueryProcessingStage::WithMergeableState)