diff --git a/dbms/src/Storages/WindowView/StorageWindowView.cpp b/dbms/src/Storages/WindowView/StorageWindowView.cpp index b929f1d2b88..e0e358a2544 100644 --- a/dbms/src/Storages/WindowView/StorageWindowView.cpp +++ b/dbms/src/Storages/WindowView/StorageWindowView.cpp @@ -24,10 +24,12 @@ #include #include #include +#include #include #include #include #include +#include #include #include #include @@ -62,108 +64,115 @@ namespace { const auto RESCHEDULE_MS = 500; - class ParserStageMergeableOneMatcher + struct StageMergeableVisitorData { - public: - using Visitor = InDepthNodeVisitor; + using TypeToVisit = ASTFunction; - struct Data - { - ASTPtr window_function; - String window_column_name; - String timestamp_column_name; - bool is_tumble = false; - bool is_hop = false; - }; + ASTPtr window_function; + String window_column_name; + String window_column_alias; + String timestamp_column_name; + bool is_tumble = false; + bool is_hop = false; - static bool needChildVisit(ASTPtr & node, const ASTPtr &) - { - if (node->as()) - return false; - return true; - } - - static void visit(ASTPtr & ast, Data & data) - { - if (const auto * t = ast->as()) - visit(*t, ast, data); - } - - private: - static void visit(const ASTFunction & node, ASTPtr & node_ptr, Data & data) + void visit(const ASTFunction & node, ASTPtr & node_ptr) { if (node.name == "TUMBLE") { - if (!data.window_function) + if (!window_function) { - data.is_tumble = true; - data.window_column_name = node.getColumnName(); - data.window_function = node.clone(); - data.timestamp_column_name = node.arguments->children[0]->getColumnName(); + is_tumble = true; + window_column_name = node.getColumnName(); + window_column_alias = node.alias; + window_function = node.clone(); + timestamp_column_name = node.arguments->children[0]->getColumnName(); } - else if (serializeAST(node) != serializeAST(*data.window_function)) + else if (serializeAST(node) != serializeAST(*window_function)) throw Exception("WINDOW VIEW only support ONE WINDOW FUNCTION", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW); } else if (node.name == "HOP") { - if (!data.window_function) + if (!window_function) { - data.is_hop = true; - data.window_function = node.clone(); - data.timestamp_column_name = node.arguments->children[0]->getColumnName(); + is_hop = true; + window_function = node.clone(); + timestamp_column_name = node.arguments->children[0]->getColumnName(); auto ptr_ = node.clone(); std::static_pointer_cast(ptr_)->setAlias(""); auto arrayJoin = makeASTFunction("arrayJoin", ptr_); arrayJoin->alias = node.alias; node_ptr = arrayJoin; - data.window_column_name = arrayJoin->getColumnName(); + window_column_name = arrayJoin->getColumnName(); + window_column_alias = arrayJoin->alias; } - else if (serializeAST(node) != serializeAST(*data.window_function)) + else if (serializeAST(node) != serializeAST(*window_function)) throw Exception("WINDOW VIEW only support ONE WINDOW FUNCTION", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW); } } }; - class ParserProcTimeFinalMatcher + struct ReplaceFuncNowVisitorData + { + using TypeToVisit = ASTFunction; + + bool is_time_column_func_now = false; + String window_column_name; + + void visit(ASTFunction & node, ASTPtr & node_ptr) + { + if (node.name == "TUMBLE") + { + if (const auto * t = node.arguments->children[0]->as(); t && t->name == "now") + { + is_time_column_func_now = true; + node_ptr->children[0]->children[0] = std::make_shared("____timestamp"); + window_column_name = node.getColumnName(); + } + } + else if (node.name == "HOP") + { + if (const auto * t = node.arguments->children[0]->as(); t && t->name == "now") + is_time_column_func_now = true; + } + } + }; + + class ReplaceFunctionWindowMatcher { public: - using Visitor = InDepthNodeVisitor; + using Visitor = InDepthNodeVisitor; struct Data { - bool is_time_column_now = false; String window_column_name; + String window_column_alias; }; - static bool needChildVisit(ASTPtr &, const ASTPtr &) - { - return true; - } + static bool needChildVisit(ASTPtr &, const ASTPtr &) { return true; } static void visit(ASTPtr & ast, Data & data) { if (const auto * t = ast->as()) visit(*t, ast, data); + if (const auto * t = ast->as()) + visit(*t, ast, data); } private: static void visit(const ASTFunction & node, ASTPtr & node_ptr, Data & data) { - if (node.name == "TUMBLE") + if (node.name == "TUMBLE" || node.name == "HOP") { - if (const auto * t = node.arguments->children[0]->as(); t && t->name == "now") - { - data.is_time_column_now = true; - node_ptr->children[0]->children[0] = std::make_shared("____timestamp"); - data.window_column_name = node.getColumnName(); - } - } - else if (node.name == "HOP") - { - if (const auto * t = node.arguments->children[0]->as(); t && t->name == "now") - data.is_time_column_now = true; + if (queryToString(node) == data.window_column_name) + node_ptr = std::make_shared(data.window_column_name); } } + + static void visit(const ASTIdentifier & node, ASTPtr & node_ptr, Data & data) + { + if (node.name == data.window_column_alias) + node_ptr = std::make_shared(data.window_column_name); + } }; static inline IntervalKind strToIntervalKind(const String& interval_str) @@ -478,7 +487,7 @@ std::shared_ptr StorageWindowView::generateInnerTableCreateQuery auto columns_list = std::make_shared(); - if (is_time_column_now && is_tumble) + if (is_time_column_func_now && is_tumble) { auto column_window = std::make_shared(); column_window->name = window_column_name; @@ -502,9 +511,57 @@ std::shared_ptr StorageWindowView::generateInnerTableCreateQuery column_wend->type = std::make_shared("DateTime"); columns_list->children.push_back(column_wend); + if (inner_create_query.storage->ttl_table) + throw Exception("TTL is not supported for inner table in Window View", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW); + + ReplaceFunctionWindowMatcher::Data query_data; + query_data.window_column_name = window_column_name; + query_data.window_column_alias = window_column_alias; + ReplaceFunctionWindowMatcher::Visitor visitor(query_data); + + ReplaceFuncNowVisitorData parser_proc_time_data; + InDepthNodeVisitor, true> time_now_visitor(parser_proc_time_data); + + auto storage = std::make_shared(); + storage->set(storage->engine, inner_create_query.storage->engine->clone()); + if (inner_create_query.storage->partition_by) + { + auto partition_by = inner_create_query.storage->partition_by->clone(); + if (is_time_column_func_now) + time_now_visitor.visit(partition_by); + visitor.visit(partition_by); + storage->set(storage->partition_by, partition_by); + } + if (inner_create_query.storage->primary_key) + { + auto primary_key = inner_create_query.storage->primary_key->clone(); + if (is_time_column_func_now) + time_now_visitor.visit(primary_key); + visitor.visit(primary_key); + storage->set(storage->primary_key, primary_key); + } + if (inner_create_query.storage->order_by) + { + auto order_by = inner_create_query.storage->order_by->clone(); + if (is_time_column_func_now) + time_now_visitor.visit(order_by); + visitor.visit(order_by); + storage->set(storage->order_by, order_by); + } + if (inner_create_query.storage->sample_by) + { + auto sample_by = inner_create_query.storage->sample_by->clone(); + if (is_time_column_func_now) + time_now_visitor.visit(sample_by); + visitor.visit(sample_by); + storage->set(storage->sample_by, sample_by); + } + if (inner_create_query.storage->settings) + storage->set(storage->settings, inner_create_query.storage->settings->clone()); + new_columns_list->set(new_columns_list->columns, columns_list); manual_create_query->set(manual_create_query->columns_list, new_columns_list); - manual_create_query->set(manual_create_query->storage, inner_create_query.storage->ptr()); + manual_create_query->set(manual_create_query->storage, storage); return manual_create_query; } @@ -731,10 +788,10 @@ StorageWindowView::StorageWindowView( inner_query = innerQueryParser(select_query); final_query = inner_query->clone(); - ParserProcTimeFinalMatcher::Data final_query_data; - ParserProcTimeFinalMatcher::Visitor(final_query_data).visit(final_query); - is_time_column_now = final_query_data.is_time_column_now; - if (is_time_column_now && is_tumble) + ReplaceFuncNowVisitorData final_query_data; + InDepthNodeVisitor, true>(final_query_data).visit(final_query); + is_time_column_func_now = final_query_data.is_time_column_func_now; + if (is_time_column_func_now && is_tumble) window_column_name = final_query_data.window_column_name; is_watermark_strictly_ascending = query.is_watermark_strictly_ascending; is_watermark_ascending = query.is_watermark_ascending; @@ -758,7 +815,7 @@ StorageWindowView::StorageWindowView( if (query.is_watermark_strictly_ascending || query.is_watermark_ascending || query.is_watermark_bounded) { is_proctime = false; - if (is_time_column_now) + if (is_time_column_func_now) throw Exception("now() is not support for Event time processing.", ErrorCodes::INCORRECT_QUERY); if (query.is_watermark_ascending) { @@ -820,7 +877,7 @@ StorageWindowView::StorageWindowView( } else { - if (query.storage->engine->name != "MergeTree") + if (!endsWith(query.storage->engine->name, "MergeTree")) throw Exception( "The ENGINE of WindowView must be MergeTree family of table engines including the engines with replication support", ErrorCodes::INCORRECT_QUERY); @@ -870,16 +927,17 @@ ASTPtr StorageWindowView::innerQueryParser(ASTSelectQuery & query) // parse stage mergeable ASTPtr result = query.clone(); ASTPtr expr_list = result; - ParserStageMergeableOneMatcher::Data stageMergeableOneData; - ParserStageMergeableOneMatcher::Visitor(stageMergeableOneData).visit(expr_list); - if (!stageMergeableOneData.is_tumble && !stageMergeableOneData.is_hop) + StageMergeableVisitorData stageMergeableData; + InDepthNodeVisitor, true>(stageMergeableData).visit(expr_list); + if (!stageMergeableData.is_tumble && !stageMergeableData.is_hop) throw Exception("WINDOW FUNCTION is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY); - window_column_name = stageMergeableOneData.window_column_name; - timestamp_column_name = stageMergeableOneData.timestamp_column_name; - is_tumble = stageMergeableOneData.is_tumble; + window_column_name = stageMergeableData.window_column_name; + window_column_alias = stageMergeableData.window_column_alias; + timestamp_column_name = stageMergeableData.timestamp_column_name; + is_tumble = stageMergeableData.is_tumble; // parser window function - ASTFunction & window_function = typeid_cast(*stageMergeableOneData.window_function); + ASTFunction & window_function = typeid_cast(*stageMergeableData.window_function); const auto & arguments = window_function.arguments->children; const auto & arg1 = std::static_pointer_cast(arguments.at(1)); if (!arg1 || !startsWith(arg1->name, "toInterval")) diff --git a/dbms/src/Storages/WindowView/StorageWindowView.h b/dbms/src/Storages/WindowView/StorageWindowView.h index 8c814d16c14..87460d5280d 100644 --- a/dbms/src/Storages/WindowView/StorageWindowView.h +++ b/dbms/src/Storages/WindowView/StorageWindowView.h @@ -59,7 +59,7 @@ private: Context & global_context; bool is_proctime{true}; - bool is_time_column_now; + bool is_time_column_func_now; bool is_tumble; // false if is hop std::atomic shutdown_called{false}; mutable Block sample_block; @@ -95,6 +95,7 @@ private: Int64 watermark_num_units = 0; Int64 lateness_num_units = 0; String window_column_name; + String window_column_alias; String timestamp_column_name; StorageID select_table_id = StorageID::createEmpty(); diff --git a/dbms/tests/queries/0_stateless/01054_window_view_proc_now_tumble_inner_to.sql b/dbms/tests/queries/0_stateless/01054_window_view_proc_now_tumble_inner_to.sql index 477b8ed4fae..a184d43ebbc 100755 --- a/dbms/tests/queries/0_stateless/01054_window_view_proc_now_tumble_inner_to.sql +++ b/dbms/tests/queries/0_stateless/01054_window_view_proc_now_tumble_inner_to.sql @@ -7,7 +7,7 @@ DROP TABLE IF EXISTS wv; CREATE TABLE dst(count UInt64) Engine=MergeTree ORDER BY tuple(); CREATE TABLE mt(a Int32) ENGINE=MergeTree ORDER BY tuple(); -CREATE WINDOW VIEW wv TO dst ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) AS count FROM mt GROUP BY TUMBLE(now(), INTERVAL '1' SECOND) AS wid; +CREATE WINDOW VIEW wv TO dst ENGINE=AggregatingMergeTree PARTITION BY wid ORDER BY tuple(TUMBLE(now(), INTERVAL '1' SECOND)) AS SELECT count(a) AS count FROM mt GROUP BY TUMBLE(now(), INTERVAL '1' SECOND) AS wid; INSERT INTO mt VALUES (1); SELECT sleep(2); diff --git a/dbms/tests/queries/0_stateless/01056_window_view_proc_now_hop_inner_to.sql b/dbms/tests/queries/0_stateless/01056_window_view_proc_now_hop_inner_to.sql index a41fabdccf9..89c78784ab3 100755 --- a/dbms/tests/queries/0_stateless/01056_window_view_proc_now_hop_inner_to.sql +++ b/dbms/tests/queries/0_stateless/01056_window_view_proc_now_hop_inner_to.sql @@ -7,7 +7,7 @@ DROP TABLE IF EXISTS wv; CREATE TABLE dst(count UInt64) Engine=MergeTree ORDER BY tuple(); CREATE TABLE mt(a Int32) ENGINE=MergeTree ORDER BY tuple(); -CREATE WINDOW VIEW wv to dst ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) AS count FROM mt GROUP BY HOP(now(), INTERVAL '1' SECOND, INTERVAL '1' SECOND) AS wid; +CREATE WINDOW VIEW wv to dst ENGINE=AggregatingMergeTree ORDER BY tuple(wid) AS SELECT count(a) AS count FROM mt GROUP BY HOP(now(), INTERVAL '1' SECOND, INTERVAL '1' SECOND) AS wid; INSERT INTO mt VALUES (1); SELECT sleep(2); diff --git a/dbms/tests/queries/0_stateless/01058_window_view_proc_tumble_inner_to.sql b/dbms/tests/queries/0_stateless/01058_window_view_proc_tumble_inner_to.sql index 5af4267033d..f4226f24cec 100755 --- a/dbms/tests/queries/0_stateless/01058_window_view_proc_tumble_inner_to.sql +++ b/dbms/tests/queries/0_stateless/01058_window_view_proc_tumble_inner_to.sql @@ -7,7 +7,7 @@ DROP TABLE IF EXISTS wv; CREATE TABLE dst(count UInt64) Engine=MergeTree ORDER BY tuple(); CREATE TABLE mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple(); -CREATE WINDOW VIEW wv TO dst ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) AS count FROM mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid; +CREATE WINDOW VIEW wv TO dst ENGINE=AggregatingMergeTree ORDER BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS SELECT count(a) AS count FROM mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid; INSERT INTO mt VALUES (1, now()); SELECT sleep(2); diff --git a/dbms/tests/queries/0_stateless/01060_window_view_proc_hop_inner_to.sql b/dbms/tests/queries/0_stateless/01060_window_view_proc_hop_inner_to.sql index 125d98c6b25..0a6a4fff2cb 100755 --- a/dbms/tests/queries/0_stateless/01060_window_view_proc_hop_inner_to.sql +++ b/dbms/tests/queries/0_stateless/01060_window_view_proc_hop_inner_to.sql @@ -7,7 +7,7 @@ DROP TABLE IF EXISTS wv; CREATE TABLE dst(count UInt64) Engine=MergeTree ORDER BY tuple(); CREATE TABLE mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple(); -CREATE WINDOW VIEW wv TO dst ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) AS count FROM mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '1' SECOND) AS wid; +CREATE WINDOW VIEW wv TO dst ENGINE=AggregatingMergeTree ORDER BY wid AS SELECT count(a) AS count FROM mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '1' SECOND) AS wid; INSERT INTO mt VALUES (1, now()); SELECT sleep(2); diff --git a/dbms/tests/queries/0_stateless/01063_window_view_event_strict_asc_tumble_inner_to.sql b/dbms/tests/queries/0_stateless/01063_window_view_event_strict_asc_tumble_inner_to.sql index 5025f2a075f..f4ad0b6dcd6 100644 --- a/dbms/tests/queries/0_stateless/01063_window_view_event_strict_asc_tumble_inner_to.sql +++ b/dbms/tests/queries/0_stateless/01063_window_view_event_strict_asc_tumble_inner_to.sql @@ -7,7 +7,7 @@ DROP TABLE IF EXISTS wv; CREATE TABLE dst(count UInt64, w_end DateTime) Engine=MergeTree ORDER BY tuple(); CREATE TABLE mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple(); -CREATE WINDOW VIEW wv TO dst ENGINE=MergeTree ORDER BY tuple() WATERMARK=STRICTLY_ASCENDING AS SELECT count(a) AS count, TUMBLE_END(wid) as w_end FROM mt GROUP BY TUMBLE(timestamp, INTERVAL '5' SECOND) AS wid; +CREATE WINDOW VIEW wv TO dst ENGINE=AggregatingMergeTree ORDER BY wid WATERMARK=STRICTLY_ASCENDING AS SELECT count(a) AS count, TUMBLE_END(wid) as w_end FROM mt GROUP BY TUMBLE(timestamp, INTERVAL '5' SECOND) AS wid; INSERT INTO mt VALUES (1, '1990/01/01 12:00:00'); INSERT INTO mt VALUES (1, '1990/01/01 12:00:01'); diff --git a/dbms/tests/queries/0_stateless/01065_window_view_event_strict_asc_hop_inner_to.sql b/dbms/tests/queries/0_stateless/01065_window_view_event_strict_asc_hop_inner_to.sql index 872f39649ef..34502703f14 100644 --- a/dbms/tests/queries/0_stateless/01065_window_view_event_strict_asc_hop_inner_to.sql +++ b/dbms/tests/queries/0_stateless/01065_window_view_event_strict_asc_hop_inner_to.sql @@ -7,7 +7,7 @@ DROP TABLE IF EXISTS wv; CREATE TABLE dst(count UInt64, w_end DateTime) Engine=MergeTree ORDER BY tuple(); CREATE TABLE mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple(); -CREATE WINDOW VIEW wv TO dst ENGINE=MergeTree() order by tuple() WATERMARK=STRICTLY_ASCENDING AS SELECT count(a) AS count, HOP_END(wid) as w_end FROM mt GROUP BY HOP(timestamp, INTERVAL '2' SECOND, INTERVAL '3' SECOND) AS wid; +CREATE WINDOW VIEW wv TO dst ENGINE=AggregatingMergeTree() order by wid WATERMARK=STRICTLY_ASCENDING AS SELECT count(a) AS count, HOP_END(wid) as w_end FROM mt GROUP BY HOP(timestamp, INTERVAL '2' SECOND, INTERVAL '3' SECOND) AS wid; INSERT INTO mt VALUES (1, '1990/01/01 12:00:00'); INSERT INTO mt VALUES (1, '1990/01/01 12:00:01'); diff --git a/dbms/tests/queries/0_stateless/01068_window_view_event_asc_tumble_inner_to.sql b/dbms/tests/queries/0_stateless/01068_window_view_event_asc_tumble_inner_to.sql index 13aae098fcb..e50b94dd269 100644 --- a/dbms/tests/queries/0_stateless/01068_window_view_event_asc_tumble_inner_to.sql +++ b/dbms/tests/queries/0_stateless/01068_window_view_event_asc_tumble_inner_to.sql @@ -7,7 +7,7 @@ DROP TABLE IF EXISTS wv; CREATE TABLE dst(count UInt64, w_end DateTime) Engine=MergeTree ORDER BY tuple(); CREATE TABLE mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple(); -CREATE WINDOW VIEW wv TO dst ENGINE=MergeTree ORDER BY tuple() WATERMARK=ASCENDING AS SELECT count(a) AS count, TUMBLE_END(wid) AS w_end FROM mt GROUP BY TUMBLE(timestamp, INTERVAL '5' SECOND) AS wid; +CREATE WINDOW VIEW wv TO dst ENGINE=AggregatingMergeTree ORDER BY wid WATERMARK=ASCENDING AS SELECT count(a) AS count, TUMBLE_END(wid) AS w_end FROM mt GROUP BY TUMBLE(timestamp, INTERVAL '5' SECOND) AS wid; INSERT INTO mt VALUES (1, '1990/01/01 12:00:00'); INSERT INTO mt VALUES (1, '1990/01/01 12:00:01'); diff --git a/dbms/tests/queries/0_stateless/01070_window_view_event_asc_hop_inner_to.sql b/dbms/tests/queries/0_stateless/01070_window_view_event_asc_hop_inner_to.sql index 34aae857a29..e889761f3dd 100644 --- a/dbms/tests/queries/0_stateless/01070_window_view_event_asc_hop_inner_to.sql +++ b/dbms/tests/queries/0_stateless/01070_window_view_event_asc_hop_inner_to.sql @@ -7,7 +7,7 @@ DROP TABLE IF EXISTS wv; CREATE TABLE dst(count UInt64, w_end DateTime) Engine=MergeTree ORDER BY tuple(); CREATE TABLE mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple(); -CREATE WINDOW VIEW wv TO dst ENGINE=MergeTree ORDER BY tuple() WATERMARK=ASCENDING AS SELECT count(a) AS count, HOP_END(wid) AS w_end FROM mt GROUP BY HOP(timestamp, INTERVAL '2' SECOND, INTERVAL '3' SECOND) AS wid; +CREATE WINDOW VIEW wv TO dst ENGINE=AggregatingMergeTree ORDER BY wid WATERMARK=ASCENDING AS SELECT count(a) AS count, HOP_END(wid) AS w_end FROM mt GROUP BY HOP(timestamp, INTERVAL '2' SECOND, INTERVAL '3' SECOND) AS wid; INSERT INTO mt VALUES (1, '1990/01/01 12:00:00'); INSERT INTO mt VALUES (1, '1990/01/01 12:00:01'); diff --git a/dbms/tests/queries/0_stateless/01075_window_view_event_bounded_hop_inner_to.sql b/dbms/tests/queries/0_stateless/01075_window_view_event_bounded_hop_inner_to.sql index ca4969123a0..1f2179182e1 100644 --- a/dbms/tests/queries/0_stateless/01075_window_view_event_bounded_hop_inner_to.sql +++ b/dbms/tests/queries/0_stateless/01075_window_view_event_bounded_hop_inner_to.sql @@ -7,7 +7,7 @@ DROP TABLE IF EXISTS wv; CREATE TABLE dst(count UInt64, w_end DateTime) Engine=MergeTree ORDER BY tuple(); CREATE TABLE mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple(); -CREATE WINDOW VIEW wv TO dst ENGINE=MergeTree ORDER BY tuple() WATERMARK=INTERVAL '2' SECOND AS SELECT count(a) AS count, HOP_END(wid) AS w_end FROM mt GROUP BY HOP(timestamp, INTERVAL '2' SECOND, INTERVAL '3' SECOND) AS wid; +CREATE WINDOW VIEW wv TO dst ENGINE=AggregatingMergeTree ORDER BY wid WATERMARK=INTERVAL '2' SECOND AS SELECT count(a) AS count, HOP_END(wid) AS w_end FROM mt GROUP BY HOP(timestamp, INTERVAL '2' SECOND, INTERVAL '3' SECOND) AS wid; INSERT INTO mt VALUES (1, '1990/01/01 12:00:00'); INSERT INTO mt VALUES (1, '1990/01/01 12:00:01');