2020-04-05 12:18:51 +00:00
# include <Databases/DatabaseReplicated.h>
# include <IO/ReadBufferFromFile.h>
2020-05-11 12:55:17 +00:00
# include <IO/ReadBufferFromString.h>
2020-04-05 12:18:51 +00:00
# include <IO/ReadHelpers.h>
# include <IO/WriteBufferFromFile.h>
# include <IO/WriteHelpers.h>
# include <Interpreters/Context.h>
2020-05-11 12:55:17 +00:00
# include <Interpreters/executeQuery.h>
2020-04-05 12:18:51 +00:00
# include <Parsers/queryToString.h>
2020-05-12 13:35:05 +00:00
# include <Common/Exception.h>
2020-04-05 12:18:51 +00:00
# include <Common/ZooKeeper/KeeperException.h>
# include <Common/ZooKeeper/Types.h>
# include <Common/ZooKeeper/ZooKeeper.h>
2020-05-11 12:55:17 +00:00
# include <Common/ZooKeeper/Lock.h>
2020-04-05 12:18:51 +00:00
namespace DB
{
/// Error codes thrown by this translation unit (defined elsewhere in the project).
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int LOGICAL_ERROR;
    extern const int NO_ZOOKEEPER;
}

void DatabaseReplicated : : setZooKeeper ( zkutil : : ZooKeeperPtr zookeeper )
{
std : : lock_guard lock ( current_zookeeper_mutex ) ;
current_zookeeper = zookeeper ;
}
/// Returns a copy of the current ZooKeeper session pointer (may be null).
/// The copy is taken while holding current_zookeeper_mutex.
zkutil::ZooKeeperPtr DatabaseReplicated::tryGetZooKeeper() const
{
    std::lock_guard guard{current_zookeeper_mutex};
    zkutil::ZooKeeperPtr session = current_zookeeper;
    return session;
}
/// Same as tryGetZooKeeper(), but a missing session is an error.
/// Throws Exception(NO_ZOOKEEPER) when no session is set.
zkutil::ZooKeeperPtr DatabaseReplicated::getZooKeeper() const
{
    if (auto session = tryGetZooKeeper())
        return session;
    throw Exception("Cannot get ZooKeeper", ErrorCodes::NO_ZOOKEEPER);
}
/// Creates (or re-attaches to) a replicated database.
///
/// zookeeper_path_ - root node of the database in ZooKeeper. Trailing '/' characters are
///                   trimmed and a leading '/' is added if missing.
/// replica_name_   - name of this replica under <zookeeper_path>/replicas.
///
/// Startup cases:
///  * the database node does not exist yet: the ZooKeeper skeleton is created;
///  * this replica is already registered: the last executed log entry stored locally
///    (<metadata path>.last_entry) is compared with the value of the replica node;
///  * otherwise: last_executed_log_entry stays empty and the background executor
///    bootstraps the replica from the latest snapshot.
///
/// Throws BAD_ARGUMENTS for an empty path / replica name, NO_ZOOKEEPER when ZooKeeper is not
/// available, LOGICAL_ERROR when the local and remote last entries disagree.
DatabaseReplicated::DatabaseReplicated(
    const String & name_,
    const String & metadata_path_,
    const String & zookeeper_path_,
    const String & replica_name_,
    Context & context_)
    : DatabaseAtomic(name_, metadata_path_, "store/", "DatabaseReplicated (" + name_ + ")", context_)
    , zookeeper_path(zookeeper_path_)
    , replica_name(replica_name_)
{
    /// Trim all trailing slashes before the emptiness check, so that a path consisting only of
    /// slashes (e.g. "/") is rejected instead of reaching front() on an empty string below.
    while (!zookeeper_path.empty() && zookeeper_path.back() == '/')
        zookeeper_path.pop_back();

    if (zookeeper_path.empty() || replica_name.empty())
        throw Exception("ZooKeeper path and replica name must be non-empty", ErrorCodes::BAD_ARGUMENTS);

    // If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it.
    if (zookeeper_path.front() != '/')
        zookeeper_path = "/" + zookeeper_path;

    if (context_.hasZooKeeper())
        current_zookeeper = context_.getZooKeeper();
    if (!current_zookeeper)
        throw Exception("Can't create replicated database without ZooKeeper", ErrorCodes::NO_ZOOKEEPER);

    const String replica_path = zookeeper_path + "/replicas/" + replica_name;

    if (!current_zookeeper->exists(zookeeper_path))
    {
        // New database
        createDatabaseZKNodes();
    }
    else if (current_zookeeper->exists(replica_path))
    {
        // Old replica recovery
        String remote_last_entry = current_zookeeper->get(replica_path, {}, nullptr);

        String local_last_entry;
        bool local_metadata_readable = true;
        try
        {
            ReadBufferFromFile in(getMetadataPath() + ".last_entry", 16);
            readStringUntilEOF(local_last_entry, in);
        }
        catch (const Exception &)
        {
            local_metadata_readable = false;
        }

        if (!local_metadata_readable)
        {
            // Metadata is corrupted or was removed.
            // Replica erases the previous zk last executed log entry
            // and behaves like a new clean replica: last_executed_log_entry stays empty,
            // so the background executor reloads the latest snapshot.
            // (Throwing here would make the reset useless and recovery impossible.)
            writeLastExecutedToDiskAndZK();
        }
        else if (local_last_entry == remote_last_entry)
        {
            // Equal values, including the "nothing executed yet" state left by a reset.
            last_executed_log_entry = local_last_entry;
        }
        else
        {
            throw Exception("Replica name might be in use by a different node. Please check replica_name parameter. Remove .last_entry file from metadata to create a new replica.", ErrorCodes::LOGICAL_ERROR);
        }
    }

    snapshot_period = context_.getConfigRef().getInt("database_replicated_snapshot_period", 10);
    LOG_DEBUG(log, "Snapshot period is set to " << snapshot_period << " log entries per one snapshot");

    background_log_executor = global_context.getReplicatedSchedulePool().createTask(database_name + "(DatabaseReplicated::background_executor)", [this]{ runBackgroundLogExecutor(); });
    background_log_executor->scheduleAfter(500);
}
2020-05-24 17:13:53 +00:00
void DatabaseReplicated : : createDatabaseZKNodes ( ) {
current_zookeeper = getZooKeeper ( ) ;
current_zookeeper - > createAncestors ( zookeeper_path ) ;
current_zookeeper - > createIfNotExists ( zookeeper_path , String ( ) ) ;
current_zookeeper - > createIfNotExists ( zookeeper_path + " /log " , String ( ) ) ;
2020-06-20 15:39:58 +00:00
current_zookeeper - > createIfNotExists ( zookeeper_path + " /snapshots " , String ( ) ) ;
current_zookeeper - > createIfNotExists ( zookeeper_path + " /replicas " , String ( ) ) ;
}
void DatabaseReplicated : : RemoveOutdatedSnapshotsAndLog ( ) {
// This method removes all snapshots and logged queries
// that no longer will be in use by current replicas or
// new coming ones.
// Each registered replica has its state in ZooKeeper.
2020-06-22 14:22:26 +00:00
// Therefore, snapshots and logged queries that are less
// than a least advanced replica are removed.
2020-06-20 15:39:58 +00:00
// It does not interfere with a new coming replica
// metadata loading from snapshot
2020-06-22 14:22:26 +00:00
// because the replica will use the latest snapshot available
2020-06-20 15:39:58 +00:00
// and this snapshot will set the last executed log query
// to a greater one than the least advanced current replica.
current_zookeeper = getZooKeeper ( ) ;
Strings replica_states = current_zookeeper - > getChildren ( zookeeper_path + " /replicas " ) ;
auto least_advanced = std : : min_element ( replica_states . begin ( ) , replica_states . end ( ) ) ;
Strings snapshots = current_zookeeper - > getChildren ( zookeeper_path + " /snapshots " ) ;
if ( snapshots . size ( ) < 2 ) {
return ;
}
std : : sort ( snapshots . begin ( ) , snapshots . end ( ) ) ;
auto still_useful = std : : lower_bound ( snapshots . begin ( ) , snapshots . end ( ) , * least_advanced ) ;
snapshots . erase ( still_useful , snapshots . end ( ) ) ;
for ( const String & snapshot : snapshots ) {
current_zookeeper - > tryRemoveRecursive ( zookeeper_path + " /snapshots/ " + snapshot ) ;
}
Strings log_entry_names = current_zookeeper - > getChildren ( zookeeper_path + " /log " ) ;
std : : sort ( log_entry_names . begin ( ) , log_entry_names . end ( ) ) ;
auto still_useful_log = std : : upper_bound ( log_entry_names . begin ( ) , log_entry_names . end ( ) , * still_useful ) ;
log_entry_names . erase ( still_useful_log , log_entry_names . end ( ) ) ;
for ( const String & log_entry_name : log_entry_names ) {
String log_entry_path = zookeeper_path + " /log/ " + log_entry_name ;
current_zookeeper - > tryRemove ( log_entry_path ) ;
}
2020-05-11 12:55:17 +00:00
}
2020-05-24 17:13:53 +00:00
void DatabaseReplicated : : runBackgroundLogExecutor ( ) {
2020-06-20 15:39:58 +00:00
if ( last_executed_log_entry = = " " ) {
loadMetadataFromSnapshot ( ) ;
}
2020-05-24 17:13:53 +00:00
current_zookeeper = getZooKeeper ( ) ;
2020-06-07 11:20:05 +00:00
Strings log_entry_names = current_zookeeper - > getChildren ( zookeeper_path + " /log " ) ;
std : : sort ( log_entry_names . begin ( ) , log_entry_names . end ( ) ) ;
auto newest_entry_it = std : : upper_bound ( log_entry_names . begin ( ) , log_entry_names . end ( ) , last_executed_log_entry ) ;
log_entry_names . erase ( log_entry_names . begin ( ) , newest_entry_it ) ;
for ( const String & log_entry_name : log_entry_names ) {
String log_entry_path = zookeeper_path + " /log/ " + log_entry_name ;
executeFromZK ( log_entry_path ) ;
last_executed_log_entry = log_entry_name ;
2020-06-22 14:19:26 +00:00
writeLastExecutedToDiskAndZK ( ) ;
2020-06-20 15:39:58 +00:00
int log_n = parse < int > ( log_entry_name . substr ( 4 ) ) ;
int last_log_n = parse < int > ( log_entry_names . back ( ) . substr ( 4 ) ) ;
2020-06-22 14:19:26 +00:00
// The third condition gurantees at most one snapshot creation per batch
2020-06-20 15:39:58 +00:00
if ( log_n > 0 & & snapshot_period > 0 & & ( last_log_n - log_n ) / snapshot_period = = 0 & & log_n % snapshot_period = = 0 ) {
createSnapshot ( ) ;
}
2020-05-24 17:13:53 +00:00
}
2020-06-07 11:20:05 +00:00
2020-05-24 17:13:53 +00:00
background_log_executor - > scheduleAfter ( 500 ) ;
2020-05-11 12:55:17 +00:00
}
2020-06-22 14:19:26 +00:00
void DatabaseReplicated : : writeLastExecutedToDiskAndZK ( ) {
2020-05-27 18:40:00 +00:00
current_zookeeper = getZooKeeper ( ) ;
2020-06-20 15:39:58 +00:00
current_zookeeper - > createOrUpdate ( zookeeper_path + " /replicas/ " + replica_name , last_executed_log_entry , zkutil : : CreateMode : : Persistent ) ;
2020-05-27 18:40:00 +00:00
String metadata_file = getMetadataPath ( ) + " .last_entry " ;
2020-06-20 15:39:58 +00:00
WriteBufferFromFile out ( metadata_file , last_executed_log_entry . size ( ) , O_WRONLY | O_CREAT ) ;
writeString ( last_executed_log_entry , out ) ;
2020-05-13 17:00:47 +00:00
out . next ( ) ;
if ( global_context . getSettingsRef ( ) . fsync_metadata )
out . sync ( ) ;
out . close ( ) ;
}
2020-05-24 17:13:53 +00:00
void DatabaseReplicated : : executeFromZK ( String & path ) {
2020-05-11 12:55:17 +00:00
current_zookeeper = getZooKeeper ( ) ;
2020-05-24 17:13:53 +00:00
String query_to_execute = current_zookeeper - > get ( path , { } , NULL ) ;
2020-05-11 12:55:17 +00:00
ReadBufferFromString istr ( query_to_execute ) ;
String dummy_string ;
WriteBufferFromString ostr ( dummy_string ) ;
2020-05-12 13:35:05 +00:00
try
{
current_context = std : : make_unique < Context > ( global_context ) ;
2020-05-26 15:08:09 +00:00
current_context - > getClientInfo ( ) . query_kind = ClientInfo : : QueryKind : : REPLICATED_LOG_QUERY ;
2020-05-12 14:25:36 +00:00
current_context - > setCurrentDatabase ( database_name ) ;
2020-05-12 13:35:05 +00:00
current_context - > setCurrentQueryId ( " " ) ; // generate random query_id
executeQuery ( istr , ostr , false , * current_context , { } ) ;
}
catch ( . . . )
{
2020-05-24 17:13:53 +00:00
tryLogCurrentException ( log , " Query from zookeeper " + query_to_execute + " wasn't finished successfully " ) ;
2020-05-12 13:35:05 +00:00
}
LOG_DEBUG ( log , " Executed query: " < < query_to_execute ) ;
2020-05-11 12:55:17 +00:00
}
2020-05-05 14:16:59 +00:00
void DatabaseReplicated : : propose ( const ASTPtr & query ) {
2020-05-11 12:55:17 +00:00
current_zookeeper = getZooKeeper ( ) ;
2020-05-13 17:00:47 +00:00
2020-06-22 14:19:26 +00:00
LOG_DEBUG ( log , " Proposing query: " < < queryToString ( query ) ) ;
2020-06-07 11:20:05 +00:00
current_zookeeper - > create ( zookeeper_path + " /log/log- " , queryToString ( query ) , zkutil : : CreateMode : : PersistentSequential ) ;
2020-05-27 18:33:37 +00:00
2020-05-24 17:13:53 +00:00
background_log_executor - > schedule ( ) ;
2020-05-13 17:00:47 +00:00
}
2020-06-20 15:39:58 +00:00
void DatabaseReplicated : : createSnapshot ( ) {
2020-05-24 17:13:53 +00:00
current_zookeeper = getZooKeeper ( ) ;
2020-06-20 15:39:58 +00:00
String snapshot_path = zookeeper_path + " /snapshots/ " + last_executed_log_entry ;
if ( Coordination : : ZNODEEXISTS = = current_zookeeper - > tryCreate ( snapshot_path , String ( ) , zkutil : : CreateMode : : Persistent ) ) {
return ;
}
2020-05-13 17:00:47 +00:00
for ( auto iterator = getTablesIterator ( { } ) ; iterator - > isValid ( ) ; iterator - > next ( ) ) {
String table_name = iterator - > name ( ) ;
auto query = getCreateQueryFromMetadata ( getObjectMetadataPath ( table_name ) , true ) ;
String statement = queryToString ( query ) ;
2020-06-24 12:45:42 +00:00
current_zookeeper - > createIfNotExists ( snapshot_path + " / " + table_name , statement ) ;
2020-05-13 17:00:47 +00:00
}
2020-06-24 12:45:42 +00:00
current_zookeeper - > createIfNotExists ( snapshot_path + " /.completed " , String ( ) ) ;
2020-06-20 15:39:58 +00:00
RemoveOutdatedSnapshotsAndLog ( ) ;
2020-05-11 12:55:17 +00:00
}
2020-04-05 12:18:51 +00:00
2020-05-24 17:13:53 +00:00
void DatabaseReplicated : : loadMetadataFromSnapshot ( ) {
2020-06-22 14:19:26 +00:00
// Executes the latest snapshot.
// Used by new replicas only.
2020-05-24 17:13:53 +00:00
current_zookeeper = getZooKeeper ( ) ;
2020-06-20 15:39:58 +00:00
Strings snapshots ;
if ( current_zookeeper - > tryGetChildren ( zookeeper_path + " /snapshots " , snapshots ) ! = Coordination : : ZOK )
return ;
2020-06-24 12:45:42 +00:00
auto latest_snapshot = std : : max_element ( snapshots . begin ( ) , snapshots . end ( ) ) ;
while ( snapshots . size ( ) > 0 & & ! current_zookeeper - > exists ( zookeeper_path + " /snapshots/ " + * latest_snapshot + " /.completed " ) ) {
snapshots . erase ( latest_snapshot ) ;
latest_snapshot = std : : max_element ( snapshots . begin ( ) , snapshots . end ( ) ) ;
}
2020-06-20 15:39:58 +00:00
if ( snapshots . size ( ) < 1 ) {
return ;
}
2020-06-24 12:45:42 +00:00
2020-05-24 17:13:53 +00:00
Strings metadatas ;
2020-06-20 15:39:58 +00:00
if ( current_zookeeper - > tryGetChildren ( zookeeper_path + " /snapshots/ " + * latest_snapshot , metadatas ) ! = Coordination : : ZOK )
2020-05-24 17:13:53 +00:00
return ;
2020-06-22 14:19:26 +00:00
LOG_DEBUG ( log , " Executing " < < * latest_snapshot < < " snapshot " ) ;
2020-05-24 17:13:53 +00:00
for ( auto t = metadatas . begin ( ) ; t ! = metadatas . end ( ) ; + + t ) {
2020-06-20 15:39:58 +00:00
String path = zookeeper_path + " /snapshots/ " + * latest_snapshot + " / " + * t ;
2020-06-22 14:19:26 +00:00
2020-05-24 17:13:53 +00:00
executeFromZK ( path ) ;
}
2020-06-20 15:39:58 +00:00
last_executed_log_entry = * latest_snapshot ;
2020-06-22 14:19:26 +00:00
writeLastExecutedToDiskAndZK ( ) ;
2020-06-20 15:39:58 +00:00
}
void DatabaseReplicated : : drop ( const Context & context_ )
{
current_zookeeper = getZooKeeper ( ) ;
current_zookeeper - > tryRemove ( zookeeper_path + " /replicas/ " + replica_name ) ;
DatabaseAtomic : : drop ( context_ ) ;
2020-05-24 17:13:53 +00:00
}
2020-04-05 12:18:51 +00:00
}