diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index c8784522207..27cc46de865 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -1437,19 +1437,17 @@ BlockIO InterpreterCreateQuery::fillTableIfNeeded(const ASTCreateQuery & create) /// If the query is a CREATE SELECT, insert the data into the table. if (create.select && !create.attach && !create.is_ordinary_view && !create.is_live_view - && (!create.is_materialized_view || create.is_populate)) + && (!(create.is_materialized_view || create.is_window_view) || create.is_populate)) { - if (create.is_window_view) - { - auto table = DatabaseCatalog::instance().getTable({create.getDatabase(), create.getTable(), create.uuid}, getContext()); - if (auto * window_view = typeid_cast(table.get())) - return window_view->populate(); - return {}; - } - auto insert = std::make_shared(); insert->table_id = {create.getDatabase(), create.getTable(), create.uuid}; - insert->select = create.select->clone(); + if (create.is_window_view) + { + auto table = DatabaseCatalog::instance().getTable(insert->table_id, getContext()); + insert->select = typeid_cast(table.get())->getSourceTableSelectQuery(); + } + else + insert->select = create.select->clone(); return InterpreterInsertQuery(insert, getContext(), getContext()->getSettingsRef().insert_allow_materialized_columns).execute(); diff --git a/src/Processors/Transforms/buildPushingToViewsChain.cpp b/src/Processors/Transforms/buildPushingToViewsChain.cpp index 8abbb63d418..88a36a9a235 100644 --- a/src/Processors/Transforms/buildPushingToViewsChain.cpp +++ b/src/Processors/Transforms/buildPushingToViewsChain.cpp @@ -314,7 +314,7 @@ Chain buildPushingToViewsChain( runtime_stats->type = QueryViewsLogElement::ViewType::WINDOW; query = window_view->getMergeableQuery(); // Used only to log in system.query_views_log out = buildPushingToViewsChain( - dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true, view_thread_status, view_counter_ms, storage_header); + dependent_table, dependent_metadata_snapshot, insert_context, ASTPtr(), true, view_thread_status, view_counter_ms); } else out = buildPushingToViewsChain( @@ -392,7 +392,7 @@ Chain buildPushingToViewsChain( } else if (auto * window_view = dynamic_cast(storage.get())) { - auto sink = std::make_shared(live_view_header, *window_view, storage, context); + auto sink = std::make_shared(window_view->getHeader(), *window_view, storage, context); sink->setRuntimeData(thread_status, elapsed_counter_ms); result_chain.addSource(std::move(sink)); } diff --git a/src/Storages/WindowView/StorageWindowView.cpp b/src/Storages/WindowView/StorageWindowView.cpp index 38b8675b69d..ade544d191f 100644 --- a/src/Storages/WindowView/StorageWindowView.cpp +++ b/src/Storages/WindowView/StorageWindowView.cpp @@ -606,6 +606,54 @@ inline void StorageWindowView::fire(UInt32 watermark) } } +ASTPtr StorageWindowView::getSourceTableSelectQuery() +{ + auto select_query_ = select_query->clone(); + auto & modified_select = select_query_->as(); + + if (hasJoin(modified_select)) + { + auto analyzer_res = TreeRewriterResult({}); + removeJoin(modified_select, analyzer_res, getContext()); + } + else + { + modified_select.setExpression(ASTSelectQuery::Expression::HAVING, {}); + modified_select.setExpression(ASTSelectQuery::Expression::GROUP_BY, {}); + } + + auto select_list = std::make_shared(); + for (const auto & column_name : source_header.getNames()) + select_list->children.emplace_back(std::make_shared(column_name)); + modified_select.setExpression(ASTSelectQuery::Expression::SELECT, select_list); + + if (!is_time_column_func_now) + { + auto select_query_ = select_query->clone(); + DropTableIdentifierMatcher::Data drop_table_identifier_data; + DropTableIdentifierMatcher::Visitor drop_table_identifier_visitor(drop_table_identifier_data); + drop_table_identifier_visitor.visit(select_query_); + + FetchQueryInfoMatcher::Data query_info_data; + FetchQueryInfoMatcher::Visitor(query_info_data).visit(select_query_); + + auto order_by = std::make_shared(); + auto order_by_elem = std::make_shared(); + order_by_elem->children.push_back(std::make_shared(query_info_data.timestamp_column_name)); + order_by_elem->direction = 1; + order_by->children.push_back(order_by_elem); + modified_select.setExpression(ASTSelectQuery::Expression::ORDER_BY, std::move(order_by)); + } + else + modified_select.setExpression(ASTSelectQuery::Expression::ORDER_BY, {}); + + const auto select_with_union_query = std::make_shared(); + select_with_union_query->list_of_selects = std::make_shared(); + select_with_union_query->list_of_selects->children.push_back(select_query_); + + return select_with_union_query; +} + std::shared_ptr StorageWindowView::getInnerTableCreateQuery( const ASTPtr & inner_query, ASTStorage * storage, const String & database_name, const String & table_name) { @@ -1002,16 +1050,11 @@ StorageWindowView::StorageWindowView( const StorageID & table_id_, ContextPtr context_, const ASTCreateQuery & query, - const ColumnsDescription & columns_, bool attach_) : IStorage(table_id_) , WithContext(context_->getGlobalContext()) , log(&Poco::Logger::get(fmt::format("StorageWindowView({}.{})", table_id_.database_name, table_id_.table_name))) { - StorageInMemoryMetadata storage_metadata; - storage_metadata.setColumns(columns_); - setInMemoryMetadata(storage_metadata); - if (!query.select) throw Exception(ErrorCodes::INCORRECT_QUERY, "SELECT query is not specified for {}", getName()); @@ -1021,6 +1064,13 @@ StorageWindowView::StorageWindowView( "UNION is not supported for {}", getName()); select_query = query.select->list_of_selects->children.at(0)->clone(); + + source_header = InterpreterSelectQuery(select_query->clone(), getContext(), SelectQueryOptions(QueryProcessingStage::FetchColumns)) + .getSampleBlock(); + StorageInMemoryMetadata storage_metadata; + storage_metadata.setColumns(ColumnsDescription(source_header.getNamesAndTypesList())); + setInMemoryMetadata(storage_metadata); + String select_database_name = getContext()->getCurrentDatabase(); String select_table_name; auto select_query_tmp = select_query->clone(); @@ -1205,52 +1255,6 @@ private: ContextPtr context; }; -BlockIO StorageWindowView::populate() -{ - QueryPipelineBuilder pipeline; - - InterpreterSelectQuery interpreter_fetch{select_query, getContext(), SelectQueryOptions(QueryProcessingStage::FetchColumns)}; - pipeline = interpreter_fetch.buildQueryPipeline(); - - if (!is_time_column_func_now) - { - SortDescription order_descr; - order_descr.emplace_back(timestamp_column_name); - pipeline.addSimpleTransform( - [&](const Block & header) - { - return std::make_shared( - header, - order_descr, - getContext()->getSettingsRef().max_block_size, - 0 /*LIMIT*/, - getContext()->getSettingsRef().max_bytes_before_remerge_sort, - getContext()->getSettingsRef().remerge_sort_lowered_memory_bytes_ratio, - getContext()->getSettingsRef().max_bytes_before_external_sort, - getContext()->getTemporaryVolume(), - getContext()->getSettingsRef().min_free_disk_space_for_temporary_data); - }); - } - - auto sink = std::make_shared(interpreter_fetch.getSampleBlock(), *this, nullptr, getContext()); - - BlockIO res; - - pipeline.addChain(Chain(std::move(sink))); - pipeline.setMaxThreads(1); - pipeline.setSinks([&](const Block & cur_header, QueryPipelineBuilder::StreamType) -> ProcessorPtr - { - return std::make_shared(cur_header); - }); - - res.pipeline = QueryPipelineBuilder::getPipeline(std::move(pipeline)); - - res.pipeline.addStorageHolder(shared_from_this()); - res.pipeline.addStorageHolder(getInnerTable()); - - return res; -} - void StorageWindowView::writeIntoWindowView( StorageWindowView & window_view, const Block & block, ContextPtr local_context) { @@ -1492,21 +1496,26 @@ void StorageWindowView::dropInnerTableIfAny(bool no_delay, ContextPtr local_cont } } -Block & StorageWindowView::getHeader() const +Block StorageWindowView::getHeader() const +{ + return source_header; +} + +Block StorageWindowView::getTargetHeader() const { std::lock_guard lock(sample_block_lock); - if (!sample_block) + if (!target_header) { - sample_block = InterpreterSelectQuery(select_query->clone(), getContext(), SelectQueryOptions(QueryProcessingStage::Complete)) + target_header = InterpreterSelectQuery(select_query->clone(), getContext(), SelectQueryOptions(QueryProcessingStage::Complete)) .getSampleBlock(); /// convert all columns to full columns /// in case some of them are constant - for (size_t i = 0; i < sample_block.columns(); ++i) + for (size_t i = 0; i < target_header.columns(); ++i) { - sample_block.safeGetByPosition(i).column = sample_block.safeGetByPosition(i).column->convertToFullColumnIfConst(); + target_header.safeGetByPosition(i).column = target_header.safeGetByPosition(i).column->convertToFullColumnIfConst(); } } - return sample_block; + return target_header; } StoragePtr StorageWindowView::getSourceTable() const @@ -1578,7 +1587,7 @@ void registerStorageWindowView(StorageFactory & factory) "Experimental WINDOW VIEW feature is not enabled (the setting 'allow_experimental_window_view')", ErrorCodes::SUPPORT_IS_DISABLED); - return std::make_shared(args.table_id, args.getLocalContext(), args.query, args.columns, args.attach); + return std::make_shared(args.table_id, args.getLocalContext(), args.query, args.attach); }); } diff --git a/src/Storages/WindowView/StorageWindowView.h b/src/Storages/WindowView/StorageWindowView.h index 99c931ac4a8..a95ea2aa715 100644 --- a/src/Storages/WindowView/StorageWindowView.h +++ b/src/Storages/WindowView/StorageWindowView.h @@ -108,7 +108,6 @@ public: const StorageID & table_id_, ContextPtr context_, const ASTCreateQuery & query, - const ColumnsDescription & columns, bool attach_); String getName() const override { return "WindowView"; } @@ -153,6 +152,10 @@ public: ASTPtr getMergeableQuery() const { return mergeable_query->clone(); } + ASTPtr getSourceTableSelectQuery(); + + Block getHeader() const; + private: Poco::Logger * log; @@ -168,7 +171,8 @@ private: bool is_tumble; // false if is hop std::atomic shutdown_called{false}; bool has_inner_table{true}; - mutable Block sample_block; + mutable Block source_header; + mutable Block target_header; UInt64 clean_interval_ms; const DateLUTImpl * time_zone = nullptr; UInt32 max_timestamp = 0; @@ -240,6 +244,6 @@ private: StoragePtr getInnerTable() const; StoragePtr getTargetTable() const; - Block & getHeader() const; + Block getTargetHeader() const; }; } diff --git a/src/Storages/WindowView/WindowViewSource.h b/src/Storages/WindowView/WindowViewSource.h index a726cdc8712..ae5eecfac3c 100644 --- a/src/Storages/WindowView/WindowViewSource.h +++ b/src/Storages/WindowView/WindowViewSource.h @@ -20,7 +20,7 @@ public: : SourceWithProgress( is_events_ ? Block( {ColumnWithTypeAndName(ColumnUInt32::create(), std::make_shared(window_view_timezone_), "watermark")}) - : storage_->getHeader()) + : storage_->getTargetHeader()) , storage(storage_) , is_events(is_events_) , window_view_timezone(window_view_timezone_) @@ -32,7 +32,7 @@ public: header.insert( ColumnWithTypeAndName(ColumnUInt32::create(), std::make_shared(window_view_timezone_), "watermark")); else - header = storage->getHeader(); + header = storage->getTargetHeader(); } String getName() const override { return "WindowViewSource"; }