mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-10-08 17:40:49 +00:00
more
This commit is contained in:
parent
be5729e113
commit
d93b0d5030
@ -12,7 +12,6 @@
|
|||||||
#include <Common/typeid_cast.h>
|
#include <Common/typeid_cast.h>
|
||||||
#include <ext/range.h>
|
#include <ext/range.h>
|
||||||
#include <ext/size.h>
|
#include <ext/size.h>
|
||||||
#include <Interpreters/Context.h>
|
|
||||||
#include "CacheDictionary.inc.h"
|
#include "CacheDictionary.inc.h"
|
||||||
#include "DictionaryBlockInputStream.h"
|
#include "DictionaryBlockInputStream.h"
|
||||||
#include "DictionaryFactory.h"
|
#include "DictionaryFactory.h"
|
||||||
@ -292,9 +291,7 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8>
|
|||||||
|
|
||||||
size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0;
|
size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0;
|
||||||
|
|
||||||
Context * context = current_thread->getThreadGroup()->global_context;
|
const bool allow_read_expired_keys_from_cache_dictionary = getAllowReadExpiredKeysSetting();
|
||||||
const bool allow_read_expired_keys_from_cache_dictionary =
|
|
||||||
context->getSettingsRef().allow_read_expired_keys_from_cache_dictionary;
|
|
||||||
|
|
||||||
const auto rows = ext::size(ids);
|
const auto rows = ext::size(ids);
|
||||||
{
|
{
|
||||||
@ -380,8 +377,7 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8>
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
UInt64 timeout{10};
|
const bool res = update_queue.tryPush(update_unit, update_queue_push_timeout_milliseconds);
|
||||||
const bool res = update_queue.tryPush(update_unit, timeout);
|
|
||||||
|
|
||||||
if (!res)
|
if (!res)
|
||||||
throw std::runtime_error("Too many updates");
|
throw std::runtime_error("Too many updates");
|
||||||
|
@ -16,6 +16,7 @@
|
|||||||
#include <pcg_random.hpp>
|
#include <pcg_random.hpp>
|
||||||
#include <Common/ArenaWithFreeLists.h>
|
#include <Common/ArenaWithFreeLists.h>
|
||||||
#include <Common/CurrentMetrics.h>
|
#include <Common/CurrentMetrics.h>
|
||||||
|
#include <Interpreters/Context.h>
|
||||||
#include <ext/bit_cast.h>
|
#include <ext/bit_cast.h>
|
||||||
#include "DictionaryStructure.h"
|
#include "DictionaryStructure.h"
|
||||||
#include "IDictionary.h"
|
#include "IDictionary.h"
|
||||||
@ -313,6 +314,14 @@ private:
|
|||||||
void updateThreadFunction();
|
void updateThreadFunction();
|
||||||
std::atomic<bool> finished{false};
|
std::atomic<bool> finished{false};
|
||||||
|
|
||||||
|
bool getAllowReadExpiredKeysSetting() const
|
||||||
|
{
|
||||||
|
Context * context = current_thread->getThreadGroup()->global_context;
|
||||||
|
return context->getSettingsRef().allow_read_expired_keys_from_cache_dictionary;
|
||||||
|
}
|
||||||
|
|
||||||
|
const size_t update_queue_push_timeout_milliseconds = 10;
|
||||||
|
|
||||||
void waitForCurrentUpdateFinish() const;
|
void waitForCurrentUpdateFinish() const;
|
||||||
mutable std::mutex update_mutex;
|
mutable std::mutex update_mutex;
|
||||||
};
|
};
|
||||||
|
@ -162,10 +162,13 @@ void CacheDictionary::getItemsString(
|
|||||||
out->getOffsets().resize_assume_reserved(0);
|
out->getOffsets().resize_assume_reserved(0);
|
||||||
|
|
||||||
/// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
|
/// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
|
||||||
std::unordered_map<Key, std::vector<size_t>> outdated_ids;
|
std::unordered_map<Key, std::vector<size_t>> cache_expired_ids;
|
||||||
|
std::unordered_map<Key, std::vector<size_t>> cache_not_found_ids;
|
||||||
/// we are going to store every string separately
|
/// we are going to store every string separately
|
||||||
std::unordered_map<Key, String> map;
|
std::unordered_map<Key, String> map;
|
||||||
|
|
||||||
|
const bool allow_read_expired_keys_from_cache_dictionary = getAllowReadExpiredKeysSetting();
|
||||||
|
|
||||||
size_t total_length = 0;
|
size_t total_length = 0;
|
||||||
size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0;
|
size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0;
|
||||||
{
|
{
|
||||||
@ -177,17 +180,10 @@ void CacheDictionary::getItemsString(
|
|||||||
const auto id = ids[row];
|
const auto id = ids[row];
|
||||||
|
|
||||||
const auto find_result = findCellIdx(id, now);
|
const auto find_result = findCellIdx(id, now);
|
||||||
if (!find_result.valid)
|
|
||||||
|
|
||||||
|
auto insert_value_routine = [&]()
|
||||||
{
|
{
|
||||||
outdated_ids[id].push_back(row);
|
|
||||||
if (find_result.outdated)
|
|
||||||
++cache_expired;
|
|
||||||
else
|
|
||||||
++cache_not_found;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
++cache_hit;
|
|
||||||
const auto & cell_idx = find_result.cell_idx;
|
const auto & cell_idx = find_result.cell_idx;
|
||||||
const auto & cell = cells[cell_idx];
|
const auto & cell = cells[cell_idx];
|
||||||
const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx];
|
const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx];
|
||||||
@ -196,6 +192,30 @@ void CacheDictionary::getItemsString(
|
|||||||
map[id] = String{string_ref};
|
map[id] = String{string_ref};
|
||||||
|
|
||||||
total_length += string_ref.size + 1;
|
total_length += string_ref.size + 1;
|
||||||
|
};
|
||||||
|
|
||||||
|
if (!find_result.valid)
|
||||||
|
{
|
||||||
|
if (find_result.outdated)
|
||||||
|
{
|
||||||
|
cache_expired_ids[id].push_back(row);
|
||||||
|
++cache_expired;
|
||||||
|
|
||||||
|
if (allow_read_expired_keys_from_cache_dictionary)
|
||||||
|
{
|
||||||
|
insert_value_routine();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
cache_not_found_ids[id].push_back(row);
|
||||||
|
++cache_not_found;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
++cache_hit;
|
||||||
|
insert_value_routine();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -205,28 +225,67 @@ void CacheDictionary::getItemsString(
|
|||||||
ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);
|
ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);
|
||||||
|
|
||||||
query_count.fetch_add(rows, std::memory_order_relaxed);
|
query_count.fetch_add(rows, std::memory_order_relaxed);
|
||||||
hit_count.fetch_add(rows - outdated_ids.size(), std::memory_order_release);
|
const size_t outdated_ids_count = cache_expired + cache_not_found;
|
||||||
|
hit_count.fetch_add(rows - outdated_ids_count, std::memory_order_release);
|
||||||
|
|
||||||
|
if (!cache_expired_ids.empty())
|
||||||
|
{
|
||||||
|
std::vector<Key> required_expired_ids(cache_not_found_ids.size());
|
||||||
|
std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::begin(required_expired_ids), [](auto & pair) { return pair.first; });
|
||||||
|
|
||||||
|
if (allow_read_expired_keys_from_cache_dictionary)
|
||||||
|
{
|
||||||
|
UpdateUnit update_unit{required_expired_ids, [&](const auto, const auto) {}, [&](const auto, const auto) {}};
|
||||||
|
if (!update_queue.tryPush(update_unit, update_queue_push_timeout_milliseconds))
|
||||||
|
throw std::runtime_error("Too many updates");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
UpdateUnit update_unit{
|
||||||
|
required_expired_ids,
|
||||||
|
[&](const auto id, const auto cell_idx)
|
||||||
|
{
|
||||||
|
const auto attribute_value = attribute_array[cell_idx];
|
||||||
|
|
||||||
|
map[id] = String{attribute_value};
|
||||||
|
total_length += (attribute_value.size + 1) * cache_not_found_ids[id].size();
|
||||||
|
},
|
||||||
|
[&](const auto id, const auto)
|
||||||
|
{
|
||||||
|
for (const auto row : cache_not_found_ids[id])
|
||||||
|
total_length += get_default(row).size + 1;
|
||||||
|
}};
|
||||||
|
|
||||||
|
if (!update_queue.tryPush(update_unit, update_queue_push_timeout_milliseconds))
|
||||||
|
throw std::runtime_error("Too many updates");
|
||||||
|
|
||||||
|
waitForCurrentUpdateFinish();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// request new values
|
/// request new values
|
||||||
if (!outdated_ids.empty())
|
if (!cache_not_found_ids.empty())
|
||||||
{
|
{
|
||||||
std::vector<Key> required_ids(outdated_ids.size());
|
std::vector<Key> required_ids(cache_not_found_ids.size());
|
||||||
std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });
|
std::transform(std::begin(cache_not_found_ids), std::end(cache_not_found_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });
|
||||||
|
|
||||||
update(
|
UpdateUnit update_unit{
|
||||||
required_ids,
|
required_ids,
|
||||||
[&](const auto id, const auto cell_idx)
|
[&](const auto id, const auto cell_idx)
|
||||||
{
|
{
|
||||||
const auto attribute_value = attribute_array[cell_idx];
|
const auto attribute_value = attribute_array[cell_idx];
|
||||||
|
|
||||||
map[id] = String{attribute_value};
|
map[id] = String{attribute_value};
|
||||||
total_length += (attribute_value.size + 1) * outdated_ids[id].size();
|
total_length += (attribute_value.size + 1) * cache_not_found_ids[id].size();
|
||||||
},
|
},
|
||||||
[&](const auto id, const auto)
|
[&](const auto id, const auto)
|
||||||
{
|
{
|
||||||
for (const auto row : outdated_ids[id])
|
for (const auto row : cache_not_found_ids[id])
|
||||||
total_length += get_default(row).size + 1;
|
total_length += get_default(row).size + 1;
|
||||||
});
|
}};
|
||||||
|
|
||||||
|
if (!update_queue.tryPush(update_unit, update_queue_push_timeout_milliseconds))
|
||||||
|
throw std::runtime_error("Too many updates");
|
||||||
}
|
}
|
||||||
|
|
||||||
out->getChars().reserve(total_length);
|
out->getChars().reserve(total_length);
|
||||||
|
Loading…
Reference in New Issue
Block a user