support inner table params

This commit is contained in:
Vxider 2020-03-22 23:03:16 +08:00
parent 8486f24168
commit 05f5062a09
11 changed files with 141 additions and 82 deletions

View File

@ -24,10 +24,12 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTWatchQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/queryToString.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
@ -62,108 +64,115 @@ namespace
{
const auto RESCHEDULE_MS = 500;
class ParserStageMergeableOneMatcher
struct StageMergeableVisitorData
{
public:
using Visitor = InDepthNodeVisitor<ParserStageMergeableOneMatcher, true>;
using TypeToVisit = ASTFunction;
struct Data
{
ASTPtr window_function;
String window_column_name;
String timestamp_column_name;
bool is_tumble = false;
bool is_hop = false;
};
ASTPtr window_function;
String window_column_name;
String window_column_alias;
String timestamp_column_name;
bool is_tumble = false;
bool is_hop = false;
static bool needChildVisit(ASTPtr & node, const ASTPtr &)
{
if (node->as<ASTFunction>())
return false;
return true;
}
static void visit(ASTPtr & ast, Data & data)
{
if (const auto * t = ast->as<ASTFunction>())
visit(*t, ast, data);
}
private:
static void visit(const ASTFunction & node, ASTPtr & node_ptr, Data & data)
void visit(const ASTFunction & node, ASTPtr & node_ptr)
{
if (node.name == "TUMBLE")
{
if (!data.window_function)
if (!window_function)
{
data.is_tumble = true;
data.window_column_name = node.getColumnName();
data.window_function = node.clone();
data.timestamp_column_name = node.arguments->children[0]->getColumnName();
is_tumble = true;
window_column_name = node.getColumnName();
window_column_alias = node.alias;
window_function = node.clone();
timestamp_column_name = node.arguments->children[0]->getColumnName();
}
else if (serializeAST(node) != serializeAST(*data.window_function))
else if (serializeAST(node) != serializeAST(*window_function))
throw Exception("WINDOW VIEW only support ONE WINDOW FUNCTION", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW);
}
else if (node.name == "HOP")
{
if (!data.window_function)
if (!window_function)
{
data.is_hop = true;
data.window_function = node.clone();
data.timestamp_column_name = node.arguments->children[0]->getColumnName();
is_hop = true;
window_function = node.clone();
timestamp_column_name = node.arguments->children[0]->getColumnName();
auto ptr_ = node.clone();
std::static_pointer_cast<ASTFunction>(ptr_)->setAlias("");
auto arrayJoin = makeASTFunction("arrayJoin", ptr_);
arrayJoin->alias = node.alias;
node_ptr = arrayJoin;
data.window_column_name = arrayJoin->getColumnName();
window_column_name = arrayJoin->getColumnName();
window_column_alias = arrayJoin->alias;
}
else if (serializeAST(node) != serializeAST(*data.window_function))
else if (serializeAST(node) != serializeAST(*window_function))
throw Exception("WINDOW VIEW only support ONE WINDOW FUNCTION", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW);
}
}
};
class ParserProcTimeFinalMatcher
struct ReplaceFuncNowVisitorData
{
using TypeToVisit = ASTFunction;
bool is_time_column_func_now = false;
String window_column_name;
void visit(ASTFunction & node, ASTPtr & node_ptr)
{
if (node.name == "TUMBLE")
{
if (const auto * t = node.arguments->children[0]->as<ASTFunction>(); t && t->name == "now")
{
is_time_column_func_now = true;
node_ptr->children[0]->children[0] = std::make_shared<ASTIdentifier>("____timestamp");
window_column_name = node.getColumnName();
}
}
else if (node.name == "HOP")
{
if (const auto * t = node.arguments->children[0]->as<ASTFunction>(); t && t->name == "now")
is_time_column_func_now = true;
}
}
};
class ReplaceFunctionWindowMatcher
{
public:
using Visitor = InDepthNodeVisitor<ParserProcTimeFinalMatcher, true>;
using Visitor = InDepthNodeVisitor<ReplaceFunctionWindowMatcher, true>;
struct Data
{
bool is_time_column_now = false;
String window_column_name;
String window_column_alias;
};
static bool needChildVisit(ASTPtr &, const ASTPtr &)
{
return true;
}
static bool needChildVisit(ASTPtr &, const ASTPtr &) { return true; }
static void visit(ASTPtr & ast, Data & data)
{
if (const auto * t = ast->as<ASTFunction>())
visit(*t, ast, data);
if (const auto * t = ast->as<ASTIdentifier>())
visit(*t, ast, data);
}
private:
static void visit(const ASTFunction & node, ASTPtr & node_ptr, Data & data)
{
if (node.name == "TUMBLE")
if (node.name == "TUMBLE" || node.name == "HOP")
{
if (const auto * t = node.arguments->children[0]->as<ASTFunction>(); t && t->name == "now")
{
data.is_time_column_now = true;
node_ptr->children[0]->children[0] = std::make_shared<ASTIdentifier>("____timestamp");
data.window_column_name = node.getColumnName();
}
}
else if (node.name == "HOP")
{
if (const auto * t = node.arguments->children[0]->as<ASTFunction>(); t && t->name == "now")
data.is_time_column_now = true;
if (queryToString(node) == data.window_column_name)
node_ptr = std::make_shared<ASTIdentifier>(data.window_column_name);
}
}
static void visit(const ASTIdentifier & node, ASTPtr & node_ptr, Data & data)
{
if (node.name == data.window_column_alias)
node_ptr = std::make_shared<ASTIdentifier>(data.window_column_name);
}
};
static inline IntervalKind strToIntervalKind(const String& interval_str)
@ -478,7 +487,7 @@ std::shared_ptr<ASTCreateQuery> StorageWindowView::generateInnerTableCreateQuery
auto columns_list = std::make_shared<ASTExpressionList>();
if (is_time_column_now && is_tumble)
if (is_time_column_func_now && is_tumble)
{
auto column_window = std::make_shared<ASTColumnDeclaration>();
column_window->name = window_column_name;
@ -502,9 +511,57 @@ std::shared_ptr<ASTCreateQuery> StorageWindowView::generateInnerTableCreateQuery
column_wend->type = std::make_shared<ASTIdentifier>("DateTime");
columns_list->children.push_back(column_wend);
if (inner_create_query.storage->ttl_table)
throw Exception("TTL is not supported for inner table in Window View", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW);
ReplaceFunctionWindowMatcher::Data query_data;
query_data.window_column_name = window_column_name;
query_data.window_column_alias = window_column_alias;
ReplaceFunctionWindowMatcher::Visitor visitor(query_data);
ReplaceFuncNowVisitorData parser_proc_time_data;
InDepthNodeVisitor<OneTypeMatcher<ReplaceFuncNowVisitorData>, true> time_now_visitor(parser_proc_time_data);
auto storage = std::make_shared<ASTStorage>();
storage->set(storage->engine, inner_create_query.storage->engine->clone());
if (inner_create_query.storage->partition_by)
{
auto partition_by = inner_create_query.storage->partition_by->clone();
if (is_time_column_func_now)
time_now_visitor.visit(partition_by);
visitor.visit(partition_by);
storage->set(storage->partition_by, partition_by);
}
if (inner_create_query.storage->primary_key)
{
auto primary_key = inner_create_query.storage->primary_key->clone();
if (is_time_column_func_now)
time_now_visitor.visit(primary_key);
visitor.visit(primary_key);
storage->set(storage->primary_key, primary_key);
}
if (inner_create_query.storage->order_by)
{
auto order_by = inner_create_query.storage->order_by->clone();
if (is_time_column_func_now)
time_now_visitor.visit(order_by);
visitor.visit(order_by);
storage->set(storage->order_by, order_by);
}
if (inner_create_query.storage->sample_by)
{
auto sample_by = inner_create_query.storage->sample_by->clone();
if (is_time_column_func_now)
time_now_visitor.visit(sample_by);
visitor.visit(sample_by);
storage->set(storage->sample_by, sample_by);
}
if (inner_create_query.storage->settings)
storage->set(storage->settings, inner_create_query.storage->settings->clone());
new_columns_list->set(new_columns_list->columns, columns_list);
manual_create_query->set(manual_create_query->columns_list, new_columns_list);
manual_create_query->set(manual_create_query->storage, inner_create_query.storage->ptr());
manual_create_query->set(manual_create_query->storage, storage);
return manual_create_query;
}
@ -731,10 +788,10 @@ StorageWindowView::StorageWindowView(
inner_query = innerQueryParser(select_query);
final_query = inner_query->clone();
ParserProcTimeFinalMatcher::Data final_query_data;
ParserProcTimeFinalMatcher::Visitor(final_query_data).visit(final_query);
is_time_column_now = final_query_data.is_time_column_now;
if (is_time_column_now && is_tumble)
ReplaceFuncNowVisitorData final_query_data;
InDepthNodeVisitor<OneTypeMatcher<ReplaceFuncNowVisitorData>, true>(final_query_data).visit(final_query);
is_time_column_func_now = final_query_data.is_time_column_func_now;
if (is_time_column_func_now && is_tumble)
window_column_name = final_query_data.window_column_name;
is_watermark_strictly_ascending = query.is_watermark_strictly_ascending;
is_watermark_ascending = query.is_watermark_ascending;
@ -758,7 +815,7 @@ StorageWindowView::StorageWindowView(
if (query.is_watermark_strictly_ascending || query.is_watermark_ascending || query.is_watermark_bounded)
{
is_proctime = false;
if (is_time_column_now)
if (is_time_column_func_now)
throw Exception("now() is not support for Event time processing.", ErrorCodes::INCORRECT_QUERY);
if (query.is_watermark_ascending)
{
@ -820,7 +877,7 @@ StorageWindowView::StorageWindowView(
}
else
{
if (query.storage->engine->name != "MergeTree")
if (!endsWith(query.storage->engine->name, "MergeTree"))
throw Exception(
"The ENGINE of WindowView must be MergeTree family of table engines including the engines with replication support",
ErrorCodes::INCORRECT_QUERY);
@ -870,16 +927,17 @@ ASTPtr StorageWindowView::innerQueryParser(ASTSelectQuery & query)
// parse stage mergeable
ASTPtr result = query.clone();
ASTPtr expr_list = result;
ParserStageMergeableOneMatcher::Data stageMergeableOneData;
ParserStageMergeableOneMatcher::Visitor(stageMergeableOneData).visit(expr_list);
if (!stageMergeableOneData.is_tumble && !stageMergeableOneData.is_hop)
StageMergeableVisitorData stageMergeableData;
InDepthNodeVisitor<OneTypeMatcher<StageMergeableVisitorData, false>, true>(stageMergeableData).visit(expr_list);
if (!stageMergeableData.is_tumble && !stageMergeableData.is_hop)
throw Exception("WINDOW FUNCTION is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);
window_column_name = stageMergeableOneData.window_column_name;
timestamp_column_name = stageMergeableOneData.timestamp_column_name;
is_tumble = stageMergeableOneData.is_tumble;
window_column_name = stageMergeableData.window_column_name;
window_column_alias = stageMergeableData.window_column_alias;
timestamp_column_name = stageMergeableData.timestamp_column_name;
is_tumble = stageMergeableData.is_tumble;
// parser window function
ASTFunction & window_function = typeid_cast<ASTFunction &>(*stageMergeableOneData.window_function);
ASTFunction & window_function = typeid_cast<ASTFunction &>(*stageMergeableData.window_function);
const auto & arguments = window_function.arguments->children;
const auto & arg1 = std::static_pointer_cast<ASTFunction>(arguments.at(1));
if (!arg1 || !startsWith(arg1->name, "toInterval"))

View File

@ -59,7 +59,7 @@ private:
Context & global_context;
bool is_proctime{true};
bool is_time_column_now;
bool is_time_column_func_now;
bool is_tumble; // false if is hop
std::atomic<bool> shutdown_called{false};
mutable Block sample_block;
@ -95,6 +95,7 @@ private:
Int64 watermark_num_units = 0;
Int64 lateness_num_units = 0;
String window_column_name;
String window_column_alias;
String timestamp_column_name;
StorageID select_table_id = StorageID::createEmpty();

View File

@ -7,7 +7,7 @@ DROP TABLE IF EXISTS wv;
CREATE TABLE dst(count UInt64) Engine=MergeTree ORDER BY tuple();
CREATE TABLE mt(a Int32) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW wv TO dst ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) AS count FROM mt GROUP BY TUMBLE(now(), INTERVAL '1' SECOND) AS wid;
CREATE WINDOW VIEW wv TO dst ENGINE=AggregatingMergeTree PARTITION BY wid ORDER BY tuple(TUMBLE(now(), INTERVAL '1' SECOND)) AS SELECT count(a) AS count FROM mt GROUP BY TUMBLE(now(), INTERVAL '1' SECOND) AS wid;
INSERT INTO mt VALUES (1);
SELECT sleep(2);

View File

@ -7,7 +7,7 @@ DROP TABLE IF EXISTS wv;
CREATE TABLE dst(count UInt64) Engine=MergeTree ORDER BY tuple();
CREATE TABLE mt(a Int32) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW wv to dst ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) AS count FROM mt GROUP BY HOP(now(), INTERVAL '1' SECOND, INTERVAL '1' SECOND) AS wid;
CREATE WINDOW VIEW wv to dst ENGINE=AggregatingMergeTree ORDER BY tuple(wid) AS SELECT count(a) AS count FROM mt GROUP BY HOP(now(), INTERVAL '1' SECOND, INTERVAL '1' SECOND) AS wid;
INSERT INTO mt VALUES (1);
SELECT sleep(2);

View File

@ -7,7 +7,7 @@ DROP TABLE IF EXISTS wv;
CREATE TABLE dst(count UInt64) Engine=MergeTree ORDER BY tuple();
CREATE TABLE mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW wv TO dst ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) AS count FROM mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid;
CREATE WINDOW VIEW wv TO dst ENGINE=AggregatingMergeTree ORDER BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS SELECT count(a) AS count FROM mt GROUP BY TUMBLE(timestamp, INTERVAL '1' SECOND) AS wid;
INSERT INTO mt VALUES (1, now());
SELECT sleep(2);

View File

@ -7,7 +7,7 @@ DROP TABLE IF EXISTS wv;
CREATE TABLE dst(count UInt64) Engine=MergeTree ORDER BY tuple();
CREATE TABLE mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW wv TO dst ENGINE=MergeTree ORDER BY tuple() AS SELECT count(a) AS count FROM mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '1' SECOND) AS wid;
CREATE WINDOW VIEW wv TO dst ENGINE=AggregatingMergeTree ORDER BY wid AS SELECT count(a) AS count FROM mt GROUP BY HOP(timestamp, INTERVAL '1' SECOND, INTERVAL '1' SECOND) AS wid;
INSERT INTO mt VALUES (1, now());
SELECT sleep(2);

View File

@ -7,7 +7,7 @@ DROP TABLE IF EXISTS wv;
CREATE TABLE dst(count UInt64, w_end DateTime) Engine=MergeTree ORDER BY tuple();
CREATE TABLE mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW wv TO dst ENGINE=MergeTree ORDER BY tuple() WATERMARK=STRICTLY_ASCENDING AS SELECT count(a) AS count, TUMBLE_END(wid) as w_end FROM mt GROUP BY TUMBLE(timestamp, INTERVAL '5' SECOND) AS wid;
CREATE WINDOW VIEW wv TO dst ENGINE=AggregatingMergeTree ORDER BY wid WATERMARK=STRICTLY_ASCENDING AS SELECT count(a) AS count, TUMBLE_END(wid) as w_end FROM mt GROUP BY TUMBLE(timestamp, INTERVAL '5' SECOND) AS wid;
INSERT INTO mt VALUES (1, '1990/01/01 12:00:00');
INSERT INTO mt VALUES (1, '1990/01/01 12:00:01');

View File

@ -7,7 +7,7 @@ DROP TABLE IF EXISTS wv;
CREATE TABLE dst(count UInt64, w_end DateTime) Engine=MergeTree ORDER BY tuple();
CREATE TABLE mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW wv TO dst ENGINE=MergeTree() order by tuple() WATERMARK=STRICTLY_ASCENDING AS SELECT count(a) AS count, HOP_END(wid) as w_end FROM mt GROUP BY HOP(timestamp, INTERVAL '2' SECOND, INTERVAL '3' SECOND) AS wid;
CREATE WINDOW VIEW wv TO dst ENGINE=AggregatingMergeTree() order by wid WATERMARK=STRICTLY_ASCENDING AS SELECT count(a) AS count, HOP_END(wid) as w_end FROM mt GROUP BY HOP(timestamp, INTERVAL '2' SECOND, INTERVAL '3' SECOND) AS wid;
INSERT INTO mt VALUES (1, '1990/01/01 12:00:00');
INSERT INTO mt VALUES (1, '1990/01/01 12:00:01');

View File

@ -7,7 +7,7 @@ DROP TABLE IF EXISTS wv;
CREATE TABLE dst(count UInt64, w_end DateTime) Engine=MergeTree ORDER BY tuple();
CREATE TABLE mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW wv TO dst ENGINE=MergeTree ORDER BY tuple() WATERMARK=ASCENDING AS SELECT count(a) AS count, TUMBLE_END(wid) AS w_end FROM mt GROUP BY TUMBLE(timestamp, INTERVAL '5' SECOND) AS wid;
CREATE WINDOW VIEW wv TO dst ENGINE=AggregatingMergeTree ORDER BY wid WATERMARK=ASCENDING AS SELECT count(a) AS count, TUMBLE_END(wid) AS w_end FROM mt GROUP BY TUMBLE(timestamp, INTERVAL '5' SECOND) AS wid;
INSERT INTO mt VALUES (1, '1990/01/01 12:00:00');
INSERT INTO mt VALUES (1, '1990/01/01 12:00:01');

View File

@ -7,7 +7,7 @@ DROP TABLE IF EXISTS wv;
CREATE TABLE dst(count UInt64, w_end DateTime) Engine=MergeTree ORDER BY tuple();
CREATE TABLE mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW wv TO dst ENGINE=MergeTree ORDER BY tuple() WATERMARK=ASCENDING AS SELECT count(a) AS count, HOP_END(wid) AS w_end FROM mt GROUP BY HOP(timestamp, INTERVAL '2' SECOND, INTERVAL '3' SECOND) AS wid;
CREATE WINDOW VIEW wv TO dst ENGINE=AggregatingMergeTree ORDER BY wid WATERMARK=ASCENDING AS SELECT count(a) AS count, HOP_END(wid) AS w_end FROM mt GROUP BY HOP(timestamp, INTERVAL '2' SECOND, INTERVAL '3' SECOND) AS wid;
INSERT INTO mt VALUES (1, '1990/01/01 12:00:00');
INSERT INTO mt VALUES (1, '1990/01/01 12:00:01');

View File

@ -7,7 +7,7 @@ DROP TABLE IF EXISTS wv;
CREATE TABLE dst(count UInt64, w_end DateTime) Engine=MergeTree ORDER BY tuple();
CREATE TABLE mt(a Int32, timestamp DateTime) ENGINE=MergeTree ORDER BY tuple();
CREATE WINDOW VIEW wv TO dst ENGINE=MergeTree ORDER BY tuple() WATERMARK=INTERVAL '2' SECOND AS SELECT count(a) AS count, HOP_END(wid) AS w_end FROM mt GROUP BY HOP(timestamp, INTERVAL '2' SECOND, INTERVAL '3' SECOND) AS wid;
CREATE WINDOW VIEW wv TO dst ENGINE=AggregatingMergeTree ORDER BY wid WATERMARK=INTERVAL '2' SECOND AS SELECT count(a) AS count, HOP_END(wid) AS w_end FROM mt GROUP BY HOP(timestamp, INTERVAL '2' SECOND, INTERVAL '3' SECOND) AS wid;
INSERT INTO mt VALUES (1, '1990/01/01 12:00:00');
INSERT INTO mt VALUES (1, '1990/01/01 12:00:01');