This commit is contained in:
Nikita Mikhaylov 2020-04-17 20:01:18 +03:00
parent 48ba1f4ced
commit c603acd515
3 changed files with 129 additions and 27 deletions

View File

@ -46,6 +46,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS; extern const int BAD_ARGUMENTS;
extern const int UNSUPPORTED_METHOD; extern const int UNSUPPORTED_METHOD;
extern const int TOO_SMALL_BUFFER_SIZE; extern const int TOO_SMALL_BUFFER_SIZE;
extern const int TIMEOUT_EXCEEDED;
} }
@ -63,10 +64,12 @@ CacheDictionary::CacheDictionary(
const DictionaryStructure & dict_struct_, const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_, DictionarySourcePtr source_ptr_,
DictionaryLifetime dict_lifetime_, DictionaryLifetime dict_lifetime_,
size_t strict_max_lifetime_seconds_,
size_t size_, size_t size_,
bool allow_read_expired_keys_, bool allow_read_expired_keys_,
size_t max_update_queue_size_, size_t max_update_queue_size_,
size_t update_queue_push_timeout_milliseconds_, size_t update_queue_push_timeout_milliseconds_,
size_t query_wait_timeout_milliseconds_,
size_t max_threads_for_updates_) size_t max_threads_for_updates_)
: database(database_) : database(database_)
, name(name_) , name(name_)
@ -74,9 +77,11 @@ CacheDictionary::CacheDictionary(
, dict_struct(dict_struct_) , dict_struct(dict_struct_)
, source_ptr{std::move(source_ptr_)} , source_ptr{std::move(source_ptr_)}
, dict_lifetime(dict_lifetime_) , dict_lifetime(dict_lifetime_)
, strict_max_lifetime_seconds(strict_max_lifetime_seconds_)
, allow_read_expired_keys(allow_read_expired_keys_) , allow_read_expired_keys(allow_read_expired_keys_)
, max_update_queue_size(max_update_queue_size_) , max_update_queue_size(max_update_queue_size_)
, update_queue_push_timeout_milliseconds(update_queue_push_timeout_milliseconds_) , update_queue_push_timeout_milliseconds(update_queue_push_timeout_milliseconds_)
, query_wait_timeout_milliseconds(query_wait_timeout_milliseconds_)
, max_threads_for_updates(max_threads_for_updates_) , max_threads_for_updates(max_threads_for_updates_)
, log(&Logger::get("ExternalDictionaries")) , log(&Logger::get("ExternalDictionaries"))
, size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))} , size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
@ -332,6 +337,13 @@ void CacheDictionary::has(const PaddedPODArray<Key> & ids, PaddedPODArray<UInt8>
{ {
if (find_result.outdated) if (find_result.outdated)
{ {
/// Protection of reading very expired keys.
if (now > cells[find_result.cell_idx].strict_max)
{
cache_not_found_ids[id].push_back(row);
continue;
}
cache_expired_ids[id].push_back(row); cache_expired_ids[id].push_back(row);
if (allow_read_expired_keys) if (allow_read_expired_keys)
@ -693,6 +705,9 @@ void registerDictionaryCache(DictionaryFactory & factory)
const String name = config.getString(config_prefix + ".name"); const String name = config.getString(config_prefix + ".name");
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"}; const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
const size_t strict_max_lifetime_seconds =
config.getUInt64(layout_prefix + ".cache.strict_max_lifetime_seconds", static_cast<size_t>(dict_lifetime.max_sec));
const size_t max_update_queue_size = const size_t max_update_queue_size =
config.getUInt64(layout_prefix + ".cache.max_update_queue_size", 100000); config.getUInt64(layout_prefix + ".cache.max_update_queue_size", 100000);
if (max_update_queue_size == 0) if (max_update_queue_size == 0)
@ -708,6 +723,9 @@ void registerDictionaryCache(DictionaryFactory & factory)
throw Exception{name + ": dictionary of layout 'cache' have too little update_queue_push_timeout", throw Exception{name + ": dictionary of layout 'cache' have too little update_queue_push_timeout",
ErrorCodes::BAD_ARGUMENTS}; ErrorCodes::BAD_ARGUMENTS};
const size_t query_wait_timeout_milliseconds =
config.getUInt64(layout_prefix + ".cache.query_wait_timeout_milliseconds", 60000);
const size_t max_threads_for_updates = const size_t max_threads_for_updates =
config.getUInt64(layout_prefix + ".max_threads_for_updates", 4); config.getUInt64(layout_prefix + ".max_threads_for_updates", 4);
if (max_threads_for_updates == 0) if (max_threads_for_updates == 0)
@ -715,8 +733,17 @@ void registerDictionaryCache(DictionaryFactory & factory)
ErrorCodes::BAD_ARGUMENTS}; ErrorCodes::BAD_ARGUMENTS};
return std::make_unique<CacheDictionary>( return std::make_unique<CacheDictionary>(
database, name, dict_struct, std::move(source_ptr), dict_lifetime, size, database,
allow_read_expired_keys, max_update_queue_size, update_queue_push_timeout_milliseconds, name,
dict_struct,
std::move(source_ptr),
dict_lifetime,
strict_max_lifetime_seconds,
size,
allow_read_expired_keys,
max_update_queue_size,
update_queue_push_timeout_milliseconds,
query_wait_timeout_milliseconds,
max_threads_for_updates); max_threads_for_updates);
}; };
factory.registerLayout("cache", create_layout, false); factory.registerLayout("cache", create_layout, false);
@ -782,20 +809,32 @@ void CacheDictionary::updateThreadFunction()
void CacheDictionary::waitForCurrentUpdateFinish(UpdateUnitPtr & update_unit_ptr) const void CacheDictionary::waitForCurrentUpdateFinish(UpdateUnitPtr & update_unit_ptr) const
{ {
std::unique_lock<std::mutex> lock(update_mutex); std::unique_lock<std::mutex> update_lock(update_mutex);
/* size_t timeout_for_wait = 100000;
* We wait here without any timeout to avoid SEGFAULT's. bool result = is_update_finished.wait_for(
* Consider timeout for wait had expired and main query's thread ended with exception update_lock,
* or some other error. But the UpdateUnit with callbacks is left in the queue. std::chrono::milliseconds(timeout_for_wait),
* It has these callback that capture god knows what from the current thread
* (most of the variables lies on the stack of finished thread) that
* intended to do a synchronous update. AsyncUpdate thread can touch deallocated memory and explode.
* */
is_update_finished.wait(
lock,
[&] {return update_unit_ptr->is_done || update_unit_ptr->current_exception; }); [&] {return update_unit_ptr->is_done || update_unit_ptr->current_exception; });
if (!result)
{
std::lock_guard<std::mutex> callback_lock(update_unit_ptr->callback_mutex);
/*
* We acquire a lock here and store false to special variable to avoid SEGFAULT's.
* Consider timeout for wait had expired and main query's thread ended with exception
* or some other error. But the UpdateUnit with callbacks is left in the queue.
* It has these callback that capture god knows what from the current thread
* (most of the variables lies on the stack of finished thread) that
* intended to do a synchronous update. AsyncUpdate thread can touch deallocated memory and explode.
* */
update_unit_ptr->can_use_callback = false;
throw DB::Exception(
"Dictionary " + getName() + " source seems unavailable, because " +
toString(timeout_for_wait) + " timeout exceeded.", ErrorCodes::TIMEOUT_EXCEEDED);
}
if (update_unit_ptr->current_exception) if (update_unit_ptr->current_exception)
std::rethrow_exception(update_unit_ptr->current_exception); std::rethrow_exception(update_unit_ptr->current_exception);
} }
@ -968,9 +1007,14 @@ void CacheDictionary::update(BunchUpdateUnit & bunch_update_unit) const
{ {
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec}; std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)}); cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
cell.strict_max = now + std::chrono::seconds{strict_max_lifetime_seconds};
} }
else else
{
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max()); cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
cell.strict_max = now + std::chrono::seconds{strict_max_lifetime_seconds};
}
/// Set null_value for each attribute /// Set null_value for each attribute
cell.setDefault(); cell.setDefault();

View File

@ -55,10 +55,12 @@ public:
const DictionaryStructure & dict_struct_, const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_, DictionarySourcePtr source_ptr_,
DictionaryLifetime dict_lifetime_, DictionaryLifetime dict_lifetime_,
size_t strict_max_lifetime_seconds,
size_t size_, size_t size_,
bool allow_read_expired_keys_, bool allow_read_expired_keys_,
size_t max_update_queue_size_, size_t max_update_queue_size_,
size_t update_queue_push_timeout_milliseconds_, size_t update_queue_push_timeout_milliseconds_,
size_t query_wait_timeout_milliseconds,
size_t max_threads_for_updates); size_t max_threads_for_updates);
~CacheDictionary() override; ~CacheDictionary() override;
@ -87,9 +89,18 @@ public:
std::shared_ptr<const IExternalLoadable> clone() const override std::shared_ptr<const IExternalLoadable> clone() const override
{ {
return std::make_shared<CacheDictionary>( return std::make_shared<CacheDictionary>(
database, name, dict_struct, source_ptr->clone(), dict_lifetime, size, database,
allow_read_expired_keys, max_update_queue_size, name,
update_queue_push_timeout_milliseconds, max_threads_for_updates); dict_struct,
source_ptr->clone(),
dict_lifetime,
strict_max_lifetime_seconds,
size,
allow_read_expired_keys,
max_update_queue_size,
update_queue_push_timeout_milliseconds,
query_wait_timeout_milliseconds,
max_threads_for_updates);
} }
const IDictionarySource * getSource() const override { return source_ptr.get(); } const IDictionarySource * getSource() const override { return source_ptr.get(); }
@ -206,6 +217,8 @@ private:
/// Stores both expiration time and `is_default` flag in the most significant bit /// Stores both expiration time and `is_default` flag in the most significant bit
time_point_urep_t data; time_point_urep_t data;
time_point_t strict_max;
/// Sets expiration time, resets `is_default` flag to false /// Sets expiration time, resets `is_default` flag to false
time_point_t expiresAt() const { return ext::safe_bit_cast<time_point_t>(data & EXPIRES_AT_MASK); } time_point_t expiresAt() const { return ext::safe_bit_cast<time_point_t>(data & EXPIRES_AT_MASK); }
void setExpiresAt(const time_point_t & t) { data = ext::safe_bit_cast<time_point_urep_t>(t); } void setExpiresAt(const time_point_t & t) { data = ext::safe_bit_cast<time_point_urep_t>(t); }
@ -294,9 +307,11 @@ private:
const DictionaryStructure dict_struct; const DictionaryStructure dict_struct;
mutable DictionarySourcePtr source_ptr; mutable DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime; const DictionaryLifetime dict_lifetime;
const size_t strict_max_lifetime_seconds;
const bool allow_read_expired_keys; const bool allow_read_expired_keys;
const size_t max_update_queue_size; const size_t max_update_queue_size;
const size_t update_queue_push_timeout_milliseconds; const size_t update_queue_push_timeout_milliseconds;
const size_t query_wait_timeout_milliseconds;
const size_t max_threads_for_updates; const size_t max_threads_for_updates;
Logger * const log; Logger * const log;
@ -366,6 +381,12 @@ private:
alive_keys(CurrentMetrics::CacheDictionaryUpdateQueueKeys, requested_ids.size()){} alive_keys(CurrentMetrics::CacheDictionaryUpdateQueueKeys, requested_ids.size()){}
std::vector<Key> requested_ids; std::vector<Key> requested_ids;
/// It might seem that it is a leak of performance.
/// But aquiring a mutex without contention is rather cheap.
std::mutex callback_mutex;
bool can_use_callback{true};
PresentIdHandler present_id_handler; PresentIdHandler present_id_handler;
AbsentIdHandler absent_id_handler; AbsentIdHandler absent_id_handler;
@ -412,6 +433,7 @@ private:
helper.push_back(unit_ptr->requested_ids.size() + helper.back()); helper.push_back(unit_ptr->requested_ids.size() + helper.back());
present_id_handlers.emplace_back(unit_ptr->present_id_handler); present_id_handlers.emplace_back(unit_ptr->present_id_handler);
absent_id_handlers.emplace_back(unit_ptr->absent_id_handler); absent_id_handlers.emplace_back(unit_ptr->absent_id_handler);
update_units.emplace_back(unit_ptr);
} }
concatenated_requested_ids.reserve(total_requested_keys_count); concatenated_requested_ids.reserve(total_requested_keys_count);
@ -428,31 +450,51 @@ private:
void informCallersAboutPresentId(Key id, size_t cell_idx) void informCallersAboutPresentId(Key id, size_t cell_idx)
{ {
for (size_t i = 0; i < concatenated_requested_ids.size(); ++i) for (size_t position = 0; position < concatenated_requested_ids.size(); ++position)
{ {
auto & curr = concatenated_requested_ids[i]; if (concatenated_requested_ids[position] == id)
if (curr == id) {
getPresentIdHandlerForPosition(i)(id, cell_idx); auto unit_number = getUpdateUnitNumberForRequestedIdPosition(position);
auto lock = getLockToCurrentUnit(unit_number);
if (canUseCallback(unit_number))
getPresentIdHandlerForPosition(unit_number)(id, cell_idx);
}
} }
} }
void informCallersAboutAbsentId(Key id, size_t cell_idx) void informCallersAboutAbsentId(Key id, size_t cell_idx)
{ {
for (size_t i = 0; i < concatenated_requested_ids.size(); ++i) for (size_t position = 0; position < concatenated_requested_ids.size(); ++position)
if (concatenated_requested_ids[i] == id) if (concatenated_requested_ids[position] == id)
getAbsentIdHandlerForPosition(i)(id, cell_idx); {
auto unit_number = getUpdateUnitNumberForRequestedIdPosition(position);
auto lock = getLockToCurrentUnit(unit_number);
if (canUseCallback(unit_number))
getAbsentIdHandlerForPosition(unit_number)(id, cell_idx);
}
} }
private: private:
PresentIdHandler & getPresentIdHandlerForPosition(size_t position) /// Needed for control the usage of callback to avoid SEGFAULTs.
bool canUseCallback(size_t unit_number)
{ {
return present_id_handlers[getUpdateUnitNumberForRequestedIdPosition(position)]; return update_units[unit_number].get()->can_use_callback;
} }
AbsentIdHandler & getAbsentIdHandlerForPosition(size_t position) std::unique_lock<std::mutex> getLockToCurrentUnit(size_t unit_number)
{ {
return absent_id_handlers[getUpdateUnitNumberForRequestedIdPosition((position))]; return std::unique_lock<std::mutex>(update_units[unit_number].get()->callback_mutex);
}
PresentIdHandler & getPresentIdHandlerForPosition(size_t unit_number)
{
return update_units[unit_number].get()->present_id_handler;
}
AbsentIdHandler & getAbsentIdHandlerForPosition(size_t unit_number)
{
return update_units[unit_number].get()->absent_id_handler;
} }
size_t getUpdateUnitNumberForRequestedIdPosition(size_t position) size_t getUpdateUnitNumberForRequestedIdPosition(size_t position)
@ -464,6 +506,8 @@ private:
std::vector<PresentIdHandler> present_id_handlers; std::vector<PresentIdHandler> present_id_handlers;
std::vector<AbsentIdHandler> absent_id_handlers; std::vector<AbsentIdHandler> absent_id_handlers;
std::vector<std::reference_wrapper<UpdateUnitPtr>> update_units;
std::vector<size_t> helper; std::vector<size_t> helper;
}; };

View File

@ -75,6 +75,13 @@ void CacheDictionary::getItemsNumberImpl(
if (find_result.outdated) if (find_result.outdated)
{ {
/// Protection of reading very expired keys.
if (now > cells[find_result.cell_idx].strict_max)
{
cache_not_found_ids[id].push_back(row);
continue;
}
cache_expired_ids[id].push_back(row); cache_expired_ids[id].push_back(row);
if (allow_read_expired_keys) if (allow_read_expired_keys)
update_routine(); update_routine();
@ -249,6 +256,13 @@ void CacheDictionary::getItemsString(
{ {
if (find_result.outdated) if (find_result.outdated)
{ {
/// Protection of reading very expired keys.
if (now > cells[find_result.cell_idx].strict_max)
{
cache_not_found_ids[id].push_back(row);
continue;
}
cache_expired_ids[id].push_back(row); cache_expired_ids[id].push_back(row);
if (allow_read_expired_keys) if (allow_read_expired_keys)