2021-03-29 08:24:56 +00:00
# include <Coordination/KeeperSnapshotManager.h>
2021-01-21 11:07:55 +00:00
# include <IO/WriteHelpers.h>
2021-03-01 13:33:34 +00:00
# include <Compression/CompressedReadBuffer.h>
# include <Compression/CompressedWriteBuffer.h>
2021-01-21 11:07:55 +00:00
# include <IO/ReadHelpers.h>
# include <Common/ZooKeeper/ZooKeeperIO.h>
2021-03-01 13:33:34 +00:00
# include <Coordination/ReadBufferFromNuraftBuffer.h>
# include <Coordination/WriteBufferFromNuraftBuffer.h>
# include <IO/WriteBufferFromFile.h>
# include <IO/ReadBufferFromFile.h>
# include <IO/copyData.h>
# include <filesystem>
2021-01-21 11:07:55 +00:00
namespace DB
{
2021-03-01 13:33:34 +00:00
namespace ErrorCodes
{
    /// Error codes thrown from this file; they are only declared here (extern) and defined elsewhere.
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_FORMAT_VERSION;
    extern const int UNKNOWN_SNAPSHOT;
}
2021-01-21 11:07:55 +00:00
namespace
{
2021-04-08 14:17:57 +00:00
uint64_t getSnapshotPathUpToLogIdx ( const String & snapshot_path )
2021-03-01 13:33:34 +00:00
{
std : : filesystem : : path path ( snapshot_path ) ;
std : : string filename = path . stem ( ) ;
Strings name_parts ;
splitInto < ' _ ' > ( name_parts , filename ) ;
2021-04-08 14:17:57 +00:00
return parse < uint64_t > ( name_parts [ 1 ] ) ;
2021-03-01 13:33:34 +00:00
}
2021-04-08 14:17:57 +00:00
/// Builds the on-disk file name of a snapshot covering the log up to `up_to_log_idx`,
/// e.g. 42 -> "snapshot_42.bin".
std::string getSnapshotFileName(uint64_t up_to_log_idx)
{
    std::string file_name = "snapshot_";
    file_name += std::to_string(up_to_log_idx);
    file_name += ".bin";
    return file_name;
}
2021-03-03 12:21:21 +00:00
std : : string getBaseName ( const String & path )
{
size_t basename_start = path . rfind ( ' / ' ) ;
return std : : string { & path [ basename_start + 1 ] , path . length ( ) - basename_start - 1 } ;
}
2021-03-01 13:33:34 +00:00
/// Returns the parent of a '/'-separated node path: "/a/b" -> "/a", "/a" -> "/".
/// NOTE(review): a path containing no '/' is returned unchanged, because npos is not special-cased.
String parentPath(const String & path)
{
    const auto last_slash = path.rfind('/');
    if (last_slash == 0)
        return "/";
    return path.substr(0, last_slash);
}
2021-03-29 08:24:56 +00:00
void writeNode ( const KeeperStorage : : Node & node , WriteBuffer & out )
2021-02-26 14:54:59 +00:00
{
2021-03-04 13:02:30 +00:00
writeBinary ( node . data , out ) ;
/// Serialize ACL
writeBinary ( node . acls . size ( ) , out ) ;
for ( const auto & acl : node . acls )
{
writeBinary ( acl . permissions , out ) ;
writeBinary ( acl . scheme , out ) ;
writeBinary ( acl . id , out ) ;
}
writeBinary ( node . is_sequental , out ) ;
/// Serialize stat
writeBinary ( node . stat . czxid , out ) ;
writeBinary ( node . stat . mzxid , out ) ;
writeBinary ( node . stat . ctime , out ) ;
writeBinary ( node . stat . mtime , out ) ;
writeBinary ( node . stat . version , out ) ;
writeBinary ( node . stat . cversion , out ) ;
writeBinary ( node . stat . aversion , out ) ;
writeBinary ( node . stat . ephemeralOwner , out ) ;
writeBinary ( node . stat . dataLength , out ) ;
writeBinary ( node . stat . numChildren , out ) ;
writeBinary ( node . stat . pzxid , out ) ;
writeBinary ( node . seq_num , out ) ;
2021-02-26 14:54:59 +00:00
}
2021-01-21 11:07:55 +00:00
2021-03-29 08:24:56 +00:00
void readNode ( KeeperStorage : : Node & node , ReadBuffer & in )
2021-02-26 14:54:59 +00:00
{
2021-03-04 13:02:30 +00:00
readBinary ( node . data , in ) ;
/// Deserialize ACL
size_t acls_size ;
readBinary ( acls_size , in ) ;
for ( size_t i = 0 ; i < acls_size ; + + i )
{
Coordination : : ACL acl ;
readBinary ( acl . permissions , in ) ;
readBinary ( acl . scheme , in ) ;
readBinary ( acl . id , in ) ;
node . acls . push_back ( acl ) ;
}
readBinary ( node . is_sequental , in ) ;
/// Deserialize stat
readBinary ( node . stat . czxid , in ) ;
readBinary ( node . stat . mzxid , in ) ;
readBinary ( node . stat . ctime , in ) ;
readBinary ( node . stat . mtime , in ) ;
readBinary ( node . stat . version , in ) ;
readBinary ( node . stat . cversion , in ) ;
readBinary ( node . stat . aversion , in ) ;
readBinary ( node . stat . ephemeralOwner , in ) ;
readBinary ( node . stat . dataLength , in ) ;
readBinary ( node . stat . numChildren , in ) ;
readBinary ( node . stat . pzxid , in ) ;
readBinary ( node . seq_num , in ) ;
2021-02-26 14:54:59 +00:00
}
2021-03-02 14:30:56 +00:00
void serializeSnapshotMetadata ( const SnapshotMetadataPtr & snapshot_meta , WriteBuffer & out )
{
auto buffer = snapshot_meta - > serialize ( ) ;
2021-03-04 13:02:30 +00:00
writeVarUInt ( buffer - > size ( ) , out ) ;
out . write ( reinterpret_cast < const char * > ( buffer - > data_begin ( ) ) , buffer - > size ( ) ) ;
2021-03-02 14:30:56 +00:00
}
/// Reads metadata written by serializeSnapshotMetadata: a VarUInt byte count, then that many bytes.
/// The bytes are copied into a NuRaft buffer and passed to SnapshotMetadata::deserialize.
SnapshotMetadataPtr deserializeSnapshotMetadata(ReadBuffer & in)
{
    size_t blob_size;
    readVarUInt(blob_size, in);

    auto blob = nuraft::buffer::alloc(blob_size);
    char * destination = reinterpret_cast<char *>(blob->data_begin());
    in.readStrict(destination, blob_size);

    /// Reset the buffer position to 0 before handing it to deserialize.
    blob->pos(0);
    return SnapshotMetadata::deserialize(*blob);
}
2021-01-21 11:07:55 +00:00
}
2021-03-01 13:33:34 +00:00
2021-03-29 08:24:56 +00:00
void KeeperStorageSnapshot : : serialize ( const KeeperStorageSnapshot & snapshot , WriteBuffer & out )
2021-01-21 11:07:55 +00:00
{
2021-03-04 13:02:30 +00:00
writeBinary ( static_cast < uint8_t > ( snapshot . version ) , out ) ;
2021-03-02 14:30:56 +00:00
serializeSnapshotMetadata ( snapshot . snapshot_meta , out ) ;
2021-03-04 13:02:30 +00:00
writeBinary ( snapshot . session_id , out ) ;
writeBinary ( snapshot . snapshot_container_size , out ) ;
2021-03-01 15:32:27 +00:00
size_t counter = 0 ;
for ( auto it = snapshot . begin ; counter < snapshot . snapshot_container_size ; + + it , + + counter )
2021-03-01 13:33:34 +00:00
{
const auto & path = it - > key ;
const auto & node = it - > value ;
2021-03-04 11:22:59 +00:00
if ( static_cast < size_t > ( node . stat . mzxid ) > snapshot . snapshot_meta - > get_last_log_idx ( ) )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Trying to serialize node with mzxid {}, but last snapshot index {} " , node . stat . mzxid , snapshot . snapshot_meta - > get_last_log_idx ( ) ) ;
2021-03-03 12:21:21 +00:00
2021-03-04 13:02:30 +00:00
writeBinary ( path , out ) ;
2021-03-01 13:33:34 +00:00
writeNode ( node , out ) ;
}
2021-03-01 15:32:27 +00:00
size_t size = snapshot . session_and_timeout . size ( ) ;
2021-03-04 13:02:30 +00:00
writeBinary ( size , out ) ;
2021-03-01 13:33:34 +00:00
for ( const auto & [ session_id , timeout ] : snapshot . session_and_timeout )
2021-01-21 11:07:55 +00:00
{
2021-03-04 13:02:30 +00:00
writeBinary ( session_id , out ) ;
writeBinary ( timeout , out ) ;
2021-01-21 11:07:55 +00:00
}
}
2021-03-29 08:24:56 +00:00
/// Restores `storage` from a stream produced by KeeperStorageSnapshot::serialize and returns the
/// snapshot metadata.
/// Read order: version (anything above V0 throws UNKNOWN_FORMAT_VERSION), metadata, session id
/// counter, the (path, node) pairs, then the active sessions.
/// storage.zxid is set to the metadata's last log index. Children sets are not part of the
/// serialized node, so they are rebuilt from the node paths after all nodes are loaded.
SnapshotMetadataPtr KeeperStorageSnapshot::deserialize(KeeperStorage & storage, ReadBuffer & in)
{
    uint8_t version;
    readBinary(version, in);
    if (static_cast<SnapshotVersion>(version) > SnapshotVersion::V0)
        throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version);

    SnapshotMetadataPtr metadata = deserializeSnapshotMetadata(in);

    int64_t session_id_counter;
    readBinary(session_id_counter, in);
    storage.zxid = metadata->get_last_log_idx();
    storage.session_id_counter = session_id_counter;

    size_t nodes_count;
    readBinary(nodes_count, in);
    for (size_t node_idx = 0; node_idx < nodes_count; ++node_idx)
    {
        std::string path;
        readBinary(path, in);
        KeeperStorage::Node node;
        readNode(node, in);
        storage.container.insertOrReplace(path, node);

        /// Nodes with a non-zero ephemeralOwner are also recorded in storage.ephemerals under that owner.
        const auto owner = node.stat.ephemeralOwner;
        if (owner != 0)
            storage.ephemerals[owner].insert(path);
    }

    /// Rebuild parent -> children links from the paths; writeNode does not store them.
    for (const auto & entry : storage.container)
    {
        if (entry.key == "/")
            continue;

        const auto & child_path = entry.key;
        auto parent_path = parentPath(child_path);
        storage.container.updateValue(parent_path, [&child_path](KeeperStorage::Node & parent) { parent.children.insert(getBaseName(child_path)); });
    }

    size_t sessions_count;
    readBinary(sessions_count, in);
    for (size_t session_idx = 0; session_idx < sessions_count; ++session_idx)
    {
        int64_t active_session_id;
        int64_t timeout;
        readBinary(active_session_id, in);
        readBinary(timeout, in);
        storage.addSessionID(active_session_id, timeout);
    }

    return metadata;
}
2021-04-08 14:17:57 +00:00
/// Takes a snapshot of `storage_` covering the log up to `up_to_log_idx_`.
/// Creates fresh metadata (second argument, presumably the last log term, is 0; empty cluster config)
/// and delegates to the metadata-based constructor. That keeps the snapshot-mode setup in one place
/// instead of two copies that must stay in sync.
KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, uint64_t up_to_log_idx_)
    : KeeperStorageSnapshot(storage_, std::make_shared<SnapshotMetadata>(up_to_log_idx_, 0, std::make_shared<nuraft::cluster_config>()))
{
}
2021-03-29 08:24:56 +00:00
/// Takes a snapshot of `storage_` described by `snapshot_meta_`.
/// Switches the storage into snapshot mode first. It then records the snapshot container size, the
/// iterator serialization starts from, and the active sessions. The destructor switches snapshot mode off.
KeeperStorageSnapshot::KeeperStorageSnapshot(KeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_)
    : storage(storage_)
    , snapshot_meta(snapshot_meta_)
    , session_id(storage_->session_id_counter)
{
    storage_->enableSnapshotMode();
    snapshot_container_size = storage_->container.snapshotSize();
    begin = storage_->getSnapshotIteratorBegin();
    session_and_timeout = storage_->getActiveSessions();
}
2021-03-29 08:24:56 +00:00
/// Switches the storage back out of the snapshot mode the constructors enabled.
KeeperStorageSnapshot::~KeeperStorageSnapshot()
{
    this->storage->disableSnapshotMode();
}
2021-03-29 08:24:56 +00:00
/// Opens the snapshot directory, creating it if missing, and indexes the snapshots already stored there.
/// Leftover "tmp_*" files from interrupted writes are deleted. Files whose names do not start with
/// "snapshot_" are ignored. Finally, snapshots beyond `snapshots_to_keep` are removed.
KeeperSnapshotManager::KeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_, size_t storage_tick_time_)
    : snapshots_path(snapshots_path_)
    , snapshots_to_keep(snapshots_to_keep_)
    , storage_tick_time(storage_tick_time_)
{
    namespace fs = std::filesystem;
    if (!fs::exists(snapshots_path))
        fs::create_directories(snapshots_path);

    for (const auto & p : fs::directory_iterator(snapshots_path))
    {
        /// Check prefixes against the file name only. p.path() also contains snapshots_path,
        /// so testing it for "tmp_" never matched.
        const std::string file_name = p.path().filename().string();

        if (startsWith(file_name, "tmp_")) /// Unfinished tmp files
        {
            std::filesystem::remove(p);
            continue;
        }

        /// Not a snapshot file (see getSnapshotFileName for the naming scheme); leave it alone.
        if (!startsWith(file_name, "snapshot_"))
            continue;

        uint64_t snapshot_up_to = getSnapshotPathUpToLogIdx(p.path());
        existing_snapshots[snapshot_up_to] = p.path();
    }

    removeOutdatedSnapshotsIfNeeded();
}
2021-04-08 14:17:57 +00:00
std : : string KeeperSnapshotManager : : serializeSnapshotBufferToDisk ( nuraft : : buffer & buffer , uint64_t up_to_log_idx )
2021-03-01 13:33:34 +00:00
{
ReadBufferFromNuraftBuffer reader ( buffer ) ;
2021-03-03 11:10:24 +00:00
auto snapshot_file_name = getSnapshotFileName ( up_to_log_idx ) ;
auto tmp_snapshot_file_name = " tmp_ " + snapshot_file_name ;
std : : string tmp_snapshot_path = std : : filesystem : : path { snapshots_path } / tmp_snapshot_file_name ;
std : : string new_snapshot_path = std : : filesystem : : path { snapshots_path } / snapshot_file_name ;
2021-03-01 13:33:34 +00:00
2021-03-03 11:10:24 +00:00
WriteBufferFromFile plain_buf ( tmp_snapshot_path ) ;
2021-03-01 13:33:34 +00:00
copyData ( reader , plain_buf ) ;
plain_buf . sync ( ) ;
2021-03-03 11:10:24 +00:00
std : : filesystem : : rename ( tmp_snapshot_path , new_snapshot_path ) ;
2021-03-01 14:40:32 +00:00
existing_snapshots . emplace ( up_to_log_idx , new_snapshot_path ) ;
2021-03-01 14:54:08 +00:00
removeOutdatedSnapshotsIfNeeded ( ) ;
2021-03-01 13:33:34 +00:00
return new_snapshot_path ;
}
2021-03-29 08:24:56 +00:00
/// Loads the raw bytes of the newest registered snapshot.
/// If reading it throws a DB::Exception, the error is logged, that snapshot file is deleted and
/// unregistered, and the next newest one is tried.
/// Returns nullptr when no snapshot is registered or none could be read.
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeLatestSnapshotBufferFromDisk()
{
    while (!existing_snapshots.empty())
    {
        auto latest_itr = existing_snapshots.rbegin();
        try
        {
            return deserializeSnapshotBufferFromDisk(latest_itr->first);
        }
        catch (const DB::Exception &)
        {
            /// Log before cleaning up. If std::filesystem::remove throws, the reason this snapshot
            /// was rejected would otherwise never be reported.
            tryLogCurrentException(__PRETTY_FUNCTION__);
            std::filesystem::remove(latest_itr->second);
            existing_snapshots.erase(latest_itr->first);
        }
    }

    return nullptr;
}
2021-04-08 14:17:57 +00:00
/// Copies the snapshot file registered under `up_to_log_idx` byte for byte into a NuRaft buffer.
/// existing_snapshots.at() throws if no snapshot with that index is registered.
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::deserializeSnapshotBufferFromDisk(uint64_t up_to_log_idx) const
{
    const std::string & file_path = existing_snapshots.at(up_to_log_idx);
    WriteBufferFromNuraftBuffer buffer_out;
    ReadBufferFromFile file_in(file_path);
    copyData(file_in, buffer_out);
    return buffer_out.getBuffer();
}
2021-03-29 08:24:56 +00:00
/// Serializes `snapshot` into an in-memory NuRaft buffer, passing the stream through CompressedWriteBuffer.
nuraft::ptr<nuraft::buffer> KeeperSnapshotManager::serializeSnapshotToBuffer(const KeeperStorageSnapshot & snapshot)
{
    WriteBufferFromNuraftBuffer nuraft_out;
    CompressedWriteBuffer compressed_out(nuraft_out);
    KeeperStorageSnapshot::serialize(snapshot, compressed_out);

    /// Finalize the compressed stream before taking the underlying buffer.
    compressed_out.finalize();
    return nuraft_out.getBuffer();
}
2021-03-29 08:24:56 +00:00
/// Decompresses `buffer` and deserializes it into a newly created KeeperStorage.
/// Returns the snapshot metadata paired with the restored storage.
SnapshotMetaAndStorage KeeperSnapshotManager::deserializeSnapshotFromBuffer(nuraft::ptr<nuraft::buffer> buffer) const
{
    ReadBufferFromNuraftBuffer nuraft_in(buffer);
    CompressedReadBuffer decompressed_in(nuraft_in);

    auto restored_storage = std::make_unique<KeeperStorage>(storage_tick_time);
    auto metadata = KeeperStorageSnapshot::deserialize(*restored_storage, decompressed_in);
    return std::make_pair(metadata, std::move(restored_storage));
}
2021-03-29 08:24:56 +00:00
/// Restores storage from the newest readable snapshot on disk.
/// Returns an empty result when no snapshot is registered or none of them could be read.
SnapshotMetaAndStorage KeeperSnapshotManager::restoreFromLatestSnapshot()
{
    /// deserializeLatestSnapshotBufferFromDisk returns nullptr both for an empty snapshot set and
    /// when every registered snapshot failed to load, so no separate empty() check is needed.
    if (auto buffer = deserializeLatestSnapshotBufferFromDisk())
        return deserializeSnapshotFromBuffer(buffer);
    return {};
}
2021-03-29 08:24:56 +00:00
void KeeperSnapshotManager : : removeOutdatedSnapshotsIfNeeded ( )
2021-03-01 14:54:08 +00:00
{
while ( existing_snapshots . size ( ) > snapshots_to_keep )
2021-03-03 11:10:24 +00:00
removeSnapshot ( existing_snapshots . begin ( ) - > first ) ;
}
2021-04-08 14:17:57 +00:00
void KeeperSnapshotManager : : removeSnapshot ( uint64_t log_idx )
2021-03-03 11:10:24 +00:00
{
auto itr = existing_snapshots . find ( log_idx ) ;
if ( itr = = existing_snapshots . end ( ) )
throw Exception ( ErrorCodes : : UNKNOWN_SNAPSHOT , " Unknown snapshot with log index {} " , log_idx ) ;
std : : filesystem : : remove ( itr - > second ) ;
existing_snapshots . erase ( itr ) ;
2021-03-01 14:54:08 +00:00
}
2021-03-01 13:33:34 +00:00
2021-01-21 11:07:55 +00:00
}