fix fire in hop window

This commit is contained in:
Vxider 2022-05-03 12:42:42 +08:00
parent 07a7b6b17a
commit afbdb8fccc

View File

@ -909,38 +909,36 @@ void StorageWindowView::threadFuncCleanup()
void StorageWindowView::threadFuncFireProc()
{
static bool window_kind_larger_than_day = window_kind == IntervalKind::Week || window_kind == IntervalKind::Month
|| window_kind == IntervalKind::Quarter || window_kind == IntervalKind::Year;
std::unique_lock lock(fire_signal_mutex);
UInt32 timestamp_now = std::time(nullptr);
/// When window kind is larger than day, getWindowUpperBound() will get a day num instead of timestamp,
/// and addTime will also add day num to the next_fire_signal, so we need to convert it into timestamp.
/// Otherwise, we will get wrong result and after create window view with window kind larger than day,
/// since day num is too smaller than current timestamp, it will fire a lot.
auto exact_fire_signal = window_kind_larger_than_day ? next_fire_signal * 86400 : next_fire_signal;
while (exact_fire_signal <= timestamp_now)
while (next_fire_signal <= timestamp_now)
{
try
{
fire(exact_fire_signal);
fire(next_fire_signal);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
max_fired_watermark = exact_fire_signal;
next_fire_signal = addTime(next_fire_signal, window_kind, window_num_units, *time_zone);
exact_fire_signal = window_kind_larger_than_day ? next_fire_signal * 86400 : next_fire_signal;
max_fired_watermark = next_fire_signal;
auto slide_interval
= is_tumble ? addTime(0, window_kind, window_num_units, *time_zone) : addTime(0, hop_kind, hop_num_units, *time_zone);
/// When window is larger than day, getWindowUpperBound() will get a day num instead of timestamp,
/// and addTime will also add day num to the next_fire_signal, so we need to convert it into timestamp.
/// Otherwise, we will get wrong result and after create window view with window kind larger than day,
/// since day num is too smaller than current timestamp, it will fire a lot.
if (is_tumble ? window_kind > IntervalKind::Day : hop_kind > IntervalKind::Day)
slide_interval *= 86400;
next_fire_signal += slide_interval;
}
UInt64 timestamp_ms = static_cast<UInt64>(Poco::Timestamp().epochMicroseconds()) / 1000;
if (!shutdown_called)
fire_task->scheduleAfter(std::max(
UInt64(0),
static_cast<UInt64>(window_kind_larger_than_day ? next_fire_signal * 86400 : next_fire_signal) * 1000 - timestamp_ms));
static_cast<UInt64>(next_fire_signal) * 1000 - timestamp_ms));
}
void StorageWindowView::threadFuncFireEvent()