#include <Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h>
#include <utility>

namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}

namespace
{
    constexpr auto INDEX_FILE_EXTENSION = ".idx";
}
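
/// Flush everything buffered in this stream: the compressed data buffer, the data file buffer and the marks buffer.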
void MergeTreeDataPartWriterOnDisk::Stream::finalize()
{
    compressed.next();
    plain_file->next();
    marks.next();
}
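
/// Sync the data and marks files to disk.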
void MergeTreeDataPartWriterOnDisk::Stream::sync() const
{
    plain_file->sync();
    marks_file->sync();
}
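
/// Open the data and marks files on the given disk. Bytes written to `compressed` are hashed
/// (uncompressed hash), compressed by `compressed_buf`, hashed again by `plain_hashing` and
/// finally written to `plain_file`; marks are written through `marks` into `marks_file`.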
MergeTreeDataPartWriterOnDisk::Stream::Stream(
    const String & escaped_column_name_,
    DiskPtr disk_,
    const String & data_path_,
    const std::string & data_file_extension_,
    const std::string & marks_path_,
    const std::string & marks_file_extension_,
    const CompressionCodecPtr & compression_codec_,
    size_t max_compress_block_size_,
    size_t estimated_size_,
    size_t aio_threshold_) :
    escaped_column_name(escaped_column_name_),
    data_file_extension{data_file_extension_},
    marks_file_extension{marks_file_extension_},
    plain_file(disk_->writeFile(data_path_ + data_file_extension, max_compress_block_size_, WriteMode::Rewrite, estimated_size_, aio_threshold_)),
    plain_hashing(*plain_file), compressed_buf(plain_hashing, compression_codec_), compressed(compressed_buf),
    marks_file(disk_->writeFile(marks_path_ + marks_file_extension, 4096, WriteMode::Rewrite)), marks(*marks_file)
{
}
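
/// Add sizes and hashes of the data file (compressed and uncompressed) and of the marks file to the part checksums.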
void MergeTreeDataPartWriterOnDisk::Stream::addToChecksums(MergeTreeData::DataPart::Checksums & checksums)
{
    String name = escaped_column_name;

    checksums.files[name + data_file_extension].is_compressed = true;
    checksums.files[name + data_file_extension].uncompressed_size = compressed.count();
    checksums.files[name + data_file_extension].uncompressed_hash = compressed.getHash();
    checksums.files[name + data_file_extension].file_size = plain_hashing.count();
    checksums.files[name + data_file_extension].file_hash = plain_hashing.getHash();

    checksums.files[name + marks_file_extension].file_size = marks.count();
    checksums.files[name + marks_file_extension].file_hash = marks.getHash();
}
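
/// The writer creates the part directory if needed. Index granularity is either taken from the
/// part (non-empty index_granularity_) or computed from the data blocks (compute_granularity).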
MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk(
    const MergeTreeData::DataPartPtr & data_part_,
    const NamesAndTypesList & columns_list_,
    const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
    const String & marks_file_extension_,
    const CompressionCodecPtr & default_codec_,
    const MergeTreeWriterSettings & settings_,
    const MergeTreeIndexGranularity & index_granularity_)
    : IMergeTreeDataPartWriter(data_part_,
        columns_list_, indices_to_recalc_,
        index_granularity_, settings_)
    , part_path(data_part_->getFullRelativePath())
    , marks_file_extension(marks_file_extension_)
    , default_codec(default_codec_)
    , compute_granularity(index_granularity.empty())
{
    if (settings.blocks_are_granules_size && !index_granularity.empty())
        throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);

    auto disk = data_part->volume->getDisk();
    if (!disk->exists(part_path))
        disk->createDirectories(part_path);
}

/// Implementation is split into static functions to allow unit testing
/// without creating an instance of IMergeTreeDataPartWriter,
/// which requires a lot of dependencies and access to the filesystem.
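/// For example, with adaptive granularity and a block of 1,000,000 rows occupying 100 MB in memory
/// and index_granularity_bytes = 10 MB, the block contains 10 byte-sized granules, so the granularity
/// for this block is 100,000 rows (then capped by fixed_index_granularity_rows).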
static size_t computeIndexGranularityImpl(
    const Block & block,
    size_t index_granularity_bytes,
    size_t fixed_index_granularity_rows,
    bool blocks_are_granules,
    bool can_use_adaptive_index_granularity)
{
    size_t rows_in_block = block.rows();
    size_t index_granularity_for_block;
    if (!can_use_adaptive_index_granularity)
        index_granularity_for_block = fixed_index_granularity_rows;
    else
    {
        size_t block_size_in_memory = block.bytes();
        if (blocks_are_granules)
            index_granularity_for_block = rows_in_block;
        else if (block_size_in_memory >= index_granularity_bytes)
        {
            size_t granules_in_block = block_size_in_memory / index_granularity_bytes;
            index_granularity_for_block = rows_in_block / granules_in_block;
        }
        else
        {
            size_t size_of_row_in_bytes = block_size_in_memory / rows_in_block;
            index_granularity_for_block = index_granularity_bytes / size_of_row_in_bytes;
        }
    }

    if (index_granularity_for_block == 0) /// very rare case when index_granularity_bytes is less than a single row
        index_granularity_for_block = 1;

    /// We should be less than or equal to the fixed index granularity.
    index_granularity_for_block = std::min(fixed_index_granularity_rows, index_granularity_for_block);

    return index_granularity_for_block;
}
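
/// Append marks of `index_granularity_for_block` rows each, starting from `index_offset`,
/// until the whole block of `rows_in_block` rows is covered.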
static void fillIndexGranularityImpl(
    MergeTreeIndexGranularity & index_granularity,
    size_t index_offset,
    size_t index_granularity_for_block,
    size_t rows_in_block)
{
    for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
        index_granularity.appendMark(index_granularity_for_block);
}
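
/// Member wrapper: compute the granularity for a block using the storage and writer settings.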
size_t MergeTreeDataPartWriterOnDisk::computeIndexGranularity(const Block & block)
{
    const auto storage_settings = storage.getSettings();
    return computeIndexGranularityImpl(
        block,
        storage_settings->index_granularity_bytes,
        storage_settings->index_granularity,
        settings.blocks_are_granules_size,
        settings.can_use_adaptive_granularity);
}
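
/// Member wrapper: append marks for a block starting from the current index offset.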
void MergeTreeDataPartWriterOnDisk::fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block)
{
    fillIndexGranularityImpl(
        index_granularity,
        getIndexOffset(),
        index_granularity_for_block,
        rows_in_block);
}
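
/// Create the primary.idx file (if the table has a primary key) wrapped into a hashing buffer.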
void MergeTreeDataPartWriterOnDisk::initPrimaryIndex()
{
    if (storage.hasPrimaryKey())
    {
        index_file_stream = data_part->volume->getDisk()->writeFile(part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
        index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
    }

    primary_index_initialized = true;
}
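
/// Create one output stream and one aggregator per skip index of this part.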
void MergeTreeDataPartWriterOnDisk::initSkipIndices()
{
    for (const auto & index_helper : skip_indices)
    {
        String stream_name = index_helper->getFileName();
        skip_indices_streams.emplace_back(
            std::make_unique<MergeTreeDataPartWriterOnDisk::Stream>(
                stream_name,
                data_part->volume->getDisk(),
                part_path + stream_name, INDEX_FILE_EXTENSION,
                part_path + stream_name, marks_file_extension,
                default_codec, settings.max_compress_block_size,
                0, settings.aio_threshold));

        skip_indices_aggregators.push_back(index_helper->createIndexAggregator());
        skip_index_filling.push_back(0);
    }

    skip_indices_initialized = true;
}
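
/// Append primary key values at granule boundaries to the in-memory index and write them to primary.idx.
/// The last row of the block is remembered so that a final mark can be written when the part is finished.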
void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Block & primary_index_block)
{
    if (!primary_index_initialized)
        throw Exception("Primary index is not initialized", ErrorCodes::LOGICAL_ERROR);

    size_t rows = primary_index_block.rows();
    size_t primary_columns_num = primary_index_block.columns();

    if (index_columns.empty())
    {
        index_types = primary_index_block.getDataTypes();
        index_columns.resize(primary_columns_num);
        last_index_row.resize(primary_columns_num);
        for (size_t i = 0; i < primary_columns_num; ++i)
            index_columns[i] = primary_index_block.getByPosition(i).column->cloneEmpty();
    }

    /** While filling index (index_columns), disable memory tracker.
      * Because memory is allocated here (maybe in context of INSERT query),
      * but then freed in completely different place (while merging parts), where query memory_tracker is not available.
      * And otherwise it will look like excessively growing memory consumption in context of query.
      * (observed in long INSERT SELECTs)
      */
    auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();

    /// Write index. The index contains Primary Key value for each `index_granularity` row.
    size_t current_row = getIndexOffset();
    size_t total_marks = index_granularity.getMarksCount();

    while (index_mark < total_marks && current_row < rows)
    {
        if (storage.hasPrimaryKey())
        {
            for (size_t j = 0; j < primary_columns_num; ++j)
            {
                const auto & primary_column = primary_index_block.getByPosition(j);
                index_columns[j]->insertFrom(*primary_column.column, current_row);
                primary_column.type->serializeBinary(*primary_column.column, current_row, *index_stream);
            }
        }

        current_row += index_granularity.getMarkRows(index_mark++);
    }

    /// store last index row to write final mark at the end of column
    for (size_t j = 0; j < primary_columns_num; ++j)
    {
        const IColumn & primary_column = *primary_index_block.getByPosition(j).column.get();
        primary_column.get(rows - 1, last_index_row[j]);
    }
}
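
/// Feed the block to each skip index aggregator granule by granule and serialize
/// completed granules together with their marks into the index streams.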
void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block & skip_indexes_block)
{
    if (!skip_indices_initialized)
        throw Exception("Skip indices are not initialized", ErrorCodes::LOGICAL_ERROR);

    size_t rows = skip_indexes_block.rows();
    size_t skip_index_current_data_mark = 0;

    /// Filling and writing skip indices like in MergeTreeDataPartWriterWide::writeColumn
    for (size_t i = 0; i < skip_indices.size(); ++i)
    {
        const auto index_helper = skip_indices[i];
        auto & stream = *skip_indices_streams[i];
        size_t prev_pos = 0;
        skip_index_current_data_mark = skip_index_data_mark;
        while (prev_pos < rows)
        {
            UInt64 limit = 0;
            size_t current_index_offset = getIndexOffset();
            if (prev_pos == 0 && current_index_offset != 0)
            {
                limit = current_index_offset;
            }
            else
            {
                limit = index_granularity.getMarkRows(skip_index_current_data_mark);
                if (skip_indices_aggregators[i]->empty())
                {
                    skip_indices_aggregators[i] = index_helper->createIndexAggregator();
                    skip_index_filling[i] = 0;

                    if (stream.compressed.offset() >= settings.min_compress_block_size)
                        stream.compressed.next();

                    writeIntBinary(stream.plain_hashing.count(), stream.marks);
                    writeIntBinary(stream.compressed.offset(), stream.marks);

                    /// Actually these numbers are redundant, but we have to store them
                    /// to be compatible with the normal .mrk2 file format.
                    if (settings.can_use_adaptive_granularity)
                        writeIntBinary(1UL, stream.marks);
                }
                /// This mark is aggregated, go to the next one.
                skip_index_current_data_mark++;
            }

            size_t pos = prev_pos;
            skip_indices_aggregators[i]->update(skip_indexes_block, &pos, limit);

            if (pos == prev_pos + limit)
            {
                ++skip_index_filling[i];

                /// Write the index granule if it is filled.
                if (skip_index_filling[i] == index_helper->index.granularity)
                {
                    skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
                    skip_index_filling[i] = 0;
                }
            }
            prev_pos = pos;
        }
    }
    skip_index_data_mark = skip_index_current_data_mark;
}
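
/// Optionally write the final mark (the last primary key row), flush primary.idx and
/// record its size and hash in the part checksums.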
void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization(MergeTreeData::DataPart::Checksums & checksums)
{
    bool write_final_mark = (with_final_mark && data_written);

    if (write_final_mark && compute_granularity)
        index_granularity.appendMark(0);

    if (index_stream)
    {
        if (write_final_mark)
        {
            for (size_t j = 0; j < index_columns.size(); ++j)
            {
                index_columns[j]->insert(last_index_row[j]);
                index_types[j]->serializeBinary(last_index_row[j], *index_stream);
            }

            last_index_row.clear();
        }

        index_stream->next();
        checksums.files["primary.idx"].file_size = index_stream->count();
        checksums.files["primary.idx"].file_hash = index_stream->getHash();
        index_stream = nullptr;
    }
}
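
/// Serialize the remaining (partially filled) granules, finalize the skip index streams
/// and add their files to the part checksums.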
void MergeTreeDataPartWriterOnDisk::finishSkipIndicesSerialization(
    MergeTreeData::DataPart::Checksums & checksums)
{
    for (size_t i = 0; i < skip_indices.size(); ++i)
    {
        auto & stream = *skip_indices_streams[i];
        if (!skip_indices_aggregators[i]->empty())
            skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
    }

    for (auto & stream : skip_indices_streams)
    {
        stream->finalize();
        stream->addToChecksums(checksums);
    }

    skip_indices_streams.clear();
    skip_indices_aggregators.clear();
    skip_index_filling.clear();
}
}