Move io_uring reader into the Context from static to make it's thread joinable

v2: fix for standalone keeper build
CI: https://s3.amazonaws.com/clickhouse-test-reports/52717/72b1052f7c2d453308262924e767ab8dc2206933/stateless_tests__debug__[4_5].html
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
This commit is contained in:
Azat Khuzhin 2023-08-03 21:37:37 +02:00
parent b7edde3621
commit 6ccbc2ea75
5 changed files with 51 additions and 3 deletions

View File

@ -4,6 +4,7 @@
#include <Common/Macros.h>
#include <Common/ThreadPool.h>
#include <Common/callOnce.h>
#include <Disks/IO/IOUringReader.h>
#include <Core/ServerSettings.h>
@ -62,6 +63,11 @@ struct ContextSharedPart : boost::noncopyable
mutable std::unique_ptr<IAsynchronousReader> asynchronous_local_fs_reader;
mutable std::unique_ptr<IAsynchronousReader> synchronous_local_fs_reader;
#if USE_LIBURING
mutable OnceFlag io_uring_reader_initialized;
mutable std::unique_ptr<IOUringReader> io_uring_reader;
#endif
mutable OnceFlag threadpool_writer_initialized;
mutable std::unique_ptr<ThreadPool> threadpool_writer;
@ -225,6 +231,17 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co
}
}
#if USE_LIBURING
IOUringReader & Context::getIOURingReader() const
{
callOnce(shared->io_uring_reader_initialized, [&] {
shared->io_uring_reader = std::make_unique<IOUringReader>(512);
});
return *shared->io_uring_reader;
}
#endif
std::shared_ptr<FilesystemCacheLog> Context::getFilesystemCacheLog() const
{
return nullptr;

View File

@ -20,6 +20,8 @@
#include <memory>
#include "config.h"
namespace DB
{
@ -28,6 +30,7 @@ class Macros;
class FilesystemCacheLog;
class FilesystemReadPrefetchesLog;
class BlobStorageLog;
class IOUringReader;
/// A small class which owns ContextShared.
/// We don't use something like unique_ptr directly to allow ContextShared type to be incomplete.
@ -127,6 +130,9 @@ public:
ApplicationType getApplicationType() const { return ApplicationType::KEEPER; }
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type) const;
#if USE_LIBURING
IOUringReader & getIOURingReader() const;
#endif
std::shared_ptr<AsyncReadCounters> getAsyncReadCounters() const;
ThreadPool & getThreadPoolWriter() const;

View File

@ -101,12 +101,16 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
else if (settings.local_fs_method == LocalFSReadMethod::io_uring)
{
#if USE_LIBURING
static std::shared_ptr<IOUringReader> reader = std::make_shared<IOUringReader>(512);
if (!reader->isSupported())
auto global_context = Context::getGlobalContextInstance();
if (!global_context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot obtain io_uring reader (global context not initialized)");
auto & reader = global_context->getIOURingReader();
if (!reader.isSupported())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "io_uring is not supported by this system");
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
*reader,
reader,
settings.priority,
filename,
buffer_size,

View File

@ -35,6 +35,7 @@
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/StoragePolicy.h>
#include <Disks/IO/IOUringReader.h>
#include <IO/SynchronousReader.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Interpreters/ActionLocksManager.h>
@ -309,6 +310,11 @@ struct ContextSharedPart : boost::noncopyable
mutable OnceFlag threadpool_writer_initialized;
mutable std::unique_ptr<ThreadPool> threadpool_writer;
#if USE_LIBURING
mutable OnceFlag io_uring_reader_initialized;
mutable std::unique_ptr<IOUringReader> io_uring_reader;
#endif
mutable ThrottlerPtr replicated_fetches_throttler; /// A server-wide throttler for replicated fetches
mutable ThrottlerPtr replicated_sends_throttler; /// A server-wide throttler for replicated sends
@ -4861,6 +4867,17 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co
}
}
#if USE_LIBURING
IOUringReader & Context::getIOURingReader() const
{
callOnce(shared->io_uring_reader_initialized, [&] {
shared->io_uring_reader = std::make_unique<IOUringReader>(512);
});
return *shared->io_uring_reader;
}
#endif
ThreadPool & Context::getThreadPoolWriter() const
{
callOnce(shared->threadpool_writer_initialized, [&] {

View File

@ -109,6 +109,7 @@ class AsynchronousInsertLog;
class BackupLog;
class BlobStorageLog;
class IAsynchronousReader;
class IOUringReader;
struct MergeTreeSettings;
struct InitialAllRangesAnnouncement;
struct ParallelReadRequest;
@ -1210,6 +1211,9 @@ public:
OrdinaryBackgroundExecutorPtr getCommonExecutor() const;
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type) const;
#if USE_LIBURING
IOUringReader & getIOURingReader() const;
#endif
std::shared_ptr<AsyncReadCounters> getAsyncReadCounters() const;