Merge pull request #10983 from azat/configure-librdkafka

Configure librdkafka (logging and thread names)
This commit is contained in:
alexey-milovidov 2020-05-17 23:25:18 +03:00 committed by GitHub
commit 3820075813
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 108 additions and 0 deletions

View File

@ -1,4 +1,5 @@
#include <Storages/Kafka/StorageKafka.h>
#include <Storages/Kafka/parseSyslogLevel.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
@ -32,6 +33,7 @@
#include <common/logger_useful.h>
#include <Common/quoteString.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <librdkafka/rdkafka.h>
namespace DB
@ -67,6 +69,46 @@ namespace
conf.set(key_name, config.getString(key_path));
}
}
rd_kafka_resp_err_t rdKafkaOnThreadStart(rd_kafka_t *, rd_kafka_thread_type_t thread_type, const char *, void * ctx)
{
StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
const auto & storage_id = self->getStorageID();
const auto & table = storage_id.getTableName();
switch (thread_type)
{
case RD_KAFKA_THREAD_MAIN:
setThreadName(("rdk:m/" + table.substr(0, 9)).c_str());
break;
case RD_KAFKA_THREAD_BACKGROUND:
setThreadName(("rdk:bg/" + table.substr(0, 8)).c_str());
break;
case RD_KAFKA_THREAD_BROKER:
setThreadName(("rdk:b/" + table.substr(0, 9)).c_str());
break;
}
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
rd_kafka_resp_err_t rdKafkaOnNew(rd_kafka_t * rk, const rd_kafka_conf_t *, void * ctx, char * /*errstr*/, size_t /*errstr_size*/)
{
return rd_kafka_interceptor_add_on_thread_start(rk, "setThreadName", rdKafkaOnThreadStart, ctx);
}
rd_kafka_resp_err_t rdKafkaOnConfDup(rd_kafka_conf_t * new_conf, const rd_kafka_conf_t * /*old_conf*/, size_t /*filter_cnt*/, const char ** /*filter*/, void * ctx)
{
rd_kafka_resp_err_t status;
// cppkafka copies configuration multiple times
status = rd_kafka_conf_interceptor_add_on_conf_dup(new_conf, "setThreadName", rdKafkaOnConfDup, ctx);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
return status;
status = rd_kafka_conf_interceptor_add_on_new(new_conf, "setThreadName", rdKafkaOnNew, ctx);
return status;
}
}
StorageKafka::StorageKafka(
@ -278,6 +320,33 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
if (config.has(topic_config_key))
loadFromConfig(conf, config, topic_config_key);
}
// No need to add any prefix, messages can be distinguished
conf.set_log_callback([this](cppkafka::KafkaHandleBase &, int level, const std::string & /* facility */, const std::string & message)
{
auto [poco_level, client_logs_level] = parseSyslogLevel(level);
LOG_SIMPLE(log, message, client_logs_level, poco_level);
});
// Configure interceptor to change thread name
//
// TODO: add interceptors support into the cppkafka.
// XXX: rdkafka uses pthread_set_name_np(), but glibc-compatibliity overrides it to noop.
{
// This should be safe, since we wait the rdkafka object anyway.
void * self = reinterpret_cast<void *>(this);
int status;
status = rd_kafka_conf_interceptor_add_on_new(conf.get_handle(), "setThreadName", rdKafkaOnNew, self);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
LOG_ERROR(log, "Cannot set new interceptor");
// cppkafka always copy the configuration
status = rd_kafka_conf_interceptor_add_on_conf_dup(conf.get_handle(), "setThreadName", rdKafkaOnConfDup, self);
if (status != RD_KAFKA_RESP_ERR_NO_ERROR)
LOG_ERROR(log, "Cannot set dup conf interceptor");
}
}
bool StorageKafka::checkDependencies(const StorageID & table_id)

View File

@ -0,0 +1,32 @@
#include "parseSyslogLevel.h"
#include <sys/syslog.h>
/// Must be in a sepearate compilation unit due to macros overlaps:
/// - syslog (LOG_DEBUG/...)
/// - logger_useful.h (LOG_DEBUG()/...)
std::pair<Poco::Message::Priority, DB::LogsLevel> parseSyslogLevel(const int level)
{
using DB::LogsLevel;
using Poco::Message;
switch (level)
{
case LOG_EMERG: [[fallthrough]];
case LOG_ALERT:
return std::make_pair(Message::PRIO_FATAL, LogsLevel::error);
case LOG_CRIT:
return std::make_pair(Message::PRIO_CRITICAL, LogsLevel::error);
case LOG_ERR:
return std::make_pair(Message::PRIO_ERROR, LogsLevel::error);
case LOG_WARNING:
return std::make_pair(Message::PRIO_WARNING, LogsLevel::warning);
case LOG_NOTICE:
return std::make_pair(Message::PRIO_NOTICE, LogsLevel::information);
case LOG_INFO:
return std::make_pair(Message::PRIO_INFORMATION, LogsLevel::information);
case LOG_DEBUG:
return std::make_pair(Message::PRIO_DEBUG, LogsLevel::debug);
default:
return std::make_pair(Message::PRIO_TRACE, LogsLevel::trace);
}
}

View File

@ -0,0 +1,7 @@
#pragma once
#include <utility>
#include <Poco/Message.h>
#include <Common/CurrentThread.h>
std::pair<Poco::Message::Priority, DB::LogsLevel> parseSyslogLevel(const int level);