Merge branch 'avro' of https://github.com/oandrew/ClickHouse into oandrew-avro

This commit is contained in:
Alexey Milovidov 2020-01-18 21:23:55 +03:00
commit 76e04b9843
42 changed files with 2502 additions and 17 deletions

4
.gitmodules vendored
View File

@ -140,3 +140,7 @@
[submodule "contrib/ryu"]
path = contrib/ryu
url = https://github.com/ClickHouse-Extras/ryu.git
[submodule "contrib/avro"]
path = contrib/avro
url = https://github.com/apache/avro.git
ignore = untracked

View File

@ -352,6 +352,7 @@ include (cmake/find/simdjson.cmake)
include (cmake/find/rapidjson.cmake)
include (cmake/find/fastops.cmake)
include (cmake/find/orc.cmake)
include (cmake/find/avro.cmake)
find_contrib_lib(cityhash)
find_contrib_lib(farmhash)

28
cmake/find/avro.cmake Normal file
View File

@ -0,0 +1,28 @@
option (ENABLE_AVRO "Enable Avro" ${ENABLE_LIBRARIES})
if (ENABLE_AVRO)
option (USE_INTERNAL_AVRO_LIBRARY "Set to FALSE to use system avro library instead of bundled" ${NOT_UNBUNDLED})
if(NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/avro/lang/c++/CMakeLists.txt")
if(USE_INTERNAL_AVRO_LIBRARY)
message(WARNING "submodule contrib/avro is missing. to fix try run: \n git submodule update --init --recursive")
endif()
set(MISSING_INTERNAL_AVRO_LIBRARY 1)
set(USE_INTERNAL_AVRO_LIBRARY 0)
endif()
if (NOT USE_INTERNAL_AVRO_LIBRARY)
elseif(NOT MISSING_INTERNAL_AVRO_LIBRARY)
include(cmake/find/snappy.cmake)
set(AVROCPP_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/avro/lang/c++/include")
set(AVROCPP_LIBRARY avrocpp)
endif ()
if (AVROCPP_LIBRARY AND AVROCPP_INCLUDE_DIR)
set(USE_AVRO 1)
endif()
endif()
message (STATUS "Using avro=${USE_AVRO}: ${AVROCPP_INCLUDE_DIR} : ${AVROCPP_LIBRARY}")

View File

@ -31,6 +31,7 @@ if (NOT Boost_SYSTEM_LIBRARY AND NOT MISSING_INTERNAL_BOOST_LIBRARY)
set (Boost_SYSTEM_LIBRARY boost_system_internal)
set (Boost_PROGRAM_OPTIONS_LIBRARY boost_program_options_internal)
set (Boost_FILESYSTEM_LIBRARY boost_filesystem_internal ${Boost_SYSTEM_LIBRARY})
set (Boost_IOSTREAMS_LIBRARY boost_iostreams_internal)
set (Boost_REGEX_LIBRARY boost_regex_internal)
set (Boost_INCLUDE_DIRS)
@ -48,4 +49,4 @@ if (NOT Boost_SYSTEM_LIBRARY AND NOT MISSING_INTERNAL_BOOST_LIBRARY)
list (APPEND Boost_INCLUDE_DIRS "${ClickHouse_SOURCE_DIR}/contrib/boost")
endif ()
message (STATUS "Using Boost: ${Boost_INCLUDE_DIRS} : ${Boost_PROGRAM_OPTIONS_LIBRARY},${Boost_SYSTEM_LIBRARY},${Boost_FILESYSTEM_LIBRARY},${Boost_REGEX_LIBRARY}")
message (STATUS "Using Boost: ${Boost_INCLUDE_DIRS} : ${Boost_PROGRAM_OPTIONS_LIBRARY},${Boost_SYSTEM_LIBRARY},${Boost_FILESYSTEM_LIBRARY},${Boost_IOSTREAMS_LIBRARY},${Boost_REGEX_LIBRARY}")

View File

@ -14,6 +14,7 @@ if (NOT ENABLE_LIBRARIES)
set (ENABLE_POCO_REDIS ${ENABLE_LIBRARIES} CACHE BOOL "")
set (ENABLE_POCO_ODBC ${ENABLE_LIBRARIES} CACHE BOOL "")
set (ENABLE_POCO_SQL ${ENABLE_LIBRARIES} CACHE BOOL "")
set (ENABLE_POCO_JSON ${ENABLE_LIBRARIES} CACHE BOOL "")
endif ()
set (POCO_COMPONENTS Net XML SQL Data)
@ -34,6 +35,9 @@ if (NOT DEFINED ENABLE_POCO_ODBC OR ENABLE_POCO_ODBC)
list (APPEND POCO_COMPONENTS DataODBC)
list (APPEND POCO_COMPONENTS SQLODBC)
endif ()
if (NOT DEFINED ENABLE_POCO_JSON OR ENABLE_POCO_JSON)
list (APPEND POCO_COMPONENTS JSON)
endif ()
if (NOT USE_INTERNAL_POCO_LIBRARY)
find_package (Poco COMPONENTS ${POCO_COMPONENTS})
@ -112,6 +116,11 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY)
endif ()
endif ()
if (NOT DEFINED ENABLE_POCO_JSON OR ENABLE_POCO_JSON)
set (Poco_JSON_LIBRARY PocoJSON)
set (Poco_JSON_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/poco/JSON/include/")
endif ()
if (OPENSSL_FOUND AND (NOT DEFINED ENABLE_POCO_NETSSL OR ENABLE_POCO_NETSSL))
set (Poco_NetSSL_LIBRARY PocoNetSSL ${OPENSSL_LIBRARIES})
set (Poco_Crypto_LIBRARY PocoCrypto ${OPENSSL_LIBRARIES})
@ -145,8 +154,11 @@ endif ()
if (Poco_SQLODBC_LIBRARY AND ODBC_FOUND)
set (USE_POCO_SQLODBC 1)
endif ()
if (Poco_JSON_LIBRARY)
set (USE_POCO_JSON 1)
endif ()
message(STATUS "Using Poco: ${Poco_INCLUDE_DIRS} : ${Poco_Foundation_LIBRARY},${Poco_Util_LIBRARY},${Poco_Net_LIBRARY},${Poco_NetSSL_LIBRARY},${Poco_Crypto_LIBRARY},${Poco_XML_LIBRARY},${Poco_Data_LIBRARY},${Poco_DataODBC_LIBRARY},${Poco_SQL_LIBRARY},${Poco_SQLODBC_LIBRARY},${Poco_MongoDB_LIBRARY},${Poco_Redis_LIBRARY}; MongoDB=${USE_POCO_MONGODB}, Redis=${USE_POCO_REDIS}, DataODBC=${USE_POCO_DATAODBC}, NetSSL=${USE_POCO_NETSSL}")
message(STATUS "Using Poco: ${Poco_INCLUDE_DIRS} : ${Poco_Foundation_LIBRARY},${Poco_Util_LIBRARY},${Poco_Net_LIBRARY},${Poco_NetSSL_LIBRARY},${Poco_Crypto_LIBRARY},${Poco_XML_LIBRARY},${Poco_Data_LIBRARY},${Poco_DataODBC_LIBRARY},${Poco_SQL_LIBRARY},${Poco_SQLODBC_LIBRARY},${Poco_MongoDB_LIBRARY},${Poco_Redis_LIBRARY},${Poco_JSON_LIBRARY}; MongoDB=${USE_POCO_MONGODB}, Redis=${USE_POCO_REDIS}, DataODBC=${USE_POCO_DATAODBC}, NetSSL=${USE_POCO_NETSSL}, JSON=${USE_POCO_JSON}")
# How to make sutable poco:
# use branch:

View File

@ -50,9 +50,11 @@ if (SANITIZE)
set (USE_SIMDJSON 0 CACHE BOOL "")
set (ENABLE_ORC 0 CACHE BOOL "")
set (ENABLE_PARQUET 0 CACHE BOOL "")
set (ENABLE_AVRO 0 CACHE BOOL "")
set (USE_CAPNP 0 CACHE BOOL "")
set (USE_INTERNAL_ORC_LIBRARY 0 CACHE BOOL "")
set (USE_ORC 0 CACHE BOOL "")
set (USE_AVRO 0 CACHE BOOL "")
set (ENABLE_SSL 0 CACHE BOOL "")
elseif (SANITIZE STREQUAL "thread")

View File

@ -146,6 +146,20 @@ if (ENABLE_ICU AND USE_INTERNAL_ICU_LIBRARY)
add_subdirectory (icu-cmake)
endif ()
if(USE_INTERNAL_SNAPPY_LIBRARY)
set(SNAPPY_BUILD_TESTS 0 CACHE INTERNAL "")
if (NOT MAKE_STATIC_LIBRARIES)
set(BUILD_SHARED_LIBS 1) # TODO: set at root dir
endif()
add_subdirectory(snappy)
set (SNAPPY_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/snappy")
if(SANITIZE STREQUAL "undefined")
target_compile_options(${SNAPPY_LIBRARY} PRIVATE -fno-sanitize=undefined)
endif()
endif()
if (USE_INTERNAL_PARQUET_LIBRARY)
if (USE_INTERNAL_PARQUET_LIBRARY_NATIVE_CMAKE)
# We dont use arrow's cmakefiles because they uses too many depends and download some libs in compile time
@ -189,20 +203,6 @@ if (USE_INTERNAL_PARQUET_LIBRARY_NATIVE_CMAKE)
endif()
else()
if(USE_INTERNAL_SNAPPY_LIBRARY)
set(SNAPPY_BUILD_TESTS 0 CACHE INTERNAL "")
if (NOT MAKE_STATIC_LIBRARIES)
set(BUILD_SHARED_LIBS 1) # TODO: set at root dir
endif()
add_subdirectory(snappy)
set (SNAPPY_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/snappy")
if(SANITIZE STREQUAL "undefined")
target_compile_options(${SNAPPY_LIBRARY} PRIVATE -fno-sanitize=undefined)
endif()
endif()
add_subdirectory(arrow-cmake)
# The library is large - avoid bloat.
@ -212,6 +212,10 @@ else()
endif()
endif()
if (USE_INTERNAL_AVRO_LIBRARY)
add_subdirectory(avro-cmake)
endif()
if (USE_INTERNAL_POCO_LIBRARY)
set (POCO_VERBOSE_MESSAGES 0 CACHE INTERNAL "")
set (save_CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})

1
contrib/avro vendored Submodule

@ -0,0 +1 @@
Subproject commit 89218262cde62e98fcb3778b86cd3f03056c54f3

View File

@ -0,0 +1,70 @@
set(AVROCPP_ROOT_DIR ${CMAKE_SOURCE_DIR}/contrib/avro/lang/c++)
set(AVROCPP_INCLUDE_DIR ${AVROCPP_ROOT_DIR}/api)
set(AVROCPP_SOURCE_DIR ${AVROCPP_ROOT_DIR}/impl)
set (CMAKE_CXX_STANDARD 17)
if (EXISTS ${AVROCPP_ROOT_DIR}/../../share/VERSION.txt)
file(READ "${AVROCPP_ROOT_DIR}/../../share/VERSION.txt"
AVRO_VERSION)
endif()
string(REPLACE "\n" "" AVRO_VERSION ${AVRO_VERSION})
set (AVRO_VERSION_MAJOR ${AVRO_VERSION})
set (AVRO_VERSION_MINOR "0")
set (AVROCPP_SOURCE_FILES
${AVROCPP_SOURCE_DIR}/Compiler.cc
${AVROCPP_SOURCE_DIR}/Node.cc
${AVROCPP_SOURCE_DIR}/LogicalType.cc
${AVROCPP_SOURCE_DIR}/NodeImpl.cc
${AVROCPP_SOURCE_DIR}/ResolverSchema.cc
${AVROCPP_SOURCE_DIR}/Schema.cc
${AVROCPP_SOURCE_DIR}/Types.cc
${AVROCPP_SOURCE_DIR}/ValidSchema.cc
${AVROCPP_SOURCE_DIR}/Zigzag.cc
${AVROCPP_SOURCE_DIR}/BinaryEncoder.cc
${AVROCPP_SOURCE_DIR}/BinaryDecoder.cc
${AVROCPP_SOURCE_DIR}/Stream.cc
${AVROCPP_SOURCE_DIR}/FileStream.cc
${AVROCPP_SOURCE_DIR}/Generic.cc
${AVROCPP_SOURCE_DIR}/GenericDatum.cc
${AVROCPP_SOURCE_DIR}/DataFile.cc
${AVROCPP_SOURCE_DIR}/parsing/Symbol.cc
${AVROCPP_SOURCE_DIR}/parsing/ValidatingCodec.cc
${AVROCPP_SOURCE_DIR}/parsing/JsonCodec.cc
${AVROCPP_SOURCE_DIR}/parsing/ResolvingDecoder.cc
${AVROCPP_SOURCE_DIR}/json/JsonIO.cc
${AVROCPP_SOURCE_DIR}/json/JsonDom.cc
${AVROCPP_SOURCE_DIR}/Resolver.cc
${AVROCPP_SOURCE_DIR}/Validator.cc
)
add_library (avrocpp ${AVROCPP_SOURCE_FILES})
set_target_properties (avrocpp PROPERTIES VERSION ${AVRO_VERSION_MAJOR}.${AVRO_VERSION_MINOR})
target_include_directories(avrocpp SYSTEM PUBLIC ${AVROCPP_INCLUDE_DIR})
target_include_directories(avrocpp SYSTEM PUBLIC ${Boost_INCLUDE_DIRS})
target_link_libraries (avrocpp ${Boost_IOSTREAMS_LIBRARY})
if (SNAPPY_INCLUDE_DIR AND SNAPPY_LIBRARY)
target_compile_definitions (avrocpp PUBLIC SNAPPY_CODEC_AVAILABLE)
target_include_directories (avrocpp PRIVATE ${SNAPPY_INCLUDE_DIR})
target_link_libraries (avrocpp ${SNAPPY_LIBRARY})
endif ()
if (COMPILER_GCC)
set (SUPPRESS_WARNINGS -Wno-non-virtual-dtor)
elseif (COMPILER_CLANG)
set (SUPPRESS_WARNINGS -Wno-non-virtual-dtor)
endif ()
target_compile_options(avrocpp PRIVATE ${SUPPRESS_WARNINGS})
# create a symlink to include headers with <avro/...>
ADD_CUSTOM_TARGET(avro_symlink_headers ALL
COMMAND ${CMAKE_COMMAND} -E make_directory ${AVROCPP_ROOT_DIR}/include
COMMAND ${CMAKE_COMMAND} -E create_symlink ${AVROCPP_ROOT_DIR}/api ${AVROCPP_ROOT_DIR}/include/avro
)
add_dependencies(avrocpp avro_symlink_headers)

2
contrib/boost vendored

@ -1 +1 @@
Subproject commit 830e51edb59c4f37a8638138581e1e56c29ac44f
Subproject commit 86be2aef20bee2356b744e5569eed6eaded85dbe

View File

@ -37,3 +37,8 @@ target_link_libraries(boost_filesystem_internal PRIVATE boost_system_internal)
if (USE_INTERNAL_PARQUET_LIBRARY)
add_boost_lib(regex)
endif()
if (USE_INTERNAL_AVRO_LIBRARY)
add_boost_lib(iostreams)
target_link_libraries(boost_iostreams_internal PUBLIC ${ZLIB_LIBRARIES})
endif()

View File

@ -504,6 +504,10 @@ if (USE_POCO_NETSSL)
dbms_target_link_libraries (PRIVATE ${Poco_NetSSL_LIBRARY} ${Poco_Crypto_LIBRARY})
endif()
if (USE_POCO_JSON)
dbms_target_link_libraries (PRIVATE ${Poco_JSON_LIBRARY})
endif()
dbms_target_link_libraries (PRIVATE ${Poco_Foundation_LIBRARY})
if (USE_ICU)
@ -522,6 +526,11 @@ if (USE_PARQUET)
endif ()
endif ()
if (USE_AVRO)
dbms_target_link_libraries(PRIVATE ${AVROCPP_LIBRARY})
dbms_target_include_directories (SYSTEM BEFORE PRIVATE ${AVROCPP_INCLUDE_DIR})
endif ()
if (OPENSSL_CRYPTO_LIBRARY)
dbms_target_link_libraries (PRIVATE ${OPENSSL_CRYPTO_LIBRARY})
target_link_libraries (clickhouse_common_io PRIVATE ${OPENSSL_CRYPTO_LIBRARY})

View File

@ -186,6 +186,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, input_format_values_interpret_expressions, true, "For Values format: if the field could not be parsed by streaming parser, run SQL parser and try to interpret it as SQL expression.", 0) \
M(SettingBool, input_format_values_deduce_templates_of_expressions, true, "For Values format: if the field could not be parsed by streaming parser, run SQL parser, deduce template of the SQL expression, try to parse all rows using template and then interpret expression for all rows.", 0) \
M(SettingBool, input_format_values_accurate_types_of_literals, true, "For Values format: when parsing and interpreting expressions using template, check actual type of literal to avoid possible overflow and precision issues.", 0) \
M(SettingString, input_format_avro_schema_registry_url, "", "For AvroConfluent format: Confluent Schema Registry URL.", 0) \
\
M(SettingBool, output_format_json_quote_64bit_integers, true, "Controls quoting of 64-bit integers in JSON output format.", 0) \
\
@ -197,6 +198,8 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, output_format_pretty_max_column_pad_width, 250, "Maximum width to pad all values in a column in Pretty formats.", 0) \
M(SettingBool, output_format_pretty_color, true, "Use ANSI escape sequences to paint colors in Pretty formats", 0) \
M(SettingUInt64, output_format_parquet_row_group_size, 1000000, "Row group size in rows.", 0) \
M(SettingString, output_format_avro_codec, "", "Compression codec used for output. Possible values: 'null', 'deflate', 'snappy'.", 0) \
M(SettingUInt64, output_format_avro_sync_interval, 16 * 1024, "Sync interval in bytes.", 0) \
\
M(SettingBool, use_client_time_zone, false, "Use client timezone for interpreting DateTime string values, instead of adopting server timezone.", 0) \
\

View File

@ -10,5 +10,6 @@
#cmakedefine01 USE_POCO_DATAODBC
#cmakedefine01 USE_POCO_MONGODB
#cmakedefine01 USE_POCO_REDIS
#cmakedefine01 USE_POCO_JSON
#cmakedefine01 USE_INTERNAL_LLVM_LIBRARY
#cmakedefine01 USE_SSL

View File

@ -68,6 +68,7 @@ static FormatSettings getInputFormatSetting(const Settings & settings, const Con
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
format_settings.avro.schema_registry_url = settings.input_format_avro_schema_registry_url;
return format_settings;
}
@ -99,6 +100,8 @@ static FormatSettings getOutputFormatSetting(const Settings & settings, const Co
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
format_settings.avro.output_codec = settings.output_format_avro_codec;
format_settings.avro.output_sync_interval = settings.output_format_avro_sync_interval;
return format_settings;
}
@ -325,6 +328,8 @@ FormatFactory::FormatFactory()
registerInputFormatProcessorORC(*this);
registerInputFormatProcessorParquet(*this);
registerOutputFormatProcessorParquet(*this);
registerInputFormatProcessorAvro(*this);
registerOutputFormatProcessorAvro(*this);
registerInputFormatProcessorTemplate(*this);
registerOutputFormatProcessorTemplate(*this);

View File

@ -166,6 +166,8 @@ void registerInputFormatProcessorORC(FormatFactory & factory);
void registerOutputFormatProcessorParquet(FormatFactory & factory);
void registerInputFormatProcessorProtobuf(FormatFactory & factory);
void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
void registerInputFormatProcessorAvro(FormatFactory & factory);
void registerOutputFormatProcessorAvro(FormatFactory & factory);
void registerInputFormatProcessorTemplate(FormatFactory & factory);
void registerOutputFormatProcessorTemplate(FormatFactory &factory);

View File

@ -110,6 +110,16 @@ struct FormatSettings
};
Custom custom;
struct Avro
{
String schema_registry_url;
String output_codec;
UInt64 output_sync_interval = 16 * 1024;
};
Avro avro;
};
}

View File

@ -2,6 +2,7 @@
// .h autogenerated by cmake!
#cmakedefine01 USE_AVRO
#cmakedefine01 USE_CAPNP
#cmakedefine01 USE_SNAPPY
#cmakedefine01 USE_PARQUET

View File

@ -0,0 +1,644 @@
#include "AvroRowInputFormat.h"
#if USE_AVRO
#include <numeric>
#include <Core/Defines.h>
#include <Core/Field.h>
#include <IO/Operators.h>
#include <IO/ReadHelpers.h>
#include <Formats/verbosePrintString.h>
#include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/getLeastSupertype.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <avro/Compiler.hh>
#include <avro/DataFile.hh>
#include <avro/Decoder.hh>
#include <avro/Encoder.hh>
#include <avro/Generic.hh>
#include <avro/GenericDatum.hh>
#include <avro/Node.hh>
#include <avro/NodeConcepts.hh>
#include <avro/NodeImpl.hh>
#include <avro/Reader.hh>
#include <avro/Schema.hh>
#include <avro/Specific.hh>
#include <avro/ValidSchema.hh>
#include <avro/Writer.hh>
#include <Poco/BinaryReader.h>
#include <Poco/Buffer.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Poco/MemoryStream.h>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Poco.h>
#include <Poco/URI.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_TYPE_OF_FIELD;
extern const int BAD_ARGUMENTS;
extern const int THERE_IS_NO_COLUMN;
extern const int LOGICAL_ERROR;
extern const int INCORRECT_DATA;
extern const int CANNOT_READ_ALL_DATA;
extern const int ILLEGAL_COLUMN;
extern const int TYPE_MISMATCH;
}
class InputStreamReadBufferAdapter : public avro::InputStream
{
public:
InputStreamReadBufferAdapter(ReadBuffer & in_) : in(in_) {}
bool next(const uint8_t ** data, size_t * len)
{
if (in.eof())
{
*len = 0;
return false;
}
*data = reinterpret_cast<const uint8_t *>(in.position());
*len = in.available();
in.position() += in.available();
return true;
}
void backup(size_t len) { in.position() -= len; }
void skip(size_t len) { in.tryIgnore(len); }
size_t byteCount() const { return in.count(); }
private:
ReadBuffer & in;
};
static void deserializeNoop(IColumn &, avro::Decoder &)
{
}
AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type)
{
auto logical_type = root_node->logicalType().type();
WhichDataType target(target_type);
switch (root_node->type())
{
case avro::AVRO_STRING:
if (target.isString())
{
return [tmp = std::string()](IColumn & column, avro::Decoder & decoder) mutable
{
decoder.decodeString(tmp);
column.insertData(tmp.c_str(), tmp.length());
};
}
break;
case avro::AVRO_BYTES:
if (target.isString())
{
return [tmp = std::string()](IColumn & column, avro::Decoder & decoder) mutable
{
decoder.decodeString(tmp);
column.insertData(tmp.c_str(), tmp.length());
};
}
break;
case avro::AVRO_INT:
if (target.isInt32())
{
return [](IColumn & column, avro::Decoder & decoder)
{
assert_cast<ColumnInt32 &>(column).insertValue(decoder.decodeInt());
};
}
if (target.isDate() && logical_type == avro::LogicalType::DATE)
{
return [](IColumn & column, avro::Decoder & decoder)
{
assert_cast<DataTypeDate::ColumnType &>(column).insertValue(decoder.decodeInt());
};
}
break;
case avro::AVRO_LONG:
if (target.isInt64())
{
return [](IColumn & column, avro::Decoder & decoder)
{
assert_cast<ColumnInt64 &>(column).insertValue(decoder.decodeLong());
};
}
if (target.isDateTime64())
{
auto date_time_scale = assert_cast<const DataTypeDateTime64 &>(*target_type).getScale();
if ((logical_type == avro::LogicalType::TIMESTAMP_MILLIS && date_time_scale == 3)
|| (logical_type == avro::LogicalType::TIMESTAMP_MICROS && date_time_scale == 6))
{
return [](IColumn & column, avro::Decoder & decoder)
{
assert_cast<DataTypeDateTime64::ColumnType &>(column).insertValue(decoder.decodeLong());
};
}
}
break;
case avro::AVRO_FLOAT:
if (target.isFloat32())
{
return [](IColumn & column, avro::Decoder & decoder)
{
assert_cast<ColumnFloat32 &>(column).insertValue(decoder.decodeFloat());
};
}
break;
case avro::AVRO_DOUBLE:
if (target.isFloat64())
{
return [](IColumn & column, avro::Decoder & decoder)
{
assert_cast<ColumnFloat64 &>(column).insertValue(decoder.decodeDouble());
};
}
break;
case avro::AVRO_BOOL:
if (target.isUInt8())
{
return [](IColumn & column, avro::Decoder & decoder)
{
assert_cast<ColumnUInt8 &>(column).insertValue(decoder.decodeBool());
};
}
break;
case avro::AVRO_ARRAY:
if (target.isArray())
{
auto nested_source_type = root_node->leafAt(0);
auto nested_target_type = assert_cast<const DataTypeArray &>(*target_type).getNestedType();
auto nested_deserialize = createDeserializeFn(nested_source_type, nested_target_type);
return [nested_deserialize](IColumn & column, avro::Decoder & decoder)
{
ColumnArray & column_array = assert_cast<ColumnArray &>(column);
ColumnArray::Offsets & offsets = column_array.getOffsets();
IColumn & nested_column = column_array.getData();
size_t total = 0;
for (size_t n = decoder.arrayStart(); n != 0; n = decoder.arrayNext())
{
total += n;
for (size_t i = 0; i < n; i++)
{
nested_deserialize(nested_column, decoder);
}
}
offsets.push_back(offsets.back() + total);
};
}
break;
case avro::AVRO_UNION:
{
auto nullable_deserializer = [root_node, target_type](size_t non_null_union_index)
{
auto nested_deserialize = createDeserializeFn(root_node->leafAt(non_null_union_index), removeNullable(target_type));
return [non_null_union_index, nested_deserialize](IColumn & column, avro::Decoder & decoder)
{
ColumnNullable & col = assert_cast<ColumnNullable &>(column);
size_t union_index = decoder.decodeUnionIndex();
if (union_index == non_null_union_index)
{
nested_deserialize(col.getNestedColumn(), decoder);
col.getNullMapData().push_back(0);
}
else
{
col.insertDefault();
}
};
};
if (root_node->leaves() == 2 && target.isNullable())
{
if (root_node->leafAt(0)->type() == avro::AVRO_NULL)
return nullable_deserializer(1);
if (root_node->leafAt(1)->type() == avro::AVRO_NULL)
return nullable_deserializer(0);
}
break;
}
case avro::AVRO_NULL:
if (target.isNullable())
{
auto nested_type = removeNullable(target_type);
if (nested_type->getTypeId() == TypeIndex::Nothing)
{
return [](IColumn &, avro::Decoder & decoder)
{
decoder.decodeNull();
};
}
else
{
return [](IColumn & column, avro::Decoder & decoder)
{
ColumnNullable & col = assert_cast<ColumnNullable &>(column);
decoder.decodeNull();
col.insertDefault();
};
}
}
break;
case avro::AVRO_ENUM:
if (target.isString())
{
std::vector<std::string> symbols;
for (size_t i = 0; i < root_node->names(); i++)
{
symbols.push_back(root_node->nameAt(i));
}
return [symbols](IColumn & column, avro::Decoder & decoder)
{
size_t enum_index = decoder.decodeEnum();
const auto & enum_symbol = symbols[enum_index];
column.insertData(enum_symbol.c_str(), enum_symbol.length());
};
}
if (target.isEnum())
{
const auto & enum_type = dynamic_cast<const IDataTypeEnum &>(*target_type);
std::vector<Field> symbol_mapping;
for (size_t i = 0; i < root_node->names(); i++)
{
symbol_mapping.push_back(enum_type.castToValue(root_node->nameAt(i)));
}
return [symbol_mapping](IColumn & column, avro::Decoder & decoder)
{
size_t enum_index = decoder.decodeEnum();
column.insert(symbol_mapping[enum_index]);
};
}
break;
case avro::AVRO_FIXED:
{
size_t fixed_size = root_node->fixedSize();
if (target.isFixedString() && target_type->getSizeOfValueInMemory() == fixed_size)
{
return [tmp_fixed = std::vector<uint8_t>(fixed_size)](IColumn & column, avro::Decoder & decoder) mutable
{
decoder.decodeFixed(tmp_fixed.size(), tmp_fixed);
column.insertData(reinterpret_cast<const char *>(tmp_fixed.data()), tmp_fixed.size());
};
}
break;
}
case avro::AVRO_MAP:
case avro::AVRO_RECORD:
default:
break;
}
throw Exception(
"Type " + target_type->getName() + " is not compatible" + " with Avro " + avro::ValidSchema(root_node).toJson(false),
ErrorCodes::ILLEGAL_COLUMN);
}
AvroDeserializer::SkipFn AvroDeserializer::createSkipFn(avro::NodePtr root_node)
{
switch (root_node->type())
{
case avro::AVRO_STRING:
return [](avro::Decoder & decoder) { decoder.skipString(); };
case avro::AVRO_BYTES:
return [](avro::Decoder & decoder) { decoder.skipBytes(); };
case avro::AVRO_INT:
return [](avro::Decoder & decoder) { decoder.decodeInt(); };
case avro::AVRO_LONG:
return [](avro::Decoder & decoder) { decoder.decodeLong(); };
case avro::AVRO_FLOAT:
return [](avro::Decoder & decoder) { decoder.decodeFloat(); };
case avro::AVRO_DOUBLE:
return [](avro::Decoder & decoder) { decoder.decodeDouble(); };
case avro::AVRO_BOOL:
return [](avro::Decoder & decoder) { decoder.decodeBool(); };
case avro::AVRO_ARRAY:
{
auto nested_skip_fn = createSkipFn(root_node->leafAt(0));
return [nested_skip_fn](avro::Decoder & decoder)
{
for (size_t n = decoder.arrayStart(); n != 0; n = decoder.arrayNext())
{
for (size_t i = 0; i < n; ++i)
{
nested_skip_fn(decoder);
}
}
};
}
case avro::AVRO_UNION:
{
std::vector<SkipFn> union_skip_fns;
for (size_t i = 0; i < root_node->leaves(); i++)
{
union_skip_fns.push_back(createSkipFn(root_node->leafAt(i)));
}
return [union_skip_fns](avro::Decoder & decoder) { union_skip_fns[decoder.decodeUnionIndex()](decoder); };
}
case avro::AVRO_NULL:
return [](avro::Decoder & decoder) { decoder.decodeNull(); };
case avro::AVRO_ENUM:
return [](avro::Decoder & decoder) { decoder.decodeEnum(); };
case avro::AVRO_FIXED:
{
auto fixed_size = root_node->fixedSize();
return [fixed_size](avro::Decoder & decoder) { decoder.skipFixed(fixed_size); };
}
case avro::AVRO_MAP:
{
auto value_skip_fn = createSkipFn(root_node->leafAt(1));
return [value_skip_fn](avro::Decoder & decoder)
{
for (size_t n = decoder.mapStart(); n != 0; n = decoder.mapNext())
{
for (size_t i = 0; i < n; ++i)
{
decoder.skipString();
value_skip_fn(decoder);
}
}
};
}
case avro::AVRO_RECORD:
{
std::vector<SkipFn> field_skip_fns;
for (size_t i = 0; i < root_node->leaves(); i++)
{
field_skip_fns.push_back(createSkipFn(root_node->leafAt(i)));
}
return [field_skip_fns](avro::Decoder & decoder)
{
for (auto & skip_fn : field_skip_fns)
skip_fn(decoder);
};
}
default:
throw Exception("Unsupported Avro type", ErrorCodes::ILLEGAL_COLUMN);
}
}
AvroDeserializer::AvroDeserializer(const DB::ColumnsWithTypeAndName & columns, avro::ValidSchema schema)
{
auto schema_root = schema.root();
if (schema_root->type() != avro::AVRO_RECORD)
{
throw Exception("Root schema must be a record", ErrorCodes::TYPE_MISMATCH);
}
field_mapping.resize(schema_root->leaves(), -1);
for (size_t i = 0; i < schema_root->leaves(); ++i)
{
skip_fns.push_back(createSkipFn(schema_root->leafAt(i)));
deserialize_fns.push_back(&deserializeNoop);
}
for (size_t i = 0; i < columns.size(); ++i)
{
const auto & column = columns[i];
size_t field_index;
if (!schema_root->nameIndex(column.name, field_index))
{
throw Exception("Field " + column.name + " not found in Avro schema", ErrorCodes::THERE_IS_NO_COLUMN);
}
auto field_schema = schema_root->leafAt(field_index);
try
{
deserialize_fns[field_index] = createDeserializeFn(field_schema, column.type);
}
catch (Exception & e)
{
e.addMessage("column " + column.name);
e.rethrow();
}
field_mapping[field_index] = i;
}
}
void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder & decoder)
{
for (size_t i = 0; i < field_mapping.size(); i++)
{
if (field_mapping[i] >= 0)
{
deserialize_fns[i](*columns[field_mapping[i]], decoder);
}
else
{
skip_fns[i](decoder);
}
}
}
AvroRowInputFormat::AvroRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
: IRowInputFormat(header_, in_, params_)
, file_reader(std::make_unique<InputStreamReadBufferAdapter>(in_))
, deserializer(header_.getColumnsWithTypeAndName(), file_reader.dataSchema())
{
file_reader.init();
}
bool AvroRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
if (file_reader.hasMore())
{
file_reader.decr();
deserializer.deserializeRow(columns, file_reader.decoder());
return true;
}
return false;
}
#ifdef USE_POCO_JSON
class AvroConfluentRowInputFormat::SchemaRegistry
{
public:
SchemaRegistry(const std::string & base_url_)
{
if (base_url_.empty())
{
throw Exception("Empty Schema Registry URL", ErrorCodes::BAD_ARGUMENTS);
}
try
{
base_url = base_url_;
}
catch (Poco::SyntaxException & e)
{
throw Exception("Invalid Schema Registry URL: " + e.displayText(), ErrorCodes::BAD_ARGUMENTS);
}
}
avro::ValidSchema getSchema(uint32_t id)
{
try
{
try
{
Poco::URI url(base_url, "/schemas/ids/" + std::to_string(id));
Poco::Net::HTTPClientSession session(url.getHost(), url.getPort());
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery());
session.sendRequest(request);
Poco::Net::HTTPResponse response;
auto & response_body = session.receiveResponse(response);
if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK)
{
throw Exception("http code " + std::to_string(response.getStatus()), ErrorCodes::INCORRECT_DATA);
}
Poco::JSON::Parser parser;
auto json_body = parser.parse(response_body).extract<Poco::JSON::Object::Ptr>();
auto schema = json_body->getValue<std::string>("schema");
return avro::compileJsonSchemaFromString(schema);
}
catch (const Exception & e)
{
throw e;
}
catch (const Poco::Exception & e)
{
throw Exception(Exception::CreateFromPoco, e);
}
catch (const avro::Exception & e)
{
throw Exception(e.what(), ErrorCodes::INCORRECT_DATA);
}
}
catch (Exception & e)
{
e.addMessage("while fetching schema id=" + std::to_string(id));
throw;
}
}
private:
Poco::URI base_url;
};
static uint32_t readConfluentSchemaId(ReadBuffer & in)
{
Poco::Buffer<char> buf(5);
in.readStrict(buf.begin(), buf.capacity());
Poco::MemoryBinaryReader binary_reader(buf, Poco::BinaryReader::BIG_ENDIAN_BYTE_ORDER);
uint8_t magic;
uint32_t schema_id;
binary_reader >> magic >> schema_id;
if (magic != 0x00)
{
throw Exception("Invalid magic byte", ErrorCodes::INCORRECT_DATA);
}
return schema_id;
}
AvroConfluentRowInputFormat::AvroConfluentRowInputFormat(
const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_.cloneEmpty(), in_, params_)
, header_columns(header_.getColumnsWithTypeAndName())
, schema_registry(std::make_unique<SchemaRegistry>(format_settings_.avro.schema_registry_url))
, input_stream(std::make_unique<InputStreamReadBufferAdapter>(in))
, decoder(avro::binaryDecoder())
{
(void)format_settings_;
decoder->init(*input_stream);
}
bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
if (in.eof())
{
return false;
}
SchemaId schema_id = readConfluentSchemaId(in);
auto & deserializer = getOrCreateDeserializer(schema_id);
deserializer.deserializeRow(columns, *decoder);
decoder->drain();
return true;
}
AvroDeserializer & AvroConfluentRowInputFormat::getOrCreateDeserializer(SchemaId schema_id)
{
auto it = deserializer_cache.find(schema_id);
if (it == deserializer_cache.end())
{
auto schema = schema_registry->getSchema(schema_id);
AvroDeserializer deserializer(header_columns, schema);
it = deserializer_cache.emplace(schema_id, deserializer).first;
}
return it->second;
}
#endif
void registerInputFormatProcessorAvro(FormatFactory & factory)
{
factory.registerInputFormatProcessor("Avro", [=](
ReadBuffer & buf,
const Block & sample,
const RowInputFormatParams & params,
const FormatSettings &)
{
return std::make_shared<AvroRowInputFormat>(sample, buf, params);
});
#ifdef USE_POCO_JSON
factory.registerInputFormatProcessor("AvroConfluent",[=](
ReadBuffer & buf,
const Block & sample,
const RowInputFormatParams & params,
const FormatSettings & settings)
{
return std::make_shared<AvroConfluentRowInputFormat>(sample, buf, params, settings);
});
#endif
}
}
#else
namespace DB
{
class FormatFactory;
void registerInputFormatProcessorAvro(FormatFactory &)
{
}
}
#endif

View File

@ -0,0 +1,73 @@
#pragma once
#include "config_formats.h"
#if USE_AVRO
#include <unordered_map>
#include <vector>
#include <Core/Block.h>
#include <Formats/FormatSchemaInfo.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <avro/DataFile.hh>
#include <avro/Decoder.hh>
#include <avro/Schema.hh>
#include <avro/ValidSchema.hh>
namespace DB
{
class AvroDeserializer
{
public:
AvroDeserializer(const DB::ColumnsWithTypeAndName & columns, avro::ValidSchema schema);
void deserializeRow(MutableColumns & columns, avro::Decoder & decoder);
private:
using DeserializeFn = std::function<void(IColumn & column, avro::Decoder & decoder)>;
using SkipFn = std::function<void(avro::Decoder & decoder)>;
static DeserializeFn createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type);
static SkipFn createSkipFn(avro::NodePtr root_node);
std::vector<int> field_mapping;
std::vector<SkipFn> skip_fns;
std::vector<DeserializeFn> deserialize_fns;
};
class AvroRowInputFormat : public IRowInputFormat
{
public:
AvroRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_);
virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
String getName() const override { return "AvroRowInputFormat"; }
private:
avro::DataFileReaderBase file_reader;
AvroDeserializer deserializer;
};
#ifdef USE_POCO_JSON
class AvroConfluentRowInputFormat : public IRowInputFormat
{
public:
AvroConfluentRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_);
virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
String getName() const override { return "AvroConfluentRowInputFormat"; }
private:
const DB::ColumnsWithTypeAndName header_columns;
class SchemaRegistry;
std::unique_ptr<SchemaRegistry> schema_registry;
using SchemaId = uint32_t;
std::unordered_map<SchemaId, AvroDeserializer> deserializer_cache;
AvroDeserializer & getOrCreateDeserializer(SchemaId schema_id);
avro::InputStreamPtr input_stream;
avro::DecoderPtr decoder;
};
#endif
}
#endif

View File

@ -0,0 +1,367 @@
#include "AvroRowOutputFormat.h"
#if USE_AVRO
#include <Core/Defines.h>
#include <Core/Field.h>
#include <IO/Operators.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <Formats/verbosePrintString.h>
#include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <avro/Compiler.hh>
#include <avro/DataFile.hh>
#include <avro/Decoder.hh>
#include <avro/Encoder.hh>
#include <avro/Generic.hh>
#include <avro/GenericDatum.hh>
#include <avro/Node.hh>
#include <avro/NodeConcepts.hh>
#include <avro/NodeImpl.hh>
#include <avro/Reader.hh>
#include <avro/Schema.hh>
#include <avro/Specific.hh>
#include <avro/ValidSchema.hh>
#include <avro/Writer.hh>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_TYPE_OF_FIELD;
extern const int BAD_ARGUMENTS;
extern const int THERE_IS_NO_COLUMN;
extern const int LOGICAL_ERROR;
extern const int INCORRECT_DATA;
extern const int CANNOT_READ_ALL_DATA;
}
class OutputStreamWriteBufferAdapter : public avro::OutputStream
{
public:
OutputStreamWriteBufferAdapter(WriteBuffer & out_) : out(out_) {}
virtual bool next(uint8_t ** data, size_t * len) override
{
out.nextIfAtEnd();
*data = reinterpret_cast<uint8_t *>(out.position());
*len = out.available();
out.position() += out.available();
return true;
}
virtual void backup(size_t len) override { out.position() -= len; }
virtual uint64_t byteCount() const override { return out.count(); }
virtual void flush() override { out.next(); }
private:
WriteBuffer & out;
};
AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeFn(DataTypePtr data_type)
{
switch (data_type->getTypeId())
{
case TypeIndex::UInt8:
return {avro::BoolSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
encoder.encodeBool(assert_cast<const ColumnUInt8 &>(column).getElement(row_num));
}};
case TypeIndex::Int32:
return {avro::IntSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
encoder.encodeInt(assert_cast<const ColumnInt32 &>(column).getElement(row_num));
}};
case TypeIndex::Int64:
return {avro::LongSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
encoder.encodeLong(assert_cast<const ColumnInt64 &>(column).getElement(row_num));
}};
case TypeIndex::Float32:
return {avro::FloatSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
encoder.encodeFloat(assert_cast<const ColumnFloat32 &>(column).getElement(row_num));
}};
case TypeIndex::Float64:
return {avro::DoubleSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
encoder.encodeDouble(assert_cast<const ColumnFloat64 &>(column).getElement(row_num));
}};
case TypeIndex::Date:
{
auto schema = avro::IntSchema();
schema.root()->setLogicalType(avro::LogicalType(avro::LogicalType::DATE));
return {schema, [](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
UInt16 date = assert_cast<const DataTypeDate::ColumnType &>(column).getElement(row_num);
encoder.encodeInt(date);
}};
}
case TypeIndex::DateTime64:
{
auto schema = avro::LongSchema();
const auto & provided_type = assert_cast<const DataTypeDateTime64 &>(*data_type);
if (provided_type.getScale() == 3)
schema.root()->setLogicalType(avro::LogicalType(avro::LogicalType::TIMESTAMP_MILLIS));
else if (provided_type.getScale() == 6)
schema.root()->setLogicalType(avro::LogicalType(avro::LogicalType::TIMESTAMP_MICROS));
else
break;
return {schema, [](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
const auto & col = assert_cast<const DataTypeDateTime64::ColumnType &>(column);
encoder.encodeLong(col.getElement(row_num));
}};
}
case TypeIndex::String:
return {avro::StringSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
const StringRef & s = assert_cast<const ColumnString &>(column).getDataAt(row_num);
encoder.encodeBytes(reinterpret_cast<const uint8_t *>(s.data), s.size);
}};
case TypeIndex::FixedString:
{
auto schema = avro::FixedSchema(data_type->getSizeOfValueInMemory(), "fixed");
return {schema, [](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
const StringRef & s = assert_cast<const ColumnFixedString &>(column).getDataAt(row_num);
encoder.encodeFixed(reinterpret_cast<const uint8_t *>(s.data), s.size);
}};
}
case TypeIndex::Enum8:
{
auto schema = avro::EnumSchema("enum8");
std::unordered_map<DataTypeEnum8::FieldType, size_t> enum_mapping;
const auto & enum_values = assert_cast<const DataTypeEnum8 &>(*data_type).getValues();
for (size_t i = 0; i < enum_values.size(); ++i)
{
schema.addSymbol(enum_values[i].first);
enum_mapping.emplace(enum_values[i].second, i);
}
return {schema, [enum_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
auto enum_value = assert_cast<const DataTypeEnum8::ColumnType &>(column).getElement(row_num);
encoder.encodeEnum(enum_mapping.at(enum_value));
}};
}
case TypeIndex::Enum16:
{
auto schema = avro::EnumSchema("enum16");
std::unordered_map<DataTypeEnum16::FieldType, size_t> enum_mapping;
const auto & enum_values = assert_cast<const DataTypeEnum16 &>(*data_type).getValues();
for (size_t i = 0; i < enum_values.size(); ++i)
{
schema.addSymbol(enum_values[i].first);
enum_mapping.emplace(enum_values[i].second, i);
}
return {schema, [enum_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
auto enum_value = assert_cast<const DataTypeEnum16::ColumnType &>(column).getElement(row_num);
encoder.encodeEnum(enum_mapping.at(enum_value));
}};
}
case TypeIndex::Array:
{
const auto & array_type = assert_cast<const DataTypeArray &>(*data_type);
auto nested_mapping = createSchemaWithSerializeFn(array_type.getNestedType());
auto schema = avro::ArraySchema(nested_mapping.schema);
return {schema, [nested_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
const ColumnArray & column_array = assert_cast<const ColumnArray &>(column);
const ColumnArray::Offsets & offsets = column_array.getOffsets();
size_t offset = offsets[row_num - 1];
size_t next_offset = offsets[row_num];
size_t row_count = next_offset - offset;
const IColumn & nested_column = column_array.getData();
encoder.arrayStart();
if (row_count > 0)
{
encoder.setItemCount(row_count);
}
for (size_t i = offset; i < next_offset; ++i)
{
nested_mapping.serialize(nested_column, i, encoder);
}
encoder.arrayEnd();
}};
}
case TypeIndex::Nullable:
{
auto nested_type = removeNullable(data_type);
auto nested_mapping = createSchemaWithSerializeFn(nested_type);
if (nested_type->getTypeId() == TypeIndex::Nothing)
{
return nested_mapping;
}
else
{
avro::UnionSchema union_schema;
union_schema.addType(avro::NullSchema());
union_schema.addType(nested_mapping.schema);
return {union_schema, [nested_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
const ColumnNullable & col = assert_cast<const ColumnNullable &>(column);
if (!col.isNullAt(row_num))
{
encoder.encodeUnionIndex(1);
nested_mapping.serialize(col.getNestedColumn(), row_num, encoder);
}
else
{
encoder.encodeUnionIndex(0);
encoder.encodeNull();
}
}};
}
}
case TypeIndex::LowCardinality:
{
const auto & nested_type = removeLowCardinality(data_type);
auto nested_mapping = createSchemaWithSerializeFn(nested_type);
return {nested_mapping.schema, [nested_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder)
{
const auto & col = assert_cast<const ColumnLowCardinality &>(column);
nested_mapping.serialize(*col.getDictionary().getNestedColumn(), col.getIndexAt(row_num), encoder);
}};
}
case TypeIndex::Nothing:
return {avro::NullSchema(), [](const IColumn &, size_t, avro::Encoder & encoder) { encoder.encodeNull(); }};
default:
break;
}
throw Exception("Type " + data_type->getName() + " is not supported for Avro output", ErrorCodes::ILLEGAL_COLUMN);
}
AvroSerializer::AvroSerializer(const ColumnsWithTypeAndName & columns)
{
avro::RecordSchema record_schema("row");
for (auto & column : columns)
{
try
{
auto field_mapping = createSchemaWithSerializeFn(column.type);
serialize_fns.push_back(field_mapping.serialize);
//TODO: verify name starts with A-Za-z_
record_schema.addField(column.name, field_mapping.schema);
}
catch (Exception & e)
{
e.addMessage("column " + column.name);
e.rethrow();
}
}
schema.setSchema(record_schema);
}
void AvroSerializer::serializeRow(const Columns & columns, size_t row_num, avro::Encoder & encoder)
{
size_t num_columns = columns.size();
for (size_t i = 0; i < num_columns; ++i)
{
serialize_fns[i](*columns[i], row_num, encoder);
}
}
static avro::Codec getCodec(const std::string& codec_name)
{
if (codec_name == "")
{
#ifdef SNAPPY_CODEC_AVAILABLE
return avro::Codec::SNAPPY_CODEC;
#else
return avro::Codec::DEFLATE_CODEC;
#endif
}
if (codec_name == "null") return avro::Codec::NULL_CODEC;
if (codec_name == "deflate") return avro::Codec::DEFLATE_CODEC;
#ifdef SNAPPY_CODEC_AVAILABLE
if (codec_name == "snappy") return avro::Codec::SNAPPY_CODEC;
#endif
throw Exception("Avro codec " + codec_name + " is not available", ErrorCodes::BAD_ARGUMENTS);
}
AvroRowOutputFormat::AvroRowOutputFormat(
WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_, callback)
, settings(settings_)
, serializer(header_.getColumnsWithTypeAndName())
, file_writer(
std::make_unique<OutputStreamWriteBufferAdapter>(out_),
serializer.getSchema(),
settings.avro.output_sync_interval,
getCodec(settings.avro.output_codec))
{
}
AvroRowOutputFormat::~AvroRowOutputFormat() = default;
void AvroRowOutputFormat::writePrefix()
{
file_writer.syncIfNeeded();
}
void AvroRowOutputFormat::write(const Columns & columns, size_t row_num)
{
file_writer.syncIfNeeded();
serializer.serializeRow(columns, row_num, file_writer.encoder());
file_writer.incr();
}
void AvroRowOutputFormat::writeSuffix()
{
file_writer.close();
}
void registerOutputFormatProcessorAvro(FormatFactory & factory)
{
factory.registerOutputFormatProcessor("Avro",[=](
WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback callback,
const FormatSettings & settings)
{
return std::make_shared<AvroRowOutputFormat>(buf, sample, callback, settings);
});
}
}
#else
namespace DB
{
class FormatFactory;
void registerOutputFormatProcessorAvro(FormatFactory &)
{
}
}
#endif

View File

@ -0,0 +1,60 @@
#pragma once
#include "config_formats.h"
#if USE_AVRO
#include <unordered_map>
#include <Core/Block.h>
#include <Formats/FormatSchemaInfo.h>
#include <Formats/FormatSettings.h>
#include <IO/WriteBuffer.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <avro/DataFile.hh>
#include <avro/Schema.hh>
#include <avro/ValidSchema.hh>
namespace DB
{
class WriteBuffer;
class AvroSerializer
{
public:
AvroSerializer(const ColumnsWithTypeAndName & columns);
const avro::ValidSchema & getSchema() const { return schema; }
void serializeRow(const Columns & columns, size_t row_num, avro::Encoder & encoder);
private:
using SerializeFn = std::function<void(const IColumn & column, size_t row_num, avro::Encoder & encoder)>;
struct SchemaWithSerializeFn
{
avro::Schema schema;
SerializeFn serialize;
};
static SchemaWithSerializeFn createSchemaWithSerializeFn(DataTypePtr data_type);
std::vector<SerializeFn> serialize_fns;
avro::ValidSchema schema;
};
class AvroRowOutputFormat : public IRowOutputFormat
{
public:
AvroRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_);
virtual ~AvroRowOutputFormat() override;
String getName() const override { return "AvroRowOutputFormat"; }
void write(const Columns & columns, size_t row_num) override;
void writeField(const IColumn &, const IDataType &, size_t) override {}
virtual void writePrefix() override;
virtual void writeSuffix() override;
private:
FormatSettings settings;
AvroSerializer serializer;
avro::DataFileWriterBase file_writer;
};
}
#endif

View File

@ -0,0 +1,36 @@
=== input
= primitive
1,1,2,3.4,5.6,"b1","s1"
0,-1,9223372036854775807,3.00004,0.00001,"",""
1,2,"s1"
0,9223372036854775807,""
"s1",2,1
"",9223372036854775807,0
"s1"
""
= complex
"A","t","['s1','s2']","[['a1'],['a2']]","s1",\N,"79cd909892d7e7ade1987cc7422628ba"
"C","f","[]","[]",\N,123,"79cd909892d7e7ade1987cc7422628ba"
"79cd909892d7e7ade1987cc7422628ba"
"79cd909892d7e7ade1987cc7422628ba"
= logical_types
"2019-12-20","2020-01-10 07:31:56.227","2020-01-10 07:31:56.227000"
18250,1578641516227,1578641516227000
= compression
1000
1000
= other
0
not compatible
not found
=== output
= primitive
1,1,2,3.4,5.6,"b1","s1"
= complex
"A","t","['s1','s2']","[['a1'],['a2']]","s1",\N,"79cd909892d7e7ade1987cc7422628ba"
= logical_types
"2019-12-20","2020-01-10 07:31:56.227","2020-01-10 07:31:56.227000"
= other
0
1000
not supported

View File

@ -0,0 +1,70 @@
#!/usr/bin/env bash
set -e
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CUR_DIR/../shell_config.sh
DATA_DIR=$CUR_DIR/data_avro
# input
echo === input
echo = primitive
cat $DATA_DIR/primitive.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a_bool UInt8, b_int Int32, c_long Int64, d_float Float32, e_double Float64, f_bytes String, g_string String' -q 'select * from table'
cat $DATA_DIR/primitive.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a_bool UInt8, c_long Int64, g_string String' -q 'select * from table'
cat $DATA_DIR/primitive.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'g_string String, c_long Int64, a_bool UInt8' -q 'select * from table'
cat $DATA_DIR/primitive.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'g_string String' -q 'select * from table'
echo = complex
cat $DATA_DIR/complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "a_enum_to_string String, b_enum_to_enum Enum('t' = 1, 'f' = 0), c_array_string Array(String), d_array_array_string Array(Array(String)), e_union_null_string Nullable(String), f_union_long_null Nullable(Int64), g_fixed FixedString(32)" -q 'select * from table'
cat $DATA_DIR/complex.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "g_fixed FixedString(32)" -q 'select * from table'
echo = logical_types
cat $DATA_DIR/logical_types.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "a_date Date, b_timestamp_millis DateTime64(3, 'UTC'), c_timestamp_micros DateTime64(6, 'UTC')" -q 'select * from table'
cat $DATA_DIR/logical_types.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a_date Int32, b_timestamp_millis Int64, c_timestamp_micros Int64' -q 'select * from table'
echo = compression
cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'
cat $DATA_DIR/simple.deflate.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'
#snappy is optional
#cat $DATA_DIR/simple.snappy.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'
echo = other
#no data
cat $DATA_DIR/empty.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int64' -q 'select count() from table'
# type mismatch
cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'a Int32' -q 'select count() from table' 2>&1 | grep -i 'not compatible' -o
# field not found
cat $DATA_DIR/simple.null.avro | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S 'b Int64' -q 'select count() from table' 2>&1 | grep -i 'not found' -o
# output
echo === output
echo = primitive
S1="a_bool UInt8, b_int Int32, c_long Int64, d_float Float32, e_double Float64, f_bytes String, g_string String"
echo '1,1,2,3.4,5.6,"b1","s1"' | ${CLICKHOUSE_LOCAL} --input-format CSV -S "$S1" -q "select * from table format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S1" -q 'select * from table'
echo = complex
S2="a_enum_to_string String, b_enum_to_enum Enum('t' = 1, 'f' = 0), c_array_string Array(String), d_array_array_string Array(Array(String)), e_union_null_string Nullable(String), f_union_long_null Nullable(Int64), g_fixed FixedString(32)"
echo "\"A\",\"t\",\"['s1','s2']\",\"[['a1'],['a2']]\",\"s1\",\N,\"79cd909892d7e7ade1987cc7422628ba\"" | ${CLICKHOUSE_LOCAL} --input-format CSV -S "$S2" -q "select * from table format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S2" -q 'select * from table'
echo = logical_types
S3="a_date Date, b_timestamp_millis DateTime64(3, 'UTC'), c_timestamp_micros DateTime64(6, 'UTC')"
echo '"2019-12-20","2020-01-10 07:31:56.227","2020-01-10 07:31:56.227000"' | ${CLICKHOUSE_LOCAL} --input-format CSV -S "$S3" -q "select * from table format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S3" -q 'select * from table'
echo = other
S4="a Int64"
${CLICKHOUSE_LOCAL} -q "select toInt64(number) as a from numbers(0) format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S4" -q 'select count() from table'
${CLICKHOUSE_LOCAL} -q "select toInt64(number) as a from numbers(1000) format Avro" | ${CLICKHOUSE_LOCAL} --input-format Avro --output-format CSV -S "$S4" -q 'select count() from table'
# type not supported
${CLICKHOUSE_LOCAL} -q "select toInt16(123) as a format Avro" 2>&1 | grep -i 'not supported' -o

Binary file not shown.

View File

@ -0,0 +1,20 @@
{
"type": "record",
"name": "row",
"fields": [
{"name": "a_enum_to_string", "type": { "type": "enum", "name": "enum_1", "symbols" : ["A", "B", "C"]}},
{"name": "b_enum_to_enum", "type": { "type": "enum", "name": "enum_2", "symbols" : ["t", "f"]}},
{"name": "c_array_string", "type": { "type": "array", "items": "string"}},
{"name": "d_array_array_string", "type": { "type": "array", "items": {"type": "array", "items": "string"}}},
{"name": "e_union_null_string", "type": ["null", "string"]},
{"name": "f_union_long_null", "type": ["long", "null"]},
{"name": "g_fixed", "type": {"type":"fixed", "size": 32, "name": "fixed_1"}},
{"name": "h_record_skip", "type": {
"type": "record",
"name": "subrecord",
"fields": [
{"name": "a", "type": "string"}
]
}}
]
}

View File

@ -0,0 +1,2 @@
{"a_enum_to_string":"A","b_enum_to_enum":"t","c_array_string":["s1", "s2"],"d_array_array_string":[["a1"], ["a2"]],"e_union_null_string":{"string": "s1"},"f_union_long_null":null,"g_fixed":"79cd909892d7e7ade1987cc7422628ba","h_record_skip":{"a": "a"}}
{"a_enum_to_string":"C","b_enum_to_enum":"f","c_array_string":[],"d_array_array_string":[],"e_union_null_string":null,"f_union_long_null":{"long": 123},"g_fixed":"79cd909892d7e7ade1987cc7422628ba","h_record_skip":{"a": "a"}}

Binary file not shown.

View File

@ -0,0 +1,7 @@
{
"type": "record",
"name": "row",
"fields": [
{"name": "a", "type": "long"}
]
}

View File

@ -0,0 +1,14 @@
#!/usr/bin/env bash
#avro tools: https://www.apache.org/dyn/closer.cgi?path=avro/avro-1.9.1/java/avro-tools-1.9.1.jar
avro-tools fromjson --schema-file primitive.avsc primitive.json > primitive.avro
avro-tools fromjson --schema-file complex.avsc complex.json > complex.avro
avro-tools fromjson --schema-file logical_types.avsc logical_types.json > logical_types.avro
avro-tools fromjson --schema-file empty.avsc empty.json > empty.avro
#compression
avro-tools fromjson --codec null --schema-file simple.avsc simple.json > simple.null.avro
avro-tools fromjson --codec deflate --schema-file simple.avsc simple.json > simple.deflate.avro
avro-tools fromjson --codec snappy --schema-file simple.avsc simple.json > simple.snappy.avro

View File

@ -0,0 +1,9 @@
{
"type": "record",
"name": "row",
"fields": [
{"name": "a_date", "type": { "type": "int", "logicalType": "date"}},
{"name": "b_timestamp_millis", "type": { "type": "long", "logicalType": "timestamp-millis"}},
{"name": "c_timestamp_micros", "type": { "type": "long", "logicalType": "timestamp-micros"}}
]
}

View File

@ -0,0 +1 @@
{"a_date":18250,"b_timestamp_millis":1578641516227,"c_timestamp_micros":1578641516227000}

View File

@ -0,0 +1,14 @@
{
"type": "record",
"name": "row",
"fields": [
{"name": "a_bool", "type": "boolean"},
{"name": "b_int", "type": "int"},
{"name": "c_long", "type": "long"},
{"name": "d_float", "type": "float"},
{"name": "e_double", "type": "double"},
{"name": "f_bytes", "type": "bytes"},
{"name": "g_string", "type": "string"},
{"name": "h_null", "type": "null"}
]
}

View File

@ -0,0 +1,2 @@
{"a_bool":true,"b_int":1,"c_long":2,"d_float":3.4,"e_double":5.6,"f_bytes":"b1","g_string":"s1","h_null": null}
{"a_bool":false,"b_int":-1,"c_long":9223372036854775807,"d_float":3.00004,"e_double":0.00001,"f_bytes":"","g_string":"","h_null": null}

View File

@ -0,0 +1,7 @@
{
"type": "record",
"name": "row",
"fields": [
{"name": "a", "type": "long"}
]
}

File diff suppressed because it is too large Load Diff