From 36383d83f1958e9099f437535daa8c2dcb586782 Mon Sep 17 00:00:00 2001 From: Vxider Date: Tue, 31 Mar 2020 14:28:04 +0800 Subject: [PATCH] use max_fired_watermark to cleanup cache in proc time --- .../Storages/WindowView/StorageWindowView.cpp | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/dbms/src/Storages/WindowView/StorageWindowView.cpp b/dbms/src/Storages/WindowView/StorageWindowView.cpp index 4ab2b4f582b..3cb82871a3e 100644 --- a/dbms/src/Storages/WindowView/StorageWindowView.cpp +++ b/dbms/src/Storages/WindowView/StorageWindowView.cpp @@ -341,31 +341,26 @@ Pipes StorageWindowView::blocksToPipes(BlocksList & blocks, Block & sample_block inline void StorageWindowView::cleanCache() { UInt32 w_bound; - if (is_proctime) - { - w_bound = getWindowUpperBound(std::time(nullptr)); - } - else { std::lock_guard lock(fire_signal_mutex); - if (max_watermark == 0) - return; w_bound = max_fired_watermark; if (w_bound == 0) return; - if (allowed_lateness) + if (!is_proctime) { - UInt32 lateness_bound = addTime(max_timestamp, lateness_kind, -1 * lateness_num_units, time_zone); - lateness_bound = getWindowLowerBound(lateness_bound); - if (lateness_bound < w_bound) - w_bound = lateness_bound; + if (max_watermark == 0) + return; + if (allowed_lateness) + { + UInt32 lateness_bound = addTime(max_timestamp, lateness_kind, -1 * lateness_num_units, time_zone); + lateness_bound = getWindowLowerBound(lateness_bound); + if (lateness_bound < w_bound) + w_bound = lateness_bound; + } } } - w_bound = is_tumble ? addTime(w_bound, window_kind, -1 * window_num_units, time_zone) - : addTime(w_bound, hop_kind, -1 * hop_num_units, time_zone); - auto sql = generateCleanCacheQuery(w_bound); InterpreterAlterQuery alt_query(sql, global_context); alt_query.execute(); @@ -694,6 +689,7 @@ void StorageWindowView::threadFuncFireProc() while (next_fire_signal <= timestamp_now) { fire(next_fire_signal); + max_fired_watermark = next_fire_signal; next_fire_signal = addTime(next_fire_signal, window_kind, window_num_units, time_zone); }