#include <DB/Storages/StorageChunkMerger.h>
#include <DB/Storages/StorageChunks.h>
#include <DB/Storages/StorageChunkRef.h>
#include <DB/Parsers/ASTCreateQuery.h>
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/Parsers/ASTLiteral.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTSelectQuery.h>
#include <DB/Parsers/ASTDropQuery.h>
#include <DB/Parsers/ParserCreateQuery.h>
#include <DB/Parsers/formatAST.h>
#include <DB/Interpreters/executeQuery.h>
#include <DB/Interpreters/InterpreterDropQuery.h>
#include <DB/DataStreams/ConcatBlockInputStream.h>
#include <DB/DataStreams/narrowBlockInputStreams.h>
#include <DB/DataStreams/AddingDefaultBlockInputStream.h>
/// Used below; the paths are assumed to follow the DB/ layout of the includes above.
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/DataStreams/AddingConstColumnBlockInputStream.h>
#include <DB/Columns/ColumnString.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/Common/VirtualColumnUtils.h>

namespace DB
{

const int SLEEP_AFTER_MERGE = 1;
const int SLEEP_NO_WORK = 10;
const int SLEEP_AFTER_ERROR = 60;
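
/** Back-off policy of the background merge thread (see mergeThread below): after a
  * successful merge it re-checks almost immediately, with no work it polls every
  * SLEEP_NO_WORK seconds, and after an exception it waits SLEEP_AFTER_ERROR seconds.
  * Values are in seconds; the wait is interruptible via cancel_merge_thread.
  */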

StorageChunkMerger::TableNames StorageChunkMerger::currently_written_groups;

StoragePtr StorageChunkMerger::create(
	const std::string & this_database_,
	const std::string & name_,
	NamesAndTypesListPtr columns_,
	const String & source_database_,
	const String & table_name_regexp_,
	const std::string & destination_name_prefix_,
	size_t chunks_to_merge_,
	Context & context_)
{
	return (new StorageChunkMerger(this_database_, name_, columns_, source_database_, table_name_regexp_, destination_name_prefix_, chunks_to_merge_, context_))->thisPtr();
}

NameAndTypePair StorageChunkMerger::getColumn(const String & column_name) const
{
	if (column_name == _table_column_name)
		return std::make_pair(_table_column_name, new DataTypeString);
	return getRealColumn(column_name);
}

bool StorageChunkMerger::hasColumn(const String & column_name) const
{
	if (column_name == _table_column_name)
		return true;
	return hasRealColumn(column_name);
}

BlockInputStreams StorageChunkMerger::read(
	const Names & column_names,
	ASTPtr query,
	const Settings & settings,
	QueryProcessingStage::Enum & processed_stage,
	size_t max_block_size,
	unsigned threads)
{
	/// We will read from Chunks tables that are referenced by at least one ChunkRef matching the regexp,
	/// and from all other tables matching the regexp.
	Storages selected_tables;

	{
		Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());

		typedef std::set<std::string> StringSet;
		StringSet chunks_table_names;

		Databases & databases = context.getDatabases();

		if (!databases.count(source_database))
			throw Exception("No database " + source_database, ErrorCodes::UNKNOWN_DATABASE);

		Tables & tables = databases[source_database];
		for (Tables::iterator it = tables.begin(); it != tables.end(); ++it)
		{
			StoragePtr table = it->second;
			if (table_name_regexp.match(it->first) &&
				!dynamic_cast<StorageChunks *>(&*table) &&
				!dynamic_cast<StorageChunkMerger *>(&*table))
			{
				if (StorageChunkRef * chunk_ref = dynamic_cast<StorageChunkRef *>(&*table))
				{
					if (chunk_ref->source_database_name != source_database)
					{
						LOG_WARNING(log, "ChunkRef " + it->first + " points to another database, ignoring");
						continue;
					}

					if (!chunks_table_names.count(chunk_ref->source_table_name))
					{
						if (tables.count(chunk_ref->source_table_name))
						{
							chunks_table_names.insert(chunk_ref->source_table_name);
							selected_tables.push_back(tables[chunk_ref->source_table_name]);
						}
						else
						{
							LOG_WARNING(log, "ChunkRef " + it->first + " points to non-existing Chunks table, ignoring");
						}
					}
				}
				else
				{
					selected_tables.push_back(table);
				}
			}
		}
	}
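
	/// Several ChunkRefs normally point into the same Chunks table; the chunks_table_names
	/// set above ensures each underlying Chunks table is selected (and thus read) only once,
	/// no matter how many matching ChunkRefs reference it.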

	TableLocks table_locks;

	/// Don't let anyone drop these tables while we are reading from them.
	for (auto table : selected_tables)
	{
		table_locks.push_back(table->lockStructure(false));
	}

	BlockInputStreams res;

	/// Among the stages up to which the query is processed in the source tables, pick the minimal one.
	processed_stage = QueryProcessingStage::Complete;
	QueryProcessingStage::Enum tmp_processed_stage = QueryProcessingStage::Complete;

	bool has_virtual_column = false;

	for (const auto & column : column_names)
		if (column == _table_column_name)
			has_virtual_column = true;

	Block virtual_columns_block = getBlockWithVirtualColumns(selected_tables);
	BlockInputStreamPtr virtual_columns;

	/// If at least one virtual column is requested, try to use it as an index...
	if (has_virtual_column)
		virtual_columns = VirtualColumnUtils::getVirtualColumnsBlocks(query->clone(), virtual_columns_block, context);
	else /// ...otherwise, consider all possible values admissible.
		virtual_columns = new OneBlockInputStream(virtual_columns_block);

	std::multiset<String> values = VirtualColumnUtils::extractSingleValueFromBlocks<String>(virtual_columns, _table_column_name);
	bool all_inclusive = (values.size() == virtual_columns_block.rows());
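
	/// Pruning by the virtual column: if the query constrains _table (for example,
	/// WHERE _table = 'some_chunk' -- an illustrative, hypothetical condition), `values`
	/// holds only the surviving table names and the loop below skips the rest.
	/// If every possible value survives (all_inclusive), no table can be skipped.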

	for (size_t i = 0; i < selected_tables.size(); ++i)
	{
		StoragePtr table = selected_tables[i];
		auto table_lock = table_locks[i];

		if (table->getName() != "Chunks" && !all_inclusive && values.find(table->getTableName()) == values.end())
			continue;

		/// The list of virtual columns that we will fill right here, and the list of columns to pass on to the source table.
		Names virt_column_names, real_column_names;
		for (const auto & column : column_names)
			if (column == _table_column_name && table->getName() != "Chunks") /// A Chunks table fills the _table column itself.
				virt_column_names.push_back(column);
			else
				real_column_names.push_back(column);

		/// If the query asks only for virtual columns, we have to request at least one real one.
		if (real_column_names.size() == 0)
			real_column_names.push_back(ExpressionActions::getSmallestColumn(table->getColumnsList()));

		ASTPtr modified_query_ast = query->clone();

		/// Substitute the virtual column with its value.
		if (!virt_column_names.empty())
			VirtualColumnUtils::rewriteEntityInAst(modified_query_ast, _table_column_name, table->getTableName());

		BlockInputStreams source_streams = table->read(
			real_column_names,
			modified_query_ast,
			settings,
			tmp_processed_stage,
			max_block_size,
			selected_tables.size() > threads ? 1 : (threads / selected_tables.size()));
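
		/// Thread budget: each source table is asked for threads / selected_tables.size() streams;
		/// e.g. 8 threads across 3 tables gives 2 streams per table. With more tables than
		/// threads, each table produces one stream and the result is narrowed further below.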

		for (auto & stream : source_streams)
		{
			stream->addTableLock(table_lock);
		}

		/// Add the virtual columns to the result.
		for (const auto & virtual_column : virt_column_names)
		{
			if (virtual_column == _table_column_name)
			{
				for (auto & stream : source_streams)
				{
					stream = new AddingConstColumnBlockInputStream<String>(stream, new DataTypeString, table->getTableName(), _table_column_name);
				}
			}
		}

		for (BlockInputStreams::iterator jt = source_streams.begin(); jt != source_streams.end(); ++jt)
			res.push_back(*jt);

		if (tmp_processed_stage < processed_stage)
			processed_stage = tmp_processed_stage;
	}

	/** If there are too many sources, glue them together into `threads` sources.
	  */
	if (res.size() > threads)
		res = narrowBlockInputStreams(res, threads);

	return res;
}

/// Build a block consisting only of the possible values of the virtual columns.
Block StorageChunkMerger::getBlockWithVirtualColumns(const Storages & selected_tables) const
{
	Block res;
	ColumnWithNameAndType _table(new ColumnString, new DataTypeString, _table_column_name);

	for (Storages::const_iterator it = selected_tables.begin(); it != selected_tables.end(); ++it)
		if ((*it)->getName() != "Chunks")
			_table.column->insert((*it)->getTableName());

	res.insert(_table);
	return res;
}

StorageChunkMerger::StorageChunkMerger(
	const std::string & this_database_,
	const std::string & name_,
	NamesAndTypesListPtr columns_,
	const String & source_database_,
	const String & table_name_regexp_,
	const std::string & destination_name_prefix_,
	size_t chunks_to_merge_,
	Context & context_)
	: this_database(this_database_), name(name_), columns(columns_), source_database(source_database_),
	table_name_regexp(table_name_regexp_), destination_name_prefix(destination_name_prefix_), chunks_to_merge(chunks_to_merge_),
	context(context_), settings(context.getSettings()),
	log(&Logger::get("StorageChunkMerger")), shutdown_called(false)
{
	merge_thread = boost::thread(&StorageChunkMerger::mergeThread, this);
	_table_column_name = "_table" + VirtualColumnUtils::chooseSuffix(getColumnsList(), "_table");
}

void StorageChunkMerger::shutdown()
{
	if (shutdown_called)
		return;
	shutdown_called = true;
	cancel_merge_thread.set();
	merge_thread.join();
}

StorageChunkMerger::~StorageChunkMerger()
{
	shutdown();
}

void StorageChunkMerger::mergeThread()
{
	while (true)
	{
		bool merged = false;
		bool error = true;

		try
		{
			merged = maybeMergeSomething();
			error = false;
		}
		catch (const Exception & e)
		{
			LOG_ERROR(log, "StorageChunkMerger at " << this_database << "." << name << " failed to merge: Code: " << e.code() << ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what()
				<< ", Stack trace:\n\n" << e.getStackTrace().toString());
		}
		catch (const Poco::Exception & e)
		{
			LOG_ERROR(log, "StorageChunkMerger at " << this_database << "." << name << " failed to merge: Poco::Exception. Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
				<< ", e.displayText() = " << e.displayText() << ", e.what() = " << e.what());
		}
		catch (const std::exception & e)
		{
			LOG_ERROR(log, "StorageChunkMerger at " << this_database << "." << name << " failed to merge: std::exception. Code: " << ErrorCodes::STD_EXCEPTION << ", e.what() = " << e.what());
		}
		catch (...)
		{
			LOG_ERROR(log, "StorageChunkMerger at " << this_database << "." << name << " failed to merge: unknown exception. Code: " << ErrorCodes::UNKNOWN_EXCEPTION);
		}

		unsigned sleep_amount = error ? SLEEP_AFTER_ERROR : (merged ? SLEEP_AFTER_MERGE : SLEEP_NO_WORK);
		if (shutdown_called || cancel_merge_thread.tryWait(1000 * sleep_amount))
			break;
	}
}

static std::string makeName(const std::string & prefix, const std::string & first_chunk, const std::string & last_chunk)
{
	size_t lcp = 0; /// Length of the longest common prefix of the chunk names.
	while (lcp < first_chunk.size() && lcp < last_chunk.size() && first_chunk[lcp] == last_chunk[lcp])
		++lcp;
	return prefix + first_chunk + "_" + last_chunk.substr(lcp);
}
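
/// Worked example (with hypothetical chunk names): makeName("merged_", "chunk_20130201", "chunk_20130207")
/// finds the common prefix "chunk_2013020" and returns "merged_chunk_20130201_7":
/// the full first name plus only the distinguishing tail of the last one.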

bool StorageChunkMerger::maybeMergeSomething()
{
	Storages chunks = selectChunksToMerge();
	if (chunks.empty() || shutdown_called)
		return false;
	return mergeChunks(chunks);
}

StorageChunkMerger::Storages StorageChunkMerger::selectChunksToMerge()
{
	Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());

	Storages res;

	Databases & databases = context.getDatabases();

	if (!databases.count(source_database))
		throw Exception("No database " + source_database, ErrorCodes::UNKNOWN_DATABASE);

	Tables & tables = databases[source_database];
	for (Tables::iterator it = tables.begin(); it != tables.end(); ++it)
	{
		StoragePtr table = it->second;
		if (table_name_regexp.match(it->first) &&
			!dynamic_cast<StorageChunks *>(&*table) &&
			!dynamic_cast<StorageChunkMerger *>(&*table) &&
			!dynamic_cast<StorageChunkRef *>(&*table))
		{
			res.push_back(table);
			if (res.size() >= chunks_to_merge)
				break;
		}
	}

	if (res.size() < chunks_to_merge)
		res.clear();

	return res;
}
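
/// Selection is all-or-nothing: a merge happens only when a full group of chunks_to_merge
/// candidate tables exists. Chunks tables, ChunkRefs and the merger itself are never
/// candidates, so tables that have already been merged are not picked up again.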

static ASTPtr newIdentifier(const std::string & name, ASTIdentifier::Kind kind)
{
	ASTIdentifier * res = new ASTIdentifier;
	res->name = name;
	res->kind = kind;
	return res;
}

bool StorageChunkMerger::mergeChunks(const Storages & chunks)
{
	typedef std::map<std::string, DataTypePtr> ColumnsMap;

	TableLocks table_locks;

	/// Don't let anyone drop these tables while we are reading from them.
	for (auto table : chunks)
	{
		table_locks.push_back(table->lockStructure(false));
	}

	/// Take the union of the column sets of the chunks being merged.
	ColumnsMap known_columns_types(columns->begin(), columns->end());
	NamesAndTypesListPtr required_columns = new NamesAndTypesList;
	*required_columns = *columns;

	for (size_t chunk_index = 0; chunk_index < chunks.size(); ++chunk_index)
	{
		const NamesAndTypesList & current_columns = chunks[chunk_index]->getColumnsList();

		for (NamesAndTypesList::const_iterator it = current_columns.begin(); it != current_columns.end(); ++it)
		{
			const std::string & name = it->first;
			const DataTypePtr & type = it->second;

			if (known_columns_types.count(name))
			{
				String current_type_name = type->getName();
				String known_type_name = known_columns_types[name]->getName();
				if (current_type_name != known_type_name)
					throw Exception("Different types of column " + name + " in different chunks: type " + current_type_name + " in chunk " + chunks[chunk_index]->getTableName() + ", type " + known_type_name + " somewhere else", ErrorCodes::TYPE_MISMATCH);
			}
			else
			{
				known_columns_types[name] = type;
				required_columns->push_back(*it);
			}
		}
	}
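
	/// Illustration of the union (hypothetical schemas): if chunk A has (x Int32) and chunk B
	/// has (x Int32, y String), required_columns becomes (x Int32, y String); rows copied from A
	/// will get y filled with defaults by AddingDefaultBlockInputStream below. If x had a
	/// different type in some chunk, the merge would fail with TYPE_MISMATCH instead.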

	std::string formatted_columns = formatColumnsForCreateQuery(*required_columns);

	std::string new_table_name = makeName(destination_name_prefix, chunks.front()->getTableName(), chunks.back()->getTableName());
	std::string new_table_full_name = backQuoteIfNeed(source_database) + "." + backQuoteIfNeed(new_table_name);
	StoragePtr new_storage_ptr;

	try
	{
		{
			Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());

			if (!context.getDatabases().count(source_database))
				throw Exception("Destination database " + source_database + " for table " + name + " doesn't exist", ErrorCodes::UNKNOWN_DATABASE);

			LOG_TRACE(log, "Will merge " << chunks.size() << " chunks: from " << chunks[0]->getTableName() << " to " << chunks.back()->getTableName() << " to new table " << new_table_name << ".");

			if (currently_written_groups.count(new_table_full_name))
			{
				LOG_WARNING(log, "Table " + new_table_full_name + " is already being written. Aborting merge.");
				return false;
			}

			currently_written_groups.insert(new_table_full_name);
		}

		/// Drop a Chunks table with this name if one exists. It could have been left over from an interrupted merge of the same group of chunks.
		executeQuery("DROP TABLE IF EXISTS " + new_table_full_name, context, true);

		/// Run a query to create the Chunks table.
		executeQuery("CREATE TABLE " + new_table_full_name + " " + formatted_columns + " ENGINE = Chunks", context, true);

		new_storage_ptr = context.getTable(source_database, new_table_name);
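
		/// For illustration, with the hypothetical names from the makeName example above,
		/// the generated DDL would look roughly like
		///   CREATE TABLE db.merged_chunk_20130201_7 (x Int32, y String) ENGINE = Chunks
		/// (the exact column list formatting comes from formatColumnsForCreateQuery).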

		/// Copy the data into the new table.
		StorageChunks & new_storage = dynamic_cast<StorageChunks &>(*new_storage_ptr);

		for (size_t chunk_index = 0; chunk_index < chunks.size(); ++chunk_index)
		{
			StoragePtr src_storage = chunks[chunk_index];
			BlockOutputStreamPtr output = new_storage.writeToNewChunk(src_storage->getTableName());

			const NamesAndTypesList & src_columns = src_storage->getColumnsList();
			Names src_column_names;

			ASTSelectQuery * select_query = new ASTSelectQuery;
			ASTPtr select_query_ptr = select_query;

			/// A query that extracts the needed columns.
			ASTPtr select_expression_list;
			ASTPtr database;
			ASTPtr table;	/// An identifier or a subquery (recursively, an ASTSelectQuery).
			select_query->database = newIdentifier(source_database, ASTIdentifier::Database);
			select_query->table = newIdentifier(src_storage->getTableName(), ASTIdentifier::Table);

			ASTExpressionList * select_list = new ASTExpressionList;
			select_query->select_expression_list = select_list;
			for (NamesAndTypesList::const_iterator it = src_columns.begin(); it != src_columns.end(); ++it)
			{
				src_column_names.push_back(it->first);
				select_list->children.push_back(newIdentifier(it->first, ASTIdentifier::Column));
			}

			QueryProcessingStage::Enum processed_stage = QueryProcessingStage::Complete;

			BlockInputStreams input_streams = src_storage->read(
				src_column_names,
				select_query_ptr,
				settings,
				processed_stage,
				DEFAULT_MERGE_BLOCK_SIZE);

			BlockInputStreamPtr input = new AddingDefaultBlockInputStream(new ConcatBlockInputStream(input_streams), required_columns);

			input->readPrefix();
			output->writePrefix();

			Block block;
			while (!shutdown_called && (block = input->read()))
				output->write(block);

			if (shutdown_called)
			{
				LOG_INFO(log, "Shutdown requested while merging chunks.");
				new_storage.removeReference();	/// After this, the temporary data will be removed.
				return false;
			}

			input->readSuffix();
			output->writeSuffix();
		}
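
		/// Each iteration above effectively streamed the result of
		///   SELECT <all source columns> FROM source_database.<src_table>
		/// into a fresh chunk of the new Chunks table, with columns absent in the source
		/// filled in by AddingDefaultBlockInputStream. The shutdown check inside the copy
		/// loop lets a server shutdown abort a long merge between blocks.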

		/// Atomically replace the source tables with references to the new one.
		/// We must not drop tables while holding the context mutex, so for now we only detach them.
		Storages tables_to_drop;

		{
			Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());

			/// If the database has been dropped in the meantime, do nothing.
			if (context.getDatabases().count(source_database))
			{
				for (size_t chunk_index = 0; chunk_index < chunks.size(); ++chunk_index)
				{
					StoragePtr src_storage = chunks[chunk_index];
					std::string src_name = src_storage->getTableName();

					/// If the table has been dropped in the meantime, do nothing.
					if (!context.isTableExist(source_database, src_name))
						continue;

					/// Detach the source table. Its data and metadata remain on disk.
					tables_to_drop.push_back(context.detachTable(source_database, src_name));

					/// Create a ChunkRef in its place. This is only possible because a ChunkRef has neither data nor metadata of its own.
					try
					{
						context.addTable(source_database, src_name, StorageChunkRef::create(src_name, context, source_database, new_table_name, false));
					}
					catch (...)
					{
						LOG_ERROR(log, "Chunk " + src_name + " was removed but not replaced. Its data is stored in table " << new_table_name << ". You may need to resolve this manually.");
						throw;
					}
				}
			}

			currently_written_groups.erase(new_table_full_name);
		}

		/// Now remove the data of the detached tables.
		table_locks.clear();
		for (StoragePtr table : tables_to_drop)
		{
			InterpreterDropQuery::dropDetachedTable(source_database, table, context);
			/// NOTE: If, between replacing the table and this line, someone manages to create a new table in its place,
			/// something may break.
		}

		/// new_storage is now referenced by the ChunkRef tables. Remove the extra reference that was held since its creation.
		new_storage.removeReference();

		LOG_TRACE(log, "Merged chunks.");

		return true;
	}
	catch (...)
	{
		Poco::ScopedLock<Poco::Mutex> lock(context.getMutex());
		currently_written_groups.erase(new_table_full_name);
		throw;
	}
}

}