DirectDictionary updated

This commit is contained in:
Maksim Kita 2021-03-08 01:32:13 +03:00 committed by Maksim Kita
parent 5a6cc876e4
commit 45879472d3
12 changed files with 762 additions and 912 deletions

View File

@ -133,41 +133,25 @@ std::string ClickHouseDictionarySource::getUpdateFieldAndDate()
/// Returns a stream over the rows produced by `load_all_query`.
/// NOTE(review): this listing ends with two consecutive `return` statements (it looks like the
/// pre- and post-refactoring lines of a diff shown together). As written, the trailing
/// `createStreamForQuery` call is unreachable — confirm which version is intended.
BlockInputStreamPtr ClickHouseDictionarySource::loadAll()
{
/** Query to local ClickHouse is marked internal in order to avoid
* the necessity of holding process_list_element shared pointer.
*/
if (is_local)
{
auto stream = executeQuery(load_all_query, context, true).getInputStream();
/// FIXME res.in may implicitly use some objects owned by res, but they will be destroyed after return
stream = std::make_shared<ConvertingBlockInputStream>(stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
return stream;
}
return std::make_shared<RemoteBlockInputStream>(pool, load_all_query, sample_block, context);
return createStreamForQuery(load_all_query);
}
/// Returns a stream over the rows produced by the query built in getUpdateFieldAndDate().
/// NOTE(review): `load_update_query` is declared twice in this listing (once as std::string,
/// once as String) and an unconditional `return` precedes the second declaration; this looks
/// like removed and added diff lines shown together — confirm which version is intended.
BlockInputStreamPtr ClickHouseDictionarySource::loadUpdatedAll()
{
std::string load_update_query = getUpdateFieldAndDate();
if (is_local)
{
/// Local path: run the query in-process and convert the result columns to `sample_block` by position.
auto stream = executeQuery(load_update_query, context, true).getInputStream();
stream = std::make_shared<ConvertingBlockInputStream>(stream, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
return stream;
}
/// Remote path: the query is sent through the connection pool.
return std::make_shared<RemoteBlockInputStream>(pool, load_update_query, sample_block, context);
String load_update_query = getUpdateFieldAndDate();
return createStreamForQuery(load_update_query);
}
/// Returns a stream over the rows selected by the query that `query_builder` composes for `ids`.
/// NOTE(review): two `return` statements follow each other here (old helper name, then new one);
/// only the first is reachable as written — confirm which version is intended.
BlockInputStreamPtr ClickHouseDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
return createStreamForSelectiveLoad(query_builder.composeLoadIdsQuery(ids));
return createStreamForQuery(query_builder.composeLoadIdsQuery(ids));
}
/// Returns a stream over the rows selected by the keys query composed from `key_columns`
/// restricted to `requested_rows`; the query is built with the IN_WITH_TUPLES method.
/// NOTE(review): the first `return` makes the two statements after it unreachable; this looks
/// like removed and added diff lines shown together — confirm which version is intended.
BlockInputStreamPtr ClickHouseDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
return createStreamForSelectiveLoad(
query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::IN_WITH_TUPLES));
String query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::IN_WITH_TUPLES);
return createStreamForQuery(query);
}
bool ClickHouseDictionarySource::isModified() const
@ -194,17 +178,19 @@ std::string ClickHouseDictionarySource::toString() const
}
/// Builds the input stream for `query`: executed in-process when `is_local` is set, otherwise
/// sent through `pool` as a remote stream.
/// NOTE(review): two signature lines and two variants of each branch appear below (one using
/// `sample_block`, one using `empty_sample_block`); this looks like removed and added diff
/// lines shown together — confirm which version is intended.
BlockInputStreamPtr ClickHouseDictionarySource::createStreamForSelectiveLoad(const std::string & query)
BlockInputStreamPtr ClickHouseDictionarySource::createStreamForQuery(const String & query)
{
/// Sample block should not contain first row default values
auto empty_sample_block = sample_block.cloneEmpty();
if (is_local)
{
auto res = executeQuery(query, context, true).getInputStream();
res = std::make_shared<ConvertingBlockInputStream>(
res, sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
return res;
/// Variant that converts to the emptied sample block instead of `sample_block`.
auto stream = executeQuery(query, context, true).getInputStream();
stream = std::make_shared<ConvertingBlockInputStream>(stream, empty_sample_block, ConvertingBlockInputStream::MatchColumnsMode::Position);
return stream;
}
return std::make_shared<RemoteBlockInputStream>(pool, query, sample_block, context);
return std::make_shared<RemoteBlockInputStream>(pool, query, empty_sample_block, context);
}
std::string ClickHouseDictionarySource::doInvalidateQuery(const std::string & request) const

View File

@ -55,7 +55,7 @@ public:
private:
std::string getUpdateFieldAndDate();
BlockInputStreamPtr createStreamForSelectiveLoad(const std::string & query);
BlockInputStreamPtr createStreamForQuery(const String & query);
std::string doInvalidateQuery(const std::string & request) const;

View File

@ -1,403 +0,0 @@
#include "ComplexKeyDirectDictionary.h"
#include <IO/WriteHelpers.h>
#include "DictionaryBlockInputStream.h"
#include "DictionaryFactory.h"
#include <Core/Defines.h>
#include <Columns/ColumnNullable.h>
#include <Functions/FunctionHelpers.h>
#include <DataTypes/DataTypesDecimal.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TYPE_MISMATCH;
extern const int BAD_ARGUMENTS;
extern const int UNSUPPORTED_METHOD;
}
/** Stores the dictionary structure, the source and the saved block, then builds the attribute table.
  * Throws UNSUPPORTED_METHOD when the source reports that it does not support selective load.
  */
ComplexKeyDirectDictionary::ComplexKeyDirectDictionary(
    const StorageID & dict_id_,
    const DictionaryStructure & dict_struct_,
    DictionarySourcePtr source_ptr_,
    BlockPtr saved_block_)
    : IDictionaryBase(dict_id_)
    , dict_struct(dict_struct_)
    , source_ptr{std::move(source_ptr_)}
    , saved_block{std::move(saved_block_)}
{
    /// The member (not the moved-from parameter) is queried here.
    const bool selective_load_supported = source_ptr->supportsSelectiveLoad();
    if (!selective_load_supported)
        throw Exception{full_name + ": source cannot be used with ComplexKeyDirectDictionary", ErrorCodes::UNSUPPORTED_METHOD};

    createAttributes();
}
/** Produces a column holding the value of `attribute_name` for every row of `key_columns`.
  *
  * Key types are validated first. The values themselves are obtained through getItemsImpl(),
  * which calls back once per requested row with the value to store and a null flag.
  * For nullable attributes the null flags are collected into a separate UInt8 column and the
  * result is wrapped into ColumnNullable.
  *
  * `default_values_column` is handed to DictionaryDefaultValueExtractor together with the
  * attribute's configured null value.
  */
ColumnPtr ComplexKeyDirectDictionary::getColumn(
    const std::string & attribute_name,
    const DataTypePtr & result_type,
    const Columns & key_columns,
    const DataTypes & key_types,
    const ColumnPtr & default_values_column) const
{
    dict_struct.validateKeyTypes(key_types);

    const auto & attribute = getAttribute(attribute_name);
    const auto & dictionary_attribute = dict_struct.getAttribute(attribute_name, result_type);
    const auto rows_count = key_columns.front()->size();

    /// The null map exists only for nullable attributes; `null_map` stays nullptr otherwise.
    ColumnUInt8::MutablePtr null_map_column;
    ColumnUInt8::Container * null_map = nullptr;
    if (attribute.is_nullable)
    {
        null_map_column = ColumnUInt8::create(rows_count, false);
        null_map = &null_map_column->getData();
    }

    ColumnPtr result;

    auto fill_column = [&](const auto & dictionary_attribute_type)
    {
        using Type = std::decay_t<decltype(dictionary_attribute_type)>;
        using AttributeType = typename Type::AttributeType;
        using ValueType = DictionaryValueType<AttributeType>;
        using ColumnProvider = DictionaryAttributeColumnProvider<AttributeType>;

        AttributeType null_value = static_cast<AttributeType>(std::get<ValueType>(attribute.null_values));
        DictionaryDefaultValueExtractor<AttributeType> default_value_extractor(std::move(null_value), default_values_column);

        auto column = ColumnProvider::getColumn(dictionary_attribute, rows_count);

        /// Records the null flag of a row; does nothing when the attribute is not nullable.
        auto mark_null = [&](const size_t row, const bool is_null)
        {
            if (null_map)
                (*null_map)[row] = is_null;
        };

        if constexpr (std::is_same_v<AttributeType, String>)
        {
            /// String values are appended one by one, so the callback order defines the row order.
            auto * out = column.get();
            getItemsImpl<String, String>(
                attribute,
                key_columns,
                [&](const size_t row, const String value, bool is_null)
                {
                    mark_null(row, is_null);
                    out->insertData(value.data(), value.size());
                },
                default_value_extractor);
        }
        else
        {
            /// Fixed-size values are written directly at the row position.
            auto & out = column->getData();
            getItemsImpl<AttributeType, AttributeType>(
                attribute,
                key_columns,
                [&](const size_t row, const auto value, bool is_null)
                {
                    mark_null(row, is_null);
                    out[row] = value;
                },
                default_value_extractor);
        }

        result = std::move(column);
    };

    callOnDictionaryAttributeType(attribute.type, fill_column);

    if (attribute.is_nullable)
        result = ColumnNullable::create(result, std::move(null_map_column));

    return result;
}
/** Returns a UInt8 column with one entry per row of `key_columns`:
  * 1 when the source returned a row with that key, 0 otherwise.
  *
  * Every requested row is turned into a single composite key (see placeKeysInPool) and
  * registered in a hash map with flag 0; keys read back from the source flip the flag to 1.
  */
ColumnUInt8::Ptr ComplexKeyDirectDictionary::hasKeys(const Columns & key_columns, const DataTypes & key_types) const
{
    dict_struct.validateKeyTypes(key_types);

    const auto rows = key_columns.front()->size();
    auto result = ColumnUInt8::create(rows);
    auto & found_flags = result->getData();

    const auto keys_size = dict_struct.key->size();
    StringRefs keys_array(keys_size);
    MapType<UInt8> has_key;
    Arena temporary_keys_pool;
    std::vector<size_t> to_load(rows);
    PODArray<StringRef> keys(rows);

    /// Register every requested key as "not found" and request all rows from the source.
    for (size_t row = 0; row < rows; ++row)
    {
        const StringRef requested_key = placeKeysInPool(row, key_columns, keys_array, *dict_struct.key, temporary_keys_pool);
        keys[row] = requested_key;
        has_key[requested_key] = 0;
        to_load[row] = row;
    }

    auto stream = source_ptr->loadKeys(key_columns, to_load);
    stream->readPrefix();

    while (const auto block = stream->read())
    {
        /// The first `keys_size` columns of the block are taken as the key columns.
        Columns block_key_columns;
        block_key_columns.reserve(keys_size);
        for (size_t key_idx = 0; key_idx < keys_size; ++key_idx)
            block_key_columns.push_back(block.safeGetByPosition(key_idx).column);

        Arena pool;
        StringRefs keys_temp(keys_size);
        const auto block_rows = block_key_columns.front()->size();

        for (size_t row_idx = 0; row_idx < block_rows; ++row_idx)
        {
            const StringRef fetched_key = placeKeysInPool(row_idx, block_key_columns, keys_temp, *dict_struct.key, pool);
            /// Only keys that were requested are marked; nothing new is inserted into the map.
            if (has_key.has(fetched_key))
                has_key[fetched_key] = 1;
        }
    }

    stream->readSuffix();

    for (size_t row = 0; row < rows; ++row)
        found_flags[row] = has_key[keys[row]];

    query_count.fetch_add(rows, std::memory_order_relaxed);

    return result;
}
/** Fills `attributes` and the two name/index lookup maps from `dict_struct.attributes`.
  * Throws TYPE_MISMATCH as soon as a hierarchical attribute is met.
  */
void ComplexKeyDirectDictionary::createAttributes()
{
    attributes.reserve(dict_struct.attributes.size());

    for (const auto & dictionary_attribute : dict_struct.attributes)
    {
        /// The index of the attribute is its position in `attributes`.
        const auto attribute_index = attributes.size();
        attribute_index_by_name.emplace(dictionary_attribute.name, attribute_index);
        attribute_name_by_index.emplace(attribute_index, dictionary_attribute.name);
        attributes.push_back(createAttribute(dictionary_attribute, dictionary_attribute.null_value, dictionary_attribute.name));

        if (!dictionary_attribute.hierarchical)
            continue;

        throw Exception{full_name + ": hierarchical attributes not supported for dictionary of type " + getTypeName(),
            ErrorCodes::TYPE_MISMATCH};
    }
}
/// Stores the configured null value of a non-string attribute, converted from its Field representation to T.
template <typename T>
void ComplexKeyDirectDictionary::createAttributeImpl(Attribute & attribute, const Field & null_value)
{
    const auto & field_value = null_value.get<NearestFieldType<T>>();
    attribute.null_values = T(field_value);
}
/** String specialisation: the null value is copied into an arena owned by the attribute,
  * and `null_values` keeps a StringRef pointing at that copy.
  */
template <>
void ComplexKeyDirectDictionary::createAttributeImpl<String>(Attribute & attribute, const Field & null_value)
{
    attribute.string_arena = std::make_unique<Arena>();

    const String & default_string = null_value.get<String>();
    const auto default_size = default_string.size();
    const char * arena_copy = attribute.string_arena->insert(default_string.data(), default_size);

    attribute.null_values.emplace<StringRef>(arena_copy, default_size);
}
/** Creates the internal Attribute record for one dictionary attribute:
  * copies its underlying type, nullability and name, then stores `null_value`
  * via the createAttributeImpl overload matching the underlying type.
  */
ComplexKeyDirectDictionary::Attribute ComplexKeyDirectDictionary::createAttribute(
    const DictionaryAttribute & attribute, const Field & null_value, const std::string & attr_name)
{
    Attribute created{attribute.underlying_type, attribute.is_nullable, {}, {}, attr_name};

    /// Dispatch on the underlying type to pick the right createAttributeImpl instantiation.
    callOnDictionaryAttributeType(
        attribute.underlying_type,
        [&](const auto & dictionary_attribute_type)
        {
            using AttributeType = typename std::decay_t<decltype(dictionary_attribute_type)>::AttributeType;
            createAttributeImpl<AttributeType>(created, null_value);
        });

    return created;
}
/** Serialises all key parts of one row into a single contiguous buffer allocated from `pool`
  * and returns a StringRef covering that buffer; the result is used as the composite hash key.
  *
  * Layout per key part:
  *  - string attribute (utString): a size_t holding (length + 1), then the bytes, then a '\0';
  *  - any other attribute: the raw bytes returned by getDataAt().
  *
  * `keys` is scratch space: on return each element points at its part inside the pool buffer.
  * The returned memory lives as long as `pool` does.
  */
template <typename Pool>
StringRef ComplexKeyDirectDictionary::placeKeysInPool(
const size_t row, const Columns & key_columns, StringRefs & keys, const std::vector<DictionaryAttribute> & key_attributes, Pool & pool) const
{
const auto keys_size = key_columns.size();
size_t sum_keys_size{};
/// First pass: fetch every key part and compute the total buffer size.
for (size_t j = 0; j < keys_size; ++j)
{
keys[j] = key_columns[j]->getDataAt(row);
sum_keys_size += keys[j].size;
/// String parts carry a size_t length prefix and a trailing zero byte.
if (key_attributes[j].underlying_type == AttributeUnderlyingType::utString)
sum_keys_size += sizeof(size_t) + 1;
}
auto place = pool.alloc(sum_keys_size);
auto key_start = place;
/// Second pass: copy the parts one after another into the allocated buffer.
for (size_t j = 0; j < keys_size; ++j)
{
if (key_attributes[j].underlying_type == AttributeUnderlyingType::utString)
{
auto start = key_start;
/// The stored length counts the trailing zero byte.
auto key_size = keys[j].size + 1;
memcpy(key_start, &key_size, sizeof(size_t));
key_start += sizeof(size_t);
memcpy(key_start, keys[j].data, keys[j].size);
key_start += keys[j].size;
*key_start = '\0';
++key_start;
/// Re-point the scratch entry at the serialised form (prefix + bytes + zero).
keys[j].data = start;
keys[j].size += sizeof(size_t) + 1;
}
else
{
memcpy(key_start, keys[j].data, keys[j].size);
keys[j].data = key_start;
key_start += keys[j].size;
}
}
return {place, sum_keys_size};
}
/** Loads the values of `attribute` for every row of `key_columns` from the source and reports
  * them through `set_value(row, value, is_null)`, called once per row in row order.
  *
  * Steps:
  *  1. Each row is serialised into a composite key (placeKeysInPool) and pre-filled in
  *     `value_by_key` with the default taken from `default_value_extractor`.
  *  2. All rows are requested from the source with loadKeys().
  *  3. In every returned block the first `keys_size` columns are read as keys and the following
  *     `attributes.size()` columns as attributes, by position; values for requested keys
  *     overwrite the defaults, and Null values are recorded in `value_is_null` instead.
  *  4. `query_count` is increased by the number of requested rows.
  *
  * NOTE(review): defaults are stored per key, not per row, so when the same key occurs in several
  * rows the default assigned last appears to win for all of them — confirm this is acceptable.
  */
template <typename AttributeType, typename OutputType, typename ValueSetter, typename DefaultValueExtractor>
void ComplexKeyDirectDictionary::getItemsImpl(
const Attribute & attribute,
const Columns & key_columns,
ValueSetter && set_value,
DefaultValueExtractor & default_value_extractor) const
{
const auto rows = key_columns.front()->size();
const auto keys_size = dict_struct.key->size();
StringRefs keys_array(keys_size);
MapType<OutputType> value_by_key;
HashMapWithSavedHash<StringRef, bool, StringRefHash> value_is_null;
/// Owns the memory of the requested composite keys for the whole call.
Arena temporary_keys_pool;
std::vector<size_t> to_load(rows);
PODArray<StringRef> keys(rows);
for (const auto row : ext::range(0, rows))
{
const StringRef key = placeKeysInPool(row, key_columns, keys_array, *dict_struct.key, temporary_keys_pool);
keys[row] = key;
value_by_key[key] = static_cast<AttributeType>(default_value_extractor[row]);
/// Every row is requested from the source.
to_load[row] = row;
value_is_null[key] = false;
}
auto stream = source_ptr->loadKeys(key_columns, to_load);
const auto attributes_size = attributes.size();
stream->readPrefix();
while (const auto block = stream->read())
{
/// Key columns come first in the block, attribute columns follow them.
const auto columns = ext::map<Columns>(
ext::range(0, keys_size), [&](const size_t attribute_idx) { return block.safeGetByPosition(attribute_idx).column; });
const auto attribute_columns = ext::map<Columns>(ext::range(0, attributes_size), [&](const size_t attribute_idx)
{
return block.safeGetByPosition(keys_size + attribute_idx).column;
});
for (const size_t attribute_idx : ext::range(0, attributes.size()))
{
/// Only the column of the requested attribute is processed.
if (attribute.name != attribute_name_by_index.at(attribute_idx))
{
continue;
}
const IColumn & attribute_column = *attribute_columns[attribute_idx];
/// Scratch arena for the keys of this block; they are only used for lookups below.
Arena pool;
StringRefs keys_temp(keys_size);
const auto columns_size = columns.front()->size();
for (const auto row_idx : ext::range(0, columns_size))
{
const StringRef key = placeKeysInPool(row_idx, columns, keys_temp, *dict_struct.key, pool);
/// Keys that were not requested are ignored.
if (value_by_key.has(key))
{
auto value = attribute_column[row_idx];
if (value.isNull())
value_is_null[key] = true;
else
value_by_key[key] = static_cast<OutputType>(value.template get<NearestFieldType<AttributeType>>());
}
}
}
}
stream->readSuffix();
/// Report results in the order of the requested rows.
for (const auto row : ext::range(0, rows))
{
auto key = keys[row];
set_value(row, value_by_key[key], value_is_null[key]);
}
query_count.fetch_add(rows, std::memory_order_relaxed);
}
/// Looks up an attribute by name; throws BAD_ARGUMENTS when the dictionary has no such attribute.
const ComplexKeyDirectDictionary::Attribute & ComplexKeyDirectDictionary::getAttribute(const std::string & attribute_name) const
{
    const auto found = attribute_index_by_name.find(attribute_name);
    if (found != attribute_index_by_name.end())
        return attributes[found->second];

    throw Exception{full_name + ": no such attribute '" + attribute_name + "'", ErrorCodes::BAD_ARGUMENTS};
}
/// Streams the whole dictionary by delegating to the source's loadAll(); both arguments are ignored.
BlockInputStreamPtr ComplexKeyDirectDictionary::getBlockInputStream(const Names & /* column_names */, size_t /* max_block_size */) const
{
    auto all_rows_stream = source_ptr->loadAll();
    return all_rows_stream;
}
/** Registers the 'complex_key_direct' layout in the dictionary factory.
  *
  * The creator rejects configurations that:
  *  - have no 'key' in the structure;
  *  - define range_min / range_max (reserved for 'range_hashed');
  *  - specify lifetime.min or lifetime.max.
  * All of these are reported as BAD_ARGUMENTS.
  */
void registerDictionaryComplexKeyDirect(DictionaryFactory & factory)
{
    auto create_layout = [=](const std::string & full_name,
                             const DictionaryStructure & dict_struct,
                             const Poco::Util::AbstractConfiguration & config,
                             const std::string & config_prefix,
                             DictionarySourcePtr source_ptr) -> DictionaryPtr
    {
        if (!dict_struct.key)
            throw Exception{"'key' is required for dictionary of layout 'complex_key_direct'", ErrorCodes::BAD_ARGUMENTS};

        if (dict_struct.range_min || dict_struct.range_max)
            throw Exception{full_name
                    + ": elements .structure.range_min and .structure.range_max should be defined only "
                      "for a dictionary of layout 'range_hashed'",
                ErrorCodes::BAD_ARGUMENTS};

        const auto dict_id = StorageID::fromDictionaryConfig(config, config_prefix);

        /// The message names this layout (it previously said 'direct' and had a stray apostrophe).
        if (config.has(config_prefix + ".lifetime.min") || config.has(config_prefix + ".lifetime.max"))
            throw Exception{"'lifetime' parameter is redundant for the dictionary of layout 'complex_key_direct'", ErrorCodes::BAD_ARGUMENTS};

        return std::make_unique<ComplexKeyDirectDictionary>(dict_id, dict_struct, std::move(source_ptr));
    };

    /// NOTE(review): the third argument presumably marks the layout as complex-key — confirm against DictionaryFactory::registerLayout.
    factory.registerLayout("complex_key_direct", create_layout, true);
}
}

View File

@ -1,147 +0,0 @@
#pragma once
#include <atomic>
#include <variant>
#include <vector>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <Common/Arena.h>
#include <Core/Block.h>
#include <Common/HashTable/HashMap.h>
#include <ext/range.h>
#include <ext/size.h>
#include <ext/map.h>
#include "IDictionary.h"
#include "IDictionarySource.h"
#include "DictionaryStructure.h"
#include "DictionaryHelpers.h"
namespace DB
{
/** Dictionary with a complex key that stores no data itself: it reports zero
  * allocated bytes and zero elements, and lookups are served through the dictionary source.
  * The source must support selective load (checked in the constructor).
  */
class ComplexKeyDirectDictionary final : public IDictionaryBase
{
public:
    ComplexKeyDirectDictionary(
        const StorageID & dict_id_,
        const DictionaryStructure & dict_struct_,
        DictionarySourcePtr source_ptr_,
        BlockPtr saved_block_ = nullptr);

    std::string getTypeName() const override { return "ComplexKeyDirect"; }

    /// Nothing is cached, so the statistics below are constants.
    size_t getBytesAllocated() const override { return 0; }

    size_t getQueryCount() const override { return query_count.load(std::memory_order_relaxed); }

    double getHitRate() const override { return 1.0; }

    size_t getElementCount() const override { return 0; }

    double getLoadFactor() const override { return 0; }

    std::string getKeyDescription() const { return key_description; }

    /// The clone shares the structure and the saved block and gets its own copy of the source.
    std::shared_ptr<const IExternalLoadable> clone() const override
    {
        return std::make_shared<ComplexKeyDirectDictionary>(getDictionaryID(), dict_struct, source_ptr->clone(), saved_block);
    }

    const IDictionarySource * getSource() const override { return source_ptr.get(); }

    const DictionaryLifetime & getLifetime() const override { return dict_lifetime; }

    const DictionaryStructure & getStructure() const override { return dict_struct; }

    /// The attribute's index is recovered from its address inside `attributes`,
    /// which is filled in the same order as `dict_struct.attributes`.
    bool isInjective(const std::string & attribute_name) const override
    {
        return dict_struct.attributes[&getAttribute(attribute_name) - attributes.data()].injective;
    }

    DictionaryKeyType getKeyType() const override { return DictionaryKeyType::complex; }

    ColumnPtr getColumn(
        const std::string& attribute_name,
        const DataTypePtr & result_type,
        const Columns & key_columns,
        const DataTypes & key_types,
        const ColumnPtr & default_values_column) const override;

    ColumnUInt8::Ptr hasKeys(const Columns & key_columns, const DataTypes & key_types) const override;

    BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override;

private:
    /// Hash map keyed by the serialised composite key.
    template <typename Value>
    using MapType = HashMapWithSavedHash<StringRef, Value, StringRefHash>;

    /// Per-attribute metadata: underlying type, nullability, configured null value and name.
    struct Attribute final
    {
        AttributeUnderlyingType type;
        bool is_nullable;

        std::variant<
            UInt8,
            UInt16,
            UInt32,
            UInt64,
            UInt128,
            Int8,
            Int16,
            Int32,
            Int64,
            Decimal32,
            Decimal64,
            Decimal128,
            Float32,
            Float64,
            StringRef>
            null_values;
        /// Owns the bytes referenced by `null_values` when the attribute is a string.
        std::unique_ptr<Arena> string_arena;
        std::string name;
    };

    void createAttributes();

    template <typename T>
    static void createAttributeImpl(Attribute & attribute, const Field & null_value);

    static Attribute createAttribute(const DictionaryAttribute & attribute, const Field & null_value, const std::string & name);

    template <typename Pool>
    StringRef placeKeysInPool(
        const size_t row, const Columns & key_columns, StringRefs & keys, const std::vector<DictionaryAttribute> & key_attributes, Pool & pool) const;

    template <typename AttributeType, typename OutputType, typename ValueSetter, typename DefaultValueExtractor>
    void getItemsImpl(
        const Attribute & attribute,
        const Columns & key_columns,
        ValueSetter && set_value,
        DefaultValueExtractor & default_value_extractor) const;

    const Attribute & getAttribute(const std::string & attribute_name) const;

    const DictionaryStructure dict_struct;
    const DictionarySourcePtr source_ptr;
    const DictionaryLifetime dict_lifetime;

    std::map<std::string, size_t> attribute_index_by_name;
    std::map<size_t, std::string> attribute_name_by_index;
    std::vector<Attribute> attributes;

    mutable std::atomic<size_t> query_count{0};

    BlockPtr saved_block;

    /// Declared after `dict_struct`, which it is initialised from.
    const std::string key_description{dict_struct.getKeyDescription()};
};
}

View File

@ -17,8 +17,115 @@ namespace ErrorCodes
extern const int UNSUPPORTED_METHOD;
}
namespace
{
/// Array form of the id accessor: yields the id stored at position `idx`.
inline UInt64 getAt(const PaddedPODArray<UInt64> & arr, const size_t idx)
{
    const UInt64 element = arr[idx];
    return element;
}
DirectDictionary::DirectDictionary(
/// Scalar form of the id accessor: the same id is returned whatever index is asked for.
inline UInt64 getAt(const UInt64 & value, const size_t /* idx */)
{
    const UInt64 same_for_every_row = value;
    return same_for_every_row;
}
/// TODO: Use this class from DictionaryHelpers after the cache dictionaries pull request is merged
/** Extracts dictionary keys from key columns into a flat array.
  *
  *  - simple key: the first key column must be a UInt64 vector (constant columns are expanded);
  *    its values are copied. Throws TYPE_MISMATCH otherwise.
  *  - complex key: for every row the values of all key columns are serialised into the arena
  *    and one StringRef per row is produced.
  *
  * For complex keys the StringRefs point into `existing_arena`, so the arena must outlive
  * every use of the extracted keys.
  */
template <DictionaryKeyType key_type>
class DictionaryKeysExtractor
{
public:
    using KeyType = std::conditional_t<key_type == DictionaryKeyType::simple, UInt64, StringRef>;
    static_assert(key_type != DictionaryKeyType::range, "Range key type is not supported by DictionaryKeysExtractor");

    explicit DictionaryKeysExtractor(const Columns & key_columns, Arena & existing_arena)
    {
        assert(!key_columns.empty());

        if constexpr (key_type == DictionaryKeyType::simple)
            keys = getColumnVectorData(key_columns.front());
        else
            keys = deserializeKeyColumnsInArena(key_columns, existing_arena);
    }

    const PaddedPODArray<KeyType> & getKeys() const
    {
        return keys;
    }

private:
    /// Copies the UInt64 values of a simple-key column; the column is taken by reference
    /// to avoid copying the shared pointer.
    static PaddedPODArray<UInt64> getColumnVectorData(const ColumnPtr & column)
    {
        auto full_column = column->convertToFullColumnIfConst();

        const auto * vector_col = checkAndGetColumn<ColumnVector<UInt64>>(full_column.get());
        if (!vector_col)
            throw Exception{ErrorCodes::TYPE_MISMATCH, "Column type mismatch for simple key expected UInt64"};

        PaddedPODArray<UInt64> result;
        result.assign(vector_col->getData());
        return result;
    }

    /// Serialises every row of the key columns into `temporary_arena`, one StringRef per row.
    static PaddedPODArray<StringRef> deserializeKeyColumnsInArena(const Columns & key_columns, Arena & temporary_arena)
    {
        const size_t keys_size = key_columns.front()->size();

        PaddedPODArray<StringRef> result;
        result.reserve(keys_size);

        for (size_t key_index = 0; key_index < keys_size; ++key_index)
        {
            size_t allocated_size_for_columns = 0;
            /// NOTE(review): presumably successive serializeValueIntoArena calls extend one contiguous
            /// region starting at `block_start` — confirm against IColumn::serializeValueIntoArena.
            const char * block_start = nullptr;

            /// Only the serialised sizes are needed here, so they are summed directly.
            for (const auto & column : key_columns)
                allocated_size_for_columns += column->serializeValueIntoArena(key_index, temporary_arena, block_start).size;

            result.push_back(StringRef{block_start, allocated_size_for_columns});
        }

        return result;
    }

    PaddedPODArray<KeyType> keys;
};
/// TODO: Use this class from DictionaryHelpers after the cache dictionaries pull request is merged
/** Supplies the default value for a requested row.
  * When a column of per-row defaults is given, the value is read from that column;
  * otherwise the single `default_value` is returned for every row.
  */
class DefaultValueProvider final
{
public:
    explicit DefaultValueProvider(Field default_value_, ColumnPtr default_values_column_ = nullptr)
        : default_value(std::move(default_value_))
        , default_values_column(std::move(default_values_column_)) /// sink parameter: moved, not copied
    {
    }

    /// Returns the default for `row`; `row` is ignored when no per-row column was supplied.
    Field getDefaultValue(size_t row) const
    {
        if (default_values_column)
            return (*default_values_column)[row];

        return default_value;
    }

private:
    Field default_value;
    ColumnPtr default_values_column;
};
}
template <DictionaryKeyType dictionary_key_type>
DirectDictionary<dictionary_key_type>::DirectDictionary(
const StorageID & dict_id_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
@ -28,36 +135,37 @@ DirectDictionary::DirectDictionary(
, source_ptr{std::move(source_ptr_)}
, saved_block{std::move(saved_block_)}
{
if (!this->source_ptr->supportsSelectiveLoad())
if (!source_ptr->supportsSelectiveLoad())
throw Exception{full_name + ": source cannot be used with DirectDictionary", ErrorCodes::UNSUPPORTED_METHOD};
createAttributes();
setup();
}
/// Writes into `out` the value of the hierarchical attribute for each id in `ids`.
/// NOTE(review): two signatures and two implementations are interleaved below — one that calls
/// getItemsImpl() with a default-value extractor, and one that goes through getColumn() and is
/// guarded by `if constexpr` on the key type. This looks like removed and added diff lines shown
/// together — confirm which version is intended.
void DirectDictionary::toParent(const PaddedPODArray<Key> & ids, PaddedPODArray<Key> & out) const
template <DictionaryKeyType dictionary_key_type>
void DirectDictionary<dictionary_key_type>::toParent(const PaddedPODArray<Key> & ids, PaddedPODArray<Key> & out) const
{
const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);
DictionaryDefaultValueExtractor<UInt64> extractor(null_value);
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
{
const auto & attribute_name = hierarchical_attribute->name;
getItemsImpl<UInt64, UInt64>(
*hierarchical_attribute,
ids,
[&](const size_t row, const UInt64 value, bool) { out[row] = value; },
extractor);
/// Build a UInt64 column from `ids` so that the generic getColumn() path can be used.
auto result_type = std::make_shared<DataTypeUInt64>();
auto input_column = result_type->createColumn();
auto & input_column_typed = assert_cast<ColumnVector<UInt64> &>(*input_column);
auto & data = input_column_typed.getData();
data.insert(ids.begin(), ids.end());
/// No per-row defaults column is passed (nullptr).
auto column = getColumn({attribute_name}, result_type, {std::move(input_column)}, {result_type}, {nullptr});
const auto & result_column_typed = assert_cast<const ColumnVector<UInt64> &>(*column);
const auto & result_data = result_column_typed.getData();
out.assign(result_data);
}
else
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Hierarchy is not supported for complex key DirectDictionary");
}
/// Array form of the id accessor: yields the id stored at position `idx`.
static inline DirectDictionary::Key getAt(const PaddedPODArray<DirectDictionary::Key> & arr, const size_t idx)
{
    const DirectDictionary::Key element = arr[idx];
    return element;
}
/// Scalar form of the id accessor: the same id is returned whatever index is asked for.
static inline DirectDictionary::Key getAt(const DirectDictionary::Key & value, const size_t /* idx */)
{
    const DirectDictionary::Key same_for_every_row = value;
    return same_for_every_row;
}
DirectDictionary::Key DirectDictionary::getValueOrNullByKey(const Key & to_find) const
template <DictionaryKeyType dictionary_key_type>
UInt64 DirectDictionary<dictionary_key_type>::getValueOrNullByKey(const Key & to_find) const
{
std::vector<Key> required_key = {to_find};
@ -65,12 +173,13 @@ DirectDictionary::Key DirectDictionary::getValueOrNullByKey(const Key & to_find)
stream->readPrefix();
bool is_found = false;
Key result = std::get<Key>(hierarchical_attribute->null_values);
UInt64 result = hierarchical_attribute->null_value.template get<UInt64>();
while (const auto block = stream->read())
{
const IColumn & id_column = *block.safeGetByPosition(0).column;
for (const size_t attribute_idx : ext::range(0, attributes.size()))
for (const size_t attribute_idx : ext::range(0, dict_struct.attributes.size()))
{
if (is_found)
break;
@ -96,10 +205,11 @@ DirectDictionary::Key DirectDictionary::getValueOrNullByKey(const Key & to_find)
return result;
}
template <DictionaryKeyType dictionary_key_type>
template <typename ChildType, typename AncestorType>
void DirectDictionary::isInImpl(const ChildType & child_ids, const AncestorType & ancestor_ids, PaddedPODArray<UInt8> & out) const
void DirectDictionary<dictionary_key_type>::isInImpl(const ChildType & child_ids, const AncestorType & ancestor_ids, PaddedPODArray<UInt8> & out) const
{
const auto null_value = std::get<UInt64>(hierarchical_attribute->null_values);
const auto null_value = hierarchical_attribute->null_value.template get<UInt64>();
const auto rows = out.size();
for (const auto row : ext::range(0, rows))
@ -116,315 +226,308 @@ void DirectDictionary::isInImpl(const ChildType & child_ids, const AncestorType
query_count.fetch_add(rows, std::memory_order_relaxed);
}
/// Forwards to isInImpl() with an array of child ids and an array of ancestor ids; results go to `out`.
/// NOTE(review): two signatures (non-template and template) precede one body; this looks like
/// removed and added diff lines shown together — confirm which version is intended.
void DirectDictionary::isInVectorVector(
const PaddedPODArray<Key> & child_ids, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const
template <DictionaryKeyType dictionary_key_type>
void DirectDictionary<dictionary_key_type>::isInVectorVector(
const PaddedPODArray<UInt64> & child_ids, const PaddedPODArray<UInt64> & ancestor_ids, PaddedPODArray<UInt8> & out) const
{
isInImpl(child_ids, ancestor_ids, out);
}
/// Forwards to isInImpl() with an array of child ids and a single ancestor id; results go to `out`.
/// NOTE(review): two signatures (non-template and template) precede one body; this looks like
/// removed and added diff lines shown together — confirm which version is intended.
void DirectDictionary::isInVectorConstant(const PaddedPODArray<Key> & child_ids, const Key ancestor_id, PaddedPODArray<UInt8> & out) const
template <DictionaryKeyType dictionary_key_type>
void DirectDictionary<dictionary_key_type>::isInVectorConstant(const PaddedPODArray<UInt64> & child_ids, const UInt64 ancestor_id, PaddedPODArray<UInt8> & out) const
{
isInImpl(child_ids, ancestor_id, out);
}
/// Forwards to isInImpl() with a single child id and an array of ancestor ids; results go to `out`.
/// NOTE(review): two signatures (non-template and template) precede one body; this looks like
/// removed and added diff lines shown together — confirm which version is intended.
void DirectDictionary::isInConstantVector(const Key child_id, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const
template <DictionaryKeyType dictionary_key_type>
void DirectDictionary<dictionary_key_type>::isInConstantVector(const UInt64 child_id, const PaddedPODArray<UInt64> & ancestor_ids, PaddedPODArray<UInt8> & out) const
{
isInImpl(child_id, ancestor_ids, out);
}
ColumnPtr DirectDictionary::getColumn(
template <DictionaryKeyType dictionary_key_type>
ColumnPtr DirectDictionary<dictionary_key_type>::getColumn(
const std::string & attribute_name,
const DataTypePtr & result_type,
const Columns & key_columns,
const DataTypes &,
const DataTypes & key_types,
const ColumnPtr & default_values_column) const
{
ColumnPtr result;
if constexpr (dictionary_key_type == DictionaryKeyType::complex)
dict_struct.validateKeyTypes(key_types);
PaddedPODArray<Key> backup_storage;
const auto & ids = getColumnVectorData(this, key_columns.front(), backup_storage);
Arena complex_key_arena;
const auto & attribute = getAttribute(attribute_name);
const DictionaryAttribute & attribute = dict_struct.getAttribute(attribute_name, result_type);
auto result = attribute.type->createColumn();
auto keys_size = ids.size();
DefaultValueProvider default_value_provider(attribute.null_value, default_values_column);
DictionaryKeysExtractor<dictionary_key_type> extractor(key_columns, complex_key_arena);
const auto & requested_keys = extractor.getKeys();
size_t requested_attribute_index = attribute_index_by_name.find(attribute_name)->second;
ColumnUInt8::MutablePtr col_null_map_to;
ColumnUInt8::Container * vec_null_map_to = nullptr;
if (attribute.is_nullable)
{
col_null_map_to = ColumnUInt8::create(keys_size, false);
vec_null_map_to = &col_null_map_to->getData();
}
size_t dictionary_keys_size = dict_struct.getKeysNames().size();
size_t requested_key_index = 0;
Field block_column_value;
const auto & dictionary_attribute = dict_struct.getAttribute(attribute_name, result_type);
/** In result stream keys are returned in same order as they were requested.
* For example if we request keys [1, 2, 3, 4] but source has only [2, 3] we need to return to client
* [default_value, 2, 3, default_value].
* For each key fetched from source current algorithm adds default values until
* requested key with requested_key_index match key fetched from source.
* At the end we also need to process tail.
*/
auto type_call = [&](const auto &dictionary_attribute_type)
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
BlockInputStreamPtr stream = getSourceBlockInputStream(key_columns, requested_keys);
using ValueType = DictionaryValueType<AttributeType>;
using ColumnProvider = DictionaryAttributeColumnProvider<AttributeType>;
const auto attribute_null_value = std::get<ValueType>(attribute.null_values);
AttributeType null_value = static_cast<AttributeType>(attribute_null_value);
DictionaryDefaultValueExtractor<AttributeType> default_value_extractor(std::move(null_value), default_values_column);
auto column = ColumnProvider::getColumn(dictionary_attribute, keys_size);
if constexpr (std::is_same_v<AttributeType, String>)
{
auto * out = column.get();
getItemsImpl<String, String>(
attribute,
ids,
[&](const size_t row, const String value, bool is_null)
{
if (attribute.is_nullable)
(*vec_null_map_to)[row] = is_null;
const auto ref = StringRef{value};
out->insertData(ref.data, ref.size);
},
default_value_extractor);
}
else
{
auto & out = column->getData();
getItemsImpl<AttributeType, AttributeType>(
attribute,
ids,
[&](const size_t row, const auto value, bool is_null)
{
if (attribute.is_nullable)
(*vec_null_map_to)[row] = is_null;
out[row] = value;
},
default_value_extractor);
}
result = std::move(column);
};
callOnDictionaryAttributeType(attribute.type, type_call);
if (attribute.is_nullable)
{
result = ColumnNullable::create(result, std::move(col_null_map_to));
}
return result;
}
ColumnUInt8::Ptr DirectDictionary::hasKeys(const Columns & key_columns, const DataTypes &) const
{
PaddedPODArray<Key> backup_storage;
const auto& ids = getColumnVectorData(this, key_columns.front(), backup_storage);
auto result = ColumnUInt8::create(ext::size(ids));
auto& out = result->getData();
const auto rows = ext::size(ids);
HashMap<Key, UInt8> has_key;
for (const auto row : ext::range(0, rows))
has_key[ids[row]] = 0;
std::vector<Key> to_load;
to_load.reserve(has_key.size());
for (auto it = has_key.begin(); it != has_key.end(); ++it)
to_load.emplace_back(static_cast<Key>(it->getKey()));
auto stream = source_ptr->loadIds(to_load);
stream->readPrefix();
while (const auto block = stream->read())
{
const IColumn & id_column = *block.safeGetByPosition(0).column;
Columns block_key_columns;
block_key_columns.reserve(dictionary_keys_size);
for (const auto row_idx : ext::range(0, id_column.size()))
auto block_columns = block.getColumns();
/// Split into keys columns and attribute columns
for (size_t i = 0; i < dictionary_keys_size; ++i)
{
const auto key = id_column[row_idx].get<UInt64>();
has_key[key] = 1;
block_key_columns.emplace_back(*block_columns.begin());
block_columns.erase(block_columns.begin());
}
DictionaryKeysExtractor<dictionary_key_type> block_keys_extractor(block_key_columns, complex_key_arena);
const auto & block_keys = block_keys_extractor.getKeys();
size_t block_keys_size = block_keys.size();
const auto & block_column = block.safeGetByPosition(dictionary_keys_size + requested_attribute_index).column;
for (size_t block_key_index = 0; block_key_index < block_keys_size; ++block_key_index)
{
auto block_key = block_keys[block_key_index];
while (requested_key_index < requested_keys.size() &&
block_key != requested_keys[requested_key_index])
{
block_column_value = default_value_provider.getDefaultValue(requested_key_index);
result->insert(block_column_value);
++requested_key_index;
}
block_column->get(block_key_index, block_column_value);
result->insert(block_column_value);
++requested_key_index;
}
}
stream->readSuffix();
for (const auto row : ext::range(0, rows))
out[row] = has_key[ids[row]];
size_t requested_keys_size = requested_keys.size();
query_count.fetch_add(rows, std::memory_order_relaxed);
Field default_value;
/// Process the tail: if the source returned fewer keys than were requested, insert the default value for each remaining key
for (; requested_key_index < requested_keys_size; ++requested_key_index)
{
default_value = default_value_provider.getDefaultValue(requested_key_index);
result->insert(default_value);
}
query_count.fetch_add(requested_keys_size, std::memory_order_relaxed);
Field result_val;
for (size_t i = 0; i < result->size(); ++i)
{
result->get(i, result_val);
std::cerr << "I " << i << " dump " << result_val.dump() << std::endl;
}
return result;
}
void DirectDictionary::createAttributes()
/** Builds a UInt8 column with one entry per requested key.
  * Every entry starts as false and is set to true when the key is matched against a key read from the source.
  * For complex-key dictionaries the key types are validated with dict_struct.validateKeyTypes.
  * The number of requested keys is added to query_count.
  */
template <DictionaryKeyType dictionary_key_type>
ColumnUInt8::Ptr DirectDictionary<dictionary_key_type>::hasKeys(const Columns & key_columns, const DataTypes & key_types) const
{
/// NOTE(review): the statements in this function that touch `attributes`, `attribute_index_by_name`,
/// `attribute_name_by_index` and `createAttribute` look unrelated to key lookup - confirm they belong here.
const auto size = dict_struct.attributes.size();
attributes.reserve(size);
if constexpr (dictionary_key_type == DictionaryKeyType::complex)
dict_struct.validateKeyTypes(key_types);
for (const auto & attribute : dict_struct.attributes)
Arena complex_key_arena;
DictionaryKeysExtractor<dictionary_key_type> requested_keys_extractor(key_columns, complex_key_arena);
const auto & requested_keys = requested_keys_extractor.getKeys();
size_t requested_keys_size = requested_keys.size();
/// The result is pre-filled with false, so only found keys have to be written explicitly.
auto result = ColumnUInt8::create(requested_keys_size, false);
auto & result_data = result->getData();
size_t dictionary_keys_size = dict_struct.getKeysNames().size();
/// Cursor into requested_keys; it only moves forward while source blocks are consumed.
size_t requested_key_index = 0;
Field block_column_value;
/** The algorithm is the same as in the getColumn method, with two differences:
  * 1. The tail is not processed, because the result column is created filled with false.
  * 2. If a requested key does not match the key from the source, false is written at requested_key_index.
  */
BlockInputStreamPtr stream = getSourceBlockInputStream(key_columns, requested_keys);
stream->readPrefix();
while (const auto block = stream->read())
{
attribute_index_by_name.emplace(attribute.name, attributes.size());
attribute_name_by_index.emplace(attributes.size(), attribute.name);
attributes.push_back(createAttribute(attribute, attribute.null_value, attribute.name));
auto block_columns = block.getColumns();
Columns block_key_columns;
block_key_columns.reserve(dictionary_keys_size);
/// Split the block into key columns and attribute columns: the first dictionary_keys_size columns are the keys
for (size_t i = 0; i < dictionary_keys_size; ++i)
{
block_key_columns.emplace_back(*block_columns.begin());
block_columns.erase(block_columns.begin());
}
DictionaryKeysExtractor<dictionary_key_type> block_keys_extractor(block_key_columns, complex_key_arena);
const auto & block_keys = block_keys_extractor.getKeys();
size_t block_keys_size = block_keys.size();
for (size_t block_key_index = 0; block_key_index < block_keys_size; ++block_key_index)
{
auto block_key = block_keys[block_key_index];
/// Skip requested keys until the one equal to the current source key is reached.
while (requested_key_index < requested_keys.size() &&
block_key != requested_keys[requested_key_index])
{
result_data[requested_key_index] = false;
++requested_key_index;
}
/// NOTE(review): if the loop above runs out of requested keys without a match, requested_key_index
/// equals requested_keys.size() here and this write is out of range - confirm the source can
/// only return requested keys, in the order they were requested.
result_data[requested_key_index] = true;
++requested_key_index;
}
}
stream->readSuffix();
/// No additional code is needed for the tail because the result was initialized with false values
query_count.fetch_add(requested_keys_size, std::memory_order_relaxed);
return result;
}
/** Asks the source for exactly the requested keys and returns the resulting stream.
  * Simple-key dictionaries pass the keys themselves to loadIds.
  * Complex-key dictionaries pass the key columns together with every row index to loadKeys.
  */
template <DictionaryKeyType dictionary_key_type>
BlockInputStreamPtr DirectDictionary<dictionary_key_type>::getSourceBlockInputStream(const Columns & key_columns, const PaddedPODArray<KeyType> & requested_keys) const
{
    if constexpr (dictionary_key_type == DictionaryKeyType::simple)
    {
        /// The source interface expects a std::vector, so the keys are copied out of the padded array.
        const std::vector<UInt64> ids(requested_keys.begin(), requested_keys.end());
        return source_ptr->loadIds(ids);
    }
    else
    {
        /// Every row of key_columns is requested: the row list is simply 0 .. key_count - 1.
        const size_t key_count = requested_keys.size();
        std::vector<size_t> row_indexes(key_count);

        for (size_t row = 0; row < key_count; ++row)
            row_indexes[row] = row;

        return source_ptr->loadKeys(key_columns, row_indexes);
    }
}
/** Fills the name <-> index lookup tables for the attributes listed in dict_struct
  * and handles attributes marked as hierarchical.
  * Throws BAD_ARGUMENTS for a hierarchical attribute in a complex-key dictionary,
  * and TYPE_MISMATCH when the hierarchical attribute's underlying type is not UInt64.
  */
template <DictionaryKeyType dictionary_key_type>
void DirectDictionary<dictionary_key_type>::setup()
{
/// TODO: Move this to DictionaryStructure
size_t dictionary_attributes_size = dict_struct.attributes.size();
for (size_t i = 0; i < dictionary_attributes_size; ++i)
{
const auto & attribute = dict_struct.attributes[i];
/// The index is the attribute's position in dict_struct.attributes.
attribute_index_by_name[attribute.name] = i;
attribute_name_by_index[i] = attribute.name;
if (attribute.hierarchical)
{
/// NOTE(review): `attributes.back()` and the `hierarchical_attribute->type` check below refer to a
/// separate `attributes` container, and as written that check guards the assignment from `&attribute` -
/// confirm which of these statements are intended.
hierarchical_attribute = &attributes.back();
if constexpr (dictionary_key_type == DictionaryKeyType::complex)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"({}): hierachical attributes are not supported for complex key direct dictionary",
full_name);
if (hierarchical_attribute->type != AttributeUnderlyingType::utUInt64)
hierarchical_attribute = &attribute;
if (attribute.underlying_type != AttributeUnderlyingType::utUInt64)
throw Exception{full_name + ": hierarchical attribute must be UInt64.", ErrorCodes::TYPE_MISMATCH};
}
}
}
/// Stores the configured null value in the attribute, converted to the attribute's value type T.
template <typename T>
void DirectDictionary::createAttributeImpl(Attribute & attribute, const Field & null_value)
{
    /// Field keeps values in its "nearest" storage type, so read that type and convert it to T.
    using StoredType = NearestFieldType<T>;
    attribute.null_values = T(null_value.get<StoredType>());
}
/// String specialization: the null value is copied into an arena owned by the attribute
/// and stored as a StringRef pointing at that copy.
template <>
void DirectDictionary::createAttributeImpl<String>(Attribute & attribute, const Field & null_value)
{
    auto & arena = attribute.string_arena;
    arena = std::make_unique<Arena>();

    const String & default_text = null_value.get<String>();
    const size_t length = default_text.size();

    /// The StringRef must reference the arena copy, not the Field's own buffer.
    const char * arena_copy = arena->insert(default_text.data(), length);
    attribute.null_values.emplace<StringRef>(arena_copy, length);
}
/// Creates the runtime Attribute for a dictionary attribute description:
/// copies its type, nullability and name, then fills the null value for the concrete value type.
DirectDictionary::Attribute DirectDictionary::createAttribute(const DictionaryAttribute& attribute, const Field & null_value, const std::string & attr_name)
{
    Attribute created{attribute.underlying_type, attribute.is_nullable, {}, {}, attr_name};

    /// Invoked by the dispatcher with a tag object whose nested AttributeType is the value type.
    auto fill_null_value = [&](const auto & type_tag)
    {
        using AttributeType = typename std::decay_t<decltype(type_tag)>::AttributeType;
        createAttributeImpl<AttributeType>(created, null_value);
    };

    callOnDictionaryAttributeType(attribute.underlying_type, fill_null_value);

    return created;
}
/** Loads the values of one attribute for the given ids directly from the source and reports them row by row.
  *
  * attribute               - attribute to read; it is located in the source blocks by its name.
  * ids                     - requested keys, one per output row (duplicates are allowed).
  * set_value               - callback invoked once per row as set_value(row, value, is_null).
  * default_value_extractor - provides the value to use for a row whose key is not returned by the source.
  *
  * Throws BAD_ARGUMENTS if the attribute name is not present in attribute_index_by_name.
  * Adds the number of rows to query_count.
  */
template <typename AttributeType, typename OutputType, typename ValueSetter, typename DefaultValueExtractor>
void DirectDictionary::getItemsImpl(
const Attribute & attribute,
const PaddedPODArray<Key> & ids,
ValueSetter && set_value,
DefaultValueExtractor & default_value_extractor) const
{
const auto rows = ext::size(ids);
/// Result value per distinct key, pre-filled with the default taken from the row that requested it.
HashMap<Key, OutputType> value_by_key;
/// Keys for which the source returned NULL.
HashSet<Key> value_is_null;
for (const auto row : ext::range(0, rows))
{
auto key = ids[row];
/// For a key requested by several rows the default of the last such row is kept.
value_by_key[key] = static_cast<AttributeType>(default_value_extractor[row]);
}
/// Only distinct keys are sent to the source.
std::vector<Key> to_load;
to_load.reserve(value_by_key.size());
for (auto it = value_by_key.begin(); it != value_by_key.end(); ++it)
to_load.emplace_back(static_cast<Key>(it->getKey()));
auto stream = source_ptr->loadIds(to_load);
stream->readPrefix();
/// NOTE(review): the attribute lookup happens after the stream has been opened; on failure the
/// exception is thrown without readSuffix() being called - confirm this is acceptable.
const auto it = attribute_index_by_name.find(attribute.name);
if (it == std::end(attribute_index_by_name))
throw Exception{full_name + ": no such attribute '" + attribute.name + "'", ErrorCodes::BAD_ARGUMENTS};
auto attribute_index = it->second;
while (const auto block = stream->read())
{
/// Column 0 of a source block is read as the id; the attribute column is at attribute_index + 1.
const IColumn & id_column = *block.safeGetByPosition(0).column;
const IColumn & attribute_column = *block.safeGetByPosition(attribute_index + 1).column;
for (const auto row_idx : ext::range(0, id_column.size()))
{
const auto key = id_column[row_idx].get<UInt64>();
/// Rows whose key was not requested are ignored.
if (value_by_key.find(key) != value_by_key.end())
{
auto value = attribute_column[row_idx];
/// A NULL from the source is only recorded; the stored value for the key is left unchanged.
if (value.isNull())
value_is_null.insert(key);
else
value_by_key[key] = static_cast<OutputType>(value.get<NearestFieldType<AttributeType>>());
}
}
}
stream->readSuffix();
/// Report a value for every requested row, in the original row order.
for (const auto row : ext::range(0, rows))
{
auto key = ids[row];
set_value(row, value_by_key[key], value_is_null.find(key) != nullptr);
}
query_count.fetch_add(rows, std::memory_order_relaxed);
}
/// Returns the attribute registered under the given name.
/// Throws BAD_ARGUMENTS if the dictionary has no attribute with that name.
const DirectDictionary::Attribute & DirectDictionary::getAttribute(const std::string & attribute_name) const
{
    const auto found = attribute_index_by_name.find(attribute_name);

    if (found != std::end(attribute_index_by_name))
        return attributes[found->second];

    throw Exception{full_name + ": no such attribute '" + attribute_name + "'", ErrorCodes::BAD_ARGUMENTS};
}
BlockInputStreamPtr DirectDictionary::getBlockInputStream(const Names & /* column_names */, size_t /* max_block_size */) const
/// Returns a stream over everything the source provides.
/// The requested column names and the maximum block size are not used.
template <DictionaryKeyType dictionary_key_type>
BlockInputStreamPtr DirectDictionary<dictionary_key_type>::getBlockInputStream(const Names & /* column_names */, size_t /* max_block_size */) const
{
    BlockInputStreamPtr full_source_stream = source_ptr->loadAll();
    return full_source_stream;
}
void registerDictionaryDirect(DictionaryFactory & factory)
namespace
{
auto create_layout = [=](const std::string & full_name,
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr) -> DictionaryPtr
/** Factory function for direct dictionaries: checks the dictionary structure and configuration
  * against the layout and returns a DirectDictionary<dictionary_key_type> built from
  * the dictionary id read from the configuration, dict_struct and source_ptr.
  *
  * Throws UNSUPPORTED_METHOD when the structure declares a key kind the layout does not accept,
  * and BAD_ARGUMENTS for range_min/range_max or lifetime settings.
  */
template <DictionaryKeyType dictionary_key_type>
DictionaryPtr createDirectDictionary(
const std::string & full_name,
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr)
{
/// NOTE(review): this check rejects 'key' for every key type, including complex - confirm it is
/// intended alongside the per-layout checks below.
if (dict_struct.key)
throw Exception{"'key' is not supported for dictionary of layout 'direct'", ErrorCodes::UNSUPPORTED_METHOD};
/// Layout name used in the error messages below.
const auto * layout_name = dictionary_key_type == DictionaryKeyType::simple ? "direct" : "complex_key_direct";
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
{
/// The simple layout rejects a structure that declares 'key'.
if (dict_struct.key)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"'key' is not supported for dictionary of layout '({})'",
layout_name);
}
else
{
/// The complex layout rejects a structure that declares 'id'.
if (dict_struct.id)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"'id' is not supported for dictionary of layout '({})'",
layout_name);
}
/// NOTE(review): here and in the lifetime check below two throw statements follow a single `if`;
/// as written only the first is conditional - confirm which one is intended.
if (dict_struct.range_min || dict_struct.range_max)
throw Exception{full_name
+ ": elements .structure.range_min and .structure.range_max should be defined only "
"for a dictionary of layout 'range_hashed'",
ErrorCodes::BAD_ARGUMENTS};
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"({}): elements .structure.range_min and .structure.range_max should be defined only " \
"for a dictionary of layout 'range_hashed'",
full_name);
const auto dict_id = StorageID::fromDictionaryConfig(config, config_prefix);
/// A lifetime setting is rejected for this layout.
if (config.has(config_prefix + ".lifetime.min") || config.has(config_prefix + ".lifetime.max"))
throw Exception{"'lifetime' parameter is redundant for the dictionary' of layout 'direct'", ErrorCodes::BAD_ARGUMENTS};
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"'lifetime' parameter is redundant for the dictionary' of layout '({})'",
layout_name);
return std::make_unique<DirectDictionary<dictionary_key_type>>(dict_id, dict_struct, std::move(source_ptr));
}
}
return std::make_unique<DirectDictionary>(dict_id, dict_struct, std::move(source_ptr));
};
factory.registerLayout("direct", create_layout, false);
template class DirectDictionary<DictionaryKeyType::simple>;
template class DirectDictionary<DictionaryKeyType::complex>;
/// Registers the "direct" and "complex_key_direct" layouts in the dictionary factory.
void registerDictionaryDirect(DictionaryFactory & factory)
{
    /// Both layouts share one creator template; only the key type and the trailing flag differ.
    const auto simple_key_creator = &createDirectDictionary<DictionaryKeyType::simple>;
    const auto complex_key_creator = &createDirectDictionary<DictionaryKeyType::complex>;

    factory.registerLayout("direct", simple_key_creator, false);
    factory.registerLayout("complex_key_direct", complex_key_creator, true);
}

View File

@ -18,16 +18,25 @@
namespace DB
{
template <DictionaryKeyType dictionary_key_type>
class DirectDictionary final : public IDictionary
{
public:
static_assert(dictionary_key_type != DictionaryKeyType::range, "Range key type is not supported by direct dictionary");
using KeyType = std::conditional_t<dictionary_key_type == DictionaryKeyType::simple, UInt64, StringRef>;
DirectDictionary(
const StorageID & dict_id_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
BlockPtr saved_block_ = nullptr);
std::string getTypeName() const override { return "Direct"; }
/// Layout name of this dictionary: "Direct" for simple keys, "ComplexKeyDirect" otherwise.
std::string getTypeName() const override
{
    if constexpr (dictionary_key_type != DictionaryKeyType::simple)
        return "ComplexKeyDirect";
    else
        return "Direct";
}
size_t getBytesAllocated() const override { return 0; }
@ -52,19 +61,27 @@ public:
bool isInjective(const std::string & attribute_name) const override
{
return dict_struct.attributes[&getAttribute(attribute_name) - attributes.data()].injective;
auto it = attribute_index_by_name.find(attribute_name);
if (it == attribute_index_by_name.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"({}): no attribute with name ({}) in dictionary",
full_name,
attribute_name);
return dict_struct.attributes[it->second].injective;
}
bool hasHierarchy() const override { return hierarchical_attribute; }
void toParent(const PaddedPODArray<Key> & ids, PaddedPODArray<Key> & out) const override;
void toParent(const PaddedPODArray<UInt64> & ids, PaddedPODArray<UInt64> & out) const override;
void isInVectorVector(
const PaddedPODArray<Key> & child_ids, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const override;
void isInVectorConstant(const PaddedPODArray<Key> & child_ids, const Key ancestor_id, PaddedPODArray<UInt8> & out) const override;
void isInConstantVector(const Key child_id, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const override;
const PaddedPODArray<UInt64> & child_ids, const PaddedPODArray<UInt64> & ancestor_ids, PaddedPODArray<UInt8> & out) const override;
void isInVectorConstant(const PaddedPODArray<UInt64> & child_ids, const UInt64 ancestor_id, PaddedPODArray<UInt8> & out) const override;
void isInConstantVector(const UInt64 child_id, const PaddedPODArray<UInt64> & ancestor_ids, PaddedPODArray<UInt8> & out) const override;
DictionaryKeyType getKeyType() const override { return DictionaryKeyType::simple; }
DictionaryKeyType getKeyType() const override { return dictionary_key_type; }
ColumnPtr getColumn(
const std::string& attribute_name,
@ -78,56 +95,11 @@ public:
BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const override;
private:
struct Attribute final
{
AttributeUnderlyingType type;
bool is_nullable;
std::variant<
UInt8,
UInt16,
UInt32,
UInt64,
UInt128,
Int8,
Int16,
Int32,
Int64,
Decimal32,
Decimal64,
Decimal128,
Float32,
Float64,
StringRef>
null_values;
std::unique_ptr<Arena> string_arena;
std::string name;
};
void setup();
void createAttributes();
BlockInputStreamPtr getSourceBlockInputStream(const Columns & key_columns, const PaddedPODArray<KeyType> & requested_keys) const;
template <typename T>
void addAttributeSize(const Attribute & attribute);
template <typename T>
static void createAttributeImpl(Attribute & attribute, const Field & null_value);
static Attribute createAttribute(const DictionaryAttribute& attribute, const Field & null_value, const std::string & name);
template <typename AttributeType, typename OutputType, typename ValueSetter, typename DefaultValueExtractor>
void getItemsImpl(
const Attribute & attribute,
const PaddedPODArray<Key> & ids,
ValueSetter && set_value,
DefaultValueExtractor & default_value_extractor) const;
template <typename T>
void setAttributeValueImpl(Attribute & attribute, const Key id, const T & value);
void setAttributeValue(Attribute & attribute, const Key id, const Field & value);
const Attribute & getAttribute(const std::string & attribute_name) const;
Key getValueOrNullByKey(const Key & to_find) const;
UInt64 getValueOrNullByKey(const UInt64 & to_find) const;
template <typename ChildType, typename AncestorType>
void isInImpl(const ChildType & child_ids, const AncestorType & ancestor_ids, PaddedPODArray<UInt8> & out) const;
@ -136,14 +108,17 @@ private:
const DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
std::map<std::string, size_t> attribute_index_by_name;
std::map<size_t, std::string> attribute_name_by_index;
std::vector<Attribute> attributes;
const Attribute * hierarchical_attribute = nullptr;
std::unordered_map<std::string, size_t> attribute_index_by_name;
std::unordered_map<size_t, std::string> attribute_name_by_index;
const DictionaryAttribute * hierarchical_attribute = nullptr;
mutable std::atomic<size_t> query_count{0};
BlockPtr saved_block;
};
extern template class DirectDictionary<DictionaryKeyType::simple>;
extern template class DirectDictionary<DictionaryKeyType::complex>;
}

View File

@ -25,7 +25,6 @@ void registerDictionarySourceLibrary(DictionarySourceFactory & source_factory);
class DictionaryFactory;
void registerDictionaryRangeHashed(DictionaryFactory & factory);
void registerDictionaryComplexKeyHashed(DictionaryFactory & factory);
void registerDictionaryComplexKeyDirect(DictionaryFactory & factory);
void registerDictionaryTrie(DictionaryFactory & factory);
void registerDictionaryFlat(DictionaryFactory & factory);
void registerDictionaryHashed(DictionaryFactory & factory);
@ -59,7 +58,6 @@ void registerDictionaries()
auto & factory = DictionaryFactory::instance();
registerDictionaryRangeHashed(factory);
registerDictionaryComplexKeyHashed(factory);
registerDictionaryComplexKeyDirect(factory);
registerDictionaryTrie(factory);
registerDictionaryFlat(factory);
registerDictionaryHashed(factory);

View File

@ -727,8 +727,9 @@ private:
auto dict = helper.getDictionary(arguments[0]);
ColumnPtr res;
/// TODO: Rewrite this
if (!((res = executeDispatch<FlatDictionary>(arguments, result_type, dict))
|| (res = executeDispatch<DirectDictionary>(arguments, result_type, dict))
|| (res = executeDispatch<DirectDictionary<DictionaryKeyType::simple>>(arguments, result_type, dict))
|| (res = executeDispatch<HashedDictionary>(arguments, result_type, dict))
|| (res = executeDispatch<CacheDictionary<DictionaryKeyType::simple>>(arguments, result_type, dict))))
throw Exception{"Unsupported dictionary type " + dict->getTypeName(), ErrorCodes::UNKNOWN_TYPE};
@ -881,7 +882,7 @@ private:
ColumnPtr res;
if (!((res = executeDispatch<FlatDictionary>(arguments, dict))
|| (res = executeDispatch<DirectDictionary>(arguments, dict))
|| (res = executeDispatch<DirectDictionary<DictionaryKeyType::simple>>(arguments, dict))
|| (res = executeDispatch<HashedDictionary>(arguments, dict))
|| (res = executeDispatch<CacheDictionary<DictionaryKeyType::simple>>(arguments, dict))))
throw Exception{"Unsupported dictionary type " + dict->getTypeName(), ErrorCodes::UNKNOWN_TYPE};

View File

@ -0,0 +1,66 @@
Dictionary direct_dictionary_simple_key_simple_attributes
dictGet existing value
value_0 value_second_0
value_1 value_second_1
value_2 value_second_2
dictGet with non existing value
value_0 value_second_0
value_1 value_second_1
value_2 value_second_2
value_first_default value_second_default
dictGetOrDefault existing value
value_0 value_second_0
value_1 value_second_1
value_2 value_second_2
dictGetOrDefault non existing value
value_0 value_second_0
value_1 value_second_1
value_2 value_second_2
default default
dictHas
1
1
1
0
select all values as input stream
0 value_0 value_second_0
1 value_1 value_second_1
2 value_2 value_second_2
Dictionary direct_dictionary_simple_key_complex_attributes
dictGet existing value
value_0 value_second_0
value_1 \N
value_2 value_second_2
dictGet with non existing value
value_0 value_second_0
value_1 \N
value_2 value_second_2
value_first_default value_second_default
dictGetOrDefault existing value
value_0 value_second_0
value_1 \N
value_2 value_second_2
dictGetOrDefault non existing value
value_0 value_second_0
value_1 \N
value_2 value_second_2
default default
dictHas
1
1
1
0
select all values as input stream
0 value_0 value_second_0
1 value_1 \N
2 value_2 value_second_2
Dictionary direct_dictionary_simple_key_hierarchy
dictGet
0
0
1
1
2
dictGetHierarchy
[1]
[4,2,1]

View File

@ -0,0 +1,120 @@
DROP DATABASE IF EXISTS 01753_dictionary_db;
CREATE DATABASE 01753_dictionary_db;
CREATE TABLE 01753_dictionary_db.simple_key_simple_attributes_source_table
(
id UInt64,
value_first String,
value_second String
)
ENGINE = TinyLog;
INSERT INTO 01753_dictionary_db.simple_key_simple_attributes_source_table VALUES(0, 'value_0', 'value_second_0');
INSERT INTO 01753_dictionary_db.simple_key_simple_attributes_source_table VALUES(1, 'value_1', 'value_second_1');
INSERT INTO 01753_dictionary_db.simple_key_simple_attributes_source_table VALUES(2, 'value_2', 'value_second_2');
CREATE DICTIONARY 01753_dictionary_db.direct_dictionary_simple_key_simple_attributes
(
id UInt64,
value_first String DEFAULT 'value_first_default',
value_second String DEFAULT 'value_second_default'
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'simple_key_simple_attributes_source_table'))
LAYOUT(DIRECT());
SELECT 'Dictionary direct_dictionary_simple_key_simple_attributes';
SELECT 'dictGet existing value';
SELECT dictGet('01753_dictionary_db.direct_dictionary_simple_key_simple_attributes', 'value_first', number) as value_first,
dictGet('01753_dictionary_db.direct_dictionary_simple_key_simple_attributes', 'value_second', number) as value_second FROM system.numbers LIMIT 3;
SELECT 'dictGet with non existing value';
SELECT dictGet('01753_dictionary_db.direct_dictionary_simple_key_simple_attributes', 'value_first', number) as value_first,
dictGet('01753_dictionary_db.direct_dictionary_simple_key_simple_attributes', 'value_second', number) as value_second FROM system.numbers LIMIT 4;
SELECT 'dictGetOrDefault existing value';
SELECT dictGetOrDefault('01753_dictionary_db.direct_dictionary_simple_key_simple_attributes', 'value_first', number, toString('default')) as value_first,
dictGetOrDefault('01753_dictionary_db.direct_dictionary_simple_key_simple_attributes', 'value_second', number, toString('default')) as value_second FROM system.numbers LIMIT 3;
SELECT 'dictGetOrDefault non existing value';
SELECT dictGetOrDefault('01753_dictionary_db.direct_dictionary_simple_key_simple_attributes', 'value_first', number, toString('default')) as value_first,
dictGetOrDefault('01753_dictionary_db.direct_dictionary_simple_key_simple_attributes', 'value_second', number, toString('default')) as value_second FROM system.numbers LIMIT 4;
SELECT 'dictHas';
SELECT dictHas('01753_dictionary_db.direct_dictionary_simple_key_simple_attributes', number) FROM system.numbers LIMIT 4;
SELECT 'select all values as input stream';
SELECT * FROM 01753_dictionary_db.direct_dictionary_simple_key_simple_attributes;
DROP DICTIONARY 01753_dictionary_db.direct_dictionary_simple_key_simple_attributes;
DROP TABLE 01753_dictionary_db.simple_key_simple_attributes_source_table;
CREATE TABLE 01753_dictionary_db.simple_key_complex_attributes_source_table
(
id UInt64,
value_first String,
value_second Nullable(String)
)
ENGINE = TinyLog;
INSERT INTO 01753_dictionary_db.simple_key_complex_attributes_source_table VALUES(0, 'value_0', 'value_second_0');
INSERT INTO 01753_dictionary_db.simple_key_complex_attributes_source_table VALUES(1, 'value_1', NULL);
INSERT INTO 01753_dictionary_db.simple_key_complex_attributes_source_table VALUES(2, 'value_2', 'value_second_2');
CREATE DICTIONARY 01753_dictionary_db.direct_dictionary_simple_key_complex_attributes
(
id UInt64,
value_first String DEFAULT 'value_first_default',
value_second Nullable(String) DEFAULT 'value_second_default'
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'simple_key_complex_attributes_source_table'))
LAYOUT(DIRECT());
SELECT 'Dictionary direct_dictionary_simple_key_complex_attributes';
SELECT 'dictGet existing value';
SELECT dictGet('01753_dictionary_db.direct_dictionary_simple_key_complex_attributes', 'value_first', number) as value_first,
dictGet('01753_dictionary_db.direct_dictionary_simple_key_complex_attributes', 'value_second', number) as value_second FROM system.numbers LIMIT 3;
SELECT 'dictGet with non existing value';
SELECT dictGet('01753_dictionary_db.direct_dictionary_simple_key_complex_attributes', 'value_first', number) as value_first,
dictGet('01753_dictionary_db.direct_dictionary_simple_key_complex_attributes', 'value_second', number) as value_second FROM system.numbers LIMIT 4;
SELECT 'dictGetOrDefault existing value';
SELECT dictGetOrDefault('01753_dictionary_db.direct_dictionary_simple_key_complex_attributes', 'value_first', number, toString('default')) as value_first,
dictGetOrDefault('01753_dictionary_db.direct_dictionary_simple_key_complex_attributes', 'value_second', number, toString('default')) as value_second FROM system.numbers LIMIT 3;
SELECT 'dictGetOrDefault non existing value';
SELECT dictGetOrDefault('01753_dictionary_db.direct_dictionary_simple_key_complex_attributes', 'value_first', number, toString('default')) as value_first,
dictGetOrDefault('01753_dictionary_db.direct_dictionary_simple_key_complex_attributes', 'value_second', number, toString('default')) as value_second FROM system.numbers LIMIT 4;
SELECT 'dictHas';
SELECT dictHas('01753_dictionary_db.direct_dictionary_simple_key_complex_attributes', number) FROM system.numbers LIMIT 4;
SELECT 'select all values as input stream';
SELECT * FROM 01753_dictionary_db.direct_dictionary_simple_key_complex_attributes;
DROP DICTIONARY 01753_dictionary_db.direct_dictionary_simple_key_complex_attributes;
DROP TABLE 01753_dictionary_db.simple_key_complex_attributes_source_table;
CREATE TABLE 01753_dictionary_db.simple_key_hierarchy_table
(
id UInt64,
parent_id UInt64
) ENGINE = TinyLog();
INSERT INTO 01753_dictionary_db.simple_key_hierarchy_table VALUES (1, 0);
INSERT INTO 01753_dictionary_db.simple_key_hierarchy_table VALUES (2, 1);
INSERT INTO 01753_dictionary_db.simple_key_hierarchy_table VALUES (3, 1);
INSERT INTO 01753_dictionary_db.simple_key_hierarchy_table VALUES (4, 2);
CREATE DICTIONARY 01753_dictionary_db.direct_dictionary_simple_key_hierarchy
(
id UInt64,
parent_id UInt64 HIERARCHICAL
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'simple_key_hierarchy_table'))
LAYOUT(DIRECT());
SELECT 'Dictionary direct_dictionary_simple_key_hierarchy';
SELECT 'dictGet';
SELECT dictGet('01753_dictionary_db.direct_dictionary_simple_key_hierarchy', 'parent_id', number) FROM system.numbers LIMIT 5;
SELECT 'dictGetHierarchy';
SELECT dictGetHierarchy('01753_dictionary_db.direct_dictionary_simple_key_hierarchy', toUInt64(1));
SELECT dictGetHierarchy('01753_dictionary_db.direct_dictionary_simple_key_hierarchy', toUInt64(4));
DROP DICTIONARY 01753_dictionary_db.direct_dictionary_simple_key_hierarchy;
DROP TABLE 01753_dictionary_db.simple_key_hierarchy_table;
DROP DATABASE 01753_dictionary_db;

View File

@ -0,0 +1,56 @@
Dictionary direct_dictionary_complex_key_simple_attributes
dictGet existing value
value_0 value_second_0
value_1 value_second_1
value_2 value_second_2
dictGet with non existing value
value_0 value_second_0
value_1 value_second_1
value_2 value_second_2
value_first_default value_second_default
dictGetOrDefault existing value
value_0 value_second_0
value_1 value_second_1
value_2 value_second_2
dictGetOrDefault non existing value
value_0 value_second_0
value_1 value_second_1
value_2 value_second_2
default default
dictHas
1
1
1
0
select all values as input stream
0 id_key_0 value_0 value_second_0
1 id_key_1 value_1 value_second_1
2 id_key_2 value_2 value_second_2
Dictionary direct_dictionary_complex_key_complex_attributes
dictGet existing value
value_0 value_second_0
value_1 \N
value_2 value_second_2
dictGet with non existing value
value_0 value_second_0
value_1 \N
value_2 value_second_2
value_first_default value_second_default
dictGetOrDefault existing value
value_0 value_second_0
value_1 \N
value_2 value_second_2
dictGetOrDefault non existing value
value_0 value_second_0
value_1 \N
value_2 value_second_2
default default
dictHas
1
1
1
0
select all values as input stream
0 id_key_0 value_0 value_second_0
1 id_key_1 value_1 \N
2 id_key_2 value_2 value_second_2

View File

@ -0,0 +1,95 @@
-- Recreate the test database from scratch so leftovers from a previous run cannot interfere.
DROP DATABASE IF EXISTS 01754_dictionary_db;
CREATE DATABASE 01754_dictionary_db;
-- Source table for the first dictionary: (id, id_key) is used as the composite key,
-- value_first and value_second are plain (non-nullable) String attributes.
CREATE TABLE 01754_dictionary_db.complex_key_simple_attributes_source_table
(
id UInt64,
id_key String,
value_first String,
value_second String
)
ENGINE = TinyLog;
-- Only ids 0..2 are inserted; the LIMIT 4 queries further down therefore also probe id 3, which has no row.
INSERT INTO 01754_dictionary_db.complex_key_simple_attributes_source_table VALUES(0, 'id_key_0', 'value_0', 'value_second_0');
INSERT INTO 01754_dictionary_db.complex_key_simple_attributes_source_table VALUES(1, 'id_key_1', 'value_1', 'value_second_1');
INSERT INTO 01754_dictionary_db.complex_key_simple_attributes_source_table VALUES(2, 'id_key_2', 'value_2', 'value_second_2');
-- Dictionary under test: composite key (id, id_key), COMPLEX_KEY_DIRECT layout,
-- reading from complex_key_simple_attributes_source_table through the local ClickHouse server.
-- The DEFAULT values of value_first / value_second are what dictGet is expected to print for a missing key.
-- NOTE(review): the DEFAULT on the key column id_key is not exercised by any query in this script.
-- NOTE(review): SOURCE names the table without a DB parameter; presumably it resolves to the
-- dictionary's own database - confirm.
CREATE DICTIONARY 01754_dictionary_db.direct_dictionary_complex_key_simple_attributes
(
id UInt64,
id_key String DEFAULT 'test_default_id_key',
value_first String DEFAULT 'value_first_default',
value_second String DEFAULT 'value_second_default'
)
PRIMARY KEY id, id_key
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'complex_key_simple_attributes_source_table'))
LAYOUT(COMPLEX_KEY_DIRECT());
-- Queries against direct_dictionary_complex_key_simple_attributes.
-- Every lookup builds the composite key as the tuple (number, 'id_key_<number>').
-- Section labels are echoed into the output so each group of results can be identified.
SELECT 'Dictionary direct_dictionary_complex_key_simple_attributes';
-- Keys 0..2 all exist in the source table: both attributes come back with their stored values.
SELECT 'dictGet existing value';
SELECT dictGet('01754_dictionary_db.direct_dictionary_complex_key_simple_attributes', 'value_first', (number, concat('id_key_', toString(number)))) as value_first,
dictGet('01754_dictionary_db.direct_dictionary_complex_key_simple_attributes', 'value_second', (number, concat('id_key_', toString(number)))) as value_second FROM system.numbers LIMIT 3;
-- LIMIT 4 adds key 3, which has no source row: dictGet falls back to the attribute DEFAULTs.
SELECT 'dictGet with non existing value';
SELECT dictGet('01754_dictionary_db.direct_dictionary_complex_key_simple_attributes', 'value_first', (number, concat('id_key_', toString(number)))) as value_first,
dictGet('01754_dictionary_db.direct_dictionary_complex_key_simple_attributes', 'value_second', (number, concat('id_key_', toString(number)))) as value_second FROM system.numbers LIMIT 4;
-- For existing keys the caller-supplied default ('default') must not be used.
SELECT 'dictGetOrDefault existing value';
SELECT dictGetOrDefault('01754_dictionary_db.direct_dictionary_complex_key_simple_attributes', 'value_first', (number, concat('id_key_', toString(number))), toString('default')) as value_first,
dictGetOrDefault('01754_dictionary_db.direct_dictionary_complex_key_simple_attributes', 'value_second', (number, concat('id_key_', toString(number))), toString('default')) as value_second FROM system.numbers LIMIT 3;
-- For the missing key 3 the caller-supplied default replaces the attribute DEFAULT.
SELECT 'dictGetOrDefault non existing value';
SELECT dictGetOrDefault('01754_dictionary_db.direct_dictionary_complex_key_simple_attributes', 'value_first', (number, concat('id_key_', toString(number))), toString('default')) as value_first,
dictGetOrDefault('01754_dictionary_db.direct_dictionary_complex_key_simple_attributes', 'value_second', (number, concat('id_key_', toString(number))), toString('default')) as value_second FROM system.numbers LIMIT 4;
-- Presence check: expected 1 for keys 0..2 and 0 for key 3.
SELECT 'dictHas';
SELECT dictHas('01754_dictionary_db.direct_dictionary_complex_key_simple_attributes', (number, concat('id_key_', toString(number)))) FROM system.numbers LIMIT 4;
-- Read the dictionary as a table. The explicit ORDER BY pins the row order so the comparison
-- with the reference output does not depend on the order in which the source returns rows.
SELECT 'select all values as input stream';
SELECT * FROM 01754_dictionary_db.direct_dictionary_complex_key_simple_attributes ORDER BY id, id_key;
-- Cleanup: drop the dictionary before the table it reads from.
DROP DICTIONARY 01754_dictionary_db.direct_dictionary_complex_key_simple_attributes;
DROP TABLE 01754_dictionary_db.complex_key_simple_attributes_source_table;
-- Source table for the second dictionary. Same composite key (id, id_key) as before,
-- but value_second is Nullable(String) so that a NULL attribute value can be stored.
CREATE TABLE 01754_dictionary_db.complex_key_complex_attributes_source_table
(
id UInt64,
id_key String,
value_first String,
value_second Nullable(String)
)
ENGINE = TinyLog;
-- Rows for ids 0..2; the row with id 1 carries NULL in value_second. Id 3 is never inserted.
INSERT INTO 01754_dictionary_db.complex_key_complex_attributes_source_table VALUES(0, 'id_key_0', 'value_0', 'value_second_0');
INSERT INTO 01754_dictionary_db.complex_key_complex_attributes_source_table VALUES(1, 'id_key_1', 'value_1', NULL);
INSERT INTO 01754_dictionary_db.complex_key_complex_attributes_source_table VALUES(2, 'id_key_2', 'value_2', 'value_second_2');
-- Dictionary under test: composite key (id, id_key), COMPLEX_KEY_DIRECT layout,
-- reading from complex_key_complex_attributes_source_table through the local ClickHouse server.
-- value_second is Nullable(String) with a non-NULL DEFAULT, so a stored NULL and a missing key
-- can be told apart in the results.
-- NOTE(review): SOURCE names the table without a DB parameter; presumably it resolves to the
-- dictionary's own database - confirm.
CREATE DICTIONARY 01754_dictionary_db.direct_dictionary_complex_key_complex_attributes
(
id UInt64,
id_key String,
value_first String DEFAULT 'value_first_default',
value_second Nullable(String) DEFAULT 'value_second_default'
)
PRIMARY KEY id, id_key
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'complex_key_complex_attributes_source_table'))
LAYOUT(COMPLEX_KEY_DIRECT());
-- Queries against direct_dictionary_complex_key_complex_attributes (Nullable attribute).
-- Every lookup builds the composite key as the tuple (number, 'id_key_<number>').
-- Section labels are echoed into the output so each group of results can be identified.
SELECT 'Dictionary direct_dictionary_complex_key_complex_attributes';
-- Keys 0..2 exist; key 1 stores NULL in value_second, which is expected to come back as NULL (\N).
SELECT 'dictGet existing value';
SELECT dictGet('01754_dictionary_db.direct_dictionary_complex_key_complex_attributes', 'value_first', (number, concat('id_key_', toString(number)))) as value_first,
dictGet('01754_dictionary_db.direct_dictionary_complex_key_complex_attributes', 'value_second', (number, concat('id_key_', toString(number)))) as value_second FROM system.numbers LIMIT 3;
-- LIMIT 4 adds key 3, which has no source row: dictGet falls back to the attribute DEFAULTs.
SELECT 'dictGet with non existing value';
SELECT dictGet('01754_dictionary_db.direct_dictionary_complex_key_complex_attributes', 'value_first', (number, concat('id_key_', toString(number)))) as value_first,
dictGet('01754_dictionary_db.direct_dictionary_complex_key_complex_attributes', 'value_second', (number, concat('id_key_', toString(number)))) as value_second FROM system.numbers LIMIT 4;
-- A stored NULL belongs to an existing key, so the caller-supplied default must not replace it.
SELECT 'dictGetOrDefault existing value';
SELECT dictGetOrDefault('01754_dictionary_db.direct_dictionary_complex_key_complex_attributes', 'value_first', (number, concat('id_key_', toString(number))), toString('default')) as value_first,
dictGetOrDefault('01754_dictionary_db.direct_dictionary_complex_key_complex_attributes', 'value_second', (number, concat('id_key_', toString(number))), toString('default')) as value_second FROM system.numbers LIMIT 3;
-- For the missing key 3 the caller-supplied default ('default') is returned for both attributes.
SELECT 'dictGetOrDefault non existing value';
SELECT dictGetOrDefault('01754_dictionary_db.direct_dictionary_complex_key_complex_attributes', 'value_first', (number, concat('id_key_', toString(number))), toString('default')) as value_first,
dictGetOrDefault('01754_dictionary_db.direct_dictionary_complex_key_complex_attributes', 'value_second', (number, concat('id_key_', toString(number))), toString('default')) as value_second FROM system.numbers LIMIT 4;
-- Presence check: expected 1 for keys 0..2 and 0 for key 3.
SELECT 'dictHas';
SELECT dictHas('01754_dictionary_db.direct_dictionary_complex_key_complex_attributes', (number, concat('id_key_', toString(number)))) FROM system.numbers LIMIT 4;
-- Read the dictionary as a table. The explicit ORDER BY pins the row order so the comparison
-- with the reference output does not depend on the order in which the source returns rows.
SELECT 'select all values as input stream';
SELECT * FROM 01754_dictionary_db.direct_dictionary_complex_key_complex_attributes ORDER BY id, id_key;
-- Cleanup: drop the dictionary before the table it reads from, then the test database.
DROP DICTIONARY 01754_dictionary_db.direct_dictionary_complex_key_complex_attributes;
DROP TABLE 01754_dictionary_db.complex_key_complex_attributes_source_table;
DROP DATABASE 01754_dictionary_db;