#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int LOGICAL_ERROR; extern const int INCORRECT_QUERY; extern const int TABLE_WAS_NOT_DROPPED; extern const int QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW; extern const int SUPPORT_IS_DISABLED; extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int CANNOT_PARSE_TEXT; } namespace { const auto RESCHEDULE_MS = 500; class ParserStageMergeableOneMatcher { public: using Visitor = InDepthNodeVisitor; struct Data { ASTPtr window_function; String window_column_name; bool is_tumble = false; bool is_hop = false; }; static bool needChildVisit(ASTPtr & node, const ASTPtr &) { if (node->as()) return false; return true; } static void visit(ASTPtr & ast, Data & data) { if (const auto * t = ast->as()) visit(*t, ast, data); } private: static void visit(const ASTFunction & node, ASTPtr & node_ptr, Data & data) { if (node.name == "TUMBLE") { if (!data.window_function) { data.is_tumble = true; data.window_column_name = node.getColumnName(); data.window_function = node.clone(); } else if (serializeAST(node) != serializeAST(*data.window_function)) throw Exception("WINDOW VIEW only support ONE WINDOW FUNCTION", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW); } else if (node.name == "HOP") { if (!data.window_function) { data.is_hop = true; data.window_function = node.clone(); auto ptr_ = node.clone(); std::static_pointer_cast(ptr_)->setAlias(""); auto arrayJoin = makeASTFunction("arrayJoin", ptr_); arrayJoin->alias = node.alias; node_ptr = arrayJoin; data.window_column_name = arrayJoin->getColumnName(); } else if (serializeAST(node) != serializeAST(*data.window_function)) throw Exception("WINDOW VIEW only support ONE WINDOW FUNCTION", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW); } } }; class ParserProcTimeFinalMatcher { public: using Visitor = InDepthNodeVisitor; struct Data { bool is_proctime_tumble = false; String window_column_name; }; static bool needChildVisit(ASTPtr &, const ASTPtr &) { return true; } static void visit(ASTPtr & ast, Data & data) { if (const auto * t = ast->as()) visit(*t, ast, data); } private: static void visit(const ASTFunction & node, ASTPtr & node_ptr, Data & data) { if (node.name == "TUMBLE") { if (const auto * t = node.arguments->children[0]->as(); t && t->name == "now") { data.is_proctime_tumble = true; node_ptr->children[0]->children[0] = std::make_shared("____timestamp"); data.window_column_name = node.getColumnName(); } } } }; static inline IntervalKind strToIntervalKind(const String& interval_str) { if (interval_str == "Second") return IntervalKind::Second; else if (interval_str == "Minute") return IntervalKind::Minute; else if (interval_str == "Hour") return IntervalKind::Hour; else if (interval_str == "Day") return IntervalKind::Day; else if (interval_str == "Week") return IntervalKind::Week; else if (interval_str == "Month") return IntervalKind::Month; else if (interval_str == "Quarter") return IntervalKind::Quarter; else if (interval_str == "Year") return IntervalKind::Year; __builtin_unreachable(); } static inline String generateInnerTableName(const String & table_name) { return ".inner." + table_name; } static ASTPtr generateDeleteRetiredQuery(StorageID inner_table_id, UInt32 timestamp) { auto function_equal = makeASTFunction( "less", std::make_shared("____w_end"), std::make_shared(timestamp)); auto alterCommand = std::make_shared(); alterCommand->type = ASTAlterCommand::DELETE; alterCommand->predicate = function_equal; alterCommand->children.push_back(alterCommand->predicate); auto alterCommandList = std::make_shared(); alterCommandList->add(alterCommand); auto alterQuery = std::make_shared(); alterQuery->database = inner_table_id.database_name; alterQuery->table = inner_table_id.table_name; alterQuery->set(alterQuery->command_list, alterCommandList); return alterQuery; } static std::shared_ptr generateFetchColumnsQuery(const StorageID & inner_storage) { auto res_query = std::make_shared(); auto select = std::make_shared(); select->children.push_back(std::make_shared()); res_query->setExpression(ASTSelectQuery::Expression::SELECT, select); auto tableInSelectQuery = std::make_shared(); auto tableInSelectQueryElement = std::make_shared(); res_query->setExpression(ASTSelectQuery::Expression::TABLES, std::make_shared()); auto tables = res_query->tables(); auto tables_elem = std::make_shared(); auto table_expr = std::make_shared(); tables->children.push_back(tables_elem); tables_elem->table_expression = table_expr; tables_elem->children.push_back(table_expr); table_expr->database_and_table_name = createTableIdentifier(inner_storage.database_name, inner_storage.table_name); table_expr->children.push_back(table_expr->database_and_table_name); return res_query; } } static void extractDependentTable(ASTSelectQuery & query, String & select_database_name, String & select_table_name) { auto db_and_table = getDatabaseAndTable(query, 0); ASTPtr subquery = extractTableExpression(query, 0); if (!db_and_table && !subquery) return; if (db_and_table) { select_table_name = db_and_table->table; if (db_and_table->database.empty()) { db_and_table->database = select_database_name; AddDefaultDatabaseVisitor visitor(select_database_name); visitor.visit(query); } else select_database_name = db_and_table->database; } else if (auto * ast_select = subquery->as()) { if (ast_select->list_of_selects->children.size() != 1) throw Exception("UNION is not supported for WINDOW VIEW", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW); auto & inner_query = ast_select->list_of_selects->children.at(0); extractDependentTable(inner_query->as(), select_database_name, select_table_name); } else throw Exception( "Logical error while creating StorageWindowView." " Could not retrieve table name from select query.", DB::ErrorCodes::LOGICAL_ERROR); } void StorageWindowView::checkTableCanBeDropped() const { auto table_id = getStorageID(); Dependencies dependencies = global_context.getDependencies(table_id); if (!dependencies.empty()) { StorageID dependent_table_id = dependencies.front(); throw Exception("Table has dependency " + dependent_table_id.getNameForLogs(), ErrorCodes::TABLE_WAS_NOT_DROPPED); } } static void executeDropQuery(ASTDropQuery::Kind kind, Context & global_context, const StorageID & target_table_id) { if (global_context.tryGetTable(target_table_id)) { auto drop_query = std::make_shared(); drop_query->database = target_table_id.database_name; drop_query->table = target_table_id.table_name; drop_query->kind = kind; ASTPtr ast_drop_query = drop_query; InterpreterDropQuery drop_interpreter(ast_drop_query, global_context); drop_interpreter.execute(); } } void StorageWindowView::drop(TableStructureWriteLockHolder &) { auto table_id = getStorageID(); global_context.removeDependency(select_table_id, table_id); if (!inner_table_id.empty()) executeDropQuery(ASTDropQuery::Kind::Drop, global_context, inner_table_id); std::lock_guard lock(mutex); is_dropped = true; condition.notify_all(); } void StorageWindowView::truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) { if (!inner_table_id.empty()) executeDropQuery(ASTDropQuery::Kind::Truncate, global_context, inner_table_id); else { std::lock_guard lock(mutex); mergeable_blocks = std::make_shared>(); } } bool StorageWindowView::optimize(const ASTPtr & query, const ASTPtr & partition, bool final, bool deduplicate, const Context & context) { if (inner_table_id.empty()) throw Exception( "OPTIMIZE only supported when creating WINDOW VIEW within INNER table.", ErrorCodes::INCORRECT_QUERY); return getInnerStorage()->optimize(query, partition, final, deduplicate, context); } inline void StorageWindowView::cleanCache() { //delete fired blocks UInt32 timestamp_now = std::time(nullptr); UInt32 w_lower_bound = getWindowLowerBound(timestamp_now, -1); if (watermark_num_units != 0) w_lower_bound = getWatermark(w_lower_bound); if (!inner_table_id.empty()) { auto sql = generateDeleteRetiredQuery(inner_table_id, w_lower_bound); InterpreterAlterQuery alt_query(sql, global_context); alt_query.execute(); } else { std::lock_guard lock(mutex); for (BlocksListPtr mergeable_block : *mergeable_blocks) { mergeable_block->remove_if([&w_lower_bound](Block & block_) { auto & column_ = block_.getByName("____w_end").column; const auto & data = static_cast(*column_).getData(); for (size_t i = 0; i < column_->size(); ++i) { if (data[i] >= w_lower_bound) return false; } return true; }); } mergeable_blocks->remove_if([](BlocksListPtr & ptr) { return ptr->size() == 0; }); } } inline void StorageWindowView::flushToTable(UInt32 timestamp_) { //write into dependent table StoragePtr target_table = getTargetStorage(); auto _blockInputStreamPtr = getNewBlocksInputStreamPtr(timestamp_); auto _lock = target_table->lockStructureForShare(true, global_context.getCurrentQueryId()); auto stream = target_table->write(getInnerQuery(), global_context); copyData(*_blockInputStreamPtr, *stream); } std::shared_ptr StorageWindowView::generateInnerTableCreateQuery(const ASTCreateQuery & inner_create_query, const String & database_name, const String & table_name) { /// We will create a query to create an internal table. auto manual_create_query = std::make_shared(); manual_create_query->database = database_name; manual_create_query->table = table_name; auto new_columns_list = std::make_shared(); auto sample_block_ = InterpreterSelectQuery(getInnerQuery(), global_context, getParentStorage(), SelectQueryOptions(QueryProcessingStage::WithMergeableState)) .getSampleBlock(); auto columns_list = std::make_shared(); if (is_proctime_tumble) { auto column_window = std::make_shared(); column_window->name = window_column_name; column_window->type = makeASTFunction("Tuple", std::make_shared("DateTime"), std::make_shared("DateTime")); columns_list->children.push_back(column_window); } for (auto & column_ : sample_block_.getColumnsWithTypeAndName()) { ParserIdentifierWithOptionalParameters parser; String sql = column_.type->getName(); ASTPtr ast = parseQuery(parser, sql.data(), sql.data() + sql.size(), "data type", 0); auto column_dec = std::make_shared(); column_dec->name = column_.name; column_dec->type = ast; columns_list->children.push_back(column_dec); } auto column_wend = std::make_shared(); column_wend->name = "____w_end"; column_wend->type = std::make_shared("DateTime"); columns_list->children.push_back(column_wend); new_columns_list->set(new_columns_list->columns, columns_list); manual_create_query->set(manual_create_query->columns_list, new_columns_list); manual_create_query->set(manual_create_query->storage, inner_create_query.storage->ptr()); return manual_create_query; } inline UInt32 StorageWindowView::getWindowLowerBound(UInt32 time_sec, int window_id_skew) { switch (window_kind) { #define CASE_WINDOW_KIND(KIND) \ case IntervalKind::KIND: \ { \ UInt32 res = ToStartOfTransform::execute(time_sec, window_num_units, time_zone); \ if (window_id_skew != 0) \ return AddTime::execute(res, window_id_skew * window_num_units, time_zone); \ else \ return res; \ } CASE_WINDOW_KIND(Second) CASE_WINDOW_KIND(Minute) CASE_WINDOW_KIND(Hour) CASE_WINDOW_KIND(Day) CASE_WINDOW_KIND(Week) CASE_WINDOW_KIND(Month) CASE_WINDOW_KIND(Quarter) CASE_WINDOW_KIND(Year) #undef CASE_WINDOW_KIND } __builtin_unreachable(); } inline UInt32 StorageWindowView::getWindowUpperBound(UInt32 time_sec, int window_id_skew) { switch (window_kind) { #define CASE_WINDOW_KIND(KIND) \ case IntervalKind::KIND: \ { \ UInt32 start = ToStartOfTransform::execute(time_sec, window_num_units, time_zone); \ UInt32 res = AddTime::execute(start, window_num_units, time_zone); \ if (window_id_skew != 0) \ return AddTime::execute(res, window_id_skew * window_num_units, time_zone); \ else \ return res; \ } CASE_WINDOW_KIND(Second) CASE_WINDOW_KIND(Minute) CASE_WINDOW_KIND(Hour) CASE_WINDOW_KIND(Day) CASE_WINDOW_KIND(Week) CASE_WINDOW_KIND(Month) CASE_WINDOW_KIND(Quarter) CASE_WINDOW_KIND(Year) #undef CASE_WINDOW_KIND } __builtin_unreachable(); } inline UInt32 StorageWindowView::getWatermark(UInt32 time_sec) { switch (watermark_kind) { #define CASE_WINDOW_KIND(KIND) \ case IntervalKind::KIND: \ { \ return AddTime::execute(time_sec, watermark_num_units, time_zone); \ } CASE_WINDOW_KIND(Second) CASE_WINDOW_KIND(Minute) CASE_WINDOW_KIND(Hour) CASE_WINDOW_KIND(Day) CASE_WINDOW_KIND(Week) CASE_WINDOW_KIND(Month) CASE_WINDOW_KIND(Quarter) CASE_WINDOW_KIND(Year) #undef CASE_WINDOW_KIND } __builtin_unreachable(); } inline void StorageWindowView::addFireSignal(UInt32 timestamp_) { if (!target_table_id.empty()) fire_signal.push_back(timestamp_); for (auto watch_stream : watch_streams) { if (watch_stream) watch_stream->addFireSignal(timestamp_); } condition.notify_all(); } void StorageWindowView::threadFuncCleanCache() { while (!shutdown_called) { try { cleanCache(); sleep(clean_interval); } catch (...) { tryLogCurrentException(__PRETTY_FUNCTION__); break; } } if (!shutdown_called) cleanCacheTask->scheduleAfter(RESCHEDULE_MS); } void StorageWindowView::threadFuncToTable() { while (!shutdown_called && !target_table_id.empty()) { std::unique_lock lock(flush_table_mutex); condition.wait_for(lock, std::chrono::seconds(5)); try { while (!fire_signal.empty()) { UInt32 timestamp_; { std::unique_lock lock_(fire_signal_mutex); timestamp_ = fire_signal.front(); fire_signal.pop_front(); } flushToTable(timestamp_); } } catch (...) { tryLogCurrentException(__PRETTY_FUNCTION__); break; } } if (!shutdown_called) toTableTask->scheduleAfter(RESCHEDULE_MS); } void StorageWindowView::threadFuncFire() { while (!shutdown_called) { UInt64 timestamp_usec = static_cast(Poco::Timestamp().epochMicroseconds()); UInt64 w_end = static_cast(getWindowUpperBound(static_cast(timestamp_usec / 1000000))) * 1000000; std::this_thread::sleep_for(std::chrono::microseconds(w_end - timestamp_usec)); std::unique_lock lock(fire_signal_mutex); addFireSignal(static_cast(w_end / 1000000)); } if (!shutdown_called) toTableTask->scheduleAfter(RESCHEDULE_MS); } BlockInputStreams StorageWindowView::watch( const Names & /*column_names*/, const SelectQueryInfo & query_info, const Context & /*context*/, QueryProcessingStage::Enum & processed_stage, size_t /*max_block_size*/, const unsigned /*num_streams*/) { ASTWatchQuery & query = typeid_cast(*query_info.query); bool has_limit = false; UInt64 limit = 0; if (query.limit_length) { has_limit = true; limit = safeGet(typeid_cast(*query.limit_length).value); } auto reader = std::make_shared( std::static_pointer_cast(shared_from_this()), active_ptr, has_limit, limit); { std::lock_guard lock(fire_signal_mutex); watch_streams.push_back(reader.get()); } processed_stage = QueryProcessingStage::Complete; return {reader}; } Block StorageWindowView::getHeader() const { if (!sample_block) { auto storage = global_context.getTable(select_table_id); sample_block = InterpreterSelectQuery(getInnerQuery(), global_context, storage, SelectQueryOptions(QueryProcessingStage::Complete)) .getSampleBlock(); for (size_t i = 0; i < sample_block.columns(); ++i) sample_block.safeGetByPosition(i).column = sample_block.safeGetByPosition(i).column->convertToFullColumnIfConst(); } return sample_block; } StorageWindowView::StorageWindowView( const StorageID & table_id_, Context & local_context, const ASTCreateQuery & query, const ColumnsDescription & columns_, bool attach_) : IStorage(table_id_) , global_context(local_context.getGlobalContext()) , time_zone(DateLUT::instance()) { setColumns(columns_); if (!query.select) throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY); /// Default value, if only table name exist in the query if (query.select->list_of_selects->children.size() != 1) throw Exception("UNION is not supported for Window View", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW); auto inner_query_ = query.select->list_of_selects->children.at(0); ASTSelectQuery & select_query = typeid_cast(*inner_query_); String select_database_name = local_context.getCurrentDatabase(); String select_table_name; extractDependentTable(select_query, select_database_name, select_table_name); select_table_id = StorageID(select_database_name, select_table_name); inner_query = innerQueryParser(select_query); final_query = inner_query->clone(); ParserProcTimeFinalMatcher::Data final_query_data; ParserProcTimeFinalMatcher::Visitor(final_query_data).visit(final_query); is_proctime_tumble = final_query_data.is_proctime_tumble; if (is_proctime_tumble) window_column_name = final_query_data.window_column_name; /// If the table is not specified - use the table `system.one` if (select_table_name.empty()) { select_database_name = "system"; select_table_name = "one"; } global_context.addDependency(select_table_id, table_id_); if (!query.to_table.empty()) target_table_id = StorageID(query.to_database, query.to_table); clean_interval = local_context.getSettingsRef().window_view_clean_interval.totalSeconds(); mergeable_blocks = std::make_shared>(); active_ptr = std::make_shared(true); if (query.watermark_function) { if (is_proctime_tumble) throw Exception("WATERMARK is not support for Processing time processing.", ErrorCodes::INCORRECT_QUERY); // parser watermark function const auto & watermark_function = std::static_pointer_cast(query.watermark_function); if (!startsWith(watermark_function->name, "toInterval")) throw Exception( "Illegal type WATERMARK function " + watermark_function->name + ", should be Interval", ErrorCodes::ILLEGAL_COLUMN); String interval_str = watermark_function->name.substr(10); const auto & interval_units_p1 = std::static_pointer_cast(watermark_function->children.front()->children.front()); watermark_kind = strToIntervalKind(interval_str); try { watermark_num_units = boost::lexical_cast(interval_units_p1->value.get()); } catch (const boost::bad_lexical_cast &) { throw Exception("Cannot parse string '" + interval_units_p1->value.get() + "' as Integer.", ErrorCodes::CANNOT_PARSE_TEXT); } if (watermark_num_units > 0) watermark_num_units *= -1; } if (query.storage) { if (attach_) { inner_table_id = StorageID(table_id_.database_name, generateInnerTableName(table_id_.table_name)); } else { if (query.storage->engine->name != "MergeTree") throw Exception( "The ENGINE of WindowView must be MergeTree family of table engines including the engines with replication support", ErrorCodes::INCORRECT_QUERY); auto manual_create_query = generateInnerTableCreateQuery(query, table_id_.database_name, generateInnerTableName(table_id_.table_name)); InterpreterCreateQuery create_interpreter(manual_create_query, local_context); create_interpreter.setInternal(true); create_interpreter.execute(); inner_storage = global_context.getTable(manual_create_query->database, manual_create_query->table); inner_table_id = inner_storage->getStorageID(); } fetch_column_query = generateFetchColumnsQuery(inner_table_id); } toTableTask = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncToTable(); }); cleanCacheTask = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanCache(); }); fireTask = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncFire(); }); toTableTask->deactivate(); cleanCacheTask->deactivate(); fireTask->deactivate(); } ASTPtr StorageWindowView::innerQueryParser(ASTSelectQuery & query) { if (!query.groupBy()) throw Exception("GROUP BY query is required for " + getName(), ErrorCodes::INCORRECT_QUERY); // parse stage mergeable ASTPtr result = query.clone(); ASTPtr expr_list = result; ParserStageMergeableOneMatcher::Data stageMergeableOneData; ParserStageMergeableOneMatcher::Visitor(stageMergeableOneData).visit(expr_list); if (!stageMergeableOneData.is_tumble && !stageMergeableOneData.is_hop) throw Exception("WINDOW FUNCTION is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY); window_column_name = stageMergeableOneData.window_column_name; // parser window function ASTFunction & window_function = typeid_cast(*stageMergeableOneData.window_function); const auto & arguments = window_function.arguments->children; const auto & arg1 = std::static_pointer_cast(arguments.at(1)); if (!arg1 || !startsWith(arg1->name, "toInterval")) throw Exception("Illegal type of last argument of function " + arg1->name + " should be Interval", ErrorCodes::ILLEGAL_COLUMN); String interval_str = arg1->name.substr(10); window_kind = strToIntervalKind(interval_str); const auto & interval_units_p1 = std::static_pointer_cast(arg1->children.front()->children.front()); window_num_units = stoi(interval_units_p1->value.get()); return result; } void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, const Block & block, const Context & context) { UInt32 timestamp_now = std::time(nullptr); auto block_stream = std::make_shared(block); BlockInputStreams streams; if (window_view.is_proctime_tumble) streams = {std::make_shared(block_stream, timestamp_now)}; else streams = {block_stream}; auto window_proxy_storage = std::make_shared( StorageID("", "WindowViewProxyStorage"), window_view.getParentStorage(), std::move(streams), QueryProcessingStage::FetchColumns); InterpreterSelectQuery select_block( window_view.getFinalQuery(), context, window_proxy_storage, QueryProcessingStage::WithMergeableState); auto data_mergeable_stream = std::make_shared(select_block.execute().in); // extract ____w_end ColumnsWithTypeAndName columns_; columns_.emplace_back( nullptr, std::make_shared(DataTypes{std::make_shared(), std::make_shared()}), window_view.window_column_name); const auto & function_tuple = FunctionFactory::instance().get("tupleElement", context); ExpressionActionsPtr actions_ = std::make_shared(columns_, context); actions_->add(ExpressionAction::addColumn( {std::make_shared()->createColumnConst(1, toField(2)), std::make_shared(), "____tuple_arg"})); actions_->add(ExpressionAction::applyFunction(function_tuple, Names{window_view.window_column_name, "____tuple_arg"}, "____w_end")); actions_->add(ExpressionAction::removeColumn("____tuple_arg")); BlockInputStreamPtr in_stream; UInt32 watermark = window_view.getWatermark(timestamp_now); actions_->add(ExpressionAction::addColumn({std::make_shared()->createColumnConst(1, toField(watermark)), std::make_shared(), "____watermark"})); const auto & function_greater = FunctionFactory::instance().get("greaterOrEquals", context); actions_->add(ExpressionAction::applyFunction(function_greater, Names{"____w_end", "____watermark"}, "____filter")); actions_->add(ExpressionAction::removeColumn("____watermark")); in_stream = std::make_shared(data_mergeable_stream, actions_, "____filter", true); if (!window_view.inner_table_id.empty()) { auto stream = window_view.getInnerStorage()->write(window_view.getInnerQuery(), context); if (window_view.is_proctime_tumble) { std::unique_lock lock(window_view.fire_signal_mutex); copyData(*in_stream, *stream); } else if (window_view.watermark_num_units != 0) { while (Block block_ = in_stream->read()) { auto column_wend = block_.getByName("____w_end").column; stream->write(std::move(block_)); const ColumnUInt32::Container & wend_data = static_cast(*column_wend).getData(); for (size_t i = 0; i < wend_data.size(); ++i) { if (wend_data[i] < timestamp_now) window_view.addFireSignal(wend_data[i]); } } } else copyData(*in_stream, *stream); } else { BlocksListPtr new_mergeable_blocks = std::make_shared(); if (window_view.is_proctime_tumble) { std::unique_lock lock(window_view.fire_signal_mutex); while (Block block_ = in_stream->read()) new_mergeable_blocks->push_back(std::move(block_)); } if (window_view.watermark_num_units != 0) { while (Block block_ = in_stream->read()) { auto column_wend = block_.getByName("____w_end").column; new_mergeable_blocks->push_back(std::move(block_)); const ColumnUInt32::Container & wend_data = static_cast(*column_wend).getData(); for (size_t i = 0; i < wend_data.size(); ++i) { if (wend_data[i] < timestamp_now) window_view.addFireSignal(wend_data[i]); } } } else { while (Block block_ = in_stream->read()) new_mergeable_blocks->push_back(std::move(block_)); } if (!new_mergeable_blocks->empty()) { std::unique_lock lock(window_view.mutex); window_view.getMergeableBlocksList()->push_back(new_mergeable_blocks); } } } void StorageWindowView::startup() { // Start the working thread if (!target_table_id.empty()) toTableTask->activateAndSchedule(); // cleanCacheTask->activateAndSchedule(); fireTask->activateAndSchedule(); } void StorageWindowView::shutdown() { bool expected = false; if (!shutdown_called.compare_exchange_strong(expected, true)) return; toTableTask->deactivate(); cleanCacheTask->deactivate(); fireTask->deactivate(); } StorageWindowView::~StorageWindowView() { shutdown(); } BlockInputStreamPtr StorageWindowView::getNewBlocksInputStreamPtr(UInt32 timestamp_) { BlockInputStreamPtr stream; if (!inner_table_id.empty()) { auto & storage = getInnerStorage(); InterpreterSelectQuery fetch(fetch_column_query, global_context, storage, SelectQueryOptions(QueryProcessingStage::FetchColumns)); ColumnsWithTypeAndName columns_; columns_.emplace_back(nullptr, std::make_shared(), "____w_end"); ExpressionActionsPtr actions_ = std::make_shared(columns_, global_context); actions_->add(ExpressionAction::addColumn({std::make_shared()->createColumnConst(1, toField(timestamp_)), std::make_shared(), "____timestamp_now"})); const auto & function_equals = FunctionFactory::instance().get("equals", global_context); ExpressionActionsPtr apply_function_actions = std::make_shared(columns_, global_context); actions_->add(ExpressionAction::applyFunction(function_equals, Names{"____w_end", "____timestamp_now"}, "____filter")); actions_->add(ExpressionAction::removeColumn("____w_end")); actions_->add(ExpressionAction::removeColumn("____timestamp_now")); stream = std::make_shared(fetch.execute().in, actions_, "____filter", true); } else { if (mergeable_blocks->empty()) return std::make_shared(getHeader()); auto sample_block_ = mergeable_blocks->front()->front().cloneEmpty(); stream = std::make_shared(mergeable_blocks, sample_block_, timestamp_); } BlockInputStreams from; from.push_back(std::move(stream)); auto proxy_storage = std::make_shared( StorageID("", "WindowViewProxyStorage"), getParentStorage(), std::move(from), QueryProcessingStage::WithMergeableState); InterpreterSelectQuery select(getFinalQuery(), global_context, proxy_storage, QueryProcessingStage::Complete); BlockInputStreamPtr data = std::make_shared(select.execute().in); return data; } void registerStorageWindowView(StorageFactory & factory) { factory.registerStorage("WindowView", [](const StorageFactory::Arguments & args) { if (!args.attach && !args.local_context.getSettingsRef().allow_experimental_window_view) throw Exception( "Experimental WINDOW VIEW feature is not enabled (the setting 'allow_experimental_window_view')", ErrorCodes::SUPPORT_IS_DISABLED); return StorageWindowView::create(args.table_id, args.local_context, args.query, args.columns, args.attach); }); } }