Merge commit 'bf09c2047a47b74f089d701b9b1e4fcbe8955a7f' into fix23

This commit is contained in:
proller 2019-02-19 23:46:37 +03:00
commit 88076bb8c9
12 changed files with 1048 additions and 0 deletions

3
.gitmodules vendored
View File

@ -67,3 +67,6 @@
[submodule "contrib/brotli"]
path = contrib/brotli
url = https://github.com/google/brotli.git
[submodule "contrib/arrow"]
path = contrib/arrow
url = https://github.com/apache/arrow

View File

@ -261,6 +261,7 @@ include (cmake/find_base64.cmake)
if (ENABLE_TESTS)
include (cmake/find_gtest.cmake)
endif ()
include (cmake/find_parquet.cmake)
include (cmake/find_contrib_lib.cmake)
find_contrib_lib(cityhash)

View File

@ -0,0 +1,159 @@
# https://github.com/apache/arrow/blob/master/cpp/cmake_modules/FindArrow.cmake
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# - Find ARROW (arrow/api.h, libarrow.a, libarrow.so)
# This module defines
# ARROW_INCLUDE_DIR, directory containing headers
# ARROW_LIBS, directory containing arrow libraries
# ARROW_STATIC_LIB, path to libarrow.a
# ARROW_SHARED_LIB, path to libarrow's shared library
# ARROW_SHARED_IMP_LIB, path to libarrow's import library (MSVC only)
# ARROW_FOUND, whether arrow has been found
include(FindPkgConfig)
include(GNUInstallDirs)
if ("$ENV{ARROW_HOME}" STREQUAL "")
pkg_check_modules(ARROW arrow)
if (ARROW_FOUND)
pkg_get_variable(ARROW_SO_VERSION arrow so_version)
set(ARROW_ABI_VERSION ${ARROW_SO_VERSION})
message(STATUS "Arrow SO and ABI version: ${ARROW_SO_VERSION}")
pkg_get_variable(ARROW_FULL_SO_VERSION arrow full_so_version)
message(STATUS "Arrow full SO version: ${ARROW_FULL_SO_VERSION}")
if ("${ARROW_INCLUDE_DIRS}" STREQUAL "")
set(ARROW_INCLUDE_DIRS "/usr/${CMAKE_INSTALL_INCLUDEDIR}")
endif()
if ("${ARROW_LIBRARY_DIRS}" STREQUAL "")
set(ARROW_LIBRARY_DIRS "/usr/${CMAKE_INSTALL_LIBDIR}")
if (EXISTS "/etc/debian_version" AND CMAKE_LIBRARY_ARCHITECTURE)
set(ARROW_LIBRARY_DIRS
"${ARROW_LIBRARY_DIRS}/${CMAKE_LIBRARY_ARCHITECTURE}")
endif()
endif()
set(ARROW_INCLUDE_DIR ${ARROW_INCLUDE_DIRS})
set(ARROW_LIBS ${ARROW_LIBRARY_DIRS})
set(ARROW_SEARCH_LIB_PATH ${ARROW_LIBRARY_DIRS})
endif()
else()
set(ARROW_HOME "$ENV{ARROW_HOME}")
set(ARROW_SEARCH_HEADER_PATHS
${ARROW_HOME}/include
)
set(ARROW_SEARCH_LIB_PATH
${ARROW_HOME}/lib
)
find_path(ARROW_INCLUDE_DIR arrow/array.h PATHS
${ARROW_SEARCH_HEADER_PATHS}
# make sure we don't accidentally pick up a different version
NO_DEFAULT_PATH
)
endif()
find_library(ARROW_LIB_PATH NAMES arrow
PATHS
${ARROW_SEARCH_LIB_PATH}
NO_DEFAULT_PATH)
get_filename_component(ARROW_LIBS ${ARROW_LIB_PATH} DIRECTORY)
find_library(ARROW_PYTHON_LIB_PATH NAMES arrow_python
PATHS
${ARROW_SEARCH_LIB_PATH}
NO_DEFAULT_PATH)
get_filename_component(ARROW_PYTHON_LIBS ${ARROW_PYTHON_LIB_PATH} DIRECTORY)
if (MSVC)
SET(CMAKE_FIND_LIBRARY_SUFFIXES ".lib" ".dll")
if (MSVC AND NOT DEFINED ARROW_MSVC_STATIC_LIB_SUFFIX)
set(ARROW_MSVC_STATIC_LIB_SUFFIX "_static")
endif()
find_library(ARROW_SHARED_LIBRARIES NAMES arrow
PATHS ${ARROW_HOME} NO_DEFAULT_PATH
PATH_SUFFIXES "bin" )
find_library(ARROW_PYTHON_SHARED_LIBRARIES NAMES arrow_python
PATHS ${ARROW_HOME} NO_DEFAULT_PATH
PATH_SUFFIXES "bin" )
get_filename_component(ARROW_SHARED_LIBS ${ARROW_SHARED_LIBRARIES} PATH )
get_filename_component(ARROW_PYTHON_SHARED_LIBS ${ARROW_PYTHON_SHARED_LIBRARIES} PATH )
endif ()
if (ARROW_INCLUDE_DIR AND ARROW_LIBS)
set(ARROW_FOUND TRUE)
set(ARROW_LIB_NAME arrow)
set(ARROW_PYTHON_LIB_NAME arrow_python)
if (MSVC)
set(ARROW_STATIC_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX})
set(ARROW_PYTHON_STATIC_LIB ${ARROW_PYTHON_LIBS}/${ARROW_PYTHON_LIB_NAME}${ARROW_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX})
set(ARROW_SHARED_LIB ${ARROW_SHARED_LIBS}/${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
set(ARROW_PYTHON_SHARED_LIB ${ARROW_PYTHON_SHARED_LIBS}/${ARROW_PYTHON_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
set(ARROW_SHARED_IMP_LIB ${ARROW_LIBS}/${ARROW_LIB_NAME}.lib)
set(ARROW_PYTHON_SHARED_IMP_LIB ${ARROW_PYTHON_LIBS}/${ARROW_PYTHON_LIB_NAME}.lib)
else()
set(ARROW_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_LIB_NAME}.a)
set(ARROW_PYTHON_STATIC_LIB ${ARROW_LIBS}/lib${ARROW_PYTHON_LIB_NAME}.a)
set(ARROW_SHARED_LIB ${ARROW_LIBS}/lib${ARROW_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
set(ARROW_PYTHON_SHARED_LIB ${ARROW_LIBS}/lib${ARROW_PYTHON_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
endif()
endif()
if (ARROW_FOUND)
if (NOT Arrow_FIND_QUIETLY)
message(STATUS "Found the Arrow core library: ${ARROW_LIB_PATH}")
message(STATUS "Found the Arrow Python library: ${ARROW_PYTHON_LIB_PATH}")
endif ()
else ()
if (NOT Arrow_FIND_QUIETLY)
set(ARROW_ERR_MSG "Could not find the Arrow library. Looked for headers")
set(ARROW_ERR_MSG "${ARROW_ERR_MSG} in ${ARROW_SEARCH_HEADER_PATHS}, and for libs")
set(ARROW_ERR_MSG "${ARROW_ERR_MSG} in ${ARROW_SEARCH_LIB_PATH}")
if (Arrow_FIND_REQUIRED)
message(FATAL_ERROR "${ARROW_ERR_MSG}")
else (Arrow_FIND_REQUIRED)
message(STATUS "${ARROW_ERR_MSG}")
endif (Arrow_FIND_REQUIRED)
endif ()
set(ARROW_FOUND FALSE)
endif ()
if (MSVC)
mark_as_advanced(
ARROW_INCLUDE_DIR
ARROW_STATIC_LIB
ARROW_SHARED_LIB
ARROW_SHARED_IMP_LIB
ARROW_PYTHON_STATIC_LIB
ARROW_PYTHON_SHARED_LIB
ARROW_PYTHON_SHARED_IMP_LIB
)
else()
mark_as_advanced(
ARROW_INCLUDE_DIR
ARROW_STATIC_LIB
ARROW_SHARED_LIB
ARROW_PYTHON_STATIC_LIB
ARROW_PYTHON_SHARED_LIB
)
endif()

View File

@ -0,0 +1,147 @@
# https://github.com/apache/arrow/blob/master/cpp/cmake_modules/FindParquet.cmake
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# - Find PARQUET (parquet/parquet.h, libparquet.a, libparquet.so)
# This module defines
# PARQUET_INCLUDE_DIR, directory containing headers
# PARQUET_LIBS, directory containing parquet libraries
# PARQUET_STATIC_LIB, path to libparquet.a
# PARQUET_SHARED_LIB, path to libparquet's shared library
# PARQUET_SHARED_IMP_LIB, path to libparquet's import library (MSVC only)
# PARQUET_FOUND, whether parquet has been found
include(FindPkgConfig)
if(NOT "$ENV{PARQUET_HOME}" STREQUAL "")
set(PARQUET_HOME "$ENV{PARQUET_HOME}")
endif()
if (MSVC)
SET(CMAKE_FIND_LIBRARY_SUFFIXES ".lib" ".dll")
if (MSVC AND NOT DEFINED PARQUET_MSVC_STATIC_LIB_SUFFIX)
set(PARQUET_MSVC_STATIC_LIB_SUFFIX "_static")
endif()
find_library(PARQUET_SHARED_LIBRARIES NAMES parquet
PATHS ${PARQUET_HOME} NO_DEFAULT_PATH
PATH_SUFFIXES "bin" )
get_filename_component(PARQUET_SHARED_LIBS ${PARQUET_SHARED_LIBRARIES} PATH )
endif ()
if(PARQUET_HOME)
set(PARQUET_SEARCH_HEADER_PATHS
${PARQUET_HOME}/include
)
set(PARQUET_SEARCH_LIB_PATH
${PARQUET_HOME}/lib
)
find_path(PARQUET_INCLUDE_DIR parquet/api/reader.h PATHS
${PARQUET_SEARCH_HEADER_PATHS}
# make sure we don't accidentally pick up a different version
NO_DEFAULT_PATH
)
find_library(PARQUET_LIBRARIES NAMES parquet
PATHS ${PARQUET_HOME} NO_DEFAULT_PATH
PATH_SUFFIXES "lib")
get_filename_component(PARQUET_LIBS ${PARQUET_LIBRARIES} PATH )
# Try to autodiscover the Parquet ABI version
get_filename_component(PARQUET_LIB_REALPATH ${PARQUET_LIBRARIES} REALPATH)
get_filename_component(PARQUET_EXT_REALPATH ${PARQUET_LIB_REALPATH} EXT)
string(REGEX MATCH ".([0-9]+.[0-9]+.[0-9]+)" HAS_ABI_VERSION ${PARQUET_EXT_REALPATH})
if (HAS_ABI_VERSION)
if (APPLE)
string(REGEX REPLACE ".([0-9]+.[0-9]+.[0-9]+).dylib" "\\1" PARQUET_ABI_VERSION ${PARQUET_EXT_REALPATH})
else()
string(REGEX REPLACE ".so.([0-9]+.[0-9]+.[0-9]+)" "\\1" PARQUET_ABI_VERSION ${PARQUET_EXT_REALPATH})
endif()
string(REGEX REPLACE "([0-9]+).[0-9]+.[0-9]+" "\\1" PARQUET_SO_VERSION ${PARQUET_ABI_VERSION})
else()
set(PARQUET_ABI_VERSION "1.0.0")
set(PARQUET_SO_VERSION "1")
endif()
else()
pkg_check_modules(PARQUET parquet)
if (PARQUET_FOUND)
pkg_get_variable(PARQUET_ABI_VERSION parquet abi_version)
message(STATUS "Parquet C++ ABI version: ${PARQUET_ABI_VERSION}")
pkg_get_variable(PARQUET_SO_VERSION parquet so_version)
message(STATUS "Parquet C++ SO version: ${PARQUET_SO_VERSION}")
set(PARQUET_INCLUDE_DIR ${PARQUET_INCLUDE_DIRS})
set(PARQUET_LIBS ${PARQUET_LIBRARY_DIRS})
set(PARQUET_SEARCH_LIB_PATH ${PARQUET_LIBRARY_DIRS})
message(STATUS "Searching for parquet libs in: ${PARQUET_SEARCH_LIB_PATH}")
find_library(PARQUET_LIBRARIES NAMES parquet
PATHS ${PARQUET_SEARCH_LIB_PATH} NO_DEFAULT_PATH)
else()
find_path(PARQUET_INCLUDE_DIR NAMES parquet/api/reader.h )
find_library(PARQUET_LIBRARIES NAMES parquet)
get_filename_component(PARQUET_LIBS ${PARQUET_LIBRARIES} PATH )
endif()
endif()
if (PARQUET_INCLUDE_DIR AND PARQUET_LIBRARIES)
set(PARQUET_FOUND TRUE)
set(PARQUET_LIB_NAME parquet)
if (MSVC)
set(PARQUET_STATIC_LIB "${PARQUET_LIBS}/${PARQUET_LIB_NAME}${PARQUET_MSVC_STATIC_LIB_SUFFIX}${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(PARQUET_SHARED_LIB "${PARQUET_SHARED_LIBS}/${PARQUET_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}")
set(PARQUET_SHARED_IMP_LIB "${PARQUET_LIBS}/${PARQUET_LIB_NAME}.lib")
else()
set(PARQUET_STATIC_LIB ${PARQUET_LIBS}/${CMAKE_STATIC_LIBRARY_PREFIX}${PARQUET_LIB_NAME}.a)
set(PARQUET_SHARED_LIB ${PARQUET_LIBS}/${CMAKE_SHARED_LIBRARY_PREFIX}${PARQUET_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
endif()
else ()
set(PARQUET_FOUND FALSE)
endif ()
if (PARQUET_FOUND)
if (NOT Parquet_FIND_QUIETLY)
message(STATUS "Found the Parquet library: ${PARQUET_LIBRARIES}")
endif ()
else ()
if (NOT Parquet_FIND_QUIETLY)
if (NOT PARQUET_FOUND)
set(PARQUET_ERR_MSG "${PARQUET_ERR_MSG} Could not find the parquet library.")
endif()
set(PARQUET_ERR_MSG "${PARQUET_ERR_MSG} Looked in ")
if ( _parquet_roots )
set(PARQUET_ERR_MSG "${PARQUET_ERR_MSG} in ${_parquet_roots}.")
else ()
set(PARQUET_ERR_MSG "${PARQUET_ERR_MSG} system search paths.")
endif ()
if (Parquet_FIND_REQUIRED)
message(FATAL_ERROR "${PARQUET_ERR_MSG}")
else (Parquet_FIND_REQUIRED)
message(STATUS "${PARQUET_ERR_MSG}")
endif (Parquet_FIND_REQUIRED)
endif ()
endif ()
mark_as_advanced(
PARQUET_FOUND
PARQUET_INCLUDE_DIR
PARQUET_LIBS
PARQUET_LIBRARIES
PARQUET_STATIC_LIB
PARQUET_SHARED_LIB
)

31
cmake/find_parquet.cmake Normal file
View File

@ -0,0 +1,31 @@
option (USE_INTERNAL_PARQUET_LIBRARY "Set to FALSE to use system parquet library instead of bundled" ${NOT_UNBUNDLED})
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/CMakeLists.txt")
if (USE_INTERNAL_PARQUET_LIBRARY)
message (WARNING "submodule contrib/arrow (required for Parquet) is missing. to fix try run: \n git submodule update --init --recursive")
endif ()
set (USE_INTERNAL_PARQUET_LIBRARY 0)
set (MISSING_INTERNAL_PARQUET_LIBRARY 1)
endif ()
if (NOT USE_INTERNAL_PARQUET_LIBRARY)
find_package (Arrow)
find_package (Parquet)
endif ()
if (ARROW_INCLUDE_DIR AND PARQUET_INCLUDE_DIR)
elseif (NOT MISSING_INTERNAL_PARQUET_LIBRARY)
set (USE_INTERNAL_PARQUET_LIBRARY 1)
# TODO: is it required?
# set (ARROW_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src/arrow")
# set (PARQUET_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/src/parquet")
set (ARROW_LIBRARY arrow_static)
set (PARQUET_LIBRARY parquet_static)
set (USE_PARQUET 1)
endif ()
if (USE_PARQUET)
message (STATUS "Using Parquet: ${ARROW_INCLUDE_DIR} ${PARQUET_INCLUDE_DIR}")
else ()
message (STATUS "Building without Parquet support")
endif ()

View File

@ -150,6 +150,19 @@ if (USE_INTERNAL_CAPNP_LIBRARY)
target_include_directories(${CAPNP_LIBRARY} PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/capnproto/c++/src>)
endif ()
if (USE_INTERNAL_PARQUET_LIBRARY)
set (ARROW_COMPUTE ON)
set (ARROW_PARQUET ON)
set (ARROW_VERBOSE_THIRDPARTY_BUILD ON)
set (PARQUET_ARROW_LINKAGE "static")
set (ARROW_BUILD_STATIC ON)
# Because Arrow uses CMAKE_SOURCE_DIR as a project path
# Hopefully will be fixed in https://github.com/apache/arrow/pull/2676
set (CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${ClickHouse_SOURCE_DIR}/contrib/arrow/cpp/cmake_modules")
add_subdirectory (arrow/cpp)
endif ()
if (USE_INTERNAL_POCO_LIBRARY)
set (POCO_VERBOSE_MESSAGES 0 CACHE INTERNAL "")
set (save_CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})

1
contrib/arrow vendored Submodule

@ -0,0 +1 @@
Subproject commit af20905877fb353367d7ee5a808f759532a5ca0f

View File

@ -294,6 +294,13 @@ if (USE_RDKAFKA)
target_link_libraries (dbms PRIVATE clickhouse_storage_kafka)
endif ()
if (USE_PARQUET)
target_link_libraries(dbms ${PARQUET_LIBRARY} ${ARROW_LIBRARY})
if (NOT USE_INTERNAL_PARQUET_LIBRARY)
target_include_directories (dbms BEFORE PRIVATE ${PARQUET_INCLUDE_DIR} ${ARROW_INCLUDE_DIR})
endif ()
endif ()
target_link_libraries(dbms PRIVATE ${OPENSSL_CRYPTO_LIBRARY} Threads::Threads)
target_include_directories (dbms SYSTEM BEFORE PRIVATE ${DIVIDE_INCLUDE_DIR})

View File

@ -0,0 +1,336 @@
#include <algorithm>
#include <iterator>
#include <vector>
// TODO: clear includes
#include <Core/ColumnWithTypeAndName.h>
#include <Columns/IColumn.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <common/DateLUTImpl.h>
#include <DataStreams/ParquetBlockInputStream.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeNullable.h>
#include <IO/BufferBase.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/WriteHelpers.h>
#include <ext/range.h>
#include <arrow/buffer.h>
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/writer.h>
#include <parquet/exception.h>
#include <IO/copyData.h>
#include <IO/WriteBufferFromString.h>
namespace DB
{
ParquetBlockInputStream::ParquetBlockInputStream(ReadBuffer & istr_, const Block & header_)
: istr(istr_)
, header(header_)
{
}
Block ParquetBlockInputStream::getHeader() const
{
return header;
}
/// Inserts numeric data right into internal column data to reduce an overhead
template <typename NumericType>
void ParquetBlockInputStream::fillColumnWithNumericData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
{
PaddedPODArray<NumericType> & column_data = static_cast<ColumnVector<NumericType> &>(*internal_column).getData();
column_data.reserve(arrow_column->length());
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->data()->num_chunks()); ++chunk_i)
{
std::shared_ptr<arrow::Array> chunk = arrow_column->data()->chunk(chunk_i);
/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk->data()->buffers[1];
const NumericType * raw_data = reinterpret_cast<const NumericType *>(buffer->data());
column_data.insert_assume_reserved(raw_data, raw_data + chunk->length());
}
}
/// Inserts chars and offsets right into internal column data to reduce an overhead.
/// Internal offsets are shifted by one to the right in comparison with Arrow ones. So the last offset should map to the end of all chars.
/// Also internal strings are null terminated.
void ParquetBlockInputStream::fillColumnWithStringData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
{
PaddedPODArray<UInt8> & column_chars_t = static_cast<ColumnString &>(*internal_column).getChars();
PaddedPODArray<UInt64> & column_offsets = static_cast<ColumnString &>(*internal_column).getOffsets();
size_t chars_t_size = 0;
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->data()->num_chunks()); ++chunk_i)
{
arrow::BinaryArray & chunk = static_cast<arrow::BinaryArray &>(*(arrow_column->data()->chunk(chunk_i)));
const size_t chunk_length = chunk.length();
chars_t_size += chunk.value_offset(chunk_length - 1) + chunk.value_length(chunk_length - 1);
chars_t_size += chunk_length; /// additional space for null bytes
}
column_chars_t.reserve(chars_t_size);
column_offsets.reserve(arrow_column->length());
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->data()->num_chunks()); ++chunk_i)
{
arrow::BinaryArray & chunk = static_cast<arrow::BinaryArray &>(*(arrow_column->data()->chunk(chunk_i)));
std::shared_ptr<arrow::Buffer> buffer = chunk.value_data();
const size_t chunk_length = chunk.length();
for (size_t offset_i = 0; offset_i != chunk_length; ++offset_i)
{
const UInt8 * raw_data = buffer->data() + chunk.value_offset(offset_i);
column_chars_t.insert_assume_reserved(raw_data, raw_data + chunk.value_length(offset_i));
column_chars_t.emplace_back('\0');
column_offsets.emplace_back(column_chars_t.size());
}
}
}
void ParquetBlockInputStream::fillColumnWithBooleanData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
{
PaddedPODArray<UInt8> & column_data = static_cast<ColumnVector<UInt8> &>(*internal_column).getData();
column_data.resize(arrow_column->length());
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->data()->num_chunks()); ++chunk_i)
{
arrow::BooleanArray & chunk = static_cast<arrow::BooleanArray &>(*(arrow_column->data()->chunk(chunk_i)));
/// buffers[0] is a null bitmap and buffers[1] are actual values
std::shared_ptr<arrow::Buffer> buffer = chunk.data()->buffers[1];
for (size_t bool_i = 0; bool_i != static_cast<size_t>(chunk.length()); ++bool_i)
column_data[bool_i] = chunk.Value(bool_i);
}
}
/// Arrow stores Parquet::DATE in Int32, while ClickHouse stores Date in UInt16. Therefore, it should be checked before saving
void ParquetBlockInputStream::fillColumnWithDate32Data(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
{
PaddedPODArray<UInt16> & column_data = static_cast<ColumnVector<UInt16> &>(*internal_column).getData();
column_data.reserve(arrow_column->length());
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->data()->num_chunks()); ++chunk_i)
{
arrow::Date32Array & chunk = static_cast<arrow::Date32Array &>(*(arrow_column->data()->chunk(chunk_i)));
for (size_t value_i = 0; value_i != static_cast<size_t>(chunk.length()); ++value_i)
{
UInt32 days_num = static_cast<UInt32>(chunk.Value(value_i));
if (days_num > DATE_LUT_MAX_DAY_NUM)
{
// TODO: will it rollback correctly?
throw Exception(
"Input value " + std::to_string(days_num) + " of a column \"" + arrow_column->name() + "\" is greater than "
"max allowed Date value, which is " + std::to_string(DATE_LUT_MAX_DAY_NUM)
);
}
column_data.emplace_back(days_num);
}
}
}
/// Creates a null bytemap from arrow's null bitmap
void ParquetBlockInputStream::fillByteMapFromArrowColumn(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & bytemap)
{
PaddedPODArray<UInt8> & bytemap_data = static_cast<ColumnVector<UInt8> &>(*bytemap).getData();
bytemap_data.reserve(arrow_column->length());
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->data()->num_chunks()); ++chunk_i)
{
std::shared_ptr<arrow::Array> chunk = arrow_column->data()->chunk(chunk_i);
for (size_t value_i = 0; value_i != static_cast<size_t>(chunk->length()); ++value_i)
bytemap_data.emplace_back(chunk->IsNull(value_i));
}
}
#define FOR_ARROW_NUMERIC_TYPES(M) \
M(arrow::Type::UINT8, UInt8) \
M(arrow::Type::INT8, Int8) \
M(arrow::Type::UINT16, UInt16) \
M(arrow::Type::INT16, Int16) \
M(arrow::Type::UINT32, UInt32) \
M(arrow::Type::INT32, Int32) \
M(arrow::Type::UINT64, UInt64) \
M(arrow::Type::INT64, Int64) \
M(arrow::Type::FLOAT, Float32) \
M(arrow::Type::DOUBLE, Float64)
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::Column>>;
const std::unordered_map<arrow::Type::type, std::shared_ptr<IDataType>> ParquetBlockInputStream::arrow_type_to_internal_type = {
{arrow::Type::UINT8, std::make_shared<DataTypeUInt8>()},
{arrow::Type::INT8, std::make_shared<DataTypeInt8>()},
{arrow::Type::UINT16, std::make_shared<DataTypeUInt16>()},
{arrow::Type::INT16, std::make_shared<DataTypeInt16>()},
{arrow::Type::UINT32, std::make_shared<DataTypeUInt32>()},
{arrow::Type::INT32, std::make_shared<DataTypeInt32>()},
{arrow::Type::UINT64, std::make_shared<DataTypeUInt64>()},
{arrow::Type::INT64, std::make_shared<DataTypeInt64>()},
{arrow::Type::FLOAT, std::make_shared<DataTypeFloat32>()},
{arrow::Type::DOUBLE, std::make_shared<DataTypeFloat64>()},
{arrow::Type::BOOL, std::make_shared<DataTypeUInt8>()},
{arrow::Type::DATE32, std::make_shared<DataTypeDate>()},
{arrow::Type::STRING, std::make_shared<DataTypeString>()}//,
// TODO: add other types that are convertable to internal ones:
// 0. ENUM?
// 1. UUID -> String
// 2. JSON -> String
};
Block ParquetBlockInputStream::readImpl()
{
Block res;
if (istr.eof())
return res;
std::string file_data;
{
WriteBufferFromString file_buffer(file_data);
copyData(istr, file_buffer);
}
arrow::Buffer buffer(file_data);
// TODO: maybe use parquet::RandomAccessSource?
auto reader = parquet::ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
parquet::arrow::FileReader filereader(::arrow::default_memory_pool(), std::move(reader));
std::shared_ptr<arrow::Table> table;
// TODO: also catch a ParquetException thrown by filereader?
arrow::Status read_status = filereader.ReadTable(&table);
if (!read_status.ok())
throw Exception("Error while reading parquet data: " + read_status.ToString()/*, ErrorCodes::TODO*/);
if (0 == table->num_rows())
throw Exception("Empty table in input data"/*, ErrorCodes::TODO*/);
if (header.columns() > static_cast<size_t>(table->num_columns()))
// TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable?
throw Exception("Number of columns is less than the table has" /*, ErrorCodes::TODO*/);
NameToColumnPtr name_to_column_ptr;
for (size_t i = 0; i != static_cast<size_t>(table->num_columns()); ++i)
{
std::shared_ptr<arrow::Column> arrow_column = table->column(i);
name_to_column_ptr[arrow_column->name()] = arrow_column;
}
for (size_t column_i = 0; column_i != header.columns(); ++column_i)
{
ColumnWithTypeAndName header_column = header.getByPosition(column_i);
if (name_to_column_ptr.find(header_column.name) == name_to_column_ptr.end())
// TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable?
throw Exception("Column \"" + header_column.name + "\" is not presented in input data" /*, ErrorCodes::TODO*/);
std::shared_ptr<arrow::Column> arrow_column = name_to_column_ptr[header_column.name];
arrow::Type::type arrow_type = arrow_column->type()->id();
if (arrow_type_to_internal_type.find(arrow_type) == arrow_type_to_internal_type.end())
{
throw Exception(
"The type \"" + arrow_column->type()->name() + "\" of an input column \"" + arrow_column->name() + "\""
" is not supported for conversion from a Parquet data format"
/*, ErrorCodes::TODO*/
);
}
// TODO: check if a column is const?
if (!header_column.type->isNullable() && arrow_column->null_count())
{
throw Exception("Can not insert NULL data into non-nullable column \"" + header_column.name + "\""/*, ErrorCodes::TODO*/);
}
const bool target_column_is_nullable = header_column.type->isNullable() || arrow_column->null_count();
const DataTypePtr internal_nested_type = arrow_type_to_internal_type.at(arrow_type);
const DataTypePtr internal_type = target_column_is_nullable ? makeNullable(internal_nested_type) : internal_nested_type;
const std::string internal_nested_type_name = internal_nested_type->getName();
const DataTypePtr column_nested_type =
header_column.type->isNullable()
? static_cast<const DataTypeNullable *>(header_column.type.get())->getNestedType()
: header_column.type;
const DataTypePtr column_type = header_column.type;
const std::string column_nested_type_name = column_nested_type->getName();
// TODO: can it be done with typeid_cast?
if (internal_nested_type_name != column_nested_type_name)
{
throw Exception(
"Input data type \"" + internal_nested_type_name + "\" for a column \"" + header_column.name + "\""
" is not compatible with a column type \"" + column_nested_type_name + "\""/*, ErrorCodes::TODO*/
);
}
ColumnWithTypeAndName column;
column.name = header_column.name;
column.type = internal_type;
/// Data
MutableColumnPtr read_column = internal_nested_type->createColumn();
switch (arrow_type)
{
case arrow::Type::STRING:
fillColumnWithStringData(arrow_column, read_column);
break;
case arrow::Type::BOOL:
fillColumnWithBooleanData(arrow_column, read_column);
break;
case arrow::Type::DATE32:
fillColumnWithDate32Data(arrow_column, read_column);
break;
#define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
case ARROW_NUMERIC_TYPE: \
fillColumnWithNumericData<CPP_NUMERIC_TYPE>(arrow_column, read_column); \
break;
FOR_ARROW_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
// TODO: support TIMESTAMP_MICROS and TIMESTAMP_MILLIS with truncated micro- and milliseconds?
// TODO: read JSON as a string?
// TODO: read UUID as a string?
default:
throw Exception("Unsupported parquet type \"" + arrow_column->type()->name() + "\""/*, ErrorCodes::TODO*/);
}
if (column.type->isNullable())
{
MutableColumnPtr null_bytemap = DataTypeUInt8().createColumn();
fillByteMapFromArrowColumn(arrow_column, null_bytemap);
column.column = ColumnNullable::create(std::move(read_column), std::move(null_bytemap));
}
else
{
column.column = std::move(read_column);
}
res.insert(std::move(column));
}
return res;
}
}

View File

@ -0,0 +1,43 @@
#pragma once
#include <Columns/IColumn.h>
#include <Columns/ColumnVector.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeDate.h>
// TODO: refine includes
#include <arrow/api.h>
namespace DB
{
class ParquetBlockInputStream : public IProfilingBlockInputStream
{
public:
ParquetBlockInputStream(ReadBuffer & istr_, const Block & header_);
String getName() const override { return "Parquet"; }
Block getHeader() const override;
protected:
Block readImpl() override;
private:
ReadBuffer & istr;
Block header;
static void fillColumnWithStringData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column);
static void fillColumnWithBooleanData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column);
static void fillColumnWithDate32Data(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column);
template <typename NumericType>
static void fillColumnWithNumericData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column);
static void fillByteMapFromArrowColumn(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & bytemap);
static const std::unordered_map<arrow::Type::type, std::shared_ptr<IDataType>> arrow_type_to_internal_type;
// TODO: check that this class implements every part of its parent
};
}

View File

@ -0,0 +1,271 @@
// TODO: clean includes
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnsNumber.h>
#include <Core/ColumnWithTypeAndName.h>
#include <DataTypes/DataTypeNullable.h>
#include <IO/WriteHelpers.h>
#include <arrow/api.h>
#include <arrow/io/api.h>
#include <parquet/arrow/writer.h>
#include <parquet/util/memory.h>
#include <parquet/exception.h>
#include <DataStreams/ParquetBlockOutputStream.h>
namespace DB
{
ParquetBlockOutputStream::ParquetBlockOutputStream(WriteBuffer & ostr_, const Block & header_)
: ostr(ostr_)
, header(header_)
{
}
void ParquetBlockOutputStream::flush()
{
ostr.next();
}
void checkAppendStatus(arrow::Status & append_status, const std::string & column_name)
{
if (!append_status.ok())
{
throw Exception(
"Error while building a parquet column \"" + column_name + "\": " + append_status.ToString()/*,
ErrorCodes::TODO*/
);
}
}
void checkFinishStatus(arrow::Status & finish_status, const std::string & column_name)
{
if (!finish_status.ok())
{
throw Exception(
"Error while writing a parquet column \"" + column_name + "\": " + finish_status.ToString()/*,
ErrorCodes::TODO*/
);
}
}
template <typename NumericType, typename ArrowBuilderType>
void ParquetBlockOutputStream::fillArrowArrayWithNumericColumnData(
ColumnPtr write_column,
std::shared_ptr<arrow::Array> & arrow_array,
const PaddedPODArray<UInt8> * null_bytemap
) {
const PaddedPODArray<NumericType> & internal_data = static_cast<const ColumnVector<NumericType> &>(*write_column).getData();
ArrowBuilderType numeric_builder;
arrow::Status append_status;
const UInt8 * arrow_null_bytemap_raw_ptr = nullptr;
PaddedPODArray<UInt8> arrow_null_bytemap;
if (null_bytemap)
{
/// Invert values since Arrow interprets 1 as a non-null value, while CH as a null
arrow_null_bytemap.reserve(null_bytemap->size());
for (size_t i = 0; i != null_bytemap->size(); ++i)
arrow_null_bytemap.emplace_back(1 ^ (*null_bytemap)[i]);
arrow_null_bytemap_raw_ptr = arrow_null_bytemap.data();
}
append_status = numeric_builder.AppendValues(internal_data.data(), internal_data.size(), arrow_null_bytemap_raw_ptr);
checkAppendStatus(append_status, write_column->getName());
arrow::Status finish_status = numeric_builder.Finish(&arrow_array);
checkFinishStatus(finish_status, write_column->getName());
}
void ParquetBlockOutputStream::fillArrowArrayWithStringColumnData(
ColumnPtr write_column,
std::shared_ptr<arrow::Array> & arrow_array,
const PaddedPODArray<UInt8> * null_bytemap
) {
const ColumnString & internal_column = static_cast<const ColumnString &>(*write_column);
arrow::StringBuilder string_builder;
arrow::Status append_status;
for (size_t string_i = 0; string_i != internal_column.size(); ++string_i)
{
if (null_bytemap && (*null_bytemap)[string_i])
{
append_status = string_builder.AppendNull();
}
else
{
StringRef string_ref = internal_column.getDataAt(string_i);
append_status = string_builder.Append(string_ref.data, string_ref.size);
}
checkAppendStatus(append_status, write_column->getName());
}
arrow::Status finish_status = string_builder.Finish(&arrow_array);
checkFinishStatus(finish_status, write_column->getName());
}
void ParquetBlockOutputStream::fillArrowArrayWithDateColumnData(
ColumnPtr write_column,
std::shared_ptr<arrow::Array> & arrow_array,
const PaddedPODArray<UInt8> * null_bytemap
) {
const PaddedPODArray<UInt16> & internal_data = static_cast<const ColumnVector<UInt16> &>(*write_column).getData();
arrow::Date32Builder date32_builder;
arrow::Status append_status;
for (size_t value_i = 0; value_i != internal_data.size(); ++value_i)
{
if (null_bytemap && (*null_bytemap)[value_i])
append_status = date32_builder.AppendNull();
else
/// Implicitly converts UInt16 to Int32
append_status = date32_builder.Append(internal_data[value_i]);
checkAppendStatus(append_status, write_column->getName());
}
arrow::Status finish_status = date32_builder.Finish(&arrow_array);
checkFinishStatus(finish_status, write_column->getName());
}
#define FOR_INTERNAL_NUMERIC_TYPES(M) \
M(UInt8, arrow::UInt8Builder) \
M(Int8, arrow::Int8Builder) \
M(UInt16, arrow::UInt16Builder) \
M(Int16, arrow::Int16Builder) \
M(UInt32, arrow::UInt32Builder) \
M(Int32, arrow::Int32Builder) \
M(UInt64, arrow::UInt64Builder) \
M(Int64, arrow::Int64Builder) \
M(Float32, arrow::FloatBuilder) \
M(Float64, arrow::DoubleBuilder)
const std::unordered_map<String, std::shared_ptr<arrow::DataType>> ParquetBlockOutputStream::internal_type_to_arrow_type = {
{"UInt8", arrow::uint8()},
{"Int8", arrow::int8()},
{"UInt16", arrow::uint16()},
{"Int16", arrow::int16()},
{"UInt32", arrow::uint32()},
{"Int32", arrow::int32()},
{"UInt64", arrow::uint64()},
{"Int64", arrow::int64()},
{"Float32", arrow::float32()},
{"Float64", arrow::float64()},
{"Date", arrow::date32()},
// TODO: ClickHouse can actually store non-utf8 strings!
{"String", arrow::utf8()}//,
// TODO: add other types:
// 1. FixedString
// 2. DateTime
};
const PaddedPODArray<UInt8> * extractNullBytemapPtr(ColumnPtr column)
{
ColumnPtr null_column = static_cast<const ColumnNullable &>(*column).getNullMapColumnPtr();
const PaddedPODArray<UInt8> & null_bytemap = static_cast<const ColumnVector<UInt8> &>(*null_column).getData();
return &null_bytemap;
}
void ParquetBlockOutputStream::write(const Block & block)
{
block.checkNumberOfRows();
const size_t columns_num = block.columns();
/// For arrow::Schema and arrow::Table creation
std::vector<std::shared_ptr<arrow::Field>> arrow_fields;
std::vector<std::shared_ptr<arrow::Array>> arrow_arrays;
arrow_fields.reserve(columns_num);
arrow_arrays.reserve(columns_num);
for (size_t column_i = 0; column_i < columns_num; ++column_i)
{
// TODO: constructed every iteration
const ColumnWithTypeAndName & column = block.safeGetByPosition(column_i);
const bool is_column_nullable = column.type->isNullable();
const DataTypePtr column_nested_type =
is_column_nullable
? static_cast<const DataTypeNullable *>(column.type.get())->getNestedType()
: column.type;
const DataTypePtr column_type = column.type;
// TODO: do not mix std::string and String
const std::string column_nested_type_name = column_nested_type->getName();
if (internal_type_to_arrow_type.find(column_nested_type_name) == internal_type_to_arrow_type.end())
{
throw Exception(
"The type \"" + column_nested_type_name + "\" of a column \"" + column.name + "\""
" is not supported for conversion into a Parquet data format"
/*, ErrorCodes::TODO*/
);
}
arrow_fields.emplace_back(new arrow::Field(
column.name,
internal_type_to_arrow_type.at(column_nested_type_name),
is_column_nullable
));
std::shared_ptr<arrow::Array> arrow_array;
ColumnPtr nested_column = is_column_nullable ? static_cast<const ColumnNullable &>(*column.column).getNestedColumnPtr() : column.column;
const PaddedPODArray<UInt8> * null_bytemap = is_column_nullable ? extractNullBytemapPtr(column.column) : nullptr;
// TODO: use typeid_cast
if ("String" == column_nested_type_name)
{
fillArrowArrayWithStringColumnData(nested_column, arrow_array, null_bytemap);
}
else if ("Date" == column_nested_type_name)
{
fillArrowArrayWithDateColumnData(nested_column, arrow_array, null_bytemap);
}
#define DISPATCH(CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE) \
else if (#CPP_NUMERIC_TYPE == column_nested_type_name) \
{ \
fillArrowArrayWithNumericColumnData<CPP_NUMERIC_TYPE, ARROW_BUILDER_TYPE>(nested_column, arrow_array, null_bytemap); \
}
FOR_INTERNAL_NUMERIC_TYPES(DISPATCH)
#undef DISPATCH
// TODO: there are also internal types that are convertable to parquet/arrow once:
// 1. FixedString(N)
// 2. DateTime
else
{
throw Exception(
"Internal type \"" + column_nested_type_name + "\" of a column \"" + column.name + "\""
" is not supported for conversion into a Parquet data format"/*, ErrorCodes::TODO*/
);
}
arrow_arrays.emplace_back(std::move(arrow_array));
}
std::shared_ptr<arrow::Schema> arrow_schema = std::make_shared<arrow::Schema>(std::move(arrow_fields));
std::shared_ptr<arrow::Table> arrow_table = arrow::Table::Make(arrow_schema, arrow_arrays);
// TODO: get rid of extra copying
std::shared_ptr<parquet::InMemoryOutputStream> sink = std::make_shared<parquet::InMemoryOutputStream>();
// TODO: calculate row_group_size depending on a number of rows and table size
arrow::Status write_status = parquet::arrow::WriteTable(
*arrow_table, arrow::default_memory_pool(), sink,
/* row_group_size = */arrow_table->num_rows(), parquet::default_writer_properties(),
parquet::arrow::default_arrow_writer_properties()
);
if (!write_status.ok())
throw Exception("Error while writing a table: " + write_status.ToString()/*, ErrorCodes::TODO*/);
std::shared_ptr<arrow::Buffer> table_buffer = sink->GetBuffer();
writeString(reinterpret_cast<const char *>(table_buffer->data()), table_buffer->size(), ostr);
}
};

View File

@ -0,0 +1,36 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
class ParquetBlockOutputStream : public IBlockOutputStream
{
public:
ParquetBlockOutputStream(WriteBuffer & ostr_, const Block & header_);
Block getHeader() const override { return header; }
void write(const Block & block) override;
void flush() override;
String getContentType() const override { return "application/octet-stream"; }
private:
WriteBuffer & ostr;
Block header;
static void fillArrowArrayWithDateColumnData(ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array,
const PaddedPODArray<UInt8> * null_bytemap);
static void fillArrowArrayWithStringColumnData(ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array,
const PaddedPODArray<UInt8> * null_bytemap);
template <typename NumericType, typename ArrowBuilderType>
static void fillArrowArrayWithNumericColumnData(ColumnPtr write_column, std::shared_ptr<arrow::Array> & arrow_array,
const PaddedPODArray<UInt8> * null_bytemap);
static const std::unordered_map<String, std::shared_ptr<arrow::DataType>> internal_type_to_arrow_type;
};
}