Experiment with ArrowStream streaming

This commit is contained in:
Nicolae Vartolomei 2021-04-05 22:21:16 +03:00
parent ef7571c226
commit 4ea363006b
3 changed files with 65 additions and 1 deletions

View File

@ -78,7 +78,7 @@ void ArrowBlockInputFormat::prepareReader()
{ {
if (stream) if (stream)
{ {
auto stream_reader_status = arrow::ipc::RecordBatchStreamReader::Open(asArrowFile(in)); auto stream_reader_status = arrow::ipc::RecordBatchStreamReader::Open(asArrowInputStream(in));
if (!stream_reader_status.ok()) if (!stream_reader_status.ok())
throw Exception(ErrorCodes::UNKNOWN_EXCEPTION, throw Exception(ErrorCodes::UNKNOWN_EXCEPTION,
"Error while opening a table: {}", stream_reader_status.status().ToString()); "Error while opening a table: {}", stream_reader_status.status().ToString());

View File

@ -83,6 +83,47 @@ arrow::Status RandomAccessFileFromSeekableReadBuffer::Seek(int64_t position)
return arrow::Status::OK(); return arrow::Status::OK();
} }
/// Wraps `in_` by reference; the stream keeps no copy, so the caller must keep
/// the ReadBuffer alive for as long as this object is used.
ArrowInputStream::ArrowInputStream(ReadBuffer & in_)
    : in(in_)
{}
/// Reads up to `nbytes` bytes from the wrapped ReadBuffer into the caller-provided
/// memory at `out` and returns the byte count reported by ReadBuffer::read.
arrow::Result<int64_t> ArrowInputStream::Read(int64_t nbytes, void * out)
{
    /// Arrow hands over an untyped pointer, while ReadBuffer::read is called with a char pointer.
    auto * destination = reinterpret_cast<char *>(out);
    return in.read(destination, nbytes);
}
/// Reads up to `nbytes` bytes from the wrapped ReadBuffer into a newly created arrow::Buffer.
///
/// The returned buffer contains exactly the bytes that were read, so it is shorter than
/// `nbytes` (possibly empty) when the underlying buffer has less data left. A short read is
/// how an Arrow input stream reports end of data, so it must not be turned into an error.
/// A negative `nbytes` is rejected with an Invalid status.
arrow::Result<std::shared_ptr<arrow::Buffer>> ArrowInputStream::Read(int64_t nbytes)
{
    if (nbytes < 0)
        return arrow::Status::Invalid("Cannot read a negative number of bytes: ", nbytes);

    /// Reserve room for the full request up front, then trim to what was actually delivered.
    std::string file_data(static_cast<size_t>(nbytes), '\0');
    const size_t bytes_read = in.read(file_data.data(), file_data.size());
    file_data.resize(bytes_read);

    return arrow::Buffer::FromString(std::move(file_data));
}
/// Nothing is released or cancelled here; always reports success.
arrow::Status ArrowInputStream::Abort()
{
    return arrow::Status::OK();
}
/// Reports the current stream position as the value of ReadBuffer::count()
/// (presumably the number of bytes consumed from the buffer so far — confirm against ReadBuffer).
arrow::Result<int64_t> ArrowInputStream::Tell() const
{
    const auto bytes_consumed = in.count();
    return bytes_consumed;
}
/// Does not touch the wrapped ReadBuffer; always reports success.
arrow::Status ArrowInputStream::Close()
{
    return arrow::Status::OK();
}
/// Always reports the stream as open.
/// NOTE(review): no open/closed state is tracked, so this stays false even after Close().
bool ArrowInputStream::closed() const
{
    constexpr bool is_closed = false;
    return is_closed;
}
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in) std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in)
{ {
if (auto * fd_in = dynamic_cast<ReadBufferFromFileDescriptor *>(&in)) if (auto * fd_in = dynamic_cast<ReadBufferFromFileDescriptor *>(&in))
@ -104,6 +145,11 @@ std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in)
return std::make_shared<arrow::io::BufferReader>(arrow::Buffer::FromString(std::move(file_data))); return std::make_shared<arrow::io::BufferReader>(arrow::Buffer::FromString(std::move(file_data)));
} }
/// Exposes `in` as a sequential Arrow input stream.
/// The returned object only references `in`, so `in` must outlive it.
std::shared_ptr<arrow::io::InputStream> asArrowInputStream(ReadBuffer & in)
{
    std::shared_ptr<arrow::io::InputStream> stream = std::make_shared<ArrowInputStream>(in);
    return stream;
}
} }
#endif #endif

View File

@ -61,7 +61,25 @@ private:
ARROW_DISALLOW_COPY_AND_ASSIGN(RandomAccessFileFromSeekableReadBuffer); ARROW_DISALLOW_COPY_AND_ASSIGN(RandomAccessFileFromSeekableReadBuffer);
}; };
/// Adapter that presents a ReadBuffer through the arrow::io::InputStream interface,
/// i.e. as a sequential stream without random access.
/// The ReadBuffer is held by reference and is not owned by this class.
class ArrowInputStream : public arrow::io::InputStream
{
public:
    explicit ArrowInputStream(ReadBuffer & in);

    /// Reading: into caller-provided memory, or into a freshly created buffer.
    arrow::Result<int64_t> Read(int64_t nbytes, void * out) override;
    arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t nbytes) override;

    /// Current position within the stream.
    arrow::Result<int64_t> Tell() const override;

    /// Stream lifecycle.
    arrow::Status Abort() override;
    arrow::Status Close() override;
    bool closed() const override;

private:
    /// Source of the data; referenced, not owned.
    ReadBuffer & in;

    ARROW_DISALLOW_COPY_AND_ASSIGN(ArrowInputStream);
};
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in); std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in);
std::shared_ptr<arrow::io::InputStream> asArrowInputStream(ReadBuffer & in);
} }