Merge pull request #27794 from filimonov/kafka_null_messages

Avoid nullptr dereference during processing of NULL messages in Kafka for some formats
commit 215385e4d0
alexey-milovidov, 2021-08-19 11:25:07 +03:00 (committed by GitHub)
3 changed files with 20 additions and 5 deletions

src/Storages/Kafka/KafkaBlockInputStream.cpp

@@ -252,7 +252,11 @@ Block KafkaBlockInputStream::readImpl()
         }
         else
         {
-            LOG_WARNING(log, "Parsing of message (topic: {}, partition: {}, offset: {}) return no rows.", buffer->currentTopic(), buffer->currentPartition(), buffer->currentOffset());
+            // We came here in case of tombstone (or sometimes zero-length) messages, and it is not something abnormal
+            // TODO: it seems like in case of put_error_to_stream=true we may need to process those differently
+            //       currently we just skip them with note in logs.
+            buffer->storeLastReadMessageOffset();
+            LOG_DEBUG(log, "Parsing of message (topic: {}, partition: {}, offset: {}) return no rows.", buffer->currentTopic(), buffer->currentPartition(), buffer->currentOffset());
         }
 
         if (!buffer->hasMorePolledMessages()
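
The reason for the new buffer->storeLastReadMessageOffset() call: a tombstone parses to zero rows, but its offset must still be recorded, otherwise the consumer would never commit past that message and would keep re-reading it. A minimal, self-contained sketch of that bookkeeping (hypothetical Record type and offset variables, not ClickHouse code):

#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

// One polled Kafka-like record: a tombstone carries a key but no value.
struct Record
{
    int64_t offset;
    std::optional<std::string> value;
};

int main()
{
    std::vector<Record> polled = {{0, "{\"t\":123}"}, {1, std::nullopt}, {2, "{\"t\":124}"}};

    int64_t last_read_offset = -1;  // conceptually what storeLastReadMessageOffset() tracks

    for (const auto & rec : polled)
    {
        if (!rec.value)  // tombstone: nothing to parse, but the offset still advances
            std::cout << "debug: offset " << rec.offset << " produced no rows\n";
        else
            std::cout << "parsed a row from offset " << rec.offset << '\n';

        last_read_offset = rec.offset;
    }

    // The consumer can now commit past the tombstone instead of re-reading it forever.
    std::cout << "commit up to offset " << last_read_offset + 1 << '\n';
}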

src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp

@@ -466,13 +466,19 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
     if (!allowed || !hasMorePolledMessages())
         return false;
 
-    // XXX: very fishy place with const casting.
-    auto * new_position = reinterpret_cast<char *>(const_cast<unsigned char *>(current->get_payload().get_data()));
-    BufferBase::set(new_position, current->get_payload().get_size(), 0);
-    allowed = false;
+    const auto * message_data = current->get_payload().get_data();
+    size_t message_size = current->get_payload().get_size();
+
+    allowed = false;
     ++current;
 
+    /// If message is empty, return end of stream.
+    if (message_data == nullptr)
+        return false;
+
+    /// const_cast is needed, because ReadBuffer works with non-const char *.
+    auto * new_position = reinterpret_cast<char *>(const_cast<unsigned char *>(message_data));
+    BufferBase::set(new_position, message_size, 0);
+
     return true;
 }
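
The nullptr dereference from the title came from this spot: for a tombstone the payload's data pointer is null (typically with zero size), and the old code passed it straight to BufferBase::set, after which some formats dereferenced it. A minimal, self-contained sketch of the guard pattern, using a hypothetical FakeMessage type and consumeOne function rather than the ClickHouse/cppkafka API:

#include <cstddef>
#include <iostream>

// Stand-in for a consumed record whose payload may be absent (tombstone / NULL value).
struct FakeMessage
{
    const unsigned char * data = nullptr;
    size_t size = 0;
};

/// Points `position`/`available` at the payload; returns false (end of stream) for a
/// tombstone instead of handing a null pointer to the buffer machinery.
bool consumeOne(const FakeMessage & msg, char *& position, size_t & available)
{
    const auto * message_data = msg.data;
    size_t message_size = msg.size;

    /// If the message is empty, report end of stream.
    if (message_data == nullptr)
        return false;

    /// const_cast mirrors the pattern above: the buffer interface wants a non-const char *.
    position = reinterpret_cast<char *>(const_cast<unsigned char *>(message_data));
    available = message_size;
    return true;
}

int main()
{
    const unsigned char payload[] = "{\"t\": 123}";
    char * pos = nullptr;
    size_t len = 0;

    std::cout << consumeOne({payload, sizeof(payload) - 1}, pos, len) << '\n';  // 1: normal message
    std::cout << consumeOne({}, pos, len) << '\n';                              // 0: tombstone, safely skipped
}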

tests/integration/test_storage_kafka/test.py

@@ -283,6 +283,11 @@ def test_kafka_json_as_string(kafka_cluster):
     kafka_produce(kafka_cluster, 'kafka_json_as_string', ['{"t": 123, "e": {"x": "woof"} }', '', '{"t": 124, "e": {"x": "test"} }',
                                   '{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}'])
 
+    # 'tombstone' record (null value) = marker of deleted record
+    producer = KafkaProducer(bootstrap_servers="localhost:{}".format(cluster.kafka_port), value_serializer=producer_serializer, key_serializer=producer_serializer)
+    producer.send(topic='kafka_json_as_string', key='xxx')
+    producer.flush()
+
     instance.query('''
         CREATE TABLE test.kafka (field String)
         ENGINE = Kafka