Merge pull request #33611 from azat/dist-current-database

This commit is contained in:
Vladimir C 2022-01-14 14:19:42 +03:00 committed by GitHub
commit 1254225061
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 84 additions and 18 deletions

View File

@ -19,6 +19,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/misc.h>
#include <set>
namespace DB
{
@ -33,11 +34,21 @@ public:
explicit AddDefaultDatabaseVisitor(
ContextPtr context_,
const String & database_name_,
bool only_replace_current_database_function_ = false)
bool only_replace_current_database_function_ = false,
bool only_replace_in_join_ = false)
: context(context_)
, database_name(database_name_)
, only_replace_current_database_function(only_replace_current_database_function_)
{}
, only_replace_in_join(only_replace_in_join_)
{
if (!context->isGlobalContext())
{
for (const auto & [table_name, _ /* storage */] : context->getExternalTables())
{
external_tables.insert(table_name);
}
}
}
void visitDDL(ASTPtr & ast) const
{
@ -81,8 +92,10 @@ private:
ContextPtr context;
const String database_name;
std::set<String> external_tables;
bool only_replace_current_database_function = false;
bool only_replace_in_join = false;
void visit(ASTSelectWithUnionQuery & select, ASTPtr &) const
{
@ -124,6 +137,9 @@ private:
void visit(ASTTablesInSelectQueryElement & tables_element, ASTPtr &) const
{
if (only_replace_in_join && !tables_element.table_join)
return;
if (tables_element.table_expression)
tryVisit<ASTTableExpression>(tables_element.table_expression);
}
@ -138,13 +154,17 @@ private:
void visit(const ASTTableIdentifier & identifier, ASTPtr & ast) const
{
if (!identifier.compound())
{
auto qualified_identifier = std::make_shared<ASTTableIdentifier>(database_name, identifier.name());
if (!identifier.alias.empty())
qualified_identifier->setAlias(identifier.alias);
ast = qualified_identifier;
}
/// Already has database.
if (identifier.compound())
return;
/// There is temporary table with such name, should not be rewritten.
if (external_tables.count(identifier.shortName()))
return;
auto qualified_identifier = std::make_shared<ASTTableIdentifier>(database_name, identifier.name());
if (!identifier.alias.empty())
qualified_identifier->setAlias(identifier.alias);
ast = qualified_identifier;
}
void visit(ASTSubquery & subquery, ASTPtr &) const

View File

@ -363,8 +363,7 @@ void RestoreQualifiedNamesMatcher::visit(ASTIdentifier & identifier, ASTPtr &, D
if (IdentifierSemantic::getMembership(identifier))
{
identifier.restoreTable(); // TODO(ilezhankin): should restore qualified name here - why exactly here?
if (data.rename)
data.changeTable(identifier);
data.changeTable(identifier);
}
}
}

View File

@ -67,7 +67,6 @@ struct RestoreQualifiedNamesMatcher
{
DatabaseAndTableWithAlias distributed_table;
DatabaseAndTableWithAlias remote_table;
bool rename = false;
void changeTable(ASTIdentifier & identifier) const;
};

View File

@ -46,6 +46,7 @@
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/JoinedTables.h>
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/AddDefaultDatabaseVisitor.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/Context.h>
#include <Interpreters/createBlockSelector.h>
@ -126,7 +127,12 @@ namespace
/// select query has database, table and table function names as AST pointers
/// Creates a copy of query, changes database, table and table function names.
ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, const std::string & table, ASTPtr table_function_ptr = nullptr)
ASTPtr rewriteSelectQuery(
ContextPtr context,
const ASTPtr & query,
const std::string & remote_database,
const std::string & remote_table,
ASTPtr table_function_ptr = nullptr)
{
auto modified_query_ast = query->clone();
@ -140,7 +146,7 @@ ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, co
if (table_function_ptr)
select_query.addTableFunction(table_function_ptr);
else
select_query.replaceDatabaseAndTable(database, table);
select_query.replaceDatabaseAndTable(remote_database, remote_table);
/// Restore long column names (cause our short names are ambiguous).
/// TODO: aliased table functions & CREATE TABLE AS table function cases
@ -148,12 +154,20 @@ ASTPtr rewriteSelectQuery(const ASTPtr & query, const std::string & database, co
{
RestoreQualifiedNamesVisitor::Data data;
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query->as<ASTSelectQuery &>(), 0));
data.remote_table.database = database;
data.remote_table.table = table;
data.rename = true;
data.remote_table.database = remote_database;
data.remote_table.table = remote_table;
RestoreQualifiedNamesVisitor(data).visit(modified_query_ast);
}
/// To make local JOIN works, default database should be added to table names.
/// But only for JOIN section, since the following should work using default_database:
/// - SELECT * FROM d WHERE value IN (SELECT l.value FROM l) ORDER BY value
/// (see 01487_distributed_in_not_default_db)
AddDefaultDatabaseVisitor visitor(context, context->getCurrentDatabase(),
/* only_replace_current_database_function_= */false,
/* only_replace_in_join_= */true);
visitor.visit(modified_query_ast);
return modified_query_ast;
}
@ -601,7 +615,8 @@ void StorageDistributed::read(
throw Exception(ErrorCodes::ILLEGAL_FINAL, "Final modifier is not allowed together with parallel reading from replicas feature");
const auto & modified_query_ast = rewriteSelectQuery(
query_info.query, remote_database, remote_table, remote_table_function_ptr);
local_context, query_info.query,
remote_database, remote_table, remote_table_function_ptr);
Block header =
InterpreterSelectQuery(query_info.query, local_context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();

View File

@ -0,0 +1,14 @@
-- { echoOn }
select * from dist_02175 l join local_02175 r using dummy;
0
0
select * from dist_02175 l global join local_02175 r using dummy;
0
0
-- explicit database for distributed table
select * from remote('127.1', currentDatabase(), dist_02175) l join local_02175 r using dummy;
0
0
select * from remote('127.1', currentDatabase(), dist_02175) l global join local_02175 r using dummy;
0
0

View File

@ -0,0 +1,19 @@
-- Tags: shard
drop table if exists local_02175;
drop table if exists dist_02175;
create table local_02175 engine=Memory() as select * from system.one;
create table dist_02175 as local_02175 engine=Distributed(test_cluster_two_shards, currentDatabase(), local_02175);
-- { echoOn }
select * from dist_02175 l join local_02175 r using dummy;
select * from dist_02175 l global join local_02175 r using dummy;
-- explicit database for distributed table
select * from remote('127.1', currentDatabase(), dist_02175) l join local_02175 r using dummy;
select * from remote('127.1', currentDatabase(), dist_02175) l global join local_02175 r using dummy;
-- { echoOff }
drop table local_02175;
drop table dist_02175;