diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 38b9a5790e6..5a41b81252e 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -1218,7 +1218,7 @@ size_t StorageMergeTree::clearOldMutations(bool truncate) auto versions_it = std::lower_bound( part_versions_with_names.begin(), part_versions_with_names.end(), needle); - if (versions_it != part_versions_with_names.begin()) + if (versions_it != part_versions_with_names.begin() || !it->second.tid.isPrehistoric()) { done_count = std::distance(begin_it, it); break; @@ -1235,7 +1235,8 @@ size_t StorageMergeTree::clearOldMutations(bool truncate) { const auto & tid = it->second.tid; if (!tid.isPrehistoric() && !TransactionLog::instance().getCSN(tid)) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot remove mutation {}, because transaction {} is not committed. It's a bug"); + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot remove mutation {}, because transaction {} is not committed. It's a bug", + it->first, tid); mutations_to_delete.push_back(std::move(it->second)); it = current_mutations_by_version.erase(it); } diff --git a/tests/queries/0_stateless/01168_mutations_isolation.sh b/tests/queries/0_stateless/01168_mutations_isolation.sh index e6049e50131..e034b467358 100755 --- a/tests/queries/0_stateless/01168_mutations_isolation.sh +++ b/tests/queries/0_stateless/01168_mutations_isolation.sh @@ -56,7 +56,7 @@ tx 6 "commit" tx 7 "begin transaction" tx 7 "select 7, n, _part from mt order by n" tx 8 "begin transaction" -tx 8 "alter table mt update n = 0 where 1" & +tx 8 "alter table mt update n = 0 where 1" >/dev/null & $CLICKHOUSE_CLIENT -q "kill mutation where database=currentDatabase() and mutation_id='mutation_15.txt' format Null" wait tx 7 "optimize table mt final" diff --git a/tests/queries/0_stateless/transactions.lib b/tests/queries/0_stateless/transactions.lib index be8745e68a5..d4d349d8827 100755 --- a/tests/queries/0_stateless/transactions.lib +++ b/tests/queries/0_stateless/transactions.lib @@ -1,6 +1,8 @@ #!/usr/bin/env bash -# Useful to run +# shellcheck disable=SC2015 + +# Useful to run queries in parallel sessions function tx() { tx_num=$1 @@ -14,12 +16,24 @@ function tx() ${CLICKHOUSE_CURL} -m 60 -sSk "$url" --data "$query" | sed "s/^/tx$tx_num\t/" } +# Waits for the last query in session to finish function tx_wait() { tx_num=$1 session="${CLICKHOUSE_TEST_ZOOKEEPER_PREFIX}_tx$tx_num" + # try get pid of previous query + query_pid="" + tmp_file_name="${CLICKHOUSE_TMP}/tmp_tx_${CLICKHOUSE_TEST_ZOOKEEPER_PREFIX}" + query_id_and_pid=$(grep -F "$session" "$tmp_file_name" 2>/dev/null | tail -1) ||: + read -r query_id query_pid <<< "$query_id_and_pid" ||: + # wait for previous query in transaction + if [ -n "$query_pid" ]; then + timeout 5 tail --pid=$query_pid -f /dev/null && return ||: + fi + + # there is no pid (or maybe we got wrong one), so wait using system.processes (it's less reliable) count=0 while [[ $($CLICKHOUSE_CLIENT -q "SELECT count() FROM system.processes WHERE query_id LIKE '$session%'") -gt 0 ]]; do sleep 0.5 @@ -31,6 +45,7 @@ function tx_wait() { done; } +# Wait for previous query in session to finish, starts new one asynchronously function tx_async() { tx_num=$1 @@ -43,6 +58,12 @@ function tx_async() url_without_session="https://${CLICKHOUSE_HOST}:${CLICKHOUSE_PORT_HTTPS}/?" url="${url_without_session}session_id=$session&query_id=$query_id&database=$CLICKHOUSE_DATABASE" + # We cannot be sure that query will actually start execution and appear in system.processes before the next call to tx_wait + # Also we cannot use global map in bash to store last query_id for each tx_num, so we use tmp file... + tmp_file_name="${CLICKHOUSE_TMP}/tmp_tx_${CLICKHOUSE_TEST_ZOOKEEPER_PREFIX}" + # run query asynchronously ${CLICKHOUSE_CURL} -m 60 -sSk "$url" --data "$query" | sed "s/^/tx$tx_num\t/" & + query_pid=$! + echo -e "$query_id\t$query_pid" >> "$tmp_file_name" }