From f8a7bdb8f0e3971991185b7d2bf06d480ddefc61 Mon Sep 17 00:00:00 2001 From: Igor Nikonov Date: Tue, 31 Oct 2023 11:21:20 +0000 Subject: [PATCH] Use monotonic time for part checks scheduling --- .../ReplicatedMergeTreePartCheckThread.cpp | 18 +++++++++--------- .../ReplicatedMergeTreePartCheckThread.h | 5 +++-- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp index 24857bdb17e..b1875464725 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.cpp @@ -64,7 +64,7 @@ void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t return; LOG_TRACE(log, "Enqueueing {} for check after after {}s", name, delay_to_check_seconds); - parts_queue.emplace_back(name, time(nullptr) + delay_to_check_seconds); + parts_queue.emplace_back(name, std::chrono::steady_clock::now() + std::chrono::seconds(delay_to_check_seconds)); parts_set.insert(name); task->schedule(); } @@ -293,10 +293,10 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St time_t lifetime = time(nullptr) - outdated->remove_time; time_t max_lifetime = storage.getSettings()->old_parts_lifetime.totalSeconds(); time_t delay = lifetime >= max_lifetime ? 0 : max_lifetime - lifetime; - result.recheck_after = delay + 30; + result.recheck_after_seconds = delay + 30; auto message = PreformattedMessage::create("Part {} is Outdated, will wait for cleanup thread to handle it " - "and check again after {}s", part_name, result.recheck_after); + "and check again after {}s", part_name, result.recheck_after_seconds); LOG_WARNING(log, message); result.status = {part_name, true, message.text}; result.action = ReplicatedCheckResult::RecheckLater; @@ -411,7 +411,7 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St auto message = PreformattedMessage::create("Young part {} with age {} seconds hasn't been added to ZooKeeper yet. It's ok.", part_name, (current_time - part->modification_time)); LOG_INFO(log, message); - result.recheck_after = part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER - current_time; + result.recheck_after_seconds = part->modification_time + MAX_AGE_OF_LOCAL_PART_THAT_WASNT_ADDED_TO_ZOOKEEPER - current_time; result.status = {part_name, true, message}; result.action = ReplicatedCheckResult::RecheckLater; return result; @@ -436,9 +436,9 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPartAndFix(const String & p case ReplicatedCheckResult::RecheckLater: /// NOTE We cannot enqueue it from the check thread itself if (recheck_after) - *recheck_after = result.recheck_after; + *recheck_after = result.recheck_after_seconds; else - enqueuePart(part_name, result.recheck_after); + enqueuePart(part_name, result.recheck_after_seconds); break; case ReplicatedCheckResult::DetachUnexpected: @@ -539,7 +539,7 @@ void ReplicatedMergeTreePartCheckThread::run() try { - time_t current_time = time(nullptr); + const auto current_time = std::chrono::steady_clock::now(); /// Take part from the queue for verification. PartsToCheckQueue::iterator selected = parts_queue.end(); /// end from std::list is not get invalidated @@ -567,7 +567,7 @@ void ReplicatedMergeTreePartCheckThread::run() if (next_it != parts_queue.end()) { auto delay = next_it->time - current_time; - task->scheduleAfter(delay * 1000); + task->scheduleAfter(duration_cast(delay).count()); } return; } @@ -593,7 +593,7 @@ void ReplicatedMergeTreePartCheckThread::run() else if (recheck_after.has_value()) { LOG_TRACE(log, "Will recheck part {} after after {}s", selected->name, *recheck_after); - selected->time = time(nullptr) + *recheck_after; + selected->time = std::chrono::steady_clock::now() + std::chrono::seconds(*recheck_after); } else { diff --git a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h index eb9d6144593..68dc6ca3d1d 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h +++ b/src/Storages/MergeTree/ReplicatedMergeTreePartCheckThread.h @@ -37,7 +37,7 @@ struct ReplicatedCheckResult bool exists_in_zookeeper; MergeTreeDataPartPtr part; - time_t recheck_after = 0; + time_t recheck_after_seconds = 0; }; /** Checks the integrity of the parts requested for validation. @@ -92,8 +92,9 @@ private: using StringSet = std::set; struct PartToCheck { + using TimePoint = std::chrono::steady_clock::time_point; String name; - time_t time; + TimePoint time; }; using PartsToCheckQueue = std::list;