Reloading of a dictionary now waits for results and shows errors.

Clearer names for load and reload functions in ExternalLoader.
This commit is contained in:
Vitaly Baranov 2019-12-12 21:33:43 +03:00
parent 9f27e05dc8
commit d0d5c72e4d
9 changed files with 363 additions and 245 deletions

View File

@ -39,7 +39,7 @@ Tables DatabaseDictionary::listTables(const Context & context, const FilterByNam
if (filter_by_name)
{
/// If `filter_by_name` is set, we iterate through all dictionaries with such names. That's why we need to load all of them.
context.getExternalDictionariesLoader().load(filter_by_name, load_results);
load_results = context.getExternalDictionariesLoader().tryLoad<ExternalLoader::LoadResults>(filter_by_name);
}
else
{
@ -47,12 +47,12 @@ Tables DatabaseDictionary::listTables(const Context & context, const FilterByNam
load_results = context.getExternalDictionariesLoader().getCurrentLoadResults();
}
for (const auto & [object_name, info]: load_results)
for (const auto & load_result: load_results)
{
/// Load tables only from XML dictionaries, don't touch other
if (info.object != nullptr && info.repository_name.empty())
if (load_result.object && load_result.repository_name.empty())
{
auto dict_ptr = std::static_pointer_cast<const IDictionaryBase>(info.object);
auto dict_ptr = std::static_pointer_cast<const IDictionaryBase>(load_result.object);
auto dict_name = dict_ptr->getName();
const DictionaryStructure & dictionary_structure = dict_ptr->getStructure();
auto columns = StorageDictionary::getNamesAndTypes(dictionary_structure);

View File

@ -326,9 +326,9 @@ void DatabaseOnDisk::createDictionary(
bool lazy_load = context.getConfigRef().getBool("dictionaries_lazy_load", true);
if (!lazy_load)
{
/// loadStrict() is called here to force loading the dictionary, wait until the loading is finished,
/// load() is called here to force loading the dictionary, wait until the loading is finished,
/// and throw an exception if the loading is failed.
external_loader.loadStrict(full_name);
external_loader.load(full_name);
}
database.attachDictionary(dictionary_name, context);

View File

@ -21,12 +21,12 @@ public:
DictPtr getDictionary(const std::string & name) const
{
return std::static_pointer_cast<const IDictionaryBase>(getLoadable(name));
return std::static_pointer_cast<const IDictionaryBase>(load(name));
}
DictPtr tryGetDictionary(const std::string & name) const
{
return std::static_pointer_cast<const IDictionaryBase>(tryGetLoadable(name));
return std::static_pointer_cast<const IDictionaryBase>(tryLoad(name));
}
void addConfigRepository(

View File

@ -12,6 +12,7 @@
#include <ext/scope_guard.h>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/algorithm/copy.hpp>
#include <unordered_set>
namespace DB
@ -22,21 +23,68 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
namespace
{
/// Lock mutex only in async mode
/// In other case does nothing
struct LoadingGuardForAsyncLoad
{
std::unique_lock<std::mutex> lock;
LoadingGuardForAsyncLoad(bool async, std::mutex & mutex)
template <typename ReturnType>
ReturnType convertTo(ExternalLoader::LoadResult result)
{
if (async)
lock = std::unique_lock(mutex);
if constexpr (std::is_same_v<ReturnType, ExternalLoader::LoadResult>)
return result;
else
{
static_assert(std::is_same_v<ReturnType, ExternalLoader::LoadablePtr>);
return std::move(result.object);
}
}
};
template <typename ReturnType>
ReturnType convertTo(ExternalLoader::LoadResults results)
{
if constexpr (std::is_same_v<ReturnType, ExternalLoader::LoadResults>)
return results;
else
{
static_assert(std::is_same_v<ReturnType, ExternalLoader::Loadables>);
ExternalLoader::Loadables objects;
objects.reserve(results.size());
for (const auto & result : results)
{
if (auto object = std::move(result.object))
objects.push_back(std::move(object));
}
return objects;
}
}
template <typename ReturnType>
ReturnType notExists(const String & name)
{
if constexpr (std::is_same_v<ReturnType, ExternalLoader::LoadResult>)
{
ExternalLoader::LoadResult res;
res.name = name;
return res;
}
else
{
static_assert(std::is_same_v<ReturnType, ExternalLoader::LoadablePtr>);
return nullptr;
}
}
/// Lock mutex only in async mode
/// In other case does nothing
struct LoadingGuardForAsyncLoad
{
std::unique_lock<std::mutex> lock;
LoadingGuardForAsyncLoad(bool async, std::mutex & mutex)
{
if (async)
lock = std::unique_lock(mutex);
}
};
}
struct ExternalLoader::ObjectConfig
@ -379,7 +427,7 @@ public:
/// Configuration has been changed.
info.config_changed = true;
if (info.wasLoading())
if (info.triedToLoad())
{
/// The object has been tried to load before, so it is currently in use or was in use
/// and we should try to reload it with the new config.
@ -395,7 +443,7 @@ public:
{
if (infos.find(name) == infos.end())
{
Info & info = infos.emplace(name, Info{config}).first->second;
Info & info = infos.emplace(name, Info{name, config}).first->second;
if (always_load_everything)
startLoading(name, info);
}
@ -424,7 +472,7 @@ public:
{
/// Start loading all the objects which were not loaded yet.
for (auto & [name, info] : infos)
if (!info.wasLoading())
if (!info.triedToLoad())
startLoading(name, info);
}
}
@ -448,35 +496,25 @@ public:
}
/// Returns the load result of the object.
LoadResult getCurrentLoadResult(const String & name) const
template <typename ReturnType>
ReturnType getCurrentLoadResult(const String & name) const
{
std::lock_guard lock{mutex};
const Info * info = getInfo(name);
if (!info)
return {Status::NOT_EXIST};
return info->loadResult();
return notExists<ReturnType>(name);
return info->getLoadResult<ReturnType>();
}
/// Returns all the load results as a map.
/// The function doesn't load anything, it just returns the current load results as is.
LoadResults getCurrentLoadResults(const FilterByNameFunction & filter_by_name) const
template <typename ReturnType>
ReturnType getCurrentLoadResults(const FilterByNameFunction & filter) const
{
std::lock_guard lock{mutex};
return collectLoadResults(filter_by_name);
return collectLoadResults<ReturnType>(filter);
}
LoadResults getCurrentLoadResults() const { return getCurrentLoadResults(allNames); }
/// Returns all the loaded objects as a map.
/// The function doesn't load anything, it just returns the current load results as is.
Loadables getCurrentlyLoadedObjects(const FilterByNameFunction & filter_by_name) const
{
std::lock_guard lock{mutex};
return collectLoadedObjects(filter_by_name);
}
Loadables getCurrentlyLoadedObjects() const { return getCurrentlyLoadedObjects(allNames); }
size_t getNumberOfCurrentlyLoadedObjects() const
{
std::lock_guard lock{mutex};
@ -499,91 +537,67 @@ public:
return false;
}
Strings getAllTriedToLoadNames() const
{
Strings names;
for (auto & [name, info] : infos)
if (info.triedToLoad())
names.push_back(name);
return names;
}
/// Tries to load a specified object during the timeout.
/// Returns nullptr if the loading is unsuccessful or if there is no such object.
void load(const String & name, LoadablePtr & loaded_object, Duration timeout = NO_TIMEOUT)
template <typename ReturnType>
ReturnType tryLoad(const String & name, Duration timeout)
{
std::unique_lock lock{mutex};
Info * info = loadImpl(name, timeout, lock);
loaded_object = (info ? info->object : nullptr);
}
/// Tries to finish loading of a specified object during the timeout.
/// Returns nullptr if the loading is unsuccessful or if there is no such object.
void loadStrict(const String & name, LoadablePtr & loaded_object)
{
std::unique_lock lock{mutex};
Info * info = loadImpl(name, NO_TIMEOUT, lock);
if (!info)
throw Exception("No such " + type_name + " '" + name + "'.", ErrorCodes::BAD_ARGUMENTS);
checkLoaded(name, *info);
loaded_object = info->object;
return notExists<ReturnType>(name);
return info->getLoadResult<ReturnType>();
}
/// Tries to start loading of the objects for which the specified functor returns true.
void load(const FilterByNameFunction & filter_by_name)
{
std::lock_guard lock{mutex};
for (auto & [name, info] : infos)
if (!info.wasLoading() && filter_by_name(name))
startLoading(name, info);
}
/// Tries to finish loading of the objects for which the specified function returns true.
void load(const FilterByNameFunction & filter_by_name, Loadables & loaded_objects, Duration timeout = NO_TIMEOUT)
template <typename ReturnType>
ReturnType tryLoad(const FilterByNameFunction & filter, Duration timeout)
{
std::unique_lock lock{mutex};
loadImpl(filter_by_name, timeout, lock);
loaded_objects = collectLoadedObjects(filter_by_name);
loadImpl(filter, timeout, lock);
return collectLoadResults<ReturnType>(filter);
}
/// Tries to finish loading of the objects for which the specified function returns true.
void load(const FilterByNameFunction & filter_by_name, LoadResults & loaded_results, Duration timeout = NO_TIMEOUT)
/// Tries to load or reload a specified object.
template <typename ReturnType>
ReturnType tryLoadOrReload(const String & name, Duration timeout)
{
std::unique_lock lock{mutex};
loadImpl(filter_by_name, timeout, lock);
loaded_results = collectLoadResults(filter_by_name);
}
/// Tries to finish loading of all the objects during the timeout.
void load(Loadables & loaded_objects, Duration timeout = NO_TIMEOUT) { load(allNames, loaded_objects, timeout); }
void load(LoadResults & loaded_results, Duration timeout = NO_TIMEOUT) { load(allNames, loaded_results, timeout); }
/// Starts reloading a specified object.
void reload(const String & name, bool load_never_loading = false)
{
std::lock_guard lock{mutex};
Info * info = getInfo(name);
if (!info)
{
return;
}
return notExists<ReturnType>(name);
cancelLoading(*info);
info->forced_to_reload = true;
if (info->wasLoading() || load_never_loading)
{
cancelLoading(*info);
info->forced_to_reload = true;
startLoading(name, *info);
}
info = loadImpl(name, timeout, lock);
if (!info)
return notExists<ReturnType>(name);
return info->getLoadResult<ReturnType>();
}
/// Starts reloading of the objects which `filter_by_name` returns true for.
void reload(const FilterByNameFunction & filter_by_name, bool load_never_loading = false)
template <typename ReturnType>
ReturnType tryLoadOrReload(const FilterByNameFunction & filter, Duration timeout)
{
std::lock_guard lock{mutex};
std::unique_lock lock{mutex};
for (auto & [name, info] : infos)
{
if ((info.wasLoading() || load_never_loading) && filter_by_name(name))
if (filter(name))
{
cancelLoading(info);
info.forced_to_reload = true;
startLoading(name, info);
}
}
}
/// Starts reloading of all the objects.
void reload(bool load_never_loading = false) { reload(allNames, load_never_loading); }
loadImpl(filter, timeout, lock);
return collectLoadResults<ReturnType>(filter);
}
/// Starts reloading all the object which update time is earlier than now.
/// The function doesn't touch the objects which were never tried to load.
@ -658,12 +672,12 @@ public:
private:
struct Info
{
Info(const ObjectConfig & object_config_) : object_config(object_config_) {}
Info(const String & name_, const ObjectConfig & object_config_) : name(name_), object_config(object_config_) {}
bool loaded() const { return object != nullptr; }
bool failed() const { return !object && exception; }
bool loading() const { return loading_id != 0; }
bool wasLoading() const { return loaded() || failed() || loading(); }
bool triedToLoad() const { return loaded() || failed() || loading(); }
bool ready() const { return (loaded() || failed()) && !forced_to_reload; }
bool failedToReload() const { return loaded() && exception != nullptr; }
@ -684,27 +698,37 @@ private:
return std::chrono::duration_cast<Duration>(loading_end_time - loading_start_time);
}
LoadResult loadResult() const
template <typename ReturnType>
ReturnType getLoadResult() const
{
LoadResult result{status()};
result.object = object;
result.exception = exception;
result.loading_start_time = loading_start_time;
result.loading_duration = loadingDuration();
result.origin = object_config.path;
result.repository_name = object_config.repository_name;
return result;
if constexpr (std::is_same_v<ReturnType, LoadResult>)
{
LoadResult result;
result.name = name;
result.status = status();
result.object = object;
result.exception = exception;
result.loading_start_time = loading_start_time;
result.loading_duration = loadingDuration();
result.origin = object_config.path;
result.repository_name = object_config.repository_name;
return result;
}
else
{
static_assert(std::is_same_v<ReturnType, ExternalLoader::LoadablePtr>);
return object;
}
}
ObjectConfig object_config;
String name;
LoadablePtr object;
ObjectConfig object_config;
TimePoint loading_start_time;
TimePoint loading_end_time;
size_t loading_id = 0; /// Non-zero if it's loading right now.
size_t error_count = 0; /// Numbers of errors since last successful loading.
std::exception_ptr exception; /// Last error occurred.
bool config_changed = false; /// Whether the config has been change since last successful loading.
bool forced_to_reload = false; /// Whether the current reloading is forced, i.e. caused by user's direction. For periodic reloading and reloading due to a config's change `forced_to_reload == false`.
TimePoint next_update_time = TimePoint::max(); /// Time of the next update, `TimePoint::max()` means "never".
};
@ -724,26 +748,25 @@ private:
return &it->second;
}
Loadables collectLoadedObjects(const FilterByNameFunction & filter_by_name) const
template <typename ReturnType>
ReturnType collectLoadResults(const FilterByNameFunction & filter) const
{
Loadables objects;
objects.reserve(infos.size());
for (const auto & [name, info] : infos)
if (info.loaded() && filter_by_name(name))
objects.emplace_back(info.object);
return objects;
}
LoadResults collectLoadResults(const FilterByNameFunction & filter_by_name) const
{
LoadResults load_results;
load_results.reserve(infos.size());
ReturnType results;
results.reserve(infos.size());
for (const auto & [name, info] : infos)
{
if (filter_by_name(name))
load_results.emplace_back(name, info.loadResult());
if (filter(name))
{
auto result = info.template getLoadResult<typename ReturnType::value_type>();
if constexpr (std::is_same_v<typename ReturnType::value_type, LoadablePtr>)
{
if (!result)
continue;
}
results.emplace_back(std::move(result));
}
}
return load_results;
return results;
}
Info * loadImpl(const String & name, Duration timeout, std::unique_lock<std::mutex> & lock)
@ -759,7 +782,7 @@ private:
return info->ready();
};
if (timeout == NO_TIMEOUT)
if (timeout == WAIT)
event.wait(lock, pred);
else
event.wait_for(lock, timeout, pred);
@ -767,14 +790,14 @@ private:
return info;
}
void loadImpl(const FilterByNameFunction & filter_by_name, Duration timeout, std::unique_lock<std::mutex> & lock)
void loadImpl(const FilterByNameFunction & filter, Duration timeout, std::unique_lock<std::mutex> & lock)
{
auto pred = [&]()
{
bool all_ready = true;
for (auto & [name, info] : infos)
{
if (info.ready() || !filter_by_name(name))
if (info.ready() || !filter(name))
continue;
if (!info.loading())
startLoading(name, info);
@ -784,7 +807,7 @@ private:
return all_ready;
};
if (timeout == NO_TIMEOUT)
if (timeout == WAIT)
event.wait(lock, pred);
else
event.wait_for(lock, timeout, pred);
@ -946,12 +969,14 @@ private:
/// Use `create_function` to perform the actual loading.
/// It's much better to do it with `mutex` unlocked because the loading can take a lot of time
/// and require access to other objects.
bool need_complete_loading = !info->object || info->config_changed || info->forced_to_reload;
auto [new_object, new_exception] = loadOneObject(name, info->object_config, need_complete_loading ? nullptr : info->object);
auto previous_version_to_use = info->object;
bool need_complete_reloading = !info->object || info->config_changed || info->forced_to_reload;
if (need_complete_reloading)
previous_version_to_use = nullptr; /// Need complete reloading, cannot use the previous version.
auto [new_object, new_exception] = loadOneObject(name, info->object_config, previous_version_to_use);
if (!new_object && !new_exception)
throw Exception("No object created and no exception raised for " + type_name, ErrorCodes::LOGICAL_ERROR);
processLoadResult(name, loading_id, info->object, new_object, new_exception, info->error_count, async);
event.notify_all();
}
@ -981,19 +1006,6 @@ private:
info.loading_end_time = std::chrono::system_clock::now();
}
void checkLoaded(const String & name, const Info & info)
{
if (info.loaded())
return;
if (info.loading())
throw Exception(type_name + " '" + name + "' is still loading.", ErrorCodes::BAD_ARGUMENTS);
if (info.failed())
std::rethrow_exception(info.exception);
}
/// Filter by name which matches everything.
static bool allNames(const String &) { return true; }
/// Calculate next update time for loaded_object. Can be called without mutex locking,
/// because single loadable can be loaded in single thread only.
TimePoint calculateNextUpdateTime(const LoadablePtr & loaded_object, size_t error_count) const
@ -1101,14 +1113,15 @@ private:
};
ExternalLoader::ExternalLoader(const String & type_name_, Logger * log)
: config_files_reader(std::make_unique<LoadablesConfigReader>(type_name_, log))
ExternalLoader::ExternalLoader(const String & type_name_, Logger * log_)
: config_files_reader(std::make_unique<LoadablesConfigReader>(type_name_, log_))
, loading_dispatcher(std::make_unique<LoadingDispatcher>(
std::bind(&ExternalLoader::createObject, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3),
type_name_,
log))
log_))
, periodic_updater(std::make_unique<PeriodicUpdater>(*config_files_reader, *loading_dispatcher))
, type_name(type_name_)
, log(log_)
{
}
@ -1155,29 +1168,26 @@ ExternalLoader::Status ExternalLoader::getCurrentStatus(const String & name) con
return loading_dispatcher->getCurrentStatus(name);
}
ExternalLoader::LoadResult ExternalLoader::getCurrentLoadResult(const String & name) const
template <typename ReturnType, typename>
ReturnType ExternalLoader::getCurrentLoadResult(const String & name) const
{
return loading_dispatcher->getCurrentLoadResult(name);
return loading_dispatcher->getCurrentLoadResult<ReturnType>(name);
}
ExternalLoader::LoadResults ExternalLoader::getCurrentLoadResults() const
template <typename ReturnType, typename>
ReturnType ExternalLoader::getCurrentLoadResults(const FilterByNameFunction & filter) const
{
return loading_dispatcher->getCurrentLoadResults();
}
ExternalLoader::LoadResults ExternalLoader::getCurrentLoadResults(const FilterByNameFunction & filter_by_name) const
{
return loading_dispatcher->getCurrentLoadResults(filter_by_name);
return loading_dispatcher->getCurrentLoadResults<ReturnType>(filter);
}
ExternalLoader::Loadables ExternalLoader::getCurrentlyLoadedObjects() const
{
return loading_dispatcher->getCurrentlyLoadedObjects();
return getCurrentLoadResults<Loadables>();
}
ExternalLoader::Loadables ExternalLoader::getCurrentlyLoadedObjects(const FilterByNameFunction & filter_by_name) const
ExternalLoader::Loadables ExternalLoader::getCurrentlyLoadedObjects(const FilterByNameFunction & filter) const
{
return loading_dispatcher->getCurrentlyLoadedObjects(filter_by_name);
return getCurrentLoadResults<Loadables>(filter);
}
size_t ExternalLoader::getNumberOfCurrentlyLoadedObjects() const
@ -1185,56 +1195,104 @@ size_t ExternalLoader::getNumberOfCurrentlyLoadedObjects() const
return loading_dispatcher->getNumberOfCurrentlyLoadedObjects();
}
void ExternalLoader::load(const String & name, LoadablePtr & loaded_object, Duration timeout) const
template <typename ReturnType, typename>
ReturnType ExternalLoader::tryLoad(const String & name, Duration timeout) const
{
loading_dispatcher->load(name, loaded_object, timeout);
return loading_dispatcher->tryLoad<ReturnType>(name, timeout);
}
void ExternalLoader::loadStrict(const String & name, LoadablePtr & loaded_object) const
template <typename ReturnType, typename>
ReturnType ExternalLoader::tryLoad(const FilterByNameFunction & filter, Duration timeout) const
{
loading_dispatcher->loadStrict(name, loaded_object);
return loading_dispatcher->tryLoad<ReturnType>(filter, timeout);
}
void ExternalLoader::load(const FilterByNameFunction & filter_by_name, Loadables & loaded_objects, Duration timeout) const
template <typename ReturnType, typename>
ReturnType ExternalLoader::load(const String & name) const
{
if (filter_by_name)
loading_dispatcher->load(filter_by_name, loaded_objects, timeout);
else
loading_dispatcher->load(loaded_objects, timeout);
auto result = tryLoad<LoadResult>(name);
checkLoaded(result, false);
return convertTo<ReturnType>(result);
}
template <typename ReturnType, typename>
ReturnType ExternalLoader::load(const FilterByNameFunction & filter) const
{
auto results = tryLoad<LoadResults>(filter);
checkLoaded(results, false);
return convertTo<ReturnType>(results);
}
template <typename ReturnType, typename>
ReturnType ExternalLoader::loadOrReload(const String & name) const
{
loading_dispatcher->setConfiguration(config_files_reader->read());
auto result = loading_dispatcher->tryLoadOrReload<LoadResult>(name, WAIT);
checkLoaded(result, true);
return convertTo<ReturnType>(result);
}
template <typename ReturnType, typename>
ReturnType ExternalLoader::loadOrReload(const FilterByNameFunction & filter) const
{
loading_dispatcher->setConfiguration(config_files_reader->read());
auto results = loading_dispatcher->tryLoadOrReload<LoadResults>(filter, WAIT);
checkLoaded(results, true);
return convertTo<ReturnType>(results);
}
template <typename ReturnType, typename>
ReturnType ExternalLoader::reloadAllTriedToLoad() const
{
std::unordered_set<String> names;
boost::range::copy(getAllTriedToLoadNames(), std::inserter(names, names.end()));
return loadOrReload<ReturnType>([&names](const String & name) { return names.count(name); });
}
Strings ExternalLoader::getAllTriedToLoadNames() const
{
return loading_dispatcher->getAllTriedToLoadNames();
}
void ExternalLoader::load(const FilterByNameFunction & filter_by_name, LoadResults & loaded_objects, Duration timeout) const
void ExternalLoader::checkLoaded(const ExternalLoader::LoadResult & result,
bool check_no_errors) const
{
if (filter_by_name)
loading_dispatcher->load(filter_by_name, loaded_objects, timeout);
else
loading_dispatcher->load(loaded_objects, timeout);
if (result.object && (!check_no_errors || !result.exception))
return;
if (result.status == ExternalLoader::Status::LOADING)
throw Exception(type_name + " '" + result.name + "' is still loading", ErrorCodes::BAD_ARGUMENTS);
if (result.exception)
std::rethrow_exceptiozn(result.exception);
if (result.status == ExternalLoader::Status::NOT_EXIST)
throw Exception(type_name + " '" + result.name + "' not found", ErrorCodes::BAD_ARGUMENTS);
if (result.status == ExternalLoader::Status::NOT_LOADED)
throw Exception(type_name + " '" + result.name + "' not tried to load", ErrorCodes::BAD_ARGUMENTS);
}
void ExternalLoader::load(Loadables & loaded_objects, Duration timeout) const
void ExternalLoader::checkLoaded(const ExternalLoader::LoadResults & results,
bool check_no_errors) const
{
return loading_dispatcher->load(loaded_objects, timeout);
std::exception_ptr exception;
for (const auto & result : results)
{
try
{
checkLoaded(result, check_no_errors);
}
catch (...)
{
if (!exception)
exception = std::current_exception();
else
tryLogCurrentException(log);
}
}
if (exception)
std::rethrow_exception(exception);
}
void ExternalLoader::reload(const String & name, bool load_never_loading) const
{
reloadConfig();
loading_dispatcher->reload(name, load_never_loading);
}
void ExternalLoader::reload(bool load_never_loading) const
{
reloadConfig();
loading_dispatcher->reload(load_never_loading);
}
void ExternalLoader::reload(const FilterByNameFunction & filter_by_name, bool load_never_loading) const
{
reloadConfig();
loading_dispatcher->reload(filter_by_name, load_never_loading);
}
void ExternalLoader::reloadConfig() const
{
@ -1296,4 +1354,27 @@ std::ostream & operator<<(std::ostream & out, ExternalLoader::Status status)
return out << toString(status);
}
template ExternalLoader::LoadablePtr ExternalLoader::getCurrentLoadResult<ExternalLoader::LoadablePtr>(const String &) const;
template ExternalLoader::LoadResult ExternalLoader::getCurrentLoadResult<ExternalLoader::LoadResult>(const String &) const;
template ExternalLoader::Loadables ExternalLoader::getCurrentLoadResults<ExternalLoader::Loadables>(const FilterByNameFunction &) const;
template ExternalLoader::LoadResults ExternalLoader::getCurrentLoadResults<ExternalLoader::LoadResults>(const FilterByNameFunction &) const;
template ExternalLoader::LoadablePtr ExternalLoader::tryLoad<ExternalLoader::LoadablePtr>(const String &, Duration) const;
template ExternalLoader::LoadResult ExternalLoader::tryLoad<ExternalLoader::LoadResult>(const String &, Duration) const;
template ExternalLoader::Loadables ExternalLoader::tryLoad<ExternalLoader::Loadables>(const FilterByNameFunction &, Duration) const;
template ExternalLoader::LoadResults ExternalLoader::tryLoad<ExternalLoader::LoadResults>(const FilterByNameFunction &, Duration) const;
template ExternalLoader::LoadablePtr ExternalLoader::load<ExternalLoader::LoadablePtr>(const String &) const;
template ExternalLoader::LoadResult ExternalLoader::load<ExternalLoader::LoadResult>(const String &) const;
template ExternalLoader::Loadables ExternalLoader::load<ExternalLoader::Loadables>(const FilterByNameFunction &) const;
template ExternalLoader::LoadResults ExternalLoader::load<ExternalLoader::LoadResults>(const FilterByNameFunction &) const;
template ExternalLoader::LoadablePtr ExternalLoader::loadOrReload<ExternalLoader::LoadablePtr>(const String &) const;
template ExternalLoader::LoadResult ExternalLoader::loadOrReload<ExternalLoader::LoadResult>(const String &) const;
template ExternalLoader::Loadables ExternalLoader::loadOrReload<ExternalLoader::Loadables>(const FilterByNameFunction &) const;
template ExternalLoader::LoadResults ExternalLoader::loadOrReload<ExternalLoader::LoadResults>(const FilterByNameFunction &) const;
template ExternalLoader::Loadables ExternalLoader::reloadAllTriedToLoad<ExternalLoader::Loadables>() const;
template ExternalLoader::LoadResults ExternalLoader::reloadAllTriedToLoad<ExternalLoader::LoadResults>() const;
}

View File

@ -65,8 +65,8 @@ public:
struct LoadResult
{
LoadResult(Status status_) : status(status_) {}
Status status;
Status status = Status::NOT_EXIST;
String name;
LoadablePtr object;
String origin;
TimePoint loading_start_time;
@ -75,7 +75,13 @@ public:
std::string repository_name;
};
using LoadResults = std::vector<std::pair<String, LoadResult>>;
using LoadResults = std::vector<LoadResult>;
template <typename T>
static constexpr bool is_scalar_load_result_type = std::is_same_v<T, LoadResult> || std::is_same_v<T, LoadablePtr>;
template <typename T>
static constexpr bool is_vector_load_result_type = std::is_same_v<T, LoadResults> || std::is_same_v<T, Loadables>;
ExternalLoader(const String & type_name_, Logger * log);
virtual ~ExternalLoader();
@ -105,63 +111,88 @@ public:
/// Returns the result of loading the object.
/// The function doesn't load anything, it just returns the current load result as is.
LoadResult getCurrentLoadResult(const String & name) const;
template <typename ReturnType = LoadResult, typename = std::enable_if_t<is_scalar_load_result_type<ReturnType>, void>>
ReturnType getCurrentLoadResult(const String & name) const;
using FilterByNameFunction = std::function<bool(const String &)>;
/// Returns all the load results as a map.
/// The function doesn't load anything, it just returns the current load results as is.
LoadResults getCurrentLoadResults() const;
LoadResults getCurrentLoadResults(const FilterByNameFunction & filter_by_name) const;
template <typename ReturnType = LoadResults, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
ReturnType getCurrentLoadResults() const { return getCurrentLoadResults<ReturnType>(alwaysTrue); }
template <typename ReturnType = LoadResults, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
ReturnType getCurrentLoadResults(const FilterByNameFunction & filter) const;
/// Returns all loaded objects as a map.
/// The function doesn't load anything, it just returns the current load results as is.
Loadables getCurrentlyLoadedObjects() const;
Loadables getCurrentlyLoadedObjects(const FilterByNameFunction & filter_by_name) const;
size_t getNumberOfCurrentlyLoadedObjects() const;
Loadables getCurrentlyLoadedObjects(const FilterByNameFunction & filter) const;
/// Returns true if any object was loaded.
bool hasCurrentlyLoadedObjects() const;
size_t getNumberOfCurrentlyLoadedObjects() const;
static constexpr Duration NO_TIMEOUT = Duration::max();
static constexpr Duration NO_WAIT = Duration::zero();
static constexpr Duration WAIT = Duration::max();
/// Tries to finish loading of a specified object during the timeout.
/// Returns nullptr if the loading is unsuccessful or if there is no such object.
void load(const String & name, LoadablePtr & loaded_object, Duration timeout = NO_TIMEOUT) const;
void load(const String & name) const { LoadablePtr object; load(name, object, Duration::zero()); }
LoadablePtr loadAndGet(const String & name, Duration timeout = NO_TIMEOUT) const { LoadablePtr object; load(name, object, timeout); return object; }
LoadablePtr tryGetLoadable(const String & name) const { return loadAndGet(name); }
/// Loads a specified object.
/// The function does nothing if it's already loaded.
/// The function doesn't throw an exception if it's failed to load.
template <typename ReturnType = LoadablePtr, typename = std::enable_if_t<is_scalar_load_result_type<ReturnType>, void>>
ReturnType tryLoad(const String & name, Duration timeout = WAIT) const;
/// Tries to finish loading of a specified object during the timeout.
/// Throws an exception if the loading is unsuccessful or if there is no such object.
void loadStrict(const String & name, LoadablePtr & loaded_object) const;
void loadStrict(const String & name) const { LoadablePtr object; loadStrict(name, object); }
LoadablePtr getLoadable(const String & name) const { LoadablePtr object; loadStrict(name, object); return object; }
/// Loads objects by filter.
/// The function does nothing for already loaded objects, it just returns them.
/// The function doesn't throw an exception if it's failed to load something.
template <typename ReturnType = Loadables, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
ReturnType tryLoad(const FilterByNameFunction & filter, Duration timeout = WAIT) const;
/// Tries to finish loading of the objects for which the specified function returns true.
void load(const FilterByNameFunction & filter_by_name) const { Loadables objects; load(filter_by_name, objects, Duration::zero()); }
void load(const FilterByNameFunction & filter_by_name, Loadables & loaded_objects, Duration timeout = NO_TIMEOUT) const;
void load(const FilterByNameFunction & filter_by_name, LoadResults & load_results, Duration timeout = NO_TIMEOUT) const;
Loadables loadAndGet(const FilterByNameFunction & filter_by_name, Duration timeout = NO_TIMEOUT) const { Loadables loaded_objects; load(filter_by_name, loaded_objects, timeout); return loaded_objects; }
/// Loads all objects.
/// The function does nothing for already loaded objects, it just returns them.
/// The function doesn't throw an exception if it's failed to load something.
template <typename ReturnType = Loadables, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
ReturnType tryLoadAll(Duration timeout = WAIT) const { return tryLoad<ReturnType>(alwaysTrue, timeout); }
/// Tries to finish loading of all the objects during the timeout.
void load(Loadables & loaded_objects, Duration timeout = NO_TIMEOUT) const;
/// Loads a specified object.
/// The function does nothing if it's already loaded.
/// The function throws an exception if it's failed to load.
template <typename ReturnType = LoadablePtr, typename = std::enable_if_t<is_scalar_load_result_type<ReturnType>, void>>
ReturnType load(const String & name) const;
/// Starts reloading of a specified object.
/// `load_never_loading` specifies what to do if the object has never been loading before.
/// The function can either skip it (false) or load for the first time (true).
/// Also function can load dictionary synchronously
void reload(const String & name, bool load_never_loading = false) const;
/// Loads objects by filter.
/// The function does nothing for already loaded objects, it just returns them.
/// The function throws an exception if it's failed to load something.
template <typename ReturnType = Loadables, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
ReturnType load(const FilterByNameFunction & filter) const;
/// Starts reloading of all the objects.
/// `load_never_loading` specifies what to do with the objects which have never been loading before.
/// The function can either skip them (false) or load for the first time (true).
void reload(bool load_never_loading = false) const;
/// Loads all objects. Not recommended to use.
/// The function does nothing for already loaded objects, it just returns them.
/// The function throws an exception if it's failed to load something.
template <typename ReturnType = Loadables, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
ReturnType loadAll() const { return load<ReturnType>(alwaysTrue); }
/// Starts reloading of all objects matched `filter_by_name`.
/// `load_never_loading` specifies what to do with the objects which have never been loading before.
/// The function can either skip them (false) or load for the first time (true).
void reload(const FilterByNameFunction & filter_by_name, bool load_never_loading = false) const;
/// Loads or reloads a specified object.
/// The function reloads the object if it's already loaded.
/// The function throws an exception if it's failed to load or reload.
template <typename ReturnType = LoadablePtr, typename = std::enable_if_t<is_scalar_load_result_type<ReturnType>, void>>
ReturnType loadOrReload(const String & name) const;
/// Loads or reloads objects by filter.
/// The function reloads the objects which are already loaded.
/// The function throws an exception if it's failed to load or reload something.
template <typename ReturnType = Loadables, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
ReturnType loadOrReload(const FilterByNameFunction & filter) const;
/// Load or reloads all objects. Not recommended to use.
/// The function throws an exception if it's failed to load or reload something.
template <typename ReturnType = Loadables, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
ReturnType loadOrReloadAll() const { return loadOrReload<ReturnType>(alwaysTrue); }
/// Reloads objects by filter which were tried to load before (successfully or not).
/// The function throws an exception if it's failed to load or reload something.
template <typename ReturnType = Loadables, typename = std::enable_if_t<is_vector_load_result_type<ReturnType>, void>>
ReturnType reloadAllTriedToLoad() const;
/// Reloads all config repositories.
void reloadConfig() const;
@ -176,8 +207,13 @@ protected:
virtual LoadablePtr create(const String & name, const Poco::Util::AbstractConfiguration & config, const String & key_in_config) const = 0;
private:
struct ObjectConfig;
void checkLoaded(const LoadResult & result, bool check_no_errors) const;
void checkLoaded(const LoadResults & results, bool check_no_errors) const;
static bool alwaysTrue(const String &) { return true; }
Strings getAllTriedToLoadNames() const;
struct ObjectConfig;
LoadablePtr createObject(const String & name, const ObjectConfig & config, const LoadablePtr & previous_version) const;
class LoadablesConfigReader;
@ -190,6 +226,7 @@ private:
std::unique_ptr<PeriodicUpdater> periodic_updater;
const String type_name;
Poco::Logger * log;
};
String toString(ExternalLoader::Status status);

View File

@ -22,7 +22,7 @@ public:
ModelPtr getModel(const std::string & name) const
{
return std::static_pointer_cast<const IModel>(getLoadable(name));
return std::static_pointer_cast<const IModel>(load(name));
}
void addConfigRepository(const String & name,

View File

@ -167,11 +167,11 @@ BlockIO InterpreterSystemQuery::execute()
break;
#endif
case Type::RELOAD_DICTIONARY:
system_context.getExternalDictionariesLoader().reload(query.target_dictionary, true /* load the dictionary even if it wasn't loading before */);
system_context.getExternalDictionariesLoader().loadOrReload(query.target_dictionary);
break;
case Type::RELOAD_DICTIONARIES:
executeCommandsAndThrowIfError(
[&] () { system_context.getExternalDictionariesLoader().reload(); },
[&] () { system_context.getExternalDictionariesLoader().reloadAllTriedToLoad(); },
[&] () { system_context.getEmbeddedDictionaries().reload(); }
);
break;

View File

@ -48,19 +48,19 @@ NamesAndTypesList StorageSystemDictionaries::getNamesAndTypes()
void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Context & context, const SelectQueryInfo & /*query_info*/) const
{
const auto & external_dictionaries = context.getExternalDictionariesLoader();
for (const auto & [dict_name, load_result] : external_dictionaries.getCurrentLoadResults())
for (const auto & load_result : external_dictionaries.getCurrentLoadResults())
{
if (startsWith(load_result.repository_name, IExternalLoaderConfigRepository::INTERNAL_REPOSITORY_NAME_PREFIX))
continue;
size_t i = 0;
String database;
String short_name = dict_name;
String short_name = load_result.name;
if (!load_result.repository_name.empty() && startsWith(dict_name, load_result.repository_name + "."))
if (!load_result.repository_name.empty() && startsWith(load_result.name, load_result.repository_name + "."))
{
database = load_result.repository_name;
short_name = dict_name.substr(load_result.repository_name.length() + 1);
short_name = load_result.name.substr(load_result.repository_name.length() + 1);
}
res_columns[i++]->insert(database);

View File

@ -30,9 +30,9 @@ void StorageSystemModels::fillData(MutableColumns & res_columns, const Context &
const auto & external_models_loader = context.getExternalModelsLoader();
auto load_results = external_models_loader.getCurrentLoadResults();
for (const auto & [model_name, load_result] : load_results)
for (const auto & load_result : load_results)
{
res_columns[0]->insert(model_name);
res_columns[0]->insert(load_result.name);
res_columns[1]->insert(static_cast<Int8>(load_result.status));
res_columns[2]->insert(load_result.origin);