diff --git a/src/Interpreters/InterpreterInsertQuery.cpp b/src/Interpreters/InterpreterInsertQuery.cpp
index 797895e4a93..b23711de83d 100644
--- a/src/Interpreters/InterpreterInsertQuery.cpp
+++ b/src/Interpreters/InterpreterInsertQuery.cpp
@@ -361,7 +361,7 @@ Chain InterpreterInsertQuery::buildSink(
     else
     {
         out = buildPushingToViewsChain(table, metadata_snapshot, context_ptr,
-            query_ptr, no_destination,
+            query_ptr, /* view_level */ 0, no_destination,
             thread_status_holder, running_group, elapsed_counter_ms, async_insert);
     }
diff --git a/src/Processors/Executors/PushingPipelineExecutor.cpp b/src/Processors/Executors/PushingPipelineExecutor.cpp
index 7a1c0111a3a..70085938e8e 100644
--- a/src/Processors/Executors/PushingPipelineExecutor.cpp
+++ b/src/Processors/Executors/PushingPipelineExecutor.cpp
@@ -127,7 +127,10 @@ void PushingPipelineExecutor::finish()
     finished = true;

     if (executor)
-        executor->executeStep();
+    {
+        auto res = executor->executeStep();
+        chassert(!res);
+    }
 }

 void PushingPipelineExecutor::cancel()
diff --git a/src/Processors/Transforms/ExceptionKeepingTransform.cpp b/src/Processors/Transforms/ExceptionKeepingTransform.cpp
index b50f66b0240..9fbf710efd8 100644
--- a/src/Processors/Transforms/ExceptionKeepingTransform.cpp
+++ b/src/Processors/Transforms/ExceptionKeepingTransform.cpp
@@ -1,3 +1,4 @@
+#include
 #include
 #include
 #include
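A note on the `PushingPipelineExecutor::finish()` change above: `executeStep()` reports whether the pipeline still has work left, and `finish()` runs only after all input has been pushed, so the final step is expected to drain the pipeline completely. The new `chassert(!res)` turns a violation of that contract into a hard failure instead of silently dropping remaining work. The snippet below is a minimal, self-contained sketch of that invariant; `ToyExecutor` and its queue are invented for illustration, not the real `IProcessor` machinery:

```cpp
#include <cassert>
#include <queue>

struct ToyExecutor
{
    std::queue<int> pending;

    /// Returns true while more steps are needed, mirroring the executeStep() contract.
    bool executeStep()
    {
        if (!pending.empty())
            pending.pop();
        return !pending.empty();
    }
};

int main()
{
    ToyExecutor executor;
    executor.pending.push(1);

    /// finish(): the last step must report that nothing is left,
    /// which is what chassert(!res) now verifies.
    bool res = executor.executeStep();
    assert(!res);
}
```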
diff --git a/src/Processors/Transforms/buildPushingToViewsChain.cpp b/src/Processors/Transforms/buildPushingToViewsChain.cpp
index b48ba3eb8a0..f0c92c261cd 100644
--- a/src/Processors/Transforms/buildPushingToViewsChain.cpp
+++ b/src/Processors/Transforms/buildPushingToViewsChain.cpp
@@ -35,6 +35,7 @@
 #include
 #include
+#include
 #include
@@ -120,7 +121,7 @@ using ViewsDataPtr = std::shared_ptr<ViewsData>;
 class CopyingDataToViewsTransform final : public IProcessor
 {
 public:
-    CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data);
+    CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data, size_t view_level_);

     String getName() const override { return "CopyingDataToViewsTransform"; }
     Status prepare() override;
@@ -129,6 +130,7 @@ public:
 private:
     InputPort & input;
     ViewsDataPtr views_data;
+    size_t view_level;
 };

 /// For source chunk, execute view query over it.
@@ -223,6 +225,7 @@ private:
 /// Generates one chain part for every view in buildPushingToViewsChain
 std::optional<Chain> generateViewChain(
     ContextPtr context,
+    size_t view_level,
     const StorageID & view_id,
     ThreadGroupPtr running_group,
     Chain & result_chain,
@@ -400,6 +403,7 @@ std::optional<Chain> generateViewChain(
             query = live_view->getInnerQuery();
             out = buildPushingToViewsChain(
                 view, view_metadata_snapshot, insert_context, ASTPtr(),
+                view_level + 1,
                 /* no_destination= */ true, thread_status_holder, running_group, view_counter_ms, async_insert, storage_header);
         }
@@ -409,12 +413,14 @@ std::optional<Chain> generateViewChain(
             query = window_view->getMergeableQuery();
             out = buildPushingToViewsChain(
                 view, view_metadata_snapshot, insert_context, ASTPtr(),
+                view_level + 1,
                 /* no_destination= */ true, thread_status_holder, running_group, view_counter_ms, async_insert);
         }
         else
             out = buildPushingToViewsChain(
                 view, view_metadata_snapshot, insert_context, ASTPtr(),
+                view_level + 1,
                 /* no_destination= */ false, thread_status_holder, running_group, view_counter_ms, async_insert);
@@ -466,12 +472,13 @@ Chain buildPushingToViewsChain(
     const StorageMetadataPtr & metadata_snapshot,
     ContextPtr context,
     const ASTPtr & query_ptr,
+    size_t view_level,
     bool no_destination,
     ThreadStatusesHolderPtr thread_status_holder,
     ThreadGroupPtr running_group,
     std::atomic_uint64_t * elapsed_counter_ms,
     bool async_insert,
     const Block & live_view_header)
 {
     checkStackSize();
     Chain result_chain;
@@ -514,7 +522,7 @@ Chain buildPushingToViewsChain(
         try
         {
             auto out = generateViewChain(
-                context, view_id, running_group, result_chain,
+                context, view_level, view_id, running_group, result_chain,
                 views_data, thread_status_holder, async_insert, storage_header, disable_deduplication_for_children);

             if (!out.has_value())
@@ -554,7 +562,7 @@ Chain buildPushingToViewsChain(
         for (const auto & chain : chains)
             headers.push_back(chain.getOutputHeader());

-        auto copying_data = std::make_shared<CopyingDataToViewsTransform>(storage_header, views_data);
+        auto copying_data = std::make_shared<CopyingDataToViewsTransform>(storage_header, views_data, view_level);
         auto finalizing_views = std::make_shared<FinalizingViewsTransform>(std::move(headers), views_data);
         auto out = copying_data->getOutputs().begin();
         auto in = finalizing_views->getInputs().begin();
@@ -726,10 +734,11 @@ static void logQueryViews(std::list<ViewRuntimeData> & views, ContextPtr context
 }

-CopyingDataToViewsTransform::CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data)
+CopyingDataToViewsTransform::CopyingDataToViewsTransform(const Block & header, ViewsDataPtr data, size_t view_level_)
     : IProcessor({header}, OutputPorts(data->views.size(), header))
     , input(inputs.front())
     , views_data(std::move(data))
+    , view_level(view_level_)
 {
     if (views_data->views.empty())
         throw Exception(ErrorCodes::LOGICAL_ERROR, "CopyingDataToViewsTransform cannot have zero outputs");
@@ -765,6 +774,12 @@ IProcessor::Status CopyingDataToViewsTransform::prepare()
         auto data = input.pullData();
         if (data.exception)
         {
+            // If view_level == 0, then the exception comes from the source table.
+            // There is no case where we could tolerate exceptions from the source table.
+            // Do not tolerate the incoming exception and do not pass it to the following processors.
+            if (view_level == 0)
+                std::rethrow_exception(data.exception);
+
             if (!views_data->has_exception)
             {
                 views_data->first_exception = data.exception;
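To spell out the contract of the new parameter: the top-level INSERT builds its chain with `view_level == 0` (see the `InterpreterInsertQuery` hunk above), and every chain built for a dependent materialized, live, or window view passes `view_level + 1`. That lets `CopyingDataToViewsTransform` distinguish 'the source table itself failed' (always fatal) from 'a downstream view failed' (tolerable under `materialized_views_ignore_errors`). Below is a hypothetical, condensed model of both rules; `View`, `buildChain`, and `onViewError` are invented names, not ClickHouse APIs:

```cpp
#include <cstdio>
#include <exception>
#include <stdexcept>
#include <vector>

struct View { const char * name; std::vector<View> inner_views; };

/// Models buildPushingToViewsChain(): the root caller passes 0,
/// each nested view chain is built one level deeper.
void buildChain(const View & view, size_t view_level)
{
    std::printf("%s -> view_level %zu\n", view.name, view_level);
    for (const auto & inner : view.inner_views)
        buildChain(inner, view_level + 1);
}

/// Models the new branch in CopyingDataToViewsTransform::prepare():
/// level 0 means the exception came from the source table and is rethrown
/// unconditionally; deeper levels may be recorded and tolerated.
void onViewError(size_t view_level, std::exception_ptr e, bool ignore_errors)
{
    if (view_level == 0 || !ignore_errors)
        std::rethrow_exception(e);
    /// Otherwise: remember it as first_exception and keep pushing data.
}

int main()
{
    View inner{"inner_mv", {}};
    View root{"source_table", {inner}};
    buildChain(root, /* view_level */ 0);

    auto err = std::make_exception_ptr(std::runtime_error("view failed"));
    onViewError(/* view_level */ 1, err, /* ignore_errors */ true);
    std::puts("error from a dependent view was tolerated");
}
```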
diff --git a/src/Processors/Transforms/buildPushingToViewsChain.h b/src/Processors/Transforms/buildPushingToViewsChain.h
index a1feed91b60..4308bbc56f4 100644
--- a/src/Processors/Transforms/buildPushingToViewsChain.h
+++ b/src/Processors/Transforms/buildPushingToViewsChain.h
@@ -60,6 +60,7 @@ Chain buildPushingToViewsChain(
     const StorageMetadataPtr & metadata_snapshot,
     ContextPtr context,
     const ASTPtr & query_ptr,
+    size_t view_level,
     /// It is true when we should not insert into table, but only to views.
     /// Used e.g. for kafka. We should try to remove it somehow.
     bool no_destination,
diff --git a/src/Storages/MergeTree/MergeTreeSink.cpp b/src/Storages/MergeTree/MergeTreeSink.cpp
index 582a722d678..47f5ab97119 100644
--- a/src/Storages/MergeTree/MergeTreeSink.cpp
+++ b/src/Storages/MergeTree/MergeTreeSink.cpp
@@ -4,6 +4,7 @@
 #include
 #include
 #include
+#include <Common/logger_useful.h>
 #include
 #include
@@ -79,6 +80,7 @@ void MergeTreeSink::onStart()

 void MergeTreeSink::onFinish()
 {
     chassert(!isCancelled());
+    finishDelayedChunk();
 }
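Context for the `MergeTreeSink::onFinish()` hunk: the sink keeps its last prepared block back as a 'delayed chunk', and `finishDelayedChunk()` is what actually commits it. Calling it from `onFinish()` means the finish path both flushes the tail of an INSERT and surfaces any storage error during that final write (for example a failed S3 upload, which is exactly what `test_exception_in_onFinish` below provokes). The following is a toy model under those assumptions; `ToySink`, `Chunk`, and the `storage_is_broken` flag are invented for illustration:

```cpp
#include <cassert>
#include <optional>
#include <stdexcept>
#include <vector>

struct Chunk { std::vector<int> rows; };

struct ToySink
{
    std::optional<Chunk> delayed_chunk;   // last block, not yet committed
    std::vector<Chunk> written;           // stands in for committed parts
    bool storage_is_broken = false;       // models a failing S3 upload

    void consume(Chunk chunk)
    {
        // Commit the previously delayed chunk, hold the new one back.
        finishDelayedChunk();
        delayed_chunk = std::move(chunk);
    }

    void finishDelayedChunk()
    {
        if (!delayed_chunk)
            return;
        if (storage_is_broken)
            throw std::runtime_error("object storage upload failed");
        written.push_back(std::move(*delayed_chunk));
        delayed_chunk.reset();
    }

    // Mirrors the patched MergeTreeSink::onFinish(): without the flush here,
    // the last chunk would be silently lost on a clean finish.
    void onFinish() { finishDelayedChunk(); }
};

int main()
{
    ToySink sink;
    sink.consume(Chunk{{1, 2, 3}});
    sink.onFinish();
    assert(sink.written.size() == 1 && !sink.delayed_chunk);

    // With broken storage the same flush throws, and the exception now
    // propagates out of onFinish() instead of being lost.
    ToySink broken;
    broken.storage_is_broken = true;
    broken.consume(Chunk{{4}});
    try { broken.onFinish(); }
    catch (const std::runtime_error &) { /* the INSERT fails, as expected */ }
}
```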
diff --git a/tests/integration/test_checking_s3_blobs_paranoid/configs/no_s3_retries.xml b/tests/integration/test_checking_s3_blobs_paranoid/configs/no_s3_retries.xml
new file mode 100644
index 00000000000..9dd3600329a
--- /dev/null
+++ b/tests/integration/test_checking_s3_blobs_paranoid/configs/no_s3_retries.xml
@@ -0,0 +1,11 @@
+
+
+
+
+
+            1
+            0
+            0
+
+
+
diff --git a/tests/integration/test_checking_s3_blobs_paranoid/configs/query_log_conf.xml b/tests/integration/test_checking_s3_blobs_paranoid/configs/query_log_conf.xml
new file mode 100644
index 00000000000..0e51871c2ef
--- /dev/null
+++ b/tests/integration/test_checking_s3_blobs_paranoid/configs/query_log_conf.xml
@@ -0,0 +1,7 @@
+<clickhouse>
+    <query_log>
+        <database>system</database>
+        <table>query_log</table>
+        <storage_policy>broken_s3</storage_policy>
+    </query_log>
+</clickhouse>
diff --git a/tests/integration/test_checking_s3_blobs_paranoid/test.py b/tests/integration/test_checking_s3_blobs_paranoid/test.py
index 2cc70eba2ab..dbc2a0385af 100644
--- a/tests/integration/test_checking_s3_blobs_paranoid/test.py
+++ b/tests/integration/test_checking_s3_blobs_paranoid/test.py
@@ -39,6 +39,18 @@ def cluster():
         ],
         with_minio=True,
     )
+    cluster.add_instance(
+        "node_with_query_log_on_s3",
+        main_configs=[
+            "configs/storage_conf.xml",
+            "configs/query_log_conf.xml",
+        ],
+        user_configs=[
+            "configs/setting.xml",
+            "configs/no_s3_retries.xml",
+        ],
+        with_minio=True,
+    )
     logging.info("Starting cluster...")
     cluster.start()
     logging.info("Cluster started")
@@ -718,3 +730,118 @@ def test_no_key_found_disk(cluster, broken_s3):
     )

     assert s3_disk_no_key_errors_metric_value > 0
+
+
+def test_node_with_query_log_on_s3(cluster, broken_s3):
+    node = cluster.instances["node_with_query_log_on_s3"]
+
+    node.query(
+        """
+        SYSTEM FLUSH LOGS
+        """
+    )
+
+    node.query(
+        """
+        CREATE TABLE log_sink
+        ENGINE = MergeTree()
+        ORDER BY ()
+        EMPTY AS
+        SELECT *
+        FROM system.query_log
+        """
+    )
+
+    node.query(
+        """
+        CREATE MATERIALIZED VIEW log_sink_mv TO log_sink AS
+        SELECT *
+        FROM system.query_log
+        """
+    )
+
+    node.query(
+        """
+        SELECT 1111
+        """
+    )
+
+    node.query(
+        """
+        SYSTEM FLUSH LOGS
+        """
+    )
+
+    node.query(
+        """
+        SELECT 2222
+        """
+    )
+
+    broken_s3.setup_at_object_upload(count=100, after=0)
+
+    node.query(
+        """
+        SYSTEM FLUSH LOGS
+        """
+    )
+
+    count_from_query_log = node.query(
+        """
+        SELECT count() FROM system.query_log WHERE query LIKE 'SELECT 2222%' AND type = 'QueryFinish'
+        """
+    )
+
+    assert count_from_query_log == "0\n"
+
+
+def test_exception_in_onFinish(cluster, broken_s3):
+    node = cluster.instances["node_with_query_log_on_s3"]
+
+    node.query(
+        """
+        CREATE TABLE source (i Int64)
+        ENGINE = MergeTree()
+        ORDER BY ()
+        SETTINGS storage_policy='broken_s3'
+        """
+    )
+
+    node.query(
+        """
+        CREATE TABLE source_sink
+        ENGINE = MergeTree()
+        ORDER BY ()
+        EMPTY AS
+        SELECT *
+        FROM source
+        """
+    )
+
+    node.query(
+        """
+        CREATE MATERIALIZED VIEW source_sink_mv TO source_sink AS
+        SELECT *
+        FROM source
+        """
+    )
+
+    node.query(
+        """
+        INSERT INTO source SETTINGS materialized_views_ignore_errors=1 VALUES (1)
+        """
+    )
+
+    broken_s3.setup_at_object_upload(count=100, after=0)
+
+    node.query_and_get_error(
+        """
+        INSERT INTO source SETTINGS materialized_views_ignore_errors=1 VALUES (2)
+        """
+    )
+
+    count_from_source = node.query(
+        """
+        SELECT count() FROM source
+        """
+    )
+
+    assert count_from_source == "1\n"
diff --git a/tests/queries/0_stateless/03282_materialized_views_ignore_errors.reference b/tests/queries/0_stateless/03282_materialized_views_ignore_errors.reference
new file mode 100644
index 00000000000..2ed34740cc4
--- /dev/null
+++ b/tests/queries/0_stateless/03282_materialized_views_ignore_errors.reference
@@ -0,0 +1,14 @@
+-- { echoOn }
+
+insert into testX select number from numbers(20)
+    settings materialized_views_ignore_errors = 0; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
+select count() from testX;
+0
+select count() from testXA;
+0
+insert into testX select number from numbers(20)
+    settings materialized_views_ignore_errors = 1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
+select count() from testX;
+0
+select count() from testXA;
+0
diff --git a/tests/queries/0_stateless/03282_materialized_views_ignore_errors.sql b/tests/queries/0_stateless/03282_materialized_views_ignore_errors.sql
new file mode 100644
index 00000000000..c5f4e94eecb
--- /dev/null
+++ b/tests/queries/0_stateless/03282_materialized_views_ignore_errors.sql
@@ -0,0 +1,28 @@
+-- more blocks to process
+set max_block_size = 10;
+set min_insert_block_size_rows = 10;
+
+drop table if exists testX;
+drop table if exists testXA;
+
+create table testX (A Int64) engine=MergeTree partition by (intDiv(A, 10), throwIf(A=2)) order by tuple();
+create materialized view testXA engine=MergeTree order by tuple() as select sleep(0.1) from testX;
+
+-- { echoOn }
+
+insert into testX select number from numbers(20)
+    settings materialized_views_ignore_errors = 0; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
+
+select count() from testX;
+select count() from testXA;
+
+insert into testX select number from numbers(20)
+    settings materialized_views_ignore_errors = 1; -- { serverError FUNCTION_THROW_IF_VALUE_IS_NON_ZERO }
+
+select count() from testX;
+select count() from testXA;
+
+-- { echoOff }
+
+drop table testX;
+drop view testXA;