mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 23:21:59 +00:00
Merge pull request #15435 from ClickHouse/ADQM-160-fix
Try merge #14265 again
This commit is contained in:
commit
a4b4895b26
1
.gitmodules
vendored
1
.gitmodules
vendored
@ -186,3 +186,4 @@
|
|||||||
[submodule "contrib/cyrus-sasl"]
|
[submodule "contrib/cyrus-sasl"]
|
||||||
path = contrib/cyrus-sasl
|
path = contrib/cyrus-sasl
|
||||||
url = https://github.com/cyrusimap/cyrus-sasl
|
url = https://github.com/cyrusimap/cyrus-sasl
|
||||||
|
branch = cyrus-sasl-2.1
|
||||||
|
@ -14,10 +14,10 @@ if (NOT ENABLE_RDKAFKA)
|
|||||||
return()
|
return()
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
if (NOT ARCH_ARM AND USE_LIBGSASL)
|
if (NOT ARCH_ARM)
|
||||||
option (USE_INTERNAL_RDKAFKA_LIBRARY "Set to FALSE to use system librdkafka instead of the bundled" ${NOT_UNBUNDLED})
|
option (USE_INTERNAL_RDKAFKA_LIBRARY "Set to FALSE to use system librdkafka instead of the bundled" ${NOT_UNBUNDLED})
|
||||||
elseif(USE_INTERNAL_RDKAFKA_LIBRARY)
|
elseif(USE_INTERNAL_RDKAFKA_LIBRARY)
|
||||||
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't use internal librdkafka with ARCH_ARM=${ARCH_ARM} AND USE_LIBGSASL=${USE_LIBGSASL}")
|
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't use internal librdkafka with ARCH_ARM=${ARCH_ARM}")
|
||||||
endif ()
|
endif ()
|
||||||
|
|
||||||
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/cppkafka/CMakeLists.txt")
|
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/cppkafka/CMakeLists.txt")
|
||||||
|
2
contrib/cyrus-sasl
vendored
2
contrib/cyrus-sasl
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 6054630889fd1cd8d0659573d69badcee1e23a00
|
Subproject commit 9995bf9d8e14f58934d9313ac64f13780d6dd3c9
|
@ -133,6 +133,10 @@
|
|||||||
"name": "yandex/clickhouse-postgresql-java-client",
|
"name": "yandex/clickhouse-postgresql-java-client",
|
||||||
"dependent": []
|
"dependent": []
|
||||||
},
|
},
|
||||||
|
"docker/test/integration/kerberos_kdc": {
|
||||||
|
"name": "yandex/clickhouse-kerberos-kdc",
|
||||||
|
"dependent": []
|
||||||
|
},
|
||||||
"docker/test/base": {
|
"docker/test/base": {
|
||||||
"name": "yandex/clickhouse-test-base",
|
"name": "yandex/clickhouse-test-base",
|
||||||
"dependent": [
|
"dependent": [
|
||||||
|
@ -16,7 +16,8 @@ RUN apt-get update \
|
|||||||
odbc-postgresql \
|
odbc-postgresql \
|
||||||
sqlite3 \
|
sqlite3 \
|
||||||
curl \
|
curl \
|
||||||
tar
|
tar \
|
||||||
|
krb5-user
|
||||||
RUN rm -rf \
|
RUN rm -rf \
|
||||||
/var/lib/apt/lists/* \
|
/var/lib/apt/lists/* \
|
||||||
/var/cache/debconf \
|
/var/cache/debconf \
|
||||||
|
15
docker/test/integration/kerberos_kdc/Dockerfile
Normal file
15
docker/test/integration/kerberos_kdc/Dockerfile
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
# docker build -t yandex/clickhouse-kerberos-kdc .
|
||||||
|
|
||||||
|
FROM centos:6.6
|
||||||
|
# old OS to make is faster and smaller
|
||||||
|
|
||||||
|
RUN yum install -y krb5-server krb5-libs krb5-auth-dialog krb5-workstation
|
||||||
|
|
||||||
|
EXPOSE 88 749
|
||||||
|
|
||||||
|
RUN touch /config.sh
|
||||||
|
# should be overwritten e.g. via docker_compose volumes
|
||||||
|
# volumes: /some_path/my_kerberos_config.sh:/config.sh:ro
|
||||||
|
|
||||||
|
|
||||||
|
ENTRYPOINT ["/bin/bash", "/config.sh"]
|
@ -0,0 +1,59 @@
|
|||||||
|
version: '2.3'
|
||||||
|
|
||||||
|
services:
|
||||||
|
kafka_kerberized_zookeeper:
|
||||||
|
image: confluentinc/cp-zookeeper:5.2.0
|
||||||
|
# restart: always
|
||||||
|
hostname: kafka_kerberized_zookeeper
|
||||||
|
environment:
|
||||||
|
ZOOKEEPER_SERVER_ID: 1
|
||||||
|
ZOOKEEPER_CLIENT_PORT: 2181
|
||||||
|
ZOOKEEPER_SERVERS: "kafka_kerberized_zookeeper:2888:3888"
|
||||||
|
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/zookeeper_jaas.conf -Djava.security.krb5.conf=/etc/kafka/secrets/krb.conf -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider -Dsun.security.krb5.debug=true"
|
||||||
|
volumes:
|
||||||
|
- ${KERBERIZED_KAFKA_DIR}/secrets:/etc/kafka/secrets
|
||||||
|
- /dev/urandom:/dev/random
|
||||||
|
depends_on:
|
||||||
|
- kafka_kerberos
|
||||||
|
security_opt:
|
||||||
|
- label:disable
|
||||||
|
|
||||||
|
kerberized_kafka1:
|
||||||
|
image: confluentinc/cp-kafka:5.2.0
|
||||||
|
# restart: always
|
||||||
|
hostname: kerberized_kafka1
|
||||||
|
ports:
|
||||||
|
- "9092:9092"
|
||||||
|
- "9093:9093"
|
||||||
|
environment:
|
||||||
|
KAFKA_LISTENERS: OUTSIDE://:19092,UNSECURED_OUTSIDE://:19093,UNSECURED_INSIDE://:9093
|
||||||
|
KAFKA_ADVERTISED_LISTENERS: OUTSIDE://kerberized_kafka1:19092,UNSECURED_OUTSIDE://kerberized_kafka1:19093,UNSECURED_INSIDE://localhost:9093
|
||||||
|
# KAFKA_LISTENERS: INSIDE://kerberized_kafka1:9092,OUTSIDE://kerberized_kafka1:19092
|
||||||
|
# KAFKA_ADVERTISED_LISTENERS: INSIDE://localhost:9092,OUTSIDE://kerberized_kafka1:19092
|
||||||
|
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: GSSAPI
|
||||||
|
KAFKA_SASL_ENABLED_MECHANISMS: GSSAPI
|
||||||
|
KAFKA_SASL_KERBEROS_SERVICE_NAME: kafka
|
||||||
|
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: OUTSIDE:SASL_PLAINTEXT,UNSECURED_OUTSIDE:PLAINTEXT,UNSECURED_INSIDE:PLAINTEXT,
|
||||||
|
KAFKA_INTER_BROKER_LISTENER_NAME: OUTSIDE
|
||||||
|
KAFKA_BROKER_ID: 1
|
||||||
|
KAFKA_ZOOKEEPER_CONNECT: "kafka_kerberized_zookeeper:2181"
|
||||||
|
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
|
||||||
|
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||||
|
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/secrets/broker_jaas.conf -Djava.security.krb5.conf=/etc/kafka/secrets/krb.conf -Dsun.security.krb5.debug=true"
|
||||||
|
volumes:
|
||||||
|
- ${KERBERIZED_KAFKA_DIR}/secrets:/etc/kafka/secrets
|
||||||
|
- /dev/urandom:/dev/random
|
||||||
|
depends_on:
|
||||||
|
- kafka_kerberized_zookeeper
|
||||||
|
- kafka_kerberos
|
||||||
|
security_opt:
|
||||||
|
- label:disable
|
||||||
|
|
||||||
|
kafka_kerberos:
|
||||||
|
image: yandex/clickhouse-kerberos-kdc:${DOCKER_KERBEROS_KDC_TAG}
|
||||||
|
hostname: kafka_kerberos
|
||||||
|
volumes:
|
||||||
|
- ${KERBERIZED_KAFKA_DIR}/secrets:/tmp/keytab
|
||||||
|
- ${KERBERIZED_KAFKA_DIR}/../../kerberos_image_config.sh:/config.sh
|
||||||
|
- /dev/urandom:/dev/random
|
||||||
|
ports: [88, 749]
|
@ -27,6 +27,7 @@ export DOCKER_MYSQL_JAVA_CLIENT_TAG=${DOCKER_MYSQL_JAVA_CLIENT_TAG:=latest}
|
|||||||
export DOCKER_MYSQL_JS_CLIENT_TAG=${DOCKER_MYSQL_JS_CLIENT_TAG:=latest}
|
export DOCKER_MYSQL_JS_CLIENT_TAG=${DOCKER_MYSQL_JS_CLIENT_TAG:=latest}
|
||||||
export DOCKER_MYSQL_PHP_CLIENT_TAG=${DOCKER_MYSQL_PHP_CLIENT_TAG:=latest}
|
export DOCKER_MYSQL_PHP_CLIENT_TAG=${DOCKER_MYSQL_PHP_CLIENT_TAG:=latest}
|
||||||
export DOCKER_POSTGRESQL_JAVA_CLIENT_TAG=${DOCKER_POSTGRESQL_JAVA_CLIENT_TAG:=latest}
|
export DOCKER_POSTGRESQL_JAVA_CLIENT_TAG=${DOCKER_POSTGRESQL_JAVA_CLIENT_TAG:=latest}
|
||||||
|
export DOCKER_KERBEROS_KDC_TAG=${DOCKER_KERBEROS_KDC_TAG:=latest}
|
||||||
|
|
||||||
cd /ClickHouse/tests/integration
|
cd /ClickHouse/tests/integration
|
||||||
exec "$@"
|
exec "$@"
|
||||||
|
@ -165,6 +165,22 @@ Similar to GraphiteMergeTree, the Kafka engine supports extended configuration u
|
|||||||
|
|
||||||
For a list of possible configuration options, see the [librdkafka configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Use the underscore (`_`) instead of a dot in the ClickHouse configuration. For example, `check.crcs=true` will be `<check_crcs>true</check_crcs>`.
|
For a list of possible configuration options, see the [librdkafka configuration reference](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). Use the underscore (`_`) instead of a dot in the ClickHouse configuration. For example, `check.crcs=true` will be `<check_crcs>true</check_crcs>`.
|
||||||
|
|
||||||
|
### Kerberos support {#kafka-kerberos-support}
|
||||||
|
|
||||||
|
To deal with Kerberos-aware Kafka, add `security_protocol` child element with `sasl_plaintext` value. It is enough if Kerberos ticket-granting ticket is obtained and cached by OS facilities.
|
||||||
|
ClickHouse is able to maintain Kerberos credentials using a keytab file. Consider `sasl_kerberos_service_name`, `sasl_kerberos_keytab`, `sasl_kerberos_principal` and `sasl.kerberos.kinit.cmd` child elements.
|
||||||
|
|
||||||
|
Example:
|
||||||
|
|
||||||
|
``` xml
|
||||||
|
<!-- Kerberos-aware Kafka -->
|
||||||
|
<kafka>
|
||||||
|
<security_protocol>SASL_PLAINTEXT</security_protocol>
|
||||||
|
<sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
|
||||||
|
<sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
|
||||||
|
</kafka>
|
||||||
|
```
|
||||||
|
|
||||||
## Virtual Columns {#virtual-columns}
|
## Virtual Columns {#virtual-columns}
|
||||||
|
|
||||||
- `_topic` — Kafka topic.
|
- `_topic` — Kafka topic.
|
||||||
|
@ -45,7 +45,6 @@ def _create_env_file(path, variables, fname=DEFAULT_ENV_NAME):
|
|||||||
f.write("=".join([var, value]) + "\n")
|
f.write("=".join([var, value]) + "\n")
|
||||||
return full_path
|
return full_path
|
||||||
|
|
||||||
|
|
||||||
def subprocess_check_call(args):
|
def subprocess_check_call(args):
|
||||||
# Uncomment for debugging
|
# Uncomment for debugging
|
||||||
# print('run:', ' ' . join(args))
|
# print('run:', ' ' . join(args))
|
||||||
@ -125,6 +124,7 @@ class ClickHouseCluster:
|
|||||||
self.base_zookeeper_cmd = None
|
self.base_zookeeper_cmd = None
|
||||||
self.base_mysql_cmd = []
|
self.base_mysql_cmd = []
|
||||||
self.base_kafka_cmd = []
|
self.base_kafka_cmd = []
|
||||||
|
self.base_kerberized_kafka_cmd = []
|
||||||
self.base_rabbitmq_cmd = []
|
self.base_rabbitmq_cmd = []
|
||||||
self.base_cassandra_cmd = []
|
self.base_cassandra_cmd = []
|
||||||
self.pre_zookeeper_commands = []
|
self.pre_zookeeper_commands = []
|
||||||
@ -133,6 +133,7 @@ class ClickHouseCluster:
|
|||||||
self.with_mysql = False
|
self.with_mysql = False
|
||||||
self.with_postgres = False
|
self.with_postgres = False
|
||||||
self.with_kafka = False
|
self.with_kafka = False
|
||||||
|
self.with_kerberized_kafka = False
|
||||||
self.with_rabbitmq = False
|
self.with_rabbitmq = False
|
||||||
self.with_odbc_drivers = False
|
self.with_odbc_drivers = False
|
||||||
self.with_hdfs = False
|
self.with_hdfs = False
|
||||||
@ -169,7 +170,7 @@ class ClickHouseCluster:
|
|||||||
|
|
||||||
def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries=None,
|
def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries=None,
|
||||||
macros=None,
|
macros=None,
|
||||||
with_zookeeper=False, with_mysql=False, with_kafka=False, with_rabbitmq=False,
|
with_zookeeper=False, with_mysql=False, with_kafka=False, with_kerberized_kafka=False, with_rabbitmq=False,
|
||||||
clickhouse_path_dir=None,
|
clickhouse_path_dir=None,
|
||||||
with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False,
|
with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False,
|
||||||
with_redis=False, with_minio=False, with_cassandra=False,
|
with_redis=False, with_minio=False, with_cassandra=False,
|
||||||
@ -207,6 +208,7 @@ class ClickHouseCluster:
|
|||||||
zookeeper_config_path=self.zookeeper_config_path,
|
zookeeper_config_path=self.zookeeper_config_path,
|
||||||
with_mysql=with_mysql,
|
with_mysql=with_mysql,
|
||||||
with_kafka=with_kafka,
|
with_kafka=with_kafka,
|
||||||
|
with_kerberized_kafka=with_kerberized_kafka,
|
||||||
with_rabbitmq=with_rabbitmq,
|
with_rabbitmq=with_rabbitmq,
|
||||||
with_mongo=with_mongo,
|
with_mongo=with_mongo,
|
||||||
with_redis=with_redis,
|
with_redis=with_redis,
|
||||||
@ -290,6 +292,13 @@ class ClickHouseCluster:
|
|||||||
p.join(docker_compose_yml_dir, 'docker_compose_kafka.yml')]
|
p.join(docker_compose_yml_dir, 'docker_compose_kafka.yml')]
|
||||||
cmds.append(self.base_kafka_cmd)
|
cmds.append(self.base_kafka_cmd)
|
||||||
|
|
||||||
|
if with_kerberized_kafka and not self.with_kerberized_kafka:
|
||||||
|
self.with_kerberized_kafka = True
|
||||||
|
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_kafka.yml')])
|
||||||
|
self.base_kerberized_kafka_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
|
||||||
|
self.project_name, '--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_kafka.yml')]
|
||||||
|
cmds.append(self.base_kerberized_kafka_cmd)
|
||||||
|
|
||||||
if with_rabbitmq and not self.with_rabbitmq:
|
if with_rabbitmq and not self.with_rabbitmq:
|
||||||
self.with_rabbitmq = True
|
self.with_rabbitmq = True
|
||||||
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_rabbitmq.yml')])
|
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_rabbitmq.yml')])
|
||||||
@ -608,6 +617,11 @@ class ClickHouseCluster:
|
|||||||
self.kafka_docker_id = self.get_instance_docker_id('kafka1')
|
self.kafka_docker_id = self.get_instance_docker_id('kafka1')
|
||||||
self.wait_schema_registry_to_start(120)
|
self.wait_schema_registry_to_start(120)
|
||||||
|
|
||||||
|
if self.with_kerberized_kafka and self.base_kerberized_kafka_cmd:
|
||||||
|
env = os.environ.copy()
|
||||||
|
env['KERBERIZED_KAFKA_DIR'] = instance.path + '/'
|
||||||
|
subprocess.check_call(self.base_kerberized_kafka_cmd + common_opts + ['--renew-anon-volumes'], env=env)
|
||||||
|
self.kerberized_kafka_docker_id = self.get_instance_docker_id('kerberized_kafka1')
|
||||||
if self.with_rabbitmq and self.base_rabbitmq_cmd:
|
if self.with_rabbitmq and self.base_rabbitmq_cmd:
|
||||||
subprocess_check_call(self.base_rabbitmq_cmd + common_opts + ['--renew-anon-volumes'])
|
subprocess_check_call(self.base_rabbitmq_cmd + common_opts + ['--renew-anon-volumes'])
|
||||||
self.rabbitmq_docker_id = self.get_instance_docker_id('rabbitmq1')
|
self.rabbitmq_docker_id = self.get_instance_docker_id('rabbitmq1')
|
||||||
@ -788,9 +802,12 @@ services:
|
|||||||
- {instance_config_dir}:/etc/clickhouse-server/
|
- {instance_config_dir}:/etc/clickhouse-server/
|
||||||
- {db_dir}:/var/lib/clickhouse/
|
- {db_dir}:/var/lib/clickhouse/
|
||||||
- {logs_dir}:/var/log/clickhouse-server/
|
- {logs_dir}:/var/log/clickhouse-server/
|
||||||
|
- /etc/passwd:/etc/passwd:ro
|
||||||
{binary_volume}
|
{binary_volume}
|
||||||
{odbc_bridge_volume}
|
{odbc_bridge_volume}
|
||||||
{odbc_ini_path}
|
{odbc_ini_path}
|
||||||
|
{keytab_path}
|
||||||
|
{krb5_conf}
|
||||||
entrypoint: {entrypoint_cmd}
|
entrypoint: {entrypoint_cmd}
|
||||||
tmpfs: {tmpfs}
|
tmpfs: {tmpfs}
|
||||||
cap_add:
|
cap_add:
|
||||||
@ -820,7 +837,7 @@ class ClickHouseInstance:
|
|||||||
def __init__(
|
def __init__(
|
||||||
self, cluster, base_path, name, base_config_dir, custom_main_configs, custom_user_configs,
|
self, cluster, base_path, name, base_config_dir, custom_main_configs, custom_user_configs,
|
||||||
custom_dictionaries,
|
custom_dictionaries,
|
||||||
macros, with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_rabbitmq, with_mongo,
|
macros, with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_kerberized_kafka, with_rabbitmq, with_mongo,
|
||||||
with_redis, with_minio,
|
with_redis, with_minio,
|
||||||
with_cassandra, server_bin_path, odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers,
|
with_cassandra, server_bin_path, odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers,
|
||||||
hostname=None, env_variables=None,
|
hostname=None, env_variables=None,
|
||||||
@ -839,6 +856,7 @@ class ClickHouseInstance:
|
|||||||
self.custom_user_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_user_configs]
|
self.custom_user_config_paths = [p.abspath(p.join(base_path, c)) for c in custom_user_configs]
|
||||||
self.custom_dictionaries_paths = [p.abspath(p.join(base_path, c)) for c in custom_dictionaries]
|
self.custom_dictionaries_paths = [p.abspath(p.join(base_path, c)) for c in custom_dictionaries]
|
||||||
self.clickhouse_path_dir = p.abspath(p.join(base_path, clickhouse_path_dir)) if clickhouse_path_dir else None
|
self.clickhouse_path_dir = p.abspath(p.join(base_path, clickhouse_path_dir)) if clickhouse_path_dir else None
|
||||||
|
self.kerberos_secrets_dir = p.abspath(p.join(base_path, 'secrets'))
|
||||||
self.macros = macros if macros is not None else {}
|
self.macros = macros if macros is not None else {}
|
||||||
self.with_zookeeper = with_zookeeper
|
self.with_zookeeper = with_zookeeper
|
||||||
self.zookeeper_config_path = zookeeper_config_path
|
self.zookeeper_config_path = zookeeper_config_path
|
||||||
@ -848,6 +866,7 @@ class ClickHouseInstance:
|
|||||||
|
|
||||||
self.with_mysql = with_mysql
|
self.with_mysql = with_mysql
|
||||||
self.with_kafka = with_kafka
|
self.with_kafka = with_kafka
|
||||||
|
self.with_kerberized_kafka = with_kerberized_kafka
|
||||||
self.with_rabbitmq = with_rabbitmq
|
self.with_rabbitmq = with_rabbitmq
|
||||||
self.with_mongo = with_mongo
|
self.with_mongo = with_mongo
|
||||||
self.with_redis = with_redis
|
self.with_redis = with_redis
|
||||||
@ -863,6 +882,13 @@ class ClickHouseInstance:
|
|||||||
else:
|
else:
|
||||||
self.odbc_ini_path = ""
|
self.odbc_ini_path = ""
|
||||||
|
|
||||||
|
if with_kerberized_kafka:
|
||||||
|
self.keytab_path = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets:/tmp/keytab"
|
||||||
|
self.krb5_conf = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets/krb.conf:/etc/krb5.conf:ro"
|
||||||
|
else:
|
||||||
|
self.keytab_path = ""
|
||||||
|
self.krb5_conf = ""
|
||||||
|
|
||||||
self.docker_client = None
|
self.docker_client = None
|
||||||
self.ip_address = None
|
self.ip_address = None
|
||||||
self.client = None
|
self.client = None
|
||||||
@ -1192,6 +1218,9 @@ class ClickHouseInstance:
|
|||||||
if self.with_zookeeper:
|
if self.with_zookeeper:
|
||||||
shutil.copy(self.zookeeper_config_path, conf_d_dir)
|
shutil.copy(self.zookeeper_config_path, conf_d_dir)
|
||||||
|
|
||||||
|
if self.with_kerberized_kafka:
|
||||||
|
shutil.copytree(self.kerberos_secrets_dir, p.abspath(p.join(self.path, 'secrets')))
|
||||||
|
|
||||||
# Copy config.d configs
|
# Copy config.d configs
|
||||||
print "Copy custom test config files {} to {}".format(self.custom_main_config_paths, self.config_d_dir)
|
print "Copy custom test config files {} to {}".format(self.custom_main_config_paths, self.config_d_dir)
|
||||||
for path in self.custom_main_config_paths:
|
for path in self.custom_main_config_paths:
|
||||||
@ -1227,6 +1256,9 @@ class ClickHouseInstance:
|
|||||||
depends_on.append("kafka1")
|
depends_on.append("kafka1")
|
||||||
depends_on.append("schema-registry")
|
depends_on.append("schema-registry")
|
||||||
|
|
||||||
|
if self.with_kerberized_kafka:
|
||||||
|
depends_on.append("kerberized_kafka1")
|
||||||
|
|
||||||
if self.with_rabbitmq:
|
if self.with_rabbitmq:
|
||||||
depends_on.append("rabbitmq1")
|
depends_on.append("rabbitmq1")
|
||||||
|
|
||||||
@ -1290,6 +1322,8 @@ class ClickHouseInstance:
|
|||||||
user=os.getuid(),
|
user=os.getuid(),
|
||||||
env_file=env_file,
|
env_file=env_file,
|
||||||
odbc_ini_path=odbc_ini_path,
|
odbc_ini_path=odbc_ini_path,
|
||||||
|
keytab_path=self.keytab_path,
|
||||||
|
krb5_conf=self.krb5_conf,
|
||||||
entrypoint_cmd=entrypoint_cmd,
|
entrypoint_cmd=entrypoint_cmd,
|
||||||
networks=networks,
|
networks=networks,
|
||||||
app_net=app_net,
|
app_net=app_net,
|
||||||
|
@ -155,7 +155,9 @@ if __name__ == "__main__":
|
|||||||
elif image == "yandex/clickhouse-postgresql-java-client":
|
elif image == "yandex/clickhouse-postgresql-java-client":
|
||||||
env_tags += "-e {}={} ".format("DOCKER_POSTGRESQL_JAVA_CLIENT_TAG", tag)
|
env_tags += "-e {}={} ".format("DOCKER_POSTGRESQL_JAVA_CLIENT_TAG", tag)
|
||||||
elif image == "yandex/clickhouse-integration-test":
|
elif image == "yandex/clickhouse-integration-test":
|
||||||
env_tags += "-e {}={}".format("DOCKER_BASE_TAG", tag)
|
env_tags += "-e {}={} ".format("DOCKER_BASE_TAG", tag)
|
||||||
|
elif image == "yandex/clickhouse-kerberos-kdc":
|
||||||
|
env_tags += "-e {}={}".format("DOCKER_KERBEROS_KDC_TAG", tag)
|
||||||
else:
|
else:
|
||||||
logging.info("Unknown image {}".format(image))
|
logging.info("Unknown image {}".format(image))
|
||||||
|
|
||||||
|
@ -0,0 +1,26 @@
|
|||||||
|
<yandex>
|
||||||
|
<kafka>
|
||||||
|
<auto_offset_reset>earliest</auto_offset_reset>
|
||||||
|
<!-- Debugging of possible issues, like:
|
||||||
|
- https://github.com/edenhill/librdkafka/issues/2077
|
||||||
|
- https://github.com/edenhill/librdkafka/issues/1778
|
||||||
|
- #5615
|
||||||
|
|
||||||
|
XXX: for now this messages will appears in stderr.
|
||||||
|
-->
|
||||||
|
<security_protocol>SASL_PLAINTEXT</security_protocol>
|
||||||
|
<sasl_mechanism>GSSAPI</sasl_mechanism>
|
||||||
|
<sasl_kerberos_service_name>kafka</sasl_kerberos_service_name>
|
||||||
|
<sasl_kerberos_keytab>/tmp/keytab/clickhouse.keytab</sasl_kerberos_keytab>
|
||||||
|
<sasl_kerberos_principal>kafkauser/instance@TEST.CLICKHOUSE.TECH</sasl_kerberos_principal>
|
||||||
|
<debug>security</debug>
|
||||||
|
<api_version_request>false</api_version_request>
|
||||||
|
</kafka>
|
||||||
|
|
||||||
|
<kafka_consumer_hang>
|
||||||
|
<!-- default: 3000 -->
|
||||||
|
<heartbeat_interval_ms>300</heartbeat_interval_ms>
|
||||||
|
<!-- default: 10000 -->
|
||||||
|
<session_timeout_ms>6000</session_timeout_ms>
|
||||||
|
</kafka_consumer_hang>
|
||||||
|
</yandex>
|
@ -0,0 +1,11 @@
|
|||||||
|
<yandex>
|
||||||
|
<logger>
|
||||||
|
<level>trace</level>
|
||||||
|
<log>/var/log/clickhouse-server/log.log</log>
|
||||||
|
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
|
||||||
|
<size>1000M</size>
|
||||||
|
<count>10</count>
|
||||||
|
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
|
||||||
|
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
|
||||||
|
</logger>
|
||||||
|
</yandex>
|
@ -0,0 +1,132 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
|
||||||
|
set -x # trace
|
||||||
|
|
||||||
|
: "${REALM:=TEST.CLICKHOUSE.TECH}"
|
||||||
|
: "${DOMAIN_REALM:=test.clickhouse.tech}"
|
||||||
|
: "${KERB_MASTER_KEY:=masterkey}"
|
||||||
|
: "${KERB_ADMIN_USER:=admin}"
|
||||||
|
: "${KERB_ADMIN_PASS:=admin}"
|
||||||
|
|
||||||
|
create_config() {
|
||||||
|
: "${KDC_ADDRESS:=$(hostname -f)}"
|
||||||
|
|
||||||
|
cat>/etc/krb5.conf<<EOF
|
||||||
|
[logging]
|
||||||
|
default = FILE:/var/log/kerberos/krb5libs.log
|
||||||
|
kdc = FILE:/var/log/kerberos/krb5kdc.log
|
||||||
|
admin_server = FILE:/var/log/kerberos/kadmind.log
|
||||||
|
|
||||||
|
[libdefaults]
|
||||||
|
default_realm = $REALM
|
||||||
|
dns_lookup_realm = false
|
||||||
|
dns_lookup_kdc = false
|
||||||
|
ticket_lifetime = 15s
|
||||||
|
renew_lifetime = 15s
|
||||||
|
forwardable = true
|
||||||
|
# WARNING: We use weaker key types to simplify testing as stronger key types
|
||||||
|
# require the enhanced security JCE policy file to be installed. You should
|
||||||
|
# NOT run with this configuration in production or any real environment. You
|
||||||
|
# have been warned.
|
||||||
|
default_tkt_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
|
||||||
|
default_tgs_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
|
||||||
|
permitted_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
|
||||||
|
|
||||||
|
[realms]
|
||||||
|
$REALM = {
|
||||||
|
kdc = $KDC_ADDRESS
|
||||||
|
admin_server = $KDC_ADDRESS
|
||||||
|
}
|
||||||
|
|
||||||
|
[domain_realm]
|
||||||
|
.$DOMAIN_REALM = $REALM
|
||||||
|
$DOMAIN_REALM = $REALM
|
||||||
|
EOF
|
||||||
|
|
||||||
|
cat>/var/kerberos/krb5kdc/kdc.conf<<EOF
|
||||||
|
[kdcdefaults]
|
||||||
|
kdc_ports = 88
|
||||||
|
kdc_tcp_ports = 88
|
||||||
|
|
||||||
|
[realms]
|
||||||
|
$REALM = {
|
||||||
|
acl_file = /var/kerberos/krb5kdc/kadm5.acl
|
||||||
|
dict_file = /usr/share/dict/words
|
||||||
|
admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
|
||||||
|
# WARNING: We use weaker key types to simplify testing as stronger key types
|
||||||
|
# require the enhanced security JCE policy file to be installed. You should
|
||||||
|
# NOT run with this configuration in production or any real environment. You
|
||||||
|
# have been warned.
|
||||||
|
master_key_type = des3-hmac-sha1
|
||||||
|
supported_enctypes = arcfour-hmac:normal des3-hmac-sha1:normal des-cbc-crc:normal des:normal des:v4 des:norealm des:onlyrealm des:afs3
|
||||||
|
default_principal_flags = +preauth
|
||||||
|
}
|
||||||
|
EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
create_db() {
|
||||||
|
/usr/sbin/kdb5_util -P $KERB_MASTER_KEY -r $REALM create -s
|
||||||
|
}
|
||||||
|
|
||||||
|
start_kdc() {
|
||||||
|
mkdir -p /var/log/kerberos
|
||||||
|
|
||||||
|
/etc/rc.d/init.d/krb5kdc start
|
||||||
|
/etc/rc.d/init.d/kadmin start
|
||||||
|
|
||||||
|
chkconfig krb5kdc on
|
||||||
|
chkconfig kadmin on
|
||||||
|
}
|
||||||
|
|
||||||
|
restart_kdc() {
|
||||||
|
/etc/rc.d/init.d/krb5kdc restart
|
||||||
|
/etc/rc.d/init.d/kadmin restart
|
||||||
|
}
|
||||||
|
|
||||||
|
create_admin_user() {
|
||||||
|
kadmin.local -q "addprinc -pw $KERB_ADMIN_PASS $KERB_ADMIN_USER/admin"
|
||||||
|
echo "*/admin@$REALM *" > /var/kerberos/krb5kdc/kadm5.acl
|
||||||
|
}
|
||||||
|
|
||||||
|
create_keytabs() {
|
||||||
|
|
||||||
|
kadmin.local -q "addprinc -randkey zookeeper/kafka_kerberized_zookeeper@${REALM}"
|
||||||
|
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/kafka_kerberized_zookeeper.keytab zookeeper/kafka_kerberized_zookeeper@${REALM}"
|
||||||
|
|
||||||
|
kadmin.local -q "addprinc -randkey kafka/kerberized_kafka1@${REALM}"
|
||||||
|
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/kerberized_kafka.keytab kafka/kerberized_kafka1@${REALM}"
|
||||||
|
|
||||||
|
kadmin.local -q "addprinc -randkey zkclient@${REALM}"
|
||||||
|
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/zkclient.keytab zkclient@${REALM}"
|
||||||
|
|
||||||
|
|
||||||
|
kadmin.local -q "addprinc -randkey kafkauser/instance@${REALM}"
|
||||||
|
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab kafkauser/instance@${REALM}"
|
||||||
|
|
||||||
|
chmod g+r /tmp/keytab/clickhouse.keytab
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
main() {
|
||||||
|
|
||||||
|
if [ ! -f /kerberos_initialized ]; then
|
||||||
|
create_config
|
||||||
|
create_db
|
||||||
|
create_admin_user
|
||||||
|
start_kdc
|
||||||
|
|
||||||
|
touch /kerberos_initialized
|
||||||
|
fi
|
||||||
|
|
||||||
|
if [ ! -f /var/kerberos/krb5kdc/principal ]; then
|
||||||
|
while true; do sleep 1000; done
|
||||||
|
else
|
||||||
|
start_kdc
|
||||||
|
create_keytabs
|
||||||
|
tail -F /var/log/kerberos/krb5kdc.log
|
||||||
|
fi
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
[[ "$0" == "${BASH_SOURCE[0]}" ]] && main "$@"
|
@ -0,0 +1,14 @@
|
|||||||
|
KafkaServer {
|
||||||
|
com.sun.security.auth.module.Krb5LoginModule required
|
||||||
|
useKeyTab=true
|
||||||
|
storeKey=true
|
||||||
|
keyTab="/etc/kafka/secrets/kerberized_kafka.keytab"
|
||||||
|
principal="kafka/kerberized_kafka1@TEST.CLICKHOUSE.TECH";
|
||||||
|
};
|
||||||
|
Client {
|
||||||
|
com.sun.security.auth.module.Krb5LoginModule required
|
||||||
|
useKeyTab=true
|
||||||
|
storeKey=true
|
||||||
|
keyTab="/etc/kafka/secrets/zkclient.keytab"
|
||||||
|
principal="zkclient@TEST.CLICKHOUSE.TECH";
|
||||||
|
};
|
@ -0,0 +1,22 @@
|
|||||||
|
[logging]
|
||||||
|
default = FILE:/var/log/kerberos/krb5libs.log
|
||||||
|
kdc = FILE:/var/log/kerberos/krb5kdc.log
|
||||||
|
admin_server = FILE:/var/log/kerberos/kadmind.log
|
||||||
|
|
||||||
|
[libdefaults]
|
||||||
|
default_realm = TEST.CLICKHOUSE.TECH
|
||||||
|
dns_lookup_realm = false
|
||||||
|
dns_lookup_kdc = false
|
||||||
|
ticket_lifetime = 15s
|
||||||
|
renew_lifetime = 15s
|
||||||
|
forwardable = true
|
||||||
|
|
||||||
|
[realms]
|
||||||
|
TEST.CLICKHOUSE.TECH = {
|
||||||
|
kdc = kafka_kerberos
|
||||||
|
admin_server = kafka_kerberos
|
||||||
|
}
|
||||||
|
|
||||||
|
[domain_realm]
|
||||||
|
.TEST.CLICKHOUSE.TECH = TEST.CLICKHOUSE.TECH
|
||||||
|
TEST.CLICKHOUSE.TECH = TEST.CLICKHOUSE.TECH
|
@ -0,0 +1,14 @@
|
|||||||
|
Server {
|
||||||
|
com.sun.security.auth.module.Krb5LoginModule required
|
||||||
|
useKeyTab=true
|
||||||
|
storeKey=true
|
||||||
|
keyTab="/etc/kafka/secrets/kafka_kerberized_zookeeper.keytab"
|
||||||
|
principal="zookeeper/kafka_kerberized_zookeeper@TEST.CLICKHOUSE.TECH";
|
||||||
|
};
|
||||||
|
Client {
|
||||||
|
com.sun.security.auth.module.Krb5LoginModule required
|
||||||
|
useKeyTab=true
|
||||||
|
storeKey=true
|
||||||
|
keyTab="/etc/kafka/secrets/zkclient.keytab"
|
||||||
|
principal="zkclient@TEST.CLICKHOUSE.TECH";
|
||||||
|
};
|
146
tests/integration/test_storage_kerberized_kafka/test.py
Normal file
146
tests/integration/test_storage_kerberized_kafka/test.py
Normal file
@ -0,0 +1,146 @@
|
|||||||
|
import os.path as p
|
||||||
|
import random
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from helpers.cluster import ClickHouseCluster
|
||||||
|
from helpers.test_tools import TSV
|
||||||
|
from helpers.client import QueryRuntimeException
|
||||||
|
from helpers.network import PartitionManager
|
||||||
|
|
||||||
|
import json
|
||||||
|
import subprocess
|
||||||
|
import kafka.errors
|
||||||
|
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer, BrokerConnection
|
||||||
|
from kafka.admin import NewTopic
|
||||||
|
from kafka.protocol.admin import DescribeGroupsResponse_v1, DescribeGroupsRequest_v1
|
||||||
|
from kafka.protocol.group import MemberAssignment
|
||||||
|
import socket
|
||||||
|
|
||||||
|
cluster = ClickHouseCluster(__file__)
|
||||||
|
instance = cluster.add_instance('instance',
|
||||||
|
main_configs=['configs/kafka.xml', 'configs/log_conf.xml' ],
|
||||||
|
with_kerberized_kafka=True,
|
||||||
|
clickhouse_path_dir="clickhouse_path"
|
||||||
|
)
|
||||||
|
kafka_id = '' # instance.cluster.kafka_docker_id
|
||||||
|
|
||||||
|
# Helpers
|
||||||
|
|
||||||
|
def check_kafka_is_available():
|
||||||
|
|
||||||
|
# plaintext
|
||||||
|
p = subprocess.Popen(('docker',
|
||||||
|
'exec',
|
||||||
|
'-i',
|
||||||
|
kafka_id,
|
||||||
|
'/usr/bin/kafka-broker-api-versions',
|
||||||
|
'--bootstrap-server',
|
||||||
|
'localhost:9093'),
|
||||||
|
stdout=subprocess.PIPE)
|
||||||
|
p.communicate()
|
||||||
|
return p.returncode == 0
|
||||||
|
|
||||||
|
|
||||||
|
def wait_kafka_is_available(max_retries=50):
|
||||||
|
retries = 0
|
||||||
|
while True:
|
||||||
|
if check_kafka_is_available():
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
retries += 1
|
||||||
|
if retries > max_retries:
|
||||||
|
raise "Kafka is not available"
|
||||||
|
print("Waiting for Kafka to start up")
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
|
def kafka_produce(topic, messages, timestamp=None):
|
||||||
|
producer = KafkaProducer(bootstrap_servers="localhost:9093")
|
||||||
|
for message in messages:
|
||||||
|
producer.send(topic=topic, value=message, timestamp_ms=timestamp)
|
||||||
|
producer.flush()
|
||||||
|
print ("Produced {} messages for topic {}".format(len(messages), topic))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# Fixtures
|
||||||
|
|
||||||
|
@pytest.fixture(scope="module")
|
||||||
|
def kafka_cluster():
|
||||||
|
try:
|
||||||
|
global kafka_id
|
||||||
|
cluster.start()
|
||||||
|
kafka_id = instance.cluster.kerberized_kafka_docker_id
|
||||||
|
print("kafka_id is {}".format(kafka_id))
|
||||||
|
yield cluster
|
||||||
|
|
||||||
|
finally:
|
||||||
|
cluster.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def kafka_setup_teardown():
|
||||||
|
instance.query('DROP DATABASE IF EXISTS test; CREATE DATABASE test;')
|
||||||
|
wait_kafka_is_available()
|
||||||
|
print("kafka is available - running test")
|
||||||
|
yield # run test
|
||||||
|
|
||||||
|
# Tests
|
||||||
|
|
||||||
|
@pytest.mark.timeout(180) # wait to build containers
|
||||||
|
def test_kafka_json_as_string(kafka_cluster):
|
||||||
|
kafka_produce('kafka_json_as_string', ['{"t": 123, "e": {"x": "woof"} }', '', '{"t": 124, "e": {"x": "test"} }', '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}'])
|
||||||
|
|
||||||
|
instance.query('''
|
||||||
|
CREATE TABLE test.kafka (field String)
|
||||||
|
ENGINE = Kafka
|
||||||
|
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
|
||||||
|
kafka_topic_list = 'kafka_json_as_string',
|
||||||
|
kafka_group_name = 'kafka_json_as_string',
|
||||||
|
kafka_format = 'JSONAsString',
|
||||||
|
kafka_flush_interval_ms=1000;
|
||||||
|
''')
|
||||||
|
|
||||||
|
result = instance.query('SELECT * FROM test.kafka;')
|
||||||
|
expected = '''\
|
||||||
|
{"t": 123, "e": {"x": "woof"} }
|
||||||
|
{"t": 124, "e": {"x": "test"} }
|
||||||
|
{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
|
||||||
|
'''
|
||||||
|
assert TSV(result) == TSV(expected)
|
||||||
|
assert instance.contains_in_log("Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows")
|
||||||
|
|
||||||
|
def test_kafka_json_as_string_no_kdc(kafka_cluster):
|
||||||
|
kafka_produce('kafka_json_as_string_no_kdc', ['{"t": 123, "e": {"x": "woof"} }', '', '{"t": 124, "e": {"x": "test"} }', '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}'])
|
||||||
|
|
||||||
|
kafka_cluster.pause_container('kafka_kerberos')
|
||||||
|
time.sleep(45) # wait for ticket expiration
|
||||||
|
|
||||||
|
instance.query('''
|
||||||
|
CREATE TABLE test.kafka_no_kdc (field String)
|
||||||
|
ENGINE = Kafka
|
||||||
|
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
|
||||||
|
kafka_topic_list = 'kafka_json_as_string_no_kdc',
|
||||||
|
kafka_group_name = 'kafka_json_as_string_no_kdc',
|
||||||
|
kafka_format = 'JSONAsString',
|
||||||
|
kafka_flush_interval_ms=1000;
|
||||||
|
''')
|
||||||
|
|
||||||
|
result = instance.query('SELECT * FROM test.kafka_no_kdc;')
|
||||||
|
expected = ''
|
||||||
|
|
||||||
|
kafka_cluster.unpause_container('kafka_kerberos')
|
||||||
|
|
||||||
|
|
||||||
|
assert TSV(result) == TSV(expected)
|
||||||
|
assert instance.contains_in_log("StorageKafka (kafka_no_kdc): Nothing to commit")
|
||||||
|
assert instance.contains_in_log("Ticket expired")
|
||||||
|
assert instance.contains_in_log("Kerberos ticket refresh failed")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
cluster.start()
|
||||||
|
raw_input("Cluster created, press any key to destroy...")
|
||||||
|
cluster.shutdown()
|
Loading…
Reference in New Issue
Block a user