diff --git a/src/Storages/WindowView/StorageWindowView.cpp b/src/Storages/WindowView/StorageWindowView.cpp index 0b822b9aab3..04c26053dba 100644 --- a/src/Storages/WindowView/StorageWindowView.cpp +++ b/src/Storages/WindowView/StorageWindowView.cpp @@ -439,6 +439,7 @@ bool StorageWindowView::optimize( bool cleanup, ContextPtr local_context) { + throwIfWindowViewIsDisabled(local_context); auto storage_ptr = getInnerTable(); auto metadata_snapshot = storage_ptr->getInMemoryMetadataPtr(); return getInnerTable()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, cleanup, local_context); @@ -449,6 +450,7 @@ void StorageWindowView::alter( ContextPtr local_context, AlterLockHolder &) { + throwIfWindowViewIsDisabled(local_context); auto table_id = getStorageID(); StorageInMemoryMetadata new_metadata = getInMemoryMetadata(); StorageInMemoryMetadata old_metadata = getInMemoryMetadata(); @@ -508,8 +510,9 @@ void StorageWindowView::alter( startup(); } -void StorageWindowView::checkAlterIsPossible(const AlterCommands & commands, ContextPtr /*local_context*/) const +void StorageWindowView::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const { + throwIfWindowViewIsDisabled(local_context); for (const auto & command : commands) { if (!command.isCommentAlter() && command.type != AlterCommand::MODIFY_QUERY) @@ -519,6 +522,7 @@ void StorageWindowView::checkAlterIsPossible(const AlterCommands & commands, Con std::pair StorageWindowView::getNewBlocks(UInt32 watermark) { + throwIfWindowViewIsDisabled(); UInt32 w_start = addTime(watermark, window_kind, -window_num_units, *time_zone); auto inner_table = getInnerTable(); @@ -654,6 +658,7 @@ std::pair StorageWindowView::getNewBlocks(UInt32 watermark) inline void StorageWindowView::fire(UInt32 watermark) { + throwIfWindowViewIsDisabled(); LOG_TRACE(log, "Watch streams number: {}, target table: {}", watch_streams.size(), target_table_id.empty() ? "None" : target_table_id.getNameForLogs()); @@ -722,6 +727,7 @@ inline void StorageWindowView::fire(UInt32 watermark) ASTPtr StorageWindowView::getSourceTableSelectQuery() { + throwIfWindowViewIsDisabled(); auto query = select_query->clone(); auto & modified_select = query->as(); @@ -947,6 +953,7 @@ UInt32 StorageWindowView::getWindowUpperBound(UInt32 time_sec) void StorageWindowView::addFireSignal(std::set & signals) { + throwIfWindowViewIsDisabled(); std::lock_guard lock(fire_signal_mutex); for (const auto & signal : signals) fire_signal.push_back(signal); @@ -962,6 +969,7 @@ void StorageWindowView::updateMaxTimestamp(UInt32 timestamp) void StorageWindowView::updateMaxWatermark(UInt32 watermark) { + throwIfWindowViewIsDisabled(); if (is_proctime) { max_watermark = watermark; @@ -1014,6 +1022,7 @@ void StorageWindowView::cleanup() void StorageWindowView::threadFuncCleanup() { + throwIfWindowViewIsDisabled(); if (shutdown_called) return; @@ -1033,6 +1042,7 @@ void StorageWindowView::threadFuncCleanup() void StorageWindowView::threadFuncFireProc() { + throwIfWindowViewIsDisabled(); if (shutdown_called) return; @@ -1069,6 +1079,7 @@ void StorageWindowView::threadFuncFireProc() void StorageWindowView::threadFuncFireEvent() { + throwIfWindowViewIsDisabled(); std::lock_guard lock(fire_signal_mutex); LOG_TRACE(log, "Fire events: {}", fire_signal.size()); @@ -1100,6 +1111,7 @@ void StorageWindowView::read( const size_t max_block_size, const size_t num_streams) { + throwIfWindowViewIsDisabled(local_context); if (target_table_id.empty()) return; @@ -1140,6 +1152,7 @@ Pipe StorageWindowView::watch( size_t /*max_block_size*/, const size_t /*num_streams*/) { + throwIfWindowViewIsDisabled(local_context); ASTWatchQuery & query = typeid_cast(*query_info.query); bool has_limit = false; @@ -1178,8 +1191,10 @@ StorageWindowView::StorageWindowView( , clean_interval_usec(context_->getSettingsRef().window_view_clean_interval.totalMicroseconds()) { if (context_->getSettingsRef().allow_experimental_analyzer) - throw Exception(ErrorCodes::UNSUPPORTED_METHOD, - "Experimental WINDOW VIEW feature is not supported with new infrastructure for query analysis (the setting 'allow_experimental_analyzer')"); + disabled_due_to_analyzer = true; + + if (mode <= LoadingStrictnessLevel::CREATE) + throwIfWindowViewIsDisabled(); if (!query.select) throw Exception(ErrorCodes::INCORRECT_QUERY, "SELECT query is not specified for {}", getName()); @@ -1243,6 +1258,9 @@ StorageWindowView::StorageWindowView( } } + if (disabled_due_to_analyzer) + return; + clean_cache_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); }); fire_task = getContext()->getSchedulePool().createTask( getStorageID().getFullTableName(), [this] { is_proctime ? threadFuncFireProc() : threadFuncFireEvent(); }); @@ -1400,6 +1418,7 @@ void StorageWindowView::eventTimeParser(const ASTCreateQuery & query) void StorageWindowView::writeIntoWindowView( StorageWindowView & window_view, const Block & block, ContextPtr local_context) { + window_view.throwIfWindowViewIsDisabled(local_context); while (window_view.modifying_query) std::this_thread::sleep_for(std::chrono::milliseconds(100)); @@ -1589,6 +1608,9 @@ void StorageWindowView::writeIntoWindowView( void StorageWindowView::startup() { + if (disabled_due_to_analyzer) + return; + DatabaseCatalog::instance().addViewDependency(select_table_id, getStorageID()); fire_task->activate(); @@ -1602,6 +1624,8 @@ void StorageWindowView::startup() void StorageWindowView::shutdown(bool) { shutdown_called = true; + if (disabled_due_to_analyzer) + return; fire_condition.notify_all(); @@ -1657,6 +1681,7 @@ Block StorageWindowView::getInputHeader() const const Block & StorageWindowView::getOutputHeader() const { + throwIfWindowViewIsDisabled(); std::lock_guard lock(sample_block_lock); if (!output_header) { @@ -1681,6 +1706,13 @@ StoragePtr StorageWindowView::getTargetTable() const return DatabaseCatalog::instance().getTable(target_table_id, getContext()); } +void StorageWindowView::throwIfWindowViewIsDisabled(ContextPtr local_context) const +{ + if (disabled_due_to_analyzer || (local_context && local_context->getSettingsRef().allow_experimental_analyzer)) + throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Experimental WINDOW VIEW feature is not supported " + "in the current infrastructure for query analysis (the setting 'allow_experimental_analyzer')"); +} + void registerStorageWindowView(StorageFactory & factory) { factory.registerStorage("WindowView", [](const StorageFactory::Arguments & args) diff --git a/src/Storages/WindowView/StorageWindowView.h b/src/Storages/WindowView/StorageWindowView.h index 0b7cd54e3a7..f79867df424 100644 --- a/src/Storages/WindowView/StorageWindowView.h +++ b/src/Storages/WindowView/StorageWindowView.h @@ -271,5 +271,9 @@ private: StoragePtr getSourceTable() const; StoragePtr getInnerTable() const; StoragePtr getTargetTable() const; + + bool disabled_due_to_analyzer = false; + + void throwIfWindowViewIsDisabled(ContextPtr local_context = nullptr) const; }; } diff --git a/tests/queries/0_stateless/01056_window_view_proc_hop_watch.py b/tests/queries/0_stateless/01056_window_view_proc_hop_watch.py index 4c3e3ead455..2db14fcdddf 100755 --- a/tests/queries/0_stateless/01056_window_view_proc_hop_watch.py +++ b/tests/queries/0_stateless/01056_window_view_proc_hop_watch.py @@ -28,6 +28,8 @@ with client(name="client1>", log=log) as client1, client( client1.expect(prompt) client2.send("SET allow_experimental_window_view = 1") client2.expect(prompt) + client2.send("SET allow_experimental_analyzer = 0") + client2.expect(prompt) client1.send("CREATE DATABASE IF NOT EXISTS 01056_window_view_proc_hop_watch") client1.expect(prompt) diff --git a/tests/queries/0_stateless/01059_window_view_event_hop_watch_strict_asc.py b/tests/queries/0_stateless/01059_window_view_event_hop_watch_strict_asc.py index 9adff06442e..2323ee5c838 100755 --- a/tests/queries/0_stateless/01059_window_view_event_hop_watch_strict_asc.py +++ b/tests/queries/0_stateless/01059_window_view_event_hop_watch_strict_asc.py @@ -26,6 +26,8 @@ with client(name="client1>", log=log) as client1, client( client1.expect(prompt) client1.send("SET window_view_heartbeat_interval = 1") client1.expect(prompt) + client2.send("SET allow_experimental_analyzer = 0") + client2.expect(prompt) client1.send("CREATE DATABASE IF NOT EXISTS db_01059_event_hop_watch_strict_asc") client1.expect(prompt) diff --git a/tests/queries/0_stateless/01062_window_view_event_hop_watch_asc.py b/tests/queries/0_stateless/01062_window_view_event_hop_watch_asc.py index bb40b1df2f0..db9e8cef6c5 100755 --- a/tests/queries/0_stateless/01062_window_view_event_hop_watch_asc.py +++ b/tests/queries/0_stateless/01062_window_view_event_hop_watch_asc.py @@ -28,6 +28,8 @@ with client(name="client1>", log=log) as client1, client( client1.expect(prompt) client2.send("SET allow_experimental_window_view = 1") client2.expect(prompt) + client2.send("SET allow_experimental_analyzer = 0") + client2.expect(prompt) client1.send("CREATE DATABASE IF NOT EXISTS 01062_window_view_event_hop_watch_asc") client1.expect(prompt) diff --git a/tests/queries/0_stateless/01065_window_view_event_hop_watch_bounded.py b/tests/queries/0_stateless/01065_window_view_event_hop_watch_bounded.py index 7f00130b184..b8d5ff02d37 100755 --- a/tests/queries/0_stateless/01065_window_view_event_hop_watch_bounded.py +++ b/tests/queries/0_stateless/01065_window_view_event_hop_watch_bounded.py @@ -27,6 +27,8 @@ with client(name="client1>", log=log) as client1, client( client1.expect(prompt) client2.send("SET allow_experimental_window_view = 1") client2.expect(prompt) + client2.send("SET allow_experimental_analyzer = 0") + client2.expect(prompt) client1.send( "CREATE DATABASE IF NOT EXISTS 01065_window_view_event_hop_watch_bounded" diff --git a/tests/queries/0_stateless/01069_window_view_proc_tumble_watch.py b/tests/queries/0_stateless/01069_window_view_proc_tumble_watch.py index eb31b2ccbcf..21c2e831afc 100755 --- a/tests/queries/0_stateless/01069_window_view_proc_tumble_watch.py +++ b/tests/queries/0_stateless/01069_window_view_proc_tumble_watch.py @@ -28,6 +28,8 @@ with client(name="client1>", log=log) as client1, client( client1.expect(prompt) client2.send("SET allow_experimental_window_view = 1") client2.expect(prompt) + client2.send("SET allow_experimental_analyzer = 0") + client2.expect(prompt) client1.send("CREATE DATABASE 01069_window_view_proc_tumble_watch") client1.expect(prompt) diff --git a/tests/queries/0_stateless/01070_window_view_watch_events.py b/tests/queries/0_stateless/01070_window_view_watch_events.py index 8aeff041cc1..1cf7678a014 100755 --- a/tests/queries/0_stateless/01070_window_view_watch_events.py +++ b/tests/queries/0_stateless/01070_window_view_watch_events.py @@ -28,6 +28,8 @@ with client(name="client1>", log=log) as client1, client( client1.expect(prompt) client2.send("SET allow_experimental_window_view = 1") client2.expect(prompt) + client2.send("SET allow_experimental_analyzer = 0") + client2.expect(prompt) client1.send("CREATE DATABASE IF NOT EXISTS 01070_window_view_watch_events") client1.expect(prompt) diff --git a/tests/queries/0_stateless/01078_window_view_alter_query_watch.py b/tests/queries/0_stateless/01078_window_view_alter_query_watch.py index c32e508c5a5..3f3dfe0cda8 100755 --- a/tests/queries/0_stateless/01078_window_view_alter_query_watch.py +++ b/tests/queries/0_stateless/01078_window_view_alter_query_watch.py @@ -28,10 +28,14 @@ with client(name="client1>", log=log) as client1, client( client1.expect(prompt) client2.send("SET allow_experimental_window_view = 1") client2.expect(prompt) + client2.send("SET allow_experimental_analyzer = 0") + client2.expect(prompt) client3.send("SET allow_experimental_window_view = 1") client3.expect(prompt) client3.send("SET window_view_heartbeat_interval = 1") client3.expect(prompt) + client3.send("SET allow_experimental_analyzer = 0") + client3.expect(prompt) client1.send("CREATE DATABASE IF NOT EXISTS 01078_window_view_alter_query_watch") client1.expect(prompt) diff --git a/tests/queries/0_stateless/01082_window_view_watch_limit.py b/tests/queries/0_stateless/01082_window_view_watch_limit.py index 12c8d295591..9938ebcab98 100755 --- a/tests/queries/0_stateless/01082_window_view_watch_limit.py +++ b/tests/queries/0_stateless/01082_window_view_watch_limit.py @@ -27,6 +27,8 @@ with client(name="client1>", log=log) as client1, client( client1.expect(prompt) client2.send("SET allow_experimental_window_view = 1") client2.expect(prompt) + client2.send("SET allow_experimental_analyzer = 0") + client2.expect(prompt) client1.send("CREATE DATABASE IF NOT EXISTS 01082_window_view_watch_limit") client1.expect(prompt)