2014-10-26 00:01:36 +00:00
# include <DB/Interpreters/InterpreterSelectQuery.h>
2014-10-27 04:18:13 +00:00
# include <DB/Interpreters/InterpreterInsertQuery.h>
2014-10-26 00:01:36 +00:00
# include <DB/Interpreters/InterpreterAlterQuery.h>
2015-01-18 08:25:56 +00:00
# include <DB/DataStreams/IProfilingBlockInputStream.h>
2016-05-13 21:08:19 +00:00
# include <DB/Databases/IDatabase.h>
2014-10-26 00:01:36 +00:00
# include <DB/Storages/StorageBuffer.h>
2014-10-27 04:18:13 +00:00
# include <DB/Parsers/ASTInsertQuery.h>
2015-11-08 00:28:12 +00:00
# include <DB/Parsers/ASTIdentifier.h>
2016-11-20 12:43:20 +00:00
# include <DB/Parsers/ASTExpressionList.h>
2015-09-24 18:54:21 +00:00
# include <DB/Common/setThreadName.h>
2016-12-19 23:55:13 +00:00
# include <DB/Common/CurrentMetrics.h>
2017-01-21 04:24:28 +00:00
# include <common/logger_useful.h>
2014-10-26 00:01:36 +00:00
# include <Poco/Ext/ThreadNumber.h>
2015-10-05 00:33:43 +00:00
# include <ext/range.hpp>
2014-10-27 04:18:13 +00:00
2014-10-26 00:01:36 +00:00
2016-12-19 23:55:13 +00:00
/// Event counters incremented by this file. They are only declared here (extern);
/// the definitions live elsewhere.
namespace ProfileEvents
{
    /// Flush bookkeeping: one per performed flush, one per flush that failed on write.
    extern const Event StorageBufferFlush;
    extern const Event StorageBufferErrorOnFlush;

    /// One counter per threshold rule that can report "thresholds exceeded".
    extern const Event StorageBufferPassedAllMinThresholds;
    extern const Event StorageBufferPassedTimeMaxThreshold;
    extern const Event StorageBufferPassedRowsMaxThreshold;
    extern const Event StorageBufferPassedBytesMaxThreshold;
}
/// Gauges adjusted by this file when rows/bytes are appended to or taken out of the buffers.
/// Declared here (extern); defined elsewhere.
namespace CurrentMetrics
{
    extern const Metric StorageBufferRows;
    extern const Metric StorageBufferBytes;
}
2014-10-26 00:01:36 +00:00
namespace DB
{
2016-01-11 21:46:36 +00:00
/// Error codes thrown from this file. Every code used below is declared explicitly,
/// so the file does not depend on another header happening to declare them.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
    extern const int INFINITE_LOOP;
    extern const int BLOCKS_HAS_DIFFERENT_STRUCTURE;
}
2014-10-26 00:01:36 +00:00
2015-06-09 19:43:06 +00:00
/** Factory for Buffer tables.
  * Passes all arguments, unchanged and in the same order, to make_shared
  * (declared outside this file; presumably it invokes the StorageBuffer constructor - TODO confirm).
  */
StoragePtr StorageBuffer::create(
    const std::string & name_,
    NamesAndTypesListPtr columns_,
    const NamesAndTypesList & materialized_columns_,
    const NamesAndTypesList & alias_columns_,
    const ColumnDefaults & column_defaults_,
    Context & context_,
    size_t num_shards_,
    const Thresholds & min_thresholds_,
    const Thresholds & max_thresholds_,
    const String & destination_database_,
    const String & destination_table_)
{
    return make_shared(
        name_,
        columns_,
        materialized_columns_,
        alias_columns_,
        column_defaults_,
        context_,
        num_shards_,
        min_thresholds_,
        max_thresholds_,
        destination_database_,
        destination_table_);
}
2015-06-09 19:43:06 +00:00
/** Stores the table parameters, creates num_shards_ buffers and starts the background flush thread.
  * The table has "no destination" only when both the destination database and table names are empty.
  * The flush thread is the last entry of the initializer list and runs StorageBuffer::flushThread on this object.
  */
StorageBuffer::StorageBuffer(
    const std::string & name_,
    NamesAndTypesListPtr columns_,
    const NamesAndTypesList & materialized_columns_,
    const NamesAndTypesList & alias_columns_,
    const ColumnDefaults & column_defaults_,
    Context & context_,
    size_t num_shards_,
    const Thresholds & min_thresholds_,
    const Thresholds & max_thresholds_,
    const String & destination_database_,
    const String & destination_table_)
    : IStorage{materialized_columns_, alias_columns_, column_defaults_}
    , name(name_)
    , columns(columns_)
    , context(context_)
    , num_shards(num_shards_)
    , buffers(num_shards_)
    , min_thresholds(min_thresholds_)
    , max_thresholds(max_thresholds_)
    , destination_database(destination_database_)
    , destination_table(destination_table_)
    , no_destination(destination_database.empty() && destination_table.empty())
    , log(&Logger::get(" StorageBuffer ( " + name + " ) "))
    , flush_thread(&StorageBuffer::flushThread, this)
{
}
2017-03-12 19:18:07 +00:00
/// Reads from one buffer (from one block) under its mutex.
2014-10-26 00:01:36 +00:00
class BufferBlockInputStream : public IProfilingBlockInputStream
{
public :
BufferBlockInputStream ( const Names & column_names_ , StorageBuffer : : Buffer & buffer_ )
: column_names ( column_names_ . begin ( ) , column_names_ . end ( ) ) , buffer ( buffer_ ) { }
2015-06-08 20:22:02 +00:00
String getName ( ) const { return " Buffer " ; }
2014-10-26 00:01:36 +00:00
String getID ( ) const
{
std : : stringstream res ;
res < < " Buffer( " < < & buffer ;
for ( const auto & name : column_names )
res < < " , " < < name ;
res < < " ) " ;
return res . str ( ) ;
}
protected :
Block readImpl ( )
{
2014-10-26 00:12:39 +00:00
Block res ;
if ( has_been_read )
return res ;
has_been_read = true ;
2014-10-26 00:01:36 +00:00
std : : lock_guard < std : : mutex > lock ( buffer . mutex ) ;
2017-01-02 20:52:30 +00:00
if ( ! buffer . data . rows ( ) )
2014-10-26 00:01:36 +00:00
return res ;
2015-02-13 20:37:30 +00:00
for ( const auto & name : column_names )
2014-10-26 00:01:36 +00:00
{
2015-02-13 20:37:30 +00:00
auto & col = buffer . data . getByName ( name ) ;
2015-07-17 01:27:35 +00:00
res . insert ( ColumnWithTypeAndName ( col . column - > clone ( ) , col . type , name ) ) ;
2014-10-26 00:01:36 +00:00
}
return res ;
}
private :
NameSet column_names ;
StorageBuffer : : Buffer & buffer ;
2014-10-26 00:12:39 +00:00
bool has_been_read = false ;
2014-10-26 00:01:36 +00:00
} ;
/** Returns the streams of the destination table (if one is configured) followed by one stream per buffer.
  *
  * processed_stage is first set to FetchColumns; the destination's read() receives it by reference
  * and may change it. If it ends up beyond FetchColumns, every buffer stream is wrapped
  * into an InterpreterSelectQuery pipeline up to that same stage.
  *
  * Throws INFINITE_LOOP if the destination table is this very table.
  */
BlockInputStreams StorageBuffer::read(
    const Names & column_names,
    ASTPtr query,
    const Context & context,
    const Settings & settings,
    QueryProcessingStage::Enum & processed_stage,
    size_t max_block_size,
    unsigned threads)
{
    processed_stage = QueryProcessingStage::FetchColumns;

    BlockInputStreams result;

    if (!no_destination)
    {
        auto destination = context.getTable(destination_database, destination_table);

        if (destination.get() == this)
            throw Exception(" Destination table is myself. Read will cause infinite loop. ", ErrorCodes::INFINITE_LOOP);

        /// The "move to PREWHERE" optimization is switched off, because Buffer does not support PREWHERE.
        Settings settings_without_prewhere = settings;
        settings_without_prewhere.optimize_move_to_prewhere = false;

        result = destination->read(
            column_names, query, context, settings_without_prewhere, processed_stage, max_block_size, threads);
    }

    /// One stream per shard buffer.
    BlockInputStreams buffer_streams;
    buffer_streams.reserve(num_shards);
    for (auto & shard : buffers)
        buffer_streams.push_back(std::make_shared<BufferBlockInputStream>(column_names, shard));

    /// Bring the buffer streams to the same processing stage as the destination's streams.
    const bool stage_is_beyond_fetch = processed_stage > QueryProcessingStage::FetchColumns;
    if (stage_is_beyond_fetch)
    {
        for (auto & stream : buffer_streams)
            stream = InterpreterSelectQuery(query, context, processed_stage, 0, stream).execute().in;
    }

    /// Destination streams come first, buffer streams are appended after them.
    for (const auto & stream : buffer_streams)
        result.push_back(stream);

    return result;
}
2014-10-27 04:18:13 +00:00
static void appendBlock ( const Block & from , Block & to )
{
2015-12-09 06:04:00 +00:00
if ( ! to )
throw Exception ( " Cannot append to empty block " , ErrorCodes : : LOGICAL_ERROR ) ;
2017-01-02 20:42:49 +00:00
from . checkNumberOfRows ( ) ;
to . checkNumberOfRows ( ) ;
2014-10-27 04:18:13 +00:00
size_t rows = from . rows ( ) ;
2016-12-19 23:55:13 +00:00
size_t bytes = from . bytes ( ) ;
CurrentMetrics : : add ( CurrentMetrics : : StorageBufferRows , rows ) ;
CurrentMetrics : : add ( CurrentMetrics : : StorageBufferBytes , bytes ) ;
2016-12-21 17:10:32 +00:00
size_t old_rows = to . rows ( ) ;
try
2014-10-27 04:18:13 +00:00
{
2016-12-21 17:10:32 +00:00
for ( size_t column_no = 0 , columns = to . columns ( ) ; column_no < columns ; + + column_no )
{
2017-01-02 20:12:12 +00:00
const IColumn & col_from = * from . safeGetByPosition ( column_no ) . column . get ( ) ;
IColumn & col_to = * to . safeGetByPosition ( column_no ) . column . get ( ) ;
2014-10-27 04:18:13 +00:00
2016-12-21 17:10:32 +00:00
if ( col_from . getName ( ) ! = col_to . getName ( ) )
throw Exception ( " Cannot append block to another: different type of columns at index " + toString ( column_no )
+ " . Block 1: " + from . dumpStructure ( ) + " . Block 2: " + to . dumpStructure ( ) , ErrorCodes : : BLOCKS_HAS_DIFFERENT_STRUCTURE ) ;
col_to . insertRangeFrom ( col_from , 0 , rows ) ;
}
}
catch ( . . . )
{
/// Rollback changes.
2016-12-22 00:13:59 +00:00
try
{
2017-03-13 20:38:42 +00:00
/// Avoid "memory limit exceeded" exceptions during rollback.
TemporarilyDisableMemoryTracker temporarily_disable_memory_tracker ;
2016-12-22 00:13:59 +00:00
for ( size_t column_no = 0 , columns = to . columns ( ) ; column_no < columns ; + + column_no )
{
2017-01-02 20:12:12 +00:00
ColumnPtr & col_to = to . safeGetByPosition ( column_no ) . column ;
2016-12-22 00:13:59 +00:00
if ( col_to - > size ( ) ! = old_rows )
col_to = col_to - > cut ( 0 , old_rows ) ;
}
}
catch ( . . . )
2016-12-21 17:10:32 +00:00
{
2016-12-22 00:13:59 +00:00
/// In case when we cannot rollback, do not leave incorrect state in memory.
std : : terminate ( ) ;
2016-12-21 17:10:32 +00:00
}
2014-10-27 04:18:13 +00:00
2016-12-21 17:10:32 +00:00
throw ;
2014-10-27 04:18:13 +00:00
}
}
2014-10-26 00:01:36 +00:00
class BufferBlockOutputStream : public IBlockOutputStream
{
public :
BufferBlockOutputStream ( StorageBuffer & storage_ ) : storage ( storage_ ) { }
2016-12-19 23:55:13 +00:00
void write ( const Block & block ) override
2014-10-26 00:01:36 +00:00
{
if ( ! block )
return ;
2017-01-02 20:52:30 +00:00
size_t rows = block . rows ( ) ;
2014-10-26 00:01:36 +00:00
if ( ! rows )
return ;
2014-10-27 04:18:13 +00:00
StoragePtr destination ;
if ( ! storage . no_destination )
{
destination = storage . context . tryGetTable ( storage . destination_database , storage . destination_table ) ;
2015-05-18 20:28:40 +00:00
if ( destination )
2014-10-27 04:18:13 +00:00
{
2015-05-18 20:28:40 +00:00
if ( destination . get ( ) = = & storage )
throw Exception ( " Destination table is myself. Write will cause infinite loop. " , ErrorCodes : : INFINITE_LOOP ) ;
2017-03-13 18:01:46 +00:00
/// Check table structure.
2015-05-18 20:28:40 +00:00
try
{
destination - > check ( block , true ) ;
}
catch ( Exception & e )
{
e . addMessage ( " (when looking at destination table " + storage . destination_database + " . " + storage . destination_table + " ) " ) ;
throw ;
}
2014-10-27 04:18:13 +00:00
}
}
2014-10-26 00:01:36 +00:00
size_t bytes = block . bytes ( ) ;
2017-03-13 18:01:46 +00:00
/// If the block already exceeds the maximum limit, then we skip the buffer.
2014-10-26 00:01:36 +00:00
if ( rows > storage . max_thresholds . rows | | bytes > storage . max_thresholds . bytes )
{
2014-10-27 04:18:13 +00:00
if ( ! storage . no_destination )
{
LOG_TRACE ( storage . log , " Writing block with " < < rows < < " rows, " < < bytes < < " bytes directly. " ) ;
storage . writeBlockToDestination ( block , destination ) ;
}
2014-10-26 00:01:36 +00:00
return ;
}
2017-03-13 18:01:46 +00:00
/// We distribute the load on the shards by the stream number.
2014-10-26 00:01:36 +00:00
const auto start_shard_num = Poco : : ThreadNumber : : get ( ) % storage . num_shards ;
2017-03-13 18:01:46 +00:00
/// We loop through the buffers, trying to lock mutex. No more than one lap.
2014-10-26 00:01:36 +00:00
auto shard_num = start_shard_num ;
size_t try_no = 0 ;
for ( ; try_no ! = storage . num_shards ; + + try_no )
{
std : : unique_lock < std : : mutex > lock ( storage . buffers [ shard_num ] . mutex , std : : try_to_lock_t ( ) ) ;
if ( lock . owns_lock ( ) )
{
insertIntoBuffer ( block , storage . buffers [ shard_num ] , std : : move ( lock ) ) ;
break ;
}
+ + shard_num ;
if ( shard_num = = storage . num_shards )
shard_num = 0 ;
}
2017-03-13 18:01:46 +00:00
/// If you still can not lock anything at once, then we'll wait on mutex.
2014-10-26 00:01:36 +00:00
if ( try_no = = storage . num_shards )
insertIntoBuffer ( block , storage . buffers [ start_shard_num ] , std : : unique_lock < std : : mutex > ( storage . buffers [ start_shard_num ] . mutex ) ) ;
}
private :
StorageBuffer & storage ;
void insertIntoBuffer ( const Block & block , StorageBuffer : : Buffer & buffer , std : : unique_lock < std : : mutex > & & lock )
{
2015-12-09 06:55:49 +00:00
time_t current_time = time ( 0 ) ;
2017-03-13 18:01:46 +00:00
/// Sort the columns in the block. This is necessary to make it easier to concatenate the blocks later.
2015-02-13 20:37:30 +00:00
Block sorted_block = block . sortColumns ( ) ;
2014-10-26 00:01:36 +00:00
if ( ! buffer . data )
{
2015-02-13 20:37:30 +00:00
buffer . data = sorted_block . cloneEmpty ( ) ;
2014-10-26 00:01:36 +00:00
}
2017-01-02 20:52:30 +00:00
else if ( storage . checkThresholds ( buffer , current_time , sorted_block . rows ( ) , sorted_block . bytes ( ) ) )
2014-10-26 00:01:36 +00:00
{
2017-03-13 18:01:46 +00:00
/** If, after inserting the buffer, the constraints are exceeded, then we will reset the buffer.
* This also protects against unlimited consumption of RAM , since if it is impossible to write to the table ,
* an exception will be thrown , and new data will not be added to the buffer .
2015-12-09 06:04:00 +00:00
*/
2014-10-26 00:01:36 +00:00
lock . unlock ( ) ;
2016-12-19 23:55:13 +00:00
storage . flushBuffer ( buffer , true ) ;
2015-09-01 21:48:38 +00:00
lock . lock ( ) ;
2014-10-26 00:01:36 +00:00
}
2015-09-01 21:48:38 +00:00
2015-12-09 06:10:13 +00:00
if ( ! buffer . first_write_time )
2015-12-09 06:55:49 +00:00
buffer . first_write_time = current_time ;
2015-12-09 06:10:13 +00:00
2015-09-01 21:48:38 +00:00
appendBlock ( sorted_block , buffer . data ) ;
2014-10-26 00:01:36 +00:00
}
} ;
2015-09-10 20:43:42 +00:00
/// Creates an output stream that writes into this table. Neither `query` nor `settings` is used.
BlockOutputStreamPtr StorageBuffer::write(ASTPtr query, const Settings & settings)
{
    StorageBuffer & self = *this;
    return std::make_shared<BufferBlockOutputStream>(self);
}
void StorageBuffer : : shutdown ( )
{
shutdown_event . set ( ) ;
if ( flush_thread . joinable ( ) )
flush_thread . join ( ) ;
2015-09-02 17:47:29 +00:00
try
{
2016-05-16 18:43:38 +00:00
optimize ( { } , { } , context . getSettings ( ) ) ;
2015-09-02 17:47:29 +00:00
}
catch ( . . . )
{
tryLogCurrentException ( __PRETTY_FUNCTION__ ) ;
}
2014-10-26 00:21:06 +00:00
}
2016-11-18 09:02:49 +00:00
/** NOTE If you do OPTIMIZE after insertion,
* it does not guarantee , that all data will be in destination table at the time of next SELECT just after OPTIMIZE .
*
* Because in case if there was already running flushBuffer method ,
* then call to flushBuffer inside OPTIMIZE will see empty buffer and return quickly ,
* but at the same time , the already running flushBuffer method possibly is not finished ,
* so next SELECT will observe missing data .
*
* This kind of race condition make very hard to implement proper tests .
*/
2016-05-16 18:43:38 +00:00
bool StorageBuffer : : optimize ( const String & partition , bool final , const Settings & settings )
2014-10-26 00:21:06 +00:00
{
2016-05-16 18:43:38 +00:00
if ( ! partition . empty ( ) )
throw Exception ( " Partition cannot be specified when optimizing table of type Buffer " , ErrorCodes : : NOT_IMPLEMENTED ) ;
if ( final )
throw Exception ( " FINAL cannot be specified when optimizing table of type Buffer " , ErrorCodes : : NOT_IMPLEMENTED ) ;
2014-10-26 00:21:06 +00:00
2016-05-16 18:43:38 +00:00
flushAllBuffers ( false ) ;
2014-10-26 00:21:06 +00:00
return true ;
2014-10-26 00:01:36 +00:00
}
2015-12-09 06:55:49 +00:00
bool StorageBuffer : : checkThresholds ( const Buffer & buffer , time_t current_time , size_t additional_rows , size_t additional_bytes ) const
2014-10-26 00:01:36 +00:00
{
time_t time_passed = 0 ;
if ( buffer . first_write_time )
time_passed = current_time - buffer . first_write_time ;
2017-01-02 20:52:30 +00:00
size_t rows = buffer . data . rows ( ) + additional_rows ;
2014-10-26 00:01:36 +00:00
size_t bytes = buffer . data . bytes ( ) + additional_bytes ;
2015-12-09 06:55:49 +00:00
return checkThresholdsImpl ( rows , bytes , time_passed ) ;
}
bool StorageBuffer : : checkThresholdsImpl ( size_t rows , size_t bytes , time_t time_passed ) const
{
2017-03-05 01:23:10 +00:00
if ( time_passed > min_thresholds . time & & rows > min_thresholds . rows & & bytes > min_thresholds . bytes )
{
ProfileEvents : : increment ( ProfileEvents : : StorageBufferPassedAllMinThresholds ) ;
return true ;
}
if ( time_passed > max_thresholds . time )
{
ProfileEvents : : increment ( ProfileEvents : : StorageBufferPassedTimeMaxThreshold ) ;
return true ;
}
if ( rows > max_thresholds . rows )
{
ProfileEvents : : increment ( ProfileEvents : : StorageBufferPassedRowsMaxThreshold ) ;
return true ;
}
if ( bytes > max_thresholds . bytes )
{
ProfileEvents : : increment ( ProfileEvents : : StorageBufferPassedBytesMaxThreshold ) ;
return true ;
}
return false ;
2014-10-26 00:01:36 +00:00
}
2014-12-03 13:28:17 +00:00
void StorageBuffer : : flushAllBuffers ( const bool check_thresholds )
{
for ( auto & buf : buffers )
flushBuffer ( buf , check_thresholds ) ;
}
2014-10-26 00:01:36 +00:00
void StorageBuffer : : flushBuffer ( Buffer & buffer , bool check_thresholds )
{
2015-12-11 02:19:32 +00:00
Block block_to_write ;
2015-12-09 06:55:49 +00:00
time_t current_time = time ( 0 ) ;
2014-10-26 00:01:36 +00:00
2015-12-09 06:16:24 +00:00
size_t rows = 0 ;
size_t bytes = 0 ;
time_t time_passed = 0 ;
2014-10-26 00:01:36 +00:00
{
std : : lock_guard < std : : mutex > lock ( buffer . mutex ) ;
2015-12-11 02:19:32 +00:00
block_to_write = buffer . data . cloneEmpty ( ) ;
2017-01-02 20:52:30 +00:00
rows = buffer . data . rows ( ) ;
2015-12-09 06:16:24 +00:00
bytes = buffer . data . bytes ( ) ;
if ( buffer . first_write_time )
time_passed = current_time - buffer . first_write_time ;
2015-09-01 21:48:38 +00:00
if ( check_thresholds )
{
2015-12-09 06:55:49 +00:00
if ( ! checkThresholdsImpl ( rows , bytes , time_passed ) )
2015-09-01 21:48:38 +00:00
return ;
}
else
{
2015-12-09 06:16:24 +00:00
if ( rows = = 0 )
2015-09-01 21:48:38 +00:00
return ;
}
2014-10-26 00:01:36 +00:00
buffer . data . swap ( block_to_write ) ;
buffer . first_write_time = 0 ;
2016-12-19 23:55:13 +00:00
CurrentMetrics : : sub ( CurrentMetrics : : StorageBufferRows , block_to_write . rows ( ) ) ;
CurrentMetrics : : sub ( CurrentMetrics : : StorageBufferBytes , block_to_write . bytes ( ) ) ;
2015-12-09 06:16:24 +00:00
2017-03-05 01:23:10 +00:00
ProfileEvents : : increment ( ProfileEvents : : StorageBufferFlush ) ;
2016-12-19 23:55:13 +00:00
LOG_TRACE ( log , " Flushing buffer with " < < rows < < " rows, " < < bytes < < " bytes, age " < < time_passed < < " seconds. " ) ;
2014-10-27 04:18:13 +00:00
2016-12-19 23:55:13 +00:00
if ( no_destination )
return ;
2014-10-27 04:18:13 +00:00
2016-12-19 23:55:13 +00:00
/** For simplicity, buffer is locked during write.
* We could unlock buffer temporary , but it would lead to too much difficulties :
* - data , that is written , will not be visible for SELECTs ;
* - new data could be appended to buffer , and in case of exception , we must merge it with old data , that has not been written ;
* - this could lead to infinite memory growth .
*/
try
2014-10-27 04:18:13 +00:00
{
2016-12-19 23:55:13 +00:00
writeBlockToDestination ( block_to_write , context . tryGetTable ( destination_database , destination_table ) ) ;
2014-10-27 04:18:13 +00:00
}
2016-12-19 23:55:13 +00:00
catch ( . . . )
{
ProfileEvents : : increment ( ProfileEvents : : StorageBufferErrorOnFlush ) ;
2014-10-27 04:18:13 +00:00
2017-03-12 19:18:07 +00:00
/// Return the block to its place in the buffer.
2015-09-01 21:48:38 +00:00
2016-12-19 23:55:13 +00:00
CurrentMetrics : : add ( CurrentMetrics : : StorageBufferRows , block_to_write . rows ( ) ) ;
CurrentMetrics : : add ( CurrentMetrics : : StorageBufferBytes , block_to_write . bytes ( ) ) ;
2014-10-27 04:18:13 +00:00
2016-12-19 23:55:13 +00:00
buffer . data . swap ( block_to_write ) ;
if ( ! buffer . first_write_time )
buffer . first_write_time = current_time ;
2017-03-13 18:01:46 +00:00
/// After a while, the next write attempt will happen.
2016-12-19 23:55:13 +00:00
throw ;
}
2014-10-27 04:18:13 +00:00
}
}
2014-10-26 00:01:36 +00:00
2014-10-27 04:18:13 +00:00
void StorageBuffer : : writeBlockToDestination ( const Block & block , StoragePtr table )
{
if ( no_destination | | ! block )
2014-10-26 00:01:36 +00:00
return ;
2014-10-27 04:18:13 +00:00
if ( ! table )
{
LOG_ERROR ( log , " Destination table " < < destination_database < < " . " < < destination_table < < " doesn't exist. Block of data is discarded. " ) ;
return ;
}
2016-05-28 15:42:22 +00:00
auto insert = std : : make_shared < ASTInsertQuery > ( ) ;
2014-10-27 04:18:13 +00:00
insert - > database = destination_database ;
insert - > table = destination_table ;
2017-03-13 18:01:46 +00:00
/** We will insert columns that are the intersection set of columns of the buffer table and the subordinate table.
* This will support some of the cases ( but not all ) when the table structure does not match .
2014-10-27 04:18:13 +00:00
*/
Block structure_of_destination_table = table - > getSampleBlock ( ) ;
Names columns_intersection ;
columns_intersection . reserve ( block . columns ( ) ) ;
for ( size_t i : ext : : range ( 0 , structure_of_destination_table . columns ( ) ) )
{
2017-01-02 20:12:12 +00:00
auto dst_col = structure_of_destination_table . getByPosition ( i ) ;
2014-10-27 04:18:13 +00:00
if ( block . has ( dst_col . name ) )
{
if ( block . getByName ( dst_col . name ) . type - > getName ( ) ! = dst_col . type - > getName ( ) )
{
LOG_ERROR ( log , " Destination table " < < destination_database < < " . " < < destination_table
< < " have different type of column " < < dst_col . name < < " . Block of data is discarded. " ) ;
return ;
}
columns_intersection . push_back ( dst_col . name ) ;
}
2014-10-26 00:01:36 +00:00
}
2014-10-27 04:18:13 +00:00
if ( columns_intersection . empty ( ) )
2014-10-26 00:01:36 +00:00
{
2014-10-27 04:18:13 +00:00
LOG_ERROR ( log , " Destination table " < < destination_database < < " . " < < destination_table < < " have no common columns with block in buffer. Block of data is discarded. " ) ;
return ;
2014-10-26 00:01:36 +00:00
}
2014-10-27 04:18:13 +00:00
if ( columns_intersection . size ( ) ! = block . columns ( ) )
LOG_WARNING ( log , " Not all columns from block in buffer exist in destination table "
< < destination_database < < " . " < < destination_table < < " . Some columns are discarded. " ) ;
2016-05-28 15:42:22 +00:00
auto list_of_columns = std : : make_shared < ASTExpressionList > ( ) ;
2014-10-27 04:18:13 +00:00
insert - > columns = list_of_columns ;
list_of_columns - > children . reserve ( columns_intersection . size ( ) ) ;
for ( const String & column : columns_intersection )
2016-05-28 15:42:22 +00:00
list_of_columns - > children . push_back ( std : : make_shared < ASTIdentifier > ( StringRange ( ) , column , ASTIdentifier : : Column ) ) ;
2014-10-27 04:18:13 +00:00
2016-05-28 15:42:22 +00:00
InterpreterInsertQuery interpreter { insert , context } ;
2014-10-27 04:18:13 +00:00
auto block_io = interpreter . execute ( ) ;
block_io . out - > writePrefix ( ) ;
block_io . out - > write ( block ) ;
block_io . out - > writeSuffix ( ) ;
2014-10-26 00:01:36 +00:00
}
void StorageBuffer : : flushThread ( )
{
2015-09-24 18:54:21 +00:00
setThreadName ( " BufferFlush " ) ;
2014-10-26 00:01:36 +00:00
do
{
try
{
2014-12-03 13:28:17 +00:00
flushAllBuffers ( true ) ;
2014-10-26 00:01:36 +00:00
}
catch ( . . . )
{
tryLogCurrentException ( __PRETTY_FUNCTION__ ) ;
}
} while ( ! shutdown_event . tryWait ( 1000 ) ) ;
}
2016-01-28 01:00:27 +00:00
void StorageBuffer : : alter ( const AlterCommands & params , const String & database_name , const String & table_name , const Context & context )
2014-10-26 00:01:36 +00:00
{
2016-05-05 18:28:46 +00:00
for ( const auto & param : params )
if ( param . type = = AlterCommand : : MODIFY_PRIMARY_KEY )
throw Exception ( " Storage engine " + getName ( ) + " doesn't support primary key. " , ErrorCodes : : NOT_IMPLEMENTED ) ;
2014-10-26 00:01:36 +00:00
auto lock = lockStructureForAlter ( ) ;
2014-10-27 04:18:13 +00:00
2017-03-13 18:01:46 +00:00
/// So that no blocks of the old structure remain.
2016-05-16 18:43:38 +00:00
optimize ( { } , { } , context . getSettings ( ) ) ;
2014-10-27 04:18:13 +00:00
2014-11-12 10:37:47 +00:00
params . apply ( * columns , materialized_columns , alias_columns , column_defaults ) ;
2016-05-13 21:08:19 +00:00
context . getDatabase ( database_name ) - > alterTable (
context , table_name ,
* columns , materialized_columns , alias_columns , column_defaults , { } ) ;
2014-10-26 00:01:36 +00:00
}
}