diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp
index 797895e4a93..929f5fd2d6c 100644
--- a/src/Interpreters/InterpreterInsertQuery.cpp
+++ b/src/Interpreters/InterpreterInsertQuery.cpp
@@ -303,6 +303,7 @@ static bool isTrivialSelect(const ASTPtr & select)
 Chain InterpreterInsertQuery::buildChain(
     const StoragePtr & table,
+    size_t view_level,
     const StorageMetadataPtr & metadata_snapshot,
     const Names & columns,
     ThreadStatusesHolderPtr thread_status_holder,
@@ -324,7 +325,7 @@ Chain InterpreterInsertQuery::buildChain(
     if (check_access)
         getContext()->checkAccess(AccessType::INSERT, table->getStorageID(), sample.getNames());
 
-    Chain sink = buildSink(table, metadata_snapshot, thread_status_holder, running_group, elapsed_counter_ms);
+    Chain sink = buildSink(table, view_level, metadata_snapshot, thread_status_holder, running_group, elapsed_counter_ms);
     Chain chain = buildPreSinkChain(sink.getInputHeader(), table, metadata_snapshot, sample);
     chain.appendChain(std::move(sink));
@@ -333,6 +334,7 @@ Chain InterpreterInsertQuery::buildChain(
 Chain InterpreterInsertQuery::buildSink(
     const StoragePtr & table,
+    size_t view_level,
     const StorageMetadataPtr & metadata_snapshot,
     ThreadStatusesHolderPtr thread_status_holder,
     ThreadGroupPtr running_group,
@@ -361,7 +363,7 @@ Chain InterpreterInsertQuery::buildSink(
     else
     {
         out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr,
-            query_ptr, no_destination,
+            query_ptr, view_level, no_destination,
             thread_status_holder, running_group, elapsed_counter_ms, async_insert);
     }
@@ -423,7 +425,14 @@ Chain InterpreterInsertQuery::buildPreSinkChain(
     return out;
 }
 
-std::pair<std::vector<Chain>, std::vector<Chain>> InterpreterInsertQuery::buildPreAndSinkChains(size_t presink_streams, size_t sink_streams, StoragePtr table, const StorageMetadataPtr & metadata_snapshot, const Block & query_sample_block)
+std::pair<std::vector<Chain>, std::vector<Chain>> InterpreterInsertQuery::buildPreAndSinkChains(
+    size_t presink_streams,
+    size_t sink_streams,
+    StoragePtr table,
+    size_t view_level,
+    const StorageMetadataPtr & metadata_snapshot,
+    const Block & query_sample_block
+    )
 {
     chassert(presink_streams > 0);
     chassert(sink_streams > 0);
@@ -439,7 +448,7 @@ std::pair<std::vector<Chain>, std::vector<Chain>> InterpreterInsertQuery::buildP
     for (size_t i = 0; i < sink_streams; ++i)
     {
-        auto out = buildSink(table, metadata_snapshot, /* thread_status_holder= */ nullptr,
+        auto out = buildSink(table, view_level, metadata_snapshot, /* thread_status_holder= */ nullptr,
             running_group, /* elapsed_counter_ms= */ nullptr);
 
         sink_chains.emplace_back(std::move(out));
@@ -639,7 +648,7 @@ QueryPipeline InterpreterInsertQuery::buildInsertSelectPipeline(ASTInsertQuery &
     auto [presink_chains, sink_chains] = buildPreAndSinkChains(
         presink_streams_size, sink_streams_size,
-        table, metadata_snapshot, query_sample_block);
+        table, /* view_level */ 0, metadata_snapshot, query_sample_block);
 
     pipeline.resize(presink_chains.size());
@@ -693,7 +702,7 @@ QueryPipeline InterpreterInsertQuery::buildInsertPipeline(ASTInsertQuery & query
     {
         auto [presink_chains, sink_chains] = buildPreAndSinkChains(
             /* presink_streams */1, /* sink_streams */1,
-            table, metadata_snapshot, query_sample_block);
+            table, /* view_level */ 0, metadata_snapshot, query_sample_block);
 
         chain = std::move(presink_chains.front());
         chain.appendChain(std::move(sink_chains.front()));
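Taken together, these hunks thread a new view_level argument through every sink-building path: buildInsertSelectPipeline and buildInsertPipeline start the chain at view_level 0, and (as the later files show) each dependent view's chain is built with view_level + 1. A standalone sketch of the pattern, not code from the patch, with all names illustrative:

    // A depth counter starts at 0 for the user-facing INSERT and grows by one
    // for every dependent view whose sink is built underneath it.
    #include <cstddef>
    #include <iostream>
    #include <vector>

    struct View
    {
        const char * name;
        std::vector<View> dependent_views;
    };

    void buildSink(const View & view, size_t view_level)
    {
        std::cout << view.name << " gets a sink at view_level " << view_level << '\n';
        for (const auto & dependent : view.dependent_views)
            buildSink(dependent, view_level + 1); // nested views are one level deeper
    }

    int main()
    {
        View inner_target{"inner_target", {}};
        View mv{"mv", {inner_target}};
        View source_table{"source_table", {mv}};
        buildSink(source_table, /* view_level = */ 0); // top-level insert starts at 0
    }

With the level available at every sink, a processor can later tell whether an incoming exception was produced by the source table itself (level 0) or by some view downstream.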
diff --git a/src/Interpreters/InterpreterInsertQuery.h b/src/Interpreters/InterpreterInsertQuery.h
index 894c7c42144..cc1d7b100fa 100644
--- a/src/Interpreters/InterpreterInsertQuery.h
+++ b/src/Interpreters/InterpreterInsertQuery.h
@@ -43,6 +43,7 @@ public:
     Chain buildChain(
         const StoragePtr & table,
+        size_t view_level,
         const StorageMetadataPtr & metadata_snapshot,
         const Names & columns,
         ThreadStatusesHolderPtr thread_status_holder = {},
@@ -79,13 +80,20 @@ private:
     std::vector<std::unique_ptr<ReadBuffer>> owned_buffers;
 
-    std::pair<std::vector<Chain>, std::vector<Chain>> buildPreAndSinkChains(size_t presink_streams, size_t sink_streams, StoragePtr table, const StorageMetadataPtr & metadata_snapshot, const Block & query_sample_block);
+    std::pair<std::vector<Chain>, std::vector<Chain>> buildPreAndSinkChains(
+        size_t presink_streams,
+        size_t sink_streams,
+        StoragePtr table,
+        size_t view_level,
+        const StorageMetadataPtr & metadata_snapshot,
+        const Block & query_sample_block);
 
     QueryPipeline buildInsertSelectPipeline(ASTInsertQuery & query, StoragePtr table);
     QueryPipeline buildInsertPipeline(ASTInsertQuery & query, StoragePtr table);
 
     Chain buildSink(
         const StoragePtr & table,
+        size_t view_level,
         const StorageMetadataPtr & metadata_snapshot,
         ThreadStatusesHolderPtr thread_status_holder,
         ThreadGroupPtr running_group,
diff --git a/src/Processors/Executors/PushingPipelineExecutor.cpp b/src/Processors/Executors/PushingPipelineExecutor.cpp
index 7a1c0111a3a..70085938e8e 100644
--- a/src/Processors/Executors/PushingPipelineExecutor.cpp
+++ b/src/Processors/Executors/PushingPipelineExecutor.cpp
@@ -127,7 +127,10 @@ void PushingPipelineExecutor::finish()
     finished = true;
 
     if (executor)
-        executor->executeStep();
+    {
+        auto res = executor->executeStep();
+        chassert(!res);
+    }
 }
 
 void PushingPipelineExecutor::cancel()
diff --git a/src/Processors/Transforms/ExceptionKeepingTransform.cpp b/src/Processors/Transforms/ExceptionKeepingTransform.cpp
index b50f66b0240..9fbf710efd8 100644
--- a/src/Processors/Transforms/ExceptionKeepingTransform.cpp
+++ b/src/Processors/Transforms/ExceptionKeepingTransform.cpp
@@ -1,3 +1,4 @@
+#include
 #include
 #include
 #include
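PushingPipelineExecutor::finish() previously discarded the result of its last executeStep(); the patch asserts that this final step reports no remaining work. A minimal standalone model of that contract, not ClickHouse code, assuming (as the ClickHouse API suggests) that executeStep() returns true while work remains:

    #include <cassert>
    #include <queue>

    struct Executor
    {
        std::queue<int> pending;

        bool executeStep() // returns true while more steps remain
        {
            if (!pending.empty())
                pending.pop();
            return !pending.empty();
        }
    };

    void finish(Executor & executor)
    {
        // Mirrors the patched PushingPipelineExecutor::finish(): after the final
        // step the pipeline must be fully drained, which chassert(!res) verifies.
        bool more_work = executor.executeStep();
        assert(!more_work);
    }

    int main()
    {
        Executor executor;
        executor.pending.push(42); // one last flush left in the pipeline
        finish(executor);
    }

The assertion matters for the rest of the patch: once a rethrown source-table exception can escape the views chain, finish() must never silently leave half-executed work behind.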
diff --git a/src/Processors/Transforms/buildPushingToViewsChain.cpp b/src/Processors/Transforms/buildPushingToViewsChain.cpp
index b48ba3eb8a0..74aede2c76a 100644
--- a/src/Processors/Transforms/buildPushingToViewsChain.cpp
+++ b/src/Processors/Transforms/buildPushingToViewsChain.cpp
@@ -35,6 +35,7 @@
 #include
 #include
+#include <exception>
 #include
@@ -120,7 +121,7 @@ using ViewsDataPtr = std::shared_ptr<ViewsData>;
 class CopyingDataToViewsTransform final : public IProcessor
 {
 public:
-    CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data);
+    CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data, size_t view_level_);
 
     String getName() const override { return "CopyingDataToViewsTransform"; }
     Status prepare() override;
@@ -129,6 +130,7 @@ public:
 private:
     InputPort & input;
     ViewsDataPtr views_data;
+    size_t view_level;
 };
 
 /// For source chunk, execute view query over it.
@@ -223,6 +225,7 @@ private:
 /// Generates one chain part for every view in buildPushingToViewsChain
 std::optional<Chain> generateViewChain(
     ContextPtr context,
+    size_t view_level,
     const StorageID & view_id,
     ThreadGroupPtr running_group,
     Chain & result_chain,
@@ -369,7 +372,7 @@ std::optional<Chain> generateViewChain(
         /// TODO: remove sql_security_type check after we turn `ignore_empty_sql_security_in_create_view_query=false`
         bool check_access = !materialized_view->hasInnerTable() && materialized_view->getInMemoryMetadataPtr()->sql_security_type;
 
-        out = interpreter.buildChain(inner_table, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access);
+        out = interpreter.buildChain(inner_table, view_level + 1, inner_metadata_snapshot, insert_columns, thread_status_holder, view_counter_ms, check_access);
 
         if (interpreter.shouldAddSquashingFroStorage(inner_table))
         {
@@ -400,6 +403,7 @@ std::optional<Chain> generateViewChain(
         query = live_view->getInnerQuery();
         out = buildPushingToViewsChain(
             view, view_metadata_snapshot, insert_context, ASTPtr(),
+            view_level + 1,
             /* no_destination= */ true,
             thread_status_holder, running_group, view_counter_ms, async_insert, storage_header);
     }
@@ -409,12 +413,14 @@ std::optional<Chain> generateViewChain(
         query = window_view->getMergeableQuery();
         out = buildPushingToViewsChain(
             view, view_metadata_snapshot, insert_context, ASTPtr(),
+            view_level + 1,
             /* no_destination= */ true,
             thread_status_holder, running_group, view_counter_ms, async_insert);
     }
     else
         out = buildPushingToViewsChain(
             view, view_metadata_snapshot, insert_context, ASTPtr(),
+            view_level + 1,
             /* no_destination= */ false,
             thread_status_holder, running_group, view_counter_ms, async_insert);
 
@@ -466,12 +472,14 @@ Chain buildPushingToViewsChain(
     const StorageMetadataPtr & metadata_snapshot,
     ContextPtr context,
     const ASTPtr & query_ptr,
+    size_t view_level,
     bool no_destination,
     ThreadStatusesHolderPtr thread_status_holder,
     ThreadGroupPtr running_group,
     std::atomic_uint64_t * elapsed_counter_ms,
     bool async_insert,
-    const Block & live_view_header)
+    const Block & live_view_header
+    )
 {
     checkStackSize();
     Chain result_chain;
@@ -514,7 +522,7 @@ Chain buildPushingToViewsChain(
     try
     {
         auto out = generateViewChain(
-            context, view_id, running_group, result_chain,
+            context, view_level, view_id, running_group, result_chain,
             views_data, thread_status_holder, async_insert, storage_header, disable_deduplication_for_children);
 
         if (!out.has_value())
@@ -554,7 +562,7 @@ Chain buildPushingToViewsChain(
     for (const auto & chain : chains)
         headers.push_back(chain.getOutputHeader());
 
-    auto copying_data = std::make_shared<CopyingDataToViewsTransform>(storage_header, views_data);
+    auto copying_data = std::make_shared<CopyingDataToViewsTransform>(storage_header, views_data, view_level);
     auto finalizing_views = std::make_shared<FinalizingViewsTransform>(std::move(headers), views_data);
     auto out = copying_data->getOutputs().begin();
     auto in = finalizing_views->getInputs().begin();
@@ -726,10 +734,11 @@ static void logQueryViews(std::list<ViewRuntimeData> & views, ContextPtr context
 }
 
-CopyingDataToViewsTransform::CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data)
+CopyingDataToViewsTransform::CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data, size_t view_level_)
     : IProcessor({header}, OutputPorts(data->views.size(), header))
     , input(inputs.front())
     , views_data(std::move(data))
+    , view_level(view_level_)
 {
     if (views_data->views.empty())
         throw Exception(ErrorCodes::LOGICAL_ERROR, "CopyingDataToViewsTransform cannot have zero outputs");
@@ -765,6 +774,12 @@ IProcessor::Status CopyingDataToViewsTransform::prepare()
         auto data = input.pullData();
         if (data.exception)
         {
+            // If view_level == 0 then the exception comes from the source table.
+            // There is no case when we could tolerate exceptions from the source table.
+            // Do not tolerate the incoming exception and do not pass it to the following processors.
+            if (view_level == 0)
+                std::rethrow_exception(data.exception);
+
             if (!views_data->has_exception)
             {
                 views_data->first_exception = data.exception;
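The behavioral core of the patch is the prepare() hunk above: an exception that reaches CopyingDataToViewsTransform at view_level 0 originates in the source table and is rethrown immediately (hence, presumably, the <exception> include for std::rethrow_exception), while exceptions at deeper levels are still recorded for the materialized_views_ignore_errors machinery. A standalone sketch of that routing, with illustrative names rather than the actual ClickHouse types:

    #include <exception>
    #include <iostream>
    #include <stdexcept>

    struct ViewsData { std::exception_ptr first_exception; };

    void onIncomingException(std::exception_ptr e, size_t view_level, ViewsData & views_data)
    {
        if (view_level == 0)
            std::rethrow_exception(e); // source-table failures are never tolerated
        if (!views_data.first_exception)
            views_data.first_exception = e; // view failures are collected, not thrown here
    }

    int main()
    {
        ViewsData data;
        auto err = std::make_exception_ptr(std::runtime_error("disk is broken"));
        onIncomingException(err, /* view_level = */ 1, data); // tolerated, recorded
        try
        {
            onIncomingException(err, /* view_level = */ 0, data); // rethrown
        }
        catch (const std::runtime_error & e)
        {
            std::cout << "insert fails: " << e.what() << '\n';
        }
    }

This is what closes the hole where materialized_views_ignore_errors=1 could swallow a failure of the write to the source table itself.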
diff --git a/src/Processors/Transforms/buildPushingToViewsChain.h b/src/Processors/Transforms/buildPushingToViewsChain.h
index a1feed91b60..4308bbc56f4 100644
--- a/src/Processors/Transforms/buildPushingToViewsChain.h
+++ b/src/Processors/Transforms/buildPushingToViewsChain.h
@@ -60,6 +60,7 @@ Chain buildPushingToViewsChain(
     const StorageMetadataPtr & metadata_snapshot,
     ContextPtr context,
     const ASTPtr & query_ptr,
+    size_t view_level,
     /// It is true when we should not insert into table, but only to views.
     /// Used e.g. for kafka. We should try to remove it somehow.
     bool no_destination,
diff --git a/src/Storages/MergeTree/MergeTreeSink.cpp b/src/Storages/MergeTree/MergeTreeSink.cpp
index d65d1f3212f..dfddcb311f4 100644
--- a/src/Storages/MergeTree/MergeTreeSink.cpp
+++ b/src/Storages/MergeTree/MergeTreeSink.cpp
@@ -1,3 +1,4 @@
+#include <exception>
 #include
 #include
 #include
@@ -44,6 +45,8 @@ MergeTreeSink::~MergeTreeSink()
     if (!delayed_chunk)
         return;
 
+    chassert(isCancelled() || std::uncaught_exceptions());
+
     for (auto & partition : delayed_chunk->partitions)
     {
         partition.temp_part.cancel();
@@ -76,6 +79,7 @@ void MergeTreeSink::onStart()
 void MergeTreeSink::onFinish()
 {
     chassert(!isCancelled());
+
     finishDelayedChunk();
 }
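The new destructor assertion leans on std::uncaught_exceptions(), which is why the added include at the top of MergeTreeSink.cpp is presumably <exception>: a delayed chunk may still exist when the sink is destroyed only if the sink was cancelled or the destructor runs during stack unwinding. A standalone illustration of the same invariant, not ClickHouse code:

    #include <cassert>
    #include <exception>
    #include <stdexcept>

    struct Sink
    {
        bool cancelled = false;
        bool has_delayed_chunk = true;

        ~Sink()
        {
            if (!has_delayed_chunk)
                return;
            // Normal completion must have consumed the delayed chunk already;
            // otherwise we are being cancelled or unwinding after an exception.
            assert(cancelled || std::uncaught_exceptions());
        }
    };

    int main()
    {
        try
        {
            Sink sink; // destroyed during the unwinding triggered below
            throw std::runtime_error("broken S3 upload");
        }
        catch (const std::runtime_error &)
        {
            // The assert passed because std::uncaught_exceptions() was > 0.
        }
    }

The assertion documents the tightened lifecycle: after this patch, leftover delayed chunks are only ever dropped on a genuinely failed or cancelled insert, never on a path that also reports success.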
diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp
index f8863a8a6d6..87b3d238e3b 100644
--- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp
+++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp
@@ -119,7 +119,7 @@ struct MergedBlockOutputStream::Finalizer::Impl
     }
 
     void finish();
-    void cancel();
+    void cancel() noexcept;
 };
 
 void MergedBlockOutputStream::Finalizer::finish()
@@ -130,7 +130,7 @@ void MergedBlockOutputStream::Finalizer::finish()
     to_finish->finish();
 }
 
-void MergedBlockOutputStream::Finalizer::cancel()
+void MergedBlockOutputStream::Finalizer::cancel() noexcept
 {
     std::unique_ptr<Impl> to_cancel = std::move(impl);
     impl.reset();
@@ -167,7 +167,7 @@ void MergedBlockOutputStream::Finalizer::Impl::finish()
         part->getDataPartStorage().removeFile(file_name);
 }
 
-void MergedBlockOutputStream::Finalizer::Impl::cancel()
+void MergedBlockOutputStream::Finalizer::Impl::cancel() noexcept
 {
     writer.cancel();
 
@@ -183,15 +183,8 @@ MergedBlockOutputStream::Finalizer::Finalizer(std::unique_ptr<Impl> impl_) : imp
 
 MergedBlockOutputStream::Finalizer::~Finalizer()
 {
-    try
-    {
-        if (impl)
-            finish();
-    }
-    catch (...)
-    {
-        tryLogCurrentException(__PRETTY_FUNCTION__);
-    }
+    if (impl)
+        cancel();
 }
diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.h b/src/Storages/MergeTree/MergedBlockOutputStream.h
index b6fc13cbe42..2f6205427d1 100644
--- a/src/Storages/MergeTree/MergedBlockOutputStream.h
+++ b/src/Storages/MergeTree/MergedBlockOutputStream.h
@@ -55,7 +55,7 @@ public:
         ~Finalizer();
 
         void finish();
-        void cancel();
+        void cancel() noexcept;
     };
 
     /// Finalize writing part and fill inner structures
diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
index 3422e534f7d..8d3f4e8812e 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
@@ -179,6 +179,8 @@ ReplicatedMergeTreeSinkImpl::~ReplicatedMergeTreeSinkImpl()
     if (!delayed_chunk)
         return;
 
+    chassert(isCancelled() || std::uncaught_exceptions());
+
     for (auto & partition : delayed_chunk->partitions)
     {
         partition.temp_part.cancel();
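The Finalizer changes above apply the standard rule that destructors must not run throwing operations: finish() performs I/O and may throw, so the destructor now calls the noexcept cancel() instead of attempting finish() under a try/catch that merely logged. A self-contained sketch of the reworked ownership, using a hypothetical Part type rather than the ClickHouse one:

    #include <iostream>
    #include <memory>
    #include <stdexcept>

    struct Part { bool committed = false; };

    class Finalizer
    {
        std::unique_ptr<Part> part;

    public:
        explicit Finalizer(std::unique_ptr<Part> p) : part(std::move(p)) {}

        void finish() // commits the part; real code flushes files and may throw
        {
            auto to_finish = std::move(part);
            if (to_finish)
                to_finish->committed = true;
        }

        void cancel() noexcept // discards in-flight state; safe during unwinding
        {
            part.reset();
        }

        ~Finalizer()
        {
            if (part)
                cancel(); // previously finish() was attempted here and could throw
        }
    };

    int main()
    {
        Finalizer ok(std::make_unique<Part>());
        ok.finish(); // happy path: callers must finish explicitly

        try
        {
            Finalizer abandoned(std::make_unique<Part>());
            throw std::runtime_error("insert failed mid-write");
            // ~Finalizer cancels the unfinished part instead of committing it
        }
        catch (const std::runtime_error & e)
        {
            std::cout << "rolled back: " << e.what() << '\n';
        }
    }

The design choice is deliberate: a part that was never explicitly finished is now always rolled back, instead of being half-committed by a destructor that swallows errors.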
diff --git a/tests/integration/test_checking_s3_blobs_paranoid/configs/no_s3_retries.xml b/tests/integration/test_checking_s3_blobs_paranoid/configs/no_s3_retries.xml
new file mode 100644
index 00000000000..9dd3600329a
--- /dev/null
+++ b/tests/integration/test_checking_s3_blobs_paranoid/configs/no_s3_retries.xml
@@ -0,0 +1,11 @@
+<clickhouse>
+    <profiles>
+        <default>
+            <!-- Fail fast: a single attempt and no adaptive timeouts, so a broken upload surfaces immediately. -->
+            <s3_retry_attempts>1</s3_retry_attempts>
+            <s3_use_adaptive_timeouts>0</s3_use_adaptive_timeouts>
+            <s3_max_single_read_retries>0</s3_max_single_read_retries>
+        </default>
+    </profiles>
+</clickhouse>
+
diff --git a/tests/integration/test_checking_s3_blobs_paranoid/configs/query_log_conf.xml b/tests/integration/test_checking_s3_blobs_paranoid/configs/query_log_conf.xml
new file mode 100644
index 00000000000..0e51871c2ef
--- /dev/null
+++ b/tests/integration/test_checking_s3_blobs_paranoid/configs/query_log_conf.xml
@@ -0,0 +1,7 @@
+<clickhouse>
+    <query_log>
+        <database>system</database>
+        <table>query_log</table>
+        <storage_policy>broken_s3</storage_policy>
+    </query_log>
+</clickhouse>
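The two configs wire the new integration tests together: query_log_conf.xml keeps the query_log table on the broken_s3 storage policy, and no_s3_retries.xml makes the first failed S3 upload fatal instead of being retried away. With that, test_node_with_query_log_on_s3 below can check that a query_log flush whose insert fails on S3 leaves no QueryFinish record behind, and test_exception_in_onFinish can check that a failed INSERT into an S3-backed source table is not silently committed even with materialized_views_ignore_errors=1.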
diff --git a/tests/integration/test_checking_s3_blobs_paranoid/test.py b/tests/integration/test_checking_s3_blobs_paranoid/test.py
index 2cc70eba2ab..78eed431771 100644
--- a/tests/integration/test_checking_s3_blobs_paranoid/test.py
+++ b/tests/integration/test_checking_s3_blobs_paranoid/test.py
@@ -39,6 +39,18 @@ def cluster():
         ],
         with_minio=True,
     )
+    cluster.add_instance(
+        "node_with_query_log_on_s3",
+        main_configs=[
+            "configs/storage_conf.xml",
+            "configs/query_log_conf.xml",
+        ],
+        user_configs=[
+            "configs/setting.xml",
+            "configs/no_s3_retries.xml",
+        ],
+        with_minio=True,
+    )
 
         logging.info("Starting cluster...")
         cluster.start()
         logging.info("Cluster started")
@@ -718,3 +730,150 @@ def test_no_key_found_disk(cluster, broken_s3):
     )
 
     assert s3_disk_no_key_errors_metric_value > 0
+
+
+def test_node_with_query_log_on_s3(cluster, broken_s3):
+    node = cluster.instances["node_with_query_log_on_s3"]
+
+    node.query(
+        """
+        SYSTEM FLUSH LOGS
+        """
+    )
+
+    node.query(
+        """
+        DROP VIEW IF EXISTS log_sink_mv
+        """
+    )
+
+    node.query(
+        """
+        DROP TABLE IF EXISTS log_sink
+        """
+    )
+
+    node.query(
+        """
+        CREATE TABLE log_sink
+        ENGINE = MergeTree()
+        ORDER BY ()
+        EMPTY AS
+        SELECT *
+        FROM system.query_log
+        """
+    )
+
+    node.query(
+        """
+        CREATE MATERIALIZED VIEW log_sink_mv TO log_sink AS
+        SELECT *
+        FROM system.query_log
+        """
+    )
+
+    node.query(
+        """
+        SELECT 1111
+        """
+    )
+
+    node.query(
+        """
+        SYSTEM FLUSH LOGS
+        """
+    )
+
+    node.query(
+        """
+        SELECT 2222
+        """
+    )
+
+    broken_s3.setup_at_object_upload(count=100, after=0)
+
+    node.query(
+        """
+        SYSTEM FLUSH LOGS
+        """
+    )
+
+    count_from_query_log = node.query(
+        """
+        SELECT count() from system.query_log WHERE query like 'SELECT 2222%' AND type = 'QueryFinish'
+        """
+    )
+
+    assert count_from_query_log == "0\n"
+
+
+def test_exception_in_onFinish(cluster, broken_s3):
+    node = cluster.instances["node_with_query_log_on_s3"]
+
+    node.query(
+        """
+        DROP VIEW IF EXISTS source_sink_mv
+        """
+    )
+
+    node.query(
+        """
+        DROP TABLE IF EXISTS source_sink
+        """
+    )
+
+    node.query(
+        """
+        DROP TABLE IF EXISTS source
+        """
+    )
+
+    node.query(
+        """
+        CREATE TABLE source (i Int64)
+        ENGINE = MergeTree()
+        ORDER BY ()
+        SETTINGS storage_policy='broken_s3'
+        """
+    )
+
+    node.query(
+        """
+        CREATE TABLE source_sink
+        ENGINE = MergeTree()
+        ORDER BY ()
+        EMPTY AS
+        SELECT *
+        FROM source
+        """
+    )
+
+    node.query(
+        """
+        CREATE MATERIALIZED VIEW source_sink_mv TO source_sink AS
+        SELECT *
+        FROM source
+        """
+    )
+
+    node.query(
+        """
+        INSERT INTO source SETTINGS materialized_views_ignore_errors=1 VALUES (1)
+        """
+    )
+
+    broken_s3.setup_at_object_upload(count=100, after=0)
+
+    node.query_and_get_error(
+        """
+        INSERT INTO source SETTINGS materialized_views_ignore_errors=1 VALUES (2)
+        """
+    )
+
+    count_from_source = node.query(
+        """
+        SELECT count() from source
+        """
+    )
+
+    assert count_from_source == "1\n"
diff --git a/tests/queries/0_stateless/03282_materialized_views_ignore_errors.reference b/tests/queries/0_stateless/03282_materialized_views_ignore_errors.reference
new file mode 100644
index 00000000000..2ed34740cc4
--- /dev/null
+++ b/tests/queries/0_stateless/03282_materialized_views_ignore_errors.reference
@@ -0,0 +1,14 @@
+-- { echoOn }
+
+insert into testX select number from numbers(20)
+    settings materialized_views_ignore_errors = 0; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
+select count() from testX;
+0
+select count() from testXA;
+0
+insert into testX select number from numbers(20)
+    settings materialized_views_ignore_errors = 1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
+select count() from testX;
+0
+select count() from testXA;
+0
diff --git a/tests/queries/0_stateless/03282_materialized_views_ignore_errors.sql b/tests/queries/0_stateless/03282_materialized_views_ignore_errors.sql
new file mode 100644
index 00000000000..c5f4e94eecb
--- /dev/null
+++ b/tests/queries/0_stateless/03282_materialized_views_ignore_errors.sql
@@ -0,0 +1,28 @@
+-- more blocks to process
+set max_block_size = 10;
+set min_insert_block_size_rows = 10;
+
+drop table if exists testX;
+drop table if exists testXA;
+
+create table testX (A Int64) engine=MergeTree partition by (intDiv(A, 10), throwIf(A=2)) order by tuple();
+create materialized view testXA engine=MergeTree order by tuple() as select sleep(0.1) from testX;
+
+-- { echoOn }
+
+insert into testX select number from numbers(20)
+    settings materialized_views_ignore_errors = 0; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
+
+select count() from testX;
+select count() from testXA;
+
+insert into testX select number from numbers(20)
+    settings materialized_views_ignore_errors = 1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
+
+select count() from testX;
+select count() from testXA;
+
+-- { echoOff }
+
+drop table testX;
+drop view testXA;
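Both stateless INSERTs are expected to fail even though the second one sets materialized_views_ignore_errors = 1: throwIf(A=2) sits in the source table's partition expression, so the exception is raised at view level 0, which CopyingDataToViewsTransform now always rethrows. The setting still applies to errors raised inside the dependent views themselves; what it can no longer do is mask a failure of the write to testX, which previously could make such an INSERT look successful even though no rows were kept.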