From 689a1544380682ffe82960c8aa5c9ee097a88fc8 Mon Sep 17 00:00:00 2001 From: Sema Checherinda <104093494+CheSema@users.noreply.github.com> Date: Fri, 6 Dec 2024 18:04:01 +0100 Subject: [PATCH 1/7] Revert "Revert "Revert "Revert "make d-tor Finalizer more obvious"""" --- src/Storages/MergeTree/MergeTreeSink.cpp | 3 +++ .../MergeTree/MergedBlockOutputStream.cpp | 17 +++++------------ .../MergeTree/MergedBlockOutputStream.h | 2 +- .../MergeTree/ReplicatedMergeTreeSink.cpp | 2 ++ 4 files changed, 11 insertions(+), 13 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeSink.cpp b/src/Storages/MergeTree/MergeTreeSink.cpp index d65d1f3212f..582a722d678 100644 --- a/src/Storages/MergeTree/MergeTreeSink.cpp +++ b/src/Storages/MergeTree/MergeTreeSink.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -44,6 +45,8 @@ MergeTreeSink::~MergeTreeSink() if (!delayed_chunk) return; + chassert(isCancelled() || std::uncaught_exceptions()); + for (auto & partition : delayed_chunk->partitions) { partition.temp_part.cancel(); diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index f8863a8a6d6..87b3d238e3b 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -119,7 +119,7 @@ struct MergedBlockOutputStream::Finalizer::Impl } void finish(); - void cancel(); + void cancel() noexcept; }; void MergedBlockOutputStream::Finalizer::finish() @@ -130,7 +130,7 @@ void MergedBlockOutputStream::Finalizer::finish() to_finish->finish(); } -void MergedBlockOutputStream::Finalizer::cancel() +void MergedBlockOutputStream::Finalizer::cancel() noexcept { std::unique_ptr to_cancel = std::move(impl); impl.reset(); @@ -167,7 +167,7 @@ void MergedBlockOutputStream::Finalizer::Impl::finish() part->getDataPartStorage().removeFile(file_name); } -void MergedBlockOutputStream::Finalizer::Impl::cancel() +void MergedBlockOutputStream::Finalizer::Impl::cancel() noexcept { writer.cancel(); @@ -183,15 +183,8 @@ MergedBlockOutputStream::Finalizer::Finalizer(std::unique_ptr impl_) : imp MergedBlockOutputStream::Finalizer::~Finalizer() { - try - { - if (impl) - finish(); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } + if (impl) + cancel(); } diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.h b/src/Storages/MergeTree/MergedBlockOutputStream.h index b6fc13cbe42..2f6205427d1 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.h +++ b/src/Storages/MergeTree/MergedBlockOutputStream.h @@ -55,7 +55,7 @@ public: ~Finalizer(); void finish(); - void cancel(); + void cancel() noexcept; }; /// Finalize writing part and fill inner structures diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp index 19a69eb46be..507b4bfd214 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp @@ -179,6 +179,8 @@ ReplicatedMergeTreeSinkImpl::~ReplicatedMergeTreeSinkImpl() if (!delayed_chunk) return; + chassert(isCancelled() || std::uncaught_exceptions()); + for (auto & partition : delayed_chunk->partitions) { partition.temp_part.cancel(); From 0f5775301338d1d47c0ff04fe78f6353ff4dacde Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Tue, 10 Dec 2024 16:24:46 +0100 Subject: [PATCH 2/7] fix bahaviour with materialized_views_ignore_errors --- src/Interpreters/InterpreterInsertQuery.cpp | 2 +- .../Executors/PushingPipelineExecutor.cpp | 5 +- .../Transforms/ExceptionKeepingTransform.cpp | 1 + .../Transforms/buildPushingToViewsChain.cpp | 25 +++- .../Transforms/buildPushingToViewsChain.h | 1 + src/Storages/MergeTree/MergeTreeSink.cpp | 2 + .../configs/no_s3_retries.xml | 11 ++ .../configs/query_log_conf.xml | 7 + .../test_checking_s3_blobs_paranoid/test.py | 127 ++++++++++++++++++ ...materialized_views_ignore_errors.reference | 14 ++ ...03282_materialized_views_ignore_errors.sql | 28 ++++ 11 files changed, 216 insertions(+), 7 deletions(-) create mode 100644 tests/integration/test_checking_s3_blobs_paranoid/configs/no_s3_retries.xml create mode 100644 tests/integration/test_checking_s3_blobs_paranoid/configs/query_log_conf.xml create mode 100644 tests/queries/0_stateless/03282_materialized_views_ignore_errors.reference create mode 100644 tests/queries/0_stateless/03282_materialized_views_ignore_errors.sql diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index 797895e4a93..b23711de83d 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -361,7 +361,7 @@ Chain InterpreterInsertQuery::buildSink( else { out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr, - query_ptr, no_destination, + query_ptr, /* view_level */ 0, no_destination, thread_status_holder, running_group, elapsed_counter_ms, async_insert); } diff --git a/src/Processors/Executors/PushingPipelineExecutor.cpp b/src/Processors/Executors/PushingPipelineExecutor.cpp index 7a1c0111a3a..70085938e8e 100644 --- a/src/Processors/Executors/PushingPipelineExecutor.cpp +++ b/src/Processors/Executors/PushingPipelineExecutor.cpp @@ -127,7 +127,10 @@ void PushingPipelineExecutor::finish() finished = true; if (executor) - executor->executeStep(); + { + auto res = executor->executeStep(); + chassert(!res); + } } void PushingPipelineExecutor::cancel() diff --git a/src/Processors/Transforms/ExceptionKeepingTransform.cpp b/src/Processors/Transforms/ExceptionKeepingTransform.cpp index b50f66b0240..9fbf710efd8 100644 --- a/src/Processors/Transforms/ExceptionKeepingTransform.cpp +++ b/src/Processors/Transforms/ExceptionKeepingTransform.cpp @@ -1,3 +1,4 @@ +#include #include #include #include diff --git a/src/Processors/Transforms/buildPushingToViewsChain.cpp b/src/Processors/Transforms/buildPushingToViewsChain.cpp index b48ba3eb8a0..f0c92c261cd 100644 --- a/src/Processors/Transforms/buildPushingToViewsChain.cpp +++ b/src/Processors/Transforms/buildPushingToViewsChain.cpp @@ -35,6 +35,7 @@ #include #include +#include #include @@ -120,7 +121,7 @@ using ViewsDataPtr = std::shared_ptr; class CopyingDataToViewsTransform final : public IProcessor { public: - CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data); + CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data, size_t view_level_); String getName() const override { return "CopyingDataToViewsTransform"; } Status prepare() override; @@ -129,6 +130,7 @@ public: private: InputPort & input; ViewsDataPtr views_data; + size_t view_level; }; /// For source chunk, execute view query over it. @@ -223,6 +225,7 @@ private: /// Generates one chain part for every view in buildPushingToViewsChain std::optional generateViewChain( ContextPtr context, + size_t view_level, const StorageID & view_id, ThreadGroupPtr running_group, Chain & result_chain, @@ -400,6 +403,7 @@ std::optional generateViewChain( query = live_view->getInnerQuery(); out = buildPushingToViewsChain( view, view_metadata_snapshot, insert_context, ASTPtr(), + view_level + 1, /* no_destination= */ true, thread_status_holder, running_group, view_counter_ms, async_insert, storage_header); } @@ -409,12 +413,14 @@ std::optional generateViewChain( query = window_view->getMergeableQuery(); out = buildPushingToViewsChain( view, view_metadata_snapshot, insert_context, ASTPtr(), + view_level + 1, /* no_destination= */ true, thread_status_holder, running_group, view_counter_ms, async_insert); } else out = buildPushingToViewsChain( view, view_metadata_snapshot, insert_context, ASTPtr(), + view_level + 1, /* no_destination= */ false, thread_status_holder, running_group, view_counter_ms, async_insert); @@ -466,12 +472,14 @@ Chain buildPushingToViewsChain( const StorageMetadataPtr & metadata_snapshot, ContextPtr context, const ASTPtr & query_ptr, + size_t view_level, bool no_destination, ThreadStatusesHolderPtr thread_status_holder, ThreadGroupPtr running_group, std::atomic_uint64_t * elapsed_counter_ms, bool async_insert, - const Block & live_view_header) + const Block & live_view_header + ) { checkStackSize(); Chain result_chain; @@ -514,7 +522,7 @@ Chain buildPushingToViewsChain( try { auto out = generateViewChain( - context, view_id, running_group, result_chain, + context, view_level, view_id, running_group, result_chain, views_data, thread_status_holder, async_insert, storage_header, disable_deduplication_for_children); if (!out.has_value()) @@ -554,7 +562,7 @@ Chain buildPushingToViewsChain( for (const auto & chain : chains) headers.push_back(chain.getOutputHeader()); - auto copying_data = std::make_shared(storage_header, views_data); + auto copying_data = std::make_shared(storage_header, views_data, view_level); auto finalizing_views = std::make_shared(std::move(headers), views_data); auto out = copying_data->getOutputs().begin(); auto in = finalizing_views->getInputs().begin(); @@ -726,10 +734,11 @@ static void logQueryViews(std::list & views, ContextPtr context } -CopyingDataToViewsTransform::CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data) +CopyingDataToViewsTransform::CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data, size_t view_level_) : IProcessor({header}, OutputPorts(data->views.size(), header)) , input(inputs.front()) , views_data(std::move(data)) + , view_level(view_level_) { if (views_data->views.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "CopyingDataToViewsTransform cannot have zero outputs"); @@ -765,6 +774,12 @@ IProcessor::Status CopyingDataToViewsTransform::prepare() auto data = input.pullData(); if (data.exception) { + // If view_level == 0 than the exception comes from the source table. + // There is no case when we could tolerate exceptions from the source table. + // Do not tolerate incomming exception and do not pass it to the follwing processors. + if (view_level == 0) + std::rethrow_exception(data.exception); + if (!views_data->has_exception) { views_data->first_exception = data.exception; diff --git a/src/Processors/Transforms/buildPushingToViewsChain.h b/src/Processors/Transforms/buildPushingToViewsChain.h index a1feed91b60..4308bbc56f4 100644 --- a/src/Processors/Transforms/buildPushingToViewsChain.h +++ b/src/Processors/Transforms/buildPushingToViewsChain.h @@ -60,6 +60,7 @@ Chain buildPushingToViewsChain( const StorageMetadataPtr & metadata_snapshot, ContextPtr context, const ASTPtr & query_ptr, + size_t view_level, /// It is true when we should not insert into table, but only to views. /// Used e.g. for kafka. We should try to remove it somehow. bool no_destination, diff --git a/src/Storages/MergeTree/MergeTreeSink.cpp b/src/Storages/MergeTree/MergeTreeSink.cpp index 582a722d678..47f5ab97119 100644 --- a/src/Storages/MergeTree/MergeTreeSink.cpp +++ b/src/Storages/MergeTree/MergeTreeSink.cpp @@ -4,6 +4,7 @@ #include #include #include +#include "Common/logger_useful.h" #include #include @@ -79,6 +80,7 @@ void MergeTreeSink::onStart() void MergeTreeSink::onFinish() { chassert(!isCancelled()); + finishDelayedChunk(); } diff --git a/tests/integration/test_checking_s3_blobs_paranoid/configs/no_s3_retries.xml b/tests/integration/test_checking_s3_blobs_paranoid/configs/no_s3_retries.xml new file mode 100644 index 00000000000..9dd3600329a --- /dev/null +++ b/tests/integration/test_checking_s3_blobs_paranoid/configs/no_s3_retries.xml @@ -0,0 +1,11 @@ + + + + + + 1 + 0 + 0 + + + diff --git a/tests/integration/test_checking_s3_blobs_paranoid/configs/query_log_conf.xml b/tests/integration/test_checking_s3_blobs_paranoid/configs/query_log_conf.xml new file mode 100644 index 00000000000..0e51871c2ef --- /dev/null +++ b/tests/integration/test_checking_s3_blobs_paranoid/configs/query_log_conf.xml @@ -0,0 +1,7 @@ + + + system + query_log
+ broken_s3 +
+
diff --git a/tests/integration/test_checking_s3_blobs_paranoid/test.py b/tests/integration/test_checking_s3_blobs_paranoid/test.py index 2cc70eba2ab..dbc2a0385af 100644 --- a/tests/integration/test_checking_s3_blobs_paranoid/test.py +++ b/tests/integration/test_checking_s3_blobs_paranoid/test.py @@ -39,6 +39,18 @@ def cluster(): ], with_minio=True, ) + cluster.add_instance( + "node_with_query_log_on_s3", + main_configs=[ + "configs/storage_conf.xml", + "configs/query_log_conf.xml" + ], + user_configs=[ + "configs/setting.xml", + "configs/no_s3_retries.xml", + ], + with_minio=True, + ) logging.info("Starting cluster...") cluster.start() logging.info("Cluster started") @@ -718,3 +730,118 @@ def test_no_key_found_disk(cluster, broken_s3): ) assert s3_disk_no_key_errors_metric_value > 0 + + +def test_node_with_query_log_on_s3(cluster, broken_s3): + node = cluster.instances["node_with_query_log_on_s3"] + + node.query( + """ + SYSTEM FLUSH LOGS + """ + ) + + node.query( + """ + CREATE TABLE log_sink + ENGINE = MergeTree() + ORDER BY () + EMPTY AS + SELECT * + FROM system.query_log + """ + ) + + node.query( + """ + CREATE MATERIALIZED VIEW log_sink_mv TO log_sink AS + SELECT * + FROM system.query_log + """ + ) + + node.query( + """ + SELECT 1111 + """ + ) + + node.query( + """ + SYSTEM FLUSH LOGS + """ + ) + + node.query( + """ + SELECT 2222 + """ + ) + + broken_s3.setup_at_object_upload(count=100, after=0) + + node.query( + """ + SYSTEM FLUSH LOGS + """ + ) + + count_from_query_log = node.query( + """ + SELECT count() from system.query_log WHERE query like 'SELECT 2222%' AND type = 'QueryFinish' + """) + + assert count_from_query_log == "0\n" + + +def test_exception_in_onFinish(cluster, broken_s3): + node = cluster.instances["node_with_query_log_on_s3"] + + node.query( + """ + CREATE TABLE source (i Int64) + ENGINE = MergeTree() + ORDER BY () + SETTINGS storage_policy='broken_s3' + """ + ) + + node.query( + """ + CREATE TABLE source_sink + ENGINE = MergeTree() + ORDER BY () + EMPTY AS + SELECT * + FROM source + """ + ) + + node.query( + """ + CREATE MATERIALIZED VIEW source_sink_mv TO source_sink AS + SELECT * + FROM source + """ + ) + + node.query( + """ + INSERT INTO source SETTINGS materialized_views_ignore_errors=1 VALUES (1) + """ + ) + + broken_s3.setup_at_object_upload(count=100, after=0) + + node.query_and_get_error( + """ + INSERT INTO source SETTINGS materialized_views_ignore_errors=1 VALUES (2) + """ + ) + + count_from_query_log = node.query( + """ + SELECT count() from source + """) + + assert count_from_query_log == "1\n" diff --git a/tests/queries/0_stateless/03282_materialized_views_ignore_errors.reference b/tests/queries/0_stateless/03282_materialized_views_ignore_errors.reference new file mode 100644 index 00000000000..2ed34740cc4 --- /dev/null +++ b/tests/queries/0_stateless/03282_materialized_views_ignore_errors.reference @@ -0,0 +1,14 @@ +-- { echoOn } + +insert into testX select number from numbers(20) + settings materialized_views_ignore_errors = 0; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } +select count() from testX; +0 +select count() from testXA; +0 +insert into testX select number from numbers(20) + settings materialized_views_ignore_errors = 1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } +select count() from testX; +0 +select count() from testXA; +0 diff --git a/tests/queries/0_stateless/03282_materialized_views_ignore_errors.sql b/tests/queries/0_stateless/03282_materialized_views_ignore_errors.sql new file mode 100644 index 00000000000..c5f4e94eecb --- /dev/null +++ b/tests/queries/0_stateless/03282_materialized_views_ignore_errors.sql @@ -0,0 +1,28 @@ +-- more blocks to process +set max_block_size = 10; +set min_insert_block_size_rows = 10; + +drop table if exists testX; +drop table if exists testXA; + +create table testX (A Int64) engine=MergeTree partition by (intDiv(A, 10), throwIf(A=2)) order by tuple(); +create materialized view testXA engine=MergeTree order by tuple() as select sleep(0.1) from testX; + +-- { echoOn } + +insert into testX select number from numbers(20) + settings materialized_views_ignore_errors = 0; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } + +select count() from testX; +select count() from testXA; + +insert into testX select number from numbers(20) + settings materialized_views_ignore_errors = 1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO } + +select count() from testX; +select count() from testXA; + +-- { echoOff } + +drop table testX; +drop view testXA; From ea17c5d133bcf195bac4264e470a0ca2d724eff6 Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Tue, 10 Dec 2024 16:26:38 +0100 Subject: [PATCH 3/7] fix headers --- src/Storages/MergeTree/MergeTreeSink.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Storages/MergeTree/MergeTreeSink.cpp b/src/Storages/MergeTree/MergeTreeSink.cpp index 47f5ab97119..dfddcb311f4 100644 --- a/src/Storages/MergeTree/MergeTreeSink.cpp +++ b/src/Storages/MergeTree/MergeTreeSink.cpp @@ -4,7 +4,6 @@ #include #include #include -#include "Common/logger_useful.h" #include #include From 4932f599eaa301fd7e05727531baf45723f1a818 Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Tue, 10 Dec 2024 16:43:56 +0100 Subject: [PATCH 4/7] fix style --- src/Processors/Transforms/buildPushingToViewsChain.cpp | 2 +- tests/integration/test_checking_s3_blobs_paranoid/test.py | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/Processors/Transforms/buildPushingToViewsChain.cpp b/src/Processors/Transforms/buildPushingToViewsChain.cpp index f0c92c261cd..63b5f0506df 100644 --- a/src/Processors/Transforms/buildPushingToViewsChain.cpp +++ b/src/Processors/Transforms/buildPushingToViewsChain.cpp @@ -776,7 +776,7 @@ IProcessor::Status CopyingDataToViewsTransform::prepare() { // If view_level == 0 than the exception comes from the source table. // There is no case when we could tolerate exceptions from the source table. - // Do not tolerate incomming exception and do not pass it to the follwing processors. + // Do not tolerate incoming exception and do not pass it to the following processors. if (view_level == 0) std::rethrow_exception(data.exception); diff --git a/tests/integration/test_checking_s3_blobs_paranoid/test.py b/tests/integration/test_checking_s3_blobs_paranoid/test.py index dbc2a0385af..84e92e7205d 100644 --- a/tests/integration/test_checking_s3_blobs_paranoid/test.py +++ b/tests/integration/test_checking_s3_blobs_paranoid/test.py @@ -43,7 +43,7 @@ def cluster(): "node_with_query_log_on_s3", main_configs=[ "configs/storage_conf.xml", - "configs/query_log_conf.xml" + "configs/query_log_conf.xml", ], user_configs=[ "configs/setting.xml", @@ -789,7 +789,8 @@ def test_node_with_query_log_on_s3(cluster, broken_s3): count_from_query_log = node.query( """ SELECT count() from system.query_log WHERE query like 'SELECT 2222%' AND type = 'QueryFinish' - """) + """ + ) assert count_from_query_log == "0\n" @@ -842,6 +843,7 @@ def test_exception_in_onFinish(cluster, broken_s3): count_from_query_log = node.query( """ SELECT count() from source - """) + """ + ) assert count_from_query_log == "1\n" From c0172f54ccc5b4aee58c1146364fc61f62a743ba Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Wed, 11 Dec 2024 12:06:30 +0100 Subject: [PATCH 5/7] fix new tests --- .../test_checking_s3_blobs_paranoid/test.py | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tests/integration/test_checking_s3_blobs_paranoid/test.py b/tests/integration/test_checking_s3_blobs_paranoid/test.py index 84e92e7205d..0cbba8493af 100644 --- a/tests/integration/test_checking_s3_blobs_paranoid/test.py +++ b/tests/integration/test_checking_s3_blobs_paranoid/test.py @@ -741,6 +741,19 @@ def test_node_with_query_log_on_s3(cluster, broken_s3): """ ) + node.query( + """ + DROP VIEW IF EXISTS log_sink_mv + """ + ) + + + node.query( + """ + DROP TABLE IF EXISTS log_sink + """ + ) + node.query( """ CREATE TABLE log_sink @@ -798,6 +811,24 @@ def test_node_with_query_log_on_s3(cluster, broken_s3): def test_exception_in_onFinish(cluster, broken_s3): node = cluster.instances["node_with_query_log_on_s3"] + node.query( + """ + DROP VIEW IF EXISTS source_sink_mv + """ + ) + + node.query( + """ + DROP TABLE IF EXISTS source_sink + """ + ) + + node.query( + """ + DROP TABLE IF EXISTS source + """ + ) + node.query( """ CREATE TABLE source (i Int64) From 170cdfdf087e3fe5ccca3744b1b2ad83ea783387 Mon Sep 17 00:00:00 2001 From: robot-clickhouse Date: Wed, 11 Dec 2024 11:15:26 +0000 Subject: [PATCH 6/7] Automatic style fix --- tests/integration/test_checking_s3_blobs_paranoid/test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration/test_checking_s3_blobs_paranoid/test.py b/tests/integration/test_checking_s3_blobs_paranoid/test.py index 0cbba8493af..78eed431771 100644 --- a/tests/integration/test_checking_s3_blobs_paranoid/test.py +++ b/tests/integration/test_checking_s3_blobs_paranoid/test.py @@ -747,7 +747,6 @@ def test_node_with_query_log_on_s3(cluster, broken_s3): """ ) - node.query( """ DROP TABLE IF EXISTS log_sink From 673276ad4541e6d8283ff52ef5b2a382f711de0a Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Wed, 11 Dec 2024 15:42:19 +0100 Subject: [PATCH 7/7] fix view_level --- src/Interpreters/InterpreterInsertQuery.cpp | 21 +++++++++++++------ src/Interpreters/InterpreterInsertQuery.h | 10 ++++++++- .../Transforms/buildPushingToViewsChain.cpp | 2 +- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp index b23711de83d..929f5fd2d6c 100644 --- a/src/Interpreters/InterpreterInsertQuery.cpp +++ b/src/Interpreters/InterpreterInsertQuery.cpp @@ -303,6 +303,7 @@ static bool isTrivialSelect(const ASTPtr & select) Chain InterpreterInsertQuery::buildChain( const StoragePtr & table, + size_t view_level, const StorageMetadataPtr & metadata_snapshot, const Names & columns, ThreadStatusesHolderPtr thread_status_holder, @@ -324,7 +325,7 @@ Chain InterpreterInsertQuery::buildChain( if (check_access) getContext()->checkAccess(AccessType::INSERT, table->getStorageID(), sample.getNames()); - Chain sink = buildSink(table, metadata_snapshot, thread_status_holder, running_group, elapsed_counter_ms); + Chain sink = buildSink(table, view_level, metadata_snapshot, thread_status_holder, running_group, elapsed_counter_ms); Chain chain = buildPreSinkChain(sink.getInputHeader(), table, metadata_snapshot, sample); chain.appendChain(std::move(sink)); @@ -333,6 +334,7 @@ Chain InterpreterInsertQuery::buildChain( Chain InterpreterInsertQuery::buildSink( const StoragePtr & table, + size_t view_level, const StorageMetadataPtr & metadata_snapshot, ThreadStatusesHolderPtr thread_status_holder, ThreadGroupPtr running_group, @@ -361,7 +363,7 @@ Chain InterpreterInsertQuery::buildSink( else { out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr, - query_ptr, /* view_level */ 0, no_destination, + query_ptr, view_level, no_destination, thread_status_holder, running_group, elapsed_counter_ms, async_insert); } @@ -423,7 +425,14 @@ Chain InterpreterInsertQuery::buildPreSinkChain( return out; } -std::pair, std::vector> InterpreterInsertQuery::buildPreAndSinkChains(size_t presink_streams, size_t sink_streams, StoragePtr table, const StorageMetadataPtr & metadata_snapshot, const Block & query_sample_block) +std::pair, std::vector> InterpreterInsertQuery::buildPreAndSinkChains( + size_t presink_streams, + size_t sink_streams, + StoragePtr table, + size_t view_level, + const StorageMetadataPtr & metadata_snapshot, + const Block & query_sample_block + ) { chassert(presink_streams > 0); chassert(sink_streams > 0); @@ -439,7 +448,7 @@ std::pair, std::vector> InterpreterInsertQuery::buildP for (size_t i = 0; i < sink_streams; ++i) { - auto out = buildSink(table, metadata_snapshot, /* thread_status_holder= */ nullptr, + auto out = buildSink(table, view_level, metadata_snapshot, /* thread_status_holder= */ nullptr, running_group, /* elapsed_counter_ms= */ nullptr); sink_chains.emplace_back(std::move(out)); @@ -639,7 +648,7 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery & auto [presink_chains, sink_chains] = buildPreAndSinkChains( presink_streams_size, sink_streams_size, - table, metadata_snapshot, query_sample_block); + table, /* view_level */ 0, metadata_snapshot, query_sample_block); pipeline.resize(presink_chains.size()); @@ -693,7 +702,7 @@ QueryPipeline InterpreterInsertQuery::buildInsertPipeline(ASTInsertQuery & query { auto [presink_chains, sink_chains] = buildPreAndSinkChains( /* presink_streams */1, /* sink_streams */1, - table, metadata_snapshot, query_sample_block); + table, /* view_level */ 0, metadata_snapshot, query_sample_block); chain = std::move(presink_chains.front()); chain.appendChain(std::move(sink_chains.front())); diff --git a/src/Interpreters/InterpreterInsertQuery.h b/src/Interpreters/InterpreterInsertQuery.h index 894c7c42144..cc1d7b100fa 100644 --- a/src/Interpreters/InterpreterInsertQuery.h +++ b/src/Interpreters/InterpreterInsertQuery.h @@ -43,6 +43,7 @@ public: Chain buildChain( const StoragePtr & table, + size_t view_level, const StorageMetadataPtr & metadata_snapshot, const Names & columns, ThreadStatusesHolderPtr thread_status_holder = {}, @@ -79,13 +80,20 @@ private: std::vector> owned_buffers; - std::pair, std::vector> buildPreAndSinkChains(size_t presink_streams, size_t sink_streams, StoragePtr table, const StorageMetadataPtr & metadata_snapshot, const Block & query_sample_block); + std::pair, std::vector> buildPreAndSinkChains( + size_t presink_streams, + size_t sink_streams, + StoragePtr table, + size_t view_level, + const StorageMetadataPtr & metadata_snapshot, + const Block & query_sample_block); QueryPipeline buildInsertSelectPipeline(ASTInsertQuery & query, StoragePtr table); QueryPipeline buildInsertPipeline(ASTInsertQuery & query, StoragePtr table); Chain buildSink( const StoragePtr & table, + size_t view_level, const StorageMetadataPtr & metadata_snapshot, ThreadStatusesHolderPtr thread_status_holder, ThreadGroupPtr running_group, diff --git a/src/Processors/Transforms/buildPushingToViewsChain.cpp b/src/Processors/Transforms/buildPushingToViewsChain.cpp index 63b5f0506df..74aede2c76a 100644 --- a/src/Processors/Transforms/buildPushingToViewsChain.cpp +++ b/src/Processors/Transforms/buildPushingToViewsChain.cpp @@ -372,7 +372,7 @@ std::optional generateViewChain( /// TODO: remove sql_security_type check after we turn `ignore_empty_sql_security_in_create_view_query=false` bool check_access = !materialized_view->hasInnerTable() && materialized_view->getInMemoryMetadataPtr()->sql_security_type; - out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access); + out = interpreter.buildChain(inner_table, view_level + 1, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access); if (interpreter.shouldAddSquashingFroStorage(inner_table)) {