#include "ArrowBufferedStreams.h"

#if USE_ARROW || USE_ORC || USE_PARQUET

#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromString.h>
#include <IO/copyData.h>
#include <arrow/buffer.h>
#include <arrow/io/api.h>
#include <arrow/result.h>

#include <sys/stat.h>


namespace DB
{

namespace
{

/// Reads up to `nbytes` bytes from `stream` into a freshly allocated resizable buffer.
/// If fewer bytes than requested were read, the buffer is shrunk so that its size equals
/// the number of bytes actually read.
/// Shared by the one-argument Read() overloads of both input adapters below, which were identical.
arrow::Result<std::shared_ptr<arrow::Buffer>> readIntoNewBuffer(arrow::io::InputStream & stream, int64_t nbytes)
{
    ARROW_ASSIGN_OR_RAISE(auto buffer, arrow::AllocateResizableBuffer(nbytes))
    /// Virtual call: dispatches to the concrete adapter's Read(int64_t, void *) override.
    ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, stream.Read(nbytes, buffer->mutable_data()))
    if (bytes_read < nbytes)
        RETURN_NOT_OK(buffer->Resize(bytes_read));
    return buffer;
}

}


/// Adapts a ClickHouse WriteBuffer to arrow::io::OutputStream.
/// `out` is held by reference; the caller keeps ownership of it.
ArrowBufferedOutputStream::ArrowBufferedOutputStream(WriteBuffer & out_) : out{out_}, is_open{true}
{
}

/// Marks the stream as closed. Nothing is called on the wrapped WriteBuffer here.
arrow::Status ArrowBufferedOutputStream::Close()
{
    is_open = false;
    return arrow::Status::OK();
}

/// Returns the number of bytes written through this adapter (not the position inside `out`).
arrow::Result<int64_t> ArrowBufferedOutputStream::Tell() const
{
    return arrow::Result<int64_t>(total_length);
}

/// Forwards `length` bytes to the wrapped WriteBuffer and advances the position reported by Tell().
arrow::Status ArrowBufferedOutputStream::Write(const void * data, int64_t length)
{
    out.write(reinterpret_cast<const char *>(data), length);
    total_length += length;
    return arrow::Status::OK();
}


/// Adapts a SeekableReadBuffer of known total size to arrow::io::RandomAccessFile.
/// `in` is held by reference; `file_size_` is what GetSize() reports.
RandomAccessFileFromSeekableReadBuffer::RandomAccessFileFromSeekableReadBuffer(SeekableReadBuffer & in_, off_t file_size_)
    : in{in_}, file_size{file_size_}, is_open{true}
{
}

arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::GetSize()
{
    return arrow::Result<int64_t>(file_size);
}

/// Marks the file as closed. Nothing is called on the wrapped buffer here.
arrow::Status RandomAccessFileFromSeekableReadBuffer::Close()
{
    is_open = false;
    return arrow::Status::OK();
}

arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::Tell() const
{
    return in.getPosition();
}

/// Reads up to `nbytes` bytes into `out`; returns the number of bytes reported by readBig().
arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::Read(int64_t nbytes, void * out)
{
    return in.readBig(reinterpret_cast<char *>(out), nbytes);
}

arrow::Result<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromSeekableReadBuffer::Read(int64_t nbytes)
{
    return readIntoNewBuffer(*this, nbytes);
}

/// Absolute seek within the wrapped buffer.
arrow::Status RandomAccessFileFromSeekableReadBuffer::Seek(int64_t position)
{
    in.seek(position, SEEK_SET);
    return arrow::Status::OK();
}


/// Adapts a forward-only ClickHouse ReadBuffer to arrow::io::InputStream.
/// `in` is held by reference; the caller keeps ownership of it.
ArrowInputStreamFromReadBuffer::ArrowInputStreamFromReadBuffer(ReadBuffer & in_) : in(in_), is_open{true}
{
}

/// Reads up to `nbytes` bytes into `out`; returns the number of bytes reported by readBig().
arrow::Result<int64_t> ArrowInputStreamFromReadBuffer::Read(int64_t nbytes, void * out)
{
    return in.readBig(reinterpret_cast<char *>(out), nbytes);
}

arrow::Result<std::shared_ptr<arrow::Buffer>> ArrowInputStreamFromReadBuffer::Read(int64_t nbytes)
{
    return readIntoNewBuffer(*this, nbytes);
}

/// Arrow documents that after Abort() the stream is closed and no longer usable,
/// so it must be marked closed exactly like Close() does (previously it stayed "open").
arrow::Status ArrowInputStreamFromReadBuffer::Abort()
{
    is_open = false;
    return arrow::Status::OK();
}

/// Delegates to ReadBuffer::count().
/// NOTE(review): presumably this is the total number of bytes consumed from `in`, so it is not relative to
/// where this adapter started if `in` had already been partly read — confirm against callers.
arrow::Result<int64_t> ArrowInputStreamFromReadBuffer::Tell() const
{
    return in.count();
}

/// Marks the stream as closed. Nothing is called on the wrapped ReadBuffer here.
arrow::Status ArrowInputStreamFromReadBuffer::Close()
{
    is_open = false;
    return arrow::Status::OK();
}


/// Returns an Arrow random-access view of `in`.
/// If `in` reads from a file descriptor that refers to a regular file, the file is accessed
/// directly via seeking, using its st_size as the file size.
/// Otherwise (other buffer types, pipes such as stdin, or fstat failure) the whole remaining input is
/// copied into memory and served from an in-memory arrow::io::BufferReader.
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in)
{
    if (auto * fd_in = dynamic_cast<ReadBufferFromFileDescriptor *>(&in))
    {
        struct stat file_stat{};
        const int res = ::fstat(fd_in->getFD(), &file_stat);
        /// Only a regular file has a meaningful size and is seekable; e.g. stdin as a pipe is not.
        if (res == 0 && S_ISREG(file_stat.st_mode))
            return std::make_shared<RandomAccessFileFromSeekableReadBuffer>(*fd_in, file_stat.st_size);
    }

    /// Fallback: load the entire input into memory.
    std::string file_data;
    {
        WriteBufferFromString file_buffer(file_data);
        copyData(in, file_buffer);
    } /// file_buffer is destroyed here, before file_data is moved out below.

    return std::make_shared<arrow::io::BufferReader>(arrow::Buffer::FromString(std::move(file_data)));
}

}

#endif