From be5729e1135a14639d8623190317ef9c4a37e2ad Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Thu, 19 Dec 2019 21:22:04 +0300 Subject: [PATCH] mvc --- dbms/src/Core/Settings.h | 1 + dbms/src/Dictionaries/CacheDictionary.cpp | 120 ++++++++++++++++---- dbms/src/Dictionaries/CacheDictionary.h | 27 +++++ dbms/src/Dictionaries/CacheDictionary.inc.h | 1 + 4 files changed, 130 insertions(+), 19 deletions(-) diff --git a/dbms/src/Core/Settings.h b/dbms/src/Core/Settings.h index 5a8af895610..4298232ac9c 100644 --- a/dbms/src/Core/Settings.h +++ b/dbms/src/Core/Settings.h @@ -350,6 +350,7 @@ struct Settings : public SettingsCollection M(SettingUInt64, low_cardinality_max_dictionary_size, 8192, "Maximum size (in rows) of shared global dictionary for LowCardinality type.", 0) \ M(SettingBool, low_cardinality_use_single_dictionary_for_part, false, "LowCardinality type serialization setting. If is true, than will use additional keys when global dictionary overflows. Otherwise, will create several shared dictionaries.", 0) \ M(SettingBool, decimal_check_overflow, true, "Check overflow of decimal arithmetic/comparison operations", 0) \ + M(SettingBool, allow_read_expired_keys_from_cache_dictionary, false, "Allow read keys if their lifetime is expired.", 0) \ \ M(SettingBool, prefer_localhost_replica, 1, "1 - always send query to local replica, if it exists. 0 - choose replica to send query between local and remote ones according to load_balancing", 0) \ M(SettingUInt64, max_fetch_partition_retries_count, 5, "Amount of retries while fetching partition from another host.", 0) \ diff --git a/dbms/src/Dictionaries/CacheDictionary.cpp b/dbms/src/Dictionaries/CacheDictionary.cpp index 4dcb87c7b8a..97368521399 100644 --- a/dbms/src/Dictionaries/CacheDictionary.cpp +++ b/dbms/src/Dictionaries/CacheDictionary.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include "CacheDictionary.inc.h" #include "DictionaryBlockInputStream.h" #include "DictionaryFactory.h" @@ -76,6 +77,14 @@ CacheDictionary::CacheDictionary( throw Exception{name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD}; createAttributes(); + update_thread = ThreadFromGlobalPool([this] { updateThreadFunction(); }); +} + +CacheDictionary::~CacheDictionary() +{ + finished = true; + update_queue.clear(); + update_thread.join(); } @@ -272,11 +281,21 @@ CacheDictionary::FindResult CacheDictionary::findCellIdx(const Key & id, const C void CacheDictionary::has(const PaddedPODArray & ids, PaddedPODArray & out) const { + /// There are three types of ids. + /// - Valid ids. These ids are presented in local cache and their lifetime is not expired. + /// - CacheExpired ids. Ids that are in local cache, but their values are rotted (lifetime is expired). + /// - CacheNotFound ids. We have to go to external storage to know its value. + /// Mapping: -> { all indices `i` of `ids` such that `ids[i]` = } - std::unordered_map> outdated_ids; + std::unordered_map> cache_expired_ids; + std::unordered_map> cache_not_found_ids; size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0; + Context * context = current_thread->getThreadGroup()->global_context; + const bool allow_read_expired_keys_from_cache_dictionary = + context->getSettingsRef().allow_read_expired_keys_from_cache_dictionary; + const auto rows = ext::size(ids); { const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; @@ -290,11 +309,22 @@ void CacheDictionary::has(const PaddedPODArray & ids, PaddedPODArray const auto & cell_idx = find_result.cell_idx; if (!find_result.valid) { - outdated_ids[id].push_back(row); if (find_result.outdated) + { + cache_expired_ids[id].push_back(row); ++cache_expired; + + if (allow_read_expired_keys_from_cache_dictionary) + { + const auto & cell = cells[cell_idx]; + out[row] = !cell.isDefault(); + } + } else + { + cache_not_found_ids[id].push_back(row); ++cache_not_found; + } } else { @@ -310,27 +340,53 @@ void CacheDictionary::has(const PaddedPODArray & ids, PaddedPODArray ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit); query_count.fetch_add(rows, std::memory_order_relaxed); - hit_count.fetch_add(rows - outdated_ids.size(), std::memory_order_release); + const size_t outdated_ids_count = cache_expired + cache_not_found; + hit_count.fetch_add(rows - outdated_ids_count, std::memory_order_release); - if (outdated_ids.empty()) + /// We have no keys to update. + if (outdated_ids_count == 0) return; - std::vector required_ids(outdated_ids.size()); - std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids), [](auto & pair) { return pair.first; }); + /// Schedule an update job for expired keys. (At this point we have only expired keys.) + /// No need to wait for expired keys being updated. + if (allow_read_expired_keys_from_cache_dictionary && cache_not_found == 0) + { + std::vector required_expired_ids(cache_expired_ids.size()); + std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::begin(required_expired_ids), [](auto & pair) { return pair.first; }); + /// Callbacks are empty because we don't want to receive them after an unknown period of time. + UpdateUnit update_unit{std::move(required_expired_ids), [&](const Key, const size_t){}, [&](const Key, const size_t){}}; + UInt64 timeout{10}; /// TODO: make setting or a field called update_queue_push_timeout; + if (!update_queue.tryPush(update_unit, timeout)) + throw std::runtime_error("Can't schedule an update job."); + return; + } - /// request new values - update( - required_ids, - [&](const auto id, const auto) - { - for (const auto row : outdated_ids[id]) - out[row] = true; - }, - [&](const auto id, const auto) - { - for (const auto row : outdated_ids[id]) - out[row] = false; - }); + /// At this point we have two situations. There may be both types of keys: expired and not found. + /// We will update them all synchronously. + + std::vector required_ids(cache_not_found_ids.size()); + std::transform(std::begin(cache_not_found_ids), std::end(cache_not_found_ids), std::begin(required_ids), [](auto & pair) { return pair.first; }); + std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::begin(required_ids), [](auto & pair) { return pair.first; }); + + UpdateUnit update_unit{ + std::move(required_ids), + [&](const Key id, const size_t) { + for (const auto row : cache_not_found_ids[id]) + out[row] = true; + }, + [&](const Key id, const size_t) { + for (const auto row : cache_not_found_ids[id]) + out[row] = false; + } + }; + + UInt64 timeout{10}; + const bool res = update_queue.tryPush(update_unit, timeout); + + if (!res) + throw std::runtime_error("Too many updates"); + + waitForCurrentUpdateFinish(); } @@ -610,5 +666,31 @@ void registerDictionaryCache(DictionaryFactory & factory) factory.registerLayout("cache", create_layout, false); } +void CacheDictionary::updateThreadFunction() +{ + try + { + while (!finished) + { + UpdateUnit unit; + update_queue.pop(unit); + + update(unit.requested_ids, unit.on_cell_updated, unit.on_id_not_found); + last_update.fetch_add(1); + } + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } +} + +void CacheDictionary::waitForCurrentUpdateFinish() const +{ + size_t current_update_number = update_number.fetch_add(1); + while (last_update != current_update_number) + std::this_thread::yield(); +} + } diff --git a/dbms/src/Dictionaries/CacheDictionary.h b/dbms/src/Dictionaries/CacheDictionary.h index b5065a63922..3f5dcc81d7f 100644 --- a/dbms/src/Dictionaries/CacheDictionary.h +++ b/dbms/src/Dictionaries/CacheDictionary.h @@ -4,12 +4,15 @@ #include #include #include +#include #include #include #include #include #include #include +#include +#include #include #include #include @@ -31,6 +34,8 @@ public: const DictionaryLifetime dict_lifetime_, const size_t size_); + ~CacheDictionary() override; + std::string getName() const override { return name; } std::string getTypeName() const override { return "Cache"; } @@ -288,6 +293,28 @@ private: mutable std::atomic element_count{0}; mutable std::atomic hit_count{0}; mutable std::atomic query_count{0}; + + mutable std::atomic update_number{1}; + mutable std::atomic last_update{0}; + + struct UpdateUnit + { + std::vector requested_ids; + std::function on_cell_updated; + std::function on_id_not_found; + }; + + using UpdateQueue = ConcurrentBoundedQueue; + + // TODO: make setting called max_updates_number + mutable UpdateQueue update_queue{10}; + + ThreadFromGlobalPool update_thread; + void updateThreadFunction(); + std::atomic finished{false}; + + void waitForCurrentUpdateFinish() const; + mutable std::mutex update_mutex; }; } diff --git a/dbms/src/Dictionaries/CacheDictionary.inc.h b/dbms/src/Dictionaries/CacheDictionary.inc.h index 87005ac821f..609442a2f2a 100644 --- a/dbms/src/Dictionaries/CacheDictionary.inc.h +++ b/dbms/src/Dictionaries/CacheDictionary.inc.h @@ -21,6 +21,7 @@ extern const Event DictCacheRequestTimeNs; extern const Event DictCacheRequests; extern const Event DictCacheLockWriteNs; extern const Event DictCacheLockReadNs; +extern const Event DictCacheReadsRottedValues; } namespace CurrentMetrics