Fix serializing LowCardinality as Arrow dictionary

This commit is contained in:
avogar 2023-04-03 19:36:43 +00:00
parent fbb22348ea
commit b2e52f80cd
5 changed files with 132 additions and 27 deletions

View File

@ -98,6 +98,7 @@ void ArrowBlockOutputFormat::prepareWriter(const std::shared_ptr<arrow::Schema>
arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchWriter>> writer_status;
arrow::ipc::IpcWriteOptions options = arrow::ipc::IpcWriteOptions::Defaults();
options.codec = *arrow::util::Codec::Create(getArrowCompression(format_settings.arrow.output_compression_method));
options.emit_dictionary_deltas = true;
// TODO: should we use arrow::ipc::IpcOptions::alignment?
if (stream)

View File

@ -194,7 +194,7 @@ namespace DB
size_t end,
bool output_string_as_string,
bool output_fixed_string_as_fixed_byte_array,
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values);
std::unordered_map<String, MutableColumnPtr> & dictionary_values);
template <typename Builder>
static void fillArrowArrayWithArrayColumnData(
@ -208,7 +208,7 @@ namespace DB
size_t end,
bool output_string_as_string,
bool output_fixed_string_as_fixed_byte_array,
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values)
std::unordered_map<String, MutableColumnPtr> & dictionary_values)
{
const auto * column_array = assert_cast<const ColumnArray *>(column.get());
ColumnPtr nested_column = column_array->getDataPtr();
@ -239,7 +239,7 @@ namespace DB
size_t end,
bool output_string_as_string,
bool output_fixed_string_as_fixed_byte_array,
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values)
std::unordered_map<String, MutableColumnPtr> & dictionary_values)
{
const auto * column_tuple = assert_cast<const ColumnTuple *>(column.get());
const auto * type_tuple = assert_cast<const DataTypeTuple *>(column_type.get());
@ -270,7 +270,7 @@ namespace DB
}
template<typename T>
static PaddedPODArray<Int64> extractIndexesImpl(ColumnPtr column, size_t start, size_t end, bool shift)
static PaddedPODArray<Int64> extractIndexes(ColumnPtr column, size_t start, size_t end, bool shift)
{
const PaddedPODArray<T> & data = assert_cast<const ColumnVector<T> *>(column.get())->getData();
PaddedPODArray<Int64> result;
@ -282,23 +282,72 @@ namespace DB
return result;
}
static PaddedPODArray<Int64> extractIndexesImpl(ColumnPtr column, size_t start, size_t end, bool shift)
static PaddedPODArray<Int64> extractIndexes(ColumnPtr column, size_t start, size_t end, bool shift)
{
switch (column->getDataType())
{
case TypeIndex::UInt8:
return extractIndexesImpl<UInt8>(column, start, end, shift);
return extractIndexes<UInt8>(column, start, end, shift);
case TypeIndex::UInt16:
return extractIndexesImpl<UInt16>(column, start, end, shift);
return extractIndexes<UInt16>(column, start, end, shift);
case TypeIndex::UInt32:
return extractIndexesImpl<UInt32>(column, start, end, shift);
return extractIndexes<UInt32>(column, start, end, shift);
case TypeIndex::UInt64:
return extractIndexesImpl<UInt64>(column, start, end, shift);
return extractIndexes<UInt64>(column, start, end, shift);
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Indexes column must be ColumnUInt, got {}.", column->getName());
}
}
template <typename IndexesType, typename MappingType>
static PaddedPODArray<Int64> extractIndexesWithRemapping(ColumnPtr indexes, ColumnPtr mapping, size_t start, size_t end, bool shift)
{
const PaddedPODArray<IndexesType> & indexes_data = assert_cast<const ColumnVector<IndexesType> *>(indexes.get())->getData();
const PaddedPODArray<MappingType> & mapping_data = assert_cast<const ColumnVector<MappingType> *>(mapping.get())->getData();
PaddedPODArray<Int64> result;
result.reserve(end - start);
if (shift)
std::transform(indexes_data.begin() + start, indexes_data.begin() + end, std::back_inserter(result), [&](IndexesType value) { return mapping_data[Int64(value)] - 1; });
else
std::transform(indexes_data.begin() + start, indexes_data.begin() + end, std::back_inserter(result), [&](IndexesType value) { return mapping_data[Int64(value)]; });
return result;
}
template <typename IndexesType>
static PaddedPODArray<Int64> extractIndexesWithRemapping(ColumnPtr indexes, ColumnPtr mapping, size_t start, size_t end, bool shift)
{
switch (mapping->getDataType())
{
case TypeIndex::UInt8:
return extractIndexesWithRemapping<IndexesType, UInt8>(indexes, mapping, start, end, shift);
case TypeIndex::UInt16:
return extractIndexesWithRemapping<IndexesType, UInt16>(indexes, mapping, start, end, shift);
case TypeIndex::UInt32:
return extractIndexesWithRemapping<IndexesType, UInt32>(indexes, mapping, start, end, shift);
case TypeIndex::UInt64:
return extractIndexesWithRemapping<IndexesType, UInt64>(indexes, mapping, start, end, shift);
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Indexes column must be ColumnUInt, got {}.", indexes->getName());
}
}
static PaddedPODArray<Int64> extractIndexesWithRemapping(ColumnPtr indexes, ColumnPtr mapping, size_t start, size_t end, bool shift)
{
switch (indexes->getDataType())
{
case TypeIndex::UInt8:
return extractIndexesWithRemapping<UInt8>(indexes, mapping, start, end, shift);
case TypeIndex::UInt16:
return extractIndexesWithRemapping<UInt16>(indexes, mapping, start, end, shift);
case TypeIndex::UInt32:
return extractIndexesWithRemapping<UInt32>(indexes, mapping, start, end, shift);
case TypeIndex::UInt64:
return extractIndexesWithRemapping<UInt64>(indexes, mapping, start, end, shift);
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Indexes column must be ColumnUInt, got {}.", indexes->getName());
}
}
template<typename ValueType>
static void fillArrowArrayWithLowCardinalityColumnDataImpl(
const String & column_name,
@ -311,35 +360,61 @@ namespace DB
size_t end,
bool output_string_as_string,
bool output_fixed_string_as_fixed_byte_array,
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values)
std::unordered_map<String, MutableColumnPtr> & dictionary_values)
{
const auto * column_lc = assert_cast<const ColumnLowCardinality *>(column.get());
arrow::DictionaryBuilder<ValueType> * builder = assert_cast<arrow::DictionaryBuilder<ValueType> *>(array_builder);
auto & dict_values = dictionary_values[column_name];
bool is_nullable = column_type->isLowCardinalityNullable();
/// Convert dictionary from LowCardinality to Arrow dictionary only once and then reuse it.
ColumnPtr mapping;
if (!dict_values)
{
auto value_type = assert_cast<arrow::DictionaryType *>(builder->type().get())->value_type();
std::unique_ptr<arrow::ArrayBuilder> values_builder;
arrow::MemoryPool* pool = arrow::default_memory_pool();
arrow::Status status = MakeBuilder(pool, value_type, &values_builder);
checkStatus(status, column->getName(), format_name);
auto dict_column = column_lc->getDictionary().getNestedNotNullableColumn();
const auto & dict_type = removeNullable(assert_cast<const DataTypeLowCardinality *>(column_type.get())->getDictionaryType());
fillArrowArray(column_name, dict_column, dict_type, nullptr, values_builder.get(), format_name, is_nullable, dict_column->size(), output_string_as_string, output_fixed_string_as_fixed_byte_array, dictionary_values);
status = values_builder->Finish(&dict_values);
checkStatus(status, column->getName(), format_name);
/// On first time just remember the first dictionary
dict_values = IColumn::mutate(column_lc->getDictionaryPtr());
}
else
{
/// In ClickHouse blocks with same header can contain LowCardinality columns with
/// different dictionaries.
/// Arrow supports only single dictionary for all batches, but it allows to extend
/// dictionary if previous dictionary is a prefix of a new one.
/// So, if new LowCardinality column has different dictionary
/// we extend previous one by using IColumnUnique::uniqueInsertRangeFrom
/// and then remap indexes so they match with the new extended dictionary.
const auto & new_dict = column_lc->getDictionary();
auto & dict = dynamic_cast<IColumnUnique &>(*dict_values);
if (dict.getHash() != new_dict.getHash())
{
const auto & new_values = new_dict.getNestedColumn();
mapping = dict.uniqueInsertRangeFrom(*new_values, 0, new_values->size());
}
}
arrow::Status status = builder->InsertMemoValues(*dict_values);
/// Convert dictionary values to arrow array.
auto value_type = assert_cast<arrow::DictionaryType *>(builder->type().get())->value_type();
std::unique_ptr<arrow::ArrayBuilder> values_builder;
arrow::MemoryPool* pool = arrow::default_memory_pool();
arrow::Status status = MakeBuilder(pool, value_type, &values_builder);
checkStatus(status, column->getName(), format_name);
auto dict_column = dynamic_cast<IColumnUnique &>(*dict_values).getNestedNotNullableColumn();
const auto & dict_type = removeNullable(assert_cast<const DataTypeLowCardinality *>(column_type.get())->getDictionaryType());
fillArrowArray(column_name, dict_column, dict_type, nullptr, values_builder.get(), format_name, is_nullable, dict_column->size(), output_string_as_string, output_fixed_string_as_fixed_byte_array, dictionary_values);
std::shared_ptr<arrow::Array> arrow_dict_array;
status = values_builder->Finish(&arrow_dict_array);
checkStatus(status, column->getName(), format_name);
status = builder->InsertMemoValues(*arrow_dict_array);
checkStatus(status, column->getName(), format_name);
/// AppendIndices in DictionaryBuilder works only with int64_t data, so we cannot use
/// fillArrowArray here and should copy all indexes to int64_t container.
auto indexes = extractIndexesImpl(column_lc->getIndexesPtr(), start, end, is_nullable);
PaddedPODArray<Int64> indexes;
if (mapping)
indexes = extractIndexesWithRemapping(column_lc->getIndexesPtr(), mapping, start, end, is_nullable);
else
indexes = extractIndexes(column_lc->getIndexesPtr(), start, end, is_nullable);
const uint8_t * arrow_null_bytemap_raw_ptr = nullptr;
PaddedPODArray<uint8_t> arrow_null_bytemap;
if (column_type->isLowCardinalityNullable())
@ -367,7 +442,7 @@ namespace DB
size_t end,
bool output_string_as_string,
bool output_fixed_string_as_fixed_byte_array,
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values)
std::unordered_map<String, MutableColumnPtr> & dictionary_values)
{
auto value_type = assert_cast<arrow::DictionaryType *>(array_builder->type().get())->value_type();
@ -552,7 +627,7 @@ namespace DB
size_t end,
bool output_string_as_string,
bool output_fixed_string_as_fixed_byte_array,
std::unordered_map<String, std::shared_ptr<arrow::Array>> & dictionary_values)
std::unordered_map<String, MutableColumnPtr> & dictionary_values)
{
const String column_type_name = column_type->getFamilyName();

View File

@ -26,7 +26,7 @@ private:
/// Map {column name : arrow dictionary}.
/// To avoid converting dictionary from LowCardinality to Arrow
/// Dictionary every chunk we save it and reuse.
std::unordered_map<std::string, std::shared_ptr<arrow::Array>> dictionary_values;
std::unordered_map<std::string, MutableColumnPtr> dictionary_values;
/// We should initialize arrow fields on first call of chChunkToArrowTable, not in constructor
/// because LowCardinality column from header always has indexes type UInt8, so, we should get

View File

@ -0,0 +1,20 @@
0
1
2
3
4
5
6
7
8
9
0
1
2
3
4
5
6
7
8
9

View File

@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_LOCAL -q "select toLowCardinality(toString(number % 10)) as x from numbers(20) format Arrow settings max_block_size=7, output_format_arrow_low_cardinality_as_dictionary=1" | $CLICKHOUSE_LOCAL -q "select * from table" --input-format='Arrow'