draft implementation, wip

This commit is contained in:
Mikhail Filimonov 2020-02-03 11:02:52 +01:00
parent a6e3265960
commit 24ff635435
7 changed files with 60 additions and 15 deletions

View File

@ -1,6 +1,7 @@
#pragma once
#include <Core/Types.h>
#include <Columns/IColumn.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <IO/BufferWithOwnMemory.h>
@ -9,7 +10,6 @@
#include <unordered_map>
#include <boost/noncopyable.hpp>
namespace DB
{
@ -53,7 +53,9 @@ public:
/// This callback allows to perform some additional actions after writing a single row.
/// It's initial purpose was to flush Kafka message for each row.
using WriteCallback = std::function<void()>;
using WriteCallback = std::function<void(
const Columns & columns,
size_t row)>;
private:
using InputCreator = std::function<BlockInputStreamPtr(

View File

@ -13,7 +13,7 @@ void IRowOutputFormat::consume(DB::Chunk chunk)
auto num_rows = chunk.getNumRows();
auto & columns = chunk.getColumns();
for (UInt64 row = 0; row < num_rows; ++row)
for (size_t row = 0; row < num_rows; ++row)
{
if (!first_row)
writeRowBetweenDelimiter();
@ -22,7 +22,7 @@ void IRowOutputFormat::consume(DB::Chunk chunk)
write(columns, row);
if (write_single_row_callback)
write_single_row_callback();
write_single_row_callback(columns, row);
}
}

View File

@ -28,11 +28,11 @@ Block KafkaBlockOutputStream::getHeader() const
void KafkaBlockOutputStream::writePrefix()
{
buffer = storage.createWriteBuffer();
buffer = storage.createWriteBuffer(getHeader());
if (!buffer)
throw Exception("Failed to create Kafka producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);
child = FormatFactory::instance().getOutput(storage.getFormatName(), *buffer, getHeader(), context, [this]{ buffer->count_row(); });
child = FormatFactory::instance().getOutput(storage.getFormatName(), *buffer, getHeader(), context, [this](const Columns & columns, size_t row){ buffer->count_row(columns, row); });
}
void KafkaBlockOutputStream::write(const Block & block)

View File

@ -231,7 +231,7 @@ ConsumerBufferPtr StorageKafka::popReadBuffer(std::chrono::milliseconds timeout)
}
ProducerBufferPtr StorageKafka::createWriteBuffer()
ProducerBufferPtr StorageKafka::createWriteBuffer(Block header)
{
cppkafka::Configuration conf;
conf.set("metadata.broker.list", brokers);
@ -245,7 +245,7 @@ ProducerBufferPtr StorageKafka::createWriteBuffer()
size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds();
return std::make_shared<WriteBufferToKafkaProducer>(
producer, topics[0], row_delimiter ? std::optional<char>{row_delimiter} : std::optional<char>(), 1, 1024, std::chrono::milliseconds(poll_timeout));
producer, topics[0], row_delimiter ? std::optional<char>{row_delimiter} : std::optional<char>(), 1, 1024, std::chrono::milliseconds(poll_timeout), header);
}

View File

@ -53,7 +53,7 @@ public:
ConsumerBufferPtr popReadBuffer();
ConsumerBufferPtr popReadBuffer(std::chrono::milliseconds timeout);
ProducerBufferPtr createWriteBuffer();
ProducerBufferPtr createWriteBuffer(Block header);
const auto & getTopics() const { return topics; }
const auto & getFormatName() const { return format_name; }

View File

@ -1,4 +1,7 @@
#include "WriteBufferToKafkaProducer.h"
#include "Core/Block.h"
#include "Columns/ColumnString.h"
#include "Columns/ColumnsNumber.h"
namespace DB
{
@ -8,7 +11,9 @@ WriteBufferToKafkaProducer::WriteBufferToKafkaProducer(
std::optional<char> delimiter,
size_t rows_per_message,
size_t chunk_size_,
std::chrono::milliseconds poll_timeout)
std::chrono::milliseconds poll_timeout,
Block header
)
: WriteBuffer(nullptr, 0)
, producer(producer_)
, topic(topic_)
@ -17,6 +22,20 @@ WriteBufferToKafkaProducer::WriteBufferToKafkaProducer(
, chunk_size(chunk_size_)
, timeout(poll_timeout)
{
for (size_t i = 0; i < header.columns(); ++i)
{
auto column_info = header.getByPosition(i);
if (column_info.name == "_key" && isString(column_info.type) )
{
key_column_index = i;
}
else if (column_info.name == "_timestamp" && isDateTime(column_info.type)) {
timestamp_column_index = i;
}
}
}
WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer()
@ -24,8 +43,9 @@ WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer()
assert(rows == 0 && chunks.empty());
}
void WriteBufferToKafkaProducer::count_row()
void WriteBufferToKafkaProducer::count_row(const Columns & columns, size_t current_row)
{
if (++rows % max_rows == 0)
{
std::string payload;
@ -35,11 +55,30 @@ void WriteBufferToKafkaProducer::count_row()
int trunk_delim = delim && chunks.back()[offset() - 1] == delim ? 1 : 0;
payload.append(chunks.back(), 0, offset() - trunk_delim);
cppkafka::MessageBuilder builder(topic);
builder.payload(payload);
// Note: if it will be few rows per message - it will take the value from last row of block
if (key_column_index)
{
const auto & key_column = assert_cast<const ColumnString &>(*columns[key_column_index.value()]);
const auto key_data = key_column.getDataAt(current_row);
builder.key(cppkafka::Buffer(key_data.data, key_data.size));
}
if (timestamp_column_index)
{
const auto & timestamp_column = assert_cast<const ColumnUInt32 &>(*columns[timestamp_column_index.value()]);
const auto timestamp = std::chrono::seconds{timestamp_column.getElement(current_row)};
builder.timestamp(timestamp);
}
while (true)
{
try
{
producer->produce(cppkafka::MessageBuilder(topic).payload(payload));
producer->produce(builder);
}
catch (cppkafka::HandleException & e)
{

View File

@ -1,6 +1,7 @@
#pragma once
#include <IO/WriteBuffer.h>
#include <Columns/IColumn.h>
#include <cppkafka/cppkafka.h>
@ -8,7 +9,7 @@
namespace DB
{
class Block;
using ProducerPtr = std::shared_ptr<cppkafka::Producer>;
class WriteBufferToKafkaProducer : public WriteBuffer
@ -20,10 +21,11 @@ public:
std::optional<char> delimiter,
size_t rows_per_message,
size_t chunk_size_,
std::chrono::milliseconds poll_timeout);
std::chrono::milliseconds poll_timeout,
Block header);
~WriteBufferToKafkaProducer() override;
void count_row();
void count_row(const Columns & columns, size_t row);
void flush();
private:
@ -38,6 +40,8 @@ private:
size_t rows = 0;
std::list<std::string> chunks;
std::optional<size_t> key_column_index;
std::optional<size_t> timestamp_column_index;
};
}