Fix window view missing blocks due to slow flush to view

This commit is contained in:
Raúl Marín 2024-08-07 15:12:06 +02:00
parent 6882e8ad79
commit 5f34171534
2 changed files with 25 additions and 7 deletions

View File

@ -1051,17 +1051,27 @@ void StorageWindowView::threadFuncFireProc()
if (shutdown_called)
return;
/// Acquiring the lock can take seconds (depends on how long it takes to push) so we keep a reference to remember
/// what's the starting point where we want to push from
UInt32 timestamp_start = now();
std::lock_guard lock(fire_signal_mutex);
/// TODO: consider using time_t instead (for every timestamp in this class)
UInt32 timestamp_now = now();
LOG_TRACE(log, "Now: {}, next fire signal: {}, max watermark: {}", timestamp_now, next_fire_signal, max_watermark);
LOG_TRACE(
log,
"Start: {}, now: {}, next fire signal: {}, max watermark: {}",
timestamp_start,
timestamp_now,
next_fire_signal,
max_watermark);
while (next_fire_signal <= timestamp_now)
{
try
{
if (max_watermark >= timestamp_now)
if (max_watermark >= timestamp_start)
fire(next_fire_signal);
}
catch (...)
@ -1075,8 +1085,15 @@ void StorageWindowView::threadFuncFireProc()
slide_interval *= 86400;
next_fire_signal += slide_interval;
LOG_TRACE(log, "Now: {}, next fire signal: {}, max watermark: {}, max fired watermark: {}, slide interval: {}",
timestamp_now, next_fire_signal, max_watermark, max_fired_watermark, slide_interval);
LOG_TRACE(
log,
"Start: {}, now: {}, next fire signal: {}, max watermark: {}, max fired watermark: {}, slide interval: {}",
timestamp_start,
timestamp_now,
next_fire_signal,
max_watermark,
max_fired_watermark,
slide_interval);
}
if (max_watermark >= timestamp_now)

View File

@ -20,7 +20,7 @@ with client(name="client1>", log=log) as client1, client(
client1.expect(prompt)
client2.expect(prompt)
client1.send("SET enable_analyzer = 0")
client1.send("SET enable_analyzer = 0, function_sleep_max_microseconds_per_block=10000000")
client1.expect(prompt)
client1.send("SET allow_experimental_window_view = 1")
client1.expect(prompt)
@ -28,7 +28,7 @@ with client(name="client1>", log=log) as client1, client(
client1.expect(prompt)
client2.send("SET allow_experimental_window_view = 1")
client2.expect(prompt)
client2.send("SET enable_analyzer = 0")
client2.send("SET enable_analyzer = 0, function_sleep_max_microseconds_per_block=10000000")
client2.expect(prompt)
client1.send("CREATE DATABASE IF NOT EXISTS 01056_window_view_proc_hop_watch")
@ -42,8 +42,9 @@ with client(name="client1>", log=log) as client1, client(
"CREATE TABLE 01056_window_view_proc_hop_watch.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple()"
)
client1.expect(prompt)
# Introduce a sleep call to verify that even if the push to view is slow WATCH will work
client1.send(
"CREATE WINDOW VIEW 01056_window_view_proc_hop_watch.wv ENGINE Memory AS SELECT count(a) AS count FROM 01056_window_view_proc_hop_watch.mt GROUP BY hop(timestamp, INTERVAL '1' SECOND, INTERVAL '1' SECOND, 'US/Samoa') AS wid;"
"CREATE WINDOW VIEW 01056_window_view_proc_hop_watch.wv ENGINE Memory AS SELECT count(sleep(5)) AS count FROM 01056_window_view_proc_hop_watch.mt GROUP BY hop(timestamp, INTERVAL '1' SECOND, INTERVAL '1' SECOND, 'US/Samoa') AS wid;"
)
client1.expect(prompt)