better style

This commit is contained in:
Nikita Mikhaylov 2020-01-30 18:18:12 +03:00
parent c5e664a072
commit b19f76df01
2 changed files with 24 additions and 29 deletions

View File

@ -327,8 +327,7 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8>
auto insert_to_answer_routine = [&] ()
{
const auto & cell = cells[cell_idx];
out[row] = !cell.isDefault();
out[row] = !cells[cell_idx].isDefault();
};
if (!find_result.valid)
@ -749,7 +748,7 @@ void CacheDictionary::updateThreadFunction()
if (current_queue_size > 0)
LOG_TRACE(log, "Performing bunch of keys update in cache dictionary with "
<< current_queue_size+1 << " keys" );
<< current_queue_size + 1 << " keys");
std::vector<UpdateUnitPtr> update_request;
update_request.reserve(current_queue_size + 1);
@ -789,7 +788,7 @@ void CacheDictionary::updateThreadFunction()
}
}
void CacheDictionary::waitForCurrentUpdateFinish(UpdateUnitPtr update_unit_ptr) const
void CacheDictionary::waitForCurrentUpdateFinish(UpdateUnitPtr & update_unit_ptr) const
{
std::unique_lock<std::mutex> lock(update_mutex);
const auto sleeping_result = is_update_finished.wait_for(
@ -804,14 +803,14 @@ void CacheDictionary::waitForCurrentUpdateFinish(UpdateUnitPtr update_unit_ptr)
std::rethrow_exception(update_unit_ptr->current_exception);
}
void CacheDictionary::tryPushToUpdateQueueOrThrow(UpdateUnitPtr update_unit_ptr) const
void CacheDictionary::tryPushToUpdateQueueOrThrow(UpdateUnitPtr & update_unit_ptr) const
{
if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
throw DB::Exception("Cannot push to internal update queue. Current queue size is " +
std::to_string(update_queue.size()), ErrorCodes::CACHE_DICTIONARY_UPDATE_FAIL);
}
void CacheDictionary::update(BunchUpdateUnit bunch_update_unit) const
void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, bunch_update_unit.getRequestedIds().size());

View File

@ -332,8 +332,6 @@ private:
using PresentIdHandler = std::function<void(Key, size_t)>;
using AbsentIdHandler = std::function<void(Key, size_t)>;
using FoundIdsMaskPtr = std::shared_ptr<std::unordered_map<Key, UInt8>>;
struct UpdateUnit
{
UpdateUnit(std::vector<Key> requested_ids_,
@ -359,23 +357,10 @@ private:
using UpdateUnitPtr = std::shared_ptr<UpdateUnit>;
using UpdateQueue = ConcurrentBoundedQueue<UpdateUnitPtr>;
mutable UpdateQueue update_queue;
ThreadPool update_pool;
void updateThreadFunction();
void tryPushToUpdateQueueOrThrow(UpdateUnitPtr update_unit_ptr) const;
void waitForCurrentUpdateFinish(UpdateUnitPtr update_unit_ptr) const;
mutable std::mutex update_mutex;
mutable std::condition_variable is_update_finished;
std::atomic<bool> finished{false};
class BunchUpdateUnit
{
public:
explicit BunchUpdateUnit(std::vector<UpdateUnitPtr> update_request)
explicit BunchUpdateUnit(std::vector<UpdateUnitPtr> & update_request)
{
/// Here we prepare total count of all requested ids
/// not to do useless allocations later.
@ -416,19 +401,16 @@ private:
void informCallersAboutAbsentId(Key id, size_t cell_idx)
{
for (size_t i = 0; i < concatenated_requested_ids.size(); ++i) {
auto &curr = concatenated_requested_ids[i];
if (curr == id)
for (size_t i = 0; i < concatenated_requested_ids.size(); ++i)
if (concatenated_requested_ids[i] == id)
getAbsentIdHandlerForPosition(i)(id, cell_idx);
}
}
private:
PresentIdHandler & getPresentIdHandlerForPosition(size_t position)
{
auto i = getUpdateUnitNumberForRequestedIdPosition(position);
return present_id_handlers[i];
return present_id_handlers[getUpdateUnitNumberForRequestedIdPosition(position)];
}
AbsentIdHandler & getAbsentIdHandlerForPosition(size_t position)
@ -448,8 +430,22 @@ private:
std::vector<size_t> helper;
};
mutable UpdateQueue update_queue;
ThreadPool update_pool;
void updateThreadFunction();
void update(BunchUpdateUnit & bunch_update_unit) const;
void tryPushToUpdateQueueOrThrow(UpdateUnitPtr & update_unit_ptr) const;
void waitForCurrentUpdateFinish(UpdateUnitPtr & update_unit_ptr) const;
mutable std::mutex update_mutex;
mutable std::condition_variable is_update_finished;
std::atomic<bool> finished{false};
void update(BunchUpdateUnit bunch_update_unit) const;
};
}