2021-03-29 08:24:56 +00:00
# include <Coordination/KeeperStateManager.h>
2021-11-19 09:30:58 +00:00
2021-10-03 09:54:23 +00:00
# include <Coordination/Defines.h>
2021-02-11 09:17:57 +00:00
# include <Common/Exception.h>
2021-05-12 14:05:44 +00:00
# include <filesystem>
2022-03-02 19:02:02 +00:00
# include <Common/isLocalAddress.h>
# include <Common/DNSResolver.h>
2021-01-13 10:32:20 +00:00
namespace DB
{
2021-02-11 09:17:57 +00:00
/// Error codes referenced in this file. RAFT_ERROR is only declared here (extern);
/// it is the code passed to every Exception thrown below.
namespace ErrorCodes
{
extern const int RAFT_ERROR ;
}
2022-03-02 19:02:02 +00:00
namespace
{
bool isLocalhost ( const std : : string & hostname )
{
return isLoopback ( DNSResolver : : instance ( ) . resolveHost ( hostname ) ) ;
}
}
2022-03-02 19:37:59 +00:00
/// this function quite long because contains a lot of sanity checks in config:
/// 1. No duplicate endpoints
/// 2. No "localhost" or "127.0.0.1" or another local addresses mixed with normal addresses
/// 3. Raft internal port is equal to client port
/// 4. No duplicate IDs
/// 5. Our ID present in hostnames list
2021-11-19 09:30:58 +00:00
KeeperStateManager : : KeeperConfigurationWrapper KeeperStateManager : : parseServersConfiguration ( const Poco : : Util : : AbstractConfiguration & config , bool allow_without_us ) const
2021-01-13 10:32:20 +00:00
{
2021-10-19 13:11:29 +00:00
KeeperConfigurationWrapper result ;
2021-10-18 15:27:51 +00:00
result . cluster_config = std : : make_shared < nuraft : : cluster_config > ( ) ;
2021-02-11 09:17:57 +00:00
Poco : : Util : : AbstractConfiguration : : Keys keys ;
2021-02-16 19:02:18 +00:00
config . keys ( config_prefix + " .raft_configuration " , keys ) ;
2021-02-11 09:17:57 +00:00
2022-03-02 19:37:59 +00:00
std : : unordered_set < UInt64 > client_ports ;
if ( config . has ( config_prefix + " .tcp_port " ) )
client_ports . insert ( config . getUInt64 ( config_prefix + " .tcp_port " ) ) ;
if ( config . has ( config_prefix + " .tcp_port_secure " ) )
client_ports . insert ( config . getUInt64 ( config_prefix + " .tcp_port_secure " ) ) ;
2021-12-02 11:46:33 +00:00
/// Sometimes (especially in cloud envs) users can provide incorrect
/// configuration with duplicated raft ids or endpoints. We check them
/// on config parsing stage and never commit to quorum.
std : : unordered_map < std : : string , int > check_duplicated_hostnames ;
2021-10-18 15:27:51 +00:00
size_t total_servers = 0 ;
2022-03-02 19:02:02 +00:00
std : : string local_hostname ;
std : : string non_local_hostname ;
2021-02-11 09:17:57 +00:00
for ( const auto & server_key : keys )
{
2021-04-12 12:40:01 +00:00
if ( ! startsWith ( server_key , " server " ) )
continue ;
2021-02-16 19:02:18 +00:00
std : : string full_prefix = config_prefix + " .raft_configuration. " + server_key ;
2021-10-18 15:27:51 +00:00
int new_server_id = config . getInt ( full_prefix + " .id " ) ;
2021-02-11 09:17:57 +00:00
std : : string hostname = config . getString ( full_prefix + " .hostname " ) ;
int port = config . getInt ( full_prefix + " .port " ) ;
bool can_become_leader = config . getBool ( full_prefix + " .can_become_leader " , true ) ;
int32_t priority = config . getInt ( full_prefix + " .priority " , 1 ) ;
2021-02-11 10:25:10 +00:00
bool start_as_follower = config . getBool ( full_prefix + " .start_as_follower " , false ) ;
2021-04-12 12:25:52 +00:00
2022-03-02 19:37:59 +00:00
if ( client_ports . contains ( port ) )
{
throw Exception ( ErrorCodes : : RAFT_ERROR , " Raft config contains hostname '{}' with port '{}' which is equal to client port on current machine " ,
hostname , port ) ;
}
2022-03-02 19:02:02 +00:00
if ( isLocalhost ( hostname ) )
local_hostname = hostname ;
else
non_local_hostname = hostname ;
2021-02-11 10:25:10 +00:00
if ( start_as_follower )
2021-10-18 15:27:51 +00:00
result . servers_start_as_followers . insert ( new_server_id ) ;
2021-02-11 09:17:57 +00:00
auto endpoint = hostname + " : " + std : : to_string ( port ) ;
2021-12-02 11:46:33 +00:00
if ( check_duplicated_hostnames . count ( endpoint ) )
{
2022-03-02 19:37:59 +00:00
throw Exception ( ErrorCodes : : RAFT_ERROR , " Raft config contains duplicate endpoints: "
2021-12-02 11:46:33 +00:00
" endpoint {} has been already added with id {}, but going to add it one more time with id {} " ,
endpoint , check_duplicated_hostnames [ endpoint ] , new_server_id ) ;
}
else
{
/// Fullscan to check duplicated ids
for ( const auto & [ id_endpoint , id ] : check_duplicated_hostnames )
{
if ( new_server_id = = id )
2022-03-02 19:37:59 +00:00
throw Exception ( ErrorCodes : : RAFT_ERROR , " Raft config contains duplicate ids: id {} has been already added with endpoint {}, "
2021-12-02 11:46:33 +00:00
" but going to add it one more time with endpoint {} " , id , id_endpoint , endpoint ) ;
}
check_duplicated_hostnames . emplace ( endpoint , new_server_id ) ;
}
2021-10-18 15:27:51 +00:00
auto peer_config = nuraft : : cs_new < nuraft : : srv_config > ( new_server_id , 0 , endpoint , " " , ! can_become_leader , priority ) ;
if ( my_server_id = = new_server_id )
2021-02-11 09:17:57 +00:00
{
2021-10-18 15:27:51 +00:00
result . config = peer_config ;
result . port = port ;
2021-02-11 09:17:57 +00:00
}
2021-10-18 15:27:51 +00:00
result . cluster_config - > get_servers ( ) . push_back ( peer_config ) ;
total_servers + + ;
2021-02-11 09:17:57 +00:00
}
2021-04-12 12:25:52 +00:00
2021-10-19 13:37:28 +00:00
if ( ! result . config & & ! allow_without_us )
2021-02-16 19:02:18 +00:00
throw Exception ( ErrorCodes : : RAFT_ERROR , " Our server id {} not found in raft_configuration section " , my_server_id ) ;
2021-02-11 10:25:10 +00:00
2021-10-18 15:27:51 +00:00
if ( result . servers_start_as_followers . size ( ) = = total_servers )
2021-02-11 10:25:10 +00:00
throw Exception ( ErrorCodes : : RAFT_ERROR , " At least one of servers should be able to start as leader (without <start_as_follower>) " ) ;
2021-10-18 15:27:51 +00:00
2022-03-02 19:02:02 +00:00
if ( ! local_hostname . empty ( ) & & ! non_local_hostname . empty ( ) )
{
throw Exception (
ErrorCodes : : RAFT_ERROR ,
" Mixing local and non-local hostnames ('{}' and '{}') in raft_configuration is not allowed. Different hosts can resolve it to themselves so it's not allowed. " ,
local_hostname , non_local_hostname ) ;
}
2021-10-18 15:27:51 +00:00
return result ;
}
/// Constructs a manager for a cluster that consists of this single server only.
/// No configuration is read: the endpoint is "host:port", `secure` is off and the
/// log store is created with fixed settings.
KeeperStateManager::KeeperStateManager(int server_id_, const std::string & host, int port, const std::string & logs_path)
    : my_server_id(server_id_)
    , secure(false)
    /// NOTE(review): the literal arguments presumably correspond to (rotate interval, force_sync, compress_logs),
    /// by analogy with the config-based constructor - confirm against KeeperLogStore.
    , log_store(nuraft::cs_new<KeeperLogStore>(logs_path, 5000, false, false))
{
    /// Endpoint must be exactly "host:port" - no whitespace around the separator.
    auto peer_config = nuraft::cs_new<nuraft::srv_config>(my_server_id, host + ":" + std::to_string(port));
    configuration_wrapper.cluster_config = nuraft::cs_new<nuraft::cluster_config>();
    configuration_wrapper.port = port;
    configuration_wrapper.config = peer_config;
    configuration_wrapper.cluster_config->get_servers().push_back(peer_config);
}
/// Constructs a manager from the server configuration.
/// Reads `<config_prefix_>.raft_configuration.secure` (default false), parses the servers list
/// (our own id must be present, otherwise parseServersConfiguration throws) and creates the
/// log store at `log_storage_path` using the rotation / sync / compression coordination settings.
KeeperStateManager::KeeperStateManager(
    int my_server_id_,
    const std::string & config_prefix_,
    const std::string & log_storage_path,
    const Poco::Util::AbstractConfiguration & config,
    const CoordinationSettingsPtr & coordination_settings)
    : my_server_id(my_server_id_)
    , secure(config.getBool(config_prefix_ + ".raft_configuration.secure", false))
    , config_prefix(config_prefix_)
    /// parseServersConfiguration reads my_server_id and config_prefix, so both must be
    /// initialized before this member. NOTE(review): relies on member declaration order in the header - confirm.
    , configuration_wrapper(parseServersConfiguration(config, false))
    , log_store(nuraft::cs_new<KeeperLogStore>(
          log_storage_path,
          coordination_settings->rotate_log_storage_interval,
          coordination_settings->force_sync,
          coordination_settings->compress_logs))
{
}
2021-04-08 14:17:57 +00:00
void KeeperStateManager : : loadLogStore ( uint64_t last_commited_index , uint64_t logs_to_keep )
2021-02-16 19:02:18 +00:00
{
2021-03-04 11:22:59 +00:00
log_store - > init ( last_commited_index , logs_to_keep ) ;
2021-02-16 19:02:18 +00:00
}
2021-10-18 15:27:51 +00:00
/// Returns the cluster config deserialized from the latest config-change entry of the log store,
/// or nullptr if the log store reports no such entry.
ClusterConfigPtr KeeperStateManager::getLatestConfigFromLogStore() const
{
    auto latest_change = log_store->getLatestConfigChange();
    if (!latest_change)
        return nullptr;

    return ClusterConfig::deserialize(latest_change->get_buf());
}
2021-03-29 08:24:56 +00:00
void KeeperStateManager : : flushLogStore ( )
2021-02-17 20:36:25 +00:00
{
log_store - > flush ( ) ;
}
2021-03-29 08:24:56 +00:00
void KeeperStateManager : : save_config ( const nuraft : : cluster_config & config )
2021-01-13 10:32:20 +00:00
{
2021-10-19 13:11:29 +00:00
std : : lock_guard lock ( configuration_wrapper_mutex ) ;
2021-01-13 10:32:20 +00:00
nuraft : : ptr < nuraft : : buffer > buf = config . serialize ( ) ;
2021-10-19 12:00:26 +00:00
configuration_wrapper . cluster_config = nuraft : : cluster_config : : deserialize ( * buf ) ;
2021-01-13 10:32:20 +00:00
}
2021-03-29 08:24:56 +00:00
void KeeperStateManager : : save_state ( const nuraft : : srv_state & state )
2021-01-13 10:32:20 +00:00
{
2021-10-19 13:11:29 +00:00
nuraft : : ptr < nuraft : : buffer > buf = state . serialize ( ) ;
server_state = nuraft : : srv_state : : deserialize ( * buf ) ;
2021-10-18 15:27:51 +00:00
}
/// Compares the servers described in `config` with the currently stored cluster config and
/// returns the actions needed to get from the current one to the new one, in this order:
/// AddServer for ids present only in the new config, RemoveServer for ids present only in the
/// current config, UpdatePriority for ids present in both whose priority differs.
/// The new config is parsed with allow_without_us = true; parsing errors are propagated.
ConfigUpdateActions KeeperStateManager::getConfigurationDiff(const Poco::Util::AbstractConfiguration & config) const
{
    /// Parse before taking the lock: parsing validates the config and resolves hostnames.
    auto new_configuration_wrapper = parseServersConfiguration(config, true);
    const auto & new_servers = new_configuration_wrapper.cluster_config->get_servers();

    std::unordered_map<int, KeeperServerConfigPtr> new_ids;
    for (const auto & new_server : new_servers)
        new_ids[new_server->get_id()] = new_server;

    /// Hold the lock for the whole comparison so that every action is computed against
    /// the same version of the current configuration.
    std::lock_guard lock(configuration_wrapper_mutex);
    const auto & old_servers = configuration_wrapper.cluster_config->get_servers();

    std::unordered_map<int, KeeperServerConfigPtr> old_ids;
    for (const auto & old_server : old_servers)
        old_ids[old_server->get_id()] = old_server;

    ConfigUpdateActions result;

    /// First of all add new servers
    for (const auto & [new_id, server_config] : new_ids)
    {
        if (!old_ids.count(new_id))
            result.emplace_back(ConfigUpdateAction{ConfigUpdateActionType::AddServer, server_config});
    }

    /// After that remove old ones
    for (const auto & [old_id, server_config] : old_ids)
    {
        if (!new_ids.count(old_id))
            result.emplace_back(ConfigUpdateAction{ConfigUpdateActionType::RemoveServer, server_config});
    }

    /// And update priority if required. Ids in the parsed config are unique
    /// (parseServersConfiguration rejects duplicates), so a map lookup finds the same server
    /// a scan over the new servers would.
    for (const auto & old_server : old_servers)
    {
        const auto new_it = new_ids.find(old_server->get_id());
        if (new_it == new_ids.end())
            continue;

        const auto & new_server = new_it->second;
        if (old_server->get_priority() != new_server->get_priority())
            result.emplace_back(ConfigUpdateAction{ConfigUpdateActionType::UpdatePriority, new_server});
    }

    return result;
}
2021-01-13 10:32:20 +00:00
}