#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <IO/createWriteBufferFromFileBase.h>
#include <Common/escapeForFileName.h>
#include <DataTypes/NestedUtils.h>
#include <DataStreams/MarkInCompressedFile.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/typeid_cast.h>
#include <Common/MemoryTracker.h>
#include <Poco/File.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int LOGICAL_ERROR;
    extern const int NOT_IMPLEMENTED;
}

namespace
{

constexpr auto DATA_FILE_EXTENSION = ".bin";
constexpr auto FIXED_MARKS_FILE_EXTENSION = ".mrk";
constexpr auto ADAPTIVE_MARKS_FILE_EXTENSION = ".mrk2";
constexpr auto FIXED_MARK_BYTE_SIZE = sizeof(MarkInCompressedFile);
constexpr auto ADAPTIVE_MARK_BYTE_SIZE = sizeof(MarkInCompressedFile) + sizeof(size_t);
constexpr auto INDEX_FILE_EXTENSION = ".idx";

}
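
/// Note on mark formats: with fixed index granularity (".mrk") each mark is a MarkInCompressedFile,
/// i.e. a pair of offsets (position of the compressed block in the .bin file and position inside the
/// decompressed block). With adaptive granularity (".mrk2") the same pair is followed by one extra
/// size_t holding the number of rows in the granule, hence
/// ADAPTIVE_MARK_BYTE_SIZE == FIXED_MARK_BYTE_SIZE + sizeof(size_t).
/// The format is chosen from storage.index_granularity_bytes: zero disables adaptive granularity.
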
/// Implementation of IMergedBlockOutputStream.

IMergedBlockOutputStream::IMergedBlockOutputStream(
    MergeTreeData & storage_,
    size_t min_compress_block_size_,
    size_t max_compress_block_size_,
    CompressionCodecPtr codec_,
    size_t aio_threshold_,
    bool blocks_are_granules_size_,
    const std::vector<size_t> & index_granularity_)
    : storage(storage_)
    , min_compress_block_size(min_compress_block_size_)
    , max_compress_block_size(max_compress_block_size_)
    , aio_threshold(aio_threshold_)
    , marks_file_extension(storage.index_granularity_bytes == 0 ? FIXED_MARKS_FILE_EXTENSION : ADAPTIVE_MARKS_FILE_EXTENSION)
    , mark_size_in_bytes(storage.index_granularity_bytes == 0 ? FIXED_MARK_BYTE_SIZE : ADAPTIVE_MARK_BYTE_SIZE)
    , blocks_are_granules_size(blocks_are_granules_size_)
    , index_granularity(index_granularity_)
    , compute_granularity(index_granularity.empty())
    , codec(std::move(codec_))
{
    if (blocks_are_granules_size && !index_granularity.empty())
        throw Exception("Can't take information about index granularity from blocks, when non empty index_granularity array specified", ErrorCodes::LOGICAL_ERROR);
}

void IMergedBlockOutputStream::addStreams(
    const String & path,
    const String & name,
    const IDataType & type,
    const CompressionCodecPtr & effective_codec,
    size_t estimated_size,
    bool skip_offsets)
{
    IDataType::StreamCallback callback = [&] (const IDataType::SubstreamPath & substream_path)
    {
        if (skip_offsets && !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes)
            return;

        String stream_name = IDataType::getFileNameForStream(name, substream_path);

        /// Shared offsets for Nested type.
        if (column_streams.count(stream_name))
            return;

        column_streams[stream_name] = std::make_unique<ColumnStream>(
            stream_name,
            path + stream_name, DATA_FILE_EXTENSION,
            path + stream_name, marks_file_extension,
            effective_codec,
            max_compress_block_size,
            estimated_size,
            aio_threshold);
    };

    IDataType::SubstreamPath stream_path;
    type.enumerateStreams(callback, stream_path);
}


IDataType::OutputStreamGetter IMergedBlockOutputStream::createStreamGetter(
    const String & name, WrittenOffsetColumns & offset_columns, bool skip_offsets)
{
    return [&, skip_offsets] (const IDataType::SubstreamPath & substream_path) -> WriteBuffer *
    {
        bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
        if (is_offsets && skip_offsets)
            return nullptr;

        String stream_name = IDataType::getFileNameForStream(name, substream_path);

        /// Don't write offsets more than one time for Nested type.
        if (is_offsets && offset_columns.count(stream_name))
            return nullptr;

        return &column_streams[stream_name]->compressed;
    };
}

void fillIndexGranularityImpl(
    const Block & block,
    size_t index_granularity_bytes,
    size_t index_granularity_rows,
    bool blocks_are_granules,
    size_t index_offset,
    std::vector<size_t> & index_granularity)
{
    size_t rows_in_block = block.rows();
    size_t index_granularity_for_block;
    if (index_granularity_bytes == 0)
        index_granularity_for_block = index_granularity_rows;
    else
    {
        size_t block_size_in_memory = block.bytes();
        if (blocks_are_granules)
            index_granularity_for_block = rows_in_block;
        else if (block_size_in_memory >= index_granularity_bytes)
        {
            //std::cerr << "BLOCK SIZE In MEMORY:" << block_size_in_memory << std::endl;
            size_t granules_in_block = block_size_in_memory / index_granularity_bytes;
            //std::cerr << "GRANULES IN BLOCK:" << granules_in_block << std::endl;
            index_granularity_for_block = rows_in_block / granules_in_block;
        }
        else
        {
            size_t size_of_row_in_bytes = block_size_in_memory / rows_in_block;
            index_granularity_for_block = index_granularity_bytes / size_of_row_in_bytes;
        }
    }

    if (index_granularity_for_block == 0) /// very rare case when index_granularity_bytes is less than a single row
        index_granularity_for_block = 1;

    //std::cerr << "GRANULARITY SIZE IN ROWS:" << index_granularity_for_block << std::endl;

    for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
        index_granularity.push_back(index_granularity_for_block);
}
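
/// Illustrative example for fillIndexGranularityImpl above (adaptive granularity, i.e.
/// index_granularity_bytes != 0 and blocks_are_granules == false): a block of 100000 rows that
/// occupies 10 MiB in memory with index_granularity_bytes = 1 MiB gives granules_in_block = 10 and
/// index_granularity_for_block = 10000 rows, so ten entries of 10000 are appended to
/// index_granularity (fewer if index_offset leaves less than a full block to cover).
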
void IMergedBlockOutputStream::fillIndexGranularity(const Block & block)
{
    fillIndexGranularityImpl(
        block,
        storage.index_granularity_bytes,
        storage.index_granularity,
        blocks_are_granules_size,
        index_offset,
        index_granularity);
}

size_t IMergedBlockOutputStream::writeSingleGranule(
    const String & name,
    const IDataType & type,
    const IColumn & column,
    WrittenOffsetColumns & offset_columns,
    bool skip_offsets,
    IDataType::SerializeBinaryBulkStatePtr & serialization_state,
    IDataType::SerializeBinaryBulkSettings & serialize_settings,
    size_t from_row,
    size_t number_of_rows,
    bool write_marks)
{
    if (write_marks)
    {
        /// Write marks.
        type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
        {
            bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
            if (is_offsets && skip_offsets)
                return;

            String stream_name = IDataType::getFileNameForStream(name, substream_path);

            /// Don't write offsets more than one time for Nested type.
            if (is_offsets && offset_columns.count(stream_name))
                return;

            ColumnStream & stream = *column_streams[stream_name];

            /// There could already be enough data to compress into the new block.
            if (stream.compressed.offset() >= min_compress_block_size)
                stream.compressed.next();

            writeIntBinary(stream.plain_hashing.count(), stream.marks);
            writeIntBinary(stream.compressed.offset(), stream.marks);
            if (stream.marks_file_extension != FIXED_MARKS_FILE_EXTENSION)
                writeIntBinary(number_of_rows, stream.marks);
        }, serialize_settings.path);
    }

    type.serializeBinaryBulkWithMultipleStreams(column, from_row, number_of_rows, serialize_settings, serialization_state);

    /// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
    type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
    {
        bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
        if (is_offsets && skip_offsets)
            return;

        String stream_name = IDataType::getFileNameForStream(name, substream_path);

        /// Don't write offsets more than one time for Nested type.
        if (is_offsets && offset_columns.count(stream_name))
            return;

        column_streams[stream_name]->compressed.nextIfAtEnd();
    }, serialize_settings.path);

    return from_row + number_of_rows;
}
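
/// Each mark written by writeSingleGranule above consists of plain_hashing.count() (the position of
/// the current compressed block in the .bin file) and compressed.offset() (the position of the first
/// row of the granule inside the decompressed block); for the adaptive ".mrk2" format the number of
/// rows in the granule is appended as well, which matches ADAPTIVE_MARK_BYTE_SIZE declared above.
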
std::pair<size_t, size_t> IMergedBlockOutputStream::writeColumn(
    const String & name,
    const IDataType & type,
    const IColumn & column,
    WrittenOffsetColumns & offset_columns,
    bool skip_offsets,
    IDataType::SerializeBinaryBulkStatePtr & serialization_state,
    size_t from_mark)
{
    auto & settings = storage.global_context.getSettingsRef();
    IDataType::SerializeBinaryBulkSettings serialize_settings;
    serialize_settings.getter = createStreamGetter(name, offset_columns, skip_offsets);
    serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
    serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;

    size_t total_rows = column.size();
    size_t current_row = 0;
    //std::cerr << "FROM MARK:" << from_mark << std::endl;
    size_t current_column_mark = from_mark;
    while (current_row < total_rows)
    {
        size_t rows_to_write;
        bool write_marks = true;

        /// If there is `index_offset`, then the first mark goes not immediately, but after this number of rows.
        if (current_row == 0 && index_offset != 0)
        {
            write_marks = false;
            rows_to_write = index_offset;
        }
        else
        {
            if (index_granularity.size() <= current_column_mark)
                throw Exception(
                    "Incorrect size of index granularity expect mark " + toString(current_column_mark) + " totally have marks " + toString(index_granularity.size()),
                    ErrorCodes::LOGICAL_ERROR);

            rows_to_write = index_granularity[current_column_mark];
        }

        current_row = writeSingleGranule(
            name,
            type,
            column,
            offset_columns,
            skip_offsets,
            serialization_state,
            serialize_settings,
            current_row,
            rows_to_write,
            write_marks
        );

        if (write_marks)
            current_column_mark++;

        //std::cerr << "current column mark (loop):" << current_column_mark << std::endl;
    }
    //std::cerr << "Current column mark:" << current_column_mark << std::endl;

    /// Memoize offsets for Nested types, that are already written. They will not be written again for next columns of Nested structure.
    type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
    {
        bool is_offsets = !substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes;
        if (is_offsets)
        {
            String stream_name = IDataType::getFileNameForStream(name, substream_path);
            offset_columns.insert(stream_name);
        }
    }, serialize_settings.path);

    return std::make_pair(current_column_mark, current_row - total_rows);
}
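
/// The pair returned by writeColumn above is (index of the first mark that has not been written yet,
/// number of rows by which the last granule overshoots the end of this column). Callers store the
/// second value into index_offset, so the next block starts with that many rows before its first
/// mark; this is how a granule is allowed to span a block boundary.
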
/// Implementation of IMergedBlockOutputStream::ColumnStream.

IMergedBlockOutputStream::ColumnStream::ColumnStream(
    const String & escaped_column_name_,
    const String & data_path,
    const std::string & data_file_extension_,
    const std::string & marks_path,
    const std::string & marks_file_extension_,
    const CompressionCodecPtr & compression_codec,
    size_t max_compress_block_size,
    size_t estimated_size,
    size_t aio_threshold) :
    escaped_column_name(escaped_column_name_),
    data_file_extension{data_file_extension_},
    marks_file_extension{marks_file_extension_},
    plain_file(createWriteBufferFromFileBase(data_path + data_file_extension, estimated_size, aio_threshold, max_compress_block_size)),
    plain_hashing(*plain_file), compressed_buf(plain_hashing, compression_codec), compressed(compressed_buf),
    marks_file(marks_path + marks_file_extension, 4096, O_TRUNC | O_CREAT | O_WRONLY), marks(marks_file)
{
}

void IMergedBlockOutputStream::ColumnStream::finalize()
{
    compressed.next();
    plain_file->next();
    marks.next();
}

void IMergedBlockOutputStream::ColumnStream::sync()
{
    plain_file->sync();
    marks_file.sync();
}

void IMergedBlockOutputStream::ColumnStream::addToChecksums(MergeTreeData::DataPart::Checksums & checksums)
{
    String name = escaped_column_name;

    checksums.files[name + data_file_extension].is_compressed = true;
    checksums.files[name + data_file_extension].uncompressed_size = compressed.count();
    checksums.files[name + data_file_extension].uncompressed_hash = compressed.getHash();
    checksums.files[name + data_file_extension].file_size = plain_hashing.count();
    checksums.files[name + data_file_extension].file_hash = plain_hashing.getHash();

    checksums.files[name + marks_file_extension].file_size = marks.count();
    checksums.files[name + marks_file_extension].file_hash = marks.getHash();
}

/// Implementation of MergedBlockOutputStream.

MergedBlockOutputStream::MergedBlockOutputStream(
    MergeTreeData & storage_,
    String part_path_,
    const NamesAndTypesList & columns_list_,
    CompressionCodecPtr default_codec_,
    bool blocks_are_granules_size_,
    const std::vector<size_t> & index_granularity_)
    : IMergedBlockOutputStream(
        storage_, storage_.global_context.getSettings().min_compress_block_size,
        storage_.global_context.getSettings().max_compress_block_size, default_codec_,
        storage_.global_context.getSettings().min_bytes_to_use_direct_io,
        blocks_are_granules_size_,
        index_granularity_),
    columns_list(columns_list_), part_path(part_path_)
{
    init();
    for (const auto & it : columns_list)
    {
        const auto columns = storage.getColumns();
        addStreams(part_path, it.name, *it.type, columns.getCodecOrDefault(it.name, default_codec_), 0, false);
    }
}

MergedBlockOutputStream::MergedBlockOutputStream(
    MergeTreeData & storage_,
    String part_path_,
    const NamesAndTypesList & columns_list_,
    CompressionCodecPtr default_codec_,
    const MergeTreeData::DataPart::ColumnToSize & merged_column_to_size_,
    size_t aio_threshold_,
    bool blocks_are_granules_size_,
    const std::vector<size_t> & index_granularity_)
    : IMergedBlockOutputStream(
        storage_, storage_.global_context.getSettings().min_compress_block_size,
        storage_.global_context.getSettings().max_compress_block_size, default_codec_,
        aio_threshold_, blocks_are_granules_size_, index_granularity_),
    columns_list(columns_list_), part_path(part_path_)
{
    init();

    /// If the summary size is more than the threshold, we will use AIO.
    size_t total_size = 0;
    if (aio_threshold > 0)
    {
        for (const auto & it : columns_list)
        {
            auto it2 = merged_column_to_size_.find(it.name);
            if (it2 != merged_column_to_size_.end())
                total_size += it2->second;
        }
    }

    for (const auto & it : columns_list)
    {
        const auto columns = storage.getColumns();
        addStreams(part_path, it.name, *it.type, columns.getCodecOrDefault(it.name, default_codec_), total_size, false);
    }
}

std::string MergedBlockOutputStream::getPartPath() const
{
    return part_path;
}

/// If data is pre-sorted.
void MergedBlockOutputStream::write(const Block & block)
{
    writeImpl(block, nullptr);
}

/** If the data is not sorted, but we pre-calculated the permutation, after which they will be sorted.
  * This method is used to save RAM, since you do not need to keep two blocks at once - the source and the sorted.
  */
void MergedBlockOutputStream::writeWithPermutation(const Block & block, const IColumn::Permutation * permutation)
{
    writeImpl(block, permutation);
}

void MergedBlockOutputStream::writeSuffix()
{
    throw Exception("Method writeSuffix is not supported by MergedBlockOutputStream", ErrorCodes::NOT_IMPLEMENTED);
}

void MergedBlockOutputStream::writeSuffixAndFinalizePart(
    MergeTreeData::MutableDataPartPtr & new_part,
    const NamesAndTypesList * total_column_list,
    MergeTreeData::DataPart::Checksums * additional_column_checksums)
{
    /// Finish columns serialization.
    if (!serialization_states.empty())
    {
        auto & settings = storage.global_context.getSettingsRef();
        IDataType::SerializeBinaryBulkSettings serialize_settings;
        serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
        serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;
        WrittenOffsetColumns offset_columns;
        auto it = columns_list.begin();
        for (size_t i = 0; i < columns_list.size(); ++i, ++it)
        {
            serialize_settings.getter = createStreamGetter(it->name, offset_columns, false);
            it->type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
        }
    }

    /// Finish skip index serialization.
    for (size_t i = 0; i < storage.skip_indices.size(); ++i)
    {
        auto & stream = *skip_indices_streams[i];
        if (!skip_indices_aggregators[i]->empty())
            skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
    }

    if (!total_column_list)
        total_column_list = &columns_list;

    /// Finish write and get checksums.
    MergeTreeData::DataPart::Checksums checksums;

    if (additional_column_checksums)
        checksums = std::move(*additional_column_checksums);

    if (index_stream)
    {
        index_stream->next();
        checksums.files["primary.idx"].file_size = index_stream->count();
        checksums.files["primary.idx"].file_hash = index_stream->getHash();
        index_stream = nullptr;
    }

    for (auto & stream : skip_indices_streams)
    {
        stream->finalize();
        stream->addToChecksums(checksums);
    }

    skip_indices_streams.clear();
    skip_indices_aggregators.clear();
    skip_index_filling.clear();

    for (ColumnStreams::iterator it = column_streams.begin(); it != column_streams.end(); ++it)
    {
        it->second->finalize();
        it->second->addToChecksums(checksums);
    }

    column_streams.clear();

    if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
    {
        new_part->partition.store(storage, part_path, checksums);
        if (new_part->minmax_idx.initialized)
            new_part->minmax_idx.store(storage, part_path, checksums);
        else if (rows_count)
            throw Exception("MinMax index was not initialized for new non-empty part " + new_part->name
                + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);

        WriteBufferFromFile count_out(part_path + "count.txt", 4096);
        HashingWriteBuffer count_out_hashing(count_out);
        writeIntText(rows_count, count_out_hashing);
        count_out_hashing.next();
        checksums.files["count.txt"].file_size = count_out_hashing.count();
        checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
    }

    {
        /// Write a file with a description of columns.
        WriteBufferFromFile out(part_path + "columns.txt", 4096);
        total_column_list->writeText(out);
    }

    {
        /// Write file with checksums.
        WriteBufferFromFile out(part_path + "checksums.txt", 4096);
        checksums.write(out);
    }

    new_part->rows_count = rows_count;
    //std::cerr << "SETTING CURRENT MARK FOR PART:" << part_path << " to " << current_mark << std::endl;
    new_part->marks_count = current_mark;
    new_part->modification_time = time(nullptr);
    new_part->columns = *total_column_list;
    new_part->index.assign(std::make_move_iterator(index_columns.begin()), std::make_move_iterator(index_columns.end()));
    new_part->checksums = checksums;
    new_part->bytes_on_disk = checksums.getTotalSizeOnDisk();
    new_part->marks_file_extension = marks_file_extension;
    new_part->marks_index_granularity.swap(index_granularity);
    new_part->mark_size_in_bytes = mark_size_in_bytes;
}

void MergedBlockOutputStream::init()
{
    Poco::File(part_path).createDirectories();

    if (storage.hasPrimaryKey())
    {
        index_file_stream = std::make_unique<WriteBufferFromFile>(
            part_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
        index_stream = std::make_unique<HashingWriteBuffer>(*index_file_stream);
    }

    for (const auto & index : storage.skip_indices)
    {
        String stream_name = index->getFileName();
        skip_indices_streams.emplace_back(
            std::make_unique<ColumnStream>(
                stream_name,
                part_path + stream_name, INDEX_FILE_EXTENSION,
                part_path + stream_name, marks_file_extension,
                codec, max_compress_block_size,
                0, aio_threshold));

        skip_indices_aggregators.push_back(index->createIndexAggregator());
        skip_index_filling.push_back(0);
    }
}

void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Permutation * permutation)
{
    block.checkNumberOfRows();
    size_t rows = block.rows();
    if (!rows)
        return;

    /// Fill index granularity for this block
    /// if it's unknown (in case of insert data or horizontal merge,
    /// but not in case of vertical merge)
    if (compute_granularity)
        fillIndexGranularity(block);

    /// The set of written offset columns so that you do not write shared offsets of nested structures columns several times
    WrittenOffsetColumns offset_columns;

    auto primary_key_column_names = storage.primary_key_columns;

    std::set<String> skip_indexes_column_names_set;
    for (const auto & index : storage.skip_indices)
        std::copy(index->columns.cbegin(), index->columns.cend(),
                std::inserter(skip_indexes_column_names_set, skip_indexes_column_names_set.end()));
    Names skip_indexes_column_names(skip_indexes_column_names_set.begin(), skip_indexes_column_names_set.end());

    /// Here we will add the columns related to the Primary Key, then write the index.
    std::vector<ColumnWithTypeAndName> primary_key_columns(primary_key_column_names.size());
    std::map<String, size_t> primary_key_column_name_to_position;

    for (size_t i = 0, size = primary_key_column_names.size(); i < size; ++i)
    {
        const auto & name = primary_key_column_names[i];

        if (!primary_key_column_name_to_position.emplace(name, i).second)
            throw Exception("Primary key contains duplicate columns", ErrorCodes::BAD_ARGUMENTS);

        primary_key_columns[i] = block.getByName(name);

        /// Reorder primary key columns in advance and add them to `primary_key_columns`.
        if (permutation)
            primary_key_columns[i].column = primary_key_columns[i].column->permute(*permutation, 0);
    }

    /// The same for skip indexes columns.
    std::vector<ColumnWithTypeAndName> skip_indexes_columns(skip_indexes_column_names.size());
    std::map<String, size_t> skip_indexes_column_name_to_position;

    for (size_t i = 0, size = skip_indexes_column_names.size(); i < size; ++i)
    {
        const auto & name = skip_indexes_column_names[i];
        skip_indexes_column_name_to_position.emplace(name, i);
        skip_indexes_columns[i] = block.getByName(name);

        /// Reorder index columns in advance.
        if (permutation)
            skip_indexes_columns[i].column = skip_indexes_columns[i].column->permute(*permutation, 0);
    }

    if (index_columns.empty())
    {
        index_columns.resize(primary_key_column_names.size());
        for (size_t i = 0, size = primary_key_column_names.size(); i < size; ++i)
            index_columns[i] = primary_key_columns[i].column->cloneEmpty();
    }

    if (serialization_states.empty())
    {
        serialization_states.reserve(columns_list.size());
        WrittenOffsetColumns tmp_offset_columns;
        IDataType::SerializeBinaryBulkSettings settings;

        for (const auto & col : columns_list)
        {
            settings.getter = createStreamGetter(col.name, tmp_offset_columns, false);
            serialization_states.emplace_back(nullptr);
            col.type->serializeBinaryBulkStatePrefix(settings, serialization_states.back());
        }
    }

    size_t new_index_offset = 0;

    /// Now write the data.
    auto it = columns_list.begin();
    for (size_t i = 0; i < columns_list.size(); ++i, ++it)
    {
        const ColumnWithTypeAndName & column = block.getByName(it->name);

        if (permutation)
        {
            auto primary_column_it = primary_key_column_name_to_position.find(it->name);
            auto skip_index_column_it = skip_indexes_column_name_to_position.find(it->name);
            if (primary_key_column_name_to_position.end() != primary_column_it)
            {
                const auto & primary_column = *primary_key_columns[primary_column_it->second].column;
                std::tie(std::ignore, new_index_offset) = writeColumn(column.name, *column.type, primary_column, offset_columns, false, serialization_states[i], current_mark);
            }
            else if (skip_indexes_column_name_to_position.end() != skip_index_column_it)
            {
                const auto & index_column = *skip_indexes_columns[skip_index_column_it->second].column;
                writeColumn(column.name, *column.type, index_column, offset_columns, false, serialization_states[i], current_mark);
            }
            else
            {
                /// We rearrange the columns that are not included in the primary key here; Then the result is released - to save RAM.
                ColumnPtr permuted_column = column.column->permute(*permutation, 0);
                std::tie(std::ignore, new_index_offset) = writeColumn(column.name, *column.type, *permuted_column, offset_columns, false, serialization_states[i], current_mark);
            }
        }
        else
        {
            std::tie(std::ignore, new_index_offset) = writeColumn(column.name, *column.type, *column.column, offset_columns, false, serialization_states[i], current_mark);
        }
    }

    rows_count += rows;

    {
        /// Creating block for update
        Block indices_update_block(skip_indexes_columns);

        /// Filling and writing skip indices like in IMergedBlockOutputStream::writeColumn
        for (size_t i = 0; i < storage.skip_indices.size(); ++i)
        {
            const auto index = storage.skip_indices[i];
            auto & stream = *skip_indices_streams[i];
            size_t prev_pos = 0;

            while (prev_pos < rows)
            {
                UInt64 limit = 0;
                if (prev_pos == 0 && index_offset != 0)
                {
                    limit = index_offset;
                }
                else
                {
                    limit = storage.index_granularity;
                    if (skip_indices_aggregators[i]->empty())
                    {
                        skip_indices_aggregators[i] = index->createIndexAggregator();
                        skip_index_filling[i] = 0;

                        if (stream.compressed.offset() >= min_compress_block_size)
                            stream.compressed.next();

                        writeIntBinary(stream.plain_hashing.count(), stream.marks);
                        writeIntBinary(stream.compressed.offset(), stream.marks);
                    }
                }

                size_t pos = prev_pos;
                skip_indices_aggregators[i]->update(indices_update_block, &pos, limit);

                if (pos == prev_pos + limit)
                {
                    ++skip_index_filling[i];

                    /// write index if it is filled
                    if (skip_index_filling[i] == index->granularity)
                    {
                        skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
                        skip_index_filling[i] = 0;
                    }
                }
                prev_pos = pos;
            }
        }
    }
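
    /// Note on the loop above: a skip index accumulates rows granule by granule, mirroring the way
    /// marks are written for ordinary columns. A new mark goes to the index stream whenever a fresh
    /// aggregator is started, and the aggregated granule is serialized only once index->granularity
    /// granules' worth of rows have been collected, so a single skip index granule can cover several
    /// primary index marks.
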
    {
        /** While filling index (index_columns), disable memory tracker.
          * Because memory is allocated here (maybe in context of INSERT query),
          * but then freed in completely different place (while merging parts), where query memory_tracker is not available.
          * And otherwise it will look like excessively growing memory consumption in context of query.
          *  (observed in long INSERT SELECTs)
          */
        auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();

        /// Write index. The index contains Primary Key value for each `index_granularity` row.

        //std::cerr << "Index Granularity size:" << index_granularity.size() << std::endl;
        //std::cerr << "Index Granularity first elem:" << index_granularity[0] << std::endl;

        for (size_t i = index_offset; i < rows;)
        {
            if (storage.hasPrimaryKey())
            {
                for (size_t j = 0, size = primary_key_columns.size(); j < size; ++j)
                {
                    const IColumn & primary_column = *primary_key_columns[j].column.get();
                    index_columns[j]->insertFrom(primary_column, i);
                    primary_key_columns[j].type->serializeBinary(primary_column, i, *index_stream);
                }
            }

            //std::cerr << "I:" << i << " Total rows:" << rows << std::endl;
            //std::cerr << "Increment current mark:" << current_mark << std::endl;

            ++current_mark;
            if (current_mark < index_granularity.size())
                i += index_granularity[current_mark];
            else
                break;
        }
    }

    //std::cerr << "Index granularity size:" << index_granularity.size() << std::endl;
    //std::cerr << "block written, total marks:" << current_mark << std::endl;

    index_offset = new_index_offset;
}

/// Implementation of MergedColumnOnlyOutputStream.

MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream(
    MergeTreeData & storage_, const Block & header_, String part_path_, bool sync_,
    CompressionCodecPtr default_codec_, bool skip_offsets_,
    WrittenOffsetColumns & already_written_offset_columns,
    const std::vector<size_t> & index_granularity_)
    : IMergedBlockOutputStream(
        storage_, storage_.global_context.getSettings().min_compress_block_size,
        storage_.global_context.getSettings().max_compress_block_size, default_codec_,
        storage_.global_context.getSettings().min_bytes_to_use_direct_io,
        false,
        index_granularity_),
    header(header_), part_path(part_path_), sync(sync_), skip_offsets(skip_offsets_),
    already_written_offset_columns(already_written_offset_columns)
{
    if (index_granularity.empty())
        throw Exception("Can't write column without information about part index granularity", ErrorCodes::LOGICAL_ERROR);
}

void MergedColumnOnlyOutputStream::write(const Block & block)
{
    if (!initialized)
    {
        column_streams.clear();
        serialization_states.clear();
        serialization_states.reserve(block.columns());
        WrittenOffsetColumns tmp_offset_columns;
        IDataType::SerializeBinaryBulkSettings settings;

        for (size_t i = 0; i < block.columns(); ++i)
        {
            const auto & col = block.safeGetByPosition(i);

            const auto columns = storage.getColumns();
            addStreams(part_path, col.name, *col.type, columns.getCodecOrDefault(col.name, codec), 0, skip_offsets);
            serialization_states.emplace_back(nullptr);
            settings.getter = createStreamGetter(col.name, tmp_offset_columns, false);
            col.type->serializeBinaryBulkStatePrefix(settings, serialization_states.back());
        }

        initialized = true;
    }

    size_t new_index_offset = 0;
    size_t new_current_mark = 0;
    WrittenOffsetColumns offset_columns = already_written_offset_columns;
    for (size_t i = 0; i < block.columns(); ++i)
    {
        const ColumnWithTypeAndName & column = block.safeGetByPosition(i);
        std::tie(new_current_mark, new_index_offset) = writeColumn(column.name, *column.type, *column.column, offset_columns, skip_offsets, serialization_states[i], current_mark);
    }

    index_offset = new_index_offset;
    current_mark = new_current_mark;
}

void MergedColumnOnlyOutputStream::writeSuffix()
{
    throw Exception("Method writeSuffix is not supported by MergedColumnOnlyOutputStream", ErrorCodes::NOT_IMPLEMENTED);
}

MergeTreeData::DataPart::Checksums MergedColumnOnlyOutputStream::writeSuffixAndGetChecksums()
{
    /// Finish columns serialization.
    auto & settings = storage.global_context.getSettingsRef();
    IDataType::SerializeBinaryBulkSettings serialize_settings;
    serialize_settings.low_cardinality_max_dictionary_size = settings.low_cardinality_max_dictionary_size;
    serialize_settings.low_cardinality_use_single_dictionary_for_part = settings.low_cardinality_use_single_dictionary_for_part != 0;

    for (size_t i = 0, size = header.columns(); i < size; ++i)
    {
        auto & column = header.getByPosition(i);
        serialize_settings.getter = createStreamGetter(column.name, already_written_offset_columns, skip_offsets);
        column.type->serializeBinaryBulkStateSuffix(serialize_settings, serialization_states[i]);
    }

    MergeTreeData::DataPart::Checksums checksums;

    for (auto & column_stream : column_streams)
    {
        column_stream.second->finalize();
        if (sync)
            column_stream.second->sync();

        column_stream.second->addToChecksums(checksums);
    }

    column_streams.clear();
    serialization_states.clear();
    initialized = false;

    return checksums;
}

}