mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Merge pull request #4962 from vitlibar/resolve-dictionary-depends-on-dictionary
Resolve correctly when dictionary depends on dictionary
This commit is contained in:
commit
f0c7e56603
@ -65,21 +65,12 @@ StoragePtr DatabaseDictionary::tryGetTable(
|
||||
const Context & context,
|
||||
const String & table_name) const
|
||||
{
|
||||
auto objects_map = context.getExternalDictionaries().getObjectsMap();
|
||||
const auto & dictionaries = objects_map.get();
|
||||
|
||||
auto dict_ptr = context.getExternalDictionaries().tryGetDictionary(table_name);
|
||||
if (dict_ptr)
|
||||
{
|
||||
auto it = dictionaries.find(table_name);
|
||||
if (it != dictionaries.end())
|
||||
{
|
||||
const auto & dict_ptr = std::static_pointer_cast<IDictionaryBase>(it->second.loadable);
|
||||
if (dict_ptr)
|
||||
{
|
||||
const DictionaryStructure & dictionary_structure = dict_ptr->getStructure();
|
||||
auto columns = StorageDictionary::getNamesAndTypes(dictionary_structure);
|
||||
return StorageDictionary::create(table_name, ColumnsDescription{columns}, context, true, table_name);
|
||||
}
|
||||
}
|
||||
const DictionaryStructure & dictionary_structure = dict_ptr->getStructure();
|
||||
auto columns = StorageDictionary::getNamesAndTypes(dictionary_structure);
|
||||
return StorageDictionary::create(table_name, ColumnsDescription{columns}, context, true, table_name);
|
||||
}
|
||||
|
||||
return {};
|
||||
|
@ -104,7 +104,7 @@ struct ContextShared
|
||||
mutable std::recursive_mutex mutex;
|
||||
/// Separate mutex for access of dictionaries. Separate mutex to avoid locks when server doing request to itself.
|
||||
mutable std::mutex embedded_dictionaries_mutex;
|
||||
mutable std::mutex external_dictionaries_mutex;
|
||||
mutable std::recursive_mutex external_dictionaries_mutex;
|
||||
mutable std::mutex external_models_mutex;
|
||||
/// Separate mutex for re-initialization of zookeer session. This operation could take a long time and must not interfere with another operations.
|
||||
mutable std::mutex zookeeper_mutex;
|
||||
@ -1240,44 +1240,38 @@ EmbeddedDictionaries & Context::getEmbeddedDictionariesImpl(const bool throw_on_
|
||||
|
||||
ExternalDictionaries & Context::getExternalDictionariesImpl(const bool throw_on_error) const
|
||||
{
|
||||
{
|
||||
std::lock_guard lock(shared->external_dictionaries_mutex);
|
||||
if (shared->external_dictionaries)
|
||||
return *shared->external_dictionaries;
|
||||
}
|
||||
|
||||
const auto & config = getConfigRef();
|
||||
|
||||
std::lock_guard lock(shared->external_dictionaries_mutex);
|
||||
|
||||
if (!shared->external_dictionaries)
|
||||
{
|
||||
if (!this->global_context)
|
||||
throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
auto config_repository = shared->runtime_components_factory->createExternalDictionariesConfigRepository();
|
||||
|
||||
shared->external_dictionaries.emplace(
|
||||
std::move(config_repository),
|
||||
config,
|
||||
*this->global_context,
|
||||
throw_on_error);
|
||||
shared->external_dictionaries.emplace(std::move(config_repository), config, *this->global_context);
|
||||
shared->external_dictionaries->init(throw_on_error);
|
||||
}
|
||||
|
||||
return *shared->external_dictionaries;
|
||||
}
|
||||
|
||||
ExternalModels & Context::getExternalModelsImpl(bool throw_on_error) const
|
||||
{
|
||||
std::lock_guard lock(shared->external_models_mutex);
|
||||
|
||||
if (!shared->external_models)
|
||||
{
|
||||
if (!this->global_context)
|
||||
throw Exception("Logical error: there is no global context", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
auto config_repository = shared->runtime_components_factory->createExternalModelsConfigRepository();
|
||||
|
||||
shared->external_models.emplace(
|
||||
std::move(config_repository),
|
||||
*this->global_context,
|
||||
throw_on_error);
|
||||
shared->external_models.emplace(std::move(config_repository), *this->global_context);
|
||||
shared->external_models->init(throw_on_error);
|
||||
}
|
||||
|
||||
return *shared->external_models;
|
||||
}
|
||||
|
||||
|
@ -30,8 +30,7 @@ namespace
|
||||
ExternalDictionaries::ExternalDictionaries(
|
||||
std::unique_ptr<IExternalLoaderConfigRepository> config_repository,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
Context & context,
|
||||
bool throw_on_error)
|
||||
Context & context)
|
||||
: ExternalLoader(config,
|
||||
externalDictionariesUpdateSettings,
|
||||
getExternalDictionariesConfigSettings(),
|
||||
@ -40,11 +39,11 @@ ExternalDictionaries::ExternalDictionaries(
|
||||
"external dictionary"),
|
||||
context(context)
|
||||
{
|
||||
init(throw_on_error);
|
||||
}
|
||||
|
||||
|
||||
std::unique_ptr<IExternalLoadable> ExternalDictionaries::create(
|
||||
const std::string & name, const Configuration & config, const std::string & config_prefix)
|
||||
const std::string & name, const Configuration & config, const std::string & config_prefix) const
|
||||
{
|
||||
return DictionaryFactory::instance().create(name, config, config_prefix, context);
|
||||
}
|
||||
|
@ -21,8 +21,7 @@ public:
|
||||
ExternalDictionaries(
|
||||
std::unique_ptr<IExternalLoaderConfigRepository> config_repository,
|
||||
const Poco::Util::AbstractConfiguration & config,
|
||||
Context & context,
|
||||
bool throw_on_error);
|
||||
Context & context);
|
||||
|
||||
/// Forcibly reloads specified dictionary.
|
||||
void reloadDictionary(const std::string & name) { reload(name); }
|
||||
@ -40,7 +39,7 @@ public:
|
||||
protected:
|
||||
|
||||
std::unique_ptr<IExternalLoadable> create(const std::string & name, const Configuration & config,
|
||||
const std::string & config_prefix) override;
|
||||
const std::string & config_prefix) const override;
|
||||
|
||||
using ExternalLoader::getObjectsMap;
|
||||
|
||||
|
@ -57,12 +57,16 @@ ExternalLoader::ExternalLoader(const Poco::Util::AbstractConfiguration & config_
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
void ExternalLoader::init(bool throw_on_error)
|
||||
{
|
||||
if (is_initialized)
|
||||
return;
|
||||
std::call_once(is_initialized_flag, &ExternalLoader::initImpl, this, throw_on_error);
|
||||
}
|
||||
|
||||
is_initialized = true;
|
||||
|
||||
void ExternalLoader::initImpl(bool throw_on_error)
|
||||
{
|
||||
std::lock_guard all_lock(all_mutex);
|
||||
|
||||
{
|
||||
/// During synchronous loading of external dictionaries at moment of query execution,
|
||||
@ -87,13 +91,13 @@ ExternalLoader::~ExternalLoader()
|
||||
|
||||
void ExternalLoader::reloadAndUpdate(bool throw_on_error)
|
||||
{
|
||||
std::lock_guard all_lock(all_mutex);
|
||||
|
||||
reloadFromConfigFiles(throw_on_error);
|
||||
|
||||
/// list of recreated loadable objects to perform delayed removal from unordered_map
|
||||
std::list<std::string> recreated_failed_loadable_objects;
|
||||
|
||||
std::lock_guard all_lock(all_mutex);
|
||||
|
||||
/// retry loading failed loadable objects
|
||||
for (auto & failed_loadable_object : failed_loadable_objects)
|
||||
{
|
||||
@ -250,15 +254,17 @@ void ExternalLoader::reloadAndUpdate(bool throw_on_error)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void ExternalLoader::reloadFromConfigFiles(const bool throw_on_error, const bool force_reload, const std::string & only_dictionary)
|
||||
{
|
||||
const auto config_paths = config_repository->list(config_main, config_settings.path_setting_name);
|
||||
std::lock_guard all_lock{all_mutex};
|
||||
|
||||
const auto config_paths = config_repository->list(config_main, config_settings.path_setting_name);
|
||||
for (const auto & config_path : config_paths)
|
||||
{
|
||||
try
|
||||
{
|
||||
reloadFromConfigFile(config_path, throw_on_error, force_reload, only_dictionary);
|
||||
reloadFromConfigFile(config_path, force_reload, only_dictionary);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
@ -270,30 +276,34 @@ void ExternalLoader::reloadFromConfigFiles(const bool throw_on_error, const bool
|
||||
}
|
||||
|
||||
/// erase removed from config loadable objects
|
||||
std::lock_guard lock{map_mutex};
|
||||
|
||||
std::list<std::string> removed_loadable_objects;
|
||||
for (const auto & loadable : loadable_objects)
|
||||
{
|
||||
const auto & current_config = loadable_objects_defined_in_config[loadable.second.origin];
|
||||
if (current_config.find(loadable.first) == std::end(current_config))
|
||||
removed_loadable_objects.emplace_back(loadable.first);
|
||||
std::lock_guard lock{map_mutex};
|
||||
std::list<std::string> removed_loadable_objects;
|
||||
for (const auto & loadable : loadable_objects)
|
||||
{
|
||||
const auto & current_config = loadable_objects_defined_in_config[loadable.second.origin];
|
||||
if (current_config.find(loadable.first) == std::end(current_config))
|
||||
removed_loadable_objects.emplace_back(loadable.first);
|
||||
}
|
||||
for (const auto & name : removed_loadable_objects)
|
||||
loadable_objects.erase(name);
|
||||
}
|
||||
for (const auto & name : removed_loadable_objects)
|
||||
loadable_objects.erase(name);
|
||||
|
||||
/// create all loadable objects which was read from config
|
||||
finishAllReloads(throw_on_error);
|
||||
}
|
||||
|
||||
void ExternalLoader::reloadFromConfigFile(const std::string & config_path, const bool throw_on_error,
|
||||
const bool force_reload, const std::string & loadable_name)
|
||||
|
||||
void ExternalLoader::reloadFromConfigFile(const std::string & config_path, const bool force_reload, const std::string & loadable_name)
|
||||
{
|
||||
// We assume `all_mutex` is already locked.
|
||||
|
||||
if (config_path.empty() || !config_repository->exists(config_path))
|
||||
{
|
||||
LOG_WARNING(log, "config file '" + config_path + "' does not exist");
|
||||
}
|
||||
else
|
||||
{
|
||||
std::lock_guard all_lock(all_mutex);
|
||||
|
||||
auto modification_time_it = last_modification_times.find(config_path);
|
||||
if (modification_time_it == std::end(last_modification_times))
|
||||
modification_time_it = last_modification_times.emplace(config_path, Poco::Timestamp{0}).first;
|
||||
@ -329,103 +339,24 @@ void ExternalLoader::reloadFromConfigFile(const std::string & config_path, const
|
||||
continue;
|
||||
}
|
||||
|
||||
try
|
||||
name = loaded_config->getString(key + "." + config_settings.external_name);
|
||||
if (name.empty())
|
||||
{
|
||||
name = loaded_config->getString(key + "." + config_settings.external_name);
|
||||
if (name.empty())
|
||||
{
|
||||
LOG_WARNING(log, config_path << ": " + config_settings.external_name + " name cannot be empty");
|
||||
continue;
|
||||
}
|
||||
|
||||
loadable_objects_defined_in_config[config_path].emplace(name);
|
||||
if (!loadable_name.empty() && name != loadable_name)
|
||||
continue;
|
||||
|
||||
decltype(loadable_objects.begin()) object_it;
|
||||
{
|
||||
std::lock_guard lock{map_mutex};
|
||||
object_it = loadable_objects.find(name);
|
||||
|
||||
/// Object with the same name was declared in other config file.
|
||||
if (object_it != std::end(loadable_objects) && object_it->second.origin != config_path)
|
||||
throw Exception(object_name + " '" + name + "' from file " + config_path
|
||||
+ " already declared in file " + object_it->second.origin,
|
||||
ErrorCodes::EXTERNAL_LOADABLE_ALREADY_EXISTS);
|
||||
}
|
||||
|
||||
auto object_ptr = create(name, *loaded_config, key);
|
||||
|
||||
/// If the object could not be loaded.
|
||||
if (const auto exception_ptr = object_ptr->getCreationException())
|
||||
{
|
||||
std::chrono::seconds delay(update_settings.backoff_initial_sec);
|
||||
const auto failed_dict_it = failed_loadable_objects.find(name);
|
||||
FailedLoadableInfo info{std::move(object_ptr), std::chrono::system_clock::now() + delay, 0};
|
||||
if (failed_dict_it != std::end(failed_loadable_objects))
|
||||
(*failed_dict_it).second = std::move(info);
|
||||
else
|
||||
failed_loadable_objects.emplace(name, std::move(info));
|
||||
|
||||
std::rethrow_exception(exception_ptr);
|
||||
}
|
||||
else if (object_ptr->supportUpdates())
|
||||
{
|
||||
const auto & lifetime = object_ptr->getLifetime();
|
||||
if (lifetime.min_sec != 0 && lifetime.max_sec != 0)
|
||||
{
|
||||
std::uniform_int_distribution<UInt64> distribution(lifetime.min_sec, lifetime.max_sec);
|
||||
|
||||
update_times[name] = std::chrono::system_clock::now() +
|
||||
std::chrono::seconds{distribution(rnd_engine)};
|
||||
}
|
||||
}
|
||||
|
||||
std::lock_guard lock{map_mutex};
|
||||
|
||||
/// add new loadable object or update an existing version
|
||||
if (object_it == std::end(loadable_objects))
|
||||
loadable_objects.emplace(name, LoadableInfo{std::move(object_ptr), config_path, {}});
|
||||
else
|
||||
{
|
||||
if (object_it->second.loadable)
|
||||
object_it->second.loadable.reset();
|
||||
object_it->second.loadable = std::move(object_ptr);
|
||||
|
||||
/// erase stored exception on success
|
||||
object_it->second.exception = std::exception_ptr{};
|
||||
failed_loadable_objects.erase(name);
|
||||
}
|
||||
LOG_WARNING(log, config_path << ": " + config_settings.external_name + " name cannot be empty");
|
||||
continue;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
if (!name.empty())
|
||||
{
|
||||
/// If the loadable object could not load data or even failed to initialize from the config.
|
||||
/// - all the same we insert information into the `loadable_objects`, with the zero pointer `loadable`.
|
||||
|
||||
std::lock_guard lock{map_mutex};
|
||||
loadable_objects_defined_in_config[config_path].emplace(name);
|
||||
if (!loadable_name.empty() && name != loadable_name)
|
||||
continue;
|
||||
|
||||
const auto exception_ptr = std::current_exception();
|
||||
const auto loadable_it = loadable_objects.find(name);
|
||||
if (loadable_it == std::end(loadable_objects))
|
||||
loadable_objects.emplace(name, LoadableInfo{nullptr, config_path, exception_ptr});
|
||||
else
|
||||
loadable_it->second.exception = exception_ptr;
|
||||
}
|
||||
|
||||
tryLogCurrentException(log, "Cannot create " + object_name + " '"
|
||||
+ name + "' from config path " + config_path);
|
||||
|
||||
/// propagate exception
|
||||
if (throw_on_error)
|
||||
throw;
|
||||
}
|
||||
objects_to_reload.emplace(name, LoadableCreationInfo{name, loaded_config, config_path, key});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void ExternalLoader::reload()
|
||||
{
|
||||
reloadFromConfigFiles(true, true);
|
||||
@ -441,10 +372,149 @@ void ExternalLoader::reload(const std::string & name)
|
||||
throw Exception("Failed to load " + object_name + " '" + name + "' during the reload process", ErrorCodes::BAD_ARGUMENTS);
|
||||
}
|
||||
|
||||
|
||||
void ExternalLoader::finishReload(const std::string & loadable_name, bool throw_on_error) const
|
||||
{
|
||||
// We assume `all_mutex` is already locked.
|
||||
|
||||
auto it = objects_to_reload.find(loadable_name);
|
||||
if (it == objects_to_reload.end())
|
||||
return;
|
||||
|
||||
LoadableCreationInfo creation_info = std::move(it->second);
|
||||
objects_to_reload.erase(it);
|
||||
finishReloadImpl(creation_info, throw_on_error);
|
||||
}
|
||||
|
||||
|
||||
void ExternalLoader::finishAllReloads(bool throw_on_error) const
|
||||
{
|
||||
// We assume `all_mutex` is already locked.
|
||||
|
||||
// We cannot just go through the map `objects_to_create` from begin to end and create every object
|
||||
// because these objects can depend on each other.
|
||||
// For example, if the first object's data depends on the second object's data then
|
||||
// creating the first object will cause creating the second object too.
|
||||
while (!objects_to_reload.empty())
|
||||
{
|
||||
auto it = objects_to_reload.begin();
|
||||
LoadableCreationInfo creation_info = std::move(it->second);
|
||||
objects_to_reload.erase(it);
|
||||
|
||||
try
|
||||
{
|
||||
finishReloadImpl(creation_info, throw_on_error);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
objects_to_reload.clear(); // no more objects to create after an exception
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void ExternalLoader::finishReloadImpl(const LoadableCreationInfo & creation_info, bool throw_on_error) const
|
||||
{
|
||||
// We assume `all_mutex` is already locked.
|
||||
|
||||
const std::string & name = creation_info.name;
|
||||
const std::string & config_path = creation_info.config_path;
|
||||
|
||||
try
|
||||
{
|
||||
ObjectsMap::iterator object_it;
|
||||
{
|
||||
std::lock_guard lock{map_mutex};
|
||||
object_it = loadable_objects.find(name);
|
||||
|
||||
/// Object with the same name was declared in other config file.
|
||||
if (object_it != std::end(loadable_objects) && object_it->second.origin != config_path)
|
||||
throw Exception(object_name + " '" + name + "' from file " + config_path
|
||||
+ " already declared in file " + object_it->second.origin,
|
||||
ErrorCodes::EXTERNAL_LOADABLE_ALREADY_EXISTS);
|
||||
}
|
||||
|
||||
auto object_ptr = create(name, *creation_info.config, creation_info.config_prefix);
|
||||
|
||||
/// If the object could not be loaded.
|
||||
if (const auto exception_ptr = object_ptr->getCreationException())
|
||||
{
|
||||
std::chrono::seconds delay(update_settings.backoff_initial_sec);
|
||||
const auto failed_dict_it = failed_loadable_objects.find(name);
|
||||
FailedLoadableInfo info{std::move(object_ptr), std::chrono::system_clock::now() + delay, 0};
|
||||
if (failed_dict_it != std::end(failed_loadable_objects))
|
||||
(*failed_dict_it).second = std::move(info);
|
||||
else
|
||||
failed_loadable_objects.emplace(name, std::move(info));
|
||||
|
||||
std::rethrow_exception(exception_ptr);
|
||||
}
|
||||
else if (object_ptr->supportUpdates())
|
||||
{
|
||||
const auto & lifetime = object_ptr->getLifetime();
|
||||
if (lifetime.min_sec != 0 && lifetime.max_sec != 0)
|
||||
{
|
||||
std::uniform_int_distribution<UInt64> distribution(lifetime.min_sec, lifetime.max_sec);
|
||||
|
||||
update_times[name] = std::chrono::system_clock::now() +
|
||||
std::chrono::seconds{distribution(rnd_engine)};
|
||||
}
|
||||
}
|
||||
|
||||
std::lock_guard lock{map_mutex};
|
||||
|
||||
/// add new loadable object or update an existing version
|
||||
if (object_it == std::end(loadable_objects))
|
||||
loadable_objects.emplace(name, LoadableInfo{std::move(object_ptr), config_path, {}});
|
||||
else
|
||||
{
|
||||
if (object_it->second.loadable)
|
||||
object_it->second.loadable.reset();
|
||||
object_it->second.loadable = std::move(object_ptr);
|
||||
|
||||
/// erase stored exception on success
|
||||
object_it->second.exception = std::exception_ptr{};
|
||||
failed_loadable_objects.erase(name);
|
||||
}
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
if (!name.empty())
|
||||
{
|
||||
/// If the loadable object could not load data or even failed to initialize from the config.
|
||||
/// - all the same we insert information into the `loadable_objects`, with the zero pointer `loadable`.
|
||||
|
||||
std::lock_guard lock{map_mutex};
|
||||
|
||||
const auto exception_ptr = std::current_exception();
|
||||
const auto loadable_it = loadable_objects.find(name);
|
||||
if (loadable_it == std::end(loadable_objects))
|
||||
loadable_objects.emplace(name, LoadableInfo{nullptr, config_path, exception_ptr});
|
||||
else
|
||||
loadable_it->second.exception = exception_ptr;
|
||||
}
|
||||
|
||||
tryLogCurrentException(log, "Cannot create " + object_name + " '"
|
||||
+ name + "' from config path " + config_path);
|
||||
|
||||
/// propagate exception
|
||||
if (throw_on_error)
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ExternalLoader::LoadablePtr ExternalLoader::getLoadableImpl(const std::string & name, bool throw_on_error) const
|
||||
{
|
||||
std::lock_guard lock{map_mutex};
|
||||
/// We try to finish the reloading of the object `name` here, before searching it in the map `loadable_objects` later in this function.
|
||||
/// If some other thread is already doing this reload work we don't want to wait until it finishes, because it's faster to just use
|
||||
/// the current version of this loadable object. That's why we use try_lock() instead of lock() here.
|
||||
std::unique_lock all_lock{all_mutex, std::defer_lock};
|
||||
if (all_lock.try_lock())
|
||||
finishReload(name, throw_on_error);
|
||||
|
||||
std::lock_guard lock{map_mutex};
|
||||
const auto it = loadable_objects.find(name);
|
||||
if (it == std::end(loadable_objects))
|
||||
{
|
||||
|
@ -89,7 +89,7 @@ public:
|
||||
using Configuration = Poco::Util::AbstractConfiguration;
|
||||
using ObjectsMap = std::unordered_map<std::string, LoadableInfo>;
|
||||
|
||||
/// Objects will be loaded immediately and then will be updated in separate thread, each 'reload_period' seconds.
|
||||
/// Call init() after constructing the instance of any derived class.
|
||||
ExternalLoader(const Configuration & config_main,
|
||||
const ExternalLoaderUpdateSettings & update_settings,
|
||||
const ExternalLoaderConfigSettings & config_settings,
|
||||
@ -97,6 +97,11 @@ public:
|
||||
Logger * log, const std::string & loadable_object_name);
|
||||
virtual ~ExternalLoader();
|
||||
|
||||
/// Should be called after creating an instance of a derived class.
|
||||
/// Loads the objects immediately and starts a separate thread to update them once in each 'reload_period' seconds.
|
||||
/// This function does nothing if called again.
|
||||
void init(bool throw_on_error);
|
||||
|
||||
/// Forcibly reloads all loadable objects.
|
||||
void reload();
|
||||
|
||||
@ -108,7 +113,7 @@ public:
|
||||
|
||||
protected:
|
||||
virtual std::unique_ptr<IExternalLoadable> create(const std::string & name, const Configuration & config,
|
||||
const std::string & config_prefix) = 0;
|
||||
const std::string & config_prefix) const = 0;
|
||||
|
||||
class LockedObjectsMap
|
||||
{
|
||||
@ -123,12 +128,8 @@ protected:
|
||||
/// Direct access to objects.
|
||||
LockedObjectsMap getObjectsMap() const;
|
||||
|
||||
/// Should be called in derived constructor (to avoid pure virtual call).
|
||||
void init(bool throw_on_error);
|
||||
|
||||
private:
|
||||
|
||||
bool is_initialized = false;
|
||||
std::once_flag is_initialized_flag;
|
||||
|
||||
/// Protects only objects map.
|
||||
/** Reading and assignment of "loadable" should be done under mutex.
|
||||
@ -136,22 +137,35 @@ private:
|
||||
*/
|
||||
mutable std::mutex map_mutex;
|
||||
|
||||
/// Protects all data, currently used to avoid races between updating thread and SYSTEM queries
|
||||
mutable std::mutex all_mutex;
|
||||
/// Protects all data, currently used to avoid races between updating thread and SYSTEM queries.
|
||||
/// The mutex is recursive because creating of objects might be recursive, i.e.
|
||||
/// creating objects might cause creating other objects.
|
||||
mutable std::recursive_mutex all_mutex;
|
||||
|
||||
/// name -> loadable.
|
||||
ObjectsMap loadable_objects;
|
||||
mutable ObjectsMap loadable_objects;
|
||||
|
||||
struct LoadableCreationInfo
|
||||
{
|
||||
std::string name;
|
||||
Poco::AutoPtr<Poco::Util::AbstractConfiguration> config;
|
||||
std::string config_path;
|
||||
std::string config_prefix;
|
||||
};
|
||||
|
||||
/// Objects which should be reloaded soon.
|
||||
mutable std::unordered_map<std::string, LoadableCreationInfo> objects_to_reload;
|
||||
|
||||
/// Here are loadable objects, that has been never loaded successfully.
|
||||
/// They are also in 'loadable_objects', but with nullptr as 'loadable'.
|
||||
std::unordered_map<std::string, FailedLoadableInfo> failed_loadable_objects;
|
||||
mutable std::unordered_map<std::string, FailedLoadableInfo> failed_loadable_objects;
|
||||
|
||||
/// Both for loadable_objects and failed_loadable_objects.
|
||||
std::unordered_map<std::string, std::chrono::system_clock::time_point> update_times;
|
||||
mutable std::unordered_map<std::string, std::chrono::system_clock::time_point> update_times;
|
||||
|
||||
std::unordered_map<std::string, std::unordered_set<std::string>> loadable_objects_defined_in_config;
|
||||
|
||||
pcg64 rnd_engine{randomSeed()};
|
||||
mutable pcg64 rnd_engine{randomSeed()};
|
||||
|
||||
const Configuration & config_main;
|
||||
const ExternalLoaderUpdateSettings & update_settings;
|
||||
@ -168,17 +182,22 @@ private:
|
||||
|
||||
std::unordered_map<std::string, Poco::Timestamp> last_modification_times;
|
||||
|
||||
void initImpl(bool throw_on_error);
|
||||
|
||||
/// Check objects definitions in config files and reload or/and add new ones if the definition is changed
|
||||
/// If loadable_name is not empty, load only loadable object with name loadable_name
|
||||
void reloadFromConfigFiles(bool throw_on_error, bool force_reload = false, const std::string & loadable_name = "");
|
||||
void reloadFromConfigFile(const std::string & config_path, const bool throw_on_error,
|
||||
const bool force_reload, const std::string & loadable_name);
|
||||
void reloadFromConfigFile(const std::string & config_path, const bool force_reload, const std::string & loadable_name);
|
||||
|
||||
/// Check config files and update expired loadable objects
|
||||
void reloadAndUpdate(bool throw_on_error = false);
|
||||
|
||||
void reloadPeriodically();
|
||||
|
||||
void finishReload(const std::string & loadable_name, bool throw_on_error) const;
|
||||
void finishAllReloads(bool throw_on_error) const;
|
||||
void finishReloadImpl(const LoadableCreationInfo & creation_info, bool throw_on_error) const;
|
||||
|
||||
LoadablePtr getLoadableImpl(const std::string & name, bool throw_on_error) const;
|
||||
};
|
||||
|
||||
|
@ -33,8 +33,7 @@ namespace
|
||||
|
||||
ExternalModels::ExternalModels(
|
||||
std::unique_ptr<IExternalLoaderConfigRepository> config_repository,
|
||||
Context & context,
|
||||
bool throw_on_error)
|
||||
Context & context)
|
||||
: ExternalLoader(context.getConfigRef(),
|
||||
externalModelsUpdateSettings,
|
||||
getExternalModelsConfigSettings(),
|
||||
@ -43,11 +42,10 @@ ExternalModels::ExternalModels(
|
||||
"external model"),
|
||||
context(context)
|
||||
{
|
||||
init(throw_on_error);
|
||||
}
|
||||
|
||||
std::unique_ptr<IExternalLoadable> ExternalModels::create(
|
||||
const std::string & name, const Configuration & config, const std::string & config_prefix)
|
||||
const std::string & name, const Configuration & config, const std::string & config_prefix) const
|
||||
{
|
||||
String type = config.getString(config_prefix + ".type");
|
||||
ExternalLoadableLifetime lifetime(config, config_prefix + ".lifetime");
|
||||
|
@ -20,8 +20,7 @@ public:
|
||||
/// Models will be loaded immediately and then will be updated in separate thread, each 'reload_period' seconds.
|
||||
ExternalModels(
|
||||
std::unique_ptr<IExternalLoaderConfigRepository> config_repository,
|
||||
Context & context,
|
||||
bool throw_on_error);
|
||||
Context & context);
|
||||
|
||||
/// Forcibly reloads specified model.
|
||||
void reloadModel(const std::string & name) { reload(name); }
|
||||
@ -34,7 +33,7 @@ public:
|
||||
protected:
|
||||
|
||||
std::unique_ptr<IExternalLoadable> create(const std::string & name, const Configuration & config,
|
||||
const std::string & config_prefix) override;
|
||||
const std::string & config_prefix) const override;
|
||||
|
||||
using ExternalLoader::getObjectsMap;
|
||||
|
||||
|
@ -1 +0,0 @@
|
||||
dictionaries
|
4
dbms/tests/integration/test_dictionaries/configs/dictionaries/.gitignore
vendored
Normal file
4
dbms/tests/integration/test_dictionaries/configs/dictionaries/.gitignore
vendored
Normal file
@ -0,0 +1,4 @@
|
||||
*
|
||||
!.gitignore
|
||||
!source.tsv
|
||||
!dictionary_preset*
|
@ -0,0 +1,30 @@
|
||||
|
||||
<dictionaries>
|
||||
<dictionary>
|
||||
<name>dep_x</name>
|
||||
<source>
|
||||
<clickhouse>
|
||||
<host>localhost</host>
|
||||
<port>9000</port>
|
||||
<user>default</user>
|
||||
<password></password>
|
||||
<db>dict</db>
|
||||
<table>dep_z</table>
|
||||
</clickhouse>
|
||||
</source>
|
||||
<lifetime>60</lifetime>
|
||||
<layout>
|
||||
<flat/>
|
||||
</layout>
|
||||
<structure>
|
||||
<id>
|
||||
<name>id</name>
|
||||
</id>
|
||||
<attribute>
|
||||
<name>String_</name>
|
||||
<type>String</type>
|
||||
<null_value>XX</null_value>
|
||||
</attribute>
|
||||
</structure>
|
||||
</dictionary>
|
||||
</dictionaries>
|
@ -0,0 +1,40 @@
|
||||
|
||||
<dictionaries>
|
||||
<dictionary>
|
||||
<name>dep_y</name>
|
||||
<source>
|
||||
<clickhouse>
|
||||
<host>localhost</host>
|
||||
<port>9000</port>
|
||||
<user>default</user>
|
||||
<password></password>
|
||||
<db>test</db>
|
||||
<table>dictionary_source</table>
|
||||
</clickhouse>
|
||||
</source>
|
||||
<lifetime>60</lifetime>
|
||||
<layout>
|
||||
<flat/>
|
||||
</layout>
|
||||
<structure>
|
||||
<id>
|
||||
<name>id</name>
|
||||
</id>
|
||||
<attribute>
|
||||
<name>Int64_</name>
|
||||
<type>Int64</type>
|
||||
<null_value>121</null_value>
|
||||
</attribute>
|
||||
<attribute>
|
||||
<name>Float32_</name>
|
||||
<type>Float32</type>
|
||||
<null_value>121</null_value>
|
||||
</attribute>
|
||||
<attribute>
|
||||
<name>String_</name>
|
||||
<type>String</type>
|
||||
<null_value>YY</null_value>
|
||||
</attribute>
|
||||
</structure>
|
||||
</dictionary>
|
||||
</dictionaries>
|
@ -0,0 +1,35 @@
|
||||
|
||||
<dictionaries>
|
||||
<dictionary>
|
||||
<name>dep_z</name>
|
||||
<source>
|
||||
<clickhouse>
|
||||
<host>localhost</host>
|
||||
<port>9000</port>
|
||||
<user>default</user>
|
||||
<password></password>
|
||||
<db>dict</db>
|
||||
<table>dep_y</table>
|
||||
</clickhouse>
|
||||
</source>
|
||||
<lifetime>60</lifetime>
|
||||
<layout>
|
||||
<flat/>
|
||||
</layout>
|
||||
<structure>
|
||||
<id>
|
||||
<name>id</name>
|
||||
</id>
|
||||
<attribute>
|
||||
<name>Int64_</name>
|
||||
<type>Int64</type>
|
||||
<null_value>122</null_value>
|
||||
</attribute>
|
||||
<attribute>
|
||||
<name>String_</name>
|
||||
<type>String</type>
|
||||
<null_value>ZZ</null_value>
|
||||
</attribute>
|
||||
</structure>
|
||||
</dictionary>
|
||||
</dictionaries>
|
@ -1,4 +1,5 @@
|
||||
import os
|
||||
import glob
|
||||
import difflib
|
||||
|
||||
files = ['key_simple.tsv', 'key_complex_integers.tsv', 'key_complex_mixed.tsv']
|
||||
@ -181,6 +182,10 @@ def generate_dictionaries(path, structure):
|
||||
|
||||
file_names = []
|
||||
|
||||
# Add ready dictionaries.
|
||||
file_names.extend(glob.glob(os.path.join(path, "*dictionary_preset*.xml")))
|
||||
|
||||
# Generate dictionaries.
|
||||
for (name, key_idx, has_parent), (source, layout) in zip(structure, sources_and_layouts):
|
||||
filename = os.path.join(path, 'dictionary_%s.xml' % name)
|
||||
file_names.append(filename)
|
||||
|
@ -29,6 +29,7 @@ def setup_module(module):
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
instance.query("CREATE DATABASE IF NOT EXISTS dict ENGINE=Dictionary")
|
||||
test_table.create_clickhouse_source(instance)
|
||||
for line in TSV(instance.query('select name from system.dictionaries')).lines:
|
||||
print line,
|
||||
@ -122,6 +123,7 @@ def test_select_all_from_cached(cached_dictionary_structure):
|
||||
print test_table.process_diff(diff)
|
||||
assert not diff
|
||||
|
||||
|
||||
def test_null_value(started_cluster):
|
||||
query = instance.query
|
||||
|
||||
@ -132,3 +134,14 @@ def test_null_value(started_cluster):
|
||||
# Check, that empty null_value interprets as default value
|
||||
assert TSV(query("select dictGetUInt64('clickhouse_cache', 'UInt64_', toUInt64(12121212))")) == TSV("0")
|
||||
assert TSV(query("select dictGetDateTime('clickhouse_cache', 'DateTime_', toUInt64(12121212))")) == TSV("0000-00-00 00:00:00")
|
||||
|
||||
|
||||
def test_dictionary_dependency(started_cluster):
|
||||
query = instance.query
|
||||
|
||||
assert query("SELECT dictGetString('dep_x', 'String_', toUInt64(1))") == "10577349846663553072\n"
|
||||
assert query("SELECT dictGetString('dep_y', 'String_', toUInt64(1))") == "10577349846663553072\n"
|
||||
assert query("SELECT dictGetString('dep_z', 'String_', toUInt64(1))") == "10577349846663553072\n"
|
||||
assert query("SELECT dictGetString('dep_x', 'String_', toUInt64(12121212))") == "XX\n"
|
||||
assert query("SELECT dictGetString('dep_y', 'String_', toUInt64(12121212))") == "YY\n"
|
||||
assert query("SELECT dictGetString('dep_z', 'String_', toUInt64(12121212))") == "ZZ\n"
|
||||
|
Loading…
Reference in New Issue
Block a user