2021-06-17 13:29:11 +00:00
# include <Coordination/ZooKeeperDataReader.h>
2021-12-30 16:21:49 +00:00
2021-06-17 13:29:11 +00:00
# include <filesystem>
# include <cstdlib>
2021-12-30 16:21:49 +00:00
# include <string>
2021-06-17 13:29:11 +00:00
# include <IO/ReadHelpers.h>
# include <Common/ZooKeeper/ZooKeeperIO.h>
2023-04-08 04:47:21 +00:00
# include <Common/logger_useful.h>
2021-06-17 13:29:11 +00:00
# include <IO/ReadBufferFromFile.h>
2024-02-20 13:21:29 +00:00
# include <Coordination/KeeperCommon.h>
2021-06-17 13:29:11 +00:00
namespace DB
{
/// Error codes thrown from this file. These are `extern` declarations only; the values are defined elsewhere.
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED ;
extern const int CORRUPTED_DATA ;
}
/// Extracts the zxid that ZooKeeper encodes as a hexadecimal file extension,
/// e.g. "snapshot.1a" -> 0x1a, "/data/version-2/log.100000001" -> 0x100000001.
/// Only the file-name part of `filename` is inspected.
/// Returns 0 when the name has no extension or the extension holds no parsable hex number.
int64_t getZxidFromName(const std::string & filename)
{
    const std::filesystem::path path(filename);
    const std::string extension = path.extension().string();

    /// extension() is either empty or starts with '.'. For an empty extension, `c_str() + 1`
    /// would point past the terminating null character, so handle it explicitly.
    if (extension.size() <= 1)
        return 0;

    return static_cast<int64_t>(std::strtoull(extension.c_str() + 1, nullptr, 16));
}
/// Reads and validates the header of a ZooKeeper snapshot file: an int32 magic ("ZKSN"),
/// an int32 format version and an int64 database id (consumed and ignored).
/// The magic is checked before the version, the same as in deserializeLogMagic. A file that is
/// not a snapshot at all is then reported as corrupted rather than as an unsupported version.
/// Throws CORRUPTED_DATA on a wrong magic and NOT_IMPLEMENTED for any version other than 2.
void deserializeSnapshotMagic(ReadBuffer & in)
{
    static constexpr int32_t SNP_HEADER = 1514885966; /// "ZKSN"

    int32_t magic_header;
    int32_t version;
    int64_t dbid; /// Unused; read only to position the buffer after the header.
    Coordination::read(magic_header, in);
    Coordination::read(version, in);
    Coordination::read(dbid, in);

    if (magic_header != SNP_HEADER)
        throw Exception(ErrorCodes::CORRUPTED_DATA, "Incorrect magic header in file, expected {}, got {}", SNP_HEADER, magic_header);

    if (version != 2)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot deserialize ZooKeeper data other than version 2, got version {}", version);
}
2023-11-11 18:37:30 +00:00
template < typename Storage >
int64_t deserializeSessionAndTimeout ( Storage & storage , ReadBuffer & in )
2021-06-17 13:29:11 +00:00
{
int32_t count ;
Coordination : : read ( count , in ) ;
int64_t max_session_id = 0 ;
while ( count > 0 )
{
int64_t session_id ;
int32_t timeout ;
Coordination : : read ( session_id , in ) ;
Coordination : : read ( timeout , in ) ;
storage . addSessionID ( session_id , timeout ) ;
max_session_id = std : : max ( session_id , max_session_id ) ;
count - - ;
}
return max_session_id ;
}
2023-11-11 18:37:30 +00:00
template < typename Storage >
void deserializeACLMap ( Storage & storage , ReadBuffer & in )
2021-06-17 13:29:11 +00:00
{
int32_t count ;
Coordination : : read ( count , in ) ;
while ( count > 0 )
{
int64_t map_index ;
Coordination : : read ( map_index , in ) ;
Coordination : : ACLs acls ;
int32_t acls_len ;
Coordination : : read ( acls_len , in ) ;
while ( acls_len > 0 )
{
Coordination : : ACL acl ;
Coordination : : read ( acl . permissions , in ) ;
Coordination : : read ( acl . scheme , in ) ;
Coordination : : read ( acl . id , in ) ;
acls . push_back ( acl ) ;
acls_len - - ;
}
storage . acl_map . addMapping ( map_index , acls ) ;
count - - ;
}
}
2023-11-11 18:37:30 +00:00
template < typename Storage >
2024-01-31 23:49:18 +00:00
int64_t deserializeStorageData ( Storage & storage , ReadBuffer & in , LoggerPtr log )
2021-06-17 13:29:11 +00:00
{
int64_t max_zxid = 0 ;
std : : string path ;
Coordination : : read ( path , in ) ;
size_t count = 0 ;
while ( path ! = " / " )
{
2023-11-11 18:37:30 +00:00
typename Storage : : Node node { } ;
2022-04-05 06:27:03 +00:00
String data ;
Coordination : : read ( data , in ) ;
2024-02-05 14:30:00 +00:00
node . setData ( data ) ;
2021-06-17 13:29:11 +00:00
Coordination : : read ( node . acl_id , in ) ;
/// Deserialize stat
2024-02-05 14:30:00 +00:00
Coordination : : read ( node . czxid , in ) ;
Coordination : : read ( node . mzxid , in ) ;
2021-06-17 13:29:11 +00:00
/// For some reason ZXID specified in filename can be smaller
/// then actual zxid from nodes. In this case we will use zxid from nodes.
2024-02-05 14:30:00 +00:00
max_zxid = std : : max ( max_zxid , node . mzxid ) ;
int64_t ctime ;
Coordination : : read ( ctime , in ) ;
node . setCtime ( ctime ) ;
2024-02-09 09:40:43 +00:00
Coordination : : read ( node . mtime , in ) ;
2024-02-05 14:30:00 +00:00
Coordination : : read ( node . version , in ) ;
Coordination : : read ( node . cversion , in ) ;
Coordination : : read ( node . aversion , in ) ;
int64_t ephemeral_owner ;
Coordination : : read ( ephemeral_owner , in ) ;
if ( ephemeral_owner ! = 0 )
2024-02-07 14:57:52 +00:00
node . setEphemeralOwner ( ephemeral_owner ) ;
2024-02-05 14:30:00 +00:00
Coordination : : read ( node . pzxid , in ) ;
2021-06-17 13:29:11 +00:00
if ( ! path . empty ( ) )
{
2024-02-07 14:57:52 +00:00
if ( ephemeral_owner = = 0 )
node . setSeqNum ( node . cversion ) ;
2021-06-17 13:29:11 +00:00
storage . container . insertOrReplace ( path , node ) ;
2024-02-05 14:30:00 +00:00
if ( ephemeral_owner ! = 0 )
storage . ephemerals [ ephemeral_owner ] . insert ( path ) ;
2021-06-17 13:29:11 +00:00
storage . acl_map . addUsage ( node . acl_id ) ;
}
Coordination : : read ( path , in ) ;
count + + ;
if ( count % 1000 = = 0 )
2021-06-17 16:32:50 +00:00
LOG_INFO ( log , " Deserialized nodes from snapshot: {} " , count ) ;
2021-06-17 13:29:11 +00:00
}
for ( const auto & itr : storage . container )
{
if ( itr . key ! = " / " )
{
2023-05-17 13:32:51 +00:00
auto parent_path = parentNodePath ( itr . key ) ;
2024-02-28 14:30:52 +00:00
storage . container . updateValue ( parent_path , [ my_path = itr . key ] ( typename Storage : : Node & value ) { value . addChild ( getBaseNodeName ( my_path ) ) ; value . increaseNumChildren ( ) ; } ) ;
2021-06-17 13:29:11 +00:00
}
}
return max_zxid ;
}
2023-11-11 18:37:30 +00:00
template < typename Storage >
2024-01-31 23:49:18 +00:00
void deserializeKeeperStorageFromSnapshot ( Storage & storage , const std : : string & snapshot_path , LoggerPtr log )
2021-06-17 13:29:11 +00:00
{
2021-06-17 16:32:50 +00:00
LOG_INFO ( log , " Deserializing storage snapshot {} " , snapshot_path ) ;
2021-06-17 13:29:11 +00:00
int64_t zxid = getZxidFromName ( snapshot_path ) ;
ReadBufferFromFile reader ( snapshot_path ) ;
deserializeSnapshotMagic ( reader ) ;
2021-06-17 16:32:50 +00:00
LOG_INFO ( log , " Magic deserialized, looks OK " ) ;
2021-06-17 13:29:11 +00:00
auto max_session_id = deserializeSessionAndTimeout ( storage , reader ) ;
2021-06-17 16:32:50 +00:00
LOG_INFO ( log , " Sessions and timeouts deserialized " ) ;
2021-06-17 13:29:11 +00:00
2022-01-13 09:31:54 +00:00
storage . session_id_counter = max_session_id + 1 ; /// session_id_counter pointer to next slot
2021-06-17 13:29:11 +00:00
deserializeACLMap ( storage , reader ) ;
2021-06-17 16:32:50 +00:00
LOG_INFO ( log , " ACLs deserialized " ) ;
2021-06-17 13:29:11 +00:00
2021-06-17 16:32:50 +00:00
LOG_INFO ( log , " Deserializing data from snapshot " ) ;
int64_t zxid_from_nodes = deserializeStorageData ( storage , reader , log ) ;
2021-07-09 13:00:50 +00:00
/// In ZooKeeper Snapshots can contain inconsistent state of storage. They call
/// this inconsistent state "fuzzy". So it's guaranteed that snapshot contain all
/// records up to zxid from snapshot name and also some records for future.
/// But it doesn't mean that we have just some state of storage from future (like zxid + 100 log records).
/// We have incorrect state of storage where some random log entries from future were applied....
///
/// In ZooKeeper they say that their transactions log is idempotent and can be applied to "fuzzy" state as is.
2021-07-10 08:42:25 +00:00
/// It's true but there is no any general invariant which produces this property. They just have ad-hoc "if's" which detects
2021-07-09 13:00:50 +00:00
/// "fuzzy" state inconsistencies and apply log records in special way. Several examples:
/// https://github.com/apache/zookeeper/blob/master/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java#L453-L463
/// https://github.com/apache/zookeeper/blob/master/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java#L476-L480
/// https://github.com/apache/zookeeper/blob/master/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java#L547-L549
if ( zxid_from_nodes > zxid )
2021-07-09 13:03:23 +00:00
LOG_WARNING ( log , " ZooKeeper snapshot was in inconsistent (fuzzy) state. Will try to apply log. ZooKeeper create non fuzzy snapshot with restart. You can just restart ZooKeeper server and get consistent version. " ) ;
2021-07-09 13:00:50 +00:00
storage . zxid = zxid ;
2021-06-17 16:32:50 +00:00
LOG_INFO ( log , " Finished, snapshot ZXID {} " , storage . zxid ) ;
2021-06-17 13:29:11 +00:00
}
2024-03-18 08:23:51 +00:00
/// Short alias used by the directory-scanning helpers below.
namespace fs = ::std::filesystem;
2023-11-13 12:53:22 +00:00
2023-11-11 18:37:30 +00:00
template < typename Storage >
2024-01-31 23:49:18 +00:00
void deserializeKeeperStorageFromSnapshotsDir ( Storage & storage , const std : : string & path , LoggerPtr log )
2021-06-17 13:29:11 +00:00
{
std : : map < int64_t , std : : string > existing_snapshots ;
for ( const auto & p : fs : : directory_iterator ( path ) )
{
const auto & log_path = p . path ( ) ;
if ( ! log_path . has_filename ( ) | | ! startsWith ( log_path . filename ( ) , " snapshot. " ) )
continue ;
int64_t zxid = getZxidFromName ( log_path ) ;
existing_snapshots [ zxid ] = p . path ( ) ;
}
2021-06-17 16:32:50 +00:00
LOG_INFO ( log , " Totally have {} snapshots, will use latest " , existing_snapshots . size ( ) ) ;
2021-06-17 13:29:11 +00:00
/// deserialize only from latest snapshot
if ( ! existing_snapshots . empty ( ) )
2021-06-17 16:32:50 +00:00
deserializeKeeperStorageFromSnapshot ( storage , existing_snapshots . rbegin ( ) - > second , log ) ;
else
throw Exception ( ErrorCodes : : CORRUPTED_DATA , " No snapshots found on path {}. At least one snapshot must exist. " , path ) ;
2021-06-17 13:29:11 +00:00
}
/// Reads and validates the header of a ZooKeeper transaction log file: an int32 magic ("ZKLG"),
/// an int32 format version and an int64 database id (consumed and ignored).
/// Throws CORRUPTED_DATA on a wrong magic and NOT_IMPLEMENTED for any version other than 2.
void deserializeLogMagic(ReadBuffer & in)
{
    static constexpr int32_t LOG_HEADER = 1514884167; /// "ZKLG"

    int32_t magic;
    int32_t format_version;
    int64_t db_id;
    Coordination::read(magic, in);
    Coordination::read(format_version, in);
    Coordination::read(db_id, in);

    if (magic != LOG_HEADER)
        throw Exception(ErrorCodes::CORRUPTED_DATA, "Incorrect magic header in file, expected {}, got {}", LOG_HEADER, magic);

    if (format_version != 2)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot deserialize ZooKeeper data other than version 2, got version {}", format_version);
}
2021-07-09 13:00:50 +00:00
/// ZooKeeper transaction log records differ from client requests. The main reason is that records
/// are stored in the log in a "finalized" state (for example, with concrete versions).
///
/// Example:
/// class CreateTxn {
2021-06-17 13:29:11 +00:00
/// ustring path;
/// buffer data;
/// vector<org.apache.zookeeper.data.ACL> acl;
/// boolean ephemeral;
/// int parentCVersion;
/// }
/// But Create Request:
/// class CreateRequest {
/// ustring path;
/// buffer data;
/// vector<org.apache.zookeeper.data.ACL> acl;
/// int flags;
/// }
///
/// However, both use the same OpNum...
///
/// ZooKeeper's code base also has a comment describing the log structure, but
/// it is almost completely incorrect. The actual log structure, starting from version 3.6, is:
///
/// Magic Header: "ZKLG" + 4 byte version + 8 byte dbid.
/// After that goes serialized transactions, in the following format:
/// 8 byte checksum
/// 4 byte transaction length
/// 8 byte session_id (author of the transaction)
/// 4 byte user XID
/// 8 byte ZXID
/// 8 byte transaction time
/// 4 byte transaction type (OpNum)
/// [Transaction body depending on transaction type]
/// 12 bytes tail (starting from 3.6+): 4 byte version + 8 byte checksum of data tree
/// 1 byte -- 0x42
///
/// Transaction body is quite simple for all kinds of transactions except
/// Multitransactions. Their structure is following:
/// 4 byte sub transactions count
/// 4 byte sub transaction length
/// [Transaction body depending on transaction type]
/// and so on
///
/// Gotchas:
///
/// 1) For some reason ZooKeeper stores ErrorTxns in the log. That is
/// reasonable for Multitransactions, but it is not clear why standalone
/// errors are stored.
///
/// 2) For some reason there is no 12-byte tail (version + checksum of the
/// data tree) after a standalone ErrorTxn.
///
/// 3) The strangest thing: in one of our production logs (about 1.2GB in
/// size) we found a Multitransaction with two sub-transactions, Error1
/// and Error2, both with OpCode -1. A normal Error transaction is 4 bytes long
/// (the error code), but Error1 was 550 bytes long. Stranger still, these
/// 550 bytes were obviously part of a Create transaction, yet the operation
/// code was -1. We added debug prints to the original ZooKeeper (3.6.3) and
/// found that it just reads the 550 bytes of this "Error" transaction, takes
/// the first 4 bytes as the error code (it was 79, a non-existent code) and
/// skips the remaining 546 bytes. NOTE: this looks like a bug
/// in ZooKeeper.
///
namespace
{
/// CreateTxn body: path, data, ACL list, ephemeral flag and the parent's cversion.
Coordination::ZooKeeperRequestPtr deserializeCreateTxn(ReadBuffer & in)
{
    auto request = std::make_shared<Coordination::ZooKeeperCreateRequest>();

    Coordination::read(request->path, in);
    Coordination::read(request->data, in);
    Coordination::read(request->acls, in);
    Coordination::read(request->is_ephemeral, in);
    Coordination::read(request->parent_cversion, in);

    request->restored_from_zookeeper_log = true;
    return request;
}
/// DeleteTxn body: just the path of the removed node.
Coordination::ZooKeeperRequestPtr deserializeDeleteTxn(ReadBuffer & in)
{
    auto request = std::make_shared<Coordination::ZooKeeperRemoveRequest>();
    Coordination::read(request->path, in);

    request->restored_from_zookeeper_log = true;
    return request;
}
/// SetDataTxn body: path, data and version.
Coordination::ZooKeeperRequestPtr deserializeSetTxn(ReadBuffer & in)
{
    auto request = std::make_shared<Coordination::ZooKeeperSetRequest>();
    Coordination::read(request->path, in);
    Coordination::read(request->data, in);

    int32_t stored_version;
    Coordination::read(stored_version, in);

    request->restored_from_zookeeper_log = true;
    /// The log stores the resulting version (request version + 1); convert it back to what a request carries.
    request->version = stored_version - 1;
    return request;
}
/// CheckVersionTxn body: path and version.
Coordination::ZooKeeperRequestPtr deserializeCheckVersionTxn(ReadBuffer & in)
{
    auto request = std::make_shared<Coordination::ZooKeeperCheckRequest>();
    Coordination::read(request->path, in);

    int32_t stored_version;
    Coordination::read(stored_version, in);

    request->restored_from_zookeeper_log = true;
    /// The log stores version + 1 compared to what the request carried.
    request->version = stored_version - 1;
    return request;
}
/// CreateSessionTxn body: a single int32 session timeout (stored into session_timeout_ms).
Coordination::ZooKeeperRequestPtr deserializeCreateSession(ReadBuffer & in)
{
    int32_t timeout_ms;
    Coordination::read(timeout_ms, in);

    auto request = std::make_shared<Coordination::ZooKeeperSessionIDRequest>();
    request->session_timeout_ms = timeout_ms;
    request->restored_from_zookeeper_log = true;
    return request;
}
2021-07-09 13:00:50 +00:00
/// CloseSessionTxn body: a list of strings that is read only to skip it. Old-style close
/// transactions (`empty == true`) have no body at all.
Coordination::ZooKeeperRequestPtr deserializeCloseSession(ReadBuffer & in, bool empty)
{
    auto request = std::make_shared<Coordination::ZooKeeperCloseRequest>();

    if (empty == false)
    {
        std::vector<std::string> skipped;
        Coordination::read(skipped, in);
    }

    request->restored_from_zookeeper_log = true;
    return request;
}
/// ErrorTxn body: an int32 error code. Error records are never replayed, so the code is consumed
/// and a null request is returned to mark the record as an error.
Coordination::ZooKeeperRequestPtr deserializeErrorTxn(ReadBuffer & in)
{
    int32_t error_code;
    Coordination::read(error_code, in);
    return {};
}
2021-06-22 10:49:35 +00:00
/// SetACLTxn body: path, ACL list and version.
Coordination::ZooKeeperRequestPtr deserializeSetACLTxn(ReadBuffer & in)
{
    auto request = std::make_shared<Coordination::ZooKeeperSetACLRequest>();
    Coordination::read(request->path, in);
    Coordination::read(request->acls, in);

    int32_t stored_version;
    Coordination::read(stored_version, in);
    /// The log stores version + 1 compared to what the request carried.
    request->version = stored_version - 1;

    request->restored_from_zookeeper_log = true;
    return request;
}
2021-06-17 13:29:11 +00:00
Coordination::ZooKeeperRequestPtr deserializeMultiTxn(ReadBuffer & in);

/// Reads one transaction body, starting at its int32 type (OpNum), and dispatches on that type.
/// For sub-transactions of a multi (`subtxn == true`), an int32 length follows the type. Any bytes
/// of that length not consumed by the specific reader are skipped.
/// `txn_length` is the length of a top-level record as passed by deserializeTxn.
/// Returns nullptr for error transactions. Throws NOT_IMPLEMENTED for unknown types.
Coordination::ZooKeeperRequestPtr deserializeTxnImpl(ReadBuffer & in, bool subtxn, int64_t txn_length = 0)
{
    /// Length of a top-level record that has nothing after its type: session id (8) + xid (4) +
    /// zxid (8) + time (8) + type (4). Old-style CloseTxns can look like this.
    static constexpr int64_t EMPTY_TXN_LENGTH = 32;

    int32_t op_type;
    Coordination::read(op_type, in);

    int32_t sub_txn_length = 0;
    if (subtxn)
        Coordination::read(sub_txn_length, in);

    const bool empty_txn = !subtxn && txn_length == EMPTY_TXN_LENGTH;
    if (empty_txn && op_type != -11)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Empty non close-session transaction found");

    const int64_t body_start = in.count();

    Coordination::ZooKeeperRequestPtr request;
    switch (op_type)
    {
        case 1: /// create
            request = deserializeCreateTxn(in);
            break;
        case 2: /// delete
            request = deserializeDeleteTxn(in);
            break;
        case 5: /// setData
            request = deserializeSetTxn(in);
            break;
        case 7: /// setACL
            request = deserializeSetACLTxn(in);
            break;
        case 13: /// check
            request = deserializeCheckVersionTxn(in);
            break;
        case 14: /// multi
            request = deserializeMultiTxn(in);
            break;
        case -10: /// createSession
            request = deserializeCreateSession(in);
            break;
        case -11: /// closeSession
            request = deserializeCloseSession(in, empty_txn);
            break;
        case -1: /// error
            request = deserializeErrorTxn(in);
            break;
        default:
            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented operation {}", op_type);
    }

    if (subtxn)
    {
        const int64_t consumed = in.count() - body_start;
        if (consumed < sub_txn_length)
            in.ignore(sub_txn_length - consumed);
    }

    return request;
}
/// MultiTxn body: an int32 sub-transaction count followed by that many sub-transactions, each read
/// by deserializeTxnImpl. Error sub-transactions end up as nullptr entries in `requests`.
Coordination::ZooKeeperRequestPtr deserializeMultiTxn(ReadBuffer & in)
{
    int32_t sub_txn_count;
    Coordination::read(sub_txn_count, in);

    auto multi_request = std::make_shared<Coordination::ZooKeeperMultiRequest>();
    for (int32_t i = 0; i < sub_txn_count; ++i)
    {
        auto sub_request = deserializeTxnImpl(in, true);
        multi_request->requests.push_back(std::move(sub_request));
    }
    return multi_request;
}
bool isErrorRequest ( Coordination : : ZooKeeperRequestPtr request )
{
return request = = nullptr ;
}
bool hasErrorsInMultiRequest ( Coordination : : ZooKeeperRequestPtr request )
{
2021-06-21 19:59:19 +00:00
if ( request = = nullptr )
return true ;
2023-02-19 22:15:09 +00:00
for ( const auto & subrequest : dynamic_cast < Coordination : : ZooKeeperMultiRequest * > ( request . get ( ) ) - > requests )
2021-06-18 18:36:19 +00:00
if ( subrequest = = nullptr )
2021-06-17 13:29:11 +00:00
return true ;
return false ;
}
}
2023-11-11 18:37:30 +00:00
template < typename Storage >
2024-01-31 23:49:18 +00:00
bool deserializeTxn ( Storage & storage , ReadBuffer & in , LoggerPtr /*log*/ )
2021-06-17 13:29:11 +00:00
{
int64_t checksum ;
Coordination : : read ( checksum , in ) ;
/// Zero padding is possible until file end
if ( checksum = = 0 )
return false ;
int32_t txn_len ;
Coordination : : read ( txn_len , in ) ;
int64_t count_before = in . count ( ) ;
int64_t session_id ;
Coordination : : read ( session_id , in ) ;
int32_t xid ;
Coordination : : read ( xid , in ) ;
int64_t zxid ;
Coordination : : read ( zxid , in ) ;
int64_t time ;
Coordination : : read ( time , in ) ;
2021-07-09 13:00:50 +00:00
Coordination : : ZooKeeperRequestPtr request = deserializeTxnImpl ( in , false , txn_len ) ;
2021-06-17 13:29:11 +00:00
2021-06-17 16:32:50 +00:00
/// Skip all other bytes
2021-06-17 13:29:11 +00:00
int64_t bytes_read = in . count ( ) - count_before ;
if ( bytes_read < txn_len )
in . ignore ( txn_len - bytes_read ) ;
/// We don't need to apply error requests
if ( isErrorRequest ( request ) )
return true ;
request - > xid = xid ;
if ( zxid > storage . zxid )
{
/// Separate processing of session id requests
if ( request - > getOpNum ( ) = = Coordination : : OpNum : : SessionID )
{
const Coordination : : ZooKeeperSessionIDRequest & session_id_request = dynamic_cast < const Coordination : : ZooKeeperSessionIDRequest & > ( * request ) ;
storage . getSessionID ( session_id_request . session_timeout_ms ) ;
}
else
{
/// Skip failed multirequests
if ( request - > getOpNum ( ) = = Coordination : : OpNum : : Multi & & hasErrorsInMultiRequest ( request ) )
return true ;
2022-05-17 08:11:08 +00:00
storage . preprocessRequest ( request , session_id , time , zxid , /* check_acl = */ false ) ;
2022-05-23 14:37:57 +00:00
storage . processRequest ( request , session_id , zxid , /* check_acl = */ false ) ;
2021-06-17 13:29:11 +00:00
}
}
return true ;
}
2023-11-11 18:37:30 +00:00
template < typename Storage >
2024-01-31 23:49:18 +00:00
void deserializeLogAndApplyToStorage ( Storage & storage , const std : : string & log_path , LoggerPtr log )
2021-06-17 13:29:11 +00:00
{
ReadBufferFromFile reader ( log_path ) ;
2021-06-17 16:32:50 +00:00
LOG_INFO ( log , " Deserializing log {} " , log_path ) ;
2021-06-17 13:29:11 +00:00
deserializeLogMagic ( reader ) ;
2021-06-17 16:32:50 +00:00
LOG_INFO ( log , " Header looks OK " ) ;
2021-06-17 13:29:11 +00:00
size_t counter = 0 ;
2021-06-18 18:36:19 +00:00
while ( ! reader . eof ( ) & & deserializeTxn ( storage , reader , log ) )
2021-06-17 13:29:11 +00:00
{
counter + + ;
if ( counter % 1000 = = 0 )
2021-06-17 16:32:50 +00:00
LOG_INFO ( log , " Deserialized txns log: {} " , counter ) ;
2021-06-17 13:29:11 +00:00
int8_t forty_two ;
Coordination : : read ( forty_two , reader ) ;
if ( forty_two ! = 0x42 )
throw Exception ( ErrorCodes : : NOT_IMPLEMENTED , " Forty two check byte ({}) is not equal 0x42 " , forty_two) ;
}
2021-06-17 16:32:50 +00:00
LOG_INFO ( log , " Finished {} deserialization, totally read {} records " , log_path , counter ) ;
2021-06-17 13:29:11 +00:00
}
2023-11-11 18:37:30 +00:00
template < typename Storage >
2024-01-31 23:49:18 +00:00
void deserializeLogsAndApplyToStorage ( Storage & storage , const std : : string & path , LoggerPtr log )
2021-06-17 13:29:11 +00:00
{
std : : map < int64_t , std : : string > existing_logs ;
for ( const auto & p : fs : : directory_iterator ( path ) )
{
const auto & log_path = p . path ( ) ;
if ( ! log_path . has_filename ( ) | | ! startsWith ( log_path . filename ( ) , " log. " ) )
continue ;
int64_t zxid = getZxidFromName ( log_path ) ;
existing_logs [ zxid ] = p . path ( ) ;
}
2021-06-17 16:32:50 +00:00
LOG_INFO ( log , " Totally have {} logs " , existing_logs . size ( ) ) ;
2021-09-14 17:14:14 +00:00
std : : vector < std : : string > stored_files ;
for ( auto it = existing_logs . rbegin ( ) ; it ! = existing_logs . rend ( ) ; + + it )
2021-06-17 13:29:11 +00:00
{
2021-09-14 17:14:14 +00:00
if ( it - > first > = storage . zxid )
{
stored_files . emplace_back ( it - > second ) ;
}
else if ( it - > first < storage . zxid )
{
/// add the last logfile that is less than the zxid
stored_files . emplace_back ( it - > second ) ;
break ;
}
}
2021-09-14 17:19:19 +00:00
for ( auto it = stored_files . rbegin ( ) ; it ! = stored_files . rend ( ) ; + + it )
2021-09-14 17:14:14 +00:00
{
2021-09-14 17:19:19 +00:00
deserializeLogAndApplyToStorage ( storage , * it , log ) ;
2021-06-17 13:29:11 +00:00
}
}
2024-01-31 23:49:18 +00:00
template void deserializeKeeperStorageFromSnapshot < KeeperMemoryStorage > ( KeeperMemoryStorage & storage , const std : : string & snapshot_path , LoggerPtr log ) ;
template void deserializeKeeperStorageFromSnapshotsDir < KeeperMemoryStorage > ( KeeperMemoryStorage & storage , const std : : string & path , LoggerPtr log ) ;
template void deserializeLogAndApplyToStorage < KeeperMemoryStorage > ( KeeperMemoryStorage & storage , const std : : string & log_path , LoggerPtr log ) ;
template void deserializeLogsAndApplyToStorage < KeeperMemoryStorage > ( KeeperMemoryStorage & storage , const std : : string & path , LoggerPtr log ) ;
2023-11-11 18:37:30 +00:00
2021-06-17 13:29:11 +00:00
}