Merge pull request #8969 from filimonov/kafka_producer_with_key_and_timestamp

Support message key and timestamp in the Kafka producer: an INSERT into a Kafka engine table can now set the per-message key from a String column named _key and the message timestamp from a DateTime column named _timestamp.
Alexander Kuzmenkov, 2020-02-11 22:03:37 +03:00 (committed by GitHub)
commit 413cb601dd
9 changed files with 130 additions and 21 deletions

View File

@@ -1,6 +1,7 @@
 #pragma once
 #include <Core/Types.h>
+#include <Columns/IColumn.h>
 #include <DataStreams/IBlockStream_fwd.h>
 #include <IO/BufferWithOwnMemory.h>
@@ -9,7 +10,6 @@
 #include <unordered_map>
 #include <boost/noncopyable.hpp>

 namespace DB
 {
@@ -53,7 +53,9 @@ public:
     /// This callback allows to perform some additional actions after writing a single row.
    /// Its initial purpose was to flush Kafka messages for each row.
-    using WriteCallback = std::function<void()>;
+    using WriteCallback = std::function<void(
+        const Columns & columns,
+        size_t row)>;

 private:
     using InputCreator = std::function<BlockInputStreamPtr(
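With the widened signature, a per-row hook is no longer a bare notification: it can read the row it was invoked for. A minimal sketch of a conforming callback (illustrative names, not part of this PR):

    #include <Columns/IColumn.h>

    #include <cstddef>
    #include <iostream>

    // Hypothetical callback matching the new WriteCallback signature:
    // it receives the block's columns and the index of the row just written.
    void log_written_row(const DB::Columns & columns, size_t row)
    {
        std::cerr << "wrote row " << row
                  << " (block has " << columns.size() << " columns)\n";
    }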

View File

@@ -45,7 +45,7 @@ try
     BlockInputStreamPtr block_input = std::make_shared<InputStreamFromInputFormat>(std::move(input_format));
     BlockOutputStreamPtr block_output = std::make_shared<OutputStreamToOutputFormat>(
-        std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, [] {}, format_settings));
+        std::make_shared<TabSeparatedRowOutputFormat>(out_buf, sample, false, false, [](const Columns & /* columns */, size_t /* row */){}, format_settings));

     copyData(*block_input, *block_output);
     return 0;

View File

@@ -13,7 +13,7 @@ void IRowOutputFormat::consume(DB::Chunk chunk)
     auto num_rows = chunk.getNumRows();
     auto & columns = chunk.getColumns();

-    for (UInt64 row = 0; row < num_rows; ++row)
+    for (size_t row = 0; row < num_rows; ++row)
     {
         if (!first_row)
             writeRowBetweenDelimiter();
@@ -22,7 +22,7 @@ void IRowOutputFormat::consume(DB::Chunk chunk)
         write(columns, row);

         if (write_single_row_callback)
-            write_single_row_callback();
+            write_single_row_callback(columns, row);
     }
 }
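consume() is the single call site: after each row is serialized, the callback observes the same (columns, row) pair that was just written. Schematically, as a simplified standalone sketch rather than the actual class:

    #include <Columns/IColumn.h>

    #include <cstddef>
    #include <functional>

    using RowCallback = std::function<void(const DB::Columns &, size_t)>;

    // Sketch of the loop shape: serialize a row, then let the callback
    // inspect exactly the row that was written.
    void write_all_rows(const DB::Columns & columns, size_t num_rows, const RowCallback & callback)
    {
        for (size_t row = 0; row < num_rows; ++row)
        {
            // ... row serialization happens here ...
            if (callback)
                callback(columns, row);
        }
    }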

View File

@@ -28,11 +28,11 @@ Block KafkaBlockOutputStream::getHeader() const
 void KafkaBlockOutputStream::writePrefix()
 {
-    buffer = storage.createWriteBuffer();
+    buffer = storage.createWriteBuffer(getHeader());
     if (!buffer)
         throw Exception("Failed to create Kafka producer!", ErrorCodes::CANNOT_CREATE_IO_BUFFER);

-    child = FormatFactory::instance().getOutput(storage.getFormatName(), *buffer, getHeader(), context, [this]{ buffer->count_row(); });
+    child = FormatFactory::instance().getOutput(storage.getFormatName(), *buffer, getHeader(), context, [this](const Columns & columns, size_t row){ buffer->count_row(columns, row); });
 }

 void KafkaBlockOutputStream::write(const Block & block)

View File

@@ -231,7 +231,7 @@ ConsumerBufferPtr StorageKafka::popReadBuffer(std::chrono::milliseconds timeout)
 }

-ProducerBufferPtr StorageKafka::createWriteBuffer()
+ProducerBufferPtr StorageKafka::createWriteBuffer(const Block & header)
 {
     cppkafka::Configuration conf;
     conf.set("metadata.broker.list", brokers);
@@ -245,7 +245,7 @@ ProducerBufferPtr StorageKafka::createWriteBuffer()
     size_t poll_timeout = settings.stream_poll_timeout_ms.totalMilliseconds();

     return std::make_shared<WriteBufferToKafkaProducer>(
-        producer, topics[0], row_delimiter ? std::optional<char>{row_delimiter} : std::optional<char>(), 1, 1024, std::chrono::milliseconds(poll_timeout));
+        producer, topics[0], row_delimiter ? std::optional<char>{row_delimiter} : std::nullopt, 1, 1024, std::chrono::milliseconds(poll_timeout), header);
 }
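A small cleanup rides along here: std::nullopt replaces the default-constructed std::optional<char>() for the no-delimiter case. The two spellings produce the same empty optional; a standalone illustration (not from the PR):

    #include <optional>

    int main()
    {
        std::optional<char> a = std::optional<char>{}; // old spelling: empty optional
        std::optional<char> b = std::nullopt;          // new spelling: same value, clearer intent
        return (a == b) ? 0 : 1;                       // both are empty, so this returns 0
    }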

View File

@@ -54,7 +54,7 @@ public:
     ConsumerBufferPtr popReadBuffer();
     ConsumerBufferPtr popReadBuffer(std::chrono::milliseconds timeout);

-    ProducerBufferPtr createWriteBuffer();
+    ProducerBufferPtr createWriteBuffer(const Block & header);

     const auto & getTopics() const { return topics; }
     const auto & getFormatName() const { return format_name; }

View File

@@ -1,4 +1,7 @@
 #include "WriteBufferToKafkaProducer.h"
+#include "Core/Block.h"
+#include "Columns/ColumnString.h"
+#include "Columns/ColumnsNumber.h"

 namespace DB
 {
@@ -8,7 +11,9 @@ WriteBufferToKafkaProducer::WriteBufferToKafkaProducer(
     std::optional<char> delimiter,
     size_t rows_per_message,
     size_t chunk_size_,
-    std::chrono::milliseconds poll_timeout)
+    std::chrono::milliseconds poll_timeout,
+    const Block & header
+    )
     : WriteBuffer(nullptr, 0)
     , producer(producer_)
     , topic(topic_)
@@ -17,6 +22,26 @@ WriteBufferToKafkaProducer::WriteBufferToKafkaProducer(
     , chunk_size(chunk_size_)
     , timeout(poll_timeout)
 {
+    if (header.has("_key"))
+    {
+        auto column_index = header.getPositionByName("_key");
+        auto column_info = header.getByPosition(column_index);
+        if (isString(column_info.type))
+        {
+            key_column_index = column_index;
+        }
+        // else? (not sure this is a good place to report an error to the user)
+    }
+
+    if (header.has("_timestamp"))
+    {
+        auto column_index = header.getPositionByName("_timestamp");
+        auto column_info = header.getByPosition(column_index);
+        if (isDateTime(column_info.type))
+        {
+            timestamp_column_index = column_index;
+        }
+    }
 }

 WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer()
@@ -24,22 +49,51 @@ WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer()
     assert(rows == 0 && chunks.empty());
 }

-void WriteBufferToKafkaProducer::count_row()
+void WriteBufferToKafkaProducer::count_row(const Columns & columns, size_t current_row)
 {
     if (++rows % max_rows == 0)
     {
+        const std::string & last_chunk = chunks.back();
+        size_t last_chunk_size = offset();
+
+        // if the last character of the last chunk is the delimiter, we don't need it
+        if (delim && last_chunk[last_chunk_size - 1] == delim)
+            --last_chunk_size;
+
         std::string payload;
-        payload.reserve((chunks.size() - 1) * chunk_size + offset());
+        payload.reserve((chunks.size() - 1) * chunk_size + last_chunk_size);
+
+        // concatenate all chunks except the last one
         for (auto i = chunks.begin(), e = --chunks.end(); i != e; ++i)
             payload.append(*i);
-        int trunk_delim = delim && chunks.back()[offset() - 1] == delim ? 1 : 0;
-        payload.append(chunks.back(), 0, offset() - trunk_delim);
+
+        // add the last one
+        payload.append(last_chunk, 0, last_chunk_size);
+
+        cppkafka::MessageBuilder builder(topic);
+        builder.payload(payload);
+
+        // Note: if there are several rows per message, the key and timestamp are taken from the last row of the block
+        if (key_column_index)
+        {
+            const auto & key_column = assert_cast<const ColumnString &>(*columns[key_column_index.value()]);
+            const auto key_data = key_column.getDataAt(current_row);
+            builder.key(cppkafka::Buffer(key_data.data, key_data.size));
+        }
+
+        if (timestamp_column_index)
+        {
+            const auto & timestamp_column = assert_cast<const ColumnUInt32 &>(*columns[timestamp_column_index.value()]);
+            const auto timestamp = std::chrono::seconds{timestamp_column.getElement(current_row)};
+            builder.timestamp(timestamp);
+        }
+
         while (true)
         {
             try
             {
-                producer->produce(cppkafka::MessageBuilder(topic).payload(payload));
+                producer->produce(builder);
             }
             catch (cppkafka::HandleException & e)
             {
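Taken together, each flushed message now carries an optional key and timestamp. Roughly the same sequence as a standalone cppkafka sketch (topic name and values are made up; the retry loop and error handling are omitted):

    #include <cppkafka/cppkafka.h>

    #include <chrono>
    #include <string>

    void produce_one(cppkafka::Producer & producer)
    {
        std::string payload = "1\t42";                     // a hypothetical TSV row
        std::string key = "k1";                            // would come from the _key column
        auto timestamp = std::chrono::seconds{1577836801}; // would come from the _timestamp column

        cppkafka::MessageBuilder builder("some_topic");    // hypothetical topic name
        builder.payload(payload);
        builder.key(cppkafka::Buffer(key.data(), key.size()));
        builder.timestamp(timestamp);                      // seconds convert to the milliseconds overload
        producer.produce(builder);
    }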

View File

@@ -1,6 +1,7 @@
 #pragma once
 #include <IO/WriteBuffer.h>
+#include <Columns/IColumn.h>
 #include <cppkafka/cppkafka.h>
@@ -8,7 +9,7 @@
 namespace DB
 {
+class Block;

 using ProducerPtr = std::shared_ptr<cppkafka::Producer>;

 class WriteBufferToKafkaProducer : public WriteBuffer
@@ -20,10 +21,11 @@ public:
         std::optional<char> delimiter,
         size_t rows_per_message,
         size_t chunk_size_,
-        std::chrono::milliseconds poll_timeout);
+        std::chrono::milliseconds poll_timeout,
+        const Block & header);
     ~WriteBufferToKafkaProducer() override;

-    void count_row();
+    void count_row(const Columns & columns, size_t row);
     void flush();

 private:
@@ -38,6 +40,8 @@ private:
     size_t rows = 0;
     std::list<std::string> chunks;
+    std::optional<size_t> key_column_index;
+    std::optional<size_t> timestamp_column_index;
 };

 }

View File

@@ -789,6 +789,55 @@ def test_kafka_virtual_columns2(kafka_cluster):
     assert TSV(result) == TSV(expected)

+@pytest.mark.timeout(240)
+def test_kafka_produce_key_timestamp(kafka_cluster):
+    instance.query('''
+        DROP TABLE IF EXISTS test.view;
+        DROP TABLE IF EXISTS test.consumer;
+        CREATE TABLE test.kafka_writer (key UInt64, value UInt64, _key String, _timestamp DateTime)
+            ENGINE = Kafka
+            SETTINGS kafka_broker_list = 'kafka1:19092',
+                     kafka_topic_list = 'insert3',
+                     kafka_group_name = 'insert3',
+                     kafka_format = 'TSV',
+                     kafka_row_delimiter = '\\n';
+        CREATE TABLE test.kafka (key UInt64, value UInt64, inserted_key String, inserted_timestamp DateTime)
+            ENGINE = Kafka
+            SETTINGS kafka_broker_list = 'kafka1:19092',
+                     kafka_topic_list = 'insert3',
+                     kafka_group_name = 'insert3',
+                     kafka_format = 'TSV',
+                     kafka_row_delimiter = '\\n';
+        CREATE MATERIALIZED VIEW test.view Engine=Log AS
+            SELECT key, value, inserted_key, toUnixTimestamp(inserted_timestamp), _key, _topic, _partition, _offset, toUnixTimestamp(_timestamp) FROM test.kafka;
+    ''')
+
+    instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({}))".format(1, 1, 'k1', 1577836801))
+    instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({}))".format(2, 2, 'k2', 1577836802))
+    instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({})),({},{},'{}',toDateTime({}))".format(3, 3, 'k3', 1577836803, 4, 4, 'k4', 1577836804))
+    instance.query("INSERT INTO test.kafka_writer VALUES ({},{},'{}',toDateTime({}))".format(5, 5, 'k5', 1577836805))
+
+    time.sleep(10)
+
+    result = instance.query("SELECT * FROM test.view ORDER BY value", ignore_error=True)
+    # print(result)
+
+    expected = '''\
+1 1 k1 1577836801 k1 insert3 0 0 1577836801
+2 2 k2 1577836802 k2 insert3 0 1 1577836802
+3 3 k3 1577836803 k3 insert3 0 2 1577836803
+4 4 k4 1577836804 k4 insert3 0 3 1577836804
+5 5 k5 1577836805 k5 insert3 0 4 1577836805
+'''
+
+    assert TSV(result) == TSV(expected)

 @pytest.mark.timeout(600)
 def test_kafka_flush_by_time(kafka_cluster):
     instance.query('''
@@ -876,9 +925,9 @@ def test_kafka_flush_by_block_size(kafka_cluster):
     time.sleep(1)

     result = instance.query('SELECT count() FROM test.view')
-    print(result)
+    # print(result)

     # kafka_cluster.open_bash_shell('instance')

     instance.query('''
         DROP TABLE test.consumer;