Merge pull request #56162 from ClickHouse/part-check-scheduling-monotonic-time

Use monotonic clock for part check scheduling
This commit is contained in:
Igor Nikonov 2023-11-01 23:15:27 +01:00 committed by GitHub
commit 823b62a55c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 12 additions and 11 deletions

View File

@ -64,7 +64,7 @@ void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t
return; return;
LOG_TRACE(log, "Enqueueing {} for check after after {}s", name, delay_to_check_seconds); LOG_TRACE(log, "Enqueueing {} for check after after {}s", name, delay_to_check_seconds);
parts_queue.emplace_back(name, time(nullptr) + delay_to_check_seconds); parts_queue.emplace_back(name, std::chrono::steady_clock::now() + std::chrono::seconds(delay_to_check_seconds));
parts_set.insert(name); parts_set.insert(name);
task->schedule(); task->schedule();
} }
@ -293,10 +293,10 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St
time_t lifetime = time(nullptr) - outdated->remove_time; time_t lifetime = time(nullptr) - outdated->remove_time;
time_t max_lifetime = storage.getSettings()->old_parts_lifetime.totalSeconds(); time_t max_lifetime = storage.getSettings()->old_parts_lifetime.totalSeconds();
time_t delay = lifetime >= max_lifetime ? 0 : max_lifetime - lifetime; time_t delay = lifetime >= max_lifetime ? 0 : max_lifetime - lifetime;
result.recheck_after = delay + 30; result.recheck_after_seconds = delay + 30;
auto message = PreformattedMessage::create("Part {} is Outdated, will wait for cleanup thread to handle it " auto message = PreformattedMessage::create("Part {} is Outdated, will wait for cleanup thread to handle it "
"and check again after {}s", part_name, result.recheck_after); "and check again after {}s", part_name, result.recheck_after_seconds);
LOG_WARNING(log, message); LOG_WARNING(log, message);
result.status = {part_name, true, message.text}; result.status = {part_name, true, message.text};
result.action = ReplicatedCheckResult::RecheckLater; result.action = ReplicatedCheckResult::RecheckLater;
@ -411,7 +411,7 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St
auto message = PreformattedMessage::create("Young part {} with age {} seconds hasn't been added to ZooKeeper yet. It's ok.", auto message = PreformattedMessage::create("Young part {} with age {} seconds hasn't been added to ZooKeeper yet. It's ok.",
part_name, (current_time - part->modification_time)); part_name, (current_time - part->modification_time));
LOG_INFO(log, message); LOG_INFO(log, message);
result.recheck_after = part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER - current_time; result.recheck_after_seconds = part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER - current_time;
result.status = {part_name, true, message}; result.status = {part_name, true, message};
result.action = ReplicatedCheckResult::RecheckLater; result.action = ReplicatedCheckResult::RecheckLater;
return result; return result;
@ -436,9 +436,9 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPartAndFix(const String & p
case ReplicatedCheckResult::RecheckLater: case ReplicatedCheckResult::RecheckLater:
/// NOTE We cannot enqueue it from the check thread itself /// NOTE We cannot enqueue it from the check thread itself
if (recheck_after) if (recheck_after)
*recheck_after = result.recheck_after; *recheck_after = result.recheck_after_seconds;
else else
enqueuePart(part_name, result.recheck_after); enqueuePart(part_name, result.recheck_after_seconds);
break; break;
case ReplicatedCheckResult::DetachUnexpected: case ReplicatedCheckResult::DetachUnexpected:
@ -539,7 +539,7 @@ void ReplicatedMergeTreePartCheckThread::run()
try try
{ {
time_t current_time = time(nullptr); const auto current_time = std::chrono::steady_clock::now();
/// Take part from the queue for verification. /// Take part from the queue for verification.
PartsToCheckQueue::iterator selected = parts_queue.end(); /// end from std::list is not get invalidated PartsToCheckQueue::iterator selected = parts_queue.end(); /// end from std::list is not get invalidated
@ -567,7 +567,7 @@ void ReplicatedMergeTreePartCheckThread::run()
if (next_it != parts_queue.end()) if (next_it != parts_queue.end())
{ {
auto delay = next_it->time - current_time; auto delay = next_it->time - current_time;
task->scheduleAfter(delay * 1000); task->scheduleAfter(duration_cast<std::chrono::milliseconds>(delay).count());
} }
return; return;
} }
@ -593,7 +593,7 @@ void ReplicatedMergeTreePartCheckThread::run()
else if (recheck_after.has_value()) else if (recheck_after.has_value())
{ {
LOG_TRACE(log, "Will recheck part {} after after {}s", selected->name, *recheck_after); LOG_TRACE(log, "Will recheck part {} after after {}s", selected->name, *recheck_after);
selected->time = time(nullptr) + *recheck_after; selected->time = std::chrono::steady_clock::now() + std::chrono::seconds(*recheck_after);
} }
else else
{ {

View File

@ -37,7 +37,7 @@ struct ReplicatedCheckResult
bool exists_in_zookeeper; bool exists_in_zookeeper;
MergeTreeDataPartPtr part; MergeTreeDataPartPtr part;
time_t recheck_after = 0; time_t recheck_after_seconds = 0;
}; };
/** Checks the integrity of the parts requested for validation. /** Checks the integrity of the parts requested for validation.
@ -92,8 +92,9 @@ private:
using StringSet = std::set<String>; using StringSet = std::set<String>;
struct PartToCheck struct PartToCheck
{ {
using TimePoint = std::chrono::steady_clock::time_point;
String name; String name;
time_t time; TimePoint time;
}; };
using PartsToCheckQueue = std::list<PartToCheck>; using PartsToCheckQueue = std::list<PartToCheck>;