Revert "Asynchronous updates in cache dictionaries "

This commit is contained in:
Nikita Mikhaylov 2020-02-05 23:00:59 +03:00 committed by GitHub
parent dad4f03a8f
commit c53053b103
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 289 additions and 1374 deletions

View File

@ -481,7 +481,6 @@ namespace ErrorCodes
extern const int UNABLE_TO_SKIP_UNUSED_SHARDS = 507;
extern const int UNKNOWN_ACCESS_TYPE = 508;
extern const int INVALID_GRANT = 509;
extern const int CACHE_DICTIONARY_UPDATE_FAIL = 510;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -12,7 +12,6 @@
#include <Common/typeid_cast.h>
#include <ext/range.h>
#include <ext/size.h>
#include <Common/setThreadName.h>
#include "CacheDictionary.inc.h"
#include "DictionaryBlockInputStream.h"
#include "DictionaryFactory.h"
@ -62,48 +61,24 @@ CacheDictionary::CacheDictionary(
const std::string & name_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
DictionaryLifetime dict_lifetime_,
size_t size_,
bool allow_read_expired_keys_,
size_t max_update_queue_size_,
size_t update_queue_push_timeout_milliseconds_,
size_t max_threads_for_updates_)
const DictionaryLifetime dict_lifetime_,
const size_t size_)
: database(database_)
, name(name_)
, full_name{database_.empty() ? name_ : (database_ + "." + name_)}
, dict_struct(dict_struct_)
, source_ptr{std::move(source_ptr_)}
, dict_lifetime(dict_lifetime_)
, allow_read_expired_keys(allow_read_expired_keys_)
, max_update_queue_size(max_update_queue_size_)
, update_queue_push_timeout_milliseconds(update_queue_push_timeout_milliseconds_)
, max_threads_for_updates(max_threads_for_updates_)
, log(&Logger::get("ExternalDictionaries"))
, size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
, size_overlap_mask{this->size - 1}
, cells{this->size}
, rnd_engine(randomSeed())
, update_queue(max_update_queue_size_)
, update_pool(max_threads_for_updates)
{
if (!this->source_ptr->supportsSelectiveLoad())
throw Exception{full_name + ": source cannot be used with CacheDictionary", ErrorCodes::UNSUPPORTED_METHOD};
createAttributes();
for (size_t i = 0; i < max_threads_for_updates; ++i)
update_pool.scheduleOrThrowOnError([this] { updateThreadFunction(); });
}
CacheDictionary::~CacheDictionary()
{
finished = true;
update_queue.clear();
for (size_t i = 0; i < max_threads_for_updates; ++i)
{
auto empty_finishing_ptr = std::make_shared<UpdateUnit>(std::vector<Key>());
update_queue.push(empty_finishing_ptr);
}
update_pool.wait();
}
@ -300,16 +275,10 @@ CacheDictionary::FindResult CacheDictionary::findCellIdx(const Key & id, const C
void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8> & out) const
{
/// There are three types of ids.
/// - Valid ids. These ids are presented in local cache and their lifetime is not expired.
/// - CacheExpired ids. Ids that are in local cache, but their values are rotted (lifetime is expired).
/// - CacheNotFound ids. We have to go to external storage to know its value.
/// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
std::unordered_map<Key, std::vector<size_t>> cache_expired_ids;
std::unordered_map<Key, std::vector<size_t>> cache_not_found_ids;
std::unordered_map<Key, std::vector<size_t>> outdated_ids;
size_t cache_hit = 0;
size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0;
const auto rows = ext::size(ids);
{
@ -322,97 +291,49 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8>
const auto id = ids[row];
const auto find_result = findCellIdx(id, now);
const auto & cell_idx = find_result.cell_idx;
auto insert_to_answer_routine = [&] ()
{
out[row] = !cells[cell_idx].isDefault();
};
if (!find_result.valid)
{
outdated_ids[id].push_back(row);
if (find_result.outdated)
{
cache_expired_ids[id].push_back(row);
if (allow_read_expired_keys)
insert_to_answer_routine();
}
++cache_expired;
else
{
cache_not_found_ids[id].push_back(row);
}
++cache_not_found;
}
else
{
++cache_hit;
insert_to_answer_routine();
const auto & cell = cells[cell_idx];
out[row] = !cell.isDefault();
}
}
}
ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired_ids.size());
ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found_ids.size());
ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired);
ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found);
ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);
query_count.fetch_add(rows, std::memory_order_relaxed);
hit_count.fetch_add(rows - cache_expired_ids.size() - cache_not_found_ids.size(), std::memory_order_release);
hit_count.fetch_add(rows - outdated_ids.size(), std::memory_order_release);
if (cache_not_found_ids.empty())
{
/// Nothing to update - return;
if (cache_expired_ids.empty())
return;
if (outdated_ids.empty())
return;
if (allow_read_expired_keys)
std::vector<Key> required_ids(outdated_ids.size());
std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });
/// request new values
update(
required_ids,
[&](const auto id, const auto)
{
std::vector<Key> required_expired_ids;
required_expired_ids.reserve(cache_expired_ids.size());
std::transform(
std::begin(cache_expired_ids), std::end(cache_expired_ids),
std::back_inserter(required_expired_ids), [](auto & pair) { return pair.first; });
/// Callbacks are empty because we don't want to receive them after an unknown period of time.
auto update_unit_ptr = std::make_shared<UpdateUnit>(required_expired_ids);
tryPushToUpdateQueueOrThrow(update_unit_ptr);
/// Update is async - no need to wait.
return;
}
}
/// At this point we have two situations.
/// There may be both types of keys: cache_expired_ids and cache_not_found_ids.
/// We will update them all synchronously.
std::vector<Key> required_ids;
required_ids.reserve(cache_not_found_ids.size() + cache_expired_ids.size());
std::transform(
std::begin(cache_not_found_ids), std::end(cache_not_found_ids),
std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
std::transform(
std::begin(cache_expired_ids), std::end(cache_expired_ids),
std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
auto on_cell_updated = [&] (const Key id, const size_t)
{
for (const auto row : cache_not_found_ids[id])
out[row] = true;
for (const auto row : cache_expired_ids[id])
out[row] = true;
};
auto on_id_not_found = [&] (const Key id, const size_t)
{
for (const auto row : cache_not_found_ids[id])
out[row] = false;
for (const auto row : cache_expired_ids[id])
out[row] = true;
};
auto update_unit_ptr = std::make_shared<UpdateUnit>(required_ids, on_cell_updated, on_id_not_found);
tryPushToUpdateQueueOrThrow(update_unit_ptr);
waitForCurrentUpdateFinish(update_unit_ptr);
for (const auto row : outdated_ids[id])
out[row] = true;
},
[&](const auto id, const auto)
{
for (const auto row : outdated_ids[id])
out[row] = false;
});
}
@ -669,8 +590,7 @@ void registerDictionaryCache(DictionaryFactory & factory)
DictionarySourcePtr source_ptr) -> DictionaryPtr
{
if (dict_struct.key)
throw Exception{"'key' is not supported for dictionary of layout 'cache'",
ErrorCodes::UNSUPPORTED_METHOD};
throw Exception{"'key' is not supported for dictionary of layout 'cache'", ErrorCodes::UNSUPPORTED_METHOD};
if (dict_struct.range_min || dict_struct.range_max)
throw Exception{full_name
@ -678,11 +598,9 @@ void registerDictionaryCache(DictionaryFactory & factory)
"for a dictionary of layout 'range_hashed'",
ErrorCodes::BAD_ARGUMENTS};
const auto & layout_prefix = config_prefix + ".layout";
const size_t size = config.getUInt64(layout_prefix + ".cache.size_in_cells");
const auto size = config.getInt(layout_prefix + ".cache.size_in_cells");
if (size == 0)
throw Exception{full_name + ": dictionary of layout 'cache' cannot have 0 cells",
ErrorCodes::TOO_SMALL_BUFFER_SIZE};
throw Exception{full_name + ": dictionary of layout 'cache' cannot have 0 cells", ErrorCodes::TOO_SMALL_BUFFER_SIZE};
const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
if (require_nonempty)
@ -692,284 +610,10 @@ void registerDictionaryCache(DictionaryFactory & factory)
const String database = config.getString(config_prefix + ".database", "");
const String name = config.getString(config_prefix + ".name");
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
const size_t max_update_queue_size =
config.getUInt64(layout_prefix + ".cache.max_update_queue_size", 100000);
if (max_update_queue_size == 0)
throw Exception{name + ": dictionary of layout 'cache' cannot have empty update queue of size 0",
ErrorCodes::TOO_SMALL_BUFFER_SIZE};
const bool allow_read_expired_keys =
config.getBool(layout_prefix + ".cache.allow_read_expired_keys", false);
const size_t update_queue_push_timeout_milliseconds =
config.getUInt64(layout_prefix + ".cache.update_queue_push_timeout_milliseconds", 10);
if (update_queue_push_timeout_milliseconds < 10)
throw Exception{name + ": dictionary of layout 'cache' have too little update_queue_push_timeout",
ErrorCodes::BAD_ARGUMENTS};
const size_t max_threads_for_updates =
config.getUInt64(layout_prefix + ".max_threads_for_updates", 4);
if (max_threads_for_updates == 0)
throw Exception{name + ": dictionary of layout 'cache' cannot have zero threads for updates.",
ErrorCodes::BAD_ARGUMENTS};
return std::make_unique<CacheDictionary>(
database, name, dict_struct, std::move(source_ptr), dict_lifetime, size,
allow_read_expired_keys, max_update_queue_size, update_queue_push_timeout_milliseconds,
max_threads_for_updates);
return std::make_unique<CacheDictionary>(database, name, dict_struct, std::move(source_ptr), dict_lifetime, size);
};
factory.registerLayout("cache", create_layout, false);
}
void CacheDictionary::updateThreadFunction()
{
setThreadName("AsyncUpdater");
while (!finished)
{
UpdateUnitPtr first_popped;
update_queue.pop(first_popped);
if (finished)
break;
/// Here we pop as many unit pointers from update queue as we can.
/// We fix current size to avoid livelock (or too long waiting),
/// when this thread pops from the queue and other threads push to the queue.
const size_t current_queue_size = update_queue.size();
if (current_queue_size > 0)
LOG_TRACE(log, "Performing bunch of keys update in cache dictionary with "
<< current_queue_size + 1 << " keys");
std::vector<UpdateUnitPtr> update_request;
update_request.reserve(current_queue_size + 1);
update_request.emplace_back(first_popped);
UpdateUnitPtr current_unit_ptr;
while (update_request.size() && update_queue.tryPop(current_unit_ptr))
update_request.emplace_back(std::move(current_unit_ptr));
BunchUpdateUnit bunch_update_unit(update_request);
try
{
/// Update a bunch of ids.
update(bunch_update_unit);
/// Notify all threads about finished updating the bunch of ids
/// where their own ids were included.
std::unique_lock<std::mutex> lock(update_mutex);
for (auto & unit_ptr: update_request)
unit_ptr->is_done = true;
is_update_finished.notify_all();
}
catch (...)
{
std::unique_lock<std::mutex> lock(update_mutex);
/// It is a big trouble, because one bad query can make other threads fail with not relative exception.
/// So at this point all threads (and queries) will receive the same exception.
for (auto & unit_ptr: update_request)
unit_ptr->current_exception = std::current_exception();
is_update_finished.notify_all();
}
}
}
void CacheDictionary::waitForCurrentUpdateFinish(UpdateUnitPtr & update_unit_ptr) const
{
std::unique_lock<std::mutex> lock(update_mutex);
/*
* We wait here without any timeout to avoid SEGFAULT's.
* Consider timeout for wait had expired and main query's thread ended with exception
* or some other error. But the UpdateUnit with callbacks is left in the queue.
* It has these callback that capture god knows what from the current thread
* (most of the variables lies on the stack of finished thread) that
* intended to do a synchronous update. AsyncUpdate thread can touch deallocated memory and explode.
* */
is_update_finished.wait(
lock,
[&] {return update_unit_ptr->is_done || update_unit_ptr->current_exception; });
if (update_unit_ptr->current_exception)
std::rethrow_exception(update_unit_ptr->current_exception);
}
void CacheDictionary::tryPushToUpdateQueueOrThrow(UpdateUnitPtr & update_unit_ptr) const
{
if (!update_queue.tryPush(update_unit_ptr, update_queue_push_timeout_milliseconds))
throw DB::Exception(
"Cannot push to internal update queue in dictionary " + getFullName() + ". Timelimit of " +
std::to_string(update_queue_push_timeout_milliseconds) + " ms. exceeded. Current queue size is " +
std::to_string(update_queue.size()), ErrorCodes::CACHE_DICTIONARY_UPDATE_FAIL);
}
void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, bunch_update_unit.getRequestedIds().size());
std::unordered_map<Key, UInt8> remaining_ids{bunch_update_unit.getRequestedIds().size()};
for (const auto id : bunch_update_unit.getRequestedIds())
remaining_ids.insert({id, 0});
const auto now = std::chrono::system_clock::now();
if (now > backoff_end_time.load())
{
try
{
if (error_count)
{
/// Recover after error: we have to clone the source here because
/// it could keep connections which should be reset after error.
source_ptr = source_ptr->clone();
}
Stopwatch watch;
auto stream = source_ptr->loadIds(bunch_update_unit.getRequestedIds());
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
stream->readPrefix();
while (const auto block = stream->read())
{
const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
if (!id_column)
throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};
const auto & ids = id_column->getData();
/// cache column pointers
const auto column_ptrs = ext::map<std::vector>(
ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });
for (const auto i : ext::range(0, ids.size()))
{
const auto id = ids[i];
const auto find_result = findCellIdx(id, now);
const auto & cell_idx = find_result.cell_idx;
auto & cell = cells[cell_idx];
for (const auto attribute_idx : ext::range(0, attributes.size()))
{
const auto & attribute_column = *column_ptrs[attribute_idx];
auto & attribute = attributes[attribute_idx];
setAttributeValue(attribute, cell_idx, attribute_column[i]);
}
/// if cell id is zero and zero does not map to this cell, then the cell is unused
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
{
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
}
else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
bunch_update_unit.informCallersAboutPresentId(id, cell_idx);
/// mark corresponding id as found
remaining_ids[id] = 1;
}
}
stream->readSuffix();
error_count = 0;
last_exception = std::exception_ptr{};
backoff_end_time = std::chrono::system_clock::time_point{};
ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
}
catch (...)
{
++error_count;
last_exception = std::current_exception();
backoff_end_time = now + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count));
tryLogException(last_exception, log, "Could not update cache dictionary '" + getFullName() +
"', next update is scheduled at " + ext::to_string(backoff_end_time.load()));
}
}
size_t not_found_num = 0, found_num = 0;
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
/// Check which ids have not been found and require setting null_value
for (const auto & id_found_pair : remaining_ids)
{
if (id_found_pair.second)
{
++found_num;
continue;
}
++not_found_num;
const auto id = id_found_pair.first;
const auto find_result = findCellIdx(id, now);
const auto & cell_idx = find_result.cell_idx;
auto & cell = cells[cell_idx];
if (error_count)
{
if (find_result.outdated)
{
/// We have expired data for that `id` so we can continue using it.
bool was_default = cell.isDefault();
cell.setExpiresAt(backoff_end_time);
if (was_default)
cell.setDefault();
if (was_default)
bunch_update_unit.informCallersAboutAbsentId(id, cell_idx);
else
bunch_update_unit.informCallersAboutPresentId(id, cell_idx);
continue;
}
/// We don't have expired data for that `id` so all we can do is to rethrow `last_exception`.
std::rethrow_exception(last_exception);
}
/// Check if cell had not been occupied before and increment element counter if it hadn't
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
{
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
}
else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
/// Set null_value for each attribute
cell.setDefault();
for (auto & attribute : attributes)
setDefaultAttributeValue(attribute, cell_idx);
/// inform caller that the cell has not been found
bunch_update_unit.informCallersAboutAbsentId(id, cell_idx);
}
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num);
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedFound, found_num);
ProfileEvents::increment(ProfileEvents::DictCacheRequests);
}
}

View File

@ -4,16 +4,12 @@
#include <chrono>
#include <cmath>
#include <map>
#include <mutex>
#include <shared_mutex>
#include <utility>
#include <variant>
#include <vector>
#include <common/logger_useful.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <Common/ThreadPool.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <pcg_random.hpp>
#include <Common/ArenaWithFreeLists.h>
#include <Common/CurrentMetrics.h>
@ -25,22 +21,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int CACHE_DICTIONARY_UPDATE_FAIL;
}
/*
*
* This dictionary is stored in a cache that has a fixed number of cells.
* These cells contain frequently used elements.
* When searching for a dictionary, the cache is searched first and special heuristic is used:
* while looking for the key, we take a look only at max_collision_length elements.
* So, our cache is not perfect. It has errors like "the key is in cache, but the cache says that it does not".
* And in this case we simply ask external source for the key which is faster.
* You have to keep this logic in mind.
* */
class CacheDictionary final : public IDictionary
{
public:
@ -49,14 +29,8 @@ public:
const std::string & name_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
DictionaryLifetime dict_lifetime_,
size_t size_,
bool allow_read_expired_keys_,
size_t max_update_queue_size_,
size_t update_queue_push_timeout_milliseconds_,
size_t max_threads_for_updates);
~CacheDictionary() override;
const DictionaryLifetime dict_lifetime_,
const size_t size_);
const std::string & getDatabase() const override { return database; }
const std::string & getName() const override { return name; }
@ -81,10 +55,7 @@ public:
std::shared_ptr<const IExternalLoadable> clone() const override
{
return std::make_shared<CacheDictionary>(
database, name, dict_struct, source_ptr->clone(), dict_lifetime, size,
allow_read_expired_keys, max_update_queue_size,
update_queue_push_timeout_milliseconds, max_threads_for_updates);
return std::make_shared<CacheDictionary>(database, name, dict_struct, source_ptr->clone(), dict_lifetime, size);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }
@ -259,6 +230,9 @@ private:
template <typename DefaultGetter>
void getItemsString(Attribute & attribute, const PaddedPODArray<Key> & ids, ColumnString * out, DefaultGetter && get_default) const;
template <typename PresentIdHandler, typename AbsentIdHandler>
void update(const std::vector<Key> & requested_ids, PresentIdHandler && on_cell_updated, AbsentIdHandler && on_id_not_found) const;
PaddedPODArray<Key> getCachedIds() const;
bool isEmptyCell(const UInt64 idx) const;
@ -289,11 +263,6 @@ private:
const DictionaryStructure dict_struct;
mutable DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime;
const bool allow_read_expired_keys;
const size_t max_update_queue_size;
const size_t update_queue_push_timeout_milliseconds;
const size_t max_threads_for_updates;
Logger * const log;
mutable std::shared_mutex rw_lock;
@ -315,8 +284,8 @@ private:
std::unique_ptr<ArenaWithFreeLists> string_arena;
mutable std::exception_ptr last_exception;
mutable std::atomic<size_t> error_count = 0;
mutable std::atomic<std::chrono::system_clock::time_point> backoff_end_time{std::chrono::system_clock::time_point{}};
mutable size_t error_count = 0;
mutable std::chrono::system_clock::time_point backoff_end_time;
mutable pcg64 rnd_engine;
@ -324,166 +293,6 @@ private:
mutable std::atomic<size_t> element_count{0};
mutable std::atomic<size_t> hit_count{0};
mutable std::atomic<size_t> query_count{0};
};
/// Field and methods correlated with update expired and not found keys
using PresentIdHandler = std::function<void(Key, size_t)>;
using AbsentIdHandler = std::function<void(Key, size_t)>;
/*
* Disclaimer: this comment is written not for fun.
*
* How the update goes: we basically have a method like get(keys)->values. Values are cached, so sometimes we
* can return them from the cache. For values not in cache, we query them from the dictionary, and add to the
* cache. The cache is lossy, so we can't expect it to store all the keys, and we store them separately. Normally,
* they would be passed as a return value of get(), but for Unknown Reasons the dictionaries use a baroque
* interface where get() accepts two callback, one that it calls for found values, and one for not found.
*
* Now we make it even uglier by doing this from multiple threads. The missing values are retreived from the
* dictionary in a background thread, and this thread calls the provided callback. So if you provide the callbacks,
* you MUST wait until the background update finishes, or god knows what happens. Unfortunately, we have no
* way to check that you did this right, so good luck.
*/
struct UpdateUnit
{
UpdateUnit(std::vector<Key> requested_ids_,
PresentIdHandler present_id_handler_,
AbsentIdHandler absent_id_handler_) :
requested_ids(std::move(requested_ids_)),
present_id_handler(present_id_handler_),
absent_id_handler(absent_id_handler_) {}
explicit UpdateUnit(std::vector<Key> requested_ids_) :
requested_ids(std::move(requested_ids_)),
present_id_handler([](Key, size_t){}),
absent_id_handler([](Key, size_t){}) {}
std::vector<Key> requested_ids;
PresentIdHandler present_id_handler;
AbsentIdHandler absent_id_handler;
std::atomic<bool> is_done{false};
std::exception_ptr current_exception{nullptr};
};
using UpdateUnitPtr = std::shared_ptr<UpdateUnit>;
using UpdateQueue = ConcurrentBoundedQueue<UpdateUnitPtr>;
/*
* This class is used to concatenate requested_keys.
*
* Imagine that we have several UpdateUnit with different vectors of keys and callbacks for that keys.
* We concatenate them into a long vector of keys that looks like:
*
* a1...ak_a b1...bk_2 c1...ck_3,
*
* where a1...ak_a are requested_keys from the first UpdateUnit.
* In addition we have the same number (three in this case) of callbacks.
* This class helps us to find a callback (or many callbacks) for a special key.
* */
class BunchUpdateUnit
{
public:
explicit BunchUpdateUnit(std::vector<UpdateUnitPtr> & update_request)
{
/// Here we prepare total count of all requested ids
/// not to do useless allocations later.
size_t total_requested_keys_count = 0;
for (auto & unit_ptr: update_request)
{
total_requested_keys_count += unit_ptr->requested_ids.size();
if (helper.empty())
helper.push_back(unit_ptr->requested_ids.size());
else
helper.push_back(unit_ptr->requested_ids.size() + helper.back());
present_id_handlers.emplace_back(unit_ptr->present_id_handler);
absent_id_handlers.emplace_back(unit_ptr->absent_id_handler);
}
concatenated_requested_ids.reserve(total_requested_keys_count);
for (auto & unit_ptr: update_request)
std::for_each(std::begin(unit_ptr->requested_ids), std::end(unit_ptr->requested_ids),
[&] (const Key & key) {concatenated_requested_ids.push_back(key);});
}
const std::vector<Key> & getRequestedIds()
{
return concatenated_requested_ids;
}
void informCallersAboutPresentId(Key id, size_t cell_idx)
{
for (size_t i = 0; i < concatenated_requested_ids.size(); ++i)
{
auto & curr = concatenated_requested_ids[i];
if (curr == id)
getPresentIdHandlerForPosition(i)(id, cell_idx);
}
}
void informCallersAboutAbsentId(Key id, size_t cell_idx)
{
for (size_t i = 0; i < concatenated_requested_ids.size(); ++i)
if (concatenated_requested_ids[i] == id)
getAbsentIdHandlerForPosition(i)(id, cell_idx);
}
private:
PresentIdHandler & getPresentIdHandlerForPosition(size_t position)
{
return present_id_handlers[getUpdateUnitNumberForRequestedIdPosition(position)];
}
AbsentIdHandler & getAbsentIdHandlerForPosition(size_t position)
{
return absent_id_handlers[getUpdateUnitNumberForRequestedIdPosition((position))];
}
size_t getUpdateUnitNumberForRequestedIdPosition(size_t position)
{
return std::lower_bound(helper.begin(), helper.end(), position) - helper.begin();
}
std::vector<Key> concatenated_requested_ids;
std::vector<PresentIdHandler> present_id_handlers;
std::vector<AbsentIdHandler> absent_id_handlers;
std::vector<size_t> helper;
};
mutable UpdateQueue update_queue;
ThreadPool update_pool;
/*
* Actually, we can divide all requested keys into two 'buckets'. There are only four possible states and they
* are described in the table.
*
* cache_not_found_ids |0|0|1|1|
* cache_expired_ids |0|1|0|1|
*
* 0 - if set is empty, 1 - otherwise
*
* Only if there are no cache_not_found_ids and some cache_expired_ids
* (with allow_read_expired_keys_from_cache_dictionary setting) we can perform async update.
* Otherwise we have no concatenate ids and update them sync.
*
*/
void updateThreadFunction();
void update(BunchUpdateUnit & bunch_update_unit) const;
void tryPushToUpdateQueueOrThrow(UpdateUnitPtr & update_unit_ptr) const;
void waitForCurrentUpdateFinish(UpdateUnitPtr & update_unit_ptr) const;
mutable std::mutex update_mutex;
mutable std::condition_variable is_update_finished;
std::atomic<bool> finished{false};
};
}

View File

@ -40,13 +40,11 @@ void CacheDictionary::getItemsNumberImpl(
Attribute & attribute, const PaddedPODArray<Key> & ids, ResultArrayType<OutputType> & out, DefaultGetter && get_default) const
{
/// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
std::unordered_map<Key, std::vector<size_t>> cache_expired_ids;
std::unordered_map<Key, std::vector<size_t>> cache_not_found_ids;
std::unordered_map<Key, std::vector<size_t>> outdated_ids;
auto & attribute_array = std::get<ContainerPtrType<AttributeType>>(attribute.arrays);
const auto rows = ext::size(ids);
size_t cache_hit = 0;
size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0;
{
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
@ -63,105 +61,52 @@ void CacheDictionary::getItemsNumberImpl(
* 3. explicit defaults were specified and cell was set default. */
const auto find_result = findCellIdx(id, now);
auto update_routine = [&]()
{
const auto & cell_idx = find_result.cell_idx;
const auto & cell = cells[cell_idx];
out[row] = cell.isDefault() ? get_default(row) : static_cast<OutputType>(attribute_array[cell_idx]);
};
if (!find_result.valid)
{
outdated_ids[id].push_back(row);
if (find_result.outdated)
{
cache_expired_ids[id].push_back(row);
if (allow_read_expired_keys)
update_routine();
}
++cache_expired;
else
{
cache_not_found_ids[id].push_back(row);
}
++cache_not_found;
}
else
{
++cache_hit;
update_routine();
const auto & cell_idx = find_result.cell_idx;
const auto & cell = cells[cell_idx];
out[row] = cell.isDefault() ? get_default(row) : static_cast<OutputType>(attribute_array[cell_idx]);
}
}
}
ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired_ids.size());
ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found_ids.size());
ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired);
ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found);
ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);
query_count.fetch_add(rows, std::memory_order_relaxed);
hit_count.fetch_add(rows - cache_expired_ids.size() - cache_not_found_ids.size(), std::memory_order_release);
hit_count.fetch_add(rows - outdated_ids.size(), std::memory_order_release);
if (cache_not_found_ids.empty())
{
/// Nothing to update - return
if (cache_expired_ids.empty())
return;
if (outdated_ids.empty())
return;
/// Update async only if allow_read_expired_keys_is_enabledadd condvar usage and better code
if (allow_read_expired_keys)
std::vector<Key> required_ids(outdated_ids.size());
std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });
/// request new values
update(
required_ids,
[&](const auto id, const auto cell_idx)
{
std::vector<Key> required_expired_ids;
required_expired_ids.reserve(cache_expired_ids.size());
std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids), std::back_inserter(required_expired_ids),
[](auto & pair) { return pair.first; });
const auto attribute_value = attribute_array[cell_idx];
/// request new values
auto update_unit_ptr = std::make_shared<UpdateUnit>(required_expired_ids);
tryPushToUpdateQueueOrThrow(update_unit_ptr);
/// Nothing to do - return
return;
}
}
/// From this point we have to update all keys sync.
/// Maybe allow_read_expired_keys_from_cache_dictionary is disabled
/// and there no cache_not_found_ids but some cache_expired.
std::vector<Key> required_ids;
required_ids.reserve(cache_not_found_ids.size() + cache_expired_ids.size());
std::transform(
std::begin(cache_not_found_ids), std::end(cache_not_found_ids),
std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
std::transform(
std::begin(cache_expired_ids), std::end(cache_expired_ids),
std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
auto on_cell_updated = [&] (const auto id, const auto cell_idx)
{
const auto attribute_value = attribute_array[cell_idx];
for (const size_t row : cache_not_found_ids[id])
out[row] = static_cast<OutputType>(attribute_value);
for (const size_t row : cache_expired_ids[id])
out[row] = static_cast<OutputType>(attribute_value);
};
auto on_id_not_found = [&] (const auto id, const auto)
{
for (const size_t row : cache_not_found_ids[id])
out[row] = get_default(row);
for (const size_t row : cache_expired_ids[id])
out[row] = get_default(row);
};
/// Request new values
auto update_unit_ptr = std::make_shared<UpdateUnit>(required_ids, on_cell_updated, on_id_not_found);
tryPushToUpdateQueueOrThrow(update_unit_ptr);
waitForCurrentUpdateFinish(update_unit_ptr);
for (const size_t row : outdated_ids[id])
out[row] = static_cast<OutputType>(attribute_value);
},
[&](const auto id, const auto)
{
for (const size_t row : outdated_ids[id])
out[row] = get_default(row);
});
}
template <typename DefaultGetter>
@ -216,13 +161,12 @@ void CacheDictionary::getItemsString(
out->getOffsets().resize_assume_reserved(0);
/// Mapping: <id> -> { all indices `i` of `ids` such that `ids[i]` = <id> }
std::unordered_map<Key, std::vector<size_t>> cache_expired_ids;
std::unordered_map<Key, std::vector<size_t>> cache_not_found_ids;
std::unordered_map<Key, std::vector<size_t>> outdated_ids;
/// we are going to store every string separately
std::unordered_map<Key, String> map;
size_t total_length = 0;
size_t cache_hit = 0;
size_t cache_expired = 0, cache_not_found = 0, cache_hit = 0;
{
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
@ -232,10 +176,17 @@ void CacheDictionary::getItemsString(
const auto id = ids[row];
const auto find_result = findCellIdx(id, now);
auto insert_value_routine = [&]()
if (!find_result.valid)
{
outdated_ids[id].push_back(row);
if (find_result.outdated)
++cache_expired;
else
++cache_not_found;
}
else
{
++cache_hit;
const auto & cell_idx = find_result.cell_idx;
const auto & cell = cells[cell_idx];
const auto string_ref = cell.isDefault() ? get_default(row) : attribute_array[cell_idx];
@ -244,82 +195,37 @@ void CacheDictionary::getItemsString(
map[id] = String{string_ref};
total_length += string_ref.size + 1;
};
if (!find_result.valid)
{
if (find_result.outdated)
{
cache_expired_ids[id].push_back(row);
if (allow_read_expired_keys)
insert_value_routine();
} else
cache_not_found_ids[id].push_back(row);
} else
{
++cache_hit;
insert_value_routine();
}
}
}
ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired_ids.size());
ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found_ids.size());
ProfileEvents::increment(ProfileEvents::DictCacheKeysExpired, cache_expired);
ProfileEvents::increment(ProfileEvents::DictCacheKeysNotFound, cache_not_found);
ProfileEvents::increment(ProfileEvents::DictCacheKeysHit, cache_hit);
query_count.fetch_add(rows, std::memory_order_relaxed);
hit_count.fetch_add(rows - cache_expired_ids.size() - cache_not_found_ids.size(), std::memory_order_release);
hit_count.fetch_add(rows - outdated_ids.size(), std::memory_order_release);
/// Async update of expired keys.
if (cache_not_found_ids.empty())
/// request new values
if (!outdated_ids.empty())
{
if (allow_read_expired_keys && !cache_expired_ids.empty())
{
std::vector<Key> required_expired_ids;
required_expired_ids.reserve(cache_not_found_ids.size());
std::transform(std::begin(cache_expired_ids), std::end(cache_expired_ids),
std::back_inserter(required_expired_ids), [](auto & pair) { return pair.first; });
std::vector<Key> required_ids(outdated_ids.size());
std::transform(std::begin(outdated_ids), std::end(outdated_ids), std::begin(required_ids), [](auto & pair) { return pair.first; });
auto update_unit_ptr = std::make_shared<UpdateUnit>(required_expired_ids);
update(
required_ids,
[&](const auto id, const auto cell_idx)
{
const auto attribute_value = attribute_array[cell_idx];
tryPushToUpdateQueueOrThrow(update_unit_ptr);
/// Do not return at this point, because there some extra stuff to do at the end of this method.
}
}
/// Request new values sync.
/// We have request both cache_not_found_ids and cache_expired_ids.
if (!cache_not_found_ids.empty())
{
std::vector<Key> required_ids;
required_ids.reserve(cache_not_found_ids.size() + cache_expired_ids.size());
std::transform(
std::begin(cache_not_found_ids), std::end(cache_not_found_ids),
std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
std::transform(
std::begin(cache_expired_ids), std::end(cache_expired_ids),
std::back_inserter(required_ids), [](auto & pair) { return pair.first; });
auto on_cell_updated = [&] (const auto id, const auto cell_idx)
{
const auto attribute_value = attribute_array[cell_idx];
map[id] = String{attribute_value};
total_length += (attribute_value.size + 1) * cache_not_found_ids[id].size();
};
auto on_id_not_found = [&] (const auto id, const auto)
{
for (const auto row : cache_not_found_ids[id])
total_length += get_default(row).size + 1;
};
auto update_unit_ptr = std::make_shared<UpdateUnit>(required_ids, on_cell_updated, on_id_not_found);
tryPushToUpdateQueueOrThrow(update_unit_ptr);
waitForCurrentUpdateFinish(update_unit_ptr);
map[id] = String{attribute_value};
total_length += (attribute_value.size + 1) * outdated_ids[id].size();
},
[&](const auto id, const auto)
{
for (const auto row : outdated_ids[id])
total_length += get_default(row).size + 1;
});
}
out->getChars().reserve(total_length);
@ -334,4 +240,167 @@ void CacheDictionary::getItemsString(
}
}
template <typename PresentIdHandler, typename AbsentIdHandler>
void CacheDictionary::update(
const std::vector<Key> & requested_ids, PresentIdHandler && on_cell_updated, AbsentIdHandler && on_id_not_found) const
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size());
std::unordered_map<Key, UInt8> remaining_ids{requested_ids.size()};
for (const auto id : requested_ids)
remaining_ids.insert({id, 0});
const auto now = std::chrono::system_clock::now();
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
if (now > backoff_end_time)
{
try
{
if (error_count)
{
/// Recover after error: we have to clone the source here because
/// it could keep connections which should be reset after error.
source_ptr = source_ptr->clone();
}
Stopwatch watch;
auto stream = source_ptr->loadIds(requested_ids);
stream->readPrefix();
while (const auto block = stream->read())
{
const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
if (!id_column)
throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};
const auto & ids = id_column->getData();
/// cache column pointers
const auto column_ptrs = ext::map<std::vector>(
ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });
for (const auto i : ext::range(0, ids.size()))
{
const auto id = ids[i];
const auto find_result = findCellIdx(id, now);
const auto & cell_idx = find_result.cell_idx;
auto & cell = cells[cell_idx];
for (const auto attribute_idx : ext::range(0, attributes.size()))
{
const auto & attribute_column = *column_ptrs[attribute_idx];
auto & attribute = attributes[attribute_idx];
setAttributeValue(attribute, cell_idx, attribute_column[i]);
}
/// if cell id is zero and zero does not map to this cell, then the cell is unused
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
{
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
}
else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
/// inform caller
on_cell_updated(id, cell_idx);
/// mark corresponding id as found
remaining_ids[id] = 1;
}
}
stream->readSuffix();
error_count = 0;
last_exception = std::exception_ptr{};
backoff_end_time = std::chrono::system_clock::time_point{};
ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
}
catch (...)
{
++error_count;
last_exception = std::current_exception();
backoff_end_time = now + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count));
tryLogException(last_exception, log, "Could not update cache dictionary '" + getFullName() +
"', next update is scheduled at " + ext::to_string(backoff_end_time));
}
}
size_t not_found_num = 0, found_num = 0;
/// Check which ids have not been found and require setting null_value
for (const auto & id_found_pair : remaining_ids)
{
if (id_found_pair.second)
{
++found_num;
continue;
}
++not_found_num;
const auto id = id_found_pair.first;
const auto find_result = findCellIdx(id, now);
const auto & cell_idx = find_result.cell_idx;
auto & cell = cells[cell_idx];
if (error_count)
{
if (find_result.outdated)
{
/// We have expired data for that `id` so we can continue using it.
bool was_default = cell.isDefault();
cell.setExpiresAt(backoff_end_time);
if (was_default)
cell.setDefault();
if (was_default)
on_id_not_found(id, cell_idx);
else
on_cell_updated(id, cell_idx);
continue;
}
/// We don't have expired data for that `id` so all we can do is to rethrow `last_exception`.
std::rethrow_exception(last_exception);
}
/// Check if cell had not been occupied before and increment element counter if it hadn't
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
{
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
}
else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
/// Set null_value for each attribute
cell.setDefault();
for (auto & attribute : attributes)
setDefaultAttributeValue(attribute, cell_idx);
/// inform caller that the cell has not been found
on_id_not_found(id, cell_idx);
}
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedMiss, not_found_num);
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequestedFound, found_num);
ProfileEvents::increment(ProfileEvents::DictCacheRequests);
}
}

View File

@ -382,133 +382,4 @@
</attribute>
</structure>
</dictionary>
<dictionary>
<name>one_cell_cache_ints</name>
<source>
<clickhouse>
<host>localhost</host>
<port>9000</port>
<user>default</user>
<password></password>
<db>test_01054</db>
<table>ints</table>
</clickhouse>
</source>
<lifetime>0</lifetime>
<layout>
<cache><size_in_cells>1</size_in_cells></cache>
</layout>
<structure>
<id>
<name>key</name>
</id>
<attribute>
<name>i8</name>
<type>Int8</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>i16</name>
<type>Int16</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>i32</name>
<type>Int32</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>i64</name>
<type>Int64</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>u8</name>
<type>UInt8</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>u16</name>
<type>UInt16</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>u32</name>
<type>UInt32</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>u64</name>
<type>UInt64</type>
<null_value>0</null_value>
</attribute>
</structure>
</dictionary>
<dictionary>
<name>one_cell_cache_ints_overflow</name>
<source>
<clickhouse>
<host>localhost</host>
<port>9000</port>
<user>default</user>
<password></password>
<db>test_01054_overflow</db>
<table>ints</table>
</clickhouse>
</source>
<lifetime>0</lifetime>
<layout>
<cache><size_in_cells>1</size_in_cells></cache>
</layout>
<structure>
<id>
<name>key</name>
</id>
<attribute>
<name>i8</name>
<type>Int8</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>i16</name>
<type>Int16</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>i32</name>
<type>Int32</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>i64</name>
<type>Int64</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>u8</name>
<type>UInt8</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>u16</name>
<type>UInt16</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>u32</name>
<type>UInt32</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>u64</name>
<type>UInt64</type>
<null_value>0</null_value>
</attribute>
</structure>
</dictionary>
</dictionaries>
</dictionaries>

View File

@ -643,22 +643,6 @@ class ClickHouseInstance:
return urllib.urlopen(url, data).read()
def kill_clickhouse(self, stop_start_wait_sec=5):
pid = self.get_process_pid("clickhouse")
if not pid:
raise Exception("No clickhouse found")
self.exec_in_container(["bash", "-c", "kill -9 {}".format(pid)], user='root')
time.sleep(stop_start_wait_sec)
def restore_clickhouse(self, retries=100):
pid = self.get_process_pid("clickhouse")
if pid:
raise Exception("ClickHouse has already started")
self.exec_in_container(["bash", "-c", "{} --daemon".format(CLICKHOUSE_START_COMMAND)], user=str(os.getuid()))
from helpers.test_tools import assert_eq_with_retry
# wait start
assert_eq_with_retry(self, "select 1", "1", retry_count=retries)
def restart_clickhouse(self, stop_start_wait_sec=5, kill=False):
if not self.stay_alive:
raise Exception("clickhouse can be restarted only with stay_alive=True instance")
@ -965,14 +949,3 @@ class ClickHouseInstance:
def destroy_dir(self):
if p.exists(self.path):
shutil.rmtree(self.path)
class ClickHouseKiller(object):
def __init__(self, clickhouse_node):
self.clickhouse_node = clickhouse_node
def __enter__(self):
self.clickhouse_node.kill_clickhouse()
def __exit__(self, exc_type, exc_val, exc_tb):
self.clickhouse_node.restore_clickhouse()

View File

@ -90,7 +90,7 @@ class PartitionManager:
self.heal_all()
class PartitionManagerDisabler:
class PartitionManagerDisbaler:
def __init__(self, manager):
self.manager = manager
self.rules = self.manager.pop_rules()

View File

@ -293,16 +293,13 @@ class DictionaryStructure(object):
class Dictionary(object):
def __init__(self, name, structure, source, config_path,
table_name, fields, min_lifetime=3, max_lifetime=5):
def __init__(self, name, structure, source, config_path, table_name, fields):
self.name = name
self.structure = copy.deepcopy(structure)
self.source = copy.deepcopy(source)
self.config_path = config_path
self.table_name = table_name
self.fields = fields
self.min_lifetime = min_lifetime
self.max_lifetime = max_lifetime
def generate_config(self):
with open(self.config_path, 'w') as result:
@ -310,8 +307,8 @@ class Dictionary(object):
<yandex>
<dictionary>
<lifetime>
<min>{min_lifetime}</min>
<max>{max_lifetime}</max>
<min>3</min>
<max>5</max>
</lifetime>
<name>{name}</name>
{structure}
@ -321,8 +318,6 @@ class Dictionary(object):
</dictionary>
</yandex>
'''.format(
min_lifetime=self.min_lifetime,
max_lifetime=self.max_lifetime,
name=self.name,
structure=self.structure.get_structure_str(),
source=self.source.get_source_str(self.table_name),

View File

@ -72,34 +72,34 @@ FIELDS = {
VALUES = {
"simple": [
[1, 22, 333, 4444, 55555, -6, -77,
-888, -999, '550e8400-e29b-41d4-a716-446655440003',
'1973-06-28', '1985-02-28 23:43:25', 'hello', 22.543, 3332154213.4, 0],
-888, -999, '550e8400-e29b-41d4-a716-446655440003',
'1973-06-28', '1985-02-28 23:43:25', 'hello', 22.543, 3332154213.4, 0],
[2, 3, 4, 5, 6, -7, -8,
-9, -10, '550e8400-e29b-41d4-a716-446655440002',
'1978-06-28', '1986-02-28 23:42:25', 'hello', 21.543, 3222154213.4, 1]
-9, -10, '550e8400-e29b-41d4-a716-446655440002',
'1978-06-28', '1986-02-28 23:42:25', 'hello', 21.543, 3222154213.4, 1]
],
"complex": [
[1, 'world', 22, 333, 4444, 55555, -6,
-77, -888, -999, '550e8400-e29b-41d4-a716-446655440003',
'1973-06-28', '1985-02-28 23:43:25',
'hello', 22.543, 3332154213.4],
-77, -888, -999, '550e8400-e29b-41d4-a716-446655440003',
'1973-06-28', '1985-02-28 23:43:25',
'hello', 22.543, 3332154213.4],
[2, 'qwerty2', 52, 2345, 6544, 9191991, -2,
-717, -81818, -92929, '550e8400-e29b-41d4-a716-446655440007',
'1975-09-28', '2000-02-28 23:33:24',
'my', 255.543, 3332221.44]
-717, -81818, -92929, '550e8400-e29b-41d4-a716-446655440007',
'1975-09-28', '2000-02-28 23:33:24',
'my', 255.543, 3332221.44]
],
"ranged": [
[1, '2019-02-10', '2019-02-01', '2019-02-28',
22, 333, 4444, 55555, -6, -77, -888, -999,
'550e8400-e29b-41d4-a716-446655440003',
'1973-06-28', '1985-02-28 23:43:25', 'hello',
22.543, 3332154213.4],
22, 333, 4444, 55555, -6, -77, -888, -999,
'550e8400-e29b-41d4-a716-446655440003',
'1973-06-28', '1985-02-28 23:43:25', 'hello',
22.543, 3332154213.4],
[2, '2019-04-10', '2019-04-01', '2019-04-28',
11, 3223, 41444, 52515, -65, -747, -8388, -9099,
'550e8400-e29b-41d4-a716-446655440004',
'1973-06-29', '2002-02-28 23:23:25', '!!!!',
32.543, 3332543.4]
11, 3223, 41444, 52515, -65, -747, -8388, -9099,
'550e8400-e29b-41d4-a716-446655440004',
'1973-06-29', '2002-02-28 23:23:25', '!!!!',
32.543, 3332543.4]
]
}

View File

@ -1,30 +0,0 @@
<?xml version="1.0"?>
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
<dictionaries_config>/etc/clickhouse-server/config.d/*.xml</dictionaries_config>
</yandex>

View File

@ -1,72 +0,0 @@
<yandex>
<dictionary>
<name>anime_dict</name>
<source>
<clickhouse>
<host>dictionary_node</host>
<port>9000</port>
<user>default</user>
<password></password>
<db>test</db>
<table>ints</table>
</clickhouse>
</source>
<lifetime>
<max>2</max>
<min>1</min>
</lifetime>
<layout>
<cache>
<size_in_cells>10000</size_in_cells>
<max_update_queue_size>10000</max_update_queue_size>
<allow_read_expired_keys>1</allow_read_expired_keys>
<update_queue_push_timeout_milliseconds>10</update_queue_push_timeout_milliseconds>
</cache>
</layout>
<structure>
<id>
<name>key</name>
</id>
<attribute>
<name>i8</name>
<type>Int8</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>i16</name>
<type>Int16</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>i32</name>
<type>Int32</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>i64</name>
<type>Int64</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>u8</name>
<type>UInt8</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>u16</name>
<type>UInt16</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>u32</name>
<type>UInt32</type>
<null_value>0</null_value>
</attribute>
<attribute>
<name>u64</name>
<type>UInt64</type>
<null_value>0</null_value>
</attribute>
</structure>
</dictionary>
</yandex>

View File

@ -1,23 +0,0 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>

View File

@ -1,64 +0,0 @@
from __future__ import print_function
import pytest
import time
import os
from contextlib import contextmanager
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseKiller
from helpers.network import PartitionManager
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
dictionary_node = cluster.add_instance('dictionary_node', stay_alive=True)
main_node = cluster.add_instance('main_node', main_configs=['configs/dictionaries/cache_ints_dictionary.xml'])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
dictionary_node.query("create database if not exists test;")
dictionary_node.query("drop table if exists test.ints;")
dictionary_node.query("create table test.ints "
"(key UInt64, "
"i8 Int8, i16 Int16, i32 Int32, i64 Int64, "
"u8 UInt8, u16 UInt16, u32 UInt32, u64 UInt64) "
"Engine = Memory;")
dictionary_node.query("insert into test.ints values (7, 7, 7, 7, 7, 7, 7, 7, 7);")
dictionary_node.query("insert into test.ints values (5, 5, 5, 5, 5, 5, 5, 5, 5);")
yield cluster
finally:
cluster.shutdown()
# @pytest.mark.skip(reason="debugging")
def test_default_reading(started_cluster):
assert None != dictionary_node.get_process_pid("clickhouse"), "ClickHouse must be alive"
# Key 0 is not in dictionary, so default value will be returned
def test_helper():
assert '42' == main_node.query("select dictGetOrDefault('anime_dict', 'i8', toUInt64(13), toInt8(42));").rstrip()
assert '42' == main_node.query("select dictGetOrDefault('anime_dict', 'i16', toUInt64(13), toInt16(42));").rstrip()
assert '42' == main_node.query("select dictGetOrDefault('anime_dict', 'i32', toUInt64(13), toInt32(42));").rstrip()
assert '42' == main_node.query("select dictGetOrDefault('anime_dict', 'i64', toUInt64(13), toInt64(42));").rstrip()
assert '42' == main_node.query("select dictGetOrDefault('anime_dict', 'u8', toUInt64(13), toUInt8(42));").rstrip()
assert '42' == main_node.query("select dictGetOrDefault('anime_dict', 'u16', toUInt64(13), toUInt16(42));").rstrip()
assert '42' == main_node.query("select dictGetOrDefault('anime_dict', 'u32', toUInt64(13), toUInt32(42));").rstrip()
assert '42' == main_node.query("select dictGetOrDefault('anime_dict', 'u64', toUInt64(13), toUInt64(42));").rstrip()
test_helper()
with PartitionManager() as pm, ClickHouseKiller(dictionary_node):
assert None == dictionary_node.get_process_pid("clickhouse"), "CLickHouse must be alive"
# Remove connection between main_node and dictionary for sure
pm.heal_all()
pm.partition_instances(main_node, dictionary_node)
# Dictionary max lifetime is 2 seconds.
time.sleep(3)
test_helper()

View File

@ -1,63 +0,0 @@
from __future__ import print_function
import pytest
import time
import os
from contextlib import contextmanager
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseKiller
from helpers.network import PartitionManager
from helpers.network import PartitionManagerDisabler
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
dictionary_node = cluster.add_instance('dictionary_node', stay_alive=True)
main_node = cluster.add_instance('main_node', main_configs=['configs/dictionaries/cache_ints_dictionary.xml'])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
dictionary_node.query("create database if not exists test;")
dictionary_node.query("drop table if exists test.ints;")
dictionary_node.query("create table test.ints "
"(key UInt64, "
"i8 Int8, i16 Int16, i32 Int32, i64 Int64, "
"u8 UInt8, u16 UInt16, u32 UInt32, u64 UInt64) "
"Engine = Memory;")
dictionary_node.query("insert into test.ints values (7, 7, 7, 7, 7, 7, 7, 7, 7);")
dictionary_node.query("insert into test.ints values (5, 5, 5, 5, 5, 5, 5, 5, 5);")
yield cluster
finally:
cluster.shutdown()
# @pytest.mark.skip(reason="debugging")
def test_simple_dict_get(started_cluster):
assert None != dictionary_node.get_process_pid("clickhouse"), "ClickHouse must be alive"
def test_helper():
assert '7' == main_node.query("select dictGet('anime_dict', 'i8', toUInt64(7));").rstrip(), "Wrong answer."
assert '7' == main_node.query("select dictGet('anime_dict', 'i16', toUInt64(7));").rstrip(), "Wrong answer."
assert '7' == main_node.query("select dictGet('anime_dict', 'i32', toUInt64(7));").rstrip(), "Wrong answer."
assert '7' == main_node.query("select dictGet('anime_dict', 'i64', toUInt64(7));").rstrip(), "Wrong answer."
assert '7' == main_node.query("select dictGet('anime_dict', 'u8', toUInt64(7));").rstrip(), "Wrong answer."
assert '7' == main_node.query("select dictGet('anime_dict', 'u16', toUInt64(7));").rstrip(), "Wrong answer."
assert '7' == main_node.query("select dictGet('anime_dict', 'u32', toUInt64(7));").rstrip(), "Wrong answer."
assert '7' == main_node.query("select dictGet('anime_dict', 'u64', toUInt64(7));").rstrip(), "Wrong answer."
test_helper()
with PartitionManager() as pm, ClickHouseKiller(dictionary_node):
assert None == dictionary_node.get_process_pid("clickhouse")
# Remove connection between main_node and dictionary for sure
pm.heal_all()
pm.partition_instances(main_node, dictionary_node)
# Dictionary max lifetime is 2 seconds.
time.sleep(3)
test_helper()

View File

@ -1,60 +0,0 @@
from __future__ import print_function
import pytest
import time
import os
from contextlib import contextmanager
from helpers.cluster import ClickHouseCluster
from helpers.cluster import ClickHouseKiller
from helpers.network import PartitionManager
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
cluster = ClickHouseCluster(__file__, base_configs_dir=os.path.join(SCRIPT_DIR, 'configs'))
dictionary_node = cluster.add_instance('dictionary_node', stay_alive=True)
main_node = cluster.add_instance('main_node', main_configs=['configs/dictionaries/cache_ints_dictionary.xml'])
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
dictionary_node.query("create database if not exists test;")
dictionary_node.query("drop table if exists test.ints;")
dictionary_node.query("create table test.ints "
"(key UInt64, "
"i8 Int8, i16 Int16, i32 Int32, i64 Int64, "
"u8 UInt8, u16 UInt16, u32 UInt32, u64 UInt64) "
"Engine = Memory;")
dictionary_node.query("insert into test.ints values (7, 7, 7, 7, 7, 7, 7, 7, 7);")
dictionary_node.query("insert into test.ints values (5, 5, 5, 5, 5, 5, 5, 5, 5);")
yield cluster
finally:
cluster.shutdown()
# @pytest.mark.skip(reason="debugging")
def test_simple_dict_get_or_default(started_cluster):
assert None != dictionary_node.get_process_pid("clickhouse"), "ClickHouse must be alive"
def test_helper():
assert '5' == main_node.query("select dictGetOrDefault('anime_dict', 'i8', toUInt64(5), toInt8(42));").rstrip()
assert '5' == main_node.query("select dictGetOrDefault('anime_dict', 'i16', toUInt64(5), toInt16(42));").rstrip()
assert '5' == main_node.query("select dictGetOrDefault('anime_dict', 'i32', toUInt64(5), toInt32(42));").rstrip()
assert '5' == main_node.query("select dictGetOrDefault('anime_dict', 'i64', toUInt64(5), toInt64(42));").rstrip()
assert '5' == main_node.query("select dictGetOrDefault('anime_dict', 'u8', toUInt64(5), toUInt8(42));").rstrip()
assert '5' == main_node.query("select dictGetOrDefault('anime_dict', 'u16', toUInt64(5), toUInt16(42));").rstrip()
assert '5' == main_node.query("select dictGetOrDefault('anime_dict', 'u32', toUInt64(5), toUInt32(42));").rstrip()
assert '5' == main_node.query("select dictGetOrDefault('anime_dict', 'u64', toUInt64(5), toUInt64(42));").rstrip()
test_helper()
with PartitionManager() as pm, ClickHouseKiller(dictionary_node):
assert None == dictionary_node.get_process_pid("clickhouse")
# Remove connection between main_node and dictionary for sure
pm.partition_instances(main_node, dictionary_node)
# Dictionary max lifetime is 2 seconds.
time.sleep(3)
test_helper()

View File

@ -1,74 +0,0 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
$CLICKHOUSE_CLIENT --query="create database if not exists test_01054;"
$CLICKHOUSE_CLIENT --query="drop table if exists test_01054.ints;"
$CLICKHOUSE_CLIENT --query="create table test_01054.ints
(key UInt64, i8 Int8, i16 Int16, i32 Int32, i64 Int64, u8 UInt8, u16 UInt16, u32 UInt32, u64 UInt64)
Engine = Memory;"
$CLICKHOUSE_CLIENT --query="insert into test_01054.ints values (1, 1, 1, 1, 1, 1, 1, 1, 1);"
$CLICKHOUSE_CLIENT --query="insert into test_01054.ints values (2, 2, 2, 2, 2, 2, 2, 2, 2);"
$CLICKHOUSE_CLIENT --query="insert into test_01054.ints values (3, 3, 3, 3, 3, 3, 3, 3, 3);"
function thread1()
{
for attempt_thread1 in {1..100}
do
RAND_NUMBER_THREAD1=$($CLICKHOUSE_CLIENT --query="SELECT rand() % 100;")
$CLICKHOUSE_CLIENT --query="select dictGet('one_cell_cache_ints', 'i8', toUInt64($RAND_NUMBER_THREAD1));"
done
}
function thread2()
{
for attempt_thread2 in {1..100}
do
RAND_NUMBER_THREAD2=$($CLICKHOUSE_CLIENT --query="SELECT rand() % 100;")
$CLICKHOUSE_CLIENT --query="select dictGet('one_cell_cache_ints', 'i8', toUInt64($RAND_NUMBER_THREAD2));"
done
}
function thread3()
{
for attempt_thread3 in {1..100}
do
RAND_NUMBER_THREAD3=$($CLICKHOUSE_CLIENT --query="SELECT rand() % 100;")
$CLICKHOUSE_CLIENT --query="select dictGet('one_cell_cache_ints', 'i8', toUInt64($RAND_NUMBER_THREAD3));"
done
}
function thread4()
{
for attempt_thread4 in {1..100}
do
RAND_NUMBER_THREAD4=$($CLICKHOUSE_CLIENT --query="SELECT rand() % 100;")
$CLICKHOUSE_CLIENT --query="select dictGet('one_cell_cache_ints', 'i8', toUInt64($RAND_NUMBER_THREAD4));"
done
}
export -f thread1;
export -f thread2;
export -f thread3;
export -f thread4;
TIMEOUT=10
# shellcheck disable=SC2188
timeout $TIMEOUT bash -c thread1 > /dev/null 2>&1 &
timeout $TIMEOUT bash -c thread2 > /dev/null 2>&1 &
timeout $TIMEOUT bash -c thread3 > /dev/null 2>&1 &
timeout $TIMEOUT bash -c thread4 > /dev/null 2>&1 &
wait
echo OK
$CLICKHOUSE_CLIENT --query "DROP TABLE if exists test_01054.ints"

View File

@ -1,2 +0,0 @@
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20]

View File

@ -1,56 +0,0 @@
create database if not exists test_01054_overflow;
drop table if exists test_01054_overflow.ints;
create table test_01054_overflow.ints (key UInt64, i8 Int8, i16 Int16, i32 Int32, i64 Int64, u8 UInt8, u16 UInt16, u32 UInt32, u64 UInt64) Engine = Memory;
insert into test_01054_overflow.ints values (1, 1, 1, 1, 1, 1, 1, 1, 1);
insert into test_01054_overflow.ints values (2, 2, 2, 2, 2, 2, 2, 2, 2);
insert into test_01054_overflow.ints values (3, 3, 3, 3, 3, 3, 3, 3, 3);
insert into test_01054_overflow.ints values (4, 4, 4, 4, 4, 4, 4, 4, 4);
insert into test_01054_overflow.ints values (5, 5, 5, 5, 5, 5, 5, 5, 5);
insert into test_01054_overflow.ints values (6, 6, 6, 6, 6, 6, 6, 6, 6);
insert into test_01054_overflow.ints values (7, 7, 7, 7, 7, 7, 7, 7, 7);
insert into test_01054_overflow.ints values (8, 8, 8, 8, 8, 8, 8, 8, 8);
insert into test_01054_overflow.ints values (9, 9, 9, 9, 9, 9, 9, 9, 9);
insert into test_01054_overflow.ints values (10, 10, 10, 10, 10, 10, 10, 10, 10);
insert into test_01054_overflow.ints values (11, 11, 11, 11, 11, 11, 11, 11, 11);
insert into test_01054_overflow.ints values (12, 12, 12, 12, 12, 12, 12, 12, 12);
insert into test_01054_overflow.ints values (13, 13, 13, 13, 13, 13, 13, 13, 13);
insert into test_01054_overflow.ints values (14, 14, 14, 14, 14, 14, 14, 14, 14);
insert into test_01054_overflow.ints values (15, 15, 15, 15, 15, 15, 15, 15, 15);
insert into test_01054_overflow.ints values (16, 16, 16, 16, 16, 16, 16, 16, 16);
insert into test_01054_overflow.ints values (17, 17, 17, 17, 17, 17, 17, 17, 17);
insert into test_01054_overflow.ints values (18, 18, 18, 18, 18, 18, 18, 18, 18);
insert into test_01054_overflow.ints values (19, 19, 19, 19, 19, 19, 19, 19, 19);
insert into test_01054_overflow.ints values (20, 20, 20, 20, 20, 20, 20, 20, 20);
select
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(1)),
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(2)),
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(3)),
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(4)),
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(5)),
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(6)),
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(7)),
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(8)),
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(9)),
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(10)),
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(11)),
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(12)),
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(13)),
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(14)),
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(15)),
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(16)),
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(17)),
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(18)),
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(19)),
dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(20));
SELECT arrayMap(x -> dictGet('one_cell_cache_ints_overflow', 'i8', toUInt64(x)), array)
FROM
(
SELECT [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20] AS array
);
DROP TABLE if exists test_01054.ints;