Merge pull request #55989 from Avogar/lc-as-arrow-dict-fix

Fix output/input of Arrow dictionary column
Author: Kruglov Pavel
Committed by GitHub on 2023-10-30 20:47:49 +01:00
Commit: 4c2a132d96
7 changed files with 66 additions and 11 deletions


@@ -158,7 +158,8 @@ void ArrowBlockInputFormat::prepareReader()
         format_settings.arrow.allow_missing_columns,
         format_settings.null_as_default,
         format_settings.date_time_overflow_behavior,
-        format_settings.arrow.case_insensitive_column_matching);
+        format_settings.arrow.case_insensitive_column_matching,
+        stream);
     if (stream)
         record_batch_total = -1;
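
ArrowBlockInputFormat handles both the Arrow IPC file format and the Arrow IPC stream format, and the extra stream argument above tells ArrowColumnToCHColumn which of the two it is reading. A minimal, hedged sketch of exercising both input paths with clickhouse-local (the file paths are illustrative, not from this PR):

    # 'Arrow' is the IPC file format; 'ArrowStream' is the IPC streaming format,
    # which constructs the reader with stream = true.
    clickhouse-local -q "SELECT * FROM file('/tmp/data.arrow', Arrow)"
    clickhouse-local -q "SELECT * FROM file('/tmp/data.arrowstream', ArrowStream)"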


@@ -1058,13 +1058,15 @@ ArrowColumnToCHColumn::ArrowColumnToCHColumn(
     bool allow_missing_columns_,
     bool null_as_default_,
     FormatSettings::DateTimeOverflowBehavior date_time_overflow_behavior_,
-    bool case_insensitive_matching_)
+    bool case_insensitive_matching_,
+    bool is_stream_)
     : header(header_)
     , format_name(format_name_)
     , allow_missing_columns(allow_missing_columns_)
     , null_as_default(null_as_default_)
-    , case_insensitive_matching(case_insensitive_matching_)
     , date_time_overflow_behavior(date_time_overflow_behavior_)
+    , case_insensitive_matching(case_insensitive_matching_)
+    , is_stream(is_stream_)
 {
 }
@@ -1164,6 +1166,10 @@ void ArrowColumnToCHColumn::arrowColumnsToCHChunk(Chunk & res, NameToColumnPtr &
         if (null_as_default)
             insertNullAsDefaultIfNeeded(column, header_column, column_i, block_missing_values);
+        /// In ArrowStream batches can have different dictionaries.
+        if (is_stream)
+            dictionary_infos.clear();
         try
         {
             column.column = castColumn(column, header_column.type);
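
The comment in the hunk above is the core of the input-side fix: in the Arrow IPC stream format every record batch may ship its own dictionary, so cached dictionary state must be dropped between batches instead of being reused. A hedged round-trip sketch (not part of the PR; table and file names are illustrative, and a running server reachable by clickhouse-client is assumed):

    # Two inserts into a merge-stopped table give two parts, so the ArrowStream
    # output contains more than one record batch with dictionary-encoded data.
    clickhouse-client -q "CREATE TABLE lc_stream_check (a LowCardinality(String)) ENGINE = MergeTree ORDER BY a"
    clickhouse-client -q "SYSTEM STOP MERGES lc_stream_check"
    clickhouse-client -q "INSERT INTO lc_stream_check VALUES ('foo'), ('bar')"
    clickhouse-client -q "INSERT INTO lc_stream_check VALUES ('baz'), ('foo')"
    clickhouse-client -q "SELECT * FROM lc_stream_check FORMAT ArrowStream SETTINGS output_format_arrow_low_cardinality_as_dictionary = 1" > /tmp/lc_stream_check.arrowstream
    # With is_stream set, dictionary_infos is rebuilt for every batch rather than
    # reused, so the decoded values should match the inserted data.
    clickhouse-local -q "SELECT * FROM file('/tmp/lc_stream_check.arrowstream', ArrowStream, 'a LowCardinality(String)')"
    clickhouse-client -q "DROP TABLE lc_stream_check"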


@@ -27,7 +27,8 @@ public:
         bool allow_missing_columns_,
         bool null_as_default_,
         FormatSettings::DateTimeOverflowBehavior date_time_overflow_behavior_,
-        bool case_insensitive_matching_ = false);
+        bool case_insensitive_matching_ = false,
+        bool is_stream_ = false);
     void arrowTableToCHChunk(Chunk & res, std::shared_ptr<arrow::Table> & table, size_t num_rows, BlockMissingValues * block_missing_values = nullptr);
@@ -56,8 +57,9 @@ private:
     /// If false, throw exception if some columns in header not exists in arrow table.
     bool allow_missing_columns;
     bool null_as_default;
-    bool case_insensitive_matching;
     FormatSettings::DateTimeOverflowBehavior date_time_overflow_behavior;
+    bool case_insensitive_matching;
+    bool is_stream;
     /// Map {column name : dictionary column}.
     /// To avoid converting dictionary from Arrow Dictionary


@@ -58,6 +58,7 @@ namespace DB
     extern const int UNKNOWN_TYPE;
     extern const int LOGICAL_ERROR;
     extern const int DECIMAL_OVERFLOW;
+    extern const int ILLEGAL_COLUMN;
 }
 static const std::initializer_list<std::pair<String, std::shared_ptr<arrow::DataType>>> internal_type_to_arrow_type =
@@ -356,6 +357,18 @@ namespace DB
     }
 }
+static void checkIfIndexesTypeIsExceeded(const std::shared_ptr<arrow::DataType> & arrow_type, size_t dict_size)
+{
+    const auto & dict_indexes_arrow_type = assert_cast<const arrow::DictionaryType *>(arrow_type.get())->index_type();
+    /// We use UInt32 or UInt64 type for indexes. It makes sense to check overflow only for UInt32.
+    const auto * indexes_uint32_type = typeid_cast<const arrow::UInt32Type *>(dict_indexes_arrow_type.get());
+    if (indexes_uint32_type && dict_size > UINT32_MAX)
+        throw Exception(
+            ErrorCodes::ILLEGAL_COLUMN,
+            "Cannot convert ClickHouse LowCardinality column to Arrow Dictionary column:"
+            " resulting dictionary size exceeds the max value of index type UInt32");
+}
 template<typename ValueType>
 static void fillArrowArrayWithLowCardinalityColumnDataImpl(
     const String & column_name,
@@ -396,6 +409,7 @@ namespace DB
         {
             const auto & new_values = new_dict.getNestedColumn();
             mapping = dict.uniqueInsertRangeFrom(*new_values, 0, new_values->size());
+            checkIfIndexesTypeIsExceeded(array_builder->type(), dict.size());
         }
     }
@@ -799,18 +813,26 @@ namespace DB
 static std::shared_ptr<arrow::DataType> getArrowTypeForLowCardinalityIndexes(ColumnPtr indexes_column)
 {
-    /// Arrow docs recommend preferring signed integers over unsigned integers for representing dictionary indices.
-    /// https://arrow.apache.org/docs/format/Columnar.html#dictionary-encoded-layout
     switch (indexes_column->getDataType())
     {
+        /// In ClickHouse blocks with same header can contain LowCardinality columns with
+        /// different dictionaries.
+        /// Arrow supports only single dictionary for all batches, but it allows to extend
+        /// dictionary if previous dictionary is a prefix of a new one.
+        /// But it can happen that LowCardinality columns contains UInt8 indexes columns
+        /// but resulting extended arrow dictionary will exceed UInt8 and we will need UInt16,
+        /// but it's not possible to change the type of Arrow dictionary indexes as it's
+        /// written in Arrow schema.
+        /// We can just always use type UInt64, but it can be inefficient.
+        /// In most cases UInt32 should be enough (with more unique values using dictionary is quite meaningless).
+        /// So we use minimum UInt32 type here (actually maybe even UInt16 will be enough).
+        /// In case if it's exceeded during dictionary extension, an exception will be thrown.
         case TypeIndex::UInt8:
-            return arrow::int8();
         case TypeIndex::UInt16:
-            return arrow::int16();
         case TypeIndex::UInt32:
-            return arrow::int32();
+            return arrow::uint32();
         case TypeIndex::UInt64:
-            return arrow::int64();
+            return arrow::uint64();
         default:
             throw Exception(ErrorCodes::LOGICAL_ERROR, "Indexes column for getUniqueIndex must be ColumnUInt, got {}.", indexes_column->getName());
     }
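
The long comment block above carries the reasoning for the output-side change: the Arrow dictionary index type is fixed once in the schema, while the shared dictionary keeps growing as batches are written, so the writer now always uses unsigned 32-bit (or 64-bit) indexes regardless of how narrow each block's LowCardinality index column is. A hedged sketch in the spirit of the regression test added below (names are illustrative; a reachable server is assumed):

    # Each part alone has 200 distinct values and fits 8-bit LowCardinality
    # indexes, but the Arrow dictionary extended across both batches holds 400
    # entries, which the previously used signed int8 index type cannot address.
    clickhouse-client -q "CREATE TABLE lc_wide_dict (a LowCardinality(String)) ENGINE = MergeTree ORDER BY a"
    clickhouse-client -q "SYSTEM STOP MERGES lc_wide_dict"
    clickhouse-client -q "INSERT INTO lc_wide_dict SELECT toString(number) FROM numbers(200)"
    clickhouse-client -q "INSERT INTO lc_wide_dict SELECT toString(number) FROM numbers(200, 200)"
    clickhouse-client -q "SELECT * FROM lc_wide_dict FORMAT Arrow SETTINGS output_format_arrow_low_cardinality_as_dictionary = 1" > /tmp/lc_wide_dict.arrow
    clickhouse-local -q "SELECT uniqExact(a) FROM file('/tmp/lc_wide_dict.arrow')"   # expected: 400
    clickhouse-client -q "DROP TABLE lc_wide_dict"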


@@ -0,0 +1,4 @@
+240
+foo
+foo
+bar


@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+# Tags: no-fasttest
+CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+# shellcheck source=../shell_config.sh
+. "$CUR_DIR"/../shell_config.sh
+${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS arrow_dicts"
+${CLICKHOUSE_CLIENT} --query="CREATE TABLE arrow_dicts (a LowCardinality(String)) engine=MergeTree order by a"
+${CLICKHOUSE_CLIENT} --query="SYSTEM STOP MERGES arrow_dicts"
+${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_dicts select toString(number) from numbers(120);"
+${CLICKHOUSE_CLIENT} --query="INSERT INTO arrow_dicts select toString(number) from numbers(120, 120);"
+${CLICKHOUSE_CLIENT} --query="SELECT * FROM arrow_dicts FORMAT Arrow SETTINGS output_format_arrow_low_cardinality_as_dictionary=1" > "${CLICKHOUSE_TMP}"/$CLICKHOUSE_TEST_UNIQUE_NAME.arrow
+${CLICKHOUSE_CLIENT} --query="DROP TABLE arrow_dicts"
+$CLICKHOUSE_LOCAL -q "select uniqExact(a) from file('$CLICKHOUSE_TMP/$CLICKHOUSE_TEST_UNIQUE_NAME.arrow')"
+$CLICKHOUSE_LOCAL -q "select * from file('$CUR_DIR/data_arrow/different_dicts.arrowstream')"