diff --git a/.gitmodules b/.gitmodules index f6f2c652004..dab5b985760 100644 --- a/.gitmodules +++ b/.gitmodules @@ -140,3 +140,7 @@ [submodule "contrib/ryu"] path = contrib/ryu url = https://github.com/ClickHouse-Extras/ryu.git +[submodule "contrib/avro"] + path = contrib/avro + url = https://github.com/ClickHouse-Extras/avro.git + ignore = untracked diff --git a/CMakeLists.txt b/CMakeLists.txt index 7c8ccb6e17c..c194ea5bdc7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -352,6 +352,7 @@ include (cmake/find/simdjson.cmake) include (cmake/find/rapidjson.cmake) include (cmake/find/fastops.cmake) include (cmake/find/orc.cmake) +include (cmake/find/avro.cmake) find_contrib_lib(cityhash) find_contrib_lib(farmhash) diff --git a/cmake/find/avro.cmake b/cmake/find/avro.cmake new file mode 100644 index 00000000000..cdb3fc84d3d --- /dev/null +++ b/cmake/find/avro.cmake @@ -0,0 +1,28 @@ +option (ENABLE_AVRO "Enable Avro" ${ENABLE_LIBRARIES}) + +if (ENABLE_AVRO) + +option (USE_INTERNAL_AVRO_LIBRARY "Set to FALSE to use system avro library instead of bundled" ${NOT_UNBUNDLED}) + +if(NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/avro/lang/c++/CMakeLists.txt") + if(USE_INTERNAL_AVRO_LIBRARY) + message(WARNING "submodule contrib/avro is missing. to fix try run: \n git submodule update --init --recursive") + endif() + set(MISSING_INTERNAL_AVRO_LIBRARY 1) + set(USE_INTERNAL_AVRO_LIBRARY 0) +endif() + +if (NOT USE_INTERNAL_AVRO_LIBRARY) +elseif(NOT MISSING_INTERNAL_AVRO_LIBRARY) + include(cmake/find/snappy.cmake) + set(AVROCPP_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/avro/lang/c++/include") + set(AVROCPP_LIBRARY avrocpp) +endif () + +if (AVROCPP_LIBRARY AND AVROCPP_INCLUDE_DIR) + set(USE_AVRO 1) +endif() + +endif() + +message (STATUS "Using avro=${USE_AVRO}: ${AVROCPP_INCLUDE_DIR} : ${AVROCPP_LIBRARY}") diff --git a/cmake/find/boost.cmake b/cmake/find/boost.cmake index 6776d0cea06..ec10a34d839 100644 --- a/cmake/find/boost.cmake +++ b/cmake/find/boost.cmake @@ -31,6 +31,7 @@ if (NOT Boost_SYSTEM_LIBRARY AND NOT MISSING_INTERNAL_BOOST_LIBRARY) set (Boost_SYSTEM_LIBRARY boost_system_internal) set (Boost_PROGRAM_OPTIONS_LIBRARY boost_program_options_internal) set (Boost_FILESYSTEM_LIBRARY boost_filesystem_internal ${Boost_SYSTEM_LIBRARY}) + set (Boost_IOSTREAMS_LIBRARY boost_iostreams_internal) set (Boost_REGEX_LIBRARY boost_regex_internal) set (Boost_INCLUDE_DIRS) @@ -48,4 +49,4 @@ if (NOT Boost_SYSTEM_LIBRARY AND NOT MISSING_INTERNAL_BOOST_LIBRARY) list (APPEND Boost_INCLUDE_DIRS "${ClickHouse_SOURCE_DIR}/contrib/boost") endif () -message (STATUS "Using Boost: ${Boost_INCLUDE_DIRS} : ${Boost_PROGRAM_OPTIONS_LIBRARY},${Boost_SYSTEM_LIBRARY},${Boost_FILESYSTEM_LIBRARY},${Boost_REGEX_LIBRARY}") +message (STATUS "Using Boost: ${Boost_INCLUDE_DIRS} : ${Boost_PROGRAM_OPTIONS_LIBRARY},${Boost_SYSTEM_LIBRARY},${Boost_FILESYSTEM_LIBRARY},${Boost_IOSTREAMS_LIBRARY},${Boost_REGEX_LIBRARY}") diff --git a/cmake/find/poco.cmake b/cmake/find/poco.cmake index b44d2932276..0c676d374f1 100644 --- a/cmake/find/poco.cmake +++ b/cmake/find/poco.cmake @@ -14,6 +14,7 @@ if (NOT ENABLE_LIBRARIES) set (ENABLE_POCO_REDIS ${ENABLE_LIBRARIES} CACHE BOOL "") set (ENABLE_POCO_ODBC ${ENABLE_LIBRARIES} CACHE BOOL "") set (ENABLE_POCO_SQL ${ENABLE_LIBRARIES} CACHE BOOL "") + set (ENABLE_POCO_JSON ${ENABLE_LIBRARIES} CACHE BOOL "") endif () set (POCO_COMPONENTS Net XML SQL Data) @@ -34,6 +35,9 @@ if (NOT DEFINED ENABLE_POCO_ODBC OR ENABLE_POCO_ODBC) list (APPEND POCO_COMPONENTS DataODBC) list (APPEND POCO_COMPONENTS SQLODBC) endif () +if (NOT DEFINED ENABLE_POCO_JSON OR ENABLE_POCO_JSON) + list (APPEND POCO_COMPONENTS JSON) +endif () if (NOT USE_INTERNAL_POCO_LIBRARY) find_package (Poco COMPONENTS ${POCO_COMPONENTS}) @@ -112,6 +116,11 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY) endif () endif () + if (NOT DEFINED ENABLE_POCO_JSON OR ENABLE_POCO_JSON) + set (Poco_JSON_LIBRARY PocoJSON) + set (Poco_JSON_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/poco/JSON/include/") + endif () + if (OPENSSL_FOUND AND (NOT DEFINED ENABLE_POCO_NETSSL OR ENABLE_POCO_NETSSL)) set (Poco_NetSSL_LIBRARY PocoNetSSL ${OPENSSL_LIBRARIES}) set (Poco_Crypto_LIBRARY PocoCrypto ${OPENSSL_LIBRARIES}) @@ -145,8 +154,11 @@ endif () if (Poco_SQLODBC_LIBRARY AND ODBC_FOUND) set (USE_POCO_SQLODBC 1) endif () +if (Poco_JSON_LIBRARY) + set (USE_POCO_JSON 1) +endif () -message(STATUS "Using Poco: ${Poco_INCLUDE_DIRS} : ${Poco_Foundation_LIBRARY},${Poco_Util_LIBRARY},${Poco_Net_LIBRARY},${Poco_NetSSL_LIBRARY},${Poco_Crypto_LIBRARY},${Poco_XML_LIBRARY},${Poco_Data_LIBRARY},${Poco_DataODBC_LIBRARY},${Poco_SQL_LIBRARY},${Poco_SQLODBC_LIBRARY},${Poco_MongoDB_LIBRARY},${Poco_Redis_LIBRARY}; MongoDB=${USE_POCO_MONGODB}, Redis=${USE_POCO_REDIS}, DataODBC=${USE_POCO_DATAODBC}, NetSSL=${USE_POCO_NETSSL}") +message(STATUS "Using Poco: ${Poco_INCLUDE_DIRS} : ${Poco_Foundation_LIBRARY},${Poco_Util_LIBRARY},${Poco_Net_LIBRARY},${Poco_NetSSL_LIBRARY},${Poco_Crypto_LIBRARY},${Poco_XML_LIBRARY},${Poco_Data_LIBRARY},${Poco_DataODBC_LIBRARY},${Poco_SQL_LIBRARY},${Poco_SQLODBC_LIBRARY},${Poco_MongoDB_LIBRARY},${Poco_Redis_LIBRARY},${Poco_JSON_LIBRARY}; MongoDB=${USE_POCO_MONGODB}, Redis=${USE_POCO_REDIS}, DataODBC=${USE_POCO_DATAODBC}, NetSSL=${USE_POCO_NETSSL}, JSON=${USE_POCO_JSON}") # How to make sutable poco: # use branch: diff --git a/cmake/sanitize.cmake b/cmake/sanitize.cmake index 8a3dd7f7634..13947425f7b 100644 --- a/cmake/sanitize.cmake +++ b/cmake/sanitize.cmake @@ -53,6 +53,7 @@ if (SANITIZE) set (USE_CAPNP 0 CACHE BOOL "") set (USE_INTERNAL_ORC_LIBRARY 0 CACHE BOOL "") set (USE_ORC 0 CACHE BOOL "") + set (USE_AVRO 0 CACHE BOOL "") set (ENABLE_SSL 0 CACHE BOOL "") elseif (SANITIZE STREQUAL "thread") diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 89f12ce0b70..7c9db5bb06f 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -146,6 +146,20 @@ if (ENABLE_ICU AND USE_INTERNAL_ICU_LIBRARY) add_subdirectory (icu-cmake) endif () +if(USE_INTERNAL_SNAPPY_LIBRARY) + set(SNAPPY_BUILD_TESTS 0 CACHE INTERNAL "") + if (NOT MAKE_STATIC_LIBRARIES) + set(BUILD_SHARED_LIBS 1) # TODO: set at root dir + endif() + + add_subdirectory(snappy) + + set (SNAPPY_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/snappy") + if(SANITIZE STREQUAL "undefined") + target_compile_options(${SNAPPY_LIBRARY} PRIVATE -fno-sanitize=undefined) + endif() +endif() + if (USE_INTERNAL_PARQUET_LIBRARY) if (USE_INTERNAL_PARQUET_LIBRARY_NATIVE_CMAKE) # We dont use arrow's cmakefiles because they uses too many depends and download some libs in compile time @@ -189,20 +203,6 @@ if (USE_INTERNAL_PARQUET_LIBRARY_NATIVE_CMAKE) endif() else() - if(USE_INTERNAL_SNAPPY_LIBRARY) - set(SNAPPY_BUILD_TESTS 0 CACHE INTERNAL "") - if (NOT MAKE_STATIC_LIBRARIES) - set(BUILD_SHARED_LIBS 1) # TODO: set at root dir - endif() - - add_subdirectory(snappy) - - set (SNAPPY_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/snappy") - if(SANITIZE STREQUAL "undefined") - target_compile_options(${SNAPPY_LIBRARY} PRIVATE -fno-sanitize=undefined) - endif() - endif() - add_subdirectory(arrow-cmake) # The library is large - avoid bloat. @@ -212,6 +212,10 @@ else() endif() endif() +if (USE_INTERNAL_AVRO_LIBRARY) + add_subdirectory(avro-cmake) +endif() + if (USE_INTERNAL_POCO_LIBRARY) set (POCO_VERBOSE_MESSAGES 0 CACHE INTERNAL "") set (save_CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS}) diff --git a/contrib/avro b/contrib/avro new file mode 160000 index 00000000000..5b2752041c8 --- /dev/null +++ b/contrib/avro @@ -0,0 +1 @@ +Subproject commit 5b2752041c8d2f75eb5c1dbec8b4c25fc0e24d12 diff --git a/contrib/avro-cmake/CMakeLists.txt b/contrib/avro-cmake/CMakeLists.txt new file mode 100644 index 00000000000..f544b3c50cd --- /dev/null +++ b/contrib/avro-cmake/CMakeLists.txt @@ -0,0 +1,70 @@ +set(AVROCPP_ROOT_DIR ${CMAKE_SOURCE_DIR}/contrib/avro/lang/c++) +set(AVROCPP_INCLUDE_DIR ${AVROCPP_ROOT_DIR}/api) +set(AVROCPP_SOURCE_DIR ${AVROCPP_ROOT_DIR}/impl) + +set (CMAKE_CXX_STANDARD 17) + +if (EXISTS ${AVROCPP_ROOT_DIR}/../../share/VERSION.txt) + file(READ "${AVROCPP_ROOT_DIR}/../../share/VERSION.txt" + AVRO_VERSION) +endif() + +string(REPLACE "\n" "" AVRO_VERSION ${AVRO_VERSION}) +set (AVRO_VERSION_MAJOR ${AVRO_VERSION}) +set (AVRO_VERSION_MINOR "0") + +set (AVROCPP_SOURCE_FILES + ${AVROCPP_SOURCE_DIR}/Compiler.cc + ${AVROCPP_SOURCE_DIR}/Node.cc + ${AVROCPP_SOURCE_DIR}/LogicalType.cc + ${AVROCPP_SOURCE_DIR}/NodeImpl.cc + ${AVROCPP_SOURCE_DIR}/ResolverSchema.cc + ${AVROCPP_SOURCE_DIR}/Schema.cc + ${AVROCPP_SOURCE_DIR}/Types.cc + ${AVROCPP_SOURCE_DIR}/ValidSchema.cc + ${AVROCPP_SOURCE_DIR}/Zigzag.cc + ${AVROCPP_SOURCE_DIR}/BinaryEncoder.cc + ${AVROCPP_SOURCE_DIR}/BinaryDecoder.cc + ${AVROCPP_SOURCE_DIR}/Stream.cc + ${AVROCPP_SOURCE_DIR}/FileStream.cc + ${AVROCPP_SOURCE_DIR}/Generic.cc + ${AVROCPP_SOURCE_DIR}/GenericDatum.cc + ${AVROCPP_SOURCE_DIR}/DataFile.cc + ${AVROCPP_SOURCE_DIR}/parsing/Symbol.cc + ${AVROCPP_SOURCE_DIR}/parsing/ValidatingCodec.cc + ${AVROCPP_SOURCE_DIR}/parsing/JsonCodec.cc + ${AVROCPP_SOURCE_DIR}/parsing/ResolvingDecoder.cc + ${AVROCPP_SOURCE_DIR}/json/JsonIO.cc + ${AVROCPP_SOURCE_DIR}/json/JsonDom.cc + ${AVROCPP_SOURCE_DIR}/Resolver.cc + ${AVROCPP_SOURCE_DIR}/Validator.cc + ) + +add_library (avrocpp ${AVROCPP_SOURCE_FILES}) +set_target_properties (avrocpp PROPERTIES VERSION ${AVRO_VERSION_MAJOR}.${AVRO_VERSION_MINOR}) + +target_include_directories(avrocpp SYSTEM PUBLIC ${AVROCPP_INCLUDE_DIR}) + +target_include_directories(avrocpp SYSTEM PUBLIC ${Boost_INCLUDE_DIRS}) +target_link_libraries (avrocpp ${Boost_IOSTREAMS_LIBRARY}) + +if (SNAPPY_INCLUDE_DIR AND SNAPPY_LIBRARY) + target_compile_definitions (avrocpp PUBLIC SNAPPY_CODEC_AVAILABLE) + target_include_directories (avrocpp PRIVATE ${SNAPPY_INCLUDE_DIR}) + target_link_libraries (avrocpp ${SNAPPY_LIBRARY}) +endif () + +if (COMPILER_GCC) + set (SUPPRESS_WARNINGS -Wno-non-virtual-dtor) +elseif (COMPILER_CLANG) + set (SUPPRESS_WARNINGS -Wno-non-virtual-dtor) +endif () + +target_compile_options(avrocpp PRIVATE ${SUPPRESS_WARNINGS}) + +# create a symlink to include headers with +ADD_CUSTOM_TARGET(avro_symlink_headers ALL + COMMAND ${CMAKE_COMMAND} -E make_directory ${AVROCPP_ROOT_DIR}/include + COMMAND ${CMAKE_COMMAND} -E create_symlink ${AVROCPP_ROOT_DIR}/api ${AVROCPP_ROOT_DIR}/include/avro +) +add_dependencies(avrocpp avro_symlink_headers) \ No newline at end of file diff --git a/contrib/boost b/contrib/boost index 830e51edb59..86be2aef20b 160000 --- a/contrib/boost +++ b/contrib/boost @@ -1 +1 @@ -Subproject commit 830e51edb59c4f37a8638138581e1e56c29ac44f +Subproject commit 86be2aef20bee2356b744e5569eed6eaded85dbe diff --git a/contrib/boost-cmake/CMakeLists.txt b/contrib/boost-cmake/CMakeLists.txt index d9a8a70ef17..54dcd750320 100644 --- a/contrib/boost-cmake/CMakeLists.txt +++ b/contrib/boost-cmake/CMakeLists.txt @@ -37,3 +37,8 @@ target_link_libraries(boost_filesystem_internal PRIVATE boost_system_internal) if (USE_INTERNAL_PARQUET_LIBRARY) add_boost_lib(regex) endif() + +if (USE_INTERNAL_AVRO_LIBRARY) + add_boost_lib(iostreams) + target_link_libraries(boost_iostreams_internal PUBLIC ${ZLIB_LIBRARIES}) +endif() diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index d87ae447faa..eeda7aa6a1f 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -504,6 +504,10 @@ if (USE_POCO_NETSSL) dbms_target_link_libraries (PRIVATE ${Poco_NetSSL_LIBRARY} ${Poco_Crypto_LIBRARY}) endif() +if (USE_POCO_JSON) + dbms_target_link_libraries (PRIVATE ${Poco_JSON_LIBRARY}) +endif() + dbms_target_link_libraries (PRIVATE ${Poco_Foundation_LIBRARY}) if (USE_ICU) @@ -522,6 +526,11 @@ if (USE_PARQUET) endif () endif () +if (USE_AVRO) + dbms_target_link_libraries(PRIVATE ${AVROCPP_LIBRARY}) + dbms_target_include_directories (SYSTEM BEFORE PRIVATE ${AVROCPP_INCLUDE_DIR}) +endif () + if (OPENSSL_CRYPTO_LIBRARY) dbms_target_link_libraries (PRIVATE ${OPENSSL_CRYPTO_LIBRARY}) target_link_libraries (clickhouse_common_io PRIVATE ${OPENSSL_CRYPTO_LIBRARY}) diff --git a/dbms/src/AggregateFunctions/AggregateFunctionSequenceMatch.h b/dbms/src/AggregateFunctions/AggregateFunctionSequenceMatch.h index 61fd28f2a70..47240db8b0d 100644 --- a/dbms/src/AggregateFunctions/AggregateFunctionSequenceMatch.h +++ b/dbms/src/AggregateFunctions/AggregateFunctionSequenceMatch.h @@ -309,7 +309,7 @@ protected: /// Uses a DFA based approach in order to better handle patterns without /// time assertions. /// - /// NOTE: This implementation relies on the assumption that the pattern are *small*. + /// NOTE: This implementation relies on the assumption that the pattern is *small*. /// /// This algorithm performs in O(mn) (with m the number of DFA states and N the number /// of events) with a memory consumption and memory allocations in O(m). It means that diff --git a/dbms/src/Common/RemoteHostFilter.cpp b/dbms/src/Common/RemoteHostFilter.cpp index 16aaac35dbe..4c4aa3bca81 100644 --- a/dbms/src/Common/RemoteHostFilter.cpp +++ b/dbms/src/Common/RemoteHostFilter.cpp @@ -1,12 +1,13 @@ #include -#include #include -#include #include +#include +#include #include #include #include + namespace DB { namespace ErrorCodes diff --git a/dbms/src/Common/RemoteHostFilter.h b/dbms/src/Common/RemoteHostFilter.h index 86743891051..48d9b2bda7c 100644 --- a/dbms/src/Common/RemoteHostFilter.h +++ b/dbms/src/Common/RemoteHostFilter.h @@ -1,17 +1,19 @@ #pragma once +#include #include #include -#include -#include +namespace Poco { class URI; } +namespace Poco { namespace Util { class AbstractConfiguration; } } + namespace DB { class RemoteHostFilter { /** - * This class checks if url is allowed. + * This class checks if URL is allowed. * If primary_hosts and regexp_hosts are empty all urls are allowed. */ public: @@ -25,6 +27,7 @@ private: std::unordered_set primary_hosts; /// Allowed primary () URL from config.xml std::vector regexp_hosts; /// Allowed regexp () URL from config.xml - bool checkForDirectEntry(const std::string & str) const; /// Checks if the primary_hosts and regexp_hosts contain str. If primary_hosts and regexp_hosts are empty return true. + /// Checks if the primary_hosts and regexp_hosts contain str. If primary_hosts and regexp_hosts are empty return true. + bool checkForDirectEntry(const std::string & str) const; }; } diff --git a/dbms/src/Core/Settings.h b/dbms/src/Core/Settings.h index 9d115bd8d21..7bb8fa12d05 100644 --- a/dbms/src/Core/Settings.h +++ b/dbms/src/Core/Settings.h @@ -186,6 +186,7 @@ struct Settings : public SettingsCollection M(SettingBool, input_format_values_interpret_expressions, true, "For Values format: if the field could not be parsed by streaming parser, run SQL parser and try to interpret it as SQL expression.", 0) \ M(SettingBool, input_format_values_deduce_templates_of_expressions, true, "For Values format: if the field could not be parsed by streaming parser, run SQL parser, deduce template of the SQL expression, try to parse all rows using template and then interpret expression for all rows.", 0) \ M(SettingBool, input_format_values_accurate_types_of_literals, true, "For Values format: when parsing and interpreting expressions using template, check actual type of literal to avoid possible overflow and precision issues.", 0) \ + M(SettingString, input_format_avro_schema_registry_url, "", "For AvroConfluent format: Confluent Schema Registry URL.", 0) \ \ M(SettingBool, output_format_json_quote_64bit_integers, true, "Controls quoting of 64-bit integers in JSON output format.", 0) \ \ @@ -197,6 +198,8 @@ struct Settings : public SettingsCollection M(SettingUInt64, output_format_pretty_max_column_pad_width, 250, "Maximum width to pad all values in a column in Pretty formats.", 0) \ M(SettingBool, output_format_pretty_color, true, "Use ANSI escape sequences to paint colors in Pretty formats", 0) \ M(SettingUInt64, output_format_parquet_row_group_size, 1000000, "Row group size in rows.", 0) \ + M(SettingString, output_format_avro_codec, "", "Compression codec used for output. Possible values: 'null', 'deflate', 'snappy'.", 0) \ + M(SettingUInt64, output_format_avro_sync_interval, 16 * 1024, "Sync interval in bytes.", 0) \ \ M(SettingBool, use_client_time_zone, false, "Use client timezone for interpreting DateTime string values, instead of adopting server timezone.", 0) \ \ diff --git a/dbms/src/Core/Types.h b/dbms/src/Core/Types.h index ea80ab7d427..4f350ba00d5 100644 --- a/dbms/src/Core/Types.h +++ b/dbms/src/Core/Types.h @@ -31,7 +31,6 @@ enum class TypeIndex Float64, Date, DateTime, - DateTime32 = DateTime, DateTime64, String, FixedString, @@ -158,8 +157,6 @@ using Decimal32 = Decimal; using Decimal64 = Decimal; using Decimal128 = Decimal; -// TODO (nemkov): consider making a strong typedef -//using DateTime32 = time_t; using DateTime64 = Decimal64; template <> struct TypeName { static const char * get() { return "Decimal32"; } }; diff --git a/dbms/src/Core/config_core.h.in b/dbms/src/Core/config_core.h.in index fdbd69decd3..2365340cf33 100644 --- a/dbms/src/Core/config_core.h.in +++ b/dbms/src/Core/config_core.h.in @@ -10,5 +10,6 @@ #cmakedefine01 USE_POCO_DATAODBC #cmakedefine01 USE_POCO_MONGODB #cmakedefine01 USE_POCO_REDIS +#cmakedefine01 USE_POCO_JSON #cmakedefine01 USE_INTERNAL_LLVM_LIBRARY #cmakedefine01 USE_SSL diff --git a/dbms/src/Formats/FormatFactory.cpp b/dbms/src/Formats/FormatFactory.cpp index 240e591123f..f812b56aa5d 100644 --- a/dbms/src/Formats/FormatFactory.cpp +++ b/dbms/src/Formats/FormatFactory.cpp @@ -68,6 +68,7 @@ static FormatSettings getInputFormatSetting(const Settings & settings, const Con format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter; format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter; format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter; + format_settings.avro.schema_registry_url = settings.input_format_avro_schema_registry_url; return format_settings; } @@ -99,6 +100,8 @@ static FormatSettings getOutputFormatSetting(const Settings & settings, const Co format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter; format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter; format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter; + format_settings.avro.output_codec = settings.output_format_avro_codec; + format_settings.avro.output_sync_interval = settings.output_format_avro_sync_interval; return format_settings; } @@ -325,6 +328,8 @@ FormatFactory::FormatFactory() registerInputFormatProcessorORC(*this); registerInputFormatProcessorParquet(*this); registerOutputFormatProcessorParquet(*this); + registerInputFormatProcessorAvro(*this); + registerOutputFormatProcessorAvro(*this); registerInputFormatProcessorTemplate(*this); registerOutputFormatProcessorTemplate(*this); diff --git a/dbms/src/Formats/FormatFactory.h b/dbms/src/Formats/FormatFactory.h index cbf64afeaec..345ceaee690 100644 --- a/dbms/src/Formats/FormatFactory.h +++ b/dbms/src/Formats/FormatFactory.h @@ -166,6 +166,8 @@ void registerInputFormatProcessorORC(FormatFactory & factory); void registerOutputFormatProcessorParquet(FormatFactory & factory); void registerInputFormatProcessorProtobuf(FormatFactory & factory); void registerOutputFormatProcessorProtobuf(FormatFactory & factory); +void registerInputFormatProcessorAvro(FormatFactory & factory); +void registerOutputFormatProcessorAvro(FormatFactory & factory); void registerInputFormatProcessorTemplate(FormatFactory & factory); void registerOutputFormatProcessorTemplate(FormatFactory &factory); diff --git a/dbms/src/Formats/FormatSettings.h b/dbms/src/Formats/FormatSettings.h index 6219edf6e6d..cc6f7f4dbb3 100644 --- a/dbms/src/Formats/FormatSettings.h +++ b/dbms/src/Formats/FormatSettings.h @@ -110,6 +110,16 @@ struct FormatSettings }; Custom custom; + + struct Avro + { + String schema_registry_url; + String output_codec; + UInt64 output_sync_interval = 16 * 1024; + }; + + Avro avro; + }; } diff --git a/dbms/src/Formats/config_formats.h.in b/dbms/src/Formats/config_formats.h.in index 1ddd0e18aa9..308ded92b5d 100644 --- a/dbms/src/Formats/config_formats.h.in +++ b/dbms/src/Formats/config_formats.h.in @@ -2,6 +2,7 @@ // .h autogenerated by cmake! +#cmakedefine01 USE_AVRO #cmakedefine01 USE_CAPNP #cmakedefine01 USE_SNAPPY #cmakedefine01 USE_PARQUET diff --git a/dbms/src/IO/ReadHelpers.h b/dbms/src/IO/ReadHelpers.h index fc8e444330c..2e6e51a7835 100644 --- a/dbms/src/IO/ReadHelpers.h +++ b/dbms/src/IO/ReadHelpers.h @@ -746,6 +746,23 @@ inline void readBinary(Decimal128 & x, ReadBuffer & buf) { readPODBinary(x, buf) inline void readBinary(LocalDate & x, ReadBuffer & buf) { readPODBinary(x, buf); } +template +inline std::enable_if_t && (sizeof(T) <= 8), void> +readBinaryBigEndian(T & x, ReadBuffer & buf) /// Assuming little endian architecture. +{ + readPODBinary(x, buf); + + if constexpr (sizeof(x) == 1) + return; + else if constexpr (sizeof(x) == 2) + x = __builtin_bswap16(x); + else if constexpr (sizeof(x) == 4) + x = __builtin_bswap32(x); + else if constexpr (sizeof(x) == 8) + x = __builtin_bswap64(x); +} + + /// Generic methods to read value in text tab-separated format. template inline std::enable_if_t, void> diff --git a/dbms/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/dbms/src/Processors/Formats/Impl/AvroRowInputFormat.cpp new file mode 100644 index 00000000000..acbd892eb48 --- /dev/null +++ b/dbms/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -0,0 +1,671 @@ +#include "AvroRowInputFormat.h" +#if USE_AVRO + +#include + +#include +#include + +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int THERE_IS_NO_COLUMN; + extern const int INCORRECT_DATA; + extern const int ILLEGAL_COLUMN; + extern const int TYPE_MISMATCH; +} + +class InputStreamReadBufferAdapter : public avro::InputStream +{ +public: + InputStreamReadBufferAdapter(ReadBuffer & in_) : in(in_) {} + + bool next(const uint8_t ** data, size_t * len) override + { + if (in.eof()) + { + *len = 0; + return false; + } + + *data = reinterpret_cast(in.position()); + *len = in.available(); + + in.position() += in.available(); + return true; + } + + void backup(size_t len) override { in.position() -= len; } + + void skip(size_t len) override { in.tryIgnore(len); } + + size_t byteCount() const override { return in.count(); } + +private: + ReadBuffer & in; +}; + +static void deserializeNoop(IColumn &, avro::Decoder &) +{ +} + +/// Insert value with conversion to the column of target type. +template +static void insertNumber(IColumn & column, WhichDataType type, T value) +{ + switch (type.idx) + { + case TypeIndex::UInt8: + assert_cast(column).insertValue(value); + break; + case TypeIndex::Date: [[fallthrough]]; + case TypeIndex::UInt16: + assert_cast(column).insertValue(value); + break; + case TypeIndex::DateTime: [[fallthrough]]; + case TypeIndex::UInt32: + assert_cast(column).insertValue(value); + break; + case TypeIndex::DateTime64: [[fallthrough]]; + case TypeIndex::UInt64: + assert_cast(column).insertValue(value); + break; + case TypeIndex::Int8: + assert_cast(column).insertValue(value); + break; + case TypeIndex::Int16: + assert_cast(column).insertValue(value); + break; + case TypeIndex::Int32: + assert_cast(column).insertValue(value); + break; + case TypeIndex::Int64: + assert_cast(column).insertValue(value); + break; + case TypeIndex::Float32: + assert_cast(column).insertValue(value); + break; + case TypeIndex::Float64: + assert_cast(column).insertValue(value); + break; + default: + throw Exception("Type is not compatible with Avro", ErrorCodes::ILLEGAL_COLUMN); + } +} + + +AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type) +{ + WhichDataType target(target_type); + switch (root_node->type()) + { + case avro::AVRO_STRING: [[fallthrough]]; + case avro::AVRO_BYTES: + if (target.isString() || target.isFixedString()) + { + return [tmp = std::string()](IColumn & column, avro::Decoder & decoder) mutable + { + decoder.decodeString(tmp); + column.insertData(tmp.c_str(), tmp.length()); + }; + } + break; + case avro::AVRO_INT: + return [target](IColumn & column, avro::Decoder & decoder) + { + insertNumber(column, target, decoder.decodeInt()); + }; + case avro::AVRO_LONG: + if (target.isDateTime64()) + { + auto date_time_scale = assert_cast(*target_type).getScale(); + auto logical_type = root_node->logicalType().type(); + if ((logical_type == avro::LogicalType::TIMESTAMP_MILLIS && date_time_scale == 3) + || (logical_type == avro::LogicalType::TIMESTAMP_MICROS && date_time_scale == 6)) + { + return [](IColumn & column, avro::Decoder & decoder) + { + assert_cast(column).insertValue(decoder.decodeLong()); + }; + } + } + else + { + return [target](IColumn & column, avro::Decoder & decoder) + { + insertNumber(column, target, decoder.decodeLong()); + }; + } + break; + case avro::AVRO_FLOAT: + return [target](IColumn & column, avro::Decoder & decoder) + { + insertNumber(column, target, decoder.decodeFloat()); + }; + case avro::AVRO_DOUBLE: + return [target](IColumn & column, avro::Decoder & decoder) + { + insertNumber(column, target, decoder.decodeDouble()); + }; + case avro::AVRO_BOOL: + return [target](IColumn & column, avro::Decoder & decoder) + { + insertNumber(column, target, decoder.decodeBool()); + }; + case avro::AVRO_ARRAY: + if (target.isArray()) + { + auto nested_source_type = root_node->leafAt(0); + auto nested_target_type = assert_cast(*target_type).getNestedType(); + auto nested_deserialize = createDeserializeFn(nested_source_type, nested_target_type); + return [nested_deserialize](IColumn & column, avro::Decoder & decoder) + { + ColumnArray & column_array = assert_cast(column); + ColumnArray::Offsets & offsets = column_array.getOffsets(); + IColumn & nested_column = column_array.getData(); + size_t total = 0; + for (size_t n = decoder.arrayStart(); n != 0; n = decoder.arrayNext()) + { + total += n; + for (size_t i = 0; i < n; i++) + { + nested_deserialize(nested_column, decoder); + } + } + offsets.push_back(offsets.back() + total); + }; + } + break; + case avro::AVRO_UNION: + { + auto nullable_deserializer = [root_node, target_type](size_t non_null_union_index) + { + auto nested_deserialize = createDeserializeFn(root_node->leafAt(non_null_union_index), removeNullable(target_type)); + return [non_null_union_index, nested_deserialize](IColumn & column, avro::Decoder & decoder) + { + ColumnNullable & col = assert_cast(column); + size_t union_index = decoder.decodeUnionIndex(); + if (union_index == non_null_union_index) + { + nested_deserialize(col.getNestedColumn(), decoder); + col.getNullMapData().push_back(0); + } + else + { + col.insertDefault(); + } + }; + }; + if (root_node->leaves() == 2 && target.isNullable()) + { + if (root_node->leafAt(0)->type() == avro::AVRO_NULL) + return nullable_deserializer(1); + if (root_node->leafAt(1)->type() == avro::AVRO_NULL) + return nullable_deserializer(0); + } + break; + } + case avro::AVRO_NULL: + if (target.isNullable()) + { + auto nested_type = removeNullable(target_type); + if (nested_type->getTypeId() == TypeIndex::Nothing) + { + return [](IColumn &, avro::Decoder & decoder) + { + decoder.decodeNull(); + }; + } + else + { + return [](IColumn & column, avro::Decoder & decoder) + { + ColumnNullable & col = assert_cast(column); + decoder.decodeNull(); + col.insertDefault(); + }; + } + } + break; + case avro::AVRO_ENUM: + if (target.isString()) + { + std::vector symbols; + for (size_t i = 0; i < root_node->names(); i++) + { + symbols.push_back(root_node->nameAt(i)); + } + return [symbols](IColumn & column, avro::Decoder & decoder) + { + size_t enum_index = decoder.decodeEnum(); + const auto & enum_symbol = symbols[enum_index]; + column.insertData(enum_symbol.c_str(), enum_symbol.length()); + }; + } + if (target.isEnum()) + { + const auto & enum_type = dynamic_cast(*target_type); + std::vector symbol_mapping; + for (size_t i = 0; i < root_node->names(); i++) + { + symbol_mapping.push_back(enum_type.castToValue(root_node->nameAt(i))); + } + return [symbol_mapping](IColumn & column, avro::Decoder & decoder) + { + size_t enum_index = decoder.decodeEnum(); + column.insert(symbol_mapping[enum_index]); + }; + } + break; + case avro::AVRO_FIXED: + { + size_t fixed_size = root_node->fixedSize(); + if (target.isFixedString() && target_type->getSizeOfValueInMemory() == fixed_size) + { + return [tmp_fixed = std::vector(fixed_size)](IColumn & column, avro::Decoder & decoder) mutable + { + decoder.decodeFixed(tmp_fixed.size(), tmp_fixed); + column.insertData(reinterpret_cast(tmp_fixed.data()), tmp_fixed.size()); + }; + } + break; + } + case avro::AVRO_MAP: [[fallthrough]]; + case avro::AVRO_RECORD: [[fallthrough]]; + default: + break; + } + + throw Exception( + "Type " + target_type->getName() + " is not compatible with Avro " + avro::ValidSchema(root_node).toJson(false), + ErrorCodes::ILLEGAL_COLUMN); +} + +AvroDeserializer::SkipFn AvroDeserializer::createSkipFn(avro::NodePtr root_node) +{ + switch (root_node->type()) + { + case avro::AVRO_STRING: + return [](avro::Decoder & decoder) { decoder.skipString(); }; + case avro::AVRO_BYTES: + return [](avro::Decoder & decoder) { decoder.skipBytes(); }; + case avro::AVRO_INT: + return [](avro::Decoder & decoder) { decoder.decodeInt(); }; + case avro::AVRO_LONG: + return [](avro::Decoder & decoder) { decoder.decodeLong(); }; + case avro::AVRO_FLOAT: + return [](avro::Decoder & decoder) { decoder.decodeFloat(); }; + case avro::AVRO_DOUBLE: + return [](avro::Decoder & decoder) { decoder.decodeDouble(); }; + case avro::AVRO_BOOL: + return [](avro::Decoder & decoder) { decoder.decodeBool(); }; + case avro::AVRO_ARRAY: + { + auto nested_skip_fn = createSkipFn(root_node->leafAt(0)); + return [nested_skip_fn](avro::Decoder & decoder) + { + for (size_t n = decoder.arrayStart(); n != 0; n = decoder.arrayNext()) + { + for (size_t i = 0; i < n; ++i) + { + nested_skip_fn(decoder); + } + } + }; + } + case avro::AVRO_UNION: + { + std::vector union_skip_fns; + for (size_t i = 0; i < root_node->leaves(); i++) + { + union_skip_fns.push_back(createSkipFn(root_node->leafAt(i))); + } + return [union_skip_fns](avro::Decoder & decoder) { union_skip_fns[decoder.decodeUnionIndex()](decoder); }; + } + case avro::AVRO_NULL: + return [](avro::Decoder & decoder) { decoder.decodeNull(); }; + case avro::AVRO_ENUM: + return [](avro::Decoder & decoder) { decoder.decodeEnum(); }; + case avro::AVRO_FIXED: + { + auto fixed_size = root_node->fixedSize(); + return [fixed_size](avro::Decoder & decoder) { decoder.skipFixed(fixed_size); }; + } + case avro::AVRO_MAP: + { + auto value_skip_fn = createSkipFn(root_node->leafAt(1)); + return [value_skip_fn](avro::Decoder & decoder) + { + for (size_t n = decoder.mapStart(); n != 0; n = decoder.mapNext()) + { + for (size_t i = 0; i < n; ++i) + { + decoder.skipString(); + value_skip_fn(decoder); + } + } + }; + } + case avro::AVRO_RECORD: + { + std::vector field_skip_fns; + for (size_t i = 0; i < root_node->leaves(); i++) + { + field_skip_fns.push_back(createSkipFn(root_node->leafAt(i))); + } + return [field_skip_fns](avro::Decoder & decoder) + { + for (auto & skip_fn : field_skip_fns) + skip_fn(decoder); + }; + } + default: + throw Exception("Unsupported Avro type " + root_node->name().fullname() + " (" + toString(int(root_node->type())) + ")", ErrorCodes::ILLEGAL_COLUMN); + } +} + + +AvroDeserializer::AvroDeserializer(const ColumnsWithTypeAndName & columns, avro::ValidSchema schema) +{ + auto schema_root = schema.root(); + if (schema_root->type() != avro::AVRO_RECORD) + { + throw Exception("Root schema must be a record", ErrorCodes::TYPE_MISMATCH); + } + + field_mapping.resize(schema_root->leaves(), -1); + + for (size_t i = 0; i < schema_root->leaves(); ++i) + { + skip_fns.push_back(createSkipFn(schema_root->leafAt(i))); + deserialize_fns.push_back(&deserializeNoop); + } + + for (size_t i = 0; i < columns.size(); ++i) + { + const auto & column = columns[i]; + size_t field_index = 0; + if (!schema_root->nameIndex(column.name, field_index)) + { + throw Exception("Field " + column.name + " not found in Avro schema", ErrorCodes::THERE_IS_NO_COLUMN); + } + auto field_schema = schema_root->leafAt(field_index); + try + { + deserialize_fns[field_index] = createDeserializeFn(field_schema, column.type); + } + catch (Exception & e) + { + e.addMessage("column " + column.name); + throw; + } + field_mapping[field_index] = i; + } +} + +void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder & decoder) +{ + for (size_t i = 0; i < field_mapping.size(); i++) + { + if (field_mapping[i] >= 0) + { + deserialize_fns[i](*columns[field_mapping[i]], decoder); + } + else + { + skip_fns[i](decoder); + } + } +} + + +AvroRowInputFormat::AvroRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_) + : IRowInputFormat(header_, in_, params_) + , file_reader(std::make_unique(in_)) + , deserializer(header_.getColumnsWithTypeAndName(), file_reader.dataSchema()) +{ + file_reader.init(); +} + +bool AvroRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &) +{ + if (file_reader.hasMore()) + { + file_reader.decr(); + deserializer.deserializeRow(columns, file_reader.decoder()); + return true; + } + return false; +} + +#if USE_POCO_JSON +class AvroConfluentRowInputFormat::SchemaRegistry +{ +public: + SchemaRegistry(const std::string & base_url_) + { + if (base_url_.empty()) + { + throw Exception("Empty Schema Registry URL", ErrorCodes::BAD_ARGUMENTS); + } + try + { + base_url = base_url_; + } + catch (const Poco::SyntaxException & e) + { + throw Exception("Invalid Schema Registry URL: " + e.displayText(), ErrorCodes::BAD_ARGUMENTS); + } + } + + avro::ValidSchema getSchema(uint32_t id) const + { + try + { + try + { + /// TODO Host checking to prevent SSRF + + Poco::URI url(base_url, "/schemas/ids/" + std::to_string(id)); + + /// One second for connect/send/receive. Just in case. + ConnectionTimeouts timeouts({1, 0}, {1, 0}, {1, 0}); + + Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery()); + + auto session = makePooledHTTPSession(url, timeouts, 1); + session->sendRequest(request); + + Poco::Net::HTTPResponse response; + auto & response_body = session->receiveResponse(response); + + if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK) + { + throw Exception("HTTP code " + std::to_string(response.getStatus()), ErrorCodes::INCORRECT_DATA); + } + + Poco::JSON::Parser parser; + auto json_body = parser.parse(response_body).extract(); + auto schema = json_body->getValue("schema"); + return avro::compileJsonSchemaFromString(schema); + } + catch (const Exception &) + { + throw; + } + catch (const Poco::Exception & e) + { + throw Exception(Exception::CreateFromPoco, e); + } + catch (const avro::Exception & e) + { + throw Exception(e.what(), ErrorCodes::INCORRECT_DATA); + } + } + catch (Exception & e) + { + e.addMessage("while fetching schema id = " + std::to_string(id)); + throw; + } + } + +private: + Poco::URI base_url; +}; + +static uint32_t readConfluentSchemaId(ReadBuffer & in) +{ + uint8_t magic; + uint32_t schema_id; + + readBinaryBigEndian(magic, in); + readBinaryBigEndian(schema_id, in); + + if (magic != 0x00) + { + throw Exception("Invalid magic byte before AvroConfluent schema identifier." + " Must be zero byte, found " + std::to_string(int(magic)) + " instead", ErrorCodes::INCORRECT_DATA); + } + + return schema_id; +} + +AvroConfluentRowInputFormat::AvroConfluentRowInputFormat( + const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_) + : IRowInputFormat(header_.cloneEmpty(), in_, params_) + , header_columns(header_.getColumnsWithTypeAndName()) + , schema_registry(std::make_unique(format_settings_.avro.schema_registry_url)) + , input_stream(std::make_unique(in)) + , decoder(avro::binaryDecoder()) + +{ + decoder->init(*input_stream); +} + +bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &) +{ + if (in.eof()) + { + return false; + } + SchemaId schema_id = readConfluentSchemaId(in); + auto & deserializer = getOrCreateDeserializer(schema_id); + deserializer.deserializeRow(columns, *decoder); + decoder->drain(); + return true; +} + +AvroDeserializer & AvroConfluentRowInputFormat::getOrCreateDeserializer(SchemaId schema_id) +{ + auto it = deserializer_cache.find(schema_id); + if (it == deserializer_cache.end()) + { + auto schema = schema_registry->getSchema(schema_id); + AvroDeserializer deserializer(header_columns, schema); + it = deserializer_cache.emplace(schema_id, deserializer).first; + } + return it->second; +} +#endif + +void registerInputFormatProcessorAvro(FormatFactory & factory) +{ + factory.registerInputFormatProcessor("Avro", []( + ReadBuffer & buf, + const Block & sample, + const RowInputFormatParams & params, + const FormatSettings &) + { + return std::make_shared(sample, buf, params); + }); + +#if USE_POCO_JSON + + /// AvroConfluent format is disabled for the following reasons: + /// 1. There is no test for it. + /// 2. RemoteHostFilter is not used to prevent CSRF attacks. + +#if 0 + factory.registerInputFormatProcessor("AvroConfluent",[]( + ReadBuffer & buf, + const Block & sample, + const RowInputFormatParams & params, + const FormatSettings & settings) + { + return std::make_shared(sample, buf, params, settings); + }); +#endif + +#endif + +} + +} + +#else + +namespace DB +{ +class FormatFactory; +void registerInputFormatProcessorAvro(FormatFactory &) +{ +} +} + +#endif diff --git a/dbms/src/Processors/Formats/Impl/AvroRowInputFormat.h b/dbms/src/Processors/Formats/Impl/AvroRowInputFormat.h new file mode 100644 index 00000000000..0fb979b4f4e --- /dev/null +++ b/dbms/src/Processors/Formats/Impl/AvroRowInputFormat.h @@ -0,0 +1,79 @@ +#pragma once +#include "config_formats.h" +#include "config_core.h" +#if USE_AVRO + +#include +#include + +#include +#include +#include + +#include +#include +#include +#include + + +namespace DB +{ +class AvroDeserializer +{ +public: + AvroDeserializer(const ColumnsWithTypeAndName & columns, avro::ValidSchema schema); + void deserializeRow(MutableColumns & columns, avro::Decoder & decoder); + +private: + using DeserializeFn = std::function; + using SkipFn = std::function; + static DeserializeFn createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type); + static SkipFn createSkipFn(avro::NodePtr root_node); + + /// Map from field index in Avro schema to column number in block header. Or -1 if there is no corresponding column. + std::vector field_mapping; + + /// How to skip the corresponding field in Avro schema. + std::vector skip_fns; + + /// How to deserialize the corresponding field in Avro schema. + std::vector deserialize_fns; +}; + +class AvroRowInputFormat : public IRowInputFormat +{ +public: + AvroRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_); + virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override; + String getName() const override { return "AvroRowInputFormat"; } + +private: + avro::DataFileReaderBase file_reader; + AvroDeserializer deserializer; +}; + +#if USE_POCO_JSON +class AvroConfluentRowInputFormat : public IRowInputFormat +{ +public: + AvroConfluentRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_); + virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override; + String getName() const override { return "AvroConfluentRowInputFormat"; } + +private: + const ColumnsWithTypeAndName header_columns; + + class SchemaRegistry; + std::unique_ptr schema_registry; + + using SchemaId = uint32_t; + std::unordered_map deserializer_cache; + AvroDeserializer & getOrCreateDeserializer(SchemaId schema_id); + + avro::InputStreamPtr input_stream; + avro::DecoderPtr decoder; +}; +#endif + +} +#endif diff --git a/dbms/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp b/dbms/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp new file mode 100644 index 00000000000..26b427dfa31 --- /dev/null +++ b/dbms/src/Processors/Formats/Impl/AvroRowOutputFormat.cpp @@ -0,0 +1,396 @@ +#include "AvroRowOutputFormat.h" +#if USE_AVRO + +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ +namespace ErrorCodes +{ + extern const int BAD_TYPE_OF_FIELD; + extern const int BAD_ARGUMENTS; + extern const int THERE_IS_NO_COLUMN; + extern const int LOGICAL_ERROR; + extern const int INCORRECT_DATA; + extern const int CANNOT_READ_ALL_DATA; +} + +class OutputStreamWriteBufferAdapter : public avro::OutputStream +{ +public: + OutputStreamWriteBufferAdapter(WriteBuffer & out_) : out(out_) {} + + virtual bool next(uint8_t ** data, size_t * len) override + { + out.nextIfAtEnd(); + *data = reinterpret_cast(out.position()); + *len = out.available(); + out.position() += out.available(); + + return true; + } + + virtual void backup(size_t len) override { out.position() -= len; } + + virtual uint64_t byteCount() const override { return out.count(); } + virtual void flush() override { out.next(); } + +private: + WriteBuffer & out; +}; + + +AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeFn(DataTypePtr data_type, size_t & type_name_increment) +{ + ++type_name_increment; + + switch (data_type->getTypeId()) + { + case TypeIndex::UInt8: + return {avro::IntSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + encoder.encodeInt(assert_cast(column).getElement(row_num)); + }}; + case TypeIndex::Int8: + return {avro::IntSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + encoder.encodeInt(assert_cast(column).getElement(row_num)); + }}; + case TypeIndex::UInt16: + return {avro::IntSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + encoder.encodeInt(assert_cast(column).getElement(row_num)); + }}; + case TypeIndex::Int16: + return {avro::IntSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + encoder.encodeInt(assert_cast(column).getElement(row_num)); + }}; + case TypeIndex::UInt32: [[fallthrough]]; + case TypeIndex::DateTime: + return {avro::IntSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + encoder.encodeInt(assert_cast(column).getElement(row_num)); + }}; + case TypeIndex::Int32: + return {avro::IntSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + encoder.encodeInt(assert_cast(column).getElement(row_num)); + }}; + case TypeIndex::UInt64: + return {avro::LongSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + encoder.encodeLong(assert_cast(column).getElement(row_num)); + }}; + case TypeIndex::Int64: + return {avro::LongSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + encoder.encodeLong(assert_cast(column).getElement(row_num)); + }}; + case TypeIndex::Float32: + return {avro::FloatSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + encoder.encodeFloat(assert_cast(column).getElement(row_num)); + }}; + case TypeIndex::Float64: + return {avro::DoubleSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + encoder.encodeDouble(assert_cast(column).getElement(row_num)); + }}; + case TypeIndex::Date: + { + auto schema = avro::IntSchema(); + schema.root()->setLogicalType(avro::LogicalType(avro::LogicalType::DATE)); + return {schema, [](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + UInt16 date = assert_cast(column).getElement(row_num); + encoder.encodeInt(date); + }}; + } + case TypeIndex::DateTime64: + { + auto schema = avro::LongSchema(); + const auto & provided_type = assert_cast(*data_type); + + if (provided_type.getScale() == 3) + schema.root()->setLogicalType(avro::LogicalType(avro::LogicalType::TIMESTAMP_MILLIS)); + else if (provided_type.getScale() == 6) + schema.root()->setLogicalType(avro::LogicalType(avro::LogicalType::TIMESTAMP_MICROS)); + else + break; + + return {schema, [](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + const auto & col = assert_cast(column); + encoder.encodeLong(col.getElement(row_num)); + }}; + } + case TypeIndex::String: + return {avro::BytesSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + const StringRef & s = assert_cast(column).getDataAt(row_num); + encoder.encodeBytes(reinterpret_cast(s.data), s.size); + }}; + case TypeIndex::FixedString: + { + auto size = data_type->getSizeOfValueInMemory(); + auto schema = avro::FixedSchema(size, "fixed_" + toString(type_name_increment)); + return {schema, [](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + const StringRef & s = assert_cast(column).getDataAt(row_num); + encoder.encodeFixed(reinterpret_cast(s.data), s.size); + }}; + } + case TypeIndex::Enum8: + { + auto schema = avro::EnumSchema("enum8_" + toString(type_name_increment)); /// type names must be different for different types. + std::unordered_map enum_mapping; + const auto & enum_values = assert_cast(*data_type).getValues(); + for (size_t i = 0; i < enum_values.size(); ++i) + { + schema.addSymbol(enum_values[i].first); + enum_mapping.emplace(enum_values[i].second, i); + } + return {schema, [enum_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + auto enum_value = assert_cast(column).getElement(row_num); + encoder.encodeEnum(enum_mapping.at(enum_value)); + }}; + } + case TypeIndex::Enum16: + { + auto schema = avro::EnumSchema("enum16" + toString(type_name_increment)); + std::unordered_map enum_mapping; + const auto & enum_values = assert_cast(*data_type).getValues(); + for (size_t i = 0; i < enum_values.size(); ++i) + { + schema.addSymbol(enum_values[i].first); + enum_mapping.emplace(enum_values[i].second, i); + } + return {schema, [enum_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + auto enum_value = assert_cast(column).getElement(row_num); + encoder.encodeEnum(enum_mapping.at(enum_value)); + }}; + } + case TypeIndex::Array: + { + const auto & array_type = assert_cast(*data_type); + auto nested_mapping = createSchemaWithSerializeFn(array_type.getNestedType(), type_name_increment); + auto schema = avro::ArraySchema(nested_mapping.schema); + return {schema, [nested_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + const ColumnArray & column_array = assert_cast(column); + const ColumnArray::Offsets & offsets = column_array.getOffsets(); + size_t offset = offsets[row_num - 1]; + size_t next_offset = offsets[row_num]; + size_t row_count = next_offset - offset; + const IColumn & nested_column = column_array.getData(); + + encoder.arrayStart(); + if (row_count > 0) + { + encoder.setItemCount(row_count); + } + for (size_t i = offset; i < next_offset; ++i) + { + nested_mapping.serialize(nested_column, i, encoder); + } + encoder.arrayEnd(); + }}; + } + case TypeIndex::Nullable: + { + auto nested_type = removeNullable(data_type); + auto nested_mapping = createSchemaWithSerializeFn(nested_type, type_name_increment); + if (nested_type->getTypeId() == TypeIndex::Nothing) + { + return nested_mapping; + } + else + { + avro::UnionSchema union_schema; + union_schema.addType(avro::NullSchema()); + union_schema.addType(nested_mapping.schema); + return {union_schema, [nested_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + const ColumnNullable & col = assert_cast(column); + if (!col.isNullAt(row_num)) + { + encoder.encodeUnionIndex(1); + nested_mapping.serialize(col.getNestedColumn(), row_num, encoder); + } + else + { + encoder.encodeUnionIndex(0); + encoder.encodeNull(); + } + }}; + } + } + case TypeIndex::LowCardinality: + { + const auto & nested_type = removeLowCardinality(data_type); + auto nested_mapping = createSchemaWithSerializeFn(nested_type, type_name_increment); + return {nested_mapping.schema, [nested_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder) + { + const auto & col = assert_cast(column); + nested_mapping.serialize(*col.getDictionary().getNestedColumn(), col.getIndexAt(row_num), encoder); + }}; + } + case TypeIndex::Nothing: + return {avro::NullSchema(), [](const IColumn &, size_t, avro::Encoder & encoder) { encoder.encodeNull(); }}; + default: + break; + } + throw Exception("Type " + data_type->getName() + " is not supported for Avro output", ErrorCodes::ILLEGAL_COLUMN); +} + + +AvroSerializer::AvroSerializer(const ColumnsWithTypeAndName & columns) +{ + avro::RecordSchema record_schema("row"); + + size_t type_name_increment = 0; + for (auto & column : columns) + { + try + { + auto field_mapping = createSchemaWithSerializeFn(column.type, type_name_increment); + serialize_fns.push_back(field_mapping.serialize); + //TODO: verify name starts with A-Za-z_ + record_schema.addField(column.name, field_mapping.schema); + } + catch (Exception & e) + { + e.addMessage("column " + column.name); + throw; + } + } + schema.setSchema(record_schema); +} + +void AvroSerializer::serializeRow(const Columns & columns, size_t row_num, avro::Encoder & encoder) +{ + size_t num_columns = columns.size(); + for (size_t i = 0; i < num_columns; ++i) + { + serialize_fns[i](*columns[i], row_num, encoder); + } +} + +static avro::Codec getCodec(const std::string & codec_name) +{ + if (codec_name == "") + { +#ifdef SNAPPY_CODEC_AVAILABLE + return avro::Codec::SNAPPY_CODEC; +#else + return avro::Codec::DEFLATE_CODEC; +#endif + } + + if (codec_name == "null") return avro::Codec::NULL_CODEC; + if (codec_name == "deflate") return avro::Codec::DEFLATE_CODEC; +#ifdef SNAPPY_CODEC_AVAILABLE + if (codec_name == "snappy") return avro::Codec::SNAPPY_CODEC; +#endif + + throw Exception("Avro codec " + codec_name + " is not available", ErrorCodes::BAD_ARGUMENTS); +} + +AvroRowOutputFormat::AvroRowOutputFormat( + WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_) + : IRowOutputFormat(header_, out_, callback) + , settings(settings_) + , serializer(header_.getColumnsWithTypeAndName()) + , file_writer( + std::make_unique(out_), + serializer.getSchema(), + settings.avro.output_sync_interval, + getCodec(settings.avro.output_codec)) +{ +} + +AvroRowOutputFormat::~AvroRowOutputFormat() = default; + +void AvroRowOutputFormat::writePrefix() +{ + file_writer.syncIfNeeded(); +} + +void AvroRowOutputFormat::write(const Columns & columns, size_t row_num) +{ + file_writer.syncIfNeeded(); + serializer.serializeRow(columns, row_num, file_writer.encoder()); + file_writer.incr(); +} + +void AvroRowOutputFormat::writeSuffix() +{ + file_writer.close(); +} + +void registerOutputFormatProcessorAvro(FormatFactory & factory) +{ + factory.registerOutputFormatProcessor("Avro", []( + WriteBuffer & buf, + const Block & sample, + FormatFactory::WriteCallback callback, + const FormatSettings & settings) + { + return std::make_shared(buf, sample, callback, settings); + }); +} + +} + +#else + +namespace DB +{ +class FormatFactory; +void registerOutputFormatProcessorAvro(FormatFactory &) +{ +} +} + +#endif diff --git a/dbms/src/Processors/Formats/Impl/AvroRowOutputFormat.h b/dbms/src/Processors/Formats/Impl/AvroRowOutputFormat.h new file mode 100644 index 00000000000..4d404337d74 --- /dev/null +++ b/dbms/src/Processors/Formats/Impl/AvroRowOutputFormat.h @@ -0,0 +1,62 @@ +#pragma once +#include "config_formats.h" +#if USE_AVRO +#include + +#include +#include +#include +#include +#include + +#include +#include +#include + + +namespace DB +{ +class WriteBuffer; + +class AvroSerializer +{ +public: + AvroSerializer(const ColumnsWithTypeAndName & columns); + const avro::ValidSchema & getSchema() const { return schema; } + void serializeRow(const Columns & columns, size_t row_num, avro::Encoder & encoder); + +private: + using SerializeFn = std::function; + struct SchemaWithSerializeFn + { + avro::Schema schema; + SerializeFn serialize; + }; + + /// Type names for different complex types (e.g. enums, fixed strings) must be unique. We use simple incremental number to give them different names. + static SchemaWithSerializeFn createSchemaWithSerializeFn(DataTypePtr data_type, size_t & type_name_increment); + + std::vector serialize_fns; + avro::ValidSchema schema; +}; + +class AvroRowOutputFormat : public IRowOutputFormat +{ +public: + AvroRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_); + virtual ~AvroRowOutputFormat() override; + + String getName() const override { return "AvroRowOutputFormat"; } + void write(const Columns & columns, size_t row_num) override; + void writeField(const IColumn &, const IDataType &, size_t) override {} + virtual void writePrefix() override; + virtual void writeSuffix() override; + +private: + FormatSettings settings; + AvroSerializer serializer; + avro::DataFileWriterBase file_writer; +}; + +} +#endif diff --git a/dbms/tests/performance/parse_engine_file.xml b/dbms/tests/performance/parse_engine_file.xml index 080acbd53f2..8a0054bdd7f 100644 --- a/dbms/tests/performance/parse_engine_file.xml +++ b/dbms/tests/performance/parse_engine_file.xml @@ -34,6 +34,7 @@ TSKV RowBinary Native + Avro diff --git a/dbms/tests/performance/select_format.xml b/dbms/tests/performance/select_format.xml index 621247fee1e..189b35a2700 100644 --- a/dbms/tests/performance/select_format.xml +++ b/dbms/tests/performance/select_format.xml @@ -44,6 +44,7 @@ XML ODBCDriver2 MySQLWire + Avro diff --git a/dbms/tests/queries/0_stateless/01060_avro.reference b/dbms/tests/queries/0_stateless/01060_avro.reference new file mode 100644 index 00000000000..21fcc53f081 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01060_avro.reference @@ -0,0 +1,36 @@ +=== input += primitive +1,1,2,3.4,5.6,"b1","s1" +0,-1,9223372036854775807,3.00004,0.00001,"","" +1,2,"s1" +0,9223372036854775807,"" +"s1",2,1 +"",9223372036854775807,0 +"s1" +"" += complex +"A","t","['s1','s2']","[['a1'],['a2']]","s1",\N,"79cd909892d7e7ade1987cc7422628ba" +"C","f","[]","[]",\N,123,"79cd909892d7e7ade1987cc7422628ba" +"79cd909892d7e7ade1987cc7422628ba" +"79cd909892d7e7ade1987cc7422628ba" += logical_types +"2019-12-20","2020-01-10 07:31:56.227","2020-01-10 07:31:56.227000" +18250,1578641516227,1578641516227000 += compression +1000 +1000 += other +0 +1000 +not found +=== output += primitive +1,1,2,3.4,5.6,"b1","s1" += complex +"A","t","['s1','s2']","[['a1'],['a2']]","s1",\N,"79cd909892d7e7ade1987cc7422628ba" += logical_types +"2019-12-20","2020-01-10 07:31:56.227","2020-01-10 07:31:56.227000" += other +0 +1000 +147 diff --git a/dbms/tests/queries/0_stateless/01060_avro.sh b/dbms/tests/queries/0_stateless/01060_avro.sh new file mode 100755 index 00000000000..b57a7ad7a85 --- /dev/null +++ b/dbms/tests/queries/0_stateless/01060_avro.sh @@ -0,0 +1,70 @@ +#!/usr/bin/env bash + +set -e + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +. $CUR_DIR/../shell_config.sh + +DATA_DIR=$CUR_DIR/data_avro + +# input +echo === input +echo = primitive + +cat $DATA_DIR/primitive.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a_bool UInt8, b_int Int32, c_long Int64, d_float Float32, e_double Float64, f_bytes String, g_string String' -q 'select * from table' +cat $DATA_DIR/primitive.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a_bool UInt8, c_long Int64, g_string String' -q 'select * from table' +cat $DATA_DIR/primitive.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'g_string String, c_long Int64, a_bool UInt8' -q 'select * from table' +cat $DATA_DIR/primitive.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'g_string String' -q 'select * from table' + +echo = complex +cat $DATA_DIR/complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "a_enum_to_string String, b_enum_to_enum Enum('t' = 1, 'f' = 0), c_array_string Array(String), d_array_array_string Array(Array(String)), e_union_null_string Nullable(String), f_union_long_null Nullable(Int64), g_fixed FixedString(32)" -q 'select * from table' +cat $DATA_DIR/complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "g_fixed FixedString(32)" -q 'select * from table' + +echo = logical_types +cat $DATA_DIR/logical_types.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "a_date Date, b_timestamp_millis DateTime64(3, 'UTC'), c_timestamp_micros DateTime64(6, 'UTC')" -q 'select * from table' +cat $DATA_DIR/logical_types.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a_date Int32, b_timestamp_millis Int64, c_timestamp_micros Int64' -q 'select * from table' + + + +echo = compression +cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table' +cat $DATA_DIR/simple.deflate.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table' + +#snappy is optional +#cat $DATA_DIR/simple.snappy.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table' + +echo = other +#no data +cat $DATA_DIR/empty.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table' +# type mismatch +cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int32' -q 'select count() from table' +# field not found +cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'b Int64' -q 'select count() from table' 2>&1 | grep -i 'not found' -o + + + + + + +# output +echo === output + +echo = primitive +S1="a_bool UInt8, b_int Int32, c_long Int64, d_float Float32, e_double Float64, f_bytes String, g_string String" +echo '1,1,2,3.4,5.6,"b1","s1"' | ${CLICKHOUSE_LOCAL} --input-format CSV -S "$S1" -q "select * from table format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S1" -q 'select * from table' + +echo = complex +S2="a_enum_to_string String, b_enum_to_enum Enum('t' = 1, 'f' = 0), c_array_string Array(String), d_array_array_string Array(Array(String)), e_union_null_string Nullable(String), f_union_long_null Nullable(Int64), g_fixed FixedString(32)" +echo "\"A\",\"t\",\"['s1','s2']\",\"[['a1'],['a2']]\",\"s1\",\N,\"79cd909892d7e7ade1987cc7422628ba\"" | ${CLICKHOUSE_LOCAL} --input-format CSV -S "$S2" -q "select * from table format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S2" -q 'select * from table' + +echo = logical_types +S3="a_date Date, b_timestamp_millis DateTime64(3, 'UTC'), c_timestamp_micros DateTime64(6, 'UTC')" +echo '"2019-12-20","2020-01-10 07:31:56.227","2020-01-10 07:31:56.227000"' | ${CLICKHOUSE_LOCAL} --input-format CSV -S "$S3" -q "select * from table format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S3" -q 'select * from table' + +echo = other +S4="a Int64" +${CLICKHOUSE_LOCAL} -q "select toInt64(number) as a from numbers(0) format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S4" -q 'select count() from table' +${CLICKHOUSE_LOCAL} -q "select toInt64(number) as a from numbers(1000) format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S4" -q 'select count() from table' + +# type supported via conversion +${CLICKHOUSE_LOCAL} -q "select toInt16(123) as a format Avro" | wc -c \ No newline at end of file diff --git a/dbms/tests/queries/0_stateless/data_avro/complex.avro b/dbms/tests/queries/0_stateless/data_avro/complex.avro new file mode 100644 index 00000000000..0880f581882 Binary files /dev/null and b/dbms/tests/queries/0_stateless/data_avro/complex.avro differ diff --git a/dbms/tests/queries/0_stateless/data_avro/complex.avsc b/dbms/tests/queries/0_stateless/data_avro/complex.avsc new file mode 100644 index 00000000000..325169aeb57 --- /dev/null +++ b/dbms/tests/queries/0_stateless/data_avro/complex.avsc @@ -0,0 +1,20 @@ +{ + "type": "record", + "name": "row", + "fields": [ + {"name": "a_enum_to_string", "type": { "type": "enum", "name": "enum_1", "symbols" : ["A", "B", "C"]}}, + {"name": "b_enum_to_enum", "type": { "type": "enum", "name": "enum_2", "symbols" : ["t", "f"]}}, + {"name": "c_array_string", "type": { "type": "array", "items": "string"}}, + {"name": "d_array_array_string", "type": { "type": "array", "items": {"type": "array", "items": "string"}}}, + {"name": "e_union_null_string", "type": ["null", "string"]}, + {"name": "f_union_long_null", "type": ["long", "null"]}, + {"name": "g_fixed", "type": {"type":"fixed", "size": 32, "name": "fixed_1"}}, + {"name": "h_record_skip", "type": { + "type": "record", + "name": "subrecord", + "fields": [ + {"name": "a", "type": "string"} + ] + }} + ] + } \ No newline at end of file diff --git a/dbms/tests/queries/0_stateless/data_avro/complex.json b/dbms/tests/queries/0_stateless/data_avro/complex.json new file mode 100644 index 00000000000..d05e09c72fc --- /dev/null +++ b/dbms/tests/queries/0_stateless/data_avro/complex.json @@ -0,0 +1,2 @@ +{"a_enum_to_string":"A","b_enum_to_enum":"t","c_array_string":["s1", "s2"],"d_array_array_string":[["a1"], ["a2"]],"e_union_null_string":{"string": "s1"},"f_union_long_null":null,"g_fixed":"79cd909892d7e7ade1987cc7422628ba","h_record_skip":{"a": "a"}} +{"a_enum_to_string":"C","b_enum_to_enum":"f","c_array_string":[],"d_array_array_string":[],"e_union_null_string":null,"f_union_long_null":{"long": 123},"g_fixed":"79cd909892d7e7ade1987cc7422628ba","h_record_skip":{"a": "a"}} \ No newline at end of file diff --git a/dbms/tests/queries/0_stateless/data_avro/empty.avro b/dbms/tests/queries/0_stateless/data_avro/empty.avro new file mode 100644 index 00000000000..7cfae81758c Binary files /dev/null and b/dbms/tests/queries/0_stateless/data_avro/empty.avro differ diff --git a/dbms/tests/queries/0_stateless/data_avro/empty.avsc b/dbms/tests/queries/0_stateless/data_avro/empty.avsc new file mode 100644 index 00000000000..923eda71054 --- /dev/null +++ b/dbms/tests/queries/0_stateless/data_avro/empty.avsc @@ -0,0 +1,7 @@ +{ + "type": "record", + "name": "row", + "fields": [ + {"name": "a", "type": "long"} + ] + } \ No newline at end of file diff --git a/dbms/tests/queries/0_stateless/data_avro/empty.json b/dbms/tests/queries/0_stateless/data_avro/empty.json new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dbms/tests/queries/0_stateless/data_avro/generate_avro.sh b/dbms/tests/queries/0_stateless/data_avro/generate_avro.sh new file mode 100755 index 00000000000..3538c8693e5 --- /dev/null +++ b/dbms/tests/queries/0_stateless/data_avro/generate_avro.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash + +#avro tools: https://www.apache.org/dyn/closer.cgi?path=avro/avro-1.9.1/java/avro-tools-1.9.1.jar + + +avro-tools fromjson --schema-file primitive.avsc primitive.json > primitive.avro +avro-tools fromjson --schema-file complex.avsc complex.json > complex.avro +avro-tools fromjson --schema-file logical_types.avsc logical_types.json > logical_types.avro +avro-tools fromjson --schema-file empty.avsc empty.json > empty.avro + +#compression +avro-tools fromjson --codec null --schema-file simple.avsc simple.json > simple.null.avro +avro-tools fromjson --codec deflate --schema-file simple.avsc simple.json > simple.deflate.avro +avro-tools fromjson --codec snappy --schema-file simple.avsc simple.json > simple.snappy.avro \ No newline at end of file diff --git a/dbms/tests/queries/0_stateless/data_avro/logical_types.avro b/dbms/tests/queries/0_stateless/data_avro/logical_types.avro new file mode 100644 index 00000000000..7b8a3f60b7a Binary files /dev/null and b/dbms/tests/queries/0_stateless/data_avro/logical_types.avro differ diff --git a/dbms/tests/queries/0_stateless/data_avro/logical_types.avsc b/dbms/tests/queries/0_stateless/data_avro/logical_types.avsc new file mode 100644 index 00000000000..5d9fd96821f --- /dev/null +++ b/dbms/tests/queries/0_stateless/data_avro/logical_types.avsc @@ -0,0 +1,9 @@ +{ + "type": "record", + "name": "row", + "fields": [ + {"name": "a_date", "type": { "type": "int", "logicalType": "date"}}, + {"name": "b_timestamp_millis", "type": { "type": "long", "logicalType": "timestamp-millis"}}, + {"name": "c_timestamp_micros", "type": { "type": "long", "logicalType": "timestamp-micros"}} + ] + } \ No newline at end of file diff --git a/dbms/tests/queries/0_stateless/data_avro/logical_types.json b/dbms/tests/queries/0_stateless/data_avro/logical_types.json new file mode 100644 index 00000000000..652b85246e7 --- /dev/null +++ b/dbms/tests/queries/0_stateless/data_avro/logical_types.json @@ -0,0 +1 @@ +{"a_date":18250,"b_timestamp_millis":1578641516227,"c_timestamp_micros":1578641516227000} diff --git a/dbms/tests/queries/0_stateless/data_avro/primitive.avro b/dbms/tests/queries/0_stateless/data_avro/primitive.avro new file mode 100644 index 00000000000..ef5eb36639f Binary files /dev/null and b/dbms/tests/queries/0_stateless/data_avro/primitive.avro differ diff --git a/dbms/tests/queries/0_stateless/data_avro/primitive.avsc b/dbms/tests/queries/0_stateless/data_avro/primitive.avsc new file mode 100644 index 00000000000..a4f06d02b01 --- /dev/null +++ b/dbms/tests/queries/0_stateless/data_avro/primitive.avsc @@ -0,0 +1,14 @@ +{ + "type": "record", + "name": "row", + "fields": [ + {"name": "a_bool", "type": "boolean"}, + {"name": "b_int", "type": "int"}, + {"name": "c_long", "type": "long"}, + {"name": "d_float", "type": "float"}, + {"name": "e_double", "type": "double"}, + {"name": "f_bytes", "type": "bytes"}, + {"name": "g_string", "type": "string"}, + {"name": "h_null", "type": "null"} + ] + } \ No newline at end of file diff --git a/dbms/tests/queries/0_stateless/data_avro/primitive.json b/dbms/tests/queries/0_stateless/data_avro/primitive.json new file mode 100644 index 00000000000..fc521c8829c --- /dev/null +++ b/dbms/tests/queries/0_stateless/data_avro/primitive.json @@ -0,0 +1,2 @@ +{"a_bool":true,"b_int":1,"c_long":2,"d_float":3.4,"e_double":5.6,"f_bytes":"b1","g_string":"s1","h_null": null} +{"a_bool":false,"b_int":-1,"c_long":9223372036854775807,"d_float":3.00004,"e_double":0.00001,"f_bytes":"","g_string":"","h_null": null} \ No newline at end of file diff --git a/dbms/tests/queries/0_stateless/data_avro/simple.avsc b/dbms/tests/queries/0_stateless/data_avro/simple.avsc new file mode 100644 index 00000000000..923eda71054 --- /dev/null +++ b/dbms/tests/queries/0_stateless/data_avro/simple.avsc @@ -0,0 +1,7 @@ +{ + "type": "record", + "name": "row", + "fields": [ + {"name": "a", "type": "long"} + ] + } \ No newline at end of file diff --git a/dbms/tests/queries/0_stateless/data_avro/simple.deflate.avro b/dbms/tests/queries/0_stateless/data_avro/simple.deflate.avro new file mode 100644 index 00000000000..d4ba226b447 Binary files /dev/null and b/dbms/tests/queries/0_stateless/data_avro/simple.deflate.avro differ diff --git a/dbms/tests/queries/0_stateless/data_avro/simple.json b/dbms/tests/queries/0_stateless/data_avro/simple.json new file mode 100644 index 00000000000..c09fc0b732f --- /dev/null +++ b/dbms/tests/queries/0_stateless/data_avro/simple.json @@ -0,0 +1,1000 @@ +{"a":1} +{"a":2} +{"a":3} +{"a":4} +{"a":5} +{"a":6} +{"a":7} +{"a":8} +{"a":9} +{"a":10} +{"a":11} +{"a":12} +{"a":13} +{"a":14} +{"a":15} +{"a":16} +{"a":17} +{"a":18} +{"a":19} +{"a":20} +{"a":21} +{"a":22} +{"a":23} +{"a":24} +{"a":25} +{"a":26} +{"a":27} +{"a":28} +{"a":29} +{"a":30} +{"a":31} +{"a":32} +{"a":33} +{"a":34} +{"a":35} +{"a":36} +{"a":37} +{"a":38} +{"a":39} +{"a":40} +{"a":41} +{"a":42} +{"a":43} +{"a":44} +{"a":45} +{"a":46} +{"a":47} +{"a":48} +{"a":49} +{"a":50} +{"a":51} +{"a":52} +{"a":53} +{"a":54} +{"a":55} +{"a":56} +{"a":57} +{"a":58} +{"a":59} +{"a":60} +{"a":61} +{"a":62} +{"a":63} +{"a":64} +{"a":65} +{"a":66} +{"a":67} +{"a":68} +{"a":69} +{"a":70} +{"a":71} +{"a":72} +{"a":73} +{"a":74} +{"a":75} +{"a":76} +{"a":77} +{"a":78} +{"a":79} +{"a":80} +{"a":81} +{"a":82} +{"a":83} +{"a":84} +{"a":85} +{"a":86} +{"a":87} +{"a":88} +{"a":89} +{"a":90} +{"a":91} +{"a":92} +{"a":93} +{"a":94} +{"a":95} +{"a":96} +{"a":97} +{"a":98} +{"a":99} +{"a":100} +{"a":101} +{"a":102} +{"a":103} +{"a":104} +{"a":105} +{"a":106} +{"a":107} +{"a":108} +{"a":109} +{"a":110} +{"a":111} +{"a":112} +{"a":113} +{"a":114} +{"a":115} +{"a":116} +{"a":117} +{"a":118} +{"a":119} +{"a":120} +{"a":121} +{"a":122} +{"a":123} +{"a":124} +{"a":125} +{"a":126} +{"a":127} +{"a":128} +{"a":129} +{"a":130} +{"a":131} +{"a":132} +{"a":133} +{"a":134} +{"a":135} +{"a":136} +{"a":137} +{"a":138} +{"a":139} +{"a":140} +{"a":141} +{"a":142} +{"a":143} +{"a":144} +{"a":145} +{"a":146} +{"a":147} +{"a":148} +{"a":149} +{"a":150} +{"a":151} +{"a":152} +{"a":153} +{"a":154} +{"a":155} +{"a":156} +{"a":157} +{"a":158} +{"a":159} +{"a":160} +{"a":161} +{"a":162} +{"a":163} +{"a":164} +{"a":165} +{"a":166} +{"a":167} +{"a":168} +{"a":169} +{"a":170} +{"a":171} +{"a":172} +{"a":173} +{"a":174} +{"a":175} +{"a":176} +{"a":177} +{"a":178} +{"a":179} +{"a":180} +{"a":181} +{"a":182} +{"a":183} +{"a":184} +{"a":185} +{"a":186} +{"a":187} +{"a":188} +{"a":189} +{"a":190} +{"a":191} +{"a":192} +{"a":193} +{"a":194} +{"a":195} +{"a":196} +{"a":197} +{"a":198} +{"a":199} +{"a":200} +{"a":201} +{"a":202} +{"a":203} +{"a":204} +{"a":205} +{"a":206} +{"a":207} +{"a":208} +{"a":209} +{"a":210} +{"a":211} +{"a":212} +{"a":213} +{"a":214} +{"a":215} +{"a":216} +{"a":217} +{"a":218} +{"a":219} +{"a":220} +{"a":221} +{"a":222} +{"a":223} +{"a":224} +{"a":225} +{"a":226} +{"a":227} +{"a":228} +{"a":229} +{"a":230} +{"a":231} +{"a":232} +{"a":233} +{"a":234} +{"a":235} +{"a":236} +{"a":237} +{"a":238} +{"a":239} +{"a":240} +{"a":241} +{"a":242} +{"a":243} +{"a":244} +{"a":245} +{"a":246} +{"a":247} +{"a":248} +{"a":249} +{"a":250} +{"a":251} +{"a":252} +{"a":253} +{"a":254} +{"a":255} +{"a":256} +{"a":257} +{"a":258} +{"a":259} +{"a":260} +{"a":261} +{"a":262} +{"a":263} +{"a":264} +{"a":265} +{"a":266} +{"a":267} +{"a":268} +{"a":269} +{"a":270} +{"a":271} +{"a":272} +{"a":273} +{"a":274} +{"a":275} +{"a":276} +{"a":277} +{"a":278} +{"a":279} +{"a":280} +{"a":281} +{"a":282} +{"a":283} +{"a":284} +{"a":285} +{"a":286} +{"a":287} +{"a":288} +{"a":289} +{"a":290} +{"a":291} +{"a":292} +{"a":293} +{"a":294} +{"a":295} +{"a":296} +{"a":297} +{"a":298} +{"a":299} +{"a":300} +{"a":301} +{"a":302} +{"a":303} +{"a":304} +{"a":305} +{"a":306} +{"a":307} +{"a":308} +{"a":309} +{"a":310} +{"a":311} +{"a":312} +{"a":313} +{"a":314} +{"a":315} +{"a":316} +{"a":317} +{"a":318} +{"a":319} +{"a":320} +{"a":321} +{"a":322} +{"a":323} +{"a":324} +{"a":325} +{"a":326} +{"a":327} +{"a":328} +{"a":329} +{"a":330} +{"a":331} +{"a":332} +{"a":333} +{"a":334} +{"a":335} +{"a":336} +{"a":337} +{"a":338} +{"a":339} +{"a":340} +{"a":341} +{"a":342} +{"a":343} +{"a":344} +{"a":345} +{"a":346} +{"a":347} +{"a":348} +{"a":349} +{"a":350} +{"a":351} +{"a":352} +{"a":353} +{"a":354} +{"a":355} +{"a":356} +{"a":357} +{"a":358} +{"a":359} +{"a":360} +{"a":361} +{"a":362} +{"a":363} +{"a":364} +{"a":365} +{"a":366} +{"a":367} +{"a":368} +{"a":369} +{"a":370} +{"a":371} +{"a":372} +{"a":373} +{"a":374} +{"a":375} +{"a":376} +{"a":377} +{"a":378} +{"a":379} +{"a":380} +{"a":381} +{"a":382} +{"a":383} +{"a":384} +{"a":385} +{"a":386} +{"a":387} +{"a":388} +{"a":389} +{"a":390} +{"a":391} +{"a":392} +{"a":393} +{"a":394} +{"a":395} +{"a":396} +{"a":397} +{"a":398} +{"a":399} +{"a":400} +{"a":401} +{"a":402} +{"a":403} +{"a":404} +{"a":405} +{"a":406} +{"a":407} +{"a":408} +{"a":409} +{"a":410} +{"a":411} +{"a":412} +{"a":413} +{"a":414} +{"a":415} +{"a":416} +{"a":417} +{"a":418} +{"a":419} +{"a":420} +{"a":421} +{"a":422} +{"a":423} +{"a":424} +{"a":425} +{"a":426} +{"a":427} +{"a":428} +{"a":429} +{"a":430} +{"a":431} +{"a":432} +{"a":433} +{"a":434} +{"a":435} +{"a":436} +{"a":437} +{"a":438} +{"a":439} +{"a":440} +{"a":441} +{"a":442} +{"a":443} +{"a":444} +{"a":445} +{"a":446} +{"a":447} +{"a":448} +{"a":449} +{"a":450} +{"a":451} +{"a":452} +{"a":453} +{"a":454} +{"a":455} +{"a":456} +{"a":457} +{"a":458} +{"a":459} +{"a":460} +{"a":461} +{"a":462} +{"a":463} +{"a":464} +{"a":465} +{"a":466} +{"a":467} +{"a":468} +{"a":469} +{"a":470} +{"a":471} +{"a":472} +{"a":473} +{"a":474} +{"a":475} +{"a":476} +{"a":477} +{"a":478} +{"a":479} +{"a":480} +{"a":481} +{"a":482} +{"a":483} +{"a":484} +{"a":485} +{"a":486} +{"a":487} +{"a":488} +{"a":489} +{"a":490} +{"a":491} +{"a":492} +{"a":493} +{"a":494} +{"a":495} +{"a":496} +{"a":497} +{"a":498} +{"a":499} +{"a":500} +{"a":501} +{"a":502} +{"a":503} +{"a":504} +{"a":505} +{"a":506} +{"a":507} +{"a":508} +{"a":509} +{"a":510} +{"a":511} +{"a":512} +{"a":513} +{"a":514} +{"a":515} +{"a":516} +{"a":517} +{"a":518} +{"a":519} +{"a":520} +{"a":521} +{"a":522} +{"a":523} +{"a":524} +{"a":525} +{"a":526} +{"a":527} +{"a":528} +{"a":529} +{"a":530} +{"a":531} +{"a":532} +{"a":533} +{"a":534} +{"a":535} +{"a":536} +{"a":537} +{"a":538} +{"a":539} +{"a":540} +{"a":541} +{"a":542} +{"a":543} +{"a":544} +{"a":545} +{"a":546} +{"a":547} +{"a":548} +{"a":549} +{"a":550} +{"a":551} +{"a":552} +{"a":553} +{"a":554} +{"a":555} +{"a":556} +{"a":557} +{"a":558} +{"a":559} +{"a":560} +{"a":561} +{"a":562} +{"a":563} +{"a":564} +{"a":565} +{"a":566} +{"a":567} +{"a":568} +{"a":569} +{"a":570} +{"a":571} +{"a":572} +{"a":573} +{"a":574} +{"a":575} +{"a":576} +{"a":577} +{"a":578} +{"a":579} +{"a":580} +{"a":581} +{"a":582} +{"a":583} +{"a":584} +{"a":585} +{"a":586} +{"a":587} +{"a":588} +{"a":589} +{"a":590} +{"a":591} +{"a":592} +{"a":593} +{"a":594} +{"a":595} +{"a":596} +{"a":597} +{"a":598} +{"a":599} +{"a":600} +{"a":601} +{"a":602} +{"a":603} +{"a":604} +{"a":605} +{"a":606} +{"a":607} +{"a":608} +{"a":609} +{"a":610} +{"a":611} +{"a":612} +{"a":613} +{"a":614} +{"a":615} +{"a":616} +{"a":617} +{"a":618} +{"a":619} +{"a":620} +{"a":621} +{"a":622} +{"a":623} +{"a":624} +{"a":625} +{"a":626} +{"a":627} +{"a":628} +{"a":629} +{"a":630} +{"a":631} +{"a":632} +{"a":633} +{"a":634} +{"a":635} +{"a":636} +{"a":637} +{"a":638} +{"a":639} +{"a":640} +{"a":641} +{"a":642} +{"a":643} +{"a":644} +{"a":645} +{"a":646} +{"a":647} +{"a":648} +{"a":649} +{"a":650} +{"a":651} +{"a":652} +{"a":653} +{"a":654} +{"a":655} +{"a":656} +{"a":657} +{"a":658} +{"a":659} +{"a":660} +{"a":661} +{"a":662} +{"a":663} +{"a":664} +{"a":665} +{"a":666} +{"a":667} +{"a":668} +{"a":669} +{"a":670} +{"a":671} +{"a":672} +{"a":673} +{"a":674} +{"a":675} +{"a":676} +{"a":677} +{"a":678} +{"a":679} +{"a":680} +{"a":681} +{"a":682} +{"a":683} +{"a":684} +{"a":685} +{"a":686} +{"a":687} +{"a":688} +{"a":689} +{"a":690} +{"a":691} +{"a":692} +{"a":693} +{"a":694} +{"a":695} +{"a":696} +{"a":697} +{"a":698} +{"a":699} +{"a":700} +{"a":701} +{"a":702} +{"a":703} +{"a":704} +{"a":705} +{"a":706} +{"a":707} +{"a":708} +{"a":709} +{"a":710} +{"a":711} +{"a":712} +{"a":713} +{"a":714} +{"a":715} +{"a":716} +{"a":717} +{"a":718} +{"a":719} +{"a":720} +{"a":721} +{"a":722} +{"a":723} +{"a":724} +{"a":725} +{"a":726} +{"a":727} +{"a":728} +{"a":729} +{"a":730} +{"a":731} +{"a":732} +{"a":733} +{"a":734} +{"a":735} +{"a":736} +{"a":737} +{"a":738} +{"a":739} +{"a":740} +{"a":741} +{"a":742} +{"a":743} +{"a":744} +{"a":745} +{"a":746} +{"a":747} +{"a":748} +{"a":749} +{"a":750} +{"a":751} +{"a":752} +{"a":753} +{"a":754} +{"a":755} +{"a":756} +{"a":757} +{"a":758} +{"a":759} +{"a":760} +{"a":761} +{"a":762} +{"a":763} +{"a":764} +{"a":765} +{"a":766} +{"a":767} +{"a":768} +{"a":769} +{"a":770} +{"a":771} +{"a":772} +{"a":773} +{"a":774} +{"a":775} +{"a":776} +{"a":777} +{"a":778} +{"a":779} +{"a":780} +{"a":781} +{"a":782} +{"a":783} +{"a":784} +{"a":785} +{"a":786} +{"a":787} +{"a":788} +{"a":789} +{"a":790} +{"a":791} +{"a":792} +{"a":793} +{"a":794} +{"a":795} +{"a":796} +{"a":797} +{"a":798} +{"a":799} +{"a":800} +{"a":801} +{"a":802} +{"a":803} +{"a":804} +{"a":805} +{"a":806} +{"a":807} +{"a":808} +{"a":809} +{"a":810} +{"a":811} +{"a":812} +{"a":813} +{"a":814} +{"a":815} +{"a":816} +{"a":817} +{"a":818} +{"a":819} +{"a":820} +{"a":821} +{"a":822} +{"a":823} +{"a":824} +{"a":825} +{"a":826} +{"a":827} +{"a":828} +{"a":829} +{"a":830} +{"a":831} +{"a":832} +{"a":833} +{"a":834} +{"a":835} +{"a":836} +{"a":837} +{"a":838} +{"a":839} +{"a":840} +{"a":841} +{"a":842} +{"a":843} +{"a":844} +{"a":845} +{"a":846} +{"a":847} +{"a":848} +{"a":849} +{"a":850} +{"a":851} +{"a":852} +{"a":853} +{"a":854} +{"a":855} +{"a":856} +{"a":857} +{"a":858} +{"a":859} +{"a":860} +{"a":861} +{"a":862} +{"a":863} +{"a":864} +{"a":865} +{"a":866} +{"a":867} +{"a":868} +{"a":869} +{"a":870} +{"a":871} +{"a":872} +{"a":873} +{"a":874} +{"a":875} +{"a":876} +{"a":877} +{"a":878} +{"a":879} +{"a":880} +{"a":881} +{"a":882} +{"a":883} +{"a":884} +{"a":885} +{"a":886} +{"a":887} +{"a":888} +{"a":889} +{"a":890} +{"a":891} +{"a":892} +{"a":893} +{"a":894} +{"a":895} +{"a":896} +{"a":897} +{"a":898} +{"a":899} +{"a":900} +{"a":901} +{"a":902} +{"a":903} +{"a":904} +{"a":905} +{"a":906} +{"a":907} +{"a":908} +{"a":909} +{"a":910} +{"a":911} +{"a":912} +{"a":913} +{"a":914} +{"a":915} +{"a":916} +{"a":917} +{"a":918} +{"a":919} +{"a":920} +{"a":921} +{"a":922} +{"a":923} +{"a":924} +{"a":925} +{"a":926} +{"a":927} +{"a":928} +{"a":929} +{"a":930} +{"a":931} +{"a":932} +{"a":933} +{"a":934} +{"a":935} +{"a":936} +{"a":937} +{"a":938} +{"a":939} +{"a":940} +{"a":941} +{"a":942} +{"a":943} +{"a":944} +{"a":945} +{"a":946} +{"a":947} +{"a":948} +{"a":949} +{"a":950} +{"a":951} +{"a":952} +{"a":953} +{"a":954} +{"a":955} +{"a":956} +{"a":957} +{"a":958} +{"a":959} +{"a":960} +{"a":961} +{"a":962} +{"a":963} +{"a":964} +{"a":965} +{"a":966} +{"a":967} +{"a":968} +{"a":969} +{"a":970} +{"a":971} +{"a":972} +{"a":973} +{"a":974} +{"a":975} +{"a":976} +{"a":977} +{"a":978} +{"a":979} +{"a":980} +{"a":981} +{"a":982} +{"a":983} +{"a":984} +{"a":985} +{"a":986} +{"a":987} +{"a":988} +{"a":989} +{"a":990} +{"a":991} +{"a":992} +{"a":993} +{"a":994} +{"a":995} +{"a":996} +{"a":997} +{"a":998} +{"a":999} +{"a":1000} diff --git a/dbms/tests/queries/0_stateless/data_avro/simple.null.avro b/dbms/tests/queries/0_stateless/data_avro/simple.null.avro new file mode 100644 index 00000000000..789ab45101f Binary files /dev/null and b/dbms/tests/queries/0_stateless/data_avro/simple.null.avro differ diff --git a/dbms/tests/queries/0_stateless/data_avro/simple.snappy.avro b/dbms/tests/queries/0_stateless/data_avro/simple.snappy.avro new file mode 100644 index 00000000000..b812ed6c7ea Binary files /dev/null and b/dbms/tests/queries/0_stateless/data_avro/simple.snappy.avro differ diff --git a/dbms/tests/queries/1_stateful/00154_avro.reference b/dbms/tests/queries/1_stateful/00154_avro.reference new file mode 100644 index 00000000000..7e243047e8b --- /dev/null +++ b/dbms/tests/queries/1_stateful/00154_avro.reference @@ -0,0 +1,2 @@ +17300372046749301651 +17300372046749301651 diff --git a/dbms/tests/queries/1_stateful/00154_avro.sql b/dbms/tests/queries/1_stateful/00154_avro.sql new file mode 100644 index 00000000000..3d43a23e516 --- /dev/null +++ b/dbms/tests/queries/1_stateful/00154_avro.sql @@ -0,0 +1,9 @@ +DROP TABLE IF EXISTS test.avro; + +CREATE TABLE test.avro AS test.hits ENGINE = File(Avro); +INSERT INTO test.avro SELECT * FROM test.hits WHERE intHash64(WatchID) % 100 = 0; + +SELECT sum(cityHash64(*)) FROM test.hits WHERE intHash64(WatchID) % 100 = 0; +SELECT sum(cityHash64(*)) FROM test.avro; + +DROP TABLE test.avro;