From 400ad557549d82be1146beca3f7bedf353653fb9 Mon Sep 17 00:00:00 2001 From: Vitaliy Lyudvichenko Date: Tue, 17 Apr 2018 22:33:58 +0300 Subject: [PATCH] Support allow_databases in distributed DDL. [#CLICKHOUSE-3] Resolves #2189 --- dbms/src/Interpreters/Context.cpp | 30 ++++++++++------- dbms/src/Interpreters/Context.h | 3 +- dbms/src/Interpreters/DDLWorker.cpp | 33 +++++++++++++++++-- dbms/src/Interpreters/DDLWorker.h | 3 +- .../Interpreters/InterpreterAlterQuery.cpp | 2 +- .../Interpreters/InterpreterCreateQuery.cpp | 10 ++++-- .../src/Interpreters/InterpreterDropQuery.cpp | 2 +- .../Interpreters/InterpreterRenameQuery.cpp | 11 ++++++- .../configs/users.d/restricted_user.xml | 16 +++++++++ .../integration/test_distributed_ddl/test.py | 18 ++++++++++ 10 files changed, 107 insertions(+), 21 deletions(-) create mode 100644 dbms/tests/integration/test_distributed_ddl/configs/users.d/restricted_user.xml diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index b0bf8f6f441..2e10acf4c73 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -588,6 +588,12 @@ QuotaForIntervals & Context::getQuota() } void Context::checkDatabaseAccessRights(const std::string & database_name) const +{ + auto lock = getLock(); + checkDatabaseAccessRightsImpl(database_name); +} + +void Context::checkDatabaseAccessRightsImpl(const std::string & database_name) const { if (client_info.current_user.empty() || (database_name == "system")) { @@ -602,8 +608,8 @@ void Context::checkDatabaseAccessRights(const std::string & database_name) const void Context::addDependency(const DatabaseAndTableName & from, const DatabaseAndTableName & where) { auto lock = getLock(); - checkDatabaseAccessRights(from.first); - checkDatabaseAccessRights(where.first); + checkDatabaseAccessRightsImpl(from.first); + checkDatabaseAccessRightsImpl(where.first); shared->view_dependencies[from].insert(where); // Notify table of dependencies change @@ -615,8 +621,8 @@ void Context::addDependency(const DatabaseAndTableName & from, const DatabaseAnd void Context::removeDependency(const DatabaseAndTableName & from, const DatabaseAndTableName & where) { auto lock = getLock(); - checkDatabaseAccessRights(from.first); - checkDatabaseAccessRights(where.first); + checkDatabaseAccessRightsImpl(from.first); + checkDatabaseAccessRightsImpl(where.first); shared->view_dependencies[from].erase(where); // Notify table of dependencies change @@ -637,7 +643,7 @@ Dependencies Context::getDependencies(const String & database_name, const String } else { - checkDatabaseAccessRights(db); + checkDatabaseAccessRightsImpl(db); } ViewDependencies::const_iterator iter = shared->view_dependencies.find(DatabaseAndTableName(db, table_name)); @@ -652,7 +658,7 @@ bool Context::isTableExist(const String & database_name, const String & table_na auto lock = getLock(); String db = resolveDatabase(database_name, current_database); - checkDatabaseAccessRights(db); + checkDatabaseAccessRightsImpl(db); Databases::const_iterator it = shared->databases.find(db); return shared->databases.end() != it @@ -664,7 +670,7 @@ bool Context::isDatabaseExist(const String & database_name) const { auto lock = getLock(); String db = resolveDatabase(database_name, current_database); - checkDatabaseAccessRights(db); + checkDatabaseAccessRightsImpl(db); return shared->databases.end() != shared->databases.find(db); } @@ -679,7 +685,7 @@ void Context::assertTableExists(const String & database_name, const String & tab auto lock = getLock(); String db = resolveDatabase(database_name, current_database); - checkDatabaseAccessRights(db); + checkDatabaseAccessRightsImpl(db); Databases::const_iterator it = shared->databases.find(db); if (shared->databases.end() == it) @@ -696,7 +702,7 @@ void Context::assertTableDoesntExist(const String & database_name, const String String db = resolveDatabase(database_name, current_database); if (check_database_access_rights) - checkDatabaseAccessRights(db); + checkDatabaseAccessRightsImpl(db); Databases::const_iterator it = shared->databases.find(db); if (shared->databases.end() != it && it->second->isTableExist(*this, table_name)) @@ -710,7 +716,7 @@ void Context::assertDatabaseExists(const String & database_name, bool check_data String db = resolveDatabase(database_name, current_database); if (check_database_access_rights) - checkDatabaseAccessRights(db); + checkDatabaseAccessRightsImpl(db); if (shared->databases.end() == shared->databases.find(db)) throw Exception("Database " + backQuoteIfNeed(db) + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE); @@ -722,7 +728,7 @@ void Context::assertDatabaseDoesntExist(const String & database_name) const auto lock = getLock(); String db = resolveDatabase(database_name, current_database); - checkDatabaseAccessRights(db); + checkDatabaseAccessRightsImpl(db); if (shared->databases.end() != shared->databases.find(db)) throw Exception("Database " + backQuoteIfNeed(db) + " already exists.", ErrorCodes::DATABASE_ALREADY_EXISTS); @@ -789,7 +795,7 @@ StoragePtr Context::getTableImpl(const String & database_name, const String & ta } String db = resolveDatabase(database_name, current_database); - checkDatabaseAccessRights(db); + checkDatabaseAccessRightsImpl(db); Databases::const_iterator it = shared->databases.find(db); if (shared->databases.end() == it) diff --git a/dbms/src/Interpreters/Context.h b/dbms/src/Interpreters/Context.h index 670bda401bf..69f18c913b0 100644 --- a/dbms/src/Interpreters/Context.h +++ b/dbms/src/Interpreters/Context.h @@ -178,6 +178,7 @@ public: void assertDatabaseExists(const String & database_name, bool check_database_acccess_rights = true) const; void assertDatabaseDoesntExist(const String & database_name) const; + void checkDatabaseAccessRights(const std::string & database_name) const; Tables getExternalTables() const; StoragePtr tryGetExternalTable(const String & table_name) const; @@ -392,7 +393,7 @@ private: * If access is denied, throw an exception. * NOTE: This method should always be called when the `shared->mutex` mutex is acquired. */ - void checkDatabaseAccessRights(const std::string & database_name) const; + void checkDatabaseAccessRightsImpl(const std::string & database_name) const; EmbeddedDictionaries & getEmbeddedDictionariesImpl(bool throw_on_error) const; ExternalDictionaries & getExternalDictionariesImpl(bool throw_on_error) const; diff --git a/dbms/src/Interpreters/DDLWorker.cpp b/dbms/src/Interpreters/DDLWorker.cpp index 5a820ff7334..c8bdd67ce2a 100644 --- a/dbms/src/Interpreters/DDLWorker.cpp +++ b/dbms/src/Interpreters/DDLWorker.cpp @@ -960,15 +960,25 @@ public: { Block res; if (num_hosts_finished >= waiting_hosts.size()) + { + if (first_exception) + throw Exception(*first_exception); + return res; + } auto zookeeper = context.getZooKeeper(); size_t try_number = 0; - while(res.rows() == 0) + while (res.rows() == 0) { if (isCancelled()) + { + if (first_exception) + throw Exception(*first_exception); + return res; + } if (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds) { @@ -1020,6 +1030,9 @@ public: UInt16 port; Cluster::Address::fromString(host_id, host, port); + if (status.code != 0 && first_exception == nullptr) + first_exception = std::make_unique("There was an error on " + host + ": " + status.message, status.code); + ++num_hosts_finished; columns[0]->insert(host); @@ -1092,11 +1105,14 @@ private: Strings current_active_hosts; /// Hosts that were in active state at the last check size_t num_hosts_finished = 0; + /// Save the first detected error and throw it at the end of excecution + std::unique_ptr first_exception; + Int64 timeout_seconds = 120; }; -BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & context) +BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & context, const NameSet & query_databases) { /// Remove FORMAT and INTO OUTFILE if exists ASTPtr query_ptr = query_ptr_->clone(); @@ -1128,13 +1144,26 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont entry.query = queryToString(query_ptr); entry.initiator = ddl_worker.getCommonHostID(); + /// Check database access rights, assume that all servers have the same users config + NameSet databases_to_check_access_rights; + Cluster::AddressesWithFailover shards = cluster->getShardsAddresses(); + for (const auto & shard : shards) { for (const auto & addr : shard) + { entry.hosts.emplace_back(addr); + + /// Expand empty database name to shards' default database name + for (const String & database : query_databases) + databases_to_check_access_rights.emplace(database.empty() ? addr.default_database : database); + } } + for (const String & database : databases_to_check_access_rights) + context.checkDatabaseAccessRights(database.empty() ? context.getCurrentDatabase() : database); + String node_path = ddl_worker.enqueueQuery(entry); BlockIO io; diff --git a/dbms/src/Interpreters/DDLWorker.h b/dbms/src/Interpreters/DDLWorker.h index f9c296d373a..d640b6d0bc8 100644 --- a/dbms/src/Interpreters/DDLWorker.h +++ b/dbms/src/Interpreters/DDLWorker.h @@ -18,7 +18,8 @@ struct DDLLogEntry; struct DDLTask; -BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context); +/// Pushes distributed DDL query to the queue +BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, const NameSet & query_databases); class DDLWorker diff --git a/dbms/src/Interpreters/InterpreterAlterQuery.cpp b/dbms/src/Interpreters/InterpreterAlterQuery.cpp index f4708a67c3d..bc7861ad41c 100644 --- a/dbms/src/Interpreters/InterpreterAlterQuery.cpp +++ b/dbms/src/Interpreters/InterpreterAlterQuery.cpp @@ -42,7 +42,7 @@ BlockIO InterpreterAlterQuery::execute() auto & alter = typeid_cast(*query_ptr); if (!alter.cluster.empty()) - return executeDDLQueryOnCluster(query_ptr, context); + return executeDDLQueryOnCluster(query_ptr, context, {alter.table}); const String & table_name = alter.table; String database_name = alter.database.empty() ? context.getCurrentDatabase() : alter.database; diff --git a/dbms/src/Interpreters/InterpreterCreateQuery.cpp b/dbms/src/Interpreters/InterpreterCreateQuery.cpp index 455217a5e40..99f0efc10c9 100644 --- a/dbms/src/Interpreters/InterpreterCreateQuery.cpp +++ b/dbms/src/Interpreters/InterpreterCreateQuery.cpp @@ -66,7 +66,7 @@ InterpreterCreateQuery::InterpreterCreateQuery(const ASTPtr & query_ptr_, Contex BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create) { if (!create.cluster.empty()) - return executeDDLQueryOnCluster(query_ptr, context); + return executeDDLQueryOnCluster(query_ptr, context, {create.database}); String database_name = create.database; @@ -439,7 +439,13 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create) { if (!create.cluster.empty()) - return executeDDLQueryOnCluster(query_ptr, context); + { + NameSet databases{create.database}; + if (!create.to_table.empty()) + databases.emplace(create.to_database); + + return executeDDLQueryOnCluster(query_ptr, context, databases); + } String path = context.getPath(); String current_database = context.getCurrentDatabase(); diff --git a/dbms/src/Interpreters/InterpreterDropQuery.cpp b/dbms/src/Interpreters/InterpreterDropQuery.cpp index 0fdf2b1ccf4..839b714a499 100644 --- a/dbms/src/Interpreters/InterpreterDropQuery.cpp +++ b/dbms/src/Interpreters/InterpreterDropQuery.cpp @@ -32,7 +32,7 @@ BlockIO InterpreterDropQuery::execute() checkAccess(drop); if (!drop.cluster.empty()) - return executeDDLQueryOnCluster(query_ptr, context); + return executeDDLQueryOnCluster(query_ptr, context, {drop.database}); String path = context.getPath(); String current_database = context.getCurrentDatabase(); diff --git a/dbms/src/Interpreters/InterpreterRenameQuery.cpp b/dbms/src/Interpreters/InterpreterRenameQuery.cpp index 00aa95ee6fb..d241e620455 100644 --- a/dbms/src/Interpreters/InterpreterRenameQuery.cpp +++ b/dbms/src/Interpreters/InterpreterRenameQuery.cpp @@ -39,7 +39,16 @@ BlockIO InterpreterRenameQuery::execute() ASTRenameQuery & rename = typeid_cast(*query_ptr); if (!rename.cluster.empty()) - return executeDDLQueryOnCluster(query_ptr, context); + { + NameSet databases; + for (const auto & elem : rename.elements) + { + databases.emplace(elem.from.database); + databases.emplace(elem.to.database); + } + + return executeDDLQueryOnCluster(query_ptr, context, databases); + } String path = context.getPath(); String current_database = context.getCurrentDatabase(); diff --git a/dbms/tests/integration/test_distributed_ddl/configs/users.d/restricted_user.xml b/dbms/tests/integration/test_distributed_ddl/configs/users.d/restricted_user.xml new file mode 100644 index 00000000000..5b6084eea7b --- /dev/null +++ b/dbms/tests/integration/test_distributed_ddl/configs/users.d/restricted_user.xml @@ -0,0 +1,16 @@ + + + + + default + default + + ::/0 + + + + db1 + + + + diff --git a/dbms/tests/integration/test_distributed_ddl/test.py b/dbms/tests/integration/test_distributed_ddl/test.py index 8b7e46443d5..8621f723ac1 100755 --- a/dbms/tests/integration/test_distributed_ddl/test.py +++ b/dbms/tests/integration/test_distributed_ddl/test.py @@ -315,6 +315,24 @@ def test_macro(started_cluster): ddl_check_query(instance, "DROP TABLE IF EXISTS distr ON CLUSTER '{cluster}'") ddl_check_query(instance, "DROP TABLE IF EXISTS tab ON CLUSTER '{cluster}'") + +def test_allowed_databases(started_cluster): + instance = cluster.instances['ch2'] + instance.query("CREATE DATABASE IF NOT EXISTS db1 ON CLUSTER cluster") + instance.query("CREATE DATABASE IF NOT EXISTS db2 ON CLUSTER cluster") + + instance.query("CREATE TABLE db1.t1 ON CLUSTER cluster (i Int8) ENGINE = Memory", settings={"user" : "restricted_user"}) + + with pytest.raises(Exception): + instance.query("CREATE TABLE db2.t2 ON CLUSTER cluster (i Int8) ENGINE = Memory", settings={"user" : "restricted_user"}) + with pytest.raises(Exception): + instance.query("CREATE TABLE t3 ON CLUSTER cluster (i Int8) ENGINE = Memory", settings={"user" : "restricted_user"}) + with pytest.raises(Exception): + instance.query("DROP DATABASE db2 ON CLUSTER cluster", settings={"user" : "restricted_user"}) + + instance.query("DROP DATABASE db1 ON CLUSTER cluster", settings={"user" : "restricted_user"}) + + if __name__ == '__main__': with contextmanager(started_cluster)() as cluster: for name, instance in cluster.instances.items():