Kafka: catch and log exceptions in desctructors. Fixes #9494

This commit is contained in:
Mikhail Filimonov 2020-03-04 17:38:12 +01:00
parent 5f1d07a714
commit 1be66874f9
2 changed files with 50 additions and 4 deletions

View File

@ -78,9 +78,16 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
ReadBufferFromKafkaConsumer::~ReadBufferFromKafkaConsumer()
{
/// NOTE: see https://github.com/edenhill/librdkafka/issues/2077
consumer->unsubscribe();
consumer->unassign();
while (consumer->get_consumer_queue().next_event(100ms));
try {
if (!consumer->get_subscription().empty())
consumer->unsubscribe();
if (!assignment.empty()) {
consumer->unassign();
}
while (consumer->get_consumer_queue().next_event(100ms));
} catch (const cppkafka::HandleException & e) {
LOG_ERROR(log, "Exception from ReadBufferFromKafkaConsumer destructor: " << e.what());
}
}
void ReadBufferFromKafkaConsumer::commit()
@ -184,7 +191,14 @@ void ReadBufferFromKafkaConsumer::unsubscribe()
current = messages.begin();
BufferBase::set(nullptr, 0, 0);
consumer->unsubscribe();
// it should not raise exception as used in destructor
try {
if (!consumer->get_subscription().empty())
consumer->unsubscribe();
} catch (const cppkafka::HandleException &e) {
LOG_ERROR(log, "Exception from ReadBufferFromKafkaConsumer::unsubscribe: " << e.what());
}
}

View File

@ -1161,6 +1161,38 @@ def test_kafka_no_holes_when_write_suffix_failed(kafka_cluster):
assert TSV(result) == TSV('22\t22\t22')
@pytest.mark.timeout(120)
def test_exception_from_destructor(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'xyz',
kafka_group_name = '',
kafka_format = 'JSONEachRow';
''')
instance.query_and_get_error('''
SELECT * FROM test.kafka;
''')
instance.query('''
DROP TABLE test.kafka;
''')
instance.query('''
CREATE TABLE test.kafka (key UInt64, value String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'xyz',
kafka_group_name = '',
kafka_format = 'JSONEachRow';
''')
instance.query('''
DROP TABLE test.kafka;
''')
kafka_cluster.open_bash_shell('instance')
assert TSV(instance.query('SELECT 1')) == TSV('1')
if __name__ == '__main__':
cluster.start()