diff --git a/dbms/src/DataStreams/ParquetBlockOutputStream.cpp b/dbms/src/DataStreams/ParquetBlockOutputStream.cpp index bc4d1f837d1..416bf02552d 100644 --- a/dbms/src/DataStreams/ParquetBlockOutputStream.cpp +++ b/dbms/src/DataStreams/ParquetBlockOutputStream.cpp @@ -1,6 +1,10 @@ // TODO: clean includes +#include #include #include +#include +#include +#include #include #include @@ -48,28 +52,55 @@ void checkFinishStatus(arrow::Status & finish_status, const std::string & column } template -void ParquetBlockOutputStream::fillArrowArrayWithNumericColumnData(ColumnPtr write_column, std::shared_ptr & arrow_array) -{ +void ParquetBlockOutputStream::fillArrowArrayWithNumericColumnData( + ColumnPtr write_column, + std::shared_ptr & arrow_array, + const PaddedPODArray * null_bytemap +) { const PaddedPODArray & internal_data = static_cast &>(*write_column).getData(); ArrowBuilderType numeric_builder; + arrow::Status append_status; - arrow::Status append_status = numeric_builder.AppendValues(internal_data.data(), internal_data.size()); + const UInt8 * arrow_null_bytemap_raw_ptr = nullptr; + PaddedPODArray arrow_null_bytemap; + if (null_bytemap) + { + /// Invert values since Arrow interprets 1 as a non-null value, while CH as a null + arrow_null_bytemap.reserve(null_bytemap->size()); + for (size_t i = 0; i != null_bytemap->size(); ++i) + arrow_null_bytemap.emplace_back(1 ^ (*null_bytemap)[i]); + + arrow_null_bytemap_raw_ptr = arrow_null_bytemap.data(); + } + + append_status = numeric_builder.AppendValues(internal_data.data(), internal_data.size(), arrow_null_bytemap_raw_ptr); checkAppendStatus(append_status, write_column->getName()); arrow::Status finish_status = numeric_builder.Finish(&arrow_array); checkFinishStatus(finish_status, write_column->getName()); } -void ParquetBlockOutputStream::fillArrowArrayWithStringColumnData(ColumnPtr write_column, std::shared_ptr & arrow_array) -{ +void ParquetBlockOutputStream::fillArrowArrayWithStringColumnData( + ColumnPtr write_column, + std::shared_ptr & arrow_array, + const PaddedPODArray * null_bytemap +) { const ColumnString & internal_column = static_cast(*write_column); arrow::StringBuilder string_builder; + arrow::Status append_status; for (size_t string_i = 0; string_i != internal_column.size(); ++string_i) { - StringRef string_ref = internal_column.getDataAt(string_i); + if (null_bytemap && (*null_bytemap)[string_i]) + { + append_status = string_builder.AppendNull(); + } + else + { + StringRef string_ref = internal_column.getDataAt(string_i); + append_status = string_builder.Append(string_ref.data, string_ref.size); + } - arrow::Status append_status = string_builder.Append(string_ref.data, string_ref.size); checkAppendStatus(append_status, write_column->getName()); } @@ -77,15 +108,23 @@ void ParquetBlockOutputStream::fillArrowArrayWithStringColumnData(ColumnPtr writ checkFinishStatus(finish_status, write_column->getName()); } -void ParquetBlockOutputStream::fillArrowArrayWithDateColumnData(ColumnPtr write_column, std::shared_ptr & arrow_array) -{ +void ParquetBlockOutputStream::fillArrowArrayWithDateColumnData( + ColumnPtr write_column, + std::shared_ptr & arrow_array, + const PaddedPODArray * null_bytemap +) { const PaddedPODArray & internal_data = static_cast &>(*write_column).getData(); arrow::Date32Builder date32_builder; + arrow::Status append_status; for (size_t value_i = 0; value_i != internal_data.size(); ++value_i) { - /// Implicitly converts UInt16 to Int32 - arrow::Status append_status = date32_builder.Append(internal_data[value_i]); + if (null_bytemap && (*null_bytemap)[value_i]) + append_status = date32_builder.AppendNull(); + else + /// Implicitly converts UInt16 to Int32 + append_status = date32_builder.Append(internal_data[value_i]); + checkAppendStatus(append_status, write_column->getName()); } @@ -119,12 +158,20 @@ const std::unordered_map> ParquetBlockO {"Date", arrow::date32()}, + // TODO: ClickHouse can actually store non-utf8 strings! {"String", arrow::utf8()}//, // TODO: add other types: // 1. FixedString // 2. DateTime }; +const PaddedPODArray * extractNullBytemapPtr(ColumnPtr column) +{ + ColumnPtr null_column = static_cast(*column).getNullMapColumnPtr(); + const PaddedPODArray & null_bytemap = static_cast &>(*null_column).getData(); + return &null_bytemap; +} + void ParquetBlockOutputStream::write(const Block & block) { block.checkNumberOfRows(); @@ -139,38 +186,62 @@ void ParquetBlockOutputStream::write(const Block & block) for (size_t column_i = 0; column_i < columns_num; ++column_i) { + // TODO: constructed every iteration const ColumnWithTypeAndName & column = block.safeGetByPosition(column_i); - // TODO: support NULLs - arrow_fields.emplace_back(new arrow::Field(column.name, internal_type_to_arrow_type.at(column.type->getName()), /*nullable = */false)); + const bool is_column_nullable = column.type->isNullable(); + const DataTypePtr column_nested_type = + is_column_nullable + ? static_cast(column.type.get())->getNestedType() + : column.type; + const DataTypePtr column_type = column.type; + // TODO: do not mix std::string and String + const std::string column_nested_type_name = column_nested_type->getName(); + + if (internal_type_to_arrow_type.find(column_nested_type_name) == internal_type_to_arrow_type.end()) + { + throw Exception( + "The type \"" + column_nested_type_name + "\" of a column \"" + column.name + "\"" + " is not supported for conversion into a Parquet data format" + /*, ErrorCodes::TODO*/ + ); + } + + arrow_fields.emplace_back(new arrow::Field( + column.name, + internal_type_to_arrow_type.at(column_nested_type_name), + is_column_nullable + )); std::shared_ptr arrow_array; - String internal_type_name = column.type->getName(); + ColumnPtr nested_column = is_column_nullable ? static_cast(*column.column).getNestedColumnPtr() : column.column; + const PaddedPODArray * null_bytemap = is_column_nullable ? extractNullBytemapPtr(column.column) : nullptr; - if ("String" == internal_type_name) + // TODO: use typeid_cast + if ("String" == column_nested_type_name) { - fillArrowArrayWithStringColumnData(column.column, arrow_array); + fillArrowArrayWithStringColumnData(nested_column, arrow_array, null_bytemap); + } + else if ("Date" == column_nested_type_name) + { + fillArrowArrayWithDateColumnData(nested_column, arrow_array, null_bytemap); } #define DISPATCH(CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE) \ - else if (#CPP_NUMERIC_TYPE == internal_type_name) \ + else if (#CPP_NUMERIC_TYPE == column_nested_type_name) \ { \ - fillArrowArrayWithNumericColumnData(column.column, arrow_array); \ + fillArrowArrayWithNumericColumnData(nested_column, arrow_array, null_bytemap); \ } FOR_INTERNAL_NUMERIC_TYPES(DISPATCH) #undef DISPATCH - else if ("Date" == internal_type_name) - { - fillArrowArrayWithDateColumnData(column.column, arrow_array); \ - } // TODO: there are also internal types that are convertable to parquet/arrow once: // 1. FixedString(N) // 2. DateTime else { throw Exception( - "Internal type " + column.type->getName() + " of a column \"" + column.name + "\" " - "is not supported for a conversion into a Parquet format"/*, ErrorCodes::TODO*/ + "Internal type \"" + column_nested_type_name + "\" of a column \"" + column.name + "\"" + " is not supported for conversion into a Parquet data format"/*, ErrorCodes::TODO*/ ); } diff --git a/dbms/src/DataStreams/ParquetBlockOutputStream.h b/dbms/src/DataStreams/ParquetBlockOutputStream.h index c6de4d0affc..896b73fba03 100644 --- a/dbms/src/DataStreams/ParquetBlockOutputStream.h +++ b/dbms/src/DataStreams/ParquetBlockOutputStream.h @@ -22,10 +22,13 @@ private: WriteBuffer & ostr; Block header; - static void fillArrowArrayWithDateColumnData(ColumnPtr write_column, std::shared_ptr & arrow_array); - static void fillArrowArrayWithStringColumnData(ColumnPtr write_column, std::shared_ptr & arrow_array); + static void fillArrowArrayWithDateColumnData(ColumnPtr write_column, std::shared_ptr & arrow_array, + const PaddedPODArray * null_bytemap); + static void fillArrowArrayWithStringColumnData(ColumnPtr write_column, std::shared_ptr & arrow_array, + const PaddedPODArray * null_bytemap); template - static void fillArrowArrayWithNumericColumnData(ColumnPtr write_column, std::shared_ptr & arrow_array); + static void fillArrowArrayWithNumericColumnData(ColumnPtr write_column, std::shared_ptr & arrow_array, + const PaddedPODArray * null_bytemap); static const std::unordered_map> internal_type_to_arrow_type; };