fix an assertion with implicit transactions in interserver mode

This commit is contained in:
Alexander Tokmakov 2023-03-07 20:17:09 +01:00
parent e23f624968
commit debd69f03a
3 changed files with 38 additions and 37 deletions

View File

@ -451,10 +451,24 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// Avoid early destruction of process_list_entry if it was not saved to `res` yet (in case of exception) /// Avoid early destruction of process_list_entry if it was not saved to `res` yet (in case of exception)
ProcessList::EntryPtr process_list_entry; ProcessList::EntryPtr process_list_entry;
BlockIO res; BlockIO res;
std::shared_ptr<InterpreterTransactionControlQuery> implicit_txn_control{}; auto implicit_txn_control = std::make_shared<bool>(false);
String query_database; String query_database;
String query_table; String query_table;
auto execute_implicit_tcl_query = [implicit_txn_control](const ContextMutablePtr & query_context, ASTTransactionControl::QueryType tcl_type)
{
/// Unset the flag on COMMIT and ROLLBACK
SCOPE_EXIT({ if (tcl_type != ASTTransactionControl::BEGIN) *implicit_txn_control = false; });
ASTPtr tcl_ast = std::make_shared<ASTTransactionControl>(tcl_type);
InterpreterTransactionControlQuery tc(tcl_ast, query_context);
tc.execute();
/// Set the flag after successful BIGIN
if (tcl_type == ASTTransactionControl::BEGIN)
*implicit_txn_control = true;
};
try try
{ {
if (auto txn = context->getCurrentTransaction()) if (auto txn = context->getCurrentTransaction())
@ -674,14 +688,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (context->isGlobalContext()) if (context->isGlobalContext())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot create transactions"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context cannot create transactions");
/// If there is no session (which is the default for the HTTP Handler), set up one just for this as it is necessary execute_implicit_tcl_query(context, ASTTransactionControl::BEGIN);
/// to control the transaction lifetime
if (!context->hasSessionContext())
context->makeSessionContext();
auto tc = std::make_shared<InterpreterTransactionControlQuery>(ast, context);
tc->executeBegin(context->getSessionContext());
implicit_txn_control = std::move(tc);
} }
catch (Exception & e) catch (Exception & e)
{ {
@ -949,6 +956,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
log_processors_profiles = settings.log_processors_profiles, log_processors_profiles = settings.log_processors_profiles,
status_info_to_query_log, status_info_to_query_log,
implicit_txn_control, implicit_txn_control,
execute_implicit_tcl_query,
pulling_pipeline = pipeline.pulling(), pulling_pipeline = pipeline.pulling(),
query_span](QueryPipeline & query_pipeline) mutable query_span](QueryPipeline & query_pipeline) mutable
{ {
@ -1062,21 +1070,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
} }
} }
if (implicit_txn_control) if (*implicit_txn_control)
{ execute_implicit_tcl_query(context, ASTTransactionControl::COMMIT);
try
{
implicit_txn_control->executeCommit(context->getSessionContext());
implicit_txn_control.reset();
}
catch (const Exception &)
{
/// An exception might happen when trying to commit the transaction. For example we might get an immediate exception
/// because ZK is down and wait_changes_become_visible_after_commit_mode == WAIT_UNKNOWN
implicit_txn_control.reset();
throw;
}
}
} }
if (query_span) if (query_span)
@ -1104,13 +1099,11 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
quota(quota), quota(quota),
status_info_to_query_log, status_info_to_query_log,
implicit_txn_control, implicit_txn_control,
execute_implicit_tcl_query,
query_span](bool log_error) mutable query_span](bool log_error) mutable
{ {
if (implicit_txn_control) if (*implicit_txn_control)
{ execute_implicit_tcl_query(context, ASTTransactionControl::ROLLBACK);
implicit_txn_control->executeRollback(context->getSessionContext());
implicit_txn_control.reset();
}
else if (auto txn = context->getCurrentTransaction()) else if (auto txn = context->getCurrentTransaction())
txn->onException(); txn->onException();
@ -1179,15 +1172,10 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
} }
catch (...) catch (...)
{ {
if (implicit_txn_control) if (*implicit_txn_control)
{ execute_implicit_tcl_query(context, ASTTransactionControl::ROLLBACK);
implicit_txn_control->executeRollback(context->getSessionContext());
implicit_txn_control.reset();
}
else if (auto txn = context->getCurrentTransaction()) else if (auto txn = context->getCurrentTransaction())
{
txn->onException(); txn->onException();
}
if (!internal) if (!internal)
onExceptionBeforeStart(query_for_logging, context, ast, query_span, start_watch.elapsedMilliseconds()); onExceptionBeforeStart(query_for_logging, context, ast, query_span, start_watch.elapsedMilliseconds());

View File

@ -12,3 +12,6 @@ in_transaction 10000
out_transaction 0 out_transaction 0
{"'implicit_True'":"implicit_True","all":"2","is_empty":0} {"'implicit_True'":"implicit_True","all":"2","is_empty":0}
{"'implicit_False'":"implicit_False","all":"2","is_empty":1} {"'implicit_False'":"implicit_False","all":"2","is_empty":1}
0
0
0

View File

@ -1,4 +1,4 @@
-- Tags: no-ordinary-database -- Tags: no-ordinary-database, no-fasttest
CREATE TABLE landing (n Int64) engine=MergeTree order by n; CREATE TABLE landing (n Int64) engine=MergeTree order by n;
CREATE TABLE target (n Int64) engine=MergeTree order by n; CREATE TABLE target (n Int64) engine=MergeTree order by n;
@ -92,3 +92,13 @@ WHERE
query LIKE '-- Verify that the transaction_id column is NOT populated without transaction%' query LIKE '-- Verify that the transaction_id column is NOT populated without transaction%'
GROUP BY transaction_id GROUP BY transaction_id
FORMAT JSONEachRow; FORMAT JSONEachRow;
SET implicit_transaction=1;
SET throw_on_unsupported_query_inside_transaction=1;
SELECT * FROM system.one;
SELECT * FROM cluster('test_cluster_interserver_secret', system, one); -- { serverError NOT_IMPLEMENTED }
SELECT * FROM cluster('test_cluster_two_shards', system, one); -- { serverError NOT_IMPLEMENTED }
SET throw_on_unsupported_query_inside_transaction=0;
-- there's not session in the interserver mode
SELECT * FROM cluster('test_cluster_interserver_secret', system, one) FORMAT Null; -- { serverError INVALID_TRANSACTION }
SELECT * FROM cluster('test_cluster_two_shards', system, one);