add predefined macros support in kafka engine settings

This commit is contained in:
Yury Bogomolov 2023-07-21 01:06:49 +04:00
parent eaf3cffb92
commit c8b128aad4
3 changed files with 54 additions and 5 deletions

View File

@ -250,15 +250,20 @@ StorageKafka::StorageKafka(
: IStorage(table_id_)
, WithContext(context_->getGlobalContext())
, kafka_settings(std::move(kafka_settings_))
, topics(parseTopics(getContext()->getMacros()->expand(kafka_settings->kafka_topic_list.value)))
, brokers(getContext()->getMacros()->expand(kafka_settings->kafka_broker_list.value))
, group(getContext()->getMacros()->expand(kafka_settings->kafka_group_name.value))
, macros_info([&table_id_](){
Macros::MacroExpansionInfo info;
info.table_id = table_id_;
return info;
}())
, topics(parseTopics(getContext()->getMacros()->expand(kafka_settings->kafka_topic_list.value, macros_info)))
, brokers(getContext()->getMacros()->expand(kafka_settings->kafka_broker_list.value, macros_info))
, group(getContext()->getMacros()->expand(kafka_settings->kafka_group_name.value, macros_info))
, client_id(
kafka_settings->kafka_client_id.value.empty() ? getDefaultClientId(table_id_)
: getContext()->getMacros()->expand(kafka_settings->kafka_client_id.value))
: getContext()->getMacros()->expand(kafka_settings->kafka_client_id.value, macros_info))
, format_name(getContext()->getMacros()->expand(kafka_settings->kafka_format.value))
, max_rows_per_message(kafka_settings->kafka_max_rows_per_message.value)
, schema_name(getContext()->getMacros()->expand(kafka_settings->kafka_schema.value))
, schema_name(getContext()->getMacros()->expand(kafka_settings->kafka_schema.value, macros_info))
, num_consumers(kafka_settings->kafka_num_consumers.value)
, log(&Poco::Logger::get("StorageKafka (" + table_id_.table_name + ")"))
, semaphore(0, static_cast<int>(num_consumers))

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/Macros.h>
#include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h>
#include <Storages/Kafka/KafkaConsumer.h>
@ -79,6 +80,7 @@ public:
private:
// Configuration and state
std::unique_ptr<KafkaSettings> kafka_settings;
Macros::MacroExpansionInfo macros_info;
const Names topics;
const String brokers;
const String group;

View File

@ -444,6 +444,48 @@ def test_kafka_settings_new_syntax(kafka_cluster):
assert members[0]["client_id"] == "instance test 1234"
def test_kafka_settings_predefined_macros(kafka_cluster):
instance.query(
"""
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = '{kafka_broker}:19092',
kafka_topic_list = '{database}_{table}_topic',
kafka_group_name = '{database}_{table}_group',
kafka_format = '{kafka_format_json_each_row}',
kafka_row_delimiter = '\\n',
kafka_commit_on_select = 1,
kafka_client_id = '{database}_{table} test 1234',
kafka_skip_broken_messages = 1;
"""
)
messages = []
for i in range(25):
messages.append(json.dumps({"key": i, "value": i}))
kafka_produce(kafka_cluster, "test_kafka", messages)
# Insert couple of malformed messages.
kafka_produce(kafka_cluster, "test_kafka", ["}{very_broken_message,"])
kafka_produce(kafka_cluster, "test_kafka", ["}another{very_broken_message,"])
messages = []
for i in range(25, 50):
messages.append(json.dumps({"key": i, "value": i}))
kafka_produce(kafka_cluster, "test_kafka", messages)
result = ""
while True:
result += instance.query("SELECT * FROM test.kafka", ignore_error=True)
if kafka_check_result(result):
break
kafka_check_result(result, True)
members = describe_consumer_group(kafka_cluster, "new")
assert members[0]["client_id"] == "test_kafka test 1234"
def test_kafka_json_as_string(kafka_cluster):
kafka_produce(
kafka_cluster,