This commit is contained in:
Alexander Tokmakov 2020-02-10 21:19:35 +03:00
parent 869e20d207
commit 45338ad9ca
5 changed files with 45 additions and 39 deletions

View File

@ -95,7 +95,6 @@ namespace ErrorCodes
extern const int PARTITION_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT;
extern const int SESSION_NOT_FOUND;
extern const int SESSION_IS_LOCKED;
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
extern const int LOGICAL_ERROR;
extern const int SCALAR_ALREADY_EXISTS;
extern const int UNKNOWN_SCALAR;
@ -765,7 +764,7 @@ Dependencies Context::getDependencies(const StorageID & from) const
{
auto lock = getLock();
StorageID resolved = resolveStorageIDUnlocked(from);
ViewDependencies::const_iterator iter = shared->view_dependencies.find(resolved);
auto iter = shared->view_dependencies.find(resolved);
if (iter == shared->view_dependencies.end())
return {};
@ -774,8 +773,7 @@ Dependencies Context::getDependencies(const StorageID & from) const
bool Context::isTableExist(const String & database_name, const String & table_name) const
{
//FIXME do we need resolve temporary tables here?
auto table_id = resolveStorageID({database_name, table_name});
auto table_id = resolveStorageID({database_name, table_name}, StorageNamespace::Ordinary);
return DatabaseCatalog::instance().isTableExist(table_id, *this);
}
@ -2046,30 +2044,45 @@ void Context::resetInputCallbacks()
input_blocks_reader = {};
}
StorageID Context::resolveStorageID(StorageID storage_id) const
StorageID Context::resolveStorageID(StorageID storage_id, StorageNamespace where) const
{
auto lock = getLock();
return resolveStorageIDUnlocked(std::move(storage_id));
return resolveStorageIDUnlocked(std::move(storage_id), where);
}
StorageID Context::resolveStorageIDUnlocked(StorageID storage_id) const
StorageID Context::resolveStorageIDUnlocked(StorageID storage_id, StorageNamespace where) const
{
if (storage_id.uuid != UUIDHelpers::Nil)
{
//TODO maybe update table and db name?
//TODO add flag `resolved` to StorageID and check access rights if it was not previously resolved
return storage_id;
bool look_for_external_table = where & StorageNamespace::External;
bool in_current_database = where & StorageNamespace::CurrentDatabase;
bool in_specified_database = where & StorageNamespace::Global;
if (!storage_id.database_name.empty())
{
if (in_specified_database)
return storage_id;
throw Exception("External and temporary tables have no database, but " +
storage_id.database_name + " is specified", ErrorCodes::UNKNOWN_TABLE);
}
if (storage_id.database_name.empty())
if (look_for_external_table)
{
auto it = external_tables_mapping.find(storage_id.getTableName());
if (it != external_tables_mapping.end())
return it->second->getGlobalTableID(); /// Do not check access rights for session-local table
return it->second->getGlobalTableID();
}
if (in_current_database)
{
if (current_database.empty())
throw Exception("Default database is not selected", ErrorCodes::UNKNOWN_DATABASE);
storage_id.database_name = current_database;
return storage_id;
}
return storage_id;
throw Exception("Cannot resolve database name for table " + storage_id.getNameForLogs(), ErrorCodes::UNKNOWN_TABLE);
}

View File

@ -145,8 +145,6 @@ struct SubscriptionForUserChange
};
struct TemporaryTableHolder;
class DatabaseCatalog;
using DatabaseCatalogPtr = std::shared_ptr<DatabaseCatalog>;
/** A set of known objects that can be used in the query.
* Consists of a shared part (always common to all sessions and queries)
@ -197,9 +195,6 @@ private:
using SampleBlockCache = std::unordered_map<std::string, Block>;
mutable SampleBlockCache sample_block_cache;
using DatabasePtr = std::shared_ptr<IDatabase>;
using Databases = std::map<String, std::shared_ptr<IDatabase>>;
NameToNameMap query_parameters; /// Dictionary with query parameters for prepared statements.
/// (key=name, value)
@ -306,11 +301,19 @@ public:
bool isDictionaryExists(const String & database_name, const String & dictionary_name) const;
bool isExternalTableExist(const String & table_name) const;
enum StorageNamespace
{
Global = 1u, /// Database name must be specified
CurrentDatabase = 2u, /// Use current database
Ordinary = Global | CurrentDatabase, /// If database name is not specified, use current database
External = 4u, /// Try get external table
All = External | Ordinary /// If database name is not specified, try get external table,
/// if external table not found use current database.
};
String resolveDatabase(const String & database_name) const;
String resolveDatabaseAndCheckAccess(const String & database_name) const;
//StorageID resolveDatabase(StorageID table_id) const;
StorageID resolveStorageID(StorageID storage_id) const;
StorageID resolveStorageIDUnlocked(StorageID storage_id) const;
StorageID resolveStorageID(StorageID storage_id, StorageNamespace where = StorageNamespace::All) const;
StorageID resolveStorageIDUnlocked(StorageID storage_id, StorageNamespace where = StorageNamespace::All) const;
const Scalars & getScalars() const;
const Block & getScalar(const String & name) const;

View File

@ -65,13 +65,6 @@ DatabaseAndTable DatabaseCatalog::tryGetByUUID(const UUID & uuid) const
return it->second;
}
//String DatabaseCatalog::resolveDatabase(const String & database_name, const String & current_database)
//{
// String res = database_name.empty() ? current_database : database_name;
// if (res.empty())
// throw Exception("Default database is not selected", ErrorCodes::UNKNOWN_DATABASE);
// return res;
//}
StoragePtr DatabaseCatalog::getTable(const StorageID & table_id, const Context & local_context, std::optional<Exception> * exception) const
{
@ -121,6 +114,7 @@ void DatabaseCatalog::assertDatabaseDoesntExist(const String & database_name) co
void DatabaseCatalog::assertDatabaseExistsUnlocked(const String & database_name) const
{
assert(!database_name.empty());
if (databases.end() == databases.find(database_name))
throw Exception("Database " + backQuoteIfNeed(database_name) + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
}
@ -128,13 +122,13 @@ void DatabaseCatalog::assertDatabaseExistsUnlocked(const String & database_name)
void DatabaseCatalog::assertDatabaseDoesntExistUnlocked(const String & database_name) const
{
assert(!database_name.empty());
if (databases.end() != databases.find(database_name))
throw Exception("Database " + backQuoteIfNeed(database_name) + " already exists.", ErrorCodes::DATABASE_ALREADY_EXISTS);
}
void DatabaseCatalog::attachDatabase(const String & database_name, const DatabasePtr & database)
{
//local_context.checkDatabaseAccessRights(database_name);
std::lock_guard lock{databases_mutex};
assertDatabaseDoesntExistUnlocked(database_name);
databases[database_name] = database;
@ -143,18 +137,15 @@ void DatabaseCatalog::attachDatabase(const String & database_name, const Databas
DatabasePtr DatabaseCatalog::detachDatabase(const String & database_name)
{
//local_context.checkDatabaseAccessRights(database_name);
std::lock_guard lock{databases_mutex};
auto res = getDatabase(database_name); //FIXME locks order
assertDatabaseExistsUnlocked(database_name);
auto res = databases.find(database_name)->second;
databases.erase(database_name);
return res;
}
DatabasePtr DatabaseCatalog::getDatabase(const String & database_name) const
{
assert(!database_name.empty());
//String db = local_context.resolveDatabase(database_name);
//local_context.checkDatabaseAccessRights(db); //FIXME non-atomic
std::lock_guard lock{databases_mutex};
assertDatabaseExistsUnlocked(database_name);
return databases.find(database_name)->second;
@ -163,7 +154,6 @@ DatabasePtr DatabaseCatalog::getDatabase(const String & database_name) const
DatabasePtr DatabaseCatalog::tryGetDatabase(const String & database_name) const
{
assert(!database_name.empty());
//String db = local_context.resolveDatabase(database_name);
std::lock_guard lock{databases_mutex};
auto it = databases.find(database_name);
if (it == databases.end())
@ -198,7 +188,7 @@ bool DatabaseCatalog::isTableExist(const DB::StorageID & table_id, const DB::Con
void DatabaseCatalog::assertTableDoesntExist(const StorageID & table_id, const Context & context) const
{
if (!isTableExist(table_id, context))
if (isTableExist(table_id, context))
throw Exception("Table " + table_id.getNameForLogs() + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}

View File

@ -18,12 +18,11 @@ struct StorageID;
class Exception;
using DatabasePtr = std::shared_ptr<IDatabase>;
using DatabaseAndTable = std::pair<DatabasePtr, StoragePtr>;
using Databases = std::map<String, std::shared_ptr<IDatabase>>;
//TODO make singleton?
class DatabaseCatalog : boost::noncopyable
{
public:
using Databases = std::map<String, std::shared_ptr<IDatabase>>;
static constexpr const char * TEMPORARY_DATABASE = "_temporary_and_external_tables";
static constexpr const char * SYSTEM_DATABASE = "system";

View File

@ -250,6 +250,7 @@ BlockIO InterpreterDropQuery::executeToDatabase(const String & database_name, AS
context.checkAccess(AccessType::DETACH_DATABASE, database_name);
DatabaseCatalog::instance().detachDatabase(database_name);
database->shutdown();
//FIXME someone may still use tables from database
}
else if (kind == ASTDropQuery::Kind::Drop)
{