diff --git a/src/Backups/BackupEntryFromImmutableFile.cpp b/src/Backups/BackupEntryFromImmutableFile.cpp index 41e8e913642..9be1b53c85d 100644 --- a/src/Backups/BackupEntryFromImmutableFile.cpp +++ b/src/Backups/BackupEntryFromImmutableFile.cpp @@ -41,7 +41,7 @@ std::unique_ptr BackupEntryFromImmutableFile::getReadBuffer() const if (disk) return disk->readFile(file_path); else - return createReadBufferFromFileBase(file_path, 0, 0, 0, nullptr); + return createReadBufferFromFileBase(file_path, {}, 0); } } diff --git a/src/Backups/BackupEntryFromSmallFile.cpp b/src/Backups/BackupEntryFromSmallFile.cpp index cbd517744f5..a4dd00ad9b1 100644 --- a/src/Backups/BackupEntryFromSmallFile.cpp +++ b/src/Backups/BackupEntryFromSmallFile.cpp @@ -10,7 +10,7 @@ namespace { String readFile(const String & file_path) { - auto buf = createReadBufferFromFileBase(file_path, 0, 0, 0, nullptr); + auto buf = createReadBufferFromFileBase(file_path, {}, 0); String s; readStringUntilEOF(s, *buf); return s; diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 183a9f40cc9..814aa8997dc 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -578,6 +578,7 @@ M(607, BACKUP_ELEMENT_DUPLICATE) \ M(608, CANNOT_RESTORE_TABLE) \ M(609, CANNOT_ADVISE) \ + M(610, UNKNOWN_READ_METHOD) \ \ M(998, POSTGRESQL_CONNECTION_FAILURE) \ M(999, KEEPER_EXCEPTION) \ diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 2758c4da30d..46fa915595e 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -3,6 +3,7 @@ #include #include #include +#include namespace Poco::Util @@ -500,7 +501,7 @@ class IColumn; \ M(String, local_filesystem_read_method, "pread", "Method of reading data from local filesystem, one of: read, pread, mmap, pread_threadpool.", 0) \ M(Bool, local_filesystem_read_prefetch, false, "Should use prefetching when reading data from local filesystem.", 0) \ - M(Int32, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \ + M(Int64, read_priority, 0, "Priority to read data from local filesystem. Only supported for 'pread_threadpool' method.", 0) \ \ /** Experimental functions */ \ M(Bool, allow_experimental_funnel_functions, false, "Enable experimental functions for funnel analysis.", 0) \ diff --git a/src/Disks/DiskCacheWrapper.cpp b/src/Disks/DiskCacheWrapper.cpp index 1805fb20026..270b7d836ca 100644 --- a/src/Disks/DiskCacheWrapper.cpp +++ b/src/Disks/DiskCacheWrapper.cpp @@ -177,7 +177,7 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode [this, path, buf_size, mode]() { /// Copy file from cache to actual disk when cached buffer is finalized. - auto src_buffer = cache_disk->readFile(path); + auto src_buffer = cache_disk->readFile(path, ReadSettings(), 0); auto dst_buffer = DiskDecorator::writeFile(path, buf_size, mode); copyData(*src_buffer, *dst_buffer); dst_buffer->finalize(); diff --git a/src/Disks/DiskEncrypted.cpp b/src/Disks/DiskEncrypted.cpp index f1a9410c6af..6a96caed6d5 100644 --- a/src/Disks/DiskEncrypted.cpp +++ b/src/Disks/DiskEncrypted.cpp @@ -262,7 +262,7 @@ std::unique_ptr DiskEncrypted::writeFile(const String & if (old_file_size) { /// Append mode: we continue to use the same header. - auto read_buffer = delegate->readFile(wrapped_path, FileEncryption::Header::kSize); + auto read_buffer = delegate->readFile(wrapped_path, ReadSettings().adjustBufferSize(FileEncryption::Header::kSize)); header = readHeader(*read_buffer); key = getKey(path, header, *settings); } diff --git a/src/IO/ReadBufferFromFileDescriptor.cpp b/src/IO/ReadBufferFromFileDescriptor.cpp index 2685ea9ff78..7e539f94d39 100644 --- a/src/IO/ReadBufferFromFileDescriptor.cpp +++ b/src/IO/ReadBufferFromFileDescriptor.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include diff --git a/src/IO/ReadSettings.h b/src/IO/ReadSettings.h index a84a9b4c5cc..9ea12e20999 100644 --- a/src/IO/ReadSettings.h +++ b/src/IO/ReadSettings.h @@ -52,6 +52,14 @@ struct ReadSettings /// For 'pread_threadpool' method. Lower is more priority. size_t priority = 0; + + ReadSettings adjustBufferSize(size_t file_size) const + { + ReadSettings res = *this; + res.local_fs_buffer_size = std::min(file_size, local_fs_buffer_size); + res.remote_fs_buffer_size = std::min(file_size, remote_fs_buffer_size); + return res; + } }; } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index f59d50dbdeb..436f88763fd 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -118,6 +118,7 @@ namespace ErrorCodes extern const int TABLE_SIZE_EXCEEDS_MAX_DROP_SIZE_LIMIT; extern const int LOGICAL_ERROR; extern const int NOT_IMPLEMENTED; + extern const int UNKNOWN_READ_METHOD; } @@ -2691,4 +2692,31 @@ PartUUIDsPtr Context::getIgnoredPartUUIDs() const return ignored_part_uuids; } + +ReadSettings Context::getReadSettings() const +{ + ReadSettings res; + + if (settings.local_filesystem_read_method.value == "read") + res.local_fs_method = ReadMethod::read; + else if (settings.local_filesystem_read_method.value == "pread") + res.local_fs_method = ReadMethod::pread; + else if (settings.local_filesystem_read_method.value == "mmap") + res.local_fs_method = ReadMethod::mmap; + else if (settings.local_filesystem_read_method.value == "pread_threadpool") + res.local_fs_method = ReadMethod::pread_threadpool; + else + throw Exception(ErrorCodes::UNKNOWN_READ_METHOD, "Unknown read method '{}'", settings.local_filesystem_read_method.value); + + res.local_fs_prefetch = settings.local_filesystem_read_prefetch; + res.local_fs_buffer_size = settings.max_read_buffer_size; + res.direct_io_threshold = settings.min_bytes_to_use_direct_io; + res.mmap_threshold = settings.min_bytes_to_use_mmap_io; + res.priority = settings.read_priority; + + res.mmap_cache = getMMappedFileCache().get(); + + return res; +} + } diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 9527b87ed39..a2c30539234 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -825,6 +825,9 @@ public: ReadTaskCallback getReadTaskCallback() const; void setReadTaskCallback(ReadTaskCallback && callback); + /** Get settings for reading from filesystem. */ + ReadSettings getReadSettings() const; + private: std::unique_lock getLock() const; diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index dc3e863b841..ce717021c44 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -43,11 +43,9 @@ namespace ErrorCodes static MergeTreeReaderSettings getMergeTreeReaderSettings(const ContextPtr & context) { const auto & settings = context->getSettingsRef(); - return { - .min_bytes_to_use_direct_io = settings.min_bytes_to_use_direct_io, - .min_bytes_to_use_mmap_io = settings.min_bytes_to_use_mmap_io, - .mmap_cache = context->getMMappedFileCache(), - .max_read_buffer_size = settings.max_read_buffer_size, + return + { + .read_settings = context->getReadSettings(), .save_marks_in_cache = true, .checksum_on_read = settings.checksum_on_read, }; diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index de250de5f1a..6d598c8d93d 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -57,10 +57,7 @@ namespace ErrorCodes static std::unique_ptr openForReading(const DiskPtr & disk, const String & path) { size_t file_size = disk->getFileSize(path); - ReadSettings settings; - settings.local_fs_buffer_size = std::min(DBMS_DEFAULT_BUFFER_SIZE, file_size); - settings.remote_fs_buffer_size = settings.local_fs_buffer_size; - return disk->readFile(path, settings, file_size); + return disk->readFile(path, ReadSettings().adjustBufferSize(file_size), file_size); } void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const DiskPtr & disk_, const String & part_path) diff --git a/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp index 726acd48926..ad1c2abeee7 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp @@ -107,9 +107,7 @@ void MergeTreeDataPartCompact::loadIndexGranularity() size_t marks_file_size = volume->getDisk()->getFileSize(marks_file_path); - ReadSettings settings; - settings.local_fs_buffer_size = settings.remote_fs_buffer_size = marks_file_size; - auto buffer = volume->getDisk()->readFile(marks_file_path, settings, marks_file_size); + auto buffer = volume->getDisk()->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size); while (!buffer->eof()) { /// Skip offsets for columns diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp index f0fff25e929..d2484cf3536 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp @@ -129,9 +129,7 @@ void MergeTreeDataPartWide::loadIndexGranularity() } else { - ReadSettings settings; - settings.local_fs_buffer_size = settings.remote_fs_buffer_size = marks_file_size; - auto buffer = volume->getDisk()->readFile(marks_file_path, marks_file_size); + auto buffer = volume->getDisk()->readFile(marks_file_path, ReadSettings().adjustBufferSize(marks_file_size), marks_file_size); while (!buffer->eof()) { buffer->seek(sizeof(size_t) * 2, SEEK_CUR); /// skip offset_in_compressed file and offset_in_decompressed_block diff --git a/src/Storages/MergeTree/MergeTreeIOSettings.h b/src/Storages/MergeTree/MergeTreeIOSettings.h index dd241cfd591..8b67ab53b59 100644 --- a/src/Storages/MergeTree/MergeTreeIOSettings.h +++ b/src/Storages/MergeTree/MergeTreeIOSettings.h @@ -13,10 +13,8 @@ using MMappedFileCachePtr = std::shared_ptr; struct MergeTreeReaderSettings { - size_t min_bytes_to_use_direct_io = 0; - size_t min_bytes_to_use_mmap_io = 0; - MMappedFileCachePtr mmap_cache; - size_t max_read_buffer_size = DBMS_DEFAULT_BUFFER_SIZE; + /// Common read settings. + ReadSettings read_settings; /// If save_marks_in_cache is false, then, if marks are not in cache, /// we will load them but won't save in the cache, to avoid evicting other data. bool save_marks_in_cache = false; diff --git a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp index 5e42cb3eacb..446ae9b97a1 100644 --- a/src/Storages/MergeTree/MergeTreeMarksLoader.cpp +++ b/src/Storages/MergeTree/MergeTreeMarksLoader.cpp @@ -60,13 +60,10 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl() auto res = std::make_shared(marks_count * columns_in_mark); - ReadSettings settings; - settings.local_fs_buffer_size = settings.remote_fs_buffer_size = file_size; - if (!index_granularity_info.is_adaptive) { /// Read directly to marks. - auto buffer = disk->readFile(mrk_path, settings, file_size); + auto buffer = disk->readFile(mrk_path, ReadSettings().adjustBufferSize(file_size), file_size); buffer->readStrict(reinterpret_cast(res->data()), file_size); if (!buffer->eof()) @@ -75,7 +72,7 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl() } else { - auto buffer = disk->readFile(mrk_path, settings, file_size); + auto buffer = disk->readFile(mrk_path, ReadSettings().adjustBufferSize(file_size), file_size); size_t i = 0; while (!buffer->eof()) { diff --git a/src/Storages/MergeTree/MergeTreePartition.cpp b/src/Storages/MergeTree/MergeTreePartition.cpp index ad176864246..741606b994a 100644 --- a/src/Storages/MergeTree/MergeTreePartition.cpp +++ b/src/Storages/MergeTree/MergeTreePartition.cpp @@ -172,9 +172,7 @@ namespace static std::unique_ptr openForReading(const DiskPtr & disk, const String & path) { size_t file_size = disk->getFileSize(path); - ReadSettings settings; - settings.local_fs_buffer_size = settings.remote_fs_buffer_size = std::min(DBMS_DEFAULT_BUFFER_SIZE, file_size); - return disk->readFile(path, settings, file_size); + return disk->readFile(path, ReadSettings().adjustBufferSize(file_size), file_size); } String MergeTreePartition::getID(const MergeTreeData & storage) const diff --git a/src/Storages/StorageLog.cpp b/src/Storages/StorageLog.cpp index 0e156f24cc2..0dfebb6195a 100644 --- a/src/Storages/StorageLog.cpp +++ b/src/Storages/StorageLog.cpp @@ -63,14 +63,14 @@ public: LogSource( size_t block_size_, const NamesAndTypesList & columns_, StorageLog & storage_, - size_t mark_number_, size_t rows_limit_, size_t max_read_buffer_size_) + size_t mark_number_, size_t rows_limit_, ReadSettings read_settings_) : SourceWithProgress(getHeader(columns_)), block_size(block_size_), columns(columns_), storage(storage_), mark_number(mark_number_), rows_limit(rows_limit_), - max_read_buffer_size(max_read_buffer_size_) + read_settings(std::move(read_settings_)) { } @@ -86,14 +86,14 @@ private: size_t mark_number; /// from what mark to read data size_t rows_limit; /// The maximum number of rows that can be read size_t rows_read = 0; - size_t max_read_buffer_size; + ReadSettings read_settings; std::unordered_map serializations; struct Stream { - Stream(const DiskPtr & disk, const String & data_path, size_t offset, size_t max_read_buffer_size_) - : plain(disk->readFile(data_path, std::min(max_read_buffer_size_, disk->getFileSize(data_path)))) + Stream(const DiskPtr & disk, const String & data_path, size_t offset, ReadSettings read_settings_) + : plain(disk->readFile(data_path, read_settings_.adjustBufferSize(disk->getFileSize(data_path)))) , compressed(*plain) { if (offset) @@ -188,7 +188,7 @@ void LogSource::readData(const NameAndTypePair & name_and_type, ColumnPtr & colu offset = file_it->second.marks[mark_number].offset; auto & data_file_path = file_it->second.data_file_path; - auto it = streams.try_emplace(stream_name, storage.disk, data_file_path, offset, max_read_buffer_size).first; + auto it = streams.try_emplace(stream_name, storage.disk, data_file_path, offset, read_settings).first; return &it->second.compressed; }; @@ -563,7 +563,7 @@ void StorageLog::loadMarks(std::chrono::seconds lock_timeout) for (auto & file : files_by_index) file->second.marks.reserve(marks_count); - std::unique_ptr marks_rb = disk->readFile(marks_file_path, 32768); + std::unique_ptr marks_rb = disk->readFile(marks_file_path, ReadSettings().adjustBufferSize(32768)); while (!marks_rb->eof()) { for (auto & file : files_by_index) @@ -678,7 +678,7 @@ Pipe StorageLog::read( if (num_streams > marks_size) num_streams = marks_size; - size_t max_read_buffer_size = context->getSettingsRef().max_read_buffer_size; + ReadSettings read_settings = context->getReadSettings(); for (size_t stream = 0; stream < num_streams; ++stream) { @@ -694,7 +694,7 @@ Pipe StorageLog::read( *this, mark_begin, rows_end - rows_begin, - max_read_buffer_size)); + read_settings)); } /// No need to hold lock while reading because we read fixed range of data that does not change while appending more data. diff --git a/src/Storages/StorageStripeLog.cpp b/src/Storages/StorageStripeLog.cpp index 6bf91a145ed..99e6ba2fa1f 100644 --- a/src/Storages/StorageStripeLog.cpp +++ b/src/Storages/StorageStripeLog.cpp @@ -78,7 +78,7 @@ public: StorageStripeLog & storage_, const StorageMetadataPtr & metadata_snapshot_, const Names & column_names, - size_t max_read_buffer_size_, + ReadSettings read_settings_, std::shared_ptr & index_, IndexForNativeFormat::Blocks::const_iterator index_begin_, IndexForNativeFormat::Blocks::const_iterator index_end_) @@ -86,7 +86,7 @@ public: getHeader(storage_, metadata_snapshot_, column_names, index_begin_, index_end_)) , storage(storage_) , metadata_snapshot(metadata_snapshot_) - , max_read_buffer_size(max_read_buffer_size_) + , read_settings(std::move(read_settings_)) , index(index_) , index_begin(index_begin_) , index_end(index_end_) @@ -123,7 +123,7 @@ protected: private: StorageStripeLog & storage; StorageMetadataPtr metadata_snapshot; - size_t max_read_buffer_size; + ReadSettings read_settings; std::shared_ptr index; IndexForNativeFormat::Blocks::const_iterator index_begin; @@ -145,9 +145,7 @@ private: started = true; String data_file_path = storage.table_path + "data.bin"; - size_t buffer_size = std::min(max_read_buffer_size, storage.disk->getFileSize(data_file_path)); - - data_in.emplace(storage.disk->readFile(data_file_path, buffer_size)); + data_in.emplace(storage.disk->readFile(data_file_path, read_settings.adjustBufferSize(storage.disk->getFileSize(data_file_path)))); block_in.emplace(*data_in, 0, index_begin, index_end); } } @@ -345,7 +343,9 @@ Pipe StorageStripeLog::read( return Pipe(std::make_shared(metadata_snapshot->getSampleBlockForColumns(column_names, getVirtuals(), getStorageID()))); } - CompressedReadBufferFromFile index_in(disk->readFile(index_file, 4096)); + ReadSettings read_settings = context->getReadSettings(); + + CompressedReadBufferFromFile index_in(disk->readFile(index_file, read_settings.adjustBufferSize(4096))); std::shared_ptr index{std::make_shared(index_in, column_names_set)}; size_t size = index->blocks.size(); @@ -361,7 +361,7 @@ Pipe StorageStripeLog::read( std::advance(end, (stream + 1) * size / num_streams); pipes.emplace_back(std::make_shared( - *this, metadata_snapshot, column_names, context->getSettingsRef().max_read_buffer_size, index, begin, end)); + *this, metadata_snapshot, column_names, read_settings, index, begin, end)); } /// We do not keep read lock directly at the time of reading, because we read ranges of data that do not change. diff --git a/src/Storages/StorageTinyLog.cpp b/src/Storages/StorageTinyLog.cpp index 53ae74c4e00..7ab891d5936 100644 --- a/src/Storages/StorageTinyLog.cpp +++ b/src/Storages/StorageTinyLog.cpp @@ -70,11 +70,11 @@ public: size_t block_size_, const NamesAndTypesList & columns_, StorageTinyLog & storage_, - size_t max_read_buffer_size_, + ReadSettings read_settings_, FileChecker::Map file_sizes_) : SourceWithProgress(getHeader(columns_)) , block_size(block_size_), columns(columns_), storage(storage_) - , max_read_buffer_size(max_read_buffer_size_), file_sizes(std::move(file_sizes_)) + , read_settings(std::move(read_settings_)), file_sizes(std::move(file_sizes_)) { } @@ -88,13 +88,15 @@ private: NamesAndTypesList columns; StorageTinyLog & storage; bool is_finished = false; - size_t max_read_buffer_size; + ReadSettings read_settings; FileChecker::Map file_sizes; struct Stream { - Stream(const DiskPtr & disk, const String & data_path, size_t max_read_buffer_size_, size_t file_size) - : plain(file_size ? disk->readFile(data_path, std::min(max_read_buffer_size_, file_size)) : std::make_unique(nullptr, 0)), + Stream(const DiskPtr & disk, const String & data_path, ReadSettings read_settings_, size_t file_size) + : plain(file_size + ? disk->readFile(data_path, read_settings_.adjustBufferSize(file_size)) + : std::make_unique(nullptr, 0)), limited(std::make_unique(*plain, file_size, false)), compressed(*limited) { @@ -178,7 +180,7 @@ void TinyLogSource::readData(const NameAndTypePair & name_and_type, { String file_path = storage.files[stream_name].data_file_path; stream = std::make_unique( - storage.disk, file_path, max_read_buffer_size, file_sizes[fileName(file_path)]); + storage.disk, file_path, read_settings, file_sizes[fileName(file_path)]); } return &stream->compressed; @@ -493,8 +495,6 @@ Pipe StorageTinyLog::read( // When reading, we lock the entire storage, because we only have one file // per column and can't modify it concurrently. - const Settings & settings = context->getSettingsRef(); - std::shared_lock lock{rwlock, getLockTimeout(context)}; if (!lock) throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED); @@ -504,7 +504,7 @@ Pipe StorageTinyLog::read( max_block_size, Nested::convertToSubcolumns(all_columns), *this, - settings.max_read_buffer_size, + context->getReadSettings(), file_checker.getFileSizes())); }