#include "MergeTreeDataMergerMutator.h"

#include <Storages/MergeTree/MergeTreeSequentialSource.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergedColumnOnlyOutputStream.h>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/SimpleMergeSelector.h>
#include <Storages/MergeTree/AllMergeSelector.h>
#include <Storages/MergeTree/TTLMergeSelector.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
#include <DataStreams/TTLBlockInputStream.h>
#include <DataStreams/DistinctSortedBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Merges/CollapsingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h>
#include <Processors/Merges/ReplacingSortedTransform.h>
#include <Processors/Merges/GraphiteRollupSortedTransform.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MaterializingTransform.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <Interpreters/MutationsInterpreter.h>
#include <Interpreters/Context.h>
#include <Common/interpolate.h>
#include <Common/typeid_cast.h>
#include <Common/escapeForFileName.h>
#include <Common/FileSyncGuard.h>
#include <Parsers/queryToString.h>

#include <cmath>
#include <ctime>
#include <numeric>

#include <boost/algorithm/string/replace.hpp>

namespace ProfileEvents
{
    extern const Event MergedRows;
    extern const Event MergedUncompressedBytes;
    extern const Event MergesTimeMilliseconds;
    extern const Event Merge;
}

namespace CurrentMetrics
{
    extern const Metric BackgroundPoolTask;
    extern const Metric PartMutation;
}

namespace DB
{

namespace ErrorCodes
{
    extern const int DIRECTORY_ALREADY_EXISTS;
    extern const int LOGICAL_ERROR;
    extern const int ABORTED;
}

/// Do not start to merge parts, if free space is less than the sum size of parts times the specified coefficient.
/// This value is chosen to not allow big merges to eat all free space, thus allowing small merges to proceed.
static const double DISK_USAGE_COEFFICIENT_TO_SELECT = 2;

/// To do a merge, reserve the amount of space equal to the sum size of parts times the specified coefficient.
/// Must be strictly less than DISK_USAGE_COEFFICIENT_TO_SELECT,
/// because between selecting parts to merge and doing the merge, the amount of free space could have decreased.
static const double DISK_USAGE_COEFFICIENT_TO_RESERVE = 1.1;
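
/// For illustration: with 10 GiB of source parts, selection requires at least
/// 20 GiB of unreserved free space (coefficient 2), while the merge itself reserves
/// only 11 GiB (coefficient 1.1); the gap absorbs free space lost to concurrent writes.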


void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_)
{
    if (parts_.empty())
        return;

    size_t sum_rows = 0;
    size_t sum_bytes_uncompressed = 0;
    for (const auto & part : parts_)
    {
        sum_rows += part->rows_count;
        sum_bytes_uncompressed += part->getTotalColumnsSize().data_uncompressed;
    }

    auto future_part_type = parts_.front()->storage.choosePartTypeOnDisk(sum_bytes_uncompressed, sum_rows);
    assign(std::move(parts_), future_part_type);
}

void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_, MergeTreeDataPartType future_part_type)
{
    if (parts_.empty())
        return;

    for (const MergeTreeData::DataPartPtr & part : parts_)
    {
        const MergeTreeData::DataPartPtr & first_part = parts_.front();

        if (part->partition.value != first_part->partition.value)
            throw Exception(
                "Attempting to merge parts " + first_part->name + " and " + part->name + " that are in different partitions",
                ErrorCodes::LOGICAL_ERROR);
    }

    parts = std::move(parts_);

    UInt32 max_level = 0;
    Int64 max_mutation = 0;
    for (const auto & part : parts)
    {
        max_level = std::max(max_level, part->info.level);
        max_mutation = std::max(max_mutation, part->info.mutation);
    }

    type = future_part_type;
    part_info.partition_id = parts.front()->info.partition_id;
    part_info.min_block = parts.front()->info.min_block;
    part_info.max_block = parts.back()->info.max_block;
    part_info.level = max_level + 1;
    part_info.mutation = max_mutation;

    if (parts.front()->storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
    {
        DayNum min_date = DayNum(std::numeric_limits<UInt16>::max());
        DayNum max_date = DayNum(std::numeric_limits<UInt16>::min());
        for (const auto & part : parts)
        {
            /// NOTE: getting min and max dates from the part names (instead of part data) because we want
            /// the merged part name to be determined only by source part names.
            /// It is simpler this way when the real min and max dates for the block range can change
            /// (e.g. after an ALTER DELETE command).
            DayNum part_min_date;
            DayNum part_max_date;
            MergeTreePartInfo::parseMinMaxDatesFromPartName(part->name, part_min_date, part_max_date);
            min_date = std::min(min_date, part_min_date);
            max_date = std::max(max_date, part_max_date);
        }

        name = part_info.getPartNameV0(min_date, max_date);
    }
    else
        name = part_info.getPartName();
}
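
/// Illustration of the resulting name (not part of the algorithm above): merging parts
/// 202001_1_1_0, 202001_2_2_0 and 202001_3_3_1 of one partition yields the future part
/// 202001_1_3_2: min_block = 1, max_block = 3, level = max(0, 0, 1) + 1 = 2.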

void FutureMergedMutatedPart::updatePath(const MergeTreeData & storage, const ReservationPtr & reservation)
{
    path = storage.getFullPathOnDisk(reservation->getDisk()) + name + "/";
}

MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, size_t background_pool_size_)
    : data(data_), background_pool_size(background_pool_size_), log(&Poco::Logger::get(data.getLogName() + " (MergerMutator)"))
{
}

UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge() const
{
    size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);

    return getMaxSourcePartsSizeForMerge(background_pool_size, busy_threads_in_pool == 0 ? 0 : busy_threads_in_pool - 1); /// 1 is current thread
}

UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSizeForMerge(size_t pool_size, size_t pool_used) const
{
    if (pool_used > pool_size)
        throw Exception("Logical error: invalid arguments passed to getMaxSourcePartsSize: pool_used > pool_size", ErrorCodes::LOGICAL_ERROR);

    size_t free_entries = pool_size - pool_used;
    const auto data_settings = data.getSettings();

    /// Always allow the maximum size if at most one pool entry is busy.
    /// One entry is probably the entry where this function is executed.
    /// This will protect from bad settings.
    UInt64 max_size = 0;
    if (pool_used <= 1 || free_entries >= data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge)
        max_size = data_settings->max_bytes_to_merge_at_max_space_in_pool;
    else
        max_size = interpolateExponential(
            data_settings->max_bytes_to_merge_at_min_space_in_pool,
            data_settings->max_bytes_to_merge_at_max_space_in_pool,
            static_cast<double>(free_entries) / data_settings->number_of_free_entries_in_pool_to_lower_max_size_of_merge);

    return std::min(max_size, static_cast<UInt64>(data.getStoragePolicy()->getMaxUnreservedFreeSpace() / DISK_USAGE_COEFFICIENT_TO_SELECT));
}
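
/// Rough illustration, assuming the defaults max_bytes_to_merge_at_min_space_in_pool = 1 MiB,
/// max_bytes_to_merge_at_max_space_in_pool = 150 GiB and a lowering threshold of 8 free entries:
/// with 4 of 8 entries free, the cap is about 1 MiB * (150 GiB / 1 MiB)^(4/8), roughly 400 MiB,
/// i.e. the allowed merge size shrinks exponentially as the pool fills up.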


UInt64 MergeTreeDataMergerMutator::getMaxSourcePartSizeForMutation() const
{
    const auto data_settings = data.getSettings();
    size_t busy_threads_in_pool = CurrentMetrics::values[CurrentMetrics::BackgroundPoolTask].load(std::memory_order_relaxed);

    /// A DataPart can be stored only on one disk. Get the maximum reservable free space over all disks.
    UInt64 disk_space = data.getStoragePolicy()->getMaxUnreservedFreeSpace();

    /// Allow mutations only if there are enough threads, otherwise leave free threads for merges.
    if (busy_threads_in_pool <= 1
        || background_pool_size - busy_threads_in_pool >= data_settings->number_of_free_entries_in_pool_to_execute_mutation)
        return static_cast<UInt64>(disk_space / DISK_USAGE_COEFFICIENT_TO_RESERVE);

    return 0;
}

bool MergeTreeDataMergerMutator::selectPartsToMerge(
    FutureMergedMutatedPart & future_part,
    bool aggressive,
    size_t max_total_size_to_merge,
    const AllowedMergingPredicate & can_merge_callback,
    bool merge_with_ttl_allowed,
    String * out_disable_reason)
{
    MergeTreeData::DataPartsVector data_parts = data.getDataPartsVector();
    const auto data_settings = data.getSettings();
    auto metadata_snapshot = data.getInMemoryMetadataPtr();

    if (data_parts.empty())
    {
        if (out_disable_reason)
            *out_disable_reason = "There are no parts in the table";
        return false;
    }

    time_t current_time = std::time(nullptr);

    IMergeSelector::PartsRanges parts_ranges;

    const String * prev_partition_id = nullptr;
    /// Previous part only within the boundaries of the partition frame
    const MergeTreeData::DataPartPtr * prev_part = nullptr;

    for (const MergeTreeData::DataPartPtr & part : data_parts)
    {
        const String & partition_id = part->info.partition_id;

        if (!prev_partition_id || partition_id != *prev_partition_id)
        {
            if (parts_ranges.empty() || !parts_ranges.back().empty())
                parts_ranges.emplace_back();

            /// New partition frame.
            prev_partition_id = &partition_id;
            prev_part = nullptr;
        }

        /// Check the predicate only for the first part in each partition.
        if (!prev_part)
        {
            /* Parts can be merged with themselves for TTL needs, for example.
             * So we have to check if this part is currently being inserted with quorum and so on and so forth.
             * Obviously we have to check it manually only for the first part
             * of each partition because it will be automatically checked for a pair of parts. */
            if (!can_merge_callback(nullptr, part, nullptr))
                continue;
        }
        else
        {
            /// If we cannot merge with the previous part, we have to start a new parts
            /// interval (in the same partition)
            if (!can_merge_callback(*prev_part, part, nullptr))
            {
                /// Starting a new interval in the same partition
                if (!parts_ranges.back().empty())
                    parts_ranges.emplace_back();

                /// Now we have no previous part, but it affects only logging
                prev_part = nullptr;
            }
        }

        IMergeSelector::Part part_info;
        part_info.size = part->getBytesOnDisk();
        part_info.age = current_time - part->modification_time;
        part_info.level = part->info.level;
        part_info.data = &part;
        part_info.ttl_infos = &part->ttl_infos;
        part_info.compression_codec_desc = part->default_codec->getFullCodecDesc();

        parts_ranges.back().emplace_back(part_info);

        /// Check for consistency of data parts. If assertion is failed, it requires immediate investigation.
        if (prev_part && part->info.partition_id == (*prev_part)->info.partition_id
            && part->info.min_block <= (*prev_part)->info.max_block)
        {
            LOG_ERROR(log, "Part {} intersects previous part {}", part->name, (*prev_part)->name);
        }

        prev_part = &part;
    }
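
    /// Illustration: for parts [A1, A2, A3, B1, B2] where A* and B* belong to two
    /// partitions and A2 cannot be merged with A1 (e.g. A1 still participates in a
    /// quorum insert), the ranges become [[A1], [A2, A3], [B1, B2]]; the selectors
    /// below may only pick contiguous parts from within a single range.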

    IMergeSelector::PartsRange parts_to_merge;

    if (metadata_snapshot->hasAnyTTL() && merge_with_ttl_allowed && !ttl_merges_blocker.isCancelled())
    {
        /// TTL delete is preferred over recompression
        TTLDeleteMergeSelector delete_ttl_selector(
            next_delete_ttl_merge_times_by_partition,
            current_time,
            data_settings->merge_with_ttl_timeout,
            data_settings->ttl_only_drop_parts);

        parts_to_merge = delete_ttl_selector.select(parts_ranges, max_total_size_to_merge);
        if (!parts_to_merge.empty())
        {
            future_part.merge_type = MergeType::TTL_DELETE;
        }
        else if (metadata_snapshot->hasAnyRecompressionTTL())
        {
            TTLRecompressMergeSelector recompress_ttl_selector(
                next_recompress_ttl_merge_times_by_partition,
                current_time,
                data_settings->merge_with_recompression_ttl_timeout,
                metadata_snapshot->getRecompressionTTLs());

            parts_to_merge = recompress_ttl_selector.select(parts_ranges, max_total_size_to_merge);
            if (!parts_to_merge.empty())
                future_part.merge_type = MergeType::TTL_RECOMPRESS;
        }
    }

    if (parts_to_merge.empty())
    {
        SimpleMergeSelector::Settings merge_settings;
        if (aggressive)
            merge_settings.base = 1;

        parts_to_merge = SimpleMergeSelector(merge_settings)
                            .select(parts_ranges, max_total_size_to_merge);

        /// Do not allow to "merge" a part with itself for regular merges; it is only ok for a TTL-merge,
        /// where it is used to remove some values with expired TTL.
        if (parts_to_merge.size() == 1)
            throw Exception("Logical error: merge selector returned only one part to merge", ErrorCodes::LOGICAL_ERROR);

        if (parts_to_merge.empty())
        {
            if (out_disable_reason)
                *out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
            return false;
        }
    }

    MergeTreeData::DataPartsVector parts;
    parts.reserve(parts_to_merge.size());
    for (IMergeSelector::Part & part_info : parts_to_merge)
    {
        const MergeTreeData::DataPartPtr & part = *static_cast<const MergeTreeData::DataPartPtr *>(part_info.data);
        parts.push_back(part);
    }

    LOG_DEBUG(log, "Selected {} parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name);
    future_part.assign(std::move(parts));
    return true;
}

bool MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition(
    FutureMergedMutatedPart & future_part,
    UInt64 & available_disk_space,
    const AllowedMergingPredicate & can_merge,
    const String & partition_id,
    bool final,
    bool * is_single_merged_part,
    const StorageMetadataPtr & metadata_snapshot,
    String * out_disable_reason)
{
    MergeTreeData::DataPartsVector parts = selectAllPartsFromPartition(partition_id);

    if (parts.empty())
        return false;

    if (!final && parts.size() == 1)
    {
        if (out_disable_reason)
            *out_disable_reason = "There is only one part inside partition";
        return false;
    }

    /// If final, optimize_skip_merged_partitions is true and we have only one part in the partition with level > 0,
    /// then we don't select it to merge. But if there are some expired TTL values, the merge is still needed.
    if (final && data.getSettings()->optimize_skip_merged_partitions && parts.size() == 1 && parts[0]->info.level > 0 &&
        (!metadata_snapshot->hasAnyTTL() || parts[0]->checkAllTTLCalculated(metadata_snapshot)))
    {
        *is_single_merged_part = true;
        return false;
    }

    auto it = parts.begin();
    auto prev_it = it;

    UInt64 sum_bytes = 0;
    while (it != parts.end())
    {
        /// For the case of one part, we check that it can be merged "with itself".
        if ((it != parts.begin() || parts.size() == 1) && !can_merge(*prev_it, *it, out_disable_reason))
        {
            return false;
        }

        sum_bytes += (*it)->getBytesOnDisk();

        prev_it = it;
        ++it;
    }

    /// Enough disk space to cover the new merge with a margin.
    auto required_disk_space = sum_bytes * DISK_USAGE_COEFFICIENT_TO_SELECT;
    if (available_disk_space <= required_disk_space)
    {
        time_t now = time(nullptr);
        if (now - disk_space_warning_time > 3600)
        {
            disk_space_warning_time = now;
            LOG_WARNING(log,
                "Won't merge parts from {} to {} because not enough free space: {} free and unreserved"
                ", {} required now (+{}% on overhead); suppressing similar warnings for the next hour",
                parts.front()->name,
                (*prev_it)->name,
                ReadableSize(available_disk_space),
                ReadableSize(sum_bytes),
                static_cast<int>((DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0) * 100));
        }

        if (out_disable_reason)
            *out_disable_reason = fmt::format("Insufficient available disk space, required {}", ReadableSize(required_disk_space));

        return false;
    }

    LOG_DEBUG(log, "Selected {} parts from {} to {}", parts.size(), parts.front()->name, parts.back()->name);
    future_part.assign(std::move(parts));

    available_disk_space -= required_disk_space;
    return true;
}

MergeTreeData::DataPartsVector MergeTreeDataMergerMutator::selectAllPartsFromPartition(const String & partition_id)
{
    MergeTreeData::DataPartsVector parts_from_partition;

    MergeTreeData::DataParts data_parts = data.getDataParts();

    for (const auto & current_part : data_parts)
    {
        if (current_part->info.partition_id != partition_id)
            continue;

        parts_from_partition.push_back(current_part);
    }

    return parts_from_partition;
}


/// PK columns are sorted and merged, ordinary columns are gathered using info from merge step
static void extractMergingAndGatheringColumns(
    const NamesAndTypesList & storage_columns,
    const ExpressionActionsPtr & sorting_key_expr,
    const IndicesDescription & indexes,
    const MergeTreeData::MergingParams & merging_params,
    NamesAndTypesList & gathering_columns, Names & gathering_column_names,
    NamesAndTypesList & merging_columns, Names & merging_column_names)
{
    Names sort_key_columns_vec = sorting_key_expr->getRequiredColumns();
    std::set<String> key_columns(sort_key_columns_vec.cbegin(), sort_key_columns_vec.cend());
    for (const auto & index : indexes)
    {
        Names index_columns_vec = index.expression->getRequiredColumns();
        std::copy(index_columns_vec.cbegin(), index_columns_vec.cend(),
                  std::inserter(key_columns, key_columns.end()));
    }

    /// Force sign column for Collapsing mode
    if (merging_params.mode == MergeTreeData::MergingParams::Collapsing)
        key_columns.emplace(merging_params.sign_column);

    /// Force version column for Replacing mode
    if (merging_params.mode == MergeTreeData::MergingParams::Replacing)
        key_columns.emplace(merging_params.version_column);

    /// Force sign column for VersionedCollapsing mode. Version is already in primary key.
    if (merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing)
        key_columns.emplace(merging_params.sign_column);

    /// Force to merge at least one column in case of empty key
    if (key_columns.empty())
        key_columns.emplace(storage_columns.front().name);

    /// TODO: also force "summing" and "aggregating" columns to make Horizontal merge only for such columns

    for (const auto & column : storage_columns)
    {
        if (key_columns.count(column.name))
        {
            merging_columns.emplace_back(column);
            merging_column_names.emplace_back(column.name);
        }
        else
        {
            gathering_columns.emplace_back(column);
            gathering_column_names.emplace_back(column.name);
        }
    }
}
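
/// Example: for a table with columns (a, b, c, d), ORDER BY (a, b) and a skip index on c,
/// {a, b, c} become merging columns (read together and merged row-by-row), while {d} is
/// gathered: the merge step records which source row each result row came from, and d is
/// later copied column-wise according to that map (see the Vertical merge branch below).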

/* Allow to compute more accurate progress statistics */
class ColumnSizeEstimator
{
    MergeTreeData::DataPart::ColumnToSize map;

public:
    /// Stores approximate size of columns in bytes
    /// Exact values are not required since it is used for relative values estimation (progress).
    size_t sum_total = 0;
    size_t sum_index_columns = 0;
    size_t sum_ordinary_columns = 0;

    ColumnSizeEstimator(const MergeTreeData::DataPart::ColumnToSize & map_, const Names & key_columns, const Names & ordinary_columns)
        : map(map_)
    {
        for (const auto & name : key_columns)
            if (!map.count(name)) map[name] = 0;
        for (const auto & name : ordinary_columns)
            if (!map.count(name)) map[name] = 0;
        for (const auto & name : key_columns)
            sum_index_columns += map.at(name);
        for (const auto & name : ordinary_columns)
            sum_ordinary_columns += map.at(name);

        sum_total = std::max(static_cast<decltype(sum_index_columns)>(1), sum_index_columns + sum_ordinary_columns);
    }

    Float64 columnWeight(const String & column) const
    {
        return static_cast<Float64>(map.at(column)) / sum_total;
    }

    Float64 keyColumnsWeight() const
    {
        return static_cast<Float64>(sum_index_columns) / sum_total;
    }
};


/** Progress callback.
  * What it should update:
  * - approximate progress
  * - amount of read rows
  * - various metrics
  * - time elapsed for current merge.
  */

/// Auxiliary struct that for each merge stage stores its current progress.
/// A stage is: the horizontal stage + a stage for each gathered column (if we are doing a
/// Vertical merge) or a mutation of a single part. During a single stage all rows are read.
struct MergeStageProgress
{
    explicit MergeStageProgress(Float64 weight_)
        : is_first(true), weight(weight_)
    {
    }

    MergeStageProgress(Float64 initial_progress_, Float64 weight_)
        : initial_progress(initial_progress_), is_first(false), weight(weight_)
    {
    }

    Float64 initial_progress = 0.0;
    bool is_first;
    Float64 weight;

    UInt64 total_rows = 0;
    UInt64 rows_read = 0;
};

class MergeProgressCallback
{
public:
    MergeProgressCallback(
        MergeList::Entry & merge_entry_, UInt64 & watch_prev_elapsed_, MergeStageProgress & stage_)
        : merge_entry(merge_entry_)
        , watch_prev_elapsed(watch_prev_elapsed_)
        , stage(stage_)
    {
        updateWatch();
    }

    MergeList::Entry & merge_entry;
    UInt64 & watch_prev_elapsed;
    MergeStageProgress & stage;

    void updateWatch()
    {
        UInt64 watch_curr_elapsed = merge_entry->watch.elapsed();
        ProfileEvents::increment(ProfileEvents::MergesTimeMilliseconds, (watch_curr_elapsed - watch_prev_elapsed) / 1000000);
        watch_prev_elapsed = watch_curr_elapsed;
    }

    void operator()(const Progress & value)
    {
        ProfileEvents::increment(ProfileEvents::MergedUncompressedBytes, value.read_bytes);
        if (stage.is_first)
        {
            ProfileEvents::increment(ProfileEvents::MergedRows, value.read_rows);
            ProfileEvents::increment(ProfileEvents::Merge);
        }
        updateWatch();

        merge_entry->bytes_read_uncompressed += value.read_bytes;
        if (stage.is_first)
            merge_entry->rows_read += value.read_rows;

        stage.total_rows += value.total_rows_to_read;
        stage.rows_read += value.read_rows;
        if (stage.total_rows > 0)
        {
            merge_entry->progress.store(
                stage.initial_progress + stage.weight * stage.rows_read / stage.total_rows,
                std::memory_order_relaxed);
        }
    }
};
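
/// Illustration of the progress bookkeeping above: in a Vertical merge where key columns
/// account for a weight of 0.4, the horizontal stage maps its local 0..100% onto the
/// overall range 0.0..0.4; a gathered column of weight 0.2 that starts after it uses
/// initial_progress = 0.4 and fills the range 0.4..0.6.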

static bool needSyncPart(const size_t input_rows, size_t input_bytes, const MergeTreeSettings & settings)
{
    return ((settings.min_rows_to_fsync_after_merge && input_rows >= settings.min_rows_to_fsync_after_merge)
        || (settings.min_compressed_bytes_to_fsync_after_merge && input_bytes >= settings.min_compressed_bytes_to_fsync_after_merge));
}


/// parts should be sorted.
MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
    const FutureMergedMutatedPart & future_part,
    const StorageMetadataPtr & metadata_snapshot,
    MergeList::Entry & merge_entry,
    TableLockHolder &,
    time_t time_of_merge,
    const Context & context,
    const ReservationPtr & space_reservation,
    bool deduplicate)
{
    static const String TMP_PREFIX = "tmp_merge_";

    if (merges_blocker.isCancelled())
        throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);

    /// We don't want to perform a merge assigned with TTL as a normal merge, so
    /// throw an exception.
    if (isTTLMergeType(future_part.merge_type) && ttl_merges_blocker.isCancelled())
        throw Exception("Cancelled merging parts with TTL", ErrorCodes::ABORTED);

    const MergeTreeData::DataPartsVector & parts = future_part.parts;

    LOG_DEBUG(log, "Merging {} parts: from {} to {} into {}", parts.size(), parts.front()->name, parts.back()->name, future_part.type.toString());

    auto disk = space_reservation->getDisk();
    String part_path = data.relative_data_path;
    String new_part_tmp_path = part_path + TMP_PREFIX + future_part.name + "/";
    if (disk->exists(new_part_tmp_path))
        throw Exception("Directory " + fullPath(disk, new_part_tmp_path) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);

    MergeTreeData::DataPart::ColumnToSize merged_column_to_size;

    Names all_column_names = metadata_snapshot->getColumns().getNamesOfPhysical();
    NamesAndTypesList storage_columns = metadata_snapshot->getColumns().getAllPhysical();
    const auto data_settings = data.getSettings();

    NamesAndTypesList gathering_columns;
    NamesAndTypesList merging_columns;
    Names gathering_column_names, merging_column_names;
    extractMergingAndGatheringColumns(
        storage_columns,
        metadata_snapshot->getSortingKey().expression,
        metadata_snapshot->getSecondaryIndices(),
        data.merging_params,
        gathering_columns,
        gathering_column_names,
        merging_columns,
        merging_column_names);

    auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + future_part.name, disk);
    MergeTreeData::MutableDataPartPtr new_data_part = data.createPart(
        future_part.name,
        future_part.type,
        future_part.part_info,
        single_disk_volume,
        TMP_PREFIX + future_part.name);

    new_data_part->setColumns(storage_columns);
    new_data_part->partition.assign(future_part.getPartition());
    new_data_part->is_temp = true;

    bool need_remove_expired_values = false;
    bool force_ttl = false;
    for (const auto & part : parts)
    {
        new_data_part->ttl_infos.update(part->ttl_infos);
        if (metadata_snapshot->hasAnyTTL() && !part->checkAllTTLCalculated(metadata_snapshot))
        {
            LOG_INFO(log, "Some TTL values were not calculated for part {}. Will calculate them forcefully during merge.", part->name);
            need_remove_expired_values = true;
            force_ttl = true;
        }
    }

    const auto & part_min_ttl = new_data_part->ttl_infos.part_min_ttl;
    if (part_min_ttl && part_min_ttl <= time_of_merge)
        need_remove_expired_values = true;

    if (need_remove_expired_values && ttl_merges_blocker.isCancelled())
    {
        LOG_INFO(log, "Part {} has values with expired TTL, but merges with TTL are cancelled.", new_data_part->name);
        need_remove_expired_values = false;
    }

    size_t sum_input_rows_upper_bound = merge_entry->total_rows_count;
    size_t sum_compressed_bytes_upper_bound = merge_entry->total_size_bytes_compressed;
    MergeAlgorithm merge_alg = chooseMergeAlgorithm(parts, sum_input_rows_upper_bound, gathering_columns, deduplicate, need_remove_expired_values);
    merge_entry->merge_algorithm = merge_alg;

    LOG_DEBUG(log, "Selected MergeAlgorithm: {}", toString(merge_alg));
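
    /// Roughly: Vertical is chosen when there are enough rows and columns to gather and
    /// neither deduplication nor TTL expiration forces reading all columns together;
    /// otherwise Horizontal reads and merges every column in a single pass.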

    /// Note: this is done before creating input streams, because otherwise data.data_parts_mutex
    /// (which is locked in data.getTotalActiveSizeInBytes())
    /// (which is locked in shared mode when input streams are created) and when inserting new data
    /// the order is reverse. This annoys TSan even though one lock is locked in shared mode and thus
    /// deadlock is impossible.
    auto compression_codec = data.getCompressionCodecForPart(merge_entry->total_size_bytes_compressed, new_data_part->ttl_infos, time_of_merge);

    auto tmp_disk = context.getTemporaryVolume()->getDisk();
    String rows_sources_file_path;
    std::unique_ptr<WriteBufferFromFileBase> rows_sources_uncompressed_write_buf;
    std::unique_ptr<WriteBuffer> rows_sources_write_buf;
    std::optional<ColumnSizeEstimator> column_sizes;

    if (merge_alg == MergeAlgorithm::Vertical)
    {
        tmp_disk->createDirectories(new_part_tmp_path);
        rows_sources_file_path = new_part_tmp_path + "rows_sources";
        rows_sources_uncompressed_write_buf = tmp_disk->writeFile(rows_sources_file_path);
        rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*rows_sources_uncompressed_write_buf);

        for (const MergeTreeData::DataPartPtr & part : parts)
            part->accumulateColumnSizes(merged_column_to_size);

        column_sizes = ColumnSizeEstimator(merged_column_to_size, merging_column_names, gathering_column_names);
    }
    else
    {
        merging_columns = storage_columns;
        merging_column_names = all_column_names;
        gathering_columns.clear();
        gathering_column_names.clear();
    }
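
    /// The rows_sources file is what makes the Vertical algorithm possible: during the
    /// horizontal stage the merging transform writes one byte per output row with the
    /// index of the source part it came from (plus a skip flag). E.g. merging two parts
    /// may produce the sequence 0 0 1 0 1 1; the gathering stage below replays it to copy
    /// each remaining column in order without comparing sort keys again.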

    std::optional<FileSyncGuard> sync_guard;
    if (data.getSettings()->fsync_part_directory)
        sync_guard.emplace(disk, new_part_tmp_path);

    /** Read from all parts, merge and write into a new one.
      * In passing, we calculate expression for sorting.
      */
    Pipes pipes;
    UInt64 watch_prev_elapsed = 0;

    /// We count the total amount of bytes in parts
    /// and use direct_io + aio if there is more than min_merge_bytes_to_use_direct_io
    bool read_with_direct_io = false;
    if (data_settings->min_merge_bytes_to_use_direct_io != 0)
    {
        size_t total_size = 0;
        for (const auto & part : parts)
        {
            total_size += part->getBytesOnDisk();
            if (total_size >= data_settings->min_merge_bytes_to_use_direct_io)
            {
                LOG_DEBUG(log, "Will merge parts reading files in O_DIRECT");
                read_with_direct_io = true;
                break;
            }
        }
    }

    MergeStageProgress horizontal_stage_progress(
        column_sizes ? column_sizes->keyColumnsWeight() : 1.0);

    for (const auto & part : parts)
    {
        auto input = std::make_unique<MergeTreeSequentialSource>(
            data, metadata_snapshot, part, merging_column_names, read_with_direct_io, true);

        input->setProgressCallback(
            MergeProgressCallback(merge_entry, watch_prev_elapsed, horizontal_stage_progress));

        Pipe pipe(std::move(input));

        if (metadata_snapshot->hasSortingKey())
        {
            pipe.addSimpleTransform([&metadata_snapshot](const Block & header)
            {
                return std::make_shared<ExpressionTransform>(header, metadata_snapshot->getSortingKey().expression);
            });
        }

        pipes.emplace_back(std::move(pipe));
    }

    Names sort_columns = metadata_snapshot->getSortingKeyColumns();
    SortDescription sort_description;
    size_t sort_columns_size = sort_columns.size();
    sort_description.reserve(sort_columns_size);

    Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;

    Block header = pipes.at(0).getHeader();
    for (size_t i = 0; i < sort_columns_size; ++i)
        sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);

    /// The order of the streams is important: when the key is matched, the elements go in the order of the source stream number.
    /// In the merged part, the lines with the same key must be in the ascending order of the identifier of original part,
    /// that is going in insertion order.
    ProcessorPtr merged_transform;

    /// If merge is vertical we cannot calculate it
    bool blocks_are_granules_size = (merge_alg == MergeAlgorithm::Vertical);

    UInt64 merge_block_size = data_settings->merge_max_block_size;
    switch (data.merging_params.mode)
    {
        case MergeTreeData::MergingParams::Ordinary:
            merged_transform = std::make_unique<MergingSortedTransform>(
                header, pipes.size(), sort_description, merge_block_size, 0, rows_sources_write_buf.get(), true, blocks_are_granules_size);
            break;

        case MergeTreeData::MergingParams::Collapsing:
            merged_transform = std::make_unique<CollapsingSortedTransform>(
                header, pipes.size(), sort_description, data.merging_params.sign_column, false,
                merge_block_size, rows_sources_write_buf.get(), blocks_are_granules_size);
            break;

        case MergeTreeData::MergingParams::Summing:
            merged_transform = std::make_unique<SummingSortedTransform>(
                header, pipes.size(), sort_description, data.merging_params.columns_to_sum, partition_key_columns, merge_block_size);
            break;

        case MergeTreeData::MergingParams::Aggregating:
            merged_transform = std::make_unique<AggregatingSortedTransform>(
                header, pipes.size(), sort_description, merge_block_size);
            break;

        case MergeTreeData::MergingParams::Replacing:
            merged_transform = std::make_unique<ReplacingSortedTransform>(
                header, pipes.size(), sort_description, data.merging_params.version_column,
                merge_block_size, rows_sources_write_buf.get(), blocks_are_granules_size);
            break;

        case MergeTreeData::MergingParams::Graphite:
            merged_transform = std::make_unique<GraphiteRollupSortedTransform>(
                header, pipes.size(), sort_description, merge_block_size,
                data.merging_params.graphite_params, time_of_merge);
            break;

        case MergeTreeData::MergingParams::VersionedCollapsing:
            merged_transform = std::make_unique<VersionedCollapsingTransform>(
                header, pipes.size(), sort_description, data.merging_params.sign_column,
                merge_block_size, rows_sources_write_buf.get(), blocks_are_granules_size);
            break;
    }

    QueryPipeline pipeline;
    pipeline.init(Pipe::unitePipes(std::move(pipes)));
    pipeline.addTransform(std::move(merged_transform));
    pipeline.setMaxThreads(1);
    BlockInputStreamPtr merged_stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));

    if (deduplicate)
        merged_stream = std::make_shared<DistinctSortedBlockInputStream>(merged_stream, sort_description, SizeLimits(), 0 /*limit_hint*/, Names());

    if (need_remove_expired_values)
        merged_stream = std::make_shared<TTLBlockInputStream>(merged_stream, data, metadata_snapshot, new_data_part, time_of_merge, force_ttl);

    if (metadata_snapshot->hasSecondaryIndices())
    {
        const auto & indices = metadata_snapshot->getSecondaryIndices();
        merged_stream = std::make_shared<ExpressionBlockInputStream>(merged_stream, indices.getSingleExpressionForIndices(metadata_snapshot->getColumns(), data.global_context));
        merged_stream = std::make_shared<MaterializingBlockInputStream>(merged_stream);
    }
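
    /// At this point merged_stream is a chain of wrappers over the merging pipeline:
    /// merge -> (optional) DISTINCT -> (optional) TTL -> (optional) index expressions.
    /// Each wrapper consumes the previous stream block by block, so the whole chain
    /// stays single-pass and bounded in memory.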

    const auto & index_factory = MergeTreeIndexFactory::instance();
    MergedBlockOutputStream to{
        new_data_part,
        metadata_snapshot,
        merging_columns,
        index_factory.getMany(metadata_snapshot->getSecondaryIndices()),
        compression_codec,
        merged_column_to_size,
        data_settings->min_merge_bytes_to_use_direct_io,
        blocks_are_granules_size};

    merged_stream->readPrefix();
    to.writePrefix();

    size_t rows_written = 0;
    const size_t initial_reservation = space_reservation ? space_reservation->getSize() : 0;

    auto is_cancelled = [&]() { return merges_blocker.isCancelled()
        || (need_remove_expired_values && ttl_merges_blocker.isCancelled()); };

    Block block;
    while (!is_cancelled() && (block = merged_stream->read()))
    {
        rows_written += block.rows();

        to.write(block);

        merge_entry->rows_written = merged_stream->getProfileInfo().rows;
        merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes;

        /// Reservation updates are not performed yet; during the merge this may lead to higher free space requirements
        if (space_reservation && sum_input_rows_upper_bound)
        {
            /// The same progress from merge_entry could be used for both algorithms (it should be more accurate)
            /// But now we are using inaccurate row-based estimation in Horizontal case for backward compatibility
            Float64 progress = (merge_alg == MergeAlgorithm::Horizontal)
                ? std::min(1., 1. * rows_written / sum_input_rows_upper_bound)
                : std::min(1., merge_entry->progress.load(std::memory_order_relaxed));

            space_reservation->update(static_cast<size_t>((1. - progress) * initial_reservation));
        }
    }
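
    /// Shrinking the reservation as the merge progresses releases space for other
    /// operations: e.g. with an initial reservation of 10 GiB, at 40% progress about
    /// (1 - 0.4) * 10 GiB = 6 GiB stays reserved for the remainder of the write.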

    merged_stream->readSuffix();
    merged_stream.reset();

    if (merges_blocker.isCancelled())
        throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);

    if (need_remove_expired_values && ttl_merges_blocker.isCancelled())
        throw Exception("Cancelled merging parts with expired TTL", ErrorCodes::ABORTED);

    bool need_sync = needSyncPart(sum_input_rows_upper_bound, sum_compressed_bytes_upper_bound, *data_settings);

    MergeTreeData::DataPart::Checksums checksums_gathered_columns;

    /// Gather ordinary columns
    if (merge_alg == MergeAlgorithm::Vertical)
    {
        size_t sum_input_rows_exact = merge_entry->rows_read;
        merge_entry->columns_written = merging_column_names.size();
        merge_entry->progress.store(column_sizes->keyColumnsWeight(), std::memory_order_relaxed);

        BlockInputStreams column_part_streams(parts.size());

        auto it_name_and_type = gathering_columns.cbegin();

        rows_sources_write_buf->next();
        rows_sources_uncompressed_write_buf->next();
        /// Ensure data has been written to disk.
        rows_sources_uncompressed_write_buf->finalize();

        size_t rows_sources_count = rows_sources_write_buf->count();
        /// In a special case, when there is only one source part, and no rows were skipped, we may have
        /// skipped writing the rows_sources file. Otherwise rows_sources_count must be equal to the total
        /// number of input rows.
        if ((rows_sources_count > 0 || parts.size() > 1) && sum_input_rows_exact != rows_sources_count)
            throw Exception("Number of rows in source parts (" + toString(sum_input_rows_exact)
                + ") differs from number of bytes written to rows_sources file (" + toString(rows_sources_count)
                + "). It is a bug.", ErrorCodes::LOGICAL_ERROR);

        CompressedReadBufferFromFile rows_sources_read_buf(tmp_disk->readFile(rows_sources_file_path));
        IMergedBlockOutputStream::WrittenOffsetColumns written_offset_columns;

        for (size_t column_num = 0, gathering_column_names_size = gathering_column_names.size();
            column_num < gathering_column_names_size;
            ++column_num, ++it_name_and_type)
        {
            const String & column_name = it_name_and_type->name;
            Names column_names{column_name};
            Float64 progress_before = merge_entry->progress.load(std::memory_order_relaxed);

            MergeStageProgress column_progress(progress_before, column_sizes->columnWeight(column_name));
            for (size_t part_num = 0; part_num < parts.size(); ++part_num)
            {
                auto column_part_source = std::make_shared<MergeTreeSequentialSource>(
                    data, metadata_snapshot, parts[part_num], column_names, read_with_direct_io, true);

                column_part_source->setProgressCallback(
                    MergeProgressCallback(merge_entry, watch_prev_elapsed, column_progress));

                QueryPipeline column_part_pipeline;
                column_part_pipeline.init(Pipe(std::move(column_part_source)));
                column_part_pipeline.setMaxThreads(1);

                column_part_streams[part_num] =
                    std::make_shared<PipelineExecutingBlockInputStream>(std::move(column_part_pipeline));
            }

            rows_sources_read_buf.seek(0, 0);
            ColumnGathererStream column_gathered_stream(column_name, column_part_streams, rows_sources_read_buf);

            MergedColumnOnlyOutputStream column_to(
                new_data_part,
                metadata_snapshot,
                column_gathered_stream.getHeader(),
                compression_codec,
                /// we don't need to recalc indices here,
                /// because all of them were already recalculated and written
                /// during the key part of the vertical merge
                std::vector<MergeTreeIndexPtr>{},
                &written_offset_columns,
                to.getIndexGranularity());

            size_t column_elems_written = 0;

            column_to.writePrefix();
            while (!merges_blocker.isCancelled() && (block = column_gathered_stream.read()))
            {
                column_elems_written += block.rows();
                column_to.write(block);
            }

            if (merges_blocker.isCancelled())
                throw Exception("Cancelled merging parts", ErrorCodes::ABORTED);

            column_gathered_stream.readSuffix();
            auto changed_checksums = column_to.writeSuffixAndGetChecksums(new_data_part, checksums_gathered_columns, need_sync);
            checksums_gathered_columns.add(std::move(changed_checksums));

            if (rows_written != column_elems_written)
            {
                throw Exception("Written " + toString(column_elems_written) + " elements of column " + column_name +
                    ", but " + toString(rows_written) + " rows of PK columns", ErrorCodes::LOGICAL_ERROR);
            }

            /// NOTE: 'progress' is modified by single thread, but it may be concurrently read from MergeListElement::getInfo() (StorageSystemMerges).
            merge_entry->columns_written += 1;
            merge_entry->bytes_written_uncompressed += column_gathered_stream.getProfileInfo().bytes;
            merge_entry->progress.store(progress_before + column_sizes->columnWeight(column_name), std::memory_order_relaxed);
        }

        tmp_disk->remove(rows_sources_file_path);
    }

    for (const auto & part : parts)
        new_data_part->minmax_idx.merge(part->minmax_idx);

    /// Print overall profiling info. NOTE: it may duplicate previous messages.
    {
        double elapsed_seconds = merge_entry->watch.elapsedSeconds();
        LOG_DEBUG(log,
            "Merge sorted {} rows, containing {} columns ({} merged, {} gathered) in {} sec., {} rows/sec., {}/sec.",
            merge_entry->rows_read,
            all_column_names.size(),
            merging_column_names.size(),
            gathering_column_names.size(),
            elapsed_seconds,
            merge_entry->rows_read / elapsed_seconds,
            ReadableSize(merge_entry->bytes_read_uncompressed / elapsed_seconds));
    }
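
    /// For a horizontal merge every column went through 'to', so finalizing that stream is enough.
    /// For a vertical merge the gathered columns were written by separate streams, so the full
    /// column list and their checksums have to be passed to finalization explicitly.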
    if (merge_alg != MergeAlgorithm::Vertical)
        to.writeSuffixAndFinalizePart(new_data_part, need_sync);
    else
        to.writeSuffixAndFinalizePart(new_data_part, need_sync, &storage_columns, &checksums_gathered_columns);

    return new_data_part;
}

MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTemporaryPart(
    const FutureMergedMutatedPart & future_part,
    const StorageMetadataPtr & metadata_snapshot,
    const MutationCommands & commands,
    MergeListEntry & merge_entry,
    time_t time_of_mutation,
    const Context & context,
    const ReservationPtr & space_reservation,
    TableLockHolder &)
{
    checkOperationIsNotCanceled(merge_entry);

    if (future_part.parts.size() != 1)
        throw Exception("Trying to mutate " + toString(future_part.parts.size()) + " parts, not one. "
            "This is a bug.", ErrorCodes::LOGICAL_ERROR);

    CurrentMetrics::Increment num_mutations{CurrentMetrics::PartMutation};
    const auto & source_part = future_part.parts[0];
    auto storage_from_source_part = StorageFromMergeTreeDataPart::create(source_part);

    auto context_for_reading = context;
    /// Read the source part in a single stream.
    context_for_reading.setSetting("max_streams_to_max_threads_ratio", 1);
    context_for_reading.setSetting("max_threads", 1);

    /// Allow mutations to work when force_index_by_date or force_primary_key is on.
    context_for_reading.setSetting("force_index_by_date", Field(0));
    context_for_reading.setSetting("force_primary_key", Field(0));
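
    /// Keep only the commands relevant to this part: a command with an explicit
    /// IN PARTITION clause applies only if the part belongs to that partition.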
    MutationCommands commands_for_part;
    for (const auto & command : commands)
    {
        if (command.partition == nullptr || future_part.parts[0]->info.partition_id == data.getPartitionIDFromQuery(
                command.partition, context_for_reading))
            commands_for_part.emplace_back(command);
    }

    /// If the mutation doesn't change anything in this part, just clone it
    /// under the new mutation version instead of rewriting it.
    if (source_part->isStoredOnDisk() && !isStorageTouchedByMutations(
        storage_from_source_part, metadata_snapshot, commands_for_part, context_for_reading))
    {
        LOG_TRACE(log, "Part {} doesn't change up to mutation version {}", source_part->name, future_part.part_info.mutation);
        return data.cloneAndLoadDataPartOnSameDisk(source_part, "tmp_clone_", future_part.part_info, metadata_snapshot);
    }
    else
    {
        LOG_TRACE(log, "Mutating part {} to mutation version {}", source_part->name, future_part.part_info.mutation);
    }

    BlockInputStreamPtr in = nullptr;
    Block updated_header;
    std::unique_ptr<MutationsInterpreter> interpreter;

    const auto data_settings = data.getSettings();
    MutationCommands for_interpreter;
    MutationCommands for_file_renames;
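
    /// Split the commands in two: those that must be executed by the interpreter
    /// (DELETE, UPDATE, MATERIALIZE ...) and those that only rename or remove files
    /// on disk (e.g. DROP COLUMN / DROP INDEX on wide parts).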
    splitMutationCommands(source_part, commands_for_part, for_interpreter, for_file_renames);

    UInt64 watch_prev_elapsed = 0;
    MergeStageProgress stage_progress(1.0);

    NamesAndTypesList storage_columns = metadata_snapshot->getColumns().getAllPhysical();

    if (!for_interpreter.empty())
    {
        interpreter = std::make_unique<MutationsInterpreter>(
            storage_from_source_part, metadata_snapshot, for_interpreter, context_for_reading, true);
        in = interpreter->execute();
        updated_header = interpreter->getUpdatedHeader();
        in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress));
    }
2019-11-25 20:19:43 +00:00
2020-08-21 15:44:29 +00:00
auto single_disk_volume = std : : make_shared < SingleDiskVolume > ( " volume_ " + future_part . name , space_reservation - > getDisk ( ) ) ;
2019-11-25 20:19:43 +00:00
auto new_data_part = data . createPart (
2020-05-09 21:24:15 +00:00
future_part . name , future_part . type , future_part . part_info , single_disk_volume , " tmp_mut_ " + future_part . name ) ;
2020-01-15 13:00:08 +00:00
2018-05-13 00:24:52 +00:00
new_data_part - > is_temp = true ;
2019-04-15 09:30:45 +00:00
new_data_part - > ttl_infos = source_part - > ttl_infos ;
2019-11-25 20:19:43 +00:00
2020-02-04 12:11:32 +00:00
/// It shouldn't be changed by mutation.
2019-08-19 10:37:04 +00:00
new_data_part - > index_granularity_info = source_part - > index_granularity_info ;
2020-05-19 09:54:56 +00:00
new_data_part - > setColumns ( getColumnsForNewDataPart ( source_part , updated_header , storage_columns , for_file_renames ) ) ;
2020-03-18 13:16:59 +00:00
new_data_part - > partition . assign ( source_part - > partition ) ;
2018-05-13 00:24:52 +00:00
2020-05-09 21:24:15 +00:00
auto disk = new_data_part - > volume - > getDisk ( ) ;
2020-03-19 16:37:55 +00:00
String new_part_tmp_path = new_data_part - > getFullRelativePath ( ) ;
2018-05-13 00:24:52 +00:00
2020-03-19 16:37:55 +00:00
disk - > createDirectories ( new_part_tmp_path ) ;
2019-08-05 18:06:05 +00:00
2020-06-26 21:55:48 +00:00
std : : optional < FileSyncGuard > sync_guard ;
2020-07-02 23:41:37 +00:00
if ( data . getSettings ( ) - > fsync_part_directory )
2020-06-26 21:55:48 +00:00
sync_guard . emplace ( disk , new_part_tmp_path ) ;

    /// Don't change granularity type while mutating a subset of columns.
    auto mrk_extension = source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension(new_data_part->getType())
                                                                         : getNonAdaptiveMrkExtension();

    bool need_sync = needSyncPart(source_part->rows_count, source_part->getBytesOnDisk(), *data_settings);
    bool need_remove_expired_values = false;

    if (in && shouldExecuteTTL(metadata_snapshot, in->getHeader().getNamesAndTypesList().getNames(), commands_for_part))
        need_remove_expired_values = true;

    /// Either the part is not wide (all columns are read anyway), or the mutation affects
    /// all columns of the part, possibly adding some that were missing in it before.
    if (!isWidePart(source_part) || (interpreter && interpreter->isAffectingAllColumns()))
    {
        /// Note: this is done before creating input streams, because otherwise
        /// data.data_parts_mutex (which is locked in data.getTotalActiveSizeInBytes()) and
        /// the lock that is taken in shared mode when input streams are created would be
        /// acquired in the reverse order compared to inserting new data. This annoys TSan
        /// even though deadlock is impossible, since one of the locks is only taken in
        /// shared mode.
        auto compression_codec = data.getCompressionCodecForPart(source_part->getBytesOnDisk(), source_part->ttl_infos, time_of_mutation);

        auto part_indices = getIndicesForNewDataPart(metadata_snapshot->getSecondaryIndices(), for_file_renames);
        mutateAllPartColumns(
            new_data_part,
            metadata_snapshot,
            part_indices,
            in,
            time_of_mutation,
            compression_codec,
            merge_entry,
            need_remove_expired_values,
            need_sync);

        /// No finalization is required here, because mutateAllPartColumns uses
        /// MergedBlockOutputStream, which finalizes all part fields itself.
    }
    else /// TODO: check that we modify only non-key columns in this case.
    {
        /// We will modify only some of the columns. Other columns and key values can be copied as-is.
        auto indices_to_recalc = getIndicesToRecalculate(in, updated_header.getNamesAndTypesList(), metadata_snapshot, context);

        NameSet files_to_skip = collectFilesToSkip(source_part, updated_header, indices_to_recalc, mrk_extension);
        NameToNameVector files_to_rename = collectFilesForRenames(source_part, for_file_renames, mrk_extension);

        if (need_remove_expired_values)
            files_to_skip.insert("ttl.txt");

        /// Create hardlinks for unchanged files.
        for (auto it = disk->iterateDirectory(source_part->getFullRelativePath()); it->isValid(); it->next())
        {
            if (files_to_skip.count(it->name()))
                continue;

            String destination = new_part_tmp_path + "/";
            String file_name = it->name();
            auto rename_it = std::find_if(files_to_rename.begin(), files_to_rename.end(),
                [&file_name](const auto & rename_pair) { return rename_pair.first == file_name; });
            if (rename_it != files_to_rename.end())
            {
                /// An empty rename target means the file is deleted in the new part.
                if (rename_it->second.empty())
                    continue;
                destination += rename_it->second;
            }
            else
            {
                destination += it->name();
            }

            disk->createHardLink(it->path(), destination);
        }
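
        /// Only the mutated files are rewritten below; everything hardlinked above is
        /// shared byte-for-byte with the source part, which is what makes mutating a
        /// few columns of a wide part cheap.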

        merge_entry->columns_written = storage_columns.size() - updated_header.columns();

        new_data_part->checksums = source_part->checksums;

        auto compression_codec = source_part->default_codec;

        if (in)
        {
            mutateSomePartColumns(
                source_part,
                metadata_snapshot,
                indices_to_recalc,
                updated_header,
                new_data_part,
                in,
                time_of_mutation,
                compression_codec,
                merge_entry,
                need_remove_expired_values,
                need_sync);
        }

        /// Apply the renames to the inherited checksums: deleted files are dropped,
        /// renamed files keep their checksum under the new name.
        for (const auto & [rename_from, rename_to] : files_to_rename)
        {
            if (rename_to.empty() && new_data_part->checksums.files.count(rename_from))
            {
                new_data_part->checksums.files.erase(rename_from);
            }
            else if (new_data_part->checksums.files.count(rename_from))
            {
                new_data_part->checksums.files[rename_to] = new_data_part->checksums.files[rename_from];
                new_data_part->checksums.files.erase(rename_from);
            }
        }

        finalizeMutatedPart(source_part, new_data_part, need_remove_expired_values, compression_codec);
    }

    return new_data_part;
}
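
/// Vertical merge is chosen only when it is both supported (no deduplication, no TTL
/// expiration, a merging mode that doesn't have to read the non-key columns during the
/// merge itself) and worthwhile: enough rows and enough non-key columns to gather separately.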
MergeAlgorithm MergeTreeDataMergerMutator::chooseMergeAlgorithm(
    const MergeTreeData::DataPartsVector & parts, size_t sum_rows_upper_bound,
    const NamesAndTypesList & gathering_columns, bool deduplicate, bool need_remove_expired_values) const
{
    const auto data_settings = data.getSettings();

    if (deduplicate)
        return MergeAlgorithm::Horizontal;
    if (data_settings->enable_vertical_merge_algorithm == 0)
        return MergeAlgorithm::Horizontal;
    if (need_remove_expired_values)
        return MergeAlgorithm::Horizontal;

    for (const auto & part : parts)
        if (!part->supportsVerticalMerge())
            return MergeAlgorithm::Horizontal;

    bool is_supported_storage =
        data.merging_params.mode == MergeTreeData::MergingParams::Ordinary ||
        data.merging_params.mode == MergeTreeData::MergingParams::Collapsing ||
        data.merging_params.mode == MergeTreeData::MergingParams::Replacing ||
        data.merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing;

    bool enough_ordinary_cols = gathering_columns.size() >= data_settings->vertical_merge_algorithm_min_columns_to_activate;

    bool enough_total_rows = sum_rows_upper_bound >= data_settings->vertical_merge_algorithm_min_rows_to_activate;

    bool no_parts_overflow = parts.size() <= RowSourcePart::MAX_PARTS;

    auto merge_alg = (is_supported_storage && enough_total_rows && enough_ordinary_cols && no_parts_overflow) ?
        MergeAlgorithm::Vertical : MergeAlgorithm::Horizontal;

    return merge_alg;
}

MergeTreeData::DataPartPtr MergeTreeDataMergerMutator::renameMergedTemporaryPart(
    MergeTreeData::MutableDataPartPtr & new_data_part,
    const MergeTreeData::DataPartsVector & parts,
    MergeTreeData::Transaction * out_transaction)
{
    /// Rename new part, add to the set and remove original parts.
    auto replaced_parts = data.renameTempPartAndReplace(new_data_part, nullptr, out_transaction);

    /// Let's check that all original parts have been deleted and only them.
    if (replaced_parts.size() != parts.size())
    {
        /** This is normal, although it happens rarely.
         *
         * A situation when 0 parts are replaced instead of N can happen, for example, in the following case:
         * - we had part A, but parts B and C did not exist yet;
         * - the merge A, B -> AB was in the queue, but it was not done, because part B was missing;
         * - the merge AB, C -> ABC was in the queue, but it was not done, because parts AB and C were missing;
         * - we completed the task of downloading part B;
         * - we started the merge A, B -> AB, since all source parts appeared;
         * - we decided to download part ABC from another replica, since it was impossible to do the merge AB, C -> ABC;
         * - part ABC appeared. When it was added, the old parts A, B, C were deleted;
         * - the merge AB finished. Part AB was added. But it is an obsolete part. The log will contain
         *   the message `Obsolete part added`, then we get here.
         *
         * When can M > N parts be replaced?
         * - a new block was added in ReplicatedMergeTreeBlockOutputStream;
         * - it was added to the working dataset in memory and renamed on the filesystem;
         * - but the ZooKeeper transaction that adds it to the reference dataset in ZK failed;
         * - and it failed due to connection loss, so we don't roll back the working dataset in memory,
         *   because we don't know if the part was added to ZK or not
         *   (see ReplicatedMergeTreeBlockOutputStream);
         * - then the method selectPartsToMerge selects a range and sees that the EphemeralLock for the block
         *   in this part is unlocked, so it is possible to merge a range skipping this part.
         *   (NOTE: Merging with a part that is not in ZK is not possible, see checks in 'createLogEntryToMergeParts'.)
         * - and after the merge, this part will be removed in addition to the parts that were merged.
         */
        LOG_WARNING(log, "Unexpected number of parts removed when adding {}: {} instead of {}", new_data_part->name, replaced_parts.size(), parts.size());
    }
    else
    {
        for (size_t i = 0; i < parts.size(); ++i)
            if (parts[i]->name != replaced_parts[i]->name)
                throw Exception("Unexpected part removed when adding " + new_data_part->name + ": " + replaced_parts[i]->name
                    + " instead of " + parts[i]->name, ErrorCodes::LOGICAL_ERROR);
    }

    LOG_TRACE(log, "Merged {} parts: from {} to {}", parts.size(), parts.front()->name, parts.back()->name);
    return new_data_part;
}

size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts)
{
    size_t res = 0;
    for (const MergeTreeData::DataPartPtr & part : source_parts)
        res += part->getBytesOnDisk();

    return static_cast<size_t>(res * DISK_USAGE_COEFFICIENT_TO_RESERVE);
}
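
/// For compact parts every surviving column has to go through the interpreter (they are
/// all stored in a single file), so commands are rewritten into READ_COLUMN ones; for wide
/// parts only the commands that actually change data go to the interpreter, and the rest
/// become file renames/removals.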
void MergeTreeDataMergerMutator::splitMutationCommands(
    MergeTreeData::DataPartPtr part,
    const MutationCommands & commands,
    MutationCommands & for_interpreter,
    MutationCommands & for_file_renames)
{
    ColumnsDescription part_columns(part->getColumns());

    if (!isWidePart(part))
    {
        NameSet mutated_columns;
        for (const auto & command : commands)
        {
            if (command.type == MutationCommand::Type::MATERIALIZE_INDEX
                || command.type == MutationCommand::Type::MATERIALIZE_TTL
                || command.type == MutationCommand::Type::DELETE
                || command.type == MutationCommand::Type::UPDATE)
            {
                for_interpreter.push_back(command);
                for (const auto & [column_name, expr] : command.column_to_update_expression)
                    mutated_columns.emplace(column_name);
            }
            else if (command.type == MutationCommand::Type::DROP_INDEX)
            {
                for_file_renames.push_back(command);
            }
            else if (part_columns.has(command.column_name))
            {
                if (command.type == MutationCommand::Type::DROP_COLUMN)
                {
                    mutated_columns.emplace(command.column_name);
                }
                else if (command.type == MutationCommand::Type::RENAME_COLUMN)
                {
                    for_interpreter.push_back(
                    {
                        .type = MutationCommand::Type::READ_COLUMN,
                        .column_name = command.rename_to,
                    });
                    mutated_columns.emplace(command.column_name);
                    part_columns.rename(command.column_name, command.rename_to);
                }
            }
        }

        /// If it's a compact part, we don't need to actually remove files from disk:
        /// we just don't read the dropped columns.
        for (const auto & column : part->getColumns())
        {
            if (!mutated_columns.count(column.name))
                for_interpreter.emplace_back(
                    MutationCommand{.type = MutationCommand::Type::READ_COLUMN, .column_name = column.name, .data_type = column.type});
        }
    }
    else
    {
        for (const auto & command : commands)
        {
            if (command.type == MutationCommand::Type::MATERIALIZE_INDEX
                || command.type == MutationCommand::Type::MATERIALIZE_TTL
                || command.type == MutationCommand::Type::DELETE
                || command.type == MutationCommand::Type::UPDATE)
            {
                for_interpreter.push_back(command);
            }
            else if (command.type == MutationCommand::Type::DROP_INDEX)
            {
                for_file_renames.push_back(command);
            }
            /// If we don't have this column in the source part, then we don't need
            /// to materialize it.
            else if (part_columns.has(command.column_name))
            {
                if (command.type == MutationCommand::Type::READ_COLUMN)
                {
                    for_interpreter.push_back(command);
                }
                else if (command.type == MutationCommand::Type::RENAME_COLUMN)
                {
                    part_columns.rename(command.column_name, command.rename_to);
                    for_file_renames.push_back(command);
                }
                else
                {
                    for_file_renames.push_back(command);
                }
            }
        }
    }
}
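
/// Returns a list of (old file name -> new file name) pairs; an empty new name means the
/// file is removed. Shared substreams (e.g. sizes of Nested arrays) are removed only when
/// the last column that uses them is dropped.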
NameToNameVector MergeTreeDataMergerMutator::collectFilesForRenames(
    MergeTreeData::DataPartPtr source_part, const MutationCommands & commands_for_removes, const String & mrk_extension)
{
    /// Collect counts for shared streams of different columns. As an example, Nested columns have shared stream with array sizes.
    std::map<String, size_t> stream_counts;
    for (const NameAndTypePair & column : source_part->getColumns())
    {
        column.type->enumerateStreams(
            [&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
            {
                ++stream_counts[IDataType::getFileNameForStream(column.name, substream_path)];
            },
            {});
    }

    NameToNameVector rename_vector;

    /// Remove old indices
    for (const auto & command : commands_for_removes)
    {
        if (command.type == MutationCommand::Type::DROP_INDEX)
        {
            rename_vector.emplace_back("skp_idx_" + command.column_name + ".idx", "");
            rename_vector.emplace_back("skp_idx_" + command.column_name + mrk_extension, "");
        }
        else if (command.type == MutationCommand::Type::DROP_COLUMN)
        {
            IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
            {
                String stream_name = IDataType::getFileNameForStream(command.column_name, substream_path);
                /// Delete files if they are no longer shared with another column.
                if (--stream_counts[stream_name] == 0)
                {
                    rename_vector.emplace_back(stream_name + ".bin", "");
                    rename_vector.emplace_back(stream_name + mrk_extension, "");
                }
            };

            IDataType::SubstreamPath stream_path;
            auto column = source_part->getColumns().tryGetByName(command.column_name);
            if (column)
                column->type->enumerateStreams(callback, stream_path);
        }
        else if (command.type == MutationCommand::Type::RENAME_COLUMN)
        {
            String escaped_name_from = escapeForFileName(command.column_name);
            String escaped_name_to = escapeForFileName(command.rename_to);

            IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
            {
                String stream_from = IDataType::getFileNameForStream(command.column_name, substream_path);
                String stream_to = boost::replace_first_copy(stream_from, escaped_name_from, escaped_name_to);

                if (stream_from != stream_to)
                {
                    rename_vector.emplace_back(stream_from + ".bin", stream_to + ".bin");
                    rename_vector.emplace_back(stream_from + mrk_extension, stream_to + mrk_extension);
                }
            };

            IDataType::SubstreamPath stream_path;
            auto column = source_part->getColumns().tryGetByName(command.column_name);
            if (column)
                column->type->enumerateStreams(callback, stream_path);
        }
    }

    return rename_vector;
}

NameSet MergeTreeDataMergerMutator::collectFilesToSkip(
    const MergeTreeDataPartPtr & source_part,
    const Block & updated_header,
    const std::set<MergeTreeIndexPtr> & indices_to_recalc,
    const String & mrk_extension)
{
    NameSet files_to_skip = source_part->getFileNamesWithoutChecksums();

    /// Skip updated files
    for (const auto & entry : updated_header)
    {
        IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
        {
            String stream_name = IDataType::getFileNameForStream(entry.name, substream_path);
            files_to_skip.insert(stream_name + ".bin");
            files_to_skip.insert(stream_name + mrk_extension);
        };

        IDataType::SubstreamPath stream_path;
        entry.type->enumerateStreams(callback, stream_path);
    }
    for (const auto & index : indices_to_recalc)
    {
        files_to_skip.insert(index->getFileName() + ".idx");
        files_to_skip.insert(index->getFileName() + mrk_extension);
    }

    return files_to_skip;
}

NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart(
    MergeTreeData::DataPartPtr source_part,
    const Block & updated_header,
    NamesAndTypesList storage_columns,
    const MutationCommands & commands_for_removes)
{
    /// In compact parts we read all columns, because they are all stored in a single file.
    if (!isWidePart(source_part))
        return updated_header.getNamesAndTypesList();

    NameSet removed_columns;
    NameToNameMap renamed_columns_to_from;
    /// All commands are validated in AlterCommand so we don't care about order
    for (const auto & command : commands_for_removes)
    {
        if (command.type == MutationCommand::DROP_COLUMN)
            removed_columns.insert(command.column_name);
        if (command.type == MutationCommand::RENAME_COLUMN)
            renamed_columns_to_from.emplace(command.rename_to, command.column_name);
    }

    Names source_column_names = source_part->getColumns().getNames();
    NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end());
    for (auto it = storage_columns.begin(); it != storage_columns.end();)
    {
        if (updated_header.has(it->name))
        {
            auto updated_type = updated_header.getByName(it->name).type;
            if (updated_type != it->type)
                it->type = updated_type;
            ++it;
        }
        else
        {
            if (!source_columns_name_set.count(it->name))
            {
                /// The source part doesn't have this column, but some other column
                /// may have been renamed to its name.
                auto renamed_it = renamed_columns_to_from.find(it->name);
                if (renamed_it != renamed_columns_to_from.end()
                    && source_columns_name_set.count(renamed_it->second))
                    ++it;
                else
                    it = storage_columns.erase(it);
            }
            else
            {
                bool was_renamed = false;
                bool was_removed = removed_columns.count(it->name);

                /// Check whether this column was renamed to some other name.
                for (const auto & [rename_to, rename_from] : renamed_columns_to_from)
                {
                    if (rename_from == it->name)
                    {
                        was_renamed = true;
                        break;
                    }
                }

                /// If some other column is being renamed to this column's name, then the
                /// column that currently has this name must itself be renamed away or dropped.
                if (renamed_columns_to_from.count(it->name) && !was_renamed && !was_removed)
                    throw Exception(
                        ErrorCodes::LOGICAL_ERROR,
                        "Incorrect mutation commands, trying to rename column {} to {}, but part {} already has column {}",
                        renamed_columns_to_from[it->name], it->name, source_part->name, it->name);

                /// The column was renamed away (and no other column was renamed to its name),
                /// or the column was dropped.
                if (!renamed_columns_to_from.count(it->name) && (was_renamed || was_removed))
                    it = storage_columns.erase(it);
                else
                    ++it;
            }
        }
    }

    return storage_columns;
}

MergeTreeIndices MergeTreeDataMergerMutator::getIndicesForNewDataPart(
    const IndicesDescription & all_indices,
    const MutationCommands & commands_for_removes)
{
    NameSet removed_indices;
    for (const auto & command : commands_for_removes)
        if (command.type == MutationCommand::DROP_INDEX)
            removed_indices.insert(command.column_name);

    MergeTreeIndices new_indices;
    for (const auto & index : all_indices)
        if (!removed_indices.count(index.name))
            new_indices.push_back(MergeTreeIndexFactory::instance().get(index));

    return new_indices;
}
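
/// Secondary indices whose expressions mention any of the updated columns must be
/// rebuilt; the mutation input stream is extended so that it also computes those
/// index expressions.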
std::set<MergeTreeIndexPtr> MergeTreeDataMergerMutator::getIndicesToRecalculate(
    BlockInputStreamPtr & input_stream,
    const NamesAndTypesList & updated_columns,
    const StorageMetadataPtr & metadata_snapshot,
    const Context & context)
{
    /// Check whether columns used in skipping indexes were modified.
    const auto & index_factory = MergeTreeIndexFactory::instance();
    std::set<MergeTreeIndexPtr> indices_to_recalc;
    ASTPtr indices_recalc_expr_list = std::make_shared<ASTExpressionList>();
    for (const auto & col : updated_columns.getNames())
    {
        const auto & indices = metadata_snapshot->getSecondaryIndices();
        for (size_t i = 0; i < indices.size(); ++i)
        {
            const auto & index = indices[i];
            const auto & index_cols = index.expression->getRequiredColumns();
            auto it = std::find(std::cbegin(index_cols), std::cend(index_cols), col);

            if (it != std::cend(index_cols)
                && indices_to_recalc.insert(index_factory.get(index)).second)
            {
                ASTPtr expr_list = index.expression_list_ast->clone();
                for (const auto & expr : expr_list->children)
                    indices_recalc_expr_list->children.push_back(expr->clone());
            }
        }
    }

    if (!indices_to_recalc.empty() && input_stream)
    {
        auto indices_recalc_syntax = TreeRewriter(context).analyze(indices_recalc_expr_list, input_stream->getHeader().getNamesAndTypesList());
        auto indices_recalc_expr = ExpressionAnalyzer(
            indices_recalc_expr_list,
            indices_recalc_syntax, context).getActions(false);

        /// We can update only one column, but some skip idx expression may depend on several
        /// columns (c1 + c2 * c3). It works because this stream was created with help of
        /// MutationsInterpreter which knows about skip indices and stream 'in' already has
        /// all required columns.
        /// TODO move this logic to single place.
        input_stream = std::make_shared<MaterializingBlockInputStream>(
            std::make_shared<ExpressionBlockInputStream>(input_stream, indices_recalc_expr));
    }
    return indices_to_recalc;
}

bool MergeTreeDataMergerMutator::shouldExecuteTTL(const StorageMetadataPtr & metadata_snapshot, const Names & columns, const MutationCommands & commands)
{
    if (!metadata_snapshot->hasAnyTTL())
        return false;

    for (const auto & command : commands)
        if (command.type == MutationCommand::MATERIALIZE_TTL)
            return true;

    auto dependencies = metadata_snapshot->getColumnDependencies(NameSet(columns.begin(), columns.end()));
    for (const auto & dependency : dependencies)
        if (dependency.kind == ColumnDependency::TTL_EXPRESSION || dependency.kind == ColumnDependency::TTL_TARGET)
            return true;

    return false;
}
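
/// Rewrites the whole part: the mutation stream is materialized, primary key and skip
/// index expressions are recalculated, TTL is applied if needed, and a fresh part is
/// written with MergedBlockOutputStream (which also finalizes the part's metadata).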
void MergeTreeDataMergerMutator::mutateAllPartColumns(
    MergeTreeData::MutableDataPartPtr new_data_part,
    const StorageMetadataPtr & metadata_snapshot,
    const MergeTreeIndices & skip_indices,
    BlockInputStreamPtr mutating_stream,
    time_t time_of_mutation,
    const CompressionCodecPtr & compression_codec,
    MergeListEntry & merge_entry,
    bool need_remove_expired_values,
    bool need_sync) const
{
    if (mutating_stream == nullptr)
        throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR);

    if (metadata_snapshot->hasPrimaryKey() || metadata_snapshot->hasSecondaryIndices())
        mutating_stream = std::make_shared<MaterializingBlockInputStream>(
            std::make_shared<ExpressionBlockInputStream>(mutating_stream, data.getPrimaryKeyAndSkipIndicesExpression(metadata_snapshot)));

    if (need_remove_expired_values)
        mutating_stream = std::make_shared<TTLBlockInputStream>(mutating_stream, data, metadata_snapshot, new_data_part, time_of_mutation, true);

    IMergeTreeDataPart::MinMaxIndex minmax_idx;

    MergedBlockOutputStream out{
        new_data_part,
        metadata_snapshot,
        new_data_part->getColumns(),
        skip_indices,
        compression_codec};

    mutating_stream->readPrefix();
    out.writePrefix();

    Block block;
    while (checkOperationIsNotCanceled(merge_entry) && (block = mutating_stream->read()))
    {
        minmax_idx.update(block, data.minmax_idx_columns);
        out.write(block);

        merge_entry->rows_written += block.rows();
        merge_entry->bytes_written_uncompressed += block.bytes();
    }

    new_data_part->minmax_idx = std::move(minmax_idx);
    mutating_stream->readSuffix();
    out.writeSuffixAndFinalizePart(new_data_part, need_sync);
}
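
/// Unlike mutateAllPartColumns, this writes only the updated columns and the skip indices
/// that need recalculation; the caller has already hardlinked the untouched files, and the
/// source part's index granularity is reused.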
void MergeTreeDataMergerMutator::mutateSomePartColumns(
    const MergeTreeDataPartPtr & source_part,
    const StorageMetadataPtr & metadata_snapshot,
    const std::set<MergeTreeIndexPtr> & indices_to_recalc,
    const Block & mutation_header,
    MergeTreeData::MutableDataPartPtr new_data_part,
    BlockInputStreamPtr mutating_stream,
    time_t time_of_mutation,
    const CompressionCodecPtr & compression_codec,
    MergeListEntry & merge_entry,
    bool need_remove_expired_values,
    bool need_sync) const
{
    if (mutating_stream == nullptr)
        throw Exception("Cannot mutate part columns with uninitialized mutations stream. It's a bug", ErrorCodes::LOGICAL_ERROR);

    if (need_remove_expired_values)
        mutating_stream = std::make_shared<TTLBlockInputStream>(mutating_stream, data, metadata_snapshot, new_data_part, time_of_mutation, true);

    IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
    MergedColumnOnlyOutputStream out(
        new_data_part,
        metadata_snapshot,
        mutation_header,
        compression_codec,
        std::vector<MergeTreeIndexPtr>(indices_to_recalc.begin(), indices_to_recalc.end()),
        nullptr,
        source_part->index_granularity,
        &source_part->index_granularity_info
    );

    mutating_stream->readPrefix();
    out.writePrefix();

    Block block;
    while (checkOperationIsNotCanceled(merge_entry) && (block = mutating_stream->read()))
    {
        out.write(block);

        merge_entry->rows_written += block.rows();
        merge_entry->bytes_written_uncompressed += block.bytes();
    }

    mutating_stream->readSuffix();

    auto changed_checksums = out.writeSuffixAndGetChecksums(new_data_part, new_data_part->checksums, need_sync);
    new_data_part->checksums.add(std::move(changed_checksums));
}
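
/// Writes the part-level metadata files (ttl.txt, checksums.txt, columns.txt, the default
/// compression codec file) and fills the in-memory fields that mutateSomePartColumns does
/// not produce itself.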
void MergeTreeDataMergerMutator::finalizeMutatedPart(
    const MergeTreeDataPartPtr & source_part,
    MergeTreeData::MutableDataPartPtr new_data_part,
    bool need_remove_expired_values,
    const CompressionCodecPtr & codec)
{
    auto disk = new_data_part->volume->getDisk();

    if (need_remove_expired_values)
    {
        /// Write a file with ttl infos in json format.
        auto out_ttl = disk->writeFile(new_data_part->getFullRelativePath() + "ttl.txt", 4096);
        HashingWriteBuffer out_hashing(*out_ttl);
        new_data_part->ttl_infos.write(out_hashing);
        new_data_part->checksums.files["ttl.txt"].file_size = out_hashing.count();
        new_data_part->checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
    }

    {
        /// Write file with checksums.
        auto out_checksums = disk->writeFile(new_data_part->getFullRelativePath() + "checksums.txt", 4096);
        new_data_part->checksums.write(*out_checksums);
    } /// close fd

    {
        auto out = disk->writeFile(new_data_part->getFullRelativePath() + IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096);
        DB::writeText(queryToString(codec->getFullCodecDesc()), *out);
    }

    {
        /// Write a file with a description of columns.
        auto out_columns = disk->writeFile(new_data_part->getFullRelativePath() + "columns.txt", 4096);
        new_data_part->getColumns().writeText(*out_columns);
    } /// close fd

    new_data_part->rows_count = source_part->rows_count;
    new_data_part->index_granularity = source_part->index_granularity;
    new_data_part->index = source_part->index;
    new_data_part->minmax_idx = source_part->minmax_idx;
    new_data_part->modification_time = time(nullptr);
    new_data_part->setBytesOnDisk(
        MergeTreeData::DataPart::calculateTotalSizeOnDisk(new_data_part->volume->getDisk(), new_data_part->getFullRelativePath()));
    new_data_part->default_codec = codec;
    new_data_part->calculateColumnsSizesOnDisk();
}

bool MergeTreeDataMergerMutator::checkOperationIsNotCanceled(const MergeListEntry & merge_entry) const
{
    if (merges_blocker.isCancelled() || merge_entry->is_cancelled)
        throw Exception("Cancelled mutating parts", ErrorCodes::ABORTED);

    return true;
}

}