2018-11-27 16:11:46 +00:00
# include "ExternalLoader.h"
2019-06-02 12:11:01 +00:00
# include <mutex>
# include <pcg_random.hpp>
# include <Common/Config/AbstractConfigurationComparison.h>
2017-10-06 11:10:01 +00:00
# include <Common/Exception.h>
2019-06-02 12:11:01 +00:00
# include <Common/StringUtils/StringUtils.h>
# include <Common/ThreadPool.h>
# include <Common/randomSeed.h>
2017-10-06 11:10:01 +00:00
# include <Common/setThreadName.h>
2019-11-09 19:14:51 +00:00
# include <ext/chrono_io.h>
2017-10-06 10:31:06 +00:00
# include <ext/scope_guard.h>
2019-06-02 12:11:01 +00:00
2017-10-06 10:31:06 +00:00
namespace DB
{
namespace ErrorCodes
{
2019-06-02 12:11:01 +00:00
extern const int LOGICAL_ERROR ;
extern const int BAD_ARGUMENTS ;
2017-10-06 10:31:06 +00:00
}
2019-10-22 12:57:58 +00:00
namespace
{
/// Scoped guard that owns `mutex` for its whole lifetime when `async` is true.
/// When `async` is false the guard is empty and the mutex is left untouched.
struct LoadingGuardForAsyncLoad
{
    std::unique_lock<std::mutex> lock;

    LoadingGuardForAsyncLoad(bool async, std::mutex & mutex)
        : lock(async ? std::unique_lock<std::mutex>(mutex) : std::unique_lock<std::mutex>())
    {
    }
};
}
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
struct ExternalLoader : : ObjectConfig
2017-10-06 10:31:06 +00:00
{
2019-06-02 12:11:01 +00:00
String config_path ;
Poco : : AutoPtr < Poco : : Util : : AbstractConfiguration > config ;
String key_in_config ;
2019-10-17 13:05:12 +00:00
String repository_name ;
2019-06-02 12:11:01 +00:00
} ;
2017-10-06 10:31:06 +00:00
2019-09-30 16:12:08 +00:00
/** Reads configurations from configuration repository and parses it.
2019-06-02 12:11:01 +00:00
*/
2019-09-30 16:12:08 +00:00
class ExternalLoader : : LoadablesConfigReader : private boost : : noncopyable
2017-10-06 10:31:06 +00:00
{
2019-06-02 12:11:01 +00:00
public :
2019-09-30 16:12:08 +00:00
LoadablesConfigReader ( const String & type_name_ , Logger * log_ )
: type_name ( type_name_ ) , log ( log_ )
2017-10-06 10:31:06 +00:00
{
2019-06-02 12:11:01 +00:00
}
2019-09-30 16:12:08 +00:00
~ LoadablesConfigReader ( ) = default ;
2017-10-06 10:31:06 +00:00
2019-10-18 15:44:32 +00:00
void addConfigRepository (
const String & name ,
std : : unique_ptr < IExternalLoaderConfigRepository > repository ,
const ExternalLoaderConfigSettings & settings )
2019-06-02 12:11:01 +00:00
{
std : : lock_guard lock { mutex } ;
2019-10-15 18:04:17 +00:00
repositories . emplace ( name , std : : make_pair ( std : : move ( repository ) , settings ) ) ;
2017-10-06 10:31:06 +00:00
}
2019-10-15 18:04:17 +00:00
void removeConfigRepository ( const String & name )
{
std : : lock_guard lock { mutex } ;
repositories . erase ( name ) ;
}
2019-09-30 11:18:01 +00:00
using ObjectConfigsPtr = std : : shared_ptr < const std : : unordered_map < String /* object's name */ , ObjectConfig > > ;
2017-10-06 10:31:06 +00:00
2019-10-18 15:44:32 +00:00
2019-10-16 14:59:52 +00:00
/// Reads configurations.
2019-09-30 11:18:01 +00:00
ObjectConfigsPtr read ( )
2019-06-02 12:11:01 +00:00
{
2019-10-18 15:44:32 +00:00
std : : lock_guard lock ( mutex ) ;
2019-06-02 12:11:01 +00:00
// Check last modification times of files and read those files which are new or changed.
2019-09-30 16:12:08 +00:00
if ( ! readLoadablesInfos ( ) )
2019-06-02 12:11:01 +00:00
return configs ; // Nothing changed, so we can return the previous result.
2019-04-10 18:36:52 +00:00
2019-10-18 15:44:32 +00:00
return collectConfigs ( ) ;
}
2019-10-21 13:54:23 +00:00
/// Stores the configuration of the object `external_name` inside the cache entry `object_name`,
/// replacing a previous configuration of the same object if there is one.
/// The entry is created when missing, stamped with the current time and marked as in use.
/// Returns the ObjectConfig that has been stored.
ObjectConfig updateLoadableInfo(
    const String & external_name,
    const String & object_name,
    const String & repo_name,
    const Poco::AutoPtr<Poco::Util::AbstractConfiguration> & config,
    const String & key)
{
    std::lock_guard guard(mutex);

    /// operator[] default-constructs the entry if `object_name` is not known yet.
    LoadablesInfos & entry = loadables_infos[object_name];

    ObjectConfig object_config{object_name, config, key, repo_name};

    bool replaced = false;
    for (auto & [existing_name, existing_config] : entry.configs)
    {
        if (existing_name == external_name)
        {
            existing_config = object_config;
            replaced = true;
            break;
        }
    }
    if (!replaced)
        entry.configs.emplace_back(external_name, object_config);

    entry.last_update_time = Poco::Timestamp{}; /// now
    entry.in_use = true;

    return object_config;
}
private :
struct LoadablesInfos
{
Poco : : Timestamp last_update_time = 0 ;
std : : vector < std : : pair < String , ObjectConfig > > configs ; // Parsed loadable's contents.
bool in_use = true ; // Whether the `LoadablesInfos` should be destroyed because the correspondent loadable is deleted.
} ;
/// Merges all cached definitions into one map "object name -> config",
/// stores it in `configs` and returns it.
/// If the same object name occurs more than once, the first occurrence met wins
/// and a warning is logged for each of the others.
ObjectConfigsPtr collectConfigs()
{
    auto merged = std::make_shared<std::unordered_map<String /* object's name */, ObjectConfig>>();

    for (const auto & [path, loadable_info] : loadables_infos)
    {
        for (const auto & [name, config] : loadable_info.configs)
        {
            auto duplicate_it = merged->find(name);
            if (duplicate_it == merged->end())
            {
                merged->emplace(name, config);
                continue;
            }

            const auto & first_seen = duplicate_it->second;
            LOG_WARNING(log, path << " : " << type_name << " ' " << name << " ' is found "
                << ((path == first_seen.config_path)
                    ? (" twice in the same file ")
                    : (" both in file ' " + first_seen.config_path + " ' and ' " + path + " ' ")));
        }
    }

    configs = merged;
    return configs;
}
2019-04-10 18:36:52 +00:00
2019-09-30 16:12:08 +00:00
/// Read files and store them to the map ` loadables_infos`.
bool readLoadablesInfos ( )
2017-10-06 10:31:06 +00:00
{
2019-06-02 12:11:01 +00:00
bool changed = false ;
2019-09-30 16:12:08 +00:00
for ( auto & name_and_loadable_info : loadables_infos )
2019-06-02 12:11:01 +00:00
{
2019-09-30 16:12:08 +00:00
LoadablesInfos & loadable_info = name_and_loadable_info . second ;
loadable_info . in_use = false ;
2019-06-02 12:11:01 +00:00
}
2019-10-17 13:05:12 +00:00
for ( const auto & [ repo_name , repo_with_settings ] : repositories )
2019-06-02 12:11:01 +00:00
{
2019-10-15 18:04:17 +00:00
const auto names = repo_with_settings . first - > getAllLoadablesDefinitionNames ( ) ;
2019-10-17 09:05:48 +00:00
for ( const auto & loadable_name : names )
2019-06-02 12:11:01 +00:00
{
2019-10-17 09:05:48 +00:00
auto it = loadables_infos . find ( loadable_name ) ;
2019-10-16 17:06:52 +00:00
if ( it ! = loadables_infos . end ( ) )
2019-06-02 12:11:01 +00:00
{
2019-09-30 16:12:08 +00:00
LoadablesInfos & loadable_info = it - > second ;
2019-10-17 13:05:12 +00:00
if ( readLoadablesInfo ( repo_name , * repo_with_settings . first , loadable_name , repo_with_settings . second , loadable_info ) )
2019-06-02 12:11:01 +00:00
changed = true ;
}
else
{
2019-09-30 16:12:08 +00:00
LoadablesInfos loadable_info ;
2019-10-17 13:05:12 +00:00
if ( readLoadablesInfo ( repo_name , * repo_with_settings . first , loadable_name , repo_with_settings . second , loadable_info ) )
2019-06-02 12:11:01 +00:00
{
2019-10-17 09:05:48 +00:00
loadables_infos . emplace ( loadable_name , std : : move ( loadable_info ) ) ;
2019-06-02 12:11:01 +00:00
changed = true ;
}
}
}
}
2017-10-06 10:31:06 +00:00
2019-10-16 17:06:52 +00:00
std : : vector < String > deleted_names ;
2019-09-30 16:12:08 +00:00
for ( auto & [ path , loadable_info ] : loadables_infos )
if ( ! loadable_info . in_use )
2019-10-16 17:06:52 +00:00
deleted_names . emplace_back ( path ) ;
if ( ! deleted_names . empty ( ) )
2019-06-02 12:11:01 +00:00
{
2019-10-16 17:06:52 +00:00
for ( const String & deleted_name : deleted_names )
loadables_infos . erase ( deleted_name ) ;
2019-06-02 12:11:01 +00:00
changed = true ;
}
return changed ;
2017-10-06 10:31:06 +00:00
}
2019-09-30 16:12:08 +00:00
bool readLoadablesInfo (
2019-10-17 13:05:12 +00:00
const String & repo_name ,
2019-09-30 16:12:08 +00:00
IExternalLoaderConfigRepository & repository ,
2019-10-17 17:18:54 +00:00
const String & object_name ,
2019-06-02 12:11:01 +00:00
const ExternalLoaderConfigSettings & settings ,
2019-09-30 16:12:08 +00:00
LoadablesInfos & loadable_info ) const
2019-06-02 12:11:01 +00:00
{
try
{
2019-10-17 17:18:54 +00:00
if ( object_name . empty ( ) | | ! repository . exists ( object_name ) )
2019-06-02 12:11:01 +00:00
{
2019-10-17 17:18:54 +00:00
LOG_WARNING ( log , " Config file ' " + object_name + " ' does not exist " ) ;
2019-06-02 12:11:01 +00:00
return false ;
}
2017-10-06 10:31:06 +00:00
2019-10-17 17:18:54 +00:00
auto update_time_from_repository = repository . getUpdateTime ( object_name ) ;
2019-10-01 08:58:47 +00:00
/// Actually it can't be less, but for sure we check less or equal
if ( update_time_from_repository < = loadable_info . last_update_time )
2019-06-02 12:11:01 +00:00
{
2019-09-30 16:12:08 +00:00
loadable_info . in_use = true ;
2019-06-02 12:11:01 +00:00
return false ;
}
2017-10-06 10:31:06 +00:00
2019-10-17 17:18:54 +00:00
auto file_contents = repository . load ( object_name ) ;
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
/// get all objects' definitions
Poco : : Util : : AbstractConfiguration : : Keys keys ;
file_contents - > keys ( keys ) ;
2017-10-06 10:31:06 +00:00
2019-10-17 17:18:54 +00:00
/// for each object defined in repositories
2019-06-02 12:11:01 +00:00
std : : vector < std : : pair < String , ObjectConfig > > configs_from_file ;
for ( const auto & key : keys )
{
if ( ! startsWith ( key , settings . external_config ) )
{
if ( ! startsWith ( key , " comment " ) & & ! startsWith ( key , " include_from " ) )
2019-10-17 17:18:54 +00:00
LOG_WARNING ( log , object_name < < " : file contains unknown node ' " < < key < < " ', expected ' " < < settings . external_config < < " ' " ) ;
2019-06-02 12:11:01 +00:00
continue ;
}
2019-10-16 14:59:52 +00:00
String external_name = file_contents - > getString ( key + " . " + settings . external_name ) ;
if ( external_name . empty ( ) )
2019-06-02 12:11:01 +00:00
{
2019-10-17 17:18:54 +00:00
LOG_WARNING ( log , object_name < < " : node ' " < < key < < " ' defines " < < type_name < < " with an empty name. It's not allowed " ) ;
2019-06-02 12:11:01 +00:00
continue ;
}
2019-10-17 17:18:54 +00:00
configs_from_file . emplace_back ( external_name , ObjectConfig { object_name , file_contents , key , repo_name } ) ;
2019-06-02 12:11:01 +00:00
}
2019-09-30 16:12:08 +00:00
loadable_info . configs = std : : move ( configs_from_file ) ;
2019-10-01 08:58:47 +00:00
loadable_info . last_update_time = update_time_from_repository ;
2019-09-30 16:12:08 +00:00
loadable_info . in_use = true ;
2019-06-02 12:11:01 +00:00
return true ;
}
catch ( . . . )
{
2019-10-17 17:18:54 +00:00
tryLogCurrentException ( log , " Failed to load config for dictionary ' " + object_name + " ' " ) ;
2019-06-02 12:11:01 +00:00
return false ;
}
}
2019-10-18 15:44:32 +00:00
2019-06-02 12:11:01 +00:00
const String type_name ;
Logger * log ;
2019-04-10 18:36:52 +00:00
2019-06-02 12:11:01 +00:00
std : : mutex mutex ;
2019-10-15 18:04:17 +00:00
using RepositoryPtr = std : : unique_ptr < IExternalLoaderConfigRepository > ;
using RepositoryWithSettings = std : : pair < RepositoryPtr , ExternalLoaderConfigSettings > ;
std : : unordered_map < String , RepositoryWithSettings > repositories ;
2019-09-30 11:18:01 +00:00
ObjectConfigsPtr configs ;
2019-09-30 16:12:08 +00:00
std : : unordered_map < String /* config path */ , LoadablesInfos > loadables_infos ;
2019-06-02 12:11:01 +00:00
} ;
2017-10-06 10:31:06 +00:00
2019-09-30 16:12:08 +00:00
/** Manages loading and reloading objects. Uses configurations from the class LoadablesConfigReader.
2019-06-02 12:11:01 +00:00
* Supports parallel loading .
*/
class ExternalLoader : : LoadingDispatcher : private boost : : noncopyable
{
public :
/// Called to load or reload an object.
2019-07-17 08:39:36 +00:00
using CreateObjectFunction = std : : function < LoadablePtr (
2019-12-04 23:53:06 +00:00
const String & /* name */ , const ObjectConfig & /* config */ , const LoadablePtr & /* previous_version */ ) > ;
2019-06-02 12:11:01 +00:00
LoadingDispatcher (
const CreateObjectFunction & create_object_function_ ,
const String & type_name_ ,
Logger * log_ )
: create_object ( create_object_function_ )
, type_name ( type_name_ )
, log ( log_ )
2017-10-06 10:31:06 +00:00
{
2019-06-02 12:11:01 +00:00
}
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
/// Drops all object states and joins every loading thread that is still registered.
~LoadingDispatcher()
{
    std::unique_lock lock{mutex};

    /// We clear this map to tell the threads that we don't want any load results anymore.
    infos.clear();

    /// Wait for all the threads to finish, one at a time.
    /// The mutex is released around notify/join and re-acquired before the next look at `loading_ids`.
    for (auto it = loading_ids.begin(); it != loading_ids.end(); it = loading_ids.begin())
    {
        auto thread = std::move(it->second);
        loading_ids.erase(it);

        lock.unlock();
        event.notify_all();
        thread.join();
        lock.lock();
    }
}
2019-09-30 16:12:08 +00:00
using ObjectConfigsPtr = LoadablesConfigReader : : ObjectConfigsPtr ;
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
/// Sets new configurations for all the objects.
2019-09-30 11:18:01 +00:00
void setConfiguration ( const ObjectConfigsPtr & new_configs )
2019-06-02 12:11:01 +00:00
{
std : : lock_guard lock { mutex } ;
if ( configs = = new_configs )
return ;
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
configs = new_configs ;
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
std : : vector < String > removed_names ;
for ( auto & [ name , info ] : infos )
{
auto new_config_it = new_configs - > find ( name ) ;
if ( new_config_it = = new_configs - > end ( ) )
removed_names . emplace_back ( name ) ;
2017-10-06 10:31:06 +00:00
else
{
2019-06-02 12:11:01 +00:00
const auto & new_config = new_config_it - > second ;
2019-10-21 14:07:47 +00:00
if ( ! isSameConfiguration ( * info . object_config . config , info . object_config . key_in_config , * new_config . config , new_config . key_in_config ) )
2019-06-02 12:11:01 +00:00
{
/// Configuration has been changed.
2019-10-21 14:07:47 +00:00
info . object_config = new_config ;
2019-06-02 12:11:01 +00:00
info . config_changed = true ;
2019-09-27 12:36:54 +00:00
if ( info . wasLoading ( ) )
2019-06-02 12:11:01 +00:00
{
/// The object has been tried to load before, so it is currently in use or was in use
/// and we should try to reload it with the new config.
cancelLoading ( info ) ;
startLoading ( name , info ) ;
}
}
}
}
/// Insert to the map those objects which added to the new configuration.
for ( const auto & [ name , config ] : * new_configs )
{
if ( infos . find ( name ) = = infos . end ( ) )
{
Info & info = infos . emplace ( name , Info { config } ) . first - > second ;
if ( always_load_everything )
startLoading ( name , info ) ;
}
}
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
/// Remove from the map those objects which were removed from the configuration.
for ( const String & name : removed_names )
infos . erase ( name ) ;
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
/// Maybe we have just added new objects which require to be loaded
/// or maybe we have just removed object which were been loaded,
/// so we should notify `event` to recheck conditions in load() and loadAll() now.
event . notify_all ( ) ;
}
2017-10-06 10:31:06 +00:00
2019-10-21 13:54:23 +00:00
void setSingleObjectConfigurationWithoutLoading ( const String & external_name , const ObjectConfig & config )
{
std : : lock_guard lock { mutex } ;
infos . emplace ( external_name , Info { config } ) ;
}
2019-06-02 12:11:01 +00:00
/// Sets whether all the objects from the configuration should be always loaded (even if they aren't used).
void enableAlwaysLoadEverything ( bool enable )
{
std : : lock_guard lock { mutex } ;
if ( always_load_everything = = enable )
return ;
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
always_load_everything = enable ;
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
if ( enable )
2017-10-06 10:31:06 +00:00
{
2019-06-02 12:11:01 +00:00
/// Start loading all the objects which were not loaded yet.
for ( auto & [ name , info ] : infos )
2019-09-27 12:36:54 +00:00
if ( ! info . wasLoading ( ) )
2019-06-02 12:11:01 +00:00
startLoading ( name , info ) ;
}
}
/// Sets whether the objects should be loaded asynchronously, each loading in a new thread (from the thread pool).
void enableAsyncLoading ( bool enable )
{
enable_async_loading = enable ;
}
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
/// Returns the current status of the object `name`,
/// or Status::NOT_EXIST if there is no object with such a name.
/// The function doesn't load anything.
Status getCurrentStatus(const String & name) const
{
    std::lock_guard guard{mutex};
    const Info * info = getInfo(name);
    return info ? info->status() : Status::NOT_EXIST;
}
2018-12-30 01:09:06 +00:00
2019-06-02 12:11:01 +00:00
/// Returns the current load result of the object `name`;
/// a result with Status::NOT_EXIST is returned if there is no object with such a name.
LoadResult getCurrentLoadResult(const String & name) const
{
    std::lock_guard guard{mutex};
    if (const Info * info = getInfo(name))
        return info->loadResult();
    return {Status::NOT_EXIST};
}
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
/// Returns all the load results as a map.
/// The function doesn't load anything, it just returns the current load results as is.
2019-09-27 12:36:54 +00:00
LoadResults getCurrentLoadResults ( const FilterByNameFunction & filter_by_name ) const
2019-06-02 12:11:01 +00:00
{
std : : lock_guard lock { mutex } ;
return collectLoadResults ( filter_by_name ) ;
}
2019-09-27 12:36:54 +00:00
LoadResults getCurrentLoadResults ( ) const { return getCurrentLoadResults ( allNames ) ; }
2019-06-02 12:11:01 +00:00
/// Returns all the loaded objects as a map.
/// The function doesn't load anything, it just returns the current load results as is.
2019-09-27 12:36:54 +00:00
Loadables getCurrentlyLoadedObjects ( const FilterByNameFunction & filter_by_name ) const
2019-06-02 12:11:01 +00:00
{
std : : lock_guard lock { mutex } ;
return collectLoadedObjects ( filter_by_name ) ;
}
2019-09-27 12:36:54 +00:00
Loadables getCurrentlyLoadedObjects ( ) const { return getCurrentlyLoadedObjects ( allNames ) ; }
2019-06-02 12:11:01 +00:00
size_t getNumberOfCurrentlyLoadedObjects ( ) const
{
std : : lock_guard lock { mutex } ;
size_t count = 0 ;
for ( const auto & name_and_info : infos )
2017-10-06 10:31:06 +00:00
{
2019-06-02 12:11:01 +00:00
const auto & info = name_and_info . second ;
if ( info . loaded ( ) )
+ + count ;
}
return count ;
}
2019-03-04 18:19:48 +00:00
2019-06-29 17:27:32 +00:00
bool hasCurrentlyLoadedObjects ( ) const
{
std : : lock_guard lock { mutex } ;
2019-07-02 17:24:22 +00:00
for ( auto & name_info : infos )
if ( name_info . second . loaded ( ) )
2019-06-29 17:27:32 +00:00
return true ;
return false ;
}
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
/// Tries to load a specified object during the timeout.
/// Returns nullptr if the loading is unsuccessful or if there is no such object.
void load ( const String & name , LoadablePtr & loaded_object , Duration timeout = NO_TIMEOUT )
{
std : : unique_lock lock { mutex } ;
Info * info = loadImpl ( name , timeout , lock ) ;
loaded_object = ( info ? info - > object : nullptr ) ;
}
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
/// Tries to finish loading of a specified object during the timeout.
/// Returns nullptr if the loading is unsuccessful or if there is no such object.
void loadStrict ( const String & name , LoadablePtr & loaded_object )
{
std : : unique_lock lock { mutex } ;
Info * info = loadImpl ( name , NO_TIMEOUT , lock ) ;
if ( ! info )
throw Exception ( " No such " + type_name + " ' " + name + " '. " , ErrorCodes : : BAD_ARGUMENTS ) ;
checkLoaded ( name , * info ) ;
loaded_object = info - > object ;
}
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
/// Tries to start loading of the objects for which the specified functor returns true.
2019-09-27 12:36:54 +00:00
void load ( const FilterByNameFunction & filter_by_name )
2019-06-02 12:11:01 +00:00
{
std : : lock_guard lock { mutex } ;
for ( auto & [ name , info ] : infos )
2019-09-27 12:36:54 +00:00
if ( ! info . wasLoading ( ) & & filter_by_name ( name ) )
2019-06-02 12:11:01 +00:00
startLoading ( name , info ) ;
}
/// Tries to finish loading of the objects for which the specified function returns true.
2019-09-27 12:36:54 +00:00
void load ( const FilterByNameFunction & filter_by_name , Loadables & loaded_objects , Duration timeout = NO_TIMEOUT )
2019-06-02 12:11:01 +00:00
{
std : : unique_lock lock { mutex } ;
loadImpl ( filter_by_name , timeout , lock ) ;
loaded_objects = collectLoadedObjects ( filter_by_name ) ;
}
2019-10-17 13:05:12 +00:00
/// Tries to finish loading of the objects for which the specified function returns true.
void load ( const FilterByNameFunction & filter_by_name , LoadResults & loaded_results , Duration timeout = NO_TIMEOUT )
{
std : : unique_lock lock { mutex } ;
loadImpl ( filter_by_name , timeout , lock ) ;
loaded_results = collectLoadResults ( filter_by_name ) ;
}
2019-06-02 12:11:01 +00:00
/// Tries to finish loading of all the objects during the timeout.
2019-09-27 12:36:54 +00:00
void load ( Loadables & loaded_objects , Duration timeout = NO_TIMEOUT ) { load ( allNames , loaded_objects , timeout ) ; }
2019-10-17 13:05:12 +00:00
void load ( LoadResults & loaded_results , Duration timeout = NO_TIMEOUT ) { load ( allNames , loaded_results , timeout ) ; }
2019-06-02 12:11:01 +00:00
/// Starts reloading a specified object.
2019-10-21 13:54:23 +00:00
void reload ( const String & name , bool load_never_loading = false )
2019-06-02 12:11:01 +00:00
{
std : : lock_guard lock { mutex } ;
Info * info = getInfo ( name ) ;
if ( ! info )
2019-10-18 15:44:32 +00:00
{
2019-06-02 12:11:01 +00:00
return ;
2019-10-18 15:44:32 +00:00
}
2019-06-02 12:11:01 +00:00
2019-09-27 12:36:54 +00:00
if ( info - > wasLoading ( ) | | load_never_loading )
2019-06-02 12:11:01 +00:00
{
cancelLoading ( * info ) ;
info - > forced_to_reload = true ;
2019-10-21 13:54:23 +00:00
startLoading ( name , * info ) ;
2019-06-02 12:11:01 +00:00
}
}
/// Starts reloading of the objects which `filter_by_name` returns true for.
2019-09-27 12:36:54 +00:00
void reload ( const FilterByNameFunction & filter_by_name , bool load_never_loading = false )
2019-06-02 12:11:01 +00:00
{
std : : lock_guard lock { mutex } ;
for ( auto & [ name , info ] : infos )
2019-07-19 19:37:48 +00:00
{
2019-09-27 12:36:54 +00:00
if ( ( info . wasLoading ( ) | | load_never_loading ) & & filter_by_name ( name ) )
2019-03-04 18:19:48 +00:00
{
2019-06-02 12:11:01 +00:00
cancelLoading ( info ) ;
info . forced_to_reload = true ;
startLoading ( name , info ) ;
}
2019-07-19 19:37:48 +00:00
}
2019-06-02 12:11:01 +00:00
}
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
/// Starts reloading of all the objects.
2019-09-27 12:36:54 +00:00
void reload ( bool load_never_loading = false ) { reload ( allNames , load_never_loading ) ; }
2019-06-02 12:11:01 +00:00
/// Starts reloading all the object which update time is earlier than now.
/// The function doesn't touch the objects which were never tried to load.
void reloadOutdated ( )
{
2019-12-04 15:23:05 +00:00
/// Iterate through all the objects and find loaded ones which should be checked if they need update.
std : : unordered_map < LoadablePtr , bool > should_update_map ;
2019-07-16 17:47:18 +00:00
{
std : : lock_guard lock { mutex } ;
TimePoint now = std : : chrono : : system_clock : : now ( ) ;
for ( const auto & name_and_info : infos )
2019-06-02 12:11:01 +00:00
{
2019-07-16 17:47:18 +00:00
const auto & info = name_and_info . second ;
2019-07-19 21:03:25 +00:00
if ( ( now > = info . next_update_time ) & & ! info . loading ( ) & & info . loaded ( ) )
2019-12-04 16:20:24 +00:00
should_update_map . emplace ( info . object , info . failedToReload ( ) ) ;
2019-07-16 17:47:18 +00:00
}
}
2019-07-19 21:01:20 +00:00
/// Find out which of the loaded objects were modified.
2019-12-04 15:23:05 +00:00
/// We couldn't perform these checks while we were building `should_update_map` because
2019-09-27 12:36:54 +00:00
/// the `mutex` should be unlocked while we're calling the function object->isModified()
2019-12-04 15:23:05 +00:00
for ( auto & [ object , should_update_flag ] : should_update_map )
2019-07-16 17:47:18 +00:00
{
try
{
2019-12-04 15:23:05 +00:00
/// Maybe alredy true, if we have an exception
if ( ! should_update_flag )
should_update_flag = object - > isModified ( ) ;
2019-03-04 18:19:48 +00:00
}
2019-07-16 17:47:18 +00:00
catch ( . . . )
{
tryLogCurrentException ( log , " Could not check if " + type_name + " ' " + object - > getName ( ) + " ' was modified " ) ;
2019-12-04 15:23:05 +00:00
/// Cannot check isModified, so update
should_update_flag = true ;
2019-07-16 17:47:18 +00:00
}
}
2019-07-19 21:01:20 +00:00
/// Iterate through all the objects again and either start loading or just set `next_update_time`.
2019-07-16 17:47:18 +00:00
{
std : : lock_guard lock { mutex } ;
TimePoint now = std : : chrono : : system_clock : : now ( ) ;
for ( auto & [ name , info ] : infos )
2019-07-19 19:37:48 +00:00
{
2019-07-19 21:03:25 +00:00
if ( ( now > = info . next_update_time ) & & ! info . loading ( ) )
2019-07-16 17:47:18 +00:00
{
2019-07-19 21:03:25 +00:00
if ( info . loaded ( ) )
{
2019-12-04 15:23:05 +00:00
auto it = should_update_map . find ( info . object ) ;
if ( it = = should_update_map . end ( ) )
continue ; /// Object has been just loaded (it wasn't loaded while we were building the map `should_update_map`), so we don't have to reload it right now.
bool should_update_flag = it - > second ;
if ( ! should_update_flag )
2019-07-19 21:03:25 +00:00
{
2019-09-27 12:36:54 +00:00
info . next_update_time = calculateNextUpdateTime ( info . object , info . error_count ) ;
2019-07-19 21:03:25 +00:00
continue ;
}
2019-07-19 21:01:20 +00:00
2019-12-04 16:20:24 +00:00
/// Object was modified or it was failed to reload last time, so it should be reloaded.
2019-07-19 21:01:20 +00:00
startLoading ( name , info ) ;
}
else if ( info . failed ( ) )
{
/// Object was never loaded successfully and should be reloaded.
startLoading ( name , info ) ;
2019-07-19 21:03:25 +00:00
}
2019-07-16 17:47:18 +00:00
}
2019-07-19 19:37:48 +00:00
}
2019-07-16 17:47:18 +00:00
}
2018-12-30 01:09:06 +00:00
}
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
private :
struct Info
2019-03-05 16:01:13 +00:00
{
2019-10-21 14:07:47 +00:00
Info ( const ObjectConfig & object_config_ ) : object_config ( object_config_ ) { }
2019-06-02 12:11:01 +00:00
bool loaded ( ) const { return object ! = nullptr ; }
bool failed ( ) const { return ! object & & exception ; }
bool loading ( ) const { return loading_id ! = 0 ; }
2019-09-27 12:36:54 +00:00
bool wasLoading ( ) const { return loaded ( ) | | failed ( ) | | loading ( ) ; }
2019-06-02 12:11:01 +00:00
bool ready ( ) const { return ( loaded ( ) | | failed ( ) ) & & ! forced_to_reload ; }
2019-12-04 16:20:24 +00:00
bool failedToReload ( ) const { return loaded ( ) & & exception ! = nullptr ; }
2019-06-02 12:11:01 +00:00
Status status ( ) const
{
if ( object )
return loading ( ) ? Status : : LOADED_AND_RELOADING : Status : : LOADED ;
else if ( exception )
return loading ( ) ? Status : : FAILED_AND_RELOADING : Status : : FAILED ;
else
return loading ( ) ? Status : : LOADING : Status : : NOT_LOADED ;
}
2019-09-27 12:36:54 +00:00
Duration loadingDuration ( ) const
2019-06-02 12:11:01 +00:00
{
if ( loading ( ) )
return std : : chrono : : duration_cast < Duration > ( std : : chrono : : system_clock : : now ( ) - loading_start_time ) ;
return std : : chrono : : duration_cast < Duration > ( loading_end_time - loading_start_time ) ;
}
2019-09-27 12:36:54 +00:00
LoadResult loadResult ( ) const
2019-06-02 12:11:01 +00:00
{
LoadResult result { status ( ) } ;
result . object = object ;
result . exception = exception ;
result . loading_start_time = loading_start_time ;
2019-09-27 12:36:54 +00:00
result . loading_duration = loadingDuration ( ) ;
2019-10-21 14:07:47 +00:00
result . origin = object_config . config_path ;
result . repository_name = object_config . repository_name ;
2019-06-02 12:11:01 +00:00
return result ;
}
2019-10-21 14:07:47 +00:00
ObjectConfig object_config ;
2019-06-02 12:11:01 +00:00
LoadablePtr object ;
TimePoint loading_start_time ;
TimePoint loading_end_time ;
size_t loading_id = 0 ; /// Non-zero if it's loading right now.
size_t error_count = 0 ; /// Numbers of errors since last successful loading.
std : : exception_ptr exception ; /// Last error occurred.
bool config_changed = false ; /// Whether the config has been change since last successful loading.
bool forced_to_reload = false ; /// Whether the current reloading is forced, i.e. caused by user's direction. For periodic reloading and reloading due to a config's change `forced_to_reload == false`.
TimePoint next_update_time = TimePoint : : max ( ) ; /// Time of the next update, `TimePoint::max()` means "never".
2019-03-05 16:01:13 +00:00
} ;
2019-06-02 12:11:01 +00:00
/// Returns the state of the object `name`, or nullptr if there is no such object.
/// The callers visible in this class take `mutex` before calling.
Info * getInfo(const String & name)
{
    auto found = infos.find(name);
    return (found != infos.end()) ? &found->second : nullptr;
}

/// Const overload of the lookup above.
const Info * getInfo(const String & name) const
{
    auto found = infos.find(name);
    return (found != infos.end()) ? &found->second : nullptr;
}
2019-09-27 12:36:54 +00:00
/// Gathers the objects which are loaded right now and accepted by `filter_by_name`.
/// The filter is asked only about the objects which are loaded.
Loadables collectLoadedObjects(const FilterByNameFunction & filter_by_name) const
{
    Loadables objects;
    objects.reserve(infos.size());
    for (const auto & [name, info] : infos)
    {
        if (!info.loaded())
            continue;
        if (filter_by_name(name))
            objects.emplace_back(info.object);
    }
    return objects;
}
2019-09-27 12:36:54 +00:00
/// Gathers pairs "name -> load result" for every object accepted by `filter_by_name`,
/// whatever its current status is.
LoadResults collectLoadResults(const FilterByNameFunction & filter_by_name) const
{
    LoadResults load_results;
    load_results.reserve(infos.size());
    for (const auto & [name, info] : infos)
    {
        if (!filter_by_name(name))
            continue;
        load_results.emplace_back(name, info.loadResult());
    }
    return load_results;
}
/// Waits on `event` until the object `name` is ready or doesn't exist, starting its loading if needed.
/// With a timeout other than NO_TIMEOUT the wait is limited, so the returned state may be not ready yet.
/// `lock` must hold `mutex`; it is released while waiting.
/// Returns the state found at the last check (nullptr if there is no such object).
Info * loadImpl(const String & name, Duration timeout, std::unique_lock<std::mutex> & lock)
{
    /// Looked up again at every check, since the map can change while the lock is released.
    Info * info = nullptr;

    auto is_done = [&]()
    {
        info = getInfo(name);
        if (!info || info->ready())
            return true;
        if (!info->loading())
            startLoading(name, *info);
        return info->ready();
    };

    if (timeout == NO_TIMEOUT)
        event.wait(lock, is_done);
    else
        event.wait_for(lock, timeout, is_done);

    return info;
}
2019-09-27 12:36:54 +00:00
void loadImpl ( const FilterByNameFunction & filter_by_name , Duration timeout , std : : unique_lock < std : : mutex > & lock )
2019-06-02 12:11:01 +00:00
{
auto pred = [ & ] ( )
{
bool all_ready = true ;
for ( auto & [ name , info ] : infos )
2019-03-05 16:01:13 +00:00
{
2019-06-02 12:11:01 +00:00
if ( info . ready ( ) | | ! filter_by_name ( name ) )
continue ;
if ( ! info . loading ( ) )
startLoading ( name , info ) ;
if ( ! info . ready ( ) )
all_ready = false ;
2019-03-05 16:01:13 +00:00
}
2019-06-02 12:11:01 +00:00
return all_ready ;
} ;
if ( timeout = = NO_TIMEOUT )
event . wait ( lock , pred ) ;
else
event . wait_for ( lock , timeout , pred ) ;
}
2019-10-21 13:54:23 +00:00
void startLoading ( const String & name , Info & info )
2019-06-02 12:11:01 +00:00
{
if ( info . loading ( ) )
return ;
/// All loadings have unique loading IDs.
size_t loading_id = next_loading_id + + ;
info . loading_id = loading_id ;
info . loading_start_time = std : : chrono : : system_clock : : now ( ) ;
info . loading_end_time = TimePoint { } ;
2019-10-21 13:54:23 +00:00
if ( enable_async_loading )
2019-06-02 12:11:01 +00:00
{
/// Put a job to the thread pool for the loading.
auto thread = ThreadFromGlobalPool { & LoadingDispatcher : : doLoading , this , name , loading_id , true } ;
loading_ids . try_emplace ( loading_id , std : : move ( thread ) ) ;
2017-10-06 10:31:06 +00:00
}
2019-06-02 12:11:01 +00:00
else
2017-10-06 10:31:06 +00:00
{
2019-06-02 12:11:01 +00:00
/// Perform the loading immediately.
doLoading ( name , loading_id , false ) ;
2018-12-30 01:09:06 +00:00
}
2019-06-02 12:11:01 +00:00
}
2018-12-30 01:09:06 +00:00
2019-10-22 12:57:58 +00:00
/// Load one object, returns object ptr or exception
/// Do not require locking
2017-10-06 10:31:06 +00:00
2019-10-22 12:57:58 +00:00
std : : pair < LoadablePtr , std : : exception_ptr > loadOneObject (
const String & name ,
const ObjectConfig & config ,
LoadablePtr previous_version )
{
2019-06-02 12:11:01 +00:00
LoadablePtr new_object ;
std : : exception_ptr new_exception ;
2017-10-06 10:31:06 +00:00
try
{
2019-12-04 23:53:06 +00:00
new_object = create_object ( name , config , previous_version ) ;
2017-10-06 10:31:06 +00:00
}
catch ( . . . )
{
2019-06-02 12:11:01 +00:00
new_exception = std : : current_exception ( ) ;
}
2019-10-22 12:57:58 +00:00
return std : : make_pair ( new_object , new_exception ) ;
2019-06-02 12:11:01 +00:00
2019-10-22 12:57:58 +00:00
}
2017-10-06 10:31:06 +00:00
2019-10-22 12:57:58 +00:00
/// Return single object info, checks loading_id and name
std : : optional < Info > getSingleObjectInfo ( const String & name , size_t loading_id , bool async )
{
LoadingGuardForAsyncLoad lock ( async , mutex ) ;
Info * info = getInfo ( name ) ;
if ( ! info | | ! info - > loading ( ) | | ( info - > loading_id ! = loading_id ) )
return { } ;
2019-10-17 17:36:53 +00:00
2019-10-22 12:57:58 +00:00
return * info ;
}
2019-10-22 13:41:17 +00:00
/// Removes the entry for `loading_id` from `loading_ids` if it is present, detaching its thread first.
/// Does nothing otherwise.
/// The unnamed guard parameter is not used here; presumably it is there to make callers
/// construct a LoadingGuardForAsyncLoad (i.e. hold the lock) before calling.
void finishObjectLoading(size_t loading_id, const LoadingGuardForAsyncLoad &)
{
    if (auto found = loading_ids.find(loading_id); found != loading_ids.end())
    {
        found->second.detach();
        loading_ids.erase(found);
    }
}
2019-10-22 12:57:58 +00:00
/// Stores the outcome of one loading attempt into the info of the object `name`.
/// Calculates the next update time, logs `new_exception` (if it is set), updates the info
/// and finally removes `loading_id` from `loading_ids` via finishObjectLoading().
/// The outcome is dropped if the info no longer exists, is not loading,
/// or is loading under a different loading ID.
/// Locks `mutex` only if `async` is true.
void processLoadResult (
const String & name ,
size_t loading_id ,
LoadablePtr previous_version ,
LoadablePtr new_object ,
std : : exception_ptr new_exception ,
size_t error_count ,
bool async )
{
LoadingGuardForAsyncLoad lock ( async , mutex ) ;
2019-06-02 12:11:01 +00:00
/// Calculate a new update time.
TimePoint next_update_time ;
try
{
/// A failure increases the error counter, a success resets it.
if ( new_exception )
+ + error_count ;
else
error_count = 0 ;
2019-10-17 17:36:53 +00:00
2019-10-23 13:02:40 +00:00
/// The new object is used for the calculation if there is one, otherwise the previous version.
LoadablePtr object = previous_version ;
if ( new_object )
object = new_object ;
next_update_time = calculateNextUpdateTime ( object , error_count ) ;
2019-06-02 12:11:01 +00:00
}
catch ( . . . )
{
/// If the calculation fails, TimePoint::max() is stored as the next update time.
tryLogCurrentException ( log , " Cannot find out when the " + type_name + " ' " + name + " ' should be updated " ) ;
next_update_time = TimePoint : : max ( ) ;
2017-10-06 10:31:06 +00:00
}
2018-08-10 01:41:54 +00:00
2019-10-23 09:27:34 +00:00
2019-10-22 12:57:58 +00:00
Info * info = getInfo ( name ) ;
2019-06-02 12:11:01 +00:00
/// And again we should check if this is still the same loading as we were doing.
/// This is necessary because the object could be removed or load with another config while the `mutex` was unlocked.
/// NOTE(review): this early return does not call finishObjectLoading(), so the entry for `loading_id` stays in `loading_ids`.
if ( ! info | | ! info - > loading ( ) | | ( info - > loading_id ! = loading_id ) )
return ;
2019-10-23 09:40:09 +00:00
if ( new_exception )
2019-04-10 18:36:52 +00:00
{
2019-06-02 12:11:01 +00:00
/// Builds the tail of the log message; it is empty if the next update time is TimePoint::max().
auto next_update_time_description = [ next_update_time ]
{
if ( next_update_time = = TimePoint : : max ( ) )
return String ( ) ;
2019-11-09 19:14:51 +00:00
return " , next update is scheduled at " + ext : : to_string ( next_update_time ) ;
2019-06-02 12:11:01 +00:00
} ;
if ( previous_version )
tryLogException ( new_exception , log , " Could not update " + type_name + " ' " + name + " ' "
" , leaving the previous version " + next_update_time_description ( ) ) ;
else
tryLogException ( new_exception , log , " Could not load " + type_name + " ' " + name + " ' " + next_update_time_description ( ) ) ;
2019-04-10 18:36:52 +00:00
}
2019-06-02 12:11:01 +00:00
/// The stored object is replaced only on success; after a failure the old object is kept.
if ( new_object )
info - > object = new_object ;
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
info - > exception = new_exception ;
info - > error_count = error_count ;
info - > loading_end_time = std : : chrono : : system_clock : : now ( ) ;
/// Resetting the loading ID to zero marks the loading as finished.
info - > loading_id = 0 ;
info - > next_update_time = next_update_time ;
2019-04-10 18:36:52 +00:00
2019-06-02 12:11:01 +00:00
info - > forced_to_reload = false ;
/// `config_changed` is cleared only if a new object has been created.
if ( new_object )
info - > config_changed = false ;
2019-10-22 13:41:17 +00:00
finishObjectLoading ( loading_id , lock ) ;
2019-10-22 12:57:58 +00:00
}
2019-04-15 19:53:46 +00:00
2019-10-22 12:57:58 +00:00
/// Does the loading, possibly in the separate thread.
void doLoading ( const String & name , size_t loading_id , bool async )
{
2019-10-22 13:41:17 +00:00
try
{
/// We check here if this is exactly the same loading as we planned to perform.
/// This check is necessary because the object could be removed or load with another config before this thread even starts.
std : : optional < Info > info = getSingleObjectInfo ( name , loading_id , async ) ;
if ( ! info )
return ;
2019-10-22 12:57:58 +00:00
2019-10-22 13:41:17 +00:00
/// Use `create_function` to perform the actual loading.
/// It's much better to do it with `mutex` unlocked because the loading can take a lot of time
/// and require access to other objects.
2019-12-04 23:53:06 +00:00
bool need_complete_loading = ! info - > object | | info - > config_changed | | info - > forced_to_reload ;
auto [ new_object , new_exception ] = loadOneObject ( name , info - > object_config , need_complete_loading ? nullptr : info - > object ) ;
2019-10-22 13:41:17 +00:00
if ( ! new_object & & ! new_exception )
throw Exception ( " No object created and no exception raised for " + type_name , ErrorCodes : : LOGICAL_ERROR ) ;
2019-10-22 12:57:58 +00:00
2019-10-22 13:41:17 +00:00
processLoadResult ( name , loading_id , info - > object , new_object , new_exception , info - > error_count , async ) ;
event . notify_all ( ) ;
}
catch ( . . . )
{
LoadingGuardForAsyncLoad lock ( async , mutex ) ;
finishObjectLoading ( loading_id , lock ) ;
throw ;
}
2019-06-02 12:11:01 +00:00
}
void cancelLoading ( const String & name )
2017-10-06 10:31:06 +00:00
{
2019-06-02 12:11:01 +00:00
Info * info = getInfo ( name ) ;
if ( info )
cancelLoading ( * info ) ;
2017-10-06 10:31:06 +00:00
}
2019-06-02 12:11:01 +00:00
/// Marks `info` as not loading anymore; does nothing if it is not loading.
void cancelLoading(Info & info)
{
    if (info.loading())
    {
        /// In fact we cannot actually CANCEL the loading (because it's possibly already being performed in another thread).
        /// But we can reset the `loading_id` and doLoading() will understand it as a signal to stop loading.
        info.loading_id = 0;
        info.loading_end_time = std::chrono::system_clock::now();
    }
}
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
/// Checks that `info` describes a loaded object.
/// Returns normally if it is loaded; throws BAD_ARGUMENTS if it is still loading;
/// rethrows the stored exception if it has failed. In any other state it returns normally as well.
void checkLoaded ( const String & name , const Info & info )
{
if ( info . loaded ( ) )
return ;
if ( info . loading ( ) )
throw Exception ( type_name + " ' " + name + " ' is still loading. " , ErrorCodes : : BAD_ARGUMENTS ) ;
if ( info . failed ( ) )
std : : rethrow_exception ( info . exception ) ;
}
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
/// Filter by name which matches everything.
2019-09-27 12:36:54 +00:00
static bool allNames ( const String & ) { return true ; }
/// Calculates the time of the next update for `loaded_object`.
/// Returns TimePoint::max() ("never") if the object does not support updates or both bounds of its lifetime are zero.
/// With `error_count == 0` the result is now plus a random number of seconds within the lifetime bounds;
/// otherwise (or if there is no object) it is now plus the duration given by calculateDurationWithBackoff().
/// NOTE(review): `rnd_engine` is shared between calls, so concurrent calls need external synchronization - confirm callers hold `mutex`.
TimePoint calculateNextUpdateTime(const LoadablePtr & loaded_object, size_t error_count) const
{
    static constexpr auto never = TimePoint::max();

    if (loaded_object)
    {
        if (!loaded_object->supportUpdates())
            return never;

        /// do not update loadable objects with zero as lifetime
        const auto & lifetime = loaded_object->getLifetime();
        if (lifetime.min_sec == 0 && lifetime.max_sec == 0)
            return never;

        if (!error_count)
        {
            /// std::uniform_int_distribution requires min <= max (otherwise the behaviour is undefined),
            /// so an inverted lifetime is put in order first.
            UInt64 lower_sec = lifetime.min_sec;
            UInt64 upper_sec = lifetime.max_sec;
            if (upper_sec < lower_sec)
                std::swap(lower_sec, upper_sec);

            std::uniform_int_distribution<UInt64> distribution{lower_sec, upper_sec};
            return std::chrono::system_clock::now() + std::chrono::seconds{distribution(rnd_engine)};
        }
    }

    return std::chrono::system_clock::now() + std::chrono::seconds(calculateDurationWithBackoff(rnd_engine, error_count));
}
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
const CreateObjectFunction create_object ;
const String type_name ;
Logger * log ;
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
mutable std : : mutex mutex ;
std : : condition_variable event ;
2019-09-30 11:18:01 +00:00
ObjectConfigsPtr configs ;
2019-06-02 12:11:01 +00:00
std : : unordered_map < String , Info > infos ;
bool always_load_everything = false ;
2019-10-18 15:44:32 +00:00
std : : atomic < bool > enable_async_loading = false ;
2019-06-02 12:11:01 +00:00
std : : unordered_map < size_t , ThreadFromGlobalPool > loading_ids ;
size_t next_loading_id = 1 ; /// should always be > 0
2019-09-27 12:36:54 +00:00
mutable pcg64 rnd_engine { randomSeed ( ) } ;
2019-06-02 12:11:01 +00:00
} ;
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
/// Runs a background thread which periodically re-reads the configuration
/// and asks the loading dispatcher to reload outdated objects.
class ExternalLoader : : PeriodicUpdater : private boost : : noncopyable
{
public :
2019-08-30 09:50:38 +00:00
/// Period (in seconds) between two consecutive checks done by the background thread.
static constexpr UInt64 check_period_sec = 5 ;
2019-09-30 16:12:08 +00:00
/// Keeps references to the config reader and the loading dispatcher; does not start the thread.
PeriodicUpdater ( LoadablesConfigReader & config_files_reader_ , LoadingDispatcher & loading_dispatcher_ )
2019-06-02 12:11:01 +00:00
: config_files_reader ( config_files_reader_ ) , loading_dispatcher ( loading_dispatcher_ )
{
}
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
/// Stops the background thread (if it is running) and waits for it to finish.
~ PeriodicUpdater ( ) { enable ( false ) ; }
2017-10-06 10:31:06 +00:00
2019-08-30 09:50:38 +00:00
/// Starts (`enable_ == true`) or stops (`enable_ == false`) the periodic updates.
/// Starting does nothing if the thread is already running;
/// stopping wakes the thread up and blocks until it has finished.
void enable ( bool enable_ )
2019-06-02 12:11:01 +00:00
{
std : : unique_lock lock { mutex } ;
enabled = enable_ ;
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
if ( enable_ )
{
if ( ! thread . joinable ( ) )
{
/// Starts the thread which will do periodic updates.
thread = ThreadFromGlobalPool { & PeriodicUpdater : : doPeriodicUpdates , this } ;
}
}
else
{
if ( thread . joinable ( ) )
{
/// Wait for the thread to finish.
/// The thread is moved out and the lock is released before joining,
/// because the thread itself needs `mutex` to notice that `enabled` is false.
auto temp_thread = std : : move ( thread ) ;
lock . unlock ( ) ;
event . notify_one ( ) ;
temp_thread . join ( ) ;
2019-04-10 18:36:52 +00:00
}
}
}
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
private :
/// Body of the background thread: every `check_period_sec` seconds re-reads the configuration,
/// passes it to the dispatcher and reloads outdated objects, until `enabled` becomes false.
void doPeriodicUpdates ( )
{
setThreadName ( " ExterLdrReload " ) ;
std : : unique_lock lock { mutex } ;
auto pred = [ this ] { return ! enabled ; } ;
2019-08-30 09:50:38 +00:00
/// wait_for() returns the value of `pred`, so the loop ends as soon as the updater is disabled.
while ( ! event . wait_for ( lock , std : : chrono : : seconds ( check_period_sec ) , pred ) )
2019-06-02 12:11:01 +00:00
{
/// The update itself is done with `mutex` unlocked.
lock . unlock ( ) ;
loading_dispatcher . setConfiguration ( config_files_reader . read ( ) ) ;
loading_dispatcher . reloadOutdated ( ) ;
lock . lock ( ) ;
}
}
2019-04-15 19:53:46 +00:00
2019-09-30 16:12:08 +00:00
LoadablesConfigReader & config_files_reader ;
2019-06-02 12:11:01 +00:00
LoadingDispatcher & loading_dispatcher ;
/// Protects `enabled` and `thread`.
mutable std : : mutex mutex ;
bool enabled = false ;
ThreadFromGlobalPool thread ;
/// Used to wake the background thread up before its period has elapsed.
std : : condition_variable event ;
} ;
2019-09-30 16:12:08 +00:00
ExternalLoader : : ExternalLoader ( const String & type_name_ , Logger * log )
: config_files_reader ( std : : make_unique < LoadablesConfigReader > ( type_name_ , log ) )
2019-06-02 12:11:01 +00:00
, loading_dispatcher ( std : : make_unique < LoadingDispatcher > (
2019-12-04 23:53:06 +00:00
std : : bind ( & ExternalLoader : : createObject , this , std : : placeholders : : _1 , std : : placeholders : : _2 , std : : placeholders : : _3 ) ,
2019-06-02 12:11:01 +00:00
type_name_ ,
log ) )
, periodic_updater ( std : : make_unique < PeriodicUpdater > ( * config_files_reader , * loading_dispatcher ) )
, type_name ( type_name_ )
{
2019-04-15 19:53:46 +00:00
}
2019-06-02 12:11:01 +00:00
/// Defaulted destructor, defined out of line in this file
/// (presumably because the helper classes held by the loader are complete types only here).
ExternalLoader : : ~ ExternalLoader ( ) = default ;
2019-04-15 19:53:46 +00:00
2019-06-02 12:11:01 +00:00
void ExternalLoader : : addConfigRepository (
2019-10-15 18:04:17 +00:00
const std : : string & repository_name ,
std : : unique_ptr < IExternalLoaderConfigRepository > config_repository ,
const ExternalLoaderConfigSettings & config_settings )
{
config_files_reader - > addConfigRepository ( repository_name , std : : move ( config_repository ) , config_settings ) ;
loading_dispatcher - > setConfiguration ( config_files_reader - > read ( ) ) ;
}
void ExternalLoader : : removeConfigRepository ( const std : : string & repository_name )
2019-04-15 19:53:46 +00:00
{
2019-10-15 18:04:17 +00:00
config_files_reader - > removeConfigRepository ( repository_name ) ;
2019-06-02 12:11:01 +00:00
}
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
void ExternalLoader : : enableAlwaysLoadEverything ( bool enable )
{
loading_dispatcher - > enableAlwaysLoadEverything ( enable ) ;
}
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
void ExternalLoader : : enableAsyncLoading ( bool enable )
{
loading_dispatcher - > enableAsyncLoading ( enable ) ;
2019-04-10 18:36:52 +00:00
}
2017-10-06 10:31:06 +00:00
2019-08-30 09:50:38 +00:00
void ExternalLoader : : enablePeriodicUpdates ( bool enable_ )
2019-06-02 12:11:01 +00:00
{
2019-08-30 09:50:38 +00:00
periodic_updater - > enable ( enable_ ) ;
2019-06-02 12:11:01 +00:00
}
2017-10-06 10:31:06 +00:00
2019-06-29 17:27:32 +00:00
bool ExternalLoader : : hasCurrentlyLoadedObjects ( ) const
2019-06-02 12:11:01 +00:00
{
2019-06-29 17:27:32 +00:00
return loading_dispatcher - > hasCurrentlyLoadedObjects ( ) ;
2019-06-02 12:11:01 +00:00
}
2019-04-10 18:36:52 +00:00
2019-06-02 12:11:01 +00:00
/// Returns the current status of the object `name` as reported by the loading dispatcher.
ExternalLoader::Status ExternalLoader::getCurrentStatus(const String & name) const
{
    return loading_dispatcher->getCurrentStatus(name);
}
2019-06-02 12:11:01 +00:00
/// Returns the current load result of the object `name` as reported by the loading dispatcher.
ExternalLoader::LoadResult ExternalLoader::getCurrentLoadResult(const String & name) const
{
    return loading_dispatcher->getCurrentLoadResult(name);
}
2019-04-10 18:36:52 +00:00
2019-06-02 12:11:01 +00:00
/// Returns the current load results from the loading dispatcher, without a name filter.
ExternalLoader::LoadResults ExternalLoader::getCurrentLoadResults() const
{
    return loading_dispatcher->getCurrentLoadResults();
}
2019-04-15 19:53:46 +00:00
2019-06-02 12:11:01 +00:00
/// Returns the current load results from the loading dispatcher, restricted by `filter_by_name`.
ExternalLoader::LoadResults ExternalLoader::getCurrentLoadResults(const FilterByNameFunction & filter_by_name) const
{
    return loading_dispatcher->getCurrentLoadResults(filter_by_name);
}
2019-04-10 18:36:52 +00:00
2019-06-02 12:11:01 +00:00
/// Returns the currently loaded objects from the loading dispatcher, without a name filter.
ExternalLoader::Loadables ExternalLoader::getCurrentlyLoadedObjects() const
{
    return loading_dispatcher->getCurrentlyLoadedObjects();
}
2019-04-10 18:36:52 +00:00
2019-06-02 12:11:01 +00:00
/// Returns the currently loaded objects from the loading dispatcher, restricted by `filter_by_name`.
ExternalLoader::Loadables ExternalLoader::getCurrentlyLoadedObjects(const FilterByNameFunction & filter_by_name) const
{
    return loading_dispatcher->getCurrentlyLoadedObjects(filter_by_name);
}
2019-04-10 18:36:52 +00:00
2019-06-02 12:11:01 +00:00
size_t ExternalLoader : : getNumberOfCurrentlyLoadedObjects ( ) const
{
return loading_dispatcher - > getNumberOfCurrentlyLoadedObjects ( ) ;
}
2019-04-10 18:36:52 +00:00
2019-06-02 12:11:01 +00:00
void ExternalLoader : : load ( const String & name , LoadablePtr & loaded_object , Duration timeout ) const
{
loading_dispatcher - > load ( name , loaded_object , timeout ) ;
}
2019-04-10 18:36:52 +00:00
2019-06-02 12:11:01 +00:00
void ExternalLoader : : loadStrict ( const String & name , LoadablePtr & loaded_object ) const
{
loading_dispatcher - > loadStrict ( name , loaded_object ) ;
}
2019-04-10 18:36:52 +00:00
2019-06-02 12:11:01 +00:00
void ExternalLoader : : load ( const FilterByNameFunction & filter_by_name , Loadables & loaded_objects , Duration timeout ) const
{
if ( filter_by_name )
loading_dispatcher - > load ( filter_by_name , loaded_objects , timeout ) ;
else
loading_dispatcher - > load ( loaded_objects , timeout ) ;
}
2019-04-10 18:36:52 +00:00
2019-10-17 13:05:12 +00:00
void ExternalLoader : : load ( const FilterByNameFunction & filter_by_name , LoadResults & loaded_objects , Duration timeout ) const
{
if ( filter_by_name )
loading_dispatcher - > load ( filter_by_name , loaded_objects , timeout ) ;
else
loading_dispatcher - > load ( loaded_objects , timeout ) ;
}
2019-06-02 12:11:01 +00:00
void ExternalLoader : : load ( Loadables & loaded_objects , Duration timeout ) const
{
return loading_dispatcher - > load ( loaded_objects , timeout ) ;
}
2019-04-10 18:36:52 +00:00
2019-10-21 13:54:23 +00:00
void ExternalLoader : : reload ( const String & name , bool load_never_loading ) const
2019-06-02 12:11:01 +00:00
{
2019-10-18 15:44:32 +00:00
auto configs = config_files_reader - > read ( ) ;
loading_dispatcher - > setConfiguration ( configs ) ;
2019-10-21 13:54:23 +00:00
loading_dispatcher - > reload ( name , load_never_loading ) ;
2017-10-06 10:31:06 +00:00
}
2019-10-15 18:04:17 +00:00
void ExternalLoader : : reload ( bool load_never_loading ) const
2017-10-06 10:31:06 +00:00
{
2019-06-02 12:11:01 +00:00
loading_dispatcher - > setConfiguration ( config_files_reader - > read ( ) ) ;
loading_dispatcher - > reload ( load_never_loading ) ;
}
2017-10-06 10:31:06 +00:00
2019-11-25 22:48:23 +00:00
void ExternalLoader : : reload ( const FilterByNameFunction & filter_by_name , bool load_never_loading ) const
{
loading_dispatcher - > setConfiguration ( config_files_reader - > read ( ) ) ;
loading_dispatcher - > reload ( filter_by_name , load_never_loading ) ;
}
2019-10-21 13:54:23 +00:00
void ExternalLoader : : addObjectAndLoad (
2019-10-18 15:44:32 +00:00
const String & name ,
const String & external_name ,
const String & repo_name ,
const Poco : : AutoPtr < Poco : : Util : : AbstractConfiguration > & config ,
const String & key ,
2019-10-21 13:54:23 +00:00
bool load_never_loading ) const
2019-10-18 15:44:32 +00:00
{
2019-10-21 13:54:23 +00:00
auto object_config = config_files_reader - > updateLoadableInfo ( external_name , name , repo_name , config , key ) ;
loading_dispatcher - > setSingleObjectConfigurationWithoutLoading ( external_name , object_config ) ;
LoadablePtr loaded_object ;
if ( load_never_loading )
loading_dispatcher - > loadStrict ( name , loaded_object ) ;
else
loading_dispatcher - > load ( name , loaded_object , Duration : : zero ( ) ) ;
2019-10-18 15:44:32 +00:00
}
2019-07-17 08:39:36 +00:00
/// Produces an object for `name`: a clone of `previous_version` if one is given,
/// otherwise a new object made by create() from the object's configuration.
ExternalLoader::LoadablePtr ExternalLoader::createObject(
    const String & name, const ObjectConfig & config, const LoadablePtr & previous_version) const
{
    if (!previous_version)
        return create(name, *config.config, config.key_in_config);

    return previous_version->clone();
}
2017-10-06 10:31:06 +00:00
2019-06-02 12:11:01 +00:00
std : : vector < std : : pair < String , Int8 > > ExternalLoader : : getStatusEnumAllPossibleValues ( )
2018-03-23 19:56:24 +00:00
{
2019-06-02 12:11:01 +00:00
return std : : vector < std : : pair < String , Int8 > > {
{ toString ( Status : : NOT_LOADED ) , static_cast < Int8 > ( Status : : NOT_LOADED ) } ,
{ toString ( Status : : LOADED ) , static_cast < Int8 > ( Status : : LOADED ) } ,
{ toString ( Status : : FAILED ) , static_cast < Int8 > ( Status : : FAILED ) } ,
{ toString ( Status : : LOADING ) , static_cast < Int8 > ( Status : : LOADING ) } ,
{ toString ( Status : : LOADED_AND_RELOADING ) , static_cast < Int8 > ( Status : : LOADED_AND_RELOADING ) } ,
{ toString ( Status : : FAILED_AND_RELOADING ) , static_cast < Int8 > ( Status : : FAILED_AND_RELOADING ) } ,
{ toString ( Status : : NOT_EXIST ) , static_cast < Int8 > ( Status : : NOT_EXIST ) } ,
} ;
2018-03-23 19:56:24 +00:00
}
2019-06-02 12:11:01 +00:00
/// Returns the textual name of `status`.
/// Every case of the switch returns, so the end of the function is marked as unreachable.
String toString ( ExternalLoader : : Status status )
2018-03-23 19:56:24 +00:00
{
2019-06-02 12:11:01 +00:00
using Status = ExternalLoader : : Status ;
switch ( status )
{
case Status : : NOT_LOADED : return " NOT_LOADED " ;
case Status : : LOADED : return " LOADED " ;
case Status : : FAILED : return " FAILED " ;
case Status : : LOADING : return " LOADING " ;
case Status : : FAILED_AND_RELOADING : return " FAILED_AND_RELOADING " ;
case Status : : LOADED_AND_RELOADING : return " LOADED_AND_RELOADING " ;
case Status : : NOT_EXIST : return " NOT_EXIST " ;
}
__builtin_unreachable ( ) ;
2018-03-23 19:56:24 +00:00
}
2019-06-02 12:11:01 +00:00
std : : ostream & operator < < ( std : : ostream & out , ExternalLoader : : Status status )
2017-10-06 11:10:01 +00:00
{
2019-06-02 12:11:01 +00:00
return out < < toString ( status ) ;
2017-10-06 11:10:01 +00:00
}
2017-10-06 10:31:06 +00:00
}