mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Merge pull request #3970 from yandex/storage-dictionary-race-condition
Fixed race condition in StorageDictionary
This commit is contained in:
commit
014e344a36
@ -48,12 +48,12 @@ ExternalLoader::ExternalLoader(const Poco::Util::AbstractConfiguration & config_
|
||||
const ExternalLoaderConfigSettings & config_settings,
|
||||
std::unique_ptr<IExternalLoaderConfigRepository> config_repository,
|
||||
Logger * log, const std::string & loadable_object_name)
|
||||
: config_main(config_main)
|
||||
, update_settings(update_settings)
|
||||
, config_settings(config_settings)
|
||||
, config_repository(std::move(config_repository))
|
||||
, log(log)
|
||||
, object_name(loadable_object_name)
|
||||
: config_main(config_main)
|
||||
, update_settings(update_settings)
|
||||
, config_settings(config_settings)
|
||||
, config_repository(std::move(config_repository))
|
||||
, log(log)
|
||||
, object_name(loadable_object_name)
|
||||
{
|
||||
}
|
||||
|
||||
@ -92,7 +92,7 @@ void ExternalLoader::reloadAndUpdate(bool throw_on_error)
|
||||
/// list of recreated loadable objects to perform delayed removal from unordered_map
|
||||
std::list<std::string> recreated_failed_loadable_objects;
|
||||
|
||||
std::unique_lock<std::mutex> all_lock(all_mutex);
|
||||
std::lock_guard all_lock(all_mutex);
|
||||
|
||||
/// retry loading failed loadable objects
|
||||
for (auto & failed_loadable_object : failed_loadable_objects)
|
||||
@ -109,11 +109,11 @@ void ExternalLoader::reloadAndUpdate(bool throw_on_error)
|
||||
{
|
||||
/// recalculate next attempt time
|
||||
std::uniform_int_distribution<UInt64> distribution(
|
||||
0, static_cast<UInt64>(std::exp2(failed_loadable_object.second.error_count)));
|
||||
0, static_cast<UInt64>(std::exp2(failed_loadable_object.second.error_count)));
|
||||
|
||||
std::chrono::seconds delay(std::min<UInt64>(
|
||||
update_settings.backoff_max_sec,
|
||||
update_settings.backoff_initial_sec + distribution(rnd_engine)));
|
||||
update_settings.backoff_max_sec,
|
||||
update_settings.backoff_initial_sec + distribution(rnd_engine)));
|
||||
failed_loadable_object.second.next_attempt_time = std::chrono::system_clock::now() + delay;
|
||||
|
||||
++failed_loadable_object.second.error_count;
|
||||
@ -122,7 +122,7 @@ void ExternalLoader::reloadAndUpdate(bool throw_on_error)
|
||||
}
|
||||
else
|
||||
{
|
||||
const std::lock_guard<std::mutex> lock{map_mutex};
|
||||
std::lock_guard lock{map_mutex};
|
||||
|
||||
const auto & lifetime = loadable_ptr->getLifetime();
|
||||
std::uniform_int_distribution<UInt64> distribution{lifetime.min_sec, lifetime.max_sec};
|
||||
@ -153,62 +153,80 @@ void ExternalLoader::reloadAndUpdate(bool throw_on_error)
|
||||
failed_loadable_objects.erase(name);
|
||||
|
||||
/// periodic update
|
||||
for (auto & loadable_object : loadable_objects)
|
||||
{
|
||||
const auto & name = loadable_object.first;
|
||||
std::vector<std::pair<std::string, LoadablePtr>> objects_to_update;
|
||||
|
||||
try
|
||||
/// Collect objects that need to be updated under lock. Then create new versions without lock, and assign under lock.
|
||||
{
|
||||
std::lock_guard lock{map_mutex};
|
||||
|
||||
for (auto & loadable_object : loadable_objects)
|
||||
{
|
||||
/// If the loadable objects failed to load or even failed to initialize from the config.
|
||||
if (!loadable_object.second.loadable)
|
||||
continue;
|
||||
|
||||
auto current = loadable_object.second.loadable;
|
||||
const LoadablePtr & current = loadable_object.second.loadable;
|
||||
const auto & lifetime = current->getLifetime();
|
||||
|
||||
/// do not update loadable objects with zero as lifetime
|
||||
if (lifetime.min_sec == 0 || lifetime.max_sec == 0)
|
||||
continue;
|
||||
|
||||
if (current->supportUpdates())
|
||||
{
|
||||
auto & update_time = update_times[current->getName()];
|
||||
if (!current->supportUpdates())
|
||||
continue;
|
||||
|
||||
/// check that timeout has passed
|
||||
if (std::chrono::system_clock::now() < update_time)
|
||||
continue;
|
||||
auto update_time = update_times[current->getName()];
|
||||
|
||||
SCOPE_EXIT({
|
||||
/// calculate next update time
|
||||
std::uniform_int_distribution<UInt64> distribution{lifetime.min_sec, lifetime.max_sec};
|
||||
update_time = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
|
||||
});
|
||||
/// check that timeout has passed
|
||||
if (std::chrono::system_clock::now() < update_time)
|
||||
continue;
|
||||
|
||||
/// check source modified
|
||||
if (current->isModified())
|
||||
{
|
||||
/// create new version of loadable object
|
||||
auto new_version = current->clone();
|
||||
if (!current->isModified())
|
||||
continue;
|
||||
|
||||
if (const auto exception_ptr = new_version->getCreationException())
|
||||
std::rethrow_exception(exception_ptr);
|
||||
objects_to_update.emplace_back(loadable_object.first, current);
|
||||
}
|
||||
}
|
||||
|
||||
loadable_object.second.loadable.reset();
|
||||
loadable_object.second.loadable = std::move(new_version);
|
||||
}
|
||||
}
|
||||
for (auto & [name, current] : objects_to_update)
|
||||
{
|
||||
LoadablePtr new_version;
|
||||
std::exception_ptr exception;
|
||||
|
||||
/// erase stored exception on success
|
||||
loadable_object.second.exception = std::exception_ptr{};
|
||||
try
|
||||
{
|
||||
/// create new version of loadable object
|
||||
new_version = current->clone();
|
||||
exception = new_version->getCreationException();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
loadable_object.second.exception = std::current_exception();
|
||||
exception = std::current_exception();
|
||||
}
|
||||
|
||||
tryLogCurrentException(log, "Cannot update " + object_name + " '" + name + "', leaving old version");
|
||||
{
|
||||
std::lock_guard lock{map_mutex};
|
||||
|
||||
if (throw_on_error)
|
||||
throw;
|
||||
if (auto it = loadable_objects.find(name); it != loadable_objects.end())
|
||||
{
|
||||
/// calculate next update time
|
||||
const auto & lifetime = current->getLifetime();
|
||||
std::uniform_int_distribution<UInt64> distribution{lifetime.min_sec, lifetime.max_sec};
|
||||
update_times[name] = std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
|
||||
|
||||
it->second.exception = exception;
|
||||
if (!exception)
|
||||
{
|
||||
it->second.loadable.reset();
|
||||
it->second.loadable = std::move(new_version);
|
||||
}
|
||||
else
|
||||
{
|
||||
tryLogCurrentException(log, "Cannot update " + object_name + " '" + name + "', leaving old version");
|
||||
if (throw_on_error)
|
||||
throw;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -233,6 +251,8 @@ void ExternalLoader::reloadFromConfigFiles(const bool throw_on_error, const bool
|
||||
}
|
||||
|
||||
/// erase loadable objects that were removed from the config
|
||||
std::lock_guard lock{map_mutex};
|
||||
|
||||
std::list<std::string> removed_loadable_objects;
|
||||
for (const auto & loadable : loadable_objects)
|
||||
{
|
||||
@ -253,7 +273,7 @@ void ExternalLoader::reloadFromConfigFile(const std::string & config_path, const
|
||||
}
|
||||
else
|
||||
{
|
||||
std::unique_lock<std::mutex> all_lock(all_mutex);
|
||||
std::lock_guard all_lock(all_mutex);
|
||||
|
||||
auto modification_time_it = last_modification_times.find(config_path);
|
||||
if (modification_time_it == std::end(last_modification_times))
|
||||
@ -305,15 +325,15 @@ void ExternalLoader::reloadFromConfigFile(const std::string & config_path, const
|
||||
|
||||
decltype(loadable_objects.begin()) object_it;
|
||||
{
|
||||
std::lock_guard<std::mutex> lock{map_mutex};
|
||||
std::lock_guard lock{map_mutex};
|
||||
object_it = loadable_objects.find(name);
|
||||
}
|
||||
|
||||
/// Object with the same name was declared in other config file.
|
||||
if (object_it != std::end(loadable_objects) && object_it->second.origin != config_path)
|
||||
throw Exception(object_name + " '" + name + "' from file " + config_path
|
||||
+ " already declared in file " + object_it->second.origin,
|
||||
ErrorCodes::EXTERNAL_LOADABLE_ALREADY_EXISTS);
|
||||
/// Object with the same name was declared in other config file.
|
||||
if (object_it != std::end(loadable_objects) && object_it->second.origin != config_path)
|
||||
throw Exception(object_name + " '" + name + "' from file " + config_path
|
||||
+ " already declared in file " + object_it->second.origin,
|
||||
ErrorCodes::EXTERNAL_LOADABLE_ALREADY_EXISTS);
|
||||
}
|
||||
|
||||
auto object_ptr = create(name, *loaded_config, key);
|
||||
|
||||
@ -342,7 +362,7 @@ void ExternalLoader::reloadFromConfigFile(const std::string & config_path, const
|
||||
}
|
||||
}
|
||||
|
||||
const std::lock_guard<std::mutex> lock{map_mutex};
|
||||
std::lock_guard lock{map_mutex};
|
||||
|
||||
/// add new loadable object or update an existing version
|
||||
if (object_it == std::end(loadable_objects))
|
||||
@ -365,7 +385,7 @@ void ExternalLoader::reloadFromConfigFile(const std::string & config_path, const
|
||||
/// If the loadable object could not load data or even failed to initialize from the config.
|
||||
/// - all the same we insert information into the `loadable_objects`, with the zero pointer `loadable`.
|
||||
|
||||
const std::lock_guard<std::mutex> lock{map_mutex};
|
||||
std::lock_guard lock{map_mutex};
|
||||
|
||||
const auto exception_ptr = std::current_exception();
|
||||
const auto loadable_it = loadable_objects.find(name);
|
||||
@ -397,14 +417,14 @@ void ExternalLoader::reload(const std::string & name)
|
||||
reloadFromConfigFiles(true, true, name);
|
||||
|
||||
/// Check that specified object was loaded
|
||||
const std::lock_guard<std::mutex> lock{map_mutex};
|
||||
std::lock_guard lock{map_mutex};
|
||||
if (!loadable_objects.count(name))
|
||||
throw Exception("Failed to load " + object_name + " '" + name + "' during the reload process", ErrorCodes::BAD_ARGUMENTS);
|
||||
}
|
||||
|
||||
ExternalLoader::LoadablePtr ExternalLoader::getLoadableImpl(const std::string & name, bool throw_on_error) const
|
||||
{
|
||||
const std::lock_guard<std::mutex> lock{map_mutex};
|
||||
std::lock_guard lock{map_mutex};
|
||||
|
||||
const auto it = loadable_objects.find(name);
|
||||
if (it == std::end(loadable_objects))
|
||||
|
@ -132,6 +132,9 @@ private:
|
||||
bool is_initialized = false;
|
||||
|
||||
/// Protects only objects map.
|
||||
/** Reading and assignment of "loadable" should be done under mutex.
|
||||
* Creating new versions of "loadable" should not be done under mutex.
|
||||
*/
|
||||
mutable std::mutex map_mutex;
|
||||
|
||||
/// Protects all data, currently used to avoid races between updating thread and SYSTEM queries
|
||||
|
@ -37,16 +37,10 @@ CC=clang CXX=clang++ cmake -D SANITIZE=thread ..
|
||||
ninja
|
||||
```
|
||||
|
||||
## Copy binary to your server
|
||||
|
||||
```
|
||||
scp ./dbms/programs/clickhouse yourserver:~/clickhouse-tsan
|
||||
```
|
||||
|
||||
## Start ClickHouse and run tests
|
||||
|
||||
```
|
||||
sudo -u clickhouse TSAN_OPTIONS='halt_on_error=1' ./clickhouse-tsan server --config /etc/clickhouse-server/config.xml
|
||||
sudo -u clickhouse TSAN_OPTIONS='halt_on_error=1,suppressions=../dbms/tests/tsan_suppressions.txt' ./clickhouse-tsan server --config /etc/clickhouse-server/config.xml
|
||||
```
|
||||
|
||||
|
||||
@ -63,12 +57,6 @@ cmake -D SANITIZE=undefined ..
|
||||
ninja
|
||||
```
|
||||
|
||||
## Copy binary to your server
|
||||
|
||||
```
|
||||
scp ./dbms/programs/clickhouse yourserver:~/clickhouse-ubsan
|
||||
```
|
||||
|
||||
## Start ClickHouse and run tests
|
||||
|
||||
```
|
||||
|
2
dbms/tests/tsan_suppressions.txt
Normal file
2
dbms/tests/tsan_suppressions.txt
Normal file
@ -0,0 +1,2 @@
|
||||
# libc++
|
||||
race:locale
|
Loading…
Reference in New Issue
Block a user