add EXPLAIN CURRENT TRANSACTION

This commit is contained in:
Alexander Tokmakov 2022-02-14 22:47:17 +03:00
parent 07e66e690d
commit cbd3b45646
15 changed files with 44 additions and 16 deletions

View File

@ -9,6 +9,7 @@
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/TableOverrideUtils.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <Formats/FormatFactory.h>
#include <Parsers/DumpASTNode.h>
#include <Parsers/queryToString.h>
@ -387,6 +388,23 @@ QueryPipeline InterpreterExplainQuery::executeImpl()
override_info.appendTo(buf);
break;
}
case ASTExplainQuery::CurrentTransaction:
{
if (ast.getSettings())
throw Exception("Settings are not supported for EXPLAIN CURRENT TRANSACTION query.", ErrorCodes::UNKNOWN_SETTING);
if (auto txn = getContext()->getCurrentTransaction())
{
String dump = txn->dumpDescription();
buf.write(dump.data(), dump.size());
}
else
{
writeCString("<no current transaction>", buf);
}
break;
}
}
if (insert_buf)
{

View File

@ -17,6 +17,8 @@ public:
static Block getSampleBlock(ASTExplainQuery::ExplainKind kind);
bool supportsTransactions() const override { return true; }
private:
ASTPtr query;

View File

@ -277,7 +277,7 @@ CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn)
}
else
{
LOG_TEST(log, "Committing transaction {}{}", txn->tid, txn->dumpDescription());
LOG_TEST(log, "Committing transaction {}", txn->dumpDescription());
/// TODO handle connection loss
/// TODO support batching
auto current_zookeeper = getZooKeeper();

View File

@ -145,11 +145,9 @@ bool VersionMetadata::isMaxTIDLocked() const
void VersionMetadata::setCreationTID(const TransactionID & tid, const TransactionInfoContext & context)
{
/// TODO Transactions: initialize it in constructor on part creation and remove this method
/// FIXME ReplicatedMergeTreeBlockOutputStream may add one part multiple times
/// NOTE ReplicatedMergeTreeBlockOutputStream may add one part multiple times
assert(creation_tid.isEmpty() || creation_tid == tid);
const_cast<TransactionID &>(creation_tid) = tid;
creation_tid = tid;
tryWriteEventToSystemLog(log, TransactionsInfoLogElement::ADD_PART, tid, context);
}

View File

@ -26,6 +26,7 @@
#include <Parsers/ASTShowProcesslistQuery.h>
#include <Parsers/ASTWatchQuery.h>
#include <Parsers/ASTTransactionControl.h>
#include <Parsers/ASTExplainQuery.h>
#include <Parsers/Lexer.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ParserQuery.h>
@ -440,7 +441,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (auto txn = context->getCurrentTransaction())
{
assert(txn->getState() != MergeTreeTransaction::COMMITTED);
if (txn->getState() == MergeTreeTransaction::ROLLED_BACK && !ast->as<ASTTransactionControl>())
if (txn->getState() == MergeTreeTransaction::ROLLED_BACK && !ast->as<ASTTransactionControl>() && !ast->as<ASTExplainQuery>())
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Cannot execute query: transaction is rolled back");
}

View File

@ -19,6 +19,7 @@ public:
QueryPipeline, /// 'EXPLAIN PIPELINE ...'
QueryEstimates, /// 'EXPLAIN ESTIMATE ...'
TableOverride, /// 'EXPLAIN TABLE OVERRIDE ...'
CurrentTransaction, /// 'EXPLAIN CURRENT TRANSACTION'
};
explicit ASTExplainQuery(ExplainKind kind_) : kind(kind_) {}
@ -111,6 +112,7 @@ private:
case QueryPipeline: return "EXPLAIN PIPELINE";
case QueryEstimates: return "EXPLAIN ESTIMATE";
case TableOverride: return "EXPLAIN TABLE OVERRIDE";
case CurrentTransaction: return "EXPLAIN CURRENT TRANSACTION";
}
__builtin_unreachable();

View File

@ -22,6 +22,7 @@ bool ParserExplainQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_plan("PLAN");
ParserKeyword s_estimates("ESTIMATE");
ParserKeyword s_table_override("TABLE OVERRIDE");
ParserKeyword s_current_transaction("CURRENT TRANSACTION");
if (s_explain.ignore(pos, expected))
{
@ -39,6 +40,8 @@ bool ParserExplainQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
kind = ASTExplainQuery::ExplainKind::QueryEstimates; //-V1048
else if (s_table_override.ignore(pos, expected))
kind = ASTExplainQuery::ExplainKind::TableOverride;
else if (s_current_transaction.ignore(pos, expected))
kind = ASTExplainQuery::ExplainKind::CurrentTransaction;
}
else
return false;
@ -79,6 +82,10 @@ bool ParserExplainQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
explain_query->setTableFunction(table_function);
explain_query->setTableOverride(table_override);
}
else if (kind == ASTExplainQuery::ExplainKind::CurrentTransaction)
{
/// Nothing to parse
}
else if (select_p.parse(pos, query, expected) ||
create_p.parse(pos, query, expected) ||
insert_p.parse(pos, query, expected))

View File

@ -160,6 +160,7 @@ public:
/// Returns true if the storage supports transactions for SELECT, INSERT and ALTER queries.
/// Storage may throw an exception later if some query kind is not fully supported.
/// This method can return true for readonly engines that return the same rows for reading (such as SystemNumbers)
virtual bool supportsTransactions() const { return false; }
/// Requires squashing small blocks to large for optimal storage.

View File

@ -193,7 +193,7 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MergeFromLogEntryT
settings.memory_profiler_sample_probability,
settings.max_untracked_memory);
transaction_ptr = std::make_unique<MergeTreeData::Transaction>(storage, nullptr); //FIXME
transaction_ptr = std::make_unique<MergeTreeData::Transaction>(storage, nullptr);
stopwatch_ptr = std::make_unique<Stopwatch>();
merge_task = storage.merger_mutator.mergePartsToTemporaryPart(
@ -230,7 +230,6 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite
/// Task is not needed
merge_task.reset();
//FIXME
storage.merger_mutator.renameMergedTemporaryPart(part, parts, nullptr, transaction_ptr.get());
try

View File

@ -39,7 +39,6 @@ struct MergeMutateSelectedEntry
FutureMergedMutatedPartPtr future_part;
CurrentlyMergingPartsTaggerPtr tagger;
MutationCommandsConstPtr commands;
//TransactionID mutation_tid;
MergeTreeTransactionPtr txn;
MergeMutateSelectedEntry(FutureMergedMutatedPartPtr future_part_, CurrentlyMergingPartsTaggerPtr tagger_,
MutationCommandsConstPtr commands_, const MergeTreeTransactionPtr & txn_ = nullptr)

View File

@ -63,7 +63,7 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MutateFromLogEntry
RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations);
StorageMetadataPtr metadata_snapshot = storage.getInMemoryMetadataPtr();
transaction_ptr = std::make_unique<MergeTreeData::Transaction>(storage, nullptr); //FIXME
transaction_ptr = std::make_unique<MergeTreeData::Transaction>(storage, nullptr);
future_mutated_part = std::make_shared<FutureMergedMutatedPart>();
future_mutated_part->name = entry.new_part_name;
@ -108,7 +108,6 @@ bool MutateFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrit
{
new_part = mutate_task->getFuture().get();
//FIXME
storage.renameTempPartAndReplace(new_part, nullptr, nullptr, transaction_ptr.get());
try

View File

@ -24,6 +24,7 @@ public:
size_t max_block_size,
unsigned num_streams) override;
bool supportsTransactions() const override { return true; }
private:
UInt64 max_array_length = 10;
UInt64 max_string_length = 10;

View File

@ -84,10 +84,10 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_)
{"projections", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"visible", std::make_shared<DataTypeUInt8>()},
{"creation_tid", getTransactionIDDataType()},
{"removal_tid", getTransactionIDDataType()},
{"creation_csn", std::make_shared<DataTypeUInt64>()},
{"removal_csn", std::make_shared<DataTypeUInt64>()},
{"creation_tid", getTransactionIDDataType()},
{"removal_tid", getTransactionIDDataType()},
{"creation_csn", std::make_shared<DataTypeUInt64>()},
{"removal_csn", std::make_shared<DataTypeUInt64>()},
}
)
{

View File

@ -31,6 +31,7 @@ public:
bool hasEvenlyDistributedRead() const override { return true; }
bool isSystemStorage() const override { return true; }
bool supportsTransactions() const override { return true; }
private:
bool multithreaded;

View File

@ -45,4 +45,4 @@ from system.transactions_info_log
where tid in (select tid from system.transactions_info_log where database=currentDatabase() and table='txn_counters' and not (tid.1=1 and tid.2=1))
or (database=currentDatabase() and table='txn_counters') order by event_time;
--drop table txn_counters;
drop table txn_counters;