2014-10-26 00:01:36 +00:00
# include <DB/Interpreters/InterpreterSelectQuery.h>
2014-10-27 04:18:13 +00:00
# include <DB/Interpreters/InterpreterInsertQuery.h>
2014-10-26 00:01:36 +00:00
# include <DB/Interpreters/InterpreterAlterQuery.h>
# include <DB/Storages/StorageBuffer.h>
2014-10-27 04:18:13 +00:00
# include <DB/Parsers/ASTInsertQuery.h>
2014-10-26 00:01:36 +00:00
# include <Poco/Ext/ThreadNumber.h>
2014-10-27 04:18:13 +00:00
# include <statdaemons/ext/range.hpp>
2014-10-26 00:01:36 +00:00
namespace DB
{
StoragePtr StorageBuffer : : create ( const std : : string & name_ , NamesAndTypesListPtr columns_ , Context & context_ ,
size_t num_shards_ , const Thresholds & min_thresholds_ , const Thresholds & max_thresholds_ ,
const String & destination_database_ , const String & destination_table_ )
{
return ( new StorageBuffer {
name_ , columns_ , context_ , num_shards_ , min_thresholds_ , max_thresholds_ , destination_database_ , destination_table_ } ) - > thisPtr ( ) ;
}
StorageBuffer : : StorageBuffer ( const std : : string & name_ , NamesAndTypesListPtr columns_ , Context & context_ ,
size_t num_shards_ , const Thresholds & min_thresholds_ , const Thresholds & max_thresholds_ ,
const String & destination_database_ , const String & destination_table_ )
: name ( name_ ) , columns ( columns_ ) , context ( context_ ) ,
num_shards ( num_shards_ ) , buffers ( num_shards_ ) ,
min_thresholds ( min_thresholds_ ) , max_thresholds ( max_thresholds_ ) ,
destination_database ( destination_database_ ) , destination_table ( destination_table_ ) ,
no_destination ( destination_database . empty ( ) & & destination_table . empty ( ) ) ,
log ( & Logger : : get ( " StorageBuffer ( " + name + " ) " ) ) ,
2014-12-03 13:01:00 +00:00
flush_thread ( & StorageBuffer : : flushThread , this )
2014-10-26 00:01:36 +00:00
{
}
/// Читает из одного буфера (из одного блока) под е г о mutex-ом.
class BufferBlockInputStream : public IProfilingBlockInputStream
{
public :
BufferBlockInputStream ( const Names & column_names_ , StorageBuffer : : Buffer & buffer_ )
: column_names ( column_names_ . begin ( ) , column_names_ . end ( ) ) , buffer ( buffer_ ) { }
String getName ( ) const { return " BufferBlockInputStream " ; }
String getID ( ) const
{
std : : stringstream res ;
res < < " Buffer( " < < & buffer ;
for ( const auto & name : column_names )
res < < " , " < < name ;
res < < " ) " ;
return res . str ( ) ;
}
protected :
Block readImpl ( )
{
2014-10-26 00:12:39 +00:00
Block res ;
if ( has_been_read )
return res ;
has_been_read = true ;
2014-10-26 00:01:36 +00:00
std : : lock_guard < std : : mutex > lock ( buffer . mutex ) ;
if ( ! buffer . data )
return res ;
for ( size_t i = 0 , size = buffer . data . columns ( ) ; i < size ; + + i )
{
auto & col = buffer . data . unsafeGetByPosition ( i ) ;
if ( column_names . count ( col . name ) )
res . insert ( col ) ;
}
return res ;
}
private :
NameSet column_names ;
StorageBuffer : : Buffer & buffer ;
2014-10-26 00:12:39 +00:00
bool has_been_read = false ;
2014-10-26 00:01:36 +00:00
} ;
BlockInputStreams StorageBuffer : : read (
const Names & column_names ,
ASTPtr query ,
2014-12-17 11:53:17 +00:00
const Context & context ,
2014-10-26 00:01:36 +00:00
const Settings & settings ,
QueryProcessingStage : : Enum & processed_stage ,
size_t max_block_size ,
unsigned threads )
{
processed_stage = QueryProcessingStage : : FetchColumns ;
BlockInputStreams streams_from_dst ;
if ( ! no_destination )
streams_from_dst = context . getTable ( destination_database , destination_table ) - > read (
2014-12-17 11:53:17 +00:00
column_names , query , context , settings , processed_stage , max_block_size , threads ) ;
2014-10-26 00:01:36 +00:00
BlockInputStreams streams_from_buffers ;
streams_from_buffers . reserve ( num_shards ) ;
for ( auto & buf : buffers )
streams_from_buffers . push_back ( new BufferBlockInputStream ( column_names , buf ) ) ;
/** Если источники из таблицы были обработаны до какой-то не начальной стадии выполнения запроса,
* т о т о г д а и с т о ч н и к и и з б у ф е р о в н а д о т о ж е о б е р н у т ь в к о н в е й е р о б р а б о т к и д о т о й ж е с т а д и и .
*/
if ( processed_stage > QueryProcessingStage : : FetchColumns )
for ( auto & stream : streams_from_buffers )
stream = InterpreterSelectQuery ( query , context , processed_stage , 0 , stream ) . execute ( ) ;
streams_from_dst . insert ( streams_from_dst . end ( ) , streams_from_buffers . begin ( ) , streams_from_buffers . end ( ) ) ;
return streams_from_dst ;
}
2014-10-27 04:18:13 +00:00
static void appendBlock ( const Block & from , Block & to )
{
size_t rows = from . rows ( ) ;
for ( size_t column_no = 0 , columns = to . columns ( ) ; column_no < columns ; + + column_no )
{
const IColumn & col_from = * from . getByPosition ( column_no ) . column . get ( ) ;
IColumn & col_to = * to . getByPosition ( column_no ) . column . get ( ) ;
if ( col_from . getName ( ) ! = col_to . getName ( ) )
throw Exception ( " Cannot append block to another: different type of columns at index " + toString ( column_no )
+ " . Block 1: " + from . dumpStructure ( ) + " . Block 2: " + to . dumpStructure ( ) , ErrorCodes : : BLOCKS_HAS_DIFFERENT_STRUCTURE ) ;
for ( size_t row_no = 0 ; row_no < rows ; + + row_no )
col_to . insertFrom ( col_from , row_no ) ;
}
}
2014-10-26 00:01:36 +00:00
class BufferBlockOutputStream : public IBlockOutputStream
{
public :
BufferBlockOutputStream ( StorageBuffer & storage_ ) : storage ( storage_ ) { }
void write ( const Block & block )
{
if ( ! block )
return ;
size_t rows = block . rowsInFirstColumn ( ) ;
if ( ! rows )
return ;
2014-10-27 04:18:13 +00:00
StoragePtr destination ;
if ( ! storage . no_destination )
{
destination = storage . context . tryGetTable ( storage . destination_database , storage . destination_table ) ;
/// Проверяем структуру таблицы.
try
{
destination - > check ( block , true ) ;
}
catch ( Exception & e )
{
e . addMessage ( " (when looking at destination table " + storage . destination_database + " . " + storage . destination_table + " ) " ) ;
throw ;
}
}
2014-10-26 00:01:36 +00:00
size_t bytes = block . bytes ( ) ;
/// Если блок уже превышает максимальные ограничения, то пишем минуя буфер.
if ( rows > storage . max_thresholds . rows | | bytes > storage . max_thresholds . bytes )
{
2014-10-27 04:18:13 +00:00
if ( ! storage . no_destination )
{
LOG_TRACE ( storage . log , " Writing block with " < < rows < < " rows, " < < bytes < < " bytes directly. " ) ;
storage . writeBlockToDestination ( block , destination ) ;
}
2014-10-26 00:01:36 +00:00
return ;
}
/// Распределяем нагрузку по шардам по номеру потока.
const auto start_shard_num = Poco : : ThreadNumber : : get ( ) % storage . num_shards ;
/// Перебираем буферы по кругу, пытаясь заблокировать mutex. Н е более одного круга.
auto shard_num = start_shard_num ;
size_t try_no = 0 ;
for ( ; try_no ! = storage . num_shards ; + + try_no )
{
std : : unique_lock < std : : mutex > lock ( storage . buffers [ shard_num ] . mutex , std : : try_to_lock_t ( ) ) ;
if ( lock . owns_lock ( ) )
{
insertIntoBuffer ( block , storage . buffers [ shard_num ] , std : : move ( lock ) ) ;
break ;
}
+ + shard_num ;
if ( shard_num = = storage . num_shards )
shard_num = 0 ;
}
/// Если так и не удалось ничего сразу заблокировать, то будем ждать на mutex-е .
if ( try_no = = storage . num_shards )
insertIntoBuffer ( block , storage . buffers [ start_shard_num ] , std : : unique_lock < std : : mutex > ( storage . buffers [ start_shard_num ] . mutex ) ) ;
}
private :
StorageBuffer & storage ;
void insertIntoBuffer ( const Block & block , StorageBuffer : : Buffer & buffer , std : : unique_lock < std : : mutex > & & lock )
{
if ( ! buffer . data )
{
buffer . first_write_time = time ( 0 ) ;
buffer . data = block . cloneEmpty ( ) ;
}
/// Если после вставки в буфер, ограничения будут превышены, то будем сбрасывать буфер.
if ( storage . checkThresholds ( buffer , time ( 0 ) , block . rowsInFirstColumn ( ) , block . bytes ( ) ) )
{
/// Вытащим из буфера блок, заменим буфер на пустой. После этого можно разблокировать mutex.
Block block_to_write ;
buffer . data . swap ( block_to_write ) ;
buffer . first_write_time = 0 ;
lock . unlock ( ) ;
2014-10-27 04:18:13 +00:00
if ( ! storage . no_destination )
{
appendBlock ( block , block_to_write ) ;
storage . writeBlockToDestination ( block_to_write ,
storage . context . tryGetTable ( storage . destination_database , storage . destination_table ) ) ;
}
2014-10-26 00:01:36 +00:00
}
else
appendBlock ( block , buffer . data ) ;
}
} ;
BlockOutputStreamPtr StorageBuffer : : write ( ASTPtr query )
{
return new BufferBlockOutputStream ( * this ) ;
}
void StorageBuffer : : shutdown ( )
{
shutdown_event . set ( ) ;
if ( flush_thread . joinable ( ) )
flush_thread . join ( ) ;
2014-10-26 00:21:06 +00:00
optimize ( ) ;
}
bool StorageBuffer : : optimize ( )
{
2014-12-03 13:28:17 +00:00
flushAllBuffers ( false ) ;
2014-10-26 00:21:06 +00:00
return true ;
2014-10-26 00:01:36 +00:00
}
bool StorageBuffer : : checkThresholds ( Buffer & buffer , time_t current_time , size_t additional_rows , size_t additional_bytes )
{
time_t time_passed = 0 ;
if ( buffer . first_write_time )
time_passed = current_time - buffer . first_write_time ;
size_t rows = buffer . data . rowsInFirstColumn ( ) + additional_rows ;
size_t bytes = buffer . data . bytes ( ) + additional_bytes ;
bool res =
( time_passed > min_thresholds . time & & rows > min_thresholds . rows & & bytes > min_thresholds . bytes )
| | ( time_passed > max_thresholds . time | | rows > max_thresholds . rows | | bytes > max_thresholds . bytes ) ;
if ( res )
LOG_TRACE ( log , " Flushing buffer with " < < rows < < " rows, " < < bytes < < " bytes, age " < < time_passed < < " seconds. " ) ;
return res ;
}
2014-12-03 13:28:17 +00:00
void StorageBuffer : : flushAllBuffers ( const bool check_thresholds )
{
for ( auto & buf : buffers )
flushBuffer ( buf , check_thresholds ) ;
}
2014-10-26 00:01:36 +00:00
void StorageBuffer : : flushBuffer ( Buffer & buffer , bool check_thresholds )
{
Block block_to_write ;
time_t current_time = check_thresholds ? time ( 0 ) : 0 ;
2014-10-27 04:18:13 +00:00
/** Довольно много проблем из-за того, что хотим блокировать буфер лишь на короткое время.
* П о д б л о к и р о в к о й , п о л у ч а е м и з б у ф е р а б л о к , и з а м е н я е м в н ё м б л о к н а н о в ы й п у с т о й .
* З а т е м п ы т а е м с я з а п и с а т ь п о л у ч е н н ы й б л о к в п о д ч и н ё н н у ю т а б л и ц у .
* Е с л и э т о г о н е п о л у ч и л о с ь - к л а д ё м д а н н ы е о б р а т н о в б у ф е р .
* З а м е ч а н и е : м о ж е т б ы т ь , с т о и т и з б а в и т ь с я о т т а к о й с л о ж н о с т и .
*/
2014-10-26 00:01:36 +00:00
{
std : : lock_guard < std : : mutex > lock ( buffer . mutex ) ;
if ( check_thresholds & & ! checkThresholds ( buffer , current_time ) )
return ;
buffer . data . swap ( block_to_write ) ;
buffer . first_write_time = 0 ;
}
2014-10-27 04:18:13 +00:00
if ( no_destination )
return ;
try
2014-10-26 00:01:36 +00:00
{
2014-10-27 04:18:13 +00:00
writeBlockToDestination ( block_to_write , context . tryGetTable ( destination_database , destination_table ) ) ;
}
catch ( . . . )
{
/// Возвращаем блок на место в буфер.
std : : lock_guard < std : : mutex > lock ( buffer . mutex ) ;
if ( buffer . data )
{
/** Так как структура таблицы не изменилась, можно склеить два блока.
* З а м е ч а н и е : о с т а ё т с я п р о б л е м а - и з - з а т о г о , ч т о в р а з н ы х п о п ы т к а х в с т а в л я ю т с я р а з н ы е б л о к и ,
* т е р я е т с я и д е м п о т е н т н о с т ь в с т а в к и в ReplicatedMergeTree .
*/
appendBlock ( block_to_write , buffer . data ) ;
buffer . data . swap ( block_to_write ) ;
}
if ( ! buffer . first_write_time )
buffer . first_write_time = current_time ;
/// Через некоторое время будет следующая попытка записать.
throw ;
}
}
2014-10-26 00:01:36 +00:00
2014-10-27 04:18:13 +00:00
void StorageBuffer : : writeBlockToDestination ( const Block & block , StoragePtr table )
{
if ( no_destination | | ! block )
2014-10-26 00:01:36 +00:00
return ;
2014-10-27 04:18:13 +00:00
if ( ! table )
{
LOG_ERROR ( log , " Destination table " < < destination_database < < " . " < < destination_table < < " doesn't exist. Block of data is discarded. " ) ;
return ;
}
ASTInsertQuery * insert = new ASTInsertQuery ;
ASTPtr ast_ptr = insert ;
insert - > database = destination_database ;
insert - > table = destination_table ;
/** Будем вставлять столбцы, являющиеся пересечением множества столбцов таблицы-буфера и подчинённой таблицы.
* Э т о п о з в о л и т п о д д е р ж а т ь ч а с т ь с л у ч а е в ( н о н е в с е ) , к о г д а с т р у к т у р а т а б л и ц ы н е с о в п а д а е т .
*/
Block structure_of_destination_table = table - > getSampleBlock ( ) ;
Names columns_intersection ;
columns_intersection . reserve ( block . columns ( ) ) ;
for ( size_t i : ext : : range ( 0 , structure_of_destination_table . columns ( ) ) )
{
auto dst_col = structure_of_destination_table . unsafeGetByPosition ( i ) ;
if ( block . has ( dst_col . name ) )
{
if ( block . getByName ( dst_col . name ) . type - > getName ( ) ! = dst_col . type - > getName ( ) )
{
LOG_ERROR ( log , " Destination table " < < destination_database < < " . " < < destination_table
< < " have different type of column " < < dst_col . name < < " . Block of data is discarded. " ) ;
return ;
}
columns_intersection . push_back ( dst_col . name ) ;
}
2014-10-26 00:01:36 +00:00
}
2014-10-27 04:18:13 +00:00
if ( columns_intersection . empty ( ) )
2014-10-26 00:01:36 +00:00
{
2014-10-27 04:18:13 +00:00
LOG_ERROR ( log , " Destination table " < < destination_database < < " . " < < destination_table < < " have no common columns with block in buffer. Block of data is discarded. " ) ;
return ;
2014-10-26 00:01:36 +00:00
}
2014-10-27 04:18:13 +00:00
if ( columns_intersection . size ( ) ! = block . columns ( ) )
LOG_WARNING ( log , " Not all columns from block in buffer exist in destination table "
< < destination_database < < " . " < < destination_table < < " . Some columns are discarded. " ) ;
ASTExpressionList * list_of_columns = new ASTExpressionList ;
insert - > columns = list_of_columns ;
list_of_columns - > children . reserve ( columns_intersection . size ( ) ) ;
for ( const String & column : columns_intersection )
list_of_columns - > children . push_back ( new ASTIdentifier ( StringRange ( ) , column , ASTIdentifier : : Column ) ) ;
InterpreterInsertQuery interpreter { ast_ptr , context } ;
auto block_io = interpreter . execute ( ) ;
block_io . out - > writePrefix ( ) ;
block_io . out - > write ( block ) ;
block_io . out - > writeSuffix ( ) ;
2014-10-26 00:01:36 +00:00
}
void StorageBuffer : : flushThread ( )
{
do
{
try
{
2014-12-03 13:28:17 +00:00
flushAllBuffers ( true ) ;
2014-10-26 00:01:36 +00:00
}
catch ( . . . )
{
tryLogCurrentException ( __PRETTY_FUNCTION__ ) ;
}
} while ( ! shutdown_event . tryWait ( 1000 ) ) ;
}
void StorageBuffer : : alter ( const AlterCommands & params , const String & database_name , const String & table_name , Context & context )
{
auto lock = lockStructureForAlter ( ) ;
2014-10-27 04:18:13 +00:00
/// Чтобы не осталось блоков старой структуры.
optimize ( ) ;
2014-11-12 10:37:47 +00:00
params . apply ( * columns , materialized_columns , alias_columns , column_defaults ) ;
InterpreterAlterQuery : : updateMetadata ( database_name , table_name ,
* columns , materialized_columns , alias_columns , column_defaults , context ) ;
2014-10-26 00:01:36 +00:00
}
}