From cf8202d36ebff92e2104859664a24d7547d334d2 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Fri, 30 Oct 2020 15:41:39 +0300 Subject: [PATCH] better logging, fix flaky test --- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 84 +++++++++++-------- .../MergeTree/ReplicatedMergeTreeQueue.h | 1 + ...tem_parts_race_condition_drop_zookeeper.sh | 5 +- 3 files changed, 55 insertions(+), 35 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 880ad4dd0d3..bf9a94747b6 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -965,13 +965,16 @@ void ReplicatedMergeTreeQueue::checkThereAreNoConflictsInRange(const MergeTreePa } -bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & new_part_name, String & out_reason, std::lock_guard & /* queue_lock */) const +bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & log_entry_name, const String & new_part_name, + String & out_reason, std::lock_guard & /* queue_lock */) const { /// Let's check if the same part is now being created by another action. if (future_parts.count(new_part_name)) { - out_reason = "Not executing log entry for part " + new_part_name - + " because another log entry for the same part is being processed. This shouldn't happen often."; + const char * format_str = "Not executing log entry {} for part {} " + "because another log entry for the same part is being processed. 
This shouldn't happen often."; + LOG_INFO(log, format_str, log_entry_name, new_part_name); + out_reason = fmt::format(format_str, log_entry_name, new_part_name); return false; /** When the corresponding action is completed, then `isNotCoveredByFuturePart` next time, will succeed, @@ -992,8 +995,10 @@ bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & new_ if (future_part.contains(result_part)) { - out_reason = "Not executing log entry for part " + new_part_name + " because it is covered by part " - + future_part_elem.first + " that is currently executing"; + const char * format_str = "Not executing log entry {} for part {} " + "because it is covered by part {} that is currently executing."; + LOG_TRACE(log, format_str, log_entry_name, new_part_name, future_part_elem.first); + out_reason = fmt::format(format_str, log_entry_name, new_part_name, future_part_elem.first); return false; } } @@ -1005,7 +1010,7 @@ bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & pa { std::lock_guard lock(state_mutex); - if (isNotCoveredByFuturePartsImpl(part_name, reject_reason, lock)) + if (isNotCoveredByFuturePartsImpl(entry.znode_name, part_name, reject_reason, lock)) { CurrentlyExecuting::setActualPartName(entry, part_name, *this); return true; @@ -1030,12 +1035,8 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( { for (const String & new_part_name : entry.getBlockingPartNames()) { - if (!isNotCoveredByFuturePartsImpl(new_part_name, out_postpone_reason, state_lock)) - { - if (!out_postpone_reason.empty()) - LOG_DEBUG(log, out_postpone_reason); + if (!isNotCoveredByFuturePartsImpl(entry.znode_name, new_part_name, out_postpone_reason, state_lock)) return false; - } } } @@ -1051,10 +1052,11 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( { if (future_parts.count(name)) { - String reason = "Not merging into part " + entry.new_part_name - + " because part " + name + " is not ready yet (log entry for that part is being 
processed)."; - LOG_TRACE(log, reason); - out_postpone_reason = reason; + const char * format_str = "Not executing log entry {} of type {} for part {} " + "because part {} is not ready yet (log entry for that part is being processed)."; + LOG_TRACE(log, format_str, entry.znode_name, entry.typeToString(), entry.new_part_name, name); + /// Copy-paste of above because we need structured logging (instead of already formatted message). + out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.typeToString(), entry.new_part_name, name); return false; } @@ -1070,9 +1072,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( if (merger_mutator.merges_blocker.isCancelled()) { - String reason = "Not executing log entry for part " + entry.new_part_name + " because merges and mutations are cancelled now."; - LOG_DEBUG(log, reason); - out_postpone_reason = reason; + const char * format_str = "Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now."; + LOG_DEBUG(log, format_str, entry.znode_name, entry.typeToString(), entry.new_part_name); + out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.typeToString(), entry.new_part_name); return false; } @@ -1094,17 +1096,19 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( { if (merger_mutator.ttl_merges_blocker.isCancelled()) { - String reason = "Not executing log entry for part " + entry.new_part_name + " because merges with TTL are cancelled now."; - LOG_DEBUG(log, reason); - out_postpone_reason = reason; + const char * format_str = "Not executing log entry {} for part {} because merges with TTL are cancelled now."; + LOG_DEBUG(log, format_str, + entry.znode_name, entry.new_part_name); + out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.new_part_name); return false; } size_t total_merges_with_ttl = data.getTotalMergesWithTTLInMergeList(); if (total_merges_with_ttl >= data_settings->max_number_of_merges_with_ttl_in_pool) { - const 
char * format_str = "Not executing log entry for part {}" + const char * format_str = "Not executing log entry {} for part {}" " because {} merges with TTL already executing, maximum {}."; - LOG_DEBUG(log, format_str, entry.new_part_name, total_merges_with_ttl, + LOG_DEBUG(log, format_str, entry.znode_name, + entry.new_part_name, total_merges_with_ttl, data_settings->max_number_of_merges_with_ttl_in_pool); - out_postpone_reason = fmt::format(format_str, entry.new_part_name, total_merges_with_ttl, + out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.new_part_name, total_merges_with_ttl, @@ -1116,15 +1120,14 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( { if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size) { - const char * format_str = "Not executing log entry {} for part {}" + const char * format_str = "Not executing log entry {} of type {} for part {}" " because source parts size ({}) is greater than the current maximum ({})."; - LOG_DEBUG(log, format_str, + LOG_DEBUG(log, format_str, entry.znode_name, entry.typeToString(), entry.new_part_name, ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size)); - /// Copy-paste of above because we need structured logging (instead of already formatted message). 
- out_postpone_reason = fmt::format(format_str, + out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.typeToString(), entry.new_part_name, ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size)); @@ -1139,9 +1142,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( if (!alter_sequence.canExecuteMetaAlter(entry.alter_version, state_lock)) { int head_alter = alter_sequence.getHeadAlterVersion(state_lock); - out_postpone_reason = "Cannot execute alter metadata with version: " + std::to_string(entry.alter_version) - + " because another alter " + std::to_string(head_alter) - + " must be executed before"; + const char * format_str = "Cannot execute alter metadata {} with version {} because another alter {} must be executed before"; + LOG_TRACE(log, format_str, entry.znode_name, entry.alter_version, head_alter); + out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.alter_version, head_alter); return false; } } @@ -1153,11 +1156,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( { int head_alter = alter_sequence.getHeadAlterVersion(state_lock); if (head_alter == entry.alter_version) - out_postpone_reason = "Cannot execute alter data with version: " - + std::to_string(entry.alter_version) + " because metadata still not altered"; + { + const char * format_str = "Cannot execute alter data {} with version {} because metadata still not altered"; + LOG_TRACE(log, format_str, entry.znode_name, entry.alter_version); + out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.alter_version); + } else - out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version) - + " because another alter " + std::to_string(head_alter) + " must be executed before"; + { + const char * format_str = "Cannot execute alter data {} with version {} because another alter {} must be executed before"; + LOG_TRACE(log, format_str, entry.znode_name, entry.alter_version, head_alter); + 
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.alter_version, head_alter); + } return false; } @@ -1170,7 +1179,14 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( /// Deadlock is possible if multiple DROP/REPLACE RANGE entries are executing in parallel and wait each other. /// See also removePartProducingOpsInRange(...) and ReplicatedMergeTreeQueue::CurrentlyExecuting. if (currently_executing_drop_or_replace_range) + { + + const char * format_str = "Not executing log entry {} of type {} for part {} " + "because another DROP_RANGE or REPLACE_RANGE entry are currently executing."; + LOG_TRACE(log, format_str, entry.znode_name, entry.typeToString(), entry.new_part_name); + out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.typeToString(), entry.new_part_name); return false; + } } return true; diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index 93b79c8336c..8036e66b86b 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -205,6 +205,7 @@ private: * Should be called under state_mutex. 
*/ bool isNotCoveredByFuturePartsImpl( + const String & log_entry_name, const String & new_part_name, String & out_reason, std::lock_guard & state_lock) const; diff --git a/tests/queries/0_stateless/00993_system_parts_race_condition_drop_zookeeper.sh b/tests/queries/0_stateless/00993_system_parts_race_condition_drop_zookeeper.sh index d86631513a1..d4344e6e8bd 100755 --- a/tests/queries/0_stateless/00993_system_parts_race_condition_drop_zookeeper.sh +++ b/tests/queries/0_stateless/00993_system_parts_race_condition_drop_zookeeper.sh @@ -97,4 +97,7 @@ timeout $TIMEOUT bash -c thread6 2>&1 | grep "was not completely removed from Zo wait -for i in {0..9}; do $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS alter_table_$i"; done +for i in {0..9}; do + $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS alter_table_$i" & +done +wait