Parquet input: get rid of redundant copying

This commit is contained in:
Ivan Zhukov 2018-05-15 18:30:46 +03:00
parent 282110cba4
commit f8dae2bc76
2 changed files with 102 additions and 49 deletions

View File

@ -4,6 +4,9 @@
// TODO: clear includes
#include <Core/ColumnWithTypeAndName.h>
#include <Columns/IColumn.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <DataStreams/ParquetBlockInputStream.h>
#include <DataTypes/DataTypeFactory.h>
#include <IO/BufferBase.h>
@ -36,15 +39,73 @@ Block ParquetBlockInputStream::getHeader() const
return header;
}
void ParquetBlockInputStream::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows)
/// Inserts numeric data right into internal column data to reduce an overhead
template <typename NumericType>
void ParquetBlockInputStream::fillColumnWithNumericData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
{
IDataType::InputStreamGetter input_stream_getter = [&] (const IDataType::SubstreamPath &) { return &istr; };
type.deserializeBinaryBulkWithMultipleStreams(column, input_stream_getter, rows, /* avg_value_size_hint = */0, false, {});
PaddedPODArray<NumericType> & column_data = static_cast<ColumnVector<NumericType> &>(*internal_column).getData();
column_data.reserve(arrow_column->length());
if (column.size() != rows)
throw Exception("Cannot read all data in ParquetBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA);
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->data()->num_chunks()); ++chunk_i)
{
std::shared_ptr<arrow::Array> chunk = arrow_column->data()->chunk(chunk_i);
/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
const NumericType * raw_data = reinterpret_cast<const NumericType *>(buffer->data());
column_data.insert_assume_reserved(raw_data, raw_data + chunk->length());
}
}
/// Inserts chars and offsets right into internal column data to reduce an overhead.
/// Internal offsets are shifted by one to the right in comparison with Arrow ones. So the last offset should map to the end of all chars.
/// Also internal strings are null terminated.
void ParquetBlockInputStream::fillColumnWithStringData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
{
PaddedPODArray<UInt8> & column_chars_t = static_cast<ColumnString &>(*internal_column).getChars();
PaddedPODArray<UInt64> & column_offsets = static_cast<ColumnString &>(*internal_column).getOffsets();
size_t chars_t_size = 0;
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->data()->num_chunks()); ++chunk_i)
{
arrow::BinaryArray & chunk = static_cast<arrow::BinaryArray &>(*(arrow_column->data()->chunk(chunk_i)));
const size_t chunk_length = chunk.length();
chars_t_size += chunk.value_offset(chunk_length - 1) + chunk.value_length(chunk_length - 1);
chars_t_size += chunk_length; /// additional space for null bytes
}
column_chars_t.reserve(chars_t_size);
column_offsets.reserve(arrow_column->length());
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->data()->num_chunks()); ++chunk_i)
{
arrow::BinaryArray & chunk = static_cast<arrow::BinaryArray &>(*(arrow_column->data()->chunk(chunk_i)));
std::shared_ptr<arrow::Buffer> buffer = chunk.value_data();
const size_t chunk_length = chunk.length();
for (size_t offset_i = 0; offset_i != chunk_length; ++offset_i)
{
const UInt8 * raw_data = buffer->data() + chunk.value_offset(offset_i);
column_chars_t.insert_assume_reserved(raw_data, raw_data + chunk.value_length(offset_i));
column_chars_t.emplace_back('\0');
column_offsets.emplace_back(column_chars_t.size());
}
}
}
#define FOR_ARROW_NUMERIC_TYPES(M) \
M(arrow::Type::UINT8, UInt8) \
M(arrow::Type::INT8, Int8) \
M(arrow::Type::UINT16, UInt16) \
M(arrow::Type::INT16, Int16) \
M(arrow::Type::UINT32, UInt32) \
M(arrow::Type::INT32, Int32) \
M(arrow::Type::UINT64, UInt64) \
M(arrow::Type::INT64, Int64) \
M(arrow::Type::FLOAT, Float32) \
M(arrow::Type::DOUBLE, Float64)
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::Column>>;
@ -87,9 +148,9 @@ Block ParquetBlockInputStream::readImpl()
name_to_column_ptr[arrow_column->name()] = arrow_column;
}
for (size_t i = 0; i != header.columns(); ++i)
for (size_t column_i = 0; column_i != header.columns(); ++column_i)
{
ColumnWithTypeAndName header_column = header.getByPosition(i);
ColumnWithTypeAndName header_column = header.getByPosition(column_i);
if (name_to_column_ptr.find(header_column.name) == name_to_column_ptr.end())
// TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable?
@ -102,21 +163,20 @@ Block ParquetBlockInputStream::readImpl()
std::shared_ptr<arrow::Column> arrow_column = name_to_column_ptr[header_column.name];
arrow::Type::type arrow_type = arrow_column->type()->id();
if (arrow_type_to_native_type.find(arrow_type) == arrow_type_to_native_type.end())
if (arrow_type_to_internal_type.find(arrow_type) == arrow_type_to_internal_type.end())
{
throw Exception("Unsupported type " + arrow_column->type()->name() + " of a column " + arrow_column->name()/*, ErrorCodes::TODO*/);
}
DataTypePtr native_type = arrow_type_to_native_type[arrow_type];
if (header_column.type->getName() != native_type->getName())
DataTypePtr internal_type = arrow_type_to_internal_type[arrow_type];
if (header_column.type->getName() != internal_type->getName())
{
throw Exception("Input data type " + native_type->getName() + " for column \"" + header_column.name + "\" is not compatible with an actual type " + header_column.type->getName());
throw Exception("Input data type " + internal_type->getName() + " for column \"" + header_column.name + "\" is not compatible with an actual type " + header_column.type->getName());
}
ColumnWithTypeAndName column;
column.name = header_column.name;
column.type = native_type;
column.type = internal_type;
/// Data
MutableColumnPtr read_column = column.type->createColumn();
@ -125,38 +185,26 @@ Block ParquetBlockInputStream::readImpl()
if (arrow::Type::STRING == arrow_type)
{
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->data()->num_chunks()); ++chunk_i)
{
arrow::StringArray & chunk = static_cast<arrow::StringArray &>(*(arrow_column->data()->chunk(chunk_i)));
/// buffers[0] is a null bitmap and buffers[1] are actual values
// TODO: need to recalculate the size if strings are null-terminated
size_t buf_sz = chunk.data()->buffers[1]->size() + (chunk.length() * sizeof(size_t));
// TODO: naming
std::vector<char> data(buf_sz);
WriteBuffer wb(data.data(), data.size());
for (size_t string_i = 0; string_i != static_cast<size_t>(chunk.length()); ++string_i)
writeStringBinary(chunk.GetString(string_i), wb);
ReadBufferFromMemory values_buffer(data.data(), data.size());
size_t rows_num = chunk.length();
readData(*column.type, *read_column, values_buffer, rows_num);
fillColumnWithStringData(arrow_column, read_column);
}
// TODO: check that values smaller than INT32 are being read correctly
#define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
else if (ARROW_NUMERIC_TYPE == arrow_type) \
{ \
fillColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, read_column); \
}
FOR_ARROW_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
// TODO: arrow::Type::BOOLEAN
// TODO: arrow::Type::DATE32
// TODO: arrow::Type::DATE64
// TODO: add other types
else
{
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->data()->num_chunks()); ++chunk_i)
{
std::shared_ptr<arrow::Array> chunk = arrow_column->data()->chunk(chunk_i);
/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
// TODO: make less copying?
ReadBufferFromMemory values_buffer(buffer->data(), buffer->size());
size_t rows_num = chunk->length();
readData(*column.type, *read_column, values_buffer, rows_num);
}
throw Exception("Unsupported parquet type " + arrow_column->type()->name()/*, ErrorCodes::TODO*/);
}
column.column = std::move(read_column);

View File

@ -1,5 +1,7 @@
#pragma once
#include <Columns/IColumn.h>
#include <Columns/ColumnVector.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
@ -20,8 +22,6 @@ public:
String getName() const override { return "Parquet"; }
Block getHeader() const override;
static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows);
protected:
Block readImpl() override;
@ -29,8 +29,12 @@ private:
ReadBuffer & istr;
Block header;
std::unordered_map<arrow::Type::type, std::shared_ptr<IDataType>> arrow_type_to_native_type = {
{arrow::Type::BOOL, std::make_shared<DataTypeUInt8>()},
template <typename NumericType>
void fillColumnWithNumericData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column);
void fillColumnWithStringData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column);
std::unordered_map<arrow::Type::type, std::shared_ptr<IDataType>> arrow_type_to_internal_type = {
{arrow::Type::UINT8, std::make_shared<DataTypeUInt8>()},
{arrow::Type::INT8, std::make_shared<DataTypeInt8>()},
{arrow::Type::UINT16, std::make_shared<DataTypeUInt16>()},
@ -41,9 +45,10 @@ private:
{arrow::Type::INT64, std::make_shared<DataTypeInt64>()},
{arrow::Type::FLOAT, std::make_shared<DataTypeFloat32>()},
{arrow::Type::DOUBLE, std::make_shared<DataTypeFloat64>()},
// TODO:
{arrow::Type::STRING, std::make_shared<DataTypeString>()}//,
/* {arrow::Type::BINARY, Binary, ByteArrayType}, */
// TODO:
/* {arrow::Type::BOOL, std::make_shared<DataTypeUInt8>()}, */
/* {arrow::Type::DATE32, Date32, Int32Type}, */
/* {arrow::Type::DATE64, Date64, Int32Type}//, */
// TODO: add other types