2019-10-10 16:30:30 +00:00
# include "IMergeTreeDataPart.h"
# include <optional>
2021-12-08 02:40:59 +00:00
# include <boost/algorithm/string/join.hpp>
2022-01-03 23:04:56 +00:00
# include <string_view>
2020-02-27 16:47:40 +00:00
# include <Core/Defines.h>
# include <IO/HashingWriteBuffer.h>
2021-12-08 02:40:59 +00:00
# include <IO/HashingReadBuffer.h>
2020-02-27 16:47:40 +00:00
# include <IO/ReadBufferFromString.h>
2019-10-10 16:30:30 +00:00
# include <IO/ReadHelpers.h>
# include <IO/WriteHelpers.h>
# include <Storages/MergeTree/MergeTreeData.h>
2020-03-19 16:37:55 +00:00
# include <Storages/MergeTree/localBackup.h>
2020-07-16 10:54:49 +00:00
# include <Storages/MergeTree/checkDataPart.h>
2021-01-14 16:26:56 +00:00
# include <Storages/StorageReplicatedMergeTree.h>
2022-01-05 11:51:50 +00:00
# include <Storages/MergeTree/PartMetadataManagerOrdinary.h>
# include <Storages/MergeTree/PartMetadataManagerWithCache.h>
2020-02-27 16:47:40 +00:00
# include <Common/StringUtils/StringUtils.h>
# include <Common/escapeForFileName.h>
2021-01-15 12:28:53 +00:00
# include <Common/CurrentMetrics.h>
2021-01-02 09:47:38 +00:00
# include <Common/FieldVisitorsAccurateComparison.h>
2022-01-10 19:39:10 +00:00
# include <Common/MemoryTrackerBlockerInThread.h>
2021-10-02 07:13:14 +00:00
# include <base/JSON.h>
# include <base/logger_useful.h>
2020-08-28 09:07:20 +00:00
# include <Compression/getCompressionCodecForFile.h>
2022-01-10 19:01:41 +00:00
# include <Parsers/parseQuery.h>
2020-08-28 09:07:20 +00:00
# include <Parsers/queryToString.h>
2022-01-10 19:01:41 +00:00
# include <Parsers/ExpressionElementParsers.h>
2020-11-27 11:00:33 +00:00
# include <DataTypes/NestedUtils.h>
2021-05-30 13:57:30 +00:00
# include <DataTypes/DataTypeAggregateFunction.h>
2022-02-14 19:50:08 +00:00
# include <Interpreters/MergeTreeTransaction.h>
2022-02-17 21:26:37 +00:00
# include <Interpreters/TransactionLog.h>
2019-10-10 16:30:30 +00:00
2021-01-15 12:28:53 +00:00
/// Metrics defined elsewhere; this file only adjusts them as parts are created, change state and are destroyed.
namespace CurrentMetrics
{
    /// Per-state part counters (see incrementStateMetric / decrementStateMetric).
    extern const Metric PartsTemporary;
    extern const Metric PartsPreActive;
    extern const Metric PartsActive;
    extern const Metric PartsOutdated;
    extern const Metric PartsDeleting;
    extern const Metric PartsDeleteOnDestroy;

    /// Counters that are updated in lock-step with PartsPreActive / PartsActive.
    extern const Metric PartsPreCommitted;
    extern const Metric PartsCommitted;

    /// Per-type part counters (see incrementTypeMetric / decrementTypeMetric).
    extern const Metric PartsWide;
    extern const Metric PartsCompact;
    extern const Metric PartsInMemory;
}
2019-10-10 16:30:30 +00:00
namespace DB
{
2021-01-14 16:26:56 +00:00
2019-10-10 16:30:30 +00:00
/// Error codes defined elsewhere and referenced from this file (kept in alphabetical order).
namespace ErrorCodes
{
    extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
    extern const int BAD_TTL_FILE;
    extern const int CANNOT_READ_ALL_DATA;
    extern const int CORRUPTED_DATA;
    extern const int DIRECTORY_ALREADY_EXISTS;
    extern const int EXPECTED_END_OF_FILE;
    extern const int FILE_DOESNT_EXIST;
    extern const int LOGICAL_ERROR;
    extern const int NOT_FOUND_EXPECTED_DATA_PART;
    extern const int NOT_IMPLEMENTED;
    extern const int NO_FILE_IN_DATA_PART;
}
2020-02-27 16:47:40 +00:00
static std : : unique_ptr < ReadBufferFromFileBase > openForReading ( const DiskPtr & disk , const String & path )
2019-10-10 16:30:30 +00:00
{
2021-08-16 00:00:32 +00:00
size_t file_size = disk - > getFileSize ( path ) ;
2021-08-24 21:45:58 +00:00
return disk - > readFile ( path , ReadSettings ( ) . adjustBufferSize ( file_size ) , file_size ) ;
2019-10-10 16:30:30 +00:00
}
2022-01-07 10:37:08 +00:00
void IMergeTreeDataPart : : MinMaxIndex : : load ( const MergeTreeData & data , const PartMetadataManagerPtr & manager )
2019-10-10 16:30:30 +00:00
{
2021-03-02 10:33:54 +00:00
auto metadata_snapshot = data . getInMemoryMetadataPtr ( ) ;
const auto & partition_key = metadata_snapshot - > getPartitionKey ( ) ;
auto minmax_column_names = data . getMinMaxColumnsNames ( partition_key ) ;
auto minmax_column_types = data . getMinMaxColumnsTypes ( partition_key ) ;
size_t minmax_idx_size = minmax_column_types . size ( ) ;
2021-09-16 21:19:58 +00:00
2020-03-10 14:56:55 +00:00
hyperrectangle . reserve ( minmax_idx_size ) ;
2019-10-10 16:30:30 +00:00
for ( size_t i = 0 ; i < minmax_idx_size ; + + i )
{
2022-01-05 11:51:50 +00:00
String file_name = " minmax_ " + escapeForFileName ( minmax_column_names [ i ] ) + " .idx " ;
auto file = manager - > read ( file_name ) ;
2021-03-09 14:46:52 +00:00
auto serialization = minmax_column_types [ i ] - > getDefaultSerialization ( ) ;
2019-10-10 16:30:30 +00:00
Field min_val ;
2021-03-09 14:46:52 +00:00
serialization - > deserializeBinary ( min_val , * file ) ;
2019-10-10 16:30:30 +00:00
Field max_val ;
2021-03-09 14:46:52 +00:00
serialization - > deserializeBinary ( max_val , * file ) ;
2019-10-10 16:30:30 +00:00
2021-01-02 09:47:38 +00:00
// NULL_LAST
if ( min_val . isNull ( ) )
2021-08-27 14:09:15 +00:00
min_val = POSITIVE_INFINITY ;
2021-01-02 09:47:38 +00:00
if ( max_val . isNull ( ) )
2021-08-27 14:09:15 +00:00
max_val = POSITIVE_INFINITY ;
2021-01-02 09:47:38 +00:00
2020-03-10 14:56:55 +00:00
hyperrectangle . emplace_back ( min_val , true , max_val , true ) ;
2019-10-10 16:30:30 +00:00
}
initialized = true ;
}
2022-02-01 10:36:51 +00:00
/// Convenience overload of store().
/// Resolves the min-max column names and types from the table's current partition key, then forwards
/// to the overload that takes them explicitly.
IMergeTreeDataPart::MinMaxIndex::WrittenFiles IMergeTreeDataPart::MinMaxIndex::store(
    const MergeTreeData & data, const DiskPtr & disk_, const String & part_path, Checksums & out_checksums) const
{
    const auto metadata_snapshot = data.getInMemoryMetadataPtr();
    const auto & partition_key = metadata_snapshot->getPartitionKey();

    auto names = data.getMinMaxColumnsNames(partition_key);
    auto types = data.getMinMaxColumnsTypes(partition_key);

    return store(names, types, disk_, part_path, out_checksums);
}
2022-02-01 10:36:51 +00:00
/// Writes the min-max index into `part_path` on `disk_`, as one file `minmax_<escaped column name>.idx` per column.
/// Each file holds the serialized min followed by the serialized max of the column's range.
/// The size and hash of every file are recorded in `out_checksums`.
/// Every buffer gets preFinalize() here and is returned to the caller; this function does not finalize them.
/// Throws LOGICAL_ERROR if the index is not initialized, or if it holds fewer ranges than `column_names`.
IMergeTreeDataPart::MinMaxIndex::WrittenFiles IMergeTreeDataPart::MinMaxIndex::store(
    const Names & column_names,
    const DataTypes & data_types,
    const DiskPtr & disk_,
    const String & part_path,
    Checksums & out_checksums) const
{
    if (!initialized)
        throw Exception("Attempt to store uninitialized MinMax index for part " + part_path + ". This is a bug.",
            ErrorCodes::LOGICAL_ERROR);

    /// hyperrectangle is indexed by column position below: refuse to read past its end.
    if (hyperrectangle.size() < column_names.size())
        throw Exception(ErrorCodes::LOGICAL_ERROR,
            "MinMax index for part {} has {} ranges, but {} columns were requested to be stored. This is a bug.",
            part_path, hyperrectangle.size(), column_names.size());

    WrittenFiles written_files;

    for (size_t i = 0; i < column_names.size(); ++i)
    {
        String file_name = "minmax_" + escapeForFileName(column_names[i]) + ".idx";
        auto serialization = data_types.at(i)->getDefaultSerialization();

        auto out = disk_->writeFile(fs::path(part_path) / file_name);
        HashingWriteBuffer out_hashing(*out);
        /// File layout: min, then max.
        serialization->serializeBinary(hyperrectangle[i].left, out_hashing);
        serialization->serializeBinary(hyperrectangle[i].right, out_hashing);
        out_hashing.next();
        out_checksums.files[file_name].file_size = out_hashing.count();
        out_checksums.files[file_name].file_hash = out_hashing.getHash();
        out->preFinalize();
        written_files.emplace_back(std::move(out));
    }

    return written_files;
}
void IMergeTreeDataPart : : MinMaxIndex : : update ( const Block & block , const Names & column_names )
{
if ( ! initialized )
2020-03-10 14:56:55 +00:00
hyperrectangle . reserve ( column_names . size ( ) ) ;
2019-10-10 16:30:30 +00:00
for ( size_t i = 0 ; i < column_names . size ( ) ; + + i )
{
2020-04-02 17:27:07 +00:00
FieldRef min_value ;
FieldRef max_value ;
2019-10-10 16:30:30 +00:00
const ColumnWithTypeAndName & column = block . getByName ( column_names [ i ] ) ;
2021-01-02 09:47:38 +00:00
if ( const auto * column_nullable = typeid_cast < const ColumnNullable * > ( column . column . get ( ) ) )
column_nullable - > getExtremesNullLast ( min_value , max_value ) ;
else
column . column - > getExtremes ( min_value , max_value ) ;
2019-10-10 16:30:30 +00:00
if ( ! initialized )
2020-03-10 14:56:55 +00:00
hyperrectangle . emplace_back ( min_value , true , max_value , true ) ;
2019-10-10 16:30:30 +00:00
else
{
2021-01-02 09:47:38 +00:00
hyperrectangle [ i ] . left
= applyVisitor ( FieldVisitorAccurateLess ( ) , hyperrectangle [ i ] . left , min_value ) ? hyperrectangle [ i ] . left : min_value ;
hyperrectangle [ i ] . right
= applyVisitor ( FieldVisitorAccurateLess ( ) , hyperrectangle [ i ] . right , max_value ) ? max_value : hyperrectangle [ i ] . right ;
2019-10-10 16:30:30 +00:00
}
}
initialized = true ;
}
void IMergeTreeDataPart : : MinMaxIndex : : merge ( const MinMaxIndex & other )
{
if ( ! other . initialized )
return ;
if ( ! initialized )
{
2020-03-10 14:56:55 +00:00
hyperrectangle = other . hyperrectangle ;
2019-10-10 16:30:30 +00:00
initialized = true ;
}
else
{
2020-03-10 14:56:55 +00:00
for ( size_t i = 0 ; i < hyperrectangle . size ( ) ; + + i )
2019-10-10 16:30:30 +00:00
{
2020-03-10 14:56:55 +00:00
hyperrectangle [ i ] . left = std : : min ( hyperrectangle [ i ] . left , other . hyperrectangle [ i ] . left ) ;
hyperrectangle [ i ] . right = std : : max ( hyperrectangle [ i ] . right , other . hyperrectangle [ i ] . right ) ;
2019-10-10 16:30:30 +00:00
}
}
}
2021-12-08 02:40:59 +00:00
void IMergeTreeDataPart : : MinMaxIndex : : appendFiles ( const MergeTreeData & data , Strings & files )
{
auto metadata_snapshot = data . getInMemoryMetadataPtr ( ) ;
const auto & partition_key = metadata_snapshot - > getPartitionKey ( ) ;
auto minmax_column_names = data . getMinMaxColumnsNames ( partition_key ) ;
size_t minmax_idx_size = minmax_column_names . size ( ) ;
for ( size_t i = 0 ; i < minmax_idx_size ; + + i )
{
String file_name = " minmax_ " + escapeForFileName ( minmax_column_names [ i ] ) + " .idx " ;
files . push_back ( file_name ) ;
}
}
2019-10-10 16:30:30 +00:00
2021-01-21 18:17:00 +00:00
static void incrementStateMetric ( IMergeTreeDataPart : : State state )
2021-01-15 12:28:53 +00:00
{
switch ( state )
{
case IMergeTreeDataPart : : State : : Temporary :
CurrentMetrics : : add ( CurrentMetrics : : PartsTemporary ) ;
return ;
2021-12-30 14:27:22 +00:00
case IMergeTreeDataPart : : State : : PreActive :
CurrentMetrics : : add ( CurrentMetrics : : PartsPreActive ) ;
2021-01-15 12:28:53 +00:00
CurrentMetrics : : add ( CurrentMetrics : : PartsPreCommitted ) ;
return ;
2021-12-30 14:27:22 +00:00
case IMergeTreeDataPart : : State : : Active :
CurrentMetrics : : add ( CurrentMetrics : : PartsActive ) ;
2021-01-15 12:28:53 +00:00
CurrentMetrics : : add ( CurrentMetrics : : PartsCommitted ) ;
return ;
case IMergeTreeDataPart : : State : : Outdated :
CurrentMetrics : : add ( CurrentMetrics : : PartsOutdated ) ;
return ;
case IMergeTreeDataPart : : State : : Deleting :
CurrentMetrics : : add ( CurrentMetrics : : PartsDeleting ) ;
return ;
case IMergeTreeDataPart : : State : : DeleteOnDestroy :
CurrentMetrics : : add ( CurrentMetrics : : PartsDeleteOnDestroy ) ;
return ;
}
}
2021-01-21 18:17:00 +00:00
static void decrementStateMetric ( IMergeTreeDataPart : : State state )
2021-01-15 12:28:53 +00:00
{
switch ( state )
{
case IMergeTreeDataPart : : State : : Temporary :
CurrentMetrics : : sub ( CurrentMetrics : : PartsTemporary ) ;
return ;
2021-12-30 14:27:22 +00:00
case IMergeTreeDataPart : : State : : PreActive :
CurrentMetrics : : sub ( CurrentMetrics : : PartsPreActive ) ;
2021-01-15 12:28:53 +00:00
CurrentMetrics : : sub ( CurrentMetrics : : PartsPreCommitted ) ;
return ;
2021-12-30 14:27:22 +00:00
case IMergeTreeDataPart : : State : : Active :
CurrentMetrics : : sub ( CurrentMetrics : : PartsActive ) ;
2021-01-15 12:28:53 +00:00
CurrentMetrics : : sub ( CurrentMetrics : : PartsCommitted ) ;
return ;
case IMergeTreeDataPart : : State : : Outdated :
CurrentMetrics : : sub ( CurrentMetrics : : PartsOutdated ) ;
return ;
case IMergeTreeDataPart : : State : : Deleting :
CurrentMetrics : : sub ( CurrentMetrics : : PartsDeleting ) ;
return ;
case IMergeTreeDataPart : : State : : DeleteOnDestroy :
CurrentMetrics : : sub ( CurrentMetrics : : PartsDeleteOnDestroy ) ;
return ;
}
2021-01-21 18:17:00 +00:00
}
2021-01-15 12:28:53 +00:00
2021-01-21 18:17:00 +00:00
/// Adds one to the per-type part counter for `type`. UNKNOWN parts are not counted.
static void incrementTypeMetric(MergeTreeDataPartType type)
{
    const CurrentMetrics::Metric * metric = nullptr;

    switch (type.getValue())
    {
        case MergeTreeDataPartType::WIDE:
            metric = &CurrentMetrics::PartsWide;
            break;
        case MergeTreeDataPartType::COMPACT:
            metric = &CurrentMetrics::PartsCompact;
            break;
        case MergeTreeDataPartType::IN_MEMORY:
            metric = &CurrentMetrics::PartsInMemory;
            break;
        case MergeTreeDataPartType::UNKNOWN:
            break;
    }

    if (metric != nullptr)
        CurrentMetrics::add(*metric);
}
/// Subtracts one from the per-type part counter for `type`; the exact inverse of incrementTypeMetric.
static void decrementTypeMetric(MergeTreeDataPartType type)
{
    const CurrentMetrics::Metric * metric = nullptr;

    switch (type.getValue())
    {
        case MergeTreeDataPartType::WIDE:
            metric = &CurrentMetrics::PartsWide;
            break;
        case MergeTreeDataPartType::COMPACT:
            metric = &CurrentMetrics::PartsCompact;
            break;
        case MergeTreeDataPartType::IN_MEMORY:
            metric = &CurrentMetrics::PartsInMemory;
            break;
        case MergeTreeDataPartType::UNKNOWN:
            break;
    }

    if (metric != nullptr)
        CurrentMetrics::sub(*metric);
}
2019-10-10 16:30:30 +00:00
/// Constructs a part whose MergeTreePartInfo is parsed from `name_` using the table's format version.
/// All remaining initialization (volume selection, metrics, min-max index, metadata manager) is shared
/// with the constructor that takes an explicit MergeTreePartInfo.
IMergeTreeDataPart::IMergeTreeDataPart(
    const MergeTreeData & storage_,
    const String & name_,
    const VolumePtr & volume_,
    const std::optional<String> & relative_path_,
    Type part_type_,
    const IMergeTreeDataPart * parent_part_)
    : IMergeTreeDataPart(
        storage_,
        name_,
        MergeTreePartInfo::fromPartName(name_, storage_.format_version),
        volume_,
        relative_path_,
        part_type_,
        parent_part_)
{
}
2019-10-10 16:30:30 +00:00
/// Constructs a part named `name_` with an explicit `info_`.
/// The relative path defaults to `name_`.
/// A projection part (non-null `parent_part_`) uses its parent's volume and starts in state Active.
/// The part is registered in the per-type and per-state metrics; the destructor undoes this.
IMergeTreeDataPart::IMergeTreeDataPart(
    const MergeTreeData & storage_,
    const String & name_,
    const MergeTreePartInfo & info_,
    const VolumePtr & volume_,
    const std::optional<String> & relative_path_,
    Type part_type_,
    const IMergeTreeDataPart * parent_part_)
    : storage(storage_)
    , name(name_)
    , info(info_)
    , volume(parent_part_ ? parent_part_->volume : volume_)
    , relative_path(relative_path_.value_or(name_))
    , index_granularity_info(storage_, part_type_)
    , part_type(part_type_)
    , parent_part(parent_part_)
    , use_metadata_cache(storage.use_metadata_cache)
{
    if (parent_part != nullptr)
        state = State::Active;

    incrementTypeMetric(part_type);
    incrementStateMetric(state);

    minmax_idx = std::make_shared<MinMaxIndex>();

    initializePartMetadataManager();
}
/// Unregisters the part from the per-type and per-state metrics (mirrors the constructors and setState()).
IMergeTreeDataPart::~IMergeTreeDataPart()
{
    decrementTypeMetric(part_type);
    decrementStateMetric(state);
}
2019-10-10 16:30:30 +00:00
/// Returns the name a part described by `new_part_info` would have in this table.
/// Tables older than the custom-partitioning format use the V0 name, which embeds min/max dates.
String IMergeTreeDataPart::getNewName(const MergeTreePartInfo & new_part_info) const
{
    if (storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
    {
        /// NOTE: getting min and max dates from the part name (instead of part data) because we want
        /// the merged part name be determined only by source part names.
        /// It is simpler this way when the real min and max dates for the block range can change
        /// (e.g. after an ALTER DELETE command).
        DayNum min_date;
        DayNum max_date;
        MergeTreePartInfo::parseMinMaxDatesFromPartName(name, min_date, max_date);
        return new_part_info.getPartNameV0(min_date, max_date);
    }

    return new_part_info.getPartName();
}
2019-12-03 00:23:11 +00:00
std : : optional < size_t > IMergeTreeDataPart : : getColumnPosition ( const String & column_name ) const
2019-10-31 14:44:17 +00:00
{
2019-12-25 20:06:16 +00:00
auto it = column_name_to_position . find ( column_name ) ;
if ( it = = column_name_to_position . end ( ) )
2019-12-03 00:23:11 +00:00
return { } ;
2019-12-25 20:06:16 +00:00
return it - > second ;
2019-12-18 16:41:11 +00:00
}
2019-10-31 14:44:17 +00:00
2021-01-15 12:15:13 +00:00
void IMergeTreeDataPart : : setState ( IMergeTreeDataPart : : State new_state ) const
{
2021-01-21 18:17:00 +00:00
decrementStateMetric ( state ) ;
2021-01-15 12:15:13 +00:00
state = new_state ;
2021-01-21 18:17:00 +00:00
incrementStateMetric ( state ) ;
2021-01-15 12:15:13 +00:00
}
/// Returns the current lifecycle state of the part.
IMergeTreeDataPart::State IMergeTreeDataPart::getState() const
{
    return this->state;
}
2021-03-22 22:16:41 +00:00
std : : pair < DayNum , DayNum > IMergeTreeDataPart : : getMinMaxDate ( ) const
2019-10-10 16:30:30 +00:00
{
2021-09-16 21:19:58 +00:00
if ( storage . minmax_idx_date_column_pos ! = - 1 & & minmax_idx - > initialized )
2021-03-22 22:16:41 +00:00
{
2021-09-16 21:19:58 +00:00
const auto & hyperrectangle = minmax_idx - > hyperrectangle [ storage . minmax_idx_date_column_pos ] ;
2021-03-22 22:16:41 +00:00
return { DayNum ( hyperrectangle . left . get < UInt64 > ( ) ) , DayNum ( hyperrectangle . right . get < UInt64 > ( ) ) } ;
}
2019-10-10 16:30:30 +00:00
else
2021-03-22 22:16:41 +00:00
return { } ;
2019-10-10 16:30:30 +00:00
}
2021-03-22 22:16:41 +00:00
std : : pair < time_t , time_t > IMergeTreeDataPart : : getMinMaxTime ( ) const
2019-10-10 16:30:30 +00:00
{
2021-09-16 21:19:58 +00:00
if ( storage . minmax_idx_time_column_pos ! = - 1 & & minmax_idx - > initialized )
2021-03-22 22:16:41 +00:00
{
2021-09-16 21:19:58 +00:00
const auto & hyperrectangle = minmax_idx - > hyperrectangle [ storage . minmax_idx_time_column_pos ] ;
2019-10-10 16:30:30 +00:00
2021-03-22 22:16:41 +00:00
/// The case of DateTime
if ( hyperrectangle . left . getType ( ) = = Field : : Types : : UInt64 )
{
assert ( hyperrectangle . right . getType ( ) = = Field : : Types : : UInt64 ) ;
return { hyperrectangle . left . get < UInt64 > ( ) , hyperrectangle . right . get < UInt64 > ( ) } ;
}
/// The case of DateTime64
else if ( hyperrectangle . left . getType ( ) = = Field : : Types : : Decimal64 )
{
2021-03-23 23:03:14 +00:00
assert ( hyperrectangle . right . getType ( ) = = Field : : Types : : Decimal64 ) ;
2019-10-10 16:30:30 +00:00
2021-03-22 22:16:41 +00:00
auto left = hyperrectangle . left . get < DecimalField < Decimal64 > > ( ) ;
auto right = hyperrectangle . right . get < DecimalField < Decimal64 > > ( ) ;
assert ( left . getScale ( ) = = right . getScale ( ) ) ;
return { left . getValue ( ) / left . getScaleMultiplier ( ) , right . getValue ( ) / right . getScaleMultiplier ( ) } ;
}
else
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Part minmax index by time is neither DateTime or DateTime64 " ) ;
}
2019-10-10 16:30:30 +00:00
else
2021-03-22 22:16:41 +00:00
return { } ;
2019-10-10 16:30:30 +00:00
}
2021-03-22 22:16:41 +00:00
2022-01-21 00:20:41 +00:00
void IMergeTreeDataPart : : setColumns ( const NamesAndTypesList & new_columns )
2019-11-18 12:22:27 +00:00
{
2019-12-25 20:06:16 +00:00
columns = new_columns ;
2021-10-29 17:21:02 +00:00
2019-12-25 20:06:16 +00:00
column_name_to_position . clear ( ) ;
column_name_to_position . reserve ( new_columns . size ( ) ) ;
size_t pos = 0 ;
2021-10-29 17:21:02 +00:00
2019-11-18 12:22:27 +00:00
for ( const auto & column : columns )
2021-11-02 03:03:52 +00:00
column_name_to_position . emplace ( column . name , pos + + ) ;
2022-01-21 00:20:41 +00:00
}
2021-10-29 17:21:02 +00:00
2022-01-21 00:20:41 +00:00
void IMergeTreeDataPart : : setSerializationInfos ( const SerializationInfoByName & new_infos )
{
serialization_infos = new_infos ;
2019-11-18 12:22:27 +00:00
}
2021-11-02 03:03:52 +00:00
/// Returns the serialization for `column`. If serialization info exists for the column's storage name,
/// it is used; otherwise the type's default serialization is returned.
SerializationPtr IMergeTreeDataPart::getSerialization(const NameAndTypePair & column) const
{
    const auto info_it = serialization_infos.find(column.getNameInStorage());
    if (info_it != serialization_infos.end())
        return IDataType::getSerialization(column, *info_it->second);

    return IDataType::getSerialization(column);
}
2019-11-18 12:22:27 +00:00
/// Removes the part's data if it is temporary (`is_temp`) or in state DeleteOnDestroy; otherwise does nothing.
/// A projection part (with parent_part) is removed via projectionRemove(); any other part via remove().
/// Exceptions are logged and swallowed. In debug builds, the asserts in the catch block flag a failed
/// removal of a temporary / DeleteOnDestroy part as a bug.
void IMergeTreeDataPart::removeIfNeeded()
{
    /// Debug-only (assert) check of the part's version metadata.
    assert(assertHasValidVersionMetadata());
    if (!is_temp && state != State::DeleteOnDestroy)
        return;

    try
    {
        auto path = getFullRelativePath();

        /// Nothing to do if the directory is already gone.
        if (!volume->getDisk()->exists(path))
            return;

        if (is_temp)
        {
            String file_name = fileName(relative_path);

            if (file_name.empty())
                throw Exception(ErrorCodes::LOGICAL_ERROR, "relative_path {} of part {} is invalid or not set", relative_path, name);

            /// Safety net: a temporary part is only removed if its directory name looks temporary
            /// (prefix "tmp" or suffix ".tmp_proj").
            if (!startsWith(file_name, "tmp") && !endsWith(file_name, ".tmp_proj"))
            {
                LOG_ERROR(
                    storage.log,
                    "~DataPart() should remove part {} but its name doesn't start with \"tmp\" or end with \".tmp_proj\". Too "
                    "suspicious, keeping the part.",
                    path);
                return;
            }
        }

        if (parent_part)
        {
            /// Projection part: removed relative to the parent part's directory.
            /// NOTE(review): the second argument is `!can_remove` - presumably a "keep shared data" flag; confirm
            /// against the declaration of projectionRemove().
            auto [can_remove, _] = canRemovePart();
            projectionRemove(parent_part->getFullRelativePath(), !can_remove);
        }
        else
            remove();

        if (state == State::DeleteOnDestroy)
        {
            LOG_TRACE(storage.log, "Removed part from old location {}", path);
        }
    }
    catch (...)
    {
        /// FIXME If part is temporary, then directory will not be removed for 1 day (temporary_directories_lifetime).
        /// If it's tmp_merge_<part_name> or tmp_fetch_<part_name>,
        /// then all future attempts to execute part producing operation will fail with "directory already exists".
        /// Seems like it's especially important for remote disks, because removal may fail due to network issues.
        tryLogCurrentException(__PRETTY_FUNCTION__);
        assert(!is_temp);
        assert(state != State::DeleteOnDestroy);
        assert(state != State::Temporary);
    }
}
/// Sum of byteSize() over all columns held in `index` (the loaded primary index).
UInt64 IMergeTreeDataPart::getIndexSizeInBytes() const
{
    UInt64 total = 0;
    for (const auto & index_column : index)
        total += index_column->byteSize();
    return total;
}
/// Sum of allocatedBytes() over all columns held in `index` (the loaded primary index).
UInt64 IMergeTreeDataPart::getIndexSizeInAllocatedBytes() const
{
    UInt64 total = 0;
    for (const auto & index_column : index)
        total += index_column->allocatedBytes();
    return total;
}
void IMergeTreeDataPart : : assertState ( const std : : initializer_list < IMergeTreeDataPart : : State > & affordable_states ) const
{
if ( ! checkState ( affordable_states ) )
{
String states_str ;
for ( auto affordable_state : affordable_states )
2021-09-06 14:24:03 +00:00
{
states_str + = stateString ( affordable_state ) ;
states_str + = ' ' ;
}
2019-10-10 16:30:30 +00:00
throw Exception ( " Unexpected state of part " + getNameWithState ( ) + " . Expected: " + states_str , ErrorCodes : : NOT_FOUND_EXPECTED_DATA_PART ) ;
}
}
void IMergeTreeDataPart : : assertOnDisk ( ) const
{
if ( ! isStoredOnDisk ( ) )
2019-11-05 11:53:22 +00:00
throw Exception ( " Data part ' " + name + " ' with type ' "
2020-02-11 13:41:26 +00:00
+ getType ( ) . toString ( ) + " ' is not stored on disk " , ErrorCodes : : LOGICAL_ERROR ) ;
2019-10-10 16:30:30 +00:00
}
/// Number of marks, as recorded in the part's index granularity.
UInt64 IMergeTreeDataPart::getMarksCount() const
{
    const UInt64 marks = index_granularity.getMarksCount();
    return marks;
}
size_t IMergeTreeDataPart : : getFileSizeOrZero ( const String & file_name ) const
{
auto checksum = checksums . files . find ( file_name ) ;
if ( checksum = = checksums . files . end ( ) )
return 0 ;
return checksum - > second . file_size ;
}
2022-03-28 17:21:47 +00:00
/// Returns the name of the cheapest column to read: among all physical columns of `storage_snapshot`
/// (including subcolumns and extended object columns) that have files in this part, the one with the
/// smallest compressed size. Ties go to the first such column.
/// If alter_conversions reports a column as renamed, its old name is used for the size lookup and is
/// the name returned.
/// Throws LOGICAL_ERROR if no column qualifies.
String IMergeTreeDataPart::getColumnNameWithMinimumCompressedSize(const StorageSnapshotPtr & storage_snapshot) const
{
    auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects().withSubcolumns();
    auto storage_columns = storage_snapshot->getColumns(options);

    /// Alter conversions are only looked up for top-level parts, not for projection parts.
    MergeTreeData::AlterConversions alter_conversions;
    if (!parent_part)
        alter_conversions = storage.getAlterConversionsForPart(shared_from_this());

    std::optional<std::string> minimum_size_column;
    UInt64 minimum_size = std::numeric_limits<UInt64>::max();

    for (const auto & column : storage_columns)
    {
        String column_name = column.name;
        if (alter_conversions.isColumnRenamed(column.name))
            column_name = alter_conversions.getColumnOldName(column.name);

        /// NOTE(review): the file check uses the column's current name, while the size lookup below uses
        /// the (possibly pre-rename) column_name. Confirm that hasColumnFiles() copes with renamed columns.
        if (!hasColumnFiles(column))
            continue;

        const auto size = getColumnSize(column_name).data_compressed;
        if (size < minimum_size)
        {
            minimum_size = size;
            minimum_size_column = column_name;
        }
    }

    if (!minimum_size_column)
        throw Exception("Could not find a column of minimum size in MergeTree, part " + getFullPath(), ErrorCodes::LOGICAL_ERROR);

    return *minimum_size_column;
}
2019-10-10 16:30:30 +00:00
/// Absolute path of the part's directory on its disk, with a trailing separator.
/// A projection part's directory is nested inside its parent part's directory.
/// Throws LOGICAL_ERROR if relative_path is empty.
String IMergeTreeDataPart::getFullPath() const
{
    if (relative_path.empty())
        throw Exception("Part relative_path cannot be empty. It's bug.", ErrorCodes::LOGICAL_ERROR);

    const String parent_relative_path = parent_part ? parent_part->relative_path : "";
    return fs::path(storage.getFullPathOnDisk(volume->getDisk())) / parent_relative_path / relative_path / "";
}
2020-02-27 16:47:40 +00:00
/// Path of the part's directory relative to the disk, built from the table's relative_data_path,
/// with a trailing separator. A projection part's directory is nested inside its parent part's directory.
/// Throws LOGICAL_ERROR if relative_path is empty.
String IMergeTreeDataPart::getFullRelativePath() const
{
    if (relative_path.empty())
        throw Exception("Part relative_path cannot be empty. It's bug.", ErrorCodes::LOGICAL_ERROR);

    const String parent_relative_path = parent_part ? parent_part->relative_path : "";
    return fs::path(storage.relative_data_path) / parent_relative_path / relative_path / "";
}
2019-11-18 15:18:50 +00:00
/// Loads the part's on-disk metadata in a fixed order: UUID, columns, checksums, index granularity,
/// column and secondary-index sizes, primary index, row count, and partition + min-max index.
/// For top-level parts (no parent_part) it also loads TTL infos and projections.
/// Optionally runs checkConsistency(), then loads the default compression codec.
/// If anything throws, all entries of metadata_manager (projections included) are deleted and the exception is rethrown.
void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency)
{
    assertOnDisk();
    /// Memory should not be limited during ATTACH TABLE query.
    /// This is already true at the server startup but must be also ensured for manual table ATTACH.
    /// Motivation: memory for index is shared between queries - not belong to the query itself.
    MemoryTrackerBlockerInThread temporarily_disable_memory_tracker(VariableContext::Global);

    try
    {
        loadUUID();
        loadColumns(require_columns_checksums);
        loadChecksums(require_columns_checksums);
        loadIndexGranularity();
        calculateColumnsAndSecondaryIndicesSizesOnDisk();
        loadIndex(); /// Must be called after loadIndexGranularity as it uses the value of `index_granularity`
        loadRowsCount(); /// Must be called after loadIndexGranularity() as it uses the value of `index_granularity`.
        loadPartitionAndMinMaxIndex();
        /// TTL infos and projections belong to top-level parts only.
        if (!parent_part)
        {
            loadTTLInfos();
            loadProjections(require_columns_checksums, check_consistency);
        }

        if (check_consistency)
            checkConsistency(require_columns_checksums);

        loadDefaultCompressionCodec();
    }
    catch (...)
    {
        // The part being loaded may be broken while some of its metadata has already been written
        // through metadata_manager before the exception; remove all of it before rethrowing.
        metadata_manager->deleteAll(/*include_projection*/ true);
        metadata_manager->assertAllDeleted(/*include_projection*/ true);
        throw;
    }
}
2021-12-08 02:40:59 +00:00
void IMergeTreeDataPart : : appendFilesOfColumnsChecksumsIndexes ( Strings & files , bool include_projection ) const
{
if ( isStoredOnDisk ( ) )
{
appendFilesOfUUID ( files ) ;
appendFilesOfColumns ( files ) ;
appendFilesOfChecksums ( files ) ;
appendFilesOfIndexGranularity ( files ) ;
2022-01-04 05:41:11 +00:00
appendFilesOfIndex ( files ) ;
2021-12-08 02:40:59 +00:00
appendFilesOfRowsCount ( files ) ;
appendFilesOfPartitionAndMinMaxIndex ( files ) ;
appendFilesOfTTLInfos ( files ) ;
appendFilesOfDefaultCompressionCodec ( files ) ;
}
if ( ! parent_part & & include_projection )
{
for ( const auto & [ projection_name , projection_part ] : projection_parts )
{
Strings projection_files ;
projection_part - > appendFilesOfColumnsChecksumsIndexes ( projection_files , true ) ;
for ( const auto & projection_file : projection_files )
files . push_back ( fs : : path ( projection_part - > relative_path ) / projection_file ) ;
}
}
}
2021-02-10 14:12:49 +00:00
void IMergeTreeDataPart : : loadProjections ( bool require_columns_checksums , bool check_consistency )
{
auto metadata_snapshot = storage . getInMemoryMetadataPtr ( ) ;
for ( const auto & projection : metadata_snapshot - > projections )
{
String path = getFullRelativePath ( ) + projection . name + " .proj " ;
if ( volume - > getDisk ( ) - > exists ( path ) )
{
auto part = storage . createPart ( projection . name , { " all " , 0 , 0 , 0 } , volume , projection . name + " .proj " , this ) ;
part - > loadColumnsChecksumsIndexes ( require_columns_checksums , check_consistency ) ;
projection_parts . emplace ( projection . name , std : : move ( part ) ) ;
}
}
}
2019-10-31 14:44:17 +00:00
void IMergeTreeDataPart : : loadIndexGranularity ( )
{
2020-02-11 13:41:26 +00:00
throw Exception ( " Method 'loadIndexGranularity' is not implemented for part with type " + getType ( ) . toString ( ) , ErrorCodes : : NOT_IMPLEMENTED ) ;
2019-10-31 14:44:17 +00:00
}
2022-03-23 04:13:42 +00:00
/// Currently we don't cache mark files of part, because cache other meta files is enough to speed up loading.
2021-12-08 02:40:59 +00:00
void IMergeTreeDataPart : : appendFilesOfIndexGranularity ( Strings & /* files */ ) const
{
}
2019-10-31 14:44:17 +00:00
void IMergeTreeDataPart : : loadIndex ( )
{
/// It can be empty in case of mutations
if ( ! index_granularity . isInitialized ( ) )
throw Exception ( " Index granularity is not loaded before index loading " , ErrorCodes : : LOGICAL_ERROR ) ;
2020-06-17 12:39:20 +00:00
auto metadata_snapshot = storage . getInMemoryMetadataPtr ( ) ;
2021-02-10 14:12:49 +00:00
if ( parent_part )
metadata_snapshot = metadata_snapshot - > projections . get ( name ) . metadata ;
2020-06-17 12:39:20 +00:00
const auto & primary_key = metadata_snapshot - > getPrimaryKey ( ) ;
2020-05-21 19:46:03 +00:00
size_t key_size = primary_key . column_names . size ( ) ;
2019-10-31 14:44:17 +00:00
if ( key_size )
{
MutableColumns loaded_index ;
loaded_index . resize ( key_size ) ;
for ( size_t i = 0 ; i < key_size ; + + i )
{
2020-05-20 18:11:38 +00:00
loaded_index [ i ] = primary_key . data_types [ i ] - > createColumn ( ) ;
2019-10-31 14:44:17 +00:00
loaded_index [ i ] - > reserve ( index_granularity . getMarksCount ( ) ) ;
}
2022-01-05 11:51:50 +00:00
String index_name = " primary.idx " ;
String index_path = fs : : path ( getFullRelativePath ( ) ) / index_name ;
auto index_file = metadata_manager - > read ( index_name ) ;
2020-05-25 23:47:11 +00:00
size_t marks_count = index_granularity . getMarksCount ( ) ;
2021-10-29 17:21:02 +00:00
Serializations key_serializations ( key_size ) ;
2021-04-04 09:17:54 +00:00
for ( size_t j = 0 ; j < key_size ; + + j )
2021-10-29 17:21:02 +00:00
key_serializations [ j ] = primary_key . data_types [ j ] - > getDefaultSerialization ( ) ;
2021-04-04 09:17:54 +00:00
2020-05-25 23:47:11 +00:00
for ( size_t i = 0 ; i < marks_count ; + + i ) //-V756
2019-10-31 14:44:17 +00:00
for ( size_t j = 0 ; j < key_size ; + + j )
2021-10-29 17:21:02 +00:00
key_serializations [ j ] - > deserializeBinary ( * loaded_index [ j ] , * index_file ) ;
2019-10-31 14:44:17 +00:00
for ( size_t i = 0 ; i < key_size ; + + i )
{
loaded_index [ i ] - > protect ( ) ;
2020-05-25 23:47:11 +00:00
if ( loaded_index [ i ] - > size ( ) ! = marks_count )
2019-10-31 14:44:17 +00:00
throw Exception ( " Cannot read all data from index file " + index_path
2020-05-25 23:47:11 +00:00
+ " (expected size: " + toString ( marks_count ) + " , read: " + toString ( loaded_index [ i ] - > size ( ) ) + " ) " ,
2019-10-31 14:44:17 +00:00
ErrorCodes : : CANNOT_READ_ALL_DATA ) ;
}
2020-02-27 16:47:40 +00:00
if ( ! index_file - > eof ( ) )
2020-05-09 21:24:15 +00:00
throw Exception ( " Index file " + fullPath ( volume - > getDisk ( ) , index_path ) + " is unexpectedly long " , ErrorCodes : : EXPECTED_END_OF_FILE ) ;
2019-10-31 14:44:17 +00:00
index . assign ( std : : make_move_iterator ( loaded_index . begin ( ) ) , std : : make_move_iterator ( loaded_index . end ( ) ) ) ;
}
}
2022-01-04 05:41:11 +00:00
void IMergeTreeDataPart : : appendFilesOfIndex ( Strings & files ) const
2021-12-08 02:40:59 +00:00
{
auto metadata_snapshot = storage . getInMemoryMetadataPtr ( ) ;
if ( parent_part )
metadata_snapshot = metadata_snapshot - > projections . has ( name ) ? metadata_snapshot - > projections . get ( name ) . metadata : nullptr ;
if ( ! metadata_snapshot )
return ;
2021-12-29 04:31:54 +00:00
if ( metadata_snapshot - > hasPrimaryKey ( ) )
2021-12-08 02:40:59 +00:00
files . push_back ( " primary.idx " ) ;
}
2020-08-26 15:29:46 +00:00
/// Names of the part's files that are not listed in checksums.txt.
/// "checksums.txt" and "columns.txt" are always included. The default compression codec file
/// and the transaction version metadata file are included only if they exist on disk.
/// In-memory parts have no files, so the result is empty for them.
NameSet IMergeTreeDataPart::getFileNamesWithoutChecksums() const
{
    if (!isStoredOnDisk())
        return {};

    NameSet result = {"checksums.txt", "columns.txt"};

    const fs::path part_dir = getFullRelativePath();
    for (const String & optional_file : {String(DEFAULT_COMPRESSION_CODEC_FILE_NAME), String(TXN_VERSION_METADATA_FILE_NAME)})
    {
        const String optional_path = part_dir / optional_file;
        if (volume->getDisk()->exists(optional_path))
            result.emplace(optional_file);
    }

    return result;
}
2020-08-28 09:07:20 +00:00
void IMergeTreeDataPart : : loadDefaultCompressionCodec ( )
2020-08-26 15:29:46 +00:00
{
2020-08-27 08:35:55 +00:00
/// In memory parts doesn't have any compression
2020-08-26 15:29:46 +00:00
if ( ! isStoredOnDisk ( ) )
2020-08-27 13:32:23 +00:00
{
default_codec = CompressionCodecFactory : : instance ( ) . get ( " NONE " , { } ) ;
2020-08-28 09:07:20 +00:00
return ;
2020-08-27 13:32:23 +00:00
}
2020-08-26 15:29:46 +00:00
2021-05-05 15:10:14 +00:00
String path = fs : : path ( getFullRelativePath ( ) ) / DEFAULT_COMPRESSION_CODEC_FILE_NAME ;
2022-01-05 11:51:50 +00:00
bool exists = metadata_manager - > exists ( DEFAULT_COMPRESSION_CODEC_FILE_NAME ) ;
2021-12-28 10:06:13 +00:00
if ( ! exists )
2020-08-28 09:07:20 +00:00
{
default_codec = detectDefaultCompressionCodec ( ) ;
}
else
{
2022-01-05 11:51:50 +00:00
auto file_buf = metadata_manager - > read ( DEFAULT_COMPRESSION_CODEC_FILE_NAME ) ;
2020-08-28 09:07:20 +00:00
String codec_line ;
readEscapedStringUntilEOL ( codec_line , * file_buf ) ;
2020-08-26 15:29:46 +00:00
2020-08-28 09:07:20 +00:00
ReadBufferFromString buf ( codec_line ) ;
2020-08-26 15:29:46 +00:00
2020-08-28 09:07:20 +00:00
if ( ! checkString ( " CODEC " , buf ) )
{
2021-12-08 02:40:59 +00:00
LOG_WARNING (
storage . log ,
" Cannot parse default codec for part {} from file {}, content '{}'. Default compression codec will be deduced "
" automatically, from data on disk " ,
name ,
path ,
codec_line ) ;
2020-08-28 09:07:20 +00:00
default_codec = detectDefaultCompressionCodec ( ) ;
}
2020-08-26 15:29:46 +00:00
2020-08-28 09:07:20 +00:00
try
{
ParserCodec codec_parser ;
auto codec_ast = parseQuery ( codec_parser , codec_line . data ( ) + buf . getPosition ( ) , codec_line . data ( ) + codec_line . length ( ) , " codec parser " , 0 , DBMS_DEFAULT_MAX_PARSER_DEPTH ) ;
default_codec = CompressionCodecFactory : : instance ( ) . get ( codec_ast , { } ) ;
}
catch ( const DB : : Exception & ex )
{
LOG_WARNING ( storage . log , " Cannot parse default codec for part {} from file {}, content '{}', error '{}'. Default compression codec will be deduced automatically, from data on disk. " , name , path , codec_line , ex . what ( ) ) ;
default_codec = detectDefaultCompressionCodec ( ) ;
}
2020-08-26 15:29:46 +00:00
}
2020-08-28 09:07:20 +00:00
}
2020-08-26 15:29:46 +00:00
2021-12-31 03:13:38 +00:00
void IMergeTreeDataPart : : appendFilesOfDefaultCompressionCodec ( Strings & files )
2021-12-08 02:40:59 +00:00
{
files . push_back ( DEFAULT_COMPRESSION_CODEC_FILE_NAME ) ;
}
2020-08-28 09:07:20 +00:00
/// Deduce the codec this part was written with by default, by inspecting data on disk.
/// The first column that has compressed data and no explicit codec in the table definition is chosen.
/// Its first non-empty ".bin" stream file is then opened, and the codec is read from that file.
/// If no suitable file is found, or no codec is obtained from the chosen file, the factory's
/// default codec is returned. In-memory parts always get NONE.
CompressionCodecPtr IMergeTreeDataPart::detectDefaultCompressionCodec() const
{
    /// In memory parts doesn't have any compression
    if (!isStoredOnDisk())
        return CompressionCodecFactory::instance().get("NONE", {});

    auto metadata_snapshot = storage.getInMemoryMetadataPtr();
    const auto & storage_columns = metadata_snapshot->getColumns();

    for (const auto & part_column : columns)
    {
        /// Only columns compressed with the default codec that actually contain data are useful.
        auto column_size = getColumnSize(part_column.name);
        const bool uses_default_codec = column_size.data_compressed != 0 && !storage_columns.hasCompressionCodec(part_column.name);
        if (!uses_default_codec)
            continue;

        String path_to_data_file;
        getSerialization(part_column)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
        {
            /// Keep the first matching stream only.
            if (!path_to_data_file.empty())
                return;

            String candidate_path = fs::path(getFullRelativePath()) / (ISerialization::getFileNameForStream(part_column, substream_path) + ".bin");

            /// We can have existing, but empty .bin files. Example: LowCardinality(Nullable(...)) columns and column_name.dict.null.bin file.
            if (volume->getDisk()->exists(candidate_path) && volume->getDisk()->getFileSize(candidate_path) != 0)
                path_to_data_file = candidate_path;
        });

        if (path_to_data_file.empty())
        {
            LOG_WARNING(storage.log, "Part's {} column {} has non zero data compressed size, but all data files don't exist or empty", name, backQuoteIfNeed(part_column.name));
            continue;
        }

        /// The first usable column decides; fall back to the default if it yields no codec.
        CompressionCodecPtr detected = getCompressionCodecForFile(volume->getDisk(), path_to_data_file);
        if (detected)
            return detected;
        break;
    }

    return CompressionCodecFactory::instance().getDefaultCodec();
}
2019-10-31 14:44:17 +00:00
void IMergeTreeDataPart : : loadPartitionAndMinMaxIndex ( )
{
2021-02-10 14:12:49 +00:00
if ( storage . format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING & & ! parent_part )
2019-10-31 14:44:17 +00:00
{
DayNum min_date ;
DayNum max_date ;
MergeTreePartInfo : : parseMinMaxDatesFromPartName ( name , min_date , max_date ) ;
const auto & date_lut = DateLUT : : instance ( ) ;
partition = MergeTreePartition ( date_lut . toNumYYYYMM ( min_date ) ) ;
2021-09-16 21:19:58 +00:00
minmax_idx = std : : make_shared < MinMaxIndex > ( min_date , max_date ) ;
2019-10-31 14:44:17 +00:00
}
else
{
2020-02-27 16:47:40 +00:00
String path = getFullRelativePath ( ) ;
2021-02-10 14:12:49 +00:00
if ( ! parent_part )
2022-01-05 11:51:50 +00:00
partition . load ( storage , metadata_manager ) ;
2021-02-10 14:12:49 +00:00
2019-10-31 14:44:17 +00:00
if ( ! isEmpty ( ) )
2021-02-10 14:12:49 +00:00
{
if ( parent_part )
// projection parts don't have minmax_idx, and it's always initialized
2021-09-16 21:19:58 +00:00
minmax_idx - > initialized = true ;
2021-02-10 14:12:49 +00:00
else
2022-01-05 11:51:50 +00:00
minmax_idx - > load ( storage , metadata_manager ) ;
2021-02-10 14:12:49 +00:00
}
if ( parent_part )
return ;
2019-10-31 14:44:17 +00:00
}
2020-06-17 10:34:23 +00:00
auto metadata_snapshot = storage . getInMemoryMetadataPtr ( ) ;
String calculated_partition_id = partition . getID ( metadata_snapshot - > getPartitionKey ( ) . sample_block ) ;
2019-10-31 14:44:17 +00:00
if ( calculated_partition_id ! = info . partition_id )
throw Exception (
" While loading part " + getFullPath ( ) + " : calculated partition ID: " + calculated_partition_id
+ " differs from partition ID in part name: " + info . partition_id ,
ErrorCodes : : CORRUPTED_DATA ) ;
}
2021-12-08 02:40:59 +00:00
void IMergeTreeDataPart : : appendFilesOfPartitionAndMinMaxIndex ( Strings & files ) const
{
if ( storage . format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING & & ! parent_part )
return ;
if ( ! parent_part )
partition . appendFiles ( storage , files ) ;
if ( ! isEmpty ( ) )
if ( ! parent_part )
minmax_idx - > appendFiles ( storage , files ) ;
}
2019-10-31 14:44:17 +00:00
void IMergeTreeDataPart : : loadChecksums ( bool require )
{
2021-05-05 15:10:14 +00:00
const String path = fs : : path ( getFullRelativePath ( ) ) / " checksums.txt " ;
2022-01-05 11:51:50 +00:00
bool exists = metadata_manager - > exists ( " checksums.txt " ) ;
2021-12-28 10:06:13 +00:00
if ( exists )
2019-10-31 14:44:17 +00:00
{
2022-01-05 11:51:50 +00:00
auto buf = metadata_manager - > read ( " checksums.txt " ) ;
2020-02-27 16:47:40 +00:00
if ( checksums . read ( * buf ) )
2019-10-31 14:44:17 +00:00
{
2020-02-27 16:47:40 +00:00
assertEOF ( * buf ) ;
2019-10-31 14:44:17 +00:00
bytes_on_disk = checksums . getTotalSizeOnDisk ( ) ;
}
else
2020-05-09 21:24:15 +00:00
bytes_on_disk = calculateTotalSizeOnDisk ( volume - > getDisk ( ) , getFullRelativePath ( ) ) ;
2019-10-31 14:44:17 +00:00
}
else
{
if ( require )
2021-03-22 13:27:35 +00:00
throw Exception ( ErrorCodes : : NO_FILE_IN_DATA_PART , " No checksums.txt in part {} " , name ) ;
2019-10-31 14:44:17 +00:00
2020-07-16 10:54:49 +00:00
/// If the checksums file is not present, calculate the checksums and write them to disk.
/// Check the data while we are at it.
LOG_WARNING ( storage . log , " Checksums for part {} not found. Will calculate them from data on disk. " , name ) ;
2021-03-22 13:27:35 +00:00
2020-07-16 10:54:49 +00:00
checksums = checkDataPart ( shared_from_this ( ) , false ) ;
2021-03-22 13:27:35 +00:00
2020-07-16 10:54:49 +00:00
{
2021-05-05 15:10:14 +00:00
auto out = volume - > getDisk ( ) - > writeFile ( fs : : path ( getFullRelativePath ( ) ) / " checksums.txt.tmp " , 4096 ) ;
2020-07-16 10:54:49 +00:00
checksums . write ( * out ) ;
}
2021-05-05 15:10:14 +00:00
volume - > getDisk ( ) - > moveFile ( fs : : path ( getFullRelativePath ( ) ) / " checksums.txt.tmp " , fs : : path ( getFullRelativePath ( ) ) / " checksums.txt " ) ;
2020-07-16 10:54:49 +00:00
bytes_on_disk = checksums . getTotalSizeOnDisk ( ) ;
2019-10-31 14:44:17 +00:00
}
}
2021-12-31 03:13:38 +00:00
void IMergeTreeDataPart : : appendFilesOfChecksums ( Strings & files )
2021-12-08 02:40:59 +00:00
{
files . push_back ( " checksums.txt " ) ;
}
2019-10-31 14:44:17 +00:00
void IMergeTreeDataPart : : loadRowsCount ( )
{
2021-05-05 15:10:14 +00:00
String path = fs : : path ( getFullRelativePath ( ) ) / " count.txt " ;
2021-04-17 01:06:59 +00:00
auto read_rows_count = [ & ] ( )
{
2022-01-05 11:51:50 +00:00
auto buf = metadata_manager - > read ( " count.txt " ) ;
2021-04-17 01:06:59 +00:00
readIntText ( rows_count , * buf ) ;
assertEOF ( * buf ) ;
} ;
2019-10-31 14:44:17 +00:00
if ( index_granularity . empty ( ) )
{
rows_count = 0 ;
}
2021-02-10 14:12:49 +00:00
else if ( storage . format_version > = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING | | part_type = = Type : : COMPACT | | parent_part )
2019-10-31 14:44:17 +00:00
{
2022-01-05 11:51:50 +00:00
bool exists = metadata_manager - > exists ( " count.txt " ) ;
if ( ! exists )
2019-10-31 14:44:17 +00:00
throw Exception ( " No count.txt in part " + name , ErrorCodes : : NO_FILE_IN_DATA_PART ) ;
2021-04-17 01:06:59 +00:00
read_rows_count ( ) ;
2020-07-25 14:42:20 +00:00
# ifndef NDEBUG
/// columns have to be loaded
for ( const auto & column : getColumns ( ) )
{
2020-07-27 09:42:37 +00:00
/// Most trivial types
2021-04-11 23:30:04 +00:00
if ( column . type - > isValueRepresentedByNumber ( )
& & ! column . type - > haveSubtypes ( )
2021-11-02 20:30:28 +00:00
& & getSerialization ( column ) - > getKind ( ) = = ISerialization : : Kind : : DEFAULT )
2020-07-25 14:42:20 +00:00
{
2021-12-09 10:39:28 +00:00
auto size = getColumnSize ( column . name ) ;
2020-07-25 14:42:20 +00:00
if ( size . data_uncompressed = = 0 )
continue ;
size_t rows_in_column = size . data_uncompressed / column . type - > getSizeOfValueInMemory ( ) ;
if ( rows_in_column ! = rows_count )
{
throw Exception (
ErrorCodes : : LOGICAL_ERROR ,
" Column {} has rows count {} according to size in memory "
" and size of single value, but data part {} has {} rows " , backQuote ( column . name ) , rows_in_column , name , rows_count ) ;
}
2020-12-09 11:23:37 +00:00
2020-12-09 11:46:04 +00:00
size_t last_possibly_incomplete_mark_rows = index_granularity . getLastNonFinalMarkRows ( ) ;
/// All this rows have to be written in column
size_t index_granularity_without_last_mark = index_granularity . getTotalRows ( ) - last_possibly_incomplete_mark_rows ;
/// We have more rows in column than in index granularity without last possibly incomplete mark
if ( rows_in_column < index_granularity_without_last_mark )
2020-12-09 11:23:37 +00:00
{
throw Exception (
ErrorCodes : : LOGICAL_ERROR ,
" Column {} has rows count {} according to size in memory "
2020-12-09 11:46:04 +00:00
" and size of single value, but index granularity in part {} without last mark has {} rows, which is more than in column " ,
backQuote ( column . name ) , rows_in_column , name , index_granularity . getTotalRows ( ) ) ;
}
/// In last mark we actually written less or equal rows than stored in last mark of index granularity
if ( rows_in_column - index_granularity_without_last_mark > last_possibly_incomplete_mark_rows )
{
throw Exception (
ErrorCodes : : LOGICAL_ERROR ,
" Column {} has rows count {} in last mark according to size in memory "
" and size of single value, but index granularity in part {} in last mark has {} rows which is less than in column " ,
backQuote ( column . name ) , rows_in_column - index_granularity_without_last_mark , name , last_possibly_incomplete_mark_rows ) ;
2020-12-09 11:23:37 +00:00
}
2020-07-25 14:42:20 +00:00
}
}
# endif
2019-10-31 14:44:17 +00:00
}
else
{
2021-04-17 01:06:59 +00:00
if ( volume - > getDisk ( ) - > exists ( path ) )
{
read_rows_count ( ) ;
return ;
}
2019-10-31 14:44:17 +00:00
for ( const NameAndTypePair & column : columns )
{
2021-11-02 03:03:52 +00:00
ColumnPtr column_col = column . type - > createColumn ( * getSerialization ( column ) ) ;
2019-10-31 14:44:17 +00:00
if ( ! column_col - > isFixedAndContiguous ( ) | | column_col - > lowCardinality ( ) )
continue ;
2021-12-09 10:39:28 +00:00
size_t column_size = getColumnSize ( column . name ) . data_uncompressed ;
2019-10-31 14:44:17 +00:00
if ( ! column_size )
continue ;
size_t sizeof_field = column_col - > sizeOfValueIfFixed ( ) ;
rows_count = column_size / sizeof_field ;
if ( column_size % sizeof_field ! = 0 )
{
throw Exception (
" Uncompressed size of column " + column . name + " ( " + toString ( column_size )
+ " ) is not divisible by the size of value ( " + toString ( sizeof_field ) + " ) " ,
ErrorCodes : : LOGICAL_ERROR ) ;
}
size_t last_mark_index_granularity = index_granularity . getLastNonFinalMarkRows ( ) ;
size_t rows_approx = index_granularity . getTotalRows ( ) ;
if ( ! ( rows_count < = rows_approx & & rows_approx < rows_count + last_mark_index_granularity ) )
throw Exception (
" Unexpected size of column " + column . name + " : " + toString ( rows_count ) + " rows, expected "
+ toString ( rows_approx ) + " +- " + toString ( last_mark_index_granularity ) + " rows according to the index " ,
ErrorCodes : : LOGICAL_ERROR ) ;
return ;
}
throw Exception ( " Data part doesn't contain fixed size column (even Date column) " , ErrorCodes::LOGICAL_ERROR) ;
}
}
2021-12-31 03:13:38 +00:00
void IMergeTreeDataPart : : appendFilesOfRowsCount ( Strings & files )
2021-12-08 02:40:59 +00:00
{
files . push_back ( " count.txt " ) ;
}
2019-10-31 14:44:17 +00:00
void IMergeTreeDataPart : : loadTTLInfos ( )
{
2022-01-05 11:51:50 +00:00
bool exists = metadata_manager - > exists ( " ttl.txt " ) ;
2021-12-28 10:06:13 +00:00
if ( exists )
2019-10-31 14:44:17 +00:00
{
2022-01-05 11:51:50 +00:00
auto in = metadata_manager - > read ( " ttl.txt " ) ;
2020-02-27 16:47:40 +00:00
assertString ( " ttl format version: " , * in ) ;
2019-10-31 14:44:17 +00:00
size_t format_version ;
2020-02-27 16:47:40 +00:00
readText ( format_version , * in ) ;
assertChar ( ' \n ' , * in ) ;
2019-10-31 14:44:17 +00:00
if ( format_version = = 1 )
{
try
{
2020-02-27 16:47:40 +00:00
ttl_infos . read ( * in ) ;
2019-10-31 14:44:17 +00:00
}
catch ( const JSONException & )
{
throw Exception ( " Error while parsing file ttl.txt in part: " + name , ErrorCodes : : BAD_TTL_FILE ) ;
}
}
else
throw Exception ( " Unknown ttl format version: " + toString ( format_version ) , ErrorCodes : : BAD_TTL_FILE ) ;
}
}
2021-12-08 02:40:59 +00:00
2021-12-31 03:13:38 +00:00
void IMergeTreeDataPart : : appendFilesOfTTLInfos ( Strings & files )
2020-10-15 16:17:16 +00:00
{
2021-12-08 02:40:59 +00:00
files . push_back ( " ttl.txt " ) ;
}
2020-10-15 16:17:16 +00:00
void IMergeTreeDataPart : : loadUUID ( )
{
2022-01-05 11:51:50 +00:00
bool exists = metadata_manager - > exists ( UUID_FILE_NAME ) ;
2021-12-28 10:06:13 +00:00
if ( exists )
2020-10-15 16:17:16 +00:00
{
2022-01-05 11:51:50 +00:00
auto in = metadata_manager - > read ( UUID_FILE_NAME ) ;
2020-10-15 16:17:16 +00:00
readText ( uuid , * in ) ;
if ( uuid = = UUIDHelpers : : Nil )
throw Exception ( " Unexpected empty " + String ( UUID_FILE_NAME ) + " in part: " + name , ErrorCodes : : LOGICAL_ERROR ) ;
}
}
2021-12-31 03:13:38 +00:00
void IMergeTreeDataPart : : appendFilesOfUUID ( Strings & files )
2021-12-08 02:40:59 +00:00
{
files . push_back ( UUID_FILE_NAME ) ;
}
2019-10-31 14:44:17 +00:00
void IMergeTreeDataPart : : loadColumns ( bool require )
{
2021-05-05 15:10:14 +00:00
String path = fs : : path ( getFullRelativePath ( ) ) / " columns.txt " ;
2020-06-17 16:39:58 +00:00
auto metadata_snapshot = storage . getInMemoryMetadataPtr ( ) ;
2021-02-10 14:12:49 +00:00
if ( parent_part )
metadata_snapshot = metadata_snapshot - > projections . get ( name ) . metadata ;
2021-01-21 12:34:11 +00:00
NamesAndTypesList loaded_columns ;
2022-01-05 11:51:50 +00:00
bool exists = metadata_manager - > exists ( " columns.txt " ) ;
2021-12-28 10:06:13 +00:00
if ( ! exists )
2019-10-31 14:44:17 +00:00
{
2020-01-15 18:24:10 +00:00
/// We can get list of columns only from columns.txt in compact parts.
if ( require | | part_type = = Type : : COMPACT )
2021-01-14 16:26:56 +00:00
throw Exception ( " No columns.txt in part " + name + " , expected path " + path + " on drive " + volume - > getDisk ( ) - > getName ( ) ,
ErrorCodes : : NO_FILE_IN_DATA_PART ) ;
2019-10-31 14:44:17 +00:00
/// If there is no file with a list of columns, write it down.
2020-06-17 16:39:58 +00:00
for ( const NameAndTypePair & column : metadata_snapshot - > getColumns ( ) . getAllPhysical ( ) )
2021-05-05 15:10:14 +00:00
if ( volume - > getDisk ( ) - > exists ( fs : : path ( getFullRelativePath ( ) ) / ( getFileNameForColumn ( column ) + " .bin " ) ) )
2021-01-21 12:34:11 +00:00
loaded_columns . push_back ( column ) ;
2019-10-31 14:44:17 +00:00
if ( columns . empty ( ) )
throw Exception ( " No columns in part " + name , ErrorCodes : : NO_FILE_IN_DATA_PART ) ;
{
2020-05-09 21:24:15 +00:00
auto buf = volume - > getDisk ( ) - > writeFile ( path + " .tmp " , 4096 ) ;
2021-01-21 12:34:11 +00:00
loaded_columns . writeText ( * buf ) ;
2019-10-31 14:44:17 +00:00
}
2020-05-09 21:24:15 +00:00
volume - > getDisk ( ) - > moveFile ( path + " .tmp " , path ) ;
2019-12-09 21:21:17 +00:00
}
else
{
2022-01-05 11:51:50 +00:00
auto in = metadata_manager - > read ( " columns.txt " ) ;
2021-12-08 02:40:59 +00:00
loaded_columns . readText ( * in ) ;
2021-11-27 09:40:46 +00:00
for ( const auto & column : loaded_columns )
{
const auto * aggregate_function_data_type = typeid_cast < const DataTypeAggregateFunction * > ( column . type . get ( ) ) ;
if ( aggregate_function_data_type & & aggregate_function_data_type - > isVersioned ( ) )
aggregate_function_data_type - > setVersion ( 0 , /* if_empty */ true ) ;
}
2019-10-31 14:44:17 +00:00
}
2021-10-29 17:21:02 +00:00
SerializationInfo : : Settings settings =
{
2021-12-08 15:29:00 +00:00
. ratio_of_defaults_for_sparse = storage . getSettings ( ) - > ratio_of_defaults_for_sparse_serialization ,
2021-10-29 17:21:02 +00:00
. choose_kind = false ,
} ;
SerializationInfoByName infos ( loaded_columns , settings ) ;
2022-03-23 04:13:42 +00:00
exists = metadata_manager - > exists ( SERIALIZATION_FILE_NAME ) ;
if ( exists )
{
auto in = metadata_manager - > read ( SERIALIZATION_FILE_NAME ) ;
infos . readJSON ( * in ) ;
}
2021-10-29 17:21:02 +00:00
2022-01-21 00:20:41 +00:00
setColumns ( loaded_columns ) ;
setSerializationInfos ( infos ) ;
2019-10-31 14:44:17 +00:00
}
2022-02-14 19:50:08 +00:00
void IMergeTreeDataPart : : assertHasVersionMetadata ( MergeTreeTransaction * txn ) const
{
TransactionID expected_tid = txn ? txn - > tid : Tx : : PrehistoricTID ;
if ( version . creation_tid ! = expected_tid )
throw Exception ( ErrorCodes : : LOGICAL_ERROR ,
" CreationTID of part {} (table {}) is set to unexpected value {}, it's a bug. Current transaction: {} " ,
name , storage . getStorageID ( ) . getNameForLogs ( ) , version . creation_tid , txn ? txn - > dumpDescription ( ) : " <none> " ) ;
2022-02-15 15:00:45 +00:00
assert ( ! txn | | storage . supportsTransactions ( ) ) ;
2022-02-14 19:50:08 +00:00
assert ( ! txn | | volume - > getDisk ( ) - > exists ( fs : : path ( getFullRelativePath ( ) ) / TXN_VERSION_METADATA_FILE_NAME ) ) ;
}
2021-12-30 13:15:28 +00:00
void IMergeTreeDataPart : : storeVersionMetadata ( ) const
{
2022-02-17 21:26:37 +00:00
if ( ! wasInvolvedInTransaction ( ) )
2021-12-30 13:15:28 +00:00
return ;
2022-02-14 19:50:08 +00:00
LOG_TEST ( storage . log , " Writing version for {} (creation: {}, removal {}) " , name , version . creation_tid , version . removal_tid ) ;
2022-02-15 15:00:45 +00:00
assert ( storage . supportsTransactions ( ) ) ;
2022-02-14 19:50:08 +00:00
if ( ! isStoredOnDisk ( ) )
throw Exception ( ErrorCodes : : NOT_IMPLEMENTED , " Transactions are not supported for in-memory parts (table: {}, part: {}) " ,
storage . getStorageID ( ) . getNameForLogs ( ) , name ) ;
2021-12-30 13:15:28 +00:00
String version_file_name = fs : : path ( getFullRelativePath ( ) ) / TXN_VERSION_METADATA_FILE_NAME ;
String tmp_version_file_name = version_file_name + " .tmp " ;
DiskPtr disk = volume - > getDisk ( ) ;
{
2022-02-04 18:18:20 +00:00
/// TODO IDisk interface does not allow to open file with O_EXCL flag (for DiskLocal),
/// so we create empty file at first (expecting that createFile throws if file already exists)
2022-03-08 19:11:47 +00:00
/// and then overwrite it.
2022-02-04 18:18:20 +00:00
disk - > createFile ( tmp_version_file_name ) ;
auto out = disk - > writeFile ( tmp_version_file_name , 256 , WriteMode : : Rewrite ) ;
2022-01-28 17:47:37 +00:00
version . write ( * out ) ;
2021-12-30 13:15:28 +00:00
out - > finalize ( ) ;
out - > sync ( ) ;
}
SyncGuardPtr sync_guard ;
if ( storage . getSettings ( ) - > fsync_part_directory )
2022-02-04 18:18:20 +00:00
sync_guard = disk - > getDirectorySyncGuard ( getFullRelativePath ( ) ) ;
2022-03-07 16:35:47 +00:00
disk - > replaceFile ( tmp_version_file_name , version_file_name ) ;
2021-12-30 13:15:28 +00:00
}
2022-02-17 21:26:37 +00:00
void IMergeTreeDataPart : : appendCSNToVersionMetadata ( VersionMetadata : : WhichCSN which_csn ) const
{
assert ( ! version . creation_tid . isEmpty ( ) ) ;
assert ( ! ( which_csn = = VersionMetadata : : WhichCSN : : CREATION & & version . creation_tid . isPrehistoric ( ) ) ) ;
assert ( ! ( which_csn = = VersionMetadata : : WhichCSN : : CREATION & & version . creation_csn = = 0 ) ) ;
assert ( ! ( which_csn = = VersionMetadata : : WhichCSN : : REMOVAL & & ( version . removal_tid . isPrehistoric ( ) | | version . removal_tid . isEmpty ( ) ) ) ) ;
assert ( ! ( which_csn = = VersionMetadata : : WhichCSN : : REMOVAL & & version . removal_csn = = 0 ) ) ;
assert ( isStoredOnDisk ( ) ) ;
/// Small enough appends to file are usually atomic,
/// so we append new metadata instead of rewriting file to reduce number of fsyncs.
/// We don't need to do fsync when writing CSN, because in case of hard restart
/// we will be able to restore CSN from transaction log in Keeper.
String version_file_name = fs : : path ( getFullRelativePath ( ) ) / TXN_VERSION_METADATA_FILE_NAME ;
DiskPtr disk = volume - > getDisk ( ) ;
auto out = disk - > writeFile ( version_file_name , 256 , WriteMode : : Append ) ;
version . writeCSN ( * out , which_csn ) ;
out - > finalize ( ) ;
}
2022-03-08 19:11:47 +00:00
void IMergeTreeDataPart : : appendRemovalTIDToVersionMetadata ( bool clear ) const
{
assert ( ! version . creation_tid . isEmpty ( ) ) ;
assert ( version . removal_csn = = 0 ) ;
assert ( ! version . removal_tid . isEmpty ( ) ) ;
assert ( isStoredOnDisk ( ) ) ;
if ( version . creation_tid . isPrehistoric ( ) & & ! clear )
{
/// Metadata file probably does not exist, because it was not written on part creation, because it was created without a transaction.
/// Let's create it (if needed). Concurrent writes are not possible, because creation_csn is prehistoric and we own removal_tid_lock.
storeVersionMetadata ( ) ;
return ;
}
2022-03-16 19:16:26 +00:00
if ( clear )
LOG_TEST ( storage . log , " Clearing removal TID for {} (creation: {}, removal {}) " , name , version . creation_tid , version . removal_tid ) ;
else
LOG_TEST ( storage . log , " Appending removal TID for {} (creation: {}, removal {}) " , name , version . creation_tid , version . removal_tid ) ;
2022-03-08 19:11:47 +00:00
String version_file_name = fs : : path ( getFullRelativePath ( ) ) / TXN_VERSION_METADATA_FILE_NAME ;
DiskPtr disk = volume - > getDisk ( ) ;
auto out = disk - > writeFile ( version_file_name , 256 , WriteMode : : Append ) ;
version . writeRemovalTID ( * out , clear ) ;
out - > finalize ( ) ;
2022-03-18 11:01:26 +00:00
/// fsync is not required when we clearing removal TID, because after hard restart we will fix metadata
2022-03-08 19:11:47 +00:00
if ( ! clear )
out - > sync ( ) ;
}
2021-12-30 13:15:28 +00:00
void IMergeTreeDataPart : : loadVersionMetadata ( ) const
2022-01-19 18:29:31 +00:00
try
2021-12-30 13:15:28 +00:00
{
String version_file_name = fs : : path ( getFullRelativePath ( ) ) / TXN_VERSION_METADATA_FILE_NAME ;
String tmp_version_file_name = version_file_name + " .tmp " ;
DiskPtr disk = volume - > getDisk ( ) ;
auto remove_tmp_file = [ & ] ( )
{
auto last_modified = disk - > getLastModified ( tmp_version_file_name ) ;
auto buf = openForReading ( disk , tmp_version_file_name ) ;
String content ;
readStringUntilEOF ( content , * buf ) ;
LOG_WARNING ( storage . log , " Found file {} that was last modified on {}, has size {} and the following content: {} " ,
tmp_version_file_name , last_modified . epochTime ( ) , content . size ( ) , content ) ;
disk - > removeFile ( tmp_version_file_name ) ;
} ;
if ( disk - > exists ( version_file_name ) )
{
auto buf = openForReading ( disk , version_file_name ) ;
2022-01-28 17:47:37 +00:00
version . read ( * buf ) ;
2021-12-30 13:15:28 +00:00
if ( disk - > exists ( tmp_version_file_name ) )
remove_tmp_file ( ) ;
return ;
}
/// Four (?) cases are possible:
/// 1. Part was created without transactions.
/// 2. Version metadata file was not renamed from *.tmp on part creation.
2022-02-14 19:50:08 +00:00
/// 3. Version metadata were written to *.tmp file, but hard restart happened before fsync.
2021-12-30 13:15:28 +00:00
/// 4. Fsyncs in storeVersionMetadata() work incorrectly.
if ( ! disk - > exists ( tmp_version_file_name ) )
{
2022-02-14 19:50:08 +00:00
/// Case 1.
2021-12-30 13:15:28 +00:00
/// We do not have version metadata and transactions history for old parts,
/// so let's consider that such parts were created by some ancient transaction
/// and were committed with some prehistoric CSN.
2022-02-14 19:50:08 +00:00
/// NOTE It might be Case 3, but version metadata file is written on part creation before other files,
/// so it's not Case 3 if part is not broken.
version . setCreationTID ( Tx : : PrehistoricTID , nullptr ) ;
2022-01-28 17:47:37 +00:00
version . creation_csn = Tx : : PrehistoricCSN ;
2021-12-30 13:15:28 +00:00
return ;
}
/// Case 2.
/// Content of *.tmp file may be broken, just use fake TID.
/// Transaction was not committed if *.tmp file was not renamed, so we should complete rollback by removing part.
2022-02-14 19:50:08 +00:00
version . setCreationTID ( Tx : : DummyTID , nullptr ) ;
2022-01-28 17:47:37 +00:00
version . creation_csn = Tx : : RolledBackCSN ;
2021-12-30 13:15:28 +00:00
remove_tmp_file ( ) ;
}
2022-01-19 18:29:31 +00:00
catch ( Exception & e )
{
e . addMessage ( " While loading version metadata from table {} part {} " , storage . getStorageID ( ) . getNameForLogs ( ) , name ) ;
throw ;
}
2021-12-30 13:15:28 +00:00
2022-02-17 21:26:37 +00:00
bool IMergeTreeDataPart : : wasInvolvedInTransaction ( ) const
{
2022-02-23 22:31:21 +00:00
assert ( ! version . creation_tid . isEmpty ( ) | | ( state = = State : : Temporary /* && std::uncaught_exceptions() */ ) ) ;
2022-02-17 21:26:37 +00:00
bool created_by_transaction = ! version . creation_tid . isPrehistoric ( ) ;
bool removed_by_transaction = version . isRemovalTIDLocked ( ) & & version . removal_tid_lock ! = Tx : : PrehistoricTID . getHash ( ) ;
return created_by_transaction | | removed_by_transaction ;
}
bool IMergeTreeDataPart : : assertHasValidVersionMetadata ( ) const
{
/// We don't have many tests with server restarts and it's really inconvenient to write such tests.
/// So we use debug assertions to ensure that part version is written correctly.
2022-03-18 11:01:26 +00:00
/// This method is not supposed to be called in release builds.
2022-02-17 21:26:37 +00:00
2022-02-24 21:51:21 +00:00
if ( isProjectionPart ( ) )
return true ;
2022-02-17 21:26:37 +00:00
if ( ! wasInvolvedInTransaction ( ) )
return true ;
if ( ! isStoredOnDisk ( ) )
return false ;
2022-03-09 20:38:18 +00:00
if ( part_is_probably_removed_from_disk )
return true ;
2022-02-17 21:26:37 +00:00
DiskPtr disk = volume - > getDisk ( ) ;
if ( ! disk - > exists ( getFullRelativePath ( ) ) )
return true ;
String content ;
String version_file_name = fs : : path ( getFullRelativePath ( ) ) / TXN_VERSION_METADATA_FILE_NAME ;
try
{
auto buf = openForReading ( disk , version_file_name ) ;
readStringUntilEOF ( content , * buf ) ;
ReadBufferFromString str_buf { content } ;
VersionMetadata file ;
file . read ( str_buf ) ;
bool valid_creation_tid = version . creation_tid = = file . creation_tid ;
2022-03-08 19:11:47 +00:00
bool valid_removal_tid = version . removal_tid = = file . removal_tid | | version . removal_tid = = Tx : : PrehistoricTID ;
2022-02-17 21:26:37 +00:00
bool valid_creation_csn = version . creation_csn = = file . creation_csn | | version . creation_csn = = Tx : : RolledBackCSN ;
2022-03-08 19:11:47 +00:00
bool valid_removal_csn = version . removal_csn = = file . removal_csn | | version . removal_csn = = Tx : : PrehistoricCSN ;
2022-02-17 21:26:37 +00:00
if ( ! valid_creation_tid | | ! valid_removal_tid | | ! valid_creation_csn | | ! valid_removal_csn )
throw Exception ( ErrorCodes : : CORRUPTED_DATA , " Invalid version metadata file " ) ;
return true ;
}
catch ( . . . )
{
WriteBufferFromOwnString expected ;
version . write ( expected ) ;
tryLogCurrentException ( storage . log , fmt : : format ( " File {} contains: \n {} \n expected: \n {} " , version_file_name , content , expected . str ( ) ) ) ;
return false ;
}
}
2021-12-30 13:15:28 +00:00
2021-12-31 03:13:38 +00:00
void IMergeTreeDataPart : : appendFilesOfColumns ( Strings & files )
2021-12-08 02:40:59 +00:00
{
files . push_back ( " columns.txt " ) ;
2022-03-23 04:13:42 +00:00
files . push_back ( SERIALIZATION_FILE_NAME ) ;
2021-12-08 02:40:59 +00:00
}
2020-10-20 15:10:24 +00:00
bool IMergeTreeDataPart : : shallParticipateInMerges ( const StoragePolicyPtr & storage_policy ) const
{
/// `IMergeTreeDataPart::volume` describes space where current part belongs, and holds
/// `SingleDiskVolume` object which does not contain up-to-date settings of corresponding volume.
/// Therefore we shall obtain volume from storage policy.
auto volume_ptr = storage_policy - > getVolume ( storage_policy - > getVolumeIndexByDisk ( volume - > getDisk ( ) ) ) ;
return ! volume_ptr - > areMergesAvoided ( ) ;
}
2020-02-28 17:14:55 +00:00
UInt64 IMergeTreeDataPart::calculateTotalSizeOnDisk(const DiskPtr & disk_, const String & from)
{
    /// Size of `from` on `disk_`: a file's own size, or for a directory the recursive sum over its entries.
    if (!disk_->isFile(from))
    {
        std::vector<std::string> entries;
        disk_->listFiles(from, entries);

        UInt64 total = 0;
        for (const auto & entry : entries)
            total += calculateTotalSizeOnDisk(disk_, fs::path(from) / entry);
        return total;
    }

    return disk_->getFileSize(from);
}
void IMergeTreeDataPart : : renameTo ( const String & new_relative_path , bool remove_new_dir_if_exists ) const
2022-01-31 20:47:04 +00:00
try
2019-10-10 16:30:30 +00:00
{
2019-10-31 14:44:17 +00:00
assertOnDisk ( ) ;
2020-02-27 16:47:40 +00:00
String from = getFullRelativePath ( ) ;
2021-05-12 06:53:04 +00:00
String to = fs : : path ( storage . relative_data_path ) / ( parent_part ? parent_part - > relative_path : " " ) / new_relative_path / " " ;
2019-10-10 16:30:30 +00:00
2020-05-09 21:24:15 +00:00
if ( ! volume - > getDisk ( ) - > exists ( from ) )
2021-01-07 12:29:34 +00:00
throw Exception ( " Part directory " + fullPath ( volume - > getDisk ( ) , from ) + " doesn't exist. Most likely it is a logical error. " , ErrorCodes : : FILE_DOESNT_EXIST ) ;
2019-10-10 16:30:30 +00:00
2020-05-09 21:24:15 +00:00
if ( volume - > getDisk ( ) - > exists ( to ) )
2019-10-10 16:30:30 +00:00
{
if ( remove_new_dir_if_exists )
{
Names files ;
2020-05-09 21:24:15 +00:00
volume - > getDisk ( ) - > listFiles ( to , files ) ;
2019-10-10 16:30:30 +00:00
2020-05-23 22:24:01 +00:00
LOG_WARNING ( storage . log , " Part directory {} already exists and contains {} files. Removing it. " , fullPath ( volume - > getDisk ( ) , to ) , files . size ( ) ) ;
2019-10-10 16:30:30 +00:00
2020-05-09 21:24:15 +00:00
volume - > getDisk ( ) - > removeRecursive ( to ) ;
2019-10-10 16:30:30 +00:00
}
else
{
2020-05-09 21:24:15 +00:00
throw Exception ( " Part directory " + fullPath ( volume - > getDisk ( ) , to ) + " already exists " , ErrorCodes : : DIRECTORY_ALREADY_EXISTS ) ;
2019-10-10 16:30:30 +00:00
}
}
2022-01-05 11:51:50 +00:00
metadata_manager - > deleteAll ( true ) ;
metadata_manager - > assertAllDeleted ( true ) ;
2020-05-09 21:24:15 +00:00
volume - > getDisk ( ) - > setLastModified ( from , Poco : : Timestamp : : fromEpochTime ( time ( nullptr ) ) ) ;
2021-03-23 10:33:07 +00:00
volume - > getDisk ( ) - > moveDirectory ( from , to ) ;
2019-10-10 16:30:30 +00:00
relative_path = new_relative_path ;
2022-01-05 11:51:50 +00:00
metadata_manager - > updateAll ( true ) ;
2021-01-07 16:26:53 +00:00
2021-01-26 13:29:45 +00:00
SyncGuardPtr sync_guard ;
2021-01-07 16:26:53 +00:00
if ( storage . getSettings ( ) - > fsync_part_directory )
2021-01-26 13:29:45 +00:00
sync_guard = volume - > getDisk ( ) - > getDirectorySyncGuard ( to ) ;
2019-10-10 16:30:30 +00:00
}
2022-01-31 20:47:04 +00:00
catch ( . . . )
{
if ( startsWith ( new_relative_path , " detached/ " ) )
{
// Don't throw when the destination is to the detached folder. It might be able to
// recover in some cases, such as fetching parts into multi-disks while some of the
// disks are broken.
tryLogCurrentException ( __PRETTY_FUNCTION__ ) ;
}
else
throw ;
}
2019-10-10 16:30:30 +00:00
2022-04-18 23:09:09 +00:00
std : : pair < bool , NameSet > IMergeTreeDataPart : : canRemovePart ( ) const
2021-06-08 19:11:22 +00:00
{
2021-07-12 02:56:49 +00:00
/// NOTE: It's needed for zero-copy replication
2021-06-09 12:36:47 +00:00
if ( force_keep_shared_data )
2022-04-18 23:09:09 +00:00
return std : : make_pair ( false , NameSet { } ) ;
2019-12-03 14:33:56 +00:00
2022-04-15 16:36:23 +00:00
return storage . unlockSharedData ( * this ) ;
2021-06-08 19:11:22 +00:00
}
2022-01-05 11:51:50 +00:00
void IMergeTreeDataPart : : initializePartMetadataManager ( )
{
# if USE_ROCKSDB
if ( use_metadata_cache )
metadata_manager = std : : make_shared < PartMetadataManagerWithCache > ( this , storage . getContext ( ) - > getMergeTreeMetadataCache ( ) ) ;
else
metadata_manager = std : : make_shared < PartMetadataManagerOrdinary > ( this ) ;
# else
metadata_manager = std : : make_shared < PartMetadataManagerOrdinary > ( this ) ;
# endif
}
2021-06-09 12:36:47 +00:00
void IMergeTreeDataPart : : remove ( ) const
2019-10-31 14:44:17 +00:00
{
2022-02-17 21:26:37 +00:00
assert ( assertHasValidVersionMetadata ( ) ) ;
2022-03-09 20:38:18 +00:00
part_is_probably_removed_from_disk = true ;
2022-04-15 16:36:23 +00:00
auto [ can_remove , files_not_to_remove ] = canRemovePart ( ) ;
2021-06-09 12:36:47 +00:00
2019-10-31 14:44:17 +00:00
if ( ! isStoredOnDisk ( ) )
return ;
if ( relative_path . empty ( ) )
throw Exception ( " Part relative_path cannot be empty. This is bug. " , ErrorCodes : : LOGICAL_ERROR ) ;
2021-02-10 14:12:49 +00:00
if ( isProjectionPart ( ) )
{
LOG_WARNING ( storage . log , " Projection part {} should be removed by its parent {}. " , name , parent_part - > name ) ;
2022-04-15 16:36:23 +00:00
projectionRemove ( parent_part - > getFullRelativePath ( ) , ! can_remove ) ;
2021-02-10 14:12:49 +00:00
return ;
}
2022-01-05 11:51:50 +00:00
metadata_manager - > deleteAll ( false ) ;
metadata_manager - > assertAllDeleted ( false ) ;
2021-12-08 02:40:59 +00:00
2019-10-31 14:44:17 +00:00
/** Atomic directory removal:
* - rename directory to temporary name ;
* - remove it recursive .
*
* For temporary name we use " delete_tmp_ " prefix .
*
* NOTE : We cannot use " tmp_delete_ " prefix , because there is a second thread ,
* that calls " clearOldTemporaryDirectories " and removes all directories , that begin with " tmp_ " and are old enough .
* But when we removing data part , it can be old enough . And rename doesn ' t change mtime .
* And a race condition can happen that will lead to " File not found " error here .
*/
2021-12-10 13:29:51 +00:00
/// NOTE We rename part to delete_tmp_<relative_path> instead of delete_tmp_<name> to avoid race condition
/// when we try to remove two parts with the same name, but different relative paths,
/// for example all_1_2_1 (in Deleting state) and tmp_merge_all_1_2_1 (in Temporary state).
2021-05-05 15:10:14 +00:00
fs : : path from = fs : : path ( storage . relative_data_path ) / relative_path ;
2021-12-10 13:29:51 +00:00
fs : : path to = fs : : path ( storage . relative_data_path ) / ( " delete_tmp_ " + relative_path ) ;
2019-10-31 14:44:17 +00:00
// TODO directory delete_tmp_<name> is never removed if server crashes before returning from this function
2021-02-10 14:12:49 +00:00
auto disk = volume - > getDisk ( ) ;
if ( disk - > exists ( to ) )
2019-10-31 14:44:17 +00:00
{
2021-12-10 13:29:51 +00:00
LOG_WARNING ( storage . log , " Directory {} (to which part must be renamed before removing) already exists. Most likely this is due to unclean restart or race condition. Removing it. " , fullPath ( disk , to ) ) ;
2019-10-31 14:44:17 +00:00
try
{
2022-04-18 23:09:09 +00:00
disk - > removeSharedRecursive ( fs : : path ( to ) / " " , ! can_remove , files_not_to_remove ) ;
2019-10-31 14:44:17 +00:00
}
catch ( . . . )
{
2021-02-10 14:12:49 +00:00
LOG_ERROR ( storage . log , " Cannot recursively remove directory {}. Exception: {} " , fullPath ( disk , to ) , getCurrentExceptionMessage ( false ) ) ;
2019-10-31 14:44:17 +00:00
throw ;
}
}
try
{
2021-02-10 14:12:49 +00:00
disk - > moveDirectory ( from , to ) ;
2019-10-31 14:44:17 +00:00
}
2021-05-07 21:53:44 +00:00
catch ( const fs : : filesystem_error & e )
2019-10-31 14:44:17 +00:00
{
2021-05-07 21:53:44 +00:00
if ( e . code ( ) = = std : : errc : : no_such_file_or_directory )
{
2022-04-22 10:30:40 +00:00
LOG_ERROR ( storage . log , " Directory {} (part to remove) doesn't exist or one of nested files has gone. Most likely this is due to manual removing. This should be discouraged. Ignoring. " , fullPath ( disk , from ) ) ;
2021-05-07 21:53:44 +00:00
return ;
}
throw ;
2019-10-31 14:44:17 +00:00
}
2021-02-10 14:12:49 +00:00
// Record existing projection directories so we don't remove them twice
std : : unordered_set < String > projection_directories ;
for ( const auto & [ p_name , projection_part ] : projection_parts )
{
2022-04-19 12:01:30 +00:00
/// NOTE: projections currently unsupported with zero copy replication.
/// TODO: fix it.
2022-04-15 16:36:23 +00:00
projection_part - > projectionRemove ( to , ! can_remove ) ;
2021-02-10 14:12:49 +00:00
projection_directories . emplace ( p_name + " .proj " ) ;
}
2020-04-13 00:42:23 +00:00
if ( checksums . empty ( ) )
2019-10-31 14:44:17 +00:00
{
2020-04-13 00:42:23 +00:00
/// If the part is not completely written, we cannot use fast path by listing files.
2022-04-18 23:09:09 +00:00
disk - > removeSharedRecursive ( fs : : path ( to ) / " " , ! can_remove , files_not_to_remove ) ;
2019-10-31 14:44:17 +00:00
}
2020-04-13 00:42:23 +00:00
else
2019-10-31 14:44:17 +00:00
{
2020-04-13 00:42:23 +00:00
try
{
/// Remove each expected file in directory, then remove directory itself.
2022-02-01 10:36:51 +00:00
IDisk : : RemoveBatchRequest request ;
2020-04-13 00:42:23 +00:00
2021-04-18 09:17:02 +00:00
# if !defined(__clang__)
2020-04-13 00:42:23 +00:00
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wunused-variable"
# endif
for ( const auto & [ file , _ ] : checksums . files )
2021-02-10 14:12:49 +00:00
{
if ( projection_directories . find ( file ) = = projection_directories . end ( ) )
2022-02-01 10:36:51 +00:00
request . emplace_back ( fs : : path ( to ) / file ) ;
2021-02-10 14:12:49 +00:00
}
2021-04-18 09:17:02 +00:00
# if !defined(__clang__)
2020-04-13 00:42:23 +00:00
# pragma GCC diagnostic pop
# endif
for ( const auto & file : { " checksums.txt " , " columns.txt " } )
2022-02-01 10:36:51 +00:00
request . emplace_back ( fs : : path ( to ) / file ) ;
2020-08-26 15:29:46 +00:00
2022-02-01 10:36:51 +00:00
request . emplace_back ( fs : : path ( to ) / DEFAULT_COMPRESSION_CODEC_FILE_NAME , true ) ;
request . emplace_back ( fs : : path ( to ) / DELETE_ON_DESTROY_MARKER_FILE_NAME , true ) ;
2022-02-06 20:36:08 +00:00
request . emplace_back ( fs : : path ( to ) / TXN_VERSION_METADATA_FILE_NAME , true ) ;
2020-04-13 00:42:23 +00:00
2022-04-18 23:09:09 +00:00
disk - > removeSharedFiles ( request , ! can_remove , files_not_to_remove ) ;
2021-02-10 14:12:49 +00:00
disk - > removeDirectory ( to ) ;
2020-04-13 00:42:23 +00:00
}
catch ( . . . )
{
/// Recursive directory removal does many excessive "stat" syscalls under the hood.
2021-02-10 14:12:49 +00:00
LOG_ERROR ( storage . log , " Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: {} " , fullPath ( disk , to ) , getCurrentExceptionMessage ( false ) ) ;
2019-10-31 14:44:17 +00:00
2022-04-18 23:09:09 +00:00
disk - > removeSharedRecursive ( fs : : path ( to ) / " " , ! can_remove , files_not_to_remove ) ;
2020-04-13 00:42:23 +00:00
}
2019-10-31 14:44:17 +00:00
}
}
2021-02-10 14:12:49 +00:00
2021-06-11 12:41:48 +00:00
void IMergeTreeDataPart : : projectionRemove ( const String & parent_to , bool keep_shared_data ) const
2021-02-10 14:12:49 +00:00
{
2022-01-05 11:51:50 +00:00
metadata_manager - > deleteAll ( false ) ;
metadata_manager - > assertAllDeleted ( false ) ;
2021-12-08 02:40:59 +00:00
2022-01-19 12:27:11 +00:00
String to = fs : : path ( parent_to ) / relative_path ;
2021-02-10 14:12:49 +00:00
auto disk = volume - > getDisk ( ) ;
if ( checksums . empty ( ) )
{
LOG_ERROR (
storage . log ,
" Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: checksums.txt is missing " ,
fullPath ( disk , to ) ) ;
/// If the part is not completely written, we cannot use fast path by listing files.
2022-04-18 23:09:09 +00:00
disk - > removeSharedRecursive ( fs : : path ( to ) / " " , keep_shared_data , { } ) ;
2021-02-10 14:12:49 +00:00
}
else
{
try
{
/// Remove each expected file in directory, then remove directory itself.
2022-02-01 10:36:51 +00:00
IDisk : : RemoveBatchRequest request ;
2021-02-10 14:12:49 +00:00
# if !defined(__clang__)
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wunused-variable"
# endif
for ( const auto & [ file , _ ] : checksums . files )
2022-02-01 10:36:51 +00:00
request . emplace_back ( fs : : path ( to ) / file ) ;
2021-02-10 14:12:49 +00:00
# if !defined(__clang__)
# pragma GCC diagnostic pop
# endif
for ( const auto & file : { " checksums.txt " , " columns.txt " } )
2022-02-01 10:36:51 +00:00
request . emplace_back ( fs : : path ( to ) / file ) ;
request . emplace_back ( fs : : path ( to ) / DEFAULT_COMPRESSION_CODEC_FILE_NAME , true ) ;
request . emplace_back ( fs : : path ( to ) / DELETE_ON_DESTROY_MARKER_FILE_NAME , true ) ;
2021-02-10 14:12:49 +00:00
2022-04-18 23:09:09 +00:00
disk - > removeSharedFiles ( request , keep_shared_data , { } ) ;
disk - > removeSharedRecursive ( to , keep_shared_data , { } ) ;
2021-02-10 14:12:49 +00:00
}
catch ( . . . )
{
/// Recursive directory removal does many excessive "stat" syscalls under the hood.
LOG_ERROR ( storage . log , " Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: {} " , fullPath ( disk , to ) , getCurrentExceptionMessage ( false ) ) ;
2022-04-18 23:09:09 +00:00
disk - > removeSharedRecursive ( fs : : path ( to ) / " " , keep_shared_data , { } ) ;
2021-02-10 14:12:49 +00:00
}
}
}
2021-12-01 14:24:26 +00:00
String IMergeTreeDataPart::getRelativePathForPrefix(const String & prefix, bool detached) const
{
    /** Picks a directory name "[<prefix>_]<name>" that does not exist yet. If it exists, "_tryN" is
      * appended (N = 1..9). At most 10 names are tried so that not too many junk directories are left;
      * if all of them exist, the last candidate is returned anyway.
      * Names are checked under "<table path>/detached" when `detached`, under the parent part's directory
      * for a projection part, and directly under the table path otherwise.
      */
    if (detached && parent_part)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot detach projection");

    auto base_path = fs::path(storage.relative_data_path);
    if (detached)
        base_path /= "detached";
    else if (parent_part)
        base_path /= parent_part->relative_path;

    const String name_prefix = prefix.empty() ? "" : prefix + "_";
    String candidate;
    for (int attempt = 0; attempt < 10; ++attempt)
    {
        candidate = name_prefix + name + (attempt ? "_try" + DB::toString(attempt) : "");

        if (!volume->getDisk()->exists(base_path / candidate))
            return candidate;

        LOG_WARNING(storage.log, "Directory {} (to detach to) already exists. Will detach to directory with '_tryN' suffix.", candidate);
    }

    return candidate;
}
2020-06-03 09:51:23 +00:00
String IMergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix) const
{
    /// Underscores act as separators in detached directory names, so the prefix must not contain any.
    assert(prefix.find('_') == String::npos);

    /// A non-empty prefix must be one of the known detach reasons.
    const auto & reasons = DetachedPartInfo::DETACH_REASONS;
    assert(prefix.empty() || std::find(reasons.begin(), reasons.end(), prefix) != reasons.end());

    const String directory_name = getRelativePathForPrefix(prefix, /* detached */ true);
    return "detached/" + directory_name;
}
void IMergeTreeDataPart : : renameToDetached ( const String & prefix ) const
{
2020-06-03 22:00:02 +00:00
renameTo ( getRelativePathForDetachedPart ( prefix ) , true ) ;
2019-10-10 16:30:30 +00:00
}
2020-06-26 11:30:23 +00:00
void IMergeTreeDataPart : : makeCloneInDetached ( const String & prefix , const StorageMetadataPtr & /*metadata_snapshot*/ ) const
2019-10-10 16:30:30 +00:00
{
2021-05-05 15:10:14 +00:00
String destination_path = fs : : path ( storage . relative_data_path ) / getRelativePathForDetachedPart ( prefix ) ;
2021-12-01 15:17:31 +00:00
localBackup ( volume - > getDisk ( ) , getFullRelativePath ( ) , destination_path ) ;
2021-05-05 15:10:14 +00:00
volume - > getDisk ( ) - > removeFileIfExists ( fs : : path ( destination_path ) / DELETE_ON_DESTROY_MARKER_FILE_NAME ) ;
2019-10-10 16:30:30 +00:00
}
2020-10-15 13:55:13 +00:00
void IMergeTreeDataPart : : makeCloneOnDisk ( const DiskPtr & disk , const String & directory_name ) const
2019-10-10 16:30:30 +00:00
{
assertOnDisk ( ) ;
2020-10-15 13:55:13 +00:00
if ( disk - > getName ( ) = = volume - > getDisk ( ) - > getName ( ) )
2020-05-09 21:24:15 +00:00
throw Exception ( " Can not clone data part " + name + " to same disk " + volume - > getDisk ( ) - > getName ( ) , ErrorCodes : : LOGICAL_ERROR ) ;
2020-10-15 13:55:13 +00:00
if ( directory_name . empty ( ) )
throw Exception ( " Can not clone data part " + name + " to empty directory. " , ErrorCodes : : LOGICAL_ERROR ) ;
2019-10-10 16:30:30 +00:00
2021-05-05 15:10:14 +00:00
String path_to_clone = fs : : path ( storage . relative_data_path ) / directory_name / " " ;
2019-10-10 16:30:30 +00:00
2021-05-05 15:10:14 +00:00
if ( disk - > exists ( fs : : path ( path_to_clone ) / relative_path ) )
2020-10-07 11:35:28 +00:00
{
2022-02-01 09:52:02 +00:00
LOG_WARNING ( storage . log , " Path {} already exists. Will remove it and clone again. " , fullPath ( disk , path_to_clone + relative_path ) ) ;
2021-05-05 15:10:14 +00:00
disk - > removeRecursive ( fs : : path ( path_to_clone ) / relative_path / " " ) ;
2020-10-07 11:35:28 +00:00
}
2020-10-15 13:55:13 +00:00
disk - > createDirectories ( path_to_clone ) ;
volume - > getDisk ( ) - > copy ( getFullRelativePath ( ) , disk , path_to_clone ) ;
2021-05-05 15:10:14 +00:00
volume - > getDisk ( ) - > removeFileIfExists ( fs : : path ( path_to_clone ) / DELETE_ON_DESTROY_MARKER_FILE_NAME ) ;
2019-10-10 16:30:30 +00:00
}
2020-01-15 19:16:56 +00:00
void IMergeTreeDataPart : : checkConsistencyBase ( ) const
{
2020-02-27 16:47:40 +00:00
String path = getFullRelativePath ( ) ;
2020-01-15 19:16:56 +00:00
2020-06-17 10:34:23 +00:00
auto metadata_snapshot = storage . getInMemoryMetadataPtr ( ) ;
2021-02-10 14:12:49 +00:00
if ( parent_part )
metadata_snapshot = metadata_snapshot - > projections . get ( name ) . metadata ;
else
{
// No need to check projections here because we already did consistent checking when loading projections if necessary.
}
2020-06-17 12:39:20 +00:00
const auto & pk = metadata_snapshot - > getPrimaryKey ( ) ;
2021-03-02 10:33:54 +00:00
const auto & partition_key = metadata_snapshot - > getPartitionKey ( ) ;
2020-01-15 19:16:56 +00:00
if ( ! checksums . empty ( ) )
{
2022-04-18 10:18:43 +00:00
if ( ! pk . column_names . empty ( ) & & ! checksums . files . contains ( " primary.idx " ) )
2020-01-15 19:16:56 +00:00
throw Exception ( " No checksum for primary.idx " , ErrorCodes : : NO_FILE_IN_DATA_PART ) ;
if ( storage . format_version > = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING )
{
2022-04-18 10:18:43 +00:00
if ( ! checksums . files . contains ( " count.txt " ) )
2020-01-15 19:16:56 +00:00
throw Exception ( " No checksum for count.txt " , ErrorCodes : : NO_FILE_IN_DATA_PART ) ;
2022-04-18 10:18:43 +00:00
if ( metadata_snapshot - > hasPartitionKey ( ) & & ! checksums . files . contains ( " partition.dat " ) )
2020-01-15 19:16:56 +00:00
throw Exception ( " No checksum for partition.dat " , ErrorCodes : : NO_FILE_IN_DATA_PART ) ;
2021-02-10 14:12:49 +00:00
if ( ! isEmpty ( ) & & ! parent_part )
2020-01-15 19:16:56 +00:00
{
2021-03-02 10:33:54 +00:00
for ( const String & col_name : storage . getMinMaxColumnsNames ( partition_key ) )
2020-01-15 19:16:56 +00:00
{
2022-04-18 10:18:43 +00:00
if ( ! checksums . files . contains ( " minmax_ " + escapeForFileName ( col_name ) + " .idx " ) )
2020-01-15 19:16:56 +00:00
throw Exception ( " No minmax idx file checksum for column " + col_name , ErrorCodes : : NO_FILE_IN_DATA_PART ) ;
}
}
}
2020-05-09 21:24:15 +00:00
checksums . checkSizes ( volume - > getDisk ( ) , path ) ;
2020-01-15 19:16:56 +00:00
}
else
{
2020-02-27 17:57:49 +00:00
auto check_file_not_empty = [ & path ] ( const DiskPtr & disk_ , const String & file_path )
{
2020-02-27 16:47:40 +00:00
UInt64 file_size ;
if ( ! disk_ - > exists ( file_path ) | | ( file_size = disk_ - > getFileSize ( file_path ) ) = = 0 )
throw Exception ( " Part " + fullPath ( disk_ , path ) + " is broken: " + fullPath ( disk_ , file_path ) + " is empty " , ErrorCodes : : BAD_SIZE_OF_FILE_IN_DATA_PART ) ;
return file_size ;
2020-01-15 19:16:56 +00:00
} ;
/// Check that the primary key index is not empty.
2020-05-26 13:46:19 +00:00
if ( ! pk . column_names . empty ( ) )
2021-05-05 15:10:14 +00:00
check_file_not_empty ( volume - > getDisk ( ) , fs : : path ( path ) / " primary.idx " ) ;
2020-01-15 19:16:56 +00:00
if ( storage . format_version > = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING )
{
2021-05-05 15:10:14 +00:00
check_file_not_empty ( volume - > getDisk ( ) , fs : : path ( path ) / " count.txt " ) ;
2020-01-15 19:16:56 +00:00
2020-06-17 10:34:23 +00:00
if ( metadata_snapshot - > hasPartitionKey ( ) )
2021-05-05 15:10:14 +00:00
check_file_not_empty ( volume - > getDisk ( ) , fs : : path ( path ) / " partition.dat " ) ;
2020-01-15 19:16:56 +00:00
2021-02-10 14:12:49 +00:00
if ( ! parent_part )
{
for ( const String & col_name : storage . getMinMaxColumnsNames ( partition_key ) )
2021-05-12 06:53:04 +00:00
check_file_not_empty ( volume - > getDisk ( ) , fs : : path ( path ) / ( " minmax_ " + escapeForFileName ( col_name ) + " .idx " ) ) ;
2021-02-10 14:12:49 +00:00
}
2020-01-15 19:16:56 +00:00
}
}
}
2020-06-03 18:59:18 +00:00
void IMergeTreeDataPart : : checkConsistency ( bool /* require_part_metadata */ ) const
{
throw Exception ( " Method 'checkConsistency' is not implemented for part with type " + getType ( ) . toString ( ) , ErrorCodes : : NOT_IMPLEMENTED ) ;
}
2021-10-08 13:13:56 +00:00
void IMergeTreeDataPart : : calculateColumnsAndSecondaryIndicesSizesOnDisk ( )
{
calculateColumnsSizesOnDisk ( ) ;
calculateSecondaryIndicesSizesOnDisk ( ) ;
}
2020-03-23 12:19:43 +00:00
void IMergeTreeDataPart : : calculateColumnsSizesOnDisk ( )
{
if ( getColumns ( ) . empty ( ) | | checksums . empty ( ) )
throw Exception ( " Cannot calculate columns sizes when columns or checksums are not initialized " , ErrorCodes : : LOGICAL_ERROR ) ;
2020-06-29 20:36:18 +00:00
calculateEachColumnSizes ( columns_sizes , total_columns_size ) ;
2020-03-23 12:19:43 +00:00
}
2021-10-08 13:13:56 +00:00
void IMergeTreeDataPart : : calculateSecondaryIndicesSizesOnDisk ( )
{
if ( checksums . empty ( ) )
throw Exception ( " Cannot calculate secondary indexes sizes when columns or checksums are not initialized " , ErrorCodes : : LOGICAL_ERROR ) ;
auto secondary_indices_descriptions = storage . getInMemoryMetadataPtr ( ) - > secondary_indices ;
for ( auto & index_description : secondary_indices_descriptions )
{
ColumnSize index_size ;
auto index_ptr = MergeTreeIndexFactory : : instance ( ) . get ( index_description ) ;
auto index_name = index_ptr - > getFileName ( ) ;
auto index_name_escaped = escapeForFileName ( index_name ) ;
auto index_file_name = index_name_escaped + index_ptr - > getSerializedFileExtension ( ) ;
auto index_marks_file_name = index_name_escaped + index_granularity_info . marks_file_extension ;
2021-10-11 11:00:10 +00:00
/// If part does not contain index
2021-10-08 13:13:56 +00:00
auto bin_checksum = checksums . files . find ( index_file_name ) ;
if ( bin_checksum ! = checksums . files . end ( ) )
{
index_size . data_compressed = bin_checksum - > second . file_size ;
index_size . data_uncompressed = bin_checksum - > second . uncompressed_size ;
}
auto mrk_checksum = checksums . files . find ( index_marks_file_name ) ;
if ( mrk_checksum ! = checksums . files . end ( ) )
index_size . marks = mrk_checksum - > second . file_size ;
total_secondary_indices_size . add ( index_size ) ;
secondary_index_sizes [ index_description . name ] = index_size ;
}
}
2021-12-09 10:39:28 +00:00
ColumnSize IMergeTreeDataPart::getColumnSize(const String & column_name) const
{
    /// columns_sizes may not be calculated for some part types.
    /// An unknown column yields a default-constructed ColumnSize.
    const auto found = columns_sizes.find(column_name);
    if (found == columns_sizes.end())
        return {};
    return found->second;
}
2021-10-08 13:13:56 +00:00
IndexSize IMergeTreeDataPart::getSecondaryIndexSize(const String & secondary_index_name) const
{
    /// Size recorded for the named secondary index, or a default-constructed value if none was calculated.
    const auto found = secondary_index_sizes.find(secondary_index_name);
    if (found == secondary_index_sizes.end())
        return ColumnSize{};
    return found->second;
}
2020-03-23 12:19:43 +00:00
void IMergeTreeDataPart : : accumulateColumnSizes ( ColumnToSize & column_to_size ) const
{
2020-03-23 15:43:20 +00:00
for ( const auto & [ column_name , size ] : columns_sizes )
column_to_size [ column_name ] = size . data_compressed ;
2020-03-23 12:19:43 +00:00
}
2020-09-03 08:59:41 +00:00
bool IMergeTreeDataPart : : checkAllTTLCalculated ( const StorageMetadataPtr & metadata_snapshot ) const
{
if ( ! metadata_snapshot - > hasAnyTTL ( ) )
return false ;
if ( metadata_snapshot - > hasRowsTTL ( ) )
{
if ( isEmpty ( ) ) /// All rows were finally deleted and we don't store TTL
return true ;
else if ( ttl_infos . table_ttl . min = = 0 )
return false ;
}
for ( const auto & [ column , desc ] : metadata_snapshot - > getColumnTTLs ( ) )
{
/// Part has this column, but we don't calculated TTL for it
2022-04-18 10:18:43 +00:00
if ( ! ttl_infos . columns_ttl . contains ( column ) & & getColumns ( ) . contains ( column ) )
2020-09-03 08:59:41 +00:00
return false ;
}
for ( const auto & move_desc : metadata_snapshot - > getMoveTTLs ( ) )
{
/// Move TTL is not calculated
2022-04-18 10:18:43 +00:00
if ( ! ttl_infos . moves_ttl . contains ( move_desc . result_column ) )
2020-09-03 08:59:41 +00:00
return false ;
}
2020-12-25 14:52:46 +00:00
for ( const auto & group_by_desc : metadata_snapshot - > getGroupByTTLs ( ) )
{
2022-04-18 10:18:43 +00:00
if ( ! ttl_infos . group_by_ttl . contains ( group_by_desc . result_column ) )
2020-12-25 14:52:46 +00:00
return false ;
}
2021-01-13 14:04:27 +00:00
for ( const auto & rows_where_desc : metadata_snapshot - > getRowsWhereTTLs ( ) )
2021-01-11 23:07:21 +00:00
{
2022-04-18 10:18:43 +00:00
if ( ! ttl_infos . rows_where_ttl . contains ( rows_where_desc . result_column ) )
2021-01-11 23:07:21 +00:00
return false ;
}
2020-09-03 08:59:41 +00:00
return true ;
}
2020-11-03 08:58:26 +00:00
String IMergeTreeDataPart::getUniqueId() const
{
    /// Unique id of the part's reference-check file, as reported by the disk.
    /// Only disks with zero-copy replication support provide it; others cause LOGICAL_ERROR.
    const auto part_disk = volume->getDisk();
    if (part_disk->supportZeroCopyReplication())
        return part_disk->getUniqueId(fs::path(getFullRelativePath()) / FILE_FOR_REFERENCES_CHECK);

    throw Exception(fmt::format("Disk {} doesn't support zero-copy replication", part_disk->getName()), ErrorCodes::LOGICAL_ERROR);
}
2022-02-02 16:44:29 +00:00
/// Builds the block ID "<partition_id>_<hash_lo>_<hash_hi>" for a level-0 (freshly inserted) part.
/// The 128-bit SipHash is taken over:
///  - the part's data checksums when `token` is empty;
///  - the user-supplied deduplication token (insert_deduplication_token) otherwise,
///    which lets a user control deduplication instead of relying on the data digest.
/// Throws LOGICAL_ERROR for parts with a non-zero level.
String IMergeTreeDataPart::getZeroLevelPartBlockID(std::string_view token) const
{
    if (info.level != 0)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get block id for non zero level part {}", name);

    SipHash hash;
    if (token.empty())
    {
        checksums.computeTotalChecksumDataOnly(hash);
    }
    else
    {
        hash.update(token.data(), token.size());
    }

    /// Write the 128-bit hash straight into two 64-bit words through a char pointer.
    /// This is well-defined, unlike reading the inactive member of a union. The byte layout is unchanged.
    UInt64 hash_words[2] = {};
    hash.get128(reinterpret_cast<char *>(hash_words));
    return info.partition_id + "_" + toString(hash_words[0]) + "_" + toString(hash_words[1]);
}
2021-12-08 02:40:59 +00:00
IMergeTreeDataPart::uint128 IMergeTreeDataPart::getActualChecksumByFile(const String & file_path) const
{
    /// Hash of the file at `file_path`. Only meaningful with the metadata cache enabled.
    /// Uses the hash stored in checksums when the file has one; otherwise hashes the file's
    /// current content; returns an empty value when the file does not exist.
    assert(use_metadata_cache);

    const String file_name = std::filesystem::path(file_path).filename();
    const auto files_without_checksums = getFileNamesWithoutChecksums();

    if (auto stored = checksums.files.find(file_name);
        stored != checksums.files.end() && !files_without_checksums.contains(file_name))
        return stored->second.file_hash;

    const auto part_disk = volume->getDisk();
    if (!part_disk->exists(file_path))
        return {};

    auto in_file = part_disk->readFile(file_path);
    HashingReadBuffer in_hash(*in_file);
    String content;
    readStringUntilEOF(content, in_hash);
    return in_hash.getHash();
}
2022-01-07 10:37:08 +00:00
std : : unordered_map < String , IMergeTreeDataPart : : uint128 > IMergeTreeDataPart : : checkMetadata ( ) const
2021-12-08 02:40:59 +00:00
{
2022-01-07 10:37:08 +00:00
return metadata_manager - > check ( ) ;
2021-12-08 02:40:59 +00:00
}
2019-12-18 13:09:58 +00:00
bool isCompactPart ( const MergeTreeDataPartPtr & data_part )
{
return ( data_part & & data_part - > getType ( ) = = MergeTreeDataPartType : : COMPACT ) ;
}
bool isWidePart ( const MergeTreeDataPartPtr & data_part )
{
return ( data_part & & data_part - > getType ( ) = = MergeTreeDataPartType : : WIDE ) ;
}
2020-04-20 01:38:38 +00:00
bool isInMemoryPart ( const MergeTreeDataPartPtr & data_part )
{
return ( data_part & & data_part - > getType ( ) = = MergeTreeDataPartType : : IN_MEMORY ) ;
}
2019-10-10 16:30:30 +00:00
}