Fix LocalObjectStorage trying to use mmap/io_uring and failing reads

This commit is contained in:
Michael Kolupaev 2024-08-26 22:58:03 +00:00
parent 033b9cc28c
commit 8b01e17be4
6 changed files with 51 additions and 24 deletions

View File

@ -3,6 +3,7 @@
#include <IO/AsyncReadCounters.h> #include <IO/AsyncReadCounters.h>
#include <IO/SeekableReadBuffer.h> #include <IO/SeekableReadBuffer.h>
#include <base/getThreadId.h> #include <base/getThreadId.h>
#include <base/demangle.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
#include <Common/CurrentThread.h> #include <Common/CurrentThread.h>
#include <Common/ElapsedTimeProfileEventIncrement.h> #include <Common/ElapsedTimeProfileEventIncrement.h>
@ -41,6 +42,11 @@ namespace CurrentMetrics
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace namespace
{ {
struct AsyncReadIncrement : boost::noncopyable struct AsyncReadIncrement : boost::noncopyable
@ -136,6 +142,7 @@ IAsynchronousReader::Result ThreadPoolRemoteFSReader::execute(Request request, b
if (request.ignore) if (request.ignore)
{ {
ProfileEvents::increment(ProfileEvents::AsynchronousReaderIgnoredBytes, request.ignore); ProfileEvents::increment(ProfileEvents::AsynchronousReaderIgnoredBytes, request.ignore);
chassert(request.ignore < request.size);
reader.ignore(request.ignore); reader.ignore(request.ignore);
} }
} }
@ -152,11 +159,15 @@ IAsynchronousReader::Result ThreadPoolRemoteFSReader::execute(Request request, b
IAsynchronousReader::Result read_result; IAsynchronousReader::Result read_result;
if (result) if (result)
{ {
chassert(reader.buffer().begin() == request.buf); if (reader.position() < request.buf || reader.buffer().end() > request.buf + request.size)
chassert(reader.buffer().end() <= request.buf + request.size); {
read_result.size = reader.buffer().size(); /// `reader` didn't put the data into the expected buffer. Maybe the ReadBuffer doesn't
read_result.offset = reader.offset(); /// support external buffer, maybe we switched buffer without calling reader.set().
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, read_result.size); throw Exception(ErrorCodes::LOGICAL_ERROR, "ThreadPoolRemoteFSReader got unexpected address range (seek_performed = {}, reader type: {}).", seek_performed, demangle(typeid(reader).name()));
}
read_result.size = reader.buffer().end() - request.buf;
read_result.offset = reader.position() - request.buf;
ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, reader.buffer().size());
} }
read_result.execution_watch = std::move(watch); read_result.execution_watch = std::move(watch);

View File

@ -29,9 +29,6 @@ private:
class RemoteFSFileDescriptor : public IAsynchronousReader::IFileDescriptor class RemoteFSFileDescriptor : public IAsynchronousReader::IFileDescriptor
{ {
public: public:
/// `reader_` implementation must ensure that next() places data at the start of internal_buffer,
/// even if there was previously a seek. I.e. seek() shouldn't leave pending data (no short seek
/// optimization), and nextImpl() shouldn't assign nextimpl_working_buffer_offset.
explicit RemoteFSFileDescriptor( explicit RemoteFSFileDescriptor(
SeekableReadBuffer & reader_, SeekableReadBuffer & reader_,
std::shared_ptr<AsyncReadCounters> async_read_counters_) std::shared_ptr<AsyncReadCounters> async_read_counters_)

View File

@ -80,25 +80,35 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
ReadSettings LocalObjectStorage::patchSettings(const ReadSettings & read_settings) const ReadSettings LocalObjectStorage::patchSettings(const ReadSettings & read_settings) const
{ {
if (!read_settings.enable_filesystem_cache) LocalFSReadMethod changed_method = read_settings.local_fs_method;
switch (read_settings.local_fs_method)
{
case LocalFSReadMethod::mmap:
/// MMap read buffer doesn't support external buffer, required by ReadBufferFromRemoteFSGather.
changed_method = LocalFSReadMethod::pread;
LOG_INFO(log, "Changing local object storage filesystem read method from `mmap` to `pread`");
break;
case LocalFSReadMethod::pread_threadpool:
case LocalFSReadMethod::pread_fake_async:
case LocalFSReadMethod::io_uring:
if (read_settings.enable_filesystem_cache)
{
/// For now we cannot allow asynchronous reader from local filesystem when CachedObjectStorage is used.
changed_method = LocalFSReadMethod::pread;
LOG_INFO(log, "Changing local object storage filesystem read method to `pread`");
}
break;
default:
break;
}
if (changed_method == read_settings.local_fs_method && read_settings.direct_io_threshold == 0)
return IObjectStorage::patchSettings(read_settings); return IObjectStorage::patchSettings(read_settings);
auto modified_settings{read_settings}; auto modified_settings{read_settings};
/// For now we cannot allow asynchronous reader from local filesystem when CachedObjectStorage is used. modified_settings.local_fs_method = changed_method;
switch (modified_settings.local_fs_method) /// O_DIRECT imposes buffer alignment restrictions, which ReadBufferFromRemoteFSGather doesn't support.
{ modified_settings.direct_io_threshold = 0;
case LocalFSReadMethod::pread_threadpool:
case LocalFSReadMethod::pread_fake_async:
{
modified_settings.local_fs_method = LocalFSReadMethod::pread;
LOG_INFO(log, "Changing local filesystem read method to `pread`");
break;
}
default:
{
break;
}
}
return IObjectStorage::patchSettings(modified_settings); return IObjectStorage::patchSettings(modified_settings);
} }

View File

@ -49,6 +49,8 @@ public:
size_t size = 0; size_t size = 0;
char * buf = nullptr; char * buf = nullptr;
Priority priority; Priority priority;
/// Skip the first `ignore` bytes. I.e. read `size - ignore` bytes at `offset + ignore`.
/// The skipping is done using ReadBuffer::ignore.
size_t ignore = 0; size_t ignore = 0;
}; };

View File

@ -1,2 +1,5 @@
1 test 1 test
1 test 1 test
1 test
1 test
1 test

View File

@ -14,6 +14,10 @@ SETTINGS disk = disk(
INSERT INTO test SELECT 1, 'test'; INSERT INTO test SELECT 1, 'test';
SELECT * FROM test; SELECT * FROM test;
SELECT * FROM test SETTINGS min_bytes_to_use_mmap_io=1, local_filesystem_read_method='mmap';
SELECT * FROM test SETTINGS local_filesystem_read_method='io_uring';
SELECT * FROM test SETTINGS min_bytes_to_use_direct_io=1;
DROP TABLE test SYNC; DROP TABLE test SYNC;
CREATE TABLE test (a Int32, b String) CREATE TABLE test (a Int32, b String)