From a140f4127e67d122f18e2909387b684e959e8d3d Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Wed, 13 Oct 2021 09:56:34 +0300 Subject: [PATCH] Fix ExternalLoader without additional mutex. --- src/Interpreters/ExternalLoader.cpp | 66 ++++++++++++++--------------- src/Interpreters/ExternalLoader.h | 8 ---- 2 files changed, 33 insertions(+), 41 deletions(-) diff --git a/src/Interpreters/ExternalLoader.cpp b/src/Interpreters/ExternalLoader.cpp index dc8466f3c26..b2cd9495feb 100644 --- a/src/Interpreters/ExternalLoader.cpp +++ b/src/Interpreters/ExternalLoader.cpp @@ -133,7 +133,13 @@ public: settings = settings_; } - using ObjectConfigsPtr = std::shared_ptr>>; + struct ObjectConfigs + { + std::unordered_map> configs_by_name; + size_t counter = 0; + }; + + using ObjectConfigsPtr = std::shared_ptr; /// Reads all repositories. ObjectConfigsPtr read() @@ -336,7 +342,7 @@ private: need_collect_object_configs = false; // Generate new result. - auto new_configs = std::make_shared>>(); + auto new_configs = std::make_shared(); for (const auto & [repository, repository_info] : repositories) { @@ -344,8 +350,8 @@ private: { for (const auto & [object_name, key_in_config] : file_info.objects) { - auto already_added_it = new_configs->find(object_name); - if (already_added_it == new_configs->end()) + auto already_added_it = new_configs->configs_by_name.find(object_name); + if (already_added_it == new_configs->configs_by_name.end()) { auto new_config = std::make_shared(); new_config->config = file_info.file_contents; @@ -353,7 +359,7 @@ private: new_config->repository_name = repository->getName(); new_config->from_temp_repository = repository->isTemporary(); new_config->path = path; - new_configs->emplace(object_name, std::move(new_config)); + new_configs->configs_by_name.emplace(object_name, std::move(new_config)); } else { @@ -372,6 +378,7 @@ private: } } + new_configs->counter = counter++; object_configs = new_configs; } @@ -383,6 +390,7 @@ private: std::unordered_map repositories; ObjectConfigsPtr object_configs; bool need_collect_object_configs = false; + size_t counter = 0; }; @@ -433,13 +441,22 @@ public: if (configs == new_configs) return; + /// The following check prevents a race when two threads are trying to update configuration + /// at almost the same time: + /// 1) first thread reads a configuration (for example as a part of periodic updates) + /// 2) second thread sets a new configuration (for example after executing CREATE DICTIONARY) + /// 3) first thread sets the configuration it read in 1) and thus discards the changes made in 2). + /// So we use `counter` here to ensure we exchange the current configuration only for a newer one. + if (configs && (configs->counter >= new_configs->counter)) + return; + configs = new_configs; std::vector removed_names; for (auto & [name, info] : infos) { - auto new_config_it = new_configs->find(name); - if (new_config_it == new_configs->end()) + auto new_config_it = new_configs->configs_by_name.find(name); + if (new_config_it == new_configs->configs_by_name.end()) { removed_names.emplace_back(name); } @@ -462,7 +479,7 @@ public: } /// Insert to the map those objects which added to the new configuration. - for (const auto & [name, config] : *new_configs) + for (const auto & [name, config] : new_configs->configs_by_name) { if (infos.find(name) == infos.end()) { @@ -1198,8 +1215,8 @@ class ExternalLoader::PeriodicUpdater : private boost::noncopyable public: static constexpr UInt64 check_period_sec = 5; - PeriodicUpdater(LoadablesConfigReader & config_files_reader_, LoadingDispatcher & loading_dispatcher_, std::recursive_mutex & config_mutex_) - : config_files_reader(config_files_reader_), loading_dispatcher(loading_dispatcher_), config_mutex(config_mutex_) + PeriodicUpdater(LoadablesConfigReader & config_files_reader_, LoadingDispatcher & loading_dispatcher_) + : config_files_reader(config_files_reader_), loading_dispatcher(loading_dispatcher_) { } @@ -1242,21 +1259,14 @@ private: while (!event.wait_for(lock, std::chrono::seconds(check_period_sec), pred)) { lock.unlock(); - { - { - std::lock_guard config_lock{config_mutex}; - loading_dispatcher.setConfiguration(config_files_reader.read()); - } - loading_dispatcher.reloadOutdated(); - } + loading_dispatcher.setConfiguration(config_files_reader.read()); + loading_dispatcher.reloadOutdated(); lock.lock(); } } LoadablesConfigReader & config_files_reader; LoadingDispatcher & loading_dispatcher; - - std::recursive_mutex & config_mutex; mutable std::mutex mutex; bool enabled = false; ThreadFromGlobalPool thread; @@ -1270,7 +1280,7 @@ ExternalLoader::ExternalLoader(const String & type_name_, Poco::Logger * log_) [this](auto && a, auto && b, auto && c) { return createObject(a, b, c); }, type_name_, log_)) - , periodic_updater(std::make_unique(*config_files_reader, *loading_dispatcher, config_mutex)) + , periodic_updater(std::make_unique(*config_files_reader, *loading_dispatcher)) , type_name(type_name_) , log(log_) { @@ -1282,13 +1292,12 @@ scope_guard ExternalLoader::addConfigRepository(std::unique_ptrgetName(); - std::lock_guard lock{config_mutex}; + config_files_reader->addConfigRepository(std::move(repository)); reloadConfig(name); return [this, ptr, name]() { - std::lock_guard config_lock{config_mutex}; config_files_reader->removeConfigRepository(ptr); reloadConfig(name); }; @@ -1387,10 +1396,7 @@ ReturnType ExternalLoader::load(const FilterByNameFunction & filter) const template ReturnType ExternalLoader::loadOrReload(const String & name) const { - { - std::lock_guard lock{config_mutex}; - loading_dispatcher->setConfiguration(config_files_reader->read()); - } + loading_dispatcher->setConfiguration(config_files_reader->read()); auto result = loading_dispatcher->tryLoadOrReload(name, WAIT); checkLoaded(result, true); return convertTo(result); @@ -1399,10 +1405,7 @@ ReturnType ExternalLoader::loadOrReload(const String & name) const template ReturnType ExternalLoader::loadOrReload(const FilterByNameFunction & filter) const { - { - std::lock_guard lock{config_mutex}; - loading_dispatcher->setConfiguration(config_files_reader->read()); - } + loading_dispatcher->setConfiguration(config_files_reader->read()); auto results = loading_dispatcher->tryLoadOrReload(filter, WAIT); checkLoaded(results, true); return convertTo(results); @@ -1490,19 +1493,16 @@ void ExternalLoader::checkLoaded(const ExternalLoader::LoadResults & results, void ExternalLoader::reloadConfig() const { - std::lock_guard lock{config_mutex}; loading_dispatcher->setConfiguration(config_files_reader->read()); } void ExternalLoader::reloadConfig(const String & repository_name) const { - std::lock_guard lock{config_mutex}; loading_dispatcher->setConfiguration(config_files_reader->read(repository_name)); } void ExternalLoader::reloadConfig(const String & repository_name, const String & path) const { - std::lock_guard lock{config_mutex}; loading_dispatcher->setConfiguration(config_files_reader->read(repository_name, path)); } diff --git a/src/Interpreters/ExternalLoader.h b/src/Interpreters/ExternalLoader.h index ed17138754a..81e043e40bf 100644 --- a/src/Interpreters/ExternalLoader.h +++ b/src/Interpreters/ExternalLoader.h @@ -219,14 +219,6 @@ private: LoadablePtr createObject(const String & name, const ObjectConfig & config, const LoadablePtr & previous_version) const; - /// We have to read configuration from LoadablesConfigReader and load configuration using LoadingDispatcher atomically. - /// Otherwise we can read configuration in one thread, then read and load newer configuration in another thread, - /// and then load outdated configuration from the first thread. - /// Remarkably, each class (LoadablesConfigReader, LoadingDispatcher, PeriodicUpdater) has own mutex for own purposes, - /// but it does not save from complex logical race conditions. - /// TODO Refactor dictionaries loading and get rid of this. - mutable std::recursive_mutex config_mutex; - class LoadablesConfigReader; std::unique_ptr config_files_reader;