diff --git a/.gitmodules b/.gitmodules
index eb21c4bfd00..865a876b276 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -186,3 +186,4 @@
 [submodule "contrib/cyrus-sasl"]
     path = contrib/cyrus-sasl
     url = https://github.com/cyrusimap/cyrus-sasl
+    branch = cyrus-sasl-2.1
diff --git a/cmake/find/rdkafka.cmake b/cmake/find/rdkafka.cmake
index d9f815dbcdd..ac11322f408 100644
--- a/cmake/find/rdkafka.cmake
+++ b/cmake/find/rdkafka.cmake
@@ -14,10 +14,10 @@ if (NOT ENABLE_RDKAFKA)
     return()
 endif()
 
-if (NOT ARCH_ARM AND USE_LIBGSASL)
+if (NOT ARCH_ARM)
     option (USE_INTERNAL_RDKAFKA_LIBRARY "Set to FALSE to use system librdkafka instead of the bundled" ${NOT_UNBUNDLED})
 elseif(USE_INTERNAL_RDKAFKA_LIBRARY)
-    message (${RECONFIGURE_MESSAGE_LEVEL} "Can't use internal librdkafka with ARCH_ARM=${ARCH_ARM} AND USE_LIBGSASL=${USE_LIBGSASL}")
+    message (${RECONFIGURE_MESSAGE_LEVEL} "Can't use internal librdkafka with ARCH_ARM=${ARCH_ARM}")
 endif ()
 
 if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/cppkafka/CMakeLists.txt")
diff --git a/contrib/cyrus-sasl b/contrib/cyrus-sasl
index 6054630889f..9995bf9d8e1 160000
--- a/contrib/cyrus-sasl
+++ b/contrib/cyrus-sasl
@@ -1 +1 @@
-Subproject commit 6054630889fd1cd8d0659573d69badcee1e23a00
+Subproject commit 9995bf9d8e14f58934d9313ac64f13780d6dd3c9
diff --git a/docker/images.json b/docker/images.json
index 8c2cb35b004..e9e91864e1e 100644
--- a/docker/images.json
+++ b/docker/images.json
@@ -133,6 +133,10 @@
         "name": "yandex/clickhouse-postgresql-java-client",
         "dependent": []
     },
+    "docker/test/integration/kerberos_kdc": {
+        "name": "yandex/clickhouse-kerberos-kdc",
+        "dependent": []
+    },
     "docker/test/base": {
         "name": "yandex/clickhouse-test-base",
         "dependent": [
diff --git a/docker/test/integration/base/Dockerfile b/docker/test/integration/base/Dockerfile
index 35decd907c0..3e4e88965e0 100644
--- a/docker/test/integration/base/Dockerfile
+++ b/docker/test/integration/base/Dockerfile
@@ -16,7 +16,8 @@ RUN apt-get update \
     odbc-postgresql \
     sqlite3 \
     curl \
-    tar
+    tar \
+    krb5-user
 RUN rm -rf \
   /var/lib/apt/lists/* \
   /var/cache/debconf \
diff --git a/docker/test/integration/kerberos_kdc/Dockerfile b/docker/test/integration/kerberos_kdc/Dockerfile
new file mode 100644
index 00000000000..ea231b1191d
--- /dev/null
+++ b/docker/test/integration/kerberos_kdc/Dockerfile
@@ -0,0 +1,15 @@
+# docker build -t yandex/clickhouse-kerberos-kdc .
+
+FROM centos:6.6
+# old OS to make it faster and smaller
+
+RUN yum install -y krb5-server krb5-libs krb5-auth-dialog krb5-workstation
+
+EXPOSE 88 749
+
+RUN touch /config.sh
+# should be overwritten e.g. via docker_compose volumes
+# volumes: /some_path/my_kerberos_config.sh:/config.sh:ro
+
+
+ENTRYPOINT ["/bin/bash", "/config.sh"]
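For local debugging of the new image, a build-and-run sketch along the lines below should work; `my_kerberos_config.sh` is a hypothetical stand-in for the provisioning script that the test compose file mounts over `/config.sh`:

```bash
# Build the KDC image from the new Dockerfile
docker build -t yandex/clickhouse-kerberos-kdc docker/test/integration/kerberos_kdc

# Run it with a config script mounted over /config.sh, as the Dockerfile
# comment suggests; keytabs generated by the script land in /tmp/keytab
docker run -d --name test-kdc \
    -v "$(pwd)/my_kerberos_config.sh:/config.sh:ro" \
    -v "$(pwd)/secrets:/tmp/keytab" \
    -p 88:88 -p 749:749 \
    yandex/clickhouse-kerberos-kdc
```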
diff --git a/docker/test/integration/runner/compose/docker_compose_kerberized_kafka.yml b/docker/test/integration/runner/compose/docker_compose_kerberized_kafka.yml
new file mode 100644
index 00000000000..3ce0000b148
--- /dev/null
+++ b/docker/test/integration/runner/compose/docker_compose_kerberized_kafka.yml
@@ -0,0 +1,59 @@
+version: '2.3'
+
+services:
+  kafka_kerberized_zookeeper:
+    image: confluentinc/cp-zookeeper:5.2.0
+    # restart: always
+    hostname: kafka_kerberized_zookeeper
+    environment:
+      ZOOKEEPER_SERVER_ID: 1
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_SERVERS: "kafka_kerberized_zookeeper:2888:3888"
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/zookeeper_jaas.conf -Djava.security.krb5.conf=/etc/kafka/secrets/krb.conf -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider -Dsun.security.krb5.debug=true"
+    volumes:
+      - ${KERBERIZED_KAFKA_DIR}/secrets:/etc/kafka/secrets
+      - /dev/urandom:/dev/random
+    depends_on:
+      - kafka_kerberos
+    security_opt:
+      - label:disable
+
+  kerberized_kafka1:
+    image: confluentinc/cp-kafka:5.2.0
+    # restart: always
+    hostname: kerberized_kafka1
+    ports:
+      - "9092:9092"
+      - "9093:9093"
+    environment:
+      KAFKA_LISTENERS: OUTSIDE://:19092,UNSECURED_OUTSIDE://:19093,UNSECURED_INSIDE://:9093
+      KAFKA_ADVERTISED_LISTENERS: OUTSIDE://kerberized_kafka1:19092,UNSECURED_OUTSIDE://kerberized_kafka1:19093,UNSECURED_INSIDE://localhost:9093
+      # KAFKA_LISTENERS: INSIDE://kerberized_kafka1:9092,OUTSIDE://kerberized_kafka1:19092
+      # KAFKA_ADVERTISED_LISTENERS: INSIDE://localhost:9092,OUTSIDE://kerberized_kafka1:19092
+      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: GSSAPI
+      KAFKA_SASL_ENABLED_MECHANISMS: GSSAPI
+      KAFKA_SASL_KERBEROS_SERVICE_NAME: kafka
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: OUTSIDE:SASL_PLAINTEXT,UNSECURED_OUTSIDE:PLAINTEXT,UNSECURED_INSIDE:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: OUTSIDE
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: "kafka_kerberized_zookeeper:2181"
+      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/broker_jaas.conf -Djava.security.krb5.conf=/etc/kafka/secrets/krb.conf -Dsun.security.krb5.debug=true"
+    volumes:
+      - ${KERBERIZED_KAFKA_DIR}/secrets:/etc/kafka/secrets
+      - /dev/urandom:/dev/random
+    depends_on:
+      - kafka_kerberized_zookeeper
+      - kafka_kerberos
+    security_opt:
+      - label:disable
+
+  kafka_kerberos:
+    image: yandex/clickhouse-kerberos-kdc:${DOCKER_KERBEROS_KDC_TAG}
+    hostname: kafka_kerberos
+    volumes:
+      - ${KERBERIZED_KAFKA_DIR}/secrets:/tmp/keytab
+      - ${KERBERIZED_KAFKA_DIR}/../../kerberos_image_config.sh:/config.sh
+      - /dev/urandom:/dev/random
+    ports: [88, 749]
diff --git a/docker/test/integration/runner/dockerd-entrypoint.sh b/docker/test/integration/runner/dockerd-entrypoint.sh
index c38260279ed..cbdb7317b1e 100755
--- a/docker/test/integration/runner/dockerd-entrypoint.sh
+++ b/docker/test/integration/runner/dockerd-entrypoint.sh
@@ -27,6 +27,7 @@ export DOCKER_MYSQL_JAVA_CLIENT_TAG=${DOCKER_MYSQL_JAVA_CLIENT_TAG:=latest}
 export DOCKER_MYSQL_JS_CLIENT_TAG=${DOCKER_MYSQL_JS_CLIENT_TAG:=latest}
 export DOCKER_MYSQL_PHP_CLIENT_TAG=${DOCKER_MYSQL_PHP_CLIENT_TAG:=latest}
 export DOCKER_POSTGRESQL_JAVA_CLIENT_TAG=${DOCKER_POSTGRESQL_JAVA_CLIENT_TAG:=latest}
+export DOCKER_KERBEROS_KDC_TAG=${DOCKER_KERBEROS_KDC_TAG:=latest}
 
 cd /ClickHouse/tests/integration
 exec "$@"
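Since the entrypoint only sets a default (`:=latest`), the KDC image tag can be pinned from outside; a minimal sketch with a hypothetical tag value:

```bash
# Pin the KDC sidecar to a fixed tag before the entrypoint runs;
# dockerd-entrypoint.sh keeps an existing value and only defaults to :latest
export DOCKER_KERBEROS_KDC_TAG=20200901   # hypothetical tag
```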
diff --git a/docs/en/engines/table-engines/integrations/kafka.md b/docs/en/engines/table-engines/integrations/kafka.md
index fe9aa2ca25e..d0a4bc928a7 100644
--- a/docs/en/engines/table-engines/integrations/kafka.md
+++ b/docs/en/engines/table-engines/integrations/kafka.md
@@ -165,6 +165,22 @@ Similar to GraphiteMergeTree, the Kafka engine supports extended configuration using the ClickHouse config file.
 
 For a list of possible configuration options, see the [librdkafka configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Use the underscore (`_`) instead of a dot in the ClickHouse configuration. For example, `check.crcs=true` will be `<check_crcs>true</check_crcs>`.
 
+### Kerberos support {#kafka-kerberos-support}
+
+To work with Kerberos-aware Kafka, add a `security_protocol` child element with the value `sasl_plaintext`. This is sufficient when a Kerberos ticket-granting ticket has already been obtained and cached by OS facilities.
+Alternatively, ClickHouse can maintain Kerberos credentials itself using a keytab file. Consider the `sasl_kerberos_service_name`, `sasl_kerberos_keytab`, `sasl_kerberos_principal` and `sasl_kerberos_kinit_cmd` child elements.
+
+Example:
+
+``` xml
+<!-- Kerberos-aware Kafka -->
+<kafka>
+  <security_protocol>SASL_PLAINTEXT</security_protocol>
+  <sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
+  <sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
+</kafka>
+```
+
 ## Virtual Columns {#virtual-columns}
 
 - `_topic` — Kafka topic.
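For the first, cache-based mode described above, the ticket can be obtained manually and inspected; a sketch reusing the keytab and principal from the doc example:

```bash
# Obtain a ticket-granting ticket from the keytab and cache it via OS facilities,
# then list the cache to confirm the principal and ticket lifetimes
kinit -kt /home/kafkauser/kafkauser.keytab kafkauser/kafkahost@EXAMPLE.COM
klist
```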
diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py
index 0b7fa9264bd..d99d8a17844 100644
--- a/tests/integration/helpers/cluster.py
+++ b/tests/integration/helpers/cluster.py
@@ -45,7 +45,6 @@ def _create_env_file(path, variables, fname=DEFAULT_ENV_NAME):
             f.write("=".join([var, value]) + "\n")
     return full_path
 
-
 def subprocess_check_call(args):
     # Uncomment for debugging
     # print('run:', ' '.join(args))
@@ -125,6 +124,7 @@ class ClickHouseCluster:
         self.base_zookeeper_cmd = None
         self.base_mysql_cmd = []
         self.base_kafka_cmd = []
+        self.base_kerberized_kafka_cmd = []
         self.base_rabbitmq_cmd = []
         self.base_cassandra_cmd = []
         self.pre_zookeeper_commands = []
@@ -133,6 +133,7 @@ class ClickHouseCluster:
         self.with_mysql = False
         self.with_postgres = False
         self.with_kafka = False
+        self.with_kerberized_kafka = False
         self.with_rabbitmq = False
         self.with_odbc_drivers = False
         self.with_hdfs = False
@@ -169,7 +170,7 @@ class ClickHouseCluster:
 
     def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries=None,
                      macros=None,
-                     with_zookeeper=False, with_mysql=False, with_kafka=False, with_rabbitmq=False,
+                     with_zookeeper=False, with_mysql=False, with_kafka=False, with_kerberized_kafka=False, with_rabbitmq=False,
                      clickhouse_path_dir=None,
                      with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False,
                      with_redis=False, with_minio=False, with_cassandra=False,
@@ -207,6 +208,7 @@ class ClickHouseCluster:
             zookeeper_config_path=self.zookeeper_config_path,
             with_mysql=with_mysql,
             with_kafka=with_kafka,
+            with_kerberized_kafka=with_kerberized_kafka,
             with_rabbitmq=with_rabbitmq,
             with_mongo=with_mongo,
             with_redis=with_redis,
@@ -290,6 +292,13 @@ class ClickHouseCluster:
                                    p.join(docker_compose_yml_dir, 'docker_compose_kafka.yml')]
             cmds.append(self.base_kafka_cmd)
 
+        if with_kerberized_kafka and not self.with_kerberized_kafka:
+            self.with_kerberized_kafka = True
+            self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_kafka.yml')])
+            self.base_kerberized_kafka_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
+                                              self.project_name, '--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_kafka.yml')]
+            cmds.append(self.base_kerberized_kafka_cmd)
+
         if with_rabbitmq and not self.with_rabbitmq:
             self.with_rabbitmq = True
             self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_rabbitmq.yml')])
@@ -608,6 +617,11 @@ class ClickHouseCluster:
             self.kafka_docker_id = self.get_instance_docker_id('kafka1')
             self.wait_schema_registry_to_start(120)
 
+        if self.with_kerberized_kafka and self.base_kerberized_kafka_cmd:
+            env = os.environ.copy()
+            env['KERBERIZED_KAFKA_DIR'] = instance.path + '/'
+            subprocess.check_call(self.base_kerberized_kafka_cmd + common_opts + ['--renew-anon-volumes'], env=env)
+            self.kerberized_kafka_docker_id = self.get_instance_docker_id('kerberized_kafka1')
         if self.with_rabbitmq and self.base_rabbitmq_cmd:
             subprocess_check_call(self.base_rabbitmq_cmd + common_opts + ['--renew-anon-volumes'])
             self.rabbitmq_docker_id = self.get_instance_docker_id('rabbitmq1')
@@ -788,9 +802,12 @@ services:
             - {instance_config_dir}:/etc/clickhouse-server/
             - {db_dir}:/var/lib/clickhouse/
             - {logs_dir}:/var/log/clickhouse-server/
+            - /etc/passwd:/etc/passwd:ro
             {binary_volume}
             {odbc_bridge_volume}
             {odbc_ini_path}
+            {keytab_path}
+            {krb5_conf}
         entrypoint: {entrypoint_cmd}
         tmpfs: {tmpfs}
         cap_add:
@@ -820,7 +837,7 @@ class ClickHouseInstance:
 
     def __init__(
             self, cluster, base_path, name, base_config_dir, custom_main_configs, custom_user_configs, custom_dictionaries,
-            macros, with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_rabbitmq, with_mongo,
+            macros, with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_kerberized_kafka, with_rabbitmq, with_mongo,
             with_redis, with_minio, with_cassandra, server_bin_path, odbc_bridge_bin_path,
             clickhouse_path_dir, with_odbc_drivers,
             hostname=None, env_variables=None,
@@ -839,6 +856,7 @@ class ClickHouseInstance:
         self.custom_user_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_user_configs]
         self.custom_dictionaries_paths = [p.abspath(p.join(base_path, c)) for c in custom_dictionaries]
         self.clickhouse_path_dir = p.abspath(p.join(base_path, clickhouse_path_dir)) if clickhouse_path_dir else None
+        self.kerberos_secrets_dir = p.abspath(p.join(base_path, 'secrets'))
         self.macros = macros if macros is not None else {}
         self.with_zookeeper = with_zookeeper
         self.zookeeper_config_path = zookeeper_config_path
@@ -848,6 +866,7 @@ class ClickHouseInstance:
 
         self.with_mysql = with_mysql
         self.with_kafka = with_kafka
+        self.with_kerberized_kafka = with_kerberized_kafka
         self.with_rabbitmq = with_rabbitmq
         self.with_mongo = with_mongo
         self.with_redis = with_redis
@@ -863,6 +882,13 @@ class ClickHouseInstance:
         else:
             self.odbc_ini_path = ""
 
+        if with_kerberized_kafka:
+            self.keytab_path = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets:/tmp/keytab"
+            self.krb5_conf = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets/krb.conf:/etc/krb5.conf:ro"
+        else:
+            self.keytab_path = ""
+            self.krb5_conf = ""
+
         self.docker_client = None
         self.ip_address = None
         self.client = None
@@ -1192,6 +1218,9 @@ class ClickHouseInstance:
         if self.with_zookeeper:
             shutil.copy(self.zookeeper_config_path, conf_d_dir)
 
+        if self.with_kerberized_kafka:
+            shutil.copytree(self.kerberos_secrets_dir, p.abspath(p.join(self.path, 'secrets')))
+
         # Copy config.d configs
         print "Copy custom test config files {} to {}".format(self.custom_main_config_paths, self.config_d_dir)
         for path in self.custom_main_config_paths:
@@ -1227,6 +1256,9 @@ class ClickHouseInstance:
             depends_on.append("kafka1")
             depends_on.append("schema-registry")
 
+        if self.with_kerberized_kafka:
+            depends_on.append("kerberized_kafka1")
+
         if self.with_rabbitmq:
             depends_on.append("rabbitmq1")
 
@@ -1290,6 +1322,8 @@ class ClickHouseInstance:
                 user=os.getuid(),
                 env_file=env_file,
                 odbc_ini_path=odbc_ini_path,
+                keytab_path=self.keytab_path,
+                krb5_conf=self.krb5_conf,
                 entrypoint_cmd=entrypoint_cmd,
                 networks=networks,
                 app_net=app_net,
diff --git a/tests/integration/runner b/tests/integration/runner
index f097a42e52a..f38ab0aa042 100755
--- a/tests/integration/runner
+++ b/tests/integration/runner
@@ -156,6 +156,8 @@ if __name__ == "__main__":
                 env_tags += "-e {}={} ".format("DOCKER_POSTGRESQL_JAVA_CLIENT_TAG", tag)
             elif image == "yandex/clickhouse-integration-test":
                 env_tags += "-e {}={}".format("DOCKER_BASE_TAG", tag)
+            elif image == "yandex/clickhouse-kerberos-kdc":
+                env_tags += "-e {}={} ".format("DOCKER_KERBEROS_KDC_TAG", tag)
             else:
                 logging.info("Unknown image {}".format(image))
 
diff --git a/tests/integration/test_storage_kerberized_kafka/__init__.py b/tests/integration/test_storage_kerberized_kafka/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_storage_kerberized_kafka/clickhouse_path/EMPTY_DIR b/tests/integration/test_storage_kerberized_kafka/clickhouse_path/EMPTY_DIR
new file mode 100644
index 00000000000..e69de29bb2d
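The new runner branch forwards a per-image tag into `DOCKER_KERBEROS_KDC_TAG`; assuming the runner's existing per-image tag option (the flag name is not shown in this hunk), an invocation would look roughly like:

```bash
# Hypothetical invocation: pin the KDC image tag; the runner converts this
# into "-e DOCKER_KERBEROS_KDC_TAG=<tag>" for the inner docker environment
./tests/integration/runner --docker-compose-images-tags yandex/clickhouse-kerberos-kdc:latest
```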
diff --git a/tests/integration/test_storage_kerberized_kafka/configs/kafka.xml b/tests/integration/test_storage_kerberized_kafka/configs/kafka.xml
new file mode 100644
index 00000000000..0302bd78e3f
--- /dev/null
+++ b/tests/integration/test_storage_kerberized_kafka/configs/kafka.xml
@@ -0,0 +1,26 @@
+<yandex>
+  <kafka>
+    <auto_offset_reset>earliest</auto_offset_reset>
+
+    <security_protocol>SASL_PLAINTEXT</security_protocol>
+    <sasl_mechanism>GSSAPI</sasl_mechanism>
+    <sasl_kerberos_service_name>kafka</sasl_kerberos_service_name>
+    <sasl_kerberos_keytab>/tmp/keytab/clickhouse.keytab</sasl_kerberos_keytab>
+    <sasl_kerberos_principal>kafkauser/instance@TEST.CLICKHOUSE.TECH</sasl_kerberos_principal>
+    <debug>security</debug>
+    <api_version_request>false</api_version_request>
+
+  </kafka>
+
+  <kafka_consumer_hang>
+    <!-- default: 3000 -->
+    <heartbeat_interval_ms>300</heartbeat_interval_ms>
+    <!-- default: 10000 -->
+    <session_timeout_ms>6000</session_timeout_ms>
+  </kafka_consumer_hang>
+</yandex>
diff --git a/tests/integration/test_storage_kerberized_kafka/configs/log_conf.xml b/tests/integration/test_storage_kerberized_kafka/configs/log_conf.xml
new file mode 100644
index 00000000000..95466269afe
--- /dev/null
+++ b/tests/integration/test_storage_kerberized_kafka/configs/log_conf.xml
@@ -0,0 +1,11 @@
+<yandex>
+ <logger>
+    <level>trace</level>
+    <log>/var/log/clickhouse-server/log.log</log>
+    <errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
+    <size>1000M</size>
+    <count>10</count>
+    <stderr>/var/log/clickhouse-server/stderr.log</stderr>
+    <stdout>/var/log/clickhouse-server/stdout.log</stdout>
+ </logger>
+</yandex>
\ No newline at end of file
diff --git a/tests/integration/test_storage_kerberized_kafka/kerberos_image_config.sh b/tests/integration/test_storage_kerberized_kafka/kerberos_image_config.sh
new file mode 100644
index 00000000000..dda10d47d94
--- /dev/null
+++ b/tests/integration/test_storage_kerberized_kafka/kerberos_image_config.sh
@@ -0,0 +1,132 @@
+#!/bin/bash
+
+
+set -x # trace
+
+: "${REALM:=TEST.CLICKHOUSE.TECH}"
+: "${DOMAIN_REALM:=test.clickhouse.tech}"
+: "${KERB_MASTER_KEY:=masterkey}"
+: "${KERB_ADMIN_USER:=admin}"
+: "${KERB_ADMIN_PASS:=admin}"
+
+create_config() {
+  : "${KDC_ADDRESS:=$(hostname -f)}"
+
+  cat>/etc/krb5.conf<<EOF
+[logging]
+ default = FILE:/var/log/kerberos/krb5libs.log
+ kdc = FILE:/var/log/kerberos/krb5kdc.log
+ admin_server = FILE:/var/log/kerberos/kadmind.log
+
+[libdefaults]
+ default_realm = $REALM
+ dns_lookup_realm = false
+ dns_lookup_kdc = false
+ ticket_lifetime = 15s
+ renew_lifetime = 15s
+ forwardable = true
+
+[realms]
+ $REALM = {
+  kdc = $KDC_ADDRESS
+  admin_server = $KDC_ADDRESS
+ }
+
+[domain_realm]
+ .$DOMAIN_REALM = $REALM
+ $DOMAIN_REALM = $REALM
+EOF
+
+  cat>/var/kerberos/krb5kdc/kdc.conf<<EOF
+[kdcdefaults]
+ kdc_ports = 88
+ kdc_tcp_ports = 88
+
+[realms]
+ $REALM = {
+  acl_file = /var/kerberos/krb5kdc/kadm5.acl
+  dict_file = /usr/share/dict/words
+  admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
+  default_principal_flags = +preauth
+ }
+EOF
+}
+
+create_db() {
+  /usr/sbin/kdb5_util -P $KERB_MASTER_KEY -r $REALM create -s
+}
+
+start_kdc() {
+  mkdir -p /var/log/kerberos
+
+  /etc/rc.d/init.d/krb5kdc start
+  /etc/rc.d/init.d/kadmin start
+
+  chkconfig krb5kdc on
+  chkconfig kadmin on
+}
+
+create_admin_user() {
+  kadmin.local -q "addprinc -pw $KERB_ADMIN_PASS $KERB_ADMIN_USER/admin"
+  echo "*/admin@$REALM *" > /var/kerberos/krb5kdc/kadm5.acl
+}
+
+create_keytabs() {
+
+  kadmin.local -q "addprinc -randkey zookeeper/kafka_kerberized_zookeeper@${REALM}"
+  kadmin.local -q "ktadd -norandkey -k /tmp/keytab/kafka_kerberized_zookeeper.keytab zookeeper/kafka_kerberized_zookeeper@${REALM}"
+
+  kadmin.local -q "addprinc -randkey kafka/kerberized_kafka1@${REALM}"
+  kadmin.local -q "ktadd -norandkey -k /tmp/keytab/kerberized_kafka.keytab kafka/kerberized_kafka1@${REALM}"
+
+  kadmin.local -q "addprinc -randkey zkclient@${REALM}"
+  kadmin.local -q "ktadd -norandkey -k /tmp/keytab/zkclient.keytab zkclient@${REALM}"
+
+
+  kadmin.local -q "addprinc -randkey kafkauser/instance@${REALM}"
+  kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab kafkauser/instance@${REALM}"
+
+  chmod g+r /tmp/keytab/clickhouse.keytab
+
+}
+
+main() {
+
+  if [ ! -f /kerberos_initialized ]; then
+    create_config
+    create_db
+    create_admin_user
+    start_kdc
+
+    touch /kerberos_initialized
+  fi
+
+  if [ ! -f /var/kerberos/krb5kdc/principal ]; then
+    while true; do sleep 1000; done
+  else
+    start_kdc
+    create_keytabs
+    tail -F /var/log/kerberos/krb5kdc.log
+  fi
+
+}
+
+[[ "$0" == "${BASH_SOURCE[0]}" ]] && main "$@"
diff --git a/tests/integration/test_storage_kerberized_kafka/secrets/broker_jaas.conf b/tests/integration/test_storage_kerberized_kafka/secrets/broker_jaas.conf
new file mode 100644
index 00000000000..8a55ec2faa0
--- /dev/null
+++ b/tests/integration/test_storage_kerberized_kafka/secrets/broker_jaas.conf
@@ -0,0 +1,14 @@
+KafkaServer {
+    com.sun.security.auth.module.Krb5LoginModule required
+    useKeyTab=true
+    storeKey=true
+    keyTab="/etc/kafka/secrets/kerberized_kafka.keytab"
+    principal="kafka/kerberized_kafka1@TEST.CLICKHOUSE.TECH";
+};
+Client {
+    com.sun.security.auth.module.Krb5LoginModule required
+    useKeyTab=true
+    storeKey=true
+    keyTab="/etc/kafka/secrets/zkclient.keytab"
+    principal="zkclient@TEST.CLICKHOUSE.TECH";
+};
diff --git a/tests/integration/test_storage_kerberized_kafka/secrets/krb.conf b/tests/integration/test_storage_kerberized_kafka/secrets/krb.conf
new file mode 100644
index 00000000000..1efdf510f22
--- /dev/null
+++ b/tests/integration/test_storage_kerberized_kafka/secrets/krb.conf
@@ -0,0 +1,22 @@
+[logging]
+ default = FILE:/var/log/kerberos/krb5libs.log
+ kdc = FILE:/var/log/kerberos/krb5kdc.log
+ admin_server = FILE:/var/log/kerberos/kadmind.log
+
+[libdefaults]
+ default_realm = TEST.CLICKHOUSE.TECH
+ dns_lookup_realm = false
+ dns_lookup_kdc = false
+ ticket_lifetime = 15s
+ renew_lifetime = 15s
+ forwardable = true
+
+[realms]
+ TEST.CLICKHOUSE.TECH = {
+  kdc = kafka_kerberos
+  admin_server = kafka_kerberos
+ }
+
+[domain_realm]
+ .TEST.CLICKHOUSE.TECH = TEST.CLICKHOUSE.TECH
+ TEST.CLICKHOUSE.TECH = TEST.CLICKHOUSE.TECH
diff --git a/tests/integration/test_storage_kerberized_kafka/secrets/zookeeper_jaas.conf b/tests/integration/test_storage_kerberized_kafka/secrets/zookeeper_jaas.conf
new file mode 100644
index 00000000000..1b1f8103f42
--- /dev/null
+++ b/tests/integration/test_storage_kerberized_kafka/secrets/zookeeper_jaas.conf
@@ -0,0 +1,14 @@
+Server {
+    com.sun.security.auth.module.Krb5LoginModule required
+    useKeyTab=true
+    storeKey=true
+    keyTab="/etc/kafka/secrets/kafka_kerberized_zookeeper.keytab"
+    principal="zookeeper/kafka_kerberized_zookeeper@TEST.CLICKHOUSE.TECH";
+};
+Client {
+    com.sun.security.auth.module.Krb5LoginModule required
+    useKeyTab=true
+    storeKey=true
+    keyTab="/etc/kafka/secrets/zkclient.keytab"
+    principal="zkclient@TEST.CLICKHOUSE.TECH";
+};
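Note the intentionally short `ticket_lifetime = 15s` / `renew_lifetime = 15s` in `krb.conf`: they let the KDC-outage test below hit ticket expiry within its 45-second pause. To double-check the keytabs produced by `kerberos_image_config.sh`, something like this can be run inside any container that mounts `/tmp/keytab`:

```bash
# List the principals stored in the keytab used by the ClickHouse instance;
# expect kafkauser/instance@TEST.CLICKHOUSE.TECH among the entries
klist -kt /tmp/keytab/clickhouse.keytab
```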
diff --git a/tests/integration/test_storage_kerberized_kafka/test.py b/tests/integration/test_storage_kerberized_kafka/test.py
new file mode 100644
index 00000000000..ec23d340977
--- /dev/null
+++ b/tests/integration/test_storage_kerberized_kafka/test.py
@@ -0,0 +1,146 @@
+import os.path as p
+import random
+import threading
+import time
+import pytest
+
+from helpers.cluster import ClickHouseCluster
+from helpers.test_tools import TSV
+from helpers.client import QueryRuntimeException
+from helpers.network import PartitionManager
+
+import json
+import subprocess
+import kafka.errors
+from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer, BrokerConnection
+from kafka.admin import NewTopic
+from kafka.protocol.admin import DescribeGroupsResponse_v1, DescribeGroupsRequest_v1
+from kafka.protocol.group import MemberAssignment
+import socket
+
+cluster = ClickHouseCluster(__file__)
+instance = cluster.add_instance('instance',
+                                main_configs=['configs/kafka.xml', 'configs/log_conf.xml'],
+                                with_kerberized_kafka=True,
+                                clickhouse_path_dir="clickhouse_path")
+kafka_id = ''  # instance.cluster.kafka_docker_id
+
+# Helpers
+
+def check_kafka_is_available():
+    # plaintext
+    p = subprocess.Popen(('docker',
+                          'exec',
+                          '-i',
+                          kafka_id,
+                          '/usr/bin/kafka-broker-api-versions',
+                          '--bootstrap-server',
+                          'localhost:9093'),
+                         stdout=subprocess.PIPE)
+    p.communicate()
+    return p.returncode == 0
+
+
+def wait_kafka_is_available(max_retries=50):
+    retries = 0
+    while True:
+        if check_kafka_is_available():
+            break
+        else:
+            retries += 1
+            if retries > max_retries:
+                raise Exception("Kafka is not available")
+            print("Waiting for Kafka to start up")
+            time.sleep(1)
+
+
+def kafka_produce(topic, messages, timestamp=None):
+    producer = KafkaProducer(bootstrap_servers="localhost:9093")
+    for message in messages:
+        producer.send(topic=topic, value=message, timestamp_ms=timestamp)
+    producer.flush()
+    print("Produced {} messages for topic {}".format(len(messages), topic))
+
+
+# Fixtures
+
+@pytest.fixture(scope="module")
+def kafka_cluster():
+    try:
+        global kafka_id
+        cluster.start()
+        kafka_id = instance.cluster.kerberized_kafka_docker_id
+        print("kafka_id is {}".format(kafka_id))
+        yield cluster
+    finally:
+        cluster.shutdown()
+
+
+@pytest.fixture(autouse=True)
+def kafka_setup_teardown():
+    instance.query('DROP DATABASE IF EXISTS test; CREATE DATABASE test;')
+    wait_kafka_is_available()
+    print("kafka is available - running test")
+    yield  # run test
+
+
+# Tests
+
+@pytest.mark.timeout(180)  # wait to build containers
+def test_kafka_json_as_string(kafka_cluster):
+    kafka_produce('kafka_json_as_string', ['{"t": 123, "e": {"x": "woof"} }', '', '{"t": 124, "e": {"x": "test"} }', '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}'])
+
+    instance.query('''
+        CREATE TABLE test.kafka (field String)
+            ENGINE = Kafka
+            SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
+                     kafka_topic_list = 'kafka_json_as_string',
+                     kafka_group_name = 'kafka_json_as_string',
+                     kafka_format = 'JSONAsString',
+                     kafka_flush_interval_ms = 1000;
+        ''')
+
+    result = instance.query('SELECT * FROM test.kafka;')
+    expected = '''\
+{"t": 123, "e": {"x": "woof"} }
+{"t": 124, "e": {"x": "test"} }
+{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
+'''
+    assert TSV(result) == TSV(expected)
+    assert instance.contains_in_log("Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows")
+
+
+def test_kafka_json_as_string_no_kdc(kafka_cluster):
+    kafka_produce('kafka_json_as_string_no_kdc', ['{"t": 123, "e": {"x": "woof"} }', '', '{"t": 124, "e": {"x": "test"} }', '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}'])
+
+    kafka_cluster.pause_container('kafka_kerberos')
+    time.sleep(45)  # wait for ticket expiration
+
+    instance.query('''
+        CREATE TABLE test.kafka_no_kdc (field String)
+            ENGINE = Kafka
+            SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
+                     kafka_topic_list = 'kafka_json_as_string_no_kdc',
+                     kafka_group_name = 'kafka_json_as_string_no_kdc',
+                     kafka_format = 'JSONAsString',
+                     kafka_flush_interval_ms = 1000;
+        ''')
+
+    result = instance.query('SELECT * FROM test.kafka_no_kdc;')
+    expected = ''
+
+    kafka_cluster.unpause_container('kafka_kerberos')
+
+    assert TSV(result) == TSV(expected)
+    assert instance.contains_in_log("StorageKafka (kafka_no_kdc): Nothing to commit")
+    assert instance.contains_in_log("Ticket expired")
+    assert instance.contains_in_log("Kerberos ticket refresh failed")
instance.contains_in_log("Kerberos ticket refresh failed") + + +if __name__ == '__main__': + cluster.start() + raw_input("Cluster created, press any key to destroy...") + cluster.shutdown()