diff --git a/.gitmodules b/.gitmodules index 923554a1532..24211b6707e 100644 --- a/.gitmodules +++ b/.gitmodules @@ -61,3 +61,6 @@ [submodule "contrib/libgsasl"] path = contrib/libgsasl url = https://github.com/ClickHouse-Extras/libgsasl.git +[submodule "contrib/cppkafka"] + path = contrib/cppkafka + url = https://github.com/mfontanini/cppkafka.git diff --git a/cmake/find_rdkafka.cmake b/cmake/find_rdkafka.cmake index 9ba48cadfcd..b0a0a98b382 100644 --- a/cmake/find_rdkafka.cmake +++ b/cmake/find_rdkafka.cmake @@ -25,6 +25,7 @@ endif () if (RDKAFKA_LIB AND RDKAFKA_INCLUDE_DIR) set (USE_RDKAFKA 1) set (RDKAFKA_LIBRARY ${RDKAFKA_LIB} ${OPENSSL_LIBRARIES}) + set (CPPKAFKA_LIBRARY cppkafka) if (SASL2_LIBRARY) list (APPEND RDKAFKA_LIBRARY ${SASL2_LIBRARY}) endif () @@ -35,9 +36,10 @@ elseif (NOT MISSING_INTERNAL_RDKAFKA_LIBRARY AND NOT ARCH_ARM) set (USE_INTERNAL_RDKAFKA_LIBRARY 1) set (RDKAFKA_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/librdkafka/src") set (RDKAFKA_LIBRARY rdkafka) + set (CPPKAFKA_LIBRARY cppkafka) set (USE_RDKAFKA 1) endif () endif () -message (STATUS "Using librdkafka=${USE_RDKAFKA}: ${RDKAFKA_INCLUDE_DIR} : ${RDKAFKA_LIBRARY}") +message (STATUS "Using librdkafka=${USE_RDKAFKA}: ${RDKAFKA_INCLUDE_DIR} : ${RDKAFKA_LIBRARY} ${CPPKAFKA_LIBRARY}") diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 9d964f288d8..25ad30e02eb 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -121,6 +121,7 @@ endif () if (USE_INTERNAL_RDKAFKA_LIBRARY) add_subdirectory (librdkafka-cmake) + add_subdirectory (cppkafka-cmake) target_include_directories(rdkafka BEFORE PRIVATE ${ZLIB_INCLUDE_DIR}) target_include_directories(rdkafka BEFORE PRIVATE ${OPENSSL_INCLUDE_DIR}) endif () diff --git a/contrib/cppkafka b/contrib/cppkafka new file mode 160000 index 00000000000..520465510ef --- /dev/null +++ b/contrib/cppkafka @@ -0,0 +1 @@ +Subproject commit 520465510efef7704346cf8d140967c4abb057c1 diff --git a/contrib/cppkafka-cmake/CMakeLists.txt b/contrib/cppkafka-cmake/CMakeLists.txt new file mode 100644 index 00000000000..f62fa471172 --- /dev/null +++ b/contrib/cppkafka-cmake/CMakeLists.txt @@ -0,0 +1,31 @@ +set(CPPKAFKA_DIR ${CMAKE_SOURCE_DIR}/contrib/cppkafka) + +set(SRCS + ${CPPKAFKA_DIR}/src/configuration.cpp + ${CPPKAFKA_DIR}/src/topic_configuration.cpp + ${CPPKAFKA_DIR}/src/configuration_option.cpp + ${CPPKAFKA_DIR}/src/exceptions.cpp + ${CPPKAFKA_DIR}/src/topic.cpp + ${CPPKAFKA_DIR}/src/buffer.cpp + ${CPPKAFKA_DIR}/src/queue.cpp + ${CPPKAFKA_DIR}/src/message.cpp + ${CPPKAFKA_DIR}/src/message_timestamp.cpp + ${CPPKAFKA_DIR}/src/message_internal.cpp + ${CPPKAFKA_DIR}/src/topic_partition.cpp + ${CPPKAFKA_DIR}/src/topic_partition_list.cpp + ${CPPKAFKA_DIR}/src/metadata.cpp + ${CPPKAFKA_DIR}/src/group_information.cpp + ${CPPKAFKA_DIR}/src/error.cpp + ${CPPKAFKA_DIR}/src/event.cpp + + ${CPPKAFKA_DIR}/src/kafka_handle_base.cpp + ${CPPKAFKA_DIR}/src/producer.cpp + ${CPPKAFKA_DIR}/src/consumer.cpp +) + +add_library(cppkafka ${LINK_MODE} ${SRCS}) + +target_link_libraries(cppkafka ${RDKAFKA_LIBRARY}) +target_include_directories(cppkafka PRIVATE ${CPPKAFKA_DIR}/include/cppkafka) +target_include_directories(cppkafka PRIVATE ${Boost_INCLUDE_DIRS}) +target_include_directories(cppkafka SYSTEM PUBLIC ${CPPKAFKA_DIR}/include) diff --git a/contrib/librdkafka-cmake/CMakeLists.txt b/contrib/librdkafka-cmake/CMakeLists.txt index 90421cfb31d..3b35634dabc 100644 --- a/contrib/librdkafka-cmake/CMakeLists.txt +++ b/contrib/librdkafka-cmake/CMakeLists.txt @@ -1,60 +1,60 @@ set(RDKAFKA_SOURCE_DIR ${CMAKE_SOURCE_DIR}/contrib/librdkafka/src) set(SRCS -${RDKAFKA_SOURCE_DIR}/crc32c.c -${RDKAFKA_SOURCE_DIR}/rdaddr.c -${RDKAFKA_SOURCE_DIR}/rdavl.c -${RDKAFKA_SOURCE_DIR}/rdbuf.c -${RDKAFKA_SOURCE_DIR}/rdcrc32.c -${RDKAFKA_SOURCE_DIR}/rdkafka.c -${RDKAFKA_SOURCE_DIR}/rdkafka_assignor.c -${RDKAFKA_SOURCE_DIR}/rdkafka_broker.c -${RDKAFKA_SOURCE_DIR}/rdkafka_buf.c -${RDKAFKA_SOURCE_DIR}/rdkafka_cgrp.c -${RDKAFKA_SOURCE_DIR}/rdkafka_conf.c -${RDKAFKA_SOURCE_DIR}/rdkafka_event.c -${RDKAFKA_SOURCE_DIR}/rdkafka_feature.c -${RDKAFKA_SOURCE_DIR}/rdkafka_lz4.c -${RDKAFKA_SOURCE_DIR}/rdkafka_metadata.c -${RDKAFKA_SOURCE_DIR}/rdkafka_metadata_cache.c -${RDKAFKA_SOURCE_DIR}/rdkafka_msg.c -${RDKAFKA_SOURCE_DIR}/rdkafka_msgset_reader.c -${RDKAFKA_SOURCE_DIR}/rdkafka_msgset_writer.c -${RDKAFKA_SOURCE_DIR}/rdkafka_offset.c -${RDKAFKA_SOURCE_DIR}/rdkafka_op.c -${RDKAFKA_SOURCE_DIR}/rdkafka_partition.c -${RDKAFKA_SOURCE_DIR}/rdkafka_pattern.c -${RDKAFKA_SOURCE_DIR}/rdkafka_queue.c -${RDKAFKA_SOURCE_DIR}/rdkafka_range_assignor.c -${RDKAFKA_SOURCE_DIR}/rdkafka_request.c -${RDKAFKA_SOURCE_DIR}/rdkafka_roundrobin_assignor.c -${RDKAFKA_SOURCE_DIR}/rdkafka_sasl.c -${RDKAFKA_SOURCE_DIR}/rdkafka_sasl_plain.c -${RDKAFKA_SOURCE_DIR}/rdkafka_subscription.c -${RDKAFKA_SOURCE_DIR}/rdkafka_timer.c -${RDKAFKA_SOURCE_DIR}/rdkafka_topic.c -${RDKAFKA_SOURCE_DIR}/rdkafka_transport.c -${RDKAFKA_SOURCE_DIR}/rdkafka_interceptor.c -${RDKAFKA_SOURCE_DIR}/rdkafka_header.c -${RDKAFKA_SOURCE_DIR}/rdlist.c -${RDKAFKA_SOURCE_DIR}/rdlog.c -${RDKAFKA_SOURCE_DIR}/rdmurmur2.c -${RDKAFKA_SOURCE_DIR}/rdports.c -${RDKAFKA_SOURCE_DIR}/rdrand.c -${RDKAFKA_SOURCE_DIR}/rdregex.c -${RDKAFKA_SOURCE_DIR}/rdstring.c -${RDKAFKA_SOURCE_DIR}/rdunittest.c -${RDKAFKA_SOURCE_DIR}/rdvarint.c -${RDKAFKA_SOURCE_DIR}/snappy.c -${RDKAFKA_SOURCE_DIR}/tinycthread.c -${RDKAFKA_SOURCE_DIR}/xxhash.c -${RDKAFKA_SOURCE_DIR}/lz4.c -${RDKAFKA_SOURCE_DIR}/lz4frame.c -${RDKAFKA_SOURCE_DIR}/lz4hc.c -${RDKAFKA_SOURCE_DIR}/rdgz.c + ${RDKAFKA_SOURCE_DIR}/crc32c.c + ${RDKAFKA_SOURCE_DIR}/rdaddr.c + ${RDKAFKA_SOURCE_DIR}/rdavl.c + ${RDKAFKA_SOURCE_DIR}/rdbuf.c + ${RDKAFKA_SOURCE_DIR}/rdcrc32.c + ${RDKAFKA_SOURCE_DIR}/rdkafka.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_assignor.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_broker.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_buf.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_cgrp.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_conf.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_event.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_feature.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_lz4.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_metadata.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_metadata_cache.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_msg.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_msgset_reader.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_msgset_writer.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_offset.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_op.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_partition.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_pattern.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_queue.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_range_assignor.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_request.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_roundrobin_assignor.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_sasl.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_sasl_plain.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_subscription.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_timer.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_topic.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_transport.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_interceptor.c + ${RDKAFKA_SOURCE_DIR}/rdkafka_header.c + ${RDKAFKA_SOURCE_DIR}/rdlist.c + ${RDKAFKA_SOURCE_DIR}/rdlog.c + ${RDKAFKA_SOURCE_DIR}/rdmurmur2.c + ${RDKAFKA_SOURCE_DIR}/rdports.c + ${RDKAFKA_SOURCE_DIR}/rdrand.c + ${RDKAFKA_SOURCE_DIR}/rdregex.c + ${RDKAFKA_SOURCE_DIR}/rdstring.c + ${RDKAFKA_SOURCE_DIR}/rdunittest.c + ${RDKAFKA_SOURCE_DIR}/rdvarint.c + ${RDKAFKA_SOURCE_DIR}/snappy.c + ${RDKAFKA_SOURCE_DIR}/tinycthread.c + ${RDKAFKA_SOURCE_DIR}/xxhash.c + ${RDKAFKA_SOURCE_DIR}/lz4.c + ${RDKAFKA_SOURCE_DIR}/lz4frame.c + ${RDKAFKA_SOURCE_DIR}/lz4hc.c + ${RDKAFKA_SOURCE_DIR}/rdgz.c ) add_library(rdkafka ${LINK_MODE} ${SRCS}) -target_include_directories(rdkafka PRIVATE include) +target_include_directories(rdkafka SYSTEM PUBLIC include) target_include_directories(rdkafka SYSTEM PUBLIC ${RDKAFKA_SOURCE_DIR}) target_link_libraries(rdkafka PUBLIC ${ZLIB_LIBRARIES} ${OPENSSL_SSL_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY}) diff --git a/contrib/librdkafka-cmake/include/librdkafka/rdkafka.h b/contrib/librdkafka-cmake/include/librdkafka/rdkafka.h new file mode 100644 index 00000000000..3387659281a --- /dev/null +++ b/contrib/librdkafka-cmake/include/librdkafka/rdkafka.h @@ -0,0 +1,5 @@ +#if __has_include() // maybe bundled +# include_next // Y_IGNORE +#else // system +# include_next +#endif diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 7e39fd2f7af..84099810164 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -287,6 +287,7 @@ endif () if (USE_RDKAFKA) target_link_libraries (dbms PRIVATE ${RDKAFKA_LIBRARY}) + target_link_libraries (dbms PRIVATE ${CPPKAFKA_LIBRARY}) if (NOT USE_INTERNAL_RDKAFKA_LIBRARY) target_include_directories (dbms SYSTEM BEFORE PRIVATE ${RDKAFKA_INCLUDE_DIR}) endif () diff --git a/dbms/src/Storages/Kafka/StorageKafka.cpp b/dbms/src/Storages/Kafka/StorageKafka.cpp index e6ccf544ba1..f855ea7e877 100644 --- a/dbms/src/Storages/Kafka/StorageKafka.cpp +++ b/dbms/src/Storages/Kafka/StorageKafka.cpp @@ -1,39 +1,35 @@ -#include -#include +#include + #if USE_RDKAFKA -#include -#include -#include -#include -#include -#include -#include -#include -#include #include #include #include #include +#include +#include #include #include #include +#include #include #include #include -#include #include -#include -#include #include -#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include -#if __has_include() // maybe bundled -#include // Y_IGNORE -#else // system -#include -#endif +#include namespace DB diff --git a/dbms/src/Storages/Kafka/StorageKafka.h b/dbms/src/Storages/Kafka/StorageKafka.h index 561349ac474..e7cce510166 100644 --- a/dbms/src/Storages/Kafka/StorageKafka.h +++ b/dbms/src/Storages/Kafka/StorageKafka.h @@ -1,16 +1,18 @@ #pragma once + #include + #if USE_RDKAFKA -#include - -#include -#include #include -#include +#include #include +#include #include #include +#include + +#include struct rd_kafka_s; struct rd_kafka_conf_s;