diff --git a/.gitmodules b/.gitmodules index 847abf7d931..f6990fed41f 100644 --- a/.gitmodules +++ b/.gitmodules @@ -103,3 +103,6 @@ [submodule "contrib/fastops"] path = contrib/fastops url = https://github.com/ClickHouse-Extras/fastops +[submodule "contrib/orc"] + path = contrib/orc + url = https://github.com/apache/orc diff --git a/CMakeLists.txt b/CMakeLists.txt index 8466fa5d33d..6ac4d67f6ae 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -481,6 +481,7 @@ include (cmake/find_mimalloc.cmake) include (cmake/find_simdjson.cmake) include (cmake/find_rapidjson.cmake) include (cmake/find_fastops.cmake) +include (cmake/find_orc.cmake) find_contrib_lib(cityhash) find_contrib_lib(farmhash) diff --git a/cmake/find_orc.cmake b/cmake/find_orc.cmake new file mode 100644 index 00000000000..3676bec1b6b --- /dev/null +++ b/cmake/find_orc.cmake @@ -0,0 +1,8 @@ +##TODO replace hardcode to find procedure + +set(USE_ORC 0) +set(USE_INTERNAL_ORC_LIBRARY ON) + +if (ARROW_LIBRARY) + set(USE_ORC 1) +endif() \ No newline at end of file diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index c478311d77a..e652c393141 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -10,6 +10,18 @@ endif () set_property(DIRECTORY PROPERTY EXCLUDE_FROM_ALL 1) +if (USE_INTERNAL_ORC_LIBRARY) + set(BUILD_JAVA OFF) + set (ANALYZE_JAVA OFF) + set (BUILD_CPP_TESTS OFF) + set (BUILD_TOOLS OFF) + option(BUILD_JAVA OFF) + option (ANALYZE_JAVA OFF) + option (BUILD_CPP_TESTS OFF) + option (BUILD_TOOLS OFF) + set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/contrib/orc/cmake_modules") + add_subdirectory(orc) +endif() if (USE_INTERNAL_UNWIND_LIBRARY) add_subdirectory (libunwind-cmake) diff --git a/contrib/arrow-cmake/CMakeLists.txt b/contrib/arrow-cmake/CMakeLists.txt index a7b6628ea4e..7b94acc9031 100644 --- a/contrib/arrow-cmake/CMakeLists.txt +++ b/contrib/arrow-cmake/CMakeLists.txt @@ -47,6 +47,71 @@ target_include_directories(${THRIFT_LIBRARY} SYSTEM PUBLIC ${ClickHouse_SOURCE_D target_link_libraries(${THRIFT_LIBRARY} PRIVATE Threads::Threads) +# === orc + +set(ORC_SOURCE_DIR ${ClickHouse_SOURCE_DIR}/contrib/orc/c++) +set(ORC_INCLUDE_DIR ${ORC_SOURCE_DIR}/include) +set(ORC_SOURCE_SRC_DIR ${ORC_SOURCE_DIR}/src) +set(ORC_SOURCE_WRAP_DIR ${ORC_SOURCE_DIR}/wrap) + +set(ORC_BUILD_SRC_DIR ${CMAKE_CURRENT_BINARY_DIR}/../orc/c++/src) +set(ORC_BUILD_INCLUDE_DIR ${CMAKE_CURRENT_BINARY_DIR}/../orc/c++/include) + +set(GOOGLE_PROTOBUF_DIR ${ClickHouse_SOURCE_DIR}/contrib/protobuf/src/) +set(ORC_ADDITION_SOURCE_DIR ${CMAKE_CURRENT_BINARY_DIR}) +set(ARROW_SRC_DIR ${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src) + +set(PROTOBUF_EXECUTABLE ${CMAKE_CURRENT_BINARY_DIR}/../protobuf/cmake/protoc) +set(PROTO_DIR ${ORC_SOURCE_DIR}/../proto) + + +add_custom_command(OUTPUT orc_proto.pb.h orc_proto.pb.cc + COMMAND ${PROTOBUF_EXECUTABLE} + -I ${PROTO_DIR} + --cpp_out="${CMAKE_CURRENT_BINARY_DIR}" + "${PROTO_DIR}/orc_proto.proto") + +include_directories(SYSTEM ${ORC_INCLUDE_DIR}) +include_directories(SYSTEM ${ORC_SOURCE_SRC_DIR}) +include_directories(SYSTEM ${ORC_SOURCE_WRAP_DIR}) +include_directories(SYSTEM ${GOOGLE_PROTOBUF_DIR}) +include_directories(SYSTEM ${ORC_BUILD_SRC_DIR}) +include_directories(SYSTEM ${ORC_BUILD_INCLUDE_DIR}) +include_directories(SYSTEM ${ORC_ADDITION_SOURCE_DIR}) +include_directories(SYSTEM ${ARROW_SRC_DIR}) + + +set(ORC_SRCS + ${ARROW_SRC_DIR}/arrow/adapters/orc/adapter.cc + ${ORC_SOURCE_SRC_DIR}/Exceptions.cc + ${ORC_SOURCE_SRC_DIR}/OrcFile.cc + ${ORC_SOURCE_SRC_DIR}/Reader.cc + ${ORC_SOURCE_SRC_DIR}/ByteRLE.cc + ${ORC_SOURCE_SRC_DIR}/ColumnPrinter.cc + ${ORC_SOURCE_SRC_DIR}/ColumnReader.cc + ${ORC_SOURCE_SRC_DIR}/ColumnWriter.cc + ${ORC_SOURCE_SRC_DIR}/Common.cc + ${ORC_SOURCE_SRC_DIR}/Compression.cc + ${ORC_SOURCE_SRC_DIR}/Exceptions.cc + ${ORC_SOURCE_SRC_DIR}/Int128.cc + ${ORC_SOURCE_SRC_DIR}/LzoDecompressor.cc + ${ORC_SOURCE_SRC_DIR}/MemoryPool.cc + ${ORC_SOURCE_SRC_DIR}/OrcFile.cc + ${ORC_SOURCE_SRC_DIR}/Reader.cc + ${ORC_SOURCE_SRC_DIR}/RLE.cc + ${ORC_SOURCE_SRC_DIR}/RLEv1.cc + ${ORC_SOURCE_SRC_DIR}/RLEv2.cc + ${ORC_SOURCE_SRC_DIR}/Statistics.cc + ${ORC_SOURCE_SRC_DIR}/StripeStream.cc + ${ORC_SOURCE_SRC_DIR}/Timezone.cc + ${ORC_SOURCE_SRC_DIR}/TypeImpl.cc + ${ORC_SOURCE_SRC_DIR}/Vector.cc + ${ORC_SOURCE_SRC_DIR}/Writer.cc + ${ORC_SOURCE_SRC_DIR}/io/InputStream.cc + ${ORC_SOURCE_SRC_DIR}/io/OutputStream.cc + ${ORC_ADDITION_SOURCE_DIR}/orc_proto.pb.cc + ) + # === arrow @@ -103,6 +168,7 @@ set(ARROW_SRCS ${LIBRARY_DIR}/util/thread-pool.cc ${LIBRARY_DIR}/util/trie.cc ${LIBRARY_DIR}/util/utf8.cc + ${ORC_SRCS} ) set(ARROW_SRCS ${ARROW_SRCS} @@ -151,6 +217,7 @@ endif() add_library(${ARROW_LIBRARY} ${ARROW_SRCS}) +add_dependencies(${ARROW_LIBRARY} protoc) target_include_directories(${ARROW_LIBRARY} SYSTEM PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/cpp/src ${Boost_INCLUDE_DIRS}) target_link_libraries(${ARROW_LIBRARY} PRIVATE ${DOUBLE_CONVERSION_LIBRARIES} Threads::Threads) if (ARROW_WITH_LZ4) diff --git a/contrib/orc b/contrib/orc new file mode 160000 index 00000000000..5981208e394 --- /dev/null +++ b/contrib/orc @@ -0,0 +1 @@ +Subproject commit 5981208e39447df84827f6a961d1da76bacb6078 diff --git a/dbms/src/Formats/FormatFactory.cpp b/dbms/src/Formats/FormatFactory.cpp index 75f30e74761..491363e01b4 100644 --- a/dbms/src/Formats/FormatFactory.cpp +++ b/dbms/src/Formats/FormatFactory.cpp @@ -238,6 +238,7 @@ void registerOutputFormatProcessorTSKV(FormatFactory & factory); void registerInputFormatProcessorJSONEachRow(FormatFactory & factory); void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory); void registerInputFormatProcessorParquet(FormatFactory & factory); +void registerInputFormatProcessorORC(FormatFactory & factory); void registerOutputFormatProcessorParquet(FormatFactory & factory); void registerInputFormatProcessorProtobuf(FormatFactory & factory); void registerOutputFormatProcessorProtobuf(FormatFactory & factory); @@ -288,6 +289,7 @@ FormatFactory::FormatFactory() registerInputFormatProcessorProtobuf(*this); registerOutputFormatProcessorProtobuf(*this); registerInputFormatProcessorCapnProto(*this); + registerInputFormatProcessorORC(*this); registerInputFormatProcessorParquet(*this); registerOutputFormatProcessorParquet(*this); diff --git a/dbms/src/Formats/config_formats.h.in b/dbms/src/Formats/config_formats.h.in index 7837bed56d0..1ddd0e18aa9 100644 --- a/dbms/src/Formats/config_formats.h.in +++ b/dbms/src/Formats/config_formats.h.in @@ -5,4 +5,5 @@ #cmakedefine01 USE_CAPNP #cmakedefine01 USE_SNAPPY #cmakedefine01 USE_PARQUET +#cmakedefine01 USE_ORC #cmakedefine01 USE_PROTOBUF diff --git a/dbms/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp b/dbms/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp new file mode 100644 index 00000000000..24a144b10b5 --- /dev/null +++ b/dbms/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp @@ -0,0 +1,420 @@ +#include "config_formats.h" +#include "ArrowColumnToCHColumn.h" + +#if USE_ORC or USE_PARQUET +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + const std::unordered_map> arrow_type_to_internal_type ={ + //{arrow::Type::DECIMAL, std::make_shared()}, + {arrow::Type::UINT8, std::make_shared()}, + {arrow::Type::INT8, std::make_shared()}, + {arrow::Type::UINT16, std::make_shared()}, + {arrow::Type::INT16, std::make_shared()}, + {arrow::Type::UINT32, std::make_shared()}, + {arrow::Type::INT32, std::make_shared()}, + {arrow::Type::UINT64, std::make_shared()}, + {arrow::Type::INT64, std::make_shared()}, + {arrow::Type::HALF_FLOAT, std::make_shared()}, + {arrow::Type::FLOAT, std::make_shared()}, + {arrow::Type::DOUBLE, std::make_shared()}, + + {arrow::Type::BOOL, std::make_shared()}, + //{arrow::Type::DATE32, std::make_shared()}, + {arrow::Type::DATE32, std::make_shared()}, + //{arrow::Type::DATE32, std::make_shared()}, + {arrow::Type::DATE64, std::make_shared()}, + {arrow::Type::TIMESTAMP, std::make_shared()}, + //{arrow::Type::TIME32, std::make_shared()}, + + + {arrow::Type::STRING, std::make_shared()}, + {arrow::Type::BINARY, std::make_shared()}, + //{arrow::Type::FIXED_SIZE_BINARY, std::make_shared()}, + //{arrow::Type::UUID, std::make_shared()}, + + + // TODO: add other types that are convertable to internal ones: + // 0. ENUM? + // 1. UUID -> String + // 2. JSON -> String + // Full list of types: contrib/arrow/cpp/src/arrow/type.h + }; + + namespace ErrorCodes + { + extern const int UNKNOWN_TYPE; + extern const int VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE; + extern const int CANNOT_READ_ALL_DATA; + extern const int EMPTY_DATA_PASSED; + extern const int SIZES_OF_COLUMNS_DOESNT_MATCH; + extern const int CANNOT_CONVERT_TYPE; + extern const int CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN; + extern const int THERE_IS_NO_COLUMN; + } + + template> + static void + fillColumnWithNumericData(std::shared_ptr &arrow_column, MutableColumnPtr &internal_column) + { + auto &column_data = static_cast(*internal_column).getData(); + column_data.reserve(arrow_column->length()); + + for (size_t chunk_i = 0, num_chunks = static_cast(arrow_column->data()->num_chunks()); + chunk_i < num_chunks; ++chunk_i) + { + std::shared_ptr chunk = arrow_column->data()->chunk(chunk_i); + /// buffers[0] is a null bitmap and buffers[1] are actual values + std::shared_ptr buffer = chunk->data()->buffers[1]; + + const auto *raw_data = reinterpret_cast(buffer->data()); + column_data.insert_assume_reserved(raw_data, raw_data + chunk->length()); + } + } + + void fillColumnWithStringData(std::shared_ptr &arrow_column, MutableColumnPtr &internal_column) + { + PaddedPODArray &column_chars_t = static_cast(*internal_column).getChars(); + PaddedPODArray &column_offsets = static_cast(*internal_column).getOffsets(); + + size_t chars_t_size = 0; + for (size_t chunk_i = 0, num_chunks = static_cast(arrow_column->data()->num_chunks()); + chunk_i < num_chunks; ++chunk_i) + { + arrow::BinaryArray &chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); + const size_t chunk_length = chunk.length(); + + chars_t_size += chunk.value_offset(chunk_length - 1) + chunk.value_length(chunk_length - 1); + chars_t_size += chunk_length; /// additional space for null bytes + } + + column_chars_t.reserve(chars_t_size); + column_offsets.reserve(arrow_column->length()); + + for (size_t chunk_i = 0, num_chunks = static_cast(arrow_column->data()->num_chunks()); + chunk_i < num_chunks; ++chunk_i) + { + arrow::BinaryArray &chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); + std::shared_ptr buffer = chunk.value_data(); + const size_t chunk_length = chunk.length(); + + for (size_t offset_i = 0; offset_i != chunk_length; ++offset_i) + { + if (!chunk.IsNull(offset_i) && buffer) + { + const UInt8 *raw_data = buffer->data() + chunk.value_offset(offset_i); + column_chars_t.insert_assume_reserved(raw_data, raw_data + chunk.value_length(offset_i)); + } + column_chars_t.emplace_back('\0'); + + column_offsets.emplace_back(column_chars_t.size()); + } + } + } + + void + fillColumnWithBooleanData(std::shared_ptr &arrow_column, MutableColumnPtr &internal_column) + { + auto &column_data = static_cast &>(*internal_column).getData(); + column_data.resize(arrow_column->length()); + + for (size_t chunk_i = 0, num_chunks = static_cast(arrow_column->data()->num_chunks()); + chunk_i < num_chunks; ++chunk_i) + { + arrow::BooleanArray &chunk = static_cast(*(arrow_column->data()->chunk( + chunk_i))); + /// buffers[0] is a null bitmap and buffers[1] are actual values + std::shared_ptr buffer = chunk.data()->buffers[1]; + + for (size_t bool_i = 0; bool_i != static_cast(chunk.length()); ++bool_i) + column_data[bool_i] = chunk.Value(bool_i); + } + } + + void + fillColumnWithDate32Data(std::shared_ptr &arrow_column, MutableColumnPtr &internal_column) + { + PaddedPODArray &column_data = static_cast &>(*internal_column).getData(); + column_data.reserve(arrow_column->length()); + + for (size_t chunk_i = 0, num_chunks = static_cast(arrow_column->data()->num_chunks()); + chunk_i < num_chunks; ++chunk_i) + { + arrow::Date32Array &chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); + + for (size_t value_i = 0, length = static_cast(chunk.length()); value_i < length; ++value_i) + { + UInt32 days_num = static_cast(chunk.Value(value_i)); + if (days_num > DATE_LUT_MAX_DAY_NUM) + { + // TODO: will it rollback correctly? + throw Exception + { + "Input value " + std::to_string(days_num) + " of a column \"" + arrow_column->name() + + "\" is greater than " + "max allowed Date value, which is " + + std::to_string(DATE_LUT_MAX_DAY_NUM), + ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE}; + } + + column_data.emplace_back(days_num); + } + } + } + + void + fillColumnWithDate64Data(std::shared_ptr &arrow_column, MutableColumnPtr &internal_column) + { + auto &column_data = static_cast &>(*internal_column).getData(); + column_data.reserve(arrow_column->length()); + + for (size_t chunk_i = 0, num_chunks = static_cast(arrow_column->data()->num_chunks()); + chunk_i < num_chunks; ++chunk_i) + { + auto &chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); + for (size_t value_i = 0, length = static_cast(chunk.length()); value_i < length; ++value_i) + { + auto timestamp = static_cast(chunk.Value(value_i) / 1000); // Always? in ms + column_data.emplace_back(timestamp); + } + } + } + + void + fillColumnWithTimestampData(std::shared_ptr &arrow_column, MutableColumnPtr &internal_column) + { + auto &column_data = static_cast &>(*internal_column).getData(); + column_data.reserve(arrow_column->length()); + + for (size_t chunk_i = 0, num_chunks = static_cast(arrow_column->data()->num_chunks()); + chunk_i < num_chunks; ++chunk_i) + { + auto &chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); + const auto &type = static_cast(*chunk.type()); + + UInt32 divide = 1; + const auto unit = type.unit(); + switch (unit) + { + case arrow::TimeUnit::SECOND: + divide = 1; + break; + case arrow::TimeUnit::MILLI: + divide = 1000; + break; + case arrow::TimeUnit::MICRO: + divide = 1000000; + break; + case arrow::TimeUnit::NANO: + divide = 1000000000; + break; + } + + for (size_t value_i = 0, length = static_cast(chunk.length()); value_i < length; ++value_i) + { + auto timestamp = static_cast(chunk.Value(value_i) / + divide); // ms! TODO: check other 's' 'ns' ... + column_data.emplace_back(timestamp); + } + } + } + + void + fillColumnWithDecimalData(std::shared_ptr &arrow_column, MutableColumnPtr &internal_column) + { + auto &column = static_cast &>(*internal_column); + auto &column_data = column.getData(); + column_data.reserve(arrow_column->length()); + + for (size_t chunk_i = 0, num_chunks = static_cast(arrow_column->data()->num_chunks()); + chunk_i < num_chunks; ++chunk_i) + { + auto &chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); + for (size_t value_i = 0, length = static_cast(chunk.length()); value_i < length; ++value_i) + { + column_data.emplace_back( + chunk.IsNull(value_i) ? Decimal128(0) : *reinterpret_cast(chunk.Value( + value_i))); // TODO: copy column + } + } + } + +/// Creates a null bytemap from arrow's null bitmap + void + fillByteMapFromArrowColumn(std::shared_ptr &arrow_column, MutableColumnPtr &bytemap) + { + PaddedPODArray &bytemap_data = static_cast &>(*bytemap).getData(); + bytemap_data.reserve(arrow_column->length()); + + for (size_t chunk_i = 0; chunk_i != static_cast(arrow_column->data()->num_chunks()); ++chunk_i) + { + std::shared_ptr chunk = arrow_column->data()->chunk(chunk_i); + + for (size_t value_i = 0; value_i != static_cast(chunk->length()); ++value_i) + bytemap_data.emplace_back(chunk->IsNull(value_i)); + } + } + + void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk &res, std::shared_ptr &table, + arrow::Status &read_status, const Block &header, + int &row_group_current, const Context &context) + { + Columns columns_list; + UInt64 num_rows = 0; + + columns_list.reserve(header.rows()); + + using NameToColumnPtr = std::unordered_map>; + if (!read_status.ok()) + throw Exception{"Error while reading ORC data: " + read_status.ToString(), + ErrorCodes::CANNOT_READ_ALL_DATA}; + + if (0 == table->num_rows()) + throw Exception{"Empty table in input data", ErrorCodes::EMPTY_DATA_PASSED}; + + if (header.columns() > static_cast(table->num_columns())) + // TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable? + throw Exception{"Number of columns is less than the table has", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH}; + + ++row_group_current; + + NameToColumnPtr name_to_column_ptr; + for (size_t i = 0, num_columns = static_cast(table->num_columns()); i < num_columns; ++i) + { + std::shared_ptr arrow_column = table->column(i); + name_to_column_ptr[arrow_column->name()] = arrow_column; + } + + for (size_t column_i = 0, columns = header.columns(); column_i < columns; ++column_i) + { + ColumnWithTypeAndName header_column = header.getByPosition(column_i); + + if (name_to_column_ptr.find(header_column.name) == name_to_column_ptr.end()) + // TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable? + throw Exception{"Column \"" + header_column.name + "\" is not presented in input data", + ErrorCodes::THERE_IS_NO_COLUMN}; + + std::shared_ptr arrow_column = name_to_column_ptr[header_column.name]; + arrow::Type::type arrow_type = arrow_column->type()->id(); + + // TODO: check if a column is const? + if (!header_column.type->isNullable() && arrow_column->null_count()) + { + throw Exception{"Can not insert NULL data into non-nullable column \"" + header_column.name + "\"", + ErrorCodes::CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN}; + } + + const bool target_column_is_nullable = header_column.type->isNullable() || arrow_column->null_count(); + + DataTypePtr internal_nested_type; + + if (arrow_type == arrow::Type::DECIMAL) + { + const auto decimal_type = static_cast(arrow_column->type().get()); + internal_nested_type = std::make_shared>(decimal_type->precision(), + decimal_type->scale()); + } else if (arrow_type_to_internal_type.find(arrow_type) != arrow_type_to_internal_type.end()) + { + internal_nested_type = arrow_type_to_internal_type.at(arrow_type); + } + else + { + throw Exception + { + "The type \"" + arrow_column->type()->name() + "\" of an input column \"" + arrow_column->name() + + "\" is not supported for conversion from a ORC data format", + ErrorCodes::CANNOT_CONVERT_TYPE}; + } + + const DataTypePtr internal_type = target_column_is_nullable ? makeNullable(internal_nested_type) + : internal_nested_type; + const std::string internal_nested_type_name = internal_nested_type->getName(); + + const DataTypePtr column_nested_type = header_column.type->isNullable() + ? static_cast(header_column.type.get())->getNestedType() + : header_column.type; + + const DataTypePtr column_type = header_column.type; + + const std::string column_nested_type_name = column_nested_type->getName(); + + ColumnWithTypeAndName column; + column.name = header_column.name; + column.type = internal_type; + + /// Data + MutableColumnPtr read_column = internal_nested_type->createColumn(); + switch (arrow_type) + { + case arrow::Type::STRING: + case arrow::Type::BINARY: + //case arrow::Type::FIXED_SIZE_BINARY: + fillColumnWithStringData(arrow_column, read_column); + break; + case arrow::Type::BOOL: + fillColumnWithBooleanData(arrow_column, read_column); + break; + case arrow::Type::DATE32: + fillColumnWithDate32Data(arrow_column, read_column); + break; + case arrow::Type::DATE64: + fillColumnWithDate64Data(arrow_column, read_column); + break; + case arrow::Type::TIMESTAMP: + fillColumnWithTimestampData(arrow_column, read_column); + break; + case arrow::Type::DECIMAL: + //fillColumnWithNumericData>(arrow_column, read_column); // Have problems with trash values under NULL, but faster + fillColumnWithDecimalData(arrow_column, read_column /*, internal_nested_type*/); + + break; +# define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \ + case ARROW_NUMERIC_TYPE: \ + fillColumnWithNumericData(arrow_column, read_column); \ + break; + + FOR_ARROW_NUMERIC_TYPES(DISPATCH) +# undef DISPATCH + // TODO: support TIMESTAMP_MICROS and TIMESTAMP_MILLIS with truncated micro- and milliseconds? + // TODO: read JSON as a string? + // TODO: read UUID as a string? + default: + throw Exception + { + "Unsupported ORC type \"" + arrow_column->type()->name() + "\" of an input column \"" + + arrow_column->name() + "\"", + ErrorCodes::UNKNOWN_TYPE + }; + } + + + if (column.type->isNullable()) + { + MutableColumnPtr null_bytemap = DataTypeUInt8().createColumn(); + fillByteMapFromArrowColumn(arrow_column, null_bytemap); + column.column = ColumnNullable::create(std::move(read_column), std::move(null_bytemap)); + } + else + column.column = std::move(read_column); + + column.column = castColumn(column, column_type, context); + column.type = column_type; + num_rows = column.column->size(); + columns_list.push_back(std::move(column.column)); + } + + res.setColumns(columns_list, num_rows); + } +} +#endif diff --git a/dbms/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h b/dbms/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h new file mode 100644 index 00000000000..cf4f021f8c0 --- /dev/null +++ b/dbms/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h @@ -0,0 +1,45 @@ +#include "config_formats.h" + +#if USE_ORC or USE_PARQUET + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + + class ArrowColumnToCHColumn + { + private: + +# define FOR_ARROW_NUMERIC_TYPES(M) \ + M(arrow::Type::UINT8, DB::UInt8) \ + M(arrow::Type::INT8, DB::Int8) \ + M(arrow::Type::UINT16, DB::UInt16) \ + M(arrow::Type::INT16, DB::Int16) \ + M(arrow::Type::UINT32, DB::UInt32) \ + M(arrow::Type::INT32, DB::Int32) \ + M(arrow::Type::UINT64, DB::UInt64) \ + M(arrow::Type::INT64, DB::Int64) \ + M(arrow::Type::FLOAT, DB::Float32) \ + M(arrow::Type::DOUBLE, DB::Float64) + + + public: + + static void arrowTableToCHChunk(Chunk &res, std::shared_ptr &table, + arrow::Status &read_status, const Block &header, + int &row_group_current, const Context &context); + }; +} +#endif diff --git a/dbms/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp b/dbms/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp new file mode 100644 index 00000000000..4da6b10f3ab --- /dev/null +++ b/dbms/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp @@ -0,0 +1,90 @@ +#include "ORCBlockInputFormat.h" +#if USE_ORC + +#include +#include +#include +#include +#include +#include +#include +#include "ArrowColumnToCHColumn.h" + +namespace DB +{ + ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer &in_, Block header_, const Context &context_) + : IInputFormat(std::move(header_), in_), context{context_} { + } + + Chunk ORCBlockInputFormat::generate() + { + Chunk res; + + auto &header = getPort().getHeader(); + + if (!in.eof()) + { + if (row_group_current < row_group_total) + throw Exception{"Got new data, but data from previous chunks was not read " + + std::to_string(row_group_current) + "/" + std::to_string(row_group_total), + ErrorCodes::CANNOT_READ_ALL_DATA}; + + file_data.clear(); + { + WriteBufferFromString file_buffer(file_data); + copyData(in, file_buffer); + } + + std::unique_ptr local_buffer = std::make_unique(file_data); + + + std::shared_ptr in_stream(new arrow::io::BufferReader(*local_buffer)); + + bool ok = arrow::adapters::orc::ORCFileReader::Open(in_stream, arrow::default_memory_pool(), + &file_reader).ok(); + if (!ok) + return res; + + row_group_total = file_reader->NumberOfRows(); + row_group_current = 0; + + } else + return res; + + if (row_group_current >= row_group_total) + return res; + std::shared_ptr table; + + arrow::Status read_status = file_reader->Read(&table); + + ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, read_status, header, row_group_current, context); + + return res; + } + + void registerInputFormatProcessorORC(FormatFactory &factory) + { + factory.registerInputFormatProcessor( + "ORC", + [](ReadBuffer &buf, + const Block &sample, + const Context &context, + const RowInputFormatParams &, + const FormatSettings & /* settings */) + { + return std::make_shared(buf, sample, context); + }); + } + +} +#else + +namespace DB +{ + class FormatFactory; + void registerInputFormatProcessorORC(FormatFactory &) + { + } +} + +#endif diff --git a/dbms/src/Processors/Formats/Impl/ORCBlockInputFormat.h b/dbms/src/Processors/Formats/Impl/ORCBlockInputFormat.h new file mode 100644 index 00000000000..afc65a2e912 --- /dev/null +++ b/dbms/src/Processors/Formats/Impl/ORCBlockInputFormat.h @@ -0,0 +1,40 @@ +#pragma once + +#include "config_formats.h" +#include +#include +#include + +#if USE_ORC + +#include "arrow/adapters/orc/adapter.h" +#include "arrow/io/interfaces.h" + +namespace DB +{ +class Context; + +class ORCBlockInputFormat: public IInputFormat +{ +public: + ORCBlockInputFormat(ReadBuffer & in_, Block header_, const Context & context_); + + String getName() const override { return "ORCBlockInputFormat"; } + +protected: + Chunk generate() override; + +private: + + // TODO: check that this class implements every part of its parent + + const Context & context; + + std::unique_ptr file_reader; + std::string file_data; + int row_group_total = 0; + int row_group_current = 0; +}; + +} +#endif diff --git a/dbms/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/dbms/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index 4f42597eac5..32a55c70e55 100644 --- a/dbms/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/dbms/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -1,479 +1,82 @@ #include "ParquetBlockInputFormat.h" #if USE_PARQUET -#include -#include -#include -// TODO: clear includes -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include #include #include #include #include #include #include -#include -#include -#include #include #include #include +#include "ArrowColumnToCHColumn.h" namespace DB { -namespace ErrorCodes -{ - extern const int UNKNOWN_TYPE; - extern const int VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE; - extern const int CANNOT_READ_ALL_DATA; - extern const int EMPTY_DATA_PASSED; - extern const int SIZES_OF_COLUMNS_DOESNT_MATCH; - extern const int CANNOT_CONVERT_TYPE; - extern const int CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN; - extern const int THERE_IS_NO_COLUMN; -} -ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_, const Context & context_) - : IInputFormat(std::move(header_), in_), context{context_} -{ -} - -/// Inserts numeric data right into internal column data to reduce an overhead -template > -static void fillColumnWithNumericData(std::shared_ptr & arrow_column, MutableColumnPtr & internal_column) -{ - auto & column_data = static_cast(*internal_column).getData(); - column_data.reserve(arrow_column->length()); - - for (size_t chunk_i = 0, num_chunks = static_cast(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i) - { - std::shared_ptr chunk = arrow_column->data()->chunk(chunk_i); - /// buffers[0] is a null bitmap and buffers[1] are actual values - std::shared_ptr buffer = chunk->data()->buffers[1]; - - const auto * raw_data = reinterpret_cast(buffer->data()); - column_data.insert_assume_reserved(raw_data, raw_data + chunk->length()); - } -} - -/// Inserts chars and offsets right into internal column data to reduce an overhead. -/// Internal offsets are shifted by one to the right in comparison with Arrow ones. So the last offset should map to the end of all chars. -/// Also internal strings are null terminated. -static void fillColumnWithStringData(std::shared_ptr & arrow_column, MutableColumnPtr & internal_column) -{ - PaddedPODArray & column_chars_t = static_cast(*internal_column).getChars(); - PaddedPODArray & column_offsets = static_cast(*internal_column).getOffsets(); - - size_t chars_t_size = 0; - for (size_t chunk_i = 0, num_chunks = static_cast(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i) - { - arrow::BinaryArray & chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); - const size_t chunk_length = chunk.length(); - - chars_t_size += chunk.value_offset(chunk_length - 1) + chunk.value_length(chunk_length - 1); - chars_t_size += chunk_length; /// additional space for null bytes + ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer &in_, Block header_, const Context &context_) + : IInputFormat(std::move(header_), in_), context{context_} { } - column_chars_t.reserve(chars_t_size); - column_offsets.reserve(arrow_column->length()); - - for (size_t chunk_i = 0, num_chunks = static_cast(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i) + Chunk ParquetBlockInputFormat::generate() { - arrow::BinaryArray & chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); - std::shared_ptr buffer = chunk.value_data(); - const size_t chunk_length = chunk.length(); + Chunk res; + auto &header = getPort().getHeader(); - for (size_t offset_i = 0; offset_i != chunk_length; ++offset_i) + if (!in.eof()) { - if (!chunk.IsNull(offset_i) && buffer) + /* + First we load whole stream into string (its very bad and limiting .parquet file size to half? of RAM) + Then producing blocks for every row_group (dont load big .parquet files with one row_group - it can eat x10+ RAM from .parquet file size) + */ + + if (row_group_current < row_group_total) + throw Exception{"Got new data, but data from previous chunks was not read " + + std::to_string(row_group_current) + "/" + std::to_string(row_group_total), + ErrorCodes::CANNOT_READ_ALL_DATA}; + + file_data.clear(); { - const UInt8 * raw_data = buffer->data() + chunk.value_offset(offset_i); - column_chars_t.insert_assume_reserved(raw_data, raw_data + chunk.value_length(offset_i)); - } - column_chars_t.emplace_back('\0'); - - column_offsets.emplace_back(column_chars_t.size()); - } - } -} - -static void fillColumnWithBooleanData(std::shared_ptr & arrow_column, MutableColumnPtr & internal_column) -{ - auto & column_data = static_cast &>(*internal_column).getData(); - column_data.reserve(arrow_column->length()); - - for (size_t chunk_i = 0, num_chunks = static_cast(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i) - { - arrow::BooleanArray & chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); - /// buffers[0] is a null bitmap and buffers[1] are actual values - std::shared_ptr buffer = chunk.data()->buffers[1]; - - for (size_t bool_i = 0; bool_i != static_cast(chunk.length()); ++bool_i) - column_data.emplace_back(chunk.Value(bool_i)); - } -} - -/// Arrow stores Parquet::DATE in Int32, while ClickHouse stores Date in UInt16. Therefore, it should be checked before saving -static void fillColumnWithDate32Data(std::shared_ptr & arrow_column, MutableColumnPtr & internal_column) -{ - PaddedPODArray & column_data = static_cast &>(*internal_column).getData(); - column_data.reserve(arrow_column->length()); - - for (size_t chunk_i = 0, num_chunks = static_cast(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i) - { - arrow::Date32Array & chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); - - for (size_t value_i = 0, length = static_cast(chunk.length()); value_i < length; ++value_i) - { - UInt32 days_num = static_cast(chunk.Value(value_i)); - if (days_num > DATE_LUT_MAX_DAY_NUM) - { - // TODO: will it rollback correctly? - throw Exception{"Input value " + std::to_string(days_num) + " of a column \"" + arrow_column->name() - + "\" is greater than " - "max allowed Date value, which is " - + std::to_string(DATE_LUT_MAX_DAY_NUM), - ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE}; + WriteBufferFromString file_buffer(file_data); + copyData(in, file_buffer); } - column_data.emplace_back(days_num); + buffer = std::make_unique(file_data); + // TODO: maybe use parquet::RandomAccessSource? + auto reader = parquet::ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(*buffer)); + file_reader = std::make_unique(::arrow::default_memory_pool(), + std::move(reader)); + row_group_total = file_reader->num_row_groups(); + row_group_current = 0; } - } -} + //DUMP(row_group_current, row_group_total); + if (row_group_current >= row_group_total) + return res; -/// Arrow stores Parquet::DATETIME in Int64, while ClickHouse stores DateTime in UInt32. Therefore, it should be checked before saving -static void fillColumnWithDate64Data(std::shared_ptr & arrow_column, MutableColumnPtr & internal_column) -{ - auto & column_data = static_cast &>(*internal_column).getData(); - column_data.reserve(arrow_column->length()); + // TODO: also catch a ParquetException thrown by filereader? + //arrow::Status read_status = filereader.ReadTable(&table); + std::shared_ptr table; + arrow::Status read_status = file_reader->ReadRowGroup(row_group_current, &table); - for (size_t chunk_i = 0, num_chunks = static_cast(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i) - { - auto & chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); - for (size_t value_i = 0, length = static_cast(chunk.length()); value_i < length; ++value_i) - { - auto timestamp = static_cast(chunk.Value(value_i) / 1000); // Always? in ms - column_data.emplace_back(timestamp); - } - } -} - -static void fillColumnWithTimestampData(std::shared_ptr & arrow_column, MutableColumnPtr & internal_column) -{ - auto & column_data = static_cast &>(*internal_column).getData(); - column_data.reserve(arrow_column->length()); - - for (size_t chunk_i = 0, num_chunks = static_cast(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i) - { - auto & chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); - const auto & type = static_cast(*chunk.type()); - - UInt32 divide = 1; - const auto unit = type.unit(); - switch (unit) - { - case arrow::TimeUnit::SECOND: - divide = 1; - break; - case arrow::TimeUnit::MILLI: - divide = 1000; - break; - case arrow::TimeUnit::MICRO: - divide = 1000000; - break; - case arrow::TimeUnit::NANO: - divide = 1000000000; - break; - } - - for (size_t value_i = 0, length = static_cast(chunk.length()); value_i < length; ++value_i) - { - auto timestamp = static_cast(chunk.Value(value_i) / divide); // ms! TODO: check other 's' 'ns' ... - column_data.emplace_back(timestamp); - } - } -} - -static void fillColumnWithDecimalData(std::shared_ptr & arrow_column, MutableColumnPtr & internal_column) -{ - auto & column = static_cast &>(*internal_column); - auto & column_data = column.getData(); - column_data.reserve(arrow_column->length()); - - for (size_t chunk_i = 0, num_chunks = static_cast(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i) - { - auto & chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); - for (size_t value_i = 0, length = static_cast(chunk.length()); value_i < length; ++value_i) - { - column_data.emplace_back(chunk.IsNull(value_i) ? Decimal128(0) : *reinterpret_cast(chunk.Value(value_i))); // TODO: copy column - } - } -} - -/// Creates a null bytemap from arrow's null bitmap -static void fillByteMapFromArrowColumn(std::shared_ptr & arrow_column, MutableColumnPtr & bytemap) -{ - PaddedPODArray & bytemap_data = static_cast &>(*bytemap).getData(); - bytemap_data.reserve(arrow_column->length()); - - for (size_t chunk_i = 0; chunk_i != static_cast(arrow_column->data()->num_chunks()); ++chunk_i) - { - std::shared_ptr chunk = arrow_column->data()->chunk(chunk_i); - - for (size_t value_i = 0; value_i != static_cast(chunk->length()); ++value_i) - bytemap_data.emplace_back(chunk->IsNull(value_i)); - } -} - -# define FOR_ARROW_NUMERIC_TYPES(M) \ - M(arrow::Type::UINT8, UInt8) \ - M(arrow::Type::INT8, Int8) \ - M(arrow::Type::UINT16, UInt16) \ - M(arrow::Type::INT16, Int16) \ - M(arrow::Type::UINT32, UInt32) \ - M(arrow::Type::INT32, Int32) \ - M(arrow::Type::UINT64, UInt64) \ - M(arrow::Type::INT64, Int64) \ - M(arrow::Type::FLOAT, Float32) \ - M(arrow::Type::DOUBLE, Float64) -//M(arrow::Type::HALF_FLOAT, Float32) // TODO - - -using NameToColumnPtr = std::unordered_map>; - -const std::unordered_map> arrow_type_to_internal_type = { - //{arrow::Type::DECIMAL, std::make_shared()}, - {arrow::Type::UINT8, std::make_shared()}, - {arrow::Type::INT8, std::make_shared()}, - {arrow::Type::UINT16, std::make_shared()}, - {arrow::Type::INT16, std::make_shared()}, - {arrow::Type::UINT32, std::make_shared()}, - {arrow::Type::INT32, std::make_shared()}, - {arrow::Type::UINT64, std::make_shared()}, - {arrow::Type::INT64, std::make_shared()}, - {arrow::Type::HALF_FLOAT, std::make_shared()}, - {arrow::Type::FLOAT, std::make_shared()}, - {arrow::Type::DOUBLE, std::make_shared()}, - - {arrow::Type::BOOL, std::make_shared()}, - //{arrow::Type::DATE32, std::make_shared()}, - {arrow::Type::DATE32, std::make_shared()}, - //{arrow::Type::DATE32, std::make_shared()}, - {arrow::Type::DATE64, std::make_shared()}, - {arrow::Type::TIMESTAMP, std::make_shared()}, - //{arrow::Type::TIME32, std::make_shared()}, - - - {arrow::Type::STRING, std::make_shared()}, - {arrow::Type::BINARY, std::make_shared()}, - //{arrow::Type::FIXED_SIZE_BINARY, std::make_shared()}, - //{arrow::Type::UUID, std::make_shared()}, - - - // TODO: add other types that are convertable to internal ones: - // 0. ENUM? - // 1. UUID -> String - // 2. JSON -> String - // Full list of types: contrib/arrow/cpp/src/arrow/type.h -}; - - -Chunk ParquetBlockInputFormat::generate() -{ - Chunk res; - Columns columns_list; - UInt64 num_rows = 0; - auto & header = getPort().getHeader(); - columns_list.reserve(header.rows()); - - if (!in.eof()) - { - /* - First we load whole stream into string (its very bad and limiting .parquet file size to half? of RAM) - Then producing blocks for every row_group (dont load big .parquet files with one row_group - it can eat x10+ RAM from .parquet file size) - */ - - if (row_group_current < row_group_total) - throw Exception{"Got new data, but data from previous chunks not readed " + std::to_string(row_group_current) + "/" + std::to_string(row_group_total), ErrorCodes::CANNOT_READ_ALL_DATA}; - - file_data.clear(); - { - WriteBufferFromString file_buffer(file_data); - copyData(in, file_buffer); - } - - buffer = std::make_unique(file_data); - // TODO: maybe use parquet::RandomAccessSource? - auto reader = parquet::ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(*buffer)); - file_reader = std::make_unique(::arrow::default_memory_pool(), std::move(reader)); - row_group_total = file_reader->num_row_groups(); - row_group_current = 0; - } - //DUMP(row_group_current, row_group_total); - if (row_group_current >= row_group_total) + ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, read_status, header, row_group_current, context); return res; - - // TODO: also catch a ParquetException thrown by filereader? - //arrow::Status read_status = filereader.ReadTable(&table); - std::shared_ptr table; - arrow::Status read_status = file_reader->ReadRowGroup(row_group_current, &table); - - if (!read_status.ok()) - throw Exception{"Error while reading parquet data: " + read_status.ToString(), ErrorCodes::CANNOT_READ_ALL_DATA}; - - if (0 == table->num_rows()) - throw Exception{"Empty table in input data", ErrorCodes::EMPTY_DATA_PASSED}; - - if (header.columns() > static_cast(table->num_columns())) - // TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable? - throw Exception{"Number of columns is less than the table has", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH}; - - ++row_group_current; - - NameToColumnPtr name_to_column_ptr; - for (size_t i = 0, num_columns = static_cast(table->num_columns()); i < num_columns; ++i) - { - std::shared_ptr arrow_column = table->column(i); - name_to_column_ptr[arrow_column->name()] = arrow_column; } - for (size_t column_i = 0, columns = header.columns(); column_i < columns; ++column_i) + void registerInputFormatProcessorParquet(FormatFactory &factory) { - ColumnWithTypeAndName header_column = header.getByPosition(column_i); - - if (name_to_column_ptr.find(header_column.name) == name_to_column_ptr.end()) - // TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable? - throw Exception{"Column \"" + header_column.name + "\" is not presented in input data", ErrorCodes::THERE_IS_NO_COLUMN}; - - std::shared_ptr arrow_column = name_to_column_ptr[header_column.name]; - arrow::Type::type arrow_type = arrow_column->type()->id(); - - // TODO: check if a column is const? - if (!header_column.type->isNullable() && arrow_column->null_count()) - { - throw Exception{"Can not insert NULL data into non-nullable column \"" + header_column.name + "\"", - ErrorCodes::CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN}; - } - - const bool target_column_is_nullable = header_column.type->isNullable() || arrow_column->null_count(); - - DataTypePtr internal_nested_type; - - if (arrow_type == arrow::Type::DECIMAL) - { - const auto decimal_type = static_cast(arrow_column->type().get()); - internal_nested_type = std::make_shared>(decimal_type->precision(), decimal_type->scale()); - } - else if (arrow_type_to_internal_type.find(arrow_type) != arrow_type_to_internal_type.end()) - { - internal_nested_type = arrow_type_to_internal_type.at(arrow_type); - } - else - { - throw Exception{"The type \"" + arrow_column->type()->name() + "\" of an input column \"" + arrow_column->name() - + "\" is not supported for conversion from a Parquet data format", - ErrorCodes::CANNOT_CONVERT_TYPE}; - } - - const DataTypePtr internal_type = target_column_is_nullable ? makeNullable(internal_nested_type) : internal_nested_type; - const std::string internal_nested_type_name = internal_nested_type->getName(); - - const DataTypePtr column_nested_type = header_column.type->isNullable() - ? static_cast(header_column.type.get())->getNestedType() - : header_column.type; - - const DataTypePtr column_type = header_column.type; - - const std::string column_nested_type_name = column_nested_type->getName(); - - ColumnWithTypeAndName column; - column.name = header_column.name; - column.type = internal_type; - - /// Data - MutableColumnPtr read_column = internal_nested_type->createColumn(); - switch (arrow_type) - { - case arrow::Type::STRING: - case arrow::Type::BINARY: - //case arrow::Type::FIXED_SIZE_BINARY: - fillColumnWithStringData(arrow_column, read_column); - break; - case arrow::Type::BOOL: - fillColumnWithBooleanData(arrow_column, read_column); - break; - case arrow::Type::DATE32: - fillColumnWithDate32Data(arrow_column, read_column); - break; - case arrow::Type::DATE64: - fillColumnWithDate64Data(arrow_column, read_column); - break; - case arrow::Type::TIMESTAMP: - fillColumnWithTimestampData(arrow_column, read_column); - break; - case arrow::Type::DECIMAL: - //fillColumnWithNumericData>(arrow_column, read_column); // Have problems with trash values under NULL, but faster - fillColumnWithDecimalData(arrow_column, read_column /*, internal_nested_type*/); - break; -# define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \ - case ARROW_NUMERIC_TYPE: \ - fillColumnWithNumericData(arrow_column, read_column); \ - break; - - FOR_ARROW_NUMERIC_TYPES(DISPATCH) -# undef DISPATCH - // TODO: support TIMESTAMP_MICROS and TIMESTAMP_MILLIS with truncated micro- and milliseconds? - // TODO: read JSON as a string? - // TODO: read UUID as a string? - default: - throw Exception{"Unsupported parquet type \"" + arrow_column->type()->name() + "\" of an input column \"" - + arrow_column->name() + "\"", - ErrorCodes::UNKNOWN_TYPE}; - } - - if (column.type->isNullable()) - { - MutableColumnPtr null_bytemap = DataTypeUInt8().createColumn(); - fillByteMapFromArrowColumn(arrow_column, null_bytemap); - column.column = ColumnNullable::create(std::move(read_column), std::move(null_bytemap)); - } - else - { - column.column = std::move(read_column); - } - - column.column = castColumn(column, column_type, context); - column.type = column_type; - num_rows = column.column->size(); - columns_list.push_back(std::move(column.column)); + factory.registerInputFormatProcessor( + "Parquet", + [](ReadBuffer &buf, + const Block &sample, + const Context &context, + const RowInputFormatParams &, + const FormatSettings & /* settings */) + { + return std::make_shared(buf, sample, context); + }); } - res.setColumns(columns_list, num_rows); - return res; -} - -void registerInputFormatProcessorParquet(FormatFactory & factory) -{ - factory.registerInputFormatProcessor( - "Parquet", - [](ReadBuffer & buf, - const Block & sample, - const Context & context, - const RowInputFormatParams &, - const FormatSettings & /* settings */){ return std::make_shared(buf, sample, context); }); -} - } #else