From 1d6e77a29a1e36e755f15d5c720fd1690f7d63a2 Mon Sep 17 00:00:00 2001 From: pufit Date: Sun, 11 Dec 2022 16:15:41 -0500 Subject: [PATCH] Move reader selection logic back to `StorageFile`. --- src/Disks/IO/createReadBufferFromFileBase.cpp | 91 ++++--------------- src/Disks/IO/createReadBufferFromFileBase.h | 18 ---- src/Storages/StorageFile.cpp | 84 ++++++++++++++--- 3 files changed, 86 insertions(+), 107 deletions(-) diff --git a/src/Disks/IO/createReadBufferFromFileBase.cpp b/src/Disks/IO/createReadBufferFromFileBase.cpp index 911c677300f..fca05787959 100644 --- a/src/Disks/IO/createReadBufferFromFileBase.cpp +++ b/src/Disks/IO/createReadBufferFromFileBase.cpp @@ -2,7 +2,6 @@ #include #include #include -#include #include #include #include @@ -24,37 +23,22 @@ namespace DB namespace ErrorCodes { extern const int LOGICAL_ERROR; - extern const int CANNOT_STAT; } -std::unique_ptr createReadBufferFromFileOrFileDescriptorBase( +std::unique_ptr createReadBufferFromFileBase( const std::string & filename, const ReadSettings & settings, std::optional read_hint, std::optional file_size, int flags, char * existing_memory, - size_t alignment, - bool read_from_fd, - int fd) + size_t alignment) { if (file_size.has_value() && !*file_size) return std::make_unique(); - struct stat file_stat{}; - if (read_from_fd) - { - if (0 != fstat(fd, &file_stat)) - throwFromErrno("Cannot stat file descriptor", ErrorCodes::CANNOT_STAT); - } - else - { - if (0 != stat(filename.c_str(), &file_stat)) - throwFromErrno("Cannot stat file " + filename, ErrorCodes::CANNOT_STAT); - } - - size_t estimated_size = file_stat.st_size; + size_t estimated_size = 0; if (read_hint.has_value()) estimated_size = *read_hint; else if (file_size.has_value()) @@ -63,24 +47,23 @@ std::unique_ptr createReadBufferFromFileOrFileDescriptor if (!existing_memory && settings.local_fs_method == LocalFSReadMethod::mmap && settings.mmap_threshold - && estimated_size >= settings.mmap_threshold - && S_ISREG(file_stat.st_mode)) + && settings.mmap_cache + && estimated_size >= settings.mmap_threshold) { try { - std::unique_ptr res; - - if (settings.mmap_cache) - res = std::make_unique(*settings.mmap_cache, filename, 0, estimated_size); + std::unique_ptr res; + if (file_size) + res = std::make_unique(*settings.mmap_cache, filename, 0, *file_size); else - res = std::make_unique(filename, 0, estimated_size); + res = std::make_unique(*settings.mmap_cache, filename, 0, *file_size); ProfileEvents::increment(ProfileEvents::CreatedReadBufferMMap); return res; } catch (const ErrnoException &) { - /// Fallback if mmap is not supported. + /// Fallback if mmap is not supported (example: pipe). ProfileEvents::increment(ProfileEvents::CreatedReadBufferMMapFailed); } } @@ -89,21 +72,13 @@ std::unique_ptr createReadBufferFromFileOrFileDescriptor { std::unique_ptr res; - /// Pread works only with regular files, so we explicitly fallback to read in other cases. - if (settings.local_fs_method == LocalFSReadMethod::read || !S_ISREG(file_stat.st_mode)) + if (settings.local_fs_method == LocalFSReadMethod::read) { - if (read_from_fd) - res = std::make_unique(fd, buffer_size, existing_memory, alignment, file_size); - else - res = std::make_unique(filename, buffer_size, actual_flags, existing_memory, alignment, file_size); + res = std::make_unique(filename, buffer_size, actual_flags, existing_memory, alignment, file_size); } else if (settings.local_fs_method == LocalFSReadMethod::pread || settings.local_fs_method == LocalFSReadMethod::mmap) { - if (read_from_fd) - res = std::make_unique(fd, buffer_size, existing_memory, alignment, file_size); - else - res = std::make_unique( - filename, buffer_size, actual_flags, existing_memory, alignment, file_size); + res = std::make_unique(filename, buffer_size, actual_flags, existing_memory, alignment, file_size); } else if (settings.local_fs_method == LocalFSReadMethod::pread_fake_async) { @@ -112,13 +87,8 @@ std::unique_ptr createReadBufferFromFileOrFileDescriptor throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized"); auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER); - - if (read_from_fd) - res = std::make_unique( - reader, settings.priority, fd, buffer_size, existing_memory, alignment, file_size); - else - res = std::make_unique( - reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment, file_size); + res = std::make_unique( + reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment, file_size); } else if (settings.local_fs_method == LocalFSReadMethod::pread_threadpool) { @@ -127,13 +97,8 @@ std::unique_ptr createReadBufferFromFileOrFileDescriptor throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized"); auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER); - - if (read_from_fd) - res = std::make_unique( - reader, settings.priority, fd, buffer_size, existing_memory, alignment, file_size); - else - res = std::make_unique( - reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment, file_size); + res = std::make_unique( + reader, settings.priority, filename, buffer_size, actual_flags, existing_memory, alignment, file_size); } else throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown read method"); @@ -209,26 +174,4 @@ std::unique_ptr createReadBufferFromFileOrFileDescriptor return create(buffer_size, flags); } -std::unique_ptr createReadBufferFromFileBase( - const std::string & filename, - const ReadSettings & settings, - std::optional read_hint, - std::optional file_size, - int flags_, - char * existing_memory, - size_t alignment) -{ - return createReadBufferFromFileOrFileDescriptorBase(filename, settings, read_hint, file_size, flags_, existing_memory, alignment); -} - -std::unique_ptr createReadBufferFromFileDescriptorBase( - int fd, - const ReadSettings & settings, - std::optional read_hint, - std::optional file_size, - char * existing_memory , - size_t alignment) -{ - return createReadBufferFromFileOrFileDescriptorBase({}, settings, read_hint, file_size, -1, existing_memory, alignment, true, fd); -} } diff --git a/src/Disks/IO/createReadBufferFromFileBase.h b/src/Disks/IO/createReadBufferFromFileBase.h index 542ea423462..c2e2040587b 100644 --- a/src/Disks/IO/createReadBufferFromFileBase.h +++ b/src/Disks/IO/createReadBufferFromFileBase.h @@ -14,17 +14,6 @@ namespace DB * @param read_hint - the number of bytes to read hint * @param file_size - size of file */ -std::unique_ptr createReadBufferFromFileOrFileDescriptorBase( - const std::string & filename, - const ReadSettings & settings, - std::optional read_hint = {}, - std::optional file_size = {}, - int flags_ = -1, - char * existing_memory = nullptr, - size_t alignment = 0, - bool read_from_fd = false, - int fd = 0); - std::unique_ptr createReadBufferFromFileBase( const std::string & filename, const ReadSettings & settings, @@ -34,11 +23,4 @@ std::unique_ptr createReadBufferFromFileBase( char * existing_memory = nullptr, size_t alignment = 0); -std::unique_ptr createReadBufferFromFileDescriptorBase( - int fd, - const ReadSettings & settings, - std::optional read_hint = {}, - std::optional file_size = {}, - char * existing_memory = nullptr, - size_t alignment = 0); } diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index 6154cdb73ca..1c61370a392 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -38,8 +38,7 @@ #include #include #include - -#include +#include #include #include @@ -51,6 +50,13 @@ #include +namespace ProfileEvents +{ + extern const Event CreatedReadBufferOrdinary; + extern const Event CreatedReadBufferMMap; + extern const Event CreatedReadBufferMMapFailed; +} + namespace fs = std::filesystem; namespace DB @@ -69,6 +75,7 @@ namespace ErrorCodes extern const int FILE_DOESNT_EXIST; extern const int TIMEOUT_EXCEEDED; extern const int INCOMPATIBLE_COLUMNS; + extern const int CANNOT_STAT; extern const int LOGICAL_ERROR; extern const int CANNOT_APPEND_TO_FILE; extern const int CANNOT_EXTRACT_TABLE_STRUCTURE; @@ -182,6 +189,7 @@ void checkCreationIsAllowed( std::unique_ptr createReadBuffer( const String & current_path, bool use_table_fd, + const String & storage_name, int table_fd, const String & compression_method, ContextPtr context) @@ -189,27 +197,73 @@ std::unique_ptr createReadBuffer( std::unique_ptr nested_buffer; CompressionMethod method; - auto read_method = context->getSettingsRef().storage_file_read_method.value; - auto read_settings = context->getReadSettings(); - read_settings.mmap_threshold = 1; - read_settings.mmap_cache = nullptr; /// Turn off mmap cache for Storage File - - if (auto opt_method = magic_enum::enum_cast(read_method)) - read_settings.local_fs_method = *opt_method; + auto read_method_string = context->getSettingsRef().storage_file_read_method.value; + LocalFSReadMethod read_method; + if (auto opt_method = magic_enum::enum_cast(read_method_string)) + read_method = *opt_method; else - throwFromErrno("Unknown read method " + read_method, ErrorCodes::UNKNOWN_READ_METHOD); + throwFromErrno("Unknown read method " + read_method_string, ErrorCodes::UNKNOWN_READ_METHOD); + + struct stat file_stat{}; if (use_table_fd) { - nested_buffer = createReadBufferFromFileDescriptorBase(table_fd, read_settings); + /// Check if file descriptor allows random reads (and reading it twice). + if (0 != fstat(table_fd, &file_stat)) + throwFromErrno("Cannot stat table file descriptor, inside " + storage_name, ErrorCodes::CANNOT_STAT); + method = chooseCompressionMethod("", compression_method); } else { - nested_buffer = createReadBufferFromFileBase(current_path, read_settings); + /// Check if file descriptor allows random reads (and reading it twice). + if (0 != stat(current_path.c_str(), &file_stat)) + throwFromErrno("Cannot stat file " + current_path, ErrorCodes::CANNOT_STAT); + method = chooseCompressionMethod(current_path, compression_method); } + + bool mmap_failed = false; + if (S_ISREG(file_stat.st_mode) && read_method == LocalFSReadMethod::mmap) + { + try + { + if (use_table_fd) + nested_buffer = std::make_unique(table_fd, 0); + else + nested_buffer = std::make_unique(current_path, 0); + + ProfileEvents::increment(ProfileEvents::CreatedReadBufferMMap); + } + catch (const ErrnoException &) + { + /// Fallback if mmap is not supported. + ProfileEvents::increment(ProfileEvents::CreatedReadBufferMMapFailed); + mmap_failed = true; + } + } + + if (S_ISREG(file_stat.st_mode) && (read_method == LocalFSReadMethod::pread || mmap_failed)) + { + if (use_table_fd) + nested_buffer = std::make_unique(table_fd); + else + nested_buffer = std::make_unique(current_path, context->getSettingsRef().max_read_buffer_size); + + ProfileEvents::increment(ProfileEvents::CreatedReadBufferOrdinary); + } + else + { + if (use_table_fd) + nested_buffer = std::make_unique(table_fd); + else + nested_buffer = std::make_unique(current_path, context->getSettingsRef().max_read_buffer_size); + + ProfileEvents::increment(ProfileEvents::CreatedReadBufferOrdinary); + } + + /// For clickhouse-local and clickhouse-client add progress callback to display progress bar. if (context->getApplicationType() == Context::ApplicationType::LOCAL || context->getApplicationType() == Context::ApplicationType::CLIENT) @@ -277,7 +331,7 @@ ColumnsDescription StorageFile::getTableStructureFromFileDescriptor(ContextPtr c { /// We will use PeekableReadBuffer to create a checkpoint, so we need a place /// where we can store the original read buffer. - read_buffer_from_fd = createReadBuffer("", true, table_fd, compression_method, context); + read_buffer_from_fd = createReadBuffer("", true, getName(), table_fd, compression_method, context); auto read_buf = std::make_unique(*read_buffer_from_fd); read_buf->setCheckpoint(); return read_buf; @@ -326,7 +380,7 @@ ColumnsDescription StorageFile::getTableStructureFromFile( if (it == paths.end()) return nullptr; - return createReadBuffer(*it++, false, -1, compression_method, context); + return createReadBuffer(*it++, false, "File", -1, compression_method, context); }; ColumnsDescription columns; @@ -543,7 +597,7 @@ public: } if (!read_buf) - read_buf = createReadBuffer(current_path, storage->use_table_fd, storage->table_fd, storage->compression_method, context); + read_buf = createReadBuffer(current_path, storage->use_table_fd, storage->getName(), storage->table_fd, storage->compression_method, context); auto format = context->getInputFormat(storage->format_name, *read_buf, block_for_format, max_block_size, storage->format_settings);