mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-29 13:10:48 +00:00
Fix ExternalLoader without additional mutex.
This commit is contained in:
parent
9bc7ca6dd5
commit
a140f4127e
@ -133,7 +133,13 @@ public:
|
|||||||
settings = settings_;
|
settings = settings_;
|
||||||
}
|
}
|
||||||
|
|
||||||
using ObjectConfigsPtr = std::shared_ptr<const std::unordered_map<String /* object's name */, std::shared_ptr<const ObjectConfig>>>;
|
struct ObjectConfigs
|
||||||
|
{
|
||||||
|
std::unordered_map<String /* object's name */, std::shared_ptr<const ObjectConfig>> configs_by_name;
|
||||||
|
size_t counter = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
using ObjectConfigsPtr = std::shared_ptr<const ObjectConfigs>;
|
||||||
|
|
||||||
/// Reads all repositories.
|
/// Reads all repositories.
|
||||||
ObjectConfigsPtr read()
|
ObjectConfigsPtr read()
|
||||||
@ -336,7 +342,7 @@ private:
|
|||||||
need_collect_object_configs = false;
|
need_collect_object_configs = false;
|
||||||
|
|
||||||
// Generate new result.
|
// Generate new result.
|
||||||
auto new_configs = std::make_shared<std::unordered_map<String /* object's name */, std::shared_ptr<const ObjectConfig>>>();
|
auto new_configs = std::make_shared<ObjectConfigs>();
|
||||||
|
|
||||||
for (const auto & [repository, repository_info] : repositories)
|
for (const auto & [repository, repository_info] : repositories)
|
||||||
{
|
{
|
||||||
@ -344,8 +350,8 @@ private:
|
|||||||
{
|
{
|
||||||
for (const auto & [object_name, key_in_config] : file_info.objects)
|
for (const auto & [object_name, key_in_config] : file_info.objects)
|
||||||
{
|
{
|
||||||
auto already_added_it = new_configs->find(object_name);
|
auto already_added_it = new_configs->configs_by_name.find(object_name);
|
||||||
if (already_added_it == new_configs->end())
|
if (already_added_it == new_configs->configs_by_name.end())
|
||||||
{
|
{
|
||||||
auto new_config = std::make_shared<ObjectConfig>();
|
auto new_config = std::make_shared<ObjectConfig>();
|
||||||
new_config->config = file_info.file_contents;
|
new_config->config = file_info.file_contents;
|
||||||
@ -353,7 +359,7 @@ private:
|
|||||||
new_config->repository_name = repository->getName();
|
new_config->repository_name = repository->getName();
|
||||||
new_config->from_temp_repository = repository->isTemporary();
|
new_config->from_temp_repository = repository->isTemporary();
|
||||||
new_config->path = path;
|
new_config->path = path;
|
||||||
new_configs->emplace(object_name, std::move(new_config));
|
new_configs->configs_by_name.emplace(object_name, std::move(new_config));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -372,6 +378,7 @@ private:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
new_configs->counter = counter++;
|
||||||
object_configs = new_configs;
|
object_configs = new_configs;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -383,6 +390,7 @@ private:
|
|||||||
std::unordered_map<Repository *, RepositoryInfo> repositories;
|
std::unordered_map<Repository *, RepositoryInfo> repositories;
|
||||||
ObjectConfigsPtr object_configs;
|
ObjectConfigsPtr object_configs;
|
||||||
bool need_collect_object_configs = false;
|
bool need_collect_object_configs = false;
|
||||||
|
size_t counter = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
@ -433,13 +441,22 @@ public:
|
|||||||
if (configs == new_configs)
|
if (configs == new_configs)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
/// The following check prevents a race when two threads are trying to update configuration
|
||||||
|
/// at almost the same time:
|
||||||
|
/// 1) first thread reads a configuration (for example as a part of periodic updates)
|
||||||
|
/// 2) second thread sets a new configuration (for example after executing CREATE DICTIONARY)
|
||||||
|
/// 3) first thread sets the configuration it read in 1) and thus discards the changes made in 2).
|
||||||
|
/// So we use `counter` here to ensure we exchange the current configuration only for a newer one.
|
||||||
|
if (configs && (configs->counter >= new_configs->counter))
|
||||||
|
return;
|
||||||
|
|
||||||
configs = new_configs;
|
configs = new_configs;
|
||||||
|
|
||||||
std::vector<String> removed_names;
|
std::vector<String> removed_names;
|
||||||
for (auto & [name, info] : infos)
|
for (auto & [name, info] : infos)
|
||||||
{
|
{
|
||||||
auto new_config_it = new_configs->find(name);
|
auto new_config_it = new_configs->configs_by_name.find(name);
|
||||||
if (new_config_it == new_configs->end())
|
if (new_config_it == new_configs->configs_by_name.end())
|
||||||
{
|
{
|
||||||
removed_names.emplace_back(name);
|
removed_names.emplace_back(name);
|
||||||
}
|
}
|
||||||
@ -462,7 +479,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Insert to the map those objects which added to the new configuration.
|
/// Insert to the map those objects which added to the new configuration.
|
||||||
for (const auto & [name, config] : *new_configs)
|
for (const auto & [name, config] : new_configs->configs_by_name)
|
||||||
{
|
{
|
||||||
if (infos.find(name) == infos.end())
|
if (infos.find(name) == infos.end())
|
||||||
{
|
{
|
||||||
@ -1198,8 +1215,8 @@ class ExternalLoader::PeriodicUpdater : private boost::noncopyable
|
|||||||
public:
|
public:
|
||||||
static constexpr UInt64 check_period_sec = 5;
|
static constexpr UInt64 check_period_sec = 5;
|
||||||
|
|
||||||
PeriodicUpdater(LoadablesConfigReader & config_files_reader_, LoadingDispatcher & loading_dispatcher_, std::recursive_mutex & config_mutex_)
|
PeriodicUpdater(LoadablesConfigReader & config_files_reader_, LoadingDispatcher & loading_dispatcher_)
|
||||||
: config_files_reader(config_files_reader_), loading_dispatcher(loading_dispatcher_), config_mutex(config_mutex_)
|
: config_files_reader(config_files_reader_), loading_dispatcher(loading_dispatcher_)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1242,21 +1259,14 @@ private:
|
|||||||
while (!event.wait_for(lock, std::chrono::seconds(check_period_sec), pred))
|
while (!event.wait_for(lock, std::chrono::seconds(check_period_sec), pred))
|
||||||
{
|
{
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
{
|
loading_dispatcher.setConfiguration(config_files_reader.read());
|
||||||
{
|
loading_dispatcher.reloadOutdated();
|
||||||
std::lock_guard config_lock{config_mutex};
|
|
||||||
loading_dispatcher.setConfiguration(config_files_reader.read());
|
|
||||||
}
|
|
||||||
loading_dispatcher.reloadOutdated();
|
|
||||||
}
|
|
||||||
lock.lock();
|
lock.lock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LoadablesConfigReader & config_files_reader;
|
LoadablesConfigReader & config_files_reader;
|
||||||
LoadingDispatcher & loading_dispatcher;
|
LoadingDispatcher & loading_dispatcher;
|
||||||
|
|
||||||
std::recursive_mutex & config_mutex;
|
|
||||||
mutable std::mutex mutex;
|
mutable std::mutex mutex;
|
||||||
bool enabled = false;
|
bool enabled = false;
|
||||||
ThreadFromGlobalPool thread;
|
ThreadFromGlobalPool thread;
|
||||||
@ -1270,7 +1280,7 @@ ExternalLoader::ExternalLoader(const String & type_name_, Poco::Logger * log_)
|
|||||||
[this](auto && a, auto && b, auto && c) { return createObject(a, b, c); },
|
[this](auto && a, auto && b, auto && c) { return createObject(a, b, c); },
|
||||||
type_name_,
|
type_name_,
|
||||||
log_))
|
log_))
|
||||||
, periodic_updater(std::make_unique<PeriodicUpdater>(*config_files_reader, *loading_dispatcher, config_mutex))
|
, periodic_updater(std::make_unique<PeriodicUpdater>(*config_files_reader, *loading_dispatcher))
|
||||||
, type_name(type_name_)
|
, type_name(type_name_)
|
||||||
, log(log_)
|
, log(log_)
|
||||||
{
|
{
|
||||||
@ -1282,13 +1292,12 @@ scope_guard ExternalLoader::addConfigRepository(std::unique_ptr<IExternalLoaderC
|
|||||||
{
|
{
|
||||||
auto * ptr = repository.get();
|
auto * ptr = repository.get();
|
||||||
String name = ptr->getName();
|
String name = ptr->getName();
|
||||||
std::lock_guard lock{config_mutex};
|
|
||||||
config_files_reader->addConfigRepository(std::move(repository));
|
config_files_reader->addConfigRepository(std::move(repository));
|
||||||
reloadConfig(name);
|
reloadConfig(name);
|
||||||
|
|
||||||
return [this, ptr, name]()
|
return [this, ptr, name]()
|
||||||
{
|
{
|
||||||
std::lock_guard config_lock{config_mutex};
|
|
||||||
config_files_reader->removeConfigRepository(ptr);
|
config_files_reader->removeConfigRepository(ptr);
|
||||||
reloadConfig(name);
|
reloadConfig(name);
|
||||||
};
|
};
|
||||||
@ -1387,10 +1396,7 @@ ReturnType ExternalLoader::load(const FilterByNameFunction & filter) const
|
|||||||
template <typename ReturnType, typename>
|
template <typename ReturnType, typename>
|
||||||
ReturnType ExternalLoader::loadOrReload(const String & name) const
|
ReturnType ExternalLoader::loadOrReload(const String & name) const
|
||||||
{
|
{
|
||||||
{
|
loading_dispatcher->setConfiguration(config_files_reader->read());
|
||||||
std::lock_guard lock{config_mutex};
|
|
||||||
loading_dispatcher->setConfiguration(config_files_reader->read());
|
|
||||||
}
|
|
||||||
auto result = loading_dispatcher->tryLoadOrReload<LoadResult>(name, WAIT);
|
auto result = loading_dispatcher->tryLoadOrReload<LoadResult>(name, WAIT);
|
||||||
checkLoaded(result, true);
|
checkLoaded(result, true);
|
||||||
return convertTo<ReturnType>(result);
|
return convertTo<ReturnType>(result);
|
||||||
@ -1399,10 +1405,7 @@ ReturnType ExternalLoader::loadOrReload(const String & name) const
|
|||||||
template <typename ReturnType, typename>
|
template <typename ReturnType, typename>
|
||||||
ReturnType ExternalLoader::loadOrReload(const FilterByNameFunction & filter) const
|
ReturnType ExternalLoader::loadOrReload(const FilterByNameFunction & filter) const
|
||||||
{
|
{
|
||||||
{
|
loading_dispatcher->setConfiguration(config_files_reader->read());
|
||||||
std::lock_guard lock{config_mutex};
|
|
||||||
loading_dispatcher->setConfiguration(config_files_reader->read());
|
|
||||||
}
|
|
||||||
auto results = loading_dispatcher->tryLoadOrReload<LoadResults>(filter, WAIT);
|
auto results = loading_dispatcher->tryLoadOrReload<LoadResults>(filter, WAIT);
|
||||||
checkLoaded(results, true);
|
checkLoaded(results, true);
|
||||||
return convertTo<ReturnType>(results);
|
return convertTo<ReturnType>(results);
|
||||||
@ -1490,19 +1493,16 @@ void ExternalLoader::checkLoaded(const ExternalLoader::LoadResults & results,
|
|||||||
|
|
||||||
void ExternalLoader::reloadConfig() const
|
void ExternalLoader::reloadConfig() const
|
||||||
{
|
{
|
||||||
std::lock_guard lock{config_mutex};
|
|
||||||
loading_dispatcher->setConfiguration(config_files_reader->read());
|
loading_dispatcher->setConfiguration(config_files_reader->read());
|
||||||
}
|
}
|
||||||
|
|
||||||
void ExternalLoader::reloadConfig(const String & repository_name) const
|
void ExternalLoader::reloadConfig(const String & repository_name) const
|
||||||
{
|
{
|
||||||
std::lock_guard lock{config_mutex};
|
|
||||||
loading_dispatcher->setConfiguration(config_files_reader->read(repository_name));
|
loading_dispatcher->setConfiguration(config_files_reader->read(repository_name));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ExternalLoader::reloadConfig(const String & repository_name, const String & path) const
|
void ExternalLoader::reloadConfig(const String & repository_name, const String & path) const
|
||||||
{
|
{
|
||||||
std::lock_guard lock{config_mutex};
|
|
||||||
loading_dispatcher->setConfiguration(config_files_reader->read(repository_name, path));
|
loading_dispatcher->setConfiguration(config_files_reader->read(repository_name, path));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -219,14 +219,6 @@ private:
|
|||||||
|
|
||||||
LoadablePtr createObject(const String & name, const ObjectConfig & config, const LoadablePtr & previous_version) const;
|
LoadablePtr createObject(const String & name, const ObjectConfig & config, const LoadablePtr & previous_version) const;
|
||||||
|
|
||||||
/// We have to read configuration from LoadablesConfigReader and load configuration using LoadingDispatcher atomically.
|
|
||||||
/// Otherwise we can read configuration in one thread, then read and load newer configuration in another thread,
|
|
||||||
/// and then load outdated configuration from the first thread.
|
|
||||||
/// Remarkably, each class (LoadablesConfigReader, LoadingDispatcher, PeriodicUpdater) has own mutex for own purposes,
|
|
||||||
/// but it does not save from complex logical race conditions.
|
|
||||||
/// TODO Refactor dictionaries loading and get rid of this.
|
|
||||||
mutable std::recursive_mutex config_mutex;
|
|
||||||
|
|
||||||
class LoadablesConfigReader;
|
class LoadablesConfigReader;
|
||||||
std::unique_ptr<LoadablesConfigReader> config_files_reader;
|
std::unique_ptr<LoadablesConfigReader> config_files_reader;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user