ClickHouse/src/Dictionaries/CacheDictionary.cpp

1128 lines
46 KiB
C++
Raw Normal View History

#include "CacheDictionary.h"
2017-04-27 17:16:24 +00:00
#include <memory>
2021-02-16 21:33:02 +00:00
#include <ext/range.h>
#include <ext/size.h>
#include <ext/map.h>
#include <ext/chrono_io.h>
#include <Core/Defines.h>
#include <Common/BitHelpers.h>
#include <Common/CurrentMetrics.h>
#include <Common/HashTable/Hash.h>
2021-02-16 21:33:02 +00:00
#include <Common/HashTable/HashSet.h>
2017-04-08 01:32:05 +00:00
#include <Common/ProfileEvents.h>
#include <Common/ProfilingScopedRWLock.h>
2017-07-13 20:58:19 +00:00
#include <Common/typeid_cast.h>
#include <Common/setThreadName.h>
2021-02-16 21:33:02 +00:00
#include <IO/WriteBufferFromOStream.h>
#include <Dictionaries/DictionaryBlockInputStream.h>
#include <Dictionaries/CacheDictionaryStorage.h>
#include <Dictionaries/SSDCacheDictionaryStorage.h>
#include <Dictionaries/DictionaryFactory.h>
2017-04-08 01:32:05 +00:00
namespace ProfileEvents
{
extern const Event DictCacheKeysRequested;
extern const Event DictCacheKeysRequestedMiss;
extern const Event DictCacheKeysRequestedFound;
extern const Event DictCacheKeysExpired;
extern const Event DictCacheKeysNotFound;
extern const Event DictCacheKeysHit;
extern const Event DictCacheRequestTimeNs;
extern const Event DictCacheRequests;
extern const Event DictCacheLockWriteNs;
extern const Event DictCacheLockReadNs;
2017-04-08 01:32:05 +00:00
}
namespace CurrentMetrics
{
extern const Metric DictCacheRequests;
2017-04-08 01:32:05 +00:00
}
namespace DB
{
namespace ErrorCodes
{
2020-02-25 18:02:41 +00:00
extern const int CACHE_DICTIONARY_UPDATE_FAIL;
extern const int TYPE_MISMATCH;
extern const int BAD_ARGUMENTS;
extern const int UNSUPPORTED_METHOD;
extern const int TOO_SMALL_BUFFER_SIZE;
}
2021-02-16 21:33:02 +00:00
template <DictionaryKeyType dictionary_key_type>
CacheDictionary<dictionary_key_type>::CacheDictionary(
2020-07-14 18:46:29 +00:00
const StorageID & dict_id_,
2019-08-03 11:02:40 +00:00
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
2021-02-16 21:33:02 +00:00
CacheDictionaryStoragePtr cache_storage_ptr_,
CacheDictionaryUpdateQueueConfiguration update_queue_configuration_,
DictionaryLifetime dict_lifetime_,
2021-02-16 21:33:02 +00:00
bool allow_read_expired_keys_)
2020-07-14 18:46:29 +00:00
: IDictionary(dict_id_)
2019-08-03 11:02:40 +00:00
, dict_struct(dict_struct_)
, source_ptr{std::move(source_ptr_)}
2021-02-16 21:33:02 +00:00
, cache_storage_ptr(cache_storage_ptr_)
, update_queue(
dict_id_.getNameForLogs(),
update_queue_configuration_,
2021-03-01 22:23:14 +00:00
[this](CacheDictionaryUpdateUnitPtr<dictionary_key_type> unit_to_update)
2021-02-16 21:33:02 +00:00
{
update(unit_to_update);
})
2019-08-03 11:02:40 +00:00
, dict_lifetime(dict_lifetime_)
2020-05-30 21:57:37 +00:00
, log(&Poco::Logger::get("ExternalDictionaries"))
2021-02-16 21:33:02 +00:00
, allow_read_expired_keys(allow_read_expired_keys_)
, rnd_engine(randomSeed())
{
2020-07-20 13:44:07 +00:00
if (!source_ptr->supportsSelectiveLoad())
throw Exception{full_name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};
2021-02-16 21:33:02 +00:00
setupHierarchicalAttribute();
}
2021-02-16 21:33:02 +00:00
template <DictionaryKeyType dictionary_key_type>
CacheDictionary<dictionary_key_type>::~CacheDictionary()
{
2021-02-16 21:33:02 +00:00
update_queue.stopAndWait();
}
template <DictionaryKeyType dictionary_key_type>
size_t CacheDictionary<dictionary_key_type>::getElementCount() const
{
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
return cache_storage_ptr->getSize();
}
2021-02-16 21:33:02 +00:00
template <DictionaryKeyType dictionary_key_type>
size_t CacheDictionary<dictionary_key_type>::getBytesAllocated() const
2020-08-13 10:45:06 +00:00
{
/// In case of existing string arena we check the size of it.
/// But the same appears in setAttributeValue() function, which is called from update() function
/// which in turn is called from another thread.
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
2021-02-16 21:33:02 +00:00
return cache_storage_ptr->getBytesAllocated();
}
template <DictionaryKeyType dictionary_key_type>
double CacheDictionary<dictionary_key_type>::getLoadFactor() const
{
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
return static_cast<double>(cache_storage_ptr->getSize()) / cache_storage_ptr->getMaxSize();
}
2021-02-16 21:33:02 +00:00
template <DictionaryKeyType dictionary_key_type>
std::exception_ptr CacheDictionary<dictionary_key_type>::getLastException() const
{
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
return last_exception;
2020-08-13 10:45:06 +00:00
}
2021-02-16 21:33:02 +00:00
template <DictionaryKeyType dictionary_key_type>
const IDictionarySource * CacheDictionary<dictionary_key_type>::getSource() const
2020-08-13 10:45:06 +00:00
{
/// Mutex required here because of the getSourceAndUpdateIfNeeded() function
/// which is used from another thread.
std::lock_guard lock(source_mutex);
return source_ptr.get();
}
2021-02-16 21:33:02 +00:00
template <DictionaryKeyType dictionary_key_type>
2021-02-19 13:32:26 +00:00
void CacheDictionary<dictionary_key_type>::toParent(const PaddedPODArray<UInt64> & ids [[maybe_unused]], PaddedPODArray<UInt64> & out [[maybe_unused]]) const
{
2021-02-16 21:33:02 +00:00
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
{
/// Run update on requested keys before fetch from storage
const auto & attribute_name = hierarchical_attribute->name;
auto result_type = std::make_shared<DataTypeUInt64>();
auto column = getColumnsImpl({attribute_name}, {result_type->createColumn()}, ids, {nullptr}).front();
const auto & values = assert_cast<const ColumnVector<UInt64> &>(*column);
out.assign(values.getData());
}
else
2021-02-18 16:57:20 +00:00
throw Exception("Hierarchy is not supported for complex key CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD);
}
/// Allow to use single value in same way as array.
2021-02-16 21:33:02 +00:00
static inline UInt64 getAt(const PaddedPODArray<UInt64> & arr, const size_t idx)
{
return arr[idx];
}
2021-02-16 21:33:02 +00:00
static inline UInt64 getAt(const UInt64 & value, const size_t)
{
return value;
}
2021-02-16 21:33:02 +00:00
template <DictionaryKeyType dictionary_key_type>
template <typename AncestorType>
2021-02-16 21:33:02 +00:00
void CacheDictionary<dictionary_key_type>::isInImpl(const PaddedPODArray<Key> & child_ids, const AncestorType & ancestor_ids, PaddedPODArray<UInt8> & out) const
{
/// Transform all children to parents until ancestor id or null_value will be reached.
size_t out_size = out.size();
memset(out.data(), 0xFF, out_size); /// 0xFF means "not calculated"
2021-02-16 21:33:02 +00:00
const auto null_value = hierarchical_attribute->null_value.get<UInt64>();
PaddedPODArray<Key> children(out_size, 0);
PaddedPODArray<Key> parents(child_ids.begin(), child_ids.end());
for (size_t i = 0; i < DBMS_HIERARCHICAL_DICTIONARY_MAX_DEPTH; ++i)
{
size_t out_idx = 0;
size_t parents_idx = 0;
size_t new_children_idx = 0;
while (out_idx < out_size)
{
/// Already calculated
if (out[out_idx] != 0xFF)
{
++out_idx;
continue;
}
/// No parent
if (parents[parents_idx] == null_value)
{
out[out_idx] = 0;
}
/// Found ancestor
else if (parents[parents_idx] == getAt(ancestor_ids, parents_idx))
{
out[out_idx] = 1;
}
2017-08-10 03:22:43 +00:00
/// Loop detected
else if (children[new_children_idx] == parents[parents_idx])
{
2017-08-07 19:02:30 +00:00
out[out_idx] = 1;
}
2017-08-10 03:22:43 +00:00
/// Found intermediate parent, add this value to search at next loop iteration
else
{
children[new_children_idx] = parents[parents_idx];
++new_children_idx;
}
++out_idx;
++parents_idx;
}
if (new_children_idx == 0)
break;
/// Transform all children to its parents.
children.resize(new_children_idx);
parents.resize(new_children_idx);
toParent(children, parents);
}
}
2021-02-16 21:33:02 +00:00
template <DictionaryKeyType dictionary_key_type>
void CacheDictionary<dictionary_key_type>::isInVectorVector(
const PaddedPODArray<UInt64> & child_ids, const PaddedPODArray<UInt64> & ancestor_ids, PaddedPODArray<UInt8> & out) const
{
isInImpl(child_ids, ancestor_ids, out);
}
2021-02-16 21:33:02 +00:00
template <DictionaryKeyType dictionary_key_type>
void CacheDictionary<dictionary_key_type>::isInVectorConstant(const PaddedPODArray<UInt64> & child_ids, const UInt64 ancestor_id, PaddedPODArray<UInt8> & out) const
{
isInImpl(child_ids, ancestor_id, out);
}
2021-02-16 21:33:02 +00:00
template <DictionaryKeyType dictionary_key_type>
void CacheDictionary<dictionary_key_type>::isInConstantVector(const UInt64 child_id, const PaddedPODArray<UInt64> & ancestor_ids, PaddedPODArray<UInt8> & out) const
{
/// Special case with single child value.
2021-02-16 21:33:02 +00:00
const auto null_value = hierarchical_attribute->null_value.get<UInt64>();
PaddedPODArray<Key> child(1, child_id);
PaddedPODArray<Key> parent(1);
std::vector<Key> ancestors(1, child_id);
/// Iteratively find all ancestors for child.
for (size_t i = 0; i < DBMS_HIERARCHICAL_DICTIONARY_MAX_DEPTH; ++i)
{
toParent(child, parent);
if (parent[0] == null_value)
break;
child[0] = parent[0];
ancestors.push_back(parent[0]);
}
/// Assuming short hierarchy, so linear search is Ok.
for (size_t i = 0, out_size = out.size(); i < out_size; ++i)
out[i] = std::find(ancestors.begin(), ancestors.end(), ancestor_ids[i]) != ancestors.end();
}
2021-02-16 21:33:02 +00:00
template <DictionaryKeyType dictionary_key_type>
void CacheDictionary<dictionary_key_type>::setupHierarchicalAttribute()
{
/// TODO: Move this to DictionaryStructure
2021-02-16 21:33:02 +00:00
for (const auto & attribute : dict_struct.attributes)
{
2021-02-16 21:33:02 +00:00
if (attribute.hierarchical)
{
2021-02-16 21:33:02 +00:00
hierarchical_attribute = &attribute;
2021-02-16 21:33:02 +00:00
if (attribute.underlying_type != AttributeUnderlyingType::utUInt64)
throw Exception{full_name + ": hierarchical attribute must be UInt64.", ErrorCodes::TYPE_MISMATCH};
}
}
}
2021-02-16 21:33:02 +00:00
template <DictionaryKeyType dictionary_key_type>
ColumnPtr CacheDictionary<dictionary_key_type>::getColumn(
const std::string & attribute_name,
const DataTypePtr & result_type,
const Columns & key_columns,
const DataTypes & key_types,
const ColumnPtr & default_values_column) const
{
return getColumns({attribute_name}, {result_type}, key_columns, key_types, {default_values_column}).front();
}
2021-02-16 21:33:02 +00:00
template <DictionaryKeyType dictionary_key_type>
Columns CacheDictionary<dictionary_key_type>::getColumns(
const Strings & attribute_names,
const DataTypes &,
const Columns & key_columns,
2021-02-24 17:13:36 +00:00
const DataTypes & key_types,
2021-02-16 21:33:02 +00:00
const Columns & default_values_columns) const
{
2021-02-24 17:13:36 +00:00
if (dictionary_key_type == DictionaryKeyType::complex)
dict_struct.validateKeyTypes(key_types);
2021-02-27 20:39:34 +00:00
Arena complex_keys_arena;
DictionaryKeysExtractor<dictionary_key_type> extractor(key_columns, complex_keys_arena);
2021-02-16 21:33:02 +00:00
auto & keys = extractor.getKeys();
2021-02-24 17:13:36 +00:00
2021-02-16 21:33:02 +00:00
return getColumnsImpl(attribute_names, key_columns, keys, default_values_columns);
}
2021-02-16 21:33:02 +00:00
template <DictionaryKeyType dictionary_key_type>
Columns CacheDictionary<dictionary_key_type>::getColumnsImpl(
const Strings & attribute_names,
2021-02-19 13:32:26 +00:00
const Columns & key_columns [[maybe_unused]],
2021-02-16 21:33:02 +00:00
const PaddedPODArray<KeyType> & keys,
const Columns & default_values_columns) const
{
DictionaryStorageFetchRequest request(dict_struct, attribute_names);
2021-02-16 21:33:02 +00:00
FetchResult result_of_fetch_from_storage;
{
2021-02-17 11:48:06 +00:00
/// Write lock on storage
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
2021-02-27 16:04:32 +00:00
result_of_fetch_from_storage = cache_storage_ptr->fetchColumnsForKeys(keys, request);
2021-02-16 21:33:02 +00:00
}
size_t found_keys_size = result_of_fetch_from_storage.found_keys_size;
size_t expired_keys_size = result_of_fetch_from_storage.expired_keys_size;
size_t not_found_keys_size = result_of_fetch_from_storage.not_found_keys_size;
ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, found_keys_size);
2021-02-16 21:33:02 +00:00
ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, expired_keys_size);
ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, not_found_keys_size);
2021-02-16 21:33:02 +00:00
query_count.fetch_add(keys.size());
hit_count.fetch_add(found_keys_size);
2021-02-16 21:33:02 +00:00
MutableColumns & fetched_columns_from_storage = result_of_fetch_from_storage.fetched_columns;
const PaddedPODArray<KeyState> & key_index_to_state_from_storage = result_of_fetch_from_storage.key_index_to_state;
bool source_returns_fetched_columns_in_order_of_keys = cache_storage_ptr->returnsFetchedColumnsInOrderOfRequestedKeys();
2021-02-16 21:33:02 +00:00
if (not_found_keys_size == 0 && expired_keys_size == 0)
{
/// All keys were found in storage
if (source_returns_fetched_columns_in_order_of_keys)
return request.filterRequestedColumns(fetched_columns_from_storage);
else
{
/// Reorder result from storage to requested keys indexes
MutableColumns aggregated_columns = aggregateColumnsInOrderOfKeys(
keys,
request,
fetched_columns_from_storage,
key_index_to_state_from_storage);
return request.filterRequestedColumns(aggregated_columns);
}
2021-02-16 21:33:02 +00:00
}
std::shared_ptr<CacheDictionaryUpdateUnit<dictionary_key_type>> update_unit = makeUpdateUnit(key_columns, keys, result_of_fetch_from_storage, request);
HashMap<KeyType, size_t> requested_keys_to_fetched_columns_during_update_index;
MutableColumns fetched_columns_during_update = request.makeAttributesResultColumns();
if (not_found_keys_size == 0 && expired_keys_size > 0 && allow_read_expired_keys)
2021-02-16 21:33:02 +00:00
{
/// Start async update only if allow read expired keys and all keys are found
update_queue.tryPushToUpdateQueueOrThrow(update_unit);
if (source_returns_fetched_columns_in_order_of_keys)
return request.filterRequestedColumns(fetched_columns_from_storage);
else
{
/// Reorder result from storage to requested keys indexes
MutableColumns aggregated_columns = aggregateColumnsInOrderOfKeys(
keys,
request,
fetched_columns_from_storage,
key_index_to_state_from_storage);
return request.filterRequestedColumns(aggregated_columns);
}
2021-02-16 21:33:02 +00:00
}
else
{
/// Start sync update
update_queue.tryPushToUpdateQueueOrThrow(update_unit);
update_queue.waitForCurrentUpdateFinish(update_unit);
2021-02-16 21:33:02 +00:00
requested_keys_to_fetched_columns_during_update_index = std::move(update_unit->requested_keys_to_fetched_columns_during_update_index);
fetched_columns_during_update = std::move(update_unit->fetched_columns_during_update);
}
2021-02-16 21:33:02 +00:00
std::vector<DefaultValueProvider> default_value_providers;
default_value_providers.reserve(dict_struct.attributes.size());
2021-02-16 21:33:02 +00:00
size_t default_values_column_index = 0;
for (const auto & dictionary_attribute : dict_struct.attributes)
{
2021-02-16 21:33:02 +00:00
if (request.containsAttribute(dictionary_attribute.name))
{
2021-02-16 21:33:02 +00:00
default_value_providers.emplace_back(dictionary_attribute.null_value, default_values_columns[default_values_column_index]);
++default_values_column_index;
}
2021-02-16 21:33:02 +00:00
else
default_value_providers.emplace_back(dictionary_attribute.null_value);
}
2021-02-16 21:33:02 +00:00
MutableColumns aggregated_columns = aggregateColumns(
keys,
request,
fetched_columns_from_storage,
key_index_to_state_from_storage,
2021-02-16 21:33:02 +00:00
fetched_columns_during_update,
requested_keys_to_fetched_columns_during_update_index,
default_value_providers);
return request.filterRequestedColumns(aggregated_columns);
}
2021-02-16 21:33:02 +00:00
template <DictionaryKeyType dictionary_key_type>
2021-03-01 22:23:14 +00:00
ColumnUInt8::Ptr CacheDictionary<dictionary_key_type>::hasKeys(const Columns & key_columns, const DataTypes & key_types) const
{
2021-03-01 22:23:14 +00:00
if (dictionary_key_type == DictionaryKeyType::complex)
dict_struct.validateKeyTypes(key_types);
2021-02-27 20:39:34 +00:00
Arena complex_keys_arena;
DictionaryKeysExtractor<dictionary_key_type> extractor(key_columns, complex_keys_arena);
2021-02-16 21:33:02 +00:00
const auto & keys = extractor.getKeys();
2021-02-18 16:57:20 +00:00
/// We make empty request just to fetch if keys exists
2021-02-16 21:33:02 +00:00
DictionaryStorageFetchRequest request(dict_struct, {});
2021-02-16 21:33:02 +00:00
FetchResult result_of_fetch_from_storage;
{
2021-02-17 11:48:06 +00:00
/// Write lock on storage
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
2021-03-02 13:07:17 +00:00
result_of_fetch_from_storage = cache_storage_ptr->fetchColumnsForKeys(keys, request);
2021-02-16 21:33:02 +00:00
}
size_t found_keys_size = result_of_fetch_from_storage.found_keys_size;
size_t expired_keys_size = result_of_fetch_from_storage.expired_keys_size;
size_t not_found_keys_size = result_of_fetch_from_storage.not_found_keys_size;
ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, found_keys_size);
2021-02-16 21:33:02 +00:00
ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, expired_keys_size);
ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, not_found_keys_size);
query_count.fetch_add(keys.size());
hit_count.fetch_add(found_keys_size);
CacheDictionaryUpdateUnitPtr<dictionary_key_type> update_unit = makeUpdateUnit(key_columns, keys, result_of_fetch_from_storage, request);
2021-02-16 21:33:02 +00:00
HashMap<KeyType, size_t> requested_keys_to_fetched_columns_during_update_index;
2021-02-16 21:33:02 +00:00
if (not_found_keys_size == 0 && expired_keys_size == 0)
{
2021-02-16 21:33:02 +00:00
/// All keys were found in storage
return ColumnUInt8::create(keys.size(), true);
}
else if (not_found_keys_size == 0 && expired_keys_size > 0 && allow_read_expired_keys)
{
/// Start async update only if allow read expired keys and all keys are found
update_queue.tryPushToUpdateQueueOrThrow(update_unit);
2021-02-16 21:33:02 +00:00
return ColumnUInt8::create(keys.size(), true);
}
else if (not_found_keys_size > 0)
{
/// Start sync update
update_queue.tryPushToUpdateQueueOrThrow(update_unit);
update_queue.waitForCurrentUpdateFinish(update_unit);
2021-02-16 21:33:02 +00:00
requested_keys_to_fetched_columns_during_update_index = std::move(update_unit->requested_keys_to_fetched_columns_during_update_index);
}
2021-02-16 21:33:02 +00:00
auto result = ColumnUInt8::create(keys.size(), false);
auto & data = result->getData();
2021-02-16 21:33:02 +00:00
for (size_t key_index = 0; key_index < keys.size(); ++key_index)
{
auto key = keys[key_index];
if (result_of_fetch_from_storage.key_index_to_state[key_index].isFound())
{
2021-02-16 21:33:02 +00:00
/// Check if key was fetched from cache
data[key_index] = true;
}
else if (requested_keys_to_fetched_columns_during_update_index.has(key))
{
/// Check if key was not in cache and was fetched during update
data[key_index] = true;
}
}
2021-02-16 21:33:02 +00:00
return result;
}
template <DictionaryKeyType dictionary_key_type>
CacheDictionaryUpdateUnitPtr<dictionary_key_type> CacheDictionary<dictionary_key_type>::makeUpdateUnit(
const Columns & key_columns [[maybe_unused]],
const PaddedPODArray<KeyType> & keys,
const KeysStorageFetchResult<KeyType> & storage_fetch_result,
const DictionaryStorageFetchRequest & fetch_request) const
{
CacheDictionaryUpdateUnitPtr<dictionary_key_type> update_unit;
auto & key_index_to_state = storage_fetch_result.key_index_to_state;
size_t update_keys_size = storage_fetch_result.expired_keys_size + storage_fetch_result.not_found_keys_size;
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
{
PaddedPODArray<UInt64> not_found_or_expired_keys;
not_found_or_expired_keys.reserve(update_keys_size);
for (size_t i = 0; i < keys.size(); ++i)
{
auto key_state = key_index_to_state[i];
if (key_state.isExpired() || key_state.isNotFound())
not_found_or_expired_keys.emplace_back(keys[i]);
}
update_unit = std::make_shared<CacheDictionaryUpdateUnit<dictionary_key_type>>(std::move(not_found_or_expired_keys), fetch_request);
}
else
{
std::vector<size_t> not_found_or_expired_indexes;
not_found_or_expired_indexes.reserve(update_keys_size);
for (size_t i = 0; i < keys.size(); ++i)
{
auto key_state = key_index_to_state[i];
if (key_state.isExpired() || key_state.isNotFound())
not_found_or_expired_indexes.emplace_back(i);
}
update_unit = std::make_shared<CacheDictionaryUpdateUnit<dictionary_key_type>>(key_columns, std::move(not_found_or_expired_indexes), fetch_request);
}
return update_unit;
}
template <DictionaryKeyType dictionary_key_type>
MutableColumns CacheDictionary<dictionary_key_type>::aggregateColumnsInOrderOfKeys(
const PaddedPODArray<KeyType> & keys,
const DictionaryStorageFetchRequest & request,
const MutableColumns & fetched_columns,
const PaddedPODArray<KeyState> & key_index_to_state)
{
MutableColumns aggregated_columns = request.makeAttributesResultColumns();
2021-02-16 21:33:02 +00:00
for (size_t fetch_request_index = 0; fetch_request_index < request.attributesSize(); ++fetch_request_index)
{
if (!request.shouldFillResultColumnWithIndex(fetch_request_index))
continue;
2021-02-16 21:33:02 +00:00
const auto & aggregated_column = aggregated_columns[fetch_request_index];
const auto & fetched_column = fetched_columns[fetch_request_index];
2021-02-16 21:33:02 +00:00
for (size_t key_index = 0; key_index < keys.size(); ++key_index)
{
auto state = key_index_to_state[key_index];
2021-02-16 21:33:02 +00:00
if (state.isNotFound())
continue;
aggregated_column->insertFrom(*fetched_column, state.fetched_column_index);
}
}
return aggregated_columns;
}
2021-02-16 21:33:02 +00:00
template <DictionaryKeyType dictionary_key_type>
MutableColumns CacheDictionary<dictionary_key_type>::aggregateColumns(
const PaddedPODArray<KeyType> & keys,
const DictionaryStorageFetchRequest & request,
const MutableColumns & fetched_columns_from_storage,
const PaddedPODArray<KeyState> & key_index_to_fetched_columns_from_storage_result,
2021-02-16 21:33:02 +00:00
const MutableColumns & fetched_columns_during_update,
const HashMap<KeyType, size_t> & found_keys_to_fetched_columns_during_update_index,
const std::vector<DefaultValueProvider> & default_value_providers)
{
MutableColumns aggregated_columns = request.makeAttributesResultColumns();
2021-02-16 21:33:02 +00:00
for (size_t fetch_request_index = 0; fetch_request_index < request.attributesSize(); ++fetch_request_index)
{
2021-02-16 21:33:02 +00:00
if (!request.shouldFillResultColumnWithIndex(fetch_request_index))
continue;
2021-02-16 21:33:02 +00:00
const auto & aggregated_column = aggregated_columns[fetch_request_index];
const auto & fetched_column_from_storage = fetched_columns_from_storage[fetch_request_index];
const auto & fetched_column_during_update = fetched_columns_during_update[fetch_request_index];
const auto & default_value_provider = default_value_providers[fetch_request_index];
2021-02-16 21:33:02 +00:00
for (size_t key_index = 0; key_index < keys.size(); ++key_index)
{
auto key = keys[key_index];
auto key_state_from_storage = key_index_to_fetched_columns_from_storage_result[key_index];
if (key_state_from_storage.isFound())
{
2021-02-16 21:33:02 +00:00
/// Check and insert value if key was fetched from cache
aggregated_column->insertFrom(*fetched_column_from_storage, key_state_from_storage.fetched_column_index);
2021-02-16 21:33:02 +00:00
continue;
}
2021-02-16 21:33:02 +00:00
/// Check and insert value if key was not in cache and was fetched during update
const auto * find_iterator_in_fetch_during_update = found_keys_to_fetched_columns_during_update_index.find(key);
if (find_iterator_in_fetch_during_update)
{
aggregated_column->insertFrom(*fetched_column_during_update, find_iterator_in_fetch_during_update->getMapped());
continue;
}
2021-02-16 21:33:02 +00:00
/// Insert default value
aggregated_column->insert(default_value_provider.getDefaultValue(key_index));
}
}
2021-02-16 21:33:02 +00:00
return aggregated_columns;
}
template <DictionaryKeyType dictionary_key_type>
BlockInputStreamPtr CacheDictionary<dictionary_key_type>::getBlockInputStream(const Names & column_names, size_t max_block_size) const
{
using BlockInputStreamType = DictionaryBlockInputStream<Key>;
2021-02-16 21:33:02 +00:00
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
return std::make_shared<BlockInputStreamType>(shared_from_this(), max_block_size, cache_storage_ptr->getCachedSimpleKeys(), column_names);
else
{
2021-02-16 21:33:02 +00:00
auto keys = cache_storage_ptr->getCachedComplexKeys();
return std::make_shared<BlockInputStreamType>(shared_from_this(), max_block_size, keys, column_names);
}
2021-02-16 21:33:02 +00:00
}
2021-02-16 21:33:02 +00:00
template <DictionaryKeyType dictionary_key_type>
2021-03-01 22:23:14 +00:00
void CacheDictionary<dictionary_key_type>::update(CacheDictionaryUpdateUnitPtr<dictionary_key_type> update_unit_ptr)
2021-02-16 21:33:02 +00:00
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
2021-02-16 21:33:02 +00:00
size_t found_num = 0;
2021-02-16 21:33:02 +00:00
std::vector<UInt64> requested_keys_vector;
2021-02-16 21:33:02 +00:00
size_t requested_keys_size = 0;
2021-02-16 21:33:02 +00:00
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
{
const PaddedPODArray<UInt64> & requested_keys = update_unit_ptr->requested_simple_keys;
2021-02-16 21:33:02 +00:00
requested_keys_vector.reserve(requested_keys.size());
requested_keys_vector.assign(requested_keys.begin(), requested_keys.end());
2021-02-16 21:33:02 +00:00
requested_keys_size = requested_keys.size();
}
else
requested_keys_size = update_unit_ptr->requested_complex_key_rows.size();
2020-09-17 18:57:57 +00:00
2021-03-01 22:23:14 +00:00
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_keys_size);
2021-02-16 21:33:02 +00:00
const auto & fetch_request = update_unit_ptr->request;
2020-09-17 18:57:57 +00:00
2021-02-16 21:33:02 +00:00
const auto now = std::chrono::system_clock::now();
2021-02-16 21:33:02 +00:00
if (now > backoff_end_time.load())
2020-10-02 19:09:48 +00:00
{
2021-02-16 21:33:02 +00:00
try
{
auto current_source_ptr = getSourceAndUpdateIfNeeded();
2020-10-02 19:09:48 +00:00
2021-02-16 21:33:02 +00:00
Stopwatch watch;
BlockInputStreamPtr stream;
2020-10-02 19:09:48 +00:00
2021-02-16 21:33:02 +00:00
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
stream = current_source_ptr->loadIds(requested_keys_vector);
else
{
const auto & requested_complex_keys_columns = update_unit_ptr->requested_complex_key_columns;
const auto & requested_complex_keys_rows = update_unit_ptr->requested_complex_key_rows;
2020-10-02 19:09:48 +00:00
2021-02-16 21:33:02 +00:00
stream = current_source_ptr->loadKeys(requested_complex_keys_columns, requested_complex_keys_rows);
}
2020-11-13 16:16:56 +00:00
2021-02-16 21:33:02 +00:00
stream->readPrefix();
2020-10-02 19:09:48 +00:00
2021-02-16 21:33:02 +00:00
/// Lock for cache modification
ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
2020-10-02 19:09:48 +00:00
2021-02-16 21:33:02 +00:00
size_t skip_keys_size_offset = dict_struct.getKeysSize();
2021-02-16 21:33:02 +00:00
while (Block block = stream->read())
{
2021-02-16 21:33:02 +00:00
Columns key_columns;
key_columns.reserve(skip_keys_size_offset);
2021-02-16 21:33:02 +00:00
auto block_columns = block.getColumns();
2021-02-16 21:33:02 +00:00
/// Remove keys columns
for (size_t i = 0; i < skip_keys_size_offset; ++i)
{
key_columns.emplace_back(*block_columns.begin());
block_columns.erase(block_columns.begin());
}
2021-02-27 20:39:34 +00:00
DictionaryKeysExtractor<dictionary_key_type> keys_extractor(key_columns, update_unit_ptr->complex_key_arena);
2021-02-16 21:33:02 +00:00
const auto & keys = keys_extractor.getKeys();
2021-02-16 21:33:02 +00:00
cache_storage_ptr->insertColumnsForKeys(keys, block_columns);
2021-02-16 21:33:02 +00:00
for (size_t index_of_attribute = 0; index_of_attribute < update_unit_ptr->fetched_columns_during_update.size(); ++index_of_attribute)
{
auto & column_to_update = update_unit_ptr->fetched_columns_during_update[index_of_attribute];
2020-09-30 14:35:37 +00:00
2021-02-16 21:33:02 +00:00
if (fetch_request.shouldFillResultColumnWithIndex(index_of_attribute))
{
auto column = block.safeGetByPosition(skip_keys_size_offset + index_of_attribute).column;
column_to_update->insertRangeFrom(*column, 0, keys.size());
}
}
2021-02-16 21:33:02 +00:00
for (size_t i = 0; i < keys.size(); ++i)
{
auto fetched_key_from_source = keys[i];
2021-02-18 16:57:20 +00:00
size_t column_offset = found_num;
update_unit_ptr->requested_keys_to_fetched_columns_during_update_index[fetched_key_from_source] = column_offset + i;
2021-02-16 21:33:02 +00:00
}
2021-02-18 16:57:20 +00:00
found_num += keys.size();
2021-02-16 21:33:02 +00:00
}
2019-12-19 18:22:04 +00:00
2021-02-16 21:33:02 +00:00
stream->readSuffix();
2020-09-17 18:57:57 +00:00
2021-02-16 21:33:02 +00:00
error_count = 0;
last_exception = std::exception_ptr{};
backoff_end_time = std::chrono::system_clock::time_point{};
2021-02-16 21:33:02 +00:00
ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
}
catch (...)
{
2021-02-16 21:33:02 +00:00
/// Lock just for last_exception safety
ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
++error_count;
last_exception = std::current_exception();
backoff_end_time = now + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count));
2021-02-16 21:33:02 +00:00
tryLogException(last_exception, log,
"Could not update cache dictionary '" + getDictionaryID().getNameForLogs() +
"', next update is scheduled at " + ext::to_string(backoff_end_time.load()));
try
2020-11-13 16:16:56 +00:00
{
2021-02-16 21:33:02 +00:00
std::rethrow_exception(last_exception);
2020-11-13 16:16:56 +00:00
}
2021-02-16 21:33:02 +00:00
catch (...)
2020-11-13 16:16:56 +00:00
{
2021-02-16 21:33:02 +00:00
throw DB::Exception(ErrorCodes::CACHE_DICTIONARY_UPDATE_FAIL,
"Update failed for dictionary {} : {}",
getDictionaryID().getNameForLogs(),
getCurrentExceptionMessage(true /*with stack trace*/,
true /*check embedded stack trace*/));
2020-11-13 16:16:56 +00:00
}
}
2021-02-16 21:33:02 +00:00
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, requested_keys_size - found_num);
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedFound, found_num);
ProfileEvents::increment(ProfileEvents::DictCacheRequests);
}
else
{
/// Won't request source for keys
throw DB::Exception(ErrorCodes::CACHE_DICTIONARY_UPDATE_FAIL,
"Query contains keys that are not present in cache or expired. Could not update cache dictionary {} now, because nearest update is scheduled at {}. Try again later.",
getDictionaryID().getNameForLogs(),
ext::to_string(backoff_end_time.load()));
}
2021-02-16 21:33:02 +00:00
}
2021-02-16 21:33:02 +00:00
template class CacheDictionary<DictionaryKeyType::simple>;
template class CacheDictionary<DictionaryKeyType::complex>;
2021-02-16 21:33:02 +00:00
namespace
{
2019-12-26 18:56:34 +00:00
2021-02-16 21:33:02 +00:00
CacheDictionaryStorageConfiguration parseCacheStorageConfiguration(
const std::string & full_name,
const Poco::Util::AbstractConfiguration & config,
const std::string & layout_prefix,
const DictionaryLifetime & dict_lifetime,
bool is_complex)
{
2021-02-16 21:33:02 +00:00
std::string dictionary_type_prefix = is_complex ? ".complex_key_cache." : ".cache.";
std::string dictionary_configuration_prefix = layout_prefix + dictionary_type_prefix;
2021-02-16 21:33:02 +00:00
const size_t size = config.getUInt64(dictionary_configuration_prefix + "size_in_cells");
if (size == 0)
throw Exception{full_name + ": dictionary of layout 'cache' cannot have 0 cells",
ErrorCodes::TOO_SMALL_BUFFER_SIZE};
2021-02-16 21:33:02 +00:00
const size_t strict_max_lifetime_seconds =
config.getUInt64(dictionary_configuration_prefix + "strict_max_lifetime_seconds",
static_cast<size_t>(dict_lifetime.max_sec));
2021-02-27 16:04:32 +00:00
size_t rounded_size = roundUpToPowerOfTwoOrZero(size);
2021-02-16 21:33:02 +00:00
CacheDictionaryStorageConfiguration storage_configuration {
2021-02-27 16:04:32 +00:00
rounded_size,
2021-02-16 21:33:02 +00:00
strict_max_lifetime_seconds,
dict_lifetime
};
2020-09-17 18:57:57 +00:00
2021-02-16 21:33:02 +00:00
return storage_configuration;
2020-09-17 18:57:57 +00:00
}
2021-02-25 14:39:05 +00:00
#if defined(OS_LINUX) || defined(__FreeBSD__)
2021-02-16 21:33:02 +00:00
SSDCacheDictionaryStorageConfiguration parseSSDCacheStorageConfiguration(
const std::string & full_name,
const Poco::Util::AbstractConfiguration & config,
const std::string & layout_prefix,
const DictionaryLifetime & dict_lifetime,
bool is_complex)
{
2021-02-16 21:33:02 +00:00
std::string dictionary_type_prefix = is_complex ? ".complex_key_ssd_cache." : ".ssd_cache.";
std::string dictionary_configuration_prefix = layout_prefix + dictionary_type_prefix;
2021-02-16 21:33:02 +00:00
const size_t strict_max_lifetime_seconds =
config.getUInt64(dictionary_configuration_prefix + "strict_max_lifetime_seconds",
static_cast<size_t>(dict_lifetime.max_sec));
2021-02-16 21:33:02 +00:00
static constexpr size_t DEFAULT_SSD_BLOCK_SIZE_BYTES = DEFAULT_AIO_FILE_BLOCK_SIZE;
static constexpr size_t DEFAULT_FILE_SIZE_BYTES = 4 * 1024 * 1024 * 1024ULL;
static constexpr size_t DEFAULT_READ_BUFFER_SIZE_BYTES = 16 * DEFAULT_SSD_BLOCK_SIZE_BYTES;
static constexpr size_t DEFAULT_WRITE_BUFFER_SIZE_BYTES = DEFAULT_SSD_BLOCK_SIZE_BYTES;
2021-02-16 21:33:02 +00:00
static constexpr size_t DEFAULT_MAX_STORED_KEYS = 100000;
static constexpr size_t DEFAULT_PARTITIONS_COUNT = 16;
2021-02-24 17:13:36 +00:00
const size_t max_partitions_count = config.getInt64(dictionary_configuration_prefix + "ssd_cache.max_partitions_count", DEFAULT_PARTITIONS_COUNT);
2021-02-24 17:13:36 +00:00
const size_t block_size = config.getInt64(dictionary_configuration_prefix + "block_size", DEFAULT_SSD_BLOCK_SIZE_BYTES);
const size_t file_size = config.getInt64(dictionary_configuration_prefix + "file_size", DEFAULT_FILE_SIZE_BYTES);
if (file_size % block_size != 0)
2021-02-16 21:33:02 +00:00
throw Exception{full_name + ": file_size must be a multiple of block_size", ErrorCodes::BAD_ARGUMENTS};
2021-02-24 17:13:36 +00:00
const size_t read_buffer_size = config.getInt64(dictionary_configuration_prefix + "read_buffer_size", DEFAULT_READ_BUFFER_SIZE_BYTES);
if (read_buffer_size % block_size != 0)
2021-02-16 21:33:02 +00:00
throw Exception{full_name + ": read_buffer_size must be a multiple of block_size", ErrorCodes::BAD_ARGUMENTS};
2021-02-24 17:13:36 +00:00
const size_t write_buffer_size = config.getInt64(dictionary_configuration_prefix + "write_buffer_size", DEFAULT_WRITE_BUFFER_SIZE_BYTES);
if (write_buffer_size % block_size != 0)
2021-02-16 21:33:02 +00:00
throw Exception{full_name + ": write_buffer_size must be a multiple of block_size", ErrorCodes::BAD_ARGUMENTS};
2021-02-16 21:33:02 +00:00
auto directory_path = config.getString(dictionary_configuration_prefix + "path");
if (directory_path.empty())
throw Exception{full_name + ": dictionary of layout 'ssd_cache' cannot have empty path",
ErrorCodes::BAD_ARGUMENTS};
if (directory_path.at(0) != '/')
directory_path = std::filesystem::path{config.getString("path")}.concat(directory_path).string();
const size_t max_stored_keys_in_partition = config.getInt64(dictionary_configuration_prefix + "max_stored_keys", DEFAULT_MAX_STORED_KEYS);
2021-02-28 13:30:03 +00:00
const size_t rounded_size = roundUpToPowerOfTwoOrZero(max_stored_keys_in_partition);
2021-02-16 21:33:02 +00:00
SSDCacheDictionaryStorageConfiguration configuration {
strict_max_lifetime_seconds,
dict_lifetime,
directory_path,
max_partitions_count,
2021-02-28 13:30:03 +00:00
rounded_size,
2021-02-16 21:33:02 +00:00
block_size,
2021-02-24 17:13:36 +00:00
file_size / block_size,
read_buffer_size / block_size,
write_buffer_size / block_size
2021-02-16 21:33:02 +00:00
};
2021-02-16 21:33:02 +00:00
return configuration;
}
2021-02-25 14:39:05 +00:00
#endif
2021-02-16 21:33:02 +00:00
CacheDictionaryUpdateQueueConfiguration parseCacheDictionaryUpdateQueueConfiguration(
const std::string & full_name,
const Poco::Util::AbstractConfiguration & config,
const std::string & layout_prefix,
bool is_complex)
{
2021-02-16 21:33:02 +00:00
std::string type = is_complex ? "complex_key_cache" : "cache";
2021-02-16 21:33:02 +00:00
const size_t max_update_queue_size =
config.getUInt64(layout_prefix + ".cache.max_update_queue_size", 100000);
if (max_update_queue_size == 0)
throw Exception{full_name + ": dictionary of layout'" + type + "'cannot have empty update queue of size 0",
ErrorCodes::TOO_SMALL_BUFFER_SIZE};
2021-02-16 21:33:02 +00:00
const size_t update_queue_push_timeout_milliseconds =
config.getUInt64(layout_prefix + ".cache.update_queue_push_timeout_milliseconds", 10);
if (update_queue_push_timeout_milliseconds < 10)
throw Exception{full_name + ": dictionary of layout'" + type + "'have too little update_queue_push_timeout",
ErrorCodes::BAD_ARGUMENTS};
2021-02-16 21:33:02 +00:00
const size_t query_wait_timeout_milliseconds =
config.getUInt64(layout_prefix + ".cache.query_wait_timeout_milliseconds", 60000);
2017-04-27 17:16:24 +00:00
2021-02-16 21:33:02 +00:00
const size_t max_threads_for_updates =
config.getUInt64(layout_prefix + ".max_threads_for_updates", 4);
if (max_threads_for_updates == 0)
throw Exception{full_name + ": dictionary of layout'"+ type +"'cannot have zero threads for updates.",
ErrorCodes::BAD_ARGUMENTS};
2020-10-02 14:26:39 +00:00
2021-02-16 21:33:02 +00:00
CacheDictionaryUpdateQueueConfiguration update_queue_configuration {
max_update_queue_size,
max_threads_for_updates,
2021-02-16 21:33:02 +00:00
update_queue_push_timeout_milliseconds,
query_wait_timeout_milliseconds };
2021-02-16 21:33:02 +00:00
return update_queue_configuration;
2017-04-27 17:16:24 +00:00
}
}
void registerDictionaryCache(DictionaryFactory & factory)
{
2021-02-16 21:33:02 +00:00
auto create_simple_cache_layout = [=](const std::string & full_name,
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
2018-12-10 15:50:58 +00:00
DictionarySourcePtr source_ptr) -> DictionaryPtr
{
if (dict_struct.key)
throw Exception{"'key' is not supported for dictionary of layout 'cache'",
ErrorCodes::UNSUPPORTED_METHOD};
if (dict_struct.range_min || dict_struct.range_max)
throw Exception{full_name
+ ": elements .structure.range_min and .structure.range_max should be defined only "
"for a dictionary of layout 'range_hashed'",
ErrorCodes::BAD_ARGUMENTS};
const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
if (require_nonempty)
throw Exception{full_name + ": dictionary of layout 'cache' cannot have 'require_nonempty' attribute set",
ErrorCodes::BAD_ARGUMENTS};
2021-02-16 21:33:02 +00:00
const auto & layout_prefix = config_prefix + ".layout";
2020-04-17 17:01:18 +00:00
2021-02-16 21:33:02 +00:00
const auto dict_id = StorageID::fromDictionaryConfig(config, config_prefix);
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
const bool allow_read_expired_keys =
config.getBool(layout_prefix + ".cache.allow_read_expired_keys", false);
2021-02-16 21:33:02 +00:00
auto storage_configuration = parseCacheStorageConfiguration(full_name, config, layout_prefix, dict_lifetime, false);
auto storage = std::make_shared<CacheDictionaryStorage<DictionaryKeyType::simple>>(storage_configuration);
2021-02-16 21:33:02 +00:00
auto update_queue_configuration = parseCacheDictionaryUpdateQueueConfiguration(full_name, config, layout_prefix, false);
2020-04-17 17:01:18 +00:00
2021-02-16 21:33:02 +00:00
return std::make_unique<CacheDictionary<DictionaryKeyType::simple>>(
2020-07-14 19:19:17 +00:00
dict_id,
2020-04-17 17:01:18 +00:00
dict_struct,
std::move(source_ptr),
2021-02-16 21:33:02 +00:00
storage,
update_queue_configuration,
2020-04-17 17:01:18 +00:00
dict_lifetime,
2021-02-16 21:33:02 +00:00
allow_read_expired_keys);
};
2021-02-16 21:33:02 +00:00
factory.registerLayout("cache", create_simple_cache_layout, false);
2021-02-16 21:33:02 +00:00
auto create_complex_key_cache_layout = [=](const std::string & full_name,
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr) -> DictionaryPtr
2020-04-17 17:01:18 +00:00
{
2021-02-16 21:33:02 +00:00
if (dict_struct.id)
throw Exception{"'id' is not supported for dictionary of layout 'complex_key_cache'",
ErrorCodes::UNSUPPORTED_METHOD};
2020-04-17 17:01:18 +00:00
2021-02-16 21:33:02 +00:00
if (dict_struct.range_min || dict_struct.range_max)
throw Exception{full_name
+ ": elements .structure.range_min and .structure.range_max should be defined only "
"for a dictionary of layout 'range_hashed'",
ErrorCodes::BAD_ARGUMENTS};
2020-04-17 17:01:18 +00:00
2021-02-16 21:33:02 +00:00
const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
if (require_nonempty)
throw Exception{full_name + ": dictionary of layout 'cache' cannot have 'require_nonempty' attribute set",
ErrorCodes::BAD_ARGUMENTS};
2021-02-16 21:33:02 +00:00
const auto & layout_prefix = config_prefix + ".layout";
2020-09-28 14:48:32 +00:00
2021-02-16 21:33:02 +00:00
const auto dict_id = StorageID::fromDictionaryConfig(config, config_prefix);
2020-09-28 14:48:32 +00:00
2021-02-16 21:33:02 +00:00
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
2020-09-28 14:48:32 +00:00
2021-02-16 21:33:02 +00:00
const bool allow_read_expired_keys =
config.getBool(layout_prefix + ".cache.allow_read_expired_keys", false);
2021-02-16 21:33:02 +00:00
auto storage_configuration = parseCacheStorageConfiguration(full_name, config, layout_prefix, dict_lifetime, true);
auto storage = std::make_shared<CacheDictionaryStorage<DictionaryKeyType::complex>>(storage_configuration);
2020-09-17 18:57:57 +00:00
2021-02-16 21:33:02 +00:00
auto update_queue_configuration = parseCacheDictionaryUpdateQueueConfiguration(full_name, config, layout_prefix, true);
2021-02-16 21:33:02 +00:00
return std::make_unique<CacheDictionary<DictionaryKeyType::complex>>(
dict_id,
dict_struct,
std::move(source_ptr),
storage,
update_queue_configuration,
dict_lifetime,
allow_read_expired_keys);
};
2021-02-16 21:33:02 +00:00
factory.registerLayout("complex_key_cache", create_complex_key_cache_layout, true);
2021-02-21 12:50:55 +00:00
#if defined(OS_LINUX) || defined(__FreeBSD__)
2021-02-16 21:33:02 +00:00
auto create_simple_ssd_cache_layout = [=](const std::string & full_name,
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr) -> DictionaryPtr
{
if (dict_struct.key)
throw Exception{"'key' is not supported for dictionary of layout 'cache'",
ErrorCodes::UNSUPPORTED_METHOD};
2021-02-16 21:33:02 +00:00
if (dict_struct.range_min || dict_struct.range_max)
throw Exception{full_name
+ ": elements .structure.range_min and .structure.range_max should be defined only "
"for a dictionary of layout 'range_hashed'",
ErrorCodes::BAD_ARGUMENTS};
2021-02-16 21:33:02 +00:00
const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
if (require_nonempty)
throw Exception{full_name + ": dictionary of layout 'cache' cannot have 'require_nonempty' attribute set",
ErrorCodes::BAD_ARGUMENTS};
2021-02-16 21:33:02 +00:00
const auto & layout_prefix = config_prefix + ".layout";
2021-02-16 21:33:02 +00:00
const auto dict_id = StorageID::fromDictionaryConfig(config, config_prefix);
2021-02-16 21:33:02 +00:00
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
2020-09-17 18:57:57 +00:00
2021-02-16 21:33:02 +00:00
const bool allow_read_expired_keys =
config.getBool(layout_prefix + ".cache.allow_read_expired_keys", false);
2021-02-16 21:33:02 +00:00
auto storage_configuration = parseSSDCacheStorageConfiguration(full_name, config, layout_prefix, dict_lifetime, false);
auto storage = std::make_shared<SSDCacheDictionaryStorage<DictionaryKeyType::simple>>(storage_configuration);
2021-02-16 21:33:02 +00:00
auto update_queue_configuration = parseCacheDictionaryUpdateQueueConfiguration(full_name, config, layout_prefix, false);
2020-09-17 18:57:57 +00:00
2021-02-16 21:33:02 +00:00
return std::make_unique<CacheDictionary<DictionaryKeyType::simple>>(
dict_id,
dict_struct,
std::move(source_ptr),
storage,
update_queue_configuration,
dict_lifetime,
allow_read_expired_keys);
};
2020-09-24 15:53:14 +00:00
2021-02-16 21:33:02 +00:00
factory.registerLayout("ssd_cache", create_simple_ssd_cache_layout, false);
2020-09-17 18:57:57 +00:00
2021-02-16 21:33:02 +00:00
auto create_complex_key_ssd_cache_layout = [=](const std::string & full_name,
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr) -> DictionaryPtr
{
if (dict_struct.id)
throw Exception{"'id' is not supported for dictionary of layout 'complex_key_cache'",
ErrorCodes::UNSUPPORTED_METHOD};
2021-02-16 21:33:02 +00:00
if (dict_struct.range_min || dict_struct.range_max)
throw Exception{full_name
+ ": elements .structure.range_min and .structure.range_max should be defined only "
"for a dictionary of layout 'range_hashed'",
ErrorCodes::BAD_ARGUMENTS};
2021-02-16 21:33:02 +00:00
const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
if (require_nonempty)
throw Exception{full_name + ": dictionary of layout 'cache' cannot have 'require_nonempty' attribute set",
ErrorCodes::BAD_ARGUMENTS};
2021-02-16 21:33:02 +00:00
const auto & layout_prefix = config_prefix + ".layout";
2021-02-16 21:33:02 +00:00
const auto dict_id = StorageID::fromDictionaryConfig(config, config_prefix);
2020-07-20 12:34:29 +00:00
2021-02-16 21:33:02 +00:00
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
2020-10-02 14:26:39 +00:00
2021-02-16 21:33:02 +00:00
const bool allow_read_expired_keys =
config.getBool(layout_prefix + ".cache.allow_read_expired_keys", false);
2021-02-16 21:33:02 +00:00
auto storage_configuration = parseSSDCacheStorageConfiguration(full_name, config, layout_prefix, dict_lifetime, true);
auto storage = std::make_shared<SSDCacheDictionaryStorage<DictionaryKeyType::complex>>(storage_configuration);
2021-02-16 21:33:02 +00:00
auto update_queue_configuration = parseCacheDictionaryUpdateQueueConfiguration(full_name, config, layout_prefix, true);
2020-10-02 15:47:07 +00:00
2021-02-16 21:33:02 +00:00
return std::make_unique<CacheDictionary<DictionaryKeyType::complex>>(
dict_id,
dict_struct,
std::move(source_ptr),
storage,
update_queue_configuration,
dict_lifetime,
allow_read_expired_keys);
};
2021-02-16 21:33:02 +00:00
factory.registerLayout("complex_key_ssd_cache", create_complex_key_ssd_cache_layout, true);
2021-02-21 12:50:55 +00:00
#endif
}
2019-12-26 18:56:34 +00:00
}