Fix attach

This commit is contained in:
kssenii 2021-11-20 12:03:39 +03:00
parent 9bdad8ab64
commit 5c9509d68b
3 changed files with 10 additions and 8 deletions

View File

@ -65,10 +65,7 @@ public:
bool is_window_view{false}; bool is_window_view{false};
bool is_populate{false}; bool is_populate{false};
bool replace_view{false}; /// CREATE OR REPLACE VIEW bool replace_view{false}; /// CREATE OR REPLACE VIEW
bool is_watermark_strictly_ascending{false}; /// STRICTLY ASCENDING WATERMARK STRATEGY FOR WINDOW VIEW
bool is_watermark_ascending{false}; /// ASCENDING WATERMARK STRATEGY FOR WINDOW VIEW
bool is_watermark_bounded{false}; /// BOUNDED OUT OF ORDERNESS WATERMARK STRATEGY FOR WINDOW VIEW
bool allowed_lateness{false}; /// ALLOWED LATENESS FOR WINDOW VIEW
ASTColumns * columns_list = nullptr; ASTColumns * columns_list = nullptr;
ASTExpressionList * tables = nullptr; ASTExpressionList * tables = nullptr;
@ -90,6 +87,11 @@ public:
std::optional<UInt64> live_view_timeout; /// For CREATE LIVE VIEW ... WITH TIMEOUT ... std::optional<UInt64> live_view_timeout; /// For CREATE LIVE VIEW ... WITH TIMEOUT ...
std::optional<UInt64> live_view_periodic_refresh; /// For CREATE LIVE VIEW ... WITH [PERIODIC] REFRESH ... std::optional<UInt64> live_view_periodic_refresh; /// For CREATE LIVE VIEW ... WITH [PERIODIC] REFRESH ...
bool is_watermark_strictly_ascending{false}; /// STRICTLY ASCENDING WATERMARK STRATEGY FOR WINDOW VIEW
bool is_watermark_ascending{false}; /// ASCENDING WATERMARK STRATEGY FOR WINDOW VIEW
bool is_watermark_bounded{false}; /// BOUNDED OUT OF ORDERNESS WATERMARK STRATEGY FOR WINDOW VIEW
bool allowed_lateness{false}; /// ALLOWED LATENESS FOR WINDOW VIEW
bool attach_short_syntax{false}; bool attach_short_syntax{false};
std::optional<String> attach_from_path = std::nullopt; std::optional<String> attach_from_path = std::nullopt;

View File

@ -805,7 +805,6 @@ bool ParserCreateWindowViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected &
return false; return false;
} }
if (!s_window.ignore(pos, expected)) if (!s_window.ignore(pos, expected))
return false; return false;
@ -818,7 +817,6 @@ bool ParserCreateWindowViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected &
if (!table_name_p.parse(pos, table, expected)) if (!table_name_p.parse(pos, table, expected))
return false; return false;
if (ParserKeyword{"ON"}.ignore(pos, expected)) if (ParserKeyword{"ON"}.ignore(pos, expected))
{ {
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected)) if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
@ -888,6 +886,7 @@ bool ParserCreateWindowViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected &
StorageID table_id = table->as<ASTTableIdentifier>()->getTableId(); StorageID table_id = table->as<ASTTableIdentifier>()->getTableId();
query->database = table_id.database_name; query->database = table_id.database_name;
query->table = table_id.table_name; query->table = table_id.table_name;
query->uuid = table_id.uuid;
query->cluster = cluster_str; query->cluster = cluster_str;
if (to_table) if (to_table)

View File

@ -514,6 +514,8 @@ std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
PullingAsyncPipelineExecutor executor(pipeline); PullingAsyncPipelineExecutor executor(pipeline);
Block block; Block block;
BlocksPtr new_blocks = std::make_shared<Blocks>(); BlocksPtr new_blocks = std::make_shared<Blocks>();
/// TODO: Get rid of this blocks collected in memory...
while (executor.pull(block)) while (executor.pull(block))
{ {
if (block.rows() == 0) if (block.rows() == 0)
@ -1059,7 +1061,6 @@ ASTPtr StorageWindowView::innerQueryParser(ASTSelectQuery & query)
void StorageWindowView::eventTimeParser(const ASTCreateQuery & query) void StorageWindowView::eventTimeParser(const ASTCreateQuery & query)
{ {
/// TODO: This is sad that we add a lot of only StorageWindowView related information into ASTCreateQuery... :(
if (query.is_watermark_strictly_ascending || query.is_watermark_ascending || query.is_watermark_bounded) if (query.is_watermark_strictly_ascending || query.is_watermark_ascending || query.is_watermark_bounded)
{ {
is_proctime = false; is_proctime = false;
@ -1119,7 +1120,7 @@ void StorageWindowView::writeIntoWindowView(
lateness_bound = watermark_lower_bound; lateness_bound = watermark_lower_bound;
} }
} }
else if (! window_view.is_time_column_func_now) else if (!window_view.is_time_column_func_now)
{ {
lateness_bound = t_max_fired_watermark; lateness_bound = t_max_fired_watermark;
} }