2018-12-01 12:42:56 +00:00
# include <boost/range/algorithm_ext/erase.hpp>
2017-04-01 09:19:00 +00:00
# include <Interpreters/InterpreterSelectQuery.h>
# include <Interpreters/InterpreterInsertQuery.h>
# include <Interpreters/InterpreterAlterQuery.h>
2018-11-19 06:14:36 +00:00
# include <Interpreters/castColumn.h>
2017-12-30 00:36:06 +00:00
# include <Interpreters/evaluateConstantExpression.h>
2021-02-05 11:41:44 +00:00
# include <Interpreters/addMissingDefaults.h>
2017-04-01 09:19:00 +00:00
# include <Storages/StorageBuffer.h>
2017-12-28 21:36:27 +00:00
# include <Storages/StorageFactory.h>
2018-12-25 23:14:39 +00:00
# include <Storages/AlterCommands.h>
2017-04-01 09:19:00 +00:00
# include <Parsers/ASTInsertQuery.h>
# include <Parsers/ASTIdentifier.h>
2017-12-30 00:36:06 +00:00
# include <Parsers/ASTLiteral.h>
2017-04-01 09:19:00 +00:00
# include <Parsers/ASTExpressionList.h>
# include <Common/CurrentMetrics.h>
2017-04-08 01:32:05 +00:00
# include <Common/MemoryTracker.h>
2021-06-14 04:13:35 +00:00
# include <Common/FieldVisitorConvertToNumber.h>
2019-10-08 18:42:22 +00:00
# include <Common/quoteString.h>
2017-12-30 00:36:06 +00:00
# include <Common/typeid_cast.h>
2018-06-05 19:46:49 +00:00
# include <Common/ProfileEvents.h>
2021-10-02 07:13:14 +00:00
# include <base/logger_useful.h>
# include <base/getThreadId.h>
# include <base/range.h>
2020-11-17 17:16:55 +00:00
# include <Processors/QueryPlan/ExpressionStep.h>
2020-01-29 18:14:40 +00:00
# include <Processors/Transforms/FilterTransform.h>
# include <Processors/Transforms/ExpressionTransform.h>
2021-07-23 14:25:35 +00:00
# include <Processors/Sinks/SinkToStorage.h>
2021-10-13 18:22:02 +00:00
# include <Processors/Sources/SourceWithProgress.h>
2021-09-08 18:29:38 +00:00
# include <Processors/QueryPlan/QueryPlan.h>
2020-09-29 16:21:58 +00:00
# include <Processors/QueryPlan/SettingQuotaAndLimitsStep.h>
# include <Processors/QueryPlan/ReadFromPreparedSource.h>
# include <Processors/QueryPlan/UnionStep.h>
2021-03-04 17:38:12 +00:00
# include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
# include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
2021-09-03 17:29:36 +00:00
# include <Processors/Executors/PushingPipelineExecutor.h>
2014-10-26 00:01:36 +00:00
2021-07-17 18:06:46 +00:00
2016-12-19 23:55:13 +00:00
/// Profile event identifiers used by the Buffer storage.
/// They are only declared here (`extern`); the definitions live in another translation unit.
namespace ProfileEvents
{
    /// Flush attempts and their failures.
    extern const Event StorageBufferFlush;
    extern const Event StorageBufferErrorOnFlush;

    /// Threshold checks, one event per threshold kind (min / max / flush).
    extern const Event StorageBufferPassedAllMinThresholds;
    extern const Event StorageBufferPassedTimeMaxThreshold;
    extern const Event StorageBufferPassedRowsMaxThreshold;
    extern const Event StorageBufferPassedBytesMaxThreshold;
    extern const Event StorageBufferPassedTimeFlushThreshold;
    extern const Event StorageBufferPassedRowsFlushThreshold;
    extern const Event StorageBufferPassedBytesFlushThreshold;

    /// Time spent waiting for a buffer mutex, accounted separately for readers and writers.
    extern const Event StorageBufferLayerLockReadersWaitMilliseconds;
    extern const Event StorageBufferLayerLockWritersWaitMilliseconds;
}
/// Metric identifiers used by the Buffer storage (declared here, defined elsewhere).
namespace CurrentMetrics
{
    extern const Metric StorageBufferRows;
    extern const Metric StorageBufferBytes;
}
2014-10-26 00:01:36 +00:00
namespace DB
{
2016-01-11 21:46:36 +00:00
/// Error codes referenced by this file (declared here, defined elsewhere).
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int NOT_IMPLEMENTED;
    extern const int LOGICAL_ERROR;
    extern const int INFINITE_LOOP;
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
}
2014-10-26 00:01:36 +00:00
2021-04-06 18:12:40 +00:00
std : : unique_lock < std : : mutex > StorageBuffer : : Buffer : : lockForReading ( ) const
{
return lockImpl ( /* read= */ true ) ;
}
std : : unique_lock < std : : mutex > StorageBuffer : : Buffer : : lockForWriting ( ) const
{
return lockImpl ( /* read= */ false ) ;
}
std : : unique_lock < std : : mutex > StorageBuffer : : Buffer : : tryLock ( ) const
{
std : : unique_lock lock ( mutex , std : : try_to_lock ) ;
return lock ;
}
std : : unique_lock < std : : mutex > StorageBuffer : : Buffer : : lockImpl ( bool read ) const
{
std : : unique_lock lock ( mutex , std : : defer_lock ) ;
Stopwatch watch ( CLOCK_MONOTONIC_COARSE ) ;
lock . lock ( ) ;
UInt64 elapsed = watch . elapsedMilliseconds ( ) ;
if ( read )
ProfileEvents : : increment ( ProfileEvents : : StorageBufferLayerLockReadersWaitMilliseconds , elapsed ) ;
else
ProfileEvents : : increment ( ProfileEvents : : StorageBufferLayerLockWritersWaitMilliseconds , elapsed ) ;
return lock ;
}
2019-12-04 16:06:55 +00:00
/// Creates a Buffer table.
///
/// Stores the thresholds, the destination table id and the shard count (one Buffer object per shard),
/// switches to the buffer context obtained from `context_`, fills the in-memory metadata
/// (columns, constraints, comment) and creates the background flush task in the buffer flush pool.
StorageBuffer::StorageBuffer(
    const StorageID & table_id_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const String & comment,
    ContextPtr context_,
    size_t num_shards_,
    const Thresholds & min_thresholds_,
    const Thresholds & max_thresholds_,
    const Thresholds & flush_thresholds_,
    const StorageID & destination_id_,
    bool allow_materialized_)
    : IStorage(table_id_)
    , WithContext(context_->getBufferContext())
    , num_shards(num_shards_)
    , buffers(num_shards_)
    , min_thresholds(min_thresholds_)
    , max_thresholds(max_thresholds_)
    , flush_thresholds(flush_thresholds_)
    , destination_id(destination_id_)
    , allow_materialized(allow_materialized_)
    , log(&Poco::Logger::get(" StorageBuffer ( " + table_id_.getFullTableName() + " ) "))
    , bg_pool(getContext()->getBufferFlushSchedulePool())
{
    /// Describe the table structure.
    StorageInMemoryMetadata metadata;
    metadata.setColumns(columns_);
    metadata.setConstraints(constraints_);
    metadata.setComment(comment);
    setInMemoryMetadata(metadata);

    /// The task captures `this` and runs backgroundFlush(); its name is derived from the logger name.
    flush_handle = bg_pool.createTask(log->name() + " /Bg ", [this] { backgroundFlush(); });
}
2017-03-12 19:18:07 +00:00
/// Reads from one buffer (from one block) under its mutex.
2020-01-29 18:14:40 +00:00
class BufferSource : public SourceWithProgress
2014-10-26 00:01:36 +00:00
{
public :
2020-06-16 14:25:08 +00:00
BufferSource ( const Names & column_names_ , StorageBuffer : : Buffer & buffer_ , const StorageBuffer & storage , const StorageMetadataPtr & metadata_snapshot )
: SourceWithProgress (
2020-06-19 17:17:13 +00:00
metadata_snapshot - > getSampleBlockForColumns ( column_names_ , storage . getVirtuals ( ) , storage . getStorageID ( ) ) )
2021-07-15 03:12:37 +00:00
, column_names_and_types ( metadata_snapshot - > getColumns ( ) . getByNames ( ColumnsDescription : : All , column_names_ , true ) )
2020-06-16 14:25:08 +00:00
, buffer ( buffer_ ) { }
2014-10-26 00:01:36 +00:00
2018-01-06 18:10:44 +00:00
String getName ( ) const override { return " Buffer " ; }
2014-10-26 00:01:36 +00:00
protected :
2020-01-29 18:14:40 +00:00
Chunk generate ( ) override
2017-04-01 07:20:54 +00:00
{
2020-01-29 18:14:40 +00:00
Chunk res ;
2014-10-26 00:12:39 +00:00
2017-04-01 07:20:54 +00:00
if ( has_been_read )
return res ;
has_been_read = true ;
2014-10-26 00:12:39 +00:00
2021-04-06 18:12:40 +00:00
std : : unique_lock lock ( buffer . lockForReading ( ) ) ;
2014-10-26 00:01:36 +00:00
2017-04-01 07:20:54 +00:00
if ( ! buffer . data . rows ( ) )
return res ;
2014-10-26 00:01:36 +00:00
2020-01-29 18:14:40 +00:00
Columns columns ;
2020-12-07 19:02:26 +00:00
columns . reserve ( column_names_and_types . size ( ) ) ;
2020-01-29 18:14:40 +00:00
2020-12-07 19:02:26 +00:00
for ( const auto & elem : column_names_and_types )
2021-09-18 19:31:30 +00:00
columns . emplace_back ( getColumnFromBlock ( buffer . data , elem ) ) ;
2020-01-29 18:14:40 +00:00
UInt64 size = columns . at ( 0 ) - > size ( ) ;
res . setColumns ( std : : move ( columns ) , size ) ;
2014-10-26 00:01:36 +00:00
2017-04-01 07:20:54 +00:00
return res ;
}
2014-10-26 00:01:36 +00:00
private :
2020-12-07 19:02:26 +00:00
NamesAndTypesList column_names_and_types ;
2017-04-01 07:20:54 +00:00
StorageBuffer : : Buffer & buffer ;
bool has_been_read = false ;
2014-10-26 00:01:36 +00:00
} ;
2021-04-22 13:32:17 +00:00
/// Returns the stage up to which this storage can process a query.
/// Without a destination table this is FetchColumns; otherwise the question is forwarded
/// to the destination table (using the destination's own metadata snapshot).
/// Throws INFINITE_LOOP if the destination table is this very table.
QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(
    ContextPtr local_context,
    QueryProcessingStage::Enum to_stage,
    const StorageMetadataPtr &,
    SelectQueryInfo & query_info) const
{
    if (!destination_id)
        return QueryProcessingStage::FetchColumns;

    const auto destination = DatabaseCatalog::instance().getTable(destination_id, local_context);

    if (destination.get() == this)
        throw Exception(" Destination table is myself. Read will cause infinite loop. ", ErrorCodes::INFINITE_LOOP);

    const auto destination_metadata = destination->getInMemoryMetadataPtr();
    return destination->getQueryProcessingStage(local_context, to_stage, destination_metadata, query_info);
}
2020-01-30 10:22:59 +00:00
2020-08-03 13:54:14 +00:00
/// Pipe-returning overload of read(): builds the query plan with the QueryPlan overload
/// and converts it to a pipe using optimization / pipeline settings taken from `local_context`.
Pipe StorageBuffer::read(
    const Names & column_names,
    const StorageMetadataPtr & metadata_snapshot,
    SelectQueryInfo & query_info,
    ContextPtr local_context,
    QueryProcessingStage::Enum processed_stage,
    const size_t max_block_size,
    const unsigned num_streams)
{
    QueryPlan read_plan;
    read(read_plan, column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size, num_streams);

    return read_plan.convertToPipe(
        QueryPlanOptimizationSettings::fromContext(local_context),
        BuildQueryPipelineSettings::fromContext(local_context));
}
void StorageBuffer : : read (
QueryPlan & query_plan ,
2020-01-30 10:26:25 +00:00
const Names & column_names ,
2020-06-16 14:25:08 +00:00
const StorageMetadataPtr & metadata_snapshot ,
2020-09-20 17:52:17 +00:00
SelectQueryInfo & query_info ,
2021-04-10 23:33:54 +00:00
ContextPtr local_context ,
2020-01-30 10:26:25 +00:00
QueryProcessingStage : : Enum processed_stage ,
size_t max_block_size ,
unsigned num_streams )
{
2020-02-17 19:28:25 +00:00
if ( destination_id )
2017-04-01 07:20:54 +00:00
{
2021-04-10 23:33:54 +00:00
auto destination = DatabaseCatalog : : instance ( ) . getTable ( destination_id , local_context ) ;
2015-02-27 20:39:34 +00:00
2017-04-01 07:20:54 +00:00
if ( destination . get ( ) = = this )
throw Exception ( " Destination table is myself. Read will cause infinite loop. " , ErrorCodes : : INFINITE_LOOP ) ;
2015-02-27 20:39:34 +00:00
2021-04-10 23:33:54 +00:00
auto destination_lock = destination - > lockForShare ( local_context - > getCurrentQueryId ( ) , local_context - > getSettingsRef ( ) . lock_acquire_timeout ) ;
2018-11-30 15:34:24 +00:00
2020-06-16 18:41:11 +00:00
auto destination_metadata_snapshot = destination - > getInMemoryMetadataPtr ( ) ;
2020-06-17 16:39:58 +00:00
const bool dst_has_same_structure = std : : all_of ( column_names . begin ( ) , column_names . end ( ) , [ metadata_snapshot , destination_metadata_snapshot ] ( const String & column_name )
2018-11-19 06:14:36 +00:00
{
2020-06-17 16:39:58 +00:00
const auto & dest_columns = destination_metadata_snapshot - > getColumns ( ) ;
2020-06-16 15:51:29 +00:00
const auto & our_columns = metadata_snapshot - > getColumns ( ) ;
2021-07-15 17:36:48 +00:00
auto dest_columm = dest_columns . tryGetColumnOrSubcolumn ( ColumnsDescription : : AllPhysical , column_name ) ;
return dest_columm & & dest_columm - > type - > equals ( * our_columns . getColumnOrSubcolumn ( ColumnsDescription : : AllPhysical , column_name ) . type ) ;
2018-12-01 12:42:56 +00:00
} ) ;
if ( dst_has_same_structure )
{
2020-05-13 13:49:10 +00:00
if ( query_info . order_optimizer )
2021-04-10 23:33:54 +00:00
query_info . input_order_info = query_info . order_optimizer - > getInputOrder ( destination_metadata_snapshot , local_context ) ;
2019-12-11 13:09:46 +00:00
2018-12-01 12:42:56 +00:00
/// The destination table has the same structure of the requested columns and we can simply read blocks from there.
2020-09-29 16:21:58 +00:00
destination - > read (
query_plan , column_names , destination_metadata_snapshot , query_info ,
2021-04-10 23:33:54 +00:00
local_context , processed_stage , max_block_size , num_streams ) ;
2018-12-01 12:42:56 +00:00
}
else
2018-11-19 06:14:36 +00:00
{
2018-12-01 12:42:56 +00:00
/// There is a struct mismatch and we need to convert read blocks from the destination table.
2020-06-16 15:51:29 +00:00
const Block header = metadata_snapshot - > getSampleBlock ( ) ;
2018-12-01 12:42:56 +00:00
Names columns_intersection = column_names ;
Block header_after_adding_defaults = header ;
2020-06-17 16:39:58 +00:00
const auto & dest_columns = destination_metadata_snapshot - > getColumns ( ) ;
const auto & our_columns = metadata_snapshot - > getColumns ( ) ;
2018-12-01 12:42:56 +00:00
for ( const String & column_name : column_names )
2018-11-27 00:43:58 +00:00
{
2020-04-24 10:20:03 +00:00
if ( ! dest_columns . hasPhysical ( column_name ) )
2018-12-01 12:42:56 +00:00
{
2020-05-23 22:24:01 +00:00
LOG_WARNING ( log , " Destination table {} doesn't have column {}. The default values are used. " , destination_id . getNameForLogs ( ) , backQuoteIfNeed ( column_name ) ) ;
2018-12-01 12:42:56 +00:00
boost : : range : : remove_erase ( columns_intersection , column_name ) ;
continue ;
}
2020-04-27 15:38:35 +00:00
const auto & dst_col = dest_columns . getPhysical ( column_name ) ;
const auto & col = our_columns . getPhysical ( column_name ) ;
2018-12-01 12:42:56 +00:00
if ( ! dst_col . type - > equals ( * col . type ) )
2018-11-27 00:43:58 +00:00
{
2020-05-23 22:24:01 +00:00
LOG_WARNING ( log , " Destination table {} has different type of column {} ({} != {}). Data from destination table are converted. " , destination_id . getNameForLogs ( ) , backQuoteIfNeed ( column_name ) , dst_col . type - > getName ( ) , col . type - > getName ( ) ) ;
2018-12-01 12:42:56 +00:00
header_after_adding_defaults . getByName ( column_name ) = ColumnWithTypeAndName ( dst_col . type , column_name ) ;
2018-11-27 00:43:58 +00:00
}
}
2018-12-01 12:42:56 +00:00
if ( columns_intersection . empty ( ) )
2018-11-27 00:43:58 +00:00
{
2020-05-23 22:24:01 +00:00
LOG_WARNING ( log , " Destination table {} has no common columns with block in buffer. Block of data is skipped. " , destination_id . getNameForLogs ( ) ) ;
2018-11-27 00:43:58 +00:00
}
2018-12-01 12:42:56 +00:00
else
2018-11-27 00:43:58 +00:00
{
2020-09-29 16:21:58 +00:00
destination - > read (
query_plan , columns_intersection , destination_metadata_snapshot , query_info ,
2021-04-10 23:33:54 +00:00
local_context , processed_stage , max_block_size , num_streams ) ;
2020-06-15 19:08:58 +00:00
2020-10-06 08:21:05 +00:00
if ( query_plan . isInitialized ( ) )
2018-12-01 12:42:56 +00:00
{
2020-10-06 08:21:05 +00:00
2021-02-05 11:41:44 +00:00
auto actions = addMissingDefaults (
query_plan . getCurrentDataStream ( ) . header ,
header_after_adding_defaults . getNamesAndTypesList ( ) ,
metadata_snapshot - > getColumns ( ) ,
2021-04-10 23:33:54 +00:00
local_context ) ;
2021-02-05 11:41:44 +00:00
auto adding_missed = std : : make_unique < ExpressionStep > (
2020-10-06 08:21:05 +00:00
query_plan . getCurrentDataStream ( ) ,
2021-03-04 17:38:12 +00:00
std : : move ( actions ) ) ;
2020-10-06 08:24:12 +00:00
2020-10-06 08:21:05 +00:00
adding_missed - > setStepDescription ( " Add columns missing in destination table " ) ;
query_plan . addStep ( std : : move ( adding_missed ) ) ;
2020-09-29 16:21:58 +00:00
2020-11-17 17:16:55 +00:00
auto actions_dag = ActionsDAG : : makeConvertingActions (
query_plan . getCurrentDataStream ( ) . header . getColumnsWithTypeAndName ( ) ,
header . getColumnsWithTypeAndName ( ) ,
ActionsDAG : : MatchColumnsMode : : Name ) ;
2021-03-04 17:38:12 +00:00
auto converting = std : : make_unique < ExpressionStep > ( query_plan . getCurrentDataStream ( ) , actions_dag ) ;
2020-09-29 16:21:58 +00:00
2020-10-06 08:21:05 +00:00
converting - > setStepDescription ( " Convert destination table columns to Buffer table structure " ) ;
query_plan . addStep ( std : : move ( converting ) ) ;
2020-10-06 08:24:12 +00:00
}
2018-11-27 00:43:58 +00:00
}
}
2018-12-01 12:42:56 +00:00
2020-09-29 16:21:58 +00:00
if ( query_plan . isInitialized ( ) )
{
StreamLocalLimits limits ;
SizeLimits leaf_limits ;
/// Add table lock for destination table.
auto adding_limits_and_quota = std : : make_unique < SettingQuotaAndLimitsStep > (
query_plan . getCurrentDataStream ( ) ,
destination ,
std : : move ( destination_lock ) ,
limits ,
leaf_limits ,
nullptr ,
nullptr ) ;
adding_limits_and_quota - > setStepDescription ( " Lock destination table for Buffer " ) ;
query_plan . addStep ( std : : move ( adding_limits_and_quota ) ) ;
}
2017-04-01 07:20:54 +00:00
}
2014-10-26 00:01:36 +00:00
2020-08-03 13:54:14 +00:00
Pipe pipe_from_buffers ;
{
Pipes pipes_from_buffers ;
pipes_from_buffers . reserve ( num_shards ) ;
for ( auto & buf : buffers )
pipes_from_buffers . emplace_back ( std : : make_shared < BufferSource > ( column_names , buf , * this , metadata_snapshot ) ) ;
pipe_from_buffers = Pipe : : unitePipes ( std : : move ( pipes_from_buffers ) ) ;
}
2014-10-26 00:01:36 +00:00
2020-09-29 16:21:58 +00:00
if ( pipe_from_buffers . empty ( ) )
return ;
QueryPlan buffers_plan ;
2020-06-11 12:18:45 +00:00
2017-04-01 07:20:54 +00:00
/** If the sources from the table were processed before some non-initial stage of query execution,
* then sources from the buffers must also be wrapped in the processing pipeline before the same stage .
*/
if ( processed_stage > QueryProcessingStage : : FetchColumns )
2019-03-14 08:05:18 +00:00
{
2020-09-29 16:21:58 +00:00
auto interpreter = InterpreterSelectQuery (
2021-04-10 23:33:54 +00:00
query_info . query , local_context , std : : move ( pipe_from_buffers ) ,
2020-09-29 16:21:58 +00:00
SelectQueryOptions ( processed_stage ) ) ;
interpreter . buildQueryPlan ( buffers_plan ) ;
}
else
{
if ( query_info . prewhere_info )
2019-03-14 08:05:18 +00:00
{
2021-06-25 14:49:28 +00:00
auto actions_settings = ExpressionActionsSettings : : fromContext ( local_context ) ;
2021-02-15 19:48:06 +00:00
if ( query_info . prewhere_info - > alias_actions )
2020-08-03 13:54:14 +00:00
{
2020-09-29 16:21:58 +00:00
pipe_from_buffers . addSimpleTransform ( [ & ] ( const Block & header )
{
2021-02-15 19:48:06 +00:00
return std : : make_shared < ExpressionTransform > (
2021-02-13 22:07:13 +00:00
header ,
2021-06-25 14:49:28 +00:00
std : : make_shared < ExpressionActions > ( query_info . prewhere_info - > alias_actions , actions_settings ) ) ;
2020-09-29 16:21:58 +00:00
} ) ;
2021-02-13 22:07:13 +00:00
}
2020-09-29 16:21:58 +00:00
2021-02-15 19:48:06 +00:00
if ( query_info . prewhere_info - > row_level_filter )
2020-09-29 16:21:58 +00:00
{
pipe_from_buffers . addSimpleTransform ( [ & ] ( const Block & header )
{
2021-02-15 19:48:06 +00:00
return std : : make_shared < FilterTransform > (
header ,
2021-06-25 14:49:28 +00:00
std : : make_shared < ExpressionActions > ( query_info . prewhere_info - > row_level_filter , actions_settings ) ,
2021-02-15 19:48:06 +00:00
query_info . prewhere_info - > row_level_column_name ,
false ) ;
2020-09-29 16:21:58 +00:00
} ) ;
}
2021-02-13 22:07:13 +00:00
pipe_from_buffers . addSimpleTransform ( [ & ] ( const Block & header )
{
return std : : make_shared < FilterTransform > (
header ,
2021-06-25 14:49:28 +00:00
std : : make_shared < ExpressionActions > ( query_info . prewhere_info - > prewhere_actions , actions_settings ) ,
2021-02-13 22:07:13 +00:00
query_info . prewhere_info - > prewhere_column_name ,
query_info . prewhere_info - > remove_prewhere_column ) ;
} ) ;
2019-03-14 08:05:18 +00:00
}
2020-09-29 16:21:58 +00:00
auto read_from_buffers = std : : make_unique < ReadFromPreparedSource > ( std : : move ( pipe_from_buffers ) ) ;
read_from_buffers - > setStepDescription ( " Read from buffers of Buffer table " ) ;
buffers_plan . addStep ( std : : move ( read_from_buffers ) ) ;
2019-03-14 08:05:18 +00:00
}
2020-09-29 16:21:58 +00:00
if ( ! query_plan . isInitialized ( ) )
{
query_plan = std : : move ( buffers_plan ) ;
return ;
}
auto result_header = buffers_plan . getCurrentDataStream ( ) . header ;
/// Convert structure from table to structure from buffer.
if ( ! blocksHaveEqualStructure ( query_plan . getCurrentDataStream ( ) . header , result_header ) )
{
2020-11-17 17:16:55 +00:00
auto convert_actions_dag = ActionsDAG : : makeConvertingActions (
query_plan . getCurrentDataStream ( ) . header . getColumnsWithTypeAndName ( ) ,
result_header . getColumnsWithTypeAndName ( ) ,
ActionsDAG : : MatchColumnsMode : : Name ) ;
auto converting = std : : make_unique < ExpressionStep > ( query_plan . getCurrentDataStream ( ) , convert_actions_dag ) ;
2020-09-29 16:21:58 +00:00
query_plan . addStep ( std : : move ( converting ) ) ;
2019-03-14 08:05:18 +00:00
}
2020-09-29 16:21:58 +00:00
DataStreams input_streams ;
input_streams . emplace_back ( query_plan . getCurrentDataStream ( ) ) ;
input_streams . emplace_back ( buffers_plan . getCurrentDataStream ( ) ) ;
std : : vector < std : : unique_ptr < QueryPlan > > plans ;
plans . emplace_back ( std : : make_unique < QueryPlan > ( std : : move ( query_plan ) ) ) ;
plans . emplace_back ( std : : make_unique < QueryPlan > ( std : : move ( buffers_plan ) ) ) ;
query_plan = QueryPlan ( ) ;
2021-03-25 09:57:14 +00:00
auto union_step = std : : make_unique < UnionStep > ( std : : move ( input_streams ) ) ;
2020-09-29 16:21:58 +00:00
union_step - > setStepDescription ( " Unite sources from Buffer table " ) ;
query_plan . unitePlans ( std : : move ( union_step ) , std : : move ( plans ) ) ;
2014-10-26 00:01:36 +00:00
}
2020-07-11 21:58:32 +00:00
static void appendBlock ( const Block & from , Block & to )
2014-10-27 04:18:13 +00:00
{
2017-04-01 07:20:54 +00:00
if ( ! to )
throw Exception ( " Cannot append to empty block " , ErrorCodes : : LOGICAL_ERROR ) ;
2021-10-22 16:25:48 +00:00
if ( to . rows ( ) )
assertBlocksHaveEqualStructure ( from , to , " Buffer " ) ;
2017-12-15 20:48:46 +00:00
2017-04-01 07:20:54 +00:00
from . checkNumberOfRows ( ) ;
to . checkNumberOfRows ( ) ;
size_t rows = from . rows ( ) ;
size_t bytes = from . bytes ( ) ;
CurrentMetrics : : add ( CurrentMetrics : : StorageBufferRows , rows ) ;
CurrentMetrics : : add ( CurrentMetrics : : StorageBufferBytes , bytes ) ;
size_t old_rows = to . rows ( ) ;
Fix NULL dereference in Buffer rollback
<details>
Stacktrace:
```
(gdb) bt
0 DB::appendBlock (from=..., to=...) at ../src/Storages/StorageBuffer.cpp:411
1 DB::BufferBlockOutputStream::insertIntoBuffer (this=<optimized out>, block=..., buffer=...) at ../src/Storages/StorageBuffer.cpp:541
2 0x000000000f2e9d5f in DB::BufferBlockOutputStream::write (this=<optimized out>, block=...) at ../src/Storages/StorageBuffer.cpp:508
3 0x000000000ec426c4 in DB::PushingToViewsBlockOutputStream::write (this=0x7f74660faa18, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:160
4 0x000000000ec49633 in DB::AddingDefaultBlockOutputStream::write (this=0x7f74660f1b18, block=...) at ../src/DataStreams/AddingDefaultBlockOutputStream.cpp:10
5 0x000000000ec483ac in DB::SquashingBlockOutputStream::finalize (this=0x7f74660f1d18) at ../src/DataStreams/SquashingBlockOutputStream.cpp:30
6 0x000000000ec48429 in DB::SquashingBlockOutputStream::writeSuffix (this=0x7f74660f1d18) at ../src/DataStreams/SquashingBlockOutputStream.cpp:50
7 0x000000000ec43f8f in DB::PushingToViewsBlockOutputStream::writeSuffix (this=0x7f74660f8258) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:280
8 0x000000000ec43f8f in DB::PushingToViewsBlockOutputStream::writeSuffix (this=0x7f74b7ddea18) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:280
9 0x000000000f2e6748 in DB::StorageBuffer::writeBlockToDestination (this=<optimized out>, block=..., table=...) at ../src/Storages/StorageBuffer.cpp:820
10 0x000000000f2ea00b in DB::BufferBlockOutputStream::write (this=0x7f7574e11748, block=...) at ../src/Storages/StorageBuffer.cpp:469
11 0x000000000ec426c4 in DB::PushingToViewsBlockOutputStream::write (this=0x7f7574ed3658, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:160
12 0x000000000ec49633 in DB::AddingDefaultBlockOutputStream::write (this=0x7f7574e84518, block=...) at ../src/DataStreams/AddingDefaultBlockOutputStream.cpp:10
13 0x000000000ec482f4 in DB::SquashingBlockOutputStream::write (this=0x7f7574e84718, block=...) at ../src/DataStreams/SquashingBlockOutputStream.cpp:17
14 0x000000000ebe8bce in DB::CountingBlockOutputStream::write (this=0x7f7574ed3720, block=...) at ../src/DataStreams/CountingBlockOutputStream.cpp:17
15 0x000000000f68e834 in DB::TCPHandler::receiveData (this=<optimized out>, scalar=<optimized out>) at ../src/Server/TCPHandler.cpp:1168
16 0x000000000f68737c in DB::TCPHandler::receivePacket (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:918
17 0x000000000f688d2f in DB::TCPHandler::readDataNext (this=0x7f7574f17000, poll_interval=@0x7f6f1dff1f78: 10000000, receive_timeout=@0x7f6f1dff1f68: 300) at ../src/Server/TCPHandler.cpp:460
18 0x000000000f6878be in DB::TCPHandler::readData (this=0x7f7574f17000, connection_settings=...) at ../src/Server/TCPHandler.cpp:490
19 DB::TCPHandler::processInsertQuery (this=0x7f7574f17000, connection_settings=...) at ../src/Server/TCPHandler.cpp:519
20 0x000000000f680ab9 in DB::TCPHandler::runImpl (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:268
21 0x000000000f68f297 in DB::TCPHandler::run (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:1414
22 0x0000000011fb81cf in Poco::Net::TCPServerConnection::start (this=0x0) at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
23 0x0000000011fb9be1 in Poco::Net::TCPServerDispatcher::run (this=0x7f752ab5fd00) at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:112
24 0x00000000120e71c9 in Poco::PooledThread::run (this=0x7f747d3a4580) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
25 0x00000000120e315a in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>) at ../contrib/poco/Foundation/src/Thread_POSIX.cpp:345
26 0x00007f760620aea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
27 0x00007f760613aeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
(gdb) p to.data.__end_-to.data.__begin_
$17 = 10
(gdb) p to.data.__begin_[9].column.px
$19 = (const DB::IColumn *) 0x7f7328392720
(gdb) p to.data.__begin_[8].column.px
$20 = (const DB::IColumn *) 0x0
(gdb) p to.data.__begin_[7].column.px
$21 = (const DB::IColumn *) 0x7f746f33d360
```
Line numbers matched with this version -
https://github.com/azat/ClickHouse/blob/f0e7cb16a729e2fbddd4f5804a48f7ea8928581b/src/Storages/StorageBuffer.cpp#L411
</details>
2020-12-21 20:48:18 +00:00
MutableColumnPtr last_col ;
2017-04-01 07:20:54 +00:00
try
{
2021-01-15 19:39:10 +00:00
MemoryTracker : : BlockerInThread temporarily_disable_memory_tracker ;
2020-12-31 19:10:39 +00:00
2021-10-22 16:25:48 +00:00
if ( to . rows ( ) = = 0 )
2017-04-01 07:20:54 +00:00
{
2021-10-22 16:25:48 +00:00
to = from ;
}
else
{
for ( size_t column_no = 0 , columns = to . columns ( ) ; column_no < columns ; + + column_no )
{
const IColumn & col_from = * from . getByPosition ( column_no ) . column . get ( ) ;
last_col = IColumn : : mutate ( std : : move ( to . getByPosition ( column_no ) . column ) ) ;
2017-04-01 07:20:54 +00:00
2021-10-22 16:25:48 +00:00
last_col - > insertRangeFrom ( col_from , 0 , rows ) ;
2017-04-01 07:20:54 +00:00
2021-10-22 16:25:48 +00:00
to . getByPosition ( column_no ) . column = std : : move ( last_col ) ;
}
2017-04-01 07:20:54 +00:00
}
}
catch ( . . . )
{
/// Rollback changes.
2020-12-31 19:10:39 +00:00
/// In case of rollback, it is better to ignore memory limits instead of abnormal server termination.
/// So ignore any memory limits, even global (since memory tracking has drift).
MemoryTracker : : BlockerInThread temporarily_ignore_any_memory_limits ( VariableContext : : Global ) ;
2017-04-01 07:20:54 +00:00
try
{
for ( size_t column_no = 0 , columns = to . columns ( ) ; column_no < columns ; + + column_no )
{
2017-12-15 20:48:46 +00:00
ColumnPtr & col_to = to . getByPosition ( column_no ) . column ;
Fix NULL dereference in Buffer rollback
<details>
Stacktrace:
```
(gdb) bt
0 DB::appendBlock (from=..., to=...) at ../src/Storages/StorageBuffer.cpp:411
1 DB::BufferBlockOutputStream::insertIntoBuffer (this=<optimized out>, block=..., buffer=...) at ../src/Storages/StorageBuffer.cpp:541
2 0x000000000f2e9d5f in DB::BufferBlockOutputStream::write (this=<optimized out>, block=...) at ../src/Storages/StorageBuffer.cpp:508
3 0x000000000ec426c4 in DB::PushingToViewsBlockOutputStream::write (this=0x7f74660faa18, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:160
4 0x000000000ec49633 in DB::AddingDefaultBlockOutputStream::write (this=0x7f74660f1b18, block=...) at ../src/DataStreams/AddingDefaultBlockOutputStream.cpp:10
5 0x000000000ec483ac in DB::SquashingBlockOutputStream::finalize (this=0x7f74660f1d18) at ../src/DataStreams/SquashingBlockOutputStream.cpp:30
6 0x000000000ec48429 in DB::SquashingBlockOutputStream::writeSuffix (this=0x7f74660f1d18) at ../src/DataStreams/SquashingBlockOutputStream.cpp:50
7 0x000000000ec43f8f in DB::PushingToViewsBlockOutputStream::writeSuffix (this=0x7f74660f8258) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:280
8 0x000000000ec43f8f in DB::PushingToViewsBlockOutputStream::writeSuffix (this=0x7f74b7ddea18) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:280
9 0x000000000f2e6748 in DB::StorageBuffer::writeBlockToDestination (this=<optimized out>, block=..., table=...) at ../src/Storages/StorageBuffer.cpp:820
10 0x000000000f2ea00b in DB::BufferBlockOutputStream::write (this=0x7f7574e11748, block=...) at ../src/Storages/StorageBuffer.cpp:469
11 0x000000000ec426c4 in DB::PushingToViewsBlockOutputStream::write (this=0x7f7574ed3658, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:160
12 0x000000000ec49633 in DB::AddingDefaultBlockOutputStream::write (this=0x7f7574e84518, block=...) at ../src/DataStreams/AddingDefaultBlockOutputStream.cpp:10
13 0x000000000ec482f4 in DB::SquashingBlockOutputStream::write (this=0x7f7574e84718, block=...) at ../src/DataStreams/SquashingBlockOutputStream.cpp:17
14 0x000000000ebe8bce in DB::CountingBlockOutputStream::write (this=0x7f7574ed3720, block=...) at ../src/DataStreams/CountingBlockOutputStream.cpp:17
15 0x000000000f68e834 in DB::TCPHandler::receiveData (this=<optimized out>, scalar=<optimized out>) at ../src/Server/TCPHandler.cpp:1168
16 0x000000000f68737c in DB::TCPHandler::receivePacket (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:918
17 0x000000000f688d2f in DB::TCPHandler::readDataNext (this=0x7f7574f17000, poll_interval=@0x7f6f1dff1f78: 10000000, receive_timeout=@0x7f6f1dff1f68: 300) at ../src/Server/TCPHandler.cpp:460
18 0x000000000f6878be in DB::TCPHandler::readData (this=0x7f7574f17000, connection_settings=...) at ../src/Server/TCPHandler.cpp:490
19 DB::TCPHandler::processInsertQuery (this=0x7f7574f17000, connection_settings=...) at ../src/Server/TCPHandler.cpp:519
20 0x000000000f680ab9 in DB::TCPHandler::runImpl (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:268
21 0x000000000f68f297 in DB::TCPHandler::run (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:1414
22 0x0000000011fb81cf in Poco::Net::TCPServerConnection::start (this=0x0) at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
23 0x0000000011fb9be1 in Poco::Net::TCPServerDispatcher::run (this=0x7f752ab5fd00) at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:112
24 0x00000000120e71c9 in Poco::PooledThread::run (this=0x7f747d3a4580) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
25 0x00000000120e315a in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>) at ../contrib/poco/Foundation/src/Thread_POSIX.cpp:345
26 0x00007f760620aea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
27 0x00007f760613aeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
(gdb) p to.data.__end_-to.data.__begin_
$17 = 10
(gdb) p to.data.__begin_[9].column.px
$19 = (const DB::IColumn *) 0x7f7328392720
(gdb) p to.data.__begin_[8].column.px
$20 = (const DB::IColumn *) 0x0
(gdb) p to.data.__begin_[7].column.px
$21 = (const DB::IColumn *) 0x7f746f33d360
```
Line numbers matched with this version -
https://github.com/azat/ClickHouse/blob/f0e7cb16a729e2fbddd4f5804a48f7ea8928581b/src/Storages/StorageBuffer.cpp#L411
</details>
2020-12-21 20:48:18 +00:00
/// If there is no column, then the exception was thrown in the middle of append, in the insertRangeFrom()
if ( ! col_to )
2020-12-21 22:21:54 +00:00
{
Fix NULL dereference in Buffer rollback
<details>
Stacktrace:
```
(gdb) bt
0 DB::appendBlock (from=..., to=...) at ../src/Storages/StorageBuffer.cpp:411
1 DB::BufferBlockOutputStream::insertIntoBuffer (this=<optimized out>, block=..., buffer=...) at ../src/Storages/StorageBuffer.cpp:541
2 0x000000000f2e9d5f in DB::BufferBlockOutputStream::write (this=<optimized out>, block=...) at ../src/Storages/StorageBuffer.cpp:508
3 0x000000000ec426c4 in DB::PushingToViewsBlockOutputStream::write (this=0x7f74660faa18, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:160
4 0x000000000ec49633 in DB::AddingDefaultBlockOutputStream::write (this=0x7f74660f1b18, block=...) at ../src/DataStreams/AddingDefaultBlockOutputStream.cpp:10
5 0x000000000ec483ac in DB::SquashingBlockOutputStream::finalize (this=0x7f74660f1d18) at ../src/DataStreams/SquashingBlockOutputStream.cpp:30
6 0x000000000ec48429 in DB::SquashingBlockOutputStream::writeSuffix (this=0x7f74660f1d18) at ../src/DataStreams/SquashingBlockOutputStream.cpp:50
7 0x000000000ec43f8f in DB::PushingToViewsBlockOutputStream::writeSuffix (this=0x7f74660f8258) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:280
8 0x000000000ec43f8f in DB::PushingToViewsBlockOutputStream::writeSuffix (this=0x7f74b7ddea18) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:280
9 0x000000000f2e6748 in DB::StorageBuffer::writeBlockToDestination (this=<optimized out>, block=..., table=...) at ../src/Storages/StorageBuffer.cpp:820
10 0x000000000f2ea00b in DB::BufferBlockOutputStream::write (this=0x7f7574e11748, block=...) at ../src/Storages/StorageBuffer.cpp:469
11 0x000000000ec426c4 in DB::PushingToViewsBlockOutputStream::write (this=0x7f7574ed3658, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:160
12 0x000000000ec49633 in DB::AddingDefaultBlockOutputStream::write (this=0x7f7574e84518, block=...) at ../src/DataStreams/AddingDefaultBlockOutputStream.cpp:10
13 0x000000000ec482f4 in DB::SquashingBlockOutputStream::write (this=0x7f7574e84718, block=...) at ../src/DataStreams/SquashingBlockOutputStream.cpp:17
14 0x000000000ebe8bce in DB::CountingBlockOutputStream::write (this=0x7f7574ed3720, block=...) at ../src/DataStreams/CountingBlockOutputStream.cpp:17
15 0x000000000f68e834 in DB::TCPHandler::receiveData (this=<optimized out>, scalar=<optimized out>) at ../src/Server/TCPHandler.cpp:1168
16 0x000000000f68737c in DB::TCPHandler::receivePacket (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:918
17 0x000000000f688d2f in DB::TCPHandler::readDataNext (this=0x7f7574f17000, poll_interval=@0x7f6f1dff1f78: 10000000, receive_timeout=@0x7f6f1dff1f68: 300) at ../src/Server/TCPHandler.cpp:460
18 0x000000000f6878be in DB::TCPHandler::readData (this=0x7f7574f17000, connection_settings=...) at ../src/Server/TCPHandler.cpp:490
19 DB::TCPHandler::processInsertQuery (this=0x7f7574f17000, connection_settings=...) at ../src/Server/TCPHandler.cpp:519
20 0x000000000f680ab9 in DB::TCPHandler::runImpl (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:268
21 0x000000000f68f297 in DB::TCPHandler::run (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:1414
22 0x0000000011fb81cf in Poco::Net::TCPServerConnection::start (this=0x0) at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
23 0x0000000011fb9be1 in Poco::Net::TCPServerDispatcher::run (this=0x7f752ab5fd00) at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:112
24 0x00000000120e71c9 in Poco::PooledThread::run (this=0x7f747d3a4580) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
25 0x00000000120e315a in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>) at ../contrib/poco/Foundation/src/Thread_POSIX.cpp:345
26 0x00007f760620aea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
27 0x00007f760613aeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
(gdb) p to.data.__end_-to.data.__begin_
$17 = 10
(gdb) p to.data.__begin_[9].column.px
$19 = (const DB::IColumn *) 0x7f7328392720
(gdb) p to.data.__begin_[8].column.px
$20 = (const DB::IColumn *) 0x0
(gdb) p to.data.__begin_[7].column.px
$21 = (const DB::IColumn *) 0x7f746f33d360
```
Line numbers matched with this version -
https://github.com/azat/ClickHouse/blob/f0e7cb16a729e2fbddd4f5804a48f7ea8928581b/src/Storages/StorageBuffer.cpp#L411
</details>
2020-12-21 20:48:18 +00:00
col_to = std : : move ( last_col ) ;
2020-12-21 22:21:54 +00:00
/// Suppress clang-tidy [bugprone-use-after-move]
last_col = { } ;
}
Fix NULL dereference in Buffer rollback
<details>
Stacktrace:
```
(gdb) bt
0 DB::appendBlock (from=..., to=...) at ../src/Storages/StorageBuffer.cpp:411
1 DB::BufferBlockOutputStream::insertIntoBuffer (this=<optimized out>, block=..., buffer=...) at ../src/Storages/StorageBuffer.cpp:541
2 0x000000000f2e9d5f in DB::BufferBlockOutputStream::write (this=<optimized out>, block=...) at ../src/Storages/StorageBuffer.cpp:508
3 0x000000000ec426c4 in DB::PushingToViewsBlockOutputStream::write (this=0x7f74660faa18, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:160
4 0x000000000ec49633 in DB::AddingDefaultBlockOutputStream::write (this=0x7f74660f1b18, block=...) at ../src/DataStreams/AddingDefaultBlockOutputStream.cpp:10
5 0x000000000ec483ac in DB::SquashingBlockOutputStream::finalize (this=0x7f74660f1d18) at ../src/DataStreams/SquashingBlockOutputStream.cpp:30
6 0x000000000ec48429 in DB::SquashingBlockOutputStream::writeSuffix (this=0x7f74660f1d18) at ../src/DataStreams/SquashingBlockOutputStream.cpp:50
7 0x000000000ec43f8f in DB::PushingToViewsBlockOutputStream::writeSuffix (this=0x7f74660f8258) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:280
8 0x000000000ec43f8f in DB::PushingToViewsBlockOutputStream::writeSuffix (this=0x7f74b7ddea18) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:280
9 0x000000000f2e6748 in DB::StorageBuffer::writeBlockToDestination (this=<optimized out>, block=..., table=...) at ../src/Storages/StorageBuffer.cpp:820
10 0x000000000f2ea00b in DB::BufferBlockOutputStream::write (this=0x7f7574e11748, block=...) at ../src/Storages/StorageBuffer.cpp:469
11 0x000000000ec426c4 in DB::PushingToViewsBlockOutputStream::write (this=0x7f7574ed3658, block=...) at ../src/DataStreams/PushingToViewsBlockOutputStream.cpp:160
12 0x000000000ec49633 in DB::AddingDefaultBlockOutputStream::write (this=0x7f7574e84518, block=...) at ../src/DataStreams/AddingDefaultBlockOutputStream.cpp:10
13 0x000000000ec482f4 in DB::SquashingBlockOutputStream::write (this=0x7f7574e84718, block=...) at ../src/DataStreams/SquashingBlockOutputStream.cpp:17
14 0x000000000ebe8bce in DB::CountingBlockOutputStream::write (this=0x7f7574ed3720, block=...) at ../src/DataStreams/CountingBlockOutputStream.cpp:17
15 0x000000000f68e834 in DB::TCPHandler::receiveData (this=<optimized out>, scalar=<optimized out>) at ../src/Server/TCPHandler.cpp:1168
16 0x000000000f68737c in DB::TCPHandler::receivePacket (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:918
17 0x000000000f688d2f in DB::TCPHandler::readDataNext (this=0x7f7574f17000, poll_interval=@0x7f6f1dff1f78: 10000000, receive_timeout=@0x7f6f1dff1f68: 300) at ../src/Server/TCPHandler.cpp:460
18 0x000000000f6878be in DB::TCPHandler::readData (this=0x7f7574f17000, connection_settings=...) at ../src/Server/TCPHandler.cpp:490
19 DB::TCPHandler::processInsertQuery (this=0x7f7574f17000, connection_settings=...) at ../src/Server/TCPHandler.cpp:519
20 0x000000000f680ab9 in DB::TCPHandler::runImpl (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:268
21 0x000000000f68f297 in DB::TCPHandler::run (this=0x7f7574f17000) at ../src/Server/TCPHandler.cpp:1414
22 0x0000000011fb81cf in Poco::Net::TCPServerConnection::start (this=0x0) at ../contrib/poco/Net/src/TCPServerConnection.cpp:43
23 0x0000000011fb9be1 in Poco::Net::TCPServerDispatcher::run (this=0x7f752ab5fd00) at ../contrib/poco/Net/src/TCPServerDispatcher.cpp:112
24 0x00000000120e71c9 in Poco::PooledThread::run (this=0x7f747d3a4580) at ../contrib/poco/Foundation/src/ThreadPool.cpp:199
25 0x00000000120e315a in Poco::ThreadImpl::runnableEntry (pThread=<optimized out>) at ../contrib/poco/Foundation/src/Thread_POSIX.cpp:345
26 0x00007f760620aea7 in start_thread (arg=<optimized out>) at pthread_create.c:477
27 0x00007f760613aeaf in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95
(gdb) p to.data.__end_-to.data.__begin_
$17 = 10
(gdb) p to.data.__begin_[9].column.px
$19 = (const DB::IColumn *) 0x7f7328392720
(gdb) p to.data.__begin_[8].column.px
$20 = (const DB::IColumn *) 0x0
(gdb) p to.data.__begin_[7].column.px
$21 = (const DB::IColumn *) 0x7f746f33d360
```
Line numbers matched with this version -
https://github.com/azat/ClickHouse/blob/f0e7cb16a729e2fbddd4f5804a48f7ea8928581b/src/Storages/StorageBuffer.cpp#L411
</details>
2020-12-21 20:48:18 +00:00
/// But if there is still nothing, abort
if ( ! col_to )
throw Exception ( " No column to rollback " , ErrorCodes : : LOGICAL_ERROR ) ;
2017-04-01 07:20:54 +00:00
if ( col_to - > size ( ) ! = old_rows )
2020-05-14 08:30:18 +00:00
col_to = col_to - > cut ( 0 , old_rows ) ;
2017-04-01 07:20:54 +00:00
}
}
catch ( . . . )
{
/// In case when we cannot rollback, do not leave incorrect state in memory.
std : : terminate ( ) ;
}
throw ;
}
2014-10-27 04:18:13 +00:00
}
2021-07-23 14:25:35 +00:00
class BufferSink : public SinkToStorage
2014-10-26 00:01:36 +00:00
{
public :
2021-07-23 14:25:35 +00:00
explicit BufferSink (
2020-06-16 15:51:29 +00:00
StorageBuffer & storage_ ,
const StorageMetadataPtr & metadata_snapshot_ )
2021-07-23 14:25:35 +00:00
: SinkToStorage ( metadata_snapshot_ - > getSampleBlock ( ) )
, storage ( storage_ )
2020-06-16 15:51:29 +00:00
, metadata_snapshot ( metadata_snapshot_ )
2017-04-01 07:20:54 +00:00
{
2018-11-19 06:14:36 +00:00
// Check table structure.
2021-09-03 17:29:36 +00:00
metadata_snapshot - > check ( getHeader ( ) , true ) ;
2021-07-23 14:25:35 +00:00
}
2018-11-19 06:14:36 +00:00
2021-07-23 19:33:59 +00:00
String getName ( ) const override { return " BufferSink " ; }
2021-07-23 14:25:35 +00:00
void consume ( Chunk chunk ) override
{
size_t rows = chunk . getNumRows ( ) ;
2017-04-01 07:20:54 +00:00
if ( ! rows )
return ;
2021-09-03 17:29:36 +00:00
auto block = getHeader ( ) . cloneWithColumns ( chunk . getColumns ( ) ) ;
2021-07-23 14:25:35 +00:00
2017-04-01 07:20:54 +00:00
StoragePtr destination ;
2020-02-17 19:28:25 +00:00
if ( storage . destination_id )
2017-04-01 07:20:54 +00:00
{
2021-04-10 23:33:54 +00:00
destination = DatabaseCatalog : : instance ( ) . tryGetTable ( storage . destination_id , storage . getContext ( ) ) ;
2018-11-19 06:14:36 +00:00
if ( destination . get ( ) = = & storage )
throw Exception ( " Destination table is myself. Write will cause infinite loop. " , ErrorCodes : : INFINITE_LOOP ) ;
2017-04-01 07:20:54 +00:00
}
size_t bytes = block . bytes ( ) ;
2021-05-12 18:12:36 +00:00
storage . lifetime_writes . rows + = rows ;
storage . lifetime_writes . bytes + = bytes ;
2020-07-11 21:58:32 +00:00
2017-04-01 07:20:54 +00:00
/// If the block already exceeds the maximum limit, then we skip the buffer.
if ( rows > storage . max_thresholds . rows | | bytes > storage . max_thresholds . bytes )
{
2020-02-17 19:28:25 +00:00
if ( storage . destination_id )
2017-04-01 07:20:54 +00:00
{
2021-04-15 18:00:16 +00:00
LOG_DEBUG ( storage . log , " Writing block with {} rows, {} bytes directly. " , rows , bytes ) ;
2017-04-01 07:20:54 +00:00
storage . writeBlockToDestination ( block , destination ) ;
2019-10-30 19:58:19 +00:00
}
2017-04-01 07:20:54 +00:00
return ;
}
/// We distribute the load on the shards by the stream number.
2020-02-02 02:35:47 +00:00
const auto start_shard_num = getThreadId ( ) % storage . num_shards ;
2017-04-01 07:20:54 +00:00
/// We loop through the buffers, trying to lock mutex. No more than one lap.
auto shard_num = start_shard_num ;
StorageBuffer : : Buffer * least_busy_buffer = nullptr ;
std : : unique_lock < std : : mutex > least_busy_lock ;
size_t least_busy_shard_rows = 0 ;
for ( size_t try_no = 0 ; try_no < storage . num_shards ; + + try_no )
{
2021-04-06 18:12:40 +00:00
std : : unique_lock lock ( storage . buffers [ shard_num ] . tryLock ( ) ) ;
2017-04-01 07:20:54 +00:00
if ( lock . owns_lock ( ) )
{
size_t num_rows = storage . buffers [ shard_num ] . data . rows ( ) ;
if ( ! least_busy_buffer | | num_rows < least_busy_shard_rows )
{
least_busy_buffer = & storage . buffers [ shard_num ] ;
least_busy_lock = std : : move ( lock ) ;
least_busy_shard_rows = num_rows ;
}
}
shard_num = ( shard_num + 1 ) % storage . num_shards ;
}
/// If you still can not lock anything at once, then we'll wait on mutex.
if ( ! least_busy_buffer )
2018-08-24 14:51:34 +00:00
{
least_busy_buffer = & storage . buffers [ start_shard_num ] ;
2021-04-06 18:12:40 +00:00
least_busy_lock = least_busy_buffer - > lockForWriting ( ) ;
2018-08-24 14:51:34 +00:00
}
insertIntoBuffer ( block , * least_busy_buffer ) ;
2020-04-16 07:48:49 +00:00
least_busy_lock . unlock ( ) ;
storage . reschedule ( ) ;
2017-04-01 07:20:54 +00:00
}
2014-10-26 00:01:36 +00:00
private :
2017-04-01 07:20:54 +00:00
StorageBuffer & storage ;
2020-06-16 15:51:29 +00:00
StorageMetadataPtr metadata_snapshot ;
2017-04-01 07:20:54 +00:00
2018-08-24 14:51:34 +00:00
void insertIntoBuffer ( const Block & block , StorageBuffer : : Buffer & buffer )
2017-04-01 07:20:54 +00:00
{
2017-08-04 14:00:26 +00:00
time_t current_time = time ( nullptr ) ;
2017-04-01 07:20:54 +00:00
/// Sort the columns in the block. This is necessary to make it easier to concatenate the blocks later.
Block sorted_block = block . sortColumns ( ) ;
if ( ! buffer . data )
{
buffer . data = sorted_block . cloneEmpty ( ) ;
2021-05-12 18:12:36 +00:00
storage . total_writes . rows + = buffer . data . rows ( ) ;
storage . total_writes . bytes + = buffer . data . allocatedBytes ( ) ;
2017-04-01 07:20:54 +00:00
}
2021-04-12 06:04:38 +00:00
else if ( storage . checkThresholds ( buffer , /* direct= */ true , current_time , sorted_block . rows ( ) , sorted_block . bytes ( ) ) )
2017-04-01 07:20:54 +00:00
{
/** If, after inserting the buffer, the constraints are exceeded, then we will reset the buffer.
* This also protects against unlimited consumption of RAM , since if it is impossible to write to the table ,
* an exception will be thrown , and new data will not be added to the buffer .
*/
2019-12-22 22:35:53 +00:00
storage . flushBuffer ( buffer , false /* check_thresholds */ , true /* locked */ ) ;
2017-04-01 07:20:54 +00:00
}
if ( ! buffer . first_write_time )
buffer . first_write_time = current_time ;
2021-05-12 18:12:36 +00:00
size_t old_rows = buffer . data . rows ( ) ;
size_t old_bytes = buffer . data . allocatedBytes ( ) ;
2020-07-11 21:58:32 +00:00
appendBlock ( sorted_block , buffer . data ) ;
2021-05-12 18:12:36 +00:00
storage . total_writes . rows + = ( buffer . data . rows ( ) - old_rows ) ;
storage . total_writes . bytes + = ( buffer . data . allocatedBytes ( ) - old_bytes ) ;
2017-04-01 07:20:54 +00:00
}
2014-10-26 00:01:36 +00:00
} ;
2021-07-23 14:25:35 +00:00
/// Creates the sink that INSERT queries into this Buffer table write to.
/// The query and the context are not used.
SinkToStoragePtr StorageBuffer::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/)
{
    SinkToStoragePtr sink = std::make_shared<BufferSink>(*this, metadata_snapshot);
    return sink;
}
2020-06-17 09:38:47 +00:00
bool StorageBuffer : : mayBenefitFromIndexForIn (
2021-04-10 23:33:54 +00:00
const ASTPtr & left_in_operand , ContextPtr query_context , const StorageMetadataPtr & /*metadata_snapshot*/ ) const
2018-03-16 09:00:04 +00:00
{
2020-02-17 19:28:25 +00:00
if ( ! destination_id )
2018-03-16 09:00:04 +00:00
return false ;
2020-05-28 23:01:18 +00:00
auto destination = DatabaseCatalog : : instance ( ) . getTable ( destination_id , query_context ) ;
2018-03-16 09:00:04 +00:00
if ( destination . get ( ) = = this )
throw Exception ( " Destination table is myself. Read will cause infinite loop. " , ErrorCodes : : INFINITE_LOOP ) ;
2020-06-17 09:38:47 +00:00
return destination - > mayBenefitFromIndexForIn ( left_in_operand , query_context , destination - > getInMemoryMetadataPtr ( ) ) ;
2018-03-16 09:00:04 +00:00
}
2017-06-06 17:06:14 +00:00
void StorageBuffer : : startup ( )
{
2021-04-10 23:33:54 +00:00
if ( getContext ( ) - > getSettingsRef ( ) . readonly )
2018-02-01 13:52:29 +00:00
{
2021-01-27 18:05:18 +00:00
LOG_WARNING ( log , " Storage {} is run with readonly settings, it will not be able to insert data. Set appropriate buffer_profile to fix this. " , getName ( ) ) ;
2018-02-01 13:52:29 +00:00
}
2020-04-16 07:48:49 +00:00
flush_handle - > activateAndSchedule ( ) ;
2017-06-06 17:06:14 +00:00
}
2021-05-12 18:14:00 +00:00
void StorageBuffer : : flush ( )
2014-10-26 00:01:36 +00:00
{
2020-05-02 16:54:20 +00:00
if ( ! flush_handle )
return ;
2020-04-16 07:48:49 +00:00
flush_handle - > deactivate ( ) ;
2017-04-01 07:20:54 +00:00
try
{
2021-04-10 23:33:54 +00:00
optimize ( nullptr /*query*/ , getInMemoryMetadataPtr ( ) , { } /*partition*/ , false /*final*/ , false /*deduplicate*/ , { } , getContext ( ) ) ;
2017-04-01 07:20:54 +00:00
}
catch ( . . . )
{
tryLogCurrentException ( __PRETTY_FUNCTION__ ) ;
}
2014-10-26 00:21:06 +00:00
}
2016-11-18 09:02:49 +00:00
/** NOTE If you do OPTIMIZE after insertion,
* it does not guarantee , that all data will be in destination table at the time of next SELECT just after OPTIMIZE .
*
* Because in case if there was already running flushBuffer method ,
* then call to flushBuffer inside OPTIMIZE will see empty buffer and return quickly ,
* but at the same time , the already running flushBuffer method possibly is not finished ,
* so next SELECT will observe missing data .
*
* This kind of race condition make very hard to implement proper tests .
*/
2020-06-17 13:39:26 +00:00
bool StorageBuffer : : optimize (
const ASTPtr & /*query*/ ,
const StorageMetadataPtr & /*metadata_snapshot*/ ,
const ASTPtr & partition ,
bool final ,
bool deduplicate ,
2020-12-01 09:10:12 +00:00
const Names & /* deduplicate_by_columns */ ,
2021-04-10 23:33:54 +00:00
ContextPtr /*context*/ )
2014-10-26 00:21:06 +00:00
{
2017-09-06 20:34:26 +00:00
if ( partition )
2017-04-01 07:20:54 +00:00
throw Exception ( " Partition cannot be specified when optimizing table of type Buffer " , ErrorCodes : : NOT_IMPLEMENTED ) ;
2016-05-16 18:43:38 +00:00
2017-04-01 07:20:54 +00:00
if ( final )
throw Exception ( " FINAL cannot be specified when optimizing table of type Buffer " , ErrorCodes : : NOT_IMPLEMENTED ) ;
2014-10-26 00:21:06 +00:00
2017-04-13 13:13:08 +00:00
if ( deduplicate )
throw Exception ( " DEDUPLICATE cannot be specified when optimizing table of type Buffer " , ErrorCodes : : NOT_IMPLEMENTED ) ;
2020-09-23 12:19:36 +00:00
flushAllBuffers ( false , true ) ;
2017-04-01 07:20:54 +00:00
return true ;
2014-10-26 00:01:36 +00:00
}
2021-01-27 18:05:18 +00:00
bool StorageBuffer : : supportsPrewhere ( ) const
{
if ( ! destination_id )
return false ;
2021-04-10 23:33:54 +00:00
auto dest = DatabaseCatalog : : instance ( ) . tryGetTable ( destination_id , getContext ( ) ) ;
2021-01-27 18:05:18 +00:00
if ( dest & & dest . get ( ) ! = this )
return dest - > supportsPrewhere ( ) ;
return false ;
}
2014-10-26 00:01:36 +00:00
2021-04-12 06:04:38 +00:00
bool StorageBuffer : : checkThresholds ( const Buffer & buffer , bool direct , time_t current_time , size_t additional_rows , size_t additional_bytes ) const
2014-10-26 00:01:36 +00:00
{
2017-04-01 07:20:54 +00:00
time_t time_passed = 0 ;
if ( buffer . first_write_time )
time_passed = current_time - buffer . first_write_time ;
2014-10-26 00:01:36 +00:00
2017-04-01 07:20:54 +00:00
size_t rows = buffer . data . rows ( ) + additional_rows ;
size_t bytes = buffer . data . bytes ( ) + additional_bytes ;
2014-10-26 00:01:36 +00:00
2021-04-12 06:04:38 +00:00
return checkThresholdsImpl ( direct , rows , bytes , time_passed ) ;
2015-12-09 06:55:49 +00:00
}
2021-04-12 06:04:38 +00:00
bool StorageBuffer : : checkThresholdsImpl ( bool direct , size_t rows , size_t bytes , time_t time_passed ) const
2015-12-09 06:55:49 +00:00
{
2017-04-01 07:20:54 +00:00
if ( time_passed > min_thresholds . time & & rows > min_thresholds . rows & & bytes > min_thresholds . bytes )
{
ProfileEvents : : increment ( ProfileEvents : : StorageBufferPassedAllMinThresholds ) ;
return true ;
}
if ( time_passed > max_thresholds . time )
{
ProfileEvents : : increment ( ProfileEvents : : StorageBufferPassedTimeMaxThreshold ) ;
return true ;
}
if ( rows > max_thresholds . rows )
{
ProfileEvents : : increment ( ProfileEvents : : StorageBufferPassedRowsMaxThreshold ) ;
return true ;
}
if ( bytes > max_thresholds . bytes )
{
ProfileEvents : : increment ( ProfileEvents : : StorageBufferPassedBytesMaxThreshold ) ;
return true ;
}
2021-04-12 06:04:38 +00:00
if ( ! direct )
{
if ( flush_thresholds . time & & time_passed > flush_thresholds . time )
{
ProfileEvents : : increment ( ProfileEvents : : StorageBufferPassedTimeFlushThreshold ) ;
return true ;
}
if ( flush_thresholds . rows & & rows > flush_thresholds . rows )
{
ProfileEvents : : increment ( ProfileEvents : : StorageBufferPassedRowsFlushThreshold ) ;
return true ;
}
if ( flush_thresholds . bytes & & bytes > flush_thresholds . bytes )
{
ProfileEvents : : increment ( ProfileEvents : : StorageBufferPassedBytesFlushThreshold ) ;
return true ;
}
}
2017-04-01 07:20:54 +00:00
return false ;
2014-10-26 00:01:36 +00:00
}
2020-09-23 12:19:36 +00:00
void StorageBuffer : : flushAllBuffers ( bool check_thresholds , bool reset_blocks_structure )
2014-12-03 13:28:17 +00:00
{
2017-04-01 07:20:54 +00:00
for ( auto & buf : buffers )
2020-09-23 12:19:36 +00:00
flushBuffer ( buf , check_thresholds , false , reset_blocks_structure ) ;
2014-12-03 13:28:17 +00:00
}
2020-09-23 12:19:36 +00:00
void StorageBuffer : : flushBuffer ( Buffer & buffer , bool check_thresholds , bool locked , bool reset_block_structure )
2014-10-26 00:01:36 +00:00
{
2017-04-01 07:20:54 +00:00
Block block_to_write ;
2017-08-04 14:00:26 +00:00
time_t current_time = time ( nullptr ) ;
2017-04-01 07:20:54 +00:00
size_t rows = 0 ;
size_t bytes = 0 ;
time_t time_passed = 0 ;
2021-04-06 18:12:40 +00:00
std : : optional < std : : unique_lock < std : : mutex > > lock ;
2018-08-24 14:51:34 +00:00
if ( ! locked )
2021-04-06 18:12:40 +00:00
lock . emplace ( buffer . lockForReading ( ) ) ;
2017-04-01 07:20:54 +00:00
2021-10-22 16:25:48 +00:00
block_to_write = buffer . data . cloneEmpty ( ) ;
2017-09-07 21:04:48 +00:00
rows = buffer . data . rows ( ) ;
bytes = buffer . data . bytes ( ) ;
if ( buffer . first_write_time )
time_passed = current_time - buffer . first_write_time ;
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
if ( check_thresholds )
{
2021-04-12 06:04:38 +00:00
if ( ! checkThresholdsImpl ( /* direct= */ false , rows , bytes , time_passed ) )
2017-09-07 21:04:48 +00:00
return ;
}
else
{
if ( rows = = 0 )
return ;
}
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
buffer . data . swap ( block_to_write ) ;
buffer . first_write_time = 0 ;
2017-04-01 07:20:54 +00:00
2021-05-12 18:12:36 +00:00
size_t block_rows = block_to_write . rows ( ) ;
size_t block_bytes = block_to_write . bytes ( ) ;
2021-05-24 13:55:05 +00:00
size_t block_allocated_bytes_delta = block_to_write . allocatedBytes ( ) - buffer . data . allocatedBytes ( ) ;
2021-05-12 18:12:36 +00:00
CurrentMetrics : : sub ( CurrentMetrics : : StorageBufferRows , block_rows ) ;
CurrentMetrics : : sub ( CurrentMetrics : : StorageBufferBytes , block_bytes ) ;
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
ProfileEvents : : increment ( ProfileEvents : : StorageBufferFlush ) ;
2017-04-01 07:20:54 +00:00
2020-02-17 19:28:25 +00:00
if ( ! destination_id )
2021-01-21 18:11:39 +00:00
{
2021-05-12 18:12:36 +00:00
total_writes . rows - = block_rows ;
2021-05-24 13:55:05 +00:00
total_writes . bytes - = block_allocated_bytes_delta ;
2021-05-12 18:12:36 +00:00
2021-04-15 18:00:16 +00:00
LOG_DEBUG ( log , " Flushing buffer with {} rows (discarded), {} bytes, age {} seconds {}. " , rows , bytes , time_passed , ( check_thresholds ? " (bg) " : " (direct) " ) ) ;
2017-09-07 21:04:48 +00:00
return ;
2021-01-21 18:11:39 +00:00
}
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
/** For simplicity, buffer is locked during write.
2017-11-15 19:47:49 +00:00
* We could unlock buffer temporary , but it would lead to too many difficulties :
2017-09-07 21:04:48 +00:00
* - data , that is written , will not be visible for SELECTs ;
* - new data could be appended to buffer , and in case of exception , we must merge it with old data , that has not been written ;
* - this could lead to infinite memory growth .
*/
2021-01-21 18:11:39 +00:00
Stopwatch watch ;
2017-09-07 21:04:48 +00:00
try
{
2021-04-10 23:33:54 +00:00
writeBlockToDestination ( block_to_write , DatabaseCatalog : : instance ( ) . tryGetTable ( destination_id , getContext ( ) ) ) ;
2020-09-23 12:19:36 +00:00
if ( reset_block_structure )
buffer . data . clear ( ) ;
2017-09-07 21:04:48 +00:00
}
catch ( . . . )
{
ProfileEvents : : increment ( ProfileEvents : : StorageBufferErrorOnFlush ) ;
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
/// Return the block to its place in the buffer.
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
CurrentMetrics : : add ( CurrentMetrics : : StorageBufferRows , block_to_write . rows ( ) ) ;
CurrentMetrics : : add ( CurrentMetrics : : StorageBufferBytes , block_to_write . bytes ( ) ) ;
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
buffer . data . swap ( block_to_write ) ;
2017-04-01 07:20:54 +00:00
2021-05-08 14:43:03 +00:00
if ( ! buffer . first_write_time ) // -V547
2017-09-07 21:04:48 +00:00
buffer . first_write_time = current_time ;
2017-04-01 07:20:54 +00:00
2017-09-07 21:04:48 +00:00
/// After a while, the next write attempt will happen.
throw ;
2017-04-01 07:20:54 +00:00
}
2021-01-21 18:11:39 +00:00
2021-05-12 18:12:36 +00:00
total_writes . rows - = block_rows ;
2021-05-24 13:55:05 +00:00
total_writes . bytes - = block_allocated_bytes_delta ;
2021-05-12 18:12:36 +00:00
2021-01-21 18:11:39 +00:00
UInt64 milliseconds = watch . elapsedMilliseconds ( ) ;
2021-04-15 18:00:16 +00:00
LOG_DEBUG ( log , " Flushing buffer with {} rows, {} bytes, age {} seconds, took {} ms {}. " , rows , bytes , time_passed , milliseconds , ( check_thresholds ? " (bg) " : " (direct) " ) ) ;
2014-10-27 04:18:13 +00:00
}
2014-10-26 00:01:36 +00:00
2014-10-27 04:18:13 +00:00
void StorageBuffer : : writeBlockToDestination ( const Block & block , StoragePtr table )
{
2020-02-17 19:28:25 +00:00
if ( ! destination_id | | ! block )
2017-04-01 07:20:54 +00:00
return ;
if ( ! table )
{
2020-05-23 22:24:01 +00:00
LOG_ERROR ( log , " Destination table {} doesn't exist. Block of data is discarded. " , destination_id . getNameForLogs ( ) ) ;
2017-04-01 07:20:54 +00:00
return ;
}
2020-06-16 12:48:10 +00:00
auto destination_metadata_snapshot = table - > getInMemoryMetadataPtr ( ) ;
2017-04-01 07:20:54 +00:00
2021-01-15 19:39:10 +00:00
MemoryTracker : : BlockerInThread temporarily_disable_memory_tracker ;
2019-10-30 19:58:19 +00:00
2017-04-01 07:20:54 +00:00
auto insert = std : : make_shared < ASTInsertQuery > ( ) ;
2020-03-02 20:23:58 +00:00
insert - > table_id = destination_id ;
2017-04-01 07:20:54 +00:00
/** We will insert columns that are the intersection set of columns of the buffer table and the subordinate table.
* This will support some of the cases ( but not all ) when the table structure does not match .
*/
2020-06-16 15:51:29 +00:00
Block structure_of_destination_table = allow_materialized ? destination_metadata_snapshot - > getSampleBlock ( )
: destination_metadata_snapshot - > getSampleBlockNonMaterialized ( ) ;
2018-11-19 06:14:36 +00:00
Block block_to_write ;
2021-06-15 19:55:21 +00:00
for ( size_t i : collections : : range ( 0 , structure_of_destination_table . columns ( ) ) )
2017-04-01 07:20:54 +00:00
{
auto dst_col = structure_of_destination_table . getByPosition ( i ) ;
if ( block . has ( dst_col . name ) )
{
2018-11-19 06:14:36 +00:00
auto column = block . getByName ( dst_col . name ) ;
if ( ! column . type - > equals ( * dst_col . type ) )
2017-04-01 07:20:54 +00:00
{
2020-05-23 22:24:01 +00:00
LOG_WARNING ( log , " Destination table {} have different type of column {} ({} != {}). Block of data is converted. " , destination_id . getNameForLogs ( ) , backQuoteIfNeed ( column . name ) , dst_col . type - > getName ( ) , column . type - > getName ( ) ) ;
2020-04-14 21:05:45 +00:00
column . column = castColumn ( column , dst_col . type ) ;
2018-11-19 06:14:36 +00:00
column . type = dst_col . type ;
2017-04-01 07:20:54 +00:00
}
2018-11-19 06:14:36 +00:00
block_to_write . insert ( column ) ;
2017-04-01 07:20:54 +00:00
}
}
2018-11-19 06:14:36 +00:00
if ( block_to_write . columns ( ) = = 0 )
2017-04-01 07:20:54 +00:00
{
2020-05-23 22:24:01 +00:00
LOG_ERROR ( log , " Destination table {} have no common columns with block in buffer. Block of data is discarded. " , destination_id . getNameForLogs ( ) ) ;
2017-04-01 07:20:54 +00:00
return ;
}
2018-11-19 06:14:36 +00:00
if ( block_to_write . columns ( ) ! = block . columns ( ) )
2020-05-23 22:24:01 +00:00
LOG_WARNING ( log , " Not all columns from block in buffer exist in destination table {}. Some columns are discarded. " , destination_id . getNameForLogs ( ) ) ;
2017-04-01 07:20:54 +00:00
auto list_of_columns = std : : make_shared < ASTExpressionList > ( ) ;
insert - > columns = list_of_columns ;
2018-11-19 06:14:36 +00:00
list_of_columns - > children . reserve ( block_to_write . columns ( ) ) ;
2018-11-19 15:20:34 +00:00
for ( const auto & column : block_to_write )
2018-11-19 06:14:36 +00:00
list_of_columns - > children . push_back ( std : : make_shared < ASTIdentifier > ( column . name ) ) ;
2017-04-01 07:20:54 +00:00
2021-04-10 23:33:54 +00:00
auto insert_context = Context : : createCopy ( getContext ( ) ) ;
insert_context - > makeQueryContext ( ) ;
2020-07-06 17:24:33 +00:00
InterpreterInsertQuery interpreter { insert , insert_context , allow_materialized } ;
2017-04-01 07:20:54 +00:00
auto block_io = interpreter . execute ( ) ;
2021-09-16 17:40:42 +00:00
PushingPipelineExecutor executor ( block_io . pipeline ) ;
2021-09-03 17:29:36 +00:00
executor . start ( ) ;
executor . push ( std : : move ( block_to_write ) ) ;
executor . finish ( ) ;
2014-10-26 00:01:36 +00:00
}
2020-12-25 01:20:09 +00:00
void StorageBuffer : : backgroundFlush ( )
2014-10-26 00:01:36 +00:00
{
2020-04-16 07:48:49 +00:00
try
{
flushAllBuffers ( true ) ;
}
catch ( . . . )
{
tryLogCurrentException ( __PRETTY_FUNCTION__ ) ;
}
2017-04-01 07:20:54 +00:00
2020-04-16 07:48:49 +00:00
reschedule ( ) ;
}
void StorageBuffer : : reschedule ( )
{
time_t min_first_write_time = std : : numeric_limits < time_t > : : max ( ) ;
time_t rows = 0 ;
for ( auto & buffer : buffers )
2017-04-01 07:20:54 +00:00
{
Reduce lock contention for multiple layers of the Buffer engine
Otherwise you can see something like this for the following query:
```sql
WITH
arrayMap(x -> demangle(addressToSymbol(x)), s.trace) AS trace_array,
arrayStringConcat(trace_array, '\n') AS trace_string
SELECT
p.thread_id,
p.query_id,
p.query,
trace_string
FROM
(
SELECT
query_id,
query,
arrayJoin(thread_ids) AS thread_id
FROM system.processes
) AS p
INNER JOIN system.stack_trace AS s ON p.thread_id = s.thread_id
ORDER BY p.query_id ASC
SETTINGS enable_global_with_statement = 0, allow_introspection_functions = 1
FORMAT PrettyCompactNoEscapes
```
Lots of the following:
```sql
INSERT INTO buffer (...) VALUES
__lll_lock_wait
pthread_mutex_lock
std::__1::mutex::lock()
DB::StorageBuffer::reschedule()
DB::BufferBlockOutputStream::write(DB::Block const&)
```
That will wait for one of these:
```
INSERT INTO buffer (...) VALUES
...
DB::PushingToViewsBlockOutputStream::write(DB::Block const&)
DB::AddingDefaultBlockOutputStream::write(DB::Block const&)
DB::SquashingBlockOutputStream::finalize()
DB::SquashingBlockOutputStream::writeSuffix()
DB::PushingToViewsBlockOutputStream::writeSuffix()
DB::StorageBuffer::writeBlockToDestination(DB::Block const&, std::__1::shared_ptr<DB::IStorage>)
DB::StorageBuffer::flushBuffer(DB::StorageBuffer::Buffer&, bool, bool, bool)
```
P.S. we cannot simply unlock the buffer during flushing, see comments in
the code
2021-01-21 18:11:39 +00:00
/// try_to_lock here to avoid waiting for other layers flushing to be finished,
/// since the buffer table may:
/// - push to Distributed table, that may take too much time,
/// - push to table with materialized views attached,
/// this is also may take some time.
///
/// try_to_lock is also ok for background flush, since if there is
/// INSERT contended, then the reschedule will be done after
/// INSERT will be done.
2021-04-06 18:12:40 +00:00
std : : unique_lock lock ( buffer . tryLock ( ) ) ;
Reduce lock contention for multiple layers of the Buffer engine
Otherwise you can see something like this for the following query:
```sql
WITH
arrayMap(x -> demangle(addressToSymbol(x)), s.trace) AS trace_array,
arrayStringConcat(trace_array, '\n') AS trace_string
SELECT
p.thread_id,
p.query_id,
p.query,
trace_string
FROM
(
SELECT
query_id,
query,
arrayJoin(thread_ids) AS thread_id
FROM system.processes
) AS p
INNER JOIN system.stack_trace AS s ON p.thread_id = s.thread_id
ORDER BY p.query_id ASC
SETTINGS enable_global_with_statement = 0, allow_introspection_functions = 1
FORMAT PrettyCompactNoEscapes
```
Lots of the following:
```sql
INSERT INTO buffer (...) VALUES
__lll_lock_wait
pthread_mutex_lock
std::__1::mutex::lock()
DB::StorageBuffer::reschedule()
DB::BufferBlockOutputStream::write(DB::Block const&)
```
That will wait one of this:
```
INSERT INTO buffer (...) VALUES
...
DB::PushingToViewsBlockOutputStream::write(DB::Block const&)
DB::AddingDefaultBlockOutputStream::write(DB::Block const&)
DB::SquashingBlockOutputStream::finalize()
DB::SquashingBlockOutputStream::writeSuffix()
DB::PushingToViewsBlockOutputStream::writeSuffix()
DB::StorageBuffer::writeBlockToDestination(DB::Block const&, std::__1::shared_ptr<DB::IStorage>)
DB::StorageBuffer::flushBuffer(DB::StorageBuffer::Buffer&, bool, bool, bool)
```
P.S. we cannot simply unlock the buffer during flushing, see comments in
the code
2021-01-21 18:11:39 +00:00
if ( lock . owns_lock ( ) )
{
min_first_write_time = buffer . first_write_time ;
rows + = buffer . data . rows ( ) ;
}
2020-04-16 07:48:49 +00:00
}
/// will be rescheduled via INSERT
if ( ! rows )
return ;
time_t current_time = time ( nullptr ) ;
time_t time_passed = current_time - min_first_write_time ;
size_t min = std : : max < ssize_t > ( min_thresholds . time - time_passed , 1 ) ;
size_t max = std : : max < ssize_t > ( max_thresholds . time - time_passed , 1 ) ;
flush_handle - > scheduleAfter ( std : : min ( min , max ) * 1000 ) ;
2014-10-26 00:01:36 +00:00
}
2021-04-10 23:33:54 +00:00
void StorageBuffer : : checkAlterIsPossible ( const AlterCommands & commands , ContextPtr local_context ) const
2019-12-26 18:17:05 +00:00
{
2021-04-10 23:33:54 +00:00
auto name_deps = getDependentViewsByColumn ( local_context ) ;
2019-12-26 18:17:05 +00:00
for ( const auto & command : commands )
{
2021-10-29 12:31:18 +00:00
if ( command . type ! = AlterCommand : : Type : : ADD_COLUMN & & command . type ! = AlterCommand : : Type : : MODIFY_COLUMN
& & command . type ! = AlterCommand : : Type : : DROP_COLUMN & & command . type ! = AlterCommand : : Type : : COMMENT_COLUMN
& & command . type ! = AlterCommand : : Type : : COMMENT_TABLE )
2021-09-06 14:24:03 +00:00
throw Exception ( ErrorCodes : : NOT_IMPLEMENTED , " Alter of type '{}' is not supported by storage {} " ,
command . type , getName ( ) ) ;
2021-04-30 05:02:32 +00:00
if ( command . type = = AlterCommand : : Type : : DROP_COLUMN & & ! command . clear )
2021-02-28 05:24:39 +00:00
{
2021-02-28 07:42:08 +00:00
const auto & deps_mv = name_deps [ command . column_name ] ;
2021-02-28 05:24:39 +00:00
if ( ! deps_mv . empty ( ) )
{
throw Exception (
" Trying to ALTER DROP column " + backQuoteIfNeed ( command . column_name ) + " which is referenced by materialized view "
+ toString ( deps_mv ) ,
ErrorCodes : : ALTER_OF_COLUMN_IS_FORBIDDEN ) ;
}
}
2019-12-26 18:17:05 +00:00
}
}
2020-11-25 13:47:32 +00:00
std : : optional < UInt64 > StorageBuffer : : totalRows ( const Settings & settings ) const
2020-03-29 07:50:47 +00:00
{
std : : optional < UInt64 > underlying_rows ;
2021-04-10 23:33:54 +00:00
auto underlying = DatabaseCatalog : : instance ( ) . tryGetTable ( destination_id , getContext ( ) ) ;
2020-03-29 07:50:47 +00:00
if ( underlying )
2020-11-25 13:47:32 +00:00
underlying_rows = underlying - > totalRows ( settings ) ;
2020-03-29 07:50:47 +00:00
if ( ! underlying_rows )
return underlying_rows ;
2021-05-12 18:12:36 +00:00
return total_writes . rows + * underlying_rows ;
2020-03-29 07:50:47 +00:00
}
2014-10-26 00:01:36 +00:00
2020-11-25 13:47:32 +00:00
std : : optional < UInt64 > StorageBuffer : : totalBytes ( const Settings & /*settings*/ ) const
2020-03-29 08:54:00 +00:00
{
2021-05-12 18:12:36 +00:00
return total_writes . bytes ;
2020-03-29 08:54:00 +00:00
}
2021-10-25 17:49:49 +00:00
void StorageBuffer : : alter ( const AlterCommands & params , ContextPtr local_context , AlterLockHolder & )
2014-10-26 00:01:36 +00:00
{
2019-12-10 20:47:05 +00:00
auto table_id = getStorageID ( ) ;
2021-04-10 23:33:54 +00:00
checkAlterIsPossible ( params , local_context ) ;
2020-06-17 13:39:26 +00:00
auto metadata_snapshot = getInMemoryMetadataPtr ( ) ;
2019-08-26 14:50:34 +00:00
2020-09-23 12:06:54 +00:00
/// Flush all buffers to storages, so that no non-empty blocks of the old
/// structure remain. Structure of empty blocks will be updated during first
/// insert.
2021-04-10 23:33:54 +00:00
optimize ( { } /*query*/ , metadata_snapshot , { } /*partition_id*/ , false /*final*/ , false /*deduplicate*/ , { } , local_context ) ;
2014-10-27 04:18:13 +00:00
2020-06-17 13:39:26 +00:00
StorageInMemoryMetadata new_metadata = * metadata_snapshot ;
2021-04-10 23:33:54 +00:00
params . apply ( new_metadata , local_context ) ;
DatabaseCatalog : : instance ( ) . getDatabase ( table_id . database_name ) - > alterTable ( local_context , table_id , new_metadata ) ;
2020-06-15 16:55:33 +00:00
setInMemoryMetadata ( new_metadata ) ;
2014-10-26 00:01:36 +00:00
}
2017-12-28 21:36:27 +00:00
/// Registers the Buffer table engine in the given storage factory.
void registerStorageBuffer(StorageFactory & factory)
{
    /** Buffer(db, table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes[, flush_time, flush_rows, flush_bytes])
      *
      * db, table - the table into which data from the buffer is put.
      * num_buckets - level of parallelism.
      * min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for flushing the buffer,
      * flush_time, flush_rows, flush_bytes - (optional) conditions for flushing.
      */
    factory.registerStorage(" Buffer ", [](const StorageFactory::Arguments & args)
    {
        ASTs & engine_args = args.engine_args;

        const size_t args_count = engine_args.size();
        if (args_count < 9 || args_count > 12)
            throw Exception(" Storage Buffer requires from 9 to 12 parameters: "
                " destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes[, flush_time, flush_rows, flush_bytes]. ",
                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

        /// The database and table name arguments may be expressions; evaluate them first.
        engine_args[0] = evaluateConstantExpressionForDatabaseName(engine_args[0], args.getLocalContext());
        engine_args[1] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[1], args.getLocalContext());

        /// With the expressions evaluated, every argument has to be a literal.
        for (size_t arg_idx = 0; arg_idx < engine_args.size(); ++arg_idx)
        {
            if (typeid_cast<ASTLiteral *>(engine_args[arg_idx].get()))
                continue;

            throw Exception(ErrorCodes::BAD_ARGUMENTS,
                " Storage Buffer expects a literal as an argument #{}, got '{}' "
                " instead ", arg_idx, engine_args[arg_idx]->formatForErrorMessage());
        }

        /// Accessors for the literal value stored at a given argument position.
        const auto literal_at = [&engine_args](size_t pos) -> auto &
        {
            return engine_args[pos]->as<ASTLiteral &>().value;
        };
        const auto string_at = [&literal_at](size_t pos) -> String
        {
            return literal_at(pos).safeGet<String>();
        };
        const auto uint_at = [&literal_at](size_t pos)
        {
            return applyVisitor(FieldVisitorConvertToNumber<UInt64>(), literal_at(pos));
        };

        /// Mandatory arguments occupy positions 0..8.
        String destination_database = string_at(0);
        String destination_table = string_at(1);
        UInt64 num_buckets = uint_at(2);

        StorageBuffer::Thresholds min;
        StorageBuffer::Thresholds max;
        StorageBuffer::Thresholds flush;

        min.time = uint_at(3);
        max.time = uint_at(4);
        min.rows = uint_at(5);
        max.rows = uint_at(6);
        min.bytes = uint_at(7);
        max.bytes = uint_at(8);

        /// Optional flush thresholds occupy positions 9..11; each is read only if present.
        if (engine_args.size() > 9)
            flush.time = uint_at(9);
        if (engine_args.size() > 10)
            flush.rows = uint_at(10);
        if (engine_args.size() > 11)
            flush.bytes = uint_at(11);

        /// If destination_id is not set, do not write data from the buffer, but simply empty the buffer.
        StorageID destination_id = StorageID::createEmpty();
        if (!destination_table.empty())
        {
            destination_id.database_name = args.getContext()->resolveDatabase(destination_database);
            destination_id.table_name = destination_table;
        }

        const bool allow_materialized
            = static_cast<bool>(args.getLocalContext()->getSettingsRef().insert_allow_materialized_columns);

        return StorageBuffer::create(
            args.table_id,
            args.columns,
            args.constraints,
            args.comment,
            args.getContext(),
            num_buckets,
            min,
            max,
            flush,
            destination_id,
            allow_materialized);
    },
    {
        .supports_parallel_insert = true,
    });
}
2014-10-26 00:01:36 +00:00
}