2021-11-16 12:01:30 +00:00
# include <algorithm>
2021-11-17 11:32:20 +00:00
# include <atomic>
2021-11-16 12:01:30 +00:00
# include <chrono>
2021-11-17 11:32:20 +00:00
# include <condition_variable>
2021-12-07 10:35:42 +00:00
# include <mutex>
2021-11-17 11:32:20 +00:00
# include <unordered_map>
2021-11-16 12:01:30 +00:00
# include <unordered_set>
2021-11-15 14:52:52 +00:00
# include <base/getFQDNOrHostName.h>
2023-08-31 11:34:25 +00:00
# include <Common/Config/ConfigHelper.h>
2021-11-17 11:32:20 +00:00
# include <Common/Exception.h>
2023-08-31 11:34:25 +00:00
# include <Common/FailPoint.h>
# include <Common/logger_useful.h>
# include <Common/setThreadName.h>
2024-05-19 08:02:06 +00:00
# include <Common/StringUtils.h>
2023-08-31 11:34:25 +00:00
# include <Common/thread_local_rng.h>
2021-11-11 09:03:53 +00:00
# include <Common/ZooKeeper/Types.h>
2021-11-15 14:52:52 +00:00
2021-11-11 09:03:53 +00:00
# include <Core/ServerUUID.h>
# include <Interpreters/Cluster.h>
# include <Interpreters/ClusterDiscovery.h>
# include <Interpreters/Context.h>
2021-11-17 14:16:49 +00:00
# include <Poco/Exception.h>
# include <Poco/JSON/JSON.h>
# include <Poco/JSON/Object.h>
# include <Poco/JSON/Parser.h>
2021-11-11 09:03:53 +00:00
namespace DB
{
2021-11-17 11:32:20 +00:00
/// Error codes thrown from this file; the definitions live elsewhere in the project.
namespace ErrorCodes
{
    extern const int KEEPER_EXCEPTION;
    extern const int LOGICAL_ERROR;
    extern const int NO_ELEMENTS_IN_CONFIG;
}
2021-11-15 14:52:52 +00:00
2023-08-31 11:34:25 +00:00
/// Failpoint used by initialUpdate() to inject ZooKeeper errors; defined elsewhere in the project.
namespace FailPoints
{
    extern const char cluster_discovery_faults[];
}
2021-11-17 11:32:20 +00:00
namespace
{
2021-11-15 14:52:52 +00:00
2021-11-16 13:39:54 +00:00
fs : : path getShardsListPath ( const String & zk_root )
2021-11-15 14:52:52 +00:00
{
2021-11-16 13:39:54 +00:00
return fs : : path ( zk_root + " /shards " ) ;
2021-11-15 14:52:52 +00:00
}
}
2021-11-17 11:32:20 +00:00
/*
* Holds boolean flags for fixed set of keys .
* Flags can be concurrently set from different threads , and consumer can wait for it .
*/
template < typename T >
class ClusterDiscovery : : ConcurrentFlags
{
public :
template < typename It >
ConcurrentFlags ( It begin , It end )
{
for ( auto it = begin ; it ! = end ; + + it )
flags . emplace ( * it , false ) ;
}
2021-11-18 08:57:26 +00:00
void set ( const T & key )
2021-11-17 11:32:20 +00:00
{
2021-12-13 11:40:07 +00:00
auto it = flags . find ( key ) ;
if ( it = = flags . end ( ) )
throw DB : : Exception ( ErrorCodes : : LOGICAL_ERROR , " Unknown value '{}' " , key ) ;
it - > second = true ;
2021-12-07 10:35:42 +00:00
any_need_update = true ;
cv . notify_one ( ) ;
}
2021-11-17 11:32:20 +00:00
2021-12-07 10:35:42 +00:00
/// waits unit at least one flag is set
/// caller should handle all set flags (or set it again manually)
2021-12-13 11:40:07 +00:00
/// note: keys of returen map should not be changed!
2021-12-20 15:57:35 +00:00
/// @param finished - output parameter indicates that stop() was called
std : : unordered_map < T , std : : atomic_bool > & wait ( std : : chrono : : milliseconds timeout , bool & finished )
2021-11-17 11:32:20 +00:00
{
std : : unique_lock < std : : mutex > lk ( mu ) ;
2021-12-20 15:57:35 +00:00
cv . wait_for ( lk , timeout , [ this ] ( ) - > bool { return any_need_update | | stop_flag ; } ) ;
finished = stop_flag ;
2021-12-07 10:35:42 +00:00
/// all set flags expected to be handled by caller
any_need_update = false ;
return flags ;
2021-11-17 11:32:20 +00:00
}
2021-12-20 15:57:35 +00:00
void stop ( )
{
std : : unique_lock < std : : mutex > lk ( mu ) ;
stop_flag = true ;
cv . notify_one ( ) ;
}
2021-11-18 08:57:26 +00:00
2021-12-07 10:35:42 +00:00
private :
2021-11-17 11:32:20 +00:00
std : : condition_variable cv ;
std : : mutex mu ;
2021-12-07 10:35:42 +00:00
/// flag indicates that update is required
2021-11-17 11:32:20 +00:00
std : : unordered_map < T , std : : atomic_bool > flags ;
2021-12-07 10:35:42 +00:00
std : : atomic_bool any_need_update = true ;
2021-12-20 15:57:35 +00:00
bool stop_flag = false ;
2021-11-17 11:32:20 +00:00
} ;
2021-11-11 09:03:53 +00:00
/// Reads every `<config_prefix>.<name>.discovery` section and creates a ClusterInfo for each cluster.
/// Entries without a `discovery` section are ignored.
/// Throws NO_ELEMENTS_IN_CONFIG if a cluster's ZooKeeper path is empty, or if both
/// 'password' and 'secret' are specified for the same cluster.
ClusterDiscovery::ClusterDiscovery(
    const Poco::Util::AbstractConfiguration & config,
    ContextPtr context_,
    const String & config_prefix)
    : context(Context::createCopy(context_))
    , current_node_name(toString(ServerUUID::get()))
    , log(getLogger("ClusterDiscovery"))
{
    LOG_DEBUG(log, "Cluster discovery is enabled");

    Poco::Util::AbstractConfiguration::Keys candidate_names;
    config.keys(config_prefix, candidate_names);

    for (const auto & cluster_name : candidate_names)
    {
        const String discovery_prefix = config_prefix + "." + cluster_name + ".discovery";
        if (!config.has(discovery_prefix))
            continue;

        const String zk_root = config.getString(discovery_prefix + ".path");
        if (zk_root.empty())
            throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "ZooKeeper path for cluster '{}' is empty", cluster_name);

        const String password = config.getString(discovery_prefix + ".password", "");
        const String cluster_secret = config.getString(discovery_prefix + ".secret", "");
        if (!password.empty() && !cluster_secret.empty())
            throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Both 'password' and 'secret' are specified for cluster '{}', only one option can be used at the same time", cluster_name);

        /// NOTE(review): the plain TCP port is passed even when `secure` is enabled, while makeCluster()
        /// uses the secure port as the default port for secure clusters - confirm this is intended.
        clusters_info.emplace(
            cluster_name,
            ClusterInfo(
                /* name_= */ cluster_name,
                /* zk_root_= */ zk_root,
                /* host_name= */ config.getString(discovery_prefix + ".my_hostname", getFQDNOrHostName()),
                /* username= */ config.getString(discovery_prefix + ".user", context->getUserName()),
                /* password= */ password,
                /* cluster_secret= */ cluster_secret,
                /* port= */ context->getTCPPort(),
                /* secure= */ config.getBool(discovery_prefix + ".secure", false),
                /* shard_id= */ config.getUInt(discovery_prefix + ".shard", 0),
                /* observer_mode= */ ConfigHelper::getBool(config, discovery_prefix + ".observer"),
                /* invisible= */ ConfigHelper::getBool(config, discovery_prefix + ".invisible")));
    }

    std::vector<String> discovered_names;
    discovered_names.reserve(clusters_info.size());
    for (const auto & [discovered_name, _] : clusters_info)
        discovered_names.emplace_back(discovered_name);

    LOG_TRACE(log, "Clusters in discovery mode: {}", fmt::join(discovered_names, ", "));
    clusters_to_update = std::make_shared<UpdateFlags>(discovered_names.begin(), discovered_names.end());
}
2021-11-16 12:01:30 +00:00
/// List node in zookeper for cluster
2021-11-16 09:02:44 +00:00
Strings ClusterDiscovery : : getNodeNames ( zkutil : : ZooKeeperPtr & zk ,
const String & zk_root ,
const String & cluster_name ,
int * version ,
bool set_callback )
{
2023-05-13 00:57:31 +00:00
auto watch_callback = [ cluster_name , my_clusters_to_update = clusters_to_update ] ( auto ) { my_clusters_to_update - > set ( cluster_name ) ; } ;
2021-11-16 09:02:44 +00:00
Coordination : : Stat stat ;
2021-11-16 13:39:54 +00:00
Strings nodes = zk - > getChildrenWatch ( getShardsListPath ( zk_root ) , & stat , set_callback ? watch_callback : Coordination : : WatchCallback { } ) ;
2021-11-16 09:02:44 +00:00
if ( version )
* version = stat . cversion ;
return nodes ;
}
2021-11-17 11:32:20 +00:00
/// Reads node information from specified zookeeper nodes
/// On error returns empty result
2021-11-16 13:39:54 +00:00
ClusterDiscovery : : NodesInfo ClusterDiscovery : : getNodes ( zkutil : : ZooKeeperPtr & zk , const String & zk_root , const Strings & node_uuids )
2021-11-11 09:03:53 +00:00
{
2021-11-16 12:01:30 +00:00
NodesInfo result ;
2021-11-16 13:39:54 +00:00
for ( const auto & node_uuid : node_uuids )
2021-11-11 09:03:53 +00:00
{
2021-11-16 13:39:54 +00:00
String payload ;
2021-11-25 13:45:38 +00:00
bool ok = zk - > tryGet ( getShardsListPath ( zk_root ) / node_uuid , payload ) & &
NodeInfo : : parse ( payload , result [ node_uuid ] ) ;
if ( ! ok )
2021-11-17 14:16:49 +00:00
{
2021-11-25 13:45:38 +00:00
LOG_WARNING ( log , " Can't get data from node '{}' in '{}' " , node_uuid , zk_root ) ;
2021-11-16 13:39:54 +00:00
return { } ;
2021-11-17 14:16:49 +00:00
}
2021-11-11 09:03:53 +00:00
}
return result ;
}
2021-11-16 13:39:54 +00:00
/// Checks if cluster nodes set is changed.
/// Returns true if update required.
2021-11-16 12:01:30 +00:00
/// It performs only shallow check (set of nodes' uuids).
/// So, if node's hostname are changed, then cluster won't be updated.
bool ClusterDiscovery : : needUpdate ( const Strings & node_uuids , const NodesInfo & nodes )
2021-11-15 14:52:52 +00:00
{
2021-11-16 12:01:30 +00:00
bool has_difference = node_uuids . size ( ) ! = nodes . size ( ) | |
std : : any_of ( node_uuids . begin ( ) , node_uuids . end ( ) , [ & nodes ] ( auto u ) { return ! nodes . contains ( u ) ; } ) ;
2021-11-17 13:47:40 +00:00
{
2021-11-25 13:45:38 +00:00
/// Just to log updated nodes, suboptimal, but should be ok for expected update sizes
2021-11-17 13:47:40 +00:00
std : : set < String > new_names ( node_uuids . begin ( ) , node_uuids . end ( ) ) ;
std : : set < String > old_names ;
for ( const auto & [ name , _ ] : nodes )
old_names . emplace ( name ) ;
auto format_cluster_update = [ ] ( const std : : set < String > & s1 , const std : : set < String > & s2 )
{
std : : vector < String > diff ;
std : : set_difference ( s1 . begin ( ) , s1 . end ( ) , s2 . begin ( ) , s2 . end ( ) , std : : back_inserter ( diff ) ) ;
constexpr size_t max_to_show = 3 ;
size_t sz = diff . size ( ) ;
2021-11-25 13:45:38 +00:00
bool need_crop = sz > max_to_show ;
if ( need_crop )
2021-11-17 13:47:40 +00:00
diff . resize ( max_to_show ) ;
2021-11-25 13:45:38 +00:00
if ( sz = = 0 )
return fmt : : format ( " {} nodes " , sz ) ;
2021-11-29 11:25:52 +00:00
return fmt : : format ( " {} node{} [{}{}] " , sz , sz ! = 1 ? " s " : " " , fmt : : join ( diff , " , " ) , need_crop ? " ,... " : " " ) ;
2021-11-17 13:47:40 +00:00
} ;
2021-12-07 09:45:32 +00:00
LOG_DEBUG ( log , " Cluster update: added {}, removed {} " ,
2021-11-17 13:47:40 +00:00
format_cluster_update ( new_names , old_names ) ,
format_cluster_update ( old_names , new_names ) ) ;
}
2021-11-16 12:01:30 +00:00
return has_difference ;
}
2021-11-16 09:02:44 +00:00
2021-11-19 09:42:00 +00:00
/// Builds a Cluster from the nodes currently known for `cluster_info`.
/// Replicas are grouped into shards by `shard_id` (shards ordered by id); nodes whose `secure`
/// setting differs from the current node's are skipped with a warning.
ClusterPtr ClusterDiscovery::makeCluster(const ClusterInfo & cluster_info)
{
    const bool secure = cluster_info.current_node.secure;

    std::map<size_t, Strings> addresses_by_shard;
    for (const auto & [_, node] : cluster_info.nodes_info)
    {
        if (node.secure != secure)
        {
            LOG_WARNING(log, "Node '{}' in cluster '{}' has different 'secure' value, skipping it", node.address, cluster_info.name);
            continue;
        }
        addresses_by_shard[node.shard_id].emplace_back(node.address);
    }

    std::vector<Strings> shards;
    shards.reserve(addresses_by_shard.size());
    for (auto & [_, replicas] : addresses_by_shard)
        shards.emplace_back(std::move(replicas));

    ClusterConnectionParameters params{
        /* username= */ cluster_info.username,
        /* password= */ cluster_info.password,
        /* clickhouse_port= */ secure ? context->getTCPPortSecure().value_or(DBMS_DEFAULT_SECURE_PORT) : context->getTCPPort(),
        /* treat_local_as_remote= */ false,
        /* treat_local_port_as_remote= */ false, /// should be set only for clickhouse-local, but cluster discovery is not used there
        /* secure= */ secure,
        /* priority= */ Priority{1},
        /* cluster_name= */ cluster_info.name,
        /* cluster_secret= */ cluster_info.cluster_secret};

    return std::make_shared<Cluster>(context->getSettingsRef(), shards, params);
}
2021-11-11 09:03:53 +00:00
2022-08-09 17:53:32 +00:00
static bool contains ( const Strings & list , const String & value )
{
return std : : find ( list . begin ( ) , list . end ( ) , value ) ! = list . end ( ) ;
}
2021-11-16 12:01:30 +00:00
/// Reads data from zookeeper and tries to update cluster.
/// Returns true on success (or no update required).
bool ClusterDiscovery : : updateCluster ( ClusterInfo & cluster_info )
{
2021-12-07 09:45:32 +00:00
LOG_DEBUG ( log , " Updating cluster '{}' " , cluster_info . name ) ;
2021-11-16 12:01:30 +00:00
auto zk = context - > getZooKeeper ( ) ;
int start_version ;
Strings node_uuids = getNodeNames ( zk , cluster_info . zk_root , cluster_info . name , & start_version , false ) ;
2021-11-29 11:25:52 +00:00
auto & nodes_info = cluster_info . nodes_info ;
2021-11-16 12:01:30 +00:00
2022-08-17 14:46:06 +00:00
auto on_exit = [ this , start_version , & zk , & cluster_info , & nodes_info ] ( )
{
/// in case of successful update we still need to check if configuration of cluster still valid and also set watch callback
int current_version ;
getNodeNames ( zk , cluster_info . zk_root , cluster_info . name , & current_version , true ) ;
if ( current_version ! = start_version )
{
LOG_DEBUG ( log , " Cluster '{}' configuration changed during update " , cluster_info . name ) ;
nodes_info . clear ( ) ;
return false ;
}
return true ;
} ;
2022-08-09 17:53:32 +00:00
if ( ! cluster_info . current_node_is_observer & & ! contains ( node_uuids , current_node_name ) )
2021-11-16 12:01:30 +00:00
{
2021-11-25 13:45:38 +00:00
LOG_ERROR ( log , " Can't find current node in cluster '{}', will register again " , cluster_info . name ) ;
2021-11-16 12:01:30 +00:00
registerInZk ( zk , cluster_info ) ;
2021-11-29 11:25:52 +00:00
nodes_info . clear ( ) ;
2021-11-16 12:01:30 +00:00
return false ;
}
2023-05-20 04:41:48 +00:00
if ( cluster_info . current_cluster_is_invisible )
{
2024-02-13 23:21:13 +00:00
LOG_DEBUG ( log , " Cluster '{}' is invisible. " , cluster_info . name ) ;
2023-05-20 04:41:48 +00:00
return true ;
}
2021-11-16 12:01:30 +00:00
if ( ! needUpdate ( node_uuids , nodes_info ) )
{
2021-12-07 09:45:32 +00:00
LOG_DEBUG ( log , " No update required for cluster '{}' " , cluster_info . name ) ;
2022-08-17 14:46:06 +00:00
return on_exit ( ) ;
2021-11-16 12:01:30 +00:00
}
nodes_info = getNodes ( zk , cluster_info . zk_root , node_uuids ) ;
if ( nodes_info . empty ( ) )
2021-11-25 13:45:38 +00:00
{
LOG_WARNING ( log , " Can't get nodes info for '{}' " , cluster_info . name ) ;
2021-11-16 12:01:30 +00:00
return false ;
2021-11-25 13:45:38 +00:00
}
2021-11-16 12:01:30 +00:00
2022-08-17 14:46:06 +00:00
if ( bool ok = on_exit ( ) ; ! ok )
2021-11-16 12:01:30 +00:00
return false ;
2021-12-07 09:45:32 +00:00
LOG_DEBUG ( log , " Updating system.clusters record for '{}' with {} nodes " , cluster_info . name , cluster_info . nodes_info . size ( ) ) ;
2021-11-25 13:45:38 +00:00
2021-11-19 09:42:00 +00:00
auto cluster = makeCluster ( cluster_info ) ;
2023-04-14 19:41:26 +00:00
std : : lock_guard lock ( mutex ) ;
cluster_impls [ cluster_info . name ] = cluster ;
2021-11-16 09:02:44 +00:00
return true ;
2021-11-11 09:03:53 +00:00
}
2021-11-16 12:01:30 +00:00
void ClusterDiscovery : : registerInZk ( zkutil : : ZooKeeperPtr & zk , ClusterInfo & info )
{
2022-08-12 11:05:13 +00:00
/// Create root node in observer mode not to get 'No node' error
String node_path = getShardsListPath ( info . zk_root ) / current_node_name ;
zk - > createAncestors ( node_path ) ;
2022-08-09 17:53:32 +00:00
if ( info . current_node_is_observer )
{
LOG_DEBUG ( log , " Current node {} is observer of cluster {} " , current_node_name , info . name ) ;
return ;
}
2021-11-25 13:45:38 +00:00
LOG_DEBUG ( log , " Registering current node {} in cluster {} " , current_node_name , info . name ) ;
2021-11-19 09:42:00 +00:00
2021-11-29 12:35:28 +00:00
zk - > createOrUpdate ( node_path , info . current_node . serialize ( ) , zkutil : : CreateMode : : Ephemeral ) ;
2021-11-25 13:45:38 +00:00
LOG_DEBUG ( log , " Current node {} registered in cluster {} " , current_node_name , info . name ) ;
2021-11-15 14:52:52 +00:00
}
2021-12-27 12:41:09 +00:00
void ClusterDiscovery : : initialUpdate ( )
2021-11-11 09:03:53 +00:00
{
2023-08-31 11:33:01 +00:00
LOG_DEBUG ( log , " Initializing " ) ;
2023-08-31 11:34:25 +00:00
fiu_do_on ( FailPoints : : cluster_discovery_faults ,
{
constexpr UInt8 success_chance = 4 ;
2023-09-01 13:57:17 +00:00
static size_t fail_count = 0 ;
fail_count + + ;
/// strict limit on fail count to avoid flaky tests
2023-09-04 10:54:59 +00:00
auto is_failed = fail_count < success_chance & & std : : uniform_int_distribution < > ( 0 , success_chance ) ( thread_local_rng ) ! = 0 ;
2023-08-31 11:34:25 +00:00
if ( is_failed )
throw Exception ( ErrorCodes : : KEEPER_EXCEPTION , " Failpoint cluster_discovery_faults is triggered " ) ;
} ) ;
2021-11-19 09:42:00 +00:00
auto zk = context - > getZooKeeper ( ) ;
for ( auto & [ _ , info ] : clusters_info )
2021-11-18 08:57:26 +00:00
{
2021-11-19 09:42:00 +00:00
registerInZk ( zk , info ) ;
if ( ! updateCluster ( info ) )
2021-11-18 08:57:26 +00:00
{
2021-12-27 12:41:09 +00:00
LOG_WARNING ( log , " Error on initial cluster '{}' update, will retry in background " , info . name ) ;
2021-11-19 09:42:00 +00:00
clusters_to_update - > set ( info . name ) ;
2021-11-18 08:57:26 +00:00
}
}
2023-08-31 11:33:01 +00:00
LOG_DEBUG ( log , " Initialized " ) ;
is_initialized = true ;
2021-12-27 12:41:09 +00:00
}
void ClusterDiscovery : : start ( )
{
if ( clusters_info . empty ( ) )
{
LOG_DEBUG ( log , " No defined clusters for discovery " ) ;
return ;
}
try
{
initialUpdate ( ) ;
}
catch ( . . . )
{
tryLogCurrentException ( log , " Caught exception in cluster discovery initialization " ) ;
}
2021-11-18 08:57:26 +00:00
2021-12-10 13:23:28 +00:00
using namespace std : : chrono_literals ;
constexpr static std : : chrono : : milliseconds DEFAULT_BACKOFF_TIMEOUT = 10 ms ;
LOG_DEBUG ( log , " Starting working thread " ) ;
2021-11-19 09:42:00 +00:00
main_thread = ThreadFromGlobalPool ( [ this ]
{
2021-12-10 13:23:28 +00:00
std : : chrono : : milliseconds backoff_timeout = DEFAULT_BACKOFF_TIMEOUT ;
2021-12-07 09:20:33 +00:00
bool finish = false ;
while ( ! finish )
2021-11-19 09:42:00 +00:00
{
2021-12-07 09:20:33 +00:00
try
{
2021-12-20 15:43:11 +00:00
finish = runMainThread ( [ & backoff_timeout ] { backoff_timeout = DEFAULT_BACKOFF_TIMEOUT ; } ) ;
2021-12-07 09:20:33 +00:00
}
catch ( . . . )
{
/*
* it can be zk error ( will take new session ) or other retriable error ,
* should not stop discovery forever
*/
tryLogCurrentException ( log , " Caught exception in cluster discovery runMainThread " ) ;
}
2021-12-10 13:23:28 +00:00
std : : this_thread : : sleep_for ( backoff_timeout ) ;
backoff_timeout = std : : min ( backoff_timeout * 2 , std : : chrono : : milliseconds ( 3 min ) ) ;
2021-11-19 09:42:00 +00:00
}
} ) ;
}
2021-12-07 10:35:42 +00:00
/// Returns `true` on graceful shutdown (no restart required)
2021-12-20 15:43:11 +00:00
bool ClusterDiscovery : : runMainThread ( std : : function < void ( ) > up_to_date_callback )
2021-11-19 09:42:00 +00:00
{
setThreadName ( " ClusterDiscover " ) ;
LOG_DEBUG ( log , " Worker thread started " ) ;
2021-11-16 12:01:30 +00:00
using namespace std : : chrono_literals ;
2021-11-15 14:52:52 +00:00
2021-12-07 12:51:27 +00:00
constexpr auto force_update_interval = 2 min ;
2023-08-31 11:33:01 +00:00
if ( ! is_initialized )
initialUpdate ( ) ;
2021-12-20 15:57:35 +00:00
bool finished = false ;
while ( ! finished )
2021-11-15 14:52:52 +00:00
{
2021-12-20 15:43:11 +00:00
bool all_up_to_date = true ;
2021-12-20 15:57:35 +00:00
auto & clusters = clusters_to_update - > wait ( 5 s , finished ) ;
2021-12-07 10:35:42 +00:00
for ( auto & [ cluster_name , need_update ] : clusters )
2021-11-15 14:52:52 +00:00
{
2021-12-07 12:51:27 +00:00
auto cluster_info_it = clusters_info . find ( cluster_name ) ;
if ( cluster_info_it = = clusters_info . end ( ) )
{
LOG_ERROR ( log , " Unknown cluster '{}' " , cluster_name ) ;
2021-11-17 11:32:20 +00:00
continue ;
2021-12-07 12:51:27 +00:00
}
auto & cluster_info = cluster_info_it - > second ;
if ( ! need_update . exchange ( false ) )
{
/// force updating periodically
bool force_update = cluster_info . watch . elapsedSeconds ( ) > std : : chrono : : seconds ( force_update_interval ) . count ( ) ;
if ( ! force_update )
continue ;
}
2021-11-25 13:45:38 +00:00
2021-12-07 12:51:27 +00:00
if ( updateCluster ( cluster_info ) )
2021-11-25 13:45:38 +00:00
{
2021-12-07 12:51:27 +00:00
cluster_info . watch . restart ( ) ;
2021-11-25 13:45:38 +00:00
LOG_DEBUG ( log , " Cluster '{}' updated successfully " , cluster_name ) ;
}
else
{
2021-12-20 15:43:11 +00:00
all_up_to_date = false ;
2021-12-13 11:40:07 +00:00
/// no need to trigger convar, will retry after timeout in `wait`
need_update = true ;
2021-12-07 10:35:42 +00:00
LOG_WARNING ( log , " Cluster '{}' wasn't updated, will retry " , cluster_name ) ;
2021-11-25 13:45:38 +00:00
}
2021-11-15 14:52:52 +00:00
}
2021-12-20 15:43:11 +00:00
if ( all_up_to_date )
{
up_to_date_callback ( ) ;
}
2021-11-15 14:52:52 +00:00
}
2021-11-18 08:57:26 +00:00
LOG_DEBUG ( log , " Worker thread stopped " ) ;
2021-12-20 15:57:35 +00:00
return finished ;
2021-11-15 14:52:52 +00:00
}
2023-04-14 19:41:26 +00:00
/// Returns the discovered cluster with the given name, or nullptr if there is none (yet).
ClusterPtr ClusterDiscovery::getCluster(const String & cluster_name) const
{
    std::lock_guard lock(mutex);
    if (auto found = cluster_impls.find(cluster_name); found != cluster_impls.end())
        return found->second;
    return nullptr;
}
std : : unordered_map < String , ClusterPtr > ClusterDiscovery : : getClusters ( ) const
{
std : : lock_guard lock ( mutex ) ;
return cluster_impls ;
}
2021-11-15 14:52:52 +00:00
void ClusterDiscovery : : shutdown ( )
{
2021-12-07 09:45:32 +00:00
LOG_DEBUG ( log , " Shutting down " ) ;
2021-12-20 15:57:35 +00:00
clusters_to_update - > stop ( ) ;
2021-12-07 10:35:42 +00:00
2021-11-15 14:52:52 +00:00
if ( main_thread . joinable ( ) )
main_thread . join ( ) ;
}
/// Stops the worker thread; any exception from shutdown() is logged rather than thrown out of the destructor.
ClusterDiscovery::~ClusterDiscovery()
{
    try
    {
        /// qualified call: always runs this class's shutdown()
        ClusterDiscovery::shutdown();
    }
    catch (...)
    {
        tryLogCurrentException(log, "Error on ClusterDiscovery shutdown");
    }
}
2021-11-17 14:16:49 +00:00
bool ClusterDiscovery : : NodeInfo : : parse ( const String & data , NodeInfo & result )
{
try
{
Poco : : JSON : : Parser parser ;
auto json = parser . parse ( data ) . extract < Poco : : JSON : : Object : : Ptr > ( ) ;
2021-12-07 09:07:30 +00:00
size_t ver = json - > optValue < size_t > ( " version " , data_ver ) ;
if ( ver = = data_ver )
{
result . address = json - > getValue < std : : string > ( " address " ) ;
result . secure = json - > optValue < bool > ( " secure " , false ) ;
result . shard_id = json - > optValue < size_t > ( " shard_id " , 0 ) ;
}
else
{
LOG_ERROR (
2024-01-23 17:04:50 +00:00
getLogger ( " ClusterDiscovery " ) ,
2021-12-07 09:07:30 +00:00
" Unsupported version '{}' of data in zk node '{}' " ,
ver , data . size ( ) < 1024 ? data : " [data too long] " ) ;
}
2021-11-17 14:16:49 +00:00
}
catch ( Poco : : Exception & e )
{
LOG_WARNING (
2024-01-23 17:04:50 +00:00
getLogger ( " ClusterDiscovery " ) ,
2021-11-17 14:16:49 +00:00
" Can't parse '{}' from node: {} " ,
data . size ( ) < 1024 ? data : " [data too long] " , e . displayText ( ) ) ;
return false ;
}
return true ;
}
/// Serializes the node info to the JSON payload stored in ZooKeeper; NodeInfo::parse() reads it back.
String ClusterDiscovery::NodeInfo::serialize() const
{
    Poco::JSON::Object json;
    json.set("version", data_ver);
    json.set("address", address);
    /// parse() reads "secure" (defaulting to false) and makeCluster() skips nodes whose `secure` differs
    /// from the current node's. Without this field every node of a secure cluster is read back as
    /// insecure and dropped. Readers that don't know the field are unaffected.
    json.set("secure", secure);
    json.set("shard_id", shard_id);

    std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
    oss.exceptions(std::ios::failbit);
    Poco::JSON::Stringifier::stringify(json, oss);
    return oss.str();
}
2021-11-11 09:03:53 +00:00
}