#pragma once #include #include #include #include #include #include #include #include #include #include namespace DB { /** Класс для асинхронного чтения данных. */ class ReadBufferAIO : public ReadBufferFromFileBase { public: ReadBufferAIO(const std::string & filename_, size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE, int flags_ = -1, char * existing_memory_ = nullptr); ~ReadBufferAIO() override; ReadBufferAIO(const ReadBufferAIO &) = delete; ReadBufferAIO & operator=(const ReadBufferAIO &) = delete; void setMaxBytes(size_t max_bytes_read_); off_t getPositionInFile() override; std::string getFileName() const noexcept override { return filename; } int getFD() const noexcept override { return fd; } private: off_t getPositionInFileRelaxed() const noexcept; off_t doSeek(off_t off, int whence) override; bool nextImpl() override; void synchronousRead(); void initRequest(); void publishReceivedData(); void sync(); /// Ждать окончания текущей асинхронной задачи. bool waitForAIOCompletion(); /// Менять местами основной и дублирующий буферы. void swapBuffers() noexcept; void skipPendingAIO(); private: /// Буфер для асинхронных операций чтения данных. BufferWithOwnMemory fill_buffer; iocb request; std::vector request_ptrs{&request}; std::vector events{1}; AIOContext aio_context{1}; const std::string filename; ssize_t bytes_read = 0; size_t max_bytes_read = std::numeric_limits::max(); size_t total_bytes_read = 0; size_t requested_byte_count = 0; off_t region_aligned_begin = 0; off_t pos_in_file = 0; int fd = -1; Position buffer_begin = nullptr; off_t region_aligned_size = 0; /// Асинхронная операция чтения ещё не завершилась. bool is_pending_read = false; /// Конец файла достигнут. bool is_eof = false; /// Был отправлен хоть один запрос на асинхронную операцию чтения. bool is_started = false; /// Асинхронная операция завершилась неудачно? bool aio_failed = false; }; }