mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
ADQM-34 add orc input format
This commit is contained in:
parent
b9870245fd
commit
caf0d30a5f
3
.gitmodules
vendored
3
.gitmodules
vendored
@ -103,3 +103,6 @@
|
|||||||
[submodule "contrib/fastops"]
|
[submodule "contrib/fastops"]
|
||||||
path = contrib/fastops
|
path = contrib/fastops
|
||||||
url = https://github.com/ClickHouse-Extras/fastops
|
url = https://github.com/ClickHouse-Extras/fastops
|
||||||
|
[submodule "contrib/orc"]
|
||||||
|
path = contrib/orc
|
||||||
|
url = https://github.com/apache/orc
|
||||||
|
@ -481,6 +481,7 @@ include (cmake/find_mimalloc.cmake)
|
|||||||
include (cmake/find_simdjson.cmake)
|
include (cmake/find_simdjson.cmake)
|
||||||
include (cmake/find_rapidjson.cmake)
|
include (cmake/find_rapidjson.cmake)
|
||||||
include (cmake/find_fastops.cmake)
|
include (cmake/find_fastops.cmake)
|
||||||
|
include (cmake/find_orc.cmake)
|
||||||
|
|
||||||
find_contrib_lib(cityhash)
|
find_contrib_lib(cityhash)
|
||||||
find_contrib_lib(farmhash)
|
find_contrib_lib(farmhash)
|
||||||
|
8
cmake/find_orc.cmake
Normal file
8
cmake/find_orc.cmake
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
##TODO replace hardcode to find procedure
|
||||||
|
|
||||||
|
set(USE_ORC 0)
|
||||||
|
set(USE_INTERNAL_ORC_LIBRARY ON)
|
||||||
|
|
||||||
|
if (ARROW_LIBRARY)
|
||||||
|
set(USE_ORC 1)
|
||||||
|
endif()
|
12
contrib/CMakeLists.txt
vendored
12
contrib/CMakeLists.txt
vendored
@ -10,6 +10,18 @@ endif ()
|
|||||||
|
|
||||||
set_property(DIRECTORY PROPERTY EXCLUDE_FROM_ALL 1)
|
set_property(DIRECTORY PROPERTY EXCLUDE_FROM_ALL 1)
|
||||||
|
|
||||||
|
if (USE_INTERNAL_ORC_LIBRARY)
|
||||||
|
set(BUILD_JAVA OFF)
|
||||||
|
set (ANALYZE_JAVA OFF)
|
||||||
|
set (BUILD_CPP_TESTS OFF)
|
||||||
|
set (BUILD_TOOLS OFF)
|
||||||
|
option(BUILD_JAVA OFF)
|
||||||
|
option (ANALYZE_JAVA OFF)
|
||||||
|
option (BUILD_CPP_TESTS OFF)
|
||||||
|
option (BUILD_TOOLS OFF)
|
||||||
|
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/contrib/orc/cmake_modules")
|
||||||
|
add_subdirectory(orc)
|
||||||
|
endif()
|
||||||
|
|
||||||
if (USE_INTERNAL_UNWIND_LIBRARY)
|
if (USE_INTERNAL_UNWIND_LIBRARY)
|
||||||
add_subdirectory (libunwind-cmake)
|
add_subdirectory (libunwind-cmake)
|
||||||
|
@ -47,6 +47,71 @@ target_include_directories(${THRIFT_LIBRARY} SYSTEM PUBLIC ${ClickHouse_SOURCE_D
|
|||||||
target_link_libraries(${THRIFT_LIBRARY} PRIVATE Threads::Threads)
|
target_link_libraries(${THRIFT_LIBRARY} PRIVATE Threads::Threads)
|
||||||
|
|
||||||
|
|
||||||
|
# === orc
|
||||||
|
|
||||||
|
set(ORC_SOURCE_DIR ${ClickHouse_SOURCE_DIR}/contrib/orc/c++)
|
||||||
|
set(ORC_INCLUDE_DIR ${ORC_SOURCE_DIR}/include)
|
||||||
|
set(ORC_SOURCE_SRC_DIR ${ORC_SOURCE_DIR}/src)
|
||||||
|
set(ORC_SOURCE_WRAP_DIR ${ORC_SOURCE_DIR}/wrap)
|
||||||
|
|
||||||
|
set(ORC_BUILD_SRC_DIR ${CMAKE_CURRENT_BINARY_DIR}/../orc/c++/src)
|
||||||
|
set(ORC_BUILD_INCLUDE_DIR ${CMAKE_CURRENT_BINARY_DIR}/../orc/c++/include)
|
||||||
|
|
||||||
|
set(GOOGLE_PROTOBUF_DIR ${ClickHouse_SOURCE_DIR}/contrib/protobuf/src/)
|
||||||
|
set(ORC_ADDITION_SOURCE_DIR ${CMAKE_CURRENT_BINARY_DIR})
|
||||||
|
set(ARROW_SRC_DIR ${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src)
|
||||||
|
|
||||||
|
set(PROTOBUF_EXECUTABLE ${CMAKE_CURRENT_BINARY_DIR}/../protobuf/cmake/protoc)
|
||||||
|
set(PROTO_DIR ${ORC_SOURCE_DIR}/../proto)
|
||||||
|
|
||||||
|
|
||||||
|
add_custom_command(OUTPUT orc_proto.pb.h orc_proto.pb.cc
|
||||||
|
COMMAND ${PROTOBUF_EXECUTABLE}
|
||||||
|
-I ${PROTO_DIR}
|
||||||
|
--cpp_out="${CMAKE_CURRENT_BINARY_DIR}"
|
||||||
|
"${PROTO_DIR}/orc_proto.proto")
|
||||||
|
|
||||||
|
include_directories(SYSTEM ${ORC_INCLUDE_DIR})
|
||||||
|
include_directories(SYSTEM ${ORC_SOURCE_SRC_DIR})
|
||||||
|
include_directories(SYSTEM ${ORC_SOURCE_WRAP_DIR})
|
||||||
|
include_directories(SYSTEM ${GOOGLE_PROTOBUF_DIR})
|
||||||
|
include_directories(SYSTEM ${ORC_BUILD_SRC_DIR})
|
||||||
|
include_directories(SYSTEM ${ORC_BUILD_INCLUDE_DIR})
|
||||||
|
include_directories(SYSTEM ${ORC_ADDITION_SOURCE_DIR})
|
||||||
|
include_directories(SYSTEM ${ARROW_SRC_DIR})
|
||||||
|
|
||||||
|
|
||||||
|
set(ORC_SRCS
|
||||||
|
${ARROW_SRC_DIR}/arrow/adapters/orc/adapter.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/Exceptions.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/OrcFile.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/Reader.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/ByteRLE.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/ColumnPrinter.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/ColumnReader.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/ColumnWriter.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/Common.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/Compression.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/Exceptions.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/Int128.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/LzoDecompressor.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/MemoryPool.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/OrcFile.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/Reader.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/RLE.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/RLEv1.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/RLEv2.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/Statistics.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/StripeStream.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/Timezone.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/TypeImpl.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/Vector.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/Writer.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/io/InputStream.cc
|
||||||
|
${ORC_SOURCE_SRC_DIR}/io/OutputStream.cc
|
||||||
|
${ORC_ADDITION_SOURCE_DIR}/orc_proto.pb.cc
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# === arrow
|
# === arrow
|
||||||
|
|
||||||
@ -103,6 +168,7 @@ set(ARROW_SRCS
|
|||||||
${LIBRARY_DIR}/util/thread-pool.cc
|
${LIBRARY_DIR}/util/thread-pool.cc
|
||||||
${LIBRARY_DIR}/util/trie.cc
|
${LIBRARY_DIR}/util/trie.cc
|
||||||
${LIBRARY_DIR}/util/utf8.cc
|
${LIBRARY_DIR}/util/utf8.cc
|
||||||
|
${ORC_SRCS}
|
||||||
)
|
)
|
||||||
|
|
||||||
set(ARROW_SRCS ${ARROW_SRCS}
|
set(ARROW_SRCS ${ARROW_SRCS}
|
||||||
@ -151,6 +217,7 @@ endif()
|
|||||||
|
|
||||||
|
|
||||||
add_library(${ARROW_LIBRARY} ${ARROW_SRCS})
|
add_library(${ARROW_LIBRARY} ${ARROW_SRCS})
|
||||||
|
add_dependencies(${ARROW_LIBRARY} protoc)
|
||||||
target_include_directories(${ARROW_LIBRARY} SYSTEM PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/cpp/src ${Boost_INCLUDE_DIRS})
|
target_include_directories(${ARROW_LIBRARY} SYSTEM PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/cpp/src ${Boost_INCLUDE_DIRS})
|
||||||
target_link_libraries(${ARROW_LIBRARY} PRIVATE ${DOUBLE_CONVERSION_LIBRARIES} Threads::Threads)
|
target_link_libraries(${ARROW_LIBRARY} PRIVATE ${DOUBLE_CONVERSION_LIBRARIES} Threads::Threads)
|
||||||
if (ARROW_WITH_LZ4)
|
if (ARROW_WITH_LZ4)
|
||||||
|
1
contrib/orc
vendored
Submodule
1
contrib/orc
vendored
Submodule
@ -0,0 +1 @@
|
|||||||
|
Subproject commit 5981208e39447df84827f6a961d1da76bacb6078
|
@ -238,6 +238,7 @@ void registerOutputFormatProcessorTSKV(FormatFactory & factory);
|
|||||||
void registerInputFormatProcessorJSONEachRow(FormatFactory & factory);
|
void registerInputFormatProcessorJSONEachRow(FormatFactory & factory);
|
||||||
void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory);
|
void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory);
|
||||||
void registerInputFormatProcessorParquet(FormatFactory & factory);
|
void registerInputFormatProcessorParquet(FormatFactory & factory);
|
||||||
|
void registerInputFormatProcessorORC(FormatFactory & factory);
|
||||||
void registerOutputFormatProcessorParquet(FormatFactory & factory);
|
void registerOutputFormatProcessorParquet(FormatFactory & factory);
|
||||||
void registerInputFormatProcessorProtobuf(FormatFactory & factory);
|
void registerInputFormatProcessorProtobuf(FormatFactory & factory);
|
||||||
void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
|
void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
|
||||||
@ -288,6 +289,7 @@ FormatFactory::FormatFactory()
|
|||||||
registerInputFormatProcessorProtobuf(*this);
|
registerInputFormatProcessorProtobuf(*this);
|
||||||
registerOutputFormatProcessorProtobuf(*this);
|
registerOutputFormatProcessorProtobuf(*this);
|
||||||
registerInputFormatProcessorCapnProto(*this);
|
registerInputFormatProcessorCapnProto(*this);
|
||||||
|
registerInputFormatProcessorORC(*this);
|
||||||
registerInputFormatProcessorParquet(*this);
|
registerInputFormatProcessorParquet(*this);
|
||||||
registerOutputFormatProcessorParquet(*this);
|
registerOutputFormatProcessorParquet(*this);
|
||||||
|
|
||||||
|
@ -5,4 +5,5 @@
|
|||||||
#cmakedefine01 USE_CAPNP
|
#cmakedefine01 USE_CAPNP
|
||||||
#cmakedefine01 USE_SNAPPY
|
#cmakedefine01 USE_SNAPPY
|
||||||
#cmakedefine01 USE_PARQUET
|
#cmakedefine01 USE_PARQUET
|
||||||
|
#cmakedefine01 USE_ORC
|
||||||
#cmakedefine01 USE_PROTOBUF
|
#cmakedefine01 USE_PROTOBUF
|
||||||
|
420
dbms/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp
Normal file
420
dbms/src/Processors/Formats/Impl/ArrowColumnToCHColumn.cpp
Normal file
@ -0,0 +1,420 @@
|
|||||||
|
#include "config_formats.h"
|
||||||
|
#include "ArrowColumnToCHColumn.h"
|
||||||
|
|
||||||
|
#if USE_ORC or USE_PARQUET
|
||||||
|
#include <DataTypes/DataTypeFactory.h>
|
||||||
|
#include <DataTypes/DataTypeNullable.h>
|
||||||
|
#include <DataTypes/DataTypeString.h>
|
||||||
|
#include <DataTypes/DataTypesDecimal.h>
|
||||||
|
#include <DataTypes/DataTypesNumber.h>
|
||||||
|
#include <common/DateLUTImpl.h>
|
||||||
|
#include <Core/Types.h>
|
||||||
|
#include <Core/Block.h>
|
||||||
|
#include <Columns/ColumnString.h>
|
||||||
|
#include <Columns/ColumnNullable.h>
|
||||||
|
#include <Interpreters/castColumn.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
const std::unordered_map<arrow::Type::type, std::shared_ptr<IDataType>> arrow_type_to_internal_type ={
|
||||||
|
//{arrow::Type::DECIMAL, std::make_shared<DataTypeDecimal>()},
|
||||||
|
{arrow::Type::UINT8, std::make_shared<DataTypeUInt8>()},
|
||||||
|
{arrow::Type::INT8, std::make_shared<DataTypeInt8>()},
|
||||||
|
{arrow::Type::UINT16, std::make_shared<DataTypeUInt16>()},
|
||||||
|
{arrow::Type::INT16, std::make_shared<DataTypeInt16>()},
|
||||||
|
{arrow::Type::UINT32, std::make_shared<DataTypeUInt32>()},
|
||||||
|
{arrow::Type::INT32, std::make_shared<DataTypeInt32>()},
|
||||||
|
{arrow::Type::UINT64, std::make_shared<DataTypeUInt64>()},
|
||||||
|
{arrow::Type::INT64, std::make_shared<DataTypeInt64>()},
|
||||||
|
{arrow::Type::HALF_FLOAT, std::make_shared<DataTypeFloat32>()},
|
||||||
|
{arrow::Type::FLOAT, std::make_shared<DataTypeFloat32>()},
|
||||||
|
{arrow::Type::DOUBLE, std::make_shared<DataTypeFloat64>()},
|
||||||
|
|
||||||
|
{arrow::Type::BOOL, std::make_shared<DataTypeUInt8>()},
|
||||||
|
//{arrow::Type::DATE32, std::make_shared<DataTypeDate>()},
|
||||||
|
{arrow::Type::DATE32, std::make_shared<DataTypeDate>()},
|
||||||
|
//{arrow::Type::DATE32, std::make_shared<DataTypeDateTime>()},
|
||||||
|
{arrow::Type::DATE64, std::make_shared<DataTypeDateTime>()},
|
||||||
|
{arrow::Type::TIMESTAMP, std::make_shared<DataTypeDateTime>()},
|
||||||
|
//{arrow::Type::TIME32, std::make_shared<DataTypeDateTime>()},
|
||||||
|
|
||||||
|
|
||||||
|
{arrow::Type::STRING, std::make_shared<DataTypeString>()},
|
||||||
|
{arrow::Type::BINARY, std::make_shared<DataTypeString>()},
|
||||||
|
//{arrow::Type::FIXED_SIZE_BINARY, std::make_shared<DataTypeString>()},
|
||||||
|
//{arrow::Type::UUID, std::make_shared<DataTypeString>()},
|
||||||
|
|
||||||
|
|
||||||
|
// TODO: add other types that are convertable to internal ones:
|
||||||
|
// 0. ENUM?
|
||||||
|
// 1. UUID -> String
|
||||||
|
// 2. JSON -> String
|
||||||
|
// Full list of types: contrib/arrow/cpp/src/arrow/type.h
|
||||||
|
};
|
||||||
|
|
||||||
|
namespace ErrorCodes
|
||||||
|
{
|
||||||
|
extern const int UNKNOWN_TYPE;
|
||||||
|
extern const int VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE;
|
||||||
|
extern const int CANNOT_READ_ALL_DATA;
|
||||||
|
extern const int EMPTY_DATA_PASSED;
|
||||||
|
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
|
||||||
|
extern const int CANNOT_CONVERT_TYPE;
|
||||||
|
extern const int CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN;
|
||||||
|
extern const int THERE_IS_NO_COLUMN;
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename NumericType, typename VectorType = ColumnVector<NumericType>>
|
||||||
|
static void
|
||||||
|
fillColumnWithNumericData(std::shared_ptr<arrow::Column> &arrow_column, MutableColumnPtr &internal_column)
|
||||||
|
{
|
||||||
|
auto &column_data = static_cast<VectorType &>(*internal_column).getData();
|
||||||
|
column_data.reserve(arrow_column->length());
|
||||||
|
|
||||||
|
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks());
|
||||||
|
chunk_i < num_chunks; ++chunk_i)
|
||||||
|
{
|
||||||
|
std::shared_ptr<arrow::Array> chunk = arrow_column->data()->chunk(chunk_i);
|
||||||
|
/// buffers[0] is a null bitmap and buffers[1] are actual values
|
||||||
|
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
|
||||||
|
|
||||||
|
const auto *raw_data = reinterpret_cast<const NumericType *>(buffer->data());
|
||||||
|
column_data.insert_assume_reserved(raw_data, raw_data + chunk->length());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void fillColumnWithStringData(std::shared_ptr<arrow::Column> &arrow_column, MutableColumnPtr &internal_column)
|
||||||
|
{
|
||||||
|
PaddedPODArray<UInt8> &column_chars_t = static_cast<ColumnString &>(*internal_column).getChars();
|
||||||
|
PaddedPODArray<UInt64> &column_offsets = static_cast<ColumnString &>(*internal_column).getOffsets();
|
||||||
|
|
||||||
|
size_t chars_t_size = 0;
|
||||||
|
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks());
|
||||||
|
chunk_i < num_chunks; ++chunk_i)
|
||||||
|
{
|
||||||
|
arrow::BinaryArray &chunk = static_cast<arrow::BinaryArray &>(*(arrow_column->data()->chunk(chunk_i)));
|
||||||
|
const size_t chunk_length = chunk.length();
|
||||||
|
|
||||||
|
chars_t_size += chunk.value_offset(chunk_length - 1) + chunk.value_length(chunk_length - 1);
|
||||||
|
chars_t_size += chunk_length; /// additional space for null bytes
|
||||||
|
}
|
||||||
|
|
||||||
|
column_chars_t.reserve(chars_t_size);
|
||||||
|
column_offsets.reserve(arrow_column->length());
|
||||||
|
|
||||||
|
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks());
|
||||||
|
chunk_i < num_chunks; ++chunk_i)
|
||||||
|
{
|
||||||
|
arrow::BinaryArray &chunk = static_cast<arrow::BinaryArray &>(*(arrow_column->data()->chunk(chunk_i)));
|
||||||
|
std::shared_ptr<arrow::Buffer> buffer = chunk.value_data();
|
||||||
|
const size_t chunk_length = chunk.length();
|
||||||
|
|
||||||
|
for (size_t offset_i = 0; offset_i != chunk_length; ++offset_i)
|
||||||
|
{
|
||||||
|
if (!chunk.IsNull(offset_i) && buffer)
|
||||||
|
{
|
||||||
|
const UInt8 *raw_data = buffer->data() + chunk.value_offset(offset_i);
|
||||||
|
column_chars_t.insert_assume_reserved(raw_data, raw_data + chunk.value_length(offset_i));
|
||||||
|
}
|
||||||
|
column_chars_t.emplace_back('\0');
|
||||||
|
|
||||||
|
column_offsets.emplace_back(column_chars_t.size());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
fillColumnWithBooleanData(std::shared_ptr<arrow::Column> &arrow_column, MutableColumnPtr &internal_column)
|
||||||
|
{
|
||||||
|
auto &column_data = static_cast<ColumnVector<UInt8> &>(*internal_column).getData();
|
||||||
|
column_data.resize(arrow_column->length());
|
||||||
|
|
||||||
|
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks());
|
||||||
|
chunk_i < num_chunks; ++chunk_i)
|
||||||
|
{
|
||||||
|
arrow::BooleanArray &chunk = static_cast<arrow::BooleanArray &>(*(arrow_column->data()->chunk(
|
||||||
|
chunk_i)));
|
||||||
|
/// buffers[0] is a null bitmap and buffers[1] are actual values
|
||||||
|
std::shared_ptr<arrow::Buffer> buffer = chunk.data()->buffers[1];
|
||||||
|
|
||||||
|
for (size_t bool_i = 0; bool_i != static_cast<size_t>(chunk.length()); ++bool_i)
|
||||||
|
column_data[bool_i] = chunk.Value(bool_i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
fillColumnWithDate32Data(std::shared_ptr<arrow::Column> &arrow_column, MutableColumnPtr &internal_column)
|
||||||
|
{
|
||||||
|
PaddedPODArray<UInt16> &column_data = static_cast<ColumnVector<UInt16> &>(*internal_column).getData();
|
||||||
|
column_data.reserve(arrow_column->length());
|
||||||
|
|
||||||
|
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks());
|
||||||
|
chunk_i < num_chunks; ++chunk_i)
|
||||||
|
{
|
||||||
|
arrow::Date32Array &chunk = static_cast<arrow::Date32Array &>(*(arrow_column->data()->chunk(chunk_i)));
|
||||||
|
|
||||||
|
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
|
||||||
|
{
|
||||||
|
UInt32 days_num = static_cast<UInt32>(chunk.Value(value_i));
|
||||||
|
if (days_num > DATE_LUT_MAX_DAY_NUM)
|
||||||
|
{
|
||||||
|
// TODO: will it rollback correctly?
|
||||||
|
throw Exception
|
||||||
|
{
|
||||||
|
"Input value " + std::to_string(days_num) + " of a column \"" + arrow_column->name()
|
||||||
|
+ "\" is greater than "
|
||||||
|
"max allowed Date value, which is "
|
||||||
|
+ std::to_string(DATE_LUT_MAX_DAY_NUM),
|
||||||
|
ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE};
|
||||||
|
}
|
||||||
|
|
||||||
|
column_data.emplace_back(days_num);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
fillColumnWithDate64Data(std::shared_ptr<arrow::Column> &arrow_column, MutableColumnPtr &internal_column)
|
||||||
|
{
|
||||||
|
auto &column_data = static_cast<ColumnVector<UInt32> &>(*internal_column).getData();
|
||||||
|
column_data.reserve(arrow_column->length());
|
||||||
|
|
||||||
|
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks());
|
||||||
|
chunk_i < num_chunks; ++chunk_i)
|
||||||
|
{
|
||||||
|
auto &chunk = static_cast<arrow::Date64Array &>(*(arrow_column->data()->chunk(chunk_i)));
|
||||||
|
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
|
||||||
|
{
|
||||||
|
auto timestamp = static_cast<UInt32>(chunk.Value(value_i) / 1000); // Always? in ms
|
||||||
|
column_data.emplace_back(timestamp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
fillColumnWithTimestampData(std::shared_ptr<arrow::Column> &arrow_column, MutableColumnPtr &internal_column)
|
||||||
|
{
|
||||||
|
auto &column_data = static_cast<ColumnVector<UInt32> &>(*internal_column).getData();
|
||||||
|
column_data.reserve(arrow_column->length());
|
||||||
|
|
||||||
|
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks());
|
||||||
|
chunk_i < num_chunks; ++chunk_i)
|
||||||
|
{
|
||||||
|
auto &chunk = static_cast<arrow::TimestampArray &>(*(arrow_column->data()->chunk(chunk_i)));
|
||||||
|
const auto &type = static_cast<const ::arrow::TimestampType &>(*chunk.type());
|
||||||
|
|
||||||
|
UInt32 divide = 1;
|
||||||
|
const auto unit = type.unit();
|
||||||
|
switch (unit)
|
||||||
|
{
|
||||||
|
case arrow::TimeUnit::SECOND:
|
||||||
|
divide = 1;
|
||||||
|
break;
|
||||||
|
case arrow::TimeUnit::MILLI:
|
||||||
|
divide = 1000;
|
||||||
|
break;
|
||||||
|
case arrow::TimeUnit::MICRO:
|
||||||
|
divide = 1000000;
|
||||||
|
break;
|
||||||
|
case arrow::TimeUnit::NANO:
|
||||||
|
divide = 1000000000;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
|
||||||
|
{
|
||||||
|
auto timestamp = static_cast<UInt32>(chunk.Value(value_i) /
|
||||||
|
divide); // ms! TODO: check other 's' 'ns' ...
|
||||||
|
column_data.emplace_back(timestamp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void
|
||||||
|
fillColumnWithDecimalData(std::shared_ptr<arrow::Column> &arrow_column, MutableColumnPtr &internal_column)
|
||||||
|
{
|
||||||
|
auto &column = static_cast<ColumnDecimal<Decimal128> &>(*internal_column);
|
||||||
|
auto &column_data = column.getData();
|
||||||
|
column_data.reserve(arrow_column->length());
|
||||||
|
|
||||||
|
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks());
|
||||||
|
chunk_i < num_chunks; ++chunk_i)
|
||||||
|
{
|
||||||
|
auto &chunk = static_cast<arrow::DecimalArray &>(*(arrow_column->data()->chunk(chunk_i)));
|
||||||
|
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
|
||||||
|
{
|
||||||
|
column_data.emplace_back(
|
||||||
|
chunk.IsNull(value_i) ? Decimal128(0) : *reinterpret_cast<const Decimal128 *>(chunk.Value(
|
||||||
|
value_i))); // TODO: copy column
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates a null bytemap from arrow's null bitmap
|
||||||
|
void
|
||||||
|
fillByteMapFromArrowColumn(std::shared_ptr<arrow::Column> &arrow_column, MutableColumnPtr &bytemap)
|
||||||
|
{
|
||||||
|
PaddedPODArray<UInt8> &bytemap_data = static_cast<ColumnVector<UInt8> &>(*bytemap).getData();
|
||||||
|
bytemap_data.reserve(arrow_column->length());
|
||||||
|
|
||||||
|
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->data()->num_chunks()); ++chunk_i)
|
||||||
|
{
|
||||||
|
std::shared_ptr<arrow::Array> chunk = arrow_column->data()->chunk(chunk_i);
|
||||||
|
|
||||||
|
for (size_t value_i = 0; value_i != static_cast<size_t>(chunk->length()); ++value_i)
|
||||||
|
bytemap_data.emplace_back(chunk->IsNull(value_i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void ArrowColumnToCHColumn::arrowTableToCHChunk(Chunk &res, std::shared_ptr<arrow::Table> &table,
|
||||||
|
arrow::Status &read_status, const Block &header,
|
||||||
|
int &row_group_current, const Context &context)
|
||||||
|
{
|
||||||
|
Columns columns_list;
|
||||||
|
UInt64 num_rows = 0;
|
||||||
|
|
||||||
|
columns_list.reserve(header.rows());
|
||||||
|
|
||||||
|
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::Column>>;
|
||||||
|
if (!read_status.ok())
|
||||||
|
throw Exception{"Error while reading ORC data: " + read_status.ToString(),
|
||||||
|
ErrorCodes::CANNOT_READ_ALL_DATA};
|
||||||
|
|
||||||
|
if (0 == table->num_rows())
|
||||||
|
throw Exception{"Empty table in input data", ErrorCodes::EMPTY_DATA_PASSED};
|
||||||
|
|
||||||
|
if (header.columns() > static_cast<size_t>(table->num_columns()))
|
||||||
|
// TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable?
|
||||||
|
throw Exception{"Number of columns is less than the table has", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH};
|
||||||
|
|
||||||
|
++row_group_current;
|
||||||
|
|
||||||
|
NameToColumnPtr name_to_column_ptr;
|
||||||
|
for (size_t i = 0, num_columns = static_cast<size_t>(table->num_columns()); i < num_columns; ++i)
|
||||||
|
{
|
||||||
|
std::shared_ptr<arrow::Column> arrow_column = table->column(i);
|
||||||
|
name_to_column_ptr[arrow_column->name()] = arrow_column;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (size_t column_i = 0, columns = header.columns(); column_i < columns; ++column_i)
|
||||||
|
{
|
||||||
|
ColumnWithTypeAndName header_column = header.getByPosition(column_i);
|
||||||
|
|
||||||
|
if (name_to_column_ptr.find(header_column.name) == name_to_column_ptr.end())
|
||||||
|
// TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable?
|
||||||
|
throw Exception{"Column \"" + header_column.name + "\" is not presented in input data",
|
||||||
|
ErrorCodes::THERE_IS_NO_COLUMN};
|
||||||
|
|
||||||
|
std::shared_ptr<arrow::Column> arrow_column = name_to_column_ptr[header_column.name];
|
||||||
|
arrow::Type::type arrow_type = arrow_column->type()->id();
|
||||||
|
|
||||||
|
// TODO: check if a column is const?
|
||||||
|
if (!header_column.type->isNullable() && arrow_column->null_count())
|
||||||
|
{
|
||||||
|
throw Exception{"Can not insert NULL data into non-nullable column \"" + header_column.name + "\"",
|
||||||
|
ErrorCodes::CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN};
|
||||||
|
}
|
||||||
|
|
||||||
|
const bool target_column_is_nullable = header_column.type->isNullable() || arrow_column->null_count();
|
||||||
|
|
||||||
|
DataTypePtr internal_nested_type;
|
||||||
|
|
||||||
|
if (arrow_type == arrow::Type::DECIMAL)
|
||||||
|
{
|
||||||
|
const auto decimal_type = static_cast<arrow::DecimalType *>(arrow_column->type().get());
|
||||||
|
internal_nested_type = std::make_shared<DataTypeDecimal<Decimal128>>(decimal_type->precision(),
|
||||||
|
decimal_type->scale());
|
||||||
|
} else if (arrow_type_to_internal_type.find(arrow_type) != arrow_type_to_internal_type.end())
|
||||||
|
{
|
||||||
|
internal_nested_type = arrow_type_to_internal_type.at(arrow_type);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
throw Exception
|
||||||
|
{
|
||||||
|
"The type \"" + arrow_column->type()->name() + "\" of an input column \"" + arrow_column->name()
|
||||||
|
+ "\" is not supported for conversion from a ORC data format",
|
||||||
|
ErrorCodes::CANNOT_CONVERT_TYPE};
|
||||||
|
}
|
||||||
|
|
||||||
|
const DataTypePtr internal_type = target_column_is_nullable ? makeNullable(internal_nested_type)
|
||||||
|
: internal_nested_type;
|
||||||
|
const std::string internal_nested_type_name = internal_nested_type->getName();
|
||||||
|
|
||||||
|
const DataTypePtr column_nested_type = header_column.type->isNullable()
|
||||||
|
? static_cast<const DataTypeNullable *>(header_column.type.get())->getNestedType()
|
||||||
|
: header_column.type;
|
||||||
|
|
||||||
|
const DataTypePtr column_type = header_column.type;
|
||||||
|
|
||||||
|
const std::string column_nested_type_name = column_nested_type->getName();
|
||||||
|
|
||||||
|
ColumnWithTypeAndName column;
|
||||||
|
column.name = header_column.name;
|
||||||
|
column.type = internal_type;
|
||||||
|
|
||||||
|
/// Data
|
||||||
|
MutableColumnPtr read_column = internal_nested_type->createColumn();
|
||||||
|
switch (arrow_type)
|
||||||
|
{
|
||||||
|
case arrow::Type::STRING:
|
||||||
|
case arrow::Type::BINARY:
|
||||||
|
//case arrow::Type::FIXED_SIZE_BINARY:
|
||||||
|
fillColumnWithStringData(arrow_column, read_column);
|
||||||
|
break;
|
||||||
|
case arrow::Type::BOOL:
|
||||||
|
fillColumnWithBooleanData(arrow_column, read_column);
|
||||||
|
break;
|
||||||
|
case arrow::Type::DATE32:
|
||||||
|
fillColumnWithDate32Data(arrow_column, read_column);
|
||||||
|
break;
|
||||||
|
case arrow::Type::DATE64:
|
||||||
|
fillColumnWithDate64Data(arrow_column, read_column);
|
||||||
|
break;
|
||||||
|
case arrow::Type::TIMESTAMP:
|
||||||
|
fillColumnWithTimestampData(arrow_column, read_column);
|
||||||
|
break;
|
||||||
|
case arrow::Type::DECIMAL:
|
||||||
|
//fillColumnWithNumericData<Decimal128, ColumnDecimal<Decimal128>>(arrow_column, read_column); // Have problems with trash values under NULL, but faster
|
||||||
|
fillColumnWithDecimalData(arrow_column, read_column /*, internal_nested_type*/);
|
||||||
|
|
||||||
|
break;
|
||||||
|
# define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
|
||||||
|
case ARROW_NUMERIC_TYPE: \
|
||||||
|
fillColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, read_column); \
|
||||||
|
break;
|
||||||
|
|
||||||
|
FOR_ARROW_NUMERIC_TYPES(DISPATCH)
|
||||||
|
# undef DISPATCH
|
||||||
|
// TODO: support TIMESTAMP_MICROS and TIMESTAMP_MILLIS with truncated micro- and milliseconds?
|
||||||
|
// TODO: read JSON as a string?
|
||||||
|
// TODO: read UUID as a string?
|
||||||
|
default:
|
||||||
|
throw Exception
|
||||||
|
{
|
||||||
|
"Unsupported ORC type \"" + arrow_column->type()->name() + "\" of an input column \""
|
||||||
|
+ arrow_column->name() + "\"",
|
||||||
|
ErrorCodes::UNKNOWN_TYPE
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (column.type->isNullable())
|
||||||
|
{
|
||||||
|
MutableColumnPtr null_bytemap = DataTypeUInt8().createColumn();
|
||||||
|
fillByteMapFromArrowColumn(arrow_column, null_bytemap);
|
||||||
|
column.column = ColumnNullable::create(std::move(read_column), std::move(null_bytemap));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
column.column = std::move(read_column);
|
||||||
|
|
||||||
|
column.column = castColumn(column, column_type, context);
|
||||||
|
column.type = column_type;
|
||||||
|
num_rows = column.column->size();
|
||||||
|
columns_list.push_back(std::move(column.column));
|
||||||
|
}
|
||||||
|
|
||||||
|
res.setColumns(columns_list, num_rows);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
45
dbms/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h
Normal file
45
dbms/src/Processors/Formats/Impl/ArrowColumnToCHColumn.h
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
#include "config_formats.h"
|
||||||
|
|
||||||
|
#if USE_ORC or USE_PARQUET
|
||||||
|
|
||||||
|
#include <DataTypes/IDataType.h>
|
||||||
|
#include <DataTypes/DataTypesNumber.h>
|
||||||
|
#include <DataTypes/DataTypeDate.h>
|
||||||
|
#include <DataTypes/DataTypeDateTime.h>
|
||||||
|
#include <DataTypes/DataTypeString.h>
|
||||||
|
#include <arrow/type.h>
|
||||||
|
#include <Columns/ColumnVector.h>
|
||||||
|
#include <arrow/table.h>
|
||||||
|
#include <arrow/array.h>
|
||||||
|
#include <arrow/buffer.h>
|
||||||
|
#include <Processors/Chunk.h>
|
||||||
|
#include <Core/Block.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
class ArrowColumnToCHColumn
|
||||||
|
{
|
||||||
|
private:
|
||||||
|
|
||||||
|
# define FOR_ARROW_NUMERIC_TYPES(M) \
|
||||||
|
M(arrow::Type::UINT8, DB::UInt8) \
|
||||||
|
M(arrow::Type::INT8, DB::Int8) \
|
||||||
|
M(arrow::Type::UINT16, DB::UInt16) \
|
||||||
|
M(arrow::Type::INT16, DB::Int16) \
|
||||||
|
M(arrow::Type::UINT32, DB::UInt32) \
|
||||||
|
M(arrow::Type::INT32, DB::Int32) \
|
||||||
|
M(arrow::Type::UINT64, DB::UInt64) \
|
||||||
|
M(arrow::Type::INT64, DB::Int64) \
|
||||||
|
M(arrow::Type::FLOAT, DB::Float32) \
|
||||||
|
M(arrow::Type::DOUBLE, DB::Float64)
|
||||||
|
|
||||||
|
|
||||||
|
public:
|
||||||
|
|
||||||
|
static void arrowTableToCHChunk(Chunk &res, std::shared_ptr<arrow::Table> &table,
|
||||||
|
arrow::Status &read_status, const Block &header,
|
||||||
|
int &row_group_current, const Context &context);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
#endif
|
90
dbms/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp
Normal file
90
dbms/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
#include "ORCBlockInputFormat.h"
|
||||||
|
#if USE_ORC
|
||||||
|
|
||||||
|
#include <Formats/FormatFactory.h>
|
||||||
|
#include <IO/BufferBase.h>
|
||||||
|
#include <IO/ReadBufferFromMemory.h>
|
||||||
|
#include <IO/WriteBufferFromString.h>
|
||||||
|
#include <IO/WriteHelpers.h>
|
||||||
|
#include <IO/copyData.h>
|
||||||
|
#include <arrow/io/memory.h>
|
||||||
|
#include "ArrowColumnToCHColumn.h"
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer &in_, Block header_, const Context &context_)
|
||||||
|
: IInputFormat(std::move(header_), in_), context{context_} {
|
||||||
|
}
|
||||||
|
|
||||||
|
Chunk ORCBlockInputFormat::generate()
|
||||||
|
{
|
||||||
|
Chunk res;
|
||||||
|
|
||||||
|
auto &header = getPort().getHeader();
|
||||||
|
|
||||||
|
if (!in.eof())
|
||||||
|
{
|
||||||
|
if (row_group_current < row_group_total)
|
||||||
|
throw Exception{"Got new data, but data from previous chunks was not read " +
|
||||||
|
std::to_string(row_group_current) + "/" + std::to_string(row_group_total),
|
||||||
|
ErrorCodes::CANNOT_READ_ALL_DATA};
|
||||||
|
|
||||||
|
file_data.clear();
|
||||||
|
{
|
||||||
|
WriteBufferFromString file_buffer(file_data);
|
||||||
|
copyData(in, file_buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::unique_ptr<arrow::Buffer> local_buffer = std::make_unique<arrow::Buffer>(file_data);
|
||||||
|
|
||||||
|
|
||||||
|
std::shared_ptr<arrow::io::RandomAccessFile> in_stream(new arrow::io::BufferReader(*local_buffer));
|
||||||
|
|
||||||
|
bool ok = arrow::adapters::orc::ORCFileReader::Open(in_stream, arrow::default_memory_pool(),
|
||||||
|
&file_reader).ok();
|
||||||
|
if (!ok)
|
||||||
|
return res;
|
||||||
|
|
||||||
|
row_group_total = file_reader->NumberOfRows();
|
||||||
|
row_group_current = 0;
|
||||||
|
|
||||||
|
} else
|
||||||
|
return res;
|
||||||
|
|
||||||
|
if (row_group_current >= row_group_total)
|
||||||
|
return res;
|
||||||
|
std::shared_ptr<arrow::Table> table;
|
||||||
|
|
||||||
|
arrow::Status read_status = file_reader->Read(&table);
|
||||||
|
|
||||||
|
ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, read_status, header, row_group_current, context);
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
void registerInputFormatProcessorORC(FormatFactory &factory)
|
||||||
|
{
|
||||||
|
factory.registerInputFormatProcessor(
|
||||||
|
"ORC",
|
||||||
|
[](ReadBuffer &buf,
|
||||||
|
const Block &sample,
|
||||||
|
const Context &context,
|
||||||
|
const RowInputFormatParams &,
|
||||||
|
const FormatSettings & /* settings */)
|
||||||
|
{
|
||||||
|
return std::make_shared<ORCBlockInputFormat>(buf, sample, context);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
class FormatFactory;
|
||||||
|
void registerInputFormatProcessorORC(FormatFactory &)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
40
dbms/src/Processors/Formats/Impl/ORCBlockInputFormat.h
Normal file
40
dbms/src/Processors/Formats/Impl/ORCBlockInputFormat.h
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "config_formats.h"
|
||||||
|
#include <DataStreams/IBlockInputStream.h>
|
||||||
|
#include <Processors/Chunk.h>
|
||||||
|
#include <Processors/Formats/IInputFormat.h>
|
||||||
|
|
||||||
|
#if USE_ORC
|
||||||
|
|
||||||
|
#include "arrow/adapters/orc/adapter.h"
|
||||||
|
#include "arrow/io/interfaces.h"
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
class Context;
|
||||||
|
|
||||||
|
class ORCBlockInputFormat: public IInputFormat
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
ORCBlockInputFormat(ReadBuffer & in_, Block header_, const Context & context_);
|
||||||
|
|
||||||
|
String getName() const override { return "ORCBlockInputFormat"; }
|
||||||
|
|
||||||
|
protected:
|
||||||
|
Chunk generate() override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
|
||||||
|
// TODO: check that this class implements every part of its parent
|
||||||
|
|
||||||
|
const Context & context;
|
||||||
|
|
||||||
|
std::unique_ptr<arrow::adapters::orc::ORCFileReader> file_reader;
|
||||||
|
std::string file_data;
|
||||||
|
int row_group_total = 0;
|
||||||
|
int row_group_current = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
}
|
||||||
|
#endif
|
@ -1,301 +1,28 @@
|
|||||||
#include "ParquetBlockInputFormat.h"
|
#include "ParquetBlockInputFormat.h"
|
||||||
#if USE_PARQUET
|
#if USE_PARQUET
|
||||||
|
|
||||||
#include <algorithm>
|
|
||||||
#include <iterator>
|
|
||||||
#include <vector>
|
|
||||||
// TODO: clear includes
|
|
||||||
#include <Columns/ColumnNullable.h>
|
|
||||||
#include <Columns/ColumnString.h>
|
|
||||||
#include <Columns/ColumnsNumber.h>
|
|
||||||
#include <Columns/IColumn.h>
|
|
||||||
#include <Core/ColumnWithTypeAndName.h>
|
|
||||||
#include <DataTypes/DataTypeDate.h>
|
|
||||||
#include <DataTypes/DataTypeDateTime.h>
|
|
||||||
#include <DataTypes/DataTypeFactory.h>
|
|
||||||
#include <DataTypes/DataTypeNullable.h>
|
|
||||||
#include <DataTypes/DataTypeString.h>
|
|
||||||
#include <DataTypes/DataTypesDecimal.h>
|
|
||||||
#include <DataTypes/DataTypesNumber.h>
|
|
||||||
#include <Formats/FormatFactory.h>
|
#include <Formats/FormatFactory.h>
|
||||||
#include <IO/BufferBase.h>
|
#include <IO/BufferBase.h>
|
||||||
#include <IO/ReadBufferFromMemory.h>
|
#include <IO/ReadBufferFromMemory.h>
|
||||||
#include <IO/WriteBufferFromString.h>
|
#include <IO/WriteBufferFromString.h>
|
||||||
#include <IO/WriteHelpers.h>
|
#include <IO/WriteHelpers.h>
|
||||||
#include <IO/copyData.h>
|
#include <IO/copyData.h>
|
||||||
#include <Interpreters/castColumn.h>
|
|
||||||
#include <common/DateLUTImpl.h>
|
|
||||||
#include <ext/range.h>
|
|
||||||
#include <arrow/api.h>
|
#include <arrow/api.h>
|
||||||
#include <parquet/arrow/reader.h>
|
#include <parquet/arrow/reader.h>
|
||||||
#include <parquet/file_reader.h>
|
#include <parquet/file_reader.h>
|
||||||
|
#include "ArrowColumnToCHColumn.h"
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
namespace ErrorCodes
|
|
||||||
{
|
|
||||||
extern const int UNKNOWN_TYPE;
|
|
||||||
extern const int VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE;
|
|
||||||
extern const int CANNOT_READ_ALL_DATA;
|
|
||||||
extern const int EMPTY_DATA_PASSED;
|
|
||||||
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
|
|
||||||
extern const int CANNOT_CONVERT_TYPE;
|
|
||||||
extern const int CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN;
|
|
||||||
extern const int THERE_IS_NO_COLUMN;
|
|
||||||
}
|
|
||||||
|
|
||||||
ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_, const Context & context_)
|
ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer &in_, Block header_, const Context &context_)
|
||||||
: IInputFormat(std::move(header_), in_), context{context_}
|
: IInputFormat(std::move(header_), in_), context{context_} {
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Inserts numeric data right into internal column data to reduce an overhead
|
|
||||||
template <typename NumericType, typename VectorType = ColumnVector<NumericType>>
|
|
||||||
static void fillColumnWithNumericData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
|
|
||||||
{
|
|
||||||
auto & column_data = static_cast<VectorType &>(*internal_column).getData();
|
|
||||||
column_data.reserve(arrow_column->length());
|
|
||||||
|
|
||||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
|
||||||
{
|
|
||||||
std::shared_ptr<arrow::Array> chunk = arrow_column->data()->chunk(chunk_i);
|
|
||||||
/// buffers[0] is a null bitmap and buffers[1] are actual values
|
|
||||||
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
|
|
||||||
|
|
||||||
const auto * raw_data = reinterpret_cast<const NumericType *>(buffer->data());
|
|
||||||
column_data.insert_assume_reserved(raw_data, raw_data + chunk->length());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Inserts chars and offsets right into internal column data to reduce an overhead.
|
|
||||||
/// Internal offsets are shifted by one to the right in comparison with Arrow ones. So the last offset should map to the end of all chars.
|
|
||||||
/// Also internal strings are null terminated.
|
|
||||||
static void fillColumnWithStringData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
|
|
||||||
{
|
|
||||||
PaddedPODArray<UInt8> & column_chars_t = static_cast<ColumnString &>(*internal_column).getChars();
|
|
||||||
PaddedPODArray<UInt64> & column_offsets = static_cast<ColumnString &>(*internal_column).getOffsets();
|
|
||||||
|
|
||||||
size_t chars_t_size = 0;
|
|
||||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
|
||||||
{
|
|
||||||
arrow::BinaryArray & chunk = static_cast<arrow::BinaryArray &>(*(arrow_column->data()->chunk(chunk_i)));
|
|
||||||
const size_t chunk_length = chunk.length();
|
|
||||||
|
|
||||||
chars_t_size += chunk.value_offset(chunk_length - 1) + chunk.value_length(chunk_length - 1);
|
|
||||||
chars_t_size += chunk_length; /// additional space for null bytes
|
|
||||||
}
|
}
|
||||||
|
|
||||||
column_chars_t.reserve(chars_t_size);
|
Chunk ParquetBlockInputFormat::generate()
|
||||||
column_offsets.reserve(arrow_column->length());
|
|
||||||
|
|
||||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
|
||||||
{
|
{
|
||||||
arrow::BinaryArray & chunk = static_cast<arrow::BinaryArray &>(*(arrow_column->data()->chunk(chunk_i)));
|
|
||||||
std::shared_ptr<arrow::Buffer> buffer = chunk.value_data();
|
|
||||||
const size_t chunk_length = chunk.length();
|
|
||||||
|
|
||||||
for (size_t offset_i = 0; offset_i != chunk_length; ++offset_i)
|
|
||||||
{
|
|
||||||
if (!chunk.IsNull(offset_i) && buffer)
|
|
||||||
{
|
|
||||||
const UInt8 * raw_data = buffer->data() + chunk.value_offset(offset_i);
|
|
||||||
column_chars_t.insert_assume_reserved(raw_data, raw_data + chunk.value_length(offset_i));
|
|
||||||
}
|
|
||||||
column_chars_t.emplace_back('\0');
|
|
||||||
|
|
||||||
column_offsets.emplace_back(column_chars_t.size());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void fillColumnWithBooleanData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
|
|
||||||
{
|
|
||||||
auto & column_data = static_cast<ColumnVector<UInt8> &>(*internal_column).getData();
|
|
||||||
column_data.reserve(arrow_column->length());
|
|
||||||
|
|
||||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
|
||||||
{
|
|
||||||
arrow::BooleanArray & chunk = static_cast<arrow::BooleanArray &>(*(arrow_column->data()->chunk(chunk_i)));
|
|
||||||
/// buffers[0] is a null bitmap and buffers[1] are actual values
|
|
||||||
std::shared_ptr<arrow::Buffer> buffer = chunk.data()->buffers[1];
|
|
||||||
|
|
||||||
for (size_t bool_i = 0; bool_i != static_cast<size_t>(chunk.length()); ++bool_i)
|
|
||||||
column_data.emplace_back(chunk.Value(bool_i));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Arrow stores Parquet::DATE in Int32, while ClickHouse stores Date in UInt16. Therefore, it should be checked before saving
|
|
||||||
static void fillColumnWithDate32Data(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
|
|
||||||
{
|
|
||||||
PaddedPODArray<UInt16> & column_data = static_cast<ColumnVector<UInt16> &>(*internal_column).getData();
|
|
||||||
column_data.reserve(arrow_column->length());
|
|
||||||
|
|
||||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
|
||||||
{
|
|
||||||
arrow::Date32Array & chunk = static_cast<arrow::Date32Array &>(*(arrow_column->data()->chunk(chunk_i)));
|
|
||||||
|
|
||||||
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
|
|
||||||
{
|
|
||||||
UInt32 days_num = static_cast<UInt32>(chunk.Value(value_i));
|
|
||||||
if (days_num > DATE_LUT_MAX_DAY_NUM)
|
|
||||||
{
|
|
||||||
// TODO: will it rollback correctly?
|
|
||||||
throw Exception{"Input value " + std::to_string(days_num) + " of a column \"" + arrow_column->name()
|
|
||||||
+ "\" is greater than "
|
|
||||||
"max allowed Date value, which is "
|
|
||||||
+ std::to_string(DATE_LUT_MAX_DAY_NUM),
|
|
||||||
ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE};
|
|
||||||
}
|
|
||||||
|
|
||||||
column_data.emplace_back(days_num);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Arrow stores Parquet::DATETIME in Int64, while ClickHouse stores DateTime in UInt32. Therefore, it should be checked before saving
|
|
||||||
static void fillColumnWithDate64Data(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
|
|
||||||
{
|
|
||||||
auto & column_data = static_cast<ColumnVector<UInt32> &>(*internal_column).getData();
|
|
||||||
column_data.reserve(arrow_column->length());
|
|
||||||
|
|
||||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
|
||||||
{
|
|
||||||
auto & chunk = static_cast<arrow::Date64Array &>(*(arrow_column->data()->chunk(chunk_i)));
|
|
||||||
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
|
|
||||||
{
|
|
||||||
auto timestamp = static_cast<UInt32>(chunk.Value(value_i) / 1000); // Always? in ms
|
|
||||||
column_data.emplace_back(timestamp);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void fillColumnWithTimestampData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
|
|
||||||
{
|
|
||||||
auto & column_data = static_cast<ColumnVector<UInt32> &>(*internal_column).getData();
|
|
||||||
column_data.reserve(arrow_column->length());
|
|
||||||
|
|
||||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
|
||||||
{
|
|
||||||
auto & chunk = static_cast<arrow::TimestampArray &>(*(arrow_column->data()->chunk(chunk_i)));
|
|
||||||
const auto & type = static_cast<const ::arrow::TimestampType &>(*chunk.type());
|
|
||||||
|
|
||||||
UInt32 divide = 1;
|
|
||||||
const auto unit = type.unit();
|
|
||||||
switch (unit)
|
|
||||||
{
|
|
||||||
case arrow::TimeUnit::SECOND:
|
|
||||||
divide = 1;
|
|
||||||
break;
|
|
||||||
case arrow::TimeUnit::MILLI:
|
|
||||||
divide = 1000;
|
|
||||||
break;
|
|
||||||
case arrow::TimeUnit::MICRO:
|
|
||||||
divide = 1000000;
|
|
||||||
break;
|
|
||||||
case arrow::TimeUnit::NANO:
|
|
||||||
divide = 1000000000;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
|
|
||||||
{
|
|
||||||
auto timestamp = static_cast<UInt32>(chunk.Value(value_i) / divide); // ms! TODO: check other 's' 'ns' ...
|
|
||||||
column_data.emplace_back(timestamp);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void fillColumnWithDecimalData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
|
|
||||||
{
|
|
||||||
auto & column = static_cast<ColumnDecimal<Decimal128> &>(*internal_column);
|
|
||||||
auto & column_data = column.getData();
|
|
||||||
column_data.reserve(arrow_column->length());
|
|
||||||
|
|
||||||
for (size_t chunk_i = 0, num_chunks = static_cast<size_t>(arrow_column->data()->num_chunks()); chunk_i < num_chunks; ++chunk_i)
|
|
||||||
{
|
|
||||||
auto & chunk = static_cast<arrow::DecimalArray &>(*(arrow_column->data()->chunk(chunk_i)));
|
|
||||||
for (size_t value_i = 0, length = static_cast<size_t>(chunk.length()); value_i < length; ++value_i)
|
|
||||||
{
|
|
||||||
column_data.emplace_back(chunk.IsNull(value_i) ? Decimal128(0) : *reinterpret_cast<const Decimal128 *>(chunk.Value(value_i))); // TODO: copy column
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Creates a null bytemap from arrow's null bitmap
|
|
||||||
static void fillByteMapFromArrowColumn(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & bytemap)
|
|
||||||
{
|
|
||||||
PaddedPODArray<UInt8> & bytemap_data = static_cast<ColumnVector<UInt8> &>(*bytemap).getData();
|
|
||||||
bytemap_data.reserve(arrow_column->length());
|
|
||||||
|
|
||||||
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->data()->num_chunks()); ++chunk_i)
|
|
||||||
{
|
|
||||||
std::shared_ptr<arrow::Array> chunk = arrow_column->data()->chunk(chunk_i);
|
|
||||||
|
|
||||||
for (size_t value_i = 0; value_i != static_cast<size_t>(chunk->length()); ++value_i)
|
|
||||||
bytemap_data.emplace_back(chunk->IsNull(value_i));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
# define FOR_ARROW_NUMERIC_TYPES(M) \
|
|
||||||
M(arrow::Type::UINT8, UInt8) \
|
|
||||||
M(arrow::Type::INT8, Int8) \
|
|
||||||
M(arrow::Type::UINT16, UInt16) \
|
|
||||||
M(arrow::Type::INT16, Int16) \
|
|
||||||
M(arrow::Type::UINT32, UInt32) \
|
|
||||||
M(arrow::Type::INT32, Int32) \
|
|
||||||
M(arrow::Type::UINT64, UInt64) \
|
|
||||||
M(arrow::Type::INT64, Int64) \
|
|
||||||
M(arrow::Type::FLOAT, Float32) \
|
|
||||||
M(arrow::Type::DOUBLE, Float64)
|
|
||||||
//M(arrow::Type::HALF_FLOAT, Float32) // TODO
|
|
||||||
|
|
||||||
|
|
||||||
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::Column>>;
|
|
||||||
|
|
||||||
const std::unordered_map<arrow::Type::type, std::shared_ptr<IDataType>> arrow_type_to_internal_type = {
|
|
||||||
//{arrow::Type::DECIMAL, std::make_shared<DataTypeDecimal>()},
|
|
||||||
{arrow::Type::UINT8, std::make_shared<DataTypeUInt8>()},
|
|
||||||
{arrow::Type::INT8, std::make_shared<DataTypeInt8>()},
|
|
||||||
{arrow::Type::UINT16, std::make_shared<DataTypeUInt16>()},
|
|
||||||
{arrow::Type::INT16, std::make_shared<DataTypeInt16>()},
|
|
||||||
{arrow::Type::UINT32, std::make_shared<DataTypeUInt32>()},
|
|
||||||
{arrow::Type::INT32, std::make_shared<DataTypeInt32>()},
|
|
||||||
{arrow::Type::UINT64, std::make_shared<DataTypeUInt64>()},
|
|
||||||
{arrow::Type::INT64, std::make_shared<DataTypeInt64>()},
|
|
||||||
{arrow::Type::HALF_FLOAT, std::make_shared<DataTypeFloat32>()},
|
|
||||||
{arrow::Type::FLOAT, std::make_shared<DataTypeFloat32>()},
|
|
||||||
{arrow::Type::DOUBLE, std::make_shared<DataTypeFloat64>()},
|
|
||||||
|
|
||||||
{arrow::Type::BOOL, std::make_shared<DataTypeUInt8>()},
|
|
||||||
//{arrow::Type::DATE32, std::make_shared<DataTypeDate>()},
|
|
||||||
{arrow::Type::DATE32, std::make_shared<DataTypeDate>()},
|
|
||||||
//{arrow::Type::DATE32, std::make_shared<DataTypeDateTime>()},
|
|
||||||
{arrow::Type::DATE64, std::make_shared<DataTypeDateTime>()},
|
|
||||||
{arrow::Type::TIMESTAMP, std::make_shared<DataTypeDateTime>()},
|
|
||||||
//{arrow::Type::TIME32, std::make_shared<DataTypeDateTime>()},
|
|
||||||
|
|
||||||
|
|
||||||
{arrow::Type::STRING, std::make_shared<DataTypeString>()},
|
|
||||||
{arrow::Type::BINARY, std::make_shared<DataTypeString>()},
|
|
||||||
//{arrow::Type::FIXED_SIZE_BINARY, std::make_shared<DataTypeString>()},
|
|
||||||
//{arrow::Type::UUID, std::make_shared<DataTypeString>()},
|
|
||||||
|
|
||||||
|
|
||||||
// TODO: add other types that are convertable to internal ones:
|
|
||||||
// 0. ENUM?
|
|
||||||
// 1. UUID -> String
|
|
||||||
// 2. JSON -> String
|
|
||||||
// Full list of types: contrib/arrow/cpp/src/arrow/type.h
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
Chunk ParquetBlockInputFormat::generate()
|
|
||||||
{
|
|
||||||
Chunk res;
|
Chunk res;
|
||||||
Columns columns_list;
|
auto &header = getPort().getHeader();
|
||||||
UInt64 num_rows = 0;
|
|
||||||
auto & header = getPort().getHeader();
|
|
||||||
columns_list.reserve(header.rows());
|
|
||||||
|
|
||||||
if (!in.eof())
|
if (!in.eof())
|
||||||
{
|
{
|
||||||
@ -305,7 +32,9 @@ Chunk ParquetBlockInputFormat::generate()
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
if (row_group_current < row_group_total)
|
if (row_group_current < row_group_total)
|
||||||
throw Exception{"Got new data, but data from previous chunks not readed " + std::to_string(row_group_current) + "/" + std::to_string(row_group_total), ErrorCodes::CANNOT_READ_ALL_DATA};
|
throw Exception{"Got new data, but data from previous chunks was not read " +
|
||||||
|
std::to_string(row_group_current) + "/" + std::to_string(row_group_total),
|
||||||
|
ErrorCodes::CANNOT_READ_ALL_DATA};
|
||||||
|
|
||||||
file_data.clear();
|
file_data.clear();
|
||||||
{
|
{
|
||||||
@ -316,7 +45,8 @@ Chunk ParquetBlockInputFormat::generate()
|
|||||||
buffer = std::make_unique<arrow::Buffer>(file_data);
|
buffer = std::make_unique<arrow::Buffer>(file_data);
|
||||||
// TODO: maybe use parquet::RandomAccessSource?
|
// TODO: maybe use parquet::RandomAccessSource?
|
||||||
auto reader = parquet::ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(*buffer));
|
auto reader = parquet::ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(*buffer));
|
||||||
file_reader = std::make_unique<parquet::arrow::FileReader>(::arrow::default_memory_pool(), std::move(reader));
|
file_reader = std::make_unique<parquet::arrow::FileReader>(::arrow::default_memory_pool(),
|
||||||
|
std::move(reader));
|
||||||
row_group_total = file_reader->num_row_groups();
|
row_group_total = file_reader->num_row_groups();
|
||||||
row_group_current = 0;
|
row_group_current = 0;
|
||||||
}
|
}
|
||||||
@ -329,150 +59,23 @@ Chunk ParquetBlockInputFormat::generate()
|
|||||||
std::shared_ptr<arrow::Table> table;
|
std::shared_ptr<arrow::Table> table;
|
||||||
arrow::Status read_status = file_reader->ReadRowGroup(row_group_current, &table);
|
arrow::Status read_status = file_reader->ReadRowGroup(row_group_current, &table);
|
||||||
|
|
||||||
if (!read_status.ok())
|
ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, read_status, header, row_group_current, context);
|
||||||
throw Exception{"Error while reading parquet data: " + read_status.ToString(), ErrorCodes::CANNOT_READ_ALL_DATA};
|
|
||||||
|
|
||||||
if (0 == table->num_rows())
|
|
||||||
throw Exception{"Empty table in input data", ErrorCodes::EMPTY_DATA_PASSED};
|
|
||||||
|
|
||||||
if (header.columns() > static_cast<size_t>(table->num_columns()))
|
|
||||||
// TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable?
|
|
||||||
throw Exception{"Number of columns is less than the table has", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH};
|
|
||||||
|
|
||||||
++row_group_current;
|
|
||||||
|
|
||||||
NameToColumnPtr name_to_column_ptr;
|
|
||||||
for (size_t i = 0, num_columns = static_cast<size_t>(table->num_columns()); i < num_columns; ++i)
|
|
||||||
{
|
|
||||||
std::shared_ptr<arrow::Column> arrow_column = table->column(i);
|
|
||||||
name_to_column_ptr[arrow_column->name()] = arrow_column;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (size_t column_i = 0, columns = header.columns(); column_i < columns; ++column_i)
|
|
||||||
{
|
|
||||||
ColumnWithTypeAndName header_column = header.getByPosition(column_i);
|
|
||||||
|
|
||||||
if (name_to_column_ptr.find(header_column.name) == name_to_column_ptr.end())
|
|
||||||
// TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable?
|
|
||||||
throw Exception{"Column \"" + header_column.name + "\" is not presented in input data", ErrorCodes::THERE_IS_NO_COLUMN};
|
|
||||||
|
|
||||||
std::shared_ptr<arrow::Column> arrow_column = name_to_column_ptr[header_column.name];
|
|
||||||
arrow::Type::type arrow_type = arrow_column->type()->id();
|
|
||||||
|
|
||||||
// TODO: check if a column is const?
|
|
||||||
if (!header_column.type->isNullable() && arrow_column->null_count())
|
|
||||||
{
|
|
||||||
throw Exception{"Can not insert NULL data into non-nullable column \"" + header_column.name + "\"",
|
|
||||||
ErrorCodes::CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN};
|
|
||||||
}
|
|
||||||
|
|
||||||
const bool target_column_is_nullable = header_column.type->isNullable() || arrow_column->null_count();
|
|
||||||
|
|
||||||
DataTypePtr internal_nested_type;
|
|
||||||
|
|
||||||
if (arrow_type == arrow::Type::DECIMAL)
|
|
||||||
{
|
|
||||||
const auto decimal_type = static_cast<arrow::DecimalType *>(arrow_column->type().get());
|
|
||||||
internal_nested_type = std::make_shared<DataTypeDecimal<Decimal128>>(decimal_type->precision(), decimal_type->scale());
|
|
||||||
}
|
|
||||||
else if (arrow_type_to_internal_type.find(arrow_type) != arrow_type_to_internal_type.end())
|
|
||||||
{
|
|
||||||
internal_nested_type = arrow_type_to_internal_type.at(arrow_type);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
throw Exception{"The type \"" + arrow_column->type()->name() + "\" of an input column \"" + arrow_column->name()
|
|
||||||
+ "\" is not supported for conversion from a Parquet data format",
|
|
||||||
ErrorCodes::CANNOT_CONVERT_TYPE};
|
|
||||||
}
|
|
||||||
|
|
||||||
const DataTypePtr internal_type = target_column_is_nullable ? makeNullable(internal_nested_type) : internal_nested_type;
|
|
||||||
const std::string internal_nested_type_name = internal_nested_type->getName();
|
|
||||||
|
|
||||||
const DataTypePtr column_nested_type = header_column.type->isNullable()
|
|
||||||
? static_cast<const DataTypeNullable *>(header_column.type.get())->getNestedType()
|
|
||||||
: header_column.type;
|
|
||||||
|
|
||||||
const DataTypePtr column_type = header_column.type;
|
|
||||||
|
|
||||||
const std::string column_nested_type_name = column_nested_type->getName();
|
|
||||||
|
|
||||||
ColumnWithTypeAndName column;
|
|
||||||
column.name = header_column.name;
|
|
||||||
column.type = internal_type;
|
|
||||||
|
|
||||||
/// Data
|
|
||||||
MutableColumnPtr read_column = internal_nested_type->createColumn();
|
|
||||||
switch (arrow_type)
|
|
||||||
{
|
|
||||||
case arrow::Type::STRING:
|
|
||||||
case arrow::Type::BINARY:
|
|
||||||
//case arrow::Type::FIXED_SIZE_BINARY:
|
|
||||||
fillColumnWithStringData(arrow_column, read_column);
|
|
||||||
break;
|
|
||||||
case arrow::Type::BOOL:
|
|
||||||
fillColumnWithBooleanData(arrow_column, read_column);
|
|
||||||
break;
|
|
||||||
case arrow::Type::DATE32:
|
|
||||||
fillColumnWithDate32Data(arrow_column, read_column);
|
|
||||||
break;
|
|
||||||
case arrow::Type::DATE64:
|
|
||||||
fillColumnWithDate64Data(arrow_column, read_column);
|
|
||||||
break;
|
|
||||||
case arrow::Type::TIMESTAMP:
|
|
||||||
fillColumnWithTimestampData(arrow_column, read_column);
|
|
||||||
break;
|
|
||||||
case arrow::Type::DECIMAL:
|
|
||||||
//fillColumnWithNumericData<Decimal128, ColumnDecimal<Decimal128>>(arrow_column, read_column); // Have problems with trash values under NULL, but faster
|
|
||||||
fillColumnWithDecimalData(arrow_column, read_column /*, internal_nested_type*/);
|
|
||||||
break;
|
|
||||||
# define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
|
|
||||||
case ARROW_NUMERIC_TYPE: \
|
|
||||||
fillColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, read_column); \
|
|
||||||
break;
|
|
||||||
|
|
||||||
FOR_ARROW_NUMERIC_TYPES(DISPATCH)
|
|
||||||
# undef DISPATCH
|
|
||||||
// TODO: support TIMESTAMP_MICROS and TIMESTAMP_MILLIS with truncated micro- and milliseconds?
|
|
||||||
// TODO: read JSON as a string?
|
|
||||||
// TODO: read UUID as a string?
|
|
||||||
default:
|
|
||||||
throw Exception{"Unsupported parquet type \"" + arrow_column->type()->name() + "\" of an input column \""
|
|
||||||
+ arrow_column->name() + "\"",
|
|
||||||
ErrorCodes::UNKNOWN_TYPE};
|
|
||||||
}
|
|
||||||
|
|
||||||
if (column.type->isNullable())
|
|
||||||
{
|
|
||||||
MutableColumnPtr null_bytemap = DataTypeUInt8().createColumn();
|
|
||||||
fillByteMapFromArrowColumn(arrow_column, null_bytemap);
|
|
||||||
column.column = ColumnNullable::create(std::move(read_column), std::move(null_bytemap));
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
column.column = std::move(read_column);
|
|
||||||
}
|
|
||||||
|
|
||||||
column.column = castColumn(column, column_type, context);
|
|
||||||
column.type = column_type;
|
|
||||||
num_rows = column.column->size();
|
|
||||||
columns_list.push_back(std::move(column.column));
|
|
||||||
}
|
|
||||||
|
|
||||||
res.setColumns(columns_list, num_rows);
|
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
void registerInputFormatProcessorParquet(FormatFactory & factory)
|
void registerInputFormatProcessorParquet(FormatFactory &factory)
|
||||||
{
|
{
|
||||||
factory.registerInputFormatProcessor(
|
factory.registerInputFormatProcessor(
|
||||||
"Parquet",
|
"Parquet",
|
||||||
[](ReadBuffer & buf,
|
[](ReadBuffer &buf,
|
||||||
const Block & sample,
|
const Block &sample,
|
||||||
const Context & context,
|
const Context &context,
|
||||||
const RowInputFormatParams &,
|
const RowInputFormatParams &,
|
||||||
const FormatSettings & /* settings */){ return std::make_shared<ParquetBlockInputFormat>(buf, sample, context); });
|
const FormatSettings & /* settings */)
|
||||||
}
|
{
|
||||||
|
return std::make_shared<ParquetBlockInputFormat>(buf, sample, context);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user