mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-11 17:02:25 +00:00
refactor databases: inherit from DatabaseOnDisk
This commit is contained in:
parent
b686738056
commit
14e871535a
@ -10,12 +10,12 @@ DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, const Contex
|
||||
{
|
||||
}
|
||||
|
||||
void DatabaseAtomic::renameTable(const Context & context, const String & table_name, IDatabase & to_database,
|
||||
const String & to_table_name, TableStructureWriteLockHolder & lock)
|
||||
{
|
||||
//TODO
|
||||
DatabaseOnDisk::renameTable<DatabaseAtomic>(*this, context, table_name, to_database, to_table_name, lock);
|
||||
}
|
||||
//void DatabaseAtomic::renameTable(const Context & context, const String & table_name, IDatabase & to_database,
|
||||
// const String & to_table_name, TableStructureWriteLockHolder & lock)
|
||||
//{
|
||||
// //TODO
|
||||
// DatabaseOnDisk::renameTable<DatabaseAtomic>(*this, context, table_name, to_database, to_table_name, lock);
|
||||
//}
|
||||
|
||||
|
||||
}
|
||||
|
@ -15,11 +15,11 @@ public:
|
||||
|
||||
String getEngineName() const override { return "Atomic"; }
|
||||
|
||||
void renameTable(const Context & context,
|
||||
const String & table_name,
|
||||
IDatabase & to_database,
|
||||
const String & to_table_name,
|
||||
TableStructureWriteLockHolder &) override;
|
||||
//void renameTable(const Context & context,
|
||||
// const String & table_name,
|
||||
// IDatabase & to_database,
|
||||
// const String & to_table_name,
|
||||
// TableStructureWriteLockHolder &) override;
|
||||
|
||||
|
||||
};
|
||||
|
@ -22,8 +22,8 @@ namespace ErrorCodes
|
||||
extern const int UNSUPPORTED_METHOD;
|
||||
}
|
||||
|
||||
DatabaseDictionary::DatabaseDictionary(String name_)
|
||||
: IDatabase(std::move(name_)),
|
||||
DatabaseDictionary::DatabaseDictionary(const String & name_)
|
||||
: IDatabase(name_),
|
||||
log(&Logger::get("DatabaseDictionary(" + database_name + ")"))
|
||||
{
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ namespace DB
|
||||
class DatabaseDictionary : public IDatabase
|
||||
{
|
||||
public:
|
||||
DatabaseDictionary(String name_);
|
||||
DatabaseDictionary(const String & name_);
|
||||
|
||||
String getEngineName() const override
|
||||
{
|
||||
|
@ -29,12 +29,9 @@ namespace ErrorCodes
|
||||
|
||||
|
||||
|
||||
DatabaseLazy::DatabaseLazy(String name_, const String & metadata_path_, time_t expiration_time_, const Context & context_)
|
||||
: IDatabase(std::move(name_))
|
||||
, metadata_path(metadata_path_)
|
||||
, data_path("data/" + escapeForFileName(database_name) + "/")
|
||||
DatabaseLazy::DatabaseLazy(const String & name_, const String & metadata_path_, time_t expiration_time_, const Context & context_)
|
||||
: DatabaseOnDisk(name_, metadata_path_, "DatabaseLazy (" + name_ + ")")
|
||||
, expiration_time(expiration_time_)
|
||||
, log(&Logger::get("DatabaseLazy (" + database_name + ")"))
|
||||
{
|
||||
Poco::File(context_.getPath() + getDataPath()).createDirectories();
|
||||
}
|
||||
@ -44,7 +41,7 @@ void DatabaseLazy::loadStoredObjects(
|
||||
Context & context,
|
||||
bool /* has_force_restore_data_flag */)
|
||||
{
|
||||
DatabaseOnDisk::iterateMetadataFiles(*this, log, context, [this](const String & file_name)
|
||||
iterateMetadataFiles(context, [this](const String & file_name)
|
||||
{
|
||||
const std::string table_name = file_name.substr(0, file_name.size() - 4);
|
||||
attachTable(table_name, nullptr);
|
||||
@ -61,13 +58,13 @@ void DatabaseLazy::createTable(
|
||||
SCOPE_EXIT({ clearExpiredTables(); });
|
||||
if (!endsWith(table->getName(), "Log"))
|
||||
throw Exception("Lazy engine can be used only with *Log tables.", ErrorCodes::UNSUPPORTED_METHOD);
|
||||
DatabaseOnDisk::createTable(*this, context, table_name, table, query);
|
||||
DatabaseOnDisk::createTable(context, table_name, table, query);
|
||||
|
||||
/// DatabaseOnDisk::createTable renames file, so we need to get new metadata_modification_time.
|
||||
std::lock_guard lock(tables_mutex);
|
||||
std::lock_guard lock(mutex);
|
||||
auto it = tables_cache.find(table_name);
|
||||
if (it != tables_cache.end())
|
||||
it->second.metadata_modification_time = DatabaseOnDisk::getObjectMetadataModificationTime(*this, table_name);
|
||||
it->second.metadata_modification_time = DatabaseOnDisk::getObjectMetadataModificationTime(table_name);
|
||||
}
|
||||
|
||||
void DatabaseLazy::removeTable(
|
||||
@ -75,7 +72,7 @@ void DatabaseLazy::removeTable(
|
||||
const String & table_name)
|
||||
{
|
||||
SCOPE_EXIT({ clearExpiredTables(); });
|
||||
DatabaseOnDisk::removeTable(*this, context, table_name, log);
|
||||
DatabaseOnDisk::removeTable(context, table_name);
|
||||
}
|
||||
|
||||
void DatabaseLazy::renameTable(
|
||||
@ -86,30 +83,17 @@ void DatabaseLazy::renameTable(
|
||||
TableStructureWriteLockHolder & lock)
|
||||
{
|
||||
SCOPE_EXIT({ clearExpiredTables(); });
|
||||
DatabaseOnDisk::renameTable<DatabaseLazy>(*this, context, table_name, to_database, to_table_name, lock);
|
||||
DatabaseOnDisk::renameTable(context, table_name, to_database, to_table_name, lock);
|
||||
}
|
||||
|
||||
|
||||
time_t DatabaseLazy::getObjectMetadataModificationTime(
|
||||
const Context & /* context */,
|
||||
const String & table_name)
|
||||
time_t DatabaseLazy::getObjectMetadataModificationTime(const String & table_name) const
|
||||
{
|
||||
std::lock_guard lock(tables_mutex);
|
||||
std::lock_guard lock(mutex);
|
||||
auto it = tables_cache.find(table_name);
|
||||
if (it != tables_cache.end())
|
||||
return it->second.metadata_modification_time;
|
||||
else
|
||||
throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
|
||||
}
|
||||
|
||||
ASTPtr DatabaseLazy::getCreateTableQueryImpl(const Context & context, const String & table_name, bool throw_on_error) const
|
||||
{
|
||||
return DatabaseOnDisk::getCreateTableQueryImpl(*this, context, table_name, throw_on_error);
|
||||
}
|
||||
|
||||
ASTPtr DatabaseLazy::getCreateDatabaseQuery(const Context & context) const
|
||||
{
|
||||
return DatabaseOnDisk::getCreateDatabaseQuery(*this, context);
|
||||
throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
|
||||
}
|
||||
|
||||
void DatabaseLazy::alterTable(
|
||||
@ -120,22 +104,17 @@ void DatabaseLazy::alterTable(
|
||||
const ConstraintsDescription & /* constraints */,
|
||||
const ASTModifier & /* storage_modifier */)
|
||||
{
|
||||
//FIXME WTF
|
||||
SCOPE_EXIT({ clearExpiredTables(); });
|
||||
throw Exception("ALTER query is not supported for Lazy database.", ErrorCodes::UNSUPPORTED_METHOD);
|
||||
}
|
||||
|
||||
|
||||
void DatabaseLazy::drop(const Context & context)
|
||||
{
|
||||
DatabaseOnDisk::drop(*this, context);
|
||||
}
|
||||
|
||||
bool DatabaseLazy::isTableExist(
|
||||
const Context & /* context */,
|
||||
const String & table_name) const
|
||||
{
|
||||
SCOPE_EXIT({ clearExpiredTables(); });
|
||||
std::lock_guard lock(tables_mutex);
|
||||
std::lock_guard lock(mutex);
|
||||
return tables_cache.find(table_name) != tables_cache.end();
|
||||
}
|
||||
|
||||
@ -145,7 +124,7 @@ StoragePtr DatabaseLazy::tryGetTable(
|
||||
{
|
||||
SCOPE_EXIT({ clearExpiredTables(); });
|
||||
{
|
||||
std::lock_guard lock(tables_mutex);
|
||||
std::lock_guard lock(mutex);
|
||||
auto it = tables_cache.find(table_name);
|
||||
if (it == tables_cache.end())
|
||||
throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
|
||||
@ -165,7 +144,7 @@ StoragePtr DatabaseLazy::tryGetTable(
|
||||
|
||||
DatabaseTablesIteratorPtr DatabaseLazy::getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name)
|
||||
{
|
||||
std::lock_guard lock(tables_mutex);
|
||||
std::lock_guard lock(mutex);
|
||||
Strings filtered_tables;
|
||||
for (const auto & [table_name, cached_table] : tables_cache)
|
||||
{
|
||||
@ -184,12 +163,12 @@ bool DatabaseLazy::empty(const Context & /* context */) const
|
||||
void DatabaseLazy::attachTable(const String & table_name, const StoragePtr & table)
|
||||
{
|
||||
LOG_DEBUG(log, "Attach table " << backQuote(table_name) << ".");
|
||||
std::lock_guard lock(tables_mutex);
|
||||
std::lock_guard lock(mutex);
|
||||
time_t current_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
|
||||
|
||||
auto [it, inserted] = tables_cache.emplace(std::piecewise_construct,
|
||||
std::forward_as_tuple(table_name),
|
||||
std::forward_as_tuple(table, current_time, DatabaseOnDisk::getObjectMetadataModificationTime(*this, table_name)));
|
||||
std::forward_as_tuple(table, current_time, DatabaseOnDisk::getObjectMetadataModificationTime(table_name)));
|
||||
if (!inserted)
|
||||
throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
|
||||
|
||||
@ -201,7 +180,7 @@ StoragePtr DatabaseLazy::detachTable(const String & table_name)
|
||||
StoragePtr res;
|
||||
{
|
||||
LOG_DEBUG(log, "Detach table " << backQuote(table_name) << ".");
|
||||
std::lock_guard lock(tables_mutex);
|
||||
std::lock_guard lock(mutex);
|
||||
auto it = tables_cache.find(table_name);
|
||||
if (it == tables_cache.end())
|
||||
throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
|
||||
@ -217,7 +196,7 @@ void DatabaseLazy::shutdown()
|
||||
{
|
||||
TablesCache tables_snapshot;
|
||||
{
|
||||
std::lock_guard lock(tables_mutex);
|
||||
std::lock_guard lock(mutex);
|
||||
tables_snapshot = tables_cache;
|
||||
}
|
||||
|
||||
@ -227,7 +206,7 @@ void DatabaseLazy::shutdown()
|
||||
kv.second.table->shutdown();
|
||||
}
|
||||
|
||||
std::lock_guard lock(tables_mutex);
|
||||
std::lock_guard lock(mutex);
|
||||
tables_cache.clear();
|
||||
}
|
||||
|
||||
@ -243,21 +222,6 @@ DatabaseLazy::~DatabaseLazy()
|
||||
}
|
||||
}
|
||||
|
||||
String DatabaseLazy::getDataPath() const
|
||||
{
|
||||
return data_path;
|
||||
}
|
||||
|
||||
String DatabaseLazy::getMetadataPath() const
|
||||
{
|
||||
return metadata_path;
|
||||
}
|
||||
|
||||
String DatabaseLazy::getObjectMetadataPath(const String & table_name) const
|
||||
{
|
||||
return DatabaseOnDisk::getObjectMetadataPath(*this, table_name);
|
||||
}
|
||||
|
||||
StoragePtr DatabaseLazy::loadTable(const Context & context, const String & table_name) const
|
||||
{
|
||||
SCOPE_EXIT({ clearExpiredTables(); });
|
||||
@ -282,7 +246,7 @@ StoragePtr DatabaseLazy::loadTable(const Context & context, const String & table
|
||||
if (!ast || !endsWith(table->getName(), "Log"))
|
||||
throw Exception("Only *Log tables can be used with Lazy database engine.", ErrorCodes::LOGICAL_ERROR);
|
||||
{
|
||||
std::lock_guard lock(tables_mutex);
|
||||
std::lock_guard lock(mutex);
|
||||
auto it = tables_cache.find(table_name);
|
||||
if (it == tables_cache.end())
|
||||
throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
|
||||
@ -305,7 +269,7 @@ StoragePtr DatabaseLazy::loadTable(const Context & context, const String & table
|
||||
|
||||
void DatabaseLazy::clearExpiredTables() const
|
||||
{
|
||||
std::lock_guard lock(tables_mutex);
|
||||
std::lock_guard lock(mutex);
|
||||
auto time_now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
|
||||
|
||||
CacheExpirationQueue expired_tables;
|
||||
|
@ -1,6 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <Databases/DatabasesCommon.h>
|
||||
#include <Databases/DatabaseOnDisk.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Parsers/ASTCreateQuery.h>
|
||||
|
||||
@ -15,10 +15,11 @@ class DatabaseLazyIterator;
|
||||
* Works like DatabaseOrdinary, but stores in memory only cache.
|
||||
* Can be used only with *Log engines.
|
||||
*/
|
||||
class DatabaseLazy : public IDatabase
|
||||
class DatabaseLazy : public DatabaseOnDisk
|
||||
{
|
||||
//TODO rewrite it all
|
||||
public:
|
||||
DatabaseLazy(String name_, const String & metadata_path_, time_t expiration_time_, const Context & context_);
|
||||
DatabaseLazy(const String & name_, const String & metadata_path_, time_t expiration_time_, const Context & context_);
|
||||
|
||||
String getEngineName() const override { return "Lazy"; }
|
||||
|
||||
@ -51,17 +52,7 @@ public:
|
||||
const ConstraintsDescription & constraints,
|
||||
const ASTModifier & engine_modifier) override;
|
||||
|
||||
time_t getObjectMetadataModificationTime(
|
||||
const Context & context,
|
||||
const String & table_name) override;
|
||||
|
||||
ASTPtr getCreateDatabaseQuery(const Context & context) const override;
|
||||
|
||||
String getDataPath() const override;
|
||||
String getMetadataPath() const override;
|
||||
String getObjectMetadataPath(const String & table_name) const override;
|
||||
|
||||
void drop(const Context & context) override;
|
||||
time_t getObjectMetadataModificationTime(const String & table_name) const override;
|
||||
|
||||
bool isTableExist(
|
||||
const Context & context,
|
||||
@ -83,9 +74,6 @@ public:
|
||||
|
||||
~DatabaseLazy() override;
|
||||
|
||||
protected:
|
||||
ASTPtr getCreateTableQueryImpl(const Context & context, const String & table_name, bool throw_on_error) const override;
|
||||
|
||||
private:
|
||||
struct CacheExpirationQueueElement
|
||||
{
|
||||
@ -113,18 +101,11 @@ private:
|
||||
|
||||
using TablesCache = std::unordered_map<String, CachedTable>;
|
||||
|
||||
|
||||
const String metadata_path;
|
||||
const String data_path;
|
||||
|
||||
const time_t expiration_time;
|
||||
|
||||
mutable std::mutex tables_mutex;
|
||||
mutable TablesCache tables_cache;
|
||||
mutable CacheExpirationQueue cache_expiration_queue;
|
||||
|
||||
Poco::Logger * log;
|
||||
|
||||
StoragePtr loadTable(const Context & context, const String & table_name) const;
|
||||
|
||||
void clearExpiredTables() const;
|
||||
|
@ -13,9 +13,8 @@ namespace ErrorCodes
|
||||
extern const int UNSUPPORTED_METHOD;
|
||||
}
|
||||
|
||||
DatabaseMemory::DatabaseMemory(String name_)
|
||||
: DatabaseWithOwnTablesBase(std::move(name_))
|
||||
, log(&Logger::get("DatabaseMemory(" + database_name + ")"))
|
||||
DatabaseMemory::DatabaseMemory(const String & name_)
|
||||
: DatabaseWithOwnTablesBase(name_, "DatabaseMemory(" + name_ + ")")
|
||||
{}
|
||||
|
||||
void DatabaseMemory::createTable(
|
||||
|
@ -17,7 +17,7 @@ namespace DB
|
||||
class DatabaseMemory : public DatabaseWithOwnTablesBase
|
||||
{
|
||||
public:
|
||||
DatabaseMemory(String name_);
|
||||
DatabaseMemory(const String & name_);
|
||||
|
||||
String getEngineName() const override { return "Memory"; }
|
||||
|
||||
@ -32,9 +32,6 @@ public:
|
||||
const String & table_name) override;
|
||||
|
||||
ASTPtr getCreateDatabaseQuery(const Context & context) const override;
|
||||
|
||||
private:
|
||||
Poco::Logger * log;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -112,7 +112,7 @@ ASTPtr DatabaseMySQL::getCreateTableQueryImpl(const Context &, const String & ta
|
||||
return local_tables_cache[table_name].create_table_query;
|
||||
}
|
||||
|
||||
time_t DatabaseMySQL::getObjectMetadataModificationTime(const Context &, const String & table_name)
|
||||
time_t DatabaseMySQL::getObjectMetadataModificationTime(const String & table_name) const
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(mutex);
|
||||
|
||||
|
@ -36,7 +36,7 @@ public:
|
||||
|
||||
StoragePtr tryGetTable(const Context & context, const String & name) const override;
|
||||
|
||||
time_t getObjectMetadataModificationTime(const Context & context, const String & name) override;
|
||||
time_t getObjectMetadataModificationTime(const String & name) const override;
|
||||
|
||||
void shutdown() override;
|
||||
|
||||
|
@ -28,8 +28,6 @@ static constexpr size_t METADATA_FILE_BUFFER_SIZE = 32768;
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_GET_CREATE_TABLE_QUERY;
|
||||
extern const int CANNOT_GET_CREATE_DICTIONARY_QUERY;
|
||||
extern const int FILE_DOESNT_EXIST;
|
||||
extern const int INCORRECT_FILE_NAME;
|
||||
extern const int SYNTAX_ERROR;
|
||||
@ -40,63 +38,6 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
namespace detail
|
||||
{
|
||||
String getObjectMetadataPath(const String & base_path, const String & table_name)
|
||||
{
|
||||
return base_path + (endsWith(base_path, "/") ? "" : "/") + escapeForFileName(table_name) + ".sql";
|
||||
}
|
||||
|
||||
String getDatabaseMetadataPath(const String & base_path)
|
||||
{
|
||||
return (endsWith(base_path, "/") ? base_path.substr(0, base_path.size() - 1) : base_path) + ".sql";
|
||||
}
|
||||
|
||||
ASTPtr getQueryFromMetadata(const String & metadata_path, bool throw_on_error)
|
||||
{
|
||||
String query;
|
||||
|
||||
try
|
||||
{
|
||||
ReadBufferFromFile in(metadata_path, 4096);
|
||||
readStringUntilEOF(query, in);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
if (!throw_on_error && e.code() == ErrorCodes::FILE_DOESNT_EXIST)
|
||||
return nullptr;
|
||||
else
|
||||
throw;
|
||||
}
|
||||
|
||||
ParserCreateQuery parser;
|
||||
const char * pos = query.data();
|
||||
std::string error_message;
|
||||
auto ast = tryParseQuery(parser, pos, pos + query.size(), error_message, /* hilite = */ false,
|
||||
"in file " + metadata_path, /* allow_multi_statements = */ false, 0);
|
||||
|
||||
if (!ast && throw_on_error)
|
||||
throw Exception(error_message, ErrorCodes::SYNTAX_ERROR);
|
||||
|
||||
return ast;
|
||||
}
|
||||
|
||||
ASTPtr getCreateQueryFromMetadata(const String & metadata_path, const String & database, bool throw_on_error)
|
||||
{
|
||||
ASTPtr ast = getQueryFromMetadata(metadata_path, throw_on_error);
|
||||
|
||||
if (ast)
|
||||
{
|
||||
auto & ast_create_query = ast->as<ASTCreateQuery &>();
|
||||
ast_create_query.attach = false;
|
||||
ast_create_query.database = database;
|
||||
}
|
||||
|
||||
return ast;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ASTPtr parseCreateQueryFromMetadataFile(const String & filepath, Poco::Logger * log)
|
||||
{
|
||||
String definition;
|
||||
@ -169,6 +110,7 @@ String getObjectDefinitionFromCreateQuery(const ASTPtr & query)
|
||||
if (!create)
|
||||
{
|
||||
std::ostringstream query_stream;
|
||||
//FIXME WTF
|
||||
formatAST(*create, query_stream, true);
|
||||
throw Exception("Query '" + query_stream.str() + "' is not CREATE query", ErrorCodes::LOGICAL_ERROR);
|
||||
}
|
||||
@ -198,7 +140,6 @@ String getObjectDefinitionFromCreateQuery(const ASTPtr & query)
|
||||
}
|
||||
|
||||
void DatabaseOnDisk::createTable(
|
||||
IDatabase & database,
|
||||
const Context & context,
|
||||
const String & table_name,
|
||||
const StoragePtr & table,
|
||||
@ -218,14 +159,14 @@ void DatabaseOnDisk::createTable(
|
||||
/// A race condition would be possible if a table with the same name is simultaneously created using CREATE and using ATTACH.
|
||||
/// But there is protection from it - see using DDLGuard in InterpreterCreateQuery.
|
||||
|
||||
if (database.isDictionaryExist(context, table_name))
|
||||
throw Exception("Dictionary " + backQuote(database.getDatabaseName()) + "." + backQuote(table_name) + " already exists.",
|
||||
if (isDictionaryExist(context, table_name))
|
||||
throw Exception("Dictionary " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " already exists.",
|
||||
ErrorCodes::DICTIONARY_ALREADY_EXISTS);
|
||||
|
||||
if (database.isTableExist(context, table_name))
|
||||
throw Exception("Table " + backQuote(database.getDatabaseName()) + "." + backQuote(table_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
|
||||
if (isTableExist(context, table_name))
|
||||
throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
|
||||
|
||||
String table_metadata_path = database.getObjectMetadataPath(table_name);
|
||||
String table_metadata_path = getObjectMetadataPath(table_name);
|
||||
String table_metadata_tmp_path = table_metadata_path + ".tmp";
|
||||
String statement;
|
||||
|
||||
@ -244,7 +185,7 @@ void DatabaseOnDisk::createTable(
|
||||
try
|
||||
{
|
||||
/// Add a table to the map of known tables.
|
||||
database.attachTable(table_name, table);
|
||||
attachTable(table_name, table);
|
||||
|
||||
/// If it was ATTACH query and file with table metadata already exist
|
||||
/// (so, ATTACH is done after DETACH), then rename atomically replaces old file with new one.
|
||||
@ -257,79 +198,11 @@ void DatabaseOnDisk::createTable(
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void DatabaseOnDisk::createDictionary(
|
||||
IDatabase & database,
|
||||
const Context & context,
|
||||
const String & dictionary_name,
|
||||
const ASTPtr & query)
|
||||
void DatabaseOnDisk::removeTable(const Context & /* context */, const String & table_name)
|
||||
{
|
||||
const auto & settings = context.getSettingsRef();
|
||||
StoragePtr res = detachTable(table_name);
|
||||
|
||||
/** The code is based on the assumption that all threads share the same order of operations
|
||||
* - creating the .sql.tmp file;
|
||||
* - adding a dictionary to `dictionaries`;
|
||||
* - rename .sql.tmp to .sql.
|
||||
*/
|
||||
|
||||
/// A race condition would be possible if a dictionary with the same name is simultaneously created using CREATE and using ATTACH.
|
||||
/// But there is protection from it - see using DDLGuard in InterpreterCreateQuery.
|
||||
if (database.isDictionaryExist(context, dictionary_name))
|
||||
throw Exception("Dictionary " + backQuote(database.getDatabaseName()) + "." + backQuote(dictionary_name) + " already exists.", ErrorCodes::DICTIONARY_ALREADY_EXISTS);
|
||||
|
||||
if (database.isTableExist(context, dictionary_name))
|
||||
throw Exception("Table " + backQuote(database.getDatabaseName()) + "." + backQuote(dictionary_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
|
||||
|
||||
|
||||
String dictionary_metadata_path = database.getObjectMetadataPath(dictionary_name);
|
||||
String dictionary_metadata_tmp_path = dictionary_metadata_path + ".tmp";
|
||||
String statement;
|
||||
|
||||
{
|
||||
statement = getObjectDefinitionFromCreateQuery(query);
|
||||
|
||||
/// Exclusive flags guarantees, that table is not created right now in another thread. Otherwise, exception will be thrown.
|
||||
WriteBufferFromFile out(dictionary_metadata_tmp_path, statement.size(), O_WRONLY | O_CREAT | O_EXCL);
|
||||
writeString(statement, out);
|
||||
out.next();
|
||||
if (settings.fsync_metadata)
|
||||
out.sync();
|
||||
out.close();
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
/// Do not load it now because we want more strict loading
|
||||
database.attachDictionary(dictionary_name, context, false);
|
||||
/// Load dictionary
|
||||
bool lazy_load = context.getConfigRef().getBool("dictionaries_lazy_load", true);
|
||||
String dict_name = database.getDatabaseName() + "." + dictionary_name;
|
||||
context.getExternalDictionariesLoader().addDictionaryWithConfig(
|
||||
dict_name, database.getDatabaseName(), query->as<const ASTCreateQuery &>(), !lazy_load);
|
||||
|
||||
/// If it was ATTACH query and file with dictionary metadata already exist
|
||||
/// (so, ATTACH is done after DETACH), then rename atomically replaces old file with new one.
|
||||
Poco::File(dictionary_metadata_tmp_path).renameTo(dictionary_metadata_path);
|
||||
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
database.detachDictionary(dictionary_name, context);
|
||||
Poco::File(dictionary_metadata_tmp_path).remove();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void DatabaseOnDisk::removeTable(
|
||||
IDatabase & database,
|
||||
const Context & /* context */,
|
||||
const String & table_name,
|
||||
Poco::Logger * log)
|
||||
{
|
||||
StoragePtr res = database.detachTable(table_name);
|
||||
|
||||
String table_metadata_path = database.getObjectMetadataPath(table_name);
|
||||
String table_metadata_path = getObjectMetadataPath(table_name);
|
||||
|
||||
try
|
||||
{
|
||||
@ -346,49 +219,63 @@ void DatabaseOnDisk::removeTable(
|
||||
{
|
||||
LOG_WARNING(log, getCurrentExceptionMessage(__PRETTY_FUNCTION__));
|
||||
}
|
||||
database.attachTable(table_name, res);
|
||||
attachTable(table_name, res);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void DatabaseOnDisk::removeDictionary(
|
||||
IDatabase & database,
|
||||
const Context & context,
|
||||
const String & dictionary_name,
|
||||
Poco::Logger * /*log*/)
|
||||
void DatabaseOnDisk::renameTable(
|
||||
const Context & context,
|
||||
const String & table_name,
|
||||
IDatabase & to_database,
|
||||
const String & to_table_name,
|
||||
TableStructureWriteLockHolder & lock)
|
||||
{
|
||||
database.detachDictionary(dictionary_name, context);
|
||||
if (typeid(*this) != typeid(to_database))
|
||||
throw Exception("Moving tables between databases of different engines is not supported", ErrorCodes::NOT_IMPLEMENTED);
|
||||
|
||||
String dictionary_metadata_path = database.getObjectMetadataPath(dictionary_name);
|
||||
StoragePtr table = tryGetTable(context, table_name);
|
||||
|
||||
if (!table)
|
||||
throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
|
||||
|
||||
/// Notify the table that it is renamed. If the table does not support renaming, exception is thrown.
|
||||
try
|
||||
{
|
||||
Poco::File(dictionary_metadata_path).remove();
|
||||
table->rename(context.getPath() + "/data/" + escapeForFileName(to_database.getDatabaseName()) + "/",
|
||||
to_database.getDatabaseName(),
|
||||
to_table_name, lock);
|
||||
}
|
||||
catch (...)
|
||||
catch (const Exception &)
|
||||
{
|
||||
/// If remove was not possible for some reason
|
||||
database.attachDictionary(dictionary_name, context);
|
||||
throw;
|
||||
}
|
||||
catch (const Poco::Exception & e)
|
||||
{
|
||||
/// Better diagnostics.
|
||||
throw Exception{Exception::CreateFromPoco, e};
|
||||
}
|
||||
|
||||
ASTPtr ast = getQueryFromMetadata(getObjectMetadataPath(table_name));
|
||||
if (!ast)
|
||||
throw Exception("There is no metadata file for table " + backQuote(table_name) + ".", ErrorCodes::FILE_DOESNT_EXIST);
|
||||
ast->as<ASTCreateQuery &>().table = to_table_name;
|
||||
|
||||
/// NOTE Non-atomic.
|
||||
to_database.createTable(context, to_table_name, table, ast);
|
||||
removeTable(context, table_name);
|
||||
}
|
||||
|
||||
|
||||
ASTPtr DatabaseOnDisk::getCreateTableQueryImpl(
|
||||
const IDatabase & database,
|
||||
const Context & context,
|
||||
const String & table_name,
|
||||
bool throw_on_error)
|
||||
ASTPtr DatabaseOnDisk::getCreateTableQueryImpl(const Context & context, const String & table_name, bool throw_on_error) const
|
||||
{
|
||||
ASTPtr ast;
|
||||
|
||||
auto table_metadata_path = detail::getObjectMetadataPath(database.getMetadataPath(), table_name);
|
||||
ast = detail::getCreateQueryFromMetadata(table_metadata_path, database.getDatabaseName(), throw_on_error);
|
||||
auto table_metadata_path = getObjectMetadataPath(table_name);
|
||||
ast = getCreateQueryFromMetadata(table_metadata_path, throw_on_error);
|
||||
if (!ast && throw_on_error)
|
||||
{
|
||||
/// Handle system.* tables for which there are no table.sql files.
|
||||
bool has_table = database.tryGetTable(context, table_name) != nullptr;
|
||||
bool has_table = tryGetTable(context, table_name) != nullptr;
|
||||
|
||||
auto msg = has_table
|
||||
? "There is no CREATE TABLE query for table "
|
||||
@ -400,40 +287,17 @@ ASTPtr DatabaseOnDisk::getCreateTableQueryImpl(
|
||||
return ast;
|
||||
}
|
||||
|
||||
|
||||
ASTPtr DatabaseOnDisk::getCreateDictionaryQueryImpl(
|
||||
const IDatabase & database,
|
||||
const Context & context,
|
||||
const String & dictionary_name,
|
||||
bool throw_on_error)
|
||||
ASTPtr DatabaseOnDisk::getCreateDatabaseQuery(const Context & /*context*/) const
|
||||
{
|
||||
ASTPtr ast;
|
||||
|
||||
auto dictionary_metadata_path = detail::getObjectMetadataPath(database.getMetadataPath(), dictionary_name);
|
||||
ast = detail::getCreateQueryFromMetadata(dictionary_metadata_path, database.getDatabaseName(), throw_on_error);
|
||||
if (!ast && throw_on_error)
|
||||
{
|
||||
/// Handle system.* tables for which there are no table.sql files.
|
||||
bool has_dictionary = database.isDictionaryExist(context, dictionary_name);
|
||||
|
||||
auto msg = has_dictionary ? "There is no CREATE DICTIONARY query for table " : "There is no metadata file for dictionary ";
|
||||
|
||||
throw Exception(msg + backQuote(dictionary_name), ErrorCodes::CANNOT_GET_CREATE_DICTIONARY_QUERY);
|
||||
}
|
||||
|
||||
return ast;
|
||||
}
|
||||
|
||||
ASTPtr DatabaseOnDisk::getCreateDatabaseQuery(const IDatabase & database, const Context & /*context*/)
|
||||
{
|
||||
ASTPtr ast;
|
||||
|
||||
auto database_metadata_path = detail::getDatabaseMetadataPath(database.getMetadataPath());
|
||||
ast = detail::getCreateQueryFromMetadata(database_metadata_path, database.getDatabaseName(), true);
|
||||
auto database_metadata_path = getDatabaseMetadataPath(getMetadataPath());
|
||||
ast = getCreateQueryFromMetadata(database_metadata_path, true);
|
||||
if (!ast)
|
||||
{
|
||||
/// Handle databases (such as default) for which there are no database.sql files.
|
||||
String query = "CREATE DATABASE " + backQuoteIfNeed(database.getDatabaseName()) + " ENGINE = Lazy";
|
||||
//FIXME WTF
|
||||
String query = "CREATE DATABASE " + backQuoteIfNeed(getDatabaseName()) + " ENGINE = Lazy";
|
||||
ParserCreateQuery parser;
|
||||
ast = parseQuery(parser, query.data(), query.data() + query.size(), "", 0);
|
||||
}
|
||||
@ -441,22 +305,22 @@ ASTPtr DatabaseOnDisk::getCreateDatabaseQuery(const IDatabase & database, const
|
||||
return ast;
|
||||
}
|
||||
|
||||
void DatabaseOnDisk::drop(const IDatabase & database, const Context & context)
|
||||
void DatabaseOnDisk::drop(const Context & context)
|
||||
{
|
||||
Poco::File(context.getPath() + database.getDataPath()).remove(false);
|
||||
Poco::File(database.getMetadataPath()).remove(false);
|
||||
Poco::File(context.getPath() + getDataPath()).remove(false);
|
||||
Poco::File(getMetadataPath()).remove(false);
|
||||
}
|
||||
|
||||
String DatabaseOnDisk::getObjectMetadataPath(const IDatabase & database, const String & table_name)
|
||||
String DatabaseOnDisk::getObjectMetadataPath(const String & table_name) const
|
||||
{
|
||||
return detail::getObjectMetadataPath(database.getMetadataPath(), table_name);
|
||||
String base_path = getMetadataPath();
|
||||
//FIXME
|
||||
return base_path + (endsWith(base_path, "/") ? "" : "/") + escapeForFileName(table_name) + ".sql";
|
||||
}
|
||||
|
||||
time_t DatabaseOnDisk::getObjectMetadataModificationTime(
|
||||
const IDatabase & database,
|
||||
const String & table_name)
|
||||
time_t DatabaseOnDisk::getObjectMetadataModificationTime(const String & table_name) const
|
||||
{
|
||||
String table_metadata_path = getObjectMetadataPath(database, table_name);
|
||||
String table_metadata_path = getObjectMetadataPath(table_name);
|
||||
Poco::File meta_file(table_metadata_path);
|
||||
|
||||
if (meta_file.exists())
|
||||
@ -465,10 +329,10 @@ time_t DatabaseOnDisk::getObjectMetadataModificationTime(
|
||||
return static_cast<time_t>(0);
|
||||
}
|
||||
|
||||
void DatabaseOnDisk::iterateMetadataFiles(const IDatabase & database, Poco::Logger * log, const Context & context, const IteratingFunction & iterating_function)
|
||||
void DatabaseOnDisk::iterateMetadataFiles(const Context & context, const IteratingFunction & iterating_function) const
|
||||
{
|
||||
Poco::DirectoryIterator dir_end;
|
||||
for (Poco::DirectoryIterator dir_it(database.getMetadataPath()); dir_it != dir_end; ++dir_it)
|
||||
for (Poco::DirectoryIterator dir_it(getMetadataPath()); dir_it != dir_end; ++dir_it)
|
||||
{
|
||||
/// For '.svn', '.gitignore' directory and similar.
|
||||
if (dir_it.name().at(0) == '.')
|
||||
@ -483,7 +347,7 @@ void DatabaseOnDisk::iterateMetadataFiles(const IDatabase & database, Poco::Logg
|
||||
if (endsWith(dir_it.name(), tmp_drop_ext))
|
||||
{
|
||||
const std::string object_name = dir_it.name().substr(0, dir_it.name().size() - strlen(tmp_drop_ext));
|
||||
if (Poco::File(context.getPath() + database.getDataPath() + '/' + object_name).exists())
|
||||
if (Poco::File(context.getPath() + getDataPath() + '/' + object_name).exists())
|
||||
{
|
||||
Poco::File(dir_it->path()).renameTo(object_name + ".sql");
|
||||
LOG_WARNING(log, "Object " << backQuote(object_name) << " was not dropped previously");
|
||||
@ -510,9 +374,58 @@ void DatabaseOnDisk::iterateMetadataFiles(const IDatabase & database, Poco::Logg
|
||||
iterating_function(dir_it.name());
|
||||
}
|
||||
else
|
||||
throw Exception("Incorrect file extension: " + dir_it.name() + " in metadata directory " + database.getMetadataPath(),
|
||||
throw Exception("Incorrect file extension: " + dir_it.name() + " in metadata directory " + getMetadataPath(),
|
||||
ErrorCodes::INCORRECT_FILE_NAME);
|
||||
}
|
||||
}
|
||||
|
||||
String DatabaseOnDisk::getDatabaseMetadataPath(const String & base_path) const
|
||||
{
|
||||
//FIXME
|
||||
return (endsWith(base_path, "/") ? base_path.substr(0, base_path.size() - 1) : base_path) + ".sql";
|
||||
}
|
||||
|
||||
ASTPtr DatabaseOnDisk::getQueryFromMetadata(const String & database_metadata_path, bool throw_on_error) const
|
||||
{
|
||||
String query;
|
||||
|
||||
try
|
||||
{
|
||||
ReadBufferFromFile in(database_metadata_path, 4096);
|
||||
readStringUntilEOF(query, in);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
if (!throw_on_error && e.code() == ErrorCodes::FILE_DOESNT_EXIST)
|
||||
return nullptr;
|
||||
else
|
||||
throw;
|
||||
}
|
||||
|
||||
ParserCreateQuery parser;
|
||||
const char * pos = query.data();
|
||||
std::string error_message;
|
||||
auto ast = tryParseQuery(parser, pos, pos + query.size(), error_message, /* hilite = */ false,
|
||||
"in file " + metadata_path, /* allow_multi_statements = */ false, 0);
|
||||
|
||||
if (!ast && throw_on_error)
|
||||
throw Exception(error_message, ErrorCodes::SYNTAX_ERROR);
|
||||
|
||||
return ast;
|
||||
}
|
||||
|
||||
ASTPtr DatabaseOnDisk::getCreateQueryFromMetadata(const String & database_metadata_path, bool throw_on_error) const
|
||||
{
|
||||
ASTPtr ast = getQueryFromMetadata(database_metadata_path, throw_on_error);
|
||||
|
||||
if (ast)
|
||||
{
|
||||
auto & ast_create_query = ast->as<ASTCreateQuery &>();
|
||||
ast_create_query.attach = false;
|
||||
ast_create_query.database = database_name;
|
||||
}
|
||||
|
||||
return ast;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -11,14 +11,6 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace detail
|
||||
{
|
||||
String getObjectMetadataPath(const String & base_path, const String & dictionary_name);
|
||||
String getDatabaseMetadataPath(const String & base_path);
|
||||
ASTPtr getQueryFromMetadata(const String & metadata_path, bool throw_on_error = true);
|
||||
ASTPtr getCreateQueryFromMetadata(const String & metadata_path, const String & database, bool throw_on_error);
|
||||
}
|
||||
|
||||
ASTPtr parseCreateQueryFromMetadataFile(const String & filepath, Poco::Logger * log);
|
||||
|
||||
std::pair<String, StoragePtr> createTableFromAST(
|
||||
@ -37,126 +29,58 @@ String getObjectDefinitionFromCreateQuery(const ASTPtr & query);
|
||||
|
||||
/* Class to provide basic operations with tables when metadata is stored on disk in .sql files.
|
||||
*/
|
||||
class DatabaseOnDisk
|
||||
class DatabaseOnDisk : public DatabaseWithOwnTablesBase
|
||||
{
|
||||
public:
|
||||
static void createTable(
|
||||
IDatabase & database,
|
||||
DatabaseOnDisk(const String & name, const String & metadata_path_, const String & logger)
|
||||
: DatabaseWithOwnTablesBase(name, logger)
|
||||
, metadata_path(metadata_path_)
|
||||
, data_path("data/" + escapeForFileName(database_name) + "/") {}
|
||||
|
||||
void createTable(
|
||||
const Context & context,
|
||||
const String & table_name,
|
||||
const StoragePtr & table,
|
||||
const ASTPtr & query);
|
||||
const ASTPtr & query) override;
|
||||
|
||||
static void createDictionary(
|
||||
IDatabase & database,
|
||||
void removeTable(
|
||||
const Context & context,
|
||||
const String & dictionary_name,
|
||||
const ASTPtr & query);
|
||||
const String & table_name) override;
|
||||
|
||||
static void removeTable(
|
||||
IDatabase & database,
|
||||
const Context & context,
|
||||
const String & table_name,
|
||||
Poco::Logger * log);
|
||||
|
||||
static void removeDictionary(
|
||||
IDatabase & database,
|
||||
const Context & context,
|
||||
const String & dictionary_name,
|
||||
Poco::Logger * log);
|
||||
|
||||
template <typename Database>
|
||||
static void renameTable(
|
||||
IDatabase & database,
|
||||
void renameTable(
|
||||
const Context & context,
|
||||
const String & table_name,
|
||||
IDatabase & to_database,
|
||||
const String & to_table_name,
|
||||
TableStructureWriteLockHolder & lock);
|
||||
TableStructureWriteLockHolder & lock) override;
|
||||
|
||||
static ASTPtr getCreateDatabaseQuery(
|
||||
const IDatabase & database,
|
||||
const Context & context);
|
||||
ASTPtr getCreateDatabaseQuery(const Context & context) const override;
|
||||
|
||||
static void drop(const IDatabase & database, const Context & context);
|
||||
void drop(const Context & context) override;
|
||||
|
||||
static String getObjectMetadataPath(
|
||||
const IDatabase & database,
|
||||
const String & object_name);
|
||||
String getObjectMetadataPath(const String & object_name) const override;
|
||||
|
||||
static time_t getObjectMetadataModificationTime(
|
||||
const IDatabase & database,
|
||||
const String & object_name);
|
||||
time_t getObjectMetadataModificationTime(const String & object_name) const override;
|
||||
|
||||
String getDataPath() const override { return data_path; }
|
||||
String getMetadataPath() const override { return metadata_path; }
|
||||
|
||||
protected:
|
||||
using IteratingFunction = std::function<void(const String &)>;
|
||||
static void iterateMetadataFiles(const IDatabase & database, Poco::Logger * log, const Context & context, const IteratingFunction & iterating_function);
|
||||
void iterateMetadataFiles(const Context & context, const IteratingFunction & iterating_function) const;
|
||||
|
||||
static ASTPtr getCreateTableQueryImpl(
|
||||
const IDatabase & database,
|
||||
ASTPtr getCreateTableQueryImpl(
|
||||
const Context & context,
|
||||
const String & table_name,
|
||||
bool throw_on_error);
|
||||
bool throw_on_error) const override;
|
||||
|
||||
static ASTPtr getCreateDictionaryQueryImpl(
|
||||
const IDatabase & database,
|
||||
const Context & context,
|
||||
const String & dictionary_name,
|
||||
bool throw_on_error);
|
||||
String getDatabaseMetadataPath(const String & base_path) const;
|
||||
ASTPtr getQueryFromMetadata(const String & metadata_path, bool throw_on_error = true) const;
|
||||
ASTPtr getCreateQueryFromMetadata(const String & metadata_path, bool throw_on_error) const;
|
||||
|
||||
|
||||
const String metadata_path;
|
||||
const String data_path;
|
||||
};
|
||||
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int UNKNOWN_TABLE;
|
||||
extern const int FILE_DOESNT_EXIST;
|
||||
}
|
||||
|
||||
template <typename Database>
|
||||
void DatabaseOnDisk::renameTable(
|
||||
IDatabase & database,
|
||||
const Context & context,
|
||||
const String & table_name,
|
||||
IDatabase & to_database,
|
||||
const String & to_table_name,
|
||||
TableStructureWriteLockHolder & lock)
|
||||
{
|
||||
Database * to_database_concrete = typeid_cast<Database *>(&to_database);
|
||||
|
||||
if (!to_database_concrete)
|
||||
throw Exception("Moving tables between databases of different engines is not supported", ErrorCodes::NOT_IMPLEMENTED);
|
||||
|
||||
StoragePtr table = database.tryGetTable(context, table_name);
|
||||
|
||||
if (!table)
|
||||
throw Exception("Table " + backQuote(database.getDatabaseName()) + "." + backQuote(table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
|
||||
|
||||
/// Notify the table that it is renamed. If the table does not support renaming, exception is thrown.
|
||||
try
|
||||
{
|
||||
table->rename(context.getPath() + "/data/" + escapeForFileName(to_database_concrete->getDatabaseName()) + "/",
|
||||
to_database_concrete->getDatabaseName(),
|
||||
to_table_name, lock);
|
||||
}
|
||||
catch (const Exception &)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
catch (const Poco::Exception & e)
|
||||
{
|
||||
/// Better diagnostics.
|
||||
throw Exception{Exception::CreateFromPoco, e};
|
||||
}
|
||||
|
||||
ASTPtr ast = detail::getQueryFromMetadata(detail::getObjectMetadataPath(database.getMetadataPath(), table_name));
|
||||
if (!ast)
|
||||
throw Exception("There is no metadata file for table " + backQuote(table_name) + ".", ErrorCodes::FILE_DOESNT_EXIST);
|
||||
ast->as<ASTCreateQuery &>().table = to_table_name;
|
||||
|
||||
/// NOTE Non-atomic.
|
||||
to_database_concrete->createTable(context, to_table_name, table, ast);
|
||||
database.removeTable(context, table_name);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -35,10 +35,8 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_CREATE_TABLE_FROM_METADATA;
|
||||
extern const int CANNOT_CREATE_DICTIONARY_FROM_METADATA;
|
||||
extern const int EMPTY_LIST_OF_COLUMNS_PASSED;
|
||||
extern const int CANNOT_PARSE_TEXT;
|
||||
extern const int EMPTY_LIST_OF_ATTRIBUTES_PASSED;
|
||||
}
|
||||
|
||||
|
||||
@ -92,11 +90,8 @@ void logAboutProgress(Poco::Logger * log, size_t processed, size_t total, Atomic
|
||||
}
|
||||
|
||||
|
||||
DatabaseOrdinary::DatabaseOrdinary(String name_, const String & metadata_path_, const Context & context_)
|
||||
: DatabaseWithDictionaries(std::move(name_))
|
||||
, metadata_path(metadata_path_)
|
||||
, data_path("data/" + escapeForFileName(database_name) + "/")
|
||||
, log(&Logger::get("DatabaseOrdinary (" + database_name + ")"))
|
||||
DatabaseOrdinary::DatabaseOrdinary(const String & name_, const String & metadata_path_, const Context & context_)
|
||||
: DatabaseWithDictionaries(name_, metadata_path_,"DatabaseOrdinary (" + name_ + ")")
|
||||
{
|
||||
Poco::File(context_.getPath() + getDataPath()).createDirectories();
|
||||
}
|
||||
@ -114,7 +109,7 @@ void DatabaseOrdinary::loadStoredObjects(
|
||||
FileNames file_names;
|
||||
|
||||
size_t total_dictionaries = 0;
|
||||
DatabaseOnDisk::iterateMetadataFiles(*this, log, context, [&file_names, &total_dictionaries, this](const String & file_name)
|
||||
iterateMetadataFiles(context, [&file_names, &total_dictionaries, this](const String & file_name)
|
||||
{
|
||||
String full_path = metadata_path + "/" + file_name;
|
||||
try
|
||||
@ -206,70 +201,6 @@ void DatabaseOrdinary::startupTables(ThreadPool & thread_pool)
|
||||
thread_pool.wait();
|
||||
}
|
||||
|
||||
void DatabaseOrdinary::createTable(
|
||||
const Context & context,
|
||||
const String & table_name,
|
||||
const StoragePtr & table,
|
||||
const ASTPtr & query)
|
||||
{
|
||||
DatabaseOnDisk::createTable(*this, context, table_name, table, query);
|
||||
}
|
||||
|
||||
void DatabaseOrdinary::createDictionary(
|
||||
const Context & context,
|
||||
const String & dictionary_name,
|
||||
const ASTPtr & query)
|
||||
{
|
||||
DatabaseOnDisk::createDictionary(*this, context, dictionary_name, query);
|
||||
}
|
||||
|
||||
void DatabaseOrdinary::removeTable(
|
||||
const Context & context,
|
||||
const String & table_name)
|
||||
{
|
||||
DatabaseOnDisk::removeTable(*this, context, table_name, log);
|
||||
}
|
||||
|
||||
void DatabaseOrdinary::removeDictionary(
|
||||
const Context & context,
|
||||
const String & table_name)
|
||||
{
|
||||
DatabaseOnDisk::removeDictionary(*this, context, table_name, log);
|
||||
}
|
||||
|
||||
void DatabaseOrdinary::renameTable(
|
||||
const Context & context,
|
||||
const String & table_name,
|
||||
IDatabase & to_database,
|
||||
const String & to_table_name,
|
||||
TableStructureWriteLockHolder & lock)
|
||||
{
|
||||
DatabaseOnDisk::renameTable<DatabaseOrdinary>(*this, context, table_name, to_database, to_table_name, lock);
|
||||
}
|
||||
|
||||
|
||||
time_t DatabaseOrdinary::getObjectMetadataModificationTime(
|
||||
const Context & /* context */,
|
||||
const String & table_name)
|
||||
{
|
||||
return DatabaseOnDisk::getObjectMetadataModificationTime(*this, table_name);
|
||||
}
|
||||
|
||||
ASTPtr DatabaseOrdinary::getCreateTableQueryImpl(const Context & context, const String & table_name, bool throw_on_error) const
|
||||
{
|
||||
return DatabaseOnDisk::getCreateTableQueryImpl(*this, context, table_name, throw_on_error);
|
||||
}
|
||||
|
||||
ASTPtr DatabaseOrdinary::getCreateDictionaryQueryImpl(const Context & context, const String & dictionary_name, bool throw_on_error) const
|
||||
{
|
||||
return DatabaseOnDisk::getCreateDictionaryQueryImpl(*this, context, dictionary_name, throw_on_error);
|
||||
}
|
||||
|
||||
ASTPtr DatabaseOrdinary::getCreateDatabaseQuery(const Context & context) const
|
||||
{
|
||||
return DatabaseOnDisk::getCreateDatabaseQuery(*this, context);
|
||||
}
|
||||
|
||||
void DatabaseOrdinary::alterTable(
|
||||
const Context & context,
|
||||
const String & table_name,
|
||||
@ -330,26 +261,4 @@ void DatabaseOrdinary::alterTable(
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void DatabaseOrdinary::drop(const Context & context)
|
||||
{
|
||||
DatabaseOnDisk::drop(*this, context);
|
||||
}
|
||||
|
||||
|
||||
String DatabaseOrdinary::getDataPath() const
|
||||
{
|
||||
return data_path;
|
||||
}
|
||||
|
||||
String DatabaseOrdinary::getMetadataPath() const
|
||||
{
|
||||
return metadata_path;
|
||||
}
|
||||
|
||||
String DatabaseOrdinary::getObjectMetadataPath(const String & table_name) const
|
||||
{
|
||||
return DatabaseOnDisk::getObjectMetadataPath(*this, table_name);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -14,7 +14,7 @@ namespace DB
|
||||
class DatabaseOrdinary : public DatabaseWithDictionaries //DatabaseWithOwnTablesBase
|
||||
{
|
||||
public:
|
||||
DatabaseOrdinary(String name_, const String & metadata_path_, const Context & context);
|
||||
DatabaseOrdinary(const String & name_, const String & metadata_path_, const Context & context);
|
||||
|
||||
String getEngineName() const override { return "Ordinary"; }
|
||||
|
||||
@ -22,32 +22,6 @@ public:
|
||||
Context & context,
|
||||
bool has_force_restore_data_flag) override;
|
||||
|
||||
void createTable(
|
||||
const Context & context,
|
||||
const String & table_name,
|
||||
const StoragePtr & table,
|
||||
const ASTPtr & query) override;
|
||||
|
||||
void createDictionary(
|
||||
const Context & context,
|
||||
const String & dictionary_name,
|
||||
const ASTPtr & query) override;
|
||||
|
||||
void removeTable(
|
||||
const Context & context,
|
||||
const String & table_name) override;
|
||||
|
||||
void removeDictionary(
|
||||
const Context & context,
|
||||
const String & table_name) override;
|
||||
|
||||
void renameTable(
|
||||
const Context & context,
|
||||
const String & table_name,
|
||||
IDatabase & to_database,
|
||||
const String & to_table_name,
|
||||
TableStructureWriteLockHolder &) override;
|
||||
|
||||
void alterTable(
|
||||
const Context & context,
|
||||
const String & name,
|
||||
@ -56,29 +30,7 @@ public:
|
||||
const ConstraintsDescription & constraints,
|
||||
const ASTModifier & engine_modifier) override;
|
||||
|
||||
time_t getObjectMetadataModificationTime(
|
||||
const Context & context,
|
||||
const String & table_name) override;
|
||||
|
||||
ASTPtr getCreateDatabaseQuery(const Context & context) const override;
|
||||
|
||||
String getDataPath() const override;
|
||||
String getMetadataPath() const override;
|
||||
String getObjectMetadataPath(const String & table_name) const override;
|
||||
|
||||
void drop(const Context & context) override;
|
||||
|
||||
protected:
|
||||
ASTPtr getCreateTableQueryImpl(const Context & context, const String & table_name,
|
||||
bool throw_on_error) const override;
|
||||
|
||||
ASTPtr getCreateDictionaryQueryImpl(const Context & context, const String & name,
|
||||
bool throw_on_error) const override;
|
||||
|
||||
private:
|
||||
const String metadata_path;
|
||||
const String data_path;
|
||||
Poco::Logger * log;
|
||||
|
||||
void startupTables(ThreadPool & thread_pool);
|
||||
};
|
||||
|
@ -2,6 +2,8 @@
|
||||
#include <Interpreters/ExternalDictionariesLoader.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Storages/StorageDictionary.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <Poco/File.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -17,7 +19,7 @@ namespace ErrorCodes
|
||||
}
|
||||
|
||||
|
||||
void DatabaseWithDictionaries::attachDictionary(const String & dictionary_name, const Context & context, bool load)
|
||||
void DatabaseWithDictionaries::attachDictionary(const String & dictionary_name, const Context & context, bool reload)
|
||||
{
|
||||
const auto & external_loader = context.getExternalDictionariesLoader();
|
||||
|
||||
@ -31,7 +33,7 @@ void DatabaseWithDictionaries::attachDictionary(const String & dictionary_name,
|
||||
ErrorCodes::DICTIONARY_ALREADY_EXISTS);
|
||||
}
|
||||
|
||||
if (load)
|
||||
if (reload)
|
||||
external_loader.reload(full_name, true);
|
||||
}
|
||||
|
||||
@ -50,6 +52,82 @@ void DatabaseWithDictionaries::detachDictionary(const String & dictionary_name,
|
||||
|
||||
}
|
||||
|
||||
void DatabaseWithDictionaries::createDictionary( const Context & context, const String & dictionary_name, const ASTPtr & query)
|
||||
{
|
||||
const auto & settings = context.getSettingsRef();
|
||||
|
||||
/** The code is based on the assumption that all threads share the same order of operations
|
||||
* - creating the .sql.tmp file;
|
||||
* - adding a dictionary to `dictionaries`;
|
||||
* - rename .sql.tmp to .sql.
|
||||
*/
|
||||
|
||||
/// A race condition would be possible if a dictionary with the same name is simultaneously created using CREATE and using ATTACH.
|
||||
/// But there is protection from it - see using DDLGuard in InterpreterCreateQuery.
|
||||
if (isDictionaryExist(context, dictionary_name))
|
||||
throw Exception("Dictionary " + backQuote(getDatabaseName()) + "." + backQuote(dictionary_name) + " already exists.", ErrorCodes::DICTIONARY_ALREADY_EXISTS);
|
||||
|
||||
if (isTableExist(context, dictionary_name))
|
||||
throw Exception("Table " + backQuote(getDatabaseName()) + "." + backQuote(dictionary_name) + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
|
||||
|
||||
|
||||
String dictionary_metadata_path = getObjectMetadataPath(dictionary_name);
|
||||
String dictionary_metadata_tmp_path = dictionary_metadata_path + ".tmp";
|
||||
String statement;
|
||||
|
||||
{
|
||||
statement = getObjectDefinitionFromCreateQuery(query);
|
||||
|
||||
/// Exclusive flags guarantees, that table is not created right now in another thread. Otherwise, exception will be thrown.
|
||||
WriteBufferFromFile out(dictionary_metadata_tmp_path, statement.size(), O_WRONLY | O_CREAT | O_EXCL);
|
||||
writeString(statement, out);
|
||||
out.next();
|
||||
if (settings.fsync_metadata)
|
||||
out.sync();
|
||||
out.close();
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
/// Do not load it now because we want more strict loading
|
||||
attachDictionary(dictionary_name, context, false);
|
||||
/// Load dictionary
|
||||
bool lazy_load = context.getConfigRef().getBool("dictionaries_lazy_load", true);
|
||||
String dict_name = getDatabaseName() + "." + dictionary_name;
|
||||
context.getExternalDictionariesLoader().addDictionaryWithConfig(
|
||||
dict_name, getDatabaseName(), query->as<const ASTCreateQuery &>(), !lazy_load);
|
||||
|
||||
/// If it was ATTACH query and file with dictionary metadata already exist
|
||||
/// (so, ATTACH is done after DETACH), then rename atomically replaces old file with new one.
|
||||
Poco::File(dictionary_metadata_tmp_path).renameTo(dictionary_metadata_path);
|
||||
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
detachDictionary(dictionary_name, context);
|
||||
Poco::File(dictionary_metadata_tmp_path).remove();
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void DatabaseWithDictionaries::removeDictionary(const Context & context, const String & dictionary_name)
|
||||
{
|
||||
detachDictionary(dictionary_name, context);
|
||||
|
||||
String dictionary_metadata_path = getObjectMetadataPath(dictionary_name);
|
||||
|
||||
try
|
||||
{
|
||||
Poco::File(dictionary_metadata_path).remove();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
/// If remove was not possible for some reason
|
||||
attachDictionary(dictionary_name, context);
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
StoragePtr DatabaseWithDictionaries::tryGetTable(const Context & context, const String & table_name) const
|
||||
{
|
||||
if (auto table_ptr = DatabaseWithOwnTablesBase::tryGetTable(context, table_name))
|
||||
@ -121,4 +199,26 @@ StoragePtr DatabaseWithDictionaries::getDictionaryStorage(const Context & contex
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
ASTPtr DatabaseWithDictionaries::getCreateDictionaryQueryImpl(
|
||||
const Context & context,
|
||||
const String & dictionary_name,
|
||||
bool throw_on_error) const
|
||||
{
|
||||
ASTPtr ast;
|
||||
|
||||
auto dictionary_metadata_path = getObjectMetadataPath(dictionary_name);
|
||||
ast = getCreateQueryFromMetadata(dictionary_metadata_path, throw_on_error);
|
||||
if (!ast && throw_on_error)
|
||||
{
|
||||
/// Handle system.* tables for which there are no table.sql files.
|
||||
bool has_dictionary = isDictionaryExist(context, dictionary_name);
|
||||
|
||||
auto msg = has_dictionary ? "There is no CREATE DICTIONARY query for table " : "There is no metadata file for dictionary ";
|
||||
|
||||
throw Exception(msg + backQuote(dictionary_name), ErrorCodes::CANNOT_GET_CREATE_DICTIONARY_QUERY);
|
||||
}
|
||||
|
||||
return ast;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,15 +1,21 @@
|
||||
#include <Databases/DatabasesCommon.h>
|
||||
#include <Databases/DatabaseOnDisk.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
||||
class DatabaseWithDictionaries : public DatabaseWithOwnTablesBase
|
||||
class DatabaseWithDictionaries : public DatabaseOnDisk
|
||||
{
|
||||
public:
|
||||
void attachDictionary(const String & name, const Context & context, bool reload) override;
|
||||
void attachDictionary(const String & name, const Context & context, bool reload = true) override;
|
||||
|
||||
void detachDictionary(const String & name, const Context & context, bool reload) override;
|
||||
void detachDictionary(const String & name, const Context & context, bool reload = true) override;
|
||||
|
||||
void createDictionary(const Context & context,
|
||||
const String & dictionary_name,
|
||||
const ASTPtr & query) override;
|
||||
|
||||
void removeDictionary(const Context & context, const String & dictionary_name) override;
|
||||
|
||||
StoragePtr tryGetTable(const Context & context, const String & table_name) const override;
|
||||
|
||||
@ -20,10 +26,14 @@ public:
|
||||
bool isDictionaryExist(const Context & context, const String & dictionary_name) const override;
|
||||
|
||||
protected:
|
||||
DatabaseWithDictionaries(String name) : DatabaseWithOwnTablesBase(std::move(name)) {}
|
||||
DatabaseWithDictionaries(const String & name, const String & metadata_path_, const String & logger)
|
||||
: DatabaseOnDisk(name, metadata_path_, logger) {}
|
||||
|
||||
StoragePtr getDictionaryStorage(const Context & context, const String & table_name) const;
|
||||
|
||||
ASTPtr getCreateDictionaryQueryImpl(const Context & context,
|
||||
const String & dictionary_name,
|
||||
bool throw_on_error) const override;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -21,6 +21,11 @@ namespace ErrorCodes
|
||||
extern const int DICTIONARY_ALREADY_EXISTS;
|
||||
}
|
||||
|
||||
DatabaseWithOwnTablesBase::DatabaseWithOwnTablesBase(const String & name_, const String & logger)
|
||||
: IDatabase(name_), log(&Logger::get(logger))
|
||||
{
|
||||
}
|
||||
|
||||
bool DatabaseWithOwnTablesBase::isTableExist(
|
||||
const Context & /*context*/,
|
||||
const String & table_name) const
|
||||
@ -110,7 +115,7 @@ DatabaseWithOwnTablesBase::~DatabaseWithOwnTablesBase()
|
||||
{
|
||||
try
|
||||
{
|
||||
shutdown();
|
||||
DatabaseWithOwnTablesBase::shutdown();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
|
@ -44,8 +44,9 @@ protected:
|
||||
mutable std::mutex mutex;
|
||||
Tables tables;
|
||||
Dictionaries dictionaries;
|
||||
Poco::Logger * log;
|
||||
|
||||
DatabaseWithOwnTablesBase(String name_) : IDatabase(std::move(name_)) { }
|
||||
DatabaseWithOwnTablesBase(const String & name_, const String & logger);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -235,7 +235,7 @@ public:
|
||||
}
|
||||
|
||||
/// Returns time of table's metadata change, 0 if there is no corresponding metadata file.
|
||||
virtual time_t getObjectMetadataModificationTime(const Context & /*context*/, const String & /*name*/)
|
||||
virtual time_t getObjectMetadataModificationTime(const String & /*name*/) const
|
||||
{
|
||||
return static_cast<time_t>(0);
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ bool ExternalLoaderDatabaseConfigRepository::exists(const std::string & loadable
|
||||
|
||||
Poco::Timestamp ExternalLoaderDatabaseConfigRepository::getUpdateTime(const std::string & loadable_definition_name)
|
||||
{
|
||||
return database->getObjectMetadataModificationTime(context, trimDatabaseName(loadable_definition_name, database));
|
||||
return database->getObjectMetadataModificationTime(trimDatabaseName(loadable_definition_name, database));
|
||||
}
|
||||
|
||||
std::set<std::string> ExternalLoaderDatabaseConfigRepository::getAllLoadablesDefinitionNames() const
|
||||
|
@ -258,7 +258,7 @@ protected:
|
||||
res_columns[res_index++]->insert(database->getObjectMetadataPath(table_name));
|
||||
|
||||
if (columns_mask[src_index++])
|
||||
res_columns[res_index++]->insert(static_cast<UInt64>(database->getObjectMetadataModificationTime(context, table_name)));
|
||||
res_columns[res_index++]->insert(static_cast<UInt64>(database->getObjectMetadataModificationTime(table_name)));
|
||||
|
||||
{
|
||||
Array dependencies_table_name_array;
|
||||
|
Loading…
Reference in New Issue
Block a user