diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index b23711de83d..929f5fd2d6c 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -303,6 +303,7 @@ static bool isTrivialSelect(const ASTPtr & select) Chain InterpreterInsertQuery::buildChain( const StoragePtr & table, + size_t view_level, const StorageMetadataPtr & metadata_snapshot, const Names & columns, ThreadStatusesHolderPtr thread_status_holder, @@ -324,7 +325,7 @@ Chain InterpreterInsertQuery::buildChain( if (check_access) getContext()->checkAccess(AccessType::INSERT, table->getStorageID(), sample.getNames()); - Chain sink = buildSink(table, metadata_snapshot, thread_status_holder, running_group, elapsed_counter_ms); + Chain sink = buildSink(table, view_level, metadata_snapshot, thread_status_holder, running_group, elapsed_counter_ms); Chain chain = buildPreSinkChain(sink.getInputHeader(), table, metadata_snapshot, sample); chain.appendChain(std::move(sink)); @@ -333,6 +334,7 @@ Chain InterpreterInsertQuery::buildChain( Chain InterpreterInsertQuery::buildSink( const StoragePtr & table, + size_t view_level, const StorageMetadataPtr & metadata_snapshot, ThreadStatusesHolderPtr thread_status_holder, ThreadGroupPtr running_group, @@ -361,7 +363,7 @@ Chain InterpreterInsertQuery::buildSink( else { out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr, - query_ptr, /* view_level */ 0, no_destination, + query_ptr, view_level, no_destination, thread_status_holder, running_group, elapsed_counter_ms, async_insert); } @@ -423,7 +425,14 @@ Chain InterpreterInsertQuery::buildPreSinkChain( return out; } -std::pair, std::vector> InterpreterInsertQuery::buildPreAndSinkChains(size_t presink_streams, size_t sink_streams, StoragePtr table, const StorageMetadataPtr & metadata_snapshot, const Block & query_sample_block) +std::pair, std::vector> InterpreterInsertQuery::buildPreAndSinkChains( + size_t presink_streams, + size_t sink_streams, + StoragePtr table, + size_t view_level, + const StorageMetadataPtr & metadata_snapshot, + const Block & query_sample_block + ) { chassert(presink_streams > 0); chassert(sink_streams > 0); @@ -439,7 +448,7 @@ std::pair, std::vector> InterpreterInsertQuery::buildP for (size_t i = 0; i < sink_streams; ++i) { - auto out = buildSink(table, metadata_snapshot, /* thread_status_holder= */ nullptr, + auto out = buildSink(table, view_level, metadata_snapshot, /* thread_status_holder= */ nullptr, running_group, /* elapsed_counter_ms= */ nullptr); sink_chains.emplace_back(std::move(out)); @@ -639,7 +648,7 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery & auto [presink_chains, sink_chains] = buildPreAndSinkChains( presink_streams_size, sink_streams_size, - table, metadata_snapshot, query_sample_block); + table, /* view_level */ 0, metadata_snapshot, query_sample_block); pipeline.resize(presink_chains.size()); @@ -693,7 +702,7 @@ QueryPipeline InterpreterInsertQuery::buildInsertPipeline(ASTInsertQuery & query { auto [presink_chains, sink_chains] = buildPreAndSinkChains( /* presink_streams */1, /* sink_streams */1, - table, metadata_snapshot, query_sample_block); + table, /* view_level */ 0, metadata_snapshot, query_sample_block); chain = std::move(presink_chains.front()); chain.appendChain(std::move(sink_chains.front())); diff --git a/src/Interpreters/InterpreterInsertQuery.h b/src/Interpreters/InterpreterInsertQuery.h index 894c7c42144..cc1d7b100fa 100644 --- a/src/Interpreters/InterpreterInsertQuery.h +++ b/src/Interpreters/InterpreterInsertQuery.h @@ -43,6 +43,7 @@ public: Chain buildChain( const StoragePtr & table, + size_t view_level, const StorageMetadataPtr & metadata_snapshot, const Names & columns, ThreadStatusesHolderPtr thread_status_holder = {}, @@ -79,13 +80,20 @@ private: std::vector> owned_buffers; - std::pair, std::vector> buildPreAndSinkChains(size_t presink_streams, size_t sink_streams, StoragePtr table, const StorageMetadataPtr & metadata_snapshot, const Block & query_sample_block); + std::pair, std::vector> buildPreAndSinkChains( + size_t presink_streams, + size_t sink_streams, + StoragePtr table, + size_t view_level, + const StorageMetadataPtr & metadata_snapshot, + const Block & query_sample_block); QueryPipeline buildInsertSelectPipeline(ASTInsertQuery & query, StoragePtr table); QueryPipeline buildInsertPipeline(ASTInsertQuery & query, StoragePtr table); Chain buildSink( const StoragePtr & table, + size_t view_level, const StorageMetadataPtr & metadata_snapshot, ThreadStatusesHolderPtr thread_status_holder, ThreadGroupPtr running_group, diff --git a/src/Processors/Transforms/buildPushingToViewsChain.cpp b/src/Processors/Transforms/buildPushingToViewsChain.cpp index 63b5f0506df..74aede2c76a 100644 --- a/src/Processors/Transforms/buildPushingToViewsChain.cpp +++ b/src/Processors/Transforms/buildPushingToViewsChain.cpp @@ -372,7 +372,7 @@ std::optional generateViewChain( /// TODO: remove sql_security_type check after we turn `ignore_empty_sql_security_in_create_view_query=false` bool check_access = !materialized_view->hasInnerTable() && materialized_view->getInMemoryMetadataPtr()->sql_security_type; - out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access); + out = interpreter.buildChain(inner_table, view_level + 1, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access); if (interpreter.shouldAddSquashingFroStorage(inner_table)) {