diff --git a/docker/test/integration/runner/compose/docker_compose_cassandra.yml b/docker/test/integration/runner/compose/docker_compose_cassandra.yml
index e8aed06c08a..b6190a11d73 100644
--- a/docker/test/integration/runner/compose/docker_compose_cassandra.yml
+++ b/docker/test/integration/runner/compose/docker_compose_cassandra.yml
@@ -1,7 +1,5 @@
 version: '2.3'
 services:
     cassandra1:
-        image: cassandra
+        image: cassandra:4.0
         restart: always
-        ports:
-            - ${CASSANDRA_EXTERNAL_PORT}:${CASSANDRA_INTERNAL_PORT}
diff --git a/docker/test/integration/runner/compose/docker_compose_postgres.yml b/docker/test/integration/runner/compose/docker_compose_postgres.yml
index 3b760e2f84b..a2f205c7afd 100644
--- a/docker/test/integration/runner/compose/docker_compose_postgres.yml
+++ b/docker/test/integration/runner/compose/docker_compose_postgres.yml
@@ -2,6 +2,7 @@ version: '2.3'
 services:
     postgres1:
         image: postgres
+        command: ["postgres", "-c", "logging_collector=on", "-c", "log_directory=/postgres/logs", "-c", "log_filename=postgresql.log", "-c", "log_statement=all"]
         restart: always
         expose:
             - ${POSTGRES_PORT}
@@ -24,6 +25,7 @@ services:
             target: /postgres/
     postgres2:
         image: postgres
+        command: ["postgres", "-c", "logging_collector=on", "-c", "log_directory=/postgres/logs", "-c", "log_filename=postgresql.log", "-c", "log_statement=all"]
        restart: always
        environment:
            POSTGRES_HOST_AUTH_METHOD: "trust"
@@ -34,4 +36,4 @@ services:
        volumes:
            - type: ${POSTGRES_LOGS_FS:-tmpfs}
              source: ${POSTGRES2_DIR:-}
-             target: /postgres/
\ No newline at end of file
+             target: /postgres/
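The compose changes above pin Cassandra to a fixed cassandra:4.0 image, stop publishing a host port for it (the tests talk to the container IP directly, see the cluster.py changes below), and start both Postgres containers with full statement logging. A quick check that the logging settings actually took effect once postgres1 is up, as a hedged sketch rather than part of the patch: host, port and user are assumptions, and the passwordless login relies on the POSTGRES_HOST_AUTH_METHOD "trust" setting from the compose file.

    # Hedged sketch: confirm the settings passed via "command:" are live.
    # Connection details are assumptions for a locally reachable postgres1.
    import psycopg2

    conn = psycopg2.connect(host="localhost", port=5432, user="postgres")  # trust auth: no password
    with conn.cursor() as cur:
        for setting in ("logging_collector", "log_directory", "log_filename", "log_statement"):
            cur.execute("SHOW " + setting)  # each SHOW returns one row with the current value
            print(setting, "=", cur.fetchone()[0])
    conn.close()
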
diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py
index 0a64f8fdc7b..3f26135dccd 100644
--- a/tests/integration/helpers/cluster.py
+++ b/tests/integration/helpers/cluster.py
@@ -16,6 +16,7 @@ import traceback
 import urllib.parse
 import shlex

+from cassandra.policies import RoundRobinPolicy
 import cassandra.cluster
 import psycopg2
 import pymongo
@@ -99,7 +100,6 @@ def get_docker_compose_path():
         logging.debug(f"Fallback docker_compose_path to LOCAL_DOCKER_COMPOSE_DIR: {LOCAL_DOCKER_COMPOSE_DIR}")
         return LOCAL_DOCKER_COMPOSE_DIR

-
 def check_kafka_is_available(kafka_id, kafka_port):
     p = subprocess.Popen(('docker',
                           'exec',
@@ -225,7 +225,9 @@ class ClickHouseCluster:

         # available when with_cassandra == True
         self.cassandra_host = "cassandra1"
-        self.cassandra_port = get_open_port()
+        self.cassandra_port = 9042
+        self.cassandra_ip = None
+        self.cassandra_id = self.get_instance_docker_id(self.cassandra_host)

         # available when with_rabbitmq == True
         self.rabbitmq_host = "rabbitmq1"
@@ -551,8 +553,7 @@ class ClickHouseCluster:

         if with_cassandra and not self.with_cassandra:
             self.with_cassandra = True
-            env_variables['CASSANDRA_EXTERNAL_PORT'] = str(self.cassandra_port)
-            env_variables['CASSANDRA_INTERNAL_PORT'] = "9042"
+            env_variables['CASSANDRA_PORT'] = str(self.cassandra_port)
             self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_cassandra.yml')])
             self.base_cassandra_cmd = ['docker-compose', '--env-file', instance.env_file, '--project-name', self.project_name,
                                        '--file', p.join(docker_compose_yml_dir, 'docker_compose_cassandra.yml')]
@@ -778,7 +779,7 @@ class ClickHouseCluster:
                 logging.debug("Can't connect to Mongo " + str(ex))
                 time.sleep(1)

-    def wait_minio_to_start(self, timeout=30, secure=False):
+    def wait_minio_to_start(self, timeout=120, secure=False):
         os.environ['SSL_CERT_FILE'] = p.join(self.base_dir, self.minio_dir, 'certs', 'public.crt')
         minio_client = Minio('localhost:{}'.format(self.minio_port),
                              access_key='minio',
@@ -819,18 +820,27 @@ class ClickHouseCluster:
                 logging.debug(("Can't connect to SchemaRegistry: %s", str(ex)))
                 time.sleep(1)

-    def wait_cassandra_to_start(self, timeout=30):
-        cass_client = cassandra.cluster.Cluster(["localhost"], self.cassandra_port)
+        raise Exception("Can't wait for Schema Registry to start")
+
+
+    def wait_cassandra_to_start(self, timeout=120):
+        self.cassandra_ip = self.get_instance_ip(self.cassandra_host)
+        cass_client = cassandra.cluster.Cluster([self.cassandra_ip], port=self.cassandra_port, load_balancing_policy=RoundRobinPolicy())
         start = time.time()
         while time.time() - start < timeout:
             try:
+                logging.info(f"Check Cassandra Online {self.cassandra_id} {self.cassandra_ip} {self.cassandra_port}")
+                check = self.exec_in_container(self.cassandra_id, ["bash", "-c", f"/opt/cassandra/bin/cqlsh -u cassandra -p cassandra -e 'describe keyspaces' {self.cassandra_ip} {self.cassandra_port}"], user='root')
+                logging.info("Cassandra Online")
                 cass_client.connect()
-                logging.info("Connected to Cassandra")
+                logging.info("Connected client to Cassandra")
                 return
             except Exception as ex:
                 logging.warning("Can't connect to Cassandra: %s", str(ex))
                 time.sleep(1)

+        raise Exception("Can't wait for Cassandra to start")
+
     def start(self, destroy_dirs=True):
         logging.debug("Cluster start called. is_up={}, destroy_dirs={}".format(self.is_up, destroy_dirs))
         if self.is_up:
@@ -844,6 +854,8 @@ class ClickHouseCluster:
                 if not subprocess_call(['docker-compose', 'kill']):
                     subprocess_call(['docker-compose', 'down', '--volumes'])
                 logging.debug("Unstopped containers killed")
+                subprocess_call(['docker-compose', 'ps', '--services', '--all'])
+
             except:
                 pass
@@ -895,7 +907,7 @@ class ClickHouseCluster:
                 os.makedirs(self.mysql_logs_dir)
                 os.chmod(self.mysql_logs_dir, stat.S_IRWXO)
                 subprocess_check_call(self.base_mysql_cmd + common_opts)
-                self.wait_mysql_to_start(120)
+                self.wait_mysql_to_start(180)

             if self.with_mysql8 and self.base_mysql8_cmd:
                 logging.debug('Setup MySQL 8')
@@ -916,8 +928,7 @@ class ClickHouseCluster:
                 os.chmod(self.postgres2_logs_dir, stat.S_IRWXO)

                 subprocess_check_call(self.base_postgres_cmd + common_opts)
-                self.wait_postgres_to_start(30)
-                self.wait_postgres_to_start(30)
+                self.wait_postgres_to_start(120)

             if self.with_kafka and self.base_kafka_cmd:
                 logging.debug('Setup Kafka')
@@ -987,7 +998,7 @@ class ClickHouseCluster:
             subprocess_check_call(clickhouse_start_cmd)
             logging.debug("ClickHouse instance created")

-            start_deadline = time.time() + 120.0  # seconds
+            start_deadline = time.time() + 180.0  # seconds
             for instance in self.instances.values():
                 instance.docker_client = self.docker_client
                 instance.ip_address = self.get_instance_ip(instance.name)
@@ -1334,8 +1345,20 @@ class ClickHouseInstance:
         if not self.stay_alive:
             raise Exception("clickhouse can be stopped only with stay_alive=True instance")

-        self.exec_in_container(["bash", "-c", "pkill {} clickhouse".format("-9" if kill else "")], user='root')
-        time.sleep(start_wait_sec)
+        try:
+            ps_clickhouse = self.exec_in_container(["bash", "-c", "ps -C clickhouse"], user='root')
+            if ps_clickhouse == "  PID TTY      STAT   TIME COMMAND":
+                logging.warning("ClickHouse process already stopped")
+                return
+
+            self.exec_in_container(["bash", "-c", "pkill {} clickhouse".format("-9" if kill else "")], user='root')
+            time.sleep(start_wait_sec)
+            ps_clickhouse = self.exec_in_container(["bash", "-c", "ps -C clickhouse"], user='root')
+            if ps_clickhouse != "  PID TTY      STAT   TIME COMMAND":
+                logging.warning(f"Force kill clickhouse in stop_clickhouse. ps: {ps_clickhouse}")
+                self.stop_clickhouse(kill=True)
+        except Exception as e:
+            logging.warning(f"Stop ClickHouse raised an error: {e}")

     def start_clickhouse(self, stop_wait_sec=5):
         if not self.stay_alive:
@@ -1360,8 +1383,10 @@ class ClickHouseInstance:
         return len(result) > 0

     def grep_in_log(self, substring):
+        logging.debug(f"grep in log called {substring}")
         result = self.exec_in_container(
             ["bash", "-c", 'grep "{}" /var/log/clickhouse-server/clickhouse-server.log || true'.format(substring)])
+        logging.debug(f"grep result {result}")
         return result

     def count_in_log(self, substring):
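The new wait_cassandra_to_start above resolves the container IP, probes readiness with cqlsh inside the container, and only then connects with the driver; it also passes an explicit RoundRobinPolicy, since the DataStax driver tends to warn when contact points are given without a load-balancing policy. A self-contained sketch of the same wait loop, assuming a reachable contact IP (the real helper resolves it from the container):

    # Minimal sketch of the readiness loop; `ip` is an assumption here.
    # Requires the DataStax driver: pip install cassandra-driver
    import logging
    import time

    from cassandra.cluster import Cluster
    from cassandra.policies import RoundRobinPolicy

    def wait_for_cassandra(ip, port=9042, timeout=120):
        client = Cluster([ip], port=port, load_balancing_policy=RoundRobinPolicy())
        deadline = time.time() + timeout
        while time.time() < deadline:
            try:
                client.connect()  # raises (e.g. NoHostAvailable) until the node is up
                logging.info("Connected to Cassandra")
                return
            except Exception as ex:
                logging.warning("Can't connect to Cassandra: %s", ex)
                time.sleep(1)
        raise Exception("Can't wait for Cassandra to start")
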
diff --git a/tests/integration/helpers/external_sources.py b/tests/integration/helpers/external_sources.py
index 66cd45583a7..32ebdfa58c6 100644
--- a/tests/integration/helpers/external_sources.py
+++ b/tests/integration/helpers/external_sources.py
@@ -462,6 +462,9 @@ class SourceCassandra(ExternalSource):
         )

     def prepare(self, structure, table_name, cluster):
+        if self.internal_hostname is None:
+            self.internal_hostname = cluster.cassandra_ip
+
         self.client = cassandra.cluster.Cluster([self.internal_hostname], port=self.internal_port)
         self.session = self.client.connect()
         self.session.execute(
diff --git a/tests/integration/parallel.txt b/tests/integration/parallel.txt
index 67daeced0f1..8fd6c6999cc 100644
--- a/tests/integration/parallel.txt
+++ b/tests/integration/parallel.txt
@@ -131,6 +131,48 @@ test_postgresql_protocol/test.py::test_psql_is_ready
 test_postgresql_protocol/test.py::test_psql_client
 test_postgresql_protocol/test.py::test_python_client
 test_postgresql_protocol/test.py::test_java_client
+test_storage_kafka/test.py::test_kafka_json_as_string
+test_storage_kafka/test.py::test_kafka_formats
+test_storage_kafka/test.py::test_kafka_settings_old_syntax
+test_storage_kafka/test.py::test_kafka_settings_new_syntax
+test_storage_kafka/test.py::test_kafka_issue11308
+test_storage_kafka/test.py::test_kafka_issue4116
+test_storage_kafka/test.py::test_kafka_consumer_hang
+test_storage_kafka/test.py::test_kafka_consumer_hang2
+test_storage_kafka/test.py::test_kafka_csv_with_delimiter
+test_storage_kafka/test.py::test_kafka_tsv_with_delimiter
+test_storage_kafka/test.py::test_kafka_select_empty
+test_storage_kafka/test.py::test_kafka_json_without_delimiter
+test_storage_kafka/test.py::test_kafka_protobuf
+test_storage_kafka/test.py::test_kafka_string_field_on_first_position_in_protobuf
+test_storage_kafka/test.py::test_kafka_protobuf_no_delimiter
+test_storage_kafka/test.py::test_kafka_materialized_view
+test_storage_kafka/test.py::test_librdkafka_compression
+test_storage_kafka/test.py::test_kafka_materialized_view_with_subquery
+test_storage_kafka/test.py::test_kafka_many_materialized_views
+test_storage_kafka/test.py::test_kafka_flush_on_big_message
+test_storage_kafka/test.py::test_kafka_virtual_columns
+test_storage_kafka/test.py::test_kafka_virtual_columns_with_materialized_view
+test_storage_kafka/test.py::test_kafka_insert
+test_storage_kafka/test.py::test_kafka_produce_consume
+test_storage_kafka/test.py::test_kafka_commit_on_block_write
+test_storage_kafka/test.py::test_kafka_virtual_columns2
+test_storage_kafka/test.py::test_kafka_produce_key_timestamp
+test_storage_kafka/test.py::test_kafka_flush_by_time
+test_storage_kafka/test.py::test_kafka_flush_by_block_size
+test_storage_kafka/test.py::test_kafka_lot_of_partitions_partial_commit_of_bulk
+test_storage_kafka/test.py::test_kafka_rebalance
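The test diffs below mechanically swap bare node.query calls for node.query_with_retry and add CREATE TABLE IF NOT EXISTS, so fixture setup is idempotent and survives transient ZooKeeper or replica hiccups. The real helper is defined in tests/integration/helpers/cluster.py; the sketch below only approximates its retry contract, and the parameter names and defaults are assumptions:

    # Hedged approximation of query_with_retry: fixed-interval retries,
    # re-raising the last error once attempts are exhausted.
    import time

    def query_with_retry(node, sql, retry_count=20, sleep_time=0.5, **kwargs):
        last_error = None
        for _ in range(retry_count):
            try:
                return node.query(sql, **kwargs)
            except Exception as ex:
                last_error = ex
                time.sleep(sleep_time)
        raise last_error
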
+test_storage_kafka/test.py::test_kafka_no_holes_when_write_suffix_failed
+test_storage_kafka/test.py::test_exception_from_destructor
+test_storage_kafka/test.py::test_commits_of_unprocessed_messages_on_drop
+test_storage_kafka/test.py::test_bad_reschedule
+test_storage_kafka/test.py::test_kafka_duplicates_when_commit_failed
+test_storage_kafka/test.py::test_premature_flush_on_eof
+test_storage_kafka/test.py::test_kafka_unavailable
+test_storage_kafka/test.py::test_kafka_issue14202
+test_storage_kafka/test.py::test_kafka_csv_with_thread_per_consumer
+test_storage_kerberized_kafka/test.py::test_kafka_json_as_string
+test_storage_kerberized_kafka/test.py::test_kafka_json_as_string_no_kdc
 test_storage_mysql/test.py::test_many_connections
 test_storage_mysql/test.py::test_insert_select
 test_storage_mysql/test.py::test_replace_select
diff --git a/tests/integration/test_alter_on_mixed_type_cluster/test.py b/tests/integration/test_alter_on_mixed_type_cluster/test.py
index 852554f009d..c22626cb379 100644
--- a/tests/integration/test_alter_on_mixed_type_cluster/test.py
+++ b/tests/integration/test_alter_on_mixed_type_cluster/test.py
@@ -17,19 +17,19 @@ def started_cluster():
         cluster.start()

         for node in [node1, node2]:
-            node.query('''
-            CREATE TABLE test_table_replicated(date Date, id UInt32, value Int32)
+            node.query_with_retry('''
+            CREATE TABLE IF NOT EXISTS test_table_replicated(date Date, id UInt32, value Int32)
             ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/sometable', '{replica}') ORDER BY id;
             '''.format(replica=node.name))
-            node.query('''CREATE TABLE test_table(date Date, id UInt32, value Int32) ENGINE=MergeTree ORDER BY id''')
+            node.query_with_retry('''CREATE TABLE IF NOT EXISTS test_table(date Date, id UInt32, value Int32) ENGINE=MergeTree ORDER BY id''')

         for node in [node3, node4]:
-            node.query('''
-            CREATE TABLE test_table_replicated(date Date, id UInt32, value Int32)
+            node.query_with_retry('''
+            CREATE TABLE IF NOT EXISTS test_table_replicated(date Date, id UInt32, value Int32)
             ENGINE = ReplicatedMergeTree('/clickhouse/tables/1/someotable', '{replica}') ORDER BY id;
             '''.format(replica=node.name))
-            node.query('''CREATE TABLE test_table(date Date, id UInt32, value Int32) ENGINE=MergeTree ORDER BY id''')
+            node.query_with_retry('''CREATE TABLE IF NOT EXISTS test_table(date Date, id UInt32, value Int32) ENGINE=MergeTree ORDER BY id''')

         yield cluster
diff --git a/tests/integration/test_concurrent_ttl_merges/test.py b/tests/integration/test_concurrent_ttl_merges/test.py
index ba5ed9f0758..18206a4dfb9 100644
--- a/tests/integration/test_concurrent_ttl_merges/test.py
+++ b/tests/integration/test_concurrent_ttl_merges/test.py
@@ -116,7 +116,7 @@ def test_limited_ttl_merges_in_empty_pool_replicated(started_cluster):
     node1.query("SYSTEM STOP TTL MERGES")

     for i in range(100):
-        node1.query("INSERT INTO replicated_ttl SELECT now() - INTERVAL 1 MONTH, {}, number FROM numbers(1)".format(i))
+        node1.query_with_retry("INSERT INTO replicated_ttl SELECT now() - INTERVAL 1 MONTH, {}, number FROM numbers(1)".format(i))

     assert node1.query("SELECT COUNT() FROM replicated_ttl") == "100\n"
@@ -147,7 +147,7 @@ def test_limited_ttl_merges_two_replicas(started_cluster):
     node2.query("SYSTEM STOP TTL MERGES")

     for i in range(100):
-        node1.query(
+        node1.query_with_retry(
             "INSERT INTO replicated_ttl_2 SELECT now() - INTERVAL 1 MONTH, {}, number FROM numbers(10000)".format(i))

     node2.query("SYSTEM SYNC REPLICA replicated_ttl_2", timeout=10)
diff --git a/tests/integration/test_cross_replication/test.py b/tests/integration/test_cross_replication/test.py
index 8a118934c93..cc5618e04e6 100644
--- a/tests/integration/test_cross_replication/test.py
+++ b/tests/integration/test_cross_replication/test.py
@@ -45,7 +45,7 @@ CREATE TABLE distributed(date Date, id UInt32, shard_id UInt32)
 2017-06-16	333	2
 '''
         node1.query("INSERT INTO distributed FORMAT TSV", stdin=to_insert)
-        time.sleep(0.5)
+        time.sleep(5)

         yield cluster
diff --git a/tests/integration/test_ddl_worker_non_leader/test.py b/tests/integration/test_ddl_worker_non_leader/test.py
index b64f99d5345..172fc03c005 100644
--- a/tests/integration/test_ddl_worker_non_leader/test.py
+++ b/tests/integration/test_ddl_worker_non_leader/test.py
@@ -20,14 +20,14 @@ def started_cluster():


 def test_non_leader_replica(started_cluster):
-    node1.query('''CREATE TABLE sometable(id UInt32, value String)
+    node1.query_with_retry('''CREATE TABLE IF NOT EXISTS sometable(id UInt32, value String)
             ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/sometable', '1')
             ORDER BY tuple()''')

-    node2.query('''CREATE TABLE sometable(id UInt32, value String)
+    node2.query_with_retry('''CREATE TABLE IF NOT EXISTS sometable(id UInt32, value String)
             ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/sometable', '2')
             ORDER BY tuple()
             SETTINGS replicated_can_become_leader = 0''')

     node1.query("INSERT INTO sometable SELECT number, toString(number) FROM numbers(100)")
-    node2.query("SYSTEM SYNC REPLICA sometable", timeout=10)
+    node2.query_with_retry("SYSTEM SYNC REPLICA sometable", timeout=10)

     assert node1.query("SELECT COUNT() FROM sometable") == "100\n"
     assert node2.query("SELECT COUNT() FROM sometable") == "100\n"
diff --git a/tests/integration/test_dictionaries_all_layouts_separate_sources/test_cassandra.py b/tests/integration/test_dictionaries_all_layouts_separate_sources/test_cassandra.py
index 1271619f1f7..65080ab40dd 100644
--- a/tests/integration/test_dictionaries_all_layouts_separate_sources/test_cassandra.py
+++ b/tests/integration/test_dictionaries_all_layouts_separate_sources/test_cassandra.py
@@ -25,7 +25,7 @@ def setup_module(module):

     cluster = ClickHouseCluster(__file__, name=test_name)

-    SOURCE = SourceCassandra("Cassandra", "localhost", cluster.cassandra_port, cluster.cassandra_host, "9042", "", "")
+    SOURCE = SourceCassandra("Cassandra", None, cluster.cassandra_port, cluster.cassandra_host, cluster.cassandra_port, "", "")

     simple_tester = SimpleLayoutTester(test_name)
     simple_tester.cleanup()
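The external_sources.py and test_cassandra.py hunks work as a pair: the test now passes None for the internal hostname, and SourceCassandra.prepare() fills it in from cluster.cassandra_ip once the container is running, since the container IP cannot be known before the cluster starts. A condensed sketch of that deferred-resolution pattern; the class is trimmed and its constructor signature partly assumed, only the prepare() logic mirrors the diff:

    # Deferred host resolution: the contact point is unknown at construction
    # time, so it is resolved on first use from the running cluster.
    import cassandra.cluster

    class LazyCassandraSource:  # hypothetical stand-in for SourceCassandra
        def __init__(self, internal_hostname, internal_port):
            self.internal_hostname = internal_hostname  # may be None for now
            self.internal_port = internal_port

        def prepare(self, cluster):
            if self.internal_hostname is None:
                self.internal_hostname = cluster.cassandra_ip  # set by wait_cassandra_to_start
            client = cassandra.cluster.Cluster([self.internal_hostname], port=self.internal_port)
            self.session = client.connect()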