From 6882e8ad79bcc1ce0b2c80e82aee38f57a4504bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Wed, 7 Aug 2024 14:34:33 +0200 Subject: [PATCH 1/4] Revert "Merge pull request #67130 from rschu1ze/unflake-win-view-tests2" This reverts commit 48e61a295cf3e2a7ab93dc99531d14701b90004c, reversing changes made to da24aa06fac26bf1516320cc6e49c8927b1f600a. --- .../queries/0_stateless/01052_window_view_proc_tumble_to_now.sh | 1 - tests/queries/0_stateless/01053_window_view_proc_hop_to_now.sh | 1 - tests/queries/0_stateless/01054_window_view_proc_tumble_to.sh | 1 - tests/queries/0_stateless/01055_window_view_proc_hop_to.sh | 1 - .../0_stateless/01075_window_view_proc_tumble_to_now_populate.sh | 1 - 5 files changed, 5 deletions(-) diff --git a/tests/queries/0_stateless/01052_window_view_proc_tumble_to_now.sh b/tests/queries/0_stateless/01052_window_view_proc_tumble_to_now.sh index c473bf766b0..a235b60cee9 100755 --- a/tests/queries/0_stateless/01052_window_view_proc_tumble_to_now.sh +++ b/tests/queries/0_stateless/01052_window_view_proc_tumble_to_now.sh @@ -1,5 +1,4 @@ #!/usr/bin/env bash -# Tags: no-random-settings, no-parallel CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh diff --git a/tests/queries/0_stateless/01053_window_view_proc_hop_to_now.sh b/tests/queries/0_stateless/01053_window_view_proc_hop_to_now.sh index ca89dd9daf1..b4a647c9864 100755 --- a/tests/queries/0_stateless/01053_window_view_proc_hop_to_now.sh +++ b/tests/queries/0_stateless/01053_window_view_proc_hop_to_now.sh @@ -1,5 +1,4 @@ #!/usr/bin/env bash -# Tags: no-random-settings, no-parallel CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh diff --git a/tests/queries/0_stateless/01054_window_view_proc_tumble_to.sh b/tests/queries/0_stateless/01054_window_view_proc_tumble_to.sh index 67cac226de6..af9a47714b5 100755 --- a/tests/queries/0_stateless/01054_window_view_proc_tumble_to.sh +++ b/tests/queries/0_stateless/01054_window_view_proc_tumble_to.sh @@ -1,5 +1,4 @@ #!/usr/bin/env bash -# Tags: no-random-settings, no-parallel CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh diff --git a/tests/queries/0_stateless/01055_window_view_proc_hop_to.sh b/tests/queries/0_stateless/01055_window_view_proc_hop_to.sh index e44fc2e363c..5a7e92e6a1b 100755 --- a/tests/queries/0_stateless/01055_window_view_proc_hop_to.sh +++ b/tests/queries/0_stateless/01055_window_view_proc_hop_to.sh @@ -1,5 +1,4 @@ #!/usr/bin/env bash -# Tags: no-random-settings, no-parallel CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh diff --git a/tests/queries/0_stateless/01075_window_view_proc_tumble_to_now_populate.sh b/tests/queries/0_stateless/01075_window_view_proc_tumble_to_now_populate.sh index 859b9a86a2a..220bb39602b 100755 --- a/tests/queries/0_stateless/01075_window_view_proc_tumble_to_now_populate.sh +++ b/tests/queries/0_stateless/01075_window_view_proc_tumble_to_now_populate.sh @@ -1,5 +1,4 @@ #!/usr/bin/env bash -# Tags: no-random-settings, no-parallel CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh From 5f3417153439b56b68a34be890b75884941800dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Wed, 7 Aug 2024 15:12:06 +0200 Subject: [PATCH 2/4] Fix window view missing blocks due to slow flush to view --- src/Storages/WindowView/StorageWindowView.cpp | 25 ++++++++++++++++--- .../01056_window_view_proc_hop_watch.py | 7 +++--- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/src/Storages/WindowView/StorageWindowView.cpp b/src/Storages/WindowView/StorageWindowView.cpp index 5830c844582..94eed575ca8 100644 --- a/src/Storages/WindowView/StorageWindowView.cpp +++ b/src/Storages/WindowView/StorageWindowView.cpp @@ -1051,17 +1051,27 @@ void StorageWindowView::threadFuncFireProc() if (shutdown_called) return; + /// Acquiring the lock can take seconds (depends on how long it takes to push) so we keep a reference to remember + /// what's the starting point where we want to push from + UInt32 timestamp_start = now(); + std::lock_guard lock(fire_signal_mutex); /// TODO: consider using time_t instead (for every timestamp in this class) UInt32 timestamp_now = now(); - LOG_TRACE(log, "Now: {}, next fire signal: {}, max watermark: {}", timestamp_now, next_fire_signal, max_watermark); + LOG_TRACE( + log, + "Start: {}, now: {}, next fire signal: {}, max watermark: {}", + timestamp_start, + timestamp_now, + next_fire_signal, + max_watermark); while (next_fire_signal <= timestamp_now) { try { - if (max_watermark >= timestamp_now) + if (max_watermark >= timestamp_start) fire(next_fire_signal); } catch (...) @@ -1075,8 +1085,15 @@ void StorageWindowView::threadFuncFireProc() slide_interval *= 86400; next_fire_signal += slide_interval; - LOG_TRACE(log, "Now: {}, next fire signal: {}, max watermark: {}, max fired watermark: {}, slide interval: {}", - timestamp_now, next_fire_signal, max_watermark, max_fired_watermark, slide_interval); + LOG_TRACE( + log, + "Start: {}, now: {}, next fire signal: {}, max watermark: {}, max fired watermark: {}, slide interval: {}", + timestamp_start, + timestamp_now, + next_fire_signal, + max_watermark, + max_fired_watermark, + slide_interval); } if (max_watermark >= timestamp_now) diff --git a/tests/queries/0_stateless/01056_window_view_proc_hop_watch.py b/tests/queries/0_stateless/01056_window_view_proc_hop_watch.py index 6bb8db2c38e..bc4ec16f1b1 100755 --- a/tests/queries/0_stateless/01056_window_view_proc_hop_watch.py +++ b/tests/queries/0_stateless/01056_window_view_proc_hop_watch.py @@ -20,7 +20,7 @@ with client(name="client1>", log=log) as client1, client( client1.expect(prompt) client2.expect(prompt) - client1.send("SET enable_analyzer = 0") + client1.send("SET enable_analyzer = 0, function_sleep_max_microseconds_per_block=10000000") client1.expect(prompt) client1.send("SET allow_experimental_window_view = 1") client1.expect(prompt) @@ -28,7 +28,7 @@ with client(name="client1>", log=log) as client1, client( client1.expect(prompt) client2.send("SET allow_experimental_window_view = 1") client2.expect(prompt) - client2.send("SET enable_analyzer = 0") + client2.send("SET enable_analyzer = 0, function_sleep_max_microseconds_per_block=10000000") client2.expect(prompt) client1.send("CREATE DATABASE IF NOT EXISTS 01056_window_view_proc_hop_watch") @@ -42,8 +42,9 @@ with client(name="client1>", log=log) as client1, client( "CREATE TABLE 01056_window_view_proc_hop_watch.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple()" ) client1.expect(prompt) + # Introduce a sleep call to verify that even if the push to view is slow WATCH will work client1.send( - "CREATE WINDOW VIEW 01056_window_view_proc_hop_watch.wv ENGINE Memory AS SELECT count(a) AS count FROM 01056_window_view_proc_hop_watch.mt GROUP BY hop(timestamp, INTERVAL '1' SECOND, INTERVAL '1' SECOND, 'US/Samoa') AS wid;" + "CREATE WINDOW VIEW 01056_window_view_proc_hop_watch.wv ENGINE Memory AS SELECT count(sleep(5)) AS count FROM 01056_window_view_proc_hop_watch.mt GROUP BY hop(timestamp, INTERVAL '1' SECOND, INTERVAL '1' SECOND, 'US/Samoa') AS wid;" ) client1.expect(prompt) From aca7e6734ceb5a0ec7a8d6f8f3bfa5bde05c860e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Wed, 7 Aug 2024 16:24:56 +0200 Subject: [PATCH 3/4] Style --- .../0_stateless/01056_window_view_proc_hop_watch.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/queries/0_stateless/01056_window_view_proc_hop_watch.py b/tests/queries/0_stateless/01056_window_view_proc_hop_watch.py index bc4ec16f1b1..e0f969050b5 100755 --- a/tests/queries/0_stateless/01056_window_view_proc_hop_watch.py +++ b/tests/queries/0_stateless/01056_window_view_proc_hop_watch.py @@ -20,7 +20,9 @@ with client(name="client1>", log=log) as client1, client( client1.expect(prompt) client2.expect(prompt) - client1.send("SET enable_analyzer = 0, function_sleep_max_microseconds_per_block=10000000") + client1.send( + "SET enable_analyzer = 0, function_sleep_max_microseconds_per_block=10000000" + ) client1.expect(prompt) client1.send("SET allow_experimental_window_view = 1") client1.expect(prompt) @@ -28,7 +30,9 @@ with client(name="client1>", log=log) as client1, client( client1.expect(prompt) client2.send("SET allow_experimental_window_view = 1") client2.expect(prompt) - client2.send("SET enable_analyzer = 0, function_sleep_max_microseconds_per_block=10000000") + client2.send( + "SET enable_analyzer = 0, function_sleep_max_microseconds_per_block=10000000" + ) client2.expect(prompt) client1.send("CREATE DATABASE IF NOT EXISTS 01056_window_view_proc_hop_watch") From 35eb4fa1766c6f5b5d81cbfc68e63cadd4b01838 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Mar=C3=ADn?= Date: Thu, 8 Aug 2024 12:03:14 +0200 Subject: [PATCH 4/4] Schedule WV cleanup after any fire trigger --- src/Storages/WindowView/StorageWindowView.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Storages/WindowView/StorageWindowView.cpp b/src/Storages/WindowView/StorageWindowView.cpp index 94eed575ca8..4a20a07ae89 100644 --- a/src/Storages/WindowView/StorageWindowView.cpp +++ b/src/Storages/WindowView/StorageWindowView.cpp @@ -1096,7 +1096,7 @@ void StorageWindowView::threadFuncFireProc() slide_interval); } - if (max_watermark >= timestamp_now) + if (max_watermark >= timestamp_start) clean_cache_task->schedule(); UInt64 next_fire_ms = static_cast(next_fire_signal) * 1000;