mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
Store offsets manually for each message (#6872)
This commit is contained in:
parent
abc7fbf6c8
commit
6c32fc3fc1
@ -72,9 +72,7 @@ void ReadBufferFromKafkaConsumer::commit()
|
||||
|
||||
PrintOffsets("Polled offset", consumer->get_offsets_position(consumer->get_assignment()));
|
||||
|
||||
/// Since we can poll more messages than we already processed - commit only processed messages.
|
||||
if (!messages.empty())
|
||||
consumer->async_commit(*std::prev(current));
|
||||
consumer->async_commit();
|
||||
|
||||
PrintOffsets("Committed offset", consumer->get_offsets_committed(consumer->get_assignment()));
|
||||
|
||||
@ -186,6 +184,9 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
|
||||
auto new_position = reinterpret_cast<char *>(const_cast<unsigned char *>(current->get_payload().get_data()));
|
||||
BufferBase::set(new_position, current->get_payload().get_size(), 0);
|
||||
|
||||
/// Since we can poll more messages than we already processed - commit only processed messages.
|
||||
consumer->store_offset(*current);
|
||||
|
||||
++current;
|
||||
|
||||
return true;
|
||||
|
@ -261,9 +261,10 @@ ConsumerBufferPtr StorageKafka::createReadBuffer()
|
||||
conf.set("metadata.broker.list", brokers);
|
||||
conf.set("group.id", group);
|
||||
conf.set("client.id", VERSION_FULL);
|
||||
conf.set("auto.offset.reset", "smallest"); // If no offset stored for this group, read all messages from the start
|
||||
conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished
|
||||
conf.set("enable.partition.eof", "false"); // Ignore EOF messages
|
||||
conf.set("auto.offset.reset", "smallest"); // If no offset stored for this group, read all messages from the start
|
||||
conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished
|
||||
conf.set("enable.auto.offset.store", "false"); // Update offset automatically - to commit them all at once.
|
||||
conf.set("enable.partition.eof", "false"); // Ignore EOF messages
|
||||
updateConfiguration(conf);
|
||||
|
||||
// Create a consumer and subscribe to topics
|
||||
|
Loading…
Reference in New Issue
Block a user