From 56f0560c468e04645acfd16301db1e6a29fb3cb3 Mon Sep 17 00:00:00 2001 From: kssenii Date: Thu, 21 Oct 2021 20:43:27 +0300 Subject: [PATCH] Asserts and read till end option --- src/Compression/CachedCompressedReadBuffer.h | 6 +++++ .../CompressedReadBufferFromFile.h | 4 ++- src/Disks/DiskWebServer.cpp | 2 +- src/Disks/HDFS/DiskHDFS.cpp | 2 +- ...chronousReadIndirectBufferFromRemoteFS.cpp | 27 ++++++++++++++----- ...ynchronousReadIndirectBufferFromRemoteFS.h | 10 ++++--- src/Disks/IO/ReadBufferFromRemoteFSGather.cpp | 9 +++++++ src/Disks/IO/ReadBufferFromRemoteFSGather.h | 4 ++- src/Disks/S3/DiskS3.cpp | 2 +- src/IO/ReadBuffer.h | 4 +++ src/IO/ReadSettings.h | 6 +++++ .../MergeTree/MergeTreeReaderStream.cpp | 10 ++++++- 12 files changed, 71 insertions(+), 15 deletions(-) diff --git a/src/Compression/CachedCompressedReadBuffer.h b/src/Compression/CachedCompressedReadBuffer.h index 86c7d3d1ce4..16770e343cc 100644 --- a/src/Compression/CachedCompressedReadBuffer.h +++ b/src/Compression/CachedCompressedReadBuffer.h @@ -64,6 +64,12 @@ public: if (file_in) file_in->setReadUntilPosition(position); } + + void setReadUntilEnd() override + { + if (file_in) + file_in->setReadUntilEnd(); + } }; } diff --git a/src/Compression/CompressedReadBufferFromFile.h b/src/Compression/CompressedReadBufferFromFile.h index 8d9abbb3c6c..2e6fbc86753 100644 --- a/src/Compression/CompressedReadBufferFromFile.h +++ b/src/Compression/CompressedReadBufferFromFile.h @@ -46,7 +46,7 @@ private: void prefetch() override; public: - CompressedReadBufferFromFile(std::unique_ptr buf, bool allow_different_codecs_ = false); + explicit CompressedReadBufferFromFile(std::unique_ptr buf, bool allow_different_codecs_ = false); CompressedReadBufferFromFile( const std::string & path, const ReadSettings & settings, size_t estimated_size, bool allow_different_codecs_ = false); @@ -64,6 +64,8 @@ public: } void setReadUntilPosition(size_t position) override { file_in.setReadUntilPosition(position); } + + void setReadUntilEnd() override { file_in.setReadUntilEnd(); } }; } diff --git a/src/Disks/DiskWebServer.cpp b/src/Disks/DiskWebServer.cpp index 009bae355ce..3d4aa2374d6 100644 --- a/src/Disks/DiskWebServer.cpp +++ b/src/Disks/DiskWebServer.cpp @@ -173,7 +173,7 @@ std::unique_ptr DiskWebServer::readFile(const String & p if (threadpool_read) { auto reader = IDiskRemote::getThreadPoolReader(); - return std::make_unique(reader, read_settings.priority, std::move(web_impl)); + return std::make_unique(reader, read_settings, std::move(web_impl), min_bytes_for_seek); } else { diff --git a/src/Disks/HDFS/DiskHDFS.cpp b/src/Disks/HDFS/DiskHDFS.cpp index b1f4db28f4f..d4e56761237 100644 --- a/src/Disks/HDFS/DiskHDFS.cpp +++ b/src/Disks/HDFS/DiskHDFS.cpp @@ -80,7 +80,7 @@ std::unique_ptr DiskHDFS::readFile(const String & path, if (read_settings.remote_fs_method == RemoteFSReadMethod::read_threadpool) { auto reader = getThreadPoolReader(); - return std::make_unique(reader, read_settings.priority, std::move(hdfs_impl)); + return std::make_unique(reader, read_settings, std::move(hdfs_impl)); } else { diff --git a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp index 64c9ce6c433..de360286b6a 100644 --- a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include @@ -35,16 +36,16 @@ namespace ErrorCodes AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRemoteFS( AsynchronousReaderPtr reader_, - Int32 priority_, + const ReadSettings & settings_, std::shared_ptr impl_, - size_t buf_size_, size_t min_bytes_for_seek_) - : ReadBufferFromFileBase(buf_size_, nullptr, 0) + : ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0) , reader(reader_) - , priority(priority_) + , priority(settings_.priority) , impl(impl_) - , prefetch_buffer(buf_size_) + , prefetch_buffer(settings_.remote_fs_buffer_size) , min_bytes_for_seek(min_bytes_for_seek_) + , must_read_until_position(settings_.must_read_until_position) { ProfileEvents::increment(ProfileEvents::RemoteFSBuffers); } @@ -69,6 +70,10 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::hasPendingDataToRead() throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})", file_offset_of_buffer_end, read_until_position); } + else if (must_read_until_position) + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Reading for MergeTree family tables must be done with last position boundary"); + return true; } @@ -111,7 +116,17 @@ void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilPosition(size_t pos throw Exception(ErrorCodes::LOGICAL_ERROR, "Prefetch is valid in readUntilPosition"); read_until_position = position; - impl->setReadUntilPosition(position); + impl->setReadUntilPosition(read_until_position); +} + + +void AsynchronousReadIndirectBufferFromRemoteFS::setReadUntilEnd() +{ + if (prefetch_future.valid()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Prefetch is valid in readUntilEnd"); + + read_until_position = impl->getFileSize(); + impl->setReadUntilPosition(read_until_position); } diff --git a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h index 5bc528bb123..d8fad08bc8a 100644 --- a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h +++ b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h @@ -13,9 +13,10 @@ namespace DB { class ReadBufferFromRemoteFSGather; +struct ReadSettings; /** -* Reads data from S3/HDFS/Web using stored paths in metadata. + * Reads data from S3/HDFS/Web using stored paths in metadata. * This class is an asynchronous version of ReadIndirectBufferFromRemoteFS. * * Buffers chain for diskS3: @@ -32,9 +33,8 @@ class AsynchronousReadIndirectBufferFromRemoteFS : public ReadBufferFromFileBase { public: explicit AsynchronousReadIndirectBufferFromRemoteFS( - AsynchronousReaderPtr reader_, Int32 priority_, + AsynchronousReaderPtr reader_, const ReadSettings & settings_, std::shared_ptr impl_, - size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE, size_t min_bytes_for_seek = 1024 * 1024); ~AsynchronousReadIndirectBufferFromRemoteFS() override; @@ -49,6 +49,8 @@ public: void setReadUntilPosition(size_t position) override; + void setReadUntilEnd() override; + private: bool nextImpl() override; @@ -75,6 +77,8 @@ private: size_t bytes_to_ignore = 0; size_t read_until_position = 0; + + bool must_read_until_position; }; } diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp index b2484a7928e..d9e2d637fdc 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp @@ -170,4 +170,13 @@ String ReadBufferFromRemoteFSGather::getFileName() const return canonical_path; } + +size_t ReadBufferFromRemoteFSGather::getFileSize() const +{ + size_t size = 0; + for (const auto & object : metadata.remote_fs_objects) + size += object.second; + return size; +} + } diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.h b/src/Disks/IO/ReadBufferFromRemoteFSGather.h index a3ca8a081f5..5bc7d4e4819 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.h +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.h @@ -38,8 +38,10 @@ public: size_t readInto(char * data, size_t size, size_t offset, size_t ignore = 0); + size_t getFileSize() const; + protected: - virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t offset) const = 0; + virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t read_until_position) const = 0; RemoteMetadata metadata; diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp index e04802d6b8c..660c7d186a9 100644 --- a/src/Disks/S3/DiskS3.cpp +++ b/src/Disks/S3/DiskS3.cpp @@ -214,7 +214,7 @@ std::unique_ptr DiskS3::readFile(const String & path, co if (threadpool_read) { auto reader = getThreadPoolReader(); - return std::make_unique(reader, read_settings.priority, std::move(s3_impl)); + return std::make_unique(reader, read_settings, std::move(s3_impl)); } else { diff --git a/src/IO/ReadBuffer.h b/src/IO/ReadBuffer.h index 29c1c4ce43a..be456ea398c 100644 --- a/src/IO/ReadBuffer.h +++ b/src/IO/ReadBuffer.h @@ -202,7 +202,11 @@ public: */ virtual void prefetch() {} + /** + * For reading from remote filesystem, when it matters how much we read. + */ virtual void setReadUntilPosition(size_t /* position */) {} + virtual void setReadUntilEnd() {} protected: /// The number of bytes to ignore from the initial position of `working_buffer` diff --git a/src/IO/ReadSettings.h b/src/IO/ReadSettings.h index edd5463bd7c..7c8ad12e158 100644 --- a/src/IO/ReadSettings.h +++ b/src/IO/ReadSettings.h @@ -77,6 +77,12 @@ struct ReadSettings size_t remote_fs_read_max_backoff_ms = 10000; size_t remote_fs_read_backoff_max_tries = 4; + /// Set to true for MergeTree tables to make sure + /// that last position (offset in compressed file) is always passed. + /// (Otherwise asynchronous reading from remote fs is not efficient). + /// If reading is done without final position set, throw logical_error. + bool must_read_until_position = false; + bool http_retriable_read = true; size_t http_max_tries = 1; size_t http_retry_initial_backoff_ms = 100; diff --git a/src/Storages/MergeTree/MergeTreeReaderStream.cpp b/src/Storages/MergeTree/MergeTreeReaderStream.cpp index 0c4e14d0ae8..717925a759c 100644 --- a/src/Storages/MergeTree/MergeTreeReaderStream.cpp +++ b/src/Storages/MergeTree/MergeTreeReaderStream.cpp @@ -50,6 +50,7 @@ MergeTreeReaderStream::MergeTreeReaderStream( /// Avoid empty buffer. May happen while reading dictionary for DataTypeLowCardinality. /// For example: part has single dictionary and all marks point to the same position. ReadSettings read_settings = settings.read_settings; + read_settings.must_read_until_position = true; if (max_mark_range_bytes != 0) read_settings = read_settings.adjustBufferSize(max_mark_range_bytes); @@ -183,7 +184,14 @@ void MergeTreeReaderStream::seekToStart() void MergeTreeReaderStream::adjustForRange(size_t left_mark, size_t right_mark) { auto [right_offset, mark_range_bytes] = getRightOffsetAndBytesRange(left_mark, right_mark); - if (right_offset > last_right_offset) + if (!right_offset) + { + if (cached_buffer) + cached_buffer->setReadUntilEnd(); + if (non_cached_buffer) + non_cached_buffer->setReadUntilEnd(); + } + else if (right_offset > last_right_offset) { last_right_offset = right_offset; if (cached_buffer)