use max_fired_watermark to cleanup cache in proc time

This commit is contained in:
Vxider 2020-03-31 14:28:04 +08:00
parent 8f84599ba3
commit 36383d83f1

View File

@ -341,31 +341,26 @@ Pipes StorageWindowView::blocksToPipes(BlocksList & blocks, Block & sample_block
inline void StorageWindowView::cleanCache() inline void StorageWindowView::cleanCache()
{ {
UInt32 w_bound; UInt32 w_bound;
if (is_proctime)
{
w_bound = getWindowUpperBound(std::time(nullptr));
}
else
{ {
std::lock_guard lock(fire_signal_mutex); std::lock_guard lock(fire_signal_mutex);
if (max_watermark == 0)
return;
w_bound = max_fired_watermark; w_bound = max_fired_watermark;
if (w_bound == 0) if (w_bound == 0)
return; return;
if (allowed_lateness) if (!is_proctime)
{ {
UInt32 lateness_bound = addTime(max_timestamp, lateness_kind, -1 * lateness_num_units, time_zone); if (max_watermark == 0)
lateness_bound = getWindowLowerBound(lateness_bound); return;
if (lateness_bound < w_bound) if (allowed_lateness)
w_bound = lateness_bound; {
UInt32 lateness_bound = addTime(max_timestamp, lateness_kind, -1 * lateness_num_units, time_zone);
lateness_bound = getWindowLowerBound(lateness_bound);
if (lateness_bound < w_bound)
w_bound = lateness_bound;
}
} }
} }
w_bound = is_tumble ? addTime(w_bound, window_kind, -1 * window_num_units, time_zone)
: addTime(w_bound, hop_kind, -1 * hop_num_units, time_zone);
auto sql = generateCleanCacheQuery(w_bound); auto sql = generateCleanCacheQuery(w_bound);
InterpreterAlterQuery alt_query(sql, global_context); InterpreterAlterQuery alt_query(sql, global_context);
alt_query.execute(); alt_query.execute();
@ -694,6 +689,7 @@ void StorageWindowView::threadFuncFireProc()
while (next_fire_signal <= timestamp_now) while (next_fire_signal <= timestamp_now)
{ {
fire(next_fire_signal); fire(next_fire_signal);
max_fired_watermark = next_fire_signal;
next_fire_signal = addTime(next_fire_signal, window_kind, window_num_units, time_zone); next_fire_signal = addTime(next_fire_signal, window_kind, window_num_units, time_zone);
} }