2021-03-03 11:22:38 +00:00
# include <Coordination/NuKeeperSnapshotManager.h>
2021-01-21 11:07:55 +00:00
# include <IO/WriteHelpers.h>
2021-03-01 13:33:34 +00:00
# include <Compression/CompressedReadBuffer.h>
# include <Compression/CompressedWriteBuffer.h>
2021-01-21 11:07:55 +00:00
# include <IO/ReadHelpers.h>
# include <Common/ZooKeeper/ZooKeeperIO.h>
2021-03-01 13:33:34 +00:00
# include <Coordination/ReadBufferFromNuraftBuffer.h>
# include <Coordination/WriteBufferFromNuraftBuffer.h>
# include <IO/WriteBufferFromFile.h>
# include <IO/ReadBufferFromFile.h>
# include <IO/copyData.h>
# include <filesystem>
2021-01-21 11:07:55 +00:00
namespace DB
{
2021-03-01 13:33:34 +00:00
namespace ErrorCodes
{
    extern const int UNKNOWN_FORMAT_VERSION;
    extern const int UNKNOWN_SNAPSHOT;
    /// Used by NuKeeperStorageSnapshot::serialize for the mzxid sanity check; declared here
    /// so this translation unit does not depend on a transitive declaration.
    extern const int LOGICAL_ERROR;
}
2021-01-21 11:07:55 +00:00
namespace
{
2021-03-01 13:33:34 +00:00
size_t getSnapshotPathUpToLogIdx ( const String & snapshot_path )
{
std : : filesystem : : path path ( snapshot_path ) ;
std : : string filename = path . stem ( ) ;
Strings name_parts ;
splitInto < ' _ ' > ( name_parts , filename ) ;
return parse < size_t > ( name_parts [ 1 ] ) ;
}
/// Builds the on-disk file name of the snapshot covering the log up to `up_to_log_idx`,
/// e.g. 42 -> "snapshot_42.bin".
std::string getSnapshotFileName(size_t up_to_log_idx)
{
    std::string file_name = "snapshot_";
    file_name += std::to_string(up_to_log_idx);
    file_name += ".bin";
    return file_name;
}
2021-03-03 12:21:21 +00:00
/// Returns the last '/'-separated component of `path`
/// ("/a/b" -> "b", "/a/" -> ""); a path without any '/' is returned unchanged.
std::string getBaseName(const String & path)
{
    const auto last_slash = path.rfind('/');
    if (last_slash == String::npos)
        return path;
    return path.substr(last_slash + 1);
}
2021-03-01 13:33:34 +00:00
/// Returns the parent of an absolute '/'-separated path: "/a/b" -> "/a", "/a" -> "/".
/// A path containing no '/' at all maps to the root "/". Previously `rfind` returned npos
/// there and the path itself was returned, making a node its own parent.
String parentPath(const String & path)
{
    const auto rslash_pos = path.rfind('/');
    if (rslash_pos != String::npos && rslash_pos > 0)
        return path.substr(0, rslash_pos);
    return "/";
}
2021-02-26 14:54:59 +00:00
/// Writes one storage node to `out` as a fixed sequence of fields:
/// data, acls, is_sequental, stat, seq_num.
/// This order is part of the snapshot format and must match readNode() exactly.
/// `children` is not written; NuKeeperStorageSnapshot::deserialize rebuilds it from node paths.
void writeNode(const NuKeeperStorage::Node & node, WriteBuffer & out)
{
    /// FIXME: why do we store them in network order?
    Coordination::write(node.data, out);
    Coordination::write(node.acls, out);
    Coordination::write(node.is_sequental, out);
    Coordination::write(node.stat, out);
    Coordination::write(node.seq_num, out);
}
2021-01-21 11:07:55 +00:00
2021-02-26 14:54:59 +00:00
/// Reads one storage node from `in`, in exactly the field order used by writeNode():
/// data, acls, is_sequental, stat, seq_num.
/// `children` is left untouched here; NuKeeperStorageSnapshot::deserialize rebuilds it.
void readNode(NuKeeperStorage::Node & node, ReadBuffer & in)
{
    /// FIXME: why do we store them in network order?
    Coordination::read(node.data, in);
    Coordination::read(node.acls, in);
    Coordination::read(node.is_sequental, in);
    Coordination::read(node.stat, in);
    Coordination::read(node.seq_num, in);
}
2021-03-02 14:30:56 +00:00
void serializeSnapshotMetadata ( const SnapshotMetadataPtr & snapshot_meta , WriteBuffer & out )
{
auto buffer = snapshot_meta - > serialize ( ) ;
Coordination : : write ( reinterpret_cast < const char * > ( buffer - > data_begin ( ) ) , buffer - > size ( ) , out ) ;
}
/// Reads the metadata blob written by serializeSnapshotMetadata() and lets NuRaft decode it.
SnapshotMetadataPtr deserializeSnapshotMetadata(ReadBuffer & in)
{
    /// FIXME double copy (alesap): the bytes go into a std::string first, then into a nuraft buffer.
    std::string blob;
    Coordination::read(blob, in);

    const auto blob_size = blob.size();
    auto nuraft_buffer = nuraft::buffer::alloc(blob_size);
    nuraft_buffer->put_raw(reinterpret_cast<const nuraft::byte *>(blob.data()), blob_size);
    /// Rewind to the beginning before handing the buffer to the decoder.
    nuraft_buffer->pos(0);
    return SnapshotMetadata::deserialize(*nuraft_buffer);
}
2021-01-21 11:07:55 +00:00
}
2021-03-01 13:33:34 +00:00
void NuKeeperStorageSnapshot : : serialize ( const NuKeeperStorageSnapshot & snapshot , WriteBuffer & out )
2021-01-21 11:07:55 +00:00
{
2021-03-01 13:33:34 +00:00
Coordination : : write ( static_cast < uint8_t > ( snapshot . version ) , out ) ;
2021-03-02 14:30:56 +00:00
serializeSnapshotMetadata ( snapshot . snapshot_meta , out ) ;
2021-03-01 13:33:34 +00:00
Coordination : : write ( snapshot . session_id , out ) ;
Coordination : : write ( snapshot . snapshot_container_size , out ) ;
2021-03-01 15:32:27 +00:00
size_t counter = 0 ;
for ( auto it = snapshot . begin ; counter < snapshot . snapshot_container_size ; + + it , + + counter )
2021-03-01 13:33:34 +00:00
{
const auto & path = it - > key ;
const auto & node = it - > value ;
2021-03-04 11:22:59 +00:00
if ( static_cast < size_t > ( node . stat . mzxid ) > snapshot . snapshot_meta - > get_last_log_idx ( ) )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Trying to serialize node with mzxid {}, but last snapshot index {} " , node . stat . mzxid , snapshot . snapshot_meta - > get_last_log_idx ( ) ) ;
2021-03-03 12:21:21 +00:00
2021-03-01 13:33:34 +00:00
Coordination : : write ( path , out ) ;
writeNode ( node , out ) ;
}
2021-03-01 15:32:27 +00:00
size_t size = snapshot . session_and_timeout . size ( ) ;
Coordination : : write ( size , out ) ;
2021-03-01 13:33:34 +00:00
for ( const auto & [ session_id , timeout ] : snapshot . session_and_timeout )
2021-01-21 11:07:55 +00:00
{
Coordination : : write ( session_id , out ) ;
2021-03-01 13:33:34 +00:00
Coordination : : write ( timeout , out ) ;
2021-01-21 11:07:55 +00:00
}
}
2021-03-02 14:30:56 +00:00
/// Reads a snapshot in the layout produced by NuKeeperStorageSnapshot::serialize and loads it
/// into `storage`. This covers the zxid, the session id counter, nodes, ephemerals, children
/// lists and active sessions. Returns the snapshot's NuRaft metadata.
/// Throws UNKNOWN_FORMAT_VERSION for versions newer than V0.
SnapshotMetadataPtr NuKeeperStorageSnapshot::deserialize(NuKeeperStorage & storage, ReadBuffer & in)
{
    uint8_t version;
    Coordination::read(version, in);
    if (static_cast<SnapshotVersion>(version) > SnapshotVersion::V0)
        throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version);

    SnapshotMetadataPtr metadata = deserializeSnapshotMetadata(in);

    int64_t session_id;
    Coordination::read(session_id, in);
    storage.zxid = metadata->get_last_log_idx();
    storage.session_id_counter = session_id;

    size_t snapshot_container_size;
    Coordination::read(snapshot_container_size, in);
    for (size_t node_index = 0; node_index < snapshot_container_size; ++node_index)
    {
        std::string path;
        Coordination::read(path, in);
        NuKeeperStorage::Node node;
        readNode(node, in);
        storage.container.insertOrReplace(path, node);
        if (node.stat.ephemeralOwner != 0)
            storage.ephemerals[node.stat.ephemeralOwner].insert(path);
    }

    /// Children lists are not part of the serialized node, so rebuild them from the paths.
    for (const auto & itr : storage.container)
    {
        if (itr.key == "/")
            continue;
        auto parent_path = parentPath(itr.key);
        storage.container.updateValue(parent_path, [&path = itr.key](NuKeeperStorage::Node & value) { value.children.insert(getBaseName(path)); });
    }

    size_t active_sessions_size;
    Coordination::read(active_sessions_size, in);
    for (size_t session_index = 0; session_index < active_sessions_size; ++session_index)
    {
        int64_t active_session_id;
        int64_t timeout;
        Coordination::read(active_session_id, in);
        Coordination::read(timeout, in);
        storage.addSessionID(active_session_id, timeout);
    }

    return metadata;
}
/// Creates a snapshot of `storage_` covering the log up to `up_to_log_idx_`.
/// Fresh metadata is built (term 0, empty cluster config), and the rest of the setup is
/// delegated to the metadata-taking constructor.
NuKeeperStorageSnapshot::NuKeeperStorageSnapshot(NuKeeperStorage * storage_, size_t up_to_log_idx_)
    : NuKeeperStorageSnapshot(storage_, std::make_shared<SnapshotMetadata>(up_to_log_idx_, 0, std::make_shared<nuraft::cluster_config>()))
{
}
/// Creates a snapshot of `storage_` described by `snapshot_meta_`.
/// Snapshot mode is enabled here and disabled again in the destructor. Presumably this keeps
/// the captured size/iterator valid while the snapshot object lives — confirm in NuKeeperStorage.
NuKeeperStorageSnapshot::NuKeeperStorageSnapshot(NuKeeperStorage * storage_, const SnapshotMetadataPtr & snapshot_meta_)
    : storage(storage_)
    , snapshot_meta(snapshot_meta_)
    , session_id(storage->session_id_counter)
{
    auto & source = *storage;
    source.enableSnapshotMode();
    snapshot_container_size = source.container.snapshotSize();
    begin = source.getSnapshotIteratorBegin();
    session_and_timeout = source.getActiveSessions();
}
/// Undoes the constructors' setup: calls clearGarbageAfterSnapshot() on the storage,
/// then leaves the snapshot mode the constructors enabled.
NuKeeperStorageSnapshot::~NuKeeperStorageSnapshot()
{
    NuKeeperStorage & source = *storage;
    source.clearGarbageAfterSnapshot();
    source.disableSnapshotMode();
}
2021-03-01 14:54:08 +00:00
/// Opens (creating if necessary) the snapshot directory `snapshots_path_` and indexes the
/// existing `snapshot_<idx>.bin` files by log index.
/// Leftover `tmp_` files from interrupted writes are deleted, and unrelated files are ignored.
/// Finally, snapshots beyond `snapshots_to_keep_` are pruned.
NuKeeperSnapshotManager::NuKeeperSnapshotManager(const std::string & snapshots_path_, size_t snapshots_to_keep_)
    : snapshots_path(snapshots_path_)
    , snapshots_to_keep(snapshots_to_keep_)
{
    namespace fs = std::filesystem;
    if (!fs::exists(snapshots_path))
        fs::create_directories(snapshots_path);

    for (const auto & p : fs::directory_iterator(snapshots_path))
    {
        /// Match on the file name only: p.path() includes the directory, so testing it for the
        /// "tmp_" prefix never matched and leftover tmp files were then parsed as snapshots.
        const std::string file_name = p.path().filename().string();
        if (startsWith(file_name, "tmp_")) /// Unfinished tmp files
        {
            fs::remove(p);
            continue;
        }

        /// Anything not named like getSnapshotFileName() output is not ours to parse.
        if (!startsWith(file_name, "snapshot_"))
            continue;

        size_t snapshot_up_to = getSnapshotPathUpToLogIdx(p.path());
        existing_snapshots[snapshot_up_to] = p.path();
    }

    removeOutdatedSnapshotsIfNeeded();
}
std : : string NuKeeperSnapshotManager : : serializeSnapshotBufferToDisk ( nuraft : : buffer & buffer , size_t up_to_log_idx )
{
ReadBufferFromNuraftBuffer reader ( buffer ) ;
2021-03-03 11:10:24 +00:00
auto snapshot_file_name = getSnapshotFileName ( up_to_log_idx ) ;
auto tmp_snapshot_file_name = " tmp_ " + snapshot_file_name ;
std : : string tmp_snapshot_path = std : : filesystem : : path { snapshots_path } / tmp_snapshot_file_name ;
std : : string new_snapshot_path = std : : filesystem : : path { snapshots_path } / snapshot_file_name ;
2021-03-01 13:33:34 +00:00
2021-03-03 11:10:24 +00:00
WriteBufferFromFile plain_buf ( tmp_snapshot_path ) ;
2021-03-01 13:33:34 +00:00
copyData ( reader , plain_buf ) ;
plain_buf . sync ( ) ;
2021-03-03 11:10:24 +00:00
std : : filesystem : : rename ( tmp_snapshot_path , new_snapshot_path ) ;
2021-03-01 14:40:32 +00:00
existing_snapshots . emplace ( up_to_log_idx , new_snapshot_path ) ;
2021-03-01 14:54:08 +00:00
removeOutdatedSnapshotsIfNeeded ( ) ;
2021-03-01 13:33:34 +00:00
return new_snapshot_path ;
}
2021-03-03 11:10:24 +00:00
/// Returns the raw bytes of the newest readable snapshot, or nullptr if none can be read.
/// Snapshots are tried from newest to oldest. One whose file cannot be read is logged,
/// dropped from `existing_snapshots` and deleted from disk, and the next older one is tried.
nuraft::ptr<nuraft::buffer> NuKeeperSnapshotManager::deserializeLatestSnapshotBufferFromDisk()
{
    while (!existing_snapshots.empty())
    {
        auto latest_itr = existing_snapshots.rbegin();
        try
        {
            return deserializeSnapshotBufferFromDisk(latest_itr->first);
        }
        catch (const DB::Exception &)
        {
            /// Log before cleaning up: std::filesystem::remove may itself throw,
            /// and the original read error must not be lost in that case.
            tryLogCurrentException(__PRETTY_FUNCTION__);
            const size_t broken_log_idx = latest_itr->first;
            const std::string broken_path = latest_itr->second;
            existing_snapshots.erase(broken_log_idx);
            std::filesystem::remove(broken_path);
        }
    }

    return nullptr;
}
2021-03-01 13:33:34 +00:00
/// Loads the bytes of the snapshot file registered for `up_to_log_idx` (as written by
/// serializeSnapshotBufferToDisk) into a NuRaft buffer.
/// `existing_snapshots.at` throws if the index is not registered.
nuraft::ptr<nuraft::buffer> NuKeeperSnapshotManager::deserializeSnapshotBufferFromDisk(size_t up_to_log_idx) const
{
    const auto & path_on_disk = existing_snapshots.at(up_to_log_idx);
    WriteBufferFromNuraftBuffer in_memory;
    ReadBufferFromFile file_reader(path_on_disk);
    copyData(file_reader, in_memory);
    return in_memory.getBuffer();
}
/// Serializes `snapshot` through a compression layer into an in-memory NuRaft buffer.
nuraft::ptr<nuraft::buffer> NuKeeperSnapshotManager::serializeSnapshotToBuffer(const NuKeeperStorageSnapshot & snapshot)
{
    WriteBufferFromNuraftBuffer result_writer;
    CompressedWriteBuffer compressing_writer(result_writer);
    NuKeeperStorageSnapshot::serialize(snapshot, compressing_writer);
    /// Push all compressed data into result_writer before taking its buffer.
    compressing_writer.finalize();
    return result_writer.getBuffer();
}
2021-03-02 14:30:56 +00:00
/// Inverse of serializeSnapshotToBuffer: decompresses `buffer`, loads it into `storage`
/// and returns the snapshot's metadata.
SnapshotMetadataPtr NuKeeperSnapshotManager::deserializeSnapshotFromBuffer(NuKeeperStorage * storage, nuraft::ptr<nuraft::buffer> buffer)
{
    ReadBufferFromNuraftBuffer raw_reader(buffer);
    CompressedReadBuffer decompressing_reader(raw_reader);
    auto & target_storage = *storage;
    return NuKeeperStorageSnapshot::deserialize(target_storage, decompressing_reader);
}
2021-03-03 11:10:24 +00:00
/// Loads the newest readable snapshot into `storage` and returns its metadata,
/// or nullptr when there is no snapshot that can be read.
SnapshotMetadataPtr NuKeeperSnapshotManager::restoreFromLatestSnapshot(NuKeeperStorage * storage)
{
    if (!existing_snapshots.empty())
    {
        if (auto buffer = deserializeLatestSnapshotBufferFromDisk())
            return deserializeSnapshotFromBuffer(storage, buffer);
    }
    return nullptr;
}
2021-03-01 14:54:08 +00:00
void NuKeeperSnapshotManager : : removeOutdatedSnapshotsIfNeeded ( )
{
while ( existing_snapshots . size ( ) > snapshots_to_keep )
2021-03-03 11:10:24 +00:00
removeSnapshot ( existing_snapshots . begin ( ) - > first ) ;
}
void NuKeeperSnapshotManager : : removeSnapshot ( size_t log_idx )
{
auto itr = existing_snapshots . find ( log_idx ) ;
if ( itr = = existing_snapshots . end ( ) )
throw Exception ( ErrorCodes : : UNKNOWN_SNAPSHOT , " Unknown snapshot with log index {} " , log_idx ) ;
std : : filesystem : : remove ( itr - > second ) ;
existing_snapshots . erase ( itr ) ;
2021-03-01 14:54:08 +00:00
}
2021-03-01 13:33:34 +00:00
2021-01-21 11:07:55 +00:00
}