diff --git a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp index 2df087e941f..fc095ddb543 100644 --- a/src/Disks/IO/ThreadPoolRemoteFSReader.cpp +++ b/src/Disks/IO/ThreadPoolRemoteFSReader.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -41,6 +42,11 @@ namespace CurrentMetrics namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + namespace { struct AsyncReadIncrement : boost::noncopyable @@ -136,6 +142,7 @@ IAsynchronousReader::Result ThreadPoolRemoteFSReader::execute(Request request, b if (request.ignore) { ProfileEvents::increment(ProfileEvents::AsynchronousReaderIgnoredBytes, request.ignore); + chassert(request.ignore < request.size); reader.ignore(request.ignore); } } @@ -152,11 +159,15 @@ IAsynchronousReader::Result ThreadPoolRemoteFSReader::execute(Request request, b IAsynchronousReader::Result read_result; if (result) { - chassert(reader.buffer().begin() == request.buf); - chassert(reader.buffer().end() <= request.buf + request.size); - read_result.size = reader.buffer().size(); - read_result.offset = reader.offset(); - ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, read_result.size); + if (reader.position() < request.buf || reader.buffer().end() > request.buf + request.size) + { + /// `reader` didn't put the data into the expected buffer. Maybe the ReadBuffer doesn't + /// support external buffer, maybe we switched buffer without calling reader.set(). + throw Exception(ErrorCodes::LOGICAL_ERROR, "ThreadPoolRemoteFSReader got unexpected address range (seek_performed = {}, reader type: {}).", seek_performed, demangle(typeid(reader).name())); + } + read_result.size = reader.buffer().end() - request.buf; + read_result.offset = reader.position() - request.buf; + ProfileEvents::increment(ProfileEvents::ThreadpoolReaderReadBytes, reader.buffer().size()); } read_result.execution_watch = std::move(watch); diff --git a/src/Disks/IO/ThreadPoolRemoteFSReader.h b/src/Disks/IO/ThreadPoolRemoteFSReader.h index eacce5a54ac..abc251b2b10 100644 --- a/src/Disks/IO/ThreadPoolRemoteFSReader.h +++ b/src/Disks/IO/ThreadPoolRemoteFSReader.h @@ -29,9 +29,6 @@ private: class RemoteFSFileDescriptor : public IAsynchronousReader::IFileDescriptor { public: - /// `reader_` implementation must ensure that next() places data at the start of internal_buffer, - /// even if there was previously a seek. I.e. seek() shouldn't leave pending data (no short seek - /// optimization), and nextImpl() shouldn't assign nextimpl_working_buffer_offset. explicit RemoteFSFileDescriptor( SeekableReadBuffer & reader_, std::shared_ptr async_read_counters_) diff --git a/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp b/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp index 5b61c57ca21..b9fdb41e27d 100644 --- a/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp +++ b/src/Disks/ObjectStorages/Local/LocalObjectStorage.cpp @@ -80,25 +80,35 @@ std::unique_ptr LocalObjectStorage::readObjects( /// NOL ReadSettings LocalObjectStorage::patchSettings(const ReadSettings & read_settings) const { - if (!read_settings.enable_filesystem_cache) + LocalFSReadMethod changed_method = read_settings.local_fs_method; + switch (read_settings.local_fs_method) + { + case LocalFSReadMethod::mmap: + /// MMap read buffer doesn't support external buffer, required by ReadBufferFromRemoteFSGather. + changed_method = LocalFSReadMethod::pread; + LOG_INFO(log, "Changing local object storage filesystem read method from `mmap` to `pread`"); + break; + case LocalFSReadMethod::pread_threadpool: + case LocalFSReadMethod::pread_fake_async: + case LocalFSReadMethod::io_uring: + if (read_settings.enable_filesystem_cache) + { + /// For now we cannot allow asynchronous reader from local filesystem when CachedObjectStorage is used. + changed_method = LocalFSReadMethod::pread; + LOG_INFO(log, "Changing local object storage filesystem read method to `pread`"); + } + break; + default: + break; + } + + if (changed_method == read_settings.local_fs_method && read_settings.direct_io_threshold == 0) return IObjectStorage::patchSettings(read_settings); auto modified_settings{read_settings}; - /// For now we cannot allow asynchronous reader from local filesystem when CachedObjectStorage is used. - switch (modified_settings.local_fs_method) - { - case LocalFSReadMethod::pread_threadpool: - case LocalFSReadMethod::pread_fake_async: - { - modified_settings.local_fs_method = LocalFSReadMethod::pread; - LOG_INFO(log, "Changing local filesystem read method to `pread`"); - break; - } - default: - { - break; - } - } + modified_settings.local_fs_method = changed_method; + /// O_DIRECT imposes buffer alignment restrictions, which ReadBufferFromRemoteFSGather doesn't support. + modified_settings.direct_io_threshold = 0; return IObjectStorage::patchSettings(modified_settings); } diff --git a/src/IO/AsynchronousReader.h b/src/IO/AsynchronousReader.h index 815a7b2774e..e59e3f89526 100644 --- a/src/IO/AsynchronousReader.h +++ b/src/IO/AsynchronousReader.h @@ -49,6 +49,8 @@ public: size_t size = 0; char * buf = nullptr; Priority priority; + /// Skip the first `ignore` bytes. I.e. read `size - ignore` bytes at `offset + ignore`. + /// The skipping is done using ReadBuffer::ignore. size_t ignore = 0; }; diff --git a/tests/queries/0_stateless/02714_local_object_storage.reference b/tests/queries/0_stateless/02714_local_object_storage.reference index b3f28057554..1558ea7dbf8 100644 --- a/tests/queries/0_stateless/02714_local_object_storage.reference +++ b/tests/queries/0_stateless/02714_local_object_storage.reference @@ -1,2 +1,5 @@ 1 test 1 test +1 test +1 test +1 test diff --git a/tests/queries/0_stateless/02714_local_object_storage.sql b/tests/queries/0_stateless/02714_local_object_storage.sql index bfe3162df97..0e8d01e0251 100644 --- a/tests/queries/0_stateless/02714_local_object_storage.sql +++ b/tests/queries/0_stateless/02714_local_object_storage.sql @@ -14,6 +14,10 @@ SETTINGS disk = disk( INSERT INTO test SELECT 1, 'test'; SELECT * FROM test; +SELECT * FROM test SETTINGS min_bytes_to_use_mmap_io=1, local_filesystem_read_method='mmap'; +SELECT * FROM test SETTINGS local_filesystem_read_method='io_uring'; +SELECT * FROM test SETTINGS min_bytes_to_use_direct_io=1; + DROP TABLE test SYNC; CREATE TABLE test (a Int32, b String)