system_kafka_consumers: exceptions seem to work

This commit is contained in:
Ilya Golshtein 2023-08-01 12:01:13 +00:00
parent e8b10f0126
commit 7dab2dc041
6 changed files with 193 additions and 22 deletions

View File

@@ -61,6 +61,7 @@ KafkaConsumer::KafkaConsumer(
     , stopped(stopped_)
     , current(messages.begin())
     , topics(_topics)
+    , exceptions_buffer(EXCEPTIONS_DEPTH)
 {
     // called (synchronously, during poll) when we enter the consumer group
     consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
@@ -128,6 +129,7 @@ KafkaConsumer::KafkaConsumer(
     {
         LOG_ERROR(log, "Rebalance error: {}", err);
         ProfileEvents::increment(ProfileEvents::KafkaRebalanceErrors);
+        setExceptionInfo(err);
     });
 }
@@ -181,6 +183,7 @@ void KafkaConsumer::drain()
        else
        {
            LOG_ERROR(log, "Error during draining: {}", error);
+           setExceptionInfo(error);
        }
    }
@@ -265,7 +268,10 @@ void KafkaConsumer::commit()
            if (e.get_error() == RD_KAFKA_RESP_ERR__NO_OFFSET)
                committed = true;
            else
+           {
                LOG_ERROR(log, "Exception during commit attempt: {}", e.what());
+               setExceptionInfo(e.what());
+           }
        }
        --max_retries;
    }
@@ -503,6 +509,7 @@ size_t KafkaConsumer::filterMessageErrors()
        {
            ProfileEvents::increment(ProfileEvents::KafkaConsumerErrors);
            LOG_ERROR(log, "Consumer error: {}", error);
+           setExceptionInfo(error);
            return true;
        }
        return false;
@@ -535,12 +542,15 @@ void KafkaConsumer::storeLastReadMessageOffset()
    }
 }
 
+void KafkaConsumer::setExceptionInfo(const cppkafka::Error & err)
+{
+    setExceptionInfo(err.to_string());
+}
+
 void KafkaConsumer::setExceptionInfo(const String & text)
 {
-    last_exception_timestamp_usec = static_cast<Int64>(Poco::Timestamp().epochTime());
     std::lock_guard<std::mutex> lock(exception_mutex);
-    last_exception_text = text;
+    exceptions_buffer.push_back({text, static_cast<UInt64>(Poco::Timestamp().epochTime())});
 }
 
 /*
@@ -576,9 +586,6 @@ KafkaConsumer::Stat KafkaConsumer::getStat()
    return {
        .consumer_id = getMemberId() /* consumer->get_member_id() */ ,
        .assignments = std::move(assignments),
-       .last_exception = [&](){std::lock_guard<std::mutex> lock(exception_mutex);
-                               return last_exception_text;}(),
-       .last_exception_time = last_exception_timestamp_usec.load(),
        .last_poll_time = last_poll_timestamp_usec.load(),
        .num_messages_read = num_messages_read.load(),
@@ -587,6 +594,8 @@ KafkaConsumer::Stat KafkaConsumer::getStat()
        .num_commits = num_commits.load(),
        .num_rebalance_assignments = num_rebalance_assignments.load(),
        .num_rebalance_revocations = num_rebalance_revocations.load(),
+       .exceptions_buffer = [&](){std::lock_guard<std::mutex> lock(exception_mutex);
+                                  return exceptions_buffer;}(),
        .in_use = in_use.load()
    };
 }
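Editor's note: the pattern introduced above is a fixed-depth, mutex-guarded ring of recent exceptions. A minimal, self-contained sketch of the same idea follows (plain C++ plus boost::circular_buffer; the class and function names here are illustrative, not ClickHouse's):

#include <boost/circular_buffer.hpp>
#include <ctime>
#include <iostream>
#include <mutex>
#include <string>

// Illustrative stand-in for KafkaConsumer::ExceptionInfo.
struct ExceptionInfo
{
    std::string text;
    uint64_t timestamp_usec;
};

class ExceptionLog
{
public:
    explicit ExceptionLog(size_t depth) : buffer(depth) {}

    // Mirrors setExceptionInfo(): once the buffer is full, push_back drops the oldest entry.
    void add(const std::string & text)
    {
        std::lock_guard<std::mutex> lock(mutex);
        buffer.push_back({text, static_cast<uint64_t>(std::time(nullptr))});
    }

    // Mirrors the lambda in getStat(): copy the buffer while holding the lock.
    boost::circular_buffer<ExceptionInfo> snapshot() const
    {
        std::lock_guard<std::mutex> lock(mutex);
        return buffer;
    }

private:
    mutable std::mutex mutex;
    boost::circular_buffer<ExceptionInfo> buffer;
};

int main()
{
    ExceptionLog log(10); // EXCEPTIONS_DEPTH is 10 in this commit
    for (int i = 0; i < 15; ++i)
        log.add("error #" + std::to_string(i));

    // Only the last 10 entries survive: error #5 .. error #14.
    for (const auto & e : log.snapshot())
        std::cout << e.timestamp_usec << " " << e.text << "\n";
}

The copy taken under the lock in snapshot() corresponds to the lambda used to fill Stat::exceptions_buffer in getStat() above, so readers of system.kafka_consumers never touch the live buffer.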

View File

@@ -1,5 +1,7 @@
 #pragma once
 
+#include <boost/circular_buffer.hpp>
+
 #include <Core/Names.h>
 #include <base/types.h>
 #include <IO/ReadBuffer.h>
@@ -27,6 +29,13 @@ using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
 class KafkaConsumer
 {
 public:
+    struct ExceptionInfo
+    {
+        String text;
+        UInt64 timestamp_usec;
+    };
+    using ExceptionsBuffer = boost::circular_buffer<ExceptionInfo>;
+
     struct Stat // system.kafka_consumers data
     {
         struct Assignment
@@ -39,8 +48,8 @@
         String consumer_id;
         Assignments assignments;
-        String last_exception;
-        UInt64 last_exception_time;
+        // String last_exception;
+        // UInt64 last_exception_time;
         UInt64 last_poll_time;
         UInt64 num_messages_read;
         UInt64 last_commit_timestamp_usec;
@@ -48,6 +57,7 @@
         UInt64 num_commits;
         UInt64 num_rebalance_assignments;
         UInt64 num_rebalance_revocations;
+        KafkaConsumer::ExceptionsBuffer exceptions_buffer;
         bool in_use;
     };
@@ -96,6 +106,7 @@
    auto currentTimestamp() const { return current[-1].get_timestamp(); }
    const auto & currentHeaderList() const { return current[-1].get_header_list(); }
    String currentPayload() const { return current[-1].get_payload(); }
+   void setExceptionInfo(const cppkafka::Error & err);
    void setExceptionInfo(const String & text);
    void inUse() { in_use = true; }
    void notInUse() { in_use = false; }
@@ -103,6 +114,7 @@
    // For system.kafka_consumers
    Stat getStat();
+
 private:
 
    using Messages = std::vector<cppkafka::Message>;
    CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaConsumers};
@@ -140,7 +152,10 @@
    /// system.kafka_consumers data is retrieved asynchronously,
    mutable std::mutex exception_mutex;
-   String last_exception_text;
+   const size_t EXCEPTIONS_DEPTH = 10;
+   ExceptionsBuffer exceptions_buffer;
 
    std::atomic<UInt64> last_exception_timestamp_usec = 0;
    std::atomic<UInt64> last_poll_timestamp_usec = 0;

View File

@@ -133,6 +133,7 @@ Chunk KafkaSource::generateImpl()
                {
                    e.addMessage("while parsing Kafka message (topic: {}, partition: {}, offset: {})'",
                                 consumer->currentTopic(), consumer->currentPartition(), consumer->currentOffset());
+                   consumer->setExceptionInfo(e.message());
                    throw std::move(e);
                }
            };

View File

@@ -31,8 +31,8 @@ NamesAndTypesList StorageSystemKafkaConsumers::getNamesAndTypes()
        {"assignments.topic", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
        {"assignments.partition_id", std::make_shared<DataTypeArray>(std::make_shared<DataTypeInt32>())},
        {"assignments.current_offset", std::make_shared<DataTypeArray>(std::make_shared<DataTypeInt64>())},
-       {"last_exception_time", std::make_shared<DataTypeDateTime>()},
-       {"last_exception", std::make_shared<DataTypeString>()},
+       {"exceptions.time", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
+       {"exceptions.text", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
        {"last_poll_time", std::make_shared<DataTypeDateTime>()},
        {"num_messages_read", std::make_shared<DataTypeUInt64>()},
        {"last_commit_time", std::make_shared<DataTypeDateTime>()},
@@ -66,8 +66,10 @@ void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, Context
    auto & assigments_current_offset = assert_cast<ColumnInt64 &>(assert_cast<ColumnArray &>(*res_columns[index]).getData());
    auto & assigments_current_offset_offsets = assert_cast<ColumnArray &>(*res_columns[index++]).getOffsets();
 
-   auto & last_exception_time = assert_cast<ColumnDateTime &>(*res_columns[index++]);
-   auto & last_exception = assert_cast<ColumnString &>(*res_columns[index++]);
+   auto & exceptions_time = assert_cast<ColumnDateTime &>(assert_cast<ColumnArray &>(*res_columns[index]).getData());
+   auto & exceptions_time_offset = assert_cast<ColumnArray &>(*res_columns[index++]).getOffsets();
+   auto & exceptions_text = assert_cast<ColumnString &>(assert_cast<ColumnArray &>(*res_columns[index]).getData());
+   auto & exceptions_text_offset = assert_cast<ColumnArray &>(*res_columns[index++]).getOffsets();
 
    auto & last_poll_time = assert_cast<ColumnDateTime &>(*res_columns[index++]);
    auto & num_messages_read = assert_cast<ColumnUInt64 &>(*res_columns[index++]);
    auto & last_commit_time = assert_cast<ColumnDateTime &>(*res_columns[index++]);
@@ -80,6 +82,7 @@ void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, Context
    const auto access = context->getAccess();
    size_t last_assignment_num = 0;
+   size_t exceptions_num = 0;
 
    auto add_row = [&](const DatabaseTablesIteratorPtr & it, StorageKafka * storage_kafka_ptr)
    {
@@ -121,8 +124,19 @@ void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, Context
            assigments_partition_id_offsets.push_back(last_assignment_num);
            assigments_current_offset_offsets.push_back(last_assignment_num);
 
-           last_exception.insertData(consumer_stat.last_exception.data(), consumer_stat.last_exception.size());
-           last_exception_time.insert(consumer_stat.last_exception_time);
+           // last_exception.insertData(consumer_stat.last_exception.data(), consumer_stat.last_exception.size());
+           // last_exception_time.insert(consumer_stat.last_exception_time);
+
+           for (auto excit = consumer_stat.exceptions_buffer.begin();
+                excit != consumer_stat.exceptions_buffer.end();
+                ++excit)
+           {
+               exceptions_text.insertData(excit->text.data(), excit->text.size());
+               exceptions_time.insert(excit->timestamp_usec);
+           }
+           exceptions_num += consumer_stat.exceptions_buffer.size();
+           exceptions_text_offset.push_back(exceptions_num);
+           exceptions_time_offset.push_back(exceptions_num);
 
            last_poll_time.insert(consumer_stat.last_poll_time);
            num_messages_read.insert(consumer_stat.num_messages_read);
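Editor's note: the fillData() changes above follow ClickHouse's usual Array-column convention, as I understand it: nested values are appended to a flat data column, and a cumulative end offset (exceptions_num) is pushed once per table row. A simplified sketch of that layout, using plain std::vector instead of ColumnArray (all names here are illustrative):

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Simplified stand-ins for Array column internals: one flat data vector
// plus cumulative end-offsets, one offset per table row.
std::vector<std::string> exceptions_text_data;
std::vector<uint64_t> exceptions_text_offsets;

// Mirrors the add_row() logic: append this consumer's exceptions to the flat
// data, then record the running total as this row's offset.
void add_row(const std::vector<std::string> & consumer_exceptions)
{
    for (const auto & text : consumer_exceptions)
        exceptions_text_data.push_back(text);
    exceptions_text_offsets.push_back(exceptions_text_data.size());
}

// Recover row N's array as the half-open range [offsets[N-1], offsets[N]).
std::vector<std::string> row_array(size_t row)
{
    size_t begin = row == 0 ? 0 : exceptions_text_offsets[row - 1];
    size_t end = exceptions_text_offsets[row];
    return {exceptions_text_data.begin() + begin, exceptions_text_data.begin() + end};
}

int main()
{
    add_row({"rebalance error", "commit failed"}); // consumer 0: two exceptions
    add_row({});                                   // consumer 1: none
    add_row({"parse error"});                      // consumer 2: one exception

    for (size_t row = 0; row < exceptions_text_offsets.size(); ++row)
        std::cout << "row " << row << ": " << row_array(row).size() << " exception(s)\n";
}

This is also why exceptions_text_offset and exceptions_time_offset in the diff both receive the same running total for every consumer row: the two arrays stay aligned element for element.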

View File

@@ -90,7 +90,7 @@ def producer_serializer(x):
     return x.encode() if isinstance(x, str) else x
 
 
-def kafka_produce(kafka_cluster, topic, messages, timestamp=None, retries=15):
+def kafka_produce(kafka_cluster, topic, messages, timestamp=None, retries=15, partition=None):
     logging.debug(
         "kafka_produce server:{}:{} topic:{}".format(
             "localhost", kafka_cluster.kafka_port, topic
@@ -100,7 +100,7 @@ def kafka_produce(kafka_cluster, topic, messages, timestamp=None, retries=15):
         kafka_cluster.kafka_port, producer_serializer, retries
     )
     for message in messages:
-        producer.send(topic=topic, value=message, timestamp_ms=timestamp)
+        producer.send(topic=topic, value=message, timestamp_ms=timestamp, partition=partition)
     producer.flush()
@@ -115,7 +115,7 @@ def kafka_cluster():
         cluster.shutdown()
 
 
-def test_bad_messages_parsing(kafka_cluster):
+def test_bad_messages_parsing_stream(kafka_cluster):
     admin_client = KafkaAdminClient(
         bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
     )
@@ -227,6 +227,13 @@ message Message {
         assert rows == len(messages)
 
+        result_system_kafka_consumers = instance.query(
+            """
+            SELECT last_exception, last_exception_time, database, table FROM system.kafka_consumers
+            """
+        )
+        logging.debug(f"result_system_kafka_consumers (test_bad_messages_parsing 2): {result_system_kafka_consumers}")
+
         kafka_delete_topic(admin_client, f"{format_name}_err")
 
     capn_proto_schema = """
@@ -244,7 +251,7 @@ struct Message
         f"""
         DROP TABLE IF EXISTS view;
         DROP TABLE IF EXISTS kafka;
 
         CREATE TABLE kafka (key UInt64, value UInt64)
             ENGINE = Kafka
             SETTINGS kafka_broker_list = 'kafka1:19092',
@@ -253,9 +260,9 @@
                      kafka_format = 'CapnProto',
                      kafka_handle_error_mode='stream',
                      kafka_schema='schema_test_errors:Message';
 
         CREATE MATERIALIZED VIEW view Engine=Log AS
             SELECT _error FROM kafka WHERE length(_error) != 0 ;
         """
     )
@@ -278,6 +285,113 @@
     kafka_delete_topic(admin_client, "CapnProto_err")
 
 
+def test_bad_messages_parsing_exception(kafka_cluster, max_retries=20):
+    admin_client = KafkaAdminClient(
+        bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
+    )
+
+    for format_name in [
+        "Avro",
+        "JSONEachRow",
+    ]:
+        print(format_name)
+
+        kafka_create_topic(admin_client, f"{format_name}_err", num_partitions=2)
+
+        instance.query(
+            f"""
+            DROP TABLE IF EXISTS view_{format_name};
+            DROP TABLE IF EXISTS kafka_{format_name};
+
+            CREATE TABLE kafka_{format_name} (key UInt64, value UInt64)
+                ENGINE = Kafka
+                SETTINGS kafka_broker_list = 'kafka1:19092',
+                         kafka_topic_list = '{format_name}_err',
+                         kafka_group_name = '{format_name}',
+                         kafka_format = '{format_name}',
+                         kafka_num_consumers = 2;
+
+            CREATE MATERIALIZED VIEW view_{format_name} Engine=Log AS
+                SELECT * FROM kafka_{format_name};
+            """
+        )
+
+        kafka_produce(kafka_cluster, f"{format_name}_err", ["qwertyuiop"], partition=0)
+        kafka_produce(kafka_cluster, f"{format_name}_err", ["asdfghjkl"], partition=1)
+        kafka_produce(kafka_cluster, f"{format_name}_err", ["zxcvbnm"], partition=0)
+
+    time.sleep(6)
+
+    result_system_kafka_consumers = instance.query(
+        # """
+        # SELECT exceptions.text[1], length(exceptions.text) > 3 AND length(exceptions.text) < 10, length(exceptions.time) > 3 AND length(exceptions.time) < 10, abs(dateDiff('second', exceptions.time[1], now())) < 30, database, table FROM system.kafka_consumers ORDER BY table, assignments.partition_id[1]
+        # """
+        """
+        SELECT length(exceptions.text), length(exceptions.time), exceptions.time[1], database, table FROM system.kafka_consumers ORDER BY table, assignments.partition_id[1]
+        """
+    )
+    logging.debug(f"result_system_kafka_consumers (test_bad_messages_parsing_exception 1): {result_system_kafka_consumers}")
+    logging.debug(f"result_system_kafka_consumers (test_bad_messages_parsing_exception 2): {result_system_kafka_consumers}")
+
+    expected_result = """1|1|1|default|kafka_Avro
+1|1|1|default|kafka_Avro
+1|1|1|default|kafka_JSONEachRow
+1|1|1|default|kafka_JSONEachRow
+"""
+    retries = 0
+    result_system_kafka_consumers = ""
+    while True:
+        result_system_kafka_consumers = instance.query(
+            # """
+            # SELECT exceptions.text[1], length(exceptions.text) > 3 AND length(exceptions.text) < 10, length(exceptions.time) > 3 AND length(exceptions.time) < 10, abs(dateDiff('second', exceptions.time[1], now())) < 30, database, table FROM system.kafka_consumers ORDER BY table, assignments.partition_id[1]
+            # """
+            # """
+            # SELECT CONCAT((length(exceptions.text) > 2 AND length(exceptions.text) < 15)::String, '_', (length(exceptions.time) > 2 AND length(exceptions.time) < 15)::String, '_', (abs(dateDiff('second', exceptions.time[1], now())) < 40)::String, '_', database, '_', table) FROM system.kafka_consumers ORDER BY table, assignments.partition_id[1] format PrettySpaceNoEscapesMonoBlock
+            # """
+            """
+            SELECT length(exceptions.text) > 1 AND length(exceptions.text) < 15, length(exceptions.time) > 1 AND length(exceptions.time) < 15, abs(dateDiff('second', exceptions.time[1], now())) < 40, database, table FROM system.kafka_consumers ORDER BY table, assignments.partition_id[1]
+            """
+        )
+        result_system_kafka_consumers = result_system_kafka_consumers.replace('\t', '|')
+
+        if result_system_kafka_consumers == expected_result or retries > max_retries:
+            break
+        retries += 1
+        time.sleep(1)
+
+    # assert result_system_kafka_consumers == """ avro::Exception: Invalid data file. Magic does not match: : while parsing Kafka message (topic: Avro_err, partition: 0, offset: 0)' 1 1 1 default kafka_Avro
+    # avro::Exception: Invalid data file. Magic does not match: : while parsing Kafka message (topic: Avro_err, partition: 1, offset: 0)' 1 1 1 default kafka_Avro
+    # Cannot parse input: expected '{' before: 'qwertyuiop': while parsing Kafka message (topic: JSONEachRow_err, partition: 0, offset: 0)' 1 1 1 default kafka_JSONEachRow
+    # Cannot parse input: expected '{' before: 'asdfghjkl': while parsing Kafka message (topic: JSONEachRow_err, partition: 1, offset: 0)' 1 1 1 default kafka_JSONEachRow"""
+    assert result_system_kafka_consumers == expected_result
+
+    # for format_name in [
+    #     "TSV",
+    #     "TSKV",
+    #     "JSONEachRow",
+    #     "CSV",
+    #     "Values",
+    #     "JSON",
+    #     "JSONCompactEachRow",
+    #     "JSONObjectEachRow",
+    #     "Avro",
+    #     "RowBinary",
+    #     "JSONColumns",
+    #     "JSONColumnsWithMetadata",
+    #     "Native",
+    #     "Arrow",
+    #     "ArrowStream",
+    #     "Parquet",
+    #     "ORC",
+    #     "JSONCompactColumns",
+    #     "BSONEachRow",
+    #     "MySQLDump",
+    # ]:
+    #     kafka_delete_topic(admin_client, f"{format_name}_err")
+
+
 if __name__ == "__main__":
     cluster.start()

View File

@@ -1929,6 +1929,15 @@ def test_kafka_flush_on_big_message(kafka_cluster):
         if int(result) == kafka_messages * batch_messages:
             break
 
+    result_system_kafka_consumers = instance.query(
+        """
+        SELECT last_exception, last_exception_time, database, table FROM system.kafka_consumers
+        """
+    )
+    logging.debug(f"result_system_kafka_consumers (test_kafka_flush_on_big_message): {result_system_kafka_consumers}")
+
+    assert (result_system_kafka_consumers == "fake string")
+
     instance.query(
         """
         DROP TABLE test.consumer;
@@ -3213,6 +3222,15 @@ def test_kafka_duplicates_when_commit_failed(kafka_cluster):
     result = instance.query("SELECT count(), uniqExact(key), max(key) FROM test.view")
     logging.debug(result)
 
+    result_system_kafka_consumers = instance.query(
+        """
+        SELECT last_exception, last_exception_time, database, table FROM system.kafka_consumers
+        """
+    )
+    logging.debug(f"result_system_kafka_consumers (test_kafka_duplicates_when_commit_failed): {result_system_kafka_consumers}")
+
     instance.query(
         """
         DROP TABLE test.consumer SYNC;
@@ -4692,7 +4710,7 @@ def test_system_kafka_consumers_rebalance(kafka_cluster, max_retries=15):
        FROM system.kafka_consumers WHERE database='test' and table IN ('kafka', 'kafka2') format Vertical;
        """
    )
-    logging.debug(f"result_system_kafka_consumers (1): {result_system_kafka_consumers}")
+    logging.debug(f"result_system_kafka_consumers: {result_system_kafka_consumers}")
    assert (
        result_system_kafka_consumers
        == """Row 1: