Fix currentDatabase function cannot be used in ON CLUSTER ddl query. (#14211)

This commit is contained in:
Winter Zhang 2020-09-09 17:58:59 +08:00 committed by GitHub
parent 32135d96f9
commit 3d1d64ec60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 93 additions and 8 deletions

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTQueryWithTableAndOutput.h> #include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/ASTRenameQuery.h> #include <Parsers/ASTRenameQuery.h>
#include <Parsers/ASTIdentifier.h> #include <Parsers/ASTIdentifier.h>
@ -23,8 +24,9 @@ namespace DB
class AddDefaultDatabaseVisitor class AddDefaultDatabaseVisitor
{ {
public: public:
AddDefaultDatabaseVisitor(const String & database_name_, std::ostream * ostr_ = nullptr) AddDefaultDatabaseVisitor(const String & database_name_, bool only_replace_current_database_function_ = false, std::ostream * ostr_ = nullptr)
: database_name(database_name_), : database_name(database_name_),
only_replace_current_database_function(only_replace_current_database_function_),
visit_depth(0), visit_depth(0),
ostr(ostr_) ostr(ostr_)
{} {}
@ -34,7 +36,8 @@ public:
visitDDLChildren(ast); visitDDLChildren(ast);
if (!tryVisitDynamicCast<ASTQueryWithTableAndOutput>(ast) && if (!tryVisitDynamicCast<ASTQueryWithTableAndOutput>(ast) &&
!tryVisitDynamicCast<ASTRenameQuery>(ast)) !tryVisitDynamicCast<ASTRenameQuery>(ast) &&
!tryVisitDynamicCast<ASTFunction>(ast))
{} {}
} }
@ -60,6 +63,7 @@ public:
private: private:
const String database_name; const String database_name;
bool only_replace_current_database_function = false;
mutable size_t visit_depth; mutable size_t visit_depth;
std::ostream * ostr; std::ostream * ostr;
@ -164,12 +168,18 @@ private:
void visitDDL(ASTQueryWithTableAndOutput & node, ASTPtr &) const void visitDDL(ASTQueryWithTableAndOutput & node, ASTPtr &) const
{ {
if (only_replace_current_database_function)
return;
if (node.database.empty()) if (node.database.empty())
node.database = database_name; node.database = database_name;
} }
void visitDDL(ASTRenameQuery & node, ASTPtr &) const void visitDDL(ASTRenameQuery & node, ASTPtr &) const
{ {
if (only_replace_current_database_function)
return;
for (ASTRenameQuery::Element & elem : node.elements) for (ASTRenameQuery::Element & elem : node.elements)
{ {
if (elem.from.database.empty()) if (elem.from.database.empty())
@ -179,6 +189,15 @@ private:
} }
} }
void visitDDL(ASTFunction & function, ASTPtr & node) const
{
if (function.name == "currentDatabase")
{
node = std::make_shared<ASTLiteral>(database_name);
return;
}
}
void visitDDLChildren(ASTPtr & ast) const void visitDDLChildren(ASTPtr & ast) const
{ {
for (auto & child : ast->children) for (auto & child : ast->children)

View File

@ -1434,9 +1434,11 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont
[](const AccessRightsElement & elem) { return elem.isEmptyDatabase(); }) [](const AccessRightsElement & elem) { return elem.isEmptyDatabase(); })
!= query_requires_access.end()); != query_requires_access.end());
bool use_local_default_database = false;
const String & current_database = context.getCurrentDatabase();
if (need_replace_current_database) if (need_replace_current_database)
{ {
bool use_local_default_database = false;
Strings shard_default_databases; Strings shard_default_databases;
for (const auto & shard : shards) for (const auto & shard : shards)
{ {
@ -1457,10 +1459,6 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont
if (use_local_default_database) if (use_local_default_database)
{ {
const String & current_database = context.getCurrentDatabase();
AddDefaultDatabaseVisitor visitor(current_database);
visitor.visitDDL(query_ptr);
query_requires_access.replaceEmptyDatabase(current_database); query_requires_access.replaceEmptyDatabase(current_database);
} }
else else
@ -1481,6 +1479,9 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont
} }
} }
AddDefaultDatabaseVisitor visitor(current_database, !use_local_default_database);
visitor.visitDDL(query_ptr);
/// Check access rights, assume that all servers have the same users config /// Check access rights, assume that all servers have the same users config
if (query_requires_grant_option) if (query_requires_grant_option)
context.getAccess()->checkGrantOption(query_requires_access); context.getAccess()->checkGrantOption(query_requires_access);

View File

@ -48,7 +48,7 @@ StorageID extractDependentTableFromSelectQuery(ASTSelectQuery & query, const Con
{ {
if (add_default_db) if (add_default_db)
{ {
AddDefaultDatabaseVisitor visitor(context.getCurrentDatabase(), nullptr); AddDefaultDatabaseVisitor visitor(context.getCurrentDatabase(), false, nullptr);
visitor.visit(query); visitor.visit(query);
} }

View File

@ -0,0 +1,28 @@
<yandex>
<remote_servers>
<cluster>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch1</host>
<port>9000</port>
</replica>
<replica>
<host>ch2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>true</internal_replication>
<replica>
<host>ch3</host>
<port>9000</port>
</replica>
<replica>
<host>ch4</host>
<port>9000</port>
</replica>
</shard>
</cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,5 @@
<yandex>
<distributed_ddl>
<path>/clickhouse/task_queue/ddl</path>
</distributed_ddl>
</yandex>

View File

@ -0,0 +1,32 @@
import time
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
ch1 = cluster.add_instance('ch1', config_dir="configs", with_zookeeper=True)
ch2 = cluster.add_instance('ch2', config_dir="configs", with_zookeeper=True)
ch3 = cluster.add_instance('ch3', config_dir="configs", with_zookeeper=True)
ch4 = cluster.add_instance('ch4', config_dir="configs", with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
ch1.query("CREATE DATABASE test_default_database ON CLUSTER 'cluster';")
yield cluster
finally:
cluster.shutdown()
def test_default_database_on_cluster(started_cluster):
ch1.query(database='test_default_database', sql="CREATE TABLE test_local_table ON CLUSTER 'cluster' (column UInt8) ENGINE = Memory;")
for node in [ch1, ch2, ch3, ch4]:
assert node.query("SHOW TABLES FROM test_default_database FORMAT TSV") == "test_local_table\n"
ch1.query(database='test_default_database', sql="CREATE TABLE test_distributed_table ON CLUSTER 'cluster' (column UInt8) ENGINE = Distributed(cluster, currentDatabase(), 'test_local_table');")
for node in [ch1, ch2, ch3, ch4]:
assert node.query("SHOW TABLES FROM test_default_database FORMAT TSV") == "test_distributed_table\ntest_local_table\n"
assert node.query("SHOW CREATE TABLE test_default_database.test_distributed_table FORMAT TSV") == "CREATE TABLE test_default_database.test_distributed_table\\n(\\n `column` UInt8\\n)\\nENGINE = Distributed(\\'cluster\\', \\'test_default_database\\', \\'test_local_table\\')\n"