Merge pull request #10757 from ClickHouse/fix-parallel-mv

Fix parallel MV
This commit is contained in:
alesapin 2020-05-15 14:03:09 +03:00 committed by GitHub
commit ef1c7da4af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 47 additions and 2 deletions

View File

@ -90,7 +90,7 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
else
out = std::make_shared<PushingToViewsBlockOutputStream>(dependent_table, *views_context, ASTPtr());
views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out)});
views.emplace_back(ViewInfo{std::move(query), database_table, std::move(out), nullptr});
}
/// Do not push to destination table if the flag is set
@ -162,7 +162,12 @@ void PushingToViewsBlockOutputStream::write(const Block & block)
{
// Process sequentially
for (size_t view_num = 0; view_num < views.size(); ++view_num)
{
process(block, view_num);
if (views[view_num].exception)
std::rethrow_exception(views[view_num].exception);
}
}
}
@ -190,8 +195,18 @@ void PushingToViewsBlockOutputStream::writeSuffix()
if (output)
output->writeSuffix();
std::exception_ptr first_exception;
for (auto & view : views)
{
if (view.exception)
{
if (!first_exception)
first_exception = view.exception;
continue;
}
try
{
view.out->writeSuffix();
@ -202,6 +217,9 @@ void PushingToViewsBlockOutputStream::writeSuffix()
throw;
}
}
if (first_exception)
std::rethrow_exception(first_exception);
}
void PushingToViewsBlockOutputStream::flush()
@ -270,7 +288,11 @@ void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_n
catch (Exception & ex)
{
ex.addMessage("while pushing to view " + view.table_id.getNameForLogs());
throw;
view.exception = std::current_exception();
}
catch (...)
{
view.exception = std::current_exception();
}
}

View File

@ -40,6 +40,7 @@ private:
ASTPtr query;
StorageID table_id;
BlockOutputStreamPtr out;
std::exception_ptr exception;
};
std::vector<ViewInfo> views;

View File

@ -0,0 +1,4 @@
10
10
0
10

View File

@ -0,0 +1,18 @@
drop table if exists testX;
drop table if exists testXA;
drop table if exists testXB;
drop table if exists testXC;
create table testX (A Int64) engine=MergeTree order by tuple();
create materialized view testXA engine=MergeTree order by tuple() as select sleep(1) from testX;
create materialized view testXB engine=MergeTree order by tuple() as select sleep(2), throwIf(A=1) from testX;
create materialized view testXC engine=MergeTree order by tuple() as select sleep(1) from testX;
set parallel_view_processing=1;
insert into testX select number from numbers(10); -- {serverError 395}
select count() from testX;
select count() from testXA;
select count() from testXB;
select count() from testXC;