Parquet input: add a Date type support

This commit is contained in:
Ivan Zhukov 2018-05-17 20:41:35 +03:00
parent 31e27effa6
commit f1bef15705
2 changed files with 41 additions and 5 deletions

View File

@ -7,8 +7,10 @@
#include <Columns/IColumn.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <common/DateLUTImpl.h>
#include <DataStreams/ParquetBlockInputStream.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeDate.h>
#include <IO/BufferBase.h>
#include <IO/ReadBufferFromMemory.h>
#include <IO/WriteHelpers.h>
@ -111,6 +113,34 @@ void ParquetBlockInputStream::fillColumnWithBooleanData(std::shared_ptr<arrow::C
}
}
/// Arrow stores Parquet::DATE in Int32, while ClickHouse stores Date in UInt16. Therefore, it should be checked before saving
void ParquetBlockInputStream::fillColumnWithDate32Data(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column)
{
PaddedPODArray<UInt16> & column_data = static_cast<ColumnVector<UInt16> &>(*internal_column).getData();
column_data.reserve(arrow_column->length());
for (size_t chunk_i = 0; chunk_i != static_cast<size_t>(arrow_column->data()->num_chunks()); ++chunk_i)
{
arrow::Date32Array & chunk = static_cast<arrow::Date32Array &>(*(arrow_column->data()->chunk(chunk_i)));
for (size_t value_i = 0; value_i != static_cast<size_t>(chunk.length()); ++value_i)
{
UInt32 days_num = static_cast<UInt32>(chunk.Value(value_i));
if (days_num > DATE_LUT_MAX_DAY_NUM)
{
// TODO: will it rollback correctly?
throw Exception(
"Input value " + std::to_string(days_num) + " of a column \"" + arrow_column->name() + "\" is greater than "
"max allowed Date value, which is " + std::to_string(DATE_LUT_MAX_DAY_NUM)
);
}
column_data.emplace_back(days_num);
}
}
}
#define FOR_ARROW_NUMERIC_TYPES(M) \
M(arrow::Type::UINT8, UInt8) \
M(arrow::Type::INT8, Int8) \
@ -123,6 +153,7 @@ void ParquetBlockInputStream::fillColumnWithBooleanData(std::shared_ptr<arrow::C
M(arrow::Type::FLOAT, Float32) \
M(arrow::Type::DOUBLE, Float64)
using NameToColumnPtr = std::unordered_map<std::string, std::shared_ptr<arrow::Column>>;
Block ParquetBlockInputStream::readImpl()
@ -207,6 +238,10 @@ Block ParquetBlockInputStream::readImpl()
case arrow::Type::BOOL:
fillColumnWithBooleanData(arrow_column, read_column);
break;
case arrow::Type::DATE32:
fillColumnWithDate32Data(arrow_column, read_column);
break;
/* fillColumnWithNumericData<UInt32>(arrow_column, read_column); */
// TODO: check that values smaller than INT32 are being read correctly
#define DISPATCH(ARROW_NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
case ARROW_NUMERIC_TYPE: \

View File

@ -5,6 +5,7 @@
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeDate.h>
// TODO: refine includes
#include <arrow/api.h>
/* #include <DataStreams/MarkInCompressedFile.h> */
@ -34,6 +35,7 @@ private:
void fillColumnWithStringData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column);
void fillColumnWithBooleanData(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column);
void fillColumnWithDate32Data(std::shared_ptr<arrow::Column> & arrow_column, MutableColumnPtr & internal_column);
std::unordered_map<arrow::Type::type, std::shared_ptr<IDataType>> arrow_type_to_internal_type = {
{arrow::Type::UINT8, std::make_shared<DataTypeUInt8>()},
@ -47,11 +49,10 @@ private:
{arrow::Type::FLOAT, std::make_shared<DataTypeFloat32>()},
{arrow::Type::DOUBLE, std::make_shared<DataTypeFloat64>()},
{arrow::Type::STRING, std::make_shared<DataTypeString>()},
{arrow::Type::BOOL, std::make_shared<DataTypeUInt8>()}//,
// TODO:
/* {arrow::Type::DATE32, Date32, Int32Type}, */
/* {arrow::Type::DATE64, Date64, Int32Type}//, */
{arrow::Type::BOOL, std::make_shared<DataTypeUInt8>()},
{arrow::Type::DATE32, std::make_shared<DataTypeDate>()},
{arrow::Type::STRING, std::make_shared<DataTypeString>()}//,
// TODO: add other types
};