diff --git a/src/Storages/WindowView/StorageWindowView.cpp b/src/Storages/WindowView/StorageWindowView.cpp index 11259b73576..4a20a07ae89 100644 --- a/src/Storages/WindowView/StorageWindowView.cpp +++ b/src/Storages/WindowView/StorageWindowView.cpp @@ -79,19 +79,6 @@ namespace ErrorCodes extern const int UNSUPPORTED_METHOD; } -namespace -{ - -ContextPtr getCorrectContext() -{ - auto result = Context::createCopy(Context::getGlobalContextInstance()); - result->setSetting("allow_experimental_analyzer", false); - return result; -} - -} - - namespace { /// Fetch all window info and replace tumble or hop node names with windowID @@ -441,7 +428,7 @@ ASTPtr StorageWindowView::getCleanupQuery() void StorageWindowView::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr local_context, TableExclusiveLockHolder &) { - InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Truncate, getCorrectContext(), local_context, inner_table_id, true); + InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Truncate, getContext(), local_context, inner_table_id, true); } bool StorageWindowView::optimize( @@ -486,7 +473,7 @@ void StorageWindowView::alter( output_header.clear(); InterpreterDropQuery::executeDropQuery( - ASTDropQuery::Kind::Drop, getCorrectContext(), local_context, inner_table_id, true); + ASTDropQuery::Kind::Drop, getContext(), local_context, inner_table_id, true); /// create inner table auto create_context = Context::createCopy(local_context); @@ -499,8 +486,8 @@ void StorageWindowView::alter( shutdown_called = false; - clean_cache_task = getCorrectContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); }); - fire_task = getCorrectContext()->getSchedulePool().createTask( + clean_cache_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); }); + fire_task = getContext()->getSchedulePool().createTask( getStorageID().getFullTableName(), [this] { is_proctime ? threadFuncFireProc() : threadFuncFireEvent(); }); clean_cache_task->deactivate(); fire_task->deactivate(); @@ -543,7 +530,7 @@ std::pair StorageWindowView::getNewBlocks(UInt32 watermark) auto inner_table = getInnerTable(); InterpreterSelectQuery fetch( inner_fetch_query, - getCorrectContext(), + getContext(), inner_table, inner_table->getInMemoryMetadataPtr(), SelectQueryOptions(QueryProcessingStage::FetchColumns)); @@ -577,8 +564,8 @@ std::pair StorageWindowView::getNewBlocks(UInt32 watermark) filter_function = makeASTFunction("has", func_array, std::make_shared(window_id_name)); } - auto syntax_result = TreeRewriter(getCorrectContext()).analyze(filter_function, builder.getHeader().getNamesAndTypesList()); - auto filter_expression = ExpressionAnalyzer(filter_function, syntax_result, getCorrectContext()).getActionsDAG(false); + auto syntax_result = TreeRewriter(getContext()).analyze(filter_function, builder.getHeader().getNamesAndTypesList()); + auto filter_expression = ExpressionAnalyzer(filter_function, syntax_result, getContext()).getActionsDAG(false); auto filter_actions = std::make_shared(std::move(filter_expression)); builder.addSimpleTransform([&](const Block & header) @@ -594,7 +581,7 @@ std::pair StorageWindowView::getNewBlocks(UInt32 watermark) column.column = column.type->createColumnConst(0, Tuple{w_start, watermark}); auto adding_column_dag = ActionsDAG::makeAddingColumnActions(std::move(column)); auto adding_column_actions - = std::make_shared(std::move(adding_column_dag), ExpressionActionsSettings::fromContext(getCorrectContext())); + = std::make_shared(std::move(adding_column_dag), ExpressionActionsSettings::fromContext(getContext())); builder.addSimpleTransform([&](const Block & header) { return std::make_shared(header, adding_column_actions); @@ -608,7 +595,7 @@ std::pair StorageWindowView::getNewBlocks(UInt32 watermark) new_header.getColumnsWithTypeAndName(), ActionsDAG::MatchColumnsMode::Name); auto actions = std::make_shared( - std::move(convert_actions_dag), ExpressionActionsSettings::fromContext(getCorrectContext(), CompileExpressions::yes)); + std::move(convert_actions_dag), ExpressionActionsSettings::fromContext(getContext(), CompileExpressions::yes)); builder.addSimpleTransform([&](const Block & stream_header) { return std::make_shared(stream_header, actions); @@ -632,11 +619,11 @@ std::pair StorageWindowView::getNewBlocks(UInt32 watermark) return StorageBlocks::createStorage(blocks_id_global, required_columns, std::move(pipes), QueryProcessingStage::WithMergeableState); }; - TemporaryTableHolder blocks_storage(getCorrectContext(), creator); + TemporaryTableHolder blocks_storage(getContext(), creator); InterpreterSelectQuery select( getFinalQuery(), - getCorrectContext(), + getContext(), blocks_storage.getTable(), blocks_storage.getTable()->getInMemoryMetadataPtr(), SelectQueryOptions(QueryProcessingStage::Complete)); @@ -651,8 +638,8 @@ std::pair StorageWindowView::getNewBlocks(UInt32 watermark) { return std::make_shared( current_header, - getCorrectContext()->getSettingsRef().min_insert_block_size_rows, - getCorrectContext()->getSettingsRef().min_insert_block_size_bytes); + getContext()->getSettingsRef().min_insert_block_size_rows, + getContext()->getSettingsRef().min_insert_block_size_bytes); }); auto header = builder.getHeader(); @@ -707,7 +694,7 @@ inline void StorageWindowView::fire(UInt32 watermark) insert->table_id = target_table->getStorageID(); InterpreterInsertQuery interpreter( insert, - getCorrectContext(), + getContext(), /* allow_materialized */ false, /* no_squash */ false, /* no_destination */ false, @@ -720,8 +707,8 @@ inline void StorageWindowView::fire(UInt32 watermark) pipe.getHeader(), block_io.pipeline.getHeader().getNamesAndTypesList(), getTargetTable()->getInMemoryMetadataPtr()->getColumns(), - getCorrectContext(), - getCorrectContext()->getSettingsRef().insert_null_as_default); + getContext(), + getContext()->getSettingsRef().insert_null_as_default); auto adding_missing_defaults_actions = std::make_shared(std::move(adding_missing_defaults_dag)); pipe.addSimpleTransform([&](const Block & stream_header) { @@ -734,7 +721,7 @@ inline void StorageWindowView::fire(UInt32 watermark) ActionsDAG::MatchColumnsMode::Position); auto actions = std::make_shared( std::move(convert_actions_dag), - ExpressionActionsSettings::fromContext(getCorrectContext(), CompileExpressions::yes)); + ExpressionActionsSettings::fromContext(getContext(), CompileExpressions::yes)); pipe.addSimpleTransform([&](const Block & stream_header) { return std::make_shared(stream_header, actions); @@ -755,7 +742,7 @@ ASTPtr StorageWindowView::getSourceTableSelectQuery() if (hasJoin(modified_select)) { auto analyzer_res = TreeRewriterResult({}); - removeJoin(modified_select, analyzer_res, getCorrectContext()); + removeJoin(modified_select, analyzer_res, getContext()); } else { @@ -804,13 +791,13 @@ ASTPtr StorageWindowView::getInnerTableCreateQuery(const ASTPtr & inner_query, c Aliases aliases; QueryAliasesVisitor(aliases).visit(inner_query); auto inner_query_normalized = inner_query->clone(); - QueryNormalizer::Data normalizer_data(aliases, {}, false, getCorrectContext()->getSettingsRef(), false); + QueryNormalizer::Data normalizer_data(aliases, {}, false, getContext()->getSettingsRef(), false); QueryNormalizer(normalizer_data).visit(inner_query_normalized); auto inner_select_query = std::static_pointer_cast(inner_query_normalized); auto t_sample_block - = InterpreterSelectQuery(inner_select_query, getCorrectContext(), SelectQueryOptions(QueryProcessingStage::WithMergeableState)) + = InterpreterSelectQuery(inner_select_query, getContext(), SelectQueryOptions(QueryProcessingStage::WithMergeableState)) .getSampleBlock(); ASTPtr columns_list = InterpreterCreateQuery::formatColumns(t_sample_block.getNamesAndTypesList()); @@ -1028,7 +1015,7 @@ void StorageWindowView::cleanup() std::lock_guard mutex_lock(mutex); auto alter_query = getCleanupQuery(); - auto cleanup_context = Context::createCopy(getCorrectContext()); + auto cleanup_context = Context::createCopy(getContext()); cleanup_context->makeQueryContext(); cleanup_context->setCurrentQueryId(""); cleanup_context->setQueryKindReplicatedDatabaseInternal(); @@ -1227,6 +1214,7 @@ StorageWindowView::StorageWindowView( const String & comment, LoadingStrictnessLevel mode) : IStorage(table_id_) + , WithContext(context_->getGlobalContext()) , log(getLogger(fmt::format("StorageWindowView({}.{})", table_id_.database_name, table_id_.table_name))) , fire_signal_timeout_s(context_->getSettingsRef().wait_for_window_view_fire_signal_timeout.totalSeconds()) , clean_interval_usec(context_->getSettingsRef().window_view_clean_interval.totalMicroseconds()) @@ -1306,8 +1294,8 @@ StorageWindowView::StorageWindowView( if (disabled_due_to_analyzer) return; - clean_cache_task = getCorrectContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); }); - fire_task = getCorrectContext()->getSchedulePool().createTask( + clean_cache_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); }); + fire_task = getContext()->getSchedulePool().createTask( getStorageID().getFullTableName(), [this] { is_proctime ? threadFuncFireProc() : threadFuncFireEvent(); }); clean_cache_task->deactivate(); fire_task->deactivate(); @@ -1318,7 +1306,7 @@ ASTPtr StorageWindowView::initInnerQuery(ASTSelectQuery query, ContextPtr contex select_query = query.clone(); output_header.clear(); - String select_database_name = getCorrectContext()->getCurrentDatabase(); + String select_database_name = getContext()->getCurrentDatabase(); String select_table_name; auto select_query_tmp = query.clone(); extractDependentTable(context_, select_query_tmp, select_database_name, select_table_name); @@ -1735,7 +1723,7 @@ void StorageWindowView::drop() { /// Must be guaranteed at this point for database engine Atomic that has_inner_table == false, /// because otherwise will be a deadlock. - dropInnerTableIfAny(false, getCorrectContext()); + dropInnerTableIfAny(false, getContext()); } void StorageWindowView::dropInnerTableIfAny(bool sync, ContextPtr local_context) @@ -1746,10 +1734,10 @@ void StorageWindowView::dropInnerTableIfAny(bool sync, ContextPtr local_context) try { InterpreterDropQuery::executeDropQuery( - ASTDropQuery::Kind::Drop, getCorrectContext(), local_context, inner_table_id, sync); + ASTDropQuery::Kind::Drop, getContext(), local_context, inner_table_id, sync); if (has_inner_target_table) - InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getCorrectContext(), local_context, target_table_id, sync, /* ignore_sync_setting */ true); + InterpreterDropQuery::executeDropQuery(ASTDropQuery::Kind::Drop, getContext(), local_context, target_table_id, sync, /* ignore_sync_setting */ true); } catch (...) { @@ -1769,7 +1757,7 @@ const Block & StorageWindowView::getOutputHeader() const std::lock_guard lock(sample_block_lock); if (!output_header) { - output_header = InterpreterSelectQuery(select_query->clone(), getCorrectContext(), SelectQueryOptions(QueryProcessingStage::Complete)) + output_header = InterpreterSelectQuery(select_query->clone(), getContext(), SelectQueryOptions(QueryProcessingStage::Complete)) .getSampleBlock(); } return output_header; @@ -1777,17 +1765,17 @@ const Block & StorageWindowView::getOutputHeader() const StoragePtr StorageWindowView::getSourceTable() const { - return DatabaseCatalog::instance().getTable(select_table_id, getCorrectContext()); + return DatabaseCatalog::instance().getTable(select_table_id, getContext()); } StoragePtr StorageWindowView::getInnerTable() const { - return DatabaseCatalog::instance().getTable(inner_table_id, getCorrectContext()); + return DatabaseCatalog::instance().getTable(inner_table_id, getContext()); } StoragePtr StorageWindowView::getTargetTable() const { - return DatabaseCatalog::instance().getTable(target_table_id, getCorrectContext()); + return DatabaseCatalog::instance().getTable(target_table_id, getContext()); } void StorageWindowView::throwIfWindowViewIsDisabled(ContextPtr local_context) const diff --git a/src/Storages/WindowView/StorageWindowView.h b/src/Storages/WindowView/StorageWindowView.h index 3d48eaae548..38fca512ed9 100644 --- a/src/Storages/WindowView/StorageWindowView.h +++ b/src/Storages/WindowView/StorageWindowView.h @@ -100,7 +100,7 @@ using ASTPtr = std::shared_ptr; * Users need to take these duplicated results into account. */ -class StorageWindowView final : public IStorage +class StorageWindowView final : public IStorage, WithContext { friend class WindowViewSource; friend class WatermarkTransform;