Fix: for joins with old analyzer

This commit is contained in:
Igor Nikonov 2023-12-22 17:26:31 +00:00
parent b9f1039ac4
commit 6f68696c14
5 changed files with 69 additions and 20 deletions

View File

@ -1414,7 +1414,10 @@ FutureSetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool
set_key = right_in_operand->getTreeHash(/*ignore_aliases=*/ true); set_key = right_in_operand->getTreeHash(/*ignore_aliases=*/ true);
if (auto set = data.prepared_sets->findSubquery(set_key)) if (auto set = data.prepared_sets->findSubquery(set_key))
{
set->markAsINSubquery();
return set; return set;
}
FutureSetPtr external_table_set; FutureSetPtr external_table_set;
@ -1460,7 +1463,8 @@ FutureSetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool
interpreter->buildQueryPlan(*source); interpreter->buildQueryPlan(*source);
} }
return data.prepared_sets->addFromSubquery(set_key, std::move(source), nullptr, std::move(external_table_set), data.getContext()->getSettingsRef()); return data.prepared_sets->addFromSubquery(
set_key, std::move(source), nullptr, std::move(external_table_set), data.getContext()->getSettingsRef(), true);
} }
else else
{ {

View File

@ -870,7 +870,38 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis()
ASTSelectQuery & query = getSelectQuery(); ASTSelectQuery & query = getSelectQuery();
/// While only_analyze we don't know anything about parts, so any decision about how many parallel replicas to use would be wrong /// While only_analyze we don't know anything about parts, so any decision about how many parallel replicas to use would be wrong
if (!storage || options.only_analyze || !context->canUseParallelReplicasOnInitiator()) if (!storage || !context->canUseParallelReplicasOnInitiator())
return false;
if (query_analyzer->getPreparedSets()->hasSubqueries())
{
bool in_subqueries = false;
const auto & sets = query_analyzer->getPreparedSets();
const auto subqueries = sets->getSubqueries();
for(const auto & subquery : subqueries)
{
if (subquery->isINSubquery())
{
in_subqueries = true;
break;
}
}
// LOG_DEBUG(log, "Prepared sets: subqueries={} in_subqueries={}\n{}", subqueries.size(), in_subqueries, StackTrace().toString());
if (in_subqueries)
{
if (settings.allow_experimental_parallel_reading_from_replicas == 2)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "IN with subquery is not supported with parallel replicas");
context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
context->setSetting("max_parallel_replicas", UInt64{0});
LOG_DEBUG(log, "Disabling parallel replicas to execute a query with IN with subquery");
return true;
}
}
if (options.only_analyze)
return false; return false;
if (getTrivialCount(0).has_value()) if (getTrivialCount(0).has_value())
@ -882,17 +913,6 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis()
return true; return true;
} }
if (query_analyzer->getPreparedSets()->hasSubqueries())
{
if (settings.allow_experimental_parallel_reading_from_replicas == 2)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "IN with subquery is not supported with parallel replicas");
context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
context->setSetting("max_parallel_replicas", UInt64{0});
LOG_DEBUG(log, "Disabling parallel replicas to execute a query with IN with subquery");
return true;
}
auto storage_merge_tree = std::dynamic_pointer_cast<MergeTreeData>(storage); auto storage_merge_tree = std::dynamic_pointer_cast<MergeTreeData>(storage);
if (!storage_merge_tree || settings.parallel_replicas_min_number_of_rows_per_replica == 0) if (!storage_merge_tree || settings.parallel_replicas_min_number_of_rows_per_replica == 0)
return false; return false;

View File

@ -98,10 +98,12 @@ FutureSetFromSubquery::FutureSetFromSubquery(
std::unique_ptr<QueryPlan> source_, std::unique_ptr<QueryPlan> source_,
StoragePtr external_table_, StoragePtr external_table_,
FutureSetPtr external_table_set_, FutureSetPtr external_table_set_,
const Settings & settings) const Settings & settings,
bool in_subquery_)
: external_table(std::move(external_table_)) : external_table(std::move(external_table_))
, external_table_set(std::move(external_table_set_)) , external_table_set(std::move(external_table_set_))
, source(std::move(source_)) , source(std::move(source_))
, in_subquery(in_subquery_)
{ {
set_and_key = std::make_shared<SetAndKey>(); set_and_key = std::make_shared<SetAndKey>();
set_and_key->key = std::move(key); set_and_key->key = std::move(key);
@ -261,14 +263,18 @@ FutureSetPtr PreparedSets::addFromSubquery(
std::unique_ptr<QueryPlan> source, std::unique_ptr<QueryPlan> source,
StoragePtr external_table, StoragePtr external_table,
FutureSetPtr external_table_set, FutureSetPtr external_table_set,
const Settings & settings) const Settings & settings,
bool in_subquery)
{ {
// LOG_DEBUG(&Poco::Logger::get(__PRETTY_FUNCTION__), "in_subquery={}\n{}", in_subquery, StackTrace().toString());
auto from_subquery = std::make_shared<FutureSetFromSubquery>( auto from_subquery = std::make_shared<FutureSetFromSubquery>(
toString(key, {}), toString(key, {}),
std::move(source), std::move(source),
std::move(external_table), std::move(external_table),
std::move(external_table_set), std::move(external_table_set),
settings); settings,
in_subquery);
auto [it, inserted] = sets_from_subqueries.emplace(key, from_subquery); auto [it, inserted] = sets_from_subqueries.emplace(key, from_subquery);
@ -318,6 +324,15 @@ std::shared_ptr<FutureSetFromSubquery> PreparedSets::findSubquery(const Hash & k
return it->second; return it->second;
} }
void PreparedSets::markAsINSubquery(const Hash & key)
{
auto it = sets_from_subqueries.find(key);
if (it == sets_from_subqueries.end())
return;
it->second->markAsINSubquery();
}
std::shared_ptr<FutureSetFromStorage> PreparedSets::findStorage(const Hash & key) const std::shared_ptr<FutureSetFromStorage> PreparedSets::findStorage(const Hash & key) const
{ {
auto it = sets_from_storage.find(key); auto it = sets_from_storage.find(key);
@ -331,7 +346,7 @@ PreparedSets::Subqueries PreparedSets::getSubqueries() const
{ {
PreparedSets::Subqueries res; PreparedSets::Subqueries res;
res.reserve(sets_from_subqueries.size()); res.reserve(sets_from_subqueries.size());
for (auto & [_, set] : sets_from_subqueries) for (const auto & [_, set] : sets_from_subqueries)
res.push_back(set); res.push_back(set);
return res; return res;

View File

@ -59,7 +59,7 @@ using FutureSetPtr = std::shared_ptr<FutureSet>;
class FutureSetFromStorage final : public FutureSet class FutureSetFromStorage final : public FutureSet
{ {
public: public:
FutureSetFromStorage(SetPtr set_); explicit FutureSetFromStorage(SetPtr set_);
SetPtr get() const override; SetPtr get() const override;
DataTypes getTypes() const override; DataTypes getTypes() const override;
@ -97,7 +97,8 @@ public:
std::unique_ptr<QueryPlan> source_, std::unique_ptr<QueryPlan> source_,
StoragePtr external_table_, StoragePtr external_table_,
FutureSetPtr external_table_set_, FutureSetPtr external_table_set_,
const Settings & settings); const Settings & settings,
bool in_subquery_);
FutureSetFromSubquery( FutureSetFromSubquery(
String key, String key,
@ -112,6 +113,8 @@ public:
QueryTreeNodePtr detachQueryTree() { return std::move(query_tree); } QueryTreeNodePtr detachQueryTree() { return std::move(query_tree); }
void setQueryPlan(std::unique_ptr<QueryPlan> source_); void setQueryPlan(std::unique_ptr<QueryPlan> source_);
void markAsINSubquery() { in_subquery = true; }
bool isINSubquery() const { return in_subquery; }
private: private:
SetAndKeyPtr set_and_key; SetAndKeyPtr set_and_key;
@ -120,6 +123,7 @@ private:
std::unique_ptr<QueryPlan> source; std::unique_ptr<QueryPlan> source;
QueryTreeNodePtr query_tree; QueryTreeNodePtr query_tree;
bool in_subquery = false; // subquery used in IN operator
}; };
/// Container for all the sets used in query. /// Container for all the sets used in query.
@ -145,7 +149,8 @@ public:
std::unique_ptr<QueryPlan> source, std::unique_ptr<QueryPlan> source,
StoragePtr external_table, StoragePtr external_table,
FutureSetPtr external_table_set, FutureSetPtr external_table_set,
const Settings & settings); const Settings & settings,
bool in_subquery = false);
FutureSetPtr addFromSubquery( FutureSetPtr addFromSubquery(
const Hash & key, const Hash & key,
@ -155,6 +160,7 @@ public:
FutureSetPtr findTuple(const Hash & key, const DataTypes & types) const; FutureSetPtr findTuple(const Hash & key, const DataTypes & types) const;
std::shared_ptr<FutureSetFromStorage> findStorage(const Hash & key) const; std::shared_ptr<FutureSetFromStorage> findStorage(const Hash & key) const;
std::shared_ptr<FutureSetFromSubquery> findSubquery(const Hash & key) const; std::shared_ptr<FutureSetFromSubquery> findSubquery(const Hash & key) const;
void markAsINSubquery(const Hash & key);
using Subqueries = std::vector<std::shared_ptr<FutureSetFromSubquery>>; using Subqueries = std::vector<std::shared_ptr<FutureSetFromSubquery>>;
Subqueries getSubqueries() const; Subqueries getSubqueries() const;

View File

@ -1,5 +1,7 @@
-- Tags: zookeeper -- Tags: zookeeper
DROP TABLE IF EXISTS join_inner_table SYNC;
CREATE TABLE join_inner_table CREATE TABLE join_inner_table
( (
id UUID, id UUID,
@ -77,6 +79,8 @@ ORDER BY is_initial_query, c, query;
---- Query with JOIN ---- Query with JOIN
DROP TABLE IF EXISTS join_outer_table SYNC;
CREATE TABLE join_outer_table CREATE TABLE join_outer_table
( (
id UUID, id UUID,