Add Avro formats

Add Avro file input/output formats
Add AvroConfluent input format (for Kafka)
This commit is contained in:
Andrew Onyshchuk 2020-01-08 03:13:12 -06:00
parent 8140b2f75a
commit 7320447f92
20 changed files with 1246 additions and 3 deletions

6
.gitmodules vendored
View File

@ -46,7 +46,7 @@
url = https://github.com/ClickHouse-Extras/protobuf.git url = https://github.com/ClickHouse-Extras/protobuf.git
[submodule "contrib/boost"] [submodule "contrib/boost"]
path = contrib/boost path = contrib/boost
url = https://github.com/ClickHouse-Extras/boost.git url = https://github.com/oandrew/clickhouse-boost
[submodule "contrib/base64"] [submodule "contrib/base64"]
path = contrib/base64 path = contrib/base64
url = https://github.com/aklomp/base64.git url = https://github.com/aklomp/base64.git
@ -137,3 +137,7 @@
[submodule "contrib/ryu"] [submodule "contrib/ryu"]
path = contrib/ryu path = contrib/ryu
url = https://github.com/ClickHouse-Extras/ryu.git url = https://github.com/ClickHouse-Extras/ryu.git
[submodule "contrib/avro"]
path = contrib/avro
url = https://github.com/apache/avro.git
ignore = untracked

View File

@ -351,6 +351,7 @@ include (cmake/find/simdjson.cmake)
include (cmake/find/rapidjson.cmake) include (cmake/find/rapidjson.cmake)
include (cmake/find/fastops.cmake) include (cmake/find/fastops.cmake)
include (cmake/find/orc.cmake) include (cmake/find/orc.cmake)
include (cmake/find/avro.cmake)
find_contrib_lib(cityhash) find_contrib_lib(cityhash)
find_contrib_lib(farmhash) find_contrib_lib(farmhash)

43
cmake/find/avro.cmake Normal file
View File

@ -0,0 +1,43 @@
option (ENABLE_AVRO "Enable Avro" ${ENABLE_LIBRARIES})
if (ENABLE_AVRO)
option (USE_INTERNAL_AVRO_LIBRARY "Set to FALSE to use system avro library instead of bundled" ${NOT_UNBUNDLED})
if(NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/avro/lang/c++/CMakeLists.txt")
if(USE_INTERNAL_AVRO_LIBRARY)
message(WARNING "submodule contrib/avro is missing. to fix try run: \n git submodule update --init --recursive")
endif()
set(MISSING_INTERNAL_AVRO_LIBRARY 1)
set(USE_INTERNAL_AVRO_LIBRARY 0)
endif()
if (NOT USE_INTERNAL_AVRO_LIBRARY)
find_package(Snappy REQUIRED)
find_library(AVROCPP avrocpp)
elseif(NOT MISSING_INTERNAL_AVRO_LIBRARY)
include(cmake/find/snappy.cmake)
add_subdirectory(contrib/avro-cmake)
set(AVROCPP_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/avro/lang/c++/include")
set(AVROCPP_LIBRARY avrocpp_s)
endif ()
if (AVROCPP_LIBRARY AND AVROCPP_INCLUDE_DIR)
set(USE_AVRO 1)
endif()
# if (AVROCPP_LIBRARY AND AVROCPP_INCLUDE_DIR)
# set(USE_AVROCPP 1)
# elseif (Boost_INCLUDE_DIRS AND SNAPPY_LIBRARY)
# set(AVROCPP_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/avro/lang/c++/include")
# set(AVROCPP_LIBRARY avrocpp_s)
# set(USE_AVROCPP 1)
# else()
# set(USE_INTERNAL_AVROCPP_LIBRARY 0)
# message(STATUS "avro deps: ${Boost_INCLUDE_DIRS}; ${SNAPPY_LIBRARY}; ${ZLIB_LIBRARY}")
# endif()
endif()
message (STATUS "Using avro=${USE_AVRO}: ${AVROCPP_LIBRARY} ${AVROCPP_INCLUDE_DIR}")

View File

@ -31,6 +31,7 @@ if (NOT Boost_SYSTEM_LIBRARY AND NOT MISSING_INTERNAL_BOOST_LIBRARY)
set (Boost_SYSTEM_LIBRARY boost_system_internal) set (Boost_SYSTEM_LIBRARY boost_system_internal)
set (Boost_PROGRAM_OPTIONS_LIBRARY boost_program_options_internal) set (Boost_PROGRAM_OPTIONS_LIBRARY boost_program_options_internal)
set (Boost_FILESYSTEM_LIBRARY boost_filesystem_internal ${Boost_SYSTEM_LIBRARY}) set (Boost_FILESYSTEM_LIBRARY boost_filesystem_internal ${Boost_SYSTEM_LIBRARY})
set (Boost_IOSTREAMS_LIBRARY boost_iostreams_internal)
set (Boost_REGEX_LIBRARY boost_regex_internal) set (Boost_REGEX_LIBRARY boost_regex_internal)
set (Boost_INCLUDE_DIRS) set (Boost_INCLUDE_DIRS)
@ -48,4 +49,4 @@ if (NOT Boost_SYSTEM_LIBRARY AND NOT MISSING_INTERNAL_BOOST_LIBRARY)
list (APPEND Boost_INCLUDE_DIRS "${ClickHouse_SOURCE_DIR}/contrib/boost") list (APPEND Boost_INCLUDE_DIRS "${ClickHouse_SOURCE_DIR}/contrib/boost")
endif () endif ()
message (STATUS "Using Boost: ${Boost_INCLUDE_DIRS} : ${Boost_PROGRAM_OPTIONS_LIBRARY},${Boost_SYSTEM_LIBRARY},${Boost_FILESYSTEM_LIBRARY},${Boost_REGEX_LIBRARY}") message (STATUS "Using Boost: ${Boost_INCLUDE_DIRS} : ${Boost_PROGRAM_OPTIONS_LIBRARY},${Boost_SYSTEM_LIBRARY},${Boost_FILESYSTEM_LIBRARY},${Boost_IOSTREAMS_LIBRARY},${Boost_REGEX_LIBRARY}")

View File

@ -51,9 +51,11 @@ if (SANITIZE)
set (ENABLE_READLINE 0 CACHE BOOL "") set (ENABLE_READLINE 0 CACHE BOOL "")
set (ENABLE_ORC 0 CACHE BOOL "") set (ENABLE_ORC 0 CACHE BOOL "")
set (ENABLE_PARQUET 0 CACHE BOOL "") set (ENABLE_PARQUET 0 CACHE BOOL "")
set (ENABLE_AVRO 0 CACHE BOOL "")
set (USE_CAPNP 0 CACHE BOOL "") set (USE_CAPNP 0 CACHE BOOL "")
set (USE_INTERNAL_ORC_LIBRARY 0 CACHE BOOL "") set (USE_INTERNAL_ORC_LIBRARY 0 CACHE BOOL "")
set (USE_ORC 0 CACHE BOOL "") set (USE_ORC 0 CACHE BOOL "")
set (USE_AVRO 0 CACHE BOOL "")
set (ENABLE_SSL 0 CACHE BOOL "") set (ENABLE_SSL 0 CACHE BOOL "")
elseif (SANITIZE STREQUAL "thread") elseif (SANITIZE STREQUAL "thread")

View File

@ -212,6 +212,22 @@ else()
endif() endif()
endif() endif()
if (USE_INTERNAL_AVRO_LIBRARY)
if(USE_INTERNAL_SNAPPY_LIBRARY)
set(SNAPPY_BUILD_TESTS 0 CACHE INTERNAL "")
if (NOT MAKE_STATIC_LIBRARIES)
set(BUILD_SHARED_LIBS 1) # TODO: set at root dir
endif()
add_subdirectory(snappy)
set (SNAPPY_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/snappy")
if(SANITIZE STREQUAL "undefined")
target_compile_options(${SNAPPY_LIBRARY} PRIVATE -fno-sanitize=undefined)
endif()
endif()
endif()
if (USE_INTERNAL_POCO_LIBRARY) if (USE_INTERNAL_POCO_LIBRARY)
set (POCO_VERBOSE_MESSAGES 0 CACHE INTERNAL "") set (POCO_VERBOSE_MESSAGES 0 CACHE INTERNAL "")
set (save_CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS}) set (save_CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})

1
contrib/avro vendored Submodule

@ -0,0 +1 @@
Subproject commit 89218262cde62e98fcb3778b86cd3f03056c54f3

View File

@ -0,0 +1,76 @@
# project and source dir
set(AVROCPP_ROOT_DIR ${CMAKE_SOURCE_DIR}/contrib/avro/lang/c++)
set(AVROCPP_INCLUDE_DIR ${AVROCPP_ROOT_DIR}/api)
set(AVROCPP_SOURCE_DIR ${AVROCPP_ROOT_DIR}/impl)
#set(AVROCPP_COMMON_DIR ${HDFS3_SOURCE_DIR}/common)
if (EXISTS ${AVROCPP_ROOT_DIR}/../../share/VERSION.txt)
file(READ "${AVROCPP_ROOT_DIR}/../../share/VERSION.txt"
AVRO_VERSION)
endif()
string(REPLACE "\n" "" AVRO_VERSION ${AVRO_VERSION})
set (AVRO_VERSION_MAJOR ${AVRO_VERSION})
set (AVRO_VERSION_MINOR "0")
set (AVROCPP_SOURCE_FILES
${AVROCPP_SOURCE_DIR}/Compiler.cc
${AVROCPP_SOURCE_DIR}/Node.cc
${AVROCPP_SOURCE_DIR}/LogicalType.cc
${AVROCPP_SOURCE_DIR}/NodeImpl.cc
${AVROCPP_SOURCE_DIR}/ResolverSchema.cc
${AVROCPP_SOURCE_DIR}/Schema.cc
${AVROCPP_SOURCE_DIR}/Types.cc
${AVROCPP_SOURCE_DIR}/ValidSchema.cc
${AVROCPP_SOURCE_DIR}/Zigzag.cc
${AVROCPP_SOURCE_DIR}/BinaryEncoder.cc
${AVROCPP_SOURCE_DIR}/BinaryDecoder.cc
${AVROCPP_SOURCE_DIR}/Stream.cc
${AVROCPP_SOURCE_DIR}/FileStream.cc
${AVROCPP_SOURCE_DIR}/Generic.cc
${AVROCPP_SOURCE_DIR}/GenericDatum.cc
${AVROCPP_SOURCE_DIR}/DataFile.cc
${AVROCPP_SOURCE_DIR}/parsing/Symbol.cc
${AVROCPP_SOURCE_DIR}/parsing/ValidatingCodec.cc
${AVROCPP_SOURCE_DIR}/parsing/JsonCodec.cc
${AVROCPP_SOURCE_DIR}/parsing/ResolvingDecoder.cc
${AVROCPP_SOURCE_DIR}/json/JsonIO.cc
${AVROCPP_SOURCE_DIR}/json/JsonDom.cc
${AVROCPP_SOURCE_DIR}/Resolver.cc
${AVROCPP_SOURCE_DIR}/Validator.cc
)
add_definitions(-std=c++17 -fPIC)
add_library (avrocpp SHARED ${AVROCPP_SOURCE_FILES})
set_property (TARGET avrocpp
APPEND PROPERTY COMPILE_DEFINITIONS AVRO_DYN_LINK)
add_library (avrocpp_s STATIC ${AVROCPP_SOURCE_FILES})
set_property (TARGET avrocpp avrocpp_s
APPEND PROPERTY COMPILE_DEFINITIONS AVRO_SOURCE)
set_target_properties (avrocpp PROPERTIES
VERSION ${AVRO_VERSION_MAJOR}.${AVRO_VERSION_MINOR})
set_target_properties (avrocpp_s PROPERTIES
VERSION ${AVRO_VERSION_MAJOR}.${AVRO_VERSION_MINOR})
target_link_libraries (avrocpp ${Boost_IOSTREAMS_LIBRARY} ${SNAPPY_LIBRARY})
target_link_libraries (avrocpp_s ${Boost_IOSTREAMS_LIBRARY} ${SNAPPY_LIBRARY})
target_compile_definitions (avrocpp PUBLIC SNAPPY_CODEC_AVAILABLE)
target_compile_definitions (avrocpp_s PUBLIC SNAPPY_CODEC_AVAILABLE)
include_directories(${AVROCPP_INCLUDE_DIR})
include_directories(${Boost_INCLUDE_DIRS})
include_directories(${SNAPPY_INCLUDE_DIR})
ADD_CUSTOM_TARGET(symlink_headers ALL
COMMAND ${CMAKE_COMMAND} -E make_directory ${AVROCPP_ROOT_DIR}/include
COMMAND ${CMAKE_COMMAND} -E create_symlink ${AVROCPP_ROOT_DIR}/api ${AVROCPP_ROOT_DIR}/include/avro
)

2
contrib/boost vendored

@ -1 +1 @@
Subproject commit 830e51edb59c4f37a8638138581e1e56c29ac44f Subproject commit a2cfeb63eaf3b32cf233105b1a40f4a5f26b8495

View File

@ -37,3 +37,8 @@ target_link_libraries(boost_filesystem_internal PRIVATE boost_system_internal)
if (USE_INTERNAL_PARQUET_LIBRARY) if (USE_INTERNAL_PARQUET_LIBRARY)
add_boost_lib(regex) add_boost_lib(regex)
endif() endif()
if (USE_INTERNAL_AVRO_LIBRARY)
add_boost_lib(iostreams)
target_link_libraries(boost_iostreams_internal PUBLIC ${ZLIB_LIBRARIES})
endif()

View File

@ -483,6 +483,11 @@ if (USE_PARQUET)
endif () endif ()
endif () endif ()
if (USE_AVRO)
dbms_target_link_libraries(PRIVATE ${AVROCPP_LIBRARY})
dbms_target_include_directories (SYSTEM BEFORE PRIVATE ${AVROCPP_INCLUDE_DIR})
endif ()
if (OPENSSL_CRYPTO_LIBRARY) if (OPENSSL_CRYPTO_LIBRARY)
dbms_target_link_libraries (PRIVATE ${OPENSSL_CRYPTO_LIBRARY}) dbms_target_link_libraries (PRIVATE ${OPENSSL_CRYPTO_LIBRARY})
target_link_libraries (clickhouse_common_io PRIVATE ${OPENSSL_CRYPTO_LIBRARY}) target_link_libraries (clickhouse_common_io PRIVATE ${OPENSSL_CRYPTO_LIBRARY})

View File

@ -186,6 +186,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, input_format_values_interpret_expressions, true, "For Values format: if field could not be parsed by streaming parser, run SQL parser and try to interpret it as SQL expression.", 0) \ M(SettingBool, input_format_values_interpret_expressions, true, "For Values format: if field could not be parsed by streaming parser, run SQL parser and try to interpret it as SQL expression.", 0) \
M(SettingBool, input_format_values_deduce_templates_of_expressions, true, "For Values format: if field could not be parsed by streaming parser, run SQL parser, deduce template of the SQL expression, try to parse all rows using template and then interpret expression for all rows.", 0) \ M(SettingBool, input_format_values_deduce_templates_of_expressions, true, "For Values format: if field could not be parsed by streaming parser, run SQL parser, deduce template of the SQL expression, try to parse all rows using template and then interpret expression for all rows.", 0) \
M(SettingBool, input_format_values_accurate_types_of_literals, true, "For Values format: when parsing and interpreting expressions using template, check actual type of literal to avoid possible overflow and precision issues.", 0) \ M(SettingBool, input_format_values_accurate_types_of_literals, true, "For Values format: when parsing and interpreting expressions using template, check actual type of literal to avoid possible overflow and precision issues.", 0) \
M(SettingString, input_format_avro_schema_registry_url, "", "For AvroConfluent format: Confluent Schema Registry URL.", 0) \
\ \
M(SettingBool, output_format_json_quote_64bit_integers, true, "Controls quoting of 64-bit integers in JSON output format.", 0) \ M(SettingBool, output_format_json_quote_64bit_integers, true, "Controls quoting of 64-bit integers in JSON output format.", 0) \
\ \

View File

@ -68,6 +68,7 @@ static FormatSettings getInputFormatSetting(const Settings & settings, const Con
format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter; format_settings.custom.row_before_delimiter = settings.format_custom_row_before_delimiter;
format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter; format_settings.custom.row_after_delimiter = settings.format_custom_row_after_delimiter;
format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter; format_settings.custom.row_between_delimiter = settings.format_custom_row_between_delimiter;
format_settings.avro.schema_registry_url = settings.input_format_avro_schema_registry_url;
return format_settings; return format_settings;
} }
@ -325,6 +326,8 @@ FormatFactory::FormatFactory()
registerInputFormatProcessorORC(*this); registerInputFormatProcessorORC(*this);
registerInputFormatProcessorParquet(*this); registerInputFormatProcessorParquet(*this);
registerOutputFormatProcessorParquet(*this); registerOutputFormatProcessorParquet(*this);
registerInputFormatProcessorAvro(*this);
registerOutputFormatProcessorAvro(*this);
registerInputFormatProcessorTemplate(*this); registerInputFormatProcessorTemplate(*this);
registerOutputFormatProcessorTemplate(*this); registerOutputFormatProcessorTemplate(*this);

View File

@ -166,6 +166,8 @@ void registerInputFormatProcessorORC(FormatFactory & factory);
void registerOutputFormatProcessorParquet(FormatFactory & factory); void registerOutputFormatProcessorParquet(FormatFactory & factory);
void registerInputFormatProcessorProtobuf(FormatFactory & factory); void registerInputFormatProcessorProtobuf(FormatFactory & factory);
void registerOutputFormatProcessorProtobuf(FormatFactory & factory); void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
void registerInputFormatProcessorAvro(FormatFactory & factory);
void registerOutputFormatProcessorAvro(FormatFactory & factory);
void registerInputFormatProcessorTemplate(FormatFactory & factory); void registerInputFormatProcessorTemplate(FormatFactory & factory);
void registerOutputFormatProcessorTemplate(FormatFactory &factory); void registerOutputFormatProcessorTemplate(FormatFactory &factory);

View File

@ -110,6 +110,12 @@ struct FormatSettings
}; };
Custom custom; Custom custom;
struct Avro
{
String schema_registry_url;
} avro;
}; };
} }

View File

@ -2,6 +2,7 @@
// .h autogenerated by cmake! // .h autogenerated by cmake!
#cmakedefine01 USE_AVRO
#cmakedefine01 USE_CAPNP #cmakedefine01 USE_CAPNP
#cmakedefine01 USE_SNAPPY #cmakedefine01 USE_SNAPPY
#cmakedefine01 USE_PARQUET #cmakedefine01 USE_PARQUET

View File

@ -0,0 +1,620 @@
#include "AvroRowInputFormat.h"
#if USE_AVRO
#include <numeric>
#include <Core/Defines.h>
#include <Core/Field.h>
#include <IO/Operators.h>
#include <IO/ReadHelpers.h>
#include <Formats/verbosePrintString.h>
#include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/getLeastSupertype.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <avro/Compiler.hh>
#include <avro/DataFile.hh>
#include <avro/Decoder.hh>
#include <avro/Encoder.hh>
#include <avro/Generic.hh>
#include <avro/GenericDatum.hh>
#include <avro/Node.hh>
#include <avro/NodeConcepts.hh>
#include <avro/NodeImpl.hh>
#include <avro/Reader.hh>
#include <avro/Schema.hh>
#include <avro/Specific.hh>
#include <avro/ValidSchema.hh>
#include <avro/Writer.hh>
#include <Poco/BinaryReader.h>
#include <Poco/Buffer.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Poco/MemoryStream.h>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Poco.h>
#include <Poco/URI.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_TYPE_OF_FIELD;
extern const int BAD_ARGUMENTS;
extern const int THERE_IS_NO_COLUMN;
extern const int LOGICAL_ERROR;
extern const int INCORRECT_DATA;
extern const int CANNOT_READ_ALL_DATA;
extern const int ILLEGAL_COLUMN;
extern const int TYPE_MISMATCH;
}
class InputStreamReadBufferAdapter : public avro::InputStream
{
public:
InputStreamReadBufferAdapter(ReadBuffer & in_) : in(in_) {}
bool next(const uint8_t ** data, size_t * len)
{
if (in.eof())
{
*len = 0;
return false;
}
*data = (const uint8_t *)in.position();
*len = in.available();
in.position() += in.available();
return true;
}
void backup(size_t len) { in.position() -= len; }
void skip(size_t len) { in.tryIgnore(len); }
size_t byteCount() const { return in.count(); }
private:
ReadBuffer & in;
};
static void deserializeNoop(IColumn &, avro::Decoder &)
{
}
AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type)
{
auto logical_type = root_node->logicalType().type();
WhichDataType target(target_type);
switch (root_node->type())
{
case avro::AVRO_STRING:
if (target.isString())
{
return [tmp = std::string()](IColumn & column, avro::Decoder & decoder) mutable {
decoder.decodeString(tmp);
column.insertData(tmp.c_str(), tmp.length());
};
}
case avro::AVRO_BYTES:
if (target.isString())
{
return [tmp = std::string()](IColumn & column, avro::Decoder & decoder) mutable {
decoder.decodeString(tmp);
column.insertData(tmp.c_str(), tmp.length());
};
}
break;
case avro::AVRO_INT:
if (target.isInt32())
{
return
[](IColumn & column, avro::Decoder & decoder) { assert_cast<ColumnInt32 &>(column).insertValue(decoder.decodeInt()); };
}
if (target.isDate() && logical_type == avro::LogicalType::DATE)
{
return [](IColumn & column, avro::Decoder & decoder) {
assert_cast<DataTypeDate::ColumnType &>(column).insertValue(decoder.decodeInt());
};
}
break;
case avro::AVRO_LONG:
if (target.isInt64())
{
return
[](IColumn & column, avro::Decoder & decoder) { assert_cast<ColumnInt64 &>(column).insertValue(decoder.decodeLong()); };
}
if (target.isDateTime64())
{
auto date_time_scale = assert_cast<const DataTypeDateTime64 &>(*target_type).getScale();
if ((logical_type == avro::LogicalType::TIMESTAMP_MILLIS && date_time_scale == 3)
|| (logical_type == avro::LogicalType::TIMESTAMP_MICROS && date_time_scale == 6))
{
return [](IColumn & column, avro::Decoder & decoder) {
assert_cast<DataTypeDateTime64::ColumnType &>(column).insertValue(decoder.decodeLong());
};
}
}
break;
case avro::AVRO_FLOAT:
if (target.isFloat32())
{
return [](IColumn & column, avro::Decoder & decoder) {
assert_cast<ColumnFloat32 &>(column).insertValue(decoder.decodeFloat());
};
}
break;
case avro::AVRO_DOUBLE:
if (target.isFloat64())
{
return [](IColumn & column, avro::Decoder & decoder) {
assert_cast<ColumnFloat64 &>(column).insertValue(decoder.decodeDouble());
};
}
break;
case avro::AVRO_BOOL:
if (target.isUInt8())
{
return
[](IColumn & column, avro::Decoder & decoder) { assert_cast<ColumnUInt8 &>(column).insertValue(decoder.decodeBool()); };
}
break;
case avro::AVRO_ARRAY: {
if (target.isArray())
{
auto nested_source_type = root_node->leafAt(0);
auto nested_target_type = assert_cast<const DataTypeArray &>(*target_type).getNestedType();
auto nested_deserialize = createDeserializeFn(nested_source_type, nested_target_type);
return [nested_deserialize](IColumn & column, avro::Decoder & decoder) {
ColumnArray & column_array = assert_cast<ColumnArray &>(column);
ColumnArray::Offsets & offsets = column_array.getOffsets();
IColumn & nested_column = column_array.getData();
size_t total = 0;
for (size_t n = decoder.arrayStart(); n != 0; n = decoder.arrayNext())
{
total += n;
for (size_t i = 0; i < n; i++)
{
nested_deserialize(nested_column, decoder);
}
}
offsets.push_back(offsets.back() + total);
};
}
break;
}
case avro::AVRO_UNION: {
auto nullable_deserializer = [root_node, target_type](size_t non_null_union_index) {
auto nested_deserialize = createDeserializeFn(root_node->leafAt(non_null_union_index), removeNullable(target_type));
return [non_null_union_index, nested_deserialize](IColumn & column, avro::Decoder & decoder) {
ColumnNullable & col = assert_cast<ColumnNullable &>(column);
size_t union_index = decoder.decodeUnionIndex();
if (union_index == non_null_union_index)
{
nested_deserialize(col.getNestedColumn(), decoder);
col.getNullMapData().push_back(0);
}
else
{
col.insertDefault();
}
};
};
if (root_node->leaves() == 2 && target.isNullable())
{
if (root_node->leafAt(0)->type() == avro::AVRO_NULL)
return nullable_deserializer(1);
if (root_node->leafAt(1)->type() == avro::AVRO_NULL)
return nullable_deserializer(0);
}
break;
}
case avro::AVRO_NULL:
if (target.isNullable())
{
auto nested_type = removeNullable(target_type);
if (nested_type->getTypeId() == TypeIndex::Nothing)
{
return [](IColumn & column, avro::Decoder & decoder) {
(void)column;
decoder.decodeNull();
};
}
else
{
return [](IColumn & column, avro::Decoder & decoder) {
ColumnNullable & col = assert_cast<ColumnNullable &>(column);
decoder.decodeNull();
col.insertDefault();
};
}
}
break;
case avro::AVRO_ENUM:
if (target.isString())
{
std::vector<std::string> symbols;
for (size_t i = 0; i < root_node->names(); i++)
{
symbols.push_back(root_node->nameAt(i));
}
return [symbols](IColumn & column, avro::Decoder & decoder) {
size_t enum_index = decoder.decodeEnum();
const auto & enum_symbol = symbols[enum_index];
column.insertData(enum_symbol.c_str(), enum_symbol.length());
};
}
if (target.isEnum())
{
const auto & enum_type = assert_cast<const IDataTypeEnum &>(*target_type);
std::vector<Field> symbol_mapping;
for (size_t i = 0; i < root_node->names(); i++)
{
symbol_mapping.push_back(enum_type.castToValue(root_node->nameAt(i)));
}
return [symbol_mapping](IColumn & column, avro::Decoder & decoder) {
size_t enum_index = decoder.decodeEnum();
column.insert(symbol_mapping[enum_index]);
};
}
break;
case avro::AVRO_FIXED: {
size_t fixed_size = root_node->fixedSize();
if (target.isFixedString() && target_type->getSizeOfValueInMemory() == fixed_size)
{
return [tmp_fixed = std::vector<uint8_t>(fixed_size)](IColumn & column, avro::Decoder & decoder) mutable {
decoder.decodeFixed(tmp_fixed.size(), tmp_fixed);
column.insertData(reinterpret_cast<const char *>(tmp_fixed.data()), tmp_fixed.size());
};
}
break;
}
case avro::AVRO_MAP:
case avro::AVRO_RECORD:
default:
break;
}
throw Exception(
"Type " + target_type->getName() + " is not compatible" + " with Avro " + avro::ValidSchema(root_node).toJson(false),
ErrorCodes::ILLEGAL_COLUMN);
}
AvroDeserializer::SkipFn AvroDeserializer::createSkipFn(avro::NodePtr root_node)
{
switch (root_node->type())
{
case avro::AVRO_STRING:
return [](avro::Decoder & decoder) { decoder.skipString(); };
case avro::AVRO_BYTES:
return [](avro::Decoder & decoder) { decoder.skipBytes(); };
case avro::AVRO_INT:
return [](avro::Decoder & decoder) { decoder.decodeInt(); };
case avro::AVRO_LONG:
return [](avro::Decoder & decoder) { decoder.decodeLong(); };
case avro::AVRO_FLOAT:
return [](avro::Decoder & decoder) { decoder.decodeFloat(); };
case avro::AVRO_DOUBLE:
return [](avro::Decoder & decoder) { decoder.decodeDouble(); };
case avro::AVRO_BOOL:
return [](avro::Decoder & decoder) { decoder.decodeBool(); };
case avro::AVRO_ARRAY: {
auto nested_skip_fn = createSkipFn(root_node->leafAt(0));
return [nested_skip_fn](avro::Decoder & decoder) {
for (size_t n = decoder.arrayStart(); n != 0; n = decoder.arrayNext())
{
for (size_t i = 0; i < n; ++i)
{
nested_skip_fn(decoder);
}
}
};
}
case avro::AVRO_UNION: {
std::vector<SkipFn> union_skip_fns;
for (size_t i = 0; i < root_node->leaves(); i++)
{
union_skip_fns.push_back(createSkipFn(root_node->leafAt(i)));
}
return [union_skip_fns](avro::Decoder & decoder) { union_skip_fns[decoder.decodeUnionIndex()](decoder); };
}
case avro::AVRO_NULL:
return [](avro::Decoder & decoder) { decoder.decodeNull(); };
case avro::AVRO_ENUM:
return [](avro::Decoder & decoder) { decoder.decodeEnum(); };
case avro::AVRO_FIXED: {
auto fixed_size = root_node->fixedSize();
return [fixed_size](avro::Decoder & decoder) { decoder.skipFixed(fixed_size); };
}
case avro::AVRO_MAP: {
auto value_skip_fn = createSkipFn(root_node->leafAt(1));
return [value_skip_fn](avro::Decoder & decoder) {
for (size_t n = decoder.mapStart(); n != 0; n = decoder.mapNext())
{
for (size_t i = 0; i < n; ++i)
{
decoder.skipString();
value_skip_fn(decoder);
}
}
};
}
case avro::AVRO_RECORD: {
std::vector<SkipFn> field_skip_fns;
for (size_t i = 0; i < root_node->leaves(); i++)
{
field_skip_fns.push_back(createSkipFn(root_node->leafAt(i)));
}
return [field_skip_fns](avro::Decoder & decoder) {
for (auto & skip_fn : field_skip_fns)
skip_fn(decoder);
};
}
default:
throw Exception("Unsupported Avro type", ErrorCodes::ILLEGAL_COLUMN);
}
}
AvroDeserializer::AvroDeserializer(const DB::ColumnsWithTypeAndName & columns, avro::ValidSchema schema)
{
auto schema_root = schema.root();
if (schema_root->type() != avro::AVRO_RECORD)
{
throw Exception("Root schema must be a record", ErrorCodes::TYPE_MISMATCH);
}
field_mapping.resize(schema_root->leaves(), -1);
for (size_t i = 0; i < schema_root->leaves(); ++i)
{
skip_fns.push_back(createSkipFn(schema_root->leafAt(i)));
deserialize_fns.push_back(&deserializeNoop);
}
for (size_t i = 0; i < columns.size(); ++i)
{
const auto & column = columns[i];
size_t field_index;
if (!schema_root->nameIndex(column.name, field_index))
{
throw Exception("Field " + column.name + " not found in Avro schema", ErrorCodes::THERE_IS_NO_COLUMN);
}
auto field_schema = schema_root->leafAt(field_index);
try
{
deserialize_fns[field_index] = createDeserializeFn(field_schema, column.type);
}
catch (Exception & e)
{
e.addMessage("column " + column.name);
e.rethrow();
}
field_mapping[field_index] = i;
}
}
void AvroDeserializer::deserializeRow(MutableColumns & columns, avro::Decoder & decoder)
{
for (size_t i = 0; i < field_mapping.size(); i++)
{
if (field_mapping[i] >= 0)
{
deserialize_fns[i](*columns[field_mapping[i]], decoder);
}
else
{
skip_fns[i](decoder);
}
}
}
AvroRowInputFormat::AvroRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_)
: IRowInputFormat(header_, in_, params_)
, file_reader(std::make_unique<InputStreamReadBufferAdapter>(in_))
, deserializer(header_.getColumnsWithTypeAndName(), file_reader.dataSchema())
{
file_reader.init();
}
bool AvroRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
if (file_reader.hasMore())
{
file_reader.decr();
deserializer.deserializeRow(columns, file_reader.decoder());
return true;
}
return false;
}
class AvroConfluentRowInputFormat::SchemaRegistry
{
public:
SchemaRegistry(const std::string & base_url_)
{
if (base_url_.empty())
{
throw Exception("Empty Schema Registry URL", ErrorCodes::BAD_ARGUMENTS);
}
try
{
base_url = base_url_;
}
catch (Poco::SyntaxException & e)
{
throw Exception("Invalid Schema Registry URL", Exception(Exception::CreateFromPoco, e), ErrorCodes::BAD_ARGUMENTS);
}
}
avro::ValidSchema getSchema(uint32_t id)
{
try
{
try
{
Poco::URI url(base_url, "/schemas/ids/" + std::to_string(id));
Poco::Net::HTTPClientSession session(url.getHost(), url.getPort());
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, url.getPathAndQuery());
session.sendRequest(request);
Poco::Net::HTTPResponse response;
auto & response_body = session.receiveResponse(response);
if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK)
{
throw Exception("http code " + std::to_string(response.getStatus()), ErrorCodes::INCORRECT_DATA);
}
Poco::JSON::Parser parser;
auto json_body = parser.parse(response_body).extract<Poco::JSON::Object::Ptr>();
auto schema = json_body->getValue<std::string>("schema");
return avro::compileJsonSchemaFromString(schema);
}
catch (const Exception & e)
{
throw e;
}
catch (const Poco::Exception & e)
{
throw Exception(Exception::CreateFromPoco, e);
}
catch (const avro::Exception & e)
{
throw Exception(e.what(), ErrorCodes::INCORRECT_DATA);
}
}
catch (Exception & e)
{
e.addMessage("while fetching schema id=" + std::to_string(id));
throw;
}
}
private:
Poco::URI base_url;
};
static uint32_t readConfluentSchemaId(ReadBuffer & in)
{
Poco::Buffer<char> buf(5);
in.readStrict(buf.begin(), buf.capacity());
Poco::MemoryBinaryReader binary_reader(buf, Poco::BinaryReader::BIG_ENDIAN_BYTE_ORDER);
uint8_t magic;
uint32_t schema_id;
binary_reader >> magic >> schema_id;
if (magic != 0x00)
{
throw Exception("Invalid magic byte", ErrorCodes::INCORRECT_DATA);
}
return schema_id;
}
AvroConfluentRowInputFormat::AvroConfluentRowInputFormat(
const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_)
: IRowInputFormat(header_.cloneEmpty(), in_, params_)
, columns(header_.getColumnsWithTypeAndName())
, schema_registry(std::make_unique<SchemaRegistry>(format_settings_.avro.schema_registry_url))
, input_stream(std::make_unique<InputStreamReadBufferAdapter>(in))
, decoder(avro::binaryDecoder())
{
(void)format_settings_;
decoder->init(*input_stream);
}
bool AvroConfluentRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &)
{
if (in.eof())
{
return false;
}
SchemaId schema_id = readConfluentSchemaId(in);
auto & deserializer = getOrCreateDeserializer(schema_id);
deserializer.deserializeRow(columns, *decoder);
decoder->drain();
return true;
}
AvroDeserializer & AvroConfluentRowInputFormat::getOrCreateDeserializer(SchemaId schema_id)
{
auto it = deserializer_cache.find(schema_id);
if (it == deserializer_cache.end())
{
auto schema = schema_registry->getSchema(schema_id);
AvroDeserializer deserializer(columns, schema);
it = deserializer_cache.emplace(schema_id, deserializer).first;
}
return it->second;
}
void registerInputFormatProcessorAvro(FormatFactory & factory)
{
factory.registerInputFormatProcessor(
"Avro",
[=](ReadBuffer & buf,
const Block & sample,
const Context & context,
const RowInputFormatParams & params,
const FormatSettings & settings) {
(void)(params);
(void)(settings);
(void)(context);
return std::make_shared<AvroRowInputFormat>(sample, buf, params);
});
factory.registerInputFormatProcessor(
"AvroConfluent",
[=](ReadBuffer & buf,
const Block & sample,
const Context & context,
const RowInputFormatParams & params,
const FormatSettings & settings) {
(void)(params);
(void)(settings);
(void)(context);
return std::make_shared<AvroConfluentRowInputFormat>(sample, buf, params, settings);
});
}
}
#else
namespace DB
{
class FormatFactory;
void registerInputFormatProcessorAvro(FormatFactory &)
{
}
}
#endif

View File

@ -0,0 +1,70 @@
#pragma once
#include "config_formats.h"
#if USE_AVRO
#include <unordered_map>
#include <vector>
#include <Core/Block.h>
#include <Formats/FormatSchemaInfo.h>
#include <Processors/Formats/IRowInputFormat.h>
#include <avro/DataFile.hh>
#include <avro/Decoder.hh>
#include <avro/Schema.hh>
#include <avro/ValidSchema.hh>
namespace DB
{
class AvroDeserializer
{
public:
AvroDeserializer(const DB::ColumnsWithTypeAndName & columns, avro::ValidSchema schema);
void deserializeRow(MutableColumns & columns, avro::Decoder & decoder);
private:
using DeserializeFn = std::function<void(IColumn & column, avro::Decoder & decoder)>;
using SkipFn = std::function<void(avro::Decoder & decoder)>;
static DeserializeFn createDeserializeFn(avro::NodePtr root_node, DataTypePtr target_type);
static SkipFn createSkipFn(avro::NodePtr root_node);
std::vector<int> field_mapping;
std::vector<SkipFn> skip_fns;
std::vector<DeserializeFn> deserialize_fns;
};
class AvroRowInputFormat : public IRowInputFormat
{
public:
AvroRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_);
virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
String getName() const override { return "AvroRowInputFormat"; }
private:
avro::DataFileReaderBase file_reader;
AvroDeserializer deserializer;
};
class AvroConfluentRowInputFormat : public IRowInputFormat
{
public:
AvroConfluentRowInputFormat(const Block & header_, ReadBuffer & in_, Params params_, const FormatSettings & format_settings_);
virtual bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
String getName() const override { return "AvroConfluentRowInputFormat"; }
private:
const DB::ColumnsWithTypeAndName columns;
class SchemaRegistry;
std::unique_ptr<SchemaRegistry> schema_registry;
using SchemaId = uint32_t;
std::unordered_map<SchemaId, AvroDeserializer> deserializer_cache;
AvroDeserializer & getOrCreateDeserializer(SchemaId schema_id);
avro::InputStreamPtr input_stream;
avro::DecoderPtr decoder;
};
}
#endif

View File

@ -0,0 +1,326 @@
#include "AvroRowOutputFormat.h"
#if USE_AVRO
#include <Core/Defines.h>
#include <Core/Field.h>
#include <IO/Operators.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
#include <Formats/verbosePrintString.h>
#include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <avro/Compiler.hh>
#include <avro/DataFile.hh>
#include <avro/Decoder.hh>
#include <avro/Encoder.hh>
#include <avro/Generic.hh>
#include <avro/GenericDatum.hh>
#include <avro/Node.hh>
#include <avro/NodeConcepts.hh>
#include <avro/NodeImpl.hh>
#include <avro/Reader.hh>
#include <avro/Schema.hh>
#include <avro/Specific.hh>
#include <avro/ValidSchema.hh>
#include <avro/Writer.hh>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_TYPE_OF_FIELD;
extern const int BAD_ARGUMENTS;
extern const int THERE_IS_NO_COLUMN;
extern const int LOGICAL_ERROR;
extern const int INCORRECT_DATA;
extern const int CANNOT_READ_ALL_DATA;
}
class OutputStreamWriteBufferAdapter : public avro::OutputStream
{
public:
OutputStreamWriteBufferAdapter(WriteBuffer & out_) : out(out_) {}
virtual bool next(uint8_t ** data, size_t * len) override
{
out.nextIfAtEnd();
*data = (uint8_t *)out.position();
*len = out.available();
out.position() += out.available();
return true;
}
virtual void backup(size_t len) override { out.position() -= len; }
virtual uint64_t byteCount() const override { return out.count(); }
virtual void flush() override { out.next(); }
private:
WriteBuffer & out;
};
AvroSerializer::SchemaWithSerializeFn AvroSerializer::createSchemaWithSerializeFn(DataTypePtr data_type)
{
switch (data_type->getTypeId())
{
case TypeIndex::UInt8:
return {avro::BoolSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder) {
encoder.encodeBool(assert_cast<const ColumnUInt8 &>(column).getElement(row_num));
}};
case TypeIndex::Int32:
return {avro::IntSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder) {
encoder.encodeInt(assert_cast<const ColumnInt32 &>(column).getElement(row_num));
}};
case TypeIndex::Int64:
return {avro::LongSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder) {
encoder.encodeLong(assert_cast<const ColumnInt64 &>(column).getElement(row_num));
}};
case TypeIndex::Float32:
return {avro::FloatSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder) {
encoder.encodeFloat(assert_cast<const ColumnFloat32 &>(column).getElement(row_num));
}};
case TypeIndex::Float64:
return {avro::DoubleSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder) {
encoder.encodeDouble(assert_cast<const ColumnFloat64 &>(column).getElement(row_num));
}};
case TypeIndex::Date: {
auto schema = avro::IntSchema();
schema.root()->setLogicalType(avro::LogicalType(avro::LogicalType::DATE));
return {schema, [](const IColumn & column, size_t row_num, avro::Encoder & encoder) {
UInt16 date = assert_cast<const DataTypeDate::ColumnType &>(column).getElement(row_num);
encoder.encodeInt(date);
}};
}
case TypeIndex::DateTime:
throw Exception("Unsupported Avro type", ErrorCodes::BAD_TYPE_OF_FIELD);
case TypeIndex::DateTime64: {
auto schema = avro::LongSchema();
const auto & provided_type = assert_cast<const DataTypeDateTime64 &>(*data_type);
if (provided_type.getScale() == 3)
schema.root()->setLogicalType(avro::LogicalType(avro::LogicalType::TIMESTAMP_MILLIS));
else if (provided_type.getScale() == 6)
schema.root()->setLogicalType(avro::LogicalType(avro::LogicalType::TIMESTAMP_MICROS));
else
throw Exception("Unsupported Avro type", ErrorCodes::BAD_TYPE_OF_FIELD);
return {schema, [](const IColumn & column, size_t row_num, avro::Encoder & encoder) {
const auto & col = assert_cast<const DataTypeDateTime64::ColumnType &>(column);
encoder.encodeLong(col.getElement(row_num));
}};
}
case TypeIndex::String:
return {avro::StringSchema(), [](const IColumn & column, size_t row_num, avro::Encoder & encoder) {
const StringRef & s = assert_cast<const ColumnString &>(column).getDataAt(row_num);
encoder.encodeBytes(reinterpret_cast<const uint8_t *>(s.data), s.size);
}};
case TypeIndex::FixedString: {
return {avro::FixedSchema(data_type->getSizeOfValueInMemory(), "fixed"),
[](const IColumn & column, size_t row_num, avro::Encoder & encoder) {
const StringRef & s = assert_cast<const ColumnFixedString &>(column).getDataAt(row_num);
encoder.encodeFixed(reinterpret_cast<const uint8_t *>(s.data), s.size);
}};
}
case TypeIndex::Enum8: {
auto schema = avro::EnumSchema("enum");
std::unordered_map<DataTypeEnum8::FieldType, size_t> enum_mapping;
const auto & enum_values = assert_cast<const DataTypeEnum8 &>(*data_type).getValues();
for (size_t i = 0; i < enum_values.size(); ++i)
{
schema.addSymbol(enum_values[i].first);
enum_mapping.emplace(enum_values[i].second, i);
}
return {schema, [enum_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder) {
auto enum_value = assert_cast<const DataTypeEnum8::ColumnType &>(column).getElement(row_num);
encoder.encodeEnum(enum_mapping.at(enum_value));
}};
}
case TypeIndex::Enum16: {
auto schema = avro::EnumSchema("enum");
std::unordered_map<DataTypeEnum16::FieldType, size_t> enum_mapping;
const auto & enum_values = assert_cast<const DataTypeEnum16 &>(*data_type).getValues();
for (size_t i = 0; i < enum_values.size(); ++i)
{
schema.addSymbol(enum_values[i].first);
enum_mapping.emplace(enum_values[i].second, i);
}
return {schema, [enum_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder) {
auto enum_value = assert_cast<const DataTypeEnum16::ColumnType &>(column).getElement(row_num);
encoder.encodeEnum(enum_mapping.at(enum_value));
}};
}
case TypeIndex::Array: {
const auto & array_type = assert_cast<const DataTypeArray &>(*data_type);
auto nested_mapping = createSchemaWithSerializeFn(array_type.getNestedType());
return {avro::ArraySchema(nested_mapping.schema),
[nested_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder) {
const ColumnArray & column_array = assert_cast<const ColumnArray &>(column);
const ColumnArray::Offsets & offsets = column_array.getOffsets();
size_t offset = offsets[row_num - 1];
size_t next_offset = offsets[row_num];
size_t row_count = next_offset - offset;
const IColumn & nested_column = column_array.getData();
encoder.arrayStart();
if (row_count > 0)
{
encoder.setItemCount(row_count);
}
for (size_t i = offset; i < next_offset; ++i)
{
nested_mapping.serialize(nested_column, i, encoder);
}
encoder.arrayEnd();
}};
}
case TypeIndex::Nullable: {
auto nested_type = removeNullable(data_type);
auto nested_mapping = createSchemaWithSerializeFn(nested_type);
if (nested_type->getTypeId() == TypeIndex::Nothing)
{
return nested_mapping;
}
else
{
avro::UnionSchema union_schema;
union_schema.addType(avro::NullSchema());
union_schema.addType(nested_mapping.schema);
return {union_schema, [nested_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder) {
const ColumnNullable & col = assert_cast<const ColumnNullable &>(column);
if (!col.isNullAt(row_num))
{
encoder.encodeUnionIndex(1);
nested_mapping.serialize(col.getNestedColumn(), row_num, encoder);
}
else
{
encoder.encodeUnionIndex(0);
encoder.encodeNull();
}
}};
}
}
case TypeIndex::LowCardinality: {
const auto & nested_type = removeLowCardinality(data_type);
auto nested_mapping = createSchemaWithSerializeFn(nested_type);
return {nested_mapping.schema, [nested_mapping](const IColumn & column, size_t row_num, avro::Encoder & encoder) {
const auto & col = assert_cast<const ColumnLowCardinality &>(column);
nested_mapping.serialize(*col.getDictionary().getNestedColumn(), col.getIndexAt(row_num), encoder);
}};
}
case TypeIndex::Nothing:
return {avro::NullSchema(), [](const IColumn &, size_t, avro::Encoder & encoder) { encoder.encodeNull(); }};
default:
break;
}
throw Exception("Type " + data_type->getName() + " is not supported for Avro output", ErrorCodes::ILLEGAL_COLUMN);
}
AvroSerializer::AvroSerializer(const ColumnsWithTypeAndName & columns)
{
avro::RecordSchema record_schema("row");
for (auto & column : columns)
{
try
{
auto field_mapping = createSchemaWithSerializeFn(column.type);
serialize_fns.push_back(field_mapping.serialize);
//TODO: verify name starts with A-Za-z_
record_schema.addField(column.name, field_mapping.schema);
}
catch (Exception & e)
{
e.addMessage("column " + column.name);
e.rethrow();
}
}
schema.setSchema(record_schema);
}
void AvroSerializer::serializeRow(const Columns & columns, size_t row_num, avro::Encoder & encoder)
{
size_t num_columns = columns.size();
for (size_t i = 0; i < num_columns; ++i)
{
serialize_fns[i](*columns[i], row_num, encoder);
}
}
AvroRowOutputFormat::AvroRowOutputFormat(
WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_)
: IRowOutputFormat(header_, out_, callback)
, settings(settings_)
, serializer(header_.getColumnsWithTypeAndName())
, file_writer(std::make_unique<OutputStreamWriteBufferAdapter>(out_), serializer.getSchema(), 16 * 1024, avro::Codec::SNAPPY_CODEC)
{
}
AvroRowOutputFormat::~AvroRowOutputFormat() = default;
void AvroRowOutputFormat::writePrefix()
{
file_writer.syncIfNeeded();
}
void AvroRowOutputFormat::write(const Columns & columns, size_t row_num)
{
file_writer.syncIfNeeded();
serializer.serializeRow(columns, row_num, file_writer.encoder());
file_writer.incr();
}
void AvroRowOutputFormat::writeSuffix()
{
file_writer.close();
}
void registerOutputFormatProcessorAvro(FormatFactory & factory)
{
factory.registerOutputFormatProcessor(
"Avro",
[=](WriteBuffer & buf,
const Block & sample,
const Context & context,
FormatFactory::WriteCallback callback,
const FormatSettings & settings) {
(void)(context);
(void)(callback);
return std::make_shared<AvroRowOutputFormat>(buf, sample, callback, settings);
});
}
}
#else
namespace DB
{
class FormatFactory;
void registerOutputFormatProcessorAvro(FormatFactory &)
{
}
}
#endif

View File

@ -0,0 +1,60 @@
#pragma once
#include "config_formats.h"
#if USE_AVRO
#include <unordered_map>
#include <Core/Block.h>
#include <Formats/FormatSchemaInfo.h>
#include <Formats/FormatSettings.h>
#include <IO/WriteBuffer.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <avro/DataFile.hh>
#include <avro/Schema.hh>
#include <avro/ValidSchema.hh>
namespace DB
{
class WriteBuffer;
class AvroSerializer
{
public:
AvroSerializer(const ColumnsWithTypeAndName & columns);
const avro::ValidSchema & getSchema() const { return schema; }
void serializeRow(const Columns & columns, size_t row_num, avro::Encoder & encoder);
private:
using SerializeFn = std::function<void(const IColumn & column, size_t row_num, avro::Encoder & encoder)>;
struct SchemaWithSerializeFn
{
avro::Schema schema;
SerializeFn serialize;
};
static SchemaWithSerializeFn createSchemaWithSerializeFn(DataTypePtr data_type);
std::vector<SerializeFn> serialize_fns;
avro::ValidSchema schema;
};
class AvroRowOutputFormat : public IRowOutputFormat
{
public:
AvroRowOutputFormat(WriteBuffer & out_, const Block & header_, FormatFactory::WriteCallback callback, const FormatSettings & settings_);
virtual ~AvroRowOutputFormat() override;
String getName() const override { return "AvroRowOutputFormat"; }
void write(const Columns & columns, size_t row_num) override;
void writeField(const IColumn &, const IDataType &, size_t) override {}
virtual void writePrefix() override;
virtual void writeSuffix() override;
private:
FormatSettings settings;
AvroSerializer serializer;
avro::DataFileWriterBase file_writer;
};
}
#endif