Merge pull request #38105 from arenadata/ADQM-419

Add kerberosInit function as a replacement for kinit executable calls in Kafka and HDFS
Yakov Olkhovskiy 2022-06-27 14:19:24 -04:00 committed by GitHub
commit d5f65ece9b
14 changed files with 408 additions and 62 deletions


@@ -186,7 +186,6 @@ Similar to GraphiteMergeTree, the HDFS engine supports extended configuration us
 | - | - |
 |hadoop\_kerberos\_keytab | "" |
 |hadoop\_kerberos\_principal | "" |
-|hadoop\_kerberos\_kinit\_command | kinit |
 |libhdfs3\_conf | "" |

 ### Limitations {#limitations}

@@ -200,8 +199,7 @@ Note that due to libhdfs3 limitations only old-fashioned approach is supported,
 datanode communications are not secured by SASL (`HADOOP_SECURE_DN_USER` is a reliable indicator of such
 security approach). Use `tests/integration/test_storage_kerberized_hdfs/hdfs_configs/bootstrap.sh` for reference.

-If `hadoop_kerberos_keytab`, `hadoop_kerberos_principal` or `hadoop_kerberos_kinit_command` is specified, `kinit` will be invoked. `hadoop_kerberos_keytab` and `hadoop_kerberos_principal` are mandatory in this case. `kinit` tool and krb5 configuration files are required.
+If `hadoop_kerberos_keytab`, `hadoop_kerberos_principal` or `hadoop_security_kerberos_ticket_cache_path` are specified, Kerberos authentication will be used. `hadoop_kerberos_keytab` and `hadoop_kerberos_principal` are mandatory in this case.
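For context, the settings documented in this hunk are combined in the server configuration roughly as in the sketch below. This is illustrative only and not part of the diff; the keytab path, principal, and cache path are hypothetical placeholders.

```xml
<clickhouse>
    <hdfs>
        <!-- Both keytab and principal are mandatory to enable Kerberos authentication (hypothetical values) -->
        <hadoop_kerberos_keytab>/etc/clickhouse-server/clickhouse.keytab</hadoop_kerberos_keytab>
        <hadoop_kerberos_principal>clickhouse@EXAMPLE.COM</hadoop_kerberos_principal>
        <!-- Optional ticket cache path; per the limitations above, global level only -->
        <hadoop_security_kerberos_ticket_cache_path>/tmp/krb5cc_clickhouse</hadoop_security_kerberos_ticket_cache_path>
    </hdfs>
</clickhouse>
```

With this change, ClickHouse obtains and renews the ticket itself through the new kerberosInit path instead of shelling out to `kinit`.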
 ## HDFS Namenode HA support {#namenode-ha}
 libhdfs3 support HDFS namenode HA.


@@ -168,7 +168,7 @@ For a list of possible configuration options, see the [librdkafka configuration
 ### Kerberos support {#kafka-kerberos-support}

 To deal with Kerberos-aware Kafka, add `security_protocol` child element with `sasl_plaintext` value. It is enough if Kerberos ticket-granting ticket is obtained and cached by OS facilities.
-ClickHouse is able to maintain Kerberos credentials using a keytab file. Consider `sasl_kerberos_service_name`, `sasl_kerberos_keytab`, `sasl_kerberos_principal` and `sasl.kerberos.kinit.cmd` child elements.
+ClickHouse is able to maintain Kerberos credentials using a keytab file. Consider `sasl_kerberos_service_name`, `sasl_kerberos_keytab` and `sasl_kerberos_principal` child elements.

 Example:
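The example the documentation goes on to show is a fragment along these lines (a minimal sketch; the service name, keytab path, and principal are hypothetical placeholders):

```xml
<kafka>
    <security_protocol>sasl_plaintext</security_protocol>
    <sasl_kerberos_service_name>kafka</sasl_kerberos_service_name>
    <sasl_kerberos_keytab>/home/kafkauser/kafkauser.keytab</sasl_kerberos_keytab>
    <sasl_kerberos_principal>kafkauser/kafkahost@EXAMPLE.COM</sasl_kerberos_principal>
</kafka>
```

Note that `sasl.kerberos.kinit.cmd` is dropped from the list: as the StorageKafka change below shows, ClickHouse now resets that property and performs ticket acquisition itself via kerberosInit.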


@@ -183,7 +183,6 @@ CREATE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9
 | - | - |
 |hadoop\_kerberos\_keytab | "" |
 |hadoop\_kerberos\_principal | "" |
-|hadoop\_kerberos\_kinit\_command | kinit |

 ### Limitations {#limitations}
 * `hadoop_security_kerberos_ticket_cache_path` and `libhdfs3_conf` can be defined only at the global level, not at the user level

@@ -196,7 +195,7 @@ CREATE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9
 communication with the datanodes is not secured by SASL (`HADOOP_SECURE_DN_USER` is a reliable indicator of this
 security approach). Use `tests/integration/test_storage_kerberized_hdfs/hdfs_configs/bootstrap.sh` for an example of the settings.

-If `hadoop_kerberos_keytab`, `hadoop_kerberos_principal` or `hadoop_kerberos_kinit_command` are specified in the settings, `kinit` will be invoked. `hadoop_kerberos_keytab` and `hadoop_kerberos_principal` are mandatory in this case. The `kinit` tool and krb5 configuration files also need to be installed.
+If `hadoop_kerberos_keytab`, `hadoop_kerberos_principal` or `hadoop_security_kerberos_ticket_cache_path` are specified in the settings, Kerberos authentication will be used. `hadoop_kerberos_keytab` and `hadoop_kerberos_principal` are mandatory in this case.

 ## Virtual columns {#virtual-columns}


@@ -167,7 +167,7 @@ Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
 ### Kerberos support {#kafka-kerberos-support}

 To start working with Kerberos-aware Kafka, add a `security_protocol` child element with the `sasl_plaintext` value. It is enough if a Kerberos ticket-granting ticket is obtained and cached by OS facilities.
-ClickHouse can maintain Kerberos credentials using a keytab file. Consider the `sasl_kerberos_service_name`, `sasl_kerberos_keytab`, `sasl_kerberos_principal` and `sasl.kerberos.kinit.cmd` child elements.
+ClickHouse can maintain Kerberos credentials using a keytab file. Consider the `sasl_kerberos_service_name`, `sasl_kerberos_keytab` and `sasl_kerberos_principal` child elements.

 Example:


@@ -0,0 +1,3 @@
if (ENABLE_EXAMPLES)
add_subdirectory(examples)
endif()

src/Access/KerberosInit.cpp (new file)

@@ -0,0 +1,230 @@
#include <Access/KerberosInit.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
#include <Poco/Logger.h>
#include <Loggers/Loggers.h>
#include <filesystem>
#include <boost/core/noncopyable.hpp>
#include <fmt/format.h>
#if USE_KRB5
#include <krb5.h>
#include <mutex>
using namespace DB;
namespace DB
{
namespace ErrorCodes
{
extern const int KERBEROS_ERROR;
}
}
namespace
{
struct K5Data
{
krb5_context ctx;
krb5_ccache out_cc;
krb5_principal me;
char * name;
krb5_boolean switch_to_cache;
};
/**
* This class is a programmatic implementation of kinit: it obtains an initial
* Kerberos ticket-granting ticket (or renews an existing one) and stores it in a credential cache.
*/
class KerberosInit : boost::noncopyable
{
public:
void init(const String & keytab_file, const String & principal, const String & cache_name = "");
~KerberosInit();
private:
struct K5Data k5 {};
krb5_ccache defcache = nullptr;
krb5_get_init_creds_opt * options = nullptr;
// Credentials structure including ticket, session key, and lifetime info.
krb5_creds my_creds {}; // value-initialized so the destructor is safe even if init() throws early
krb5_keytab keytab = nullptr;
krb5_principal defcache_princ = nullptr;
String fmtError(krb5_error_code code) const;
};
}
String KerberosInit::fmtError(krb5_error_code code) const
{
const char * msg = krb5_get_error_message(k5.ctx, code);
String fmt_error = fmt::format(" ({}, {})", code, msg);
krb5_free_error_message(k5.ctx, msg);
return fmt_error;
}
void KerberosInit::init(const String & keytab_file, const String & principal, const String & cache_name)
{
auto * log = &Poco::Logger::get("KerberosInit");
LOG_TRACE(log,"Trying to authenticate with Kerberos v5");
krb5_error_code ret;
const char *deftype = nullptr;
if (!std::filesystem::exists(keytab_file))
throw Exception("Keytab file does not exist", ErrorCodes::KERBEROS_ERROR);
ret = krb5_init_context(&k5.ctx);
if (ret)
throw Exception(ErrorCodes::KERBEROS_ERROR, "Error while initializing Kerberos 5 library ({})", ret);
if (!cache_name.empty())
{
ret = krb5_cc_resolve(k5.ctx, cache_name.c_str(), &k5.out_cc);
if (ret)
throw Exception("Error in resolving cache" + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
LOG_TRACE(log,"Resolved cache");
}
else
{
// Resolve the default cache and get its type and default principal (if it is initialized).
ret = krb5_cc_default(k5.ctx, &defcache);
if (ret)
throw Exception("Error while getting default cache" + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
LOG_TRACE(log,"Resolved default cache");
deftype = krb5_cc_get_type(k5.ctx, defcache);
if (krb5_cc_get_principal(k5.ctx, defcache, &defcache_princ) != 0)
defcache_princ = nullptr;
}
// Use the specified principal name.
ret = krb5_parse_name_flags(k5.ctx, principal.c_str(), 0, &k5.me);
if (ret)
throw Exception("Error when parsing principal name " + principal + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
// Cache related commands
if (k5.out_cc == nullptr && krb5_cc_support_switch(k5.ctx, deftype))
{
// Use an existing cache for the client principal if we can.
ret = krb5_cc_cache_match(k5.ctx, k5.me, &k5.out_cc);
if (ret && ret != KRB5_CC_NOTFOUND)
throw Exception("Error while searching for cache for " + principal + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
if (0 == ret)
{
LOG_TRACE(log,"Using default cache: {}", krb5_cc_get_name(k5.ctx, k5.out_cc));
k5.switch_to_cache = 1;
}
else if (defcache_princ != nullptr)
{
// Create a new cache to avoid overwriting the initialized default cache.
ret = krb5_cc_new_unique(k5.ctx, deftype, nullptr, &k5.out_cc);
if (ret)
throw Exception("Error while generating new cache" + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
LOG_TRACE(log,"Using default cache: {}", krb5_cc_get_name(k5.ctx, k5.out_cc));
k5.switch_to_cache = 1;
}
}
// Use the default cache if we haven't picked one yet.
if (k5.out_cc == nullptr)
{
k5.out_cc = defcache;
defcache = nullptr;
LOG_TRACE(log,"Using default cache: {}", krb5_cc_get_name(k5.ctx, k5.out_cc));
}
ret = krb5_unparse_name(k5.ctx, k5.me, &k5.name);
if (ret)
throw Exception("Error when unparsing name" + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
LOG_TRACE(log,"Using principal: {}", k5.name);
// Allocate a new initial credential options structure.
ret = krb5_get_init_creds_opt_alloc(k5.ctx, &options);
if (ret)
throw Exception("Error in options allocation" + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
// Resolve keytab
ret = krb5_kt_resolve(k5.ctx, keytab_file.c_str(), &keytab);
if (ret)
throw Exception("Error in resolving keytab "+keytab_file + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
LOG_TRACE(log,"Using keytab: {}", keytab_file);
// Set an output credential cache in initial credential options.
ret = krb5_get_init_creds_opt_set_out_ccache(k5.ctx, options, k5.out_cc);
if (ret)
throw Exception("Error in setting output credential cache" + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
// Action: init or renew
LOG_TRACE(log,"Trying to renew credentials");
memset(&my_creds, 0, sizeof(my_creds));
// Get renewed credential from KDC using an existing credential from output cache.
ret = krb5_get_renewed_creds(k5.ctx, &my_creds, k5.me, k5.out_cc, nullptr);
if (ret)
{
LOG_TRACE(log,"Renew failed {}", fmtError(ret));
LOG_TRACE(log,"Trying to get initial credentials");
// Request KDC for an initial credentials using keytab.
ret = krb5_get_init_creds_keytab(k5.ctx, &my_creds, k5.me, keytab, 0, nullptr, options);
if (ret)
throw Exception("Error in getting initial credentials" + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
else
LOG_TRACE(log,"Got initial credentials");
}
else
{
LOG_TRACE(log,"Successful renewal");
// Initialize a credential cache. Destroy any existing contents of cache and initialize it for the default principal.
ret = krb5_cc_initialize(k5.ctx, k5.out_cc, k5.me);
if (ret)
throw Exception("Error when initializing cache" + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
LOG_TRACE(log,"Initialized cache");
// Store credentials in a credential cache.
ret = krb5_cc_store_cred(k5.ctx, k5.out_cc, &my_creds);
if (ret)
throw Exception("Error while storing credentials" + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
LOG_TRACE(log,"Stored credentials");
}
if (k5.switch_to_cache)
{
// Make a credential cache the primary cache for its collection.
ret = krb5_cc_switch(k5.ctx, k5.out_cc);
if (ret)
throw Exception("Error while switching to new cache" + fmtError(ret), ErrorCodes::KERBEROS_ERROR);
}
LOG_TRACE(log,"Authenticated to Kerberos v5");
}
KerberosInit::~KerberosInit()
{
if (k5.ctx)
{
if (defcache)
krb5_cc_close(k5.ctx, defcache);
krb5_free_principal(k5.ctx, defcache_princ);
if (options)
krb5_get_init_creds_opt_free(k5.ctx, options);
if (my_creds.client == k5.me)
my_creds.client = nullptr;
krb5_free_cred_contents(k5.ctx, &my_creds);
if (keytab)
krb5_kt_close(k5.ctx, keytab);
krb5_free_unparsed_name(k5.ctx, k5.name);
krb5_free_principal(k5.ctx, k5.me);
if (k5.out_cc != nullptr)
krb5_cc_close(k5.ctx, k5.out_cc);
krb5_free_context(k5.ctx);
}
}
void kerberosInit(const String & keytab_file, const String & principal, const String & cache_name)
{
// Use a mutex to prevent credential cache file corruption under concurrent calls
static std::mutex kinit_mtx;
std::unique_lock<std::mutex> lck(kinit_mtx);
KerberosInit k_init;
k_init.init(keytab_file, principal, cache_name);
}
#endif // USE_KRB5

src/Access/KerberosInit.h (new file)

@@ -0,0 +1,11 @@
#pragma once
#include "config_core.h"
#include <base/types.h>
#if USE_KRB5
void kerberosInit(const String & keytab_file, const String & principal, const String & cache_name = "");
#endif // USE_KRB5


@@ -0,0 +1,4 @@
if (TARGET ch_contrib::krb5)
add_executable (kerberos_init kerberos_init.cpp)
target_link_libraries (kerberos_init PRIVATE dbms ch_contrib::krb5)
endif()


@@ -0,0 +1,47 @@
#include <iostream>
#include <Poco/ConsoleChannel.h>
#include <Poco/Logger.h>
#include <Poco/AutoPtr.h>
#include <Common/Exception.h>
#include <Access/KerberosInit.h>
/** This example demonstrates the use of the kerberosInit function to obtain and cache a Kerberos ticket-granting ticket.
* The first argument specifies the keytab file and the second the principal name.
* The third argument is optional; it specifies the credential cache location.
* After a successful run of kerberos_init, it is useful to run the klist command to list the cached Kerberos tickets,
* and kdestroy to destroy them if needed.
*/
using namespace DB;
int main(int argc, char ** argv)
{
std::cout << "Kerberos Init" << "\n";
if (argc < 3)
{
std::cout << "kerberos_init obtains and caches an initial ticket-granting ticket for principal." << "\n\n";
std::cout << "Usage:" << "\n" << " kerberos_init keytab principal [cache]" << "\n";
return 0;
}
String cache_name = "";
if (argc == 4)
cache_name = argv[3];
Poco::AutoPtr<Poco::ConsoleChannel> app_channel(new Poco::ConsoleChannel(std::cerr));
Poco::Logger::root().setChannel(app_channel);
Poco::Logger::root().setLevel("trace");
try
{
kerberosInit(argv[1], argv[2], cache_name);
}
catch (const Exception & e)
{
std::cout << "KerberosInit failure: " << getExceptionMessage(e, false) << "\n";
return -1;
}
std::cout << "Done" << "\n";
return 0;
}


@@ -10,7 +10,9 @@
 #include <IO/WriteBufferFromString.h>
 #include <IO/Operators.h>
 #include <Common/logger_useful.h>
+#if USE_KRB5
+#include <Access/KerberosInit.h>
+#endif // USE_KRB5

 namespace DB
 {

@@ -18,8 +20,10 @@ namespace ErrorCodes
 {
 extern const int BAD_ARGUMENTS;
 extern const int NETWORK_ERROR;
+#if USE_KRB5
 extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
-extern const int NO_ELEMENTS_IN_CONFIG;
+extern const int KERBEROS_ERROR;
+#endif // USE_KRB5
 }

 const String HDFSBuilderWrapper::CONFIG_PREFIX = "hdfs";
@@ -40,25 +44,28 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration
 String key_name;

 if (key == "hadoop_kerberos_keytab")
 {
+#if USE_KRB5
 need_kinit = true;
 hadoop_kerberos_keytab = config.getString(key_path);
+#else // USE_KRB5
+LOG_WARNING(&Poco::Logger::get("HDFSClient"), "hadoop_kerberos_keytab parameter is ignored because ClickHouse was built without support of krb5 library.");
+#endif // USE_KRB5
 continue;
 }
 else if (key == "hadoop_kerberos_principal")
 {
+#if USE_KRB5
 need_kinit = true;
 hadoop_kerberos_principal = config.getString(key_path);
 hdfsBuilderSetPrincipal(hdfs_builder, hadoop_kerberos_principal.c_str());
+#else // USE_KRB5
+LOG_WARNING(&Poco::Logger::get("HDFSClient"), "hadoop_kerberos_principal parameter is ignored because ClickHouse was built without support of krb5 library.");
+#endif // USE_KRB5
 continue;
 }
-else if (key == "hadoop_kerberos_kinit_command")
-{
-need_kinit = true;
-hadoop_kerberos_kinit_command = config.getString(key_path);
-continue;
-}
 else if (key == "hadoop_security_kerberos_ticket_cache_path")
 {
+#if USE_KRB5
 if (isUser)
 {
 throw Exception("hadoop.security.kerberos.ticket.cache.path cannot be set per user",

@@ -67,6 +74,9 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration
 hadoop_security_kerberos_ticket_cache_path = config.getString(key_path);
 // standard param - pass further
+#else // USE_KRB5
+LOG_WARNING(&Poco::Logger::get("HDFSClient"), "hadoop.security.kerberos.ticket.cache.path parameter is ignored because ClickHouse was built without support of krb5 library.");
+#endif // USE_KRB5
 }

 key_name = boost::replace_all_copy(key, "_", ".");

@@ -76,44 +86,21 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration
 }
 }

-String HDFSBuilderWrapper::getKinitCmd()
-{
-if (hadoop_kerberos_keytab.empty() || hadoop_kerberos_principal.empty())
-{
-throw Exception("Not enough parameters to run kinit",
-ErrorCodes::NO_ELEMENTS_IN_CONFIG);
-}
-WriteBufferFromOwnString ss;
-String cache_name = hadoop_security_kerberos_ticket_cache_path.empty() ?
-String() :
-(String(" -c \"") + hadoop_security_kerberos_ticket_cache_path + "\"");
-// command to run looks like
-// kinit -R -t /keytab_dir/clickhouse.keytab -k somebody@TEST.CLICKHOUSE.TECH || ..
-ss << hadoop_kerberos_kinit_command << cache_name <<
-" -R -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal <<
-"|| " << hadoop_kerberos_kinit_command << cache_name << " -t \"" <<
-hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal;
-return ss.str();
-}
+#if USE_KRB5
 void HDFSBuilderWrapper::runKinit()
 {
-String cmd = getKinitCmd();
-LOG_DEBUG(&Poco::Logger::get("HDFSClient"), "running kinit: {}", cmd);
-std::unique_lock<std::mutex> lck(kinit_mtx);
-auto command = ShellCommand::execute(cmd);
-auto status = command->tryWait();
-if (status)
-{
-throw Exception("kinit failure: " + cmd, ErrorCodes::BAD_ARGUMENTS);
-}
+LOG_DEBUG(&Poco::Logger::get("HDFSClient"), "Running KerberosInit");
+try
+{
+kerberosInit(hadoop_kerberos_keytab, hadoop_kerberos_principal, hadoop_security_kerberos_ticket_cache_path);
+}
+catch (const DB::Exception & e)
+{
+throw Exception("KerberosInit failure: " + getExceptionMessage(e, false), ErrorCodes::KERBEROS_ERROR);
+}
+LOG_DEBUG(&Poco::Logger::get("HDFSClient"), "Finished KerberosInit");
 }
+#endif // USE_KRB5

 HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration & config)
 {

@@ -184,16 +171,16 @@
 }
 }

+#if USE_KRB5
 if (builder.need_kinit)
 {
 builder.runKinit();
 }
+#endif // USE_KRB5

 return builder;
 }

-std::mutex HDFSBuilderWrapper::kinit_mtx;

 HDFSFSPtr createHDFSFS(hdfsBuilder * builder)
 {
 HDFSFSPtr fs(hdfsBuilderConnect(builder));


@@ -9,7 +9,6 @@
 #include <hdfs/hdfs.h>
 #include <base/types.h>
-#include <mutex>

 #include <Interpreters/Context.h>
 #include <Poco/Util/AbstractConfiguration.h>

@@ -69,10 +68,6 @@ public:
 private:
 void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & prefix, bool isUser = false);

-String getKinitCmd();
-void runKinit();

 // hdfs builder relies on an external config data storage
 std::pair<String, String>& keep(const String & k, const String & v)
 {

@@ -80,14 +75,15 @@ private:
 }

 hdfsBuilder * hdfs_builder;
+std::vector<std::pair<String, String>> config_stor;
+#if USE_KRB5
+void runKinit();
 String hadoop_kerberos_keytab;
 String hadoop_kerberos_principal;
-String hadoop_kerberos_kinit_command = "kinit";
 String hadoop_security_kerberos_ticket_cache_path;
-static std::mutex kinit_mtx;
-std::vector<std::pair<String, String>> config_stor;
 bool need_kinit{false};
+#endif // USE_KRB5
 };

 using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsDeleter>;


@@ -45,7 +45,9 @@
 #include <Common/CurrentMetrics.h>
 #include <Common/ProfileEvents.h>
+#if USE_KRB5
+#include <Access/KerberosInit.h>
+#endif // USE_KRB5

 namespace CurrentMetrics
 {

@@ -517,6 +519,33 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
 if (config.has(config_prefix))
 loadFromConfig(conf, config, config_prefix);

+#if USE_KRB5
+if (conf.has_property("sasl.kerberos.kinit.cmd"))
+LOG_WARNING(log, "sasl.kerberos.kinit.cmd configuration parameter is ignored.");
+conf.set("sasl.kerberos.kinit.cmd", "");
+conf.set("sasl.kerberos.min.time.before.relogin", "0");
+if (conf.has_property("sasl.kerberos.keytab") && conf.has_property("sasl.kerberos.principal"))
+{
+String keytab = conf.get("sasl.kerberos.keytab");
+String principal = conf.get("sasl.kerberos.principal");
+LOG_DEBUG(log, "Running KerberosInit");
+try
+{
+kerberosInit(keytab, principal);
+}
+catch (const Exception & e)
+{
+LOG_ERROR(log, "KerberosInit failure: {}", getExceptionMessage(e, false));
+}
+LOG_DEBUG(log, "Finished KerberosInit");
+}
+#else // USE_KRB5
+if (conf.has_property("sasl.kerberos.keytab") || conf.has_property("sasl.kerberos.principal"))
+LOG_WARNING(log, "Kerberos-related parameters are ignored because ClickHouse was built without support of krb5 library.");
+#endif // USE_KRB5

 // Update consumer topic-specific configuration
 for (const auto & topic : topics)
 {


@@ -113,7 +113,7 @@ def test_read_table_expired(started_cluster):
 )
 assert False, "Exception have to be thrown"
 except Exception as ex:
-assert "DB::Exception: kinit failure:" in str(ex)
+assert "DB::Exception: KerberosInit failure:" in str(ex)

 started_cluster.unpause_container("hdfskerberos")


@@ -128,6 +128,48 @@ def test_kafka_json_as_string(kafka_cluster):
 )

+def test_kafka_json_as_string_request_new_ticket_after_expiration(kafka_cluster):
+# Ticket should be expired after the wait time
+# On run of SELECT query new ticket should be requested and SELECT query should run fine.
+kafka_produce(
+kafka_cluster,
+"kafka_json_as_string",
+[
+'{"t": 123, "e": {"x": "woof"} }',
+"",
+'{"t": 124, "e": {"x": "test"} }',
+'{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}',
+],
+)
+
+instance.query(
+"""
+CREATE TABLE test.kafka (field String)
+ENGINE = Kafka
+SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
+kafka_topic_list = 'kafka_json_as_string',
+kafka_commit_on_select = 1,
+kafka_group_name = 'kafka_json_as_string',
+kafka_format = 'JSONAsString',
+kafka_flush_interval_ms=1000;
+"""
+)
+
+time.sleep(45)  # wait for ticket expiration
+
+result = instance.query("SELECT * FROM test.kafka;")
+expected = """\
+{"t": 123, "e": {"x": "woof"} }
+{"t": 124, "e": {"x": "test"} }
+{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
+"""
+assert TSV(result) == TSV(expected)
+assert instance.contains_in_log(
+"Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows"
+)
+
 def test_kafka_json_as_string_no_kdc(kafka_cluster):
 # When the test is run alone (not preceded by any other kerberized kafka test),
 # we need a ticket to

@@ -182,7 +224,7 @@ def test_kafka_json_as_string_no_kdc(kafka_cluster):
 assert TSV(result) == TSV(expected)
 assert instance.contains_in_log("StorageKafka (kafka_no_kdc): Nothing to commit")
 assert instance.contains_in_log("Ticket expired")
-assert instance.contains_in_log("Kerberos ticket refresh failed")
+assert instance.contains_in_log("KerberosInit failure:")

 if __name__ == "__main__":