Added specialized CacheDictionaryStorage

This commit is contained in:
Maksim Kita 2021-03-10 01:58:19 +03:00
parent f1223e7548
commit ce20eae2a3
7 changed files with 908 additions and 275 deletions

View File

@ -692,6 +692,30 @@ public:
assign(from.begin(), from.end());
}
void erase(const_iterator first, const_iterator last)
{
iterator first_no_const = const_cast<iterator>(first);
iterator last_no_const = const_cast<iterator>(last);
size_t items_to_move = end() - last;
while (items_to_move != 0)
{
*first_no_const = *last_no_const;
++first_no_const;
++last_no_const;
--items_to_move;
}
this->c_end = reinterpret_cast<char *>(first_no_const);
}
void erase(const_iterator pos)
{
this->erase(pos, pos + 1);
}
bool operator== (const PODArray & rhs) const
{

View File

@ -92,3 +92,57 @@ TEST(Common, PODInsertElementSizeNotMultipleOfLeftPadding)
EXPECT_EQ(arr1_initially_empty.size(), items_to_insert_size);
}
TEST(Common, PODErase)
{
{
PaddedPODArray<UInt64> items {0,1,2,3,4,5,6,7,8,9};
PaddedPODArray<UInt64> expected;
expected = {0,1,2,3,4,5,6,7,8,9};
items.erase(items.begin(), items.begin());
EXPECT_EQ(items, expected);
items.erase(items.end(), items.end());
EXPECT_EQ(items, expected);
}
{
PaddedPODArray<UInt64> actual {0,1,2,3,4,5,6,7,8,9};
PaddedPODArray<UInt64> expected;
expected = {0,1,4,5,6,7,8,9};
actual.erase(actual.begin() + 2, actual.begin() + 4);
EXPECT_EQ(actual, expected);
expected = {0,1,4};
actual.erase(actual.begin() + 3, actual.end());
EXPECT_EQ(actual, expected);
expected = {};
actual.erase(actual.begin(), actual.end());
EXPECT_EQ(actual, expected);
for (size_t i = 0; i < 10; ++i)
actual.emplace_back(static_cast<UInt64>(i));
expected = {0,1,4,5,6,7,8,9};
actual.erase(actual.begin() + 2, actual.begin() + 4);
EXPECT_EQ(actual, expected);
expected = {0,1,4};
actual.erase(actual.begin() + 3, actual.end());
EXPECT_EQ(actual, expected);
expected = {};
actual.erase(actual.begin(), actual.end());
EXPECT_EQ(actual, expected);
}
{
PaddedPODArray<UInt64> actual {0,1,2,3,4,5,6,7,8,9};
PaddedPODArray<UInt64> expected;
expected = {1,2,3,4,5,6,7,8,9};
actual.erase(actual.begin());
EXPECT_EQ(actual, expected);
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <chrono>
#include <variant>
#include <pcg_random.hpp>
@ -30,16 +31,7 @@ struct CacheDictionaryStorageConfiguration
const DictionaryLifetime lifetime;
};
/** Keys are stored in LRUCache and column values are serialized into arena.
Cell in LRUCache consists of allocated size and place in arena were columns serialized data is stored.
Columns are serialized by rows.
When cell is removed from LRUCache data associated with it is also removed from arena.
In case of complex key we also store key data in arena and it is removed from arena.
*/
/// TODO: Add documentation
template <DictionaryKeyType dictionary_key_type>
class CacheDictionaryStorage final : public ICacheDictionaryStorage
{
@ -47,11 +39,36 @@ public:
using KeyType = std::conditional_t<dictionary_key_type == DictionaryKeyType::simple, UInt64, StringRef>;
static_assert(dictionary_key_type != DictionaryKeyType::range, "Range key type is not supported by CacheDictionaryStorage");
explicit CacheDictionaryStorage(CacheDictionaryStorageConfiguration & configuration_)
explicit CacheDictionaryStorage(
const DictionaryStructure & dictionary_structure,
CacheDictionaryStorageConfiguration & configuration_)
: configuration(configuration_)
, rnd_engine(randomSeed())
, cache(configuration.max_size_in_cells, false, { arena })
, cache(configuration.max_size_in_cells, false, { *this })
{
for (const auto & dictionary_attribute : dictionary_structure.attributes)
{
auto attribute_type = dictionary_attribute.underlying_type;
auto type_call = [&](const auto & dictionary_attribute_type)
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
using ValueType = DictionaryValueType<AttributeType>;
attributes.emplace_back();
auto & last_attribute = attributes.back();
last_attribute.type = attribute_type;
last_attribute.is_complex_type = dictionary_attribute.is_nullable || dictionary_attribute.is_array;
if (dictionary_attribute.is_nullable)
last_attribute.attribute_container = std::vector<Field>();
else
last_attribute.attribute_container = PaddedPODArray<ValueType>();
};
callOnDictionaryAttributeType(attribute_type, type_call);
}
}
bool returnsFetchedColumnsInOrderOfRequestedKeys() const override { return true; }
@ -144,10 +161,36 @@ public:
size_t getMaxSize() const override { return cache.getMaxSize(); }
size_t getBytesAllocated() const override { return arena.size() + cache.getSizeInBytes(); }
size_t getBytesAllocated() const override
{
size_t attributes_size_in_bytes = 0;
size_t attributes_size = attributes.size();
for (size_t attribute_index = 0; attribute_index < attributes_size; ++attribute_index)
{
getAttributeContainer(attribute_index, [&](const auto & container)
{
attributes_size_in_bytes += container.capacity() * sizeof(container[0]);
});
}
return arena.size() + cache.getSizeInBytes();
}
private:
struct FetchedKey
{
FetchedKey(size_t element_index_, bool is_default_)
: element_index(element_index_)
, is_default(is_default_)
{}
const size_t element_index;
const bool is_default;
};
template <typename KeysStorageFetchResult>
ALWAYS_INLINE KeysStorageFetchResult fetchColumnsForKeysImpl(
const PaddedPODArray<KeyType> & keys,
@ -161,10 +204,12 @@ private:
const auto now = std::chrono::system_clock::now();
size_t fetched_columns_index = 0;
size_t keys_size = keys.size();
std::chrono::seconds max_lifetime_seconds(configuration.strict_max_lifetime_seconds);
size_t keys_size = keys.size();
PaddedPODArray<FetchedKey> fetched_keys;
fetched_keys.reserve(keys_size);
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
@ -195,19 +240,14 @@ private:
++result.found_keys_size;
}
++fetched_columns_index;
if (cell.isDefault())
if (cell.is_default)
{
result.key_index_to_state[key_index].setDefault();
++result.default_keys_size;
insertDefaultValuesIntoColumns(result.fetched_columns, fetch_request, key_index);
}
else
{
const char * place_for_serialized_columns = cell.place_for_serialized_columns;
deserializeAndInsertIntoColumns(result.fetched_columns, fetch_request, place_for_serialized_columns);
}
fetched_keys.emplace_back(cell.element_index, cell.is_default);
++fetched_columns_index;
}
else
{
@ -216,64 +256,166 @@ private:
}
}
for (size_t attribute_index = 0; attribute_index < fetch_request.attributesSize(); ++attribute_index)
{
if (!fetch_request.shouldFillResultColumnWithIndex(attribute_index))
continue;
size_t fetched_keys_size = fetched_keys.size();
auto & attribute = attributes[attribute_index];
const auto & default_value_provider = fetch_request.defaultValueProviderAtIndex(attribute_index);
auto & fetched_column = *result.fetched_columns[attribute_index];
fetched_column.reserve(fetched_keys_size);
if (unlikely(attribute.is_complex_type))
{
auto & container = std::get<std::vector<Field>>(attribute.attribute_container);
for (size_t fetched_key_index = 0; fetched_key_index < fetched_keys.size(); ++fetched_key_index)
{
auto fetched_key = fetched_keys[fetched_key_index];
if (fetched_key.is_default)
fetched_column.insert(default_value_provider.getDefaultValue(fetched_key_index));
else
fetched_column.insert(container[fetched_key.element_index]);
}
}
else
{
auto type_call = [&](const auto & dictionary_attribute_type)
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
using ValueType = DictionaryValueType<AttributeType>;
using ColumnType =
std::conditional_t<std::is_same_v<AttributeType, String>, ColumnString,
std::conditional_t<IsDecimalNumber<AttributeType>, ColumnDecimal<ValueType>,
ColumnVector<AttributeType>>>;
auto & container = std::get<PaddedPODArray<ValueType>>(attribute.attribute_container);
ColumnType & column_typed = static_cast<ColumnType &>(fetched_column);
if constexpr (std::is_same_v<ColumnType, ColumnString>)
{
for (size_t fetched_key_index = 0; fetched_key_index < fetched_keys.size(); ++fetched_key_index)
{
auto fetched_key = fetched_keys[fetched_key_index];
if (fetched_key.is_default)
column_typed.insert(default_value_provider.getDefaultValue(fetched_key_index));
else
{
auto item = container[fetched_key.element_index];
column_typed.insertData(item.data, item.size);
}
}
}
else
{
for (size_t fetched_key_index = 0; fetched_key_index < fetched_keys.size(); ++fetched_key_index)
{
auto fetched_key = fetched_keys[fetched_key_index];
auto & data = column_typed.getData();
if (fetched_key.is_default)
column_typed.insert(default_value_provider.getDefaultValue(fetched_key_index));
else
{
auto item = container[fetched_key.element_index];
data.push_back(item);
}
}
}
};
callOnDictionaryAttributeType(attribute.type, type_call);
}
}
return result;
}
void insertColumnsForKeysImpl(const PaddedPODArray<KeyType> & keys, Columns columns)
{
Arena temporary_values_pool;
size_t columns_to_serialize_size = columns.size();
PaddedPODArray<StringRef> temporary_column_data(columns_to_serialize_size);
const auto now = std::chrono::system_clock::now();
size_t keys_size = keys.size();
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
size_t allocated_size_for_columns = 0;
const char * block_start = nullptr;
auto key = keys[key_index];
auto * it = cache.find(key);
cache.erase(key);
for (size_t column_index = 0; column_index < columns_to_serialize_size; ++column_index)
Cell cell;
setCellDeadline(cell, now);
cell.element_index = insert_index;
cell.is_default = false;
++insert_index;
insertCellInCache(key, cell);
}
Field complex_column_value;
for (size_t column_index = 0; column_index < columns.size(); ++column_index)
{
auto & attribute = attributes[column_index];
const auto & column = columns[column_index];
size_t column_size = column->size();
if (unlikely(attribute.is_complex_type))
{
auto & column = columns[column_index];
temporary_column_data[column_index] = column->serializeValueIntoArena(key_index, temporary_values_pool, block_start);
allocated_size_for_columns += temporary_column_data[column_index].size;
}
auto & container = std::get<std::vector<Field>>(attribute.attribute_container);
container.reserve(column_size);
char * place_for_serialized_columns = arena.alloc(allocated_size_for_columns);
memcpy(reinterpret_cast<void*>(place_for_serialized_columns), reinterpret_cast<const void*>(block_start), allocated_size_for_columns);
if (it)
{
/// Cell exists need to free previous serialized place and update deadline
auto & cell = it->getMapped();
if (cell.place_for_serialized_columns)
arena.free(cell.place_for_serialized_columns, cell.allocated_size_for_columns);
setCellDeadline(cell, now);
cell.allocated_size_for_columns = allocated_size_for_columns;
cell.place_for_serialized_columns = place_for_serialized_columns;
for (size_t item_index = 0; item_index < column_size; ++item_index)
{
column->get(item_index, complex_column_value);
container.emplace_back(complex_column_value);
}
}
else
{
/// No cell exists so create and put in cache
Cell cell;
auto type_call = [&](const auto & dictionary_attribute_type)
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
using ValueType = DictionaryValueType<AttributeType>;
using ColumnType =
std::conditional_t<std::is_same_v<AttributeType, String>, ColumnString,
std::conditional_t<IsDecimalNumber<AttributeType>, ColumnDecimal<ValueType>,
ColumnVector<AttributeType>>>;
setCellDeadline(cell, now);
cell.allocated_size_for_columns = allocated_size_for_columns;
cell.place_for_serialized_columns = place_for_serialized_columns;
const ColumnType & column_typed = static_cast<const ColumnType &>(*column);
insertCellInCache(key, cell);
auto & container = std::get<PaddedPODArray<ValueType>>(attribute.attribute_container);
container.reserve(column_size);
if constexpr (std::is_same_v<ColumnType, ColumnString>)
{
/// TODO: Serialize while column string in arena then just insert offsets in container
for (size_t item_index = 0; item_index < column_size; ++item_index)
{
StringRef value = column->getDataAt(item_index);
StringRef updated_data = copyStringInArena(value);
container.emplace_back(updated_data);
}
}
else
{
const auto & data = column_typed.getData();
container.insert(data.begin(), data.end());
}
};
callOnDictionaryAttributeType(attribute.type, type_call);
}
temporary_values_pool.rollback(allocated_size_for_columns);
}
deleteUnusedKeysIfNecessary();
}
void insertDefaultKeysImpl(const PaddedPODArray<KeyType> & keys)
@ -282,31 +424,18 @@ private:
for (auto key : keys)
{
auto * it = cache.find(key);
cache.erase(key);
if (it)
{
auto & cell = it->getMapped();
Cell cell;
setCellDeadline(cell, now);
setCellDeadline(cell, now);
cell.element_index = 0;
cell.is_default = true;
if (cell.place_for_serialized_columns)
arena.free(cell.place_for_serialized_columns, cell.allocated_size_for_columns);
cell.allocated_size_for_columns = 0;
cell.place_for_serialized_columns = nullptr;
}
else
{
Cell cell;
setCellDeadline(cell, now);
cell.allocated_size_for_columns = 0;
cell.place_for_serialized_columns = nullptr;
insertCellInCache(key, cell);
}
insertCellInCache(key, cell);
}
deleteUnusedKeysIfNecessary();
}
PaddedPODArray<KeyType> getCachedKeysImpl() const
@ -318,7 +447,7 @@ private:
{
auto & cell = node.getMapped();
if (cell.isDefault())
if (cell.is_default)
continue;
result.emplace_back(node.getKey());
@ -327,37 +456,138 @@ private:
return result;
}
void deleteUnusedKeysIfNecessary()
{
size_t cache_max_size = cache.getMaxSize();
if (unlikely(attributes.empty()) || insert_index * 2 < cache_max_size)
return;
std::unordered_map<size_t, typename CacheLRUHashMap::iterator> element_index_to_cache_iterator;
for (auto begin = cache.begin(); begin != cache.end(); ++begin)
{
auto & node = *begin;
auto & cell = node.getMapped();
size_t element_index = cell.element_index;
element_index_to_cache_iterator.insert(std::make_pair(element_index, begin));
}
size_t last_remove_index = 0;
getAttributeContainer(0, [&, this](auto & container)
{
size_t container_size = container.size();
size_t remove_index = 0;
for (size_t i = 0; i < container_size; ++i)
{
if (indexes_to_delete.contains(i))
continue;
std::swap(container[remove_index], container[i]);
auto it = element_index_to_cache_iterator.find(remove_index);
if (it != element_index_to_cache_iterator.end())
{
auto & cell = it->second->getMapped();
cell.element_index = remove_index;
}
++remove_index;
}
container.erase(container.begin() + remove_index, container.end());
last_remove_index = remove_index;
});
insert_index = last_remove_index;
for (size_t attribute_index = 1; attribute_index < attributes.size(); ++attribute_index)
{
getAttributeContainer(attribute_index, [this](auto & container)
{
size_t container_size = container.size();
size_t remove_index = 0;
for (size_t i = 0; i < container_size; ++i)
{
if (indexes_to_delete.contains(i))
continue;
std::swap(container[remove_index], container[i]);
++remove_index;
}
container.erase(container.begin() + remove_index, container.end());
});
}
indexes_to_delete.clear();
}
template <typename GetContainerFunc>
void getAttributeContainer(size_t attribute_index, GetContainerFunc && func)
{
auto & attribute = attributes[attribute_index];
auto & attribute_type = attribute.type;
if (unlikely(attribute.is_complex_type))
{
auto & container = std::get<std::vector<Field>>(attribute.attribute_container);
std::forward<GetContainerFunc>(func)(container);
}
else
{
auto type_call = [&](const auto & dictionary_attribute_type)
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
using ValueType = DictionaryValueType<AttributeType>;
auto & container = std::get<PaddedPODArray<ValueType>>(attribute.attribute_container);
std::forward<GetContainerFunc>(func)(container);
};
callOnDictionaryAttributeType(attribute_type, type_call);
}
}
template <typename GetContainerFunc>
void getAttributeContainer(size_t attribute_index, GetContainerFunc && func) const
{
return const_cast<std::decay_t<decltype(*this)> *>(this)->template getAttributeContainer(attribute_index, std::forward<GetContainerFunc>(func));
}
using TimePoint = std::chrono::system_clock::time_point;
struct Cell
{
TimePoint deadline;
size_t allocated_size_for_columns;
char * place_for_serialized_columns;
inline bool isDefault() const { return place_for_serialized_columns == nullptr; }
inline void setDefault()
{
place_for_serialized_columns = nullptr;
allocated_size_for_columns = 0;
}
size_t element_index;
bool is_default;
};
void insertCellInCache(KeyType & key, const Cell & cell)
{
/// Copy complex key into arena and put in cache
if constexpr (dictionary_key_type == DictionaryKeyType::complex)
{
/// Copy complex key into arena and put in cache
size_t key_size = key.size;
char * place_for_key = arena.alloc(key_size);
memcpy(reinterpret_cast<void *>(place_for_key), reinterpret_cast<const void *>(key.data), key_size);
KeyType updated_key{place_for_key, key_size};
key = updated_key;
}
key = copyStringInArena(key);
cache.insert(key, cell);
}
StringRef copyStringInArena(StringRef value_to_copy)
{
size_t value_to_copy_size = value_to_copy.size;
char * place_for_key = arena.alloc(value_to_copy_size);
memcpy(reinterpret_cast<void *>(place_for_key), reinterpret_cast<const void *>(value_to_copy.data), value_to_copy_size);
StringRef updated_value{place_for_key, value_to_copy_size};
return updated_value;
}
inline static bool cellHasDeadline(const Cell & cell)
{
return cell.deadline != std::chrono::system_clock::from_time_t(0);
@ -378,34 +608,58 @@ private:
cell.deadline = now + std::chrono::seconds(distribution(rnd_engine));
}
template <typename>
friend class ArenaCellDisposer;
CacheDictionaryStorageConfiguration configuration;
ArenaWithFreeLists arena;
pcg64 rnd_engine;
class ArenaCellDisposer
struct Attribute
{
AttributeUnderlyingType type;
bool is_complex_type;
std::variant<
PaddedPODArray<UInt8>,
PaddedPODArray<UInt16>,
PaddedPODArray<UInt32>,
PaddedPODArray<UInt64>,
PaddedPODArray<UInt128>,
PaddedPODArray<Int8>,
PaddedPODArray<Int16>,
PaddedPODArray<Int32>,
PaddedPODArray<Int64>,
PaddedPODArray<Decimal32>,
PaddedPODArray<Decimal64>,
PaddedPODArray<Decimal128>,
PaddedPODArray<Float32>,
PaddedPODArray<Float64>,
PaddedPODArray<StringRef>,
std::vector<Field>> attribute_container;
};
std::vector<Attribute> attributes;
size_t insert_index = 0;
std::unordered_set<size_t, DefaultHash<size_t>> indexes_to_delete;
class CacheStorageCellDisposer
{
public:
ArenaWithFreeLists & arena;
CacheDictionaryStorage & storage;
template <typename Key, typename Value>
void operator()(const Key & key, const Value & value) const
void operator()(const Key & key, const Value & cell) const
{
/// In case of complex key we keep it in arena
if constexpr (std::is_same_v<Key, StringRef>)
arena.free(const_cast<char *>(key.data), key.size);
storage.arena.free(const_cast<char *>(key.data), key.size);
if (value.place_for_serialized_columns)
arena.free(value.place_for_serialized_columns, value.allocated_size_for_columns);
storage.indexes_to_delete.insert(cell.element_index);
}
};
using SimpleKeyLRUHashMap = LRUHashMap<UInt64, Cell, ArenaCellDisposer>;
using ComplexKeyLRUHashMap = LRUHashMapWithSavedHash<StringRef, Cell, ArenaCellDisposer>;
using SimpleKeyLRUHashMap = LRUHashMap<UInt64, Cell, CacheStorageCellDisposer>;
using ComplexKeyLRUHashMap = LRUHashMapWithSavedHash<StringRef, Cell, CacheStorageCellDisposer>;
using CacheLRUHashMap = std::conditional_t<
dictionary_key_type == DictionaryKeyType::simple,

View File

@ -1316,9 +1316,6 @@ private:
cell.deadline = now + std::chrono::seconds{distribution(rnd_engine)};
}
template <typename>
friend class ArenaCellKeyDisposer;
SSDCacheDictionaryStorageConfiguration configuration;
SSDCacheFileBuffer<SSDCacheKeyType> file_buffer;

View File

@ -0,0 +1,412 @@
#pragma once
#include <chrono>
#include <pcg_random.hpp>
#include <Common/randomSeed.h>
#include <Common/Arena.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/HashTable/LRUHashMap.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/ICacheDictionaryStorage.h>
#include <Dictionaries/DictionaryHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
struct SerializedCacheDictionaryStorageConfiguration
{
/// Max size of storage in cells
const size_t max_size_in_cells;
/// Needed to perform check if cell is expired or not found. Default value is dictionary max lifetime.
const size_t strict_max_lifetime_seconds;
/// Lifetime of dictionary. Cell deadline is random value between lifetime min and max seconds.
const DictionaryLifetime lifetime;
};
/** Keys are stored in LRUCache and column values are serialized into arena.
Cell in LRUCache consists of allocated size and place in arena were columns serialized data is stored.
Columns are serialized by rows.
When cell is removed from LRUCache data associated with it is also removed from arena.
In case of complex key we also store key data in arena and it is removed from arena.
*/
/// TODO: Remove
template <DictionaryKeyType dictionary_key_type>
class SerializedCacheDictionaryStorage final : public ICacheDictionaryStorage
{
public:
using KeyType = std::conditional_t<dictionary_key_type == DictionaryKeyType::simple, UInt64, StringRef>;
static_assert(dictionary_key_type != DictionaryKeyType::range, "Range key type is not supported by CacheDictionaryStorage");
explicit SerializedCacheDictionaryStorage(SerializedCacheDictionaryStorageConfiguration & configuration_)
: configuration(configuration_)
, rnd_engine(randomSeed())
, cache(configuration.max_size_in_cells, false, { arena })
{
}
bool returnsFetchedColumnsInOrderOfRequestedKeys() const override { return true; }
String getName() const override
{
if (dictionary_key_type == DictionaryKeyType::simple)
return "SerializedCache";
else
return "ComplexKeySerializedCache";
}
bool supportsSimpleKeys() const override { return dictionary_key_type == DictionaryKeyType::simple; }
SimpleKeysStorageFetchResult fetchColumnsForKeys(
const PaddedPODArray<UInt64> & keys,
const DictionaryStorageFetchRequest & fetch_request) override
{
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
return fetchColumnsForKeysImpl<SimpleKeysStorageFetchResult>(keys, fetch_request);
else
throw Exception("Method fetchColumnsForKeys is not supported for complex key storage", ErrorCodes::NOT_IMPLEMENTED);
}
void insertColumnsForKeys(const PaddedPODArray<UInt64> & keys, Columns columns) override
{
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
insertColumnsForKeysImpl(keys, columns);
else
throw Exception("Method insertColumnsForKeys is not supported for complex key storage", ErrorCodes::NOT_IMPLEMENTED);
}
void insertDefaultKeys(const PaddedPODArray<UInt64> & keys) override
{
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
insertDefaultKeysImpl(keys);
else
throw Exception("Method insertDefaultKeysImpl is not supported for complex key storage", ErrorCodes::NOT_IMPLEMENTED);
}
PaddedPODArray<UInt64> getCachedSimpleKeys() const override
{
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
return getCachedKeysImpl();
else
throw Exception("Method getCachedSimpleKeys is not supported for complex key storage", ErrorCodes::NOT_IMPLEMENTED);
}
bool supportsComplexKeys() const override { return dictionary_key_type == DictionaryKeyType::complex; }
ComplexKeysStorageFetchResult fetchColumnsForKeys(
const PaddedPODArray<StringRef> & keys,
const DictionaryStorageFetchRequest & column_fetch_requests) override
{
if constexpr (dictionary_key_type == DictionaryKeyType::complex)
return fetchColumnsForKeysImpl<ComplexKeysStorageFetchResult>(keys, column_fetch_requests);
else
throw Exception("Method fetchColumnsForKeys is not supported for simple key storage", ErrorCodes::NOT_IMPLEMENTED);
}
void insertColumnsForKeys(const PaddedPODArray<StringRef> & keys, Columns columns) override
{
if constexpr (dictionary_key_type == DictionaryKeyType::complex)
insertColumnsForKeysImpl(keys, columns);
else
throw Exception("Method insertColumnsForKeys is not supported for simple key storage", ErrorCodes::NOT_IMPLEMENTED);
}
void insertDefaultKeys(const PaddedPODArray<StringRef> & keys) override
{
if constexpr (dictionary_key_type == DictionaryKeyType::complex)
insertDefaultKeysImpl(keys);
else
throw Exception("Method insertDefaultKeysImpl is not supported for simple key storage", ErrorCodes::NOT_IMPLEMENTED);
}
PaddedPODArray<StringRef> getCachedComplexKeys() const override
{
if constexpr (dictionary_key_type == DictionaryKeyType::complex)
return getCachedKeysImpl();
else
throw Exception("Method getCachedComplexKeys is not supported for simple key storage", ErrorCodes::NOT_IMPLEMENTED);
}
size_t getSize() const override { return cache.size(); }
size_t getMaxSize() const override { return cache.getMaxSize(); }
size_t getBytesAllocated() const override { return arena.size() + cache.getSizeInBytes(); }
private:
template <typename KeysStorageFetchResult>
ALWAYS_INLINE KeysStorageFetchResult fetchColumnsForKeysImpl(
const PaddedPODArray<KeyType> & keys,
const DictionaryStorageFetchRequest & fetch_request)
{
KeysStorageFetchResult result;
result.fetched_columns = fetch_request.makeAttributesResultColumns();
result.key_index_to_state.resize_fill(keys.size(), {KeyState::not_found});
const auto now = std::chrono::system_clock::now();
size_t fetched_columns_index = 0;
std::chrono::seconds max_lifetime_seconds(configuration.strict_max_lifetime_seconds);
size_t keys_size = keys.size();
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
auto key = keys[key_index];
auto * it = cache.find(key);
if (it)
{
/// Columns values for key are serialized in cache now deserialize them
const auto & cell = it->getMapped();
bool has_deadline = cellHasDeadline(cell);
if (has_deadline && now > cell.deadline + max_lifetime_seconds)
{
result.key_index_to_state[key_index] = {KeyState::not_found};
++result.not_found_keys_size;
continue;
}
else if (has_deadline && now > cell.deadline)
{
result.key_index_to_state[key_index] = {KeyState::expired, fetched_columns_index};
++result.expired_keys_size;
}
else
{
result.key_index_to_state[key_index] = {KeyState::found, fetched_columns_index};
++result.found_keys_size;
}
++fetched_columns_index;
if (cell.isDefault())
{
result.key_index_to_state[key_index].setDefault();
++result.default_keys_size;
insertDefaultValuesIntoColumns(result.fetched_columns, fetch_request, key_index);
}
else
{
const char * place_for_serialized_columns = cell.place_for_serialized_columns;
deserializeAndInsertIntoColumns(result.fetched_columns, fetch_request, place_for_serialized_columns);
}
}
else
{
result.key_index_to_state[key_index] = {KeyState::not_found};
++result.not_found_keys_size;
}
}
return result;
}
void insertColumnsForKeysImpl(const PaddedPODArray<KeyType> & keys, Columns columns)
{
Arena temporary_values_pool;
size_t columns_to_serialize_size = columns.size();
PaddedPODArray<StringRef> temporary_column_data(columns_to_serialize_size);
const auto now = std::chrono::system_clock::now();
size_t keys_size = keys.size();
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
size_t allocated_size_for_columns = 0;
const char * block_start = nullptr;
auto key = keys[key_index];
auto * it = cache.find(key);
for (size_t column_index = 0; column_index < columns_to_serialize_size; ++column_index)
{
auto & column = columns[column_index];
temporary_column_data[column_index] = column->serializeValueIntoArena(key_index, temporary_values_pool, block_start);
allocated_size_for_columns += temporary_column_data[column_index].size;
}
char * place_for_serialized_columns = arena.alloc(allocated_size_for_columns);
memcpy(reinterpret_cast<void*>(place_for_serialized_columns), reinterpret_cast<const void*>(block_start), allocated_size_for_columns);
if (it)
{
/// Cell exists need to free previous serialized place and update deadline
auto & cell = it->getMapped();
if (cell.place_for_serialized_columns)
arena.free(cell.place_for_serialized_columns, cell.allocated_size_for_columns);
setCellDeadline(cell, now);
cell.allocated_size_for_columns = allocated_size_for_columns;
cell.place_for_serialized_columns = place_for_serialized_columns;
}
else
{
/// No cell exists so create and put in cache
Cell cell;
setCellDeadline(cell, now);
cell.allocated_size_for_columns = allocated_size_for_columns;
cell.place_for_serialized_columns = place_for_serialized_columns;
insertCellInCache(key, cell);
}
temporary_values_pool.rollback(allocated_size_for_columns);
}
}
void insertDefaultKeysImpl(const PaddedPODArray<KeyType> & keys)
{
const auto now = std::chrono::system_clock::now();
for (auto key : keys)
{
auto * it = cache.find(key);
if (it)
{
auto & cell = it->getMapped();
setCellDeadline(cell, now);
if (cell.place_for_serialized_columns)
arena.free(cell.place_for_serialized_columns, cell.allocated_size_for_columns);
cell.allocated_size_for_columns = 0;
cell.place_for_serialized_columns = nullptr;
}
else
{
Cell cell;
setCellDeadline(cell, now);
cell.allocated_size_for_columns = 0;
cell.place_for_serialized_columns = nullptr;
insertCellInCache(key, cell);
}
}
}
PaddedPODArray<KeyType> getCachedKeysImpl() const
{
PaddedPODArray<KeyType> result;
result.reserve(cache.size());
for (auto & node : cache)
{
auto & cell = node.getMapped();
if (cell.isDefault())
continue;
result.emplace_back(node.getKey());
}
return result;
}
using TimePoint = std::chrono::system_clock::time_point;
struct Cell
{
TimePoint deadline;
size_t allocated_size_for_columns;
char * place_for_serialized_columns;
inline bool isDefault() const { return place_for_serialized_columns == nullptr; }
inline void setDefault()
{
place_for_serialized_columns = nullptr;
allocated_size_for_columns = 0;
}
};
void insertCellInCache(KeyType & key, const Cell & cell)
{
if constexpr (dictionary_key_type == DictionaryKeyType::complex)
{
/// Copy complex key into arena and put in cache
size_t key_size = key.size;
char * place_for_key = arena.alloc(key_size);
memcpy(reinterpret_cast<void *>(place_for_key), reinterpret_cast<const void *>(key.data), key_size);
KeyType updated_key{place_for_key, key_size};
key = updated_key;
}
cache.insert(key, cell);
}
inline static bool cellHasDeadline(const Cell & cell)
{
return cell.deadline != std::chrono::system_clock::from_time_t(0);
}
inline void setCellDeadline(Cell & cell, TimePoint now)
{
if (configuration.lifetime.min_sec == 0 && configuration.lifetime.max_sec == 0)
{
cell.deadline = std::chrono::system_clock::from_time_t(0);
return;
}
size_t min_sec_lifetime = configuration.lifetime.min_sec;
size_t max_sec_lifetime = configuration.lifetime.max_sec;
std::uniform_int_distribution<UInt64> distribution{min_sec_lifetime, max_sec_lifetime};
cell.deadline = now + std::chrono::seconds(distribution(rnd_engine));
}
SerializedCacheDictionaryStorageConfiguration configuration;
ArenaWithFreeLists arena;
pcg64 rnd_engine;
class ArenaCellDisposer
{
public:
ArenaWithFreeLists & arena;
template <typename Key, typename Value>
void operator()(const Key & key, const Value & value) const
{
/// In case of complex key we keep it in arena
if constexpr (std::is_same_v<Key, StringRef>)
arena.free(const_cast<char *>(key.data), key.size);
if (value.place_for_serialized_columns)
arena.free(value.place_for_serialized_columns, value.allocated_size_for_columns);
}
};
using SimpleKeyLRUHashMap = LRUHashMap<UInt64, Cell, ArenaCellDisposer>;
using ComplexKeyLRUHashMap = LRUHashMapWithSavedHash<StringRef, Cell, ArenaCellDisposer>;
using CacheLRUHashMap = std::conditional_t<
dictionary_key_type == DictionaryKeyType::simple,
SimpleKeyLRUHashMap,
ComplexKeyLRUHashMap>;
CacheLRUHashMap cache;
};
}

View File

@ -1,154 +0,0 @@
clickhouse-client --query="DROP TABLE IF EXISTS simple_cache_dictionary_table_source";
clickhouse-client --query="CREATE TABLE simple_cache_dictionary_table_source (id UInt64, value1 String, value2 UInt64, value3 String, value4 Float64, value5 Decimal64(4)) ENGINE=TinyLog;"
clickhouse-client --query="INSERT INTO simple_cache_dictionary_table_source SELECT number, concat('Value1 ', toString(number)), number, concat('Value3 ', toString(number)), toFloat64(number), cast(number, 'Decimal64(4)') FROM system.numbers LIMIT 1000000;"
clickhouse-client --multiquery --query="CREATE DICTIONARY clickhouse_simple_cache_dictionary (
id UInt64,
value1 String,
value2 UInt64,
value3 String,
value4 Float64,
value5 Decimal64(4)
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'simple_cache_dictionary_table_source' PASSWORD '' DB 'default'))
LIFETIME(MIN 300 MAX 300)
LAYOUT(CACHE(SIZE_IN_CELLS 100000));"
clickhouse-client --multiquery --query="CREATE DICTIONARY clickhouse_ssd_simple_cache_dictionary (
id UInt64,
value1 String,
value2 UInt64,
value3 String,
value4 Float64,
value5 Decimal64(4)
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'simple_cache_dictionary_table_source' PASSWORD '' DB 'default'))
LIFETIME(MIN 300 MAX 300)
LAYOUT(SSD_CACHE(BLOCK_SIZE 4096 FILE_SIZE 16777216 READ_BUFFER_SIZE 1048576 WRITE_BUFFER_SIZE 327680 MAX_STORED_KEYS 1048576 PATH '/opt/mkita/ClickHouse/build_release/programs/ssd_cache'));"
clickhouse-client --multiquery --query="CREATE DICTIONARY clickhouse_dummy_simple_cache_dictionary (
id UInt64,
value1 String,
value2 UInt64,
value3 String,
value4 Float64,
value5 Decimal64(4)
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'simple_cache_dictionary_table_source' PASSWORD '' DB 'default'))
LIFETIME(MIN 300 MAX 300)
LAYOUT(DUMMY_SIMPLE());"
./clickhouse-benchmark --query="SELECT
dictGet('default.clickhouse_dummy_simple_cache_dictionary', 'value1', number),
dictGet('default.clickhouse_dummy_simple_cache_dictionary', 'value2', number),
dictGet('default.clickhouse_dummy_simple_cache_dictionary', 'value3', number),
dictGet('default.clickhouse_dummy_simple_cache_dictionary', 'value4', number),
dictGet('default.clickhouse_dummy_simple_cache_dictionary', 'value5', number)
FROM system.numbers
LIMIT 10000
FORMAT Null"
./clickhouse-benchmark --query="SELECT
dictGet('default.clickhouse_simple_cache_dictionary', ('value1', 'value2', 'value3', 'value4', 'value5'), number)
FROM system.numbers
LIMIT 10000
FORMAT Null"
./clickhouse-benchmark --query="SELECT dictGet('default.clickhouse_ssd_simple_cache_dictionary', 'value1', number) FROM system.numbers_mt LIMIT 10000 FORMAT Null"
./clickhouse-benchmark --query="SELECT
dictGet('default.clickhouse_simple_cache_dictionary', 'value1', number),
dictGet('default.clickhouse_simple_cache_dictionary', 'value2', number),
dictGet('default.clickhouse_simple_cache_dictionary', 'value3', number),
dictGet('default.clickhouse_simple_cache_dictionary', 'value4', number),
dictGet('default.clickhouse_simple_cache_dictionary', 'value5', number)
FROM system.numbers
LIMIT 10000
FORMAT Null"
./clickhouse-benchmark --query="SELECT dictGet('default.clickhouse_ssd_simple_cache_dictionary', 'value1', number) FROM system.numbers_mt LIMIT 10000 FORMAT Null"
SELECT
dictGet('default.clickhouse_ssd_simple_cache_dictionary', 'value1', number),
dictGet('default.clickhouse_ssd_simple_cache_dictionary', 'value2', number),
dictGet('default.clickhouse_ssd_simple_cache_dictionary', 'value3', number),
dictGet('default.clickhouse_ssd_simple_cache_dictionary', 'value4', number),
dictGet('default.clickhouse_ssd_simple_cache_dictionary', 'value5', number)
FROM system.numbers
LIMIT 10000
FORMAT Null
SELECT dictGet('default.clickhouse_simple_cache_dictionary', ('value1', 'value2', 'value3', 'value4', 'value5'), number) FROM system.numbers LIMIT 10000 FORMAT Null
SELECT dictGet('default.clickhouse_ssd_simple_cache_dictionary', ('value1', 'value2', 'value3', 'value4', 'value5'), number) FROM system.numbers LIMIT 10000
FORMAT Null
SELECT
dictGet('default.clickhouse_simple_cache_dictionary', ('value1', 'value2', 'value3', 'value4', 'value5'), number)
FROM system.numbers
LIMIT 10000
FORMAT
Null
SELECT
dictGet('default.clickhouse_simple_cache_dictionary', 'value1', number),
dictGet('default.clickhouse_simple_cache_dictionary', 'value2', number),
dictGet('default.clickhouse_simple_cache_dictionary', 'value3', number),
dictGet('default.clickhouse_simple_cache_dictionary', 'value4', number),
dictGet('default.clickhouse_simple_cache_dictionary', 'value5', number)
FROM system.numbers
LIMIT 10000
FORMAT
Null
SELECT
dictGet('default.clickhouse_simple_cache_dictionary', 'value1', number),
dictGet('default.clickhouse_simple_cache_dictionary', 'value2', number)
FROM system.numbers
LIMIT 10000
FORMAT Null
SELECT
dictGet('clickhouse_simple_cache_dictionary', 'value1', number)
FROM system.numbers
LIMIT 100000
FORMAT Null
SELECT
dictGet('clickhouse_simple_cache_dictionary', 'value2', number)
FROM system.numbers
LIMIT 100000
FORMAT Null
SELECT
dictGet('clickhouse_simple_cache_dictionary', 'value3', number)
FROM system.numbers
LIMIT 100000
FORMAT Null
SELECT
dictGet('clickhouse_simple_cache_dictionary', 'value4', number)
FROM system.numbers
LIMIT 100000
FORMAT Null
SELECT
dictGet('clickhouse_simple_cache_dictionary', 'value5', number)
FROM system.numbers
LIMIT 100000
FORMAT Null
SELECT
dictGet('clickhouse_simple_cache_dictionary', 'value1', number),
dictGet('clickhouse_simple_cache_dictionary', 'value2', number),
dictGet('clickhouse_simple_cache_dictionary', 'value3', number),
dictGet('clickhouse_simple_cache_dictionary', 'value4', number),
dictGet('clickhouse_simple_cache_dictionary', 'value5', number)
FROM system.numbers
LIMIT 100000
FORMAT Null
SELECT * FROM clickhouse_simple_cache_dictionary_table;

View File

@ -1,6 +1,7 @@
#include "CacheDictionary.h"
#include "SSDCacheDictionaryStorage.h"
#include "CacheDictionaryStorage.h"
#include "SerializedCacheDictionaryStorage.h"
#include "SSDCacheDictionaryStorage.h"
#include <Dictionaries/DictionaryFactory.h>
namespace DB
@ -18,9 +19,16 @@ CacheDictionaryStorageConfiguration parseCacheStorageConfiguration(
const Poco::Util::AbstractConfiguration & config,
const String & layout_prefix,
const DictionaryLifetime & dict_lifetime,
DictionaryKeyType dictionary_key_type)
DictionaryKeyType dictionary_key_type,
bool serialized_storage)
{
String dictionary_type_prefix = dictionary_key_type == DictionaryKeyType::complex ? ".complex_key_cache." : ".cache.";
String dictionary_type_prefix;
if (!serialized_storage)
dictionary_type_prefix = dictionary_key_type == DictionaryKeyType::complex ? ".complex_key_cache." : ".cache.";
else
dictionary_type_prefix = dictionary_key_type == DictionaryKeyType::complex ? ".serialized_complex_key_cache." : ".serialized_cache.";
String dictionary_configuration_prefix = layout_prefix + dictionary_type_prefix;
const size_t size = config.getUInt64(dictionary_configuration_prefix + "size_in_cells");
@ -158,7 +166,8 @@ DictionaryPtr createCacheDictionaryLayout(
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr)
DictionarySourcePtr source_ptr,
bool serialized_storage)
{
static_assert(dictionary_key_type != DictionaryKeyType::range, "Range key type is not supported by CacheDictionary");
@ -193,8 +202,23 @@ DictionaryPtr createCacheDictionaryLayout(
const bool allow_read_expired_keys = config.getBool(layout_prefix + ".cache.allow_read_expired_keys", false);
auto storage_configuration = parseCacheStorageConfiguration(full_name, config, layout_prefix, dict_lifetime, dictionary_key_type);
auto storage = std::make_shared<CacheDictionaryStorage<dictionary_key_type>>(storage_configuration);
auto storage_configuration = parseCacheStorageConfiguration(full_name, config, layout_prefix, dict_lifetime, dictionary_key_type, serialized_storage);
std::shared_ptr<ICacheDictionaryStorage> storage;
if (serialized_storage)
{
SerializedCacheDictionaryStorageConfiguration serialized_configuration
{
.max_size_in_cells = storage_configuration.max_size_in_cells,
.strict_max_lifetime_seconds = storage_configuration.strict_max_lifetime_seconds,
.lifetime = storage_configuration.lifetime,
};
storage = std::make_shared<SerializedCacheDictionaryStorage<dictionary_key_type>>(serialized_configuration);
}
else
storage = std::make_shared<CacheDictionaryStorage<dictionary_key_type>>(dict_struct, storage_configuration);
auto update_queue_configuration = parseCacheDictionaryUpdateQueueConfiguration(full_name, config, layout_prefix, dictionary_key_type);
@ -265,7 +289,7 @@ void registerDictionaryCache(DictionaryFactory & factory)
const std::string & config_prefix,
DictionarySourcePtr source_ptr) -> DictionaryPtr
{
return createCacheDictionaryLayout<DictionaryKeyType::simple>(full_name, dict_struct, config, config_prefix, std::move(source_ptr));
return createCacheDictionaryLayout<DictionaryKeyType::simple>(full_name, dict_struct, config, config_prefix, std::move(source_ptr), false);
};
factory.registerLayout("cache", create_simple_cache_layout, false);
@ -276,11 +300,33 @@ void registerDictionaryCache(DictionaryFactory & factory)
const std::string & config_prefix,
DictionarySourcePtr source_ptr) -> DictionaryPtr
{
return createCacheDictionaryLayout<DictionaryKeyType::complex>(full_name, dict_struct, config, config_prefix, std::move(source_ptr));
return createCacheDictionaryLayout<DictionaryKeyType::complex>(full_name, dict_struct, config, config_prefix, std::move(source_ptr), false);
};
factory.registerLayout("complex_key_cache", create_complex_key_cache_layout, true);
auto create_simple_serialized_cache_layout = [=](const String & full_name,
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr) -> DictionaryPtr
{
return createCacheDictionaryLayout<DictionaryKeyType::simple>(full_name, dict_struct, config, config_prefix, std::move(source_ptr), true);
};
factory.registerLayout("serialized_cache", create_simple_serialized_cache_layout, false);
auto create_complex_key_serialzied_cache_layout = [=](const std::string & full_name,
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr) -> DictionaryPtr
{
return createCacheDictionaryLayout<DictionaryKeyType::complex>(full_name, dict_struct, config, config_prefix, std::move(source_ptr), true);
};
factory.registerLayout("complex_key_serialized_cache", create_complex_key_serialzied_cache_layout, true);
#if defined(OS_LINUX) || defined(__FreeBSD__)
auto create_simple_ssd_cache_layout = [=](const std::string & full_name,