system_kafka_consumers: cleanup, test should be more stable

Ilya Golshtein 2023-07-10 14:03:49 +00:00
parent 803a80b098
commit 46be5e5782
6 changed files with 32 additions and 22 deletions

View File

@ -173,7 +173,7 @@ Similar to GraphiteMergeTree, the Kafka engine supports extended configuration u
<!-- Global configuration options for all tables of Kafka engine type -->
<debug>cgrp</debug>
<auto_offset_reset>smallest</auto_offset_reset>
-<statistics_interval_ms>10</statistics_interval_ms>
+<statistics_interval_ms>600</statistics_interval_ms>
<!-- Configuration specific to topics "logs" and "stats" -->
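An aside on the settings above: ClickHouse derives the librdkafka property name from the XML tag by replacing underscores with dots (visible in the StorageKafka.cpp hunk below, where the `statistics_interval_ms` tag becomes `statistics.interval.ms`). A minimal sketch of that mapping:

```python
# Illustrative only: how a <kafka> XML tag name corresponds to a librdkafka
# configuration property, per the underscore-to-dot convention.
def xml_tag_to_rdkafka_property(tag: str) -> str:
    return tag.replace("_", ".")

assert xml_tag_to_rdkafka_property("statistics_interval_ms") == "statistics.interval.ms"
assert xml_tag_to_rdkafka_property("auto_offset_reset") == "auto.offset.reset"
```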

View File

@ -14,6 +14,7 @@ Columns:
- `assignments.topic` (Array(String)) - Kafka topic.
- `assignments.partition_id` (Array(Int32)) - Kafka partition id. Note that only one consumer can be assigned to a partition.
- `assignments.current_offset` (Array(Int64)) - current offset.
+- `assignments.offset_committed` (Array(Int64)) - committed offset.
- `last_exception_time` (DateTime) - timestamp when the most recent exception was generated.
- `last_exception` (String) - text of the most recent exception.
- `last_poll_time` (DateTime) - timestamp of the most recent poll.
@ -24,7 +25,7 @@ Columns:
- `num_rebalance_revocations` (UInt64) - number of times partitions were revoked from the consumer.
- `num_rebalance_assignments` (UInt64) - number of times the consumer was assigned partitions by the Kafka cluster.
- `is_currently_used` (UInt8) - whether the consumer is currently in use.
-- `rdkafka_stat` (String) - library internal statistic. See https://github.com/ClickHouse/librdkafka/blob/master/STATISTICS.md . Set `statistics_interval_ms` to disable.
+- `rdkafka_stat` (String) - library internal statistics. See https://github.com/ClickHouse/librdkafka/blob/master/STATISTICS.md . Set `statistics_interval_ms` to 0 to disable; the default is 3000 (once every three seconds).
Example:
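The example itself is truncated in this view. A sketch of the kind of query this page documents, written against the `instance.query` helper used in the integration test later in this commit (the helper and the `test.kafka` table are assumptions borrowed from that test):

```python
# Illustrative only: inspect the consumers of one Kafka table.
result = instance.query(
    """
    SELECT database, table, length(consumer_id),
           assignments.topic, assignments.partition_id, assignments.current_offset,
           num_messages_read, num_rebalance_revocations, num_rebalance_assignments,
           is_currently_used
    FROM system.kafka_consumers
    WHERE database = 'test' AND table = 'kafka'
    FORMAT Vertical
    """
)
print(result)
```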

View File

@ -27,7 +27,7 @@ using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
class KafkaConsumer
{
public:
-struct Stat // for system.kafka_consumers
+struct Stat // system.kafka_consumers data
{
struct Assignment
{
@ -37,10 +37,11 @@ public:
Int64 offset_committed;
};
using Assignments = std::vector<Assignment>;
-String consumer_id; // cpp_consumer->get_member_id();
+String consumer_id;
Assignments assignments;
String last_exception;
-Int64 last_exception_time;
+UInt64 last_exception_time;
UInt64 last_poll_time;
UInt64 num_messages_read;
UInt64 last_commit_timestamp_usec;
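For readers who only ever see this struct through the system table, a Python mirror of its shape may help (field names are taken from the header above and the documented columns; this is not ClickHouse code):

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class Assignment:            # one entry per assigned topic/partition
    topic: str
    partition_id: int
    current_offset: int
    offset_committed: int

@dataclass
class Stat:                  # mirrors KafkaConsumer::Stat, the source of system.kafka_consumers rows
    consumer_id: str
    last_exception: str = ""
    last_exception_time: int = 0        # UInt64 after this commit
    last_poll_time: int = 0
    num_messages_read: int = 0
    last_commit_timestamp_usec: int = 0
    assignments: List[Assignment] = field(default_factory=list)
```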
@ -100,6 +101,7 @@ public:
void inUse() { in_use = true; }
void notInUse() { in_use = false; }
// For system.kafka_consumers
Stat getStat();
private:
@ -137,11 +139,11 @@ private:
std::optional<cppkafka::TopicPartitionList> assignment;
const Names topics;
/// system.kafka_consumers data is retrieved asynchronously, so the exception info below is protected (a mutex for the text, atomics for the counters).
mutable std::mutex exception_mutex;
String last_exception_text;
-std::atomic<Int64> last_exception_timestamp_usec = 0;
+std::atomic<UInt64> last_exception_timestamp_usec = 0;
std::atomic<UInt64> last_poll_timestamp_usec = 0;
std::atomic<UInt64> num_messages_read = 0;
std::atomic<UInt64> last_commit_timestamp_usec = 0;
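The split above is the classic snapshot pattern: plain counters become atomics, while the exception text, which cannot be atomic, hides behind a mutex. A rough Python rendering of the same idea (illustrative only, not ClickHouse code):

```python
import threading

class ConsumerStatSnapshot:
    """Sketch of the locking scheme used by the C++ header above."""
    def __init__(self):
        self._exception_mutex = threading.Lock()  # guards the string
        self._last_exception_text = ""
        self.last_poll_timestamp_usec = 0         # an atomic counter in the C++ code

    def record_exception(self, text: str) -> None:
        with self._exception_mutex:
            self._last_exception_text = text

    def last_exception(self) -> str:
        with self._exception_mutex:
            return self._last_exception_text
```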

View File

@ -648,7 +648,7 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
if (!config.has(config_prefix + "." + "statistics_interval_ms"))
{
kafka_config.set("statistics.interval.ms", "600"); // every 600 milliseconds
kafka_config.set("statistics.interval.ms", "3000"); // every 3 seconds by default. set to 0 to disable.
}
if (kafka_config.get("statistics.interval.ms") != "0")
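One observable consequence of the new default: with no `statistics_interval_ms` in the server config, the `rdkafka_stat` blob should be refreshed roughly every three seconds. A test-style sketch (reusing the `instance` fixture from the integration test in this commit; the top-level `time` field name comes from librdkafka's STATISTICS.md):

```python
import time

# Illustrative only: the statistics timestamp should advance across two reads
# spaced further apart than the 3000 ms default interval.
query = (
    "SELECT JSONExtractUInt(rdkafka_stat, 'time') FROM system.kafka_consumers "
    "WHERE database = 'test' AND table = 'kafka'"
)
first = instance.query(query)
time.sleep(4)  # longer than the default statistics.interval.ms of 3000
second = instance.query(query)
assert first != second  # a fresh statistics blob has been emitted
```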

View File

@ -10,6 +10,10 @@
-->
<debug>cgrp,consumer,topic,protocol</debug>
+<!-- librdkafka stats in system.kafka_consumers -->
+<!-- the default of 3000 (every three seconds) is too long for the test -->
+<statistics_interval_ms>600</statistics_interval_ms>
<kafka_topic>
<name>consumer_hang</name>
<!-- default: 3000 -->

View File

@ -4582,7 +4582,7 @@ def test_system_kafka_consumers(kafka_cluster):
result_system_kafka_consumers = instance.query(
"""
create or replace function stable_timestamp as
-(d)->multiIf(d==toDateTime('1970-01-01 00:00:00'), 'never', abs(dateDiff('second', d, now())) < 20, 'now', toString(d));
+(d)->multiIf(d==toDateTime('1970-01-01 00:00:00'), 'never', abs(dateDiff('second', d, now())) < 30, 'now', toString(d));
SELECT database, table, length(consumer_id), assignments.topic, assignments.partition_id,
assignments.current_offset, stable_timestamp(last_exception_time) as last_exception_time_,
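The `stable_timestamp` UDF is what makes the expected output reproducible: the DateTime zero value prints as 'never', anything within the tolerance of now() prints as 'now', and only genuinely stale values leak through as literal timestamps. The same logic in Python terms (illustrative):

```python
from datetime import datetime

def stable_timestamp(d: datetime, now: datetime, tolerance_sec: int = 30) -> str:
    if d == datetime(1970, 1, 1):                       # DateTime zero value
        return "never"
    if abs((now - d).total_seconds()) < tolerance_sec:  # fresh enough for the test
        return "now"
    return str(d)                                       # stale: surface the real value
```

Widening the tolerance from 20 to 30 seconds is part of the stability fix the commit title refers to.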
@ -4620,7 +4620,7 @@ is_currently_used: 0
kafka_delete_topic(admin_client, topic)
-def test_system_kafka_consumers_rebalance(kafka_cluster):
+def test_system_kafka_consumers_rebalance(kafka_cluster, max_retries=15):
# based on test_kafka_consumer_hang2
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
@ -4682,7 +4682,7 @@ def test_system_kafka_consumers_rebalance(kafka_cluster):
result_system_kafka_consumers = instance.query(
"""
create or replace function stable_timestamp as
-(d)->multiIf(d==toDateTime('1970-01-01 00:00:00'), 'never', abs(dateDiff('second', d, now())) < 20, 'now', toString(d));
+(d)->multiIf(d==toDateTime('1970-01-01 00:00:00'), 'never', abs(dateDiff('second', d, now())) < 30, 'now', toString(d));
SELECT database, table, length(consumer_id), assignments.topic, assignments.partition_id,
assignments.current_offset, stable_timestamp(last_exception_time) as last_exception_time_,
if(length(last_exception)>0, last_exception, 'no exception') as last_exception_,
@ -4735,23 +4735,26 @@ is_currently_used: 0
"""
)
-    result_rdkafka_stat = instance.query(
-        """
-        SELECT table, JSONExtractString(rdkafka_stat, 'type')
-        FROM system.kafka_consumers WHERE database='test' and table IN ('kafka', 'kafka2') format Vertical;
-        """
-    )
+    retries = 0
+    result_rdkafka_stat = ""
+    while True:
+        result_rdkafka_stat = instance.query(
+            """
+            SELECT table, JSONExtractString(rdkafka_stat, 'type')
+            FROM system.kafka_consumers WHERE database='test' and table = 'kafka' format Vertical;
+            """
+        )
+        # str.find() returns -1 (truthy) when absent, so test membership instead
+        if "consumer" in result_rdkafka_stat or retries > max_retries:
+            break
+        retries += 1
+        time.sleep(1)
    assert (
        result_rdkafka_stat
        == """Row 1:
table: kafka
JSONExtractString(rdkafka_stat, 'type'): consumer
-
-Row 2:
-table: kafka2
-JSONExtractString(rdkafka_stat, 'type'): consumer
"""
    )
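The retry loop above is a one-off version of a generic wait-until-ready pattern; a reusable sketch of the same idea (not part of this commit) could look like:

```python
import time
from typing import Callable

def wait_until(predicate: Callable[[], bool], max_retries: int = 15, delay_sec: float = 1.0) -> bool:
    """Poll `predicate` until it returns True or the retry budget runs out."""
    for _ in range(max_retries):
        if predicate():
            return True
        time.sleep(delay_sec)
    return predicate()  # final check after the last sleep

# Usage in the spirit of the test above (STAT_QUERY assumed to hold the SQL text):
# ok = wait_until(lambda: "consumer" in instance.query(STAT_QUERY))
```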