From 554e7a0dd20c6dc0dd8d6e612afc835da3f64797 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sat, 2 May 2020 22:59:07 +0300 Subject: [PATCH] Preparations --- .../Formats/Impl/ParquetBlockInputFormat.cpp | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index 221e1906a0a..235a5eab9b5 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -26,7 +26,7 @@ namespace DB class RandomAccessFileFromSeekableReadBuffer : public arrow::io::RandomAccessFile { public: - RandomAccessFileFromSeekableReadBuffer(SeekableReadBuffer& in_, off_t file_size_) + RandomAccessFileFromSeekableReadBuffer(SeekableReadBuffer & in_, off_t file_size_) : in(in_) , file_size(file_size_) , is_closed(false) @@ -34,33 +34,33 @@ public: } - virtual arrow::Status GetSize(int64_t* size) override + arrow::Status GetSize(int64_t* size) override { *size = file_size; return arrow::Status::OK(); } - virtual arrow::Status Close() override + arrow::Status Close() override { is_closed = true; return arrow::Status::OK(); } - virtual arrow::Status Tell(int64_t* position) const override + arrow::Status Tell(int64_t* position) const override { *position = in.getPosition(); return arrow::Status::OK(); } - virtual bool closed() const override { return is_closed; } + bool closed() const override { return is_closed; } - virtual arrow::Status Read(int64_t nbytes, int64_t* bytes_read, void* out) override + arrow::Status Read(int64_t nbytes, int64_t* bytes_read, void* out) override { *bytes_read = in.readBig(reinterpret_cast(out), nbytes); return arrow::Status::OK(); } - virtual arrow::Status Read(int64_t nbytes, std::shared_ptr* out) override + arrow::Status Read(int64_t nbytes, std::shared_ptr* out) override { std::shared_ptr buf; ARROW_RETURN_NOT_OK(arrow::AllocateBuffer(nbytes, &buf)); @@ -69,22 +69,22 @@ public: return arrow::Status::OK(); } - virtual arrow::Status Seek(int64_t position) override + arrow::Status Seek(int64_t position) override { in.seek(position, SEEK_SET); return arrow::Status::OK(); } private: - SeekableReadBuffer& in; + SeekableReadBuffer & in; off_t file_size; bool is_closed; }; -static std::shared_ptr as_arrow_file(ReadBuffer & in) +static std::shared_ptr asArrowFile(ReadBuffer & in) { - if (auto * fd_in = dynamic_cast(&in)) + if (auto * fd_in = dynamic_cast(&in)) { struct stat stat; auto res = ::fstat(fd_in->getFD(), &stat); @@ -101,6 +101,7 @@ static std::shared_ptr as_arrow_file(ReadBuffer & WriteBufferFromString file_buffer(file_data); copyData(in, file_buffer); } + return std::make_shared(arrow::Buffer::FromString(std::move(file_data))); } @@ -119,7 +120,7 @@ namespace ErrorCodes ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_) : IInputFormat(std::move(header_), in_) { - THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(as_arrow_file(in_), arrow::default_memory_pool(), &file_reader)); + THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(in_), arrow::default_memory_pool(), &file_reader)); row_group_total = file_reader->num_row_groups(); std::shared_ptr schema;