fix windowview cleanup

This commit is contained in:
Vxider 2022-05-15 16:50:34 +00:00
parent 51ad78c5f7
commit 18ee285ce1

View File

@ -392,27 +392,17 @@ static void extractDependentTable(ContextPtr context, ASTPtr & query, String & s
UInt32 StorageWindowView::getCleanupBound()
{
UInt32 w_bound;
if (max_fired_watermark == 0)
return 0;
if (is_proctime)
return max_fired_watermark;
else
{
std::lock_guard lock(fire_signal_mutex);
w_bound = max_fired_watermark;
if (w_bound == 0)
return 0;
if (!is_proctime)
{
if (max_watermark == 0)
return 0;
if (allowed_lateness)
{
UInt32 lateness_bound = addTime(max_timestamp, lateness_kind, -lateness_num_units, *time_zone);
lateness_bound = getWindowLowerBound(lateness_bound);
if (lateness_bound < w_bound)
w_bound = lateness_bound;
}
}
auto w_bound = max_fired_watermark;
if (allowed_lateness)
w_bound = addTime(w_bound, lateness_kind, -lateness_num_units, *time_zone);
return getWindowLowerBound(w_bound);
}
return w_bound;
}
ASTPtr StorageWindowView::getCleanupQuery()
@ -838,7 +828,6 @@ void StorageWindowView::updateMaxWatermark(UInt32 watermark)
while (max_watermark < watermark)
{
fire_signal.push_back(max_watermark);
max_fired_watermark = max_watermark;
max_watermark = addTime(max_watermark, slide_kind, slide_num_units, *time_zone);
}
}
@ -849,7 +838,6 @@ void StorageWindowView::updateMaxWatermark(UInt32 watermark)
while (max_watermark_bias <= max_timestamp)
{
fire_signal.push_back(max_watermark);
max_fired_watermark = max_watermark;
max_watermark = addTime(max_watermark, slide_kind, slide_num_units, *time_zone);
max_watermark_bias = addTime(max_watermark, slide_kind, slide_num_units, *time_zone);
}
@ -861,8 +849,12 @@ void StorageWindowView::updateMaxWatermark(UInt32 watermark)
inline void StorageWindowView::cleanup()
{
InterpreterAlterQuery alter_query(getCleanupQuery(), getContext());
alter_query.execute();
{
std::lock_guard lock(mutex);
auto alter_query = getCleanupQuery();
InterpreterAlterQuery interpreter_alter(alter_query, getContext());
interpreter_alter.execute();
}
std::lock_guard lock(fire_signal_mutex);
watch_streams.remove_if([](std::weak_ptr<WindowViewSource> & ptr) { return ptr.expired(); });
@ -927,6 +919,7 @@ void StorageWindowView::threadFuncFireEvent()
while (!fire_signal.empty())
{
fire(fire_signal.front());
max_fired_watermark = fire_signal.front();
fire_signal.pop_front();
}
}