diff --git a/dbms/src/Storages/WindowView/AddingTimestampBlockInputStream.h b/dbms/src/Storages/WindowView/AddingTimestampBlockInputStream.h new file mode 100644 index 00000000000..a307fccc3bc --- /dev/null +++ b/dbms/src/Storages/WindowView/AddingTimestampBlockInputStream.h @@ -0,0 +1,39 @@ +#pragma once + +#include +#include + + +namespace DB +{ +/** Add timestamp column for processing time process in + * WINDOW VIEW + */ +class AddingTimestampBlockInputStream : public IBlockInputStream +{ +public: + AddingTimestampBlockInputStream(const BlockInputStreamPtr & input_, UInt32 timestamp_) : input(input_), timestamp(timestamp_) + { + cached_header = input->getHeader(); + cached_header.insert({ColumnUInt32::create(1, 1), std::make_shared(), "____timestamp"}); + } + + String getName() const override { return "AddingTimestamp"; } + + Block getHeader() const override { return cached_header.cloneEmpty(); } + +protected: + Block readImpl() override + { + Block res = input->read(); + if (res) + res.insert({ColumnUInt32::create(res.rows(), timestamp), std::make_shared(), "____timestamp"}); + return res; + } + +private: + BlockInputStreamPtr input; + Block cached_header; + UInt32 timestamp; +}; +} diff --git a/dbms/src/Storages/WindowView/StorageWindowView.cpp b/dbms/src/Storages/WindowView/StorageWindowView.cpp index 8d1c3913b45..0505bf536c8 100644 --- a/dbms/src/Storages/WindowView/StorageWindowView.cpp +++ b/dbms/src/Storages/WindowView/StorageWindowView.cpp @@ -38,9 +38,10 @@ #include #include +#include +#include #include #include -#include #include @@ -111,8 +112,8 @@ namespace std::static_pointer_cast(ptr_)->setAlias(""); auto arrayJoin = makeASTFunction("arrayJoin", ptr_); arrayJoin->alias = node.alias; - data.window_column_name = arrayJoin->getColumnName(); node_ptr = arrayJoin; + data.window_column_name = arrayJoin->getColumnName(); } else if (serializeAST(node) != serializeAST(*data.window_function)) throw Exception("WINDOW VIEW only support ONE WINDOW FUNCTION", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW); @@ -120,6 +121,43 @@ namespace } }; + class ParserProcTimeFinalMatcher + { + public: + using Visitor = InDepthNodeVisitor; + + struct Data + { + bool is_proctime_tumble = false; + String window_column_name; + }; + + static bool needChildVisit(ASTPtr &, const ASTPtr &) + { + return true; + } + + static void visit(ASTPtr & ast, Data & data) + { + if (const auto * t = ast->as()) + visit(*t, ast, data); + } + + private: + static void visit(const ASTFunction & node, ASTPtr & node_ptr, Data & data) + { + if (node.name == "TUMBLE") + { + if (const auto * t = node.arguments->children[0]->as(); t && t->name == "now") + { + data.is_proctime_tumble = true; + node_ptr->children[0]->children[0] = std::make_shared("____timestamp"); + data.window_column_name = node.getColumnName(); + } + } + } + }; + static inline IntervalKind strToIntervalKind(const String& interval_str) { if (interval_str == "Second") @@ -299,7 +337,7 @@ inline void StorageWindowView::clearInnerTable() inline void StorageWindowView::flushToTable(UInt32 timestamp_) { //write into dependent table - StoragePtr target_table = getTargetTable(); + StoragePtr target_table = getTargetStorage(); auto _blockInputStreamPtr = getNewBlocksInputStreamPtr(timestamp_); auto _lock = target_table->lockStructureForShare(true, global_context.getCurrentQueryId()); auto stream = target_table->write(getInnerQuery(), global_context); @@ -315,12 +353,21 @@ std::shared_ptr StorageWindowView::generateInnerTableCreateQuery auto new_columns_list = std::make_shared(); - auto storage = getParentStorage(); auto sample_block_ - = InterpreterSelectQuery(getInnerQuery(), global_context, storage, SelectQueryOptions(QueryProcessingStage::WithMergeableState)) + = InterpreterSelectQuery(getInnerQuery(), global_context, getParentStorage(), SelectQueryOptions(QueryProcessingStage::WithMergeableState)) .getSampleBlock(); auto columns_list = std::make_shared(); + + if (is_proctime_tumble) + { + auto column_window = std::make_shared(); + column_window->name = window_column_name; + column_window->type + = makeASTFunction("Tuple", std::make_shared("DateTime"), std::make_shared("DateTime")); + columns_list->children.push_back(column_window); + } + for (auto & column_ : sample_block_.getColumnsWithTypeAndName()) { ParserIdentifierWithOptionalParameters parser; @@ -331,10 +378,10 @@ std::shared_ptr StorageWindowView::generateInnerTableCreateQuery column_dec->type = ast; columns_list->children.push_back(column_dec); } - auto column_fire_status = std::make_shared(); - column_fire_status->name = "____w_end"; - column_fire_status->type = std::make_shared("DateTime"); - columns_list->children.push_back(column_fire_status); + auto column_wend = std::make_shared(); + column_wend->name = "____w_end"; + column_wend->type = std::make_shared("DateTime"); + columns_list->children.push_back(column_wend); new_columns_list->set(new_columns_list->columns, columns_list); manual_create_query->set(manual_create_query->columns_list, new_columns_list); @@ -569,6 +616,13 @@ StorageWindowView::StorageWindowView( select_table_id = StorageID(select_database_name, select_table_name); inner_query = innerQueryParser(select_query); + final_query = inner_query->clone(); + ParserProcTimeFinalMatcher::Data final_query_data; + ParserProcTimeFinalMatcher::Visitor(final_query_data).visit(final_query); + is_proctime_tumble = final_query_data.is_proctime_tumble; + if (is_proctime_tumble) + window_column_name = final_query_data.window_column_name; + /// If the table is not specified - use the table `system.one` if (select_table_name.empty()) { @@ -581,7 +635,6 @@ StorageWindowView::StorageWindowView( if (!query.to_table.empty()) target_table_id = StorageID(query.to_database, query.to_table); - is_temporary = query.temporary; inner_table_clear_interval = local_context.getSettingsRef().window_view_inner_table_clean_interval.totalSeconds(); mergeable_blocks = std::make_shared>(); @@ -590,6 +643,9 @@ StorageWindowView::StorageWindowView( if (query.watermark_function) { + if (is_proctime_tumble) + throw Exception("WATERMARK is not support for Processing time processing.", ErrorCodes::INCORRECT_QUERY); + // parser watermark function const auto & watermark_function = std::static_pointer_cast(query.watermark_function); if (!startsWith(watermark_function->name, "toInterval")) @@ -603,7 +659,7 @@ StorageWindowView::StorageWindowView( { watermark_num_units = boost::lexical_cast(interval_units_p1->value.get()); } - catch (const boost::bad_lexical_cast & exec) + catch (const boost::bad_lexical_cast &) { throw Exception("Cannot parse string '" + interval_units_p1->value.get() + "' as Integer.", ErrorCodes::CANNOT_PARSE_TEXT); } @@ -661,8 +717,6 @@ ASTPtr StorageWindowView::innerQueryParser(ASTSelectQuery & query) // parser window function ASTFunction & window_function = typeid_cast(*stageMergeableOneData.window_function); const auto & arguments = window_function.arguments->children; - const auto & arg0 = std::static_pointer_cast(arguments.at(0)); - timestamp_column_name = arg0->IAST::getAliasOrColumnName(); const auto & arg1 = std::static_pointer_cast(arguments.at(1)); if (!arg1 || !startsWith(arg1->name, "toInterval")) throw Exception("Illegal type of last argument of function " + arg1->name + " should be Interval", ErrorCodes::ILLEGAL_COLUMN); @@ -678,12 +732,17 @@ ASTPtr StorageWindowView::innerQueryParser(ASTSelectQuery & query) void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, const Block & block, const Context & context) { UInt32 timestamp_now = std::time(nullptr); + auto block_stream = std::make_shared(block); + BlockInputStreams streams; + if (window_view.is_proctime_tumble) + streams = {std::make_shared(block_stream, timestamp_now)}; + else + streams = {block_stream}; - BlockInputStreams streams = {std::make_shared(block)}; auto window_proxy_storage = std::make_shared( StorageID("", "WindowViewProxyStorage"), window_view.getParentStorage(), std::move(streams), QueryProcessingStage::FetchColumns); InterpreterSelectQuery select_block( - window_view.getInnerQuery(), context, window_proxy_storage, QueryProcessingStage::WithMergeableState); + window_view.getFinalQuery(), context, window_proxy_storage, QueryProcessingStage::WithMergeableState); auto data_mergeable_stream = std::make_shared(select_block.execute().in); @@ -701,35 +760,35 @@ void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, con actions_->add(ExpressionAction::removeColumn("____tuple_arg")); BlockInputStreamPtr in_stream; - if (window_view.watermark_num_units != 0) - { - UInt32 watermark = window_view.getWatermark(timestamp_now); - actions_->add( - ExpressionAction::addColumn({std::make_shared()->createColumnConst(1, toField(watermark)), std::make_shared(), "____watermark"})); - const auto & function_greater = FunctionFactory::instance().get("greaterOrEquals", context); - actions_->add(ExpressionAction::applyFunction(function_greater, Names{"____w_end", "____watermark"}, "____filter")); - actions_->add(ExpressionAction::removeColumn("____watermark")); - in_stream = std::make_shared(data_mergeable_stream, actions_, "____filter", true); - } - else - in_stream = std::make_shared(data_mergeable_stream, actions_); + UInt32 watermark = window_view.getWatermark(timestamp_now); + actions_->add(ExpressionAction::addColumn({std::make_shared()->createColumnConst(1, toField(watermark)), + std::make_shared(), + "____watermark"})); + const auto & function_greater = FunctionFactory::instance().get("greaterOrEquals", context); + actions_->add(ExpressionAction::applyFunction(function_greater, Names{"____w_end", "____watermark"}, "____filter")); + actions_->add(ExpressionAction::removeColumn("____watermark")); + in_stream = std::make_shared(data_mergeable_stream, actions_, "____filter", true); if (!window_view.inner_table_id.empty()) { auto stream = window_view.getInnerStorage()->write(window_view.getInnerQuery(), context); - if (window_view.watermark_num_units != 0) + if (window_view.is_proctime_tumble) + { + std::unique_lock lock(window_view.fire_signal_mutex); + copyData(*in_stream, *stream); + } + else if (window_view.watermark_num_units != 0) { while (Block block_ = in_stream->read()) { + auto column_wend = block_.getByName("____w_end").column; stream->write(std::move(block_)); const ColumnUInt32::Container & wend_data - = static_cast(*block_.getByName("____w_end").column.get()).getData(); + = static_cast(*column_wend).getData(); for (size_t i = 0; i < wend_data.size(); ++i) { if (wend_data[i] < timestamp_now) - { window_view.addFireSignal(wend_data[i]); - } } } } @@ -739,13 +798,20 @@ void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, con else { BlocksListPtr new_mergeable_blocks = std::make_shared(); + if (window_view.is_proctime_tumble) + { + std::unique_lock lock(window_view.fire_signal_mutex); + while (Block block_ = in_stream->read()) + new_mergeable_blocks->push_back(std::move(block_)); + } if (window_view.watermark_num_units != 0) { while (Block block_ = in_stream->read()) { + auto column_wend = block_.getByName("____w_end").column; new_mergeable_blocks->push_back(std::move(block_)); const ColumnUInt32::Container & wend_data - = static_cast(*block_.getByName("____w_end").column.get()).getData(); + = static_cast(*column_wend).getData(); for (size_t i = 0; i < wend_data.size(); ++i) { if (wend_data[i] < timestamp_now) @@ -766,16 +832,6 @@ void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, con } } -StoragePtr StorageWindowView::getTargetTable() const -{ - return global_context.getTable(target_table_id); -} - -StoragePtr StorageWindowView::tryGetTargetTable() const -{ - return global_context.tryGetTable(target_table_id); -} - void StorageWindowView::startup() { // Start the working thread @@ -802,45 +858,41 @@ StorageWindowView::~StorageWindowView() BlockInputStreamPtr StorageWindowView::getNewBlocksInputStreamPtr(UInt32 timestamp_) { + BlockInputStreamPtr stream; + if (!inner_table_id.empty()) - return getNewBlocksInputStreamPtrInnerTable(timestamp_); + { + auto & storage = getInnerStorage(); + InterpreterSelectQuery fetch(fetch_column_query, global_context, storage, SelectQueryOptions(QueryProcessingStage::FetchColumns)); - if (mergeable_blocks->empty()) - return {std::make_shared(getHeader())}; + ColumnsWithTypeAndName columns_; + columns_.emplace_back(nullptr, std::make_shared(), "____w_end"); - BlockInputStreams from; - auto sample_block_ = mergeable_blocks->front()->front().cloneEmpty(); - BlockInputStreamPtr stream = std::make_shared(mergeable_blocks, sample_block_, timestamp_); - from.push_back(std::move(stream)); - auto proxy_storage = std::make_shared( - StorageID("", "WindowViewProxyStorage"), getParentStorage(), std::move(from), QueryProcessingStage::WithMergeableState); - - InterpreterSelectQuery select(getInnerQuery(), global_context, proxy_storage, QueryProcessingStage::Complete); - BlockInputStreamPtr data = std::make_shared(select.execute().in); - return data; -} - -BlockInputStreamPtr StorageWindowView::getNewBlocksInputStreamPtrInnerTable(UInt32 timestamp_) -{ - auto & storage = getInnerStorage(); - InterpreterSelectQuery fetch(fetch_column_query, global_context, storage, SelectQueryOptions(QueryProcessingStage::FetchColumns)); - - ColumnsWithTypeAndName columns_; - columns_.emplace_back(nullptr, std::make_shared(), "____w_end"); - - ExpressionActionsPtr actions_ = std::make_shared(columns_, global_context); - actions_->add(ExpressionAction::addColumn({std::make_shared()->createColumnConst(1, toField(timestamp_)), std::make_shared(), "____w_end_now"})); - const auto & function_equals = FunctionFactory::instance().get("equals", global_context); - ExpressionActionsPtr apply_function_actions = std::make_shared(columns_, global_context); - actions_->add(ExpressionAction::applyFunction(function_equals, Names{"____w_end", "____w_end_now"}, "____filter")); - auto stream = std::make_shared(fetch.execute().in, actions_, "____filter", true); + ExpressionActionsPtr actions_ = std::make_shared(columns_, global_context); + actions_->add(ExpressionAction::addColumn({std::make_shared()->createColumnConst(1, toField(timestamp_)), + std::make_shared(), + "____timestamp_now"})); + const auto & function_equals = FunctionFactory::instance().get("equals", global_context); + ExpressionActionsPtr apply_function_actions = std::make_shared(columns_, global_context); + actions_->add(ExpressionAction::applyFunction(function_equals, Names{"____w_end", "____timestamp_now"}, "____filter")); + actions_->add(ExpressionAction::removeColumn("____w_end")); + actions_->add(ExpressionAction::removeColumn("____timestamp_now")); + stream = std::make_shared(fetch.execute().in, actions_, "____filter", true); + } + else + { + if (mergeable_blocks->empty()) + return std::make_shared(getHeader()); + auto sample_block_ = mergeable_blocks->front()->front().cloneEmpty(); + stream = std::make_shared(mergeable_blocks, sample_block_, timestamp_); + } BlockInputStreams from; from.push_back(std::move(stream)); auto proxy_storage = std::make_shared( StorageID("", "WindowViewProxyStorage"), getParentStorage(), std::move(from), QueryProcessingStage::WithMergeableState); - InterpreterSelectQuery select(getInnerQuery(), global_context, proxy_storage, QueryProcessingStage::Complete); + InterpreterSelectQuery select(getFinalQuery(), global_context, proxy_storage, QueryProcessingStage::Complete); BlockInputStreamPtr data = std::make_shared(select.execute().in); return data; } diff --git a/dbms/src/Storages/WindowView/StorageWindowView.h b/dbms/src/Storages/WindowView/StorageWindowView.h index b547982299a..854e666710c 100644 --- a/dbms/src/Storages/WindowView/StorageWindowView.h +++ b/dbms/src/Storages/WindowView/StorageWindowView.h @@ -25,21 +25,11 @@ public: ~StorageWindowView() override; String getName() const override { return "WindowView"; } - ASTPtr getInnerQuery() const { return inner_query->clone(); } - - /// It is passed inside the query and solved at its level. bool supportsSampling() const override { return true; } bool supportsFinal() const override { return true; } - bool isTemporary() { return is_temporary; } - - bool hasActiveUsers() { return active_ptr.use_count() > 1; } - void checkTableCanBeDropped() const override; - StoragePtr getTargetTable() const; - StoragePtr tryGetTargetTable() const; - void drop(TableStructureWriteLockHolder &) override; void startup() override; @@ -56,14 +46,70 @@ public: BlocksListPtrs getMergeableBlocksList() { return mergeable_blocks; } std::shared_ptr getActivePtr() { return active_ptr; } - /// Read new data blocks that store query result BlockInputStreamPtr getNewBlocksInputStreamPtr(UInt32 timestamp_); - BlockInputStreamPtr getNewBlocksInputStreamPtrInnerTable(UInt32 timestamp_); + static void writeIntoWindowView(StorageWindowView & window_view, const Block & block, const Context & context); - BlocksPtr getNewBlocks(); +private: + ASTPtr inner_query; + ASTPtr final_query; + ASTPtr fetch_column_query; + + Context & global_context; + bool is_proctime_tumble{false}; + std::atomic shutdown_called{false}; + mutable Block sample_block; + UInt64 inner_table_clear_interval; + const DateLUTImpl & time_zone; + std::list fire_signal; + std::list watch_streams; + std::condition_variable condition; + BlocksListPtrs mergeable_blocks; + + /// Mutex for the blocks and ready condition + std::mutex mutex; + std::mutex flush_table_mutex; + std::mutex fire_signal_mutex; + std::mutex proc_time_signal_mutex; + + /// Active users + std::shared_ptr active_ptr; + + IntervalKind::Kind window_kind; + IntervalKind::Kind watermark_kind; + Int64 window_num_units; + Int64 watermark_num_units = 0; + String window_column_name; + + StorageID select_table_id = StorageID::createEmpty(); + StorageID target_table_id = StorageID::createEmpty(); + StorageID inner_table_id = StorageID::createEmpty(); + StoragePtr parent_storage; + StoragePtr inner_storage; + StoragePtr target_storage; + + BackgroundSchedulePool::TaskHolder toTableTask; + BackgroundSchedulePool::TaskHolder innerTableClearTask; + BackgroundSchedulePool::TaskHolder fireTask; + + ASTPtr innerQueryParser(ASTSelectQuery & inner_query); + + std::shared_ptr generateInnerTableCreateQuery(const ASTCreateQuery & inner_create_query, const String & database_name, const String & table_name); + + UInt32 getWindowLowerBound(UInt32 time_sec, int window_id_skew = 0); + UInt32 getWindowUpperBound(UInt32 time_sec, int window_id_skew = 0); + UInt32 getWatermark(UInt32 time_sec); Block getHeader() const; + void flushToTable(UInt32 timestamp_); + void clearInnerTable(); + void threadFuncToTable(); + void threadFuncClearInnerTable(); + void threadFuncFire(); + void addFireSignal(UInt32 timestamp_); + + ASTPtr getInnerQuery() const { return inner_query->clone(); } + ASTPtr getFinalQuery() const { return final_query->clone(); } StoragePtr& getParentStorage() { @@ -79,68 +125,12 @@ public: return inner_storage; } - static void writeIntoWindowView(StorageWindowView & window_view, const Block & block, const Context & context); - - static void writeIntoWindowViewInnerTable(StorageWindowView & window_view, const Block & block, const Context & context); - - ASTPtr innerQueryParser(ASTSelectQuery & inner_query); - - std::shared_ptr generateInnerTableCreateQuery(const ASTCreateQuery & inner_create_query, const String & database_name, const String & table_name); - - inline UInt32 getWindowLowerBound(UInt32 time_sec, int window_id_skew = 0); - inline UInt32 getWindowUpperBound(UInt32 time_sec, int window_id_skew = 0); - inline UInt32 getWatermark(UInt32 time_sec); - -private: - StorageID select_table_id = StorageID::createEmpty(); - ASTPtr inner_query; - ASTPtr fetch_column_query; - String window_column_name; - String timestamp_column_name; - Context & global_context; - StoragePtr parent_storage; - StoragePtr inner_storage; - bool is_temporary{false}; - mutable Block sample_block; - - /// Mutex for the blocks and ready condition - std::mutex mutex; - std::mutex flush_table_mutex; - std::mutex fire_signal_mutex; - std::list fire_signal; - std::list watch_streams; - /// New blocks ready condition to broadcast to readers - /// that new blocks are available - std::condition_variable condition; - - /// Active users - std::shared_ptr active_ptr; - BlocksListPtrs mergeable_blocks; - - IntervalKind::Kind window_kind; - Int64 window_num_units; - IntervalKind::Kind watermark_kind; - Int64 watermark_num_units = 0; - const DateLUTImpl & time_zone; - - StorageID target_table_id = StorageID::createEmpty(); - StorageID inner_table_id = StorageID::createEmpty(); - - void flushToTable(UInt32 timestamp_); - void clearInnerTable(); - void threadFuncToTable(); - void threadFuncClearInnerTable(); - void threadFuncFire(); - void addFireSignal(UInt32 timestamp_); - - std::atomic shutdown_called{false}; - UInt64 inner_table_clear_interval; - - Poco::Timestamp timestamp; - - BackgroundSchedulePool::TaskHolder toTableTask; - BackgroundSchedulePool::TaskHolder innerTableClearTask; - BackgroundSchedulePool::TaskHolder fireTask; + StoragePtr& getTargetStorage() + { + if (target_storage == nullptr && !target_table_id.empty()) + target_storage = global_context.getTable(target_table_id); + return target_storage; + } StorageWindowView( const StorageID & table_id_, diff --git a/dbms/src/Storages/WindowView/WindowViewProxyStorage.h b/dbms/src/Storages/WindowView/WindowViewProxyStorage.h index b2529f861b7..e449b5a9f0c 100644 --- a/dbms/src/Storages/WindowView/WindowViewProxyStorage.h +++ b/dbms/src/Storages/WindowView/WindowViewProxyStorage.h @@ -4,6 +4,7 @@ #include #include #include +#include #include namespace DB @@ -12,11 +13,25 @@ namespace DB class WindowViewProxyStorage : public IStorage { public: + WindowViewProxyStorage(const StorageID & table_id_, StoragePtr parent_storage_, QueryProcessingStage::Enum to_stage_) + : IStorage(table_id_) + , parent_storage(std::move(parent_storage_)) + , streams({std::make_shared(Block())}) + , to_stage(to_stage_) + { + column_des = parent_storage->getColumns(); + column_des.add({"____timestamp", std::make_shared(), false}); + } + WindowViewProxyStorage(const StorageID & table_id_, StoragePtr parent_storage_, BlockInputStreams streams_, QueryProcessingStage::Enum to_stage_) : IStorage(table_id_) , parent_storage(std::move(parent_storage_)) , streams(std::move(streams_)) - , to_stage(to_stage_) {} + , to_stage(to_stage_) + { + column_des = parent_storage->getColumns(); + column_des.add({"____timestamp", std::make_shared(), false}); + } public: std::string getName() const override { return "WindowViewProxyStorage(" + parent_storage->getName() + ")"; } @@ -53,17 +68,29 @@ public: Names getColumnsRequiredForSampling() const override { return parent_storage->getColumnsRequiredForSampling(); } Names getColumnsRequiredForFinal() const override { return parent_storage->getColumnsRequiredForFinal(); } - const ColumnsDescription & getColumns() const override { return parent_storage->getColumns(); } + const ColumnsDescription & getColumns() const override { return column_des; } void setColumns(ColumnsDescription columns_) override { return parent_storage->setColumns(columns_); } - NameAndTypePair getColumn(const String & column_name) const override { return parent_storage->getColumn(column_name); } + NameAndTypePair getColumn(const String & column_name) const override + { + if (column_name == "____timestamp") + return {"____timestamp", std::shared_ptr()}; + return parent_storage->getColumn(column_name); + } - bool hasColumn(const String & column_name) const override { return parent_storage->hasColumn(column_name); } + bool hasColumn(const String & column_name) const override + { + if (column_name == "____timestamp") + return true; + return parent_storage->hasColumn(column_name); + } private: StoragePtr parent_storage; BlockInputStreams streams; + String window_column_name; + ColumnsDescription column_des; QueryProcessingStage::Enum to_stage; }; } diff --git a/dbms/tests/queries/0_stateless/01052_window_view_event_time_watch.reference b/dbms/tests/queries/0_stateless/01052_window_view_event_time_watch.reference new file mode 100644 index 00000000000..048630d233f --- /dev/null +++ b/dbms/tests/queries/0_stateless/01052_window_view_event_time_watch.reference @@ -0,0 +1,10 @@ +--TUMBLE-- +1 +--HOP-- +1 +1 +--INNER_TUMBLE-- +1 +--INNER_HOP-- +1 +1 diff --git a/dbms/tests/queries/0_stateless/01052_window_view_event_time_watch.sql b/dbms/tests/queries/0_stateless/01052_window_view_event_time_watch.sql new file mode 100644 index 00000000000..ca29f1a36e6 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01052_window_view_event_time_watch.sql @@ -0,0 +1,35 @@ +SET allow_experimental_window_view = 1; + +DROP TABLE IF EXISTS test.mt; + +SELECT '--TUMBLE--'; +DROP TABLE IF EXISTS test.wv; +CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple(); +CREATE WINDOW VIEW test.wv WATERMARK INTERVAL '1' SECOND AS SELECT count(a) FROM test.mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid; + +INSERT INTO test.mt VALUES (1, now() + INTERVAL '1' SECOND); +WATCH test.wv LIMIT 1; + +SELECT '--HOP--'; +DROP TABLE IF EXISTS test.wv; +CREATE WINDOW VIEW test.wv WATERMARK INTERVAL '1' SECOND AS SELECT count(a) FROM test.mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid; + +INSERT INTO test.mt VALUES (1, now() + INTERVAL '1' SECOND); +WATCH test.wv LIMIT 2; + +SELECT '--INNER_TUMBLE--'; +DROP TABLE IF EXISTS test.wv; +CREATE WINDOW VIEW test.wv ENGINE=MergeTree ORDER BY tuple() WATERMARK INTERVAL '1' SECOND AS SELECT count(a) FROM test.mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid; + +INSERT INTO test.mt VALUES (1, now() + INTERVAL '1' SECOND); +WATCH test.wv LIMIT 1; + +SELECT '--INNER_HOP--'; +DROP TABLE IF EXISTS test.wv; +CREATE WINDOW VIEW test.wv ENGINE=MergeTree ORDER BY tuple() WATERMARK INTERVAL '1' SECOND AS SELECT count(a) FROM test.mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid; + +INSERT INTO test.mt VALUES (1, now() + INTERVAL '1' SECOND); +WATCH test.wv LIMIT 2; + +DROP TABLE test.wv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/01052_window_view_watch_tumble.reference b/dbms/tests/queries/0_stateless/01052_window_view_watch_tumble.reference deleted file mode 100644 index d00491fd7e5..00000000000 --- a/dbms/tests/queries/0_stateless/01052_window_view_watch_tumble.reference +++ /dev/null @@ -1 +0,0 @@ -1 diff --git a/dbms/tests/queries/0_stateless/01052_window_view_watch_tumble.sql b/dbms/tests/queries/0_stateless/01052_window_view_watch_tumble.sql deleted file mode 100644 index dfa2ef9d1a6..00000000000 --- a/dbms/tests/queries/0_stateless/01052_window_view_watch_tumble.sql +++ /dev/null @@ -1,13 +0,0 @@ -SET allow_experimental_window_view = 1; - -DROP TABLE IF EXISTS test.wv; -DROP TABLE IF EXISTS test.mt; - -CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple(); -CREATE WINDOW VIEW test.wv AS SELECT count(a) FROM test.mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid; - -INSERT INTO test.mt VALUES (1, now()); -WATCH test.wv LIMIT 1; - -DROP TABLE test.wv; -DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/01053_window_view_event_time_to.reference b/dbms/tests/queries/0_stateless/01053_window_view_event_time_to.reference new file mode 100644 index 00000000000..670c8eb78c8 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01053_window_view_event_time_to.reference @@ -0,0 +1,14 @@ +--TUMBLE-- +0 +1 +--HOP-- +0 +1 +1 +--INNER_TUMBLE-- +0 +1 +--INNER_HOP-- +0 +1 +1 diff --git a/dbms/tests/queries/0_stateless/01053_window_view_event_time_to.sql b/dbms/tests/queries/0_stateless/01053_window_view_event_time_to.sql new file mode 100644 index 00000000000..fa93cffead1 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01053_window_view_event_time_to.sql @@ -0,0 +1,47 @@ +SET allow_experimental_window_view = 1; + +DROP TABLE IF EXISTS test.mt; +DROP TABLE IF EXISTS test.dst; +DROP TABLE IF EXISTS test.`.inner.wv`; + +CREATE TABLE test.dst(count UInt64) Engine=MergeTree ORDER BY tuple(); + +SELECT '--TUMBLE--'; +DROP TABLE IF EXISTS test.wv; +TRUNCATE TABLE test.dst; +CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple(); +CREATE WINDOW VIEW test.wv TO test.dst WATERMARK INTERVAL '1' SECOND AS SELECT count(a) AS count FROM test.mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid; + +INSERT INTO test.mt VALUES (1, now()); +SELECT sleep(2); +SELECT count FROM test.dst; + +SELECT '--HOP--'; +DROP TABLE IF EXISTS test.wv; +TRUNCATE TABLE test.dst; +CREATE WINDOW VIEW test.wv TO test.dst WATERMARK INTERVAL '1' SECOND AS SELECT count(a) AS count FROM test.mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid; + +INSERT INTO test.mt VALUES (1, now()); +SELECT sleep(2); +SELECT count FROM test.dst; + +SELECT '--INNER_TUMBLE--'; +DROP TABLE IF EXISTS test.wv; +TRUNCATE TABLE test.dst; +CREATE WINDOW VIEW test.wv TO test.dst ENGINE=MergeTree ORDER BY tuple() WATERMARK INTERVAL '1' SECOND AS SELECT count(a) AS count FROM test.mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid; + +INSERT INTO test.mt VALUES (1, now()); +SELECT sleep(2); +SELECT count FROM test.dst; + +SELECT '--INNER_HOP--'; +DROP TABLE IF EXISTS test.wv; +TRUNCATE TABLE test.dst; +CREATE WINDOW VIEW test.wv TO test.dst ENGINE=MergeTree ORDER BY tuple() WATERMARK INTERVAL '1' SECOND AS SELECT count(a) AS count FROM test.mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid; + +INSERT INTO test.mt VALUES (1, now()); +SELECT sleep(2); +SELECT count FROM test.dst; + +DROP TABLE test.wv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/01053_window_view_watch_hop.reference b/dbms/tests/queries/0_stateless/01053_window_view_watch_hop.reference deleted file mode 100644 index 6ed281c757a..00000000000 --- a/dbms/tests/queries/0_stateless/01053_window_view_watch_hop.reference +++ /dev/null @@ -1,2 +0,0 @@ -1 -1 diff --git a/dbms/tests/queries/0_stateless/01053_window_view_watch_hop.sql b/dbms/tests/queries/0_stateless/01053_window_view_watch_hop.sql deleted file mode 100644 index 1ddd273c103..00000000000 --- a/dbms/tests/queries/0_stateless/01053_window_view_watch_hop.sql +++ /dev/null @@ -1,13 +0,0 @@ -SET allow_experimental_window_view = 1; - -DROP TABLE IF EXISTS test.wv; -DROP TABLE IF EXISTS test.mt; - -CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple(); -CREATE WINDOW VIEW test.wv AS SELECT count(a) FROM test.mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid; - -INSERT INTO test.mt VALUES (1, now()); -WATCH test.wv LIMIT 2; - -DROP TABLE test.wv; -DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/01054_window_view_inner_watch_tumble.reference b/dbms/tests/queries/0_stateless/01054_window_view_inner_watch_tumble.reference deleted file mode 100644 index d00491fd7e5..00000000000 --- a/dbms/tests/queries/0_stateless/01054_window_view_inner_watch_tumble.reference +++ /dev/null @@ -1 +0,0 @@ -1 diff --git a/dbms/tests/queries/0_stateless/01054_window_view_inner_watch_tumble.sql b/dbms/tests/queries/0_stateless/01054_window_view_inner_watch_tumble.sql deleted file mode 100644 index 0d7c94047f1..00000000000 --- a/dbms/tests/queries/0_stateless/01054_window_view_inner_watch_tumble.sql +++ /dev/null @@ -1,13 +0,0 @@ -SET allow_experimental_window_view = 1; - -DROP TABLE IF EXISTS test.wv; -DROP TABLE IF EXISTS test.mt; - -CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple(); -CREATE WINDOW VIEW test.wv ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) FROM test.mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid; - -INSERT INTO test.mt VALUES (1, now()); -WATCH test.wv LIMIT 1; - -DROP TABLE test.wv; -DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/01054_window_view_proc_time_watch.reference b/dbms/tests/queries/0_stateless/01054_window_view_proc_time_watch.reference new file mode 100644 index 00000000000..048630d233f --- /dev/null +++ b/dbms/tests/queries/0_stateless/01054_window_view_proc_time_watch.reference @@ -0,0 +1,10 @@ +--TUMBLE-- +1 +--HOP-- +1 +1 +--INNER_TUMBLE-- +1 +--INNER_HOP-- +1 +1 diff --git a/dbms/tests/queries/0_stateless/01054_window_view_proc_time_watch.sql b/dbms/tests/queries/0_stateless/01054_window_view_proc_time_watch.sql new file mode 100644 index 00000000000..926ae776ce5 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01054_window_view_proc_time_watch.sql @@ -0,0 +1,35 @@ +SET allow_experimental_window_view = 1; + +DROP TABLE IF EXISTS test.mt; +CREATE TABLE test.mt(a Int32) ENGINE=MergeTree ORDER BY tuple(); + +SELECT '--TUMBLE--'; +DROP TABLE IF EXISTS test.wv; +CREATE WINDOW VIEW test.wv AS SELECT count(a) FROM test.mt GROUP BY TUMBLE(now(), INTERVAL '1' SECOND) AS wid; + +INSERT INTO test.mt VALUES (1); +WATCH test.wv LIMIT 1; + +SELECT '--HOP--'; +DROP TABLE IF EXISTS test.wv; +CREATE WINDOW VIEW test.wv AS SELECT count(a) FROM test.mt GROUP BY HOP(now(), INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid; + +INSERT INTO test.mt VALUES (1); +WATCH test.wv LIMIT 2; + +SELECT '--INNER_TUMBLE--'; +DROP TABLE IF EXISTS test.wv; +CREATE WINDOW VIEW test.wv ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) FROM test.mt GROUP BY TUMBLE(now(), INTERVAL '1' SECOND) AS wid; + +INSERT INTO test.mt VALUES (1); +WATCH test.wv LIMIT 1; + +SELECT '--INNER_HOP--'; +DROP TABLE IF EXISTS test.wv; +CREATE WINDOW VIEW test.wv ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) FROM test.mt GROUP BY HOP(now(), INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid; + +INSERT INTO test.mt VALUES (1); +WATCH test.wv LIMIT 2; + +DROP TABLE test.wv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/01055_window_view_inner_watch_hop.reference b/dbms/tests/queries/0_stateless/01055_window_view_inner_watch_hop.reference deleted file mode 100644 index 6ed281c757a..00000000000 --- a/dbms/tests/queries/0_stateless/01055_window_view_inner_watch_hop.reference +++ /dev/null @@ -1,2 +0,0 @@ -1 -1 diff --git a/dbms/tests/queries/0_stateless/01055_window_view_inner_watch_hop.sql b/dbms/tests/queries/0_stateless/01055_window_view_inner_watch_hop.sql deleted file mode 100644 index 35cccdaf0b8..00000000000 --- a/dbms/tests/queries/0_stateless/01055_window_view_inner_watch_hop.sql +++ /dev/null @@ -1,13 +0,0 @@ -SET allow_experimental_window_view = 1; - -DROP TABLE IF EXISTS test.wv; -DROP TABLE IF EXISTS test.mt; - -CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple(); -CREATE WINDOW VIEW test.wv ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) FROM test.mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid; - -INSERT INTO test.mt VALUES (1, now()); -WATCH test.wv LIMIT 2; - -DROP TABLE test.wv; -DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/01055_window_view_proc_time_to.reference b/dbms/tests/queries/0_stateless/01055_window_view_proc_time_to.reference new file mode 100644 index 00000000000..670c8eb78c8 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01055_window_view_proc_time_to.reference @@ -0,0 +1,14 @@ +--TUMBLE-- +0 +1 +--HOP-- +0 +1 +1 +--INNER_TUMBLE-- +0 +1 +--INNER_HOP-- +0 +1 +1 diff --git a/dbms/tests/queries/0_stateless/01055_window_view_proc_time_to.sql b/dbms/tests/queries/0_stateless/01055_window_view_proc_time_to.sql new file mode 100644 index 00000000000..38373e9af1c --- /dev/null +++ b/dbms/tests/queries/0_stateless/01055_window_view_proc_time_to.sql @@ -0,0 +1,47 @@ +SET allow_experimental_window_view = 1; + +DROP TABLE IF EXISTS test.mt; +DROP TABLE IF EXISTS test.dst; +DROP TABLE IF EXISTS test.`.inner.wv`; + +CREATE TABLE test.dst(count UInt64) Engine=MergeTree ORDER BY tuple(); +CREATE TABLE test.mt(a Int32) ENGINE=MergeTree ORDER BY tuple(); + +SELECT '--TUMBLE--'; +DROP TABLE IF EXISTS test.wv; +TRUNCATE TABLE test.dst; +CREATE WINDOW VIEW test.wv TO test.dst AS SELECT count(a) AS count FROM test.mt GROUP BY TUMBLE(now(), INTERVAL '1' SECOND) AS wid; + +INSERT INTO test.mt VALUES (1); +SELECT sleep(1); +SELECT count FROM test.dst; + +SELECT '--HOP--'; +DROP TABLE IF EXISTS test.wv; +TRUNCATE TABLE test.dst; +CREATE WINDOW VIEW test.wv TO test.dst AS SELECT count(a) AS count FROM test.mt GROUP BY HOP(now(), INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid; + +INSERT INTO test.mt VALUES (1); +SELECT sleep(2); +SELECT count FROM test.dst; + +SELECT '--INNER_TUMBLE--'; +DROP TABLE IF EXISTS test.wv; +TRUNCATE TABLE test.dst; +CREATE WINDOW VIEW test.wv TO test.dst ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) AS count FROM test.mt GROUP BY TUMBLE(now(), INTERVAL '1' SECOND) AS wid; + +INSERT INTO test.mt VALUES (1); +SELECT sleep(1); +SELECT count FROM test.dst; + +SELECT '--INNER_HOP--'; +DROP TABLE IF EXISTS test.wv; +TRUNCATE TABLE test.dst; +CREATE WINDOW VIEW test.wv TO test.dst ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) AS count FROM test.mt GROUP BY HOP(now(), INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid; + +INSERT INTO test.mt VALUES (1); +SELECT sleep(2); +SELECT count FROM test.dst; + +DROP TABLE test.wv; +DROP TABLE test.mt; diff --git a/dbms/tests/queries/0_stateless/01056_window_view_to_tumble.reference b/dbms/tests/queries/0_stateless/01056_window_view_to_tumble.reference deleted file mode 100644 index 0d66ea1aee9..00000000000 --- a/dbms/tests/queries/0_stateless/01056_window_view_to_tumble.reference +++ /dev/null @@ -1,2 +0,0 @@ -0 -1 diff --git a/dbms/tests/queries/0_stateless/01056_window_view_to_tumble.sql b/dbms/tests/queries/0_stateless/01056_window_view_to_tumble.sql deleted file mode 100644 index 1169ae06eb4..00000000000 --- a/dbms/tests/queries/0_stateless/01056_window_view_to_tumble.sql +++ /dev/null @@ -1,18 +0,0 @@ -SET allow_experimental_window_view = 1; - -DROP TABLE IF EXISTS test.wv; -DROP TABLE IF EXISTS test.mt; -DROP TABLE IF EXISTS test.dst; -DROP TABLE IF EXISTS test.`.inner.wv`; - -CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple(); -CREATE TABLE test.dst(count UInt64) Engine=MergeTree ORDER BY tuple(); -CREATE WINDOW VIEW test.wv TO test.dst AS SELECT count(a) AS count FROM test.mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid; - -INSERT INTO test.mt VALUES (1, now()); -SELECT sleep(1); -SELECT count FROM test.dst; - -DROP TABLE test.wv; -DROP TABLE test.mt; -DROP TABLE test.dst; \ No newline at end of file diff --git a/dbms/tests/queries/0_stateless/01057_window_view_to_hop.reference b/dbms/tests/queries/0_stateless/01057_window_view_to_hop.reference deleted file mode 100644 index 986394f7c0f..00000000000 --- a/dbms/tests/queries/0_stateless/01057_window_view_to_hop.reference +++ /dev/null @@ -1,3 +0,0 @@ -0 -1 -1 diff --git a/dbms/tests/queries/0_stateless/01057_window_view_to_hop.sql b/dbms/tests/queries/0_stateless/01057_window_view_to_hop.sql deleted file mode 100644 index a6227913d9d..00000000000 --- a/dbms/tests/queries/0_stateless/01057_window_view_to_hop.sql +++ /dev/null @@ -1,18 +0,0 @@ -SET allow_experimental_window_view = 1; - -DROP TABLE IF EXISTS test.wv; -DROP TABLE IF EXISTS test.mt; -DROP TABLE IF EXISTS test.dst; -DROP TABLE IF EXISTS test.`.inner.wv`; - -CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple(); -CREATE TABLE test.dst(count UInt64) ENGINE=MergeTree ORDER BY tuple(); -CREATE WINDOW VIEW test.wv TO test.dst AS SELECT count(a) AS count FROM test.mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid; - -INSERT INTO test.mt VALUES (1, now()); -SELECT sleep(3); -SELECT count FROM test.dst; - -DROP TABLE test.wv; -DROP TABLE test.mt; -DROP TABLE test.dst; \ No newline at end of file diff --git a/dbms/tests/queries/0_stateless/01058_window_view_inner_to_tumble.reference b/dbms/tests/queries/0_stateless/01058_window_view_inner_to_tumble.reference deleted file mode 100644 index 0d66ea1aee9..00000000000 --- a/dbms/tests/queries/0_stateless/01058_window_view_inner_to_tumble.reference +++ /dev/null @@ -1,2 +0,0 @@ -0 -1 diff --git a/dbms/tests/queries/0_stateless/01058_window_view_inner_to_tumble.sql b/dbms/tests/queries/0_stateless/01058_window_view_inner_to_tumble.sql deleted file mode 100644 index 1f4a580b5b3..00000000000 --- a/dbms/tests/queries/0_stateless/01058_window_view_inner_to_tumble.sql +++ /dev/null @@ -1,18 +0,0 @@ -SET allow_experimental_window_view = 1; - -DROP TABLE IF EXISTS test.wv; -DROP TABLE IF EXISTS test.mt; -DROP TABLE IF EXISTS test.dst; -DROP TABLE IF EXISTS test.`.inner.wv`; - -CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple(); -CREATE TABLE test.dst(count UInt64) ENGINE=MergeTree ORDER BY tuple(); -CREATE WINDOW VIEW test.wv TO test.dst ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) AS count FROM test.mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid; - -INSERT INTO test.mt VALUES (1, now()); -SELECT sleep(1); -SELECT count FROM test.dst; - -DROP TABLE test.wv; -DROP TABLE test.mt; -DROP TABLE test.dst; \ No newline at end of file diff --git a/dbms/tests/queries/0_stateless/01059_window_view_inner_to_hop.reference b/dbms/tests/queries/0_stateless/01059_window_view_inner_to_hop.reference deleted file mode 100644 index 986394f7c0f..00000000000 --- a/dbms/tests/queries/0_stateless/01059_window_view_inner_to_hop.reference +++ /dev/null @@ -1,3 +0,0 @@ -0 -1 -1 diff --git a/dbms/tests/queries/0_stateless/01059_window_view_inner_to_hop.sql b/dbms/tests/queries/0_stateless/01059_window_view_inner_to_hop.sql deleted file mode 100644 index 49e0f67f14a..00000000000 --- a/dbms/tests/queries/0_stateless/01059_window_view_inner_to_hop.sql +++ /dev/null @@ -1,18 +0,0 @@ -SET allow_experimental_window_view = 1; - -DROP TABLE IF EXISTS test.wv; -DROP TABLE IF EXISTS test.mt; -DROP TABLE IF EXISTS test.dst; -DROP TABLE IF EXISTS test.`.inner.wv`; - -CREATE TABLE test.mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple(); -CREATE TABLE test.dst(count UInt64) ENGINE=MergeTree ORDER BY tuple(); -CREATE WINDOW VIEW test.wv TO test.dst ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) AS count FROM test.mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '2' SECOND) AS wid; - -INSERT INTO test.mt VALUES (1, now()); -SELECT sleep(3); -SELECT count FROM test.dst; - -DROP TABLE test.wv; -DROP TABLE test.mt; -DROP TABLE test.dst; \ No newline at end of file