dbms: reload initially failed dictionaries with exponential backoff [#METR-16702]

This commit is contained in:
Andrey Mironov 2015-06-09 19:12:51 +03:00
parent c94bd2fc09
commit 4fca014e1b
6 changed files with 155 additions and 36 deletions

View File

@ -42,6 +42,8 @@ public:
: CacheDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime, other.size}
{}
std::exception_ptr getCreationException() const override { return {}; }
std::string getName() const override { return name; }
std::string getTypeName() const override { return "Cache"; }

View File

@ -25,8 +25,17 @@ public:
source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime)
{
createAttributes();
loadData();
calculateBytesAllocated();
try
{
loadData();
calculateBytesAllocated();
}
catch (...)
{
creation_exception = std::current_exception();
}
creation_time = std::chrono::system_clock::now();
}
@ -34,6 +43,8 @@ public:
: FlatDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime}
{}
std::exception_ptr getCreationException() const override { return creation_exception; }
std::string getName() const override { return name; }
std::string getTypeName() const override { return "Flat"; }
@ -398,10 +409,11 @@ private:
std::size_t bytes_allocated = 0;
std::size_t element_count = 0;
std::size_t bucket_count = 0;
mutable std::atomic<std::size_t> query_count;
std::chrono::time_point<std::chrono::system_clock> creation_time;
mutable std::atomic<std::size_t> query_count;
std::exception_ptr creation_exception;
};
}

View File

@ -22,8 +22,17 @@ public:
source_ptr{std::move(source_ptr)}, dict_lifetime(dict_lifetime)
{
createAttributes();
loadData();
calculateBytesAllocated();
try
{
loadData();
calculateBytesAllocated();
}
catch (...)
{
creation_exception = std::current_exception();
}
creation_time = std::chrono::system_clock::now();
}
@ -31,6 +40,8 @@ public:
: HashedDictionary{other.name, other.dict_struct, other.source_ptr->clone(), other.dict_lifetime}
{}
std::exception_ptr getCreationException() const override { return creation_exception; }
std::string getName() const override { return name; }
std::string getTypeName() const override { return "Hashed"; }
@ -389,6 +400,8 @@ private:
mutable std::atomic<std::size_t> query_count{};
std::chrono::time_point<std::chrono::system_clock> creation_time;
std::exception_ptr creation_exception;
};
}

View File

@ -24,6 +24,8 @@ class IDictionary
public:
using id_t = std::uint64_t;
virtual std::exception_ptr getCreationException() const = 0;
virtual std::string getName() const = 0;
virtual std::string getTypeName() const = 0;

View File

@ -1,23 +1,23 @@
#pragma once
#include <DB/Dictionaries/IDictionary.h>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
#include <Yandex/MultiVersion.h>
#include <Yandex/logger_useful.h>
#include <Poco/Event.h>
#include <unistd.h>
#include <time.h>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <chrono>
#include <random>
#include <unistd.h>
namespace DB
{
class Context;
class IDictionary;
/** Manages user-defined dictionaries.
* Monitors configuration file and automatically reloads dictionaries in a separate thread.
@ -50,8 +50,16 @@ private:
std::exception_ptr exception;
};
struct failed_dictionary_info final
{
std::unique_ptr<IDictionary> dict;
std::chrono::system_clock::time_point next_attempt_time;
std::uint64_t error_count;
};
std::unordered_map<std::string, dictionary_info> dictionaries;
std::unordered_map<std::string, std::chrono::system_clock::time_point> update_times;
std::unordered_map<std::string, failed_dictionary_info> failed_dictionaries;
std::mt19937_64 rnd_engine{getSeed()};
Context & context;
@ -81,7 +89,7 @@ private:
{
timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return ts.tv_nsec ^ getpid();
return static_cast<std::uint64_t>(ts.tv_nsec ^ getpid());
}
public:

View File

@ -6,6 +6,15 @@
#include <Poco/Util/Application.h>
#include <Poco/Glob.h>
namespace
{
/// 5 seconds
const auto backoff_initial_sec = 5;
/// 10 minutes
const auto backoff_max_sec = 10 * 60;
}
namespace DB
{
@ -41,6 +50,63 @@ void ExternalDictionaries::reloadImpl(const bool throw_on_error)
for (const auto & config_path : config_paths)
reloadFromFile(config_path, throw_on_error);
/// list of recreated dictionaries to perform delayed removal from unordered_map
std::list<std::string> recreated_failed_dictionaries;
/// retry loading failed dictionaries
for (auto & failed_dictionary : failed_dictionaries)
{
if (std::chrono::system_clock::now() < failed_dictionary.second.next_attempt_time)
continue;
const auto & name = failed_dictionary.first;
try
{
auto dict_ptr = failed_dictionary.second.dict->clone();
if (dict_ptr->getCreationException())
{
/// recalculate next attempt time
std::uniform_int_distribution<std::uint64_t> distribution(
0, std::exp2(failed_dictionary.second.error_count));
failed_dictionary.second.next_attempt_time = std::chrono::system_clock::now() +
std::chrono::seconds{
std::min<std::uint64_t>(backoff_max_sec, backoff_initial_sec + distribution(rnd_engine))
};
++failed_dictionary.second.error_count;
}
else
{
const std::lock_guard<std::mutex> lock{dictionaries_mutex};
const auto dict_it = dictionaries.find(name);
if (dict_it->second.dict)
dict_it->second.dict->set(dict_ptr.release());
else
dict_it->second.dict = std::make_shared<MultiVersion<IDictionary>>(dict_ptr.release());
/// erase stored exception on success
dict_it->second.exception = std::exception_ptr{};
const auto & lifetime = dict_ptr->getLifetime();
std::uniform_int_distribution<std::uint64_t> distribution{lifetime.min_sec, lifetime.max_sec};
update_times[name] = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
recreated_failed_dictionaries.push_back(name);
}
}
catch (...)
{
LOG_ERROR(log, "Failed reloading " << name << " dictionary due to unexpected error");
}
}
/// do not undertake further attempts to recreate these dictionaries
for (const auto & name : recreated_failed_dictionaries)
failed_dictionaries.erase(name);
/// periodic update
for (auto & dictionary : dictionaries)
{
@ -122,10 +188,10 @@ void ExternalDictionaries::reloadFromFile(const std::string & config_path, const
}
else
{
auto it = last_modification_times.find(config_path);
if (it == std::end(last_modification_times))
it = last_modification_times.emplace(config_path, Poco::Timestamp{0}).first;
auto & config_last_modified = it->second;
auto modification_time_it = last_modification_times.find(config_path);
if (modification_time_it == std::end(last_modification_times))
modification_time_it = last_modification_times.emplace(config_path, Poco::Timestamp{0}).first;
auto & config_last_modified = modification_time_it->second;
const auto last_modified = config_file.getLastModified();
if (last_modified > config_last_modified)
@ -163,12 +229,31 @@ void ExternalDictionaries::reloadFromFile(const std::string & config_path, const
continue;
}
auto it = dictionaries.find(name);
if (it != std::end(dictionaries))
if (it->second.origin != config_path)
throw std::runtime_error{"Overriding dictionary from file " + it->second.origin};
const auto dict_it = dictionaries.find(name);
if (dict_it != std::end(dictionaries))
if (dict_it->second.origin != config_path)
throw std::runtime_error{"Overriding dictionary from file " + dict_it->second.origin};
auto dict_ptr = DictionaryFactory::instance().create(name, *config, key, context);
if (const auto exception_ptr = dict_ptr->getCreationException())
{
const auto failed_dict_it = failed_dictionaries.find(name);
if (failed_dict_it != std::end(failed_dictionaries))
{
failed_dict_it->second = failed_dictionary_info{
std::move(dict_ptr),
std::chrono::system_clock::now() + std::chrono::seconds{backoff_initial_sec}
};
}
else
failed_dictionaries.emplace(name, failed_dictionary_info{
std::move(dict_ptr),
std::chrono::system_clock::now() + std::chrono::seconds{backoff_initial_sec}
});
std::rethrow_exception(exception_ptr);
}
if (!dict_ptr->isCached())
{
const auto & lifetime = dict_ptr->getLifetime();
@ -183,42 +268,38 @@ void ExternalDictionaries::reloadFromFile(const std::string & config_path, const
}
}
const std::lock_guard<std::mutex> lock{dictionaries_mutex};
/// add new dictionary or update an existing version
if (it == std::end(dictionaries))
{
const std::lock_guard<std::mutex> lock{dictionaries_mutex};
if (dict_it == std::end(dictionaries))
dictionaries.emplace(name, dictionary_info{
std::make_shared<MultiVersion<IDictionary>>(dict_ptr.release()),
config_path
});
}
else
{
if (it->second.dict)
it->second.dict->set(dict_ptr.release());
if (dict_it->second.dict)
dict_it->second.dict->set(dict_ptr.release());
else
{
const std::lock_guard<std::mutex> lock{dictionaries_mutex};
it->second.dict = std::make_shared<MultiVersion<IDictionary>>(dict_ptr.release());
}
dict_it->second.dict = std::make_shared<MultiVersion<IDictionary>>(dict_ptr.release());
/// erase stored exception on success
it->second.exception = std::exception_ptr{};
dict_it->second.exception = std::exception_ptr{};
failed_dictionaries.erase(name);
}
}
catch (...)
{
if (!name.empty())
{
const std::lock_guard<std::mutex> lock{dictionaries_mutex};
const auto exception_ptr = std::current_exception();
const auto it = dictionaries.find(name);
if (it == std::end(dictionaries))
{
const std::lock_guard<std::mutex> lock{dictionaries_mutex};
const auto dict_it = dictionaries.find(name);
if (dict_it == std::end(dictionaries))
dictionaries.emplace(name, dictionary_info{nullptr, config_path, exception_ptr});
}
else
it->second.exception = exception_ptr;
dict_it->second.exception = exception_ptr;
}
try
@ -253,16 +334,17 @@ void ExternalDictionaries::reloadFromFile(const std::string & config_path, const
MultiVersion<IDictionary>::Version ExternalDictionaries::getDictionary(const std::string & name) const
{
const std::lock_guard<std::mutex> lock{dictionaries_mutex};
const auto it = dictionaries.find(name);
const auto it = dictionaries.find(name);
if (it == std::end(dictionaries))
throw Exception{
"No such dictionary: " + name,
ErrorCodes::BAD_ARGUMENTS
};
if (!it->second.dict && it->second.exception)
std::rethrow_exception(it->second.exception);
if (!it->second.dict)
it->second.exception ? std::rethrow_exception(it->second.exception) :
throw Exception{"No dictionary", ErrorCodes::LOGICAL_ERROR};
return it->second.dict->get();
}