RangeHashedDictionary added options range_lookup_strategy, convert_null_range_bound_to_open

This commit is contained in:
Maksim Kita 2022-01-24 11:43:49 +00:00
parent 4e7e67e330
commit e27332ce10
4 changed files with 308 additions and 40 deletions

View File

@ -60,8 +60,8 @@ private:
const auto & attributes_types_to_read = coordinator->getAttributesTypesToRead();
const auto & attributes_default_values_columns = coordinator->getAttributesDefaultValuesColumns();
const auto & dictionary = coordinator->getDictionary();
auto attributes_columns = dictionary->getColumns(
const auto & read_columns_func = coordinator->getReadColumnsFunc();
auto attributes_columns = read_columns_func(
attributes_names_to_read,
attributes_types_to_read,
key_columns,

View File

@ -19,6 +19,8 @@ class DictionarySourceCoordinator final : public shared_ptr_helper<DictionarySou
public:
using ReadColumnsFunc = std::function<Columns (const Strings &, const DataTypes &, const Columns &, const DataTypes &, const Columns &)>;
Pipe read(size_t num_streams);
private:
@ -31,6 +33,15 @@ private:
: dictionary(std::move(dictionary_))
, key_columns_with_type(std::move(key_columns_with_type_))
, max_block_size(max_block_size_)
, read_columns_func([this](
const Strings & attribute_names,
const DataTypes & result_types,
const Columns & key_columns,
const DataTypes & key_types,
const Columns & default_values_columns)
{
return dictionary->getColumns(attribute_names, result_types, key_columns, key_types, default_values_columns);
})
{
initialize(column_names);
}
@ -45,6 +56,31 @@ private:
, key_columns_with_type(std::move(key_columns_with_type_))
, data_columns_with_type(std::move(data_columns_with_type_))
, max_block_size(max_block_size_)
, read_columns_func([this](
const Strings & attribute_names,
const DataTypes & result_types,
const Columns & key_columns,
const DataTypes & key_types,
const Columns & default_values_columns)
{
return dictionary->getColumns(attribute_names, result_types, key_columns, key_types, default_values_columns);
})
{
initialize(column_names);
}
/// Creates a coordinator that obtains attribute columns through the caller-supplied
/// `read_columns_func_` rather than through the default reader installed by the other
/// constructors (which forwards to dictionary->getColumns).
///
/// `key_columns_with_type_` and `data_columns_with_type_` are taken by rvalue reference and
/// moved into the corresponding members; `max_block_size_` is stored as is.
/// `column_names` is only passed on to initialize().
explicit DictionarySourceCoordinator(
std::shared_ptr<const IDictionary> dictionary_,
const Names & column_names,
ColumnsWithTypeAndName && key_columns_with_type_,
ColumnsWithTypeAndName && data_columns_with_type_,
size_t max_block_size_,
ReadColumnsFunc read_columns_func_)
: dictionary(std::move(dictionary_))
, key_columns_with_type(std::move(key_columns_with_type_))
, data_columns_with_type(std::move(data_columns_with_type_))
, max_block_size(max_block_size_)
/// The reader is a sink argument: taken by value and moved into the member.
, read_columns_func(std::move(read_columns_func_))
{
initialize(column_names);
}
@ -61,6 +97,8 @@ private:
const std::vector<ColumnPtr> & getAttributesDefaultValuesColumns() const { return attributes_default_values_columns; }
const ReadColumnsFunc & getReadColumnsFunc() const { return read_columns_func; }
const std::shared_ptr<const IDictionary> & getDictionary() const { return dictionary; }
void initialize(const Names & column_names);
@ -79,6 +117,8 @@ private:
std::vector<ColumnPtr> attributes_default_values_columns;
const size_t max_block_size;
ReadColumnsFunc read_columns_func;
std::atomic<size_t> parallel_read_block_index = 0;
};

View File

@ -26,6 +26,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
extern const int DICTIONARY_IS_EMPTY;
extern const int UNSUPPORTED_METHOD;
extern const int TYPE_MISMATCH;
}
@ -87,7 +88,7 @@ ColumnPtr RangeHashedDictionary<dictionary_key_type, RangeStorageDataType>::getC
vec_null_map_to = &col_null_map_to->getData();
}
auto type_call = [&](const auto &dictionary_attribute_type)
auto type_call = [&](const auto & dictionary_attribute_type)
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
@ -171,6 +172,106 @@ ColumnPtr RangeHashedDictionary<dictionary_key_type, RangeStorageDataType>::getC
return result;
}
/** Builds the result column of attribute `attribute_name` for rows addressed directly
  * by position in the attribute container.
  *
  * `key_to_index[row]` is the container index whose value goes to output row `row`;
  * the values are fetched with getItemsInternalImpl, so no key or range lookup happens here.
  * If the attribute stores a null mask, the result is wrapped into ColumnNullable.
  */
template <DictionaryKeyType dictionary_key_type, typename RangeStorageDataType>
ColumnPtr RangeHashedDictionary<dictionary_key_type, RangeStorageDataType>::getColumnInternal(
    const std::string & attribute_name,
    const DataTypePtr & result_type,
    const PaddedPODArray<UInt64> & key_to_index) const
{
    const auto & dictionary_attribute = dict_struct.getAttribute(attribute_name, result_type);
    const size_t attribute_index = dict_struct.attribute_name_to_index.find(attribute_name)->second;
    const auto & attribute = attributes[attribute_index];

    const size_t rows_count = key_to_index.size();
    const bool nullable = attribute.is_value_nullable.has_value();

    /// The null map is allocated only for attributes that store a null mask.
    ColumnUInt8::MutablePtr null_map_column;
    ColumnUInt8::Container * null_map = nullptr;

    if (nullable)
    {
        null_map_column = ColumnUInt8::create(rows_count, false);
        null_map = &null_map_column->getData();
    }

    ColumnPtr filled_column;

    /// Invoked once with the concrete attribute type; fills a column of that type.
    auto fill_for_type = [&](const auto & dictionary_attribute_type)
    {
        using Type = std::decay_t<decltype(dictionary_attribute_type)>;
        using AttributeType = typename Type::AttributeType;
        using ValueType = DictionaryValueType<AttributeType>;
        using ColumnProvider = DictionaryAttributeColumnProvider<AttributeType>;

        auto column = ColumnProvider::getColumn(dictionary_attribute, rows_count);

        if constexpr (std::is_same_v<ValueType, Array>)
        {
            /// Array values are appended one by one; the null mask is not consulted.
            auto * array_column = column.get();

            getItemsInternalImpl<ValueType, false>(
                attribute,
                key_to_index,
                [&](size_t, const Array & element, bool)
                {
                    array_column->insert(element);
                });
        }
        else if constexpr (std::is_same_v<ValueType, StringRef>)
        {
            /// Strings are appended as raw (data, size) pairs.
            auto * string_column = column.get();

            if (nullable)
            {
                getItemsInternalImpl<ValueType, true>(
                    attribute,
                    key_to_index,
                    [&](size_t row, const StringRef ref, bool is_null)
                    {
                        (*null_map)[row] = is_null;
                        string_column->insertData(ref.data, ref.size);
                    });
            }
            else
            {
                getItemsInternalImpl<ValueType, false>(
                    attribute,
                    key_to_index,
                    [&](size_t, const StringRef ref, bool)
                    {
                        string_column->insertData(ref.data, ref.size);
                    });
            }
        }
        else
        {
            /// Remaining types are written into the column data by row position.
            auto & column_data = column->getData();

            if (nullable)
            {
                getItemsInternalImpl<ValueType, true>(
                    attribute,
                    key_to_index,
                    [&](size_t row, const auto item, bool is_null)
                    {
                        (*null_map)[row] = is_null;
                        column_data[row] = item;
                    });
            }
            else
            {
                getItemsInternalImpl<ValueType, false>(
                    attribute,
                    key_to_index,
                    [&](size_t row, const auto item, bool)
                    {
                        column_data[row] = item;
                    });
            }
        }

        filled_column = std::move(column);
    };

    callOnDictionaryAttributeType(attribute.type, fill_for_type);

    if (nullable)
        filled_column = ColumnNullable::create(std::move(filled_column), std::move(null_map_column));

    return filled_column;
}
template <DictionaryKeyType dictionary_key_type, typename RangeStorageDataType>
ColumnUInt8::Ptr RangeHashedDictionary<dictionary_key_type, RangeStorageDataType>::hasKeys(const Columns & key_columns, const DataTypes & key_types) const
{
@ -185,8 +286,13 @@ ColumnUInt8::Ptr RangeHashedDictionary<dictionary_key_type, RangeStorageDataType
ColumnWithTypeAndName column_to_cast = {range_storage_column->convertToFullColumnIfConst(), key_types.back(), ""};
auto range_column_updated = castColumnAccurate(column_to_cast, dict_struct.range_min->type);
const auto & range_column = assert_cast<const RangeColumnType &>(*range_column_updated);
const auto & range_column_data = range_column.getData();
const auto * range_column = typeid_cast<const RangeColumnType *>(range_column_updated.get());
if (!range_column)
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Dictionary {} range column type should be equal to {}",
getFullName(),
dict_struct.range_min->type->getName());
const auto & range_column_data = range_column->getData();
auto key_columns_copy = key_columns;
key_columns_copy.pop_back();
@ -363,33 +469,42 @@ void RangeHashedDictionary<dictionary_key_type, RangeStorageDataType>::getItemsI
const auto date = range_column_data[key_index];
const auto & interval_tree = it->getMapped();
size_t min_value_index = 0;
std::optional<RangeInterval> min_range;
size_t value_index = 0;
std::optional<RangeInterval> range;
interval_tree.find(date, [&](auto & interval, auto & value_index)
interval_tree.find(date, [&](auto & interval, auto & interval_value_index)
{
if (min_range && interval < *min_range)
if (range)
{
min_range = interval;
min_value_index = value_index;
if (likely(configuration.lookup_strategy == RangeHashedDictionaryLookupStrategy::min) && interval < *range)
{
range = interval;
value_index = interval_value_index;
}
else if (configuration.lookup_strategy == RangeHashedDictionaryLookupStrategy::max && interval > * range)
{
range = interval;
value_index = interval_value_index;
}
}
else
{
min_range = interval;
min_value_index = value_index;
range = interval;
value_index = interval_value_index;
}
return true;
});
if (min_range.has_value())
if (range.has_value())
{
++keys_found;
AttributeType value = container[value_index];
if constexpr (is_nullable)
{
AttributeType value = container[min_value_index];
bool is_null = (*attribute.is_value_nullable)[min_value_index];
bool is_null = (*attribute.is_value_nullable)[value_index];
if (!is_null)
set_value(key_index, value, false);
@ -398,7 +513,6 @@ void RangeHashedDictionary<dictionary_key_type, RangeStorageDataType>::getItemsI
}
else
{
AttributeType value = container[min_value_index];
set_value(key_index, value, false);
}
@ -419,6 +533,53 @@ void RangeHashedDictionary<dictionary_key_type, RangeStorageDataType>::getItemsI
found_count.fetch_add(keys_found, std::memory_order_relaxed);
}
/** Reads attribute values by their position in the attribute container.
  *
  * For every row `key_index`, takes `key_to_index[key_index]` as an index into the container of
  * `attribute` and calls `set_value(key_index, value, is_null)`.
  * `is_null` comes from the attribute null mask when `is_nullable` is true and is always false otherwise.
  *
  * Throws LOGICAL_ERROR if an index is not less than the container size.
  * Every row counts as both queried and found in the dictionary counters.
  */
template <DictionaryKeyType dictionary_key_type, typename RangeStorageDataType>
template <typename AttributeType, bool is_nullable, typename ValueSetter>
void RangeHashedDictionary<dictionary_key_type, RangeStorageDataType>::getItemsInternalImpl(
    const Attribute & attribute,
    const PaddedPODArray<UInt64> & key_to_index,
    ValueSetter && set_value) const
{
    const size_t keys_size = key_to_index.size();

    const auto & container = std::get<AttributeContainerType<AttributeType>>(attribute.container);
    const size_t container_size = container.size();

    for (size_t key_index = 0; key_index < keys_size; ++key_index)
    {
        const UInt64 container_index = key_to_index[key_index];

        if (unlikely(container_index >= container_size))
        {
            throw Exception(ErrorCodes::LOGICAL_ERROR,
                "Dictionary {} expected attribute container index {} must be less than attribute container size {}",
                getFullName(),
                container_index,
                container_size
            );
        }

        /// Bind by reference instead of copying: for Array attributes a by-value copy
        /// would allocate on every row only to be passed on to the setter.
        const AttributeType & value = container[container_index];

        if constexpr (is_nullable)
        {
            const bool is_null = (*attribute.is_value_nullable)[container_index];
            set_value(key_index, value, is_null);
        }
        else
        {
            set_value(key_index, value, false);
        }
    }

    query_count.fetch_add(keys_size, std::memory_order_relaxed);
    found_count.fetch_add(keys_size, std::memory_order_relaxed);
}
template <DictionaryKeyType dictionary_key_type, typename RangeStorageDataType>
void RangeHashedDictionary<dictionary_key_type, RangeStorageDataType>::updateData()
{
@ -512,14 +673,14 @@ void RangeHashedDictionary<dictionary_key_type, RangeStorageDataType>::blockToAt
const auto * min_range_column_typed = typeid_cast<const RangeColumnType *>(min_range_column);
if (!min_range_column_typed)
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Dictionary {} range min should be equal to {}",
"Dictionary {} range min column type should be equal to {}",
getFullName(),
dict_struct.range_min->type->getName());
const auto * max_range_column_typed = typeid_cast<const RangeColumnType *>(max_range_column);
if (!max_range_column_typed)
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Dictionary {} range max should be equal to {}",
"Dictionary {} range max column type should be equal to {}",
getFullName(),
dict_struct.range_max->type->getName());
@ -644,14 +805,13 @@ void RangeHashedDictionary<dictionary_key_type, RangeStorageDataType>::setAttrib
template <DictionaryKeyType dictionary_key_type, typename RangeStorageDataType>
Pipe RangeHashedDictionary<dictionary_key_type, RangeStorageDataType>::read(const Names & column_names, size_t max_block_size, size_t num_streams) const
{
auto range_key_column = dict_struct.range_min->type->createColumn();
auto key_to_index_column = ColumnUInt64::create();
auto range_min_column = dict_struct.range_min->type->createColumn();
auto * range_key_column_typed = typeid_cast<RangeColumnType *>(range_key_column.get());
auto * range_min_column_typed = typeid_cast<RangeColumnType *>(range_min_column.get());
if (!range_min_column_typed || !range_key_column_typed)
if (!range_min_column_typed)
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Dictionary {} range min should be equal to {}",
"Dictionary {} range min column type should be equal to {}",
getFullName(),
dict_struct.range_min->type->getName());
@ -660,30 +820,28 @@ Pipe RangeHashedDictionary<dictionary_key_type, RangeStorageDataType>::read(cons
auto * range_max_column_typed = typeid_cast<RangeColumnType *>(range_max_column.get());
if (!range_max_column_typed)
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Dictionary {} range max should be equal to {}",
"Dictionary {} range max column type should be equal to {}",
getFullName(),
dict_struct.range_max->type->getName());
PaddedPODArray<KeyType> keys;
auto & range_key_column_data = range_key_column_typed->getData();
auto & key_to_index_column_data = key_to_index_column->getData();
auto & range_min_column_data = range_min_column_typed->getData();
auto & range_max_column_data = range_max_column_typed->getData();
const auto & container = key_attribute.container;
size_t container_size = container.size();
keys.reserve(container_size);
range_key_column_data.reserve(container_size);
range_min_column_data.reserve(container_size);
range_max_column_data.reserve(container_size);
keys.reserve(element_count);
key_to_index_column_data.reserve(element_count);
range_min_column_data.reserve(element_count);
range_max_column_data.reserve(element_count);
for (const auto & key : container)
{
for (const auto & [interval, _] : key.getMapped())
for (const auto & [interval, index] : key.getMapped())
{
keys.push_back(key.getKey());
range_key_column_data.push_back(interval.left);
keys.emplace_back(key.getKey());
key_to_index_column_data.emplace_back(index);
range_min_column_data.push_back(interval.left);
range_max_column_data.push_back(interval.right);
}
@ -703,12 +861,54 @@ Pipe RangeHashedDictionary<dictionary_key_type, RangeStorageDataType>::read(cons
key_columns = deserializeColumnsWithTypeAndNameFromKeys(dict_struct, keys, 0, keys.size());
}
key_columns.emplace_back(ColumnWithTypeAndName{std::move(range_key_column), dict_struct.range_min->type, ""});
key_columns.emplace_back(ColumnWithTypeAndName{std::move(key_to_index_column), std::make_shared<DataTypeUInt64>(), ""});
ColumnsWithTypeAndName data_columns = {std::move(range_min_column_with_type), std::move(range_max_column_with_type)};
std::shared_ptr<const IDictionary> dictionary = shared_from_this();
auto coordinator = DictionarySourceCoordinator::create(dictionary, column_names, std::move(key_columns), std::move(data_columns), max_block_size);
/// Reader handed to DictionarySourceCoordinator: instead of looking keys up again, it takes the
/// last key column (the UInt64 attribute container indexes appended to `key_columns` above) and
/// fetches every requested attribute directly by index through getColumnInternal.
/// Key types and default values columns are not needed for that and are ignored.
DictionarySourceCoordinator::ReadColumnsFunc read_keys_func = [dictionary_copy = dictionary](
    const Strings & attribute_names,
    const DataTypes & result_types,
    const Columns & key_columns,
    const DataTypes & /* key_types */,
    const Columns & /* default_values_columns */)
{
    /// `dictionary_copy` keeps this dictionary alive for as long as the reader exists.
    /// A plain reference downcast is enough here; there is no need to create another shared_ptr per call.
    const auto & range_dictionary
        = static_cast<const RangeHashedDictionary<dictionary_key_type, RangeStorageDataType> &>(*dictionary_copy);

    /// An empty key column list is reported through the same error as a column of the wrong type.
    const auto * key_to_index_column = key_columns.empty()
        ? nullptr
        : typeid_cast<const ColumnUInt64 *>(key_columns.back().get());

    if (!key_to_index_column)
        throw Exception(ErrorCodes::LOGICAL_ERROR,
            "Dictionary {} read expect indexes column with type UInt64",
            range_dictionary.getFullName());

    const auto & data = key_to_index_column->getData();

    const size_t attribute_names_size = attribute_names.size();

    Columns result;
    result.reserve(attribute_names_size);

    for (size_t i = 0; i < attribute_names_size; ++i)
        result.emplace_back(range_dictionary.getColumnInternal(attribute_names[i], result_types[i], data));

    return result;
};
auto coordinator = DictionarySourceCoordinator::create(
dictionary,
column_names,
std::move(key_columns),
std::move(data_columns),
max_block_size,
std::move(read_keys_func));
auto result = coordinator->read(num_streams);
return result;
@ -723,7 +923,7 @@ static DictionaryPtr createRangeHashedDictionary(const std::string & full_name,
{
static constexpr auto layout_name = dictionary_key_type == DictionaryKeyType::Simple ? "range_hashed" : "complex_key_range_hashed";
if (dictionary_key_type == DictionaryKeyType::Simple)
if constexpr (dictionary_key_type == DictionaryKeyType::Simple)
{
if (dict_struct.key)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "'key' is not supported for dictionary of layout 'range_hashed'");
@ -744,11 +944,21 @@ static DictionaryPtr createRangeHashedDictionary(const std::string & full_name,
const auto dict_id = StorageID::fromDictionaryConfig(config, config_prefix);
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
const bool convert_null_range_bound_to_open = config.getBool(config_prefix + ".convert_null_range_bound_to_open", true);
String dictionary_layout_prefix = config_prefix + ".layout." + layout_name;
const bool convert_null_range_bound_to_open = config.getBool(dictionary_layout_prefix + ".convert_null_range_bound_to_open", true);
String range_lookup_strategy = config.getString(dictionary_layout_prefix + ".range_lookup_strategy", "min");
RangeHashedDictionaryLookupStrategy lookup_strategy = RangeHashedDictionaryLookupStrategy::min;
if (range_lookup_strategy == "min")
lookup_strategy = RangeHashedDictionaryLookupStrategy::min;
else if (range_lookup_strategy == "max")
lookup_strategy = RangeHashedDictionaryLookupStrategy::max;
RangeHashedDictionaryConfiguration configuration
{
.convert_null_range_bound_to_open = convert_null_range_bound_to_open,
.lookup_strategy = lookup_strategy,
.require_nonempty = require_nonempty
};
@ -763,7 +973,7 @@ static DictionaryPtr createRangeHashedDictionary(const std::string & full_name,
if constexpr (IsDataTypeDecimalOrNumber<DataType> || IsDataTypeDateOrDateTime<DataType> || IsDataTypeEnum<DataType>)
{
result = std::make_unique<RangeHashedDictionary<DictionaryKeyType::Simple, DataType>>(
result = std::make_unique<RangeHashedDictionary<dictionary_key_type, DataType>>(
dict_id,
dict_struct,
std::move(source_ptr),

View File

@ -19,9 +19,16 @@
namespace DB
{
/// Selects which interval wins when several intervals stored for a key contain the looked-up value.
/// Parsed from the layout option `range_lookup_strategy` ("min" or "max").
enum class RangeHashedDictionaryLookupStrategy
{
/// Take the smallest matching interval (as ordered by the interval's operator<).
min,
/// Take the greatest matching interval (as ordered by the interval's operator>).
max
};
/// Settings of a range hashed dictionary.
/// NOTE: keep the member order. The structure is filled with designated initializers,
/// which must list members in declaration order.
struct RangeHashedDictionaryConfiguration
{
/// Layout option `convert_null_range_bound_to_open`, true when not specified.
/// NOTE(review): presumably a NULL range bound is then treated as an open bound - confirm where ranges are loaded.
bool convert_null_range_bound_to_open;
/// Which of several matching intervals a lookup uses; layout option `range_lookup_strategy`, `min` when not specified.
RangeHashedDictionaryLookupStrategy lookup_strategy;
/// Dictionary option `require_nonempty`, false when not specified.
/// NOTE(review): presumably loading fails if the dictionary ends up empty - confirm in the loading code.
bool require_nonempty;
};
@ -88,7 +95,7 @@ public:
DictionarySpecialKeyType getSpecialKeyType() const override { return DictionarySpecialKeyType::Range;}
ColumnPtr getColumn(
const std::string& attribute_name,
const std::string & attribute_name,
const DataTypePtr & result_type,
const Columns & key_columns,
const DataTypes & key_types,
@ -167,6 +174,17 @@ private:
ValueSetter && set_value,
DefaultValueExtractor & default_value_extractor) const;
ColumnPtr getColumnInternal(
const std::string & attribute_name,
const DataTypePtr & result_type,
const PaddedPODArray<UInt64> & key_to_index) const;
template <typename AttributeType, bool is_nullable, typename ValueSetter>
void getItemsInternalImpl(
const Attribute & attribute,
const PaddedPODArray<UInt64> & key_to_index,
ValueSetter && set_value) const;
void updateData();
void blockToAttributes(const Block & block);