2019-01-21 14:02:03 +00:00
|
|
|
#pragma once
|
|
|
|
|
2019-04-22 13:23:05 +00:00
|
|
|
#include <Core/Names.h>
|
|
|
|
#include <IO/DelimitedReadBuffer.h>
|
2019-01-21 14:02:03 +00:00
|
|
|
#include <common/logger_useful.h>
|
|
|
|
|
|
|
|
#include <cppkafka/cppkafka.h>
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
2019-04-22 13:23:05 +00:00
|
|
|
|
|
|
|
// Shared-ownership handle to a DelimitedReadBuffer.
using BufferPtr = std::shared_ptr<DelimitedReadBuffer>;
|
2019-01-21 14:02:03 +00:00
|
|
|
// Shared-ownership handle to a cppkafka::Consumer; ReadBufferFromKafkaConsumer stores one.
using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
|
|
|
|
|
|
|
|
class ReadBufferFromKafkaConsumer : public ReadBuffer
|
|
|
|
{
|
|
|
|
public:
|
2019-05-16 15:20:30 +00:00
|
|
|
ReadBufferFromKafkaConsumer(
|
|
|
|
ConsumerPtr consumer_, Poco::Logger * log_, size_t max_batch_size, size_t poll_timeout_, bool intermediate_commit_)
|
2019-05-14 15:52:03 +00:00
|
|
|
: ReadBuffer(nullptr, 0)
|
|
|
|
, consumer(consumer_)
|
|
|
|
, log(log_)
|
|
|
|
, batch_size(max_batch_size)
|
|
|
|
, poll_timeout(poll_timeout_)
|
2019-05-16 15:20:30 +00:00
|
|
|
, intermediate_commit(intermediate_commit_)
|
2019-05-14 15:52:03 +00:00
|
|
|
, current(messages.begin())
|
2019-01-21 14:02:03 +00:00
|
|
|
{
|
|
|
|
}
|
|
|
|
|
2019-04-22 13:23:05 +00:00
|
|
|
void commit(); // Commit all processed messages.
|
|
|
|
void subscribe(const Names & topics); // Subscribe internal consumer to topics.
|
|
|
|
void unsubscribe(); // Unsubscribe internal consumer in case of failure.
|
2019-01-21 14:02:03 +00:00
|
|
|
|
2019-05-14 15:52:03 +00:00
|
|
|
auto pollTimeout() { return poll_timeout; }
|
|
|
|
|
2019-01-21 14:02:03 +00:00
|
|
|
private:
|
2019-01-25 12:48:59 +00:00
|
|
|
using Messages = std::vector<cppkafka::Message>;
|
|
|
|
|
2019-01-21 14:02:03 +00:00
|
|
|
ConsumerPtr consumer;
|
|
|
|
Poco::Logger * log;
|
2019-01-25 12:48:59 +00:00
|
|
|
const size_t batch_size = 1;
|
2019-05-14 15:52:03 +00:00
|
|
|
const size_t poll_timeout = 0;
|
2019-05-15 16:11:50 +00:00
|
|
|
bool stalled = false;
|
2019-05-16 15:20:30 +00:00
|
|
|
bool intermediate_commit = true;
|
2019-01-25 12:48:59 +00:00
|
|
|
|
|
|
|
Messages messages;
|
|
|
|
Messages::const_iterator current;
|
2019-01-21 14:02:03 +00:00
|
|
|
|
|
|
|
bool nextImpl() override;
|
|
|
|
};
|
|
|
|
|
2019-04-22 13:23:05 +00:00
|
|
|
}
|