#include #include #include #include #include #include namespace DB { WriteBufferAIO::WriteBufferAIO(const std::string & filename_, size_t buffer_size_, int flags_, mode_t mode_, char * existing_memory_) : BufferWithOwnMemory(buffer_size_, existing_memory_, DEFAULT_AIO_FILE_BLOCK_SIZE), flush_buffer(BufferWithOwnMemory(buffer_size_, nullptr, DEFAULT_AIO_FILE_BLOCK_SIZE)), filename(filename_) { ProfileEvents::increment(ProfileEvents::FileOpen); int open_flags = (flags_ == -1) ? (O_WRONLY | O_TRUNC | O_CREAT) : flags_; open_flags |= O_DIRECT; fd = ::open(filename.c_str(), open_flags, mode_); if (fd == -1) { got_exception = true; auto error_code = (errno == ENOENT) ? ErrorCodes::FILE_DOESNT_EXIST : ErrorCodes::CANNOT_OPEN_FILE; throwFromErrno("Cannot open file " + filename, error_code); } ::memset(&request, 0, sizeof(request)); } WriteBufferAIO::~WriteBufferAIO() { if (!got_exception) { try { flush(); } catch (...) { tryLogCurrentException(__PRETTY_FUNCTION__); } } if (fd != -1) ::close(fd); } off_t WriteBufferAIO::seek(off_t off, int whence) { flush(); if (whence == SEEK_SET) { if (off < 0) { got_exception = true; throw Exception("SEEK_SET underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND); } pos_in_file = off; } else if (whence == SEEK_CUR) { if (off >= 0) { if (off > (std::numeric_limits::max() - pos_in_file)) { got_exception = true; throw Exception("SEEK_CUR overflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND); } } else if (off < -pos_in_file) { got_exception = true; throw Exception("SEEK_CUR underflow", ErrorCodes::ARGUMENT_OUT_OF_BOUND); } pos_in_file += off; } else { got_exception = true; throw Exception("WriteBufferAIO::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND); } return pos_in_file; } off_t WriteBufferAIO::getPositionInFile() { return seek(0, SEEK_CUR); } void WriteBufferAIO::truncate(off_t length) { flush(); int res = ::ftruncate(fd, length); if (res == -1) { got_exception = true; throwFromErrno("Cannot truncate file " + filename, ErrorCodes::CANNOT_TRUNCATE_FILE); } } void WriteBufferAIO::sync() { flush(); /// Попросим ОС сбросить данные на диск. int res = ::fsync(fd); if (res == -1) { got_exception = true; throwFromErrno("Cannot fsync " + getFileName(), ErrorCodes::CANNOT_FSYNC); } } void WriteBufferAIO::flush() { next(); waitForAIOCompletion(); } void WriteBufferAIO::nextImpl() { if (!offset()) return; waitForAIOCompletion(); swapBuffers(); /// Input parameters: fd, pos_in_file, flush_buffer /* region_aligned_begin region_begin region_end region_aligned_end | | | | | +---------------------+ +----------------------+ | | | | | +--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+ |XXXXX* : : : : : : : : : : *XXXX| +-------|XXXXX* : : : : : : : : : : *XXXX|-------+ | |XXXXX* : : : : : : : : : : *XXXX| | | |XXXXX* : : : : : : : : : : *XXXX| | | +--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+ | (1) | | ^ | |(1) read| +---- left padded disk page | right padded disk page ----+ |read | | | | +--------+ (left padded page) | (right padded page) +--------+ | | |XXXXX*YY| | |ZZZ*XXXX| | | |XXXXX*YY|--------------------------------->+<--------------------------------------------|ZZZ*XXXX| | +------>|XXXXX*YY|<--+ | (3) scattered write +------------>|ZZZ*XXXX|<------+ |XXXXX*YY| | | | |ZZZ*XXXX| +--------+ |(2)copy | |(2)copy +--------+ | | | +--------------------+ | +--------------------+ | | | | buffer_begin aligned_buffer_begin.......+...........aligned_buffer_end buffer_end | | | | | | | | | +---------------------+ +----------------------+ | | | | | | | | | ---+--------+--------+--------+--------+--------+--------+--------+--------+--------+---- | | *YY: : : : : : : : : :ZZZ* | | *YY: : : : : : : : : :ZZZ* | +-------*YY: : : : : : : : : :ZZZ*--------+ *YY: : : : : : : : : :ZZZ* ---+--------+--------+--------+--------+--------+--------+--------+--------+--------+---- */ // // 1. Determine the enclosing page-aligned disk region. // /// Disk region we want to write to. size_t region_begin = pos_in_file; size_t region_end = pos_in_file + flush_buffer.offset(); /// Page-aligned disk region. size_t region_aligned_begin = region_begin - (region_begin % DEFAULT_AIO_FILE_BLOCK_SIZE); size_t region_aligned_end = region_end; if ((region_aligned_end % DEFAULT_AIO_FILE_BLOCK_SIZE) != 0) region_aligned_end += DEFAULT_AIO_FILE_BLOCK_SIZE - (region_aligned_end % DEFAULT_AIO_FILE_BLOCK_SIZE); bool has_left_padding = (region_aligned_begin != region_begin); bool has_right_padding = (region_aligned_end != region_end); // // 2. Read needed data from disk into padded pages. // if (has_left_padding) { /// Left-side padding disk region. ssize_t read_count = ::pread(fd, &left_page[0], DEFAULT_AIO_FILE_BLOCK_SIZE, region_aligned_begin); if (read_count < 0) throw Exception("Read error"); } if (has_right_padding) { /// Right-side padding disk region. ssize_t read_count = ::pread(fd, &right_page[0], DEFAULT_AIO_FILE_BLOCK_SIZE, (region_aligned_end - DEFAULT_AIO_FILE_BLOCK_SIZE)); if (read_count < 0) throw Exception("Read error"); } // // 3. Copy padding data (2 user-space copies) from the buffer into the padded pages. // /// Buffer we want to write to disk. Position buffer_begin = flush_buffer.buffer().begin(); Position buffer_end = buffer_begin + flush_buffer.offset(); /// Subset of the buffer that is page-aligned. Position aligned_buffer_begin = buffer_begin; Position aligned_buffer_end = buffer_end; if (has_left_padding) { size_t left_page_unmodified_size = region_begin - region_aligned_begin; size_t left_page_modified_size = DEFAULT_AIO_FILE_BLOCK_SIZE - left_page_unmodified_size; aligned_buffer_begin += left_page_modified_size; ::memcpy(&left_page[0] + left_page_unmodified_size, buffer_begin, left_page_modified_size); } if (has_right_padding) { size_t right_page_begin = region_aligned_end - DEFAULT_AIO_FILE_BLOCK_SIZE; size_t right_page_modified_size = region_end - right_page_begin; aligned_buffer_end -= right_page_modified_size; ::memcpy(&right_page[0], (buffer_end - right_page_modified_size), right_page_modified_size); } // // 4. Create requests. // size_t i = 0; if (has_left_padding) { iov[i].iov_base = &left_page[0]; iov[i].iov_len = DEFAULT_AIO_FILE_BLOCK_SIZE; ++i; } iov[i].iov_base = aligned_buffer_begin; iov[i].iov_len = aligned_buffer_end - aligned_buffer_begin; ++i; if (has_right_padding) { iov[i].iov_base = &right_page[0]; iov[i].iov_len = DEFAULT_AIO_FILE_BLOCK_SIZE; } /// Send requests (1 syscall). request.aio_lio_opcode = IOCB_CMD_PWRITEV; request.aio_fildes = fd; request.aio_buf = reinterpret_cast(iov); request.aio_nbytes = i + 1; request.aio_offset = region_aligned_begin; /// Отправить запрос. while (io_submit(aio_context.ctx, request_ptrs.size(), &request_ptrs[0]) < 0) { if (errno != EINTR) { got_exception = true; throw Exception("Cannot submit request for asynchronous IO on file " + filename, ErrorCodes::AIO_SUBMIT_ERROR); } } is_pending_write = true; } void WriteBufferAIO::waitForAIOCompletion() { if (is_pending_write) { while (io_getevents(aio_context.ctx, events.size(), events.size(), &events[0], nullptr) < 0) { if (errno != EINTR) { got_exception = true; throw Exception("Failed to wait for asynchronous IO completion on file " + filename, ErrorCodes::AIO_COMPLETION_ERROR); } } is_pending_write = false; off_t bytes_written = events[0].res; if ((bytes_written < 0) || (static_cast(bytes_written) < flush_buffer.offset())) { got_exception = true; throw Exception("Asynchronous write error on file " + filename, ErrorCodes::AIO_WRITE_ERROR); } if (pos_in_file > (std::numeric_limits::max() - bytes_written)) { got_exception = true; throw Exception("File position overflowed", ErrorCodes::LOGICAL_ERROR); } pos_in_file += bytes_written; } } void WriteBufferAIO::swapBuffers() noexcept { buffer().swap(flush_buffer.buffer()); std::swap(position(), flush_buffer.position()); } }