Merge pull request #6737 from vitlibar/improve-error-handling-in-cache-dictionary

Improve error handling in cache dictionaries
This commit is contained in:
alexey-milovidov 2019-08-30 16:06:07 +03:00 committed by GitHub
commit e5a7f451f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 163 additions and 91 deletions

View File

@ -70,6 +70,7 @@ CacheDictionary::CacheDictionary(
, dict_struct(dict_struct_) , dict_struct(dict_struct_)
, source_ptr{std::move(source_ptr_)} , source_ptr{std::move(source_ptr_)}
, dict_lifetime(dict_lifetime_) , dict_lifetime(dict_lifetime_)
, log(&Logger::get("ExternalDictionaries"))
, size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))} , size{roundUpToPowerOfTwoOrZero(std::max(size_, size_t(max_collision_length)))}
, size_overlap_mask{this->size - 1} , size_overlap_mask{this->size - 1}
, cells{this->size} , cells{this->size}
@ -575,6 +576,12 @@ BlockInputStreamPtr CacheDictionary::getBlockInputStream(const Names & column_na
return std::make_shared<BlockInputStreamType>(shared_from_this(), max_block_size, getCachedIds(), column_names); return std::make_shared<BlockInputStreamType>(shared_from_this(), max_block_size, getCachedIds(), column_names);
} }
std::exception_ptr CacheDictionary::getLastException() const
{
const ProfilingScopedReadRWLock read_lock{rw_lock, ProfileEvents::DictCacheLockReadNs};
return last_exception;
}
void registerDictionaryCache(DictionaryFactory & factory) void registerDictionaryCache(DictionaryFactory & factory)
{ {
auto create_layout = [=](const std::string & name, auto create_layout = [=](const std::string & name,

View File

@ -7,6 +7,7 @@
#include <shared_mutex> #include <shared_mutex>
#include <variant> #include <variant>
#include <vector> #include <vector>
#include <common/logger_useful.h>
#include <Columns/ColumnDecimal.h> #include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <pcg_random.hpp> #include <pcg_random.hpp>
@ -74,6 +75,8 @@ public:
void isInVectorConstant(const PaddedPODArray<Key> & child_ids, const Key ancestor_id, PaddedPODArray<UInt8> & out) const override; void isInVectorConstant(const PaddedPODArray<Key> & child_ids, const Key ancestor_id, PaddedPODArray<UInt8> & out) const override;
void isInConstantVector(const Key child_id, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const override; void isInConstantVector(const Key child_id, const PaddedPODArray<Key> & ancestor_ids, PaddedPODArray<UInt8> & out) const override;
std::exception_ptr getLastException() const override;
template <typename T> template <typename T>
using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>; using ResultArrayType = std::conditional_t<IsDecimalNumber<T>, DecimalPaddedPODArray<T>, PaddedPODArray<T>>;
@ -253,8 +256,9 @@ private:
const std::string name; const std::string name;
const DictionaryStructure dict_struct; const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr; mutable DictionarySourcePtr source_ptr;
const DictionaryLifetime dict_lifetime; const DictionaryLifetime dict_lifetime;
Logger * const log;
mutable std::shared_mutex rw_lock; mutable std::shared_mutex rw_lock;
@ -274,6 +278,10 @@ private:
Attribute * hierarchical_attribute = nullptr; Attribute * hierarchical_attribute = nullptr;
std::unique_ptr<ArenaWithFreeLists> string_arena; std::unique_ptr<ArenaWithFreeLists> string_arena;
mutable std::exception_ptr last_exception;
mutable size_t error_count = 0;
mutable std::chrono::system_clock::time_point backoff_end_time;
mutable pcg64 rnd_engine; mutable pcg64 rnd_engine;
mutable size_t bytes_allocated = 0; mutable size_t bytes_allocated = 0;

View File

@ -3,6 +3,7 @@
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Common/ProfilingScopedRWLock.h> #include <Common/ProfilingScopedRWLock.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <common/DateLUT.h>
#include <DataStreams/IBlockInputStream.h> #include <DataStreams/IBlockInputStream.h>
#include <ext/map.h> #include <ext/map.h>
#include <ext/range.h> #include <ext/range.h>
@ -243,77 +244,102 @@ template <typename PresentIdHandler, typename AbsentIdHandler>
void CacheDictionary::update( void CacheDictionary::update(
const std::vector<Key> & requested_ids, PresentIdHandler && on_cell_updated, AbsentIdHandler && on_id_not_found) const const std::vector<Key> & requested_ids, PresentIdHandler && on_cell_updated, AbsentIdHandler && on_id_not_found) const
{ {
CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests};
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size());
std::unordered_map<Key, UInt8> remaining_ids{requested_ids.size()}; std::unordered_map<Key, UInt8> remaining_ids{requested_ids.size()};
for (const auto id : requested_ids) for (const auto id : requested_ids)
remaining_ids.insert({id, 0}); remaining_ids.insert({id, 0});
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec}; const auto now = std::chrono::system_clock::now();
const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs}; const ProfilingScopedWriteRWLock write_lock{rw_lock, ProfileEvents::DictCacheLockWriteNs};
if (now > backoff_end_time)
{ {
CurrentMetrics::Increment metric_increment{CurrentMetrics::DictCacheRequests}; try
Stopwatch watch;
auto stream = source_ptr->loadIds(requested_ids);
stream->readPrefix();
const auto now = std::chrono::system_clock::now();
while (const auto block = stream->read())
{ {
const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get()); if (error_count)
if (!id_column)
throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};
const auto & ids = id_column->getData();
/// cache column pointers
const auto column_ptrs = ext::map<std::vector>(
ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });
for (const auto i : ext::range(0, ids.size()))
{ {
const auto id = ids[i]; /// Recover after error: we have to clone the source here because
/// it could keep connections which should be reset after error.
const auto find_result = findCellIdx(id, now); source_ptr = source_ptr->clone();
const auto & cell_idx = find_result.cell_idx;
auto & cell = cells[cell_idx];
for (const auto attribute_idx : ext::range(0, attributes.size()))
{
const auto & attribute_column = *column_ptrs[attribute_idx];
auto & attribute = attributes[attribute_idx];
setAttributeValue(attribute, cell_idx, attribute_column[i]);
}
/// if cell id is zero and zero does not map to this cell, then the cell is unused
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)});
else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
/// inform caller
on_cell_updated(id, cell_idx);
/// mark corresponding id as found
remaining_ids[id] = 1;
} }
Stopwatch watch;
auto stream = source_ptr->loadIds(requested_ids);
stream->readPrefix();
while (const auto block = stream->read())
{
const auto id_column = typeid_cast<const ColumnUInt64 *>(block.safeGetByPosition(0).column.get());
if (!id_column)
throw Exception{name + ": id column has type different from UInt64.", ErrorCodes::TYPE_MISMATCH};
const auto & ids = id_column->getData();
/// cache column pointers
const auto column_ptrs = ext::map<std::vector>(
ext::range(0, attributes.size()), [&block](size_t i) { return block.safeGetByPosition(i + 1).column.get(); });
for (const auto i : ext::range(0, ids.size()))
{
const auto id = ids[i];
const auto find_result = findCellIdx(id, now);
const auto & cell_idx = find_result.cell_idx;
auto & cell = cells[cell_idx];
for (const auto attribute_idx : ext::range(0, attributes.size()))
{
const auto & attribute_column = *column_ptrs[attribute_idx];
auto & attribute = attributes[attribute_idx];
setAttributeValue(attribute, cell_idx, attribute_column[i]);
}
/// if cell id is zero and zero does not map to this cell, then the cell is unused
if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed);
cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
{
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
}
else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
/// inform caller
on_cell_updated(id, cell_idx);
/// mark corresponding id as found
remaining_ids[id] = 1;
}
}
stream->readSuffix();
error_count = 0;
last_exception = std::exception_ptr{};
backoff_end_time = std::chrono::system_clock::time_point{};
ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
} }
catch (...)
{
++error_count;
last_exception = std::current_exception();
backoff_end_time = now + std::chrono::seconds(ExternalLoadableBackoff{}.calculateDuration(rnd_engine, error_count));
stream->readSuffix(); tryLogException(last_exception, log, "Could not update cache dictionary '" + getName() +
"', next update is scheduled at " + DateLUT::instance().timeToString(std::chrono::system_clock::to_time_t(backoff_end_time)));
ProfileEvents::increment(ProfileEvents::DictCacheKeysRequested, requested_ids.size()); }
ProfileEvents::increment(ProfileEvents::DictCacheRequestTimeNs, watch.elapsed());
} }
size_t not_found_num = 0, found_num = 0; size_t not_found_num = 0, found_num = 0;
const auto now = std::chrono::system_clock::now();
/// Check which ids have not been found and require setting null_value /// Check which ids have not been found and require setting null_value
for (const auto & id_found_pair : remaining_ids) for (const auto & id_found_pair : remaining_ids)
{ {
@ -328,24 +354,45 @@ void CacheDictionary::update(
const auto find_result = findCellIdx(id, now); const auto find_result = findCellIdx(id, now);
const auto & cell_idx = find_result.cell_idx; const auto & cell_idx = find_result.cell_idx;
auto & cell = cells[cell_idx]; auto & cell = cells[cell_idx];
/// Set null_value for each attribute if (error_count)
for (auto & attribute : attributes) {
setDefaultAttributeValue(attribute, cell_idx); if (find_result.outdated)
{
/// We have expired data for that `id` so we can continue using it.
bool was_default = cell.isDefault();
cell.setExpiresAt(backoff_end_time);
if (was_default)
cell.setDefault();
if (was_default)
on_id_not_found(id, cell_idx);
else
on_cell_updated(id, cell_idx);
continue;
}
/// We don't have expired data for that `id` so all we can do is to rethrow `last_exception`.
std::rethrow_exception(last_exception);
}
/// Check if cell had not been occupied before and increment element counter if it hadn't /// Check if cell had not been occupied before and increment element counter if it hadn't
if (cell.id == 0 && cell_idx != zero_cell_idx) if (cell.id == 0 && cell_idx != zero_cell_idx)
element_count.fetch_add(1, std::memory_order_relaxed); element_count.fetch_add(1, std::memory_order_relaxed);
cell.id = id; cell.id = id;
if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0) if (dict_lifetime.min_sec != 0 && dict_lifetime.max_sec != 0)
cell.setExpiresAt(std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}); {
std::uniform_int_distribution<UInt64> distribution{dict_lifetime.min_sec, dict_lifetime.max_sec};
cell.setExpiresAt(now + std::chrono::seconds{distribution(rnd_engine)});
}
else else
cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max()); cell.setExpiresAt(std::chrono::time_point<std::chrono::system_clock>::max());
/// Set null_value for each attribute
cell.setDefault(); cell.setDefault();
for (auto & attribute : attributes)
setDefaultAttributeValue(attribute, cell_idx);
/// inform caller that the cell has not been found /// inform caller that the cell has not been found
on_id_not_found(id, cell_idx); on_id_not_found(id, cell_idx);

View File

@ -56,6 +56,8 @@ struct IDictionaryBase : public IExternalLoadable
return source && source->isModified(); return source && source->isModified();
} }
virtual std::exception_ptr getLastException() const { return {}; }
std::shared_ptr<IDictionaryBase> shared_from_this() std::shared_ptr<IDictionaryBase> shared_from_this()
{ {
return std::static_pointer_cast<IDictionaryBase>(IExternalLoadable::shared_from_this()); return std::static_pointer_cast<IDictionaryBase>(IExternalLoadable::shared_from_this());

View File

@ -1,6 +1,5 @@
#include "ExternalLoader.h" #include "ExternalLoader.h"
#include <cmath>
#include <mutex> #include <mutex>
#include <pcg_random.hpp> #include <pcg_random.hpp>
#include <common/DateLUT.h> #include <common/DateLUT.h>
@ -933,6 +932,8 @@ private:
class ExternalLoader::PeriodicUpdater : private boost::noncopyable class ExternalLoader::PeriodicUpdater : private boost::noncopyable
{ {
public: public:
static constexpr UInt64 check_period_sec = 5;
PeriodicUpdater(ConfigFilesReader & config_files_reader_, LoadingDispatcher & loading_dispatcher_) PeriodicUpdater(ConfigFilesReader & config_files_reader_, LoadingDispatcher & loading_dispatcher_)
: config_files_reader(config_files_reader_), loading_dispatcher(loading_dispatcher_) : config_files_reader(config_files_reader_), loading_dispatcher(loading_dispatcher_)
{ {
@ -940,11 +941,10 @@ public:
~PeriodicUpdater() { enable(false); } ~PeriodicUpdater() { enable(false); }
void enable(bool enable_, const ExternalLoaderUpdateSettings & settings_ = {}) void enable(bool enable_)
{ {
std::unique_lock lock{mutex}; std::unique_lock lock{mutex};
enabled = enable_; enabled = enable_;
settings = settings_;
if (enable_) if (enable_)
{ {
@ -985,9 +985,7 @@ public:
return std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)}; return std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
} }
std::uniform_int_distribution<UInt64> distribution(0, static_cast<UInt64>(std::exp2(error_count - 1))); return std::chrono::system_clock::now() + std::chrono::seconds(ExternalLoadableBackoff{}.calculateDuration(rnd_engine, error_count));
std::chrono::seconds delay(std::min<UInt64>(settings.backoff_max_sec, settings.backoff_initial_sec + distribution(rnd_engine)));
return std::chrono::system_clock::now() + delay;
} }
private: private:
@ -996,9 +994,8 @@ private:
setThreadName("ExterLdrReload"); setThreadName("ExterLdrReload");
std::unique_lock lock{mutex}; std::unique_lock lock{mutex};
auto timeout = [this] { return std::chrono::seconds(settings.check_period_sec); };
auto pred = [this] { return !enabled; }; auto pred = [this] { return !enabled; };
while (!event.wait_for(lock, timeout(), pred)) while (!event.wait_for(lock, std::chrono::seconds(check_period_sec), pred))
{ {
lock.unlock(); lock.unlock();
loading_dispatcher.setConfiguration(config_files_reader.read()); loading_dispatcher.setConfiguration(config_files_reader.read());
@ -1012,7 +1009,6 @@ private:
mutable std::mutex mutex; mutable std::mutex mutex;
bool enabled = false; bool enabled = false;
ExternalLoaderUpdateSettings settings;
ThreadFromGlobalPool thread; ThreadFromGlobalPool thread;
std::condition_variable event; std::condition_variable event;
mutable pcg64 rnd_engine{randomSeed()}; mutable pcg64 rnd_engine{randomSeed()};
@ -1051,9 +1047,9 @@ void ExternalLoader::enableAsyncLoading(bool enable)
loading_dispatcher->enableAsyncLoading(enable); loading_dispatcher->enableAsyncLoading(enable);
} }
void ExternalLoader::enablePeriodicUpdates(bool enable_, const ExternalLoaderUpdateSettings & settings_) void ExternalLoader::enablePeriodicUpdates(bool enable_)
{ {
periodic_updater->enable(enable_, settings_); periodic_updater->enable(enable_);
} }
bool ExternalLoader::hasCurrentlyLoadedObjects() const bool ExternalLoader::hasCurrentlyLoadedObjects() const

View File

@ -11,19 +11,6 @@
namespace DB namespace DB
{ {
struct ExternalLoaderUpdateSettings
{
UInt64 check_period_sec = 5;
UInt64 backoff_initial_sec = 5;
/// 10 minutes
UInt64 backoff_max_sec = 10 * 60;
ExternalLoaderUpdateSettings() = default;
ExternalLoaderUpdateSettings(UInt64 check_period_sec_, UInt64 backoff_initial_sec_, UInt64 backoff_max_sec_)
: check_period_sec(check_period_sec_), backoff_initial_sec(backoff_initial_sec_), backoff_max_sec(backoff_max_sec_) {}
};
/* External configuration structure. /* External configuration structure.
* *
* <external_group> * <external_group>
@ -105,7 +92,7 @@ public:
void enableAsyncLoading(bool enable); void enableAsyncLoading(bool enable);
/// Sets settings for periodic updates. /// Sets settings for periodic updates.
void enablePeriodicUpdates(bool enable, const ExternalLoaderUpdateSettings & settings = {}); void enablePeriodicUpdates(bool enable);
/// Returns the status of the object. /// Returns the status of the object.
/// If the object has not been loaded yet then the function returns Status::NOT_LOADED. /// If the object has not been loaded yet then the function returns Status::NOT_LOADED.

View File

@ -1,7 +1,7 @@
#include <Interpreters/IExternalLoadable.h> #include <Interpreters/IExternalLoadable.h>
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <cmath>
namespace DB namespace DB
{ {
@ -16,4 +16,13 @@ ExternalLoadableLifetime::ExternalLoadableLifetime(const Poco::Util::AbstractCon
max_sec = has_min ? config.getUInt64(config_prefix + ".max") : min_sec; max_sec = has_min ? config.getUInt64(config_prefix + ".max") : min_sec;
} }
UInt64 ExternalLoadableBackoff::calculateDuration(pcg64 & rnd_engine, size_t error_count) const
{
if (error_count < 1)
error_count = 1;
std::uniform_int_distribution<UInt64> distribution(0, static_cast<UInt64>(std::exp2(error_count - 1)));
return std::min<UInt64>(backoff_max_sec, backoff_initial_sec + distribution(rnd_engine));
}
} }

View File

@ -3,6 +3,7 @@
#include <string> #include <string>
#include <memory> #include <memory>
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
#include <pcg_random.hpp>
#include <Core/Types.h> #include <Core/Types.h>
@ -25,6 +26,17 @@ struct ExternalLoadableLifetime
}; };
/// Delay before trying to load again after error.
struct ExternalLoadableBackoff
{
UInt64 backoff_initial_sec = 5;
UInt64 backoff_max_sec = 10 * 60; /// 10 minutes
/// Calculates time to try loading again after error.
UInt64 calculateDuration(pcg64 & rnd_engine, size_t error_count = 1) const;
};
/// Basic interface for external loadable objects. Is used in ExternalLoader. /// Basic interface for external loadable objects. Is used in ExternalLoader.
class IExternalLoadable : public std::enable_shared_from_this<IExternalLoadable>, private boost::noncopyable class IExternalLoadable : public std::enable_shared_from_this<IExternalLoadable>, private boost::noncopyable
{ {

View File

@ -50,10 +50,11 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con
res_columns[i++]->insert(static_cast<Int8>(load_result.status)); res_columns[i++]->insert(static_cast<Int8>(load_result.status));
res_columns[i++]->insert(load_result.origin); res_columns[i++]->insert(load_result.origin);
if (load_result.object) std::exception_ptr last_exception = load_result.exception;
{
const auto dict_ptr = std::static_pointer_cast<const IDictionaryBase>(load_result.object);
const auto dict_ptr = std::dynamic_pointer_cast<const IDictionaryBase>(load_result.object);
if (dict_ptr)
{
res_columns[i++]->insert(dict_ptr->getTypeName()); res_columns[i++]->insert(dict_ptr->getTypeName());
const auto & dict_struct = dict_ptr->getStructure(); const auto & dict_struct = dict_ptr->getStructure();
@ -66,6 +67,9 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con
res_columns[i++]->insert(dict_ptr->getElementCount()); res_columns[i++]->insert(dict_ptr->getElementCount());
res_columns[i++]->insert(dict_ptr->getLoadFactor()); res_columns[i++]->insert(dict_ptr->getLoadFactor());
res_columns[i++]->insert(dict_ptr->getSource()->toString()); res_columns[i++]->insert(dict_ptr->getSource()->toString());
if (!last_exception)
last_exception = dict_ptr->getLastException();
} }
else else
{ {
@ -76,8 +80,8 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con
res_columns[i++]->insert(static_cast<UInt64>(std::chrono::system_clock::to_time_t(load_result.loading_start_time))); res_columns[i++]->insert(static_cast<UInt64>(std::chrono::system_clock::to_time_t(load_result.loading_start_time)));
res_columns[i++]->insert(std::chrono::duration_cast<std::chrono::duration<float>>(load_result.loading_duration).count()); res_columns[i++]->insert(std::chrono::duration_cast<std::chrono::duration<float>>(load_result.loading_duration).count());
if (load_result.exception) if (last_exception)
res_columns[i++]->insert(getExceptionMessage(load_result.exception, false)); res_columns[i++]->insert(getExceptionMessage(last_exception, false));
else else
res_columns[i++]->insertDefault(); res_columns[i++]->insertDefault();
} }