Database engines: development [#METR-19997].

This commit is contained in:
Alexey Milovidov 2016-03-17 20:29:07 +03:00
parent 9838d6af73
commit 4c7b2a0412
8 changed files with 151 additions and 180 deletions

View File

@ -1,5 +1,6 @@
#pragma once
#include <threadpool.hpp>
#include <DB/Databases/IDatabase.h>

View File

@ -40,8 +40,6 @@ public:
ASTPtr getCreateQuery(const String & name) const override;
void dropAll() override;
void shutdown() override;
};

View File

@ -65,9 +65,6 @@ public:
/// Получить запрос CREATE TABLE для таблицы.
virtual ASTPtr getCreateQuery(const String & name) const = 0;
/// Удалить все таблицы.
virtual void dropAll() = 0;
/// Попросить все таблицы завершить фоновые потоки, которые они используют, и удалить все объекты таблиц.
virtual void shutdown() = 0;

View File

@ -12,24 +12,19 @@ namespace DB
class ASTCreateQuery;
/** Позволяет создать новую таблицу, или создать объект уже существующей таблицы, или создать БД, или создать объект уже существующей БД
/** Позволяет создать новую таблицу, или создать объект уже существующей таблицы, или создать БД, или создать объект уже существующей БД.
*/
class InterpreterCreateQuery : public IInterpreter
{
public:
InterpreterCreateQuery(ASTPtr query_ptr_, Context & context_);
/** В случае таблицы: добавляет созданную таблицу в контекст, а также возвращает её.
* В случае БД: добавляет созданную БД в контекст и возвращает NULL.
* assume_metadata_exists - не проверять наличие файла с метаданными и не создавать его
* (для случая выполнения запроса из существующего файла с метаданными).
*/
BlockIO execute() override
{
return executeImpl(false);
}
/** assume_metadata_exists - не проверять наличие файла с метаданными и не создавать его
/** Не проверять наличие файла с метаданными и не создавать его
* (для случая выполнения запроса из существующего файла с метаданными).
*/
void executeLoadExisting()
@ -48,12 +43,24 @@ public:
private:
BlockIO executeImpl(bool assume_metadata_exists);
void createDatabase(ASTCreateQuery & create, bool assume_metadata_exists);
void createDatabase(ASTCreateQuery & create);
BlockIO createTable(ASTCreateQuery & create, bool assume_metadata_exists);
struct ColumnsInfo
{
NamesAndTypesListPtr columns = new NamesAndTypesList;
NamesAndTypesList materialized_columns;
NamesAndTypesList alias_columns;
ColumnDefaults column_defaults;
};
/// Вычислить список столбцов таблицы и вернуть его.
ColumnsInfo setColumns(ASTCreateQuery & create, const Block & as_select_sample, const StoragePtr & as_storage) const;
String setEngine(ASTCreateQuery & create, const StoragePtr & as_storage) const;
/// AST в список столбцов с типами. Столбцы типа Nested развернуты в список настоящих столбцов.
using ColumnsAndDefaults = std::pair<NamesAndTypesList, ColumnDefaults>;
ColumnsAndDefaults parseColumns(ASTPtr expression_list);
ColumnsAndDefaults parseColumns(ASTPtr expression_list) const;
/// removes columns from the columns list and return them in a separate list
static NamesAndTypesList removeAndReturnColumns(ColumnsAndDefaults & columns_and_defaults, ColumnDefaultType type);

View File

@ -237,12 +237,6 @@ ASTPtr DatabaseOrdinary::getCreateQuery(const String & name) const
}
void DatabaseOrdinary::dropAll()
{
/// TODO
}
void DatabaseOrdinary::shutdown()
{
std::lock_guard<std::mutex> lock(mutex);

View File

@ -1,3 +1,5 @@
#include <memory>
#include <Poco/File.h>
#include <Poco/FileStream.h>
@ -32,7 +34,9 @@
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/Databases/DatabaseFactory.h>
#include <DB/Databases/IDatabase.h>
namespace DB
@ -47,6 +51,7 @@ namespace ErrorCodes
extern const int INCORRECT_QUERY;
extern const int ENGINE_REQUIRED;
extern const int TABLE_METADATA_ALREADY_EXISTS;
extern const int UNKNOWN_DATABASE_ENGINE;
}
@ -56,7 +61,7 @@ InterpreterCreateQuery::InterpreterCreateQuery(ASTPtr query_ptr_, Context & cont
}
void InterpreterCreateQuery::createDatabase(ASTCreateQuery & create, bool assume_metadata_exists)
void InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
{
String path = context.getPath();
String current_database = context.getCurrentDatabase();
@ -83,96 +88,57 @@ void InterpreterCreateQuery::createDatabase(ASTCreateQuery & create, bool assume
Poco::File(data_path).createDirectory();
}
DatabasePtr database = DatabaseFactory::get(create.en);
String database_engine_name;
if (const ASTIdentifier * engine_id = typeid_cast<const ASTIdentifier *>(create.storage.get()))
database_engine_name = engine_id->name;
else
{
std::stringstream ostr;
formatAST(*create.storage, ostr, 0, false, false);
throw Exception("Unknown database engine: " + ostr.str(), ErrorCodes::UNKNOWN_DATABASE_ENGINE);
}
DatabasePtr database = DatabaseFactory::get(database_engine_name, database_name, metadata_path, nullptr); /// TODO threadpool
if (!create.if_not_exists || !context.isDatabaseExist(database_name))
context.addDatabase(database_name);
return {};
context.addDatabase(database_name, database);
}
BlockIO createTable(bool assume_metadata_exists);
BlockIO InterpreterCreateQuery::executeImpl(bool assume_metadata_exists)
InterpreterCreateQuery::ColumnsInfo InterpreterCreateQuery::setColumns(
ASTCreateQuery & create, const Block & as_select_sample, const StoragePtr & as_storage) const
{
String path = context.getPath();
String current_database = context.getCurrentDatabase();
ColumnsInfo res;
ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*query_ptr);
String database_name = create.database.empty() ? current_database : create.database;
String database_name_escaped = escapeForFileName(database_name);
String table_name = create.table;
String table_name_escaped = escapeForFileName(table_name);
String as_database_name = create.as_database.empty() ? current_database : create.as_database;
String as_table_name = create.as_table;
String data_path = path + "data/" + database_name_escaped + "/";
String metadata_path = path + "metadata/" + database_name_escaped + "/" + (!table_name.empty() ? table_name_escaped + ".sql" : "");
/// CREATE|ATTACH DATABASE
if (!database_name.empty() && table_name.empty())
{
return {};
}
SharedPtr<InterpreterSelectQuery> interpreter_select;
Block select_sample;
/// Для таблиц типа view, чтобы получить столбцы, может понадобиться sample_block.
if (create.select && (!create.attach || (!create.columns && (create.is_view || create.is_materialized_view))))
{
interpreter_select = new InterpreterSelectQuery(create.select, context);
select_sample = interpreter_select->getSampleBlock();
}
StoragePtr res;
String storage_name;
NamesAndTypesListPtr columns = new NamesAndTypesList;
NamesAndTypesList materialized_columns{};
NamesAndTypesList alias_columns{};
ColumnDefaults column_defaults{};
StoragePtr as_storage;
IStorage::TableStructureReadLockPtr as_storage_lock;
if (!as_table_name.empty())
{
as_storage = context.getTable(as_database_name, as_table_name);
as_storage_lock = as_storage->lockStructure(false);
}
/// Получаем список столбцов
if (create.columns)
{
auto && columns_and_defaults = parseColumns(create.columns);
materialized_columns = removeAndReturnColumns(columns_and_defaults, ColumnDefaultType::Materialized);
alias_columns = removeAndReturnColumns(columns_and_defaults, ColumnDefaultType::Alias);
columns = new NamesAndTypesList{std::move(columns_and_defaults.first)};
column_defaults = std::move(columns_and_defaults.second);
res.materialized_columns = removeAndReturnColumns(columns_and_defaults, ColumnDefaultType::Materialized);
res.alias_columns = removeAndReturnColumns(columns_and_defaults, ColumnDefaultType::Alias);
res.columns = new NamesAndTypesList{std::move(columns_and_defaults.first)};
res.column_defaults = std::move(columns_and_defaults.second);
if (columns->size() + materialized_columns.size() == 0)
if (res.columns->size() + res.materialized_columns.size() == 0)
throw Exception{"Cannot CREATE table without physical columns", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED};
}
else if (!create.as_table.empty())
{
columns = new NamesAndTypesList(as_storage->getColumnsListNonMaterialized());
materialized_columns = as_storage->materialized_columns;
alias_columns = as_storage->alias_columns;
column_defaults = as_storage->column_defaults;
res.columns = new NamesAndTypesList(as_storage->getColumnsListNonMaterialized());
res.materialized_columns = as_storage->materialized_columns;
res.alias_columns = as_storage->alias_columns;
res.column_defaults = as_storage->column_defaults;
}
else if (create.select)
{
columns = new NamesAndTypesList;
for (size_t i = 0; i < select_sample.columns(); ++i)
columns->push_back(NameAndTypePair(select_sample.getByPosition(i).name, select_sample.getByPosition(i).type));
res.columns = new NamesAndTypesList;
for (size_t i = 0; i < as_select_sample.columns(); ++i)
res.columns->push_back(NameAndTypePair(as_select_sample.getByPosition(i).name, as_select_sample.getByPosition(i).type));
}
else
throw Exception("Incorrect CREATE query: required list of column descriptions or AS section or SELECT.", ErrorCodes::INCORRECT_QUERY);
/// Даже если в запросе был список столбцов, на всякий случай приведем его к стандартному виду (развернем Nested).
ASTPtr new_columns = formatColumns(*columns, materialized_columns, alias_columns, column_defaults);
/// Даже если в запросе был список столбцов, на всякий случай приведем его к стандартному виду (развернём Nested).
ASTPtr new_columns = formatColumns(*res.columns, res.materialized_columns, res.alias_columns, res.column_defaults);
if (create.columns)
{
auto it = std::find(create.children.begin(), create.children.end(), create.columns);
@ -185,6 +151,15 @@ BlockIO InterpreterCreateQuery::executeImpl(bool assume_metadata_exists)
create.children.push_back(new_columns);
create.columns = new_columns;
return res;
}
String InterpreterCreateQuery::setEngine(
ASTCreateQuery & create, const StoragePtr & as_storage) const
{
String storage_name;
auto set_engine = [&](const char * engine)
{
storage_name = engine;
@ -193,7 +168,6 @@ BlockIO InterpreterCreateQuery::executeImpl(bool assume_metadata_exists)
create.storage = func;
};
/// Выбор нужного движка таблицы
if (create.storage)
{
storage_name = typeid_cast<ASTFunction &>(*create.storage).name;
@ -201,6 +175,10 @@ BlockIO InterpreterCreateQuery::executeImpl(bool assume_metadata_exists)
else if (!create.as_table.empty())
{
/// NOTE Получение структуры у таблицы, указанной в AS делается не атомарно с созданием таблицы.
String as_database_name = create.as_database.empty() ? context.getCurrentDatabase() : create.as_database;
String as_table_name = create.as_table;
storage_name = as_storage->getName();
create.storage = typeid_cast<const ASTCreateQuery &>(*context.getCreateQuery(as_database_name, as_table_name)).storage;
}
@ -213,7 +191,54 @@ BlockIO InterpreterCreateQuery::executeImpl(bool assume_metadata_exists)
else
throw Exception("Incorrect CREATE query: required ENGINE.", ErrorCodes::ENGINE_REQUIRED);
return storage_name;
}
BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create, bool assume_metadata_exists)
{
String path = context.getPath();
String current_database = context.getCurrentDatabase();
String database_name = create.database.empty() ? current_database : create.database;
String database_name_escaped = escapeForFileName(database_name);
String table_name = create.table;
String table_name_escaped = escapeForFileName(table_name);
String data_path = path + "data/" + database_name_escaped + "/";
String metadata_path = path + "metadata/" + database_name_escaped + "/" + table_name_escaped + ".sql";
std::unique_ptr<InterpreterSelectQuery> interpreter_select;
Block as_select_sample;
/// Для таблиц типа view, чтобы получить столбцы, может понадобиться sample_block.
if (create.select && (!create.attach || (!create.columns && (create.is_view || create.is_materialized_view))))
{
interpreter_select = std::make_unique<InterpreterSelectQuery>(create.select, context);
as_select_sample = interpreter_select->getSampleBlock();
}
String as_database_name = create.as_database.empty() ? current_database : create.as_database;
String as_table_name = create.as_table;
StoragePtr as_storage;
IStorage::TableStructureReadLockPtr as_storage_lock;
if (!as_table_name.empty())
{
as_storage = context.getTable(as_database_name, as_table_name);
as_storage_lock = as_storage->lockStructure(false);
}
/// Устанавливаем и получаем список столбцов.
ColumnsInfo columns = setColumns(create, as_select_sample, as_storage);
/// Выбор нужного движка таблицы
String storage_name = setEngine(create, as_storage);
StoragePtr res;
{
/// TODO Операции с файловой системой не должны быть под глобальной блокировкой.
/// Сделать DDLTableLock
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
if (!create.is_temporary)
@ -231,49 +256,15 @@ BlockIO InterpreterCreateQuery::executeImpl(bool assume_metadata_exists)
res = StorageFactory::instance().get(
storage_name, data_path, table_name, database_name, context,
context.getGlobalContext(), query_ptr, columns,
materialized_columns, alias_columns, column_defaults, create.attach);
/// Проверка наличия метаданных таблицы на диске и создание метаданных.
/// TODO Операции с файловой системой не должны быть под глобальной блокировкой.
if (!assume_metadata_exists && !create.is_temporary)
{
if (Poco::File(metadata_path).exists())
{
/** Запрос ATTACH TABLE может использоваться, чтобы создать в оперативке ссылку на уже существующую таблицу.
* Это используется, например, при загрузке сервера.
*/
if (!create.attach)
throw Exception("Metadata for table " + database_name + "." + table_name + " already exists.",
ErrorCodes::TABLE_METADATA_ALREADY_EXISTS);
}
else
{
/// Меняем CREATE на ATTACH и пишем запрос в файл.
ASTPtr attach_ptr = query_ptr->clone();
ASTCreateQuery & attach = typeid_cast<ASTCreateQuery &>(*attach_ptr);
attach.attach = true;
attach.database.clear();
attach.as_database.clear();
attach.as_table.clear();
attach.if_not_exists = false;
attach.is_populate = false;
/// Для engine VIEW необходимо сохранить сам селект запрос, для остальных - наоборот
if (storage_name != "View" && storage_name != "MaterializedView")
attach.select = nullptr;
Poco::FileOutputStream metadata_file(metadata_path);
formatAST(attach, metadata_file, 0, false);
metadata_file << "\n";
}
}
context.getGlobalContext(), query_ptr, columns.columns,
columns.materialized_columns, columns.alias_columns, columns.column_defaults, create.attach);
if (create.is_temporary)
context.getSessionContext().addExternalTable(table_name, res);
else if (assume_metadata_exists)
context.getDatabase(database_name)->attachTable(table_name, res);
else
context.addTable(database_name, table_name, res);
context.getDatabase(database_name)->createTable(table_name, res, query_ptr, storage_name);
}
/// Если запрос CREATE SELECT, то вставим в таблицу данные
@ -293,14 +284,14 @@ BlockIO InterpreterCreateQuery::executeImpl(bool assume_metadata_exists)
}
},
/// @note shouldn't these two contexts be session contexts in case of temporary table?
columns, column_defaults, context, static_cast<bool>(context.getSettingsRef().strict_insert_defaults)
columns.columns, columns.column_defaults, context, static_cast<bool>(context.getSettingsRef().strict_insert_defaults)
},
materialized_columns
columns.materialized_columns
}
};
BlockIO io;
io.in_sample = select_sample;
io.in_sample = as_select_sample;
io.in = new NullAndDoCopyBlockInputStream(interpreter_select->execute().in, out);
return io;
@ -310,7 +301,22 @@ BlockIO InterpreterCreateQuery::executeImpl(bool assume_metadata_exists)
}
InterpreterCreateQuery::ColumnsAndDefaults InterpreterCreateQuery::parseColumns(ASTPtr expression_list)
BlockIO InterpreterCreateQuery::executeImpl(bool assume_metadata_exists)
{
ASTCreateQuery & create = typeid_cast<ASTCreateQuery &>(*query_ptr);
/// CREATE|ATTACH DATABASE
if (!create.database.empty() && create.table.empty())
{
createDatabase(create);
return {};
}
else
return createTable(create, assume_metadata_exists);
}
InterpreterCreateQuery::ColumnsAndDefaults InterpreterCreateQuery::parseColumns(ASTPtr expression_list) const
{
auto & column_list_ast = typeid_cast<ASTExpressionList &>(*expression_list);
@ -412,8 +418,8 @@ InterpreterCreateQuery::ColumnsAndDefaults InterpreterCreateQuery::parseColumns(
return { *DataTypeNested::expandNestedColumns(columns), defaults };
}
NamesAndTypesList InterpreterCreateQuery::removeAndReturnColumns(ColumnsAndDefaults & columns_and_defaults,
const ColumnDefaultType type)
NamesAndTypesList InterpreterCreateQuery::removeAndReturnColumns(
ColumnsAndDefaults & columns_and_defaults, const ColumnDefaultType type)
{
auto & columns = columns_and_defaults.first;
auto & defaults = columns_and_defaults.second;

View File

@ -4,6 +4,7 @@
#include <DB/Parsers/ASTDropQuery.h>
#include <DB/Interpreters/InterpreterDropQuery.h>
#include <DB/Storages/IStorage.h>
#include <DB/Databases/IDatabase.h>
namespace DB
@ -36,6 +37,10 @@ BlockIO InterpreterDropQuery::execute()
String data_path = path + "data/" + database_name_escaped + "/";
String metadata_path = path + "metadata/" + database_name_escaped + "/";
auto database = context.tryGetDatabase(database_name);
if (!database && !drop.if_exists)
throw Exception("Database " + database_name + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
StorageVector tables_to_drop;
if (!drop.table.empty())
@ -54,10 +59,6 @@ BlockIO InterpreterDropQuery::execute()
}
else
{
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
auto database = context.tryGetDatabase(database_name);
if (!database)
{
if (!drop.if_exists)
@ -65,7 +66,7 @@ BlockIO InterpreterDropQuery::execute()
return {};
}
for (auto iterator = database->getIterator(); iterator.isValid(); iterator.next())
for (auto iterator = database->getIterator(); iterator->isValid(); iterator->next())
tables_to_drop.push_back(iterator->table());
}
@ -78,49 +79,16 @@ BlockIO InterpreterDropQuery::execute()
String current_table_name = table->getTableName();
/// Удаляем информацию о таблице из оперативки
StoragePtr detached = context.detachTable(database_name, current_table_name);
/// Удаляем метаданные таблицы.
database->removeTable(current_table_name);
/// Удаляем данные таблицы
if (!drop.detach)
{
String current_data_path = data_path + escapeForFileName(current_table_name);
String current_metadata_path = metadata_path + escapeForFileName(current_table_name) + ".sql";
/// Для таблиц типа ChunkRef, файла с метаданными не существует.
bool metadata_file_exists = Poco::File(current_metadata_path).exists();
if (metadata_file_exists)
{
if (Poco::File(current_metadata_path + ".bak").exists())
Poco::File(current_metadata_path + ".bak").remove();
Poco::File(current_metadata_path).renameTo(current_metadata_path + ".bak");
}
try
{
table->drop();
}
catch (const Exception & e)
{
/// Такая ошибка означает, что таблицу невозможно удалить, и данные пока ещё консистентны. Можно вернуть таблицу на место.
/// NOTE Таблица будет оставаться в состоянии после shutdown - не производить всевозможной фоновой работы.
if (e.code() == ErrorCodes::TABLE_WAS_NOT_DROPPED)
{
if (metadata_file_exists)
Poco::File(current_metadata_path + ".bak").renameTo(current_metadata_path);
context.addTable(database_name, current_table_name, detached);
throw;
}
else
throw;
}
table->drop(); /// TODO Не удалять метаданные, если таблицу не получилось удалить.
table->is_dropped = true;
if (metadata_file_exists)
Poco::File(current_metadata_path + ".bak").remove();
String current_data_path = data_path + escapeForFileName(current_table_name);
if (Poco::File(current_data_path).exists())
Poco::File(current_data_path).remove(true);

View File

@ -194,11 +194,11 @@ StorageChunks::StorageChunks(
if (context.isTableExist(database_name, it->first))
{
LOG_WARNING(log, "Chunk " << it->first << " already exists.");
context.detachTable(database_name, it->first);
context.getDatabase(database_name)->detachTable(it->first);
}
context.getDatabase(database_name)->attachTable(
it->first, StorageChunkRef::create(it->first, context, database_name, name, true));
auto table = StorageChunkRef::create(it->first, context, database_name, name, true);
context.getDatabase(database_name)->attachTable(it->first, table);
++refcount;
}