Dictionary update field fix

This commit is contained in:
Maksim Kita 2021-04-02 23:16:04 +03:00
parent 6d3bf81dd1
commit ff86c21e65
9 changed files with 353 additions and 265 deletions

View File

@ -22,19 +22,28 @@ struct PairNoInit
First first;
Second second;
PairNoInit() {}
PairNoInit() = default;
template <typename First_>
PairNoInit(First_ && first_, NoInitTag) : first(std::forward<First_>(first_))
template <typename FirstValue>
PairNoInit(FirstValue && first_, NoInitTag)
: first(std::forward<FirstValue>(first_))
{
}
template <typename First_, typename Second_>
PairNoInit(First_ && first_, Second_ && second_) : first(std::forward<First_>(first_)), second(std::forward<Second_>(second_))
template <typename FirstValue, typename SecondValue>
PairNoInit(FirstValue && first_, SecondValue && second_)
: first(std::forward<FirstValue>(first_))
, second(std::forward<SecondValue>(second_))
{
}
};
template <typename First, typename Second>
PairNoInit<std::decay_t<First>, std::decay_t<Second>> makePairNoInit(First && first, Second && second)
{
return PairNoInit<std::decay_t<First>, std::decay_t<Second>>(std::forward<First>(first), std::forward<Second>(second));
}
template <typename Key, typename TMapped, typename Hash, typename TState = HashTableNoState>
struct HashMapCell

View File

@ -1,11 +1,14 @@
#pragma once
#include <Common/Arena.h>
#include <Common/HashTable/HashMap.h>
#include <Columns/IColumn.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Core/Block.h>
#include <Dictionaries/IDictionary.h>
#include <Dictionaries/DictionaryStructure.h>
@ -416,6 +419,105 @@ private:
Arena * complex_key_arena;
};
/** Merge block with blocks from stream. If there are duplicate keys in block they are filtered out.
* In result block_to_update will be merged with each block readed from stream.
* Note: readPrefix readImpl readSuffix will be called on stream object during function execution.
*/
template <DictionaryKeyType dictionary_key_type>
void mergeBlockWithStream(
size_t key_column_size [[maybe_unused]],
Block & block_to_update [[maybe_unused]],
BlockInputStreamPtr & stream [[maybe_unused]])
{
using KeyType = std::conditional_t<dictionary_key_type == DictionaryKeyType::simple, UInt64, StringRef>;
static_assert(dictionary_key_type != DictionaryKeyType::range, "Range key type is not supported by updatePreviousyLoadedBlockWithStream");
Columns saved_block_key_columns;
saved_block_key_columns.reserve(key_column_size);
/// Split into keys columns and attribute columns
for (size_t i = 0; i < key_column_size; ++i)
saved_block_key_columns.emplace_back(block_to_update.safeGetByPosition(i).column);
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
DictionaryKeysExtractor<dictionary_key_type> saved_keys_extractor(saved_block_key_columns, arena_holder.getComplexKeyArena());
auto saved_keys_extracted_from_block = saved_keys_extractor.extractAllKeys();
IColumn::Filter filter(saved_keys_extracted_from_block.size(), true);
HashMap<KeyType, size_t> saved_key_to_index;
saved_key_to_index.reserve(saved_keys_extracted_from_block.size());
size_t indexes_to_remove_count = 0;
for (size_t i = 0; i < saved_keys_extracted_from_block.size(); ++i)
{
auto saved_key = saved_keys_extracted_from_block[i];
auto [it, was_inserted] = saved_key_to_index.insert(makePairNoInit(saved_key, i));
if (!was_inserted)
{
size_t index_to_remove = it->getMapped();
filter[index_to_remove] = false;
it->getMapped() = i;
++indexes_to_remove_count;
}
}
auto result_fetched_columns = block_to_update.cloneEmptyColumns();
stream->readPrefix();
while (Block block = stream->read())
{
Columns block_key_columns;
block_key_columns.reserve(key_column_size);
/// Split into keys columns and attribute columns
for (size_t i = 0; i < key_column_size; ++i)
block_key_columns.emplace_back(block.safeGetByPosition(i).column);
DictionaryKeysExtractor<dictionary_key_type> update_keys_extractor(block_key_columns, arena_holder.getComplexKeyArena());
PaddedPODArray<KeyType> update_keys = update_keys_extractor.extractAllKeys();
for (auto update_key : update_keys)
{
const auto * it = saved_key_to_index.find(update_key);
if (it != nullptr)
{
size_t index_to_filter = it->getMapped();
filter[index_to_filter] = false;
++indexes_to_remove_count;
}
}
size_t rows = block.rows();
for (size_t column_index = 0; column_index < block.columns(); ++column_index)
{
const auto update_column = block.safeGetByPosition(column_index).column;
MutableColumnPtr & result_fetched_column = result_fetched_columns[column_index];
result_fetched_column->insertRangeFrom(*update_column, 0, rows);
}
}
stream->readSuffix();
size_t result_fetched_rows = result_fetched_columns.front()->size();
size_t filter_hint = filter.size() - indexes_to_remove_count;
for (size_t column_index = 0; column_index < block_to_update.columns(); ++column_index)
{
auto & column = block_to_update.getByPosition(column_index).column;
column = column->filter(filter, filter_hint);
MutableColumnPtr mutable_column = column->assumeMutable();
const IColumn & fetched_column = *result_fetched_columns[column_index];
mutable_column->insertRangeFrom(fetched_column, 0, result_fetched_rows);
}
}
/**
* Returns ColumnVector data as PaddedPodArray.

View File

@ -32,14 +32,14 @@ FlatDictionary::FlatDictionary(
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
bool require_nonempty_,
BlockPtr saved_block_)
BlockPtr previously_loaded_block_)
: IDictionary(dict_id_)
, dict_struct(dict_struct_)
, source_ptr{std::move(source_ptr_)}
, dict_lifetime(dict_lifetime_)
, require_nonempty(require_nonempty_)
, loaded_ids(initial_array_size, false)
, saved_block{std::move(saved_block_)}
, loaded_keys(initial_array_size, false)
, previously_loaded_block(std::move(previously_loaded_block_))
{
createAttributes();
loadData();
@ -126,20 +126,19 @@ ColumnPtr FlatDictionary::getColumn(
ColumnUInt8::Ptr FlatDictionary::hasKeys(const Columns & key_columns, const DataTypes &) const
{
PaddedPODArray<UInt64> backup_storage;
const auto& ids = getColumnVectorData(this, key_columns.front(), backup_storage);
const auto & keys = getColumnVectorData(this, key_columns.front(), backup_storage);
size_t keys_size = keys.size();
auto result = ColumnUInt8::create(ext::size(ids));
auto result = ColumnUInt8::create(keys_size);
auto & out = result->getData();
const auto ids_count = ext::size(ids);
for (const auto i : ext::range(0, ids_count))
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
const auto id = ids[i];
out[i] = id < loaded_ids.size() && loaded_ids[id];
const auto key = keys[key_index];
out[key_index] = key < loaded_keys.size() && loaded_keys[key];
}
query_count.fetch_add(ids_count, std::memory_order_relaxed);
query_count.fetch_add(keys_size, std::memory_order_relaxed);
return result;
}
@ -153,22 +152,14 @@ ColumnPtr FlatDictionary::getHierarchy(ColumnPtr key_column, const DataTypePtr &
const auto & hierarchical_attribute = attributes[hierarchical_attribute_index];
const UInt64 null_value = std::get<UInt64>(hierarchical_attribute.null_values);
const ContainerType<UInt64> & parent_keys = std::get<ContainerType<UInt64>>(hierarchical_attribute.arrays);
const ContainerType<UInt64> & parent_keys = std::get<ContainerType<UInt64>>(hierarchical_attribute.container);
auto is_key_valid_func = [&, this](auto & key)
{
return key < loaded_ids.size() && loaded_ids[key];
};
auto is_key_valid_func = [&, this](auto & key) { return key < loaded_keys.size() && loaded_keys[key]; };
auto get_parent_key_func = [&, this](auto & hierarchy_key)
{
std::optional<UInt64> result;
if (hierarchy_key >= loaded_ids.size() || !loaded_ids[hierarchy_key])
return result;
result = parent_keys[hierarchy_key];
bool is_key_valid = hierarchy_key < loaded_keys.size() && loaded_keys[hierarchy_key];
std::optional<UInt64> result = is_key_valid ? std::make_optional(parent_keys[hierarchy_key]) : std::nullopt;
return result;
};
@ -194,22 +185,14 @@ ColumnUInt8::Ptr FlatDictionary::isInHierarchy(
const auto & hierarchical_attribute = attributes[hierarchical_attribute_index];
const UInt64 null_value = std::get<UInt64>(hierarchical_attribute.null_values);
const ContainerType<UInt64> & parent_keys = std::get<ContainerType<UInt64>>(hierarchical_attribute.arrays);
const ContainerType<UInt64> & parent_keys = std::get<ContainerType<UInt64>>(hierarchical_attribute.container);
auto is_key_valid_func = [&, this](auto & key)
{
return key < loaded_ids.size() && loaded_ids[key];
};
auto is_key_valid_func = [&, this](auto & key) { return key < loaded_keys.size() && loaded_keys[key]; };
auto get_parent_key_func = [&, this](auto & hierarchy_key)
{
std::optional<UInt64> result;
if (hierarchy_key >= loaded_ids.size() || !loaded_ids[hierarchy_key])
return result;
result = parent_keys[hierarchy_key];
bool is_key_valid = hierarchy_key < loaded_keys.size() && loaded_keys[hierarchy_key];
std::optional<UInt64> result = is_key_valid ? std::make_optional(parent_keys[hierarchy_key]) : std::nullopt;
return result;
};
@ -230,7 +213,7 @@ ColumnPtr FlatDictionary::getDescendants(
size_t hierarchical_attribute_index = *dict_struct.hierarchical_attribute_index;
const auto & hierarchical_attribute = attributes[hierarchical_attribute_index];
const ContainerType<UInt64> & parent_keys = std::get<ContainerType<UInt64>>(hierarchical_attribute.arrays);
const ContainerType<UInt64> & parent_keys = std::get<ContainerType<UInt64>>(hierarchical_attribute.container);
HashMap<UInt64, PaddedPODArray<UInt64>> parent_to_child;
@ -238,7 +221,7 @@ ColumnPtr FlatDictionary::getDescendants(
{
auto parent_key = parent_keys[i];
if (loaded_ids[i])
if (loaded_keys[i])
parent_to_child[parent_key].emplace_back(static_cast<UInt64>(i));
}
@ -260,22 +243,34 @@ void FlatDictionary::createAttributes()
void FlatDictionary::blockToAttributes(const Block & block)
{
const IColumn & id_column = *block.safeGetByPosition(0).column;
element_count += id_column.size();
const auto keys_column = block.safeGetByPosition(0).column;
for (const size_t attribute_idx : ext::range(0, attributes.size()))
DictionaryKeysArenaHolder<DictionaryKeyType::simple> arena_holder;
DictionaryKeysExtractor<DictionaryKeyType::simple> keys_extractor({ keys_column }, arena_holder.getComplexKeyArena());
auto keys = keys_extractor.extractAllKeys();
size_t key_offset = 1;
for (size_t attribute_index = 0; attribute_index < attributes.size(); ++attribute_index)
{
const IColumn & attribute_column = *block.safeGetByPosition(attribute_idx + 1).column;
Attribute & attribute = attributes[attribute_idx];
const IColumn & attribute_column = *block.safeGetByPosition(attribute_index + key_offset).column;
Attribute & attribute = attributes[attribute_index];
for (const auto row_idx : ext::range(0, id_column.size()))
setAttributeValue(attribute, id_column[row_idx].get<UInt64>(), attribute_column[row_idx]);
for (size_t i = 0; i < keys.size(); ++i)
{
auto key = keys[i];
if (key < loaded_keys.size() && loaded_keys[key])
continue;
setAttributeValue(attribute, key, attribute_column[i]);
++element_count;
}
}
}
void FlatDictionary::updateData()
{
if (!saved_block || saved_block->rows() == 0)
if (!previously_loaded_block || previously_loaded_block->rows() == 0)
{
auto stream = source_ptr->loadUpdatedAll();
stream->readPrefix();
@ -283,12 +278,13 @@ void FlatDictionary::updateData()
while (const auto block = stream->read())
{
/// We are using this to keep saved data if input stream consists of multiple blocks
if (!saved_block)
saved_block = std::make_shared<DB::Block>(block.cloneEmpty());
for (const auto attribute_idx : ext::range(0, attributes.size() + 1))
if (!previously_loaded_block)
previously_loaded_block = std::make_shared<DB::Block>(block.cloneEmpty());
for (size_t column_index = 0; column_index < block.columns(); ++column_index)
{
const IColumn & update_column = *block.getByPosition(attribute_idx).column.get();
MutableColumnPtr saved_column = saved_block->getByPosition(attribute_idx).column->assumeMutable();
const IColumn & update_column = *block.getByPosition(column_index).column.get();
MutableColumnPtr saved_column = previously_loaded_block->getByPosition(column_index).column->assumeMutable();
saved_column->insertRangeFrom(update_column, 0, update_column.size());
}
}
@ -297,51 +293,14 @@ void FlatDictionary::updateData()
else
{
auto stream = source_ptr->loadUpdatedAll();
stream->readPrefix();
while (Block block = stream->read())
{
const auto & saved_id_column = *saved_block->safeGetByPosition(0).column;
const auto & update_id_column = *block.safeGetByPosition(0).column;
std::unordered_map<UInt64, std::vector<size_t>> update_ids;
for (size_t row = 0; row < update_id_column.size(); ++row)
{
const auto id = update_id_column.get64(row);
update_ids[id].push_back(row);
mergeBlockWithStream<DictionaryKeyType::simple>(
dict_struct.getKeysSize(),
*previously_loaded_block,
stream);
}
const size_t saved_rows = saved_id_column.size();
IColumn::Filter filter(saved_rows);
std::unordered_map<UInt64, std::vector<size_t>>::iterator it;
for (size_t row = 0; row < saved_id_column.size(); ++row)
{
auto id = saved_id_column.get64(row);
it = update_ids.find(id);
if (it != update_ids.end())
filter[row] = 0;
else
filter[row] = 1;
}
auto block_columns = block.mutateColumns();
for (const auto attribute_idx : ext::range(0, attributes.size() + 1))
{
auto & column = saved_block->safeGetByPosition(attribute_idx).column;
const auto & filtered_column = column->filter(filter, -1);
block_columns[attribute_idx]->insertRangeFrom(*filtered_column.get(), 0, filtered_column->size());
}
saved_block->setColumns(std::move(block_columns));
}
stream->readSuffix();
}
if (saved_block)
blockToAttributes(*saved_block.get());
if (previously_loaded_block)
blockToAttributes(*previously_loaded_block.get());
}
void FlatDictionary::loadData()
@ -363,24 +322,6 @@ void FlatDictionary::loadData()
throw Exception{full_name + ": dictionary source is empty and 'require_nonempty' property is set.", ErrorCodes::DICTIONARY_IS_EMPTY};
}
template <typename T>
void FlatDictionary::addAttributeSize(const Attribute & attribute)
{
const auto & array_ref = std::get<ContainerType<T>>(attribute.arrays);
bytes_allocated += sizeof(PaddedPODArray<T>) + array_ref.allocated_bytes();
bucket_count = array_ref.capacity();
}
template <>
void FlatDictionary::addAttributeSize<String>(const Attribute & attribute)
{
const auto & array_ref = std::get<ContainerType<StringRef>>(attribute.arrays);
bytes_allocated += sizeof(PaddedPODArray<StringRef>) + array_ref.allocated_bytes();
bytes_allocated += sizeof(Arena) + attribute.string_arena->size();
bucket_count = array_ref.capacity();
}
void FlatDictionary::calculateBytesAllocated()
{
bytes_allocated += attributes.size() * sizeof(attributes.front());
@ -391,8 +332,14 @@ void FlatDictionary::calculateBytesAllocated()
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
using ValueType = DictionaryValueType<AttributeType>;
addAttributeSize<AttributeType>(attribute);
const auto & container = std::get<ContainerType<ValueType>>(attribute.container);
bytes_allocated += sizeof(PaddedPODArray<ValueType>) + container.allocated_bytes();
bucket_count = container.capacity();
if constexpr (std::is_same_v<ValueType, StringRef>)
bytes_allocated += sizeof(Arena) + attribute.string_arena->size();
};
callOnDictionaryAttributeType(attribute.type, type_call);
@ -405,7 +352,7 @@ void FlatDictionary::createAttributeImpl(Attribute & attribute, const Field & nu
{
attribute.null_values = T(null_value.get<T>());
const auto & null_value_ref = std::get<T>(attribute.null_values);
attribute.arrays.emplace<ContainerType<T>>(initial_array_size, null_value_ref);
attribute.container.emplace<ContainerType<T>>(initial_array_size, null_value_ref);
}
template <>
@ -415,7 +362,7 @@ void FlatDictionary::createAttributeImpl<String>(Attribute & attribute, const Fi
const String & string = null_value.get<String>();
const char * string_in_arena = attribute.string_arena->insert(string.data(), string.size());
attribute.null_values.emplace<StringRef>(string_in_arena, string.size());
attribute.arrays.emplace<ContainerType<StringRef>>(initial_array_size, StringRef(string_in_arena, string.size()));
attribute.container.emplace<ContainerType<StringRef>>(initial_array_size, StringRef(string_in_arena, string.size()));
}
FlatDictionary::Attribute FlatDictionary::createAttribute(const DictionaryAttribute& attribute, const Field & null_value)
@ -436,57 +383,64 @@ FlatDictionary::Attribute FlatDictionary::createAttribute(const DictionaryAttrib
return attr;
}
template <typename AttributeType, typename OutputType, typename ValueSetter, typename DefaultValueExtractor>
void FlatDictionary::getItemsImpl(
const Attribute & attribute,
const PaddedPODArray<UInt64> & ids,
const PaddedPODArray<UInt64> & keys,
ValueSetter && set_value,
DefaultValueExtractor & default_value_extractor) const
{
const auto & attr = std::get<ContainerType<AttributeType>>(attribute.arrays);
const auto rows = ext::size(ids);
const auto & container = std::get<ContainerType<AttributeType>>(attribute.container);
const auto rows = keys.size();
for (const auto row : ext::range(0, rows))
for (size_t row = 0; row < rows; ++row)
{
const auto id = ids[row];
set_value(row, id < ext::size(attr) && loaded_ids[id] ? static_cast<OutputType>(attr[id]) : default_value_extractor[row]);
const auto key = keys[row];
if (key < loaded_keys.size() && loaded_keys[key])
set_value(row, static_cast<OutputType>(container[key]));
else
set_value(row, default_value_extractor[row]);
}
query_count.fetch_add(rows, std::memory_order_relaxed);
}
template <typename T>
void FlatDictionary::resize(Attribute & attribute, const UInt64 id)
void FlatDictionary::resize(Attribute & attribute, UInt64 key)
{
if (id >= max_array_size)
throw Exception{full_name + ": identifier should be less than " + toString(max_array_size), ErrorCodes::ARGUMENT_OUT_OF_BOUND};
if (key >= max_array_size)
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND,
"({}): identifier should be less than ({})",
full_name,
toString(max_array_size));
auto & array = std::get<ContainerType<T>>(attribute.arrays);
if (id >= array.size())
auto & container = std::get<ContainerType<T>>(attribute.container);
if (key >= container.size())
{
const size_t elements_count = id + 1; //id=0 -> elements_count=1
loaded_ids.resize(elements_count, false);
array.resize_fill(elements_count, std::get<T>(attribute.null_values));
const size_t elements_count = key + 1; //id=0 -> elements_count=1
loaded_keys.resize(elements_count, false);
container.resize_fill(elements_count, std::get<T>(attribute.null_values));
}
}
template <typename T>
void FlatDictionary::setAttributeValueImpl(Attribute & attribute, const UInt64 id, const T & value)
void FlatDictionary::setAttributeValueImpl(Attribute & attribute, UInt64 key, const T & value)
{
auto & array = std::get<ContainerType<T>>(attribute.arrays);
array[id] = value;
loaded_ids[id] = true;
auto & array = std::get<ContainerType<T>>(attribute.container);
array[key] = value;
loaded_keys[key] = true;
}
template <>
void FlatDictionary::setAttributeValueImpl<String>(Attribute & attribute, const UInt64 id, const String & value)
void FlatDictionary::setAttributeValueImpl<String>(Attribute & attribute, UInt64 key, const String & value)
{
const auto * string_in_arena = attribute.string_arena->insert(value.data(), value.size());
setAttributeValueImpl(attribute, id, StringRef{string_in_arena, value.size()});
setAttributeValueImpl(attribute, key, StringRef{string_in_arena, value.size()});
}
void FlatDictionary::setAttributeValue(Attribute & attribute, const UInt64 id, const Field & value)
void FlatDictionary::setAttributeValue(Attribute & attribute, const UInt64 key, const Field & value)
{
auto type_call = [&](const auto &dictionary_attribute_type)
{
@ -494,44 +448,36 @@ void FlatDictionary::setAttributeValue(Attribute & attribute, const UInt64 id, c
using AttributeType = typename Type::AttributeType;
using ResizeType = std::conditional_t<std::is_same_v<AttributeType, String>, StringRef, AttributeType>;
resize<ResizeType>(attribute, id);
resize<ResizeType>(attribute, key);
if (attribute.nullable_set)
{
if (value.isNull())
{
attribute.nullable_set->insert(id);
loaded_ids[id] = true;
attribute.nullable_set->insert(key);
loaded_keys[key] = true;
return;
}
else
{
attribute.nullable_set->erase(id);
}
}
setAttributeValueImpl<AttributeType>(attribute, id, value.get<AttributeType>());
setAttributeValueImpl<AttributeType>(attribute, key, value.get<AttributeType>());
};
callOnDictionaryAttributeType(attribute.type, type_call);
}
PaddedPODArray<UInt64> FlatDictionary::getIds() const
{
const auto ids_count = ext::size(loaded_ids);
PaddedPODArray<UInt64> ids;
ids.reserve(ids_count);
for (auto idx : ext::range(0, ids_count))
if (loaded_ids[idx])
ids.push_back(idx);
return ids;
}
BlockInputStreamPtr FlatDictionary::getBlockInputStream(const Names & column_names, size_t max_block_size) const
{
return std::make_shared<DictionaryBlockInputStream>(shared_from_this(), max_block_size, getIds(), column_names);
const auto keys_count = loaded_keys.size();
PaddedPODArray<UInt64> keys;
keys.reserve(keys_count);
for (size_t key_index = 0; key_index < keys_count; ++key_index)
if (loaded_keys[key_index])
keys.push_back(key_index);
return std::make_shared<DictionaryBlockInputStream>(shared_from_this(), max_block_size, std::move(keys), column_names);
}
void registerDictionaryFlat(DictionaryFactory & factory)
@ -543,19 +489,20 @@ void registerDictionaryFlat(DictionaryFactory & factory)
DictionarySourcePtr source_ptr) -> DictionaryPtr
{
if (dict_struct.key)
throw Exception{"'key' is not supported for dictionary of layout 'flat'", ErrorCodes::UNSUPPORTED_METHOD};
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "'key' is not supported for dictionary of layout 'flat'");
if (dict_struct.range_min || dict_struct.range_max)
throw Exception{full_name
+ ": elements .structure.range_min and .structure.range_max should be defined only "
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"({}): elements .structure.range_min and .structure.range_max should be defined only "
"for a dictionary of layout 'range_hashed'",
ErrorCodes::BAD_ARGUMENTS};
full_name);
const auto dict_id = StorageID::fromDictionaryConfig(config, config_prefix);
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
return std::make_unique<FlatDictionary>(dict_id, dict_struct, std::move(source_ptr), dict_lifetime, require_nonempty);
};
factory.registerLayout("flat", create_layout, false);
}

View File

@ -32,7 +32,7 @@ public:
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
bool require_nonempty_,
BlockPtr saved_block_ = nullptr);
BlockPtr previously_loaded_block_ = nullptr);
std::string getTypeName() const override { return "Flat"; }
@ -48,7 +48,7 @@ public:
std::shared_ptr<const IExternalLoadable> clone() const override
{
return std::make_shared<FlatDictionary>(getDictionaryID(), dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, saved_block);
return std::make_shared<FlatDictionary>(getDictionaryID(), dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, previously_loaded_block);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }
@ -133,7 +133,7 @@ private:
ContainerType<Float32>,
ContainerType<Float64>,
ContainerType<StringRef>>
arrays;
container;
std::unique_ptr<Arena> string_arena;
};
@ -143,9 +143,6 @@ private:
void updateData();
void loadData();
template <typename T>
void addAttributeSize(const Attribute & attribute);
void calculateBytesAllocated();
template <typename T>
@ -156,41 +153,32 @@ private:
template <typename AttributeType, typename OutputType, typename ValueSetter, typename DefaultValueExtractor>
void getItemsImpl(
const Attribute & attribute,
const PaddedPODArray<UInt64> & ids,
const PaddedPODArray<UInt64> & keys,
ValueSetter && set_value,
DefaultValueExtractor & default_value_extractor) const;
template <typename T>
void resize(Attribute & attribute, const UInt64 id);
void resize(Attribute & attribute, UInt64 key);
template <typename T>
void setAttributeValueImpl(Attribute & attribute, const UInt64 id, const T & value);
void setAttributeValueImpl(Attribute & attribute, UInt64 key, const T & value);
void setAttributeValue(Attribute & attribute, const UInt64 id, const Field & value);
const Attribute & getAttribute(const std::string & attribute_name) const;
template <typename ChildType, typename AncestorType>
void isInImpl(const ChildType & child_ids, const AncestorType & ancestor_ids, PaddedPODArray<UInt8> & out) const;
PaddedPODArray<UInt64> getIds() const;
void setAttributeValue(Attribute & attribute, UInt64 key, const Field & value);
const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
const bool require_nonempty;
std::map<std::string, size_t> attribute_index_by_name;
std::vector<Attribute> attributes;
std::vector<bool> loaded_ids;
std::vector<bool> loaded_keys;
size_t bytes_allocated = 0;
size_t element_count = 0;
size_t bucket_count = 0;
mutable std::atomic<size_t> query_count{0};
/// TODO: Remove
BlockPtr saved_block;
BlockPtr previously_loaded_block;
};
}

View File

@ -1,9 +1,5 @@
#include "HashedDictionary.h"
#include <ext/size.h>
#include <absl/container/flat_hash_map.h>
#include <Core/Defines.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnsNumber.h>
@ -46,13 +42,13 @@ HashedDictionary<dictionary_key_type, sparse>::HashedDictionary(
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
bool require_nonempty_,
BlockPtr saved_block_)
BlockPtr previously_loaded_block_)
: IDictionary(dict_id_)
, dict_struct(dict_struct_)
, source_ptr(std::move(source_ptr_))
, dict_lifetime(dict_lifetime_)
, require_nonempty(require_nonempty_)
, saved_block(std::move(saved_block_))
, previously_loaded_block(std::move(previously_loaded_block_))
{
createAttributes();
loadData();
@ -347,7 +343,7 @@ void HashedDictionary<dictionary_key_type, sparse>::createAttributes()
template <DictionaryKeyType dictionary_key_type, bool sparse>
void HashedDictionary<dictionary_key_type, sparse>::updateData()
{
if (!saved_block || saved_block->rows() == 0)
if (!previously_loaded_block || previously_loaded_block->rows() == 0)
{
auto stream = source_ptr->loadUpdatedAll();
stream->readPrefix();
@ -355,13 +351,13 @@ void HashedDictionary<dictionary_key_type, sparse>::updateData()
while (const auto block = stream->read())
{
/// We are using this to keep saved data if input stream consists of multiple blocks
if (!saved_block)
saved_block = std::make_shared<DB::Block>(block.cloneEmpty());
if (!previously_loaded_block)
previously_loaded_block = std::make_shared<DB::Block>(block.cloneEmpty());
for (const auto attribute_idx : ext::range(0, attributes.size() + 1))
{
const IColumn & update_column = *block.getByPosition(attribute_idx).column.get();
MutableColumnPtr saved_column = saved_block->getByPosition(attribute_idx).column->assumeMutable();
MutableColumnPtr saved_column = previously_loaded_block->getByPosition(attribute_idx).column->assumeMutable();
saved_column->insertRangeFrom(update_column, 0, update_column.size());
}
}
@ -369,70 +365,17 @@ void HashedDictionary<dictionary_key_type, sparse>::updateData()
}
else
{
size_t skip_keys_size_offset = dict_struct.getKeysSize();
Columns saved_block_key_columns;
saved_block_key_columns.reserve(skip_keys_size_offset);
/// Split into keys columns and attribute columns
for (size_t i = 0; i < skip_keys_size_offset; ++i)
saved_block_key_columns.emplace_back(saved_block->safeGetByPosition(i).column);
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
DictionaryKeysExtractor<dictionary_key_type> saved_keys_extractor(saved_block_key_columns, arena_holder.getComplexKeyArena());
auto saved_keys_extracted_from_block = saved_keys_extractor.extractAllKeys();
auto stream = source_ptr->loadUpdatedAll();
stream->readPrefix();
while (Block block = stream->read())
{
/// TODO: Rewrite
Columns block_key_columns;
block_key_columns.reserve(skip_keys_size_offset);
/// Split into keys columns and attribute columns
for (size_t i = 0; i < skip_keys_size_offset; ++i)
block_key_columns.emplace_back(block.safeGetByPosition(i).column);
DictionaryKeysExtractor<dictionary_key_type> block_keys_extractor(saved_block_key_columns, arena_holder.getComplexKeyArena());
auto keys_extracted_from_block = block_keys_extractor.extractAllKeys();
absl::flat_hash_map<KeyType, std::vector<size_t>, DefaultHash<KeyType>> update_keys;
for (size_t row = 0; row < keys_extracted_from_block.size(); ++row)
{
auto key = keys_extracted_from_block[row];
update_keys[key].push_back(row);
mergeBlockWithStream<dictionary_key_type>(
dict_struct.getKeysSize(),
*previously_loaded_block,
stream);
}
IColumn::Filter filter(saved_keys_extracted_from_block.size());
for (size_t row = 0; row < saved_keys_extracted_from_block.size(); ++row)
if (previously_loaded_block)
{
auto key = saved_keys_extracted_from_block[row];
auto it = update_keys.find(key);
filter[row] = (it == update_keys.end());
}
auto block_columns = block.mutateColumns();
for (const auto attribute_idx : ext::range(0, attributes.size() + 1))
{
auto & column = saved_block->safeGetByPosition(attribute_idx).column;
const auto & filtered_column = column->filter(filter, -1);
block_columns[attribute_idx]->insertRangeFrom(*filtered_column.get(), 0, filtered_column->size());
}
saved_block->setColumns(std::move(block_columns));
}
stream->readSuffix();
}
if (saved_block)
{
resize(saved_block->rows());
blockToAttributes(*saved_block.get());
resize(previously_loaded_block->rows());
blockToAttributes(*previously_loaded_block.get());
}
}

View File

@ -41,7 +41,7 @@ public:
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
bool require_nonempty_,
BlockPtr saved_block_ = nullptr);
BlockPtr previously_loaded_block_ = nullptr);
std::string getTypeName() const override
{
@ -67,7 +67,7 @@ public:
std::shared_ptr<const IExternalLoadable> clone() const override
{
return std::make_shared<HashedDictionary<dictionary_key_type, sparse>>(getDictionaryID(), dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, saved_block);
return std::make_shared<HashedDictionary<dictionary_key_type, sparse>>(getDictionaryID(), dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, previously_loaded_block);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }
@ -219,8 +219,7 @@ private:
size_t bucket_count = 0;
mutable std::atomic<size_t> query_count{0};
/// TODO: Remove
BlockPtr saved_block;
BlockPtr previously_loaded_block;
Arena complex_key_arena;
};

View File

@ -0,0 +1,8 @@
1 First
simple_key_flat_dictionary 01785_db 1
1 First
simple_key_hashed_dictionary 01785_db 1
1 First
simple_key_cache_dictionary 01785_db 1
1 FirstKey First
complex_key_hashed_dictionary 01785_db 1

View File

@ -0,0 +1,91 @@
DROP DATABASE IF EXISTS 01785_db;
CREATE DATABASE 01785_db;
DROP TABLE IF EXISTS 01785_db.simple_key_source_table;
CREATE TABLE 01785_db.simple_key_source_table
(
id UInt64,
value String
) ENGINE = TinyLog();
INSERT INTO 01785_db.simple_key_source_table VALUES (1, 'First');
INSERT INTO 01785_db.simple_key_source_table VALUES (1, 'First');
DROP DICTIONARY IF EXISTS 01785_db.simple_key_flat_dictionary;
CREATE DICTIONARY 01785_db.simple_key_flat_dictionary
(
id UInt64,
value String
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() DB '01785_db' TABLE 'simple_key_source_table'))
LAYOUT(FLAT())
LIFETIME(MIN 0 MAX 1000);
SELECT * FROM 01785_db.simple_key_flat_dictionary;
SELECT name, database, element_count FROM system.dictionaries WHERE database = '01785_db' AND name = 'simple_key_flat_dictionary';
DROP DICTIONARY 01785_db.simple_key_flat_dictionary;
CREATE DICTIONARY 01785_db.simple_key_hashed_dictionary
(
id UInt64,
value String
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() DB '01785_db' TABLE 'simple_key_source_table'))
LAYOUT(HASHED())
LIFETIME(MIN 0 MAX 1000);
SELECT * FROM 01785_db.simple_key_hashed_dictionary;
SELECT name, database, element_count FROM system.dictionaries WHERE database = '01785_db' AND name = 'simple_key_hashed_dictionary';
DROP DICTIONARY 01785_db.simple_key_hashed_dictionary;
CREATE DICTIONARY 01785_db.simple_key_cache_dictionary
(
id UInt64,
value String
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() DB '01785_db' TABLE 'simple_key_source_table'))
LAYOUT(CACHE(SIZE_IN_CELLS 100000))
LIFETIME(MIN 0 MAX 1000);
SELECT toUInt64(1) as key, dictGet('01785_db.simple_key_cache_dictionary', 'value', key);
SELECT name, database, element_count FROM system.dictionaries WHERE database = '01785_db' AND name = 'simple_key_cache_dictionary';
DROP DICTIONARY 01785_db.simple_key_cache_dictionary;
DROP TABLE 01785_db.simple_key_source_table;
DROP TABLE IF EXISTS 01785_db.complex_key_source_table;
CREATE TABLE 01785_db.complex_key_source_table
(
id UInt64,
id_key String,
value String
) ENGINE = TinyLog();
INSERT INTO 01785_db.complex_key_source_table VALUES (1, 'FirstKey', 'First');
INSERT INTO 01785_db.complex_key_source_table VALUES (1, 'FirstKey', 'First');
CREATE DICTIONARY 01785_db.complex_key_hashed_dictionary
(
id UInt64,
id_key String,
value String
)
PRIMARY KEY id, id_key
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() DB '01785_db' TABLE 'complex_key_source_table'))
LAYOUT(COMPLEX_KEY_HASHED())
LIFETIME(MIN 0 MAX 1000);
SELECT * FROM 01785_db.complex_key_hashed_dictionary;
SELECT name, database, element_count FROM system.dictionaries WHERE database = '01785_db' AND name = 'complex_key_hashed_dictionary';
DROP DICTIONARY 01785_db.complex_key_hashed_dictionary;
DROP TABLE 01785_db.complex_key_source_table;
DROP DATABASE 01785_db;

View File

@ -696,6 +696,7 @@
"01760_system_dictionaries",
"01760_polygon_dictionaries",
"01778_hierarchical_dictionaries",
"01780_clickhouse_dictionary_source_loop"
"01780_clickhouse_dictionary_source_loop",
"01785_dictionary_element_count"
]
}