Change implementation of copyData for multiple streams.

This commit is contained in:
Nikolai Kochetov 2019-12-19 17:44:58 +03:00
parent c61e2aef3f
commit 7e398f4731
2 changed files with 71 additions and 58 deletions

View File

@ -49,6 +49,10 @@ public:
protected:
Block readImpl() override
{
/// We do not use cancel flag here.
/// If query was cancelled, it will be processed by child streams.
/// Part of the data will be processed.
if (input_streams.size() == 1 && output_streams.size() == 1)
copyData(*input_streams.at(0), *output_streams.at(0));
else

View File

@ -2,6 +2,7 @@
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <DataStreams/ParallelInputsProcessor.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ThreadPool.h>
@ -54,69 +55,77 @@ void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCall
inline void doNothing(const Block &) {}
namespace
{
struct ParallelInsertsHandler
{
using CencellationHook = std::function<void()>;
explicit ParallelInsertsHandler(BlockOutputStreams & output_streams, CencellationHook cancellation_hook_)
: outputs(output_streams.size()), cancellation_hook(std::move(cancellation_hook_))
{
exceptions.resize(output_streams.size());
for (auto & output : output_streams)
outputs.push(output.get());
}
void onBlock(Block & block, size_t /*thread_num*/)
{
IBlockOutputStream * out = nullptr;
outputs.pop(out);
out->write(block);
outputs.push(out);
}
void onFinishThread(size_t /*thread_num*/) {}
void onFinish() {}
void onException(std::exception_ptr & exception, size_t thread_num)
{
exceptions[thread_num] = std::move(exception);
cancellation_hook();
}
void rethrowFirstException()
{
for (auto & exception : exceptions)
if (exception)
std::rethrow_exception(exception);
}
ConcurrentBoundedQueue<IBlockOutputStream *> outputs;
std::vector<std::exception_ptr> exceptions;
CencellationHook cancellation_hook;
};
}
static void copyDataImpl(BlockInputStreams & froms, BlockOutputStreams & tos)
{
if (froms.size() == tos.size())
{
std::vector<ThreadFromGlobalPool> threads;
threads.reserve(froms.size());
for (size_t i = 0; i < froms.size(); i++)
{
threads.emplace_back([from = froms.at(i), to = tos.at(i)]()
{
from->readPrefix();
to->writePrefix();
while (Block block = from->read())
to->write(block);
from->readSuffix();
to->writeSuffix();
});
}
for (auto & thread : threads)
thread.join();
}
else
{
ConcurrentBoundedQueue<Block> queue(froms.size());
std::vector<ThreadFromGlobalPool> to_threads;
for (auto & to : tos)
{
to_threads.emplace_back([&queue, to]()
{
to->writePrefix();
Block block;
while (true)
{
queue.pop(block);
if (!block)
break;
to->write(block);
}
to->writeSuffix();
});
}
for (auto & output : tos)
output->writePrefix();
std::vector<ThreadFromGlobalPool> from_threads_;
from_threads_.reserve(froms.size());
for (auto & from : froms)
{
from_threads_.emplace_back([&queue, from]()
{
from->readPrefix();
while (Block block = from->read())
queue.push(block);
from->readSuffix();
});
}
using Processor = ParallelInputsProcessor<ParallelInsertsHandler>;
Processor * processor_ptr = nullptr;
for (auto & thread : from_threads_)
thread.join();
for (size_t i = 0; i < tos.size(); i++)
queue.push({});
for (auto & thread : to_threads)
thread.join();
}
ParallelInsertsHandler handler(tos, [&processor_ptr]() { processor_ptr->cancel(false); });
ParallelInputsProcessor<ParallelInsertsHandler> processor(froms, nullptr, froms.size(), handler);
processor_ptr = &processor;
processor.process();
processor.wait();
handler.rethrowFirstException();
/// readPrefix is called in ParallelInputsProcessor.
for (auto & input : froms)
input->readSuffix();
for (auto & output : tos)
output->writeSuffix();
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled)