StorageKafka: extended configuration, parallel consumers, offset tracking

This commit contains a number of fixes and improvements for the Kafka engine.
Most notably, it now supports extended configuration, in the same way as GraphiteMergeTree.
It also allows specifying the number of consumers, to parallelize consumption of
multiple partitions both in materialized views and in SELECT queries.

Offsets are now committed in the readSuffix() method after all rows
have been read successfully. If an exception is thrown during reading, the affected
consumer unsubscribes from all assignments and rejoins the consumer group to rewind
its offsets. This means the consumer won't lose messages in case of write failures.
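
In librdkafka terms, the protocol described above amounts to roughly the following. This is a minimal sketch against librdkafka's C API, not the code from this diff; readOneMessage, finishStream and the topic_list parameter are illustrative stand-ins for the storage's own members:

    #include <librdkafka/rdkafka.h>

    // Poll one message and store its offset locally; nothing reaches the broker yet
    // (the engine disables enable.auto.commit and commits explicitly later).
    static bool readOneMessage(rd_kafka_t * consumer)
    {
        rd_kafka_message_t * msg = rd_kafka_consumer_poll(consumer, 500 /* ms */);
        if (msg == nullptr)
            return false;
        if (msg->err)
        {
            rd_kafka_message_destroy(msg);
            return false;
        }
        // ... hand msg->payload / msg->len to the format reader here ...
        rd_kafka_offset_store(msg->rkt, msg->partition, msg->offset);
        rd_kafka_message_destroy(msg);
        return true;
    }

    // On success (readSuffix), publish the stored offsets asynchronously.
    // On failure, drop the assignment and rejoin so uncommitted offsets are replayed.
    static void finishStream(rd_kafka_t * consumer,
                             rd_kafka_topic_partition_list_t * topic_list, bool ok)
    {
        if (ok)
            rd_kafka_commit(consumer, nullptr, 1 /* async */);
        else
        {
            rd_kafka_unsubscribe(consumer);
            rd_kafka_subscribe(consumer, topic_list);
        }
    }
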
Marek Vavruša 2017-12-15 19:45:11 -06:00 committed by alexey-milovidov
parent 6d4f106f11
commit c6e959d1e9
5 changed files with 346 additions and 159 deletions

View File

@@ -29,7 +29,7 @@ public:
using NestedFieldList = std::vector<NestedField>;
/** schema_dir - base path for schema files
* schema_file - location of the capnproto schema, e.g. "schema.canpn"
* schema_file - location of the capnproto schema, e.g. "schema.capnp"
* root_object - name of the root object, e.g. "Message"
*/
CapnProtoRowInputStream(ReadBuffer & istr_, const Block & header_, const String & schema_dir, const String & schema_file, const String & root_object);

View File

@@ -717,10 +717,10 @@ StoragePtr StorageFactory::get(
* - Schema (optional, if the format supports it)
*/
if (!args_ptr || !(args_ptr->size() == 4 || args_ptr->size() == 5))
if (!args_ptr || args_ptr->size() < 3 || args_ptr->size() > 6)
throw Exception(
"Storage Kafka requires 4 parameters"
" - Kafka broker list, list of topics to consume, consumer group ID, message format",
"Storage Kafka requires 3-6 parameters"
" - Kafka broker list, list of topics to consume, consumer group ID, message format, schema, number of consumers",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = *args_ptr;
@@ -735,12 +735,28 @@ StoragePtr StorageFactory::get(
args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(args[2], local_context);
args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(args[3], local_context);
// Additionally parse schema if supported
// Parse format schema if supported (optional)
String schema;
if (args.size() == 5)
if (args.size() >= 5)
{
args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(args[4], local_context);
schema = static_cast<const ASTLiteral &>(*args[4]).value.safeGet<String>();
auto ast = typeid_cast<ASTLiteral *>(&*args[4]);
if (ast && ast->value.getType() == Field::Types::String)
schema = safeGet<String>(ast->value);
else
throw Exception("Format schema must be a string", ErrorCodes::BAD_ARGUMENTS);
}
// Parse number of consumers (optional)
UInt64 num_consumers = 1;
if (args.size() >= 6)
{
auto ast = typeid_cast<ASTLiteral *>(&*args[5]);
if (ast && ast->value.getType() == Field::Types::UInt64)
num_consumers = safeGet<UInt64>(ast->value);
else
throw Exception("Number of consumers must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
}
// Parse topic list and consumer group
@@ -754,14 +770,14 @@ StoragePtr StorageFactory::get(
if (ast && ast->value.getType() == Field::Types::String)
format = safeGet<String>(ast->value);
else
throw Exception(String("Format must be a string"), ErrorCodes::BAD_ARGUMENTS);
throw Exception("Format must be a string", ErrorCodes::BAD_ARGUMENTS);
return StorageKafka::create(
table_name, database_name, context, columns,
materialized_columns, alias_columns, column_defaults,
brokers, group, topics, format, schema);
brokers, group, topics, format, schema, num_consumers);
#else
throw Exception{"Storage `Kafka` disabled because ClickHouse built without kafka support.", ErrorCodes::SUPPORT_IS_DISABLED};
throw Exception("Storage `Kafka` disabled because ClickHouse built without Kafka support.", ErrorCodes::SUPPORT_IS_DISABLED);
#endif
}
else if (endsWith(name, "MergeTree"))

View File

@@ -3,11 +3,13 @@
#if USE_RDKAFKA
#include <thread>
#include <boost/algorithm/string/replace.hpp>
#include <Common/Exception.h>
#include <Common/setThreadName.h>
#include <DataStreams/FormatFactory.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterInsertQuery.h>
@@ -31,104 +33,70 @@ namespace ErrorCodes
extern const int INCORRECT_DATA;
extern const int UNKNOWN_EXCEPTION;
extern const int CANNOT_READ_FROM_ISTREAM;
extern const int SUPPORT_IS_DISABLED;
extern const int INVALID_CONFIG_PARAMETER;
extern const int LOGICAL_ERROR;
}
/// How long to wait for a single message (applies to each individual message)
static const auto READ_POLL_MS = 1 * 1000;
static const auto CLEANUP_TIMEOUT_MS = 2 * 1000;
using namespace Poco::Util;
/// How many messages to pull out of internal queue at once
static const UInt64 BATCH_SIZE_MAX = 16;
/// How long to wait for a single message (applies to each individual message)
static const auto READ_POLL_MS = 500;
static const auto CLEANUP_TIMEOUT_MS = 3000;
/// Configuration prefix
static const String CONFIG_PREFIX = "kafka";
class ReadBufferFromKafkaConsumer : public ReadBuffer
{
using Messages = std::vector<rd_kafka_message_t *>;
rd_kafka_t * consumer;
Messages messages;
Messages::iterator current;
Messages::iterator end;
rd_kafka_message_t * current;
Poco::Logger * log;
bool eof = false;
bool nextImpl() override
{
if (current == end)
{
// EOF reached in the previous batch, bail
if (eof)
return false;
// Fetch next batch of messages
bool res = fetchMessages();
if (!res)
{
LOG_ERROR(log, "Consumer error: " << rd_kafka_err2str(rd_kafka_last_error()));
return false;
}
// No error, but no messages read
if (current == end)
{
LOG_DEBUG(log, "No messages consumed.");
return false;
}
}
reset();
// Process next buffered message
rd_kafka_message_t * msg = *(current++);
rd_kafka_message_t * msg = rd_kafka_consumer_poll(consumer, READ_POLL_MS);
if (msg == nullptr)
return false;
if (msg->err)
{
if (msg->err != RD_KAFKA_RESP_ERR__PARTITION_EOF)
LOG_ERROR(log, "Consumer error: " << rd_kafka_err2str(msg->err) << " " << rd_kafka_message_errstr(msg));
else
{
// Reach EOF while reading current batch, skip it
eof = true;
if (current != end)
return nextImpl();
LOG_ERROR(log, "Consumer error: " << rd_kafka_err2str(msg->err) << " " << rd_kafka_message_errstr(msg));
rd_kafka_message_destroy(msg);
return false;
}
return false;
// Reached EOF on this partition, skip it and poll the next message
LOG_TRACE(log, "EOF reached for partition " << msg->partition << " offset " << msg->offset);
rd_kafka_message_destroy(msg);
return nextImpl();
}
// Consume the message and store the topic/partition offset locally
// The offsets will be committed in the readSuffix() method after the block is completed
// If an exception is thrown before that happens, the client will rejoin without committing offsets
BufferBase::set(reinterpret_cast<char *>(msg->payload), msg->len, 0);
auto err = rd_kafka_offset_store(msg->rkt, msg->partition, msg->offset);
if (err)
LOG_ERROR(log, "Failed to store offsets: " << rd_kafka_err2str(err));
current = msg;
return true;
}
void reset()
{
for (auto it = messages.begin(); it < end; ++it)
rd_kafka_message_destroy(*it);
current = end = messages.begin();
}
bool fetchMessages()
{
rd_kafka_queue_t* queue = rd_kafka_queue_get_consumer(consumer);
if (queue == nullptr)
return false;
reset();
auto result = rd_kafka_consume_batch_queue(queue, READ_POLL_MS, messages.data(), messages.size());
if (result < 0)
return false;
current = messages.begin();
end = current + result;
return true;
if (current != nullptr)
{
rd_kafka_message_destroy(current);
current = nullptr;
}
}
public:
ReadBufferFromKafkaConsumer(rd_kafka_t * consumer_, size_t batch_size, Poco::Logger * log_)
: ReadBuffer(nullptr, 0), consumer(consumer_), messages(batch_size), current(messages.begin()), end(messages.begin()), log(log_) {}
ReadBufferFromKafkaConsumer(rd_kafka_t * consumer_, Poco::Logger * log_)
: ReadBuffer(nullptr, 0), consumer(consumer_), current(nullptr), log(log_) {}
~ReadBufferFromKafkaConsumer() { reset(); }
};
@@ -137,22 +105,34 @@ class KafkaBlockInputStream : public IProfilingBlockInputStream
{
public:
KafkaBlockInputStream(StorageKafka & storage_, const Context & context_, const String & schema, size_t max_block_size)
: storage(storage_)
KafkaBlockInputStream(StorageKafka & storage_, StorageKafka::ConsumerPtr consumer_, const Context & context_, const String & schema, size_t max_block_size)
: storage(storage_), consumer(consumer_)
{
// Always skip unknown fields regardless of the context (JSON or TSKV)
Context context = context_;
context.setSetting("input_format_skip_unknown_fields", UInt64(1));
if (schema.size() > 0)
context.setSetting("format_schema", schema);
// Create a formatted reader on Kafka messages
LOG_TRACE(storage.log, "Creating formatted reader");
read_buf = std::make_unique<ReadBufferFromKafkaConsumer>(storage.consumer, max_block_size, storage.log);
read_buf = std::make_unique<ReadBufferFromKafkaConsumer>(consumer->stream, storage.log);
reader = FormatFactory().getInput(storage.format_name, *read_buf, storage.getSampleBlock(), context, max_block_size);
}
~KafkaBlockInputStream() override
{
// An exception was thrown during the stream, or it did not finish successfully
// The read offsets weren't committed, so the consumer must rejoin the group from the original starting point
if (!finalized)
{
LOG_TRACE(storage.log, "KafkaBlockInputStream did not finish successfully, unsubscribing from assignments and rejoining");
consumer->unsubscribe();
consumer->subscribe(storage.topics);
}
// Return consumer for another reader
storage.pushConsumer(consumer);
}
String getName() const override
@@ -177,21 +157,49 @@ public:
void readPrefixImpl() override
{
// Start reading data
finalized = false;
reader->readPrefix();
}
void readSuffixImpl() override
{
reader->readSuffix();
// Store offsets read in this stream asynchronously
auto err = rd_kafka_commit(consumer->stream, NULL, 1 /* async */);
if (err)
throw Exception("Failed to commit offsets: " + String(rd_kafka_err2str(err)), ErrorCodes::UNKNOWN_EXCEPTION);
// Mark as successfully finished
finalized = true;
}
private:
StorageKafka & storage;
StorageKafka::ConsumerPtr consumer;
Block sample_block;
std::unique_ptr<ReadBufferFromKafkaConsumer> read_buf;
BlockInputStreamPtr reader;
bool finalized = false;
};
static void loadFromConfig(struct rd_kafka_conf_s * conf, const AbstractConfiguration & config, const std::string & path)
{
AbstractConfiguration::Keys keys;
std::vector<char> errstr(512);
config.keys(path, keys);
for (const auto & key : keys)
{
const String key_path = path + "." + key;
const String key_name = boost::replace_all_copy(key, "_", ".");
if (rd_kafka_conf_set(conf, key_name.c_str(), config.getString(key_path).c_str(), errstr.data(), errstr.size()) != RD_KAFKA_CONF_OK)
throw Exception("Invalid Kafka setting " + key_path + " in config: " + String(errstr.data()), ErrorCodes::INVALID_CONFIG_PARAMETER);
}
}
StorageKafka::StorageKafka(
const std::string & table_name_,
const std::string & database_name_,
@@ -201,33 +209,13 @@ StorageKafka::StorageKafka(
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
const String & brokers_, const String & group_, const Names & topics_,
const String & format_name_, const String & schema_name_)
const String & format_name_, const String & schema_name_, size_t num_consumers_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
table_name(table_name_), database_name(database_name_), context(context_),
columns(columns_), topics(topics_), format_name(format_name_), schema_name(schema_name_),
conf(rd_kafka_conf_new()), log(&Logger::get("StorageKafka (" + table_name_ + ")"))
columns(columns_), topics(topics_), brokers(brokers_), group(group_), format_name(format_name_), schema_name(schema_name_),
num_consumers(num_consumers_), log(&Logger::get("StorageKafka (" + table_name_ + ")")),
semaphore(0, num_consumers_), mutex(), consumers(), event_update()
{
std::vector<char> errstr(512);
LOG_TRACE(log, "Setting brokers: " << brokers_);
if (rd_kafka_conf_set(conf, "metadata.broker.list", brokers_.c_str(), errstr.data(), errstr.size()) != RD_KAFKA_CONF_OK)
throw Exception(String(errstr.data()), ErrorCodes::INCORRECT_DATA);
LOG_TRACE(log, "Setting Group ID: " << group_ << " Client ID: clickhouse");
if (rd_kafka_conf_set(conf, "group.id", group_.c_str(), errstr.data(), errstr.size()) != RD_KAFKA_CONF_OK)
throw Exception(String(errstr.data()), ErrorCodes::INCORRECT_DATA);
if (rd_kafka_conf_set(conf, "client.id", VERSION_FULL, errstr.data(), errstr.size()) != RD_KAFKA_CONF_OK)
throw Exception(String(errstr.data()), ErrorCodes::INCORRECT_DATA);
// Don't store offsets of messages before they're processed
rd_kafka_conf_set(conf, "enable.auto.offset.store", "false", nullptr, 0);
// Try to fetch preferred number of bytes before timeout
const Settings & settings = context.getSettingsRef();
auto min_bytes = settings.preferred_block_size_bytes.toString();
rd_kafka_conf_set(conf, "fetch.min.bytes", min_bytes.c_str(), nullptr, 0);
}
@@ -242,47 +230,52 @@ BlockInputStreams StorageKafka::read(
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
if (!conf)
if (num_consumers == 0)
return BlockInputStreams();
BlockInputStreams streams;
streams.reserve(num_streams);
streams.reserve(std::min(num_streams, num_consumers));
// Note: The block size is set to 1, otherwise it'd have to be able to return excess buffered messages
for (size_t i = 0; i < num_streams; ++i)
streams.push_back(std::make_shared<KafkaBlockInputStream>(*this, context, schema_name, 1));
// Claim as many consumers as requested, but don't block
for (size_t i = 0; i < streams.capacity(); ++i)
{
auto consumer = claimConsumer(0);
if (consumer == nullptr)
break;
LOG_DEBUG(log, "Starting reading " << num_streams << " streams, " << max_block_size << " block size");
streams.push_back(std::make_shared<KafkaBlockInputStream>(*this, consumer, context, schema_name, 1));
}
LOG_DEBUG(log, "Starting reading " << streams.size() << " streams, " << max_block_size << " block size");
return streams;
}
void StorageKafka::startup()
{
std::vector<char> errstr(512);
// Create a consumer from saved configuration
consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr.data(), errstr.size());
if (consumer == nullptr)
throw Exception("Failed to create consumer handle: " + String(errstr.data()), ErrorCodes::UNKNOWN_EXCEPTION);
rd_kafka_poll_set_consumer(consumer);
// Create a list of partitions
auto * topicList = rd_kafka_topic_partition_list_new(topics.size());
for (const auto & t : topics)
for (size_t i = 0; i < num_consumers; ++i)
{
LOG_TRACE(log, "Subscribing to topic: " + t);
rd_kafka_topic_partition_list_add(topicList, t.c_str(), RD_KAFKA_PARTITION_UA);
// Building the configuration may throw; in that case the consumer configuration must be destroyed
auto consumer_conf = rd_kafka_conf_new();
try
{
consumerConfiguration(consumer_conf);
}
catch (...)
{
rd_kafka_conf_destroy(consumer_conf);
throw;
}
// Create a consumer and subscribe to topics
// Note: consumer takes ownership of the configuration
auto consumer = std::make_shared<StorageKafka::Consumer>(consumer_conf);
consumer->subscribe(topics);
// Make consumer available
pushConsumer(consumer);
}
// Subscribe to requested topics
auto err = rd_kafka_subscribe(consumer, topicList);
if (err)
throw Exception("Failed to subscribe: " + String(rd_kafka_err2str(err)), ErrorCodes::UNKNOWN_EXCEPTION);
rd_kafka_topic_partition_list_destroy(topicList);
// Start the reader thread
stream_thread = std::thread(&StorageKafka::streamThread, this);
}
@@ -290,19 +283,19 @@ void StorageKafka::startup()
void StorageKafka::shutdown()
{
is_cancelled = true;
cancel_event.set();
// Interrupt streaming thread
stream_cancelled = true;
event_update.set();
// Unsubscribe from assignments
LOG_TRACE(log, "Unsubscribing from assignments");
rd_kafka_unsubscribe(consumer);
auto err = rd_kafka_consumer_close(consumer);
if (err)
for (size_t i = 0; i < num_consumers; ++i)
{
LOG_ERROR(log, "Failed to close: " + String(rd_kafka_err2str(err)));
auto consumer = claimConsumer(-1);
consumer->unsubscribe();
}
LOG_TRACE(log, "Destroying consumer");
rd_kafka_destroy(consumer);
// Wait for stream thread to finish
if (stream_thread.joinable())
stream_thread.join();
@@ -312,21 +305,85 @@ void StorageKafka::shutdown()
void StorageKafka::updateDependencies()
{
cancel_event.set();
event_update.set();
}
void StorageKafka::consumerConfiguration(struct rd_kafka_conf_s * conf)
{
std::vector<char> errstr(512);
LOG_TRACE(log, "Setting brokers: " << brokers);
if (rd_kafka_conf_set(conf, "metadata.broker.list", brokers.c_str(), errstr.data(), errstr.size()) != RD_KAFKA_CONF_OK)
throw Exception(String(errstr.data()), ErrorCodes::INCORRECT_DATA);
LOG_TRACE(log, "Setting Group ID: " << group << " Client ID: clickhouse");
if (rd_kafka_conf_set(conf, "group.id", group.c_str(), errstr.data(), errstr.size()) != RD_KAFKA_CONF_OK)
throw Exception(String(errstr.data()), ErrorCodes::INCORRECT_DATA);
if (rd_kafka_conf_set(conf, "client.id", VERSION_FULL, errstr.data(), errstr.size()) != RD_KAFKA_CONF_OK)
throw Exception(String(errstr.data()), ErrorCodes::INCORRECT_DATA);
// We commit offsets manually after a stream has finished successfully
rd_kafka_conf_set(conf, "enable.auto.commit", "false", nullptr, 0);
// Update consumer configuration from the ClickHouse config
const auto & config = context.getConfigRef();
if (config.has(CONFIG_PREFIX))
loadFromConfig(conf, config, CONFIG_PREFIX);
// Update consumer topic-specific configuration
for (const auto & topic : topics)
{
const auto topic_config_key = CONFIG_PREFIX + "_" + topic;
if (config.has(topic_config_key))
loadFromConfig(conf, config, topic_config_key);
}
}
StorageKafka::ConsumerPtr StorageKafka::claimConsumer(long wait_ms)
{
// Wait for the first free consumer
if (wait_ms >= 0)
{
if (!semaphore.tryWait(wait_ms))
return nullptr;
}
else
semaphore.wait();
// Take the first available consumer from the list
std::lock_guard<std::mutex> lock(mutex);
auto consumer = consumers.back();
consumers.pop_back();
return consumer;
}
void StorageKafka::pushConsumer(StorageKafka::ConsumerPtr c)
{
std::lock_guard<std::mutex> lock(mutex);
consumers.push_back(c);
semaphore.set();
}
void StorageKafka::streamThread()
{
setThreadName("KafkaStreamThread");
while (!is_cancelled)
while (!stream_cancelled)
{
try
{
auto dependencies = context.getDependencies(database_name, table_name);
if (dependencies.size() > 0)
// Keep streaming as long as there are attached views and streaming is not cancelled
while (!stream_cancelled)
{
// Check if all dependencies are attached
auto dependencies = context.getDependencies(database_name, table_name);
if (dependencies.size() == 0)
break;
LOG_DEBUG(log, "Started streaming to " << dependencies.size() << " attached views");
streamToViews();
LOG_DEBUG(log, "Stopped streaming to views");
@@ -337,7 +394,8 @@ void StorageKafka::streamThread()
tryLogCurrentException(__PRETTY_FUNCTION__);
}
cancel_event.tryWait(READ_POLL_MS);
// Wait for attached views
event_update.tryWait(READ_POLL_MS);
}
LOG_DEBUG(log, "Stream thread finished");
@@ -348,10 +406,7 @@ void StorageKafka::streamToViews()
{
auto table = context.getTable(database_name, table_name);
if (!table)
{
LOG_WARNING(log, "Destination table " << database_name << "." << table_name << " doesn't exist.");
return;
}
throw Exception("Engine table " + database_name + "." + table_name + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);
// Create an INSERT query for streaming data
auto insert = std::make_shared<ASTInsertQuery>();
@@ -361,8 +416,18 @@ void StorageKafka::streamToViews()
// Limit the number of batched messages to allow early cancellations
const Settings & settings = context.getSettingsRef();
const size_t block_size = std::min(settings.max_block_size.value, BATCH_SIZE_MAX);
BlockInputStreamPtr in = std::make_shared<KafkaBlockInputStream>(*this, context, schema_name, block_size);
const size_t block_size = settings.max_block_size.value;
// Create a stream for each consumer and join them in a union stream
BlockInputStreams streams;
streams.reserve(num_consumers);
for (size_t i = 0; i < num_consumers; ++i)
{
auto consumer = claimConsumer(-1);
streams.push_back(std::make_shared<KafkaBlockInputStream>(*this, consumer, context, schema_name, block_size));
}
auto in = std::make_shared<UnionBlockInputStream<>>(streams, nullptr, num_consumers);
// Limit read batch to maximum block size to allow DDL
IProfilingBlockInputStream::LocalLimits limits;
@@ -374,7 +439,63 @@ void StorageKafka::streamToViews()
// Execute the query
InterpreterInsertQuery interpreter{insert, context};
auto block_io = interpreter.execute();
copyData(*in, *block_io.out, &is_cancelled);
copyData(*in, *block_io.out, &stream_cancelled);
}
StorageKafka::Consumer::Consumer(struct rd_kafka_conf_s * conf)
{
std::vector<char> errstr(512);
stream = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr.data(), errstr.size());
if (stream == nullptr)
{
rd_kafka_conf_destroy(conf);
throw Exception("Failed to create consumer handle: " + String(errstr.data()), ErrorCodes::UNKNOWN_EXCEPTION);
}
rd_kafka_poll_set_consumer(stream);
}
StorageKafka::Consumer::~Consumer()
{
if (stream != nullptr)
{
rd_kafka_consumer_close(stream);
rd_kafka_destroy(stream);
stream = nullptr;
}
}
void StorageKafka::Consumer::subscribe(const Names & topics)
{
if (stream == nullptr)
throw Exception("Cannot subscribe to topics when consumer is closed", ErrorCodes::UNKNOWN_EXCEPTION);
// Create a list of partitions
auto * topicList = rd_kafka_topic_partition_list_new(topics.size());
for (const auto & t : topics)
{
rd_kafka_topic_partition_list_add(topicList, t.c_str(), RD_KAFKA_PARTITION_UA);
}
// Subscribe to requested topics
auto err = rd_kafka_subscribe(stream, topicList);
if (err)
{
rd_kafka_topic_partition_list_destroy(topicList);
throw Exception("Failed to subscribe: " + String(rd_kafka_err2str(err)), ErrorCodes::UNKNOWN_EXCEPTION);
}
rd_kafka_topic_partition_list_destroy(topicList);
}
void StorageKafka::Consumer::unsubscribe()
{
if (stream != nullptr)
rd_kafka_unsubscribe(stream);
}

View File

@@ -9,6 +9,7 @@
#include <Storages/IStorage.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Poco/Event.h>
#include <Poco/Semaphore.h>
struct rd_kafka_s;
struct rd_kafka_conf_s;
@@ -53,21 +54,46 @@ public:
void updateDependencies() override;
private:
/// Each engine typically has one consumer (able to process 1..N partitions)
/// In the future the table engine could have multiple consumers for better throughput
struct Consumer
{
Consumer(struct rd_kafka_conf_s * conf);
~Consumer();
void subscribe(const Names & topics);
void unsubscribe();
struct rd_kafka_s * stream = nullptr;
};
using ConsumerPtr = std::shared_ptr<Consumer>;
// Configuration and state
String table_name;
String database_name;
Context & context;
NamesAndTypesListPtr columns;
Names topics;
const String brokers;
const String group;
const String format_name;
const String schema_name;
struct rd_kafka_conf_s * conf;
struct rd_kafka_s * consumer;
std::mutex mutex;
size_t num_consumers; /// Total number of created consumers
Poco::Logger * log;
Poco::Event cancel_event;
std::atomic<bool> is_cancelled{false};
// Consumer list
Poco::Semaphore semaphore;
std::mutex mutex;
std::vector<ConsumerPtr> consumers; /// Available consumers
// Stream thread
Poco::Event event_update;
std::thread stream_thread;
std::atomic<bool> stream_cancelled{false};
void consumerConfiguration(struct rd_kafka_conf_s * conf);
ConsumerPtr claimConsumer(long wait_ms);
void pushConsumer(ConsumerPtr c);
void streamThread();
void streamToViews();
@@ -82,7 +108,7 @@ protected:
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
const String & brokers_, const String & group_, const Names & topics_,
const String & format_name_, const String & schema_name_);
const String & format_name_, const String & schema_name_, size_t num_consumers_);
};
}

View File

@@ -9,7 +9,7 @@ A table engine backed by Apache Kafka, a streaming platform having three key capabilities:
.. code-block:: text
Kafka(broker_list, topic_list, group_name, format[, schema])
Kafka(broker_list, topic_list, group_name, format[, schema, num_consumers])
Engine parameters:
@@ -29,6 +29,9 @@ schema
Optional schema value for formats that require a schema to interpret consumed messages. For example, the Cap'n Proto format requires
the path to a schema file and the root object - ``schema:Message``. Self-describing formats such as JSON don't require any schema.
num_consumers
Number of consumers created per engine instance. ``1`` by default. Create more consumers if the throughput of a single consumer is insufficient. The total number of consumers shouldn't exceed the number of partitions in the topic, as at most one consumer can be assigned to any single partition.
Example:
.. code-block:: sql
@@ -85,3 +88,24 @@ In order to stop topic consumption, or alter the transformation logic, you simply
ATTACH MATERIALIZED VIEW consumer;
Note: When performing ALTERs on the target table, it's recommended to detach the materialized views to prevent a mismatch between the target table's schema and the data produced by the materialized views.
Configuration
~~~~~~~~~~~~~
Similarly to GraphiteMergeTree, the Kafka engine supports extended configuration through the ClickHouse config file. There are two configuration keys you can use: global and per-topic. The global configuration is applied first, then the per-topic configuration (if it exists).
.. code-block:: xml
<!-- Global configuration options for all tables of Kafka engine type -->
<kafka>
<debug>cgrp</debug>
<auto_offset_reset>smallest</auto_offset_reset>
</kafka>
<!-- Configuration specific for topic "logs" -->
<kafka_topic_logs>
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_topic_logs>
See `librdkafka configuration reference <https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md>`_ for the list of possible configuration options. Use underscores instead of dots in the ClickHouse configuration; for example, ``check.crcs=true`` corresponds to ``<check_crcs>true</check_crcs>``.