mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Add KerberosInit into StorageKafka
This commit is contained in:
parent
cb53aa15ec
commit
a156a77890
@ -9,6 +9,7 @@ int KerberosInit::init(const String & keytab_file, const String & principal, con
|
||||
{
|
||||
auto adqm_log = &Poco::Logger::get("ADQM");
|
||||
LOG_DEBUG(adqm_log,"KerberosInit: begin");
|
||||
//LOG_DEBUG(adqm_log,"KerberosInit: do nothing"); return 0;
|
||||
|
||||
krb5_error_code ret;
|
||||
|
||||
|
@ -118,7 +118,8 @@ void HDFSBuilderWrapper::runKinit()
|
||||
LOG_DEBUG(&Poco::Logger::get("HDFSClient"), "ADQM: running KerberosInit");
|
||||
std::unique_lock<std::mutex> lck(kinit_mtx);
|
||||
KerberosInit k_init;
|
||||
try {
|
||||
try
|
||||
{
|
||||
k_init.init(hadoop_kerberos_keytab,hadoop_kerberos_principal,hadoop_security_kerberos_ticket_cache_path);
|
||||
} catch (const DB::Exception & e) {
|
||||
throw Exception("KerberosInit failure: "+ DB::getExceptionMessage(e, false), ErrorCodes::BAD_ARGUMENTS);
|
||||
|
@ -44,6 +44,8 @@
|
||||
#include <Common/CurrentMetrics.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
|
||||
#include <Access/KerberosInit.h>
|
||||
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
@ -515,6 +517,25 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
|
||||
if (config.has(config_prefix))
|
||||
loadFromConfig(conf, config, config_prefix);
|
||||
|
||||
if (conf.has_property("sasl.kerberos.keytab") && conf.has_property("sasl.kerberos.principal"))
|
||||
{
|
||||
LOG_DEBUG(log, "ADQM: preparing KerberosInit");
|
||||
String keytab = conf.get("sasl.kerberos.keytab");
|
||||
String principal = conf.get("sasl.kerberos.principal");
|
||||
LOG_DEBUG(log, "ADQM: keytab: {}, principal: {}", keytab, principal);
|
||||
LOG_DEBUG(log, "ADQM: running KerberosInit");
|
||||
KerberosInit k_init;
|
||||
try
|
||||
{
|
||||
k_init.init(keytab,principal);
|
||||
} catch (const DB::Exception & e) {
|
||||
LOG_ERROR(log, "ADQM: KerberosInit failure: {}", DB::getExceptionMessage(e, false));
|
||||
}
|
||||
LOG_DEBUG(log, "ADQM: finished KerberosInit");
|
||||
conf.set("sasl.kerberos.kinit.cmd","");
|
||||
conf.set("sasl.kerberos.min.time.before.relogin","0");
|
||||
}
|
||||
|
||||
// Update consumer topic-specific configuration
|
||||
for (const auto & topic : topics)
|
||||
{
|
||||
|
@ -122,6 +122,7 @@ def test_kafka_json_as_string(kafka_cluster):
|
||||
{"t": 124, "e": {"x": "test"} }
|
||||
{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
|
||||
"""
|
||||
logging.debug("ADQM: logs: %s", instance.grep_in_log("ADQM"))
|
||||
assert TSV(result) == TSV(expected)
|
||||
assert instance.contains_in_log(
|
||||
"Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows"
|
||||
@ -182,7 +183,8 @@ def test_kafka_json_as_string_no_kdc(kafka_cluster):
|
||||
assert TSV(result) == TSV(expected)
|
||||
assert instance.contains_in_log("StorageKafka (kafka_no_kdc): Nothing to commit")
|
||||
assert instance.contains_in_log("Ticket expired")
|
||||
assert instance.contains_in_log("Kerberos ticket refresh failed")
|
||||
#~ assert instance.contains_in_log("Kerberos ticket refresh failed")
|
||||
assert instance.contains_in_log("KerberosInit failure:")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
Loading…
Reference in New Issue
Block a user