2017-04-01 09:19:00 +00:00
# include <Storages/MergeTree/MergeTreeData.h>
# include <Interpreters/ExpressionAnalyzer.h>
# include <Storages/MergeTree/MergeTreeBlockInputStream.h>
# include <Storages/MergeTree/MergedBlockOutputStream.h>
2017-12-03 00:48:19 +00:00
# include <Storages/MergeTree/checkDataPart.h>
2017-04-01 09:19:00 +00:00
# include <Storages/AlterCommands.h>
# include <Parsers/ASTIdentifier.h>
# include <Parsers/ASTNameTypePair.h>
2017-09-06 20:34:26 +00:00
# include <Parsers/ASTLiteral.h>
# include <Parsers/ASTFunction.h>
# include <Parsers/ASTPartition.h>
2017-08-19 18:11:20 +00:00
# include <Parsers/ExpressionListParsers.h>
2017-08-31 19:56:43 +00:00
# include <Parsers/parseQuery.h>
2017-04-01 09:19:00 +00:00
# include <DataStreams/ExpressionBlockInputStream.h>
2017-09-06 20:34:26 +00:00
# include <DataStreams/ValuesRowInputStream.h>
2017-04-01 09:19:00 +00:00
# include <DataStreams/copyData.h>
# include <IO/WriteBufferFromFile.h>
2017-07-31 21:39:24 +00:00
# include <IO/WriteBufferFromString.h>
2017-04-01 09:19:00 +00:00
# include <IO/CompressedReadBuffer.h>
2017-09-06 20:34:26 +00:00
# include <IO/ReadBufferFromMemory.h>
# include <IO/ConcatReadBuffer.h>
2017-04-01 09:19:00 +00:00
# include <IO/HexWriteBuffer.h>
2017-09-05 12:12:55 +00:00
# include <IO/Operators.h>
2017-04-01 09:19:00 +00:00
# include <DataTypes/DataTypeDate.h>
# include <DataTypes/DataTypeDateTime.h>
# include <DataTypes/DataTypeEnum.h>
2017-12-25 18:58:39 +00:00
# include <DataTypes/NestedUtils.h>
2017-04-01 09:19:00 +00:00
# include <DataTypes/DataTypeArray.h>
# include <DataTypes/DataTypeNullable.h>
# include <Functions/FunctionFactory.h>
# include <Functions/IFunction.h>
2017-05-10 08:08:32 +00:00
# include <Common/Increment.h>
2017-05-10 06:49:19 +00:00
# include <Common/SimpleIncrement.h>
2017-04-01 09:19:00 +00:00
# include <Common/escapeForFileName.h>
2018-01-15 19:07:47 +00:00
# include <Common/StringUtils/StringUtils.h>
2017-04-01 09:19:00 +00:00
# include <Common/Stopwatch.h>
2017-07-13 20:58:19 +00:00
# include <Common/typeid_cast.h>
2017-09-05 12:12:55 +00:00
# include <Common/localBackup.h>
# include <Poco/DirectoryIterator.h>
2014-10-16 01:21:03 +00:00
2017-09-11 22:40:51 +00:00
# include <boost/range/adaptor/filtered.hpp>
2014-06-10 14:24:33 +00:00
# include <algorithm>
2014-10-16 01:21:03 +00:00
# include <iomanip>
2015-04-16 07:22:29 +00:00
# include <thread>
2017-02-09 17:29:36 +00:00
# include <typeinfo>
# include <typeindex>
2017-11-20 04:15:43 +00:00
# include <optional>
2018-01-23 22:56:46 +00:00
# include <Interpreters/PartLog.h>
2014-03-09 17:36:01 +00:00
2016-10-24 02:02:37 +00:00
namespace ProfileEvents
{
2017-04-01 07:20:54 +00:00
extern const Event RejectedInserts ;
extern const Event DelayedInserts ;
extern const Event DelayedInsertsMilliseconds ;
2016-10-24 02:02:37 +00:00
}
2016-10-27 22:50:02 +00:00
namespace CurrentMetrics
{
2017-04-01 07:20:54 +00:00
extern const Metric DelayedInserts ;
2016-10-27 22:50:02 +00:00
}
2014-03-09 17:36:01 +00:00
namespace DB
{
2016-11-20 12:43:20 +00:00
namespace ErrorCodes
{
2017-04-01 07:20:54 +00:00
extern const int MEMORY_LIMIT_EXCEEDED ;
2017-08-19 18:11:20 +00:00
extern const int SYNTAX_ERROR ;
2017-08-25 20:41:45 +00:00
extern const int CORRUPTED_DATA ;
2017-09-06 20:34:26 +00:00
extern const int INVALID_PARTITION_VALUE ;
2017-09-08 18:11:09 +00:00
extern const int METADATA_MISMATCH ;
2017-11-20 19:33:12 +00:00
extern const int PART_IS_TEMPORARILY_LOCKED ;
2018-03-09 23:23:15 +00:00
extern const int TOO_MANY_PARTS ;
2016-11-20 12:43:20 +00:00
}
2014-03-09 17:36:01 +00:00
MergeTreeData : : MergeTreeData (
2017-04-01 07:20:54 +00:00
const String & database_ , const String & table_ ,
2018-03-06 20:18:34 +00:00
const String & full_path_ , const ColumnsDescription & columns_ ,
2017-04-01 07:20:54 +00:00
Context & context_ ,
2017-09-08 13:17:38 +00:00
const ASTPtr & primary_expr_ast_ ,
2018-02-09 10:53:50 +00:00
const ASTPtr & secondary_sort_expr_ast_ ,
2017-09-08 18:11:09 +00:00
const String & date_column_name ,
const ASTPtr & partition_expr_ast_ ,
2017-09-08 13:17:38 +00:00
const ASTPtr & sampling_expression_ ,
2017-04-01 07:20:54 +00:00
const MergingParams & merging_params_ ,
const MergeTreeSettings & settings_ ,
bool require_part_metadata_ ,
bool attach ,
2017-12-01 21:40:58 +00:00
BrokenPartCallback broken_part_callback_ )
2018-03-06 20:18:34 +00:00
: ITableDeclaration { columns_ } ,
2018-01-25 14:42:39 +00:00
context ( context_ ) ,
2017-09-08 13:17:38 +00:00
sampling_expression ( sampling_expression_ ) ,
2017-09-19 20:42:42 +00:00
index_granularity ( settings_ . index_granularity ) ,
2017-04-01 07:20:54 +00:00
merging_params ( merging_params_ ) ,
2017-09-08 13:17:38 +00:00
settings ( settings_ ) ,
primary_expr_ast ( primary_expr_ast_ ) ,
2018-02-09 10:53:50 +00:00
secondary_sort_expr_ast ( secondary_sort_expr_ast_ ) ,
2017-09-08 18:11:09 +00:00
partition_expr_ast ( partition_expr_ast_ ) ,
2017-04-01 07:20:54 +00:00
require_part_metadata ( require_part_metadata_ ) ,
database_name ( database_ ) , table_name ( table_ ) ,
2018-01-25 14:42:39 +00:00
full_path ( full_path_ ) ,
2017-04-01 07:20:54 +00:00
broken_part_callback ( broken_part_callback_ ) ,
2017-12-25 14:56:32 +00:00
log_name ( database_name + " . " + table_name ) , log ( & Logger : : get ( log_name + " (Data) " ) ) ,
2018-02-19 15:31:43 +00:00
data_parts_by_info ( data_parts_indexes . get < TagByInfo > ( ) ) ,
data_parts_by_state_and_info ( data_parts_indexes . get < TagByStateAndInfo > ( ) )
2014-03-09 17:36:01 +00:00
{
2018-03-06 14:49:27 +00:00
/// NOTE: using the same columns list as is read when performing actual merges.
2018-03-13 15:00:28 +00:00
merging_params . check ( getColumns ( ) . getAllPhysical ( ) ) ;
2017-04-01 07:20:54 +00:00
2018-02-21 14:43:10 +00:00
if ( ! primary_expr_ast )
2018-02-19 17:31:30 +00:00
throw Exception ( " Primary key cannot be empty " , ErrorCodes : : BAD_ARGUMENTS ) ;
2017-04-01 07:20:54 +00:00
initPrimaryKey ( ) ;
2017-09-01 20:33:17 +00:00
2018-02-21 14:43:10 +00:00
if ( sampling_expression & & ( ! primary_key_sample . has ( sampling_expression - > getColumnName ( ) ) )
2017-11-01 01:45:10 +00:00
& & ! attach & & ! settings . compatibility_allow_sampling_expression_not_in_primary_key ) /// This is for backward compatibility.
2017-09-18 19:24:27 +00:00
throw Exception ( " Sampling expression must be present in the primary key " , ErrorCodes : : BAD_ARGUMENTS ) ;
2017-09-08 18:11:09 +00:00
MergeTreeDataFormatVersion min_format_version ( 0 ) ;
if ( ! date_column_name . empty ( ) )
2017-09-01 20:33:17 +00:00
{
2017-09-08 18:11:09 +00:00
try
{
2017-11-01 01:45:10 +00:00
String partition_expr_str = " toYYYYMM( " + backQuoteIfNeed ( date_column_name ) + " ) " ;
2017-09-08 18:11:09 +00:00
ParserNotEmptyExpressionList parser ( /* allow_alias_without_as_keyword = */ false ) ;
partition_expr_ast = parseQuery (
2018-04-16 15:11:13 +00:00
parser , partition_expr_str . data ( ) , partition_expr_str . data ( ) + partition_expr_str . length ( ) , " partition expression " , 0 ) ;
2017-09-08 13:17:38 +00:00
2017-09-08 18:11:09 +00:00
initPartitionKey ( ) ;
2017-09-08 13:17:38 +00:00
2017-09-08 18:11:09 +00:00
if ( minmax_idx_date_column_pos = = - 1 )
throw Exception ( " Could not find Date column " , ErrorCodes : : BAD_TYPE_OF_FIELD ) ;
}
catch ( Exception & e )
{
/// Better error message.
e . addMessage ( " (while initializing MergeTree partition key from date column ` " + date_column_name + " `) " ) ;
throw ;
}
2017-09-08 13:17:38 +00:00
}
2017-09-08 18:11:09 +00:00
else
2017-09-08 13:17:38 +00:00
{
2017-09-08 18:11:09 +00:00
initPartitionKey ( ) ;
min_format_version = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING ;
2017-09-01 20:33:17 +00:00
}
2017-04-01 07:20:54 +00:00
2018-04-16 10:04:59 +00:00
auto path_exists = Poco : : File ( full_path ) . exists ( ) ;
2017-04-01 07:20:54 +00:00
/// Creating directories, if not exist.
Poco : : File ( full_path ) . createDirectories ( ) ;
2018-04-16 10:04:59 +00:00
2017-04-01 07:20:54 +00:00
Poco : : File ( full_path + " detached " ) . createDirectory ( ) ;
2017-08-25 20:41:45 +00:00
String version_file_path = full_path + " format_version.txt " ;
2018-04-16 10:04:59 +00:00
// When data path not exists, ignore the format_version check
if ( ! attach | | ! path_exists )
2017-08-25 20:41:45 +00:00
{
2017-09-08 18:11:09 +00:00
format_version = min_format_version ;
2017-08-25 20:41:45 +00:00
WriteBufferFromFile buf ( version_file_path ) ;
writeIntText ( format_version . toUnderType ( ) , buf ) ;
}
else if ( Poco : : File ( version_file_path ) . exists ( ) )
{
ReadBufferFromFile buf ( version_file_path ) ;
readIntText ( format_version , buf ) ;
if ( ! buf . eof ( ) )
throw Exception ( " Bad version file: " + version_file_path , ErrorCodes : : CORRUPTED_DATA ) ;
}
else
format_version = 0 ;
2017-09-08 18:11:09 +00:00
if ( format_version < min_format_version )
throw Exception (
" MergeTree data format version on disk doesn't support custom partitioning " ,
ErrorCodes : : METADATA_MISMATCH ) ;
2016-05-16 23:04:03 +00:00
}
2018-02-21 17:05:21 +00:00
static void checkKeyExpression ( const ExpressionActions & expr , const Block & sample_block , const String & key_name )
2017-12-09 10:14:45 +00:00
{
2018-02-21 17:05:21 +00:00
for ( const ExpressionAction & action : expr . getActions ( ) )
{
if ( action . type = = ExpressionAction : : ARRAY_JOIN )
throw Exception ( key_name + " key cannot contain array joins " ) ;
if ( action . type = = ExpressionAction : : APPLY_FUNCTION )
{
IFunctionBase & func = * action . function ;
if ( ! func . isDeterministic ( ) )
throw Exception ( key_name + " key cannot contain non-deterministic functions, "
" but contains function " + func . getName ( ) ,
ErrorCodes : : BAD_ARGUMENTS ) ;
}
}
2017-12-09 10:14:45 +00:00
2018-02-21 17:05:21 +00:00
for ( const ColumnWithTypeAndName & element : sample_block )
{
const ColumnPtr & column = element . column ;
if ( column & & ( column - > isColumnConst ( ) | | column - > isDummy ( ) ) )
throw Exception { key_name + " key cannot contain constants " , ErrorCodes : : ILLEGAL_COLUMN } ;
if ( element . type - > isNullable ( ) )
throw Exception { key_name + " key cannot contain nullable columns " , ErrorCodes : : ILLEGAL_COLUMN } ;
}
2017-12-09 10:14:45 +00:00
}
2016-05-16 23:04:03 +00:00
void MergeTreeData : : initPrimaryKey ( )
{
2018-02-09 10:53:50 +00:00
auto addSortDescription = [ ] ( SortDescription & descr , const ASTPtr & expr_ast )
2017-04-01 07:20:54 +00:00
{
2018-02-09 10:53:50 +00:00
descr . reserve ( descr . size ( ) + expr_ast - > children . size ( ) ) ;
for ( const ASTPtr & ast : expr_ast - > children )
{
String name = ast - > getColumnName ( ) ;
descr . emplace_back ( name , 1 , 1 ) ;
}
} ;
/// Initialize description of sorting for primary key.
primary_sort_descr . clear ( ) ;
addSortDescription ( primary_sort_descr , primary_expr_ast ) ;
2017-04-01 07:20:54 +00:00
2018-03-13 15:00:28 +00:00
primary_expr = ExpressionAnalyzer ( primary_expr_ast , context , nullptr , getColumns ( ) . getAllPhysical ( ) ) . getActions ( false ) ;
2017-04-01 07:20:54 +00:00
2018-02-09 10:53:50 +00:00
{
ExpressionActionsPtr projected_expr =
2018-03-13 15:00:28 +00:00
ExpressionAnalyzer ( primary_expr_ast , context , nullptr , getColumns ( ) . getAllPhysical ( ) ) . getActions ( true ) ;
2018-02-09 10:53:50 +00:00
primary_key_sample = projected_expr - > getSampleBlock ( ) ;
}
2017-04-01 07:20:54 +00:00
2018-02-21 17:05:21 +00:00
checkKeyExpression ( * primary_expr , primary_key_sample , " Primary " ) ;
2017-04-01 07:20:54 +00:00
2018-02-21 17:05:21 +00:00
size_t primary_key_size = primary_key_sample . columns ( ) ;
2017-04-01 07:20:54 +00:00
primary_key_data_types . resize ( primary_key_size ) ;
for ( size_t i = 0 ; i < primary_key_size ; + + i )
primary_key_data_types [ i ] = primary_key_sample . getByPosition ( i ) . type ;
2018-02-09 10:53:50 +00:00
sort_descr = primary_sort_descr ;
if ( secondary_sort_expr_ast )
{
addSortDescription ( sort_descr , secondary_sort_expr_ast ) ;
2018-03-13 15:00:28 +00:00
secondary_sort_expr = ExpressionAnalyzer ( secondary_sort_expr_ast , context , nullptr , getColumns ( ) . getAllPhysical ( ) ) . getActions ( false ) ;
2018-02-09 10:53:50 +00:00
ExpressionActionsPtr projected_expr =
2018-03-13 15:00:28 +00:00
ExpressionAnalyzer ( secondary_sort_expr_ast , context , nullptr , getColumns ( ) . getAllPhysical ( ) ) . getActions ( true ) ;
2018-02-09 10:53:50 +00:00
auto secondary_key_sample = projected_expr - > getSampleBlock ( ) ;
2018-02-21 17:05:21 +00:00
checkKeyExpression ( * secondary_sort_expr , secondary_key_sample , " Secondary " ) ;
2018-02-09 10:53:50 +00:00
}
2014-03-13 12:48:07 +00:00
}
2014-03-09 17:36:01 +00:00
2016-04-15 17:42:51 +00:00
2017-09-08 18:11:09 +00:00
void MergeTreeData : : initPartitionKey ( )
2017-08-19 18:11:20 +00:00
{
2017-09-01 20:33:17 +00:00
if ( ! partition_expr_ast | | partition_expr_ast - > children . empty ( ) )
return ;
2018-03-13 15:00:28 +00:00
partition_expr = ExpressionAnalyzer ( partition_expr_ast , context , nullptr , getColumns ( ) . getAllPhysical ( ) ) . getActions ( false ) ;
2017-08-19 18:11:20 +00:00
for ( const ASTPtr & ast : partition_expr_ast - > children )
2017-08-30 19:03:19 +00:00
{
String col_name = ast - > getColumnName ( ) ;
2018-02-21 17:05:21 +00:00
partition_key_sample . insert ( partition_expr - > getSampleBlock ( ) . getByName ( col_name ) ) ;
2017-08-30 19:03:19 +00:00
}
2017-08-21 15:35:29 +00:00
2018-02-21 17:05:21 +00:00
checkKeyExpression ( * partition_expr , partition_key_sample , " Partition " ) ;
2017-09-01 20:33:17 +00:00
/// Add all columns used in the partition key to the min-max index.
2017-12-25 21:57:29 +00:00
const NamesAndTypesList & minmax_idx_columns_with_types = partition_expr - > getRequiredColumnsWithTypes ( ) ;
2017-08-21 15:35:29 +00:00
minmax_idx_expr = std : : make_shared < ExpressionActions > ( minmax_idx_columns_with_types , context . getSettingsRef ( ) ) ;
2017-12-25 21:57:29 +00:00
for ( const NameAndTypePair & column : minmax_idx_columns_with_types )
2017-08-21 15:35:29 +00:00
{
minmax_idx_columns . emplace_back ( column . name ) ;
minmax_idx_column_types . emplace_back ( column . type ) ;
minmax_idx_sort_descr . emplace_back ( column . name , 1 , 1 ) ;
}
2017-09-01 20:33:17 +00:00
/// Try to find the date column in columns used by the partition key (a common case).
2017-08-21 15:35:29 +00:00
bool encountered_date_column = false ;
for ( size_t i = 0 ; i < minmax_idx_column_types . size ( ) ; + + i )
{
if ( typeid_cast < const DataTypeDate * > ( minmax_idx_column_types [ i ] . get ( ) ) )
{
if ( ! encountered_date_column )
{
minmax_idx_date_column_pos = i ;
encountered_date_column = true ;
}
else
{
/// There is more than one Date column in partition key and we don't know which one to choose.
minmax_idx_date_column_pos = - 1 ;
}
}
}
2017-08-19 18:11:20 +00:00
}
2017-12-25 21:57:29 +00:00
void MergeTreeData : : MergingParams : : check ( const NamesAndTypesList & columns ) const
2016-04-15 17:42:51 +00:00
{
2018-02-02 09:46:54 +00:00
if ( ! sign_column . empty ( ) & & mode ! = MergingParams : : Collapsing & & mode ! = MergingParams : : VersionedCollapsing )
throw Exception ( " Sign column for MergeTree cannot be specified in modes except Collapsing or VersionedCollapsing. " ,
2018-01-29 17:42:19 +00:00
ErrorCodes : : LOGICAL_ERROR ) ;
2018-02-02 09:46:54 +00:00
if ( ! version_column . empty ( ) & & mode ! = MergingParams : : Replacing & & mode ! = MergingParams : : VersionedCollapsing )
throw Exception ( " Version column for MergeTree cannot be specified in modes except Replacing or VersionedCollapsing. " ,
2018-01-29 17:42:19 +00:00
ErrorCodes : : LOGICAL_ERROR ) ;
if ( ! columns_to_sum . empty ( ) & & mode ! = MergingParams : : Summing )
throw Exception ( " List of columns to sum for MergeTree cannot be specified in all modes except Summing. " ,
ErrorCodes : : LOGICAL_ERROR ) ;
2017-04-01 07:20:54 +00:00
/// Check that if the sign column is needed, it exists and is of type Int8.
2018-01-29 17:42:19 +00:00
auto check_sign_column = [ this , & columns ] ( bool is_optional , const std : : string & storage )
2017-04-01 07:20:54 +00:00
{
if ( sign_column . empty ( ) )
2018-01-29 17:42:19 +00:00
{
if ( is_optional )
return ;
throw Exception ( " Logical error: Sign column for storage " + storage + " is empty " , ErrorCodes : : LOGICAL_ERROR ) ;
}
2017-04-01 07:20:54 +00:00
2018-01-01 12:00:05 +00:00
bool miss_column = true ;
2017-04-01 07:20:54 +00:00
for ( const auto & column : columns )
{
if ( column . name = = sign_column )
{
if ( ! typeid_cast < const DataTypeInt8 * > ( column . type . get ( ) ) )
2018-01-29 17:42:19 +00:00
throw Exception ( " Sign column ( " + sign_column + " ) for storage " + storage + " must have type Int8. "
" Provided column of type " + column . type - > getName ( ) + " . " , ErrorCodes : : BAD_TYPE_OF_FIELD ) ;
2018-01-01 12:00:05 +00:00
miss_column = false ;
2017-04-01 07:20:54 +00:00
break ;
}
}
2018-01-09 18:00:19 +00:00
if ( miss_column )
2017-12-31 19:40:53 +00:00
throw Exception ( " Sign column " + sign_column + " does not exist in table declaration. " ) ;
2018-01-29 17:42:19 +00:00
} ;
2017-04-01 07:20:54 +00:00
2018-01-29 17:42:19 +00:00
/// that if the version_column column is needed, it exists and is of unsigned integer type.
auto check_version_column = [ this , & columns ] ( bool is_optional , const std : : string & storage )
2017-04-01 07:20:54 +00:00
{
2018-01-29 17:42:19 +00:00
if ( version_column . empty ( ) )
{
if ( is_optional )
return ;
2017-04-01 07:20:54 +00:00
2018-01-29 17:42:19 +00:00
throw Exception ( " Logical error: Version column for storage " + storage + " is empty " , ErrorCodes : : LOGICAL_ERROR ) ;
}
2017-04-01 07:20:54 +00:00
2018-01-01 12:00:05 +00:00
bool miss_column = true ;
2017-04-01 07:20:54 +00:00
for ( const auto & column : columns )
{
if ( column . name = = version_column )
{
2018-03-06 14:49:27 +00:00
if ( ! column . type - > canBeUsedAsVersion ( ) )
throw Exception ( " The column " + version_column +
" cannot be used as a version column for storage " + storage +
" because it is of type " + column . type - > getName ( ) +
2018-03-06 19:01:45 +00:00
" (must be of an integer type or of type Date or DateTime) " , ErrorCodes : : BAD_TYPE_OF_FIELD ) ;
2018-01-01 12:00:05 +00:00
miss_column = false ;
2017-04-01 07:20:54 +00:00
break ;
}
}
2018-01-09 18:00:19 +00:00
if ( miss_column )
2017-12-31 19:40:53 +00:00
throw Exception ( " Version column " + version_column + " does not exist in table declaration. " ) ;
2018-01-29 17:42:19 +00:00
} ;
if ( mode = = MergingParams : : Collapsing )
check_sign_column ( false , " CollapsingMergeTree " ) ;
if ( mode = = MergingParams : : Summing )
{
/// If columns_to_sum are set, then check that such columns exist.
for ( const auto & column_to_sum : columns_to_sum )
2018-02-02 12:14:30 +00:00
{
auto check_column_to_sum_exists = [ & column_to_sum ] ( const NameAndTypePair & name_and_type )
{
return column_to_sum = = Nested : : extractTableName ( name_and_type . name ) ;
} ;
if ( columns . end ( ) = = std : : find_if ( columns . begin ( ) , columns . end ( ) , check_column_to_sum_exists ) )
throw Exception (
" Column " + column_to_sum + " listed in columns to sum does not exist in table declaration. " ) ;
}
2018-01-29 17:42:19 +00:00
}
if ( mode = = MergingParams : : Replacing )
check_version_column ( true , " ReplacingMergeTree " ) ;
2018-02-02 09:46:54 +00:00
if ( mode = = MergingParams : : VersionedCollapsing )
2018-01-29 17:42:19 +00:00
{
2018-02-02 09:46:54 +00:00
check_sign_column ( false , " VersionedCollapsingMergeTree " ) ;
check_version_column ( false , " VersionedCollapsingMergeTree " ) ;
2017-04-01 07:20:54 +00:00
}
/// TODO Checks for Graphite mode.
2016-04-24 09:44:47 +00:00
}
/// Returns the engine-name prefix for the merging mode (empty for Ordinary).
/// Throws LOGICAL_ERROR for a mode value this function does not know.
String MergeTreeData::MergingParams::getModeName() const
{
    switch (mode)
    {
        case Ordinary:              return "";
        case Collapsing:            return "Collapsing";
        case Summing:               return "Summing";
        case Aggregating:           return "Aggregating";
        case Replacing:             return "Replacing";
        case Graphite:              return "Graphite";
        case VersionedCollapsing:   return "VersionedCollapsing";
        default:                    break;
    }

    throw Exception("Unknown mode of operation for MergeTreeData: " + toString<int>(mode), ErrorCodes::LOGICAL_ERROR);
}
2015-08-17 21:09:36 +00:00
/// Returns the largest `max_block` among all parts in data_parts_by_info (not filtered by state),
/// or 0 when there are no parts.
Int64 MergeTreeData::getMaxDataPartIndex()
{
    std::lock_guard<std::mutex> lock_all(data_parts_mutex);

    Int64 result = 0;
    for (const auto & part : data_parts_by_info)
    {
        if (part->info.max_block > result)
            result = part->info.max_block;
    }

    return result;
}
2014-08-13 08:07:52 +00:00
void MergeTreeData : : loadDataParts ( bool skip_sanity_checks )
2014-03-09 17:36:01 +00:00
{
2017-04-01 07:20:54 +00:00
LOG_DEBUG ( log , " Loading data parts " ) ;
Strings part_file_names ;
Poco : : DirectoryIterator end ;
for ( Poco : : DirectoryIterator it ( full_path ) ; it ! = end ; + + it )
{
2017-08-16 19:22:49 +00:00
/// Skip temporary directories.
2017-05-15 23:41:16 +00:00
if ( startsWith ( it . name ( ) , " tmp " ) )
2017-04-01 07:20:54 +00:00
continue ;
part_file_names . push_back ( it . name ( ) ) ;
}
DataPartsVector broken_parts_to_remove ;
DataPartsVector broken_parts_to_detach ;
size_t suspicious_broken_parts = 0 ;
2017-11-20 19:33:12 +00:00
std : : lock_guard < std : : mutex > lock ( data_parts_mutex ) ;
data_parts_indexes . clear ( ) ;
2017-04-01 07:20:54 +00:00
for ( const String & file_name : part_file_names )
{
2017-08-16 19:24:50 +00:00
MergeTreePartInfo part_info ;
2017-08-25 20:41:45 +00:00
if ( ! MergeTreePartInfo : : tryParsePartName ( file_name , & part_info , format_version ) )
2017-04-01 07:20:54 +00:00
continue ;
2017-08-16 19:24:50 +00:00
MutableDataPartPtr part = std : : make_shared < DataPart > ( * this , file_name , part_info ) ;
2017-05-16 15:40:32 +00:00
part - > relative_path = file_name ;
2017-04-01 07:20:54 +00:00
bool broken = false ;
try
{
2017-08-16 19:24:50 +00:00
part - > loadColumnsChecksumsIndexes ( require_part_metadata , true ) ;
2017-04-01 07:20:54 +00:00
}
catch ( const Exception & e )
{
/// Don't count the part as broken if there is not enough memory to load it.
/// In fact, there can be many similar situations.
/// But it is OK, because there is a safety guard against deleting too many parts.
if ( e . code ( ) = = ErrorCodes : : MEMORY_LIMIT_EXCEEDED )
throw ;
broken = true ;
tryLogCurrentException ( __PRETTY_FUNCTION__ ) ;
}
catch ( . . . )
{
broken = true ;
tryLogCurrentException ( __PRETTY_FUNCTION__ ) ;
}
/// Ignore and possibly delete broken parts that can appear as a result of hard server restart.
if ( broken )
{
2017-08-14 18:16:11 +00:00
if ( part - > info . level = = 0 )
2017-04-01 07:20:54 +00:00
{
/// It is impossible to restore level 0 parts.
LOG_ERROR ( log , " Considering to remove broken part " < < full_path + file_name < < " because it's impossible to repair. " ) ;
broken_parts_to_remove . push_back ( part ) ;
}
else
{
/// Count the number of parts covered by the broken part. If it is at least two, assume that
/// the broken part was created as a result of merging them and we won't lose data if we
/// delete it.
int contained_parts = 0 ;
LOG_ERROR ( log , " Part " < < full_path + file_name < < " is broken. Looking for parts to replace it. " ) ;
for ( const String & contained_name : part_file_names )
{
if ( contained_name = = file_name )
continue ;
2017-04-18 20:38:07 +00:00
2017-08-14 18:16:11 +00:00
MergeTreePartInfo contained_part_info ;
2017-08-25 20:41:45 +00:00
if ( ! MergeTreePartInfo : : tryParsePartName ( contained_name , & contained_part_info , format_version ) )
2017-04-18 20:38:07 +00:00
continue ;
2017-08-14 18:16:11 +00:00
if ( part - > info . contains ( contained_part_info ) )
2017-04-01 07:20:54 +00:00
{
LOG_ERROR ( log , " Found part " < < full_path + contained_name ) ;
+ + contained_parts ;
}
}
if ( contained_parts > = 2 )
{
LOG_ERROR ( log , " Considering to remove broken part " < < full_path + file_name < < " because it covers at least 2 other parts " ) ;
broken_parts_to_remove . push_back ( part ) ;
}
else
{
LOG_ERROR ( log , " Detaching broken part " < < full_path + file_name
< < " because it covers less than 2 parts. You need to resolve this manually " ) ;
broken_parts_to_detach . push_back ( part ) ;
2017-06-21 19:11:11 +00:00
+ + suspicious_broken_parts ;
2017-04-01 07:20:54 +00:00
}
}
continue ;
}
part - > modification_time = Poco : : File ( full_path + file_name ) . getLastModified ( ) . epochTime ( ) ;
2017-09-21 21:51:17 +00:00
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
2017-09-11 22:40:51 +00:00
part - > state = DataPartState : : Committed ;
2017-04-01 07:20:54 +00:00
2017-11-20 19:33:12 +00:00
if ( ! data_parts_indexes . insert ( part ) . second )
throw Exception ( " Part " + part - > name + " already exists " , ErrorCodes : : DUPLICATE_DATA_PART ) ;
2017-04-01 07:20:54 +00:00
}
if ( suspicious_broken_parts > settings . max_suspicious_broken_parts & & ! skip_sanity_checks )
throw Exception ( " Suspiciously many ( " + toString ( suspicious_broken_parts ) + " ) broken parts to remove. " ,
ErrorCodes : : TOO_MANY_UNEXPECTED_DATA_PARTS ) ;
2017-05-16 15:40:32 +00:00
for ( auto & part : broken_parts_to_remove )
2017-04-01 07:20:54 +00:00
part - > remove ( ) ;
2017-05-16 15:40:32 +00:00
for ( auto & part : broken_parts_to_detach )
2017-04-01 07:20:54 +00:00
part - > renameAddPrefix ( true , " " ) ;
/// Delete from the set of current parts those parts that are covered by another part (those parts that
2017-06-21 19:07:08 +00:00
/// were merged), but that for some reason are still not deleted from the filesystem.
2017-04-01 07:20:54 +00:00
/// Deletion of files will be performed later in the clearOldParts() method.
2017-11-20 19:33:12 +00:00
if ( data_parts_indexes . size ( ) > = 2 )
2017-04-01 07:20:54 +00:00
{
2018-02-19 15:31:43 +00:00
/// Now all parts are committed, so data_parts_by_state_and_info == committed_parts_range
auto prev_jt = data_parts_by_state_and_info . begin ( ) ;
2017-09-21 21:51:17 +00:00
auto curr_jt = std : : next ( prev_jt ) ;
2018-02-19 15:31:43 +00:00
auto deactivate_part = [ & ] ( DataPartIteratorByStateAndInfo it )
2017-11-20 19:33:12 +00:00
{
2018-03-03 17:44:53 +00:00
( * it ) - > remove_time . store ( ( * it ) - > modification_time , std : : memory_order_relaxed ) ;
2017-11-20 19:33:12 +00:00
modifyPartState ( it , DataPartState : : Outdated ) ;
} ;
( * prev_jt ) - > assertState ( { DataPartState : : Committed } ) ;
2018-02-19 15:31:43 +00:00
while ( curr_jt ! = data_parts_by_state_and_info . end ( ) & & ( * curr_jt ) - > state = = DataPartState : : Committed )
2017-04-01 07:20:54 +00:00
{
2017-08-14 18:16:11 +00:00
/// Don't consider data parts belonging to different partitions.
if ( ( * curr_jt ) - > info . partition_id ! = ( * prev_jt ) - > info . partition_id )
2017-04-01 07:20:54 +00:00
{
+ + prev_jt ;
+ + curr_jt ;
continue ;
}
if ( ( * curr_jt ) - > contains ( * * prev_jt ) )
{
2017-11-20 19:33:12 +00:00
deactivate_part ( prev_jt ) ;
2017-04-01 07:20:54 +00:00
prev_jt = curr_jt ;
+ + curr_jt ;
}
else if ( ( * prev_jt ) - > contains ( * * curr_jt ) )
{
2017-11-20 19:33:12 +00:00
auto next = std : : next ( curr_jt ) ;
deactivate_part ( curr_jt ) ;
curr_jt = next ;
2017-04-01 07:20:54 +00:00
}
else
{
+ + prev_jt ;
+ + curr_jt ;
}
}
}
2017-05-14 23:14:21 +00:00
calculateColumnSizesImpl ( ) ;
2017-04-01 07:20:54 +00:00
2017-11-20 19:33:12 +00:00
LOG_DEBUG ( log , " Loaded data parts ( " < < data_parts_indexes . size ( ) < < " items) " ) ;
2014-03-09 17:36:01 +00:00
}
2017-02-07 17:52:41 +00:00
/// Is the part directory old.
/// True if its modification time and the modification time of all files inside it is less then threshold.
/// (Only files on the first level of nesting are considered).
2016-06-06 19:16:34 +00:00
static bool isOldPartDirectory ( Poco : : File & directory , time_t threshold )
{
2017-04-01 07:20:54 +00:00
if ( directory . getLastModified ( ) . epochTime ( ) > = threshold )
return false ;
2016-06-06 19:16:34 +00:00
2017-04-01 07:20:54 +00:00
Poco : : DirectoryIterator end ;
for ( Poco : : DirectoryIterator it ( directory ) ; it ! = end ; + + it )
if ( it - > getLastModified ( ) . epochTime ( ) > = threshold )
return false ;
2016-06-06 19:16:34 +00:00
2017-04-01 07:20:54 +00:00
return true ;
2016-06-06 19:16:34 +00:00
}
2017-05-31 15:01:25 +00:00
void MergeTreeData : : clearOldTemporaryDirectories ( ssize_t custom_directories_lifetime_seconds )
2014-03-09 17:36:01 +00:00
{
2017-04-01 07:20:54 +00:00
/// If the method is already called from another thread, then we don't need to do anything.
std : : unique_lock < std : : mutex > lock ( clear_old_temporary_directories_mutex , std : : defer_lock ) ;
if ( ! lock . try_lock ( ) )
return ;
2017-08-04 14:00:26 +00:00
time_t current_time = time ( nullptr ) ;
2017-05-31 15:01:25 +00:00
ssize_t deadline = ( custom_directories_lifetime_seconds > = 0 )
? current_time - custom_directories_lifetime_seconds
2017-09-20 14:41:07 +00:00
: current_time - settings . temporary_directories_lifetime . totalSeconds ( ) ;
2017-04-01 07:20:54 +00:00
/// Delete temporary directories older than a day.
Poco : : DirectoryIterator end ;
for ( Poco : : DirectoryIterator it { full_path } ; it ! = end ; + + it )
{
2017-05-15 23:41:16 +00:00
if ( startsWith ( it . name ( ) , " tmp " ) )
2017-04-01 07:20:54 +00:00
{
Poco : : File tmp_dir ( full_path + it . name ( ) ) ;
try
{
2017-05-31 15:01:25 +00:00
if ( tmp_dir . isDirectory ( ) & & isOldPartDirectory ( tmp_dir , deadline ) )
2017-04-01 07:20:54 +00:00
{
LOG_WARNING ( log , " Removing temporary directory " < < full_path < < it . name ( ) ) ;
Poco : : File ( full_path + it . name ( ) ) . remove ( true ) ;
}
}
catch ( const Poco : : FileNotFoundException & )
{
/// If the file is already deleted, do nothing.
}
}
}
2016-02-14 11:02:47 +00:00
}
/// Selects Outdated parts that nobody else references and whose remove_time is older than
/// settings.old_parts_lifetime. The selected parts are moved to the Deleting state and returned.
/// Returns an empty vector if another thread is already doing this.
MergeTreeData::DataPartsVector MergeTreeData::grabOldParts()
{
    DataPartsVector res;

    std::unique_lock<std::mutex> lock(grab_old_parts_mutex, std::defer_lock);
    if (!lock.try_lock())
        return res;

    const time_t now = time(nullptr);
    std::vector<DataPartIteratorByStateAndInfo> parts_to_delete;

    {
        std::lock_guard<std::mutex> lock_parts(data_parts_mutex);

        auto outdated_parts_range = getDataPartsStateRange(DataPartState::Outdated);
        for (auto it = outdated_parts_range.begin(); it != outdated_parts_range.end(); ++it)
        {
            const DataPartPtr & part = *it;

            /// Skip parts that are still in use elsewhere (SELECTs, for example).
            if (!part.unique())
                continue;

            const auto part_remove_time = part->remove_time.load(std::memory_order_relaxed);
            const bool lifetime_expired = part_remove_time < now
                && now - part_remove_time > settings.old_parts_lifetime.totalSeconds();
            if (lifetime_expired)
                parts_to_delete.emplace_back(it);
        }

        /// State changes are applied after the scan, still under data_parts_mutex.
        res.reserve(parts_to_delete.size());
        for (const auto & it_to_delete : parts_to_delete)
        {
            res.emplace_back(*it_to_delete);
            modifyPartState(it_to_delete, DataPartState::Deleting);
        }
    }

    if (!res.empty())
        LOG_TRACE(log, "Found " << res.size() << " old parts to remove.");

    return res;
}
2016-02-14 11:02:47 +00:00
2017-09-11 22:40:51 +00:00
void MergeTreeData : : rollbackDeletingParts ( const MergeTreeData : : DataPartsVector & parts )
{
std : : lock_guard < std : : mutex > lock ( data_parts_mutex ) ;
for ( auto & part : parts )
{
/// We should modify it under data_parts_mutex
2017-09-21 21:51:17 +00:00
part - > assertState ( { DataPartState : : Deleting } ) ;
2017-11-20 19:33:12 +00:00
modifyPartState ( part , DataPartState : : Outdated ) ;
2017-09-11 22:40:51 +00:00
}
}
void MergeTreeData : : removePartsFinally ( const MergeTreeData : : DataPartsVector & parts )
2014-07-25 11:15:11 +00:00
{
2018-01-23 22:56:46 +00:00
{
std : : lock_guard < std : : mutex > lock ( data_parts_mutex ) ;
2017-09-11 22:40:51 +00:00
2018-01-23 22:56:46 +00:00
/// TODO: use data_parts iterators instead of pointers
for ( auto & part : parts )
{
2018-02-19 15:31:43 +00:00
auto it = data_parts_by_info . find ( part - > info ) ;
if ( it = = data_parts_by_info . end ( ) )
throw Exception ( " Deleting data part " + part - > name + " doesn't exist " , ErrorCodes : : LOGICAL_ERROR ) ;
2018-01-23 22:56:46 +00:00
( * it ) - > assertState ( { DataPartState : : Deleting } ) ;
data_parts_indexes . erase ( it ) ;
}
}
/// Data parts is still alive (since DataPartsVector holds shared_ptrs) and contain useful metainformation for logging
/// NOTE: There is no need to log parts deletion somewhere else, all deleting parts pass through this function and pass away
2018-03-10 19:57:13 +00:00
if ( auto part_log = context . getPartLog ( database_name ) )
2017-09-11 22:40:51 +00:00
{
2018-01-23 22:56:46 +00:00
PartLogElement part_log_elem ;
part_log_elem . event_type = PartLogElement : : REMOVE_PART ;
part_log_elem . event_time = time ( nullptr ) ;
part_log_elem . duration_ms = 0 ;
2017-09-11 22:40:51 +00:00
2018-01-23 22:56:46 +00:00
part_log_elem . database_name = database_name ;
part_log_elem . table_name = table_name ;
2017-11-20 19:33:12 +00:00
2018-01-23 22:56:46 +00:00
for ( auto & part : parts )
{
part_log_elem . part_name = part - > name ;
2018-03-26 14:18:04 +00:00
part_log_elem . bytes_compressed_on_disk = part - > bytes_on_disk ;
2018-01-23 22:56:46 +00:00
part_log_elem . rows = part - > rows_count ;
part_log - > add ( part_log_elem ) ;
}
2017-09-11 22:40:51 +00:00
}
2014-07-25 11:15:11 +00:00
}
2017-11-20 19:33:12 +00:00
void MergeTreeData : : clearOldPartsFromFilesystem ( )
2014-07-25 11:15:11 +00:00
{
2017-04-01 07:20:54 +00:00
auto parts_to_remove = grabOldParts ( ) ;
2014-07-25 11:15:11 +00:00
2017-04-01 07:20:54 +00:00
for ( const DataPartPtr & part : parts_to_remove )
{
2017-11-20 19:33:12 +00:00
LOG_DEBUG ( log , " Removing part from filesystem " < < part - > name ) ;
2017-04-01 07:20:54 +00:00
part - > remove ( ) ;
}
2017-11-20 19:33:12 +00:00
removePartsFinally ( parts_to_remove ) ;
2014-07-25 11:15:11 +00:00
}
2017-12-03 02:15:35 +00:00
void MergeTreeData : : setPath ( const String & new_full_path )
2014-03-09 17:36:01 +00:00
{
2017-12-03 02:15:35 +00:00
if ( Poco : : File { new_full_path } . exists ( ) )
throw Exception {
" Target path already exists: " + new_full_path ,
/// @todo existing target can also be a file, not directory
ErrorCodes : : DIRECTORY_ALREADY_EXISTS } ;
2017-04-01 07:20:54 +00:00
2017-12-03 02:15:35 +00:00
Poco : : File ( full_path ) . renameTo ( new_full_path ) ;
context . dropCaches ( ) ;
2017-04-01 07:20:54 +00:00
full_path = new_full_path ;
2014-03-09 17:36:01 +00:00
}
2014-03-13 12:48:07 +00:00
void MergeTreeData : : dropAllData ( )
2014-03-09 17:36:01 +00:00
{
2017-04-01 07:20:54 +00:00
LOG_TRACE ( log , " dropAllData: waiting for locks. " ) ;
2015-09-17 21:31:26 +00:00
2017-04-01 07:20:54 +00:00
std : : lock_guard < std : : mutex > lock ( data_parts_mutex ) ;
2015-09-17 21:31:26 +00:00
2017-04-01 07:20:54 +00:00
LOG_TRACE ( log , " dropAllData: removing data from memory. " ) ;
2015-09-17 21:31:26 +00:00
2017-11-20 19:33:12 +00:00
data_parts_indexes . clear ( ) ;
2017-04-01 07:20:54 +00:00
column_sizes . clear ( ) ;
2014-03-09 17:36:01 +00:00
2017-08-07 17:01:04 +00:00
context . dropCaches ( ) ;
2014-03-13 19:14:25 +00:00
2017-04-01 07:20:54 +00:00
LOG_TRACE ( log , " dropAllData: removing data from filesystem. " ) ;
2015-09-17 21:31:26 +00:00
2017-04-01 07:20:54 +00:00
Poco : : File ( full_path ) . remove ( true ) ;
2015-09-17 21:31:26 +00:00
2017-04-01 07:20:54 +00:00
LOG_TRACE ( log , " dropAllData: done. " ) ;
2014-03-09 17:36:01 +00:00
}
2017-02-08 18:43:35 +00:00
namespace
{
/// If true, then in order to ALTER the type of the column from the type from to the type to
/// we don't need to rewrite the data, we only need to update metadata and columns.txt in part directories.
2017-02-09 17:29:36 +00:00
/// The function works for Arrays and Nullables of the same structure.
2017-02-08 18:43:35 +00:00
bool isMetadataOnlyConversion ( const IDataType * from , const IDataType * to )
{
2017-04-01 07:20:54 +00:00
if ( from - > getName ( ) = = to - > getName ( ) )
return true ;
static const std : : unordered_multimap < std : : type_index , const std : : type_info & > ALLOWED_CONVERSIONS =
{
{ typeid ( DataTypeEnum8 ) , typeid ( DataTypeEnum8 ) } ,
{ typeid ( DataTypeEnum8 ) , typeid ( DataTypeInt8 ) } ,
{ typeid ( DataTypeEnum16 ) , typeid ( DataTypeEnum16 ) } ,
{ typeid ( DataTypeEnum16 ) , typeid ( DataTypeInt16 ) } ,
{ typeid ( DataTypeDateTime ) , typeid ( DataTypeUInt32 ) } ,
{ typeid ( DataTypeUInt32 ) , typeid ( DataTypeDateTime ) } ,
{ typeid ( DataTypeDate ) , typeid ( DataTypeUInt16 ) } ,
{ typeid ( DataTypeUInt16 ) , typeid ( DataTypeDate ) } ,
} ;
while ( true )
{
auto it_range = ALLOWED_CONVERSIONS . equal_range ( typeid ( * from ) ) ;
for ( auto it = it_range . first ; it ! = it_range . second ; + + it )
{
if ( it - > second = = typeid ( * to ) )
return true ;
}
const auto * arr_from = typeid_cast < const DataTypeArray * > ( from ) ;
const auto * arr_to = typeid_cast < const DataTypeArray * > ( to ) ;
if ( arr_from & & arr_to )
{
from = arr_from - > getNestedType ( ) . get ( ) ;
to = arr_to - > getNestedType ( ) . get ( ) ;
continue ;
}
const auto * nullable_from = typeid_cast < const DataTypeNullable * > ( from ) ;
const auto * nullable_to = typeid_cast < const DataTypeNullable * > ( to ) ;
if ( nullable_from & & nullable_to )
{
from = nullable_from - > getNestedType ( ) . get ( ) ;
to = nullable_to - > getNestedType ( ) . get ( ) ;
continue ;
}
return false ;
}
2017-02-08 18:43:35 +00:00
}
}
void MergeTreeData : : checkAlter ( const AlterCommands & commands )
2014-03-09 17:36:01 +00:00
{
2017-04-01 07:20:54 +00:00
/// Check that needed transformations can be applied to the list of columns without considering type conversions.
2018-03-13 14:18:11 +00:00
auto new_columns = getColumns ( ) ;
2018-03-06 20:18:34 +00:00
commands . apply ( new_columns ) ;
2017-04-01 07:20:54 +00:00
/// Set of columns that shouldn't be altered.
NameSet columns_alter_forbidden ;
2017-09-12 19:20:56 +00:00
/// Primary key columns can be ALTERed only if they are used in the key as-is
2017-04-01 07:20:54 +00:00
/// (and not as a part of some expression) and if the ALTER only affects column metadata.
NameSet columns_alter_metadata_only ;
2017-09-12 19:20:56 +00:00
if ( partition_expr )
2017-04-01 07:20:54 +00:00
{
2017-09-12 19:20:56 +00:00
/// Forbid altering partition key columns because it can change partition ID format.
/// TODO: in some cases (e.g. adding an Enum value) a partition key column can still be ALTERed.
/// We should allow it.
for ( const String & col : partition_expr - > getRequiredColumns ( ) )
columns_alter_forbidden . insert ( col ) ;
}
2017-08-31 13:33:32 +00:00
2018-02-09 10:53:50 +00:00
auto processSortingColumns =
[ & columns_alter_forbidden , & columns_alter_metadata_only ] ( const ExpressionActionsPtr & expression )
2017-09-12 19:20:56 +00:00
{
2018-02-09 10:53:50 +00:00
for ( const ExpressionAction & action : expression - > getActions ( ) )
2017-04-01 07:20:54 +00:00
{
auto action_columns = action . getNeededColumns ( ) ;
columns_alter_forbidden . insert ( action_columns . begin ( ) , action_columns . end ( ) ) ;
}
2018-02-09 10:53:50 +00:00
for ( const String & col : expression - > getRequiredColumns ( ) )
2017-08-31 13:33:32 +00:00
columns_alter_metadata_only . insert ( col ) ;
2018-02-09 10:53:50 +00:00
} ;
if ( primary_expr )
processSortingColumns ( primary_expr ) ;
2017-08-31 13:33:32 +00:00
/// We don't process sampling_expression separately because it must be among the primary key columns.
2018-02-09 10:53:50 +00:00
if ( secondary_sort_expr )
processSortingColumns ( secondary_sort_expr ) ;
2017-08-31 13:33:32 +00:00
if ( ! merging_params . sign_column . empty ( ) )
columns_alter_forbidden . insert ( merging_params . sign_column ) ;
2017-04-01 07:20:54 +00:00
std : : map < String , const IDataType * > old_types ;
2018-03-13 15:00:28 +00:00
for ( const auto & column : getColumns ( ) . getAllPhysical ( ) )
2017-04-01 07:20:54 +00:00
old_types . emplace ( column . name , column . type . get ( ) ) ;
for ( const AlterCommand & command : commands )
{
if ( columns_alter_forbidden . count ( command . column_name ) )
throw Exception ( " trying to ALTER key column " + command . column_name , ErrorCodes : : ILLEGAL_COLUMN ) ;
if ( columns_alter_metadata_only . count ( command . column_name ) )
{
if ( command . type = = AlterCommand : : MODIFY_COLUMN )
{
auto it = old_types . find ( command . column_name ) ;
if ( it ! = old_types . end ( ) & & isMetadataOnlyConversion ( it - > second , command . data_type . get ( ) ) )
continue ;
}
throw Exception (
" ALTER of key column " + command . column_name + " must be metadata-only " ,
ErrorCodes : : ILLEGAL_COLUMN ) ;
}
}
/// Check that type conversions are possible.
ExpressionActionsPtr unused_expression ;
NameToNameMap unused_map ;
bool unused_bool ;
2018-03-13 15:00:28 +00:00
createConvertExpression ( nullptr , getColumns ( ) . getAllPhysical ( ) , new_columns . getAllPhysical ( ) , unused_expression , unused_map , unused_bool ) ;
2014-03-09 17:36:01 +00:00
}
2017-12-25 21:57:29 +00:00
void MergeTreeData : : createConvertExpression ( const DataPartPtr & part , const NamesAndTypesList & old_columns , const NamesAndTypesList & new_columns ,
2017-04-01 07:20:54 +00:00
ExpressionActionsPtr & out_expression , NameToNameMap & out_rename_map , bool & out_force_update_metadata ) const
2014-03-09 17:36:01 +00:00
{
2017-04-01 07:20:54 +00:00
out_expression = nullptr ;
out_rename_map = { } ;
out_force_update_metadata = false ;
using NameToType = std : : map < String , const IDataType * > ;
NameToType new_types ;
2017-12-25 21:57:29 +00:00
for ( const NameAndTypePair & column : new_columns )
2017-04-01 07:20:54 +00:00
new_types . emplace ( column . name , column . type . get ( ) ) ;
/// For every column that need to be converted: source column name, column name of calculated expression for conversion.
std : : vector < std : : pair < String , String > > conversions ;
2017-12-03 01:49:54 +00:00
/// Collect counts for shared streams of different columns. As an example, Nested columns have shared stream with array sizes.
std : : map < String , size_t > stream_counts ;
2017-12-25 21:57:29 +00:00
for ( const NameAndTypePair & column : old_columns )
2017-04-01 07:20:54 +00:00
{
2017-12-03 01:49:54 +00:00
column . type - > enumerateStreams ( [ & ] ( const IDataType : : SubstreamPath & substream_path )
2017-04-01 07:20:54 +00:00
{
2017-12-03 01:49:54 +00:00
+ + stream_counts [ IDataType : : getFileNameForStream ( column . name , substream_path ) ] ;
} , { } ) ;
}
2017-04-01 07:20:54 +00:00
2017-12-25 21:57:29 +00:00
for ( const NameAndTypePair & column : old_columns )
2017-12-03 01:49:54 +00:00
{
if ( ! new_types . count ( column . name ) )
{
/// The column was deleted.
2017-04-01 07:20:54 +00:00
if ( ! part | | part - > hasColumnFiles ( column . name ) )
{
2017-12-03 01:49:54 +00:00
column . type - > enumerateStreams ( [ & ] ( const IDataType : : SubstreamPath & substream_path )
2017-04-01 07:20:54 +00:00
{
2017-12-03 01:49:54 +00:00
String file_name = IDataType : : getFileNameForStream ( column . name , substream_path ) ;
2017-04-01 07:20:54 +00:00
2017-12-03 01:49:54 +00:00
/// Delete files if they are no longer shared with another column.
if ( - - stream_counts [ file_name ] = = 0 )
2017-04-01 07:20:54 +00:00
{
2017-12-03 01:49:54 +00:00
out_rename_map [ file_name + " .bin " ] = " " ;
out_rename_map [ file_name + " .mrk " ] = " " ;
2017-04-01 07:20:54 +00:00
}
2017-12-03 01:49:54 +00:00
} , { } ) ;
2017-04-01 07:20:54 +00:00
}
}
else
{
2017-12-03 01:49:54 +00:00
/// The column was converted. Collect conversions.
2017-04-01 07:20:54 +00:00
const auto * new_type = new_types [ column . name ] ;
const String new_type_name = new_type - > getName ( ) ;
const auto * old_type = column . type . get ( ) ;
2017-12-03 01:49:54 +00:00
if ( ! new_type - > equals ( * old_type ) & & ( ! part | | part - > hasColumnFiles ( column . name ) ) )
2017-04-01 07:20:54 +00:00
{
if ( isMetadataOnlyConversion ( old_type , new_type ) )
{
out_force_update_metadata = true ;
continue ;
}
/// Need to modify column type.
if ( ! out_expression )
2017-12-25 21:57:29 +00:00
out_expression = std : : make_shared < ExpressionActions > ( NamesAndTypesList ( ) , context . getSettingsRef ( ) ) ;
2017-04-01 07:20:54 +00:00
out_expression - > addInput ( ColumnWithTypeAndName ( nullptr , column . type , column . name ) ) ;
Names out_names ;
2017-12-03 01:49:54 +00:00
/// This is temporary name for expression. TODO Invent the name more safely.
const String new_type_name_column = ' # ' + new_type_name + " _column " ;
2017-04-01 07:20:54 +00:00
out_expression - > add ( ExpressionAction : : addColumn (
2017-12-10 22:44:04 +00:00
{ DataTypeString ( ) . createColumnConst ( 1 , new_type_name ) , std : : make_shared < DataTypeString > ( ) , new_type_name_column } ) ) ;
2017-04-01 07:20:54 +00:00
2018-02-06 19:34:53 +00:00
const auto & function = FunctionFactory : : instance ( ) . get ( " CAST " , context ) ;
2017-04-01 07:20:54 +00:00
out_expression - > add ( ExpressionAction : : applyFunction (
function , Names { column . name , new_type_name_column } ) , out_names ) ;
out_expression - > add ( ExpressionAction : : removeColumn ( new_type_name_column ) ) ;
out_expression - > add ( ExpressionAction : : removeColumn ( column . name ) ) ;
conversions . emplace_back ( column . name , out_names . at ( 0 ) ) ;
}
}
}
if ( ! conversions . empty ( ) )
{
/// Give proper names for temporary columns with conversion results.
NamesWithAliases projection ;
projection . reserve ( conversions . size ( ) ) ;
for ( const auto & source_and_expression : conversions )
{
2017-12-03 01:49:54 +00:00
/// Column name for temporary filenames before renaming. NOTE The is unnecessarily tricky.
2017-04-01 07:20:54 +00:00
2017-12-03 01:49:54 +00:00
String original_column_name = source_and_expression . first ;
String temporary_column_name = original_column_name + " converting " ;
2017-04-01 07:20:54 +00:00
2017-12-03 01:49:54 +00:00
projection . emplace_back ( source_and_expression . second , temporary_column_name ) ;
2017-04-01 07:20:54 +00:00
2017-12-03 01:49:54 +00:00
/// After conversion, we need to rename temporary files into original.
2017-04-01 07:20:54 +00:00
2017-12-03 01:49:54 +00:00
new_types [ source_and_expression . first ] - > enumerateStreams (
[ & ] ( const IDataType : : SubstreamPath & substream_path )
{
2017-12-03 05:55:49 +00:00
/// Skip array sizes, because they cannot be modified in ALTER.
if ( ! substream_path . empty ( ) & & substream_path . back ( ) . type = = IDataType : : Substream : : ArraySizes )
return ;
2017-12-03 01:49:54 +00:00
String original_file_name = IDataType : : getFileNameForStream ( original_column_name , substream_path ) ;
String temporary_file_name = IDataType : : getFileNameForStream ( temporary_column_name , substream_path ) ;
2017-04-01 07:20:54 +00:00
2017-12-03 01:49:54 +00:00
out_rename_map [ temporary_file_name + " .bin " ] = original_file_name + " .bin " ;
out_rename_map [ temporary_file_name + " .mrk " ] = original_file_name + " .mrk " ;
} , { } ) ;
2017-04-01 07:20:54 +00:00
}
out_expression - > add ( ExpressionAction : : project ( projection ) ) ;
}
if ( part & & ! out_rename_map . empty ( ) )
{
2017-07-31 21:39:24 +00:00
WriteBufferFromOwnString out ;
2018-02-21 05:11:53 +00:00
out < < " Will " ;
2017-07-31 21:39:24 +00:00
bool first = true ;
for ( const auto & from_to : out_rename_map )
2017-04-01 07:20:54 +00:00
{
2017-07-31 21:39:24 +00:00
if ( ! first )
out < < " , " ;
first = false ;
2018-02-21 05:11:53 +00:00
if ( from_to . second . empty ( ) )
out < < " remove " < < from_to . first ;
else
out < < " rename " < < from_to . first < < " to " < < from_to . second ;
2017-04-01 07:20:54 +00:00
}
2017-07-31 21:39:24 +00:00
out < < " in part " < < part - > name ;
LOG_DEBUG ( log , out . str ( ) ) ;
2017-04-01 07:20:54 +00:00
}
2014-03-20 13:00:42 +00:00
}
2014-03-09 17:36:01 +00:00
2014-09-29 20:26:46 +00:00
/// Prepares the ALTER of a single part.
/// It computes which column files must be converted, removed or renamed to match new_columns (and
/// new_primary_key, when it differs from the current key AST). It then writes the converted data and
/// the new primary.idx / checksums.txt / columns.txt into temporary files in the part directory.
/// The returned transaction renames them into place on commit(); if it is destroyed uncommitted,
/// it deletes the temporary files registered in its rename_map.
/// Returns nullptr if the part needs no changes.
/// Throws TABLE_DIFFERS_TOO_MUCH when too many files would be modified/removed (unless skip_sanity_checks),
/// and BAD_ARGUMENTS for unsupported primary key changes.
MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
    const DataPartPtr & part,
    const NamesAndTypesList & new_columns,
    const ASTPtr & new_primary_key,
    bool skip_sanity_checks)
{
    ExpressionActionsPtr expression;
    AlterDataPartTransactionPtr transaction(new AlterDataPartTransaction(part)); /// Blocks changes to the part.
    bool force_update_metadata;
    createConvertExpression(part, part->columns, new_columns, expression, transaction->rename_map, force_update_metadata);

    size_t num_files_to_modify = transaction->rename_map.size();
    size_t num_files_to_remove = 0;

    for (const auto & from_to : transaction->rename_map)
        if (from_to.second.empty())
            ++num_files_to_remove;

    if (!skip_sanity_checks
        && (num_files_to_modify > settings.max_files_to_modify_in_alter_columns
            || num_files_to_remove > settings.max_files_to_remove_in_alter_columns))
    {
        transaction->clear();

        const bool forbidden_because_of_modify = num_files_to_modify > settings.max_files_to_modify_in_alter_columns;

        std::stringstream exception_message;
        exception_message
            << "Suspiciously many ("
            << (forbidden_because_of_modify ? num_files_to_modify : num_files_to_remove)
            << ") files (";

        bool first = true;
        for (const auto & from_to : transaction->rename_map)
        {
            /// When removal is the problem, list only the files that would be removed.
            /// The separator is emitted only for listed entries, so skipped ones leave no stray ", ".
            if (!forbidden_because_of_modify && !from_to.second.empty())
                continue;

            if (!first)
                exception_message << ", ";
            first = false;

            if (forbidden_because_of_modify)
                exception_message << "from `" << from_to.first << "' to `" << from_to.second << "'";
            else
                exception_message << "`" << from_to.first << "'";
        }

        exception_message
            << ") need to be "
            << (forbidden_because_of_modify ? "modified" : "removed")
            << " in part " << part->name << " of table at " << full_path << ". Aborting just in case."
            << " If it is not an error, you could increase merge_tree/"
            << (forbidden_because_of_modify ? "max_files_to_modify_in_alter_columns" : "max_files_to_remove_in_alter_columns")
            << " parameter in configuration file (current value: "
            << (forbidden_because_of_modify ? settings.max_files_to_modify_in_alter_columns : settings.max_files_to_remove_in_alter_columns)
            << ")";

        throw Exception(exception_message.str(), ErrorCodes::TABLE_DIFFERS_TOO_MUCH);
    }

    DataPart::Checksums add_checksums;

    /// Update primary key if needed.
    size_t new_primary_key_file_size{};
    MergeTreeDataPartChecksum::uint128 new_primary_key_hash{};

    /// TODO: Check the order of secondary sorting key columns.
    if (new_primary_key.get() != primary_expr_ast.get())
    {
        ExpressionActionsPtr new_primary_expr = ExpressionAnalyzer(new_primary_key, context, nullptr, new_columns).getActions(true);
        Block new_primary_key_sample = new_primary_expr->getSampleBlock();
        size_t new_key_size = new_primary_key_sample.columns();

        Columns new_index(new_key_size);

        /// Copy the existing primary key columns. Fill new columns with default values.
        /// NOTE default expressions are not supported.

        ssize_t prev_position_of_existing_column = -1;
        for (size_t i = 0; i < new_key_size; ++i)
        {
            const String & column_name = new_primary_key_sample.safeGetByPosition(i).name;

            if (primary_key_sample.has(column_name))
            {
                ssize_t position_of_existing_column = primary_key_sample.getPositionByName(column_name);

                if (position_of_existing_column < prev_position_of_existing_column)
                    throw Exception("Permuting of columns of primary key is not supported", ErrorCodes::BAD_ARGUMENTS);

                new_index[i] = part->index.at(position_of_existing_column);
                prev_position_of_existing_column = position_of_existing_column;
            }
            else
            {
                const IDataType & type = *new_primary_key_sample.safeGetByPosition(i).type;
                new_index[i] = type.createColumnConstWithDefaultValue(part->marks_count)->convertToFullColumnIfConst();
            }
        }

        if (prev_position_of_existing_column == -1)
            throw Exception("No common columns while modifying primary key", ErrorCodes::BAD_ARGUMENTS);

        String index_tmp_path = full_path + part->name + "/primary.idx.tmp";
        WriteBufferFromFile index_file(index_tmp_path);
        HashingWriteBuffer index_stream(index_file);

        /// Register the temporary file before writing, so that ~AlterDataPartTransaction removes it if writing fails.
        transaction->rename_map["primary.idx.tmp"] = "primary.idx";

        for (size_t i = 0, marks_count = part->marks_count; i < marks_count; ++i)
            for (size_t j = 0; j < new_key_size; ++j)
                new_primary_key_sample.getByPosition(j).type->serializeBinary(*new_index[j].get(), i, index_stream);

        index_stream.next();

        new_primary_key_file_size = index_stream.count();
        new_primary_key_hash = index_stream.getHash();
    }

    if (transaction->rename_map.empty() && !force_update_metadata)
    {
        transaction->clear();
        return nullptr;
    }

    /// Apply the expression and write the result to temporary files.
    if (expression)
    {
        MarkRanges ranges{MarkRange(0, part->marks_count)};
        BlockInputStreamPtr part_in = std::make_shared<MergeTreeBlockInputStream>(
            *this, part, DEFAULT_MERGE_BLOCK_SIZE, 0, 0, expression->getRequiredColumns(), ranges,
            false, nullptr, "", false, 0, DBMS_DEFAULT_BUFFER_SIZE, false);

        auto compression_settings = this->context.chooseCompressionSettings(
            part->bytes_on_disk,
            static_cast<double>(part->bytes_on_disk) / this->getTotalActiveSizeInBytes());
        ExpressionBlockInputStream in(part_in, expression);

        /** Don't write offsets for arrays, because ALTER never change them
          *  (MODIFY COLUMN could only change types of elements but never modify array sizes).
          * Also note that they does not participate in 'rename_map'.
          * Also note, that for columns, that are parts of Nested,
          *  temporary column name ('converting_column_name') created in 'createConvertExpression' method
          *  will have old name of shared offsets for arrays.
          */
        MergedColumnOnlyOutputStream out(*this, in.getHeader(), full_path + part->name + '/', true /* sync */, compression_settings, true /* skip_offsets */);

        in.readPrefix();
        out.writePrefix();

        while (Block b = in.read())
            out.write(b);

        in.readSuffix();
        add_checksums = out.writeSuffixAndGetChecksums();
    }

    /// Update the checksums.
    DataPart::Checksums new_checksums = part->checksums;
    for (const auto & from_to : transaction->rename_map)
    {
        if (from_to.second.empty())
            new_checksums.files.erase(from_to.first);
        else
            new_checksums.files[from_to.second] = add_checksums.files[from_to.first];
    }

    if (new_primary_key_file_size)
    {
        new_checksums.files["primary.idx"].file_size = new_primary_key_file_size;
        new_checksums.files["primary.idx"].file_hash = new_primary_key_hash;
    }

    /// Write the checksums to the temporary file.
    if (!part->checksums.empty())
    {
        transaction->new_checksums = new_checksums;
        WriteBufferFromFile checksums_file(full_path + part->name + "/checksums.txt.tmp", 4096);
        /// Register before writing, so that ~AlterDataPartTransaction removes the temporary file on failure.
        transaction->rename_map["checksums.txt.tmp"] = "checksums.txt";
        new_checksums.write(checksums_file);
        /// Flush explicitly so that write errors (e.g. no space left) are reported here,
        /// not in the buffer's destructor, which cannot propagate them.
        checksums_file.next();
    }

    /// Write the new column list to the temporary file.
    {
        transaction->new_columns = new_columns.filter(part->columns.getNames());
        WriteBufferFromFile columns_file(full_path + part->name + "/columns.txt.tmp", 4096);
        /// Register before writing, so that ~AlterDataPartTransaction removes the temporary file on failure.
        transaction->rename_map["columns.txt.tmp"] = "columns.txt";
        transaction->new_columns.writeText(columns_file);
        /// Flush explicitly so that write errors are not lost in the buffer's destructor.
        columns_file.next();
    }

    return transaction;
}
2014-03-20 13:00:42 +00:00
2014-07-11 12:47:45 +00:00
void MergeTreeData : : AlterDataPartTransaction : : commit ( )
{
2017-04-01 07:20:54 +00:00
if ( ! data_part )
return ;
try
{
2017-07-28 17:34:02 +00:00
std : : unique_lock < std : : shared_mutex > lock ( data_part - > columns_lock ) ;
2017-04-01 07:20:54 +00:00
String path = data_part - > storage . full_path + data_part - > name + " / " ;
/// NOTE: checking that a file exists before renaming or deleting it
/// is justified by the fact that, when converting an ordinary column
/// to a nullable column, new files are created which did not exist
/// before, i.e. they do not have older versions.
/// 1) Rename the old files.
2017-12-03 02:15:35 +00:00
for ( const auto & from_to : rename_map )
2017-04-01 07:20:54 +00:00
{
2017-12-03 02:15:35 +00:00
String name = from_to . second . empty ( ) ? from_to . first : from_to . second ;
2017-04-01 07:20:54 +00:00
Poco : : File file { path + name } ;
if ( file . exists ( ) )
file . renameTo ( path + name + " .tmp2 " ) ;
}
/// 2) Move new files in the place of old and update the metadata in memory.
2017-12-03 02:15:35 +00:00
for ( const auto & from_to : rename_map )
2017-04-01 07:20:54 +00:00
{
2017-12-03 02:15:35 +00:00
if ( ! from_to . second . empty ( ) )
Poco : : File { path + from_to . first } . renameTo ( path + from_to . second ) ;
2017-04-01 07:20:54 +00:00
}
2017-08-04 14:00:26 +00:00
auto & mutable_part = const_cast < DataPart & > ( * data_part ) ;
2017-04-01 07:20:54 +00:00
mutable_part . checksums = new_checksums ;
mutable_part . columns = new_columns ;
/// 3) Delete the old files.
2017-12-03 02:15:35 +00:00
for ( const auto & from_to : rename_map )
2017-04-01 07:20:54 +00:00
{
2017-12-03 02:15:35 +00:00
String name = from_to . second . empty ( ) ? from_to . first : from_to . second ;
2017-04-01 07:20:54 +00:00
Poco : : File file { path + name + " .tmp2 " } ;
if ( file . exists ( ) )
file . remove ( ) ;
}
2018-03-26 14:18:04 +00:00
mutable_part . bytes_on_disk = MergeTreeData : : DataPart : : calculateTotalSizeOnDisk ( path ) ;
2017-04-01 07:20:54 +00:00
/// TODO: we can skip resetting caches when the column is added.
2017-08-07 17:01:04 +00:00
data_part - > storage . context . dropCaches ( ) ;
2017-04-01 07:20:54 +00:00
clear ( ) ;
}
catch ( . . . )
{
/// Don't delete temporary files in the destructor in case something went wrong.
clear ( ) ;
throw ;
}
2014-03-09 17:36:01 +00:00
}
2014-03-20 13:00:42 +00:00
2014-07-11 12:47:45 +00:00
/// If the transaction was neither committed nor cleared, cancels the ALTER by deleting the temporary
/// files it created (sources of rename_map entries with a non-empty target).
/// Never throws: every error is logged.
MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction()
{
    try
    {
        if (!data_part)
            return;

        LOG_WARNING(data_part->storage.log, "Aborting ALTER of part " << data_part->relative_path);

        const String path = data_part->getFullPath();
        for (const auto & from_to : rename_map)
        {
            /// An empty target marks an existing file scheduled for removal, not a temporary file: keep it.
            if (from_to.second.empty())
                continue;

            const String tmp_file_path = path + from_to.first;
            try
            {
                Poco::File tmp_file(tmp_file_path);
                if (tmp_file.exists())
                    tmp_file.remove();
            }
            catch (Poco::Exception & e)
            {
                LOG_WARNING(data_part->storage.log, "Can't remove " << tmp_file_path << ": " << e.displayText());
            }
        }
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
2018-02-19 15:31:43 +00:00
/// Returns the Committed parts that the new part (new_part_info) contains.
/// They are found next to the new part's insertion point in data_parts_by_state_and_info.
/// If a Committed part contains the new part itself, it is stored in out_covering_part and an empty
/// vector is returned. Throws LOGICAL_ERROR if a neighbouring part merely intersects the new part,
/// or if a part with exactly the same info already exists.
/// The lock parameter documents that data_parts_mutex must be held by the caller.
MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
    const MergeTreePartInfo & new_part_info,
    const String & new_part_name,
    DataPartPtr & out_covering_part,
    std::lock_guard<std::mutex> & /* data_parts_lock */) const
{
    /// Parts contained in the part are consecutive in data_parts, intersecting the insertion place for the part itself.
    auto insertion_point = data_parts_by_state_and_info.lower_bound(DataPartStateAndInfo(DataPartState::Committed, new_part_info));
    auto committed_parts_range = getDataPartsStateRange(DataPartState::Committed);

    /// For a neighbour that the new part does not contain:
    /// true if the neighbour covers the new part, false if they are disjoint, throws if they intersect.
    auto covers_new_part = [&](const DataPartPtr & neighbour, const char * side) -> bool
    {
        if (neighbour->info.contains(new_part_info))
            return true;

        if (!new_part_info.isDisjoint(neighbour->info))
            throw Exception("Part " + new_part_name + " intersects " + side + " part " + neighbour->getNameWithState() +
                ". It is a bug.", ErrorCodes::LOGICAL_ERROR);

        return false;
    };

    /// Go to the left.
    DataPartIteratorByStateAndInfo begin = insertion_point;
    while (begin != committed_parts_range.begin())
    {
        auto prev = std::prev(begin);

        if (!new_part_info.contains((*prev)->info))
        {
            if (covers_new_part(*prev, "previous"))
            {
                out_covering_part = *prev;
                return {};
            }
            break;
        }

        begin = prev;
    }

    /// Go to the right.
    DataPartIteratorByStateAndInfo end = insertion_point;
    for (; end != committed_parts_range.end(); ++end)
    {
        if ((*end)->info == new_part_info)
            throw Exception("Unexpected duplicate part " + (*end)->getNameWithState() + ". It is a bug.", ErrorCodes::LOGICAL_ERROR);

        if (!new_part_info.contains((*end)->info))
        {
            if (covers_new_part(*end, "next"))
            {
                out_covering_part = *end;
                return {};
            }
            break;
        }
    }

    return DataPartsVector{begin, end};
}
2017-11-20 19:33:12 +00:00
2017-04-01 07:20:54 +00:00
2018-02-19 15:31:43 +00:00
void MergeTreeData : : renameTempPartAndAdd ( MutableDataPartPtr & part , SimpleIncrement * increment , Transaction * out_transaction )
{
auto removed = renameTempPartAndReplace ( part , increment , out_transaction ) ;
if ( ! removed . empty ( ) )
throw Exception ( " Added part " + part - > name + " covers " + toString ( removed . size ( ) )
+ " existing part(s) (including " + removed [ 0 ] - > name + " ) " , ErrorCodes : : LOGICAL_ERROR ) ;
}
2017-09-11 22:40:51 +00:00
2017-04-01 07:20:54 +00:00
2018-02-19 15:31:43 +00:00
/** Turns a Temporary part into a real one: assigns its final name/info, renames it on disk,
  * and registers it in data_parts.
  * With out_transaction == nullptr the part becomes Committed immediately and the parts it covers
  * become Outdated; otherwise the part stays PreCommitted and is recorded in the transaction.
  * Returns the active parts covered by the new part (empty if the new part itself is already
  * covered by an existing part, in which case nothing is renamed or registered).
  */
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
    MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction)
{
    /// A transaction is bound to the first table that used it.
    const bool foreign_transaction = out_transaction && out_transaction->data && out_transaction->data != this;
    if (foreign_transaction)
        throw Exception("The same MergeTreeData::Transaction cannot be used for different tables",
            ErrorCodes::LOGICAL_ERROR);

    std::lock_guard<std::mutex> lock(data_parts_mutex);

    part->assertState({DataPartState::Temporary});

    MergeTreePartInfo new_info = part->info;

    /// All parts sharing a partition ID must also share the same partition value.
    if (DataPartPtr sibling = getAnyPartInPartition(part->info.partition_id, lock))
    {
        if (part->partition.value != sibling->partition.value)
            throw Exception(
                "Partition value mismatch between two parts with the same partition ID. Existing part: "
                + sibling->name + ", newly added part: " + part->name,
                ErrorCodes::CORRUPTED_DATA);
    }

    /** Allocating the block number and adding the part to the parts set must happen atomically
      * (both under data_parts_mutex). Otherwise a merge could be assigned over an interval that
      * does not yet contain the new part.
      */
    if (increment)
    {
        const auto block_number = increment->get();
        new_info.min_block = block_number;
        new_info.max_block = block_number;
    }

    const String new_name = format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING
        ? new_info.getPartNameV0(part->getMinDate(), part->getMaxDate())
        : new_info.getPartName();

    LOG_TRACE(log, "Renaming temporary part " << part->relative_path << " to " << new_name << ".");

    auto duplicate_it = data_parts_by_info.find(new_info);
    if (duplicate_it != data_parts_by_info.end())
    {
        const auto & duplicate = *duplicate_it;
        String message = "Part " + duplicate->getNameWithState() + " already exists";

        if (duplicate->checkState({DataPartState::Outdated, DataPartState::Deleting}))
            throw Exception(message + ", but it will be deleted soon", ErrorCodes::PART_IS_TEMPORARILY_LOCKED);

        throw Exception(message, ErrorCodes::DUPLICATE_DATA_PART);
    }

    DataPartPtr covering_part;
    DataPartsVector covered_parts = getActivePartsToReplace(new_info, new_name, covering_part, lock);

    if (covering_part)
    {
        LOG_WARNING(log, "Tried to add obsolete part " << new_name << " covered by " << covering_part->getNameWithState());
        return {};
    }

    /// Every check has passed; only now is the filesystem touched. This keeps the invariant:
    /// a non-temporary part directory on disk is always present in data_parts.
    part->name = new_name;
    part->info = new_info;
    part->is_temp = false;
    part->state = DataPartState::PreCommitted;
    part->renameTo(new_name);

    auto inserted_it = data_parts_indexes.insert(part).first;

    if (!out_transaction)
    {
        /// No transaction: publish the part right away and retire everything it covers.
        const auto now = time(nullptr);
        for (const DataPartPtr & covered_part : covered_parts)
        {
            covered_part->remove_time.store(now, std::memory_order_relaxed);
            modifyPartState(covered_part, DataPartState::Outdated);
            removePartContributionToColumnSizes(covered_part);
        }

        modifyPartState(inserted_it, DataPartState::Committed);
        addPartContributionToColumnSizes(part);
    }
    else
    {
        /// Leave the part PreCommitted; the transaction decides its fate.
        out_transaction->data = this;
        out_transaction->precommitted_parts.insert(part);
    }

    return covered_parts;
}
2017-09-11 22:40:51 +00:00
void MergeTreeData : : removePartsFromWorkingSet ( const DataPartsVector & remove , bool clear_without_timeout )
2014-07-01 15:58:25 +00:00
{
2017-04-01 07:20:54 +00:00
std : : lock_guard < std : : mutex > lock ( data_parts_mutex ) ;
2014-07-01 15:58:25 +00:00
2017-09-05 19:03:51 +00:00
for ( auto & part : remove )
2017-04-01 07:20:54 +00:00
{
2018-02-19 15:31:43 +00:00
if ( ! data_parts_by_info . count ( part - > info ) )
2017-09-11 22:40:51 +00:00
throw Exception ( " Part " + part - > getNameWithState ( ) + " not found in data_parts " , ErrorCodes : : LOGICAL_ERROR ) ;
2016-01-30 02:29:20 +00:00
2017-09-21 21:51:17 +00:00
part - > assertState ( { DataPartState : : PreCommitted , DataPartState : : Committed , DataPartState : : Outdated } ) ;
2017-04-01 07:20:54 +00:00
}
2015-09-16 04:18:16 +00:00
2017-09-11 22:40:51 +00:00
auto remove_time = clear_without_timeout ? 0 : time ( nullptr ) ;
2017-04-01 07:20:54 +00:00
for ( const DataPartPtr & part : remove )
{
2017-09-11 22:40:51 +00:00
if ( part - > state = = DataPartState : : Committed )
2017-04-01 07:20:54 +00:00
removePartContributionToColumnSizes ( part ) ;
2017-11-20 19:33:12 +00:00
modifyPartState ( part , DataPartState : : Outdated ) ;
2018-03-03 17:44:53 +00:00
part - > remove_time . store ( remove_time , std : : memory_order_relaxed ) ;
2017-04-01 07:20:54 +00:00
}
2014-07-01 15:58:25 +00:00
}
2017-09-05 19:03:51 +00:00
2017-09-11 22:40:51 +00:00
void MergeTreeData : : renameAndDetachPart ( const DataPartPtr & part_to_detach , const String & prefix , bool restore_covered ,
bool move_to_detached )
2014-04-02 07:59:43 +00:00
{
2017-09-11 22:40:51 +00:00
LOG_INFO ( log , " Renaming " < < part_to_detach - > relative_path < < " to " < < prefix < < part_to_detach - > name < < " and detaching it. " ) ;
2017-04-01 07:20:54 +00:00
std : : lock_guard < std : : mutex > lock ( data_parts_mutex ) ;
2018-02-19 15:31:43 +00:00
auto it_part = data_parts_by_info . find ( part_to_detach - > info ) ;
if ( it_part = = data_parts_by_info . end ( ) )
2017-09-11 22:40:51 +00:00
throw Exception ( " No such data part " + part_to_detach - > getNameWithState ( ) , ErrorCodes : : NO_SUCH_DATA_PART ) ;
2017-04-01 07:20:54 +00:00
2017-09-11 22:40:51 +00:00
/// What if part_to_detach is reference to *it_part? Make a new owner just in case.
2017-11-20 19:33:12 +00:00
DataPartPtr part = * it_part ;
2017-04-01 07:20:54 +00:00
2017-11-20 19:33:12 +00:00
if ( part - > state = = DataPartState : : Committed )
removePartContributionToColumnSizes ( part ) ;
modifyPartState ( it_part , DataPartState : : Deleting ) ;
2017-04-01 07:20:54 +00:00
if ( move_to_detached | | ! prefix . empty ( ) )
part - > renameAddPrefix ( move_to_detached , prefix ) ;
2017-11-20 19:33:12 +00:00
data_parts_indexes . erase ( it_part ) ;
2017-04-01 07:20:54 +00:00
2017-11-20 19:33:12 +00:00
if ( restore_covered & & part - > info . level = = 0 )
2017-04-01 07:20:54 +00:00
{
2017-11-20 19:33:12 +00:00
LOG_WARNING ( log , " Will not recover parts covered by zero-level part " < < part - > name ) ;
return ;
}
2017-09-11 22:40:51 +00:00
2017-11-20 19:33:12 +00:00
if ( restore_covered )
{
2017-04-01 07:20:54 +00:00
Strings restored ;
bool error = false ;
2017-11-20 19:33:12 +00:00
String error_parts ;
2017-04-01 07:20:54 +00:00
2017-08-14 18:16:11 +00:00
Int64 pos = part - > info . min_block ;
2017-04-01 07:20:54 +00:00
2017-11-20 19:33:12 +00:00
auto is_appropriate_state = [ ] ( DataPartState state )
2017-04-01 07:20:54 +00:00
{
2017-11-20 19:33:12 +00:00
return state = = DataPartState : : Committed | | state = = DataPartState : : Outdated ;
} ;
2018-02-19 15:31:43 +00:00
auto update_error = [ & ] ( DataPartIteratorByInfo it )
2017-11-20 19:33:12 +00:00
{
error = true ;
error_parts + = ( * it ) - > getNameWithState ( ) + " " ;
} ;
2018-02-19 15:31:43 +00:00
auto it_middle = data_parts_by_info . lower_bound ( part - > info ) ;
2017-11-20 19:33:12 +00:00
/// Restore the leftmost part covered by the part
2018-02-19 15:31:43 +00:00
if ( it_middle ! = data_parts_by_info . begin ( ) )
2017-11-20 19:33:12 +00:00
{
auto it = std : : prev ( it_middle ) ;
if ( part - > contains ( * * it ) & & is_appropriate_state ( ( * it ) - > state ) )
2017-04-01 07:20:54 +00:00
{
2017-11-20 19:33:12 +00:00
/// Maybe, we must consider part level somehow
2017-08-14 18:16:11 +00:00
if ( ( * it ) - > info . min_block ! = part - > info . min_block )
2017-11-20 19:33:12 +00:00
update_error ( it ) ;
2017-09-11 22:40:51 +00:00
if ( ( * it ) - > state ! = DataPartState : : Committed )
{
addPartContributionToColumnSizes ( * it ) ;
2017-11-20 19:33:12 +00:00
modifyPartState ( it , DataPartState : : Committed ) ; // iterator is not invalidated here
2017-09-11 22:40:51 +00:00
}
2017-08-14 18:16:11 +00:00
pos = ( * it ) - > info . max_block + 1 ;
2017-04-01 07:20:54 +00:00
restored . push_back ( ( * it ) - > name ) ;
}
else
2017-11-20 19:33:12 +00:00
update_error ( it ) ;
2017-04-01 07:20:54 +00:00
}
else
error = true ;
2017-11-20 19:33:12 +00:00
/// Restore "right" parts
2018-02-19 15:31:43 +00:00
for ( auto it = it_middle ; it ! = data_parts_by_info . end ( ) & & part - > contains ( * * it ) ; + + it )
2017-04-01 07:20:54 +00:00
{
2017-08-14 18:16:11 +00:00
if ( ( * it ) - > info . min_block < pos )
2017-04-01 07:20:54 +00:00
continue ;
2017-11-20 19:33:12 +00:00
if ( ! is_appropriate_state ( ( * it ) - > state ) )
{
update_error ( it ) ;
continue ;
}
2017-08-14 18:16:11 +00:00
if ( ( * it ) - > info . min_block > pos )
2017-11-20 19:33:12 +00:00
update_error ( it ) ;
2017-09-11 22:40:51 +00:00
if ( ( * it ) - > state ! = DataPartState : : Committed )
{
addPartContributionToColumnSizes ( * it ) ;
2017-11-20 19:33:12 +00:00
modifyPartState ( it , DataPartState : : Committed ) ;
2017-09-11 22:40:51 +00:00
}
2017-08-14 18:16:11 +00:00
pos = ( * it ) - > info . max_block + 1 ;
2017-04-01 07:20:54 +00:00
restored . push_back ( ( * it ) - > name ) ;
}
2017-08-14 18:16:11 +00:00
if ( pos ! = part - > info . max_block + 1 )
2017-04-01 07:20:54 +00:00
error = true ;
for ( const String & name : restored )
{
LOG_INFO ( log , " Activated part " < < name ) ;
}
if ( error )
2017-11-20 19:33:12 +00:00
{
LOG_ERROR ( log , " The set of parts restored in place of " < < part - > name < < " looks incomplete. "
< < " There might or might not be a data loss. "
< < ( error_parts . empty ( ) ? " " : " Suspicious parts: " + error_parts ) ) ;
}
2017-04-01 07:20:54 +00:00
}
2014-03-13 17:44:00 +00:00
}
2014-09-19 11:44:29 +00:00
2015-11-18 21:37:28 +00:00
size_t MergeTreeData : : getTotalActiveSizeInBytes ( ) const
2015-04-17 05:35:53 +00:00
{
2017-04-01 07:20:54 +00:00
size_t res = 0 ;
2017-11-20 19:33:12 +00:00
{
std : : lock_guard < std : : mutex > lock ( data_parts_mutex ) ;
for ( auto & part : getDataPartsStateRange ( DataPartState : : Committed ) )
2018-03-26 14:18:04 +00:00
res + = part - > bytes_on_disk ;
2017-11-20 19:33:12 +00:00
}
2015-04-17 05:35:53 +00:00
2017-04-01 07:20:54 +00:00
return res ;
2015-04-17 05:35:53 +00:00
}
2014-04-09 16:32:32 +00:00
2017-08-14 18:16:11 +00:00
size_t MergeTreeData : : getMaxPartsCountForPartition ( ) const
2014-04-11 16:56:49 +00:00
{
2017-04-01 07:20:54 +00:00
std : : lock_guard < std : : mutex > lock ( data_parts_mutex ) ;
size_t res = 0 ;
size_t cur_count = 0 ;
2017-08-14 18:16:11 +00:00
const String * cur_partition_id = nullptr ;
2017-04-01 07:20:54 +00:00
2017-11-20 19:33:12 +00:00
for ( const auto & part : getDataPartsStateRange ( DataPartState : : Committed ) )
2017-04-01 07:20:54 +00:00
{
2017-08-14 18:16:11 +00:00
if ( cur_partition_id & & part - > info . partition_id = = * cur_partition_id )
2017-04-01 07:20:54 +00:00
{
+ + cur_count ;
}
else
{
2017-08-14 18:16:11 +00:00
cur_partition_id = & part - > info . partition_id ;
2017-04-01 07:20:54 +00:00
cur_count = 1 ;
}
res = std : : max ( res , cur_count ) ;
}
return res ;
2014-04-11 16:56:49 +00:00
}
2016-01-30 00:57:35 +00:00
2014-09-03 02:32:23 +00:00
void MergeTreeData : : delayInsertIfNeeded ( Poco : : Event * until )
2014-05-27 08:43:01 +00:00
{
2017-08-14 18:16:11 +00:00
const size_t parts_count = getMaxPartsCountForPartition ( ) ;
2017-04-01 07:20:54 +00:00
if ( parts_count < settings . parts_to_delay_insert )
return ;
2016-10-27 22:50:02 +00:00
2017-04-01 07:20:54 +00:00
if ( parts_count > = settings . parts_to_throw_insert )
{
ProfileEvents : : increment ( ProfileEvents : : RejectedInserts ) ;
2018-03-09 23:23:15 +00:00
throw Exception ( " Too many parts ( " + toString ( parts_count ) + " ). Merges are processing significantly slower than inserts. " , ErrorCodes : : TOO_MANY_PARTS ) ;
2017-04-01 07:20:54 +00:00
}
2014-09-12 20:29:29 +00:00
2017-04-01 07:20:54 +00:00
const size_t max_k = settings . parts_to_throw_insert - settings . parts_to_delay_insert ; /// always > 0
const size_t k = 1 + parts_count - settings . parts_to_delay_insert ; /// from 1 to max_k
2017-06-22 16:17:01 +00:00
const double delay_milliseconds = : : pow ( settings . max_delay_to_insert * 1000 , static_cast < double > ( k ) / max_k ) ;
2014-09-13 18:34:08 +00:00
2017-04-01 07:20:54 +00:00
ProfileEvents : : increment ( ProfileEvents : : DelayedInserts ) ;
2017-06-22 16:17:01 +00:00
ProfileEvents : : increment ( ProfileEvents : : DelayedInsertsMilliseconds , delay_milliseconds ) ;
2014-06-20 18:45:19 +00:00
2017-04-01 07:20:54 +00:00
CurrentMetrics : : Increment metric_increment ( CurrentMetrics : : DelayedInserts ) ;
2016-10-27 22:50:02 +00:00
2017-04-01 07:20:54 +00:00
LOG_INFO ( log , " Delaying inserting block by "
2017-06-22 16:17:01 +00:00
< < std : : fixed < < std : : setprecision ( 4 ) < < delay_milliseconds < < " ms. because there are " < < parts_count < < " parts " ) ;
2014-09-03 02:32:23 +00:00
2017-04-01 07:20:54 +00:00
if ( until )
2017-06-22 16:17:01 +00:00
until - > tryWait ( delay_milliseconds ) ;
2017-04-01 07:20:54 +00:00
else
2017-06-22 16:29:15 +00:00
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( static_cast < size_t > ( delay_milliseconds ) ) ) ;
2014-05-27 08:43:01 +00:00
}
2014-07-25 11:38:46 +00:00
/// Returns the Committed part named part_name, or a Committed part whose info contains it; nullptr if none.
MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String & part_name)
{
    auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);

    std::lock_guard<std::mutex> lock(data_parts_mutex);

    auto committed_parts_range = getDataPartsStateRange(DataPartState::Committed);

    /// The part can be covered only by the previous or the next one in data_parts.
    auto candidate = data_parts_by_state_and_info.lower_bound(DataPartStateAndInfo(DataPartState::Committed, part_info));

    /// Candidate one: the first committed part not less than part_info.
    if (candidate != committed_parts_range.end())
    {
        if ((*candidate)->name == part_name || (*candidate)->info.contains(part_info))
            return *candidate;
    }

    /// Candidate two: the committed part right before it.
    if (candidate == committed_parts_range.begin())
        return nullptr;

    --candidate;
    if ((*candidate)->info.contains(part_info))
        return *candidate;

    return nullptr;
}
2017-10-03 19:04:56 +00:00
/// Looks up a part by exact name; returns it only if its current state is one of valid_states, else nullptr.
MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_name, const MergeTreeData::DataPartStates & valid_states)
{
    auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);

    std::lock_guard<std::mutex> lock(data_parts_mutex);

    auto found = data_parts_by_info.find(part_info);
    if (found == data_parts_by_info.end())
        return nullptr;

    const DataPartPtr & part = *found;
    const bool state_is_valid = std::any_of(valid_states.begin(), valid_states.end(),
        [&](DataPartState valid_state) { return part->state == valid_state; });

    if (!state_is_valid)
        return nullptr;

    return part;
}
2016-01-28 01:00:27 +00:00
2014-08-08 08:28:13 +00:00
/** Loads a part from relative_path, regenerating metadata that older versions got wrong:
  * columns.txt is deleted (and re-created by loading), and if checksums are missing they are
  * computed by checking the data and written to checksums.txt via a temporary file.
  */
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const String & relative_path)
{
    MutableDataPartPtr part = std::make_shared<DataPart>(*this, Poco::Path(relative_path).getFileName());
    part->relative_path = relative_path;
    const String full_part_path = part->getFullPath();

    /// Earlier the list of columns was written incorrectly. Delete it and re-create.
    Poco::File columns_file(full_part_path + "columns.txt");
    if (columns_file.exists())
        columns_file.remove();

    part->loadColumnsChecksumsIndexes(false, true);
    part->modification_time = Poco::File(full_part_path).getLastModified().epochTime();

    if (!part->checksums.empty())
        return part;

    /// No checksums file: compute checksums (validating the data on the way) and persist them.
    part->checksums = checkDataPart(full_part_path, index_granularity, false, primary_key_data_types);

    const String checksums_tmp_path = full_part_path + "checksums.txt.tmp";
    {
        WriteBufferFromFile out(checksums_tmp_path, 4096);
        part->checksums.write(out);
    }
    Poco::File(checksums_tmp_path).renameTo(full_part_path + "checksums.txt");

    return part;
}
2014-03-27 11:29:40 +00:00
2017-05-14 23:14:21 +00:00
void MergeTreeData : : calculateColumnSizesImpl ( )
2014-09-19 11:44:29 +00:00
{
2017-04-01 07:20:54 +00:00
column_sizes . clear ( ) ;
2014-09-19 11:44:29 +00:00
2017-09-21 21:51:17 +00:00
/// Take into account only committed parts
2017-11-20 19:33:12 +00:00
auto committed_parts_range = getDataPartsStateRange ( DataPartState : : Committed ) ;
for ( const auto & part : committed_parts_range )
2017-04-01 07:20:54 +00:00
addPartContributionToColumnSizes ( part ) ;
2014-09-19 11:44:29 +00:00
}
void MergeTreeData : : addPartContributionToColumnSizes ( const DataPartPtr & part )
{
2017-11-01 19:56:07 +00:00
std : : shared_lock < std : : shared_mutex > lock ( part - > columns_lock ) ;
2017-04-01 07:20:54 +00:00
2018-03-26 14:18:04 +00:00
for ( const auto & column : part - > columns )
2017-04-01 07:20:54 +00:00
{
2018-03-26 14:18:04 +00:00
DataPart : : ColumnSize & total_column_size = column_sizes [ column . name ] ;
DataPart : : ColumnSize part_column_size = part - > getColumnSize ( column . name , * column . type ) ;
total_column_size . add ( part_column_size ) ;
2017-04-01 07:20:54 +00:00
}
2014-09-19 11:44:29 +00:00
}
void MergeTreeData : : removePartContributionToColumnSizes ( const DataPartPtr & part )
{
2018-03-26 14:18:04 +00:00
std : : shared_lock < std : : shared_mutex > lock ( part - > columns_lock ) ;
2017-04-01 07:20:54 +00:00
2018-03-26 14:18:04 +00:00
for ( const auto & column : part - > columns )
2017-04-01 07:20:54 +00:00
{
2018-03-26 14:18:04 +00:00
DataPart : : ColumnSize & total_column_size = column_sizes [ column . name ] ;
DataPart : : ColumnSize part_column_size = part - > getColumnSize ( column . name , * column . type ) ;
2017-04-01 07:20:54 +00:00
2018-03-26 14:18:04 +00:00
auto log_subtract = [ & ] ( size_t & from , size_t value , const char * field )
2017-04-01 07:20:54 +00:00
{
2018-03-26 14:18:04 +00:00
if ( value > from )
LOG_ERROR ( log , " Possibly incorrect column size subtraction: "
< < from < < " - " < < value < < " = " < < from - value
< < " , column: " < < column . name < < " , field: " < < field ) ;
2017-04-01 07:20:54 +00:00
2018-03-26 14:18:04 +00:00
from - = value ;
} ;
2017-04-01 07:20:54 +00:00
2018-03-26 14:18:04 +00:00
log_subtract ( total_column_size . data_compressed , part_column_size . data_compressed , " .data_compressed " ) ;
log_subtract ( total_column_size . data_uncompressed , part_column_size . data_uncompressed , " .data_uncompressed " ) ;
log_subtract ( total_column_size . marks , part_column_size . marks , " .marks " ) ;
2017-04-01 07:20:54 +00:00
}
2014-09-19 11:44:29 +00:00
}
2014-10-03 17:57:01 +00:00
2017-09-06 20:34:26 +00:00
void MergeTreeData : : freezePartition ( const ASTPtr & partition_ast , const String & with_name , const Context & context )
2014-11-11 04:11:07 +00:00
{
2017-11-20 04:15:43 +00:00
std : : optional < String > prefix ;
2017-09-18 20:49:21 +00:00
String partition_id ;
2017-10-12 18:21:17 +00:00
2017-09-07 16:21:06 +00:00
if ( format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING )
2017-09-06 20:34:26 +00:00
{
const auto & partition = dynamic_cast < const ASTPartition & > ( * partition_ast ) ;
2017-09-18 20:49:21 +00:00
/// Month-partitioning specific - partition value can represent a prefix of the partition to freeze.
2017-09-06 20:34:26 +00:00
if ( const auto * partition_lit = dynamic_cast < const ASTLiteral * > ( partition . value . get ( ) ) )
prefix = partition_lit - > value . getType ( ) = = Field : : Types : : UInt64
? toString ( partition_lit - > value . get < UInt64 > ( ) )
: partition_lit - > value . safeGet < String > ( ) ;
2017-09-18 20:49:21 +00:00
else
partition_id = getPartitionIDFromQuery ( partition_ast , context ) ;
2017-09-06 20:34:26 +00:00
}
else
2017-09-18 20:49:21 +00:00
partition_id = getPartitionIDFromQuery ( partition_ast , context ) ;
2017-09-06 20:34:26 +00:00
2017-09-18 20:49:21 +00:00
if ( prefix )
2017-10-04 00:22:00 +00:00
LOG_DEBUG ( log , " Freezing parts with prefix " + * prefix ) ;
2017-09-18 20:49:21 +00:00
else
LOG_DEBUG ( log , " Freezing parts with partition ID " + partition_id ) ;
2017-04-01 07:20:54 +00:00
String clickhouse_path = Poco : : Path ( context . getPath ( ) ) . makeAbsolute ( ) . toString ( ) ;
String shadow_path = clickhouse_path + " shadow/ " ;
Poco : : File ( shadow_path ) . createDirectories ( ) ;
String backup_path = shadow_path
+ ( ! with_name . empty ( )
? escapeForFileName ( with_name )
: toString ( Increment ( shadow_path + " increment.txt " ) . get ( true ) ) )
+ " / " ;
LOG_DEBUG ( log , " Snapshot will be placed at " + backup_path ) ;
2017-10-12 18:21:17 +00:00
/// Acquire a snapshot of active data parts to prevent removing while doing backup.
const auto data_parts = getDataParts ( ) ;
2017-04-01 07:20:54 +00:00
size_t parts_processed = 0 ;
2017-10-12 18:21:17 +00:00
for ( const auto & part : data_parts )
2017-04-01 07:20:54 +00:00
{
2017-09-18 20:49:21 +00:00
if ( prefix )
2017-04-01 07:20:54 +00:00
{
2017-10-04 00:22:00 +00:00
if ( ! startsWith ( part - > info . partition_id , * prefix ) )
2017-09-18 20:49:21 +00:00
continue ;
}
2017-10-12 18:21:17 +00:00
else if ( part - > info . partition_id ! = partition_id )
2017-09-18 20:49:21 +00:00
continue ;
2017-04-01 07:20:54 +00:00
2017-10-12 18:21:17 +00:00
LOG_DEBUG ( log , " Freezing part " < < part - > name ) ;
2017-04-01 07:20:54 +00:00
2017-10-12 18:21:17 +00:00
String part_absolute_path = Poco : : Path ( part - > getFullPath ( ) ) . absolute ( ) . toString ( ) ;
2017-09-18 20:49:21 +00:00
if ( ! startsWith ( part_absolute_path , clickhouse_path ) )
throw Exception ( " Part path " + part_absolute_path + " is not inside " + clickhouse_path , ErrorCodes : : LOGICAL_ERROR ) ;
String backup_part_absolute_path = part_absolute_path ;
backup_part_absolute_path . replace ( 0 , clickhouse_path . size ( ) , backup_path ) ;
localBackup ( part_absolute_path , backup_part_absolute_path ) ;
+ + parts_processed ;
2017-04-01 07:20:54 +00:00
}
LOG_DEBUG ( log , " Freezed " < < parts_processed < < " parts " ) ;
2014-11-11 04:11:07 +00:00
}
2017-08-14 18:16:11 +00:00
size_t MergeTreeData : : getPartitionSize ( const std : : string & partition_id ) const
2016-01-28 01:00:27 +00:00
{
2017-04-01 07:20:54 +00:00
size_t size = 0 ;
Poco : : DirectoryIterator end ;
for ( Poco : : DirectoryIterator it ( full_path ) ; it ! = end ; + + it )
{
2017-08-14 18:16:11 +00:00
MergeTreePartInfo part_info ;
2017-08-25 20:41:45 +00:00
if ( ! MergeTreePartInfo : : tryParsePartName ( it . name ( ) , & part_info , format_version ) )
2017-04-01 07:20:54 +00:00
continue ;
2017-08-14 18:16:11 +00:00
if ( part_info . partition_id ! = partition_id )
2017-04-01 07:20:54 +00:00
continue ;
const auto part_path = it . path ( ) . absolute ( ) . toString ( ) ;
2017-12-03 02:15:35 +00:00
for ( Poco : : DirectoryIterator it2 ( part_path ) ; it2 ! = end ; + + it2 )
2017-04-01 07:20:54 +00:00
{
const auto part_file_path = it2 . path ( ) . absolute ( ) . toString ( ) ;
size + = Poco : : File ( part_file_path ) . getSize ( ) ;
}
}
return size ;
2016-01-28 01:00:27 +00:00
}
2014-11-11 04:11:07 +00:00
2017-09-11 17:55:41 +00:00
/** Resolves the partition ID referenced by an ASTPartition.
  * - No value expression: the ID stored in the AST is returned as-is.
  * - Old month-based format with a string literal: the literal must be 6 digits (YYYYMM) and is the ID.
  * - Otherwise the value is parsed against partition_key_sample and converted to an ID; it must agree
  *   with the partition value of any committed part already having that ID.
  */
String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, const Context & context)
{
    const auto & partition_ast = typeid_cast<const ASTPartition &>(*ast);

    if (!partition_ast.value)
        return partition_ast.id;

    const bool month_partitioning = format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING;
    if (month_partitioning)
    {
        /// Month-partitioning specific - partition ID can be passed in the partition value.
        const auto * id_literal = typeid_cast<const ASTLiteral *>(partition_ast.value.get());
        if (id_literal && id_literal->value.getType() == Field::Types::String)
        {
            String partition_id = id_literal->value.get<String>();
            const bool is_yyyymm = partition_id.size() == 6
                && std::all_of(partition_id.begin(), partition_id.end(), isNumericASCII);
            if (!is_yyyymm)
                throw Exception(
                    "Invalid partition format: " + partition_id + ". Partition should consist of 6 digits: YYYYMM",
                    ErrorCodes::INVALID_PARTITION_VALUE);
            return partition_id;
        }
    }

    /// Re-parse partition key fields using the information about expected field types.
    const size_t fields_count = partition_key_sample.columns();
    if (partition_ast.fields_count != fields_count)
        throw Exception(
            "Wrong number of fields in the partition expression: " + toString(partition_ast.fields_count) +
            ", must be: " + toString(fields_count),
            ErrorCodes::INVALID_PARTITION_VALUE);

    Row partition_row(fields_count);
    if (fields_count != 0)
    {
        /// Surround the raw field text with parentheses so it reads as one VALUES row.
        ReadBufferFromMemory open_paren("(", 1);
        ReadBufferFromMemory fields_text(partition_ast.fields_str.data, partition_ast.fields_str.size);
        ReadBufferFromMemory close_paren(")", 1);
        ConcatReadBuffer row_buf({&open_paren, &fields_text, &close_paren});

        ValuesRowInputStream input_stream(row_buf, partition_key_sample, context, /* interpret_expressions = */ true);
        MutableColumns parsed_columns = partition_key_sample.cloneEmptyColumns();

        if (!input_stream.read(parsed_columns))
            throw Exception(
                "Could not parse partition value: `" + partition_ast.fields_str.toString() + "`",
                ErrorCodes::INVALID_PARTITION_VALUE);

        for (size_t field_idx = 0; field_idx < fields_count; ++field_idx)
            parsed_columns[field_idx]->get(0, partition_row[field_idx]);
    }

    MergeTreePartition partition(std::move(partition_row));
    String partition_id = partition.getID(*this);

    {
        std::lock_guard<std::mutex> data_parts_lock(data_parts_mutex);
        DataPartPtr existing_part_in_partition = getAnyPartInPartition(partition_id, data_parts_lock);
        const bool value_conflicts = existing_part_in_partition
            && existing_part_in_partition->partition.value != partition.value;
        if (value_conflicts)
        {
            WriteBufferFromOwnString message;
            writeCString("Parsed partition value: ", message);
            partition.serializeTextQuoted(*this, message);
            writeCString(" doesn't match partition value for an existing part with the same partition ID: ", message);
            writeString(existing_part_in_partition->name, message);
            throw Exception(message.str(), ErrorCodes::INVALID_PARTITION_VALUE);
        }
    }

    return partition_id;
}
2017-11-20 19:33:12 +00:00
/// Returns all parts whose state is in affordable_states, ordered by LessDataPart.
/// If out_states is given, it receives the state of each returned part at the same index.
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector(const DataPartStates & affordable_states, DataPartStateVector * out_states) const
{
    DataPartsVector res;
    DataPartsVector buf;
    {
        std::lock_guard<std::mutex> lock(data_parts_mutex);

        /// Fold each per-state range into the result with std::merge. This presumes each state range is
        /// already ordered consistently with LessDataPart (the state-and-info index ordering).
        for (auto state : affordable_states)
        {
            res.swap(buf);
            res.clear();
            auto state_range = getDataPartsStateRange(state);
            std::merge(state_range.begin(), state_range.end(), buf.begin(), buf.end(), std::back_inserter(res), LessDataPart());
        }

        if (out_states)
        {
            out_states->resize(res.size());
            std::transform(res.begin(), res.end(), out_states->begin(),
                [](const DataPartPtr & part) -> DataPartState { return part->state; });
        }
    }

    return res;
}
2017-11-20 19:33:12 +00:00
/// Returns every known part regardless of state, in data_parts_by_info order.
/// If out_states is given, it receives the state of each returned part at the same index.
MergeTreeData::DataPartsVector MergeTreeData::getAllDataPartsVector(MergeTreeData::DataPartStateVector * out_states) const
{
    std::lock_guard<std::mutex> lock(data_parts_mutex);

    DataPartsVector res(data_parts_by_info.begin(), data_parts_by_info.end());

    if (out_states != nullptr)
    {
        out_states->clear();
        out_states->reserve(res.size());
        for (const DataPartPtr & part : res)
            out_states->push_back(part->state);
    }

    return res;
}
2017-10-03 19:04:56 +00:00
/// Returns a set of all parts whose state is listed in affordable_states.
MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const
{
    DataParts res;

    std::lock_guard<std::mutex> lock(data_parts_mutex);
    for (auto state : affordable_states)
    {
        auto state_range = getDataPartsStateRange(state);
        res.insert(state_range.begin(), state_range.end());
    }

    return res;
}
/// Shorthand: only the parts in the Committed state.
MergeTreeData::DataParts MergeTreeData::getDataParts() const
{
    return getDataParts(DataPartStates{DataPartState::Committed});
}
/// Shorthand: only the parts in the Committed state, as an ordered vector, without reporting states.
MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVector() const
{
    return getDataPartsVector(DataPartStates{DataPartState::Committed});
}
/// Returns some Committed part that belongs to `partition_id`, or nullptr if there is none.
/// The unused lock_guard parameter documents that the caller must already hold data_parts_mutex.
MergeTreeData::DataPartPtr MergeTreeData::getAnyPartInPartition(
    const String & partition_id, std::lock_guard<std::mutex> & /*data_parts_lock*/)
{
    /// A probe key with the smallest possible block numbers in this partition. The lower_bound
    /// by (state, info) then lands on the first Committed part of the partition, if any.
    const auto lowest_block = std::numeric_limits<Int64>::min();
    MergeTreePartInfo lowest_info(partition_id, lowest_block, lowest_block, 0);

    const auto candidate = data_parts_by_state_and_info.lower_bound(DataPartStateAndInfo(DataPartState::Committed, lowest_info));

    /// The lower_bound may step past the partition or past the Committed states, so re-check both.
    const bool found = candidate != data_parts_by_state_and_info.end()
        && (*candidate)->state == DataPartState::Committed
        && (*candidate)->info.partition_id == partition_id;

    if (!found)
        return nullptr;

    return *candidate;
}
2017-05-24 20:19:29 +00:00
void MergeTreeData : : Transaction : : rollback ( )
{
2017-11-20 19:33:12 +00:00
if ( ! isEmpty ( ) )
2017-05-24 20:19:29 +00:00
{
std : : stringstream ss ;
2018-02-19 15:31:43 +00:00
ss < < " Removing parts: " ;
for ( const auto & part : precommitted_parts )
ss < < " " < < part - > relative_path ;
ss < < " . " ;
2017-05-24 20:19:29 +00:00
LOG_DEBUG ( data - > log , " Undoing transaction. " < < ss . str ( ) ) ;
2018-02-19 15:31:43 +00:00
data - > removePartsFromWorkingSet (
DataPartsVector ( precommitted_parts . begin ( ) , precommitted_parts . end ( ) ) ,
/* clear_without_timeout = */ true ) ;
2017-05-24 20:19:29 +00:00
}
2017-11-20 19:33:12 +00:00
clear ( ) ;
2017-05-24 20:19:29 +00:00
}
2018-02-19 15:31:43 +00:00
/// Commits all precommitted parts under data_parts_mutex and then clears the transaction.
///
/// For each precommitted part:
///  - if getActivePartsToReplace reports a covering part, this part is marked Outdated with
///    remove_time = 0 and is not committed;
///  - otherwise every part it covers is marked Outdated (remove_time = now) and its column-size
///    contribution is removed. The new part is then marked Committed and its contribution is added.
///
/// Returns all parts that were replaced (moved to Outdated) by the committed parts.
MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit()
{
    DataPartsVector replaced_parts;

    if (!isEmpty())
    {
        std::lock_guard<std::mutex> data_parts_lock(data->data_parts_mutex);

        const auto now = time(nullptr);

        for (const DataPartPtr & new_part : precommitted_parts)
        {
            DataPartPtr covering_part;
            DataPartsVector parts_to_replace = data->getActivePartsToReplace(new_part->info, new_part->name, covering_part, data_parts_lock);

            if (covering_part)
            {
                LOG_WARNING(data->log, "Tried to commit obsolete part " << new_part->name
                    << " covered by " << covering_part->getNameWithState());

                new_part->remove_time.store(0, std::memory_order_relaxed); /// The part will be removed without waiting for old_parts_lifetime seconds.
                data->modifyPartState(new_part, DataPartState::Outdated);
                continue;
            }

            replaced_parts.insert(replaced_parts.end(), parts_to_replace.begin(), parts_to_replace.end());

            for (const DataPartPtr & old_part : parts_to_replace)
            {
                old_part->remove_time.store(now, std::memory_order_relaxed);
                data->modifyPartState(old_part, DataPartState::Outdated);
                data->removePartContributionToColumnSizes(old_part);
            }

            data->modifyPartState(new_part, DataPartState::Committed);
            data->addPartContributionToColumnSizes(new_part);
        }
    }

    clear();
    return replaced_parts;
}
2017-05-24 20:19:29 +00:00
2018-04-04 20:37:28 +00:00
bool MergeTreeData : : isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions ( const ASTPtr & node ) const
2018-01-21 07:30:07 +00:00
{
String column_name = node - > getColumnName ( ) ;
2018-02-09 10:53:50 +00:00
for ( const auto & column : primary_sort_descr )
2018-01-21 07:30:07 +00:00
if ( column_name = = column . column_name )
return true ;
2018-04-04 20:37:28 +00:00
if ( partition_expr_ast & & partition_expr_ast - > children . at ( 0 ) - > getColumnName ( ) = = column_name )
return true ;
2018-03-16 06:51:37 +00:00
if ( const ASTFunction * func = typeid_cast < const ASTFunction * > ( node . get ( ) ) )
if ( func - > arguments - > children . size ( ) = = 1 )
2018-04-04 20:37:28 +00:00
return isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions ( func - > arguments - > children . front ( ) ) ;
2018-03-16 06:51:37 +00:00
2018-01-21 07:30:07 +00:00
return false ;
}
bool MergeTreeData : : mayBenefitFromIndexForIn ( const ASTPtr & left_in_operand ) const
{
2018-04-03 21:16:58 +00:00
/// Make sure that the left side of the IN operator contain part of the primary key.
/// If there is a tuple on the left side of the IN operator, at least one item of the tuple must be part of the primary key (probably wrapped by a chain of some acceptable functions).
2018-01-21 07:30:07 +00:00
const ASTFunction * left_in_operand_tuple = typeid_cast < const ASTFunction * > ( left_in_operand . get ( ) ) ;
if ( left_in_operand_tuple & & left_in_operand_tuple - > name = = " tuple " )
{
for ( const auto & item : left_in_operand_tuple - > arguments - > children )
2018-04-04 20:37:28 +00:00
if ( isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions ( item ) )
2018-04-03 18:37:35 +00:00
return true ;
2018-01-21 07:30:07 +00:00
2018-04-03 18:37:35 +00:00
/// The tuple itself may be part of the primary key, so check that as a last resort.
2018-04-04 20:37:28 +00:00
return isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions ( left_in_operand ) ;
2018-01-21 07:30:07 +00:00
}
else
{
2018-04-04 20:37:28 +00:00
return isPrimaryKeyOrPartitionKeyColumnPossiblyWrappedInFunctions ( left_in_operand ) ;
2018-01-21 07:30:07 +00:00
}
}
2017-09-11 22:40:51 +00:00
2014-03-09 17:36:01 +00:00
}