From 77393287ab50c270339a9c3320f993ba1a599c94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marek=20Vavru=C5=A1a?= Date: Fri, 22 Dec 2017 16:32:48 -0800 Subject: [PATCH] StorageKafka: make commit message only if messages are consumed --- dbms/src/Storages/StorageKafka.cpp | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/dbms/src/Storages/StorageKafka.cpp b/dbms/src/Storages/StorageKafka.cpp index 822e7f63558..ccfa771b340 100644 --- a/dbms/src/Storages/StorageKafka.cpp +++ b/dbms/src/Storages/StorageKafka.cpp @@ -52,6 +52,7 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer rd_kafka_t * consumer; rd_kafka_message_t * current; Poco::Logger * log; + size_t read_messages; bool nextImpl() override { @@ -82,6 +83,7 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer // If an exception is thrown before that would occur, the client will rejoin without comitting offsets BufferBase::set(reinterpret_cast(msg->payload), msg->len, 0); current = msg; + ++read_messages; return true; } @@ -96,9 +98,22 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer public: ReadBufferFromKafkaConsumer(rd_kafka_t * consumer_, Poco::Logger * log_) - : ReadBuffer(nullptr, 0), consumer(consumer_), current(nullptr), log(log_) {} + : ReadBuffer(nullptr, 0), consumer(consumer_), current(nullptr), log(log_), read_messages(0) {} ~ReadBufferFromKafkaConsumer() { reset(); } + + /// Commit messages read with this consumer + void commit() { + LOG_TRACE(log, "Committing " << read_messages << " messages"); + if (read_messages == 0) + return; + + auto err = rd_kafka_commit(consumer, NULL, 1 /* async */); + if (err) + throw Exception("Failed to commit offsets: " + String(rd_kafka_err2str(err)), ErrorCodes::UNKNOWN_EXCEPTION); + + read_messages = 0; + } }; class KafkaBlockInputStream : public IProfilingBlockInputStream @@ -166,10 +181,8 @@ public: { reader->readSuffix(); - // Store offsets read in this stream asynchronously - auto err = rd_kafka_commit(consumer->stream, NULL, 1 /* async */); - if (err) - throw Exception("Failed to commit offsets: " + String(rd_kafka_err2str(err)), ErrorCodes::UNKNOWN_EXCEPTION); + // Store offsets read in this stream + read_buf->commit(); // Mark as successfully finished finalized = true;