Database engines: development [#METR-19997].

This commit is contained in:
Alexey Milovidov 2016-03-17 23:23:40 +03:00
parent 4c7b2a0412
commit caaf7002f5
11 changed files with 100 additions and 134 deletions

View File

@ -16,29 +16,31 @@ class DatabaseOrdinary : public IDatabase
private:
const String name;
const String path;
std::mutex mutex;
mutable std::mutex mutex;
Tables tables;
public:
DatabaseOrdinary(const String & name_, const String & path_, boost::threadpool::pool * thread_pool_);
bool isTableExist(const String & name) const override;
bool isTableExist(const String & table_name) const override;
StoragePtr tryGetTable(const String & name) override;
StoragePtr tryGetTable(const String & table_name) override;
DatabaseIteratorPtr getIterator() override;
bool empty() const override;
void createTable(const String & name, StoragePtr & table, const ASTPtr & query, const String & engine) override;
void createTable(const String & table_name, const StoragePtr & table, const ASTPtr & query, const String & engine) override;
StoragePtr removeTable(const String & name) override;
StoragePtr removeTable(const String & table_name) override;
void attachTable(const String & name, StoragePtr & table) override;
void attachTable(const String & table_name, const StoragePtr & table) override;
StoragePtr detachTable(const String & name) override;
StoragePtr detachTable(const String & table_name) override;
ASTPtr getCreateQuery(const String & name) const override;
void renameTable(const String & table_name, IDatabase & to_database, const String & to_table_name) override;
ASTPtr getCreateQuery(const String & table_name) const override;
void shutdown() override;
};

View File

@ -34,7 +34,7 @@ using DatabaseIteratorPtr = std::unique_ptr<IDatabaseIterator>;
* - переименовывание таблиц и перенос между БД с одинаковыми движками.
*/
class IDatabase : protected std::enable_shared_from_this<IDatabase>
class IDatabase : public std::enable_shared_from_this<IDatabase>
{
public:
/// Проверить существование таблицы.
@ -51,17 +51,20 @@ public:
virtual bool empty() const = 0;
/// Добавить таблицу в базу данных. Прописать её наличие в метаданных.
virtual void createTable(const String & name, StoragePtr & table, const ASTPtr & query, const String & engine) = 0;
virtual void createTable(const String & name, const StoragePtr & table, const ASTPtr & query, const String & engine) = 0;
/// Удалить таблицу из базы данных и вернуть её. Удалить метаданные.
virtual StoragePtr removeTable(const String & name) = 0;
/// Добавить таблицу в базу данных, но не прописывать её в метаданных. БД может не поддерживать этот метод.
virtual void attachTable(const String & name, StoragePtr & table) = 0;
virtual void attachTable(const String & name, const StoragePtr & table) = 0;
/// Забыть про таблицу, не удаляя её, и вернуть её. БД может не поддерживать этот метод.
virtual StoragePtr detachTable(const String & name) = 0;
/// Переименовать таблицу и, возможно, переместить таблицу в другую БД.
virtual void renameTable(const String & name, IDatabase & to_database, const String & to_name) = 0;
/// Получить запрос CREATE TABLE для таблицы.
virtual ASTPtr getCreateQuery(const String & name) const = 0;

View File

@ -158,7 +158,7 @@ public:
StoragePtr tryGetTable(const String & database_name, const String & table_name) const;
void addExternalTable(const String & table_name, StoragePtr storage);
void addDatabase(const String & database_name, DatabasePtr & database);
void addDatabase(const String & database_name, const DatabasePtr & database);
void detachDatabase(const String & database_name);
String getCurrentDatabase() const;

View File

@ -258,16 +258,6 @@ public:
throw Exception("Method optimize is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Получить запрос CREATE TABLE, который описывает данную таблицу.
* Обычно этот запрос хранится и достаётся из .sql файла из директории с метаданными.
* Этот метод используется и имеет смысл только если для таблицы не создаётся .sql файл
* - то есть, только для таблиц, которые создаются не пользователем, а самой системой - например, для таблиц типа ChunkRef.
*/
virtual ASTPtr getCustomCreateQuery(const Context & context) const
{
throw Exception("Method getCustomCreateQuery is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Если при уничтожении объекта надо сделать какую-то сложную работу - сделать её заранее.
* Например, если таблица содержит какие-нибудь потоки для фоновой работы - попросить их завершиться и дождаться завершения.
* По-умолчанию - ничего не делать.

View File

@ -31,8 +31,6 @@ public:
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1) override;
ASTPtr getCustomCreateQuery(const Context & context) const override;
void drop() override;
String source_database_name;

View File

@ -27,17 +27,17 @@ DatabaseOrdinary::DatabaseOrdinary(const String & name_, const String & path_, b
}
bool DatabaseOrdinary::isTableExist(const String & name) const
bool DatabaseOrdinary::isTableExist(const String & table_name) const
{
std::lock_guard<std::mutex> lock(mutex);
return tables.count(name);
return tables.count(table_name);
}
StoragePtr DatabaseOrdinary::tryGetTable(const String & name)
StoragePtr DatabaseOrdinary::tryGetTable(const String & table_name)
{
std::lock_guard<std::mutex> lock(mutex);
auto it = tables.find(name);
auto it = tables.find(table_name);
if (it == tables.end())
return {};
return it->second;
@ -79,7 +79,7 @@ public:
DatabaseIteratorPtr DatabaseOrdinary::getIterator()
{
std::lock_guard<std::mutex> lock(mutex);
return std::make_shared<DatabaseOrdinaryIterator>(tables);
return std::make_unique<DatabaseOrdinaryIterator>(tables);
}
@ -90,16 +90,16 @@ bool DatabaseOrdinary::empty() const
}
void DatabaseOrdinary::attachTable(const String & name, StoragePtr & table)
void DatabaseOrdinary::attachTable(const String & table_name, const StoragePtr & table)
{
/// Добавляем таблицу в набор.
std::lock_guard<std::mutex> lock(mutex);
if (!tables.emplace(name, table).second)
throw Exception("Table " + name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
if (!tables.emplace(table_name, table).second)
throw Exception("Table " + name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
void DatabaseOrdinary::createTable(const String & name, StoragePtr & table, const ASTPtr & query, const String & engine)
void DatabaseOrdinary::createTable(const String & table_name, const StoragePtr & table, const ASTPtr & query, const String & engine)
{
/// Создаём файл с метаданными, если нужно - если запрос не ATTACH.
/// В него записывается запрос на ATTACH таблицы.
@ -117,8 +117,8 @@ void DatabaseOrdinary::createTable(const String & name, StoragePtr & table, cons
{
std::lock_guard<std::mutex> lock(mutex);
if (tables.count(name))
throw Exception("Table " + name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
if (tables.count(table_name))
throw Exception("Table " + name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
String table_name_escaped;
@ -144,7 +144,7 @@ void DatabaseOrdinary::createTable(const String & name, StoragePtr & table, cons
statement_stream << '\n';
statement = statement_stream.str();
table_name_escaped = escapeForFileName(name);
table_name_escaped = escapeForFileName(table_name);
table_metadata_tmp_path = path + "/" + table_name_escaped + ".sql.tmp";
table_metadata_path = path + "/" + table_name_escaped;
@ -161,8 +161,8 @@ void DatabaseOrdinary::createTable(const String & name, StoragePtr & table, cons
/// Добавляем таблицу в набор.
{
std::lock_guard<std::mutex> lock(mutex);
if (!tables.emplace(name, table).second)
throw Exception("Table " + name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
if (!tables.emplace(table_name, table).second)
throw Exception("Table " + name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
Poco::File(table_metadata_tmp_path).renameTo(table_metadata_path);
@ -175,15 +175,15 @@ void DatabaseOrdinary::createTable(const String & name, StoragePtr & table, cons
}
StoragePtr DatabaseOrdinary::detachTable(const String & name)
StoragePtr DatabaseOrdinary::detachTable(const String & table_name)
{
StoragePtr res;
{
std::lock_guard<std::mutex> lock(mutex);
auto it = tables.find(name);
auto it = tables.find(table_name);
if (it == tables.end())
throw Exception("Table " + name + " doesn't exist.", ErrorCodes::TABLE_ALREADY_EXISTS);
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::TABLE_ALREADY_EXISTS);
res = it->second;
tables.erase(it);
}
@ -192,11 +192,11 @@ StoragePtr DatabaseOrdinary::detachTable(const String & name)
}
StoragePtr DatabaseOrdinary::removeTable(const String & name)
StoragePtr DatabaseOrdinary::removeTable(const String & table_name)
{
StoragePtr res = detachTable();
StoragePtr res = detachTable(table_name);
String table_name_escaped = escapeForFileName(name);
String table_name_escaped = escapeForFileName(table_name);
String table_metadata_path = path + "/" + table_name_escaped;
try
@ -205,7 +205,7 @@ StoragePtr DatabaseOrdinary::removeTable(const String & name)
}
catch (...)
{
attachTable(name, res);
attachTable(table_name, res);
throw;
}
@ -213,25 +213,65 @@ StoragePtr DatabaseOrdinary::removeTable(const String & name)
}
ASTPtr DatabaseOrdinary::getCreateQuery(const String & name) const
static ASTPtr getCreateQueryImpl(const String & path, const String & table_name)
{
String table_name_escaped = escapeForFileName(name);
String table_name_escaped = escapeForFileName(table_name);
String table_metadata_path = path + "/" + table_name_escaped;
StringPtr query = new String();
String query;
{
ReadBufferFromFile in(table_metadata_path, 4096);
WriteBufferFromString out(*query);
WriteBufferFromString out(query);
copyData(in, out);
}
ParserCreateQuery parser;
ASTPtr ast = parseQuery(parser, query->data(), query->data() + query->size(), "in file " + table_metadata_path);
return parseQuery(parser, query.data(), query.data() + query.size(), "in file " + table_metadata_path);
}
void DatabaseOrdinary::renameTable(const String & table_name, IDatabase & to_database, const String & to_table_name)
{
DatabaseOrdinary * to_database_concrete = typeid_cast<DatabaseOrdinary *>(&to_database);
if (!to_database_concrete)
throw Exception("Moving tables between databases of different engines is not supported", ErrorCodes::NOT_IMPLEMENTED);
StoragePtr table = tryGetTable(table_name);
if (!table)
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::TABLE_ALREADY_EXISTS);
/// Уведомляем таблицу о том, что она переименовывается. Если таблица не поддерживает переименование - кинется исключение.
try
{
table->rename(path + "data/" + escapeForFileName(to_database_concrete->name) + "/",
to_database_concrete->name,
to_table_name);
}
catch (const Poco::Exception & e)
{
/// Более хорошая диагностика.
throw Exception{e};
}
ASTPtr ast = getCreateQueryImpl(path, table_name);
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
ast_create_query.table = to_table_name;
/// NOTE Неатомарно.
to_database_concrete->createTable(to_table_name, table, ast, table->getName());
removeTable(table_name);
}
ASTPtr DatabaseOrdinary::getCreateQuery(const String & table_name) const
{
ASTPtr ast = getCreateQueryImpl(path, table_name);
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
ast_create_query.attach = false;
ast_create_query.database = name;
ast_create_query.query_string = query;
return ast;
}

View File

@ -532,7 +532,7 @@ void Context::addExternalTable(const String & table_name, StoragePtr storage)
}
void Context::addDatabase(const String & database_name, DatabasePtr & database)
void Context::addDatabase(const String & database_name, const DatabasePtr & database)
{
Poco::ScopedLock<Poco::Mutex> lock(shared->mutex);

View File

@ -13,6 +13,8 @@
#include <DB/Parsers/formatAST.h>
#include <DB/Parsers/parseQuery.h>
#include <DB/Databases/IDatabase.h>
#include <DB/Interpreters/InterpreterRenameQuery.h>
@ -31,30 +33,16 @@ struct RenameDescription
{
RenameDescription(const ASTRenameQuery::Element & elem, const String & path, const String & current_database) :
from_database_name(elem.from.database.empty() ? current_database : elem.from.database),
from_database_name_escaped(escapeForFileName(from_database_name)),
from_table_name(elem.from.table),
from_table_name_escaped(escapeForFileName(from_table_name)),
from_metadata_path(path + "metadata/" + from_database_name_escaped + "/"
+ (!from_table_name.empty() ? from_table_name_escaped + ".sql" : "")),
to_database_name(elem.to.database.empty() ? current_database : elem.to.database),
to_database_name_escaped(escapeForFileName(to_database_name)),
to_table_name(elem.to.table),
to_table_name_escaped(escapeForFileName(to_table_name)),
to_metadata_path(path + "metadata/" + to_database_name_escaped + "/"
+ (!to_table_name.empty() ? to_table_name_escaped + ".sql" : ""))
to_table_name(elem.to.table)
{}
String from_database_name;
String from_database_name_escaped;
String from_table_name;
String from_table_name_escaped;
String from_metadata_path;
String to_database_name;
String to_database_name_escaped;
String to_table_name;
String to_table_name_escaped;
String to_metadata_path;
};
@ -113,43 +101,8 @@ BlockIO InterpreterRenameQuery::execute()
{
context.assertTableDoesntExist(elem.to_database_name, elem.to_table_name);
/// Уведомляем таблицу о том, что она переименовывается. Если таблица не поддерживает переименование - кинется исключение.
StoragePtr table = context.getTable(elem.from_database_name, elem.from_table_name);
try
{
table->rename(path + "data/" + elem.to_database_name_escaped + "/", elem.to_database_name,
elem.to_table_name);
}
catch (const Poco::Exception & e)
{
throw Exception{e};
}
/// Пишем новый файл с метаданными.
{
String create_query;
{
ReadBufferFromFile in(elem.from_metadata_path, 1024);
WriteBufferFromString out(create_query);
copyData(in, out);
}
ParserCreateQuery parser;
ASTPtr ast = parseQuery(parser, create_query.data(), create_query.data() + create_query.size(), "in file " + elem.from_metadata_path);
typeid_cast<ASTCreateQuery &>(*ast).table = elem.to_table_name;
Poco::FileOutputStream ostr(elem.to_metadata_path);
formatAST(*ast, ostr, 0, false);
ostr << '\n';
}
/// Переименовываем таблицу в контексте.
context.addTable(elem.to_database_name, elem.to_table_name,
context.detachTable(elem.from_database_name, elem.from_table_name));
/// Удаляем старый файл с метаданными.
Poco::File(elem.from_metadata_path).remove();
context.getDatabase(elem.from_database_name)->renameTable(
elem.from_table_name, *context.getDatabase(elem.to_database_name), elem.to_table_name);
}
return {};

View File

@ -3,6 +3,7 @@
#include <DB/Interpreters/Context.h>
#include <DB/Storages/StorageReplicatedMergeTree.h>
#include <DB/Common/HTMLForm.h>
#include <DB/Databases/IDatabase.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
@ -37,10 +38,10 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request
/// Перебираем все реплицируемые таблицы.
for (const auto & db : databases)
{
for (auto iterator = db.second->getIterator(); iterator.isValid(); iterator.next())
for (auto iterator = db.second->getIterator(); iterator->isValid(); iterator->next())
{
const auto & table = iterator.table();
const StorageReplicatedMergeTree * table_replicated = typeid_cast<const StorageReplicatedMergeTree *>(table.get());
auto & table = iterator->table();
StorageReplicatedMergeTree * table_replicated = typeid_cast<StorageReplicatedMergeTree *>(table.get());
if (!table_replicated)
continue;
@ -54,7 +55,7 @@ void ReplicasStatusHandler::handleRequest(Poco::Net::HTTPServerRequest & request
|| (settings.min_relative_delay_to_close && relative_delay >= static_cast<time_t>(settings.min_relative_delay_to_close)))
ok = false;
message << backQuoteIfNeed(db.first) << "." << backQuoteIfNeed(table.first)
message << backQuoteIfNeed(db.first) << "." << backQuoteIfNeed(iterator->name())
<< ":\tAbsolute delay: " << absolute_delay << ". Relative delay: " << relative_delay << ".\n";
}
}

View File

@ -295,20 +295,20 @@ int Server::main(const std::vector<std::string> & args)
{
Poco::File(path + "data/system").createDirectories();
Poco::File(path + "metadata/system").createDirectories();
global_context->addDatabase("system", new DatabaseOrdinary("system", path + "metadata/system/", nullptr));
global_context->addDatabase("system", std::make_shared<DatabaseOrdinary>("system", path + "metadata/system/", nullptr));
}
DatabasePtr system_database = global_context->getDatabase("system");
system_database->attachTable("one", StorageSystemOne::create("one"));
system_database->attachTable("one", StorageSystemOne::create("one"));
system_database->attachTable("numbers", StorageSystemNumbers::create("numbers"));
system_database->attachTable("numbers_mt", StorageSystemNumbers::create("numbers_mt", true));
system_database->attachTable("tables", StorageSystemTables::create("tables"));
system_database->attachTable("numbers_mt", StorageSystemNumbers::create("numbers_mt", true));
system_database->attachTable("tables", StorageSystemTables::create("tables"));
system_database->attachTable("parts", StorageSystemParts::create("parts"));
system_database->attachTable("databases", StorageSystemDatabases::create("databases"));
system_database->attachTable("processes", StorageSystemProcesses::create("processes"));
system_database->attachTable("settings", StorageSystemSettings::create("settings"));
system_database->attachTable("events", StorageSystemEvents::create("events"));
system_database->attachTable("events", StorageSystemEvents::create("events"));
system_database->attachTable("metrics", StorageSystemMetrics::create("metrics"));
system_database->attachTable("merges", StorageSystemMerges::create("merges"));
system_database->attachTable("replicas", StorageSystemReplicas::create("replicas"));

View File

@ -33,27 +33,6 @@ BlockInputStreams StorageChunkRef::read(
max_block_size, threads);
}
ASTPtr StorageChunkRef::getCustomCreateQuery(const Context & context) const
{
/// Берём CREATE запрос для таблицы, на которую эта ссылается, и меняем в ней имя и движок.
ASTPtr res = context.getCreateQuery(source_database_name, source_table_name);
ASTCreateQuery & res_create = typeid_cast<ASTCreateQuery &>(*res);
res_create.database.clear();
res_create.table = name;
res_create.storage = new ASTFunction;
ASTFunction & storage_ast = static_cast<ASTFunction &>(*res_create.storage);
storage_ast.name = "ChunkRef";
storage_ast.arguments = new ASTExpressionList;
storage_ast.children.push_back(storage_ast.arguments);
ASTExpressionList & args_ast = static_cast<ASTExpressionList &>(*storage_ast.arguments);
args_ast.children.push_back(new ASTIdentifier(StringRange(), source_database_name, ASTIdentifier::Database));
args_ast.children.push_back(new ASTIdentifier(StringRange(), source_table_name, ASTIdentifier::Table));
return res;
}
void StorageChunkRef::drop()
{
try