diff --git a/dbms/src/DataStreams/copyData.cpp b/dbms/src/DataStreams/copyData.cpp index 333f07d2836..b3820828977 100644 --- a/dbms/src/DataStreams/copyData.cpp +++ b/dbms/src/DataStreams/copyData.cpp @@ -55,7 +55,7 @@ void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCall inline void doNothing(const Block &) {} -void copyDataImpl(BlockInputStreams & froms, BlockOutputStreams & tos) +static void copyDataImpl(BlockInputStreams & froms, BlockOutputStreams & tos) { if (froms.size() == tos.size()) { @@ -79,25 +79,6 @@ void copyDataImpl(BlockInputStreams & froms, BlockOutputStreams & tos) else { ConcurrentBoundedQueue queue(froms.size()); - ThreadFromGlobalPool from_threads([&]() - { - std::vector from_threads_; - from_threads_.reserve(froms.size()); - for (auto & from : froms) - { - from_threads_.emplace_back([&queue, from]() - { - from->readPrefix(); - while (Block block = from->read()) - queue.push(block); - from->readSuffix(); - }); - } - for (auto & thread : from_threads_) - thread.join(); - for (size_t i = 0; i < tos.size(); i++) - queue.push({}); - }); std::vector to_threads; for (auto & to : tos) { @@ -116,7 +97,23 @@ void copyDataImpl(BlockInputStreams & froms, BlockOutputStreams & tos) }); } - from_threads.join(); + std::vector from_threads_; + from_threads_.reserve(froms.size()); + for (auto & from : froms) + { + from_threads_.emplace_back([&queue, from]() + { + from->readPrefix(); + while (Block block = from->read()) + queue.push(block); + from->readSuffix(); + }); + } + + for (auto & thread : from_threads_) + thread.join(); + for (size_t i = 0; i < tos.size(); i++) + queue.push({}); for (auto & thread : to_threads) thread.join(); }