This commit is contained in:
nikitamikhaylov 2020-10-01 21:28:40 +03:00
parent 10adac00f4
commit fbd0d14dd6
6 changed files with 33 additions and 52 deletions

View File

@ -85,6 +85,7 @@ CacheDictionary::CacheDictionary(
, size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))} , size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
, size_overlap_mask{this->size - 1} , size_overlap_mask{this->size - 1}
, cells{this->size} , cells{this->size}
, default_keys{this->size}
, rnd_engine(randomSeed()) , rnd_engine(randomSeed())
, update_queue(max_update_queue_size_) , update_queue(max_update_queue_size_)
, update_pool(max_threads_for_updates) , update_pool(max_threads_for_updates)
@ -309,14 +310,14 @@ std::string CacheDictionary::AttributeValuesForKey::dump()
std::string CacheDictionary::UpdateUnit::dumpFoundIds() std::string CacheDictionary::UpdateUnit::dumpFoundIds()
{ {
std::string ans; std::ostringstream os;
for (auto it : found_ids) for (auto it : found_ids)
{ {
ans += "Key: " + std::to_string(it.first) + "\n"; os << "Key: " << std::to_string(it.first) << "\n";
if (it.second.found) if (it.second.found)
ans += it.second.dump() + "\n"; os << it.second.dump() << "\n";
} }
return ans; return os.str();
}; };
/// returns cell_idx (always valid for replacing), 'cell is valid' flag, 'cell is outdated' flag /// returns cell_idx (always valid for replacing), 'cell is valid' flag, 'cell is outdated' flag
@ -387,12 +388,11 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8>
for (const auto row : ext::range(0, rows)) for (const auto row : ext::range(0, rows))
{ {
const auto id = ids[row]; const auto id = ids[row];
{
std::shared_lock shared_lock(default_cache_rw_lock);
/// Check if the key is stored in the cache of defaults. /// Check if the key is stored in the cache of defaults.
if (default_keys.find(id) != default_keys.end()) if (default_keys.has(id))
continue; continue;
}
const auto find_result = findCellIdx(id, now); const auto find_result = findCellIdx(id, now);
auto insert_to_answer_routine = [&] () auto insert_to_answer_routine = [&] ()
@ -461,7 +461,7 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8>
} }
/// At this point we have two situations. /// At this point we have two situations.
/// There may be both types of keys: cache_expired_ids and cache_not_found_ids. /// There may be both types of keys: expired and not_found.
/// We will update them all synchronously. /// We will update them all synchronously.
std::vector<Key> required_ids; std::vector<Key> required_ids;
@ -481,11 +481,8 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8>
for (const auto row : cache_expired_or_not_found_ids[key]) for (const auto row : cache_expired_or_not_found_ids[key])
out[row] = true; out[row] = true;
else else
{
std::unique_lock unique_lock(default_cache_rw_lock);
/// Cache this key as default. /// Cache this key as default.
default_keys.insert(key); default_keys.add(key);
}
} }
} }

View File

@ -22,6 +22,8 @@
#include "IDictionary.h" #include "IDictionary.h"
#include "IDictionarySource.h" #include "IDictionarySource.h"
#include <Common/LRUSet.h>
namespace CurrentMetrics namespace CurrentMetrics
{ {
extern const Metric CacheDictionaryUpdateQueueBatches; extern const Metric CacheDictionaryUpdateQueueBatches;
@ -346,6 +348,7 @@ private:
std::map<std::string, size_t> attribute_index_by_name; std::map<std::string, size_t> attribute_index_by_name;
mutable std::vector<Attribute> attributes; mutable std::vector<Attribute> attributes;
mutable std::vector<CellMetadata> cells; mutable std::vector<CellMetadata> cells;
mutable LRUSet<Key> default_keys;
Attribute * hierarchical_attribute = nullptr; Attribute * hierarchical_attribute = nullptr;
std::unique_ptr<ArenaWithFreeLists> string_arena; std::unique_ptr<ArenaWithFreeLists> string_arena;
@ -360,21 +363,11 @@ private:
mutable std::atomic<size_t> hit_count{0}; mutable std::atomic<size_t> hit_count{0};
mutable std::atomic<size_t> query_count{0}; mutable std::atomic<size_t> query_count{0};
mutable std::unordered_set<Key> default_keys;
/* /*
* Disclaimer: this comment is written not for fun.
*
* How the update goes: we basically have a method like get(keys)->values. Values are cached, so sometimes we * How the update goes: we basically have a method like get(keys)->values. Values are cached, so sometimes we
* can return them from the cache. For values not in cache, we query them from the dictionary, and add to the * can return them from the cache. For values not in cache, we query them from the source, and add to the
* cache. The cache is lossy, so we can't expect it to store all the keys, and we store them separately. Normally, * cache. The cache is lossy, so we can't expect it to store all the keys, and we store them separately.
* they would be passed as a return value of get(), but for Unknown Reasons the dictionaries use a baroque * So, there is a map of found keys to all its attributes.
* interface where get() accepts two callback, one that it calls for found values, and one for not found.
*
* Now we make it even uglier by doing this from multiple threads. The missing values are retrieved from the
* dictionary in a background thread, and this thread calls the provided callback. So if you provide the callbacks,
* you MUST wait until the background update finishes, or god knows what happens. Unfortunately, we have no
* way to check that you did this right, so good luck.
*/ */
struct UpdateUnit struct UpdateUnit
{ {

View File

@ -62,13 +62,9 @@ void CacheDictionary::getItemsNumberImpl(
{ {
const auto id = ids[row]; const auto id = ids[row];
{
std::shared_lock shared_lock(default_cache_rw_lock);
/// First check if this key in the cache of default keys. /// First check if this key in the cache of default keys.
if (default_keys.find(id) != default_keys.end()) if (default_keys.has(id))
continue; continue;
}
/** cell should be updated if either: /** cell should be updated if either:
* 1. ids do not match, * 1. ids do not match,
@ -174,9 +170,7 @@ void CacheDictionary::getItemsNumberImpl(
out[row] = std::get<OutputType>(value.values[attribute_index]); out[row] = std::get<OutputType>(value.values[attribute_index]);
} else } else
{ {
/// Add key to the cache of default keys. default_keys.add(key);
std::unique_lock unique_lock(default_cache_rw_lock);
default_keys.emplace(key);
} }
} }
} }
@ -207,7 +201,7 @@ void CacheDictionary::getItemsString(
{ {
std::shared_lock shared_lock(default_cache_rw_lock); std::shared_lock shared_lock(default_cache_rw_lock);
/// Check if the key is stored in the cache of defaults. /// Check if the key is stored in the cache of defaults.
if (default_keys.find(id) != default_keys.end()) if (default_keys.has(id))
{ {
const auto string_ref = get_default(row); const auto string_ref = get_default(row);
out->insertData(string_ref.data, string_ref.size); out->insertData(string_ref.data, string_ref.size);
@ -258,15 +252,12 @@ void CacheDictionary::getItemsString(
for (const auto row : ext::range(0, ids.size())) for (const auto row : ext::range(0, ids.size()))
{ {
const auto id = ids[row]; const auto id = ids[row];
{
std::shared_lock shared_lock(default_cache_rw_lock);
/// Check if the key is stored in the cache of defaults. /// Check if the key is stored in the cache of defaults.
if (default_keys.find(id) != default_keys.end()) if (default_keys.has(id))
{ {
const auto string_ref = get_default(row); const auto string_ref = get_default(row);
out->insertData(string_ref.data, string_ref.size); out->insertData(string_ref.data, string_ref.size);
} }
}
const auto find_result = findCellIdx(id, now); const auto find_result = findCellIdx(id, now);
auto insert_value_routine = [&]() auto insert_value_routine = [&]()
@ -383,8 +374,7 @@ void CacheDictionary::getItemsString(
value = std::get<String>(found_it->second.values[attribute_index]); value = std::get<String>(found_it->second.values[attribute_index]);
else else
{ {
std::unique_lock unique_lock(default_cache_rw_lock); default_keys.add(id);
default_keys.insert(id);
value = get_default(row); value = get_default(row);
} }
} }

View File

@ -221,6 +221,7 @@ def test_reload_after_fail_in_cache_dictionary(started_cluster):
# Can't get a value from the cache dictionary because the source (table `test.xypairs`) doesn't respond. # Can't get a value from the cache dictionary because the source (table `test.xypairs`) doesn't respond.
expected_error = "Table test.xypairs doesn't exist" expected_error = "Table test.xypairs doesn't exist"
update_error = "Could not update cache dictionary cache_xypairs now"
assert expected_error in query_and_get_error("SELECT dictGetUInt64('cache_xypairs', 'y', toUInt64(1))") assert expected_error in query_and_get_error("SELECT dictGetUInt64('cache_xypairs', 'y', toUInt64(1))")
assert get_status("cache_xypairs") == "LOADED" assert get_status("cache_xypairs") == "LOADED"
assert expected_error in get_last_exception("cache_xypairs") assert expected_error in get_last_exception("cache_xypairs")
@ -254,8 +255,9 @@ def test_reload_after_fail_in_cache_dictionary(started_cluster):
query("SELECT dictGet('cache_xypairs', 'y', toUInt64(1))") == "56" query("SELECT dictGet('cache_xypairs', 'y', toUInt64(1))") == "56"
query("SELECT dictGet('cache_xypairs', 'y', toUInt64(2))") == "0" query("SELECT dictGet('cache_xypairs', 'y', toUInt64(2))") == "0"
error = query_and_get_error("SELECT dictGetUInt64('cache_xypairs', 'y', toUInt64(3))") error = query_and_get_error("SELECT dictGetUInt64('cache_xypairs', 'y', toUInt64(3))")
assert expected_error in error assert (expected_error in error) or (update_error in error)
assert expected_error in get_last_exception("cache_xypairs") last_exception = get_last_exception("cache_xypairs")
assert (expected_error in last_exception) or (update_error in last_exception)
# Create table `test.xypairs` again with changed values. # Create table `test.xypairs` again with changed values.
query(''' query('''

View File

@ -47,7 +47,7 @@ def test_SYSTEM_RELOAD_DICTIONARY(started_cluster):
instance.query("INSERT INTO dictionary_source VALUES (1, 1)") instance.query("INSERT INTO dictionary_source VALUES (1, 1)")
assert TSV(instance.query( assert TSV(instance.query(
"SELECT dictGetUInt8('clickhouse_cache', 'value', toUInt64(0)), dictHas('clickhouse_cache', toUInt64(1))")) == TSV( "SELECT dictGetUInt8('clickhouse_cache', 'value', toUInt64(0)), dictHas('clickhouse_cache', toUInt64(1))")) == TSV(
"0\t1\n") "0\t0\n")
instance.query("SYSTEM RELOAD DICTIONARY clickhouse_cache") instance.query("SYSTEM RELOAD DICTIONARY clickhouse_cache")
assert TSV(instance.query( assert TSV(instance.query(