first attempt

This commit is contained in:
Nikita Mikhaylov 2020-08-11 22:39:48 +03:00
parent 3fa00a72d4
commit 519c5c500e
2 changed files with 10 additions and 120 deletions

View File

@ -844,13 +844,13 @@ void CacheDictionary::tryPushToUpdateQueueOrThrow(UpdateUnitPtr & update_unit_pt
std::to_string(update_queue.size()));
}
void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const
void CacheDictionary::update(UpdateUnitPtr & update_unit_ptr) const
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, bunch_update_unit.getRequestedIds().size());
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, update_unit_ptr->requested_ids.size());
std::unordered_map<Key, UInt8> remaining_ids{bunch_update_unit.getRequestedIds().size()};
for (const auto id : bunch_update_unit.getRequestedIds())
std::unordered_map<Key, UInt8> remaining_ids{update_unit_ptr->requested_ids.size()};
for (const auto id : update_unit_ptr->requested_ids)
remaining_ids.insert({id, 0});
const auto now = std::chrono::system_clock::now();
@ -864,7 +864,7 @@ void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const
Stopwatch watch;
BlockInputStreamPtr stream = current_source_ptr->loadIds(bunch_update_unit.getRequestedIds());
BlockInputStreamPtr stream = current_source_ptr->loadIds(update_unit_ptr->requested_ids);
stream->readPrefix();
@ -917,7 +917,7 @@ void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const
else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
bunch_update_unit.informCallersAboutPresentId(id, cell_idx);
update_unit_ptr->present_id_handler(id, cell_idx);
/// mark corresponding id as found
remaining_ids[id] = 1;
}
@ -979,9 +979,9 @@ void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const
if (was_default)
cell.setDefault();
if (was_default)
bunch_update_unit.informCallersAboutAbsentId(id, cell_idx);
update_unit_ptr->absent_id_handler(id, cell_idx);
else
bunch_update_unit.informCallersAboutPresentId(id, cell_idx);
update_unit_ptr->present_id_handler(id, cell_idx);
continue;
}
/// We don't have expired data for that `id` so all we can do is to rethrow `last_exception`.
@ -1013,7 +1013,7 @@ void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const
setDefaultAttributeValue(attribute, cell_idx);
/// inform caller that the cell has not been found
bunch_update_unit.informCallersAboutAbsentId(id, cell_idx);
update_unit_ptr->absent_id_handler(id, cell_idx);
}
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num);

View File

@ -419,116 +419,6 @@ private:
using UpdateUnitPtr = std::shared_ptr<UpdateUnit>;
using UpdateQueue = ConcurrentBoundedQueue<UpdateUnitPtr>;
/*
* This class is used to concatenate requested_keys.
*
* Imagine that we have several UpdateUnit with different vectors of keys and callbacks for that keys.
* We concatenate them into a long vector of keys that looks like:
*
* a1...ak_a b1...bk_2 c1...ck_3,
*
* where a1...ak_a are requested_keys from the first UpdateUnit.
* In addition we have the same number (three in this case) of callbacks.
* This class helps us to find a callback (or many callbacks) for a special key.
* */
class BunchUpdateUnit
{
public:
explicit BunchUpdateUnit(std::vector<UpdateUnitPtr> & update_request)
{
/// Here we prepare total count of all requested ids
/// not to do useless allocations later.
size_t total_requested_keys_count = 0;
for (auto & unit_ptr: update_request)
{
total_requested_keys_count += unit_ptr->requested_ids.size();
if (helper.empty())
helper.push_back(unit_ptr->requested_ids.size());
else
helper.push_back(unit_ptr->requested_ids.size() + helper.back());
present_id_handlers.emplace_back(unit_ptr->present_id_handler);
absent_id_handlers.emplace_back(unit_ptr->absent_id_handler);
update_units.emplace_back(unit_ptr);
}
concatenated_requested_ids.reserve(total_requested_keys_count);
for (auto & unit_ptr: update_request)
std::for_each(std::begin(unit_ptr->requested_ids), std::end(unit_ptr->requested_ids),
[&] (const Key & key) {concatenated_requested_ids.push_back(key);});
}
const std::vector<Key> & getRequestedIds()
{
return concatenated_requested_ids;
}
void informCallersAboutPresentId(Key id, size_t cell_idx)
{
for (size_t position = 0; position < concatenated_requested_ids.size(); ++position)
{
if (concatenated_requested_ids[position] == id)
{
auto unit_number = getUpdateUnitNumberForRequestedIdPosition(position);
auto lock = getLockToCurrentUnit(unit_number);
if (canUseCallback(unit_number))
getPresentIdHandlerForPosition(unit_number)(id, cell_idx);
}
}
}
void informCallersAboutAbsentId(Key id, size_t cell_idx)
{
for (size_t position = 0; position < concatenated_requested_ids.size(); ++position)
if (concatenated_requested_ids[position] == id)
{
auto unit_number = getUpdateUnitNumberForRequestedIdPosition(position);
auto lock = getLockToCurrentUnit(unit_number);
if (canUseCallback(unit_number))
getAbsentIdHandlerForPosition(unit_number)(id, cell_idx);
}
}
private:
/// Needed for control the usage of callback to avoid SEGFAULTs.
bool canUseCallback(size_t unit_number)
{
return update_units[unit_number].get()->can_use_callback;
}
std::unique_lock<std::mutex> getLockToCurrentUnit(size_t unit_number)
{
return std::unique_lock<std::mutex>(update_units[unit_number].get()->callback_mutex);
}
PresentIdHandler & getPresentIdHandlerForPosition(size_t unit_number)
{
return update_units[unit_number].get()->present_id_handler;
}
AbsentIdHandler & getAbsentIdHandlerForPosition(size_t unit_number)
{
return update_units[unit_number].get()->absent_id_handler;
}
size_t getUpdateUnitNumberForRequestedIdPosition(size_t position)
{
return std::lower_bound(helper.begin(), helper.end(), position) - helper.begin();
}
std::vector<Key> concatenated_requested_ids;
std::vector<PresentIdHandler> present_id_handlers;
std::vector<AbsentIdHandler> absent_id_handlers;
std::vector<std::reference_wrapper<UpdateUnitPtr>> update_units;
std::vector<size_t> helper;
};
mutable UpdateQueue update_queue;
ThreadPool update_pool;
@ -548,7 +438,7 @@ private:
*
*/
void updateThreadFunction();
void update(BunchUpdateUnit & bunch_update_unit) const;
void update(UpdateUnitPtr & update_unit_ptr) const;
void tryPushToUpdateQueueOrThrow(UpdateUnitPtr & update_unit_ptr) const;