Additionally check remote_fs_execute_merges_on_single_replica_time_threshold inside ReplicatedMergeTreeQueue.

This commit is contained in:
Nikolai Kochetov 2022-01-31 17:53:28 +00:00
parent 1c9f026178
commit a207cdf28f

View File

@ -1196,15 +1196,30 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
return false;
}
if (merge_strategy_picker.shouldMergeOnSingleReplica(entry))
bool should_execute_on_single_replica = merge_strategy_picker.shouldMergeOnSingleReplica(entry);
if (!should_execute_on_single_replica)
{
/// Separate check. If we use only s3, check remote_fs_execute_merges_on_single_replica_time_threshold as well.
auto disks = storage.getDisks();
bool only_s3_storage = true;
for (const auto & disk : disks)
if (disk->getType() != DB::DiskType::S3)
only_s3_storage = false;
if (!disks.empty() && only_s3_storage)
should_execute_on_single_replica = merge_strategy_picker.shouldMergeOnSingleReplicaShared(entry);
}
if (should_execute_on_single_replica)
{
auto replica_to_execute_merge = merge_strategy_picker.pickReplicaToExecuteMerge(entry);
if (replica_to_execute_merge && !merge_strategy_picker.isMergeFinishedByReplica(replica_to_execute_merge.value(), entry))
{
String reason = "Not executing merge for the part " + entry.new_part_name
+ ", waiting for " + replica_to_execute_merge.value() + " to execute merge.";
out_postpone_reason = reason;
const char * format_str = "Not executing merge for the part {}, waiting for {} to execute merge.";
LOG_DEBUG(log, format_str, entry.new_part_name, replica_to_execute_merge.value());
out_postpone_reason = out_postpone_reason = fmt::format(format_str, format_str, entry.new_part_name, replica_to_execute_merge.value());
return false;
}
}