StorageKafka: make commit message only if messages are consumed

This commit is contained in:
Marek Vavruša 2017-12-22 16:32:48 -08:00 committed by alexey-milovidov
parent ad59a1460e
commit 77393287ab

View File

@ -52,6 +52,7 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
rd_kafka_t * consumer; rd_kafka_t * consumer;
rd_kafka_message_t * current; rd_kafka_message_t * current;
Poco::Logger * log; Poco::Logger * log;
size_t read_messages;
bool nextImpl() override bool nextImpl() override
{ {
@ -82,6 +83,7 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
// If an exception is thrown before that would occur, the client will rejoin without comitting offsets // If an exception is thrown before that would occur, the client will rejoin without comitting offsets
BufferBase::set(reinterpret_cast<char *>(msg->payload), msg->len, 0); BufferBase::set(reinterpret_cast<char *>(msg->payload), msg->len, 0);
current = msg; current = msg;
++read_messages;
return true; return true;
} }
@ -96,9 +98,22 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
public: public:
ReadBufferFromKafkaConsumer(rd_kafka_t * consumer_, Poco::Logger * log_) ReadBufferFromKafkaConsumer(rd_kafka_t * consumer_, Poco::Logger * log_)
: ReadBuffer(nullptr, 0), consumer(consumer_), current(nullptr), log(log_) {} : ReadBuffer(nullptr, 0), consumer(consumer_), current(nullptr), log(log_), read_messages(0) {}
~ReadBufferFromKafkaConsumer() { reset(); } ~ReadBufferFromKafkaConsumer() { reset(); }
/// Commit messages read with this consumer
void commit() {
LOG_TRACE(log, "Committing " << read_messages << " messages");
if (read_messages == 0)
return;
auto err = rd_kafka_commit(consumer, NULL, 1 /* async */);
if (err)
throw Exception("Failed to commit offsets: " + String(rd_kafka_err2str(err)), ErrorCodes::UNKNOWN_EXCEPTION);
read_messages = 0;
}
}; };
class KafkaBlockInputStream : public IProfilingBlockInputStream class KafkaBlockInputStream : public IProfilingBlockInputStream
@ -166,10 +181,8 @@ public:
{ {
reader->readSuffix(); reader->readSuffix();
// Store offsets read in this stream asynchronously // Store offsets read in this stream
auto err = rd_kafka_commit(consumer->stream, NULL, 1 /* async */); read_buf->commit();
if (err)
throw Exception("Failed to commit offsets: " + String(rd_kafka_err2str(err)), ErrorCodes::UNKNOWN_EXCEPTION);
// Mark as successfully finished // Mark as successfully finished
finalized = true; finalized = true;