diff --git a/src/Storages/Kafka/KafkaConsumer.cpp b/src/Storages/Kafka/KafkaConsumer.cpp index 076b2c50559..31d431e27fe 100644 --- a/src/Storages/Kafka/KafkaConsumer.cpp +++ b/src/Storages/Kafka/KafkaConsumer.cpp @@ -549,26 +549,15 @@ void KafkaConsumer::setExceptionInfo(const cppkafka::Error & err, bool with_stac void KafkaConsumer::setExceptionInfo(const std::string & text, bool with_stacktrace) { - std::string exceptionWithTrace; + std::string enriched_text = text; if (with_stacktrace) { - try - { - throw Exception(); - } - catch (const std::exception & ex) - { - exceptionWithTrace = text + getExceptionStackTraceString(ex); - } - } - else - { - exceptionWithTrace = text; + enriched_text.append(StackTrace().toString()); } std::lock_guard lock(exception_mutex); - exceptions_buffer.push_back({exceptionWithTrace, static_cast(Poco::Timestamp().epochTime())}); + exceptions_buffer.push_back({enriched_text, static_cast(Poco::Timestamp().epochTime())}); } /* diff --git a/tests/integration/test_kafka_bad_messages/test.py b/tests/integration/test_kafka_bad_messages/test.py index 777d976933d..1633f230f83 100644 --- a/tests/integration/test_kafka_bad_messages/test.py +++ b/tests/integration/test_kafka_bad_messages/test.py @@ -322,22 +322,17 @@ def test_bad_messages_parsing_exception(kafka_cluster, max_retries=20): expected_result = """avro::Exception: Invalid data file. Magic does not match: : while parsing Kafka message (topic: Avro_err, partition: 0, offset: 0)\\'|1|1|1|default|kafka_Avro Cannot parse input: expected \\'{\\' before: \\'qwertyuiop\\': while parsing Kafka message (topic: JSONEachRow_err, partition: 0, offset: 0|1|1|1|default|kafka_JSONEachRow """ - retries = 0 - result_system_kafka_consumers = "" - while True: - # filter out stacktrace in exceptions.text[1] because it is hardly stable enough - result_system_kafka_consumers = instance.query( - """ - SELECT substr(exceptions.text[1], 1, 131), length(exceptions.text) > 1 AND length(exceptions.text) < 15, length(exceptions.time) > 1 AND length(exceptions.time) < 15, abs(dateDiff('second', exceptions.time[1], now())) < 40, database, table FROM system.kafka_consumers WHERE table in('kafka_Avro', 'kafka_JSONEachRow') ORDER BY table, assignments.partition_id[1] - """ - ) - result_system_kafka_consumers = result_system_kafka_consumers.replace("\t", "|") - if result_system_kafka_consumers == expected_result or retries > max_retries: - break - retries += 1 - time.sleep(1) + # filter out stacktrace in exceptions.text[1] because it is hardly stable enough + result_system_kafka_consumers = instance.query_with_retry( + """ + SELECT substr(exceptions.text[1], 1, 131), length(exceptions.text) > 1 AND length(exceptions.text) < 15, length(exceptions.time) > 1 AND length(exceptions.time) < 15, abs(dateDiff('second', exceptions.time[1], now())) < 40, database, table FROM system.kafka_consumers WHERE table in('kafka_Avro', 'kafka_JSONEachRow') ORDER BY table, assignments.partition_id[1] + """, + retry_count=max_retries, + sleep_time=1, + check_callback=lambda res: res.replace("\t", "|") == expected_result, + ) - assert result_system_kafka_consumers == expected_result + assert result_system_kafka_consumers.replace("\t", "|") == expected_result for format_name in [ "Avro", @@ -380,21 +375,16 @@ def test_bad_messages_to_mv(kafka_cluster, max_retries=20): expected_result = """Code: 6. DB::Exception: Cannot parse string \\'aaa\\' as UInt64: syntax error at begin of string. Note: there are toUInt64OrZero and to|1|1|1|default|kafka1 """ - retries = 0 - result_system_kafka_consumers = "" - while True: - result_system_kafka_consumers = instance.query( - """ - SELECT substr(exceptions.text[1], 1, 131), length(exceptions.text) > 1 AND length(exceptions.text) < 15, length(exceptions.time) > 1 AND length(exceptions.time) < 15, abs(dateDiff('second', exceptions.time[1], now())) < 40, database, table FROM system.kafka_consumers WHERE table='kafka1' ORDER BY table, assignments.partition_id[1] - """ - ) - result_system_kafka_consumers = result_system_kafka_consumers.replace("\t", "|") - if result_system_kafka_consumers == expected_result or retries > max_retries: - break - retries += 1 - time.sleep(1) + result_system_kafka_consumers = instance.query_with_retry( + """ + SELECT substr(exceptions.text[1], 1, 131), length(exceptions.text) > 1 AND length(exceptions.text) < 15, length(exceptions.time) > 1 AND length(exceptions.time) < 15, abs(dateDiff('second', exceptions.time[1], now())) < 40, database, table FROM system.kafka_consumers WHERE table='kafka1' ORDER BY table, assignments.partition_id[1] + """, + retry_count=max_retries, + sleep_time=1, + check_callback=lambda res: res.replace("\t", "|") == expected_result, + ) - assert result_system_kafka_consumers == expected_result + assert result_system_kafka_consumers.replace("\t", "|") == expected_result kafka_delete_topic(admin_client, "tomv")