diff --git a/src/Storages/Kafka/KafkaSettings.h b/src/Storages/Kafka/KafkaSettings.h
index 93983fb6080..43984f81e05 100644
--- a/src/Storages/Kafka/KafkaSettings.h
+++ b/src/Storages/Kafka/KafkaSettings.h
@@ -19,6 +19,7 @@ struct KafkaSettings : public SettingsCollection<KafkaSettings>
     M(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.", 0) \
     M(SettingString, kafka_topic_list, "", "A list of Kafka topics.", 0) \
     M(SettingString, kafka_group_name, "", "A group of Kafka consumers.", 0) \
+    M(SettingString, kafka_client_id, "", "A client id of the Kafka consumer.", 0) \
     M(SettingString, kafka_format, "", "The message format for Kafka engine.", 0) \
     M(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.", 0) \
     M(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine", 0) \
diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp
index 930d6ff55b5..c35a031df1a 100644
--- a/src/Storages/Kafka/StorageKafka.cpp
+++ b/src/Storages/Kafka/StorageKafka.cpp
@@ -34,6 +34,7 @@
 #include
 #include
 #include
+#include <common/getFQDNOrHostName.h>

 namespace DB
@@ -118,6 +119,7 @@ StorageKafka::StorageKafka(
     const ColumnsDescription & columns_,
     const String & brokers_,
     const String & group_,
+    const String & client_id_,
     const Names & topics_,
     const String & format_name_,
     char row_delimiter_,
@@ -132,6 +134,7 @@ StorageKafka::StorageKafka(
     , topics(global_context.getMacros()->expand(topics_))
     , brokers(global_context.getMacros()->expand(brokers_))
     , group(global_context.getMacros()->expand(group_))
+    , client_id(client_id_.empty() ? getDefaultClientId(table_id_) : global_context.getMacros()->expand(client_id_))
     , format_name(global_context.getMacros()->expand(format_name_))
     , row_delimiter(row_delimiter_)
     , schema_name(global_context.getMacros()->expand(schema_name_))
@@ -149,6 +152,13 @@ StorageKafka::StorageKafka(
     task->deactivate();
 }

+String StorageKafka::getDefaultClientId(const StorageID & table_id_)
+{
+    std::stringstream ss;
+    ss << VERSION_NAME << "-" << getFQDNOrHostName() << "-" << table_id_.database_name << "-" << table_id_.table_name;
+    return ss.str();
+}
+

 Pipes StorageKafka::read(
     const Names & column_names,
@@ -194,7 +204,7 @@ void StorageKafka::startup()
     {
         try
         {
-            pushReadBuffer(createReadBuffer());
+            pushReadBuffer(createReadBuffer(i));
             ++num_created_consumers;
         }
         catch (const cppkafka::Exception &)
@@ -262,7 +272,7 @@ ProducerBufferPtr StorageKafka::createWriteBuffer(const Block & header)
     cppkafka::Configuration conf;
     conf.set("metadata.broker.list", brokers);
     conf.set("group.id", group);
-    conf.set("client.id", VERSION_FULL);
+    conf.set("client.id", client_id);

     // TODO: fill required settings
     updateConfiguration(conf);
@@ -275,13 +285,22 @@ ProducerBufferPtr StorageKafka::createWriteBuffer(const Block & header)
 }


-ConsumerBufferPtr StorageKafka::createReadBuffer()
+ConsumerBufferPtr StorageKafka::createReadBuffer(const size_t consumer_number)
 {
     cppkafka::Configuration conf;
     conf.set("metadata.broker.list", brokers);
     conf.set("group.id", group);
-    conf.set("client.id", VERSION_FULL);
+    if (num_consumers > 1)
+    {
+        std::stringstream ss;
+        ss << client_id << "-" << consumer_number;
+        conf.set("client.id", ss.str());
+    }
+    else
+    {
+        conf.set("client.id", client_id);
+    }
     conf.set("auto.offset.reset", "smallest");     // If no offset stored for this group, read all messages from the start
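Taken together, the hunks above give every consumer a stable, human-readable client.id. A minimal sketch of the resulting naming scheme, in Python with hypothetical host/database/table values (default_client_id and consumer_client_id are illustrative helpers, not names from the patch):

    # Sketch of the client.id naming scheme implemented above (hypothetical values).
    def default_client_id(version_name, fqdn, database, table):
        # Mirrors StorageKafka::getDefaultClientId: VERSION_NAME-FQDN-database-table
        return '-'.join([version_name, fqdn, database, table])

    def consumer_client_id(client_id, consumer_number, num_consumers):
        # Mirrors createReadBuffer: a numeric suffix is appended only when
        # the table runs more than one consumer.
        if num_consumers > 1:
            return '%s-%d' % (client_id, consumer_number)
        return client_id

    assert default_client_id('ClickHouse', 'host1', 'test', 'kafka') == 'ClickHouse-host1-test-kafka'
    assert consumer_client_id('ClickHouse-host1-test-kafka', 1, 2) == 'ClickHouse-host1-test-kafka-1'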
@@ -503,6 +522,7 @@ void registerStorageKafka(StorageFactory & factory)
      * - Kafka broker list
      * - List of topics
      * - Group ID (may be a constraint expression with a string result)
+     * - Client ID
      * - Message format (string)
      * - Row delimiter
      * - Schema (optional, if the format supports it)
@@ -709,9 +729,12 @@ void registerStorageKafka(StorageFactory & factory)
         }
     }

+    // Get client id
+    String client_id = kafka_settings.kafka_client_id.value;
+
     return StorageKafka::create(
         args.table_id, args.context, args.columns,
-        brokers, group, topics, format, row_delimiter, schema, num_consumers, max_block_size, skip_broken, intermediate_commit);
+        brokers, group, client_id, topics, format, row_delimiter, schema, num_consumers, max_block_size, skip_broken, intermediate_commit);
     };

     factory.registerStorage("Kafka", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h
index c813ed0033d..1ea7d6dcad7 100644
--- a/src/Storages/Kafka/StorageKafka.h
+++ b/src/Storages/Kafka/StorageKafka.h
@@ -67,6 +67,7 @@ protected:
         const ColumnsDescription & columns_,
         const String & brokers_,
         const String & group_,
+        const String & client_id_,
         const Names & topics_,
         const String & format_name_,
         char row_delimiter_,
@@ -83,6 +84,7 @@ private:
     Names topics;
     const String brokers;
     const String group;
+    const String client_id;
     const String format_name;
     char row_delimiter; /// optional row delimiter for generating char delimited stream in order to make various input stream parsers happy.
     const String schema_name;
@@ -108,12 +110,13 @@ private:
     BackgroundSchedulePool::TaskHolder task;
     std::atomic<bool> stream_cancelled{false};

-    ConsumerBufferPtr createReadBuffer();
+    ConsumerBufferPtr createReadBuffer(const size_t consumer_number);

     // Update Kafka configuration with values from CH user configuration.
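Note that a non-empty kafka_client_id is passed through getMacros()->expand(), so placeholders such as {instance} are substituted before the value reaches librdkafka. A hedged usage sketch in the style of the integration tests below, assuming the test module's instance fixture; the topic and group names here are hypothetical:

    # Hypothetical DDL showing the new setting; '{instance}' is expanded as a macro.
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value UInt64)
            ENGINE = Kafka
            SETTINGS kafka_broker_list = 'kafka1:19092',
                     kafka_topic_list = 'some_topic',
                     kafka_group_name = 'some_group',
                     kafka_client_id = '{instance} my-client',
                     kafka_format = 'JSONEachRow';
        ''')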
     void updateConfiguration(cppkafka::Configuration & conf);

     void threadFunc();
+    static String getDefaultClientId(const StorageID & table_id_);
     bool streamToViews();
     bool checkDependencies(const StorageID & table_id);
 };
diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py
index d09cc5659a1..6684266ccab 100644
--- a/tests/integration/test_storage_kafka/test.py
+++ b/tests/integration/test_storage_kafka/test.py
@@ -12,8 +12,11 @@ from helpers.network import PartitionManager
 import json
 import subprocess
 import kafka.errors
-from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer
+from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer, BrokerConnection
 from kafka.admin import NewTopic
+from kafka.protocol.admin import DescribeGroupsResponse_v1, DescribeGroupsRequest_v1
+from kafka.protocol.group import MemberAssignment
+import socket
 from google.protobuf.internal.encoder import _VarintBytes

 """
@@ -110,6 +113,32 @@ def kafka_check_result(result, check=False, ref_file='test_kafka_json.reference
     else:
         return TSV(result) == TSV(reference)

+# https://stackoverflow.com/a/57692111/1555175
+def describe_consumer_group(name):
+    client = BrokerConnection('localhost', 9092, socket.AF_INET)
+    client.connect_blocking()
+
+    list_members_in_groups = DescribeGroupsRequest_v1(groups=[name])
+    future = client.send(list_members_in_groups)
+    while not future.is_done:
+        for resp, f in client.recv():
+            f.success(resp)
+
+    (error_code, group_id, state, protocol_type, protocol, members) = future.value.groups[0]
+
+    res = []
+    for member in members:
+        (member_id, client_id, client_host, member_metadata, member_assignment) = member
+        member_info = {}
+        member_info['member_id'] = member_id
+        member_info['client_id'] = client_id
+        member_info['client_host'] = client_host
+        member_topics_assignment = []
+        for (topic, partitions) in MemberAssignment.decode(member_assignment).assignment:
+            member_topics_assignment.append({'topic': topic, 'partitions': partitions})
+        member_info['assignment'] = member_topics_assignment
+        res.append(member_info)
+    return res

 # Fixtures
@@ -161,6 +190,9 @@ def test_kafka_settings_old_syntax(kafka_cluster):

     kafka_check_result(result, True)

+    members = describe_consumer_group('old')
+    assert members[0]['client_id'] == u'ClickHouse-instance-test-kafka'
+    # text_desc = kafka_cluster.exec_in_container(kafka_cluster.get_container_id('kafka1'), "kafka-consumer-groups --bootstrap-server localhost:9092 --describe --members --group old --verbose")

 @pytest.mark.timeout(180)
 def test_kafka_settings_new_syntax(kafka_cluster):
@@ -172,6 +204,7 @@ def test_kafka_settings_new_syntax(kafka_cluster):
                      kafka_group_name = 'new',
                      kafka_format = 'JSONEachRow',
                      kafka_row_delimiter = '\\n',
+                     kafka_client_id = '{instance} test 1234',
                      kafka_skip_broken_messages = 1;
         ''')
@@ -197,6 +230,8 @@ def test_kafka_settings_new_syntax(kafka_cluster):

     kafka_check_result(result, True)

+    members = describe_consumer_group('new')
+    assert members[0]['client_id'] == u'instance test 1234'

 @pytest.mark.timeout(180)
 def test_kafka_consumer_hang(kafka_cluster):
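For reference, describe_consumer_group() returns one dict per group member; the asserts in these tests only inspect client_id. An illustrative return value, where every value is hypothetical and only the dict keys match the helper above:

    # Hypothetical output of describe_consumer_group('virt2') with two consumers.
    example_members = [{
        'member_id': 'ClickHouse-instance-test-kafka-0-7f1d2c3a',
        'client_id': 'ClickHouse-instance-test-kafka-0',
        'client_host': '/172.16.1.5',
        'assignment': [{'topic': 'virt2_0', 'partitions': [0, 1, 2]}],
    }]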
@@ -837,6 +872,7 @@ def test_kafka_virtual_columns2(kafka_cluster):
         SETTINGS kafka_broker_list = 'kafka1:19092',
                  kafka_topic_list = 'virt2_0,virt2_1',
                  kafka_group_name = 'virt2',
+                 kafka_num_consumers = 2,
                  kafka_format = 'JSONEachRow';

         CREATE MATERIALIZED VIEW test.view Engine=Log AS
@@ -866,6 +902,11 @@ def test_kafka_virtual_columns2(kafka_cluster):

     time.sleep(10)

+    members = describe_consumer_group('virt2')
+    # pprint.pprint(members)
+    assert members[0]['client_id'] == u'ClickHouse-instance-test-kafka-0'
+    assert members[1]['client_id'] == u'ClickHouse-instance-test-kafka-1'
+
     result = instance.query("SELECT * FROM test.view ORDER BY value", ignore_error=True)

     expected = '''\