2017-04-01 09:19:00 +00:00
# include <Interpreters/InterpreterSelectQuery.h>
# include <Interpreters/InterpreterInsertQuery.h>
# include <Interpreters/InterpreterAlterQuery.h>
2017-12-30 00:36:06 +00:00
# include <Interpreters/evaluateConstantExpression.h>
2017-04-01 09:19:00 +00:00
# include <DataStreams/IProfilingBlockInputStream.h>
# include <Databases/IDatabase.h>
# include <Storages/StorageBuffer.h>
2017-12-28 21:36:27 +00:00
# include <Storages/StorageFactory.h>
2017-04-01 09:19:00 +00:00
# include <Parsers/ASTInsertQuery.h>
# include <Parsers/ASTIdentifier.h>
2017-12-30 00:36:06 +00:00
# include <Parsers/ASTLiteral.h>
2017-04-01 09:19:00 +00:00
# include <Parsers/ASTExpressionList.h>
# include <Common/setThreadName.h>
# include <Common/CurrentMetrics.h>
2017-04-08 01:32:05 +00:00
# include <Common/MemoryTracker.h>
2017-12-30 00:36:06 +00:00
# include <Common/FieldVisitors.h>
# include <Common/typeid_cast.h>
2018-06-05 19:46:49 +00:00
# include <Common/ProfileEvents.h>
2017-01-21 04:24:28 +00:00
# include <common/logger_useful.h>
2014-10-26 00:01:36 +00:00
# include <Poco/Ext/ThreadNumber.h>
2017-06-06 17:18:32 +00:00
# include <ext/range.h>
2014-10-27 04:18:13 +00:00
2014-10-26 00:01:36 +00:00
2016-12-19 23:55:13 +00:00
/// Profile event counters incremented by this file; only declared here.
namespace ProfileEvents
{
    /// Incremented in flushBuffer() once a buffer has been taken for flushing.
    extern const Event StorageBufferFlush;
    /// Incremented in flushBuffer() when writing to the destination throws.
    extern const Event StorageBufferErrorOnFlush;

    /// Incremented in checkThresholdsImpl(), one per reason a flush was triggered.
    extern const Event StorageBufferPassedAllMinThresholds;
    extern const Event StorageBufferPassedTimeMaxThreshold;
    extern const Event StorageBufferPassedRowsMaxThreshold;
    extern const Event StorageBufferPassedBytesMaxThreshold;
}
/// Gauges adjusted by this file: increased in appendBlock(), decreased in flushBuffer().
namespace CurrentMetrics
{
    extern const Metric StorageBufferRows;
    extern const Metric StorageBufferBytes;
}
2014-10-26 00:01:36 +00:00
namespace DB
{
2016-01-11 21:46:36 +00:00
/// Error codes used in this file. Every code referenced below is declared explicitly,
/// so the file does not depend on some included header happening to declare it.
namespace ErrorCodes
{
    extern const int INFINITE_LOOP;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int LOGICAL_ERROR;     /// thrown by appendBlock()
    extern const int NOT_IMPLEMENTED;   /// thrown by optimize() and alter()
}
2014-10-26 00:01:36 +00:00
2018-03-06 20:18:34 +00:00
StorageBuffer : : StorageBuffer ( const std : : string & name_ , const ColumnsDescription & columns_ ,
2017-04-01 07:20:54 +00:00
Context & context_ ,
size_t num_shards_ , const Thresholds & min_thresholds_ , const Thresholds & max_thresholds_ ,
2018-01-12 13:03:19 +00:00
const String & destination_database_ , const String & destination_table_ , bool allow_materialized_ )
2018-03-06 20:18:34 +00:00
: IStorage { columns_ } ,
2018-01-25 14:42:39 +00:00
name ( name_ ) , context ( context_ ) ,
2017-04-01 07:20:54 +00:00
num_shards ( num_shards_ ) , buffers ( num_shards_ ) ,
min_thresholds ( min_thresholds_ ) , max_thresholds ( max_thresholds_ ) ,
destination_database ( destination_database_ ) , destination_table ( destination_table_ ) ,
no_destination ( destination_database . empty ( ) & & destination_table . empty ( ) ) ,
2018-01-12 13:03:19 +00:00
allow_materialized ( allow_materialized_ ) , log ( & Logger : : get ( " StorageBuffer ( " + name + " ) " ) )
2014-10-26 00:01:36 +00:00
{
}
2018-09-06 15:06:54 +00:00
/// Safety net: stops and joins the flush thread if it is still running.
StorageBuffer::~StorageBuffer()
{
    /// shutdown() already joins the thread, so normally there is nothing left to do.
    if (!flush_thread.joinable())
        return;

    shutdown_event.set();
    flush_thread.join();
}
2014-10-26 00:01:36 +00:00
2017-03-12 19:18:07 +00:00
/// Reads from one buffer (from one block) under its mutex.
2014-10-26 00:01:36 +00:00
class BufferBlockInputStream : public IProfilingBlockInputStream
{
public :
2018-01-06 18:10:44 +00:00
BufferBlockInputStream ( const Names & column_names_ , StorageBuffer : : Buffer & buffer_ , const StorageBuffer & storage_ )
: column_names ( column_names_ . begin ( ) , column_names_ . end ( ) ) , buffer ( buffer_ ) , storage ( storage_ ) { }
2014-10-26 00:01:36 +00:00
2018-01-06 18:10:44 +00:00
String getName ( ) const override { return " Buffer " ; }
2014-10-26 00:01:36 +00:00
2018-06-03 20:39:06 +00:00
Block getHeader ( ) const override { return storage . getSampleBlockForColumns ( column_names ) ; }
2018-01-06 18:10:44 +00:00
2014-10-26 00:01:36 +00:00
protected :
2018-01-06 18:10:44 +00:00
Block readImpl ( ) override
2017-04-01 07:20:54 +00:00
{
Block res ;
2014-10-26 00:12:39 +00:00
2017-04-01 07:20:54 +00:00
if ( has_been_read )
return res ;
has_been_read = true ;
2014-10-26 00:12:39 +00:00
2017-04-01 07:20:54 +00:00
std : : lock_guard < std : : mutex > lock ( buffer . mutex ) ;
2014-10-26 00:01:36 +00:00
2017-04-01 07:20:54 +00:00
if ( ! buffer . data . rows ( ) )
return res ;
2014-10-26 00:01:36 +00:00
2017-04-01 07:20:54 +00:00
for ( const auto & name : column_names )
2017-12-15 20:48:46 +00:00
res . insert ( buffer . data . getByName ( name ) ) ;
2014-10-26 00:01:36 +00:00
2017-04-01 07:20:54 +00:00
return res ;
}
2014-10-26 00:01:36 +00:00
private :
2017-04-11 18:40:47 +00:00
Names column_names ;
2017-04-01 07:20:54 +00:00
StorageBuffer : : Buffer & buffer ;
2018-01-06 18:10:44 +00:00
const StorageBuffer & storage ;
2017-04-01 07:20:54 +00:00
bool has_been_read = false ;
2014-10-26 00:01:36 +00:00
} ;
2018-04-19 14:47:09 +00:00
/// Without a destination only columns are fetched; otherwise the stage is whatever the
/// destination table reports. Throws INFINITE_LOOP if the destination is this very table.
QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(const Context & context) const
{
    if (no_destination)
        return QueryProcessingStage::FetchColumns;

    auto destination_storage = context.getTable(destination_database, destination_table);

    if (destination_storage.get() == this)
        throw Exception(" Destination table is myself. Read will cause infinite loop. ", ErrorCodes::INFINITE_LOOP);

    return destination_storage->getQueryProcessingStage(context);
}
2014-10-26 00:01:36 +00:00
/** Returns the streams of the destination table (if there is one) followed by one stream per buffer.
  * Throws INFINITE_LOOP if the destination table is this very table.
  */
BlockInputStreams StorageBuffer::read(
    const Names & column_names,
    const SelectQueryInfo & query_info,
    const Context & context,
    QueryProcessingStage::Enum processed_stage,
    size_t max_block_size,
    unsigned num_streams)
{
    BlockInputStreams result_streams;

    if (!no_destination)
    {
        auto destination_storage = context.getTable(destination_database, destination_table);

        if (destination_storage.get() == this)
            throw Exception(" Destination table is myself. Read will cause infinite loop. ", ErrorCodes::INFINITE_LOOP);

        result_streams = destination_storage->read(column_names, query_info, context, processed_stage, max_block_size, num_streams);
    }

    /// One stream per buffer.
    BlockInputStreams buffer_streams;
    buffer_streams.reserve(num_shards);
    for (auto & shard : buffers)
        buffer_streams.emplace_back(std::make_shared<BufferBlockInputStream>(column_names, shard, *this));

    /** If the sources from the table were processed up to some non-initial stage of query execution,
      * the sources from the buffers must be wrapped in the processing pipeline up to the same stage.
      */
    const bool needs_pipeline = processed_stage > QueryProcessingStage::FetchColumns;
    if (needs_pipeline)
    {
        for (auto & buffer_stream : buffer_streams)
            buffer_stream = InterpreterSelectQuery(query_info.query, context, buffer_stream, processed_stage).execute().in;
    }

    result_streams.insert(result_streams.end(), buffer_streams.begin(), buffer_streams.end());
    return result_streams;
}
2014-10-27 04:18:13 +00:00
static void appendBlock ( const Block & from , Block & to )
{
2017-04-01 07:20:54 +00:00
if ( ! to )
throw Exception ( " Cannot append to empty block " , ErrorCodes : : LOGICAL_ERROR ) ;
2018-02-20 01:14:38 +00:00
assertBlocksHaveEqualStructure ( from , to , " Buffer " ) ;
2017-12-15 20:48:46 +00:00
2017-04-01 07:20:54 +00:00
from . checkNumberOfRows ( ) ;
to . checkNumberOfRows ( ) ;
size_t rows = from . rows ( ) ;
size_t bytes = from . bytes ( ) ;
CurrentMetrics : : add ( CurrentMetrics : : StorageBufferRows , rows ) ;
CurrentMetrics : : add ( CurrentMetrics : : StorageBufferBytes , bytes ) ;
size_t old_rows = to . rows ( ) ;
try
{
for ( size_t column_no = 0 , columns = to . columns ( ) ; column_no < columns ; + + column_no )
{
2017-12-15 20:48:46 +00:00
const IColumn & col_from = * from . getByPosition ( column_no ) . column . get ( ) ;
2018-03-20 10:58:16 +00:00
MutableColumnPtr col_to = ( * std : : move ( to . getByPosition ( column_no ) . column ) ) . mutate ( ) ;
2017-04-01 07:20:54 +00:00
2017-12-15 20:48:46 +00:00
col_to - > insertRangeFrom ( col_from , 0 , rows ) ;
2017-04-01 07:20:54 +00:00
2017-12-15 20:48:46 +00:00
to . getByPosition ( column_no ) . column = std : : move ( col_to ) ;
2017-04-01 07:20:54 +00:00
}
}
catch ( . . . )
{
/// Rollback changes.
try
{
/// Avoid "memory limit exceeded" exceptions during rollback.
2018-05-31 15:54:08 +00:00
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock ( ) ;
2017-04-01 07:20:54 +00:00
for ( size_t column_no = 0 , columns = to . columns ( ) ; column_no < columns ; + + column_no )
{
2017-12-15 20:48:46 +00:00
ColumnPtr & col_to = to . getByPosition ( column_no ) . column ;
2017-04-01 07:20:54 +00:00
if ( col_to - > size ( ) ! = old_rows )
2018-03-20 10:58:16 +00:00
col_to = ( * std : : move ( col_to ) ) . mutate ( ) - > cut ( 0 , old_rows ) ;
2017-04-01 07:20:54 +00:00
}
}
catch ( . . . )
{
/// In case when we cannot rollback, do not leave incorrect state in memory.
std : : terminate ( ) ;
}
throw ;
}
2014-10-27 04:18:13 +00:00
}
2014-10-26 00:01:36 +00:00
class BufferBlockOutputStream : public IBlockOutputStream
{
public :
2017-09-07 21:04:48 +00:00
explicit BufferBlockOutputStream ( StorageBuffer & storage_ ) : storage ( storage_ ) { }
2018-02-19 00:45:32 +00:00
Block getHeader ( ) const override { return storage . getSampleBlock ( ) ; }
2017-04-01 07:20:54 +00:00
void write ( const Block & block ) override
{
if ( ! block )
return ;
size_t rows = block . rows ( ) ;
if ( ! rows )
return ;
StoragePtr destination ;
if ( ! storage . no_destination )
{
destination = storage . context . tryGetTable ( storage . destination_database , storage . destination_table ) ;
if ( destination )
{
if ( destination . get ( ) = = & storage )
throw Exception ( " Destination table is myself. Write will cause infinite loop. " , ErrorCodes : : INFINITE_LOOP ) ;
/// Check table structure.
try
{
destination - > check ( block , true ) ;
}
catch ( Exception & e )
{
e . addMessage ( " (when looking at destination table " + storage . destination_database + " . " + storage . destination_table + " ) " ) ;
throw ;
}
}
}
size_t bytes = block . bytes ( ) ;
/// If the block already exceeds the maximum limit, then we skip the buffer.
if ( rows > storage . max_thresholds . rows | | bytes > storage . max_thresholds . bytes )
{
if ( ! storage . no_destination )
{
LOG_TRACE ( storage . log , " Writing block with " < < rows < < " rows, " < < bytes < < " bytes directly. " ) ;
storage . writeBlockToDestination ( block , destination ) ;
}
return ;
}
/// We distribute the load on the shards by the stream number.
const auto start_shard_num = Poco : : ThreadNumber : : get ( ) % storage . num_shards ;
/// We loop through the buffers, trying to lock mutex. No more than one lap.
auto shard_num = start_shard_num ;
StorageBuffer : : Buffer * least_busy_buffer = nullptr ;
std : : unique_lock < std : : mutex > least_busy_lock ;
size_t least_busy_shard_rows = 0 ;
for ( size_t try_no = 0 ; try_no < storage . num_shards ; + + try_no )
{
2018-08-24 14:51:34 +00:00
std : : unique_lock < std : : mutex > lock ( storage . buffers [ shard_num ] . mutex , std : : try_to_lock ) ;
2017-04-01 07:20:54 +00:00
if ( lock . owns_lock ( ) )
{
size_t num_rows = storage . buffers [ shard_num ] . data . rows ( ) ;
if ( ! least_busy_buffer | | num_rows < least_busy_shard_rows )
{
least_busy_buffer = & storage . buffers [ shard_num ] ;
least_busy_lock = std : : move ( lock ) ;
least_busy_shard_rows = num_rows ;
}
}
shard_num = ( shard_num + 1 ) % storage . num_shards ;
}
/// If you still can not lock anything at once, then we'll wait on mutex.
if ( ! least_busy_buffer )
2018-08-24 14:51:34 +00:00
{
least_busy_buffer = & storage . buffers [ start_shard_num ] ;
least_busy_lock = std : : unique_lock < std : : mutex > ( least_busy_buffer - > mutex ) ;
}
insertIntoBuffer ( block , * least_busy_buffer ) ;
2017-04-01 07:20:54 +00:00
}
2014-10-26 00:01:36 +00:00
private :
2017-04-01 07:20:54 +00:00
StorageBuffer & storage ;
2018-08-24 14:51:34 +00:00
void insertIntoBuffer ( const Block & block , StorageBuffer : : Buffer & buffer )
2017-04-01 07:20:54 +00:00
{
2017-08-04 14:00:26 +00:00
time_t current_time = time ( nullptr ) ;
2017-04-01 07:20:54 +00:00
/// Sort the columns in the block. This is necessary to make it easier to concatenate the blocks later.
Block sorted_block = block . sortColumns ( ) ;
if ( ! buffer . data )
{
buffer . data = sorted_block . cloneEmpty ( ) ;
}
else if ( storage . checkThresholds ( buffer , current_time , sorted_block . rows ( ) , sorted_block . bytes ( ) ) )
{
/** If, after inserting the buffer, the constraints are exceeded, then we will reset the buffer.
* This also protects against unlimited consumption of RAM , since if it is impossible to write to the table ,
* an exception will be thrown , and new data will not be added to the buffer .
*/
2018-08-24 14:51:34 +00:00
storage . flushBuffer ( buffer , true , true /* locked */ ) ;
2017-04-01 07:20:54 +00:00
}
if ( ! buffer . first_write_time )
buffer . first_write_time = current_time ;
appendBlock ( sorted_block , buffer . data ) ;
}
2014-10-26 00:01:36 +00:00
} ;
2017-12-01 21:13:25 +00:00
/// Creates an output stream that inserts into this table. The query and the settings are not used.
BlockOutputStreamPtr StorageBuffer::write(const ASTPtr & /*query*/, const Settings & /*settings*/)
{
    auto & self = *this;
    return std::make_shared<BufferBlockOutputStream>(self);
}
2018-03-16 09:00:04 +00:00
bool StorageBuffer : : mayBenefitFromIndexForIn ( const ASTPtr & left_in_operand ) const
{
if ( no_destination )
return false ;
auto destination = context . getTable ( destination_database , destination_table ) ;
if ( destination . get ( ) = = this )
throw Exception ( " Destination table is myself. Read will cause infinite loop. " , ErrorCodes : : INFINITE_LOOP ) ;
return destination - > mayBenefitFromIndexForIn ( left_in_operand ) ;
}
2017-06-06 17:06:14 +00:00
void StorageBuffer : : startup ( )
{
2018-03-11 00:15:26 +00:00
if ( context . getSettingsRef ( ) . readonly )
2018-02-01 13:52:29 +00:00
{
LOG_WARNING ( log , " Storage " < < getName ( ) < < " is run with readonly settings, it will not be able to insert data. "
< < " Set apropriate system_profile to fix this. " ) ;
}
2017-06-06 17:06:14 +00:00
flush_thread = std : : thread ( & StorageBuffer : : flushThread , this ) ;
}
2014-10-26 00:01:36 +00:00
void StorageBuffer : : shutdown ( )
{
2017-04-01 07:20:54 +00:00
shutdown_event . set ( ) ;
if ( flush_thread . joinable ( ) )
flush_thread . join ( ) ;
try
{
2017-09-06 20:34:26 +00:00
optimize ( nullptr /*query*/ , { } /*partition*/ , false /*final*/ , false /*deduplicate*/ , context ) ;
2017-04-01 07:20:54 +00:00
}
catch ( . . . )
{
tryLogCurrentException ( __PRETTY_FUNCTION__ ) ;
}
2014-10-26 00:21:06 +00:00
}
2016-11-18 09:02:49 +00:00
/** NOTE If you do OPTIMIZE after insertion,
* it does not guarantee , that all data will be in destination table at the time of next SELECT just after OPTIMIZE .
*
* Because in case if there was already running flushBuffer method ,
* then call to flushBuffer inside OPTIMIZE will see empty buffer and return quickly ,
* but at the same time , the already running flushBuffer method possibly is not finished ,
* so next SELECT will observe missing data .
*
* This kind of race condition make very hard to implement proper tests .
*/
2017-12-01 21:13:25 +00:00
bool StorageBuffer : : optimize ( const ASTPtr & /*query*/ , const ASTPtr & partition , bool final , bool deduplicate , const Context & /*context*/ )
2014-10-26 00:21:06 +00:00
{
2017-09-06 20:34:26 +00:00
if ( partition )
2017-04-01 07:20:54 +00:00
throw Exception ( " Partition cannot be specified when optimizing table of type Buffer " , ErrorCodes : : NOT_IMPLEMENTED ) ;
2016-05-16 18:43:38 +00:00
2017-04-01 07:20:54 +00:00
if ( final )
throw Exception ( " FINAL cannot be specified when optimizing table of type Buffer " , ErrorCodes : : NOT_IMPLEMENTED ) ;
2014-10-26 00:21:06 +00:00
2017-04-13 13:13:08 +00:00
if ( deduplicate )
throw Exception ( " DEDUPLICATE cannot be specified when optimizing table of type Buffer " , ErrorCodes : : NOT_IMPLEMENTED ) ;
2017-04-01 07:20:54 +00:00
flushAllBuffers ( false ) ;
return true ;
2014-10-26 00:01:36 +00:00
}
2015-12-09 06:55:49 +00:00
bool StorageBuffer : : checkThresholds ( const Buffer & buffer , time_t current_time , size_t additional_rows , size_t additional_bytes ) const
2014-10-26 00:01:36 +00:00
{
2017-04-01 07:20:54 +00:00
time_t time_passed = 0 ;
if ( buffer . first_write_time )
time_passed = current_time - buffer . first_write_time ;
2014-10-26 00:01:36 +00:00
2017-04-01 07:20:54 +00:00
size_t rows = buffer . data . rows ( ) + additional_rows ;
size_t bytes = buffer . data . bytes ( ) + additional_bytes ;
2014-10-26 00:01:36 +00:00
2017-04-01 07:20:54 +00:00
return checkThresholdsImpl ( rows , bytes , time_passed ) ;
2015-12-09 06:55:49 +00:00
}
bool StorageBuffer : : checkThresholdsImpl ( size_t rows , size_t bytes , time_t time_passed ) const
{
2017-04-01 07:20:54 +00:00
if ( time_passed > min_thresholds . time & & rows > min_thresholds . rows & & bytes > min_thresholds . bytes )
{
ProfileEvents : : increment ( ProfileEvents : : StorageBufferPassedAllMinThresholds ) ;
return true ;
}
if ( time_passed > max_thresholds . time )
{
ProfileEvents : : increment ( ProfileEvents : : StorageBufferPassedTimeMaxThreshold ) ;
return true ;
}
if ( rows > max_thresholds . rows )
{
ProfileEvents : : increment ( ProfileEvents : : StorageBufferPassedRowsMaxThreshold ) ;
return true ;
}
if ( bytes > max_thresholds . bytes )
{
ProfileEvents : : increment ( ProfileEvents : : StorageBufferPassedBytesMaxThreshold ) ;
return true ;
}
return false ;
2014-10-26 00:01:36 +00:00
}
2014-12-03 13:28:17 +00:00
void StorageBuffer : : flushAllBuffers ( const bool check_thresholds )
{
2017-04-01 07:20:54 +00:00
for ( auto & buf : buffers )
flushBuffer ( buf , check_thresholds ) ;
2014-12-03 13:28:17 +00:00
}
2018-08-24 14:51:34 +00:00
void StorageBuffer : : flushBuffer ( Buffer & buffer , bool check_thresholds , bool locked )
2014-10-26 00:01:36 +00:00
{
2017-04-01 07:20:54 +00:00
Block block_to_write ;
2017-08-04 14:00:26 +00:00
time_t current_time = time ( nullptr ) ;
2017-04-01 07:20:54 +00:00
size_t rows = 0 ;
size_t bytes = 0 ;
time_t time_passed = 0 ;
2018-08-24 14:51:34 +00:00
std : : unique_lock < std : : mutex > lock ( buffer . mutex , std : : defer_lock ) ;
if ( ! locked )
lock . lock ( ) ;
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
block_to_write = buffer . data . cloneEmpty ( ) ;
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
rows = buffer . data . rows ( ) ;
bytes = buffer . data . bytes ( ) ;
if ( buffer . first_write_time )
time_passed = current_time - buffer . first_write_time ;
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
if ( check_thresholds )
{
if ( ! checkThresholdsImpl ( rows , bytes , time_passed ) )
return ;
}
else
{
if ( rows = = 0 )
return ;
}
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
buffer . data . swap ( block_to_write ) ;
buffer . first_write_time = 0 ;
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
CurrentMetrics : : sub ( CurrentMetrics : : StorageBufferRows , block_to_write . rows ( ) ) ;
CurrentMetrics : : sub ( CurrentMetrics : : StorageBufferBytes , block_to_write . bytes ( ) ) ;
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
ProfileEvents : : increment ( ProfileEvents : : StorageBufferFlush ) ;
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
LOG_TRACE ( log , " Flushing buffer with " < < rows < < " rows, " < < bytes < < " bytes, age " < < time_passed < < " seconds. " ) ;
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
if ( no_destination )
return ;
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
/** For simplicity, buffer is locked during write.
2017-11-15 19:47:49 +00:00
* We could unlock buffer temporary , but it would lead to too many difficulties :
2017-09-07 21:04:48 +00:00
* - data , that is written , will not be visible for SELECTs ;
* - new data could be appended to buffer , and in case of exception , we must merge it with old data , that has not been written ;
* - this could lead to infinite memory growth .
*/
try
{
writeBlockToDestination ( block_to_write , context . tryGetTable ( destination_database , destination_table ) ) ;
}
catch ( . . . )
{
ProfileEvents : : increment ( ProfileEvents : : StorageBufferErrorOnFlush ) ;
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
/// Return the block to its place in the buffer.
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
CurrentMetrics : : add ( CurrentMetrics : : StorageBufferRows , block_to_write . rows ( ) ) ;
CurrentMetrics : : add ( CurrentMetrics : : StorageBufferBytes , block_to_write . bytes ( ) ) ;
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
buffer . data . swap ( block_to_write ) ;
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
if ( ! buffer . first_write_time )
buffer . first_write_time = current_time ;
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
/// After a while, the next write attempt will happen.
throw ;
2017-04-01 07:20:54 +00:00
}
2014-10-27 04:18:13 +00:00
}
2014-10-26 00:01:36 +00:00
2014-10-27 04:18:13 +00:00
void StorageBuffer : : writeBlockToDestination ( const Block & block , StoragePtr table )
{
2017-04-01 07:20:54 +00:00
if ( no_destination | | ! block )
return ;
if ( ! table )
{
LOG_ERROR ( log , " Destination table " < < destination_database < < " . " < < destination_table < < " doesn't exist. Block of data is discarded. " ) ;
return ;
}
auto insert = std : : make_shared < ASTInsertQuery > ( ) ;
insert - > database = destination_database ;
insert - > table = destination_table ;
/** We will insert columns that are the intersection set of columns of the buffer table and the subordinate table.
* This will support some of the cases ( but not all ) when the table structure does not match .
*/
2018-01-01 11:54:14 +00:00
Block structure_of_destination_table = allow_materialized ? table - > getSampleBlock ( ) : table - > getSampleBlockNonMaterialized ( ) ;
2017-04-01 07:20:54 +00:00
Names columns_intersection ;
columns_intersection . reserve ( block . columns ( ) ) ;
for ( size_t i : ext : : range ( 0 , structure_of_destination_table . columns ( ) ) )
{
auto dst_col = structure_of_destination_table . getByPosition ( i ) ;
if ( block . has ( dst_col . name ) )
{
2017-12-23 01:55:46 +00:00
if ( ! block . getByName ( dst_col . name ) . type - > equals ( * dst_col . type ) )
2017-04-01 07:20:54 +00:00
{
LOG_ERROR ( log , " Destination table " < < destination_database < < " . " < < destination_table
2017-10-23 17:32:39 +00:00
< < " have different type of column " < < dst_col . name < < " ( "
< < block . getByName ( dst_col . name ) . type - > getName ( ) < < " != " < < dst_col . type - > getName ( )
< < " ). Block of data is discarded. " ) ;
2017-04-01 07:20:54 +00:00
return ;
}
columns_intersection . push_back ( dst_col . name ) ;
}
}
if ( columns_intersection . empty ( ) )
{
LOG_ERROR ( log , " Destination table " < < destination_database < < " . " < < destination_table < < " have no common columns with block in buffer. Block of data is discarded. " ) ;
return ;
}
if ( columns_intersection . size ( ) ! = block . columns ( ) )
LOG_WARNING ( log , " Not all columns from block in buffer exist in destination table "
< < destination_database < < " . " < < destination_table < < " . Some columns are discarded. " ) ;
auto list_of_columns = std : : make_shared < ASTExpressionList > ( ) ;
insert - > columns = list_of_columns ;
list_of_columns - > children . reserve ( columns_intersection . size ( ) ) ;
for ( const String & column : columns_intersection )
2018-09-20 13:13:33 +00:00
list_of_columns - > children . push_back ( std : : make_shared < ASTIdentifier > ( column ) ) ;
2017-04-01 07:20:54 +00:00
2018-01-12 13:03:19 +00:00
InterpreterInsertQuery interpreter { insert , context , allow_materialized } ;
2017-04-01 07:20:54 +00:00
2018-09-09 03:28:45 +00:00
Block block_to_write ;
for ( const auto & name : columns_intersection )
block_to_write . insert ( block . getByName ( name ) ) ;
2017-04-01 07:20:54 +00:00
auto block_io = interpreter . execute ( ) ;
block_io . out - > writePrefix ( ) ;
2018-09-09 03:28:45 +00:00
block_io . out - > write ( block_to_write ) ;
2017-04-01 07:20:54 +00:00
block_io . out - > writeSuffix ( ) ;
2014-10-26 00:01:36 +00:00
}
void StorageBuffer : : flushThread ( )
{
2017-04-01 07:20:54 +00:00
setThreadName ( " BufferFlush " ) ;
do
{
try
{
flushAllBuffers ( true ) ;
}
catch ( . . . )
{
tryLogCurrentException ( __PRETTY_FUNCTION__ ) ;
}
} while ( ! shutdown_event . tryWait ( 1000 ) ) ;
2014-10-26 00:01:36 +00:00
}
2016-01-28 01:00:27 +00:00
void StorageBuffer : : alter ( const AlterCommands & params , const String & database_name , const String & table_name , const Context & context )
2014-10-26 00:01:36 +00:00
{
2017-04-01 07:20:54 +00:00
for ( const auto & param : params )
if ( param . type = = AlterCommand : : MODIFY_PRIMARY_KEY )
throw Exception ( " Storage engine " + getName ( ) + " doesn't support primary key. " , ErrorCodes : : NOT_IMPLEMENTED ) ;
2016-05-05 18:28:46 +00:00
2017-09-01 15:05:23 +00:00
auto lock = lockStructureForAlter ( __PRETTY_FUNCTION__ ) ;
2014-10-27 04:18:13 +00:00
2017-04-01 07:20:54 +00:00
/// So that no blocks of the old structure remain.
2017-09-06 20:34:26 +00:00
optimize ( { } /*query*/ , { } /*partition_id*/ , false /*final*/ , false /*deduplicate*/ , context ) ;
2014-10-27 04:18:13 +00:00
2018-03-13 14:18:11 +00:00
ColumnsDescription new_columns = getColumns ( ) ;
params . apply ( new_columns ) ;
context . getDatabase ( database_name ) - > alterTable ( context , table_name , new_columns , { } ) ;
setColumns ( std : : move ( new_columns ) ) ;
2014-10-26 00:01:36 +00:00
}
2017-12-28 21:36:27 +00:00
/// Registers the creator of the Buffer table engine in the storage factory.
void registerStorageBuffer(StorageFactory & factory)
{
    /** Buffer(db, table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes)
      *
      * db, table - in which table to put data from buffer.
      * num_buckets - level of parallelism.
      * min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for flushing the buffer.
      */
    factory.registerStorage(" Buffer ", [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;

        if (engine_args.size() != 9)
            throw Exception(" Storage Buffer requires 9 parameters: "
                " destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes. ",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        /// The first two arguments may be given as identifiers or constant expressions; turn them into literals.
        engine_args[0] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[0], args.local_context);
        engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.local_context);

        String destination_database = static_cast<const ASTLiteral &>(*engine_args[0]).value.safeGet<String>();
        String destination_table = static_cast<const ASTLiteral &>(*engine_args[1]).value.safeGet<String>();

        /// The remaining arguments are taken as literals and converted to numbers.
        const auto unsigned_arg = [&engine_args](size_t index) -> UInt64
        {
            return applyVisitor(FieldVisitorConvertToNumber<UInt64>(), typeid_cast<ASTLiteral &>(*engine_args[index]).value);
        };
        const auto signed_arg = [&engine_args](size_t index) -> Int64
        {
            return applyVisitor(FieldVisitorConvertToNumber<Int64>(), typeid_cast<ASTLiteral &>(*engine_args[index]).value);
        };

        UInt64 num_buckets = unsigned_arg(2);

        Int64 min_time = signed_arg(3);
        Int64 max_time = signed_arg(4);
        UInt64 min_rows = unsigned_arg(5);
        UInt64 max_rows = unsigned_arg(6);
        UInt64 min_bytes = unsigned_arg(7);
        UInt64 max_bytes = unsigned_arg(8);

        return StorageBuffer::create(
            args.table_name,
            args.columns,
            args.context,
            num_buckets,
            StorageBuffer::Thresholds{min_time, min_rows, min_bytes},
            StorageBuffer::Thresholds{max_time, max_rows, max_bytes},
            destination_database,
            destination_table,
            static_cast<bool>(args.local_context.getSettingsRef().insert_allow_materialized_columns));
    });
}
2014-10-26 00:01:36 +00:00
}