From 4ea363006b872a8e0e365bbb230f5b43feb5da48 Mon Sep 17 00:00:00 2001 From: Nicolae Vartolomei Date: Mon, 5 Apr 2021 22:21:16 +0300 Subject: [PATCH] Experiment with ArrowStream streaming --- .../Formats/Impl/ArrowBlockInputFormat.cpp | 2 +- .../Formats/Impl/ArrowBufferedStreams.cpp | 46 +++++++++++++++++++ .../Formats/Impl/ArrowBufferedStreams.h | 18 ++++++++ 3 files changed, 65 insertions(+), 1 deletion(-) diff --git a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp index 6a5c9718278..5eeb512746e 100644 --- a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp @@ -78,7 +78,7 @@ void ArrowBlockInputFormat::prepareReader() { if (stream) { - auto stream_reader_status = arrow::ipc::RecordBatchStreamReader::Open(asArrowFile(in)); + auto stream_reader_status = arrow::ipc::RecordBatchStreamReader::Open(asArrowInputStream(in)); /* The stream reader is now opened over asArrowInputStream(in) rather than asArrowFile(in). */ if (!stream_reader_status.ok()) throw Exception(ErrorCodes::UNKNOWN_EXCEPTION, "Error while opening a table: {}", stream_reader_status.status().ToString()); diff --git a/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp b/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp index c783e10debb..ee95b48de96 100644 --- a/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp +++ b/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp @@ -83,6 +83,47 @@ arrow::Status RandomAccessFileFromSeekableReadBuffer::Seek(int64_t position) return arrow::Status::OK(); } + +ArrowInputStream::ArrowInputStream(ReadBuffer & in_) : in(in_) /* Binds the member reference to in_; no copy of the buffer is made. */ +{ +} + +arrow::Result ArrowInputStream::Read(int64_t nbytes, void * out) /* Reads into the caller-supplied memory by delegating to in.read() and returns whatever that call returns. NOTE(review): template argument lists (on arrow::Result, reinterpret_cast, std::shared_ptr, std::make_shared, dynamic_cast) look stripped in this copy of the patch; verify against the original commit before applying. */ +{ + return in.read(reinterpret_cast(out), nbytes); +} + +arrow::Result> ArrowInputStream::Read(int64_t nbytes) /* Copies nbytes from in into a temporary std::string through copyData, then passes that string to arrow::Buffer::FromString. NOTE(review): what happens when fewer than nbytes remain depends on copyData; confirm it matches the short-read behaviour Arrow expects at end of stream. */ +{ + std::string file_data; + { + WriteBufferFromString file_buffer(file_data); + copyData(in, file_buffer, nbytes); + } + + return arrow::Buffer::FromString(std::move(file_data)); +} + +arrow::Status ArrowInputStream::Abort() /* No-op: touches nothing and returns a default-constructed arrow::Status. */ +{ 
+ return arrow::Status(); +} + +arrow::Result ArrowInputStream::Tell() const /* Reports in.count() as the current position. */ +{ + return in.count(); +} + +arrow::Status ArrowInputStream::Close() /* No-op: nothing is released and no closed state is recorded. */ +{ + return arrow::Status(); +} + +bool ArrowInputStream::closed() const /* Always false, including after Close() or Abort() has been called. NOTE(review): confirm callers do not rely on closed() turning true. */ +{ + return false; +} + std::shared_ptr asArrowFile(ReadBuffer & in) { if (auto * fd_in = dynamic_cast(&in)) @@ -104,6 +145,11 @@ std::shared_ptr asArrowFile(ReadBuffer & in) return std::make_shared(arrow::Buffer::FromString(std::move(file_data))); } +std::shared_ptr asArrowInputStream(ReadBuffer & in) /* Factory used by prepareReader(): builds a new shared object from in via std::make_shared. */ +{ + return std::make_shared(in); +} + } #endif diff --git a/src/Processors/Formats/Impl/ArrowBufferedStreams.h b/src/Processors/Formats/Impl/ArrowBufferedStreams.h index bb94535549c..5dabce3e0df 100644 --- a/src/Processors/Formats/Impl/ArrowBufferedStreams.h +++ b/src/Processors/Formats/Impl/ArrowBufferedStreams.h @@ -61,7 +61,25 @@ private: ARROW_DISALLOW_COPY_AND_ASSIGN(RandomAccessFileFromSeekableReadBuffer); }; +class ArrowInputStream : public arrow::io::InputStream /* Adapter that exposes a ReadBuffer through the arrow::io::InputStream interface. */ +{ +public: + explicit ArrowInputStream(ReadBuffer & in); + arrow::Result Read(int64_t nbytes, void* out) override; + arrow::Result> Read(int64_t nbytes) override; + arrow::Status Abort() override; + arrow::Result Tell() const override; + arrow::Status Close() override; + bool closed() const override; + +private: + ReadBuffer & in; /* Held by reference, not owned. NOTE(review): the referenced buffer presumably has to outlive this object; confirm at the call sites. */ + + ARROW_DISALLOW_COPY_AND_ASSIGN(ArrowInputStream); +}; + std::shared_ptr asArrowFile(ReadBuffer & in); +std::shared_ptr asArrowInputStream(ReadBuffer & in); }