Merge pull request #61984 from kitaisreal/arrow-schema-to-clickhouse-schema-nullable-fix

Arrow schema to ClickHouse schema Nullable fix
This commit is contained in:
Kruglov Pavel 2024-04-08 12:53:49 +02:00 committed by GitHub
commit a2cfc80a39
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 367 additions and 135 deletions

View File

@ -86,7 +86,7 @@ Chunk ArrowBlockInputFormat::read()
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr;
arrow_column_to_ch_column->arrowTableToCHChunk(res, *table_result, (*table_result)->num_rows(), block_missing_values_ptr);
res = arrow_column_to_ch_column->arrowTableToCHChunk(*table_result, (*table_result)->num_rows(), block_missing_values_ptr);
/// There is no easy way to get original record batch size from Arrow metadata.
/// Let's just use the number of bytes read from read buffer.

View File

@ -78,7 +78,7 @@ namespace ErrorCodes
/// Inserts numeric data right into internal column data to reduce an overhead
template <typename NumericType, typename VectorType = ColumnVector<NumericType>>
static ColumnWithTypeAndName readColumnWithNumericData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithNumericData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
auto internal_type = std::make_shared<DataTypeNumber<NumericType>>();
auto internal_column = internal_type->createColumn();
@ -103,7 +103,7 @@ static ColumnWithTypeAndName readColumnWithNumericData(std::shared_ptr<arrow::Ch
/// Internal offsets are shifted by one to the right in comparison with Arrow ones. So the last offset should map to the end of all chars.
/// Also internal strings are null terminated.
template <typename ArrowArray>
static ColumnWithTypeAndName readColumnWithStringData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithStringData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
auto internal_type = std::make_shared<DataTypeString>();
auto internal_column = internal_type->createColumn();
@ -147,7 +147,7 @@ static ColumnWithTypeAndName readColumnWithStringData(std::shared_ptr<arrow::Chu
return {std::move(internal_column), std::move(internal_type), column_name};
}
static ColumnWithTypeAndName readColumnWithFixedStringData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithFixedStringData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
const auto * fixed_type = assert_cast<arrow::FixedSizeBinaryType *>(arrow_column->type().get());
size_t fixed_len = fixed_type->byte_width();
@ -166,7 +166,7 @@ static ColumnWithTypeAndName readColumnWithFixedStringData(std::shared_ptr<arrow
}
template <typename ValueType>
static ColumnWithTypeAndName readColumnWithBigIntegerFromFixedBinaryData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name, const DataTypePtr & column_type)
static ColumnWithTypeAndName readColumnWithBigIntegerFromFixedBinaryData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name, const DataTypePtr & column_type)
{
const auto * fixed_type = assert_cast<arrow::FixedSizeBinaryType *>(arrow_column->type().get());
size_t fixed_len = fixed_type->byte_width();
@ -193,7 +193,7 @@ static ColumnWithTypeAndName readColumnWithBigIntegerFromFixedBinaryData(std::sh
}
template <typename ColumnType, typename ValueType = typename ColumnType::ValueType>
static ColumnWithTypeAndName readColumnWithBigNumberFromBinaryData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name, const DataTypePtr & column_type)
static ColumnWithTypeAndName readColumnWithBigNumberFromBinaryData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name, const DataTypePtr & column_type)
{
size_t total_size = 0;
for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i)
@ -229,7 +229,7 @@ static ColumnWithTypeAndName readColumnWithBigNumberFromBinaryData(std::shared_p
return {std::move(internal_column), column_type, column_name};
}
static ColumnWithTypeAndName readColumnWithBooleanData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithBooleanData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
auto internal_type = DataTypeFactory::instance().get("Bool");
auto internal_column = internal_type->createColumn();
@ -248,7 +248,7 @@ static ColumnWithTypeAndName readColumnWithBooleanData(std::shared_ptr<arrow::Ch
return {std::move(internal_column), internal_type, column_name};
}
static ColumnWithTypeAndName readColumnWithDate32Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name,
static ColumnWithTypeAndName readColumnWithDate32Data(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name,
const DataTypePtr & type_hint, FormatSettings::DateTimeOverflowBehavior date_time_overflow_behavior)
{
DataTypePtr internal_type;
@ -310,7 +310,7 @@ static ColumnWithTypeAndName readColumnWithDate32Data(std::shared_ptr<arrow::Chu
}
/// Arrow stores Parquet::DATETIME in Int64, while ClickHouse stores DateTime in UInt32. Therefore, it should be checked before saving
static ColumnWithTypeAndName readColumnWithDate64Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithDate64Data(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
auto internal_type = std::make_shared<DataTypeDateTime>();
auto internal_column = internal_type->createColumn();
@ -329,7 +329,7 @@ static ColumnWithTypeAndName readColumnWithDate64Data(std::shared_ptr<arrow::Chu
return {std::move(internal_column), std::move(internal_type), column_name};
}
static ColumnWithTypeAndName readColumnWithTimestampData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithTimestampData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
const auto & arrow_type = static_cast<const arrow::TimestampType &>(*(arrow_column->type()));
const UInt8 scale = arrow_type.unit() * 3;
@ -350,7 +350,7 @@ static ColumnWithTypeAndName readColumnWithTimestampData(std::shared_ptr<arrow::
}
template <typename TimeType, typename TimeArray>
static ColumnWithTypeAndName readColumnWithTimeData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithTimeData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
const auto & arrow_type = static_cast<const TimeType &>(*(arrow_column->type()));
const UInt8 scale = arrow_type.unit() * 3;
@ -373,18 +373,18 @@ static ColumnWithTypeAndName readColumnWithTimeData(std::shared_ptr<arrow::Chunk
return {std::move(internal_column), std::move(internal_type), column_name};
}
static ColumnWithTypeAndName readColumnWithTime32Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithTime32Data(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
return readColumnWithTimeData<arrow::Time32Type, arrow::Time32Array>(arrow_column, column_name);
}
static ColumnWithTypeAndName readColumnWithTime64Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithTime64Data(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
return readColumnWithTimeData<arrow::Time64Type, arrow::Time64Array>(arrow_column, column_name);
}
template <typename DecimalType, typename DecimalArray>
static ColumnWithTypeAndName readColumnWithDecimalDataImpl(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name, DataTypePtr internal_type)
static ColumnWithTypeAndName readColumnWithDecimalDataImpl(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name, DataTypePtr internal_type)
{
auto internal_column = internal_type->createColumn();
auto & column = assert_cast<ColumnDecimal<DecimalType> &>(*internal_column);
@ -403,7 +403,7 @@ static ColumnWithTypeAndName readColumnWithDecimalDataImpl(std::shared_ptr<arrow
}
template <typename DecimalArray>
static ColumnWithTypeAndName readColumnWithDecimalData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readColumnWithDecimalData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
const auto * arrow_decimal_type = static_cast<arrow::DecimalType *>(arrow_column->type().get());
size_t precision = arrow_decimal_type->precision();
@ -418,7 +418,7 @@ static ColumnWithTypeAndName readColumnWithDecimalData(std::shared_ptr<arrow::Ch
}
/// Creates a null bytemap from arrow's null bitmap
static ColumnPtr readByteMapFromArrowColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
static ColumnPtr readByteMapFromArrowColumn(const std::shared_ptr<arrow::ChunkedArray> & arrow_column)
{
if (!arrow_column->null_count())
return ColumnUInt8::create(arrow_column->length(), 0);
@ -453,7 +453,7 @@ struct ArrowOffsetArray<arrow::LargeListArray>
};
template <typename ArrowListArray>
static ColumnPtr readOffsetsFromArrowListColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
static ColumnPtr readOffsetsFromArrowListColumn(const std::shared_ptr<arrow::ChunkedArray> & arrow_column)
{
auto offsets_column = ColumnUInt64::create();
ColumnArray::Offsets & offsets_data = assert_cast<ColumnVector<UInt64> &>(*offsets_column).getData();
@ -463,7 +463,7 @@ static ColumnPtr readOffsetsFromArrowListColumn(std::shared_ptr<arrow::ChunkedAr
{
ArrowListArray & list_chunk = dynamic_cast<ArrowListArray &>(*(arrow_column->chunk(chunk_i)));
auto arrow_offsets_array = list_chunk.offsets();
auto & arrow_offsets = dynamic_cast<ArrowOffsetArray<ArrowListArray>::type &>(*arrow_offsets_array);
auto & arrow_offsets = dynamic_cast<typename ArrowOffsetArray<ArrowListArray>::type &>(*arrow_offsets_array);
/*
* CH uses element size as "offsets", while arrow uses actual offsets as offsets.
@ -620,7 +620,7 @@ static ColumnPtr readColumnWithIndexesData(std::shared_ptr<arrow::ChunkedArray>
}
template <typename ArrowListArray>
static std::shared_ptr<arrow::ChunkedArray> getNestedArrowColumn(std::shared_ptr<arrow::ChunkedArray> & arrow_column)
static std::shared_ptr<arrow::ChunkedArray> getNestedArrowColumn(const std::shared_ptr<arrow::ChunkedArray> & arrow_column)
{
arrow::ArrayVector array_vector;
array_vector.reserve(arrow_column->num_chunks());
@ -648,7 +648,7 @@ static std::shared_ptr<arrow::ChunkedArray> getNestedArrowColumn(std::shared_ptr
return std::make_shared<arrow::ChunkedArray>(array_vector);
}
static ColumnWithTypeAndName readIPv6ColumnFromBinaryData(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readIPv6ColumnFromBinaryData(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
size_t total_size = 0;
for (int chunk_i = 0, num_chunks = arrow_column->num_chunks(); chunk_i < num_chunks; ++chunk_i)
@ -684,7 +684,7 @@ static ColumnWithTypeAndName readIPv6ColumnFromBinaryData(std::shared_ptr<arrow:
return {std::move(internal_column), std::move(internal_type), column_name};
}
static ColumnWithTypeAndName readIPv4ColumnWithInt32Data(std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
static ColumnWithTypeAndName readIPv4ColumnWithInt32Data(const std::shared_ptr<arrow::ChunkedArray> & arrow_column, const String & column_name)
{
auto internal_type = std::make_shared<DataTypeIPv4>();
auto internal_column = internal_type->createColumn();
@ -705,35 +705,31 @@ static ColumnWithTypeAndName readIPv4ColumnWithInt32Data(std::shared_ptr<arrow::
return {std::move(internal_column), std::move(internal_type), column_name};
}
static ColumnWithTypeAndName readColumnFromArrowColumn(
std::shared_ptr<arrow::ChunkedArray> & arrow_column,
const std::string & column_name,
const std::string & format_name,
bool is_nullable,
std::unordered_map<String, ArrowColumnToCHColumn::DictionaryInfo> & dictionary_infos,
bool allow_null_type,
bool skip_columns_with_unsupported_types,
bool & skipped,
FormatSettings::DateTimeOverflowBehavior date_time_overflow_behavior = FormatSettings::DateTimeOverflowBehavior::Ignore,
DataTypePtr type_hint = nullptr,
bool is_map_nested = false)
struct ReadColumnFromArrowColumnSettings
{
if (!is_nullable && (arrow_column->null_count() || (type_hint && type_hint->isNullable())) && arrow_column->type()->id() != arrow::Type::LIST
&& arrow_column->type()->id() != arrow::Type::MAP && arrow_column->type()->id() != arrow::Type::STRUCT &&
arrow_column->type()->id() != arrow::Type::DICTIONARY)
{
DataTypePtr nested_type_hint;
if (type_hint)
nested_type_hint = removeNullable(type_hint);
auto nested_column = readColumnFromArrowColumn(arrow_column, column_name, format_name, true, dictionary_infos, allow_null_type, skip_columns_with_unsupported_types, skipped, date_time_overflow_behavior, nested_type_hint);
if (skipped)
return {};
auto nullmap_column = readByteMapFromArrowColumn(arrow_column);
auto nullable_type = std::make_shared<DataTypeNullable>(std::move(nested_column.type));
auto nullable_column = ColumnNullable::create(nested_column.column, nullmap_column);
return {std::move(nullable_column), std::move(nullable_type), column_name};
}
std::string format_name;
FormatSettings::DateTimeOverflowBehavior date_time_overflow_behavior;
bool allow_arrow_null_type;
bool skip_columns_with_unsupported_types;
};
static ColumnWithTypeAndName readColumnFromArrowColumn(
const std::shared_ptr<arrow::ChunkedArray> & arrow_column,
std::string column_name,
std::unordered_map<String, ArrowColumnToCHColumn::DictionaryInfo> dictionary_infos,
DataTypePtr type_hint,
bool is_nullable_column,
bool is_map_nested_column,
const ReadColumnFromArrowColumnSettings & settings);
static ColumnWithTypeAndName readNonNullableColumnFromArrowColumn(
const std::shared_ptr<arrow::ChunkedArray> & arrow_column,
std::string column_name,
std::unordered_map<String, ArrowColumnToCHColumn::DictionaryInfo> dictionary_infos,
DataTypePtr type_hint,
bool is_map_nested_column,
const ReadColumnFromArrowColumnSettings & settings)
{
switch (arrow_column->type()->id())
{
case arrow::Type::STRING:
@ -790,7 +786,7 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
case arrow::Type::BOOL:
return readColumnWithBooleanData(arrow_column, column_name);
case arrow::Type::DATE32:
return readColumnWithDate32Data(arrow_column, column_name, type_hint, date_time_overflow_behavior);
return readColumnWithDate32Data(arrow_column, column_name, type_hint, settings.date_time_overflow_behavior);
case arrow::Type::DATE64:
return readColumnWithDate64Data(arrow_column, column_name);
// ClickHouse writes Date as arrow UINT16 and DateTime as arrow UINT32,
@ -837,9 +833,16 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
key_type_hint = map_type_hint->getKeyType();
}
}
auto arrow_nested_column = getNestedArrowColumn<arrow::ListArray>(arrow_column);
auto nested_column = readColumnFromArrowColumn(arrow_nested_column, column_name, format_name, false, dictionary_infos, allow_null_type, skip_columns_with_unsupported_types, skipped, date_time_overflow_behavior, nested_type_hint, true);
if (skipped)
auto nested_column = readColumnFromArrowColumn(arrow_nested_column,
column_name,
dictionary_infos,
nested_type_hint,
false /*is_nullable_column*/,
true /*is_map_nested_column*/,
settings);
if (!nested_column.column)
return {};
auto offsets_column = readOffsetsFromArrowListColumn<arrow::ListArray>(arrow_column);
@ -866,7 +869,7 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
case arrow::Type::LIST:
case arrow::Type::LARGE_LIST:
{
bool is_large = arrow_column->type()->id() == arrow::Type::LARGE_LIST;
bool is_large_list = arrow_column->type()->id() == arrow::Type::LARGE_LIST;
DataTypePtr nested_type_hint;
if (type_hint)
{
@ -874,12 +877,33 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
if (array_type_hint)
nested_type_hint = array_type_hint->getNestedType();
}
auto arrow_nested_column = is_large ? getNestedArrowColumn<arrow::LargeListArray>(arrow_column) : getNestedArrowColumn<arrow::ListArray>(arrow_column);
auto nested_column = readColumnFromArrowColumn(arrow_nested_column, column_name, format_name, false, dictionary_infos, allow_null_type, skip_columns_with_unsupported_types, skipped, date_time_overflow_behavior, nested_type_hint);
if (skipped)
bool is_nested_nullable_column = false;
if (is_large_list)
{
auto * arrow_large_list_type = assert_cast<arrow::LargeListType *>(arrow_column->type().get());
is_nested_nullable_column = arrow_large_list_type->value_field()->nullable();
}
else
{
auto * arrow_list_type = assert_cast<arrow::ListType *>(arrow_column->type().get());
is_nested_nullable_column = arrow_list_type->value_field()->nullable();
}
auto arrow_nested_column = is_large_list ? getNestedArrowColumn<arrow::LargeListArray>(arrow_column) : getNestedArrowColumn<arrow::ListArray>(arrow_column);
auto nested_column = readColumnFromArrowColumn(arrow_nested_column,
column_name,
dictionary_infos,
nested_type_hint,
is_nested_nullable_column,
false /*is_map_nested_column*/,
settings);
if (!nested_column.column)
return {};
auto offsets_column = is_large ? readOffsetsFromArrowListColumn<arrow::LargeListArray>(arrow_column) : readOffsetsFromArrowListColumn<arrow::ListArray>(arrow_column);
auto offsets_column = is_large_list ? readOffsetsFromArrowListColumn<arrow::LargeListArray>(arrow_column) : readOffsetsFromArrowListColumn<arrow::ListArray>(arrow_column);
auto array_column = ColumnArray::create(nested_column.column, offsets_column);
DataTypePtr array_type;
/// If type hint is Nested, we should return Nested type,
/// because we differentiate Nested and simple Array(Tuple)
@ -913,11 +937,13 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
for (int i = 0; i != arrow_struct_type->num_fields(); ++i)
{
auto field_name = arrow_struct_type->field(i)->name();
const auto & field = arrow_struct_type->field(i);
const auto & field_name = field->name();
DataTypePtr nested_type_hint;
if (tuple_type_hint)
{
if (tuple_type_hint->haveExplicitNames() && !is_map_nested)
if (tuple_type_hint->haveExplicitNames() && !is_map_nested_column)
{
auto pos = tuple_type_hint->tryGetPositionByName(field_name);
if (pos)
@ -926,13 +952,21 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
else if (size_t(i) < tuple_type_hint->getElements().size())
nested_type_hint = tuple_type_hint->getElement(i);
}
auto nested_arrow_column = std::make_shared<arrow::ChunkedArray>(nested_arrow_columns[i]);
auto element = readColumnFromArrowColumn(nested_arrow_column, field_name, format_name, false, dictionary_infos, allow_null_type, skip_columns_with_unsupported_types, skipped, date_time_overflow_behavior, nested_type_hint);
if (skipped)
auto column_with_type_and_name = readColumnFromArrowColumn(nested_arrow_column,
field_name,
dictionary_infos,
nested_type_hint,
field->nullable(),
false /*is_map_nested_column*/,
settings);
if (!column_with_type_and_name.column)
return {};
tuple_elements.emplace_back(std::move(element.column));
tuple_types.emplace_back(std::move(element.type));
tuple_names.emplace_back(std::move(element.name));
tuple_elements.emplace_back(std::move(column_with_type_and_name.column));
tuple_types.emplace_back(std::move(column_with_type_and_name.type));
tuple_names.emplace_back(std::move(column_with_type_and_name.name));
}
auto tuple_column = ColumnTuple::create(std::move(tuple_elements));
@ -953,8 +987,19 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
arrow::DictionaryArray & dict_chunk = dynamic_cast<arrow::DictionaryArray &>(*(arrow_column->chunk(chunk_i)));
dict_array.emplace_back(dict_chunk.dictionary());
}
auto arrow_dict_column = std::make_shared<arrow::ChunkedArray>(dict_array);
auto dict_column = readColumnFromArrowColumn(arrow_dict_column, column_name, format_name, false, dictionary_infos, allow_null_type, skip_columns_with_unsupported_types, skipped, date_time_overflow_behavior);
auto dict_column = readColumnFromArrowColumn(arrow_dict_column,
column_name,
dictionary_infos,
nullptr /*nested_type_hint*/,
false /*is_nullable_column*/,
false /*is_map_nested_column*/,
settings);
if (!dict_column.column)
return {};
for (size_t i = 0; i != dict_column.column->size(); ++i)
{
if (dict_column.column->isDefaultAt(i))
@ -963,6 +1008,7 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
break;
}
}
auto lc_type = std::make_shared<DataTypeLowCardinality>(is_lc_nullable ? makeNullable(dict_column.type) : dict_column.type);
auto tmp_lc_column = lc_type->createColumn();
auto tmp_dict_column = IColumn::mutate(assert_cast<ColumnLowCardinality *>(tmp_lc_column.get())->getDictionaryPtr());
@ -1002,7 +1048,7 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
// TODO: read UUID as a string?
case arrow::Type::NA:
{
if (allow_null_type)
if (settings.allow_arrow_null_type)
{
auto type = std::make_shared<DataTypeNothing>();
auto column = ColumnNothing::create(arrow_column->length());
@ -1012,11 +1058,8 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
}
default:
{
if (skip_columns_with_unsupported_types)
{
skipped = true;
if (settings.skip_columns_with_unsupported_types)
return {};
}
throw Exception(
ErrorCodes::UNKNOWN_TYPE,
@ -1024,14 +1067,59 @@ static ColumnWithTypeAndName readColumnFromArrowColumn(
"If it happens during schema inference and you want to skip columns with "
"unsupported types, you can enable setting input_format_{}"
"_skip_columns_with_unsupported_types_in_schema_inference",
format_name,
settings.format_name,
arrow_column->type()->name(),
column_name,
boost::algorithm::to_lower_copy(format_name));
boost::algorithm::to_lower_copy(settings.format_name));
}
}
}
static ColumnWithTypeAndName readColumnFromArrowColumn(
const std::shared_ptr<arrow::ChunkedArray> & arrow_column,
std::string column_name,
std::unordered_map<String, ArrowColumnToCHColumn::DictionaryInfo> dictionary_infos,
DataTypePtr type_hint,
bool is_nullable_column,
bool is_map_nested_column,
const ReadColumnFromArrowColumnSettings & settings)
{
bool read_as_nullable_column = arrow_column->null_count() || is_nullable_column || (type_hint && type_hint->isNullable());
if (read_as_nullable_column &&
arrow_column->type()->id() != arrow::Type::LIST &&
arrow_column->type()->id() != arrow::Type::LARGE_LIST &&
arrow_column->type()->id() != arrow::Type::MAP &&
arrow_column->type()->id() != arrow::Type::STRUCT &&
arrow_column->type()->id() != arrow::Type::DICTIONARY)
{
DataTypePtr nested_type_hint;
if (type_hint)
nested_type_hint = removeNullable(type_hint);
auto nested_column = readNonNullableColumnFromArrowColumn(arrow_column,
column_name,
dictionary_infos,
nested_type_hint,
is_map_nested_column,
settings);
if (!nested_column.column)
return {};
auto nullmap_column = readByteMapFromArrowColumn(arrow_column);
auto nullable_type = std::make_shared<DataTypeNullable>(std::move(nested_column.type));
auto nullable_column = ColumnNullable::create(nested_column.column, nullmap_column);
return {std::move(nullable_column), std::move(nullable_type), column_name};
}
return readNonNullableColumnFromArrowColumn(arrow_column,
column_name,
dictionary_infos,
type_hint,
is_map_nested_column,
settings);
}
// Creating CH header by arrow schema. Will be useful in task about inserting
// data from file without knowing table structure.
@ -1042,23 +1130,9 @@ static void checkStatus(const arrow::Status & status, const String & column_name
throw Exception{ErrorCodes::UNKNOWN_EXCEPTION, "Error with a {} column '{}': {}.", format_name, column_name, status.ToString()};
}
Block ArrowColumnToCHColumn::arrowSchemaToCHHeader(
const arrow::Schema & schema, const std::string & format_name,
bool skip_columns_with_unsupported_types, const Block * hint_header, bool ignore_case)
/// Create empty arrow column using specified field
static std::shared_ptr<arrow::ChunkedArray> createArrowColumn(const std::shared_ptr<arrow::Field> & field, const String & format_name)
{
ColumnsWithTypeAndName sample_columns;
std::unordered_set<String> nested_table_names;
if (hint_header)
nested_table_names = Nested::getAllTableNames(*hint_header, ignore_case);
for (const auto & field : schema.fields())
{
if (hint_header && !hint_header->has(field->name(), ignore_case)
&& !nested_table_names.contains(ignore_case ? boost::to_lower_copy(field->name()) : field->name()))
continue;
/// Create empty arrow column by it's type and convert it to ClickHouse column.
arrow::MemoryPool * pool = arrow::default_memory_pool();
std::unique_ptr<arrow::ArrayBuilder> array_builder;
arrow::Status status = MakeBuilder(pool, field->type(), &array_builder);
@ -1068,18 +1142,44 @@ Block ArrowColumnToCHColumn::arrowSchemaToCHHeader(
status = array_builder->Finish(&arrow_array);
checkStatus(status, field->name(), format_name);
arrow::ArrayVector array_vector = {arrow_array};
auto arrow_column = std::make_shared<arrow::ChunkedArray>(array_vector);
return std::make_shared<arrow::ChunkedArray>(arrow::ArrayVector{arrow_array});
}
Block ArrowColumnToCHColumn::arrowSchemaToCHHeader(
const arrow::Schema & schema,
const std::string & format_name,
bool skip_columns_with_unsupported_types)
{
ReadColumnFromArrowColumnSettings settings
{
.format_name = format_name,
.date_time_overflow_behavior = FormatSettings::DateTimeOverflowBehavior::Ignore,
.allow_arrow_null_type = false,
.skip_columns_with_unsupported_types = skip_columns_with_unsupported_types
};
ColumnsWithTypeAndName sample_columns;
for (const auto & field : schema.fields())
{
/// Create empty arrow column by it's type and convert it to ClickHouse column.
auto arrow_column = createArrowColumn(field, format_name);
std::unordered_map<std::string, DictionaryInfo> dict_infos;
bool skipped = false;
bool allow_null_type = false;
if (hint_header && hint_header->has(field->name()) && hint_header->getByName(field->name()).type->isNullable())
allow_null_type = true;
ColumnWithTypeAndName sample_column = readColumnFromArrowColumn(
arrow_column, field->name(), format_name, false, dict_infos, allow_null_type, skip_columns_with_unsupported_types, skipped);
if (!skipped)
auto sample_column = readColumnFromArrowColumn(
arrow_column,
field->name(),
dict_infos,
nullptr /*nested_type_hint*/,
field->nullable() /*is_nullable_column*/,
false /*is_map_nested_column*/,
settings);
if (sample_column.column)
sample_columns.emplace_back(std::move(sample_column));
}
return Block(std::move(sample_columns));
}
@ -1101,30 +1201,43 @@ ArrowColumnToCHColumn::ArrowColumnToCHColumn(
{
}
void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table, size_t num_rows, BlockMissingValues * block_missing_values)
Chunk ArrowColumnToCHColumn::arrowTableToCHChunk(const std::shared_ptr<arrow::Table> & table, size_t num_rows, BlockMissingValues * block_missing_values)
{
NameToColumnPtr name_to_column_ptr;
NameToArrowColumn name_to_arrow_column;
for (auto column_name : table->ColumnNames())
{
std::shared_ptr<arrow::ChunkedArray> arrow_column = table->GetColumnByName(column_name);
auto arrow_column = table->GetColumnByName(column_name);
if (!arrow_column)
throw Exception(ErrorCodes::DUPLICATE_COLUMN, "Column '{}' is duplicated", column_name);
auto arrow_field = table->schema()->GetFieldByName(column_name);
if (case_insensitive_matching)
boost::to_lower(column_name);
name_to_column_ptr[std::move(column_name)] = arrow_column;
name_to_arrow_column[std::move(column_name)] = {std::move(arrow_column), std::move(arrow_field)};
}
arrowColumnsToCHChunk(res, name_to_column_ptr, num_rows, block_missing_values);
return arrowColumnsToCHChunk(name_to_arrow_column, num_rows, block_missing_values);
}
void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr, size_t num_rows, BlockMissingValues * block_missing_values)
Chunk ArrowColumnToCHColumn::arrowColumnsToCHChunk(const NameToArrowColumn & name_to_arrow_column, size_t num_rows, BlockMissingValues * block_missing_values)
{
Columns columns_list;
columns_list.reserve(header.columns());
ReadColumnFromArrowColumnSettings settings
{
.format_name = format_name,
.date_time_overflow_behavior = date_time_overflow_behavior,
.allow_arrow_null_type = true,
.skip_columns_with_unsupported_types = false
};
Columns columns;
columns.reserve(header.columns());
std::unordered_map<String, std::pair<BlockPtr, std::shared_ptr<NestedColumnExtractHelper>>> nested_tables;
bool skipped = false;
for (size_t column_i = 0, columns = header.columns(); column_i < columns; ++column_i)
for (size_t column_i = 0, header_columns = header.columns(); column_i < header_columns; ++column_i)
{
const ColumnWithTypeAndName & header_column = header.getByPosition(column_i);
@ -1133,15 +1246,17 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
boost::to_lower(search_column_name);
ColumnWithTypeAndName column;
if (!name_to_column_ptr.contains(search_column_name))
if (!name_to_arrow_column.contains(search_column_name))
{
bool read_from_nested = false;
/// Check if it's a subcolumn from some struct.
String nested_table_name = Nested::extractTableName(header_column.name);
String search_nested_table_name = nested_table_name;
if (case_insensitive_matching)
boost::to_lower(search_nested_table_name);
if (name_to_column_ptr.contains(search_nested_table_name))
if (name_to_arrow_column.contains(search_nested_table_name))
{
if (!nested_tables.contains(search_nested_table_name))
{
@ -1153,10 +1268,19 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
}
auto nested_table_type = Nested::collect(nested_columns).front().type;
std::shared_ptr<arrow::ChunkedArray> arrow_column = name_to_column_ptr[search_nested_table_name];
ColumnsWithTypeAndName cols = {
readColumnFromArrowColumn(arrow_column, nested_table_name, format_name, false, dictionary_infos, true, false,
skipped, date_time_overflow_behavior, nested_table_type)};
const auto & arrow_column = name_to_arrow_column.find(search_nested_table_name)->second;
ColumnsWithTypeAndName cols =
{
readColumnFromArrowColumn(arrow_column.column,
nested_table_name,
dictionary_infos,
nested_table_type,
arrow_column.field->nullable() /*is_nullable_column*/,
false /*is_map_nested_column*/,
settings)
};
BlockPtr block_ptr = std::make_shared<Block>(cols);
auto column_extractor = std::make_shared<NestedColumnExtractHelper>(*block_ptr, case_insensitive_matching);
nested_tables[search_nested_table_name] = {block_ptr, column_extractor};
@ -1180,7 +1304,7 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
column.name = header_column.name;
column.type = header_column.type;
column.column = header_column.column->cloneResized(num_rows);
columns_list.push_back(std::move(column.column));
columns.push_back(std::move(column.column));
if (block_missing_values)
block_missing_values->setBits(column_i, num_rows);
continue;
@ -1189,9 +1313,14 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
}
else
{
auto arrow_column = name_to_column_ptr[search_column_name];
column = readColumnFromArrowColumn(
arrow_column, header_column.name, format_name, false, dictionary_infos, true, false, skipped, date_time_overflow_behavior, header_column.type);
const auto & arrow_column = name_to_arrow_column.find(search_column_name)->second;
column = readColumnFromArrowColumn(arrow_column.column,
header_column.name,
dictionary_infos,
header_column.type,
arrow_column.field->nullable(),
false /*is_map_nested_column*/,
settings);
}
if (null_as_default)
@ -1216,10 +1345,10 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
}
column.type = header_column.type;
columns_list.push_back(std::move(column.column));
columns.push_back(std::move(column.column));
}
res.setColumns(columns_list, num_rows);
return Chunk(std::move(columns), num_rows);
}
}

View File

@ -19,8 +19,6 @@ class Chunk;
class ArrowColumnToCHColumn
{
public:
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::ChunkedArray>>;
ArrowColumnToCHColumn(
const Block & header_,
const std::string & format_name_,
@ -30,18 +28,13 @@ public:
bool case_insensitive_matching_ = false,
bool is_stream_ = false);
void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table, size_t num_rows, BlockMissingValues * block_missing_values = nullptr);
Chunk arrowTableToCHChunk(const std::shared_ptr<arrow::Table> & table, size_t num_rows, BlockMissingValues * block_missing_values = nullptr);
void arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr & name_to_column_ptr, size_t num_rows, BlockMissingValues * block_missing_values = nullptr);
/// Transform arrow schema to ClickHouse header. If hint_header is provided,
/// we will skip columns in schema that are not in hint_header.
/// Transform arrow schema to ClickHouse header
static Block arrowSchemaToCHHeader(
const arrow::Schema & schema,
const std::string & format_name,
bool skip_columns_with_unsupported_types = false,
const Block * hint_header = nullptr,
bool ignore_case = false);
bool skip_columns_with_unsupported_types = false);
struct DictionaryInfo
{
@ -52,6 +45,16 @@ public:
private:
struct ArrowColumn
{
std::shared_ptr<arrow::ChunkedArray> column;
std::shared_ptr<arrow::Field> field;
};
using NameToArrowColumn = std::unordered_map<std::string, ArrowColumn>;
Chunk arrowColumnsToCHChunk(const NameToArrowColumn & name_to_arrow_column, size_t num_rows, BlockMissingValues * block_missing_values);
const Block & header;
const std::string format_name;
/// If false, throw exception if some columns in header not exists in arrow table.

View File

@ -71,12 +71,10 @@ Chunk ORCBlockInputFormat::read()
approx_bytes_read_for_chunk = file_reader->GetRawORCReader()->getStripe(stripe_current)->getDataLength();
++stripe_current;
Chunk res;
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr;
arrow_column_to_ch_column->arrowTableToCHChunk(res, table, num_rows, block_missing_values_ptr);
return res;
return arrow_column_to_ch_column->arrowTableToCHChunk(table, num_rows, block_missing_values_ptr);
}
void ORCBlockInputFormat::resetParser()

View File

@ -601,7 +601,7 @@ void ParquetBlockInputFormat::decodeOneChunk(size_t row_group_batch_idx, std::un
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &res.block_missing_values : nullptr;
row_group_batch.arrow_column_to_ch_column->arrowTableToCHChunk(res.chunk, *tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr);
res.chunk = row_group_batch.arrow_column_to_ch_column->arrowTableToCHChunk(*tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr);
lock.lock();

View File

@ -282,11 +282,10 @@ struct DeltaLakeMetadataParser<Configuration, MetadataReadHelper>::Impl
format_settings.date_time_overflow_behavior,
/* case_insensitive_column_matching */false);
Chunk res;
std::shared_ptr<arrow::Table> table;
THROW_ARROW_NOT_OK(reader->ReadTable(&table));
column_reader.arrowTableToCHChunk(res, table, reader->parquet_reader()->metadata()->num_rows());
Chunk res = column_reader.arrowTableToCHChunk(table, reader->parquet_reader()->metadata()->num_rows());
const auto & res_columns = res.getColumns();
if (res_columns.size() != 2)

View File

@ -0,0 +1,40 @@
Parquet
a UInt64
a_nullable Nullable(UInt64)
Arrow
a UInt64
a_nullable Nullable(UInt64)
Parquet
b Array(Nullable(UInt64))
b_nullable Array(Nullable(UInt64))
Arrow
b Array(Nullable(UInt64))
b_nullable Array(Nullable(UInt64))
Parquet
c Tuple(\n a UInt64,\n b String)
c_nullable Tuple(\n a Nullable(UInt64),\n b Nullable(String))
Arrow
c Tuple(\n a UInt64,\n b String)
c_nullable Tuple(\n a Nullable(UInt64),\n b Nullable(String))
Parquet
d Tuple(\n a UInt64,\n b Tuple(\n a UInt64,\n b String),\n d_nullable Tuple(\n a UInt64,\n b Tuple(\n a Nullable(UInt64),\n b Nullable(String))))
Arrow
d Tuple(\n a UInt64,\n b Tuple(\n a UInt64,\n b String),\n d_nullable Tuple(\n a UInt64,\n b Tuple(\n a Nullable(UInt64),\n b Nullable(String))))
Parquet
e Map(UInt64, Nullable(String))
e_nullable Map(UInt64, Nullable(String))
Arrow
e Map(UInt64, Nullable(String))
e_nullable Map(UInt64, Nullable(String))
Parquet
f Map(UInt64, Map(UInt64, Nullable(String)))
f_nullables Map(UInt64, Map(UInt64, Nullable(String)))
Arrow
f Map(UInt64, Map(UInt64, Nullable(String)))
f_nullables Map(UInt64, Map(UInt64, Nullable(String)))
Parquet
g String
g_nullable Nullable(String)
Arrow
g LowCardinality(String)
g_nullable LowCardinality(String)

View File

@ -0,0 +1,63 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
DATA_FILE=$CLICKHOUSE_TEST_UNIQUE_NAME.data
formats="Parquet Arrow"
for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('a UInt64, a_nullable Nullable(UInt64)', 42) limit 10 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('b Array(UInt64), b_nullable Array(Nullable(UInt64))', 42) limit 10 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('c Tuple(a UInt64, b String), c_nullable Tuple(a Nullable(UInt64), b Nullable(String))', 42) limit 10 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('d Tuple(a UInt64, b Tuple(a UInt64, b String), d_nullable Tuple(a UInt64, b Tuple(a Nullable(UInt64), b Nullable(String))))', 42) limit 10 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('e Map(UInt64, String), e_nullable Map(UInt64, Nullable(String))', 42) limit 10 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('f Map(UInt64, Map(UInt64, String)), f_nullables Map(UInt64, Map(UInt64, Nullable(String)))', 42) limit 10 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
for format in $formats
do
echo $format
$CLICKHOUSE_LOCAL -q "select * from generateRandom('g LowCardinality(String), g_nullable LowCardinality(Nullable(String))', 42) limit 10 settings output_format_arrow_low_cardinality_as_dictionary=1, allow_suspicious_low_cardinality_types=1 format $format" > $DATA_FILE
$CLICKHOUSE_LOCAL -q "desc file('$DATA_FILE') SETTINGS schema_inference_make_columns_nullable = 0"
done
rm $DATA_FILE