Add named collection for kafka / rabbit

kssenii 2021-11-23 17:52:25 +03:00
parent 1bf30709f0
commit 980c87c466
10 changed files with 187 additions and 23 deletions

src/Storages/ExternalDataSourceConfiguration.cpp

@@ -8,6 +8,12 @@
 #include <Poco/Util/AbstractConfiguration.h>
 #include <IO/WriteBufferFromString.h>
+#if USE_AMQPCPP
+#include <Storages/RabbitMQ/RabbitMQSettings.h>
+#endif
+#if USE_RDKAFKA
+#include <Storages/Kafka/KafkaSettings.h>
+#endif
 
 namespace DB
 {
@@ -359,4 +365,66 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(const
     return std::nullopt;
 }
+
+template<typename T>
+bool getExternalDataSourceConfiguration(const ASTs & args, BaseSettings<T> & settings, ContextPtr context)
+{
+    if (args.empty())
+        return false;
+
+    if (const auto * collection = typeid_cast<const ASTIdentifier *>(args[0].get()))
+    {
+        const auto & config = context->getConfigRef();
+        const auto & config_prefix = fmt::format("named_collections.{}", collection->name());
+
+        if (!config.has(config_prefix))
+            throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no collection named `{}` in config", collection->name());
+
+        SettingsChanges config_settings;
+        for (auto & setting : settings.all())
+        {
+            const auto & setting_name = setting.getName();
+            auto setting_value = config.getString(config_prefix + '.' + setting_name, "");
+            if (!setting_value.empty())
+                config_settings.emplace_back(setting_name, setting_value);
+        }
+
+        /// Check key-value arguments.
+        for (size_t i = 1; i < args.size(); ++i)
+        {
+            if (const auto * ast_function = typeid_cast<const ASTFunction *>(args[i].get()))
+            {
+                const auto * args_expr = assert_cast<const ASTExpressionList *>(ast_function->arguments.get());
+                auto function_args = args_expr->children;
+                if (function_args.size() != 2)
+                    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument");
+
+                auto arg_name = function_args[0]->as<ASTIdentifier>()->name();
+                auto arg_value_ast = evaluateConstantExpressionOrIdentifierAsLiteral(function_args[1], context);
+                auto arg_value = arg_value_ast->as<ASTLiteral>()->value;
+                config_settings.emplace_back(arg_name, arg_value);
+            }
+            else
+            {
+                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected key-value defined argument");
+            }
+        }
+
+        settings.applyChanges(config_settings);
+        return true;
+    }
+
+    return false;
+}
+
+#if USE_AMQPCPP
+template
+bool getExternalDataSourceConfiguration(const ASTs & args, BaseSettings<RabbitMQSettingsTraits> & settings, ContextPtr context);
+#endif
+
+#if USE_RDKAFKA
+template
+bool getExternalDataSourceConfiguration(const ASTs & args, BaseSettings<KafkaSettingsTraits> & settings, ContextPtr context);
+#endif
+
 }

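Note: the new template resolves settings in two passes. Every setting whose name matches a key under named_collections.<collection> in the server configuration is applied first, then any key-value arguments that follow the collection identifier in the engine definition override those values. A minimal usage sketch, assuming the kafka1 collection defined in the test config added below:

    CREATE TABLE queue (key UInt64, value UInt64)
    ENGINE = Kafka(kafka1, kafka_format = 'CSV');

Here kafka_broker_list, kafka_topic_list and kafka_group_name come from the collection, while kafka_format is supplied at CREATE time.
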
src/Storages/ExternalDataSourceConfiguration.h

@@ -112,4 +112,7 @@ struct URLBasedDataSourceConfig
 std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(const ASTs & args, ContextPtr context);
 
+template<typename T>
+bool getExternalDataSourceConfiguration(const ASTs & args, BaseSettings<T> & settings, ContextPtr context);
+
 }

src/Storages/Kafka/StorageKafka.cpp

@@ -19,6 +19,7 @@
 #include <Storages/Kafka/KafkaSettings.h>
 #include <Storages/Kafka/KafkaSource.h>
 #include <Storages/Kafka/WriteBufferToKafkaProducer.h>
+#include <Storages/ExternalDataSourceConfiguration.h>
 #include <Storages/StorageFactory.h>
 #include <Storages/StorageMaterializedView.h>
 #include <boost/algorithm/string/replace.hpp>
@@ -168,7 +169,9 @@ namespace
 }
 
 StorageKafka::StorageKafka(
-    const StorageID & table_id_, ContextPtr context_, const ColumnsDescription & columns_, std::unique_ptr<KafkaSettings> kafka_settings_)
+    const StorageID & table_id_, ContextPtr context_,
+    const ColumnsDescription & columns_, std::unique_ptr<KafkaSettings> kafka_settings_,
+    const String & collection_name_)
     : IStorage(table_id_)
     , WithContext(context_->getGlobalContext())
     , kafka_settings(std::move(kafka_settings_))
@@ -187,6 +190,7 @@ StorageKafka::StorageKafka(
     , intermediate_commit(kafka_settings->kafka_commit_every_batch.value)
     , settings_adjustments(createSettingsAdjustments())
     , thread_per_consumer(kafka_settings->kafka_thread_per_consumer.value)
+    , collection_name(collection_name_)
 {
     if (kafka_settings->kafka_handle_error_mode == HandleKafkaErrorMode::STREAM)
     {
@@ -459,17 +463,25 @@ size_t StorageKafka::getPollTimeoutMillisecond() const
         : getContext()->getSettingsRef().stream_poll_timeout_ms.totalMilliseconds();
 }
 
+String StorageKafka::getConfigPrefix() const
+{
+    if (!collection_name.empty())
+        return "named_collections." + collection_name + "." + CONFIG_PREFIX; /// Add one more level to separate librdkafka configuration.
+    return CONFIG_PREFIX;
+}
+
 void StorageKafka::updateConfiguration(cppkafka::Configuration & conf)
 {
     // Update consumer configuration from the configuration
     const auto & config = getContext()->getConfigRef();
-    if (config.has(CONFIG_PREFIX))
-        loadFromConfig(conf, config, CONFIG_PREFIX);
+    auto config_prefix = getConfigPrefix();
+    if (config.has(config_prefix))
+        loadFromConfig(conf, config, config_prefix);
 
     // Update consumer topic-specific configuration
     for (const auto & topic : topics)
     {
-        const auto topic_config_key = CONFIG_PREFIX + "_" + topic;
+        const auto topic_config_key = config_prefix + "_" + topic;
         if (config.has(topic_config_key))
             loadFromConfig(conf, config, topic_config_key);
     }
@@ -666,6 +678,7 @@ void registerStorageKafka(StorageFactory & factory)
         bool has_settings = args.storage_def->settings;
 
         auto kafka_settings = std::make_unique<KafkaSettings>();
+        auto named_collection = getExternalDataSourceConfiguration(args.engine_args, *kafka_settings, args.getLocalContext());
 
         if (has_settings)
         {
             kafka_settings->loadFromQuery(*args.storage_def);
@@ -729,17 +742,25 @@ void registerStorageKafka(StorageFactory & factory)
           * - Do intermediate commits when the batch consumed and handled
           */
 
-        /* 0 = raw, 1 = evaluateConstantExpressionAsLiteral, 2=evaluateConstantExpressionOrIdentifierAsLiteral */
-        CHECK_KAFKA_STORAGE_ARGUMENT(1, kafka_broker_list, 0)
-        CHECK_KAFKA_STORAGE_ARGUMENT(2, kafka_topic_list, 1)
-        CHECK_KAFKA_STORAGE_ARGUMENT(3, kafka_group_name, 2)
-        CHECK_KAFKA_STORAGE_ARGUMENT(4, kafka_format, 2)
-        CHECK_KAFKA_STORAGE_ARGUMENT(5, kafka_row_delimiter, 2)
-        CHECK_KAFKA_STORAGE_ARGUMENT(6, kafka_schema, 2)
-        CHECK_KAFKA_STORAGE_ARGUMENT(7, kafka_num_consumers, 0)
-        CHECK_KAFKA_STORAGE_ARGUMENT(8, kafka_max_block_size, 0)
-        CHECK_KAFKA_STORAGE_ARGUMENT(9, kafka_skip_broken_messages, 0)
-        CHECK_KAFKA_STORAGE_ARGUMENT(10, kafka_commit_every_batch, 0)
+        String collection_name;
+        if (named_collection)
+        {
+            collection_name = assert_cast<const ASTIdentifier *>(args.engine_args[0].get())->name();
+        }
+        else
+        {
+            /* 0 = raw, 1 = evaluateConstantExpressionAsLiteral, 2=evaluateConstantExpressionOrIdentifierAsLiteral */
+            CHECK_KAFKA_STORAGE_ARGUMENT(1, kafka_broker_list, 0)
+            CHECK_KAFKA_STORAGE_ARGUMENT(2, kafka_topic_list, 1)
+            CHECK_KAFKA_STORAGE_ARGUMENT(3, kafka_group_name, 2)
+            CHECK_KAFKA_STORAGE_ARGUMENT(4, kafka_format, 2)
+            CHECK_KAFKA_STORAGE_ARGUMENT(5, kafka_row_delimiter, 2)
+            CHECK_KAFKA_STORAGE_ARGUMENT(6, kafka_schema, 2)
+            CHECK_KAFKA_STORAGE_ARGUMENT(7, kafka_num_consumers, 0)
+            CHECK_KAFKA_STORAGE_ARGUMENT(8, kafka_max_block_size, 0)
+            CHECK_KAFKA_STORAGE_ARGUMENT(9, kafka_skip_broken_messages, 0)
+            CHECK_KAFKA_STORAGE_ARGUMENT(10, kafka_commit_every_batch, 0)
+        }
 
 #undef CHECK_KAFKA_STORAGE_ARGUMENT
@@ -765,7 +786,7 @@ void registerStorageKafka(StorageFactory & factory)
             throw Exception("kafka_poll_max_batch_size can not be lower than 1", ErrorCodes::BAD_ARGUMENTS);
         }
 
-        return StorageKafka::create(args.table_id, args.getContext(), args.columns, std::move(kafka_settings));
+        return StorageKafka::create(args.table_id, args.getContext(), args.columns, std::move(kafka_settings), collection_name);
     };
 
     factory.registerStorage("Kafka", creator_fn, StorageFactory::StorageFeatures{ .supports_settings = true, });

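Note: getConfigPrefix() redirects where librdkafka properties are read from. Without a collection the global "kafka" section is used as before; with one, the lookup moves to named_collections.<collection>.kafka, so broker properties can be bundled with the collection. A sketch of such a layout (the <kafka> sub-section and the auto_offset_reset property are illustrative, not part of this commit's test configs):

    <clickhouse>
        <named_collections>
            <kafka1>
                <kafka_broker_list>kafka1:19092</kafka_broker_list>
                <!-- Illustrative librdkafka section resolved via getConfigPrefix(). -->
                <kafka>
                    <auto_offset_reset>smallest</auto_offset_reset>
                </kafka>
            </kafka1>
        </named_collections>
    </clickhouse>

Topic-specific sections follow the same rule: for topic conf the lookup key becomes named_collections.kafka1.kafka_conf.
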
src/Storages/Kafka/StorageKafka.h

@@ -71,7 +71,8 @@ protected:
         const StorageID & table_id_,
         ContextPtr context_,
         const ColumnsDescription & columns_,
-        std::unique_ptr<KafkaSettings> kafka_settings_);
+        std::unique_ptr<KafkaSettings> kafka_settings_,
+        const String & collection_name_);
 
 private:
     // Configuration and state
@@ -119,8 +120,12 @@ private:
     SettingsChanges createSettingsAdjustments();
     ConsumerBufferPtr createReadBuffer(const size_t consumer_number);
 
+    /// If named_collection is specified.
+    String collection_name;
+
     // Update Kafka configuration with values from CH user configuration.
     void updateConfiguration(cppkafka::Configuration & conf);
+    String getConfigPrefix() const;
 
     void threadFunc(size_t idx);
 
     size_t getPollMaxBatchSize() const;

src/Storages/RabbitMQ/RabbitMQSettings.h

@@ -28,6 +28,8 @@ namespace DB
     M(String, rabbitmq_vhost, "/", "RabbitMQ vhost.", 0) \
     M(String, rabbitmq_queue_settings_list, "", "A list of rabbitmq queue settings", 0) \
     M(Bool, rabbitmq_queue_consume, false, "Use user-defined queues and do not make any RabbitMQ setup: declaring exchanges, queues, bindings", 0) \
+    M(String, rabbitmq_username, "", "RabbitMQ username", 0) \
+    M(String, rabbitmq_password, "", "RabbitMQ password", 0) \
 
 #define LIST_OF_RABBITMQ_SETTINGS(M) \
     RABBITMQ_RELATED_SETTINGS(M) \

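Note: the two new settings let RabbitMQ credentials travel with the table definition or a named collection instead of coming only from the global rabbitmq.username / rabbitmq.password server configuration. A sketch with explicit SETTINGS (the credentials mirror the test config below; the table name is illustrative):

    CREATE TABLE queue (key UInt64, value UInt64)
    ENGINE = RabbitMQ
    SETTINGS rabbitmq_host_port = 'rabbitmq1:5672',
             rabbitmq_format = 'JSONEachRow',
             rabbitmq_username = 'root',
             rabbitmq_password = 'clickhouse';
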
src/Storages/RabbitMQ/StorageRabbitMQ.cpp

@@ -18,6 +18,7 @@
 #include <Storages/RabbitMQ/RabbitMQSource.h>
 #include <Storages/RabbitMQ/StorageRabbitMQ.h>
 #include <Storages/RabbitMQ/WriteBufferToRabbitMQProducer.h>
+#include <Storages/ExternalDataSourceConfiguration.h>
 #include <Storages/StorageFactory.h>
 #include <Storages/StorageMaterializedView.h>
 #include <boost/algorithm/string/replace.hpp>
@@ -96,12 +97,14 @@ StorageRabbitMQ::StorageRabbitMQ(
     , is_attach(is_attach_)
 {
     auto parsed_address = parseAddress(getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_host_port), 5672);
+    auto rabbitmq_username = rabbitmq_settings->rabbitmq_username.value;
+    auto rabbitmq_password = rabbitmq_settings->rabbitmq_password.value;
     configuration =
     {
         .host = parsed_address.first,
         .port = parsed_address.second,
-        .username = getContext()->getConfigRef().getString("rabbitmq.username"),
-        .password = getContext()->getConfigRef().getString("rabbitmq.password"),
+        .username = rabbitmq_username.empty() ? getContext()->getConfigRef().getString("rabbitmq.username") : rabbitmq_username,
+        .password = rabbitmq_password.empty() ? getContext()->getConfigRef().getString("rabbitmq.password") : rabbitmq_password,
         .vhost = getContext()->getConfigRef().getString("rabbitmq.vhost", getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_vhost)),
         .secure = rabbitmq_settings->rabbitmq_secure.value,
         .connection_string = getContext()->getMacros()->expand(rabbitmq_settings->rabbitmq_address)
@@ -1141,9 +1144,9 @@ void registerStorageRabbitMQ(StorageFactory & factory)
 {
     auto creator_fn = [](const StorageFactory::Arguments & args)
     {
         auto rabbitmq_settings = std::make_unique<RabbitMQSettings>();
-        if (!args.storage_def->settings)
+        bool with_named_collection = getExternalDataSourceConfiguration(args.engine_args, *rabbitmq_settings, args.getLocalContext());
+        if (!with_named_collection && !args.storage_def->settings)
             throw Exception(ErrorCodes::BAD_ARGUMENTS, "RabbitMQ engine must have settings");
 
         rabbitmq_settings->loadFromQuery(*args.storage_def);

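Note: as the constructor change shows, a non-empty rabbitmq_username / rabbitmq_password (set in the DDL or pulled from a named collection) now wins, with the global rabbitmq.* config entries kept as the fallback. Combined with the registration change, the whole connection definition can live in a collection while individual values stay overridable per table, as in this sketch mirroring the test added below:

    CREATE TABLE queue (key UInt64, value UInt64)
    ENGINE = RabbitMQ(rabbit1, rabbitmq_vhost = '/');
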
tests/integration/test_storage_kafka/configs/named_collection.xml

@@ -0,0 +1,9 @@
+<clickhouse>
+    <named_collections>
+        <kafka1>
+            <kafka_broker_list>kafka1:19092</kafka_broker_list>
+            <kafka_topic_list>conf</kafka_topic_list>
+            <kafka_group_name>conf</kafka_group_name>
+        </kafka1>
+    </named_collections>
+</clickhouse>

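Note: any value stored in the collection can also be overridden by a key-value argument at CREATE time, since getExternalDataSourceConfiguration applies the DDL arguments after the collection. A sketch reusing this collection but pointing it at a different topic (the topic name is illustrative):

    CREATE TABLE queue (key UInt64, value UInt64)
    ENGINE = Kafka(kafka1, kafka_topic_list = 'other_topic', kafka_format = 'CSV');
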
tests/integration/test_storage_kafka/test.py

@@ -43,7 +43,7 @@ from . import social_pb2
 cluster = ClickHouseCluster(__file__)
 instance = cluster.add_instance('instance',
-                                main_configs=['configs/kafka.xml'],
+                                main_configs=['configs/kafka.xml', 'configs/named_collection.xml'],
                                 with_kafka=True,
                                 with_zookeeper=True, # For Replicated Table
                                 macros={"kafka_broker":"kafka1",
@@ -3171,6 +3171,28 @@ def test_kafka_consumer_failover(kafka_cluster):
     kafka_delete_topic(admin_client, topic_name)
 
 
+def test_kafka_predefined_configuration(kafka_cluster):
+    admin_client = KafkaAdminClient(bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port))
+    topic_name = 'conf'
+    kafka_create_topic(admin_client, topic_name)
+
+    messages = []
+    for i in range(50):
+        messages.append('{i}, {i}'.format(i=i))
+    kafka_produce(kafka_cluster, topic_name, messages)
+
+    instance.query(f'''
+        CREATE TABLE test.kafka (key UInt64, value UInt64) ENGINE = Kafka(kafka1, kafka_format='CSV');
+        ''')
+
+    result = ''
+    while True:
+        result += instance.query('SELECT * FROM test.kafka', ignore_error=True)
+        if kafka_check_result(result):
+            break
+    kafka_check_result(result, True)
+
 
 if __name__ == '__main__':
     cluster.start()
     input("Cluster created, press any key to destroy...")

tests/integration/test_storage_rabbitmq/configs/named_collection.xml

@@ -0,0 +1,14 @@
+<clickhouse>
+    <named_collections>
+        <rabbit1>
+            <rabbitmq_host_port>rabbitmq1:5672</rabbitmq_host_port>
+            <rabbitmq_exchange_name>named</rabbitmq_exchange_name>
+            <rabbitmq_format>JSONEachRow</rabbitmq_format>
+            <rabbitmq_skip_broken_messages>111</rabbitmq_skip_broken_messages>
+            <num_consumers>12</num_consumers>
+            <rabbitmq_persistent>1</rabbitmq_persistent>
+            <rabbitmq_username>root</rabbitmq_username>
+            <rabbitmq_password>clickhouse</rabbitmq_password>
+        </rabbit1>
+    </named_collections>
+</clickhouse>

tests/integration/test_storage_rabbitmq/test.py

@@ -19,7 +19,7 @@ from . import rabbitmq_pb2
 cluster = ClickHouseCluster(__file__)
 instance = cluster.add_instance('instance',
-                                main_configs=['configs/rabbitmq.xml', 'configs/macros.xml'],
+                                main_configs=['configs/rabbitmq.xml', 'configs/macros.xml', 'configs/named_collection.xml'],
                                 with_rabbitmq=True)
@@ -2226,6 +2226,23 @@ def test_rabbitmq_random_detach(rabbitmq_cluster):
         thread.join()
 
 
+def test_rabbitmq_predefined_configuration(rabbitmq_cluster):
+    credentials = pika.PlainCredentials('root', 'clickhouse')
+    parameters = pika.ConnectionParameters(rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, '/', credentials)
+    connection = pika.BlockingConnection(parameters)
+    channel = connection.channel()
+
+    instance.query('''
+        CREATE TABLE test.rabbitmq (key UInt64, value UInt64)
+            ENGINE = RabbitMQ(rabbit1, rabbitmq_vhost = '/') ''')
+
+    channel.basic_publish(exchange='named', routing_key='', body=json.dumps({'key': 1, 'value': 2}))
+    while True:
+        result = instance.query('SELECT * FROM test.rabbitmq ORDER BY key', ignore_error=True)
+        if result == "1\t2\n":
+            break
+
 
 if __name__ == '__main__':
     cluster.start()
     input("Cluster created, press any key to destroy...")