mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-13 18:02:24 +00:00
fix stupid bug + codestyle
This commit is contained in:
parent
28d0f3324a
commit
cce0097d9e
@ -716,22 +716,35 @@ void CacheDictionary::updateThreadFunction()
|
|||||||
setThreadName("AsyncUpdater");
|
setThreadName("AsyncUpdater");
|
||||||
while (!finished)
|
while (!finished)
|
||||||
{
|
{
|
||||||
|
UpdateUnitPtr first_popped;
|
||||||
|
update_queue.pop(first_popped);
|
||||||
|
|
||||||
/// Here we pop as many unit pointers from update queue as we can.
|
/// Here we pop as many unit pointers from update queue as we can.
|
||||||
/// We fix current size to avoid livelock (or too long waiting),
|
/// We fix current size to avoid livelock (or too long waiting),
|
||||||
/// when this thread pops from the queue and other threads push to the queue.
|
/// when this thread pops from the queue and other threads push to the queue.
|
||||||
const size_t current_queue_size = update_queue.size();
|
const size_t current_queue_size = update_queue.size();
|
||||||
std::vector<UpdateUnitPtr> update_request(current_queue_size);
|
|
||||||
|
/// Word "bunch" must present in this log message, because it is being checked in tests.
|
||||||
|
if (current_queue_size > 0)
|
||||||
|
LOG_DEBUG(log, "Performing a bunch of keys update in cache dictionary.");
|
||||||
|
|
||||||
|
/// We use deque since there is first_popped pointer.
|
||||||
|
/// And we have to add to the update_request without breaking order.
|
||||||
|
std::deque<UpdateUnitPtr> update_request(current_queue_size);
|
||||||
|
|
||||||
for (auto & unit_ptr: update_request)
|
for (auto & unit_ptr: update_request)
|
||||||
update_queue.pop(unit_ptr);
|
update_queue.pop(unit_ptr);
|
||||||
|
|
||||||
|
update_request.push_front(first_popped);
|
||||||
|
|
||||||
/// Here we prepare total count of all requested ids
|
/// Here we prepare total count of all requested ids
|
||||||
/// not to do useless allocations later.
|
/// not to do useless allocations later.
|
||||||
size_t requested_keys_count = 0;
|
size_t total_requested_keys_count = 0;
|
||||||
for (auto & unit_ptr: update_request)
|
for (auto & unit_ptr: update_request)
|
||||||
requested_keys_count += unit_ptr->requested_ids.size();
|
total_requested_keys_count += unit_ptr->requested_ids.size();
|
||||||
|
|
||||||
std::vector<Key> concatenated_requested_ids;
|
std::vector<Key> concatenated_requested_ids;
|
||||||
concatenated_requested_ids.reserve(requested_keys_count);
|
concatenated_requested_ids.reserve(total_requested_keys_count);
|
||||||
for (auto & unit_ptr: update_request)
|
for (auto & unit_ptr: update_request)
|
||||||
std::for_each(std::begin(unit_ptr->requested_ids), std::end(unit_ptr->requested_ids),
|
std::for_each(std::begin(unit_ptr->requested_ids), std::end(unit_ptr->requested_ids),
|
||||||
[&] (const Key & key) {concatenated_requested_ids.push_back(key);});
|
[&] (const Key & key) {concatenated_requested_ids.push_back(key);});
|
||||||
@ -754,8 +767,10 @@ void CacheDictionary::updateThreadFunction()
|
|||||||
/// Notify all threads about finished updating the bunch of ids
|
/// Notify all threads about finished updating the bunch of ids
|
||||||
/// where their own ids were included.
|
/// where their own ids were included.
|
||||||
std::unique_lock<std::mutex> lock(update_mutex);
|
std::unique_lock<std::mutex> lock(update_mutex);
|
||||||
|
|
||||||
for (auto & unit_ptr: update_request)
|
for (auto & unit_ptr: update_request)
|
||||||
unit_ptr->is_done = true;
|
unit_ptr->is_done = true;
|
||||||
|
|
||||||
is_update_finished.notify_all();
|
is_update_finished.notify_all();
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
|
@ -52,7 +52,7 @@ public:
|
|||||||
const bool allow_read_expired_keys_,
|
const bool allow_read_expired_keys_,
|
||||||
const size_t max_update_queue_size_,
|
const size_t max_update_queue_size_,
|
||||||
const size_t update_queue_push_timeout_milliseconds_,
|
const size_t update_queue_push_timeout_milliseconds_,
|
||||||
const size_t timeout_for_each_update_finish_);
|
const size_t each_update_finish_timeout_seconds_);
|
||||||
|
|
||||||
~CacheDictionary() override;
|
~CacheDictionary() override;
|
||||||
|
|
||||||
|
@ -21,7 +21,6 @@ extern const Event DictCacheRequestTimeNs;
|
|||||||
extern const Event DictCacheRequests;
|
extern const Event DictCacheRequests;
|
||||||
extern const Event DictCacheLockWriteNs;
|
extern const Event DictCacheLockWriteNs;
|
||||||
extern const Event DictCacheLockReadNs;
|
extern const Event DictCacheLockReadNs;
|
||||||
extern const Event DictCacheReadsRottedValues;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace CurrentMetrics
|
namespace CurrentMetrics
|
||||||
@ -175,8 +174,7 @@ void CacheDictionary::getItemsString(
|
|||||||
/// save on some allocations
|
/// save on some allocations
|
||||||
out->getOffsets().reserve(rows);
|
out->getOffsets().reserve(rows);
|
||||||
|
|
||||||
auto &attribute_array = std::get<ContainerPtrType < StringRef>>
|
auto & attribute_array = std::get<ContainerPtrType < StringRef>>(attribute.arrays);
|
||||||
(attribute.arrays);
|
|
||||||
|
|
||||||
auto found_outdated_values = false;
|
auto found_outdated_values = false;
|
||||||
|
|
||||||
@ -198,8 +196,8 @@ void CacheDictionary::getItemsString(
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
const auto &cell_idx = find_result.cell_idx;
|
const auto & cell_idx = find_result.cell_idx;
|
||||||
const auto &cell = cells[cell_idx];
|
const auto & cell = cells[cell_idx];
|
||||||
const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx];
|
const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx];
|
||||||
out->insertData(string_ref.data, string_ref.size);
|
out->insertData(string_ref.data, string_ref.size);
|
||||||
}
|
}
|
||||||
@ -274,7 +272,7 @@ void CacheDictionary::getItemsString(
|
|||||||
query_count.fetch_add(rows, std::memory_order_relaxed);
|
query_count.fetch_add(rows, std::memory_order_relaxed);
|
||||||
hit_count.fetch_add(rows - cache_expired_ids.size() - cache_not_found_ids.size(), std::memory_order_release);
|
hit_count.fetch_add(rows - cache_expired_ids.size() - cache_not_found_ids.size(), std::memory_order_release);
|
||||||
|
|
||||||
/// Async udpdare of expired keys.
|
/// Async update of expired keys.
|
||||||
if (cache_not_found_ids.empty())
|
if (cache_not_found_ids.empty())
|
||||||
{
|
{
|
||||||
if (allow_read_expired_keys && !cache_expired_ids.empty())
|
if (allow_read_expired_keys && !cache_expired_ids.empty())
|
||||||
|
Loading…
Reference in New Issue
Block a user