#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int INCORRECT_QUERY; extern const int TABLE_WAS_NOT_DROPPED; extern const int QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW; extern const int SUPPORT_IS_DISABLED; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; } namespace { const auto RESCHEDULE_MS = 500; class ParserStageMergeableOneMatcher { public: using Visitor = InDepthNodeVisitor; struct Data { ASTPtr window_function; String window_column_name; // String window_column_name_or_alias; bool is_tumble = false; bool is_hop = false; }; static bool needChildVisit(ASTPtr & node, const ASTPtr &) { if (node->as()) return false; return true; } static void visit(ASTPtr & ast, Data & data) { if (const auto * t = ast->as()) visit(*t, ast, data); } private: static void visit(const ASTFunction & node, ASTPtr & node_ptr, Data & data) { if (node.name == "TUMBLE") { if (!data.window_function) { data.is_tumble = true; data.window_column_name = node.getColumnName(); data.window_function = node.clone(); } else if (serializeAST(node) != serializeAST(*data.window_function)) throw Exception("WINDOW VIEW only support ONE WINDOW FUNCTION", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW); } else if (node.name == "HOP") { if (!data.window_function) { data.is_hop = true; data.window_function = node.clone(); auto ptr_ = node.clone(); std::static_pointer_cast(ptr_)->setAlias(""); auto arrayJoin = makeASTFunction("arrayJoin", ptr_); arrayJoin->alias = node.alias; data.window_column_name = arrayJoin->getColumnName(); node_ptr = arrayJoin; } else if (serializeAST(node) != serializeAST(*data.window_function)) throw Exception("WINDOW VIEW only support ONE 
WINDOW FUNCTION", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW); } } }; } static void extractDependentTable(ASTSelectQuery & query, String & select_database_name, String & select_table_name) { auto db_and_table = getDatabaseAndTable(query, 0); ASTPtr subquery = extractTableExpression(query, 0); if (!db_and_table && !subquery) return; if (db_and_table) { select_table_name = db_and_table->table; if (db_and_table->database.empty()) { db_and_table->database = select_database_name; AddDefaultDatabaseVisitor visitor(select_database_name); visitor.visit(query); } else select_database_name = db_and_table->database; } else if (auto * ast_select = subquery->as()) { if (ast_select->list_of_selects->children.size() != 1) throw Exception("UNION is not supported for WINDOW VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW); auto & inner_query = ast_select->list_of_selects->children.at(0); extractDependentTable(inner_query->as(), select_database_name, select_table_name); } else throw Exception( "Logical error while creating StorageWindowView." " Could not retrieve table name from select query.", DB::ErrorCodes::LOGICAL_ERROR); } void StorageWindowView::checkTableCanBeDropped() const { Dependencies dependencies = global_context.getDependencies(database_name, table_name); if (!dependencies.empty()) { DatabaseAndTableName database_and_table_name = dependencies.front(); throw Exception( "Table has dependency " + database_and_table_name.first + "." 
            + database_and_table_name.second,
            ErrorCodes::TABLE_WAS_NOT_DROPPED);
    }
}

void StorageWindowView::drop(TableStructureWriteLockHolder &)
{
    global_context.removeDependency(
        DatabaseAndTableName(select_database_name, select_table_name),
        DatabaseAndTableName(database_name, table_name));

    std::lock_guard lock(mutex);
    is_dropped = true;
    condition.notify_all();
}

inline void StorageWindowView::flushToTable()
{
    //write into dependent table
    StoragePtr target_table = getTargetTable();
    auto _blockInputStreamPtr = getNewBlocksInputStreamPtr();
    auto _lock = target_table->lockStructureForShare(true, global_context.getCurrentQueryId());
    auto stream = target_table->write(getInnerQuery(), global_context);
    copyData(*_blockInputStreamPtr, *stream);
}

UInt32 StorageWindowView::getWindowUpperBound(UInt32 time_sec)
{
    switch (window_kind)
    {
#define CASE_WINDOW_KIND(KIND) \
    case IntervalKind::KIND: \
    { \
        UInt32 start = ToStartOfTransform::execute(time_sec, window_num_units, time_zone); \
        return AddTime::execute(start, window_num_units, time_zone); \
    }
        CASE_WINDOW_KIND(Second);
        CASE_WINDOW_KIND(Minute);
        CASE_WINDOW_KIND(Hour);
        CASE_WINDOW_KIND(Day);
        CASE_WINDOW_KIND(Week);
        CASE_WINDOW_KIND(Month);
        CASE_WINDOW_KIND(Quarter);
        CASE_WINDOW_KIND(Year);
#undef CASE_WINDOW_KIND
    }
    __builtin_unreachable();
}

void StorageWindowView::threadFuncToTable()
{
    while (!shutdown_called && has_target_table)
    {
        std::unique_lock lock(flushTableMutex);
        UInt64 timestamp_usec = static_cast(Poco::Timestamp().epochMicroseconds());
        UInt64 w_end = static_cast(getWindowUpperBound(static_cast(timestamp_usec / 1000000))) * 1000000;
        condition.wait_for(lock, std::chrono::microseconds(w_end - timestamp_usec));
        try
        {
            if (refreshBlockStatus())
                flushToTable();
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__);
            break;
        }
    }
    if (!shutdown_called)
        toTableTask->scheduleAfter(RESCHEDULE_MS);
}

BlockInputStreams StorageWindowView::watch(
    const Names & /*column_names*/,
    const SelectQueryInfo & query_info,
    const Context & context,
    QueryProcessingStage::Enum & processed_stage,
    size_t /*max_block_size*/,
    const unsigned /*num_streams*/)
{
    if (has_target_table)
        throw Exception("WATCH query is disabled for " + getName() + " when constructed with 'TO' clause.", ErrorCodes::INCORRECT_QUERY);

    if (active_ptr.use_count() > 1)
        throw Exception("WATCH query is already attached, WINDOW VIEW only supports attaching one watch query.", ErrorCodes::INCORRECT_QUERY);

    ASTWatchQuery & query = typeid_cast(*query_info.query);

    bool has_limit = false;
    UInt64 limit = 0;

    if (query.limit_length)
    {
        has_limit = true;
        limit = safeGet(typeid_cast(*query.limit_length).value);
    }

    auto reader = std::make_shared(
        std::static_pointer_cast(shared_from_this()),
        active_ptr,
        has_limit,
        limit,
        context.getSettingsRef().temporary_window_view_timeout.totalSeconds());

    {
        std::lock_guard no_users_thread_lock(no_users_thread_mutex);
        if (no_users_thread.joinable())
        {
            std::lock_guard lock(no_users_thread_wakeup_mutex);
            no_users_thread_wakeup = true;
            no_users_thread_condition.notify_one();
        }
    }

    processed_stage = QueryProcessingStage::Complete;

    return {reader};
}

Block StorageWindowView::getHeader() const
{
    if (!sample_block)
    {
        auto storage = global_context.getTable(select_database_name, select_table_name);
        sample_block = InterpreterSelectQuery(
                           getInnerQuery(), global_context, storage, SelectQueryOptions(QueryProcessingStage::Complete))
                           .getSampleBlock();
        for (size_t i = 0; i < sample_block.columns(); ++i)
            sample_block.safeGetByPosition(i).column = sample_block.safeGetByPosition(i).column->convertToFullColumnIfConst();
    }

    return sample_block;
}

StoragePtr & StorageWindowView::getParentStorage()
{
    if (!parent_storage)
        parent_storage = global_context.getTable(getSelectDatabaseName(),
            getSelectTableName());
    return parent_storage;
}

StorageWindowView::StorageWindowView(
    const String & table_name_,
    const String & database_name_,
    Context & local_context,
    const ASTCreateQuery & query,
    const ColumnsDescription & columns_)
    : table_name(table_name_)
    , database_name(database_name_)
    , global_context(local_context.getGlobalContext())
    , time_zone(DateLUT::instance())
    , log(&Poco::Logger::get("StorageWindowView"))
{
    setColumns(columns_);

    if (!query.select)
        throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);

    /// Default value, used if only the table name exists in the query
    select_database_name = local_context.getCurrentDatabase();
    if (query.select->list_of_selects->children.size() != 1)
        throw Exception("UNION is not supported for Window View", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW);

    auto inner_query_ = query.select->list_of_selects->children.at(0);

    ASTSelectQuery & select_query = typeid_cast(*inner_query_);
    extractDependentTable(select_query, select_database_name, select_table_name);
    inner_query = innerQueryParser(select_query);

    /// If the table is not specified - use the table `system.one`
    if (select_table_name.empty())
    {
        select_database_name = "system";
        select_table_name = "one";
    }

    global_context.addDependency(
        DatabaseAndTableName(select_database_name, select_table_name),
        DatabaseAndTableName(database_name, table_name));

    if (!query.to_table.empty())
    {
        has_target_table = true;
        target_database_name = query.to_database;
        target_table_name = query.to_table;
    }

    is_temporary = query.temporary;
    temporary_window_view_timeout = local_context.getSettingsRef().temporary_window_view_timeout.totalSeconds();

    mergeable_blocks = std::make_shared>();

    active_ptr = std::make_shared(true);

    toTableTask = global_context.getSchedulePool().createTask(log->name(), [this] { threadFuncToTable(); });
    toTableTask->deactivate();
}

ASTPtr StorageWindowView::innerQueryParser(ASTSelectQuery & query)
{
    if (!query.groupBy())
        throw
            Exception("GROUP BY query is required for " + getName(), ErrorCodes::INCORRECT_QUERY);

    // parse stage mergeable
    ASTPtr result = query.clone();
    ASTPtr expr_list = result;
    ParserStageMergeableOneMatcher::Data stageMergeableOneData;
    ParserStageMergeableOneMatcher::Visitor(stageMergeableOneData).visit(expr_list);
    if (!stageMergeableOneData.is_tumble && !stageMergeableOneData.is_hop)
        throw Exception("WINDOW FUNCTION is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);
    window_column_name = stageMergeableOneData.window_column_name;

    // parser window function
    ASTFunction & window_function = typeid_cast(*stageMergeableOneData.window_function);
    ASTExpressionList & window_function_args = typeid_cast(*window_function.arguments);
    const auto & children = window_function_args.children;
    const auto & interval_p1 = std::static_pointer_cast(children.at(1));
    if (!interval_p1 || !startsWith(interval_p1->name, "toInterval"))
        throw Exception("Illegal type of last argument of function " + interval_p1->name + " should be Interval", ErrorCodes::ILLEGAL_COLUMN);

    String interval_str = interval_p1->name.substr(10);
    if (interval_str == "Second")
        window_kind = IntervalKind::Second;
    else if (interval_str == "Minute")
        window_kind = IntervalKind::Minute;
    else if (interval_str == "Hour")
        window_kind = IntervalKind::Hour;
    else if (interval_str == "Day")
        window_kind = IntervalKind::Day;
    else if (interval_str == "Week")
        window_kind = IntervalKind::Week;
    else if (interval_str == "Month")
        window_kind = IntervalKind::Month;
    else if (interval_str == "Quarter")
        window_kind = IntervalKind::Quarter;
    else if (interval_str == "Year")
        window_kind = IntervalKind::Year;

    const auto & interval_units_p1 = std::static_pointer_cast(interval_p1->children.front()->children.front());

    window_num_units = stoi(interval_units_p1->value.get());

    return result;
}

void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, const Block & block, const Context & context)
{
    BlockInputStreams streams =
        {std::make_shared(block)};

    auto window_proxy_storage = std::make_shared(
        window_view.getParentStorage(), std::move(streams), QueryProcessingStage::FetchColumns);

    InterpreterSelectQuery select_block(
        window_view.getInnerQuery(), context, window_proxy_storage, QueryProcessingStage::WithMergeableState);

    auto data_mergeable_stream = std::make_shared(select_block.execute().in);

    BlocksListPtr new_mergeable_blocks = std::make_shared();

    while (Block block_ = data_mergeable_stream->read())
    {
        const ColumnTuple * column_tuple
            = checkAndGetColumn(block_.getByName(window_view.window_column_name).column.get());
        block_.insert(
            {ColumnUInt8::create(block_.rows(), WINDOW_VIEW_FIRE_STATUS::WAITING), std::make_shared(), "____fire_status"});
        block_.insert({column_tuple->getColumnPtr(1), std::make_shared(), "____w_end"});
        new_mergeable_blocks->push_back(block_);
    }

    if (new_mergeable_blocks->empty())
        return;

    {
        std::unique_lock lock(window_view.mutex);
        window_view.getMergeableBlocksList()->push_back(new_mergeable_blocks);
    }
    window_view.condition.notify_all();
}

StoragePtr StorageWindowView::getTargetTable() const
{
    return global_context.getTable(target_database_name, target_table_name);
}

StoragePtr StorageWindowView::tryGetTargetTable() const
{
    return global_context.tryGetTable(target_database_name, target_table_name);
}

void StorageWindowView::startup()
{
    // Start the working thread
    if (has_target_table)
        toTableTask->activateAndSchedule();
    startNoUsersThread(temporary_window_view_timeout);
}

void StorageWindowView::shutdown()
{
    bool expected = false;
    if (!shutdown_called.compare_exchange_strong(expected, true))
        return;
    toTableTask->deactivate();

    {
        std::lock_guard no_users_thread_lock(no_users_thread_mutex);
        if (no_users_thread.joinable())
        {
            {
                std::lock_guard lock(no_users_thread_wakeup_mutex);
                no_users_thread_wakeup = true;
                no_users_thread_condition.notify_one();
            }
        }
    }
}

StorageWindowView::~StorageWindowView()
{
    shutdown();

    {
        std::lock_guard lock(no_users_thread_mutex);
        if
            (no_users_thread.joinable())
            no_users_thread.detach();
    }
}

bool StorageWindowView::refreshBlockStatus()
{
    UInt32 timestamp_now = std::time(nullptr);
    for (BlocksListPtr mergeable_block : *mergeable_blocks)
    {
        for (Block & block : *mergeable_block)
        {
            auto & col_wend = block.getByName("____w_end").column;
            const auto & wend_data = static_cast(*col_wend).getData();
            auto & col_status = block.getByName("____fire_status").column;
            const auto & col_status_data = static_cast(*col_status).getData();
            for (size_t i = 0; i < col_wend->size(); ++i)
            {
                if (wend_data[i] < timestamp_now && col_status_data[i] == WINDOW_VIEW_FIRE_STATUS::WAITING)
                    return true;
            }
        }
    }
    return false;
}

BlockInputStreamPtr StorageWindowView::getNewBlocksInputStreamPtr()
{
    if (mergeable_blocks->empty())
        return {std::make_shared(getHeader())};
    {
        std::lock_guard lock(mutex);
        // Delete blocks whose rows have all been fired (status RETIRED)
        for (BlocksListPtr mergeable_block : *mergeable_blocks)
        {
            mergeable_block->remove_if([](Block & block_)
            {
                auto & column_ = block_.getByName("____fire_status").column;
                const auto & data = static_cast(*column_).getData();
                for (size_t i = 0; i < column_->size(); ++i)
                {
                    if (data[i] != WINDOW_VIEW_FIRE_STATUS::RETIRED)
                        return false;
                }
                return true;
            });
        }
        mergeable_blocks->remove_if([](BlocksListPtr & ptr) { return ptr->size() == 0; });

        if (mergeable_blocks->empty())
            return {std::make_shared(getHeader())};

        // Mark rows whose window has ended as ready to fire
        UInt32 timestamp_now = std::time(nullptr);
        for (BlocksListPtr mergeable_block : *mergeable_blocks)
        {
            for (Block & block : *mergeable_block)
            {
                auto & col_wend = block.getByName("____w_end").column;
                const auto & wend_data = static_cast(*col_wend).getData();
                auto & col_status = block.getByName("____fire_status").column;
                auto col_status_mutable = col_status->assumeMutable();
                auto & col_status_data = static_cast(*col_status_mutable).getData();
                for (size_t i = 0; i < col_wend->size(); ++i)
                {
                    if (wend_data[i] < timestamp_now && col_status_data[i] == WINDOW_VIEW_FIRE_STATUS::WAITING)
                        col_status_data[i] =
                            WINDOW_VIEW_FIRE_STATUS::READY;
                }
                col_status = std::move(col_status_mutable);
            }
        }
    }

    BlockInputStreams from;
    auto sample_block_ = mergeable_blocks->front()->front().cloneEmpty(); // TODO: make this global
    BlockInputStreamPtr stream = std::make_shared(mergeable_blocks, sample_block_, mutex);
    from.push_back(std::move(stream));
    auto proxy_storage = std::make_shared(
        getParentStorage(), std::move(from), QueryProcessingStage::WithMergeableState);

    InterpreterSelectQuery select(getInnerQuery(), global_context, proxy_storage, QueryProcessingStage::Complete);
    BlockInputStreamPtr data = std::make_shared(select.execute().in);
    return data;
}

BlocksPtr StorageWindowView::getNewBlocks()
{
    auto res = getNewBlocksInputStreamPtr();
    BlocksPtr blocks = std::make_shared();
    while (Block this_block = res->read())
        blocks->push_back(this_block);
    return blocks;
}

void StorageWindowView::noUsersThread(std::shared_ptr storage, const UInt64 & timeout)
{
    bool drop_table = false;

    if (storage->shutdown_called)
        return;

    {
        while (1)
        {
            std::unique_lock lock(storage->no_users_thread_wakeup_mutex);
            if (!storage->no_users_thread_condition.wait_for(lock, std::chrono::seconds(timeout), [&] { return storage->no_users_thread_wakeup; }))
            {
                storage->no_users_thread_wakeup = false;
                if (storage->shutdown_called)
                    return;
                if (!storage->global_context.getDependencies(storage->database_name, storage->table_name).empty())
                    continue;
                drop_table = true;
            }
            break;
        }
    }

    if (drop_table)
    {
        if (storage->global_context.tryGetTable(storage->database_name, storage->table_name))
        {
            try
            {
                /// We create and execute `drop` query for this table
                auto drop_query = std::make_shared();
                drop_query->database = storage->database_name;
                drop_query->table = storage->table_name;
                drop_query->kind = ASTDropQuery::Kind::Drop;
                ASTPtr ast_drop_query = drop_query;
                InterpreterDropQuery drop_interpreter(ast_drop_query, storage->global_context);
                drop_interpreter.execute();
            }
            catch (...)
            {
            }
        }
    }
}

void StorageWindowView::startNoUsersThread(const UInt64 & timeout)
{
    bool expected = false;
    if (!start_no_users_thread_called.compare_exchange_strong(expected, true))
        return;

    if (is_temporary)
    {
        std::lock_guard no_users_thread_lock(no_users_thread_mutex);

        if (shutdown_called)
            return;

        if (no_users_thread.joinable())
        {
            {
                std::lock_guard lock(no_users_thread_wakeup_mutex);
                no_users_thread_wakeup = true;
                no_users_thread_condition.notify_one();
            }
            no_users_thread.join();
        }
        {
            std::lock_guard lock(no_users_thread_wakeup_mutex);
            no_users_thread_wakeup = false;
        }
        if (!is_dropped)
            no_users_thread = std::thread(&StorageWindowView::noUsersThread,
                std::static_pointer_cast(shared_from_this()), timeout);
    }

    start_no_users_thread_called = false;
}

void registerStorageWindowView(StorageFactory & factory)
{
    factory.registerStorage("WindowView", [](const StorageFactory::Arguments & args)
    {
        if (!args.attach && !args.local_context.getSettingsRef().allow_experimental_window_view)
            throw Exception(
                "Experimental WINDOW VIEW feature is not enabled (the setting 'allow_experimental_window_view')",
                ErrorCodes::SUPPORT_IS_DISABLED);
        return StorageWindowView::create(args.table_name, args.database_name, args.local_context, args.query, args.columns);
    });
}

}