2019-10-21 00:28:29 +00:00
# include <Storages/MergeTree/MergeTreeDataPartWriterWide.h>
2020-05-20 20:16:32 +00:00
# include <Interpreters/Context.h>
2020-09-21 07:17:58 +00:00
# include <Compression/CompressionFactory.h>
2020-12-14 07:28:42 +00:00
# include <Compression/CompressedReadBufferFromFile.h>
2019-10-21 00:28:29 +00:00
namespace DB
{
2020-02-25 18:10:48 +00:00
/// Error codes thrown from this file. Only the declaration lives here (`extern`);
/// the definition is provided by another translation unit.
namespace ErrorCodes
{
extern const int LOGICAL_ERROR ;
}
2019-10-21 00:28:29 +00:00
2019-10-21 15:33:59 +00:00
namespace
{
/// Suffix appended to `part_path + stream_name` to build the path of a column data file
/// (used by addStreams and validateColumnOfFixedSize in this file).
/// NOTE(review): the literal as written has a leading and a trailing space — confirm this is intended.
constexpr auto DATA_FILE_EXTENSION = " .bin " ;
}
2020-12-14 12:03:49 +00:00
namespace
{
2020-12-15 10:34:28 +00:00
/// Get granules for block using index_granularity
2020-12-14 12:03:49 +00:00
Granules getGranulesToWrite ( const MergeTreeIndexGranularity & index_granularity , size_t block_rows , size_t current_mark , size_t rows_written_in_last_mark )
{
if ( current_mark > = index_granularity . getMarksCount ( ) )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Request to get granules from mark {} but index granularity size is {} " , current_mark , index_granularity . getMarksCount ( ) ) ;
Granules result ;
size_t current_row = 0 ;
2020-12-15 10:34:28 +00:00
/// When our last mark is not finished yet and we have to write in rows into it
2020-12-14 12:03:49 +00:00
if ( rows_written_in_last_mark > 0 )
{
size_t rows_left_in_last_mark = index_granularity . getMarkRows ( current_mark ) - rows_written_in_last_mark ;
2020-12-17 08:17:39 +00:00
size_t rows_left_in_block = block_rows - current_row ;
result . emplace_back ( Granule {
. start_row = current_row ,
2020-12-18 14:44:31 +00:00
. rows_to_write = std : : min ( rows_left_in_block , rows_left_in_last_mark ) ,
2020-12-17 08:17:39 +00:00
. mark_number = current_mark ,
. mark_on_start = false , /// Don't mark this granule because we have already marked it
2020-12-18 14:44:31 +00:00
. is_complete = ( rows_left_in_block > = rows_left_in_last_mark ) ,
2020-12-17 08:17:39 +00:00
} ) ;
2020-12-18 14:44:31 +00:00
current_row + = result . back ( ) . rows_to_write ;
2020-12-14 12:03:49 +00:00
current_mark + + ;
}
2020-12-15 10:34:28 +00:00
/// Calculating normal granules for block
2020-12-14 12:03:49 +00:00
while ( current_row < block_rows )
{
2020-12-17 08:17:39 +00:00
size_t expected_rows_in_mark = index_granularity . getMarkRows ( current_mark ) ;
size_t rows_left_in_block = block_rows - current_row ;
2020-12-15 10:34:28 +00:00
/// If we have less rows in block than expected in granularity
/// save incomplete granule
2020-12-17 08:17:39 +00:00
result . emplace_back ( Granule {
. start_row = current_row ,
2020-12-18 14:44:31 +00:00
. rows_to_write = std : : min ( rows_left_in_block , expected_rows_in_mark ) ,
2020-12-17 08:17:39 +00:00
. mark_number = current_mark ,
. mark_on_start = true ,
2020-12-18 14:44:31 +00:00
. is_complete = ( rows_left_in_block > = expected_rows_in_mark ) ,
2020-12-17 08:17:39 +00:00
} ) ;
2020-12-18 14:44:31 +00:00
current_row + = result . back ( ) . rows_to_write ;
2020-12-14 12:03:49 +00:00
current_mark + + ;
}
return result ;
}
}
2019-10-21 15:33:59 +00:00
/// Forwards all arguments to MergeTreeDataPartWriterOnDisk and then creates the output
/// streams for every column of the part. The codec of each column is taken from the
/// metadata snapshot, falling back to the default codec.
MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
    const MergeTreeData::DataPartPtr & data_part_,
    const NamesAndTypesList & columns_list_,
    const StorageMetadataPtr & metadata_snapshot_,
    const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
    const String & marks_file_extension_,
    const CompressionCodecPtr & default_codec_,
    const MergeTreeWriterSettings & settings_,
    const MergeTreeIndexGranularity & index_granularity_)
    : MergeTreeDataPartWriterOnDisk(
        data_part_,
        columns_list_,
        metadata_snapshot_,
        indices_to_recalc_,
        marks_file_extension_,
        default_codec_,
        settings_,
        index_granularity_)
{
    const auto & storage_columns = metadata_snapshot->getColumns();

    for (const auto & column : columns_list)
    {
        const auto codec_desc = storage_columns.getCodecDescOrDefault(column.name, default_codec);
        addStreams(column, codec_desc, settings.estimated_size);
    }
}
void MergeTreeDataPartWriterWide : : addStreams (
2020-10-21 23:02:20 +00:00
const NameAndTypePair & column ,
2020-09-21 07:17:58 +00:00
const ASTPtr & effective_codec_desc ,
2019-12-09 21:21:17 +00:00
size_t estimated_size )
2019-10-21 15:33:59 +00:00
{
2020-09-21 07:17:58 +00:00
IDataType : : StreamCallback callback = [ & ] ( const IDataType : : SubstreamPath & substream_path , const IDataType & substream_type )
2019-10-21 15:33:59 +00:00
{
2020-10-21 23:02:20 +00:00
String stream_name = IDataType : : getFileNameForStream ( column , substream_path ) ;
2019-10-21 15:33:59 +00:00
/// Shared offsets for Nested type.
if ( column_streams . count ( stream_name ) )
return ;
2020-09-21 14:22:13 +00:00
CompressionCodecPtr compression_codec ;
2020-09-29 22:23:42 +00:00
/// If we can use special codec then just get it
2020-09-21 14:22:13 +00:00
if ( IDataType : : isSpecialCompressionAllowed ( substream_path ) )
compression_codec = CompressionCodecFactory : : instance ( ) . get ( effective_codec_desc , & substream_type , default_codec ) ;
2020-09-29 22:23:42 +00:00
else /// otherwise return only generic codecs and don't use info about the data_type
2020-09-21 14:47:10 +00:00
compression_codec = CompressionCodecFactory : : instance ( ) . get ( effective_codec_desc , nullptr , default_codec , true ) ;
2020-09-21 07:17:58 +00:00
2020-02-19 14:07:36 +00:00
column_streams [ stream_name ] = std : : make_unique < Stream > (
2019-10-21 15:33:59 +00:00
stream_name ,
2020-05-10 13:33:27 +00:00
data_part - > volume - > getDisk ( ) ,
2019-10-21 15:33:59 +00:00
part_path + stream_name , DATA_FILE_EXTENSION ,
part_path + stream_name , marks_file_extension ,
2020-09-21 11:24:10 +00:00
compression_codec ,
2019-10-21 15:33:59 +00:00
settings . max_compress_block_size ,
estimated_size ,
settings . aio_threshold ) ;
} ;
IDataType : : SubstreamPath stream_path ;
2020-10-21 23:02:20 +00:00
column . type - > enumerateStreams ( callback , stream_path ) ;
2019-10-21 15:33:59 +00:00
}
/// Builds the callback that maps a substream path of `column` to the buffer to write into.
///
/// The callback returns nullptr for an array-sizes (offsets) substream whose stream name is
/// already present in `offset_columns`, so that offsets are not written more than once for
/// Nested type. Otherwise it returns the compressed buffer of the matching stream.
/// The callback keeps references to `column` and `offset_columns`.
IDataType::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGetter(
    const NameAndTypePair & column, WrittenOffsetColumns & offset_columns) const
{
    return [&, this](const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
    {
        String stream_name = IDataType::getFileNameForStream(column, substream_path);

        const bool is_array_sizes = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
        if (is_array_sizes)
        {
            /// Don't write offsets more than one time for Nested type.
            if (offset_columns.count(stream_name))
                return nullptr;
        }

        return &column_streams.at(stream_name)->compressed;
    };
}
2020-12-18 14:44:31 +00:00
2020-12-15 09:54:48 +00:00
void MergeTreeDataPartWriterWide : : shiftCurrentMark ( const Granules & granules_written )
{
auto last_granule = granules_written . back ( ) ;
2020-12-15 10:34:28 +00:00
/// If we didn't finished last granule than we will continue to write it from new block
2020-12-18 14:44:31 +00:00
if ( ! last_granule . is_complete )
2020-12-15 09:54:48 +00:00
{
2020-12-15 10:34:28 +00:00
/// Shift forward except last granule
2020-12-15 09:54:48 +00:00
setCurrentMark ( getCurrentMark ( ) + granules_written . size ( ) - 1 ) ;
bool still_in_the_same_granule = granules_written . size ( ) = = 1 ;
2020-12-15 10:34:28 +00:00
/// We wrote whole block in the same granule, but didn't finished it.
/// So add written rows to rows written in last_mark
2020-12-15 09:54:48 +00:00
if ( still_in_the_same_granule )
2020-12-18 14:44:31 +00:00
rows_written_in_last_mark + = last_granule . rows_to_write ;
2020-12-15 09:54:48 +00:00
else
2020-12-18 14:44:31 +00:00
rows_written_in_last_mark = last_granule . rows_to_write ;
2020-12-15 09:54:48 +00:00
}
else
{
setCurrentMark ( getCurrentMark ( ) + granules_written . size ( ) ) ;
rows_written_in_last_mark = 0 ;
}
}
2020-12-10 08:57:52 +00:00
void MergeTreeDataPartWriterWide : : write ( const Block & block , const IColumn : : Permutation * permutation )
2019-10-21 00:28:29 +00:00
{
2019-11-07 11:11:38 +00:00
/// Fill index granularity for this block
/// if it's unknown (in case of insert data or horizontal merge,
/// but not in case of vertical merge)
if ( compute_granularity )
2020-04-26 21:19:25 +00:00
{
size_t index_granularity_for_block = computeIndexGranularity ( block ) ;
2020-12-18 13:49:45 +00:00
if ( rows_written_in_last_mark > 0 )
{
size_t rows_left_in_last_mark = index_granularity . getMarkRows ( getCurrentMark ( ) ) - rows_written_in_last_mark ;
2020-12-21 08:24:52 +00:00
/// Previous granularity was much bigger than our new block's
/// granularity let's adjust it, because we want add new
/// heavy-weight blocks into small old granule.
2020-12-18 13:49:45 +00:00
if ( rows_left_in_last_mark > index_granularity_for_block )
2020-12-21 08:24:52 +00:00
{
/// We have already written more rows than granularity of our block.
/// adjust last mark rows and flush to disk.
if ( rows_written_in_last_mark > = index_granularity_for_block )
adjustLastMarkIfNeedAndFlushToDisk ( rows_written_in_last_mark ) ;
else /// We still can write some rows from new block into previous granule.
adjustLastMarkIfNeedAndFlushToDisk ( index_granularity_for_block - rows_written_in_last_mark ) ;
}
2020-12-18 13:49:45 +00:00
}
2020-04-26 21:19:25 +00:00
fillIndexGranularity ( index_granularity_for_block , block . rows ( ) ) ;
}
2019-11-27 19:57:07 +00:00
2020-12-14 12:51:14 +00:00
auto granules_to_write = getGranulesToWrite ( index_granularity , block . rows ( ) , getCurrentMark ( ) , rows_written_in_last_mark ) ;
2020-12-11 13:20:19 +00:00
2019-12-09 21:21:17 +00:00
auto offset_columns = written_offset_columns ? * written_offset_columns : WrittenOffsetColumns { } ;
2020-12-10 08:57:52 +00:00
Block primary_key_block ;
if ( settings . rewrite_primary_key )
primary_key_block = getBlockAndPermute ( block , metadata_snapshot - > getPrimaryKeyColumns ( ) , permutation ) ;
2020-12-11 08:41:02 +00:00
Block skip_indexes_block = getBlockAndPermute ( block , getSkipIndicesColumns ( ) , permutation ) ;
2019-10-21 00:28:29 +00:00
auto it = columns_list . begin ( ) ;
for ( size_t i = 0 ; i < columns_list . size ( ) ; + + i , + + it )
{
const ColumnWithTypeAndName & column = block . getByName ( it - > name ) ;
if ( permutation )
{
if ( primary_key_block . has ( it - > name ) )
{
const auto & primary_column = * primary_key_block . getByName ( it - > name ) . column ;
2020-12-18 13:14:09 +00:00
writeColumn ( * it , primary_column , offset_columns , granules_to_write ) ;
2019-10-21 00:28:29 +00:00
}
else if ( skip_indexes_block . has ( it - > name ) )
{
const auto & index_column = * skip_indexes_block . getByName ( it - > name ) . column ;
2020-12-18 13:14:09 +00:00
writeColumn ( * it , index_column , offset_columns , granules_to_write ) ;
2019-10-21 00:28:29 +00:00
}
else
{
/// We rearrange the columns that are not included in the primary key here; Then the result is released - to save RAM.
ColumnPtr permuted_column = column . column - > permute ( * permutation , 0 ) ;
2020-12-18 13:14:09 +00:00
writeColumn ( * it , * permuted_column , offset_columns , granules_to_write ) ;
2019-10-21 00:28:29 +00:00
}
}
else
{
2020-12-18 13:14:09 +00:00
writeColumn ( * it , * column . column , offset_columns , granules_to_write ) ;
2019-10-21 00:28:29 +00:00
}
}
2020-12-10 08:57:52 +00:00
if ( settings . rewrite_primary_key )
2020-12-11 13:20:19 +00:00
calculateAndSerializePrimaryIndex ( primary_key_block , granules_to_write ) ;
calculateAndSerializeSkipIndices ( skip_indexes_block , granules_to_write ) ;
2020-12-10 08:57:52 +00:00
2020-12-15 09:54:48 +00:00
shiftCurrentMark ( granules_to_write ) ;
2019-10-21 00:28:29 +00:00
}
void MergeTreeDataPartWriterWide : : writeSingleMark (
2020-10-21 23:02:20 +00:00
const NameAndTypePair & column ,
2019-10-21 00:28:29 +00:00
WrittenOffsetColumns & offset_columns ,
size_t number_of_rows ,
DB : : IDataType : : SubstreamPath & path )
{
2020-12-18 13:14:09 +00:00
StreamsWithMarks marks = getCurrentMarksForColumn ( column , offset_columns , path ) ;
2020-12-10 16:29:10 +00:00
for ( const auto & mark : marks )
flushMarkToFile ( mark , number_of_rows ) ;
}
void MergeTreeDataPartWriterWide : : flushMarkToFile ( const StreamNameAndMark & stream_with_mark , size_t rows_in_mark )
{
Stream & stream = * column_streams [ stream_with_mark . stream_name ] ;
writeIntBinary ( stream_with_mark . mark . offset_in_compressed_file , stream . marks ) ;
writeIntBinary ( stream_with_mark . mark . offset_in_decompressed_block , stream . marks ) ;
if ( settings . can_use_adaptive_granularity )
writeIntBinary ( rows_in_mark , stream . marks ) ;
}
/// Collects the current write position of every stream of `column` as a mark, without
/// writing anything to the marks files. Array-sizes (offsets) streams whose name is already
/// in `offset_columns` are skipped.
///
/// Side effect: a stream whose compressed buffer already holds at least
/// `settings.min_compress_block_size` bytes gets `next()` called on it before its position is taken.
/// Throws std::out_of_range if a stream of the column is missing from `column_streams`.
StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn(
    const NameAndTypePair & column,
    WrittenOffsetColumns & offset_columns,
    DB::IDataType::SubstreamPath & path)
{
    StreamsWithMarks result;

    column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
    {
        bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;

        String stream_name = IDataType::getFileNameForStream(column, substream_path);

        /// Don't write offsets more than one time for Nested type.
        if (is_offsets && offset_columns.count(stream_name))
            return;

        /// at() instead of operator[]: a lookup of an unknown name must not insert
        /// an empty entry into column_streams and dereference a null pointer.
        Stream & stream = *column_streams.at(stream_name);

        /// There could already be enough data to compress into the new block.
        if (stream.compressed.offset() >= settings.min_compress_block_size)
            stream.compressed.next();

        StreamNameAndMark stream_with_mark;
        stream_with_mark.stream_name = stream_name;
        stream_with_mark.mark.offset_in_compressed_file = stream.plain_hashing.count();
        stream_with_mark.mark.offset_in_decompressed_block = stream.compressed.offset();

        result.push_back(stream_with_mark);
    }, path);

    return result;
}
2020-12-11 13:20:19 +00:00
void MergeTreeDataPartWriterWide : : writeSingleGranule (
2020-10-21 23:02:20 +00:00
const NameAndTypePair & name_and_type ,
2019-10-21 00:28:29 +00:00
const IColumn & column ,
WrittenOffsetColumns & offset_columns ,
IDataType : : SerializeBinaryBulkStatePtr & serialization_state ,
IDataType : : SerializeBinaryBulkSettings & serialize_settings ,
2020-12-15 10:34:28 +00:00
const Granule & granule )
2019-10-21 00:28:29 +00:00
{
2020-12-25 18:25:59 +00:00
name_and_type . type - > serializeBinaryBulkWithMultipleStreams ( column , granule . start_row , granule . rows_to_write , serialize_settings , serialization_state ) ;
2019-10-21 00:28:29 +00:00
/// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
2020-10-21 23:02:20 +00:00
name_and_type . type - > enumerateStreams ( [ & ] ( const IDataType : : SubstreamPath & substream_path , const IDataType & /* substream_type */ )
2019-10-21 00:28:29 +00:00
{
bool is_offsets = ! substream_path . empty ( ) & & substream_path . back ( ) . type = = IDataType : : Substream : : ArraySizes ;
2020-10-21 23:02:20 +00:00
String stream_name = IDataType : : getFileNameForStream ( name_and_type , substream_path ) ;
2019-10-21 00:28:29 +00:00
/// Don't write offsets more than one time for Nested type.
if ( is_offsets & & offset_columns . count ( stream_name ) )
return ;
column_streams [ stream_name ] - > compressed . nextIfAtEnd ( ) ;
} , serialize_settings . path ) ;
}
2019-12-18 15:54:45 +00:00
/// Column must not be empty. (column.size() !== 0)
void MergeTreeDataPartWriterWide : : writeColumn (
2020-10-21 23:02:20 +00:00
const NameAndTypePair & name_and_type ,
2019-10-21 00:28:29 +00:00
const IColumn & column ,
2020-12-11 13:20:19 +00:00
WrittenOffsetColumns & offset_columns ,
const Granules & granules )
2019-10-21 00:28:29 +00:00
{
2020-12-18 13:49:45 +00:00
if ( granules . empty ( ) )
2020-12-25 18:25:59 +00:00
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Empty granules for column {}, current mark {} " , backQuoteIfNeed ( name_and_type . name ) , getCurrentMark ( ) ) ;
2020-12-18 13:49:45 +00:00
2020-12-25 18:25:59 +00:00
const auto & [ name , type ] = name_and_type ;
2019-11-07 11:11:38 +00:00
auto [ it , inserted ] = serialization_states . emplace ( name , nullptr ) ;
2020-12-15 10:34:28 +00:00
2019-11-07 11:11:38 +00:00
if ( inserted )
{
IDataType : : SerializeBinaryBulkSettings serialize_settings ;
2020-10-21 23:02:20 +00:00
serialize_settings . getter = createStreamGetter ( name_and_type , offset_columns ) ;
2020-12-25 18:25:59 +00:00
type - > serializeBinaryBulkStatePrefix ( serialize_settings , it - > second ) ;
2019-11-07 11:11:38 +00:00
}
2019-12-12 18:55:19 +00:00
const auto & global_settings = storage . global_context . getSettingsRef ( ) ;
2019-10-21 00:28:29 +00:00
IDataType : : SerializeBinaryBulkSettings serialize_settings ;
2020-10-21 23:02:20 +00:00
serialize_settings . getter = createStreamGetter ( name_and_type , offset_columns ) ;
2019-12-12 18:55:19 +00:00
serialize_settings . low_cardinality_max_dictionary_size = global_settings . low_cardinality_max_dictionary_size ;
serialize_settings . low_cardinality_use_single_dictionary_for_part = global_settings . low_cardinality_use_single_dictionary_for_part ! = 0 ;
2019-10-21 00:28:29 +00:00
2020-12-11 13:20:19 +00:00
for ( const auto & granule : granules )
2019-10-21 00:28:29 +00:00
{
2020-12-18 14:44:31 +00:00
data_written = true ;
2020-12-14 11:06:02 +00:00
2020-12-18 13:49:45 +00:00
if ( granule . mark_on_start )
{
if ( last_non_written_marks . count ( name ) )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " We have to add new mark for column, but already have non written mark. Current mark {}, total marks {}, offset {} " , getCurrentMark ( ) , index_granularity . getMarksCount ( ) , rows_written_in_last_mark ) ;
2020-12-25 18:25:59 +00:00
last_non_written_marks [ name ] = getCurrentMarksForColumn ( name_and_type , offset_columns , serialize_settings . path ) ;
2020-12-18 13:49:45 +00:00
}
2019-11-07 11:11:38 +00:00
2020-12-14 11:06:02 +00:00
writeSingleGranule (
2020-12-18 13:14:09 +00:00
name_and_type ,
2020-12-14 11:06:02 +00:00
column ,
offset_columns ,
it - > second ,
serialize_settings ,
2020-12-15 10:34:28 +00:00
granule
2019-10-21 00:28:29 +00:00
) ;
2020-12-18 13:49:45 +00:00
2020-12-18 14:44:31 +00:00
if ( granule . is_complete )
2020-12-18 13:49:45 +00:00
{
2020-12-20 08:01:39 +00:00
auto marks_it = last_non_written_marks . find ( name ) ;
if ( marks_it = = last_non_written_marks . end ( ) )
2020-12-18 13:49:45 +00:00
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " No mark was saved for incomplete granule for column {} " , backQuoteIfNeed ( name ) ) ;
2020-12-20 08:01:39 +00:00
for ( const auto & mark : marks_it - > second )
2020-12-18 13:49:45 +00:00
flushMarkToFile ( mark , index_granularity . getMarkRows ( granule . mark_number ) ) ;
2020-12-20 08:01:39 +00:00
last_non_written_marks . erase ( marks_it ) ;
2020-12-18 13:49:45 +00:00
}
2019-10-21 00:28:29 +00:00
}
2020-10-21 23:02:20 +00:00
name_and_type . type - > enumerateStreams ( [ & ] ( const IDataType : : SubstreamPath & substream_path , const IDataType & /* substream_type */ )
2019-10-21 00:28:29 +00:00
{
bool is_offsets = ! substream_path . empty ( ) & & substream_path . back ( ) . type = = IDataType : : Substream : : ArraySizes ;
if ( is_offsets )
{
2020-10-21 23:02:20 +00:00
String stream_name = IDataType : : getFileNameForStream ( name_and_type , substream_path ) ;
2019-10-21 00:28:29 +00:00
offset_columns . insert ( stream_name ) ;
}
} , serialize_settings . path ) ;
}
2020-12-14 07:28:42 +00:00
void MergeTreeDataPartWriterWide : : validateColumnOfFixedSize ( const String & name , const IDataType & type )
{
if ( ! type . isValueRepresentedByNumber ( ) | | type . haveSubtypes ( ) )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Cannot validate column of non fixed type {} " , type . getName ( ) ) ;
auto disk = data_part - > volume - > getDisk ( ) ;
String mrk_path = fullPath ( disk , part_path + name + marks_file_extension ) ;
String bin_path = fullPath ( disk , part_path + name + DATA_FILE_EXTENSION ) ;
DB : : ReadBufferFromFile mrk_in ( mrk_path ) ;
DB : : CompressedReadBufferFromFile bin_in ( bin_path , 0 , 0 , 0 ) ;
bool must_be_last = false ;
UInt64 offset_in_compressed_file = 0 ;
UInt64 offset_in_decompressed_block = 0 ;
2020-12-19 18:59:20 +00:00
UInt64 index_granularity_rows = data_part - > index_granularity_info . fixed_index_granularity ;
2020-12-14 07:28:42 +00:00
size_t mark_num ;
2020-12-14 11:06:02 +00:00
2020-12-14 07:28:42 +00:00
for ( mark_num = 0 ; ! mrk_in . eof ( ) ; + + mark_num )
{
2020-12-14 11:06:02 +00:00
if ( mark_num > index_granularity . getMarksCount ( ) )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Incorrect number of marks in memory {}, on disk (at least) { } " , index_granularity.getMarksCount(), mark_num + 1);
2020-12-14 07:28:42 +00:00
DB : : readBinary ( offset_in_compressed_file , mrk_in ) ;
DB : : readBinary ( offset_in_decompressed_block , mrk_in ) ;
if ( settings . can_use_adaptive_granularity )
DB : : readBinary ( index_granularity_rows , mrk_in ) ;
else
2020-12-19 18:59:20 +00:00
index_granularity_rows = data_part - > index_granularity_info . fixed_index_granularity ;
2020-12-14 07:28:42 +00:00
if ( must_be_last )
{
if ( index_granularity_rows ! = 0 )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " We ran out of binary data but still have non empty mark #{} with rows number {} " , mark_num , index_granularity_rows ) ;
if ( ! mrk_in . eof ( ) )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Mark #{} must be last, but we still have some to read " , mark_num ) ;
2020-12-14 11:06:02 +00:00
break ;
}
if ( index_granularity_rows = = 0 )
{
auto column = type . createColumn ( ) ;
type . deserializeBinaryBulk ( * column , bin_in , 1000000000 , 0.0 ) ;
throw Exception ( ErrorCodes : : LOGICAL_ERROR ,
" Still have {} rows in bin stream, last mark #{} index granularity size {}, last rows {} " , column - > size ( ) , mark_num , index_granularity . getMarksCount ( ) , index_granularity_rows ) ;
2020-12-14 07:28:42 +00:00
}
if ( index_granularity_rows ! = index_granularity . getMarkRows ( mark_num ) )
throw Exception (
2020-12-18 20:32:52 +00:00
ErrorCodes : : LOGICAL_ERROR , " Incorrect mark rows for part {} for mark #{} (compressed offset {}, decompressed offset {}), in-memory {}, on disk {}, total marks {} " ,
data_part - > getFullPath ( ) , mark_num , offset_in_compressed_file , offset_in_decompressed_block , index_granularity . getMarkRows ( mark_num ) , index_granularity_rows , index_granularity . getMarksCount ( ) ) ;
2020-12-14 07:28:42 +00:00
auto column = type . createColumn ( ) ;
type . deserializeBinaryBulk ( * column , bin_in , index_granularity_rows , 0.0 ) ;
if ( bin_in . eof ( ) )
{
must_be_last = true ;
}
2020-12-18 13:49:45 +00:00
/// Now they must be equal
if ( column - > size ( ) ! = index_granularity_rows )
2020-12-14 07:28:42 +00:00
{
2020-12-18 20:32:52 +00:00
if ( must_be_last & & ! settings . can_use_adaptive_granularity )
break ;
2020-12-14 07:28:42 +00:00
throw Exception (
ErrorCodes : : LOGICAL_ERROR , " Incorrect mark rows for mark #{} (compressed offset {}, decompressed offset {}), actually in bin file {}, in mrk file {} " ,
mark_num , offset_in_compressed_file , offset_in_decompressed_block , column - > size ( ) , index_granularity . getMarkRows ( mark_num ) ) ;
}
}
if ( ! mrk_in . eof ( ) )
2020-12-14 11:06:02 +00:00
throw Exception ( ErrorCodes : : LOGICAL_ERROR ,
" Still have something in marks stream, last mark #{} index granularity size {}, last rows {} " , mark_num , index_granularity . getMarksCount ( ) , index_granularity_rows ) ;
if ( ! bin_in . eof ( ) )
2020-12-14 07:28:42 +00:00
{
2020-12-14 11:06:02 +00:00
auto column = type . createColumn ( ) ;
type . deserializeBinaryBulk ( * column , bin_in , 1000000000 , 0.0 ) ;
throw Exception ( ErrorCodes : : LOGICAL_ERROR ,
" Still have {} rows in bin stream, last mark #{} index granularity size {}, last rows {} " , column - > size ( ) , mark_num , index_granularity . getMarksCount ( ) , index_granularity_rows ) ;
2020-12-14 07:28:42 +00:00
}
2019-10-21 00:28:29 +00:00
}
2020-06-25 16:55:45 +00:00
void MergeTreeDataPartWriterWide : : finishDataSerialization ( IMergeTreeDataPart : : Checksums & checksums , bool sync )
2019-10-21 17:23:06 +00:00
{
2019-12-12 18:55:19 +00:00
const auto & global_settings = storage . global_context . getSettingsRef ( ) ;
2019-10-21 17:23:06 +00:00
IDataType : : SerializeBinaryBulkSettings serialize_settings ;
2019-12-12 18:55:19 +00:00
serialize_settings . low_cardinality_max_dictionary_size = global_settings . low_cardinality_max_dictionary_size ;
serialize_settings . low_cardinality_use_single_dictionary_for_part = global_settings . low_cardinality_use_single_dictionary_for_part ! = 0 ;
2019-10-21 17:23:06 +00:00
WrittenOffsetColumns offset_columns ;
2020-12-18 13:49:45 +00:00
if ( rows_written_in_last_mark > 0 )
2020-12-21 08:24:52 +00:00
adjustLastMarkIfNeedAndFlushToDisk ( rows_written_in_last_mark ) ;
2019-10-21 17:23:06 +00:00
2019-11-18 15:18:50 +00:00
bool write_final_mark = ( with_final_mark & & data_written ) ;
2019-10-21 17:23:06 +00:00
{
auto it = columns_list . begin ( ) ;
for ( size_t i = 0 ; i < columns_list . size ( ) ; + + i , + + it )
{
if ( ! serialization_states . empty ( ) )
{
2020-10-21 23:02:20 +00:00
serialize_settings . getter = createStreamGetter ( * it , written_offset_columns ? * written_offset_columns : offset_columns ) ;
2019-11-07 11:11:38 +00:00
it - > type - > serializeBinaryBulkStateSuffix ( serialize_settings , serialization_states [ it - > name ] ) ;
2019-10-21 17:23:06 +00:00
}
if ( write_final_mark )
2020-10-21 23:02:20 +00:00
writeFinalMark ( * it , offset_columns , serialize_settings . path ) ;
2019-10-21 17:23:06 +00:00
}
}
2020-03-09 01:59:08 +00:00
for ( auto & stream : column_streams )
2019-10-21 17:23:06 +00:00
{
2020-03-09 01:59:08 +00:00
stream . second - > finalize ( ) ;
stream . second - > addToChecksums ( checksums ) ;
2020-06-25 16:55:45 +00:00
if ( sync )
stream . second - > sync ( ) ;
2019-10-21 17:23:06 +00:00
}
column_streams . clear ( ) ;
2019-10-22 17:42:59 +00:00
serialization_states . clear ( ) ;
2020-12-14 07:28:42 +00:00
# ifndef NDEBUG
2020-12-19 18:59:20 +00:00
/// Heavy weight validation of written data. Checks that we are able to read
/// data according to marks. Otherwise throws LOGICAL_ERROR (equal to about in debug mode)
2020-12-14 07:28:42 +00:00
for ( const auto & column : columns_list )
{
if ( column . type - > isValueRepresentedByNumber ( ) & & ! column . type - > haveSubtypes ( ) )
validateColumnOfFixedSize ( column . name , * column . type ) ;
}
# endif
2019-10-21 17:23:06 +00:00
}
2020-12-10 08:57:52 +00:00
void MergeTreeDataPartWriterWide : : finish ( IMergeTreeDataPart : : Checksums & checksums , bool sync )
{
finishDataSerialization ( checksums , sync ) ;
if ( settings . rewrite_primary_key )
finishPrimaryIndexSerialization ( checksums , sync ) ;
finishSkipIndicesSerialization ( checksums , sync ) ;
2019-10-21 17:23:06 +00:00
}
void MergeTreeDataPartWriterWide : : writeFinalMark (
2020-10-21 23:02:20 +00:00
const NameAndTypePair & column ,
2019-10-21 17:23:06 +00:00
WrittenOffsetColumns & offset_columns ,
DB : : IDataType : : SubstreamPath & path )
{
2020-10-21 23:02:20 +00:00
writeSingleMark ( column , offset_columns , 0 , path ) ;
2019-10-21 17:23:06 +00:00
/// Memoize information about offsets
2020-10-21 23:02:20 +00:00
column . type - > enumerateStreams ( [ & ] ( const IDataType : : SubstreamPath & substream_path , const IDataType & /* substream_type */ )
2019-10-21 17:23:06 +00:00
{
bool is_offsets = ! substream_path . empty ( ) & & substream_path . back ( ) . type = = IDataType : : Substream : : ArraySizes ;
if ( is_offsets )
{
2020-10-21 23:02:20 +00:00
String stream_name = IDataType : : getFileNameForStream ( column , substream_path ) ;
2019-10-21 17:23:06 +00:00
offset_columns . insert ( stream_name ) ;
}
} , path ) ;
}
2020-12-09 18:10:09 +00:00
static void fillIndexGranularityImpl (
MergeTreeIndexGranularity & index_granularity ,
2020-12-14 13:01:01 +00:00
size_t index_offset ,
2020-12-09 18:10:09 +00:00
size_t index_granularity_for_block ,
size_t rows_in_block )
{
2020-12-14 13:01:01 +00:00
for ( size_t current_row = index_offset ; current_row < rows_in_block ; current_row + = index_granularity_for_block )
2020-12-09 18:10:09 +00:00
index_granularity . appendMark ( index_granularity_for_block ) ;
}
void MergeTreeDataPartWriterWide : : fillIndexGranularity ( size_t index_granularity_for_block , size_t rows_in_block )
{
2020-12-14 11:06:02 +00:00
if ( getCurrentMark ( ) < index_granularity . getMarksCount ( ) & & getCurrentMark ( ) ! = index_granularity . getMarksCount ( ) - 1 )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Trying to add marks, while current mark {}, but total marks {} " , getCurrentMark ( ) , index_granularity . getMarksCount ( ) ) ;
2020-12-14 13:01:01 +00:00
size_t index_offset = 0 ;
if ( rows_written_in_last_mark ! = 0 )
index_offset = index_granularity . getLastMarkRows ( ) - rows_written_in_last_mark ;
2020-12-09 18:10:09 +00:00
fillIndexGranularityImpl (
index_granularity ,
2020-12-14 13:01:01 +00:00
index_offset ,
2020-12-09 18:10:09 +00:00
index_granularity_for_block ,
rows_in_block ) ;
}
2020-12-18 13:49:45 +00:00
2020-12-21 08:24:52 +00:00
void MergeTreeDataPartWriterWide : : adjustLastMarkIfNeedAndFlushToDisk ( size_t new_rows_in_last_mark )
2020-12-18 13:49:45 +00:00
{
2020-12-19 18:59:20 +00:00
/// We can adjust marks only if we computed granularity for blocks.
/// Otherwise we cannot change granularity because it will differ from
/// other columns
if ( compute_granularity & & settings . can_use_adaptive_granularity )
2020-12-18 20:32:52 +00:00
{
2020-12-19 18:59:20 +00:00
if ( getCurrentMark ( ) ! = index_granularity . getMarksCount ( ) - 1 )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Non last mark {} (with {} rows) having rows offset { } , total marks { } " ,
getCurrentMark ( ) , index_granularity . getMarkRows ( getCurrentMark ( ) ) , rows_written_in_last_mark , index_granularity . getMarksCount ( ) ) ;
2020-12-18 20:32:52 +00:00
index_granularity . popMark ( ) ;
2020-12-21 08:24:52 +00:00
index_granularity . appendMark ( new_rows_in_last_mark ) ;
2020-12-18 20:32:52 +00:00
}
2020-12-19 18:59:20 +00:00
/// Last mark should be filled, otherwise it's a bug
if ( last_non_written_marks . empty ( ) )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " No saved marks for last mark {} having rows offset {}, total marks {} " ,
getCurrentMark ( ) , rows_written_in_last_mark , index_granularity . getMarksCount ( ) ) ;
2020-12-21 08:24:52 +00:00
if ( rows_written_in_last_mark = = new_rows_in_last_mark )
2020-12-18 20:32:52 +00:00
{
2020-12-21 08:24:52 +00:00
for ( const auto & [ name , marks ] : last_non_written_marks )
{
for ( const auto & mark : marks )
flushMarkToFile ( mark , index_granularity . getMarkRows ( getCurrentMark ( ) ) ) ;
}
2020-12-18 13:49:45 +00:00
2020-12-21 08:24:52 +00:00
last_non_written_marks . clear ( ) ;
2020-12-18 13:49:45 +00:00
2020-12-21 08:24:52 +00:00
if ( compute_granularity & & settings . can_use_adaptive_granularity )
{
/// Also we add mark to each skip index because all of them
/// already accumulated all rows from current adjusting mark
for ( size_t i = 0 ; i < skip_indices . size ( ) ; + + i )
+ + skip_index_accumulated_marks [ i ] ;
/// This mark completed, go further
setCurrentMark ( getCurrentMark ( ) + 1 ) ;
/// Without offset
rows_written_in_last_mark = 0 ;
}
2020-12-18 20:32:52 +00:00
}
2020-12-18 13:49:45 +00:00
}
2019-10-21 00:28:29 +00:00
}