diff --git a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp index 4614e581a3c..823eb632b7f 100644 --- a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp +++ b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp @@ -72,9 +72,7 @@ void ReadBufferFromKafkaConsumer::commit() PrintOffsets("Polled offset", consumer->get_offsets_position(consumer->get_assignment())); - /// Since we can poll more messages than we already processed - commit only processed messages. - if (!messages.empty()) - consumer->async_commit(*std::prev(current)); + consumer->async_commit(); PrintOffsets("Committed offset", consumer->get_offsets_committed(consumer->get_assignment())); @@ -186,6 +184,9 @@ bool ReadBufferFromKafkaConsumer::nextImpl() auto new_position = reinterpret_cast(const_cast(current->get_payload().get_data())); BufferBase::set(new_position, current->get_payload().get_size(), 0); + /// Since we can poll more messages than we already processed - commit only processed messages. + consumer->store_offset(*current); + ++current; return true; diff --git a/dbms/src/Storages/Kafka/StorageKafka.cpp b/dbms/src/Storages/Kafka/StorageKafka.cpp index 2b41fa9e772..ed067993a18 100644 --- a/dbms/src/Storages/Kafka/StorageKafka.cpp +++ b/dbms/src/Storages/Kafka/StorageKafka.cpp @@ -261,9 +261,10 @@ ConsumerBufferPtr StorageKafka::createReadBuffer() conf.set("metadata.broker.list", brokers); conf.set("group.id", group); conf.set("client.id", VERSION_FULL); - conf.set("auto.offset.reset", "smallest"); // If no offset stored for this group, read all messages from the start - conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished - conf.set("enable.partition.eof", "false"); // Ignore EOF messages + conf.set("auto.offset.reset", "smallest"); // If no offset stored for this group, read all messages from the start + conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished + conf.set("enable.auto.offset.store", "false"); // Update offset automatically - to commit them all at once. + conf.set("enable.partition.eof", "false"); // Ignore EOF messages updateConfiguration(conf); // Create a consumer and subscribe to topics