Added FixedDeadlineHashMap

This commit is contained in:
Maksim Kita 2021-03-16 01:59:04 +03:00
parent ee898d6d47
commit 3d1c42827b
3 changed files with 398 additions and 200 deletions

View File

@ -0,0 +1,253 @@
#pragma once
#include <chrono>
#include <type_traits>
#include <Common/HashTable/HashMap.h>
#include <Common/BitHelpers.h>
using TimePoint = std::chrono::system_clock::time_point;
template <typename TKey, typename TMapped, typename Hash, bool save_hash_in_cell>
struct DeadlineCell :
public std::conditional_t<save_hash_in_cell,
HashMapCellWithSavedHash<TKey, TMapped, Hash, HashTableNoState>,
HashMapCell<TKey, TMapped, Hash, HashTableNoState>>
{
using Key = TKey;
using Base = std::conditional_t<save_hash_in_cell,
HashMapCellWithSavedHash<TKey, TMapped, Hash, HashTableNoState>,
HashMapCell<TKey, TMapped, Hash, HashTableNoState>>;
using Mapped = typename Base::Mapped;
using State = typename Base::State;
using mapped_type = Mapped;
using key_type = Key;
using Base::Base;
inline TimePoint getDeadline() const { return deadline; }
void setDeadline(TimePoint & deadline_value) { deadline = deadline_value; }
private:
TimePoint deadline;
};
template <typename TKey, typename TValue, typename Disposer, typename Hash, bool save_hash_in_cells>
class FixedDeadlineHashMapImpl :
private HashMapTable<
TKey,
DeadlineCell<TKey, TValue, Hash, save_hash_in_cells>,
Hash,
HashTableGrower<>,
HashTableAllocator>
{
/// TODO: Make custom grower
using Base = HashMapTable<
TKey,
DeadlineCell<TKey, TValue, Hash, save_hash_in_cells>,
Hash,
HashTableGrower<>,
HashTableAllocator>;
static size_t calculateMaxSize(size_t max_size, size_t max_collision_resolution_chain)
{
return roundUpToPowerOfTwoOrZero(std::max(max_size, max_collision_resolution_chain));
}
public:
using Cell = DeadlineCell<TKey, TValue, Hash, save_hash_in_cells>;
using Key = TKey;
using Value = TValue;
using Mapped = typename Cell::Mapped;
explicit FixedDeadlineHashMapImpl(size_t max_size_, size_t max_collision_resolution_chain_, Disposer disposer_ = Disposer())
: Base(calculateMaxSize(max_size_, max_collision_resolution_chain_))
, max_collision_resolution_chain(max_collision_resolution_chain_)
, max_size(max_size_)
, disposer(std::move(disposer_))
{
assert(max_size > 0);
assert(max_collision_resolution_chain > 0);
}
~FixedDeadlineHashMapImpl()
{
clear();
}
Cell * get(const Key & key)
{
if (Cell::isZero(key, *this))
return this->hasZero() ? this->zeroValue() : nullptr;
/// TODO: Optimize
size_t hash_value = Base::hash(key);
size_t place_value = Base::grower.place(hash_value);
size_t resolution_chain = max_collision_resolution_chain;
while (resolution_chain != 0)
{
auto & cell = Base::buf[place_value];
if (cell.isZero(*this))
return nullptr;
if (cell.keyEquals(key, hash_value, *this))
return &cell;
place_value = Base::grower.next(place_value);
--resolution_chain;
}
return nullptr;
}
const Cell * get(const Key & key) const
{
return const_cast<std::decay_t<decltype(*this)> *>(this)->get(key);
}
std::pair<Cell *, bool> ALWAYS_INLINE insert(const Key & key, const Value & value)
{
return emplace(key, value);
}
std::pair<Cell *, bool> ALWAYS_INLINE insert(const Key & key, Value && value)
{
return emplace(key, std::move(value));
}
template<typename ...Args>
std::pair<Cell *, bool> ALWAYS_INLINE emplace(const Key & key, Args && ... args)
{
size_t hash_value = Base::hash(key);
std::pair<Cell *, bool> result;
if (!emplaceIfZero(key, hash_value, result))
result = emplaceNonZeroImpl(key, hash_value);
bool was_inserted = result.second;
if (was_inserted)
new (&result.first->getMapped()) Value(std::forward<Args>(args)...);
return result;
}
template <typename ...Args>
void reinsert(Cell * place_to_use, const Key & key, Args && ... args)
{
size_t hash_value = Base::hash(key);
new (place_to_use) Cell(key, *this);
new (&place_to_use->getMapped()) Value(std::forward<Args>(args)...);
place_to_use->setHash(hash_value);
}
using Base::size;
using iterator = typename Base::iterator;
using const_iterator = typename Base::const_iterator;
using Base::begin;
using Base::end;
size_t getMaxSize() const { return max_size; }
size_t getSizeInBytes() const { return Base::getBufferSizeInBytes(); }
void clear()
{
for (auto & cell : *this)
disposer(cell.getKey(), cell.getMapped());
}
private:
size_t max_collision_resolution_chain;
size_t max_size;
Disposer disposer;
bool emplaceIfZero(const Key & key, size_t hash_value, std::pair<Cell *, bool> & result)
{
if (!Cell::isZero(key, *this))
return false;
if (this->hasZero())
{
result = {this->zeroValue(), false};
return true;
}
++Base::m_size;
this->setHasZero();
this->zeroValue()->setHash(hash_value);
result = {this->zeroValue(), true};
return true;
}
std::pair<Cell *, bool> emplaceNonZeroImpl(const Key & key, size_t hash_value)
{
TimePoint oldest_time = TimePoint::max();
size_t place_value = Base::grower.place(hash_value);
size_t resolution_chain = max_collision_resolution_chain;
bool use_old_value_place = false;
Cell * place_to_insert = nullptr;
while (resolution_chain != 0)
{
auto & cell = Base::buf[place_value];
if (cell.isZero(*this))
{
use_old_value_place = false;
place_to_insert = &cell;
break;
}
if (cell.keyEquals(key, hash_value, *this))
return std::make_pair(&cell, false);
if (cell.getDeadline() < oldest_time)
{
use_old_value_place = true;
place_to_insert = &cell;
}
place_value = Base::grower.next(place_value);
--resolution_chain;
}
if (!place_to_insert)
place_to_insert = &Base::buf[place_value];
if (use_old_value_place)
return std::make_pair(place_to_insert, false);
else
{
++Base::m_size;
new (place_to_insert) Cell(key, *this);
place_to_insert->setHash(hash_value);
return std::make_pair(place_to_insert, true);
}
}
};
template <typename Key, typename Mapped>
struct DefaultFixedHashMapCellDisposer
{
void operator()(const Key &, const Mapped &) const {}
};
template <typename Key, typename Value, typename Disposer = DefaultFixedHashMapCellDisposer<Key, Value>, typename Hash = DefaultHash<Key>>
using FixedDeadlineHashMap = FixedDeadlineHashMapImpl<Key, Value, Disposer, Hash, false>;
template <typename Key, typename Value, typename Disposer = DefaultFixedHashMapCellDisposer<Key, Value>, typename Hash = DefaultHash<Key>>
using FixedDeadlineHashMapWithSavedHash = FixedDeadlineHashMapImpl<Key, Value, Disposer, Hash, true>;

View File

@ -271,13 +271,13 @@ private:
};
template <typename Key, typename Mapped>
struct DefaultCellDisposer
struct DefaultLRUHashMapCellDisposer
{
void operator()(const Key &, const Mapped &) const {}
};
template <typename Key, typename Value, typename Disposer = DefaultCellDisposer<Key, Value>, typename Hash = DefaultHash<Key>>
template <typename Key, typename Value, typename Disposer = DefaultLRUHashMapCellDisposer<Key, Value>, typename Hash = DefaultHash<Key>>
using LRUHashMap = LRUHashMapImpl<Key, Value, Disposer, Hash, false>;
template <typename Key, typename Value, typename Disposer = DefaultCellDisposer<Key, Value>, typename Hash = DefaultHash<Key>>
template <typename Key, typename Value, typename Disposer = DefaultLRUHashMapCellDisposer<Key, Value>, typename Hash = DefaultHash<Key>>
using LRUHashMapWithSavedHash = LRUHashMapImpl<Key, Value, Disposer, Hash, true>;

View File

@ -11,6 +11,7 @@
#include <Common/Arena.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/HashTable/LRUHashMap.h>
#include <Common/HashTable/FixedDeadlineHashMap.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/ICacheDictionaryStorage.h>
#include <Dictionaries/DictionaryHelpers.h>
@ -46,7 +47,7 @@ public:
CacheDictionaryStorageConfiguration & configuration_)
: configuration(configuration_)
, rnd_engine(randomSeed())
, cache(configuration.max_size_in_cells, false, { *this })
, cache(configuration.max_size_in_cells, 10, { *this })
{
setup(dictionary_structure);
}
@ -162,8 +163,8 @@ private:
, is_default(is_default_)
{}
const size_t element_index;
const bool is_default;
size_t element_index;
bool is_default;
};
template <typename KeysStorageFetchResult>
@ -184,12 +185,12 @@ private:
std::chrono::seconds max_lifetime_seconds(configuration.strict_max_lifetime_seconds);
PaddedPODArray<FetchedKey> fetched_keys;
fetched_keys.reserve(keys_size);
fetched_keys.resize_fill(keys_size);
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
auto key = keys[key_index];
auto * it = cache.find(key);
auto * it = cache.get(key);
if (!it)
{
@ -198,9 +199,10 @@ private:
continue;
}
auto deadline = it->getDeadline();
const auto & cell = it->getMapped();
if (now > cell.deadline + max_lifetime_seconds)
if (now > deadline + max_lifetime_seconds)
{
result.key_index_to_state[key_index] = {KeyState::not_found};
++result.not_found_keys_size;
@ -210,7 +212,7 @@ private:
bool cell_is_expired = false;
KeyState::State key_state = KeyState::found;
if (now > cell.deadline)
if (now > deadline)
{
cell_is_expired = true;
key_state = KeyState::expired;
@ -225,7 +227,7 @@ private:
result.key_index_to_state[key_index].setDefaultValue(cell.is_default);
result.default_keys_size += cell.is_default;
fetched_keys.emplace_back(cell.element_index, cell.is_default);
fetched_keys[key_index] = FetchedKey{cell.element_index, cell.is_default};
}
for (size_t attribute_index = 0; attribute_index < fetch_request.attributesSize(); ++attribute_index)
@ -311,103 +313,143 @@ private:
void insertColumnsForKeysImpl(const PaddedPODArray<KeyType> & keys, Columns columns)
{
const auto now = std::chrono::system_clock::now();
size_t keys_size = keys.size();
size_t columns_size = columns.size();
Field column_value;
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
auto key = keys[key_index];
cache.erase(key);
Cell cell;
auto [it, was_inserted] = cache.insert(key, {});
setCellDeadline(cell, now);
cell.element_index = insert_index;
cell.is_default = false;
++insert_index;
insertCellInCache(key, cell);
}
Field complex_column_value;
for (size_t column_index = 0; column_index < columns.size(); ++column_index)
{
auto & attribute = attributes[column_index];
const auto & column = columns[column_index];
size_t column_size = column->size();
if (unlikely(attribute.is_complex_type))
if (was_inserted)
{
auto & container = std::get<std::vector<Field>>(attribute.attribute_container);
container.reserve(column_size);
auto & cell = it->getMapped();
cell.is_default = false;
for (size_t item_index = 0; item_index < column_size; ++item_index)
for (size_t attribute_index = 0; attribute_index < columns_size; ++attribute_index)
{
column->get(item_index, complex_column_value);
container.emplace_back(complex_column_value);
auto & column = columns[attribute_index];
getAttributeContainer(attribute_index, [&](auto & container)
{
container.emplace_back();
cell.element_index = container.size() - 1;
using ElementType = std::decay_t<decltype(container[0])>;
column->get(key_index, column_value);
if constexpr (std::is_same_v<ElementType, Field>)
container.back() = column_value;
else if constexpr (std::is_same_v<ElementType, StringRef>)
{
const String & value = column_value.get<String>();
StringRef inserted_value = copyStringInArena(StringRef { value.data(), value.size() });
container.back() = inserted_value;
}
else
container.back() = column_value.get<ElementType>();
});
}
}
else
{
auto type_call = [&](const auto & dictionary_attribute_type)
auto & cell_key = it->getKey();
Cell cell;
size_t existing_index = it->getMapped().element_index;
cell.element_index = existing_index;
cell.is_default = false;
if (cell_key != key)
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
using ValueType = DictionaryValueType<AttributeType>;
using ColumnType =
std::conditional_t<std::is_same_v<AttributeType, String>, ColumnString,
std::conditional_t<IsDecimalNumber<AttributeType>, ColumnDecimal<ValueType>,
ColumnVector<AttributeType>>>;
/// In case of complex key we keep it in arena
if constexpr (std::is_same_v<KeyType, StringRef>)
arena.free(const_cast<char *>(key.data), key.size);
}
const ColumnType & column_typed = static_cast<const ColumnType &>(*column);
cache.reinsert(it, key, cell);
auto & container = std::get<PaddedPODArray<ValueType>>(attribute.attribute_container);
container.reserve(column_size);
/// Put values into index
if constexpr (std::is_same_v<ColumnType, ColumnString>)
for (size_t attribute_index = 0; attribute_index < columns_size; ++attribute_index)
{
auto & column = columns[attribute_index];
getAttributeContainer(attribute_index, [&](auto & container)
{
/// TODO: Serialize while column string in arena then just insert offsets in container
for (size_t item_index = 0; item_index < column_size; ++item_index)
using ElementType = std::decay_t<decltype(container[0])>;
column->get(key_index, column_value);
if constexpr (std::is_same_v<ElementType, Field>)
container[existing_index] = column_value;
else if constexpr (std::is_same_v<ElementType, StringRef>)
{
StringRef value = column->getDataAt(item_index);
StringRef updated_data = copyStringInArena(value);
container.emplace_back(updated_data);
const String & value = column_value.get<String>();
StringRef inserted_value = copyStringInArena(StringRef { value.data(), value.size() });
container[existing_index] = inserted_value;
}
}
else
{
const auto & data = column_typed.getData();
container.insert(data.begin(), data.end());
}
};
callOnDictionaryAttributeType(attribute.type, type_call);
else
container[existing_index] = column_value.get<ElementType>();
});
}
}
}
deleteUnusedKeysIfNecessary();
setCellDeadline(*it, now);
}
}
void insertDefaultKeysImpl(const PaddedPODArray<KeyType> & keys)
{
const auto now = std::chrono::system_clock::now();
for (auto key : keys)
size_t keys_size = keys.size();
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
cache.erase(key);
auto key = keys[key_index];
Cell cell;
Cell value;
value.is_default = true;
setCellDeadline(cell, now);
cell.element_index = 0;
cell.is_default = true;
auto [it, was_inserted] = cache.insert(key, value);
insertCellInCache(key, cell);
if (was_inserted)
{
auto & cell = it->getMapped();
for (size_t attribute_index = 0; attribute_index < attributes.size(); ++attribute_index)
{
getAttributeContainer(attribute_index, [&](auto & container)
{
container.emplace_back();
cell.element_index = container.size();
});
}
}
else
{
value.element_index = it->getMapped().element_index;
if (it->getKey() != key)
{
/// In case of complex key we keep it in arena
if constexpr (std::is_same_v<KeyType, StringRef>)
arena.free(const_cast<char *>(key.data), key.size);
}
cache.reinsert(it, key, value);
}
setCellDeadline(*it, now);
}
deleteUnusedKeysIfNecessary();
}
PaddedPODArray<KeyType> getCachedKeysImpl() const
@ -428,92 +470,6 @@ private:
return result;
}
void deleteUnusedKeysIfNecessary()
{
size_t cache_max_size = cache.getMaxSize();
if (unlikely(attributes.empty()) || insert_index < cache_max_size * 2)
return;
absl::flat_hash_map<size_t, typename CacheLRUHashMap::iterator, DefaultHash<size_t>> element_index_to_cache_iterator;
for (auto begin = cache.begin(); begin != cache.end(); ++begin)
{
auto & node = *begin;
auto & cell = node.getMapped();
size_t element_index = cell.element_index;
element_index_to_cache_iterator.insert(std::make_pair(element_index, begin));
}
size_t last_remove_index = 0;
getAttributeContainer(0, [&, this](auto & container)
{
size_t container_size = container.size();
size_t remove_index = 0;
for (size_t i = 0; i < container_size; ++i)
{
if (indexes_to_delete.contains(i))
{
if constexpr (std::is_same_v<decltype(container[0]), StringRef>)
{
StringRef data = container[i];
arena.free(const_cast<char *>(data.data), data.size);
}
continue;
}
std::swap(container[remove_index], container[i]);
auto it = element_index_to_cache_iterator.find(remove_index);
if (it != element_index_to_cache_iterator.end())
{
auto & cell = it->second->getMapped();
cell.element_index = remove_index;
}
++remove_index;
}
container.erase(container.begin() + remove_index, container.end());
last_remove_index = remove_index;
});
insert_index = last_remove_index;
for (size_t attribute_index = 1; attribute_index < attributes.size(); ++attribute_index)
{
getAttributeContainer(attribute_index, [this](auto & container)
{
size_t container_size = container.size();
size_t remove_index = 0;
for (size_t i = 0; i < container_size; ++i)
{
if (indexes_to_delete.contains(i))
{
if constexpr (std::is_same_v<decltype(container[0]), StringRef>)
{
StringRef data = container[i];
arena.free(const_cast<char *>(data.data), data.size);
}
continue;
}
std::swap(container[remove_index], container[i]);
++remove_index;
}
container.erase(container.begin() + remove_index, container.end());
});
}
indexes_to_delete.clear();
}
template <typename GetContainerFunc>
void getAttributeContainer(size_t attribute_index, GetContainerFunc && func)
{
@ -589,41 +545,12 @@ private:
}
}
using TimePoint = std::chrono::system_clock::time_point;
struct Cell
{
TimePoint deadline;
size_t element_index;
bool is_default;
};
void insertCellInCache(KeyType & key, const Cell & cell)
{
/// Copy complex key into arena and put in cache
if constexpr (dictionary_key_type == DictionaryKeyType::complex)
key = copyStringInArena(key);
cache.insert(key, cell);
}
inline void setCellDeadline(Cell & cell, TimePoint now)
{
if (configuration.lifetime.min_sec == 0 && configuration.lifetime.max_sec == 0)
{
/// This maybe not obvious, but when we define is this cell is expired or expired permanently, we add strict_max_lifetime_seconds
/// to the expiration time. And it overflows pretty well.
cell.deadline = std::chrono::time_point<std::chrono::system_clock>::max() - 2 * std::chrono::seconds(configuration.strict_max_lifetime_seconds);
return;
}
size_t min_sec_lifetime = configuration.lifetime.min_sec;
size_t max_sec_lifetime = configuration.lifetime.max_sec;
std::uniform_int_distribution<UInt64> distribution{min_sec_lifetime, max_sec_lifetime};
cell.deadline = now + std::chrono::seconds(distribution(rnd_engine));
}
CacheDictionaryStorageConfiguration configuration;
ArenaWithFreeLists arena;
@ -660,29 +587,47 @@ private:
CacheDictionaryStorage & storage;
template <typename Key, typename Value>
void operator()(const Key & key, const Value & cell) const
void operator()(const Key & key, const Value &) const
{
/// In case of complex key we keep it in arena
if constexpr (std::is_same_v<Key, StringRef>)
storage.arena.free(const_cast<char *>(key.data), key.size);
storage.indexes_to_delete.insert(cell.element_index);
}
};
using SimpleKeyLRUHashMap = LRUHashMap<UInt64, Cell, CacheStorageCellDisposer>;
using ComplexKeyLRUHashMap = LRUHashMapWithSavedHash<StringRef, Cell, CacheStorageCellDisposer>;
using SimpleFixedDeadlineHashMap = FixedDeadlineHashMap<UInt64, Cell, CacheStorageCellDisposer>;
using ComplexFixedDeadlineHashMap = FixedDeadlineHashMap<StringRef, Cell, CacheStorageCellDisposer>;
using CacheLRUHashMap = std::conditional_t<
using FixedDeadlineHashMap = std::conditional_t<
dictionary_key_type == DictionaryKeyType::simple,
SimpleKeyLRUHashMap,
ComplexKeyLRUHashMap>;
SimpleFixedDeadlineHashMap,
ComplexFixedDeadlineHashMap>;
CacheLRUHashMap cache;
using FixedDeadlineHashMapCell = typename FixedDeadlineHashMap::Cell;
inline void setCellDeadline(FixedDeadlineHashMapCell & cell, TimePoint now)
{
if (configuration.lifetime.min_sec == 0 && configuration.lifetime.max_sec == 0)
{
/// This maybe not obvious, but when we define is this cell is expired or expired permanently, we add strict_max_lifetime_seconds
/// to the expiration time. And it overflows pretty well.
auto deadline = std::chrono::time_point<std::chrono::system_clock>::max() - 2 * std::chrono::seconds(configuration.strict_max_lifetime_seconds);
cell.setDeadline(deadline);
return;
}
size_t min_sec_lifetime = configuration.lifetime.min_sec;
size_t max_sec_lifetime = configuration.lifetime.max_sec;
std::uniform_int_distribution<UInt64> distribution{min_sec_lifetime, max_sec_lifetime};
auto deadline = now + std::chrono::seconds(distribution(rnd_engine));
cell.setDeadline(deadline);
}
FixedDeadlineHashMap cache;
std::vector<Attribute> attributes;
size_t insert_index = 0;
absl::flat_hash_set<size_t, DefaultHash<size_t>> indexes_to_delete;
};
}