Merge pull request #48361 from Avogar/fix-arrow-dict-2

Fix serializing LowCardinality as Arrow dictionary
This commit is contained in:
Kruglov Pavel 2023-04-19 12:23:27 +02:00 committed by GitHub
commit 8053b18c05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 129 additions and 27 deletions

View File

@ -98,6 +98,7 @@ void ArrowBlockOutputFormat::prepareWriter(const std::shared_ptr<arrow::Schema>
arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchWriter>> writer_status; arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchWriter>> writer_status;
arrow::ipc::IpcWriteOptions options = arrow::ipc::IpcWriteOptions::Defaults(); arrow::ipc::IpcWriteOptions options = arrow::ipc::IpcWriteOptions::Defaults();
options.codec = *arrow::util::Codec::Create(getArrowCompression(format_settings.arrow.output_compression_method)); options.codec = *arrow::util::Codec::Create(getArrowCompression(format_settings.arrow.output_compression_method));
options.emit_dictionary_deltas = true;
// TODO: should we use arrow::ipc::IpcOptions::alignment? // TODO: should we use arrow::ipc::IpcOptions::alignment?
if (stream) if (stream)

View File

@ -201,7 +201,7 @@ namespace DB
size_t end, size_t end,
bool output_string_as_string, bool output_string_as_string,
bool output_fixed_string_as_fixed_byte_array, bool output_fixed_string_as_fixed_byte_array,
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values); std::unordered_map<String, MutableColumnPtr> & dictionary_values);
template <typename Builder> template <typename Builder>
static void fillArrowArrayWithArrayColumnData( static void fillArrowArrayWithArrayColumnData(
@ -215,7 +215,7 @@ namespace DB
size_t end, size_t end,
bool output_string_as_string, bool output_string_as_string,
bool output_fixed_string_as_fixed_byte_array, bool output_fixed_string_as_fixed_byte_array,
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values) std::unordered_map<String, MutableColumnPtr> & dictionary_values)
{ {
const auto * column_array = assert_cast<const ColumnArray *>(column.get()); const auto * column_array = assert_cast<const ColumnArray *>(column.get());
ColumnPtr nested_column = column_array->getDataPtr(); ColumnPtr nested_column = column_array->getDataPtr();
@ -246,7 +246,7 @@ namespace DB
size_t end, size_t end,
bool output_string_as_string, bool output_string_as_string,
bool output_fixed_string_as_fixed_byte_array, bool output_fixed_string_as_fixed_byte_array,
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values) std::unordered_map<String, MutableColumnPtr> & dictionary_values)
{ {
const auto * column_tuple = assert_cast<const ColumnTuple *>(column.get()); const auto * column_tuple = assert_cast<const ColumnTuple *>(column.get());
const auto * type_tuple = assert_cast<const DataTypeTuple *>(column_type.get()); const auto * type_tuple = assert_cast<const DataTypeTuple *>(column_type.get());
@ -277,7 +277,7 @@ namespace DB
} }
template<typename T> template<typename T>
static PaddedPODArray<Int64> extractIndexesImpl(ColumnPtr column, size_t start, size_t end, bool shift) static PaddedPODArray<Int64> extractIndexes(ColumnPtr column, size_t start, size_t end, bool shift)
{ {
const PaddedPODArray<T> & data = assert_cast<const ColumnVector<T> *>(column.get())->getData(); const PaddedPODArray<T> & data = assert_cast<const ColumnVector<T> *>(column.get())->getData();
PaddedPODArray<Int64> result; PaddedPODArray<Int64> result;
@ -289,23 +289,69 @@ namespace DB
return result; return result;
} }
static PaddedPODArray<Int64> extractIndexesImpl(ColumnPtr column, size_t start, size_t end, bool shift) static PaddedPODArray<Int64> extractIndexes(ColumnPtr column, size_t start, size_t end, bool shift)
{ {
switch (column->getDataType()) switch (column->getDataType())
{ {
case TypeIndex::UInt8: case TypeIndex::UInt8:
return extractIndexesImpl<UInt8>(column, start, end, shift); return extractIndexes<UInt8>(column, start, end, shift);
case TypeIndex::UInt16: case TypeIndex::UInt16:
return extractIndexesImpl<UInt16>(column, start, end, shift); return extractIndexes<UInt16>(column, start, end, shift);
case TypeIndex::UInt32: case TypeIndex::UInt32:
return extractIndexesImpl<UInt32>(column, start, end, shift); return extractIndexes<UInt32>(column, start, end, shift);
case TypeIndex::UInt64: case TypeIndex::UInt64:
return extractIndexesImpl<UInt64>(column, start, end, shift); return extractIndexes<UInt64>(column, start, end, shift);
default: default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Indexes column must be ColumnUInt, got {}.", column->getName()); throw Exception(ErrorCodes::LOGICAL_ERROR, "Indexes column must be ColumnUInt, got {}.", column->getName());
} }
} }
/// Builds the Int64 index array for rows [start, end) of `indexes`, translating every
/// index through `mapping` on the way: result[i] = mapping[indexes[start + i]] - shift.
///
/// `indexes` must be a ColumnVector<IndexesType> and `mapping` a ColumnVector<MappingType>
/// (both are checked by assert_cast). `mapping` must have an entry for every index value
/// that occurs in the requested range; no bounds check is performed here.
/// When `shift` is true, 1 is subtracted from every remapped index (the caller passes
/// `true` for nullable LowCardinality columns).
template <typename IndexesType, typename MappingType>
static PaddedPODArray<Int64> extractIndexesWithRemapping(ColumnPtr indexes, ColumnPtr mapping, size_t start, size_t end, bool shift)
{
    const PaddedPODArray<IndexesType> & indexes_data = assert_cast<const ColumnVector<IndexesType> *>(indexes.get())->getData();
    const PaddedPODArray<MappingType> & mapping_data = assert_cast<const ColumnVector<MappingType> *>(mapping.get())->getData();

    PaddedPODArray<Int64> result;
    result.reserve(end - start);
    std::transform(
        indexes_data.begin() + start,
        indexes_data.begin() + end,
        std::back_inserter(result),
        [&mapping_data, shift](IndexesType value) -> Int64
        {
            /// Widen to Int64 *before* subtracting. Doing the subtraction in the (promoted)
            /// unsigned mapping type makes `0 - 1` wrap around; for a UInt32 mapping that
            /// produced 4294967295 instead of -1, unlike every other mapping width.
            const Int64 remapped = static_cast<Int64>(mapping_data[static_cast<Int64>(value)]);
            return remapped - static_cast<Int64>(shift);
        });
    return result;
}
/// Dispatches on the element type of the `mapping` column (the element type of `indexes`
/// is already fixed by IndexesType) and forwards to the fully typed implementation.
/// Throws LOGICAL_ERROR if `mapping` is not a UInt8/UInt16/UInt32/UInt64 column.
template <typename IndexesType>
static PaddedPODArray<Int64> extractIndexesWithRemapping(ColumnPtr indexes, ColumnPtr mapping, size_t start, size_t end, bool shift)
{
    switch (mapping->getDataType())
    {
        case TypeIndex::UInt8:
            return extractIndexesWithRemapping<IndexesType, UInt8>(indexes, mapping, start, end, shift);
        case TypeIndex::UInt16:
            return extractIndexesWithRemapping<IndexesType, UInt16>(indexes, mapping, start, end, shift);
        case TypeIndex::UInt32:
            return extractIndexesWithRemapping<IndexesType, UInt32>(indexes, mapping, start, end, shift);
        case TypeIndex::UInt64:
            return extractIndexesWithRemapping<IndexesType, UInt64>(indexes, mapping, start, end, shift);
        default:
            /// It is the mapping column that failed the check, so name it in the message
            /// (previously the indexes column was reported, which pointed at the wrong column).
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Mapping column must be ColumnUInt, got {}.", mapping->getName());
    }
}
/// Entry point for index extraction with remapping: picks the typed implementation that
/// matches the element type of the `indexes` column and forwards all arguments to it.
/// Throws LOGICAL_ERROR if `indexes` is not a UInt8/UInt16/UInt32/UInt64 column.
static PaddedPODArray<Int64> extractIndexesWithRemapping(ColumnPtr indexes, ColumnPtr mapping, size_t start, size_t end, bool shift)
{
    const auto indexes_type = indexes->getDataType();

    if (indexes_type == TypeIndex::UInt8)
        return extractIndexesWithRemapping<UInt8>(indexes, mapping, start, end, shift);

    if (indexes_type == TypeIndex::UInt16)
        return extractIndexesWithRemapping<UInt16>(indexes, mapping, start, end, shift);

    if (indexes_type == TypeIndex::UInt32)
        return extractIndexesWithRemapping<UInt32>(indexes, mapping, start, end, shift);

    if (indexes_type == TypeIndex::UInt64)
        return extractIndexesWithRemapping<UInt64>(indexes, mapping, start, end, shift);

    /// Any other element type cannot be an indexes column.
    throw Exception(ErrorCodes::LOGICAL_ERROR, "Indexes column must be ColumnUInt, got {}.", indexes->getName());
}
template<typename ValueType> template<typename ValueType>
static void fillArrowArrayWithLowCardinalityColumnDataImpl( static void fillArrowArrayWithLowCardinalityColumnDataImpl(
const String & column_name, const String & column_name,
@ -318,35 +364,61 @@ namespace DB
size_t end, size_t end,
bool output_string_as_string, bool output_string_as_string,
bool output_fixed_string_as_fixed_byte_array, bool output_fixed_string_as_fixed_byte_array,
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values) std::unordered_map<String, MutableColumnPtr> & dictionary_values)
{ {
const auto * column_lc = assert_cast<const ColumnLowCardinality *>(column.get()); const auto * column_lc = assert_cast<const ColumnLowCardinality *>(column.get());
arrow::DictionaryBuilder<ValueType> * builder = assert_cast<arrow::DictionaryBuilder<ValueType> *>(array_builder); arrow::DictionaryBuilder<ValueType> * builder = assert_cast<arrow::DictionaryBuilder<ValueType> *>(array_builder);
auto & dict_values = dictionary_values[column_name]; auto & dict_values = dictionary_values[column_name];
bool is_nullable = column_type->isLowCardinalityNullable(); bool is_nullable = column_type->isLowCardinalityNullable();
/// Convert dictionary from LowCardinality to Arrow dictionary only once and then reuse it. ColumnPtr mapping;
if (!dict_values) if (!dict_values)
{ {
/// On first time just remember the first dictionary
dict_values = IColumn::mutate(column_lc->getDictionaryPtr());
}
else
{
/// In ClickHouse, blocks with the same header can contain LowCardinality columns with
/// different dictionaries.
/// Arrow supports only a single dictionary for all batches, but it allows extending
/// the dictionary if the previous dictionary is a prefix of the new one.
/// So, if the new LowCardinality column has a different dictionary,
/// we extend the previous one using IColumnUnique::uniqueInsertRangeFrom
/// and then remap the indexes so that they match the new, extended dictionary.
const auto & new_dict = column_lc->getDictionary();
auto & dict = dynamic_cast<IColumnUnique &>(*dict_values);
if (dict.getHash() != new_dict.getHash())
{
const auto & new_values = new_dict.getNestedColumn();
mapping = dict.uniqueInsertRangeFrom(*new_values, 0, new_values->size());
}
}
/// Convert dictionary values to arrow array.
auto value_type = assert_cast<arrow::DictionaryType *>(builder->type().get())->value_type(); auto value_type = assert_cast<arrow::DictionaryType *>(builder->type().get())->value_type();
std::unique_ptr<arrow::ArrayBuilder> values_builder; std::unique_ptr<arrow::ArrayBuilder> values_builder;
arrow::MemoryPool* pool = arrow::default_memory_pool(); arrow::MemoryPool* pool = arrow::default_memory_pool();
arrow::Status status = MakeBuilder(pool, value_type, &values_builder); arrow::Status status = MakeBuilder(pool, value_type, &values_builder);
checkStatus(status, column->getName(), format_name); checkStatus(status, column->getName(), format_name);
auto dict_column = column_lc->getDictionary().getNestedNotNullableColumn(); auto dict_column = dynamic_cast<IColumnUnique &>(*dict_values).getNestedNotNullableColumn();
const auto & dict_type = removeNullable(assert_cast<const DataTypeLowCardinality *>(column_type.get())->getDictionaryType()); const auto & dict_type = removeNullable(assert_cast<const DataTypeLowCardinality *>(column_type.get())->getDictionaryType());
fillArrowArray(column_name, dict_column, dict_type, nullptr, values_builder.get(), format_name, is_nullable, dict_column->size(), output_string_as_string, output_fixed_string_as_fixed_byte_array, dictionary_values); fillArrowArray(column_name, dict_column, dict_type, nullptr, values_builder.get(), format_name, is_nullable, dict_column->size(), output_string_as_string, output_fixed_string_as_fixed_byte_array, dictionary_values);
status = values_builder->Finish(&dict_values); std::shared_ptr<arrow::Array> arrow_dict_array;
status = values_builder->Finish(&arrow_dict_array);
checkStatus(status, column->getName(), format_name); checkStatus(status, column->getName(), format_name);
}
arrow::Status status = builder->InsertMemoValues(*dict_values); status = builder->InsertMemoValues(*arrow_dict_array);
checkStatus(status, column->getName(), format_name); checkStatus(status, column->getName(), format_name);
/// AppendIndices in DictionaryBuilder works only with int64_t data, so we cannot use /// AppendIndices in DictionaryBuilder works only with int64_t data, so we cannot use
/// fillArrowArray here and should copy all indexes to int64_t container. /// fillArrowArray here and should copy all indexes to int64_t container.
auto indexes = extractIndexesImpl(column_lc->getIndexesPtr(), start, end, is_nullable); PaddedPODArray<Int64> indexes;
if (mapping)
indexes = extractIndexesWithRemapping(column_lc->getIndexesPtr(), mapping, start, end, is_nullable);
else
indexes = extractIndexes(column_lc->getIndexesPtr(), start, end, is_nullable);
const uint8_t * arrow_null_bytemap_raw_ptr = nullptr; const uint8_t * arrow_null_bytemap_raw_ptr = nullptr;
PaddedPODArray<uint8_t> arrow_null_bytemap; PaddedPODArray<uint8_t> arrow_null_bytemap;
if (column_type->isLowCardinalityNullable()) if (column_type->isLowCardinalityNullable())
@ -374,7 +446,7 @@ namespace DB
size_t end, size_t end,
bool output_string_as_string, bool output_string_as_string,
bool output_fixed_string_as_fixed_byte_array, bool output_fixed_string_as_fixed_byte_array,
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values) std::unordered_map<String, MutableColumnPtr> & dictionary_values)
{ {
auto value_type = assert_cast<arrow::DictionaryType *>(array_builder->type().get())->value_type(); auto value_type = assert_cast<arrow::DictionaryType *>(array_builder->type().get())->value_type();
@ -606,7 +678,7 @@ namespace DB
size_t end, size_t end,
bool output_string_as_string, bool output_string_as_string,
bool output_fixed_string_as_fixed_byte_array, bool output_fixed_string_as_fixed_byte_array,
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values) std::unordered_map<String, MutableColumnPtr> & dictionary_values)
{ {
const String column_type_name = column_type->getFamilyName(); const String column_type_name = column_type->getFamilyName();
WhichDataType which(column_type); WhichDataType which(column_type);

View File

@ -26,7 +26,7 @@ private:
/// Map {column name : arrow dictionary}. /// Map {column name : arrow dictionary}.
/// To avoid converting dictionary from LowCardinality to Arrow /// To avoid converting dictionary from LowCardinality to Arrow
/// Dictionary every chunk we save it and reuse. /// Dictionary every chunk we save it and reuse.
std::unordered_map<std::string, std::shared_ptr<arrow::Array>> dictionary_values; std::unordered_map<std::string, MutableColumnPtr> dictionary_values;
/// We should initialize arrow fields on first call of chChunkToArrowTable, not in constructor /// We should initialize arrow fields on first call of chChunkToArrowTable, not in constructor
/// because LowCardinality column from header always has indexes type UInt8, so, we should get /// because LowCardinality column from header always has indexes type UInt8, so, we should get

View File

@ -0,0 +1,20 @@
0
1
2
3
4
5
6
7
8
9
0
1
2
3
4
5
6
7
8
9

View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Tags: no-fasttest

# Writes a LowCardinality(String) column in Arrow format with dictionary output enabled
# and a block size (7) smaller than the row count (20), then pipes the result into a
# second clickhouse-local that reads it back as Arrow and prints the values.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# Producer side: 20 rows of toString(number % 10) wrapped in LowCardinality.
write_query="select toLowCardinality(toString(number % 10)) as x from numbers(20) format Arrow settings max_block_size=7, output_format_arrow_low_cardinality_as_dictionary=1"
# Consumer side: read everything back from the piped Arrow stream.
read_query="select * from table"

$CLICKHOUSE_LOCAL -q "$write_query" \
    | $CLICKHOUSE_LOCAL -q "$read_query" --input-format='Arrow'