This commit is contained in:
Arthur Passos 2024-11-21 09:40:48 +04:00 committed by GitHub
commit b82bbdacea
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 308 additions and 148 deletions

View File

@ -48,6 +48,22 @@ public:
consume(bytes);
}
template <typename TValue, typename ParquetType>
void ALWAYS_INLINE readValuesOfDifferentSize(TValue * dst, size_t count)
{
auto necessary_bytes = count * sizeof(ParquetType);
checkAvaible(necessary_bytes);
const ParquetType* src = reinterpret_cast<const ParquetType*>(data);
for (std::size_t i = 0; i < count; i++)
{
dst[i] = static_cast<TValue>(src[i]);
}
consume(necessary_bytes);
}
void ALWAYS_INLINE readDateTime64FromInt96(DateTime64 & dst)
{
static const int max_scale_num = 9;

View File

@ -240,8 +240,8 @@ TValue * getResizedPrimitiveData(TColumn & column, size_t size)
} // anoynomous namespace
template <>
void ParquetPlainValuesReader<ColumnString>::readBatch(
template <typename TColumn>
void ParquetPlainByteArrayValuesReader<TColumn>::readBatch(
MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values)
{
auto & column = *assert_cast<ColumnString *>(col_ptr.get());
@ -322,8 +322,8 @@ void ParquetBitPlainReader<TColumn>::readBatch(
}
template <>
void ParquetPlainValuesReader<ColumnDecimal<DateTime64>, ParquetReaderTypes::TimestampInt96>::readBatch(
template <typename TColumn>
void ParquetPlainInt96ValuesReader<TColumn>::readBatch(
MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values)
{
auto cursor = col_ptr->size();
@ -350,8 +350,8 @@ void ParquetPlainValuesReader<ColumnDecimal<DateTime64>, ParquetReaderTypes::Tim
);
}
template <typename TColumn, ParquetReaderTypes reader_type>
void ParquetPlainValuesReader<TColumn, reader_type>::readBatch(
template <typename TColumn, typename ParquetType>
void ParquetPlainValuesReader<TColumn, ParquetType>::readBatch(
MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values)
{
auto cursor = col_ptr->size();
@ -365,11 +365,11 @@ void ParquetPlainValuesReader<TColumn, reader_type>::readBatch(
null_map,
/* individual_visitor */ [&](size_t nest_cursor)
{
plain_data_buffer.readValue(column_data[nest_cursor]);
plain_data_buffer.readValuesOfDifferentSize<TValue, ParquetType>(column_data + nest_cursor, 1);
},
/* repeated_visitor */ [&](size_t nest_cursor, UInt32 count)
{
plain_data_buffer.readBytes(column_data + nest_cursor, count * sizeof(TValue));
plain_data_buffer.readValuesOfDifferentSize<TValue, ParquetType>(column_data + nest_cursor, count);
}
);
}
@ -576,18 +576,19 @@ void ParquetRleDictReader<TColumnVector>::readBatch(
}
template class ParquetPlainValuesReader<ColumnInt32>;
template class ParquetPlainValuesReader<ColumnUInt32>;
template class ParquetPlainValuesReader<ColumnInt64>;
template class ParquetPlainValuesReader<ColumnUInt64>;
template class ParquetPlainValuesReader<ColumnBFloat16>;
template class ParquetPlainValuesReader<ColumnFloat32>;
template class ParquetPlainValuesReader<ColumnFloat64>;
template class ParquetPlainValuesReader<ColumnDecimal<Decimal32>>;
template class ParquetPlainValuesReader<ColumnDecimal<Decimal64>>;
template class ParquetPlainValuesReader<ColumnDecimal<DateTime64>>;
template class ParquetPlainValuesReader<ColumnString>;
template class ParquetPlainValuesReader<ColumnUInt8>;
template class ParquetPlainValuesReader<ColumnUInt8, int32_t>;
template class ParquetPlainValuesReader<ColumnInt8, int32_t>;
template class ParquetPlainValuesReader<ColumnUInt16, int32_t>;
template class ParquetPlainValuesReader<ColumnInt16, int32_t>;
template class ParquetPlainValuesReader<ColumnUInt32, int32_t>;
template class ParquetPlainValuesReader<ColumnInt32, int32_t>;
template class ParquetPlainValuesReader<ColumnUInt64, int64_t>;
template class ParquetPlainValuesReader<ColumnInt64, int64_t>;
template class ParquetPlainValuesReader<ColumnFloat32, float>;
template class ParquetPlainValuesReader<ColumnFloat64, double>;
template class ParquetPlainValuesReader<ColumnDecimal<Decimal32>, int32_t>;
template class ParquetPlainValuesReader<ColumnDecimal<Decimal64>, int64_t>;
template class ParquetPlainValuesReader<ColumnDecimal<DateTime64>, int64_t>;
template class ParquetBitPlainReader<ColumnUInt8>;
@ -598,12 +599,10 @@ template class ParquetRleLCReader<ColumnUInt8>;
template class ParquetRleLCReader<ColumnUInt16>;
template class ParquetRleLCReader<ColumnUInt32>;
template class ParquetRleDictReader<ColumnUInt8>;
template class ParquetRleDictReader<ColumnInt32>;
template class ParquetRleDictReader<ColumnUInt32>;
template class ParquetRleDictReader<ColumnInt64>;
template class ParquetRleDictReader<ColumnUInt64>;
template class ParquetRleDictReader<ColumnBFloat16>;
template class ParquetRleDictReader<ColumnFloat32>;
template class ParquetRleDictReader<ColumnFloat64>;
template class ParquetRleDictReader<ColumnDecimal<Decimal32>>;
@ -613,4 +612,8 @@ template class ParquetRleDictReader<ColumnDecimal<Decimal256>>;
template class ParquetRleDictReader<ColumnDecimal<DateTime64>>;
template class ParquetRleDictReader<ColumnString>;
template class ParquetPlainByteArrayValuesReader<ColumnString>;
template class ParquetPlainInt96ValuesReader<ColumnDecimal<DateTime64>>;
}

View File

@ -150,7 +150,7 @@ enum class ParquetReaderTypes
/**
* The definition level is RLE or BitPacked encoding, while data is read directly
*/
template <typename TColumn, ParquetReaderTypes reader_type = ParquetReaderTypes::Normal>
template <typename TColumn, typename ParquetType>
class ParquetPlainValuesReader : public ParquetDataValuesReader
{
public:
@ -172,6 +172,50 @@ private:
ParquetDataBuffer plain_data_buffer;
};
template <typename TColumn>
class ParquetPlainInt96ValuesReader : public ParquetDataValuesReader
{
public:
ParquetPlainInt96ValuesReader(
Int32 max_def_level_,
std::unique_ptr<RleValuesReader> def_level_reader_,
ParquetDataBuffer data_buffer_)
: max_def_level(max_def_level_)
, def_level_reader(std::move(def_level_reader_))
, plain_data_buffer(std::move(data_buffer_))
{}
void readBatch(MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values) override;
private:
Int32 max_def_level;
std::unique_ptr<RleValuesReader> def_level_reader;
ParquetDataBuffer plain_data_buffer;
};
template <typename TColumn>
class ParquetPlainByteArrayValuesReader : public ParquetDataValuesReader
{
public:
ParquetPlainByteArrayValuesReader(
Int32 max_def_level_,
std::unique_ptr<RleValuesReader> def_level_reader_,
ParquetDataBuffer data_buffer_)
: max_def_level(max_def_level_)
, def_level_reader(std::move(def_level_reader_))
, plain_data_buffer(std::move(data_buffer_))
{}
void readBatch(MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values) override;
private:
Int32 max_def_level;
std::unique_ptr<RleValuesReader> def_level_reader;
ParquetDataBuffer plain_data_buffer;
};
template <typename TColumn>
class ParquetBitPlainReader : public ParquetDataValuesReader
{

View File

@ -173,13 +173,7 @@ ColumnPtr readDictPage(
}
template <typename TColumn>
std::unique_ptr<ParquetDataValuesReader> createPlainReader(
const parquet::ColumnDescriptor & col_des,
RleValuesReaderPtr def_level_reader,
ParquetDataBuffer buffer);
template <is_col_over_big_decimal TColumnDecimal>
template <is_col_over_big_decimal TColumnDecimal, typename ParquetType>
std::unique_ptr<ParquetDataValuesReader> createPlainReader(
const parquet::ColumnDescriptor & col_des,
RleValuesReaderPtr def_level_reader,
@ -192,25 +186,62 @@ std::unique_ptr<ParquetDataValuesReader> createPlainReader(
std::move(buffer));
}
template <typename TColumn>
template <typename TColumn, typename ParquetType>
std::unique_ptr<ParquetDataValuesReader> createPlainReader(
const parquet::ColumnDescriptor & col_des,
RleValuesReaderPtr def_level_reader,
ParquetDataBuffer buffer)
{
if (std::is_same_v<TColumn, ColumnDecimal<DateTime64>> && col_des.physical_type() == parquet::Type::INT96)
return std::make_unique<ParquetPlainValuesReader<TColumn, ParquetReaderTypes::TimestampInt96>>(
if constexpr (std::is_same_v<TColumn, ColumnDecimal<DateTime64>> && std::is_same_v<ParquetType, ParquetInt96TypeStub>)
return std::make_unique<ParquetPlainInt96ValuesReader<TColumn>>(
col_des.max_definition_level(), std::move(def_level_reader), std::move(buffer));
return std::make_unique<ParquetPlainValuesReader<TColumn>>(
if constexpr (std::is_same_v<ParquetType, ParquetByteArrayTypeStub>)
{
return std::make_unique<ParquetPlainByteArrayValuesReader<TColumn>>(
col_des.max_definition_level(), std::move(def_level_reader), std::move(buffer));
}
return std::make_unique<ParquetPlainValuesReader<TColumn, ParquetType>>(
col_des.max_definition_level(), std::move(def_level_reader), std::move(buffer));
}
template <typename TColumn, typename ParquetType>
std::unique_ptr<ParquetDataValuesReader> createReader(
const parquet::ColumnDescriptor & col_descriptor,
RleValuesReaderPtr def_level_reader,
const uint8_t * buffer,
std::size_t buffer_max_size,
const DataTypePtr & base_data_type)
{
if constexpr (std::is_same_v<ParquetType, bool>)
{
auto bit_reader = std::make_unique<arrow::bit_util::BitReader>(buffer, buffer_max_size);
return std::make_unique<ParquetBitPlainReader<TColumn>>(
col_descriptor.max_definition_level(), std::move(def_level_reader), std::move(bit_reader));
}
else
{
ParquetDataBuffer parquet_buffer = [&]()
{
if constexpr (!std::is_same_v<ColumnDecimal<DateTime64>, TColumn>)
return ParquetDataBuffer(buffer, buffer_max_size);
auto scale = assert_cast<const DataTypeDateTime64 &>(*base_data_type).getScale();
return ParquetDataBuffer(buffer, buffer_max_size, scale);
}();
return createPlainReader<TColumn, ParquetType>(col_descriptor, std::move(def_level_reader), parquet_buffer);
}
}
} // anonymous namespace
template <typename TColumn>
ParquetLeafColReader<TColumn>::ParquetLeafColReader(
template <typename TColumn, typename ParquetType>
ParquetLeafColReader<TColumn, ParquetType>::ParquetLeafColReader(
const parquet::ColumnDescriptor & col_descriptor_,
DataTypePtr base_type_,
std::unique_ptr<parquet::ColumnChunkMetaData> meta_,
@ -223,8 +254,8 @@ ParquetLeafColReader<TColumn>::ParquetLeafColReader(
{
}
template <typename TColumn>
ColumnWithTypeAndName ParquetLeafColReader<TColumn>::readBatch(UInt64 rows_num, const String & name)
template <typename TColumn, typename ParquetType>
ColumnWithTypeAndName ParquetLeafColReader<TColumn, ParquetType>::readBatch(UInt64 rows_num, const String & name)
{
reading_rows_num = rows_num;
auto readPageIfEmpty = [&]()
@ -251,41 +282,42 @@ ColumnWithTypeAndName ParquetLeafColReader<TColumn>::readBatch(UInt64 rows_num,
return releaseColumn(name);
}
template <>
void ParquetLeafColReader<ColumnString>::resetColumn(UInt64 rows_num)
template <typename TColumn, typename ParquetType>
void ParquetLeafColReader<TColumn, ParquetType>::resetColumn(UInt64 rows_num)
{
if (reading_low_cardinality)
if constexpr (std::is_same_v<TColumn, ColumnString>)
{
assert(dictionary);
visitColStrIndexType(dictionary->size(), [&]<typename TColVec>(TColVec *)
if (reading_low_cardinality)
{
column = TColVec::create();
});
assert(dictionary);
visitColStrIndexType(dictionary->size(), [&]<typename TColVec>(TColVec *)
{
column = TColVec::create();
});
// only first position is used
null_map = std::make_unique<LazyNullMap>(1);
column->reserve(rows_num);
// only first position is used
null_map = std::make_unique<LazyNullMap>(1);
column->reserve(rows_num);
}
else
{
null_map = std::make_unique<LazyNullMap>(rows_num);
column = ColumnString::create();
reserveColumnStrRows(column, rows_num);
}
}
else
{
assert(!reading_low_cardinality);
column = base_data_type->createColumn();
column->reserve(rows_num);
null_map = std::make_unique<LazyNullMap>(rows_num);
column = ColumnString::create();
reserveColumnStrRows(column, rows_num);
}
}
template <typename TColumn>
void ParquetLeafColReader<TColumn>::resetColumn(UInt64 rows_num)
{
assert(!reading_low_cardinality);
column = base_data_type->createColumn();
column->reserve(rows_num);
null_map = std::make_unique<LazyNullMap>(rows_num);
}
template <typename TColumn>
void ParquetLeafColReader<TColumn>::degradeDictionary()
template <typename TColumn, typename ParquetType>
void ParquetLeafColReader<TColumn, ParquetType>::degradeDictionary()
{
// if last batch read all dictionary indices, then degrade is not needed this time
if (!column)
@ -331,8 +363,8 @@ void ParquetLeafColReader<TColumn>::degradeDictionary()
LOG_DEBUG(log, "degraded dictionary to normal column");
}
template <typename TColumn>
ColumnWithTypeAndName ParquetLeafColReader<TColumn>::releaseColumn(const String & name)
template <typename TColumn, typename ParquetType>
ColumnWithTypeAndName ParquetLeafColReader<TColumn, ParquetType>::releaseColumn(const String & name)
{
DataTypePtr data_type = base_data_type;
if (reading_low_cardinality)
@ -365,8 +397,8 @@ ColumnWithTypeAndName ParquetLeafColReader<TColumn>::releaseColumn(const String
return res;
}
template <typename TColumn>
void ParquetLeafColReader<TColumn>::readPage()
template <typename TColumn, typename ParquetType>
void ParquetLeafColReader<TColumn, ParquetType>::readPage()
{
// refer to: ColumnReaderImplBase::ReadNewPage in column_reader.cc
// this is where decompression happens
@ -408,8 +440,8 @@ void ParquetLeafColReader<TColumn>::readPage()
}
}
template <typename TColumn>
void ParquetLeafColReader<TColumn>::initDataReader(
template <typename TColumn, typename ParquetType>
void ParquetLeafColReader<TColumn, ParquetType>::initDataReader(
parquet::Encoding::type enconding_type,
const uint8_t * buffer,
std::size_t max_size,
@ -425,29 +457,8 @@ void ParquetLeafColReader<TColumn>::initDataReader(
degradeDictionary();
}
if (col_descriptor.physical_type() == parquet::Type::BOOLEAN)
{
if constexpr (std::is_same_v<TColumn, ColumnUInt8>)
{
auto bit_reader = std::make_unique<arrow::bit_util::BitReader>(buffer, max_size);
data_values_reader = std::make_unique<ParquetBitPlainReader<ColumnUInt8>>(col_descriptor.max_definition_level(),
std::move(def_level_reader),
std::move(bit_reader));
}
}
else
{
ParquetDataBuffer parquet_buffer = [&]()
{
if constexpr (!std::is_same_v<ColumnDecimal<DateTime64>, TColumn>)
return ParquetDataBuffer(buffer, max_size);
auto scale = assert_cast<const DataTypeDateTime64 &>(*base_data_type).getScale();
return ParquetDataBuffer(buffer, max_size, scale);
}();
data_values_reader = createPlainReader<TColumn>(
col_descriptor, std::move(def_level_reader), std::move(parquet_buffer));
}
data_values_reader = createReader<TColumn, ParquetType>(
col_descriptor, std::move(def_level_reader), buffer, max_size, base_data_type);
break;
}
case parquet::Encoding::RLE_DICTIONARY:
@ -476,8 +487,8 @@ void ParquetLeafColReader<TColumn>::initDataReader(
}
}
template <typename TColumn>
void ParquetLeafColReader<TColumn>::readPageV1(const parquet::DataPageV1 & page)
template <typename TColumn, typename ParquetType>
void ParquetLeafColReader<TColumn, ParquetType>::readPageV1(const parquet::DataPageV1 & page)
{
cur_page_values = page.num_values();
@ -562,8 +573,8 @@ void ParquetLeafColReader<TColumn>::readPageV1(const parquet::DataPageV1 & page)
* The data buffer is "offset-ed" by rl bytes length and then dl decoder is built using RLE decoder. Since dl bytes length was present in the header,
* there is no need to read it and apply an offset like in page v1.
* */
template <typename TColumn>
void ParquetLeafColReader<TColumn>::readPageV2(const parquet::DataPageV2 & page)
template <typename TColumn, typename ParquetType>
void ParquetLeafColReader<TColumn, ParquetType>::readPageV2(const parquet::DataPageV2 & page)
{
cur_page_values = page.num_values();
@ -609,28 +620,32 @@ void ParquetLeafColReader<TColumn>::readPageV2(const parquet::DataPageV2 & page)
initDataReader(page.encoding(), buffer, page.size() - total_levels_length, std::move(def_level_reader));
}
template <typename TColumn>
std::unique_ptr<ParquetDataValuesReader> ParquetLeafColReader<TColumn>::createDictReader(
template <typename TColumn, typename ParquetType>
std::unique_ptr<ParquetDataValuesReader> ParquetLeafColReader<TColumn, ParquetType>::createDictReader(
std::unique_ptr<RleValuesReader> def_level_reader, std::unique_ptr<RleValuesReader> rle_data_reader)
{
if (reading_low_cardinality && std::same_as<TColumn, ColumnString>)
{
std::unique_ptr<ParquetDataValuesReader> res;
visitColStrIndexType(dictionary->size(), [&]<typename TCol>(TCol *)
{
res = std::make_unique<ParquetRleLCReader<TCol>>(
col_descriptor.max_definition_level(),
std::move(def_level_reader),
std::move(rle_data_reader));
});
return res;
}
if (col_descriptor.physical_type() == parquet::Type::type::BOOLEAN)
if constexpr (std::is_same_v<TColumn, ColumnUInt8> || std::is_same_v<TColumn, ColumnInt8>
|| std::is_same_v<TColumn, ColumnUInt16> || std::is_same_v<TColumn, ColumnInt16>)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Dictionary encoding for booleans is not supported");
}
if (reading_low_cardinality)
{
if constexpr (std::same_as<TColumn, ColumnString>)
{
std::unique_ptr<ParquetDataValuesReader> res;
visitColStrIndexType(dictionary->size(), [&]<typename TCol>(TCol *)
{
res = std::make_unique<ParquetRleLCReader<TCol>>(
col_descriptor.max_definition_level(),
std::move(def_level_reader),
std::move(rle_data_reader));
});
return res;
}
}
return std::make_unique<ParquetRleDictReader<TColumn>>(
col_descriptor.max_definition_level(),
std::move(def_level_reader),
@ -639,19 +654,23 @@ std::unique_ptr<ParquetDataValuesReader> ParquetLeafColReader<TColumn>::createDi
}
template class ParquetLeafColReader<ColumnUInt8>;
template class ParquetLeafColReader<ColumnInt32>;
template class ParquetLeafColReader<ColumnUInt32>;
template class ParquetLeafColReader<ColumnInt64>;
template class ParquetLeafColReader<ColumnUInt64>;
template class ParquetLeafColReader<ColumnBFloat16>;
template class ParquetLeafColReader<ColumnFloat32>;
template class ParquetLeafColReader<ColumnFloat64>;
template class ParquetLeafColReader<ColumnString>;
template class ParquetLeafColReader<ColumnDecimal<Decimal32>>;
template class ParquetLeafColReader<ColumnDecimal<Decimal64>>;
template class ParquetLeafColReader<ColumnDecimal<Decimal128>>;
template class ParquetLeafColReader<ColumnDecimal<Decimal256>>;
template class ParquetLeafColReader<ColumnDecimal<DateTime64>>;
template class ParquetLeafColReader<ColumnUInt8, bool>;
template class ParquetLeafColReader<ColumnUInt8, int32_t>;
template class ParquetLeafColReader<ColumnInt8, int32_t>;
template class ParquetLeafColReader<ColumnUInt16, int32_t>;
template class ParquetLeafColReader<ColumnInt16, int32_t>;
template class ParquetLeafColReader<ColumnUInt32, int32_t>;
template class ParquetLeafColReader<ColumnInt32, int32_t>;
template class ParquetLeafColReader<ColumnUInt64, int64_t>;
template class ParquetLeafColReader<ColumnInt64, int64_t>;
template class ParquetLeafColReader<ColumnFloat32, float>;
template class ParquetLeafColReader<ColumnFloat64, double>;
template class ParquetLeafColReader<ColumnString, ParquetByteArrayTypeStub>;
template class ParquetLeafColReader<ColumnDecimal<Decimal32>, int32_t>;
template class ParquetLeafColReader<ColumnDecimal<Decimal64>, int64_t>;
template class ParquetLeafColReader<ColumnDecimal<Decimal128>, ParquetByteArrayTypeStub>;
template class ParquetLeafColReader<ColumnDecimal<Decimal256>, ParquetByteArrayTypeStub>;
template class ParquetLeafColReader<ColumnDecimal<DateTime64>, ParquetInt96TypeStub>;
template class ParquetLeafColReader<ColumnDecimal<DateTime64>, int64_t>;
}

View File

@ -17,7 +17,10 @@ class ColumnDescriptor;
namespace DB
{
template <typename TColumn>
struct ParquetByteArrayTypeStub {};
struct ParquetInt96TypeStub {};
template <typename TColumn, typename ParquetType>
class ParquetLeafColReader : public ParquetColumnReader
{
public:

View File

@ -93,19 +93,20 @@ private:
std::unique_ptr<ParquetColumnReader> fromInt32INT(const parquet::IntLogicalType & int_type);
std::unique_ptr<ParquetColumnReader> fromInt64INT(const parquet::IntLogicalType & int_type);
template<class DataType>
template<class ClickHouseType, typename ParquetType>
auto makeLeafReader()
{
return std::make_unique<ParquetLeafColReader<typename DataType::ColumnType>>(
col_descriptor, std::make_shared<DataType>(), std::move(meta), std::move(page_reader));
return std::make_unique<ParquetLeafColReader<typename ClickHouseType::ColumnType, ParquetType>>(
col_descriptor, std::make_shared<ClickHouseType>(), std::move(meta), std::move(page_reader));
}
template<class DecimalType>
template<class DecimalType, typename ParquetType>
auto makeDecimalLeafReader()
{
auto data_type = std::make_shared<DataTypeDecimal<DecimalType>>(
col_descriptor.type_precision(), col_descriptor.type_scale());
return std::make_unique<ParquetLeafColReader<ColumnDecimal<DecimalType>>>(
return std::make_unique<ParquetLeafColReader<ColumnDecimal<DecimalType>, ParquetType>>(
col_descriptor, std::move(data_type), std::move(meta), std::move(page_reader));
}
@ -157,11 +158,11 @@ std::unique_ptr<ParquetColumnReader> ColReaderFactory::fromInt32()
case parquet::LogicalType::Type::INT:
return fromInt32INT(dynamic_cast<const parquet::IntLogicalType &>(*col_descriptor.logical_type()));
case parquet::LogicalType::Type::NONE:
return makeLeafReader<DataTypeInt32>();
return makeLeafReader<DataTypeInt32, int32_t>();
case parquet::LogicalType::Type::DATE:
return makeLeafReader<DataTypeDate32>();
return makeLeafReader<DataTypeDate32, int32_t>();
case parquet::LogicalType::Type::DECIMAL:
return makeDecimalLeafReader<Decimal32>();
return makeDecimalLeafReader<Decimal32, int32_t>();
default:
return throwUnsupported();
}
@ -174,16 +175,16 @@ std::unique_ptr<ParquetColumnReader> ColReaderFactory::fromInt64()
case parquet::LogicalType::Type::INT:
return fromInt64INT(dynamic_cast<const parquet::IntLogicalType &>(*col_descriptor.logical_type()));
case parquet::LogicalType::Type::NONE:
return makeLeafReader<DataTypeInt64>();
return makeLeafReader<DataTypeInt64, int64_t>();
case parquet::LogicalType::Type::TIMESTAMP:
{
const auto & tm_type = dynamic_cast<const parquet::TimestampLogicalType &>(*col_descriptor.logical_type());
auto read_type = std::make_shared<DataTypeDateTime64>(getScaleFromLogicalTimestamp(tm_type.time_unit()));
return std::make_unique<ParquetLeafColReader<ColumnDecimal<DateTime64>>>(
return std::make_unique<ParquetLeafColReader<ColumnDecimal<DateTime64>, int64_t>>(
col_descriptor, std::move(read_type), std::move(meta), std::move(page_reader));
}
case parquet::LogicalType::Type::DECIMAL:
return makeDecimalLeafReader<Decimal64>();
return makeDecimalLeafReader<Decimal64, int64_t>();
default:
return throwUnsupported();
}
@ -195,7 +196,7 @@ std::unique_ptr<ParquetColumnReader> ColReaderFactory::fromByteArray()
{
case parquet::LogicalType::Type::STRING:
case parquet::LogicalType::Type::NONE:
return makeLeafReader<DataTypeString>();
return makeLeafReader<DataTypeString, ParquetByteArrayTypeStub>();
default:
return throwUnsupported();
}
@ -210,9 +211,9 @@ std::unique_ptr<ParquetColumnReader> ColReaderFactory::fromFLBA()
if (col_descriptor.type_length() > 0)
{
if (col_descriptor.type_length() <= static_cast<int>(sizeof(Decimal128)))
return makeDecimalLeafReader<Decimal128>();
return makeDecimalLeafReader<Decimal128, ParquetByteArrayTypeStub>();
if (col_descriptor.type_length() <= static_cast<int>(sizeof(Decimal256)))
return makeDecimalLeafReader<Decimal256>();
return makeDecimalLeafReader<Decimal256, ParquetByteArrayTypeStub>();
}
return throwUnsupported(PreformattedMessage::create(
@ -227,11 +228,23 @@ std::unique_ptr<ParquetColumnReader> ColReaderFactory::fromInt32INT(const parque
{
switch (int_type.bit_width())
{
case 8:
{
if (int_type.is_signed())
return makeLeafReader<DataTypeInt8, int32_t>();
return makeLeafReader<DataTypeUInt8, int32_t>();
}
case 16:
{
if (int_type.is_signed())
return makeLeafReader<DataTypeInt16, int32_t>();
return makeLeafReader<DataTypeUInt16, int32_t>();
}
case 32:
{
if (int_type.is_signed())
return makeLeafReader<DataTypeInt32>();
return makeLeafReader<DataTypeUInt32>();
return makeLeafReader<DataTypeInt32, int32_t>();
return makeLeafReader<DataTypeUInt32, int32_t>();
}
default:
return throwUnsupported(PreformattedMessage::create(", bit width: {}", int_type.bit_width()));
@ -245,8 +258,8 @@ std::unique_ptr<ParquetColumnReader> ColReaderFactory::fromInt64INT(const parque
case 64:
{
if (int_type.is_signed())
return makeLeafReader<DataTypeInt64>();
return makeLeafReader<DataTypeUInt64>();
return makeLeafReader<DataTypeInt64, int64_t>();
return makeLeafReader<DataTypeUInt64, int64_t>();
}
default:
return throwUnsupported(PreformattedMessage::create(", bit width: {}", int_type.bit_width()));
@ -263,7 +276,7 @@ std::unique_ptr<ParquetColumnReader> ColReaderFactory::makeReader()
switch (col_descriptor.physical_type())
{
case parquet::Type::BOOLEAN:
return makeLeafReader<DataTypeUInt8>();
return makeLeafReader<DataTypeUInt8, bool>();
case parquet::Type::INT32:
return fromInt32();
case parquet::Type::INT64:
@ -276,13 +289,13 @@ std::unique_ptr<ParquetColumnReader> ColReaderFactory::makeReader()
auto scale = getScaleFromArrowTimeUnit(arrow_properties.coerce_int96_timestamp_unit());
read_type = std::make_shared<DataTypeDateTime64>(scale);
}
return std::make_unique<ParquetLeafColReader<ColumnDecimal<DateTime64>>>(
return std::make_unique<ParquetLeafColReader<ColumnDecimal<DateTime64>, ParquetInt96TypeStub>>(
col_descriptor, read_type, std::move(meta), std::move(page_reader));
}
case parquet::Type::FLOAT:
return makeLeafReader<DataTypeFloat32>();
return makeLeafReader<DataTypeFloat32, float>();
case parquet::Type::DOUBLE:
return makeLeafReader<DataTypeFloat64>();
return makeLeafReader<DataTypeFloat64, double>();
case parquet::Type::BYTE_ARRAY:
return fromByteArray();
case parquet::Type::FIXED_LEN_BYTE_ARRAY:

View File

@ -0,0 +1,40 @@
-94 53304 17815465730223871
57 15888 33652524900575246
-4 14877 53832092832965652
33 3387 86326601511136103
104 3383 115438187156564782
-11 37403 145056169255259589
-72 46473 159324626361233509
103 35510 173644182696185097
-26 60902 185175917734318892
70 48767 193167023342307884
2 21648 247953090704786001
20 2986 268127160817221407
76 20277 290178827409195337
61 28692 305149163504092270
-74 65427 326871531363668398
-15 20256 351812901947846888
-39 65472 357371822264135234
79 38671 371605113770958364
-29 41706 394460710549666968
92 25026 412913269933311543
-94 53304 17815465730223871
57 15888 33652524900575246
-4 14877 53832092832965652
33 3387 86326601511136103
104 3383 115438187156564782
-11 37403 145056169255259589
-72 46473 159324626361233509
103 35510 173644182696185097
-26 60902 185175917734318892
70 48767 193167023342307884
2 21648 247953090704786001
20 2986 268127160817221407
76 20277 290178827409195337
61 28692 305149163504092270
-74 65427 326871531363668398
-15 20256 351812901947846888
-39 65472 357371822264135234
79 38671 371605113770958364
-29 41706 394460710549666968
92 25026 412913269933311543

View File

@ -0,0 +1,22 @@
#!/usr/bin/env bash
# Tags: no-ubsan, no-fasttest
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
USER_FILES_PATH=$($CLICKHOUSE_CLIENT_BINARY --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')
WORKING_DIR="${USER_FILES_PATH}/${CLICKHOUSE_TEST_UNIQUE_NAME}"
mkdir -p "${WORKING_DIR}"
DATA_FILE="${CUR_DIR}/data_parquet/multi_column_bf.gz.parquet"
DATA_FILE_USER_PATH="${WORKING_DIR}/multi_column_bf.gz.parquet"
cp ${DATA_FILE} ${DATA_FILE_USER_PATH}
${CLICKHOUSE_CLIENT} --query="select int8_logical, uint16_logical, uint64_logical from file('${DATA_FILE_USER_PATH}', Parquet) order by uint64_logical limit 20 SETTINGS input_format_parquet_use_native_reader=false;";
${CLICKHOUSE_CLIENT} --query="select int8_logical, uint16_logical, uint64_logical from file('${DATA_FILE_USER_PATH}', Parquet) order by uint64_logical limit 20 SETTINGS input_format_parquet_use_native_reader=true;";