Merge pull request #57625 from ClickHouse/kafka-zookeeper

Kafka ZooKeeper
This commit is contained in:
János Benjamin Antal 2024-08-08 14:02:27 +00:00 committed by GitHub
commit c9adfe246b
20 changed files with 5635 additions and 2782 deletions

View File

@ -251,6 +251,44 @@ The number of rows in one Kafka message depends on whether the format is row-bas
- For row-based formats the number of rows in one Kafka message can be controlled by setting `kafka_max_rows_per_message`.
- For block-based formats we cannot divide a block into smaller parts, but the number of rows in one block can be controlled by the general setting [max_block_size](../../../operations/settings/settings.md#setting-max_block_size).
## Experimental engine to store committed offsets in ClickHouse Keeper
If `allow_experimental_kafka_offsets_storage_in_keeper` is enabled, two more settings can be specified for the Kafka table engine:
- `kafka_keeper_path` specifies the path to the table in ClickHouse Keeper
- `kafka_replica_name` specifies the replica name in ClickHouse Keeper
Either both of the settings must be specified or neither of them. When both are specified, a new, experimental Kafka engine is used. The new engine doesn't depend on the committed offsets stored in Kafka, but stores them in ClickHouse Keeper. It still tries to commit the offsets to Kafka, but it only relies on those offsets when the table is created. In all other circumstances (for example, when the table is restarted or recovered after an error), the offsets stored in ClickHouse Keeper are used to continue consuming messages. Apart from the committed offset, it also stores how many messages were consumed in the last batch, so if the insert fails, the same number of messages will be consumed, enabling deduplication if necessary.
Example:
``` sql
CREATE TABLE experimental_kafka (key UInt64, value UInt64)
ENGINE = Kafka('localhost:19092', 'my-topic', 'my-consumer', 'JSONEachRow')
SETTINGS
kafka_keeper_path = '/clickhouse/{database}/experimental_kafka',
kafka_replica_name = 'r1'
SETTINGS allow_experimental_kafka_offsets_storage_in_keeper=1;
```
Or to utilize the `uuid` and `replica` macros similarly to ReplicatedMergeTree:
``` sql
CREATE TABLE experimental_kafka (key UInt64, value UInt64)
ENGINE = Kafka('localhost:19092', 'my-topic', 'my-consumer', 'JSONEachRow')
SETTINGS
kafka_keeper_path = '/clickhouse/{database}/{uuid}',
kafka_replica_name = '{replica}'
SETTINGS allow_experimental_kafka_offsets_storage_in_keeper=1;
```
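The setting can also be enabled at the session level before creating the table instead of being passed per query; a minimal sketch, assuming the same table definition as above:
``` sql
-- Enable the experimental feature for the current session.
SET allow_experimental_kafka_offsets_storage_in_keeper = 1;
```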
### Known limitations
As the new engine is experimental, it is not yet production ready. There are a few known limitations of the implementation:
- The biggest limitation is that the engine doesn't support direct reading: all direct `SELECT` queries against the table will fail. Reading from the engine through materialized views and writing to the engine both work (see the sketch after this list).
- Rapidly dropping and recreating the table, or specifying the same ClickHouse Keeper path for different engines, might cause issues. As a best practice, use `{uuid}` in `kafka_keeper_path` to avoid clashing paths.
- To make reads repeatable, messages cannot be consumed from multiple partitions on a single thread. On the other hand, the Kafka consumers have to be polled regularly to keep them alive. To satisfy both requirements, creating multiple consumers is only allowed when `kafka_thread_per_consumer` is enabled; otherwise it is too complicated to avoid issues with polling the consumers regularly.
- Consumers created by the new storage engine do not show up in the [`system.kafka_consumers`](../../../operations/system-tables/kafka_consumers.md) table.
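Because direct reads fail, data is typically consumed through a materialized view. Below is a minimal sketch, assuming the `experimental_kafka` table from the examples above; the `kafka_data` target table and `kafka_consumer` view are hypothetical names:
``` sql
-- Hypothetical target table and materialized view; direct SELECTs on
-- experimental_kafka itself would fail, so query kafka_data instead.
CREATE TABLE kafka_data (key UInt64, value UInt64)
ENGINE = MergeTree
ORDER BY key;

CREATE MATERIALIZED VIEW kafka_consumer TO kafka_data
AS SELECT key, value FROM experimental_kafka;

SELECT count() FROM kafka_data;
```
As with the regular Kafka engine, virtual columns such as `_topic`, `_partition` and `_offset` can be selected in the view if the target table has matching columns.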
**See Also**
- [Virtual columns](../../../engines/table-engines/index.md#table_engines-virtual_columns)

View File

@ -690,6 +690,7 @@ class IColumn;
M(UInt64, max_size_to_preallocate_for_joins, 100'000'000, "For how many elements it is allowed to preallocate space in all hash tables in total before join", 0) \
\
M(Bool, kafka_disable_num_consumers_limit, false, "Disable limit on kafka_num_consumers that depends on the number of available CPU cores", 0) \
    M(Bool, allow_experimental_kafka_offsets_storage_in_keeper, false, "Allow experimental feature to store Kafka-related offsets in ClickHouse Keeper. When enabled, a ClickHouse Keeper path and replica name can be specified for the Kafka table engine. As a result, instead of the regular Kafka engine, a new type of storage engine is used that stores the committed offsets primarily in ClickHouse Keeper", 0) \
M(Bool, enable_software_prefetch_in_aggregation, true, "Enable use of software prefetch in aggregation", 0) \
M(Bool, allow_aggregate_partitions_independently, false, "Enable independent aggregation of partitions on separate threads when partition key suits group by key. Beneficial when number of partitions close to number of cores and partitions have roughly the same size", 0) \
M(Bool, force_aggregate_partitions_independently, false, "Force the use of optimization when it is applicable, but heuristics decided not to use it", 0) \

View File

@ -79,6 +79,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"restore_replace_external_engines_to_null", false, false, "New setting."},
{"input_format_json_max_depth", 1000000, 1000, "It was unlimited in previous versions, but that was unsafe."},
{"merge_tree_min_bytes_per_task_for_remote_reading", 4194304, 2097152, "Value is unified with `filesystem_prefetch_min_bytes_for_single_read_task`"},
{"allow_experimental_kafka_offsets_storage_in_keeper", false, false, "Allow the usage of experimental Kafka storage engine that stores the committed offsets in ClickHouse Keeper"},
{"allow_archive_path_syntax", true, true, "Added new setting to allow disabling archive path syntax."},
{"allow_experimental_time_series_table", false, false, "Added new setting to allow the TimeSeries table engine"},
{"enable_analyzer", 1, 1, "Added an alias to a setting `allow_experimental_analyzer`."},

View File

@ -0,0 +1,480 @@
#include <Storages/Kafka/KafkaConfigLoader.h>
#include <Access/KerberosInit.h>
#include <Storages/Kafka/StorageKafka.h>
#include <Storages/Kafka/StorageKafka2.h>
#include <Storages/Kafka/parseSyslogLevel.h>
#include <boost/algorithm/string/replace.hpp>
#include <Common/CurrentMetrics.h>
#include <Common/NamedCollections/NamedCollectionsFactory.h>
#include <Common/ThreadStatus.h>
#include <Common/config_version.h>
#include <Common/setThreadName.h>
namespace CurrentMetrics
{
extern const Metric KafkaLibrdkafkaThreads;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
template <typename TKafkaStorage>
struct KafkaInterceptors
{
static rd_kafka_resp_err_t rdKafkaOnThreadStart(rd_kafka_t *, rd_kafka_thread_type_t thread_type, const char *, void * ctx);
static rd_kafka_resp_err_t rdKafkaOnThreadExit(rd_kafka_t *, rd_kafka_thread_type_t, const char *, void * ctx);
static rd_kafka_resp_err_t
rdKafkaOnNew(rd_kafka_t * rk, const rd_kafka_conf_t *, void * ctx, char * /*errstr*/, size_t /*errstr_size*/);
static rd_kafka_resp_err_t rdKafkaOnConfDup(
rd_kafka_conf_t * new_conf, const rd_kafka_conf_t * /*old_conf*/, size_t /*filter_cnt*/, const char ** /*filter*/, void * ctx);
};
template <typename TStorageKafka>
rd_kafka_resp_err_t
KafkaInterceptors<TStorageKafka>::rdKafkaOnThreadStart(rd_kafka_t *, rd_kafka_thread_type_t thread_type, const char *, void * ctx)
{
TStorageKafka * self = reinterpret_cast<TStorageKafka *>(ctx);
CurrentMetrics::add(CurrentMetrics::KafkaLibrdkafkaThreads, 1);
const auto & storage_id = self->getStorageID();
const auto & table = storage_id.getTableName();
switch (thread_type)
{
case RD_KAFKA_THREAD_MAIN:
setThreadName(("rdk:m/" + table.substr(0, 9)).c_str());
break;
case RD_KAFKA_THREAD_BACKGROUND:
setThreadName(("rdk:bg/" + table.substr(0, 8)).c_str());
break;
case RD_KAFKA_THREAD_BROKER:
setThreadName(("rdk:b/" + table.substr(0, 9)).c_str());
break;
}
/// Create ThreadStatus to track memory allocations from librdkafka threads.
//
/// And store them in a separate list (thread_statuses) to make sure that they will be destroyed,
/// regardless how librdkafka calls the hooks.
/// But this can trigger use-after-free if librdkafka will not destroy threads after rd_kafka_wait_destroyed()
auto thread_status = std::make_shared<ThreadStatus>();
std::lock_guard lock(self->thread_statuses_mutex);
self->thread_statuses.emplace_back(std::move(thread_status));
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
template <typename TStorageKafka>
rd_kafka_resp_err_t KafkaInterceptors<TStorageKafka>::rdKafkaOnThreadExit(rd_kafka_t *, rd_kafka_thread_type_t, const char *, void * ctx)
{
TStorageKafka * self = reinterpret_cast<TStorageKafka *>(ctx);
CurrentMetrics::sub(CurrentMetrics::KafkaLibrdkafkaThreads, 1);
std::lock_guard lock(self->thread_statuses_mutex);
const auto it = std::find_if(
self->thread_statuses.begin(),
self->thread_statuses.end(),
[](const auto & thread_status_ptr) { return thread_status_ptr.get() == current_thread; });
if (it == self->thread_statuses.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No thread status for this librdkafka thread.");
self->thread_statuses.erase(it);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
template <typename TStorageKafka>
rd_kafka_resp_err_t KafkaInterceptors<TStorageKafka>::rdKafkaOnNew(
rd_kafka_t * rk, const rd_kafka_conf_t *, void * ctx, char * /*errstr*/, size_t /*errstr_size*/)
{
TStorageKafka * self = reinterpret_cast<TStorageKafka *>(ctx);
rd_kafka_resp_err_t status;
status = rd_kafka_interceptor_add_on_thread_start(rk, "init-thread", rdKafkaOnThreadStart, ctx);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
{
LOG_ERROR(self->log, "Cannot set on thread start interceptor due to {} error", status);
return status;
}
status = rd_kafka_interceptor_add_on_thread_exit(rk, "exit-thread", rdKafkaOnThreadExit, ctx);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
LOG_ERROR(self->log, "Cannot set on thread exit interceptor due to {} error", status);
return status;
}
template <typename TStorageKafka>
rd_kafka_resp_err_t KafkaInterceptors<TStorageKafka>::rdKafkaOnConfDup(
rd_kafka_conf_t * new_conf, const rd_kafka_conf_t * /*old_conf*/, size_t /*filter_cnt*/, const char ** /*filter*/, void * ctx)
{
TStorageKafka * self = reinterpret_cast<TStorageKafka *>(ctx);
rd_kafka_resp_err_t status;
// cppkafka copies configuration multiple times
status = rd_kafka_conf_interceptor_add_on_conf_dup(new_conf, "init", rdKafkaOnConfDup, ctx);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
{
LOG_ERROR(self->log, "Cannot set on conf dup interceptor due to {} error", status);
return status;
}
status = rd_kafka_conf_interceptor_add_on_new(new_conf, "init", rdKafkaOnNew, ctx);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
LOG_ERROR(self->log, "Cannot set on conf new interceptor due to {} error", status);
return status;
}
template struct KafkaInterceptors<StorageKafka>;
template struct KafkaInterceptors<StorageKafka2>;
namespace
{
void setKafkaConfigValue(cppkafka::Configuration & kafka_config, const String & key, const String & value)
{
/// "log_level" has valid underscore, the remaining librdkafka setting use dot.separated.format which isn't acceptable for XML.
/// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
const String setting_name_in_kafka_config = (key == "log_level") ? key : boost::replace_all_copy(key, "_", ".");
kafka_config.set(setting_name_in_kafka_config, value);
}
void loadConfigProperty(
cppkafka::Configuration & kafka_config,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix,
const String & tag)
{
const String property_path = config_prefix + "." + tag;
const String property_value = config.getString(property_path);
setKafkaConfigValue(kafka_config, tag, property_value);
}
void loadNamedCollectionConfig(cppkafka::Configuration & kafka_config, const String & collection_name, const String & config_prefix)
{
const auto & collection = NamedCollectionFactory::instance().get(collection_name);
for (const auto & key : collection->getKeys(-1, config_prefix))
{
// Cut prefix with '.' before actual config tag.
const auto param_name = key.substr(config_prefix.size() + 1);
setKafkaConfigValue(kafka_config, param_name, collection->get<String>(key));
}
}
void loadLegacyTopicConfig(
cppkafka::Configuration & kafka_config,
const Poco::Util::AbstractConfiguration & config,
const String & collection_name,
const String & config_prefix)
{
if (!collection_name.empty())
{
loadNamedCollectionConfig(kafka_config, collection_name, config_prefix);
return;
}
Poco::Util::AbstractConfiguration::Keys tags;
config.keys(config_prefix, tags);
for (const auto & tag : tags)
{
loadConfigProperty(kafka_config, config, config_prefix, tag);
}
}
/// Read server configuration into cppkafka configuration, used by new per-topic configuration
void loadTopicConfig(
cppkafka::Configuration & kafka_config,
const Poco::Util::AbstractConfiguration & config,
const String & collection_name,
const String & config_prefix,
const String & topic)
{
if (!collection_name.empty())
{
const auto topic_prefix = fmt::format("{}.{}", config_prefix, KafkaConfigLoader::CONFIG_KAFKA_TOPIC_TAG);
const auto & collection = NamedCollectionFactory::instance().get(collection_name);
for (const auto & key : collection->getKeys(1, config_prefix))
{
/// Only consider key <kafka_topic>. Multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
if (!key.starts_with(topic_prefix))
continue;
const String kafka_topic_path = config_prefix + "." + key;
const String kafka_topic_name_path = kafka_topic_path + "." + KafkaConfigLoader::CONFIG_NAME_TAG;
if (topic == collection->get<String>(kafka_topic_name_path))
/// Found it! Now read the per-topic configuration into cppkafka.
loadNamedCollectionConfig(kafka_config, collection_name, kafka_topic_path);
}
}
else
{
/// Read all tags one level below <kafka>
Poco::Util::AbstractConfiguration::Keys tags;
config.keys(config_prefix, tags);
for (const auto & tag : tags)
{
if (tag == KafkaConfigLoader::CONFIG_NAME_TAG)
continue; // ignore <name>, it is used to match topic configurations
loadConfigProperty(kafka_config, config, config_prefix, tag);
}
}
}
/// Read server configuration into cppkafka configuration, used by global configuration and by legacy per-topic configuration
void loadFromConfig(
cppkafka::Configuration & kafka_config, const KafkaConfigLoader::LoadConfigParams & params, const String & config_prefix)
{
if (!params.collection_name.empty())
{
loadNamedCollectionConfig(kafka_config, params.collection_name, config_prefix);
return;
}
/// Read all tags one level below <kafka>
Poco::Util::AbstractConfiguration::Keys tags;
params.config.keys(config_prefix, tags);
for (const auto & tag : tags)
{
if (tag == KafkaConfigLoader::CONFIG_KAFKA_PRODUCER_TAG || tag == KafkaConfigLoader::CONFIG_KAFKA_CONSUMER_TAG)
/// Do not load consumer/producer properties, since they are handled by separate configuration objects.
continue;
if (tag.starts_with(
KafkaConfigLoader::CONFIG_KAFKA_TOPIC_TAG)) /// multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
{
// Update consumer topic-specific configuration (new syntax). Example with topics "football" and "baseball":
// <kafka>
// <kafka_topic>
// <name>football</name>
// <retry_backoff_ms>250</retry_backoff_ms>
// <fetch_min_bytes>5000</fetch_min_bytes>
// </kafka_topic>
// <kafka_topic>
// <name>baseball</name>
// <retry_backoff_ms>300</retry_backoff_ms>
// <fetch_min_bytes>2000</fetch_min_bytes>
// </kafka_topic>
// </kafka>
// Advantages: The period restriction no longer applies (e.g. <name>sports.football</name> will work), everything
// Kafka-related is below <kafka>.
for (const auto & topic : params.topics)
{
/// Read topic name between <name>...</name>
const String kafka_topic_path = config_prefix + "." + tag;
const String kafka_topic_name_path = kafka_topic_path + "." + KafkaConfigLoader::CONFIG_NAME_TAG;
const String topic_name = params.config.getString(kafka_topic_name_path);
if (topic_name != topic)
continue;
loadTopicConfig(kafka_config, params.config, params.collection_name, kafka_topic_path, topic);
}
continue;
}
if (tag.starts_with(KafkaConfigLoader::CONFIG_KAFKA_TAG))
/// skip legacy configuration per topic e.g. <kafka_TOPIC_NAME>.
/// it will be processed in a separate function
continue;
// Update the configuration from a regular property. Example:
// <kafka>
// <retry_backoff_ms>250</retry_backoff_ms>
// <fetch_min_bytes>100000</fetch_min_bytes>
// </kafka>
loadConfigProperty(kafka_config, params.config, config_prefix, tag);
}
}
void loadLegacyConfigSyntax(
cppkafka::Configuration & kafka_config,
const Poco::Util::AbstractConfiguration & config,
const String & collection_name,
const Names & topics)
{
for (const auto & topic : topics)
{
const String kafka_topic_path = KafkaConfigLoader::CONFIG_KAFKA_TAG + "." + KafkaConfigLoader::CONFIG_KAFKA_TAG + "_" + topic;
loadLegacyTopicConfig(kafka_config, config, collection_name, kafka_topic_path);
}
}
void loadConsumerConfig(cppkafka::Configuration & kafka_config, const KafkaConfigLoader::LoadConfigParams & params)
{
const String consumer_path = KafkaConfigLoader::CONFIG_KAFKA_TAG + "." + KafkaConfigLoader::CONFIG_KAFKA_CONSUMER_TAG;
loadLegacyConfigSyntax(kafka_config, params.config, params.collection_name, params.topics);
// A new syntax has higher priority
loadFromConfig(kafka_config, params, consumer_path);
}
void loadProducerConfig(cppkafka::Configuration & kafka_config, const KafkaConfigLoader::LoadConfigParams & params)
{
const String producer_path = KafkaConfigLoader::CONFIG_KAFKA_TAG + "." + KafkaConfigLoader::CONFIG_KAFKA_PRODUCER_TAG;
loadLegacyConfigSyntax(kafka_config, params.config, params.collection_name, params.topics);
// A new syntax has higher priority
loadFromConfig(kafka_config, params, producer_path);
}
template <typename TKafkaStorage>
void updateGlobalConfiguration(
cppkafka::Configuration & kafka_config, TKafkaStorage & storage, const KafkaConfigLoader::LoadConfigParams & params)
{
loadFromConfig(kafka_config, params, KafkaConfigLoader::CONFIG_KAFKA_TAG);
#if USE_KRB5
if (kafka_config.has_property("sasl.kerberos.kinit.cmd"))
LOG_WARNING(params.log, "sasl.kerberos.kinit.cmd configuration parameter is ignored.");
kafka_config.set("sasl.kerberos.kinit.cmd", "");
kafka_config.set("sasl.kerberos.min.time.before.relogin", "0");
if (kafka_config.has_property("sasl.kerberos.keytab") && kafka_config.has_property("sasl.kerberos.principal"))
{
String keytab = kafka_config.get("sasl.kerberos.keytab");
String principal = kafka_config.get("sasl.kerberos.principal");
LOG_DEBUG(params.log, "Running KerberosInit");
try
{
kerberosInit(keytab, principal);
}
catch (const Exception & e)
{
LOG_ERROR(params.log, "KerberosInit failure: {}", getExceptionMessage(e, false));
}
LOG_DEBUG(params.log, "Finished KerberosInit");
}
#else // USE_KRB5
if (kafka_config.has_property("sasl.kerberos.keytab") || kafka_config.has_property("sasl.kerberos.principal"))
LOG_WARNING(params.log, "Ignoring Kerberos-related parameters because ClickHouse was built without krb5 library support.");
#endif // USE_KRB5
// No need to add any prefix, messages can be distinguished
kafka_config.set_log_callback(
[log = params.log](cppkafka::KafkaHandleBase & handle, int level, const std::string & facility, const std::string & message)
{
auto [poco_level, client_logs_level] = parseSyslogLevel(level);
const auto & kafka_object_config = handle.get_configuration();
const std::string client_id_key{"client.id"};
chassert(kafka_object_config.has_property(client_id_key) && "Kafka configuration doesn't have expected client.id set");
LOG_IMPL(
log,
client_logs_level,
poco_level,
"[client.id:{}] [rdk:{}] {}",
kafka_object_config.get(client_id_key),
facility,
message);
});
/// NOTE: statistics should be consumed, otherwise it creates too many
/// entries in the queue, which leads to memory leaks and slow shutdown.
if (!kafka_config.has_property("statistics.interval.ms"))
{
// every 3 seconds by default. set to 0 to disable.
kafka_config.set("statistics.interval.ms", "3000");
}
// Configure interceptor to change thread name
//
// TODO: add interceptors support into the cppkafka.
// XXX: rdkafka uses pthread_set_name_np(), but glibc-compatibility overrides it to noop.
{
// This should be safe, since we wait for the rdkafka object anyway.
void * self = static_cast<void *>(&storage);
int status;
status
= rd_kafka_conf_interceptor_add_on_new(kafka_config.get_handle(), "init", KafkaInterceptors<TKafkaStorage>::rdKafkaOnNew, self);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
LOG_ERROR(params.log, "Cannot set new interceptor due to {} error", status);
// cppkafka always copies the configuration
status = rd_kafka_conf_interceptor_add_on_conf_dup(
kafka_config.get_handle(), "init", KafkaInterceptors<TKafkaStorage>::rdKafkaOnConfDup, self);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
LOG_ERROR(params.log, "Cannot set dup conf interceptor due to {} error", status);
}
}
}
template <typename TKafkaStorage>
cppkafka::Configuration KafkaConfigLoader::getConsumerConfiguration(TKafkaStorage & storage, const ConsumerConfigParams & params)
{
cppkafka::Configuration conf;
conf.set("metadata.broker.list", params.brokers);
conf.set("group.id", params.group);
if (params.multiple_consumers)
conf.set("client.id", fmt::format("{}-{}", params.client_id, params.consumer_number));
else
conf.set("client.id", params.client_id);
conf.set("client.software.name", VERSION_NAME);
conf.set("client.software.version", VERSION_DESCRIBE);
conf.set("auto.offset.reset", "earliest"); // If no offset stored for this group, read all messages from the start
// This prevents fast draining of the librdkafka queue
// while building a single insert block. It improves performance
// significantly, but may lead to higher memory consumption.
size_t default_queued_min_messages = 100000; // must be greater than or equal to default
size_t max_allowed_queued_min_messages = 10000000; // must be less than or equal to max allowed value
conf.set(
"queued.min.messages", std::min(std::max(params.max_block_size, default_queued_min_messages), max_allowed_queued_min_messages));
updateGlobalConfiguration(conf, storage, params);
loadConsumerConfig(conf, params);
// those settings should not be changed by users.
conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished
conf.set("enable.auto.offset.store", "false"); // Update offset automatically - to commit them all at once.
conf.set("enable.partition.eof", "false"); // Ignore EOF messages
for (auto & property : conf.get_all())
{
LOG_TRACE(params.log, "Consumer set property {}:{}", property.first, property.second);
}
return conf;
}
template cppkafka::Configuration
KafkaConfigLoader::getConsumerConfiguration<StorageKafka>(StorageKafka & storage, const ConsumerConfigParams & params);
template cppkafka::Configuration
KafkaConfigLoader::getConsumerConfiguration<StorageKafka2>(StorageKafka2 & storage, const ConsumerConfigParams & params);
template <typename TKafkaStorage>
cppkafka::Configuration KafkaConfigLoader::getProducerConfiguration(TKafkaStorage & storage, const ProducerConfigParams & params)
{
cppkafka::Configuration conf;
conf.set("metadata.broker.list", params.brokers);
conf.set("client.id", params.client_id);
conf.set("client.software.name", VERSION_NAME);
conf.set("client.software.version", VERSION_DESCRIBE);
updateGlobalConfiguration(conf, storage, params);
loadProducerConfig(conf, params);
for (auto & property : conf.get_all())
{
LOG_TRACE(params.log, "Producer set property {}:{}", property.first, property.second);
}
return conf;
}
template cppkafka::Configuration
KafkaConfigLoader::getProducerConfiguration<StorageKafka>(StorageKafka & storage, const ProducerConfigParams & params);
template cppkafka::Configuration
KafkaConfigLoader::getProducerConfiguration<StorageKafka2>(StorageKafka2 & storage, const ProducerConfigParams & params);
}

View File

@ -0,0 +1,54 @@
#pragma once
#include <base/types.h>
#include <cppkafka/cppkafka.h>
#include <Core/Names.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Logger.h>
namespace DB
{
struct KafkaSettings;
class VirtualColumnsDescription;
struct KafkaConfigLoader
{
static inline const String CONFIG_KAFKA_TAG = "kafka";
static inline const String CONFIG_KAFKA_TOPIC_TAG = "kafka_topic";
static inline const String CONFIG_NAME_TAG = "name";
static inline const String CONFIG_KAFKA_CONSUMER_TAG = "consumer";
static inline const String CONFIG_KAFKA_PRODUCER_TAG = "producer";
using LogCallback = cppkafka::Configuration::LogCallback;
struct LoadConfigParams
{
const Poco::Util::AbstractConfiguration & config;
String & collection_name;
const Names & topics;
LoggerPtr & log;
};
struct ConsumerConfigParams : public LoadConfigParams
{
String brokers;
String group;
bool multiple_consumers;
size_t consumer_number;
String client_id;
size_t max_block_size;
};
struct ProducerConfigParams : public LoadConfigParams
{
String brokers;
String client_id;
};
template <typename TKafkaStorage>
static cppkafka::Configuration getConsumerConfiguration(TKafkaStorage & storage, const ConsumerConfigParams & params);
template <typename TKafkaStorage>
static cppkafka::Configuration getProducerConfiguration(TKafkaStorage & storage, const ProducerConfigParams & params);
};
}

View File

@ -9,6 +9,7 @@
#include <algorithm>
#include <Common/CurrentMetrics.h>
#include <Storages/Kafka/StorageKafkaUtils.h>
#include <Common/ProfileEvents.h>
#include <base/defines.h>
@ -20,13 +21,12 @@ namespace CurrentMetrics
namespace ProfileEvents
{
extern const Event KafkaRebalanceRevocations;
extern const Event KafkaRebalanceAssignments;
extern const Event KafkaRebalanceErrors;
extern const Event KafkaMessagesPolled;
extern const Event KafkaCommitFailures;
extern const Event KafkaCommits;
extern const Event KafkaConsumerErrors;
extern const Event KafkaRebalanceRevocations;
extern const Event KafkaRebalanceAssignments;
extern const Event KafkaRebalanceErrors;
extern const Event KafkaMessagesPolled;
extern const Event KafkaCommitFailures;
extern const Event KafkaCommits;
}
namespace DB
@ -199,44 +199,9 @@ KafkaConsumer::~KafkaConsumer()
// https://github.com/confluentinc/confluent-kafka-go/issues/189 etc.
void KafkaConsumer::drain()
{
auto start_time = std::chrono::steady_clock::now();
cppkafka::Error last_error(RD_KAFKA_RESP_ERR_NO_ERROR);
while (true)
{
auto msg = consumer->poll(100ms);
if (!msg)
break;
auto error = msg.get_error();
if (error)
{
if (msg.is_eof() || error == last_error)
{
break;
}
else
{
LOG_ERROR(log, "Error during draining: {}", error);
setExceptionInfo(error);
}
}
// i don't stop draining on first error,
// only if it repeats once again sequentially
last_error = error;
auto ts = std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::milliseconds>(ts-start_time) > DRAIN_TIMEOUT_MS)
{
LOG_ERROR(log, "Timeout during draining.");
break;
}
}
StorageKafkaUtils::drainConsumer(*consumer, DRAIN_TIMEOUT_MS, log, [this](const cppkafka::Error & err) { setExceptionInfo(err); });
}
void KafkaConsumer::commit()
{
auto print_offsets = [this] (const char * prefix, const cppkafka::TopicPartitionList & offsets)
@ -409,7 +374,7 @@ void KafkaConsumer::resetToLastCommitted(const char * msg)
{
if (!assignment.has_value() || assignment->empty())
{
LOG_TRACE(log, "Not assignned. Can't reset to last committed position.");
LOG_TRACE(log, "Not assigned. Can't reset to last committed position.");
return;
}
auto committed_offset = consumer->get_offsets_committed(consumer->get_assignment());
@ -473,7 +438,7 @@ ReadBufferPtr KafkaConsumer::consume()
// If we're doing a manual select then it's better to get something after a wait, then immediate nothing.
if (!assignment.has_value())
{
waited_for_assignment += poll_timeout; // slightly innaccurate, but rough calculation is ok.
waited_for_assignment += poll_timeout; // slightly inaccurate, but rough calculation is ok.
if (waited_for_assignment < MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS)
{
continue;
@ -535,26 +500,12 @@ ReadBufferPtr KafkaConsumer::getNextMessage()
return getNextMessage();
}
size_t KafkaConsumer::filterMessageErrors()
void KafkaConsumer::filterMessageErrors()
{
assert(current == messages.begin());
size_t skipped = std::erase_if(messages, [this](auto & message)
{
if (auto error = message.get_error())
{
ProfileEvents::increment(ProfileEvents::KafkaConsumerErrors);
LOG_ERROR(log, "Consumer error: {}", error);
setExceptionInfo(error);
return true;
}
return false;
});
if (skipped)
LOG_ERROR(log, "There were {} messages with an error", skipped);
return skipped;
StorageKafkaUtils::eraseMessageErrors(messages, log, [this](const cppkafka::Error & err) { setExceptionInfo(err); });
current = messages.begin();
}
void KafkaConsumer::resetIfStopped()

View File

@ -1,14 +1,12 @@
#pragma once
#include <boost/circular_buffer.hpp>
#include <fmt/ostream.h>
#include <Core/Names.h>
#include <base/types.h>
#include <IO/ReadBuffer.h>
#include <cppkafka/cppkafka.h>
#include <cppkafka/topic_partition.h>
#include <Common/CurrentMetrics.h>
namespace CurrentMetrics
@ -193,12 +191,8 @@ private:
void drain();
void cleanUnprocessed();
void resetIfStopped();
/// Return number of messages with an error.
size_t filterMessageErrors();
void filterMessageErrors();
ReadBufferPtr getNextMessage();
};
}
template <> struct fmt::formatter<cppkafka::TopicPartition> : fmt::ostream_formatter {};
template <> struct fmt::formatter<cppkafka::Error> : fmt::ostream_formatter {};

View File

@ -0,0 +1,384 @@
#include <Storages/Kafka/KafkaConsumer2.h>
#include <fmt/ranges.h>
#include <cppkafka/exceptions.h>
#include <cppkafka/topic_partition.h>
#include <cppkafka/cppkafka.h>
#include <cppkafka/topic_partition_list.h>
#include <fmt/ostream.h>
#include <boost/algorithm/string/join.hpp>
#include <IO/ReadBufferFromMemory.h>
#include <Storages/Kafka/StorageKafkaUtils.h>
#include <Common/logger_useful.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <algorithm>
#include <iterator>
namespace CurrentMetrics
{
extern const Metric KafkaAssignedPartitions;
extern const Metric KafkaConsumersWithAssignment;
}
namespace ProfileEvents
{
extern const Event KafkaRebalanceRevocations;
extern const Event KafkaRebalanceAssignments;
extern const Event KafkaRebalanceErrors;
extern const Event KafkaMessagesPolled;
extern const Event KafkaCommitFailures;
extern const Event KafkaCommits;
}
namespace DB
{
using namespace std::chrono_literals;
static constexpr auto EVENT_POLL_TIMEOUT = 50ms;
static constexpr auto DRAIN_TIMEOUT_MS = 5000ms;
bool KafkaConsumer2::TopicPartition::operator<(const TopicPartition & other) const
{
return std::tie(topic, partition_id, offset) < std::tie(other.topic, other.partition_id, other.offset);
}
KafkaConsumer2::KafkaConsumer2(
ConsumerPtr consumer_,
LoggerPtr log_,
size_t max_batch_size,
size_t poll_timeout_,
const std::atomic<bool> & stopped_,
const Names & topics_)
: consumer(consumer_)
, log(log_)
, batch_size(max_batch_size)
, poll_timeout(poll_timeout_)
, stopped(stopped_)
, current(messages.begin())
, topics(topics_)
{
// called (synchronously, during poll) when we enter the consumer group
consumer->set_assignment_callback(
[this](const cppkafka::TopicPartitionList & topic_partitions)
{
CurrentMetrics::add(CurrentMetrics::KafkaAssignedPartitions, topic_partitions.size());
ProfileEvents::increment(ProfileEvents::KafkaRebalanceAssignments);
if (topic_partitions.empty())
{
LOG_INFO(log, "Got empty assignment: Not enough partitions in the topic for all consumers?");
}
else
{
LOG_TRACE(log, "Topics/partitions assigned: {}", topic_partitions);
CurrentMetrics::add(CurrentMetrics::KafkaConsumersWithAssignment, 1);
}
chassert(!assignment.has_value());
assignment.emplace();
assignment->reserve(topic_partitions.size());
needs_offset_update = true;
for (const auto & topic_partition : topic_partitions)
{
assignment->push_back(
TopicPartition{topic_partition.get_topic(), topic_partition.get_partition(), topic_partition.get_offset()});
}
// We need to initialize the queues here in order to detach them from the consumer queue. Otherwise `pollEvents` might eventually poll actual messages also.
initializeQueues(topic_partitions);
});
// called (synchronously, during poll) when we leave the consumer group
consumer->set_revocation_callback(
[this](const cppkafka::TopicPartitionList & topic_partitions)
{
CurrentMetrics::sub(CurrentMetrics::KafkaAssignedPartitions, topic_partitions.size());
ProfileEvents::increment(ProfileEvents::KafkaRebalanceRevocations);
// Rebalance is happening now, and now we have a chance to finish the work
// with topics/partitions we were working with before rebalance
LOG_TRACE(log, "Rebalance initiated. Revoking partitions: {}", topic_partitions);
if (!topic_partitions.empty())
{
CurrentMetrics::sub(CurrentMetrics::KafkaConsumersWithAssignment, 1);
}
assignment.reset();
queues.clear();
needs_offset_update = true;
});
consumer->set_rebalance_error_callback(
[this](cppkafka::Error err)
{
LOG_ERROR(log, "Rebalance error: {}", err);
ProfileEvents::increment(ProfileEvents::KafkaRebalanceErrors);
});
}
KafkaConsumer2::~KafkaConsumer2()
{
try
{
if (!consumer->get_subscription().empty())
{
try
{
consumer->unsubscribe();
}
catch (const cppkafka::HandleException & e)
{
LOG_ERROR(log, "Error during unsubscribe: {}", e.what());
}
drainConsumerQueue();
}
}
catch (const cppkafka::HandleException & e)
{
LOG_ERROR(log, "Error while destructing consumer: {}", e.what());
}
}
// Needed to drain the rest of the messages / queued callback calls from the consumer after unsubscribe, otherwise the consumer
// will hang on destruction. Partition queues don't have to be attached, as events are not handled by those queues.
// see https://github.com/edenhill/librdkafka/issues/2077
// https://github.com/confluentinc/confluent-kafka-go/issues/189 etc.
void KafkaConsumer2::drainConsumerQueue()
{
StorageKafkaUtils::drainConsumer(*consumer, DRAIN_TIMEOUT_MS, log);
}
void KafkaConsumer2::pollEvents()
{
static constexpr auto max_tries = 5;
for (auto i = 0; i < max_tries; ++i)
{
auto msg = consumer->poll(EVENT_POLL_TIMEOUT);
if (!msg)
return;
// All the partition queues are detached, so the consumer shouldn't be able to poll any real messages
const auto err = msg.get_error();
chassert(RD_KAFKA_RESP_ERR_NO_ERROR != err.get_error() && "Consumer returned a message when it was not expected");
LOG_ERROR(log, "Consumer received error while polling events, code {}, error '{}'", err.get_error(), err.to_string());
}
};
bool KafkaConsumer2::polledDataUnusable(const TopicPartition & topic_partition) const
{
const auto different_topic_partition = current == messages.end()
? false
: (current->get_topic() != topic_partition.topic || current->get_partition() != topic_partition.partition_id);
return different_topic_partition;
}
KafkaConsumer2::TopicPartitions const * KafkaConsumer2::getKafkaAssignment() const
{
if (assignment.has_value())
{
return &*assignment;
}
return nullptr;
}
void KafkaConsumer2::updateOffsets(const TopicPartitions & topic_partitions)
{
cppkafka::TopicPartitionList original_topic_partitions;
original_topic_partitions.reserve(topic_partitions.size());
std::transform(
topic_partitions.begin(),
topic_partitions.end(),
std::back_inserter(original_topic_partitions),
[](const TopicPartition & tp)
{
return cppkafka::TopicPartition{tp.topic, tp.partition_id, tp.offset};
});
initializeQueues(original_topic_partitions);
needs_offset_update = false;
stalled_status = StalledStatus::NOT_STALLED;
}
void KafkaConsumer2::initializeQueues(const cppkafka::TopicPartitionList & topic_partitions)
{
queues.clear();
messages.clear();
current = messages.end();
// cppkafka itself calls assign(), but in order to detach the queues here we have to do the assignment manually. Later on we have to reassign the topic partitions with correct offsets.
consumer->assign(topic_partitions);
for (const auto & topic_partition : topic_partitions)
// This will also detach the partition queues from the consumer, thus the messages won't be forwarded without attaching them manually
queues.emplace(
TopicPartition{topic_partition.get_topic(), topic_partition.get_partition(), topic_partition.get_offset()},
consumer->get_partition_queue(topic_partition));
}
// It does the poll when needed
ReadBufferPtr KafkaConsumer2::consume(const TopicPartition & topic_partition, const std::optional<int64_t> & message_count)
{
resetIfStopped();
if (polledDataUnusable(topic_partition))
return nullptr;
if (hasMorePolledMessages())
{
if (auto next_message = getNextMessage(); next_message)
return next_message;
}
while (true)
{
stalled_status = StalledStatus::NO_MESSAGES_RETURNED;
auto & queue_to_poll_from = queues.at(topic_partition);
LOG_TRACE(log, "Batch size {}, offset {}", batch_size, topic_partition.offset);
const auto messages_to_pull = message_count.value_or(batch_size);
/// Don't drop old messages immediately, since we may need them for virtual columns.
auto new_messages = queue_to_poll_from.consume_batch(messages_to_pull, std::chrono::milliseconds(poll_timeout));
resetIfStopped();
if (stalled_status == StalledStatus::CONSUMER_STOPPED)
{
return nullptr;
}
if (new_messages.empty())
{
LOG_TRACE(log, "Stalled");
return nullptr;
}
else
{
messages = std::move(new_messages);
current = messages.begin();
LOG_TRACE(
log,
"Polled batch of {} messages. Offsets position: {}",
messages.size(),
consumer->get_offsets_position(consumer->get_assignment()));
break;
}
}
filterMessageErrors();
if (current == messages.end())
{
LOG_ERROR(log, "Only errors left");
stalled_status = StalledStatus::ERRORS_RETURNED;
return nullptr;
}
ProfileEvents::increment(ProfileEvents::KafkaMessagesPolled, messages.size());
stalled_status = StalledStatus::NOT_STALLED;
return getNextMessage();
}
void KafkaConsumer2::commit(const TopicPartition & topic_partition)
{
static constexpr auto max_retries = 5;
bool committed = false;
LOG_TEST(
log,
"Trying to commit offset {} to Kafka for topic-partition [{}:{}]",
topic_partition.offset,
topic_partition.topic,
topic_partition.partition_id);
const auto topic_partition_list = std::vector{cppkafka::TopicPartition{
topic_partition.topic,
topic_partition.partition_id,
topic_partition.offset,
}};
for (auto try_count = 0; try_count < max_retries && !committed; ++try_count)
{
try
{
// See https://github.com/edenhill/librdkafka/issues/1470
// broker may reject commit if during offsets.commit.timeout.ms (5000 by default),
// there were not enough replicas available for the __consumer_offsets topic.
// also some other temporary issues like client-server connectivity problems are possible
consumer->commit(topic_partition_list);
committed = true;
LOG_INFO(
log,
"Committed offset {} to Kafka for topic-partition [{}:{}]",
topic_partition.offset,
topic_partition.topic,
topic_partition.partition_id);
}
catch (const cppkafka::HandleException & e)
{
// If there were actually no offsets to commit, return. Retrying won't solve
// anything here
if (e.get_error() == RD_KAFKA_RESP_ERR__NO_OFFSET)
committed = true;
else
LOG_ERROR(log, "Exception during attempt to commit to Kafka: {}", e.what());
}
}
if (!committed)
{
// The failure is not a big issue: it only matters when a table is dropped and recreated; otherwise the offsets are taken from Keeper.
ProfileEvents::increment(ProfileEvents::KafkaCommitFailures);
LOG_ERROR(log, "All commit attempts failed");
}
else
{
ProfileEvents::increment(ProfileEvents::KafkaCommits);
}
}
void KafkaConsumer2::subscribeIfNotSubscribedYet()
{
if (likely(is_subscribed))
return;
consumer->subscribe(topics);
is_subscribed = true;
LOG_DEBUG(log, "Subscribed.");
}
ReadBufferPtr KafkaConsumer2::getNextMessage()
{
while (current != messages.end())
{
const auto * data = current->get_payload().get_data();
size_t size = current->get_payload().get_size();
++current;
// `data` can be nullptr in case the Kafka message has an empty payload
if (data)
return std::make_shared<ReadBufferFromMemory>(data, size);
}
return nullptr;
}
void KafkaConsumer2::filterMessageErrors()
{
assert(current == messages.begin());
StorageKafkaUtils::eraseMessageErrors(messages, log);
current = messages.begin();
}
void KafkaConsumer2::resetIfStopped()
{
if (stopped)
{
stalled_status = StalledStatus::CONSUMER_STOPPED;
}
}
}

View File

@ -0,0 +1,162 @@
#pragma once
#include <Core/Names.h>
#include <IO/ReadBuffer.h>
#include <Common/CurrentMetrics.h>
#include <Common/SipHash.h>
#include <base/types.h>
#include <cppkafka/cppkafka.h>
#include <cppkafka/topic_partition.h>
#include <cppkafka/topic_partition_list.h>
namespace CurrentMetrics
{
extern const Metric KafkaConsumers;
}
namespace Poco
{
class Logger;
}
namespace DB
{
using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
class KafkaConsumer2
{
public:
static inline constexpr int INVALID_OFFSET = RD_KAFKA_OFFSET_INVALID;
static inline constexpr int BEGINNING_OFFSET = RD_KAFKA_OFFSET_BEGINNING;
static inline constexpr int END_OFFSET = RD_KAFKA_OFFSET_END;
struct TopicPartition
{
String topic;
int32_t partition_id;
int64_t offset{INVALID_OFFSET};
bool operator==(const TopicPartition &) const = default;
bool operator<(const TopicPartition & other) const;
};
using TopicPartitions = std::vector<TopicPartition>;
struct OnlyTopicNameAndPartitionIdHash
{
std::size_t operator()(const TopicPartition & tp) const
{
SipHash s;
s.update(tp.topic);
s.update(tp.partition_id);
return s.get64();
}
};
struct OnlyTopicNameAndPartitionIdEquality
{
bool operator()(const TopicPartition & lhs, const TopicPartition & rhs) const
{
return lhs.topic == rhs.topic && lhs.partition_id == rhs.partition_id;
}
};
struct TopicPartitionCount
{
String topic;
size_t partition_count;
};
using TopicPartitionCounts = std::vector<KafkaConsumer2::TopicPartitionCount>;
KafkaConsumer2(
ConsumerPtr consumer_,
LoggerPtr log_,
size_t max_batch_size,
size_t poll_timeout_,
const std::atomic<bool> & stopped_,
const Names & topics_);
~KafkaConsumer2();
// Poll only the main consumer queue without any topic-partition queues. This is useful to get notified about events, such as rebalance,
// new assignment, etc.
void pollEvents();
auto pollTimeout() const { return poll_timeout; }
inline bool hasMorePolledMessages() const { return (stalled_status == StalledStatus::NOT_STALLED) && (current != messages.end()); }
inline bool isStalled() const { return stalled_status != StalledStatus::NOT_STALLED; }
// Returns the topic partitions that the consumer got from rebalancing the consumer group. If the consumer received
// no topic partitions or all of them were revoked, it returns a null pointer.
TopicPartitions const * getKafkaAssignment() const;
// As the main source of offsets is not Kafka, the offsets need to be pushed to the consumer from outside
// Returns true if it received a new assignment and the internal state should be updated by updateOffsets
bool needsOffsetUpdate() const { return needs_offset_update; }
void updateOffsets(const TopicPartitions & topic_partitions);
/// Polls a batch of messages from the given topic-partition and returns a read buffer containing the next message, or
/// nullptr when there are no messages to process.
ReadBufferPtr consume(const TopicPartition & topic_partition, const std::optional<int64_t> & message_count);
void commit(const TopicPartition & topic_partition);
// Return values for the message that's being read.
String currentTopic() const { return current[-1].get_topic(); }
String currentKey() const { return current[-1].get_key(); }
auto currentOffset() const { return current[-1].get_offset(); }
auto currentPartition() const { return current[-1].get_partition(); }
auto currentTimestamp() const { return current[-1].get_timestamp(); }
const auto & currentHeaderList() const { return current[-1].get_header_list(); }
String currentPayload() const { return current[-1].get_payload(); }
void subscribeIfNotSubscribedYet();
private:
using Messages = std::vector<cppkafka::Message>;
CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaConsumers};
enum class StalledStatus
{
NOT_STALLED,
NO_MESSAGES_RETURNED,
CONSUMER_STOPPED,
NO_ASSIGNMENT,
ERRORS_RETURNED
};
ConsumerPtr consumer;
LoggerPtr log;
const size_t batch_size = 1;
const size_t poll_timeout = 0;
StalledStatus stalled_status = StalledStatus::NO_MESSAGES_RETURNED;
const std::atomic<bool> & stopped;
bool is_subscribed = false;
// order is important, need to be destructed before consumer
Messages messages;
Messages::const_iterator current;
// order is important, need to be destructed before consumer
std::optional<TopicPartitions> assignment;
bool needs_offset_update{false};
std::unordered_map<TopicPartition, cppkafka::Queue, OnlyTopicNameAndPartitionIdHash, OnlyTopicNameAndPartitionIdEquality> queues;
const Names topics;
bool polledDataUnusable(const TopicPartition & topic_partition) const;
void drainConsumerQueue();
void resetIfStopped();
void filterMessageErrors();
ReadBufferPtr getNextMessage();
void initializeQueues(const cppkafka::TopicPartitionList & topic_partitions);
};
}

View File

@ -38,6 +38,8 @@ const auto KAFKA_CONSUMERS_POOL_TTL_MS_MAX = 600'000;
M(StreamingHandleErrorMode, kafka_handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for Kafka engine. Possible values: default (throw an exception after rabbitmq_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \
M(Bool, kafka_commit_on_select, false, "Commit messages when select query is made", 0) \
M(UInt64, kafka_max_rows_per_message, 1, "The maximum number of rows produced in one kafka message for row-based formats.", 0) \
M(String, kafka_keeper_path, "", "The path to the table in ClickHouse Keeper", 0) \
M(String, kafka_replica_name, "", "The replica name in ClickHouse Keeper", 0) \
#define OBSOLETE_KAFKA_SETTINGS(M, ALIAS) \
MAKE_OBSOLETE(M, Char, kafka_row_delimiter, '\0') \

View File

@ -262,7 +262,7 @@ Chunk KafkaSource::generateImpl()
// they are not needed here:
// and it's misleading to use them here,
// as columns 'materialized' that way stays 'ephemeral'
// i.e. will not be stored anythere
// i.e. will not be stored anywhere
// If needed any extra columns can be added using DEFAULT they can be added at MV level if needed.
auto result_block = non_virtual_header.cloneWithColumns(executor.getResultColumns());

View File

@ -1,13 +1,5 @@
#include <Storages/Kafka/StorageKafka.h>
#include <Storages/Kafka/parseSyslogLevel.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Formats/FormatFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterInsertQuery.h>
@ -21,18 +13,19 @@
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/QueryPlan/ISourceStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromStreamLikeEngine.h>
#include <Storages/Kafka/KafkaConfigLoader.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipeline.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/Kafka/KafkaProducer.h>
#include <Storages/Kafka/KafkaSettings.h>
#include <Storages/Kafka/KafkaSource.h>
#include <Storages/Kafka/StorageKafkaUtils.h>
#include <Storages/MessageQueueSink.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Common/NamedCollections/NamedCollectionsFactory.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <base/getFQDNOrHostName.h>
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/trim.hpp>
@ -41,10 +34,10 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h>
#include <Common/Macros.h>
#include <Common/NamedCollections/NamedCollectionsFactory.h>
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Processors/QueryPlan/ReadFromStreamLikeEngine.h>
#include <Common/logger_useful.h>
#include <Common/quoteString.h>
#include <Common/setThreadName.h>
@ -55,13 +48,8 @@
#include <Common/ProfileEvents.h>
#include <base/sleep.h>
#if USE_KRB5
#include <Access/KerberosInit.h>
#endif // USE_KRB5
namespace CurrentMetrics
{
extern const Metric KafkaLibrdkafkaThreads;
extern const Metric KafkaBackgroundReads;
extern const Metric KafkaConsumersInUse;
extern const Metric KafkaWrites;
@ -82,104 +70,10 @@ namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int QUERY_NOT_ALLOWED;
extern const int ABORTED;
}
struct StorageKafkaInterceptors
{
static rd_kafka_resp_err_t rdKafkaOnThreadStart(rd_kafka_t *, rd_kafka_thread_type_t thread_type, const char *, void * ctx)
{
StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
CurrentMetrics::add(CurrentMetrics::KafkaLibrdkafkaThreads, 1);
const auto & storage_id = self->getStorageID();
const auto & table = storage_id.getTableName();
switch (thread_type)
{
case RD_KAFKA_THREAD_MAIN:
setThreadName(("rdk:m/" + table.substr(0, 9)).c_str());
break;
case RD_KAFKA_THREAD_BACKGROUND:
setThreadName(("rdk:bg/" + table.substr(0, 8)).c_str());
break;
case RD_KAFKA_THREAD_BROKER:
setThreadName(("rdk:b/" + table.substr(0, 9)).c_str());
break;
}
/// Create ThreadStatus to track memory allocations from librdkafka threads.
//
/// And store them in a separate list (thread_statuses) to make sure that they will be destroyed,
/// regardless how librdkafka calls the hooks.
/// But this can trigger use-after-free if librdkafka will not destroy threads after rd_kafka_wait_destroyed()
auto thread_status = std::make_shared<ThreadStatus>();
std::lock_guard lock(self->thread_statuses_mutex);
self->thread_statuses.emplace_back(std::move(thread_status));
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
static rd_kafka_resp_err_t rdKafkaOnThreadExit(rd_kafka_t *, rd_kafka_thread_type_t, const char *, void * ctx)
{
StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
CurrentMetrics::sub(CurrentMetrics::KafkaLibrdkafkaThreads, 1);
std::lock_guard lock(self->thread_statuses_mutex);
const auto it = std::find_if(self->thread_statuses.begin(), self->thread_statuses.end(), [](const auto & thread_status_ptr)
{
return thread_status_ptr.get() == current_thread;
});
if (it == self->thread_statuses.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No thread status for this librdkafka thread.");
self->thread_statuses.erase(it);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
static rd_kafka_resp_err_t rdKafkaOnNew(rd_kafka_t * rk, const rd_kafka_conf_t *, void * ctx, char * /*errstr*/, size_t /*errstr_size*/)
{
StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
rd_kafka_resp_err_t status;
status = rd_kafka_interceptor_add_on_thread_start(rk, "init-thread", rdKafkaOnThreadStart, ctx);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
{
LOG_ERROR(self->log, "Cannot set on thread start interceptor due to {} error", status);
return status;
}
status = rd_kafka_interceptor_add_on_thread_exit(rk, "exit-thread", rdKafkaOnThreadExit, ctx);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
LOG_ERROR(self->log, "Cannot set on thread exit interceptor due to {} error", status);
return status;
}
static rd_kafka_resp_err_t rdKafkaOnConfDup(rd_kafka_conf_t * new_conf, const rd_kafka_conf_t * /*old_conf*/, size_t /*filter_cnt*/, const char ** /*filter*/, void * ctx)
{
StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
rd_kafka_resp_err_t status;
// cppkafka copies configuration multiple times
status = rd_kafka_conf_interceptor_add_on_conf_dup(new_conf, "init", rdKafkaOnConfDup, ctx);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
{
LOG_ERROR(self->log, "Cannot set on conf dup interceptor due to {} error", status);
return status;
}
status = rd_kafka_conf_interceptor_add_on_new(new_conf, "init", rdKafkaOnNew, ctx);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
LOG_ERROR(self->log, "Cannot set on conf new interceptor due to {} error", status);
return status;
}
};
class ReadFromStorageKafka final : public ReadFromStreamLikeEngine
{
public:
@ -241,182 +135,6 @@ private:
StorageSnapshotPtr storage_snapshot;
};
namespace
{
const String CONFIG_KAFKA_TAG = "kafka";
const String CONFIG_KAFKA_TOPIC_TAG = "kafka_topic";
const String CONFIG_KAFKA_CONSUMER_TAG = "consumer";
const String CONFIG_KAFKA_PRODUCER_TAG = "producer";
const String CONFIG_NAME_TAG = "name";
void setKafkaConfigValue(cppkafka::Configuration & kafka_config, const String & key, const String & value)
{
/// "log_level" has valid underscore, the remaining librdkafka setting use dot.separated.format which isn't acceptable for XML.
/// See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
const String setting_name_in_kafka_config = (key == "log_level") ? key : boost::replace_all_copy(key, "_", ".");
kafka_config.set(setting_name_in_kafka_config, value);
}
void loadConfigProperty(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const String & tag)
{
const String property_path = config_prefix + "." + tag;
const String property_value = config.getString(property_path);
setKafkaConfigValue(kafka_config, tag, property_value);
}
void loadNamedCollectionConfig(cppkafka::Configuration & kafka_config, const String & collection_name, const String & config_prefix)
{
const auto & collection = NamedCollectionFactory::instance().get(collection_name);
for (const auto & key : collection->getKeys(-1, config_prefix))
{
// Cut prefix with '.' before actual config tag.
const auto param_name = key.substr(config_prefix.size() + 1);
setKafkaConfigValue(kafka_config, param_name, collection->get<String>(key));
}
}
void loadLegacyTopicConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & config_prefix)
{
if (!collection_name.empty())
{
loadNamedCollectionConfig(kafka_config, collection_name, config_prefix);
return;
}
Poco::Util::AbstractConfiguration::Keys tags;
config.keys(config_prefix, tags);
for (const auto & tag : tags)
{
loadConfigProperty(kafka_config, config, config_prefix, tag);
}
}
/// Read server configuration into cppkafa configuration, used by new per-topic configuration
void loadTopicConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & config_prefix, const String & topic)
{
if (!collection_name.empty())
{
const auto topic_prefix = fmt::format("{}.{}", config_prefix, CONFIG_KAFKA_TOPIC_TAG);
const auto & collection = NamedCollectionFactory::instance().get(collection_name);
for (const auto & key : collection->getKeys(1, config_prefix))
{
/// Only consider key <kafka_topic>. Multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
if (!key.starts_with(topic_prefix))
continue;
const String kafka_topic_path = config_prefix + "." + key;
const String kafka_topic_name_path = kafka_topic_path + "." + CONFIG_NAME_TAG;
if (topic == collection->get<String>(kafka_topic_name_path))
/// Found it! Now read the per-topic configuration into cppkafka.
loadNamedCollectionConfig(kafka_config, collection_name, kafka_topic_path);
}
}
else
{
/// Read all tags one level below <kafka>
Poco::Util::AbstractConfiguration::Keys tags;
config.keys(config_prefix, tags);
for (const auto & tag : tags)
{
if (tag == CONFIG_NAME_TAG)
continue; // ignore <name>, it is used to match topic configurations
loadConfigProperty(kafka_config, config, config_prefix, tag);
}
}
}
/// Read server configuration into cppkafka configuration, used by global configuration and by legacy per-topic configuration
void loadFromConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & config_prefix, const Names & topics)
{
if (!collection_name.empty())
{
loadNamedCollectionConfig(kafka_config, collection_name, config_prefix);
return;
}
/// Read all tags one level below <kafka>
Poco::Util::AbstractConfiguration::Keys tags;
config.keys(config_prefix, tags);
for (const auto & tag : tags)
{
if (tag == CONFIG_KAFKA_PRODUCER_TAG || tag == CONFIG_KAFKA_CONSUMER_TAG)
/// Do not load consumer/producer properties, since they should be separated by different configuration objects.
continue;
if (tag.starts_with(CONFIG_KAFKA_TOPIC_TAG)) /// multiple occurrences given as "kafka_topic", "kafka_topic[1]", etc.
{
// Update consumer topic-specific configuration (new syntax). Example with topics "football" and "baseball":
// <kafka>
// <kafka_topic>
// <name>football</name>
// <retry_backoff_ms>250</retry_backoff_ms>
// <fetch_min_bytes>5000</fetch_min_bytes>
// </kafka_topic>
// <kafka_topic>
// <name>baseball</name>
// <retry_backoff_ms>300</retry_backoff_ms>
// <fetch_min_bytes>2000</fetch_min_bytes>
// </kafka_topic>
// </kafka>
// Advantages: The period restriction no longer applies (e.g. <name>sports.football</name> will work), everything
// Kafka-related is below <kafka>.
for (const auto & topic : topics)
{
/// Read topic name between <name>...</name>
const String kafka_topic_path = config_prefix + "." + tag;
const String kafka_topic_name_path = kafka_topic_path + "." + CONFIG_NAME_TAG;
const String topic_name = config.getString(kafka_topic_name_path);
if (topic_name != topic)
continue;
loadTopicConfig(kafka_config, config, collection_name, kafka_topic_path, topic);
}
continue;
}
if (tag.starts_with(CONFIG_KAFKA_TAG))
/// Skip legacy per-topic configuration, e.g. <kafka_TOPIC_NAME>.
/// It will be processed in a separate function.
continue;
// Update Kafka configuration from the global configuration. Example:
// <kafka>
// <retry_backoff_ms>250</retry_backoff_ms>
// <fetch_min_bytes>100000</fetch_min_bytes>
// </kafka>
loadConfigProperty(kafka_config, config, config_prefix, tag);
}
}
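/// Legacy per-topic configuration appends the topic name to the <kafka> tag. Illustrative example
/// (values are arbitrary) for topic "football":
/// <kafka_football>
///     <retry_backoff_ms>250</retry_backoff_ms>
///     <fetch_min_bytes>5000</fetch_min_bytes>
/// </kafka_football>
/// Because the topic name becomes part of the tag, topic names containing a period
/// (e.g. "sports.football") cannot be expressed with this syntax.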
void loadLegacyConfigSyntax(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & prefix, const Names & topics)
{
for (const auto & topic : topics)
{
const String kafka_topic_path = prefix + "." + CONFIG_KAFKA_TAG + "_" + topic;
loadLegacyTopicConfig(kafka_config, config, collection_name, kafka_topic_path);
}
}
void loadConsumerConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & prefix, const Names & topics)
{
const String consumer_path = prefix + "." + CONFIG_KAFKA_CONSUMER_TAG;
loadLegacyConfigSyntax(kafka_config, config, collection_name, prefix, topics);
// A new syntax has higher priority
loadFromConfig(kafka_config, config, collection_name, consumer_path, topics);
}
void loadProducerConfig(cppkafka::Configuration & kafka_config, const Poco::Util::AbstractConfiguration & config, const String & collection_name, const String & prefix, const Names & topics)
{
const String producer_path = prefix + "." + CONFIG_KAFKA_PRODUCER_TAG;
loadLegacyConfigSyntax(kafka_config, config, collection_name, prefix, topics);
// A new syntax has higher priority
loadFromConfig(kafka_config, config, collection_name, producer_path, topics);
}
}
StorageKafka::StorageKafka(
const StorageID & table_id_,
ContextPtr context_,
@ -428,19 +146,20 @@ StorageKafka::StorageKafka(
, WithContext(context_->getGlobalContext())
, kafka_settings(std::move(kafka_settings_))
, macros_info{.table_id = table_id_}
, topics(parseTopics(getContext()->getMacros()->expand(kafka_settings->kafka_topic_list.value, macros_info)))
, topics(StorageKafkaUtils::parseTopics(getContext()->getMacros()->expand(kafka_settings->kafka_topic_list.value, macros_info)))
, brokers(getContext()->getMacros()->expand(kafka_settings->kafka_broker_list.value, macros_info))
, group(getContext()->getMacros()->expand(kafka_settings->kafka_group_name.value, macros_info))
, client_id(
kafka_settings->kafka_client_id.value.empty() ? getDefaultClientId(table_id_)
: getContext()->getMacros()->expand(kafka_settings->kafka_client_id.value, macros_info))
kafka_settings->kafka_client_id.value.empty()
? StorageKafkaUtils::getDefaultClientId(table_id_)
: getContext()->getMacros()->expand(kafka_settings->kafka_client_id.value, macros_info))
, format_name(getContext()->getMacros()->expand(kafka_settings->kafka_format.value))
, max_rows_per_message(kafka_settings->kafka_max_rows_per_message.value)
, schema_name(getContext()->getMacros()->expand(kafka_settings->kafka_schema.value, macros_info))
, num_consumers(kafka_settings->kafka_num_consumers.value)
, log(getLogger("StorageKafka (" + table_id_.table_name + ")"))
, intermediate_commit(kafka_settings->kafka_commit_every_batch.value)
, settings_adjustments(createSettingsAdjustments())
, settings_adjustments(StorageKafkaUtils::createSettingsAdjustments(*kafka_settings, schema_name))
, thread_per_consumer(kafka_settings->kafka_thread_per_consumer.value)
, collection_name(collection_name_)
{
@ -456,7 +175,7 @@ StorageKafka::StorageKafka(
storage_metadata.setColumns(columns_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
setVirtuals(createVirtuals(kafka_settings->kafka_handle_error_mode));
setVirtuals(StorageKafkaUtils::createVirtuals(kafka_settings->kafka_handle_error_mode));
auto task_count = thread_per_consumer ? num_consumers : 1;
for (size_t i = 0; i < task_count; ++i)
@ -481,76 +200,6 @@ StorageKafka::StorageKafka(
StorageKafka::~StorageKafka() = default;
VirtualColumnsDescription StorageKafka::createVirtuals(StreamingHandleErrorMode handle_error_mode)
{
VirtualColumnsDescription desc;
desc.addEphemeral("_topic", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()), "");
desc.addEphemeral("_key", std::make_shared<DataTypeString>(), "");
desc.addEphemeral("_offset", std::make_shared<DataTypeUInt64>(), "");
desc.addEphemeral("_partition", std::make_shared<DataTypeUInt64>(), "");
desc.addEphemeral("_timestamp", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()), "");
desc.addEphemeral("_timestamp_ms", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime64>(3)), "");
desc.addEphemeral("_headers.name", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "");
desc.addEphemeral("_headers.value", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "");
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
{
desc.addEphemeral("_raw_message", std::make_shared<DataTypeString>(), "");
desc.addEphemeral("_error", std::make_shared<DataTypeString>(), "");
}
return desc;
}
SettingsChanges StorageKafka::createSettingsAdjustments()
{
SettingsChanges result;
// Needed for backward compatibility
if (!kafka_settings->input_format_skip_unknown_fields.changed)
{
// Always skip unknown fields regardless of the context (JSON or TSKV)
kafka_settings->input_format_skip_unknown_fields = true;
}
if (!kafka_settings->input_format_allow_errors_ratio.changed)
{
kafka_settings->input_format_allow_errors_ratio = 0.;
}
if (!kafka_settings->input_format_allow_errors_num.changed)
{
kafka_settings->input_format_allow_errors_num = kafka_settings->kafka_skip_broken_messages.value;
}
if (!schema_name.empty())
result.emplace_back("format_schema", schema_name);
for (const auto & setting : *kafka_settings)
{
const auto & name = setting.getName();
if (name.find("kafka_") == std::string::npos)
result.emplace_back(name, setting.getValue());
}
return result;
}
Names StorageKafka::parseTopics(String topic_list)
{
Names result;
boost::split(result,topic_list,[](char c){ return c == ','; });
for (String & topic : result)
{
boost::trim(topic);
}
return result;
}
String StorageKafka::getDefaultClientId(const StorageID & table_id_)
{
return fmt::format("{}-{}-{}-{}", VERSION_NAME, getFQDNOrHostName(), table_id_.database_name, table_id_.table_name);
}
void StorageKafka::read(
QueryPlan & query_plan,
const Names & column_names,
@ -751,65 +400,26 @@ KafkaConsumerPtr StorageKafka::createKafkaConsumer(size_t consumer_number)
topics);
return kafka_consumer_ptr;
}
cppkafka::Configuration StorageKafka::getConsumerConfiguration(size_t consumer_number)
{
cppkafka::Configuration conf;
conf.set("metadata.broker.list", brokers);
conf.set("group.id", group);
if (num_consumers > 1)
{
conf.set("client.id", fmt::format("{}-{}", client_id, consumer_number));
}
else
{
conf.set("client.id", client_id);
}
conf.set("client.software.name", VERSION_NAME);
conf.set("client.software.version", VERSION_DESCRIBE);
conf.set("auto.offset.reset", "earliest"); // If no offset stored for this group, read all messages from the start
// This prevents fast draining of the librdkafka queue
// while a single insert block is being built. It improves performance
// significantly, but may lead to higher memory consumption.
size_t default_queued_min_messages = 100000; // must be greater than or equal to default
size_t max_allowed_queued_min_messages = 10000000; // must be less than or equal to max allowed value
conf.set("queued.min.messages", std::min(std::max(getMaxBlockSize(), default_queued_min_messages), max_allowed_queued_min_messages));
updateGlobalConfiguration(conf);
updateConsumerConfiguration(conf);
// These settings should not be changed by users.
conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream has successfully finished
conf.set("enable.auto.offset.store", "false"); // We store offsets explicitly after processing so that they can be committed all at once
conf.set("enable.partition.eof", "false"); // Ignore EOF messages
for (auto & property : conf.get_all())
{
LOG_TRACE(log, "Consumer set property {}:{}", property.first, property.second);
}
return conf;
KafkaConfigLoader::ConsumerConfigParams params{
{getContext()->getConfigRef(), collection_name, topics, log},
brokers,
group,
num_consumers > 1,
consumer_number,
client_id,
getMaxBlockSize()};
return KafkaConfigLoader::getConsumerConfiguration(*this, params);
}
cppkafka::Configuration StorageKafka::getProducerConfiguration()
{
cppkafka::Configuration conf;
conf.set("metadata.broker.list", brokers);
conf.set("client.id", client_id);
conf.set("client.software.name", VERSION_NAME);
conf.set("client.software.version", VERSION_DESCRIBE);
updateGlobalConfiguration(conf);
updateProducerConfiguration(conf);
for (auto & property : conf.get_all())
{
LOG_TRACE(log, "Producer set property {}:{}", property.first, property.second);
}
return conf;
KafkaConfigLoader::ProducerConfigParams params{
{getContext()->getConfigRef(), collection_name, topics, log},
brokers,
client_id};
return KafkaConfigLoader::getProducerConfiguration(*this, params);
}
void StorageKafka::cleanConsumers()
@ -887,126 +497,6 @@ size_t StorageKafka::getPollTimeoutMillisecond() const
: getContext()->getSettingsRef().stream_poll_timeout_ms.totalMilliseconds();
}
void StorageKafka::updateGlobalConfiguration(cppkafka::Configuration & kafka_config)
{
const auto & config = getContext()->getConfigRef();
loadFromConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG, topics);
#if USE_KRB5
if (kafka_config.has_property("sasl.kerberos.kinit.cmd"))
LOG_WARNING(log, "sasl.kerberos.kinit.cmd configuration parameter is ignored.");
kafka_config.set("sasl.kerberos.kinit.cmd","");
kafka_config.set("sasl.kerberos.min.time.before.relogin","0");
if (kafka_config.has_property("sasl.kerberos.keytab") && kafka_config.has_property("sasl.kerberos.principal"))
{
String keytab = kafka_config.get("sasl.kerberos.keytab");
String principal = kafka_config.get("sasl.kerberos.principal");
LOG_DEBUG(log, "Running KerberosInit");
try
{
kerberosInit(keytab,principal);
}
catch (const Exception & e)
{
LOG_ERROR(log, "KerberosInit failure: {}", getExceptionMessage(e, false));
}
LOG_DEBUG(log, "Finished KerberosInit");
}
#else // USE_KRB5
if (kafka_config.has_property("sasl.kerberos.keytab") || kafka_config.has_property("sasl.kerberos.principal"))
LOG_WARNING(log, "Ignoring Kerberos-related parameters because ClickHouse was built without krb5 library support.");
#endif // USE_KRB5
// No need to add any prefix, messages can be distinguished
kafka_config.set_log_callback(
[this](cppkafka::KafkaHandleBase & handle, int level, const std::string & facility, const std::string & message)
{
auto [poco_level, client_logs_level] = parseSyslogLevel(level);
const auto & kafka_object_config = handle.get_configuration();
const std::string client_id_key{"client.id"};
chassert(kafka_object_config.has_property(client_id_key) && "Kafka configuration doesn't have expected client.id set");
LOG_IMPL(
log,
client_logs_level,
poco_level,
"[client.id:{}] [rdk:{}] {}",
kafka_object_config.get(client_id_key),
facility,
message);
});
/// NOTE: statistics should be consumed, otherwise it creates too many
/// entries in the queue, which leads to a memory leak and slow shutdown.
if (!kafka_config.has_property("statistics.interval.ms"))
{
// every 3 seconds by default. set to 0 to disable.
kafka_config.set("statistics.interval.ms", "3000");
}
// Configure interceptor to change thread name
//
// TODO: add interceptors support into the cppkafka.
// XXX: rdkafka uses pthread_set_name_np(), but glibc-compatibility overrides it to a no-op.
{
// This should be safe, since we wait for the rdkafka object anyway.
void * self = static_cast<void *>(this);
int status;
status = rd_kafka_conf_interceptor_add_on_new(kafka_config.get_handle(),
"init", StorageKafkaInterceptors::rdKafkaOnNew, self);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
LOG_ERROR(log, "Cannot set new interceptor due to {} error", status);
// cppkafka always copy the configuration
status = rd_kafka_conf_interceptor_add_on_conf_dup(kafka_config.get_handle(),
"init", StorageKafkaInterceptors::rdKafkaOnConfDup, self);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
LOG_ERROR(log, "Cannot set dup conf interceptor due to {} error", status);
}
}
void StorageKafka::updateConsumerConfiguration(cppkafka::Configuration & kafka_config)
{
const auto & config = getContext()->getConfigRef();
loadConsumerConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG, topics);
}
void StorageKafka::updateProducerConfiguration(cppkafka::Configuration & kafka_config)
{
const auto & config = getContext()->getConfigRef();
loadProducerConfig(kafka_config, config, collection_name, CONFIG_KAFKA_TAG, topics);
}
bool StorageKafka::checkDependencies(const StorageID & table_id)
{
// Check if all dependencies are attached
auto view_ids = DatabaseCatalog::instance().getDependentViews(table_id);
if (view_ids.empty())
return true;
// Check whether the dependencies are ready
for (const auto & view_id : view_ids)
{
auto view = DatabaseCatalog::instance().tryGetTable(view_id, getContext());
if (!view)
return false;
// If it is a materialized view, check its target table
auto * materialized_view = dynamic_cast<StorageMaterializedView *>(view.get());
if (materialized_view && !materialized_view->tryGetTargetTable())
return false;
// Check all its dependencies
if (!checkDependencies(view_id))
return false;
}
return true;
}
void StorageKafka::threadFunc(size_t idx)
{
assert(idx < tasks.size());
@ -1027,7 +517,7 @@ void StorageKafka::threadFunc(size_t idx)
// Keep streaming as long as there are attached views and streaming is not cancelled
while (!task->stream_cancelled)
{
if (!checkDependencies(table_id))
if (!StorageKafkaUtils::checkDependencies(table_id, getContext()))
break;
LOG_DEBUG(log, "Started streaming to {} attached views", num_views);
@ -1109,7 +599,7 @@ bool StorageKafka::streamToViews()
/* allow_materialized */ false,
/* no_squash */ true,
/* no_destination */ true,
/* async_isnert */ false);
/* async_insert */ false);
auto block_io = interpreter.execute();
// Create a stream for each consumer and join them in a union stream
@ -1167,164 +657,4 @@ bool StorageKafka::streamToViews()
return some_stream_is_stalled;
}
void registerStorageKafka(StorageFactory & factory)
{
auto creator_fn = [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
size_t args_count = engine_args.size();
const bool has_settings = args.storage_def->settings;
auto kafka_settings = std::make_unique<KafkaSettings>();
String collection_name;
if (auto named_collection = tryGetNamedCollectionWithOverrides(args.engine_args, args.getLocalContext()))
{
for (const auto & setting : kafka_settings->all())
{
const auto & setting_name = setting.getName();
if (named_collection->has(setting_name))
kafka_settings->set(setting_name, named_collection->get<String>(setting_name));
}
collection_name = assert_cast<const ASTIdentifier *>(args.engine_args[0].get())->name();
}
if (has_settings)
{
kafka_settings->loadFromQuery(*args.storage_def);
}
// Check arguments and settings
#define CHECK_KAFKA_STORAGE_ARGUMENT(ARG_NUM, PAR_NAME, EVAL) \
/* One of the four required arguments is not specified */ \
if (args_count < (ARG_NUM) && (ARG_NUM) <= 4 && \
!kafka_settings->PAR_NAME.changed) \
{ \
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,\
"Required parameter '{}' " \
"for storage Kafka not specified", \
#PAR_NAME); \
} \
if (args_count >= (ARG_NUM)) \
{ \
/* The same argument is given in two places */ \
if (has_settings && \
kafka_settings->PAR_NAME.changed) \
{ \
throw Exception(ErrorCodes::BAD_ARGUMENTS, \
"The argument №{} of storage Kafka " \
"and the parameter '{}' " \
"in SETTINGS cannot be specified at the same time", \
#ARG_NUM, #PAR_NAME); \
} \
/* move engine args to settings */ \
else \
{ \
if ((EVAL) == 1) \
{ \
engine_args[(ARG_NUM)-1] = \
evaluateConstantExpressionAsLiteral( \
engine_args[(ARG_NUM)-1], \
args.getLocalContext()); \
} \
if ((EVAL) == 2) \
{ \
engine_args[(ARG_NUM)-1] = \
evaluateConstantExpressionOrIdentifierAsLiteral( \
engine_args[(ARG_NUM)-1], \
args.getLocalContext()); \
} \
kafka_settings->PAR_NAME = \
engine_args[(ARG_NUM)-1]->as<ASTLiteral &>().value; \
} \
}
/** Arguments of the engine are the following:
* - Kafka broker list
* - List of topics
* - Group ID (may be a constant expression with a string result)
* - Message format (string)
* - Row delimiter
* - Schema (optional, if the format supports it)
* - Number of consumers
* - Max block size for background consumption
* - Number of unreadable messages to skip (at least)
* - Do intermediate commits when the batch is consumed and handled
*/
/* 0 = raw, 1 = evaluateConstantExpressionAsLiteral, 2=evaluateConstantExpressionOrIdentifierAsLiteral */
/// In case of named collection we already validated the arguments.
if (collection_name.empty())
{
CHECK_KAFKA_STORAGE_ARGUMENT(1, kafka_broker_list, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(2, kafka_topic_list, 1)
CHECK_KAFKA_STORAGE_ARGUMENT(3, kafka_group_name, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(4, kafka_format, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(5, kafka_row_delimiter, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(6, kafka_schema, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(7, kafka_num_consumers, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(8, kafka_max_block_size, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(9, kafka_skip_broken_messages, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(10, kafka_commit_every_batch, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(11, kafka_client_id, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(12, kafka_poll_timeout_ms, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(13, kafka_flush_interval_ms, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(14, kafka_thread_per_consumer, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(15, kafka_handle_error_mode, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(16, kafka_commit_on_select, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(17, kafka_max_rows_per_message, 0)
}
#undef CHECK_KAFKA_STORAGE_ARGUMENT
auto num_consumers = kafka_settings->kafka_num_consumers.value;
auto max_consumers = std::max<uint32_t>(getNumberOfPhysicalCPUCores(), 16);
if (!args.getLocalContext()->getSettingsRef().kafka_disable_num_consumers_limit && num_consumers > max_consumers)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "The number of consumers can not be bigger than {}. "
"A single consumer can read any number of partitions. "
"Extra consumers are relatively expensive, "
"and using a lot of them can lead to high memory and CPU usage. "
"To achieve better performance "
"of getting data from Kafka, consider using a setting kafka_thread_per_consumer=1, "
"and ensure you have enough threads "
"in MessageBrokerSchedulePool (background_message_broker_schedule_pool_size). "
"See also https://clickhouse.com/docs/en/integrations/kafka#tuning-performance", max_consumers);
}
else if (num_consumers < 1)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Number of consumers can not be lower than 1");
}
if (kafka_settings->kafka_max_block_size.changed && kafka_settings->kafka_max_block_size.value < 1)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "kafka_max_block_size can not be lower than 1");
}
if (kafka_settings->kafka_poll_max_batch_size.changed && kafka_settings->kafka_poll_max_batch_size.value < 1)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "kafka_poll_max_batch_size can not be lower than 1");
}
NamesAndTypesList supported_columns;
for (const auto & column : args.columns)
{
if (column.default_desc.kind == ColumnDefaultKind::Alias)
supported_columns.emplace_back(column.name, column.type);
if (column.default_desc.kind == ColumnDefaultKind::Default && !column.default_desc.expression)
supported_columns.emplace_back(column.name, column.type);
}
// Kafka engine allows only ordinary columns without default expression or alias columns.
if (args.columns.getAll() != supported_columns)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL expressions for columns. "
"See https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka/#configuration");
}
return std::make_shared<StorageKafka>(args.table_id, args.getContext(), args.columns, args.comment, std::move(kafka_settings), collection_name);
};
factory.registerStorage("Kafka", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });
}
}

View File

@ -23,7 +23,8 @@ class ReadFromStorageKafka;
class StorageSystemKafkaConsumers;
class ThreadStatus;
struct StorageKafkaInterceptors;
template <typename TStorageKafka>
struct KafkaInterceptors;
using KafkaConsumerPtr = std::shared_ptr<KafkaConsumer>;
using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
@ -33,7 +34,8 @@ using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
*/
class StorageKafka final : public IStorage, WithContext
{
friend struct StorageKafkaInterceptors;
using KafkaInterceptors = KafkaInterceptors<StorageKafka>;
friend KafkaInterceptors;
public:
StorageKafka(
@ -133,7 +135,6 @@ private:
std::mutex thread_statuses_mutex;
std::list<std::shared_ptr<ThreadStatus>> thread_statuses;
SettingsChanges createSettingsAdjustments();
/// Creates KafkaConsumer object without real consumer (cppkafka::Consumer)
KafkaConsumerPtr createKafkaConsumer(size_t consumer_number);
/// Returns full consumer related configuration, also the configuration
@ -148,33 +149,15 @@ private:
std::atomic<bool> shutdown_called = false;
// Load Kafka global configuration
// https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md#global-configuration-properties
void updateGlobalConfiguration(cppkafka::Configuration & kafka_config);
// Load Kafka properties from consumer configuration
// NOTE: librdkafka allows setting a consumer property on a producer and vice versa,
// but a warning will be generated e.g:
// "Configuration property session.timeout.ms is a consumer property and
// will be ignored by this producer instance"
void updateConsumerConfiguration(cppkafka::Configuration & kafka_config);
// Load Kafka properties from producer configuration
void updateProducerConfiguration(cppkafka::Configuration & kafka_config);
void threadFunc(size_t idx);
size_t getPollMaxBatchSize() const;
size_t getMaxBlockSize() const;
size_t getPollTimeoutMillisecond() const;
static Names parseTopics(String topic_list);
static String getDefaultClientId(const StorageID & table_id_);
bool streamToViews();
bool checkDependencies(const StorageID & table_id);
void cleanConsumers();
static VirtualColumnsDescription createVirtuals(StreamingHandleErrorMode handle_error_mode);
};
}

File diff suppressed because it is too large

View File

@ -0,0 +1,241 @@
#pragma once
#include <Core/BackgroundSchedulePool.h>
#include <Core/Block.h>
#include <Core/Types.h>
#include <Storages/IStorage.h>
#include <Storages/Kafka/KafkaConsumer2.h>
#include <Storages/Kafka/KafkaSettings.h>
#include <Common/Macros.h>
#include <Common/SettingsChanges.h>
#include <Common/ThreadStatus.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Poco/Semaphore.h>
#include <atomic>
#include <filesystem>
#include <list>
#include <mutex>
#include <rdkafka.h>
namespace cppkafka
{
class Configuration;
}
namespace DB
{
template <typename TStorageKafka>
struct KafkaInterceptors;
using KafkaConsumer2Ptr = std::shared_ptr<KafkaConsumer2>;
/// Implements a Kafka queue table engine that can be used as a persistent queue / buffer,
/// or as a basic building block for creating pipelines with a continuous insertion / ETL.
///
/// It is similar to the already existing StorageKafka, but instead of relying on the offsets
/// stored in Kafka, its main source of information about offsets is Keeper. On top of the
/// offsets, it also stores the number of messages (intent size) it tried to insert from
/// each topic. By storing the intent sizes it is possible to retry the same batch of
/// messages in case of any error, giving deduplication a chance to deduplicate
/// blocks.
///
/// To not complicate things too much, the current implementation makes sure to fetch
/// messages only from a single topic-partition on a single thread at a time by
/// manipulating the queues of librdkafka. By pulling from multiple topic-partitions
/// the order of messages is not guaranteed, therefore they would have different
/// hashes for deduplication.
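///
/// A simplified sketch of one streaming iteration per consumer (for orientation only; the
/// authoritative flow lives in the implementation):
///   1. lockTopicPartitions()  - take ephemeral locks in Keeper, read the committed offset and any previous intent
///   2. pollConsumer()         - poll the locked topic-partition, replaying exactly the intended amount after a failure
///   3. saveIntent()           - record how many messages are about to be inserted
///   4. stream the polled block into the attached views
///   5. saveCommittedOffset()  - advance the committed offset in Keeper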
class StorageKafka2 final : public IStorage, WithContext
{
using KafkaInterceptors = KafkaInterceptors<StorageKafka2>;
friend KafkaInterceptors;
public:
StorageKafka2(
const StorageID & table_id_,
ContextPtr context_,
const ColumnsDescription & columns_,
const String & comment,
std::unique_ptr<KafkaSettings> kafka_settings_,
const String & collection_name_);
std::string getName() const override { return "Kafka"; }
bool noPushingToViews() const override { return true; }
void startup() override;
void shutdown(bool is_drop) override;
void drop() override;
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams) override;
SinkToStoragePtr
write(const ASTPtr & query, const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr context, bool async_insert) override;
/// We want to control the number of rows in a chunk inserted into Kafka
bool prefersLargeBlocks() const override { return false; }
const auto & getFormatName() const { return format_name; }
StreamingHandleErrorMode getHandleKafkaErrorMode() const { return kafka_settings->kafka_handle_error_mode; }
private:
using TopicPartition = KafkaConsumer2::TopicPartition;
using TopicPartitions = KafkaConsumer2::TopicPartitions;
struct LockedTopicPartitionInfo
{
zkutil::EphemeralNodeHolderPtr lock;
std::optional<int64_t> committed_offset;
std::optional<int64_t> intent_size;
};
using TopicPartitionLocks = std::unordered_map<
TopicPartition,
LockedTopicPartitionInfo,
KafkaConsumer2::OnlyTopicNameAndPartitionIdHash,
KafkaConsumer2::OnlyTopicNameAndPartitionIdEquality>;
struct ConsumerAndAssignmentInfo
{
KafkaConsumer2Ptr consumer;
size_t consume_from_topic_partition_index{0};
TopicPartitions topic_partitions{};
zkutil::ZooKeeperPtr keeper;
TopicPartitionLocks locks{};
Stopwatch watch{CLOCK_MONOTONIC_COARSE};
};
struct PolledBatchInfo
{
BlocksList blocks;
int64_t last_offset;
};
// Stream thread
struct TaskContext
{
BackgroundSchedulePool::TaskHolder holder;
std::atomic<bool> stream_cancelled{false};
explicit TaskContext(BackgroundSchedulePool::TaskHolder && task_) : holder(std::move(task_)) { }
};
enum class AssignmentChange
{
NotChanged,
Updated,
Lost
};
// Configuration and state
mutable std::mutex keeper_mutex;
zkutil::ZooKeeperPtr keeper;
String keeper_path;
String replica_path;
std::unique_ptr<KafkaSettings> kafka_settings;
Macros::MacroExpansionInfo macros_info;
const Names topics;
const String brokers;
const String group;
const String client_id;
const String format_name;
const size_t max_rows_per_message;
const String schema_name;
const size_t num_consumers; /// total number of consumers
LoggerPtr log;
Poco::Semaphore semaphore;
const SettingsChanges settings_adjustments;
/// Can differ from num_consumers in case of exception in startup() (or if startup() hasn't been called).
/// In this case we still need to be able to shutdown() properly.
size_t num_created_consumers = 0; /// number of actually created consumers.
std::vector<ConsumerAndAssignmentInfo> consumers;
std::vector<std::shared_ptr<TaskContext>> tasks;
bool thread_per_consumer = false;
/// For memory accounting in the librdkafka threads.
std::mutex thread_statuses_mutex;
std::list<std::shared_ptr<ThreadStatus>> thread_statuses;
/// If named_collection is specified.
String collection_name;
std::atomic<bool> shutdown_called = false;
// Handling replica activation.
std::atomic<bool> is_active = false;
zkutil::EphemeralNodeHolderPtr replica_is_active_node;
BackgroundSchedulePool::TaskHolder activating_task;
String active_node_identifier;
UInt64 consecutive_activate_failures = 0;
bool activate();
void activateAndReschedule();
void partialShutdown();
void assertActive() const;
KafkaConsumer2Ptr createConsumer(size_t consumer_number);
// Returns the full consumer-related configuration, including global Kafka properties.
cppkafka::Configuration getConsumerConfiguration(size_t consumer_number);
// Returns the full producer-related configuration, including global Kafka properties.
cppkafka::Configuration getProducerConfiguration();
void threadFunc(size_t idx);
size_t getPollMaxBatchSize() const;
size_t getMaxBlockSize() const;
size_t getPollTimeoutMillisecond() const;
enum class StallReason
{
NoAssignment,
CouldNotAcquireLocks,
NoPartitions,
NoMessages,
KeeperSessionEnded,
};
std::optional<StallReason> streamToViews(size_t idx);
std::optional<size_t> streamFromConsumer(ConsumerAndAssignmentInfo & consumer_info);
// Returns true if this is the first replica
bool createTableIfNotExists();
// Returns true if all of the nodes were cleaned up
bool removeTableNodesFromZooKeeper(zkutil::ZooKeeperPtr keeper_to_use, const zkutil::EphemeralNodeHolder::Ptr & drop_lock);
// Creates only the replica in ZooKeeper. Shouldn't be called on the first replica as it is created in createTableIfNotExists
void createReplica();
void dropReplica();
// Takes lock over topic partitions and sets the committed offset in topic_partitions.
std::optional<TopicPartitionLocks> lockTopicPartitions(zkutil::ZooKeeper & keeper_to_use, const TopicPartitions & topic_partitions);
void saveCommittedOffset(zkutil::ZooKeeper & keeper_to_use, const TopicPartition & topic_partition);
void saveIntent(zkutil::ZooKeeper & keeper_to_use, const TopicPartition & topic_partition, int64_t intent);
PolledBatchInfo pollConsumer(
KafkaConsumer2 & consumer,
const TopicPartition & topic_partition,
std::optional<int64_t> message_count,
Stopwatch & watch,
const ContextPtr & context);
void setZooKeeper();
zkutil::ZooKeeperPtr tryGetZooKeeper() const;
zkutil::ZooKeeperPtr getZooKeeper() const;
zkutil::ZooKeeperPtr getZooKeeperAndAssertActive() const;
zkutil::ZooKeeperPtr getZooKeeperIfTableShutDown() const;
std::filesystem::path getTopicPartitionPath(const TopicPartition & topic_partition);
};
}

View File

@ -0,0 +1,452 @@
#include <Storages/Kafka/StorageKafkaUtils.h>
#include <Databases/DatabaseReplicatedHelpers.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTIdentifier.h>
#include <Storages/IStorage.h>
#include <Storages/Kafka/KafkaSettings.h>
#include <Storages/Kafka/StorageKafka.h>
#include <Storages/Kafka/StorageKafka2.h>
#include <Storages/Kafka/parseSyslogLevel.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <base/getFQDNOrHostName.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/CurrentMetrics.h>
#include <Common/NamedCollections/NamedCollectionsFactory.h>
#include <Common/ThreadPool.h>
#include <Common/ThreadStatus.h>
#include <Common/config_version.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <boost/algorithm/string/replace.hpp>
#include <cppkafka/cppkafka.h>
#include <librdkafka/rdkafka.h>
#if USE_KRB5
# include <Access/KerberosInit.h>
#endif // USE_KRB5
namespace ProfileEvents
{
extern const Event KafkaConsumerErrors;
}
namespace DB
{
using namespace std::chrono_literals;
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int SUPPORT_IS_DISABLED;
}
void registerStorageKafka(StorageFactory & factory)
{
auto creator_fn = [](const StorageFactory::Arguments & args) -> std::shared_ptr<IStorage>
{
ASTs & engine_args = args.engine_args;
size_t args_count = engine_args.size();
const bool has_settings = args.storage_def->settings;
auto kafka_settings = std::make_unique<KafkaSettings>();
String collection_name;
if (auto named_collection = tryGetNamedCollectionWithOverrides(args.engine_args, args.getLocalContext()))
{
for (const auto & setting : kafka_settings->all())
{
const auto & setting_name = setting.getName();
if (named_collection->has(setting_name))
kafka_settings->set(setting_name, named_collection->get<String>(setting_name));
}
collection_name = assert_cast<const ASTIdentifier *>(args.engine_args[0].get())->name();
}
if (has_settings)
{
kafka_settings->loadFromQuery(*args.storage_def);
}
// Check arguments and settings
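// The CHECK_KAFKA_STORAGE_ARGUMENT macro maps the ARG_NUM-th positional engine argument onto the
// corresponding kafka_* setting: it throws if one of the first four required arguments is missing,
// throws if the same value is given both positionally and in SETTINGS, and otherwise evaluates the
// argument (EVAL selects raw/literal/identifier evaluation) and moves it into the setting.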
#define CHECK_KAFKA_STORAGE_ARGUMENT(ARG_NUM, PAR_NAME, EVAL) \
/* One of the four required arguments is not specified */ \
if (args_count < (ARG_NUM) && (ARG_NUM) <= 4 && !kafka_settings->PAR_NAME.changed) \
{ \
throw Exception( \
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, \
"Required parameter '{}' " \
"for storage Kafka not specified", \
#PAR_NAME); \
} \
if (args_count >= (ARG_NUM)) \
{ \
/* The same argument is given in two places */ \
if (has_settings && kafka_settings->PAR_NAME.changed) \
{ \
throw Exception( \
ErrorCodes::BAD_ARGUMENTS, \
"The argument №{} of storage Kafka " \
"and the parameter '{}' " \
"in SETTINGS cannot be specified at the same time", \
#ARG_NUM, \
#PAR_NAME); \
} \
/* move engine args to settings */ \
else \
{ \
if constexpr ((EVAL) == 1) \
{ \
engine_args[(ARG_NUM)-1] = evaluateConstantExpressionAsLiteral(engine_args[(ARG_NUM)-1], args.getLocalContext()); \
} \
if constexpr ((EVAL) == 2) \
{ \
engine_args[(ARG_NUM)-1] \
= evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[(ARG_NUM)-1], args.getLocalContext()); \
} \
kafka_settings->PAR_NAME = engine_args[(ARG_NUM)-1]->as<ASTLiteral &>().value; \
} \
}
/** Arguments of the engine are the following:
* - Kafka broker list
* - List of topics
* - Group ID (may be a constant expression with a string result)
* - Message format (string)
* - Row delimiter
* - Schema (optional, if the format supports it)
* - Number of consumers
* - Max block size for background consumption
* - Number of unreadable messages to skip (at least)
* - Do intermediate commits when the batch is consumed and handled
*/
/* 0 = raw, 1 = evaluateConstantExpressionAsLiteral, 2=evaluateConstantExpressionOrIdentifierAsLiteral */
/// In case of named collection we already validated the arguments.
if (collection_name.empty())
{
CHECK_KAFKA_STORAGE_ARGUMENT(1, kafka_broker_list, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(2, kafka_topic_list, 1)
CHECK_KAFKA_STORAGE_ARGUMENT(3, kafka_group_name, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(4, kafka_format, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(5, kafka_row_delimiter, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(6, kafka_schema, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(7, kafka_num_consumers, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(8, kafka_max_block_size, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(9, kafka_skip_broken_messages, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(10, kafka_commit_every_batch, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(11, kafka_client_id, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(12, kafka_poll_timeout_ms, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(13, kafka_flush_interval_ms, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(14, kafka_thread_per_consumer, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(15, kafka_handle_error_mode, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(16, kafka_commit_on_select, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(17, kafka_max_rows_per_message, 0)
}
#undef CHECK_KAFKA_STORAGE_ARGUMENT
auto num_consumers = kafka_settings->kafka_num_consumers.value;
auto max_consumers = std::max<uint32_t>(getNumberOfPhysicalCPUCores(), 16);
if (!args.getLocalContext()->getSettingsRef().kafka_disable_num_consumers_limit && num_consumers > max_consumers)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"The number of consumers can not be bigger than {}. "
"A single consumer can read any number of partitions. "
"Extra consumers are relatively expensive, "
"and using a lot of them can lead to high memory and CPU usage. "
"To achieve better performance "
"of getting data from Kafka, consider using a setting kafka_thread_per_consumer=1, "
"and ensure you have enough threads "
"in MessageBrokerSchedulePool (background_message_broker_schedule_pool_size). "
"See also https://clickhouse.com/docs/integrations/kafka/kafka-table-engine#tuning-performance",
max_consumers);
}
else if (num_consumers < 1)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Number of consumers can not be lower than 1");
}
if (kafka_settings->kafka_max_block_size.changed && kafka_settings->kafka_max_block_size.value < 1)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "kafka_max_block_size can not be lower than 1");
}
if (kafka_settings->kafka_poll_max_batch_size.changed && kafka_settings->kafka_poll_max_batch_size.value < 1)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "kafka_poll_max_batch_size can not be lower than 1");
}
NamesAndTypesList supported_columns;
for (const auto & column : args.columns)
{
if (column.default_desc.kind == ColumnDefaultKind::Alias)
supported_columns.emplace_back(column.name, column.type);
if (column.default_desc.kind == ColumnDefaultKind::Default && !column.default_desc.expression)
supported_columns.emplace_back(column.name, column.type);
}
// Kafka engine allows only ordinary columns without default expression or alias columns.
if (args.columns.getAll() != supported_columns)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"KafkaEngine doesn't support DEFAULT/MATERIALIZED/EPHEMERAL expressions for columns. "
"See https://clickhouse.com/docs/en/engines/table-engines/integrations/kafka/#configuration");
}
const auto has_keeper_path = kafka_settings->kafka_keeper_path.changed && !kafka_settings->kafka_keeper_path.value.empty();
const auto has_replica_name = kafka_settings->kafka_replica_name.changed && !kafka_settings->kafka_replica_name.value.empty();
if (!has_keeper_path && !has_replica_name)
return std::make_shared<StorageKafka>(
args.table_id, args.getContext(), args.columns, args.comment, std::move(kafka_settings), collection_name);
if (!args.getLocalContext()->getSettingsRef().allow_experimental_kafka_offsets_storage_in_keeper && !args.query.attach)
throw Exception(
ErrorCodes::SUPPORT_IS_DISABLED,
"Storing the Kafka offsets in Keeper is experimental. Set `allow_experimental_kafka_offsets_storage_in_keeper` setting "
"to enable it");
if (!has_keeper_path || !has_replica_name)
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Either specify both zookeeper path and replica name or none of them");
const auto is_on_cluster = args.getLocalContext()->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY;
const auto is_replicated_database = args.getLocalContext()->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY
&& DatabaseCatalog::instance().getDatabase(args.table_id.database_name)->getEngineName() == "Replicated";
// UUID macro is only allowed:
// - with an Atomic database only with ON CLUSTER queries, otherwise it is easy to misuse: each replica would have a separate uuid generated.
// - with Replicated database
// - with attach queries, as those are used on server startup
const auto allow_uuid_macro = is_on_cluster || is_replicated_database || args.query.attach;
auto context = args.getContext();
// Unfold {database} and {table} macro on table creation, so table can be renamed.
if (args.mode < LoadingStrictnessLevel::ATTACH)
{
Macros::MacroExpansionInfo info;
/// NOTE: it's not recursive
info.expand_special_macros_only = true;
info.table_id = args.table_id;
// We could probably unfold UUID here too, but let's keep it similar to ReplicatedMergeTree, which doesn't do the unfolding.
info.table_id.uuid = UUIDHelpers::Nil;
kafka_settings->kafka_keeper_path.value = context->getMacros()->expand(kafka_settings->kafka_keeper_path.value, info);
info.level = 0;
kafka_settings->kafka_replica_name.value = context->getMacros()->expand(kafka_settings->kafka_replica_name.value, info);
}
auto * settings_query = args.storage_def->settings;
chassert(has_settings && "Unexpected settings query in StorageKafka");
settings_query->changes.setSetting("kafka_keeper_path", kafka_settings->kafka_keeper_path.value);
settings_query->changes.setSetting("kafka_replica_name", kafka_settings->kafka_replica_name.value);
// Expand other macros (such as {replica}). We do not expand them on the previous step to make it possible to copy metadata files between replicas.
// Disable expanding {shard} macro, because it can lead to incorrect behavior and it doesn't make sense to shard Kafka tables.
Macros::MacroExpansionInfo info;
info.table_id = args.table_id;
if (is_replicated_database)
{
auto database = DatabaseCatalog::instance().getDatabase(args.table_id.database_name);
info.shard.reset();
info.replica = getReplicatedDatabaseReplicaName(database);
}
if (!allow_uuid_macro)
info.table_id.uuid = UUIDHelpers::Nil;
kafka_settings->kafka_keeper_path.value = context->getMacros()->expand(kafka_settings->kafka_keeper_path.value, info);
info.level = 0;
info.table_id.uuid = UUIDHelpers::Nil;
kafka_settings->kafka_replica_name.value = context->getMacros()->expand(kafka_settings->kafka_replica_name.value, info);
return std::make_shared<StorageKafka2>(
args.table_id, args.getContext(), args.columns, args.comment, std::move(kafka_settings), collection_name);
};
factory.registerStorage(
"Kafka",
creator_fn,
StorageFactory::StorageFeatures{
.supports_settings = true,
});
}
namespace StorageKafkaUtils
{
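/// Splits the comma-separated kafka_topic_list value and trims surrounding whitespace,
/// e.g. "football, baseball" -> {"football", "baseball"}.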
Names parseTopics(String topic_list)
{
Names result;
boost::split(result, topic_list, [](char c) { return c == ','; });
for (String & topic : result)
boost::trim(topic);
return result;
}
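/// Default client.id used when kafka_client_id is not set: <version>-<FQDN>-<database>-<table>.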
String getDefaultClientId(const StorageID & table_id)
{
return fmt::format("{}-{}-{}-{}", VERSION_NAME, getFQDNOrHostName(), table_id.database_name, table_id.table_name);
}
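/// Polls and discards the remaining messages until the queue is empty, an EOF is reached,
/// the same error occurs twice in a row, or drain_timeout elapses. Non-EOF errors are passed to error_handler.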
void drainConsumer(
cppkafka::Consumer & consumer, const std::chrono::milliseconds drain_timeout, const LoggerPtr & log, ErrorHandler error_handler)
{
auto start_time = std::chrono::steady_clock::now();
cppkafka::Error last_error(RD_KAFKA_RESP_ERR_NO_ERROR);
while (true)
{
auto msg = consumer.poll(100ms);
if (!msg)
break;
auto error = msg.get_error();
if (error)
{
if (msg.is_eof() || error == last_error)
{
break;
}
else
{
LOG_ERROR(log, "Error during draining: {}", error);
error_handler(error);
}
}
// We don't stop draining on the first error,
// only when the same error repeats sequentially.
last_error = error;
auto ts = std::chrono::steady_clock::now();
if (std::chrono::duration_cast<std::chrono::milliseconds>(ts - start_time) > drain_timeout)
{
LOG_ERROR(log, "Timeout during draining.");
break;
}
}
}
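/// Removes the messages carrying a librdkafka error from the batch, incrementing the
/// KafkaConsumerErrors profile event and invoking error_handler for each of them.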
void eraseMessageErrors(Messages & messages, const LoggerPtr & log, ErrorHandler error_handler)
{
size_t skipped = std::erase_if(
messages,
[&](auto & message)
{
if (auto error = message.get_error())
{
ProfileEvents::increment(ProfileEvents::KafkaConsumerErrors);
LOG_ERROR(log, "Consumer error: {}", error);
error_handler(error);
return true;
}
return false;
});
if (skipped)
LOG_ERROR(log, "There were {} messages with an error", skipped);
}
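/// Collects the settings applied to the context used for reading/writing: streaming-friendly
/// defaults (skip unknown fields, tolerate up to kafka_skip_broken_messages broken messages),
/// the format schema, and every non-kafka_* setting from the table's SETTINGS clause.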
SettingsChanges createSettingsAdjustments(KafkaSettings & kafka_settings, const String & schema_name)
{
SettingsChanges result;
// Needed for backward compatibility
if (!kafka_settings.input_format_skip_unknown_fields.changed)
{
// Always skip unknown fields regardless of the context (JSON or TSKV)
kafka_settings.input_format_skip_unknown_fields = true;
}
if (!kafka_settings.input_format_allow_errors_ratio.changed)
{
kafka_settings.input_format_allow_errors_ratio = 0.;
}
if (!kafka_settings.input_format_allow_errors_num.changed)
{
kafka_settings.input_format_allow_errors_num = kafka_settings.kafka_skip_broken_messages.value;
}
if (!schema_name.empty())
result.emplace_back("format_schema", schema_name);
for (const auto & setting : kafka_settings)
{
const auto & name = setting.getName();
if (name.find("kafka_") == std::string::npos)
result.emplace_back(name, setting.getValue());
}
return result;
}
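/// Returns true only when every dependent view (and, for materialized views, its target table)
/// is attached; the check is applied recursively through the whole chain of views.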
bool checkDependencies(const StorageID & table_id, const ContextPtr& context)
{
// Check if all dependencies are attached
auto view_ids = DatabaseCatalog::instance().getDependentViews(table_id);
if (view_ids.empty())
return true;
// Check whether the dependencies are ready
for (const auto & view_id : view_ids)
{
auto view = DatabaseCatalog::instance().tryGetTable(view_id, context);
if (!view)
return false;
// If it is a materialized view, check its target table
auto * materialized_view = dynamic_cast<StorageMaterializedView *>(view.get());
if (materialized_view && !materialized_view->tryGetTargetTable())
return false;
// Check all its dependencies
if (!checkDependencies(view_id, context))
return false;
}
return true;
}
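/// Virtual columns exposed by the Kafka engines (_topic, _key, _offset, _partition, _timestamp,
/// _timestamp_ms, _headers.name, _headers.value); _raw_message and _error are added only in
/// StreamingHandleErrorMode::STREAM.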
VirtualColumnsDescription createVirtuals(StreamingHandleErrorMode handle_error_mode)
{
VirtualColumnsDescription desc;
desc.addEphemeral("_topic", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()), "");
desc.addEphemeral("_key", std::make_shared<DataTypeString>(), "");
desc.addEphemeral("_offset", std::make_shared<DataTypeUInt64>(), "");
desc.addEphemeral("_partition", std::make_shared<DataTypeUInt64>(), "");
desc.addEphemeral("_timestamp", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>()), "");
desc.addEphemeral("_timestamp_ms", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime64>(3)), "");
desc.addEphemeral("_headers.name", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "");
desc.addEphemeral("_headers.value", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "");
if (handle_error_mode == StreamingHandleErrorMode::STREAM)
{
desc.addEphemeral("_raw_message", std::make_shared<DataTypeString>(), "");
desc.addEphemeral("_error", std::make_shared<DataTypeString>(), "");
}
return desc;
}
}
}

View File

@ -0,0 +1,61 @@
#pragma once
#include <chrono>
#include <Core/Names.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/StorageID.h>
#include <base/types.h>
#include <cppkafka/configuration.h>
#include <cppkafka/cppkafka.h>
#include <cppkafka/topic_partition.h>
#include <fmt/ostream.h>
#include <Core/SettingsEnums.h>
#include <librdkafka/rdkafka.h>
#include <Common/SettingsChanges.h>
namespace Poco
{
namespace Util
{
class AbstractConfiguration;
}
}
namespace DB
{
class VirtualColumnsDescription;
struct KafkaSettings;
namespace StorageKafkaUtils
{
Names parseTopics(String topic_list);
String getDefaultClientId(const StorageID & table_id);
using ErrorHandler = std::function<void(const cppkafka::Error &)>;
void drainConsumer(
cppkafka::Consumer & consumer,
std::chrono::milliseconds drain_timeout,
const LoggerPtr & log,
ErrorHandler error_handler = [](const cppkafka::Error & /*err*/) {});
using Messages = std::vector<cppkafka::Message>;
void eraseMessageErrors(Messages & messages, const LoggerPtr & log, ErrorHandler error_handler = [](const cppkafka::Error & /*err*/) {});
SettingsChanges createSettingsAdjustments(KafkaSettings & kafka_settings, const String & schema_name);
bool checkDependencies(const StorageID & table_id, const ContextPtr& context);
VirtualColumnsDescription createVirtuals(StreamingHandleErrorMode handle_error_mode);
}
}
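/// Make cppkafka::TopicPartition and cppkafka::Error usable with fmt-based logging via their
/// operator<< overloads.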
template <>
struct fmt::formatter<cppkafka::TopicPartition> : fmt::ostream_formatter
{
};
template <>
struct fmt::formatter<cppkafka::Error> : fmt::ostream_formatter
{
};

View File

@ -1,4 +1,5 @@
#include "parseSyslogLevel.h"
#include <Storages/Kafka/parseSyslogLevel.h>
#include <sys/syslog.h>
/// Must be in a separate compilation unit due to macros overlaps:

View File

@ -48,7 +48,7 @@
<!-- This is only producer setting, if it was applied to consumer,
it would create warning message in logs. -->
<!-- default 30000 -->
<request_timeout_ms>30001</request_timeout_ms>
<request_timeout_ms>30001</request_timeout_ms>
</kafka_topic>
<!-- default 60000 -->
<transaction_timeout_ms>60001</transaction_timeout_ms>

File diff suppressed because it is too large