original parquet reader

Change-Id: I83a8ec8271edefcd96cb5b3bcd12f6b545d9dec0
This commit is contained in:
copperybean 2024-01-14 11:25:12 +08:00
parent 82e6a36bd4
commit 9d0ad7ba67
8 changed files with 1866 additions and 0 deletions

View File

@ -0,0 +1,29 @@
#pragma once
#include <Columns/IColumn.h>
namespace parquet
{
class PageReader;
class ColumnChunkMetaData;
class DataPageV1;
class DataPageV2;
}
namespace DB
{
class ParquetColumnReader
{
public:
virtual ColumnWithTypeAndName readBatch(UInt32 rows_num, const String & name) = 0;
virtual ~ParquetColumnReader() = default;
};
using ParquetColReaderPtr = std::unique_ptr<ParquetColumnReader>;
using ParquetColReaders = std::vector<ParquetColReaderPtr>;
}

View File

@ -0,0 +1,179 @@
#pragma once
#include <Core/Types.h>
#include <arrow/util/bit_stream_utils.h>
#include <arrow/util/decimal.h>
#include <parquet/types.h>
namespace DB
{
template <typename T> struct ToArrowDecimal;
template <> struct ToArrowDecimal<Decimal<wide::integer<128, signed>>>
{
using ArrowDecimal = arrow::Decimal128;
};
template <> struct ToArrowDecimal<Decimal<wide::integer<256, signed>>>
{
using ArrowDecimal = arrow::Decimal256;
};
class ParquetDataBuffer
{
private:
public:
ParquetDataBuffer(const uint8_t * data_, UInt64 avaible_, UInt8 datetime64_scale_ = DataTypeDateTime64::default_scale)
: data(reinterpret_cast<const Int8 *>(data_)), avaible(avaible_), datetime64_scale(datetime64_scale_) {}
template <typename TValue>
void ALWAYS_INLINE readValue(TValue & dst)
{
checkAvaible(sizeof(TValue));
dst = *reinterpret_cast<const TValue *>(data);
consume(sizeof(TValue));
}
void ALWAYS_INLINE readBytes(void * dst, size_t bytes)
{
checkAvaible(bytes);
memcpy(dst, data, bytes);
consume(bytes);
}
void ALWAYS_INLINE readDateTime64(DateTime64 & dst)
{
static const int max_scale_num = 9;
static const UInt64 pow10[max_scale_num + 1]
= {1000000000, 100000000, 10000000, 1000000, 100000, 10000, 1000, 100, 10, 1};
static const UInt64 spd = 60 * 60 * 24;
static const UInt64 scaled_day[max_scale_num + 1]
= {spd,
10 * spd,
100 * spd,
1000 * spd,
10000 * spd,
100000 * spd,
1000000 * spd,
10000000 * spd,
100000000 * spd,
1000000000 * spd};
checkAvaible(sizeof(parquet::Int96));
auto decoded = parquet::DecodeInt96Timestamp(*reinterpret_cast<const parquet::Int96 *>(data));
uint64_t scaled_nano = decoded.nanoseconds / pow10[datetime64_scale];
dst = static_cast<Int64>(decoded.days_since_epoch * scaled_day[datetime64_scale] + scaled_nano);
consume(sizeof(parquet::Int96));
}
/**
* This method should only be used to read string whose elements size is small.
* Because memcpySmallAllowReadWriteOverflow15 instead of memcpy is used according to ColumnString::indexImpl
*/
void ALWAYS_INLINE readString(ColumnString & column, size_t cursor)
{
// refer to: PlainByteArrayDecoder::DecodeArrowDense in encoding.cc
// deserializeBinarySSE2 in SerializationString.cpp
checkAvaible(4);
auto value_len = ::arrow::util::SafeLoadAs<Int32>(getArrowData());
if (unlikely(value_len < 0 || value_len > INT32_MAX - 4))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid or corrupted value_len '{}'", value_len);
}
consume(4);
checkAvaible(value_len);
auto chars_cursor = column.getChars().size();
column.getChars().resize(chars_cursor + value_len + 1);
memcpySmallAllowReadWriteOverflow15(&column.getChars()[chars_cursor], data, value_len);
column.getChars().back() = 0;
column.getOffsets().data()[cursor] = column.getChars().size();
consume(value_len);
}
template <is_over_big_decimal TDecimal>
void ALWAYS_INLINE readOverBigDecimal(TDecimal * out, Int32 elem_bytes_num)
{
using TArrowDecimal = typename ToArrowDecimal<TDecimal>::ArrowDecimal;
checkAvaible(elem_bytes_num);
// refer to: RawBytesToDecimalBytes in reader_internal.cc, Decimal128::FromBigEndian in decimal.cc
auto status = TArrowDecimal::FromBigEndian(getArrowData(), elem_bytes_num);
if (unlikely(!status.ok()))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Read parquet decimal failed: {}", status.status().ToString());
}
status.ValueUnsafe().ToBytes(reinterpret_cast<uint8_t *>(out));
consume(elem_bytes_num);
}
private:
const Int8 * data;
UInt64 avaible;
const UInt8 datetime64_scale;
void ALWAYS_INLINE checkAvaible(UInt64 num)
{
if (unlikely(avaible < num))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Consuming {} bytes while {} avaible", num, avaible);
}
}
const uint8_t * ALWAYS_INLINE getArrowData() { return reinterpret_cast<const uint8_t *>(data); }
void ALWAYS_INLINE consume(UInt64 num)
{
data += num;
avaible -= num;
}
};
class LazyNullMap
{
public:
LazyNullMap(UInt32 size_) : size(size_), col_nullable(nullptr) {}
void setNull(UInt32 cursor)
{
initialize();
null_map[cursor] = 1;
}
void setNull(UInt32 cursor, UInt32 count)
{
initialize();
memset(null_map + cursor, 1, count);
}
ColumnPtr getNullableCol() { return col_nullable; }
private:
UInt32 size;
UInt8 * null_map;
ColumnPtr col_nullable;
void initialize()
{
if (likely(col_nullable))
{
return;
}
auto col = ColumnVector<UInt8>::create(size);
null_map = col->getData().data();
col_nullable = std::move(col);
memset(null_map, 0, size);
}
};
}

View File

@ -0,0 +1,553 @@
#include "ParquetDataValuesReader.h"
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnsNumber.h>
#include <arrow/util/decimal.h>
namespace DB
{
void RleValuesReader::nextGroup()
{
// refer to:
// RleDecoder::NextCounts in rle_encoding.h and VectorizedRleValuesReader::readNextGroup in Spark
UInt32 indicator_value = 0;
[[maybe_unused]] auto read_res = bit_reader->GetVlqInt(&indicator_value);
assert(read_res);
cur_group_is_packed = indicator_value & 1;
cur_group_size = indicator_value >> 1;
if (cur_group_is_packed)
{
cur_group_size *= 8;
cur_packed_bit_values.resize(cur_group_size);
bit_reader->GetBatch(bit_width, cur_packed_bit_values.data(), cur_group_size);
}
else
{
cur_value = 0;
read_res = bit_reader->GetAligned((bit_width + 7) / 8, &cur_value);
assert(read_res);
}
cur_group_cursor = 0;
}
template <typename IndividualVisitor, typename RepeatedVisitor>
void RleValuesReader::visitValues(
UInt32 num_values, IndividualVisitor && individual_visitor, RepeatedVisitor && repeated_visitor)
{
// refer to: VisitNullBitmapInline in visitor_inline.h
while (num_values)
{
nextGroupIfNecessary();
auto cur_count = std::min(num_values, curGroupLeft());
if (cur_group_is_packed)
{
for (auto i = cur_group_cursor; i < cur_group_cursor + cur_count; i++)
{
individual_visitor(cur_packed_bit_values[i]);
}
}
else
{
repeated_visitor(cur_count, cur_value);
}
cur_group_cursor += cur_count;
num_values -= cur_count;
}
}
template <typename IndividualVisitor, typename RepeatedVisitor>
void RleValuesReader::visitNullableValues(
size_t cursor,
UInt32 num_values,
Int32 max_def_level,
LazyNullMap & null_map,
IndividualVisitor && individual_visitor,
RepeatedVisitor && repeated_visitor)
{
while (num_values)
{
nextGroupIfNecessary();
auto cur_count = std::min(num_values, curGroupLeft());
if (cur_group_is_packed)
{
for (auto i = cur_group_cursor; i < cur_group_cursor + cur_count; i++)
{
if (cur_packed_bit_values[i] == max_def_level)
{
individual_visitor(cursor);
}
else
{
null_map.setNull(cursor);
}
cursor++;
}
}
else
{
if (cur_value == max_def_level)
{
repeated_visitor(cursor, cur_count);
}
else
{
null_map.setNull(cursor, cur_count);
}
cursor += cur_count;
}
cur_group_cursor += cur_count;
num_values -= cur_count;
}
}
template <typename IndividualNullVisitor, typename SteppedValidVisitor, typename RepeatedVisitor>
void RleValuesReader::visitNullableBySteps(
size_t cursor,
UInt32 num_values,
Int32 max_def_level,
IndividualNullVisitor && individual_null_visitor,
SteppedValidVisitor && stepped_valid_visitor,
RepeatedVisitor && repeated_visitor)
{
// refer to:
// RleDecoder::GetBatch in rle_encoding.h and TypedColumnReaderImpl::ReadBatchSpaced in column_reader.cc
// VectorizedRleValuesReader::readBatchInternal in Spark
while (num_values > 0)
{
nextGroupIfNecessary();
auto cur_count = std::min(num_values, curGroupLeft());
if (cur_group_is_packed)
{
valid_index_steps.resize(cur_count + 1);
valid_index_steps[0] = 0;
auto step_idx = 0;
auto null_map_cursor = cursor;
for (auto i = cur_group_cursor; i < cur_group_cursor + cur_count; i++)
{
if (cur_packed_bit_values[i] == max_def_level)
{
valid_index_steps[++step_idx] = 1;
}
else
{
individual_null_visitor(null_map_cursor);
if (unlikely(valid_index_steps[step_idx] == UINT8_MAX))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "unsupported packed values number");
}
valid_index_steps[step_idx]++;
}
null_map_cursor++;
}
valid_index_steps.resize(step_idx + 1);
stepped_valid_visitor(cursor, valid_index_steps);
}
else
{
repeated_visitor(cur_value == max_def_level, cursor, cur_count);
}
cursor += cur_count;
cur_group_cursor += cur_count;
num_values -= cur_count;
}
}
template <typename TValue, typename ValueGetter>
void RleValuesReader::setValues(TValue * res_values, UInt32 num_values, ValueGetter && val_getter)
{
visitValues(
num_values,
/* individual_visitor */ [&](Int32 val)
{
*(res_values++) = val_getter(val);
},
/* repeated_visitor */ [&](UInt32 count, Int32 val)
{
std::fill(res_values, res_values + count, val_getter(val));
res_values += count;
}
);
}
template <typename TValue, typename ValueGetter>
void RleValuesReader::setValueBySteps(
TValue * res_values,
const std::vector<UInt8> & col_data_steps,
ValueGetter && val_getter)
{
auto step_iterator = col_data_steps.begin();
res_values += *(step_iterator++);
visitValues(
col_data_steps.size() - 1,
/* individual_visitor */ [&](Int32 val)
{
*res_values = val_getter(val);
res_values += *(step_iterator++);
},
/* repeated_visitor */ [&](UInt32 count, Int32 val)
{
auto getted_val = val_getter(val);
for (UInt32 i = 0; i < count; i++)
{
*res_values = getted_val;
res_values += *(step_iterator++);
}
}
);
}
namespace
{
template <typename TColumn, typename TValue = typename TColumn::ValueType>
TValue * getResizedPrimitiveData(TColumn & column, size_t size)
{
auto old_size = column.size();
column.getData().resize(size);
memset(column.getData().data() + old_size, 0, sizeof(TValue) * (size - old_size));
return column.getData().data();
}
} // anoynomous namespace
template <>
void ParquetPlainValuesReader<ColumnString>::readBatch(
MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values)
{
auto & column = *assert_cast<ColumnString *>(col_ptr.get());
auto cursor = column.size();
column.getOffsets().resize(cursor + num_values);
auto * offset_data = column.getOffsets().data();
auto & chars = column.getChars();
def_level_reader->visitValues(
num_values,
/* individual_visitor */ [&](Int32 val)
{
if (val == max_def_level)
{
plain_data_buffer.readString(column, cursor);
}
else
{
chars.push_back(0);
offset_data[cursor] = chars.size();
null_map.setNull(cursor);
}
cursor++;
},
/* repeated_visitor */ [&](UInt32 count, Int32 val)
{
if (val == max_def_level)
{
for (UInt32 i = 0; i < count; i++)
{
plain_data_buffer.readString(column, cursor);
cursor++;
}
}
else
{
null_map.setNull(cursor, count);
auto chars_size_bak = chars.size();
chars.resize(chars_size_bak + count);
memset(&chars[chars_size_bak], 0, count);
auto idx = cursor;
cursor += count;
// the type of offset_data is PaddedPODArray, which makes sure that the -1 index is avaible
for (auto val_offset = offset_data[idx - 1]; idx < cursor; idx++)
{
offset_data[idx] = ++val_offset;
}
}
}
);
}
template <>
void ParquetPlainValuesReader<ColumnDecimal<DateTime64>>::readBatch(
MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values)
{
auto cursor = col_ptr->size();
auto * column_data = getResizedPrimitiveData(
*assert_cast<ColumnDecimal<DateTime64> *>(col_ptr.get()), cursor + num_values);
def_level_reader->visitNullableValues(
cursor,
num_values,
max_def_level,
null_map,
/* individual_visitor */ [&](size_t nest_cursor)
{
plain_data_buffer.readDateTime64(column_data[nest_cursor]);
},
/* repeated_visitor */ [&](size_t nest_cursor, UInt32 count)
{
auto col_data_pos = column_data + nest_cursor;
for (UInt32 i = 0; i < count; i++)
{
plain_data_buffer.readDateTime64(col_data_pos[i]);
}
}
);
}
template <typename TColumn>
void ParquetPlainValuesReader<TColumn>::readBatch(
MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values)
{
auto cursor = col_ptr->size();
auto * column_data = getResizedPrimitiveData(*assert_cast<TColumn *>(col_ptr.get()), cursor + num_values);
using TValue = std::decay_t<decltype(*column_data)>;
def_level_reader->visitNullableValues(
cursor,
num_values,
max_def_level,
null_map,
/* individual_visitor */ [&](size_t nest_cursor)
{
plain_data_buffer.readValue(column_data[nest_cursor]);
},
/* repeated_visitor */ [&](size_t nest_cursor, UInt32 count)
{
plain_data_buffer.readBytes(column_data + nest_cursor, count * sizeof(TValue));
}
);
}
template <typename TColumn>
void ParquetFixedLenPlainReader<TColumn>::readBatch(
MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values)
{
if constexpr (std::same_as<TColumn, ColumnDecimal<Decimal128>> || std::same_as<TColumn, ColumnDecimal<Decimal256>>)
{
readOverBigDecimal(col_ptr, null_map, num_values);
}
else
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "unsupported type");
}
}
template <typename TColumnOverBigDecimal>
void ParquetFixedLenPlainReader<TColumnOverBigDecimal>::readOverBigDecimal(
MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values)
{
auto cursor = col_ptr->size();
auto * column_data = getResizedPrimitiveData(
*assert_cast<TColumnOverBigDecimal *>(col_ptr.get()), cursor + num_values);
def_level_reader->visitNullableValues(
cursor,
num_values,
max_def_level,
null_map,
/* individual_visitor */ [&](size_t nest_cursor)
{
plain_data_buffer.readOverBigDecimal(column_data + nest_cursor, elem_bytes_num);
},
/* repeated_visitor */ [&](size_t nest_cursor, UInt32 count)
{
auto col_data_pos = column_data + nest_cursor;
for (UInt32 i = 0; i < count; i++)
{
plain_data_buffer.readOverBigDecimal(col_data_pos + i, elem_bytes_num);
}
}
);
}
template <typename TColumnVector>
void ParquetRleLCReader<TColumnVector>::readBatch(
MutableColumnPtr & index_col, LazyNullMap & null_map, UInt32 num_values)
{
auto cursor = index_col->size();
auto * column_data = getResizedPrimitiveData(*assert_cast<TColumnVector *>(index_col.get()), cursor + num_values);
bool has_null = false;
// in ColumnLowCardinality, first element in dictionary is null
// so we should increase each value by 1 in parquet index
auto val_getter = [&](Int32 val) { return val + 1; };
def_level_reader->visitNullableBySteps(
cursor,
num_values,
max_def_level,
/* individual_null_visitor */ [&](UInt32 nest_cursor) {
column_data[nest_cursor] = 0;
has_null = true;
},
/* stepped_valid_visitor */ [&](UInt32 nest_cursor, const std::vector<UInt8> & valid_index_steps) {
rle_data_reader->setValueBySteps(column_data + nest_cursor, valid_index_steps, val_getter);
},
/* repeated_visitor */ [&](bool is_valid, UInt32 nest_cursor, UInt32 count) {
if (is_valid)
{
rle_data_reader->setValues(column_data + nest_cursor, count, val_getter);
}
else
{
auto data_pos = column_data + nest_cursor;
std::fill(data_pos, data_pos + count, 0);
has_null = true;
}
}
);
if (has_null)
{
null_map.setNull(0);
}
}
template <>
void ParquetRleDictReader<ColumnString>::readBatch(
MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values)
{
auto & column = *assert_cast<ColumnString *>(col_ptr.get());
auto cursor = column.size();
std::vector<Int32> value_cache;
const auto & dict_chars = static_cast<const ColumnString &>(page_dictionary).getChars();
const auto & dict_offsets = static_cast<const ColumnString &>(page_dictionary).getOffsets();
column.getOffsets().resize(cursor + num_values);
auto * offset_data = column.getOffsets().data();
auto & chars = column.getChars();
auto append_nulls = [&](UInt8 num) {
for (auto limit = cursor + num; cursor < limit; cursor++)
{
chars.push_back(0);
offset_data[cursor] = chars.size();
null_map.setNull(cursor);
}
};
auto append_string = [&](Int32 dict_idx) {
auto dict_chars_cursor = dict_offsets[dict_idx - 1];
auto value_len = dict_offsets[dict_idx] - dict_chars_cursor;
auto chars_cursor = chars.size();
chars.resize(chars_cursor + value_len);
memcpySmallAllowReadWriteOverflow15(&chars[chars_cursor], &dict_chars[dict_chars_cursor], value_len);
offset_data[cursor] = chars.size();
cursor++;
};
auto val_getter = [&](Int32 val) { return val + 1; };
def_level_reader->visitNullableBySteps(
cursor,
num_values,
max_def_level,
/* individual_null_visitor */ [&](UInt32) {},
/* stepped_valid_visitor */ [&](UInt32, const std::vector<UInt8> & valid_index_steps) {
value_cache.resize(valid_index_steps.size());
rle_data_reader->setValues(value_cache.data() + 1, valid_index_steps.size() - 1, val_getter);
append_nulls(valid_index_steps[0]);
for (size_t i = 1; i < valid_index_steps.size(); i++)
{
append_string(value_cache[i]);
append_nulls(valid_index_steps[i] - 1);
}
},
/* repeated_visitor */ [&](bool is_valid, UInt32, UInt32 count) {
if (is_valid)
{
value_cache.resize(count);
rle_data_reader->setValues(value_cache.data(), count, val_getter);
for (UInt32 i = 0; i < count; i++)
{
append_string(value_cache[i]);
}
}
else
{
append_nulls(count);
}
}
);
}
template <typename TColumnVector>
void ParquetRleDictReader<TColumnVector>::readBatch(
MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values)
{
auto cursor = col_ptr->size();
auto * column_data = getResizedPrimitiveData(*assert_cast<TColumnVector *>(col_ptr.get()), cursor + num_values);
const auto & dictionary_array = static_cast<const TColumnVector &>(page_dictionary).getData();
auto val_getter = [&](Int32 val) { return dictionary_array[val]; };
def_level_reader->visitNullableBySteps(
cursor,
num_values,
max_def_level,
/* individual_null_visitor */ [&](UInt32 nest_cursor) {
null_map.setNull(nest_cursor);
},
/* stepped_valid_visitor */ [&](UInt32 nest_cursor, const std::vector<UInt8> & valid_index_steps) {
rle_data_reader->setValueBySteps(column_data + nest_cursor, valid_index_steps, val_getter);
},
/* repeated_visitor */ [&](bool is_valid, UInt32 nest_cursor, UInt32 count) {
if (is_valid)
{
rle_data_reader->setValues(column_data + nest_cursor, count, val_getter);
}
else
{
null_map.setNull(nest_cursor, count);
}
}
);
}
template class ParquetPlainValuesReader<ColumnInt32>;
template class ParquetPlainValuesReader<ColumnInt64>;
template class ParquetPlainValuesReader<ColumnFloat32>;
template class ParquetPlainValuesReader<ColumnFloat64>;
template class ParquetPlainValuesReader<ColumnDecimal<Decimal32>>;
template class ParquetPlainValuesReader<ColumnDecimal<Decimal64>>;
template class ParquetPlainValuesReader<ColumnString>;
template class ParquetFixedLenPlainReader<ColumnDecimal<Decimal128>>;
template class ParquetFixedLenPlainReader<ColumnDecimal<Decimal256>>;
template class ParquetRleLCReader<ColumnUInt8>;
template class ParquetRleLCReader<ColumnUInt16>;
template class ParquetRleLCReader<ColumnUInt32>;
template class ParquetRleDictReader<ColumnInt32>;
template class ParquetRleDictReader<ColumnInt64>;
template class ParquetRleDictReader<ColumnFloat32>;
template class ParquetRleDictReader<ColumnFloat64>;
template class ParquetRleDictReader<ColumnDecimal<Decimal32>>;
template class ParquetRleDictReader<ColumnDecimal<Decimal64>>;
template class ParquetRleDictReader<ColumnDecimal<Decimal128>>;
template class ParquetRleDictReader<ColumnDecimal<Decimal256>>;
template class ParquetRleDictReader<ColumnDecimal<DateTime64>>;
template class ParquetRleDictReader<ColumnString>;
}

View File

@ -0,0 +1,263 @@
#pragma once
#include <functional>
#include <concepts>
#include <base/logger_useful.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Columns/IColumn.h>
#include <Core/Types.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <IO/ReadBuffer.h>
#include "ParquetDataBuffer.h"
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int PARQUET_EXCEPTION;
}
class RleValuesReader
{
public:
RleValuesReader(std::unique_ptr<arrow::BitUtil::BitReader> bit_reader_, Int32 bit_width_)
: bit_reader(std::move(bit_reader_)), bit_width(bit_width_) {}
/**
* @brief Used when the bit_width is 0, so all elements have same value.
*/
RleValuesReader(UInt32 total_size, Int32 val = 0)
: bit_reader(nullptr), bit_width(0), cur_group_size(total_size), cur_value(val), cur_group_is_packed(false)
{}
void nextGroup();
void nextGroupIfNecessary() { if (cur_group_cursor >= cur_group_size) nextGroup(); }
UInt32 curGroupLeft() const { return cur_group_size - cur_group_cursor; }
/**
* @brief Visit num_values elements.
* For RLE encoding, for same group, the value is same, so they can be visited repeatedly.
* For BitPacked encoding, the values may be different with each other, so they must be visited individual.
*
* @tparam IndividualVisitor A callback with signature: void(Int32 val)
* @tparam RepeatedVisitor A callback with signature: void(UInt32 count, Int32 val)
*/
template <typename IndividualVisitor, typename RepeatedVisitor>
void visitValues(UInt32 num_values, IndividualVisitor && individual_visitor, RepeatedVisitor && repeated_visitor);
/**
* @brief Visit num_values elements by parsed nullability.
* If the parsed value is same as max_def_level, then it is processed as null value.
*
* @tparam IndividualVisitor A callback with signature: void(size_t cursor)
* @tparam RepeatedVisitor A callback with signature: void(size_t cursor, UInt32 count)
*
* Because the null map is processed, so only the callbacks only need to process the valid data.
*/
template <typename IndividualVisitor, typename RepeatedVisitor>
void visitNullableValues(
size_t cursor,
UInt32 num_values,
Int32 max_def_level,
LazyNullMap & null_map,
IndividualVisitor && individual_visitor,
RepeatedVisitor && repeated_visitor);
/**
* @brief Visit num_values elements by parsed nullability.
* It may be inefficient to process the valid data individually like in visitNullableValues,
* so a valid_index_steps index array is generated first, in order to process valid data continuously.
*
* @tparam IndividualNullVisitor A callback with signature: void(size_t cursor), used to process null value
* @tparam SteppedValidVisitor A callback with signature:
* void(size_t cursor, const std::vector<UInt8> & valid_index_steps)
* for n valid elements with null value interleaved in a BitPacked group,
* i-th item in valid_index_steps describes how many elements in column there are after (i-1)-th valid element.
*
* take following BitPacked group with 2 valid elements for example:
* null valid null null valid null
* then the valid_index_steps has values [1, 3, 2].
* Please note that the the sum of valid_index_steps is same as elements number in this group.
*
* @tparam RepeatedVisitor A callback with signature: void(bool is_valid, UInt32 cursor, UInt32 count)
*/
template <typename IndividualNullVisitor, typename SteppedValidVisitor, typename RepeatedVisitor>
void visitNullableBySteps(
size_t cursor,
UInt32 num_values,
Int32 max_def_level,
IndividualNullVisitor && null_visitor,
SteppedValidVisitor && stepped_valid_visitor,
RepeatedVisitor && repeated_visitor);
/**
* @brief Set the Values to column_data directly
*
* @tparam TValue The type of column data.
* @tparam ValueGetter A callback with signature: TValue(Int32 val)
*/
template <typename TValue, typename ValueGetter>
void setValues(TValue * column_data, UInt32 num_values, ValueGetter && val_getter);
/**
* @brief Set the value by valid_index_steps generated in visitNullableBySteps.
* According to visitNullableBySteps, the elements number is valid_index_steps.size()-1,
* so valid_index_steps.size()-1 elements are read, and set to column_data with steps in valid_index_steps
*/
template <typename TValue, typename ValueGetter>
void setValueBySteps(
TValue * column_data,
const std::vector<UInt8> & col_data_steps,
ValueGetter && val_getter);
private:
std::unique_ptr<arrow::BitUtil::BitReader> bit_reader;
std::vector<Int32> cur_packed_bit_values;
std::vector<UInt8> valid_index_steps;
Int32 bit_width;
UInt32 cur_group_size = 0;
UInt32 cur_group_cursor = 0;
Int32 cur_value;
bool cur_group_is_packed;
};
using RleValuesReaderPtr = std::unique_ptr<RleValuesReader>;
class ParquetDataValuesReader
{
public:
virtual void readBatch(MutableColumnPtr & column, LazyNullMap & null_map, UInt32 num_values) = 0;
virtual ~ParquetDataValuesReader() = default;
};
using ParquetDataValuesReaderPtr = std::unique_ptr<ParquetDataValuesReader>;
/**
* The definition level is RLE or BitPacked encoding, while data is read directly
*/
template <typename TColumn>
class ParquetPlainValuesReader : public ParquetDataValuesReader
{
public:
ParquetPlainValuesReader(
Int32 max_def_level_,
std::unique_ptr<RleValuesReader> def_level_reader_,
ParquetDataBuffer data_buffer_)
: max_def_level(max_def_level_)
, def_level_reader(std::move(def_level_reader_))
, plain_data_buffer(std::move(data_buffer_))
{}
void readBatch(MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values) override;
private:
Int32 max_def_level;
std::unique_ptr<RleValuesReader> def_level_reader;
ParquetDataBuffer plain_data_buffer;
};
/**
* The data and definition level encoding are same as ParquetPlainValuesReader.
* But the element size is const and bigger than primitive data type.
*/
template <typename TColumn>
class ParquetFixedLenPlainReader : public ParquetDataValuesReader
{
public:
ParquetFixedLenPlainReader(
Int32 max_def_level_,
Int32 elem_bytes_num_,
std::unique_ptr<RleValuesReader> def_level_reader_,
ParquetDataBuffer data_buffer_)
: max_def_level(max_def_level_)
, elem_bytes_num(elem_bytes_num_)
, def_level_reader(std::move(def_level_reader_))
, plain_data_buffer(std::move(data_buffer_))
{}
void readOverBigDecimal(MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values);
void readBatch(MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values) override;
private:
Int32 max_def_level;
Int32 elem_bytes_num;
std::unique_ptr<RleValuesReader> def_level_reader;
ParquetDataBuffer plain_data_buffer;
};
/**
* Read data according to the format of ColumnLowCardinality format.
*
* Only index and null column are processed in this class.
* And all null value is mapped to first index in dictionary,
* so the result index valued is added by one.
*/
template <typename TColumnVector>
class ParquetRleLCReader : public ParquetDataValuesReader
{
public:
ParquetRleLCReader(
Int32 max_def_level_,
std::unique_ptr<RleValuesReader> def_level_reader_,
std::unique_ptr<RleValuesReader> rle_data_reader_)
: max_def_level(max_def_level_)
, def_level_reader(std::move(def_level_reader_))
, rle_data_reader(std::move(rle_data_reader_))
{}
void readBatch(MutableColumnPtr & index_col, LazyNullMap & null_map, UInt32 num_values) override;
private:
Int32 max_def_level;
std::unique_ptr<RleValuesReader> def_level_reader;
std::unique_ptr<RleValuesReader> rle_data_reader;
};
/**
* The definition level is RLE or BitPacked encoded,
* and the index of dictionary is also RLE or BitPacked encoded.
*
* while the result is not parsed as a low cardinality column,
* instead, a normal column is generated.
*/
template <typename TColumn>
class ParquetRleDictReader : public ParquetDataValuesReader
{
public:
ParquetRleDictReader(
Int32 max_def_level_,
std::unique_ptr<RleValuesReader> def_level_reader_,
std::unique_ptr<RleValuesReader> rle_data_reader_,
const IColumn & page_dictionary_)
: max_def_level(max_def_level_)
, def_level_reader(std::move(def_level_reader_))
, rle_data_reader(std::move(rle_data_reader_))
, page_dictionary(page_dictionary_)
{}
void readBatch(MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values) override;
private:
Int32 max_def_level;
std::unique_ptr<RleValuesReader> def_level_reader;
std::unique_ptr<RleValuesReader> rle_data_reader;
const IColumn & page_dictionary;
};
}

View File

@ -0,0 +1,506 @@
#include "ParquetLeafColReader.h"
#include <utility>
#include <Columns/ColumnLowCardinality.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnUnique.h>
#include <Columns/ColumnsNumber.h>
#include <Core/ColumnWithTypeAndName.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/NumberTraits.h>
#include <IO/ReadBufferFromMemory.h>
#include <arrow/util/bit_util.h>
#include <parquet/column_page.h>
#include <parquet/column_reader.h>
#include <parquet/metadata.h>
#include <parquet/schema.h>
#include <parquet/types.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int BAD_ARGUMENTS;
extern const int PARQUET_EXCEPTION;
}
namespace
{
template <typename TypeVisitor>
void visitColStrIndexType(size_t data_size, TypeVisitor && visitor)
{
// refer to: DataTypeLowCardinality::createColumnUniqueImpl
if (data_size < (1ull << 8))
{
visitor(static_cast<ColumnUInt8 *>(nullptr));
}
else if (data_size < (1ull << 16))
{
visitor(static_cast<ColumnUInt16 *>(nullptr));
}
else if (data_size < (1ull << 32))
{
visitor(static_cast<ColumnUInt32 *>(nullptr));
}
else
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "unsupported data size {}", data_size);
}
}
void reserveColumnStrRows(MutableColumnPtr & col, UInt32 rows_num)
{
col->reserve(rows_num);
/// Never reserve for too big size according to SerializationString::deserializeBinaryBulk
if (rows_num < 256 * 1024 * 1024)
{
try
{
static_cast<ColumnString *>(col.get())->getChars().reserve(rows_num);
}
catch (Exception & e)
{
e.addMessage("(limit = " + toString(rows_num) + ")");
throw;
}
}
};
template <typename TColumn>
ColumnPtr readDictPage(
const parquet::DictionaryPage & page,
const parquet::ColumnDescriptor & col_des,
const DataTypePtr & /* data_type */);
template <>
ColumnPtr readDictPage<ColumnString>(
const parquet::DictionaryPage & page,
const parquet::ColumnDescriptor & /* col_des */,
const DataTypePtr & /* data_type */)
{
auto col = ColumnString::create();
col->getOffsets().resize(page.num_values() + 1);
col->getChars().reserve(page.num_values());
ParquetDataBuffer buffer(page.data(), page.size());
// will be read as low cardinality column
// in which case, the null key is set to first position, so the first string should be empty
col->getChars().push_back(0);
col->getOffsets()[0] = 1;
for (auto i = 1; i <= page.num_values(); i++)
{
buffer.readString(*col, i);
}
return col;
}
template <>
ColumnPtr readDictPage<ColumnDecimal<DateTime64>>(
const parquet::DictionaryPage & page,
const parquet::ColumnDescriptor & /* col_des */,
const DataTypePtr & data_type)
{
auto & datetime_type = assert_cast<const DataTypeDateTime64 &>(*data_type);
auto dict_col = ColumnDecimal<DateTime64>::create(page.num_values(), datetime_type.getScale());
auto * col_data = dict_col->getData().data();
ParquetDataBuffer buffer(page.data(), page.size(), datetime_type.getScale());
for (auto i = 0; i < page.num_values(); i++)
{
buffer.readDateTime64(col_data[i]);
}
return dict_col;
}
template <is_col_over_big_decimal TColumnDecimal>
ColumnPtr readDictPage(
const parquet::DictionaryPage & page,
const parquet::ColumnDescriptor & col_des,
const DataTypePtr & /* data_type */)
{
auto dict_col = TColumnDecimal::create(page.num_values(), col_des.type_scale());
auto * col_data = dict_col->getData().data();
ParquetDataBuffer buffer(page.data(), page.size());
for (auto i = 0; i < page.num_values(); i++)
{
buffer.readOverBigDecimal(col_data + i, col_des.type_length());
}
return dict_col;
}
template <is_col_int_decimal TColumnDecimal> requires (!std::is_same_v<typename TColumnDecimal::ValueType, DateTime64>)
ColumnPtr readDictPage(
const parquet::DictionaryPage & page,
const parquet::ColumnDescriptor & col_des,
const DataTypePtr & /* data_type */)
{
auto dict_col = TColumnDecimal::create(page.num_values(), col_des.type_scale());
ParquetDataBuffer buffer(page.data(), page.size());
buffer.readBytes(dict_col->getData().data(), page.num_values() * sizeof(typename TColumnDecimal::ValueType));
return dict_col;
}
template <is_col_vector TColumnVector>
ColumnPtr readDictPage(
const parquet::DictionaryPage & page,
const parquet::ColumnDescriptor & /* col_des */,
const DataTypePtr & /* data_type */)
{
auto dict_col = TColumnVector::create(page.num_values());
ParquetDataBuffer buffer(page.data(), page.size());
buffer.readBytes(dict_col->getData().data(), page.num_values() * sizeof(typename TColumnVector::ValueType));
return dict_col;
}
template <typename TColumn>
std::unique_ptr<ParquetDataValuesReader> createPlainReader(
const parquet::ColumnDescriptor & col_des,
RleValuesReaderPtr def_level_reader,
ParquetDataBuffer buffer);
template <is_col_over_big_decimal TColumnDecimal>
std::unique_ptr<ParquetDataValuesReader> createPlainReader(
const parquet::ColumnDescriptor & col_des,
RleValuesReaderPtr def_level_reader,
ParquetDataBuffer buffer)
{
return std::make_unique<ParquetFixedLenPlainReader<TColumnDecimal>>(
col_des.max_definition_level(),
col_des.type_length(),
std::move(def_level_reader),
std::move(buffer));
}
template <typename TColumn>
std::unique_ptr<ParquetDataValuesReader> createPlainReader(
const parquet::ColumnDescriptor & col_des,
RleValuesReaderPtr def_level_reader,
ParquetDataBuffer buffer)
{
return std::make_unique<ParquetPlainValuesReader<TColumn>>(
col_des.max_definition_level(), std::move(def_level_reader), std::move(buffer));
}
} // anonymous namespace
template <typename TColumn>
ParquetLeafColReader<TColumn>::ParquetLeafColReader(
const parquet::ColumnDescriptor & col_descriptor_,
DataTypePtr base_type_,
std::unique_ptr<parquet::ColumnChunkMetaData> meta_,
std::unique_ptr<parquet::PageReader> reader_)
: col_descriptor(col_descriptor_)
, base_data_type(base_type_)
, col_chunk_meta(std::move(meta_))
, parquet_page_reader(std::move(reader_))
, log(&Poco::Logger::get("ParquetLeafColReader"))
{
}
template <typename TColumn>
ColumnWithTypeAndName ParquetLeafColReader<TColumn>::readBatch(UInt32 rows_num, const String & name)
{
reading_rows_num = rows_num;
auto readPageIfEmpty = [&]() {
while (!cur_page_values) readPage();
};
// make sure the dict page has been read, and the status is updated
readPageIfEmpty();
resetColumn(rows_num);
while (rows_num)
{
// if dictionary page encountered, another page should be read
readPageIfEmpty();
auto read_values = std::min(rows_num, cur_page_values);
data_values_reader->readBatch(column, *null_map, read_values);
cur_page_values -= read_values;
rows_num -= read_values;
}
return releaseColumn(name);
}
template <>
void ParquetLeafColReader<ColumnString>::resetColumn(UInt32 rows_num)
{
if (reading_low_cardinality)
{
assert(dictionary);
visitColStrIndexType(dictionary->size(), [&]<typename TColVec>(TColVec *) {
column = TColVec::create();
});
// only first position is used
null_map = std::make_unique<LazyNullMap>(1);
column->reserve(rows_num);
}
else
{
null_map = std::make_unique<LazyNullMap>(rows_num);
column = ColumnString::create();
reserveColumnStrRows(column, rows_num);
}
}
template <typename TColumn>
void ParquetLeafColReader<TColumn>::resetColumn(UInt32 rows_num)
{
assert(!reading_low_cardinality);
column = base_data_type->createColumn();
column->reserve(rows_num);
null_map = std::make_unique<LazyNullMap>(rows_num);
}
template <typename TColumn>
void ParquetLeafColReader<TColumn>::degradeDictionary()
{
assert(dictionary && column->size());
null_map = std::make_unique<LazyNullMap>(reading_rows_num);
auto col_existing = std::move(column);
column = ColumnString::create();
ColumnString & col_dest = *static_cast<ColumnString *>(column.get());
const ColumnString & col_dict_str = *static_cast<const ColumnString *>(dictionary.get());
visitColStrIndexType(dictionary->size(), [&]<typename TColVec>(TColVec *) {
const TColVec & col_src = *static_cast<const TColVec *>(col_existing.get());
reserveColumnStrRows(column, reading_rows_num);
col_dest.getOffsets().resize(col_src.size());
for (size_t i = 0; i < col_src.size(); i++)
{
auto src_idx = col_src.getData()[i];
if (0 == src_idx)
{
null_map->setNull(i);
}
auto dict_chars_cursor = col_dict_str.getOffsets()[src_idx - 1];
auto str_len = col_dict_str.getOffsets()[src_idx] - dict_chars_cursor;
auto dst_chars_cursor = col_dest.getChars().size();
col_dest.getChars().resize(dst_chars_cursor + str_len);
memcpySmallAllowReadWriteOverflow15(
&col_dest.getChars()[dst_chars_cursor], &col_dict_str.getChars()[dict_chars_cursor], str_len);
col_dest.getOffsets()[i] = col_dest.getChars().size();
}
});
LOG_INFO(log, "degraded dictionary to normal column");
}
template <typename TColumn>
ColumnWithTypeAndName ParquetLeafColReader<TColumn>::releaseColumn(const String & name)
{
DataTypePtr data_type = base_data_type;
if (reading_low_cardinality)
{
MutableColumnPtr col_unique;
if (null_map->getNullableCol())
{
data_type = std::make_shared<DataTypeNullable>(data_type);
col_unique = ColumnUnique<TColumn>::create(dictionary->assumeMutable(), true);
}
else
{
col_unique = ColumnUnique<TColumn>::create(dictionary->assumeMutable(), false);
}
column = ColumnLowCardinality::create(std::move(col_unique), std::move(column), true);
data_type = std::make_shared<DataTypeLowCardinality>(data_type);
}
else
{
if (null_map->getNullableCol())
{
column = ColumnNullable::create(std::move(column), null_map->getNullableCol()->assumeMutable());
data_type = std::make_shared<DataTypeNullable>(data_type);
}
}
ColumnWithTypeAndName res = {std::move(column), data_type, name};
column = nullptr;
null_map = nullptr;
return res;
}
template <typename TColumn>
void ParquetLeafColReader<TColumn>::readPage()
{
// refer to: ColumnReaderImplBase::ReadNewPage in column_reader.cc
auto cur_page = parquet_page_reader->NextPage();
switch (cur_page->type())
{
case parquet::PageType::DATA_PAGE:
readPageV1(*std::static_pointer_cast<parquet::DataPageV1>(cur_page));
break;
case parquet::PageType::DATA_PAGE_V2:
readPageV2(*std::static_pointer_cast<parquet::DataPageV2>(cur_page));
break;
case parquet::PageType::DICTIONARY_PAGE:
{
const parquet::DictionaryPage & dict_page = *std::static_pointer_cast<parquet::DictionaryPage>(cur_page);
if (unlikely(
dict_page.encoding() != parquet::Encoding::PLAIN_DICTIONARY
&& dict_page.encoding() != parquet::Encoding::PLAIN))
{
throw new Exception(
ErrorCodes::NOT_IMPLEMENTED, "Unsupported dictionary page encoding {}", dict_page.encoding());
}
LOG_INFO(log, "{} values in dictionary page of column {}", dict_page.num_values(), col_descriptor.name());
dictionary = readDictPage<TColumn>(dict_page, col_descriptor, base_data_type);
if (std::is_same_v<TColumn, ColumnString>)
{
reading_low_cardinality = true;
}
break;
}
default:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported page type: {}", cur_page->type());
}
}
template <typename TColumn>
void ParquetLeafColReader<TColumn>::readPageV1(const parquet::DataPageV1 & page)
{
static parquet::LevelDecoder repetition_level_decoder;
cur_page_values = page.num_values();
// refer to: VectorizedColumnReader::readPageV1 in Spark and LevelDecoder::SetData in column_reader.cc
if (page.definition_level_encoding() != parquet::Encoding::RLE && col_descriptor.max_definition_level() != 0)
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", page.definition_level_encoding());
}
const auto * buffer = page.data();
auto max_size = page.size();
if (col_descriptor.max_repetition_level() > 0)
{
auto rep_levels_bytes = repetition_level_decoder.SetData(
page.repetition_level_encoding(), col_descriptor.max_repetition_level(), 0, buffer, max_size);
buffer += rep_levels_bytes;
max_size -= rep_levels_bytes;
}
assert(col_descriptor.max_definition_level() >= 0);
std::unique_ptr<RleValuesReader> def_level_reader;
if (col_descriptor.max_definition_level() > 0) {
auto bit_width = arrow::BitUtil::Log2(col_descriptor.max_definition_level() + 1);
auto num_bytes = ::arrow::util::SafeLoadAs<int32_t>(buffer);
auto bit_reader = std::make_unique<arrow::BitUtil::BitReader>(buffer + 4, num_bytes);
num_bytes += 4;
buffer += num_bytes;
max_size -= num_bytes;
def_level_reader = std::make_unique<RleValuesReader>(std::move(bit_reader), bit_width);
}
else
{
def_level_reader = std::make_unique<RleValuesReader>(page.num_values());
}
switch (page.encoding())
{
case parquet::Encoding::PLAIN:
{
if (reading_low_cardinality)
{
reading_low_cardinality = false;
degradeDictionary();
}
ParquetDataBuffer parquet_buffer = [&]() {
if constexpr (!std::is_same_v<ColumnDecimal<DateTime64>, TColumn>)
return ParquetDataBuffer(buffer, max_size);
auto scale = assert_cast<const DataTypeDateTime64 &>(*base_data_type).getScale();
return ParquetDataBuffer(buffer, max_size, scale);
}();
data_values_reader = createPlainReader<TColumn>(
col_descriptor, std::move(def_level_reader), std::move(parquet_buffer));
break;
}
case parquet::Encoding::RLE_DICTIONARY:
case parquet::Encoding::PLAIN_DICTIONARY:
{
if (unlikely(!dictionary))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "dictionary should be existed");
}
// refer to: DictDecoderImpl::SetData in encoding.cc
auto bit_width = *buffer;
auto bit_reader = std::make_unique<arrow::BitUtil::BitReader>(++buffer, --max_size);
data_values_reader = createDictReader(
std::move(def_level_reader), std::make_unique<RleValuesReader>(std::move(bit_reader), bit_width));
break;
}
case parquet::Encoding::BYTE_STREAM_SPLIT:
case parquet::Encoding::DELTA_BINARY_PACKED:
case parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY:
case parquet::Encoding::DELTA_BYTE_ARRAY:
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", page.encoding());
default:
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unknown encoding type: {}", page.encoding());
}
}
template <typename TColumn>
void ParquetLeafColReader<TColumn>::readPageV2(const parquet::DataPageV2 & /*page*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "read page V2 is not implemented yet");
}
template <typename TColumn>
std::unique_ptr<ParquetDataValuesReader> ParquetLeafColReader<TColumn>::createDictReader(
std::unique_ptr<RleValuesReader> def_level_reader, std::unique_ptr<RleValuesReader> rle_data_reader)
{
if (reading_low_cardinality && std::same_as<TColumn, ColumnString>)
{
std::unique_ptr<ParquetDataValuesReader> res;
visitColStrIndexType(dictionary->size(), [&]<typename TCol>(TCol *) {
res = std::make_unique<ParquetRleLCReader<TCol>>(
col_descriptor.max_definition_level(),
std::move(def_level_reader),
std::move(rle_data_reader));
});
return res;
}
return std::make_unique<ParquetRleDictReader<TColumn>>(
col_descriptor.max_definition_level(),
std::move(def_level_reader),
std::move(rle_data_reader),
*assert_cast<const TColumn *>(dictionary.get()));
}
template class ParquetLeafColReader<ColumnInt32>;
template class ParquetLeafColReader<ColumnInt64>;
template class ParquetLeafColReader<ColumnFloat32>;
template class ParquetLeafColReader<ColumnFloat64>;
template class ParquetLeafColReader<ColumnString>;
template class ParquetLeafColReader<ColumnDecimal<Decimal32>>;
template class ParquetLeafColReader<ColumnDecimal<Decimal64>>;
template class ParquetLeafColReader<ColumnDecimal<Decimal128>>;
template class ParquetLeafColReader<ColumnDecimal<Decimal256>>;
template class ParquetLeafColReader<ColumnDecimal<DateTime64>>;
}

View File

@ -0,0 +1,63 @@
#pragma once
#include <base/logger_useful.h>
#include <Columns/IColumn.h>
#include <DataTypes/Serializations/ISerialization.h>
#include "ParquetColumnReader.h"
#include "ParquetDataValuesReader.h"
namespace parquet
{
class ColumnDescriptor;
}
namespace DB
{
template <typename TColumn>
class ParquetLeafColReader : public ParquetColumnReader
{
public:
ParquetLeafColReader(
const parquet::ColumnDescriptor & col_descriptor_,
DataTypePtr base_type_,
std::unique_ptr<parquet::ColumnChunkMetaData> meta_,
std::unique_ptr<parquet::PageReader> reader_);
ColumnWithTypeAndName readBatch(UInt32 rows_num, const String & name) override;
private:
const parquet::ColumnDescriptor & col_descriptor;
DataTypePtr base_data_type;
std::unique_ptr<parquet::ColumnChunkMetaData> col_chunk_meta;
std::unique_ptr<parquet::PageReader> parquet_page_reader;
std::unique_ptr<ParquetDataValuesReader> data_values_reader;
MutableColumnPtr column;
std::unique_ptr<LazyNullMap> null_map;
ColumnPtr dictionary;
UInt32 cur_page_values = 0;
UInt32 reading_rows_num = 0;
bool reading_low_cardinality = false;
Poco::Logger * log;
void resetColumn(UInt32 rows_num);
void degradeDictionary();
ColumnWithTypeAndName releaseColumn(const String & name);
void readPage();
void readPageV1(const parquet::DataPageV1 & page);
void readPageV2(const parquet::DataPageV2 & page);
std::unique_ptr<ParquetDataValuesReader> createDictReader(
std::unique_ptr<RleValuesReader> def_level_reader, std::unique_ptr<RleValuesReader> rle_data_reader);
};
}

View File

@ -0,0 +1,225 @@
#include "ParquetRecordReader.h"
#include <bit>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Common/Stopwatch.h>
#include <Core/Types.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/castColumn.h>
#include <arrow/status.h>
#include <parquet/arrow/reader.h>
#include <parquet/column_reader.h>
#include <parquet/properties.h>
#include "ParquetLeafColReader.h"
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int PARQUET_EXCEPTION;
}
// #define THROW_ARROW_NOT_OK(status) \
// do \
// { \
// if (::arrow::Status _s = (status); !_s.ok()) \
// throw Exception(_s.ToString(), ErrorCodes::BAD_ARGUMENTS); \
// } while (false)
#define THROW_PARQUET_EXCEPTION(s) \
do \
{ \
try { (s); } \
catch (const ::parquet::ParquetException & e) \
{ \
throw Exception(e.what(), ErrorCodes::PARQUET_EXCEPTION); \
} \
} while (false)
namespace
{
Int64 getTotalRows(const parquet::FileMetaData & meta_data)
{
Int64 res = 0;
for (int i = 0; i < meta_data.num_row_groups(); i++)
{
res += meta_data.RowGroup(i)->num_rows();
}
return res;
}
std::unique_ptr<ParquetColumnReader> createReader(
const parquet::ColumnDescriptor & col_descriptor,
DataTypePtr ch_type,
std::unique_ptr<parquet::ColumnChunkMetaData> meta,
std::unique_ptr<parquet::PageReader> reader)
{
if (col_descriptor.logical_type()->is_date() && parquet::Type::INT32 == col_descriptor.physical_type())
{
return std::make_unique<ParquetLeafColReader<ColumnInt32>>(
col_descriptor, std::make_shared<DataTypeDate32>(), std::move(meta), std::move(reader));
}
else if (col_descriptor.logical_type()->is_decimal())
{
switch (col_descriptor.physical_type())
{
case parquet::Type::INT32:
{
auto data_type = std::make_shared<DataTypeDecimal32>(
col_descriptor.type_precision(), col_descriptor.type_scale());
return std::make_unique<ParquetLeafColReader<ColumnDecimal<Decimal32>>>(
col_descriptor, data_type, std::move(meta), std::move(reader));
}
case parquet::Type::INT64:
{
auto data_type = std::make_shared<DataTypeDecimal64>(
col_descriptor.type_precision(), col_descriptor.type_scale());
return std::make_unique<ParquetLeafColReader<ColumnDecimal<Decimal64>>>(
col_descriptor, data_type, std::move(meta), std::move(reader));
}
case parquet::Type::FIXED_LEN_BYTE_ARRAY:
{
if (col_descriptor.type_length() <= static_cast<int>(DecimalUtils::max_precision<Decimal128>))
{
auto data_type = std::make_shared<DataTypeDecimal128>(
col_descriptor.type_precision(), col_descriptor.type_scale());
return std::make_unique<ParquetLeafColReader<ColumnDecimal<Decimal128>>>(
col_descriptor, data_type, std::move(meta), std::move(reader));
}
else
{
auto data_type = std::make_shared<DataTypeDecimal256>(
col_descriptor.type_precision(), col_descriptor.type_scale());
return std::make_unique<ParquetLeafColReader<ColumnDecimal<Decimal256>>>(
col_descriptor, data_type, std::move(meta), std::move(reader));
}
}
default:
throw Exception(
ErrorCodes::PARQUET_EXCEPTION,
"Type not supported for decimal: {}",
col_descriptor.physical_type());
}
}
else
{
switch (col_descriptor.physical_type())
{
case parquet::Type::INT32:
return std::make_unique<ParquetLeafColReader<ColumnInt32>>(
col_descriptor, std::make_shared<DataTypeInt32>(), std::move(meta), std::move(reader));
case parquet::Type::INT64:
return std::make_unique<ParquetLeafColReader<ColumnInt64>>(
col_descriptor, std::make_shared<DataTypeInt64>(), std::move(meta), std::move(reader));
case parquet::Type::FLOAT:
return std::make_unique<ParquetLeafColReader<ColumnFloat32>>(
col_descriptor, std::make_shared<DataTypeFloat32>(), std::move(meta), std::move(reader));
case parquet::Type::INT96:
{
DataTypePtr read_type = ch_type;
if (!isDateTime64(ch_type))
{
read_type = std::make_shared<DataTypeDateTime64>(ParquetRecordReader::default_datetime64_scale);
}
return std::make_unique<ParquetLeafColReader<ColumnDecimal<DateTime64>>>(
col_descriptor, read_type, std::move(meta), std::move(reader));
}
case parquet::Type::DOUBLE:
return std::make_unique<ParquetLeafColReader<ColumnFloat64>>(
col_descriptor, std::make_shared<DataTypeFloat64>(), std::move(meta), std::move(reader));
case parquet::Type::BYTE_ARRAY:
return std::make_unique<ParquetLeafColReader<ColumnString>>(
col_descriptor, std::make_shared<DataTypeString>(), std::move(meta), std::move(reader));
default:
throw Exception(
ErrorCodes::PARQUET_EXCEPTION, "Type not supported: {}", col_descriptor.physical_type());
}
}
}
} // anonymouse namespace
ParquetRecordReader::ParquetRecordReader(
Block header_,
std::shared_ptr<::arrow::io::RandomAccessFile> file,
const parquet::ReaderProperties& properties)
: header(std::move(header_))
{
// Only little endian system is supported currently
static_assert(std::endian::native == std::endian::little);
log = &Poco::Logger::get("ParquetRecordReader");
THROW_PARQUET_EXCEPTION(file_reader = parquet::ParquetFileReader::Open(std::move(file), properties));
left_rows = getTotalRows(*file_reader->metadata());
parquet_col_indice.reserve(header.columns());
column_readers.reserve(header.columns());
for (const auto & col_with_name : header)
{
auto idx = file_reader->metadata()->schema()->ColumnIndex(col_with_name.name);
if (idx < 0)
{
throw Exception("can not find column with name: " + col_with_name.name, ErrorCodes::BAD_ARGUMENTS);
}
parquet_col_indice.push_back(idx);
}
}
Chunk ParquetRecordReader::readChunk(UInt32 num_rows)
{
if (!left_rows)
{
return Chunk{};
}
if (!cur_row_group_left_rows)
{
loadNextRowGroup();
}
Columns columns(header.columns());
auto num_rows_read = std::min(static_cast<UInt64>(num_rows), cur_row_group_left_rows);
for (size_t i = 0; i < header.columns(); i++)
{
columns[i] = castColumn(
column_readers[i]->readBatch(num_rows_read, header.getByPosition(i).name),
header.getByPosition(i).type);
}
left_rows -= num_rows_read;
cur_row_group_left_rows -= num_rows_read;
return Chunk{std::move(columns), num_rows_read};
}
void ParquetRecordReader::loadNextRowGroup()
{
Stopwatch watch(CLOCK_MONOTONIC);
cur_row_group_reader = file_reader->RowGroup(next_row_group_idx);
column_readers.clear();
for (size_t i = 0; i < parquet_col_indice.size(); i++)
{
column_readers.emplace_back(createReader(
*file_reader->metadata()->schema()->Column(parquet_col_indice[i]),
header.getByPosition(i).type,
cur_row_group_reader->metadata()->ColumnChunk(parquet_col_indice[i]),
cur_row_group_reader->GetColumnPageReader(parquet_col_indice[i])));
}
LOG_DEBUG(log, "reading row group {} consumed {} ms", next_row_group_idx, watch.elapsedNanoseconds() / 1e6);
++next_row_group_idx;
cur_row_group_left_rows = cur_row_group_reader->metadata()->num_rows();
}
}

View File

@ -0,0 +1,48 @@
#pragma once
#include <base/logger_useful.h>
#include <Core/Block.h>
#include <Processors/Chunk.h>
#include <Storages/Parquet/ParquetColumnReader.h>
#include <arrow/io/interfaces.h>
#include <parquet/file_reader.h>
#include <parquet/properties.h>
#include "ParquetColumnReader.h"
namespace DB
{
class ParquetRecordReader
{
public:
ParquetRecordReader(
Block header_,
std::shared_ptr<::arrow::io::RandomAccessFile> file,
const parquet::ReaderProperties& properties);
Chunk readChunk(UInt32 num_rows);
// follow the scale generated by spark
static constexpr UInt8 default_datetime64_scale = 9;
private:
std::unique_ptr<parquet::ParquetFileReader> file_reader;
Block header;
std::shared_ptr<parquet::RowGroupReader> cur_row_group_reader;
ParquetColReaders column_readers;
std::vector<int> parquet_col_indice;
UInt64 left_rows;
UInt64 cur_row_group_left_rows = 0;
int next_row_group_idx = 0;
Poco::Logger * log;
void loadNextRowGroup();
};
}