Revert "Revert "Test and doc for PR12771 krb5 + cyrus-sasl + kerberized kafka""

This reverts commit c298c633a7.
This commit is contained in:
Alexander Tokmakov 2020-09-29 11:56:37 +03:00
parent 51b9aaf4d8
commit a7d3a024c7
20 changed files with 505 additions and 7 deletions

1
.gitmodules vendored
View File

@ -186,3 +186,4 @@
[submodule "contrib/cyrus-sasl"]
path = contrib/cyrus-sasl
url = https://github.com/cyrusimap/cyrus-sasl
branch = cyrus-sasl-2.1

View File

@ -14,10 +14,10 @@ if (NOT ENABLE_RDKAFKA)
return()
endif()
if (NOT ARCH_ARM AND USE_LIBGSASL)
if (NOT ARCH_ARM)
option (USE_INTERNAL_RDKAFKA_LIBRARY "Set to FALSE to use system librdkafka instead of the bundled" ${NOT_UNBUNDLED})
elseif(USE_INTERNAL_RDKAFKA_LIBRARY)
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't use internal librdkafka with ARCH_ARM=${ARCH_ARM} AND USE_LIBGSASL=${USE_LIBGSASL}")
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't use internal librdkafka with ARCH_ARM=${ARCH_ARM}")
endif ()
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/cppkafka/CMakeLists.txt")

2
contrib/cyrus-sasl vendored

@ -1 +1 @@
Subproject commit 6054630889fd1cd8d0659573d69badcee1e23a00
Subproject commit 9995bf9d8e14f58934d9313ac64f13780d6dd3c9

View File

@ -133,6 +133,10 @@
"name": "yandex/clickhouse-postgresql-java-client",
"dependent": []
},
"docker/test/integration/kerberos_kdc": {
"name": "yandex/clickhouse-kerberos-kdc",
"dependent": []
},
"docker/test/base": {
"name": "yandex/clickhouse-test-base",
"dependent": [

View File

@ -16,7 +16,8 @@ RUN apt-get update \
odbc-postgresql \
sqlite3 \
curl \
tar
tar \
krb5-user
RUN rm -rf \
/var/lib/apt/lists/* \
/var/cache/debconf \

View File

@ -0,0 +1,15 @@
# docker build -t yandex/clickhouse-kerberos-kdc .
FROM centos:6.6
# old OS to make is faster and smaller
RUN yum install -y krb5-server krb5-libs krb5-auth-dialog krb5-workstation
EXPOSE 88 749
RUN touch /config.sh
# should be overwritten e.g. via docker_compose volumes
# volumes: /some_path/my_kerberos_config.sh:/config.sh:ro
ENTRYPOINT ["/bin/bash", "/config.sh"]

View File

@ -0,0 +1,59 @@
version: '2.3'
services:
kafka_kerberized_zookeeper:
image: confluentinc/cp-zookeeper:5.2.0
# restart: always
hostname: kafka_kerberized_zookeeper
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_SERVERS: "kafka_kerberized_zookeeper:2888:3888"
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/zookeeper_jaas.conf -Djava.security.krb5.conf=/etc/kafka/secrets/krb.conf -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider -Dsun.security.krb5.debug=true"
volumes:
- ${KERBERIZED_KAFKA_DIR}/secrets:/etc/kafka/secrets
- /dev/urandom:/dev/random
depends_on:
- kafka_kerberos
security_opt:
- label:disable
kerberized_kafka1:
image: confluentinc/cp-kafka:5.2.0
# restart: always
hostname: kerberized_kafka1
ports:
- "9092:9092"
- "9093:9093"
environment:
KAFKA_LISTENERS: OUTSIDE://:19092,UNSECURED_OUTSIDE://:19093,UNSECURED_INSIDE://:9093
KAFKA_ADVERTISED_LISTENERS: OUTSIDE://kerberized_kafka1:19092,UNSECURED_OUTSIDE://kerberized_kafka1:19093,UNSECURED_INSIDE://localhost:9093
# KAFKA_LISTENERS: INSIDE://kerberized_kafka1:9092,OUTSIDE://kerberized_kafka1:19092
# KAFKA_ADVERTISED_LISTENERS: INSIDE://localhost:9092,OUTSIDE://kerberized_kafka1:19092
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: GSSAPI
KAFKA_SASL_ENABLED_MECHANISMS: GSSAPI
KAFKA_SASL_KERBEROS_SERVICE_NAME: kafka
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: OUTSIDE:SASL_PLAINTEXT,UNSECURED_OUTSIDE:PLAINTEXT,UNSECURED_INSIDE:PLAINTEXT,
KAFKA_INTER_BROKER_LISTENER_NAME: OUTSIDE
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "kafka_kerberized_zookeeper:2181"
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/broker_jaas.conf -Djava.security.krb5.conf=/etc/kafka/secrets/krb.conf -Dsun.security.krb5.debug=true"
volumes:
- ${KERBERIZED_KAFKA_DIR}/secrets:/etc/kafka/secrets
- /dev/urandom:/dev/random
depends_on:
- kafka_kerberized_zookeeper
- kafka_kerberos
security_opt:
- label:disable
kafka_kerberos:
image: yandex/clickhouse-kerberos-kdc:${DOCKER_KERBEROS_KDC_TAG}
hostname: kafka_kerberos
volumes:
- ${KERBERIZED_KAFKA_DIR}/secrets:/tmp/keytab
- ${KERBERIZED_KAFKA_DIR}/../../kerberos_image_config.sh:/config.sh
- /dev/urandom:/dev/random
ports: [88, 749]

View File

@ -27,6 +27,7 @@ export DOCKER_MYSQL_JAVA_CLIENT_TAG=${DOCKER_MYSQL_JAVA_CLIENT_TAG:=latest}
export DOCKER_MYSQL_JS_CLIENT_TAG=${DOCKER_MYSQL_JS_CLIENT_TAG:=latest}
export DOCKER_MYSQL_PHP_CLIENT_TAG=${DOCKER_MYSQL_PHP_CLIENT_TAG:=latest}
export DOCKER_POSTGRESQL_JAVA_CLIENT_TAG=${DOCKER_POSTGRESQL_JAVA_CLIENT_TAG:=latest}
export DOCKER_KERBEROS_KDC_TAG=${DOCKER_KERBEROS_KDC_TAG:=latest}
cd /ClickHouse/tests/integration
exec "$@"

View File

@ -165,6 +165,22 @@ Similar to GraphiteMergeTree, the Kafka engine supports extended configuration u
For a list of possible configuration options, see the [librdkafka configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Use the underscore (`_`) instead of a dot in the ClickHouse configuration. For example, `check.crcs=true` will be `<check_crcs>true</check_crcs>`.
### Kerberos support {#kafka-kerberos-support}
To deal with Kerberos-aware Kafka, add `security_protocol` child element with `sasl_plaintext` value. It is enough if Kerberos ticket-granting ticket is obtained and cached by OS facilities.
ClickHouse is able to maintain Kerberos credentials using a keytab file. Consider `sasl_kerberos_service_name`, `sasl_kerberos_keytab`, `sasl_kerberos_principal` and `sasl.kerberos.kinit.cmd` child elements.
Example:
``` xml
<!-- Kerberos-aware Kafka -->
<kafka>
<security_protocol>SASL_PLAINTEXT</security_protocol>
<sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
<sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
</kafka>
```
## Virtual Columns {#virtual-columns}
- `_topic` — Kafka topic.

View File

@ -45,7 +45,6 @@ def _create_env_file(path, variables, fname=DEFAULT_ENV_NAME):
f.write("=".join([var, value]) + "\n")
return full_path
def subprocess_check_call(args):
# Uncomment for debugging
# print('run:', ' ' . join(args))
@ -125,6 +124,7 @@ class ClickHouseCluster:
self.base_zookeeper_cmd = None
self.base_mysql_cmd = []
self.base_kafka_cmd = []
self.base_kerberized_kafka_cmd = []
self.base_rabbitmq_cmd = []
self.base_cassandra_cmd = []
self.pre_zookeeper_commands = []
@ -133,6 +133,7 @@ class ClickHouseCluster:
self.with_mysql = False
self.with_postgres = False
self.with_kafka = False
self.with_kerberized_kafka = False
self.with_rabbitmq = False
self.with_odbc_drivers = False
self.with_hdfs = False
@ -169,7 +170,7 @@ class ClickHouseCluster:
def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries=None,
macros=None,
with_zookeeper=False, with_mysql=False, with_kafka=False, with_rabbitmq=False,
with_zookeeper=False, with_mysql=False, with_kafka=False, with_kerberized_kafka=False, with_rabbitmq=False,
clickhouse_path_dir=None,
with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False,
with_redis=False, with_minio=False, with_cassandra=False,
@ -207,6 +208,7 @@ class ClickHouseCluster:
zookeeper_config_path=self.zookeeper_config_path,
with_mysql=with_mysql,
with_kafka=with_kafka,
with_kerberized_kafka=with_kerberized_kafka,
with_rabbitmq=with_rabbitmq,
with_mongo=with_mongo,
with_redis=with_redis,
@ -290,6 +292,13 @@ class ClickHouseCluster:
p.join(docker_compose_yml_dir, 'docker_compose_kafka.yml')]
cmds.append(self.base_kafka_cmd)
if with_kerberized_kafka and not self.with_kerberized_kafka:
self.with_kerberized_kafka = True
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_kafka.yml')])
self.base_kerberized_kafka_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_kafka.yml')]
cmds.append(self.base_kerberized_kafka_cmd)
if with_rabbitmq and not self.with_rabbitmq:
self.with_rabbitmq = True
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_rabbitmq.yml')])
@ -608,6 +617,11 @@ class ClickHouseCluster:
self.kafka_docker_id = self.get_instance_docker_id('kafka1')
self.wait_schema_registry_to_start(120)
if self.with_kerberized_kafka and self.base_kerberized_kafka_cmd:
env = os.environ.copy()
env['KERBERIZED_KAFKA_DIR'] = instance.path + '/'
subprocess.check_call(self.base_kerberized_kafka_cmd + common_opts + ['--renew-anon-volumes'], env=env)
self.kerberized_kafka_docker_id = self.get_instance_docker_id('kerberized_kafka1')
if self.with_rabbitmq and self.base_rabbitmq_cmd:
subprocess_check_call(self.base_rabbitmq_cmd + common_opts + ['--renew-anon-volumes'])
self.rabbitmq_docker_id = self.get_instance_docker_id('rabbitmq1')
@ -788,9 +802,12 @@ services:
- {instance_config_dir}:/etc/clickhouse-server/
- {db_dir}:/var/lib/clickhouse/
- {logs_dir}:/var/log/clickhouse-server/
- /etc/passwd:/etc/passwd:ro
{binary_volume}
{odbc_bridge_volume}
{odbc_ini_path}
{keytab_path}
{krb5_conf}
entrypoint: {entrypoint_cmd}
tmpfs: {tmpfs}
cap_add:
@ -820,7 +837,7 @@ class ClickHouseInstance:
def __init__(
self, cluster, base_path, name, base_config_dir, custom_main_configs, custom_user_configs,
custom_dictionaries,
macros, with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_rabbitmq, with_mongo,
macros, with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_kerberized_kafka, with_rabbitmq, with_mongo,
with_redis, with_minio,
with_cassandra, server_bin_path, odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers,
hostname=None, env_variables=None,
@ -839,6 +856,7 @@ class ClickHouseInstance:
self.custom_user_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_user_configs]
self.custom_dictionaries_paths = [p.abspath(p.join(base_path, c)) for c in custom_dictionaries]
self.clickhouse_path_dir = p.abspath(p.join(base_path, clickhouse_path_dir)) if clickhouse_path_dir else None
self.kerberos_secrets_dir = p.abspath(p.join(base_path, 'secrets'))
self.macros = macros if macros is not None else {}
self.with_zookeeper = with_zookeeper
self.zookeeper_config_path = zookeeper_config_path
@ -848,6 +866,7 @@ class ClickHouseInstance:
self.with_mysql = with_mysql
self.with_kafka = with_kafka
self.with_kerberized_kafka = with_kerberized_kafka
self.with_rabbitmq = with_rabbitmq
self.with_mongo = with_mongo
self.with_redis = with_redis
@ -863,6 +882,13 @@ class ClickHouseInstance:
else:
self.odbc_ini_path = ""
if with_kerberized_kafka:
self.keytab_path = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets:/tmp/keytab"
self.krb5_conf = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets/krb.conf:/etc/krb5.conf:ro"
else:
self.keytab_path = ""
self.krb5_conf = ""
self.docker_client = None
self.ip_address = None
self.client = None
@ -1192,6 +1218,9 @@ class ClickHouseInstance:
if self.with_zookeeper:
shutil.copy(self.zookeeper_config_path, conf_d_dir)
if self.with_kerberized_kafka:
shutil.copytree(self.kerberos_secrets_dir, p.abspath(p.join(self.path, 'secrets')))
# Copy config.d configs
print "Copy custom test config files {} to {}".format(self.custom_main_config_paths, self.config_d_dir)
for path in self.custom_main_config_paths:
@ -1227,6 +1256,9 @@ class ClickHouseInstance:
depends_on.append("kafka1")
depends_on.append("schema-registry")
if self.with_kerberized_kafka:
depends_on.append("kerberized_kafka1")
if self.with_rabbitmq:
depends_on.append("rabbitmq1")
@ -1290,6 +1322,8 @@ class ClickHouseInstance:
user=os.getuid(),
env_file=env_file,
odbc_ini_path=odbc_ini_path,
keytab_path=self.keytab_path,
krb5_conf=self.krb5_conf,
entrypoint_cmd=entrypoint_cmd,
networks=networks,
app_net=app_net,

View File

@ -156,6 +156,8 @@ if __name__ == "__main__":
env_tags += "-e {}={} ".format("DOCKER_POSTGRESQL_JAVA_CLIENT_TAG", tag)
elif image == "yandex/clickhouse-integration-test":
env_tags += "-e {}={}".format("DOCKER_BASE_TAG", tag)
elif image == "yandex/clickhouse-kerberos-kdc":
env_tags += "-e {}={}".format("DOCKER_KERBEROS_KDC_TAG", tag)
else:
logging.info("Unknown image {}".format(image))

View File

@ -0,0 +1,26 @@
<yandex>
<kafka>
<auto_offset_reset>earliest</auto_offset_reset>
<!-- Debugging of possible issues, like:
- https://github.com/edenhill/librdkafka/issues/2077
- https://github.com/edenhill/librdkafka/issues/1778
- #5615
XXX: for now this messages will appears in stderr.
-->
<security_protocol>SASL_PLAINTEXT</security_protocol>
<sasl_mechanism>GSSAPI</sasl_mechanism>
<sasl_kerberos_service_name>kafka</sasl_kerberos_service_name>
<sasl_kerberos_keytab>/tmp/keytab/clickhouse.keytab</sasl_kerberos_keytab>
<sasl_kerberos_principal>kafkauser/instance@TEST.CLICKHOUSE.TECH</sasl_kerberos_principal>
<debug>security</debug>
<api_version_request>false</api_version_request>
</kafka>
<kafka_consumer_hang>
<!-- default: 3000 -->
<heartbeat_interval_ms>300</heartbeat_interval_ms>
<!-- default: 10000 -->
<session_timeout_ms>6000</session_timeout_ms>
</kafka_consumer_hang>
</yandex>

View File

@ -0,0 +1,11 @@
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

View File

@ -0,0 +1,132 @@
#!/bin/bash
set -x # trace
: "${REALM:=TEST.CLICKHOUSE.TECH}"
: "${DOMAIN_REALM:=test.clickhouse.tech}"
: "${KERB_MASTER_KEY:=masterkey}"
: "${KERB_ADMIN_USER:=admin}"
: "${KERB_ADMIN_PASS:=admin}"
create_config() {
: "${KDC_ADDRESS:=$(hostname -f)}"
cat>/etc/krb5.conf<<EOF
[logging]
default = FILE:/var/log/kerberos/krb5libs.log
kdc = FILE:/var/log/kerberos/krb5kdc.log
admin_server = FILE:/var/log/kerberos/kadmind.log
[libdefaults]
default_realm = $REALM
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 15s
renew_lifetime = 15s
forwardable = true
# WARNING: We use weaker key types to simplify testing as stronger key types
# require the enhanced security JCE policy file to be installed. You should
# NOT run with this configuration in production or any real environment. You
# have been warned.
default_tkt_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
default_tgs_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
permitted_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
[realms]
$REALM = {
kdc = $KDC_ADDRESS
admin_server = $KDC_ADDRESS
}
[domain_realm]
.$DOMAIN_REALM = $REALM
$DOMAIN_REALM = $REALM
EOF
cat>/var/kerberos/krb5kdc/kdc.conf<<EOF
[kdcdefaults]
kdc_ports = 88
kdc_tcp_ports = 88
[realms]
$REALM = {
acl_file = /var/kerberos/krb5kdc/kadm5.acl
dict_file = /usr/share/dict/words
admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
# WARNING: We use weaker key types to simplify testing as stronger key types
# require the enhanced security JCE policy file to be installed. You should
# NOT run with this configuration in production or any real environment. You
# have been warned.
master_key_type = des3-hmac-sha1
supported_enctypes = arcfour-hmac:normal des3-hmac-sha1:normal des-cbc-crc:normal des:normal des:v4 des:norealm des:onlyrealm des:afs3
default_principal_flags = +preauth
}
EOF
}
create_db() {
/usr/sbin/kdb5_util -P $KERB_MASTER_KEY -r $REALM create -s
}
start_kdc() {
mkdir -p /var/log/kerberos
/etc/rc.d/init.d/krb5kdc start
/etc/rc.d/init.d/kadmin start
chkconfig krb5kdc on
chkconfig kadmin on
}
restart_kdc() {
/etc/rc.d/init.d/krb5kdc restart
/etc/rc.d/init.d/kadmin restart
}
create_admin_user() {
kadmin.local -q "addprinc -pw $KERB_ADMIN_PASS $KERB_ADMIN_USER/admin"
echo "*/admin@$REALM *" > /var/kerberos/krb5kdc/kadm5.acl
}
create_keytabs() {
kadmin.local -q "addprinc -randkey zookeeper/kafka_kerberized_zookeeper@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/kafka_kerberized_zookeeper.keytab zookeeper/kafka_kerberized_zookeeper@${REALM}"
kadmin.local -q "addprinc -randkey kafka/kerberized_kafka1@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/kerberized_kafka.keytab kafka/kerberized_kafka1@${REALM}"
kadmin.local -q "addprinc -randkey zkclient@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/zkclient.keytab zkclient@${REALM}"
kadmin.local -q "addprinc -randkey kafkauser/instance@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab kafkauser/instance@${REALM}"
chmod g+r /tmp/keytab/clickhouse.keytab
}
main() {
if [ ! -f /kerberos_initialized ]; then
create_config
create_db
create_admin_user
start_kdc
touch /kerberos_initialized
fi
if [ ! -f /var/kerberos/krb5kdc/principal ]; then
while true; do sleep 1000; done
else
start_kdc
create_keytabs
tail -F /var/log/kerberos/krb5kdc.log
fi
}
[[ "$0" == "${BASH_SOURCE[0]}" ]] && main "$@"

View File

@ -0,0 +1,14 @@
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/secrets/kerberized_kafka.keytab"
principal="kafka/kerberized_kafka1@TEST.CLICKHOUSE.TECH";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/secrets/zkclient.keytab"
principal="zkclient@TEST.CLICKHOUSE.TECH";
};

View File

@ -0,0 +1,22 @@
[logging]
default = FILE:/var/log/kerberos/krb5libs.log
kdc = FILE:/var/log/kerberos/krb5kdc.log
admin_server = FILE:/var/log/kerberos/kadmind.log
[libdefaults]
default_realm = TEST.CLICKHOUSE.TECH
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 15s
renew_lifetime = 15s
forwardable = true
[realms]
TEST.CLICKHOUSE.TECH = {
kdc = kafka_kerberos
admin_server = kafka_kerberos
}
[domain_realm]
.TEST.CLICKHOUSE.TECH = TEST.CLICKHOUSE.TECH
TEST.CLICKHOUSE.TECH = TEST.CLICKHOUSE.TECH

View File

@ -0,0 +1,14 @@
Server {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/secrets/kafka_kerberized_zookeeper.keytab"
principal="zookeeper/kafka_kerberized_zookeeper@TEST.CLICKHOUSE.TECH";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/kafka/secrets/zkclient.keytab"
principal="zkclient@TEST.CLICKHOUSE.TECH";
};

View File

@ -0,0 +1,146 @@
import os.path as p
import random
import threading
import time
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
from helpers.client import QueryRuntimeException
from helpers.network import PartitionManager
import json
import subprocess
import kafka.errors
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer, BrokerConnection
from kafka.admin import NewTopic
from kafka.protocol.admin import DescribeGroupsResponse_v1, DescribeGroupsRequest_v1
from kafka.protocol.group import MemberAssignment
import socket
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
main_configs=['configs/kafka.xml', 'configs/log_conf.xml' ],
with_kerberized_kafka=True,
clickhouse_path_dir="clickhouse_path"
)
kafka_id = '' # instance.cluster.kafka_docker_id
# Helpers
def check_kafka_is_available():
# plaintext
p = subprocess.Popen(('docker',
'exec',
'-i',
kafka_id,
'/usr/bin/kafka-broker-api-versions',
'--bootstrap-server',
'localhost:9093'),
stdout=subprocess.PIPE)
p.communicate()
return p.returncode == 0
def wait_kafka_is_available(max_retries=50):
retries = 0
while True:
if check_kafka_is_available():
break
else:
retries += 1
if retries > max_retries:
raise "Kafka is not available"
print("Waiting for Kafka to start up")
time.sleep(1)
def kafka_produce(topic, messages, timestamp=None):
producer = KafkaProducer(bootstrap_servers="localhost:9093")
for message in messages:
producer.send(topic=topic, value=message, timestamp_ms=timestamp)
producer.flush()
print ("Produced {} messages for topic {}".format(len(messages), topic))
# Fixtures
@pytest.fixture(scope="module")
def kafka_cluster():
try:
global kafka_id
cluster.start()
kafka_id = instance.cluster.kerberized_kafka_docker_id
print("kafka_id is {}".format(kafka_id))
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def kafka_setup_teardown():
instance.query('DROP DATABASE IF EXISTS test; CREATE DATABASE test;')
wait_kafka_is_available()
print("kafka is available - running test")
yield # run test
# Tests
@pytest.mark.timeout(180) # wait to build containers
def test_kafka_json_as_string(kafka_cluster):
kafka_produce('kafka_json_as_string', ['{"t": 123, "e": {"x": "woof"} }', '', '{"t": 124, "e": {"x": "test"} }', '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}'])
instance.query('''
CREATE TABLE test.kafka (field String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string',
kafka_group_name = 'kafka_json_as_string',
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
''')
result = instance.query('SELECT * FROM test.kafka;')
expected = '''\
{"t": 123, "e": {"x": "woof"} }
{"t": 124, "e": {"x": "test"} }
{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
'''
assert TSV(result) == TSV(expected)
assert instance.contains_in_log("Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows")
def test_kafka_json_as_string_no_kdc(kafka_cluster):
kafka_produce('kafka_json_as_string_no_kdc', ['{"t": 123, "e": {"x": "woof"} }', '', '{"t": 124, "e": {"x": "test"} }', '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}'])
kafka_cluster.pause_container('kafka_kerberos')
time.sleep(45) # wait for ticket expiration
instance.query('''
CREATE TABLE test.kafka_no_kdc (field String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
kafka_topic_list = 'kafka_json_as_string_no_kdc',
kafka_group_name = 'kafka_json_as_string_no_kdc',
kafka_format = 'JSONAsString',
kafka_flush_interval_ms=1000;
''')
result = instance.query('SELECT * FROM test.kafka_no_kdc;')
expected = ''
kafka_cluster.unpause_container('kafka_kerberos')
assert TSV(result) == TSV(expected)
assert instance.contains_in_log("StorageKafka (kafka_no_kdc): Nothing to commit")
assert instance.contains_in_log("Ticket expired")
assert instance.contains_in_log("Kerberos ticket refresh failed")
if __name__ == '__main__':
cluster.start()
raw_input("Cluster created, press any key to destroy...")
cluster.shutdown()