ClickHouse/tests/integration/test_storage_kerberized_kafka/test.py
2021-12-17 22:49:40 +03:00

148 lines
5.6 KiB
Python

import os.path as p
import random
import threading
import time
import pytest
import logging
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from helpers.client import QueryRuntimeException
from helpers.network import PartitionManager
import json
import subprocess
import kafka.errors
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer, BrokerConnection
from kafka.admin import NewTopic
from kafka.protocol.admin import DescribeGroupsResponse_v1, DescribeGroupsRequest_v1
from kafka.protocol.group import MemberAssignment
import socket
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
main_configs=['configs/kafka.xml'],
user_configs=['configs/users.xml'],
with_kerberized_kafka=True,
clickhouse_path_dir="clickhouse_path")
def producer_serializer(x):
return x.encode() if isinstance(x, str) else x
def get_kafka_producer(port, serializer):
errors = []
for _ in range(15):
try:
producer = KafkaProducer(bootstrap_servers="localhost:{}".format(port), value_serializer=serializer)
logging.debug("Kafka Connection establised: localhost:{}".format(port))
return producer
except Exception as e:
errors += [str(e)]
time.sleep(1)
raise Exception("Connection not establised, {}".format(errors))
def kafka_produce(kafka_cluster, topic, messages, timestamp=None):
logging.debug("kafka_produce server:{}:{} topic:{}".format("localhost", kafka_cluster.kerberized_kafka_port, topic))
producer = get_kafka_producer(kafka_cluster.kerberized_kafka_port, producer_serializer)
for message in messages:
producer.send(topic=topic, value=message, timestamp_ms=timestamp)
producer.flush()
# Fixtures
@pytest.fixture(scope="module")
def kafka_cluster():
try:
cluster.start()
if instance.is_debug_build():
# https://github.com/ClickHouse/ClickHouse/issues/27651
pytest.skip("librdkafka calls system function for kinit which does not pass harmful check in debug build")
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def kafka_setup_teardown():
instance.query('DROP DATABASE IF EXISTS test; CREATE DATABASE test;')
yield # run test
# Tests
def test_kafka_json_as_string(kafka_cluster):
kafka_produce(kafka_cluster, 'kafka_json_as_string', ['{"t": 123, "e": {"x": "woof"} }', '', '{"t": 124, "e": {"x": "test"} }', '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}'])
instance.query('''
CREATE TABLE test.kafka (field String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string',
kafka_commit_on_select = 1,
kafka_group_name = 'kafka_json_as_string',
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
''')
time.sleep(3)
result = instance.query('SELECT * FROM test.kafka;')
expected = '''\
{"t": 123, "e": {"x": "woof"} }
{"t": 124, "e": {"x": "test"} }
{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
'''
assert TSV(result) == TSV(expected)
assert instance.contains_in_log("Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows")
def test_kafka_json_as_string_no_kdc(kafka_cluster):
# When the test is run alone (not preceded by any other kerberized kafka test),
# we need a ticket to
# assert instance.contains_in_log("Ticket expired")
instance.query('''
CREATE TABLE test.kafka_no_kdc_warm_up (field String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string_no_kdc_warm_up',
kafka_group_name = 'kafka_json_as_string_no_kdc_warm_up',
kafka_commit_on_select = 1,
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
''')
instance.query('SELECT * FROM test.kafka_no_kdc_warm_up;')
kafka_produce(kafka_cluster, 'kafka_json_as_string_no_kdc', ['{"t": 123, "e": {"x": "woof"} }', '', '{"t": 124, "e": {"x": "test"} }', '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}'])
kafka_cluster.pause_container('kafka_kerberos')
time.sleep(45) # wait for ticket expiration
instance.query('''
CREATE TABLE test.kafka_no_kdc (field String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string_no_kdc',
kafka_group_name = 'kafka_json_as_string_no_kdc',
kafka_commit_on_select = 1,
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
''')
result = instance.query('SELECT * FROM test.kafka_no_kdc;')
expected = ''
kafka_cluster.unpause_container('kafka_kerberos')
assert TSV(result) == TSV(expected)
assert instance.contains_in_log("StorageKafka (kafka_no_kdc): Nothing to commit")
assert instance.contains_in_log("Ticket expired")
assert instance.contains_in_log("Kerberos ticket refresh failed")
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")
cluster.shutdown()