This commit is contained in:
Nikita Mikhaylov 2019-12-19 21:22:04 +03:00
parent 5f3d7cace4
commit be5729e113
4 changed files with 130 additions and 19 deletions

View File

@ -350,6 +350,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, low_cardinality_max_dictionary_size, 8192, "Maximum size (in rows) of shared global dictionary for LowCardinality type.", 0) \ M(SettingUInt64, low_cardinality_max_dictionary_size, 8192, "Maximum size (in rows) of shared global dictionary for LowCardinality type.", 0) \
M(SettingBool, low_cardinality_use_single_dictionary_for_part, false, "LowCardinality type serialization setting. If is true, than will use additional keys when global dictionary overflows. Otherwise, will create several shared dictionaries.", 0) \ M(SettingBool, low_cardinality_use_single_dictionary_for_part, false, "LowCardinality type serialization setting. If is true, than will use additional keys when global dictionary overflows. Otherwise, will create several shared dictionaries.", 0) \
M(SettingBool, decimal_check_overflow, true, "Check overflow of decimal arithmetic/comparison operations", 0) \ M(SettingBool, decimal_check_overflow, true, "Check overflow of decimal arithmetic/comparison operations", 0) \
M(SettingBool, allow_read_expired_keys_from_cache_dictionary, false, "Allow read keys if their lifetime is expired.", 0) \
\ \
M(SettingBool, prefer_localhost_replica, 1, "1 - always send query to local replica, if it exists. 0 - choose replica to send query between local and remote ones according to load_balancing", 0) \ M(SettingBool, prefer_localhost_replica, 1, "1 - always send query to local replica, if it exists. 0 - choose replica to send query between local and remote ones according to load_balancing", 0) \
M(SettingUInt64, max_fetch_partition_retries_count, 5, "Amount of retries while fetching partition from another host.", 0) \ M(SettingUInt64, max_fetch_partition_retries_count, 5, "Amount of retries while fetching partition from another host.", 0) \

View File

@ -12,6 +12,7 @@
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <ext/range.h> #include <ext/range.h>
#include <ext/size.h> #include <ext/size.h>
#include <Interpreters/Context.h>
#include "CacheDictionary.inc.h" #include "CacheDictionary.inc.h"
#include "DictionaryBlockInputStream.h" #include "DictionaryBlockInputStream.h"
#include "DictionaryFactory.h" #include "DictionaryFactory.h"
@ -76,6 +77,14 @@ CacheDictionary::CacheDictionary(
throw Exception{name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD}; throw Exception{name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};
createAttributes(); createAttributes();
update_thread = ThreadFromGlobalPool([this] { updateThreadFunction(); });
}
/// Shuts down the background update thread that the constructor started.
CacheDictionary::~CacheDictionary()
{
    /// Raise the stop flag first: the loop in updateThreadFunction() re-checks it
    /// before taking the next job from the queue.
    finished.store(true);

    /// Discard whatever is still waiting in the queue.
    update_queue.clear();

    /// NOTE(review): the worker takes jobs with update_queue.pop(); if that call blocks
    /// while the queue is empty, this join() may never return - confirm that something wakes it.
    update_thread.join();
}
@ -272,11 +281,21 @@ CacheDictionary::FindResult CacheDictionary::findCellIdx(const Key & id, const C
void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const
{ {
/// There are three types of ids.
/// - Valid ids. These ids are present in the local cache and their lifetime has not expired.
/// - CacheExpired ids. Ids that are in the local cache, but whose values are stale (their lifetime has expired).
/// - CacheNotFound ids. Ids that are not in the local cache; we have to go to the external storage to learn their values.
/// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> } /// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
std::unordered_map<Key, std::vector<size_t>> outdated_ids; std::unordered_map<Key, std::vector<size_t>> cache_expired_ids;
std::unordered_map<Key, std::vector<size_t>> cache_not_found_ids;
size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0; size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0;
Context * context = current_thread->getThreadGroup()->global_context;
const bool allow_read_expired_keys_from_cache_dictionary =
context->getSettingsRef().allow_read_expired_keys_from_cache_dictionary;
const auto rows = ext::size(ids); const auto rows = ext::size(ids);
{ {
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs}; const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
@ -290,11 +309,22 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8>
const auto & cell_idx = find_result.cell_idx; const auto & cell_idx = find_result.cell_idx;
if (!find_result.valid) if (!find_result.valid)
{ {
outdated_ids[id].push_back(row);
if (find_result.outdated) if (find_result.outdated)
{
cache_expired_ids[id].push_back(row);
++cache_expired; ++cache_expired;
if (allow_read_expired_keys_from_cache_dictionary)
{
const auto & cell = cells[cell_idx];
out[row] = !cell.isDefault();
}
}
else else
{
cache_not_found_ids[id].push_back(row);
++cache_not_found; ++cache_not_found;
}
} }
else else
{ {
@ -310,27 +340,53 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8>
ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit); ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);
query_count.fetch_add(rows, std::memory_order_relaxed); query_count.fetch_add(rows, std::memory_order_relaxed);
hit_count.fetch_add(rows - outdated_ids.size(), std::memory_order_release); const size_t outdated_ids_count = cache_expired + cache_not_found;
hit_count.fetch_add(rows - outdated_ids_count, std::memory_order_release);
if (outdated_ids.empty()) /// We have no keys to update.
if (outdated_ids_count == 0)
return; return;
std::vector<Key> required_ids(outdated_ids.size()); /// Schedule an update job for expired keys. (At this point we have only expired keys.)
std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids), [](auto & pair) { return pair.first; }); /// No need to wait for expired keys being updated.
if (allow_read_expired_keys_from_cache_dictionary && cache_not_found == 0)
{
std::vector<Key> required_expired_ids(cache_expired_ids.size());
std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::begin(required_expired_ids), [](auto & pair) { return pair.first; });
/// Callbacks are empty because we don't want to receive them after an unknown period of time.
UpdateUnit update_unit{std::move(required_expired_ids), [&](const Key, const size_t){}, [&](const Key, const size_t){}};
UInt64 timeout{10}; /// TODO: make setting or a field called update_queue_push_timeout;
if (!update_queue.tryPush(update_unit, timeout))
throw std::runtime_error("Can't schedule an update job.");
return;
}
/// request new values /// At this point we have two situations. There may be both types of keys: expired and not found.
update( /// We will update them all synchronously.
required_ids,
[&](const auto id, const auto) std::vector<Key> required_ids(cache_not_found_ids.size());
{ std::transform(std::begin(cache_not_found_ids), std::end(cache_not_found_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });
for (const auto row : outdated_ids[id]) std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });
out[row] = true;
}, UpdateUnit update_unit{
[&](const auto id, const auto) std::move(required_ids),
{ [&](const Key id, const size_t) {
for (const auto row : outdated_ids[id]) for (const auto row : cache_not_found_ids[id])
out[row] = false; out[row] = true;
}); },
[&](const Key id, const size_t) {
for (const auto row : cache_not_found_ids[id])
out[row] = false;
}
};
UInt64 timeout{10};
const bool res = update_queue.tryPush(update_unit, timeout);
if (!res)
throw std::runtime_error("Too many updates");
waitForCurrentUpdateFinish();
} }
@ -610,5 +666,31 @@ void registerDictionaryCache(DictionaryFactory & factory)
factory.registerLayout("cache", create_layout, false); factory.registerLayout("cache", create_layout, false);
} }
void CacheDictionary::updateThreadFunction()
{
try
{
while (!finished)
{
UpdateUnit unit;
update_queue.pop(unit);
update(unit.requested_ids, unit.on_cell_updated, unit.on_id_not_found);
last_update.fetch_add(1);
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void CacheDictionary::waitForCurrentUpdateFinish() const
{
size_t current_update_number = update_number.fetch_add(1);
while (last_update != current_update_number)
std::this_thread::yield();
}
} }

View File

@ -4,12 +4,15 @@
#include <chrono> #include <chrono>
#include <cmath> #include <cmath>
#include <map> #include <map>
#include <mutex>
#include <shared_mutex> #include <shared_mutex>
#include <variant> #include <variant>
#include <vector> #include <vector>
#include <common/logger_useful.h> #include <common/logger_useful.h>
#include <Columns/ColumnDecimal.h> #include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Common/ThreadPool.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <pcg_random.hpp> #include <pcg_random.hpp>
#include <Common/ArenaWithFreeLists.h> #include <Common/ArenaWithFreeLists.h>
#include <Common/CurrentMetrics.h> #include <Common/CurrentMetrics.h>
@ -31,6 +34,8 @@ public:
const DictionaryLifetime dict_lifetime_, const DictionaryLifetime dict_lifetime_,
const size_t size_); const size_t size_);
~CacheDictionary() override;
std::string getName() const override { return name; } std::string getName() const override { return name; }
std::string getTypeName() const override { return "Cache"; } std::string getTypeName() const override { return "Cache"; }
@ -288,6 +293,28 @@ private:
mutable std::atomic<size_t> element_count{0}; mutable std::atomic<size_t> element_count{0};
mutable std::atomic<size_t> hit_count{0}; mutable std::atomic<size_t> hit_count{0};
mutable std::atomic<size_t> query_count{0}; mutable std::atomic<size_t> query_count{0};
mutable std::atomic<size_t> update_number{1};
mutable std::atomic<size_t> last_update{0};
/// One job for the background update thread: a set of keys plus the callbacks that
/// update() is given together with them.
/// Field order matters: the struct is filled with positional (aggregate) initialization.
struct UpdateUnit
{
    /// Signature shared by both callbacks. The first argument is the key;
    /// the meaning of the size_t argument is defined by update() - presumably a cell index, TODO confirm.
    using Callback = std::function<void(const Key, const size_t)>;

    /// Keys to request; passed to update() as its first argument.
    std::vector<Key> requested_ids;

    /// Passed to update() as its second argument; has() uses it to mark rows as present.
    Callback on_cell_updated;

    /// Passed to update() as its third argument; has() uses it to mark rows as absent.
    Callback on_id_not_found;
};
using UpdateQueue = ConcurrentBoundedQueue<UpdateUnit>;
// TODO: make setting called max_updates_number
mutable UpdateQueue update_queue{10};
ThreadFromGlobalPool update_thread;
void updateThreadFunction();
std::atomic<bool> finished{false};
void waitForCurrentUpdateFinish() const;
mutable std::mutex update_mutex;
}; };
} }

View File

@ -21,6 +21,7 @@ extern const Event DictCacheRequestTimeNs;
extern const Event DictCacheRequests; extern const Event DictCacheRequests;
extern const Event DictCacheLockWriteNs; extern const Event DictCacheLockWriteNs;
extern const Event DictCacheLockReadNs; extern const Event DictCacheLockReadNs;
extern const Event DictCacheReadsRottedValues;
} }
namespace CurrentMetrics namespace CurrentMetrics