add kill transaction query

This commit is contained in:
Alexander Tokmakov 2022-03-10 22:29:58 +01:00
parent 061fa6a6f2
commit 7f47f20aba
15 changed files with 204 additions and 28 deletions

View File

@ -102,6 +102,7 @@ enum class AccessType
\
M(KILL_QUERY, "", GLOBAL, ALL) /* allows to kill a query started by another user
(anyone can kill his own queries) */\
M(KILL_TRANSACTION, "", GLOBAL, ALL) \
\
M(MOVE_PARTITION_BETWEEN_SHARDS, "", GLOBAL, ALL) /* required to be able to move a part/partition to a table
identified by its ZooKeeper path */\

View File

@ -7,6 +7,7 @@
#include <Interpreters/executeQuery.h>
#include <Interpreters/CancellationCode.h>
#include <Interpreters/InterpreterAlterQuery.h>
#include <Interpreters/TransactionLog.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ParserAlterQuery.h>
#include <Parsers/parseQuery.h>
@ -358,6 +359,49 @@ BlockIO InterpreterKillQueryQuery::execute()
break;
}
case ASTKillQueryQuery::Type::Transaction:
{
getContext()->checkAccess(AccessType::KILL_TRANSACTION);
Block transactions_block = getSelectResult("tid, tid_hash, elapsed, is_readonly, state", "system.transactions");
if (!transactions_block)
return res_io;
const ColumnUInt64 & tid_hash_col = typeid_cast<const ColumnUInt64 &>(*transactions_block.getByName("tid_hash").column);
auto header = transactions_block.cloneEmpty();
header.insert(0, {ColumnString::create(), std::make_shared<DataTypeString>(), "kill_status"});
MutableColumns res_columns = header.cloneEmptyColumns();
for (size_t i = 0; i < transactions_block.rows(); ++i)
{
UInt64 tid_hash = tid_hash_col.getUInt(i);
CancellationCode code = CancellationCode::Unknown;
if (!query.test)
{
auto txn = TransactionLog::instance().tryGetRunningTransaction(tid_hash);
if (txn)
{
txn->onException();
if (txn->getState() == MergeTreeTransaction::ROLLED_BACK)
code = CancellationCode::CancelSent;
else
code = CancellationCode::CancelCannotBeSent;
}
else
{
code = CancellationCode::NotFound;
}
}
insertResultRow(i, code, transactions_block, header, res_columns);
}
res_io.pipeline = QueryPipeline(Pipe(std::make_shared<SourceFromSingleChunk>(header.cloneWithColumns(std::move(res_columns)))));
break;
}
}
return res_io;

View File

@ -30,6 +30,15 @@ MergeTreeTransaction::State MergeTreeTransaction::getState() const
return COMMITTED;
}
void MergeTreeTransaction::checkIsNotCancelled() const
{
CSN c = csn.load();
if (c == Tx::RolledBackCSN)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction was cancelled");
else if (c != Tx::UnknownCSN)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CSN state: {}", c);
}
void MergeTreeTransaction::addNewPart(const StoragePtr & storage, const DataPartPtr & new_part, MergeTreeTransaction * txn)
{
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
@ -79,58 +88,61 @@ void MergeTreeTransaction::addNewPartAndRemoveCovered(const StoragePtr & storage
void MergeTreeTransaction::addNewPart(const StoragePtr & storage, const DataPartPtr & new_part)
{
CSN c = csn.load();
if (c == Tx::RolledBackCSN)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction was cancelled");
else if (c != Tx::UnknownCSN)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CSN state: {}", c);
std::lock_guard lock{mutex};
checkIsNotCancelled();
storages.insert(storage);
creating_parts.push_back(new_part);
}
void MergeTreeTransaction::removeOldPart(const StoragePtr & storage, const DataPartPtr & part_to_remove, const TransactionInfoContext & context)
{
CSN c = csn.load();
if (c == Tx::RolledBackCSN)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction was cancelled");
else if (c != Tx::UnknownCSN)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CSN state: {}", c);
{
std::lock_guard lock{mutex};
checkIsNotCancelled();
LockMemoryExceptionInThread lock_memory_tracker(VariableContext::Global);
part_to_remove->version.lockMaxTID(tid, context);
storages.insert(storage);
removing_parts.push_back(part_to_remove);
}
part_to_remove->appendRemovalTIDToVersionMetadata();
}
void MergeTreeTransaction::addMutation(const StoragePtr & table, const String & mutation_id)
{
std::lock_guard lock{mutex};
checkIsNotCancelled();
storages.insert(table);
mutations.emplace_back(table, mutation_id);
}
bool MergeTreeTransaction::isReadOnly() const
{
std::lock_guard lock{mutex};
assert((creating_parts.empty() && removing_parts.empty() && mutations.empty()) == storages.empty());
return storages.empty();
}
void MergeTreeTransaction::beforeCommit()
{
for (const auto & table_and_mutation : mutations)
table_and_mutation.first->waitForMutation(table_and_mutation.second);
CSN expected = Tx::UnknownCSN;
bool can_commit = csn.compare_exchange_strong(expected, Tx::CommittingCSN);
if (can_commit)
return;
if (!can_commit)
{
if (expected == Tx::RolledBackCSN)
throw Exception(ErrorCodes::INVALID_TRANSACTION, "Transaction was cancelled");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected CSN state: {}", expected);
}
RunningMutationsList mutations_to_wait;
{
std::lock_guard lock{mutex};
mutations_to_wait = mutations;
}
for (const auto & table_and_mutation : mutations_to_wait)
table_and_mutation.first->waitForMutation(table_and_mutation.second);
}
void MergeTreeTransaction::afterCommit(CSN assigned_csn) noexcept
@ -195,6 +207,8 @@ String MergeTreeTransaction::dumpDescription() const
return res;
}
std::lock_guard lock{mutex};
res += fmt::format(", affects {} tables:", storages.size());
using ChangesInTable = std::tuple<Strings, Strings, Strings>;

View File

@ -2,6 +2,7 @@
#include <Interpreters/TransactionVersionMetadata.h>
#include <boost/noncopyable.hpp>
#include <Storages/IStorage_fwd.h>
#include <Common/Stopwatch.h>
#include <list>
#include <unordered_set>
@ -46,10 +47,16 @@ public:
String dumpDescription() const;
Float64 elapsedSeconds() const { return elapsed.elapsedSeconds(); }
private:
void beforeCommit();
void afterCommit(CSN assigned_csn) noexcept;
bool rollback() noexcept;
void checkIsNotCancelled() const;
mutable std::mutex mutex;
Stopwatch elapsed;
Snapshot snapshot;
@ -61,7 +68,8 @@ private:
std::list<Snapshot>::iterator snapshot_in_use_it;
std::vector<std::pair<StoragePtr, String>> mutations;
using RunningMutationsList = std::vector<std::pair<StoragePtr, String>>;
RunningMutationsList mutations;
};
using MergeTreeTransactionPtr = std::shared_ptr<MergeTreeTransaction>;

View File

@ -6,6 +6,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
MergeTreeTransactionHolder::MergeTreeTransactionHolder(const MergeTreeTransactionPtr & txn_, bool autocommit_ = false, const Context * owned_by_session_context_)
: txn(txn_)
, autocommit(autocommit_)

View File

@ -372,4 +372,10 @@ Snapshot TransactionLog::getOldestSnapshot() const
return snapshots_in_use.front();
}
TransactionLog::TransactionsList TransactionLog::getTransactionsList() const
{
std::lock_guard lock{running_list_mutex};
return running_list;
}
}

View File

@ -75,6 +75,9 @@ public:
MergeTreeTransactionPtr tryGetRunningTransaction(const TIDHash & tid);
using TransactionsList = std::unordered_map<TIDHash, MergeTreeTransactionPtr>;
TransactionsList getTransactionsList() const;
private:
void loadLogFromZooKeeper();
void runUpdatingThread();
@ -100,7 +103,7 @@ private:
std::unordered_map<TIDHash, CSN> tid_to_csn;
mutable std::mutex running_list_mutex;
std::unordered_map<TIDHash, MergeTreeTransactionPtr> running_list;
TransactionsList running_list;
std::list<Snapshot> snapshots_in_use;
String zookeeper_path;

View File

@ -24,6 +24,9 @@ void ASTKillQueryQuery::formatQueryImpl(const FormatSettings & settings, FormatS
case Type::PartMoveToShard:
settings.ostr << "PART_MOVE_TO_SHARD";
break;
case Type::Transaction:
settings.ostr << "TRANSACTION";
break;
}
formatOnCluster(settings);

View File

@ -14,6 +14,7 @@ public:
Query, /// KILL QUERY
Mutation, /// KILL MUTATION
PartMoveToShard, /// KILL PART_MOVE_TO_SHARD
Transaction, /// KILL TRANSACTION
};
Type type = Type::Query;

View File

@ -18,6 +18,7 @@ bool ParserKillQueryQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
ParserKeyword p_query{"QUERY"};
ParserKeyword p_mutation{"MUTATION"};
ParserKeyword p_part_move_to_shard{"PART_MOVE_TO_SHARD"};
ParserKeyword p_transaction{"TRANSACTION"};
ParserKeyword p_on{"ON"};
ParserKeyword p_test{"TEST"};
ParserKeyword p_sync{"SYNC"};
@ -34,6 +35,8 @@ bool ParserKillQueryQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
query->type = ASTKillQueryQuery::Type::Mutation;
else if (p_part_move_to_shard.ignore(pos, expected))
query->type = ASTKillQueryQuery::Type::PartMoveToShard;
else if (p_transaction.ignore(pos, expected))
query->type = ASTKillQueryQuery::Type::Transaction;
else
return false;

View File

@ -0,0 +1,49 @@
#include <Storages/System/StorageSystemTransactions.h>
#include <Interpreters/TransactionLog.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeEnum.h>
#include <Interpreters/Context.h>
namespace DB
{
static DataTypePtr getStateEnumType()
{
return std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values
{
{"RUNNING", static_cast<Int8>(MergeTreeTransaction::State::RUNNING)},
{"COMMITTED", static_cast<Int8>(MergeTreeTransaction::State::COMMITTED)},
{"ROLLED_BACK", static_cast<Int8>(MergeTreeTransaction::State::ROLLED_BACK)},
});
}
NamesAndTypesList StorageSystemTransactions::getNamesAndTypes()
{
return {
{"tid", getTransactionIDDataType()},
{"tid_hash", std::make_shared<DataTypeUInt64>()},
{"elapsed", std::make_shared<DataTypeFloat64>()},
{"is_readonly", std::make_shared<DataTypeUInt8>()},
{"state", getStateEnumType()},
};
}
void StorageSystemTransactions::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
{
auto list = TransactionLog::instance().getTransactionsList();
for (const auto & elem : list)
{
auto txn = elem.second;
size_t i = 0;
res_columns[i++]->insert(Tuple{txn->tid.start_csn, txn->tid.local_tid, txn->tid.host_id});
res_columns[i++]->insert(txn->tid.getHash());
res_columns[i++]->insert(txn->elapsedSeconds());
res_columns[i++]->insert(txn->isReadOnly());
res_columns[i++]->insert(txn->getState());
}
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <base/shared_ptr_helper.h>
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
class Context;
class StorageSystemTransactions final : public shared_ptr_helper<StorageSystemTransactions>, public IStorageSystemOneBlock<StorageSystemTransactions>
{
friend struct shared_ptr_helper<StorageSystemTransactions>;
public:
String getName() const override { return "SystemTransactions"; }
static NamesAndTypesList getNamesAndTypes();
static NamesAndAliases getNamesAndAliases() { return {}; }
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
};
}

View File

@ -68,6 +68,7 @@
#include <Storages/System/StorageSystemUserDirectories.h>
#include <Storages/System/StorageSystemPrivileges.h>
#include <Storages/System/StorageSystemAsynchronousInserts.h>
#include <Storages/System/StorageSystemTransactions.h>
#ifdef OS_LINUX
#include <Storages/System/StorageSystemStackTrace.h>
@ -162,6 +163,9 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, b
if (has_zookeeper)
attach<StorageSystemZooKeeper>(context, system_database, "zookeeper");
if (context->getConfigRef().getInt("_enable_experimental_mvcc_prototype_test_helper_dev", 0) == 42)
attach<StorageSystemTransactions>(context, system_database, "transactions");
}
void attachSystemTablesAsync(ContextPtr context, IDatabase & system_database, AsynchronousMetrics & async_metrics)

View File

@ -31,7 +31,8 @@ tx11 9 21 all_1_14_1_17
tx11 9 41 all_1_14_1_17
tx11 9 61 all_1_14_1_17
tx11 9 81 all_1_14_1_17
tx13 10 22 all_1_14_1_18
tx13 10 42 all_1_14_1_18
tx13 10 62 all_1_14_1_18
tx13 10 82 all_1_14_1_18
1 1 RUNNING
tx14 10 22 all_1_14_1_18
tx14 10 42 all_1_14_1_18
tx14 10 62 all_1_14_1_18
tx14 10 82 all_1_14_1_18

View File

@ -81,6 +81,13 @@ tx_wait 11
tx_wait 12
tx 13 "begin transaction"
tx 13 "select 10, n, _part from mt order by n"
tid_to_kill=$(tx 13 "select transactionID()" | grep -Po "\(.*")
$CLICKHOUSE_CLIENT -q "select count(), any(is_readonly), any(state) from system.transactions where tid=$tid_to_kill"
tx_async 13 "alter table mt update n = 0 where 1" >/dev/null
$CLICKHOUSE_CLIENT -q "kill transaction where tid=$tid_to_kill format Null"
tx_sync 13 "rollback"
tx 14 "begin transaction"
tx 14 "select 10, n, _part from mt order by n"
$CLICKHOUSE_CLIENT --database_atomic_wait_for_drop_and_detach_synchronously=0 -q "drop table mt"