ClickHouse/src/Storages/Kafka/WriteBufferToKafkaProducer.h
2020-04-03 18:14:31 +03:00

48 lines
1.0 KiB
C++

#pragma once
#include <IO/WriteBuffer.h>
#include <Columns/IColumn.h>
#include <cppkafka/cppkafka.h>
#include <list>
namespace DB
{
class Block;
using ProducerPtr = std::shared_ptr<cppkafka::Producer>;
class WriteBufferToKafkaProducer : public WriteBuffer
{
public:
WriteBufferToKafkaProducer(
ProducerPtr producer_,
const std::string & topic_,
std::optional<char> delimiter,
size_t rows_per_message,
size_t chunk_size_,
std::chrono::milliseconds poll_timeout,
const Block & header);
~WriteBufferToKafkaProducer() override;
void countRow(const Columns & columns, size_t row);
void flush();
private:
void nextImpl() override;
ProducerPtr producer;
const std::string topic;
const std::optional<char> delim;
const size_t max_rows;
const size_t chunk_size;
const std::chrono::milliseconds timeout;
size_t rows = 0;
std::list<std::string> chunks;
std::optional<size_t> key_column_index;
std::optional<size_t> timestamp_column_index;
};
}