From 0aea4c7184215edbf992e49d775442a6c7d18a8b Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Thu, 25 Aug 2022 14:22:27 +0200 Subject: [PATCH] fix --- src/Storages/StorageMergeTree.cpp | 12 +++ tests/integration/test_transactions/test.py | 95 ++++++++++++++++++--- 2 files changed, 96 insertions(+), 11 deletions(-) diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 574b04b3be5..507047751f3 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -1242,6 +1242,18 @@ size_t StorageMergeTree::clearOldMutations(bool truncate) size_t done_count = std::distance(begin_it, end_it); + if (done_count <= finished_mutations_to_keep) + return 0; + + for (auto it = begin_it; it != end_it; ++it) + { + if (!it->second.tid.isPrehistoric()) + { + done_count = std::distance(begin_it, it); + break; + } + } + if (done_count <= finished_mutations_to_keep) return 0; diff --git a/tests/integration/test_transactions/test.py b/tests/integration/test_transactions/test.py index 2dfdc889856..c7694187033 100644 --- a/tests/integration/test_transactions/test.py +++ b/tests/integration/test_transactions/test.py @@ -24,7 +24,7 @@ def tx(session, query): return node.http_query(None, data=query, params=params) -def test_rollback_unfinished_on_restart(start_cluster): +def test_rollback_unfinished_on_restart1(start_cluster): node.query( "create table mt (n int, m int) engine=MergeTree order by n partition by n % 2" ) @@ -67,14 +67,6 @@ def test_rollback_unfinished_on_restart(start_cluster): tx(1, "insert into mt values (5, 50)") tx(1, "alter table mt update m = m+n in partition id '1' where 1") - # check that uncommitted merge will be rolled back on restart - tx(2, "begin transaction") - tid4 = tx(2, "select transactionID()").strip() - tx( - 2, - "optimize table mt partition id '0' final settings optimize_throw_if_noop = 1", - ) - # check that uncommitted insert will be rolled back on restart tx(3, "begin transaction") tid5 = tx(3, "select transactionID()").strip() @@ -102,14 +94,14 @@ def test_rollback_unfinished_on_restart(start_cluster): res = res.replace(tid1, "tid1").replace("csn" + csn1 + "_", "csn_1") res = res.replace(tid2, "tid2").replace("csn" + csn2 + "_", "csn_2") res = res.replace(tid3, "tid3") - res = res.replace(tid4, "tid4") res = res.replace(tid5, "tid5") res = res.replace(tid6, "tid6").replace("csn" + csn6 + "_", "csn_6") assert ( res == "0_2_2_0\t1\ttid0\tcsn1_\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0_\n" - "0_2_4_1\t0\ttid4\tcsn18446744073709551615_\ttid0\tcsn0_\n" + "0_2_2_0_7\t0\ttid3\tcsn18446744073709551615_\ttid0\tcsn0_\n" "0_4_4_0\t1\ttid2\tcsn_2\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0_\n" + "0_4_4_0_7\t0\ttid3\tcsn18446744073709551615_\ttid0\tcsn0_\n" "0_8_8_0\t0\ttid5\tcsn18446744073709551615_\ttid0\tcsn0_\n" "1_1_1_0\t0\ttid0\tcsn1_\ttid1\tcsn_1\n" "1_3_3_0\t1\ttid2\tcsn_2\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0_\n" @@ -118,3 +110,84 @@ def test_rollback_unfinished_on_restart(start_cluster): "1_6_6_0\t0\ttid3\tcsn18446744073709551615_\ttid0\tcsn0_\n" "1_6_6_0_7\t0\ttid3\tcsn18446744073709551615_\ttid0\tcsn0_\n" ) + +def test_rollback_unfinished_on_restart2(start_cluster): + node.query( + "create table mt (n int, m int) engine=MergeTree order by n partition by n % 2" + ) + node.query("insert into mt values (1, 10), (2, 20)") + tid0 = "(1,1,'00000000-0000-0000-0000-000000000000')" + + # it will hold a snapshot and avoid parts cleanup + tx(0, "begin transaction") + + tx(4, "begin transaction") + + tx(1, "begin transaction") + tid1 = tx(1, "select transactionID()").strip() + tx(1, "alter table mt drop partition id '1'") + tx(1, "commit") + + tx(1, "begin transaction") + tid2 = tx(1, "select transactionID()").strip() + tx(1, "insert into mt values (3, 30), (4, 40)") + tx(1, "commit") + + node.query("system flush logs") + csn1 = node.query( + "select csn from system.transactions_info_log where type='Commit' and tid={}".format( + tid1 + ) + ).strip() + csn2 = node.query( + "select csn from system.transactions_info_log where type='Commit' and tid={}".format( + tid2 + ) + ).strip() + + # check that uncommitted merge will be rolled back on restart + tx(2, "begin transaction") + tid4 = tx(2, "select transactionID()").strip() + tx( + 2, + "optimize table mt partition id '0' final settings optimize_throw_if_noop = 1", + ) + + # check that uncommitted insert will be rolled back on restart + tx(3, "begin transaction") + tid5 = tx(3, "select transactionID()").strip() + tx(3, "insert into mt values (6, 70)") + + tid6 = tx(4, "select transactionID()").strip() + tx(4, "commit") + node.query("system flush logs") + csn6 = node.query( + "select csn from system.transactions_info_log where type='Commit' and tid={}".format( + tid6 + ) + ).strip() + + node.restart_clickhouse(kill=True) + + assert ( + node.query("select *, _part from mt order by n") + == "2\t20\t0_2_2_0\n3\t30\t1_3_3_0\n4\t40\t0_4_4_0\n" + ) + res = node.query( + "select name, active, creation_tid, 'csn' || toString(creation_csn) || '_', removal_tid, 'csn' || toString(removal_csn) || '_' from system.parts where table='mt' order by name" + ) + res = res.replace(tid0, "tid0") + res = res.replace(tid1, "tid1").replace("csn" + csn1 + "_", "csn_1") + res = res.replace(tid2, "tid2").replace("csn" + csn2 + "_", "csn_2") + res = res.replace(tid4, "tid4") + res = res.replace(tid5, "tid5") + res = res.replace(tid6, "tid6").replace("csn" + csn6 + "_", "csn_6") + assert ( + res + == "0_2_2_0\t1\ttid0\tcsn1_\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0_\n" + "0_2_4_1\t0\ttid4\tcsn18446744073709551615_\ttid0\tcsn0_\n" + "0_4_4_0\t1\ttid2\tcsn_2\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0_\n" + "0_5_5_0\t0\ttid5\tcsn18446744073709551615_\ttid0\tcsn0_\n" + "1_1_1_0\t0\ttid0\tcsn1_\ttid1\tcsn_1\n" + "1_3_3_0\t1\ttid2\tcsn_2\t(0,0,'00000000-0000-0000-0000-000000000000')\tcsn0_\n" + ) \ No newline at end of file