2019-07-03 20:51:13 +00:00
import time
import pytest
from helpers . cluster import ClickHouseCluster
from helpers . test_tools import assert_eq_with_retry
cluster = ClickHouseCluster ( __file__ )
node1 = cluster . add_instance ( ' node1 ' , with_zookeeper = True )
node2 = cluster . add_instance ( ' node2 ' , with_zookeeper = True )
@pytest.fixture ( scope = " module " )
def started_cluster ( ) :
try :
cluster . start ( )
for node in [ node1 , node2 ] :
node . query ( '''
CREATE TABLE replicated_mt ( date Date , id UInt32 , value Int32 )
ENGINE = ReplicatedMergeTree ( ' /clickhouse/tables/replicated_mt ' , ' {replica} ' ) PARTITION BY toYYYYMM ( date ) ORDER BY id ;
''' .format(replica=node.name))
node1 . query ( '''
CREATE TABLE non_replicated_mt ( date Date , id UInt32 , value Int32 )
ENGINE = MergeTree ( ) PARTITION BY toYYYYMM ( date ) ORDER BY id ;
''' )
yield cluster
finally :
cluster . shutdown ( )
def corrupt_data_part_on_disk ( node , table , part_name ) :
part_path = node . query ( " SELECT path FROM system.parts WHERE table = ' {} ' and name = ' {} ' " . format ( table , part_name ) ) . strip ( )
node . exec_in_container ( [ ' bash ' , ' -c ' , ' cd {p} && ls *.bin | head -n 1 | xargs -I {{ }} sh -c \' echo " 1 " >> $1 \' -- {{ }} ' . format ( p = part_path ) ] , privileged = True )
def remove_checksums_on_disk ( node , table , part_name ) :
part_path = node . query ( " SELECT path FROM system.parts WHERE table = ' {} ' and name = ' {} ' " . format ( table , part_name ) ) . strip ( )
node . exec_in_container ( [ ' bash ' , ' -c ' , ' rm -r {p} /checksums.txt ' . format ( p = part_path ) ] , privileged = True )
def remove_part_from_disk ( node , table , part_name ) :
part_path = node . query ( " SELECT path FROM system.parts WHERE table = ' {} ' and name = ' {} ' " . format ( table , part_name ) ) . strip ( )
2019-07-04 10:21:14 +00:00
if not part_path :
raise Exception ( " Part " + part_name + " doesn ' t exist " )
2019-07-03 20:51:13 +00:00
node . exec_in_container ( [ ' bash ' , ' -c ' , ' rm -r {p} /* ' . format ( p = part_path ) ] , privileged = True )
def test_check_normal_table_corruption ( started_cluster ) :
node1 . query ( " INSERT INTO non_replicated_mt VALUES (toDate( ' 2019-02-01 ' ), 1, 10), (toDate( ' 2019-02-01 ' ), 2, 12) " )
2019-07-16 10:49:16 +00:00
assert node1 . query ( " CHECK TABLE non_replicated_mt PARTITION 201902 " , settings = { " check_query_single_value_result " : 0 } ) == " 201902_1_1_0 \t 1 \t \n "
2019-07-03 20:51:13 +00:00
remove_checksums_on_disk ( node1 , " non_replicated_mt " , " 201902_1_1_0 " )
2019-07-16 10:49:16 +00:00
assert node1 . query ( " CHECK TABLE non_replicated_mt " , settings = { " check_query_single_value_result " : 0 } ) . strip ( ) == " 201902_1_1_0 \t 1 \t Checksums recounted and written to disk. "
2019-07-03 20:51:13 +00:00
assert node1 . query ( " SELECT COUNT() FROM non_replicated_mt " ) == " 2 \n "
remove_checksums_on_disk ( node1 , " non_replicated_mt " , " 201902_1_1_0 " )
2019-07-16 10:49:16 +00:00
assert node1 . query ( " CHECK TABLE non_replicated_mt PARTITION 201902 " , settings = { " check_query_single_value_result " : 0 } ) . strip ( ) == " 201902_1_1_0 \t 1 \t Checksums recounted and written to disk. "
2019-07-03 20:51:13 +00:00
assert node1 . query ( " SELECT COUNT() FROM non_replicated_mt " ) == " 2 \n "
corrupt_data_part_on_disk ( node1 , " non_replicated_mt " , " 201902_1_1_0 " )
2019-07-16 10:49:16 +00:00
assert node1 . query ( " CHECK TABLE non_replicated_mt " , settings = { " check_query_single_value_result " : 0 } ) . strip ( ) == " 201902_1_1_0 \t 0 \t Cannot read all data. Bytes read: 2. Bytes expected: 16. "
2019-07-03 20:51:13 +00:00
2019-07-16 10:49:16 +00:00
assert node1 . query ( " CHECK TABLE non_replicated_mt " , settings = { " check_query_single_value_result " : 0 } ) . strip ( ) == " 201902_1_1_0 \t 0 \t Cannot read all data. Bytes read: 2. Bytes expected: 16. "
2019-07-03 20:51:13 +00:00
node1 . query ( " INSERT INTO non_replicated_mt VALUES (toDate( ' 2019-01-01 ' ), 1, 10), (toDate( ' 2019-01-01 ' ), 2, 12) " )
2019-07-16 10:49:16 +00:00
assert node1 . query ( " CHECK TABLE non_replicated_mt PARTITION 201901 " , settings = { " check_query_single_value_result " : 0 } ) == " 201901_2_2_0 \t 1 \t \n "
2019-07-03 20:51:13 +00:00
corrupt_data_part_on_disk ( node1 , " non_replicated_mt " , " 201901_2_2_0 " )
remove_checksums_on_disk ( node1 , " non_replicated_mt " , " 201901_2_2_0 " )
2019-07-16 10:49:16 +00:00
assert node1 . query ( " CHECK TABLE non_replicated_mt PARTITION 201901 " , settings = { " check_query_single_value_result " : 0 } ) == " 201901_2_2_0 \t 0 \t Check of part finished with error: \\ ' Cannot read all data. Bytes read: 2. Bytes expected: 16. \\ ' \n "
2019-07-03 20:51:13 +00:00
def test_check_replicated_table_simple ( started_cluster ) :
2019-07-04 10:21:14 +00:00
node1 . query ( " TRUNCATE TABLE replicated_mt " )
node2 . query ( " SYSTEM SYNC REPLICA replicated_mt " )
2019-07-03 20:51:13 +00:00
node1 . query ( " INSERT INTO replicated_mt VALUES (toDate( ' 2019-02-01 ' ), 1, 10), (toDate( ' 2019-02-01 ' ), 2, 12) " )
node2 . query ( " SYSTEM SYNC REPLICA replicated_mt " )
assert node1 . query ( " SELECT count() from replicated_mt " ) == " 2 \n "
assert node2 . query ( " SELECT count() from replicated_mt " ) == " 2 \n "
2019-07-16 10:49:16 +00:00
assert node1 . query ( " CHECK TABLE replicated_mt " , settings = { " check_query_single_value_result " : 0 } ) == " 201902_0_0_0 \t 1 \t \n "
assert node2 . query ( " CHECK TABLE replicated_mt " , settings = { " check_query_single_value_result " : 0 } ) == " 201902_0_0_0 \t 1 \t \n "
2019-07-03 20:51:13 +00:00
node2 . query ( " INSERT INTO replicated_mt VALUES (toDate( ' 2019-01-02 ' ), 3, 10), (toDate( ' 2019-01-02 ' ), 4, 12) " )
node1 . query ( " SYSTEM SYNC REPLICA replicated_mt " )
assert node1 . query ( " SELECT count() from replicated_mt " ) == " 4 \n "
assert node2 . query ( " SELECT count() from replicated_mt " ) == " 4 \n "
2019-07-16 10:49:16 +00:00
assert node1 . query ( " CHECK TABLE replicated_mt PARTITION 201901 " , settings = { " check_query_single_value_result " : 0 } ) == " 201901_0_0_0 \t 1 \t \n "
assert node2 . query ( " CHECK TABLE replicated_mt PARTITION 201901 " , settings = { " check_query_single_value_result " : 0 } ) == " 201901_0_0_0 \t 1 \t \n "
2019-07-03 20:51:13 +00:00
def test_check_replicated_table_corruption ( started_cluster ) :
node1 . query ( " TRUNCATE TABLE replicated_mt " )
2019-07-04 10:21:14 +00:00
node2 . query ( " SYSTEM SYNC REPLICA replicated_mt " )
2019-07-03 20:51:13 +00:00
node1 . query ( " INSERT INTO replicated_mt VALUES (toDate( ' 2019-02-01 ' ), 1, 10), (toDate( ' 2019-02-01 ' ), 2, 12) " )
node1 . query ( " INSERT INTO replicated_mt VALUES (toDate( ' 2019-01-02 ' ), 3, 10), (toDate( ' 2019-01-02 ' ), 4, 12) " )
node2 . query ( " SYSTEM SYNC REPLICA replicated_mt " )
assert node1 . query ( " SELECT count() from replicated_mt " ) == " 4 \n "
assert node2 . query ( " SELECT count() from replicated_mt " ) == " 4 \n "
2019-07-09 09:02:52 +00:00
part_name = node1 . query ( " SELECT name from system.parts where table = ' replicated_mt ' and partition_id = ' 201901 ' and active = 1 " ) . strip ( )
corrupt_data_part_on_disk ( node1 , " replicated_mt " , part_name )
2019-07-16 10:49:16 +00:00
assert node1 . query ( " CHECK TABLE replicated_mt PARTITION 201901 " , settings = { " check_query_single_value_result " : 0 } ) == " {p} \t 0 \t Part {p} looks broken. Removing it and queueing a fetch. \n " . format ( p = part_name )
2019-07-03 20:51:13 +00:00
node1 . query ( " SYSTEM SYNC REPLICA replicated_mt " )
2019-07-16 10:49:16 +00:00
assert node1 . query ( " CHECK TABLE replicated_mt PARTITION 201901 " , settings = { " check_query_single_value_result " : 0 } ) == " {} \t 1 \t \n " . format ( part_name )
2019-07-03 20:51:13 +00:00
assert node1 . query ( " SELECT count() from replicated_mt " ) == " 4 \n "
2019-07-09 09:02:52 +00:00
remove_part_from_disk ( node2 , " replicated_mt " , part_name )
2019-07-16 10:49:16 +00:00
assert node2 . query ( " CHECK TABLE replicated_mt PARTITION 201901 " , settings = { " check_query_single_value_result " : 0 } ) == " {p} \t 0 \t Part {p} looks broken. Removing it and queueing a fetch. \n " . format ( p = part_name )
2019-07-03 20:51:13 +00:00
node1 . query ( " SYSTEM SYNC REPLICA replicated_mt " )
2019-07-16 10:49:16 +00:00
assert node1 . query ( " CHECK TABLE replicated_mt PARTITION 201901 " , settings = { " check_query_single_value_result " : 0 } ) == " {} \t 1 \t \n " . format ( part_name )
2019-07-03 20:51:13 +00:00
assert node1 . query ( " SELECT count() from replicated_mt " ) == " 4 \n "