add transaction counters

This commit is contained in:
Alexander Tokmakov 2021-03-31 20:55:04 +03:00
parent 92f57fd669
commit f596906f47
25 changed files with 542 additions and 4 deletions

View File

@ -548,6 +548,8 @@
M(578, INVALID_FORMAT_INSERT_QUERY_WITH_DATA) \
M(579, INCORRECT_PART_TYPE) \
M(580, CANNOT_SET_ROUNDING_MODE) \
M(581, INVALID_TRANSACTION) \
M(582, SERIALIZATION_ERROR) \
\
M(998, POSTGRESQL_CONNECTION_FAILURE) \
M(999, KEEPER_EXCEPTION) \

View File

@ -0,0 +1,18 @@
#include <Common/TransactionMetadata.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeUUID.h>
namespace DB
{
/// Returns the data type used to represent a TransactionID as a value:
/// a Tuple of (UInt64, UInt64, UUID), matching the order of the struct fields
/// start_csn, local_tid, host_id.
DataTypePtr TransactionID::getDataType()
{
    DataTypes element_types{
        std::make_shared<DataTypeUInt64>(),
        std::make_shared<DataTypeUInt64>(),
        std::make_shared<DataTypeUUID>(),
    };
    return std::make_shared<DataTypeTuple>(std::move(element_types));
}
}

View File

@ -0,0 +1,41 @@
#pragma once
#include <Core/Types.h>
#include <Core/UUID.h>
namespace DB
{
class IDataType;
using DataTypePtr = std::shared_ptr<const IDataType>;
/// FIXME Transactions: Sequential node numbers in ZooKeeper are Int32, but 31 bit is not enough
using CSN = UInt64;
using Snapshot = CSN;
using LocalTID = UInt64;
/// Identifier of a transaction. Every field has a zero / Nil default.
struct TransactionID
{
/// MergeTreeTransaction stores here the snapshot it was created with.
CSN start_csn = 0;
/// TransactionLog::beginTransaction() fills this from its local TID counter.
LocalTID local_tid = 0;
UUID host_id = UUIDHelpers::Nil; /// Depends on #17278, leave it Nil for now.
/// Data type used to expose a TransactionID as a column value (defined in the .cpp file).
static DataTypePtr getDataType();
};
/// Reserved CSN / TID values.
namespace Tx
{
/// Default value for a CSN that has not been assigned.
const CSN UnknownCSN = 0;
/// Values assigned to data parts that have no transaction history when they are loaded.
const CSN PrehistoricCSN = 1;
const LocalTID PrehistoricLocalTID = 1;
/// TID with all fields zero / Nil.
const TransactionID EmptyTID = {0, 0, UUIDHelpers::Nil};
const TransactionID PrehistoricTID = {0, PrehistoricLocalTID, UUIDHelpers::Nil};
/// CSN assigned to a rolled back transaction. So far, such changes will never become visible.
const CSN RolledBackCSN = std::numeric_limits<CSN>::max();
}
}

View File

@ -0,0 +1,58 @@
#include <Functions/FunctionFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <utility>
namespace DB
{
namespace
{
/// SQL function `transactionID()`.
/// Returns a constant column holding the identifier of the transaction that was current
/// in the context at the moment the function object was created. The value has the type
/// returned by TransactionID::getDataType(); without a transaction all elements are zero / Nil.
class FunctionTransactionID : public IFunction
{
public:
    static constexpr auto name = "transactionID";

    /// The current transaction is captured once, here, and not re-read during execution.
    static FunctionPtr create(const Context & context)
    {
        return std::make_shared<FunctionTransactionID>(context.getCurrentTransaction());
    }

    /// The argument is an rvalue reference, so move it into the member:
    /// copying a shared_ptr costs an extra atomic reference count increment/decrement.
    explicit FunctionTransactionID(MergeTreeTransactionPtr && txn_) : txn(std::move(txn_))
    {
    }

    String getName() const override { return name; }

    size_t getNumberOfArguments() const override { return 0; }

    DataTypePtr getReturnTypeImpl(const DataTypes &) const override
    {
        return TransactionID::getDataType();
    }

    /// The result depends on the session state, not only on the (absent) arguments.
    bool isDeterministic() const override { return false; }

    ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr & result_type, size_t input_rows_count) const override
    {
        Tuple res;
        if (txn)
            res = {txn->tid.start_csn, txn->tid.local_tid, txn->tid.host_id};
        else
            res = {UInt64(0), UInt64(0), UUIDHelpers::Nil};
        return result_type->createColumnConst(input_rows_count, res);
    }

private:
    /// May be null when the function was created outside of a transaction.
    MergeTreeTransactionPtr txn;
};
}
/// Registers the transaction-related SQL functions (currently only transactionID) in the factory.
void registerFunctionsTransactionCounters(FunctionFactory & factory)
{
factory.registerFunction<FunctionTransactionID>();
}
}

View File

@ -73,6 +73,7 @@ void registerFunctionFile(FunctionFactory & factory);
void registerFunctionConnectionId(FunctionFactory & factory);
void registerFunctionPartitionId(FunctionFactory & factory);
void registerFunctionIsIPAddressContainedIn(FunctionFactory &);
void registerFunctionsTransactionCounters(FunctionFactory & factory);
#if USE_ICU
void registerFunctionConvertCharset(FunctionFactory &);
@ -146,6 +147,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
registerFunctionConnectionId(factory);
registerFunctionPartitionId(factory);
registerFunctionIsIPAddressContainedIn(factory);
registerFunctionsTransactionCounters(factory);
#if USE_ICU
registerFunctionConvertCharset(factory);

View File

@ -2581,6 +2581,21 @@ ZooKeeperMetadataTransactionPtr Context::getZooKeeperMetadataTransaction() const
return metadata_transaction;
}
/// Attaches a transaction to this context, or detaches it when `txn` is null.
/// Throws NOT_IMPLEMENTED unless the config value `_enable_mvcc_test_helper_dev` equals 42.
void Context::setCurrentTransaction(MergeTreeTransactionPtr txn)
{
    /// Replacing one live transaction with another is not allowed: either side must be null.
    assert(!(merge_tree_transaction && txn));
    /// Only a session context or a query context may carry a transaction.
    assert(this == session_context || this == query_context);

    const int helper_flag = getConfigRef().getInt("_enable_mvcc_test_helper_dev", 0);
    const bool transactions_enabled = (helper_flag == 42);
    if (!transactions_enabled)
        throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Transactions are not supported");

    merge_tree_transaction = std::move(txn);
}
/// Returns the transaction attached to this context; the pointer is null when there is none.
MergeTreeTransactionPtr Context::getCurrentTransaction() const
{
return merge_tree_transaction;
}
PartUUIDsPtr Context::getPartUUIDs()
{
auto lock = getLock();

View File

@ -122,6 +122,8 @@ struct BackgroundTaskSchedulingSettings;
class ZooKeeperMetadataTransaction;
using ZooKeeperMetadataTransactionPtr = std::shared_ptr<ZooKeeperMetadataTransaction>;
class MergeTreeTransaction;
using MergeTreeTransactionPtr = std::shared_ptr<MergeTreeTransaction>;
#if USE_EMBEDDED_COMPILER
class CompiledExpressionCache;
@ -290,6 +292,8 @@ private:
/// thousands of signatures.
/// And I hope it will be replaced with more common Transaction sometime.
MergeTreeTransactionPtr merge_tree_transaction; /// Current transaction context. Can be inside session or query context.
/// Use copy constructor or createGlobal() instead
Context();
@ -757,6 +761,10 @@ public:
/// Returns context of current distributed DDL query or nullptr.
ZooKeeperMetadataTransactionPtr getZooKeeperMetadataTransaction() const;
void setCurrentTransaction(MergeTreeTransactionPtr txn);
MergeTreeTransactionPtr getCurrentTransaction() const;
struct MySQLWireContext
{
uint8_t sequence_id = 0;

View File

@ -29,6 +29,7 @@
#include <Parsers/ASTWatchQuery.h>
#include <Parsers/ASTGrantQuery.h>
#include <Parsers/MySQL/ASTCreateQuery.h>
#include <Parsers/ASTTransactionControl.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterAlterQuery.h>
@ -66,6 +67,7 @@
#include <Interpreters/InterpreterUseQuery.h>
#include <Interpreters/InterpreterWatchQuery.h>
#include <Interpreters/InterpreterExternalDDLQuery.h>
#include <Interpreters/InterpreterTransactionControlQuery.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Parsers/ASTSystemQuery.h>
@ -264,6 +266,10 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, Context &
{
return std::make_unique<InterpreterExternalDDLQuery>(query, context);
}
else if (query->as<ASTTransactionControl>())
{
return std::make_unique<InterpreterTransactionControlQuery>(query, context);
}
else
{
throw Exception("Unknown type of query: " + query->getID(), ErrorCodes::UNKNOWN_TYPE_OF_QUERY);

View File

@ -0,0 +1,72 @@
#include <Interpreters/InterpreterTransactionControlQuery.h>
#include <Parsers/ASTTransactionControl.h>
#include <Interpreters/TransactionLog.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INVALID_TRANSACTION;
}
/// Executes a transaction control query by dispatching on its action
/// (BEGIN / COMMIT / ROLLBACK) to the corresponding handler, passing the session context.
/// Throws INVALID_TRANSACTION when the query context has no session context.
BlockIO InterpreterTransactionControlQuery::execute()
{
    if (!query_context.hasSessionContext())
        throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction Control Language queries are allowed only inside session");

    Context & session_context = query_context.getSessionContext();
    const auto & tcl = query_ptr->as<const ASTTransactionControl &>();

    switch (tcl.action)
    {
        case ASTTransactionControl::BEGIN:
            return executeBegin(session_context);
        case ASTTransactionControl::COMMIT:
            return executeCommit(session_context);
        case ASTTransactionControl::ROLLBACK:
            return executeRollback(session_context);
    }

    /// Every enumerator returns above. An enum object can still hold a value outside the
    /// enumerator list, and flowing off the end of a non-void function is undefined behaviour,
    /// so fail explicitly instead.
    throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown type of transaction control query");
}
/// Handles BEGIN TRANSACTION: allocates a new transaction in TransactionLog and attaches it
/// both to `context` and to the context of the current query.
/// Throws INVALID_TRANSACTION if `context` already has a transaction.
BlockIO InterpreterTransactionControlQuery::executeBegin(Context & context)
{
    const bool already_in_transaction = static_cast<bool>(context.getCurrentTransaction());
    if (already_in_transaction)
        throw Exception(ErrorCodes::INVALID_TRANSACTION, "Nested transactions are not supported");

    auto new_txn = TransactionLog::instance().beginTransaction();
    context.setCurrentTransaction(new_txn);
    query_context.setCurrentTransaction(new_txn);
    return BlockIO{};
}
/// Handles COMMIT: commits the transaction attached to `context` and detaches it.
/// Throws INVALID_TRANSACTION if there is no transaction or it is not in the RUNNING state.
BlockIO InterpreterTransactionControlQuery::executeCommit(Context & context)
{
    const auto current = context.getCurrentTransaction();
    if (current == nullptr)
        throw Exception(ErrorCodes::INVALID_TRANSACTION, "There is no current transaction");

    const bool is_running = (current->getState() == MergeTreeTransaction::RUNNING);
    if (!is_running)
        throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction is not in RUNNING state");

    TransactionLog::instance().commitTransaction(current);
    context.setCurrentTransaction(nullptr);
    return BlockIO{};
}
/// Handles ROLLBACK: rolls back the transaction attached to `context` (if it is still running)
/// and detaches it.
/// Throws INVALID_TRANSACTION if there is no transaction and LOGICAL_ERROR if the
/// transaction is already in the COMMITTED state.
BlockIO InterpreterTransactionControlQuery::executeRollback(Context & context)
{
    auto txn = context.getCurrentTransaction();
    if (!txn)
        throw Exception(ErrorCodes::INVALID_TRANSACTION, "There is no current transaction");
    if (txn->getState() == MergeTreeTransaction::COMMITTED)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction is in COMMITTED state");

    /// A transaction that is not RUNNING here is already rolled back: only detach it.
    if (txn->getState() == MergeTreeTransaction::RUNNING)
        TransactionLog::instance().rollbackTransaction(txn);

    /// `context` is the context the transaction was read from (execute() passes the session
    /// context), so reset it on the same object, as executeCommit() does, instead of taking
    /// another hop through getSessionContext().
    context.setCurrentTransaction(nullptr);
    return {};
}
}

View File

@ -0,0 +1,31 @@
#pragma once
#include <Interpreters/IInterpreter.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
/// Interpreter for transaction control queries (AST type ASTTransactionControl).
class InterpreterTransactionControlQuery : public IInterpreter
{
public:
/// Keeps a reference to `context_`, so the context must outlive the interpreter.
InterpreterTransactionControlQuery(const ASTPtr & query_ptr_, Context & context_)
: query_ptr(query_ptr_)
, query_context(context_)
{
}
BlockIO execute() override;
/// These queries are exempt from quota and limits checks.
bool ignoreQuota() const override { return true; }
bool ignoreLimits() const override { return true; }
private:
/// One handler per query action; `context` is the context whose transaction is changed.
BlockIO executeBegin(Context & context);
BlockIO executeCommit(Context & context);
BlockIO executeRollback(Context & context);
private:
ASTPtr query_ptr;
/// Context passed to the constructor.
Context & query_context;
};
}

View File

@ -0,0 +1,14 @@
#include <Interpreters/MergeTreeTransaction.h>
namespace DB
{
MergeTreeTransaction::MergeTreeTransaction(Snapshot snapshot_, LocalTID local_tid_, UUID host_id)
: tid({snapshot_, local_tid_, host_id})
, snapshot(snapshot_)
, state(RUNNING)
{
}
}

View File

@ -0,0 +1,35 @@
#pragma once
#include <Common/TransactionMetadata.h>
namespace DB
{
/// State of a single transaction: its identifier, snapshot, lifecycle state and CSN.
class MergeTreeTransaction
{
/// TransactionLog modifies `state` and `csn` on commit and rollback.
friend class TransactionLog;
public:
enum State
{
RUNNING,
COMMITTED,
ROLLED_BACK,
};
Snapshot getSnapshot() const { return snapshot; }
State getState() const { return state; }
/// Identifier of the transaction; set in the constructor and never changed afterwards.
const TransactionID tid;
MergeTreeTransaction() = delete;
MergeTreeTransaction(Snapshot snapshot_, LocalTID local_tid_, UUID host_id);
private:
Snapshot snapshot;
State state;
/// Stays Tx::UnknownCSN until the transaction is committed or rolled back.
CSN csn = Tx::UnknownCSN;
};
using MergeTreeTransactionPtr = std::shared_ptr<MergeTreeTransaction>;
}

View File

@ -108,7 +108,9 @@ Block QueryLogElement::createBlock()
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "used_formats"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "used_functions"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "used_storages"},
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "used_table_functions"}
{std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "used_table_functions"},
{TransactionID::getDataType(), "transaction_id"}
};
}
@ -237,6 +239,8 @@ void QueryLogElement::appendToBlock(MutableColumns & columns) const
fill_column(used_storages, column_storage_factory_objects);
fill_column(used_table_functions, column_table_function_factory_objects);
}
columns[i++]->insert(Tuple{tid.start_csn, tid.local_tid, tid.host_id});
}
void QueryLogElement::appendClientInfo(const ClientInfo & client_info, MutableColumns & columns, size_t & i)

View File

@ -2,6 +2,7 @@
#include <Interpreters/SystemLog.h>
#include <Interpreters/ClientInfo.h>
#include <Common/TransactionMetadata.h>
namespace ProfileEvents
@ -80,6 +81,8 @@ struct QueryLogElement
std::shared_ptr<ProfileEvents::Counters> profile_counters;
std::shared_ptr<Settings> query_settings;
TransactionID tid;
static std::string name() { return "QueryLog"; }
static Block createBlock();

View File

@ -0,0 +1,44 @@
#include <Interpreters/TransactionLog.h>
namespace DB
{
/// Returns the process-wide TransactionLog, constructed on first call (function-local static).
TransactionLog & TransactionLog::instance()
{
static TransactionLog inst;
return inst;
}
/// Both counters start from 1.
TransactionLog::TransactionLog()
    : csn_counter(1)
    , local_tid_counter(1)
{
}
/// Returns the current value of the CSN counter.
Snapshot TransactionLog::getLatestSnapshot() const
{
return csn_counter.load();
}
/// Creates a new transaction: its snapshot is the current value of the CSN counter and its
/// local TID is the incremented value of the local TID counter. The host id is left Nil.
MergeTreeTransactionPtr TransactionLog::beginTransaction()
{
    const Snapshot current_snapshot = csn_counter.load();
    /// Atomic pre-increment yields the new value, i.e. the same as fetch_add(1) + 1.
    const LocalTID new_local_tid = ++local_tid_counter;
    return std::make_shared<MergeTreeTransaction>(current_snapshot, new_local_tid, UUIDHelpers::Nil);
}
/// Assigns the next CSN to the transaction, marks it COMMITTED and returns that CSN.
CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn)
{
    /// Atomic pre-increment yields the new value, i.e. the same as fetch_add(1) + 1.
    const CSN commit_csn = ++csn_counter;
    txn->csn = commit_csn;
    /// TODO Transactions: reset local_tid_counter
    txn->state = MergeTreeTransaction::COMMITTED;
    return commit_csn;
}
/// Marks the transaction as ROLLED_BACK and assigns it the reserved Tx::RolledBackCSN.
void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn)
{
txn->csn = Tx::RolledBackCSN;
txn->state = MergeTreeTransaction::ROLLED_BACK;
}
}

View File

@ -0,0 +1,30 @@
#pragma once
#include <Interpreters/MergeTreeTransaction.h>
#include <boost/noncopyable.hpp>
#include <mutex>
namespace DB
{
/// Allocates transaction identifiers and commit sequence numbers (CSN) from two atomic counters.
/// Accessed through instance().
class TransactionLog final : private boost::noncopyable
{
public:
static TransactionLog & instance();
TransactionLog();
Snapshot getLatestSnapshot() const;
/// Allocates TID, returns transaction object
MergeTreeTransactionPtr beginTransaction();
/// Assigns a CSN to the transaction and returns it.
CSN commitTransaction(const MergeTreeTransactionPtr & txn);
void rollbackTransaction(const MergeTreeTransactionPtr & txn);
private:
std::atomic<CSN> csn_counter;
std::atomic<LocalTID> local_tid_counter;
};
}

View File

@ -50,6 +50,7 @@
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/executeQuery.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <Common/ProfileEvents.h>
#include <Common/SensitiveDataMasker.h>
@ -266,6 +267,9 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c
if (elem.log_comment.size() > settings.max_query_size)
elem.log_comment.resize(settings.max_query_size);
if (auto txn = context.getCurrentTransaction())
elem.tid = txn->tid;
if (settings.calculate_text_stack_trace)
setExceptionStackTrace(elem);
logException(context, elem);
@ -634,6 +638,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
elem.client_info = context.getClientInfo();
if (auto txn = context.getCurrentTransaction())
elem.tid = txn->tid;
bool log_queries = settings.log_queries && !internal;
/// Log into system table start of query execution, if need.

View File

@ -0,0 +1,34 @@
#include <Parsers/ASTTransactionControl.h>
#include <IO/Operators.h>
#include <Common/SipHash.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Writes the query text for the action ("BEGIN TRANSACTION", "COMMIT" or "ROLLBACK"),
/// wrapped in keyword highlighting when `format.hilite` is set.
void ASTTransactionControl::formatImpl(const FormatSettings & format, FormatState & /*state*/, FormatStateStacked /*frame*/) const
{
    const char * keyword = nullptr;
    switch (action)
    {
        case BEGIN:
            keyword = "BEGIN TRANSACTION";
            break;
        case COMMIT:
            keyword = "COMMIT";
            break;
        case ROLLBACK:
            keyword = "ROLLBACK";
            break;
    }

    /// Nothing is written for a value that matches no enumerator.
    if (!keyword)
        return;

    format.ostr << (format.hilite ? hilite_keyword : "") << keyword << (format.hilite ? hilite_none : "");
}
/// The action is the only state of this node, so it is the only thing fed into the hash.
void ASTTransactionControl::updateTreeHashImpl(SipHash & hash_state) const
{
hash_state.update(action);
}
}

View File

@ -0,0 +1,29 @@
#pragma once
#include <Parsers/IAST.h>
namespace DB
{
/// Common AST for TCL queries
/// Common AST for TCL queries
class ASTTransactionControl : public IAST
{
public:
/// Which transaction control statement this node represents.
enum QueryType
{
BEGIN,
COMMIT,
ROLLBACK,
};
QueryType action;
ASTTransactionControl(QueryType action_) : action(action_) {}
/// The ID is the same for every action.
String getID(char /*delimiter*/) const override { return "ASTTransactionControl"; }
ASTPtr clone() const override { return std::make_shared<ASTTransactionControl>(*this); }
void formatImpl(const FormatSettings & format, FormatState & /*state*/, FormatStateStacked /*frame*/) const override;
void updateTreeHashImpl(SipHash & hash_state) const override;
};
}

View File

@ -18,6 +18,7 @@
#include <Parsers/ParserSystemQuery.h>
#include <Parsers/ParserUseQuery.h>
#include <Parsers/ParserExternalDDLQuery.h>
#include <Parsers/ParserTransactionControl.h>
namespace DB
@ -40,6 +41,7 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserGrantQuery grant_p;
ParserSetRoleQuery set_role_p;
ParserExternalDDLQuery external_ddl_p;
ParserTransactionControl transaction_control_p;
bool res = query_with_output_p.parse(pos, node, expected)
|| insert_p.parse(pos, node, expected)
@ -54,7 +56,8 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
|| create_settings_profile_p.parse(pos, node, expected)
|| drop_access_entity_p.parse(pos, node, expected)
|| grant_p.parse(pos, node, expected)
|| external_ddl_p.parse(pos, node, expected);
|| external_ddl_p.parse(pos, node, expected)
|| transaction_control_p.parse(pos, node, expected);
return res;
}

View File

@ -0,0 +1,25 @@
#include <Parsers/ParserTransactionControl.h>
#include <Parsers/ASTTransactionControl.h>
#include <Parsers/CommonParsers.h>
namespace DB
{
/// Parses one of the keywords BEGIN TRANSACTION, COMMIT or ROLLBACK (tried in this order)
/// and produces an ASTTransactionControl node with the matching action.
/// Returns false when none of the keywords matches.
bool ParserTransactionControl::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    struct Alternative
    {
        const char * keyword;
        ASTTransactionControl::QueryType action;
    };

    static constexpr Alternative alternatives[] = {
        {"BEGIN TRANSACTION", ASTTransactionControl::BEGIN},
        {"COMMIT", ASTTransactionControl::COMMIT},
        {"ROLLBACK", ASTTransactionControl::ROLLBACK},
    };

    for (const auto & alternative : alternatives)
    {
        if (ParserKeyword(alternative.keyword).ignore(pos, expected))
        {
            node = std::make_shared<ASTTransactionControl>(alternative.action);
            return true;
        }
    }

    return false;
}
}

View File

@ -0,0 +1,14 @@
#pragma once
#include <Parsers/IParserBase.h>
namespace DB
{
/// Parser for transaction control (TCL) queries; the implementation is in the .cpp file.
class ParserTransactionControl : public IParserBase
{
public:
const char * getName() const override { return "TCL query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
}

View File

@ -16,6 +16,7 @@
#include <Storages/MergeTree/MergeTreeDataPartTTLInfo.h>
#include <Storages/MergeTree/MergeTreeIOSettings.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Common/TransactionMetadata.h>
#include <Poco/Path.h>
@ -301,6 +302,19 @@ public:
CompressionCodecPtr default_codec;
/// Transaction-related version information of a data part.
/// NOTE(review): presumably min* describe the transaction that created the part and max* the
/// one that removed it - confirm against the code that fills these fields.
struct VersionMetadata
{
TransactionID mintid = Tx::EmptyTID;
TransactionID maxtid = Tx::EmptyTID;
bool maybe_visible = false;
/// Tx::UnknownCSN until a CSN is assigned.
CSN mincsn = Tx::UnknownCSN;
CSN maxcsn = Tx::UnknownCSN;
};
mutable VersionMetadata versions;
/// For data in RAM ('index')
UInt64 getIndexSizeInBytes() const;
UInt64 getIndexSizeInAllocatedBytes() const;

View File

@ -993,6 +993,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
(*it)->remove_time.store((*it)->modification_time, std::memory_order_relaxed);
modifyPartState(it, DataPartState::Outdated);
(*it)->versions.maxtid = Tx::PrehistoricTID;
(*it)->versions.maxcsn = Tx::PrehistoricCSN;
removePartContributionToDataVolume(*it);
};
@ -1000,6 +1002,14 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
while (curr_jt != data_parts_by_state_and_info.end() && (*curr_jt)->getState() == DataPartState::Committed)
{
/// We do not have version metadata and transactions history for old parts,
/// so let's consider that such parts were created by some ancient transaction
/// and were committed with some prehistoric CSN.
/// TODO Transactions: distinguish "prehistoric" parts from uncommitted parts in case of hard restart
(*curr_jt)->versions.mintid = Tx::PrehistoricTID;
(*curr_jt)->versions.mincsn = Tx::PrehistoricCSN;
(*curr_jt)->versions.maybe_visible = true;
/// Don't consider data parts belonging to different partitions.
if ((*curr_jt)->info.partition_id != (*prev_jt)->info.partition_id)
{
@ -1030,7 +1040,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
calculateColumnSizesImpl();
LOG_DEBUG(log, "Loaded data parts ({} items)", data_parts_indexes.size());
}

View File

@ -12,6 +12,7 @@
#include <Databases/IDatabase.h>
#include <Parsers/queryToString.h>
#include <Common/hex.h>
#include <Common/TransactionMetadata.h>
namespace DB
{
@ -75,7 +76,12 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_)
{"rows_where_ttl_info.expression", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"rows_where_ttl_info.min", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
{"rows_where_ttl_info.max", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())}
{"rows_where_ttl_info.max", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
{"mintid", TransactionID::getDataType()},
{"maxtid", TransactionID::getDataType()},
{"mincsn", std::make_shared<DataTypeUInt64>()},
{"maxcsn", std::make_shared<DataTypeUInt64>()},
}
)
{
@ -257,6 +263,20 @@ void StorageSystemParts::processNextStorage(
/// Do not use part->getState*, it can be changed from different thread
if (has_state_column)
columns[res_index++]->insert(IMergeTreeDataPart::stateToString(part_state));
auto get_tid_as_field = [](const TransactionID & tid) -> Field
{
return Tuple{tid.start_csn, tid.local_tid, tid.host_id};
};
if (columns_mask[src_index++])
columns[res_index++]->insert(get_tid_as_field(part->versions.mintid));
if (columns_mask[src_index++])
columns[res_index++]->insert(get_tid_as_field(part->versions.maxtid));
if (columns_mask[src_index++])
columns[res_index++]->insert(part->versions.mincsn);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->versions.maxcsn);
}
}