From a395532d5442ec6b485ac08195ac05c494dde879 Mon Sep 17 00:00:00 2001 From: kssenii Date: Tue, 26 Apr 2022 12:55:27 +0200 Subject: [PATCH] Add support for exact right bounded reads for some other buffers --- src/Common/ErrorCodes.cpp | 1 + src/Disks/IDisk.h | 2 +- src/Disks/IO/CachedReadBufferFromRemoteFS.cpp | 7 +++++++ src/IO/ReadBufferFromEncryptedFile.cpp | 14 +++++++++++++- src/IO/ReadBufferFromEncryptedFile.h | 6 ++++++ src/IO/ReadBufferFromFileDescriptor.cpp | 16 ++++++++++++++-- src/IO/ReadBufferFromFileDescriptor.h | 6 ++++++ src/IO/ReadBufferFromS3.h | 2 ++ src/IO/SeekableReadBuffer.h | 2 ++ src/Storages/MergeTree/MergeTreeData.cpp | 2 +- tests/config/config.d/storage_conf.xml | 18 ++++++++++++++++++ 11 files changed, 71 insertions(+), 5 deletions(-) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 3097af6207c..8e8d7725e7f 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -619,6 +619,7 @@ M(648, WRONG_DDL_RENAMING_SETTINGS) \ M(649, INVALID_TRANSACTION) \ M(650, SERIALIZATION_ERROR) \ + M(651, CANNOT_USE_CACHE) \ \ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ diff --git a/src/Disks/IDisk.h b/src/Disks/IDisk.h index e79ac73778c..425d630fec3 100644 --- a/src/Disks/IDisk.h +++ b/src/Disks/IDisk.h @@ -205,7 +205,7 @@ public: /// Second bool param is a flag to remove (true) or keep (false) shared data on S3 virtual bool removeSharedFileIfExists(const String & path, bool) { return removeFileIfExists(path); } - virtual const String & getCacheBasePath() const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "There is not cache path"); } + virtual const String & getCacheBasePath() const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "There is no cache path"); } virtual bool isCached() const { return false; } diff --git a/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp b/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp index ae2f0cacc5a..30eea3c448e 100644 --- a/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp +++ b/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp @@ -20,6 +20,7 @@ namespace DB namespace ErrorCodes { extern const int CANNOT_SEEK_THROUGH_FILE; + extern const int CANNOT_USE_CACHE; extern const int LOGICAL_ERROR; } @@ -107,6 +108,12 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getRemoteFSReadBuffer(FileSe return remote_fs_segment_reader; remote_fs_segment_reader = remote_file_reader_creator(); + + if (!remote_fs_segment_reader->supportsRightBoundedReads()) + throw Exception( + ErrorCodes::CANNOT_USE_CACHE, + "Cache cannot be used with a ReadBuffer which does not support right bounded reads"); + file_segment->setRemoteFileReader(remote_fs_segment_reader); return remote_fs_segment_reader; diff --git a/src/IO/ReadBufferFromEncryptedFile.cpp b/src/IO/ReadBufferFromEncryptedFile.cpp index 7aec6dcde02..5a3876d31bd 100644 --- a/src/IO/ReadBufferFromEncryptedFile.cpp +++ b/src/IO/ReadBufferFromEncryptedFile.cpp @@ -88,7 +88,13 @@ bool ReadBufferFromEncryptedFile::nextImpl() size_t bytes_read = 0; while (bytes_read < encrypted_buffer.size() && !in->eof()) { - bytes_read += in->read(encrypted_buffer.data() + bytes_read, encrypted_buffer.size() - bytes_read); + size_t bytes_to_read = encrypted_buffer.size() - bytes_read; + if (read_until_position) + { + assert(*read_until_position > offset); + bytes_to_read = std::min(bytes_to_read, *read_until_position - offset); + } + bytes_read += in->read(encrypted_buffer.data() + bytes_read, bytes_to_read); } /// The used cipher algorithms generate the same number of bytes in output as it were in input, @@ -97,9 +103,15 @@ bool ReadBufferFromEncryptedFile::nextImpl() encryptor.decrypt(encrypted_buffer.data(), bytes_read, working_buffer.begin()); pos = working_buffer.begin(); + offset += bytes_read; return true; } +void ReadBufferFromEncryptedFile::setReadUntilPosition(size_t position) +{ + read_until_position = position; +} + } #endif diff --git a/src/IO/ReadBufferFromEncryptedFile.h b/src/IO/ReadBufferFromEncryptedFile.h index 66768beb28e..96c5aca61d7 100644 --- a/src/IO/ReadBufferFromEncryptedFile.h +++ b/src/IO/ReadBufferFromEncryptedFile.h @@ -26,12 +26,18 @@ public: std::string getFileName() const override { return in->getFileName(); } + bool supportsRightBoundedReads() const override { return true; } + + void setReadUntilPosition(size_t position) override; + private: bool nextImpl() override; std::unique_ptr in; off_t offset = 0; + std::optional read_until_position; + bool need_seek = false; Memory<> encrypted_buffer; diff --git a/src/IO/ReadBufferFromFileDescriptor.cpp b/src/IO/ReadBufferFromFileDescriptor.cpp index f7006666b13..58f99f501e1 100644 --- a/src/IO/ReadBufferFromFileDescriptor.cpp +++ b/src/IO/ReadBufferFromFileDescriptor.cpp @@ -69,10 +69,17 @@ bool ReadBufferFromFileDescriptor::nextImpl() { CurrentMetrics::Increment metric_increment{CurrentMetrics::Read}; + size_t bytes_to_read = internal_buffer.size(); + if (read_until_position) + { + assert(*read_until_position > file_offset_of_buffer_end); + bytes_to_read = std::min(bytes_to_read, *read_until_position - file_offset_of_buffer_end); + } + if (use_pread) - res = ::pread(fd, internal_buffer.begin(), internal_buffer.size(), file_offset_of_buffer_end); + res = ::pread(fd, internal_buffer.begin(), bytes_to_read, file_offset_of_buffer_end); else - res = ::read(fd, internal_buffer.begin(), internal_buffer.size()); + res = ::read(fd, internal_buffer.begin(), bytes_to_read); } if (!res) break; @@ -272,4 +279,9 @@ void ReadBufferFromFileDescriptor::setProgressCallback(ContextPtr context) }); } +void ReadBufferFromFileDescriptor::setReadUntilPosition(size_t position) +{ + read_until_position = position; +} + } diff --git a/src/IO/ReadBufferFromFileDescriptor.h b/src/IO/ReadBufferFromFileDescriptor.h index ba1502fb9aa..75279f14430 100644 --- a/src/IO/ReadBufferFromFileDescriptor.h +++ b/src/IO/ReadBufferFromFileDescriptor.h @@ -18,6 +18,8 @@ protected: bool use_pread = false; /// To access one fd from multiple threads, use 'pread' syscall instead of 'read'. size_t file_offset_of_buffer_end = 0; /// What offset in file corresponds to working_buffer.end(). + std::optional read_until_position; /// For right bounded reads. + int fd; bool nextImpl() override; @@ -59,6 +61,10 @@ public: void setProgressCallback(ContextPtr context); + bool supportsRightBoundedReads() const override { return true; } + + void setReadUntilPosition(size_t position) override; + private: /// Assuming file descriptor supports 'select', check that we have data to read or wait until timeout. bool poll(size_t timeout_microseconds); diff --git a/src/IO/ReadBufferFromS3.h b/src/IO/ReadBufferFromS3.h index 5282d9ad482..b3b9db5453f 100644 --- a/src/IO/ReadBufferFromS3.h +++ b/src/IO/ReadBufferFromS3.h @@ -70,6 +70,8 @@ public: size_t getFileOffsetOfBufferEnd() const override { return offset; } + bool supportsRightBoundedReads() const override { return true; } + private: std::unique_ptr initialize(); diff --git a/src/IO/SeekableReadBuffer.h b/src/IO/SeekableReadBuffer.h index 3a46630350a..a2ae11954b2 100644 --- a/src/IO/SeekableReadBuffer.h +++ b/src/IO/SeekableReadBuffer.h @@ -58,6 +58,8 @@ public: virtual String getInfoForLog() { return ""; } virtual size_t getFileOffsetOfBufferEnd() const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method getFileOffsetOfBufferEnd() not implemented"); } + + virtual bool supportsRightBoundedReads() const { return false; } }; using SeekableReadBufferPtr = std::shared_ptr; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 33064339e03..313919d5dac 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1216,7 +1216,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) for (const auto & [disk_name, disk] : getContext()->getDisksMap()) { - if (disk->isBroken()) + if (disk->isBroken() || disk->isCached()) continue; if (!defined_disk_names.contains(disk_name) && disk->exists(relative_data_path)) diff --git a/tests/config/config.d/storage_conf.xml b/tests/config/config.d/storage_conf.xml index f02e925f2c5..afd71853291 100644 --- a/tests/config/config.d/storage_conf.xml +++ b/tests/config/config.d/storage_conf.xml @@ -3,6 +3,7 @@ s3 + s3_disk/ http://localhost:11111/test/00170_test/ clickhouse clickhouse @@ -14,6 +15,16 @@ 22548578304 1 + + local + local_disk/ + + + cache + local_disk + 22548578304 + 1 + @@ -23,6 +34,13 @@ + + +
+ local_cache +
+
+