mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-17 20:02:05 +00:00
better logging, fix flacky test
This commit is contained in:
parent
46db8a2d51
commit
cf8202d36e
@ -965,13 +965,16 @@ void ReplicatedMergeTreeQueue::checkThereAreNoConflictsInRange(const MergeTreePa
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & new_part_name, String & out_reason, std::lock_guard<std::mutex> & /* queue_lock */) const
|
bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & log_entry_name, const String & new_part_name,
|
||||||
|
String & out_reason, std::lock_guard<std::mutex> & /* queue_lock */) const
|
||||||
{
|
{
|
||||||
/// Let's check if the same part is now being created by another action.
|
/// Let's check if the same part is now being created by another action.
|
||||||
if (future_parts.count(new_part_name))
|
if (future_parts.count(new_part_name))
|
||||||
{
|
{
|
||||||
out_reason = "Not executing log entry for part " + new_part_name
|
const char * format_str = "Not executing log entry {} for part {} "
|
||||||
+ " because another log entry for the same part is being processed. This shouldn't happen often.";
|
"because another log entry for the same part is being processed. This shouldn't happen often.";
|
||||||
|
LOG_INFO(log, format_str, log_entry_name, new_part_name);
|
||||||
|
out_reason = fmt::format(format_str, log_entry_name, new_part_name);
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
/** When the corresponding action is completed, then `isNotCoveredByFuturePart` next time, will succeed,
|
/** When the corresponding action is completed, then `isNotCoveredByFuturePart` next time, will succeed,
|
||||||
@ -992,8 +995,10 @@ bool ReplicatedMergeTreeQueue::isNotCoveredByFuturePartsImpl(const String & new_
|
|||||||
|
|
||||||
if (future_part.contains(result_part))
|
if (future_part.contains(result_part))
|
||||||
{
|
{
|
||||||
out_reason = "Not executing log entry for part " + new_part_name + " because it is covered by part "
|
const char * format_str = "Not executing log entry {} for part {} "
|
||||||
+ future_part_elem.first + " that is currently executing";
|
"because it is covered by part {} that is currently executing.";
|
||||||
|
LOG_TRACE(log, format_str, log_entry_name, new_part_name, future_part_elem.first);
|
||||||
|
out_reason = fmt::format(format_str, log_entry_name, new_part_name, future_part_elem.first);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1005,7 +1010,7 @@ bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & pa
|
|||||||
{
|
{
|
||||||
std::lock_guard lock(state_mutex);
|
std::lock_guard lock(state_mutex);
|
||||||
|
|
||||||
if (isNotCoveredByFuturePartsImpl(part_name, reject_reason, lock))
|
if (isNotCoveredByFuturePartsImpl(entry.znode_name, part_name, reject_reason, lock))
|
||||||
{
|
{
|
||||||
CurrentlyExecuting::setActualPartName(entry, part_name, *this);
|
CurrentlyExecuting::setActualPartName(entry, part_name, *this);
|
||||||
return true;
|
return true;
|
||||||
@ -1030,14 +1035,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
|
|||||||
{
|
{
|
||||||
for (const String & new_part_name : entry.getBlockingPartNames())
|
for (const String & new_part_name : entry.getBlockingPartNames())
|
||||||
{
|
{
|
||||||
if (!isNotCoveredByFuturePartsImpl(new_part_name, out_postpone_reason, state_lock))
|
if (!isNotCoveredByFuturePartsImpl(entry.znode_name, new_part_name, out_postpone_reason, state_lock))
|
||||||
{
|
|
||||||
if (!out_postpone_reason.empty())
|
|
||||||
LOG_DEBUG(log, out_postpone_reason);
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::MUTATE_PART)
|
if (entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::MUTATE_PART)
|
||||||
{
|
{
|
||||||
@ -1051,10 +1052,11 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
|
|||||||
{
|
{
|
||||||
if (future_parts.count(name))
|
if (future_parts.count(name))
|
||||||
{
|
{
|
||||||
String reason = "Not merging into part " + entry.new_part_name
|
const char * format_str = "Not executing log entry {} of type {} for part {} "
|
||||||
+ " because part " + name + " is not ready yet (log entry for that part is being processed).";
|
"because part {} is not ready yet (log entry for that part is being processed).";
|
||||||
LOG_TRACE(log, reason);
|
LOG_TRACE(log, format_str, entry.znode_name, entry.typeToString(), entry.new_part_name, name);
|
||||||
out_postpone_reason = reason;
|
/// Copy-paste of above because we need structured logging (instead of already formatted message).
|
||||||
|
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.typeToString(), entry.new_part_name, name);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1070,9 +1072,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
|
|||||||
|
|
||||||
if (merger_mutator.merges_blocker.isCancelled())
|
if (merger_mutator.merges_blocker.isCancelled())
|
||||||
{
|
{
|
||||||
String reason = "Not executing log entry for part " + entry.new_part_name + " because merges and mutations are cancelled now.";
|
const char * format_str = "Not executing log entry {} of type {} for part {} because merges and mutations are cancelled now.";
|
||||||
LOG_DEBUG(log, reason);
|
LOG_DEBUG(log, format_str, entry.znode_name, entry.typeToString(), entry.new_part_name);
|
||||||
out_postpone_reason = reason;
|
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.typeToString(), entry.new_part_name);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1094,17 +1096,19 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
|
|||||||
{
|
{
|
||||||
if (merger_mutator.ttl_merges_blocker.isCancelled())
|
if (merger_mutator.ttl_merges_blocker.isCancelled())
|
||||||
{
|
{
|
||||||
String reason = "Not executing log entry for part " + entry.new_part_name + " because merges with TTL are cancelled now.";
|
const char * format_str = "Not executing log entry {} for part {} because merges with TTL are cancelled now.";
|
||||||
LOG_DEBUG(log, reason);
|
LOG_DEBUG(log, format_str,
|
||||||
out_postpone_reason = reason;
|
entry.znode_name, entry.new_part_name);
|
||||||
|
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.new_part_name);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
size_t total_merges_with_ttl = data.getTotalMergesWithTTLInMergeList();
|
size_t total_merges_with_ttl = data.getTotalMergesWithTTLInMergeList();
|
||||||
if (total_merges_with_ttl >= data_settings->max_number_of_merges_with_ttl_in_pool)
|
if (total_merges_with_ttl >= data_settings->max_number_of_merges_with_ttl_in_pool)
|
||||||
{
|
{
|
||||||
const char * format_str = "Not executing log entry for part {}"
|
const char * format_str = "Not executing log entry {} for part {}"
|
||||||
" because {} merges with TTL already executing, maximum {}.";
|
" because {} merges with TTL already executing, maximum {}.";
|
||||||
LOG_DEBUG(log, format_str, entry.new_part_name, total_merges_with_ttl,
|
LOG_DEBUG(log, format_str, entry.znode_name,
|
||||||
|
entry.new_part_name, total_merges_with_ttl,
|
||||||
data_settings->max_number_of_merges_with_ttl_in_pool);
|
data_settings->max_number_of_merges_with_ttl_in_pool);
|
||||||
|
|
||||||
out_postpone_reason = fmt::format(format_str, entry.new_part_name, total_merges_with_ttl,
|
out_postpone_reason = fmt::format(format_str, entry.new_part_name, total_merges_with_ttl,
|
||||||
@ -1116,15 +1120,14 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
|
|||||||
|
|
||||||
if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size)
|
if (!ignore_max_size && sum_parts_size_in_bytes > max_source_parts_size)
|
||||||
{
|
{
|
||||||
const char * format_str = "Not executing log entry {} for part {}"
|
const char * format_str = "Not executing log entry {} of type {} for part {}"
|
||||||
" because source parts size ({}) is greater than the current maximum ({}).";
|
" because source parts size ({}) is greater than the current maximum ({}).";
|
||||||
|
|
||||||
LOG_DEBUG(log, format_str,
|
LOG_DEBUG(log, format_str, entry.znode_name,
|
||||||
entry.typeToString(), entry.new_part_name,
|
entry.typeToString(), entry.new_part_name,
|
||||||
ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size));
|
ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size));
|
||||||
|
|
||||||
/// Copy-paste of above because we need structured logging (instead of already formatted message).
|
out_postpone_reason = fmt::format(format_str, entry.znode_name,
|
||||||
out_postpone_reason = fmt::format(format_str,
|
|
||||||
entry.typeToString(), entry.new_part_name,
|
entry.typeToString(), entry.new_part_name,
|
||||||
ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size));
|
ReadableSize(sum_parts_size_in_bytes), ReadableSize(max_source_parts_size));
|
||||||
|
|
||||||
@ -1139,9 +1142,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
|
|||||||
if (!alter_sequence.canExecuteMetaAlter(entry.alter_version, state_lock))
|
if (!alter_sequence.canExecuteMetaAlter(entry.alter_version, state_lock))
|
||||||
{
|
{
|
||||||
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
|
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
|
||||||
out_postpone_reason = "Cannot execute alter metadata with version: " + std::to_string(entry.alter_version)
|
const char * format_str = "Cannot execute alter metadata {} with version {} because another alter {} must be executed before";
|
||||||
+ " because another alter " + std::to_string(head_alter)
|
LOG_TRACE(log, format_str, entry.znode_name, entry.alter_version, head_alter);
|
||||||
+ " must be executed before";
|
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.alter_version);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1153,11 +1156,17 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
|
|||||||
{
|
{
|
||||||
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
|
int head_alter = alter_sequence.getHeadAlterVersion(state_lock);
|
||||||
if (head_alter == entry.alter_version)
|
if (head_alter == entry.alter_version)
|
||||||
out_postpone_reason = "Cannot execute alter data with version: "
|
{
|
||||||
+ std::to_string(entry.alter_version) + " because metadata still not altered";
|
const char * format_str = "Cannot execute alter data {} with version {} because metadata still not altered";
|
||||||
|
LOG_TRACE(log, format_str, entry.znode_name, entry.alter_version);
|
||||||
|
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.alter_version);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version)
|
{
|
||||||
+ " because another alter " + std::to_string(head_alter) + " must be executed before";
|
const char * format_str = "Cannot execute alter data {} with version {} because another alter {} must be executed before";
|
||||||
|
LOG_TRACE(log, format_str, entry.znode_name, entry.alter_version, head_alter);
|
||||||
|
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.alter_version, head_alter);
|
||||||
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -1170,8 +1179,15 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
|
|||||||
/// Deadlock is possible if multiple DROP/REPLACE RANGE entries are executing in parallel and wait each other.
|
/// Deadlock is possible if multiple DROP/REPLACE RANGE entries are executing in parallel and wait each other.
|
||||||
/// See also removePartProducingOpsInRange(...) and ReplicatedMergeTreeQueue::CurrentlyExecuting.
|
/// See also removePartProducingOpsInRange(...) and ReplicatedMergeTreeQueue::CurrentlyExecuting.
|
||||||
if (currently_executing_drop_or_replace_range)
|
if (currently_executing_drop_or_replace_range)
|
||||||
|
{
|
||||||
|
|
||||||
|
const char * format_str = "Not executing log entry {} of type {} for part {} "
|
||||||
|
"because another DROP_RANGE or REPLACE_RANGE entry are currently executing.";
|
||||||
|
LOG_TRACE(log, format_str, entry.znode_name, entry.typeToString(), entry.new_part_name);
|
||||||
|
out_postpone_reason = fmt::format(format_str, entry.znode_name, entry.typeToString(), entry.new_part_name);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -205,6 +205,7 @@ private:
|
|||||||
* Should be called under state_mutex.
|
* Should be called under state_mutex.
|
||||||
*/
|
*/
|
||||||
bool isNotCoveredByFuturePartsImpl(
|
bool isNotCoveredByFuturePartsImpl(
|
||||||
|
const String & log_entry_name,
|
||||||
const String & new_part_name, String & out_reason,
|
const String & new_part_name, String & out_reason,
|
||||||
std::lock_guard<std::mutex> & state_lock) const;
|
std::lock_guard<std::mutex> & state_lock) const;
|
||||||
|
|
||||||
|
@ -97,4 +97,7 @@ timeout $TIMEOUT bash -c thread6 2>&1 | grep "was not completely removed from Zo
|
|||||||
|
|
||||||
wait
|
wait
|
||||||
|
|
||||||
for i in {0..9}; do $CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS alter_table_$i"; done
|
for i in {0..9}; do
|
||||||
|
$CLICKHOUSE_CLIENT -q "DROP TABLE IF EXISTS alter_table_$i" &
|
||||||
|
done
|
||||||
|
wait
|
||||||
|
Loading…
Reference in New Issue
Block a user