diff --git a/.gitmodules b/.gitmodules index cc89b4cca31..a3ce6c2f5dc 100644 --- a/.gitmodules +++ b/.gitmodules @@ -67,3 +67,6 @@ [submodule "contrib/brotli"] path = contrib/brotli url = https://github.com/google/brotli.git +[submodule "contrib/arrow"] + path = contrib/arrow + url = https://github.com/apache/arrow diff --git a/CMakeLists.txt b/CMakeLists.txt index 1041a59b79f..1d211d50135 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -261,6 +261,7 @@ include (cmake/find_base64.cmake) if (ENABLE_TESTS) include (cmake/find_gtest.cmake) endif () +include (cmake/find_parquet.cmake) include (cmake/find_contrib_lib.cmake) find_contrib_lib(cityhash) diff --git a/cmake/Modules/FindArrow.cmake b/cmake/Modules/FindArrow.cmake new file mode 100644 index 00000000000..4043a474988 --- /dev/null +++ b/cmake/Modules/FindArrow.cmake @@ -0,0 +1,159 @@ +# https://github.com/apache/arrow/blob/master/cpp/cmake_modules/FindArrow.cmake + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# - Find ARROW (arrow/api.h, libarrow.a, libarrow.so) +# This module defines +# ARROW_INCLUDE_DIR, directory containing headers +# ARROW_LIBS, directory containing arrow libraries +# ARROW_STATIC_LIB, path to libarrow.a +# ARROW_SHARED_LIB, path to libarrow's shared library +# ARROW_SHARED_IMP_LIB, path to libarrow's import library (MSVC only) +# ARROW_FOUND, whether arrow has been found + +include(FindPkgConfig) +include(GNUInstallDirs) + +if ("$ENV{ARROW_HOME}" STREQUAL "") + pkg_check_modules(ARROW arrow) + if (ARROW_FOUND) + pkg_get_variable(ARROW_SO_VERSION arrow so_version) + set(ARROW_ABI_VERSION ${ARROW_SO_VERSION}) + message(STATUS "Arrow SO and ABI version: ${ARROW_SO_VERSION}") + pkg_get_variable(ARROW_FULL_SO_VERSION arrow full_so_version) + message(STATUS "Arrow full SO version: ${ARROW_FULL_SO_VERSION}") + if ("${ARROW_INCLUDE_DIRS}" STREQUAL "") + set(ARROW_INCLUDE_DIRS "/usr/${CMAKE_INSTALL_INCLUDEDIR}") + endif() + if ("${ARROW_LIBRARY_DIRS}" STREQUAL "") + set(ARROW_LIBRARY_DIRS "/usr/${CMAKE_INSTALL_LIBDIR}") + if (EXISTS "/etc/debian_version" AND CMAKE_LIBRARY_ARCHITECTURE) + set(ARROW_LIBRARY_DIRS + "${ARROW_LIBRARY_DIRS}/${CMAKE_LIBRARY_ARCHITECTURE}") + endif() + endif() + set(ARROW_INCLUDE_DIR ${ARROW_INCLUDE_DIRS}) + set(ARROW_LIBS ${ARROW_LIBRARY_DIRS}) + set(ARROW_SEARCH_LIB_PATH ${ARROW_LIBRARY_DIRS}) + endif() +else() + set(ARROW_HOME "$ENV{ARROW_HOME}") + + set(ARROW_SEARCH_HEADER_PATHS + ${ARROW_HOME}/include + ) + + set(ARROW_SEARCH_LIB_PATH + ${ARROW_HOME}/lib + ) + + find_path(ARROW_INCLUDE_DIR arrow/array.h PATHS + ${ARROW_SEARCH_HEADER_PATHS} + # make sure we don't accidentally pick up a different version + NO_DEFAULT_PATH + ) +endif() + +find_library(ARROW_LIB_PATH NAMES arrow + PATHS + ${ARROW_SEARCH_LIB_PATH} + NO_DEFAULT_PATH) +get_filename_component(ARROW_LIBS ${ARROW_LIB_PATH} DIRECTORY) + +find_library(ARROW_PYTHON_LIB_PATH NAMES arrow_python + PATHS + ${ARROW_SEARCH_LIB_PATH} + NO_DEFAULT_PATH) +get_filename_component(ARROW_PYTHON_LIBS ${ARROW_PYTHON_LIB_PATH} DIRECTORY) + +if (MSVC) + SET(CMAKE_FIND_LIBRARY_SUFFIXES ".lib" ".dll") + + if (MSVC AND NOT DEFINED ARROW_MSVC_STATIC_LIB_SUFFIX) + set(ARROW_MSVC_STATIC_LIB_SUFFIX "_static") + endif() + + find_library(ARROW_SHARED_LIBRARIES NAMES arrow + PATHS ${ARROW_HOME} NO_DEFAULT_PATH + PATH_SUFFIXES "bin" ) + + find_library(ARROW_PYTHON_SHARED_LIBRARIES NAMES arrow_python + PATHS ${ARROW_HOME} NO_DEFAULT_PATH + PATH_SUFFIXES "bin" ) + get_filename_component(ARROW_SHARED_LIBS ${ARROW_SHARED_LIBRARIES} PATH ) + get_filename_component(ARROW_PYTHON_SHARED_LIBS ${ARROW_PYTHON_SHARED_LIBRARIES} PATH ) +endif () + +if (ARROW_INCLUDE_DIR AND ARROW_LIBS) + set(ARROW_FOUND TRUE) + set(ARROW_LIB_NAME arrow) + set(ARROW_PYTHON_LIB_NAME arrow_python) + if (MSVC) + set(ARROW_STATIC_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX}) + set(ARROW_PYTHON_STATIC_LIB ${ARROW_PYTHON_LIBS}/${ARROW_PYTHON_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX}) + set(ARROW_SHARED_LIB ${ARROW_SHARED_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) + set(ARROW_PYTHON_SHARED_LIB ${ARROW_PYTHON_SHARED_LIBS}/${ARROW_PYTHON_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) + set(ARROW_SHARED_IMP_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}.lib) + set(ARROW_PYTHON_SHARED_IMP_LIB ${ARROW_PYTHON_LIBS}/${ARROW_PYTHON_LIB_NAME}.lib) + else() + set(ARROW_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_LIB_NAME}.a) + set(ARROW_PYTHON_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_PYTHON_LIB_NAME}.a) + + set(ARROW_SHARED_LIB ${ARROW_LIBS}/lib${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) + set(ARROW_PYTHON_SHARED_LIB ${ARROW_LIBS}/lib${ARROW_PYTHON_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) + endif() +endif() + +if (ARROW_FOUND) + if (NOT Arrow_FIND_QUIETLY) + message(STATUS "Found the Arrow core library: ${ARROW_LIB_PATH}") + message(STATUS "Found the Arrow Python library: ${ARROW_PYTHON_LIB_PATH}") + endif () +else () + if (NOT Arrow_FIND_QUIETLY) + set(ARROW_ERR_MSG "Could not find the Arrow library. Looked for headers") + set(ARROW_ERR_MSG "${ARROW_ERR_MSG} in ${ARROW_SEARCH_HEADER_PATHS}, and for libs") + set(ARROW_ERR_MSG "${ARROW_ERR_MSG} in ${ARROW_SEARCH_LIB_PATH}") + if (Arrow_FIND_REQUIRED) + message(FATAL_ERROR "${ARROW_ERR_MSG}") + else (Arrow_FIND_REQUIRED) + message(STATUS "${ARROW_ERR_MSG}") + endif (Arrow_FIND_REQUIRED) + endif () + set(ARROW_FOUND FALSE) +endif () + +if (MSVC) + mark_as_advanced( + ARROW_INCLUDE_DIR + ARROW_STATIC_LIB + ARROW_SHARED_LIB + ARROW_SHARED_IMP_LIB + ARROW_PYTHON_STATIC_LIB + ARROW_PYTHON_SHARED_LIB + ARROW_PYTHON_SHARED_IMP_LIB + ) +else() + mark_as_advanced( + ARROW_INCLUDE_DIR + ARROW_STATIC_LIB + ARROW_SHARED_LIB + ARROW_PYTHON_STATIC_LIB + ARROW_PYTHON_SHARED_LIB + ) +endif() diff --git a/cmake/Modules/FindParquet.cmake b/cmake/Modules/FindParquet.cmake new file mode 100644 index 00000000000..ab9c31efe2d --- /dev/null +++ b/cmake/Modules/FindParquet.cmake @@ -0,0 +1,147 @@ +# https://github.com/apache/arrow/blob/master/cpp/cmake_modules/FindParquet.cmake + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# - Find PARQUET (parquet/parquet.h, libparquet.a, libparquet.so) +# This module defines +# PARQUET_INCLUDE_DIR, directory containing headers +# PARQUET_LIBS, directory containing parquet libraries +# PARQUET_STATIC_LIB, path to libparquet.a +# PARQUET_SHARED_LIB, path to libparquet's shared library +# PARQUET_SHARED_IMP_LIB, path to libparquet's import library (MSVC only) +# PARQUET_FOUND, whether parquet has been found + +include(FindPkgConfig) + +if(NOT "$ENV{PARQUET_HOME}" STREQUAL "") + set(PARQUET_HOME "$ENV{PARQUET_HOME}") +endif() + +if (MSVC) + SET(CMAKE_FIND_LIBRARY_SUFFIXES ".lib" ".dll") + + if (MSVC AND NOT DEFINED PARQUET_MSVC_STATIC_LIB_SUFFIX) + set(PARQUET_MSVC_STATIC_LIB_SUFFIX "_static") + endif() + + find_library(PARQUET_SHARED_LIBRARIES NAMES parquet + PATHS ${PARQUET_HOME} NO_DEFAULT_PATH + PATH_SUFFIXES "bin" ) + + get_filename_component(PARQUET_SHARED_LIBS ${PARQUET_SHARED_LIBRARIES} PATH ) +endif () + +if(PARQUET_HOME) + set(PARQUET_SEARCH_HEADER_PATHS + ${PARQUET_HOME}/include + ) + set(PARQUET_SEARCH_LIB_PATH + ${PARQUET_HOME}/lib + ) + find_path(PARQUET_INCLUDE_DIR parquet/api/reader.h PATHS + ${PARQUET_SEARCH_HEADER_PATHS} + # make sure we don't accidentally pick up a different version + NO_DEFAULT_PATH + ) + find_library(PARQUET_LIBRARIES NAMES parquet + PATHS ${PARQUET_HOME} NO_DEFAULT_PATH + PATH_SUFFIXES "lib") + get_filename_component(PARQUET_LIBS ${PARQUET_LIBRARIES} PATH ) + + # Try to autodiscover the Parquet ABI version + get_filename_component(PARQUET_LIB_REALPATH ${PARQUET_LIBRARIES} REALPATH) + get_filename_component(PARQUET_EXT_REALPATH ${PARQUET_LIB_REALPATH} EXT) + string(REGEX MATCH ".([0-9]+.[0-9]+.[0-9]+)" HAS_ABI_VERSION ${PARQUET_EXT_REALPATH}) + if (HAS_ABI_VERSION) + if (APPLE) + string(REGEX REPLACE ".([0-9]+.[0-9]+.[0-9]+).dylib" "\\1" PARQUET_ABI_VERSION ${PARQUET_EXT_REALPATH}) + else() + string(REGEX REPLACE ".so.([0-9]+.[0-9]+.[0-9]+)" "\\1" PARQUET_ABI_VERSION ${PARQUET_EXT_REALPATH}) + endif() + string(REGEX REPLACE "([0-9]+).[0-9]+.[0-9]+" "\\1" PARQUET_SO_VERSION ${PARQUET_ABI_VERSION}) + else() + set(PARQUET_ABI_VERSION "1.0.0") + set(PARQUET_SO_VERSION "1") + endif() +else() + pkg_check_modules(PARQUET parquet) + if (PARQUET_FOUND) + pkg_get_variable(PARQUET_ABI_VERSION parquet abi_version) + message(STATUS "Parquet C++ ABI version: ${PARQUET_ABI_VERSION}") + pkg_get_variable(PARQUET_SO_VERSION parquet so_version) + message(STATUS "Parquet C++ SO version: ${PARQUET_SO_VERSION}") + set(PARQUET_INCLUDE_DIR ${PARQUET_INCLUDE_DIRS}) + set(PARQUET_LIBS ${PARQUET_LIBRARY_DIRS}) + set(PARQUET_SEARCH_LIB_PATH ${PARQUET_LIBRARY_DIRS}) + message(STATUS "Searching for parquet libs in: ${PARQUET_SEARCH_LIB_PATH}") + find_library(PARQUET_LIBRARIES NAMES parquet + PATHS ${PARQUET_SEARCH_LIB_PATH} NO_DEFAULT_PATH) + else() + find_path(PARQUET_INCLUDE_DIR NAMES parquet/api/reader.h ) + find_library(PARQUET_LIBRARIES NAMES parquet) + get_filename_component(PARQUET_LIBS ${PARQUET_LIBRARIES} PATH ) + endif() +endif() + +if (PARQUET_INCLUDE_DIR AND PARQUET_LIBRARIES) + set(PARQUET_FOUND TRUE) + set(PARQUET_LIB_NAME parquet) + if (MSVC) + set(PARQUET_STATIC_LIB "${PARQUET_LIBS}/${PARQUET_LIB_NAME}${PARQUET_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX}") + set(PARQUET_SHARED_LIB "${PARQUET_SHARED_LIBS}/${PARQUET_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}") + set(PARQUET_SHARED_IMP_LIB "${PARQUET_LIBS}/${PARQUET_LIB_NAME}.lib") + else() + set(PARQUET_STATIC_LIB ${PARQUET_LIBS}/${CMAKE_STATIC_LIBRARY_PREFIX}${PARQUET_LIB_NAME}.a) + set(PARQUET_SHARED_LIB ${PARQUET_LIBS}/${CMAKE_SHARED_LIBRARY_PREFIX}${PARQUET_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}) + endif() +else () + set(PARQUET_FOUND FALSE) +endif () + +if (PARQUET_FOUND) + if (NOT Parquet_FIND_QUIETLY) + message(STATUS "Found the Parquet library: ${PARQUET_LIBRARIES}") + endif () +else () + if (NOT Parquet_FIND_QUIETLY) + if (NOT PARQUET_FOUND) + set(PARQUET_ERR_MSG "${PARQUET_ERR_MSG} Could not find the parquet library.") + endif() + + set(PARQUET_ERR_MSG "${PARQUET_ERR_MSG} Looked in ") + if ( _parquet_roots ) + set(PARQUET_ERR_MSG "${PARQUET_ERR_MSG} in ${_parquet_roots}.") + else () + set(PARQUET_ERR_MSG "${PARQUET_ERR_MSG} system search paths.") + endif () + if (Parquet_FIND_REQUIRED) + message(FATAL_ERROR "${PARQUET_ERR_MSG}") + else (Parquet_FIND_REQUIRED) + message(STATUS "${PARQUET_ERR_MSG}") + endif (Parquet_FIND_REQUIRED) + endif () +endif () + +mark_as_advanced( + PARQUET_FOUND + PARQUET_INCLUDE_DIR + PARQUET_LIBS + PARQUET_LIBRARIES + PARQUET_STATIC_LIB + PARQUET_SHARED_LIB +) diff --git a/cmake/find_parquet.cmake b/cmake/find_parquet.cmake new file mode 100644 index 00000000000..92da31a73ae --- /dev/null +++ b/cmake/find_parquet.cmake @@ -0,0 +1,31 @@ +option (USE_INTERNAL_PARQUET_LIBRARY "Set to FALSE to use system parquet library instead of bundled" ${NOT_UNBUNDLED}) + +if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/CMakeLists.txt") + if (USE_INTERNAL_PARQUET_LIBRARY) + message (WARNING "submodule contrib/arrow (required for Parquet) is missing. to fix try run: \n git submodule update --init --recursive") + endif () + set (USE_INTERNAL_PARQUET_LIBRARY 0) + set (MISSING_INTERNAL_PARQUET_LIBRARY 1) +endif () + +if (NOT USE_INTERNAL_PARQUET_LIBRARY) + find_package (Arrow) + find_package (Parquet) +endif () + +if (ARROW_INCLUDE_DIR AND PARQUET_INCLUDE_DIR) +elseif (NOT MISSING_INTERNAL_PARQUET_LIBRARY) + set (USE_INTERNAL_PARQUET_LIBRARY 1) + # TODO: is it required? + # set (ARROW_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src/arrow") + # set (PARQUET_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src/parquet") + set (ARROW_LIBRARY arrow_static) + set (PARQUET_LIBRARY parquet_static) + set (USE_PARQUET 1) +endif () + +if (USE_PARQUET) + message (STATUS "Using Parquet: ${ARROW_INCLUDE_DIR} ${PARQUET_INCLUDE_DIR}") +else () + message (STATUS "Building without Parquet support") +endif () diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt index 6a88b0e5cbd..a06e8cbb886 100644 --- a/contrib/CMakeLists.txt +++ b/contrib/CMakeLists.txt @@ -150,6 +150,19 @@ if (USE_INTERNAL_CAPNP_LIBRARY) target_include_directories(${CAPNP_LIBRARY} PUBLIC $) endif () +if (USE_INTERNAL_PARQUET_LIBRARY) + set (ARROW_COMPUTE ON) + set (ARROW_PARQUET ON) + set (ARROW_VERBOSE_THIRDPARTY_BUILD ON) + set (PARQUET_ARROW_LINKAGE "static") + set (ARROW_BUILD_STATIC ON) + + # Because Arrow uses CMAKE_SOURCE_DIR as a project path + # Hopefully will be fixed in https://github.com/apache/arrow/pull/2676 + set (CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/cmake_modules") + add_subdirectory (arrow/cpp) +endif () + if (USE_INTERNAL_POCO_LIBRARY) set (POCO_VERBOSE_MESSAGES 0 CACHE INTERNAL "") set (save_CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS}) diff --git a/contrib/arrow b/contrib/arrow new file mode 160000 index 00000000000..af20905877f --- /dev/null +++ b/contrib/arrow @@ -0,0 +1 @@ +Subproject commit af20905877fb353367d7ee5a808f759532a5ca0f diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt index 86be7c070af..121fe5c4711 100644 --- a/dbms/CMakeLists.txt +++ b/dbms/CMakeLists.txt @@ -294,6 +294,13 @@ if (USE_RDKAFKA) target_link_libraries (dbms PRIVATE clickhouse_storage_kafka) endif () +if (USE_PARQUET) + target_link_libraries(dbms ${PARQUET_LIBRARY} ${ARROW_LIBRARY}) + if (NOT USE_INTERNAL_PARQUET_LIBRARY) + target_include_directories (dbms BEFORE PRIVATE ${PARQUET_INCLUDE_DIR} ${ARROW_INCLUDE_DIR}) + endif () +endif () + target_link_libraries(dbms PRIVATE ${OPENSSL_CRYPTO_LIBRARY} Threads::Threads) target_include_directories (dbms SYSTEM BEFORE PRIVATE ${DIVIDE_INCLUDE_DIR}) diff --git a/dbms/src/DataStreams/ParquetBlockInputStream.cpp b/dbms/src/DataStreams/ParquetBlockInputStream.cpp new file mode 100644 index 00000000000..58aa9d5957e --- /dev/null +++ b/dbms/src/DataStreams/ParquetBlockInputStream.cpp @@ -0,0 +1,336 @@ +#include +#include +#include + +// TODO: clear includes +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + + +#include +#include + +namespace DB +{ + +ParquetBlockInputStream::ParquetBlockInputStream(ReadBuffer & istr_, const Block & header_) + : istr(istr_) + , header(header_) +{ +} + +Block ParquetBlockInputStream::getHeader() const +{ + return header; +} + +/// Inserts numeric data right into internal column data to reduce an overhead +template +void ParquetBlockInputStream::fillColumnWithNumericData(std::shared_ptr & arrow_column, MutableColumnPtr & internal_column) +{ + PaddedPODArray & column_data = static_cast &>(*internal_column).getData(); + column_data.reserve(arrow_column->length()); + + for (size_t chunk_i = 0; chunk_i != static_cast(arrow_column->data()->num_chunks()); ++chunk_i) + { + std::shared_ptr chunk = arrow_column->data()->chunk(chunk_i); + /// buffers[0] is a null bitmap and buffers[1] are actual values + std::shared_ptr buffer = chunk->data()->buffers[1]; + + const NumericType * raw_data = reinterpret_cast(buffer->data()); + column_data.insert_assume_reserved(raw_data, raw_data + chunk->length()); + } +} + +/// Inserts chars and offsets right into internal column data to reduce an overhead. +/// Internal offsets are shifted by one to the right in comparison with Arrow ones. So the last offset should map to the end of all chars. +/// Also internal strings are null terminated. +void ParquetBlockInputStream::fillColumnWithStringData(std::shared_ptr & arrow_column, MutableColumnPtr & internal_column) +{ + PaddedPODArray & column_chars_t = static_cast(*internal_column).getChars(); + PaddedPODArray & column_offsets = static_cast(*internal_column).getOffsets(); + + size_t chars_t_size = 0; + for (size_t chunk_i = 0; chunk_i != static_cast(arrow_column->data()->num_chunks()); ++chunk_i) + { + arrow::BinaryArray & chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); + const size_t chunk_length = chunk.length(); + + chars_t_size += chunk.value_offset(chunk_length - 1) + chunk.value_length(chunk_length - 1); + chars_t_size += chunk_length; /// additional space for null bytes + } + + column_chars_t.reserve(chars_t_size); + column_offsets.reserve(arrow_column->length()); + + for (size_t chunk_i = 0; chunk_i != static_cast(arrow_column->data()->num_chunks()); ++chunk_i) + { + arrow::BinaryArray & chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); + std::shared_ptr buffer = chunk.value_data(); + const size_t chunk_length = chunk.length(); + + for (size_t offset_i = 0; offset_i != chunk_length; ++offset_i) + { + const UInt8 * raw_data = buffer->data() + chunk.value_offset(offset_i); + column_chars_t.insert_assume_reserved(raw_data, raw_data + chunk.value_length(offset_i)); + column_chars_t.emplace_back('\0'); + + column_offsets.emplace_back(column_chars_t.size()); + } + } +} + +void ParquetBlockInputStream::fillColumnWithBooleanData(std::shared_ptr & arrow_column, MutableColumnPtr & internal_column) +{ + PaddedPODArray & column_data = static_cast &>(*internal_column).getData(); + column_data.resize(arrow_column->length()); + + for (size_t chunk_i = 0; chunk_i != static_cast(arrow_column->data()->num_chunks()); ++chunk_i) + { + arrow::BooleanArray & chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); + /// buffers[0] is a null bitmap and buffers[1] are actual values + std::shared_ptr buffer = chunk.data()->buffers[1]; + + for (size_t bool_i = 0; bool_i != static_cast(chunk.length()); ++bool_i) + column_data[bool_i] = chunk.Value(bool_i); + } +} + +/// Arrow stores Parquet::DATE in Int32, while ClickHouse stores Date in UInt16. Therefore, it should be checked before saving +void ParquetBlockInputStream::fillColumnWithDate32Data(std::shared_ptr & arrow_column, MutableColumnPtr & internal_column) +{ + PaddedPODArray & column_data = static_cast &>(*internal_column).getData(); + column_data.reserve(arrow_column->length()); + + for (size_t chunk_i = 0; chunk_i != static_cast(arrow_column->data()->num_chunks()); ++chunk_i) + { + arrow::Date32Array & chunk = static_cast(*(arrow_column->data()->chunk(chunk_i))); + + for (size_t value_i = 0; value_i != static_cast(chunk.length()); ++value_i) + { + UInt32 days_num = static_cast(chunk.Value(value_i)); + if (days_num > DATE_LUT_MAX_DAY_NUM) + { + // TODO: will it rollback correctly? + throw Exception( + "Input value " + std::to_string(days_num) + " of a column \"" + arrow_column->name() + "\" is greater than " + "max allowed Date value, which is " + std::to_string(DATE_LUT_MAX_DAY_NUM) + ); + } + + column_data.emplace_back(days_num); + } + } +} + +/// Creates a null bytemap from arrow's null bitmap +void ParquetBlockInputStream::fillByteMapFromArrowColumn(std::shared_ptr & arrow_column, MutableColumnPtr & bytemap) +{ + PaddedPODArray & bytemap_data = static_cast &>(*bytemap).getData(); + bytemap_data.reserve(arrow_column->length()); + + for (size_t chunk_i = 0; chunk_i != static_cast(arrow_column->data()->num_chunks()); ++chunk_i) + { + std::shared_ptr chunk = arrow_column->data()->chunk(chunk_i); + + for (size_t value_i = 0; value_i != static_cast(chunk->length()); ++value_i) + bytemap_data.emplace_back(chunk->IsNull(value_i)); + } +} + +#define FOR_ARROW_NUMERIC_TYPES(M) \ + M(arrow::Type::UINT8, UInt8) \ + M(arrow::Type::INT8, Int8) \ + M(arrow::Type::UINT16, UInt16) \ + M(arrow::Type::INT16, Int16) \ + M(arrow::Type::UINT32, UInt32) \ + M(arrow::Type::INT32, Int32) \ + M(arrow::Type::UINT64, UInt64) \ + M(arrow::Type::INT64, Int64) \ + M(arrow::Type::FLOAT, Float32) \ + M(arrow::Type::DOUBLE, Float64) + + +using NameToColumnPtr = std::unordered_map>; + +const std::unordered_map> ParquetBlockInputStream::arrow_type_to_internal_type = { + {arrow::Type::UINT8, std::make_shared()}, + {arrow::Type::INT8, std::make_shared()}, + {arrow::Type::UINT16, std::make_shared()}, + {arrow::Type::INT16, std::make_shared()}, + {arrow::Type::UINT32, std::make_shared()}, + {arrow::Type::INT32, std::make_shared()}, + {arrow::Type::UINT64, std::make_shared()}, + {arrow::Type::INT64, std::make_shared()}, + {arrow::Type::FLOAT, std::make_shared()}, + {arrow::Type::DOUBLE, std::make_shared()}, + + {arrow::Type::BOOL, std::make_shared()}, + {arrow::Type::DATE32, std::make_shared()}, + + {arrow::Type::STRING, std::make_shared()}//, + // TODO: add other types that are convertable to internal ones: + // 0. ENUM? + // 1. UUID -> String + // 2. JSON -> String +}; + + +Block ParquetBlockInputStream::readImpl() +{ + Block res; + + if (istr.eof()) + return res; + + std::string file_data; + + { + WriteBufferFromString file_buffer(file_data); + copyData(istr, file_buffer); + } + + arrow::Buffer buffer(file_data); + // TODO: maybe use parquet::RandomAccessSource? + auto reader = parquet::ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer)); + parquet::arrow::FileReader filereader(::arrow::default_memory_pool(), std::move(reader)); + std::shared_ptr table; + + // TODO: also catch a ParquetException thrown by filereader? + arrow::Status read_status = filereader.ReadTable(&table); + if (!read_status.ok()) + throw Exception("Error while reading parquet data: " + read_status.ToString()/*, ErrorCodes::TODO*/); + + if (0 == table->num_rows()) + throw Exception("Empty table in input data"/*, ErrorCodes::TODO*/); + + if (header.columns() > static_cast(table->num_columns())) + // TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable? + throw Exception("Number of columns is less than the table has" /*, ErrorCodes::TODO*/); + + + NameToColumnPtr name_to_column_ptr; + for (size_t i = 0; i != static_cast(table->num_columns()); ++i) + { + std::shared_ptr arrow_column = table->column(i); + name_to_column_ptr[arrow_column->name()] = arrow_column; + } + + for (size_t column_i = 0; column_i != header.columns(); ++column_i) + { + ColumnWithTypeAndName header_column = header.getByPosition(column_i); + + if (name_to_column_ptr.find(header_column.name) == name_to_column_ptr.end()) + // TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable? + throw Exception("Column \"" + header_column.name + "\" is not presented in input data" /*, ErrorCodes::TODO*/); + + std::shared_ptr arrow_column = name_to_column_ptr[header_column.name]; + arrow::Type::type arrow_type = arrow_column->type()->id(); + + if (arrow_type_to_internal_type.find(arrow_type) == arrow_type_to_internal_type.end()) + { + throw Exception( + "The type \"" + arrow_column->type()->name() + "\" of an input column \"" + arrow_column->name() + "\"" + " is not supported for conversion from a Parquet data format" + /*, ErrorCodes::TODO*/ + ); + } + + // TODO: check if a column is const? + if (!header_column.type->isNullable() && arrow_column->null_count()) + { + throw Exception("Can not insert NULL data into non-nullable column \"" + header_column.name + "\""/*, ErrorCodes::TODO*/); + } + + const bool target_column_is_nullable = header_column.type->isNullable() || arrow_column->null_count(); + + const DataTypePtr internal_nested_type = arrow_type_to_internal_type.at(arrow_type); + const DataTypePtr internal_type = target_column_is_nullable ? makeNullable(internal_nested_type) : internal_nested_type; + const std::string internal_nested_type_name = internal_nested_type->getName(); + + const DataTypePtr column_nested_type = + header_column.type->isNullable() + ? static_cast(header_column.type.get())->getNestedType() + : header_column.type; + + + const DataTypePtr column_type = header_column.type; + const std::string column_nested_type_name = column_nested_type->getName(); + + // TODO: can it be done with typeid_cast? + if (internal_nested_type_name != column_nested_type_name) + { + throw Exception( + "Input data type \"" + internal_nested_type_name + "\" for a column \"" + header_column.name + "\"" + " is not compatible with a column type \"" + column_nested_type_name + "\""/*, ErrorCodes::TODO*/ + ); + } + + ColumnWithTypeAndName column; + column.name = header_column.name; + column.type = internal_type; + + /// Data + MutableColumnPtr read_column = internal_nested_type->createColumn(); + + switch (arrow_type) + { + case arrow::Type::STRING: + fillColumnWithStringData(arrow_column, read_column); + break; + case arrow::Type::BOOL: + fillColumnWithBooleanData(arrow_column, read_column); + break; + case arrow::Type::DATE32: + fillColumnWithDate32Data(arrow_column, read_column); + break; +#define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \ + case ARROW_NUMERIC_TYPE: \ + fillColumnWithNumericData(arrow_column, read_column); \ + break; + + FOR_ARROW_NUMERIC_TYPES(DISPATCH) +#undef DISPATCH + // TODO: support TIMESTAMP_MICROS and TIMESTAMP_MILLIS with truncated micro- and milliseconds? + // TODO: read JSON as a string? + // TODO: read UUID as a string? + default: + throw Exception("Unsupported parquet type \"" + arrow_column->type()->name() + "\""/*, ErrorCodes::TODO*/); + } + + if (column.type->isNullable()) + { + MutableColumnPtr null_bytemap = DataTypeUInt8().createColumn(); + fillByteMapFromArrowColumn(arrow_column, null_bytemap); + column.column = ColumnNullable::create(std::move(read_column), std::move(null_bytemap)); + } + else + { + column.column = std::move(read_column); + } + res.insert(std::move(column)); + } + + return res; +} + +} diff --git a/dbms/src/DataStreams/ParquetBlockInputStream.h b/dbms/src/DataStreams/ParquetBlockInputStream.h new file mode 100644 index 00000000000..77d53340f86 --- /dev/null +++ b/dbms/src/DataStreams/ParquetBlockInputStream.h @@ -0,0 +1,43 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +// TODO: refine includes +#include + +namespace DB +{ + +class ParquetBlockInputStream : public IProfilingBlockInputStream +{ +public: + ParquetBlockInputStream(ReadBuffer & istr_, const Block & header_); + + String getName() const override { return "Parquet"; } + Block getHeader() const override; + +protected: + Block readImpl() override; + +private: + ReadBuffer & istr; + Block header; + + static void fillColumnWithStringData(std::shared_ptr & arrow_column, MutableColumnPtr & internal_column); + static void fillColumnWithBooleanData(std::shared_ptr & arrow_column, MutableColumnPtr & internal_column); + static void fillColumnWithDate32Data(std::shared_ptr & arrow_column, MutableColumnPtr & internal_column); + template + static void fillColumnWithNumericData(std::shared_ptr & arrow_column, MutableColumnPtr & internal_column); + + static void fillByteMapFromArrowColumn(std::shared_ptr & arrow_column, MutableColumnPtr & bytemap); + + static const std::unordered_map> arrow_type_to_internal_type; + + // TODO: check that this class implements every part of its parent +}; + +} diff --git a/dbms/src/DataStreams/ParquetBlockOutputStream.cpp b/dbms/src/DataStreams/ParquetBlockOutputStream.cpp new file mode 100644 index 00000000000..416bf02552d --- /dev/null +++ b/dbms/src/DataStreams/ParquetBlockOutputStream.cpp @@ -0,0 +1,271 @@ +// TODO: clean includes +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + +namespace DB +{ + +ParquetBlockOutputStream::ParquetBlockOutputStream(WriteBuffer & ostr_, const Block & header_) + : ostr(ostr_) + , header(header_) +{ +} + +void ParquetBlockOutputStream::flush() +{ + ostr.next(); +} + +void checkAppendStatus(arrow::Status & append_status, const std::string & column_name) +{ + if (!append_status.ok()) + { + throw Exception( + "Error while building a parquet column \"" + column_name + "\": " + append_status.ToString()/*, + ErrorCodes::TODO*/ + ); + } +} + +void checkFinishStatus(arrow::Status & finish_status, const std::string & column_name) +{ + if (!finish_status.ok()) + { + throw Exception( + "Error while writing a parquet column \"" + column_name + "\": " + finish_status.ToString()/*, + ErrorCodes::TODO*/ + ); + } +} + +template +void ParquetBlockOutputStream::fillArrowArrayWithNumericColumnData( + ColumnPtr write_column, + std::shared_ptr & arrow_array, + const PaddedPODArray * null_bytemap +) { + const PaddedPODArray & internal_data = static_cast &>(*write_column).getData(); + ArrowBuilderType numeric_builder; + arrow::Status append_status; + + const UInt8 * arrow_null_bytemap_raw_ptr = nullptr; + PaddedPODArray arrow_null_bytemap; + if (null_bytemap) + { + /// Invert values since Arrow interprets 1 as a non-null value, while CH as a null + arrow_null_bytemap.reserve(null_bytemap->size()); + for (size_t i = 0; i != null_bytemap->size(); ++i) + arrow_null_bytemap.emplace_back(1 ^ (*null_bytemap)[i]); + + arrow_null_bytemap_raw_ptr = arrow_null_bytemap.data(); + } + + append_status = numeric_builder.AppendValues(internal_data.data(), internal_data.size(), arrow_null_bytemap_raw_ptr); + checkAppendStatus(append_status, write_column->getName()); + + arrow::Status finish_status = numeric_builder.Finish(&arrow_array); + checkFinishStatus(finish_status, write_column->getName()); +} + +void ParquetBlockOutputStream::fillArrowArrayWithStringColumnData( + ColumnPtr write_column, + std::shared_ptr & arrow_array, + const PaddedPODArray * null_bytemap +) { + const ColumnString & internal_column = static_cast(*write_column); + arrow::StringBuilder string_builder; + arrow::Status append_status; + + for (size_t string_i = 0; string_i != internal_column.size(); ++string_i) + { + if (null_bytemap && (*null_bytemap)[string_i]) + { + append_status = string_builder.AppendNull(); + } + else + { + StringRef string_ref = internal_column.getDataAt(string_i); + append_status = string_builder.Append(string_ref.data, string_ref.size); + } + + checkAppendStatus(append_status, write_column->getName()); + } + + arrow::Status finish_status = string_builder.Finish(&arrow_array); + checkFinishStatus(finish_status, write_column->getName()); +} + +void ParquetBlockOutputStream::fillArrowArrayWithDateColumnData( + ColumnPtr write_column, + std::shared_ptr & arrow_array, + const PaddedPODArray * null_bytemap +) { + const PaddedPODArray & internal_data = static_cast &>(*write_column).getData(); + arrow::Date32Builder date32_builder; + arrow::Status append_status; + + for (size_t value_i = 0; value_i != internal_data.size(); ++value_i) + { + if (null_bytemap && (*null_bytemap)[value_i]) + append_status = date32_builder.AppendNull(); + else + /// Implicitly converts UInt16 to Int32 + append_status = date32_builder.Append(internal_data[value_i]); + + checkAppendStatus(append_status, write_column->getName()); + } + + arrow::Status finish_status = date32_builder.Finish(&arrow_array); + checkFinishStatus(finish_status, write_column->getName()); +} + +#define FOR_INTERNAL_NUMERIC_TYPES(M) \ + M(UInt8, arrow::UInt8Builder) \ + M(Int8, arrow::Int8Builder) \ + M(UInt16, arrow::UInt16Builder) \ + M(Int16, arrow::Int16Builder) \ + M(UInt32, arrow::UInt32Builder) \ + M(Int32, arrow::Int32Builder) \ + M(UInt64, arrow::UInt64Builder) \ + M(Int64, arrow::Int64Builder) \ + M(Float32, arrow::FloatBuilder) \ + M(Float64, arrow::DoubleBuilder) + +const std::unordered_map> ParquetBlockOutputStream::internal_type_to_arrow_type = { + {"UInt8", arrow::uint8()}, + {"Int8", arrow::int8()}, + {"UInt16", arrow::uint16()}, + {"Int16", arrow::int16()}, + {"UInt32", arrow::uint32()}, + {"Int32", arrow::int32()}, + {"UInt64", arrow::uint64()}, + {"Int64", arrow::int64()}, + {"Float32", arrow::float32()}, + {"Float64", arrow::float64()}, + + {"Date", arrow::date32()}, + + // TODO: ClickHouse can actually store non-utf8 strings! + {"String", arrow::utf8()}//, + // TODO: add other types: + // 1. FixedString + // 2. DateTime +}; + +const PaddedPODArray * extractNullBytemapPtr(ColumnPtr column) +{ + ColumnPtr null_column = static_cast(*column).getNullMapColumnPtr(); + const PaddedPODArray & null_bytemap = static_cast &>(*null_column).getData(); + return &null_bytemap; +} + +void ParquetBlockOutputStream::write(const Block & block) +{ + block.checkNumberOfRows(); + + const size_t columns_num = block.columns(); + + /// For arrow::Schema and arrow::Table creation + std::vector> arrow_fields; + std::vector> arrow_arrays; + arrow_fields.reserve(columns_num); + arrow_arrays.reserve(columns_num); + + for (size_t column_i = 0; column_i < columns_num; ++column_i) + { + // TODO: constructed every iteration + const ColumnWithTypeAndName & column = block.safeGetByPosition(column_i); + + const bool is_column_nullable = column.type->isNullable(); + const DataTypePtr column_nested_type = + is_column_nullable + ? static_cast(column.type.get())->getNestedType() + : column.type; + const DataTypePtr column_type = column.type; + // TODO: do not mix std::string and String + const std::string column_nested_type_name = column_nested_type->getName(); + + if (internal_type_to_arrow_type.find(column_nested_type_name) == internal_type_to_arrow_type.end()) + { + throw Exception( + "The type \"" + column_nested_type_name + "\" of a column \"" + column.name + "\"" + " is not supported for conversion into a Parquet data format" + /*, ErrorCodes::TODO*/ + ); + } + + arrow_fields.emplace_back(new arrow::Field( + column.name, + internal_type_to_arrow_type.at(column_nested_type_name), + is_column_nullable + )); + std::shared_ptr arrow_array; + + ColumnPtr nested_column = is_column_nullable ? static_cast(*column.column).getNestedColumnPtr() : column.column; + const PaddedPODArray * null_bytemap = is_column_nullable ? extractNullBytemapPtr(column.column) : nullptr; + + // TODO: use typeid_cast + if ("String" == column_nested_type_name) + { + fillArrowArrayWithStringColumnData(nested_column, arrow_array, null_bytemap); + } + else if ("Date" == column_nested_type_name) + { + fillArrowArrayWithDateColumnData(nested_column, arrow_array, null_bytemap); + } +#define DISPATCH(CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE) \ + else if (#CPP_NUMERIC_TYPE == column_nested_type_name) \ + { \ + fillArrowArrayWithNumericColumnData(nested_column, arrow_array, null_bytemap); \ + } + + FOR_INTERNAL_NUMERIC_TYPES(DISPATCH) +#undef DISPATCH + // TODO: there are also internal types that are convertable to parquet/arrow once: + // 1. FixedString(N) + // 2. DateTime + else + { + throw Exception( + "Internal type \"" + column_nested_type_name + "\" of a column \"" + column.name + "\"" + " is not supported for conversion into a Parquet data format"/*, ErrorCodes::TODO*/ + ); + } + + arrow_arrays.emplace_back(std::move(arrow_array)); + } + + std::shared_ptr arrow_schema = std::make_shared(std::move(arrow_fields)); + std::shared_ptr arrow_table = arrow::Table::Make(arrow_schema, arrow_arrays); + + // TODO: get rid of extra copying + std::shared_ptr sink = std::make_shared(); + + // TODO: calculate row_group_size depending on a number of rows and table size + + arrow::Status write_status = parquet::arrow::WriteTable( + *arrow_table, arrow::default_memory_pool(), sink, + /* row_group_size = */arrow_table->num_rows(), parquet::default_writer_properties(), + parquet::arrow::default_arrow_writer_properties() + ); + if (!write_status.ok()) + throw Exception("Error while writing a table: " + write_status.ToString()/*, ErrorCodes::TODO*/); + + std::shared_ptr table_buffer = sink->GetBuffer(); + writeString(reinterpret_cast(table_buffer->data()), table_buffer->size(), ostr); +} + +}; diff --git a/dbms/src/DataStreams/ParquetBlockOutputStream.h b/dbms/src/DataStreams/ParquetBlockOutputStream.h new file mode 100644 index 00000000000..896b73fba03 --- /dev/null +++ b/dbms/src/DataStreams/ParquetBlockOutputStream.h @@ -0,0 +1,36 @@ +#pragma once + +#include +#include + +namespace DB +{ + + +class ParquetBlockOutputStream : public IBlockOutputStream +{ +public: + ParquetBlockOutputStream(WriteBuffer & ostr_, const Block & header_); + + Block getHeader() const override { return header; } + void write(const Block & block) override; + void flush() override; + + String getContentType() const override { return "application/octet-stream"; } + +private: + WriteBuffer & ostr; + Block header; + + static void fillArrowArrayWithDateColumnData(ColumnPtr write_column, std::shared_ptr & arrow_array, + const PaddedPODArray * null_bytemap); + static void fillArrowArrayWithStringColumnData(ColumnPtr write_column, std::shared_ptr & arrow_array, + const PaddedPODArray * null_bytemap); + template + static void fillArrowArrayWithNumericColumnData(ColumnPtr write_column, std::shared_ptr & arrow_array, + const PaddedPODArray * null_bytemap); + + static const std::unordered_map> internal_type_to_arrow_type; +}; + +}