diff --git a/src/Storages/WindowView/StorageWindowView.cpp b/src/Storages/WindowView/StorageWindowView.cpp index 13c6fca5163..adc053dd3bc 100644 --- a/src/Storages/WindowView/StorageWindowView.cpp +++ b/src/Storages/WindowView/StorageWindowView.cpp @@ -392,27 +392,17 @@ static void extractDependentTable(ContextPtr context, ASTPtr & query, String & s UInt32 StorageWindowView::getCleanupBound() { - UInt32 w_bound; + if (max_fired_watermark == 0) + return 0; + if (is_proctime) + return max_fired_watermark; + else { - std::lock_guard lock(fire_signal_mutex); - w_bound = max_fired_watermark; - if (w_bound == 0) - return 0; - - if (!is_proctime) - { - if (max_watermark == 0) - return 0; - if (allowed_lateness) - { - UInt32 lateness_bound = addTime(max_timestamp, lateness_kind, -lateness_num_units, *time_zone); - lateness_bound = getWindowLowerBound(lateness_bound); - if (lateness_bound < w_bound) - w_bound = lateness_bound; - } - } + auto w_bound = max_fired_watermark; + if (allowed_lateness) + w_bound = addTime(w_bound, lateness_kind, -lateness_num_units, *time_zone); + return getWindowLowerBound(w_bound); } - return w_bound; } ASTPtr StorageWindowView::getCleanupQuery() @@ -838,7 +828,6 @@ void StorageWindowView::updateMaxWatermark(UInt32 watermark) while (max_watermark < watermark) { fire_signal.push_back(max_watermark); - max_fired_watermark = max_watermark; max_watermark = addTime(max_watermark, slide_kind, slide_num_units, *time_zone); } } @@ -849,7 +838,6 @@ void StorageWindowView::updateMaxWatermark(UInt32 watermark) while (max_watermark_bias <= max_timestamp) { fire_signal.push_back(max_watermark); - max_fired_watermark = max_watermark; max_watermark = addTime(max_watermark, slide_kind, slide_num_units, *time_zone); max_watermark_bias = addTime(max_watermark, slide_kind, slide_num_units, *time_zone); } @@ -861,8 +849,12 @@ void StorageWindowView::updateMaxWatermark(UInt32 watermark) inline void StorageWindowView::cleanup() { - InterpreterAlterQuery alter_query(getCleanupQuery(), getContext()); - alter_query.execute(); + { + std::lock_guard lock(mutex); + auto alter_query = getCleanupQuery(); + InterpreterAlterQuery interpreter_alter(alter_query, getContext()); + interpreter_alter.execute(); + } std::lock_guard lock(fire_signal_mutex); watch_streams.remove_if([](std::weak_ptr & ptr) { return ptr.expired(); }); @@ -927,6 +919,7 @@ void StorageWindowView::threadFuncFireEvent() while (!fire_signal.empty()) { fire(fire_signal.front()); + max_fired_watermark = fire_signal.front(); fire_signal.pop_front(); } }