diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 0f85ad5c792..55745ffc211 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -548,6 +548,8 @@ M(578, INVALID_FORMAT_INSERT_QUERY_WITH_DATA) \ M(579, INCORRECT_PART_TYPE) \ M(580, CANNOT_SET_ROUNDING_MODE) \ + M(581, INVALID_TRANSACTION) \ + M(582, SERIALIZATION_ERROR) \ \ M(998, POSTGRESQL_CONNECTION_FAILURE) \ M(999, KEEPER_EXCEPTION) \ diff --git a/src/Common/TransactionMetadata.cpp b/src/Common/TransactionMetadata.cpp new file mode 100644 index 00000000000..430ea5cda29 --- /dev/null +++ b/src/Common/TransactionMetadata.cpp @@ -0,0 +1,18 @@ +#include +#include +#include +#include + +namespace DB +{ + +DataTypePtr TransactionID::getDataType() +{ + DataTypes types; + types.push_back(std::make_shared()); + types.push_back(std::make_shared()); + types.push_back(std::make_shared()); + return std::make_shared(std::move(types)); +} + +} diff --git a/src/Common/TransactionMetadata.h b/src/Common/TransactionMetadata.h new file mode 100644 index 00000000000..34eaaf2d245 --- /dev/null +++ b/src/Common/TransactionMetadata.h @@ -0,0 +1,41 @@ +#pragma once +#include +#include + +namespace DB +{ + +class IDataType; +using DataTypePtr = std::shared_ptr; + +/// FIXME Transactions: Sequential node numbers in ZooKeeper are Int32, but 31 bit is not enough +using CSN = UInt64; +using Snapshot = CSN; +using LocalTID = UInt64; + +struct TransactionID +{ + CSN start_csn = 0; + LocalTID local_tid = 0; + UUID host_id = UUIDHelpers::Nil; /// Depends on #17278, leave it Nil for now. + + static DataTypePtr getDataType(); +}; + +namespace Tx +{ + +const CSN UnknownCSN = 0; +const CSN PrehistoricCSN = 1; + +const LocalTID PrehistoricLocalTID = 1; + +const TransactionID EmptyTID = {0, 0, UUIDHelpers::Nil}; +const TransactionID PrehistoricTID = {0, PrehistoricLocalTID, UUIDHelpers::Nil}; + +/// So far, that changes will never become visible +const CSN RolledBackCSN = std::numeric_limits::max(); + +} + +} diff --git a/src/Functions/FunctionsTransactionCounters.cpp b/src/Functions/FunctionsTransactionCounters.cpp new file mode 100644 index 00000000000..93a2961fb74 --- /dev/null +++ b/src/Functions/FunctionsTransactionCounters.cpp @@ -0,0 +1,58 @@ +#include +#include +#include + + +namespace DB +{ + +namespace +{ + +class FunctionTransactionID : public IFunction +{ +public: + static constexpr auto name = "transactionID"; + + static FunctionPtr create(const Context & context) + { + return std::make_shared(context.getCurrentTransaction()); + } + + explicit FunctionTransactionID(MergeTreeTransactionPtr && txn_) : txn(txn_) + { + } + + String getName() const override { return name; } + + size_t getNumberOfArguments() const override { return 0; } + + DataTypePtr getReturnTypeImpl(const DataTypes &) const override + { + return TransactionID::getDataType(); + } + + bool isDeterministic() const override { return false; } + + ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr & result_type, size_t input_rows_count) const override + { + Tuple res; + if (txn) + res = {txn->tid.start_csn, txn->tid.local_tid, txn->tid.host_id}; + else + res = {UInt64(0), UInt64(0), UUIDHelpers::Nil}; + return result_type->createColumnConst(input_rows_count, res); + } + +private: + MergeTreeTransactionPtr txn; +}; + +} + +void registerFunctionsTransactionCounters(FunctionFactory & factory) +{ + factory.registerFunction(); +} + +} diff --git a/src/Functions/registerFunctionsMiscellaneous.cpp b/src/Functions/registerFunctionsMiscellaneous.cpp index d00d4a42db0..ab415740be3 100644 --- a/src/Functions/registerFunctionsMiscellaneous.cpp +++ b/src/Functions/registerFunctionsMiscellaneous.cpp @@ -73,6 +73,7 @@ void registerFunctionFile(FunctionFactory & factory); void registerFunctionConnectionId(FunctionFactory & factory); void registerFunctionPartitionId(FunctionFactory & factory); void registerFunctionIsIPAddressContainedIn(FunctionFactory &); +void registerFunctionsTransactionCounters(FunctionFactory & factory); #if USE_ICU void registerFunctionConvertCharset(FunctionFactory &); @@ -146,6 +147,7 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory) registerFunctionConnectionId(factory); registerFunctionPartitionId(factory); registerFunctionIsIPAddressContainedIn(factory); + registerFunctionsTransactionCounters(factory); #if USE_ICU registerFunctionConvertCharset(factory); diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 8eec5d099e2..5c6afb38ff4 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -2581,6 +2581,21 @@ ZooKeeperMetadataTransactionPtr Context::getZooKeeperMetadataTransaction() const return metadata_transaction; } +void Context::setCurrentTransaction(MergeTreeTransactionPtr txn) +{ + assert(!merge_tree_transaction || !txn); + assert(this == session_context || this == query_context); + int enable_mvcc_test_helper = getConfigRef().getInt("_enable_mvcc_test_helper_dev", 0); + if (enable_mvcc_test_helper != 42) + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Transactions are not supported"); + merge_tree_transaction = std::move(txn); +} + +MergeTreeTransactionPtr Context::getCurrentTransaction() const +{ + return merge_tree_transaction; +} + PartUUIDsPtr Context::getPartUUIDs() { auto lock = getLock(); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 5c98a2ba64a..e035031f993 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -122,6 +122,8 @@ struct BackgroundTaskSchedulingSettings; class ZooKeeperMetadataTransaction; using ZooKeeperMetadataTransactionPtr = std::shared_ptr; +class MergeTreeTransaction; +using MergeTreeTransactionPtr = std::shared_ptr; #if USE_EMBEDDED_COMPILER class CompiledExpressionCache; @@ -290,6 +292,8 @@ private: /// thousands of signatures. /// And I hope it will be replaced with more common Transaction sometime. + MergeTreeTransactionPtr merge_tree_transaction; /// Current transaction context. Can be inside session or query context. + /// Use copy constructor or createGlobal() instead Context(); @@ -757,6 +761,10 @@ public: /// Returns context of current distributed DDL query or nullptr. ZooKeeperMetadataTransactionPtr getZooKeeperMetadataTransaction() const; + void setCurrentTransaction(MergeTreeTransactionPtr txn); + MergeTreeTransactionPtr getCurrentTransaction() const; + + struct MySQLWireContext { uint8_t sequence_id = 0; diff --git a/src/Interpreters/InterpreterFactory.cpp b/src/Interpreters/InterpreterFactory.cpp index 15e4c52f040..5684150fed1 100644 --- a/src/Interpreters/InterpreterFactory.cpp +++ b/src/Interpreters/InterpreterFactory.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include @@ -66,6 +67,7 @@ #include #include #include +#include #include #include @@ -264,6 +266,10 @@ std::unique_ptr InterpreterFactory::get(ASTPtr & query, Context & { return std::make_unique(query, context); } + else if (query->as()) + { + return std::make_unique(query, context); + } else { throw Exception("Unknown type of query: " + query->getID(), ErrorCodes::UNKNOWN_TYPE_OF_QUERY); diff --git a/src/Interpreters/InterpreterTransactionControlQuery.cpp b/src/Interpreters/InterpreterTransactionControlQuery.cpp new file mode 100644 index 00000000000..369f5edd1da --- /dev/null +++ b/src/Interpreters/InterpreterTransactionControlQuery.cpp @@ -0,0 +1,72 @@ +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int INVALID_TRANSACTION; +} + +BlockIO InterpreterTransactionControlQuery::execute() +{ + if (!query_context.hasSessionContext()) + throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction Control Language queries are allowed only inside session"); + + Context & session_context = query_context.getSessionContext(); + const auto & tcl = query_ptr->as(); + + switch (tcl.action) + { + case ASTTransactionControl::BEGIN: + return executeBegin(session_context); + case ASTTransactionControl::COMMIT: + return executeCommit(session_context); + case ASTTransactionControl::ROLLBACK: + return executeRollback(session_context); + } +} + +BlockIO InterpreterTransactionControlQuery::executeBegin(Context & context) +{ + if (context.getCurrentTransaction()) + throw Exception(ErrorCodes::INVALID_TRANSACTION, "Nested transactions are not supported"); + + auto txn = TransactionLog::instance().beginTransaction(); + context.setCurrentTransaction(txn); + query_context.setCurrentTransaction(txn); + return {}; +} + +BlockIO InterpreterTransactionControlQuery::executeCommit(Context & context) +{ + auto txn = context.getCurrentTransaction(); + if (!txn) + throw Exception(ErrorCodes::INVALID_TRANSACTION, "There is no current transaction"); + if (txn->getState() != MergeTreeTransaction::RUNNING) + throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction is not in RUNNING state"); + + TransactionLog::instance().commitTransaction(txn); + context.setCurrentTransaction(nullptr); + return {}; +} + +BlockIO InterpreterTransactionControlQuery::executeRollback(Context & context) +{ + auto txn = context.getCurrentTransaction(); + if (!txn) + throw Exception(ErrorCodes::INVALID_TRANSACTION, "There is no current transaction"); + if (txn->getState() == MergeTreeTransaction::COMMITTED) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction is in COMMITTED state"); + + if (txn->getState() == MergeTreeTransaction::RUNNING) + TransactionLog::instance().rollbackTransaction(txn); + context.getSessionContext().setCurrentTransaction(nullptr); + return {}; +} + +} diff --git a/src/Interpreters/InterpreterTransactionControlQuery.h b/src/Interpreters/InterpreterTransactionControlQuery.h new file mode 100644 index 00000000000..4c455d22a24 --- /dev/null +++ b/src/Interpreters/InterpreterTransactionControlQuery.h @@ -0,0 +1,31 @@ +#pragma once +#include +#include + +namespace DB +{ + +class InterpreterTransactionControlQuery : public IInterpreter +{ +public: + InterpreterTransactionControlQuery(const ASTPtr & query_ptr_, Context & context_) + : query_ptr(query_ptr_) + , query_context(context_) + { + } + + BlockIO execute() override; + + bool ignoreQuota() const override { return true; } + bool ignoreLimits() const override { return true; } +private: + BlockIO executeBegin(Context & context); + BlockIO executeCommit(Context & context); + BlockIO executeRollback(Context & context); + +private: + ASTPtr query_ptr; + Context & query_context; +}; + +} diff --git a/src/Interpreters/MergeTreeTransaction.cpp b/src/Interpreters/MergeTreeTransaction.cpp new file mode 100644 index 00000000000..d1af1d61ba7 --- /dev/null +++ b/src/Interpreters/MergeTreeTransaction.cpp @@ -0,0 +1,14 @@ +#include + +namespace DB +{ + +MergeTreeTransaction::MergeTreeTransaction(Snapshot snapshot_, LocalTID local_tid_, UUID host_id) + : tid({snapshot_, local_tid_, host_id}) + , snapshot(snapshot_) + , state(RUNNING) +{ +} + + +} diff --git a/src/Interpreters/MergeTreeTransaction.h b/src/Interpreters/MergeTreeTransaction.h new file mode 100644 index 00000000000..95c4a81c756 --- /dev/null +++ b/src/Interpreters/MergeTreeTransaction.h @@ -0,0 +1,35 @@ +#pragma once +#include + +namespace DB +{ + +class MergeTreeTransaction +{ + friend class TransactionLog; +public: + enum State + { + RUNNING, + COMMITTED, + ROLLED_BACK, + }; + + Snapshot getSnapshot() const { return snapshot; } + State getState() const { return state; } + + const TransactionID tid; + + MergeTreeTransaction() = delete; + MergeTreeTransaction(Snapshot snapshot_, LocalTID local_tid_, UUID host_id); + +private: + Snapshot snapshot; + State state; + + CSN csn = Tx::UnknownCSN; +}; + +using MergeTreeTransactionPtr = std::shared_ptr; + +} diff --git a/src/Interpreters/QueryLog.cpp b/src/Interpreters/QueryLog.cpp index b6902468242..cee1d2b5412 100644 --- a/src/Interpreters/QueryLog.cpp +++ b/src/Interpreters/QueryLog.cpp @@ -108,7 +108,9 @@ Block QueryLogElement::createBlock() {std::make_shared(std::make_shared()), "used_formats"}, {std::make_shared(std::make_shared()), "used_functions"}, {std::make_shared(std::make_shared()), "used_storages"}, - {std::make_shared(std::make_shared()), "used_table_functions"} + {std::make_shared(std::make_shared()), "used_table_functions"}, + + {TransactionID::getDataType(), "transaction_id"} }; } @@ -237,6 +239,8 @@ void QueryLogElement::appendToBlock(MutableColumns & columns) const fill_column(used_storages, column_storage_factory_objects); fill_column(used_table_functions, column_table_function_factory_objects); } + + columns[i++]->insert(Tuple{tid.start_csn, tid.local_tid, tid.host_id}); } void QueryLogElement::appendClientInfo(const ClientInfo & client_info, MutableColumns & columns, size_t & i) diff --git a/src/Interpreters/QueryLog.h b/src/Interpreters/QueryLog.h index 8617a8d1cbc..7532f2d14f2 100644 --- a/src/Interpreters/QueryLog.h +++ b/src/Interpreters/QueryLog.h @@ -2,6 +2,7 @@ #include #include +#include namespace ProfileEvents @@ -80,6 +81,8 @@ struct QueryLogElement std::shared_ptr profile_counters; std::shared_ptr query_settings; + TransactionID tid; + static std::string name() { return "QueryLog"; } static Block createBlock(); diff --git a/src/Interpreters/TransactionLog.cpp b/src/Interpreters/TransactionLog.cpp new file mode 100644 index 00000000000..00dbb3352b4 --- /dev/null +++ b/src/Interpreters/TransactionLog.cpp @@ -0,0 +1,44 @@ +#include + +namespace DB +{ + +TransactionLog & TransactionLog::instance() +{ + static TransactionLog inst; + return inst; +} + +TransactionLog::TransactionLog() +{ + csn_counter = 1; + local_tid_counter = 1; +} + +Snapshot TransactionLog::getLatestSnapshot() const +{ + return csn_counter.load(); +} + +MergeTreeTransactionPtr TransactionLog::beginTransaction() +{ + Snapshot snapshot = csn_counter.load(); + LocalTID ltid = 1 + local_tid_counter.fetch_add(1); + return std::make_shared(snapshot, ltid, UUIDHelpers::Nil); +} + +CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn) +{ + txn->csn = 1 + csn_counter.fetch_add(1); + /// TODO Transactions: reset local_tid_counter + txn->state = MergeTreeTransaction::COMMITTED; + return txn->csn; +} + +void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn) +{ + txn->csn = Tx::RolledBackCSN; + txn->state = MergeTreeTransaction::ROLLED_BACK; +} + +} diff --git a/src/Interpreters/TransactionLog.h b/src/Interpreters/TransactionLog.h new file mode 100644 index 00000000000..073d2a2929d --- /dev/null +++ b/src/Interpreters/TransactionLog.h @@ -0,0 +1,30 @@ +#pragma once +#include +#include +#include + +namespace DB +{ + +class TransactionLog final : private boost::noncopyable +{ +public: + static TransactionLog & instance(); + + TransactionLog(); + + Snapshot getLatestSnapshot() const; + + /// Allocated TID, returns transaction object + MergeTreeTransactionPtr beginTransaction(); + + CSN commitTransaction(const MergeTreeTransactionPtr & txn); + + void rollbackTransaction(const MergeTreeTransactionPtr & txn); + +private: + std::atomic csn_counter; + std::atomic local_tid_counter; +}; + +} diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index a5c21405ff1..a00f8301622 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -50,6 +50,7 @@ #include #include #include +#include #include #include @@ -266,6 +267,9 @@ static void onExceptionBeforeStart(const String & query_for_logging, Context & c if (elem.log_comment.size() > settings.max_query_size) elem.log_comment.resize(settings.max_query_size); + if (auto txn = context.getCurrentTransaction()) + elem.tid = txn->tid; + if (settings.calculate_text_stack_trace) setExceptionStackTrace(elem); logException(context, elem); @@ -634,6 +638,9 @@ static std::tuple executeQueryImpl( elem.client_info = context.getClientInfo(); + if (auto txn = context.getCurrentTransaction()) + elem.tid = txn->tid; + bool log_queries = settings.log_queries && !internal; /// Log into system table start of query execution, if need. diff --git a/src/Parsers/ASTTransactionControl.cpp b/src/Parsers/ASTTransactionControl.cpp new file mode 100644 index 00000000000..d12c9d6d6e4 --- /dev/null +++ b/src/Parsers/ASTTransactionControl.cpp @@ -0,0 +1,34 @@ +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + +void ASTTransactionControl::formatImpl(const FormatSettings & format /*state*/, FormatState &, FormatStateStacked /*frame*/) const +{ + switch (action) + { + case BEGIN: + format.ostr << (format.hilite ? hilite_keyword : "") << "BEGIN TRANSACTION" << (format.hilite ? hilite_none : ""); + break; + case COMMIT: + format.ostr << (format.hilite ? hilite_keyword : "") << "COMMIT" << (format.hilite ? hilite_none : ""); + break; + case ROLLBACK: + format.ostr << (format.hilite ? hilite_keyword : "") << "ROLLBACK" << (format.hilite ? hilite_none : ""); + break; + } +} + +void ASTTransactionControl::updateTreeHashImpl(SipHash & hash_state) const +{ + hash_state.update(action); +} + +} diff --git a/src/Parsers/ASTTransactionControl.h b/src/Parsers/ASTTransactionControl.h new file mode 100644 index 00000000000..c01c4172627 --- /dev/null +++ b/src/Parsers/ASTTransactionControl.h @@ -0,0 +1,29 @@ +#pragma once +#include + +namespace DB +{ + +/// Common AST for TCL queries +class ASTTransactionControl : public IAST +{ +public: + enum QueryType + { + BEGIN, + COMMIT, + ROLLBACK, + }; + + QueryType action; + + ASTTransactionControl(QueryType action_) : action(action_) {} + + String getID(char /*delimiter*/) const override { return "ASTTransactionControl"; } + ASTPtr clone() const override { return std::make_shared(*this); } + + void formatImpl(const FormatSettings & format, FormatState & /*state*/, FormatStateStacked /*frame*/) const override; + void updateTreeHashImpl(SipHash & hash_state) const override; +}; + +} diff --git a/src/Parsers/ParserQuery.cpp b/src/Parsers/ParserQuery.cpp index 4550bdc8a75..f9c11f1c870 100644 --- a/src/Parsers/ParserQuery.cpp +++ b/src/Parsers/ParserQuery.cpp @@ -18,6 +18,7 @@ #include #include #include +#include namespace DB @@ -40,6 +41,7 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) ParserGrantQuery grant_p; ParserSetRoleQuery set_role_p; ParserExternalDDLQuery external_ddl_p; + ParserTransactionControl transaction_control_p; bool res = query_with_output_p.parse(pos, node, expected) || insert_p.parse(pos, node, expected) @@ -54,7 +56,8 @@ bool ParserQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) || create_settings_profile_p.parse(pos, node, expected) || drop_access_entity_p.parse(pos, node, expected) || grant_p.parse(pos, node, expected) - || external_ddl_p.parse(pos, node, expected); + || external_ddl_p.parse(pos, node, expected) + || transaction_control_p.parse(pos, node, expected); return res; } diff --git a/src/Parsers/ParserTransactionControl.cpp b/src/Parsers/ParserTransactionControl.cpp new file mode 100644 index 00000000000..a5591a0447e --- /dev/null +++ b/src/Parsers/ParserTransactionControl.cpp @@ -0,0 +1,25 @@ +#include +#include +#include + +namespace DB +{ + +bool ParserTransactionControl::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) +{ + ASTTransactionControl::QueryType action; + + if (ParserKeyword("BEGIN TRANSACTION").ignore(pos, expected)) + action = ASTTransactionControl::BEGIN; + else if (ParserKeyword("COMMIT").ignore(pos, expected)) + action = ASTTransactionControl::COMMIT; + else if (ParserKeyword("ROLLBACK").ignore(pos, expected)) + action = ASTTransactionControl::ROLLBACK; + else + return false; + + node = std::make_shared(action); + return true; +} + +} diff --git a/src/Parsers/ParserTransactionControl.h b/src/Parsers/ParserTransactionControl.h new file mode 100644 index 00000000000..157c088624c --- /dev/null +++ b/src/Parsers/ParserTransactionControl.h @@ -0,0 +1,14 @@ +#pragma once +#include + +namespace DB +{ + +class ParserTransactionControl : public IParserBase +{ +public: + const char * getName() const override { return "TCL query"; } + bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; +}; + +} diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index 03f6564788a..c990bdcc65b 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -16,6 +16,7 @@ #include #include #include +#include #include @@ -301,6 +302,19 @@ public: CompressionCodecPtr default_codec; + struct VersionMetadata + { + TransactionID mintid = Tx::EmptyTID; + TransactionID maxtid = Tx::EmptyTID; + + bool maybe_visible = false; + + CSN mincsn = Tx::UnknownCSN; + CSN maxcsn = Tx::UnknownCSN; + }; + + mutable VersionMetadata versions; + /// For data in RAM ('index') UInt64 getIndexSizeInBytes() const; UInt64 getIndexSizeInAllocatedBytes() const; diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index d61de13b604..7610239662e 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -993,6 +993,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) { (*it)->remove_time.store((*it)->modification_time, std::memory_order_relaxed); modifyPartState(it, DataPartState::Outdated); + (*it)->versions.maxtid = Tx::PrehistoricTID; + (*it)->versions.maxcsn = Tx::PrehistoricCSN; removePartContributionToDataVolume(*it); }; @@ -1000,6 +1002,14 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) while (curr_jt != data_parts_by_state_and_info.end() && (*curr_jt)->getState() == DataPartState::Committed) { + /// We do not have version metadata and transactions history for old parts, + /// so let's consider that such parts were created by some ancient transaction + /// and were committed with some prehistoric CSN. + /// TODO Transactions: distinguish "prehistoric" parts from uncommitted parts in case of hard restart + (*curr_jt)->versions.mintid = Tx::PrehistoricTID; + (*curr_jt)->versions.mincsn = Tx::PrehistoricCSN; + (*curr_jt)->versions.maybe_visible = true; + /// Don't consider data parts belonging to different partitions. if ((*curr_jt)->info.partition_id != (*prev_jt)->info.partition_id) { @@ -1030,7 +1040,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) calculateColumnSizesImpl(); - LOG_DEBUG(log, "Loaded data parts ({} items)", data_parts_indexes.size()); } diff --git a/src/Storages/System/StorageSystemParts.cpp b/src/Storages/System/StorageSystemParts.cpp index 6a643dbe1b9..f100ecdfa9b 100644 --- a/src/Storages/System/StorageSystemParts.cpp +++ b/src/Storages/System/StorageSystemParts.cpp @@ -12,6 +12,7 @@ #include #include #include +#include namespace DB { @@ -75,7 +76,12 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_) {"rows_where_ttl_info.expression", std::make_shared(std::make_shared())}, {"rows_where_ttl_info.min", std::make_shared(std::make_shared())}, - {"rows_where_ttl_info.max", std::make_shared(std::make_shared())} + {"rows_where_ttl_info.max", std::make_shared(std::make_shared())}, + + {"mintid", TransactionID::getDataType()}, + {"maxtid", TransactionID::getDataType()}, + {"mincsn", std::make_shared()}, + {"maxcsn", std::make_shared()}, } ) { @@ -257,6 +263,20 @@ void StorageSystemParts::processNextStorage( /// Do not use part->getState*, it can be changed from different thread if (has_state_column) columns[res_index++]->insert(IMergeTreeDataPart::stateToString(part_state)); + + auto get_tid_as_field = [](const TransactionID & tid) -> Field + { + return Tuple{tid.start_csn, tid.local_tid, tid.host_id}; + }; + + if (columns_mask[src_index++]) + columns[res_index++]->insert(get_tid_as_field(part->versions.mintid)); + if (columns_mask[src_index++]) + columns[res_index++]->insert(get_tid_as_field(part->versions.maxtid)); + if (columns_mask[src_index++]) + columns[res_index++]->insert(part->versions.mincsn); + if (columns_mask[src_index++]) + columns[res_index++]->insert(part->versions.maxcsn); } }