mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 18:12:02 +00:00
Merge
This commit is contained in:
parent
2ba18d9efa
commit
471f6b3a66
@ -9,7 +9,7 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
/** То и дело объединяет таблицы, подходящие под гегэксп, в таблицы типа Chunks.
|
/** То и дело объединяет таблицы, подходящие под регэксп, в таблицы типа Chunks.
|
||||||
* После объндинения заменяет исходные таблицы таблицами типа ChunkRef.
|
* После объндинения заменяет исходные таблицы таблицами типа ChunkRef.
|
||||||
* При чтении ведет себя как таблица типа Merge.
|
* При чтении ведет себя как таблица типа Merge.
|
||||||
* Внимание: если в объединяемых таблицах были лишние столбцы, данные из этих столбцов потеряются при слиянии.
|
* Внимание: если в объединяемых таблицах были лишние столбцы, данные из этих столбцов потеряются при слиянии.
|
||||||
@ -78,6 +78,11 @@ private:
|
|||||||
bool maybeMergeSomething();
|
bool maybeMergeSomething();
|
||||||
Storages selectChunksToMerge();
|
Storages selectChunksToMerge();
|
||||||
void mergeChunks(const Storages & chunks);
|
void mergeChunks(const Storages & chunks);
|
||||||
|
|
||||||
|
typedef std::set<std::string> TableNames;
|
||||||
|
/// Какие таблицы типа Chunks сейчас пишет хоть один ChunkMerger.
|
||||||
|
/// Нужно смотреть, залочив mutex из контекста.
|
||||||
|
static TableNames currently_written_groups;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -308,163 +308,186 @@ void StorageChunkMerger::mergeChunks(const Storages & chunks)
|
|||||||
|
|
||||||
std::string formatted_columns = FormatColumnsForCreateQuery(*required_columns);
|
std::string formatted_columns = FormatColumnsForCreateQuery(*required_columns);
|
||||||
|
|
||||||
/// Атомарно выберем таблице уникальное имя и создадим ее.
|
|
||||||
std::string new_table_name = MakeName(destination_name_prefix, chunks[0]->getTableName(), chunks.back()->getTableName());
|
std::string new_table_name = MakeName(destination_name_prefix, chunks[0]->getTableName(), chunks.back()->getTableName());
|
||||||
|
std::string new_table_full_name;
|
||||||
StoragePtr new_storage_ptr;
|
StoragePtr new_storage_ptr;
|
||||||
{
|
|
||||||
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
|
|
||||||
|
|
||||||
if (!context.getDatabases().count(destination_database))
|
|
||||||
throw Exception("Destination database " + destination_database + " for table " + name + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
|
|
||||||
|
|
||||||
LOG_TRACE(log, "Will merge " << chunks.size() << " chunks: from " << chunks[0]->getTableName() << " to " << chunks.back()->getTableName() << " to new table " << new_table_name << ".");
|
|
||||||
|
|
||||||
/// Уроним Chunks таблицу с таким именем, если она есть. Она могла остаться в результате прерванного слияния той же группы чанков.
|
|
||||||
ASTDropQuery * drop_ast = new ASTDropQuery;
|
|
||||||
ASTPtr drop_ptr = drop_ast;
|
|
||||||
drop_ast->database = destination_database;
|
|
||||||
drop_ast->detach = false;
|
|
||||||
drop_ast->if_exists = true;
|
|
||||||
drop_ast->table = new_table_name;
|
|
||||||
InterpreterDropQuery drop_interpreter(drop_ptr, context);
|
|
||||||
drop_interpreter.execute();
|
|
||||||
|
|
||||||
/// Составим запрос для создания Chunks таблицы.
|
|
||||||
std::string create_query = "CREATE TABLE " + destination_database + "." + new_table_name + " " + formatted_columns + " ENGINE = Chunks";
|
|
||||||
|
|
||||||
/// Распарсим запрос.
|
|
||||||
const char * begin = create_query.data();
|
|
||||||
const char * end = begin + create_query.size();
|
|
||||||
const char * pos = begin;
|
|
||||||
|
|
||||||
ParserCreateQuery parser;
|
|
||||||
ASTPtr ast_create_query;
|
|
||||||
String expected;
|
|
||||||
bool parse_res = parser.parse(pos, end, ast_create_query, expected);
|
|
||||||
|
|
||||||
/// Распарсенный запрос должен заканчиваться на конец входных данных.
|
|
||||||
if (!parse_res || pos != end)
|
|
||||||
throw DB::Exception("Syntax error while parsing create query made by ChunkMerger."
|
|
||||||
" The query is \"" + create_query + "\"."
|
|
||||||
+ " Failed at position " + Poco::NumberFormatter::format(pos - begin) + ": "
|
|
||||||
+ std::string(pos, std::min(SHOW_CHARS_ON_SYNTAX_ERROR, end - pos))
|
|
||||||
+ ", expected " + (parse_res ? "end of query" : expected) + ".",
|
|
||||||
DB::ErrorCodes::LOGICAL_ERROR);
|
|
||||||
|
|
||||||
/// Выполним запрос.
|
|
||||||
InterpreterCreateQuery create_interpreter(ast_create_query, context);
|
|
||||||
new_storage_ptr = create_interpreter.execute();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Скопируем данные в новую таблицу.
|
try
|
||||||
StorageChunks * new_storage = dynamic_cast<StorageChunks *>(&*new_storage_ptr);
|
|
||||||
|
|
||||||
for (size_t chunk_index = 0; chunk_index < chunks.size(); ++chunk_index)
|
|
||||||
{
|
{
|
||||||
StoragePtr src_storage = chunks[chunk_index];
|
|
||||||
BlockOutputStreamPtr output = new_storage->writeToNewChunk(src_storage->getTableName());
|
|
||||||
|
|
||||||
const NamesAndTypesList & src_columns = src_storage->getColumnsList();
|
|
||||||
Names src_column_names;
|
|
||||||
|
|
||||||
ASTSelectQuery * select_query = new ASTSelectQuery;
|
|
||||||
ASTPtr select_query_ptr = select_query;
|
|
||||||
|
|
||||||
/// Запрос, вынимающий нужные столбцы.
|
|
||||||
ASTPtr select_expression_list;
|
|
||||||
ASTPtr database;
|
|
||||||
ASTPtr table; /// Идентификатор или подзапрос (рекурсивно ASTSelectQuery)
|
|
||||||
select_query->database = NewIdentifier(source_database, ASTIdentifier::Database);
|
|
||||||
select_query->table = NewIdentifier(src_storage->getTableName(), ASTIdentifier::Table);
|
|
||||||
ASTExpressionList * select_list = new ASTExpressionList;
|
|
||||||
select_query->select_expression_list = select_list;
|
|
||||||
for (NamesAndTypesList::const_iterator it = src_columns.begin(); it != src_columns.end(); ++it)
|
|
||||||
{
|
{
|
||||||
src_column_names.push_back(it->first);
|
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
|
||||||
select_list->children.push_back(NewIdentifier(it->first, ASTIdentifier::Column));
|
|
||||||
|
if (!context.getDatabases().count(destination_database))
|
||||||
|
throw Exception("Destination database " + destination_database + " for table " + name + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);
|
||||||
|
|
||||||
|
LOG_TRACE(log, "Will merge " << chunks.size() << " chunks: from " << chunks[0]->getTableName() << " to " << chunks.back()->getTableName() << " to new table " << new_table_name << ".");
|
||||||
|
|
||||||
|
if (currently_written_groups.count(new_table_full_name))
|
||||||
|
{
|
||||||
|
LOG_WARNING(log, "Table " + new_table_full_name + " is already being written. Aborting merge.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string new_table_full_name = destination_database + "." + new_table_name;
|
||||||
|
currently_written_groups.insert(new_table_full_name);
|
||||||
|
|
||||||
|
/// Уроним Chunks таблицу с таким именем, если она есть. Она могла остаться в результате прерванного слияния той же группы чанков.
|
||||||
|
ASTDropQuery * drop_ast = new ASTDropQuery;
|
||||||
|
ASTPtr drop_ptr = drop_ast;
|
||||||
|
drop_ast->database = destination_database;
|
||||||
|
drop_ast->detach = false;
|
||||||
|
drop_ast->if_exists = true;
|
||||||
|
drop_ast->table = new_table_name;
|
||||||
|
InterpreterDropQuery drop_interpreter(drop_ptr, context);
|
||||||
|
drop_interpreter.execute();
|
||||||
|
|
||||||
|
/// Составим запрос для создания Chunks таблицы.
|
||||||
|
std::string create_query = "CREATE TABLE " + destination_database + "." + new_table_name + " " + formatted_columns + " ENGINE = Chunks";
|
||||||
|
|
||||||
|
/// Распарсим запрос.
|
||||||
|
const char * begin = create_query.data();
|
||||||
|
const char * end = begin + create_query.size();
|
||||||
|
const char * pos = begin;
|
||||||
|
|
||||||
|
ParserCreateQuery parser;
|
||||||
|
ASTPtr ast_create_query;
|
||||||
|
String expected;
|
||||||
|
bool parse_res = parser.parse(pos, end, ast_create_query, expected);
|
||||||
|
|
||||||
|
/// Распарсенный запрос должен заканчиваться на конец входных данных.
|
||||||
|
if (!parse_res || pos != end)
|
||||||
|
throw DB::Exception("Syntax error while parsing create query made by ChunkMerger."
|
||||||
|
" The query is \"" + create_query + "\"."
|
||||||
|
+ " Failed at position " + Poco::NumberFormatter::format(pos - begin) + ": "
|
||||||
|
+ std::string(pos, std::min(SHOW_CHARS_ON_SYNTAX_ERROR, end - pos))
|
||||||
|
+ ", expected " + (parse_res ? "end of query" : expected) + ".",
|
||||||
|
DB::ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
|
/// Выполним запрос.
|
||||||
|
InterpreterCreateQuery create_interpreter(ast_create_query, context);
|
||||||
|
new_storage_ptr = create_interpreter.execute();
|
||||||
}
|
}
|
||||||
|
|
||||||
QueryProcessingStage::Enum processed_stage;
|
/// Скопируем данные в новую таблицу.
|
||||||
|
StorageChunks * new_storage = dynamic_cast<StorageChunks *>(&*new_storage_ptr);
|
||||||
Settings settings = context.getSettings();
|
|
||||||
|
|
||||||
BlockInputStreams input_streams = src_storage->read(
|
|
||||||
src_column_names,
|
|
||||||
select_query_ptr,
|
|
||||||
settings,
|
|
||||||
processed_stage);
|
|
||||||
|
|
||||||
BlockInputStreamPtr input = new AddingDefaultBlockInputStream(new ConcatBlockInputStream(input_streams), required_columns);
|
|
||||||
|
|
||||||
copyData(*input, *output);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Атомарно подменим исходные таблицы ссылками на новую.
|
|
||||||
do
|
|
||||||
{
|
|
||||||
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
|
|
||||||
|
|
||||||
/// Если БД успели удалить, ничего не делаем.
|
|
||||||
if (!context.getDatabases().count(source_database))
|
|
||||||
break;
|
|
||||||
|
|
||||||
for (size_t chunk_index = 0; chunk_index < chunks.size(); ++chunk_index)
|
for (size_t chunk_index = 0; chunk_index < chunks.size(); ++chunk_index)
|
||||||
{
|
{
|
||||||
StoragePtr src_storage = chunks[chunk_index];
|
StoragePtr src_storage = chunks[chunk_index];
|
||||||
std::string src_name = src_storage->getTableName();
|
BlockOutputStreamPtr output = new_storage->writeToNewChunk(src_storage->getTableName());
|
||||||
|
|
||||||
/// Если таблицу успели удалить, ничего не делаем.
|
const NamesAndTypesList & src_columns = src_storage->getColumnsList();
|
||||||
if (!context.getDatabases()[source_database].count(src_storage->getTableName()))
|
Names src_column_names;
|
||||||
continue;
|
|
||||||
|
|
||||||
/// Перед удалением таблицы запомним запрос для ее создания.
|
ASTSelectQuery * select_query = new ASTSelectQuery;
|
||||||
ASTPtr create_query_ptr = context.getCreateQuery(source_database, src_storage->getTableName());
|
ASTPtr select_query_ptr = select_query;
|
||||||
|
|
||||||
/// Роняем исходную таблицу.
|
/// Запрос, вынимающий нужные столбцы.
|
||||||
ASTDropQuery * drop_query = new ASTDropQuery();
|
ASTPtr select_expression_list;
|
||||||
ASTPtr drop_query_ptr = drop_query;
|
ASTPtr database;
|
||||||
drop_query->detach = false;
|
ASTPtr table; /// Идентификатор или подзапрос (рекурсивно ASTSelectQuery)
|
||||||
drop_query->if_exists = false;
|
select_query->database = NewIdentifier(source_database, ASTIdentifier::Database);
|
||||||
drop_query->database = source_database;
|
select_query->table = NewIdentifier(src_storage->getTableName(), ASTIdentifier::Table);
|
||||||
drop_query->table = src_storage->getTableName();
|
ASTExpressionList * select_list = new ASTExpressionList;
|
||||||
|
select_query->select_expression_list = select_list;
|
||||||
InterpreterDropQuery interpreter_drop(drop_query_ptr, context);
|
for (NamesAndTypesList::const_iterator it = src_columns.begin(); it != src_columns.end(); ++it)
|
||||||
interpreter_drop.execute();
|
|
||||||
|
|
||||||
/// Создаем на ее месте ChunkRef
|
|
||||||
/// (если бы ChunkRef хранил что-то в директории с данными, тут возникли бы проблемы, потому что данные src_storage еще не удалены).
|
|
||||||
try
|
|
||||||
{
|
{
|
||||||
ASTCreateQuery * create_query = dynamic_cast<ASTCreateQuery *>(&*create_query_ptr);
|
src_column_names.push_back(it->first);
|
||||||
create_query->attach = false;
|
select_list->children.push_back(NewIdentifier(it->first, ASTIdentifier::Column));
|
||||||
create_query->if_not_exists = false;
|
|
||||||
|
|
||||||
ASTFunction * ast_storage = new ASTFunction;
|
|
||||||
create_query->storage = ast_storage;
|
|
||||||
ast_storage->name = "ChunkRef";
|
|
||||||
|
|
||||||
ASTExpressionList * engine_params = new ASTExpressionList;
|
|
||||||
ast_storage->parameters = engine_params;
|
|
||||||
ast_storage->children.push_back(ast_storage->parameters);
|
|
||||||
engine_params->children.push_back(NewIdentifier(destination_database, ASTIdentifier::Database));
|
|
||||||
engine_params->children.push_back(NewIdentifier(new_table_name, ASTIdentifier::Table));
|
|
||||||
|
|
||||||
InterpreterCreateQuery interpreter_create(create_query_ptr, context);
|
|
||||||
interpreter_create.execute();
|
|
||||||
}
|
|
||||||
catch(...)
|
|
||||||
{
|
|
||||||
LOG_ERROR(log, "Chunk " + src_name + " was removed but not replaced. Its data is stored in table " << new_table_name << ". You may need to resolve this manually.");
|
|
||||||
|
|
||||||
throw;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
QueryProcessingStage::Enum processed_stage;
|
||||||
|
|
||||||
|
Settings settings = context.getSettings();
|
||||||
|
|
||||||
|
BlockInputStreams input_streams = src_storage->read(
|
||||||
|
src_column_names,
|
||||||
|
select_query_ptr,
|
||||||
|
settings,
|
||||||
|
processed_stage);
|
||||||
|
|
||||||
|
BlockInputStreamPtr input = new AddingDefaultBlockInputStream(new ConcatBlockInputStream(input_streams), required_columns);
|
||||||
|
|
||||||
|
copyData(*input, *output);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Атомарно подменим исходные таблицы ссылками на новую.
|
||||||
|
{
|
||||||
|
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
|
||||||
|
|
||||||
|
/// Если БД успели удалить, ничего не делаем.
|
||||||
|
if (context.getDatabases().count(source_database))
|
||||||
|
{
|
||||||
|
for (size_t chunk_index = 0; chunk_index < chunks.size(); ++chunk_index)
|
||||||
|
{
|
||||||
|
StoragePtr src_storage = chunks[chunk_index];
|
||||||
|
std::string src_name = src_storage->getTableName();
|
||||||
|
|
||||||
|
/// Если таблицу успели удалить, ничего не делаем.
|
||||||
|
if (!context.getDatabases()[source_database].count(src_storage->getTableName()))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
/// Перед удалением таблицы запомним запрос для ее создания.
|
||||||
|
ASTPtr create_query_ptr = context.getCreateQuery(source_database, src_storage->getTableName());
|
||||||
|
|
||||||
|
/// Роняем исходную таблицу.
|
||||||
|
ASTDropQuery * drop_query = new ASTDropQuery();
|
||||||
|
ASTPtr drop_query_ptr = drop_query;
|
||||||
|
drop_query->detach = false;
|
||||||
|
drop_query->if_exists = false;
|
||||||
|
drop_query->database = source_database;
|
||||||
|
drop_query->table = src_storage->getTableName();
|
||||||
|
|
||||||
|
InterpreterDropQuery interpreter_drop(drop_query_ptr, context);
|
||||||
|
interpreter_drop.execute();
|
||||||
|
|
||||||
|
/// Создаем на ее месте ChunkRef
|
||||||
|
/// (если бы ChunkRef хранил что-то в директории с данными, тут возникли бы проблемы, потому что данные src_storage еще не удалены).
|
||||||
|
try
|
||||||
|
{
|
||||||
|
ASTCreateQuery * create_query = dynamic_cast<ASTCreateQuery *>(&*create_query_ptr);
|
||||||
|
create_query->attach = false;
|
||||||
|
create_query->if_not_exists = false;
|
||||||
|
|
||||||
|
ASTFunction * ast_storage = new ASTFunction;
|
||||||
|
create_query->storage = ast_storage;
|
||||||
|
ast_storage->name = "ChunkRef";
|
||||||
|
|
||||||
|
ASTExpressionList * engine_params = new ASTExpressionList;
|
||||||
|
ast_storage->parameters = engine_params;
|
||||||
|
ast_storage->children.push_back(ast_storage->parameters);
|
||||||
|
engine_params->children.push_back(NewIdentifier(destination_database, ASTIdentifier::Database));
|
||||||
|
engine_params->children.push_back(NewIdentifier(new_table_name, ASTIdentifier::Table));
|
||||||
|
|
||||||
|
InterpreterCreateQuery interpreter_create(create_query_ptr, context);
|
||||||
|
interpreter_create.execute();
|
||||||
|
}
|
||||||
|
catch(...)
|
||||||
|
{
|
||||||
|
LOG_ERROR(log, "Chunk " + src_name + " was removed but not replaced. Its data is stored in table " << new_table_name << ". You may need to resolve this manually.");
|
||||||
|
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
currently_written_groups.erase(new_table_full_name);
|
||||||
|
new_table_full_name = "";
|
||||||
|
}
|
||||||
|
|
||||||
|
new_storage->removeReference();
|
||||||
|
|
||||||
|
LOG_TRACE(log, "Merged chunks.");
|
||||||
|
}
|
||||||
|
catch(...)
|
||||||
|
{
|
||||||
|
Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
|
||||||
|
|
||||||
|
if (new_table_full_name != "")
|
||||||
|
currently_written_groups.erase(new_table_full_name);
|
||||||
|
|
||||||
|
throw;
|
||||||
}
|
}
|
||||||
while (false);
|
|
||||||
|
|
||||||
new_storage->removeReference();
|
|
||||||
|
|
||||||
LOG_TRACE(log, "Merged chunks.");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user