Fix possible segfault on exception.

This commit is contained in:
Nikolai Kochetov 2019-12-19 19:27:35 +03:00
parent 9a4b5992e9
commit 725ac1255b
2 changed files with 10 additions and 10 deletions

View File

@ -86,7 +86,7 @@ struct ParallelInsertsHandler
void onException(std::exception_ptr & exception, size_t thread_num) void onException(std::exception_ptr & exception, size_t thread_num)
{ {
exceptions[thread_num] = std::move(exception); exceptions[thread_num] = exception;
cancellation_hook(); cancellation_hook();
} }
@ -104,16 +104,16 @@ struct ParallelInsertsHandler
} }
static void copyDataImpl(BlockInputStreams & froms, BlockOutputStreams & tos) static void copyDataImpl(BlockInputStreams & inputs, BlockOutputStreams & outputs)
{ {
for (auto & output : tos) for (auto & output : outputs)
output->writePrefix(); output->writePrefix();
using Processor = ParallelInputsProcessor<ParallelInsertsHandler>; using Processor = ParallelInputsProcessor<ParallelInsertsHandler>;
Processor * processor_ptr = nullptr; Processor * processor_ptr = nullptr;
ParallelInsertsHandler handler(tos, [&processor_ptr]() { processor_ptr->cancel(false); }); ParallelInsertsHandler handler(outputs, [&processor_ptr]() { processor_ptr->cancel(false); });
ParallelInputsProcessor<ParallelInsertsHandler> processor(froms, nullptr, froms.size(), handler); ParallelInputsProcessor<ParallelInsertsHandler> processor(inputs, nullptr, outputs.size(), handler);
processor_ptr = &processor; processor_ptr = &processor;
processor.process(); processor.process();
@ -121,10 +121,10 @@ static void copyDataImpl(BlockInputStreams & froms, BlockOutputStreams & tos)
handler.rethrowFirstException(); handler.rethrowFirstException();
/// readPrefix is called in ParallelInputsProcessor. /// readPrefix is called in ParallelInputsProcessor.
for (auto & input : froms) for (auto & input : inputs)
input->readSuffix(); input->readSuffix();
for (auto & output : tos) for (auto & output : outputs)
output->writeSuffix(); output->writeSuffix();
} }
@ -138,9 +138,9 @@ void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<boo
copyDataImpl(from, to, is_cancelled_pred, doNothing); copyDataImpl(from, to, is_cancelled_pred, doNothing);
} }
void copyData(BlockInputStreams & froms, BlockOutputStreams & tos) void copyData(BlockInputStreams & inputs, BlockOutputStreams & outputs)
{ {
copyDataImpl(froms, tos); copyDataImpl(inputs, outputs);
} }
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled) void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled)

View File

@ -16,7 +16,7 @@ class Block;
*/ */
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled = nullptr); void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled = nullptr);
void copyData(BlockInputStreams & froms, BlockOutputStreams & tos); void copyData(BlockInputStreams & inputs, BlockOutputStreams & outputs);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled); void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled);