exceptions_kafka_consumers: code review suggestions are addressed

This commit is contained in:
Ilya Golshtein 2023-08-24 13:10:21 +00:00
parent 901240eede
commit f58b4a812d
2 changed files with 22 additions and 43 deletions

View File

@ -549,26 +549,15 @@ void KafkaConsumer::setExceptionInfo(const cppkafka::Error & err, bool with_stac
void KafkaConsumer::setExceptionInfo(const std::string & text, bool with_stacktrace)
{
std::string exceptionWithTrace;
std::string enriched_text = text;
if (with_stacktrace)
{
try
{
throw Exception();
}
catch (const std::exception & ex)
{
exceptionWithTrace = text + getExceptionStackTraceString(ex);
}
}
else
{
exceptionWithTrace = text;
enriched_text.append(StackTrace().toString());
}
std::lock_guard<std::mutex> lock(exception_mutex);
exceptions_buffer.push_back({exceptionWithTrace, static_cast<UInt64>(Poco::Timestamp().epochTime())});
exceptions_buffer.push_back({enriched_text, static_cast<UInt64>(Poco::Timestamp().epochTime())});
}
/*

View File

@ -322,22 +322,17 @@ def test_bad_messages_parsing_exception(kafka_cluster, max_retries=20):
expected_result = """avro::Exception: Invalid data file. Magic does not match: : while parsing Kafka message (topic: Avro_err, partition: 0, offset: 0)\\'|1|1|1|default|kafka_Avro
Cannot parse input: expected \\'{\\' before: \\'qwertyuiop\\': while parsing Kafka message (topic: JSONEachRow_err, partition: 0, offset: 0|1|1|1|default|kafka_JSONEachRow
"""
retries = 0
result_system_kafka_consumers = ""
while True:
# filter out stacktrace in exceptions.text[1] because it is hardly stable enough
result_system_kafka_consumers = instance.query(
"""
SELECT substr(exceptions.text[1], 1, 131), length(exceptions.text) > 1 AND length(exceptions.text) < 15, length(exceptions.time) > 1 AND length(exceptions.time) < 15, abs(dateDiff('second', exceptions.time[1], now())) < 40, database, table FROM system.kafka_consumers WHERE table in('kafka_Avro', 'kafka_JSONEachRow') ORDER BY table, assignments.partition_id[1]
"""
)
result_system_kafka_consumers = result_system_kafka_consumers.replace("\t", "|")
if result_system_kafka_consumers == expected_result or retries > max_retries:
break
retries += 1
time.sleep(1)
# filter out stacktrace in exceptions.text[1] because it is hardly stable enough
result_system_kafka_consumers = instance.query_with_retry(
"""
SELECT substr(exceptions.text[1], 1, 131), length(exceptions.text) > 1 AND length(exceptions.text) < 15, length(exceptions.time) > 1 AND length(exceptions.time) < 15, abs(dateDiff('second', exceptions.time[1], now())) < 40, database, table FROM system.kafka_consumers WHERE table in('kafka_Avro', 'kafka_JSONEachRow') ORDER BY table, assignments.partition_id[1]
""",
retry_count=max_retries,
sleep_time=1,
check_callback=lambda res: res.replace("\t", "|") == expected_result,
)
assert result_system_kafka_consumers == expected_result
assert result_system_kafka_consumers.replace("\t", "|") == expected_result
for format_name in [
"Avro",
@ -380,21 +375,16 @@ def test_bad_messages_to_mv(kafka_cluster, max_retries=20):
expected_result = """Code: 6. DB::Exception: Cannot parse string \\'aaa\\' as UInt64: syntax error at begin of string. Note: there are toUInt64OrZero and to|1|1|1|default|kafka1
"""
retries = 0
result_system_kafka_consumers = ""
while True:
result_system_kafka_consumers = instance.query(
"""
SELECT substr(exceptions.text[1], 1, 131), length(exceptions.text) > 1 AND length(exceptions.text) < 15, length(exceptions.time) > 1 AND length(exceptions.time) < 15, abs(dateDiff('second', exceptions.time[1], now())) < 40, database, table FROM system.kafka_consumers WHERE table='kafka1' ORDER BY table, assignments.partition_id[1]
"""
)
result_system_kafka_consumers = result_system_kafka_consumers.replace("\t", "|")
if result_system_kafka_consumers == expected_result or retries > max_retries:
break
retries += 1
time.sleep(1)
result_system_kafka_consumers = instance.query_with_retry(
"""
SELECT substr(exceptions.text[1], 1, 131), length(exceptions.text) > 1 AND length(exceptions.text) < 15, length(exceptions.time) > 1 AND length(exceptions.time) < 15, abs(dateDiff('second', exceptions.time[1], now())) < 40, database, table FROM system.kafka_consumers WHERE table='kafka1' ORDER BY table, assignments.partition_id[1]
""",
retry_count=max_retries,
sleep_time=1,
check_callback=lambda res: res.replace("\t", "|") == expected_result,
)
assert result_system_kafka_consumers == expected_result
assert result_system_kafka_consumers.replace("\t", "|") == expected_result
kafka_delete_topic(admin_client, "tomv")