Async read for disk web

This commit is contained in:
kssenii 2021-09-26 22:49:28 +03:00
parent 6219d541a5
commit efddc03246
3 changed files with 31 additions and 11 deletions

View File

@ -116,19 +116,22 @@ public:
ContextPtr context_,
size_t buf_size_,
size_t backoff_threshold_,
size_t max_tries_)
size_t max_tries_,
size_t threadpool_read_)
: ReadBufferFromRemoteFS(metadata_)
, uri(uri_)
, context(context_)
, buf_size(buf_size_)
, backoff_threshold(backoff_threshold_)
, max_tries(max_tries_)
, threadpool_read(threadpool_read_)
{
}
SeekableReadBufferPtr createReadBuffer(const String & path) const override
{
return std::make_unique<ReadIndirectBufferFromWebServer>(fs::path(uri) / path, context, buf_size, backoff_threshold, max_tries);
return std::make_unique<ReadIndirectBufferFromWebServer>(
fs::path(uri) / path, context, buf_size, backoff_threshold, max_tries, threadpool_read);
}
private:
@ -137,6 +140,7 @@ private:
size_t buf_size;
size_t backoff_threshold;
size_t max_tries;
bool threadpool_read;
};
@ -198,9 +202,14 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
RemoteMetadata meta(path, remote_path);
meta.remote_fs_objects.emplace_back(std::make_pair(remote_path, iter->second.size));
bool threadpool_read = read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool;
auto web_impl = std::make_unique<ReadBufferFromWebServer>(url, meta, getContext(),
read_settings.remote_fs_buffer_size, read_settings.remote_fs_backoff_threshold, read_settings.remote_fs_backoff_max_tries);
if (read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool)
read_settings.remote_fs_buffer_size,
read_settings.remote_fs_backoff_threshold, read_settings.remote_fs_backoff_max_tries,
threadpool_read);
if (threadpool_read)
{
auto reader = IDiskRemote::getThreadPoolReader();
auto buf = std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings.priority, std::move(web_impl));

View File

@ -24,7 +24,8 @@ static const auto WAIT_MS = 10;
ReadIndirectBufferFromWebServer::ReadIndirectBufferFromWebServer(
const String & url_, ContextPtr context_, size_t buf_size_, size_t backoff_threshold_, size_t max_tries_)
const String & url_, ContextPtr context_, size_t buf_size_,
size_t backoff_threshold_, size_t max_tries_, bool use_external_buffer_)
: BufferWithOwnMemory<SeekableReadBuffer>(buf_size_)
, log(&Poco::Logger::get("ReadIndirectBufferFromWebServer"))
, context(context_)
@ -32,6 +33,7 @@ ReadIndirectBufferFromWebServer::ReadIndirectBufferFromWebServer(
, buf_size(buf_size_)
, backoff_threshold_ms(backoff_threshold_)
, max_tries(max_tries_)
, use_external_buffer(use_external_buffer_)
{
}
@ -70,8 +72,15 @@ bool ReadIndirectBufferFromWebServer::nextImpl()
if (impl)
{
/// Restore correct position at the needed offset.
impl->position() = position();
if (use_external_buffer)
{
impl->set(working_buffer.begin(), working_buffer.size());
}
else
{
impl->position() = position();
}
assert(!impl->hasPendingData());
}

View File

@ -16,10 +16,10 @@ namespace DB
class ReadIndirectBufferFromWebServer : public BufferWithOwnMemory<SeekableReadBuffer>
{
public:
explicit ReadIndirectBufferFromWebServer(const String & url_,
ContextPtr context_,
size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE,
size_t backoff_threshold_ = 10000, size_t max_tries_ = 4);
explicit ReadIndirectBufferFromWebServer(
const String & url_, ContextPtr context_,
size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE,
size_t backoff_threshold_ = 10000, size_t max_tries_ = 4, bool use_external_buffer_ = false);
bool nextImpl() override;
@ -42,6 +42,8 @@ private:
size_t backoff_threshold_ms;
size_t max_tries;
bool use_external_buffer;
};
}