2022-05-06 12:25:25 +00:00
# include <functional>
# include <iomanip>
# include <iterator>
# include <mutex>
# include <sstream>
2021-03-29 08:24:56 +00:00
# include <Coordination/KeeperStorage.h>
2022-05-06 12:25:25 +00:00
# include <Coordination/pathUtils.h>
# include <IO/Operators.h>
# include <IO/WriteHelpers.h>
# include <boost/algorithm/string.hpp>
# include <Poco/Base64Encoder.h>
# include <Poco/SHA1Engine.h>
2022-05-05 10:32:41 +00:00
# include "Common/ZooKeeper/ZooKeeperConstants.h"
2020-11-04 18:54:55 +00:00
# include <Common/StringUtils/StringUtils.h>
2022-05-06 12:25:25 +00:00
# include <Common/ZooKeeper/IKeeper.h>
2022-01-10 19:01:41 +00:00
# include <Common/hex.h>
2022-04-27 15:05:45 +00:00
# include <Common/logger_useful.h>
2022-05-06 12:25:25 +00:00
# include <Common/setThreadName.h>
2020-10-30 14:16:47 +00:00
2020-11-11 13:55:28 +00:00
namespace DB
{
2020-11-25 13:19:09 +00:00
namespace ErrorCodes
{
extern const int LOGICAL_ERROR ;
extern const int BAD_ARGUMENTS ;
}
2022-04-05 06:27:03 +00:00
namespace
{
2022-05-09 09:57:06 +00:00
String base64Encode ( const String & decoded )
{
std : : ostringstream ostr ; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
ostr . exceptions ( std : : ios : : failbit ) ;
Poco : : Base64Encoder encoder ( ostr ) ;
encoder . rdbuf ( ) - > setLineLength ( 0 ) ;
encoder < < decoded ;
encoder . close ( ) ;
return ostr . str ( ) ;
}
2021-05-21 21:19:22 +00:00
2022-05-09 09:57:06 +00:00
String getSHA1 ( const String & userdata )
{
Poco : : SHA1Engine engine ;
engine . update ( userdata ) ;
const auto & digest_id = engine . digest ( ) ;
return String { digest_id . begin ( ) , digest_id . end ( ) } ;
}
2021-05-22 16:07:47 +00:00
2022-05-09 09:57:06 +00:00
String generateDigest ( const String & userdata )
{
std : : vector < String > user_password ;
boost : : split ( user_password , userdata , [ ] ( char character ) { return character = = ' : ' ; } ) ;
return user_password [ 0 ] + " : " + base64Encode ( getSHA1 ( userdata ) ) ;
}
2021-05-21 21:19:22 +00:00
2022-05-09 09:57:06 +00:00
bool fixupACL (
const std : : vector < Coordination : : ACL > & request_acls ,
const std : : vector < KeeperStorage : : AuthID > & current_ids ,
std : : vector < Coordination : : ACL > & result_acls )
{
if ( request_acls . empty ( ) )
return true ;
bool valid_found = false ;
for ( const auto & request_acl : request_acls )
{
if ( request_acl . scheme = = " auth " )
2021-05-22 16:21:52 +00:00
{
2022-05-09 09:57:06 +00:00
for ( const auto & current_id : current_ids )
2021-05-22 16:21:52 +00:00
{
2021-05-24 12:18:04 +00:00
valid_found = true ;
2022-05-06 12:25:25 +00:00
Coordination : : ACL new_acl = request_acl ;
2022-05-09 09:57:06 +00:00
new_acl . scheme = current_id . scheme ;
new_acl . id = current_id . id ;
2022-05-06 12:25:25 +00:00
result_acls . push_back ( new_acl ) ;
}
2021-05-24 12:18:04 +00:00
}
2022-05-09 09:57:06 +00:00
else if ( request_acl . scheme = = " world " & & request_acl . id = = " anyone " )
{
/// We don't need to save default ACLs
valid_found = true ;
}
else if ( request_acl . scheme = = " digest " )
{
Coordination : : ACL new_acl = request_acl ;
/// Bad auth
if ( std : : count ( new_acl . id . begin ( ) , new_acl . id . end ( ) , ' : ' ) ! = 1 )
return false ;
valid_found = true ;
result_acls . push_back ( new_acl ) ;
}
2021-05-22 16:21:52 +00:00
}
2022-05-09 09:57:06 +00:00
return valid_found ;
}
2021-05-22 16:21:52 +00:00
2022-05-09 09:57:06 +00:00
KeeperStorage : : ResponsesForSessions processWatchesImpl (
const String & path , KeeperStorage : : Watches & watches , KeeperStorage : : Watches & list_watches , Coordination : : Event event_type )
{
KeeperStorage : : ResponsesForSessions result ;
auto watch_it = watches . find ( path ) ;
if ( watch_it ! = watches . end ( ) )
2020-11-10 13:43:10 +00:00
{
2022-05-09 09:57:06 +00:00
std : : shared_ptr < Coordination : : ZooKeeperWatchResponse > watch_response = std : : make_shared < Coordination : : ZooKeeperWatchResponse > ( ) ;
watch_response - > path = path ;
watch_response - > xid = Coordination : : WATCH_XID ;
watch_response - > zxid = - 1 ;
watch_response - > type = event_type ;
watch_response - > state = Coordination : : State : : CONNECTED ;
for ( auto watcher_session : watch_it - > second )
result . push_back ( KeeperStorage : : ResponseForSession { watcher_session , watch_response } ) ;
watches . erase ( watch_it ) ;
}
2020-11-10 13:43:10 +00:00
2022-05-09 09:57:06 +00:00
auto parent_path = parentPath ( path ) ;
2021-08-26 11:50:08 +00:00
2022-05-09 09:57:06 +00:00
Strings paths_to_check_for_list_watches ;
if ( event_type = = Coordination : : Event : : CREATED )
{
paths_to_check_for_list_watches . push_back ( parent_path . toString ( ) ) ; /// Trigger list watches for parent
}
else if ( event_type = = Coordination : : Event : : DELETED )
{
paths_to_check_for_list_watches . push_back ( path ) ; /// Trigger both list watches for this path
paths_to_check_for_list_watches . push_back ( parent_path . toString ( ) ) ; /// And for parent path
}
/// CHANGED event never trigger list wathes
2020-11-10 13:43:10 +00:00
2022-05-09 09:57:06 +00:00
for ( const auto & path_to_check : paths_to_check_for_list_watches )
{
watch_it = list_watches . find ( path_to_check ) ;
if ( watch_it ! = list_watches . end ( ) )
2022-05-06 12:25:25 +00:00
{
2022-05-09 09:57:06 +00:00
std : : shared_ptr < Coordination : : ZooKeeperWatchResponse > watch_list_response
= std : : make_shared < Coordination : : ZooKeeperWatchResponse > ( ) ;
watch_list_response - > path = path_to_check ;
watch_list_response - > xid = Coordination : : WATCH_XID ;
watch_list_response - > zxid = - 1 ;
if ( path_to_check = = parent_path )
watch_list_response - > type = Coordination : : Event : : CHILD ;
else
watch_list_response - > type = Coordination : : Event : : DELETED ;
watch_list_response - > state = Coordination : : State : : CONNECTED ;
for ( auto watcher_session : watch_it - > second )
result . push_back ( KeeperStorage : : ResponseForSession { watcher_session , watch_list_response } ) ;
2021-08-26 11:50:08 +00:00
2022-05-09 09:57:06 +00:00
list_watches . erase ( watch_it ) ;
2021-08-26 11:50:08 +00:00
}
2020-11-10 13:43:10 +00:00
}
2022-05-09 09:57:06 +00:00
return result ;
}
2022-04-05 06:27:03 +00:00
}
void KeeperStorage : : Node : : setData ( String new_data )
{
size_bytes = size_bytes - data . size ( ) + new_data . size ( ) ;
data = std : : move ( new_data ) ;
}
void KeeperStorage : : Node : : addChild ( StringRef child_path )
{
size_bytes + = sizeof child_path ;
children . insert ( child_path ) ;
}
void KeeperStorage : : Node : : removeChild ( StringRef child_path )
{
size_bytes - = sizeof child_path ;
children . erase ( child_path ) ;
}
2020-11-10 13:43:10 +00:00
2021-05-23 17:54:42 +00:00
KeeperStorage : : KeeperStorage ( int64_t tick_time_ms , const String & superdigest_ )
2022-05-06 12:25:25 +00:00
: session_expiry_queue ( tick_time_ms ) , superdigest ( superdigest_ )
2020-11-03 14:49:30 +00:00
{
2021-02-26 13:53:34 +00:00
container . insert ( " / " , Node ( ) ) ;
2020-11-03 14:49:30 +00:00
}
2022-05-06 12:25:25 +00:00
template < class . . . Ts >
struct Overloaded : Ts . . .
2022-05-05 10:32:41 +00:00
{
2022-05-06 12:25:25 +00:00
using Ts : : operator ( ) . . . ;
} ;
template < class . . . Ts >
Overloaded ( Ts . . . ) - > Overloaded < Ts . . . > ;
2022-05-11 09:08:39 +00:00
std : : shared_ptr < KeeperStorage : : Node > KeeperStorage : : UncommittedState : : getNode ( StringRef path )
2022-05-06 12:25:25 +00:00
{
std : : shared_ptr < Node > node { nullptr } ;
2022-05-05 10:32:41 +00:00
if ( auto maybe_node_it = storage . container . find ( path ) ; maybe_node_it ! = storage . container . end ( ) )
{
2022-05-06 12:25:25 +00:00
const auto & committed_node = maybe_node_it - > value ;
node = std : : make_shared < KeeperStorage : : Node > ( ) ;
node - > stat = committed_node . stat ;
2022-05-09 11:53:19 +00:00
node - > seq_num = committed_node . seq_num ;
node - > setData ( committed_node . getData ( ) ) ;
2022-05-05 10:32:41 +00:00
}
2022-05-06 12:25:25 +00:00
applyDeltas (
path ,
Overloaded {
[ & ] ( const CreateNodeDelta & create_delta )
{
assert ( ! node ) ;
node = std : : make_shared < Node > ( ) ;
node - > stat = create_delta . stat ;
node - > setData ( create_delta . data ) ;
} ,
[ & ] ( const RemoveNodeDelta & /*remove_delta*/ )
{
assert ( node ) ;
node = nullptr ;
} ,
2022-05-09 11:53:19 +00:00
[ & ] ( const UpdateNodeDelta & update_delta )
{
assert ( node ) ;
update_delta . update_fn ( * node ) ;
} ,
2022-05-06 12:25:25 +00:00
[ & ] ( auto & & /*delta*/ ) { } ,
} ) ;
return node ;
}
2022-05-11 09:08:39 +00:00
bool KeeperStorage : : UncommittedState : : hasNode ( StringRef path ) const
2022-05-06 12:25:25 +00:00
{
bool exists = storage . container . contains ( std : : string { path } ) ;
applyDeltas (
path ,
Overloaded {
[ & ] ( const CreateNodeDelta & /*create_delta*/ )
{
assert ( ! exists ) ;
exists = true ;
} ,
[ & ] ( const RemoveNodeDelta & /*remove_delta*/ )
{
assert ( exists ) ;
exists = false ;
} ,
[ & ] ( auto & & /*delta*/ ) { } ,
} ) ;
return exists ;
2022-05-05 10:32:41 +00:00
}
2022-05-11 09:08:39 +00:00
Coordination : : ACLs KeeperStorage : : UncommittedState : : getACLs ( StringRef path ) const
2022-05-09 09:16:05 +00:00
{
std : : optional < uint64_t > acl_id ;
if ( auto maybe_node_it = storage . container . find ( path ) ; maybe_node_it ! = storage . container . end ( ) )
acl_id . emplace ( maybe_node_it - > value . acl_id ) ;
const Coordination : : ACLs * acls { nullptr } ;
applyDeltas (
path ,
Overloaded {
[ & ] ( const CreateNodeDelta & create_delta )
{
assert ( ! acl_id ) ;
acls = & create_delta . acls ;
} ,
2022-05-11 12:10:17 +00:00
[ & ] ( const RemoveNodeDelta & /*remove_delta*/ )
{
assert ( acl_id | | acls ) ;
acl_id . reset ( ) ;
acls = nullptr ;
} ,
2022-05-09 09:16:05 +00:00
[ & ] ( const SetACLDelta & set_acl_delta )
{
assert ( acl_id | | acls ) ;
acls = & set_acl_delta . acls ;
} ,
[ & ] ( auto & & /*delta*/ ) { } ,
} ) ;
if ( acls )
return * acls ;
return acl_id ? storage . acl_map . convertNumber ( * acl_id ) : Coordination : : ACLs { } ;
}
2022-05-06 12:25:25 +00:00
namespace
2022-05-05 10:32:41 +00:00
{
2022-05-11 12:10:17 +00:00
[[noreturn]] void onStorageInconsistency ( )
{
2022-05-16 13:15:31 +00:00
LOG_ERROR ( & Poco : : Logger : : get ( " KeeperStorage " ) , " Inconsistency found between uncommitted and committed data. Keeper will terminate to avoid undefined behaviour. " ) ;
2022-05-11 12:10:17 +00:00
std : : terminate ( ) ;
}
2022-05-05 10:32:41 +00:00
}
2022-05-06 12:25:25 +00:00
Coordination : : Error KeeperStorage : : commit ( int64_t commit_zxid , int64_t session_id )
2022-05-05 10:32:41 +00:00
{
2022-05-11 09:08:39 +00:00
for ( auto & delta : uncommitted_state . deltas )
2022-05-05 10:32:41 +00:00
{
2022-05-06 12:25:25 +00:00
if ( delta . zxid > commit_zxid )
break ;
2022-05-09 07:02:11 +00:00
bool finish_subdelta = false ;
2022-05-06 12:25:25 +00:00
auto result = std : : visit (
2022-05-10 09:53:15 +00:00
[ & , & path = delta . path ] < typename DeltaType > ( DeltaType & operation ) - > Coordination : : Error
2022-05-09 09:42:23 +00:00
{
if constexpr ( std : : same_as < DeltaType , KeeperStorage : : CreateNodeDelta > )
2022-05-06 12:25:25 +00:00
{
if ( ! createNode (
path ,
2022-05-09 09:42:23 +00:00
std : : move ( operation . data ) ,
operation . stat ,
operation . is_sequental ,
operation . is_ephemeral ,
std : : move ( operation . acls ) ,
2022-05-06 12:25:25 +00:00
session_id ) )
2022-05-11 12:10:17 +00:00
onStorageInconsistency ( ) ;
2022-05-06 12:25:25 +00:00
return Coordination : : Error : : ZOK ;
2022-05-09 09:42:23 +00:00
}
else if constexpr ( std : : same_as < DeltaType , KeeperStorage : : UpdateNodeDelta > )
2022-05-06 12:25:25 +00:00
{
2022-05-09 09:16:05 +00:00
auto node_it = container . find ( path ) ;
if ( node_it = = container . end ( ) )
2022-05-11 12:10:17 +00:00
onStorageInconsistency ( ) ;
2022-05-06 12:25:25 +00:00
2022-05-09 09:42:23 +00:00
if ( operation . version ! = - 1 & & operation . version ! = node_it - > value . stat . version )
2022-05-11 12:10:17 +00:00
onStorageInconsistency ( ) ;
2022-05-06 12:25:25 +00:00
2022-05-09 09:42:23 +00:00
container . updateValue ( path , operation . update_fn ) ;
2022-05-06 12:25:25 +00:00
return Coordination : : Error : : ZOK ;
2022-05-09 09:42:23 +00:00
}
else if constexpr ( std : : same_as < DeltaType , KeeperStorage : : RemoveNodeDelta > )
2022-05-06 12:25:25 +00:00
{
2022-05-09 09:42:23 +00:00
if ( ! removeNode ( path , operation . version ) )
2022-05-11 12:10:17 +00:00
onStorageInconsistency ( ) ;
2022-05-06 12:25:25 +00:00
return Coordination : : Error : : ZOK ;
2022-05-09 09:42:23 +00:00
}
else if constexpr ( std : : same_as < DeltaType , KeeperStorage : : SetACLDelta > )
2022-05-06 12:25:25 +00:00
{
2022-05-09 09:16:05 +00:00
auto node_it = container . find ( path ) ;
2022-05-11 07:53:32 +00:00
if ( node_it = = container . end ( ) )
2022-05-11 12:10:17 +00:00
onStorageInconsistency ( ) ;
2022-05-06 12:25:25 +00:00
2022-05-09 09:42:23 +00:00
if ( operation . version ! = - 1 & & operation . version ! = node_it - > value . stat . aversion )
2022-05-11 12:10:17 +00:00
onStorageInconsistency ( ) ;
2022-05-06 12:25:25 +00:00
2022-05-09 09:16:05 +00:00
acl_map . removeUsage ( node_it - > value . acl_id ) ;
2022-05-06 12:25:25 +00:00
2022-05-09 09:42:23 +00:00
uint64_t acl_id = acl_map . convertACLs ( operation . acls ) ;
2022-05-06 12:25:25 +00:00
acl_map . addUsage ( acl_id ) ;
container . updateValue ( path , [ acl_id ] ( KeeperStorage : : Node & node ) { node . acl_id = acl_id ; } ) ;
return Coordination : : Error : : ZOK ;
2022-05-09 09:42:23 +00:00
}
2022-05-09 10:19:20 +00:00
else if constexpr ( std : : same_as < DeltaType , KeeperStorage : : ErrorDelta > )
2022-05-09 09:42:23 +00:00
return operation . error ;
else if constexpr ( std : : same_as < DeltaType , KeeperStorage : : SubDeltaEnd > )
2022-05-09 07:02:11 +00:00
{
finish_subdelta = true ;
return Coordination : : Error : : ZOK ;
2022-05-09 09:42:23 +00:00
}
2022-05-10 12:53:18 +00:00
else if constexpr ( std : : same_as < DeltaType , KeeperStorage : : AddAuthDelta > )
{
session_and_auth [ operation . session_id ] . emplace_back ( std : : move ( operation . auth_id ) ) ;
return Coordination : : Error : : ZOK ;
}
2022-05-09 09:42:23 +00:00
else
2022-05-09 07:02:11 +00:00
{
2022-05-09 09:42:23 +00:00
// shouldn't be called in any process functions
2022-05-11 12:10:17 +00:00
onStorageInconsistency ( ) ;
2022-05-09 09:42:23 +00:00
}
} ,
2022-05-06 12:25:25 +00:00
delta . operation ) ;
if ( result ! = Coordination : : Error : : ZOK )
return result ;
2022-05-09 07:02:11 +00:00
if ( finish_subdelta )
return Coordination : : Error : : ZOK ;
2022-05-06 12:25:25 +00:00
}
return Coordination : : Error : : ZOK ;
}
bool KeeperStorage : : createNode (
const std : : string & path ,
String data ,
const Coordination : : Stat & stat ,
bool is_sequental ,
bool is_ephemeral ,
Coordination : : ACLs node_acls ,
int64_t session_id )
{
auto parent_path = parentPath ( path ) ;
2022-05-09 09:16:05 +00:00
auto node_it = container . find ( parent_path ) ;
2022-05-06 12:25:25 +00:00
2022-05-09 09:16:05 +00:00
if ( node_it = = container . end ( ) )
2022-05-06 12:25:25 +00:00
return false ;
2022-05-09 09:16:05 +00:00
if ( node_it - > value . stat . ephemeralOwner ! = 0 )
2022-05-06 12:25:25 +00:00
return false ;
if ( container . contains ( path ) )
return false ;
KeeperStorage : : Node created_node ;
uint64_t acl_id = acl_map . convertACLs ( node_acls ) ;
acl_map . addUsage ( acl_id ) ;
created_node . acl_id = acl_id ;
created_node . stat = stat ;
created_node . setData ( std : : move ( data ) ) ;
created_node . is_sequental = is_sequental ;
auto [ map_key , _ ] = container . insert ( path , created_node ) ;
/// Take child path from key owned by map.
auto child_path = getBaseName ( map_key - > getKey ( ) ) ;
container . updateValue ( parent_path , [ child_path ] ( KeeperStorage : : Node & parent ) { parent . addChild ( child_path ) ; } ) ;
if ( is_ephemeral )
ephemerals [ session_id ] . emplace ( path ) ;
return true ;
} ;
2022-05-05 10:32:41 +00:00
2022-05-06 12:25:25 +00:00
bool KeeperStorage : : removeNode ( const std : : string & path , int32_t version )
{
2022-05-09 09:16:05 +00:00
auto node_it = container . find ( path ) ;
if ( node_it = = container . end ( ) )
2022-05-06 12:25:25 +00:00
return false ;
2022-05-09 09:16:05 +00:00
if ( version ! = - 1 & & version ! = node_it - > value . stat . version )
2022-05-06 12:25:25 +00:00
return false ;
2022-05-09 09:16:05 +00:00
if ( node_it - > value . stat . numChildren )
2022-05-06 12:25:25 +00:00
return false ;
2022-05-09 09:16:05 +00:00
auto prev_node = node_it - > value ;
2022-05-06 12:25:25 +00:00
if ( prev_node . stat . ephemeralOwner ! = 0 )
{
auto ephemerals_it = ephemerals . find ( prev_node . stat . ephemeralOwner ) ;
ephemerals_it - > second . erase ( path ) ;
if ( ephemerals_it - > second . empty ( ) )
ephemerals . erase ( ephemerals_it ) ;
2022-05-05 10:32:41 +00:00
}
2022-05-06 12:25:25 +00:00
acl_map . removeUsage ( prev_node . acl_id ) ;
container . updateValue (
2022-05-09 09:16:05 +00:00
parentPath ( path ) ,
[ child_basename = getBaseName ( node_it - > key ) ] ( KeeperStorage : : Node & parent ) { parent . removeChild ( child_basename ) ; } ) ;
2022-05-06 12:25:25 +00:00
container . erase ( path ) ;
return true ;
2022-05-05 10:32:41 +00:00
}
2022-05-06 12:25:25 +00:00
2021-08-24 12:30:31 +00:00
struct KeeperStorageRequestProcessor
2020-10-30 19:57:30 +00:00
{
Coordination : : ZooKeeperRequestPtr zk_request ;
2022-05-06 12:25:25 +00:00
explicit KeeperStorageRequestProcessor ( const Coordination : : ZooKeeperRequestPtr & zk_request_ ) : zk_request ( zk_request_ ) { }
virtual Coordination : : ZooKeeperResponsePtr process ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t time ) const = 0 ;
virtual std : : vector < KeeperStorage : : Delta >
preprocess ( KeeperStorage & /*storage*/ , int64_t /*zxid*/ , int64_t /*session_id*/ , int64_t /*time*/ ) const
{
return { } ;
}
2022-05-09 08:32:25 +00:00
// process the request using locally committed data
virtual Coordination : : ZooKeeperResponsePtr
processLocal ( KeeperStorage & /*storage*/ , int64_t /*zxid*/ , int64_t /*session_id*/ , int64_t /*time*/ ) const
{
throw Exception { DB : : ErrorCodes : : LOGICAL_ERROR , " Cannot process the request locally " } ;
}
2022-05-06 12:25:25 +00:00
virtual KeeperStorage : : ResponsesForSessions
processWatches ( KeeperStorage : : Watches & /*watches*/ , KeeperStorage : : Watches & /*list_watches*/ ) const
{
return { } ;
}
2022-05-09 09:16:05 +00:00
virtual bool checkAuth ( KeeperStorage & /*storage*/ , int64_t /*session_id*/ , bool /*is_local*/ ) const { return true ; }
2020-11-10 13:43:10 +00:00
2021-08-24 12:30:31 +00:00
virtual ~ KeeperStorageRequestProcessor ( ) = default ;
2020-11-20 12:36:10 +00:00
} ;
2020-10-30 19:57:30 +00:00
2021-08-24 12:30:31 +00:00
struct KeeperStorageHeartbeatRequestProcessor final : public KeeperStorageRequestProcessor
2020-11-03 14:49:30 +00:00
{
2021-08-24 12:30:31 +00:00
using KeeperStorageRequestProcessor : : KeeperStorageRequestProcessor ;
2022-05-06 12:25:25 +00:00
Coordination : : ZooKeeperResponsePtr
process ( KeeperStorage & /* storage */ , int64_t /* zxid */ , int64_t /* session_id */ , int64_t /* time */ ) const override
2020-11-03 14:49:30 +00:00
{
2022-05-09 11:53:19 +00:00
return zk_request - > makeResponse ( ) ;
2020-11-03 14:49:30 +00:00
}
} ;
2021-08-24 12:30:31 +00:00
struct KeeperStorageSyncRequestProcessor final : public KeeperStorageRequestProcessor
2021-02-10 13:01:05 +00:00
{
2021-08-24 12:30:31 +00:00
using KeeperStorageRequestProcessor : : KeeperStorageRequestProcessor ;
2022-05-06 12:25:25 +00:00
Coordination : : ZooKeeperResponsePtr
process ( KeeperStorage & /* storage */ , int64_t /* zxid */ , int64_t /* session_id */ , int64_t /* time */ ) const override
2021-02-10 13:01:05 +00:00
{
auto response = zk_request - > makeResponse ( ) ;
2021-05-08 14:10:06 +00:00
dynamic_cast < Coordination : : ZooKeeperSyncResponse & > ( * response ) . path
= dynamic_cast < Coordination : : ZooKeeperSyncRequest & > ( * zk_request ) . path ;
2022-05-09 07:02:11 +00:00
return response ;
2021-02-10 13:01:05 +00:00
}
} ;
2022-05-06 12:25:25 +00:00
namespace
{
2022-05-09 09:16:05 +00:00
2022-05-10 12:53:18 +00:00
Coordination : : ACLs getNodeACLs ( KeeperStorage & storage , StringRef path , bool is_local )
2022-05-09 09:16:05 +00:00
{
if ( is_local )
{
auto node_it = storage . container . find ( path ) ;
if ( node_it = = storage . container . end ( ) )
return { } ;
return storage . acl_map . convertNumber ( node_it - > value . acl_id ) ;
}
2022-05-11 09:08:39 +00:00
return storage . uncommitted_state . getACLs ( path ) ;
2022-05-09 09:16:05 +00:00
}
2022-05-06 12:25:25 +00:00
}
2022-05-10 12:53:18 +00:00
bool KeeperStorage : : checkACL ( StringRef path , int32_t permission , int64_t session_id , bool is_local )
{
const auto node_acls = getNodeACLs ( * this , path , is_local ) ;
if ( node_acls . empty ( ) )
return true ;
2022-05-11 09:08:39 +00:00
if ( uncommitted_state . hasACL ( session_id , is_local , [ ] ( const auto & auth_id ) { return auth_id . scheme = = " super " ; } ) )
2022-05-10 12:53:18 +00:00
return true ;
2022-05-11 07:53:32 +00:00
2022-05-10 12:53:18 +00:00
for ( const auto & node_acl : node_acls )
{
if ( node_acl . permissions & permission )
{
if ( node_acl . scheme = = " world " & & node_acl . id = = " anyone " )
return true ;
2022-05-11 09:08:39 +00:00
if ( uncommitted_state . hasACL (
2022-05-10 12:53:18 +00:00
session_id ,
is_local ,
[ & ] ( const auto & auth_id ) { return auth_id . scheme = = node_acl . scheme & & auth_id . id = = node_acl . id ; } ) )
return true ;
}
}
return false ;
}
2022-05-06 12:25:25 +00:00
2021-08-24 12:30:31 +00:00
struct KeeperStorageCreateRequestProcessor final : public KeeperStorageRequestProcessor
2020-10-30 19:57:30 +00:00
{
2021-08-24 12:30:31 +00:00
using KeeperStorageRequestProcessor : : KeeperStorageRequestProcessor ;
2020-11-10 13:43:10 +00:00
2022-05-06 12:25:25 +00:00
KeeperStorage : : ResponsesForSessions
processWatches ( KeeperStorage : : Watches & watches , KeeperStorage : : Watches & list_watches ) const override
2020-11-10 13:43:10 +00:00
{
2021-01-19 14:22:28 +00:00
return processWatchesImpl ( zk_request - > getPath ( ) , watches , list_watches , Coordination : : Event : : CREATED ) ;
2020-11-10 13:43:10 +00:00
}
2022-05-09 09:16:05 +00:00
bool checkAuth ( KeeperStorage & storage , int64_t session_id , bool is_local ) const override
2020-10-30 19:57:30 +00:00
{
2022-01-22 15:29:36 +00:00
auto path = zk_request - > getPath ( ) ;
2022-05-10 12:53:18 +00:00
return storage . checkACL ( parentPath ( path ) , Coordination : : ACL : : Create , session_id , is_local ) ;
2021-05-21 21:19:22 +00:00
}
2022-05-06 12:25:25 +00:00
std : : vector < KeeperStorage : : Delta > preprocess ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t time ) const override
2022-05-05 10:32:41 +00:00
{
Coordination : : ZooKeeperCreateRequest & request = dynamic_cast < Coordination : : ZooKeeperCreateRequest & > ( * zk_request ) ;
2022-05-06 12:25:25 +00:00
std : : vector < KeeperStorage : : Delta > new_deltas ;
2022-05-05 10:32:41 +00:00
auto parent_path = parentPath ( request . path ) ;
2022-05-11 09:08:39 +00:00
auto parent_node = storage . uncommitted_state . getNode ( parent_path ) ;
2022-05-05 10:32:41 +00:00
if ( parent_node = = nullptr )
2022-05-09 07:02:11 +00:00
return { { zxid , Coordination : : Error : : ZNONODE } } ;
2022-05-06 12:25:25 +00:00
2022-05-05 10:32:41 +00:00
else if ( parent_node - > stat . ephemeralOwner ! = 0 )
2022-05-09 07:02:11 +00:00
return { { zxid , Coordination : : Error : : ZNOCHILDRENFOREPHEMERALS } } ;
2022-05-05 10:32:41 +00:00
std : : string path_created = request . path ;
if ( request . is_sequential )
{
auto seq_num = parent_node - > seq_num ;
2022-05-06 12:25:25 +00:00
std : : stringstream seq_num_str ; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
2022-05-05 10:32:41 +00:00
seq_num_str . exceptions ( std : : ios : : failbit ) ;
seq_num_str < < std : : setw ( 10 ) < < std : : setfill ( ' 0 ' ) < < seq_num ;
path_created + = seq_num_str . str ( ) ;
}
2022-05-11 09:08:39 +00:00
if ( storage . uncommitted_state . hasNode ( path_created ) )
2022-05-09 07:02:11 +00:00
return { { zxid , Coordination : : Error : : ZNODEEXISTS } } ;
2022-05-05 10:32:41 +00:00
if ( getBaseName ( path_created ) . size = = 0 )
2022-05-09 07:02:11 +00:00
return { { zxid , Coordination : : Error : : ZBADARGUMENTS } } ;
2022-05-05 10:32:41 +00:00
2022-05-06 12:25:25 +00:00
Coordination : : ACLs node_acls ;
if ( ! fixupACL ( request . acls , storage . session_and_auth [ session_id ] , node_acls ) )
2022-05-09 07:02:11 +00:00
return { { zxid , Coordination : : Error : : ZINVALIDACL } } ;
2022-05-06 12:25:25 +00:00
Coordination : : Stat stat ;
stat . czxid = zxid ;
stat . mzxid = zxid ;
stat . pzxid = zxid ;
stat . ctime = time ;
stat . mtime = time ;
stat . numChildren = 0 ;
2022-05-09 11:53:19 +00:00
stat . version = 0 ;
stat . aversion = 0 ;
2022-05-10 13:31:39 +00:00
stat . cversion = 0 ;
2022-05-06 12:25:25 +00:00
stat . dataLength = request . data . length ( ) ;
stat . ephemeralOwner = request . is_ephemeral ? session_id : 0 ;
new_deltas . emplace_back (
2022-05-10 13:31:39 +00:00
std : : move ( path_created ) ,
2022-05-06 12:25:25 +00:00
zxid ,
KeeperStorage : : CreateNodeDelta { stat , request . is_ephemeral , request . is_sequential , std : : move ( node_acls ) , request . data } ) ;
2022-05-05 10:32:41 +00:00
int32_t parent_cversion = request . parent_cversion ;
2022-05-06 12:25:25 +00:00
new_deltas . emplace_back (
std : : string { parent_path } ,
zxid ,
KeeperStorage : : UpdateNodeDelta { [ parent_cversion , zxid ] ( KeeperStorage : : Node & node )
{
+ + node . seq_num ;
if ( parent_cversion = = - 1 )
+ + node . stat . cversion ;
else if ( parent_cversion > node . stat . cversion )
node . stat . cversion = parent_cversion ;
if ( zxid > node . stat . pzxid )
node . stat . pzxid = zxid ;
+ + node . stat . numChildren ;
} } ) ;
return new_deltas ;
2022-05-05 10:32:41 +00:00
}
2022-05-06 12:25:25 +00:00
Coordination : : ZooKeeperResponsePtr process ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t /*time*/ ) const override
2021-05-21 21:19:22 +00:00
{
2020-10-30 19:57:30 +00:00
Coordination : : ZooKeeperResponsePtr response_ptr = zk_request - > makeResponse ( ) ;
Coordination : : ZooKeeperCreateResponse & response = dynamic_cast < Coordination : : ZooKeeperCreateResponse & > ( * response_ptr ) ;
2021-08-03 09:59:08 +00:00
2022-05-09 08:32:25 +00:00
if ( const auto result = storage . commit ( zxid , session_id ) ; result ! = Coordination : : Error : : ZOK )
2020-10-30 19:57:30 +00:00
{
2022-05-06 12:25:25 +00:00
response . error = result ;
2022-05-09 07:02:11 +00:00
return response_ptr ;
2020-10-30 19:57:30 +00:00
}
2022-05-11 09:08:39 +00:00
const auto & deltas = storage . uncommitted_state . deltas ;
2022-05-06 12:25:25 +00:00
auto create_delta_it = std : : find_if (
deltas . begin ( ) ,
deltas . end ( ) ,
[ zxid ] ( const auto & delta )
{ return delta . zxid = = zxid & & std : : holds_alternative < KeeperStorage : : CreateNodeDelta > ( delta . operation ) ; } ) ;
2021-08-03 09:59:08 +00:00
2022-05-11 12:10:17 +00:00
assert ( create_delta_it ! = deltas . end ( ) ) ;
2021-08-03 09:59:08 +00:00
2022-05-06 12:25:25 +00:00
response . path_created = create_delta_it - > path ;
2021-08-03 09:59:08 +00:00
response . error = Coordination : : Error : : ZOK ;
2022-05-06 12:25:25 +00:00
return response_ptr ;
2020-10-30 19:57:30 +00:00
}
} ;
2021-08-24 12:30:31 +00:00
struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProcessor
2020-11-03 14:49:30 +00:00
{
2022-05-09 09:16:05 +00:00
bool checkAuth ( KeeperStorage & storage , int64_t session_id , bool is_local ) const override
2021-05-21 21:19:22 +00:00
{
2022-05-10 12:53:18 +00:00
return storage . checkACL ( zk_request - > getPath ( ) , Coordination : : ACL : : Read , session_id , is_local ) ;
2021-05-21 21:19:22 +00:00
}
2021-08-24 12:30:31 +00:00
using KeeperStorageRequestProcessor : : KeeperStorageRequestProcessor ;
2022-05-06 12:25:25 +00:00
std : : vector < KeeperStorage : : Delta >
preprocess ( KeeperStorage & storage , int64_t zxid , int64_t /*session_id*/ , int64_t /*time*/ ) const override
{
Coordination : : ZooKeeperGetRequest & request = dynamic_cast < Coordination : : ZooKeeperGetRequest & > ( * zk_request ) ;
2022-05-11 09:08:39 +00:00
if ( ! storage . uncommitted_state . hasNode ( request . path ) )
2022-05-09 07:02:11 +00:00
return { { zxid , Coordination : : Error : : ZNONODE } } ;
2022-05-06 12:25:25 +00:00
return { } ;
}
2022-05-09 08:32:25 +00:00
template < bool local >
Coordination : : ZooKeeperResponsePtr processImpl ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t /* time */ ) const
2020-11-03 14:49:30 +00:00
{
Coordination : : ZooKeeperResponsePtr response_ptr = zk_request - > makeResponse ( ) ;
Coordination : : ZooKeeperGetResponse & response = dynamic_cast < Coordination : : ZooKeeperGetResponse & > ( * response_ptr ) ;
Coordination : : ZooKeeperGetRequest & request = dynamic_cast < Coordination : : ZooKeeperGetRequest & > ( * zk_request ) ;
2022-05-10 07:00:38 +00:00
if constexpr ( ! local )
2022-05-09 08:32:25 +00:00
{
if ( const auto result = storage . commit ( zxid , session_id ) ; result ! = Coordination : : Error : : ZOK )
{
response . error = result ;
return response_ptr ;
}
}
const auto on_error = [ & ] ( [[maybe_unused]] const auto error_code )
2020-11-03 14:49:30 +00:00
{
2022-05-09 08:32:25 +00:00
if constexpr ( local )
response . error = error_code ;
else
2022-05-11 12:10:17 +00:00
onStorageInconsistency ( ) ;
2022-05-09 08:32:25 +00:00
} ;
auto & container = storage . container ;
auto node_it = container . find ( request . path ) ;
if ( node_it = = container . end ( ) )
{
on_error ( Coordination : : Error : : ZNONODE ) ;
2020-11-03 14:49:30 +00:00
}
else
{
2022-05-09 08:32:25 +00:00
response . stat = node_it - > value . stat ;
response . data = node_it - > value . getData ( ) ;
2020-11-03 14:49:30 +00:00
response . error = Coordination : : Error : : ZOK ;
}
2022-05-06 12:25:25 +00:00
return response_ptr ;
2020-11-03 14:49:30 +00:00
}
2022-05-09 08:32:25 +00:00
Coordination : : ZooKeeperResponsePtr process ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t time ) const override
{
return processImpl < false > ( storage , zxid , session_id , time ) ;
}
Coordination : : ZooKeeperResponsePtr processLocal ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t time ) const override
{
return processImpl < true > ( storage , zxid , session_id , time ) ;
}
2020-11-03 14:49:30 +00:00
} ;
2021-08-24 12:30:31 +00:00
struct KeeperStorageRemoveRequestProcessor final : public KeeperStorageRequestProcessor
2020-11-04 18:54:55 +00:00
{
2022-05-09 09:16:05 +00:00
bool checkAuth ( KeeperStorage & storage , int64_t session_id , bool is_local ) const override
2021-05-21 21:19:22 +00:00
{
2022-05-10 12:53:18 +00:00
return storage . checkACL ( parentPath ( zk_request - > getPath ( ) ) , Coordination : : ACL : : Delete , session_id , is_local ) ;
2021-05-21 21:19:22 +00:00
}
2021-08-24 12:30:31 +00:00
using KeeperStorageRequestProcessor : : KeeperStorageRequestProcessor ;
2022-05-06 12:25:25 +00:00
std : : vector < KeeperStorage : : Delta >
preprocess ( KeeperStorage & storage , int64_t zxid , int64_t /*session_id*/ , int64_t /*time*/ ) const override
2020-11-04 18:54:55 +00:00
{
Coordination : : ZooKeeperRemoveRequest & request = dynamic_cast < Coordination : : ZooKeeperRemoveRequest & > ( * zk_request ) ;
2020-11-19 16:06:19 +00:00
2022-05-06 12:25:25 +00:00
std : : vector < KeeperStorage : : Delta > new_deltas ;
2021-05-28 11:52:19 +00:00
2022-05-06 12:25:25 +00:00
const auto update_parent_pzxid = [ & ] ( )
{
auto parent_path = parentPath ( request . path ) ;
2022-05-11 09:08:39 +00:00
if ( ! storage . uncommitted_state . hasNode ( parent_path ) )
2022-05-06 12:25:25 +00:00
return ;
new_deltas . emplace_back (
std : : string { parent_path } ,
zxid ,
KeeperStorage : : UpdateNodeDelta { [ zxid ] ( KeeperStorage : : Node & parent )
{
if ( parent . stat . pzxid < zxid )
parent . stat . pzxid = zxid ;
} } ) ;
} ;
2021-02-26 13:53:34 +00:00
2022-05-11 09:08:39 +00:00
auto node = storage . uncommitted_state . getNode ( request . path ) ;
2021-02-19 07:05:52 +00:00
2022-05-06 12:25:25 +00:00
if ( ! node )
{
if ( request . restored_from_zookeeper_log )
update_parent_pzxid ( ) ;
2022-05-09 07:02:11 +00:00
return { { zxid , Coordination : : Error : : ZNONODE } } ;
2022-05-06 12:25:25 +00:00
}
else if ( request . version ! = - 1 & & request . version ! = node - > stat . version )
2022-05-09 07:02:11 +00:00
return { { zxid , Coordination : : Error : : ZBADVERSION } } ;
2022-05-06 12:25:25 +00:00
else if ( node - > stat . numChildren )
2022-05-09 07:02:11 +00:00
return { { zxid , Coordination : : Error : : ZNOTEMPTY } } ;
2022-05-06 12:25:25 +00:00
if ( request . restored_from_zookeeper_log )
update_parent_pzxid ( ) ;
new_deltas . emplace_back (
std : : string { parentPath ( request . path ) } ,
zxid ,
KeeperStorage : : UpdateNodeDelta { [ ] ( KeeperStorage : : Node & parent )
{
- - parent . stat . numChildren ;
+ + parent . stat . cversion ;
} } ) ;
new_deltas . emplace_back ( request . path , zxid , KeeperStorage : : RemoveNodeDelta { request . version } ) ;
return new_deltas ;
}
2021-05-28 11:52:19 +00:00
2022-05-06 12:25:25 +00:00
Coordination : : ZooKeeperResponsePtr process ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t /* time */ ) const override
{
Coordination : : ZooKeeperResponsePtr response_ptr = zk_request - > makeResponse ( ) ;
Coordination : : ZooKeeperRemoveResponse & response = dynamic_cast < Coordination : : ZooKeeperRemoveResponse & > ( * response_ptr ) ;
2020-11-20 08:37:16 +00:00
2022-05-09 08:32:25 +00:00
response . error = storage . commit ( zxid , session_id ) ;
2022-05-06 12:25:25 +00:00
return response_ptr ;
2020-11-04 18:54:55 +00:00
}
2020-11-10 13:43:10 +00:00
2022-05-06 12:25:25 +00:00
KeeperStorage : : ResponsesForSessions
processWatches ( KeeperStorage : : Watches & watches , KeeperStorage : : Watches & list_watches ) const override
2020-11-10 13:43:10 +00:00
{
2021-01-19 14:22:28 +00:00
return processWatchesImpl ( zk_request - > getPath ( ) , watches , list_watches , Coordination : : Event : : DELETED ) ;
2020-11-10 13:43:10 +00:00
}
2020-11-04 18:54:55 +00:00
} ;
2021-08-24 12:30:31 +00:00
struct KeeperStorageExistsRequestProcessor final : public KeeperStorageRequestProcessor
2020-11-04 18:54:55 +00:00
{
2021-08-24 12:30:31 +00:00
using KeeperStorageRequestProcessor : : KeeperStorageRequestProcessor ;
2022-05-06 12:25:25 +00:00
std : : vector < KeeperStorage : : Delta >
preprocess ( KeeperStorage & storage , int64_t zxid , int64_t /*session_id*/ , int64_t /*time*/ ) const override
{
Coordination : : ZooKeeperExistsRequest & request = dynamic_cast < Coordination : : ZooKeeperExistsRequest & > ( * zk_request ) ;
2022-05-11 09:08:39 +00:00
if ( ! storage . uncommitted_state . hasNode ( request . path ) )
2022-05-09 07:02:11 +00:00
return { { zxid , Coordination : : Error : : ZNONODE } } ;
2022-05-06 12:25:25 +00:00
return { } ;
}
2022-05-09 08:32:25 +00:00
template < bool local >
Coordination : : ZooKeeperResponsePtr processImpl ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t /* time */ ) const
2020-11-04 18:54:55 +00:00
{
Coordination : : ZooKeeperResponsePtr response_ptr = zk_request - > makeResponse ( ) ;
Coordination : : ZooKeeperExistsResponse & response = dynamic_cast < Coordination : : ZooKeeperExistsResponse & > ( * response_ptr ) ;
Coordination : : ZooKeeperExistsRequest & request = dynamic_cast < Coordination : : ZooKeeperExistsRequest & > ( * zk_request ) ;
2022-05-10 07:00:38 +00:00
if constexpr ( ! local )
2020-11-04 18:54:55 +00:00
{
2022-05-09 08:32:25 +00:00
if ( const auto result = storage . commit ( zxid , session_id ) ; result ! = Coordination : : Error : : ZOK )
{
response . error = result ;
return response_ptr ;
}
}
const auto on_error = [ & ] ( [[maybe_unused]] const auto error_code )
{
if constexpr ( local )
response . error = error_code ;
else
2022-05-11 12:10:17 +00:00
onStorageInconsistency ( ) ;
2022-05-09 08:32:25 +00:00
} ;
auto & container = storage . container ;
auto node_it = container . find ( request . path ) ;
if ( node_it = = container . end ( ) )
{
on_error ( Coordination : : Error : : ZNONODE ) ;
2020-11-04 18:54:55 +00:00
}
else
{
2022-05-09 08:32:25 +00:00
response . stat = node_it - > value . stat ;
response . error = Coordination : : Error : : ZOK ;
2020-11-04 18:54:55 +00:00
}
2022-05-06 12:25:25 +00:00
return response_ptr ;
2020-11-04 18:54:55 +00:00
}
2022-05-09 08:32:25 +00:00
Coordination : : ZooKeeperResponsePtr process ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t time ) const override
{
return processImpl < false > ( storage , zxid , session_id , time ) ;
}
Coordination : : ZooKeeperResponsePtr processLocal ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t time ) const override
{
return processImpl < true > ( storage , zxid , session_id , time ) ;
}
2020-11-04 18:54:55 +00:00
} ;
2021-08-24 12:30:31 +00:00
struct KeeperStorageSetRequestProcessor final : public KeeperStorageRequestProcessor
2020-11-04 18:54:55 +00:00
{
2022-05-09 09:16:05 +00:00
bool checkAuth ( KeeperStorage & storage , int64_t session_id , bool is_local ) const override
2021-05-21 21:19:22 +00:00
{
2022-05-10 12:53:18 +00:00
return storage . checkACL ( zk_request - > getPath ( ) , Coordination : : ACL : : Write , session_id , is_local ) ;
2021-05-21 21:19:22 +00:00
}
2021-08-24 12:30:31 +00:00
using KeeperStorageRequestProcessor : : KeeperStorageRequestProcessor ;
2022-05-06 12:25:25 +00:00
std : : vector < KeeperStorage : : Delta > preprocess ( KeeperStorage & storage , int64_t zxid , int64_t /*session_id*/ , int64_t time ) const override
{
Coordination : : ZooKeeperSetRequest & request = dynamic_cast < Coordination : : ZooKeeperSetRequest & > ( * zk_request ) ;
std : : vector < KeeperStorage : : Delta > new_deltas ;
2022-05-11 09:08:39 +00:00
if ( ! storage . uncommitted_state . hasNode ( request . path ) )
2022-05-09 07:02:11 +00:00
return { { zxid , Coordination : : Error : : ZNONODE } } ;
2022-05-06 12:25:25 +00:00
2022-05-11 09:08:39 +00:00
auto node = storage . uncommitted_state . getNode ( request . path ) ;
2022-05-06 12:25:25 +00:00
if ( request . version ! = - 1 & & request . version ! = node - > stat . version )
2022-05-09 07:02:11 +00:00
return { { zxid , Coordination : : Error : : ZBADVERSION } } ;
2022-05-06 12:25:25 +00:00
new_deltas . emplace_back (
request . path ,
zxid ,
KeeperStorage : : UpdateNodeDelta {
2022-05-10 09:53:15 +00:00
[ zxid , data = request . data , time ] ( KeeperStorage : : Node & value )
2022-05-06 12:25:25 +00:00
{
value . stat . version + + ;
value . stat . mzxid = zxid ;
value . stat . mtime = time ;
value . stat . dataLength = data . length ( ) ;
2022-05-10 09:53:15 +00:00
value . setData ( data ) ;
2022-05-06 12:25:25 +00:00
} ,
request . version } ) ;
2022-05-10 13:42:16 +00:00
new_deltas . emplace_back (
parentPath ( request . path ) . toString ( ) ,
zxid ,
KeeperStorage : : UpdateNodeDelta
{
[ ] ( KeeperStorage : : Node & parent )
{
parent . stat . cversion + + ;
}
}
) ;
2022-05-06 12:25:25 +00:00
return new_deltas ;
}
Coordination : : ZooKeeperResponsePtr process ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t /*time*/ ) const override
2020-11-04 18:54:55 +00:00
{
2021-05-21 21:19:22 +00:00
auto & container = storage . container ;
2020-11-04 18:54:55 +00:00
Coordination : : ZooKeeperResponsePtr response_ptr = zk_request - > makeResponse ( ) ;
Coordination : : ZooKeeperSetResponse & response = dynamic_cast < Coordination : : ZooKeeperSetResponse & > ( * response_ptr ) ;
Coordination : : ZooKeeperSetRequest & request = dynamic_cast < Coordination : : ZooKeeperSetRequest & > ( * zk_request ) ;
2022-05-09 08:32:25 +00:00
if ( const auto result = storage . commit ( zxid , session_id ) ; result ! = Coordination : : Error : : ZOK )
2020-11-04 18:54:55 +00:00
{
2022-05-06 12:25:25 +00:00
response . error = result ;
2022-05-09 07:02:11 +00:00
return response_ptr ;
2020-11-04 18:54:55 +00:00
}
2021-02-26 13:53:34 +00:00
2022-05-09 08:32:25 +00:00
auto node_it = container . find ( request . path ) ;
if ( node_it = = container . end ( ) )
2022-05-11 12:10:17 +00:00
onStorageInconsistency ( ) ;
2020-11-04 18:54:55 +00:00
2022-05-09 08:32:25 +00:00
response . stat = node_it - > value . stat ;
2022-05-06 12:25:25 +00:00
response . error = Coordination : : Error : : ZOK ;
2020-11-04 18:54:55 +00:00
2022-05-06 12:25:25 +00:00
return response_ptr ;
2020-11-04 18:54:55 +00:00
}
2020-11-10 13:43:10 +00:00
2022-05-06 12:25:25 +00:00
KeeperStorage : : ResponsesForSessions
processWatches ( KeeperStorage : : Watches & watches , KeeperStorage : : Watches & list_watches ) const override
2020-11-10 13:43:10 +00:00
{
2021-01-19 14:22:28 +00:00
return processWatchesImpl ( zk_request - > getPath ( ) , watches , list_watches , Coordination : : Event : : CHANGED ) ;
2020-11-10 13:43:10 +00:00
}
2020-11-04 18:54:55 +00:00
} ;
2021-08-24 12:30:31 +00:00
struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProcessor
2020-11-04 18:54:55 +00:00
{
2022-05-09 09:16:05 +00:00
bool checkAuth ( KeeperStorage & storage , int64_t session_id , bool is_local ) const override
2021-05-21 21:19:22 +00:00
{
2022-05-10 12:53:18 +00:00
return storage . checkACL ( zk_request - > getPath ( ) , Coordination : : ACL : : Read , session_id , is_local ) ;
2021-05-21 21:19:22 +00:00
}
2021-08-24 12:30:31 +00:00
using KeeperStorageRequestProcessor : : KeeperStorageRequestProcessor ;
2022-05-06 12:25:25 +00:00
std : : vector < KeeperStorage : : Delta >
preprocess ( KeeperStorage & storage , int64_t zxid , int64_t /*session_id*/ , int64_t /*time*/ ) const override
{
Coordination : : ZooKeeperListRequest & request = dynamic_cast < Coordination : : ZooKeeperListRequest & > ( * zk_request ) ;
2022-05-11 09:08:39 +00:00
if ( ! storage . uncommitted_state . hasNode ( request . path ) )
2022-05-09 07:02:11 +00:00
return { { zxid , Coordination : : Error : : ZNONODE } } ;
2022-05-06 12:25:25 +00:00
return { } ;
}
2022-05-09 08:32:25 +00:00
template < bool local >
Coordination : : ZooKeeperResponsePtr processImpl ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t /* time */ ) const
2020-11-04 18:54:55 +00:00
{
Coordination : : ZooKeeperResponsePtr response_ptr = zk_request - > makeResponse ( ) ;
Coordination : : ZooKeeperListResponse & response = dynamic_cast < Coordination : : ZooKeeperListResponse & > ( * response_ptr ) ;
Coordination : : ZooKeeperListRequest & request = dynamic_cast < Coordination : : ZooKeeperListRequest & > ( * zk_request ) ;
2022-01-22 19:36:23 +00:00
2022-05-10 07:00:38 +00:00
if constexpr ( ! local )
2020-11-04 18:54:55 +00:00
{
2022-05-09 08:32:25 +00:00
if ( const auto result = storage . commit ( zxid , session_id ) ; result ! = Coordination : : Error : : ZOK )
{
response . error = result ;
return response_ptr ;
}
}
const auto on_error = [ & ] ( [[maybe_unused]] const auto error_code )
{
if constexpr ( local )
response . error = error_code ;
else
2022-05-11 12:10:17 +00:00
onStorageInconsistency ( ) ;
2022-05-09 08:32:25 +00:00
} ;
auto & container = storage . container ;
auto node_it = container . find ( request . path ) ;
if ( node_it = = container . end ( ) )
{
on_error ( Coordination : : Error : : ZNONODE ) ;
2020-11-04 18:54:55 +00:00
}
else
{
auto path_prefix = request . path ;
if ( path_prefix . empty ( ) )
2020-11-25 13:19:09 +00:00
throw DB : : Exception ( " Logical error: path cannot be empty " , ErrorCodes : : LOGICAL_ERROR ) ;
2020-11-04 18:54:55 +00:00
2022-05-09 08:32:25 +00:00
const auto & children = node_it - > value . getChildren ( ) ;
2022-04-05 06:27:03 +00:00
response . names . reserve ( children . size ( ) ) ;
2022-01-19 11:46:29 +00:00
2022-04-05 06:27:03 +00:00
for ( const auto child : children )
2022-01-19 11:46:29 +00:00
response . names . push_back ( child . toString ( ) ) ;
2020-11-04 18:54:55 +00:00
2022-05-09 08:32:25 +00:00
response . stat = node_it - > value . stat ;
2020-11-04 18:54:55 +00:00
response . error = Coordination : : Error : : ZOK ;
}
2022-05-06 12:25:25 +00:00
return response_ptr ;
2020-11-04 18:54:55 +00:00
}
2022-05-09 08:32:25 +00:00
Coordination : : ZooKeeperResponsePtr process ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t time ) const override
{
return processImpl < false > ( storage , zxid , session_id , time ) ;
}
Coordination : : ZooKeeperResponsePtr processLocal ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t time ) const override
{
return processImpl < true > ( storage , zxid , session_id , time ) ;
}
2020-11-04 18:54:55 +00:00
} ;
2021-08-24 12:30:31 +00:00
struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestProcessor
2020-11-04 18:54:55 +00:00
{
2022-05-09 09:16:05 +00:00
bool checkAuth ( KeeperStorage & storage , int64_t session_id , bool is_local ) const override
2021-05-21 21:19:22 +00:00
{
2022-05-10 12:53:18 +00:00
return storage . checkACL ( zk_request - > getPath ( ) , Coordination : : ACL : : Read , session_id , is_local ) ;
2021-05-21 21:19:22 +00:00
}
2021-08-24 12:30:31 +00:00
using KeeperStorageRequestProcessor : : KeeperStorageRequestProcessor ;
2022-05-06 12:25:25 +00:00
std : : vector < KeeperStorage : : Delta >
preprocess ( KeeperStorage & storage , int64_t zxid , int64_t /*session_id*/ , int64_t /*time*/ ) const override
{
Coordination : : ZooKeeperCheckRequest & request = dynamic_cast < Coordination : : ZooKeeperCheckRequest & > ( * zk_request ) ;
2022-05-11 09:08:39 +00:00
if ( ! storage . uncommitted_state . hasNode ( request . path ) )
2022-05-09 07:02:11 +00:00
return { { zxid , Coordination : : Error : : ZNONODE } } ;
2022-05-06 12:25:25 +00:00
2022-05-11 09:08:39 +00:00
auto node = storage . uncommitted_state . getNode ( request . path ) ;
2022-05-06 12:25:25 +00:00
if ( request . version ! = - 1 & & request . version ! = node - > stat . version )
2022-05-09 07:02:11 +00:00
return { { zxid , Coordination : : Error : : ZBADVERSION } } ;
2022-05-06 12:25:25 +00:00
return { } ;
}
2022-05-09 08:32:25 +00:00
template < bool local >
Coordination : : ZooKeeperResponsePtr processImpl ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t /* time */ ) const
2020-11-04 18:54:55 +00:00
{
Coordination : : ZooKeeperResponsePtr response_ptr = zk_request - > makeResponse ( ) ;
Coordination : : ZooKeeperCheckResponse & response = dynamic_cast < Coordination : : ZooKeeperCheckResponse & > ( * response_ptr ) ;
Coordination : : ZooKeeperCheckRequest & request = dynamic_cast < Coordination : : ZooKeeperCheckRequest & > ( * zk_request ) ;
2022-05-09 08:32:25 +00:00
2022-05-10 07:00:38 +00:00
if constexpr ( ! local )
2022-05-09 08:32:25 +00:00
{
if ( const auto result = storage . commit ( zxid , session_id ) ; result ! = Coordination : : Error : : ZOK )
{
response . error = result ;
return response_ptr ;
}
}
const auto on_error = [ & ] ( [[maybe_unused]] const auto error_code )
{
if constexpr ( local )
response . error = error_code ;
else
2022-05-11 12:10:17 +00:00
onStorageInconsistency ( ) ;
2022-05-09 08:32:25 +00:00
} ;
auto & container = storage . container ;
auto node_it = container . find ( request . path ) ;
if ( node_it = = container . end ( ) )
2020-11-04 18:54:55 +00:00
{
2022-05-09 08:32:25 +00:00
on_error ( Coordination : : Error : : ZNONODE ) ;
2020-11-04 18:54:55 +00:00
}
2022-05-09 08:32:25 +00:00
else if ( request . version ! = - 1 & & request . version ! = node_it - > value . stat . version )
2020-11-04 18:54:55 +00:00
{
2022-05-09 08:32:25 +00:00
on_error ( Coordination : : Error : : ZBADVERSION ) ;
2020-11-04 18:54:55 +00:00
}
else
{
response . error = Coordination : : Error : : ZOK ;
}
2022-05-06 12:25:25 +00:00
return response_ptr ;
2020-11-04 18:54:55 +00:00
}
2022-05-09 08:32:25 +00:00
Coordination : : ZooKeeperResponsePtr process ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t time ) const override
{
return processImpl < false > ( storage , zxid , session_id , time ) ;
}
Coordination : : ZooKeeperResponsePtr processLocal ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t time ) const override
{
return processImpl < true > ( storage , zxid , session_id , time ) ;
}
2020-11-04 18:54:55 +00:00
} ;
2021-06-22 10:49:35 +00:00
2021-08-24 12:30:31 +00:00
struct KeeperStorageSetACLRequestProcessor final : public KeeperStorageRequestProcessor
2021-06-22 10:49:35 +00:00
{
2022-05-09 09:16:05 +00:00
bool checkAuth ( KeeperStorage & storage , int64_t session_id , bool is_local ) const override
2021-06-22 10:49:35 +00:00
{
2022-05-10 12:53:18 +00:00
return storage . checkACL ( zk_request - > getPath ( ) , Coordination : : ACL : : Admin , session_id , is_local ) ;
2021-06-22 10:49:35 +00:00
}
2021-08-24 12:30:31 +00:00
using KeeperStorageRequestProcessor : : KeeperStorageRequestProcessor ;
2021-06-22 10:49:35 +00:00
2022-05-06 12:25:25 +00:00
std : : vector < KeeperStorage : : Delta > preprocess ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t /*time*/ ) const override
2021-06-22 10:49:35 +00:00
{
2022-05-06 12:25:25 +00:00
Coordination : : ZooKeeperSetACLRequest & request = dynamic_cast < Coordination : : ZooKeeperSetACLRequest & > ( * zk_request ) ;
2022-05-11 09:08:39 +00:00
auto & uncommitted_state = storage . uncommitted_state ;
if ( ! uncommitted_state . hasNode ( request . path ) )
2022-05-09 07:02:11 +00:00
return { { zxid , Coordination : : Error : : ZNONODE } } ;
2022-05-06 12:25:25 +00:00
2022-05-11 09:08:39 +00:00
auto node = uncommitted_state . getNode ( request . path ) ;
2022-05-06 12:25:25 +00:00
if ( request . version ! = - 1 & & request . version ! = node - > stat . aversion )
2022-05-09 07:02:11 +00:00
return { { zxid , Coordination : : Error : : ZBADVERSION } } ;
2022-05-06 12:25:25 +00:00
auto & session_auth_ids = storage . session_and_auth [ session_id ] ;
Coordination : : ACLs node_acls ;
if ( ! fixupACL ( request . acls , session_auth_ids , node_acls ) )
2022-05-09 07:02:11 +00:00
return { { zxid , Coordination : : Error : : ZINVALIDACL } } ;
2022-05-06 12:25:25 +00:00
return {
{ request . path , zxid , KeeperStorage : : SetACLDelta { std : : move ( node_acls ) , request . version } } ,
{ request . path , zxid , KeeperStorage : : UpdateNodeDelta { [ ] ( KeeperStorage : : Node & n ) { + + n . stat . aversion ; } } } } ;
}
2021-06-22 10:49:35 +00:00
2022-05-06 12:25:25 +00:00
Coordination : : ZooKeeperResponsePtr process ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t /* time */ ) const override
{
2021-06-22 10:49:35 +00:00
Coordination : : ZooKeeperResponsePtr response_ptr = zk_request - > makeResponse ( ) ;
Coordination : : ZooKeeperSetACLResponse & response = dynamic_cast < Coordination : : ZooKeeperSetACLResponse & > ( * response_ptr ) ;
Coordination : : ZooKeeperSetACLRequest & request = dynamic_cast < Coordination : : ZooKeeperSetACLRequest & > ( * zk_request ) ;
2022-05-06 12:25:25 +00:00
2022-05-09 08:32:25 +00:00
if ( const auto result = storage . commit ( zxid , session_id ) ; result ! = Coordination : : Error : : ZOK )
2021-06-22 10:49:35 +00:00
{
2022-05-06 12:25:25 +00:00
response . error = result ;
2022-05-09 07:02:11 +00:00
return response_ptr ;
2021-06-22 10:49:35 +00:00
}
2022-05-09 08:32:25 +00:00
auto node_it = storage . container . find ( request . path ) ;
if ( node_it = = storage . container . end ( ) )
2022-05-11 12:10:17 +00:00
onStorageInconsistency ( ) ;
2022-05-09 08:32:25 +00:00
response . stat = node_it - > value . stat ;
2022-05-06 12:25:25 +00:00
response . error = Coordination : : Error : : ZOK ;
2021-06-22 10:49:35 +00:00
2022-05-06 12:25:25 +00:00
return response_ptr ;
2021-06-22 10:49:35 +00:00
}
} ;
2021-08-24 12:30:31 +00:00
struct KeeperStorageGetACLRequestProcessor final : public KeeperStorageRequestProcessor
2021-06-22 10:49:35 +00:00
{
2022-05-09 09:16:05 +00:00
bool checkAuth ( KeeperStorage & storage , int64_t session_id , bool is_local ) const override
2021-06-22 10:49:35 +00:00
{
2022-05-10 12:53:18 +00:00
return storage . checkACL ( zk_request - > getPath ( ) , Coordination : : ACL : : Admin | Coordination : : ACL : : Read , session_id , is_local ) ;
2021-06-22 10:49:35 +00:00
}
2022-05-10 12:53:18 +00:00
2021-08-24 12:30:31 +00:00
using KeeperStorageRequestProcessor : : KeeperStorageRequestProcessor ;
2021-06-22 10:49:35 +00:00
2022-05-06 12:25:25 +00:00
std : : vector < KeeperStorage : : Delta >
preprocess ( KeeperStorage & storage , int64_t zxid , int64_t /*session_id*/ , int64_t /*time*/ ) const override
{
Coordination : : ZooKeeperGetACLRequest & request = dynamic_cast < Coordination : : ZooKeeperGetACLRequest & > ( * zk_request ) ;
2022-05-11 09:08:39 +00:00
if ( ! storage . uncommitted_state . hasNode ( request . path ) )
2022-05-09 07:02:11 +00:00
return { { zxid , Coordination : : Error : : ZNONODE } } ;
2022-05-06 12:25:25 +00:00
return { } ;
}
2022-05-09 08:32:25 +00:00
template < bool local >
Coordination : : ZooKeeperResponsePtr processImpl ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t /* time */ ) const
2021-06-22 10:49:35 +00:00
{
Coordination : : ZooKeeperResponsePtr response_ptr = zk_request - > makeResponse ( ) ;
Coordination : : ZooKeeperGetACLResponse & response = dynamic_cast < Coordination : : ZooKeeperGetACLResponse & > ( * response_ptr ) ;
Coordination : : ZooKeeperGetACLRequest & request = dynamic_cast < Coordination : : ZooKeeperGetACLRequest & > ( * zk_request ) ;
2022-05-09 08:32:25 +00:00
2022-05-10 07:00:38 +00:00
if constexpr ( ! local )
2022-05-09 08:32:25 +00:00
{
if ( const auto result = storage . commit ( zxid , session_id ) ; result ! = Coordination : : Error : : ZOK )
{
response . error = result ;
return response_ptr ;
}
}
const auto on_error = [ & ] ( [[maybe_unused]] const auto error_code )
{
if constexpr ( local )
response . error = error_code ;
else
2022-05-11 12:10:17 +00:00
onStorageInconsistency ( ) ;
2022-05-09 08:32:25 +00:00
} ;
2021-06-22 10:49:35 +00:00
auto & container = storage . container ;
2022-05-09 08:32:25 +00:00
auto node_it = container . find ( request . path ) ;
if ( node_it = = container . end ( ) )
2021-06-22 10:49:35 +00:00
{
2022-05-09 08:32:25 +00:00
on_error ( Coordination : : Error : : ZNONODE ) ;
2021-06-22 10:49:35 +00:00
}
else
{
2022-05-09 08:32:25 +00:00
response . stat = node_it - > value . stat ;
response . acl = storage . acl_map . convertNumber ( node_it - > value . acl_id ) ;
2021-06-22 10:49:35 +00:00
}
2022-05-06 12:25:25 +00:00
return response_ptr ;
2021-06-22 10:49:35 +00:00
}
2022-05-09 08:32:25 +00:00
Coordination : : ZooKeeperResponsePtr process ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t time ) const override
{
return processImpl < false > ( storage , zxid , session_id , time ) ;
}
Coordination : : ZooKeeperResponsePtr processLocal ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t time ) const override
{
return processImpl < true > ( storage , zxid , session_id , time ) ;
}
2021-06-22 10:49:35 +00:00
} ;
2021-08-24 12:30:31 +00:00
struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestProcessor
2020-11-04 18:54:55 +00:00
{
2022-05-09 09:16:05 +00:00
bool checkAuth ( KeeperStorage & storage , int64_t session_id , bool is_local ) const override
2021-05-21 21:19:22 +00:00
{
for ( const auto & concrete_request : concrete_requests )
2022-05-09 09:16:05 +00:00
if ( ! concrete_request - > checkAuth ( storage , session_id , is_local ) )
2021-05-21 21:19:22 +00:00
return false ;
return true ;
}
2021-08-24 12:30:31 +00:00
std : : vector < KeeperStorageRequestProcessorPtr > concrete_requests ;
explicit KeeperStorageMultiRequestProcessor ( const Coordination : : ZooKeeperRequestPtr & zk_request_ )
: KeeperStorageRequestProcessor ( zk_request_ )
2020-11-04 18:54:55 +00:00
{
Coordination : : ZooKeeperMultiRequest & request = dynamic_cast < Coordination : : ZooKeeperMultiRequest & > ( * zk_request ) ;
concrete_requests . reserve ( request . requests . size ( ) ) ;
2020-11-25 13:19:09 +00:00
for ( const auto & sub_request : request . requests )
2020-11-04 18:54:55 +00:00
{
2020-12-10 21:33:13 +00:00
auto sub_zk_request = std : : dynamic_pointer_cast < Coordination : : ZooKeeperRequest > ( sub_request ) ;
2022-04-05 06:27:03 +00:00
switch ( sub_zk_request - > getOpNum ( ) )
2020-11-04 18:54:55 +00:00
{
2022-04-05 06:27:03 +00:00
case Coordination : : OpNum : : Create :
concrete_requests . push_back ( std : : make_shared < KeeperStorageCreateRequestProcessor > ( sub_zk_request ) ) ;
break ;
case Coordination : : OpNum : : Remove :
concrete_requests . push_back ( std : : make_shared < KeeperStorageRemoveRequestProcessor > ( sub_zk_request ) ) ;
break ;
case Coordination : : OpNum : : Set :
concrete_requests . push_back ( std : : make_shared < KeeperStorageSetRequestProcessor > ( sub_zk_request ) ) ;
break ;
case Coordination : : OpNum : : Check :
concrete_requests . push_back ( std : : make_shared < KeeperStorageCheckRequestProcessor > ( sub_zk_request ) ) ;
break ;
default :
2022-05-06 12:25:25 +00:00
throw DB : : Exception (
ErrorCodes : : BAD_ARGUMENTS , " Illegal command as part of multi ZooKeeper request {} " , sub_zk_request - > getOpNum ( ) ) ;
2020-11-04 18:54:55 +00:00
}
}
}
2022-05-09 07:02:11 +00:00
std : : vector < KeeperStorage : : Delta > preprocess ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t time ) const override
2022-05-06 12:25:25 +00:00
{
2022-05-09 08:32:25 +00:00
// manually add deltas so that the result of previous request in the transaction is used in the next request
2022-05-11 09:08:39 +00:00
auto & saved_deltas = storage . uncommitted_state . deltas ;
2022-05-09 07:02:11 +00:00
std : : vector < Coordination : : Error > response_errors ;
response_errors . reserve ( concrete_requests . size ( ) ) ;
for ( size_t i = 0 ; i < concrete_requests . size ( ) ; + + i )
{
auto new_deltas = concrete_requests [ i ] - > preprocess ( storage , zxid , session_id , time ) ;
2022-05-09 11:53:19 +00:00
if ( ! new_deltas . empty ( ) )
2022-05-09 07:02:11 +00:00
{
2022-05-09 11:53:19 +00:00
if ( auto * error = std : : get_if < KeeperStorage : : ErrorDelta > ( & new_deltas . back ( ) . operation ) )
{
std : : erase_if ( saved_deltas , [ zxid ] ( const auto & delta ) { return delta . zxid = = zxid ; } ) ;
2022-05-09 07:02:11 +00:00
2022-05-09 11:53:19 +00:00
response_errors . push_back ( error - > error ) ;
2022-05-09 07:02:11 +00:00
2022-05-09 11:53:19 +00:00
for ( size_t j = i + 1 ; j < concrete_requests . size ( ) ; + + j )
{
response_errors . push_back ( Coordination : : Error : : ZRUNTIMEINCONSISTENCY ) ;
}
2022-05-09 07:02:11 +00:00
2022-05-09 11:53:19 +00:00
return { { zxid , KeeperStorage : : FailedMultiDelta { std : : move ( response_errors ) } } } ;
}
2022-05-09 07:02:11 +00:00
}
new_deltas . emplace_back ( zxid , KeeperStorage : : SubDeltaEnd { } ) ;
response_errors . push_back ( Coordination : : Error : : ZOK ) ;
saved_deltas . insert ( saved_deltas . end ( ) , std : : make_move_iterator ( new_deltas . begin ( ) ) , std : : make_move_iterator ( new_deltas . end ( ) ) ) ;
}
return { } ;
2022-05-06 12:25:25 +00:00
}
Coordination : : ZooKeeperResponsePtr process ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t time ) const override
2020-11-04 18:54:55 +00:00
{
Coordination : : ZooKeeperResponsePtr response_ptr = zk_request - > makeResponse ( ) ;
Coordination : : ZooKeeperMultiResponse & response = dynamic_cast < Coordination : : ZooKeeperMultiResponse & > ( * response_ptr ) ;
2022-05-11 09:08:39 +00:00
auto & deltas = storage . uncommitted_state . deltas ;
2022-05-09 07:02:11 +00:00
if ( auto * failed_multi = std : : get_if < KeeperStorage : : FailedMultiDelta > ( & deltas . front ( ) . operation ) )
2020-11-04 18:54:55 +00:00
{
2022-05-09 07:02:11 +00:00
for ( size_t i = 0 ; i < concrete_requests . size ( ) ; + + i )
2020-11-04 18:54:55 +00:00
{
2022-05-09 07:02:11 +00:00
response . responses [ i ] = std : : make_shared < Coordination : : ZooKeeperErrorResponse > ( ) ;
response . responses [ i ] - > error = failed_multi - > error_codes [ i ] ;
}
2020-12-08 13:28:39 +00:00
2022-05-09 07:02:11 +00:00
return response_ptr ;
}
2020-12-08 13:28:39 +00:00
2022-05-09 08:32:25 +00:00
for ( size_t i = 0 ; i < concrete_requests . size ( ) ; + + i )
2022-05-09 07:02:11 +00:00
{
2022-05-09 08:32:25 +00:00
auto cur_response = concrete_requests [ i ] - > process ( storage , zxid , session_id , time ) ;
2020-11-04 18:54:55 +00:00
2022-05-09 07:02:11 +00:00
while ( ! deltas . empty ( ) )
{
if ( std : : holds_alternative < KeeperStorage : : SubDeltaEnd > ( deltas . front ( ) . operation ) )
{
deltas . pop_front ( ) ;
break ;
2020-11-04 18:54:55 +00:00
}
2020-12-08 13:28:39 +00:00
2022-05-09 07:02:11 +00:00
deltas . pop_front ( ) ;
2020-11-04 18:54:55 +00:00
}
2022-05-09 07:02:11 +00:00
response . responses [ i ] = cur_response ;
2022-05-09 08:32:25 +00:00
}
response . error = Coordination : : Error : : ZOK ;
return response_ptr ;
}
Coordination : : ZooKeeperResponsePtr processLocal ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t time ) const override
{
Coordination : : ZooKeeperResponsePtr response_ptr = zk_request - > makeResponse ( ) ;
Coordination : : ZooKeeperMultiResponse & response = dynamic_cast < Coordination : : ZooKeeperMultiResponse & > ( * response_ptr ) ;
for ( size_t i = 0 ; i < concrete_requests . size ( ) ; + + i )
{
auto cur_response = concrete_requests [ i ] - > process ( storage , zxid , session_id , time ) ;
response . responses [ i ] = cur_response ;
if ( cur_response - > error ! = Coordination : : Error : : ZOK )
{
for ( size_t j = 0 ; j < = i ; + + j )
{
auto response_error = response . responses [ j ] - > error ;
response . responses [ j ] = std : : make_shared < Coordination : : ZooKeeperErrorResponse > ( ) ;
response . responses [ j ] - > error = response_error ;
}
for ( size_t j = i + 1 ; j < response . responses . size ( ) ; + + j )
{
response . responses [ j ] = std : : make_shared < Coordination : : ZooKeeperErrorResponse > ( ) ;
response . responses [ j ] - > error = Coordination : : Error : : ZRUNTIMEINCONSISTENCY ;
}
2022-05-09 07:02:11 +00:00
2022-05-09 08:32:25 +00:00
return response_ptr ;
}
2020-11-04 18:54:55 +00:00
}
2022-05-09 07:02:11 +00:00
response . error = Coordination : : Error : : ZOK ;
return response_ptr ;
2020-11-04 18:54:55 +00:00
}
2020-11-10 13:43:10 +00:00
2022-05-06 12:25:25 +00:00
KeeperStorage : : ResponsesForSessions
processWatches ( KeeperStorage : : Watches & watches , KeeperStorage : : Watches & list_watches ) const override
2020-11-10 13:43:10 +00:00
{
2021-03-29 08:24:56 +00:00
KeeperStorage : : ResponsesForSessions result ;
2020-11-10 13:43:10 +00:00
for ( const auto & generic_request : concrete_requests )
2021-01-19 14:22:28 +00:00
{
auto responses = generic_request - > processWatches ( watches , list_watches ) ;
result . insert ( result . end ( ) , responses . begin ( ) , responses . end ( ) ) ;
}
return result ;
2020-11-10 13:43:10 +00:00
}
2020-11-04 18:54:55 +00:00
} ;
2021-08-24 12:30:31 +00:00
struct KeeperStorageCloseRequestProcessor final : public KeeperStorageRequestProcessor
2020-11-19 16:06:19 +00:00
{
2021-08-24 12:30:31 +00:00
using KeeperStorageRequestProcessor : : KeeperStorageRequestProcessor ;
2022-05-06 12:25:25 +00:00
Coordination : : ZooKeeperResponsePtr process ( KeeperStorage & , int64_t , int64_t , int64_t /* time */ ) const override
2020-11-19 16:06:19 +00:00
{
2020-11-25 13:19:09 +00:00
throw DB : : Exception ( " Called process on close request " , ErrorCodes : : LOGICAL_ERROR ) ;
2020-11-19 16:06:19 +00:00
}
} ;
2021-08-24 12:30:31 +00:00
struct KeeperStorageAuthRequestProcessor final : public KeeperStorageRequestProcessor
2021-05-15 15:01:00 +00:00
{
2021-08-24 12:30:31 +00:00
using KeeperStorageRequestProcessor : : KeeperStorageRequestProcessor ;
2022-05-10 12:53:18 +00:00
std : : vector < KeeperStorage : : Delta > preprocess ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t /*time*/ ) const override
2021-05-15 15:01:00 +00:00
{
2021-05-21 21:19:22 +00:00
Coordination : : ZooKeeperAuthRequest & auth_request = dynamic_cast < Coordination : : ZooKeeperAuthRequest & > ( * zk_request ) ;
2021-05-15 15:01:00 +00:00
Coordination : : ZooKeeperResponsePtr response_ptr = zk_request - > makeResponse ( ) ;
2021-05-21 21:19:22 +00:00
2021-05-24 12:18:04 +00:00
if ( auth_request . scheme ! = " digest " | | std : : count ( auth_request . data . begin ( ) , auth_request . data . end ( ) , ' : ' ) ! = 1 )
2022-05-10 12:53:18 +00:00
return { { zxid , Coordination : : Error : : ZAUTHFAILED } } ;
std : : vector < KeeperStorage : : Delta > new_deltas ;
auto digest = generateDigest ( auth_request . data ) ;
if ( digest = = storage . superdigest )
2021-05-24 12:18:04 +00:00
{
2022-05-10 12:53:18 +00:00
KeeperStorage : : AuthID auth { " super " , " " } ;
new_deltas . emplace_back ( zxid , KeeperStorage : : AddAuthDelta { session_id , std : : move ( auth ) } ) ;
2021-05-24 12:18:04 +00:00
}
else
2021-05-22 16:07:47 +00:00
{
2022-05-10 12:53:18 +00:00
KeeperStorage : : AuthID new_auth { auth_request . scheme , digest } ;
2022-05-11 09:08:39 +00:00
if ( ! storage . uncommitted_state . hasACL ( session_id , false , [ & ] ( const auto & auth_id ) { return new_auth = = auth_id ; } ) )
2022-05-10 12:53:18 +00:00
new_deltas . emplace_back ( zxid , KeeperStorage : : AddAuthDelta { session_id , std : : move ( new_auth ) } ) ;
2021-05-21 21:19:22 +00:00
}
2022-05-10 12:53:18 +00:00
return new_deltas ;
}
Coordination : : ZooKeeperResponsePtr process ( KeeperStorage & storage , int64_t zxid , int64_t session_id , int64_t /* time */ ) const override
{
Coordination : : ZooKeeperResponsePtr response_ptr = zk_request - > makeResponse ( ) ;
Coordination : : ZooKeeperAuthResponse & auth_response = dynamic_cast < Coordination : : ZooKeeperAuthResponse & > ( * response_ptr ) ;
if ( const auto result = storage . commit ( zxid , session_id ) ; result ! = Coordination : : Error : : ZOK )
auth_response . error = result ;
2022-05-09 07:02:11 +00:00
return response_ptr ;
2021-05-15 15:01:00 +00:00
}
} ;
2021-03-29 08:24:56 +00:00
void KeeperStorage : : finalize ( )
2020-11-03 14:49:30 +00:00
{
2021-01-19 14:22:28 +00:00
if ( finalized )
throw DB : : Exception ( " Testkeeper storage already finalized " , ErrorCodes : : LOGICAL_ERROR ) ;
2020-11-10 13:43:10 +00:00
2021-01-19 14:22:28 +00:00
finalized = true ;
2020-11-03 14:49:30 +00:00
2021-02-04 13:22:30 +00:00
for ( const auto & [ session_id , ephemerals_paths ] : ephemerals )
for ( const String & ephemeral_path : ephemerals_paths )
2021-02-04 12:07:41 +00:00
container . erase ( ephemeral_path ) ;
ephemerals . clear ( ) ;
2020-11-18 20:36:25 +00:00
2021-01-19 14:22:28 +00:00
watches . clear ( ) ;
list_watches . clear ( ) ;
sessions_and_watchers . clear ( ) ;
2021-02-04 12:07:41 +00:00
session_expiry_queue . clear ( ) ;
2020-11-04 18:54:55 +00:00
}
2021-08-24 12:30:31 +00:00
class KeeperStorageRequestProcessorsFactory final : private boost : : noncopyable
2020-11-04 18:54:55 +00:00
{
public :
2021-08-24 12:30:31 +00:00
using Creator = std : : function < KeeperStorageRequestProcessorPtr ( const Coordination : : ZooKeeperRequestPtr & ) > ;
2020-11-11 13:07:06 +00:00
using OpNumToRequest = std : : unordered_map < Coordination : : OpNum , Creator > ;
2020-11-04 18:54:55 +00:00
2021-08-24 12:30:31 +00:00
static KeeperStorageRequestProcessorsFactory & instance ( )
2020-11-04 18:54:55 +00:00
{
2021-08-24 12:30:31 +00:00
static KeeperStorageRequestProcessorsFactory factory ;
2020-11-04 18:54:55 +00:00
return factory ;
}
2021-08-24 12:30:31 +00:00
KeeperStorageRequestProcessorPtr get ( const Coordination : : ZooKeeperRequestPtr & zk_request ) const
2020-11-04 18:54:55 +00:00
{
2022-05-09 09:45:03 +00:00
auto request_it = op_num_to_request . find ( zk_request - > getOpNum ( ) ) ;
if ( request_it = = op_num_to_request . end ( ) )
2020-11-25 13:19:09 +00:00
throw DB : : Exception ( " Unknown operation type " + toString ( zk_request - > getOpNum ( ) ) , ErrorCodes : : LOGICAL_ERROR ) ;
2020-11-04 18:54:55 +00:00
2022-05-09 09:45:03 +00:00
return request_it - > second ( zk_request ) ;
2020-11-04 18:54:55 +00:00
}
2020-11-03 14:49:30 +00:00
2020-11-11 13:07:06 +00:00
void registerRequest ( Coordination : : OpNum op_num , Creator creator )
2020-11-04 18:54:55 +00:00
{
if ( ! op_num_to_request . try_emplace ( op_num , creator ) . second )
throw DB : : Exception ( ErrorCodes : : LOGICAL_ERROR , " Request with op num {} already registered " , op_num ) ;
}
private :
OpNumToRequest op_num_to_request ;
2021-08-24 12:30:31 +00:00
KeeperStorageRequestProcessorsFactory ( ) ;
2020-11-04 18:54:55 +00:00
} ;
2022-05-06 12:25:25 +00:00
template < Coordination : : OpNum num , typename RequestT >
2021-08-24 12:30:31 +00:00
void registerKeeperRequestProcessor ( KeeperStorageRequestProcessorsFactory & factory )
2020-11-04 18:54:55 +00:00
{
2022-05-06 12:25:25 +00:00
factory . registerRequest (
num , [ ] ( const Coordination : : ZooKeeperRequestPtr & zk_request ) { return std : : make_shared < RequestT > ( zk_request ) ; } ) ;
2020-11-04 18:54:55 +00:00
}
2021-08-24 12:30:31 +00:00
KeeperStorageRequestProcessorsFactory : : KeeperStorageRequestProcessorsFactory ( )
2020-11-04 18:54:55 +00:00
{
2021-08-24 12:30:31 +00:00
registerKeeperRequestProcessor < Coordination : : OpNum : : Heartbeat , KeeperStorageHeartbeatRequestProcessor > ( * this ) ;
registerKeeperRequestProcessor < Coordination : : OpNum : : Sync , KeeperStorageSyncRequestProcessor > ( * this ) ;
registerKeeperRequestProcessor < Coordination : : OpNum : : Auth , KeeperStorageAuthRequestProcessor > ( * this ) ;
registerKeeperRequestProcessor < Coordination : : OpNum : : Close , KeeperStorageCloseRequestProcessor > ( * this ) ;
registerKeeperRequestProcessor < Coordination : : OpNum : : Create , KeeperStorageCreateRequestProcessor > ( * this ) ;
registerKeeperRequestProcessor < Coordination : : OpNum : : Remove , KeeperStorageRemoveRequestProcessor > ( * this ) ;
registerKeeperRequestProcessor < Coordination : : OpNum : : Exists , KeeperStorageExistsRequestProcessor > ( * this ) ;
registerKeeperRequestProcessor < Coordination : : OpNum : : Get , KeeperStorageGetRequestProcessor > ( * this ) ;
registerKeeperRequestProcessor < Coordination : : OpNum : : Set , KeeperStorageSetRequestProcessor > ( * this ) ;
registerKeeperRequestProcessor < Coordination : : OpNum : : List , KeeperStorageListRequestProcessor > ( * this ) ;
registerKeeperRequestProcessor < Coordination : : OpNum : : SimpleList , KeeperStorageListRequestProcessor > ( * this ) ;
registerKeeperRequestProcessor < Coordination : : OpNum : : Check , KeeperStorageCheckRequestProcessor > ( * this ) ;
registerKeeperRequestProcessor < Coordination : : OpNum : : Multi , KeeperStorageMultiRequestProcessor > ( * this ) ;
registerKeeperRequestProcessor < Coordination : : OpNum : : SetACL , KeeperStorageSetACLRequestProcessor > ( * this ) ;
registerKeeperRequestProcessor < Coordination : : OpNum : : GetACL , KeeperStorageGetACLRequestProcessor > ( * this ) ;
2020-11-03 14:49:30 +00:00
}
2022-05-09 07:02:11 +00:00
void KeeperStorage : : preprocessRequest (
2022-05-09 09:16:05 +00:00
const Coordination : : ZooKeeperRequestPtr & zk_request , int64_t session_id , int64_t time , int64_t new_last_zxid , bool check_acl )
2022-05-05 10:32:41 +00:00
{
2022-05-12 08:58:36 +00:00
int64_t last_zxid = uncommitted_zxids . empty ( ) ? zxid : uncommitted_zxids . back ( ) ;
if ( new_last_zxid < last_zxid | | ( uncommitted_zxids . empty ( ) & & new_last_zxid = = last_zxid ) )
throw Exception (
ErrorCodes : : LOGICAL_ERROR , " Got new ZXID {} smaller or equal to current ZXID ({}). It's a bug " , new_last_zxid , zxid ) ;
if ( new_last_zxid = = last_zxid )
// last uncommitted zxids is same as the current one so we are probably pre_committing on the leader
// but the leader already preprocessed the request while he appended the ZXID
// same ZXIDs in other cases should not happen but we can be more sure of that once we add the digest
// i.e. we will comapare both ZXID and the digest
return ;
uncommitted_zxids . push_back ( new_last_zxid ) ;
2022-05-09 09:16:05 +00:00
KeeperStorageRequestProcessorPtr request_processor = KeeperStorageRequestProcessorsFactory : : instance ( ) . get ( zk_request ) ;
2022-05-09 09:35:16 +00:00
if ( zk_request - > getOpNum ( ) = = Coordination : : OpNum : : Close ) /// Close request is special
{
2022-05-11 09:08:39 +00:00
auto & deltas = uncommitted_state . deltas ;
2022-05-09 09:35:16 +00:00
auto session_ephemerals = ephemerals . find ( session_id ) ;
if ( session_ephemerals ! = ephemerals . end ( ) )
{
for ( const auto & ephemeral_path : session_ephemerals - > second )
{
2022-05-11 09:08:39 +00:00
if ( uncommitted_state . hasNode ( ephemeral_path ) )
2022-05-09 09:35:16 +00:00
{
2022-05-09 09:42:23 +00:00
deltas . emplace_back (
parentPath ( ephemeral_path ) . toString ( ) ,
new_last_zxid ,
UpdateNodeDelta { [ ephemeral_path ] ( Node & parent )
{
- - parent . stat . numChildren ;
+ + parent . stat . cversion ;
} } ) ;
2022-05-09 09:35:16 +00:00
deltas . emplace_back ( ephemeral_path , new_last_zxid , RemoveNodeDelta ( ) ) ;
}
}
}
return ;
}
2022-05-09 09:16:05 +00:00
if ( check_acl & & ! request_processor - > checkAuth ( * this , session_id , false ) )
{
2022-05-11 09:08:39 +00:00
uncommitted_state . deltas . emplace_back ( new_last_zxid , Coordination : : Error : : ZNOAUTH ) ;
2022-05-09 09:16:05 +00:00
return ;
}
2022-05-06 12:25:25 +00:00
auto new_deltas = request_processor - > preprocess ( * this , new_last_zxid , session_id , time ) ;
2022-05-11 09:08:39 +00:00
uncommitted_state . deltas . insert (
uncommitted_state . deltas . end ( ) , std : : make_move_iterator ( new_deltas . begin ( ) ) , std : : make_move_iterator ( new_deltas . end ( ) ) ) ;
2022-05-05 10:32:41 +00:00
}
2022-05-06 12:25:25 +00:00
KeeperStorage : : ResponsesForSessions KeeperStorage : : processRequest (
const Coordination : : ZooKeeperRequestPtr & zk_request ,
int64_t session_id ,
int64_t time ,
std : : optional < int64_t > new_last_zxid ,
2022-05-09 08:32:25 +00:00
bool check_acl ,
bool is_local )
2020-11-03 14:49:30 +00:00
{
2021-02-25 08:34:05 +00:00
if ( new_last_zxid )
{
2022-05-12 08:58:36 +00:00
if ( uncommitted_zxids . empty ( ) )
2022-05-06 12:25:25 +00:00
throw Exception (
2022-05-12 08:58:36 +00:00
ErrorCodes : : LOGICAL_ERROR , " Trying to commit a ZXID ({}) which was not preprocessed " , * new_last_zxid ) ;
if ( uncommitted_zxids . front ( ) ! = * new_last_zxid )
throw Exception (
ErrorCodes : : LOGICAL_ERROR , " Trying to commit a ZXID {} while the next ZXID to commit is {} " , * new_last_zxid , uncommitted_zxids . front ( ) ) ;
2021-02-25 08:34:05 +00:00
zxid = * new_last_zxid ;
2022-05-12 08:58:36 +00:00
uncommitted_zxids . pop_front ( ) ;
2021-02-25 08:34:05 +00:00
}
2022-05-12 08:58:36 +00:00
KeeperStorage : : ResponsesForSessions results ;
2021-09-02 11:54:32 +00:00
/// ZooKeeper update sessions expirity for each request, not only for heartbeats
2021-09-02 11:43:34 +00:00
session_expiry_queue . addNewSessionOrUpdate ( session_id , session_and_timeout [ session_id ] ) ;
2021-08-24 12:30:31 +00:00
if ( zk_request - > getOpNum ( ) = = Coordination : : OpNum : : Close ) /// Close request is special
2020-11-03 14:49:30 +00:00
{
2022-05-09 09:35:16 +00:00
commit ( zxid , session_id ) ;
2021-03-16 12:36:54 +00:00
2022-05-11 09:08:39 +00:00
for ( const auto & delta : uncommitted_state . deltas )
2022-05-09 09:35:16 +00:00
{
if ( delta . zxid > zxid )
break ;
2022-01-22 19:36:23 +00:00
2022-05-09 09:35:16 +00:00
if ( std : : holds_alternative < RemoveNodeDelta > ( delta . operation ) )
{
auto responses = processWatchesImpl ( delta . path , watches , list_watches , Coordination : : Event : : DELETED ) ;
2021-01-19 14:22:28 +00:00
results . insert ( results . end ( ) , responses . begin ( ) , responses . end ( ) ) ;
}
}
2022-05-09 09:35:16 +00:00
2022-05-11 09:08:39 +00:00
std : : erase_if ( uncommitted_state . deltas , [ this ] ( const auto & delta ) { return delta . zxid = = zxid ; } ) ;
2022-05-09 09:35:16 +00:00
2021-01-19 14:22:28 +00:00
clearDeadWatches ( session_id ) ;
2021-05-27 08:15:46 +00:00
auto auth_it = session_and_auth . find ( session_id ) ;
if ( auth_it ! = session_and_auth . end ( ) )
session_and_auth . erase ( auth_it ) ;
2021-01-19 14:22:28 +00:00
/// Finish connection
auto response = std : : make_shared < Coordination : : ZooKeeperCloseResponse > ( ) ;
response - > xid = zk_request - > xid ;
response - > zxid = getZXID ( ) ;
2021-02-03 20:32:15 +00:00
session_expiry_queue . remove ( session_id ) ;
session_and_timeout . erase ( session_id ) ;
results . push_back ( ResponseForSession { session_id , response } ) ;
}
2021-08-24 12:30:31 +00:00
else if ( zk_request - > getOpNum ( ) = = Coordination : : OpNum : : Heartbeat ) /// Heartbeat request is also special
2021-02-03 20:32:15 +00:00
{
2021-08-24 12:30:31 +00:00
KeeperStorageRequestProcessorPtr storage_request = KeeperStorageRequestProcessorsFactory : : instance ( ) . get ( zk_request ) ;
2022-05-09 07:02:11 +00:00
auto response = storage_request - > process ( * this , zxid , session_id , time ) ;
2021-02-03 20:32:15 +00:00
response - > xid = zk_request - > xid ;
response - > zxid = getZXID ( ) ;
2021-01-19 14:45:45 +00:00
results . push_back ( ResponseForSession { session_id , response } ) ;
2020-11-03 14:49:30 +00:00
}
2021-08-24 12:30:31 +00:00
else /// normal requests proccession
2020-11-03 14:49:30 +00:00
{
2021-08-24 12:30:31 +00:00
KeeperStorageRequestProcessorPtr request_processor = KeeperStorageRequestProcessorsFactory : : instance ( ) . get ( zk_request ) ;
2021-05-21 21:19:22 +00:00
Coordination : : ZooKeeperResponsePtr response ;
2022-05-09 09:16:05 +00:00
if ( is_local )
2021-05-21 21:19:22 +00:00
{
2022-05-09 09:16:05 +00:00
if ( check_acl & & ! request_processor - > checkAuth ( * this , session_id , true ) )
{
response = zk_request - > makeResponse ( ) ;
/// Original ZooKeeper always throws no auth, even when user provided some credentials
response - > error = Coordination : : Error : : ZNOAUTH ;
}
2022-05-11 07:53:32 +00:00
else
{
response = request_processor - > processLocal ( * this , zxid , session_id , time ) ;
}
2021-05-21 21:19:22 +00:00
}
else
{
2022-05-09 09:16:05 +00:00
response = request_processor - > process ( * this , zxid , session_id , time ) ;
2022-05-11 09:08:39 +00:00
std : : erase_if ( uncommitted_state . deltas , [ this ] ( const auto & delta ) { return delta . zxid = = zxid ; } ) ;
2021-05-21 21:19:22 +00:00
}
2021-01-19 14:22:28 +00:00
2021-08-24 12:30:31 +00:00
/// Watches for this requests are added to the watches lists
2021-01-19 14:22:28 +00:00
if ( zk_request - > has_watch )
{
if ( response - > error = = Coordination : : Error : : ZOK )
{
2022-05-06 12:25:25 +00:00
auto & watches_type
= zk_request - > getOpNum ( ) = = Coordination : : OpNum : : List | | zk_request - > getOpNum ( ) = = Coordination : : OpNum : : SimpleList
2021-01-19 14:22:28 +00:00
? list_watches
: watches ;
watches_type [ zk_request - > getPath ( ) ] . emplace_back ( session_id ) ;
sessions_and_watchers [ session_id ] . emplace ( zk_request - > getPath ( ) ) ;
}
else if ( response - > error = = Coordination : : Error : : ZNONODE & & zk_request - > getOpNum ( ) = = Coordination : : OpNum : : Exists )
{
watches [ zk_request - > getPath ( ) ] . emplace_back ( session_id ) ;
sessions_and_watchers [ session_id ] . emplace ( zk_request - > getPath ( ) ) ;
}
}
2021-08-24 12:30:31 +00:00
/// If this requests processed successfully we need to check watches
2021-01-19 14:22:28 +00:00
if ( response - > error = = Coordination : : Error : : ZOK )
{
2021-08-24 12:30:31 +00:00
auto watch_responses = request_processor - > processWatches ( watches , list_watches ) ;
2021-01-19 14:22:28 +00:00
results . insert ( results . end ( ) , watch_responses . begin ( ) , watch_responses . end ( ) ) ;
}
response - > xid = zk_request - > xid ;
response - > zxid = getZXID ( ) ;
2021-01-19 14:45:45 +00:00
results . push_back ( ResponseForSession { session_id , response } ) ;
2020-11-03 14:49:30 +00:00
}
2021-01-19 14:22:28 +00:00
return results ;
2020-11-03 14:49:30 +00:00
}
2022-05-10 13:04:35 +00:00
void KeeperStorage : : rollbackRequest ( int64_t rollback_zxid )
{
2022-05-12 08:58:36 +00:00
if ( uncommitted_zxids . empty ( ) | | uncommitted_zxids . back ( ) ! = rollback_zxid )
throw Exception (
ErrorCodes : : LOGICAL_ERROR , " Trying to rollback invalid ZXID ({}). It should be the last preprocessed. " , rollback_zxid ) ;
2022-05-10 13:04:35 +00:00
// we can only rollback the last zxid (if there is any)
// if there is a delta with a larger zxid, we have invalid state
2022-05-11 09:08:39 +00:00
assert ( uncommitted_state . deltas . empty ( ) | | uncommitted_state . deltas . back ( ) . zxid < = rollback_zxid ) ;
std : : erase_if ( uncommitted_state . deltas , [ rollback_zxid ] ( const auto & delta ) { return delta . zxid = = rollback_zxid ; } ) ;
2022-05-10 13:04:35 +00:00
}
2021-01-19 14:22:28 +00:00
2021-03-29 08:24:56 +00:00
void KeeperStorage : : clearDeadWatches ( int64_t session_id )
2020-11-26 14:57:32 +00:00
{
2021-08-25 09:31:02 +00:00
/// Clear all watches for this session
2020-11-26 14:57:32 +00:00
auto watches_it = sessions_and_watchers . find ( session_id ) ;
if ( watches_it ! = sessions_and_watchers . end ( ) )
{
for ( const auto & watch_path : watches_it - > second )
{
2021-08-24 12:30:31 +00:00
/// Maybe it's a normal watch
2020-11-26 14:57:32 +00:00
auto watch = watches . find ( watch_path ) ;
if ( watch ! = watches . end ( ) )
{
auto & watches_for_path = watch - > second ;
for ( auto w_it = watches_for_path . begin ( ) ; w_it ! = watches_for_path . end ( ) ; )
{
2021-01-19 14:22:28 +00:00
if ( * w_it = = session_id )
2020-11-26 14:57:32 +00:00
w_it = watches_for_path . erase ( w_it ) ;
else
+ + w_it ;
}
if ( watches_for_path . empty ( ) )
watches . erase ( watch ) ;
}
2020-12-14 16:01:29 +00:00
2021-08-24 12:30:31 +00:00
/// Maybe it's a list watch
2020-12-14 16:01:29 +00:00
auto list_watch = list_watches . find ( watch_path ) ;
if ( list_watch ! = list_watches . end ( ) )
{
auto & list_watches_for_path = list_watch - > second ;
for ( auto w_it = list_watches_for_path . begin ( ) ; w_it ! = list_watches_for_path . end ( ) ; )
{
2021-01-19 14:22:28 +00:00
if ( * w_it = = session_id )
2020-12-14 16:01:29 +00:00
w_it = list_watches_for_path . erase ( w_it ) ;
else
+ + w_it ;
}
if ( list_watches_for_path . empty ( ) )
list_watches . erase ( list_watch ) ;
}
2020-11-26 14:57:32 +00:00
}
2021-08-24 12:30:31 +00:00
2020-11-26 14:57:32 +00:00
sessions_and_watchers . erase ( watches_it ) ;
}
}
2021-11-05 10:21:34 +00:00
void KeeperStorage : : dumpWatches ( WriteBufferFromOwnString & buf ) const
{
2021-11-18 20:17:22 +00:00
for ( const auto & [ session_id , watches_paths ] : sessions_and_watchers )
2021-11-05 10:21:34 +00:00
{
2021-11-18 20:17:22 +00:00
buf < < " 0x " < < getHexUIntLowercase ( session_id ) < < " \n " ;
for ( const String & path : watches_paths )
2021-11-12 12:48:42 +00:00
buf < < " \t " < < path < < " \n " ;
2021-11-05 10:21:34 +00:00
}
}
void KeeperStorage : : dumpWatchesByPath ( WriteBufferFromOwnString & buf ) const
{
2021-11-18 20:17:22 +00:00
auto write_int_vec = [ & buf ] ( const std : : vector < int64_t > & session_ids )
2021-11-05 10:21:34 +00:00
{
2021-11-18 20:17:22 +00:00
for ( int64_t session_id : session_ids )
2021-11-05 10:21:34 +00:00
{
2021-11-18 20:17:22 +00:00
buf < < " \t 0x " < < getHexUIntLowercase ( session_id ) < < " \n " ;
2021-11-05 10:21:34 +00:00
}
} ;
2021-11-18 20:17:22 +00:00
for ( const auto & [ watch_path , sessions ] : watches )
2021-11-05 10:21:34 +00:00
{
2021-11-18 20:17:22 +00:00
buf < < watch_path < < " \n " ;
write_int_vec ( sessions ) ;
2021-11-05 10:21:34 +00:00
}
2021-11-18 20:17:22 +00:00
for ( const auto & [ watch_path , sessions ] : list_watches )
2021-11-05 10:21:34 +00:00
{
2021-11-18 20:17:22 +00:00
buf < < watch_path < < " \n " ;
write_int_vec ( sessions ) ;
2021-11-05 10:21:34 +00:00
}
}
2021-11-18 20:17:22 +00:00
void KeeperStorage : : dumpSessionsAndEphemerals ( WriteBufferFromOwnString & buf ) const
2021-11-05 10:21:34 +00:00
{
2021-11-18 20:17:22 +00:00
auto write_str_set = [ & buf ] ( const std : : unordered_set < String > & ephemeral_paths )
2021-11-05 10:21:34 +00:00
{
2021-11-18 20:17:22 +00:00
for ( const String & path : ephemeral_paths )
2021-11-05 10:21:34 +00:00
{
2021-11-18 20:17:22 +00:00
buf < < " \t " < < path < < " \n " ;
2021-11-05 10:21:34 +00:00
}
} ;
2021-11-18 20:17:22 +00:00
buf < < " Sessions dump ( " < < session_and_timeout . size ( ) < < " ): \n " ;
for ( const auto & [ session_id , _ ] : session_and_timeout )
{
buf < < " 0x " < < getHexUIntLowercase ( session_id ) < < " \n " ;
}
buf < < " Sessions with Ephemerals ( " < < getSessionWithEphemeralNodesCount ( ) < < " ): \n " ;
for ( const auto & [ session_id , ephemeral_paths ] : ephemerals )
2021-11-05 10:21:34 +00:00
{
2021-11-18 20:17:22 +00:00
buf < < " 0x " < < getHexUIntLowercase ( session_id ) < < " \n " ;
write_str_set ( ephemeral_paths ) ;
2021-11-05 10:21:34 +00:00
}
}
2021-11-18 20:17:22 +00:00
uint64_t KeeperStorage : : getTotalWatchesCount ( ) const
{
uint64_t ret = 0 ;
for ( const auto & [ path , subscribed_sessions ] : watches )
ret + = subscribed_sessions . size ( ) ;
for ( const auto & [ path , subscribed_sessions ] : list_watches )
ret + = subscribed_sessions . size ( ) ;
return ret ;
}
uint64_t KeeperStorage : : getSessionsWithWatchesCount ( ) const
{
std : : unordered_set < int64_t > counter ;
for ( const auto & [ path , subscribed_sessions ] : watches )
counter . insert ( subscribed_sessions . begin ( ) , subscribed_sessions . end ( ) ) ;
for ( const auto & [ path , subscribed_sessions ] : list_watches )
counter . insert ( subscribed_sessions . begin ( ) , subscribed_sessions . end ( ) ) ;
return counter . size ( ) ;
}
uint64_t KeeperStorage : : getTotalEphemeralNodesCount ( ) const
{
uint64_t ret = 0 ;
for ( const auto & [ session_id , nodes ] : ephemerals )
ret + = nodes . size ( ) ;
return ret ;
}
2020-10-30 14:16:47 +00:00
}