kdc_network_reject: test passed

This commit is contained in:
Ilya Golshtein 2024-08-30 21:46:56 +00:00
parent ccaa8d03a9
commit 7f637f7acf
2 changed files with 33 additions and 18 deletions

View File

@ -303,6 +303,7 @@ class _NetworkManager:
destination_port=None,
action=None,
probability=None,
protocol=None,
custom_args=None,
):
ret = []
@ -317,7 +318,7 @@ class _NetworkManager:
str(probability),
]
)
ret.extend(["-p", "tcp"])
ret.extend(["-p", "tcp" if protocol is None else protocol])
if source is not None:
ret.extend(["-s", source])
if destination is not None:

View File

@ -8,6 +8,7 @@ import logging
from helpers.cluster import ClickHouseCluster, is_arm
from helpers.test_tools import TSV
from helpers.client import QueryRuntimeException
from helpers.network import PartitionManager
import json
import subprocess
@ -204,27 +205,40 @@ def test_kafka_json_as_string_no_kdc(kafka_cluster):
],
)
kafka_cluster.pause_container("kafka_kerberos")
time.sleep(45) # wait for ticket expiration
# temporary prevent CH - KDC communications
with PartitionManager() as pm:
for other_node in ["kafka_kerberos"]:
for node in kafka_cluster.instances.values():
source = node.ip_address
destination = kafka_cluster.get_instance_ip(other_node)
logging.debug(f"partitioning source {source}, destination {destination}")
pm._add_rule(
{
"source": source,
"destination": destination,
"action": "REJECT",
"protocol": "all"
}
)
instance.query(
"""
CREATE TABLE test.kafka_no_kdc (field String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string_no_kdc',
kafka_group_name = 'kafka_json_as_string_no_kdc',
kafka_commit_on_select = 1,
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
"""
)
time.sleep(45) # wait for ticket expiration
result = instance.query("SELECT * FROM test.kafka_no_kdc;")
instance.query(
"""
CREATE TABLE test.kafka_no_kdc (field String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string_no_kdc',
kafka_group_name = 'kafka_json_as_string_no_kdc',
kafka_commit_on_select = 1,
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
"""
)
result = instance.query("SELECT * FROM test.kafka_no_kdc;")
expected = ""
kafka_cluster.unpause_container("kafka_kerberos")
assert TSV(result) == TSV(expected)
assert instance.contains_in_log("StorageKafka (kafka_no_kdc): Nothing to commit")
assert instance.contains_in_log("Ticket expired")