From 6f68696c14f26dc2124e658b6cc8a48d2ca32217 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Fri, 22 Dec 2023 17:26:31 +0000 Subject: [PATCH] Fix: for joins with old analyzer --- src/Interpreters/ActionsVisitor.cpp | 6 ++- src/Interpreters/InterpreterSelectQuery.cpp | 44 ++++++++++++++----- src/Interpreters/PreparedSets.cpp | 23 ++++++++-- src/Interpreters/PreparedSets.h | 12 +++-- .../02731_parallel_replicas_join_subquery.sql | 4 ++ 5 files changed, 69 insertions(+), 20 deletions(-) diff --git a/src/Interpreters/ActionsVisitor.cpp b/src/Interpreters/ActionsVisitor.cpp index 6be9f6c803f..6ffe04d0c95 100644 --- a/src/Interpreters/ActionsVisitor.cpp +++ b/src/Interpreters/ActionsVisitor.cpp @@ -1414,7 +1414,10 @@ FutureSetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool set_key = right_in_operand->getTreeHash(/*ignore_aliases=*/ true); if (auto set = data.prepared_sets->findSubquery(set_key)) + { + set->markAsINSubquery(); return set; + } FutureSetPtr external_table_set; @@ -1460,7 +1463,8 @@ FutureSetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool interpreter->buildQueryPlan(*source); } - return data.prepared_sets->addFromSubquery(set_key, std::move(source), nullptr, std::move(external_table_set), data.getContext()->getSettingsRef()); + return data.prepared_sets->addFromSubquery( + set_key, std::move(source), nullptr, std::move(external_table_set), data.getContext()->getSettingsRef(), true); } else { diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 19deb16db7c..42a0d60fc8a 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -870,7 +870,38 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis() ASTSelectQuery & query = getSelectQuery(); /// While only_analyze we don't know anything about parts, so any decision about how many parallel replicas to use would be wrong - if (!storage || options.only_analyze || !context->canUseParallelReplicasOnInitiator()) + if (!storage || !context->canUseParallelReplicasOnInitiator()) + return false; + + if (query_analyzer->getPreparedSets()->hasSubqueries()) + { + bool in_subqueries = false; + const auto & sets = query_analyzer->getPreparedSets(); + const auto subqueries = sets->getSubqueries(); + for(const auto & subquery : subqueries) + { + if (subquery->isINSubquery()) + { + in_subqueries = true; + break; + } + } + + // LOG_DEBUG(log, "Prepared sets: subqueries={} in_subqueries={}\n{}", subqueries.size(), in_subqueries, StackTrace().toString()); + + if (in_subqueries) + { + if (settings.allow_experimental_parallel_reading_from_replicas == 2) + throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "IN with subquery is not supported with parallel replicas"); + + context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0)); + context->setSetting("max_parallel_replicas", UInt64{0}); + LOG_DEBUG(log, "Disabling parallel replicas to execute a query with IN with subquery"); + return true; + } + } + + if (options.only_analyze) return false; if (getTrivialCount(0).has_value()) @@ -882,17 +913,6 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis() return true; } - if (query_analyzer->getPreparedSets()->hasSubqueries()) - { - if (settings.allow_experimental_parallel_reading_from_replicas == 2) - throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "IN with subquery is not supported with parallel replicas"); - - context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0)); - context->setSetting("max_parallel_replicas", UInt64{0}); - LOG_DEBUG(log, "Disabling parallel replicas to execute a query with IN with subquery"); - return true; - } - auto storage_merge_tree = std::dynamic_pointer_cast(storage); if (!storage_merge_tree || settings.parallel_replicas_min_number_of_rows_per_replica == 0) return false; diff --git a/src/Interpreters/PreparedSets.cpp b/src/Interpreters/PreparedSets.cpp index 7468bd1d519..8e7c493d6a5 100644 --- a/src/Interpreters/PreparedSets.cpp +++ b/src/Interpreters/PreparedSets.cpp @@ -98,10 +98,12 @@ FutureSetFromSubquery::FutureSetFromSubquery( std::unique_ptr source_, StoragePtr external_table_, FutureSetPtr external_table_set_, - const Settings & settings) + const Settings & settings, + bool in_subquery_) : external_table(std::move(external_table_)) , external_table_set(std::move(external_table_set_)) , source(std::move(source_)) + , in_subquery(in_subquery_) { set_and_key = std::make_shared(); set_and_key->key = std::move(key); @@ -261,14 +263,18 @@ FutureSetPtr PreparedSets::addFromSubquery( std::unique_ptr source, StoragePtr external_table, FutureSetPtr external_table_set, - const Settings & settings) + const Settings & settings, + bool in_subquery) { + // LOG_DEBUG(&Poco::Logger::get(__PRETTY_FUNCTION__), "in_subquery={}\n{}", in_subquery, StackTrace().toString()); + auto from_subquery = std::make_shared( toString(key, {}), std::move(source), std::move(external_table), std::move(external_table_set), - settings); + settings, + in_subquery); auto [it, inserted] = sets_from_subqueries.emplace(key, from_subquery); @@ -318,6 +324,15 @@ std::shared_ptr PreparedSets::findSubquery(const Hash & k return it->second; } +void PreparedSets::markAsINSubquery(const Hash & key) +{ + auto it = sets_from_subqueries.find(key); + if (it == sets_from_subqueries.end()) + return; + + it->second->markAsINSubquery(); +} + std::shared_ptr PreparedSets::findStorage(const Hash & key) const { auto it = sets_from_storage.find(key); @@ -331,7 +346,7 @@ PreparedSets::Subqueries PreparedSets::getSubqueries() const { PreparedSets::Subqueries res; res.reserve(sets_from_subqueries.size()); - for (auto & [_, set] : sets_from_subqueries) + for (const auto & [_, set] : sets_from_subqueries) res.push_back(set); return res; diff --git a/src/Interpreters/PreparedSets.h b/src/Interpreters/PreparedSets.h index e6d499715b8..65072caf0a9 100644 --- a/src/Interpreters/PreparedSets.h +++ b/src/Interpreters/PreparedSets.h @@ -59,7 +59,7 @@ using FutureSetPtr = std::shared_ptr; class FutureSetFromStorage final : public FutureSet { public: - FutureSetFromStorage(SetPtr set_); + explicit FutureSetFromStorage(SetPtr set_); SetPtr get() const override; DataTypes getTypes() const override; @@ -97,7 +97,8 @@ public: std::unique_ptr source_, StoragePtr external_table_, FutureSetPtr external_table_set_, - const Settings & settings); + const Settings & settings, + bool in_subquery_); FutureSetFromSubquery( String key, @@ -112,6 +113,8 @@ public: QueryTreeNodePtr detachQueryTree() { return std::move(query_tree); } void setQueryPlan(std::unique_ptr source_); + void markAsINSubquery() { in_subquery = true; } + bool isINSubquery() const { return in_subquery; } private: SetAndKeyPtr set_and_key; @@ -120,6 +123,7 @@ private: std::unique_ptr source; QueryTreeNodePtr query_tree; + bool in_subquery = false; // subquery used in IN operator }; /// Container for all the sets used in query. @@ -145,7 +149,8 @@ public: std::unique_ptr source, StoragePtr external_table, FutureSetPtr external_table_set, - const Settings & settings); + const Settings & settings, + bool in_subquery = false); FutureSetPtr addFromSubquery( const Hash & key, @@ -155,6 +160,7 @@ public: FutureSetPtr findTuple(const Hash & key, const DataTypes & types) const; std::shared_ptr findStorage(const Hash & key) const; std::shared_ptr findSubquery(const Hash & key) const; + void markAsINSubquery(const Hash & key); using Subqueries = std::vector>; Subqueries getSubqueries() const; diff --git a/tests/queries/0_stateless/02731_parallel_replicas_join_subquery.sql b/tests/queries/0_stateless/02731_parallel_replicas_join_subquery.sql index fa40c96048c..a117378b0bf 100644 --- a/tests/queries/0_stateless/02731_parallel_replicas_join_subquery.sql +++ b/tests/queries/0_stateless/02731_parallel_replicas_join_subquery.sql @@ -1,5 +1,7 @@ -- Tags: zookeeper +DROP TABLE IF EXISTS join_inner_table SYNC; + CREATE TABLE join_inner_table ( id UUID, @@ -77,6 +79,8 @@ ORDER BY is_initial_query, c, query; ---- Query with JOIN +DROP TABLE IF EXISTS join_outer_table SYNC; + CREATE TABLE join_outer_table ( id UUID,