Common thread pool for disks

This commit is contained in:
kssenii 2021-09-21 10:00:56 +03:00
parent cacc81ca61
commit 40ee75fe97
6 changed files with 18 additions and 7 deletions

View File

@ -11,7 +11,6 @@
#include <IO/SeekAvoidingReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromRemoteFS.h>
#include <IO/ThreadPoolRemoteFSReader.h>
#include <Disks/AsynchronousReadIndirectBufferFromRemoteFS.h>
@ -194,7 +193,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
auto web_impl = std::make_unique<ReadBufferFromWebServer>(url, meta, getContext(), read_settings.remote_fs_buffer_size);
if (read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool)
{
static AsynchronousReaderPtr reader = std::make_shared<ThreadPoolRemoteFSReader>(16, 1000000);
auto reader = IDiskRemote::getThreadPoolReader();
auto buf = std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings.priority, std::move(web_impl));
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), min_bytes_for_seek);
}

View File

@ -4,7 +4,6 @@
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <IO/ReadBufferFromRemoteFS.h>
#include <IO/ThreadPoolRemoteFSReader.h>
#include <Disks/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/WriteIndirectBufferFromRemoteFS.h>
@ -108,7 +107,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskHDFS::readFile(const String & path,
if (read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool)
{
static AsynchronousReaderPtr reader = std::make_shared<ThreadPoolRemoteFSReader>(16, 1000000);
auto reader = getThreadPoolReader();
auto buf = std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings.priority, std::move(hdfs_impl));
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
}

View File

@ -12,6 +12,7 @@
#include <Common/checkStackSize.h>
#include <boost/algorithm/string.hpp>
#include <Common/filesystemHelpers.h>
#include <IO/ThreadPoolRemoteFSReader.h>
namespace DB
@ -496,4 +497,11 @@ String IDiskRemote::getUniqueId(const String & path) const
return id;
}
AsynchronousReaderPtr IDiskRemote::getThreadPoolReader()
{
static AsynchronousReaderPtr reader = std::make_shared<ThreadPoolRemoteFSReader>(16, 1000000);
return reader;
}
}

View File

@ -35,6 +35,10 @@ protected:
using RemoteFSPathKeeperPtr = std::shared_ptr<RemoteFSPathKeeper>;
class IAsynchronousReader;
using AsynchronousReaderPtr = std::shared_ptr<IAsynchronousReader>;
/// Base Disk class for remote FS's, which are not posix-compatible (DiskS3 and DiskHDFS)
class IDiskRemote : public IDisk
{
@ -127,6 +131,8 @@ public:
virtual RemoteFSPathKeeperPtr createFSPathKeeper() const = 0;
static AsynchronousReaderPtr getThreadPoolReader();
protected:
Poco::Logger * log;
const String name;

View File

@ -29,7 +29,6 @@
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromRemoteFS.h>
#include <Disks/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <IO/ThreadPoolRemoteFSReader.h>
#include <aws/s3/model/CopyObjectRequest.h> // Y_IGNORE
#include <aws/s3/model/DeleteObjectsRequest.h> // Y_IGNORE
@ -238,7 +237,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, co
if (read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool)
{
static AsynchronousReaderPtr reader = std::make_shared<ThreadPoolRemoteFSReader>(16, 1000000);
auto reader = getThreadPoolReader();
auto buf = std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings.priority, std::move(s3_impl));
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
}

View File

@ -81,7 +81,7 @@ bool ReadBufferFromRemoteFS::readImpl()
}
off_t ReadBufferFromRemoteFS::seek(off_t offset_, int whence)
off_t ReadBufferFromRemoteFS::seek([[maybe_unused]] off_t offset_, int whence)
{
if (whence != SEEK_SET)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET is allowed");