mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
partially fix distributed_product_mode=local
This commit is contained in:
parent
d083976b88
commit
4707200f83
@ -2,6 +2,7 @@
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/DatabaseAndTableWithAlias.h>
|
||||
#include <Interpreters/IdentifierSemantic.h>
|
||||
#include <Interpreters/InDepthNodeVisitor.h>
|
||||
#include <Storages/StorageDistributed.h>
|
||||
#include <Parsers/ASTIdentifier.h>
|
||||
#include <Parsers/ASTSelectQuery.h>
|
||||
@ -23,84 +24,169 @@ namespace ErrorCodes
|
||||
namespace
|
||||
{
|
||||
|
||||
/** Call a function for each non-GLOBAL subquery in IN or JOIN.
|
||||
* Pass to function: AST node with subquery, and AST node with corresponding IN function or JOIN.
|
||||
* Consider only first-level subqueries (do not go recursively into subqueries).
|
||||
*/
|
||||
template <typename F>
|
||||
void forEachNonGlobalSubquery(IAST * node, F && f)
|
||||
{
|
||||
if (auto * function = node->as<ASTFunction>())
|
||||
{
|
||||
if (function->name == "in" || function->name == "notIn")
|
||||
{
|
||||
f(function->arguments->children.at(1).get(), function, nullptr);
|
||||
return;
|
||||
}
|
||||
|
||||
/// Pass into other functions, as subquery could be in aggregate or in lambda functions.
|
||||
}
|
||||
else if (const auto * join = node->as<ASTTablesInSelectQueryElement>())
|
||||
{
|
||||
if (join->table_join && join->table_expression)
|
||||
{
|
||||
auto & table_join = join->table_join->as<ASTTableJoin &>();
|
||||
if (table_join.locality != ASTTableJoin::Locality::Global)
|
||||
{
|
||||
auto & subquery = join->table_expression->as<ASTTableExpression>()->subquery;
|
||||
if (subquery)
|
||||
f(subquery.get(), nullptr, &table_join);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/// Pass into other kind of JOINs, as subquery could be in ARRAY JOIN.
|
||||
}
|
||||
|
||||
/// Descent into all children, but not into subqueries of other kind (scalar subqueries), that are irrelevant to us.
|
||||
for (auto & child : node->children)
|
||||
if (!child->as<ASTSelectQuery>())
|
||||
forEachNonGlobalSubquery(child.get(), f);
|
||||
}
|
||||
|
||||
|
||||
/** Find all (ordinary) tables in any nesting level in AST.
|
||||
*/
|
||||
template <typename F>
|
||||
void forEachTable(IAST * node, F && f)
|
||||
{
|
||||
if (auto * table_expression = node->as<ASTTableExpression>())
|
||||
{
|
||||
auto & database_and_table = table_expression->database_and_table_name;
|
||||
if (database_and_table)
|
||||
f(database_and_table);
|
||||
}
|
||||
|
||||
for (auto & child : node->children)
|
||||
forEachTable(child.get(), f);
|
||||
}
|
||||
|
||||
|
||||
/// Look up the storage for the table named by the `database_and_table` AST node.
/// The node is decomposed into database and table names, which are then passed to Context::tryGetTable.
/// NOTE(review): callers in this file test the result for null, so a missing table presumably yields nullptr.
StoragePtr tryGetTable(const ASTPtr & database_and_table, const Context & context)
{
    const DatabaseAndTableWithAlias names(database_and_table);
    return context.tryGetTable(names.database, names.table);
}
|
||||
|
||||
using CheckShardsAndTables = InJoinSubqueriesPreprocessor::CheckShardsAndTables;
|
||||
|
||||
struct NonGlobalTableData
|
||||
{
|
||||
using TypeToVisit = ASTTableExpression;
|
||||
|
||||
const CheckShardsAndTables & checker;
|
||||
const Context & context;
|
||||
ASTFunction * function = nullptr;
|
||||
ASTTableJoin * table_join = nullptr;
|
||||
|
||||
void visit(ASTTableExpression & node, ASTPtr &)
|
||||
{
|
||||
ASTPtr & database_and_table = node.database_and_table_name;
|
||||
if (database_and_table)
|
||||
renameIfNeeded(database_and_table);
|
||||
}
|
||||
|
||||
private:
|
||||
void renameIfNeeded(ASTPtr & database_and_table)
|
||||
{
|
||||
const SettingDistributedProductMode distributed_product_mode = context.getSettingsRef().distributed_product_mode;
|
||||
|
||||
StoragePtr storage = tryGetTable(database_and_table, context);
|
||||
if (!storage || !checker.hasAtLeastTwoShards(*storage))
|
||||
return;
|
||||
|
||||
if (distributed_product_mode == DistributedProductMode::DENY)
|
||||
{
|
||||
throw Exception("Double-distributed IN/JOIN subqueries is denied (distributed_product_mode = 'deny')."
|
||||
" You may rewrite query to use local tables in subqueries, or use GLOBAL keyword, or set distributed_product_mode to suitable value.",
|
||||
ErrorCodes::DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED);
|
||||
}
|
||||
else if (distributed_product_mode == DistributedProductMode::GLOBAL)
|
||||
{
|
||||
if (function)
|
||||
{
|
||||
auto * concrete = function->as<ASTFunction>();
|
||||
|
||||
if (concrete->name == "in")
|
||||
concrete->name = "globalIn";
|
||||
else if (concrete->name == "notIn")
|
||||
concrete->name = "globalNotIn";
|
||||
else if (concrete->name == "globalIn" || concrete->name == "globalNotIn")
|
||||
{
|
||||
/// Already processed.
|
||||
}
|
||||
else
|
||||
throw Exception("Logical error: unexpected function name " + concrete->name, ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
else if (table_join)
|
||||
table_join->locality = ASTTableJoin::Locality::Global;
|
||||
else
|
||||
throw Exception("Logical error: unexpected AST node", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
else if (distributed_product_mode == DistributedProductMode::LOCAL)
|
||||
{
|
||||
/// Convert distributed table to corresponding remote table.
|
||||
|
||||
std::string database;
|
||||
std::string table;
|
||||
std::tie(database, table) = checker.getRemoteDatabaseAndTableName(*storage);
|
||||
|
||||
String alias = database_and_table->tryGetAlias();
|
||||
if (alias.empty())
|
||||
throw Exception("Distributed table should have an alias when distributed_product_mode set to local.",
|
||||
ErrorCodes::DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED);
|
||||
|
||||
database_and_table = createTableIdentifier(database, table);
|
||||
database_and_table->setAlias(alias);
|
||||
}
|
||||
else
|
||||
throw Exception("InJoinSubqueriesPreprocessor: unexpected value of 'distributed_product_mode' setting",
|
||||
ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
};
|
||||
|
||||
using NonGlobalTableMatcher = OneTypeMatcher<NonGlobalTableData>;
|
||||
using NonGlobalTableVisitor = InDepthNodeVisitor<NonGlobalTableMatcher, true>;
|
||||
|
||||
|
||||
class NonGlobalSubqueryMatcher
|
||||
{
|
||||
public:
|
||||
struct Data
|
||||
{
|
||||
const CheckShardsAndTables & checker;
|
||||
const Context & context;
|
||||
};
|
||||
|
||||
static void visit(ASTPtr & node, Data & data)
|
||||
{
|
||||
if (auto * function = node->as<ASTFunction>())
|
||||
visit(*function, node, data);
|
||||
if (const auto * tables = node->as<ASTTablesInSelectQueryElement>())
|
||||
visit(*tables, node, data);
|
||||
}
|
||||
|
||||
static bool needChildVisit(ASTPtr & node, const ASTPtr & child)
|
||||
{
|
||||
if (auto * function = node->as<ASTFunction>())
|
||||
if (function->name == "in" || function->name == "notIn")
|
||||
return false; /// Processed, process others
|
||||
|
||||
if (const auto * t = node->as<ASTTablesInSelectQueryElement>())
|
||||
if (t->table_join && t->table_expression)
|
||||
return false; /// Processed, process others
|
||||
|
||||
/// Descent into all children, but not into subqueries of other kind (scalar subqueries), that are irrelevant to us.
|
||||
if (child->as<ASTSelectQuery>())
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
private:
|
||||
static void visit(ASTFunction & node, ASTPtr &, Data & data)
|
||||
{
|
||||
if (node.name == "in" || node.name == "notIn")
|
||||
{
|
||||
auto & subquery = node.arguments->children.at(1);
|
||||
NonGlobalTableVisitor::Data table_data{data.checker, data.context, &node, nullptr};
|
||||
NonGlobalTableVisitor(table_data).visit(subquery);
|
||||
}
|
||||
}
|
||||
|
||||
static void visit(const ASTTablesInSelectQueryElement & node, ASTPtr &, Data & data)
|
||||
{
|
||||
if (!node.table_join || !node.table_expression)
|
||||
return;
|
||||
|
||||
ASTTableJoin * table_join = node.table_join->as<ASTTableJoin>();
|
||||
if (table_join->locality != ASTTableJoin::Locality::Global)
|
||||
{
|
||||
if (auto & subquery = node.table_expression->as<ASTTableExpression>()->subquery)
|
||||
{
|
||||
NonGlobalTableVisitor::Data table_data{data.checker, data.context, nullptr, table_join};
|
||||
NonGlobalTableVisitor(table_data).visit(subquery);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
using NonGlobalSubqueryVisitor = InDepthNodeVisitor<NonGlobalSubqueryMatcher, true>;
|
||||
|
||||
}
|
||||
|
||||
|
||||
void InJoinSubqueriesPreprocessor::process(ASTSelectQuery * query) const
|
||||
void InJoinSubqueriesPreprocessor::visit(ASTPtr & ast) const
|
||||
{
|
||||
if (!query)
|
||||
if (!ast)
|
||||
return;
|
||||
|
||||
const SettingDistributedProductMode distributed_product_mode = context.getSettingsRef().distributed_product_mode;
|
||||
|
||||
if (distributed_product_mode == DistributedProductMode::ALLOW)
|
||||
ASTSelectQuery * query = ast->as<ASTSelectQuery>();
|
||||
if (!query || !query->tables())
|
||||
return;
|
||||
|
||||
if (!query->tables())
|
||||
if (context.getSettingsRef().distributed_product_mode == DistributedProductMode::ALLOW)
|
||||
return;
|
||||
|
||||
const auto & tables_in_select_query = query->tables()->as<ASTTablesInSelectQuery &>();
|
||||
@ -120,65 +206,16 @@ void InJoinSubqueriesPreprocessor::process(ASTSelectQuery * query) const
|
||||
/// If not really distributed table, skip it.
|
||||
{
|
||||
StoragePtr storage = tryGetTable(table_expression->database_and_table_name, context);
|
||||
if (!storage || !hasAtLeastTwoShards(*storage))
|
||||
if (!storage || !checker->hasAtLeastTwoShards(*storage))
|
||||
return;
|
||||
}
|
||||
|
||||
forEachNonGlobalSubquery(query, [&] (IAST * subquery, IAST * function, IAST * table_join)
|
||||
{
|
||||
forEachTable(subquery, [&] (ASTPtr & database_and_table)
|
||||
{
|
||||
StoragePtr storage = tryGetTable(database_and_table, context);
|
||||
|
||||
if (!storage || !hasAtLeastTwoShards(*storage))
|
||||
return;
|
||||
|
||||
if (distributed_product_mode == DistributedProductMode::DENY)
|
||||
{
|
||||
throw Exception("Double-distributed IN/JOIN subqueries is denied (distributed_product_mode = 'deny')."
|
||||
" You may rewrite query to use local tables in subqueries, or use GLOBAL keyword, or set distributed_product_mode to suitable value.",
|
||||
ErrorCodes::DISTRIBUTED_IN_JOIN_SUBQUERY_DENIED);
|
||||
}
|
||||
else if (distributed_product_mode == DistributedProductMode::GLOBAL)
|
||||
{
|
||||
if (function)
|
||||
{
|
||||
auto * concrete = function->as<ASTFunction>();
|
||||
|
||||
if (concrete->name == "in")
|
||||
concrete->name = "globalIn";
|
||||
else if (concrete->name == "notIn")
|
||||
concrete->name = "globalNotIn";
|
||||
else if (concrete->name == "globalIn" || concrete->name == "globalNotIn")
|
||||
{
|
||||
/// Already processed.
|
||||
}
|
||||
else
|
||||
throw Exception("Logical error: unexpected function name " + concrete->name, ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
else if (table_join)
|
||||
table_join->as<ASTTableJoin &>().locality = ASTTableJoin::Locality::Global;
|
||||
else
|
||||
throw Exception("Logical error: unexpected AST node", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
else if (distributed_product_mode == DistributedProductMode::LOCAL)
|
||||
{
|
||||
/// Convert distributed table to corresponding remote table.
|
||||
|
||||
std::string database;
|
||||
std::string table;
|
||||
std::tie(database, table) = getRemoteDatabaseAndTableName(*storage);
|
||||
|
||||
database_and_table = createTableIdentifier(database, table);
|
||||
}
|
||||
else
|
||||
throw Exception("InJoinSubqueriesPreprocessor: unexpected value of 'distributed_product_mode' setting", ErrorCodes::LOGICAL_ERROR);
|
||||
});
|
||||
});
|
||||
NonGlobalSubqueryVisitor::Data visitor_data{*checker, context};
|
||||
NonGlobalSubqueryVisitor(visitor_data).visit(ast);
|
||||
}
|
||||
|
||||
|
||||
bool InJoinSubqueriesPreprocessor::hasAtLeastTwoShards(const IStorage & table) const
|
||||
bool InJoinSubqueriesPreprocessor::CheckShardsAndTables::hasAtLeastTwoShards(const IStorage & table) const
|
||||
{
|
||||
const StorageDistributed * distributed = dynamic_cast<const StorageDistributed *>(&table);
|
||||
if (!distributed)
|
||||
@ -189,7 +226,7 @@ bool InJoinSubqueriesPreprocessor::hasAtLeastTwoShards(const IStorage & table) c
|
||||
|
||||
|
||||
std::pair<std::string, std::string>
|
||||
InJoinSubqueriesPreprocessor::getRemoteDatabaseAndTableName(const IStorage & table) const
|
||||
InJoinSubqueriesPreprocessor::CheckShardsAndTables::getRemoteDatabaseAndTableName(const IStorage & table) const
|
||||
{
|
||||
const StorageDistributed & distributed = dynamic_cast<const StorageDistributed &>(table);
|
||||
return { distributed.getRemoteDatabaseName(), distributed.getRemoteTableName() };
|
||||
|
@ -1,14 +1,15 @@
|
||||
#pragma once
|
||||
|
||||
#include <string>
|
||||
#include <memory>
|
||||
#include <Core/Types.h>
|
||||
#include <Core/SettingsCommon.h>
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class IAST;
|
||||
class IStorage;
|
||||
class ASTSelectQuery;
|
||||
class Context;
|
||||
@ -34,16 +35,26 @@ class Context;
|
||||
class InJoinSubqueriesPreprocessor
|
||||
{
|
||||
public:
|
||||
InJoinSubqueriesPreprocessor(const Context & context) : context(context) {}
|
||||
void process(ASTSelectQuery * query) const;
|
||||
struct CheckShardsAndTables
|
||||
{
|
||||
using Ptr = std::unique_ptr<CheckShardsAndTables>;
|
||||
|
||||
/// These methods could be overridden for the need of the unit test.
|
||||
virtual bool hasAtLeastTwoShards(const IStorage & table) const;
|
||||
virtual std::pair<std::string, std::string> getRemoteDatabaseAndTableName(const IStorage & table) const;
|
||||
virtual ~InJoinSubqueriesPreprocessor() {}
|
||||
/// These methods could be overridden for the need of the unit test.
|
||||
virtual bool hasAtLeastTwoShards(const IStorage & table) const;
|
||||
virtual std::pair<std::string, std::string> getRemoteDatabaseAndTableName(const IStorage & table) const;
|
||||
virtual ~CheckShardsAndTables() {}
|
||||
};
|
||||
|
||||
InJoinSubqueriesPreprocessor(const Context & context, CheckShardsAndTables::Ptr _checker = std::make_unique<CheckShardsAndTables>())
|
||||
: context(context)
|
||||
, checker(std::move(_checker))
|
||||
{}
|
||||
|
||||
void visit(ASTPtr & query) const;
|
||||
|
||||
private:
|
||||
const Context & context;
|
||||
CheckShardsAndTables::Ptr checker;
|
||||
};
|
||||
|
||||
|
||||
|
@ -710,9 +710,8 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
|
||||
(storage ? storage->getColumns().getOrdinary().getNames() : source_columns_list), source_columns_set,
|
||||
result.analyzed_join.columns_from_joined_table);
|
||||
|
||||
/// Depending on the user's profile, check for the execution rights
|
||||
/// distributed subqueries inside the IN or JOIN sections and process these subqueries.
|
||||
InJoinSubqueriesPreprocessor(context).process(select_query);
|
||||
/// Rewrite IN and/or JOIN for distributed tables according to distributed_product_mode setting.
|
||||
InJoinSubqueriesPreprocessor(context).visit(query);
|
||||
|
||||
/// Optimizes logical expressions.
|
||||
LogicalExpressionsOptimizer(select_query, settings.optimize_min_equality_disjunction_chain_length.value).perform();
|
||||
|
@ -52,11 +52,9 @@ private:
|
||||
};
|
||||
|
||||
|
||||
class InJoinSubqueriesPreprocessorMock : public DB::InJoinSubqueriesPreprocessor
|
||||
class CheckShardsAndTablesMock : public DB::InJoinSubqueriesPreprocessor::CheckShardsAndTables
|
||||
{
|
||||
public:
|
||||
using DB::InJoinSubqueriesPreprocessor::InJoinSubqueriesPreprocessor;
|
||||
|
||||
bool hasAtLeastTwoShards(const DB::IStorage & table) const override
|
||||
{
|
||||
if (!table.isRemote())
|
||||
@ -1181,13 +1179,11 @@ TestResult check(const TestEntry & entry)
|
||||
if (!parse(ast_input, entry.input))
|
||||
return TestResult(false, "parse error");
|
||||
|
||||
auto select_query = typeid_cast<DB::ASTSelectQuery *>(&*ast_input);
|
||||
|
||||
bool success = true;
|
||||
|
||||
try
|
||||
{
|
||||
InJoinSubqueriesPreprocessorMock(context).process(select_query);
|
||||
DB::InJoinSubqueriesPreprocessor(context, std::make_unique<CheckShardsAndTablesMock>()).visit(ast_input);
|
||||
}
|
||||
catch (const DB::Exception & ex)
|
||||
{
|
||||
|
@ -0,0 +1 @@
|
||||
1
|
49
dbms/tests/queries/0_stateless/00858_issue_4756.sql
Normal file
49
dbms/tests/queries/0_stateless/00858_issue_4756.sql
Normal file
@ -0,0 +1,49 @@
|
||||
set distributed_product_mode = 'local';
|
||||
|
||||
use test;
|
||||
drop table if exists shard1;
|
||||
drop table if exists shard2;
|
||||
drop table if exists distr1;
|
||||
drop table if exists distr2;
|
||||
|
||||
create table test.shard1 (id Int32) engine = MergeTree order by cityHash64(id);
|
||||
create table test.shard2 (id Int32) engine = MergeTree order by cityHash64(id);
|
||||
|
||||
create table test.distr1 as shard1 engine Distributed (test_cluster_two_shards_localhost, test, shard1, cityHash64(id));
|
||||
create table test.distr2 as shard2 engine Distributed (test_cluster_two_shards_localhost, test, shard2, cityHash64(id));
|
||||
|
||||
insert into shard1 (id) values (0), (1);
|
||||
insert into shard2 (id) values (1), (2);
|
||||
|
||||
select distinct(test.distr1.id) from test.distr1
|
||||
where test.distr1.id in
|
||||
(
|
||||
select test.distr1.id
|
||||
from test.distr1
|
||||
join test.distr2 on test.distr1.id = test.distr2.id
|
||||
where test.distr1.id > 0
|
||||
); -- { serverError 288 }
|
||||
|
||||
select distinct(d0.id) from distr1 d0
|
||||
where d0.id in
|
||||
(
|
||||
select d1.id
|
||||
from distr1 as d1
|
||||
join distr2 as d2 on d1.id = d2.id
|
||||
where d1.id > 0
|
||||
);
|
||||
|
||||
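-- TODO: enable this case: the tables are given aliases (d1, d2), but the columns are still referenced through the full table names.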
|
||||
--select distinct(test.distr1.id) from test.distr1
|
||||
--where test.distr1.id in
|
||||
--(
|
||||
-- select test.distr1.id
|
||||
-- from test.distr1 as d1
|
||||
-- join test.distr2 as d2 on test.distr1.id = test.distr2.id
|
||||
-- where test.distr1.id > 0
|
||||
--);
|
||||
|
||||
drop table shard1;
|
||||
drop table shard2;
|
||||
drop table distr1;
|
||||
drop table distr2;
|
Loading…
Reference in New Issue
Block a user