Merge pull request #22673 from nvartolomei/nv/exp-arrow-stream

Experiment with ArrowStream streaming
This commit is contained in:
Nikolai Kochetov 2021-04-12 13:29:25 +03:00 committed by GitHub
commit 7019a9a659
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 63 additions and 11 deletions

View File

@ -78,7 +78,7 @@ void ArrowBlockInputFormat::prepareReader()
{
if (stream)
{
auto stream_reader_status = arrow::ipc::RecordBatchStreamReader::Open(asArrowFile(in));
auto stream_reader_status = arrow::ipc::RecordBatchStreamReader::Open(std::make_unique<ArrowInputStreamFromReadBuffer>(in));
if (!stream_reader_status.ok())
throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
"Error while opening a table: {}", stream_reader_status.status().ToString());

View File

@ -55,26 +55,23 @@ arrow::Status RandomAccessFileFromSeekableReadBuffer::Close()
/// Reports the current position of the wrapped seekable buffer, as returned by `in.getPosition()`.
arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::Tell() const
{
    /// A single return: the previous body carried a second, unreachable `return` after this one.
    return static_cast<int64_t>(in.getPosition());
}
/// Reads into the caller-provided memory `out` by delegating to `in.readBig`.
/// Returns whatever byte count `readBig` reports for a request of `nbytes` bytes.
arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::Read(int64_t nbytes, void * out)
{
    /// Exactly one read and one return: the previous body had a second, unreachable
    /// `return` that repeated the `readBig` call.
    return static_cast<int64_t>(in.readBig(reinterpret_cast<char *>(out), nbytes));
}
/// Reads into a freshly allocated arrow buffer and returns it.
/// The buffer is allocated with room for `nbytes` bytes; if the read yields fewer bytes,
/// the buffer is resized down so its size matches what was actually read.
/// Allocation, read, and resize failures are propagated as an arrow error status.
arrow::Result<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromSeekableReadBuffer::Read(int64_t nbytes)
{
    /// Single allocation and single read. The previous body also allocated a second buffer,
    /// read from the stream a second time, and returned that buffer without trimming it.
    ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nbytes))
    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data()))

    /// Short read: trim the buffer to the bytes actually obtained.
    if (bytes_read < nbytes)
        RETURN_NOT_OK(buffer->Resize(bytes_read));

    return buffer;
}
arrow::Status RandomAccessFileFromSeekableReadBuffer::Seek(int64_t position)
@ -83,6 +80,43 @@ arrow::Status RandomAccessFileFromSeekableReadBuffer::Seek(int64_t position)
return arrow::Status::OK();
}
/// Binds the stream to `in_` (kept by reference, not owned) and marks the stream as open.
ArrowInputStreamFromReadBuffer::ArrowInputStreamFromReadBuffer(ReadBuffer & in_)
    : in(in_)
    , is_open(true)
{
}
/// Reads into the caller-provided memory `out` by delegating to `in.readBig`.
/// Returns the byte count that `readBig` reports for a request of `nbytes` bytes.
arrow::Result<int64_t> ArrowInputStreamFromReadBuffer::Read(int64_t nbytes, void * out)
{
    auto * destination = reinterpret_cast<char *>(out);
    return static_cast<int64_t>(in.readBig(destination, nbytes));
}
/// Reads into a freshly allocated arrow buffer and returns it.
/// The buffer starts with room for `nbytes` bytes and is resized down when the read
/// yields fewer bytes. Allocation, read, and resize failures are propagated as an arrow error status.
arrow::Result<std::shared_ptr<arrow::Buffer>> ArrowInputStreamFromReadBuffer::Read(int64_t nbytes)
{
    ARROW_ASSIGN_OR_RAISE(auto chunk, arrow::AllocateResizableBuffer(nbytes))
    ARROW_ASSIGN_OR_RAISE(const int64_t received, Read(nbytes, chunk->mutable_data()))

    /// Short read: trim the buffer to the bytes actually obtained.
    const bool is_short_read = received < nbytes;
    if (is_short_read)
        RETURN_NOT_OK(chunk->Resize(received));

    return chunk;
}
/// Abruptly closes the stream. There is nothing to flush or release here, so this only
/// records the closed state; arrow's FileInterface contract expects `closed()` to return
/// true after `Abort()`.
arrow::Status ArrowInputStreamFromReadBuffer::Abort()
{
    is_open = false;
    return arrow::Status::OK();
}
/// Reports the stream position as the value of `in.count()`.
/// NOTE(review): presumably the total number of bytes consumed from the buffer so far — confirm against ReadBuffer.
arrow::Result<int64_t> ArrowInputStreamFromReadBuffer::Tell() const
{
    const auto consumed = in.count();
    return static_cast<int64_t>(consumed);
}
/// Marks the stream as closed and reports success. The wrapped ReadBuffer is not touched.
arrow::Status ArrowInputStreamFromReadBuffer::Close()
{
    is_open = false;
    return arrow::Status::OK();
}
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in)
{
if (auto * fd_in = dynamic_cast<ReadBufferFromFileDescriptor *>(&in))

View File

@ -61,6 +61,24 @@ private:
ARROW_DISALLOW_COPY_AND_ASSIGN(RandomAccessFileFromSeekableReadBuffer);
};
/// Adapter exposing a ReadBuffer through arrow's sequential `arrow::io::InputStream` interface.
/// The ReadBuffer is held by reference, so it must outlive this object.
class ArrowInputStreamFromReadBuffer : public arrow::io::InputStream
{
public:
    explicit ArrowInputStreamFromReadBuffer(ReadBuffer & in);

    /// Reading and position.
    arrow::Result<int64_t> Read(int64_t nbytes, void * out) override;
    arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;
    arrow::Result<int64_t> Tell() const override;

    /// Stream lifecycle.
    arrow::Status Abort() override;
    arrow::Status Close() override;
    bool closed() const override { return !is_open; }

private:
    /// Source of the bytes; not owned.
    ReadBuffer & in;
    /// In-class default only; the out-of-line constructor sets this to true.
    bool is_open = false;

    ARROW_DISALLOW_COPY_AND_ASSIGN(ArrowInputStreamFromReadBuffer);
};
/// Returns an arrow::io::RandomAccessFile backed by `in`.
/// NOTE(review): the definition special-cases ReadBufferFromFileDescriptor via dynamic_cast;
/// the handling of other buffer types is not visible here — confirm before relying on it.
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in);
}