Merge pull request #69136 from ilejn/kdc_network_reject

Do not use docker pause for Kerberos KDC container in integration tests
This commit is contained in:
Alexey Milovidov 2024-09-08 05:00:29 +00:00 committed by GitHub
commit 7d670bf3c2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 42 additions and 26 deletions

View File

@ -303,6 +303,7 @@ class _NetworkManager:
destination_port=None,
action=None,
probability=None,
protocol=None,
custom_args=None,
):
ret = []
@ -317,7 +318,7 @@ class _NetworkManager:
str(probability),
]
)
ret.extend(["-p", "tcp"])
ret.extend(["-p", "tcp" if protocol is None else protocol])
if source is not None:
ret.extend(["-s", source])
if destination is not None:

View File

@ -8,6 +8,7 @@ import logging
from helpers.cluster import ClickHouseCluster, is_arm
from helpers.test_tools import TSV
from helpers.client import QueryRuntimeException
from helpers.network import PartitionManager
import json
import subprocess
@ -138,7 +139,7 @@ def test_kafka_json_as_string_request_new_ticket_after_expiration(kafka_cluster)
kafka_produce(
kafka_cluster,
"kafka_json_as_string",
"kafka_json_as_string_after_expiration",
[
'{"t": 123, "e": {"x": "woof"} }',
"",
@ -152,9 +153,9 @@ def test_kafka_json_as_string_request_new_ticket_after_expiration(kafka_cluster)
CREATE TABLE test.kafka (field String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string',
kafka_topic_list = 'kafka_json_as_string_after_expiration',
kafka_commit_on_select = 1,
kafka_group_name = 'kafka_json_as_string',
kafka_group_name = 'kafka_json_as_string_after_expiration',
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
"""
@ -170,7 +171,7 @@ def test_kafka_json_as_string_request_new_ticket_after_expiration(kafka_cluster)
"""
assert TSV(result) == TSV(expected)
assert instance.contains_in_log(
"Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows"
"Parsing of message (topic: kafka_json_as_string_after_expiration, partition: 0, offset: 1) return no rows"
)
@ -204,27 +205,40 @@ def test_kafka_json_as_string_no_kdc(kafka_cluster):
],
)
kafka_cluster.pause_container("kafka_kerberos")
time.sleep(45) # wait for ticket expiration
# temporary prevent CH - KDC communications
with PartitionManager() as pm:
other_node = "kafka_kerberos"
for node in kafka_cluster.instances.values():
source = node.ip_address
destination = kafka_cluster.get_instance_ip(other_node)
logging.debug(f"partitioning source {source}, destination {destination}")
pm._add_rule(
{
"source": source,
"destination": destination,
"action": "REJECT",
"protocol": "all",
}
)
instance.query(
"""
CREATE TABLE test.kafka_no_kdc (field String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string_no_kdc',
kafka_group_name = 'kafka_json_as_string_no_kdc',
kafka_commit_on_select = 1,
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
"""
)
time.sleep(45) # wait for ticket expiration
result = instance.query("SELECT * FROM test.kafka_no_kdc;")
instance.query(
"""
CREATE TABLE test.kafka_no_kdc (field String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string_no_kdc',
kafka_group_name = 'kafka_json_as_string_no_kdc',
kafka_commit_on_select = 1,
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
"""
)
result = instance.query("SELECT * FROM test.kafka_no_kdc;")
expected = ""
kafka_cluster.unpause_container("kafka_kerberos")
assert TSV(result) == TSV(expected)
assert instance.contains_in_log("StorageKafka (kafka_no_kdc): Nothing to commit")
assert instance.contains_in_log("Ticket expired")
@ -234,7 +248,7 @@ def test_kafka_json_as_string_no_kdc(kafka_cluster):
def test_kafka_config_from_sql_named_collection(kafka_cluster):
kafka_produce(
kafka_cluster,
"kafka_json_as_string",
"kafka_json_as_string_named_collection",
[
'{"t": 123, "e": {"x": "woof"} }',
"",
@ -245,6 +259,7 @@ def test_kafka_config_from_sql_named_collection(kafka_cluster):
instance.query(
"""
DROP NAMED COLLECTION IF EXISTS kafka_config;
CREATE NAMED COLLECTION kafka_config AS
kafka.security_protocol = 'SASL_PLAINTEXT',
kafka.sasl_mechanism = 'GSSAPI',
@ -255,9 +270,9 @@ def test_kafka_config_from_sql_named_collection(kafka_cluster):
kafka.api_version_request = 'false',
kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string',
kafka_topic_list = 'kafka_json_as_string_named_collection',
kafka_commit_on_select = 1,
kafka_group_name = 'kafka_json_as_string',
kafka_group_name = 'kafka_json_as_string_named_collection',
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
"""
@ -279,7 +294,7 @@ def test_kafka_config_from_sql_named_collection(kafka_cluster):
"""
assert TSV(result) == TSV(expected)
assert instance.contains_in_log(
"Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows"
"Parsing of message (topic: kafka_json_as_string_named_collection, partition: 0, offset: 1) return no rows"
)