Preparations

This commit is contained in:
Alexey Milovidov 2020-05-02 22:59:07 +03:00
parent e6ab4d655b
commit 554e7a0dd2

View File

@ -26,7 +26,7 @@ namespace DB
class RandomAccessFileFromSeekableReadBuffer : public arrow::io::RandomAccessFile
{
public:
RandomAccessFileFromSeekableReadBuffer(SeekableReadBuffer& in_, off_t file_size_)
RandomAccessFileFromSeekableReadBuffer(SeekableReadBuffer & in_, off_t file_size_)
: in(in_)
, file_size(file_size_)
, is_closed(false)
@ -34,33 +34,33 @@ public:
}
virtual arrow::Status GetSize(int64_t* size) override
arrow::Status GetSize(int64_t* size) override
{
*size = file_size;
return arrow::Status::OK();
}
virtual arrow::Status Close() override
arrow::Status Close() override
{
is_closed = true;
return arrow::Status::OK();
}
virtual arrow::Status Tell(int64_t* position) const override
arrow::Status Tell(int64_t* position) const override
{
*position = in.getPosition();
return arrow::Status::OK();
}
virtual bool closed() const override { return is_closed; }
bool closed() const override { return is_closed; }
virtual arrow::Status Read(int64_t nbytes, int64_t* bytes_read, void* out) override
arrow::Status Read(int64_t nbytes, int64_t* bytes_read, void* out) override
{
*bytes_read = in.readBig(reinterpret_cast<char *>(out), nbytes);
return arrow::Status::OK();
}
virtual arrow::Status Read(int64_t nbytes, std::shared_ptr<arrow::Buffer>* out) override
arrow::Status Read(int64_t nbytes, std::shared_ptr<arrow::Buffer>* out) override
{
std::shared_ptr<arrow::Buffer> buf;
ARROW_RETURN_NOT_OK(arrow::AllocateBuffer(nbytes, &buf));
@ -69,22 +69,22 @@ public:
return arrow::Status::OK();
}
virtual arrow::Status Seek(int64_t position) override
arrow::Status Seek(int64_t position) override
{
in.seek(position, SEEK_SET);
return arrow::Status::OK();
}
private:
SeekableReadBuffer& in;
SeekableReadBuffer & in;
off_t file_size;
bool is_closed;
};
static std::shared_ptr<arrow::io::RandomAccessFile> as_arrow_file(ReadBuffer & in)
static std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in)
{
if (auto * fd_in = dynamic_cast<ReadBufferFromFileDescriptor*>(&in))
if (auto * fd_in = dynamic_cast<ReadBufferFromFileDescriptor *>(&in))
{
struct stat stat;
auto res = ::fstat(fd_in->getFD(), &stat);
@ -101,6 +101,7 @@ static std::shared_ptr<arrow::io::RandomAccessFile> as_arrow_file(ReadBuffer &
WriteBufferFromString file_buffer(file_data);
copyData(in, file_buffer);
}
return std::make_shared<arrow::io::BufferReader>(arrow::Buffer::FromString(std::move(file_data)));
}
@ -119,7 +120,7 @@ namespace ErrorCodes
ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_)
: IInputFormat(std::move(header_), in_)
{
THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(as_arrow_file(in_), arrow::default_memory_pool(), &file_reader));
THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(in_), arrow::default_memory_pool(), &file_reader));
row_group_total = file_reader->num_row_groups();
std::shared_ptr<arrow::Schema> schema;