Use cppkafka instead of raw C interface

Ivan Lezhankin 2018-12-27 17:18:20 +03:00
parent 9c35598373
commit 337c092c7e
2 changed files with 42 additions and 155 deletions
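
For context, the cppkafka consumption pattern that this patch switches to looks roughly like the sketch below. This is not code from the patch: the broker address, group id and topic name are placeholders, and error handling is reduced to the bare minimum.

    #include <cppkafka/cppkafka.h>

    #include <chrono>
    #include <iostream>

    int main()
    {
        // Fixed settings; auto-commit is disabled because offsets are committed manually.
        cppkafka::Configuration config = {
            {"metadata.broker.list", "localhost:9092"},  // placeholder broker
            {"group.id", "example-group"},               // placeholder group id
            {"enable.auto.commit", "false"}
        };

        cppkafka::Consumer consumer(config);
        consumer.subscribe({"example-topic"});           // placeholder topic

        while (true)
        {
            // poll() returns an RAII cppkafka::Message; no rd_kafka_message_destroy() needed.
            cppkafka::Message message = consumer.poll(std::chrono::milliseconds(500));
            if (!message)
                continue;                                // timeout, nothing fetched
            if (message.get_error())
            {
                // Partition EOF is delivered as an error-flavoured message but is not fatal.
                if (!message.is_eof())
                    std::cerr << "Consumer error: " << message.get_error() << std::endl;
                continue;
            }
            std::cout << message.get_payload() << std::endl;  // Buffer is streamable
            consumer.async_commit(message);                   // commit this message's offset
        }
    }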


@@ -29,8 +29,6 @@
#include <Common/typeid_cast.h>
#include <common/logger_useful.h>
#include <cppkafka/cppkafka.h>
namespace DB
{
@@ -58,8 +56,8 @@ static const String CONFIG_PREFIX = "kafka";
class ReadBufferFromKafkaConsumer : public ReadBuffer
{
rd_kafka_t * consumer;
rd_kafka_message_t * current = nullptr;
ConsumerPtr consumer;
cppkafka::Message current;
bool current_pending = false; /// We've fetched "current" message and need to process it on the next iteration.
Poco::Logger * log;
size_t read_messages = 0;
@@ -69,42 +67,36 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
{
if (current_pending)
{
BufferBase::set(reinterpret_cast<char *>(current->payload), current->len, 0);
// XXX: very fishy place with const casting.
BufferBase::set(reinterpret_cast<char *>(const_cast<unsigned char *>(current.get_payload().get_data())), current.get_payload().get_size(), 0);
current_pending = false;
return true;
}
// Process next buffered message
rd_kafka_message_t * msg = rd_kafka_consumer_poll(consumer, READ_POLL_MS); // XXX: use RAII.
if (msg == nullptr)
auto message = consumer->poll(std::chrono::milliseconds(READ_POLL_MS));
if (!message)
return false;
if (msg->err)
if (message.is_eof())
{
if (msg->err != RD_KAFKA_RESP_ERR__PARTITION_EOF)
{
LOG_ERROR(log, "Consumer error: " << rd_kafka_err2str(msg->err) << " " << rd_kafka_message_errstr(msg));
rd_kafka_message_destroy(msg);
return false;
}
// Reach EOF while reading current batch, skip it
LOG_TRACE(log, "EOF reached for partition " << msg->partition << " offset " << msg->offset);
rd_kafka_message_destroy(msg);
// Reached EOF while reading current batch, skip it.
LOG_TRACE(log, "EOF reached for partition " << message.get_partition() << " offset " << message.get_offset());
return nextImpl();
}
if (msg->len && !msg->payload)
throw Exception("Logical error: nullptr message returned with non-zero length", ErrorCodes::LOGICAL_ERROR);
else if (auto err = message.get_error())
{
LOG_ERROR(log, "Consumer error: " << err);
return false;
}
++read_messages;
// Now we've received a new message. Check if we need to produce a delimiter
if (row_delimiter != '\0' && current != nullptr)
if (row_delimiter != '\0' && current)
{
BufferBase::set(&row_delimiter, 1, 0);
reset();
current = msg;
current = std::move(message);
current_pending = true;
return true;
}
@@ -112,31 +104,21 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
// Consume message and mark the topic/partition offset
// The offsets will be committed in the readSuffix() method after the block is completed
// If an exception is thrown before that would occur, the client will rejoin without committing offsets
reset();
current = msg;
BufferBase::set(reinterpret_cast<char *>(current->payload), current->len, 0);
current = std::move(message);
// XXX: very fishy place with const casting.
BufferBase::set(reinterpret_cast<char *>(const_cast<unsigned char *>(current.get_payload().get_data())), current.get_payload().get_size(), 0);
return true;
}
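// Sketch, not part of the patch: the const_cast above is needed because cppkafka::Buffer
// exposes read-only bytes (get_data() returns const unsigned char *), while BufferBase::set()
// expects a mutable char *. An alternative that avoids the cast would copy the payload into
// an owned buffer first, at the cost of an extra copy per message:
//
//     const auto & payload = current.get_payload();
//     std::vector<char> owned(payload.get_data(), payload.get_data() + payload.get_size());
//     BufferBase::set(owned.data(), owned.size(), 0);    // `owned` must outlive the read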
void reset()
{
if (current != nullptr)
{
rd_kafka_message_destroy(current);
current = nullptr;
}
}
public:
ReadBufferFromKafkaConsumer(rd_kafka_t * consumer_, Poco::Logger * log_, char row_delimiter_)
ReadBufferFromKafkaConsumer(ConsumerPtr consumer_, Poco::Logger * log_, char row_delimiter_)
: ReadBuffer(nullptr, 0), consumer(consumer_), log(log_), row_delimiter(row_delimiter_)
{
if (row_delimiter != '\0')
LOG_TRACE(log, "Row delimiter is: " << row_delimiter);
}
~ReadBufferFromKafkaConsumer() override { reset(); }
/// Commit messages read with this consumer
void commit()
{
@@ -144,10 +126,7 @@ public:
if (read_messages == 0)
return;
auto err = rd_kafka_commit(consumer, nullptr, 1 /* async */);
if (err)
throw Exception("Failed to commit offsets: " + String(rd_kafka_err2str(err)), ErrorCodes::UNKNOWN_EXCEPTION);
consumer->async_commit();
read_messages = 0;
}
};
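// Sketch, not part of the patch: with "enable.auto.commit" set to "false" (see
// createConsumerConfiguration() below), nothing is committed until one of the manual
// commit calls runs, e.g.:
//
//     consumer->async_commit(message);   // commit the offset of one particular message
//     consumer->async_commit();          // commit the current offsets of the whole assignment
//     consumer->commit();                // same, but blocks until the broker acknowledges it
//
// commit() above uses the parameterless async_commit(), so offsets advance only after
// readSuffix() has completed, i.e. after the block was successfully written downstream.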
@@ -211,7 +190,7 @@ public:
if (consumer == nullptr)
throw Exception("Failed to claim consumer: ", ErrorCodes::TIMEOUT_EXCEEDED);
read_buf = std::make_unique<ReadBufferFromKafkaConsumer>(consumer->stream, storage.log, storage.row_delimiter);
read_buf = std::make_unique<ReadBufferFromKafkaConsumer>(consumer, storage.log, storage.row_delimiter);
reader = FormatFactory::instance().getInput(storage.format_name, *read_buf, storage.getSampleBlock(), context, max_block_size);
}
@@ -235,7 +214,7 @@ public:
private:
StorageKafka & storage;
StorageKafka::ConsumerPtr consumer;
ConsumerPtr consumer;
Context context;
size_t max_block_size;
Block sample_block;
@@ -247,7 +226,7 @@ private:
bool hasClaimed() { return consumer != nullptr; }
};
static void loadFromConfig(struct rd_kafka_conf_s * conf, const AbstractConfiguration & config, const std::string & path)
static void loadFromConfig(cppkafka::Configuration & conf, const AbstractConfiguration & config, const std::string & path)
{
AbstractConfiguration::Keys keys;
std::vector<char> errstr(512);
@@ -258,8 +237,7 @@ static void loadFromConfig(struct rd_kafka_conf_s * conf, const AbstractConfigur
{
const String key_path = path + "." + key;
const String key_name = boost::replace_all_copy(key, "_", ".");
if (rd_kafka_conf_set(conf, key_name.c_str(), config.getString(key_path).c_str(), errstr.data(), errstr.size()) != RD_KAFKA_CONF_OK)
throw Exception("Invalid Kafka setting " + key_path + " in config: " + String(errstr.data()), ErrorCodes::INVALID_CONFIG_PARAMETER);
conf.set(key_name, config.getString(key_path));
}
}
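// Sketch, not part of the patch: loadFromConfig() maps ClickHouse config keys onto
// librdkafka property names by replacing underscores with dots. A hypothetical server
// config fragment such as
//
//     <kafka>
//         <auto_offset_reset>earliest</auto_offset_reset>
//         <fetch_min_bytes>65536</fetch_min_bytes>
//     </kafka>
//
// therefore turns into conf.set("auto.offset.reset", "earliest") and
// conf.set("fetch.min.bytes", "65536") on the cppkafka::Configuration.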
@@ -322,21 +300,8 @@ void StorageKafka::startup()
{
for (size_t i = 0; i < num_consumers; ++i)
{
// Building configuration may throw, the consumer configuration must be destroyed in that case
auto consumer_conf = rd_kafka_conf_new();
try
{
consumerConfiguration(consumer_conf);
}
catch (...)
{
rd_kafka_conf_destroy(consumer_conf);
throw;
}
// Create a consumer and subscribe to topics
// Note: consumer takes ownership of the configuration
auto consumer = std::make_shared<StorageKafka::Consumer>(consumer_conf);
auto consumer = std::make_shared<cppkafka::Consumer>(createConsumerConfiguration());
consumer->subscribe(topics);
// Make consumer available
@@ -358,7 +323,7 @@ void StorageKafka::shutdown()
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto consumer = claimConsumer();
consumer->close();
// FIXME: not sure whether consumers really need to be closed here at all.
}
LOG_TRACE(log, "Waiting for cleanup");
@@ -374,24 +339,20 @@ void StorageKafka::updateDependencies()
}
void StorageKafka::consumerConfiguration(struct rd_kafka_conf_s * conf)
cppkafka::Configuration StorageKafka::createConsumerConfiguration()
{
std::vector<char> errstr(512);
cppkafka::Configuration conf;
LOG_TRACE(log, "Setting brokers: " << brokers);
if (rd_kafka_conf_set(conf, "metadata.broker.list", brokers.c_str(), errstr.data(), errstr.size()) != RD_KAFKA_CONF_OK)
throw Exception(String(errstr.data()), ErrorCodes::INCORRECT_DATA);
conf.set("metadata.broker.list", brokers);
LOG_TRACE(log, "Setting Group ID: " << group << " Client ID: clickhouse");
conf.set("group.id", group);
if (rd_kafka_conf_set(conf, "group.id", group.c_str(), errstr.data(), errstr.size()) != RD_KAFKA_CONF_OK)
throw Exception(String(errstr.data()), ErrorCodes::INCORRECT_DATA);
if (rd_kafka_conf_set(conf, "client.id", VERSION_FULL, errstr.data(), errstr.size()) != RD_KAFKA_CONF_OK)
throw Exception(String(errstr.data()), ErrorCodes::INCORRECT_DATA);
conf.set("client.id", VERSION_FULL);
// We manually commit offsets after a stream successfully finished
rd_kafka_conf_set(conf, "enable.auto.commit", "false", nullptr, 0);
conf.set("enable.auto.commit", "false");
// Update consumer configuration from the configuration
const auto & config = global_context.getConfigRef();
@@ -405,14 +366,16 @@ void StorageKafka::consumerConfiguration(struct rd_kafka_conf_s * conf)
if (config.has(topic_config_key))
loadFromConfig(conf, config, topic_config_key);
}
return conf;
}
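// Sketch, not part of the patch: cppkafka::Configuration can also be built from an
// initializer list, so the fixed part of the configuration above could be written as
//
//     cppkafka::Configuration conf = {
//         {"metadata.broker.list", brokers},
//         {"group.id", group},
//         {"client.id", VERSION_FULL},
//         {"enable.auto.commit", "false"}
//     };
//
// Either way, an invalid property name or value is reported by throwing a cppkafka
// exception rather than via the errstr/RD_KAFKA_CONF_OK dance of the old code.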
StorageKafka::ConsumerPtr StorageKafka::claimConsumer()
ConsumerPtr StorageKafka::claimConsumer()
{
return tryClaimConsumer(-1L);
}
StorageKafka::ConsumerPtr StorageKafka::tryClaimConsumer(long wait_ms)
ConsumerPtr StorageKafka::tryClaimConsumer(long wait_ms)
{
// Wait for the first free consumer
if (wait_ms >= 0)
@@ -430,7 +393,7 @@ StorageKafka::ConsumerPtr StorageKafka::tryClaimConsumer(long wait_ms)
return consumer;
}
void StorageKafka::pushConsumer(StorageKafka::ConsumerPtr consumer)
void StorageKafka::pushConsumer(ConsumerPtr consumer)
{
std::lock_guard lock(mutex);
consumers.push_back(consumer);
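// Sketch, not part of the patch: the reading side claims a pooled consumer, wraps it
// into a ReadBufferFromKafkaConsumer and hands it back when it is done, roughly:
//
//     ConsumerPtr consumer = storage.tryClaimConsumer(/* wait_ms = */ 1000);  // placeholder timeout
//     if (consumer)
//     {
//         auto buf = std::make_unique<ReadBufferFromKafkaConsumer>(consumer, storage.log, storage.row_delimiter);
//         // ... read, parse and stream the messages ...
//         storage.pushConsumer(consumer);                                     // return it to the pool
//     }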
@@ -553,64 +516,6 @@ bool StorageKafka::streamToViews()
}
StorageKafka::Consumer::Consumer(struct rd_kafka_conf_s * conf)
{
std::vector<char> errstr(512);
stream = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr.data(), errstr.size());
if (stream == nullptr)
{
rd_kafka_conf_destroy(conf);
throw Exception("Failed to create consumer handle: " + String(errstr.data()), ErrorCodes::UNKNOWN_EXCEPTION);
}
rd_kafka_poll_set_consumer(stream);
}
StorageKafka::Consumer::~Consumer()
{
close();
}
void StorageKafka::Consumer::subscribe(const Names & topics_to_subscribe)
{
if (stream == nullptr)
throw Exception("Cannot subscribe to topics when consumer is closed", ErrorCodes::UNKNOWN_EXCEPTION);
// Create a list of partitions
auto * topic_list = rd_kafka_topic_partition_list_new(topics_to_subscribe.size());
for (const auto & topic : topics_to_subscribe)
rd_kafka_topic_partition_list_add(topic_list, topic.c_str(), RD_KAFKA_PARTITION_UA);
// Subscribe to requested topics
auto err = rd_kafka_subscribe(stream, topic_list);
if (err)
{
rd_kafka_topic_partition_list_destroy(topic_list);
throw Exception("Failed to subscribe: " + String(rd_kafka_err2str(err)), ErrorCodes::UNKNOWN_EXCEPTION);
}
rd_kafka_topic_partition_list_destroy(topic_list);
}
void StorageKafka::Consumer::unsubscribe()
{
if (stream != nullptr)
rd_kafka_unsubscribe(stream);
}
void StorageKafka::Consumer::close()
{
if (stream != nullptr)
{
rd_kafka_consumer_close(stream);
rd_kafka_destroy(stream);
stream = nullptr;
}
}
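// Sketch, not part of the patch: the hand-written wrapper removed above maps directly
// onto cppkafka::Consumer, which builds the topic partition list internally for
// subscribe() and closes/destroys the underlying rd_kafka handle in its destructor:
//
//     {
//         cppkafka::Consumer consumer(config);           // `config` as built above
//         consumer.subscribe({"topic1", "topic2"});      // placeholder topic names
//         // ... poll ...
//         consumer.unsubscribe();
//     }   // destructor closes the consumer; no explicit rd_kafka_destroy() needed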
void registerStorageKafka(StorageFactory & factory)
{
factory.registerStorage("Kafka", [](const StorageFactory::Arguments & args)


@@ -12,15 +12,13 @@
#include <Poco/Semaphore.h>
#include <ext/shared_ptr_helper.h>
#include <cppkafka/cppkafka.h>
#include <mutex>
struct rd_kafka_s;
struct rd_kafka_conf_s;
namespace DB
{
class StorageKafka;
using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
/** Implements a Kafka queue table engine that can be used as a persistent queue / buffer,
* or as a basic building block for creating pipelines with a continuous insertion / ETL.
@@ -55,22 +53,6 @@ public:
void updateDependencies() override;
private:
/// Each engine typically has one consumer (able to process 1..N partitions)
/// It is however possible to create multiple consumers per table, as long
/// as the total number of consumers is <= number of partitions.
struct Consumer
{
Consumer(struct rd_kafka_conf_s * conf);
~Consumer();
void subscribe(const Names & topics);
void unsubscribe();
void close();
struct rd_kafka_s * stream = nullptr;
};
using ConsumerPtr = std::shared_ptr<Consumer>;
// Configuration and state
String table_name;
String database_name;
@@ -102,7 +84,7 @@ private:
BackgroundSchedulePool::TaskHolder task;
std::atomic<bool> stream_cancelled{false};
void consumerConfiguration(struct rd_kafka_conf_s * conf);
cppkafka::Configuration createConsumerConfiguration();
ConsumerPtr claimConsumer();
ConsumerPtr tryClaimConsumer(long wait_ms);
void pushConsumer(ConsumerPtr c);