diff --git a/dbms/src/Storages/WindowView/StorageWindowView.cpp b/dbms/src/Storages/WindowView/StorageWindowView.cpp index 4ab2b4f582b..3cb82871a3e 100644 --- a/dbms/src/Storages/WindowView/StorageWindowView.cpp +++ b/dbms/src/Storages/WindowView/StorageWindowView.cpp @@ -341,31 +341,26 @@ Pipes StorageWindowView::blocksToPipes(BlocksList & blocks, Block & sample_block inline void StorageWindowView::cleanCache() { UInt32 w_bound; - if (is_proctime) - { - w_bound = getWindowUpperBound(std::time(nullptr)); - } - else { std::lock_guard lock(fire_signal_mutex); - if (max_watermark == 0) - return; w_bound = max_fired_watermark; if (w_bound == 0) return; - if (allowed_lateness) + if (!is_proctime) { - UInt32 lateness_bound = addTime(max_timestamp, lateness_kind, -1 * lateness_num_units, time_zone); - lateness_bound = getWindowLowerBound(lateness_bound); - if (lateness_bound < w_bound) - w_bound = lateness_bound; + if (max_watermark == 0) + return; + if (allowed_lateness) + { + UInt32 lateness_bound = addTime(max_timestamp, lateness_kind, -1 * lateness_num_units, time_zone); + lateness_bound = getWindowLowerBound(lateness_bound); + if (lateness_bound < w_bound) + w_bound = lateness_bound; + } } } - w_bound = is_tumble ? addTime(w_bound, window_kind, -1 * window_num_units, time_zone) - : addTime(w_bound, hop_kind, -1 * hop_num_units, time_zone); - auto sql = generateCleanCacheQuery(w_bound); InterpreterAlterQuery alt_query(sql, global_context); alt_query.execute(); @@ -694,6 +689,7 @@ void StorageWindowView::threadFuncFireProc() while (next_fire_signal <= timestamp_now) { fire(next_fire_signal); + max_fired_watermark = next_fire_signal; next_fire_signal = addTime(next_fire_signal, window_kind, window_num_units, time_zone); }