store temporary tables in DatabaseMemory

This commit is contained in:
Alexander Tokmakov 2020-01-30 22:00:51 +03:00
parent 93dba03e81
commit a508c25d05
7 changed files with 105 additions and 30 deletions

View File

@ -51,7 +51,7 @@ ASTPtr DatabaseMemory::getCreateTableQueryImpl(const Context &, const String & t
std::lock_guard lock{mutex}; std::lock_guard lock{mutex};
auto it = create_queries.find(table_name); auto it = create_queries.find(table_name);
if (it == create_queries.end() && throw_on_error) if (it == create_queries.end() && throw_on_error)
throw Exception("No table " + table_name + " in database " + database_name, ErrorCodes::UNKNOWN_TABLE); throw Exception("There is no metadata of table " + table_name + " in database " + database_name, ErrorCodes::UNKNOWN_TABLE);
return it->second; return it->second;
} }

View File

@ -19,7 +19,8 @@ namespace DB
class DatabaseMemory : public DatabaseWithOwnTablesBase class DatabaseMemory : public DatabaseWithOwnTablesBase
{ {
public: public:
DatabaseMemory(const String & name_); //FIXME default name
DatabaseMemory(const String & name_ = "_temporary_and_external_tables");
String getEngineName() const override { return "Memory"; } String getEngineName() const override { return "Memory"; }

View File

@ -58,6 +58,8 @@
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <Common/RemoteHostFilter.h> #include <Common/RemoteHostFilter.h>
#include <Databases/DatabaseMemory.h>
namespace ProfileEvents namespace ProfileEvents
{ {
extern const Event ContextLock; extern const Event ContextLock;
@ -99,6 +101,57 @@ namespace ErrorCodes
extern const int ACCESS_DENIED; extern const int ACCESS_DENIED;
} }
struct TemporaryTableHolder : boost::noncopyable
{
static constexpr const char * database_name = "_temporary_and_external_tables";
TemporaryTableHolder(const Context & context_,
DatabaseMemory & external_tables_,
const StoragePtr & table,
const ASTPtr & query = {})
: context(context_), external_tables(external_tables_)
{
if (query)
{
ASTCreateQuery & create = dynamic_cast<ASTCreateQuery &>(*query);
if (create.uuid == UUIDHelpers::Nil)
create.uuid = UUIDHelpers::generateV4();
id = create.uuid;
}
else
id = UUIDHelpers::generateV4();
external_tables.createTable(context, "_data_" + toString(id), table, query);
}
TemporaryTableHolder(TemporaryTableHolder && other)
: context(other.context), external_tables(other.external_tables), id(other.id)
{
other.id = UUIDHelpers::Nil;
}
~TemporaryTableHolder()
{
external_tables.removeTable(context, "_data_" + toString(id));
}
StorageID getGlobalTableID() const
{
return StorageID{database_name, "_data_" + toString(id), id};
}
StoragePtr getTable() const
{
auto table = external_tables.tryGetTable(context, "_data_" + toString(id));
if (!table)
throw Exception("Temporary table " + getGlobalTableID().getNameForLogs() + " not found", ErrorCodes::LOGICAL_ERROR);
return table;
}
const Context & context;
DatabaseMemory & external_tables;
UUID id;
};
/** Set of known objects (environment), that could be used in query. /** Set of known objects (environment), that could be used in query.
* Shared (global) part. Order of members (especially, order of destruction) is very important. * Shared (global) part. Order of members (especially, order of destruction) is very important.
@ -134,6 +187,8 @@ struct ContextShared
mutable VolumePtr tmp_volume; /// Volume for the the temporary files that occur when processing the request. mutable VolumePtr tmp_volume; /// Volume for the the temporary files that occur when processing the request.
Databases databases; /// List of databases and tables in them. Databases databases; /// List of databases and tables in them.
DatabaseMemory temporary_and_external_tables;
mutable std::optional<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization. mutable std::optional<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
mutable std::optional<ExternalDictionariesLoader> external_dictionaries_loader; mutable std::optional<ExternalDictionariesLoader> external_dictionaries_loader;
mutable std::optional<ExternalModelsLoader> external_models_loader; mutable std::optional<ExternalModelsLoader> external_models_loader;
@ -785,9 +840,8 @@ void Context::removeDependency(const StorageID & from, const StorageID & where)
Dependencies Context::getDependencies(const StorageID & from) const Dependencies Context::getDependencies(const StorageID & from) const
{ {
auto lock = getLock(); auto lock = getLock();
StorageID resolved = resolveStorageIDUnlocked(from);
String db = resolveDatabase(from.database_name, current_database); ViewDependencies::const_iterator iter = shared->view_dependencies.find(resolved);
ViewDependencies::const_iterator iter = shared->view_dependencies.find(StorageID(db, from.table_name, from.uuid));
if (iter == shared->view_dependencies.end()) if (iter == shared->view_dependencies.end())
return {}; return {};
@ -820,7 +874,7 @@ bool Context::isDatabaseExist(const String & database_name) const
bool Context::isExternalTableExist(const String & table_name) const bool Context::isExternalTableExist(const String & table_name) const
{ {
return external_tables.end() != external_tables.find(table_name); return external_tables_mapping.count(table_name);
} }
@ -872,8 +926,8 @@ Tables Context::getExternalTables() const
auto lock = getLock(); auto lock = getLock();
Tables res; Tables res;
for (auto & table : external_tables) for (auto & table : external_tables_mapping)
res[table.first] = table.second.first; res[table.first] = table.second->getTable();
if (session_context && session_context != this) if (session_context && session_context != this)
{ {
@ -891,11 +945,11 @@ Tables Context::getExternalTables() const
StoragePtr Context::tryGetExternalTable(const String & table_name) const StoragePtr Context::tryGetExternalTable(const String & table_name) const
{ {
TableAndCreateASTs::const_iterator jt = external_tables.find(table_name); auto it = external_tables_mapping.find(table_name);
if (external_tables.end() == jt) if (external_tables_mapping.end() == it)
return StoragePtr(); return StoragePtr();
return jt->second.first; return it->second->getTable();
} }
StoragePtr Context::getTable(const String & database_name, const String & table_name) const StoragePtr Context::getTable(const String & database_name, const String & table_name) const
@ -965,10 +1019,11 @@ StoragePtr Context::getTableImpl(const StorageID & table_id, std::optional<Excep
void Context::addExternalTable(const String & table_name, const StoragePtr & storage, const ASTPtr & ast) void Context::addExternalTable(const String & table_name, const StoragePtr & storage, const ASTPtr & ast)
{ {
if (external_tables.end() != external_tables.find(table_name)) //FIXME why without getLock()?
if (external_tables_mapping.end() != external_tables_mapping.find(table_name))
throw Exception("Temporary table " + backQuoteIfNeed(table_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS); throw Exception("Temporary table " + backQuoteIfNeed(table_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
external_tables[table_name] = std::pair(storage, ast); external_tables_mapping.emplace(table_name, std::make_shared<TemporaryTableHolder>(*this, shared->temporary_and_external_tables, storage, ast));
} }
@ -984,16 +1039,9 @@ bool Context::hasScalar(const String & name) const
} }
StoragePtr Context::tryRemoveExternalTable(const String & table_name) bool Context::removeExternalTable(const String & table_name)
{ {
TableAndCreateASTs::const_iterator it = external_tables.find(table_name); return external_tables_mapping.erase(table_name);
if (external_tables.end() == it)
return StoragePtr();
auto storage = it->second.first;
external_tables.erase(it);
return storage;
} }
@ -1080,11 +1128,11 @@ DatabasePtr Context::detachDatabase(const String & database_name)
ASTPtr Context::getCreateExternalTableQuery(const String & table_name) const ASTPtr Context::getCreateExternalTableQuery(const String & table_name) const
{ {
TableAndCreateASTs::const_iterator jt = external_tables.find(table_name); auto it = external_tables_mapping.find(table_name);
if (external_tables.end() == jt) if (external_tables_mapping.end() == it)
throw Exception("Temporary table " + backQuoteIfNeed(table_name) + " doesn't exist", ErrorCodes::UNKNOWN_TABLE); throw Exception("Temporary table " + backQuoteIfNeed(table_name) + " doesn't exist", ErrorCodes::UNKNOWN_TABLE);
return jt->second.second; return shared->temporary_and_external_tables.getCreateTableQuery(*this, it->second->getGlobalTableID().table_name);
} }
Settings Context::getSettings() const Settings Context::getSettings() const
@ -2163,6 +2211,27 @@ void Context::resetInputCallbacks()
input_blocks_reader = {}; input_blocks_reader = {};
} }
StorageID Context::resolveStorageIDUnlocked(StorageID storage_id) const
{
if (storage_id.uuid != UUIDHelpers::Nil)
{
//TODO maybe update table and db name?
//TODO add flag `resolved` to StorageID and check access rights if it was not previously resolved
return storage_id;
}
if (storage_id.database_name.empty())
{
auto it = external_tables_mapping.find(storage_id.getTableName());
if (it != external_tables_mapping.end())
return it->second->getGlobalTableID(); /// Do not check access rights for session-local table
if (current_database.empty())
throw Exception("Default database is not selected", ErrorCodes::UNKNOWN_DATABASE);
storage_id.database_name = current_database;
}
checkDatabaseAccessRightsImpl(storage_id.database_name);
return storage_id;
}
SessionCleaner::~SessionCleaner() SessionCleaner::~SessionCleaner()
{ {

View File

@ -143,6 +143,8 @@ struct SubscriptionForUserChange
SubscriptionForUserChange & operator =(const SubscriptionForUserChange &) { subscription = {}; return *this; } SubscriptionForUserChange & operator =(const SubscriptionForUserChange &) { subscription = {}; return *this; }
}; };
struct TemporaryTableHolder;
/** A set of known objects that can be used in the query. /** A set of known objects that can be used in the query.
* Consists of a shared part (always common to all sessions and queries) * Consists of a shared part (always common to all sessions and queries)
* and copied part (which can be its own for each session or query). * and copied part (which can be its own for each session or query).
@ -178,7 +180,9 @@ private:
String default_format; /// Format, used when server formats data by itself and if query does not have FORMAT specification. String default_format; /// Format, used when server formats data by itself and if query does not have FORMAT specification.
/// Thus, used in HTTP interface. If not specified - then some globally default format is used. /// Thus, used in HTTP interface. If not specified - then some globally default format is used.
// TODO maybe replace with DatabaseMemory? // TODO maybe replace with DatabaseMemory?
TableAndCreateASTs external_tables; /// Temporary tables. //TableAndCreateASTs external_tables; /// Temporary tables.
using TemporaryTablesMapping = std::map<String, std::shared_ptr<TemporaryTableHolder>>;
TemporaryTablesMapping external_tables_mapping;
Scalars scalars; Scalars scalars;
StoragePtr view_source; /// Temporary StorageValues used to generate alias columns for materialized views StoragePtr view_source; /// Temporary StorageValues used to generate alias columns for materialized views
Tables table_function_results; /// Temporary tables obtained by execution of table functions. Keyed by AST tree id. Tables table_function_results; /// Temporary tables obtained by execution of table functions. Keyed by AST tree id.
@ -316,7 +320,8 @@ public:
void addExternalTable(const String & table_name, const StoragePtr & storage, const ASTPtr & ast = {}); void addExternalTable(const String & table_name, const StoragePtr & storage, const ASTPtr & ast = {});
void addScalar(const String & name, const Block & block); void addScalar(const String & name, const Block & block);
bool hasScalar(const String & name) const; bool hasScalar(const String & name) const;
StoragePtr tryRemoveExternalTable(const String & table_name); bool removeExternalTable(const String & table_name);
StorageID resolveStorageIDUnlocked(StorageID storage_id) const;
StoragePtr executeTableFunction(const ASTPtr & table_expression); StoragePtr executeTableFunction(const ASTPtr & table_expression);

View File

@ -616,7 +616,7 @@ bool InterpreterCreateQuery::doCreateTable(const ASTCreateQuery & create,
throw Exception("Table " + create.database + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS); throw Exception("Table " + create.database + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
} }
} }
else if (context.tryGetExternalTable(table_name) && create.if_not_exists) else if (context.isExternalTableExist(table_name) && create.if_not_exists)
return false; return false;
StoragePtr res; StoragePtr res;

View File

@ -221,7 +221,7 @@ BlockIO InterpreterDropQuery::executeToTemporaryTable(const String & table_name,
} }
else if (kind == ASTDropQuery::Kind::Drop) else if (kind == ASTDropQuery::Kind::Drop)
{ {
context_handle.tryRemoveExternalTable(table_name); context_handle.removeExternalTable(table_name);
table->shutdown(); table->shutdown();
/// If table was already dropped by anyone, an exception will be thrown /// If table was already dropped by anyone, an exception will be thrown
auto table_lock = table->lockExclusively(context.getCurrentQueryId()); auto table_lock = table->lockExclusively(context.getCurrentQueryId());

View File

@ -357,7 +357,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
/// Save the new temporary tables in the query context /// Save the new temporary tables in the query context
for (const auto & it : query_analyzer->getExternalTables()) for (const auto & it : query_analyzer->getExternalTables())
if (!context->tryGetExternalTable(it.first)) if (!context->isExternalTableExist(it.first))
context->addExternalTable(it.first, it.second); context->addExternalTable(it.first, it.second);
} }