From 3ecd9f972a7105d2d4fc8e6f7e2ac6ccf44efd41 Mon Sep 17 00:00:00 2001 From: Vxider Date: Fri, 13 May 2022 12:51:51 +0000 Subject: [PATCH] alter table support for windowview --- src/Storages/WindowView/StorageWindowView.cpp | 223 ++++++++++++------ src/Storages/WindowView/StorageWindowView.h | 25 +- src/Storages/WindowView/WindowViewSource.h | 2 + 3 files changed, 165 insertions(+), 85 deletions(-) diff --git a/src/Storages/WindowView/StorageWindowView.cpp b/src/Storages/WindowView/StorageWindowView.cpp index 1319c864b7b..de7543a8f67 100644 --- a/src/Storages/WindowView/StorageWindowView.cpp +++ b/src/Storages/WindowView/StorageWindowView.cpp @@ -39,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -452,6 +453,65 @@ bool StorageWindowView::optimize( return getInnerStorage()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, local_context); } +void StorageWindowView::alter( + const AlterCommands & params, + ContextPtr local_context, + AlterLockHolder &) +{ + auto table_id = getStorageID(); + StorageInMemoryMetadata new_metadata = getInMemoryMetadata(); + StorageInMemoryMetadata old_metadata = getInMemoryMetadata(); + params.apply(new_metadata, local_context); + + const auto & new_select = new_metadata.select; + const auto & new_select_query = new_metadata.select.inner_query; + + auto old_inner_table_id = inner_table_id; + + modifying_query = true; + shutdown(); + + auto inner_query = initInnerQuery(new_select_query->as(), local_context); + + dropInnerTableIfAny(true, getContext()); + + /// create inner table + std::exchange(has_inner_table, true); + auto create_context = Context::createCopy(local_context); + auto inner_create_query = getInnerTableCreateQuery(inner_query, inner_table_id); + InterpreterCreateQuery create_interpreter(inner_create_query, create_context); + create_interpreter.setInternal(true); + create_interpreter.execute(); + + DatabaseCatalog::instance().addDependency(select_table_id, table_id); + DatabaseCatalog::instance().updateDependency(old_inner_table_id, table_id, inner_table_id, table_id); + + shutdown_called = false; + + clean_cache_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); }); + fire_task = getContext()->getSchedulePool().createTask( + getStorageID().getFullTableName(), [this] { is_proctime ? threadFuncFireProc() : threadFuncFireEvent(); }); + clean_cache_task->deactivate(); + fire_task->deactivate(); + + new_metadata.setSelectQuery(new_select); + + DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata); + setInMemoryMetadata(new_metadata); + + startup(); + modifying_query = false; +} + +void StorageWindowView::checkAlterIsPossible(const AlterCommands & commands, ContextPtr /*local_context*/) const +{ + for (const auto & command : commands) + { + if (!command.isCommentAlter() && command.type != AlterCommand::MODIFY_QUERY) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter of type '{}' is not supported by storage {}", command.type, getName()); + } +} + std::pair StorageWindowView::getNewBlocks(UInt32 watermark) { UInt32 w_start = addTime(watermark, window_kind, -window_num_units, *time_zone); @@ -600,13 +660,12 @@ inline void StorageWindowView::fire(UInt32 watermark) } } -std::shared_ptr StorageWindowView::getInnerTableCreateQuery( - const ASTPtr & inner_query, ASTStorage * storage, const String & database_name, const String & table_name) +ASTPtr StorageWindowView::getInnerTableCreateQuery(const ASTPtr & inner_query, const StorageID & inner_table_id) { /// We will create a query to create an internal table. auto inner_create_query = std::make_shared(); - inner_create_query->setDatabase(database_name); - inner_create_query->setTable(table_name); + inner_create_query->setDatabase(inner_table_id.getDatabaseName()); + inner_create_query->setTable(inner_table_id.getTableName()); Aliases aliases; QueryAliasesVisitor(aliases).visit(inner_query); @@ -684,33 +743,34 @@ std::shared_ptr StorageWindowView::getInnerTableCreateQuery( }; auto new_storage = std::make_shared(); - /// storage != nullptr in case create window view with ENGINE syntax - if (storage) + /// inner_storage_engine != nullptr in case create window view with ENGINE syntax + if (inner_table_engine) { - new_storage->set(new_storage->engine, storage->engine->clone()); + auto storage = inner_table_engine->as(); + new_storage->set(new_storage->engine, storage.engine->clone()); - if (storage->ttl_table) + if (storage.ttl_table) throw Exception( ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW, "TTL is not supported for inner table in Window View"); - if (!endsWith(storage->engine->name, "MergeTree")) + if (!endsWith(storage.engine->name, "MergeTree")) throw Exception( ErrorCodes::INCORRECT_QUERY, "The ENGINE of WindowView must be MergeTree family of table engines " "including the engines with replication support"); - if (storage->partition_by) - new_storage->set(new_storage->partition_by, visit(storage->partition_by)); - if (storage->primary_key) - new_storage->set(new_storage->primary_key, visit(storage->primary_key)); - if (storage->order_by) - new_storage->set(new_storage->order_by, visit(storage->order_by)); - if (storage->sample_by) - new_storage->set(new_storage->sample_by, visit(storage->sample_by)); + if (storage.partition_by) + new_storage->set(new_storage->partition_by, visit(storage.partition_by)); + if (storage.primary_key) + new_storage->set(new_storage->primary_key, visit(storage.primary_key)); + if (storage.order_by) + new_storage->set(new_storage->order_by, visit(storage.order_by)); + if (storage.sample_by) + new_storage->set(new_storage->sample_by, visit(storage.sample_by)); - if (storage->settings) - new_storage->set(new_storage->settings, storage->settings->clone()); + if (storage.settings) + new_storage->set(new_storage->settings, storage.settings->clone()); } else { @@ -969,6 +1029,7 @@ StorageWindowView::StorageWindowView( : IStorage(table_id_) , WithContext(context_->getGlobalContext()) , log(&Poco::Logger::get(fmt::format("StorageWindowView({}.{})", table_id_.database_name, table_id_.table_name))) + , clean_interval_ms(context_->getSettingsRef().window_view_clean_interval.totalMilliseconds()) { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); @@ -982,11 +1043,47 @@ StorageWindowView::StorageWindowView( ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW, "UNION is not supported for {}", getName()); - select_query = query.select->list_of_selects->children.at(0)->clone(); + /// Extract information about watermark, lateness. + eventTimeParser(query); + + target_table_id = query.to_table_id; + + if (query.storage) + inner_table_engine = query.storage->clone(); + + auto inner_query = initInnerQuery(query.select->list_of_selects->children.at(0)->as(), context_); + + if(is_proctime) + next_fire_signal = getWindowUpperBound(std::time(nullptr)); + + std::exchange(has_inner_table, true); + if (!attach_) + { + auto inner_create_query = getInnerTableCreateQuery(inner_query, inner_table_id); + auto create_context = Context::createCopy(context_); + InterpreterCreateQuery create_interpreter(inner_create_query, create_context); + create_interpreter.setInternal(true); + create_interpreter.execute(); + } + + DatabaseCatalog::instance().addDependency(select_table_id, getStorageID()); + + clean_cache_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); }); + fire_task = getContext()->getSchedulePool().createTask( + getStorageID().getFullTableName(), [this] { is_proctime ? threadFuncFireProc() : threadFuncFireEvent(); }); + clean_cache_task->deactivate(); + fire_task->deactivate(); +} + +ASTPtr StorageWindowView::initInnerQuery(ASTSelectQuery query, ContextPtr context_) +{ + select_query = query.clone(); + sample_block.clear(); + String select_database_name = getContext()->getCurrentDatabase(); String select_table_name; - auto select_query_tmp = select_query->clone(); - extractDependentTable(getContext(), select_query_tmp, select_database_name, select_table_name); + auto select_query_tmp = query.clone(); + extractDependentTable(context_, select_query_tmp, select_database_name, select_table_name); /// If the table is not specified - use the table `system.one` if (select_table_name.empty()) @@ -995,77 +1092,40 @@ StorageWindowView::StorageWindowView( select_table_name = "one"; } select_table_id = StorageID(select_database_name, select_table_name); - DatabaseCatalog::instance().addDependency(select_table_id, table_id_); /// Extract all info from query; substitute Function_tumble and Function_hop with Function_windowID. - auto inner_query = innerQueryParser(select_query->as()); + auto inner_query = innerQueryParser(query); - // Parse mergeable query + /// Parse mergeable query mergeable_query = inner_query->clone(); ReplaceFunctionNowData func_now_data; ReplaceFunctionNowVisitor(func_now_data).visit(mergeable_query); is_time_column_func_now = func_now_data.is_time_column_func_now; + if (!is_proctime && is_time_column_func_now) + throw Exception("now() is not supported for Event time processing.", ErrorCodes::INCORRECT_QUERY); if (is_time_column_func_now) window_id_name = func_now_data.window_id_name; - // Parse final query (same as mergeable query but has tumble/hop instead of windowID) + /// Parse final query (same as mergeable query but has tumble/hop instead of windowID) final_query = mergeable_query->clone(); ReplaceWindowIdMatcher::Data final_query_data; - if (is_tumble) - final_query_data.window_name = "tumble"; - else - final_query_data.window_name = "hop"; + final_query_data.window_name = is_tumble ? "tumble" : "hop"; ReplaceWindowIdMatcher::Visitor(final_query_data).visit(final_query); - is_watermark_strictly_ascending = query.is_watermark_strictly_ascending; - is_watermark_ascending = query.is_watermark_ascending; - is_watermark_bounded = query.is_watermark_bounded; - target_table_id = query.to_table_id; + window_column_name = std::regex_replace(window_id_name, std::regex("windowID"), is_tumble ? "tumble" : "hop"); - /// Extract information about watermark, lateness. - eventTimeParser(query); - - if (is_tumble) - window_column_name = std::regex_replace(window_id_name, std::regex("windowID"), "tumble"); - else - window_column_name = std::regex_replace(window_id_name, std::regex("windowID"), "hop"); - - auto generate_inner_table_name = [](const StorageID & storage_id) + auto generate_inner_table_id = [](const StorageID & storage_id) { - if (storage_id.hasUUID()) - return ".inner." + toString(storage_id.uuid); - return ".inner." + storage_id.table_name; + StorageID table_id = StorageID::createEmpty(); + table_id.database_name = storage_id.database_name; + table_id.table_name = ".inner." + (storage_id.hasUUID() ? toString(storage_id.uuid) : storage_id.table_name); + return table_id; }; + inner_table_id = generate_inner_table_id(getStorageID()); - if (attach_) - { - inner_table_id = StorageID(table_id_.database_name, generate_inner_table_name(table_id_)); - } - else - { - auto inner_create_query - = getInnerTableCreateQuery(inner_query, query.storage, table_id_.database_name, generate_inner_table_name(table_id_)); - - auto create_context = Context::createCopy(context_); - InterpreterCreateQuery create_interpreter(inner_create_query, create_context); - create_interpreter.setInternal(true); - create_interpreter.execute(); - inner_table_id = StorageID(inner_create_query->getDatabase(), inner_create_query->getTable()); - } - - clean_interval_ms = getContext()->getSettingsRef().window_view_clean_interval.totalMilliseconds(); - next_fire_signal = getWindowUpperBound(std::time(nullptr)); - - clean_cache_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); }); - if (is_proctime) - fire_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncFireProc(); }); - else - fire_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncFireEvent(); }); - clean_cache_task->deactivate(); - fire_task->deactivate(); + return inner_query; } - ASTPtr StorageWindowView::innerQueryParser(const ASTSelectQuery & query) { if (!query.groupBy()) @@ -1127,13 +1187,16 @@ ASTPtr StorageWindowView::innerQueryParser(const ASTSelectQuery & query) void StorageWindowView::eventTimeParser(const ASTCreateQuery & query) { + watermark_num_units = 0; + lateness_num_units = 0; + is_watermark_strictly_ascending = query.is_watermark_strictly_ascending; + is_watermark_ascending = query.is_watermark_ascending; + is_watermark_bounded = query.is_watermark_bounded; + if (query.is_watermark_strictly_ascending || query.is_watermark_ascending || query.is_watermark_bounded) { is_proctime = false; - if (is_time_column_func_now) - throw Exception("now() is not supported for Event time processing.", ErrorCodes::INCORRECT_QUERY); - if (query.is_watermark_ascending) { is_watermark_bounded = true; @@ -1147,6 +1210,8 @@ void StorageWindowView::eventTimeParser(const ASTCreateQuery & query) "Illegal type WATERMARK function should be Interval"); } } + else + is_proctime = true; if (query.allowed_lateness) { @@ -1155,11 +1220,16 @@ void StorageWindowView::eventTimeParser(const ASTCreateQuery & query) query.lateness_function, lateness_kind, lateness_num_units, "Illegal type ALLOWED_LATENESS function should be Interval"); } + else + allowed_lateness = false; } void StorageWindowView::writeIntoWindowView( StorageWindowView & window_view, const Block & block, ContextPtr local_context) { + while (window_view.modifying_query) + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + Pipe pipe(std::make_shared(block.cloneEmpty(), Chunk(block.getColumns(), block.rows()))); UInt32 lateness_bound = 0; @@ -1340,6 +1410,7 @@ void StorageWindowView::shutdown() { std::lock_guard lock(mutex); fire_condition.notify_all(); + fire_signal_condition.notify_all(); } clean_cache_task->deactivate(); diff --git a/src/Storages/WindowView/StorageWindowView.h b/src/Storages/WindowView/StorageWindowView.h index 782e8f2b899..e19260ef7e0 100644 --- a/src/Storages/WindowView/StorageWindowView.h +++ b/src/Storages/WindowView/StorageWindowView.h @@ -134,6 +134,10 @@ public: const Names & deduplicate_by_columns, ContextPtr context) override; + void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & table_lock_holder) override; + + void checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const override; + void startup() override; void shutdown() override; @@ -161,10 +165,11 @@ private: /// Used to fetch the mergeable state and generate the final result. e.g. SELECT * FROM * GROUP BY tumble(____timestamp, *) ASTPtr final_query; - bool is_proctime{true}; + bool is_proctime; bool is_time_column_func_now; bool is_tumble; // false if is hop std::atomic shutdown_called{false}; + std::atomic modifying_query{false}; bool has_inner_table{true}; mutable Block sample_block; UInt64 clean_interval_ms; @@ -172,10 +177,10 @@ private: UInt32 max_timestamp = 0; UInt32 max_watermark = 0; // next watermark to fire UInt32 max_fired_watermark = 0; - bool is_watermark_strictly_ascending{false}; - bool is_watermark_ascending{false}; - bool is_watermark_bounded{false}; - bool allowed_lateness{false}; + bool is_watermark_strictly_ascending; + bool is_watermark_ascending; + bool is_watermark_bounded; + bool allowed_lateness; UInt32 next_fire_signal; std::deque fire_signal; std::list> watch_streams; @@ -195,8 +200,8 @@ private: Int64 window_num_units; Int64 hop_num_units; Int64 slice_num_units; - Int64 watermark_num_units = 0; - Int64 lateness_num_units = 0; + Int64 watermark_num_units; + Int64 lateness_num_units; Int64 slide_num_units; String window_id_name; String window_id_alias; @@ -207,6 +212,8 @@ private: StorageID target_table_id = StorageID::createEmpty(); StorageID inner_table_id = StorageID::createEmpty(); + ASTPtr inner_table_engine; + BackgroundSchedulePool::TaskHolder clean_cache_task; BackgroundSchedulePool::TaskHolder fire_task; @@ -215,9 +222,8 @@ private: ASTPtr innerQueryParser(const ASTSelectQuery & query); void eventTimeParser(const ASTCreateQuery & query); + ASTPtr initInnerQuery(ASTSelectQuery query, ContextPtr context); - std::shared_ptr getInnerTableCreateQuery( - const ASTPtr & inner_query, ASTStorage * storage, const String & database_name, const String & table_name); UInt32 getCleanupBound(); ASTPtr getCleanupQuery(); @@ -235,6 +241,7 @@ private: ASTPtr getFinalQuery() const { return final_query->clone(); } ASTPtr getFetchColumnQuery(UInt32 w_start, UInt32 w_end) const; + ASTPtr getInnerTableCreateQuery(const ASTPtr & inner_query, const StorageID & inner_table_id); StoragePtr getParentStorage() const; diff --git a/src/Storages/WindowView/WindowViewSource.h b/src/Storages/WindowView/WindowViewSource.h index a726cdc8712..7b914933035 100644 --- a/src/Storages/WindowView/WindowViewSource.h +++ b/src/Storages/WindowView/WindowViewSource.h @@ -51,6 +51,8 @@ protected: Block block; UInt32 watermark; std::tie(block, watermark) = generateImpl(); + if (!block) + return Chunk(); if (is_events) { return Chunk(