Merge pull request #21573 from kitaisreal/updated-cache-dictionary-storage

Added specialized CacheDictionaryStorage
This commit is contained in:
Maksim Kita 2021-03-25 21:29:01 +03:00 committed by GitHub
commit bbe1960eea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 682 additions and 458 deletions

View File

@ -320,8 +320,6 @@ Similar to `cache`, but stores data on SSD and index in RAM.
<write_buffer_size>1048576</write_buffer_size>
<!-- Path where cache file will be stored. -->
<path>/var/lib/clickhouse/clickhouse_dictionaries/test_dict</path>
<!-- Max number on stored keys in the cache. Rounded up to a power of two. -->
<max_stored_keys>1048576</max_stored_keys>
</ssd_cache>
</layout>
```
@ -329,8 +327,8 @@ Similar to `cache`, but stores data on SSD and index in RAM.
or
``` sql
LAYOUT(CACHE(BLOCK_SIZE 4096 FILE_SIZE 16777216 READ_BUFFER_SIZE 1048576
PATH /var/lib/clickhouse/clickhouse_dictionaries/test_dict MAX_STORED_KEYS 1048576))
LAYOUT(SSD_CACHE(BLOCK_SIZE 4096 FILE_SIZE 16777216 READ_BUFFER_SIZE 1048576
PATH /var/lib/clickhouse/clickhouse_dictionaries/test_dict))
```
### complex_key_ssd_cache {#complex-key-ssd-cache}

View File

@ -318,8 +318,6 @@ LAYOUT(CACHE(SIZE_IN_CELLS 1000000000))
<write_buffer_size>1048576</write_buffer_size>
<!-- Path where cache file will be stored. -->
<path>/var/lib/clickhouse/clickhouse_dictionaries/test_dict</path>
<!-- Max number on stored keys in the cache. Rounded up to a power of two. -->
<max_stored_keys>1048576</max_stored_keys>
</ssd_cache>
</layout>
```
@ -327,8 +325,8 @@ LAYOUT(CACHE(SIZE_IN_CELLS 1000000000))
или
``` sql
LAYOUT(CACHE(BLOCK_SIZE 4096 FILE_SIZE 16777216 READ_BUFFER_SIZE 1048576
PATH /var/lib/clickhouse/clickhouse_dictionaries/test_dict MAX_STORED_KEYS 1048576))
LAYOUT(SSD_CACHE(BLOCK_SIZE 4096 FILE_SIZE 16777216 READ_BUFFER_SIZE 1048576
PATH /var/lib/clickhouse/clickhouse_dictionaries/test_dict))
```
### complex_key_ssd_cache {#complex-key-ssd-cache}

View File

@ -271,13 +271,13 @@ private:
};
template <typename Key, typename Mapped>
struct DefaultCellDisposer
struct DefaultLRUHashMapCellDisposer
{
void operator()(const Key &, const Mapped &) const {}
};
template <typename Key, typename Value, typename Disposer = DefaultCellDisposer<Key, Value>, typename Hash = DefaultHash<Key>>
template <typename Key, typename Value, typename Disposer = DefaultLRUHashMapCellDisposer<Key, Value>, typename Hash = DefaultHash<Key>>
using LRUHashMap = LRUHashMapImpl<Key, Value, Disposer, Hash, false>;
template <typename Key, typename Value, typename Disposer = DefaultCellDisposer<Key, Value>, typename Hash = DefaultHash<Key>>
template <typename Key, typename Value, typename Disposer = DefaultLRUHashMapCellDisposer<Key, Value>, typename Hash = DefaultHash<Key>>
using LRUHashMapWithSavedHash = LRUHashMapImpl<Key, Value, Disposer, Hash, true>;

View File

@ -692,6 +692,30 @@ public:
assign(from.begin(), from.end());
}
void erase(const_iterator first, const_iterator last)
{
iterator first_no_const = const_cast<iterator>(first);
iterator last_no_const = const_cast<iterator>(last);
size_t items_to_move = end() - last;
while (items_to_move != 0)
{
*first_no_const = *last_no_const;
++first_no_const;
++last_no_const;
--items_to_move;
}
this->c_end = reinterpret_cast<char *>(first_no_const);
}
void erase(const_iterator pos)
{
this->erase(pos, pos + 1);
}
bool operator== (const PODArray & rhs) const
{

View File

@ -92,3 +92,57 @@ TEST(Common, PODInsertElementSizeNotMultipleOfLeftPadding)
EXPECT_EQ(arr1_initially_empty.size(), items_to_insert_size);
}
TEST(Common, PODErase)
{
{
PaddedPODArray<UInt64> items {0,1,2,3,4,5,6,7,8,9};
PaddedPODArray<UInt64> expected;
expected = {0,1,2,3,4,5,6,7,8,9};
items.erase(items.begin(), items.begin());
EXPECT_EQ(items, expected);
items.erase(items.end(), items.end());
EXPECT_EQ(items, expected);
}
{
PaddedPODArray<UInt64> actual {0,1,2,3,4,5,6,7,8,9};
PaddedPODArray<UInt64> expected;
expected = {0,1,4,5,6,7,8,9};
actual.erase(actual.begin() + 2, actual.begin() + 4);
EXPECT_EQ(actual, expected);
expected = {0,1,4};
actual.erase(actual.begin() + 3, actual.end());
EXPECT_EQ(actual, expected);
expected = {};
actual.erase(actual.begin(), actual.end());
EXPECT_EQ(actual, expected);
for (size_t i = 0; i < 10; ++i)
actual.emplace_back(static_cast<UInt64>(i));
expected = {0,1,4,5,6,7,8,9};
actual.erase(actual.begin() + 2, actual.begin() + 4);
EXPECT_EQ(actual, expected);
expected = {0,1,4};
actual.erase(actual.begin() + 3, actual.end());
EXPECT_EQ(actual, expected);
expected = {};
actual.erase(actual.begin(), actual.end());
EXPECT_EQ(actual, expected);
}
{
PaddedPODArray<UInt64> actual {0,1,2,3,4,5,6,7,8,9};
PaddedPODArray<UInt64> expected;
expected = {1,2,3,4,5,6,7,8,9};
actual.erase(actual.begin());
EXPECT_EQ(actual, expected);
}
}

View File

@ -101,7 +101,7 @@ template <DictionaryKeyType dictionary_key_type>
double CacheDictionary<dictionary_key_type>::getLoadFactor() const
{
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
return static_cast<double>(cache_storage_ptr->getSize()) / cache_storage_ptr->getMaxSize();
return cache_storage_ptr->getLoadFactor();
}
template <DictionaryKeyType dictionary_key_type>
@ -333,9 +333,7 @@ Columns CacheDictionary<dictionary_key_type>::getColumnsImpl(
FetchResult result_of_fetch_from_storage;
{
/// Write lock on storage
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
result_of_fetch_from_storage = cache_storage_ptr->fetchColumnsForKeys(keys, request);
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <chrono>
#include <variant>
#include <pcg_random.hpp>
@ -30,28 +31,31 @@ struct CacheDictionaryStorageConfiguration
const DictionaryLifetime lifetime;
};
/** Keys are stored in LRUCache and column values are serialized into arena.
Cell in LRUCache consists of allocated size and place in arena were columns serialized data is stored.
Columns are serialized by rows.
When cell is removed from LRUCache data associated with it is also removed from arena.
In case of complex key we also store key data in arena and it is removed from arena.
*/
/** ICacheDictionaryStorage implementation that keeps key in hash table with fixed collision length.
* Value in hash table point to index in attributes arrays.
*/
template <DictionaryKeyType dictionary_key_type>
class CacheDictionaryStorage final : public ICacheDictionaryStorage
{
static constexpr size_t max_collision_length = 10;
public:
using KeyType = std::conditional_t<dictionary_key_type == DictionaryKeyType::simple, UInt64, StringRef>;
static_assert(dictionary_key_type != DictionaryKeyType::range, "Range key type is not supported by CacheDictionaryStorage");
explicit CacheDictionaryStorage(CacheDictionaryStorageConfiguration & configuration_)
explicit CacheDictionaryStorage(
const DictionaryStructure & dictionary_structure,
CacheDictionaryStorageConfiguration & configuration_)
: configuration(configuration_)
, rnd_engine(randomSeed())
, cache(configuration.max_size_in_cells, false, { arena })
{
size_t cells_size = roundUpToPowerOfTwoOrZero(std::max(configuration.max_size_in_cells, max_collision_length));
cells.resize_fill(cells_size);
size_overlap_mask = cells_size - 1;
setup(dictionary_structure);
}
bool returnsFetchedColumnsInOrderOfRequestedKeys() const override { return true; }
@ -71,9 +75,7 @@ public:
const DictionaryStorageFetchRequest & fetch_request) override
{
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
{
return fetchColumnsForKeysImpl<SimpleKeysStorageFetchResult>(keys, fetch_request);
}
else
throw Exception("Method fetchColumnsForKeys is not supported for complex key storage", ErrorCodes::NOT_IMPLEMENTED);
}
@ -109,9 +111,7 @@ public:
const DictionaryStorageFetchRequest & column_fetch_requests) override
{
if constexpr (dictionary_key_type == DictionaryKeyType::complex)
{
return fetchColumnsForKeysImpl<ComplexKeysStorageFetchResult>(keys, column_fetch_requests);
}
else
throw Exception("Method fetchColumnsForKeys is not supported for simple key storage", ErrorCodes::NOT_IMPLEMENTED);
}
@ -140,79 +140,162 @@ public:
throw Exception("Method getCachedComplexKeys is not supported for simple key storage", ErrorCodes::NOT_IMPLEMENTED);
}
size_t getSize() const override { return cache.size(); }
size_t getSize() const override { return size; }
size_t getMaxSize() const override { return cache.getMaxSize(); }
double getLoadFactor() const override { return static_cast<double>(size) / configuration.max_size_in_cells; }
size_t getBytesAllocated() const override { return arena.size() + cache.getSizeInBytes(); }
size_t getBytesAllocated() const override
{
size_t attributes_size_in_bytes = 0;
size_t attributes_size = attributes.size();
for (size_t attribute_index = 0; attribute_index < attributes_size; ++attribute_index)
{
getAttributeContainer(attribute_index, [&](const auto & container)
{
attributes_size_in_bytes += container.capacity() * sizeof(container[0]);
});
}
return arena.size() + sizeof(Cell) * configuration.max_size_in_cells + attributes_size_in_bytes;
}
private:
struct FetchedKey
{
FetchedKey(size_t element_index_, bool is_default_)
: element_index(element_index_)
, is_default(is_default_)
{}
size_t element_index;
bool is_default;
};
template <typename KeysStorageFetchResult>
ALWAYS_INLINE KeysStorageFetchResult fetchColumnsForKeysImpl(
KeysStorageFetchResult fetchColumnsForKeysImpl(
const PaddedPODArray<KeyType> & keys,
const DictionaryStorageFetchRequest & fetch_request)
{
KeysStorageFetchResult result;
result.fetched_columns = fetch_request.makeAttributesResultColumns();
result.key_index_to_state.resize_fill(keys.size(), {KeyState::not_found});
result.key_index_to_state.resize_fill(keys.size());
const auto now = std::chrono::system_clock::now();
const time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
size_t fetched_columns_index = 0;
size_t keys_size = keys.size();
std::chrono::seconds max_lifetime_seconds(configuration.strict_max_lifetime_seconds);
size_t keys_size = keys.size();
PaddedPODArray<FetchedKey> fetched_keys;
fetched_keys.resize_fill(keys_size);
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
auto key = keys[key_index];
auto * it = cache.find(key);
auto [key_state, cell_index] = getKeyStateAndCellIndex(key, now);
if (it)
if (unlikely(key_state == KeyState::not_found))
{
/// Columns values for key are serialized in cache now deserialize them
const auto & cell = it->getMapped();
result.key_index_to_state[key_index] = {KeyState::not_found};
++result.not_found_keys_size;
continue;
}
bool has_deadline = cellHasDeadline(cell);
auto & cell = cells[cell_index];
if (has_deadline && now > cell.deadline + max_lifetime_seconds)
{
result.key_index_to_state[key_index] = {KeyState::not_found};
++result.not_found_keys_size;
continue;
}
else if (has_deadline && now > cell.deadline)
{
result.key_index_to_state[key_index] = {KeyState::expired, fetched_columns_index};
++result.expired_keys_size;
}
else
{
result.key_index_to_state[key_index] = {KeyState::found, fetched_columns_index};
++result.found_keys_size;
}
result.expired_keys_size += static_cast<size_t>(key_state == KeyState::expired);
++fetched_columns_index;
result.key_index_to_state[key_index] = {key_state, fetched_columns_index};
fetched_keys[fetched_columns_index] = FetchedKey(cell.element_index, cell.is_default);
if (cell.isDefault())
++fetched_columns_index;
result.key_index_to_state[key_index].setDefaultValue(cell.is_default);
result.default_keys_size += cell.is_default;
}
result.found_keys_size = keys_size - (result.expired_keys_size + result.not_found_keys_size);
for (size_t attribute_index = 0; attribute_index < fetch_request.attributesSize(); ++attribute_index)
{
if (!fetch_request.shouldFillResultColumnWithIndex(attribute_index))
continue;
auto & attribute = attributes[attribute_index];
const auto & default_value_provider = fetch_request.defaultValueProviderAtIndex(attribute_index);
size_t fetched_keys_size = fetched_keys.size();
auto & fetched_column = *result.fetched_columns[attribute_index];
fetched_column.reserve(fetched_keys_size);
if (unlikely(attribute.is_complex_type))
{
auto & container = std::get<std::vector<Field>>(attribute.attribute_container);
for (size_t fetched_key_index = 0; fetched_key_index < fetched_columns_index; ++fetched_key_index)
{
result.key_index_to_state[key_index].setDefault();
++result.default_keys_size;
insertDefaultValuesIntoColumns(result.fetched_columns, fetch_request, key_index);
}
else
{
const char * place_for_serialized_columns = cell.place_for_serialized_columns;
deserializeAndInsertIntoColumns(result.fetched_columns, fetch_request, place_for_serialized_columns);
auto fetched_key = fetched_keys[fetched_key_index];
if (unlikely(fetched_key.is_default))
fetched_column.insert(default_value_provider.getDefaultValue(fetched_key_index));
else
fetched_column.insert(container[fetched_key.element_index]);
}
}
else
{
result.key_index_to_state[key_index] = {KeyState::not_found};
++result.not_found_keys_size;
auto type_call = [&](const auto & dictionary_attribute_type)
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
using ValueType = DictionaryValueType<AttributeType>;
using ColumnType =
std::conditional_t<std::is_same_v<AttributeType, String>, ColumnString,
std::conditional_t<IsDecimalNumber<AttributeType>, ColumnDecimal<ValueType>,
ColumnVector<AttributeType>>>;
auto & container = std::get<PaddedPODArray<ValueType>>(attribute.attribute_container);
ColumnType & column_typed = static_cast<ColumnType &>(fetched_column);
if constexpr (std::is_same_v<ColumnType, ColumnString>)
{
for (size_t fetched_key_index = 0; fetched_key_index < fetched_columns_index; ++fetched_key_index)
{
auto fetched_key = fetched_keys[fetched_key_index];
if (unlikely(fetched_key.is_default))
column_typed.insert(default_value_provider.getDefaultValue(fetched_key_index));
else
{
auto item = container[fetched_key.element_index];
column_typed.insertData(item.data, item.size);
}
}
}
else
{
auto & data = column_typed.getData();
for (size_t fetched_key_index = 0; fetched_key_index < fetched_columns_index; ++fetched_key_index)
{
auto fetched_key = fetched_keys[fetched_key_index];
if (unlikely(fetched_key.is_default))
column_typed.insert(default_value_provider.getDefaultValue(fetched_key_index));
else
{
auto item = container[fetched_key.element_index];
data.push_back(item);
}
}
}
};
callOnDictionaryAttributeType(attribute.type, type_call);
}
}
@ -221,58 +304,108 @@ private:
void insertColumnsForKeysImpl(const PaddedPODArray<KeyType> & keys, Columns columns)
{
Arena temporary_values_pool;
size_t columns_to_serialize_size = columns.size();
PaddedPODArray<StringRef> temporary_column_data(columns_to_serialize_size);
const auto now = std::chrono::system_clock::now();
size_t keys_size = keys.size();
Field column_value;
for (size_t key_index = 0; key_index < keys_size; ++key_index)
for (size_t key_index = 0; key_index < keys.size(); ++key_index)
{
size_t allocated_size_for_columns = 0;
const char * block_start = nullptr;
auto key = keys[key_index];
auto * it = cache.find(key);
for (size_t column_index = 0; column_index < columns_to_serialize_size; ++column_index)
size_t cell_index = getCellIndexForInsert(key);
auto & cell = cells[cell_index];
bool cell_was_default = cell.is_default;
cell.is_default = false;
bool was_inserted = cell.deadline == 0;
if (was_inserted)
{
auto & column = columns[column_index];
temporary_column_data[column_index] = column->serializeValueIntoArena(key_index, temporary_values_pool, block_start);
allocated_size_for_columns += temporary_column_data[column_index].size;
}
if constexpr (std::is_same_v<KeyType, StringRef>)
cell.key = copyStringInArena(key);
else
cell.key = key;
char * place_for_serialized_columns = arena.alloc(allocated_size_for_columns);
memcpy(reinterpret_cast<void*>(place_for_serialized_columns), reinterpret_cast<const void*>(block_start), allocated_size_for_columns);
for (size_t attribute_index = 0; attribute_index < columns.size(); ++attribute_index)
{
auto & column = columns[attribute_index];
if (it)
{
/// Cell exists need to free previous serialized place and update deadline
auto & cell = it->getMapped();
getAttributeContainer(attribute_index, [&](auto & container)
{
container.emplace_back();
cell.element_index = container.size() - 1;
if (cell.place_for_serialized_columns)
arena.free(cell.place_for_serialized_columns, cell.allocated_size_for_columns);
using ElementType = std::decay_t<decltype(container[0])>;
setCellDeadline(cell, now);
cell.allocated_size_for_columns = allocated_size_for_columns;
cell.place_for_serialized_columns = place_for_serialized_columns;
column->get(key_index, column_value);
if constexpr (std::is_same_v<ElementType, Field>)
container.back() = column_value;
else if constexpr (std::is_same_v<ElementType, StringRef>)
{
const String & string_value = column_value.get<String>();
StringRef string_value_ref = StringRef {string_value.data(), string_value.size()};
StringRef inserted_value = copyStringInArena(string_value_ref);
container.back() = inserted_value;
}
else
container.back() = column_value.get<NearestFieldType<ElementType>>();
});
}
++size;
}
else
{
/// No cell exists so create and put in cache
Cell cell;
if (cell.key != key)
{
if constexpr (std::is_same_v<KeyType, StringRef>)
{
char * data = const_cast<char *>(cell.key.data);
arena.free(data, cell.key.size);
cell.key = copyStringInArena(key);
}
else
cell.key = key;
}
setCellDeadline(cell, now);
cell.allocated_size_for_columns = allocated_size_for_columns;
cell.place_for_serialized_columns = place_for_serialized_columns;
/// Put values into existing index
size_t index_to_use = cell.element_index;
insertCellInCache(key, cell);
for (size_t attribute_index = 0; attribute_index < columns.size(); ++attribute_index)
{
auto & column = columns[attribute_index];
getAttributeContainer(attribute_index, [&](auto & container)
{
using ElementType = std::decay_t<decltype(container[0])>;
column->get(key_index, column_value);
if constexpr (std::is_same_v<ElementType, Field>)
container[index_to_use] = column_value;
else if constexpr (std::is_same_v<ElementType, StringRef>)
{
const String & string_value = column_value.get<String>();
StringRef string_ref_value = StringRef {string_value.data(), string_value.size()};
StringRef inserted_value = copyStringInArena(string_ref_value);
if (!cell_was_default)
{
StringRef previous_value = container[index_to_use];
arena.free(const_cast<char *>(previous_value.data), previous_value.size);
}
container[index_to_use] = inserted_value;
}
else
container[index_to_use] = column_value.get<NearestFieldType<ElementType>>();
});
}
}
temporary_values_pool.rollback(allocated_size_for_columns);
setCellDeadline(cell, now);
}
}
@ -280,94 +413,224 @@ private:
{
const auto now = std::chrono::system_clock::now();
for (auto key : keys)
size_t keys_size = keys.size();
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
auto * it = cache.find(key);
auto key = keys[key_index];
if (it)
size_t cell_index = getCellIndexForInsert(key);
auto & cell = cells[cell_index];
bool was_inserted = cell.deadline == 0;
bool cell_was_default = cell.is_default;
cell.is_default = true;
if (was_inserted)
{
auto & cell = it->getMapped();
if constexpr (std::is_same_v<KeyType, StringRef>)
cell.key = copyStringInArena(key);
else
cell.key = key;
setCellDeadline(cell, now);
for (size_t attribute_index = 0; attribute_index < attributes.size(); ++attribute_index)
{
getAttributeContainer(attribute_index, [&](auto & container)
{
container.emplace_back();
cell.element_index = container.size() - 1;
});
}
if (cell.place_for_serialized_columns)
arena.free(cell.place_for_serialized_columns, cell.allocated_size_for_columns);
cell.allocated_size_for_columns = 0;
cell.place_for_serialized_columns = nullptr;
++size;
}
else
{
Cell cell;
for (size_t attribute_index = 0; attribute_index < attributes.size(); ++attribute_index)
{
getAttributeContainer(attribute_index, [&](const auto & container)
{
using ElementType = std::decay_t<decltype(container[0])>;
setCellDeadline(cell, now);
cell.allocated_size_for_columns = 0;
cell.place_for_serialized_columns = nullptr;
if constexpr (std::is_same_v<ElementType, StringRef>)
{
if (!cell_was_default)
{
StringRef previous_value = container[cell.element_index];
arena.free(const_cast<char *>(previous_value.data), previous_value.size);
}
}
});
}
insertCellInCache(key, cell);
if (cell.key != key)
{
if constexpr (std::is_same_v<KeyType, StringRef>)
{
char * data = const_cast<char *>(cell.key.data);
arena.free(data, cell.key.size);
cell.key = copyStringInArena(key);
}
else
cell.key = key;
}
}
setCellDeadline(cell, now);
}
}
PaddedPODArray<KeyType> getCachedKeysImpl() const
{
PaddedPODArray<KeyType> result;
result.reserve(cache.size());
result.reserve(size);
for (auto & node : cache)
for (auto & cell : cells)
{
auto & cell = node.getMapped();
if (cell.isDefault())
if (cell.deadline == 0)
continue;
result.emplace_back(node.getKey());
if (cell.is_default)
continue;
result.emplace_back(cell.key);
}
return result;
}
template <typename GetContainerFunc>
void getAttributeContainer(size_t attribute_index, GetContainerFunc && func)
{
auto & attribute = attributes[attribute_index];
auto & attribute_type = attribute.type;
if (unlikely(attribute.is_complex_type))
{
auto & container = std::get<std::vector<Field>>(attribute.attribute_container);
std::forward<GetContainerFunc>(func)(container);
}
else
{
auto type_call = [&](const auto & dictionary_attribute_type)
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
using ValueType = DictionaryValueType<AttributeType>;
auto & container = std::get<PaddedPODArray<ValueType>>(attribute.attribute_container);
std::forward<GetContainerFunc>(func)(container);
};
callOnDictionaryAttributeType(attribute_type, type_call);
}
}
template <typename GetContainerFunc>
void getAttributeContainer(size_t attribute_index, GetContainerFunc && func) const
{
return const_cast<std::decay_t<decltype(*this)> *>(this)->template getAttributeContainer(attribute_index, std::forward<GetContainerFunc>(func));
}
StringRef copyStringInArena(StringRef value_to_copy)
{
size_t value_to_copy_size = value_to_copy.size;
char * place_for_key = arena.alloc(value_to_copy_size);
memcpy(reinterpret_cast<void *>(place_for_key), reinterpret_cast<const void *>(value_to_copy.data), value_to_copy_size);
StringRef updated_value{place_for_key, value_to_copy_size};
return updated_value;
}
void setup(const DictionaryStructure & dictionary_structure)
{
/// For each dictionary attribute create storage attribute
/// For simple attributes create PODArray, for complex vector of Fields
attributes.reserve(dictionary_structure.attributes.size());
for (const auto & dictionary_attribute : dictionary_structure.attributes)
{
auto attribute_type = dictionary_attribute.underlying_type;
auto type_call = [&](const auto & dictionary_attribute_type)
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
using ValueType = DictionaryValueType<AttributeType>;
attributes.emplace_back();
auto & last_attribute = attributes.back();
last_attribute.type = attribute_type;
last_attribute.is_complex_type = dictionary_attribute.is_nullable || dictionary_attribute.is_array;
if (dictionary_attribute.is_nullable)
last_attribute.attribute_container = std::vector<Field>();
else
last_attribute.attribute_container = PaddedPODArray<ValueType>();
};
callOnDictionaryAttributeType(attribute_type, type_call);
}
}
using TimePoint = std::chrono::system_clock::time_point;
struct Cell
{
TimePoint deadline;
size_t allocated_size_for_columns;
char * place_for_serialized_columns;
inline bool isDefault() const { return place_for_serialized_columns == nullptr; }
inline void setDefault()
{
place_for_serialized_columns = nullptr;
allocated_size_for_columns = 0;
}
KeyType key;
size_t element_index;
bool is_default;
time_t deadline;
};
void insertCellInCache(KeyType & key, const Cell & cell)
struct Attribute
{
if constexpr (dictionary_key_type == DictionaryKeyType::complex)
{
/// Copy complex key into arena and put in cache
size_t key_size = key.size;
char * place_for_key = arena.alloc(key_size);
memcpy(reinterpret_cast<void *>(place_for_key), reinterpret_cast<const void *>(key.data), key_size);
KeyType updated_key{place_for_key, key_size};
key = updated_key;
}
AttributeUnderlyingType type;
bool is_complex_type;
cache.insert(key, cell);
}
std::variant<
PaddedPODArray<UInt8>,
PaddedPODArray<UInt16>,
PaddedPODArray<UInt32>,
PaddedPODArray<UInt64>,
PaddedPODArray<UInt128>,
PaddedPODArray<Int8>,
PaddedPODArray<Int16>,
PaddedPODArray<Int32>,
PaddedPODArray<Int64>,
PaddedPODArray<Decimal32>,
PaddedPODArray<Decimal64>,
PaddedPODArray<Decimal128>,
PaddedPODArray<Float32>,
PaddedPODArray<Float64>,
PaddedPODArray<StringRef>,
std::vector<Field>> attribute_container;
};
inline static bool cellHasDeadline(const Cell & cell)
{
return cell.deadline != std::chrono::system_clock::from_time_t(0);
}
CacheDictionaryStorageConfiguration configuration;
pcg64 rnd_engine;
size_t size_overlap_mask = 0;
size_t size = 0;
PaddedPODArray<Cell> cells;
ArenaWithFreeLists arena;
std::vector<Attribute> attributes;
inline void setCellDeadline(Cell & cell, TimePoint now)
{
if (configuration.lifetime.min_sec == 0 && configuration.lifetime.max_sec == 0)
{
cell.deadline = std::chrono::system_clock::from_time_t(0);
/// This maybe not obvious, but when we define is this cell is expired or expired permanently, we add strict_max_lifetime_seconds
/// to the expiration time. And it overflows pretty well.
auto deadline = std::chrono::time_point<std::chrono::system_clock>::max() - 2 * std::chrono::seconds(configuration.strict_max_lifetime_seconds);
cell.deadline = std::chrono::system_clock::to_time_t(deadline);
return;
}
@ -375,44 +638,75 @@ private:
size_t max_sec_lifetime = configuration.lifetime.max_sec;
std::uniform_int_distribution<UInt64> distribution{min_sec_lifetime, max_sec_lifetime};
cell.deadline = now + std::chrono::seconds(distribution(rnd_engine));
auto deadline = now + std::chrono::seconds(distribution(rnd_engine));
cell.deadline = std::chrono::system_clock::to_time_t(deadline);
}
template <typename>
friend class ArenaCellDisposer;
CacheDictionaryStorageConfiguration configuration;
ArenaWithFreeLists arena;
pcg64 rnd_engine;
class ArenaCellDisposer
inline size_t getCellIndex(const KeyType key) const
{
public:
ArenaWithFreeLists & arena;
const size_t hash = DefaultHash<KeyType>()(key);
const size_t index = hash & size_overlap_mask;
return index;
}
template <typename Key, typename Value>
void operator()(const Key & key, const Value & value) const
using KeyStateAndCellIndex = std::pair<KeyState::State, size_t>;
inline KeyStateAndCellIndex getKeyStateAndCellIndex(const KeyType key, const time_t now) const
{
size_t place_value = getCellIndex(key);
const size_t place_value_end = place_value + max_collision_length;
time_t max_lifetime_seconds = static_cast<time_t>(configuration.strict_max_lifetime_seconds);
for (; place_value < place_value_end; ++place_value)
{
/// In case of complex key we keep it in arena
if constexpr (std::is_same_v<Key, StringRef>)
arena.free(const_cast<char *>(key.data), key.size);
const auto cell_place_value = place_value & size_overlap_mask;
const auto & cell = cells[cell_place_value];
if (value.place_for_serialized_columns)
arena.free(value.place_for_serialized_columns, value.allocated_size_for_columns);
if (cell.key != key)
continue;
if (unlikely(now > cell.deadline + max_lifetime_seconds))
return std::make_pair(KeyState::not_found, cell_place_value);
if (unlikely(now > cell.deadline))
return std::make_pair(KeyState::expired, cell_place_value);
return std::make_pair(KeyState::found, cell_place_value);
}
};
using SimpleKeyLRUHashMap = LRUHashMap<UInt64, Cell, ArenaCellDisposer>;
using ComplexKeyLRUHashMap = LRUHashMapWithSavedHash<StringRef, Cell, ArenaCellDisposer>;
return std::make_pair(KeyState::not_found, place_value & size_overlap_mask);
}
using CacheLRUHashMap = std::conditional_t<
dictionary_key_type == DictionaryKeyType::simple,
SimpleKeyLRUHashMap,
ComplexKeyLRUHashMap>;
inline size_t getCellIndexForInsert(const KeyType & key) const
{
size_t place_value = getCellIndex(key);
const size_t place_value_end = place_value + max_collision_length;
size_t oldest_place_value = place_value;
CacheLRUHashMap cache;
time_t oldest_time = std::numeric_limits<time_t>::max();
for (; place_value < place_value_end; ++place_value)
{
const size_t cell_place_value = place_value & size_overlap_mask;
const Cell cell = cells[cell_place_value];
if (cell.deadline == 0)
return cell_place_value;
if (cell.key == key)
return cell_place_value;
if (cell.deadline < oldest_time)
{
oldest_time = cell.deadline;
oldest_place_value = cell_place_value;
}
}
return oldest_place_value;
}
};
}

View File

@ -12,9 +12,9 @@ struct KeyState
{
enum State: uint8_t
{
not_found = 2,
expired = 4,
found = 8,
not_found = 0,
expired = 1,
found = 2,
};
KeyState(State state_, size_t fetched_column_index_)
@ -31,9 +31,10 @@ struct KeyState
inline bool isNotFound() const { return state == State::not_found; }
inline bool isDefault() const { return is_default; }
inline void setDefault() { is_default = true; }
inline void setDefaultValue(bool is_default_value) { is_default = is_default_value; }
/// Valid only if keyState is found or expired
inline size_t getFetchedColumnIndex() const { return fetched_column_index; }
inline void setFetchedColumnIndex(size_t fetched_column_index_value) { fetched_column_index = fetched_column_index_value; }
private:
State state = not_found;
size_t fetched_column_index = 0;
@ -111,8 +112,8 @@ public:
/// Return size of keys in storage
virtual size_t getSize() const = 0;
/// Return maximum size of keys in storage
virtual size_t getMaxSize() const = 0;
/// Returns storage load factor
virtual double getLoadFactor() const = 0;
/// Return bytes allocated in storage
virtual size_t getBytesAllocated() const = 0;

View File

@ -17,7 +17,7 @@
#include <Common/Arena.h>
#include <Common/ArenaWithFreeLists.h>
#include <Common/MemorySanitizer.h>
#include <Common/HashTable/LRUHashMap.h>
#include <Common/HashTable/HashMap.h>
#include <IO/AIO.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/ICacheDictionaryStorage.h>
@ -56,7 +56,6 @@ struct SSDCacheDictionaryStorageConfiguration
const std::string file_path;
const size_t max_partitions_count;
const size_t max_stored_keys;
const size_t block_size;
const size_t file_blocks_size;
const size_t read_buffer_blocks_size;
@ -127,7 +126,7 @@ public:
/// Reset block with new block_data
/// block_data must be filled with zeroes if it is new block
ALWAYS_INLINE inline void reset(char * new_block_data)
inline void reset(char * new_block_data)
{
block_data = new_block_data;
current_block_offset = block_header_size;
@ -135,13 +134,13 @@ public:
}
/// Check if it is enough place to write key in block
ALWAYS_INLINE inline bool enoughtPlaceToWriteKey(const SSDCacheSimpleKey & cache_key) const
inline bool enoughtPlaceToWriteKey(const SSDCacheSimpleKey & cache_key) const
{
return (current_block_offset + (sizeof(cache_key.key) + sizeof(cache_key.size) + cache_key.size)) <= block_size;
}
/// Check if it is enough place to write key in block
ALWAYS_INLINE inline bool enoughtPlaceToWriteKey(const SSDCacheComplexKey & cache_key) const
inline bool enoughtPlaceToWriteKey(const SSDCacheComplexKey & cache_key) const
{
const StringRef & key = cache_key.key;
size_t complex_key_size = sizeof(key.size) + key.size;
@ -152,7 +151,7 @@ public:
/// Write key and returns offset in ssd cache block where data is written
/// It is client responsibility to check if there is enough place in block to write key
/// Returns true if key was written and false if there was not enough place to write key
ALWAYS_INLINE inline bool writeKey(const SSDCacheSimpleKey & cache_key, size_t & offset_in_block)
inline bool writeKey(const SSDCacheSimpleKey & cache_key, size_t & offset_in_block)
{
assert(cache_key.size > 0);
@ -181,7 +180,7 @@ public:
return true;
}
ALWAYS_INLINE inline bool writeKey(const SSDCacheComplexKey & cache_key, size_t & offset_in_block)
inline bool writeKey(const SSDCacheComplexKey & cache_key, size_t & offset_in_block)
{
assert(cache_key.size > 0);
@ -216,20 +215,20 @@ public:
return true;
}
ALWAYS_INLINE inline size_t getKeysSize() const { return keys_size; }
inline size_t getKeysSize() const { return keys_size; }
/// Write keys size into block header
ALWAYS_INLINE inline void writeKeysSize()
inline void writeKeysSize()
{
char * keys_size_offset_data = block_data + block_header_check_sum_size;
std::memcpy(keys_size_offset_data, &keys_size, sizeof(size_t));
}
/// Get check sum from block header
ALWAYS_INLINE inline size_t getCheckSum() const { return unalignedLoad<size_t>(block_data); }
inline size_t getCheckSum() const { return unalignedLoad<size_t>(block_data); }
/// Calculate check sum in block
ALWAYS_INLINE inline size_t calculateCheckSum() const
inline size_t calculateCheckSum() const
{
size_t calculated_check_sum = static_cast<size_t>(CityHash_v1_0_2::CityHash64(block_data + block_header_check_sum_size, block_size - block_header_check_sum_size));
@ -237,7 +236,7 @@ public:
}
/// Check if check sum from block header matched calculated check sum in block
ALWAYS_INLINE inline bool checkCheckSum() const
inline bool checkCheckSum() const
{
size_t calculated_check_sum = calculateCheckSum();
size_t check_sum = getCheckSum();
@ -246,16 +245,16 @@ public:
}
/// Write check sum in block header
ALWAYS_INLINE inline void writeCheckSum()
inline void writeCheckSum()
{
size_t check_sum = static_cast<size_t>(CityHash_v1_0_2::CityHash64(block_data + block_header_check_sum_size, block_size - block_header_check_sum_size));
std::memcpy(block_data, &check_sum, sizeof(size_t));
}
ALWAYS_INLINE inline size_t getBlockSize() const { return block_size; }
inline size_t getBlockSize() const { return block_size; }
/// Returns block data
ALWAYS_INLINE inline char * getBlockData() const { return block_data; }
inline char * getBlockData() const { return block_data; }
/// Read keys that were serialized in block
/// It is client responsibility to ensure that simple or complex keys were written in block
@ -337,9 +336,7 @@ inline bool operator==(const SSDCacheIndex & lhs, const SSDCacheIndex & rhs)
return lhs.block_index == rhs.block_index && lhs.offset_in_block == rhs.offset_in_block;
}
/** SSDCacheMemoryBuffer initialized with block size and memory buffer blocks size.
* Allocate block_size * memory_buffer_blocks_size bytes with page alignment.
* Logically represents multiple memory_buffer_blocks_size blocks and current write block.
/** Logically represents multiple memory_buffer_blocks_size SSDCacheBlocks and current write block.
* If key cannot be written into current_write_block, current block keys size and check summ is written
* and buffer increase index of current_write_block_index.
* If current_write_block_index == memory_buffer_blocks_size write key will always returns true.
@ -444,7 +441,7 @@ private:
size_t current_block_index = 0;
};
/// TODO: Add documentation
/// Logically represents multiple memory_buffer_blocks_size SSDCacheBlocks on file system
template <typename SSDCacheKeyType>
class SSDCacheFileBuffer : private boost::noncopyable
{
@ -614,11 +611,13 @@ public:
}
template <typename FetchBlockFunc>
ALWAYS_INLINE void fetchBlocks(char * read_buffer, size_t read_from_file_buffer_blocks_size, const PaddedPODArray<size_t> & blocks_to_fetch, FetchBlockFunc && func) const
void fetchBlocks(size_t read_from_file_buffer_blocks_size, const PaddedPODArray<size_t> & blocks_to_fetch, FetchBlockFunc && func) const
{
if (blocks_to_fetch.empty())
return;
Memory<Allocator<true>> read_buffer(read_from_file_buffer_blocks_size * block_size, 4096);
size_t blocks_to_fetch_size = blocks_to_fetch.size();
PaddedPODArray<iocb> requests;
@ -631,7 +630,7 @@ public:
{
iocb request{};
char * buffer_place = read_buffer + block_size * (block_to_fetch_index % read_from_file_buffer_blocks_size);
char * buffer_place = read_buffer.data() + block_size * (block_to_fetch_index % read_from_file_buffer_blocks_size);
#if defined(__FreeBSD__)
request.aio.aio_lio_opcode = LIO_READ;
@ -751,7 +750,7 @@ private:
int fd = -1;
};
ALWAYS_INLINE inline static int preallocateDiskSpace(int fd, size_t offset, size_t len)
inline static int preallocateDiskSpace(int fd, size_t offset, size_t len)
{
#if defined(__FreeBSD__)
return posix_fallocate(fd, offset, len);
@ -760,7 +759,7 @@ private:
#endif
}
ALWAYS_INLINE inline static char * getRequestBuffer(const iocb & request)
inline static char * getRequestBuffer(const iocb & request)
{
char * result = nullptr;
@ -773,7 +772,7 @@ private:
return result;
}
ALWAYS_INLINE inline static ssize_t eventResult(io_event & event)
inline static ssize_t eventResult(io_event & event)
{
ssize_t bytes_written;
@ -795,7 +794,13 @@ private:
size_t current_blocks_size = 0;
};
/// TODO: Add documentation
/** ICacheDictionaryStorage implementation that keeps column data serialized in memory index and in disk partitions.
* Data is first written in memory buffer.
* If memory buffer is full then buffer is flushed to disk partition.
* If memory buffer cannot be flushed to associated disk partition, then if partition
* can be allocated (current partition index < max_partitions_size) storage allocates new partition, if not old partitions are reused.
* Index maps key to partition block and offset.
*/
template <DictionaryKeyType dictionary_key_type>
class SSDCacheDictionaryStorage final : public ICacheDictionaryStorage
{
@ -806,9 +811,7 @@ public:
explicit SSDCacheDictionaryStorage(const SSDCacheDictionaryStorageConfiguration & configuration_)
: configuration(configuration_)
, file_buffer(configuration_.file_path, configuration.block_size, configuration.file_blocks_size)
, read_from_file_buffer(configuration_.block_size * configuration_.read_buffer_blocks_size, 4096)
, rnd_engine(randomSeed())
, index(configuration.max_stored_keys, false, { complex_key_arena })
{
memory_buffer_partitions.emplace_back(configuration.block_size, configuration.write_buffer_blocks_size);
}
@ -897,14 +900,31 @@ public:
size_t getSize() const override { return index.size(); }
size_t getMaxSize() const override {return index.getMaxSize(); }
double getLoadFactor() const override
{
size_t partitions_size = memory_buffer_partitions.size();
if (partitions_size == configuration.max_partitions_count)
return 1.0;
auto & current_memory_partition = memory_buffer_partitions[current_partition_index];
size_t full_partitions = partitions_size - 1;
size_t blocks_in_memory = (full_partitions * configuration.write_buffer_blocks_size) + current_memory_partition.getCurrentBlockIndex();
size_t blocks_on_disk = file_buffer.getCurrentBlockIndex();
size_t max_blocks_size = (configuration.file_blocks_size + configuration.write_buffer_blocks_size) * configuration.max_partitions_count;
double load_factor = static_cast<double>(blocks_in_memory + blocks_on_disk) / max_blocks_size;
return load_factor;
}
size_t getBytesAllocated() const override
{
size_t memory_partitions_bytes_size = memory_buffer_partitions.size() * configuration.write_buffer_blocks_size * configuration.block_size;
size_t file_partitions_bytes_size = memory_buffer_partitions.size() * configuration.file_blocks_size * configuration.block_size;
return index.getSizeInBytes() + memory_partitions_bytes_size + file_partitions_bytes_size;
return index.getBufferSizeInBytes() + memory_partitions_bytes_size + file_partitions_bytes_size;
}
private:
@ -920,8 +940,7 @@ private:
default_value
};
TimePoint deadline;
time_t deadline;
SSDCacheIndex index;
size_t in_memory_partition_index;
CellState state;
@ -933,13 +952,12 @@ private:
struct KeyToBlockOffset
{
KeyToBlockOffset(size_t key_index_, size_t offset_in_block_, bool is_expired_)
: key_index(key_index_), offset_in_block(offset_in_block_), is_expired(is_expired_)
KeyToBlockOffset(size_t key_index_, size_t offset_in_block_)
: key_index(key_index_), offset_in_block(offset_in_block_)
{}
size_t key_index = 0;
size_t offset_in_block = 0;
bool is_expired = false;
};
template <typename Result>
@ -950,20 +968,24 @@ private:
Result result;
result.fetched_columns = fetch_request.makeAttributesResultColumns();
result.key_index_to_state.resize_fill(keys.size(), {KeyState::not_found});
result.key_index_to_state.resize_fill(keys.size());
const auto now = std::chrono::system_clock::now();
const time_t now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
size_t fetched_columns_index = 0;
using BlockIndexToKeysMap = std::unordered_map<size_t, std::vector<KeyToBlockOffset>, DefaultHash<size_t>>;
using BlockIndexToKeysMap = absl::flat_hash_map<size_t, PaddedPODArray<KeyToBlockOffset>, DefaultHash<size_t>>;
BlockIndexToKeysMap block_to_keys_map;
absl::flat_hash_set<size_t, DefaultHash<size_t>> unique_blocks_to_request;
PaddedPODArray<size_t> blocks_to_request;
std::chrono::seconds strict_max_lifetime_seconds(configuration.strict_max_lifetime_seconds);
time_t strict_max_lifetime_seconds = static_cast<time_t>(configuration.strict_max_lifetime_seconds);
size_t keys_size = keys.size();
for (size_t attribute_size = 0; attribute_size < fetch_request.attributesSize(); ++attribute_size)
if (fetch_request.shouldFillResultColumnWithIndex(attribute_size))
result.fetched_columns[attribute_size]->reserve(keys_size);
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
auto key = keys[key_index];
@ -978,9 +1000,7 @@ private:
const auto & cell = it->getMapped();
bool has_deadline = cellHasDeadline(cell);
if (has_deadline && now > cell.deadline + strict_max_lifetime_seconds)
if (unlikely(now > cell.deadline + strict_max_lifetime_seconds))
{
++result.not_found_keys_size;
continue;
@ -989,14 +1009,14 @@ private:
bool cell_is_expired = false;
KeyState::State key_state = KeyState::found;
if (has_deadline && now > cell.deadline)
if (now > cell.deadline)
{
cell_is_expired = true;
key_state = KeyState::expired;
}
result.expired_keys_size += cell_is_expired;
result.found_keys_size += !cell_is_expired;
result.expired_keys_size += static_cast<size_t>(cell_is_expired);
result.found_keys_size += static_cast<size_t>(!cell_is_expired);
switch (cell.state)
{
@ -1012,13 +1032,20 @@ private:
}
case Cell::on_disk:
{
block_to_keys_map[cell.index.block_index].emplace_back(key_index, cell.index.offset_in_block, cell_is_expired);
PaddedPODArray<KeyToBlockOffset> & keys_block = block_to_keys_map[cell.index.block_index];
keys_block.emplace_back(key_index, cell.index.offset_in_block);
if (!unique_blocks_to_request.contains(cell.index.block_index))
{
KeyState::State state = cell_is_expired ? KeyState::expired : KeyState::found;
/// Fetched column index will be set later during fetch blocks
result.key_index_to_state[key_index] = {state, 0};
auto insert_result = unique_blocks_to_request.insert(cell.index.block_index);
bool was_inserted = insert_result.second;
if (was_inserted)
blocks_to_request.emplace_back(cell.index.block_index);
unique_blocks_to_request.insert(cell.index.block_index);
}
break;
}
case Cell::default_value:
@ -1037,7 +1064,7 @@ private:
/// Sort blocks by offset before start async io requests
std::sort(blocks_to_request.begin(), blocks_to_request.end());
file_buffer.fetchBlocks(read_from_file_buffer.m_data, configuration.read_buffer_blocks_size, blocks_to_request, [&](size_t block_index, char * block_data)
file_buffer.fetchBlocks(configuration.read_buffer_blocks_size, blocks_to_request, [&](size_t block_index, char * block_data)
{
auto & keys_in_block = block_to_keys_map[block_index];
@ -1046,10 +1073,7 @@ private:
char * key_data = block_data + key_in_block.offset_in_block;
deserializeAndInsertIntoColumns(result.fetched_columns, fetch_request, key_data);
if (key_in_block.is_expired)
result.key_index_to_state[key_in_block.key_index] = {KeyState::expired, fetched_columns_index};
else
result.key_index_to_state[key_in_block.key_index] = {KeyState::found, fetched_columns_index};
result.key_index_to_state[key_in_block.key_index].setFetchedColumnIndex(fetched_columns_index);
++fetched_columns_index;
}
@ -1087,7 +1111,7 @@ private:
throw Exception("Serialized columns size is greater than allowed block size and metadata", ErrorCodes::UNSUPPORTED_METHOD);
/// We cannot reuse place that is already allocated in file or memory cache so we erase key from index
index.erase(key);
eraseKeyFromIndex(key);
Cell cell;
setCellDeadline(cell, now);
@ -1114,8 +1138,7 @@ private:
for (auto key : keys)
{
/// We cannot reuse place that is already allocated in file or memory cache so we erase key from index
index.erase(key);
eraseKeyFromIndex(key);
Cell cell;
@ -1135,7 +1158,7 @@ private:
key = updated_key;
}
index.insert(key, cell);
index[key] = cell;
}
}
@ -1188,7 +1211,7 @@ private:
cell.index = cache_index;
cell.in_memory_partition_index = current_partition_index;
index.insert(ssd_cache_key.key, cell);
index[ssd_cache_key.key] = cell;
break;
}
else
@ -1218,7 +1241,7 @@ private:
if (old_key_cell.isOnDisk() &&
old_key_block >= block_index_in_file_before_write &&
old_key_block < file_read_end_block_index)
index.erase(old_key);
eraseKeyFromIndex(old_key);
}
}
}
@ -1271,7 +1294,7 @@ private:
cell.index = cache_index;
cell.in_memory_partition_index = current_partition_index;
index.insert(ssd_cache_key.key, cell);
index[ssd_cache_key.key] = cell;
break;
}
else
@ -1296,16 +1319,12 @@ private:
}
}
inline static bool cellHasDeadline(const Cell & cell)
{
return cell.deadline != std::chrono::system_clock::from_time_t(0);
}
inline void setCellDeadline(Cell & cell, TimePoint now)
{
if (configuration.lifetime.min_sec == 0 && configuration.lifetime.max_sec == 0)
{
cell.deadline = std::chrono::system_clock::from_time_t(0);
auto deadline = std::chrono::time_point<std::chrono::system_clock>::max() - 2 * std::chrono::seconds(configuration.strict_max_lifetime_seconds);
cell.deadline = std::chrono::system_clock::to_time_t(deadline);
return;
}
@ -1313,47 +1332,45 @@ private:
size_t max_sec_lifetime = configuration.lifetime.max_sec;
std::uniform_int_distribution<UInt64> distribution{min_sec_lifetime, max_sec_lifetime};
cell.deadline = now + std::chrono::seconds{distribution(rnd_engine)};
auto deadline = now + std::chrono::seconds(distribution(rnd_engine));
cell.deadline = std::chrono::system_clock::to_time_t(deadline);
}
template <typename>
friend class ArenaCellKeyDisposer;
inline void eraseKeyFromIndex(KeyType key)
{
auto it = index.find(key);
if (it == nullptr)
return;
/// In case of complex key in arena key is serialized from hash table
KeyType key_copy = it->getKey();
index.erase(key);
if constexpr (std::is_same_v<KeyType, StringRef>)
complex_key_arena.free(const_cast<char *>(key_copy.data), key_copy.size);
}
SSDCacheDictionaryStorageConfiguration configuration;
SSDCacheFileBuffer<SSDCacheKeyType> file_buffer;
Memory<Allocator<true>> read_from_file_buffer;
std::vector<SSDCacheMemoryBuffer<SSDCacheKeyType>> memory_buffer_partitions;
pcg64 rnd_engine;
class ArenaCellKeyDisposer
{
public:
ArenaWithFreeLists & arena;
using SimpleKeyHashMap = HashMap<UInt64, Cell>;
using ComplexKeyHashMap = HashMapWithSavedHash<StringRef, Cell>;
template <typename Key, typename Value>
void operator()(const Key & key, const Value &) const
{
/// In case of complex key we keep it in arena
if constexpr (std::is_same_v<Key, StringRef>)
arena.free(const_cast<char *>(key.data), key.size);
}
};
using SimpleKeyLRUHashMap = LRUHashMap<UInt64, Cell, ArenaCellKeyDisposer>;
using ComplexKeyLRUHashMap = LRUHashMapWithSavedHash<StringRef, Cell, ArenaCellKeyDisposer>;
using CacheLRUHashMap = std::conditional_t<
using CacheMap = std::conditional_t<
dictionary_key_type == DictionaryKeyType::simple,
SimpleKeyLRUHashMap,
ComplexKeyLRUHashMap>;
SimpleKeyHashMap,
ComplexKeyHashMap>;
ArenaWithFreeLists complex_key_arena;
CacheLRUHashMap index;
CacheMap index;
size_t current_partition_index = 0;

View File

@ -1,154 +0,0 @@
clickhouse-client --query="DROP TABLE IF EXISTS simple_cache_dictionary_table_source";
clickhouse-client --query="CREATE TABLE simple_cache_dictionary_table_source (id UInt64, value1 String, value2 UInt64, value3 String, value4 Float64, value5 Decimal64(4)) ENGINE=TinyLog;"
clickhouse-client --query="INSERT INTO simple_cache_dictionary_table_source SELECT number, concat('Value1 ', toString(number)), number, concat('Value3 ', toString(number)), toFloat64(number), cast(number, 'Decimal64(4)') FROM system.numbers LIMIT 1000000;"
clickhouse-client --multiquery --query="CREATE DICTIONARY clickhouse_simple_cache_dictionary (
id UInt64,
value1 String,
value2 UInt64,
value3 String,
value4 Float64,
value5 Decimal64(4)
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'simple_cache_dictionary_table_source' PASSWORD '' DB 'default'))
LIFETIME(MIN 300 MAX 300)
LAYOUT(CACHE(SIZE_IN_CELLS 100000));"
clickhouse-client --multiquery --query="CREATE DICTIONARY clickhouse_ssd_simple_cache_dictionary (
id UInt64,
value1 String,
value2 UInt64,
value3 String,
value4 Float64,
value5 Decimal64(4)
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'simple_cache_dictionary_table_source' PASSWORD '' DB 'default'))
LIFETIME(MIN 300 MAX 300)
LAYOUT(SSD_CACHE(BLOCK_SIZE 4096 FILE_SIZE 16777216 READ_BUFFER_SIZE 1048576 WRITE_BUFFER_SIZE 327680 MAX_STORED_KEYS 1048576 PATH '/opt/mkita/ClickHouse/build_release/programs/ssd_cache'));"
clickhouse-client --multiquery --query="CREATE DICTIONARY clickhouse_dummy_simple_cache_dictionary (
id UInt64,
value1 String,
value2 UInt64,
value3 String,
value4 Float64,
value5 Decimal64(4)
)
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'simple_cache_dictionary_table_source' PASSWORD '' DB 'default'))
LIFETIME(MIN 300 MAX 300)
LAYOUT(DUMMY_SIMPLE());"
./clickhouse-benchmark --query="SELECT
dictGet('default.clickhouse_dummy_simple_cache_dictionary', 'value1', number),
dictGet('default.clickhouse_dummy_simple_cache_dictionary', 'value2', number),
dictGet('default.clickhouse_dummy_simple_cache_dictionary', 'value3', number),
dictGet('default.clickhouse_dummy_simple_cache_dictionary', 'value4', number),
dictGet('default.clickhouse_dummy_simple_cache_dictionary', 'value5', number)
FROM system.numbers
LIMIT 10000
FORMAT Null"
./clickhouse-benchmark --query="SELECT
dictGet('default.clickhouse_simple_cache_dictionary', ('value1', 'value2', 'value3', 'value4', 'value5'), number)
FROM system.numbers
LIMIT 10000
FORMAT Null"
./clickhouse-benchmark --query="SELECT dictGet('default.clickhouse_ssd_simple_cache_dictionary', 'value1', number) FROM system.numbers_mt LIMIT 10000 FORMAT Null"
./clickhouse-benchmark --query="SELECT
dictGet('default.clickhouse_simple_cache_dictionary', 'value1', number),
dictGet('default.clickhouse_simple_cache_dictionary', 'value2', number),
dictGet('default.clickhouse_simple_cache_dictionary', 'value3', number),
dictGet('default.clickhouse_simple_cache_dictionary', 'value4', number),
dictGet('default.clickhouse_simple_cache_dictionary', 'value5', number)
FROM system.numbers
LIMIT 10000
FORMAT Null"
./clickhouse-benchmark --query="SELECT dictGet('default.clickhouse_ssd_simple_cache_dictionary', 'value1', number) FROM system.numbers_mt LIMIT 10000 FORMAT Null"
SELECT
dictGet('default.clickhouse_ssd_simple_cache_dictionary', 'value1', number),
dictGet('default.clickhouse_ssd_simple_cache_dictionary', 'value2', number),
dictGet('default.clickhouse_ssd_simple_cache_dictionary', 'value3', number),
dictGet('default.clickhouse_ssd_simple_cache_dictionary', 'value4', number),
dictGet('default.clickhouse_ssd_simple_cache_dictionary', 'value5', number)
FROM system.numbers
LIMIT 10000
FORMAT Null
SELECT dictGet('default.clickhouse_simple_cache_dictionary', ('value1', 'value2', 'value3', 'value4', 'value5'), number) FROM system.numbers LIMIT 10000 FORMAT Null
SELECT dictGet('default.clickhouse_ssd_simple_cache_dictionary', ('value1', 'value2', 'value3', 'value4', 'value5'), number) FROM system.numbers LIMIT 10000
FORMAT Null
SELECT
dictGet('default.clickhouse_simple_cache_dictionary', ('value1', 'value2', 'value3', 'value4', 'value5'), number)
FROM system.numbers
LIMIT 10000
FORMAT
Null
SELECT
dictGet('default.clickhouse_simple_cache_dictionary', 'value1', number),
dictGet('default.clickhouse_simple_cache_dictionary', 'value2', number),
dictGet('default.clickhouse_simple_cache_dictionary', 'value3', number),
dictGet('default.clickhouse_simple_cache_dictionary', 'value4', number),
dictGet('default.clickhouse_simple_cache_dictionary', 'value5', number)
FROM system.numbers
LIMIT 10000
FORMAT
Null
SELECT
dictGet('default.clickhouse_simple_cache_dictionary', 'value1', number),
dictGet('default.clickhouse_simple_cache_dictionary', 'value2', number)
FROM system.numbers
LIMIT 10000
FORMAT Null
SELECT
dictGet('clickhouse_simple_cache_dictionary', 'value1', number)
FROM system.numbers
LIMIT 100000
FORMAT Null
SELECT
dictGet('clickhouse_simple_cache_dictionary', 'value2', number)
FROM system.numbers
LIMIT 100000
FORMAT Null
SELECT
dictGet('clickhouse_simple_cache_dictionary', 'value3', number)
FROM system.numbers
LIMIT 100000
FORMAT Null
SELECT
dictGet('clickhouse_simple_cache_dictionary', 'value4', number)
FROM system.numbers
LIMIT 100000
FORMAT Null
SELECT
dictGet('clickhouse_simple_cache_dictionary', 'value5', number)
FROM system.numbers
LIMIT 100000
FORMAT Null
SELECT
dictGet('clickhouse_simple_cache_dictionary', 'value1', number),
dictGet('clickhouse_simple_cache_dictionary', 'value2', number),
dictGet('clickhouse_simple_cache_dictionary', 'value3', number),
dictGet('clickhouse_simple_cache_dictionary', 'value4', number),
dictGet('clickhouse_simple_cache_dictionary', 'value5', number)
FROM system.numbers
LIMIT 100000
FORMAT Null
SELECT * FROM clickhouse_simple_cache_dictionary_table;

View File

@ -1,6 +1,6 @@
#include "CacheDictionary.h"
#include "SSDCacheDictionaryStorage.h"
#include "CacheDictionaryStorage.h"
#include "SSDCacheDictionaryStorage.h"
#include <Dictionaries/DictionaryFactory.h>
namespace DB
@ -20,13 +20,13 @@ CacheDictionaryStorageConfiguration parseCacheStorageConfiguration(
const DictionaryLifetime & dict_lifetime,
DictionaryKeyType dictionary_key_type)
{
String dictionary_type_prefix = dictionary_key_type == DictionaryKeyType::complex ? ".complex_key_cache." : ".cache.";
String dictionary_type_prefix = (dictionary_key_type == DictionaryKeyType::complex) ? ".complex_key_cache." : ".cache.";
String dictionary_configuration_prefix = layout_prefix + dictionary_type_prefix;
const size_t size = config.getUInt64(dictionary_configuration_prefix + "size_in_cells");
if (size == 0)
throw Exception(ErrorCodes::TOO_SMALL_BUFFER_SIZE,
"({}: cache dictionary cannot have 0 cells",
"({}): cache dictionary cannot have 0 cells",
full_name);
size_t dict_lifetime_seconds = static_cast<size_t>(dict_lifetime.max_sec);
@ -59,7 +59,6 @@ SSDCacheDictionaryStorageConfiguration parseSSDCacheStorageConfiguration(
static constexpr size_t DEFAULT_READ_BUFFER_SIZE_BYTES = 16 * DEFAULT_SSD_BLOCK_SIZE_BYTES;
static constexpr size_t DEFAULT_WRITE_BUFFER_SIZE_BYTES = DEFAULT_SSD_BLOCK_SIZE_BYTES;
static constexpr size_t DEFAULT_MAX_STORED_KEYS = 100000;
static constexpr size_t DEFAULT_PARTITIONS_COUNT = 16;
const size_t max_partitions_count
@ -94,16 +93,11 @@ SSDCacheDictionaryStorageConfiguration parseSSDCacheStorageConfiguration(
if (directory_path.at(0) != '/')
directory_path = std::filesystem::path{config.getString("path")}.concat(directory_path).string();
const size_t max_stored_keys_in_partition
= config.getInt64(dictionary_configuration_prefix + "max_stored_keys", DEFAULT_MAX_STORED_KEYS);
const size_t rounded_size = roundUpToPowerOfTwoOrZero(max_stored_keys_in_partition);
SSDCacheDictionaryStorageConfiguration configuration{
strict_max_lifetime_seconds,
dict_lifetime,
directory_path,
max_partitions_count,
rounded_size,
block_size,
file_size / block_size,
read_buffer_size / block_size,
@ -194,7 +188,8 @@ DictionaryPtr createCacheDictionaryLayout(
const bool allow_read_expired_keys = config.getBool(layout_prefix + ".cache.allow_read_expired_keys", false);
auto storage_configuration = parseCacheStorageConfiguration(full_name, config, layout_prefix, dict_lifetime, dictionary_key_type);
auto storage = std::make_shared<CacheDictionaryStorage<dictionary_key_type>>(storage_configuration);
std::shared_ptr<ICacheDictionaryStorage> storage = std::make_shared<CacheDictionaryStorage<dictionary_key_type>>(dict_struct, storage_configuration);
auto update_queue_configuration = parseCacheDictionaryUpdateQueueConfiguration(full_name, config, layout_prefix, dictionary_key_type);

View File

@ -7,12 +7,12 @@ class Layout(object):
'flat': '<flat/>',
'hashed': '<hashed/>',
'cache': '<cache><size_in_cells>128</size_in_cells></cache>',
'ssd_cache': '<ssd_cache><path>/etc/clickhouse/dictionaries/all</path><max_stored_keys>128</max_stored_keys></ssd_cache>',
'ssd_cache': '<ssd_cache><path>/etc/clickhouse/dictionaries/all</path></ssd_cache>',
'complex_key_hashed': '<complex_key_hashed/>',
'complex_key_hashed_one_key': '<complex_key_hashed/>',
'complex_key_hashed_two_keys': '<complex_key_hashed/>',
'complex_key_cache': '<complex_key_cache><size_in_cells>128</size_in_cells></complex_key_cache>',
'complex_key_ssd_cache': '<complex_key_ssd_cache><path>/etc/clickhouse/dictionaries/all</path><max_stored_keys>128</max_stored_keys></complex_key_ssd_cache>',
'complex_key_ssd_cache': '<complex_key_ssd_cache><path>/etc/clickhouse/dictionaries/all</path></complex_key_ssd_cache>',
'range_hashed': '<range_hashed/>',
'direct': '<direct/>',
'complex_key_direct': '<complex_key_direct/>'

View File

@ -42,7 +42,6 @@
<read_buffer_size>131072</read_buffer_size>
<write_buffer_size>1048576</write_buffer_size>
<path>/etc/clickhouse/dictionaries/radars</path>
<max_stored_keys>1048576</max_stored_keys>
</complex_key_ssd_cache>
</layout>
<lifetime>1</lifetime>

View File

@ -76,7 +76,7 @@ CREATE DICTIONARY 01053_db.ssd_dict
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'table_for_dict' PASSWORD '' DB '01053_db'))
LIFETIME(MIN 1000 MAX 2000)
LAYOUT(SSD_CACHE(FILE_SIZE 8192 PATH '/var/lib/clickhouse/clickhouse_dicts/1d' BLOCK_SIZE 512 WRITE_BUFFER_SIZE 4096 MAX_STORED_KEYS 1000000));
LAYOUT(SSD_CACHE(FILE_SIZE 8192 PATH '/var/lib/clickhouse/clickhouse_dicts/1d' BLOCK_SIZE 512 WRITE_BUFFER_SIZE 4096));
SELECT 'UPDATE DICTIONARY';
-- 118
@ -142,7 +142,7 @@ CREATE DICTIONARY 01053_db.ssd_dict
PRIMARY KEY id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'table_for_dict' PASSWORD '' DB '01053_db'))
LIFETIME(MIN 1000 MAX 2000)
LAYOUT(SSD_CACHE(FILE_SIZE 8192 PATH '/var/lib/clickhouse/clickhouse_dicts/2d' BLOCK_SIZE 512 WRITE_BUFFER_SIZE 1024 MAX_STORED_KEYS 10));
LAYOUT(SSD_CACHE(FILE_SIZE 8192 PATH '/var/lib/clickhouse/clickhouse_dicts/2d' BLOCK_SIZE 512 WRITE_BUFFER_SIZE 1024));
SELECT 'UPDATE DICTIONARY (MT)';
-- 118

View File

@ -98,7 +98,7 @@ CREATE DICTIONARY 01280_db.ssd_dict
PRIMARY KEY k1, k2
SOURCE(CLICKHOUSE(HOST 'localhost' PORT tcpPort() USER 'default' TABLE 'table_for_dict' PASSWORD '' DB '01280_db'))
LIFETIME(MIN 1000 MAX 2000)
LAYOUT(COMPLEX_KEY_SSD_CACHE(FILE_SIZE 8192 PATH '/var/lib/clickhouse/clickhouse_dicts/1d' BLOCK_SIZE 512 WRITE_BUFFER_SIZE 4096 MAX_STORED_KEYS 1000000));
LAYOUT(COMPLEX_KEY_SSD_CACHE(FILE_SIZE 8192 PATH '/var/lib/clickhouse/clickhouse_dicts/1d' BLOCK_SIZE 512 WRITE_BUFFER_SIZE 4096));
SELECT 'UPDATE DICTIONARY';
-- 118

View File

@ -40,7 +40,7 @@ SELECT dictGetOrDefault('01681_database_for_cache_dictionary.cache_dictionary_si
SELECT 'dictHas';
SELECT dictHas('01681_database_for_cache_dictionary.cache_dictionary_simple_key_simple_attributes', number) FROM system.numbers LIMIT 4;
SELECT 'select all values as input stream';
SELECT * FROM 01681_database_for_cache_dictionary.cache_dictionary_simple_key_simple_attributes;
SELECT * FROM 01681_database_for_cache_dictionary.cache_dictionary_simple_key_simple_attributes ORDER BY id;
DROP DICTIONARY 01681_database_for_cache_dictionary.cache_dictionary_simple_key_simple_attributes;
DROP TABLE 01681_database_for_cache_dictionary.simple_key_simple_attributes_source_table;
@ -84,7 +84,7 @@ SELECT dictGetOrDefault('01681_database_for_cache_dictionary.cache_dictionary_si
SELECT 'dictHas';
SELECT dictHas('01681_database_for_cache_dictionary.cache_dictionary_simple_key_complex_attributes', number) FROM system.numbers LIMIT 4;
SELECT 'select all values as input stream';
SELECT * FROM 01681_database_for_cache_dictionary.cache_dictionary_simple_key_complex_attributes;
SELECT * FROM 01681_database_for_cache_dictionary.cache_dictionary_simple_key_complex_attributes ORDER BY id;
DROP DICTIONARY 01681_database_for_cache_dictionary.cache_dictionary_simple_key_complex_attributes;
DROP TABLE 01681_database_for_cache_dictionary.simple_key_complex_attributes_source_table;

View File

@ -42,7 +42,7 @@ SELECT dictGetOrDefault('01682_database_for_cache_dictionary.cache_dictionary_co
SELECT 'dictHas';
SELECT dictHas('01682_database_for_cache_dictionary.cache_dictionary_complex_key_simple_attributes', (number, concat('id_key_', toString(number)))) FROM system.numbers LIMIT 4;
SELECT 'select all values as input stream';
SELECT * FROM 01682_database_for_cache_dictionary.cache_dictionary_complex_key_simple_attributes;
SELECT * FROM 01682_database_for_cache_dictionary.cache_dictionary_complex_key_simple_attributes ORDER BY id;
DROP DICTIONARY 01682_database_for_cache_dictionary.cache_dictionary_complex_key_simple_attributes;
DROP TABLE 01682_database_for_cache_dictionary.complex_key_simple_attributes_source_table;
@ -89,7 +89,7 @@ SELECT dictGetOrDefault('01682_database_for_cache_dictionary.cache_dictionary_co
SELECT 'dictHas';
SELECT dictHas('01682_database_for_cache_dictionary.cache_dictionary_complex_key_complex_attributes', (number, concat('id_key_', toString(number)))) FROM system.numbers LIMIT 4;
SELECT 'select all values as input stream';
SELECT * FROM 01682_database_for_cache_dictionary.cache_dictionary_complex_key_complex_attributes;
SELECT * FROM 01682_database_for_cache_dictionary.cache_dictionary_complex_key_complex_attributes ORDER BY id;
DROP DICTIONARY 01682_database_for_cache_dictionary.cache_dictionary_complex_key_complex_attributes;
DROP TABLE 01682_database_for_cache_dictionary.complex_key_complex_attributes_source_table;

View File

@ -40,7 +40,7 @@ SELECT dictGetOrDefault('01684_database_for_cache_dictionary.cache_dictionary_si
SELECT 'dictHas';
SELECT dictHas('01684_database_for_cache_dictionary.cache_dictionary_simple_key_simple_attributes', number) FROM system.numbers LIMIT 4;
SELECT 'select all values as input stream';
SELECT * FROM 01684_database_for_cache_dictionary.cache_dictionary_simple_key_simple_attributes;
SELECT * FROM 01684_database_for_cache_dictionary.cache_dictionary_simple_key_simple_attributes ORDER BY id;
DROP DICTIONARY 01684_database_for_cache_dictionary.cache_dictionary_simple_key_simple_attributes;
DROP TABLE 01684_database_for_cache_dictionary.simple_key_simple_attributes_source_table;
@ -84,7 +84,7 @@ SELECT dictGetOrDefault('01684_database_for_cache_dictionary.cache_dictionary_si
SELECT 'dictHas';
SELECT dictHas('01684_database_for_cache_dictionary.cache_dictionary_simple_key_complex_attributes', number) FROM system.numbers LIMIT 4;
SELECT 'select all values as input stream';
SELECT * FROM 01684_database_for_cache_dictionary.cache_dictionary_simple_key_complex_attributes;
SELECT * FROM 01684_database_for_cache_dictionary.cache_dictionary_simple_key_complex_attributes ORDER BY id;
DROP DICTIONARY 01684_database_for_cache_dictionary.cache_dictionary_simple_key_complex_attributes;
DROP TABLE 01684_database_for_cache_dictionary.simple_key_complex_attributes_source_table;

View File

@ -42,7 +42,7 @@ SELECT dictGetOrDefault('01685_database_for_cache_dictionary.cache_dictionary_co
SELECT 'dictHas';
SELECT dictHas('01685_database_for_cache_dictionary.cache_dictionary_complex_key_simple_attributes', (number, concat('id_key_', toString(number)))) FROM system.numbers LIMIT 4;
SELECT 'select all values as input stream';
SELECT * FROM 01685_database_for_cache_dictionary.cache_dictionary_complex_key_simple_attributes;
SELECT * FROM 01685_database_for_cache_dictionary.cache_dictionary_complex_key_simple_attributes ORDER BY id;
DROP DICTIONARY 01685_database_for_cache_dictionary.cache_dictionary_complex_key_simple_attributes;
DROP TABLE 01685_database_for_cache_dictionary.complex_key_simple_attributes_source_table;
@ -89,10 +89,10 @@ SELECT dictGetOrDefault('01685_database_for_cache_dictionary.cache_dictionary_co
SELECT 'dictHas';
SELECT dictHas('01685_database_for_cache_dictionary.cache_dictionary_complex_key_complex_attributes', (number, concat('id_key_', toString(number)))) FROM system.numbers LIMIT 4;
SELECT 'select all values as input stream';
SELECT * FROM 01685_database_for_cache_dictionary.cache_dictionary_complex_key_complex_attributes;
SELECT * FROM 01685_database_for_cache_dictionary.cache_dictionary_complex_key_complex_attributes ORDER BY id;
DROP DICTIONARY 01685_database_for_cache_dictionary.cache_dictionary_complex_key_complex_attributes;
DROP TABLE 01685_database_for_cache_dictionary.complex_key_complex_attributes_source_table;
DROP DATABASE 01685_database_for_cache_dictionary;