2019-05-31 04:03:46 +00:00
# include "MergeTreeDataMergerMutator.h"
2020-04-02 16:28:50 +00:00
# include <Storages/MergeTree/MergeTreeSequentialSource.h>
2017-04-01 09:19:00 +00:00
# include <Storages/MergeTree/MergedBlockOutputStream.h>
2019-06-18 12:54:27 +00:00
# include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
2017-04-01 09:19:00 +00:00
# include <Storages/MergeTree/SimpleMergeSelector.h>
# include <Storages/MergeTree/AllMergeSelector.h>
2019-04-15 09:30:45 +00:00
# include <Storages/MergeTree/TTLMergeSelector.h>
2017-04-01 09:19:00 +00:00
# include <Storages/MergeTree/MergeList.h>
2021-02-10 14:12:49 +00:00
# include <Storages/MergeTree/MergeTreeDataWriter.h>
2018-07-18 12:17:48 +00:00
# include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
2021-09-16 21:19:58 +00:00
# include <Storages/MergeTree/FutureMergedMutatedPart.h>
# include <Storages/MergeTree/IMergeTreeDataPart.h>
# include <Storages/MergeTree/MergeTreeData.h>
# include <Storages/MergeTree/MergeProgress.h>
# include <Storages/MergeTree/MergeTask.h>
2022-03-18 19:31:44 +00:00
# include <Storages/MergeTree/ActiveDataPartSet.h>
2021-09-16 21:19:58 +00:00
2021-10-15 10:11:57 +00:00
# include <Processors/Transforms/TTLTransform.h>
# include <Processors/Transforms/TTLCalcTransform.h>
2021-10-08 14:03:54 +00:00
# include <Processors/Transforms/DistinctSortedTransform.h>
2020-04-02 16:28:50 +00:00
# include <Processors/Merges/MergingSortedTransform.h>
# include <Processors/Merges/CollapsingSortedTransform.h>
# include <Processors/Merges/SummingSortedTransform.h>
# include <Processors/Merges/ReplacingSortedTransform.h>
# include <Processors/Merges/GraphiteRollupSortedTransform.h>
# include <Processors/Merges/AggregatingSortedTransform.h>
# include <Processors/Merges/VersionedCollapsingTransform.h>
2021-02-10 14:12:49 +00:00
# include <Processors/Sources/SourceFromSingleChunk.h>
2020-04-02 16:28:50 +00:00
# include <Processors/Transforms/ExpressionTransform.h>
# include <Processors/Transforms/MaterializingTransform.h>
2018-09-03 13:36:58 +00:00
# include <Interpreters/MutationsInterpreter.h>
2022-03-18 19:31:44 +00:00
# include <Interpreters/MergeTreeTransaction.h>
2020-05-20 20:16:32 +00:00
# include <Interpreters/Context.h>
2017-04-01 09:19:00 +00:00
# include <Common/interpolate.h>
2017-07-13 20:58:19 +00:00
# include <Common/typeid_cast.h>
2020-03-31 16:18:18 +00:00
# include <Common/escapeForFileName.h>
2020-08-26 15:29:46 +00:00
# include <Parsers/queryToString.h>
2020-07-28 14:38:34 +00:00
2016-10-27 22:50:02 +00:00
# include <cmath>
2020-07-28 14:38:34 +00:00
# include <ctime>
# include <numeric>
2016-10-24 02:02:37 +00:00
2020-03-30 12:51:05 +00:00
# include <boost/algorithm/string/replace.hpp>
2016-10-27 22:50:02 +00:00
namespace CurrentMetrics
{
    /// Current number of tasks in the background merges-and-mutations pool.
    /// Read (relaxed) below to estimate how busy the pool is before selecting merges/mutations.
    extern const Metric BackgroundMergesAndMutationsPoolTask;
}
2014-03-13 12:48:07 +00:00
namespace DB
{
2016-01-11 21:46:36 +00:00
namespace ErrorCodes
{
    /// Error codes thrown from this file (declared extern here, defined elsewhere).
    extern const int LOGICAL_ERROR;
    extern const int ABORTED;
}
2016-12-22 02:04:32 +00:00
/// Do not start to merge parts, if free space is less than sum size of parts times specified coefficient.
/// This value is chosen to not allow big merges to eat all free space. Thus allowing small merges to proceed.
static constexpr double DISK_USAGE_COEFFICIENT_TO_SELECT = 2;
/// To do merge, reserve amount of space equals to sum size of parts times specified coefficient.
/// Must be strictly less than DISK_USAGE_COEFFICIENT_TO_SELECT,
/// because between selecting parts to merge and doing merge, amount of free space could have decreased.
static constexpr double DISK_USAGE_COEFFICIENT_TO_RESERVE = 1.1;

/// Enforce the invariant documented above at compile time (requires the constants to be constexpr).
static_assert(DISK_USAGE_COEFFICIENT_TO_RESERVE < DISK_USAGE_COEFFICIENT_TO_SELECT,
              "DISK_USAGE_COEFFICIENT_TO_RESERVE must be strictly less than DISK_USAGE_COEFFICIENT_TO_SELECT");
2014-03-13 12:48:07 +00:00
2021-09-30 21:26:24 +00:00
/// Bind the merger/mutator to its table data and remember the capacity of the background pool.
/// The logger name is derived from data.getLogName() with a " (MergerMutator)" suffix.
MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, size_t max_tasks_count_)
    : data(data_)
    , max_tasks_count(max_tasks_count_)
    , log(&Poco::Logger::get(data.getLogName() + " (MergerMutator)"))
{
    /// All state is set up in the member initializer list.
}
2014-03-13 12:48:07 +00:00
2020-09-04 10:52:51 +00:00
/// Maximum total size of source parts for a new merge, based on the current load of the background pool.
/// Delegates to the two-argument overload using this object's pool capacity.
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge() const
{
    const auto & pool_task_metric = CurrentMetrics::values[CurrentMetrics::BackgroundMergesAndMutationsPoolTask];
    const size_t busy_tasks = pool_task_metric.load(std::memory_order_relaxed);
    return getMaxSourcePartsSizeForMerge(max_tasks_count, busy_tasks);
}
2021-09-30 21:26:24 +00:00
/// Compute the maximum total size of source parts for a new merge, given the pool capacity and its current load.
///
/// While at most one pool entry is busy, or at least `number_of_free_entries_in_pool_to_lower_max_size_of_merge`
/// entries are free, the limit is `max_bytes_to_merge_at_max_space_in_pool`. Otherwise it is computed with
/// interpolateExponential between `max_bytes_to_merge_at_min_space_in_pool` and
/// `max_bytes_to_merge_at_max_space_in_pool`, using (free entries / that threshold) as the ratio.
/// The result is additionally capped by the maximum unreserved free space of the storage policy divided by
/// DISK_USAGE_COEFFICIENT_TO_SELECT.
///
/// @param max_count             Capacity of the background pool.
/// @param scheduled_tasks_count Number of tasks currently occupying the pool.
/// @throws Exception(LOGICAL_ERROR) if scheduled_tasks_count > max_count.
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t max_count, size_t scheduled_tasks_count) const
{
    /// Adjacent string literals (not a backslash continuation), so no source indentation leaks into the message.
    if (scheduled_tasks_count > max_count)
        throw Exception(ErrorCodes::LOGICAL_ERROR,
            "Logical error: invalid argument passed to getMaxSourcePartsSizeForMerge: "
            "scheduled_tasks_count = {} > max_count = {}",
            scheduled_tasks_count, max_count);

    size_t free_entries = max_count - scheduled_tasks_count;
    const auto data_settings = data.getSettings();

    /// Always allow maximum size if one or less pool entries is busy.
    /// One entry is probably the entry where this function is executed.
    /// This will protect from bad settings.
    UInt64 max_size = 0;
    if (scheduled_tasks_count <= 1 || free_entries >= data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge)
        max_size = data_settings->max_bytes_to_merge_at_max_space_in_pool;
    else
        /// Not reached when the threshold is 0 (the condition above is then always true), so no division by zero.
        max_size = interpolateExponential(
            data_settings->max_bytes_to_merge_at_min_space_in_pool,
            data_settings->max_bytes_to_merge_at_max_space_in_pool,
            static_cast<double>(free_entries) / data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge);

    return std::min(max_size, static_cast<UInt64>(data.getStoragePolicy()->getMaxUnreservedFreeSpace() / DISK_USAGE_COEFFICIENT_TO_SELECT));
}
2014-03-13 12:48:07 +00:00
2020-09-02 08:28:46 +00:00
/// Maximum size of a source part that may be mutated right now, or 0 if mutations should not be started.
///
/// Mutations are allowed when at most one pool entry is busy, or when at least
/// `number_of_free_entries_in_pool_to_execute_mutation` entries are free (leaving the rest for merges).
/// The limit is the maximum unreserved free space of the storage policy divided by DISK_USAGE_COEFFICIENT_TO_RESERVE.
UInt64 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation() const
{
    const auto data_settings = data.getSettings();
    size_t occupied = CurrentMetrics::values[CurrentMetrics::BackgroundMergesAndMutationsPoolTask].load(std::memory_order_relaxed);

    /// DataPart can be store only at one disk. Get maximum reservable free space at all disks.
    UInt64 disk_space = data.getStoragePolicy()->getMaxUnreservedFreeSpace();

    /// Nothing here guarantees occupied <= max_tasks_count (the merge variant checks this explicitly).
    /// Clamp instead of subtracting directly: an unsigned underflow would make the pool look almost empty
    /// and wrongly allow mutations.
    size_t free_entries = occupied < max_tasks_count ? max_tasks_count - occupied : 0;

    /// Allow mutations only if there are enough threads, leave free threads for merges else
    if (occupied <= 1
        || free_entries >= data_settings->number_of_free_entries_in_pool_to_execute_mutation)
        return static_cast<UInt64>(disk_space / DISK_USAGE_COEFFICIENT_TO_RESERVE);

    return 0;
}
2020-11-10 14:42:56 +00:00
/// Select a range of parts to merge and assign it to `future_part`.
///
/// Candidate parts are split into ranges of consecutive parts of the same partition that may be merged with
/// each other according to `can_merge_callback`. One of the merge selectors then picks a sub-range:
///  - if the table has any TTL, `merge_with_ttl_allowed` is set and TTL merges are not cancelled, the TTL delete
///    selector is tried first, then (if the table has recompression TTL) the recompression TTL selector;
///  - if nothing was chosen so far, SimpleMergeSelector is used.
///
/// @param future_part             Receives the selected parts; merge_type is set for TTL merges.
/// @param aggressive              If true, SimpleMergeSelector runs with `base = 1`.
/// @param max_total_size_to_merge Size limit passed to every selector.
/// @param can_merge_callback      Checked for (nullptr, part) and (prev_part, part).
/// @param merge_with_ttl_allowed  Whether TTL selectors may be used at all.
/// @param txn                     If set, some outdated parts whose removal is not committed are also considered.
/// @param out_disable_reason      If not null, receives the reason when CANNOT_SELECT is returned.
/// @return SELECTED if `future_part` was assigned, CANNOT_SELECT otherwise.
/// @throws Exception(LOGICAL_ERROR) if a part intersects the previous part of the same partition,
///         or if SimpleMergeSelector returns exactly one part.
SelectPartsDecision MergeTreeDataMergerMutator::selectPartsToMerge(
    FutureMergedMutatedPartPtr future_part,
    bool aggressive,
    size_t max_total_size_to_merge,
    const AllowedMergingPredicate & can_merge_callback,
    bool merge_with_ttl_allowed,
    const MergeTreeTransactionPtr & txn,
    String * out_disable_reason)
{
    MergeTreeData::DataPartsVector data_parts;
    if (txn)
    {
        /// Merge predicate (for simple MergeTree) allows to merge two parts only if both parts are visible for merge transaction.
        /// So at the first glance we could just get all active parts.
        /// Active parts include uncommitted parts, but it's ok and merge predicate handles it.
        /// However, it's possible that some transaction is trying to remove a part in the middle, for example, all_2_2_0.
        /// If parts all_1_1_0 and all_3_3_0 are active and visible for merge transaction, then we would try to merge them.
        /// But it's wrong, because all_2_2_0 may become active again if transaction will roll back.
        /// That's why we must include some outdated parts into `data_parts`, more precisely, such parts that removal is not committed.
        MergeTreeData::DataPartsVector active_parts;
        MergeTreeData::DataPartsVector outdated_parts;

        /// Take both snapshots under one parts lock so they are consistent with each other.
        {
            auto lock = data.lockParts();
            active_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Active}, lock);
            outdated_parts = data.getDataPartsVectorForInternalUsage({MergeTreeData::DataPartState::Outdated}, lock);
        }

        ActiveDataPartSet active_parts_set{data.format_version};
        for (const auto & part : active_parts)
            active_parts_set.add(part->name);

        for (const auto & part : outdated_parts)
        {
            /// We don't need rolled back parts.
            /// NOTE When rolling back a transaction we set creation_csn to RolledBackCSN at first
            /// and then remove part from working set, so there's no race condition
            if (part->version.creation_csn == Tx::RolledBackCSN)
                continue;

            /// We don't need parts that are finally removed.
            /// NOTE There's a minor race condition: we may get UnknownCSN if a transaction has been just committed concurrently.
            /// But it's not a problem if we will add such part to `data_parts`.
            if (part->version.removal_csn != Tx::UnknownCSN)
                continue;

            active_parts_set.add(part->name);
        }

        /// Restore "active" parts set from selected active and outdated parts:
        /// drop every part that is not itself the containing part in active_parts_set
        /// (i.e. it is covered by another part there, or was never added to it).
        auto remove_pred = [&](const MergeTreeData::DataPartPtr & part) -> bool
        {
            return active_parts_set.getContainingPart(part->info) != part->name;
        };

        auto new_end_it = std::remove_if(active_parts.begin(), active_parts.end(), remove_pred);
        active_parts.erase(new_end_it, active_parts.end());

        new_end_it = std::remove_if(outdated_parts.begin(), outdated_parts.end(), remove_pred);
        outdated_parts.erase(new_end_it, outdated_parts.end());

        /// std::merge expects both inputs ordered by LessDataPart.
        /// NOTE(review): assumes getDataPartsVectorForInternalUsage returns parts in that order — confirm.
        std::merge(active_parts.begin(), active_parts.end(),
                   outdated_parts.begin(), outdated_parts.end(),
                   std::back_inserter(data_parts), MergeTreeData::LessDataPart());
    }
    else
    {
        /// Simply get all active parts
        data_parts = data.getDataPartsVectorForInternalUsage();
    }

    const auto data_settings = data.getSettings();
    auto metadata_snapshot = data.getInMemoryMetadataPtr();

    if (data_parts.empty())
    {
        if (out_disable_reason)
            *out_disable_reason = "There are no parts in the table";
        return SelectPartsDecision::CANNOT_SELECT;
    }

    time_t current_time = std::time(nullptr);

    /// Ranges of consecutive mergeable parts; the selectors pick a sub-range from one of them.
    IMergeSelector::PartsRanges parts_ranges;

    StoragePolicyPtr storage_policy = data.getStoragePolicy();
    /// Volumes with stopped merges are extremely rare situation.
    /// Check it once and don't check each part (this is bad for performance).
    bool has_volumes_with_disabled_merges = storage_policy->hasAnyVolumeWithDisabledMerges();

    const String * prev_partition_id = nullptr;
    /// Previous part only in boundaries of partition frame
    const MergeTreeData::DataPartPtr * prev_part = nullptr;

    /// Number of parts that were added to parts_ranges.
    size_t parts_selected_precondition = 0;
    for (const MergeTreeData::DataPartPtr & part : data_parts)
    {
        const String & partition_id = part->info.partition_id;
        if (!prev_partition_id || partition_id != *prev_partition_id)
        {
            /// Parts of different partitions never share a range; avoid creating empty ranges.
            if (parts_ranges.empty() || !parts_ranges.back().empty())
                parts_ranges.emplace_back();

            /// New partition frame.
            prev_partition_id = &partition_id;
            prev_part = nullptr;
        }

        /// Check predicate only for the first part in each range.
        if (!prev_part)
        {
            /* Parts can be merged with themselves for TTL needs for example.
            * So we have to check if this part is currently being inserted with quorum and so on and so forth.
            * Obviously we have to check it manually only for the first part
            * of each partition because it will be automatically checked for a pair of parts. */
            if (!can_merge_callback(nullptr, part, txn.get(), nullptr))
                continue;

            /// This part can be merged only with next parts (no prev part exists), so start
            /// new interval if previous was not empty.
            if (!parts_ranges.back().empty())
                parts_ranges.emplace_back();
        }
        else
        {
            /// If we cannot merge with previous part we had to start new parts
            /// interval (in the same partition)
            if (!can_merge_callback(*prev_part, part, txn.get(), nullptr))
            {
                /// Now we have no previous part
                prev_part = nullptr;

                /// Mustn't be empty
                assert(!parts_ranges.back().empty());

                /// Some parts cannot be merged with previous parts and also cannot be merged with themselves,
                /// for example, merge is already assigned for such parts, or they participate in quorum inserts
                /// and so on.
                /// Also we don't start new interval here (maybe all next parts cannot be merged and we don't want to have empty interval)
                if (!can_merge_callback(nullptr, part, txn.get(), nullptr))
                    continue;

                /// Starting new interval in the same partition
                parts_ranges.emplace_back();
            }
        }

        IMergeSelector::Part part_info;
        part_info.size = part->getBytesOnDisk();
        part_info.age = current_time - part->modification_time;
        part_info.level = part->info.level;
        /// Points into `data_parts`, which outlives the selectors; dereferenced again when building `parts` below.
        part_info.data = &part;
        part_info.ttl_infos = &part->ttl_infos;
        part_info.compression_codec_desc = part->default_codec->getFullCodecDesc();
        part_info.shall_participate_in_merges = has_volumes_with_disabled_merges ? part->shallParticipateInMerges(storage_policy) : true;

        ++parts_selected_precondition;

        parts_ranges.back().emplace_back(part_info);

        /// Check for consistency of data parts. If assertion is failed, it requires immediate investigation.
        if (prev_part && part->info.partition_id == (*prev_part)->info.partition_id
            && part->info.min_block <= (*prev_part)->info.max_block)
        {
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects previous part {}", part->name, (*prev_part)->name);
        }

        prev_part = &part;
    }

    if (parts_selected_precondition == 0)
    {
        if (out_disable_reason)
            *out_disable_reason = "No parts satisfy preconditions for merge";
        return SelectPartsDecision::CANNOT_SELECT;
    }

    IMergeSelector::PartsRange parts_to_merge;

    if (metadata_snapshot->hasAnyTTL() && merge_with_ttl_allowed && !ttl_merges_blocker.isCancelled())
    {
        /// TTL delete is preferred to recompression
        TTLDeleteMergeSelector delete_ttl_selector(
                next_delete_ttl_merge_times_by_partition,
                current_time,
                data_settings->merge_with_ttl_timeout,
                data_settings->ttl_only_drop_parts);

        parts_to_merge = delete_ttl_selector.select(parts_ranges, max_total_size_to_merge);
        if (!parts_to_merge.empty())
        {
            future_part->merge_type = MergeType::TTL_DELETE;
        }
        else if (metadata_snapshot->hasAnyRecompressionTTL())
        {
            TTLRecompressMergeSelector recompress_ttl_selector(
                    next_recompress_ttl_merge_times_by_partition,
                    current_time,
                    data_settings->merge_with_recompression_ttl_timeout,
                    metadata_snapshot->getRecompressionTTLs());

            parts_to_merge = recompress_ttl_selector.select(parts_ranges, max_total_size_to_merge);
            if (!parts_to_merge.empty())
                future_part->merge_type = MergeType::TTL_RECOMPRESS;
        }
    }

    if (parts_to_merge.empty())
    {
        SimpleMergeSelector::Settings merge_settings;
        /// Override value from table settings
        merge_settings.max_parts_to_merge_at_once = data_settings->max_parts_to_merge_at_once;

        if (aggressive)
            merge_settings.base = 1;

        parts_to_merge = SimpleMergeSelector(merge_settings)
                            .select(parts_ranges, max_total_size_to_merge);

        /// Do not allow to "merge" part with itself for regular merges, unless it is a TTL-merge where it is ok to remove some values with expired ttl
        if (parts_to_merge.size() == 1)
            throw Exception("Logical error: merge selector returned only one part to merge", ErrorCodes::LOGICAL_ERROR);

        if (parts_to_merge.empty())
        {
            if (out_disable_reason)
                *out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
            return SelectPartsDecision::CANNOT_SELECT;
        }
    }

    /// Map the selector's result back to the actual data parts via the pointers stored in part_info.data.
    MergeTreeData::DataPartsVector parts;
    parts.reserve(parts_to_merge.size());
    for (IMergeSelector::Part & part_info : parts_to_merge)
    {
        const MergeTreeData::DataPartPtr & part = *static_cast<const MergeTreeData::DataPartPtr *>(part_info.data);
        parts.push_back(part);
    }

    LOG_DEBUG(log, "Selected {} parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name);
    future_part->assign(std::move(parts));
    return SelectPartsDecision::SELECTED;
}
2020-11-10 14:42:56 +00:00
/// Try to select all parts of partition `partition_id` as a single merge and assign them to `future_part`.
///
/// @param can_merge          Checked for every pair of neighbouring parts; a lone part is checked against itself.
/// @param final              If false, a partition consisting of a single part is rejected.
/// @param metadata_snapshot  Used to check whether a lone already-merged part still has TTL to calculate.
/// @param txn                Forwarded to `can_merge`.
/// @param out_disable_reason If not null, receives the reason why nothing was selected (`can_merge` may write it too).
/// @param optimize_skip_merged_partitions With `final`, skip a partition made of one part with level > 0,
///                           unless the table has TTL that is not yet calculated for that part.
/// @return SELECTED, NOTHING_TO_MERGE (partition skipped) or CANNOT_SELECT.
SelectPartsDecision MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition(
    FutureMergedMutatedPartPtr future_part,
    const AllowedMergingPredicate & can_merge,
    const String & partition_id,
    bool final,
    const StorageMetadataPtr & metadata_snapshot,
    const MergeTreeTransactionPtr & txn,
    String * out_disable_reason,
    bool optimize_skip_merged_partitions)
{
    /// Report a fixed reason (if the caller asked for one) and return the given decision.
    const auto reject = [out_disable_reason](const char * reason, SelectPartsDecision decision)
    {
        if (out_disable_reason)
            *out_disable_reason = reason;
        return decision;
    };

    MergeTreeData::DataPartsVector parts = selectAllPartsFromPartition(partition_id);
    const size_t parts_count = parts.size();

    if (parts_count == 0)
        return reject("There are no parts inside partition", SelectPartsDecision::CANNOT_SELECT);

    if (parts_count == 1 && !final)
        return reject("There is only one part inside partition", SelectPartsDecision::CANNOT_SELECT);

    /// A lone part that was already merged (level > 0) needs no work with FINAL + optimize_skip_merged_partitions,
    /// unless there is TTL that has not been calculated for it yet.
    if (final && optimize_skip_merged_partitions && parts_count == 1 && parts[0]->info.level > 0
        && (!metadata_snapshot->hasAnyTTL() || parts[0]->checkAllTTLCalculated(metadata_snapshot)))
        return reject("Partition skipped due to optimize_skip_merged_partitions", SelectPartsDecision::NOTHING_TO_MERGE);

    UInt64 sum_bytes = 0;
    for (size_t idx = 0; idx < parts_count; ++idx)
    {
        const auto & current = parts[idx];
        const auto & previous = parts[idx == 0 ? 0 : idx - 1];

        /// Neighbours must be mergeable; with exactly one part, check that it can be merged "with itself".
        const bool must_check = idx != 0 || parts_count == 1;
        if (must_check && !can_merge(previous, current, txn.get(), out_disable_reason))
            return SelectPartsDecision::CANNOT_SELECT;

        sum_bytes += current->getBytesOnDisk();
    }

    const auto available_disk_space = data.getStoragePolicy()->getMaxUnreservedFreeSpace();
    /// Enough disk space to cover the new merge with a margin.
    const auto required_disk_space = sum_bytes * DISK_USAGE_COEFFICIENT_TO_SELECT;
    const bool enough_space = available_disk_space > required_disk_space;

    if (!enough_space)
    {
        /// Emit the warning at most once per hour.
        const time_t now = std::time(nullptr);
        if (now - disk_space_warning_time > 3600)
        {
            disk_space_warning_time = now;
            LOG_WARNING(log,
                "Won't merge parts from {} to {} because not enough free space: {} free and unreserved"
                ", {} required now (+{}% on overhead); suppressing similar warnings for the next hour",
                parts.front()->name,
                parts.back()->name,
                ReadableSize(available_disk_space),
                ReadableSize(sum_bytes),
                static_cast<int>((DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0) * 100));
        }

        if (out_disable_reason)
            *out_disable_reason = fmt::format("Insufficient available disk space, required {}", ReadableSize(required_disk_space));

        return SelectPartsDecision::CANNOT_SELECT;
    }

    LOG_DEBUG(log, "Selected {} parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name);
    future_part->assign(std::move(parts));

    return SelectPartsDecision::SELECTED;
}
2018-04-20 16:18:16 +00:00
/// Return the parts from data.getDataPartsForInternalUsage() whose partition id equals `partition_id`,
/// in the iteration order of that container.
MergeTreeData::DataPartsVector MergeTreeDataMergerMutator::selectAllPartsFromPartition(const String & partition_id)
{
    const MergeTreeData::DataParts all_parts = data.getDataPartsForInternalUsage();

    MergeTreeData::DataPartsVector matching_parts;
    std::copy_if(all_parts.begin(), all_parts.end(), std::back_inserter(matching_parts),
        [&partition_id](const auto & candidate) { return candidate->info.partition_id == partition_id; });

    return matching_parts;
}
2014-03-13 12:48:07 +00:00
2021-09-16 21:19:58 +00:00
/// parts should be sorted.
/// Package the merge parameters into a MergeTask; the merge itself is performed by the returned task.
/// The task also receives pointers to this object, the table data and both merge blockers.
/// NOTE(review): the TableLockHolder argument is unnamed and not forwarded to the task, so it is not held beyond
/// this call — confirm callers do not expect it to live as long as the task.
MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
    FutureMergedMutatedPartPtr future_part,
    const StorageMetadataPtr & metadata_snapshot,
    MergeList::Entry * merge_entry,
    std::unique_ptr<MergeListElement> projection_merge_list_element,
    TableLockHolder,
    time_t time_of_merge,
    ContextPtr context,
    ReservationSharedPtr space_reservation,
    bool deduplicate,
    const Names & deduplicate_by_columns,
    const MergeTreeData::MergingParams & merging_params,
    const MergeTreeTransactionPtr & txn,
    const IMergeTreeDataPart * parent_part,
    const String & suffix)
{
    auto & snapshot_ref = const_cast<StorageMetadataPtr &>(metadata_snapshot);

    auto task = std::make_shared<MergeTask>(
        future_part,
        snapshot_ref,
        merge_entry,
        std::move(projection_merge_list_element),
        time_of_merge,
        context,
        space_reservation,
        deduplicate,
        deduplicate_by_columns,
        merging_params,
        parent_part,
        suffix,
        txn,
        &data,
        this,
        &merges_blocker,
        &ttl_merges_blocker);

    return task;
}
2018-03-14 16:43:18 +00:00
2017-04-01 07:20:54 +00:00
2021-09-16 21:19:58 +00:00
/// Package the mutation parameters into a MutateTask; the mutation itself is performed by the returned task.
/// `holder`, the table data, this object and the merges blocker are passed to the task as-is.
MutateTaskPtr MergeTreeDataMergerMutator::mutatePartToTemporaryPart(
    FutureMergedMutatedPartPtr future_part,
    StorageMetadataPtr metadata_snapshot,
    MutationCommandsConstPtr commands,
    MergeListEntry * merge_entry,
    time_t time_of_mutation,
    ContextPtr context,
    const MergeTreeTransactionPtr & txn,
    ReservationSharedPtr space_reservation,
    TableLockHolder & holder)
{
    auto task = std::make_shared<MutateTask>(
        future_part,
        metadata_snapshot,
        commands,
        merge_entry,
        time_of_mutation,
        context,
        space_reservation,
        holder,
        txn,
        data,
        *this,
        merges_blocker);

    return task;
}
2021-09-16 21:19:58 +00:00
/// Decide between horizontal and vertical merge.
///
/// Horizontal is forced when deduplicating, when `enable_vertical_merge_algorithm` is 0, when expired values
/// must be removed, or when any part does not support vertical merge. Otherwise vertical is chosen only for
/// Ordinary/Collapsing/Replacing/VersionedCollapsing tables that reach both the rows and columns thresholds,
/// and when the number of parts does not exceed RowSourcePart::MAX_PARTS.
MergeAlgorithm MergeTreeDataMergerMutator::chooseMergeAlgorithm(
    const MergeTreeData::DataPartsVector & parts,
    size_t sum_rows_upper_bound,
    const NamesAndTypesList & gathering_columns,
    bool deduplicate,
    bool need_remove_expired_values,
    const MergeTreeData::MergingParams & merging_params) const
{
    const auto data_settings = data.getSettings();

    if (deduplicate || data_settings->enable_vertical_merge_algorithm == 0 || need_remove_expired_values)
        return MergeAlgorithm::Horizontal;

    const bool every_part_supports_vertical = std::all_of(parts.begin(), parts.end(),
        [](const auto & part) { return part->supportsVerticalMerge(); });
    if (!every_part_supports_vertical)
        return MergeAlgorithm::Horizontal;

    const auto mode = merging_params.mode;
    const bool is_supported_storage = mode == MergeTreeData::MergingParams::Ordinary
        || mode == MergeTreeData::MergingParams::Collapsing
        || mode == MergeTreeData::MergingParams::Replacing
        || mode == MergeTreeData::MergingParams::VersionedCollapsing;

    const bool use_vertical = is_supported_storage
        && sum_rows_upper_bound >= data_settings->vertical_merge_algorithm_min_rows_to_activate
        && gathering_columns.size() >= data_settings->vertical_merge_algorithm_min_columns_to_activate
        && parts.size() <= RowSourcePart::MAX_PARTS;

    return use_vertical ? MergeAlgorithm::Vertical : MergeAlgorithm::Horizontal;
}
2019-01-11 19:14:50 +00:00
2016-11-03 12:00:44 +00:00
2021-09-16 21:19:58 +00:00
/// Commits a finished merge: renames the temporary merged part into place, replacing the source parts.
///
/// @param new_data_part   the freshly merged (temporary) part to rename and register
/// @param parts           the source parts that the merge was built from, in order
/// @param txn             transaction the merge runs in; must be set when transactions are enabled for the table
/// @param out_transaction optional data-parts transaction the rename is recorded into (forwarded as-is)
/// @return                the registered merged part
/// @throws Exception(ABORTED) if transactions are enabled for the table but the merge has no transaction
/// @throws Exception(LOGICAL_ERROR) if the same number of parts was replaced but their names differ from `parts`
MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart(
    MergeTreeData::MutableDataPartPtr & new_data_part,
    const MergeTreeData::DataPartsVector & parts,
    const MergeTreeTransactionPtr & txn,
    MergeTreeData::Transaction * out_transaction)
{
    /// Some of the source parts were possibly created in a transaction, so a non-transactional merge may break isolation.
    if (data.transactions_enabled.load(std::memory_order_relaxed) && !txn)
        throw Exception(ErrorCodes::ABORTED, "Cancelling merge, because it was done without starting transaction, "
                                             "but transactions were enabled for this table");

    /// Rename new part, add to the set and remove original parts.
    auto replaced_parts = data.renameTempPartAndReplace(new_data_part, txn.get(), nullptr, out_transaction);

    /// Let's check that all original parts have been deleted and only them.
    if (replaced_parts.size() != parts.size())
    {
        /** This is normal, although it happens rarely.
          *
          * The situation where 0 parts were replaced instead of N can occur, for example, in the following case:
          * - we had part A, but there were no parts B and C;
          * - A, B -> AB was in the queue, but it has not been done, because there is no part B;
          * - AB, C -> ABC was in the queue, but it has not been done, because there are no parts AB and C;
          * - we have completed the task of downloading part B;
          * - we started to make the A, B -> AB merge, since all parts appeared;
          * - we decided to download part ABC from another replica, since it was impossible to make the merge AB, C -> ABC;
          * - part ABC appeared. When it was added, the old parts A, B, C were deleted;
          * - the AB merge finished. Part AB was added. But this is an obsolete part. The log will contain the message `Obsolete part added`,
          *   then we get here.
          *
          * When could M > N parts be replaced?
          * - a new block was added in ReplicatedMergeTreeBlockOutputStream;
          * - it was added to the working dataset in memory and renamed on the filesystem;
          * - but the ZooKeeper transaction that adds it to the reference dataset in ZK failed;
          * - and it failed due to connection loss, so we don't roll back the working dataset in memory,
          *   because we don't know whether the part was added to ZK or not
          *   (see ReplicatedMergeTreeBlockOutputStream)
          * - then method selectPartsToMerge selects a range and sees that the EphemeralLock for the block in this part is unlocked,
          *   and so it is possible to merge a range skipping this part.
          *   (NOTE: Merging with a part that is not in ZK is not possible, see checks in 'createLogEntryToMergeParts'.)
          * - and after the merge, this part will be removed in addition to the parts that were merged.
          */
        LOG_WARNING(log, "Unexpected number of parts removed when adding {}: {} instead of {}", new_data_part->name, replaced_parts.size(), parts.size());
    }
    else
    {
        /// Same count: the replaced parts must be exactly the source parts, position by position.
        for (size_t i = 0; i < parts.size(); ++i)
            if (parts[i]->name != replaced_parts[i]->name)
                throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected part removed when adding {}: {} instead of {}",
                    new_data_part->name, replaced_parts[i]->name, parts[i]->name);
    }

    /// Guard: front()/back() on an empty vector is undefined behaviour.
    if (!parts.empty())
        LOG_TRACE(log, "Merged {} parts: from {} to {}", parts.size(), parts.front()->name, parts.back()->name);

    return new_data_part;
}
2021-09-16 21:19:58 +00:00
size_t MergeTreeDataMergerMutator : : estimateNeededDiskSpace ( const MergeTreeData : : DataPartsVector & source_parts )
2014-03-13 12:48:07 +00:00
{
2021-09-16 21:19:58 +00:00
size_t res = 0 ;
for ( const MergeTreeData : : DataPartPtr & part : source_parts )
res + = part - > getBytesOnDisk ( ) ;
2017-04-01 07:20:54 +00:00
2021-09-16 21:19:58 +00:00
return static_cast < size_t > ( res * DISK_USAGE_COEFFICIENT_TO_RESERVE ) ;
}
2017-04-01 07:20:54 +00:00
2021-09-16 21:19:58 +00:00
void MergeTreeDataMergerMutator : : splitMutationCommands (
MergeTreeData : : DataPartPtr part ,
const MutationCommands & commands ,
MutationCommands & for_interpreter ,
MutationCommands & for_file_renames )
{
ColumnsDescription part_columns ( part - > getColumns ( ) ) ;
2017-04-01 07:20:54 +00:00
2021-09-16 21:19:58 +00:00
if ( ! isWidePart ( part ) )
2020-09-03 08:59:41 +00:00
{
2021-09-16 21:19:58 +00:00
NameSet mutated_columns ;
for ( const auto & command : commands )
2020-09-03 08:59:41 +00:00
{
2021-09-16 21:19:58 +00:00
if ( command . type = = MutationCommand : : Type : : MATERIALIZE_INDEX
| | command . type = = MutationCommand : : Type : : MATERIALIZE_COLUMN
| | command . type = = MutationCommand : : Type : : MATERIALIZE_PROJECTION
| | command . type = = MutationCommand : : Type : : MATERIALIZE_TTL
| | command . type = = MutationCommand : : Type : : DELETE
| | command . type = = MutationCommand : : Type : : UPDATE )
{
for_interpreter . push_back ( command ) ;
for ( const auto & [ column_name , expr ] : command . column_to_update_expression )
mutated_columns . emplace ( column_name ) ;
if ( command . type = = MutationCommand : : Type : : MATERIALIZE_COLUMN )
mutated_columns . emplace ( command . column_name ) ;
}
else if ( command . type = = MutationCommand : : Type : : DROP_INDEX | | command . type = = MutationCommand : : Type : : DROP_PROJECTION )
{
for_file_renames . push_back ( command ) ;
}
else if ( part_columns . has ( command . column_name ) )
{
if ( command . type = = MutationCommand : : Type : : DROP_COLUMN )
{
mutated_columns . emplace ( command . column_name ) ;
}
else if ( command . type = = MutationCommand : : Type : : RENAME_COLUMN )
{
for_interpreter . push_back (
{
. type = MutationCommand : : Type : : READ_COLUMN ,
. column_name = command . rename_to ,
} ) ;
mutated_columns . emplace ( command . column_name ) ;
part_columns . rename ( command . column_name , command . rename_to ) ;
}
}
2020-09-03 08:59:41 +00:00
}
2021-09-16 21:19:58 +00:00
/// If it's compact part, then we don't need to actually remove files
/// from disk we just don't read dropped columns
for ( const auto & column : part - > getColumns ( ) )
2021-06-28 14:14:26 +00:00
{
2021-09-16 21:19:58 +00:00
if ( ! mutated_columns . count ( column . name ) )
for_interpreter . emplace_back (
MutationCommand { . type = MutationCommand : : Type : : READ_COLUMN , . column_name = column . name , . data_type = column . type } ) ;
2021-06-28 14:14:26 +00:00
}
2020-09-03 08:59:41 +00:00
}
2020-05-19 10:44:53 +00:00
else
2020-02-25 13:46:45 +00:00
{
2020-05-19 10:44:53 +00:00
for ( const auto & command : commands )
2020-02-25 13:46:45 +00:00
{
2020-05-19 16:03:10 +00:00
if ( command . type = = MutationCommand : : Type : : MATERIALIZE_INDEX
2021-07-31 18:17:06 +00:00
| | command . type = = MutationCommand : : Type : : MATERIALIZE_COLUMN
2021-02-10 14:12:49 +00:00
| | command . type = = MutationCommand : : Type : : MATERIALIZE_PROJECTION
2020-05-19 16:03:10 +00:00
| | command . type = = MutationCommand : : Type : : MATERIALIZE_TTL
| | command . type = = MutationCommand : : Type : : DELETE
| | command . type = = MutationCommand : : Type : : UPDATE )
2020-02-25 13:46:45 +00:00
{
2020-05-19 10:44:53 +00:00
for_interpreter . push_back ( command ) ;
}
2021-02-10 14:12:49 +00:00
else if ( command . type = = MutationCommand : : Type : : DROP_INDEX | | command . type = = MutationCommand : : Type : : DROP_PROJECTION )
2020-05-19 16:03:10 +00:00
{
for_file_renames . push_back ( command ) ;
}
2020-05-19 10:44:53 +00:00
/// If we don't have this column in source part, than we don't need
/// to materialize it
else if ( part_columns . has ( command . column_name ) )
2020-02-25 13:46:45 +00:00
{
2020-05-19 10:44:53 +00:00
if ( command . type = = MutationCommand : : Type : : READ_COLUMN )
2020-02-25 13:46:45 +00:00
{
2020-05-19 10:44:53 +00:00
for_interpreter . push_back ( command ) ;
}
else if ( command . type = = MutationCommand : : Type : : RENAME_COLUMN )
{
part_columns . rename ( command . column_name , command . rename_to ) ;
for_file_renames . push_back ( command ) ;
}
else
{
for_file_renames . push_back ( command ) ;
}
2020-02-25 13:46:45 +00:00
}
}
}
2020-01-17 13:54:22 +00:00
}
2020-02-17 15:44:13 +00:00
2021-11-03 20:29:48 +00:00
std : : pair < NamesAndTypesList , SerializationInfoByName >
MergeTreeDataMergerMutator : : getColumnsForNewDataPart (
2020-03-17 13:49:50 +00:00
MergeTreeData : : DataPartPtr source_part ,
const Block & updated_header ,
2020-05-19 09:54:56 +00:00
NamesAndTypesList storage_columns ,
2021-11-03 20:29:48 +00:00
const SerializationInfoByName & serialization_infos ,
2020-03-23 02:12:31 +00:00
const MutationCommands & commands_for_removes )
2020-02-17 15:44:13 +00:00
{
2020-03-17 13:49:50 +00:00
NameSet removed_columns ;
2020-07-27 09:42:37 +00:00
NameToNameMap renamed_columns_to_from ;
2021-11-03 20:29:48 +00:00
NameToNameMap renamed_columns_from_to ;
2021-11-08 18:24:38 +00:00
ColumnsDescription part_columns ( source_part - > getColumns ( ) ) ;
2021-11-03 20:29:48 +00:00
2020-07-27 09:42:37 +00:00
/// All commands are validated in AlterCommand so we don't care about order
2020-03-17 13:49:50 +00:00
for ( const auto & command : commands_for_removes )
{
2021-11-08 18:24:38 +00:00
/// If we don't have this column in source part, than we don't need to materialize it
if ( ! part_columns . has ( command . column_name ) )
continue ;
2020-03-17 13:49:50 +00:00
if ( command . type = = MutationCommand : : DROP_COLUMN )
removed_columns . insert ( command . column_name ) ;
2021-11-03 20:29:48 +00:00
2020-03-30 12:51:05 +00:00
if ( command . type = = MutationCommand : : RENAME_COLUMN )
2021-11-03 20:29:48 +00:00
{
2020-07-27 09:42:37 +00:00
renamed_columns_to_from . emplace ( command . rename_to , command . column_name ) ;
2021-11-03 20:29:48 +00:00
renamed_columns_from_to . emplace ( command . column_name , command . rename_to ) ;
}
}
SerializationInfoByName new_serialization_infos ;
for ( const auto & [ name , info ] : serialization_infos )
{
2022-03-15 01:48:21 +00:00
if ( removed_columns . count ( name ) )
2021-11-03 20:29:48 +00:00
continue ;
auto it = renamed_columns_from_to . find ( name ) ;
if ( it ! = renamed_columns_from_to . end ( ) )
new_serialization_infos . emplace ( it - > second , info ) ;
else
new_serialization_infos . emplace ( name , info ) ;
2020-03-17 13:49:50 +00:00
}
2021-11-03 20:29:48 +00:00
/// In compact parts we read all columns, because they all stored in a
/// single file
2022-03-15 01:48:21 +00:00
if ( ! isWidePart ( source_part ) )
2021-11-03 20:29:48 +00:00
return { updated_header . getNamesAndTypesList ( ) , new_serialization_infos } ;
2020-02-25 09:49:45 +00:00
Names source_column_names = source_part - > getColumns ( ) . getNames ( ) ;
2020-02-17 15:44:13 +00:00
NameSet source_columns_name_set ( source_column_names . begin ( ) , source_column_names . end ( ) ) ;
2020-05-19 09:54:56 +00:00
for ( auto it = storage_columns . begin ( ) ; it ! = storage_columns . end ( ) ; )
2020-02-17 15:44:13 +00:00
{
if ( updated_header . has ( it - > name ) )
{
auto updated_type = updated_header . getByName ( it - > name ) . type ;
if ( updated_type ! = it - > type )
it - > type = updated_type ;
+ + it ;
}
else
2020-05-19 09:54:56 +00:00
{
2020-07-27 09:42:37 +00:00
if ( ! source_columns_name_set . count ( it - > name ) )
{
/// Source part doesn't have column but some other column
/// was renamed to it's name.
auto renamed_it = renamed_columns_to_from . find ( it - > name ) ;
if ( renamed_it ! = renamed_columns_to_from . end ( )
& & source_columns_name_set . count ( renamed_it - > second ) )
+ + it ;
else
it = storage_columns . erase ( it ) ;
}
else
{
/// Check that this column was renamed to some other name
2021-11-03 20:29:48 +00:00
bool was_renamed = renamed_columns_from_to . count ( it - > name ) ;
bool was_removed = removed_columns . count ( it - > name ) ;
2020-07-27 09:42:37 +00:00
/// If we want to rename this column to some other name, than it
/// should it's previous version should be dropped or removed
if ( renamed_columns_to_from . count ( it - > name ) & & ! was_renamed & & ! was_removed )
throw Exception (
ErrorCodes : : LOGICAL_ERROR ,
" Incorrect mutation commands, trying to rename column {} to {}, but part {} already has column {} " , renamed_columns_to_from [ it - > name ] , it - > name , source_part - > name , it - > name ) ;
/// Column was renamed and no other column renamed to it's name
/// or column is dropped.
if ( ! renamed_columns_to_from . count ( it - > name ) & & ( was_renamed | | was_removed ) )
it = storage_columns . erase ( it ) ;
else
+ + it ;
}
2020-05-19 09:54:56 +00:00
}
2020-02-17 15:44:13 +00:00
}
2020-05-19 09:54:56 +00:00
2021-11-03 20:29:48 +00:00
return { storage_columns , new_serialization_infos } ;
2020-02-17 15:44:13 +00:00
}
2021-02-10 14:12:49 +00:00
2021-08-04 16:16:21 +00:00
/// Decides how TTL must be handled by a mutation, given the column dependencies it touches.
///
/// @return NONE if the table has no TTL or no TTL-related dependency is affected;
///         NORMAL as soon as any TTL_TARGET dependency is found;
///         RECALCULATE if only TTL_EXPRESSION dependencies are found.
ExecuteTTLType MergeTreeDataMergerMutator::shouldExecuteTTL(const StorageMetadataPtr & metadata_snapshot, const ColumnDependencies & dependencies)
{
    if (!metadata_snapshot->hasAnyTTL())
        return ExecuteTTLType::NONE;

    ExecuteTTLType ttl_type = ExecuteTTLType::NONE;
    for (const auto & dependency : dependencies)
    {
        /// A TTL target column is affected: the TTL has to be executed, no need to look further.
        if (dependency.kind == ColumnDependency::TTL_TARGET)
            return ExecuteTTLType::NORMAL;

        /// Only the TTL expression's inputs are affected: TTL info has to be recalculated.
        if (dependency.kind == ColumnDependency::TTL_EXPRESSION)
            ttl_type = ExecuteTTLType::RECALCULATE;
    }

    return ttl_type;
}
2020-03-19 14:11:37 +00:00
2014-03-13 12:48:07 +00:00
}