fix view_level

This commit is contained in:
Sema Checherinda 2024-12-11 15:42:19 +01:00
parent 170cdfdf08
commit 673276ad45
3 changed files with 25 additions and 8 deletions

View File

@ -303,6 +303,7 @@ static bool isTrivialSelect(const ASTPtr & select)
Chain InterpreterInsertQuery::buildChain( Chain InterpreterInsertQuery::buildChain(
const StoragePtr & table, const StoragePtr & table,
size_t view_level,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
const Names & columns, const Names & columns,
ThreadStatusesHolderPtr thread_status_holder, ThreadStatusesHolderPtr thread_status_holder,
@ -324,7 +325,7 @@ Chain InterpreterInsertQuery::buildChain(
if (check_access) if (check_access)
getContext()->checkAccess(AccessType::INSERT, table->getStorageID(), sample.getNames()); getContext()->checkAccess(AccessType::INSERT, table->getStorageID(), sample.getNames());
Chain sink = buildSink(table, metadata_snapshot, thread_status_holder, running_group, elapsed_counter_ms); Chain sink = buildSink(table, view_level, metadata_snapshot, thread_status_holder, running_group, elapsed_counter_ms);
Chain chain = buildPreSinkChain(sink.getInputHeader(), table, metadata_snapshot, sample); Chain chain = buildPreSinkChain(sink.getInputHeader(), table, metadata_snapshot, sample);
chain.appendChain(std::move(sink)); chain.appendChain(std::move(sink));
@ -333,6 +334,7 @@ Chain InterpreterInsertQuery::buildChain(
Chain InterpreterInsertQuery::buildSink( Chain InterpreterInsertQuery::buildSink(
const StoragePtr & table, const StoragePtr & table,
size_t view_level,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
ThreadStatusesHolderPtr thread_status_holder, ThreadStatusesHolderPtr thread_status_holder,
ThreadGroupPtr running_group, ThreadGroupPtr running_group,
@ -361,7 +363,7 @@ Chain InterpreterInsertQuery::buildSink(
else else
{ {
out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr, out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr,
query_ptr, /* view_level */ 0, no_destination, query_ptr, view_level, no_destination,
thread_status_holder, running_group, elapsed_counter_ms, async_insert); thread_status_holder, running_group, elapsed_counter_ms, async_insert);
} }
@ -423,7 +425,14 @@ Chain InterpreterInsertQuery::buildPreSinkChain(
return out; return out;
} }
std::pair<std::vector<Chain>, std::vector<Chain>> InterpreterInsertQuery::buildPreAndSinkChains(size_t presink_streams, size_t sink_streams, StoragePtr table, const StorageMetadataPtr & metadata_snapshot, const Block & query_sample_block) std::pair<std::vector<Chain>, std::vector<Chain>> InterpreterInsertQuery::buildPreAndSinkChains(
size_t presink_streams,
size_t sink_streams,
StoragePtr table,
size_t view_level,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block
)
{ {
chassert(presink_streams > 0); chassert(presink_streams > 0);
chassert(sink_streams > 0); chassert(sink_streams > 0);
@ -439,7 +448,7 @@ std::pair<std::vector<Chain>, std::vector<Chain>> InterpreterInsertQuery::buildP
for (size_t i = 0; i < sink_streams; ++i) for (size_t i = 0; i < sink_streams; ++i)
{ {
auto out = buildSink(table, metadata_snapshot, /* thread_status_holder= */ nullptr, auto out = buildSink(table, view_level, metadata_snapshot, /* thread_status_holder= */ nullptr,
running_group, /* elapsed_counter_ms= */ nullptr); running_group, /* elapsed_counter_ms= */ nullptr);
sink_chains.emplace_back(std::move(out)); sink_chains.emplace_back(std::move(out));
@ -639,7 +648,7 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery &
auto [presink_chains, sink_chains] = buildPreAndSinkChains( auto [presink_chains, sink_chains] = buildPreAndSinkChains(
presink_streams_size, sink_streams_size, presink_streams_size, sink_streams_size,
table, metadata_snapshot, query_sample_block); table, /* view_level */ 0, metadata_snapshot, query_sample_block);
pipeline.resize(presink_chains.size()); pipeline.resize(presink_chains.size());
@ -693,7 +702,7 @@ QueryPipeline InterpreterInsertQuery::buildInsertPipeline(ASTInsertQuery & query
{ {
auto [presink_chains, sink_chains] = buildPreAndSinkChains( auto [presink_chains, sink_chains] = buildPreAndSinkChains(
/* presink_streams */1, /* sink_streams */1, /* presink_streams */1, /* sink_streams */1,
table, metadata_snapshot, query_sample_block); table, /* view_level */ 0, metadata_snapshot, query_sample_block);
chain = std::move(presink_chains.front()); chain = std::move(presink_chains.front());
chain.appendChain(std::move(sink_chains.front())); chain.appendChain(std::move(sink_chains.front()));

View File

@ -43,6 +43,7 @@ public:
Chain buildChain( Chain buildChain(
const StoragePtr & table, const StoragePtr & table,
size_t view_level,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
const Names & columns, const Names & columns,
ThreadStatusesHolderPtr thread_status_holder = {}, ThreadStatusesHolderPtr thread_status_holder = {},
@ -79,13 +80,20 @@ private:
std::vector<std::unique_ptr<ReadBuffer>> owned_buffers; std::vector<std::unique_ptr<ReadBuffer>> owned_buffers;
std::pair<std::vector<Chain>, std::vector<Chain>> buildPreAndSinkChains(size_t presink_streams, size_t sink_streams, StoragePtr table, const StorageMetadataPtr & metadata_snapshot, const Block & query_sample_block); std::pair<std::vector<Chain>, std::vector<Chain>> buildPreAndSinkChains(
size_t presink_streams,
size_t sink_streams,
StoragePtr table,
size_t view_level,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block);
QueryPipeline buildInsertSelectPipeline(ASTInsertQuery & query, StoragePtr table); QueryPipeline buildInsertSelectPipeline(ASTInsertQuery & query, StoragePtr table);
QueryPipeline buildInsertPipeline(ASTInsertQuery & query, StoragePtr table); QueryPipeline buildInsertPipeline(ASTInsertQuery & query, StoragePtr table);
Chain buildSink( Chain buildSink(
const StoragePtr & table, const StoragePtr & table,
size_t view_level,
const StorageMetadataPtr & metadata_snapshot, const StorageMetadataPtr & metadata_snapshot,
ThreadStatusesHolderPtr thread_status_holder, ThreadStatusesHolderPtr thread_status_holder,
ThreadGroupPtr running_group, ThreadGroupPtr running_group,

View File

@ -372,7 +372,7 @@ std::optional<Chain> generateViewChain(
/// TODO: remove sql_security_type check after we turn `ignore_empty_sql_security_in_create_view_query=false` /// TODO: remove sql_security_type check after we turn `ignore_empty_sql_security_in_create_view_query=false`
bool check_access = !materialized_view->hasInnerTable() && materialized_view->getInMemoryMetadataPtr()->sql_security_type; bool check_access = !materialized_view->hasInnerTable() && materialized_view->getInMemoryMetadataPtr()->sql_security_type;
out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access); out = interpreter.buildChain(inner_table, view_level + 1, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access);
if (interpreter.shouldAddSquashingFroStorage(inner_table)) if (interpreter.shouldAddSquashingFroStorage(inner_table))
{ {