Merge pull request #11252 from filimonov/kafka-clientid

Kafka clientid (finishing #11073)
alexey-milovidov 2020-05-31 18:37:30 +03:00 committed by GitHub
commit 9a362eb02e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 75 additions and 7 deletions

View File: src/Storages/Kafka/KafkaSettings.h

@@ -19,6 +19,7 @@ struct KafkaSettings : public SettingsCollection<KafkaSettings>
M(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.", 0) \
M(SettingString, kafka_topic_list, "", "A list of Kafka topics.", 0) \
M(SettingString, kafka_group_name, "", "A group of Kafka consumers.", 0) \
M(SettingString, kafka_client_id, "", "A client id of the Kafka consumer.", 0) \
M(SettingString, kafka_format, "", "The message format for Kafka engine.", 0) \
M(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.", 0) \
M(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine", 0) \
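The new kafka_client_id setting lets a table override the client.id passed to librdkafka; when it is left empty, a default id is generated (see getDefaultClientId in StorageKafka.cpp below). A minimal usage sketch in the style of the integration tests further down; the table, topic, group and client id names here are illustrative and not taken from this change:

instance.query('''
    CREATE TABLE test.kafka_with_client_id (value UInt64)
        ENGINE = Kafka
        SETTINGS kafka_broker_list = 'kafka1:19092',
                 kafka_topic_list = 'some_topic',
                 kafka_group_name = 'some_group',
                 kafka_format = 'JSONEachRow',
                 kafka_client_id = 'my-custom-client-id';
''')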

View File: src/Storages/Kafka/StorageKafka.cpp

@@ -34,6 +34,7 @@
#include <Common/quoteString.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <librdkafka/rdkafka.h>
#include <common/getFQDNOrHostName.h>
namespace DB
@@ -118,6 +119,7 @@ StorageKafka::StorageKafka(
const ColumnsDescription & columns_,
const String & brokers_,
const String & group_,
const String & client_id_,
const Names & topics_,
const String & format_name_,
char row_delimiter_,
@@ -132,6 +134,7 @@ StorageKafka::StorageKafka(
, topics(global_context.getMacros()->expand(topics_))
, brokers(global_context.getMacros()->expand(brokers_))
, group(global_context.getMacros()->expand(group_))
, client_id(client_id_.empty() ? getDefaultClientId(table_id_) : global_context.getMacros()->expand(client_id_))
, format_name(global_context.getMacros()->expand(format_name_))
, row_delimiter(row_delimiter_)
, schema_name(global_context.getMacros()->expand(schema_name_))
@@ -149,6 +152,13 @@ StorageKafka::StorageKafka(
task->deactivate();
}
String StorageKafka::getDefaultClientId(const StorageID & table_id_)
{
std::stringstream ss;
ss << VERSION_NAME << "-" << getFQDNOrHostName() << "-" << table_id_.database_name << "-" << table_id_.table_name;
return ss.str();
}
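When kafka_client_id is empty, getDefaultClientId builds the id as <VERSION_NAME>-<FQDN or hostname>-<database>-<table>. A rough Python illustration of that concatenation, with placeholder values (the real ones come from VERSION_NAME, getFQDNOrHostName() and the table's StorageID):

version_name, fqdn, database, table = 'ClickHouse', 'instance', 'test', 'kafka'
default_client_id = '-'.join([version_name, fqdn, database, table])
# -> 'ClickHouse-instance-test-kafka', the value the old-syntax test below expects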
Pipes StorageKafka::read(
const Names & column_names,
@@ -194,7 +204,7 @@ void StorageKafka::startup()
{
try
{
pushReadBuffer(createReadBuffer());
pushReadBuffer(createReadBuffer(i));
++num_created_consumers;
}
catch (const cppkafka::Exception &)
@@ -262,7 +272,7 @@ ProducerBufferPtr StorageKafka::createWriteBuffer(const Block & header)
cppkafka::Configuration conf;
conf.set("metadata.broker.list", brokers);
conf.set("group.id", group);
conf.set("client.id", VERSION_FULL);
conf.set("client.id", client_id);
// TODO: fill required settings
updateConfiguration(conf);
@@ -275,13 +285,22 @@ ProducerBufferPtr StorageKafka::createWriteBuffer(const Block & header)
}
ConsumerBufferPtr StorageKafka::createReadBuffer()
ConsumerBufferPtr StorageKafka::createReadBuffer(const size_t consumer_number)
{
cppkafka::Configuration conf;
conf.set("metadata.broker.list", brokers);
conf.set("group.id", group);
conf.set("client.id", VERSION_FULL);
if (num_consumers > 1)
{
std::stringstream ss;
ss << client_id << "-" << consumer_number;
conf.set("client.id", ss.str());
}
else
{
conf.set("client.id", client_id);
}
conf.set("auto.offset.reset", "smallest"); // If no offset stored for this group, read all messages from the start
@@ -503,6 +522,7 @@ void registerStorageKafka(StorageFactory & factory)
* - Kafka broker list
* - List of topics
* - Group ID (may be a constant expression with a string result)
* - Client ID
* - Message format (string)
* - Row delimiter
* - Schema (optional, if the format supports it)
@@ -709,9 +729,12 @@ void registerStorageKafka(StorageFactory & factory)
}
}
// Get and check client id
String client_id = kafka_settings.kafka_client_id.value;
return StorageKafka::create(
args.table_id, args.context, args.columns,
brokers, group, topics, format, row_delimiter, schema, num_consumers, max_block_size, skip_broken, intermediate_commit);
brokers, group, client_id, topics, format, row_delimiter, schema, num_consumers, max_block_size, skip_broken, intermediate_commit);
};
factory.registerStorage("Kafka", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });

View File: src/Storages/Kafka/StorageKafka.h

@@ -67,6 +67,7 @@ protected:
const ColumnsDescription & columns_,
const String & brokers_,
const String & group_,
const String & client_id_,
const Names & topics_,
const String & format_name_,
char row_delimiter_,
@@ -83,6 +84,7 @@ private:
Names topics;
const String brokers;
const String group;
const String client_id;
const String format_name;
char row_delimiter; /// optional row delimiter for generating char delimited stream in order to make various input stream parsers happy.
const String schema_name;
@@ -108,12 +110,13 @@ private:
BackgroundSchedulePool::TaskHolder task;
std::atomic<bool> stream_cancelled{false};
ConsumerBufferPtr createReadBuffer();
ConsumerBufferPtr createReadBuffer(const size_t consumer_number);
// Update Kafka configuration with values from CH user configuration.
void updateConfiguration(cppkafka::Configuration & conf);
void threadFunc();
static String getDefaultClientId(const StorageID & table_id_);
bool streamToViews();
bool checkDependencies(const StorageID & table_id);
};

View File: tests/integration/test_storage_kafka/test.py

@@ -12,8 +12,11 @@ from helpers.network import PartitionManager
import json
import subprocess
import kafka.errors
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer, BrokerConnection
from kafka.admin import NewTopic
from kafka.protocol.admin import DescribeGroupsResponse_v1, DescribeGroupsRequest_v1
from kafka.protocol.group import MemberAssignment
import socket
from google.protobuf.internal.encoder import _VarintBytes
"""
@@ -110,6 +113,32 @@ def kafka_check_result(result, check=False, ref_file='test_kafka_json.reference
else:
return TSV(result) == TSV(reference)
# https://stackoverflow.com/a/57692111/1555175
def describe_consumer_group(name):
    client = BrokerConnection('localhost', 9092, socket.AF_INET)
    client.connect_blocking()
    list_members_in_groups = DescribeGroupsRequest_v1(groups=[name])
    future = client.send(list_members_in_groups)
    while not future.is_done:
        for resp, f in client.recv():
            f.success(resp)
    (error_code, group_id, state, protocol_type, protocol, members) = future.value.groups[0]
    res = []
    for member in members:
        (member_id, client_id, client_host, member_metadata, member_assignment) = member
        member_info = {}
        member_info['member_id'] = member_id
        member_info['client_id'] = client_id
        member_info['client_host'] = client_host
        member_topics_assignment = []
        for (topic, partitions) in MemberAssignment.decode(member_assignment).assignment:
            member_topics_assignment.append({'topic': topic, 'partitions': partitions})
        member_info['assignment'] = member_topics_assignment
        res.append(member_info)
    return res
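The helper opens a raw BrokerConnection to the broker, sends a DescribeGroups request, and returns one dict per group member (member_id, client_id, client_host, topic assignment). Typical usage, as in the assertions in the tests below (the print call is only illustrative):

members = describe_consumer_group('new')
print(members[0]['client_id'], members[0]['client_host'])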
# Fixtures
@@ -161,6 +190,9 @@ def test_kafka_settings_old_syntax(kafka_cluster):
kafka_check_result(result, True)
members = describe_consumer_group('old')
assert members[0]['client_id'] == u'ClickHouse-instance-test-kafka'
# text_desc = kafka_cluster.exec_in_container(kafka_cluster.get_container_id('kafka1'), "kafka-consumer-groups --bootstrap-server localhost:9092 --describe --members --group old --verbose")
@pytest.mark.timeout(180)
def test_kafka_settings_new_syntax(kafka_cluster):
@@ -172,6 +204,7 @@ def test_kafka_settings_new_syntax(kafka_cluster):
kafka_group_name = 'new',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\\n',
kafka_client_id = '{instance} test 1234',
kafka_skip_broken_messages = 1;
''')
@@ -197,6 +230,8 @@ def test_kafka_settings_new_syntax(kafka_cluster):
kafka_check_result(result, True)
members = describe_consumer_group('new')
assert members[0]['client_id'] == u'instance test 1234'
@pytest.mark.timeout(180)
def test_kafka_consumer_hang(kafka_cluster):
@@ -837,6 +872,7 @@ def test_kafka_virtual_columns2(kafka_cluster):
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'virt2_0,virt2_1',
kafka_group_name = 'virt2',
kafka_num_consumers = 2,
kafka_format = 'JSONEachRow';
CREATE MATERIALIZED VIEW test.view Engine=Log AS
@@ -866,6 +902,11 @@ def test_kafka_virtual_columns2(kafka_cluster):
time.sleep(10)
members = describe_consumer_group('virt2')
#pprint.pprint(members)
assert members[0]['client_id'] == u'ClickHouse-instance-test-kafka-0'
assert members[1]['client_id'] == u'ClickHouse-instance-test-kafka-1'
result = instance.query("SELECT * FROM test.view ORDER BY value", ignore_error=True)
expected = '''\