windowview_multi_column_groupby

This commit is contained in:
Vxider 2022-02-24 14:06:37 +08:00
parent 90ae785e53
commit 43475f79bf
4 changed files with 94 additions and 75 deletions

View File

@ -639,10 +639,44 @@ std::shared_ptr<ASTCreateQuery> StorageWindowView::getInnerTableCreateQuery(
"The first argument of time window function should not be a constant value.", "The first argument of time window function should not be a constant value.",
ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW); ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW);
ToIdentifierMatcher::Data query_data;
query_data.window_id_name = window_id_name;
query_data.window_id_alias = window_id_alias;
ToIdentifierMatcher::Visitor to_identifier_visitor(query_data);
ReplaceFunctionNowData time_now_data;
ReplaceFunctionNowVisitor time_now_visitor(time_now_data);
ReplaceFunctionWindowMatcher::Data func_hop_data;
ReplaceFunctionWindowMatcher::Visitor func_window_visitor(func_hop_data);
DropTableIdentifierMatcher::Data drop_table_identifier_data;
DropTableIdentifierMatcher::Visitor drop_table_identifier_visitor(drop_table_identifier_data);
auto visit = [&](const IAST * ast)
{
auto node = ast->clone();
QueryNormalizer(normalizer_data).visit(node);
/// now() -> ____timestamp
if (is_time_column_func_now)
{
time_now_visitor.visit(node);
function_now_timezone = time_now_data.now_timezone;
}
drop_table_identifier_visitor.visit(node);
/// tumble/hop -> windowID
func_window_visitor.visit(node);
to_identifier_visitor.visit(node);
node->setAlias("");
return node;
};
auto new_storage = std::make_shared<ASTStorage>(); auto new_storage = std::make_shared<ASTStorage>();
/// storage != nullptr in case create window view with ENGINE syntax /// storage != nullptr in case create window view with ENGINE syntax
if (storage) if (storage)
{ {
new_storage->set(new_storage->engine, storage->engine->clone());
if (storage->ttl_table) if (storage->ttl_table)
throw Exception( throw Exception(
ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW, ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW,
@ -654,46 +688,14 @@ std::shared_ptr<ASTCreateQuery> StorageWindowView::getInnerTableCreateQuery(
"The ENGINE of WindowView must be MergeTree family of table engines " "The ENGINE of WindowView must be MergeTree family of table engines "
"including the engines with replication support"); "including the engines with replication support");
ToIdentifierMatcher::Data query_data; if (storage->partition_by)
query_data.window_id_name = window_id_name; new_storage->set(new_storage->partition_by, visit(storage->partition_by));
query_data.window_id_alias = window_id_alias; if (storage->primary_key)
ToIdentifierMatcher::Visitor to_identifier_visitor(query_data); new_storage->set(new_storage->primary_key, visit(storage->primary_key));
if (storage->order_by)
ReplaceFunctionNowData time_now_data; new_storage->set(new_storage->order_by, visit(storage->order_by));
ReplaceFunctionNowVisitor time_now_visitor(time_now_data); if (storage->sample_by)
ReplaceFunctionWindowMatcher::Data func_hop_data; new_storage->set(new_storage->sample_by, visit(storage->sample_by));
ReplaceFunctionWindowMatcher::Visitor func_window_visitor(func_hop_data);
DropTableIdentifierMatcher::Data drop_table_identifier_data;
DropTableIdentifierMatcher::Visitor drop_table_identifier_visitor(drop_table_identifier_data);
new_storage->set(new_storage->engine, storage->engine->clone());
auto visit = [&](const IAST * ast, IAST *& field)
{
if (ast)
{
auto node = ast->clone();
QueryNormalizer(normalizer_data).visit(node);
/// now() -> ____timestamp
if (is_time_column_func_now)
{
time_now_visitor.visit(node);
function_now_timezone = time_now_data.now_timezone;
}
drop_table_identifier_visitor.visit(node);
/// tumble/hop -> windowID
func_window_visitor.visit(node);
to_identifier_visitor.visit(node);
node->setAlias("");
new_storage->set(field, node);
}
};
visit(storage->partition_by, new_storage->partition_by);
visit(storage->primary_key, new_storage->primary_key);
visit(storage->order_by, new_storage->order_by);
visit(storage->sample_by, new_storage->sample_by);
if (storage->settings) if (storage->settings)
new_storage->set(new_storage->settings, storage->settings->clone()); new_storage->set(new_storage->settings, storage->settings->clone());
@ -702,8 +704,21 @@ std::shared_ptr<ASTCreateQuery> StorageWindowView::getInnerTableCreateQuery(
{ {
new_storage->set(new_storage->engine, makeASTFunction("AggregatingMergeTree")); new_storage->set(new_storage->engine, makeASTFunction("AggregatingMergeTree"));
new_storage->set(new_storage->order_by, std::make_shared<ASTIdentifier>(window_id_column_name)); if (inner_select_query->groupBy()->children.size() == 1) //GROUP BY windowID
new_storage->set(new_storage->primary_key, std::make_shared<ASTIdentifier>(window_id_column_name)); {
auto node = visit(inner_select_query->groupBy()->children[0].get());
new_storage->set(new_storage->order_by, std::make_shared<ASTIdentifier>(node->getColumnName()));
}
else
{
auto group_by_function = makeASTFunction("tuple");
for (auto & child : inner_select_query->groupBy()->children)
{
auto node = visit(child.get());
group_by_function->arguments->children.push_back(std::make_shared<ASTIdentifier>(node->getColumnName()));
}
new_storage->set(new_storage->order_by, group_by_function);
}
} }
auto new_columns = std::make_shared<ASTColumns>(); auto new_columns = std::make_shared<ASTColumns>();

View File

@ -1,34 +1,34 @@
---TUMBLE--- ---TUMBLE---
||---WINDOW COLUMN NAME--- ||---WINDOW COLUMN NAME---
CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(1))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nPRIMARY KEY `windowID(timestamp, toIntervalSecond(1))`\nORDER BY `windowID(timestamp, toIntervalSecond(1))`\nSETTINGS index_granularity = 8192 CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(1))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nORDER BY `windowID(timestamp, toIntervalSecond(1))`\nSETTINGS index_granularity = 8192
||---WINDOW COLUMN ALIAS--- ||---WINDOW COLUMN ALIAS---
CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(\'1\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nPRIMARY KEY `windowID(timestamp, toIntervalSecond(\'1\'))`\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'))`\nSETTINGS index_granularity = 8192 CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(\'1\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'))`\nSETTINGS index_granularity = 8192
||---IDENTIFIER--- ||---IDENTIFIER---
CREATE TABLE test_01048.`.inner.wv`\n(\n `b` Int32,\n `windowID(timestamp, toIntervalSecond(\'1\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nPRIMARY KEY `windowID(timestamp, toIntervalSecond(\'1\'))`\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'))`\nSETTINGS index_granularity = 8192 CREATE TABLE test_01048.`.inner.wv`\n(\n `b` Int32,\n `windowID(timestamp, toIntervalSecond(\'1\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nORDER BY (b, `windowID(timestamp, toIntervalSecond(\'1\'))`)\nSETTINGS index_granularity = 8192
CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(\'1\'))` UInt32,\n `b` Int32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nPRIMARY KEY `windowID(timestamp, toIntervalSecond(\'1\'))`\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'))`\nSETTINGS index_granularity = 8192 CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(\'1\'))` UInt32,\n `b` Int32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nORDER BY (`windowID(timestamp, toIntervalSecond(\'1\'))`, b)\nSETTINGS index_granularity = 8192
||---FUNCTION--- ||---FUNCTION---
CREATE TABLE test_01048.`.inner.wv`\n(\n `plus(a, b)` Int64,\n `windowID(timestamp, toIntervalSecond(\'1\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nPRIMARY KEY `windowID(timestamp, toIntervalSecond(\'1\'))`\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'))`\nSETTINGS index_granularity = 8192 CREATE TABLE test_01048.`.inner.wv`\n(\n `plus(a, b)` Int64,\n `windowID(timestamp, toIntervalSecond(\'1\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nORDER BY (`plus(a, b)`, `windowID(timestamp, toIntervalSecond(\'1\'))`)\nSETTINGS index_granularity = 8192
CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(\'1\'))` UInt32,\n `plus(a, b)` Int64,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nPRIMARY KEY `windowID(timestamp, toIntervalSecond(\'1\'))`\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'))`\nSETTINGS index_granularity = 8192 CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(\'1\'))` UInt32,\n `plus(a, b)` Int64,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nORDER BY (`windowID(timestamp, toIntervalSecond(\'1\'))`, `plus(a, b)`)\nSETTINGS index_granularity = 8192
||---TimeZone--- ||---TimeZone---
CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(\'1\'), \'Asia/Shanghai\')` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nPRIMARY KEY `windowID(timestamp, toIntervalSecond(\'1\'), \'Asia/Shanghai\')`\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'), \'Asia/Shanghai\')`\nSETTINGS index_granularity = 8192 CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(\'1\'), \'Asia/Shanghai\')` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'), \'Asia/Shanghai\')`\nSETTINGS index_granularity = 8192
||---DATA COLUMN ALIAS--- ||---DATA COLUMN ALIAS---
CREATE TABLE test_01048.`.inner.wv`\n(\n `b` Int32,\n `windowID(timestamp, toIntervalSecond(\'1\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nPRIMARY KEY `windowID(timestamp, toIntervalSecond(\'1\'))`\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'))`\nSETTINGS index_granularity = 8192 CREATE TABLE test_01048.`.inner.wv`\n(\n `b` Int32,\n `windowID(timestamp, toIntervalSecond(\'1\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nORDER BY (b, `windowID(timestamp, toIntervalSecond(\'1\'))`)\nSETTINGS index_granularity = 8192
||---JOIN--- ||---JOIN---
CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(\'1\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32),\n `count(mt_2.b)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nPRIMARY KEY `windowID(timestamp, toIntervalSecond(\'1\'))`\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'))`\nSETTINGS index_granularity = 8192 CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(\'1\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32),\n `count(mt_2.b)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'))`\nSETTINGS index_granularity = 8192
---HOP--- ---HOP---
||---WINDOW COLUMN NAME--- ||---WINDOW COLUMN NAME---
CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(1), toIntervalSecond(3))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nPRIMARY KEY `windowID(timestamp, toIntervalSecond(1), toIntervalSecond(3))`\nORDER BY `windowID(timestamp, toIntervalSecond(1), toIntervalSecond(3))`\nSETTINGS index_granularity = 8192 CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(1), toIntervalSecond(3))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nORDER BY `windowID(timestamp, toIntervalSecond(1), toIntervalSecond(3))`\nSETTINGS index_granularity = 8192
||---WINDOW COLUMN ALIAS--- ||---WINDOW COLUMN ALIAS---
CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nPRIMARY KEY `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`\nSETTINGS index_granularity = 8192 CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`\nSETTINGS index_granularity = 8192
||---IDENTIFIER--- ||---IDENTIFIER---
CREATE TABLE test_01048.`.inner.wv`\n(\n `b` Int32,\n `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nPRIMARY KEY `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`\nSETTINGS index_granularity = 8192 CREATE TABLE test_01048.`.inner.wv`\n(\n `b` Int32,\n `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nORDER BY (b, `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`)\nSETTINGS index_granularity = 8192
CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))` UInt32,\n `b` Int32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nPRIMARY KEY `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`\nSETTINGS index_granularity = 8192 CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))` UInt32,\n `b` Int32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nORDER BY (`windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`, b)\nSETTINGS index_granularity = 8192
||---FUNCTION--- ||---FUNCTION---
CREATE TABLE test_01048.`.inner.wv`\n(\n `plus(a, b)` Int64,\n `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nPRIMARY KEY `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`\nSETTINGS index_granularity = 8192 CREATE TABLE test_01048.`.inner.wv`\n(\n `plus(a, b)` Int64,\n `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nORDER BY (`plus(a, b)`, `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`)\nSETTINGS index_granularity = 8192
CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))` UInt32,\n `plus(a, b)` Int64,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nPRIMARY KEY `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`\nSETTINGS index_granularity = 8192 CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))` UInt32,\n `plus(a, b)` Int64,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nORDER BY (`windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`, `plus(a, b)`)\nSETTINGS index_granularity = 8192
||---TimeZone--- ||---TimeZone---
CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(1), toIntervalSecond(3), \'Asia/Shanghai\')` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nPRIMARY KEY `windowID(timestamp, toIntervalSecond(1), toIntervalSecond(3), \'Asia/Shanghai\')`\nORDER BY `windowID(timestamp, toIntervalSecond(1), toIntervalSecond(3), \'Asia/Shanghai\')`\nSETTINGS index_granularity = 8192 CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(1), toIntervalSecond(3), \'Asia/Shanghai\')` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nORDER BY `windowID(timestamp, toIntervalSecond(1), toIntervalSecond(3), \'Asia/Shanghai\')`\nSETTINGS index_granularity = 8192
||---DATA COLUMN ALIAS--- ||---DATA COLUMN ALIAS---
CREATE TABLE test_01048.`.inner.wv`\n(\n `b` Int32,\n `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nPRIMARY KEY `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`\nSETTINGS index_granularity = 8192 CREATE TABLE test_01048.`.inner.wv`\n(\n `b` Int32,\n `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nORDER BY (b, `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`)\nSETTINGS index_granularity = 8192
||---JOIN--- ||---JOIN---
CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32),\n `count(mt_2.b)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nPRIMARY KEY `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`\nSETTINGS index_granularity = 8192 CREATE TABLE test_01048.`.inner.wv`\n(\n `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))` UInt32,\n `count(a)` AggregateFunction(count, Int32),\n `count(mt_2.b)` AggregateFunction(count, Int32)\n)\nENGINE = AggregatingMergeTree\nORDER BY `windowID(timestamp, toIntervalSecond(\'1\'), toIntervalSecond(\'3\'))`\nSETTINGS index_granularity = 8192

View File

@ -1,3 +1,7 @@
3 1990-01-01 12:00:05 1 1 1990-01-01 12:00:05
2 1990-01-01 12:00:10 1 2 1990-01-01 12:00:05
2 1990-01-01 12:00:15 1 3 1990-01-01 12:00:05
1 4 1990-01-01 12:00:10
1 5 1990-01-01 12:00:10
1 6 1990-01-01 12:00:15
1 7 1990-01-01 12:00:15

View File

@ -10,22 +10,22 @@ DROP TABLE IF EXISTS mt;
DROP TABLE IF EXISTS dst; DROP TABLE IF EXISTS dst;
DROP TABLE IF EXISTS wv; DROP TABLE IF EXISTS wv;
CREATE TABLE dst(count UInt64, w_end DateTime) Engine=MergeTree ORDER BY tuple(); CREATE TABLE dst(count UInt64, market Int32, w_end DateTime) Engine=MergeTree ORDER BY tuple();
CREATE TABLE mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple(); CREATE TABLE mt(a Int32, market Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW wv TO dst WATERMARK=ASCENDING AS SELECT count(a) AS count, tumbleEnd(wid) AS w_end FROM mt GROUP BY tumble(timestamp, INTERVAL '5' SECOND, 'US/Samoa') AS wid; CREATE WINDOW VIEW wv TO dst WATERMARK=ASCENDING AS SELECT count(a) AS count, market, tumbleEnd(wid) AS w_end FROM mt GROUP BY tumble(timestamp, INTERVAL '5' SECOND, 'US/Samoa') AS wid, market;
INSERT INTO mt VALUES (1, '1990/01/01 12:00:00'); INSERT INTO mt VALUES (1, 1, '1990/01/01 12:00:00');
INSERT INTO mt VALUES (1, '1990/01/01 12:00:01'); INSERT INTO mt VALUES (1, 2, '1990/01/01 12:00:01');
INSERT INTO mt VALUES (1, '1990/01/01 12:00:02'); INSERT INTO mt VALUES (1, 3, '1990/01/01 12:00:02');
INSERT INTO mt VALUES (1, '1990/01/01 12:00:05'); INSERT INTO mt VALUES (1, 4, '1990/01/01 12:00:05');
INSERT INTO mt VALUES (1, '1990/01/01 12:00:06'); INSERT INTO mt VALUES (1, 5, '1990/01/01 12:00:06');
INSERT INTO mt VALUES (1, '1990/01/01 12:00:10'); INSERT INTO mt VALUES (1, 6, '1990/01/01 12:00:10');
INSERT INTO mt VALUES (1, '1990/01/01 12:00:11'); INSERT INTO mt VALUES (1, 7, '1990/01/01 12:00:11');
INSERT INTO mt VALUES (1, '1990/01/01 12:00:30'); INSERT INTO mt VALUES (1, 8, '1990/01/01 12:00:30');
EOF EOF
while true; do while true; do
$CLICKHOUSE_CLIENT --query="SELECT count(*) FROM dst" | grep -q "3" && break || sleep .5 ||: $CLICKHOUSE_CLIENT --query="SELECT count(*) FROM dst" | grep -q "7" && break || sleep .5 ||:
done done
$CLICKHOUSE_CLIENT --query="SELECT * FROM dst ORDER BY w_end;" $CLICKHOUSE_CLIENT --query="SELECT * FROM dst ORDER BY w_end;"