mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 08:02:02 +00:00
Updated implementation
This commit is contained in:
parent
3d1c42827b
commit
bc22f4f6eb
@ -1,253 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <chrono>
|
||||
#include <type_traits>
|
||||
#include <Common/HashTable/HashMap.h>
|
||||
#include <Common/BitHelpers.h>
|
||||
|
||||
using TimePoint = std::chrono::system_clock::time_point;
|
||||
|
||||
template <typename TKey, typename TMapped, typename Hash, bool save_hash_in_cell>
|
||||
struct DeadlineCell :
|
||||
public std::conditional_t<save_hash_in_cell,
|
||||
HashMapCellWithSavedHash<TKey, TMapped, Hash, HashTableNoState>,
|
||||
HashMapCell<TKey, TMapped, Hash, HashTableNoState>>
|
||||
{
|
||||
using Key = TKey;
|
||||
|
||||
using Base = std::conditional_t<save_hash_in_cell,
|
||||
HashMapCellWithSavedHash<TKey, TMapped, Hash, HashTableNoState>,
|
||||
HashMapCell<TKey, TMapped, Hash, HashTableNoState>>;
|
||||
|
||||
using Mapped = typename Base::Mapped;
|
||||
using State = typename Base::State;
|
||||
|
||||
using mapped_type = Mapped;
|
||||
using key_type = Key;
|
||||
|
||||
using Base::Base;
|
||||
|
||||
inline TimePoint getDeadline() const { return deadline; }
|
||||
|
||||
void setDeadline(TimePoint & deadline_value) { deadline = deadline_value; }
|
||||
|
||||
private:
|
||||
TimePoint deadline;
|
||||
};
|
||||
|
||||
template <typename TKey, typename TValue, typename Disposer, typename Hash, bool save_hash_in_cells>
|
||||
class FixedDeadlineHashMapImpl :
|
||||
private HashMapTable<
|
||||
TKey,
|
||||
DeadlineCell<TKey, TValue, Hash, save_hash_in_cells>,
|
||||
Hash,
|
||||
HashTableGrower<>,
|
||||
HashTableAllocator>
|
||||
{
|
||||
/// TODO: Make custom grower
|
||||
using Base = HashMapTable<
|
||||
TKey,
|
||||
DeadlineCell<TKey, TValue, Hash, save_hash_in_cells>,
|
||||
Hash,
|
||||
HashTableGrower<>,
|
||||
HashTableAllocator>;
|
||||
|
||||
static size_t calculateMaxSize(size_t max_size, size_t max_collision_resolution_chain)
|
||||
{
|
||||
return roundUpToPowerOfTwoOrZero(std::max(max_size, max_collision_resolution_chain));
|
||||
}
|
||||
public:
|
||||
using Cell = DeadlineCell<TKey, TValue, Hash, save_hash_in_cells>;
|
||||
using Key = TKey;
|
||||
using Value = TValue;
|
||||
using Mapped = typename Cell::Mapped;
|
||||
|
||||
explicit FixedDeadlineHashMapImpl(size_t max_size_, size_t max_collision_resolution_chain_, Disposer disposer_ = Disposer())
|
||||
: Base(calculateMaxSize(max_size_, max_collision_resolution_chain_))
|
||||
, max_collision_resolution_chain(max_collision_resolution_chain_)
|
||||
, max_size(max_size_)
|
||||
, disposer(std::move(disposer_))
|
||||
{
|
||||
assert(max_size > 0);
|
||||
assert(max_collision_resolution_chain > 0);
|
||||
}
|
||||
|
||||
~FixedDeadlineHashMapImpl()
|
||||
{
|
||||
clear();
|
||||
}
|
||||
|
||||
Cell * get(const Key & key)
|
||||
{
|
||||
if (Cell::isZero(key, *this))
|
||||
return this->hasZero() ? this->zeroValue() : nullptr;
|
||||
|
||||
/// TODO: Optimize
|
||||
|
||||
size_t hash_value = Base::hash(key);
|
||||
size_t place_value = Base::grower.place(hash_value);
|
||||
size_t resolution_chain = max_collision_resolution_chain;
|
||||
|
||||
while (resolution_chain != 0)
|
||||
{
|
||||
auto & cell = Base::buf[place_value];
|
||||
|
||||
if (cell.isZero(*this))
|
||||
return nullptr;
|
||||
|
||||
if (cell.keyEquals(key, hash_value, *this))
|
||||
return &cell;
|
||||
|
||||
place_value = Base::grower.next(place_value);
|
||||
--resolution_chain;
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
const Cell * get(const Key & key) const
|
||||
{
|
||||
return const_cast<std::decay_t<decltype(*this)> *>(this)->get(key);
|
||||
}
|
||||
|
||||
std::pair<Cell *, bool> ALWAYS_INLINE insert(const Key & key, const Value & value)
|
||||
{
|
||||
return emplace(key, value);
|
||||
}
|
||||
|
||||
std::pair<Cell *, bool> ALWAYS_INLINE insert(const Key & key, Value && value)
|
||||
{
|
||||
return emplace(key, std::move(value));
|
||||
}
|
||||
|
||||
template<typename ...Args>
|
||||
std::pair<Cell *, bool> ALWAYS_INLINE emplace(const Key & key, Args && ... args)
|
||||
{
|
||||
size_t hash_value = Base::hash(key);
|
||||
std::pair<Cell *, bool> result;
|
||||
|
||||
if (!emplaceIfZero(key, hash_value, result))
|
||||
result = emplaceNonZeroImpl(key, hash_value);
|
||||
|
||||
bool was_inserted = result.second;
|
||||
|
||||
if (was_inserted)
|
||||
new (&result.first->getMapped()) Value(std::forward<Args>(args)...);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
template <typename ...Args>
|
||||
void reinsert(Cell * place_to_use, const Key & key, Args && ... args)
|
||||
{
|
||||
size_t hash_value = Base::hash(key);
|
||||
|
||||
new (place_to_use) Cell(key, *this);
|
||||
new (&place_to_use->getMapped()) Value(std::forward<Args>(args)...);
|
||||
place_to_use->setHash(hash_value);
|
||||
}
|
||||
|
||||
using Base::size;
|
||||
|
||||
using iterator = typename Base::iterator;
|
||||
using const_iterator = typename Base::const_iterator;
|
||||
|
||||
using Base::begin;
|
||||
using Base::end;
|
||||
|
||||
size_t getMaxSize() const { return max_size; }
|
||||
|
||||
size_t getSizeInBytes() const { return Base::getBufferSizeInBytes(); }
|
||||
|
||||
void clear()
|
||||
{
|
||||
for (auto & cell : *this)
|
||||
disposer(cell.getKey(), cell.getMapped());
|
||||
}
|
||||
|
||||
private:
|
||||
size_t max_collision_resolution_chain;
|
||||
size_t max_size;
|
||||
Disposer disposer;
|
||||
|
||||
bool emplaceIfZero(const Key & key, size_t hash_value, std::pair<Cell *, bool> & result)
|
||||
{
|
||||
if (!Cell::isZero(key, *this))
|
||||
return false;
|
||||
|
||||
if (this->hasZero())
|
||||
{
|
||||
result = {this->zeroValue(), false};
|
||||
return true;
|
||||
}
|
||||
|
||||
++Base::m_size;
|
||||
|
||||
this->setHasZero();
|
||||
this->zeroValue()->setHash(hash_value);
|
||||
result = {this->zeroValue(), true};
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
std::pair<Cell *, bool> emplaceNonZeroImpl(const Key & key, size_t hash_value)
|
||||
{
|
||||
TimePoint oldest_time = TimePoint::max();
|
||||
size_t place_value = Base::grower.place(hash_value);
|
||||
size_t resolution_chain = max_collision_resolution_chain;
|
||||
|
||||
bool use_old_value_place = false;
|
||||
Cell * place_to_insert = nullptr;
|
||||
|
||||
while (resolution_chain != 0)
|
||||
{
|
||||
auto & cell = Base::buf[place_value];
|
||||
|
||||
if (cell.isZero(*this))
|
||||
{
|
||||
use_old_value_place = false;
|
||||
place_to_insert = &cell;
|
||||
break;
|
||||
}
|
||||
|
||||
if (cell.keyEquals(key, hash_value, *this))
|
||||
return std::make_pair(&cell, false);
|
||||
|
||||
if (cell.getDeadline() < oldest_time)
|
||||
{
|
||||
use_old_value_place = true;
|
||||
place_to_insert = &cell;
|
||||
}
|
||||
|
||||
place_value = Base::grower.next(place_value);
|
||||
--resolution_chain;
|
||||
}
|
||||
|
||||
if (!place_to_insert)
|
||||
place_to_insert = &Base::buf[place_value];
|
||||
|
||||
if (use_old_value_place)
|
||||
return std::make_pair(place_to_insert, false);
|
||||
else
|
||||
{
|
||||
++Base::m_size;
|
||||
|
||||
new (place_to_insert) Cell(key, *this);
|
||||
place_to_insert->setHash(hash_value);
|
||||
|
||||
return std::make_pair(place_to_insert, true);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
template <typename Key, typename Mapped>
|
||||
struct DefaultFixedHashMapCellDisposer
|
||||
{
|
||||
void operator()(const Key &, const Mapped &) const {}
|
||||
};
|
||||
|
||||
template <typename Key, typename Value, typename Disposer = DefaultFixedHashMapCellDisposer<Key, Value>, typename Hash = DefaultHash<Key>>
|
||||
using FixedDeadlineHashMap = FixedDeadlineHashMapImpl<Key, Value, Disposer, Hash, false>;
|
||||
|
||||
template <typename Key, typename Value, typename Disposer = DefaultFixedHashMapCellDisposer<Key, Value>, typename Hash = DefaultHash<Key>>
|
||||
using FixedDeadlineHashMapWithSavedHash = FixedDeadlineHashMapImpl<Key, Value, Disposer, Hash, true>;
|
@ -332,10 +332,16 @@ Columns CacheDictionary<dictionary_key_type>::getColumnsImpl(
|
||||
|
||||
FetchResult result_of_fetch_from_storage;
|
||||
|
||||
{
|
||||
/// Write lock on storage
|
||||
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
|
||||
bool protect_get_with_write_lock = cache_storage_ptr->canPerformFetchByMultipleThreadsWithoutLock();
|
||||
|
||||
if (protect_get_with_write_lock)
|
||||
{
|
||||
const ProfilingScopedReadRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
|
||||
result_of_fetch_from_storage = cache_storage_ptr->fetchColumnsForKeys(keys, request);
|
||||
}
|
||||
else
|
||||
{
|
||||
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
|
||||
result_of_fetch_from_storage = cache_storage_ptr->fetchColumnsForKeys(keys, request);
|
||||
}
|
||||
|
||||
|
@ -4,14 +4,11 @@
|
||||
#include <variant>
|
||||
|
||||
#include <pcg_random.hpp>
|
||||
#include <absl/container/flat_hash_map.h>
|
||||
#include <absl/container/flat_hash_set.h>
|
||||
|
||||
#include <Common/randomSeed.h>
|
||||
#include <Common/Arena.h>
|
||||
#include <Common/ArenaWithFreeLists.h>
|
||||
#include <Common/HashTable/LRUHashMap.h>
|
||||
#include <Common/HashTable/FixedDeadlineHashMap.h>
|
||||
#include <Dictionaries/DictionaryStructure.h>
|
||||
#include <Dictionaries/ICacheDictionaryStorage.h>
|
||||
#include <Dictionaries/DictionaryHelpers.h>
|
||||
@ -38,6 +35,9 @@ struct CacheDictionaryStorageConfiguration
|
||||
template <DictionaryKeyType dictionary_key_type>
|
||||
class CacheDictionaryStorage final : public ICacheDictionaryStorage
|
||||
{
|
||||
|
||||
static constexpr size_t max_collision_length = 10;
|
||||
|
||||
public:
|
||||
using KeyType = std::conditional_t<dictionary_key_type == DictionaryKeyType::simple, UInt64, StringRef>;
|
||||
static_assert(dictionary_key_type != DictionaryKeyType::range, "Range key type is not supported by CacheDictionaryStorage");
|
||||
@ -47,13 +47,19 @@ public:
|
||||
CacheDictionaryStorageConfiguration & configuration_)
|
||||
: configuration(configuration_)
|
||||
, rnd_engine(randomSeed())
|
||||
, cache(configuration.max_size_in_cells, 10, { *this })
|
||||
{
|
||||
size_t cells_size = roundUpToPowerOfTwoOrZero(std::max(configuration.max_size_in_cells, max_collision_length));
|
||||
|
||||
cells.resize_fill(cells_size);
|
||||
size_overlap_mask = cells_size - 1;
|
||||
|
||||
setup(dictionary_structure);
|
||||
}
|
||||
|
||||
bool returnsFetchedColumnsInOrderOfRequestedKeys() const override { return true; }
|
||||
|
||||
bool canPerformFetchByMultipleThreadsWithoutLock() const override { return true; }
|
||||
|
||||
String getName() const override
|
||||
{
|
||||
if (dictionary_key_type == DictionaryKeyType::simple)
|
||||
@ -134,9 +140,9 @@ public:
|
||||
throw Exception("Method getCachedComplexKeys is not supported for simple key storage", ErrorCodes::NOT_IMPLEMENTED);
|
||||
}
|
||||
|
||||
size_t getSize() const override { return cache.size(); }
|
||||
size_t getSize() const override { return size; }
|
||||
|
||||
size_t getMaxSize() const override { return cache.getMaxSize(); }
|
||||
size_t getMaxSize() const override { return configuration.max_size_in_cells; }
|
||||
|
||||
size_t getBytesAllocated() const override
|
||||
{
|
||||
@ -151,7 +157,7 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
return arena.size() + cache.getSizeInBytes() + attributes_size_in_bytes;
|
||||
return arena.size() + sizeof(Cell) * configuration.max_size_in_cells + attributes_size_in_bytes;
|
||||
}
|
||||
|
||||
private:
|
||||
@ -175,9 +181,9 @@ private:
|
||||
KeysStorageFetchResult result;
|
||||
|
||||
result.fetched_columns = fetch_request.makeAttributesResultColumns();
|
||||
result.key_index_to_state.resize_fill(keys.size(), {KeyState::not_found});
|
||||
result.key_index_to_state.resize_fill(keys.size());
|
||||
|
||||
const auto now = std::chrono::system_clock::now();
|
||||
const time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
|
||||
|
||||
size_t fetched_columns_index = 0;
|
||||
size_t keys_size = keys.size();
|
||||
@ -190,54 +196,39 @@ private:
|
||||
for (size_t key_index = 0; key_index < keys_size; ++key_index)
|
||||
{
|
||||
auto key = keys[key_index];
|
||||
auto * it = cache.get(key);
|
||||
auto [key_state, cell_index] = getKeyStateAndCellIndex(key, now);
|
||||
|
||||
if (!it)
|
||||
if (unlikely(key_state == KeyState::not_found))
|
||||
{
|
||||
result.key_index_to_state[key_index] = {KeyState::not_found};
|
||||
++result.not_found_keys_size;
|
||||
continue;
|
||||
}
|
||||
|
||||
auto deadline = it->getDeadline();
|
||||
const auto & cell = it->getMapped();
|
||||
auto & cell = cells[cell_index];
|
||||
|
||||
if (now > deadline + max_lifetime_seconds)
|
||||
{
|
||||
result.key_index_to_state[key_index] = {KeyState::not_found};
|
||||
++result.not_found_keys_size;
|
||||
continue;
|
||||
}
|
||||
|
||||
bool cell_is_expired = false;
|
||||
KeyState::State key_state = KeyState::found;
|
||||
|
||||
if (now > deadline)
|
||||
{
|
||||
cell_is_expired = true;
|
||||
key_state = KeyState::expired;
|
||||
}
|
||||
result.expired_keys_size += static_cast<size_t>(key_state == KeyState::expired);
|
||||
|
||||
result.key_index_to_state[key_index] = {key_state, fetched_columns_index};
|
||||
++fetched_columns_index;
|
||||
fetched_keys[fetched_columns_index] = FetchedKey(cell.element_index, cell.is_default);
|
||||
|
||||
result.expired_keys_size += cell_is_expired;
|
||||
result.found_keys_size += !cell_is_expired;
|
||||
++fetched_columns_index;
|
||||
|
||||
result.key_index_to_state[key_index].setDefaultValue(cell.is_default);
|
||||
result.default_keys_size += cell.is_default;
|
||||
|
||||
fetched_keys[key_index] = FetchedKey{cell.element_index, cell.is_default};
|
||||
}
|
||||
|
||||
result.found_keys_size = keys_size - (result.expired_keys_size + result.not_found_keys_size);
|
||||
|
||||
for (size_t attribute_index = 0; attribute_index < fetch_request.attributesSize(); ++attribute_index)
|
||||
{
|
||||
if (!fetch_request.shouldFillResultColumnWithIndex(attribute_index))
|
||||
continue;
|
||||
|
||||
size_t fetched_keys_size = fetched_keys.size();
|
||||
auto & attribute = attributes[attribute_index];
|
||||
const auto & default_value_provider = fetch_request.defaultValueProviderAtIndex(attribute_index);
|
||||
|
||||
size_t fetched_keys_size = fetched_keys.size();
|
||||
auto & fetched_column = *result.fetched_columns[attribute_index];
|
||||
fetched_column.reserve(fetched_keys_size);
|
||||
|
||||
@ -245,7 +236,7 @@ private:
|
||||
{
|
||||
auto & container = std::get<std::vector<Field>>(attribute.attribute_container);
|
||||
|
||||
for (size_t fetched_key_index = 0; fetched_key_index < fetched_keys.size(); ++fetched_key_index)
|
||||
for (size_t fetched_key_index = 0; fetched_key_index < fetched_columns_index; ++fetched_key_index)
|
||||
{
|
||||
auto fetched_key = fetched_keys[fetched_key_index];
|
||||
|
||||
@ -272,7 +263,7 @@ private:
|
||||
|
||||
if constexpr (std::is_same_v<ColumnType, ColumnString>)
|
||||
{
|
||||
for (size_t fetched_key_index = 0; fetched_key_index < fetched_keys.size(); ++fetched_key_index)
|
||||
for (size_t fetched_key_index = 0; fetched_key_index < fetched_columns_index; ++fetched_key_index)
|
||||
{
|
||||
auto fetched_key = fetched_keys[fetched_key_index];
|
||||
|
||||
@ -287,7 +278,7 @@ private:
|
||||
}
|
||||
else
|
||||
{
|
||||
for (size_t fetched_key_index = 0; fetched_key_index < fetched_keys.size(); ++fetched_key_index)
|
||||
for (size_t fetched_key_index = 0; fetched_key_index < fetched_columns_index; ++fetched_key_index)
|
||||
{
|
||||
auto fetched_key = fetched_keys[fetched_key_index];
|
||||
auto & data = column_typed.getData();
|
||||
@ -314,23 +305,27 @@ private:
|
||||
{
|
||||
const auto now = std::chrono::system_clock::now();
|
||||
|
||||
size_t keys_size = keys.size();
|
||||
|
||||
size_t columns_size = columns.size();
|
||||
Field column_value;
|
||||
|
||||
for (size_t key_index = 0; key_index < keys_size; ++key_index)
|
||||
for (size_t key_index = 0; key_index < keys.size(); ++key_index)
|
||||
{
|
||||
auto key = keys[key_index];
|
||||
|
||||
auto [it, was_inserted] = cache.insert(key, {});
|
||||
size_t cell_index = getCellIndexForInsert(key);
|
||||
auto & cell = cells[cell_index];
|
||||
|
||||
cell.is_default = false;
|
||||
|
||||
bool was_inserted = cell.deadline == 0;
|
||||
|
||||
if (was_inserted)
|
||||
{
|
||||
auto & cell = it->getMapped();
|
||||
cell.is_default = false;
|
||||
if constexpr (std::is_same_v<KeyType, StringRef>)
|
||||
cell.key = copyStringInArena(key);
|
||||
else
|
||||
cell.key = key;
|
||||
|
||||
for (size_t attribute_index = 0; attribute_index < columns_size; ++attribute_index)
|
||||
for (size_t attribute_index = 0; attribute_index < columns.size(); ++attribute_index)
|
||||
{
|
||||
auto & column = columns[attribute_index];
|
||||
|
||||
@ -347,38 +342,36 @@ private:
|
||||
container.back() = column_value;
|
||||
else if constexpr (std::is_same_v<ElementType, StringRef>)
|
||||
{
|
||||
const String & value = column_value.get<String>();
|
||||
StringRef inserted_value = copyStringInArena(StringRef { value.data(), value.size() });
|
||||
const String & string_value = column_value.get<String>();
|
||||
StringRef string_value_ref = StringRef {string_value.data(), string_value.size()};
|
||||
StringRef inserted_value = copyStringInArena(string_value_ref);
|
||||
container.back() = inserted_value;
|
||||
}
|
||||
else
|
||||
container.back() = column_value.get<ElementType>();
|
||||
container.back() = column_value.get<NearestFieldType<ElementType>>();
|
||||
});
|
||||
}
|
||||
|
||||
++size;
|
||||
}
|
||||
else
|
||||
{
|
||||
auto & cell_key = it->getKey();
|
||||
|
||||
Cell cell;
|
||||
|
||||
size_t existing_index = it->getMapped().element_index;
|
||||
|
||||
cell.element_index = existing_index;
|
||||
cell.is_default = false;
|
||||
|
||||
if (cell_key != key)
|
||||
if (cell.key != key)
|
||||
{
|
||||
/// In case of complex key we keep it in arena
|
||||
if constexpr (std::is_same_v<KeyType, StringRef>)
|
||||
arena.free(const_cast<char *>(key.data), key.size);
|
||||
{
|
||||
char * data = const_cast<char *>(cell.key.data);
|
||||
arena.free(data, cell.key.size);
|
||||
cell.key = copyStringInArena(key);
|
||||
}
|
||||
else
|
||||
cell.key = key;
|
||||
}
|
||||
|
||||
cache.reinsert(it, key, cell);
|
||||
/// Put values into existing index
|
||||
size_t index_to_use = cell.element_index;
|
||||
|
||||
/// Put values into index
|
||||
|
||||
for (size_t attribute_index = 0; attribute_index < columns_size; ++attribute_index)
|
||||
for (size_t attribute_index = 0; attribute_index < columns.size(); ++attribute_index)
|
||||
{
|
||||
auto & column = columns[attribute_index];
|
||||
|
||||
@ -389,20 +382,26 @@ private:
|
||||
column->get(key_index, column_value);
|
||||
|
||||
if constexpr (std::is_same_v<ElementType, Field>)
|
||||
container[existing_index] = column_value;
|
||||
container[index_to_use] = column_value;
|
||||
else if constexpr (std::is_same_v<ElementType, StringRef>)
|
||||
{
|
||||
const String & value = column_value.get<String>();
|
||||
StringRef inserted_value = copyStringInArena(StringRef { value.data(), value.size() });
|
||||
container[existing_index] = inserted_value;
|
||||
const String & string_value = column_value.get<String>();
|
||||
StringRef string_ref_value = StringRef {string_value.data(), string_value.size()};
|
||||
StringRef inserted_value = copyStringInArena(string_ref_value);
|
||||
|
||||
StringRef previous_value = container[index_to_use];
|
||||
char * data = const_cast<char *>(previous_value.data);
|
||||
arena.free(data, previous_value.size);
|
||||
|
||||
container[index_to_use] = inserted_value;
|
||||
}
|
||||
else
|
||||
container[existing_index] = column_value.get<ElementType>();
|
||||
container[index_to_use] = column_value.get<NearestFieldType<ElementType>>();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
setCellDeadline(*it, now);
|
||||
setCellDeadline(cell, now);
|
||||
}
|
||||
}
|
||||
|
||||
@ -416,55 +415,64 @@ private:
|
||||
{
|
||||
auto key = keys[key_index];
|
||||
|
||||
Cell value;
|
||||
value.is_default = true;
|
||||
size_t cell_index = getCellIndexForInsert(key);
|
||||
auto & cell = cells[cell_index];
|
||||
|
||||
auto [it, was_inserted] = cache.insert(key, value);
|
||||
bool was_inserted = cell.deadline == 0;
|
||||
|
||||
cell.is_default = true;
|
||||
|
||||
if (was_inserted)
|
||||
{
|
||||
auto & cell = it->getMapped();
|
||||
if constexpr (std::is_same_v<KeyType, StringRef>)
|
||||
cell.key = copyStringInArena(key);
|
||||
else
|
||||
cell.key = key;
|
||||
|
||||
for (size_t attribute_index = 0; attribute_index < attributes.size(); ++attribute_index)
|
||||
{
|
||||
getAttributeContainer(attribute_index, [&](auto & container)
|
||||
{
|
||||
container.emplace_back();
|
||||
cell.element_index = container.size();
|
||||
cell.element_index = container.size() - 1;
|
||||
});
|
||||
}
|
||||
|
||||
++size;
|
||||
}
|
||||
else
|
||||
{
|
||||
value.element_index = it->getMapped().element_index;
|
||||
|
||||
if (it->getKey() != key)
|
||||
if (cell.key != key)
|
||||
{
|
||||
/// In case of complex key we keep it in arena
|
||||
if constexpr (std::is_same_v<KeyType, StringRef>)
|
||||
arena.free(const_cast<char *>(key.data), key.size);
|
||||
{
|
||||
char * data = const_cast<char *>(cell.key.data);
|
||||
arena.free(data, cell.key.size);
|
||||
cell.key = copyStringInArena(key);
|
||||
}
|
||||
else
|
||||
cell.key = key;
|
||||
}
|
||||
}
|
||||
|
||||
cache.reinsert(it, key, value);
|
||||
}
|
||||
|
||||
setCellDeadline(*it, now);
|
||||
setCellDeadline(cell, now);
|
||||
}
|
||||
}
|
||||
|
||||
PaddedPODArray<KeyType> getCachedKeysImpl() const
|
||||
{
|
||||
PaddedPODArray<KeyType> result;
|
||||
result.reserve(cache.size());
|
||||
result.reserve(size);
|
||||
|
||||
for (auto & node : cache)
|
||||
for (auto cell : cells)
|
||||
{
|
||||
auto & cell = node.getMapped();
|
||||
if (cell.deadline == 0)
|
||||
continue;
|
||||
|
||||
if (cell.is_default)
|
||||
continue;
|
||||
|
||||
result.emplace_back(node.getKey());
|
||||
result.emplace_back(cell.key);
|
||||
}
|
||||
|
||||
return result;
|
||||
@ -545,18 +553,16 @@ private:
|
||||
}
|
||||
}
|
||||
|
||||
using TimePoint = std::chrono::system_clock::time_point;
|
||||
|
||||
struct Cell
|
||||
{
|
||||
KeyType key;
|
||||
size_t element_index;
|
||||
bool is_default;
|
||||
time_t deadline;
|
||||
};
|
||||
|
||||
CacheDictionaryStorageConfiguration configuration;
|
||||
|
||||
ArenaWithFreeLists arena;
|
||||
|
||||
pcg64 rnd_engine;
|
||||
|
||||
struct Attribute
|
||||
{
|
||||
AttributeUnderlyingType type;
|
||||
@ -581,38 +587,28 @@ private:
|
||||
std::vector<Field>> attribute_container;
|
||||
};
|
||||
|
||||
class CacheStorageCellDisposer
|
||||
{
|
||||
public:
|
||||
CacheDictionaryStorage & storage;
|
||||
CacheDictionaryStorageConfiguration configuration;
|
||||
|
||||
template <typename Key, typename Value>
|
||||
void operator()(const Key & key, const Value &) const
|
||||
{
|
||||
/// In case of complex key we keep it in arena
|
||||
if constexpr (std::is_same_v<Key, StringRef>)
|
||||
storage.arena.free(const_cast<char *>(key.data), key.size);
|
||||
}
|
||||
};
|
||||
pcg64 rnd_engine;
|
||||
|
||||
using SimpleFixedDeadlineHashMap = FixedDeadlineHashMap<UInt64, Cell, CacheStorageCellDisposer>;
|
||||
using ComplexFixedDeadlineHashMap = FixedDeadlineHashMap<StringRef, Cell, CacheStorageCellDisposer>;
|
||||
size_t size_overlap_mask = 0;
|
||||
|
||||
using FixedDeadlineHashMap = std::conditional_t<
|
||||
dictionary_key_type == DictionaryKeyType::simple,
|
||||
SimpleFixedDeadlineHashMap,
|
||||
ComplexFixedDeadlineHashMap>;
|
||||
size_t size = 0;
|
||||
|
||||
using FixedDeadlineHashMapCell = typename FixedDeadlineHashMap::Cell;
|
||||
PaddedPODArray<Cell> cells;
|
||||
|
||||
inline void setCellDeadline(FixedDeadlineHashMapCell & cell, TimePoint now)
|
||||
ArenaWithFreeLists arena;
|
||||
|
||||
std::vector<Attribute> attributes;
|
||||
|
||||
inline void setCellDeadline(Cell & cell, TimePoint now)
|
||||
{
|
||||
if (configuration.lifetime.min_sec == 0 && configuration.lifetime.max_sec == 0)
|
||||
{
|
||||
/// This maybe not obvious, but when we define is this cell is expired or expired permanently, we add strict_max_lifetime_seconds
|
||||
/// to the expiration time. And it overflows pretty well.
|
||||
auto deadline = std::chrono::time_point<std::chrono::system_clock>::max() - 2 * std::chrono::seconds(configuration.strict_max_lifetime_seconds);
|
||||
cell.setDeadline(deadline);
|
||||
cell.deadline = std::chrono::system_clock::to_time_t(deadline);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -622,12 +618,73 @@ private:
|
||||
std::uniform_int_distribution<UInt64> distribution{min_sec_lifetime, max_sec_lifetime};
|
||||
|
||||
auto deadline = now + std::chrono::seconds(distribution(rnd_engine));
|
||||
cell.setDeadline(deadline);
|
||||
cell.deadline = std::chrono::system_clock::to_time_t(deadline);
|
||||
}
|
||||
|
||||
FixedDeadlineHashMap cache;
|
||||
inline size_t getCellIndex(const KeyType key) const
|
||||
{
|
||||
const size_t hash = DefaultHash<KeyType>()(key);
|
||||
const size_t index = hash & size_overlap_mask;
|
||||
return index;
|
||||
}
|
||||
|
||||
std::vector<Attribute> attributes;
|
||||
using KeyStateAndCellIndex = std::pair<KeyState::State, size_t>;
|
||||
|
||||
inline KeyStateAndCellIndex getKeyStateAndCellIndex(const KeyType key, const time_t now) const
|
||||
{
|
||||
size_t place_value = getCellIndex(key);
|
||||
const size_t place_value_end = place_value + max_collision_length;
|
||||
|
||||
time_t max_lifetime_seconds = static_cast<time_t>(configuration.strict_max_lifetime_seconds);
|
||||
|
||||
for (; place_value < place_value_end; ++place_value)
|
||||
{
|
||||
const auto cell_place_value = place_value & size_overlap_mask;
|
||||
const auto & cell = cells[cell_place_value];
|
||||
|
||||
if (cell.key != key)
|
||||
continue;
|
||||
|
||||
if (unlikely(now > cell.deadline + max_lifetime_seconds))
|
||||
return std::make_pair(KeyState::not_found, cell_place_value);
|
||||
|
||||
if (unlikely(now > cell.deadline))
|
||||
return std::make_pair(KeyState::expired, cell_place_value);
|
||||
|
||||
return std::make_pair(KeyState::found, cell_place_value);
|
||||
}
|
||||
|
||||
return std::make_pair(KeyState::not_found, place_value);
|
||||
}
|
||||
|
||||
inline size_t getCellIndexForInsert(const KeyType & key) const
|
||||
{
|
||||
size_t place_value = getCellIndex(key);
|
||||
const size_t place_value_end = place_value + max_collision_length;
|
||||
size_t oldest_place_value = place_value;
|
||||
|
||||
time_t oldest_time = std::numeric_limits<time_t>::max();
|
||||
|
||||
for (; place_value < place_value_end; ++place_value)
|
||||
{
|
||||
const size_t cell_place_value = place_value & size_overlap_mask;
|
||||
const Cell cell = cells[cell_place_value];
|
||||
|
||||
if (cell.deadline == 0)
|
||||
return cell_place_value;
|
||||
|
||||
if (cell.key == key)
|
||||
return place_value;
|
||||
|
||||
if (cell.deadline < oldest_time)
|
||||
{
|
||||
oldest_time = cell.deadline;
|
||||
oldest_place_value = cell_place_value;
|
||||
}
|
||||
}
|
||||
|
||||
return oldest_place_value;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -12,9 +12,9 @@ struct KeyState
|
||||
{
|
||||
enum State: uint8_t
|
||||
{
|
||||
not_found = 2,
|
||||
expired = 4,
|
||||
found = 8,
|
||||
not_found = 0,
|
||||
expired = 1,
|
||||
found = 2,
|
||||
};
|
||||
|
||||
KeyState(State state_, size_t fetched_column_index_)
|
||||
@ -72,6 +72,8 @@ public:
|
||||
/// Necessary if all keys are found we can return result to client without additional aggregation
|
||||
virtual bool returnsFetchedColumnsInOrderOfRequestedKeys() const = 0;
|
||||
|
||||
virtual bool canPerformFetchByMultipleThreadsWithoutLock() const = 0;
|
||||
|
||||
/// Name of storage
|
||||
virtual String getName() const = 0;
|
||||
|
||||
|
@ -815,6 +815,8 @@ public:
|
||||
|
||||
bool returnsFetchedColumnsInOrderOfRequestedKeys() const override { return false; }
|
||||
|
||||
bool canPerformFetchByMultipleThreadsWithoutLock() const override { return false; }
|
||||
|
||||
String getName() const override
|
||||
{
|
||||
if (dictionary_key_type == DictionaryKeyType::simple)
|
||||
|
@ -40,7 +40,7 @@ SELECT dictGetOrDefault('01681_database_for_cache_dictionary.cache_dictionary_si
|
||||
SELECT 'dictHas';
|
||||
SELECT dictHas('01681_database_for_cache_dictionary.cache_dictionary_simple_key_simple_attributes', number) FROM system.numbers LIMIT 4;
|
||||
SELECT 'select all values as input stream';
|
||||
SELECT * FROM 01681_database_for_cache_dictionary.cache_dictionary_simple_key_simple_attributes;
|
||||
SELECT * FROM 01681_database_for_cache_dictionary.cache_dictionary_simple_key_simple_attributes ORDER BY id;
|
||||
|
||||
DROP DICTIONARY 01681_database_for_cache_dictionary.cache_dictionary_simple_key_simple_attributes;
|
||||
DROP TABLE 01681_database_for_cache_dictionary.simple_key_simple_attributes_source_table;
|
||||
@ -84,7 +84,7 @@ SELECT dictGetOrDefault('01681_database_for_cache_dictionary.cache_dictionary_si
|
||||
SELECT 'dictHas';
|
||||
SELECT dictHas('01681_database_for_cache_dictionary.cache_dictionary_simple_key_complex_attributes', number) FROM system.numbers LIMIT 4;
|
||||
SELECT 'select all values as input stream';
|
||||
SELECT * FROM 01681_database_for_cache_dictionary.cache_dictionary_simple_key_complex_attributes;
|
||||
SELECT * FROM 01681_database_for_cache_dictionary.cache_dictionary_simple_key_complex_attributes ORDER BY id;
|
||||
|
||||
DROP DICTIONARY 01681_database_for_cache_dictionary.cache_dictionary_simple_key_complex_attributes;
|
||||
DROP TABLE 01681_database_for_cache_dictionary.simple_key_complex_attributes_source_table;
|
||||
|
@ -42,7 +42,7 @@ SELECT dictGetOrDefault('01682_database_for_cache_dictionary.cache_dictionary_co
|
||||
SELECT 'dictHas';
|
||||
SELECT dictHas('01682_database_for_cache_dictionary.cache_dictionary_complex_key_simple_attributes', (number, concat('id_key_', toString(number)))) FROM system.numbers LIMIT 4;
|
||||
SELECT 'select all values as input stream';
|
||||
SELECT * FROM 01682_database_for_cache_dictionary.cache_dictionary_complex_key_simple_attributes;
|
||||
SELECT * FROM 01682_database_for_cache_dictionary.cache_dictionary_complex_key_simple_attributes ORDER BY id;
|
||||
|
||||
DROP DICTIONARY 01682_database_for_cache_dictionary.cache_dictionary_complex_key_simple_attributes;
|
||||
DROP TABLE 01682_database_for_cache_dictionary.complex_key_simple_attributes_source_table;
|
||||
@ -89,7 +89,7 @@ SELECT dictGetOrDefault('01682_database_for_cache_dictionary.cache_dictionary_co
|
||||
SELECT 'dictHas';
|
||||
SELECT dictHas('01682_database_for_cache_dictionary.cache_dictionary_complex_key_complex_attributes', (number, concat('id_key_', toString(number)))) FROM system.numbers LIMIT 4;
|
||||
SELECT 'select all values as input stream';
|
||||
SELECT * FROM 01682_database_for_cache_dictionary.cache_dictionary_complex_key_complex_attributes;
|
||||
SELECT * FROM 01682_database_for_cache_dictionary.cache_dictionary_complex_key_complex_attributes ORDER BY id;
|
||||
|
||||
DROP DICTIONARY 01682_database_for_cache_dictionary.cache_dictionary_complex_key_complex_attributes;
|
||||
DROP TABLE 01682_database_for_cache_dictionary.complex_key_complex_attributes_source_table;
|
||||
|
@ -40,7 +40,7 @@ SELECT dictGetOrDefault('01684_database_for_cache_dictionary.cache_dictionary_si
|
||||
SELECT 'dictHas';
|
||||
SELECT dictHas('01684_database_for_cache_dictionary.cache_dictionary_simple_key_simple_attributes', number) FROM system.numbers LIMIT 4;
|
||||
SELECT 'select all values as input stream';
|
||||
SELECT * FROM 01684_database_for_cache_dictionary.cache_dictionary_simple_key_simple_attributes;
|
||||
SELECT * FROM 01684_database_for_cache_dictionary.cache_dictionary_simple_key_simple_attributes ORDER BY id;
|
||||
|
||||
DROP DICTIONARY 01684_database_for_cache_dictionary.cache_dictionary_simple_key_simple_attributes;
|
||||
DROP TABLE 01684_database_for_cache_dictionary.simple_key_simple_attributes_source_table;
|
||||
@ -84,7 +84,7 @@ SELECT dictGetOrDefault('01684_database_for_cache_dictionary.cache_dictionary_si
|
||||
SELECT 'dictHas';
|
||||
SELECT dictHas('01684_database_for_cache_dictionary.cache_dictionary_simple_key_complex_attributes', number) FROM system.numbers LIMIT 4;
|
||||
SELECT 'select all values as input stream';
|
||||
SELECT * FROM 01684_database_for_cache_dictionary.cache_dictionary_simple_key_complex_attributes;
|
||||
SELECT * FROM 01684_database_for_cache_dictionary.cache_dictionary_simple_key_complex_attributes ORDER BY id;
|
||||
|
||||
DROP DICTIONARY 01684_database_for_cache_dictionary.cache_dictionary_simple_key_complex_attributes;
|
||||
DROP TABLE 01684_database_for_cache_dictionary.simple_key_complex_attributes_source_table;
|
||||
|
@ -42,7 +42,7 @@ SELECT dictGetOrDefault('01685_database_for_cache_dictionary.cache_dictionary_co
|
||||
SELECT 'dictHas';
|
||||
SELECT dictHas('01685_database_for_cache_dictionary.cache_dictionary_complex_key_simple_attributes', (number, concat('id_key_', toString(number)))) FROM system.numbers LIMIT 4;
|
||||
SELECT 'select all values as input stream';
|
||||
SELECT * FROM 01685_database_for_cache_dictionary.cache_dictionary_complex_key_simple_attributes;
|
||||
SELECT * FROM 01685_database_for_cache_dictionary.cache_dictionary_complex_key_simple_attributes ORDER BY id;
|
||||
|
||||
DROP DICTIONARY 01685_database_for_cache_dictionary.cache_dictionary_complex_key_simple_attributes;
|
||||
DROP TABLE 01685_database_for_cache_dictionary.complex_key_simple_attributes_source_table;
|
||||
@ -89,7 +89,7 @@ SELECT dictGetOrDefault('01685_database_for_cache_dictionary.cache_dictionary_co
|
||||
SELECT 'dictHas';
|
||||
SELECT dictHas('01685_database_for_cache_dictionary.cache_dictionary_complex_key_complex_attributes', (number, concat('id_key_', toString(number)))) FROM system.numbers LIMIT 4;
|
||||
SELECT 'select all values as input stream';
|
||||
SELECT * FROM 01685_database_for_cache_dictionary.cache_dictionary_complex_key_complex_attributes;
|
||||
SELECT * FROM 01685_database_for_cache_dictionary.cache_dictionary_complex_key_complex_attributes ORDER BY id;
|
||||
|
||||
DROP DICTIONARY 01685_database_for_cache_dictionary.cache_dictionary_complex_key_complex_attributes;
|
||||
DROP TABLE 01685_database_for_cache_dictionary.complex_key_complex_attributes_source_table;
|
||||
|
Loading…
Reference in New Issue
Block a user