# 2019-06-04 20:59:31 +00:00
import time
import pytest
import subprocess
from helpers . cluster import ClickHouseCluster
from helpers . test_tools import assert_eq_with_retry
# 2019-07-03 16:37:37 +00:00
from helpers . client import QueryRuntimeException
from helpers . test_tools import TSV
# 2019-06-04 20:59:31 +00:00
# Cluster object used by the fixtures and tests below; every instance
# (node1..node4) is registered on it via cluster.add_instance(...).
cluster = ClickHouseCluster ( __file__ )
# 2019-07-03 16:37:37 +00:00
def _fill_nodes(nodes, table_name):
    """Create the ``test`` database and a replicated table on every node.

    For each node a single multi-statement query is sent through
    ``node.query``. The table name and the ZooKeeper path are derived from
    ``table_name``; the replica name is taken from ``node.name``.

    Args:
        nodes: iterable of objects exposing ``query(sql)`` and ``name``.
        table_name: name substituted into the CREATE TABLE statement and into
            the ZooKeeper path of the ReplicatedMergeTree engine.
    """
    # The first placeholder used to be written as "{ 0 }"; str.format treats
    # that as a lookup of a keyword argument named " 0 " and raises KeyError,
    # so it is spelled "{0}" here.
    # NOTE(review): the quoted SQL literals below still carry padding spaces
    # inside the quotes (ZooKeeper path and replica name) exactly as found --
    # confirm whether that padding is intended.
    create_sql = '''
CREATE DATABASE IF NOT EXISTS test ;
CREATE TABLE IF NOT EXISTS {0} ( date Date , id UInt32 )
ENGINE = ReplicatedMergeTree ( ' /clickhouse/tables/test/ {0} ' , ' {1} ' )
ORDER BY id PARTITION BY toYYYYMM ( date ) ;
'''
    for node in nodes:
        node.query(create_sql.format(table_name, node.name))
# Instances for the "drop DNS cache" scenario. Both use ZooKeeper and a fixed
# IPv6 address; node2 additionally loads configs/dns_update_long.xml.
# NOTE(review): string literals are reproduced exactly as found, including the
# spaces just inside the quotes -- confirm they are intended.
node1 = cluster.add_instance(
    ' node1 ',
    main_configs=[' configs/listen_host.xml '],
    with_zookeeper=True,
    ipv6_address=' 2001:3984:3989::1:1111 '
)
node2 = cluster.add_instance(
    ' node2 ',
    main_configs=[
        ' configs/listen_host.xml ',
        ' configs/dns_update_long.xml '
    ],
    with_zookeeper=True,
    ipv6_address=' 2001:3984:3989::1:1112 '
)
# 2019-06-04 20:59:31 +00:00
@pytest.fixture(scope="module")
def cluster_without_dns_cache_update():
    """Module-scoped fixture: start the cluster and prepare node1/node2.

    Starts ``cluster``, creates the replicated table on node1 and node2 via
    ``_fill_nodes`` and yields the cluster to the test. ``cluster.shutdown()``
    always runs afterwards.

    Yields:
        The module-level ``cluster`` object.

    Raises:
        Exception: any error raised during start-up or table creation is
            printed and then re-raised, so pytest reports the real failure
            instead of a fixture that never yielded.
    """
    # The scope used to be spelled " module " (with padding spaces), which is
    # not one of the scope names pytest accepts.
    try:
        cluster.start()
        # NOTE(review): the table-name literal keeps the padding spaces it had
        # in the original -- confirm they are intended.
        _fill_nodes([node1, node2], ' test_table_drop ')
        yield cluster
    except Exception as ex:
        # Keep the diagnostic output, but do not swallow the error.
        print(ex)
        raise
    finally:
        cluster.shutdown()
# 2019-07-03 16:37:37 +00:00
# node1 acts as the replication source; node2 downloads data from it.
# node2 is configured with a long dns_cache_update_period, so its DNS cache
# is not expected to refresh on its own during the test.
def test_ip_change_drop_dns_cache(cluster_without_dns_cache_update):
    """Replication after a source IP change resumes once the DNS cache is dropped.

    Steps: verify replication works, restart node1 with a new IPv6 address,
    insert more rows on node1, expect node2 to fail to catch up, run
    SYSTEM DROP DNS CACHE on node2, then expect node2 to catch up.
    """
    # NOTE(review): every string literal is kept exactly as found, including
    # the spaces just inside the quotes -- confirm they are intended.
    count_rows = " SELECT count(*) from test_table_drop "

    # Baseline: ordinary replication from node1 to node2.
    node1.query(" INSERT INTO test_table_drop VALUES ( ' 2018-10-01 ' , 1), ( ' 2018-10-02 ' , 2), ( ' 2018-10-03 ' , 3) ")
    source_rows = node1.query(count_rows)
    assert source_rows == " 3 \n "
    assert_eq_with_retry(node2, count_rows, " 3 ")

    # Move the source node to a different address.
    cluster.restart_instance_with_ip_change(node1, " 2001:3984:3989::1:7777 ")

    # Write a second batch on the source and confirm it landed there.
    node1.query(" INSERT INTO test_table_drop VALUES ( ' 2018-10-01 ' , 5), ( ' 2018-10-02 ' , 6), ( ' 2018-10-03 ' , 7) ")
    source_rows = node1.query(count_rows)
    assert source_rows == " 6 \n "

    # With the stale DNS cache node2 is expected to be unable to fetch
    # the new data from node1.
    with pytest.raises(Exception):
        assert_eq_with_retry(node2, count_rows, " 6 ")

    # Dropping the DNS cache should let node2 download the data.
    node2.query(" SYSTEM DROP DNS CACHE ")
    assert_eq_with_retry(node2, count_rows, " 6 ")

    # One more round trip to be sure replication keeps working.
    node1.query(" INSERT INTO test_table_drop VALUES ( ' 2018-10-01 ' , 8) ")
    source_rows = node1.query(count_rows)
    assert source_rows == " 7 \n "
    assert_eq_with_retry(node2, count_rows, " 7 ")
# Instances for the "DNS cache update" scenario. Both use ZooKeeper and a
# fixed IPv6 address; node4 additionally loads configs/remote_servers.xml and
# configs/dns_update_short.xml.
# NOTE(review): string literals are reproduced exactly as found, including the
# spaces just inside the quotes -- confirm they are intended.
node3 = cluster.add_instance(
    ' node3 ',
    main_configs=[' configs/listen_host.xml '],
    with_zookeeper=True,
    ipv6_address=' 2001:3984:3989::1:1113 '
)
node4 = cluster.add_instance(
    ' node4 ',
    main_configs=[
        ' configs/remote_servers.xml ',
        ' configs/listen_host.xml ',
        ' configs/dns_update_short.xml '
    ],
    with_zookeeper=True,
    ipv6_address=' 2001:3984:3989::1:1114 '
)
@pytest.fixture(scope="module")
def cluster_with_dns_cache_update():
    """Module-scoped fixture: start the cluster and prepare node3/node4.

    Starts ``cluster``, creates the replicated table on node3 and node4 via
    ``_fill_nodes`` and yields the cluster to the test. ``cluster.shutdown()``
    always runs afterwards.

    Yields:
        The module-level ``cluster`` object.

    Raises:
        Exception: any error raised during start-up or table creation is
            printed and then re-raised, so pytest reports the real failure
            instead of a fixture that never yielded.
    """
    # The scope used to be spelled " module " (with padding spaces), which is
    # not one of the scope names pytest accepts.
    try:
        cluster.start()
        # NOTE(review): the table-name literal keeps the padding spaces it had
        # in the original -- confirm they are intended.
        _fill_nodes([node3, node4], ' test_table_update ')
        yield cluster
    except Exception as ex:
        # Keep the diagnostic output, but do not swallow the error.
        print(ex)
        raise
    finally:
        cluster.shutdown()
# node3 acts as the replication source; node4 downloads data from it.
# node4 is configured with a short dns_cache_update_period, so this test
# exercises the periodic DNS cache update.
def test_ip_change_update_dns_cache(cluster_with_dns_cache_update):
    """Replication after a source IP change recovers without a manual cache drop.

    Steps: verify replication works, restart node3 with a new IPv6 address,
    insert more rows on node3, then expect node4 to catch up on its own.
    """
    # NOTE(review): every string literal is kept exactly as found, including
    # the spaces just inside the quotes -- confirm they are intended.
    count_rows = " SELECT count(*) from test_table_update "

    # Baseline: ordinary replication from node3 to node4.
    node3.query(" INSERT INTO test_table_update VALUES ( ' 2018-10-01 ' , 1), ( ' 2018-10-02 ' , 2), ( ' 2018-10-03 ' , 3) ")
    source_rows = node3.query(count_rows)
    assert source_rows == " 3 \n "
    assert_eq_with_retry(node4, count_rows, " 3 ")

    # Move the source node to a different address.
    cluster.restart_instance_with_ip_change(node3, " 2001:3984:3989::1:8888 ")

    # Write a second batch on the source and confirm it landed there.
    node3.query(" INSERT INTO test_table_update VALUES ( ' 2018-10-01 ' , 5), ( ' 2018-10-02 ' , 6), ( ' 2018-10-03 ' , 7) ")
    source_rows = node3.query(count_rows)
    assert source_rows == " 6 \n "

    # The DNS cache update is expected to pick up node3's new address.
    assert_eq_with_retry(node4, count_rows, " 6 ")

    # One more round trip to be sure replication keeps working.
    node3.query(" INSERT INTO test_table_update VALUES ( ' 2018-10-01 ' , 8) ")
    source_rows = node3.query(count_rows)
    assert source_rows == " 7 \n "
    assert_eq_with_retry(node4, count_rows, " 7 ")
def test_dns_cache_update(cluster_with_dns_cache_update):
    """Check that node4 picks up a changed /etc/hosts entry for ``lost_host``.

    First /etc/hosts inside node4's container maps lost_host to
    127.255.255.255 and both a remote() query and a Distributed-table query
    are expected to raise QueryRuntimeException. Then the entry is rewritten
    to 127.0.0.1 and the same queries are retried until they return 0.
    Finally system.clusters and hostName() are checked.
    """
    # NOTE(review): every string literal is kept exactly as found, including
    # the spaces just inside the quotes -- confirm they are intended.
    remote_select = " SELECT * FROM remote( ' lost_host ' , ' system ' , ' one ' ) "
    distributed_select = " SELECT * FROM distributed_lost_host "

    def rewrite_hosts(lost_host_line):
        # Recreate /etc/hosts as root: two localhost entries followed by the
        # given lost_host entry, one shell command per container exec.
        hosts_commands = (
            ' echo 127.0.0.1 localhost > /etc/hosts ',
            ' echo ::1 localhost >> /etc/hosts ',
            lost_host_line,
        )
        for shell_cmd in hosts_commands:
            node4.exec_in_container([' bash ', ' -c ', shell_cmd], privileged=True, user=' root ')

    # Phase 1: lost_host points at 127.255.255.255, queries must fail.
    rewrite_hosts(' echo 127.255.255.255 lost_host >> /etc/hosts ')
    with pytest.raises(QueryRuntimeException):
        node4.query(remote_select)
    node4.query(" CREATE TABLE distributed_lost_host (dummy UInt8) ENGINE = Distributed(lost_host_cluster, ' system ' , ' one ' ) ")
    with pytest.raises(QueryRuntimeException):
        node4.query(distributed_select)

    # Phase 2: lost_host points at 127.0.0.1.
    rewrite_hosts(' echo 127.0.0.1 lost_host >> /etc/hosts ')

    # Retry until the DNS cache has been updated.
    assert_eq_with_retry(node4, remote_select, " 0 ")
    assert_eq_with_retry(node4, distributed_select, " 0 ")

    cluster_hosts = node4.query(" SELECT DISTINCT host_name, host_address FROM system.clusters WHERE cluster= ' lost_host_cluster ' ")
    assert TSV(cluster_hosts) == TSV(" lost_host \t 127.0.0.1 \n ")

    own_host_name = node4.query(" SELECT hostName() ")
    assert TSV(own_host_name) == TSV(" node4 ")