Merge pull request #59239 from Algunenano/parallel_replicas_cte

Disable parallel replicas JOIN with CTE (not analyzer)
This commit is contained in:
Raúl Marín 2024-01-31 14:45:13 +01:00 committed by GitHub
commit dad6ea0930
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 85 additions and 82 deletions

View File

@ -1414,10 +1414,7 @@ FutureSetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool
set_key = right_in_operand->getTreeHash(/*ignore_aliases=*/ true);
if (auto set = data.prepared_sets->findSubquery(set_key))
{
set->markAsINSubquery();
return set;
}
FutureSetFromSubqueryPtr external_table_set;
@ -1464,7 +1461,7 @@ FutureSetPtr ActionsMatcher::makeSet(const ASTFunction & node, Data & data, bool
}
return data.prepared_sets->addFromSubquery(
set_key, std::move(source), nullptr, std::move(external_table_set), data.getContext()->getSettingsRef(), /*in_subquery=*/true);
set_key, std::move(source), nullptr, std::move(external_table_set), data.getContext()->getSettingsRef());
}
else
{

View File

@ -32,6 +32,7 @@ namespace ErrorCodes
{
extern const int WRONG_GLOBAL_SUBQUERY;
extern const int LOGICAL_ERROR;
extern const int SUPPORT_IS_DISABLED;
}
class GlobalSubqueriesMatcher
@ -200,23 +201,33 @@ public:
}
private:
static bool shouldBeExecutedGlobally(const Data & data)
{
const Settings & settings = data.getContext()->getSettingsRef();
/// For parallel replicas we reinterpret JOIN as GLOBAL JOIN as a way to broadcast data
const bool enable_parallel_processing_of_joins = data.getContext()->canUseParallelReplicasOnInitiator();
return settings.prefer_global_in_and_join || enable_parallel_processing_of_joins;
}
/// GLOBAL IN
static void visit(ASTFunction & func, ASTPtr &, Data & data)
{
if ((shouldBeExecutedGlobally(data)
const Settings & settings = data.getContext()->getSettingsRef();
const bool prefer_global = settings.prefer_global_in_and_join;
const bool enable_parallel_processing_of_joins = data.getContext()->canUseParallelReplicasOnInitiator();
if (((prefer_global || enable_parallel_processing_of_joins)
&& (func.name == "in" || func.name == "notIn" || func.name == "nullIn" || func.name == "notNullIn"))
|| func.name == "globalIn" || func.name == "globalNotIn" || func.name == "globalNullIn" || func.name == "globalNotNullIn")
{
ASTPtr & ast = func.arguments->children[1];
if (enable_parallel_processing_of_joins)
{
/// We don't enable parallel replicas for IN (subquery)
if (ast->as<ASTSubquery>())
{
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
{
LOG_DEBUG(getLogger("GlobalSubqueriesMatcher"), "IN with subquery is not supported with parallel replicas");
data.getContext()->getQueryContext()->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
return;
}
else if (settings.allow_experimental_parallel_reading_from_replicas == 2)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "IN with subquery is not supported with parallel replicas");
}
}
/// Literal or function can use regular IN.
/// NOTE: We don't support passing table functions to IN.
@ -241,9 +252,39 @@ private:
/// GLOBAL JOIN
static void visit(ASTTablesInSelectQueryElement & table_elem, ASTPtr &, Data & data)
{
const Settings & settings = data.getContext()->getSettingsRef();
const bool prefer_global = settings.prefer_global_in_and_join;
const bool enable_parallel_processing_of_joins = data.getContext()->canUseParallelReplicasOnInitiator();
if (table_elem.table_join
&& (table_elem.table_join->as<ASTTableJoin &>().locality == JoinLocality::Global || shouldBeExecutedGlobally(data)))
&& (table_elem.table_join->as<ASTTableJoin &>().locality == JoinLocality::Global || prefer_global
|| enable_parallel_processing_of_joins))
{
if (enable_parallel_processing_of_joins)
{
/// For parallel replicas we currently only support JOIN with subqueries
/// Note that tableA join tableB is previously converted into tableA JOIN (Select * FROM tableB) so that's ok
/// We don't support WITH cte as (subquery) Select table JOIN cte because we don't do conversion in AST
bool is_subquery = false;
if (const auto * ast_table_expr = table_elem.table_expression->as<ASTTableExpression>())
is_subquery = ast_table_expr->subquery->as<ASTSubquery>() != nullptr
&& ast_table_expr->subquery->as<ASTSubquery>()->cte_name.empty();
else if (table_elem.table_expression->as<ASTSubquery>())
is_subquery = true;
if (!is_subquery)
{
if (settings.allow_experimental_parallel_reading_from_replicas == 1)
{
LOG_DEBUG(getLogger("GlobalSubqueriesMatcher"), "JOIN with parallel replicas is only supported with subqueries");
data.getContext()->getQueryContext()->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
return;
}
else if (settings.allow_experimental_parallel_reading_from_replicas == 2)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "JOIN with parallel replicas is only supported with subqueries");
}
}
Names required_columns;
/// Fill required columns for GLOBAL JOIN.

View File

@ -864,38 +864,7 @@ bool InterpreterSelectQuery::adjustParallelReplicasAfterAnalysis()
ASTSelectQuery & query = getSelectQuery();
/// While only_analyze we don't know anything about parts, so any decision about how many parallel replicas to use would be wrong
if (!storage || !context->canUseParallelReplicasOnInitiator())
return false;
/// check if IN operator with subquery is present in the query
/// if so, disable parallel replicas
if (query_analyzer->getPreparedSets()->hasSubqueries())
{
bool in_subqueries = false;
const auto & sets = query_analyzer->getPreparedSets();
const auto subqueries = sets->getSubqueries();
for (const auto & subquery : subqueries)
{
if (subquery->isINSubquery())
{
in_subqueries = true;
break;
}
}
if (in_subqueries)
{
if (settings.allow_experimental_parallel_reading_from_replicas == 2)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "IN with subquery is not supported with parallel replicas");
context->setSetting("allow_experimental_parallel_reading_from_replicas", Field(0));
context->setSetting("max_parallel_replicas", UInt64{0});
LOG_DEBUG(log, "Disabling parallel replicas to execute a query with IN with subquery");
return true;
}
}
if (options.only_analyze)
if (!storage || options.only_analyze || !context->canUseParallelReplicasOnInitiator())
return false;
if (getTrivialCount(0).has_value())

View File

@ -98,12 +98,8 @@ FutureSetFromSubquery::FutureSetFromSubquery(
std::unique_ptr<QueryPlan> source_,
StoragePtr external_table_,
std::shared_ptr<FutureSetFromSubquery> external_table_set_,
const Settings & settings,
bool in_subquery_)
: external_table(std::move(external_table_))
, external_table_set(std::move(external_table_set_))
, source(std::move(source_))
, in_subquery(in_subquery_)
const Settings & settings)
: external_table(std::move(external_table_)), external_table_set(std::move(external_table_set_)), source(std::move(source_))
{
set_and_key = std::make_shared<SetAndKey>();
set_and_key->key = std::move(key);
@ -281,16 +277,10 @@ FutureSetFromSubqueryPtr PreparedSets::addFromSubquery(
std::unique_ptr<QueryPlan> source,
StoragePtr external_table,
FutureSetFromSubqueryPtr external_table_set,
const Settings & settings,
bool in_subquery)
const Settings & settings)
{
auto from_subquery = std::make_shared<FutureSetFromSubquery>(
toString(key, {}),
std::move(source),
std::move(external_table),
std::move(external_table_set),
settings,
in_subquery);
toString(key, {}), std::move(source), std::move(external_table), std::move(external_table_set), settings);
auto [it, inserted] = sets_from_subqueries.emplace(key, from_subquery);
@ -340,15 +330,6 @@ std::shared_ptr<FutureSetFromSubquery> PreparedSets::findSubquery(const Hash & k
return it->second;
}
void PreparedSets::markAsINSubquery(const Hash & key)
{
auto it = sets_from_subqueries.find(key);
if (it == sets_from_subqueries.end())
return;
it->second->markAsINSubquery();
}
std::shared_ptr<FutureSetFromStorage> PreparedSets::findStorage(const Hash & key) const
{
auto it = sets_from_storage.find(key);

View File

@ -101,8 +101,7 @@ public:
std::unique_ptr<QueryPlan> source_,
StoragePtr external_table_,
std::shared_ptr<FutureSetFromSubquery> external_table_set_,
const Settings & settings,
bool in_subquery_);
const Settings & settings);
FutureSetFromSubquery(
String key,
@ -118,8 +117,6 @@ public:
QueryTreeNodePtr detachQueryTree() { return std::move(query_tree); }
void setQueryPlan(std::unique_ptr<QueryPlan> source_);
void markAsINSubquery() { in_subquery = true; }
bool isINSubquery() const { return in_subquery; }
private:
SetAndKeyPtr set_and_key;
@ -128,11 +125,6 @@ private:
std::unique_ptr<QueryPlan> source;
QueryTreeNodePtr query_tree;
bool in_subquery = false; // subquery used in IN operator
// the flag can be removed after enabling new analyzer and removing interpreter
// or after enabling support IN operator with subqueries in parallel replicas
// Note: it's necessary with interpreter since prepared sets used also for GLOBAL JOINs,
// with new analyzer it's not a case
};
using FutureSetFromSubqueryPtr = std::shared_ptr<FutureSetFromSubquery>;
@ -160,8 +152,7 @@ public:
std::unique_ptr<QueryPlan> source,
StoragePtr external_table,
FutureSetFromSubqueryPtr external_table_set,
const Settings & settings,
bool in_subquery = false);
const Settings & settings);
FutureSetFromSubqueryPtr addFromSubquery(
const Hash & key,
@ -171,7 +162,6 @@ public:
FutureSetFromTuplePtr findTuple(const Hash & key, const DataTypes & types) const;
FutureSetFromStoragePtr findStorage(const Hash & key) const;
FutureSetFromSubqueryPtr findSubquery(const Hash & key) const;
void markAsINSubquery(const Hash & key);
using Subqueries = std::vector<FutureSetFromSubqueryPtr>;
Subqueries getSubqueries() const;

View File

@ -0,0 +1,2 @@
990000
990000

View File

@ -0,0 +1,23 @@
DROP TABLE IF EXISTS pr_1;
DROP TABLE IF EXISTS pr_2;
CREATE TABLE pr_1 (`a` UInt32) ENGINE = MergeTree ORDER BY a PARTITION BY a % 10 AS
SELECT 10 * intDiv(number, 10) + 1 FROM numbers(1_000_000);
CREATE TABLE pr_2 (`a` UInt32) ENGINE = MergeTree ORDER BY a AS
SELECT * FROM numbers(1_000_000);
WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 10000)
SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a;
WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 10000)
SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a
SETTINGS allow_experimental_parallel_reading_from_replicas = 1, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_two_shards', max_parallel_replicas = 3;
-- Testing that it is disabled for allow_experimental_analyzer=0. With analyzer it will be supported (with correct result)
WITH filtered_groups AS (SELECT a FROM pr_1 WHERE a >= 10000)
SELECT count() FROM pr_2 INNER JOIN filtered_groups ON pr_2.a = filtered_groups.a
SETTINGS allow_experimental_analyzer = 0, allow_experimental_parallel_reading_from_replicas = 2, parallel_replicas_for_non_replicated_merge_tree = 1, cluster_for_parallel_replicas = 'test_cluster_two_shards', max_parallel_replicas = 3; -- { serverError SUPPORT_IS_DISABLED }
DROP TABLE IF EXISTS pr_1;
DROP TABLE IF EXISTS pr_2;