alesapin 2020-09-04 16:55:07 +03:00
parent f9dd4cc98d
commit ea7168580b
4 changed files with 14 additions and 1 deletion

@@ -179,7 +179,7 @@ inline MergeListEntry::~MergeListEntry()
 {
     std::lock_guard lock{list.mutex};
-    if (it->merge_type == MergeType::TTL_DELETE)
+    if (isTTLMergeType(it->merge_type))
         --list.merges_with_ttl_counter;
     list.merges.erase(it);
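
The destructor now decrements merges_with_ttl_counter for any TTL-driven merge, not only TTL_DELETE ones. A minimal sketch of what the isTTLMergeType helper amounts to, assuming the MergeType enum has the TTL_RECOMPRESS member added in the hunks below; the project's actual definition may differ:

// Sketch only: treat both TTL merge kinds as "TTL merges" so that
// merges_with_ttl_counter stays balanced for recompression merges too.
inline bool isTTLMergeType(MergeType merge_type)
{
    return merge_type == MergeType::TTL_DELETE
        || merge_type == MergeType::TTL_RECOMPRESS;
}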

@@ -54,6 +54,7 @@ struct Settings;
     M(UInt64, min_replicated_logs_to_keep, 10, "Keep about this number of last records in ZooKeeper log, even if they are obsolete. It doesn't affect work of tables: used only to diagnose ZooKeeper log before cleaning.", 0) \
     M(Seconds, prefer_fetch_merged_part_time_threshold, 3600, "If time passed after replication log entry creation exceeds this threshold and sum size of parts is greater than \"prefer_fetch_merged_part_size_threshold\", prefer fetching merged part from replica instead of doing merge locally. To speed up very long merges.", 0) \
     M(UInt64, prefer_fetch_merged_part_size_threshold, 10ULL * 1024 * 1024 * 1024, "If sum size of parts exceeds this threshold and time passed after replication log entry creation is greater than \"prefer_fetch_merged_part_time_threshold\", prefer fetching merged part from replica instead of doing merge locally. To speed up very long merges.", 0) \
+    M(Seconds, try_fetch_recompressed_part_timeout, 7200, "Recompression works slow in most cases, so we don't start merge with recompression until this timeout and trying to fetch recompressed part from replica which assigned this merge with recompression.", 0) \
     M(Bool, always_fetch_merged_part, 0, "If true, replica never merge parts and always download merged parts from other replicas.", 0) \
     M(UInt64, max_suspicious_broken_parts, 10, "Max broken parts, if more - deny automatic deletion.", 0) \
     M(UInt64, max_files_to_modify_in_alter_columns, 75, "Not apply ALTER if number of files for modification(deletion, addition) more than this.", 0) \
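
The new try_fetch_recompressed_part_timeout setting defaults to 7200 seconds, i.e. two hours. For readers unfamiliar with this declaration style, here is a rough, hypothetical illustration of the X-macro pattern that the M(type, name, default, description, flags) list relies on; the project's real settings machinery additionally generates descriptions, serialization and change tracking, not just plain members:

#include <cstdint>
using UInt64 = uint64_t;

// Simplified, illustrative macro list (two entries only).
#define SKETCH_SETTINGS(M) \
    M(UInt64, try_fetch_recompressed_part_timeout, 7200) \
    M(UInt64, max_suspicious_broken_parts, 10)

struct SettingsSketch
{
// Expand each list entry into a member with its default value.
#define DECLARE_MEMBER(TYPE, NAME, DEFAULT) TYPE NAME = DEFAULT;
    SKETCH_SETTINGS(DECLARE_MEMBER)
#undef DECLARE_MEMBER
};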

@@ -15,6 +15,8 @@ MergeType checkAndGetMergeType(UInt64 merge_type)
         return MergeType::REGULAR;
     else if (merge_type == static_cast<UInt64>(MergeType::TTL_DELETE))
         return MergeType::TTL_DELETE;
+    else if (merge_type == static_cast<UInt64>(MergeType::TTL_RECOMPRESS))
+        return MergeType::TTL_RECOMPRESS;
     throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unknown MergeType {}", static_cast<UInt64>(merge_type));
 }
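
With the extra branch, checkAndGetMergeType now accepts the numeric value of TTL_RECOMPRESS as well, while still throwing for anything it does not recognize. A small hypothetical usage sketch (the caller and values are illustrative, not taken from the codebase):

// Hypothetical caller: a merge type travels through the replication log as a
// number; checkAndGetMergeType validates it and throws NOT_IMPLEMENTED for
// values this replica does not know about (e.g. written by a newer server).
UInt64 raw = static_cast<UInt64>(MergeType::TTL_RECOMPRESS);   // e.g. parsed from a log entry
MergeType merge_type = checkAndGetMergeType(raw);              // returns TTL_RECOMPRESS here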

@@ -1308,6 +1308,16 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
         return false;
     }
+    if (entry.merge_type == MergeType::TTL_RECOMPRESS &&
+        (time(nullptr) - entry.create_time) <= storage_settings_ptr->try_fetch_recompressed_part_timeout.totalSeconds() &&
+        entry.source_replica != replica_name)
+    {
+        LOG_INFO(log, "Will try to fetch part {} until '{}' because this part assigned to recompression merge. "
+            "Source replica {} will try to merge this part first", entry.new_part_name,
+            LocalDateTime(entry.create_time + storage_settings_ptr->try_fetch_recompressed_part_timeout.totalSeconds()), entry.source_replica);
+        return false;
+    }
     DataPartsVector parts;
     bool have_all_parts = true;
     for (const String & name : entry.source_parts)
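
Taken together, the new branch in tryExecuteMerge makes replicas other than the entry's source replica postpone a recompression merge while the log entry is younger than try_fetch_recompressed_part_timeout, expecting the source replica to produce the recompressed part first so that it can simply be fetched. A standalone sketch of the same decision, assuming the surrounding project's types (MergeType, Int64, String) and using hypothetical parameter names, just to isolate the three-part condition:

// Sketch only, not the actual ClickHouse code path: mirrors the check above.
// While all three conditions hold, the replica declines the local merge and
// keeps trying to fetch; with the default timeout of 7200 s that is a
// two-hour window after the log entry was created.
static bool shouldPreferFetchOfRecompressedPart(
    MergeType merge_type,
    time_t entry_create_time,
    time_t now,
    Int64 fetch_timeout_seconds,
    const String & source_replica,
    const String & this_replica)
{
    return merge_type == MergeType::TTL_RECOMPRESS
        && (now - entry_create_time) <= fetch_timeout_seconds
        && source_replica != this_replica;
}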