support datetime64 when transform ch chunk to arrow table

This commit is contained in:
taiyang-li 2022-02-13 14:56:01 +08:00
parent 0ed56f0255
commit 6559941972

View File

@ -2,6 +2,7 @@
#if USE_ARROW || USE_PARQUET
// #include <base/Decimal.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
@ -17,6 +18,7 @@
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <Processors/Formats/IOutputFormat.h>
#include <arrow/api.h>
#include <arrow/builder.h>
@ -55,6 +57,7 @@ namespace DB
extern const int UNKNOWN_EXCEPTION;
extern const int UNKNOWN_TYPE;
extern const int LOGICAL_ERROR;
extern const int DECIMAL_OVERFLOW;
}
static const std::initializer_list<std::pair<String, std::shared_ptr<arrow::DataType>>> internal_type_to_arrow_type =
@ -120,6 +123,42 @@ namespace DB
checkStatus(status, write_column->getName(), format_name);
}
static void fillArrowArrayWithDateTime64ColumnData(
const DataTypeDateTime64 * type,
ColumnPtr write_column,
const PaddedPODArray<UInt8> * null_bytemap,
const String & format_name,
arrow::ArrayBuilder* array_builder,
size_t start,
size_t end)
{
const auto & column = assert_cast<const ColumnDecimal<DateTime64> &>(*write_column);
arrow::TimestampBuilder & builder = assert_cast<arrow::TimestampBuilder &>(*array_builder);
arrow::Status status;
auto scale = type->getScale();
bool need_rescale = scale % 3;
auto rescale_multiplier = DecimalUtils::scaleMultiplier<DateTime64::NativeType>(3 - scale % 3);
for (size_t value_i = start; value_i < end; ++value_i)
{
if (null_bytemap && (*null_bytemap)[value_i])
{
status = builder.AppendNull();
}
else
{
auto value = static_cast<Int64>(column[value_i].get<DecimalField<DateTime64>>().getValue());
if (need_rescale)
{
if (common::mulOverflow(value, rescale_multiplier, value))
throw Exception("Decimal math overflow", ErrorCodes::DECIMAL_OVERFLOW);
}
status = builder.Append(value);
}
checkStatus(status, write_column->getName(), format_name);
}
}
static void fillArrowArray(
const String & column_name,
ColumnPtr & column,
@ -480,6 +519,11 @@ namespace DB
if (!callOnIndexAndDataType<void>(column_type->getTypeId(), fill_decimal))
throw Exception{ErrorCodes::LOGICAL_ERROR, "Cannot fill arrow array with decimal data with type {}", column_type_name};
}
else if (isDateTime64(column_type))
{
const auto * datetime64_type = assert_cast<const DataTypeDateTime64 *>(column_type.get());
fillArrowArrayWithDateTime64ColumnData(datetime64_type, column, null_bytemap, format_name, array_builder, start, end);
}
#define DISPATCH(CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE) \
else if (#CPP_NUMERIC_TYPE == column_type_name) \
{ \
@ -546,6 +590,23 @@ namespace DB
}
}
static arrow::TimeUnit::type getArrowTimeUnit(const DataTypeDateTime64 * type)
{
UInt32 scale = type->getScale();
if (scale == 0)
return arrow::TimeUnit::SECOND;
if (scale > 0 && scale <= 3)
return arrow::TimeUnit::MILLI;
if (scale > 3 && scale <= 6)
return arrow::TimeUnit::MICRO;
return arrow::TimeUnit::NANO;
}
static String getArrowTimeZone(const DataTypeDateTime64 * type)
{
return type->getTimeZone().getTimeZone();
}
static std::shared_ptr<arrow::DataType> getArrowType(
DataTypePtr column_type, ColumnPtr column, const std::string & column_name, const std::string & format_name, bool * out_is_column_nullable)
{
@ -638,6 +699,12 @@ namespace DB
return arrow_type_it->second;
}
if (isDateTime64(column_type))
{
const auto * datetime64_type = assert_cast<const DataTypeDateTime64 *>(column_type.get());
return arrow::timestamp(getArrowTimeUnit(datetime64_type), getArrowTimeZone(datetime64_type));
}
throw Exception(ErrorCodes::UNKNOWN_TYPE,
"The type '{}' of a column '{}' is not supported for conversion into {} data format.",
column_type->getName(), column_name, format_name);