Better doLoading locking logic

This commit is contained in:
alesapin 2019-10-22 15:57:58 +03:00
parent d3461f9d46
commit ae42dd1cea

View File

@ -20,6 +20,60 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
namespace
{
/// RAII wrapper for LoadingDispatcher::doLoading() method.
/// Remove information about loading in destructor
struct LoadingIdsCleaner
{
bool async;
std::mutex & mutex;
size_t loading_id;
std::unordered_map<size_t, ThreadFromGlobalPool> & loading_ids;
LoadingIdsCleaner(
bool async_,
std::mutex & mutex_,
size_t loading_id_,
std::unordered_map<size_t, ThreadFromGlobalPool> & loading_ids_)
: async(async_)
, mutex(mutex_)
, loading_id(loading_id_)
, loading_ids(loading_ids_)
{
}
~LoadingIdsCleaner()
{
if (async)
{
std::lock_guard lock(mutex);
/// Remove the information about the thread after it finishes.
/// Should be done with lock
auto it = loading_ids.find(loading_id);
if (it != loading_ids.end())
{
it->second.detach();
loading_ids.erase(it);
}
}
}
};
/// Lock mutex only in async mode
/// In other case does nothing
struct LoadingGuardForAsyncLoad
{
std::unique_lock<std::mutex> lock;
LoadingGuardForAsyncLoad(bool async, std::mutex & mutex)
{
if (async)
lock = std::unique_lock(mutex);
}
};
}
struct ExternalLoader::ObjectConfig
{
@ -765,48 +819,15 @@ private:
}
}
/// Does the loading, possibly in the separate thread.
void doLoading(const String & name, size_t loading_id, bool async)
/// Load one object, returns object ptr or exception
/// Do not require locking
std::pair<LoadablePtr, std::exception_ptr> loadOneObject(
const String & name,
const ObjectConfig & config,
bool config_changed,
LoadablePtr previous_version)
{
std::unique_lock<std::mutex> lock;
if (async)
{
setThreadName("ExterLdrJob");
lock = std::unique_lock{mutex}; /// If `async == false` the mutex is already locked.
}
SCOPE_EXIT({
if (async)
{
if (!lock.owns_lock())
lock.lock();
/// Remove the information about the thread after it finishes.
auto it = loading_ids.find(loading_id);
if (it != loading_ids.end())
{
it->second.detach();
loading_ids.erase(it);
}
}
});
/// We check here if this is exactly the same loading as we planned to perform.
/// This check is necessary because the object could be removed or load with another config before this thread even starts.
Info * info = getInfo(name);
if (!info || !info->loading() || (info->loading_id != loading_id))
return;
ObjectConfig config = info->object_config;
bool config_changed = info->config_changed;
LoadablePtr previous_version = info->object;
size_t error_count = info->error_count;
/// Use `create_function` to perform the actual loading.
/// It's much better to do it with `mutex` unlocked because the loading can take a lot of time
/// and require access to other objects.
if (async)
lock.unlock();
LoadablePtr new_object;
std::exception_ptr new_exception;
try
@ -817,16 +838,33 @@ private:
{
new_exception = std::current_exception();
}
return std::make_pair(new_object, new_exception);
if (!new_object && !new_exception)
throw Exception("No object created and no exception raised for " + type_name, ErrorCodes::LOGICAL_ERROR);
}
/// Lock the mutex again to store the changes.
if (async)
lock.lock();
else if (new_exception)
std::rethrow_exception(new_exception);
/// Return single object info, checks loading_id and name
std::optional<Info> getSingleObjectInfo(const String & name, size_t loading_id, bool async)
{
LoadingGuardForAsyncLoad lock(async, mutex);
Info * info = getInfo(name);
if (!info || !info->loading() || (info->loading_id != loading_id))
return {};
return *info;
}
/// Process loading result
/// Calculates next update time and process errors
void processLoadResult(
const String & name,
size_t loading_id,
LoadablePtr previous_version,
LoadablePtr new_object,
std::exception_ptr new_exception,
size_t error_count,
bool async)
{
LoadingGuardForAsyncLoad lock(async, mutex);
/// Calculate a new update time.
TimePoint next_update_time;
try
@ -844,7 +882,7 @@ private:
next_update_time = TimePoint::max();
}
info = getInfo(name);
Info * info = getInfo(name);
/// And again we should check if this is still the same loading as we were doing.
/// This is necessary because the object could be removed or load with another config while the `mutex` was unlocked.
@ -879,10 +917,33 @@ private:
info->forced_to_reload = false;
if (new_object)
info->config_changed = false;
}
/// Notify `event` to recheck conditions in loadImpl() now.
if (async)
lock.unlock();
/// Does the loading, possibly in the separate thread.
void doLoading(const String & name, size_t loading_id, bool async)
{
/// We should clean loading_id when we finish, even in case of exceptions
LoadingIdsCleaner cleaner(async, mutex, loading_id, loading_ids);
/// We check here if this is exactly the same loading as we planned to perform.
/// This check is necessary because the object could be removed or load with another config before this thread even starts.
std::optional<Info> info = getSingleObjectInfo(name, loading_id, async);
if (!info)
return;
/// Use `create_function` to perform the actual loading.
/// It's much better to do it with `mutex` unlocked because the loading can take a lot of time
/// and require access to other objects.
auto [new_object, new_exception] = loadOneObject(name, info->object_config, info->config_changed, info->object);
if (!new_object && !new_exception)
throw Exception("No object created and no exception raised for " + type_name, ErrorCodes::LOGICAL_ERROR);
/// In synchronus mode we throw exception immediately
if (!async && new_exception)
std::rethrow_exception(new_exception);
processLoadResult(name, loading_id, info->object, new_object, new_exception, info->error_count, async);
event.notify_all();
}