Remove seemingly unnecessary nullptr check

This commit is contained in:
János Benjamin Antal 2024-07-15 12:40:44 +00:00
parent 7c1a181469
commit 123fd6b750
2 changed files with 67 additions and 4 deletions

View File

@ -401,15 +401,14 @@ void KafkaConsumer2::commit(const TopicPartition & topic_partition)
ReadBufferPtr KafkaConsumer2::getNextMessage()
{
while (current != messages.end())
if (current != messages.end())
{
const auto * data = current->get_payload().get_data();
size_t size = current->get_payload().get_size();
++current;
// TODO(antaljanosbenjamin): When this can be nullptr?
if (data)
return std::make_shared<ReadBufferFromMemory>(data, size);
chassert(data != nullptr);
return std::make_shared<ReadBufferFromMemory>(data, size);
}
return nullptr;
@ -433,7 +432,11 @@ size_t KafkaConsumer2::filterMessageErrors()
});
if (skipped)
{
LOG_ERROR(log, "There were {} messages with an error", skipped);
// Technically current is invalidated as soon as we erased a single message
current = messages.begin();
}
return skipped;
}

View File

@ -5444,6 +5444,66 @@ def test_multiple_read_in_materialized_views(kafka_cluster, create_query_generat
)
@pytest.mark.parametrize(
"create_query_generator",
[generate_old_create_table_query, generate_new_create_table_query],
)
def test_kafka_null_message(kafka_cluster, create_query_generator):
topic_name = "null_message"
instance.query(
f"""
DROP TABLE IF EXISTS test.null_message_view;
DROP TABLE IF EXISTS test.null_message_consumer;
DROP TABLE IF EXISTS test.null_message_kafka;
{create_query_generator("null_message_kafka", "value UInt64", topic_list=topic_name, consumer_group="mv")};
CREATE TABLE test.null_message_view (value UInt64)
ENGINE = MergeTree()
ORDER BY value;
CREATE MATERIALIZED VIEW test.null_message_consumer TO test.null_message_view AS
SELECT * FROM test.null_message_kafka;
"""
)
message_key_values = []
for i in range(5):
# Here the key is key for Kafka message
message = json.dumps({"value": i}) if i != 3 else None
message_key_values.append({"key": f"{i}".encode(), "message": message})
producer = get_kafka_producer(
kafka_cluster.kafka_port, producer_serializer, 15
)
for message_kv in message_key_values:
producer.send(topic=topic_name, key = message_kv["key"], value=message_kv["message"])
producer.flush()
expected = TSV(
"""
0
1
2
4
"""
)
with existing_kafka_topic(get_admin_client(kafka_cluster), topic_name):
result = instance.query_with_retry(
"SELECT * FROM test.null_message_view", check_callback=lambda res: TSV(res) == expected
)
assert expected == TSV(result)
instance.query(
"""
DROP TABLE test.null_message_consumer;
DROP TABLE test.null_message_view;
DROP TABLE test.null_message_kafka;
"""
)
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")