From c0ae93107ff10dfbc565773729fe99ee34d09ac9 Mon Sep 17 00:00:00 2001 From: Pavel Kruglov Date: Wed, 11 Aug 2021 19:04:04 +0300 Subject: [PATCH] Fix bad conflict resolving --- src/IO/ReadBufferAIO.cpp | 312 --------------------------------------- src/IO/ReadBufferAIO.h | 111 -------------- 2 files changed, 423 deletions(-) delete mode 100644 src/IO/ReadBufferAIO.cpp delete mode 100644 src/IO/ReadBufferAIO.h diff --git a/src/IO/ReadBufferAIO.cpp b/src/IO/ReadBufferAIO.cpp deleted file mode 100644 index c064e0d4ed9..00000000000 --- a/src/IO/ReadBufferAIO.cpp +++ /dev/null @@ -1,312 +0,0 @@ -#if defined(OS_LINUX) || defined(__FreeBSD__) - -#include -#include -#include -#include -#include -#include - -#include -#include -#include - -#include - - -namespace ProfileEvents -{ - extern const Event FileOpen; - extern const Event ReadBufferAIORead; - extern const Event ReadBufferAIOReadBytes; -} - -namespace CurrentMetrics -{ - extern const Metric Read; -} - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int FILE_DOESNT_EXIST; - extern const int CANNOT_OPEN_FILE; - extern const int LOGICAL_ERROR; - extern const int ARGUMENT_OUT_OF_BOUND; - extern const int AIO_READ_ERROR; -} - - -/// Note: an additional page is allocated that will contain the data that -/// does not fit into the main buffer. -ReadBufferAIO::ReadBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, char * existing_memory_) - : ReadBufferFromFileBase(buffer_size_ + DEFAULT_AIO_FILE_BLOCK_SIZE, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE), - fill_buffer(BufferWithOwnMemory(internalBuffer().size(), nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)), - filename(filename_) -{ - ProfileEvents::increment(ProfileEvents::FileOpen); - - int open_flags = (flags_ == -1) ? O_RDONLY : flags_; - open_flags |= O_DIRECT; - open_flags |= O_CLOEXEC; - - fd = ::open(filename.c_str(), open_flags); - if (fd == -1) - { - auto error_code = (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE; - throwFromErrnoWithPath("Cannot open file " + filename, filename, error_code); - } -} - -ReadBufferAIO::~ReadBufferAIO() -{ - if (!aio_failed) - { - try - { - (void) waitForAIOCompletion(); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - } - - if (fd != -1) - ::close(fd); -} - -void ReadBufferAIO::setMaxBytes(size_t max_bytes_read_) -{ - if (is_started) - throw Exception("Illegal attempt to set the maximum number of bytes to read from file " + filename, ErrorCodes::LOGICAL_ERROR); - max_bytes_read = max_bytes_read_; -} - -bool ReadBufferAIO::nextImpl() -{ - /// If the end of the file has already been reached by calling this function, - /// then the current call is wrong. - if (is_eof) - return false; - - std::optional watch; - if (profile_callback) - watch.emplace(clock_type); - - if (!is_pending_read) - synchronousRead(); - else - receive(); - - if (profile_callback) - { - ProfileInfo info; - info.bytes_requested = requested_byte_count; - info.bytes_read = bytes_read; - info.nanoseconds = watch->elapsed(); //-V1007 - profile_callback(info); - } - - is_started = true; - - /// If the end of the file is just reached, do nothing else. - if (is_eof) - return bytes_read != 0; - - /// Create an asynchronous request. - prepare(); - -#if defined(__FreeBSD__) - request.aio.aio_lio_opcode = LIO_READ; - request.aio.aio_fildes = fd; - request.aio.aio_buf = reinterpret_cast(buffer_begin); - request.aio.aio_nbytes = region_aligned_size; - request.aio.aio_offset = region_aligned_begin; -#else - request.aio_lio_opcode = IOCB_CMD_PREAD; - request.aio_fildes = fd; - request.aio_buf = reinterpret_cast(buffer_begin); - request.aio_nbytes = region_aligned_size; - request.aio_offset = region_aligned_begin; -#endif - - /// Send the request. - try - { - future_bytes_read = AIOContextPool::instance().post(request); - } - catch (...) - { - aio_failed = true; - throw; - } - - is_pending_read = true; - return true; -} - -off_t ReadBufferAIO::seek(off_t off, int whence) -{ - off_t new_pos_in_file; - - if (whence == SEEK_SET) - { - if (off < 0) - throw Exception("SEEK_SET underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND); - new_pos_in_file = off; - } - else if (whence == SEEK_CUR) - { - if (off >= 0) - { - if (off > (std::numeric_limits::max() - getPosition())) - throw Exception("SEEK_CUR overflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND); - } - else if (off < -getPosition()) - throw Exception("SEEK_CUR underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND); - new_pos_in_file = getPosition() + off; - } - else - throw Exception("ReadBufferAIO::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND); - - if (new_pos_in_file != getPosition()) - { - off_t first_read_pos_in_file = first_unread_pos_in_file - static_cast(working_buffer.size()); - if (hasPendingData() && (new_pos_in_file >= first_read_pos_in_file) && (new_pos_in_file <= first_unread_pos_in_file)) - { - /// Moved, but remained within the buffer. - pos = working_buffer.begin() + (new_pos_in_file - first_read_pos_in_file); - } - else - { - /// Moved past the buffer. - pos = working_buffer.end(); - first_unread_pos_in_file = new_pos_in_file; - - /// If we go back, than it's not eof - is_eof = false; - - /// We can not use the result of the current asynchronous request. - skip(); - } - } - - return new_pos_in_file; -} - -void ReadBufferAIO::synchronousRead() -{ - CurrentMetrics::Increment metric_increment_read{CurrentMetrics::Read}; - - prepare(); - bytes_read = ::pread(fd, buffer_begin, region_aligned_size, region_aligned_begin); - - ProfileEvents::increment(ProfileEvents::ReadBufferAIORead); - ProfileEvents::increment(ProfileEvents::ReadBufferAIOReadBytes, bytes_read); - - finalize(); -} - -void ReadBufferAIO::receive() -{ - if (!waitForAIOCompletion()) - { - throw Exception("Trying to receive data from AIO, but nothing was queued. It's a bug", ErrorCodes::LOGICAL_ERROR); - } - finalize(); -} - -void ReadBufferAIO::skip() -{ - if (!waitForAIOCompletion()) - return; - - /// @todo I presume this assignment is redundant since waitForAIOCompletion() performs a similar one -// bytes_read = future_bytes_read.get(); - if ((bytes_read < 0) || (static_cast(bytes_read) < region_left_padding)) - throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR); -} - -bool ReadBufferAIO::waitForAIOCompletion() -{ - if (is_eof || !is_pending_read) - return false; - - CurrentMetrics::Increment metric_increment_read{CurrentMetrics::Read}; - - bytes_read = future_bytes_read.get(); - is_pending_read = false; - - ProfileEvents::increment(ProfileEvents::ReadBufferAIORead); - ProfileEvents::increment(ProfileEvents::ReadBufferAIOReadBytes, bytes_read); - - return true; -} - -void ReadBufferAIO::prepare() -{ - requested_byte_count = std::min(fill_buffer.internalBuffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE, max_bytes_read); - - /// Region of the disk from which we want to read data. - const off_t region_begin = first_unread_pos_in_file; - - if ((requested_byte_count > static_cast(std::numeric_limits::max())) || - (first_unread_pos_in_file > (std::numeric_limits::max() - static_cast(requested_byte_count)))) - throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR); - - const off_t region_end = first_unread_pos_in_file + requested_byte_count; - - /// The aligned region of the disk from which we will read the data. - region_left_padding = region_begin % DEFAULT_AIO_FILE_BLOCK_SIZE; - const size_t region_right_padding = (DEFAULT_AIO_FILE_BLOCK_SIZE - (region_end % DEFAULT_AIO_FILE_BLOCK_SIZE)) % DEFAULT_AIO_FILE_BLOCK_SIZE; - - region_aligned_begin = region_begin - region_left_padding; - - if (region_end > (std::numeric_limits::max() - static_cast(region_right_padding))) - throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR); - - const off_t region_aligned_end = region_end + region_right_padding; - region_aligned_size = region_aligned_end - region_aligned_begin; - - buffer_begin = fill_buffer.internalBuffer().begin(); - - /// Unpoison because msan doesn't instrument linux AIO - __msan_unpoison(buffer_begin, fill_buffer.internalBuffer().size()); -} - -void ReadBufferAIO::finalize() -{ - if ((bytes_read < 0) || (static_cast(bytes_read) < region_left_padding)) - throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR); - - /// Ignore redundant bytes on the left. - bytes_read -= region_left_padding; - - /// Ignore redundant bytes on the right. - bytes_read = std::min(static_cast(bytes_read), static_cast(requested_byte_count)); - - if (bytes_read > 0) - fill_buffer.buffer().resize(region_left_padding + bytes_read); - if (static_cast(bytes_read) < requested_byte_count) - is_eof = true; - - if (first_unread_pos_in_file > (std::numeric_limits::max() - bytes_read)) - throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR); - - first_unread_pos_in_file += bytes_read; - total_bytes_read += bytes_read; - nextimpl_working_buffer_offset = region_left_padding; - - if (total_bytes_read == max_bytes_read) - is_eof = true; - - /// Swap the main and duplicate buffers. - swap(fill_buffer); -} - -} - -#endif diff --git a/src/IO/ReadBufferAIO.h b/src/IO/ReadBufferAIO.h deleted file mode 100644 index d476865747d..00000000000 --- a/src/IO/ReadBufferAIO.h +++ /dev/null @@ -1,111 +0,0 @@ -#pragma once - -#if defined(OS_LINUX) || defined(__FreeBSD__) - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - - -namespace CurrentMetrics -{ - extern const Metric OpenFileForRead; -} - -namespace DB -{ - -/** Class for asynchronous data reading. - */ -class ReadBufferAIO final : public ReadBufferFromFileBase -{ -public: - ReadBufferAIO(const std::string & filename_, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, int flags_ = -1, - char * existing_memory_ = nullptr); - ~ReadBufferAIO() override; - - ReadBufferAIO(const ReadBufferAIO &) = delete; - ReadBufferAIO & operator=(const ReadBufferAIO &) = delete; - - void setMaxBytes(size_t max_bytes_read_); - off_t getPosition() override { return first_unread_pos_in_file - (working_buffer.end() - pos); } - std::string getFileName() const override { return filename; } - int getFD() const { return fd; } - - off_t seek(off_t off, int whence) override; - -private: - /// - bool nextImpl() override; - /// Synchronously read the data. - void synchronousRead(); - /// Get data from an asynchronous request. - void receive(); - /// Ignore data from an asynchronous request. - void skip(); - /// Wait for the end of the current asynchronous task. - bool waitForAIOCompletion(); - /// Prepare the request. - void prepare(); - /// Prepare for reading a duplicate buffer containing data from - /// of the last request. - void finalize(); - -private: - /// Buffer for asynchronous data read operations. - BufferWithOwnMemory fill_buffer; - - /// Description of the asynchronous read request. - iocb request{}; - std::future future_bytes_read; - - const std::string filename; - - /// The maximum number of bytes that can be read. - size_t max_bytes_read = std::numeric_limits::max(); - /// Number of bytes requested. - size_t requested_byte_count = 0; - /// The number of bytes read at the last request. - ssize_t bytes_read = 0; - /// The total number of bytes read. - size_t total_bytes_read = 0; - - /// The position of the first unread byte in the file. - off_t first_unread_pos_in_file = 0; - - /// The starting position of the aligned region of the disk from which the data is read. - off_t region_aligned_begin = 0; - /// Left offset to align the region of the disk. - size_t region_left_padding = 0; - /// The size of the aligned region of the disk. - size_t region_aligned_size = 0; - - /// The file descriptor for read. - int fd = -1; - - /// The buffer to which the received data is written. - Position buffer_begin = nullptr; - - /// The asynchronous read operation is not yet completed. - bool is_pending_read = false; - /// The end of the file is reached. - bool is_eof = false; - /// At least one read request was sent. - bool is_started = false; - /// Did the asynchronous operation fail? - bool aio_failed = false; - - CurrentMetrics::Increment metric_increment{CurrentMetrics::OpenFileForRead}; -}; - -} - -#endif