Added documentation

This commit is contained in:
Maksim Kita 2021-02-17 14:48:06 +03:00
parent dc0bb7485d
commit c5fbe8793b
8 changed files with 229 additions and 131 deletions

View File

@ -305,13 +305,11 @@ Columns CacheDictionary<dictionary_key_type>::getColumnsImpl(
{
DictionaryStorageFetchRequest request(dict_struct, attribute_names);
using FetchResult = std::conditional_t<dictionary_key_type == DictionaryKeyType::simple, SimpleKeysStorageFetchResult, ComplexKeysStorageFetchResult>;
FetchResult result_of_fetch_from_storage;
{
/// Read lock on storage
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
/// Write lock on storage
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
auto fetch_result = cache_storage_ptr->fetchColumnsForKeys(keys, request);
result_of_fetch_from_storage = std::move(fetch_result);
@ -405,13 +403,11 @@ ColumnUInt8::Ptr CacheDictionary<dictionary_key_type>::hasKeys(const Columns & k
DictionaryStorageFetchRequest request(dict_struct, {});
using FetchResult = std::conditional_t<dictionary_key_type == DictionaryKeyType::simple, SimpleKeysStorageFetchResult, ComplexKeysStorageFetchResult>;
FetchResult result_of_fetch_from_storage;
{
/// Read lock on storage
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
/// Write lock on storage
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
auto fetch_result = cache_storage_ptr->fetchColumnsForKeys(keys, request);
result_of_fetch_from_storage = std::move(fetch_result);
@ -488,6 +484,7 @@ ColumnUInt8::Ptr CacheDictionary<dictionary_key_type>::hasKeys(const Columns & k
return result;
}
/// TODO: Remove before merge
// namespace {
// String convertKeyToString(UInt64 key)
@ -520,6 +517,7 @@ MutableColumns CacheDictionary<dictionary_key_type>::aggregateColumns(
const HashMap<KeyType, size_t> & found_keys_to_fetched_columns_during_update_index,
const std::vector<DefaultValueProvider> & default_value_providers)
{
/// TODO: Remove before merge
// std::cerr << "CacheDictionary::aggregateColumns" << std::endl;
// std::cerr << "Fetched keys from storage" << std::endl;
// for (auto & node : found_keys_to_fetched_columns_from_storage_index)

View File

@ -27,20 +27,28 @@
namespace DB
{
namespace ErrorCodes
{
}
/** CacheDictionary store keys in cache storage and can asynchronous and synchronous updates during keys fetch.
/*
*
* This dictionary is stored in a cache that has a fixed number of cells.
* These cells contain frequently used elements.
* When searching for a dictionary, the cache is searched first and special heuristic is used:
* while looking for the key, we take a look only at max_collision_length elements.
* So, our cache is not perfect. It has errors like "the key is in cache, but the cache says that it does not".
* And in this case we simply ask external source for the key which is faster.
* You have to keep this logic in mind.
* */
If keys are not found in storage during fetch, dictionary start update operation with update queue.
During update operation necessary keys are fetched from source and inserted into storage.
After that data from storage and source are aggregated and returned to the client.
Typical flow:
1. Client request data during for example getColumn function call.
2. CacheDictionary request data from storage and if all data is found in storage it returns result to client.
3. If some data is not in storage cache dictionary try to perform update.
If all keys are just expired and allow_read_expired_keys option is set dictionary starts asynchronous update and
return result to client.
If there are not found keys dictionary start synchronous update and wait for result.
4. After getting result from synchronous update dictionary aggregates data that was previously fetched from
storage and data that was fetched during update and return result to client.
*/
template <DictionaryKeyType dictionary_key_type>
class CacheDictionary final : public IDictionary
{
@ -143,6 +151,7 @@ public:
PaddedPODArray<UInt8> & out) const override;
private:
using FetchResult = std::conditional_t<dictionary_key_type == DictionaryKeyType::simple, SimpleKeysStorageFetchResult, ComplexKeysStorageFetchResult>;
Columns getColumnsImpl(
const Strings & attribute_names,
@ -163,8 +172,6 @@ private:
void update(CacheDictionaryUpdateUnitPtr<dictionary_key_type> & update_unit_ptr);
using SharedDictionarySourcePtr = std::shared_ptr<IDictionarySource>;
/// Update dictionary source pointer if required and return it. Thread safe.
/// MultiVersion is not used here because it works with constant pointers.
/// For some reason almost all methods in IDictionarySource interface are

View File

@ -21,31 +21,22 @@ namespace ErrorCodes
struct CacheDictionaryStorageConfiguration
{
/// Max size of storage in cells
const size_t max_size_in_cells;
/// Needed to perform check if cell is expired or not found. Default value is dictionary max lifetime.
const size_t strict_max_lifetime_seconds;
/// Lifetime of dictionary. Cell deadline is random value between lifetime min and max seconds.
const DictionaryLifetime lifetime;
};
template <typename CacheDictionaryStorage>
class ArenaCellDisposer
{
public:
CacheDictionaryStorage & storage;
/** Keys are stored in LRUCache and column values are serialized into arena.
template <typename Key, typename Value>
void operator()(const Key & key, const Value & value) const
{
/// In case of complex key we keep it in arena
if constexpr (std::is_same_v<Key, StringRef>)
{
storage.arena.free(const_cast<char *>(key.data), key.size);
}
Cell in LRUCache consists of allocated size and place in arena were columns serialized data is stored.
storage.arena.free(value.place_for_serialized_columns, value.allocated_size_for_columns);
}
};
When cell is removed from LRUCache data associated with it is also removed from arena.
/// TODO: Fix name
In case of complex key we also store key data in arena and it is removed from arena.
*/
template <DictionaryKeyType dictionary_key_type>
class CacheDictionaryStorage final : public ICacheDictionaryStorage
{
@ -56,10 +47,15 @@ public:
explicit CacheDictionaryStorage(CacheDictionaryStorageConfiguration & configuration_)
: configuration(configuration_)
, rnd_engine(randomSeed())
, cache(configuration.max_size_in_cells, false, ArenaCellDisposer<CacheDictionaryStorage<dictionary_key_type>> { *this })
, cache(configuration.max_size_in_cells, false, { *this })
{
}
bool returnFetchedColumnsDuringFetchInOrderOfRequestedKeys() const override
{
return true;
}
bool supportsSimpleKeys() const override
{
return dictionary_key_type == DictionaryKeyType::simple;
@ -67,7 +63,7 @@ public:
SimpleKeysStorageFetchResult fetchColumnsForKeys(
const PaddedPODArray<UInt64> & keys,
const DictionaryStorageFetchRequest & fetch_request) const override
const DictionaryStorageFetchRequest & fetch_request) override
{
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
{
@ -102,7 +98,7 @@ public:
ComplexKeysStorageFetchResult fetchColumnsForKeys(
const PaddedPODArray<StringRef> & keys,
const DictionaryStorageFetchRequest & column_fetch_requests) const override
const DictionaryStorageFetchRequest & column_fetch_requests) override
{
if constexpr (dictionary_key_type == DictionaryKeyType::complex)
{
@ -145,7 +141,7 @@ private:
void fetchColumnsForKeysImpl(
const PaddedPODArray<KeyType> & keys,
const DictionaryStorageFetchRequest & fetch_request,
KeysStorageFetchResult & result) const
KeysStorageFetchResult & result)
{
result.fetched_columns = fetch_request.makeAttributesResultColumns();
result.found_keys_to_fetched_columns_index.reserve(keys.size());
@ -287,7 +283,7 @@ private:
inline void setCellDeadline(Cell & cell, TimePoint now)
{
/// TODO: Fix deadlines
/// TODO: Fix zero dictionary lifetime deadlines
size_t min_sec_lifetime = configuration.lifetime.min_sec;
size_t max_sec_lifetime = configuration.lifetime.max_sec;
@ -305,8 +301,26 @@ private:
pcg64 rnd_engine;
using SimpleKeyLRUHashMap = LRUHashMap<UInt64, Cell, ArenaCellDisposer<CacheDictionaryStorage<dictionary_key_type>>>;
using ComplexKeyLRUHashMap = LRUHashMapWithSavedHash<StringRef, Cell, ArenaCellDisposer<CacheDictionaryStorage<dictionary_key_type>>>;
class ArenaCellDisposer
{
public:
CacheDictionaryStorage<dictionary_key_type> & storage;
template <typename Key, typename Value>
void operator()(const Key & key, const Value & value) const
{
/// In case of complex key we keep it in arena
if constexpr (std::is_same_v<Key, StringRef>)
{
storage.arena.free(const_cast<char *>(key.data), key.size);
}
storage.arena.free(value.place_for_serialized_columns, value.allocated_size_for_columns);
}
};
using SimpleKeyLRUHashMap = LRUHashMap<UInt64, Cell, ArenaCellDisposer>;
using ComplexKeyLRUHashMap = LRUHashMapWithSavedHash<StringRef, Cell, ArenaCellDisposer>;
using CacheLRUHashMap = std::conditional_t<
dictionary_key_type == DictionaryKeyType::simple,

View File

@ -30,15 +30,19 @@ CacheDictionaryUpdateQueue<dictionary_key_type>::CacheDictionaryUpdateQueue(
{
for (size_t i = 0; i < configuration.max_threads_for_updates; ++i)
update_pool.scheduleOrThrowOnError([this] { updateThreadFunction(); });
std::cerr << "Constructor finished" << std::endl;
}
template <DictionaryKeyType dictionary_key_type>
CacheDictionaryUpdateQueue<dictionary_key_type>::~CacheDictionaryUpdateQueue()
{
if (!finished)
stopAndWait();
try {
if (!finished)
stopAndWait();
}
catch (...)
{
/// TODO: Write log
}
}
template <DictionaryKeyType dictionary_key_type>
@ -55,8 +59,6 @@ void CacheDictionaryUpdateQueue<dictionary_key_type>::tryPushToUpdateQueueOrThro
dictionary_name_for_logs,
std::to_string(configuration.update_queue_push_timeout_milliseconds),
std::to_string(update_queue.size()));
std::cerr << "CacheDictionaryUpdateQueue::tryPushToUpdateQueueOrThrow finished" << std::endl;
}
template <DictionaryKeyType dictionary_key_type>
@ -126,13 +128,9 @@ void CacheDictionaryUpdateQueue<dictionary_key_type>::updateThreadFunction()
while (!finished)
{
std::cerr << "CacheDictionary::updateThreadFunction wait for task " << std::endl;
CacheDictionaryUpdateUnitPtr<dictionary_key_type> unit_to_update;
update_queue.pop(unit_to_update);
std::cerr << "CacheDictionary::updateThreadFunction got task " << std::endl;
if (finished)
break;

View File

@ -24,13 +24,24 @@ namespace CurrentMetrics
namespace DB
{
/** This class is passed between update queue and update queue client during update.
For simple keys we pass simple keys.
For complex keys we pass complex keys columns and requested rows to update.
During update cache dictionary should fill requested_keys_to_fetched_columns_during_update_index and
fetched_columns_during_update.
For complex key to extend lifetime of key complex key arena should be used.
*/
template <DictionaryKeyType dictionary_key_type>
class CacheDictionaryUpdateUnit
{
public:
using KeyType = std::conditional_t<dictionary_key_type == DictionaryKeyType::simple, UInt64, StringRef>;
static_assert(dictionary_key_type != DictionaryKeyType::range, "Range key type is not supported by CacheDictionaryUpdateUnit");
/// Constructor for simple keys update request
explicit CacheDictionaryUpdateUnit(
PaddedPODArray<UInt64> && requested_simple_keys_,
const DictionaryStorageFetchRequest & request_)
@ -41,6 +52,7 @@ public:
fetched_columns_during_update = request.makeAttributesResultColumns();
}
/// Constructor for complex keys update request
explicit CacheDictionaryUpdateUnit(
const Columns & requested_complex_key_columns_,
const std::vector<size_t> && requested_complex_key_rows_,
@ -65,9 +77,9 @@ public:
const DictionaryStorageFetchRequest request;
/// This should be filled by the client during update
HashMap<KeyType, size_t> requested_keys_to_fetched_columns_during_update_index;
MutableColumns fetched_columns_during_update;
/// Complex keys are serialized in this arena and added to map
const std::shared_ptr<Arena> complex_key_arena;
private:
@ -90,17 +102,25 @@ extern template class CacheDictionaryUpdateUnit<DictionaryKeyType::complex>;
struct CacheDictionaryUpdateQueueConfiguration
{
/// Size of update queue
const size_t max_update_queue_size;
const size_t update_queue_push_timeout_milliseconds;
const size_t query_wait_timeout_milliseconds;
/// Size in thead pool of update queue
const size_t max_threads_for_updates;
/// Timeout for trying to push update unit into queue
const size_t update_queue_push_timeout_milliseconds;
/// Timeout during sync waititing of update unit
const size_t query_wait_timeout_milliseconds;
};
/** Responsibility of this class is to provide asynchronous and synchronous update support for CacheDictionary
It is responsibility of CacheDictionary to perform update with UpdateUnit using UpdateFunction.
*/
template <DictionaryKeyType dictionary_key_type>
class CacheDictionaryUpdateQueue
{
public:
/// Client of update queue must provide this function in constructor and perform update using update unit.
using UpdateFunction = std::function<void (CacheDictionaryUpdateUnitPtr<dictionary_key_type> &)>;
static_assert(dictionary_key_type != DictionaryKeyType::range, "Range key type is not supported by CacheDictionaryUpdateQueue");
@ -111,20 +131,33 @@ public:
~CacheDictionaryUpdateQueue();
const CacheDictionaryUpdateQueueConfiguration & getConfiguration() const
{
return configuration;
}
/// Get configuration that was passed to constructor
const CacheDictionaryUpdateQueueConfiguration & getConfiguration() const { return configuration; }
bool isFinished() const
{
return finished;
}
/// Is queue finished
bool isFinished() const { return finished; }
/// Synchronous wait for update queue to stop
void stopAndWait();
/** Try to add update unit into queue.
If queue is full and oush cannot be performed in update_queue_push_timeout_milliseconds from configuration
an exception will be thrown.
If queue already finished an exception will be thrown.
*/
void tryPushToUpdateQueueOrThrow(CacheDictionaryUpdateUnitPtr<dictionary_key_type> & update_unit_ptr);
/** Try to synchronously wait for update completion.
If exception was passed from update function during update it will be rethrowed.
If update will not be finished in query_wait_timeout_milliseconds from configuration
an exception will be thrown.
If queue already finished an exception will be thrown.
*/
void waitForCurrentUpdateFinish(CacheDictionaryUpdateUnitPtr<dictionary_key_type> & update_unit_ptr) const;
private:

View File

@ -17,6 +17,89 @@ namespace ErrorCodes
extern const int TYPE_MISMATCH;
}
/** Support class for dictionary storages.
The main idea is that during fetch we create all columns, but fill only columns that client requested.
We need to create other columns during fetch, because in case of serialized storage we can skip
unnecessary columns serialized in cache with skipSerializedInArena method.
When result is fetched from the storage client of storage can filterOnlyNecessaryColumns
and get only columns that match attributes_names_to_fetch.
*/
class DictionaryStorageFetchRequest
{
public:
DictionaryStorageFetchRequest(const DictionaryStructure & structure, const Strings & attributes_names_to_fetch)
: attributes_to_fetch_names_set(attributes_names_to_fetch.begin(), attributes_names_to_fetch.end())
, attributes_to_fetch_filter(structure.attributes.size(), false)
{
size_t attributes_size = structure.attributes.size();
attributes_to_fetch_types.reserve(attributes_size);
for (size_t i = 0; i < attributes_size; ++i)
{
const auto & name = structure.attributes[i].name;
const auto & type = structure.attributes[i].type;
attributes_to_fetch_types.emplace_back(type);
if (attributes_to_fetch_names_set.find(name) != attributes_to_fetch_names_set.end())
{
attributes_to_fetch_filter[i] = true;
}
}
}
DictionaryStorageFetchRequest() = default;
/// Check requested attributes size
size_t attributesSize() const
{
return attributes_to_fetch_types.size();
}
/// Check if attribute with attribute_name was requested to fetch
bool containsAttribute(const String & attribute_name) const
{
return attributes_to_fetch_names_set.find(attribute_name) != attributes_to_fetch_names_set.end();
}
/// Check if attribute with attribute_index should be filled during fetch
bool shouldFillResultColumnWithIndex(size_t attribute_index) const
{
return attributes_to_fetch_filter[attribute_index];
}
/// Create columns for each of dictionary attributes
MutableColumns makeAttributesResultColumns() const
{
MutableColumns result;
result.reserve(attributes_to_fetch_types.size());
for (const auto & type : attributes_to_fetch_types)
result.emplace_back(type->createColumn());
return result;
}
/// Filter only requested colums
Columns filterRequestedColumns(MutableColumns & fetched_mutable_columns) const
{
Columns result;
result.reserve(attributes_to_fetch_types.size());
for (size_t fetch_request_index = 0; fetch_request_index < attributes_to_fetch_types.size(); ++fetch_request_index)
if (shouldFillResultColumnWithIndex(fetch_request_index))
result.emplace_back(std::move(fetched_mutable_columns[fetch_request_index]));
return result;
}
private:
std::unordered_set<String> attributes_to_fetch_names_set;
std::vector<bool> attributes_to_fetch_filter;
DataTypes attributes_to_fetch_types;
};
/**
* In Dictionaries implementation String attribute is stored in arena and StringRefs are pointing to it.
*/

View File

@ -8,18 +8,23 @@
namespace DB
{
/// Result of fetch from CacheDictionaryStorage
template <typename KeyType>
struct KeysStorageFetchResult
{
/// Fetched column values
MutableColumns fetched_columns;
/// Found key to index in fetched_columns
HashMap<KeyType, size_t> found_keys_to_fetched_columns_index;
/// Expired key to index in fetched_columns
HashMap<KeyType, size_t> expired_keys_to_fetched_columns_index;
/// Keys that are not found in storage
PaddedPODArray<KeyType> not_found_or_expired_keys;
/// Indexes of requested keys that are not found in storage
PaddedPODArray<size_t> not_found_or_expired_keys_indexes;
};
@ -27,90 +32,47 @@ struct KeysStorageFetchResult
using SimpleKeysStorageFetchResult = KeysStorageFetchResult<UInt64>;
using ComplexKeysStorageFetchResult = KeysStorageFetchResult<StringRef>;
class DictionaryStorageFetchRequest
{
public:
DictionaryStorageFetchRequest(const DictionaryStructure & structure, const Strings & attributes_names_to_fetch)
: attributes_to_fetch_names_set(attributes_names_to_fetch.begin(), attributes_names_to_fetch.end())
, attributes_to_fetch_filter(structure.attributes.size(), false)
{
size_t attributes_size = structure.attributes.size();
attributes_to_fetch_types.reserve(attributes_size);
for (size_t i = 0; i < attributes_size; ++i)
{
const auto & name = structure.attributes[i].name;
const auto & type = structure.attributes[i].type;
attributes_to_fetch_types.emplace_back(type);
if (attributes_to_fetch_names_set.find(name) != attributes_to_fetch_names_set.end())
{
attributes_to_fetch_filter[i] = true;
}
}
}
DictionaryStorageFetchRequest() = default;
size_t attributesSize() const
{
return attributes_to_fetch_types.size();
}
bool containsAttribute(const String & attribute_name) const
{
return attributes_to_fetch_names_set.find(attribute_name) != attributes_to_fetch_names_set.end();
}
bool shouldFillResultColumnWithIndex(size_t attribute_index) const
{
return attributes_to_fetch_filter[attribute_index];
}
MutableColumns makeAttributesResultColumns() const
{
MutableColumns result;
result.reserve(attributes_to_fetch_types.size());
for (const auto & type : attributes_to_fetch_types)
result.emplace_back(type->createColumn());
return result;
}
private:
std::unordered_set<String> attributes_to_fetch_names_set;
std::vector<bool> attributes_to_fetch_filter;
DataTypes attributes_to_fetch_types;
};
class ICacheDictionaryStorage
{
public:
virtual ~ICacheDictionaryStorage() = default;
/// Necessary if all keys are found we can return result to client without additional aggregation
virtual bool returnFetchedColumnsDuringFetchInOrderOfRequestedKeys() const = 0;
/// Does storage support simple keys
virtual bool supportsSimpleKeys() const = 0;
/// Fetch columns for keys, this method is not write thread safe
virtual SimpleKeysStorageFetchResult fetchColumnsForKeys(
const PaddedPODArray<UInt64> & keys,
const DictionaryStorageFetchRequest & fetch_request) const = 0;
const DictionaryStorageFetchRequest & fetch_request) = 0;
/// Fetch columns for keys, this method is not write thread safe
virtual void insertColumnsForKeys(const PaddedPODArray<UInt64> & keys, Columns columns) = 0;
/// Return cached simple keys
virtual PaddedPODArray<UInt64> getCachedSimpleKeys() const = 0;
/// Does storage support complex keys
virtual bool supportsComplexKeys() const = 0;
/// Fetch columns for keys, this method is not write thread safe
virtual ComplexKeysStorageFetchResult fetchColumnsForKeys(
const PaddedPODArray<StringRef> & keys,
const DictionaryStorageFetchRequest & column_fetch_requests) const = 0;
const DictionaryStorageFetchRequest & column_fetch_requests) = 0;
/// Fetch columns for keys, this method is not write thread safe
virtual void insertColumnsForKeys(const PaddedPODArray<StringRef> & keys, Columns columns) = 0;
/// Return cached simple keys
virtual PaddedPODArray<StringRef> getCachedComplexKeys() const = 0;
/// Return size of keys in storage
virtual size_t getSize() const = 0;
/// Return bytes allocated in storage
virtual size_t getBytesAllocated() const = 0;
};

View File

@ -810,6 +810,8 @@ public:
memory_buffer_partitions.emplace_back(configuration.block_size, configuration.write_buffer_blocks_size);
}
bool returnFetchedColumnsDuringFetchInOrderOfRequestedKeys() const override { return false; }
bool supportsSimpleKeys() const override
{
return dictionary_key_type == DictionaryKeyType::simple;
@ -817,7 +819,7 @@ public:
SimpleKeysStorageFetchResult fetchColumnsForKeys(
const PaddedPODArray<UInt64> & keys,
const DictionaryStorageFetchRequest & fetch_request) const override
const DictionaryStorageFetchRequest & fetch_request) override
{
if constexpr (dictionary_key_type == DictionaryKeyType::simple)
{
@ -852,7 +854,7 @@ public:
ComplexKeysStorageFetchResult fetchColumnsForKeys(
const PaddedPODArray<StringRef> & keys,
const DictionaryStorageFetchRequest & column_fetch_requests) const override
const DictionaryStorageFetchRequest & column_fetch_requests) override
{
if constexpr (dictionary_key_type == DictionaryKeyType::complex)
{
@ -1240,6 +1242,7 @@ private:
cell.deadline = now + std::chrono::seconds{distribution(rnd_engine)};
}
/// TODO: Reuse
static void deserializeAndInsertIntoColumns(MutableColumns & columns, const DictionaryStorageFetchRequest & fetch_request, const char * place_for_serialized_columns)
{
for (size_t column_index = 0; column_index < columns.size(); ++column_index)