Create CallbackRunner in Storage

This commit is contained in:
Antonio Andelic 2022-03-30 10:49:37 +00:00
parent de2c6684a3
commit a3ad99be03
4 changed files with 6 additions and 5 deletions

View File

@ -13,10 +13,10 @@ namespace ErrorCodes
}
ParallelReadBuffer::ParallelReadBuffer(std::unique_ptr<ReadBufferFactory> reader_factory_, ThreadPool * pool, size_t max_working_readers_)
ParallelReadBuffer::ParallelReadBuffer(std::unique_ptr<ReadBufferFactory> reader_factory_, CallbackRunner schedule_, size_t max_working_readers_)
: SeekableReadBufferWithSize(nullptr, 0)
, max_working_readers(max_working_readers_)
, schedule(threadPoolCallbackRunner(*pool))
, schedule(std::move(schedule_))
, reader_factory(std::move(reader_factory_))
{
std::unique_lock<std::mutex> lock{mutex};

View File

@ -77,7 +77,7 @@ public:
virtual std::optional<size_t> getTotalSize() = 0;
};
explicit ParallelReadBuffer(std::unique_ptr<ReadBufferFactory> reader_factory_, ThreadPool * pool, size_t max_working_readers);
explicit ParallelReadBuffer(std::unique_ptr<ReadBufferFactory> reader_factory_, CallbackRunner schedule_, size_t max_working_readers);
~ParallelReadBuffer() override { finishAndWait(); }

View File

@ -324,7 +324,7 @@ std::unique_ptr<ReadBuffer> StorageS3Source::createS3ReadBuffer(const String & k
LOG_TRACE(
log, "Downloading from S3 in {} threads. Object size: {}, Range size: {}.", download_thread_num, object_size, download_buffer_size);
return std::make_unique<ParallelReadBuffer>(std::move(factory), &IOThreadPool::get(), download_thread_num);
return std::make_unique<ParallelReadBuffer>(std::move(factory), threadPoolCallbackRunner(IOThreadPool::get()), download_thread_num);
}
String StorageS3Source::getName() const

View File

@ -2,6 +2,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
@ -349,7 +350,7 @@ namespace
return wrapReadBufferWithCompressionMethod(
std::make_unique<ParallelReadBuffer>(
std::move(read_buffer_factory),
&IOThreadPool::get(),
threadPoolCallbackRunner(IOThreadPool::get()),
download_threads),
chooseCompressionMethod(request_uri.getPath(), compression_method));
}