mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 09:32:01 +00:00
Add new integration test for kerberized Kafka; remove old kinit code from HDFSCommon
This commit is contained in:
parent
a156a77890
commit
2b76d0c6a9
@ -13,7 +13,6 @@ int KerberosInit::init(const String & keytab_file, const String & principal, con
|
|||||||
|
|
||||||
krb5_error_code ret;
|
krb5_error_code ret;
|
||||||
|
|
||||||
// todo: use deftype
|
|
||||||
const char *deftype = nullptr;
|
const char *deftype = nullptr;
|
||||||
int flags = 0;
|
int flags = 0;
|
||||||
|
|
||||||
|
@ -77,44 +77,8 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
String HDFSBuilderWrapper::getKinitCmd()
|
|
||||||
{
|
|
||||||
|
|
||||||
if (hadoop_kerberos_keytab.empty() || hadoop_kerberos_principal.empty())
|
|
||||||
{
|
|
||||||
throw Exception("Not enough parameters to run kinit",
|
|
||||||
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
|
|
||||||
}
|
|
||||||
|
|
||||||
WriteBufferFromOwnString ss;
|
|
||||||
|
|
||||||
String cache_name = hadoop_security_kerberos_ticket_cache_path.empty() ?
|
|
||||||
String() :
|
|
||||||
(String(" -c \"") + hadoop_security_kerberos_ticket_cache_path + "\"");
|
|
||||||
|
|
||||||
// command to run looks like
|
|
||||||
// kinit -R -t /keytab_dir/clickhouse.keytab -k somebody@TEST.CLICKHOUSE.TECH || ..
|
|
||||||
ss << hadoop_kerberos_kinit_command << cache_name <<
|
|
||||||
" -R -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal <<
|
|
||||||
"|| " << hadoop_kerberos_kinit_command << cache_name << " -t \"" <<
|
|
||||||
hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal;
|
|
||||||
return ss.str();
|
|
||||||
}
|
|
||||||
|
|
||||||
void HDFSBuilderWrapper::runKinit()
|
void HDFSBuilderWrapper::runKinit()
|
||||||
{ /*
|
{
|
||||||
String cmd = getKinitCmd();
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("HDFSClient"), "running kinit: {}", cmd);
|
|
||||||
|
|
||||||
std::unique_lock<std::mutex> lck(kinit_mtx);
|
|
||||||
|
|
||||||
auto command = ShellCommand::execute(cmd);
|
|
||||||
auto status = command->tryWait();
|
|
||||||
if (status)
|
|
||||||
{
|
|
||||||
throw Exception("kinit failure: " + cmd, ErrorCodes::BAD_ARGUMENTS);
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
LOG_DEBUG(&Poco::Logger::get("HDFSClient"), "ADQM: running KerberosInit");
|
LOG_DEBUG(&Poco::Logger::get("HDFSClient"), "ADQM: running KerberosInit");
|
||||||
std::unique_lock<std::mutex> lck(kinit_mtx);
|
std::unique_lock<std::mutex> lck(kinit_mtx);
|
||||||
KerberosInit k_init;
|
KerberosInit k_init;
|
||||||
|
@ -69,8 +69,6 @@ public:
|
|||||||
private:
|
private:
|
||||||
void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & prefix, bool isUser = false);
|
void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & prefix, bool isUser = false);
|
||||||
|
|
||||||
String getKinitCmd();
|
|
||||||
|
|
||||||
void runKinit();
|
void runKinit();
|
||||||
|
|
||||||
// hdfs builder relies on an external config data storage
|
// hdfs builder relies on an external config data storage
|
||||||
|
@ -122,12 +122,51 @@ def test_kafka_json_as_string(kafka_cluster):
|
|||||||
{"t": 124, "e": {"x": "test"} }
|
{"t": 124, "e": {"x": "test"} }
|
||||||
{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
|
{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
|
||||||
"""
|
"""
|
||||||
logging.debug("ADQM: logs: %s", instance.grep_in_log("ADQM"))
|
|
||||||
assert TSV(result) == TSV(expected)
|
assert TSV(result) == TSV(expected)
|
||||||
assert instance.contains_in_log(
|
assert instance.contains_in_log(
|
||||||
"Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows"
|
"Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_kafka_json_as_string_request_new_ticket_after_expiration(kafka_cluster):
|
||||||
|
# Ticket should be expired after the wait time
|
||||||
|
# On run of SELECT query new ticket should be requested and SELECT query should run fine.
|
||||||
|
|
||||||
|
kafka_produce(
|
||||||
|
kafka_cluster,
|
||||||
|
"kafka_json_as_string",
|
||||||
|
[
|
||||||
|
'{"t": 123, "e": {"x": "woof"} }',
|
||||||
|
"",
|
||||||
|
'{"t": 124, "e": {"x": "test"} }',
|
||||||
|
'{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}',
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
instance.query(
|
||||||
|
"""
|
||||||
|
CREATE TABLE test.kafka (field String)
|
||||||
|
ENGINE = Kafka
|
||||||
|
SETTINGS kafka_broker_list = 'kerberized_kafka1:19092',
|
||||||
|
kafka_topic_list = 'kafka_json_as_string',
|
||||||
|
kafka_commit_on_select = 1,
|
||||||
|
kafka_group_name = 'kafka_json_as_string',
|
||||||
|
kafka_format = 'JSONAsString',
|
||||||
|
kafka_flush_interval_ms=1000;
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
time.sleep(45) # wait for ticket expiration
|
||||||
|
|
||||||
|
result = instance.query("SELECT * FROM test.kafka;")
|
||||||
|
expected = """\
|
||||||
|
{"t": 123, "e": {"x": "woof"} }
|
||||||
|
{"t": 124, "e": {"x": "test"} }
|
||||||
|
{"F1":"V1","F2":{"F21":"V21","F22":{},"F23":"V23","F24":"2019-12-24T16:28:04"},"F3":"V3"}
|
||||||
|
"""
|
||||||
|
assert TSV(result) == TSV(expected)
|
||||||
|
assert instance.contains_in_log(
|
||||||
|
"Parsing of message (topic: kafka_json_as_string, partition: 0, offset: 1) return no rows"
|
||||||
|
)
|
||||||
|
|
||||||
def test_kafka_json_as_string_no_kdc(kafka_cluster):
|
def test_kafka_json_as_string_no_kdc(kafka_cluster):
|
||||||
# When the test is run alone (not preceded by any other kerberized kafka test),
|
# When the test is run alone (not preceded by any other kerberized kafka test),
|
||||||
|
Loading…
Reference in New Issue
Block a user