diff --git a/src/Storages/MessageQueueSink.cpp b/src/Storages/MessageQueueSink.cpp index d4dabd60ef8..104e3506ade 100644 --- a/src/Storages/MessageQueueSink.cpp +++ b/src/Storages/MessageQueueSink.cpp @@ -46,6 +46,8 @@ void MessageQueueSink::consume(Chunk & chunk) if (columns.empty()) return; + /// The formatter might hold pointers to buffer (e.g. if PeekableWriteBuffer is used), which means the formatter + /// needs to be reset after buffer might reallocate its memory. In this exact case after restarting the buffer. if (row_format) { size_t row = 0; @@ -77,6 +79,4 @@ void MessageQueueSink::consume(Chunk & chunk) format->resetFormatter(); } } - - } diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py index dd0bf1bf28f..8793ab72a16 100644 --- a/tests/integration/test_storage_kafka/test.py +++ b/tests/integration/test_storage_kafka/test.py @@ -5098,7 +5098,7 @@ def test_multiple_read_in_materialized_views(kafka_cluster, max_retries=15): def test_kafka_produce_http_interface_row_based_format(kafka_cluster): - # reproduction of #https://github.com/ClickHouse/ClickHouse/issues/61060 with validating the written messages + # reproduction of #61060 with validating the written messages admin_client = KafkaAdminClient( bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port) )