#include #include #include #include #include #include namespace DB { /// Выделяем дополнительную страницу. Содежрит те данные, которые /// не влезают в основной буфер. ReadBufferAIO::ReadBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, char * existing_memory_) : ReadBufferFromFileBase(buffer_size_ + DEFAULT_AIO_FILE_BLOCK_SIZE, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE), fill_buffer(BufferWithOwnMemory(buffer_size_ + DEFAULT_AIO_FILE_BLOCK_SIZE, nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)), filename(filename_) { ProfileEvents::increment(ProfileEvents::FileOpen); int open_flags = (flags_ == -1) ? O_RDONLY : flags_; open_flags |= O_DIRECT; fd = ::open(filename.c_str(), open_flags); if (fd == -1) { auto error_code = (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE; throwFromErrno("Cannot open file " + filename, error_code); } ::memset(&request, 0, sizeof(request)); } ReadBufferAIO::~ReadBufferAIO() { if (!aio_failed) { try { (void) waitForAIOCompletion(); } catch (...) { tryLogCurrentException(__PRETTY_FUNCTION__); } } if (fd != -1) ::close(fd); } void ReadBufferAIO::setMaxBytes(size_t max_bytes_read_) { if (is_started) throw Exception("Illegal attempt to set the maximum number of bytes to read from file " + filename, ErrorCodes::LOGICAL_ERROR); max_bytes_read = max_bytes_read_; } bool ReadBufferAIO::nextImpl() { /// Если конец файла уже был достигнут при вызове этой функции, /// то текущий вызов ошибочен. if (is_eof) return false; if (!is_started) { synchronousRead(); is_started = true; } else receive(); /// Если конец файла только что достигнут, больше ничего не делаем. if (is_eof) return true; /// Создать асинхронный запрос. prepare(); request.aio_lio_opcode = IOCB_CMD_PREAD; request.aio_fildes = fd; request.aio_buf = reinterpret_cast(buffer_begin); request.aio_nbytes = region_aligned_size; request.aio_offset = region_aligned_begin; /// Отправить запрос. while (io_submit(aio_context.ctx, request_ptrs.size(), &request_ptrs[0]) < 0) { if (errno != EINTR) { aio_failed = true; throw Exception("Cannot submit request for asynchronous IO on file " + filename, ErrorCodes::AIO_SUBMIT_ERROR); } } is_pending_read = true; return true; } off_t ReadBufferAIO::doSeek(off_t off, int whence) { off_t new_pos_in_file; if (whence == SEEK_SET) { if (off < 0) throw Exception("SEEK_SET underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND); new_pos_in_file = off; } else if (whence == SEEK_CUR) { if (off >= 0) { if (off > (std::numeric_limits::max() - getPositionInFile())) throw Exception("SEEK_CUR overflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND); } else if (off < -getPositionInFile()) throw Exception("SEEK_CUR underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND); new_pos_in_file = getPositionInFile() + off; } else throw Exception("ReadBufferAIO::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND); if (new_pos_in_file != getPositionInFile()) { off_t working_buffer_begin_pos = pos_in_file - static_cast(working_buffer.size()); if (hasPendingData() && (new_pos_in_file >= working_buffer_begin_pos) && (new_pos_in_file <= pos_in_file)) { /// Свдинулись, но остались в пределах буфера. pos = working_buffer.begin() + (new_pos_in_file - working_buffer_begin_pos); } else { /// Сдвинулись за пределы буфера. pos = working_buffer.end(); pos_in_file = new_pos_in_file; /// Не можем использовать результат текущего асинхронного запроса. skip(); } } return new_pos_in_file; } void ReadBufferAIO::synchronousRead() { prepare(); bytes_read = ::pread(fd, buffer_begin, region_aligned_size, region_aligned_begin); publish(); } void ReadBufferAIO::receive() { if (!waitForAIOCompletion()) return; publish(); } void ReadBufferAIO::skip() { if (!waitForAIOCompletion()) return; is_started = false; bytes_read = events[0].res; size_t region_left_padding = pos_in_file % DEFAULT_AIO_FILE_BLOCK_SIZE; if ((bytes_read < 0) || (static_cast(bytes_read) < region_left_padding)) throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR); } bool ReadBufferAIO::waitForAIOCompletion() { if (is_eof || !is_pending_read) return false; while (io_getevents(aio_context.ctx, events.size(), events.size(), &events[0], nullptr) < 0) { if (errno != EINTR) { aio_failed = true; throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::AIO_COMPLETION_ERROR); } } is_pending_read = false; bytes_read = events[0].res; return true; } void ReadBufferAIO::prepare() { requested_byte_count = std::min(fill_buffer.internalBuffer().size() - DEFAULT_AIO_FILE_BLOCK_SIZE, max_bytes_read); /// Регион диска, из которого хотим читать данные. const off_t region_begin = pos_in_file; if ((requested_byte_count > std::numeric_limits::max()) || (pos_in_file > (std::numeric_limits::max() - static_cast(requested_byte_count)))) throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR); const off_t region_end = pos_in_file + requested_byte_count; /// Выровненный регион диска, из которого будем читать данные. const size_t region_left_padding = region_begin % DEFAULT_AIO_FILE_BLOCK_SIZE; const size_t region_right_padding = (DEFAULT_AIO_FILE_BLOCK_SIZE - (region_end % DEFAULT_AIO_FILE_BLOCK_SIZE)) % DEFAULT_AIO_FILE_BLOCK_SIZE; region_aligned_begin = region_begin - region_left_padding; if (region_end > (std::numeric_limits::max() - static_cast(region_right_padding))) throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR); const off_t region_aligned_end = region_end + region_right_padding; region_aligned_size = region_aligned_end - region_aligned_begin; /// Буфер, в который запишем полученные данные. buffer_begin = fill_buffer.internalBuffer().begin(); } void ReadBufferAIO::publish() { size_t region_left_padding = pos_in_file % DEFAULT_AIO_FILE_BLOCK_SIZE; if ((bytes_read < 0) || (static_cast(bytes_read) < region_left_padding)) throw Exception("Asynchronous read error on file " + filename, ErrorCodes::AIO_READ_ERROR); /// Игнорируем излишние байты слева. bytes_read -= region_left_padding; /// Игнорируем излишние байты справа. bytes_read = std::min(bytes_read, static_cast(requested_byte_count)); if (bytes_read > 0) fill_buffer.buffer().resize(region_left_padding + bytes_read); if (static_cast(bytes_read) < requested_byte_count) is_eof = true; if (pos_in_file > (std::numeric_limits::max() - bytes_read)) throw Exception("An overflow occurred during file operation", ErrorCodes::LOGICAL_ERROR); pos_in_file += bytes_read; total_bytes_read += bytes_read; working_buffer_offset = region_left_padding; if (total_bytes_read == max_bytes_read) is_eof = true; /// Менять местами основной и дублирующий буферы. internalBuffer().swap(fill_buffer.internalBuffer()); buffer().swap(fill_buffer.buffer()); std::swap(position(), fill_buffer.position()); } }