dbms: don't write .sql files for ChunkRef tables [#CONV-7889].

This commit is contained in:
Alexey Milovidov 2013-06-17 07:01:31 +00:00
parent 8fad2d42e2
commit e853c4b320
12 changed files with 104 additions and 101 deletions

View File

@ -195,6 +195,7 @@ namespace ErrorCodes
COLLATION_COMPARISON_FAILED,
UNKNOWN_ACTION,
MULTIPLE_ARRAY_JOIN,
TABLE_MUST_NOT_BE_CREATED_MANUALLY,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -19,6 +19,8 @@
namespace DB
{
class Context;
/** Хранилище. Отвечает за:
* - хранение данных таблицы;
@ -83,7 +85,7 @@ public:
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1)
{
throw Exception("Method read() is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Method read is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Пишет данные в таблицу.
@ -93,7 +95,7 @@ public:
virtual BlockOutputStreamPtr write(
ASTPtr query)
{
throw Exception("Method write() is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Method write is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Удалить данные таблицы. После вызова этого метода, использование объекта некорректно (его можно лишь уничтожить).
@ -114,7 +116,7 @@ public:
*/
virtual void rename(const String & new_path_to_db, const String & new_name)
{
throw Exception("Method rename() is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Method rename is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** ALTER таблицы в виде изменения столбцов, не затрагивающий изменение Storage или его параметров.
@ -122,7 +124,7 @@ public:
*/
virtual void alter(NamesAndTypesListPtr columns)
{
throw Exception("Method alter() is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Method alter is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Выполнить какую-либо фоновую работу. Например, объединение кусков в таблице типа MergeTree.
@ -130,9 +132,19 @@ public:
*/
virtual bool optimize()
{
throw Exception("Method optimize() is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Method optimize is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
/** Получить запрос CREATE TABLE, который описывает данную таблицу.
* Обычно этот запрос хранится и достаётся из .sql файла из директории с метаданными.
* Этот метод используется и имеет смысл только если для таблицы не создаётся .sql файл
* - то есть, только для таблиц, которые создаются не пользователем, а самой системой - например, для таблиц типа ChunkRef.
*/
virtual ASTPtr getCustomCreateQuery(const Context & context) const
{
throw Exception("Method getCustomCreateQuery is not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
virtual ~IStorage() {}
/** Проверить, что все запрошенные имена есть в таблице и заданы корректно.

View File

@ -12,7 +12,6 @@ namespace DB
/** То и дело объединяет таблицы, подходящие под регэксп, в таблицы типа Chunks.
* После объндинения заменяет исходные таблицы таблицами типа ChunkRef.
* При чтении ведет себя как таблица типа Merge.
* Внимание: если в объединяемых таблицах были лишние столбцы, данные из этих столбцов потеряются при слиянии.
*/
class StorageChunkMerger : public IStorage
{
@ -23,7 +22,6 @@ public:
NamesAndTypesListPtr columns_, /// Список столбцов.
const String & source_database_, /// В какой БД искать таблицы-источники.
const String & table_name_regexp_, /// Регексп имён таблиц-источников.
const std::string & destination_database_, /// БД для создаваемых таблиц типа Chunks.
const std::string & destination_name_prefix_, /// Префикс имен создаваемых таблиц типа Chunks.
size_t chunks_to_merge_, /// Сколько чанков сливать в одну группу.
Context & context_); /// Известные таблицы.
@ -53,7 +51,6 @@ private:
NamesAndTypesListPtr columns;
String source_database;
OptimizedRegularExpression table_name_regexp;
String destination_database;
std::string destination_name_prefix;
size_t chunks_to_merge;
Context & context;
@ -69,7 +66,6 @@ private:
NamesAndTypesListPtr columns_,
const String & source_database_,
const String & table_name_regexp_,
const std::string & destination_database_,
const std::string & destination_name_prefix_,
size_t chunks_to_merge_,
Context & context_);

View File

@ -26,6 +26,8 @@ public:
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size = DEFAULT_BLOCK_SIZE,
unsigned threads = 1);
ASTPtr getCustomCreateQuery(const Context & context) const;
void dropImpl();

View File

@ -12,7 +12,8 @@ namespace DB
* Запись не поддерживается. Для записи используются таблицы типа ChunkMerger.
* Таблицы типа ChunkRef могут ссылаться на отдельные куски внутри таблицы типа Chunks.
* Хранит количество ссылающихся таблиц ChunkRef и удаляет себя, когда оно становится нулевым.
* После создания счетчик ссылок имеет значение 1.
* Сразу после создания CREATE-ом, счетчик ссылок имеет значение 1
* (потом, движок ChunkMerger добавляет ссылки от созданных ChunkRef-ов и затем вычитает 1).
*/
class StorageChunks : public StorageLog
{
@ -54,13 +55,14 @@ public:
throw Exception("Table doesn't support renaming", ErrorCodes::NOT_IMPLEMENTED);
}
private:
typedef std::vector<size_t> Marks;
/// Имя чанка - номер (в последовательности, как чанки записаны в таблице).
typedef std::map<String, size_t> ChunkIndices;
/// Номер чанка - засечка, с которой начинаются данные таблицы.
typedef std::vector<size_t> ChunkNumToMark;
String database_name;
bool index_loaded;
Marks marks;
ChunkNumToMark chunk_num_to_marks;
ChunkIndices chunk_indices;
CounterInFile reference_counter;

View File

@ -188,9 +188,20 @@ ASTPtr Context::getCreateQuery(const String & database_name, const String & tabl
/// Здесь хранится определение таблицы
String metadata_path = shared->path + "metadata/" + escapeForFileName(db) + "/" + escapeForFileName(table_name) + ".sql";
if (!Poco::File(metadata_path).exists())
throw Exception("Metadata file " + metadata_path + " for table " + db + "." + table_name + " doesn't exist.",
ErrorCodes::TABLE_METADATA_DOESNT_EXIST);
{
try
{
/// Если файл .sql не предусмотрен (например, для таблиц типа ChunkRef), то движок может сам предоставить запрос CREATE.
return getTable(database_name, table_name)->getCustomCreateQuery(*this);
}
catch (...)
{
throw Exception("Metadata file " + metadata_path + " for table " + db + "." + table_name + " doesn't exist.",
ErrorCodes::TABLE_METADATA_DOESNT_EXIST);
}
}
StringPtr query = new String();
{

View File

@ -57,7 +57,9 @@ void InterpreterDropQuery::execute()
table->database_to_drop = database_dropper;
table->drop();
Poco::File(metadata_path).remove();
/// Для таблиц типа ChunkRef, файла с метаданными не существует.
if (Poco::File(metadata_path).exists())
Poco::File(metadata_path).remove();
}
/// Удаляем информацию о таблице из оперативки

View File

@ -56,7 +56,7 @@ void InterpreterRenameQuery::execute()
context.assertTableExists(from_database_name, from_table_name);
context.assertTableDoesntExist(to_database_name, to_table_name);
/// Уведомляем таблицу о том, что она переименовается.
/// Уведомляем таблицу о том, что она переименовается. Если таблица не поддерживает переименование - кинется исключение.
context.getTable(from_database_name, from_table_name)->rename(path + "data/" + to_database_name_escaped + "/", to_table_name);
/// Пишем новый файл с метаданными.

View File

@ -32,12 +32,11 @@ StoragePtr StorageChunkMerger::create(
NamesAndTypesListPtr columns_,
const String & source_database_,
const String & table_name_regexp_,
const std::string & destination_database_,
const std::string & destination_name_prefix_,
size_t chunks_to_merge_,
Context & context_)
{
return (new StorageChunkMerger(this_database_, name_, columns_, source_database_, table_name_regexp_, destination_database_, destination_name_prefix_, chunks_to_merge_, context_))->thisPtr();
return (new StorageChunkMerger(this_database_, name_, columns_, source_database_, table_name_regexp_, destination_name_prefix_, chunks_to_merge_, context_))->thisPtr();
}
BlockInputStreams StorageChunkMerger::read(
@ -135,12 +134,11 @@ StorageChunkMerger::StorageChunkMerger(
NamesAndTypesListPtr columns_,
const String & source_database_,
const String & table_name_regexp_,
const std::string & destination_database_,
const std::string & destination_name_prefix_,
size_t chunks_to_merge_,
Context & context_)
: this_database(this_database_), name(name_), columns(columns_), source_database(source_database_),
table_name_regexp(table_name_regexp_), destination_database(destination_database_), destination_name_prefix(destination_name_prefix_), chunks_to_merge(chunks_to_merge_), context(context_),
table_name_regexp(table_name_regexp_), destination_name_prefix(destination_name_prefix_), chunks_to_merge(chunks_to_merge_), context(context_),
thread_should_quit(false), merge_thread(&StorageChunkMerger::mergeThread, this), log(&Logger::get("StorageChunkMerger"))
{
}
@ -309,8 +307,8 @@ bool StorageChunkMerger::mergeChunks(const Storages & chunks)
std::string formatted_columns = formatColumnsForCreateQuery(*required_columns);
std::string new_table_name = makeName(destination_name_prefix, chunks[0]->getTableName(), chunks.back()->getTableName());
std::string new_table_full_name = destination_database + "." + new_table_name;
std::string new_table_name = makeName(destination_name_prefix, chunks.front()->getTableName(), chunks.back()->getTableName());
std::string new_table_full_name = source_database + "." + new_table_name;
StoragePtr new_storage_ptr;
try
@ -318,8 +316,8 @@ bool StorageChunkMerger::mergeChunks(const Storages & chunks)
{
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
if (!context.getDatabases().count(destination_database))
throw Exception("Destination database " + destination_database + " for table " + name + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
if (!context.getDatabases().count(source_database))
throw Exception("Destination database " + source_database + " for table " + name + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
LOG_TRACE(log, "Will merge " << chunks.size() << " chunks: from " << chunks[0]->getTableName() << " to " << chunks.back()->getTableName() << " to new table " << new_table_name << ".");
@ -334,7 +332,7 @@ bool StorageChunkMerger::mergeChunks(const Storages & chunks)
/// Уроним Chunks таблицу с таким именем, если она есть. Она могла остаться в результате прерванного слияния той же группы чанков.
ASTDropQuery * drop_ast = new ASTDropQuery;
ASTPtr drop_ptr = drop_ast;
drop_ast->database = destination_database;
drop_ast->database = source_database;
drop_ast->detach = false;
drop_ast->if_exists = true;
drop_ast->table = new_table_name;
@ -342,7 +340,7 @@ bool StorageChunkMerger::mergeChunks(const Storages & chunks)
drop_interpreter.execute();
/// Составим запрос для создания Chunks таблицы.
std::string create_query = "CREATE TABLE " + destination_database + "." + new_table_name + " " + formatted_columns + " ENGINE = Chunks";
std::string create_query = "CREATE TABLE " + source_database + "." + new_table_name + " " + formatted_columns + " ENGINE = Chunks";
/// Распарсим запрос.
const char * begin = create_query.data();
@ -369,12 +367,12 @@ bool StorageChunkMerger::mergeChunks(const Storages & chunks)
}
/// Скопируем данные в новую таблицу.
StorageChunks * new_storage = dynamic_cast<StorageChunks *>(&*new_storage_ptr);
StorageChunks & new_storage = dynamic_cast<StorageChunks &>(*new_storage_ptr);
for (size_t chunk_index = 0; chunk_index < chunks.size(); ++chunk_index)
{
StoragePtr src_storage = chunks[chunk_index];
BlockOutputStreamPtr output = new_storage->writeToNewChunk(src_storage->getTableName());
BlockOutputStreamPtr output = new_storage.writeToNewChunk(src_storage->getTableName());
const NamesAndTypesList & src_columns = src_storage->getColumnsList();
Names src_column_names;
@ -424,45 +422,26 @@ bool StorageChunkMerger::mergeChunks(const Storages & chunks)
std::string src_name = src_storage->getTableName();
/// Если таблицу успели удалить, ничего не делаем.
if (!context.getDatabases()[source_database].count(src_storage->getTableName()))
if (!context.getDatabases()[source_database].count(src_name))
continue;
/// Перед удалением таблицы запомним запрос для ее создания.
ASTPtr create_query_ptr = context.getCreateQuery(source_database, src_storage->getTableName());
/// Роняем исходную таблицу.
ASTDropQuery * drop_query = new ASTDropQuery();
ASTPtr drop_query_ptr = drop_query;
drop_query->detach = false;
drop_query->if_exists = false;
drop_query->database = source_database;
drop_query->table = src_storage->getTableName();
drop_query->table = src_name;
InterpreterDropQuery interpreter_drop(drop_query_ptr, context);
interpreter_drop.execute();
/// Создаем на ее месте ChunkRef
/// (если бы ChunkRef хранил что-то в директории с данными, тут возникли бы проблемы, потому что данные src_storage еще не удалены).
try
{
ASTCreateQuery * create_query = dynamic_cast<ASTCreateQuery *>(&*create_query_ptr);
create_query->attach = false;
create_query->if_not_exists = false;
ASTFunction * ast_storage = new ASTFunction;
create_query->storage = ast_storage;
ast_storage->name = "ChunkRef";
ASTExpressionList * engine_params = new ASTExpressionList;
ast_storage->parameters = engine_params;
ast_storage->children.push_back(ast_storage->parameters);
engine_params->children.push_back(newIdentifier(destination_database, ASTIdentifier::Database));
engine_params->children.push_back(newIdentifier(new_table_name, ASTIdentifier::Table));
InterpreterCreateQuery interpreter_create(create_query_ptr, context);
interpreter_create.execute();
context.addTable(source_database, src_name, StorageChunkRef::create(src_name, context, source_database, new_table_name, false));
}
catch(...)
catch (...)
{
LOG_ERROR(log, "Chunk " + src_name + " was removed but not replaced. Its data is stored in table " << new_table_name << ". You may need to resolve this manually.");
@ -470,11 +449,11 @@ bool StorageChunkMerger::mergeChunks(const Storages & chunks)
}
}
}
currently_written_groups.erase(new_table_full_name);
}
new_storage->removeReference();
new_storage.removeReference();
LOG_TRACE(log, "Merged chunks.");

View File

@ -1,3 +1,5 @@
#include <DB/Parsers/ASTCreateQuery.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Storages/StorageChunkRef.h>
@ -20,6 +22,27 @@ BlockInputStreams StorageChunkRef::read(
return getSource().readFromChunk(name, column_names, query, settings, processed_stage, max_block_size, threads);
}
ASTPtr StorageChunkRef::getCustomCreateQuery(const Context & context) const
{
/// Берём CREATE запрос для таблицы, на которую эта ссылается, и меняем в ней имя и движок.
ASTPtr res = context.getCreateQuery(source_database_name, source_table_name);
ASTCreateQuery & res_create = dynamic_cast<ASTCreateQuery &>(*res);
res_create.database.clear();
res_create.table = name;
res_create.storage = new ASTFunction;
ASTFunction & storage_ast = static_cast<ASTFunction &>(*res_create.storage);
storage_ast.name = "ChunkRef";
storage_ast.arguments = new ASTExpressionList;
storage_ast.children.push_back(storage_ast.arguments);
ASTExpressionList & args_ast = static_cast<ASTExpressionList &>(*storage_ast.arguments);
args_ast.children.push_back(new ASTIdentifier(StringRange(), source_database_name, ASTIdentifier::Database));
args_ast.children.push_back(new ASTIdentifier(StringRange(), source_table_name, ASTIdentifier::Table));
return res;
}
void StorageChunkRef::dropImpl()
{
try
@ -45,13 +68,7 @@ StorageChunkRef::StorageChunkRef(const std::string & name_, const Context & cont
StorageChunks & StorageChunkRef::getSource()
{
StoragePtr table_ptr = context.getTable(source_database_name, source_table_name);
StorageChunks * chunks = dynamic_cast<StorageChunks *>(&*table_ptr);
if (chunks == NULL)
throw Exception("Referenced table " + source_table_name + " in database " + source_database_name + " doesn't exist", ErrorCodes::UNKNOWN_TABLE);
return *chunks;
return dynamic_cast<StorageChunks &>(*context.getTable(source_database_name, source_table_name));
}
const StorageChunks & StorageChunkRef::getSource() const

View File

@ -1,4 +1,5 @@
#include <DB/Storages/StorageChunks.h>
#include <DB/Storages/StorageChunkRef.h>
#include <DB/Common/escapeForFileName.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/WriteHelpers.h>
@ -43,8 +44,6 @@ BlockInputStreams StorageChunks::readFromChunk(
size_t max_block_size,
unsigned threads)
{
loadIndex();
size_t mark1;
size_t mark2;
@ -54,8 +53,8 @@ BlockInputStreams StorageChunks::readFromChunk(
if (!chunk_indices.count(chunk_name))
throw Exception("No chunk " + chunk_name + " in table " + name, ErrorCodes::CHUNK_NOT_FOUND);
size_t index = chunk_indices[chunk_name];
mark1 = marks[index];
mark2 = index + 1 == marks.size() ? marksCount() : marks[index + 1];
mark1 = chunk_num_to_marks[index];
mark2 = index + 1 == chunk_num_to_marks.size() ? marksCount() : chunk_num_to_marks[index + 1];
}
return read(mark1, mark2, column_names, query, settings, processed_stage, max_block_size, threads);
@ -64,18 +63,16 @@ BlockInputStreams StorageChunks::readFromChunk(
BlockOutputStreamPtr StorageChunks::writeToNewChunk(
const std::string & chunk_name)
{
loadIndex();
{
Poco::ScopedWriteRWLock lock(rwlock);
if (chunk_indices.count(chunk_name))
throw Exception("Duplicate chunk name in table " + name, ErrorCodes::DUPLICATE_CHUNK_NAME);
size_t mark = marksCount();
chunk_indices[chunk_name] = marks.size();
chunk_indices[chunk_name] = chunk_num_to_marks.size();
appendChunkToIndex(chunk_name, mark);
marks.push_back(mark);
chunk_num_to_marks.push_back(mark);
}
return StorageLog::write(NULL);
@ -91,22 +88,25 @@ StorageChunks::StorageChunks(
:
StorageLog(path_, name_, columns_),
database_name(database_name_),
index_loaded(false),
reference_counter(path_ + escapeForFileName(name_) + "/refcount.txt"),
context(context_),
log(&Logger::get("StorageChunks"))
{
if (!attach)
reference_counter.add(1, true);
loadIndex();
/// Создадим все таблицы типа ChunkRef. Они должны располагаться в той же БД.
for (ChunkIndices::const_iterator it = chunk_indices.begin(); it != chunk_indices.end(); ++it)
context.addTable(database_name, it->first, StorageChunkRef::create(it->first, context, database_name, name, true));
}
void StorageChunks::loadIndex()
{
loadMarks();
Poco::ScopedWriteRWLock lock(rwlock);
if (index_loaded)
return;
index_loaded = true;
String index_path = path + escapeForFileName(name) + "/chunks.chn";
@ -122,8 +122,8 @@ void StorageChunks::loadIndex()
readStringBinary(name, index);
readIntBinary<UInt64>(mark, index);
chunk_indices[name] = marks.size();
marks.push_back(mark);
chunk_indices[name] = chunk_num_to_marks.size();
chunk_num_to_marks.push_back(mark);
}
}

View File

@ -45,24 +45,7 @@ StoragePtr StorageFactory::get(
}
else if (name == "ChunkRef")
{
ASTs & args_func = dynamic_cast<ASTFunction &>(*dynamic_cast<ASTCreateQuery &>(*query).storage).children;
if (args_func.size() != 1)
throw Exception("Storage ChunkRef requires exactly 2 parameters"
" - names of source database and source table.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = dynamic_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() != 2)
throw Exception("Storage ChunkRef requires exactly 2 parameters"
" - names of source database and source table.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String source_database = dynamic_cast<ASTIdentifier &>(*args[0]).name;
String source_table = dynamic_cast<ASTIdentifier &>(*args[1]).name;
return StorageChunkRef::create(table_name, context, source_database, source_table, attach);
throw Exception("Table with storage ChunkRef must not be created manually.", ErrorCodes::TABLE_MUST_NOT_BE_CREATED_MANUALLY);
}
else if (name == "ChunkMerger")
{
@ -75,7 +58,7 @@ StoragePtr StorageFactory::get(
ASTs & args = dynamic_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() < 3 || args.size() > 5)
if (args.size() < 3 || args.size() > 4)
break;
String source_database = dynamic_cast<ASTIdentifier &>(*args[0]).name;
@ -87,14 +70,12 @@ StoragePtr StorageFactory::get(
if (args.size() > 3)
destination_name_prefix = dynamic_cast<ASTIdentifier &>(*args[3]).name;
if (args.size() > 4)
destination_database = dynamic_cast<ASTIdentifier &>(*args[4]).name;
return StorageChunkMerger::create(database_name, table_name, columns, source_database, source_table_name_regexp, destination_database, destination_name_prefix, chunks_to_merge, context);
return StorageChunkMerger::create(database_name, table_name, columns, source_database, source_table_name_regexp, destination_name_prefix, chunks_to_merge, context);
} while(false);
throw Exception("Storage ChunkMerger requires from 3 to 5 parameters:"
" source database, regexp for source table names, number of chunks to merge, [destination tables name prefix, [destination database]].",
throw Exception("Storage ChunkMerger requires from 3 to 4 parameters:"
" source database, regexp for source table names, number of chunks to merge, [destination tables name prefix].",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
else if (name == "TinyLog")