Merge pull request #67983 from Algunenano/flaky_win_view

Fix window view missing blocks due to slow flush to view
This commit is contained in:
Raúl Marín 2024-08-27 10:35:09 +00:00 committed by GitHub
commit e3ef11e505
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 30 additions and 13 deletions

View File

@ -1051,17 +1051,27 @@ void StorageWindowView::threadFuncFireProc()
if (shutdown_called)
return;
/// Acquiring the lock can take seconds (depends on how long it takes to push) so we keep a reference to remember
/// what's the starting point where we want to push from
UInt32 timestamp_start = now();
std::lock_guard lock(fire_signal_mutex);
/// TODO: consider using time_t instead (for every timestamp in this class)
UInt32 timestamp_now = now();
LOG_TRACE(log, "Now: {}, next fire signal: {}, max watermark: {}", timestamp_now, next_fire_signal, max_watermark);
LOG_TRACE(
log,
"Start: {}, now: {}, next fire signal: {}, max watermark: {}",
timestamp_start,
timestamp_now,
next_fire_signal,
max_watermark);
while (next_fire_signal <= timestamp_now)
{
try
{
if (max_watermark >= timestamp_now)
if (max_watermark >= timestamp_start)
fire(next_fire_signal);
}
catch (...)
@ -1075,11 +1085,18 @@ void StorageWindowView::threadFuncFireProc()
slide_interval *= 86400;
next_fire_signal += slide_interval;
LOG_TRACE(log, "Now: {}, next fire signal: {}, max watermark: {}, max fired watermark: {}, slide interval: {}",
timestamp_now, next_fire_signal, max_watermark, max_fired_watermark, slide_interval);
LOG_TRACE(
log,
"Start: {}, now: {}, next fire signal: {}, max watermark: {}, max fired watermark: {}, slide interval: {}",
timestamp_start,
timestamp_now,
next_fire_signal,
max_watermark,
max_fired_watermark,
slide_interval);
}
if (max_watermark >= timestamp_now)
if (max_watermark >= timestamp_start)
clean_cache_task->schedule();
UInt64 next_fire_ms = static_cast<UInt64>(next_fire_signal) * 1000;

View File

@ -1,5 +1,4 @@
#!/usr/bin/env bash
# Tags: no-random-settings, no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -1,5 +1,4 @@
#!/usr/bin/env bash
# Tags: no-random-settings, no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -1,5 +1,4 @@
#!/usr/bin/env bash
# Tags: no-random-settings, no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -1,5 +1,4 @@
#!/usr/bin/env bash
# Tags: no-random-settings, no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh

View File

@ -20,7 +20,9 @@ with client(name="client1>", log=log) as client1, client(
client1.expect(prompt)
client2.expect(prompt)
client1.send("SET enable_analyzer = 0")
client1.send(
"SET enable_analyzer = 0, function_sleep_max_microseconds_per_block=10000000"
)
client1.expect(prompt)
client1.send("SET allow_experimental_window_view = 1")
client1.expect(prompt)
@ -28,7 +30,9 @@ with client(name="client1>", log=log) as client1, client(
client1.expect(prompt)
client2.send("SET allow_experimental_window_view = 1")
client2.expect(prompt)
client2.send("SET enable_analyzer = 0")
client2.send(
"SET enable_analyzer = 0, function_sleep_max_microseconds_per_block=10000000"
)
client2.expect(prompt)
client1.send("CREATE DATABASE IF NOT EXISTS 01056_window_view_proc_hop_watch")
@ -42,8 +46,9 @@ with client(name="client1>", log=log) as client1, client(
"CREATE TABLE 01056_window_view_proc_hop_watch.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple()"
)
client1.expect(prompt)
# Introduce a sleep call to verify that even if the push to view is slow WATCH will work
client1.send(
"CREATE WINDOW VIEW 01056_window_view_proc_hop_watch.wv ENGINE Memory AS SELECT count(a) AS count FROM 01056_window_view_proc_hop_watch.mt GROUP BY hop(timestamp, INTERVAL '1' SECOND, INTERVAL '1' SECOND, 'US/Samoa') AS wid;"
"CREATE WINDOW VIEW 01056_window_view_proc_hop_watch.wv ENGINE Memory AS SELECT count(sleep(5)) AS count FROM 01056_window_view_proc_hop_watch.mt GROUP BY hop(timestamp, INTERVAL '1' SECOND, INTERVAL '1' SECOND, 'US/Samoa') AS wid;"
)
client1.expect(prompt)

View File

@ -1,5 +1,4 @@
#!/usr/bin/env bash
# Tags: no-random-settings, no-parallel
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh