Support allow_databases in distributed DDL. [#CLICKHOUSE-3]

Resolves #2189
This commit is contained in:
Vitaliy Lyudvichenko 2018-04-17 22:33:58 +03:00
parent bf832b3ea6
commit 400ad55754
10 changed files with 107 additions and 21 deletions

View File

@ -588,6 +588,12 @@ QuotaForIntervals & Context::getQuota()
}
void Context::checkDatabaseAccessRights(const std::string & database_name) const
{
auto lock = getLock();
checkDatabaseAccessRightsImpl(database_name);
}
void Context::checkDatabaseAccessRightsImpl(const std::string & database_name) const
{
if (client_info.current_user.empty() || (database_name == "system"))
{
@ -602,8 +608,8 @@ void Context::checkDatabaseAccessRights(const std::string & database_name) const
void Context::addDependency(const DatabaseAndTableName & from, const DatabaseAndTableName & where)
{
auto lock = getLock();
checkDatabaseAccessRights(from.first);
checkDatabaseAccessRights(where.first);
checkDatabaseAccessRightsImpl(from.first);
checkDatabaseAccessRightsImpl(where.first);
shared->view_dependencies[from].insert(where);
// Notify table of dependencies change
@ -615,8 +621,8 @@ void Context::addDependency(const DatabaseAndTableName & from, const DatabaseAnd
void Context::removeDependency(const DatabaseAndTableName & from, const DatabaseAndTableName & where)
{
auto lock = getLock();
checkDatabaseAccessRights(from.first);
checkDatabaseAccessRights(where.first);
checkDatabaseAccessRightsImpl(from.first);
checkDatabaseAccessRightsImpl(where.first);
shared->view_dependencies[from].erase(where);
// Notify table of dependencies change
@ -637,7 +643,7 @@ Dependencies Context::getDependencies(const String & database_name, const String
}
else
{
checkDatabaseAccessRights(db);
checkDatabaseAccessRightsImpl(db);
}
ViewDependencies::const_iterator iter = shared->view_dependencies.find(DatabaseAndTableName(db, table_name));
@ -652,7 +658,7 @@ bool Context::isTableExist(const String & database_name, const String & table_na
auto lock = getLock();
String db = resolveDatabase(database_name, current_database);
checkDatabaseAccessRights(db);
checkDatabaseAccessRightsImpl(db);
Databases::const_iterator it = shared->databases.find(db);
return shared->databases.end() != it
@ -664,7 +670,7 @@ bool Context::isDatabaseExist(const String & database_name) const
{
auto lock = getLock();
String db = resolveDatabase(database_name, current_database);
checkDatabaseAccessRights(db);
checkDatabaseAccessRightsImpl(db);
return shared->databases.end() != shared->databases.find(db);
}
@ -679,7 +685,7 @@ void Context::assertTableExists(const String & database_name, const String & tab
auto lock = getLock();
String db = resolveDatabase(database_name, current_database);
checkDatabaseAccessRights(db);
checkDatabaseAccessRightsImpl(db);
Databases::const_iterator it = shared->databases.find(db);
if (shared->databases.end() == it)
@ -696,7 +702,7 @@ void Context::assertTableDoesntExist(const String & database_name, const String
String db = resolveDatabase(database_name, current_database);
if (check_database_access_rights)
checkDatabaseAccessRights(db);
checkDatabaseAccessRightsImpl(db);
Databases::const_iterator it = shared->databases.find(db);
if (shared->databases.end() != it && it->second->isTableExist(*this, table_name))
@ -710,7 +716,7 @@ void Context::assertDatabaseExists(const String & database_name, bool check_data
String db = resolveDatabase(database_name, current_database);
if (check_database_access_rights)
checkDatabaseAccessRights(db);
checkDatabaseAccessRightsImpl(db);
if (shared->databases.end() == shared->databases.find(db))
throw Exception("Database " + backQuoteIfNeed(db) + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
@ -722,7 +728,7 @@ void Context::assertDatabaseDoesntExist(const String & database_name) const
auto lock = getLock();
String db = resolveDatabase(database_name, current_database);
checkDatabaseAccessRights(db);
checkDatabaseAccessRightsImpl(db);
if (shared->databases.end() != shared->databases.find(db))
throw Exception("Database " + backQuoteIfNeed(db) + " already exists.", ErrorCodes::DATABASE_ALREADY_EXISTS);
@ -789,7 +795,7 @@ StoragePtr Context::getTableImpl(const String & database_name, const String & ta
}
String db = resolveDatabase(database_name, current_database);
checkDatabaseAccessRights(db);
checkDatabaseAccessRightsImpl(db);
Databases::const_iterator it = shared->databases.find(db);
if (shared->databases.end() == it)

View File

@ -178,6 +178,7 @@ public:
void assertDatabaseExists(const String & database_name, bool check_database_acccess_rights = true) const;
void assertDatabaseDoesntExist(const String & database_name) const;
void checkDatabaseAccessRights(const std::string & database_name) const;
Tables getExternalTables() const;
StoragePtr tryGetExternalTable(const String & table_name) const;
@ -392,7 +393,7 @@ private:
* If access is denied, throw an exception.
* NOTE: This method should always be called when the `shared->mutex` mutex is acquired.
*/
void checkDatabaseAccessRights(const std::string & database_name) const;
void checkDatabaseAccessRightsImpl(const std::string & database_name) const;
EmbeddedDictionaries & getEmbeddedDictionariesImpl(bool throw_on_error) const;
ExternalDictionaries & getExternalDictionariesImpl(bool throw_on_error) const;

View File

@ -960,15 +960,25 @@ public:
{
Block res;
if (num_hosts_finished >= waiting_hosts.size())
{
if (first_exception)
throw Exception(*first_exception);
return res;
}
auto zookeeper = context.getZooKeeper();
size_t try_number = 0;
while(res.rows() == 0)
while (res.rows() == 0)
{
if (isCancelled())
{
if (first_exception)
throw Exception(*first_exception);
return res;
}
if (timeout_seconds >= 0 && watch.elapsedSeconds() > timeout_seconds)
{
@ -1020,6 +1030,9 @@ public:
UInt16 port;
Cluster::Address::fromString(host_id, host, port);
if (status.code != 0 && first_exception == nullptr)
first_exception = std::make_unique<Exception>("There was an error on " + host + ": " + status.message, status.code);
++num_hosts_finished;
columns[0]->insert(host);
@ -1092,11 +1105,14 @@ private:
Strings current_active_hosts; /// Hosts that were in active state at the last check
size_t num_hosts_finished = 0;
/// Save the first detected error and throw it at the end of excecution
std::unique_ptr<Exception> first_exception;
Int64 timeout_seconds = 120;
};
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & context)
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & context, const NameSet & query_databases)
{
/// Remove FORMAT <fmt> and INTO OUTFILE <file> if exists
ASTPtr query_ptr = query_ptr_->clone();
@ -1128,13 +1144,26 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont
entry.query = queryToString(query_ptr);
entry.initiator = ddl_worker.getCommonHostID();
/// Check database access rights, assume that all servers have the same users config
NameSet databases_to_check_access_rights;
Cluster::AddressesWithFailover shards = cluster->getShardsAddresses();
for (const auto & shard : shards)
{
for (const auto & addr : shard)
{
entry.hosts.emplace_back(addr);
/// Expand empty database name to shards' default database name
for (const String & database : query_databases)
databases_to_check_access_rights.emplace(database.empty() ? addr.default_database : database);
}
}
for (const String & database : databases_to_check_access_rights)
context.checkDatabaseAccessRights(database.empty() ? context.getCurrentDatabase() : database);
String node_path = ddl_worker.enqueueQuery(entry);
BlockIO io;

View File

@ -18,7 +18,8 @@ struct DDLLogEntry;
struct DDLTask;
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context);
/// Pushes distributed DDL query to the queue
BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr, const Context & context, const NameSet & query_databases);
class DDLWorker

View File

@ -42,7 +42,7 @@ BlockIO InterpreterAlterQuery::execute()
auto & alter = typeid_cast<ASTAlterQuery &>(*query_ptr);
if (!alter.cluster.empty())
return executeDDLQueryOnCluster(query_ptr, context);
return executeDDLQueryOnCluster(query_ptr, context, {alter.table});
const String & table_name = alter.table;
String database_name = alter.database.empty() ? context.getCurrentDatabase() : alter.database;

View File

@ -66,7 +66,7 @@ InterpreterCreateQuery::InterpreterCreateQuery(const ASTPtr & query_ptr_, Contex
BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
{
if (!create.cluster.empty())
return executeDDLQueryOnCluster(query_ptr, context);
return executeDDLQueryOnCluster(query_ptr, context, {create.database});
String database_name = create.database;
@ -439,7 +439,13 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
{
if (!create.cluster.empty())
return executeDDLQueryOnCluster(query_ptr, context);
{
NameSet databases{create.database};
if (!create.to_table.empty())
databases.emplace(create.to_database);
return executeDDLQueryOnCluster(query_ptr, context, databases);
}
String path = context.getPath();
String current_database = context.getCurrentDatabase();

View File

@ -32,7 +32,7 @@ BlockIO InterpreterDropQuery::execute()
checkAccess(drop);
if (!drop.cluster.empty())
return executeDDLQueryOnCluster(query_ptr, context);
return executeDDLQueryOnCluster(query_ptr, context, {drop.database});
String path = context.getPath();
String current_database = context.getCurrentDatabase();

View File

@ -39,7 +39,16 @@ BlockIO InterpreterRenameQuery::execute()
ASTRenameQuery & rename = typeid_cast<ASTRenameQuery &>(*query_ptr);
if (!rename.cluster.empty())
return executeDDLQueryOnCluster(query_ptr, context);
{
NameSet databases;
for (const auto & elem : rename.elements)
{
databases.emplace(elem.from.database);
databases.emplace(elem.to.database);
}
return executeDDLQueryOnCluster(query_ptr, context, databases);
}
String path = context.getPath();
String current_database = context.getCurrentDatabase();

View File

@ -0,0 +1,16 @@
<yandex>
<users>
<restricted_user>
<password></password>
<profile>default</profile>
<quota>default</quota>
<networks>
<ip>::/0</ip>
</networks>
<allow_databases>
<database>db1</database>
</allow_databases>
</restricted_user>
</users>
</yandex>

View File

@ -315,6 +315,24 @@ def test_macro(started_cluster):
ddl_check_query(instance, "DROP TABLE IF EXISTS distr ON CLUSTER '{cluster}'")
ddl_check_query(instance, "DROP TABLE IF EXISTS tab ON CLUSTER '{cluster}'")
def test_allowed_databases(started_cluster):
instance = cluster.instances['ch2']
instance.query("CREATE DATABASE IF NOT EXISTS db1 ON CLUSTER cluster")
instance.query("CREATE DATABASE IF NOT EXISTS db2 ON CLUSTER cluster")
instance.query("CREATE TABLE db1.t1 ON CLUSTER cluster (i Int8) ENGINE = Memory", settings={"user" : "restricted_user"})
with pytest.raises(Exception):
instance.query("CREATE TABLE db2.t2 ON CLUSTER cluster (i Int8) ENGINE = Memory", settings={"user" : "restricted_user"})
with pytest.raises(Exception):
instance.query("CREATE TABLE t3 ON CLUSTER cluster (i Int8) ENGINE = Memory", settings={"user" : "restricted_user"})
with pytest.raises(Exception):
instance.query("DROP DATABASE db2 ON CLUSTER cluster", settings={"user" : "restricted_user"})
instance.query("DROP DATABASE db1 ON CLUSTER cluster", settings={"user" : "restricted_user"})
if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster:
for name, instance in cluster.instances.items():