Fix kafka

This commit is contained in:
kssenii 2023-03-06 12:30:25 +01:00
parent c06af1f1e7
commit 8567e3976b

View File

@ -915,23 +915,27 @@ void registerStorageKafka(StorageFactory & factory)
*/
/* 0 = raw, 1 = evaluateConstantExpressionAsLiteral, 2=evaluateConstantExpressionOrIdentifierAsLiteral */
CHECK_KAFKA_STORAGE_ARGUMENT(1, kafka_broker_list, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(2, kafka_topic_list, 1)
CHECK_KAFKA_STORAGE_ARGUMENT(3, kafka_group_name, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(4, kafka_format, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(5, kafka_row_delimiter, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(6, kafka_schema, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(7, kafka_num_consumers, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(8, kafka_max_block_size, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(9, kafka_skip_broken_messages, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(10, kafka_commit_every_batch, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(11, kafka_client_id, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(12, kafka_poll_timeout_ms, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(13, kafka_flush_interval_ms, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(14, kafka_thread_per_consumer, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(15, kafka_handle_error_mode, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(16, kafka_commit_on_select, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(17, kafka_max_rows_per_message, 0)
/// In case of named collection we already validated the arguments.
if (collection_name.empty())
{
CHECK_KAFKA_STORAGE_ARGUMENT(1, kafka_broker_list, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(2, kafka_topic_list, 1)
CHECK_KAFKA_STORAGE_ARGUMENT(3, kafka_group_name, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(4, kafka_format, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(5, kafka_row_delimiter, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(6, kafka_schema, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(7, kafka_num_consumers, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(8, kafka_max_block_size, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(9, kafka_skip_broken_messages, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(10, kafka_commit_every_batch, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(11, kafka_client_id, 2)
CHECK_KAFKA_STORAGE_ARGUMENT(12, kafka_poll_timeout_ms, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(13, kafka_flush_interval_ms, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(14, kafka_thread_per_consumer, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(15, kafka_handle_error_mode, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(16, kafka_commit_on_select, 0)
CHECK_KAFKA_STORAGE_ARGUMENT(17, kafka_max_rows_per_message, 0)
}
#undef CHECK_KAFKA_STORAGE_ARGUMENT