Parquet output: support NULL values

This commit is contained in:
luc1ph3r 2018-09-22 19:27:50 +03:00
parent f6cae6a5c3
commit cf42b10c3b
2 changed files with 101 additions and 27 deletions

View File

@ -1,6 +1,10 @@
// TODO: clean includes
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnsNumber.h>
#include <Core/ColumnWithTypeAndName.h>
#include <DataTypes/DataTypeNullable.h>
#include <IO/WriteHelpers.h>
#include <arrow/api.h>
@ -48,28 +52,55 @@ void checkFinishStatus(arrow::Status & finish_status, const std::string & column
}
/// Builds an Arrow array from a numeric column.
///
/// write_column is cast (without a runtime check) to ColumnVector<NumericType>; its whole data
/// buffer is appended to an ArrowBuilderType with a single AppendValues call and the finished
/// array is stored into arrow_array.
///
/// null_bytemap may be nullptr, in which case a null validity pointer is handed to the builder.
/// Otherwise every byte is inverted into a temporary map first, because the source map marks
/// NULL rows with 1 while Arrow expects 1 for non-null rows.
///
/// The append and finish statuses are handed to checkAppendStatus / checkFinishStatus together
/// with the column name.
///
/// NOTE(review): this span appears to interleave the pre-change and post-change lines of the
/// commit (two signatures and two AppendValues calls are visible) — confirm against the real file.
template <typename NumericType, typename ArrowBuilderType>
// Presumably the removed two-argument signature (no null map).
void ParquetBlockOutputStream::fillArrowArrayWithNumericColumnData(ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array)
{
// Presumably the added three-argument signature.
void ParquetBlockOutputStream::fillArrowArrayWithNumericColumnData(
ColumnPtr write_column,
std::shared_ptr<arrow::Array> & arrow_array,
const PaddedPODArray<UInt8> * null_bytemap
) {
const PaddedPODArray<NumericType> & internal_data = static_cast<const ColumnVector<NumericType> &>(*write_column).getData();
ArrowBuilderType numeric_builder;
arrow::Status append_status;
// Presumably the removed null-unaware append; it redeclares append_status.
arrow::Status append_status = numeric_builder.AppendValues(internal_data.data(), internal_data.size());
// Stays nullptr when the column has no null map.
const UInt8 * arrow_null_bytemap_raw_ptr = nullptr;
// Owns the inverted copy for the duration of the AppendValues call below.
PaddedPODArray<UInt8> arrow_null_bytemap;
if (null_bytemap)
{
/// Invert values since Arrow interprets 1 as a non-null value, while CH as a null
arrow_null_bytemap.reserve(null_bytemap->size());
for (size_t i = 0; i != null_bytemap->size(); ++i)
arrow_null_bytemap.emplace_back(1 ^ (*null_bytemap)[i]);
arrow_null_bytemap_raw_ptr = arrow_null_bytemap.data();
}
// Values and (optional) validity bytes are appended in one bulk call.
append_status = numeric_builder.AppendValues(internal_data.data(), internal_data.size(), arrow_null_bytemap_raw_ptr);
checkAppendStatus(append_status, write_column->getName());
arrow::Status finish_status = numeric_builder.Finish(&arrow_array);
checkFinishStatus(finish_status, write_column->getName());
}
/// Appends the rows of a string column to an arrow::StringBuilder, one row at a time.
///
/// write_column is cast (without a runtime check) to ColumnString. For every row, a non-zero
/// entry in null_bytemap (when the pointer is not nullptr) results in AppendNull; otherwise the
/// row's bytes are appended. Each append status is passed to checkAppendStatus with the column name.
///
/// NOTE(review): this span appears to interleave the pre-change and post-change lines of the
/// commit, and the end of the function is not visible here — confirm against the real file.
// Presumably the removed two-argument signature (no null map).
void ParquetBlockOutputStream::fillArrowArrayWithStringColumnData(ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array)
{
// Presumably the added three-argument signature.
void ParquetBlockOutputStream::fillArrowArrayWithStringColumnData(
ColumnPtr write_column,
std::shared_ptr<arrow::Array> & arrow_array,
const PaddedPODArray<UInt8> * null_bytemap
) {
const ColumnString & internal_column = static_cast<const ColumnString &>(*write_column);
arrow::StringBuilder string_builder;
arrow::Status append_status;
for (size_t string_i = 0; string_i != internal_column.size(); ++string_i)
{
// Presumably the removed unconditional fetch of the row data.
StringRef string_ref = internal_column.getDataAt(string_i);
// A non-zero byte in the null map marks the row as NULL.
if (null_bytemap && (*null_bytemap)[string_i])
{
append_status = string_builder.AppendNull();
}
else
{
StringRef string_ref = internal_column.getDataAt(string_i);
append_status = string_builder.Append(string_ref.data, string_ref.size);
}
// Presumably the removed null-unaware append; it redeclares append_status.
arrow::Status append_status = string_builder.Append(string_ref.data, string_ref.size);
checkAppendStatus(append_status, write_column->getName());
}
@ -77,15 +108,23 @@ void ParquetBlockOutputStream::fillArrowArrayWithStringColumnData(ColumnPtr writ
checkFinishStatus(finish_status, write_column->getName());
}
/// Appends the rows of a Date column to an arrow::Date32Builder, one row at a time.
///
/// write_column is cast (without a runtime check) to ColumnVector<UInt16>. For every row, a
/// non-zero entry in null_bytemap (when the pointer is not nullptr) results in AppendNull;
/// otherwise the UInt16 value is appended. Each append status is passed to checkAppendStatus
/// with the column name.
///
/// NOTE(review): this span appears to interleave the pre-change and post-change lines of the
/// commit, and the end of the function is not visible here — confirm against the real file.
// Presumably the removed two-argument signature (no null map).
void ParquetBlockOutputStream::fillArrowArrayWithDateColumnData(ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array)
{
// Presumably the added three-argument signature.
void ParquetBlockOutputStream::fillArrowArrayWithDateColumnData(
ColumnPtr write_column,
std::shared_ptr<arrow::Array> & arrow_array,
const PaddedPODArray<UInt8> * null_bytemap
) {
const PaddedPODArray<UInt16> & internal_data = static_cast<const ColumnVector<UInt16> &>(*write_column).getData();
arrow::Date32Builder date32_builder;
arrow::Status append_status;
for (size_t value_i = 0; value_i != internal_data.size(); ++value_i)
{
/// Implicitly converts UInt16 to Int32
// Presumably the removed null-unaware append; it redeclares append_status.
arrow::Status append_status = date32_builder.Append(internal_data[value_i]);
// A non-zero byte in the null map marks the row as NULL.
if (null_bytemap && (*null_bytemap)[value_i])
append_status = date32_builder.AppendNull();
else
/// Implicitly converts UInt16 to Int32
append_status = date32_builder.Append(internal_data[value_i]);
checkAppendStatus(append_status, write_column->getName());
}
@ -119,12 +158,20 @@ const std::unordered_map<String, std::shared_ptr<arrow::DataType>> ParquetBlockO
{"Date", arrow::date32()},
// TODO: ClickHouse can actually store non-utf8 strings!
{"String", arrow::utf8()}//,
// TODO: add other types:
// 1. FixedString
// 2. DateTime
};
/// Returns a pointer to the null byte map of a nullable column.
///
/// `column` is cast to ColumnNullable and its null-map column to ColumnVector<UInt8> without any
/// runtime check, so the caller is responsible for passing a nullable column.
/// NOTE(review): the returned pointer refers to data reached through `column`; presumably it is
/// only valid while the caller keeps that column alive — confirm against ColumnNullable.
const PaddedPODArray<UInt8> * extractNullBytemapPtr(ColumnPtr column)
{
    const auto & nullable_column = static_cast<const ColumnNullable &>(*column);
    ColumnPtr null_map_holder = nullable_column.getNullMapColumnPtr();
    const auto & null_map_vector = static_cast<const ColumnVector<UInt8> &>(*null_map_holder);
    return &null_map_vector.getData();
}
void ParquetBlockOutputStream::write(const Block & block)
{
block.checkNumberOfRows();
@ -139,38 +186,62 @@ void ParquetBlockOutputStream::write(const Block & block)
for (size_t column_i = 0; column_i < columns_num; ++column_i)
{
// TODO: constructed every iteration
const ColumnWithTypeAndName & column = block.safeGetByPosition(column_i);
// TODO: support NULLs
arrow_fields.emplace_back(new arrow::Field(column.name, internal_type_to_arrow_type.at(column.type->getName()), /*nullable = */false));
const bool is_column_nullable = column.type->isNullable();
const DataTypePtr column_nested_type =
is_column_nullable
? static_cast<const DataTypeNullable *>(column.type.get())->getNestedType()
: column.type;
const DataTypePtr column_type = column.type;
// TODO: do not mix std::string and String
const std::string column_nested_type_name = column_nested_type->getName();
if (internal_type_to_arrow_type.find(column_nested_type_name) == internal_type_to_arrow_type.end())
{
throw Exception(
"The type \"" + column_nested_type_name + "\" of a column \"" + column.name + "\""
" is not supported for conversion into a Parquet data format"
/*, ErrorCodes::TODO*/
);
}
arrow_fields.emplace_back(new arrow::Field(
column.name,
internal_type_to_arrow_type.at(column_nested_type_name),
is_column_nullable
));
std::shared_ptr<arrow::Array> arrow_array;
String internal_type_name = column.type->getName();
ColumnPtr nested_column = is_column_nullable ? static_cast<const ColumnNullable &>(*column.column).getNestedColumnPtr() : column.column;
const PaddedPODArray<UInt8> * null_bytemap = is_column_nullable ? extractNullBytemapPtr(column.column) : nullptr;
if ("String" == internal_type_name)
// TODO: use typeid_cast
if ("String" == column_nested_type_name)
{
fillArrowArrayWithStringColumnData(column.column, arrow_array);
fillArrowArrayWithStringColumnData(nested_column, arrow_array, null_bytemap);
}
else if ("Date" == column_nested_type_name)
{
fillArrowArrayWithDateColumnData(nested_column, arrow_array, null_bytemap);
}
#define DISPATCH(CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE) \
else if (#CPP_NUMERIC_TYPE == internal_type_name) \
else if (#CPP_NUMERIC_TYPE == column_nested_type_name) \
{ \
fillArrowArrayWithNumericColumnData<CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE>(column.column, arrow_array); \
fillArrowArrayWithNumericColumnData<CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE>(nested_column, arrow_array, null_bytemap); \
}
FOR_INTERNAL_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
else if ("Date" == internal_type_name)
{
fillArrowArrayWithDateColumnData(column.column, arrow_array); \
}
// TODO: there are also internal types that are convertible to parquet/arrow ones:
// 1. FixedString(N)
// 2. DateTime
else
{
throw Exception(
"Internal type " + column.type->getName() + " of a column \"" + column.name + "\" "
"is not supported for a conversion into a Parquet format"/*, ErrorCodes::TODO*/
"Internal type \"" + column_nested_type_name + "\" of a column \"" + column.name + "\""
" is not supported for conversion into a Parquet data format"/*, ErrorCodes::TODO*/
);
}

View File

@ -22,10 +22,13 @@ private:
WriteBuffer & ostr;
Block header;
/// Helpers that fill arrow_array from write_column for a Date, String or numeric column.
/// In the three-argument forms, null_bytemap is either nullptr (no null map supplied) or points
/// to one byte per row.
/// NOTE(review): both the two-argument and the three-argument declarations are visible here;
/// presumably the two-argument ones are the lines removed by this commit — confirm against the real header.
static void fillArrowArrayWithDateColumnData(ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array);
static void fillArrowArrayWithStringColumnData(ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array);
static void fillArrowArrayWithDateColumnData(ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array,
const PaddedPODArray<UInt8> * null_bytemap);
static void fillArrowArrayWithStringColumnData(ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array,
const PaddedPODArray<UInt8> * null_bytemap);
/// NumericType is the element type of the source column; ArrowBuilderType is the Arrow builder used for it.
template <typename NumericType, typename ArrowBuilderType>
static void fillArrowArrayWithNumericColumnData(ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array);
static void fillArrowArrayWithNumericColumnData(ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array,
const PaddedPODArray<UInt8> * null_bytemap);
static const std::unordered_map<String, std::shared_ptr<arrow::DataType>> internal_type_to_arrow_type;
};