import os
import os.path as p
import sys

# Make the shared integration-test helpers importable; they live one directory
# above this test package.  This must run before the `helpers.*` imports below.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
from helpers.test_tools import TSV
class ClickHouseClusterWithDDLHelpers(ClickHouseCluster):
    """A 4-node ClickHouse test cluster (2 shards x 2 replicas) with helpers
    for running and validating ON CLUSTER (distributed DDL) queries.

    Instance "ch4" is designated as a "sacrifice" node: random TCP resets are
    injected on its ZooKeeper traffic (port 2181) to exercise CONNECTION_LOSS
    handling in distributed DDL.
    """

    def __init__(self, base_path, config_dir, testcase_name):
        ClickHouseCluster.__init__(self, base_path, name=testcase_name)

        # Directory (relative to base_path) holding config.d/ and users.d/
        # overrides for this test case, e.g. "configs" or "configs_secure".
        self.test_config_dir = config_dir

    def prepare(self, replace_hostnames_with_ips=True):
        """Bring up the cluster, optionally rewrite hostnames to IPs in the
        cluster config, start random ZooKeeper packet drops on "ch4", and
        create the service table/database used by the tests.

        :param replace_hostnames_with_ips: when True, test that cluster hosts
            may be specified in IP form by rewriting "ch1"/"ch3" to addresses.
        """
        try:
            main_configs_files = [
                "clusters.xml",
                "zookeeper_session_timeout.xml",
                "macro.xml",
                "query_log.xml",
                "ddl.xml",
            ]
            main_configs = [
                os.path.join(self.test_config_dir, "config.d", f)
                for f in main_configs_files
            ]
            user_configs = [
                os.path.join(self.test_config_dir, "users.d", f)
                for f in ["restricted_user.xml", "query_log.xml"]
            ]
            # The secure variant additionally needs TLS material and SSL config.
            if self.test_config_dir == "configs_secure":
                main_configs += [
                    os.path.join(self.test_config_dir, f)
                    for f in [
                        "server.crt",
                        "server.key",
                        "dhparam.pem",
                        "config.d/ssl_conf.xml",
                    ]
                ]

            # 4 instances: ch1..ch4 laid out as 2 shards with 2 replicas each.
            for i in range(4):
                self.add_instance(
                    "ch{}".format(i + 1),
                    main_configs=main_configs,
                    user_configs=user_configs,
                    macros={"layer": 0, "shard": i // 2 + 1, "replica": i % 2 + 1},
                    with_zookeeper=True,
                )

            self.start()

            # Replace config files for testing ability to set host in DNS and IP formats
            if replace_hostnames_with_ips:
                self.replace_domains_to_ip_addresses_in_cluster_config(["ch1", "ch3"])

            # Select sacrifice instance to test CONNECTION_LOSS and server fail on it
            sacrifice = self.instances["ch4"]
            self.pm_random_drops = PartitionManager()
            # Randomly reset ~1% of packets in each direction between the
            # sacrifice node and ZooKeeper (port 2181).
            self.pm_random_drops._add_rule(
                {
                    "probability": 0.01,
                    "destination": sacrifice.ip_address,
                    "source_port": 2181,
                    "action": "REJECT --reject-with tcp-reset",
                }
            )
            self.pm_random_drops._add_rule(
                {
                    "probability": 0.01,
                    "source": sacrifice.ip_address,
                    "destination_port": 2181,
                    "action": "REJECT --reject-with tcp-reset",
                }
            )

            # Initialize databases and service tables
            instance = self.instances["ch1"]
            self.ddl_check_query(
                instance,
                """
        CREATE TABLE IF NOT EXISTS all_tables ON CLUSTER 'cluster_no_replicas'
            (database String, name String, engine String, metadata_modification_time DateTime)
            ENGINE = Distributed('cluster_no_replicas', 'system', 'tables')
                """,
            )
            self.ddl_check_query(
                instance, "CREATE DATABASE IF NOT EXISTS test ON CLUSTER 'cluster'"
            )
        except Exception as e:
            print(e)
            raise

    def sync_replicas(self, table, timeout=5):
        """Run SYSTEM SYNC REPLICA for *table* on every instance."""
        for instance in list(self.instances.values()):
            instance.query("SYSTEM SYNC REPLICA {}".format(table), timeout=timeout)

    def check_all_hosts_successfully_executed(self, tsv_content, num_hosts=None):
        """Assert that a distributed DDL result (TSV text) reports exactly
        *num_hosts* distinct hosts, all with return code 0.

        :param tsv_content: TSV text with rows of (host, port, code, message, ...).
        :param num_hosts: expected host count; defaults to the cluster size.
        """
        if num_hosts is None:
            num_hosts = len(self.instances)

        M = TSV.toMat(tsv_content)
        hosts = [(row[0], row[1]) for row in M]  # (host, port)
        codes = [row[2] for row in M]

        assert len(hosts) == num_hosts and len(set(hosts)) == num_hosts, (
            "\n" + tsv_content
        )
        assert len(set(codes)) == 1, "\n" + tsv_content
        assert codes[0] == "0", "\n" + tsv_content

    def ddl_check_query(self, instance, query, num_hosts=None, settings=None):
        """Execute a distributed DDL query (with retries) on *instance* and
        verify every host executed it successfully.  Returns the raw result."""
        contents = instance.query_with_retry(query, settings=settings)
        self.check_all_hosts_successfully_executed(contents, num_hosts)
        return contents

    def replace_domains_to_ip_addresses_in_cluster_config(self, instances_to_replace):
        """Rewrite hostnames to IP addresses in clusters.xml and push the
        rewritten config into each container named in *instances_to_replace*."""
        clusters_config = open(
            p.join(
                self.base_dir, "{}/config.d/clusters.xml".format(self.test_config_dir)
            )
        ).read()

        for inst_name, inst in list(self.instances.items()):
            clusters_config = clusters_config.replace(inst_name, str(inst.ip_address))

        for inst_name in instances_to_replace:
            inst = self.instances[inst_name]
            self.instances[inst_name].exec_in_container(
                [
                    "bash",
                    "-c",
                    'echo "$NEW_CONFIG" > /etc/clickhouse-server/config.d/clusters.xml',
                ],
                environment={"NEW_CONFIG": clusters_config},
                privileged=True,
            )
            # print cluster.instances[inst_name].exec_in_container(['cat', "/etc/clickhouse-server/config.d/clusters.xml"])

    @staticmethod
    def ddl_check_there_are_no_dublicates(instance):
        """Assert no DDL entry was executed more than once on *instance*,
        by counting ddl_entry queries in system.query_log (type=2 = finished)."""
        query = "SELECT max(c), argMax(q, c) FROM (SELECT lower(query) AS q, count() AS c FROM system.query_log WHERE type=2 AND q LIKE '/* ddl_entry=query-%' GROUP BY query)"
        rows = instance.query(query)

        assert len(rows) > 0 and rows[0][0] == "1", "dublicates on {} {}: {}".format(
            instance.name, instance.ip_address, rows
        )

    @staticmethod
    def insert_reliable(instance, query_insert):
        """
        Make retries in case of UNKNOWN_STATUS_OF_INSERT or zkutil::KeeperException errors
        """
        for i in range(100):
            try:
                instance.query(query_insert)
                return
            except Exception as e:
                last_exception = e
                s = str(e)
                # Retry only on the two known-transient errors; anything else
                # is re-raised immediately.  NOTE: the original code lacked
                # ">= 0" on the second find(), and since str.find returns -1
                # (truthy) when absent, it retried on *every* error.
                if not (
                    s.find("Unknown status, client must retry") >= 0
                    or s.find("zkutil::KeeperException") >= 0
                ):
                    raise e

        raise last_exception