#include <DB/Storages/StorageChunkMerger.h>
#include <DB/Storages/StorageChunks.h>
#include <DB/Storages/StorageChunkRef.h>
#include <DB/Parsers/ASTCreateQuery.h>
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/Parsers/ASTLiteral.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTSelectQuery.h>
#include <DB/Parsers/ASTDropQuery.h>
#include <DB/Parsers/ParserCreateQuery.h>
#include <DB/Interpreters/InterpreterCreateQuery.h>
#include <DB/Interpreters/InterpreterDropQuery.h>
#include <DB/Interpreters/InterpreterRenameQuery.h>
#include <DB/DataStreams/copyData.h>
#include <DB/DataStreams/ConcatBlockInputStream.h>
#include <DB/DataStreams/narrowBlockInputStreams.h>
#include <DB/DataStreams/AddingDefaultBlockInputStream.h>

namespace DB
{

const int SLEEP_AFTER_MERGE = 1;
const int SLEEP_NO_WORK = 10;
const int SLEEP_AFTER_ERROR = 60;
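
/// These intervals are in seconds: mergeThread() below waits
/// cancel_merge_thread.tryWait(1000 * sleep_amount) milliseconds between passes,
/// picking the interval depending on whether the last pass merged, found no work, or failed.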

StorageChunkMerger::TableNames StorageChunkMerger::currently_written_groups;

StoragePtr StorageChunkMerger::create(
	const std::string & this_database_,
	const std::string & name_,
	NamesAndTypesListPtr columns_,
	const String & source_database_,
	const String & table_name_regexp_,
	const std::string & destination_name_prefix_,
	size_t chunks_to_merge_,
	Context & context_)
{
	return (new StorageChunkMerger(this_database_, name_, columns_, source_database_, table_name_regexp_, destination_name_prefix_, chunks_to_merge_, context_))->thisPtr();
}

BlockInputStreams StorageChunkMerger::read(
	const Names & column_names,
	ASTPtr query,
	const Settings & settings,
	QueryProcessingStage::Enum & processed_stage,
	size_t max_block_size,
	unsigned threads)
{
	/// We will read from the Chunks tables that have at least one ChunkRef matching the regexp, and from all other tables matching the regexp.
	Storages selected_tables;

	{
		Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());

		typedef std::set<std::string> StringSet;
		StringSet chunks_table_names;

		Databases & databases = context.getDatabases();
		if (!databases.count(source_database))
			throw Exception("No database " + source_database, ErrorCodes::UNKNOWN_DATABASE);
		Tables & tables = databases[source_database];
		for (Tables::iterator it = tables.begin(); it != tables.end(); ++it)
		{
			StoragePtr table = it->second;
			if (table_name_regexp.match(it->first) &&
				!dynamic_cast<StorageChunks *>(&*table) &&
				!dynamic_cast<StorageChunkMerger *>(&*table))
			{
				if (StorageChunkRef * chunk_ref = dynamic_cast<StorageChunkRef *>(&*table))
				{
					if (chunk_ref->source_database_name != source_database)
					{
						LOG_WARNING(log, "ChunkRef " + chunk_ref->getTableName() + " points to another database, ignoring");
						continue;
					}

					if (!chunks_table_names.count(chunk_ref->source_table_name))
					{
						if (tables.count(chunk_ref->source_table_name))
						{
							chunks_table_names.insert(chunk_ref->source_table_name);
							selected_tables.push_back(tables[chunk_ref->source_table_name]);
						}
						else
						{
							LOG_WARNING(log, "ChunkRef " + chunk_ref->getTableName() + " points to non-existing Chunks table, ignoring");
						}
					}
				}
				else
				{
					selected_tables.push_back(table);
				}
			}
		}
	}

	BlockInputStreams res;

	/// Among all the stages up to which the query is processed in the source tables, pick the minimal one.
	processed_stage = QueryProcessingStage::Complete;
	QueryProcessingStage::Enum tmp_processed_stage = QueryProcessingStage::Complete;

	for (Storages::iterator it = selected_tables.begin(); it != selected_tables.end(); ++it)
	{
		BlockInputStreams source_streams = (*it)->read(
			column_names,
			query,
			settings,
			tmp_processed_stage,
			max_block_size,
			selected_tables.size() > threads ? 1 : (threads / selected_tables.size()));

		for (BlockInputStreams::iterator jt = source_streams.begin(); jt != source_streams.end(); ++jt)
			res.push_back(*jt);

		if (tmp_processed_stage < processed_stage)
			processed_stage = tmp_processed_stage;
	}

	/** If there are too many sources, glue them together into `threads` sources.
	  */
	if (res.size() > threads)
		res = narrowBlockInputStreams(res, threads);

	return res;
}
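
/// Note on thread distribution in read() above: each selected table is asked for
/// threads / selected_tables.size() streams, or a single stream when there are
/// more tables than threads. For instance, with threads = 8 and 3 selected tables,
/// each table may return up to 2 streams; if the combined total still exceeds
/// `threads`, narrowBlockInputStreams() glues the streams back down to `threads`.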

StorageChunkMerger::StorageChunkMerger(
	const std::string & this_database_,
	const std::string & name_,
	NamesAndTypesListPtr columns_,
	const String & source_database_,
	const String & table_name_regexp_,
	const std::string & destination_name_prefix_,
	size_t chunks_to_merge_,
	Context & context_)
	: this_database(this_database_), name(name_), columns(columns_), source_database(source_database_),
	table_name_regexp(table_name_regexp_), destination_name_prefix(destination_name_prefix_), chunks_to_merge(chunks_to_merge_), context(context_),
	log(&Logger::get("StorageChunkMerger"))
{
	merge_thread = boost::thread(&StorageChunkMerger::mergeThread, this);
}

StorageChunkMerger::~StorageChunkMerger()
{
	/// Signal the background merge thread to stop and wait for it to finish.
	cancel_merge_thread.set();
	merge_thread.join();
}

void StorageChunkMerger::mergeThread()
{
	while (true)
	{
		bool merged = false;
		bool error = true;

		try
		{
			merged = maybeMergeSomething();
			error = false;
		}
		catch (const DB::Exception & e)
		{
			LOG_ERROR(log, "StorageChunkMerger at " << this_database << "." << name << " failed to merge: DB::Exception. Code: " << e.code() << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what()
				<< ", Stack trace:\n\n" << e.getStackTrace().toString());
		}
		catch (const Poco::Exception & e)
		{
			LOG_ERROR(log, "StorageChunkMerger at " << this_database << "." << name << " failed to merge: Poco::Exception. Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
				<< ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what());
		}
		catch (const std::exception & e)
		{
			LOG_ERROR(log, "StorageChunkMerger at " << this_database << "." << name << " failed to merge: std::exception. Code: " << ErrorCodes::STD_EXCEPTION << ", e.what() = " << e.what());
		}
		catch (...)
		{
			LOG_ERROR(log, "StorageChunkMerger at " << this_database << "." << name << " failed to merge: unknown exception. Code: " << ErrorCodes::UNKNOWN_EXCEPTION);
		}

		unsigned sleep_amount = error ? SLEEP_AFTER_ERROR : (merged ? SLEEP_AFTER_MERGE : SLEEP_NO_WORK);
		if (cancel_merge_thread.tryWait(1000 * sleep_amount))
			break;
	}
}
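
/// Example, derived from the implementation below: makeName("group_", "chunk_20130101", "chunk_20130107")
/// finds the common prefix "chunk_2013010" and yields "group_chunk_20130101_7".
/// (The prefix and chunk names here are hypothetical.)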

static std::string makeName(const std::string & prefix, const std::string & first_chunk, const std::string & last_chunk)
{
	size_t lcp = 0; /// Length of the common prefix of the chunk names.
	while (lcp < first_chunk.size() && lcp < last_chunk.size() && first_chunk[lcp] == last_chunk[lcp])
		++lcp;
	return prefix + first_chunk + "_" + last_chunk.substr(lcp);
}

bool StorageChunkMerger::maybeMergeSomething()
{
	Storages chunks = selectChunksToMerge();
	if (chunks.empty())
		return false;
	return mergeChunks(chunks);
}

StorageChunkMerger::Storages StorageChunkMerger::selectChunksToMerge()
{
	Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());

	Storages res;

	Databases & databases = context.getDatabases();
	if (!databases.count(source_database))
		throw Exception("No database " + source_database, ErrorCodes::UNKNOWN_DATABASE);
	Tables & tables = databases[source_database];
	for (Tables::iterator it = tables.begin(); it != tables.end(); ++it)
	{
		StoragePtr table = it->second;
		if (table_name_regexp.match(it->first) &&
			!dynamic_cast<StorageChunks *>(&*table) &&
			!dynamic_cast<StorageChunkMerger *>(&*table) &&
			!dynamic_cast<StorageChunkRef *>(&*table))
		{
			res.push_back(table);
			if (res.size() >= chunks_to_merge)
				break;
		}
	}

	/// Only merge once a full group of chunks_to_merge chunks has accumulated.
	if (res.size() < chunks_to_merge)
		res.clear();

	return res;
}

static ASTPtr newIdentifier(const std::string & name, ASTIdentifier::Kind kind)
{
	ASTIdentifier * res = new ASTIdentifier;
	res->name = name;
	res->kind = kind;
	return res;
}
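
/// Example, derived from the implementation below: for two columns
/// (EventDate: Date, Count: UInt64) formatColumnsForCreateQuery() yields a string
/// like "(EventDate Date, Count UInt64)", ready to be spliced into a CREATE TABLE
/// query. (The column names here are hypothetical.)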

static std::string formatColumnsForCreateQuery(NamesAndTypesList & columns)
{
	std::string res;
	res += "(";
	for (NamesAndTypesList::iterator it = columns.begin(); it != columns.end(); ++it)
	{
		if (it != columns.begin())
			res += ", ";
		res += it->first;
		res += " ";
		res += it->second->getName();
	}
	res += ")";
	return res;
}

bool StorageChunkMerger::mergeChunks(const Storages & chunks)
{
	typedef std::map<std::string, DataTypePtr> ColumnsMap;

	/// Take the union of the column sets of the chunks being merged.
	ColumnsMap known_columns_types(columns->begin(), columns->end());
	NamesAndTypesListPtr required_columns = new NamesAndTypesList;
	*required_columns = *columns;

	for (size_t chunk_index = 0; chunk_index < chunks.size(); ++chunk_index)
	{
		const NamesAndTypesList & current_columns = chunks[chunk_index]->getColumnsList();

		for (NamesAndTypesList::const_iterator it = current_columns.begin(); it != current_columns.end(); ++it)
		{
			const std::string & name = it->first;
			const DataTypePtr & type = it->second;

			if (known_columns_types.count(name))
			{
				String current_type_name = type->getName();
				String known_type_name = known_columns_types[name]->getName();
				if (current_type_name != known_type_name)
					throw Exception("Different types of column " + name + " in different chunks: type " + current_type_name + " in chunk " + chunks[chunk_index]->getTableName() + ", type " + known_type_name + " somewhere else", ErrorCodes::TYPE_MISMATCH);
			}
			else
			{
				known_columns_types[name] = type;
				required_columns->push_back(*it);
			}
		}
	}
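
	/// Illustration of the column-union step above (hypothetical column names):
	/// if chunk A has (EventDate Date, UserID UInt64) and chunk B has
	/// (EventDate Date, URL String), the merged table gets
	/// (EventDate Date, UserID UInt64, URL String); if B instead declared EventDate
	/// as String, the TYPE_MISMATCH exception above would be thrown.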

	std::string formatted_columns = formatColumnsForCreateQuery(*required_columns);

	std::string new_table_name = makeName(destination_name_prefix, chunks.front()->getTableName(), chunks.back()->getTableName());
	std::string new_table_full_name = source_database + "." + new_table_name;
	StoragePtr new_storage_ptr;

	try
	{
		{
			Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());

			if (!context.getDatabases().count(source_database))
				throw Exception("Destination database " + source_database + " for table " + name + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);

			LOG_TRACE(log, "Will merge " << chunks.size() << " chunks: from " << chunks[0]->getTableName() << " to " << chunks.back()->getTableName() << " to new table " << new_table_name << ".");

			if (currently_written_groups.count(new_table_full_name))
			{
				LOG_WARNING(log, "Table " + new_table_full_name + " is already being written. Aborting merge.");
				return false;
			}

			currently_written_groups.insert(new_table_full_name);

			/// Drop the Chunks table with this name if one exists. It could have been left over from an interrupted merge of the same group of chunks.
			ASTDropQuery * drop_ast = new ASTDropQuery;
			ASTPtr drop_ptr = drop_ast;
			drop_ast->database = source_database;
			drop_ast->detach = false;
			drop_ast->if_exists = true;
			drop_ast->table = new_table_name;
			InterpreterDropQuery drop_interpreter(drop_ptr, context);
			drop_interpreter.execute();

			/// Compose a query to create the Chunks table.
			std::string create_query = "CREATE TABLE " + source_database + "." + new_table_name + " " + formatted_columns + " ENGINE = Chunks";

			/// Parse the query.
			const char * begin = create_query.data();
			const char * end = begin + create_query.size();
			const char * pos = begin;

			ParserCreateQuery parser;
			ASTPtr ast_create_query;
			String expected;
			bool parse_res = parser.parse(pos, end, ast_create_query, expected);

			/// The parsed query must end exactly at the end of the input data.
			if (!parse_res || pos != end)
				throw DB::Exception("Syntax error while parsing create query made by ChunkMerger."
					" The query is \"" + create_query + "\"."
					+ " Failed at position " + toString(pos - begin) + ": "
					+ std::string(pos, std::min(SHOW_CHARS_ON_SYNTAX_ERROR, end - pos))
					+ ", expected " + (parse_res ? "end of query" : expected) + ".",
					DB::ErrorCodes::LOGICAL_ERROR);

			/// Execute the query.
			InterpreterCreateQuery create_interpreter(ast_create_query, context);
			new_storage_ptr = create_interpreter.execute();
		}

		/// Copy the data into the new table.
		StorageChunks & new_storage = dynamic_cast<StorageChunks &>(*new_storage_ptr);

		for (size_t chunk_index = 0; chunk_index < chunks.size(); ++chunk_index)
		{
			StoragePtr src_storage = chunks[chunk_index];
			BlockOutputStreamPtr output = new_storage.writeToNewChunk(src_storage->getTableName());

			const NamesAndTypesList & src_columns = src_storage->getColumnsList();
			Names src_column_names;

			ASTSelectQuery * select_query = new ASTSelectQuery;
			ASTPtr select_query_ptr = select_query;

			/// A query that extracts the needed columns.
			ASTPtr select_expression_list;
			ASTPtr database;
			ASTPtr table;	/// An identifier or a subquery (recursively an ASTSelectQuery).

			select_query->database = newIdentifier(source_database, ASTIdentifier::Database);
			select_query->table = newIdentifier(src_storage->getTableName(), ASTIdentifier::Table);

			ASTExpressionList * select_list = new ASTExpressionList;
			select_query->select_expression_list = select_list;
			for (NamesAndTypesList::const_iterator it = src_columns.begin(); it != src_columns.end(); ++it)
			{
				src_column_names.push_back(it->first);
				select_list->children.push_back(newIdentifier(it->first, ASTIdentifier::Column));
			}

			QueryProcessingStage::Enum processed_stage = QueryProcessingStage::Complete;

			Settings settings = context.getSettings();

			BlockInputStreams input_streams = src_storage->read(
				src_column_names,
				select_query_ptr,
				settings,
				processed_stage,
				DEFAULT_MERGE_BLOCK_SIZE);

			/// Columns missing from this chunk are filled in with defaults, so every chunk yields blocks with the full merged column set.
			BlockInputStreamPtr input = new AddingDefaultBlockInputStream(new ConcatBlockInputStream(input_streams), required_columns);
			copyData(*input, *output);
		}

		/// Atomically replace the source tables with references to the new one.
		{
			Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());

			/// If the database has been dropped in the meantime, do nothing.
			if (context.getDatabases().count(source_database))
			{
				for (size_t chunk_index = 0; chunk_index < chunks.size(); ++chunk_index)
				{
					StoragePtr src_storage = chunks[chunk_index];
					std::string src_name = src_storage->getTableName();

					/// If the table has been dropped in the meantime, do nothing.
					if (!context.getDatabases()[source_database].count(src_name))
						continue;

					/// Drop the source table.
					ASTDropQuery * drop_query = new ASTDropQuery();
					ASTPtr drop_query_ptr = drop_query;
					drop_query->detach = false;
					drop_query->if_exists = false;
					drop_query->database = source_database;
					drop_query->table = src_name;
					InterpreterDropQuery interpreter_drop(drop_query_ptr, context);
					interpreter_drop.execute();

					/// Create a ChunkRef in its place.
					try
					{
						context.addTable(source_database, src_name, StorageChunkRef::create(src_name, context, source_database, new_table_name, false));
					}
					catch (...)
					{
						LOG_ERROR(log, "Chunk " + src_name + " was removed but not replaced. Its data is stored in table " << new_table_name << ". You may need to resolve this manually.");
						throw;
					}
				}
			}

			currently_written_groups.erase(new_table_full_name);
		}

		new_storage.removeReference();

		LOG_TRACE(log, "Merged chunks.");

		return true;
	}
	catch (...)
	{
		Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
		currently_written_groups.erase(new_table_full_name);
		throw;
	}
}
}
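
/** Usage sketch (hypothetical, for illustration only — the actual engine-argument
  * order is defined by the storage factory, not in this file):
  *
  *   CREATE TABLE merged_stats ... ENGINE = ChunkMerger(source_db, '^chunk_\\d+$', 'group_', 10)
  *
  * A table like this would watch `source_db` for tables matching the regexp; the
  * background thread would merge every 10 of them into one Chunks table named with
  * the 'group_' prefix, replacing each original with a ChunkRef of the same name.
  */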