diff --git a/tests/integration/helpers/network.py b/tests/integration/helpers/network.py index 065836396f3..f24b5924e73 100644 --- a/tests/integration/helpers/network.py +++ b/tests/integration/helpers/network.py @@ -303,6 +303,7 @@ class _NetworkManager: destination_port=None, action=None, probability=None, + protocol=None, custom_args=None, ): ret = [] @@ -317,7 +318,7 @@ class _NetworkManager: str(probability), ] ) - ret.extend(["-p", "tcp"]) + ret.extend(["-p", "tcp" if protocol is None else protocol]) if source is not None: ret.extend(["-s", source]) if destination is not None: diff --git a/tests/integration/test_storage_kerberized_kafka/test.py b/tests/integration/test_storage_kerberized_kafka/test.py index 24d10d7ff83..a00914543c6 100644 --- a/tests/integration/test_storage_kerberized_kafka/test.py +++ b/tests/integration/test_storage_kerberized_kafka/test.py @@ -8,6 +8,7 @@ import logging from helpers.cluster import ClickHouseCluster, is_arm from helpers.test_tools import TSV from helpers.client import QueryRuntimeException +from helpers.network import PartitionManager import json import subprocess @@ -138,7 +139,7 @@ def test_kafka_json_as_string_request_new_ticket_after_expiration(kafka_cluster) kafka_produce( kafka_cluster, - "kafka_json_as_string", + "kafka_json_as_string_after_expiration", [ '{"t": 123, "e": {"x": "woof"} }', "", @@ -152,9 +153,9 @@ def test_kafka_json_as_string_request_new_ticket_after_expiration(kafka_cluster) CREATE TABLE test.kafka (field String) ENGINE = Kafka SETTINGS kafka_broker_list = 'kerberized_kafka1:19092', - kafka_topic_list = 'kafka_json_as_string', + kafka_topic_list = 'kafka_json_as_string_after_expiration', kafka_commit_on_select = 1, - kafka_group_name = 'kafka_json_as_string', + kafka_group_name = 'kafka_json_as_string_after_expiration', kafka_format = 'JSONAsString', kafka_flush_interval_ms=1000; """ @@ -170,7 +171,7 @@ def test_kafka_json_as_string_request_new_ticket_after_expiration(kafka_cluster) """ assert TSV(result) == TSV(expected) assert instance.contains_in_log( - "Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows" + "Parsing of message (topic: kafka_json_as_string_after_expiration, partition: 0, offset: 1) return no rows" ) @@ -204,27 +205,40 @@ def test_kafka_json_as_string_no_kdc(kafka_cluster): ], ) - kafka_cluster.pause_container("kafka_kerberos") - time.sleep(45) # wait for ticket expiration + # temporary prevent CH - KDC communications + with PartitionManager() as pm: + other_node = "kafka_kerberos" + for node in kafka_cluster.instances.values(): + source = node.ip_address + destination = kafka_cluster.get_instance_ip(other_node) + logging.debug(f"partitioning source {source}, destination {destination}") + pm._add_rule( + { + "source": source, + "destination": destination, + "action": "REJECT", + "protocol": "all", + } + ) - instance.query( - """ - CREATE TABLE test.kafka_no_kdc (field String) - ENGINE = Kafka - SETTINGS kafka_broker_list = 'kerberized_kafka1:19092', - kafka_topic_list = 'kafka_json_as_string_no_kdc', - kafka_group_name = 'kafka_json_as_string_no_kdc', - kafka_commit_on_select = 1, - kafka_format = 'JSONAsString', - kafka_flush_interval_ms=1000; - """ - ) + time.sleep(45) # wait for ticket expiration - result = instance.query("SELECT * FROM test.kafka_no_kdc;") + instance.query( + """ + CREATE TABLE test.kafka_no_kdc (field String) + ENGINE = Kafka + SETTINGS kafka_broker_list = 'kerberized_kafka1:19092', + kafka_topic_list = 'kafka_json_as_string_no_kdc', + kafka_group_name = 'kafka_json_as_string_no_kdc', + kafka_commit_on_select = 1, + kafka_format = 'JSONAsString', + kafka_flush_interval_ms=1000; + """ + ) + + result = instance.query("SELECT * FROM test.kafka_no_kdc;") expected = "" - kafka_cluster.unpause_container("kafka_kerberos") - assert TSV(result) == TSV(expected) assert instance.contains_in_log("StorageKafka (kafka_no_kdc): Nothing to commit") assert instance.contains_in_log("Ticket expired") @@ -234,7 +248,7 @@ def test_kafka_json_as_string_no_kdc(kafka_cluster): def test_kafka_config_from_sql_named_collection(kafka_cluster): kafka_produce( kafka_cluster, - "kafka_json_as_string", + "kafka_json_as_string_named_collection", [ '{"t": 123, "e": {"x": "woof"} }', "", @@ -245,6 +259,7 @@ def test_kafka_config_from_sql_named_collection(kafka_cluster): instance.query( """ + DROP NAMED COLLECTION IF EXISTS kafka_config; CREATE NAMED COLLECTION kafka_config AS kafka.security_protocol = 'SASL_PLAINTEXT', kafka.sasl_mechanism = 'GSSAPI', @@ -255,9 +270,9 @@ def test_kafka_config_from_sql_named_collection(kafka_cluster): kafka.api_version_request = 'false', kafka_broker_list = 'kerberized_kafka1:19092', - kafka_topic_list = 'kafka_json_as_string', + kafka_topic_list = 'kafka_json_as_string_named_collection', kafka_commit_on_select = 1, - kafka_group_name = 'kafka_json_as_string', + kafka_group_name = 'kafka_json_as_string_named_collection', kafka_format = 'JSONAsString', kafka_flush_interval_ms=1000; """ @@ -279,7 +294,7 @@ def test_kafka_config_from_sql_named_collection(kafka_cluster): """ assert TSV(result) == TSV(expected) assert instance.contains_in_log( - "Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows" + "Parsing of message (topic: kafka_json_as_string_named_collection, partition: 0, offset: 1) return no rows" )