From caaf7002f551d677eb30639acc94a106b1e97691 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 17 Mar 2016 23:23:40 +0300 Subject: [PATCH] Database engines: development [#METR-19997]. --- dbms/include/DB/Databases/DatabaseOrdinary.h | 18 ++-- dbms/include/DB/Databases/IDatabase.h | 9 +- dbms/include/DB/Interpreters/Context.h | 2 +- dbms/include/DB/Storages/IStorage.h | 10 -- dbms/include/DB/Storages/StorageChunkRef.h | 2 - dbms/src/Databases/DatabaseOrdinary.cpp | 94 +++++++++++++------ dbms/src/Interpreters/Context.cpp | 2 +- .../Interpreters/InterpreterRenameQuery.cpp | 57 +---------- dbms/src/Server/ReplicasStatusHandler.cpp | 9 +- dbms/src/Server/Server.cpp | 10 +- dbms/src/Storages/StorageChunkRef.cpp | 21 ----- 11 files changed, 100 insertions(+), 134 deletions(-) diff --git a/dbms/include/DB/Databases/DatabaseOrdinary.h b/dbms/include/DB/Databases/DatabaseOrdinary.h index bc2cc3b4abd..48f95c3878f 100644 --- a/dbms/include/DB/Databases/DatabaseOrdinary.h +++ b/dbms/include/DB/Databases/DatabaseOrdinary.h @@ -16,29 +16,31 @@ class DatabaseOrdinary : public IDatabase private: const String name; const String path; - std::mutex mutex; + mutable std::mutex mutex; Tables tables; public: DatabaseOrdinary(const String & name_, const String & path_, boost::threadpool::pool * thread_pool_); - bool isTableExist(const String & name) const override; + bool isTableExist(const String & table_name) const override; - StoragePtr tryGetTable(const String & name) override; + StoragePtr tryGetTable(const String & table_name) override; DatabaseIteratorPtr getIterator() override; bool empty() const override; - void createTable(const String & name, StoragePtr & table, const ASTPtr & query, const String & engine) override; + void createTable(const String & table_name, const StoragePtr & table, const ASTPtr & query, const String & engine) override; - StoragePtr removeTable(const String & name) override; + StoragePtr removeTable(const String & table_name) override; - void attachTable(const String & name, StoragePtr & table) override; + void attachTable(const String & table_name, const StoragePtr & table) override; - StoragePtr detachTable(const String & name) override; + StoragePtr detachTable(const String & table_name) override; - ASTPtr getCreateQuery(const String & name) const override; + void renameTable(const String & table_name, IDatabase & to_database, const String & to_table_name) override; + + ASTPtr getCreateQuery(const String & table_name) const override; void shutdown() override; }; diff --git a/dbms/include/DB/Databases/IDatabase.h b/dbms/include/DB/Databases/IDatabase.h index 5a2d164c172..13acd12d2d7 100644 --- a/dbms/include/DB/Databases/IDatabase.h +++ b/dbms/include/DB/Databases/IDatabase.h @@ -34,7 +34,7 @@ using DatabaseIteratorPtr = std::unique_ptr; * - переименовывание таблиц и перенос между БД с одинаковыми движками. */ -class IDatabase : protected std::enable_shared_from_this +class IDatabase : public std::enable_shared_from_this { public: /// Проверить существование таблицы. @@ -51,17 +51,20 @@ public: virtual bool empty() const = 0; /// Добавить таблицу в базу данных. Прописать её наличие в метаданных. - virtual void createTable(const String & name, StoragePtr & table, const ASTPtr & query, const String & engine) = 0; + virtual void createTable(const String & name, const StoragePtr & table, const ASTPtr & query, const String & engine) = 0; /// Удалить таблицу из базы данных и вернуть её. Удалить метаданные. virtual StoragePtr removeTable(const String & name) = 0; /// Добавить таблицу в базу данных, но не прописывать её в метаданных. БД может не поддерживать этот метод. - virtual void attachTable(const String & name, StoragePtr & table) = 0; + virtual void attachTable(const String & name, const StoragePtr & table) = 0; /// Забыть про таблицу, не удаляя её, и вернуть её. БД может не поддерживать этот метод. virtual StoragePtr detachTable(const String & name) = 0; + /// Переименовать таблицу и, возможно, переместить таблицу в другую БД. + virtual void renameTable(const String & name, IDatabase & to_database, const String & to_name) = 0; + /// Получить запрос CREATE TABLE для таблицы. virtual ASTPtr getCreateQuery(const String & name) const = 0; diff --git a/dbms/include/DB/Interpreters/Context.h b/dbms/include/DB/Interpreters/Context.h index d6144d10af1..e4dd46f02f8 100644 --- a/dbms/include/DB/Interpreters/Context.h +++ b/dbms/include/DB/Interpreters/Context.h @@ -158,7 +158,7 @@ public: StoragePtr tryGetTable(const String & database_name, const String & table_name) const; void addExternalTable(const String & table_name, StoragePtr storage); - void addDatabase(const String & database_name, DatabasePtr & database); + void addDatabase(const String & database_name, const DatabasePtr & database); void detachDatabase(const String & database_name); String getCurrentDatabase() const; diff --git a/dbms/include/DB/Storages/IStorage.h b/dbms/include/DB/Storages/IStorage.h index 0f6fea8b283..60491f86858 100644 --- a/dbms/include/DB/Storages/IStorage.h +++ b/dbms/include/DB/Storages/IStorage.h @@ -258,16 +258,6 @@ public: throw Exception("Method optimize is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); } - /** Получить запрос CREATE TABLE, который описывает данную таблицу. - * Обычно этот запрос хранится и достаётся из .sql файла из директории с метаданными. - * Этот метод используется и имеет смысл только если для таблицы не создаётся .sql файл - * - то есть, только для таблиц, которые создаются не пользователем, а самой системой - например, для таблиц типа ChunkRef. - */ - virtual ASTPtr getCustomCreateQuery(const Context & context) const - { - throw Exception("Method getCustomCreateQuery is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); - } - /** Если при уничтожении объекта надо сделать какую-то сложную работу - сделать её заранее. * Например, если таблица содержит какие-нибудь потоки для фоновой работы - попросить их завершиться и дождаться завершения. * По-умолчанию - ничего не делать. diff --git a/dbms/include/DB/Storages/StorageChunkRef.h b/dbms/include/DB/Storages/StorageChunkRef.h index a913d47f4a7..ebdad5c67d1 100644 --- a/dbms/include/DB/Storages/StorageChunkRef.h +++ b/dbms/include/DB/Storages/StorageChunkRef.h @@ -31,8 +31,6 @@ public: size_t max_block_size = DEFAULT_BLOCK_SIZE, unsigned threads = 1) override; - ASTPtr getCustomCreateQuery(const Context & context) const override; - void drop() override; String source_database_name; diff --git a/dbms/src/Databases/DatabaseOrdinary.cpp b/dbms/src/Databases/DatabaseOrdinary.cpp index 3ecfbe7ecfe..f335602a6e5 100644 --- a/dbms/src/Databases/DatabaseOrdinary.cpp +++ b/dbms/src/Databases/DatabaseOrdinary.cpp @@ -27,17 +27,17 @@ DatabaseOrdinary::DatabaseOrdinary(const String & name_, const String & path_, b } -bool DatabaseOrdinary::isTableExist(const String & name) const +bool DatabaseOrdinary::isTableExist(const String & table_name) const { std::lock_guard lock(mutex); - return tables.count(name); + return tables.count(table_name); } -StoragePtr DatabaseOrdinary::tryGetTable(const String & name) +StoragePtr DatabaseOrdinary::tryGetTable(const String & table_name) { std::lock_guard lock(mutex); - auto it = tables.find(name); + auto it = tables.find(table_name); if (it == tables.end()) return {}; return it->second; @@ -79,7 +79,7 @@ public: DatabaseIteratorPtr DatabaseOrdinary::getIterator() { std::lock_guard lock(mutex); - return std::make_shared(tables); + return std::make_unique(tables); } @@ -90,16 +90,16 @@ bool DatabaseOrdinary::empty() const } -void DatabaseOrdinary::attachTable(const String & name, StoragePtr & table) +void DatabaseOrdinary::attachTable(const String & table_name, const StoragePtr & table) { /// Добавляем таблицу в набор. std::lock_guard lock(mutex); - if (!tables.emplace(name, table).second) - throw Exception("Table " + name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS); + if (!tables.emplace(table_name, table).second) + throw Exception("Table " + name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS); } -void DatabaseOrdinary::createTable(const String & name, StoragePtr & table, const ASTPtr & query, const String & engine) +void DatabaseOrdinary::createTable(const String & table_name, const StoragePtr & table, const ASTPtr & query, const String & engine) { /// Создаём файл с метаданными, если нужно - если запрос не ATTACH. /// В него записывается запрос на ATTACH таблицы. @@ -117,8 +117,8 @@ void DatabaseOrdinary::createTable(const String & name, StoragePtr & table, cons { std::lock_guard lock(mutex); - if (tables.count(name)) - throw Exception("Table " + name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS); + if (tables.count(table_name)) + throw Exception("Table " + name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS); } String table_name_escaped; @@ -144,7 +144,7 @@ void DatabaseOrdinary::createTable(const String & name, StoragePtr & table, cons statement_stream << '\n'; statement = statement_stream.str(); - table_name_escaped = escapeForFileName(name); + table_name_escaped = escapeForFileName(table_name); table_metadata_tmp_path = path + "/" + table_name_escaped + ".sql.tmp"; table_metadata_path = path + "/" + table_name_escaped; @@ -161,8 +161,8 @@ void DatabaseOrdinary::createTable(const String & name, StoragePtr & table, cons /// Добавляем таблицу в набор. { std::lock_guard lock(mutex); - if (!tables.emplace(name, table).second) - throw Exception("Table " + name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS); + if (!tables.emplace(table_name, table).second) + throw Exception("Table " + name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS); } Poco::File(table_metadata_tmp_path).renameTo(table_metadata_path); @@ -175,15 +175,15 @@ void DatabaseOrdinary::createTable(const String & name, StoragePtr & table, cons } -StoragePtr DatabaseOrdinary::detachTable(const String & name) +StoragePtr DatabaseOrdinary::detachTable(const String & table_name) { StoragePtr res; { std::lock_guard lock(mutex); - auto it = tables.find(name); + auto it = tables.find(table_name); if (it == tables.end()) - throw Exception("Table " + name + " doesn't exist.", ErrorCodes::TABLE_ALREADY_EXISTS); + throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::TABLE_ALREADY_EXISTS); res = it->second; tables.erase(it); } @@ -192,11 +192,11 @@ StoragePtr DatabaseOrdinary::detachTable(const String & name) } -StoragePtr DatabaseOrdinary::removeTable(const String & name) +StoragePtr DatabaseOrdinary::removeTable(const String & table_name) { - StoragePtr res = detachTable(); + StoragePtr res = detachTable(table_name); - String table_name_escaped = escapeForFileName(name); + String table_name_escaped = escapeForFileName(table_name); String table_metadata_path = path + "/" + table_name_escaped; try @@ -205,7 +205,7 @@ StoragePtr DatabaseOrdinary::removeTable(const String & name) } catch (...) { - attachTable(name, res); + attachTable(table_name, res); throw; } @@ -213,25 +213,65 @@ StoragePtr DatabaseOrdinary::removeTable(const String & name) } -ASTPtr DatabaseOrdinary::getCreateQuery(const String & name) const +static ASTPtr getCreateQueryImpl(const String & path, const String & table_name) { - String table_name_escaped = escapeForFileName(name); + String table_name_escaped = escapeForFileName(table_name); String table_metadata_path = path + "/" + table_name_escaped; - StringPtr query = new String(); + String query; { ReadBufferFromFile in(table_metadata_path, 4096); - WriteBufferFromString out(*query); + WriteBufferFromString out(query); copyData(in, out); } ParserCreateQuery parser; - ASTPtr ast = parseQuery(parser, query->data(), query->data() + query->size(), "in file " + table_metadata_path); + return parseQuery(parser, query.data(), query.data() + query.size(), "in file " + table_metadata_path); +} + + +void DatabaseOrdinary::renameTable(const String & table_name, IDatabase & to_database, const String & to_table_name) +{ + DatabaseOrdinary * to_database_concrete = typeid_cast(&to_database); + + if (!to_database_concrete) + throw Exception("Moving tables between databases of different engines is not supported", ErrorCodes::NOT_IMPLEMENTED); + + StoragePtr table = tryGetTable(table_name); + + if (!table) + throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::TABLE_ALREADY_EXISTS); + + /// Уведомляем таблицу о том, что она переименовывается. Если таблица не поддерживает переименование - кинется исключение. + try + { + table->rename(path + "data/" + escapeForFileName(to_database_concrete->name) + "/", + to_database_concrete->name, + to_table_name); + } + catch (const Poco::Exception & e) + { + /// Более хорошая диагностика. + throw Exception{e}; + } + + ASTPtr ast = getCreateQueryImpl(path, table_name); + ASTCreateQuery & ast_create_query = typeid_cast(*ast); + ast_create_query.table = to_table_name; + + /// NOTE Неатомарно. + to_database_concrete->createTable(to_table_name, table, ast, table->getName()); + removeTable(table_name); +} + + +ASTPtr DatabaseOrdinary::getCreateQuery(const String & table_name) const +{ + ASTPtr ast = getCreateQueryImpl(path, table_name); ASTCreateQuery & ast_create_query = typeid_cast(*ast); ast_create_query.attach = false; ast_create_query.database = name; - ast_create_query.query_string = query; return ast; } diff --git a/dbms/src/Interpreters/Context.cpp b/dbms/src/Interpreters/Context.cpp index a462e0d815b..037c7fa7394 100644 --- a/dbms/src/Interpreters/Context.cpp +++ b/dbms/src/Interpreters/Context.cpp @@ -532,7 +532,7 @@ void Context::addExternalTable(const String & table_name, StoragePtr storage) } -void Context::addDatabase(const String & database_name, DatabasePtr & database) +void Context::addDatabase(const String & database_name, const DatabasePtr & database) { Poco::ScopedLock lock(shared->mutex); diff --git a/dbms/src/Interpreters/InterpreterRenameQuery.cpp b/dbms/src/Interpreters/InterpreterRenameQuery.cpp index 388c95b5820..d731fb4198c 100644 --- a/dbms/src/Interpreters/InterpreterRenameQuery.cpp +++ b/dbms/src/Interpreters/InterpreterRenameQuery.cpp @@ -13,6 +13,8 @@ #include #include +#include + #include @@ -31,30 +33,16 @@ struct RenameDescription { RenameDescription(const ASTRenameQuery::Element & elem, const String & path, const String & current_database) : from_database_name(elem.from.database.empty() ? current_database : elem.from.database), - from_database_name_escaped(escapeForFileName(from_database_name)), from_table_name(elem.from.table), - from_table_name_escaped(escapeForFileName(from_table_name)), - from_metadata_path(path + "metadata/" + from_database_name_escaped + "/" - + (!from_table_name.empty() ? from_table_name_escaped + ".sql" : "")), to_database_name(elem.to.database.empty() ? current_database : elem.to.database), - to_database_name_escaped(escapeForFileName(to_database_name)), - to_table_name(elem.to.table), - to_table_name_escaped(escapeForFileName(to_table_name)), - to_metadata_path(path + "metadata/" + to_database_name_escaped + "/" - + (!to_table_name.empty() ? to_table_name_escaped + ".sql" : "")) + to_table_name(elem.to.table) {} String from_database_name; - String from_database_name_escaped; String from_table_name; - String from_table_name_escaped; - String from_metadata_path; String to_database_name; - String to_database_name_escaped; String to_table_name; - String to_table_name_escaped; - String to_metadata_path; }; @@ -113,43 +101,8 @@ BlockIO InterpreterRenameQuery::execute() { context.assertTableDoesntExist(elem.to_database_name, elem.to_table_name); - /// Уведомляем таблицу о том, что она переименовывается. Если таблица не поддерживает переименование - кинется исключение. - StoragePtr table = context.getTable(elem.from_database_name, elem.from_table_name); - try - { - table->rename(path + "data/" + elem.to_database_name_escaped + "/", elem.to_database_name, - elem.to_table_name); - } - catch (const Poco::Exception & e) - { - throw Exception{e}; - } - - /// Пишем новый файл с метаданными. - { - String create_query; - { - ReadBufferFromFile in(elem.from_metadata_path, 1024); - WriteBufferFromString out(create_query); - copyData(in, out); - } - - ParserCreateQuery parser; - ASTPtr ast = parseQuery(parser, create_query.data(), create_query.data() + create_query.size(), "in file " + elem.from_metadata_path); - - typeid_cast(*ast).table = elem.to_table_name; - - Poco::FileOutputStream ostr(elem.to_metadata_path); - formatAST(*ast, ostr, 0, false); - ostr << '\n'; - } - - /// Переименовываем таблицу в контексте. - context.addTable(elem.to_database_name, elem.to_table_name, - context.detachTable(elem.from_database_name, elem.from_table_name)); - - /// Удаляем старый файл с метаданными. - Poco::File(elem.from_metadata_path).remove(); + context.getDatabase(elem.from_database_name)->renameTable( + elem.from_table_name, *context.getDatabase(elem.to_database_name), elem.to_table_name); } return {}; diff --git a/dbms/src/Server/ReplicasStatusHandler.cpp b/dbms/src/Server/ReplicasStatusHandler.cpp index 29af258c066..8b67090f9ed 100644 --- a/dbms/src/Server/ReplicasStatusHandler.cpp +++ b/dbms/src/Server/ReplicasStatusHandler.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -37,10 +38,10 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request /// Перебираем все реплицируемые таблицы. for (const auto & db : databases) { - for (auto iterator = db.second->getIterator(); iterator.isValid(); iterator.next()) + for (auto iterator = db.second->getIterator(); iterator->isValid(); iterator->next()) { - const auto & table = iterator.table(); - const StorageReplicatedMergeTree * table_replicated = typeid_cast(table.get()); + auto & table = iterator->table(); + StorageReplicatedMergeTree * table_replicated = typeid_cast(table.get()); if (!table_replicated) continue; @@ -54,7 +55,7 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request || (settings.min_relative_delay_to_close && relative_delay >= static_cast(settings.min_relative_delay_to_close))) ok = false; - message << backQuoteIfNeed(db.first) << "." << backQuoteIfNeed(table.first) + message << backQuoteIfNeed(db.first) << "." << backQuoteIfNeed(iterator->name()) << ":\tAbsolute delay: " << absolute_delay << ". Relative delay: " << relative_delay << ".\n"; } } diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp index c3dbbd34e0c..32f9a87f69d 100644 --- a/dbms/src/Server/Server.cpp +++ b/dbms/src/Server/Server.cpp @@ -295,20 +295,20 @@ int Server::main(const std::vector & args) { Poco::File(path + "data/system").createDirectories(); Poco::File(path + "metadata/system").createDirectories(); - global_context->addDatabase("system", new DatabaseOrdinary("system", path + "metadata/system/", nullptr)); + global_context->addDatabase("system", std::make_shared("system", path + "metadata/system/", nullptr)); } DatabasePtr system_database = global_context->getDatabase("system"); - system_database->attachTable("one", StorageSystemOne::create("one")); + system_database->attachTable("one", StorageSystemOne::create("one")); system_database->attachTable("numbers", StorageSystemNumbers::create("numbers")); - system_database->attachTable("numbers_mt", StorageSystemNumbers::create("numbers_mt", true)); - system_database->attachTable("tables", StorageSystemTables::create("tables")); + system_database->attachTable("numbers_mt", StorageSystemNumbers::create("numbers_mt", true)); + system_database->attachTable("tables", StorageSystemTables::create("tables")); system_database->attachTable("parts", StorageSystemParts::create("parts")); system_database->attachTable("databases", StorageSystemDatabases::create("databases")); system_database->attachTable("processes", StorageSystemProcesses::create("processes")); system_database->attachTable("settings", StorageSystemSettings::create("settings")); - system_database->attachTable("events", StorageSystemEvents::create("events")); + system_database->attachTable("events", StorageSystemEvents::create("events")); system_database->attachTable("metrics", StorageSystemMetrics::create("metrics")); system_database->attachTable("merges", StorageSystemMerges::create("merges")); system_database->attachTable("replicas", StorageSystemReplicas::create("replicas")); diff --git a/dbms/src/Storages/StorageChunkRef.cpp b/dbms/src/Storages/StorageChunkRef.cpp index 27a5d4d9299..81b7a62f041 100644 --- a/dbms/src/Storages/StorageChunkRef.cpp +++ b/dbms/src/Storages/StorageChunkRef.cpp @@ -33,27 +33,6 @@ BlockInputStreams StorageChunkRef::read( max_block_size, threads); } -ASTPtr StorageChunkRef::getCustomCreateQuery(const Context & context) const -{ - /// Берём CREATE запрос для таблицы, на которую эта ссылается, и меняем в ней имя и движок. - ASTPtr res = context.getCreateQuery(source_database_name, source_table_name); - ASTCreateQuery & res_create = typeid_cast(*res); - - res_create.database.clear(); - res_create.table = name; - - res_create.storage = new ASTFunction; - ASTFunction & storage_ast = static_cast(*res_create.storage); - storage_ast.name = "ChunkRef"; - storage_ast.arguments = new ASTExpressionList; - storage_ast.children.push_back(storage_ast.arguments); - ASTExpressionList & args_ast = static_cast(*storage_ast.arguments); - args_ast.children.push_back(new ASTIdentifier(StringRange(), source_database_name, ASTIdentifier::Database)); - args_ast.children.push_back(new ASTIdentifier(StringRange(), source_table_name, ASTIdentifier::Table)); - - return res; -} - void StorageChunkRef::drop() { try