diff --git a/.gitmodules b/.gitmodules
index 6b6b734989d..0fda654f07c 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -79,3 +79,9 @@
 [submodule "contrib/hyperscan"]
 	path = contrib/hyperscan
 	url = https://github.com/ClickHouse-Extras/hyperscan.git
+[submodule "contrib/simdjson"]
+	path = contrib/simdjson
+	url = https://github.com/lemire/simdjson.git
+[submodule "contrib/rapidjson"]
+	path = contrib/rapidjson
+	url = https://github.com/Tencent/rapidjson
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 4b45792ef76..79b3b1ddba3 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,6 +1,15 @@
 project(ClickHouse)
 cmake_minimum_required(VERSION 3.3)
-cmake_policy(SET CMP0023 NEW)
+
+foreach(policy
+    CMP0023
+    CMP0074 # CMake 3.12
+    )
+  if(POLICY ${policy})
+    cmake_policy(SET ${policy} NEW)
+  endif()
+endforeach()
+
 set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake/Modules/")
 set(CMAKE_EXPORT_COMPILE_COMMANDS 1) # Write compile_commands.json
 set(CMAKE_LINK_DEPENDS_NO_SHARED 1) # Do not relink all depended targets on .so
@@ -301,6 +310,7 @@ include (cmake/find_rt.cmake)
 include (cmake/find_execinfo.cmake)
 include (cmake/find_readline_edit.cmake)
 include (cmake/find_re2.cmake)
+include (cmake/find_libgsasl.cmake)
 include (cmake/find_rdkafka.cmake)
 include (cmake/find_capnp.cmake)
 include (cmake/find_llvm.cmake)
@@ -308,7 +318,6 @@ include (cmake/find_cpuid.cmake) # Freebsd, bundled
 if (NOT USE_CPUID)
     include (cmake/find_cpuinfo.cmake) # Debian
 endif()
-include (cmake/find_libgsasl.cmake)
 include (cmake/find_libxml2.cmake)
 include (cmake/find_brotli.cmake)
 include (cmake/find_protobuf.cmake)
@@ -318,6 +327,8 @@ include (cmake/find_consistent-hashing.cmake)
 include (cmake/find_base64.cmake)
 include (cmake/find_hyperscan.cmake)
 include (cmake/find_lfalloc.cmake)
+include (cmake/find_simdjson.cmake)
+include (cmake/find_rapidjson.cmake)
 find_contrib_lib(cityhash)
 find_contrib_lib(farmhash)
 find_contrib_lib(metrohash)
diff --git a/README.md b/README.md
index 02a50be007b..8e7653d60cb 100644
--- a/README.md
+++ b/README.md
@@ -12,8 +12,8 @@ ClickHouse is an open-source column-oriented database management system that all
 * You can also [fill this form](https://forms.yandex.com/surveys/meet-yandex-clickhouse-team/) to meet Yandex ClickHouse team in person.
 
 ## Upcoming Events
-* [ClickHouse Community Meetup in Limassol](https://www.facebook.com/events/386638262181785/) on May 7.
 * ClickHouse at [Percona Live 2019](https://www.percona.com/live/19/other-open-source-databases-track) in Austin on May 28-30.
+* [ClickHouse Community Meetup in San Francisco](https://www.meetup.com/San-Francisco-Bay-Area-ClickHouse-Meetup/events/261110652/) on June 4.
 * [ClickHouse Community Meetup in Beijing](https://www.huodongxing.com/event/2483759276200) on June 8.
 * [ClickHouse Community Meetup in Shenzhen](https://www.huodongxing.com/event/3483759917300) on October 20.
 * [ClickHouse Community Meetup in Shanghai](https://www.huodongxing.com/event/4483760336000) on October 27.
diff --git a/cmake/find_boost.cmake b/cmake/find_boost.cmake
index b37782556d1..6776d0cea06 100644
--- a/cmake/find_boost.cmake
+++ b/cmake/find_boost.cmake
@@ -1,9 +1,12 @@
 option (USE_INTERNAL_BOOST_LIBRARY "Set to FALSE to use system boost library instead of bundled" ${NOT_UNBUNDLED})
 
 # Test random file existing in all package variants
-if (USE_INTERNAL_BOOST_LIBRARY AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/boost/libs/system/src/error_code.cpp")
-   message (WARNING "submodules in contrib/boost is missing. to fix try run: \n git submodule update --init --recursive")
-   set (USE_INTERNAL_BOOST_LIBRARY 0)
+if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/boost/libs/system/src/error_code.cpp")
+    if(USE_INTERNAL_BOOST_LIBRARY)
+        message(WARNING "submodules in contrib/boost is missing. to fix try run: \n git submodule update --init --recursive")
+    endif()
+    set (USE_INTERNAL_BOOST_LIBRARY 0)
+    set (MISSING_INTERNAL_BOOST_LIBRARY 1)
 endif ()
 
 if (NOT USE_INTERNAL_BOOST_LIBRARY)
@@ -21,10 +24,9 @@ if (NOT USE_INTERNAL_BOOST_LIBRARY)
         set (Boost_INCLUDE_DIRS "")
         set (Boost_SYSTEM_LIBRARY "")
     endif ()
-
 endif ()
 
-if (NOT Boost_SYSTEM_LIBRARY)
+if (NOT Boost_SYSTEM_LIBRARY AND NOT MISSING_INTERNAL_BOOST_LIBRARY)
     set (USE_INTERNAL_BOOST_LIBRARY 1)
     set (Boost_SYSTEM_LIBRARY boost_system_internal)
     set (Boost_PROGRAM_OPTIONS_LIBRARY boost_program_options_internal)
@@ -44,7 +46,6 @@ if (NOT Boost_SYSTEM_LIBRARY)
 
     # For packaged version:
     list (APPEND Boost_INCLUDE_DIRS "${ClickHouse_SOURCE_DIR}/contrib/boost")
-
 endif ()
 
 message (STATUS "Using Boost: ${Boost_INCLUDE_DIRS} : ${Boost_PROGRAM_OPTIONS_LIBRARY},${Boost_SYSTEM_LIBRARY},${Boost_FILESYSTEM_LIBRARY},${Boost_REGEX_LIBRARY}")
diff --git a/cmake/find_icu.cmake b/cmake/find_icu.cmake
index 39991eef87d..d83a7ba4cb4 100644
--- a/cmake/find_icu.cmake
+++ b/cmake/find_icu.cmake
@@ -1,6 +1,9 @@
 option(ENABLE_ICU "Enable ICU" ON)
 
 if(ENABLE_ICU)
+    if (APPLE)
+        set(ICU_ROOT "/usr/local/opt/icu4c" CACHE STRING "")
+    endif()
     find_package(ICU COMPONENTS i18n uc data) # TODO: remove Modules/FindICU.cmake after cmake 3.7
     #set (ICU_LIBRARIES ${ICU_I18N_LIBRARY} ${ICU_UC_LIBRARY} ${ICU_DATA_LIBRARY} CACHE STRING "")
     if(ICU_FOUND)
diff --git a/cmake/find_lfalloc.cmake b/cmake/find_lfalloc.cmake
index c9b2ce5d436..81b1827e44c 100644
--- a/cmake/find_lfalloc.cmake
+++ b/cmake/find_lfalloc.cmake
@@ -1,4 +1,4 @@
-if (NOT SANITIZE AND NOT ARCH_ARM AND NOT ARCH_32 AND NOT ARCH_PPC64LE AND NOT OS_FREEBSD)
+if (NOT SANITIZE AND NOT ARCH_ARM AND NOT ARCH_32 AND NOT ARCH_PPC64LE AND NOT OS_FREEBSD AND NOT APPLE)
     option (ENABLE_LFALLOC "Set to FALSE to use system libgsasl library instead of bundled" ${NOT_UNBUNDLED})
 endif ()
 
diff --git a/cmake/find_libgsasl.cmake b/cmake/find_libgsasl.cmake
index ef1bbefe0df..729401292db 100644
--- a/cmake/find_libgsasl.cmake
+++ b/cmake/find_libgsasl.cmake
@@ -22,4 +22,8 @@ elseif (NOT MISSING_INTERNAL_LIBGSASL_LIBRARY AND NOT APPLE AND NOT ARCH_32)
     set (LIBGSASL_LIBRARY libgsasl)
 endif ()
 
-message (STATUS "Using libgsasl: ${LIBGSASL_INCLUDE_DIR} : ${LIBGSASL_LIBRARY}")
+if(LIBGSASL_LIBRARY AND LIBGSASL_INCLUDE_DIR)
+    set (USE_LIBGSASL 1)
+endif()
+
+message (STATUS "Using libgsasl=${USE_LIBGSASL}: ${LIBGSASL_INCLUDE_DIR} : ${LIBGSASL_LIBRARY}")
diff --git a/cmake/find_rapidjson.cmake b/cmake/find_rapidjson.cmake
new file mode 100644
index 00000000000..bd8f0fbb449
--- /dev/null
+++ b/cmake/find_rapidjson.cmake
@@ -0,0 +1,9 @@
+if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/rapidjson/include/rapidjson/rapidjson.h")
+    message (WARNING "submodule contrib/rapidjson is missing. to fix try run: \n git submodule update --init --recursive")
+    return()
+endif ()
+
+option (USE_RAPIDJSON "Use rapidjson" ON)
+set (RAPIDJSON_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/rapidjson/include")
+
+message(STATUS "Using rapidjson=${USE_RAPIDJSON}: ${RAPIDJSON_INCLUDE_DIR}")
diff --git a/cmake/find_rdkafka.cmake b/cmake/find_rdkafka.cmake
index 3363c657f91..8469969cf62 100644
--- a/cmake/find_rdkafka.cmake
+++ b/cmake/find_rdkafka.cmake
@@ -10,7 +10,7 @@ endif ()
 
 if (ENABLE_RDKAFKA)
 
-if (OS_LINUX AND NOT ARCH_ARM)
+if (OS_LINUX AND NOT ARCH_ARM AND USE_LIBGSASL)
     option (USE_INTERNAL_RDKAFKA_LIBRARY "Set to FALSE to use system librdkafka instead of the bundled" ${NOT_UNBUNDLED})
 endif ()
 
diff --git a/cmake/find_re2.cmake b/cmake/find_re2.cmake
index c0136a6cc21..05ba80f143f 100644
--- a/cmake/find_re2.cmake
+++ b/cmake/find_re2.cmake
@@ -1,5 +1,13 @@
 option (USE_INTERNAL_RE2_LIBRARY "Set to FALSE to use system re2 library instead of bundled [slower]" ${NOT_UNBUNDLED})
 
+if(NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/re2/CMakeLists.txt")
+    if(USE_INTERNAL_RE2_LIBRARY)
+        message(WARNING "submodule contrib/re2 is missing. to fix try run: \n git submodule update --init --recursive")
+    endif()
+    set(USE_INTERNAL_RE2_LIBRARY 0)
+    set(MISSING_INTERNAL_RE2_LIBRARY 1)
+endif()
+
 if (NOT USE_INTERNAL_RE2_LIBRARY)
     find_library (RE2_LIBRARY re2)
     find_path (RE2_INCLUDE_DIR NAMES re2/re2.h PATHS ${RE2_INCLUDE_PATHS})
diff --git a/cmake/find_simdjson.cmake b/cmake/find_simdjson.cmake
new file mode 100644
index 00000000000..a556fa5f2b2
--- /dev/null
+++ b/cmake/find_simdjson.cmake
@@ -0,0 +1,14 @@
+if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/simdjson/include/simdjson/jsonparser.h")
+    message (WARNING "submodule contrib/simdjson is missing. to fix try run: \n git submodule update --init --recursive")
+    return()
+endif ()
+
+if (NOT HAVE_AVX2)
+    message (WARNING "submodule contrib/simdjson requires AVX2 support")
+    return()
+endif ()
+
+option (USE_SIMDJSON "Use simdjson" ON)
+set (SIMDJSON_LIBRARY "simdjson")
+
+message(STATUS "Using simdjson=${USE_SIMDJSON}: ${SIMDJSON_LIBRARY}")
diff --git a/cmake/find_zlib.cmake b/cmake/find_zlib.cmake
index fb6b8c7971d..42cfce871d7 100644
--- a/cmake/find_zlib.cmake
+++ b/cmake/find_zlib.cmake
@@ -2,20 +2,28 @@ if (NOT OS_FREEBSD AND NOT ARCH_32)
     option (USE_INTERNAL_ZLIB_LIBRARY "Set to FALSE to use system zlib library instead of bundled" ${NOT_UNBUNDLED})
 endif ()
 
+if (NOT MSVC)
+    set (INTERNAL_ZLIB_NAME "zlib-ng" CACHE INTERNAL "")
+else ()
+    set (INTERNAL_ZLIB_NAME "zlib" CACHE INTERNAL "")
+    if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/${INTERNAL_ZLIB_NAME}")
+        message (WARNING "Will use standard zlib, please clone manually:\n git clone https://github.com/madler/zlib.git ${ClickHouse_SOURCE_DIR}/contrib/${INTERNAL_ZLIB_NAME}")
+    endif ()
+endif ()
+
+if(NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/${INTERNAL_ZLIB_NAME}/zlib.h")
+    if(USE_INTERNAL_ZLIB_LIBRARY)
+        message(WARNING "submodule contrib/${INTERNAL_ZLIB_NAME} is missing. to fix try run: \n git submodule update --init --recursive")
+    endif()
+    set(USE_INTERNAL_ZLIB_LIBRARY 0)
+    set(MISSING_INTERNAL_ZLIB_LIBRARY 1)
+endif()
+
 if (NOT USE_INTERNAL_ZLIB_LIBRARY)
     find_package (ZLIB)
 endif ()
 
-if (NOT ZLIB_FOUND)
-    if (NOT MSVC)
-        set (INTERNAL_ZLIB_NAME "zlib-ng" CACHE INTERNAL "")
-    else ()
-        set (INTERNAL_ZLIB_NAME "zlib" CACHE INTERNAL "")
-        if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/${INTERNAL_ZLIB_NAME}")
-            message (WARNING "Will use standard zlib, please clone manually:\n git clone https://github.com/madler/zlib.git ${ClickHouse_SOURCE_DIR}/contrib/${INTERNAL_ZLIB_NAME}")
-        endif ()
-    endif ()
-
+if (NOT ZLIB_FOUND AND NOT MISSING_INTERNAL_ZLIB_LIBRARY)
     set (USE_INTERNAL_ZLIB_LIBRARY 1)
     set (ZLIB_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/${INTERNAL_ZLIB_NAME}" "${ClickHouse_BINARY_DIR}/contrib/${INTERNAL_ZLIB_NAME}" CACHE INTERNAL "") # generated zconf.h
     set (ZLIB_INCLUDE_DIRS ${ZLIB_INCLUDE_DIR}) # for poco
diff --git a/cmake/find_zstd.cmake b/cmake/find_zstd.cmake
index 24bc851ed57..e4f32d4b170 100644
--- a/cmake/find_zstd.cmake
+++ b/cmake/find_zstd.cmake
@@ -1,9 +1,12 @@
 option (USE_INTERNAL_ZSTD_LIBRARY "Set to FALSE to use system zstd library instead of bundled" ${NOT_UNBUNDLED})
 
-if (USE_INTERNAL_ZSTD_LIBRARY AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/zstd/lib/zstd.h")
-   message (WARNING "submodule contrib/zstd is missing. to fix try run: \n git submodule update --init --recursive")
-   set (USE_INTERNAL_ZSTD_LIBRARY 0)
-endif ()
+if(NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/zstd/lib/zstd.h")
+    if(USE_INTERNAL_ZSTD_LIBRARY)
+        message(WARNING "submodule contrib/zstd is missing. to fix try run: \n git submodule update --init --recursive")
+    endif()
+    set(USE_INTERNAL_ZSTD_LIBRARY 0)
+    set(MISSING_INTERNAL_ZSTD_LIBRARY 1)
+endif()
 
 if (NOT USE_INTERNAL_ZSTD_LIBRARY)
     find_library (ZSTD_LIBRARY zstd)
@@ -11,7 +14,7 @@ if (NOT USE_INTERNAL_ZSTD_LIBRARY)
 endif ()
 
 if (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR)
-else ()
+elseif (NOT MISSING_INTERNAL_ZSTD_LIBRARY)
     set (USE_INTERNAL_ZSTD_LIBRARY 1)
     set (ZSTD_LIBRARY zstd)
     set (ZSTD_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/zstd/lib)
diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt
index abae6f7deec..71c269ad2bc 100644
--- a/contrib/CMakeLists.txt
+++ b/contrib/CMakeLists.txt
@@ -227,7 +227,7 @@ if (USE_INTERNAL_POCO_LIBRARY)
     set (ENABLE_TESTS 0)
     set (POCO_ENABLE_TESTS 0)
     set (CMAKE_DISABLE_FIND_PACKAGE_ZLIB 1)
-    if (MSVC)
+    if (MSVC OR NOT USE_POCO_DATAODBC)
         set (ENABLE_DATA_ODBC 0 CACHE INTERNAL "") # TODO (build fail)
     endif ()
     add_subdirectory (poco)
@@ -313,3 +313,7 @@ endif()
 if (USE_INTERNAL_HYPERSCAN_LIBRARY)
     add_subdirectory (hyperscan)
 endif()
+
+if (USE_SIMDJSON)
+    add_subdirectory (simdjson-cmake)
+endif()
diff --git a/contrib/boost b/contrib/boost
index 471ea208abb..79bf85ea99c 160000
--- a/contrib/boost
+++ b/contrib/boost
@@ -1 +1 @@
-Subproject commit 471ea208abb92a5cba7d3a08a819bb728f27e95f
+Subproject commit 79bf85ea99c05ba4fb6959474d4464ab126f8973
diff --git a/contrib/librdkafka-cmake/CMakeLists.txt b/contrib/librdkafka-cmake/CMakeLists.txt
index 6dc86bf3219..3c9c17b1796 100644
--- a/contrib/librdkafka-cmake/CMakeLists.txt
+++ b/contrib/librdkafka-cmake/CMakeLists.txt
@@ -33,6 +33,7 @@ set(SRCS
     ${RDKAFKA_SOURCE_DIR}/rdkafka_roundrobin_assignor.c
     ${RDKAFKA_SOURCE_DIR}/rdkafka_sasl.c
     ${RDKAFKA_SOURCE_DIR}/rdkafka_sasl_plain.c
+    ${RDKAFKA_SOURCE_DIR}/rdkafka_sasl_scram.c
    ${RDKAFKA_SOURCE_DIR}/rdkafka_subscription.c
     ${RDKAFKA_SOURCE_DIR}/rdkafka_timer.c
     ${RDKAFKA_SOURCE_DIR}/rdkafka_topic.c
@@ -58,7 +59,7 @@ add_library(rdkafka ${SRCS})
 target_include_directories(rdkafka SYSTEM PUBLIC include)
 target_include_directories(rdkafka SYSTEM PUBLIC ${RDKAFKA_SOURCE_DIR}) # Because weird logic with "include_next" is used.
 target_include_directories(rdkafka SYSTEM PRIVATE ${ZSTD_INCLUDE_DIR}/common) # Because wrong path to "zstd_errors.h" is used.
-target_link_libraries(rdkafka PUBLIC ${ZLIB_LIBRARIES} ${ZSTD_LIBRARY} ${LZ4_LIBRARY})
+target_link_libraries(rdkafka PUBLIC ${ZLIB_LIBRARIES} ${ZSTD_LIBRARY} ${LZ4_LIBRARY} ${LIBGSASL_LIBRARY})
 if(OPENSSL_SSL_LIBRARY AND OPENSSL_CRYPTO_LIBRARY)
     target_link_libraries(rdkafka PUBLIC ${OPENSSL_SSL_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY})
 endif()
diff --git a/contrib/librdkafka-cmake/config.h b/contrib/librdkafka-cmake/config.h
index ae4e370a628..403a79ea42e 100644
--- a/contrib/librdkafka-cmake/config.h
+++ b/contrib/librdkafka-cmake/config.h
@@ -12,7 +12,7 @@
 #define ENABLE_SHAREDPTR_DEBUG 0
 #define ENABLE_LZ4_EXT 1
 #define ENABLE_SSL 1
-//#define ENABLE_SASL 1
+#define ENABLE_SASL 1
 #define MKL_APP_NAME "librdkafka"
 #define MKL_APP_DESC_ONELINE "The Apache Kafka C/C++ library"
 // distro
@@ -62,7 +62,7 @@
 // libssl
 #define WITH_SSL 1
 // WITH_SASL_SCRAM
-//#define WITH_SASL_SCRAM 1
+#define WITH_SASL_SCRAM 1
 // crc32chw
 #if !defined(__PPC__)
 #define WITH_CRC32C_HW 1
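Context for the two config.h switches above: with ENABLE_SASL and WITH_SASL_SCRAM compiled in (and rdkafka_sasl_scram.c added to SRCS, linked against libgsasl), the bundled librdkafka can authenticate to Kafka brokers over SASL/SCRAM. A minimal sketch of what a client built against it can now do; this is not part of the patch, the broker address and credentials are placeholders, and only standard librdkafka configuration properties are used:

    // sketch: SASL/SCRAM consumer configuration via the librdkafka C API
    #include <librdkafka/rdkafka.h>
    #include <cstdio>

    int main()
    {
        char errstr[512];
        rd_kafka_conf_t * conf = rd_kafka_conf_new();
        /// Each rd_kafka_conf_set() returns RD_KAFKA_CONF_OK on success; checks omitted for brevity.
        rd_kafka_conf_set(conf, "bootstrap.servers", "broker:9093", errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "security.protocol", "sasl_ssl", errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "sasl.mechanisms", "SCRAM-SHA-256", errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "sasl.username", "user", errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "sasl.password", "password", errstr, sizeof(errstr));

        /// rd_kafka_new() takes ownership of conf on success.
        rd_kafka_t * rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
        if (!rk)
        {
            std::fprintf(stderr, "failed to create consumer: %s\n", errstr);
            return 1;
        }
        rd_kafka_destroy(rk);
        return 0;
    }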
diff --git a/contrib/rapidjson b/contrib/rapidjson
new file mode 160000
index 00000000000..01950eb7ace
--- /dev/null
+++ b/contrib/rapidjson
@@ -0,0 +1 @@
+Subproject commit 01950eb7acec78818d68b762efc869bba2420d82
diff --git a/contrib/simdjson b/contrib/simdjson
new file mode 160000
index 00000000000..14cd1f7a0b0
--- /dev/null
+++ b/contrib/simdjson
@@ -0,0 +1 @@
+Subproject commit 14cd1f7a0b0563db78bda8053a9f6ac2ea95a441
diff --git a/contrib/simdjson-cmake/CMakeLists.txt b/contrib/simdjson-cmake/CMakeLists.txt
new file mode 100644
index 00000000000..16a5dc1a791
--- /dev/null
+++ b/contrib/simdjson-cmake/CMakeLists.txt
@@ -0,0 +1,18 @@
+if (NOT HAVE_AVX2)
+    message (FATAL_ERROR "No AVX2 support")
+endif ()
+set(SIMDJSON_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/simdjson/include")
+set(SIMDJSON_SRC_DIR "${SIMDJSON_INCLUDE_DIR}/../src")
+set(SIMDJSON_SRC
+    ${SIMDJSON_SRC_DIR}/jsonioutil.cpp
+    ${SIMDJSON_SRC_DIR}/jsonminifier.cpp
+    ${SIMDJSON_SRC_DIR}/jsonparser.cpp
+    ${SIMDJSON_SRC_DIR}/stage1_find_marks.cpp
+    ${SIMDJSON_SRC_DIR}/stage2_build_tape.cpp
+    ${SIMDJSON_SRC_DIR}/parsedjson.cpp
+    ${SIMDJSON_SRC_DIR}/parsedjsoniterator.cpp
+)
+
+add_library(${SIMDJSON_LIBRARY} ${SIMDJSON_SRC})
+target_include_directories(${SIMDJSON_LIBRARY} PUBLIC "${SIMDJSON_INCLUDE_DIR}")
+target_compile_options(${SIMDJSON_LIBRARY} PRIVATE -mavx2 -mbmi -mbmi2 -mpclmul)
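Both find_simdjson.cmake and the wrapper CMakeLists above hard-require AVX2: the find script returns before USE_SIMDJSON is ever defined when HAVE_AVX2 is false, and the library itself is compiled with -mavx2 -mbmi -mbmi2 -mpclmul. A small sketch of the matching runtime check, using the GCC/Clang __builtin_cpu_supports builtin (not part of the patch):

    // sketch: runtime counterpart of the HAVE_AVX2 build-time gate
    #include <cstdio>

    int main()
    {
        if (__builtin_cpu_supports("avx2"))
            std::printf("AVX2 available: simdjson-backed JSON parsing can run here\n");
        else
            std::printf("no AVX2: code compiled with -mavx2 would fault, hence the gate\n");
        return 0;
    }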
diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt
index 615dc666500..d0ca68543f0 100644
--- a/dbms/CMakeLists.txt
+++ b/dbms/CMakeLists.txt
@@ -189,8 +189,17 @@ target_link_libraries (clickhouse_common_io
         ${Poco_Net_LIBRARY}
         ${Poco_Util_LIBRARY}
         ${Poco_Foundation_LIBRARY}
-        ${RE2_LIBRARY}
-        ${RE2_ST_LIBRARY}
+)
+
+if(RE2_LIBRARY)
+    target_link_libraries(clickhouse_common_io PUBLIC ${RE2_LIBRARY})
+endif()
+if(RE2_ST_LIBRARY)
+    target_link_libraries(clickhouse_common_io PUBLIC ${RE2_ST_LIBRARY})
+endif()
+
+target_link_libraries(clickhouse_common_io
+    PUBLIC
         ${CITYHASH_LIBRARIES}
     PRIVATE
         ${ZLIB_LIBRARIES}
@@ -208,7 +217,9 @@ target_link_libraries (clickhouse_common_io
 )
 
-target_include_directories(clickhouse_common_io SYSTEM BEFORE PUBLIC ${RE2_INCLUDE_DIR})
+if(RE2_INCLUDE_DIR)
+    target_include_directories(clickhouse_common_io SYSTEM BEFORE PUBLIC ${RE2_INCLUDE_DIR})
+endif()
 
 if (USE_LFALLOC)
     target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${LFALLOC_INCLUDE_DIR})
 endif ()
diff --git a/dbms/programs/CMakeLists.txt b/dbms/programs/CMakeLists.txt
index af65df6e8fe..6626d90e5f5 100644
--- a/dbms/programs/CMakeLists.txt
+++ b/dbms/programs/CMakeLists.txt
@@ -209,6 +209,9 @@ else ()
         install (FILES ${CMAKE_CURRENT_BINARY_DIR}/clickhouse-obfuscator DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
         list(APPEND CLICKHOUSE_BUNDLE clickhouse-obfuscator)
     endif ()
+    if(ENABLE_CLICKHOUSE_ODBC_BRIDGE)
+        list(APPEND CLICKHOUSE_BUNDLE clickhouse-odbc-bridge)
+    endif()
 
     # install always because depian package want this files:
     add_custom_target (clickhouse-clang ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-clang DEPENDS clickhouse)
diff --git a/dbms/programs/copier/CMakeLists.txt b/dbms/programs/copier/CMakeLists.txt
index 55b2fc7e1cb..0aec381ebd5 100644
--- a/dbms/programs/copier/CMakeLists.txt
+++ b/dbms/programs/copier/CMakeLists.txt
@@ -1,5 +1,5 @@
 set(CLICKHOUSE_COPIER_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/ClusterCopier.cpp)
-set(CLICKHOUSE_COPIER_LINK PRIVATE clickhouse_functions clickhouse_table_functions clickhouse_aggregate_functions PUBLIC daemon)
+set(CLICKHOUSE_COPIER_LINK PRIVATE clickhouse_functions clickhouse_table_functions clickhouse_aggregate_functions clickhouse_dictionaries PUBLIC daemon)
 set(CLICKHOUSE_COPIER_INCLUDE SYSTEM PRIVATE ${PCG_RANDOM_INCLUDE_DIR})
 
 clickhouse_program_add(copier)
diff --git a/dbms/programs/copier/ClusterCopier.cpp b/dbms/programs/copier/ClusterCopier.cpp
index 75096df74ed..5d388686d55 100644
--- a/dbms/programs/copier/ClusterCopier.cpp
+++ b/dbms/programs/copier/ClusterCopier.cpp
@@ -63,6 +63,7 @@
 #include
 #include
 #include
+#include <Dictionaries/registerDictionaries.h>
 #include
 #include
 
@@ -2169,6 +2170,7 @@ void ClusterCopierApp::mainImpl()
     registerAggregateFunctions();
     registerTableFunctions();
     registerStorages();
+    registerDictionaries();
 
     static const std::string default_database = "_local";
     context->addDatabase(default_database, std::make_shared<DatabaseMemory>(default_database));
diff --git a/dbms/programs/performance-test/PerformanceTestSuite.cpp b/dbms/programs/performance-test/PerformanceTestSuite.cpp
index 9c72ba953dc..523e6787364 100644
--- a/dbms/programs/performance-test/PerformanceTestSuite.cpp
+++ b/dbms/programs/performance-test/PerformanceTestSuite.cpp
@@ -370,7 +370,7 @@ try
     Poco::Logger * log = &Poco::Logger::get("PerformanceTestSuite");
     if (options.count("help"))
     {
-        std::cout << "Usage: " << argv[0] << " [options] [test_file ...] [tests_folder]\n";
+        std::cout << "Usage: " << argv[0] << " [options]\n";
         std::cout << desc << "\n";
         return 0;
     }
diff --git a/dbms/programs/performance-test/TestStats.cpp b/dbms/programs/performance-test/TestStats.cpp
index 100c7a84391..4a3ec281d90 100644
--- a/dbms/programs/performance-test/TestStats.cpp
+++ b/dbms/programs/performance-test/TestStats.cpp
@@ -1,4 +1,5 @@
 #include "TestStats.h"
+#include <algorithm>
 
 namespace DB
 {
@@ -92,11 +93,10 @@ void TestStats::update_average_speed(
     avg_speed_value /= number_of_info_batches;
 
     if (avg_speed_first == 0)
-    {
         avg_speed_first = avg_speed_value;
-    }
 
-    if (std::abs(avg_speed_value - avg_speed_first) >= precision)
+    auto [min, max] = std::minmax(avg_speed_value, avg_speed_first);
+    if (1 - min / max >= precision)
     {
         avg_speed_first = avg_speed_value;
         avg_speed_watch.restart();
diff --git a/dbms/programs/performance-test/TestStats.h b/dbms/programs/performance-test/TestStats.h
index 84880b7b189..5d70edc437c 100644
--- a/dbms/programs/performance-test/TestStats.h
+++ b/dbms/programs/performance-test/TestStats.h
@@ -40,11 +40,11 @@ struct TestStats
     double avg_rows_speed_value = 0;
     double avg_rows_speed_first = 0;
-    static inline double avg_rows_speed_precision = 0.001;
+    static inline double avg_rows_speed_precision = 0.005;
 
     double avg_bytes_speed_value = 0;
     double avg_bytes_speed_first = 0;
-    static inline double avg_bytes_speed_precision = 0.001;
+    static inline double avg_bytes_speed_precision = 0.005;
 
     size_t number_of_rows_speed_info_batches = 0;
     size_t number_of_bytes_speed_info_batches = 0;
diff --git a/dbms/programs/server/Server.cpp b/dbms/programs/server/Server.cpp
index fea06e9506d..e446a88abc5 100644
--- a/dbms/programs/server/Server.cpp
+++ b/dbms/programs/server/Server.cpp
@@ -79,6 +79,7 @@ namespace ErrorCodes
     extern const int SYSTEM_ERROR;
     extern const int FAILED_TO_GETPWUID;
     extern const int MISMATCHING_USERS_FOR_PROCESS_AND_DATA;
+    extern const int NETWORK_ERROR;
 }
 
 
@@ -587,12 +588,12 @@ int Server::main(const std::vector<std::string> & /*args*/)
         return socket_address;
     };
 
-    auto socket_bind_listen = [&](auto & socket, const std::string & host, UInt16 port, bool secure = 0)
+    auto socket_bind_listen = [&](auto & socket, const std::string & host, UInt16 port, [[maybe_unused]] bool secure = 0)
     {
         auto address = make_socket_address(host, port);
-#if !defined(POCO_CLICKHOUSE_PATCH) || POCO_VERSION <= 0x02000000 // TODO: fill correct version
+#if !defined(POCO_CLICKHOUSE_PATCH) || POCO_VERSION < 0x01090100
         if (secure)
-            /// Bug in old poco, listen() after bind() with reusePort param will fail because have no implementation in SecureServerSocketImpl
+            /// Bug in old (<1.9.1) poco, listen() after bind() with reusePort param will fail because have no implementation in SecureServerSocketImpl
             /// https://github.com/pocoproject/poco/pull/2257
             socket.bind(address, /* reuseAddress = */ true);
         else
@@ -611,13 +612,15 @@ int Server::main(const std::vector<std::string> & /*args*/)
     for (const auto & listen_host : listen_hosts)
     {
         /// For testing purposes, user may omit tcp_port or http_port or https_port in configuration file.
+        uint16_t listen_port = 0;
         try
         {
             /// HTTP
             if (config().has("http_port"))
             {
                 Poco::Net::ServerSocket socket;
-                auto address = socket_bind_listen(socket, listen_host, config().getInt("http_port"));
+                listen_port = config().getInt("http_port");
+                auto address = socket_bind_listen(socket, listen_host, listen_port);
                 socket.setReceiveTimeout(settings.http_receive_timeout);
                 socket.setSendTimeout(settings.http_send_timeout);
                 servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
@@ -634,7 +637,8 @@
             {
 #if USE_POCO_NETSSL
                 Poco::Net::SecureServerSocket socket;
-                auto address = socket_bind_listen(socket, listen_host, config().getInt("https_port"), /* secure = */ true);
+                listen_port = config().getInt("https_port");
+                auto address = socket_bind_listen(socket, listen_host, listen_port, /* secure = */ true);
                 socket.setReceiveTimeout(settings.http_receive_timeout);
                 socket.setSendTimeout(settings.http_send_timeout);
                 servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
@@ -654,7 +658,8 @@
             if (config().has("tcp_port"))
             {
                 Poco::Net::ServerSocket socket;
-                auto address = socket_bind_listen(socket, listen_host, config().getInt("tcp_port"));
+                listen_port = config().getInt("tcp_port");
+                auto address = socket_bind_listen(socket, listen_host, listen_port);
                 socket.setReceiveTimeout(settings.receive_timeout);
                 socket.setSendTimeout(settings.send_timeout);
                 servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
@@ -671,7 +676,8 @@
             {
 #if USE_POCO_NETSSL
                 Poco::Net::SecureServerSocket socket;
-                auto address = socket_bind_listen(socket, listen_host, config().getInt("tcp_port_secure"), /* secure = */ true);
+                listen_port = config().getInt("tcp_port_secure");
+                auto address = socket_bind_listen(socket, listen_host, listen_port, /* secure = */ true);
                 socket.setReceiveTimeout(settings.receive_timeout);
                 socket.setSendTimeout(settings.send_timeout);
                 servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
@@ -694,7 +700,8 @@
             if (config().has("interserver_http_port"))
             {
                 Poco::Net::ServerSocket socket;
-                auto address = socket_bind_listen(socket, listen_host, config().getInt("interserver_http_port"));
+                listen_port = config().getInt("interserver_http_port");
+                auto address = socket_bind_listen(socket, listen_host, listen_port);
                 socket.setReceiveTimeout(settings.http_receive_timeout);
                 socket.setSendTimeout(settings.http_send_timeout);
                 servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
@@ -710,7 +717,8 @@
             {
 #if USE_POCO_NETSSL
                 Poco::Net::SecureServerSocket socket;
-                auto address = socket_bind_listen(socket, listen_host, config().getInt("interserver_https_port"), /* secure = */ true);
+                listen_port = config().getInt("interserver_https_port");
+                auto address = socket_bind_listen(socket, listen_host, listen_port, /* secure = */ true);
                 socket.setReceiveTimeout(settings.http_receive_timeout);
                 socket.setSendTimeout(settings.http_send_timeout);
                 servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
@@ -726,16 +734,17 @@
 #endif
             }
         }
-        catch (const Poco::Net::NetException & e)
+        catch (const Poco::Exception & e)
         {
+            std::string message = "Listen [" + listen_host + "]:" + std::to_string(listen_port) + " failed: " + std::to_string(e.code()) + ": " + e.what() + ": " + e.message();
             if (listen_try)
-                LOG_ERROR(log, "Listen [" << listen_host << "]: " << e.code() << ": " << e.what() << ": " << e.message()
+                LOG_ERROR(log, message
                     << " If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, then consider to "
                     "specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration "
                     "file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> ."
                     " Example for disabled IPv4: <listen_host>::</listen_host>");
             else
-                throw;
+                throw Exception{message, ErrorCodes::NETWORK_ERROR};
         }
     }
 
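The Server.cpp change above does two things: it remembers which port was being bound (listen_port), so a failure names the exact endpoint, and it widens the catch from Poco::Net::NetException to Poco::Exception, turning the bare rethrow into a typed NETWORK_ERROR carrying the composed message. A condensed sketch of the same pattern with placeholder types, not ClickHouse code:

    // sketch: enrich-and-rethrow pattern used for the listen sockets
    #include <cstdio>
    #include <stdexcept>
    #include <string>

    static void bind_and_listen(const std::string & /*host*/, unsigned port)
    {
        if (port == 0)
            throw std::runtime_error("port is not set");   /// stand-in for a Poco failure
    }

    static void try_listen(const std::string & host, unsigned port, bool listen_try)
    {
        try
        {
            bind_and_listen(host, port);
        }
        catch (const std::exception & e)
        {
            /// Compose host/port into the message once, reuse it on both paths.
            std::string message = "Listen [" + host + "]:" + std::to_string(port) + " failed: " + e.what();
            if (listen_try)
                std::fprintf(stderr, "%s\n", message.c_str());   /// other interfaces may still work
            else
                throw std::runtime_error(message);               /// fatal, but with full context
        }
    }

    int main()
    {
        try_listen("::", 0, /*listen_try=*/ true);
        return 0;
    }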
diff --git a/dbms/src/AggregateFunctions/AggregateFunctionMLMethod.cpp b/dbms/src/AggregateFunctions/AggregateFunctionMLMethod.cpp
new file mode 100644
index 00000000000..11daf5ca819
--- /dev/null
+++ b/dbms/src/AggregateFunctions/AggregateFunctionMLMethod.cpp
@@ -0,0 +1,474 @@
+#include "AggregateFunctionMLMethod.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include "AggregateFunctionFactory.h"
+#include "FactoryHelpers.h"
+#include "Helpers.h"
+
+
+namespace DB
+{
+namespace
+{
+    using FuncLinearRegression = AggregateFunctionMLMethod<LinearModelData, NameLinearRegression>;
+    using FuncLogisticRegression = AggregateFunctionMLMethod<LinearModelData, NameLogisticRegression>;
+    template <typename Method>
+    AggregateFunctionPtr
+    createAggregateFunctionMLMethod(const std::string & name, const DataTypes & argument_types, const Array & parameters)
+    {
+        if (parameters.size() > 4)
+            throw Exception(
+                "Aggregate function " + name
+                    + " requires at most four parameters: learning_rate, l2_regularization_coef, mini-batch size and weights_updater "
+                      "method",
+                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
+
+        if (argument_types.size() < 2)
+            throw Exception(
+                "Aggregate function " + name + " requires at least two arguments: target and model's parameters",
+                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
+
+        for (size_t i = 0; i < argument_types.size(); ++i)
+        {
+            if (!isNumber(argument_types[i]))
+                throw Exception(
+                    "Argument " + std::to_string(i) + " of type " + argument_types[i]->getName()
+                        + " must be numeric for aggregate function " + name,
+                    ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
+        }
+
+        /// Such default parameters were picked because they did good on some tests,
+        /// though it still requires to fit parameters to achieve better result
+        auto learning_rate = Float64(0.01);
+        auto l2_reg_coef = Float64(0.01);
+        UInt32 batch_size = 1;
+
+        std::shared_ptr<IWeightsUpdater> weights_updater = std::make_shared<StochasticGradientDescent>();
+        std::shared_ptr<IGradientComputer> gradient_computer;
+
+        if (!parameters.empty())
+        {
+            learning_rate = applyVisitor(FieldVisitorConvertToNumber<Float64>(), parameters[0]);
+        }
+        if (parameters.size() > 1)
+        {
+            l2_reg_coef = applyVisitor(FieldVisitorConvertToNumber<Float64>(), parameters[1]);
+        }
+        if (parameters.size() > 2)
+        {
+            batch_size = applyVisitor(FieldVisitorConvertToNumber<UInt32>(), parameters[2]);
+        }
+        if (parameters.size() > 3)
+        {
+            if (applyVisitor(FieldVisitorToString(), parameters[3]) == "\'SGD\'")
+            {
+                weights_updater = std::make_shared<StochasticGradientDescent>();
+            }
+            else if (applyVisitor(FieldVisitorToString(), parameters[3]) == "\'Momentum\'")
+            {
+                weights_updater = std::make_shared<Momentum>();
+            }
+            else if (applyVisitor(FieldVisitorToString(), parameters[3]) == "\'Nesterov\'")
+            {
+                weights_updater = std::make_shared<Nesterov>();
+            }
+            else
+            {
+                throw Exception("Invalid parameter for weights updater", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
+            }
+        }
+
+        if (std::is_same<Method, FuncLinearRegression>::value)
+        {
+            gradient_computer = std::make_shared<LinearRegression>();
+        }
+        else if (std::is_same<Method, FuncLogisticRegression>::value)
+        {
+            gradient_computer = std::make_shared<LogisticRegression>();
+        }
+        else
+        {
+            throw Exception("Such gradient computer is not implemented yet", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
+        }
+
+        return std::make_shared<Method>(
+            argument_types.size() - 1,
+            gradient_computer,
+            weights_updater,
+            learning_rate,
+            l2_reg_coef,
+            batch_size,
+            argument_types,
+            parameters);
+    }
+
+}
+
+void registerAggregateFunctionMLMethod(AggregateFunctionFactory & factory)
+{
+    factory.registerFunction("LinearRegression", createAggregateFunctionMLMethod<FuncLinearRegression>);
+    factory.registerFunction("LogisticRegression", createAggregateFunctionMLMethod<FuncLogisticRegression>);
+}
+
+LinearModelData::LinearModelData(
+    Float64 learning_rate,
+    Float64 l2_reg_coef,
+    UInt32 param_num,
+    UInt32 batch_capacity,
+    std::shared_ptr<IGradientComputer> gradient_computer,
+    std::shared_ptr<IWeightsUpdater> weights_updater)
+    : learning_rate(learning_rate)
+    , l2_reg_coef(l2_reg_coef)
+    , batch_capacity(batch_capacity)
+    , batch_size(0)
+    , gradient_computer(std::move(gradient_computer))
+    , weights_updater(std::move(weights_updater))
+{
+    weights.resize(param_num, Float64{0.0});
+    gradient_batch.resize(param_num + 1, Float64{0.0});
+}
+
+void LinearModelData::update_state()
+{
+    if (batch_size == 0)
+        return;
+
+    weights_updater->update(batch_size, weights, bias, gradient_batch);
+    batch_size = 0;
+    ++iter_num;
+    gradient_batch.assign(gradient_batch.size(), Float64{0.0});
+}
+
+void LinearModelData::predict(
+    ColumnVector<Float64>::Container & container, Block & block, const ColumnNumbers & arguments, const Context & context) const
+{
+    gradient_computer->predict(container, block, arguments, weights, bias, context);
+}
+
+void LinearModelData::read(ReadBuffer & buf)
+{
+    readBinary(bias, buf);
+    readBinary(weights, buf);
+    readBinary(iter_num, buf);
+    readBinary(gradient_batch, buf);
+    readBinary(batch_size, buf);
+    weights_updater->read(buf);
+}
+
+void LinearModelData::write(WriteBuffer & buf) const
+{
+    writeBinary(bias, buf);
+    writeBinary(weights, buf);
+    writeBinary(iter_num, buf);
+    writeBinary(gradient_batch, buf);
+    writeBinary(batch_size, buf);
+    weights_updater->write(buf);
+}
+
+void LinearModelData::merge(const DB::LinearModelData & rhs)
+{
+    if (iter_num == 0 && rhs.iter_num == 0)
+        return;
+
+    update_state();
+    /// can't update rhs state because it's constant
+
+    Float64 frac = (static_cast<Float64>(iter_num) * iter_num) / (iter_num * iter_num + rhs.iter_num * rhs.iter_num);
+
+    for (size_t i = 0; i < weights.size(); ++i)
+    {
+        weights[i] = weights[i] * frac + rhs.weights[i] * (1 - frac);
+    }
+    bias = bias * frac + rhs.bias * (1 - frac);
+
+    iter_num += rhs.iter_num;
+    weights_updater->merge(*rhs.weights_updater, frac, 1 - frac);
+}
+
+void LinearModelData::add(const IColumn ** columns, size_t row_num)
+{
+    /// first column stores target; features start from (columns + 1)
+    const auto target = (*columns[0])[row_num].get<Float64>();
+    /// Here we have columns + 1 as first column corresponds to target value, and others - to features
+    weights_updater->add_to_batch(
+        gradient_batch, *gradient_computer, weights, bias, learning_rate, l2_reg_coef, target, columns + 1, row_num);
+
+    ++batch_size;
+    if (batch_size == batch_capacity)
+    {
+        update_state();
+    }
+}
+
+
+void Nesterov::read(ReadBuffer & buf)
+{
+    readBinary(accumulated_gradient, buf);
+}
+
+void Nesterov::write(WriteBuffer & buf) const
+{
+    writeBinary(accumulated_gradient, buf);
+}
+
+void Nesterov::merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac)
+{
+    auto & nesterov_rhs = static_cast<const Nesterov &>(rhs);
+    for (size_t i = 0; i < accumulated_gradient.size(); ++i)
+    {
+        accumulated_gradient[i] = accumulated_gradient[i] * frac + nesterov_rhs.accumulated_gradient[i] * rhs_frac;
+    }
+}
+
+void Nesterov::update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient)
+{
+    if (accumulated_gradient.empty())
+    {
+        accumulated_gradient.resize(batch_gradient.size(), Float64{0.0});
+    }
+
+    for (size_t i = 0; i < batch_gradient.size(); ++i)
+    {
+        accumulated_gradient[i] = accumulated_gradient[i] * alpha_ + batch_gradient[i] / batch_size;
+    }
+    for (size_t i = 0; i < weights.size(); ++i)
+    {
+        weights[i] += accumulated_gradient[i];
+    }
+    bias += accumulated_gradient[weights.size()];
+}
+
+void Nesterov::add_to_batch(
+    std::vector<Float64> & batch_gradient,
+    IGradientComputer & gradient_computer,
+    const std::vector<Float64> & weights,
+    Float64 bias,
+    Float64 learning_rate,
+    Float64 l2_reg_coef,
+    Float64 target,
+    const IColumn ** columns,
+    size_t row_num)
+{
+    if (accumulated_gradient.empty())
+    {
+        accumulated_gradient.resize(batch_gradient.size(), Float64{0.0});
+    }
+
+    std::vector<Float64> shifted_weights(weights.size());
+    for (size_t i = 0; i != shifted_weights.size(); ++i)
+    {
+        shifted_weights[i] = weights[i] + accumulated_gradient[i] * alpha_;
+    }
+    auto shifted_bias = bias + accumulated_gradient[weights.size()] * alpha_;
+
+    gradient_computer.compute(batch_gradient, shifted_weights, shifted_bias, learning_rate, l2_reg_coef, target, columns, row_num);
+}
+
+void Momentum::read(ReadBuffer & buf)
+{
+    readBinary(accumulated_gradient, buf);
+}
+
+void Momentum::write(WriteBuffer & buf) const
+{
+    writeBinary(accumulated_gradient, buf);
+}
+
+void Momentum::merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac)
+{
+    auto & momentum_rhs = static_cast<const Momentum &>(rhs);
+    for (size_t i = 0; i < accumulated_gradient.size(); ++i)
+    {
+        accumulated_gradient[i] = accumulated_gradient[i] * frac + momentum_rhs.accumulated_gradient[i] * rhs_frac;
+    }
+}
+
+void Momentum::update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient)
+{
+    /// batch_size is already checked to be greater than 0
+    if (accumulated_gradient.empty())
+    {
+        accumulated_gradient.resize(batch_gradient.size(), Float64{0.0});
+    }
+
+    for (size_t i = 0; i < batch_gradient.size(); ++i)
+    {
+        accumulated_gradient[i] = accumulated_gradient[i] * alpha_ + batch_gradient[i] / batch_size;
+    }
+    for (size_t i = 0; i < weights.size(); ++i)
+    {
+        weights[i] += accumulated_gradient[i];
+    }
+    bias += accumulated_gradient[weights.size()];
+}
+
+void StochasticGradientDescent::update(
+    UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient)
+{
+    /// batch_size is already checked to be greater than 0
+    for (size_t i = 0; i < weights.size(); ++i)
+    {
+        weights[i] += batch_gradient[i] / batch_size;
+    }
+    bias += batch_gradient[weights.size()] / batch_size;
+}
+
+void IWeightsUpdater::add_to_batch(
+    std::vector<Float64> & batch_gradient,
+    IGradientComputer & gradient_computer,
+    const std::vector<Float64> & weights,
+    Float64 bias,
+    Float64 learning_rate,
+    Float64 l2_reg_coef,
+    Float64 target,
+    const IColumn ** columns,
+    size_t row_num)
+{
+    gradient_computer.compute(batch_gradient, weights, bias, learning_rate, l2_reg_coef, target, columns, row_num);
+}
+
+void LogisticRegression::predict(
+    ColumnVector<Float64>::Container & container,
+    Block & block,
+    const ColumnNumbers & arguments,
+    const std::vector<Float64> & weights,
+    Float64 bias,
+    const Context & context) const
+{
+    size_t rows_num = block.rows();
+    std::vector<Float64> results(rows_num, bias);
+
+    for (size_t i = 1; i < arguments.size(); ++i)
+    {
+        const ColumnWithTypeAndName & cur_col = block.getByPosition(arguments[i]);
+        if (!isNumber(cur_col.type))
+        {
+            throw Exception("Prediction arguments must have numeric type", ErrorCodes::BAD_ARGUMENTS);
+        }
+
+        /// If column type is already Float64 then castColumn simply returns it
+        auto features_col_ptr = castColumn(cur_col, std::make_shared<DataTypeFloat64>(), context);
+        auto features_column = typeid_cast<const ColumnVector<Float64> *>(features_col_ptr.get());
+
+        if (!features_column)
+        {
+            throw Exception("Unexpectedly cannot dynamically cast features column " + std::to_string(i), ErrorCodes::LOGICAL_ERROR);
+        }
+
+        for (size_t row_num = 0; row_num != rows_num; ++row_num)
+        {
+            results[row_num] += weights[i - 1] * features_column->getElement(row_num);
+        }
+    }
+
+    container.reserve(rows_num);
+    for (size_t row_num = 0; row_num != rows_num; ++row_num)
+    {
+        container.emplace_back(1 / (1 + exp(-results[row_num])));
+    }
+}
+
+void LogisticRegression::compute(
+    std::vector<Float64> & batch_gradient,
+    const std::vector<Float64> & weights,
+    Float64 bias,
+    Float64 learning_rate,
+    Float64 l2_reg_coef,
+    Float64 target,
+    const IColumn ** columns,
+    size_t row_num)
+{
+    Float64 derivative = bias;
+    for (size_t i = 0; i < weights.size(); ++i)
+    {
+        auto value = (*columns[i])[row_num].get<Float64>();
+        derivative += weights[i] * value;
+    }
+    derivative *= target;
+    derivative = exp(derivative);
+
+    batch_gradient[weights.size()] += learning_rate * target / (derivative + 1);
+    for (size_t i = 0; i < weights.size(); ++i)
+    {
+        auto value = (*columns[i])[row_num].get<Float64>();
+        batch_gradient[i] += learning_rate * target * value / (derivative + 1) - 2 * l2_reg_coef * weights[i];
+    }
+}
+
+void LinearRegression::predict(
+    ColumnVector<Float64>::Container & container,
+    Block & block,
+    const ColumnNumbers & arguments,
+    const std::vector<Float64> & weights,
+    Float64 bias,
+    const Context & context) const
+{
+    if (weights.size() + 1 != arguments.size())
+    {
+        throw Exception("In predict function number of arguments differs from the size of weights vector", ErrorCodes::LOGICAL_ERROR);
+    }
+
+    size_t rows_num = block.rows();
+    std::vector<Float64> results(rows_num, bias);
+
+    for (size_t i = 1; i < arguments.size(); ++i)
+    {
+        const ColumnWithTypeAndName & cur_col = block.getByPosition(arguments[i]);
+        if (!isNumber(cur_col.type))
+        {
+            throw Exception("Prediction arguments must have numeric type", ErrorCodes::BAD_ARGUMENTS);
+        }
+
+        /// If column type is already Float64 then castColumn simply returns it
+        auto features_col_ptr = castColumn(cur_col, std::make_shared<DataTypeFloat64>(), context);
+        auto features_column = typeid_cast<const ColumnVector<Float64> *>(features_col_ptr.get());
+
+        if (!features_column)
+        {
+            throw Exception("Unexpectedly cannot dynamically cast features column " + std::to_string(i), ErrorCodes::LOGICAL_ERROR);
+        }
+
+        for (size_t row_num = 0; row_num != rows_num; ++row_num)
+        {
+            results[row_num] += weights[i - 1] * features_column->getElement(row_num);
+        }
+    }
+
+    container.reserve(rows_num);
+    for (size_t row_num = 0; row_num != rows_num; ++row_num)
+    {
+        container.emplace_back(results[row_num]);
+    }
+}
+
+void LinearRegression::compute(
+    std::vector<Float64> & batch_gradient,
+    const std::vector<Float64> & weights,
+    Float64 bias,
+    Float64 learning_rate,
+    Float64 l2_reg_coef,
+    Float64 target,
+    const IColumn ** columns,
+    size_t row_num)
+{
+    Float64 derivative = (target - bias);
+    for (size_t i = 0; i < weights.size(); ++i)
+    {
+        auto value = (*columns[i])[row_num].get<Float64>();
+        derivative -= weights[i] * value;
+    }
+    derivative *= (2 * learning_rate);
+
+    batch_gradient[weights.size()] += derivative;
+    for (size_t i = 0; i < weights.size(); ++i)
+    {
+        auto value = (*columns[i])[row_num].get<Float64>();
+        batch_gradient[i] += derivative * value - 2 * l2_reg_coef * weights[i];
+    }
+}
+
+}
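For orientation before the header: LinearRegression::compute and StochasticGradientDescent::update above implement plain mini-batch SGD on squared error. Per row the batch gradient accumulates derivative = 2 * learning_rate * (target - bias - w . x) in the extra bias slot and derivative * x_i - 2 * l2_reg_coef * w_i per weight, and update() then applies the batch average. A standalone numeric sketch of one batch on the toy data y = 2x (hyperparameters chosen arbitrarily, not part of the patch):

    // one SGD batch, mirroring LinearRegression::compute + StochasticGradientDescent::update
    #include <cstdio>
    #include <vector>

    int main()
    {
        double learning_rate = 0.1, l2_reg_coef = 0.0;
        std::vector<double> weights{0.0};
        double bias = 0.0;
        std::vector<double> gradient_batch(weights.size() + 1, 0.0);  /// last slot is the bias

        double xs[] = {1.0, 2.0}, ys[] = {2.0, 4.0};                  /// two rows of y = 2x
        for (int row = 0; row < 2; ++row)                             /// compute(), per row
        {
            double derivative = (ys[row] - bias - weights[0] * xs[row]) * 2 * learning_rate;
            gradient_batch[1] += derivative;
            gradient_batch[0] += derivative * xs[row] - 2 * l2_reg_coef * weights[0];
        }

        unsigned batch_size = 2;                                      /// update(), batch average
        weights[0] += gradient_batch[0] / batch_size;
        bias += gradient_batch[1] / batch_size;

        std::printf("w = %.3f, b = %.3f\n", weights[0], bias);        /// w = 1.000, b = 0.600
        return 0;
    }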
diff --git a/dbms/src/AggregateFunctions/AggregateFunctionMLMethod.h b/dbms/src/AggregateFunctions/AggregateFunctionMLMethod.h
new file mode 100644
index 00000000000..8160eb2ef7d
--- /dev/null
+++ b/dbms/src/AggregateFunctions/AggregateFunctionMLMethod.h
@@ -0,0 +1,330 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+#include "IAggregateFunction.h"
+
+namespace DB
+{
+namespace ErrorCodes
+{
+    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+    extern const int BAD_ARGUMENTS;
+}
+
+/**
+GradientComputer class computes gradient according to its loss function
+*/
+class IGradientComputer
+{
+public:
+    IGradientComputer() {}
+
+    virtual ~IGradientComputer() = default;
+
+    /// Adds computed gradient in new point (weights, bias) to batch_gradient
+    virtual void compute(
+        std::vector<Float64> & batch_gradient,
+        const std::vector<Float64> & weights,
+        Float64 bias,
+        Float64 learning_rate,
+        Float64 l2_reg_coef,
+        Float64 target,
+        const IColumn ** columns,
+        size_t row_num)
+        = 0;
+
+    virtual void predict(
+        ColumnVector<Float64>::Container & container,
+        Block & block,
+        const ColumnNumbers & arguments,
+        const std::vector<Float64> & weights,
+        Float64 bias,
+        const Context & context) const = 0;
+};
+
+
+class LinearRegression : public IGradientComputer
+{
+public:
+    LinearRegression() {}
+
+    void compute(
+        std::vector<Float64> & batch_gradient,
+        const std::vector<Float64> & weights,
+        Float64 bias,
+        Float64 learning_rate,
+        Float64 l2_reg_coef,
+        Float64 target,
+        const IColumn ** columns,
+        size_t row_num) override;
+
+    void predict(
+        ColumnVector<Float64>::Container & container,
+        Block & block,
+        const ColumnNumbers & arguments,
+        const std::vector<Float64> & weights,
+        Float64 bias,
+        const Context & context) const override;
+};
+
+
+class LogisticRegression : public IGradientComputer
+{
+public:
+    LogisticRegression() {}
+
+    void compute(
+        std::vector<Float64> & batch_gradient,
+        const std::vector<Float64> & weights,
+        Float64 bias,
+        Float64 learning_rate,
+        Float64 l2_reg_coef,
+        Float64 target,
+        const IColumn ** columns,
+        size_t row_num) override;
+
+    void predict(
+        ColumnVector<Float64>::Container & container,
+        Block & block,
+        const ColumnNumbers & arguments,
+        const std::vector<Float64> & weights,
+        Float64 bias,
+        const Context & context) const override;
+};
+
+
+/**
+* IWeightsUpdater class defines the way to update current weights
+* and uses GradientComputer class on each iteration
+*/
+class IWeightsUpdater
+{
+public:
+    virtual ~IWeightsUpdater() = default;
+
+    /// Calls GradientComputer to update current mini-batch
+    virtual void add_to_batch(
+        std::vector<Float64> & batch_gradient,
+        IGradientComputer & gradient_computer,
+        const std::vector<Float64> & weights,
+        Float64 bias,
+        Float64 learning_rate,
+        Float64 l2_reg_coef,
+        Float64 target,
+        const IColumn ** columns,
+        size_t row_num);
+
+    /// Updates current weights according to the gradient from the last mini-batch
+    virtual void update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & gradient) = 0;
+
+    /// Used during the merge of two states
+    virtual void merge(const IWeightsUpdater &, Float64, Float64) {}
+
+    /// Used for serialization when necessary
+    virtual void write(WriteBuffer &) const {}
+
+    /// Used for serialization when necessary
+    virtual void read(ReadBuffer &) {}
+};
+
+
+class StochasticGradientDescent : public IWeightsUpdater
+{
+public:
+    void update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient) override;
+};
+
+
+class Momentum : public IWeightsUpdater
+{
+public:
+    Momentum() {}
+
+    Momentum(Float64 alpha) : alpha_(alpha) {}
+
+    void update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient) override;
+
+    virtual void merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac) override;
+
+    void write(WriteBuffer & buf) const override;
+
+    void read(ReadBuffer & buf) override;
+
+private:
+    Float64 alpha_{0.1};
+    std::vector<Float64> accumulated_gradient;
+};
+
+
+class Nesterov : public IWeightsUpdater
+{
+public:
+    Nesterov() {}
+
+    Nesterov(Float64 alpha) : alpha_(alpha) {}
+
+    void add_to_batch(
+        std::vector<Float64> & batch_gradient,
+        IGradientComputer & gradient_computer,
+        const std::vector<Float64> & weights,
+        Float64 bias,
+        Float64 learning_rate,
+        Float64 l2_reg_coef,
+        Float64 target,
+        const IColumn ** columns,
+        size_t row_num) override;
+
+    void update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient) override;
+
+    virtual void merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac) override;
+
+    void write(WriteBuffer & buf) const override;
+
+    void read(ReadBuffer & buf) override;
+
+private:
+    Float64 alpha_{0.1};
+    std::vector<Float64> accumulated_gradient;
+};
+
+
+/**
+* LinearModelData is a class which manages current state of learning
+*/
+class LinearModelData
+{
+public:
+    LinearModelData() {}
+
+    LinearModelData(
+        Float64 learning_rate,
+        Float64 l2_reg_coef,
+        UInt32 param_num,
+        UInt32 batch_capacity,
+        std::shared_ptr<IGradientComputer> gradient_computer,
+        std::shared_ptr<IWeightsUpdater> weights_updater);
+
+    void add(const IColumn ** columns, size_t row_num);
+
+    void merge(const LinearModelData & rhs);
+
+    void write(WriteBuffer & buf) const;
+
+    void read(ReadBuffer & buf);
+
+    void
+    predict(ColumnVector<Float64>::Container & container, Block & block, const ColumnNumbers & arguments, const Context & context) const;
+
+private:
+    std::vector<Float64> weights;
+    Float64 bias{0.0};
+
+    Float64 learning_rate;
+    Float64 l2_reg_coef;
+    UInt32 batch_capacity;
+
+    UInt32 iter_num = 0;
+    std::vector<Float64> gradient_batch;
+    UInt32 batch_size;
+
+    std::shared_ptr<IGradientComputer> gradient_computer;
+    std::shared_ptr<IWeightsUpdater> weights_updater;
+
+    /**
+     * The function is called when we want to flush current batch and update our weights
+     */
+    void update_state();
+};
+
+
+template <
+    /// Implemented Machine Learning method
+    typename Data,
+    /// Name of the method
+    typename Name>
+class AggregateFunctionMLMethod final : public IAggregateFunctionDataHelper<Data, AggregateFunctionMLMethod<Data, Name>>
+{
+public:
+    String getName() const override { return Name::name; }
+
+    explicit AggregateFunctionMLMethod(
+        UInt32 param_num,
+        std::shared_ptr<IGradientComputer> gradient_computer,
+        std::shared_ptr<IWeightsUpdater> weights_updater,
+        Float64 learning_rate,
+        Float64 l2_reg_coef,
+        UInt32 batch_size,
+        const DataTypes & arguments_types,
+        const Array & params)
+        : IAggregateFunctionDataHelper<Data, AggregateFunctionMLMethod<Data, Name>>(arguments_types, params)
+        , param_num(param_num)
+        , learning_rate(learning_rate)
+        , l2_reg_coef(l2_reg_coef)
+        , batch_size(batch_size)
+        , gradient_computer(std::move(gradient_computer))
+        , weights_updater(std::move(weights_updater))
+    {
+    }
+
+    DataTypePtr getReturnType() const override { return std::make_shared<DataTypeNumber<Float64>>(); }
+
+    void create(AggregateDataPtr place) const override
+    {
+        new (place) Data(learning_rate, l2_reg_coef, param_num, batch_size, gradient_computer, weights_updater);
+    }
+
+    void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
+    {
+        this->data(place).add(columns, row_num);
+    }
+
+    void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override { this->data(place).merge(this->data(rhs)); }
+
+    void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override { this->data(place).write(buf); }
+
+    void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override { this->data(place).read(buf); }
+
+    void predictValues(
+        ConstAggregateDataPtr place, IColumn & to, Block & block, const ColumnNumbers & arguments, const Context & context) const override
+    {
+        if (arguments.size() != param_num + 1)
+            throw Exception(
+                "Predict got incorrect number of arguments. Got: " + std::to_string(arguments.size())
+                    + ". Required: " + std::to_string(param_num + 1),
+                ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
+
+        auto & column = dynamic_cast<ColumnVector<Float64> &>(to);
+
+        this->data(place).predict(column.getData(), block, arguments, context);
+    }
+
+    void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
+    {
+        std::ignore = place;
+        std::ignore = to;
+        throw std::runtime_error("not implemented");
+    }
+
+    const char * getHeaderFilePath() const override { return __FILE__; }
+
+private:
+    UInt32 param_num;
+    Float64 learning_rate;
+    Float64 l2_reg_coef;
+    UInt32 batch_size;
+    std::shared_ptr<IGradientComputer> gradient_computer;
+    std::shared_ptr<IWeightsUpdater> weights_updater;
+};
+
+struct NameLinearRegression
+{
+    static constexpr auto name = "LinearRegression";
+};
+struct NameLogisticRegression
+{
+    static constexpr auto name = "LogisticRegression";
+};
+}
diff --git a/dbms/src/AggregateFunctions/AggregateFunctionQuantile.cpp b/dbms/src/AggregateFunctions/AggregateFunctionQuantile.cpp
index 9c4e63c1dc2..d41654daee5 100644
--- a/dbms/src/AggregateFunctions/AggregateFunctionQuantile.cpp
+++ b/dbms/src/AggregateFunctions/AggregateFunctionQuantile.cpp
@@ -43,8 +43,12 @@
 template using FuncQuantilesTDigestWeighted = template