Add test for data corruption

This commit is contained in:
alesapin 2019-07-03 23:51:13 +03:00
parent c880e00ffa
commit 477a7450fb
4 changed files with 138 additions and 9 deletions

View File

@ -1138,19 +1138,19 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, const Context & c
String checksums_path = full_part_path + "checksums.txt";
if (!Poco::File(checksums_path).exists())
{
auto counted_checksums = checkDataPart(part, false, primary_key_data_types, skip_indices);
try
{
auto counted_checksums = checkDataPart(part, false, primary_key_data_types, skip_indices);
counted_checksums.checkEqual(part->checksums, true);
WriteBufferFromFile out(full_part_path + "checksums.txt.tmp", 4096);
part->checksums.write(out);
Poco::File(full_part_path + "checksums.txt.tmp").renameTo(full_part_path + "checksums.txt");
results.emplace_back(part->name, true, "Checksums recounted and written to disk");
results.emplace_back(part->name, true, "Checksums recounted and written to disk.");
}
catch (Exception & ex)
{
results.emplace_back(part->name, false,
"Checksums file absent and counted doesn't equal to checksums in memory. Error: '" + ex.message() + "'");
"Check of part finished with error: '" + ex.message() + "'");
}
}
else

View File

@ -0,0 +1,122 @@
import time
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_zookeeper=True)
node2 = cluster.add_instance('node2', with_zookeeper=True)
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
for node in [node1, node2]:
node.query('''
CREATE TABLE replicated_mt(date Date, id UInt32, value Int32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/replicated_mt', '{replica}') PARTITION BY toYYYYMM(date) ORDER BY id;
'''.format(replica=node.name))
node1.query('''
CREATE TABLE non_replicated_mt(date Date, id UInt32, value Int32)
ENGINE = MergeTree() PARTITION BY toYYYYMM(date) ORDER BY id;
''')
yield cluster
finally:
cluster.shutdown()
def corrupt_data_part_on_disk(node, table, part_name):
part_path = node.query("SELECT path FROM system.parts WHERE table = '{}' and name = '{}'".format(table, part_name)).strip()
node.exec_in_container(['bash', '-c', 'cd {p} && ls *.bin | head -n 1 | xargs -I{{}} sh -c \'echo "1" >> $1\' -- {{}}'.format(p=part_path)], privileged=True)
def remove_checksums_on_disk(node, table, part_name):
part_path = node.query("SELECT path FROM system.parts WHERE table = '{}' and name = '{}'".format(table, part_name)).strip()
node.exec_in_container(['bash', '-c', 'rm -r {p}/checksums.txt'.format(p=part_path)], privileged=True)
def remove_part_from_disk(node, table, part_name):
part_path = node.query("SELECT path FROM system.parts WHERE table = '{}' and name = '{}'".format(table, part_name)).strip()
node.exec_in_container(['bash', '-c', 'rm -r {p}/*'.format(p=part_path)], privileged=True)
def test_check_normal_table_corruption(started_cluster):
node1.query("INSERT INTO non_replicated_mt VALUES (toDate('2019-02-01'), 1, 10), (toDate('2019-02-01'), 2, 12)")
assert node1.query("CHECK TABLE non_replicated_mt PARTITION 201902") == "201902_1_1_0\t1\t\n"
remove_checksums_on_disk(node1, "non_replicated_mt", "201902_1_1_0")
assert node1.query("CHECK TABLE non_replicated_mt").strip() == "201902_1_1_0\t1\tChecksums recounted and written to disk."
assert node1.query("SELECT COUNT() FROM non_replicated_mt") == "2\n"
remove_checksums_on_disk(node1, "non_replicated_mt", "201902_1_1_0")
assert node1.query("CHECK TABLE non_replicated_mt PARTITION 201902").strip() == "201902_1_1_0\t1\tChecksums recounted and written to disk."
assert node1.query("SELECT COUNT() FROM non_replicated_mt") == "2\n"
corrupt_data_part_on_disk(node1, "non_replicated_mt", "201902_1_1_0")
assert node1.query("CHECK TABLE non_replicated_mt").strip() == "201902_1_1_0\t0\tCannot read all data. Bytes read: 2. Bytes expected: 16."
assert node1.query("CHECK TABLE non_replicated_mt").strip() == "201902_1_1_0\t0\tCannot read all data. Bytes read: 2. Bytes expected: 16."
node1.query("INSERT INTO non_replicated_mt VALUES (toDate('2019-01-01'), 1, 10), (toDate('2019-01-01'), 2, 12)")
assert node1.query("CHECK TABLE non_replicated_mt PARTITION 201901") == "201901_2_2_0\t1\t\n"
corrupt_data_part_on_disk(node1, "non_replicated_mt", "201901_2_2_0")
remove_checksums_on_disk(node1, "non_replicated_mt", "201901_2_2_0")
assert node1.query("CHECK TABLE non_replicated_mt PARTITION 201901") == "201901_2_2_0\t0\tCheck of part finished with error: \\'Cannot read all data. Bytes read: 2. Bytes expected: 16.\\'\n"
def test_check_replicated_table_simple(started_cluster):
node1.query("INSERT INTO replicated_mt VALUES (toDate('2019-02-01'), 1, 10), (toDate('2019-02-01'), 2, 12)")
node2.query("SYSTEM SYNC REPLICA replicated_mt")
assert node1.query("SELECT count() from replicated_mt") == "2\n"
assert node2.query("SELECT count() from replicated_mt") == "2\n"
assert node1.query("CHECK TABLE replicated_mt") == "201902_0_0_0\t1\t\n"
assert node2.query("CHECK TABLE replicated_mt") == "201902_0_0_0\t1\t\n"
node2.query("INSERT INTO replicated_mt VALUES (toDate('2019-01-02'), 3, 10), (toDate('2019-01-02'), 4, 12)")
node1.query("SYSTEM SYNC REPLICA replicated_mt")
assert node1.query("SELECT count() from replicated_mt") == "4\n"
assert node2.query("SELECT count() from replicated_mt") == "4\n"
assert node1.query("CHECK TABLE replicated_mt PARTITION 201901") == "201901_0_0_0\t1\t\n"
assert node2.query("CHECK TABLE replicated_mt PARTITION 201901") == "201901_0_0_0\t1\t\n"
def test_check_replicated_table_corruption(started_cluster):
node1.query("TRUNCATE TABLE replicated_mt")
node1.query("INSERT INTO replicated_mt VALUES (toDate('2019-02-01'), 1, 10), (toDate('2019-02-01'), 2, 12)")
node1.query("INSERT INTO replicated_mt VALUES (toDate('2019-01-02'), 3, 10), (toDate('2019-01-02'), 4, 12)")
node2.query("SYSTEM SYNC REPLICA replicated_mt")
assert node1.query("SELECT count() from replicated_mt") == "4\n"
assert node2.query("SELECT count() from replicated_mt") == "4\n"
corrupt_data_part_on_disk(node1, "replicated_mt", "201901_2_2_0")
assert node1.query("CHECK TABLE replicated_mt PARTITION 201901") == "201901_2_2_0\t0\tPart 201901_2_2_0 looks broken. Removing it and queueing a fetch.\n"
node1.query("SYSTEM SYNC REPLICA replicated_mt")
assert node1.query("CHECK TABLE replicated_mt PARTITION 201901") == "201901_2_2_0\t1\t\n"
assert node1.query("SELECT count() from replicated_mt") == "4\n"
remove_part_from_disk(node2, "replicated_mt", "201901_2_2_0")
assert node2.query("CHECK TABLE replicated_mt PARTITION 201901") == "201901_2_2_0\t0\tPart 201901_2_2_0 looks broken. Removing it and queueing a fetch.\n"
node1.query("SYSTEM SYNC REPLICA replicated_mt")
assert node1.query("CHECK TABLE replicated_mt PARTITION 201901") == "201901_2_2_0\t1\t\n"
assert node1.query("SELECT count() from replicated_mt") == "4\n"

View File

@ -1,10 +1,17 @@
1
1
1
8873898 12457120258355519194
8873898 12457120258355519194
8873898 12457120258355519194
8873898 12457120258355519194
1
1
1
AdvEngineID.bin 1
CounterID.bin 1
RegionID.bin 1
SearchPhrase.bin 1
UserID.bin 1
__marks.mrk 1
AdvEngineID.bin 1
CounterID.bin 1
RegionID.bin 1
SearchPhrase.bin 1
UserID.bin 1
data.bin 1
index.mrk 1