2021-03-31 17:55:04 +00:00
# include <Interpreters/TransactionLog.h>
2021-11-08 18:56:09 +00:00
# include <Interpreters/TransactionVersionMetadata.h>
2021-12-28 11:23:35 +00:00
# include <Interpreters/Context.h>
2022-01-14 14:03:00 +00:00
# include <Interpreters/TransactionsInfoLog.h>
2021-12-28 11:23:35 +00:00
# include <IO/ReadHelpers.h>
# include <IO/WriteHelpers.h>
# include <IO/ReadBufferFromString.h>
2021-04-08 17:20:45 +00:00
# include <Common/Exception.h>
2021-12-28 11:23:35 +00:00
# include <Common/ZooKeeper/KeeperException.h>
2021-11-08 18:56:09 +00:00
# include <Core/ServerUUID.h>
2022-04-27 15:05:45 +00:00
# include <Common/logger_useful.h>
2022-06-16 17:41:32 +00:00
# include <Common/noexcept_scope.h>
2021-03-31 17:55:04 +00:00
2022-03-14 20:43:34 +00:00
2021-03-31 17:55:04 +00:00
namespace DB
{
2021-04-08 17:20:45 +00:00
namespace ErrorCodes
{
    /// Error codes thrown by the transaction log.
    extern const int LOGICAL_ERROR;
    extern const int UNKNOWN_STATUS_OF_TRANSACTION;
}
static void tryWriteEventToSystemLog ( Poco : : Logger * log , ContextPtr context ,
TransactionsInfoLogElement : : Type type , const TransactionID & tid , CSN csn = Tx : : UnknownCSN )
try
{
auto system_log = context - > getTransactionsInfoLog ( ) ;
if ( ! system_log )
return ;
TransactionsInfoLogElement elem ;
elem . type = type ;
elem . tid = tid ;
elem . csn = csn ;
elem . fillCommonFields ( nullptr ) ;
2023-07-28 07:23:34 +00:00
system_log - > add ( std : : move ( elem ) ) ;
2022-01-14 14:03:00 +00:00
}
catch ( . . . )
{
tryLogCurrentException ( log ) ;
2021-04-08 17:20:45 +00:00
}
2021-03-31 17:55:04 +00:00
/// Reads transaction log settings from the server config, loads the current state of the log
/// from [Zoo]Keeper and only then starts the background updating thread.
TransactionLog::TransactionLog()
    : global_context(Context::getGlobalContextInstance())
    , log(&Poco::Logger::get("TransactionLog"))
    , zookeeper_path(global_context->getConfigRef().getString("transaction_log.zookeeper_path", "/clickhouse/txn"))
    , zookeeper_path_log(zookeeper_path + "/log")
    , fault_probability_before_commit(global_context->getConfigRef().getDouble("transaction_log.fault_probability_before_commit", 0))
    , fault_probability_after_commit(global_context->getConfigRef().getDouble("transaction_log.fault_probability_after_commit", 0))
{
    /// The initial load must happen first: the updating thread only fetches entries newer than last_loaded_entry.
    loadLogFromZooKeeper();
    updating_thread = ThreadFromGlobalPool(&TransactionLog::runUpdatingThread, this);
}
/// Destruction delegates to shutdown(), which does the work only on its first invocation.
TransactionLog::~TransactionLog()
{
    shutdown();
}
void TransactionLog : : shutdown ( )
{
if ( stop_flag . exchange ( true ) )
return ;
2021-12-28 11:23:35 +00:00
log_updated_event - > set ( ) ;
2022-01-19 18:29:31 +00:00
latest_snapshot . notify_all ( ) ;
2021-12-28 11:23:35 +00:00
updating_thread . join ( ) ;
2022-01-20 18:15:23 +00:00
std : : lock_guard lock { mutex } ;
/// This is required to... you'll never guess - avoid race condition inside Poco::Logger (Coordination::ZooKeeper::log)
zookeeper . reset ( ) ;
}
/// Returns a copy of the current session pointer. The updating thread replaces it (under `mutex`)
/// after the session expires, so it must not be read without the lock from other threads.
ZooKeeperPtr TransactionLog::getZooKeeper() const
{
    std::lock_guard guard{mutex};
    return zookeeper;
}
2022-03-16 19:16:26 +00:00
/// Parses a log node name of the form "csn-<number>" and returns the number.
/// Throws if the prefix is missing or there are trailing characters.
UInt64 TransactionLog::deserializeCSN(const String & csn_node_name)
{
    ReadBufferFromString in{csn_node_name};
    assertString("csn-", in);
    UInt64 parsed_csn;
    readText(parsed_csn, in);
    assertEOF(in);
    return parsed_csn;
}
2022-03-16 19:16:26 +00:00
/// Formats `csn` as a sequential log node name with the "csn-" prefix; deserializeCSN() parses such names back.
String TransactionLog::serializeCSN(CSN csn)
{
    return zkutil::getSequentialNodeName("csn-", csn);
}
2022-03-16 19:16:26 +00:00
/// Parses the content of a CSN log node into a TransactionID.
/// Empty content (nodes created only to allocate a CSN) maps to Tx::EmptyTID.
TransactionID TransactionLog::deserializeTID(const String & csn_node_content)
{
    if (csn_node_content.empty())
        return Tx::EmptyTID;

    ReadBufferFromString in{csn_node_content};
    TransactionID parsed_tid = TransactionID::read(in);
    assertEOF(in);
    return parsed_tid;
}
2022-03-16 19:16:26 +00:00
/// Serializes `tid` into the text written as the content of a CSN log node (see deserializeTID()).
String TransactionLog::serializeTID(const TransactionID & tid)
{
    WriteBufferFromOwnString out;
    TransactionID::write(tid, out);
    return out.str();
}
2022-09-26 09:31:27 +00:00
2022-09-29 09:19:47 +00:00
void TransactionLog : : loadEntries ( Strings : : const_iterator beg , Strings : : const_iterator end )
{
2022-09-26 09:31:27 +00:00
size_t entries_count = std : : distance ( beg , end ) ;
2022-09-29 09:19:47 +00:00
if ( ! entries_count )
return ;
2022-09-26 09:07:34 +00:00
2022-09-29 09:19:47 +00:00
String last_entry = * std : : prev ( end ) ;
LOG_TRACE ( log , " Loading {} entries from {}: {}..{} " , entries_count , zookeeper_path_log , * beg , last_entry ) ;
std : : vector < std : : string > entry_paths ;
entry_paths . reserve ( entries_count ) ;
for ( auto it = beg ; it ! = end ; + + it )
entry_paths . emplace_back ( fs : : path ( zookeeper_path_log ) / * it ) ;
2022-09-26 09:07:34 +00:00
2022-09-29 09:19:47 +00:00
auto entries = TSA_READ_ONE_THREAD ( zookeeper ) - > get ( entry_paths ) ;
2022-09-26 09:31:27 +00:00
std : : vector < std : : pair < TIDHash , CSNEntry > > loaded ;
loaded . reserve ( entries_count ) ;
2022-09-29 09:19:47 +00:00
auto it = beg ;
2022-09-26 09:31:27 +00:00
for ( size_t i = 0 ; i < entries_count ; + + i , + + it )
2022-09-26 09:07:34 +00:00
{
2022-09-29 09:19:47 +00:00
auto res = entries [ i ] ;
2022-09-26 09:31:27 +00:00
CSN csn = deserializeCSN ( * it ) ;
TransactionID tid = deserializeTID ( res . data ) ;
loaded . emplace_back ( tid . getHash ( ) , CSNEntry { csn , tid } ) ;
LOG_TEST ( log , " Got entry {} -> {} " , tid , csn ) ;
2022-09-26 09:07:34 +00:00
}
2022-07-14 13:46:01 +00:00
NOEXCEPT_SCOPE_STRICT ( {
2022-05-20 10:41:44 +00:00
std : : lock_guard lock { mutex } ;
for ( const auto & entry : loaded )
{
if ( entry . first = = Tx : : EmptyTID . getHash ( ) )
continue ;
2022-03-14 20:43:34 +00:00
2022-05-20 10:41:44 +00:00
tid_to_csn . emplace ( entry . first , entry . second ) ;
}
last_loaded_entry = last_entry ;
2022-07-14 13:46:01 +00:00
} ) ;
2022-05-20 10:41:44 +00:00
{
std : : lock_guard lock { running_list_mutex } ;
latest_snapshot = loaded . back ( ) . second . csn ;
local_tid_counter = Tx : : MaxReservedLocalTID ;
2022-03-14 20:43:34 +00:00
}
2021-12-28 11:23:35 +00:00
}
void TransactionLog : : loadLogFromZooKeeper ( )
{
2022-05-20 10:41:44 +00:00
chassert ( ! zookeeper ) ;
chassert ( tid_to_csn . empty ( ) ) ;
chassert ( last_loaded_entry . empty ( ) ) ;
2021-12-28 11:23:35 +00:00
zookeeper = global_context - > getZooKeeper ( ) ;
/// We do not write local_tid_counter to disk or zk and maintain it only in memory.
/// Create empty entry to allocate new CSN to safely start counting from the beginning and avoid TID duplication.
/// TODO It's possible to skip this step in come cases (especially for multi-host configuration).
2022-03-14 20:43:34 +00:00
Coordination : : Error code = zookeeper - > tryCreate ( zookeeper_path_log + " /csn- " , " " , zkutil : : CreateMode : : PersistentSequential ) ;
2021-12-28 11:23:35 +00:00
if ( code ! = Coordination : : Error : : ZOK )
{
2022-03-14 20:43:34 +00:00
/// Log probably does not exist, create it
2022-05-20 10:41:44 +00:00
chassert ( code = = Coordination : : Error : : ZNONODE ) ;
2022-03-14 20:43:34 +00:00
zookeeper - > createAncestors ( zookeeper_path_log ) ;
2021-12-28 11:23:35 +00:00
Coordination : : Requests ops ;
2022-03-16 19:16:26 +00:00
ops . emplace_back ( zkutil : : makeCreateRequest ( zookeeper_path + " /tail_ptr " , serializeCSN ( Tx : : MaxReservedCSN ) , zkutil : : CreateMode : : Persistent ) ) ;
2022-03-14 20:43:34 +00:00
ops . emplace_back ( zkutil : : makeCreateRequest ( zookeeper_path_log , " " , zkutil : : CreateMode : : Persistent ) ) ;
/// Fast-forward sequential counter to skip reserved CSNs
2021-12-28 11:23:35 +00:00
for ( size_t i = 0 ; i < = Tx : : MaxReservedCSN ; + + i )
2022-03-14 20:43:34 +00:00
ops . emplace_back ( zkutil : : makeCreateRequest ( zookeeper_path_log + " /csn- " , " " , zkutil : : CreateMode : : PersistentSequential ) ) ;
2021-12-28 11:23:35 +00:00
Coordination : : Responses res ;
code = zookeeper - > tryMulti ( ops , res ) ;
if ( code ! = Coordination : : Error : : ZNODEEXISTS )
zkutil : : KeeperMultiException : : check ( code , ops , res ) ;
}
/// TODO Split log into "subdirectories" to:
/// 1. fetch it more optimal way (avoid listing all CSNs on further incremental updates)
/// 2. simplify log rotation
/// 3. support 64-bit CSNs on top of Apache ZooKeeper (it uses Int32 for sequential numbers)
2022-03-14 20:43:34 +00:00
Strings entries_list = zookeeper - > getChildren ( zookeeper_path_log , nullptr , log_updated_event ) ;
2022-05-20 10:41:44 +00:00
chassert ( ! entries_list . empty ( ) ) ;
2022-06-13 13:31:08 +00:00
: : sort ( entries_list . begin ( ) , entries_list . end ( ) ) ;
2021-12-28 11:23:35 +00:00
loadEntries ( entries_list . begin ( ) , entries_list . end ( ) ) ;
2022-05-20 10:41:44 +00:00
chassert ( ! last_loaded_entry . empty ( ) ) ;
chassert ( latest_snapshot = = deserializeCSN ( last_loaded_entry ) ) ;
2021-11-08 18:56:09 +00:00
local_tid_counter = Tx : : MaxReservedLocalTID ;
2022-03-14 20:43:34 +00:00
2022-03-16 19:16:26 +00:00
tail_ptr = deserializeCSN ( zookeeper - > get ( zookeeper_path + " /tail_ptr " ) ) ;
2021-03-31 17:55:04 +00:00
}
2021-12-28 11:23:35 +00:00
void TransactionLog : : runUpdatingThread ( )
{
while ( true )
{
try
{
2022-05-20 20:08:46 +00:00
/// Do not wait if we have some transactions to finalize
2022-06-28 14:25:29 +00:00
if ( TSA_READ_ONE_THREAD ( unknown_state_list_loaded ) . empty ( ) )
2022-05-20 20:08:46 +00:00
log_updated_event - > wait ( ) ;
2021-12-28 11:23:35 +00:00
if ( stop_flag . load ( ) )
return ;
2022-05-20 15:35:29 +00:00
bool connection_loss = getZooKeeper ( ) - > expired ( ) ;
if ( connection_loss )
2022-01-20 18:15:23 +00:00
{
auto new_zookeeper = global_context - > getZooKeeper ( ) ;
2022-05-23 18:53:33 +00:00
{
std : : lock_guard lock { mutex } ;
zookeeper = new_zookeeper ;
}
/// It's possible that we connected to different [Zoo]Keeper instance
2022-06-20 14:05:51 +00:00
/// so we may read a bit stale state.
2022-06-28 14:25:29 +00:00
TSA_READ_ONE_THREAD ( zookeeper ) - > sync ( zookeeper_path_log ) ;
2022-01-20 18:15:23 +00:00
}
2021-12-28 11:23:35 +00:00
loadNewEntries ( ) ;
2022-03-14 20:43:34 +00:00
removeOldEntries ( ) ;
2022-05-23 19:17:52 +00:00
tryFinalizeUnknownStateTransactions ( ) ;
2021-12-28 11:23:35 +00:00
}
2022-04-12 09:39:21 +00:00
catch ( const Coordination : : Exception & )
2021-12-28 11:23:35 +00:00
{
2022-02-03 18:57:09 +00:00
tryLogCurrentException ( log ) ;
2021-12-28 11:23:35 +00:00
/// TODO better backoff
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 1000 ) ) ;
log_updated_event - > set ( ) ;
}
catch ( . . . )
{
2022-02-03 18:57:09 +00:00
tryLogCurrentException ( log ) ;
2021-12-28 11:23:35 +00:00
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 1000 ) ) ;
log_updated_event - > set ( ) ;
}
}
}
void TransactionLog : : loadNewEntries ( )
{
2022-06-28 14:25:29 +00:00
Strings entries_list = TSA_READ_ONE_THREAD ( zookeeper ) - > getChildren ( zookeeper_path_log , nullptr , log_updated_event ) ;
2022-05-20 10:41:44 +00:00
chassert ( ! entries_list . empty ( ) ) ;
2022-06-13 13:31:08 +00:00
: : sort ( entries_list . begin ( ) , entries_list . end ( ) ) ;
2022-06-28 14:25:29 +00:00
auto it = std : : upper_bound ( entries_list . begin ( ) , entries_list . end ( ) , TSA_READ_ONE_THREAD ( last_loaded_entry ) ) ;
2021-12-28 11:23:35 +00:00
loadEntries ( it , entries_list . end ( ) ) ;
2022-06-28 14:25:29 +00:00
chassert ( TSA_READ_ONE_THREAD ( last_loaded_entry ) = = entries_list . back ( ) ) ;
chassert ( latest_snapshot = = deserializeCSN ( TSA_READ_ONE_THREAD ( last_loaded_entry ) ) ) ;
2021-12-28 11:23:35 +00:00
latest_snapshot . notify_all ( ) ;
}
2022-03-14 20:43:34 +00:00
void TransactionLog : : removeOldEntries ( )
{
/// Try to update tail pointer. It's (almost) safe to set it to the oldest snapshot
/// because if a transaction released snapshot, then CSN is already written into metadata.
/// Why almost? Because on server startup we do not have the oldest snapshot (it's simply equal to the latest one),
/// but it's possible that some CSNs are not written into data parts (and we will write them during startup).
if ( ! global_context - > isServerCompletelyStarted ( ) )
return ;
/// Also similar problem is possible if some table was not attached during startup (for example, if table is detached permanently).
/// Also we write CSNs into data parts without fsync, so it's theoretically possible that we wrote CSN, finished transaction,
/// removed its entry from the log, but after that server restarts and CSN is not actually saved to metadata on disk.
/// We should store a bit more entries in ZK and keep outdated entries for a while.
/// TODO we will need a bit more complex logic for multiple hosts
Coordination : : Stat stat ;
2022-06-28 14:25:29 +00:00
CSN old_tail_ptr = deserializeCSN ( TSA_READ_ONE_THREAD ( zookeeper ) - > get ( zookeeper_path + " /tail_ptr " , & stat ) ) ;
2022-03-14 20:43:34 +00:00
CSN new_tail_ptr = getOldestSnapshot ( ) ;
if ( new_tail_ptr < old_tail_ptr )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Got unexpected tail_ptr {}, oldest snapshot is {}, it's a bug " , old_tail_ptr , new_tail_ptr ) ;
2022-03-18 13:33:59 +00:00
else if ( new_tail_ptr = = old_tail_ptr )
return ;
2022-03-14 20:43:34 +00:00
/// (it's not supposed to fail with ZBADVERSION while there is only one host)
LOG_TRACE ( log , " Updating tail_ptr from {} to {} " , old_tail_ptr , new_tail_ptr ) ;
2022-06-28 14:25:29 +00:00
TSA_READ_ONE_THREAD ( zookeeper ) - > set ( zookeeper_path + " /tail_ptr " , serializeCSN ( new_tail_ptr ) , stat . version ) ;
2022-03-14 20:43:34 +00:00
tail_ptr . store ( new_tail_ptr ) ;
/// Now we can find and remove old entries
TIDMap tids ;
{
std : : lock_guard lock { mutex } ;
tids = tid_to_csn ;
}
2021-12-28 11:23:35 +00:00
2022-03-14 20:43:34 +00:00
/// TODO support batching
std : : vector < TIDHash > removed_entries ;
CSN latest_entry_csn = latest_snapshot . load ( ) ;
for ( const auto & elem : tids )
{
/// Definitely not safe to remove
if ( new_tail_ptr < = elem . second . tid . start_csn )
continue ;
/// Keep at least one node (the latest one we fetched)
if ( elem . second . csn = = latest_entry_csn )
continue ;
LOG_TEST ( log , " Removing entry {} -> {} " , elem . second . tid , elem . second . csn ) ;
2022-06-28 14:25:29 +00:00
auto code = TSA_READ_ONE_THREAD ( zookeeper ) - > tryRemove ( zookeeper_path_log + " / " + serializeCSN ( elem . second . csn ) ) ;
2022-03-14 20:43:34 +00:00
if ( code = = Coordination : : Error : : ZOK | | code = = Coordination : : Error : : ZNONODE )
removed_entries . push_back ( elem . first ) ;
}
std : : lock_guard lock { mutex } ;
for ( const auto & tid_hash : removed_entries )
tid_to_csn . erase ( tid_hash ) ;
}
2022-05-20 15:35:29 +00:00
void TransactionLog : : tryFinalizeUnknownStateTransactions ( )
{
/// We just recovered connection to [Zoo]Keeper.
/// Check if transactions in unknown state were actually committed or not and finalize or rollback them.
UnknownStateList list ;
{
2022-05-20 20:08:46 +00:00
/// We must be sure that the corresponding CSN entry is loaded from ZK.
/// Otherwise we may accidentally rollback committed transaction in case of race condition like this:
/// - runUpdatingThread: loaded some entries, ready to call tryFinalizeUnknownStateTransactions()
/// - commitTransaction: creates CSN entry in the log (txn is committed)
/// - [session expires]
/// - commitTransaction: catches Coordination::Exception (maybe due to fault injection), appends txn to unknown_state_list
/// - runUpdatingThread: calls tryFinalizeUnknownStateTransactions(), fails to find CSN for this txn, rolls it back
/// So all CSN entries that might exist at the moment of appending txn to unknown_state_list
/// must be loaded from ZK before we start finalize that txn.
/// That's why we use two lists here:
/// 1. At first we put txn into unknown_state_list
/// 2. We move it to unknown_state_list_loaded when runUpdatingThread done at least one iteration
/// 3. Then we can safely finalize txns from unknown_state_list_loaded, because all required entries are loaded
2022-05-20 15:35:29 +00:00
std : : lock_guard lock { running_list_mutex } ;
std : : swap ( list , unknown_state_list ) ;
2022-05-20 20:08:46 +00:00
std : : swap ( list , unknown_state_list_loaded ) ;
2022-05-20 15:35:29 +00:00
}
for ( auto & [ txn , state_guard ] : list )
{
/// CSNs must be already loaded, only need to check if the corresponding mapping exists.
if ( auto csn = getCSN ( txn - > tid ) )
{
2023-03-04 15:48:04 +00:00
finalizeCommittedTransaction ( txn . get ( ) , csn , state_guard ) ;
2022-05-20 15:35:29 +00:00
}
else
{
assertTIDIsNotOutdated ( txn - > tid ) ;
state_guard = { } ;
rollbackTransaction ( txn - > shared_from_this ( ) ) ;
}
}
}
2022-03-14 20:43:34 +00:00
/// CSN of the newest entry loaded from the log; beginTransaction() uses it as the snapshot of new transactions.
CSN TransactionLog::getLatestSnapshot() const
{
    const CSN newest = latest_snapshot.load();
    return newest;
}
/// Starts a new transaction.
/// The latest loaded CSN becomes its snapshot and a new local TID is allocated. The snapshot is appended to
/// snapshots_in_use, so removeOldEntries() does not move the tail past it. The transaction is added to running_list.
/// @throws Exception(LOGICAL_ERROR) if a transaction with the same TID hash is already running.
MergeTreeTransactionPtr TransactionLog::beginTransaction()
{
    MergeTreeTransactionPtr txn;
    {
        /// Snapshot and local TID are read under the same mutex that loadEntries() uses to update them.
        std::lock_guard lock{running_list_mutex};
        CSN snapshot = latest_snapshot.load();
        LocalTID ltid = 1 + local_tid_counter.fetch_add(1);
        auto snapshot_lock = snapshots_in_use.insert(snapshots_in_use.end(), snapshot);
        txn = std::make_shared<MergeTreeTransaction>(snapshot, ltid, ServerUUID::get(), snapshot_lock);
        bool inserted = running_list.try_emplace(txn->tid.getHash(), txn).second;
        if (!inserted)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "It's a bug: TID {} {} exists", txn->tid.getHash(), txn->tid);
    }

    LOG_TEST(log, "Beginning transaction {} ({})", txn->tid, txn->tid.getHash());
    tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::BEGIN, txn->tid);

    return txn;
}
2022-05-20 20:08:46 +00:00
/// Commits `txn` and returns its commit CSN.
/// Read-only transactions do not touch [Zoo]Keeper; their snapshot is used as the commit CSN.
/// For other transactions, creating a sequential "csn-" node holding the serialized TID is the commit point.
/// If a hardware [Zoo]Keeper error happens around that point, the outcome is unknown. The transaction is then queued
/// in unknown_state_list for the updating thread, and the call either throws UNKNOWN_STATUS_OF_TRANSACTION
/// (when throw_on_unknown_status is set) or returns Tx::CommittingCSN.
CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn, bool throw_on_unknown_status)
{
    /// Some precommit checks, may throw
    auto state_guard = txn->beforeCommit();

    CSN allocated_csn = Tx::UnknownCSN;
    if (txn->isReadOnly())
    {
        /// Don't need to allocate CSN in ZK for readonly transactions, it's safe to use snapshot/start_csn as "commit" timestamp
        LOG_TEST(log, "Closing readonly transaction {}", txn->tid);
    }
    else
    {
        LOG_TEST(log, "Committing transaction {}", txn->dumpDescription());
        /// TODO support batching
        auto keeper = getZooKeeper();
        String created_node_path;
        try
        {
            if (unlikely(fault_probability_before_commit > 0.0))
            {
                std::bernoulli_distribution inject_fault(fault_probability_before_commit);
                if (inject_fault(thread_local_rng))
                    throw Coordination::Exception::fromMessage(Coordination::Error::ZCONNECTIONLOSS, "Fault injected (before commit)");
            }

            /// Commit point
            created_node_path = keeper->create(zookeeper_path_log + "/csn-", serializeTID(txn->tid), zkutil::CreateMode::PersistentSequential);

            if (unlikely(fault_probability_after_commit > 0.0))
            {
                std::bernoulli_distribution inject_fault(fault_probability_after_commit);
                if (inject_fault(thread_local_rng))
                    throw Coordination::Exception::fromMessage(Coordination::Error::ZCONNECTIONLOSS, "Fault injected (after commit)");
            }
        }
        catch (const Coordination::Exception & e)
        {
            if (!Coordination::isHardwareError(e.code))
                throw;

            /// We don't know if transaction has been actually committed or not.
            /// The only thing we can do is to postpone its finalization.
            {
                std::lock_guard lock{running_list_mutex};
                unknown_state_list.emplace_back(txn, std::move(state_guard));
            }
            log_updated_event->set();

            if (throw_on_unknown_status)
                throw Exception(ErrorCodes::UNKNOWN_STATUS_OF_TRANSACTION,
                                "Connection lost on attempt to commit transaction {}, will finalize it later: {}",
                                txn->tid, e.message());

            LOG_INFO(log, "Connection lost on attempt to commit transaction {}, will finalize it later: {}", txn->tid, e.message());
            return Tx::CommittingCSN;
        }

        /// Do not allow exceptions between the commit point and the end of transaction finalization
        /// (otherwise it may get stuck in COMMITTING state holding its snapshot).
        NOEXCEPT_SCOPE_STRICT({
            /// FIXME Transactions: Sequential node numbers in ZooKeeper are Int32, but 31 bit is not enough for production use
            /// (overflow is possible in a several weeks/months of active usage)
            allocated_csn = deserializeCSN(created_node_path.substr(zookeeper_path_log.size() + 1));
        });
    }

    return finalizeCommittedTransaction(txn.get(), allocated_csn, state_guard);
}
2022-05-21 00:32:35 +00:00
/// Completes a transaction whose commit is known to have happened and returns its CSN.
/// A zero allocated_csn means the transaction is read-only; its snapshot is then used as the CSN.
/// Steps: record a COMMIT event, write the CSN into the transaction (afterCommit), release state_guard,
/// drop the transaction from running_list together with its snapshot, and call afterFinalize().
/// noexcept: a transaction must not be left half-finalized after the commit point.
CSN TransactionLog::finalizeCommittedTransaction(MergeTreeTransaction * txn, CSN allocated_csn, scope_guard & state_guard) noexcept
{
    LockMemoryExceptionInThread memory_tracker_lock(VariableContext::Global);
    chassert(!allocated_csn == txn->isReadOnly());

    if (allocated_csn)
    {
        LOG_INFO(log, "Transaction {} committed with CSN={}", txn->tid, allocated_csn);
    }
    else
    {
        /// Transaction was readonly: its snapshot serves as the commit CSN
        allocated_csn = txn->snapshot;
    }
    tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, allocated_csn);

    /// Write allocated CSN, so we will be able to cleanup log in ZK. This method is noexcept.
    txn->afterCommit(allocated_csn);
    state_guard = {};

    {
        /// Finally we can remove transaction from the list and release the snapshot
        std::lock_guard lock{running_list_mutex};
        snapshots_in_use.erase(txn->snapshot_in_use_it);
        const bool was_running = running_list.erase(txn->tid.getHash());
        if (!was_running)
        {
            LOG_ERROR(log, "It's a bug: TID {} {} doesn't exist", txn->tid.getHash(), txn->tid);
            abort();
        }
    }

    txn->afterFinalize();
    return allocated_csn;
}
2022-05-20 20:08:46 +00:00
bool TransactionLog : : waitForCSNLoaded ( CSN csn ) const
{
auto current_latest_snapshot = latest_snapshot . load ( ) ;
while ( current_latest_snapshot < csn & & ! stop_flag )
{
latest_snapshot . wait ( current_latest_snapshot ) ;
current_latest_snapshot = latest_snapshot . load ( ) ;
}
return csn < = current_latest_snapshot ;
}
2021-04-09 12:53:51 +00:00
void TransactionLog : : rollbackTransaction ( const MergeTreeTransactionPtr & txn ) noexcept
2021-03-31 17:55:04 +00:00
{
2022-07-18 15:40:09 +00:00
LockMemoryExceptionInThread memory_tracker_lock ( VariableContext : : Global ) ;
2022-02-04 18:18:20 +00:00
LOG_TRACE ( log , " Rolling back transaction {}{} " , txn - > tid ,
std : : uncaught_exceptions ( ) ? fmt : : format ( " due to uncaught exception (code: {}) " , getCurrentExceptionCode ( ) ) : " " ) ;
2022-01-14 14:03:00 +00:00
if ( ! txn - > rollback ( ) )
2022-03-14 20:43:34 +00:00
{
2022-05-20 15:35:29 +00:00
/// Transaction was cancelled or committed concurrently
chassert ( txn - > csn ! = Tx : : UnknownCSN ) ;
2022-01-14 14:03:00 +00:00
return ;
2022-03-14 20:43:34 +00:00
}
2022-01-14 14:03:00 +00:00
2021-04-08 17:20:45 +00:00
{
std : : lock_guard lock { running_list_mutex } ;
bool removed = running_list . erase ( txn - > tid . getHash ( ) ) ;
if ( ! removed )
2021-04-09 12:53:51 +00:00
abort ( ) ;
2021-06-04 09:26:47 +00:00
snapshots_in_use . erase ( txn - > snapshot_in_use_it ) ;
2021-04-08 17:20:45 +00:00
}
2022-01-14 14:03:00 +00:00
tryWriteEventToSystemLog ( log , global_context , TransactionsInfoLogElement : : ROLLBACK , txn - > tid ) ;
2023-03-04 15:48:04 +00:00
txn - > afterFinalize ( ) ;
2021-04-08 17:20:45 +00:00
}
/// Looks up a running transaction by TID hash; returns NO_TRANSACTION_PTR when it is not in running_list.
MergeTreeTransactionPtr TransactionLog::tryGetRunningTransaction(const TIDHash & tid)
{
    std::lock_guard lock{running_list_mutex};
    if (auto found = running_list.find(tid); found != running_list.end())
        return found->second;
    return NO_TRANSACTION_PTR;
}
2023-02-15 16:10:06 +00:00
/// Returns the commit CSN of `tid`, or Tx::UnknownCSN if it is not known.
/// failback_with_strict_load_csn (optional) is the transaction's CSN atomic, re-read when the log lookup fails.
/// The prehistoric TID is answered directly, so the TransactionLog instance is not created
/// when transactions are not actually involved.
CSN TransactionLog::getCSN(const TransactionID & tid, const std::atomic<CSN> * failback_with_strict_load_csn)
{
    return tid == Tx::PrehistoricTID
        ? Tx::PrehistoricCSN
        : instance().getCSNImpl(tid.getHash(), failback_with_strict_load_csn);
}
2023-02-15 16:10:06 +00:00
/// Same as getCSN(const TransactionID &, ...), but takes an already computed TID hash.
CSN TransactionLog::getCSN(const TIDHash & tid, const std::atomic<CSN> * failback_with_strict_load_csn)
{
    /// Answer the prehistoric TID without touching instance(), so the log is not created needlessly
    const bool is_prehistoric = tid == Tx::PrehistoricTID.getHash();
    if (is_prehistoric)
        return Tx::PrehistoricCSN;
    return instance().getCSNImpl(tid, failback_with_strict_load_csn);
}
2023-02-15 16:10:06 +00:00
/// Looks up the CSN of `tid_hash` in the loaded log. If it is not there, falls back to a sequentially
/// consistent load of *failback_with_strict_load_csn (when provided). Returns Tx::UnknownCSN otherwise.
CSN TransactionLog::getCSNImpl(const TIDHash & tid_hash, const std::atomic<CSN> * failback_with_strict_load_csn) const
{
    chassert(tid_hash);
    chassert(tid_hash != Tx::EmptyTID.getHash());

    {
        std::lock_guard lock{mutex};
        if (auto found = tid_to_csn.find(tid_hash); found != tid_to_csn.end())
            return found->second.csn;
    }

    /// Callers usually check the commit CSN with a relaxed load first and call getCSN only if that fails.
    /// The transaction may be committed concurrently right before this call: then tid_to_csn has no entry yet,
    /// but the commit CSN is already set. So the CSN is loaded again (seq_cst) after the lookup above.
    if (failback_with_strict_load_csn == nullptr)
        return Tx::UnknownCSN;

    const CSN reloaded = failback_with_strict_load_csn->load();
    return reloaded ? reloaded : Tx::UnknownCSN;
}
2023-02-15 16:10:06 +00:00
/// Returns the commit CSN of `tid`, or Tx::UnknownCSN if the transaction has not been committed yet.
/// Throws (via assertTIDIsNotOutdated()) if the log entry of `tid` may already have been removed.
CSN TransactionLog::getCSNAndAssert(const TransactionID & tid, std::atomic<CSN> & failback_with_strict_load_csn)
{
    /// failback_with_strict_load_csn is deliberately not passed to getCSN():
    /// it must be checked after assertTIDIsNotOutdated().
    if (CSN csn_from_log = getCSN(tid))
        return csn_from_log;

    assertTIDIsNotOutdated(tid, &failback_with_strict_load_csn);

    /// The transaction is not outdated, but it might have been committed concurrently:
    /// load its CSN again to tell that case apart from "not committed yet".
    const CSN reloaded = failback_with_strict_load_csn.load();
    return reloaded ? reloaded : Tx::UnknownCSN;
}
void TransactionLog : : assertTIDIsNotOutdated ( const TransactionID & tid , const std : : atomic < CSN > * failback_with_strict_load_csn )
2022-03-14 20:43:34 +00:00
{
if ( tid = = Tx : : PrehistoricTID )
return ;
2022-03-18 11:01:26 +00:00
/// Ensure that we are not trying to get CSN for TID that was already removed from the log
2022-03-14 20:43:34 +00:00
CSN tail = instance ( ) . tail_ptr . load ( ) ;
if ( tail < = tid . start_csn )
return ;
2023-02-15 16:10:06 +00:00
/// At this point of execution tail is lesser that tid.start_csn
/// This mean that transaction is either outdated or just has been committed concurrently and the tail moved forward.
2023-02-16 18:01:02 +00:00
/// If the second case takes place transaction's commit csn has to be set.
2023-02-15 16:10:06 +00:00
/// We should load CSN again to distinguish the second case.
if ( failback_with_strict_load_csn )
if ( CSN maybe_csn = failback_with_strict_load_csn - > load ( ) )
return ;
2022-03-14 20:43:34 +00:00
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Trying to get CSN for too old TID {}, current tail_ptr is {}, probably it's a bug " , tid , tail ) ;
2021-03-31 17:55:04 +00:00
}
2022-03-14 20:43:34 +00:00
/// Returns the oldest snapshot held by a running transaction, or the latest snapshot if none are running.
/// removeOldEntries() uses this value as the new tail pointer.
CSN TransactionLog::getOldestSnapshot() const
{
    std::lock_guard lock{running_list_mutex};
    if (!snapshots_in_use.empty())
    {
        chassert(running_list.size() == snapshots_in_use.size());
        chassert(snapshots_in_use.size() < 2 || snapshots_in_use.front() <= *++snapshots_in_use.begin());
        return snapshots_in_use.front();
    }
    return getLatestSnapshot();
}
2022-03-10 21:29:58 +00:00
/// Returns a copy of running_list taken under running_list_mutex, so callers can inspect it without locking.
TransactionLog::TransactionsList TransactionLog::getTransactionsList() const
{
    std::lock_guard guard{running_list_mutex};
    return running_list;
}
2022-05-25 20:20:13 +00:00
void TransactionLog : : sync ( ) const
{
2022-06-27 20:48:27 +00:00
Strings entries_list = getZooKeeper ( ) - > getChildren ( zookeeper_path_log ) ;
2022-05-25 20:20:13 +00:00
chassert ( ! entries_list . empty ( ) ) ;
2022-06-13 13:31:08 +00:00
: : sort ( entries_list . begin ( ) , entries_list . end ( ) ) ;
2022-05-25 20:20:13 +00:00
CSN newest_csn = deserializeCSN ( entries_list . back ( ) ) ;
waitForCSNLoaded ( newest_csn ) ;
}
2021-03-31 17:55:04 +00:00
}