Add condition variable usage and clean up code

This commit is contained in:
Nikita Mikhaylov 2019-12-26 00:43:12 +03:00
parent 77c4727e50
commit 208db45668
4 changed files with 216 additions and 216 deletions

View File

@ -476,6 +476,7 @@ namespace ErrorCodes
extern const int S3_ERROR = 499;
extern const int CANNOT_CREATE_DICTIONARY_FROM_METADATA = 500;
extern const int CANNOT_CREATE_DATABASE = 501;
extern const int CACHE_DICTIONARY_UPDATE_FAIL = 502;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -12,6 +12,7 @@
#include <Common/typeid_cast.h>
#include <ext/range.h>
#include <ext/size.h>
#include <Common/setThreadName.h>
#include "CacheDictionary.inc.h"
#include "DictionaryBlockInputStream.h"
#include "DictionaryFactory.h"
@ -61,16 +62,23 @@ CacheDictionary::CacheDictionary(
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
const size_t size_)
const size_t size_,
const bool allow_read_expired_keys_,
const size_t max_update_queue_size_,
const size_t update_queue_push_timeout_milliseconds_)
: name{name_}
, dict_struct(dict_struct_)
, source_ptr{std::move(source_ptr_)}
, dict_lifetime(dict_lifetime_)
, allow_read_expired_keys(allow_read_expired_keys_)
, max_update_queue_size(max_update_queue_size_)
, update_queue_push_timeout_milliseconds(update_queue_push_timeout_milliseconds_)
, log(&Logger::get("ExternalDictionaries"))
, size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
, size_overlap_mask{this->size - 1}
, cells{this->size}
, rnd_engine(randomSeed())
, update_queue(max_update_queue_size_)
{
if (!this->source_ptr->supportsSelectiveLoad())
throw Exception{name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};
@ -289,9 +297,7 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8>
std::unordered_map<Key, std::vector<size_t>> cache_expired_ids;
std::unordered_map<Key, std::vector<size_t>> cache_not_found_ids;
size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0;
const bool allow_read_expired_keys_from_cache_dictionary = getAllowReadExpiredKeysSetting();
size_t cache_hit = 0;
const auto rows = ext::size(ids);
{
@ -304,76 +310,92 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8>
const auto id = ids[row];
const auto find_result = findCellIdx(id, now);
const auto & cell_idx = find_result.cell_idx;
auto insert_to_answer_routine = [&] ()
{
const auto & cell = cells[cell_idx];
out[row] = !cell.isDefault();
};
if (!find_result.valid)
{
if (find_result.outdated)
{
cache_expired_ids[id].push_back(row);
++cache_expired;
if (allow_read_expired_keys_from_cache_dictionary)
{
const auto & cell = cells[cell_idx];
out[row] = !cell.isDefault();
}
if (allow_read_expired_keys)
insert_to_answer_routine();
}
else
{
cache_not_found_ids[id].push_back(row);
++cache_not_found;
}
}
else
{
++cache_hit;
const auto & cell = cells[cell_idx];
out[row] = !cell.isDefault();
insert_to_answer_routine();
}
}
}
ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired);
ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found);
ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired_ids.size());
ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found_ids.size());
ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);
query_count.fetch_add(rows, std::memory_order_relaxed);
const size_t outdated_ids_count = cache_expired + cache_not_found;
hit_count.fetch_add(rows - outdated_ids_count, std::memory_order_release);
hit_count.fetch_add(rows - cache_expired_ids.size() - cache_not_found_ids.size(), std::memory_order_release);
/// We have no keys to update.
if (outdated_ids_count == 0)
return;
/// Schedule an update job for expired keys. (At this point we have only expired keys.)
/// No need to wait for expired keys being updated.
if (allow_read_expired_keys_from_cache_dictionary && cache_not_found == 0)
if (cache_not_found_ids.empty())
{
std::vector<Key> required_expired_ids(cache_expired_ids.size());
std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::begin(required_expired_ids), [](auto & pair) { return pair.first; });
/// Callbacks are empty because we don't want to receive them after an unknown period of time.
auto update_unit_ptr = std::make_shared<UpdateUnit>(required_expired_ids, [&](const auto, const auto){}, [&](const auto, const auto){} );
if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
throw std::runtime_error("Can't schedule an update job.");
return;
/// Nothing to update - return.
if (cache_expired_ids.empty())
return;
if (allow_read_expired_keys)
{
std::vector<Key> required_expired_ids;
required_expired_ids.reserve(cache_expired_ids.size());
std::transform(
std::begin(cache_expired_ids), std::end(cache_expired_ids),
std::back_inserter(required_expired_ids), [](auto & pair) { return pair.first; });
/// Callbacks are empty because we don't want to receive results after an unknown period of time.
auto update_unit_ptr = std::make_shared<UpdateUnit>(
required_expired_ids,
[&](const auto, const auto) {},
[&](const auto, const auto) {} );
tryPushToUpdateQueueOrThrow(update_unit_ptr);
/// Update is async - no need to wait.
return;
}
}
/// At this point we have two situations. There may be both types of keys: expired and not found.
/// At this point both types of keys may be present:
/// cache_expired_ids and cache_not_found_ids.
/// We will update them all synchronously.
std::vector<Key> required_ids;
required_ids.reserve(outdated_ids_count);
std::transform(std::begin(cache_not_found_ids), std::end(cache_not_found_ids), std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
required_ids.reserve(cache_not_found_ids.size() + cache_expired_ids.size());
std::transform(
std::begin(cache_not_found_ids), std::end(cache_not_found_ids),
std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
std::transform(
std::begin(cache_expired_ids), std::end(cache_expired_ids),
std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
auto update_unit_ptr = std::make_shared<UpdateUnit>(
std::move(required_ids),
[&](const Key id, const size_t) {
[&](const Key id, const size_t)
{
for (const auto row : cache_not_found_ids[id])
out[row] = true;
for (const auto row : cache_expired_ids[id])
out[row] = true;
},
[&](const Key id, const size_t) {
[&](const Key id, const size_t)
{
for (const auto row : cache_not_found_ids[id])
out[row] = false;
for (const auto row : cache_expired_ids[id])
@ -381,17 +403,8 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8>
}
);
if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
throw std::runtime_error("Too many updates");
// waitForCurrentUpdateFinish();
while (!update_unit_ptr->is_done && !update_unit_ptr->current_exception) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::this_thread::yield();
}
if (update_unit_ptr->current_exception)
std::rethrow_exception(update_unit_ptr->current_exception);
tryPushToUpdateQueueOrThrow(update_unit_ptr);
waitForCurrentUpdateFinish(update_unit_ptr);
}
@ -648,7 +661,8 @@ void registerDictionaryCache(DictionaryFactory & factory)
DictionarySourcePtr source_ptr) -> DictionaryPtr
{
if (dict_struct.key)
throw Exception{"'key' is not supported for dictionary of layout 'cache'", ErrorCodes::UNSUPPORTED_METHOD};
throw Exception{"'key' is not supported for dictionary of layout 'cache'",
ErrorCodes::UNSUPPORTED_METHOD};
if (dict_struct.range_min || dict_struct.range_max)
throw Exception{name
@ -656,9 +670,11 @@ void registerDictionaryCache(DictionaryFactory & factory)
"for a dictionary of layout 'range_hashed'",
ErrorCodes::BAD_ARGUMENTS};
const auto & layout_prefix = config_prefix + ".layout";
const auto size = config.getInt(layout_prefix + ".cache.size_in_cells");
const auto size = config.getUInt64(layout_prefix + ".cache.size_in_cells");
if (size == 0)
throw Exception{name + ": dictionary of layout 'cache' cannot have 0 cells", ErrorCodes::TOO_SMALL_BUFFER_SIZE};
throw Exception{name + ": dictionary of layout 'cache' cannot have 0 cells",
ErrorCodes::TOO_SMALL_BUFFER_SIZE};
const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
if (require_nonempty)
@ -666,13 +682,32 @@ void registerDictionaryCache(DictionaryFactory & factory)
ErrorCodes::BAD_ARGUMENTS};
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
return std::make_unique<CacheDictionary>(name, dict_struct, std::move(source_ptr), dict_lifetime, size);
const auto max_update_queue_size =
config.getUInt64(layout_prefix + ".cache.max_update_queue_size", 100000);
if (max_update_queue_size == 0)
throw Exception{name + ": dictionary of layout 'cache' cannot have empty update queue of size 0",
ErrorCodes::TOO_SMALL_BUFFER_SIZE};
const bool allow_read_expired_keys =
config.getBool(layout_prefix + ".cache.allow_read_expired_keys", false);
const auto update_queue_push_timeout_milliseconds =
config.getUInt64(layout_prefix + ".cache.update_queue_push_timeout_milliseconds", 10);
if (update_queue_push_timeout_milliseconds < 10)
throw Exception{name + ": dictionary of layout 'cache' have too little update_queue_push_timeout",
ErrorCodes::BAD_ARGUMENTS};
return std::make_unique<CacheDictionary>(
name, dict_struct, std::move(source_ptr), dict_lifetime, size,
allow_read_expired_keys, max_update_queue_size, update_queue_push_timeout_milliseconds);
};
factory.registerLayout("cache", create_layout, false);
}
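/// For reference, a hypothetical <layout> section exercising the keys read above
/// (element names follow the layout_prefix + ".cache.*" lookups; the values are
/// illustrative examples, not recommendations):
///
///     <layout>
///         <cache>
///             <size_in_cells>1000000</size_in_cells>
///             <allow_read_expired_keys>1</allow_read_expired_keys>
///             <max_update_queue_size>100000</max_update_queue_size>
///             <update_queue_push_timeout_milliseconds>10</update_queue_push_timeout_milliseconds>
///         </cache>
///     </layout>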
void CacheDictionary::updateThreadFunction()
{
setThreadName("AsyncUpdater");
while (!finished)
{
UpdateUnitPtr unit_ptr;
@ -680,21 +715,34 @@ void CacheDictionary::updateThreadFunction()
try
{
update(unit_ptr->requested_ids, unit_ptr->on_cell_updated, unit_ptr->on_id_not_found);
std::unique_lock<std::mutex> lock(update_mutex);
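/// Mark the unit done under the same mutex the waiters lock: a waiter cannot
/// check the predicate, miss the flag, and then sleep through this notify.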
unit_ptr->is_done = true;
last_update.fetch_add(1);
is_update_finished.notify_all();
}
catch (...)
{
std::unique_lock<std::mutex> lock(update_mutex);
unit_ptr->current_exception = std::current_exception();
is_update_finished.notify_all();
}
}
}
void CacheDictionary::waitForCurrentUpdateFinish() const
void CacheDictionary::waitForCurrentUpdateFinish(UpdateUnitPtr update_unit_ptr) const
{
size_t current_update_number = update_number.fetch_add(1);
while (last_update != current_update_number)
std::this_thread::yield();
std::unique_lock<std::mutex> lock(update_mutex);
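/// Wait with a predicate: it is re-checked under update_mutex, so both spurious
/// wakeups and notifications sent before we started waiting are handled correctly.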
is_update_finished.wait(lock,
[&] () {return update_unit_ptr->is_done || update_unit_ptr->current_exception; });
if (update_unit_ptr->current_exception)
std::rethrow_exception(update_unit_ptr->current_exception);
}
void CacheDictionary::tryPushToUpdateQueueOrThrow(UpdateUnitPtr update_unit_ptr) const
{
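/// The bounded queue provides backpressure: if the update thread cannot keep up,
/// the push below times out after update_queue_push_timeout_milliseconds and we
/// throw instead of letting the queue grow without limit.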
if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
throw DB::Exception("Cannot push to internal update queue. Current queue size is " +
std::to_string(update_queue.size()), ErrorCodes::CACHE_DICTIONARY_UPDATE_FAIL);
}
}

View File

@ -26,6 +26,21 @@
namespace DB
{
namespace ErrorCodes
{
extern const int CACHE_DICTIONARY_UPDATE_FAIL;
}
/*
* cache_not_found_ids |0|0|1|1|
* cache_expired_ids   |0|1|0|1|
*
* 0 - if the set is empty, 1 - otherwise
*
* We can perform an async update only when there are no cache_not_found_ids,
* there are some cache_expired_ids, and allow_read_expired_keys is enabled.
* Otherwise we have to concatenate the ids and update them synchronously.
*/
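/*
* A minimal sketch of the dispatch implied by the table above (illustrative only;
* scheduleAsyncUpdate()/syncUpdate() are hypothetical names - the real logic lives
* in has(), getItemsNumberImpl() and getItemsString()):
*
*     if (cache_not_found_ids.empty())
*     {
*         if (cache_expired_ids.empty())
*             return;                        // |0|0| - nothing to update
*         if (allow_read_expired_keys)
*             return scheduleAsyncUpdate();  // |0|1| - stale values were already answered
*     }
*     syncUpdate();                          // expired keys without the setting,
*                                            // and all not-found cases, wait here
*/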
class CacheDictionary final : public IDictionary
{
public:
@ -34,7 +49,10 @@ public:
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
const DictionaryLifetime dict_lifetime_,
const size_t size_);
const size_t size_,
const bool allow_read_expired_keys_,
const size_t max_update_queue_size_,
const size_t update_queue_push_timeout_milliseconds_);
~CacheDictionary() override;
@ -59,7 +77,9 @@ public:
std::shared_ptr<const IExternalLoadable> clone() const override
{
return std::make_shared<CacheDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, size);
return std::make_shared<CacheDictionary>(
name, dict_struct, source_ptr->clone(), dict_lifetime, size,
allow_read_expired_keys, max_update_queue_size, update_queue_push_timeout_milliseconds);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }
@ -265,6 +285,10 @@ private:
const DictionaryStructure dict_struct;
mutable DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
const bool allow_read_expired_keys;
const size_t max_update_queue_size;
const size_t update_queue_push_timeout_milliseconds;
Logger * const log;
mutable std::shared_mutex rw_lock;
@ -296,8 +320,7 @@ private:
mutable std::atomic<size_t> hit_count{0};
mutable std::atomic<size_t> query_count{0};
mutable std::atomic<size_t> update_number{1};
mutable std::atomic<size_t> last_update{0};
/// Fields and methods related to updating expired and not found keys
struct UpdateUnit
{
@ -319,23 +342,17 @@ private:
using UpdateUnitPtr = std::shared_ptr<UpdateUnit>;
using UpdateQueue = ConcurrentBoundedQueue<UpdateUnitPtr>;
// TODO: make setting called max_updates_number
mutable UpdateQueue update_queue{100};
mutable UpdateQueue update_queue;
ThreadFromGlobalPool update_thread;
void updateThreadFunction();
std::atomic<bool> finished{false};
void tryPushToUpdateQueueOrThrow(UpdateUnitPtr update_unit_ptr) const;
void waitForCurrentUpdateFinish(UpdateUnitPtr update_unit_ptr) const;
static bool getAllowReadExpiredKeysSetting()
{
Context * context = current_thread->getThreadGroup()->global_context;
return context->getSettingsRef().allow_read_expired_keys_from_cache_dictionary;
}
const size_t update_queue_push_timeout_milliseconds = 100;
void waitForCurrentUpdateFinish() const;
mutable std::mutex update_mutex;
mutable std::condition_variable is_update_finished;
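/// update_mutex guards UpdateUnit::is_done and UpdateUnit::current_exception;
/// the update thread signals is_update_finished after processing each unit.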
std::atomic<bool> finished{false};
};
}

View File

@ -44,12 +44,10 @@ void CacheDictionary::getItemsNumberImpl(
std::unordered_map<Key, std::vector<size_t>> cache_expired_ids;
std::unordered_map<Key, std::vector<size_t>> cache_not_found_ids;
const bool allow_read_expired_keys_from_cache_dictionary = getAllowReadExpiredKeysSetting();
auto & attribute_array = std::get<ContainerPtrType<AttributeType>>(attribute.arrays);
const auto rows = ext::size(ids);
size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0;
size_t cache_hit = 0;
{
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
@ -80,15 +78,12 @@ void CacheDictionary::getItemsNumberImpl(
if (find_result.outdated)
{
cache_expired_ids[id].push_back(row);
++cache_expired;
if (allow_read_expired_keys_from_cache_dictionary)
if (allow_read_expired_keys)
update_routine();
}
else
{
cache_not_found_ids[id].push_back(row);
++cache_not_found;
}
}
else
@ -99,21 +94,21 @@ void CacheDictionary::getItemsNumberImpl(
}
}
ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired);
ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found);
ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired_ids.size());
ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found_ids.size());
ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);
query_count.fetch_add(rows, std::memory_order_relaxed);
const size_t outdated_ids_count = cache_expired + cache_not_found;
hit_count.fetch_add(rows - outdated_ids_count, std::memory_order_release);
hit_count.fetch_add(rows - cache_expired_ids.size() - cache_not_found_ids.size(), std::memory_order_release);
if (outdated_ids_count == 0)
return;
if (allow_read_expired_keys_from_cache_dictionary)
if (cache_not_found_ids.empty())
{
if (!cache_expired_ids.empty())
/// Nothing to update - return
if (cache_expired_ids.empty())
return;
/// Update asynchronously only if allow_read_expired_keys is enabled
if (allow_read_expired_keys)
{
std::vector<Key> required_expired_ids;
required_expired_ids.reserve(cache_expired_ids.size());
@ -126,34 +121,27 @@ void CacheDictionary::getItemsNumberImpl(
[&](const auto, const auto) {},
[&](const auto, const auto) {});
if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
throw std::runtime_error("Too many updates");
tryPushToUpdateQueueOrThrow(update_unit_ptr);
/// Nothing to do - return
return;
}
}
if (cache_not_found_ids.empty())
return;
/// From this point on we have to update all keys synchronously.
/// Maybe allow_read_expired_keys is disabled
/// and there are no cache_not_found_ids but some cache_expired_ids.
std::vector<Key> required_ids;
required_ids.reserve(cache_not_found_ids.size() + cache_expired_ids.size());
std::transform(
std::begin(cache_not_found_ids), std::end(cache_not_found_ids),
std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
std::transform(
std::begin(cache_expired_ids), std::end(cache_expired_ids),
std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
if (allow_read_expired_keys_from_cache_dictionary)
{
/// In this case we have to update synchronously only cache_not_found_ids
required_ids.reserve(cache_not_found_ids.size());
std::transform(std::begin(cache_not_found_ids), std::end(cache_not_found_ids), std::back_inserter(required_ids),
[](auto & pair) { return pair.first; });
}
else
{
required_ids.reserve(cache_not_found_ids.size() + cache_expired_ids.size());
std::transform(std::begin(cache_not_found_ids), std::end(cache_not_found_ids), std::back_inserter(required_ids),
[](auto & pair) { return pair.first; });
std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::back_inserter(required_ids),
[](auto & pair) { return pair.first; });
}
/// request new values
/// Request new values
auto update_unit_ptr = std::make_shared<UpdateUnit>(
required_ids,
[&](const auto id, const auto cell_idx)
@ -175,29 +163,20 @@ void CacheDictionary::getItemsNumberImpl(
out[row] = get_default(row);
});
if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
throw std::runtime_error("Too many updates");
// waitForCurrentUpdateFinish();
while (!update_unit_ptr->is_done && !update_unit_ptr->current_exception) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::this_thread::yield();
}
if (update_unit_ptr->current_exception)
std::rethrow_exception(update_unit_ptr->current_exception);
tryPushToUpdateQueueOrThrow(update_unit_ptr);
waitForCurrentUpdateFinish(update_unit_ptr);
}
template <typename DefaultGetter>
void CacheDictionary::getItemsString(
Attribute & attribute, const PaddedPODArray<Key> & ids, ColumnString * out, DefaultGetter && get_default) const
{
Attribute & attribute, const PaddedPODArray<Key> & ids, ColumnString * out, DefaultGetter && get_default) const
{
const auto rows = ext::size(ids);
/// save on some allocations
out->getOffsets().reserve(rows);
auto & attribute_array = std::get<ContainerPtrType<StringRef>>(attribute.arrays);
auto & attribute_array = std::get<ContainerPtrType<StringRef>>(attribute.arrays);
auto found_outdated_values = false;
@ -207,20 +186,16 @@ void CacheDictionary::getItemsString(
const auto now = std::chrono::system_clock::now();
/// fetch up-to-date values, discard on fail
for (const auto row : ext::range(0, rows))
{
for (const auto row : ext::range(0, rows))
{
const auto id = ids[row];
const auto find_result = findCellIdx(id, now);
if (!find_result.valid)
{
if (!find_result.valid)
{
found_outdated_values = true;
break;
}
else
{
const auto & cell_idx = find_result.cell_idx;
const auto & cell = cells[cell_idx];
}
else
{
const auto & cell_idx = find_result.cell_idx;
const auto & cell = cells[cell_idx];
const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx];
out->insertData(string_ref.data, string_ref.size);
}
@ -228,8 +203,7 @@ void CacheDictionary::getItemsString(
}
/// optimistic code completed successfully
if (!found_outdated_values)
{
if (!found_outdated_values)
{
query_count.fetch_add(rows, std::memory_order_relaxed);
hit_count.fetch_add(rows, std::memory_order_release);
return;
@ -245,25 +219,21 @@ void CacheDictionary::getItemsString(
/// we are going to store every string separately
std::unordered_map<Key, String> map;
const bool allow_read_expired_keys_from_cache_dictionary = getAllowReadExpiredKeysSetting();
size_t total_length = 0;
size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0;
size_t cache_hit = 0;
{
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
const auto now = std::chrono::system_clock::now();
for (const auto row : ext::range(0, ids.size()))
{
for (const auto row : ext::range(0, ids.size()))
{
const auto id = ids[row];
const auto find_result = findCellIdx(id, now);
auto insert_value_routine = [&]()
{
const auto & cell_idx = find_result.cell_idx;
const auto & cell = cells[cell_idx];
auto insert_value_routine = [&]()
{
const auto & cell_idx = find_result.cell_idx;
const auto & cell = cells[cell_idx];
const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx];
if (!cell.isDefault())
@ -272,89 +242,60 @@ void CacheDictionary::getItemsString(
total_length += string_ref.size + 1;
};
if (!find_result.valid)
{
if (find_result.outdated)
{
if (!find_result.valid)
{
if (find_result.outdated)
{
cache_expired_ids[id].push_back(row);
++cache_expired;
if (allow_read_expired_keys_from_cache_dictionary)
{
if (allow_read_expired_keys)
insert_value_routine();
}
}
else
{
}
else
{
cache_not_found_ids[id].push_back(row);
++cache_not_found;
}
}
else
{
}
else
{
++cache_hit;
insert_value_routine();
}
}
}
ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired);
ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found);
ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired_ids.size());
ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found_ids.size());
ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);
query_count.fetch_add(rows, std::memory_order_relaxed);
const size_t outdated_ids_count = cache_expired + cache_not_found;
hit_count.fetch_add(rows - outdated_ids_count, std::memory_order_release);
hit_count.fetch_add(rows - cache_expired_ids.size() - cache_not_found_ids.size(), std::memory_order_release);
if (!cache_expired_ids.empty())
/// Async update of expired keys.
if (cache_not_found_ids.empty())
{
std::vector<Key> required_expired_ids;
required_expired_ids.reserve(cache_not_found_ids.size());
std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::back_inserter(required_expired_ids), [](auto & pair) { return pair.first; });
if (allow_read_expired_keys && !cache_expired_ids.empty())
{
std::vector<Key> required_expired_ids;
required_expired_ids.reserve(cache_expired_ids.size());
std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids),
std::back_inserter(required_expired_ids), [](auto & pair) { return pair.first; });
if (allow_read_expired_keys_from_cache_dictionary)
{
auto update_unit_ptr = std::make_shared<UpdateUnit>(required_expired_ids, [&](const auto, const auto){}, [&](const auto, const auto) {});
if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
throw std::runtime_error("Too many updates");
}
else
{
auto update_unit_ptr = std::make_shared<UpdateUnit>(
required_expired_ids,
[&](const auto id, const auto cell_idx)
{
const auto attribute_value = attribute_array[cell_idx];
required_expired_ids, [&](const auto, const auto) {}, [&](const auto, const auto) {});
map[id] = String{attribute_value};
total_length += (attribute_value.size + 1) * cache_expired_ids[id].size();
},
[&](const auto id, const auto)
{
for (const auto row : cache_expired_ids[id])
total_length += get_default(row).size + 1;
});
tryPushToUpdateQueueOrThrow(update_unit_ptr);
if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
throw std::runtime_error("Too many updates");
// waitForCurrentUpdateFinish();
while (!update_unit_ptr->is_done && !update_unit_ptr->current_exception) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::this_thread::yield();
}
if (update_unit_ptr->current_exception)
std::rethrow_exception(update_unit_ptr->current_exception);
/// Do not return at this point, because there is still some work to do at the end of this method.
}
}
/// request new values
/// Request new values synchronously.
/// We have to request both cache_not_found_ids and cache_expired_ids.
if (!cache_not_found_ids.empty())
{
std::vector<Key> required_ids;
required_ids.reserve(cache_not_found_ids.size());
std::transform(std::begin(cache_not_found_ids), std::end(cache_not_found_ids), std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
required_ids.reserve(cache_not_found_ids.size() + cache_expired_ids.size());
std::transform(
std::begin(cache_not_found_ids), std::end(cache_not_found_ids),
std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
std::transform(
std::begin(cache_expired_ids), std::end(cache_expired_ids),
std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
auto update_unit_ptr = std::make_shared<UpdateUnit>(
required_ids,
@ -371,17 +312,8 @@ void CacheDictionary::getItemsString(
total_length += get_default(row).size + 1;
});
if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
throw std::runtime_error("Too many updates");
// waitForCurrentUpdateFinish();
while (!update_unit_ptr->is_done && !update_unit_ptr->current_exception) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::this_thread::yield();
}
if (update_unit_ptr->current_exception)
std::rethrow_exception(update_unit_ptr->current_exception);
tryPushToUpdateQueueOrThrow(update_unit_ptr);
waitForCurrentUpdateFinish(update_unit_ptr);
}
out->getChars().reserve(total_length);
@ -409,8 +341,6 @@ void CacheDictionary::update(
const auto now = std::chrono::system_clock::now();
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
if (now > backoff_end_time)
{
try
@ -423,7 +353,11 @@ void CacheDictionary::update(
}
Stopwatch watch;
/// Go to external storage. Might be very slow and blocking.
auto stream = source_ptr->loadIds(requested_ids);
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
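/// The write lock is held while reading the stream and updating the cells below.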
stream->readPrefix();
while (const auto block = stream->read())