diff --git a/src/Interpreters/ExternalLoader.cpp b/src/Interpreters/ExternalLoader.cpp index eb7824a1124..09d0ea3a4b7 100644 --- a/src/Interpreters/ExternalLoader.cpp +++ b/src/Interpreters/ExternalLoader.cpp @@ -1198,8 +1198,8 @@ class ExternalLoader::PeriodicUpdater : private boost::noncopyable public: static constexpr UInt64 check_period_sec = 5; - PeriodicUpdater(LoadablesConfigReader & config_files_reader_, LoadingDispatcher & loading_dispatcher_) - : config_files_reader(config_files_reader_), loading_dispatcher(loading_dispatcher_) + PeriodicUpdater(LoadablesConfigReader & config_files_reader_, LoadingDispatcher & loading_dispatcher_, std::recursive_mutex & config_mutex_) + : config_files_reader(config_files_reader_), loading_dispatcher(loading_dispatcher_), config_mutex(config_mutex_) { } @@ -1242,8 +1242,11 @@ private: while (!event.wait_for(lock, std::chrono::seconds(check_period_sec), pred)) { lock.unlock(); - loading_dispatcher.setConfiguration(config_files_reader.read()); - loading_dispatcher.reloadOutdated(); + { + std::lock_guard config_lock{config_mutex}; + loading_dispatcher.setConfiguration(config_files_reader.read()); + loading_dispatcher.reloadOutdated(); + } lock.lock(); } } @@ -1251,6 +1254,7 @@ private: LoadablesConfigReader & config_files_reader; LoadingDispatcher & loading_dispatcher; + std::recursive_mutex & config_mutex; mutable std::mutex mutex; bool enabled = false; ThreadFromGlobalPool thread; @@ -1264,7 +1268,7 @@ ExternalLoader::ExternalLoader(const String & type_name_, Poco::Logger * log_) [this](auto && a, auto && b, auto && c) { return createObject(a, b, c); }, type_name_, log_)) - , periodic_updater(std::make_unique(*config_files_reader, *loading_dispatcher)) + , periodic_updater(std::make_unique(*config_files_reader, *loading_dispatcher, config_mutex)) , type_name(type_name_) , log(log_) { @@ -1276,11 +1280,13 @@ scope_guard ExternalLoader::addConfigRepository(std::unique_ptrgetName(); + std::lock_guard lock{config_mutex}; config_files_reader->addConfigRepository(std::move(repository)); reloadConfig(name); return [this, ptr, name]() { + std::lock_guard config_lock{config_mutex}; config_files_reader->removeConfigRepository(ptr); reloadConfig(name); }; @@ -1379,7 +1385,10 @@ ReturnType ExternalLoader::load(const FilterByNameFunction & filter) const template ReturnType ExternalLoader::loadOrReload(const String & name) const { - loading_dispatcher->setConfiguration(config_files_reader->read()); + { + std::lock_guard lock{config_mutex}; + loading_dispatcher->setConfiguration(config_files_reader->read()); + } auto result = loading_dispatcher->tryLoadOrReload(name, WAIT); checkLoaded(result, true); return convertTo(result); @@ -1388,7 +1397,10 @@ ReturnType ExternalLoader::loadOrReload(const String & name) const template ReturnType ExternalLoader::loadOrReload(const FilterByNameFunction & filter) const { - loading_dispatcher->setConfiguration(config_files_reader->read()); + { + std::lock_guard lock{config_mutex}; + loading_dispatcher->setConfiguration(config_files_reader->read()); + } auto results = loading_dispatcher->tryLoadOrReload(filter, WAIT); checkLoaded(results, true); return convertTo(results); @@ -1476,16 +1488,19 @@ void ExternalLoader::checkLoaded(const ExternalLoader::LoadResults & results, void ExternalLoader::reloadConfig() const { + std::lock_guard lock{config_mutex}; loading_dispatcher->setConfiguration(config_files_reader->read()); } void ExternalLoader::reloadConfig(const String & repository_name) const { + std::lock_guard lock{config_mutex}; loading_dispatcher->setConfiguration(config_files_reader->read(repository_name)); } void ExternalLoader::reloadConfig(const String & repository_name, const String & path) const { + std::lock_guard lock{config_mutex}; loading_dispatcher->setConfiguration(config_files_reader->read(repository_name, path)); } diff --git a/src/Interpreters/ExternalLoader.h b/src/Interpreters/ExternalLoader.h index 2ea79d3aade..1fa11807eba 100644 --- a/src/Interpreters/ExternalLoader.h +++ b/src/Interpreters/ExternalLoader.h @@ -219,6 +219,14 @@ private: LoadablePtr createObject(const String & name, const ObjectConfig & config, const LoadablePtr & previous_version) const; + /// We have to read configuration from LoadablesConfigReader and load configuration using LoadingDispatcher atomically. + /// Otherwise we can read configuration in one thread, then read and load newer configuration in another thread, + /// and then load outdated configuration from the first thread. + /// Remarkably, each class (LoadablesConfigReader, LoadingDispatcher, PeriodicUpdater) has own mutex for own purposes, + /// but it does not save from complex logical race conditions. + /// TODO Refactor dictionaries loading and get rid of this. + mutable std::recursive_mutex config_mutex; + class LoadablesConfigReader; std::unique_ptr config_files_reader;