mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
fix
This commit is contained in:
parent
6e538f04c4
commit
a6f55b1430
@ -1231,9 +1231,9 @@ void IMergeTreeDataPart::assertHasVersionMetadata(MergeTreeTransaction * txn) co
|
|||||||
assert(!txn || data_part_storage->exists(TXN_VERSION_METADATA_FILE_NAME));
|
assert(!txn || data_part_storage->exists(TXN_VERSION_METADATA_FILE_NAME));
|
||||||
}
|
}
|
||||||
|
|
||||||
void IMergeTreeDataPart::storeVersionMetadata() const
|
void IMergeTreeDataPart::storeVersionMetadata(bool force) const
|
||||||
{
|
{
|
||||||
if (!wasInvolvedInTransaction())
|
if (!wasInvolvedInTransaction() && !force)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
LOG_TEST(storage.log, "Writing version for {} (creation: {}, removal {})", name, version.creation_tid, version.removal_tid);
|
LOG_TEST(storage.log, "Writing version for {} (creation: {}, removal {})", name, version.creation_tid, version.removal_tid);
|
||||||
@ -1285,8 +1285,6 @@ void IMergeTreeDataPart::loadVersionMetadata() const
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
data_part_storage->loadVersionMetadata(version, storage.log);
|
data_part_storage->loadVersionMetadata(version, storage.log);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
catch (Exception & e)
|
catch (Exception & e)
|
||||||
{
|
{
|
||||||
|
@ -411,7 +411,7 @@ public:
|
|||||||
void assertHasVersionMetadata(MergeTreeTransaction * txn) const;
|
void assertHasVersionMetadata(MergeTreeTransaction * txn) const;
|
||||||
|
|
||||||
/// [Re]writes file with transactional metadata on disk
|
/// [Re]writes file with transactional metadata on disk
|
||||||
void storeVersionMetadata() const;
|
void storeVersionMetadata(bool force = false) const;
|
||||||
|
|
||||||
/// Appends the corresponding CSN to file on disk (without fsync)
|
/// Appends the corresponding CSN to file on disk (without fsync)
|
||||||
void appendCSNToVersionMetadata(VersionMetadata::WhichCSN which_csn) const;
|
void appendCSNToVersionMetadata(VersionMetadata::WhichCSN which_csn) const;
|
||||||
|
@ -1457,7 +1457,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
|
|||||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} has invalid version metadata: {}", part->name, version.toString());
|
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} has invalid version metadata: {}", part->name, version.toString());
|
||||||
|
|
||||||
if (version_updated)
|
if (version_updated)
|
||||||
part->storeVersionMetadata();
|
part->storeVersionMetadata(/* force */ true);
|
||||||
|
|
||||||
/// Deactivate part if creation was not committed or if removal was.
|
/// Deactivate part if creation was not committed or if removal was.
|
||||||
if (version.creation_csn == Tx::RolledBackCSN || version.removal_csn)
|
if (version.creation_csn == Tx::RolledBackCSN || version.removal_csn)
|
||||||
|
@ -114,9 +114,9 @@ def test_rollback_unfinished_on_restart1(start_cluster):
|
|||||||
|
|
||||||
def test_rollback_unfinished_on_restart2(start_cluster):
|
def test_rollback_unfinished_on_restart2(start_cluster):
|
||||||
node.query(
|
node.query(
|
||||||
"create table mt (n int, m int) engine=MergeTree order by n partition by n % 2"
|
"create table mt2 (n int, m int) engine=MergeTree order by n partition by n % 2"
|
||||||
)
|
)
|
||||||
node.query("insert into mt values (1, 10), (2, 20)")
|
node.query("insert into mt2 values (1, 10), (2, 20)")
|
||||||
tid0 = "(1,1,'00000000-0000-0000-0000-000000000000')"
|
tid0 = "(1,1,'00000000-0000-0000-0000-000000000000')"
|
||||||
|
|
||||||
# it will hold a snapshot and avoid parts cleanup
|
# it will hold a snapshot and avoid parts cleanup
|
||||||
@ -126,12 +126,12 @@ def test_rollback_unfinished_on_restart2(start_cluster):
|
|||||||
|
|
||||||
tx(1, "begin transaction")
|
tx(1, "begin transaction")
|
||||||
tid1 = tx(1, "select transactionID()").strip()
|
tid1 = tx(1, "select transactionID()").strip()
|
||||||
tx(1, "alter table mt drop partition id '1'")
|
tx(1, "alter table mt2 drop partition id '1'")
|
||||||
tx(1, "commit")
|
tx(1, "commit")
|
||||||
|
|
||||||
tx(1, "begin transaction")
|
tx(1, "begin transaction")
|
||||||
tid2 = tx(1, "select transactionID()").strip()
|
tid2 = tx(1, "select transactionID()").strip()
|
||||||
tx(1, "insert into mt values (3, 30), (4, 40)")
|
tx(1, "insert into mt2 values (3, 30), (4, 40)")
|
||||||
tx(1, "commit")
|
tx(1, "commit")
|
||||||
|
|
||||||
node.query("system flush logs")
|
node.query("system flush logs")
|
||||||
@ -151,13 +151,13 @@ def test_rollback_unfinished_on_restart2(start_cluster):
|
|||||||
tid4 = tx(2, "select transactionID()").strip()
|
tid4 = tx(2, "select transactionID()").strip()
|
||||||
tx(
|
tx(
|
||||||
2,
|
2,
|
||||||
"optimize table mt partition id '0' final settings optimize_throw_if_noop = 1",
|
"optimize table mt2 partition id '0' final settings optimize_throw_if_noop = 1",
|
||||||
)
|
)
|
||||||
|
|
||||||
# check that uncommitted insert will be rolled back on restart
|
# check that uncommitted insert will be rolled back on restart
|
||||||
tx(3, "begin transaction")
|
tx(3, "begin transaction")
|
||||||
tid5 = tx(3, "select transactionID()").strip()
|
tid5 = tx(3, "select transactionID()").strip()
|
||||||
tx(3, "insert into mt values (6, 70)")
|
tx(3, "insert into mt2 values (6, 70)")
|
||||||
|
|
||||||
tid6 = tx(4, "select transactionID()").strip()
|
tid6 = tx(4, "select transactionID()").strip()
|
||||||
tx(4, "commit")
|
tx(4, "commit")
|
||||||
@ -171,11 +171,11 @@ def test_rollback_unfinished_on_restart2(start_cluster):
|
|||||||
node.restart_clickhouse(kill=True)
|
node.restart_clickhouse(kill=True)
|
||||||
|
|
||||||
assert (
|
assert (
|
||||||
node.query("select *, _part from mt order by n")
|
node.query("select *, _part from mt2 order by n")
|
||||||
== "2\t20\t0_2_2_0\n3\t30\t1_3_3_0\n4\t40\t0_4_4_0\n"
|
== "2\t20\t0_2_2_0\n3\t30\t1_3_3_0\n4\t40\t0_4_4_0\n"
|
||||||
)
|
)
|
||||||
res = node.query(
|
res = node.query(
|
||||||
"select name, active, creation_tid, 'csn' || toString(creation_csn) || '_', removal_tid, 'csn' || toString(removal_csn) || '_' from system.parts where table='mt' order by name"
|
"select name, active, creation_tid, 'csn' || toString(creation_csn) || '_', removal_tid, 'csn' || toString(removal_csn) || '_' from system.parts where table='mt2' order by name"
|
||||||
)
|
)
|
||||||
res = res.replace(tid0, "tid0")
|
res = res.replace(tid0, "tid0")
|
||||||
res = res.replace(tid1, "tid1").replace("csn" + csn1 + "_", "csn_1")
|
res = res.replace(tid1, "tid1").replace("csn" + csn1 + "_", "csn_1")
|
||||||
|
Loading…
Reference in New Issue
Block a user