Merge pull request #4025 from abyss7/fix-kafka

Update Kafka libraries
alexey-milovidov committed via GitHub on 2019-01-15 00:23:20 +03:00 (commit bcc54d1606)
13 changed files with 176 additions and 233 deletions
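In short, the change drops direct use of the librdkafka C API in StorageKafka in favor of the cppkafka C++ wrapper, added below as a new submodule. A minimal sketch of the consumer pattern the storage now follows; the broker, group, and topic names are placeholders, not values from this PR:

#include <cppkafka/cppkafka.h>
#include <chrono>
#include <iostream>

int main()
{
    // Roughly the settings StorageKafka::createConsumerConfiguration() applies.
    cppkafka::Configuration config = {
        {"metadata.broker.list", "localhost:9092"},   // placeholder broker
        {"group.id", "clickhouse"},                   // placeholder group id
        {"enable.auto.commit", false}                 // offsets are committed manually
    };

    cppkafka::Consumer consumer(config);
    consumer.subscribe({"some_topic"});               // placeholder topic

    // One poll/commit cycle, mirroring ReadBufferFromKafkaConsumer.
    cppkafka::Message message = consumer.poll(std::chrono::milliseconds(500));
    if (message && !message.get_error())
    {
        std::cout << message.get_payload() << std::endl;
        consumer.async_commit();                      // commit after successful processing
    }
    return 0;
}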

.gitignore vendored (3 changes)

@@ -251,3 +251,6 @@ website/package-lock.json
# cquery cache
/.cquery-cache
# ccls cache
/.ccls-cache

.gitmodules vendored (3 changes)

@@ -61,3 +61,6 @@
[submodule "contrib/libgsasl"]
path = contrib/libgsasl
url = https://github.com/ClickHouse-Extras/libgsasl.git
[submodule "contrib/cppkafka"]
path = contrib/cppkafka
url = https://github.com/mfontanini/cppkafka.git


@@ -20,11 +20,13 @@ if (NOT USE_INTERNAL_RDKAFKA_LIBRARY)
if (USE_STATIC_LIBRARIES AND NOT OS_FREEBSD)
find_library (SASL2_LIBRARY sasl2)
endif ()
set (CPPKAFKA_LIBRARY cppkafka) # TODO: try to use unbundled version.
endif ()
if (RDKAFKA_LIB AND RDKAFKA_INCLUDE_DIR)
set (USE_RDKAFKA 1)
set (RDKAFKA_LIBRARY ${RDKAFKA_LIB} ${OPENSSL_LIBRARIES})
set (CPPKAFKA_LIBRARY cppkafka)
if (SASL2_LIBRARY)
list (APPEND RDKAFKA_LIBRARY ${SASL2_LIBRARY})
endif ()
@@ -35,9 +37,10 @@ elseif (NOT MISSING_INTERNAL_RDKAFKA_LIBRARY AND NOT ARCH_ARM)
set (USE_INTERNAL_RDKAFKA_LIBRARY 1)
set (RDKAFKA_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/librdkafka/src")
set (RDKAFKA_LIBRARY rdkafka)
set (CPPKAFKA_LIBRARY cppkafka)
set (USE_RDKAFKA 1)
endif ()
endif ()
message (STATUS "Using librdkafka=${USE_RDKAFKA}: ${RDKAFKA_INCLUDE_DIR} : ${RDKAFKA_LIBRARY}")
message (STATUS "Using librdkafka=${USE_RDKAFKA}: ${RDKAFKA_INCLUDE_DIR} : ${RDKAFKA_LIBRARY} ${CPPKAFKA_LIBRARY}")


@@ -125,6 +125,10 @@ if (USE_INTERNAL_RDKAFKA_LIBRARY)
target_include_directories(rdkafka BEFORE PRIVATE ${OPENSSL_INCLUDE_DIR})
endif ()
if (USE_RDKAFKA)
add_subdirectory (cppkafka-cmake)
endif()
if (ENABLE_ODBC AND USE_INTERNAL_ODBC_LIBRARY)
add_subdirectory (unixodbc-cmake)
endif ()

contrib/cppkafka vendored Submodule (1 change)

@@ -0,0 +1 @@
Subproject commit 520465510efef7704346cf8d140967c4abb057c1


@@ -0,0 +1,31 @@
set(CPPKAFKA_DIR ${CMAKE_SOURCE_DIR}/contrib/cppkafka)
set(SRCS
${CPPKAFKA_DIR}/src/configuration.cpp
${CPPKAFKA_DIR}/src/topic_configuration.cpp
${CPPKAFKA_DIR}/src/configuration_option.cpp
${CPPKAFKA_DIR}/src/exceptions.cpp
${CPPKAFKA_DIR}/src/topic.cpp
${CPPKAFKA_DIR}/src/buffer.cpp
${CPPKAFKA_DIR}/src/queue.cpp
${CPPKAFKA_DIR}/src/message.cpp
${CPPKAFKA_DIR}/src/message_timestamp.cpp
${CPPKAFKA_DIR}/src/message_internal.cpp
${CPPKAFKA_DIR}/src/topic_partition.cpp
${CPPKAFKA_DIR}/src/topic_partition_list.cpp
${CPPKAFKA_DIR}/src/metadata.cpp
${CPPKAFKA_DIR}/src/group_information.cpp
${CPPKAFKA_DIR}/src/error.cpp
${CPPKAFKA_DIR}/src/event.cpp
${CPPKAFKA_DIR}/src/kafka_handle_base.cpp
${CPPKAFKA_DIR}/src/producer.cpp
${CPPKAFKA_DIR}/src/consumer.cpp
)
add_library(cppkafka ${LINK_MODE} ${SRCS})
target_link_libraries(cppkafka PRIVATE ${RDKAFKA_LIBRARY})
target_include_directories(cppkafka PRIVATE ${CPPKAFKA_DIR}/include/cppkafka)
target_include_directories(cppkafka PRIVATE ${Boost_INCLUDE_DIRS})
target_include_directories(cppkafka SYSTEM PUBLIC ${CPPKAFKA_DIR}/include)

contrib/librdkafka vendored (2 changes)

@@ -1 +1 @@
Subproject commit 7478b5ef16aadd6543fe38bc6a2deb895c70da98
Subproject commit 363dcad5a23dc29381cc626620e68ae418b3af19


@@ -1,60 +1,63 @@
set(RDKAFKA_SOURCE_DIR ${CMAKE_SOURCE_DIR}/contrib/librdkafka/src)
set(SRCS
${RDKAFKA_SOURCE_DIR}/crc32c.c
${RDKAFKA_SOURCE_DIR}/rdaddr.c
${RDKAFKA_SOURCE_DIR}/rdavl.c
${RDKAFKA_SOURCE_DIR}/rdbuf.c
${RDKAFKA_SOURCE_DIR}/rdcrc32.c
${RDKAFKA_SOURCE_DIR}/rdkafka.c
${RDKAFKA_SOURCE_DIR}/rdkafka_assignor.c
${RDKAFKA_SOURCE_DIR}/rdkafka_broker.c
${RDKAFKA_SOURCE_DIR}/rdkafka_buf.c
${RDKAFKA_SOURCE_DIR}/rdkafka_cgrp.c
${RDKAFKA_SOURCE_DIR}/rdkafka_conf.c
${RDKAFKA_SOURCE_DIR}/rdkafka_event.c
${RDKAFKA_SOURCE_DIR}/rdkafka_feature.c
${RDKAFKA_SOURCE_DIR}/rdkafka_lz4.c
${RDKAFKA_SOURCE_DIR}/rdkafka_metadata.c
${RDKAFKA_SOURCE_DIR}/rdkafka_metadata_cache.c
${RDKAFKA_SOURCE_DIR}/rdkafka_msg.c
${RDKAFKA_SOURCE_DIR}/rdkafka_msgset_reader.c
${RDKAFKA_SOURCE_DIR}/rdkafka_msgset_writer.c
${RDKAFKA_SOURCE_DIR}/rdkafka_offset.c
${RDKAFKA_SOURCE_DIR}/rdkafka_op.c
${RDKAFKA_SOURCE_DIR}/rdkafka_partition.c
${RDKAFKA_SOURCE_DIR}/rdkafka_pattern.c
${RDKAFKA_SOURCE_DIR}/rdkafka_queue.c
${RDKAFKA_SOURCE_DIR}/rdkafka_range_assignor.c
${RDKAFKA_SOURCE_DIR}/rdkafka_request.c
${RDKAFKA_SOURCE_DIR}/rdkafka_roundrobin_assignor.c
${RDKAFKA_SOURCE_DIR}/rdkafka_sasl.c
${RDKAFKA_SOURCE_DIR}/rdkafka_sasl_plain.c
${RDKAFKA_SOURCE_DIR}/rdkafka_subscription.c
${RDKAFKA_SOURCE_DIR}/rdkafka_timer.c
${RDKAFKA_SOURCE_DIR}/rdkafka_topic.c
${RDKAFKA_SOURCE_DIR}/rdkafka_transport.c
${RDKAFKA_SOURCE_DIR}/rdkafka_interceptor.c
${RDKAFKA_SOURCE_DIR}/rdkafka_header.c
${RDKAFKA_SOURCE_DIR}/rdlist.c
${RDKAFKA_SOURCE_DIR}/rdlog.c
${RDKAFKA_SOURCE_DIR}/rdmurmur2.c
${RDKAFKA_SOURCE_DIR}/rdports.c
${RDKAFKA_SOURCE_DIR}/rdrand.c
${RDKAFKA_SOURCE_DIR}/rdregex.c
${RDKAFKA_SOURCE_DIR}/rdstring.c
${RDKAFKA_SOURCE_DIR}/rdunittest.c
${RDKAFKA_SOURCE_DIR}/rdvarint.c
${RDKAFKA_SOURCE_DIR}/snappy.c
${RDKAFKA_SOURCE_DIR}/tinycthread.c
${RDKAFKA_SOURCE_DIR}/xxhash.c
${RDKAFKA_SOURCE_DIR}/lz4.c
${RDKAFKA_SOURCE_DIR}/lz4frame.c
${RDKAFKA_SOURCE_DIR}/lz4hc.c
${RDKAFKA_SOURCE_DIR}/rdgz.c
${RDKAFKA_SOURCE_DIR}/crc32c.c
${RDKAFKA_SOURCE_DIR}/rdaddr.c
${RDKAFKA_SOURCE_DIR}/rdavl.c
${RDKAFKA_SOURCE_DIR}/rdbuf.c
${RDKAFKA_SOURCE_DIR}/rdcrc32.c
${RDKAFKA_SOURCE_DIR}/rdkafka.c
${RDKAFKA_SOURCE_DIR}/rdkafka_assignor.c
${RDKAFKA_SOURCE_DIR}/rdkafka_background.c
${RDKAFKA_SOURCE_DIR}/rdkafka_broker.c
${RDKAFKA_SOURCE_DIR}/rdkafka_buf.c
${RDKAFKA_SOURCE_DIR}/rdkafka_cgrp.c
${RDKAFKA_SOURCE_DIR}/rdkafka_conf.c
${RDKAFKA_SOURCE_DIR}/rdkafka_event.c
${RDKAFKA_SOURCE_DIR}/rdkafka_feature.c
${RDKAFKA_SOURCE_DIR}/rdkafka_idempotence.c
${RDKAFKA_SOURCE_DIR}/rdkafka_lz4.c
${RDKAFKA_SOURCE_DIR}/rdkafka_metadata.c
${RDKAFKA_SOURCE_DIR}/rdkafka_metadata_cache.c
${RDKAFKA_SOURCE_DIR}/rdkafka_msg.c
${RDKAFKA_SOURCE_DIR}/rdkafka_msgset_reader.c
${RDKAFKA_SOURCE_DIR}/rdkafka_msgset_writer.c
${RDKAFKA_SOURCE_DIR}/rdkafka_offset.c
${RDKAFKA_SOURCE_DIR}/rdkafka_op.c
${RDKAFKA_SOURCE_DIR}/rdkafka_partition.c
${RDKAFKA_SOURCE_DIR}/rdkafka_pattern.c
${RDKAFKA_SOURCE_DIR}/rdkafka_queue.c
${RDKAFKA_SOURCE_DIR}/rdkafka_range_assignor.c
${RDKAFKA_SOURCE_DIR}/rdkafka_request.c
${RDKAFKA_SOURCE_DIR}/rdkafka_roundrobin_assignor.c
${RDKAFKA_SOURCE_DIR}/rdkafka_sasl.c
${RDKAFKA_SOURCE_DIR}/rdkafka_sasl_plain.c
${RDKAFKA_SOURCE_DIR}/rdkafka_subscription.c
${RDKAFKA_SOURCE_DIR}/rdkafka_timer.c
${RDKAFKA_SOURCE_DIR}/rdkafka_topic.c
${RDKAFKA_SOURCE_DIR}/rdkafka_transport.c
${RDKAFKA_SOURCE_DIR}/rdkafka_interceptor.c
${RDKAFKA_SOURCE_DIR}/rdkafka_header.c
${RDKAFKA_SOURCE_DIR}/rdlist.c
${RDKAFKA_SOURCE_DIR}/rdlog.c
${RDKAFKA_SOURCE_DIR}/rdmurmur2.c
${RDKAFKA_SOURCE_DIR}/rdports.c
${RDKAFKA_SOURCE_DIR}/rdrand.c
${RDKAFKA_SOURCE_DIR}/rdregex.c
${RDKAFKA_SOURCE_DIR}/rdstring.c
${RDKAFKA_SOURCE_DIR}/rdunittest.c
${RDKAFKA_SOURCE_DIR}/rdvarint.c
${RDKAFKA_SOURCE_DIR}/snappy.c
${RDKAFKA_SOURCE_DIR}/tinycthread.c
${RDKAFKA_SOURCE_DIR}/tinycthread_extra.c
${RDKAFKA_SOURCE_DIR}/xxhash.c
${RDKAFKA_SOURCE_DIR}/lz4.c
${RDKAFKA_SOURCE_DIR}/lz4frame.c
${RDKAFKA_SOURCE_DIR}/lz4hc.c
${RDKAFKA_SOURCE_DIR}/rdgz.c
)
add_library(rdkafka ${LINK_MODE} ${SRCS})
target_include_directories(rdkafka PRIVATE include)
target_include_directories(rdkafka SYSTEM PUBLIC include)
target_include_directories(rdkafka SYSTEM PUBLIC ${RDKAFKA_SOURCE_DIR})
target_link_libraries(rdkafka PUBLIC ${ZLIB_LIBRARIES} ${OPENSSL_SSL_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY})


@@ -1,4 +1,4 @@
// Automatically generated by ./configure
// Automatically generated by ./configure
#ifndef _CONFIG_H_
#define _CONFIG_H_
#define ARCH "x86_64"
@@ -71,4 +71,8 @@
#define HAVE_PTHREAD_SETNAME_GNU 1
// python
//#define HAVE_PYTHON 1
// C11 threads
#if (__STDC_VERSION__ >= 201112L) && !defined(__STDC_NO_THREADS__)
# define WITH_C11THREADS 1
#endif
#endif /* _CONFIG_H_ */


@@ -0,0 +1,5 @@
#if __has_include(<rdkafka.h>) // maybe bundled
# include_next <rdkafka.h> // Y_IGNORE
#else // system
# include_next <librdkafka/rdkafka.h>
#endif
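The five-line header above is a shim: when the bundled librdkafka headers are on the include path, __has_include(<rdkafka.h>) succeeds and include_next forwards to them; otherwise the system <librdkafka/rdkafka.h> is used. StorageKafka.cpp below applies the same test inline. A standalone sketch of the same selection, assuming only that one of the two headers is installed:

#if __has_include(<rdkafka.h>) // bundled headers are on the include path
#    include <rdkafka.h>
#else // fall back to the system-wide librdkafka package
#    include <librdkafka/rdkafka.h>
#endif

#include <cstdio>

int main()
{
    // rd_kafka_version_str() is part of the public librdkafka C API.
    std::printf("librdkafka %s\n", rd_kafka_version_str());
    return 0;
}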


@@ -287,6 +287,7 @@ endif ()
if (USE_RDKAFKA)
target_link_libraries (dbms PRIVATE ${RDKAFKA_LIBRARY})
target_link_libraries (dbms PRIVATE ${CPPKAFKA_LIBRARY})
if (NOT USE_INTERNAL_RDKAFKA_LIBRARY)
target_include_directories (dbms SYSTEM BEFORE PRIVATE ${RDKAFKA_INCLUDE_DIR})
endif ()


@@ -1,40 +1,34 @@
#include <Common/config.h>
#include <Common/config_version.h>
#include <Storages/Kafka/StorageKafka.h>
#if USE_RDKAFKA
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Macros.h>
#include <Common/Exception.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <Formats/FormatFactory.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <Formats/FormatFactory.h>
#include <IO/ReadBuffer.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTCreateQuery.h>
#include <Storages/Kafka/KafkaSettings.h>
#include <Storages/Kafka/StorageKafka.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/StorageFactory.h>
#include <IO/ReadBuffer.h>
#include <Storages/StorageMaterializedView.h>
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/config_version.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <common/logger_useful.h>
#if __has_include(<rdkafka.h>) // maybe bundled
#include <rdkafka.h> // Y_IGNORE
#else // system
#include <librdkafka/rdkafka.h>
#endif
namespace DB
{
@@ -62,8 +56,8 @@ static const String CONFIG_PREFIX = "kafka";
class ReadBufferFromKafkaConsumer : public ReadBuffer
{
rd_kafka_t * consumer;
rd_kafka_message_t * current = nullptr;
ConsumerPtr consumer;
cppkafka::Message current;
bool current_pending = false; /// We've fetched "current" message and need to process it on the next iteration.
Poco::Logger * log;
size_t read_messages = 0;
@@ -73,42 +67,36 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
{
if (current_pending)
{
BufferBase::set(reinterpret_cast<char *>(current->payload), current->len, 0);
// XXX: very fishy place with const casting.
BufferBase::set(reinterpret_cast<char *>(const_cast<unsigned char *>(current.get_payload().get_data())), current.get_payload().get_size(), 0);
current_pending = false;
return true;
}
// Process next buffered message
rd_kafka_message_t * msg = rd_kafka_consumer_poll(consumer, READ_POLL_MS); // XXX: use RAII.
if (msg == nullptr)
auto message = consumer->poll(std::chrono::milliseconds(READ_POLL_MS));
if (!message)
return false;
if (msg->err)
if (message.is_eof())
{
if (msg->err != RD_KAFKA_RESP_ERR__PARTITION_EOF)
{
LOG_ERROR(log, "Consumer error: " << rd_kafka_err2str(msg->err) << " " << rd_kafka_message_errstr(msg));
rd_kafka_message_destroy(msg);
return false;
}
// Reach EOF while reading current batch, skip it
LOG_TRACE(log, "EOF reached for partition " << msg->partition << " offset " << msg->offset);
rd_kafka_message_destroy(msg);
// Reached EOF while reading current batch, skip it.
LOG_TRACE(log, "EOF reached for partition " << message.get_partition() << " offset " << message.get_offset());
return nextImpl();
}
if (msg->len && !msg->payload)
throw Exception("Logical error: nullptr message returned with non-zero length", ErrorCodes::LOGICAL_ERROR);
else if (auto err = message.get_error())
{
LOG_ERROR(log, "Consumer error: " << err);
return false;
}
++read_messages;
// Now we've received a new message. Check if we need to produce a delimiter
if (row_delimiter != '\0' && current != nullptr)
if (row_delimiter != '\0' && current)
{
BufferBase::set(&row_delimiter, 1, 0);
reset();
current = msg;
current = std::move(message);
current_pending = true;
return true;
}
@@ -116,31 +104,21 @@ class ReadBufferFromKafkaConsumer : public ReadBuffer
// Consume message and mark the topic/partition offset
// The offsets will be committed in the readSuffix() method after the block is completed
// If an exception is thrown before that would occur, the client will rejoin without committing offsets
reset();
current = msg;
BufferBase::set(reinterpret_cast<char *>(current->payload), current->len, 0);
current = std::move(message);
// XXX: very fishy place with const casting.
BufferBase::set(reinterpret_cast<char *>(const_cast<unsigned char *>(current.get_payload().get_data())), current.get_payload().get_size(), 0);
return true;
}
void reset()
{
if (current != nullptr)
{
rd_kafka_message_destroy(current);
current = nullptr;
}
}
public:
ReadBufferFromKafkaConsumer(rd_kafka_t * consumer_, Poco::Logger * log_, char row_delimiter_)
ReadBufferFromKafkaConsumer(ConsumerPtr consumer_, Poco::Logger * log_, char row_delimiter_)
: ReadBuffer(nullptr, 0), consumer(consumer_), log(log_), row_delimiter(row_delimiter_)
{
if (row_delimiter != '\0')
LOG_TRACE(log, "Row delimiter is: " << row_delimiter);
}
~ReadBufferFromKafkaConsumer() override { reset(); }
/// Commit messages read with this consumer
void commit()
{
@@ -148,10 +126,7 @@ public:
if (read_messages == 0)
return;
auto err = rd_kafka_commit(consumer, nullptr, 1 /* async */);
if (err)
throw Exception("Failed to commit offsets: " + String(rd_kafka_err2str(err)), ErrorCodes::UNKNOWN_EXCEPTION);
consumer->async_commit();
read_messages = 0;
}
};
@@ -215,7 +190,7 @@ public:
if (consumer == nullptr)
throw Exception("Failed to claim consumer: ", ErrorCodes::TIMEOUT_EXCEEDED);
read_buf = std::make_unique<ReadBufferFromKafkaConsumer>(consumer->stream, storage.log, storage.row_delimiter);
read_buf = std::make_unique<ReadBufferFromKafkaConsumer>(consumer, storage.log, storage.row_delimiter);
reader = FormatFactory::instance().getInput(storage.format_name, *read_buf, storage.getSampleBlock(), context, max_block_size);
}
@@ -239,7 +214,7 @@ public:
private:
StorageKafka & storage;
StorageKafka::ConsumerPtr consumer;
ConsumerPtr consumer;
Context context;
size_t max_block_size;
Block sample_block;
@@ -251,7 +226,7 @@ private:
bool hasClaimed() { return consumer != nullptr; }
};
static void loadFromConfig(struct rd_kafka_conf_s * conf, const AbstractConfiguration & config, const std::string & path)
static void loadFromConfig(cppkafka::Configuration & conf, const AbstractConfiguration & config, const std::string & path)
{
AbstractConfiguration::Keys keys;
std::vector<char> errstr(512);
@@ -262,8 +237,7 @@ static void loadFromConfig(struct rd_kafka_conf_s * conf, const AbstractConfigur
{
const String key_path = path + "." + key;
const String key_name = boost::replace_all_copy(key, "_", ".");
if (rd_kafka_conf_set(conf, key_name.c_str(), config.getString(key_path).c_str(), errstr.data(), errstr.size()) != RD_KAFKA_CONF_OK)
throw Exception("Invalid Kafka setting " + key_path + " in config: " + String(errstr.data()), ErrorCodes::INVALID_CONFIG_PARAMETER);
conf.set(key_name, config.getString(key_path));
}
}
@@ -326,21 +300,8 @@ void StorageKafka::startup()
{
for (size_t i = 0; i < num_consumers; ++i)
{
// Building configuration may throw, the consumer configuration must be destroyed in that case
auto consumer_conf = rd_kafka_conf_new();
try
{
consumerConfiguration(consumer_conf);
}
catch (...)
{
rd_kafka_conf_destroy(consumer_conf);
throw;
}
// Create a consumer and subscribe to topics
// Note: consumer takes ownership of the configuration
auto consumer = std::make_shared<StorageKafka::Consumer>(consumer_conf);
auto consumer = std::make_shared<cppkafka::Consumer>(createConsumerConfiguration());
consumer->subscribe(topics);
// Make consumer available
@@ -362,7 +323,7 @@ void StorageKafka::shutdown()
for (size_t i = 0; i < num_created_consumers; ++i)
{
auto consumer = claimConsumer();
consumer->close();
// FIXME: unclear whether consumers should be closed here, or whether closing them at this point is needed at all.
}
LOG_TRACE(log, "Waiting for cleanup");
@@ -378,24 +339,20 @@ void StorageKafka::updateDependencies()
}
void StorageKafka::consumerConfiguration(struct rd_kafka_conf_s * conf)
cppkafka::Configuration StorageKafka::createConsumerConfiguration()
{
std::vector<char> errstr(512);
cppkafka::Configuration conf;
LOG_TRACE(log, "Setting brokers: " << brokers);
if (rd_kafka_conf_set(conf, "metadata.broker.list", brokers.c_str(), errstr.data(), errstr.size()) != RD_KAFKA_CONF_OK)
throw Exception(String(errstr.data()), ErrorCodes::INCORRECT_DATA);
conf.set("metadata.broker.list", brokers);
LOG_TRACE(log, "Setting Group ID: " << group << " Client ID: clickhouse");
conf.set("group.id", group);
if (rd_kafka_conf_set(conf, "group.id", group.c_str(), errstr.data(), errstr.size()) != RD_KAFKA_CONF_OK)
throw Exception(String(errstr.data()), ErrorCodes::INCORRECT_DATA);
if (rd_kafka_conf_set(conf, "client.id", VERSION_FULL, errstr.data(), errstr.size()) != RD_KAFKA_CONF_OK)
throw Exception(String(errstr.data()), ErrorCodes::INCORRECT_DATA);
conf.set("client.id", VERSION_FULL);
// We manually commit offsets after a stream successfully finished
rd_kafka_conf_set(conf, "enable.auto.commit", "false", nullptr, 0);
conf.set("enable.auto.commit", "false");
// Update consumer configuration from the configuration
const auto & config = global_context.getConfigRef();
@@ -409,14 +366,16 @@ void StorageKafka::consumerConfiguration(struct rd_kafka_conf_s * conf)
if (config.has(topic_config_key))
loadFromConfig(conf, config, topic_config_key);
}
return conf;
}
StorageKafka::ConsumerPtr StorageKafka::claimConsumer()
ConsumerPtr StorageKafka::claimConsumer()
{
return tryClaimConsumer(-1L);
}
StorageKafka::ConsumerPtr StorageKafka::tryClaimConsumer(long wait_ms)
ConsumerPtr StorageKafka::tryClaimConsumer(long wait_ms)
{
// Wait for the first free consumer
if (wait_ms >= 0)
@@ -434,7 +393,7 @@ StorageKafka::ConsumerPtr StorageKafka::tryClaimConsumer(long wait_ms)
return consumer;
}
void StorageKafka::pushConsumer(StorageKafka::ConsumerPtr consumer)
void StorageKafka::pushConsumer(ConsumerPtr consumer)
{
std::lock_guard lock(mutex);
consumers.push_back(consumer);
@@ -557,64 +516,6 @@ bool StorageKafka::streamToViews()
}
StorageKafka::Consumer::Consumer(struct rd_kafka_conf_s * conf)
{
std::vector<char> errstr(512);
stream = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr.data(), errstr.size());
if (stream == nullptr)
{
rd_kafka_conf_destroy(conf);
throw Exception("Failed to create consumer handle: " + String(errstr.data()), ErrorCodes::UNKNOWN_EXCEPTION);
}
rd_kafka_poll_set_consumer(stream);
}
StorageKafka::Consumer::~Consumer()
{
close();
}
void StorageKafka::Consumer::subscribe(const Names & topics_to_subscribe)
{
if (stream == nullptr)
throw Exception("Cannot subscribe to topics when consumer is closed", ErrorCodes::UNKNOWN_EXCEPTION);
// Create a list of partitions
auto * topic_list = rd_kafka_topic_partition_list_new(topics_to_subscribe.size());
for (const auto & topic : topics_to_subscribe)
rd_kafka_topic_partition_list_add(topic_list, topic.c_str(), RD_KAFKA_PARTITION_UA);
// Subscribe to requested topics
auto err = rd_kafka_subscribe(stream, topic_list);
if (err)
{
rd_kafka_topic_partition_list_destroy(topic_list);
throw Exception("Failed to subscribe: " + String(rd_kafka_err2str(err)), ErrorCodes::UNKNOWN_EXCEPTION);
}
rd_kafka_topic_partition_list_destroy(topic_list);
}
void StorageKafka::Consumer::unsubscribe()
{
if (stream != nullptr)
rd_kafka_unsubscribe(stream);
}
void StorageKafka::Consumer::close()
{
if (stream != nullptr)
{
rd_kafka_consumer_close(stream);
rd_kafka_destroy(stream);
stream = nullptr;
}
}
void registerStorageKafka(StorageFactory & factory)
{
factory.registerStorage("Kafka", [](const StorageFactory::Arguments & args)


@@ -1,24 +1,24 @@
#pragma once
#include <Common/config.h>
#if USE_RDKAFKA
#include <mutex>
#include <ext/shared_ptr_helper.h>
#include <Core/NamesAndTypes.h>
#include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h>
#include <Core/NamesAndTypes.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Storages/IStorage.h>
#include <Poco/Event.h>
#include <Poco/Semaphore.h>
#include <ext/shared_ptr_helper.h>
struct rd_kafka_s;
struct rd_kafka_conf_s;
#include <cppkafka/cppkafka.h>
#include <mutex>
namespace DB
{
class StorageKafka;
using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
/** Implements a Kafka queue table engine that can be used as a persistent queue / buffer,
* or as a basic building block for creating pipelines with a continuous insertion / ETL.
@@ -53,22 +53,6 @@ public:
void updateDependencies() override;
private:
/// Each engine typically has one consumer (able to process 1..N partitions)
/// It is however possible to create multiple consumers per table, as long
/// as the total number of consumers is <= number of partitions.
struct Consumer
{
Consumer(struct rd_kafka_conf_s * conf);
~Consumer();
void subscribe(const Names & topics);
void unsubscribe();
void close();
struct rd_kafka_s * stream = nullptr;
};
using ConsumerPtr = std::shared_ptr<Consumer>;
// Configuration and state
String table_name;
String database_name;
@@ -100,7 +84,7 @@ private:
BackgroundSchedulePool::TaskHolder task;
std::atomic<bool> stream_cancelled{false};
void consumerConfiguration(struct rd_kafka_conf_s * conf);
cppkafka::Configuration createConsumerConfiguration();
ConsumerPtr claimConsumer();
ConsumerPtr tryClaimConsumer(long wait_ms);
void pushConsumer(ConsumerPtr c);