Better code

This commit is contained in:
alesapin 2020-07-20 16:44:07 +03:00
parent 6b7d1033e6
commit b9a0d0802c
2 changed files with 27 additions and 18 deletions

View File

@ -93,7 +93,7 @@ CacheDictionary::CacheDictionary(
, update_queue(max_update_queue_size_)
, update_pool(max_threads_for_updates)
{
if (!this->source_ptr->supportsSelectiveLoad())
if (!source_ptr->supportsSelectiveLoad())
throw Exception{full_name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};
createAttributes();
@ -865,20 +865,11 @@ void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const
{
try
{
if (error_count)
{
/// Update source write write lock
std::unique_lock source_lock(source_mutex);
/// Recover after error: we have to clone the source here because
/// it could keep connections which should be reset after error.
source_ptr = source_ptr->clone();
}
auto current_source_ptr = getDictionarySourceOrUpdate();
/// Working with source_ptr with read lock. source_ptr has to be alive while stream alive
std::shared_lock source_lock(source_mutex);
Stopwatch watch;
BlockInputStreamPtr stream = source_ptr->loadIds(bunch_update_unit.getRequestedIds());
BlockInputStreamPtr stream = current_source_ptr->loadIds(bunch_update_unit.getRequestedIds());
stream->readPrefix();

View File

@ -92,7 +92,7 @@ public:
database,
name,
dict_struct,
source_ptr->clone(),
getDictionarySourceOrUpdate()->clone(),
dict_lifetime,
strict_max_lifetime_seconds,
size,
@ -289,6 +289,24 @@ private:
Attribute & getAttribute(const std::string & attribute_name) const;
using SharedDictionarySourcePtr = std::shared_ptr<IDictionarySource>;
/// Update dictionary source pointer if required and return
/// it. Thread safe.
SharedDictionarySourcePtr getDictionarySourceOrUpdate() const
{
std::lock_guard lock(source_mutex);
if (error_count)
{
/// Recover after error: we have to clone the source here because
/// it could keep connections which should be reset after error.
auto new_source_ptr = source_ptr->clone();
source_ptr = std::move(new_source_ptr);
}
return source_ptr;
}
struct FindResult
{
const size_t cell_idx;
@ -305,7 +323,11 @@ private:
const std::string name;
const std::string full_name;
const DictionaryStructure dict_struct;
mutable DictionarySourcePtr source_ptr;
/// Dictionary source should be used without mutex
mutable std::mutex source_mutex;
mutable SharedDictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
const size_t strict_max_lifetime_seconds;
const bool allow_read_expired_keys;
@ -320,10 +342,6 @@ private:
/// write, when it need to update cache state all other functions just
/// readers. Suprisingly this lock is also used for last_exception pointer.
mutable std::shared_mutex rw_lock;
/// This lock is used in update function because it is called from different
/// threads. If one thread deside to update source_ptr, than all other
/// threads should not use previous version of the ptr.
mutable std::shared_mutex source_mutex;
/// Actual size will be increased to match power of 2
const size_t size;