diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.cpp b/src/DataStreams/PushingToViewsBlockOutputStream.cpp index a79cc61bd2d..ce0922bf282 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.cpp +++ b/src/DataStreams/PushingToViewsBlockOutputStream.cpp @@ -90,7 +90,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream( else out = std::make_shared(dependent_table, *views_context, ASTPtr()); - views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out)}); + views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out), nullptr}); } /// Do not push to destination table if the flag is set @@ -162,7 +162,12 @@ void PushingToViewsBlockOutputStream::write(const Block & block) { // Process sequentially for (size_t view_num = 0; view_num < views.size(); ++view_num) + { process(block, view_num); + + if (views[view_num].exception) + std::rethrow_exception(views[view_num].exception); + } } } @@ -190,8 +195,18 @@ void PushingToViewsBlockOutputStream::writeSuffix() if (output) output->writeSuffix(); + std::exception_ptr first_exception; + for (auto & view : views) { + if (view.exception) + { + if (!first_exception) + first_exception = view.exception; + + continue; + } + try { view.out->writeSuffix(); @@ -202,6 +217,9 @@ void PushingToViewsBlockOutputStream::writeSuffix() throw; } } + + if (first_exception) + std::rethrow_exception(first_exception); } void PushingToViewsBlockOutputStream::flush() @@ -270,7 +288,11 @@ void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_n catch (Exception & ex) { ex.addMessage("while pushing to view " + view.table_id.getNameForLogs()); - throw; + view.exception = std::current_exception(); + } + catch (...) + { + view.exception = std::current_exception(); } } diff --git a/src/DataStreams/PushingToViewsBlockOutputStream.h b/src/DataStreams/PushingToViewsBlockOutputStream.h index 162c2e1b447..a2a1ca5caf5 100644 --- a/src/DataStreams/PushingToViewsBlockOutputStream.h +++ b/src/DataStreams/PushingToViewsBlockOutputStream.h @@ -40,6 +40,7 @@ private: ASTPtr query; StorageID table_id; BlockOutputStreamPtr out; + std::exception_ptr exception; }; std::vector views; diff --git a/tests/queries/0_stateless/01275_parallel_mv.reference b/tests/queries/0_stateless/01275_parallel_mv.reference new file mode 100644 index 00000000000..898d3f7266e --- /dev/null +++ b/tests/queries/0_stateless/01275_parallel_mv.reference @@ -0,0 +1,4 @@ +10 +10 +0 +10 diff --git a/tests/queries/0_stateless/01275_parallel_mv.sql b/tests/queries/0_stateless/01275_parallel_mv.sql new file mode 100644 index 00000000000..b67fbf02f8d --- /dev/null +++ b/tests/queries/0_stateless/01275_parallel_mv.sql @@ -0,0 +1,18 @@ +drop table if exists testX; +drop table if exists testXA; +drop table if exists testXB; +drop table if exists testXC; + +create table testX (A Int64) engine=MergeTree order by tuple(); + +create materialized view testXA engine=MergeTree order by tuple() as select sleep(1) from testX; +create materialized view testXB engine=MergeTree order by tuple() as select sleep(2), throwIf(A=1) from testX; +create materialized view testXC engine=MergeTree order by tuple() as select sleep(1) from testX; + +set parallel_view_processing=1; +insert into testX select number from numbers(10); -- {serverError 395} + +select count() from testX; +select count() from testXA; +select count() from testXB; +select count() from testXC;