From fa1c35753bae9751f021f298e6f233ea1e6e773e Mon Sep 17 00:00:00 2001 From: FawnD2 Date: Sun, 3 May 2020 03:54:39 +0300 Subject: [PATCH] Move CH -> Arrow routine converter into separated class --- .../Formats/Impl/ArrowColumnToCHColumn.h | 1 + .../Formats/Impl/CHColumnToArrowColumn.cpp | 358 ++++++++++++++++++ .../Formats/Impl/CHColumnToArrowColumn.h | 35 ++ .../Formats/Impl/ParquetBlockOutputFormat.cpp | 343 +---------------- 4 files changed, 398 insertions(+), 339 deletions(-) create mode 100644 src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp create mode 100644 src/Processors/Formats/Impl/CHColumnToArrowColumn.h diff --git a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h index 8611c9117d0..75561f1ce03 100644 --- a/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h +++ b/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h @@ -1,3 +1,4 @@ +#pragma once #include "config_formats.h" #if USE_ARROW || USE_ORC || USE_PARQUET diff --git a/src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp b/src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp new file mode 100644 index 00000000000..9fd6d2d80e4 --- /dev/null +++ b/src/Processors/Formats/Impl/CHColumnToArrowColumn.cpp @@ -0,0 +1,358 @@ +#include "CHColumnToArrowColumn.h" + +#if USE_ARROW || USE_PARQUET + +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + namespace ErrorCodes + { + extern const int UNKNOWN_EXCEPTION; + extern const int UNKNOWN_TYPE; + } + + static const std::initializer_list>> internal_type_to_arrow_type = + { + {"UInt8", arrow::uint8()}, + {"Int8", arrow::int8()}, + {"UInt16", arrow::uint16()}, + {"Int16", arrow::int16()}, + {"UInt32", arrow::uint32()}, + {"Int32", arrow::int32()}, + {"UInt64", arrow::uint64()}, + {"Int64", arrow::int64()}, + {"Float32", arrow::float32()}, + {"Float64", arrow::float64()}, + + //{"Date", arrow::date64()}, + //{"Date", arrow::date32()}, + {"Date", arrow::uint16()}, // CHECK + //{"DateTime", arrow::date64()}, // BUG! saves as date32 + {"DateTime", arrow::uint32()}, + + // TODO: ClickHouse can actually store non-utf8 strings! + {"String", arrow::utf8()}, + {"FixedString", arrow::utf8()}, + }; + + static const PaddedPODArray * extractNullBytemapPtr(ColumnPtr column) + { + ColumnPtr null_column = assert_cast(*column).getNullMapColumnPtr(); + const PaddedPODArray & null_bytemap = assert_cast &>(*null_column).getData(); + return &null_bytemap; + } + + static void checkStatus(arrow::Status & status, const std::string & column_name, const std::string & format_name) + { + if (!status.ok()) + throw Exception{"Error with a " + format_name + " column \"" + column_name + "\": " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION}; + } + + template + static void fillArrowArrayWithNumericColumnData( + ColumnPtr write_column, + std::shared_ptr & arrow_array, + const PaddedPODArray * null_bytemap, + const std::string & format_name) + { + const PaddedPODArray & internal_data = assert_cast &>(*write_column).getData(); + ArrowBuilderType builder; + arrow::Status status; + + const UInt8 * arrow_null_bytemap_raw_ptr = nullptr; + PaddedPODArray arrow_null_bytemap; + if (null_bytemap) + { + /// Invert values since Arrow interprets 1 as a non-null value, while CH as a null + arrow_null_bytemap.reserve(null_bytemap->size()); + for (auto is_null : *null_bytemap) + arrow_null_bytemap.emplace_back(!is_null); + + arrow_null_bytemap_raw_ptr = arrow_null_bytemap.data(); + } + + if constexpr (std::is_same_v) + status = builder.AppendValues( + reinterpret_cast(internal_data.data()), + internal_data.size(), + reinterpret_cast(arrow_null_bytemap_raw_ptr)); + else + status = builder.AppendValues(internal_data.data(), internal_data.size(), reinterpret_cast(arrow_null_bytemap_raw_ptr)); + checkStatus(status, write_column->getName(), format_name); + + status = builder.Finish(&arrow_array); + checkStatus(status, write_column->getName(), format_name); + } + + template + static void fillArrowArrayWithStringColumnData( + ColumnPtr write_column, + std::shared_ptr & arrow_array, + const PaddedPODArray * null_bytemap, + const std::string & format_name) + { + const auto & internal_column = assert_cast(*write_column); + arrow::StringBuilder builder; + arrow::Status status; + + for (size_t string_i = 0, size = internal_column.size(); string_i < size; ++string_i) + { + if (null_bytemap && (*null_bytemap)[string_i]) + { + status = builder.AppendNull(); + } + else + { + StringRef string_ref = internal_column.getDataAt(string_i); + status = builder.Append(string_ref.data, string_ref.size); + } + + checkStatus(status, write_column->getName(), format_name); + } + + status = builder.Finish(&arrow_array); + checkStatus(status, write_column->getName(), format_name); + } + + static void fillArrowArrayWithDateColumnData( + ColumnPtr write_column, + std::shared_ptr & arrow_array, + const PaddedPODArray * null_bytemap, + const std::string & format_name) + { + const PaddedPODArray & internal_data = assert_cast &>(*write_column).getData(); + //arrow::Date32Builder date_builder; + arrow::UInt16Builder builder; + arrow::Status status; + + for (size_t value_i = 0, size = internal_data.size(); value_i < size; ++value_i) + { + if (null_bytemap && (*null_bytemap)[value_i]) + status = builder.AppendNull(); + else + /// Implicitly converts UInt16 to Int32 + status = builder.Append(internal_data[value_i]); + checkStatus(status, write_column->getName(), format_name); + } + + status = builder.Finish(&arrow_array); + checkStatus(status, write_column->getName(), format_name); + } + + static void fillArrowArrayWithDateTimeColumnData( + ColumnPtr write_column, + std::shared_ptr & arrow_array, + const PaddedPODArray * null_bytemap, + const std::string & format_name) + { + const auto & internal_data = assert_cast &>(*write_column).getData(); + //arrow::Date64Builder builder; + arrow::UInt32Builder builder; + arrow::Status status; + + for (size_t value_i = 0, size = internal_data.size(); value_i < size; ++value_i) + { + if (null_bytemap && (*null_bytemap)[value_i]) + status = builder.AppendNull(); + else + /// Implicitly converts UInt16 to Int32 + //status = date_builder.Append(static_cast(internal_data[value_i]) * 1000); // now ms. TODO check other units + status = builder.Append(internal_data[value_i]); + + checkStatus(status, write_column->getName(), format_name); + } + + status = builder.Finish(&arrow_array); + checkStatus(status, write_column->getName(), format_name); + } + + template + static void fillArrowArrayWithDecimalColumnData( + ColumnPtr write_column, + std::shared_ptr & arrow_array, + const PaddedPODArray * null_bytemap, + const DataType * decimal_type, + const std::string & format_name) + { + const auto & column = static_cast(*write_column); + arrow::DecimalBuilder builder(arrow::decimal(decimal_type->getPrecision(), decimal_type->getScale())); + arrow::Status status; + + for (size_t value_i = 0, size = column.size(); value_i < size; ++value_i) + { + if (null_bytemap && (*null_bytemap)[value_i]) + status = builder.AppendNull(); + else + status = builder.Append( + arrow::Decimal128(reinterpret_cast(&column.getElement(value_i).value))); // TODO: try copy column + + checkStatus(status, write_column->getName(), format_name); + } + status = builder.Finish(&arrow_array); + checkStatus(status, write_column->getName(), format_name); + + /* TODO column copy + const auto & internal_data = static_cast(*write_column).getData(); + //ArrowBuilderType numeric_builder; + arrow::DecimalBuilder builder(arrow::decimal(decimal_type->getPrecision(), decimal_type->getScale())); + arrow::Status status; + + const uint8_t * arrow_null_bytemap_raw_ptr = nullptr; + PaddedPODArray arrow_null_bytemap; + if (null_bytemap) + { + /// Invert values since Arrow interprets 1 as a non-null value, while CH as a null + arrow_null_bytemap.reserve(null_bytemap->size()); + for (size_t i = 0, size = null_bytemap->size(); i < size; ++i) + arrow_null_bytemap.emplace_back(1 ^ (*null_bytemap)[i]); + + arrow_null_bytemap_raw_ptr = arrow_null_bytemap.data(); + } + + if constexpr (std::is_same_v) + status = builder.AppendValues( + reinterpret_cast(internal_data.data()), + internal_data.size(), + reinterpret_cast(arrow_null_bytemap_raw_ptr)); + else + status = builder.AppendValues(internal_data.data(), internal_data.size(), reinterpret_cast(arrow_null_bytemap_raw_ptr)); + checkStatus(status, write_column->getName(), format_name); + + status = builder.Finish(&arrow_array); + checkStatus(status, write_column->getName(), format_name); + */ + } + + void CHColumnToArrowColumn::CHChunkToArrowTable( + std::shared_ptr & res, + const Block & header, + const Chunk & chunk, + size_t columns_num, + std::string format_name) + { + /// For arrow::Schema and arrow::Table creation + std::vector> arrow_fields; + std::vector> arrow_arrays; + arrow_fields.reserve(columns_num); + arrow_arrays.reserve(columns_num); + + for (size_t column_i = 0; column_i < columns_num; ++column_i) + { + // TODO: constructed every iteration + ColumnWithTypeAndName column = header.safeGetByPosition(column_i); + column.column = chunk.getColumns()[column_i]; + + const bool is_column_nullable = column.type->isNullable(); + const auto & column_nested_type + = is_column_nullable ? static_cast(column.type.get())->getNestedType() : column.type; + const std::string column_nested_type_name = column_nested_type->getFamilyName(); + + if (isDecimal(column_nested_type)) + { + const auto add_decimal_field = [&](const auto & types) -> bool { + using Types = std::decay_t; + using ToDataType = typename Types::LeftType; + + if constexpr ( + std::is_same_v> + || std::is_same_v> + || std::is_same_v>) + { + const auto & decimal_type = static_cast(column_nested_type.get()); + arrow_fields.emplace_back(std::make_shared( + column.name, arrow::decimal(decimal_type->getPrecision(), decimal_type->getScale()), is_column_nullable)); + } + + return false; + }; + callOnIndexAndDataType(column_nested_type->getTypeId(), add_decimal_field); + } + else + { + if (const auto * arrow_type_it = std::find_if(internal_type_to_arrow_type.begin(), internal_type_to_arrow_type.end(), + [=](auto && elem) { return elem.first == column_nested_type_name; }); + arrow_type_it != internal_type_to_arrow_type.end()) + { + arrow_fields.emplace_back(std::make_shared(column.name, arrow_type_it->second, is_column_nullable)); + } else + { + throw Exception{"The type \"" + column_nested_type_name + "\" of a column \"" + column.name + "\"" + " is not supported for conversion into a " + format_name + " data format", + ErrorCodes::UNKNOWN_TYPE}; + } + } + + ColumnPtr nested_column + = is_column_nullable ? assert_cast(*column.column).getNestedColumnPtr() : column.column; + const PaddedPODArray * null_bytemap = is_column_nullable ? extractNullBytemapPtr(column.column) : nullptr; + + std::shared_ptr arrow_array; + + if ("String" == column_nested_type_name) + { + fillArrowArrayWithStringColumnData(nested_column, arrow_array, null_bytemap, format_name); + } + else if ("FixedString" == column_nested_type_name) + { + fillArrowArrayWithStringColumnData(nested_column, arrow_array, null_bytemap, format_name); + } + else if ("Date" == column_nested_type_name) + { + fillArrowArrayWithDateColumnData(nested_column, arrow_array, null_bytemap, format_name); + } + else if ("DateTime" == column_nested_type_name) + { + fillArrowArrayWithDateTimeColumnData(nested_column, arrow_array, null_bytemap, format_name); + } + else if (isDecimal(column_nested_type)) + { + auto fill_decimal = [&](const auto & types) -> bool + { + using Types = std::decay_t; + using ToDataType = typename Types::LeftType; + if constexpr ( + std::is_same_v> + || std::is_same_v> + || std::is_same_v>) + { + const auto & decimal_type = static_cast(column_nested_type.get()); + fillArrowArrayWithDecimalColumnData(nested_column, arrow_array, null_bytemap, decimal_type, format_name); + } + return false; + }; + callOnIndexAndDataType(column_nested_type->getTypeId(), fill_decimal); + } +#define DISPATCH(CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE) \ + else if (#CPP_NUMERIC_TYPE == column_nested_type_name) \ + { \ + fillArrowArrayWithNumericColumnData(nested_column, arrow_array, null_bytemap, format_name); \ + } + + FOR_INTERNAL_NUMERIC_TYPES(DISPATCH) +#undef DISPATCH + else + { + throw Exception{"Internal type \"" + column_nested_type_name + "\" of a column \"" + column.name + "\"" + " is not supported for conversion into a " + format_name + " data format", + ErrorCodes::UNKNOWN_TYPE}; + } + + arrow_arrays.emplace_back(std::move(arrow_array)); + } + + std::shared_ptr arrow_schema = std::make_shared(std::move(arrow_fields)); + + res = arrow::Table::Make(arrow_schema, arrow_arrays); + } +} + +#endif diff --git a/src/Processors/Formats/Impl/CHColumnToArrowColumn.h b/src/Processors/Formats/Impl/CHColumnToArrowColumn.h new file mode 100644 index 00000000000..a2186d0d1a0 --- /dev/null +++ b/src/Processors/Formats/Impl/CHColumnToArrowColumn.h @@ -0,0 +1,35 @@ +#pragma once +#include "config_formats.h" + +#if USE_ARROW || USE_PARQUET + +#include +#include +#include + +namespace DB +{ + +class CHColumnToArrowColumn +{ +private: + +#define FOR_INTERNAL_NUMERIC_TYPES(M) \ + M(UInt8, arrow::UInt8Builder) \ + M(Int8, arrow::Int8Builder) \ + M(UInt16, arrow::UInt16Builder) \ + M(Int16, arrow::Int16Builder) \ + M(UInt32, arrow::UInt32Builder) \ + M(Int32, arrow::Int32Builder) \ + M(UInt64, arrow::UInt64Builder) \ + M(Int64, arrow::Int64Builder) \ + M(Float32, arrow::FloatBuilder) \ + M(Float64, arrow::DoubleBuilder) + + +public: + static void CHChunkToArrowTable(std::shared_ptr & res, const Block & header, const Chunk & chunk, + size_t columns_num, std::string format_name); +}; +} +#endif diff --git a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp index 2681d862c25..6a5c435f88d 100644 --- a/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp @@ -3,28 +3,19 @@ #if USE_PARQUET // TODO: clean includes -#include -#include -#include #include #include -#include #include -#include #include -#include -#include -#include #include #include #include #include -#include #include #include #include -#include #include +#include "CHColumnToArrowColumn.h" namespace DB @@ -40,217 +31,6 @@ ParquetBlockOutputFormat::ParquetBlockOutputFormat(WriteBuffer & out_, const Blo { } -static void checkStatus(arrow::Status & status, const std::string & column_name) -{ - if (!status.ok()) - throw Exception{"Error with a parquet column \"" + column_name + "\": " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION}; -} - -template -static void fillArrowArrayWithNumericColumnData( - ColumnPtr write_column, std::shared_ptr & arrow_array, const PaddedPODArray * null_bytemap) -{ - const PaddedPODArray & internal_data = assert_cast &>(*write_column).getData(); - ArrowBuilderType builder; - arrow::Status status; - - const UInt8 * arrow_null_bytemap_raw_ptr = nullptr; - PaddedPODArray arrow_null_bytemap; - if (null_bytemap) - { - /// Invert values since Arrow interprets 1 as a non-null value, while CH as a null - arrow_null_bytemap.reserve(null_bytemap->size()); - for (auto is_null : *null_bytemap) - arrow_null_bytemap.emplace_back(!is_null); - - arrow_null_bytemap_raw_ptr = arrow_null_bytemap.data(); - } - - if constexpr (std::is_same_v) - status = builder.AppendValues( - reinterpret_cast(internal_data.data()), - internal_data.size(), - reinterpret_cast(arrow_null_bytemap_raw_ptr)); - else - status = builder.AppendValues(internal_data.data(), internal_data.size(), reinterpret_cast(arrow_null_bytemap_raw_ptr)); - checkStatus(status, write_column->getName()); - - status = builder.Finish(&arrow_array); - checkStatus(status, write_column->getName()); -} - -template -static void fillArrowArrayWithStringColumnData( - ColumnPtr write_column, std::shared_ptr & arrow_array, const PaddedPODArray * null_bytemap) -{ - const auto & internal_column = assert_cast(*write_column); - arrow::StringBuilder builder; - arrow::Status status; - - for (size_t string_i = 0, size = internal_column.size(); string_i < size; ++string_i) - { - if (null_bytemap && (*null_bytemap)[string_i]) - { - status = builder.AppendNull(); - } - else - { - StringRef string_ref = internal_column.getDataAt(string_i); - status = builder.Append(string_ref.data, string_ref.size); - } - - checkStatus(status, write_column->getName()); - } - - status = builder.Finish(&arrow_array); - checkStatus(status, write_column->getName()); -} - -static void fillArrowArrayWithDateColumnData( - ColumnPtr write_column, std::shared_ptr & arrow_array, const PaddedPODArray * null_bytemap) -{ - const PaddedPODArray & internal_data = assert_cast &>(*write_column).getData(); - //arrow::Date32Builder date_builder; - arrow::UInt16Builder builder; - arrow::Status status; - - for (size_t value_i = 0, size = internal_data.size(); value_i < size; ++value_i) - { - if (null_bytemap && (*null_bytemap)[value_i]) - status = builder.AppendNull(); - else - /// Implicitly converts UInt16 to Int32 - status = builder.Append(internal_data[value_i]); - checkStatus(status, write_column->getName()); - } - - status = builder.Finish(&arrow_array); - checkStatus(status, write_column->getName()); -} - -static void fillArrowArrayWithDateTimeColumnData( - ColumnPtr write_column, std::shared_ptr & arrow_array, const PaddedPODArray * null_bytemap) -{ - const auto & internal_data = assert_cast &>(*write_column).getData(); - //arrow::Date64Builder builder; - arrow::UInt32Builder builder; - arrow::Status status; - - for (size_t value_i = 0, size = internal_data.size(); value_i < size; ++value_i) - { - if (null_bytemap && (*null_bytemap)[value_i]) - status = builder.AppendNull(); - else - /// Implicitly converts UInt16 to Int32 - //status = date_builder.Append(static_cast(internal_data[value_i]) * 1000); // now ms. TODO check other units - status = builder.Append(internal_data[value_i]); - - checkStatus(status, write_column->getName()); - } - - status = builder.Finish(&arrow_array); - checkStatus(status, write_column->getName()); -} - -template -static void fillArrowArrayWithDecimalColumnData( - ColumnPtr write_column, - std::shared_ptr & arrow_array, - const PaddedPODArray * null_bytemap, - const DataType * decimal_type) -{ - const auto & column = static_cast(*write_column); - arrow::DecimalBuilder builder(arrow::decimal(decimal_type->getPrecision(), decimal_type->getScale())); - arrow::Status status; - - for (size_t value_i = 0, size = column.size(); value_i < size; ++value_i) - { - if (null_bytemap && (*null_bytemap)[value_i]) - status = builder.AppendNull(); - else - status = builder.Append( - arrow::Decimal128(reinterpret_cast(&column.getElement(value_i).value))); // TODO: try copy column - - checkStatus(status, write_column->getName()); - } - status = builder.Finish(&arrow_array); - checkStatus(status, write_column->getName()); - -/* TODO column copy - const auto & internal_data = static_cast(*write_column).getData(); - //ArrowBuilderType numeric_builder; - arrow::DecimalBuilder builder(arrow::decimal(decimal_type->getPrecision(), decimal_type->getScale())); - arrow::Status status; - - const uint8_t * arrow_null_bytemap_raw_ptr = nullptr; - PaddedPODArray arrow_null_bytemap; - if (null_bytemap) - { - /// Invert values since Arrow interprets 1 as a non-null value, while CH as a null - arrow_null_bytemap.reserve(null_bytemap->size()); - for (size_t i = 0, size = null_bytemap->size(); i < size; ++i) - arrow_null_bytemap.emplace_back(1 ^ (*null_bytemap)[i]); - - arrow_null_bytemap_raw_ptr = arrow_null_bytemap.data(); - } - - if constexpr (std::is_same_v) - status = builder.AppendValues( - reinterpret_cast(internal_data.data()), - internal_data.size(), - reinterpret_cast(arrow_null_bytemap_raw_ptr)); - else - status = builder.AppendValues(internal_data.data(), internal_data.size(), reinterpret_cast(arrow_null_bytemap_raw_ptr)); - checkStatus(status, write_column->getName()); - - status = builder.Finish(&arrow_array); - checkStatus(status, write_column->getName()); -*/ -} - -#define FOR_INTERNAL_NUMERIC_TYPES(M) \ - M(UInt8, arrow::UInt8Builder) \ - M(Int8, arrow::Int8Builder) \ - M(UInt16, arrow::UInt16Builder) \ - M(Int16, arrow::Int16Builder) \ - M(UInt32, arrow::UInt32Builder) \ - M(Int32, arrow::Int32Builder) \ - M(UInt64, arrow::UInt64Builder) \ - M(Int64, arrow::Int64Builder) \ - M(Float32, arrow::FloatBuilder) \ - M(Float64, arrow::DoubleBuilder) - -const std::unordered_map> internal_type_to_arrow_type = { - {"UInt8", arrow::uint8()}, - {"Int8", arrow::int8()}, - {"UInt16", arrow::uint16()}, - {"Int16", arrow::int16()}, - {"UInt32", arrow::uint32()}, - {"Int32", arrow::int32()}, - {"UInt64", arrow::uint64()}, - {"Int64", arrow::int64()}, - {"Float32", arrow::float32()}, - {"Float64", arrow::float64()}, - - //{"Date", arrow::date64()}, - //{"Date", arrow::date32()}, - {"Date", arrow::uint16()}, // CHECK - //{"DateTime", arrow::date64()}, // BUG! saves as date32 - {"DateTime", arrow::uint32()}, - - // TODO: ClickHouse can actually store non-utf8 strings! - {"String", arrow::utf8()}, - {"FixedString", arrow::utf8()}, -}; - -static const PaddedPODArray * extractNullBytemapPtr(ColumnPtr column) -{ - ColumnPtr null_column = assert_cast(*column).getNullMapColumnPtr(); - const PaddedPODArray & null_bytemap = assert_cast &>(*null_column).getData(); - return &null_bytemap; -} - - class OstreamOutputStream : public arrow::io::OutputStream { public: @@ -291,131 +71,16 @@ private: void ParquetBlockOutputFormat::consume(Chunk chunk) { - const auto & header = getPort(PortKind::Main).getHeader(); + const Block & header = getPort(PortKind::Main).getHeader(); const size_t columns_num = chunk.getNumColumns(); + std::shared_ptr arrow_table; - /// For arrow::Schema and arrow::Table creation - std::vector> arrow_fields; - std::vector> arrow_arrays; - arrow_fields.reserve(columns_num); - arrow_arrays.reserve(columns_num); - - for (size_t column_i = 0; column_i < columns_num; ++column_i) - { - // TODO: constructed every iteration - ColumnWithTypeAndName column = header.safeGetByPosition(column_i); - column.column = chunk.getColumns()[column_i]; - - const bool is_column_nullable = column.type->isNullable(); - const auto & column_nested_type - = is_column_nullable ? static_cast(column.type.get())->getNestedType() : column.type; - const std::string column_nested_type_name = column_nested_type->getFamilyName(); - - if (isDecimal(column_nested_type)) - { - const auto add_decimal_field = [&](const auto & types) -> bool { - using Types = std::decay_t; - using ToDataType = typename Types::LeftType; - - if constexpr ( - std::is_same_v< - ToDataType, - DataTypeDecimal< - Decimal32>> || std::is_same_v> || std::is_same_v>) - { - const auto & decimal_type = static_cast(column_nested_type.get()); - arrow_fields.emplace_back(std::make_shared( - column.name, arrow::decimal(decimal_type->getPrecision(), decimal_type->getScale()), is_column_nullable)); - } - - return false; - }; - callOnIndexAndDataType(column_nested_type->getTypeId(), add_decimal_field); - } - else - { - if (internal_type_to_arrow_type.find(column_nested_type_name) == internal_type_to_arrow_type.end()) - { - throw Exception{"The type \"" + column_nested_type_name + "\" of a column \"" + column.name - + "\"" - " is not supported for conversion into a Parquet data format", - ErrorCodes::UNKNOWN_TYPE}; - } - - arrow_fields.emplace_back(std::make_shared(column.name, internal_type_to_arrow_type.at(column_nested_type_name), is_column_nullable)); - } - - std::shared_ptr arrow_array; - - ColumnPtr nested_column - = is_column_nullable ? assert_cast(*column.column).getNestedColumnPtr() : column.column; - const PaddedPODArray * null_bytemap = is_column_nullable ? extractNullBytemapPtr(column.column) : nullptr; - - if ("String" == column_nested_type_name) - { - fillArrowArrayWithStringColumnData(nested_column, arrow_array, null_bytemap); - } - else if ("FixedString" == column_nested_type_name) - { - fillArrowArrayWithStringColumnData(nested_column, arrow_array, null_bytemap); - } - else if ("Date" == column_nested_type_name) - { - fillArrowArrayWithDateColumnData(nested_column, arrow_array, null_bytemap); - } - else if ("DateTime" == column_nested_type_name) - { - fillArrowArrayWithDateTimeColumnData(nested_column, arrow_array, null_bytemap); - } - - else if (isDecimal(column_nested_type)) - { - auto fill_decimal = [&](const auto & types) -> bool - { - using Types = std::decay_t; - using ToDataType = typename Types::LeftType; - if constexpr ( - std::is_same_v< - ToDataType, - DataTypeDecimal< - Decimal32>> || std::is_same_v> || std::is_same_v>) - { - const auto & decimal_type = static_cast(column_nested_type.get()); - fillArrowArrayWithDecimalColumnData(nested_column, arrow_array, null_bytemap, decimal_type); - } - return false; - }; - callOnIndexAndDataType(column_nested_type->getTypeId(), fill_decimal); - } -#define DISPATCH(CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE) \ - else if (#CPP_NUMERIC_TYPE == column_nested_type_name) \ - { \ - fillArrowArrayWithNumericColumnData(nested_column, arrow_array, null_bytemap); \ - } - - FOR_INTERNAL_NUMERIC_TYPES(DISPATCH) -#undef DISPATCH - else - { - throw Exception{"Internal type \"" + column_nested_type_name + "\" of a column \"" + column.name - + "\"" - " is not supported for conversion into a Parquet data format", - ErrorCodes::UNKNOWN_TYPE}; - } - - - arrow_arrays.emplace_back(std::move(arrow_array)); - } - - std::shared_ptr arrow_schema = std::make_shared(std::move(arrow_fields)); - - std::shared_ptr arrow_table = arrow::Table::Make(arrow_schema, arrow_arrays); + CHColumnToArrowColumn::CHChunkToArrowTable(arrow_table, header, chunk, columns_num, "Parquet"); auto sink = std::make_shared(out); if (!file_writer) { - parquet::WriterProperties::Builder builder; #if USE_SNAPPY builder.compression(parquet::Compression::SNAPPY);