#include <Storages/MergeTree/MergeTreeDataPartWriterCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
namespace DB
{

namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
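
/// Writer for compact parts: the columns of each granule are serialized one after
/// another into a single data file, and a single marks file records, per column,
/// where each granule's compressed block starts.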

MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
    const MergeTreeData::DataPartPtr & data_part_,
    const NamesAndTypesList & columns_list_,
    const StorageMetadataPtr & metadata_snapshot_,
    const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
    const String & marks_file_extension_,
    const CompressionCodecPtr & default_codec_,
    const MergeTreeWriterSettings & settings_,
    const MergeTreeIndexGranularity & index_granularity_)
    : MergeTreeDataPartWriterOnDisk(data_part_, columns_list_, metadata_snapshot_,
        indices_to_recalc_, marks_file_extension_,
        default_codec_, settings_, index_granularity_)
    , plain_file(data_part->volume->getDisk()->writeFile(
        part_path + MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION,
        settings.max_compress_block_size,
        WriteMode::Rewrite))
    , plain_hashing(*plain_file)
    , marks_file(data_part->volume->getDisk()->writeFile(
        part_path + MergeTreeDataPartCompact::DATA_FILE_NAME + marks_file_extension_,
        4096,
        WriteMode::Rewrite))
    , marks(*marks_file)
{
    const auto & storage_columns = metadata_snapshot->getColumns();
    for (const auto & column : columns_list)
        addStreams(column, storage_columns.getCodecDescOrDefault(column.name, default_codec));
}
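
/// Substreams that resolve to the same effective compression codec share one
/// CompressedStream (keyed by the codec's hash), so data compressed with identical
/// codecs goes through the same compressing buffer.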

void MergeTreeDataPartWriterCompact::addStreams(const NameAndTypePair & column, const ASTPtr & effective_codec_desc)
{
    IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path, const IDataType & substream_type)
    {
        String stream_name = IDataType::getFileNameForStream(column, substream_path);

        /// Shared offsets for Nested type.
        if (compressed_streams.count(stream_name))
            return;

        CompressionCodecPtr compression_codec;

        /// If we can use a special codec, then just get it
        if (IDataType::isSpecialCompressionAllowed(substream_path))
            compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, &substream_type, default_codec);
        else /// otherwise return only generic codecs and don't use info about the data type
            compression_codec = CompressionCodecFactory::instance().get(effective_codec_desc, nullptr, default_codec, true);

        UInt64 codec_id = compression_codec->getHash();
        auto & stream = streams_by_codec[codec_id];
        if (!stream)
            stream = std::make_shared<CompressedStream>(plain_hashing, compression_codec);

        compressed_streams.emplace(stream_name, stream);
    };

    IDataType::SubstreamPath stream_path;
    column.type->enumerateStreams(callback, stream_path);
}

namespace
{

/// Get granules for the block using index_granularity
Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity, size_t block_rows, size_t current_mark, bool last_block)
{
    if (current_mark >= index_granularity.getMarksCount())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Request to get granules from mark {} but index granularity size is {}", current_mark, index_granularity.getMarksCount());

    Granules result;
    size_t current_row = 0;
    while (current_row < block_rows)
    {
        size_t expected_rows_in_mark = index_granularity.getMarkRows(current_mark);
        size_t rows_left_in_block = block_rows - current_row;
        if (rows_left_in_block < expected_rows_in_mark && !last_block)
        {
            /// Invariant: in compact parts we always have a full granule's worth of rows per block, because we accumulate them in a buffer.
            /// The only exception is the last block, when we cannot accumulate more rows.
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Required to write {} rows, but only {} rows were written for the non-last granule", expected_rows_in_mark, rows_left_in_block);
        }

        result.emplace_back(Granule{
            .start_row = current_row,
            .rows_to_write = std::min(rows_left_in_block, expected_rows_in_mark),
            .mark_number = current_mark,
            .mark_on_start = true,
            .is_complete = (rows_left_in_block >= expected_rows_in_mark)
        });
        current_row += result.back().rows_to_write;
        current_mark++;
    }

    return result;
}
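
/// Example (a sketch, assuming a fixed granularity of 8192 rows and current_mark = 0):
/// a buffered block of 20000 rows splits into granules of 8192, 8192 and 3616 rows;
/// the incomplete 3616-row tail is only allowed when last_block is true.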

/// Write a single granule of one column (rows between two marks)
void writeColumnSingleGranule(
    const ColumnWithTypeAndName & column,
    IDataType::OutputStreamGetter stream_getter,
    size_t from_row,
    size_t number_of_rows)
{
    IDataType::SerializeBinaryBulkStatePtr state;
    IDataType::SerializeBinaryBulkSettings serialize_settings;

    serialize_settings.getter = stream_getter;
    serialize_settings.position_independent_encoding = true;
    serialize_settings.low_cardinality_max_dictionary_size = 0;

    column.type->serializeBinaryBulkStatePrefix(serialize_settings, state);
    column.type->serializeBinaryBulkWithMultipleStreams(*column.column, from_row, number_of_rows, serialize_settings, state);
    column.type->serializeBinaryBulkStateSuffix(serialize_settings, state);
}

}
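
/// Incoming blocks are accumulated in columns_buffer; nothing is flushed until the
/// buffer holds at least as many rows as the current mark expects, so only whole
/// granules are ever written.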

void MergeTreeDataPartWriterCompact::write(const Block & block, const IColumn::Permutation * permutation)
{
    /// Fill index granularity for this block if it's unknown
    /// (in the case of an insert or a horizontal merge, but not a vertical merge).
    if (compute_granularity)
    {
        size_t index_granularity_for_block = computeIndexGranularity(block);
        fillIndexGranularity(index_granularity_for_block, block.rows());
    }

    Block result_block = permuteBlockIfNeeded(block, permutation);

    if (!header)
        header = result_block.cloneEmpty();

    columns_buffer.add(result_block.mutateColumns());

    size_t current_mark_rows = index_granularity.getMarkRows(getCurrentMark());
    size_t rows_in_buffer = columns_buffer.size();

    if (rows_in_buffer >= current_mark_rows)
    {
        Block flushed_block = header.cloneWithColumns(columns_buffer.releaseColumns());
        auto granules_to_write = getGranulesToWrite(index_granularity, flushed_block.rows(), getCurrentMark(), /* last_block = */ false);
        writeDataBlockPrimaryIndexAndSkipIndices(flushed_block, granules_to_write);
        setCurrentMark(getCurrentMark() + granules_to_write.size());
    }
}

void MergeTreeDataPartWriterCompact::writeDataBlockPrimaryIndexAndSkipIndices(const Block & block, const Granules & granules_to_write)
{
    writeDataBlock(block, granules_to_write);

    if (settings.rewrite_primary_key)
    {
        Block primary_key_block = getBlockAndPermute(block, metadata_snapshot->getPrimaryKeyColumns(), nullptr);
        calculateAndSerializePrimaryIndex(primary_key_block, granules_to_write);
    }

    Block skip_indices_block = getBlockAndPermute(block, getSkipIndicesColumns(), nullptr);
    calculateAndSerializeSkipIndices(skip_indices_block, granules_to_write);
}
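
/// For each granule, the mark stores one pair per column: the offset of the column's
/// compressed block in the data file and the offset within the decompressed block
/// (always 0 here, because every granule starts a new compressed block). A single
/// row count for the granule follows the pairs.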

void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const Granules & granules)
{
    for (const auto & granule : granules)
    {
        data_written = true;

        auto name_and_type = columns_list.begin();
        for (size_t i = 0; i < columns_list.size(); ++i, ++name_and_type)
        {
            /// Tricky part, because we share compressed streams between different column substreams.
            /// Compressed streams write data to a single file, but with different compression codecs.
            /// So we flush each stream (using next()) before switching to a new one, because otherwise
            /// we would overwrite data in the result file.
            CompressedStreamPtr prev_stream;
            auto stream_getter = [&, this](const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
            {
                String stream_name = IDataType::getFileNameForStream(*name_and_type, substream_path);

                auto & result_stream = compressed_streams[stream_name];
                /// Write one compressed block per column per granule for more optimal reading.
                if (prev_stream && prev_stream != result_stream)
                {
                    /// Offset should be 0, because a compressed block is written for every granule.
                    assert(result_stream->hashing_buf.offset() == 0);
                    prev_stream->hashing_buf.next();
                }

                prev_stream = result_stream;

                return &result_stream->hashing_buf;
            };

            writeIntBinary(plain_hashing.count(), marks);
            writeIntBinary(UInt64(0), marks);

            writeColumnSingleGranule(block.getByName(name_and_type->name), stream_getter, granule.start_row, granule.rows_to_write);

            /// Each type always has at least one substream
            prev_stream->hashing_buf.next(); //-V522
        }

        writeIntBinary(granule.rows_to_write, marks);
    }
}
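
/// Flushes the remaining buffered rows as the last (possibly incomplete) granule,
/// shrinking the last mark to the actual row count if needed, and optionally writes
/// a final mark pointing past the end of the data.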

void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync)
{
    if (columns_buffer.size() != 0)
    {
        auto block = header.cloneWithColumns(columns_buffer.releaseColumns());
        auto granules_to_write = getGranulesToWrite(index_granularity, block.rows(), getCurrentMark(), /* last_block = */ true);
        if (!granules_to_write.back().is_complete)
        {
            /// Correct the last mark, as it must contain the exact number of rows.
            index_granularity.popMark();
            index_granularity.appendMark(granules_to_write.back().rows_to_write);
        }
        writeDataBlockPrimaryIndexAndSkipIndices(block, granules_to_write);
    }

#ifndef NDEBUG
    /// Offsets should be 0, because a compressed block is written for every granule.
    for (const auto & [_, stream] : streams_by_codec)
        assert(stream->hashing_buf.offset() == 0);
#endif

    if (with_final_mark && data_written)
    {
        for (size_t i = 0; i < columns_list.size(); ++i)
        {
            writeIntBinary(plain_hashing.count(), marks);
            writeIntBinary(UInt64(0), marks);
        }
        writeIntBinary(UInt64(0), marks);
    }

    plain_file->next();
    marks.next();
    addToChecksums(checksums);

    plain_file->finalize();
    marks_file->finalize();
    if (sync)
    {
        plain_file->sync();
        marks_file->sync();
    }
}

static void fillIndexGranularityImpl(
    MergeTreeIndexGranularity & index_granularity,
    size_t index_offset,
    size_t index_granularity_for_block,
    size_t rows_in_block)
{
    for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
    {
        size_t rows_left_in_block = rows_in_block - current_row;

        /// Try to extend the last granule if the block is large enough,
        /// or if it isn't the first in the granule (index_offset != 0).
        if (rows_left_in_block < index_granularity_for_block &&
            (rows_in_block >= index_granularity_for_block || index_offset != 0))
        {
            // If enough rows are left, create a new granule. Otherwise, extend the previous granule.
            // So the real size of a granule differs from index_granularity_for_block by no more than 50%.
            if (rows_left_in_block * 2 >= index_granularity_for_block)
                index_granularity.appendMark(rows_left_in_block);
            else
                index_granularity.addRowsToLastMark(rows_left_in_block);
        }
        else
        {
            index_granularity.appendMark(index_granularity_for_block);
        }
    }
}
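
/// Example (a sketch, assuming index_granularity_for_block = 100 and index_offset = 0):
/// a block of 160 rows produces marks of 100 and 60 rows (60 * 2 >= 100, so the tail
/// becomes its own granule), while a block of 130 rows produces a single mark of
/// 130 rows (the 30-row tail is merged into the previous granule).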

void MergeTreeDataPartWriterCompact::fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block)
{
    size_t index_offset = 0;
    if (index_granularity.getMarksCount() > getCurrentMark())
        index_offset = index_granularity.getMarkRows(getCurrentMark()) - columns_buffer.size();

    fillIndexGranularityImpl(
        index_granularity,
        index_offset,
        index_granularity_for_block,
        rows_in_block);
}
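
/// The uncompressed size and hash of the data file are accumulated across all
/// per-codec streams: each stream's hash is folded into the running value with
/// CityHash128WithSeed.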

void MergeTreeDataPartWriterCompact::addToChecksums(MergeTreeDataPartChecksums & checksums)
{
    String data_file_name = MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION;
    String marks_file_name = MergeTreeDataPartCompact::DATA_FILE_NAME + marks_file_extension;

    size_t uncompressed_size = 0;
    CityHash_v1_0_2::uint128 uncompressed_hash{0, 0};

    for (const auto & [_, stream] : streams_by_codec)
    {
        uncompressed_size += stream->hashing_buf.count();
        auto stream_hash = stream->hashing_buf.getHash();
        uncompressed_hash = CityHash_v1_0_2::CityHash128WithSeed(
            reinterpret_cast<char *>(&stream_hash), sizeof(stream_hash), uncompressed_hash);
    }

    checksums.files[data_file_name].is_compressed = true;
    checksums.files[data_file_name].uncompressed_size = uncompressed_size;
    checksums.files[data_file_name].uncompressed_hash = uncompressed_hash;
    checksums.files[data_file_name].file_size = plain_hashing.count();
    checksums.files[data_file_name].file_hash = plain_hashing.getHash();
    checksums.files[marks_file_name].file_size = marks.count();
    checksums.files[marks_file_name].file_hash = marks.getHash();
}
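
/// ColumnsBuffer concatenates the columns of incoming blocks until enough rows are
/// accumulated for at least one whole granule.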

void MergeTreeDataPartWriterCompact::ColumnsBuffer::add(MutableColumns && columns)
{
    if (accumulated_columns.empty())
        accumulated_columns = std::move(columns);
    else
    {
        for (size_t i = 0; i < columns.size(); ++i)
            accumulated_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
    }
}

Columns MergeTreeDataPartWriterCompact::ColumnsBuffer::releaseColumns()
{
    Columns res(std::make_move_iterator(accumulated_columns.begin()),
        std::make_move_iterator(accumulated_columns.end()));
    accumulated_columns.clear();
    return res;
}

size_t MergeTreeDataPartWriterCompact::ColumnsBuffer::size() const
{
    if (accumulated_columns.empty())
        return 0;
    return accumulated_columns.at(0)->size();
}

void MergeTreeDataPartWriterCompact::finish(IMergeTreeDataPart::Checksums & checksums, bool sync)
{
    finishDataSerialization(checksums, sync);
    if (settings.rewrite_primary_key)
        finishPrimaryIndexSerialization(checksums, sync);
    finishSkipIndicesSerialization(checksums, sync);
}

}