Chenge number of threads for insert select.

This commit is contained in:
Nikolai Kochetov 2019-12-19 22:20:13 +03:00
parent 725ac1255b
commit 8a1ec4c379

View File

@ -63,10 +63,10 @@ struct ParallelInsertsHandler
{
using CencellationHook = std::function<void()>;
explicit ParallelInsertsHandler(BlockOutputStreams & output_streams, CencellationHook cancellation_hook_)
explicit ParallelInsertsHandler(BlockOutputStreams & output_streams, CencellationHook cancellation_hook_, size_t num_threads)
: outputs(output_streams.size()), cancellation_hook(std::move(cancellation_hook_))
{
exceptions.resize(output_streams.size());
exceptions.resize(num_threads);
for (auto & output : output_streams)
outputs.push(output.get());
@ -112,8 +112,8 @@ static void copyDataImpl(BlockInputStreams & inputs, BlockOutputStreams & output
using Processor = ParallelInputsProcessor<ParallelInsertsHandler>;
Processor * processor_ptr = nullptr;
ParallelInsertsHandler handler(outputs, [&processor_ptr]() { processor_ptr->cancel(false); });
ParallelInputsProcessor<ParallelInsertsHandler> processor(inputs, nullptr, outputs.size(), handler);
ParallelInsertsHandler handler(outputs, [&processor_ptr]() { processor_ptr->cancel(false); }, inputs.size());
ParallelInputsProcessor<ParallelInsertsHandler> processor(inputs, nullptr, inputs.size(), handler);
processor_ptr = &processor;
processor.process();