Parquet improvements

- Read only required columns
- Avoid reading the entire file to RAM when possible
- Allow using internal parquet on darwin
This commit is contained in:
Andrew Onyshchuk 2020-02-23 00:04:58 -06:00
parent 49c68012bf
commit d15ff3e8d5
3 changed files with 144 additions and 67 deletions

View File

@ -4,7 +4,7 @@ endif()
if (ENABLE_PARQUET)
if (NOT OS_FREEBSD AND NOT OS_DARWIN) # Freebsd: ../contrib/arrow/cpp/src/arrow/util/bit-util.h:27:10: fatal error: endian.h: No such file or directory
if (NOT OS_FREEBSD) # Freebsd: ../contrib/arrow/cpp/src/arrow/util/bit-util.h:27:10: fatal error: endian.h: No such file or directory
option(USE_INTERNAL_PARQUET_LIBRARY "Set to FALSE to use system parquet library instead of bundled" ${NOT_UNBUNDLED})
endif()

View File

@ -4,91 +4,171 @@
#include <Formats/FormatFactory.h>
#include <IO/BufferBase.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <arrow/status.h>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>
#include "ArrowColumnToCHColumn.h"
#include <common/logger_useful.h>
#include <sys/stat.h>
namespace DB
{
ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_)
: IInputFormat(std::move(header_), in_)
class RandomAccessFileFromSeekableReadBuffer : public arrow::io::RandomAccessFile
{
public:
RandomAccessFileFromSeekableReadBuffer(SeekableReadBuffer& in_, off_t file_size_)
: in(in_)
, file_size(file_size_)
, is_closed(false)
{
}
Chunk ParquetBlockInputFormat::generate()
virtual arrow::Status GetSize(int64_t* size) override
{
Chunk res;
auto &header = getPort().getHeader();
*size = file_size;
return arrow::Status::OK();
}
if (!in.eof())
virtual arrow::Status Close() override
{
is_closed = true;
return arrow::Status::OK();
}
virtual arrow::Status Tell(int64_t* position) const override
{
*position = in.getPosition();
return arrow::Status::OK();
}
virtual bool closed() const override { return is_closed; }
virtual arrow::Status Read(int64_t nbytes, int64_t* bytes_read, void* out) override
{
*bytes_read = in.readBig(reinterpret_cast<char *>(out), nbytes);
return arrow::Status::OK();
}
virtual arrow::Status Read(int64_t nbytes, std::shared_ptr<arrow::Buffer>* out) override
{
std::shared_ptr<arrow::Buffer> buf;
ARROW_RETURN_NOT_OK(arrow::AllocateBuffer(nbytes, &buf));
size_t n = in.readBig(reinterpret_cast<char *>(buf->mutable_data()), nbytes);
*out = arrow::SliceBuffer(buf, 0, n);
return arrow::Status::OK();
}
virtual arrow::Status Seek(int64_t position) override
{
in.seek(position, SEEK_SET);
return arrow::Status::OK();
}
private:
SeekableReadBuffer& in;
off_t file_size;
bool is_closed;
};
static std::shared_ptr<arrow::io::RandomAccessFile> as_arrow_file(ReadBuffer & in)
{
if (auto fd_in = dynamic_cast<ReadBufferFromFileDescriptor*>(&in))
{
struct stat stat;
::fstat(fd_in->getFD(), &stat);
// if fd is a regular file i.e. not stdin
if (S_ISREG(stat.st_mode))
{
/*
First we load whole stream into string (its very bad and limiting .parquet file size to half? of RAM)
Then producing blocks for every row_group (dont load big .parquet files with one row_group - it can eat x10+ RAM from .parquet file size)
*/
if (row_group_current < row_group_total)
throw Exception{"Got new data, but data from previous chunks was not read " +
std::to_string(row_group_current) + "/" + std::to_string(row_group_total),
ErrorCodes::CANNOT_READ_ALL_DATA};
file_data.clear();
{
WriteBufferFromString file_buffer(file_data);
copyData(in, file_buffer);
}
buffer = std::make_unique<arrow::Buffer>(file_data);
// TODO: maybe use parquet::RandomAccessSource?
auto status = parquet::arrow::FileReader::Make(
::arrow::default_memory_pool(),
parquet::ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(*buffer)),
&file_reader);
row_group_total = file_reader->num_row_groups();
row_group_current = 0;
return std::make_shared<RandomAccessFileFromSeekableReadBuffer>(*fd_in, stat.st_size);
}
//DUMP(row_group_current, row_group_total);
if (row_group_current >= row_group_total)
return res;
}
// TODO: also catch a ParquetException thrown by filereader?
//arrow::Status read_status = filereader.ReadTable(&table);
std::shared_ptr<arrow::Table> table;
arrow::Status read_status = file_reader->ReadRowGroup(row_group_current, &table);
// fallback to loading the entire file in memory
std::string file_data;
{
WriteBufferFromString file_buffer(file_data);
copyData(in, file_buffer);
}
return std::make_shared<arrow::io::BufferReader>(arrow::Buffer::FromString(std::move(file_data)));
}
ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, read_status, header, row_group_current, "Parquet");
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
#define THROW_ARROW_NOT_OK(status) \
do { \
::arrow::Status __s = (status); \
if(!__s.ok()) \
throw Exception(__s.ToString(), ErrorCodes::BAD_ARGUMENTS);\
} while (false)
ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_)
: IInputFormat(std::move(header_), in_)
{
THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(as_arrow_file(in_), arrow::default_memory_pool(), &file_reader));
row_group_total = file_reader->num_row_groups();
std::shared_ptr<arrow::Schema> schema;
THROW_ARROW_NOT_OK(file_reader->GetSchema(&schema));
for (int i = 0; i < schema->num_fields(); ++i)
{
if (getPort().getHeader().has(schema->field(i)->name()))
{
column_indices.push_back(i);
}
}
}
Chunk ParquetBlockInputFormat::generate()
{
Chunk res;
auto &header = getPort().getHeader();
if (row_group_current >= row_group_total)
return res;
}
void ParquetBlockInputFormat::resetParser()
{
IInputFormat::resetParser();
std::shared_ptr<arrow::Table> table;
arrow::Status read_status = file_reader->ReadRowGroup(row_group_current, column_indices, &table);
ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, read_status, header, row_group_current, "Parquet");
return res;
}
file_reader.reset();
file_data.clear();
buffer.reset();
row_group_total = 0;
row_group_current = 0;
}
void ParquetBlockInputFormat::resetParser()
{
IInputFormat::resetParser();
void registerInputFormatProcessorParquet(FormatFactory &factory)
{
factory.registerInputFormatProcessor(
"Parquet",
[](ReadBuffer &buf,
const Block &sample,
const RowInputFormatParams &,
const FormatSettings & /* settings */)
{
return std::make_shared<ParquetBlockInputFormat>(buf, sample);
});
}
file_reader.reset();
row_group_total = 0;
row_group_current = 0;
}
void registerInputFormatProcessorParquet(FormatFactory &factory)
{
factory.registerInputFormatProcessor(
"Parquet",
[](ReadBuffer &buf,
const Block &sample,
const RowInputFormatParams &,
const FormatSettings & /* settings */)
{
return std::make_shared<ParquetBlockInputFormat>(buf, sample);
});
}
}

View File

@ -27,13 +27,10 @@ protected:
Chunk generate() override;
private:
// TODO: check that this class implements every part of its parent
std::unique_ptr<parquet::arrow::FileReader> file_reader;
std::string file_data;
std::unique_ptr<arrow::Buffer> buffer;
int row_group_total = 0;
// indices of columns to read from Parquet file
std::vector<int> column_indices;
int row_group_current = 0;
};