ClickHouse/tests/integration/test_storage_kerberized_kafka/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

290 lines
9.1 KiB
Python
Raw Normal View History

import os.path as p
import random
import threading
import time
import pytest
2021-02-24 11:46:58 +00:00
import logging
2024-05-16 14:43:54 +00:00
from helpers.cluster import ClickHouseCluster, is_arm
from helpers.test_tools import TSV
from helpers.client import QueryRuntimeException
import json
import subprocess
import kafka.errors
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer, BrokerConnection
from kafka.admin import NewTopic
from kafka.protocol.admin import DescribeGroupsResponse_v1, DescribeGroupsRequest_v1
from kafka.protocol.group import MemberAssignment
import socket
2024-05-16 14:43:54 +00:00
if is_arm():
# skip due to no arm support for clickhouse/kerberos-kdc docker image
pytestmark = pytest.mark.skip
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
"instance",
main_configs=["configs/kafka.xml"],
user_configs=["configs/users.xml"],
with_kerberized_kafka=True,
clickhouse_path_dir="clickhouse_path",
)
2020-10-02 16:54:07 +00:00
def producer_serializer(x):
return x.encode() if isinstance(x, str) else x
2021-02-18 21:21:50 +00:00
2021-02-18 21:21:50 +00:00
def get_kafka_producer(port, serializer):
errors = []
for _ in range(15):
try:
producer = KafkaProducer(
bootstrap_servers="localhost:{}".format(port),
value_serializer=serializer,
)
2021-02-18 21:21:50 +00:00
logging.debug("Kafka Connection establised: localhost:{}".format(port))
return producer
except Exception as e:
errors += [str(e)]
time.sleep(1)
2021-11-30 09:56:10 +00:00
raise Exception("Connection not establised, {}".format(errors))
2021-02-18 21:21:50 +00:00
2021-02-18 21:21:50 +00:00
def kafka_produce(kafka_cluster, topic, messages, timestamp=None):
logging.debug(
"kafka_produce server:{}:{} topic:{}".format(
"localhost", kafka_cluster.kerberized_kafka_port, topic
)
)
producer = get_kafka_producer(
kafka_cluster.kerberized_kafka_port, producer_serializer
)
for message in messages:
producer.send(topic=topic, value=message, timestamp_ms=timestamp)
producer.flush()
# Fixtures
@pytest.fixture(scope="module")
def kafka_cluster():
try:
cluster.start()
2021-08-19 11:32:32 +00:00
if instance.is_debug_build():
# https://github.com/ClickHouse/ClickHouse/issues/27651
pytest.skip(
"librdkafka calls system function for kinit which does not pass harmful check in debug build"
)
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def kafka_setup_teardown():
instance.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
yield # run test
# Tests
def test_kafka_json_as_string(kafka_cluster):
kafka_produce(
kafka_cluster,
"kafka_json_as_string",
[
'{"t": 123, "e": {"x": "woof"} }',
"",
'{"t": 124, "e": {"x": "test"} }',
'{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}',
],
)
instance.query(
"""
CREATE TABLE test.kafka (field String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string',
2021-11-30 09:56:10 +00:00
kafka_commit_on_select = 1,
kafka_group_name = 'kafka_json_as_string',
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
"""
)
2021-01-29 16:09:48 +00:00
time.sleep(3)
result = instance.query("SELECT * FROM test.kafka;")
expected = """\
{"t": 123, "e": {"x": "woof"} }
{"t": 124, "e": {"x": "test"} }
{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
"""
assert TSV(result) == TSV(expected)
assert instance.contains_in_log(
"Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows"
)
def test_kafka_json_as_string_request_new_ticket_after_expiration(kafka_cluster):
# Ticket should be expired after the wait time
# On run of SELECT query new ticket should be requested and SELECT query should run fine.
kafka_produce(
kafka_cluster,
"kafka_json_as_string",
[
'{"t": 123, "e": {"x": "woof"} }',
"",
'{"t": 124, "e": {"x": "test"} }',
'{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}',
],
)
instance.query(
"""
CREATE TABLE test.kafka (field String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string',
kafka_commit_on_select = 1,
kafka_group_name = 'kafka_json_as_string',
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
"""
)
time.sleep(45) # wait for ticket expiration
result = instance.query("SELECT * FROM test.kafka;")
expected = """\
{"t": 123, "e": {"x": "woof"} }
{"t": 124, "e": {"x": "test"} }
{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
"""
assert TSV(result) == TSV(expected)
assert instance.contains_in_log(
"Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows"
)
def test_kafka_json_as_string_no_kdc(kafka_cluster):
2021-12-17 19:49:40 +00:00
# When the test is run alone (not preceded by any other kerberized kafka test),
# we need a ticket to
# assert instance.contains_in_log("Ticket expired")
instance.query(
"""
CREATE TABLE test.kafka_no_kdc_warm_up (field String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string_no_kdc_warm_up',
kafka_group_name = 'kafka_json_as_string_no_kdc_warm_up',
kafka_commit_on_select = 1,
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
"""
)
instance.query("SELECT * FROM test.kafka_no_kdc_warm_up;")
kafka_produce(
kafka_cluster,
"kafka_json_as_string_no_kdc",
[
'{"t": 123, "e": {"x": "woof"} }',
"",
'{"t": 124, "e": {"x": "test"} }',
'{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}',
],
)
kafka_cluster.pause_container("kafka_kerberos")
time.sleep(45) # wait for ticket expiration
instance.query(
"""
CREATE TABLE test.kafka_no_kdc (field String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string_no_kdc',
kafka_group_name = 'kafka_json_as_string_no_kdc',
2021-11-30 09:56:10 +00:00
kafka_commit_on_select = 1,
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
"""
)
result = instance.query("SELECT * FROM test.kafka_no_kdc;")
expected = ""
kafka_cluster.unpause_container("kafka_kerberos")
assert TSV(result) == TSV(expected)
assert instance.contains_in_log("StorageKafka (kafka_no_kdc): Nothing to commit")
assert instance.contains_in_log("Ticket expired")
2022-06-07 11:59:46 +00:00
assert instance.contains_in_log("KerberosInit failure:")
def test_kafka_config_from_sql_named_collection(kafka_cluster):
kafka_produce(
kafka_cluster,
"kafka_json_as_string",
[
'{"t": 123, "e": {"x": "woof"} }',
"",
'{"t": 124, "e": {"x": "test"} }',
'{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}',
],
)
instance.query(
"""
CREATE NAMED COLLECTION kafka_config AS
kafka.security_protocol = 'SASL_PLAINTEXT',
kafka.sasl_mechanism = 'GSSAPI',
kafka.sasl_kerberos_service_name = 'kafka',
kafka.sasl_kerberos_keytab = '/tmp/keytab/clickhouse.keytab',
kafka.sasl_kerberos_principal = 'anotherkafkauser/instance@TEST.CLICKHOUSE.TECH',
kafka.debug = 'security',
kafka.api_version_request = 'false',
kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string',
kafka_commit_on_select = 1,
kafka_group_name = 'kafka_json_as_string',
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
"""
)
instance.query(
"""
CREATE TABLE test.kafka (field String)
ENGINE = Kafka(kafka_config);
"""
)
time.sleep(3)
result = instance.query("SELECT * FROM test.kafka;")
expected = """\
{"t": 123, "e": {"x": "woof"} }
{"t": 124, "e": {"x": "test"} }
{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
"""
assert TSV(result) == TSV(expected)
assert instance.contains_in_log(
"Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows"
)
if __name__ == "__main__":
cluster.start()
2020-10-02 16:54:07 +00:00
input("Cluster created, press any key to destroy...")
cluster.shutdown()