diff --git a/dbms/src/Dictionaries/CacheDictionary.cpp b/dbms/src/Dictionaries/CacheDictionary.cpp index 97368521399..1a6865e6aeb 100644 --- a/dbms/src/Dictionaries/CacheDictionary.cpp +++ b/dbms/src/Dictionaries/CacheDictionary.cpp @@ -12,7 +12,6 @@ #include #include #include -#include #include "CacheDictionary.inc.h" #include "DictionaryBlockInputStream.h" #include "DictionaryFactory.h" @@ -292,9 +291,7 @@ void CacheDictionary::has(const PaddedPODArray & ids, PaddedPODArray size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0; - Context * context = current_thread->getThreadGroup()->global_context; - const bool allow_read_expired_keys_from_cache_dictionary = - context->getSettingsRef().allow_read_expired_keys_from_cache_dictionary; + const bool allow_read_expired_keys_from_cache_dictionary = getAllowReadExpiredKeysSetting(); const auto rows = ext::size(ids); { @@ -380,8 +377,7 @@ void CacheDictionary::has(const PaddedPODArray & ids, PaddedPODArray } }; - UInt64 timeout{10}; - const bool res = update_queue.tryPush(update_unit, timeout); + const bool res = update_queue.tryPush(update_unit, update_queue_push_timeout_milliseconds); if (!res) throw std::runtime_error("Too many updates"); diff --git a/dbms/src/Dictionaries/CacheDictionary.h b/dbms/src/Dictionaries/CacheDictionary.h index 3f5dcc81d7f..11c7a5f08c6 100644 --- a/dbms/src/Dictionaries/CacheDictionary.h +++ b/dbms/src/Dictionaries/CacheDictionary.h @@ -16,6 +16,7 @@ #include #include #include +#include #include #include "DictionaryStructure.h" #include "IDictionary.h" @@ -313,6 +314,14 @@ private: void updateThreadFunction(); std::atomic finished{false}; + /* Returns the allow_read_expired_keys_from_cache_dictionary setting, read from the global context of the current thread's thread group. NOTE(review): current_thread, the thread group and global_context are all dereferenced without null checks - confirm they are always set on every thread that can call this. */ bool getAllowReadExpiredKeysSetting() const + { + Context * context = current_thread->getThreadGroup()->global_context; + return context->getSettingsRef().allow_read_expired_keys_from_cache_dictionary; + } + + /* Timeout handed to update_queue.tryPush when an update unit is enqueued; the name states milliseconds - TODO confirm that is the unit tryPush expects. */ const size_t update_queue_push_timeout_milliseconds = 10; + void waitForCurrentUpdateFinish() const; mutable std::mutex update_mutex; }; diff 
--git a/dbms/src/Dictionaries/CacheDictionary.inc.h b/dbms/src/Dictionaries/CacheDictionary.inc.h index 609442a2f2a..543074829ba 100644 --- a/dbms/src/Dictionaries/CacheDictionary.inc.h +++ b/dbms/src/Dictionaries/CacheDictionary.inc.h @@ -162,10 +162,13 @@ void CacheDictionary::getItemsString( out->getOffsets().resize_assume_reserved(0); /// Mapping: -> { all indices `i` of `ids` such that `ids[i]` = } - std::unordered_map> outdated_ids; + std::unordered_map> cache_expired_ids; + std::unordered_map> cache_not_found_ids; /// we are going to store every string separately std::unordered_map map; + const bool allow_read_expired_keys_from_cache_dictionary = getAllowReadExpiredKeysSetting(); + size_t total_length = 0; size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0; { @@ -177,17 +180,10 @@ void CacheDictionary::getItemsString( const auto id = ids[row]; const auto find_result = findCellIdx(id, now); - if (!find_result.valid) + + + auto insert_value_routine = [&]() { - outdated_ids[id].push_back(row); - if (find_result.outdated) - ++cache_expired; - else - ++cache_not_found; - } - else - { - ++cache_hit; const auto & cell_idx = find_result.cell_idx; const auto & cell = cells[cell_idx]; const auto string_ref = cell.isDefault() ? 
get_default(row) : attribute_array[cell_idx]; @@ -196,6 +192,30 @@ void CacheDictionary::getItemsString( map[id] = String{string_ref}; total_length += string_ref.size + 1; + }; + + if (!find_result.valid) + { + if (find_result.outdated) + { + cache_expired_ids[id].push_back(row); + ++cache_expired; + + if (allow_read_expired_keys_from_cache_dictionary) + { + insert_value_routine(); + } + } + else + { + cache_not_found_ids[id].push_back(row); + ++cache_not_found; + } + } + else + { + ++cache_hit; + insert_value_routine(); } } } @@ -205,28 +225,69 @@ void CacheDictionary::getItemsString( ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit); query_count.fetch_add(rows, std::memory_order_relaxed); - hit_count.fetch_add(rows - outdated_ids.size(), std::memory_order_release); + const size_t outdated_ids_count = cache_expired + cache_not_found; + hit_count.fetch_add(rows - outdated_ids_count, std::memory_order_release); + + if (!cache_expired_ids.empty()) /* refresh keys that are cached but expired: enqueue without waiting when expired reads are allowed, otherwise enqueue and wait for the refreshed values */ + { + std::vector required_expired_ids(cache_expired_ids.size()); /* sized from the map copied below; std::transform writes exactly one key per expired id */ + std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::begin(required_expired_ids), [](auto & pair) { return pair.first; }); + + if (allow_read_expired_keys_from_cache_dictionary) + { + UpdateUnit update_unit{required_expired_ids, [&](const auto, const auto) {}, [&](const auto, const auto) {}}; + if (!update_queue.tryPush(update_unit, update_queue_push_timeout_milliseconds)) + throw std::runtime_error("Too many updates"); + } + else + { + UpdateUnit update_unit{ + required_expired_ids, + [&](const auto id, const auto cell_idx) + { + const auto attribute_value = attribute_array[cell_idx]; + + map[id] = String{attribute_value}; + total_length += (attribute_value.size + 1) * cache_expired_ids[id].size(); /* rows for these ids were recorded in cache_expired_ids; indexing cache_not_found_ids here would insert an empty entry */ + }, + [&](const auto id, const auto) + { + for (const auto row : cache_expired_ids[id]) + total_length += get_default(row).size + 1; + }}; + + if (!update_queue.tryPush(update_unit, 
update_queue_push_timeout_milliseconds)) + throw std::runtime_error("Too many updates"); + + waitForCurrentUpdateFinish(); + } + } /// request new values - if (!outdated_ids.empty()) + if (!cache_not_found_ids.empty()) { - std::vector required_ids(outdated_ids.size()); - std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids), [](auto & pair) { return pair.first; }); + std::vector required_ids(cache_not_found_ids.size()); + std::transform(std::begin(cache_not_found_ids), std::end(cache_not_found_ids), std::begin(required_ids), [](auto & pair) { return pair.first; }); - update( + UpdateUnit update_unit{ required_ids, [&](const auto id, const auto cell_idx) { const auto attribute_value = attribute_array[cell_idx]; map[id] = String{attribute_value}; - total_length += (attribute_value.size + 1) * outdated_ids[id].size(); + total_length += (attribute_value.size + 1) * cache_not_found_ids[id].size(); }, [&](const auto id, const auto) { - for (const auto row : outdated_ids[id]) + for (const auto row : cache_not_found_ids[id]) total_length += get_default(row).size + 1; - }); + }}; + + if (!update_queue.tryPush(update_unit, update_queue_push_timeout_milliseconds)) + throw std::runtime_error("Too many updates"); + + waitForCurrentUpdateFinish(); /* the callbacks capture locals by reference and total_length is consumed by the reserve() just below, so block until the queued update has run (the replaced update() call was made inline) */ } out->getChars().reserve(total_length);