diff --git a/src/Dictionaries/CacheDictionary.cpp b/src/Dictionaries/CacheDictionary.cpp index 6fb14251ae5..e20ed62ff21 100644 --- a/src/Dictionaries/CacheDictionary.cpp +++ b/src/Dictionaries/CacheDictionary.cpp @@ -844,13 +844,13 @@ void CacheDictionary::tryPushToUpdateQueueOrThrow(UpdateUnitPtr & update_unit_pt std::to_string(update_queue.size())); } -void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const +void CacheDictionary::update(UpdateUnitPtr & update_unit_ptr) const { CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests}; - ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, bunch_update_unit.getRequestedIds().size()); + ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, update_unit_ptr->requested_ids.size()); - std::unordered_map remaining_ids{bunch_update_unit.getRequestedIds().size()}; - for (const auto id : bunch_update_unit.getRequestedIds()) + std::unordered_map remaining_ids{update_unit_ptr->requested_ids.size()}; + for (const auto id : update_unit_ptr->requested_ids) remaining_ids.insert({id, 0}); const auto now = std::chrono::system_clock::now(); @@ -864,7 +864,7 @@ void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const Stopwatch watch; - BlockInputStreamPtr stream = current_source_ptr->loadIds(bunch_update_unit.getRequestedIds()); + BlockInputStreamPtr stream = current_source_ptr->loadIds(update_unit_ptr->requested_ids); stream->readPrefix(); @@ -917,7 +917,7 @@ void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const else cell.setExpiresAt(std::chrono::time_point::max()); - bunch_update_unit.informCallersAboutPresentId(id, cell_idx); + update_unit_ptr->present_id_handler(id, cell_idx); /// mark corresponding id as found remaining_ids[id] = 1; } @@ -979,9 +979,9 @@ void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const if (was_default) cell.setDefault(); if (was_default) - bunch_update_unit.informCallersAboutAbsentId(id, cell_idx); + update_unit_ptr->absent_id_handler(id, cell_idx); else - bunch_update_unit.informCallersAboutPresentId(id, cell_idx); + update_unit_ptr->present_id_handler(id, cell_idx); continue; } /// We don't have expired data for that `id` so all we can do is to rethrow `last_exception`. @@ -1013,7 +1013,7 @@ void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const setDefaultAttributeValue(attribute, cell_idx); /// inform caller that the cell has not been found - bunch_update_unit.informCallersAboutAbsentId(id, cell_idx); + update_unit_ptr->absent_id_handler(id, cell_idx); } ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num); diff --git a/src/Dictionaries/CacheDictionary.h b/src/Dictionaries/CacheDictionary.h index 086e41adead..9da76c6a853 100644 --- a/src/Dictionaries/CacheDictionary.h +++ b/src/Dictionaries/CacheDictionary.h @@ -419,116 +419,6 @@ private: using UpdateUnitPtr = std::shared_ptr; using UpdateQueue = ConcurrentBoundedQueue; - - /* - * This class is used to concatenate requested_keys. - * - * Imagine that we have several UpdateUnit with different vectors of keys and callbacks for that keys. - * We concatenate them into a long vector of keys that looks like: - * - * a1...ak_a b1...bk_2 c1...ck_3, - * - * where a1...ak_a are requested_keys from the first UpdateUnit. - * In addition we have the same number (three in this case) of callbacks. - * This class helps us to find a callback (or many callbacks) for a special key. - * */ - - class BunchUpdateUnit - { - public: - explicit BunchUpdateUnit(std::vector & update_request) - { - /// Here we prepare total count of all requested ids - /// not to do useless allocations later. - size_t total_requested_keys_count = 0; - - for (auto & unit_ptr: update_request) - { - total_requested_keys_count += unit_ptr->requested_ids.size(); - if (helper.empty()) - helper.push_back(unit_ptr->requested_ids.size()); - else - helper.push_back(unit_ptr->requested_ids.size() + helper.back()); - present_id_handlers.emplace_back(unit_ptr->present_id_handler); - absent_id_handlers.emplace_back(unit_ptr->absent_id_handler); - update_units.emplace_back(unit_ptr); - } - - concatenated_requested_ids.reserve(total_requested_keys_count); - for (auto & unit_ptr: update_request) - std::for_each(std::begin(unit_ptr->requested_ids), std::end(unit_ptr->requested_ids), - [&] (const Key & key) {concatenated_requested_ids.push_back(key);}); - - } - - const std::vector & getRequestedIds() - { - return concatenated_requested_ids; - } - - void informCallersAboutPresentId(Key id, size_t cell_idx) - { - for (size_t position = 0; position < concatenated_requested_ids.size(); ++position) - { - if (concatenated_requested_ids[position] == id) - { - auto unit_number = getUpdateUnitNumberForRequestedIdPosition(position); - auto lock = getLockToCurrentUnit(unit_number); - if (canUseCallback(unit_number)) - getPresentIdHandlerForPosition(unit_number)(id, cell_idx); - } - } - } - - void informCallersAboutAbsentId(Key id, size_t cell_idx) - { - for (size_t position = 0; position < concatenated_requested_ids.size(); ++position) - if (concatenated_requested_ids[position] == id) - { - auto unit_number = getUpdateUnitNumberForRequestedIdPosition(position); - auto lock = getLockToCurrentUnit(unit_number); - if (canUseCallback(unit_number)) - getAbsentIdHandlerForPosition(unit_number)(id, cell_idx); - } - } - - - private: - /// Needed for control the usage of callback to avoid SEGFAULTs. - bool canUseCallback(size_t unit_number) - { - return update_units[unit_number].get()->can_use_callback; - } - - std::unique_lock getLockToCurrentUnit(size_t unit_number) - { - return std::unique_lock(update_units[unit_number].get()->callback_mutex); - } - - PresentIdHandler & getPresentIdHandlerForPosition(size_t unit_number) - { - return update_units[unit_number].get()->present_id_handler; - } - - AbsentIdHandler & getAbsentIdHandlerForPosition(size_t unit_number) - { - return update_units[unit_number].get()->absent_id_handler; - } - - size_t getUpdateUnitNumberForRequestedIdPosition(size_t position) - { - return std::lower_bound(helper.begin(), helper.end(), position) - helper.begin(); - } - - std::vector concatenated_requested_ids; - std::vector present_id_handlers; - std::vector absent_id_handlers; - - std::vector> update_units; - - std::vector helper; - }; - mutable UpdateQueue update_queue; ThreadPool update_pool; @@ -548,7 +438,7 @@ private: * */ void updateThreadFunction(); - void update(BunchUpdateUnit & bunch_update_unit) const; + void update(UpdateUnitPtr & update_unit_ptr) const; void tryPushToUpdateQueueOrThrow(UpdateUnitPtr & update_unit_ptr) const;