2021-03-31 17:55:04 +00:00
# include <Interpreters/TransactionLog.h>
2021-11-08 18:56:09 +00:00
# include <Interpreters/TransactionVersionMetadata.h>
2021-12-28 11:23:35 +00:00
# include <Interpreters/Context.h>
2022-01-14 14:03:00 +00:00
# include <Interpreters/TransactionsInfoLog.h>
2021-12-28 11:23:35 +00:00
# include <IO/ReadHelpers.h>
# include <IO/WriteHelpers.h>
# include <IO/ReadBufferFromString.h>
2021-04-08 17:20:45 +00:00
# include <Common/Exception.h>
2021-12-28 11:23:35 +00:00
# include <Common/ZooKeeper/KeeperException.h>
2021-11-08 18:56:09 +00:00
# include <Core/ServerUUID.h>
2022-04-27 15:05:45 +00:00
# include <Common/logger_useful.h>
2021-03-31 17:55:04 +00:00
2022-03-14 20:43:34 +00:00
/// Used in critical places to terminate the process on unexpected exceptions.
/// SIGABRT is usually better than a broken state in memory with unpredictable consequences.
/// Expands to a SCOPE_EXIT guard: if the guard runs while std::uncaught_exceptions() is non-zero,
/// the current exception is logged under the "NOEXCEPT_SCOPE" tag and abort() is called.
# define NOEXCEPT_SCOPE SCOPE_EXIT({ if (std::uncaught_exceptions()) { tryLogCurrentException("NOEXCEPT_SCOPE"); abort(); } })
2021-03-31 17:55:04 +00:00
namespace DB
{
2021-04-08 17:20:45 +00:00
/// Error codes thrown from this file. Only declared here (extern); the definition lives elsewhere.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
}
static void tryWriteEventToSystemLog ( Poco : : Logger * log , ContextPtr context ,
TransactionsInfoLogElement : : Type type , const TransactionID & tid , CSN csn = Tx : : UnknownCSN )
try
{
auto system_log = context - > getTransactionsInfoLog ( ) ;
if ( ! system_log )
return ;
TransactionsInfoLogElement elem ;
elem . type = type ;
elem . tid = tid ;
elem . csn = csn ;
elem . fillCommonFields ( nullptr ) ;
system_log - > add ( elem ) ;
}
catch ( . . . )
{
tryLogCurrentException ( log ) ;
2021-04-08 17:20:45 +00:00
}
2021-03-31 17:55:04 +00:00
/// Takes the global context, checks that transactions are allowed, reads the ZooKeeper root path
/// from the `transaction_log.zookeeper_path` config key (default `/clickhouse/txn`),
/// loads the existing log and only then starts the background updating thread.
TransactionLog::TransactionLog()
    : log(&Poco::Logger::get("TransactionLog"))
{
    global_context = Context::getGlobalContextInstance();
    global_context->checkTransactionsAreAllowed();

    const auto & config = global_context->getConfigRef();
    zookeeper_path = config.getString("transaction_log.zookeeper_path", "/clickhouse/txn");
    zookeeper_path_log = zookeeper_path + "/log";

    /// The initial load must complete before the updating thread is started.
    loadLogFromZooKeeper();

    updating_thread = ThreadFromGlobalPool(&TransactionLog::runUpdatingThread, this);
}
/// Stops the background thread and releases the ZooKeeper session via shutdown().
TransactionLog::~TransactionLog()
{
    shutdown();
}
void TransactionLog : : shutdown ( )
{
if ( stop_flag . exchange ( true ) )
return ;
2021-12-28 11:23:35 +00:00
log_updated_event - > set ( ) ;
2022-01-19 18:29:31 +00:00
latest_snapshot . notify_all ( ) ;
2021-12-28 11:23:35 +00:00
updating_thread . join ( ) ;
2022-01-20 18:15:23 +00:00
std : : lock_guard lock { mutex } ;
/// This is required to... you'll never guess - avoid race condition inside Poco::Logger (Coordination::ZooKeeper::log)
zookeeper . reset ( ) ;
}
/// Returns a copy of the current ZooKeeper handle, read under `mutex`.
ZooKeeperPtr TransactionLog::getZooKeeper() const
{
    std::lock_guard guard{mutex};
    return zookeeper;
}
2022-03-16 19:16:26 +00:00
/// Parses a log node name of the form `csn-<number>` and returns the number.
/// The whole string must be consumed; the assert* helpers reject a wrong prefix or trailing characters.
UInt64 TransactionLog::deserializeCSN(const String & csn_node_name)
{
    ReadBufferFromString in{csn_node_name};
    assertString("csn-", in);

    UInt64 csn = 0;
    readText(csn, in);

    assertEOF(in);
    return csn;
}
2022-03-16 19:16:26 +00:00
/// Builds the log node name for a CSN: the `csn-` prefix plus the sequential-node suffix
/// produced by zkutil::getSequentialNodeName.
String TransactionLog::serializeCSN(CSN csn)
{
    String node_name = zkutil::getSequentialNodeName("csn-", csn);
    return node_name;
}
2022-03-16 19:16:26 +00:00
/// Parses the content of a log node into a TransactionID.
/// Empty content maps to Tx::EmptyTID; otherwise the whole content must be consumed by TransactionID::read.
TransactionID TransactionLog::deserializeTID(const String & csn_node_content)
{
    if (csn_node_content.empty())
        return Tx::EmptyTID;

    ReadBufferFromString in{csn_node_content};
    TransactionID parsed = TransactionID::read(in);
    assertEOF(in);
    return parsed;
}
2022-03-16 19:16:26 +00:00
/// Serializes a TransactionID into the string stored as the content of a log node.
String TransactionLog::serializeTID(const TransactionID & tid)
{
    WriteBufferFromOwnString out;
    TransactionID::write(tid, out);
    return out.str();
}
void TransactionLog : : loadEntries ( Strings : : const_iterator beg , Strings : : const_iterator end )
{
std : : vector < std : : future < Coordination : : GetResponse > > futures ;
size_t entries_count = std : : distance ( beg , end ) ;
if ( ! entries_count )
return ;
String last_entry = * std : : prev ( end ) ;
2022-03-14 20:43:34 +00:00
LOG_TRACE ( log , " Loading {} entries from {}: {}..{} " , entries_count , zookeeper_path_log , * beg , last_entry ) ;
2021-12-28 11:23:35 +00:00
futures . reserve ( entries_count ) ;
for ( auto it = beg ; it ! = end ; + + it )
2022-03-14 20:43:34 +00:00
futures . emplace_back ( zookeeper - > asyncGet ( fs : : path ( zookeeper_path_log ) / * it ) ) ;
2021-12-28 11:23:35 +00:00
2022-03-14 20:43:34 +00:00
std : : vector < std : : pair < TIDHash , CSNEntry > > loaded ;
2021-12-28 11:23:35 +00:00
loaded . reserve ( entries_count ) ;
auto it = beg ;
for ( size_t i = 0 ; i < entries_count ; + + i , + + it )
{
auto res = futures [ i ] . get ( ) ;
2022-03-16 19:16:26 +00:00
CSN csn = deserializeCSN ( * it ) ;
TransactionID tid = deserializeTID ( res . data ) ;
2022-03-14 20:43:34 +00:00
loaded . emplace_back ( tid . getHash ( ) , CSNEntry { csn , tid } ) ;
2021-12-28 11:23:35 +00:00
LOG_TEST ( log , " Got entry {} -> {} " , tid , csn ) ;
}
futures . clear ( ) ;
2022-03-14 20:43:34 +00:00
NOEXCEPT_SCOPE ;
2022-01-19 20:16:05 +00:00
LockMemoryExceptionInThread lock_memory_tracker ( VariableContext : : Global ) ;
2022-01-20 18:15:23 +00:00
std : : lock_guard lock { mutex } ;
2022-03-14 20:43:34 +00:00
for ( const auto & entry : loaded )
{
if ( entry . first = = Tx : : EmptyTID . getHash ( ) )
continue ;
tid_to_csn . emplace ( entry . first , entry . second ) ;
}
last_loaded_entry = last_entry ;
latest_snapshot = loaded . back ( ) . second . csn ;
local_tid_counter = Tx : : MaxReservedLocalTID ;
2021-12-28 11:23:35 +00:00
}
void TransactionLog : : loadLogFromZooKeeper ( )
{
assert ( ! zookeeper ) ;
assert ( tid_to_csn . empty ( ) ) ;
assert ( last_loaded_entry . empty ( ) ) ;
zookeeper = global_context - > getZooKeeper ( ) ;
/// We do not write local_tid_counter to disk or zk and maintain it only in memory.
/// Create empty entry to allocate new CSN to safely start counting from the beginning and avoid TID duplication.
/// TODO It's possible to skip this step in come cases (especially for multi-host configuration).
2022-03-14 20:43:34 +00:00
Coordination : : Error code = zookeeper - > tryCreate ( zookeeper_path_log + " /csn- " , " " , zkutil : : CreateMode : : PersistentSequential ) ;
2021-12-28 11:23:35 +00:00
if ( code ! = Coordination : : Error : : ZOK )
{
2022-03-14 20:43:34 +00:00
/// Log probably does not exist, create it
2021-12-28 11:23:35 +00:00
assert ( code = = Coordination : : Error : : ZNONODE ) ;
2022-03-14 20:43:34 +00:00
zookeeper - > createAncestors ( zookeeper_path_log ) ;
2021-12-28 11:23:35 +00:00
Coordination : : Requests ops ;
2022-03-16 19:16:26 +00:00
ops . emplace_back ( zkutil : : makeCreateRequest ( zookeeper_path + " /tail_ptr " , serializeCSN ( Tx : : MaxReservedCSN ) , zkutil : : CreateMode : : Persistent ) ) ;
2022-03-14 20:43:34 +00:00
ops . emplace_back ( zkutil : : makeCreateRequest ( zookeeper_path_log , " " , zkutil : : CreateMode : : Persistent ) ) ;
/// Fast-forward sequential counter to skip reserved CSNs
2021-12-28 11:23:35 +00:00
for ( size_t i = 0 ; i < = Tx : : MaxReservedCSN ; + + i )
2022-03-14 20:43:34 +00:00
ops . emplace_back ( zkutil : : makeCreateRequest ( zookeeper_path_log + " /csn- " , " " , zkutil : : CreateMode : : PersistentSequential ) ) ;
2021-12-28 11:23:35 +00:00
Coordination : : Responses res ;
code = zookeeper - > tryMulti ( ops , res ) ;
if ( code ! = Coordination : : Error : : ZNODEEXISTS )
zkutil : : KeeperMultiException : : check ( code , ops , res ) ;
}
/// TODO Split log into "subdirectories" to:
/// 1. fetch it more optimal way (avoid listing all CSNs on further incremental updates)
/// 2. simplify log rotation
/// 3. support 64-bit CSNs on top of Apache ZooKeeper (it uses Int32 for sequential numbers)
2022-03-14 20:43:34 +00:00
Strings entries_list = zookeeper - > getChildren ( zookeeper_path_log , nullptr , log_updated_event ) ;
2021-12-28 11:23:35 +00:00
assert ( ! entries_list . empty ( ) ) ;
std : : sort ( entries_list . begin ( ) , entries_list . end ( ) ) ;
loadEntries ( entries_list . begin ( ) , entries_list . end ( ) ) ;
assert ( ! last_loaded_entry . empty ( ) ) ;
2022-03-16 19:16:26 +00:00
assert ( latest_snapshot = = deserializeCSN ( last_loaded_entry ) ) ;
2021-11-08 18:56:09 +00:00
local_tid_counter = Tx : : MaxReservedLocalTID ;
2022-03-14 20:43:34 +00:00
2022-03-16 19:16:26 +00:00
tail_ptr = deserializeCSN ( zookeeper - > get ( zookeeper_path + " /tail_ptr " ) ) ;
2021-03-31 17:55:04 +00:00
}
2021-12-28 11:23:35 +00:00
void TransactionLog : : runUpdatingThread ( )
{
while ( true )
{
try
{
log_updated_event - > wait ( ) ;
if ( stop_flag . load ( ) )
return ;
2022-04-12 09:39:21 +00:00
if ( getZooKeeper ( ) - > expired ( ) )
2022-01-20 18:15:23 +00:00
{
auto new_zookeeper = global_context - > getZooKeeper ( ) ;
std : : lock_guard lock { mutex } ;
zookeeper = new_zookeeper ;
}
2021-12-28 11:23:35 +00:00
loadNewEntries ( ) ;
2022-03-14 20:43:34 +00:00
removeOldEntries ( ) ;
2021-12-28 11:23:35 +00:00
}
2022-04-12 09:39:21 +00:00
catch ( const Coordination : : Exception & )
2021-12-28 11:23:35 +00:00
{
2022-02-03 18:57:09 +00:00
tryLogCurrentException ( log ) ;
2021-12-28 11:23:35 +00:00
/// TODO better backoff
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 1000 ) ) ;
log_updated_event - > set ( ) ;
}
catch ( . . . )
{
2022-02-03 18:57:09 +00:00
tryLogCurrentException ( log ) ;
2021-12-28 11:23:35 +00:00
std : : this_thread : : sleep_for ( std : : chrono : : milliseconds ( 1000 ) ) ;
log_updated_event - > set ( ) ;
}
}
}
void TransactionLog : : loadNewEntries ( )
{
2022-03-14 20:43:34 +00:00
Strings entries_list = zookeeper - > getChildren ( zookeeper_path_log , nullptr , log_updated_event ) ;
2021-12-28 11:23:35 +00:00
assert ( ! entries_list . empty ( ) ) ;
std : : sort ( entries_list . begin ( ) , entries_list . end ( ) ) ;
auto it = std : : upper_bound ( entries_list . begin ( ) , entries_list . end ( ) , last_loaded_entry ) ;
loadEntries ( it , entries_list . end ( ) ) ;
assert ( last_loaded_entry = = entries_list . back ( ) ) ;
2022-03-16 19:16:26 +00:00
assert ( latest_snapshot = = deserializeCSN ( last_loaded_entry ) ) ;
2021-12-28 11:23:35 +00:00
latest_snapshot . notify_all ( ) ;
}
2022-03-14 20:43:34 +00:00
void TransactionLog : : removeOldEntries ( )
{
/// Try to update tail pointer. It's (almost) safe to set it to the oldest snapshot
/// because if a transaction released snapshot, then CSN is already written into metadata.
/// Why almost? Because on server startup we do not have the oldest snapshot (it's simply equal to the latest one),
/// but it's possible that some CSNs are not written into data parts (and we will write them during startup).
if ( ! global_context - > isServerCompletelyStarted ( ) )
return ;
/// Also similar problem is possible if some table was not attached during startup (for example, if table is detached permanently).
/// Also we write CSNs into data parts without fsync, so it's theoretically possible that we wrote CSN, finished transaction,
/// removed its entry from the log, but after that server restarts and CSN is not actually saved to metadata on disk.
/// We should store a bit more entries in ZK and keep outdated entries for a while.
/// TODO we will need a bit more complex logic for multiple hosts
Coordination : : Stat stat ;
2022-03-16 19:16:26 +00:00
CSN old_tail_ptr = deserializeCSN ( zookeeper - > get ( zookeeper_path + " /tail_ptr " , & stat ) ) ;
2022-03-14 20:43:34 +00:00
CSN new_tail_ptr = getOldestSnapshot ( ) ;
if ( new_tail_ptr < old_tail_ptr )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Got unexpected tail_ptr {}, oldest snapshot is {}, it's a bug " , old_tail_ptr , new_tail_ptr ) ;
2022-03-18 13:33:59 +00:00
else if ( new_tail_ptr = = old_tail_ptr )
return ;
2022-03-14 20:43:34 +00:00
/// (it's not supposed to fail with ZBADVERSION while there is only one host)
LOG_TRACE ( log , " Updating tail_ptr from {} to {} " , old_tail_ptr , new_tail_ptr ) ;
2022-03-16 19:16:26 +00:00
zookeeper - > set ( zookeeper_path + " /tail_ptr " , serializeCSN ( new_tail_ptr ) , stat . version ) ;
2022-03-14 20:43:34 +00:00
tail_ptr . store ( new_tail_ptr ) ;
/// Now we can find and remove old entries
TIDMap tids ;
{
std : : lock_guard lock { mutex } ;
tids = tid_to_csn ;
}
2021-12-28 11:23:35 +00:00
2022-03-14 20:43:34 +00:00
/// TODO support batching
std : : vector < TIDHash > removed_entries ;
CSN latest_entry_csn = latest_snapshot . load ( ) ;
for ( const auto & elem : tids )
{
/// Definitely not safe to remove
if ( new_tail_ptr < = elem . second . tid . start_csn )
continue ;
/// Keep at least one node (the latest one we fetched)
if ( elem . second . csn = = latest_entry_csn )
continue ;
LOG_TEST ( log , " Removing entry {} -> {} " , elem . second . tid , elem . second . csn ) ;
2022-03-16 19:16:26 +00:00
auto code = zookeeper - > tryRemove ( zookeeper_path_log + " / " + serializeCSN ( elem . second . csn ) ) ;
2022-03-14 20:43:34 +00:00
if ( code = = Coordination : : Error : : ZOK | | code = = Coordination : : Error : : ZNONODE )
removed_entries . push_back ( elem . first ) ;
}
std : : lock_guard lock { mutex } ;
for ( const auto & tid_hash : removed_entries )
tid_to_csn . erase ( tid_hash ) ;
}
/// Returns the current value of latest_snapshot.
CSN TransactionLog::getLatestSnapshot() const
{
    const CSN current = latest_snapshot.load();
    return current;
}
/// Starts a new transaction on the latest snapshot and registers it in the running list.
/// The snapshot is also appended to snapshots_in_use and the iterator is stored in the transaction.
/// Throws LOGICAL_ERROR if a transaction with the same TID hash is already registered.
MergeTreeTransactionPtr TransactionLog::beginTransaction()
{
    MergeTreeTransactionPtr txn;
    {
        std::lock_guard guard{running_list_mutex};

        CSN snapshot = latest_snapshot.load();
        LocalTID ltid = local_tid_counter.fetch_add(1) + 1;
        txn = std::make_shared<MergeTreeTransaction>(snapshot, ltid, ServerUUID::get());

        const bool is_new = running_list.try_emplace(txn->tid.getHash(), txn).second;
        if (!is_new)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "I's a bug: TID {} {} exists", txn->tid.getHash(), txn->tid);

        txn->snapshot_in_use_it = snapshots_in_use.insert(snapshots_in_use.end(), snapshot);
    }

    LOG_TEST(log, "Beginning transaction {} ({})", txn->tid, txn->tid.getHash());
    tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::BEGIN, txn->tid);

    return txn;
}
/// Commits the transaction and returns the CSN assigned to it.
/// Read-only transactions reuse their snapshot as the CSN and do not touch ZooKeeper.
/// Otherwise the CSN is allocated by creating a sequential node in the log (the commit point);
/// the rest of that branch runs under NOEXCEPT_SCOPE.
/// Throws whatever beforeCommit() or the ZooKeeper create throws, and LOGICAL_ERROR if the
/// transaction is missing from the running list.
CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn)
{
    /// Some precommit checks, may throw
    auto committing_lock = txn->beforeCommit();

    CSN new_csn;
    if (txn->isReadOnly())
    {
        /// Don't need to allocate CSN in ZK for readonly transactions, it's safe to use snapshot/start_csn as "commit" timestamp
        LOG_TEST(log, "Closing readonly transaction {}", txn->tid);
        new_csn = txn->snapshot;
        tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, new_csn);
    }
    else
    {
        LOG_TEST(log, "Committing transaction {}", txn->dumpDescription());
        /// TODO handle connection loss
        /// TODO support batching
        auto zk = getZooKeeper();
        String created_node_path = zk->create(zookeeper_path_log + "/csn-", serializeTID(txn->tid), zkutil::CreateMode::PersistentSequential); /// Commit point
        NOEXCEPT_SCOPE;

        /// FIXME Transactions: Sequential node numbers in ZooKeeper are Int32, but 31 bit is not enough for production use
        /// (overflow is possible in a several weeks/months of active usage)
        /// Strip "<zookeeper_path_log>/" to get the node name.
        new_csn = deserializeCSN(created_node_path.substr(zookeeper_path_log.size() + 1));

        LOG_INFO(log, "Transaction {} committed with CSN={}", txn->tid, new_csn);
        tryWriteEventToSystemLog(log, global_context, TransactionsInfoLogElement::COMMIT, txn->tid, new_csn);

        /// Wait for committed changes to become actually visible, so the next transaction in this session will see the changes
        /// TODO it's optional, add a setting for this
        /// NOTE(review): if latest_snapshot is a std::atomic, wait() returns only once the value differs,
        /// so notify_all() from shutdown() alone may not wake this loop - confirm.
        for (auto seen_snapshot = latest_snapshot.load();
             seen_snapshot < new_csn && !stop_flag;
             seen_snapshot = latest_snapshot.load())
        {
            latest_snapshot.wait(seen_snapshot);
        }
    }

    /// Write allocated CSN, so we will be able to cleanup log in ZK. This method is noexcept.
    txn->afterCommit(new_csn);

    {
        /// Finally we can remove transaction from the list and release the snapshot
        std::lock_guard guard{running_list_mutex};
        if (!running_list.erase(txn->tid.getHash()))
            throw Exception(ErrorCodes::LOGICAL_ERROR, "I's a bug: TID {} {} doesn't exist", txn->tid.getHash(), txn->tid);
        snapshots_in_use.erase(txn->snapshot_in_use_it);
    }

    return new_csn;
}
2021-04-09 12:53:51 +00:00
void TransactionLog : : rollbackTransaction ( const MergeTreeTransactionPtr & txn ) noexcept
2021-03-31 17:55:04 +00:00
{
2022-02-04 18:18:20 +00:00
LOG_TRACE ( log , " Rolling back transaction {}{} " , txn - > tid ,
std : : uncaught_exceptions ( ) ? fmt : : format ( " due to uncaught exception (code: {}) " , getCurrentExceptionCode ( ) ) : " " ) ;
2022-01-14 14:03:00 +00:00
if ( ! txn - > rollback ( ) )
2022-03-14 20:43:34 +00:00
{
/// Transaction was cancelled concurrently, it's already rolled back.
2022-03-15 13:35:48 +00:00
assert ( txn - > csn = = Tx : : RolledBackCSN ) ;
2022-01-14 14:03:00 +00:00
return ;
2022-03-14 20:43:34 +00:00
}
2022-01-14 14:03:00 +00:00
2021-04-08 17:20:45 +00:00
{
std : : lock_guard lock { running_list_mutex } ;
bool removed = running_list . erase ( txn - > tid . getHash ( ) ) ;
if ( ! removed )
2021-04-09 12:53:51 +00:00
abort ( ) ;
2021-06-04 09:26:47 +00:00
snapshots_in_use . erase ( txn - > snapshot_in_use_it ) ;
2021-04-08 17:20:45 +00:00
}
2022-01-14 14:03:00 +00:00
tryWriteEventToSystemLog ( log , global_context , TransactionsInfoLogElement : : ROLLBACK , txn - > tid ) ;
2021-04-08 17:20:45 +00:00
}
/// Looks up a running transaction by TID hash under running_list_mutex.
/// Returns NO_TRANSACTION_PTR when there is no such transaction.
MergeTreeTransactionPtr TransactionLog::tryGetRunningTransaction(const TIDHash & tid)
{
    std::lock_guard guard{running_list_mutex};
    if (auto found = running_list.find(tid); found != running_list.end())
        return found->second;
    return NO_TRANSACTION_PTR;
}
2022-01-31 22:27:55 +00:00
/// Returns the CSN recorded for the given TID, or Tx::UnknownCSN if the log has no entry for it.
/// Tx::PrehistoricTID is answered with Tx::PrehistoricCSN without touching the singleton.
CSN TransactionLog::getCSN(const TransactionID & tid)
{
    /// Avoid creation of the instance if transactions are not actually involved
    const bool is_prehistoric = (tid == Tx::PrehistoricTID);
    if (!is_prehistoric)
        return instance().getCSNImpl(tid.getHash());
    return Tx::PrehistoricCSN;
}
2022-01-31 22:27:55 +00:00
/// Same lookup as the TransactionID overload, but takes the TID hash directly.
/// The hash of Tx::PrehistoricTID is answered with Tx::PrehistoricCSN without touching the singleton.
CSN TransactionLog::getCSN(const TIDHash & tid)
{
    /// Avoid creation of the instance if transactions are not actually involved
    const bool is_prehistoric = (tid == Tx::PrehistoricTID.getHash());
    if (!is_prehistoric)
        return instance().getCSNImpl(tid);
    return Tx::PrehistoricCSN;
}
2022-03-14 20:43:34 +00:00
/// Looks up the CSN for a TID hash in tid_to_csn under `mutex`.
/// Returns Tx::UnknownCSN when the hash is not in the map.
/// The hash is asserted to be non-zero and different from the hash of Tx::EmptyTID.
CSN TransactionLog::getCSNImpl(const TIDHash & tid_hash) const
{
    assert(tid_hash);
    assert(tid_hash != Tx::EmptyTID.getHash());

    std::lock_guard guard{mutex};
    if (auto found = tid_to_csn.find(tid_hash); found != tid_to_csn.end())
        return found->second.csn;
    return Tx::UnknownCSN;
}
void TransactionLog : : assertTIDIsNotOutdated ( const TransactionID & tid )
{
if ( tid = = Tx : : PrehistoricTID )
return ;
2022-03-18 11:01:26 +00:00
/// Ensure that we are not trying to get CSN for TID that was already removed from the log
2022-03-14 20:43:34 +00:00
CSN tail = instance ( ) . tail_ptr . load ( ) ;
if ( tail < = tid . start_csn )
return ;
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Trying to get CSN for too old TID {}, current tail_ptr is {}, probably it's a bug " , tid , tail ) ;
2021-03-31 17:55:04 +00:00
}
2022-03-14 20:43:34 +00:00
/// Returns the first element of snapshots_in_use, or the latest snapshot when none is in use.
/// Reads the list under running_list_mutex.
CSN TransactionLog::getOldestSnapshot() const
{
    std::lock_guard guard{running_list_mutex};
    return snapshots_in_use.empty() ? getLatestSnapshot() : snapshots_in_use.front();
}
2022-03-10 21:29:58 +00:00
/// Returns a copy of the running transactions list, taken under running_list_mutex.
TransactionLog::TransactionsList TransactionLog::getTransactionsList() const
{
    std::lock_guard guard{running_list_mutex};
    return running_list;
}
2021-03-31 17:55:04 +00:00
}