commit 9115fd8cec (parent b395838415)
Author: Yatsishin Ilya
Date:   2021-04-07 15:22:53 +03:00

10 changed files with 101 additions and 31 deletions

View File

@@ -1,7 +1,5 @@
 version: '2.3'
 services:
     cassandra1:
-        image: cassandra
+        image: cassandra:4.0
         restart: always
-        ports:
-            - ${CASSANDRA_EXTERNAL_PORT}:${CASSANDRA_INTERNAL_PORT}
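With the host port mapping removed, tests reach Cassandra on its fixed internal port 9042 through the container IP instead of a randomly published host port. A minimal sketch of the new connection path, assuming a started ClickHouseCluster object named `cluster` (the variable name is illustrative):

    # Connect by container IP on the fixed internal port (sketch).
    from cassandra.cluster import Cluster
    from cassandra.policies import RoundRobinPolicy

    cassandra_ip = cluster.get_instance_ip("cassandra1")  # known only after startup
    client = Cluster([cassandra_ip], port=9042,
                     load_balancing_policy=RoundRobinPolicy())
    session = client.connect()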

View File

@@ -2,6 +2,7 @@ version: '2.3'
 services:
     postgres1:
         image: postgres
+        command: ["postgres", "-c", "logging_collector=on", "-c", "log_directory=/postgres/logs", "-c", "log_filename=postgresql.log", "-c", "log_statement=all"]
         restart: always
         expose:
             - ${POSTGRES_PORT}
@@ -24,6 +25,7 @@ services:
               target: /postgres/
     postgres2:
         image: postgres
+        command: ["postgres", "-c", "logging_collector=on", "-c", "log_directory=/postgres/logs", "-c", "log_filename=postgresql.log", "-c", "log_statement=all"]
         restart: always
         environment:
             POSTGRES_HOST_AUTH_METHOD: "trust"
@@ -34,4 +36,4 @@ services:
         volumes:
             - type: ${POSTGRES_LOGS_FS:-tmpfs}
               source: ${POSTGRES2_DIR:-}
-              target: /postgres/
\ No newline at end of file
+              target: /postgres/
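Both postgres services now launch the server with the log collector on, so every statement is written to /postgres/logs/postgresql.log inside the mounted volume. A quick sanity check of those flags, sketched with psycopg2 (the host and port are placeholders for whatever the test environment maps them to; trust auth is configured above):

    # Verify the logging flags took effect on a running postgres service (sketch).
    import psycopg2

    conn = psycopg2.connect(host="localhost", port=5432, user="postgres")
    conn.autocommit = True
    with conn.cursor() as cur:
        cur.execute("SHOW logging_collector")  # expect 'on'
        print(cur.fetchone())
        cur.execute("SHOW log_statement")      # expect 'all'
        print(cur.fetchone())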

View File

@@ -16,6 +16,7 @@ import traceback
 import urllib.parse
 import shlex

+from cassandra.policies import RoundRobinPolicy
 import cassandra.cluster
 import psycopg2
 import pymongo
@@ -99,7 +100,6 @@ def get_docker_compose_path():
         logging.debug(f"Fallback docker_compose_path to LOCAL_DOCKER_COMPOSE_DIR: {LOCAL_DOCKER_COMPOSE_DIR}")
         return LOCAL_DOCKER_COMPOSE_DIR

-
 def check_kafka_is_available(kafka_id, kafka_port):
     p = subprocess.Popen(('docker',
                           'exec',
@@ -225,7 +225,9 @@ class ClickHouseCluster:
         # available when with_cassandra == True
         self.cassandra_host = "cassandra1"
-        self.cassandra_port = get_open_port()
+        self.cassandra_port = 9042
+        self.cassandra_ip = None
+        self.cassandra_id = self.get_instance_docker_id(self.cassandra_host)

         # available when with_rabbitmq == True
         self.rabbitmq_host = "rabbitmq1"
@@ -551,8 +553,7 @@ class ClickHouseCluster:
         if with_cassandra and not self.with_cassandra:
             self.with_cassandra = True
-            env_variables['CASSANDRA_EXTERNAL_PORT'] = str(self.cassandra_port)
-            env_variables['CASSANDRA_INTERNAL_PORT'] = "9042"
+            env_variables['CASSANDRA_PORT'] = str(self.cassandra_port)
             self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_cassandra.yml')])
             self.base_cassandra_cmd = ['docker-compose', '--env-file', instance.env_file, '--project-name', self.project_name,
                                        '--file', p.join(docker_compose_yml_dir, 'docker_compose_cassandra.yml')]
@@ -778,7 +779,7 @@ class ClickHouseCluster:
                 logging.debug("Can't connect to Mongo " + str(ex))
                 time.sleep(1)

-    def wait_minio_to_start(self, timeout=30, secure=False):
+    def wait_minio_to_start(self, timeout=120, secure=False):
         os.environ['SSL_CERT_FILE'] = p.join(self.base_dir, self.minio_dir, 'certs', 'public.crt')
         minio_client = Minio('localhost:{}'.format(self.minio_port),
                              access_key='minio',
@@ -819,18 +820,27 @@ class ClickHouseCluster:
                 logging.debug(("Can't connect to SchemaRegistry: %s", str(ex)))
                 time.sleep(1)

-    def wait_cassandra_to_start(self, timeout=30):
-        cass_client = cassandra.cluster.Cluster(["localhost"], self.cassandra_port)
+        raise Exception("Can't wait Schema Registry to start")
+
+    def wait_cassandra_to_start(self, timeout=120):
+        self.cassandra_ip = self.get_instance_ip(self.cassandra_host)
+        cass_client = cassandra.cluster.Cluster([self.cassandra_ip], port=self.cassandra_port, load_balancing_policy=RoundRobinPolicy())
         start = time.time()
         while time.time() - start < timeout:
             try:
+                logging.info(f"Check Cassandra Online {self.cassandra_id} {self.cassandra_ip} {self.cassandra_port}")
+                check = self.exec_in_container(self.cassandra_id, ["bash", "-c", f"/opt/cassandra/bin/cqlsh -u cassandra -p cassandra -e 'describe keyspaces' {self.cassandra_ip} {self.cassandra_port}"], user='root')
+                logging.info("Cassandra Online")
                 cass_client.connect()
-                logging.info("Connected to Cassandra")
+                logging.info("Connected Clients to Cassandra")
                 return
             except Exception as ex:
                 logging.warning("Can't connect to Cassandra: %s", str(ex))
                 time.sleep(1)
+
+        raise Exception("Can't wait Cassandra to start")

     def start(self, destroy_dirs=True):
         logging.debug("Cluster start called. is_up={}, destroy_dirs={}".format(self.is_up, destroy_dirs))
         if self.is_up:
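The rewritten wait_cassandra_to_start follows the same poll-until-deadline shape as the other wait_*_to_start helpers: probe, log the failure, sleep, and raise once the deadline passes instead of returning silently. The pattern in isolation (a generic sketch; the function and names here are illustrative, not part of the helper):

    import logging
    import time

    def wait_until(check, timeout=120, delay=1, name="service"):
        # Poll `check` until it stops raising or the deadline passes.
        deadline = time.time() + timeout
        while time.time() < deadline:
            try:
                check()
                logging.info("%s is ready", name)
                return
            except Exception as ex:
                logging.warning("%s not ready yet: %s", name, ex)
                time.sleep(delay)
        raise Exception(f"Can't wait {name} to start")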
@@ -844,6 +854,8 @@ class ClickHouseCluster:
                 if not subprocess_call(['docker-compose', 'kill']):
                     subprocess_call(['docker-compose', 'down', '--volumes'])
                 logging.debug("Unstopped containers killed")
+                subprocess_call(['docker-compose', 'ps', '--services', '--all'])
             except:
                 pass
@@ -895,7 +907,7 @@ class ClickHouseCluster:
                 os.makedirs(self.mysql_logs_dir)
                 os.chmod(self.mysql_logs_dir, stat.S_IRWXO)
             subprocess_check_call(self.base_mysql_cmd + common_opts)
-            self.wait_mysql_to_start(120)
+            self.wait_mysql_to_start(180)

         if self.with_mysql8 and self.base_mysql8_cmd:
             logging.debug('Setup MySQL 8')
@@ -916,8 +928,7 @@ class ClickHouseCluster:
                 os.chmod(self.postgres2_logs_dir, stat.S_IRWXO)

             subprocess_check_call(self.base_postgres_cmd + common_opts)
-            self.wait_postgres_to_start(30)
-            self.wait_postgres_to_start(30)
+            self.wait_postgres_to_start(120)

         if self.with_kafka and self.base_kafka_cmd:
             logging.debug('Setup Kafka')
@@ -987,7 +998,7 @@ class ClickHouseCluster:
             subprocess_check_call(clickhouse_start_cmd)
             logging.debug("ClickHouse instance created")

-        start_deadline = time.time() + 120.0 # seconds
+        start_deadline = time.time() + 180.0 # seconds
         for instance in self.instances.values():
             instance.docker_client = self.docker_client
             instance.ip_address = self.get_instance_ip(instance.name)
@@ -1334,8 +1345,20 @@ class ClickHouseInstance:
         if not self.stay_alive:
             raise Exception("clickhouse can be stopped only with stay_alive=True instance")

-        self.exec_in_container(["bash", "-c", "pkill {} clickhouse".format("-9" if kill else "")], user='root')
-        time.sleep(start_wait_sec)
+        try:
+            ps_clickhouse = self.exec_in_container(["bash", "-c", "ps -C clickhouse"], user='root')
+            if ps_clickhouse == "  PID TTY      STAT   TIME COMMAND" :
+                logging.warning("ClickHouse process already stopped")
+                return
+
+            self.exec_in_container(["bash", "-c", "pkill {} clickhouse".format("-9" if kill else "")], user='root')
+            time.sleep(start_wait_sec)
+            ps_clickhouse = self.exec_in_container(["bash", "-c", "ps -C clickhouse"], user='root')
+            if ps_clickhouse != "  PID TTY      STAT   TIME COMMAND" :
+                logging.warning(f"Force kill clickhouse in stop_clickhouse. ps:{ps_clickhouse}")
+                self.stop_clickhouse(kill=True)
+        except Exception as e:
+            logging.warning(f"Stop ClickHouse raised an error {e}")

     def start_clickhouse(self, stop_wait_sec=5):
         if not self.stay_alive:
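stop_clickhouse now verifies the process state before and after pkill: `ps -C clickhouse` prints only its header row when no process matches, so comparing against the header detects an already-stopped server, and a survivor after the grace period triggers a recursive call with kill=True (SIGKILL). The same check, kill, recheck, escalate shape sketched outside a container (the pgrep/pkill usage is illustrative, not the helper's exact commands):

    import subprocess
    import time

    def stop_process(name, kill=False, wait_sec=5):
        # pgrep exits non-zero when nothing matches: already stopped.
        if subprocess.run(["pgrep", "-x", name], capture_output=True).returncode != 0:
            return
        subprocess.run(["pkill", "-9" if kill else "-15", "-x", name])
        time.sleep(wait_sec)
        survived = subprocess.run(["pgrep", "-x", name], capture_output=True).returncode == 0
        if survived and not kill:
            stop_process(name, kill=True, wait_sec=wait_sec)  # escalate to SIGKILL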
@@ -1360,8 +1383,10 @@ class ClickHouseInstance:
         return len(result) > 0

     def grep_in_log(self, substring):
+        logging.debug(f"grep in log called {substring}")
         result = self.exec_in_container(
             ["bash", "-c", 'grep "{}" /var/log/clickhouse-server/clickhouse-server.log || true'.format(substring)])
+        logging.debug(f"grep result {result}")
         return result

     def count_in_log(self, substring):
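The added debug lines make grep_in_log calls traceable in the test log. Typical use, assuming a started instance object `node` (illustrative):

    import logging

    # grep_in_log returns the matching lines as one string; empty means no hits.
    lines = node.grep_in_log("Exception")
    if lines:
        logging.info("server logged exceptions:\n%s", lines)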

View File

@@ -462,6 +462,9 @@ class SourceCassandra(ExternalSource):
         )

     def prepare(self, structure, table_name, cluster):
+        if self.internal_hostname is None:
+            self.internal_hostname = cluster.cassandra_ip
+
         self.client = cassandra.cluster.Cluster([self.internal_hostname], port=self.internal_port)
         self.session = self.client.connect()
         self.session.execute(
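Accepting internal_hostname=None and resolving it in prepare() lets a source be declared before the cluster is running, when the Cassandra container has no IP yet. The deferral pattern on its own (class and attribute names are illustrative):

    class LazySource:
        def __init__(self, internal_hostname=None):
            # May be unknown at construction time.
            self.internal_hostname = internal_hostname

        def prepare(self, cluster):
            if self.internal_hostname is None:
                # Resolved only once the container has an IP.
                self.internal_hostname = cluster.cassandra_ip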

View File

@@ -131,6 +131,48 @@ test_postgresql_protocol/test.py::test_psql_is_ready
 test_postgresql_protocol/test.py::test_psql_client
 test_postgresql_protocol/test.py::test_python_client
 test_postgresql_protocol/test.py::test_java_client
+test_storage_kafka/test.py::test_kafka_json_as_string
+test_storage_kafka/test.py::test_kafka_formats
+test_storage_kafka/test.py::test_kafka_settings_old_syntax
+test_storage_kafka/test.py::test_kafka_settings_new_syntax
+test_storage_kafka/test.py::test_kafka_issue11308
+test_storage_kafka/test.py::test_kafka_issue4116
+test_storage_kafka/test.py::test_kafka_consumer_hang
+test_storage_kafka/test.py::test_kafka_consumer_hang2
+test_storage_kafka/test.py::test_kafka_csv_with_delimiter
+test_storage_kafka/test.py::test_kafka_tsv_with_delimiter
+test_storage_kafka/test.py::test_kafka_select_empty
+test_storage_kafka/test.py::test_kafka_json_without_delimiter
+test_storage_kafka/test.py::test_kafka_protobuf
+test_storage_kafka/test.py::test_kafka_string_field_on_first_position_in_protobuf
+test_storage_kafka/test.py::test_kafka_protobuf_no_delimiter
+test_storage_kafka/test.py::test_kafka_materialized_view
+test_storage_kafka/test.py::test_librdkafka_compression
+test_storage_kafka/test.py::test_kafka_materialized_view_with_subquery
+test_storage_kafka/test.py::test_kafka_many_materialized_views
+test_storage_kafka/test.py::test_kafka_flush_on_big_message
+test_storage_kafka/test.py::test_kafka_virtual_columns
+test_storage_kafka/test.py::test_kafka_virtual_columns_with_materialized_view
+test_storage_kafka/test.py::test_kafka_insert
+test_storage_kafka/test.py::test_kafka_produce_consume
+test_storage_kafka/test.py::test_kafka_commit_on_block_write
+test_storage_kafka/test.py::test_kafka_virtual_columns2
+test_storage_kafka/test.py::test_kafka_produce_key_timestamp
+test_storage_kafka/test.py::test_kafka_flush_by_time
+test_storage_kafka/test.py::test_kafka_flush_by_block_size
+test_storage_kafka/test.py::test_kafka_lot_of_partitions_partial_commit_of_bulk
+test_storage_kafka/test.py::test_kafka_rebalance
+test_storage_kafka/test.py::test_kafka_no_holes_when_write_suffix_failed
+test_storage_kafka/test.py::test_exception_from_destructor
+test_storage_kafka/test.py::test_commits_of_unprocessed_messages_on_drop
+test_storage_kafka/test.py::test_bad_reschedule
+test_storage_kafka/test.py::test_kafka_duplicates_when_commit_failed
+test_storage_kafka/test.py::test_premature_flush_on_eof
+test_storage_kafka/test.py::test_kafka_unavailable
+test_storage_kafka/test.py::test_kafka_issue14202
+test_storage_kafka/test.py::test_kafka_csv_with_thread_per_consumer
+test_storage_kerberized_kafka/test.py::test_kafka_json_as_string
+test_storage_kerberized_kafka/test.py::test_kafka_json_as_string_no_kdc
 test_storage_mysql/test.py::test_many_connections
 test_storage_mysql/test.py::test_insert_select
 test_storage_mysql/test.py::test_replace_select

View File

@@ -17,19 +17,19 @@ def started_cluster():
         cluster.start()

         for node in [node1, node2]:
-            node.query('''
-            CREATE TABLE test_table_replicated(date Date, id UInt32, value Int32)
+            node.query_with_retry('''
+            CREATE TABLE IF NOT EXISTS test_table_replicated(date Date, id UInt32, value Int32)
             ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/sometable', '{replica}') ORDER BY id;
             '''.format(replica=node.name))
-            node.query('''CREATE TABLE test_table(date Date, id UInt32, value Int32) ENGINE=MergeTree ORDER BY id''')
+            node.query_with_retry('''CREATE TABLE IF NOT EXISTS test_table(date Date, id UInt32, value Int32) ENGINE=MergeTree ORDER BY id''')

         for node in [node3, node4]:
-            node.query('''
-            CREATE TABLE test_table_replicated(date Date, id UInt32, value Int32)
+            node.query_with_retry('''
+            CREATE TABLE IF NOT EXISTS test_table_replicated(date Date, id UInt32, value Int32)
             ENGINE = ReplicatedMergeTree('/clickhouse/tables/1/someotable', '{replica}') ORDER BY id;
             '''.format(replica=node.name))
-            node.query('''CREATE TABLE test_table(date Date, id UInt32, value Int32) ENGINE=MergeTree ORDER BY id''')
+            node.query_with_retry('''CREATE TABLE IF NOT EXISTS test_table(date Date, id UInt32, value Int32) ENGINE=MergeTree ORDER BY id''')

         yield cluster
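query_with_retry plus CREATE TABLE IF NOT EXISTS makes the fixture idempotent, so a transient ZooKeeper or startup hiccup no longer kills the whole session. A hedged sketch of what such a retry wrapper typically does (the real helper lives in the cluster module; this is not its exact signature):

    import time

    def query_with_retry(node, sql, retry_count=10, sleep_time=0.5, **kwargs):
        last_error = None
        for _ in range(retry_count):
            try:
                return node.query(sql, **kwargs)
            except Exception as ex:
                last_error = ex
                time.sleep(sleep_time)
        raise last_error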

View File

@@ -116,7 +116,7 @@ def test_limited_ttl_merges_in_empty_pool_replicated(started_cluster):
     node1.query("SYSTEM STOP TTL MERGES")

     for i in range(100):
-        node1.query("INSERT INTO replicated_ttl SELECT now() - INTERVAL 1 MONTH, {}, number FROM numbers(1)".format(i))
+        node1.query_with_retry("INSERT INTO replicated_ttl SELECT now() - INTERVAL 1 MONTH, {}, number FROM numbers(1)".format(i))

     assert node1.query("SELECT COUNT() FROM replicated_ttl") == "100\n"
@@ -147,7 +147,7 @@ def test_limited_ttl_merges_two_replicas(started_cluster):
     node2.query("SYSTEM STOP TTL MERGES")

     for i in range(100):
-        node1.query(
+        node1.query_with_retry(
             "INSERT INTO replicated_ttl_2 SELECT now() - INTERVAL 1 MONTH, {}, number FROM numbers(10000)".format(i))

     node2.query("SYSTEM SYNC REPLICA replicated_ttl_2", timeout=10)

View File

@@ -45,7 +45,7 @@ CREATE TABLE distributed(date Date, id UInt32, shard_id UInt32)
 2017-06-16	333	2
 '''
         node1.query("INSERT INTO distributed FORMAT TSV", stdin=to_insert)
-        time.sleep(0.5)
+        time.sleep(5)

         yield cluster
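The longer sleep gives the Distributed table's asynchronous background sends time to reach the shards before the test queries them. A more deterministic alternative, not what this patch does, is to force the flush explicitly:

    # Force pending async inserts through the Distributed table (sketch).
    node1.query("INSERT INTO distributed FORMAT TSV", stdin=to_insert)
    node1.query("SYSTEM FLUSH DISTRIBUTED distributed")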

View File

@@ -20,14 +20,14 @@ def started_cluster():

 def test_non_leader_replica(started_cluster):
-    node1.query('''CREATE TABLE sometable(id UInt32, value String)
+    node1.query_with_retry('''CREATE TABLE IF NOT EXISTS sometable(id UInt32, value String)
             ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/sometable', '1') ORDER BY tuple()''')

-    node2.query('''CREATE TABLE sometable(id UInt32, value String)
+    node2.query_with_retry('''CREATE TABLE IF NOT EXISTS sometable(id UInt32, value String)
             ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/sometable', '2') ORDER BY tuple() SETTINGS replicated_can_become_leader = 0''')

     node1.query("INSERT INTO sometable SELECT number, toString(number) FROM numbers(100)")
-    node2.query("SYSTEM SYNC REPLICA sometable", timeout=10)
+    node2.query_with_retry("SYSTEM SYNC REPLICA sometable", timeout=10)

     assert node1.query("SELECT COUNT() FROM sometable") == "100\n"
     assert node2.query("SELECT COUNT() FROM sometable") == "100\n"

View File

@@ -25,7 +25,7 @@ def setup_module(module):
     cluster = ClickHouseCluster(__file__, name=test_name)

-    SOURCE = SourceCassandra("Cassandra", "localhost", cluster.cassandra_port, cluster.cassandra_host, "9042", "", "")
+    SOURCE = SourceCassandra("Cassandra", None, cluster.cassandra_port, cluster.cassandra_host, cluster.cassandra_port, "", "")

     simple_tester = SimpleLayoutTester(test_name)
     simple_tester.cleanup()