got rid of timeouts + comments added

Nikita Mikhaylov 2020-02-05 18:25:48 +03:00
parent 2d8b0dba03
commit c4666141a8
2 changed files with 53 additions and 30 deletions


@@ -67,7 +67,6 @@ CacheDictionary::CacheDictionary(
     bool allow_read_expired_keys_,
     size_t max_update_queue_size_,
     size_t update_queue_push_timeout_milliseconds_,
-    size_t each_update_finish_timeout_seconds_,
     size_t max_threads_for_updates_)
     : database(database_)
     , name(name_)
@@ -78,7 +77,6 @@ CacheDictionary::CacheDictionary(
     , allow_read_expired_keys(allow_read_expired_keys_)
     , max_update_queue_size(max_update_queue_size_)
     , update_queue_push_timeout_milliseconds(update_queue_push_timeout_milliseconds_)
-    , each_update_finish_timeout_seconds(each_update_finish_timeout_seconds_)
     , max_threads_for_updates(max_threads_for_updates_)
     , log(&Logger::get("ExternalDictionaries"))
     , size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
@@ -710,12 +708,6 @@ void registerDictionaryCache(DictionaryFactory & factory)
             throw Exception{name + ": dictionary of layout 'cache' have too little update_queue_push_timeout",
                 ErrorCodes::BAD_ARGUMENTS};

-        const size_t each_update_finish_timeout_seconds =
-            config.getUInt64(layout_prefix + ".each_update_finish_timeout_seconds", 600);
-        if (each_update_finish_timeout_seconds == 0)
-            throw Exception{name + ": dictionary of layout 'cache' cannot have timeout equals to zero.",
-                ErrorCodes::BAD_ARGUMENTS};
-
         const size_t max_threads_for_updates =
             config.getUInt64(layout_prefix + ".max_threads_for_updates", 4);
         if (max_threads_for_updates == 0)
@@ -725,7 +717,7 @@ void registerDictionaryCache(DictionaryFactory & factory)
         return std::make_unique<CacheDictionary>(
             database, name, dict_struct, std::move(source_ptr), dict_lifetime, size,
             allow_read_expired_keys, max_update_queue_size, update_queue_push_timeout_milliseconds,
-            each_update_finish_timeout_seconds, max_threads_for_updates);
+            max_threads_for_updates);
     };
     factory.registerLayout("cache", create_layout, false);
 }
@@ -791,13 +783,18 @@ void CacheDictionary::updateThreadFunction()
 void CacheDictionary::waitForCurrentUpdateFinish(UpdateUnitPtr & update_unit_ptr) const
 {
     std::unique_lock<std::mutex> lock(update_mutex);
-    const auto sleeping_result = is_update_finished.wait_for(
-        lock,
-        std::chrono::seconds(each_update_finish_timeout_seconds),
-        [&] {return update_unit_ptr->is_done || update_unit_ptr->current_exception; });
-
-    if (!sleeping_result)
-        throw DB::Exception("Keys updating timed out", ErrorCodes::CACHE_DICTIONARY_UPDATE_FAIL);
+
+    /*
+     * We wait here without any timeout to avoid SEGFAULTs.
+     * Suppose the wait timed out and the main query's thread ended with an exception
+     * or some other error, but the UpdateUnit with its callbacks was left in the queue.
+     * Those callbacks capture variables from the current thread (most of them lie on the
+     * stack of the already finished thread) and were intended for a synchronous update.
+     * The async update thread could then touch deallocated memory and explode.
+     * */
+    is_update_finished.wait(
+        lock,
+        [&] {return update_unit_ptr->is_done || update_unit_ptr->current_exception; });

     if (update_unit_ptr->current_exception)
         std::rethrow_exception(update_unit_ptr->current_exception);
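
The removed timed wait is replaced by an indefinite wait: as the new comment explains, giving up on a timeout would leave a queued callback that still captures the requester's stack. Below is a minimal standalone sketch (not ClickHouse code) of the safe pattern, assuming a simplified Task/worker pair in place of the real UpdateUnit/UpdateQueue machinery:

#include <condition_variable>
#include <cassert>
#include <exception>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

/// One queued update request; the callback may capture pointers into the requester's stack.
struct Task
{
    std::function<void()> callback;
    bool is_done = false;
    std::exception_ptr exception;
};
using TaskPtr = std::shared_ptr<Task>;

std::mutex mutex;
std::condition_variable has_task;
std::condition_variable is_done;
std::queue<TaskPtr> tasks;
bool shutdown = false;

/// Background thread: pops tasks and runs their callbacks.
void workerThread()
{
    std::unique_lock<std::mutex> lock(mutex);
    while (true)
    {
        has_task.wait(lock, [] { return shutdown || !tasks.empty(); });
        if (shutdown && tasks.empty())
            return;
        TaskPtr task = tasks.front();
        tasks.pop();
        try { task->callback(); }
        catch (...) { task->exception = std::current_exception(); }
        task->is_done = true;
        is_done.notify_all();
    }
}

/// Requester side: enqueue and block with NO timeout. If we bailed out on a timeout,
/// the abandoned callback would outlive the stack frame it captured and the worker
/// could later dereference freed memory.
void waitForFinish(const TaskPtr & task)
{
    std::unique_lock<std::mutex> lock(mutex);
    tasks.push(task);
    has_task.notify_all();
    is_done.wait(lock, [&] { return task->is_done || task->exception; });
    if (task->exception)
        std::rethrow_exception(task->exception);
}

int main()
{
    std::thread worker(workerThread);

    std::vector<int> result(3);    // lives on main's stack frame
    auto task = std::make_shared<Task>();
    task->callback = [&result] { for (auto & v : result) v = 42; };
    waitForFinish(task);           // returns only once the worker is done with `result`
    assert(result[0] == 42);

    { std::lock_guard<std::mutex> lock(mutex); shutdown = true; }
    has_task.notify_all();
    worker.join();
}

With a wait_for() and an early throw, `task` would stay in `tasks` while `result` was destroyed, which is exactly the use-after-scope the commit removes.
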


@@ -32,14 +32,14 @@ namespace ErrorCodes
 }

 /*
- * cache_not_found_ids |0|0|1|1|
- * cache_expired_ids   |0|1|0|1|
- *
- * 0 - if set is empty, 1 - otherwise
  *
- * Only if there are no cache_not_found_ids and some cache_expired_ids
- * (with allow_read_expired_keys_from_cache_dictionary setting) we can perform async update.
- * Otherwise we have no concatenate ids and update them sync.
+ * This dictionary is stored in a cache that has a fixed number of cells.
+ * These cells contain frequently used elements.
+ * When searching the dictionary, the cache is checked first and a special heuristic is used:
+ * while looking for a key, we examine only max_collision_length elements.
+ * So our cache is not perfect: it can claim that a key is not in the cache when it actually is.
+ * In that case we simply ask the external source for the key, which is faster.
+ * You have to keep this logic in mind.
  * */
 class CacheDictionary final : public IDictionary
 {
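
A simplified sketch of the bounded-probe lookup the new comment describes (this is not the actual cell layout; TinyCache, getValue and the probe value are illustrative only, with max_collision_length borrowed from the header as a name): only a few cells are inspected, so a key pushed further away by collisions is reported missing and simply re-fetched from the source.

#include <cstdint>
#include <optional>
#include <vector>

struct Cell
{
    uint64_t key;
    int64_t value;
    bool occupied;
};

class TinyCache
{
public:
    explicit TinyCache(size_t size_) : cells(size_) {}   // size assumed to be a power of two

    static constexpr size_t max_collision_length = 10;   // illustrative probe limit

    std::optional<int64_t> find(uint64_t key) const
    {
        const size_t mask = cells.size() - 1;
        for (size_t i = 0; i < max_collision_length; ++i)    // look at only a few cells...
        {
            const Cell & cell = cells[(key + i) & mask];
            if (cell.occupied && cell.key == key)
                return cell.value;
        }
        return std::nullopt;    // ...so the key may be present but further away: report a miss anyway
    }

    void insert(uint64_t key, int64_t value)
    {
        const size_t mask = cells.size() - 1;
        size_t pos = key & mask;
        for (size_t i = 0; i < max_collision_length; ++i)    // take the first free cell in the window,
            if (!cells[(key + i) & mask].occupied)           // otherwise overwrite the home cell
            {
                pos = (key + i) & mask;
                break;
            }
        cells[pos] = {key, value, true};
    }

private:
    std::vector<Cell> cells;
};

/// A (false) miss is answered by the external source, which the comment assumes is fast enough.
int64_t getValue(TinyCache & cache, uint64_t key, int64_t (*ask_source)(uint64_t))
{
    if (auto cached = cache.find(key))
        return *cached;
    int64_t value = ask_source(key);
    cache.insert(key, value);
    return value;
}
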
@@ -54,7 +54,6 @@ public:
         bool allow_read_expired_keys_,
         size_t max_update_queue_size_,
         size_t update_queue_push_timeout_milliseconds_,
-        size_t each_update_finish_timeout_seconds_,
         size_t max_threads_for_updates);

     ~CacheDictionary() override;
@@ -85,7 +84,7 @@ public:
         return std::make_shared<CacheDictionary>(
             database, name, dict_struct, source_ptr->clone(), dict_lifetime, size,
             allow_read_expired_keys, max_update_queue_size,
-            update_queue_push_timeout_milliseconds, each_update_finish_timeout_seconds, max_threads_for_updates);
+            update_queue_push_timeout_milliseconds, max_threads_for_updates);
     }

     const IDictionarySource * getSource() const override { return source_ptr.get(); }
@@ -293,7 +292,6 @@ private:
     const bool allow_read_expired_keys;
     const size_t max_update_queue_size;
     const size_t update_queue_push_timeout_milliseconds;
-    const size_t each_update_finish_timeout_seconds;
     const size_t max_threads_for_updates;

     Logger * const log;
@@ -333,7 +331,9 @@ private:
     using AbsentIdHandler = std::function<void(Key, size_t)>;

     /*
-     * How the update goes: we basically have a method get(keys)->values. Values are cached, so sometimes we
+     * Disclaimer: this comment is not written for fun.
+     *
+     * How the update goes: we basically have a method like get(keys)->values. Values are cached, so sometimes we
      * can return them from the cache. For values not in cache, we query them from the dictionary, and add to the
      * cache. The cache is lossy, so we can't expect it to store all the keys, and we store them separately. Normally,
      * they would be passed as a return value of get(), but for Unknown Reasons the dictionaries use a baroque
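
The comment above refers to a callback-style interface: instead of returning a map from get(), results are pushed back through callbacks supplied by the caller. A toy illustration with made-up names (getMany, on_found, on_not_found are not the real ClickHouse signatures):

#include <cstdint>
#include <functional>
#include <unordered_map>
#include <vector>

void getMany(
    const std::vector<uint64_t> & ids,
    const std::unordered_map<uint64_t, int64_t> & storage,   // stands in for cache + external source
    const std::function<void(uint64_t, int64_t)> & on_found,
    const std::function<void(uint64_t)> & on_not_found)
{
    for (const auto id : ids)
    {
        const auto it = storage.find(id);
        if (it != storage.end())
            on_found(id, it->second);    // the value is pushed to the caller, not returned
        else
            on_not_found(id);            // absent keys are reported through the same mechanism
    }
}
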
@@ -369,6 +369,20 @@ private:
     using UpdateUnitPtr = std::shared_ptr<UpdateUnit>;
     using UpdateQueue = ConcurrentBoundedQueue<UpdateUnitPtr>;

+    /*
+     * This class is used to concatenate requested_keys.
+     *
+     * Imagine that we have several UpdateUnits with different vectors of keys and callbacks for those keys.
+     * We concatenate them into one long vector of keys that looks like
+     *
+     *     a1 ... ak_1   b1 ... bk_2   c1 ... ck_3,
+     *
+     * where a1 ... ak_1 are the requested_keys from the first UpdateUnit.
+     * In addition we have the same number (three in this case) of callbacks.
+     * This class helps us find the callback (or callbacks) for a particular key.
+     * */
+
     class BunchUpdateUnit
     {
     public:
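
A rough sketch of the concatenation idea the comment describes, under hypothetical names (SimpleUpdateUnit, SimpleBunch, on_id_found are not the real BunchUpdateUnit interface): keys from several units are glued into one vector, the end offset of each unit's slice is remembered, and the callbacks for any key can then be located.

#include <cstdint>
#include <functional>
#include <vector>

struct SimpleUpdateUnit
{
    std::vector<uint64_t> requested_ids;
    std::function<void(uint64_t)> on_id_found;   // stands in for the real present/absent handlers
};

class SimpleBunch
{
public:
    explicit SimpleBunch(std::vector<SimpleUpdateUnit> & units_) : units(units_)
    {
        for (const auto & unit : units)
        {
            concatenated_ids.insert(concatenated_ids.end(),
                                    unit.requested_ids.begin(), unit.requested_ids.end());
            end_offsets.push_back(concatenated_ids.size());   // a1..ak_1 | b1..bk_2 | c1..ck_3
        }
    }

    const std::vector<uint64_t> & getRequestedIds() const { return concatenated_ids; }

    /// Invoke the callback of every unit that asked for `key` (the same key may appear in several units).
    void informCallersAboutKey(uint64_t key) const
    {
        size_t begin = 0;
        for (size_t unit_index = 0; unit_index < units.size(); ++unit_index)
        {
            const size_t end = end_offsets[unit_index];
            for (size_t pos = begin; pos < end; ++pos)
                if (concatenated_ids[pos] == key)
                    units[unit_index].on_id_found(key);
            begin = end;
        }
    }

private:
    std::vector<SimpleUpdateUnit> & units;
    std::vector<uint64_t> concatenated_ids;
    std::vector<size_t> end_offsets;
};
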
@@ -446,8 +460,24 @@ private:
     ThreadPool update_pool;

+    /*
+     * Actually, we can divide all requested keys into two 'buckets'. There are only four possible states,
+     * and they are described in the table below.
+     *
+     * cache_not_found_ids |0|0|1|1|
+     * cache_expired_ids   |0|1|0|1|
+     *
+     * 0 - if the set is empty, 1 - otherwise
+     *
+     * Only if there are no cache_not_found_ids and there are some cache_expired_ids
+     * (with the allow_read_expired_keys_from_cache_dictionary setting) can we perform an async update.
+     * Otherwise we have no choice but to concatenate the ids and update them synchronously.
+     *
+     */
+
     void updateThreadFunction();

     void update(BunchUpdateUnit & bunch_update_unit) const;

     void tryPushToUpdateQueueOrThrow(UpdateUnitPtr & update_unit_ptr) const;
     void waitForCurrentUpdateFinish(UpdateUnitPtr & update_unit_ptr) const;
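
Condensed into code, the table amounts to the following decision (a sketch with a hypothetical helper and assumed map types, not a real method of the class): an asynchronous update is possible only when nothing is missing outright, something has merely expired, and reading expired keys is allowed; every other state forces a synchronous update.

#include <cstdint>
#include <unordered_map>
#include <vector>

enum class UpdateMode { NoUpdateNeeded, Asynchronous, Synchronous };

UpdateMode chooseUpdateMode(
    const std::unordered_map<uint64_t, std::vector<size_t>> & cache_not_found_ids,
    const std::unordered_map<uint64_t, std::vector<size_t>> & cache_expired_ids,
    bool allow_read_expired_keys)
{
    if (cache_not_found_ids.empty() && cache_expired_ids.empty())
        return UpdateMode::NoUpdateNeeded;    // column |0|0|: everything was found and is fresh

    if (cache_not_found_ids.empty() && allow_read_expired_keys)
        return UpdateMode::Asynchronous;      // column |0|1|: serve expired values now, refresh in background

    return UpdateMode::Synchronous;           // columns |1|0| and |1|1|: the caller has to wait
}
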
@@ -455,9 +485,5 @@ private:
     mutable std::condition_variable is_update_finished;

     std::atomic<bool> finished{false};
 };

 }