From 9d0ad7ba67b6855344512398b5f924bdad4ece9e Mon Sep 17 00:00:00 2001 From: copperybean Date: Sun, 14 Jan 2024 11:25:12 +0800 Subject: [PATCH] original parquet reader Change-Id: I83a8ec8271edefcd96cb5b3bcd12f6b545d9dec0 --- .../Impl/Parquet/ParquetColumnReader.h | 29 + .../Formats/Impl/Parquet/ParquetDataBuffer.h | 179 ++++++ .../Impl/Parquet/ParquetDataValuesReader.cpp | 553 ++++++++++++++++++ .../Impl/Parquet/ParquetDataValuesReader.h | 263 +++++++++ .../Impl/Parquet/ParquetLeafColReader.cpp | 506 ++++++++++++++++ .../Impl/Parquet/ParquetLeafColReader.h | 63 ++ .../Impl/Parquet/ParquetRecordReader.cpp | 225 +++++++ .../Impl/Parquet/ParquetRecordReader.h | 48 ++ 8 files changed, 1866 insertions(+) create mode 100644 src/Processors/Formats/Impl/Parquet/ParquetColumnReader.h create mode 100644 src/Processors/Formats/Impl/Parquet/ParquetDataBuffer.h create mode 100644 src/Processors/Formats/Impl/Parquet/ParquetDataValuesReader.cpp create mode 100644 src/Processors/Formats/Impl/Parquet/ParquetDataValuesReader.h create mode 100644 src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp create mode 100644 src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.h create mode 100644 src/Processors/Formats/Impl/Parquet/ParquetRecordReader.cpp create mode 100644 src/Processors/Formats/Impl/Parquet/ParquetRecordReader.h diff --git a/src/Processors/Formats/Impl/Parquet/ParquetColumnReader.h b/src/Processors/Formats/Impl/Parquet/ParquetColumnReader.h new file mode 100644 index 00000000000..cfd9d3ba5bd --- /dev/null +++ b/src/Processors/Formats/Impl/Parquet/ParquetColumnReader.h @@ -0,0 +1,29 @@ +#pragma once + +#include + +namespace parquet +{ + +class PageReader; +class ColumnChunkMetaData; +class DataPageV1; +class DataPageV2; + +} + +namespace DB +{ + +class ParquetColumnReader +{ +public: + virtual ColumnWithTypeAndName readBatch(UInt32 rows_num, const String & name) = 0; + + virtual ~ParquetColumnReader() = default; +}; + +using ParquetColReaderPtr = std::unique_ptr; +using ParquetColReaders = std::vector; + +} diff --git a/src/Processors/Formats/Impl/Parquet/ParquetDataBuffer.h b/src/Processors/Formats/Impl/Parquet/ParquetDataBuffer.h new file mode 100644 index 00000000000..1f83c74f9ad --- /dev/null +++ b/src/Processors/Formats/Impl/Parquet/ParquetDataBuffer.h @@ -0,0 +1,179 @@ +#pragma once + +#include + +#include +#include +#include + +namespace DB +{ + +template struct ToArrowDecimal; + +template <> struct ToArrowDecimal>> +{ + using ArrowDecimal = arrow::Decimal128; +}; + +template <> struct ToArrowDecimal>> +{ + using ArrowDecimal = arrow::Decimal256; +}; + + +class ParquetDataBuffer +{ +private: + +public: + ParquetDataBuffer(const uint8_t * data_, UInt64 avaible_, UInt8 datetime64_scale_ = DataTypeDateTime64::default_scale) + : data(reinterpret_cast(data_)), avaible(avaible_), datetime64_scale(datetime64_scale_) {} + + template + void ALWAYS_INLINE readValue(TValue & dst) + { + checkAvaible(sizeof(TValue)); + dst = *reinterpret_cast(data); + consume(sizeof(TValue)); + } + + void ALWAYS_INLINE readBytes(void * dst, size_t bytes) + { + checkAvaible(bytes); + memcpy(dst, data, bytes); + consume(bytes); + } + + void ALWAYS_INLINE readDateTime64(DateTime64 & dst) + { + static const int max_scale_num = 9; + static const UInt64 pow10[max_scale_num + 1] + = {1000000000, 100000000, 10000000, 1000000, 100000, 10000, 1000, 100, 10, 1}; + static const UInt64 spd = 60 * 60 * 24; + static const UInt64 scaled_day[max_scale_num + 1] + = {spd, + 10 * spd, + 100 * spd, + 1000 * spd, + 10000 * spd, + 100000 * spd, + 1000000 * spd, + 10000000 * spd, + 100000000 * spd, + 1000000000 * spd}; + + checkAvaible(sizeof(parquet::Int96)); + auto decoded = parquet::DecodeInt96Timestamp(*reinterpret_cast(data)); + + uint64_t scaled_nano = decoded.nanoseconds / pow10[datetime64_scale]; + dst = static_cast(decoded.days_since_epoch * scaled_day[datetime64_scale] + scaled_nano); + + consume(sizeof(parquet::Int96)); + } + + /** + * This method should only be used to read string whose elements size is small. + * Because memcpySmallAllowReadWriteOverflow15 instead of memcpy is used according to ColumnString::indexImpl + */ + void ALWAYS_INLINE readString(ColumnString & column, size_t cursor) + { + // refer to: PlainByteArrayDecoder::DecodeArrowDense in encoding.cc + // deserializeBinarySSE2 in SerializationString.cpp + checkAvaible(4); + auto value_len = ::arrow::util::SafeLoadAs(getArrowData()); + if (unlikely(value_len < 0 || value_len > INT32_MAX - 4)) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid or corrupted value_len '{}'", value_len); + } + consume(4); + checkAvaible(value_len); + + auto chars_cursor = column.getChars().size(); + column.getChars().resize(chars_cursor + value_len + 1); + + memcpySmallAllowReadWriteOverflow15(&column.getChars()[chars_cursor], data, value_len); + column.getChars().back() = 0; + + column.getOffsets().data()[cursor] = column.getChars().size(); + consume(value_len); + } + + template + void ALWAYS_INLINE readOverBigDecimal(TDecimal * out, Int32 elem_bytes_num) + { + using TArrowDecimal = typename ToArrowDecimal::ArrowDecimal; + + checkAvaible(elem_bytes_num); + + // refer to: RawBytesToDecimalBytes in reader_internal.cc, Decimal128::FromBigEndian in decimal.cc + auto status = TArrowDecimal::FromBigEndian(getArrowData(), elem_bytes_num); + if (unlikely(!status.ok())) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Read parquet decimal failed: {}", status.status().ToString()); + } + status.ValueUnsafe().ToBytes(reinterpret_cast(out)); + consume(elem_bytes_num); + } + +private: + const Int8 * data; + UInt64 avaible; + const UInt8 datetime64_scale; + + void ALWAYS_INLINE checkAvaible(UInt64 num) + { + if (unlikely(avaible < num)) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Consuming {} bytes while {} avaible", num, avaible); + } + } + + const uint8_t * ALWAYS_INLINE getArrowData() { return reinterpret_cast(data); } + + void ALWAYS_INLINE consume(UInt64 num) + { + data += num; + avaible -= num; + } +}; + + +class LazyNullMap +{ +public: + LazyNullMap(UInt32 size_) : size(size_), col_nullable(nullptr) {} + + void setNull(UInt32 cursor) + { + initialize(); + null_map[cursor] = 1; + } + + void setNull(UInt32 cursor, UInt32 count) + { + initialize(); + memset(null_map + cursor, 1, count); + } + + ColumnPtr getNullableCol() { return col_nullable; } + +private: + UInt32 size; + UInt8 * null_map; + ColumnPtr col_nullable; + + void initialize() + { + if (likely(col_nullable)) + { + return; + } + auto col = ColumnVector::create(size); + null_map = col->getData().data(); + col_nullable = std::move(col); + memset(null_map, 0, size); + } +}; + +} diff --git a/src/Processors/Formats/Impl/Parquet/ParquetDataValuesReader.cpp b/src/Processors/Formats/Impl/Parquet/ParquetDataValuesReader.cpp new file mode 100644 index 00000000000..659a7a11969 --- /dev/null +++ b/src/Processors/Formats/Impl/Parquet/ParquetDataValuesReader.cpp @@ -0,0 +1,553 @@ +#include "ParquetDataValuesReader.h" + +#include +#include + +#include + +namespace DB +{ + +void RleValuesReader::nextGroup() +{ + // refer to: + // RleDecoder::NextCounts in rle_encoding.h and VectorizedRleValuesReader::readNextGroup in Spark + UInt32 indicator_value = 0; + [[maybe_unused]] auto read_res = bit_reader->GetVlqInt(&indicator_value); + assert(read_res); + + cur_group_is_packed = indicator_value & 1; + cur_group_size = indicator_value >> 1; + + if (cur_group_is_packed) + { + cur_group_size *= 8; + cur_packed_bit_values.resize(cur_group_size); + bit_reader->GetBatch(bit_width, cur_packed_bit_values.data(), cur_group_size); + } + else + { + cur_value = 0; + read_res = bit_reader->GetAligned((bit_width + 7) / 8, &cur_value); + assert(read_res); + } + cur_group_cursor = 0; + +} + +template +void RleValuesReader::visitValues( + UInt32 num_values, IndividualVisitor && individual_visitor, RepeatedVisitor && repeated_visitor) +{ + // refer to: VisitNullBitmapInline in visitor_inline.h + while (num_values) + { + nextGroupIfNecessary(); + auto cur_count = std::min(num_values, curGroupLeft()); + + if (cur_group_is_packed) + { + for (auto i = cur_group_cursor; i < cur_group_cursor + cur_count; i++) + { + individual_visitor(cur_packed_bit_values[i]); + } + } + else + { + repeated_visitor(cur_count, cur_value); + } + cur_group_cursor += cur_count; + num_values -= cur_count; + } +} + +template +void RleValuesReader::visitNullableValues( + size_t cursor, + UInt32 num_values, + Int32 max_def_level, + LazyNullMap & null_map, + IndividualVisitor && individual_visitor, + RepeatedVisitor && repeated_visitor) +{ + while (num_values) + { + nextGroupIfNecessary(); + auto cur_count = std::min(num_values, curGroupLeft()); + + if (cur_group_is_packed) + { + for (auto i = cur_group_cursor; i < cur_group_cursor + cur_count; i++) + { + if (cur_packed_bit_values[i] == max_def_level) + { + individual_visitor(cursor); + } + else + { + null_map.setNull(cursor); + } + cursor++; + } + } + else + { + if (cur_value == max_def_level) + { + repeated_visitor(cursor, cur_count); + } + else + { + null_map.setNull(cursor, cur_count); + } + cursor += cur_count; + } + cur_group_cursor += cur_count; + num_values -= cur_count; + } +} + +template +void RleValuesReader::visitNullableBySteps( + size_t cursor, + UInt32 num_values, + Int32 max_def_level, + IndividualNullVisitor && individual_null_visitor, + SteppedValidVisitor && stepped_valid_visitor, + RepeatedVisitor && repeated_visitor) +{ + // refer to: + // RleDecoder::GetBatch in rle_encoding.h and TypedColumnReaderImpl::ReadBatchSpaced in column_reader.cc + // VectorizedRleValuesReader::readBatchInternal in Spark + while (num_values > 0) + { + nextGroupIfNecessary(); + auto cur_count = std::min(num_values, curGroupLeft()); + + if (cur_group_is_packed) + { + valid_index_steps.resize(cur_count + 1); + valid_index_steps[0] = 0; + auto step_idx = 0; + auto null_map_cursor = cursor; + + for (auto i = cur_group_cursor; i < cur_group_cursor + cur_count; i++) + { + if (cur_packed_bit_values[i] == max_def_level) + { + valid_index_steps[++step_idx] = 1; + } + else + { + individual_null_visitor(null_map_cursor); + if (unlikely(valid_index_steps[step_idx] == UINT8_MAX)) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "unsupported packed values number"); + } + valid_index_steps[step_idx]++; + } + null_map_cursor++; + } + valid_index_steps.resize(step_idx + 1); + stepped_valid_visitor(cursor, valid_index_steps); + } + else + { + repeated_visitor(cur_value == max_def_level, cursor, cur_count); + } + + cursor += cur_count; + cur_group_cursor += cur_count; + num_values -= cur_count; + } +} + +template +void RleValuesReader::setValues(TValue * res_values, UInt32 num_values, ValueGetter && val_getter) +{ + visitValues( + num_values, + /* individual_visitor */ [&](Int32 val) + { + *(res_values++) = val_getter(val); + }, + /* repeated_visitor */ [&](UInt32 count, Int32 val) + { + std::fill(res_values, res_values + count, val_getter(val)); + res_values += count; + } + ); +} + +template +void RleValuesReader::setValueBySteps( + TValue * res_values, + const std::vector & col_data_steps, + ValueGetter && val_getter) +{ + auto step_iterator = col_data_steps.begin(); + res_values += *(step_iterator++); + + visitValues( + col_data_steps.size() - 1, + /* individual_visitor */ [&](Int32 val) + { + *res_values = val_getter(val); + res_values += *(step_iterator++); + }, + /* repeated_visitor */ [&](UInt32 count, Int32 val) + { + auto getted_val = val_getter(val); + for (UInt32 i = 0; i < count; i++) + { + *res_values = getted_val; + res_values += *(step_iterator++); + } + } + ); +} + + +namespace +{ + +template +TValue * getResizedPrimitiveData(TColumn & column, size_t size) +{ + auto old_size = column.size(); + column.getData().resize(size); + memset(column.getData().data() + old_size, 0, sizeof(TValue) * (size - old_size)); + return column.getData().data(); +} + +} // anoynomous namespace + + +template <> +void ParquetPlainValuesReader::readBatch( + MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values) +{ + auto & column = *assert_cast(col_ptr.get()); + auto cursor = column.size(); + + column.getOffsets().resize(cursor + num_values); + auto * offset_data = column.getOffsets().data(); + auto & chars = column.getChars(); + + def_level_reader->visitValues( + num_values, + /* individual_visitor */ [&](Int32 val) + { + if (val == max_def_level) + { + plain_data_buffer.readString(column, cursor); + } + else + { + chars.push_back(0); + offset_data[cursor] = chars.size(); + null_map.setNull(cursor); + } + cursor++; + }, + /* repeated_visitor */ [&](UInt32 count, Int32 val) + { + if (val == max_def_level) + { + for (UInt32 i = 0; i < count; i++) + { + plain_data_buffer.readString(column, cursor); + cursor++; + } + } + else + { + null_map.setNull(cursor, count); + + auto chars_size_bak = chars.size(); + chars.resize(chars_size_bak + count); + memset(&chars[chars_size_bak], 0, count); + + auto idx = cursor; + cursor += count; + // the type of offset_data is PaddedPODArray, which makes sure that the -1 index is avaible + for (auto val_offset = offset_data[idx - 1]; idx < cursor; idx++) + { + offset_data[idx] = ++val_offset; + } + } + } + ); +} + + +template <> +void ParquetPlainValuesReader>::readBatch( + MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values) +{ + auto cursor = col_ptr->size(); + auto * column_data = getResizedPrimitiveData( + *assert_cast *>(col_ptr.get()), cursor + num_values); + + def_level_reader->visitNullableValues( + cursor, + num_values, + max_def_level, + null_map, + /* individual_visitor */ [&](size_t nest_cursor) + { + plain_data_buffer.readDateTime64(column_data[nest_cursor]); + }, + /* repeated_visitor */ [&](size_t nest_cursor, UInt32 count) + { + auto col_data_pos = column_data + nest_cursor; + for (UInt32 i = 0; i < count; i++) + { + plain_data_buffer.readDateTime64(col_data_pos[i]); + } + } + ); +} + +template +void ParquetPlainValuesReader::readBatch( + MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values) +{ + auto cursor = col_ptr->size(); + auto * column_data = getResizedPrimitiveData(*assert_cast(col_ptr.get()), cursor + num_values); + using TValue = std::decay_t; + + def_level_reader->visitNullableValues( + cursor, + num_values, + max_def_level, + null_map, + /* individual_visitor */ [&](size_t nest_cursor) + { + plain_data_buffer.readValue(column_data[nest_cursor]); + }, + /* repeated_visitor */ [&](size_t nest_cursor, UInt32 count) + { + plain_data_buffer.readBytes(column_data + nest_cursor, count * sizeof(TValue)); + } + ); +} + + +template +void ParquetFixedLenPlainReader::readBatch( + MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values) +{ + if constexpr (std::same_as> || std::same_as>) + { + readOverBigDecimal(col_ptr, null_map, num_values); + } + else + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "unsupported type"); + } +} + +template +void ParquetFixedLenPlainReader::readOverBigDecimal( + MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values) +{ + auto cursor = col_ptr->size(); + auto * column_data = getResizedPrimitiveData( + *assert_cast(col_ptr.get()), cursor + num_values); + + def_level_reader->visitNullableValues( + cursor, + num_values, + max_def_level, + null_map, + /* individual_visitor */ [&](size_t nest_cursor) + { + plain_data_buffer.readOverBigDecimal(column_data + nest_cursor, elem_bytes_num); + }, + /* repeated_visitor */ [&](size_t nest_cursor, UInt32 count) + { + auto col_data_pos = column_data + nest_cursor; + for (UInt32 i = 0; i < count; i++) + { + plain_data_buffer.readOverBigDecimal(col_data_pos + i, elem_bytes_num); + } + } + ); +} + + +template +void ParquetRleLCReader::readBatch( + MutableColumnPtr & index_col, LazyNullMap & null_map, UInt32 num_values) +{ + auto cursor = index_col->size(); + auto * column_data = getResizedPrimitiveData(*assert_cast(index_col.get()), cursor + num_values); + + bool has_null = false; + + // in ColumnLowCardinality, first element in dictionary is null + // so we should increase each value by 1 in parquet index + auto val_getter = [&](Int32 val) { return val + 1; }; + + def_level_reader->visitNullableBySteps( + cursor, + num_values, + max_def_level, + /* individual_null_visitor */ [&](UInt32 nest_cursor) { + column_data[nest_cursor] = 0; + has_null = true; + }, + /* stepped_valid_visitor */ [&](UInt32 nest_cursor, const std::vector & valid_index_steps) { + rle_data_reader->setValueBySteps(column_data + nest_cursor, valid_index_steps, val_getter); + }, + /* repeated_visitor */ [&](bool is_valid, UInt32 nest_cursor, UInt32 count) { + if (is_valid) + { + rle_data_reader->setValues(column_data + nest_cursor, count, val_getter); + } + else + { + auto data_pos = column_data + nest_cursor; + std::fill(data_pos, data_pos + count, 0); + has_null = true; + } + } + ); + if (has_null) + { + null_map.setNull(0); + } +} + +template <> +void ParquetRleDictReader::readBatch( + MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values) +{ + auto & column = *assert_cast(col_ptr.get()); + auto cursor = column.size(); + std::vector value_cache; + + const auto & dict_chars = static_cast(page_dictionary).getChars(); + const auto & dict_offsets = static_cast(page_dictionary).getOffsets(); + + column.getOffsets().resize(cursor + num_values); + auto * offset_data = column.getOffsets().data(); + auto & chars = column.getChars(); + + auto append_nulls = [&](UInt8 num) { + for (auto limit = cursor + num; cursor < limit; cursor++) + { + chars.push_back(0); + offset_data[cursor] = chars.size(); + null_map.setNull(cursor); + } + }; + + auto append_string = [&](Int32 dict_idx) { + auto dict_chars_cursor = dict_offsets[dict_idx - 1]; + auto value_len = dict_offsets[dict_idx] - dict_chars_cursor; + auto chars_cursor = chars.size(); + chars.resize(chars_cursor + value_len); + + memcpySmallAllowReadWriteOverflow15(&chars[chars_cursor], &dict_chars[dict_chars_cursor], value_len); + offset_data[cursor] = chars.size(); + cursor++; + }; + + auto val_getter = [&](Int32 val) { return val + 1; }; + + def_level_reader->visitNullableBySteps( + cursor, + num_values, + max_def_level, + /* individual_null_visitor */ [&](UInt32) {}, + /* stepped_valid_visitor */ [&](UInt32, const std::vector & valid_index_steps) { + value_cache.resize(valid_index_steps.size()); + rle_data_reader->setValues(value_cache.data() + 1, valid_index_steps.size() - 1, val_getter); + + append_nulls(valid_index_steps[0]); + for (size_t i = 1; i < valid_index_steps.size(); i++) + { + append_string(value_cache[i]); + append_nulls(valid_index_steps[i] - 1); + } + }, + /* repeated_visitor */ [&](bool is_valid, UInt32, UInt32 count) { + if (is_valid) + { + value_cache.resize(count); + rle_data_reader->setValues(value_cache.data(), count, val_getter); + for (UInt32 i = 0; i < count; i++) + { + append_string(value_cache[i]); + } + } + else + { + append_nulls(count); + } + } + ); +} + +template +void ParquetRleDictReader::readBatch( + MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values) +{ + auto cursor = col_ptr->size(); + auto * column_data = getResizedPrimitiveData(*assert_cast(col_ptr.get()), cursor + num_values); + const auto & dictionary_array = static_cast(page_dictionary).getData(); + + auto val_getter = [&](Int32 val) { return dictionary_array[val]; }; + def_level_reader->visitNullableBySteps( + cursor, + num_values, + max_def_level, + /* individual_null_visitor */ [&](UInt32 nest_cursor) { + null_map.setNull(nest_cursor); + }, + /* stepped_valid_visitor */ [&](UInt32 nest_cursor, const std::vector & valid_index_steps) { + rle_data_reader->setValueBySteps(column_data + nest_cursor, valid_index_steps, val_getter); + }, + /* repeated_visitor */ [&](bool is_valid, UInt32 nest_cursor, UInt32 count) { + if (is_valid) + { + rle_data_reader->setValues(column_data + nest_cursor, count, val_getter); + } + else + { + null_map.setNull(nest_cursor, count); + } + } + ); +} + + +template class ParquetPlainValuesReader; +template class ParquetPlainValuesReader; +template class ParquetPlainValuesReader; +template class ParquetPlainValuesReader; +template class ParquetPlainValuesReader>; +template class ParquetPlainValuesReader>; +template class ParquetPlainValuesReader; + +template class ParquetFixedLenPlainReader>; +template class ParquetFixedLenPlainReader>; + +template class ParquetRleLCReader; +template class ParquetRleLCReader; +template class ParquetRleLCReader; + +template class ParquetRleDictReader; +template class ParquetRleDictReader; +template class ParquetRleDictReader; +template class ParquetRleDictReader; +template class ParquetRleDictReader>; +template class ParquetRleDictReader>; +template class ParquetRleDictReader>; +template class ParquetRleDictReader>; +template class ParquetRleDictReader>; +template class ParquetRleDictReader; + +} diff --git a/src/Processors/Formats/Impl/Parquet/ParquetDataValuesReader.h b/src/Processors/Formats/Impl/Parquet/ParquetDataValuesReader.h new file mode 100644 index 00000000000..2c95f495339 --- /dev/null +++ b/src/Processors/Formats/Impl/Parquet/ParquetDataValuesReader.h @@ -0,0 +1,263 @@ +#pragma once + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include "ParquetDataBuffer.h" + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int PARQUET_EXCEPTION; +} + +class RleValuesReader +{ +public: + RleValuesReader(std::unique_ptr bit_reader_, Int32 bit_width_) + : bit_reader(std::move(bit_reader_)), bit_width(bit_width_) {} + + /** + * @brief Used when the bit_width is 0, so all elements have same value. + */ + RleValuesReader(UInt32 total_size, Int32 val = 0) + : bit_reader(nullptr), bit_width(0), cur_group_size(total_size), cur_value(val), cur_group_is_packed(false) + {} + + void nextGroup(); + + void nextGroupIfNecessary() { if (cur_group_cursor >= cur_group_size) nextGroup(); } + + UInt32 curGroupLeft() const { return cur_group_size - cur_group_cursor; } + + /** + * @brief Visit num_values elements. + * For RLE encoding, for same group, the value is same, so they can be visited repeatedly. + * For BitPacked encoding, the values may be different with each other, so they must be visited individual. + * + * @tparam IndividualVisitor A callback with signature: void(Int32 val) + * @tparam RepeatedVisitor A callback with signature: void(UInt32 count, Int32 val) + */ + template + void visitValues(UInt32 num_values, IndividualVisitor && individual_visitor, RepeatedVisitor && repeated_visitor); + + /** + * @brief Visit num_values elements by parsed nullability. + * If the parsed value is same as max_def_level, then it is processed as null value. + * + * @tparam IndividualVisitor A callback with signature: void(size_t cursor) + * @tparam RepeatedVisitor A callback with signature: void(size_t cursor, UInt32 count) + * + * Because the null map is processed, so only the callbacks only need to process the valid data. + */ + template + void visitNullableValues( + size_t cursor, + UInt32 num_values, + Int32 max_def_level, + LazyNullMap & null_map, + IndividualVisitor && individual_visitor, + RepeatedVisitor && repeated_visitor); + + /** + * @brief Visit num_values elements by parsed nullability. + * It may be inefficient to process the valid data individually like in visitNullableValues, + * so a valid_index_steps index array is generated first, in order to process valid data continuously. + * + * @tparam IndividualNullVisitor A callback with signature: void(size_t cursor), used to process null value + * @tparam SteppedValidVisitor A callback with signature: + * void(size_t cursor, const std::vector & valid_index_steps) + * for n valid elements with null value interleaved in a BitPacked group, + * i-th item in valid_index_steps describes how many elements in column there are after (i-1)-th valid element. + * + * take following BitPacked group with 2 valid elements for example: + * null valid null null valid null + * then the valid_index_steps has values [1, 3, 2]. + * Please note that the the sum of valid_index_steps is same as elements number in this group. + * + * @tparam RepeatedVisitor A callback with signature: void(bool is_valid, UInt32 cursor, UInt32 count) + */ + template + void visitNullableBySteps( + size_t cursor, + UInt32 num_values, + Int32 max_def_level, + IndividualNullVisitor && null_visitor, + SteppedValidVisitor && stepped_valid_visitor, + RepeatedVisitor && repeated_visitor); + + /** + * @brief Set the Values to column_data directly + * + * @tparam TValue The type of column data. + * @tparam ValueGetter A callback with signature: TValue(Int32 val) + */ + template + void setValues(TValue * column_data, UInt32 num_values, ValueGetter && val_getter); + + /** + * @brief Set the value by valid_index_steps generated in visitNullableBySteps. + * According to visitNullableBySteps, the elements number is valid_index_steps.size()-1, + * so valid_index_steps.size()-1 elements are read, and set to column_data with steps in valid_index_steps + */ + template + void setValueBySteps( + TValue * column_data, + const std::vector & col_data_steps, + ValueGetter && val_getter); + +private: + std::unique_ptr bit_reader; + + std::vector cur_packed_bit_values; + std::vector valid_index_steps; + + Int32 bit_width; + + UInt32 cur_group_size = 0; + UInt32 cur_group_cursor = 0; + Int32 cur_value; + bool cur_group_is_packed; +}; + +using RleValuesReaderPtr = std::unique_ptr; + + +class ParquetDataValuesReader +{ +public: + virtual void readBatch(MutableColumnPtr & column, LazyNullMap & null_map, UInt32 num_values) = 0; + + virtual ~ParquetDataValuesReader() = default; +}; + +using ParquetDataValuesReaderPtr = std::unique_ptr; + + +/** + * The definition level is RLE or BitPacked encoding, while data is read directly + */ +template +class ParquetPlainValuesReader : public ParquetDataValuesReader +{ +public: + + ParquetPlainValuesReader( + Int32 max_def_level_, + std::unique_ptr def_level_reader_, + ParquetDataBuffer data_buffer_) + : max_def_level(max_def_level_) + , def_level_reader(std::move(def_level_reader_)) + , plain_data_buffer(std::move(data_buffer_)) + {} + + void readBatch(MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values) override; + +private: + Int32 max_def_level; + std::unique_ptr def_level_reader; + ParquetDataBuffer plain_data_buffer; +}; + +/** + * The data and definition level encoding are same as ParquetPlainValuesReader. + * But the element size is const and bigger than primitive data type. + */ +template +class ParquetFixedLenPlainReader : public ParquetDataValuesReader +{ +public: + + ParquetFixedLenPlainReader( + Int32 max_def_level_, + Int32 elem_bytes_num_, + std::unique_ptr def_level_reader_, + ParquetDataBuffer data_buffer_) + : max_def_level(max_def_level_) + , elem_bytes_num(elem_bytes_num_) + , def_level_reader(std::move(def_level_reader_)) + , plain_data_buffer(std::move(data_buffer_)) + {} + + void readOverBigDecimal(MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values); + + void readBatch(MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values) override; + +private: + Int32 max_def_level; + Int32 elem_bytes_num; + std::unique_ptr def_level_reader; + ParquetDataBuffer plain_data_buffer; +}; + +/** + * Read data according to the format of ColumnLowCardinality format. + * + * Only index and null column are processed in this class. + * And all null value is mapped to first index in dictionary, + * so the result index valued is added by one. +*/ +template +class ParquetRleLCReader : public ParquetDataValuesReader +{ +public: + ParquetRleLCReader( + Int32 max_def_level_, + std::unique_ptr def_level_reader_, + std::unique_ptr rle_data_reader_) + : max_def_level(max_def_level_) + , def_level_reader(std::move(def_level_reader_)) + , rle_data_reader(std::move(rle_data_reader_)) + {} + + void readBatch(MutableColumnPtr & index_col, LazyNullMap & null_map, UInt32 num_values) override; + +private: + Int32 max_def_level; + std::unique_ptr def_level_reader; + std::unique_ptr rle_data_reader; +}; + +/** + * The definition level is RLE or BitPacked encoded, + * and the index of dictionary is also RLE or BitPacked encoded. + * + * while the result is not parsed as a low cardinality column, + * instead, a normal column is generated. + */ +template +class ParquetRleDictReader : public ParquetDataValuesReader +{ +public: + ParquetRleDictReader( + Int32 max_def_level_, + std::unique_ptr def_level_reader_, + std::unique_ptr rle_data_reader_, + const IColumn & page_dictionary_) + : max_def_level(max_def_level_) + , def_level_reader(std::move(def_level_reader_)) + , rle_data_reader(std::move(rle_data_reader_)) + , page_dictionary(page_dictionary_) + {} + + void readBatch(MutableColumnPtr & col_ptr, LazyNullMap & null_map, UInt32 num_values) override; + +private: + Int32 max_def_level; + std::unique_ptr def_level_reader; + std::unique_ptr rle_data_reader; + const IColumn & page_dictionary; +}; + +} diff --git a/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp new file mode 100644 index 00000000000..00dee9074fe --- /dev/null +++ b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp @@ -0,0 +1,506 @@ +#include "ParquetLeafColReader.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; + extern const int BAD_ARGUMENTS; + extern const int PARQUET_EXCEPTION; +} + +namespace +{ + +template +void visitColStrIndexType(size_t data_size, TypeVisitor && visitor) +{ + // refer to: DataTypeLowCardinality::createColumnUniqueImpl + if (data_size < (1ull << 8)) + { + visitor(static_cast(nullptr)); + } + else if (data_size < (1ull << 16)) + { + visitor(static_cast(nullptr)); + } + else if (data_size < (1ull << 32)) + { + visitor(static_cast(nullptr)); + } + else + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "unsupported data size {}", data_size); + } +} + +void reserveColumnStrRows(MutableColumnPtr & col, UInt32 rows_num) +{ + col->reserve(rows_num); + + /// Never reserve for too big size according to SerializationString::deserializeBinaryBulk + if (rows_num < 256 * 1024 * 1024) + { + try + { + static_cast(col.get())->getChars().reserve(rows_num); + } + catch (Exception & e) + { + e.addMessage("(limit = " + toString(rows_num) + ")"); + throw; + } + } +}; + + +template +ColumnPtr readDictPage( + const parquet::DictionaryPage & page, + const parquet::ColumnDescriptor & col_des, + const DataTypePtr & /* data_type */); + +template <> +ColumnPtr readDictPage( + const parquet::DictionaryPage & page, + const parquet::ColumnDescriptor & /* col_des */, + const DataTypePtr & /* data_type */) +{ + auto col = ColumnString::create(); + col->getOffsets().resize(page.num_values() + 1); + col->getChars().reserve(page.num_values()); + ParquetDataBuffer buffer(page.data(), page.size()); + + // will be read as low cardinality column + // in which case, the null key is set to first position, so the first string should be empty + col->getChars().push_back(0); + col->getOffsets()[0] = 1; + for (auto i = 1; i <= page.num_values(); i++) + { + buffer.readString(*col, i); + } + return col; +} + +template <> +ColumnPtr readDictPage>( + const parquet::DictionaryPage & page, + const parquet::ColumnDescriptor & /* col_des */, + const DataTypePtr & data_type) +{ + auto & datetime_type = assert_cast(*data_type); + auto dict_col = ColumnDecimal::create(page.num_values(), datetime_type.getScale()); + auto * col_data = dict_col->getData().data(); + ParquetDataBuffer buffer(page.data(), page.size(), datetime_type.getScale()); + for (auto i = 0; i < page.num_values(); i++) + { + buffer.readDateTime64(col_data[i]); + } + return dict_col; +} + +template +ColumnPtr readDictPage( + const parquet::DictionaryPage & page, + const parquet::ColumnDescriptor & col_des, + const DataTypePtr & /* data_type */) +{ + auto dict_col = TColumnDecimal::create(page.num_values(), col_des.type_scale()); + auto * col_data = dict_col->getData().data(); + ParquetDataBuffer buffer(page.data(), page.size()); + for (auto i = 0; i < page.num_values(); i++) + { + buffer.readOverBigDecimal(col_data + i, col_des.type_length()); + } + return dict_col; +} + +template requires (!std::is_same_v) +ColumnPtr readDictPage( + const parquet::DictionaryPage & page, + const parquet::ColumnDescriptor & col_des, + const DataTypePtr & /* data_type */) +{ + auto dict_col = TColumnDecimal::create(page.num_values(), col_des.type_scale()); + ParquetDataBuffer buffer(page.data(), page.size()); + buffer.readBytes(dict_col->getData().data(), page.num_values() * sizeof(typename TColumnDecimal::ValueType)); + return dict_col; +} + +template +ColumnPtr readDictPage( + const parquet::DictionaryPage & page, + const parquet::ColumnDescriptor & /* col_des */, + const DataTypePtr & /* data_type */) +{ + auto dict_col = TColumnVector::create(page.num_values()); + ParquetDataBuffer buffer(page.data(), page.size()); + buffer.readBytes(dict_col->getData().data(), page.num_values() * sizeof(typename TColumnVector::ValueType)); + return dict_col; +} + + +template +std::unique_ptr createPlainReader( + const parquet::ColumnDescriptor & col_des, + RleValuesReaderPtr def_level_reader, + ParquetDataBuffer buffer); + +template +std::unique_ptr createPlainReader( + const parquet::ColumnDescriptor & col_des, + RleValuesReaderPtr def_level_reader, + ParquetDataBuffer buffer) +{ + return std::make_unique>( + col_des.max_definition_level(), + col_des.type_length(), + std::move(def_level_reader), + std::move(buffer)); +} + +template +std::unique_ptr createPlainReader( + const parquet::ColumnDescriptor & col_des, + RleValuesReaderPtr def_level_reader, + ParquetDataBuffer buffer) +{ + return std::make_unique>( + col_des.max_definition_level(), std::move(def_level_reader), std::move(buffer)); +} + + +} // anonymous namespace + + +template +ParquetLeafColReader::ParquetLeafColReader( + const parquet::ColumnDescriptor & col_descriptor_, + DataTypePtr base_type_, + std::unique_ptr meta_, + std::unique_ptr reader_) + : col_descriptor(col_descriptor_) + , base_data_type(base_type_) + , col_chunk_meta(std::move(meta_)) + , parquet_page_reader(std::move(reader_)) + , log(&Poco::Logger::get("ParquetLeafColReader")) +{ +} + +template +ColumnWithTypeAndName ParquetLeafColReader::readBatch(UInt32 rows_num, const String & name) +{ + reading_rows_num = rows_num; + auto readPageIfEmpty = [&]() { + while (!cur_page_values) readPage(); + }; + + // make sure the dict page has been read, and the status is updated + readPageIfEmpty(); + resetColumn(rows_num); + + while (rows_num) + { + // if dictionary page encountered, another page should be read + readPageIfEmpty(); + + auto read_values = std::min(rows_num, cur_page_values); + data_values_reader->readBatch(column, *null_map, read_values); + + cur_page_values -= read_values; + rows_num -= read_values; + } + + return releaseColumn(name); +} + +template <> +void ParquetLeafColReader::resetColumn(UInt32 rows_num) +{ + if (reading_low_cardinality) + { + assert(dictionary); + visitColStrIndexType(dictionary->size(), [&](TColVec *) { + column = TColVec::create(); + }); + + // only first position is used + null_map = std::make_unique(1); + column->reserve(rows_num); + } + else + { + null_map = std::make_unique(rows_num); + column = ColumnString::create(); + reserveColumnStrRows(column, rows_num); + } +} + +template +void ParquetLeafColReader::resetColumn(UInt32 rows_num) +{ + assert(!reading_low_cardinality); + + column = base_data_type->createColumn(); + column->reserve(rows_num); + null_map = std::make_unique(rows_num); +} + +template +void ParquetLeafColReader::degradeDictionary() +{ + assert(dictionary && column->size()); + null_map = std::make_unique(reading_rows_num); + auto col_existing = std::move(column); + column = ColumnString::create(); + + ColumnString & col_dest = *static_cast(column.get()); + const ColumnString & col_dict_str = *static_cast(dictionary.get()); + + visitColStrIndexType(dictionary->size(), [&](TColVec *) { + const TColVec & col_src = *static_cast(col_existing.get()); + reserveColumnStrRows(column, reading_rows_num); + + col_dest.getOffsets().resize(col_src.size()); + for (size_t i = 0; i < col_src.size(); i++) + { + auto src_idx = col_src.getData()[i]; + if (0 == src_idx) + { + null_map->setNull(i); + } + auto dict_chars_cursor = col_dict_str.getOffsets()[src_idx - 1]; + auto str_len = col_dict_str.getOffsets()[src_idx] - dict_chars_cursor; + auto dst_chars_cursor = col_dest.getChars().size(); + col_dest.getChars().resize(dst_chars_cursor + str_len); + + memcpySmallAllowReadWriteOverflow15( + &col_dest.getChars()[dst_chars_cursor], &col_dict_str.getChars()[dict_chars_cursor], str_len); + col_dest.getOffsets()[i] = col_dest.getChars().size(); + } + }); + LOG_INFO(log, "degraded dictionary to normal column"); +} + +template +ColumnWithTypeAndName ParquetLeafColReader::releaseColumn(const String & name) +{ + DataTypePtr data_type = base_data_type; + if (reading_low_cardinality) + { + MutableColumnPtr col_unique; + if (null_map->getNullableCol()) + { + data_type = std::make_shared(data_type); + col_unique = ColumnUnique::create(dictionary->assumeMutable(), true); + } + else + { + col_unique = ColumnUnique::create(dictionary->assumeMutable(), false); + } + column = ColumnLowCardinality::create(std::move(col_unique), std::move(column), true); + data_type = std::make_shared(data_type); + } + else + { + if (null_map->getNullableCol()) + { + column = ColumnNullable::create(std::move(column), null_map->getNullableCol()->assumeMutable()); + data_type = std::make_shared(data_type); + } + } + ColumnWithTypeAndName res = {std::move(column), data_type, name}; + column = nullptr; + null_map = nullptr; + + return res; +} + +template +void ParquetLeafColReader::readPage() +{ + // refer to: ColumnReaderImplBase::ReadNewPage in column_reader.cc + auto cur_page = parquet_page_reader->NextPage(); + switch (cur_page->type()) + { + case parquet::PageType::DATA_PAGE: + readPageV1(*std::static_pointer_cast(cur_page)); + break; + case parquet::PageType::DATA_PAGE_V2: + readPageV2(*std::static_pointer_cast(cur_page)); + break; + case parquet::PageType::DICTIONARY_PAGE: + { + const parquet::DictionaryPage & dict_page = *std::static_pointer_cast(cur_page); + if (unlikely( + dict_page.encoding() != parquet::Encoding::PLAIN_DICTIONARY + && dict_page.encoding() != parquet::Encoding::PLAIN)) + { + throw new Exception( + ErrorCodes::NOT_IMPLEMENTED, "Unsupported dictionary page encoding {}", dict_page.encoding()); + } + LOG_INFO(log, "{} values in dictionary page of column {}", dict_page.num_values(), col_descriptor.name()); + + dictionary = readDictPage(dict_page, col_descriptor, base_data_type); + if (std::is_same_v) + { + reading_low_cardinality = true; + } + break; + } + default: + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported page type: {}", cur_page->type()); + } +} + +template +void ParquetLeafColReader::readPageV1(const parquet::DataPageV1 & page) +{ + static parquet::LevelDecoder repetition_level_decoder; + + cur_page_values = page.num_values(); + + // refer to: VectorizedColumnReader::readPageV1 in Spark and LevelDecoder::SetData in column_reader.cc + if (page.definition_level_encoding() != parquet::Encoding::RLE && col_descriptor.max_definition_level() != 0) + { + throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", page.definition_level_encoding()); + } + const auto * buffer = page.data(); + auto max_size = page.size(); + + if (col_descriptor.max_repetition_level() > 0) + { + auto rep_levels_bytes = repetition_level_decoder.SetData( + page.repetition_level_encoding(), col_descriptor.max_repetition_level(), 0, buffer, max_size); + buffer += rep_levels_bytes; + max_size -= rep_levels_bytes; + } + + assert(col_descriptor.max_definition_level() >= 0); + std::unique_ptr def_level_reader; + if (col_descriptor.max_definition_level() > 0) { + auto bit_width = arrow::BitUtil::Log2(col_descriptor.max_definition_level() + 1); + auto num_bytes = ::arrow::util::SafeLoadAs(buffer); + auto bit_reader = std::make_unique(buffer + 4, num_bytes); + num_bytes += 4; + buffer += num_bytes; + max_size -= num_bytes; + def_level_reader = std::make_unique(std::move(bit_reader), bit_width); + } + else + { + def_level_reader = std::make_unique(page.num_values()); + } + + switch (page.encoding()) + { + case parquet::Encoding::PLAIN: + { + if (reading_low_cardinality) + { + reading_low_cardinality = false; + degradeDictionary(); + } + + ParquetDataBuffer parquet_buffer = [&]() { + if constexpr (!std::is_same_v, TColumn>) + return ParquetDataBuffer(buffer, max_size); + + auto scale = assert_cast(*base_data_type).getScale(); + return ParquetDataBuffer(buffer, max_size, scale); + }(); + data_values_reader = createPlainReader( + col_descriptor, std::move(def_level_reader), std::move(parquet_buffer)); + break; + } + case parquet::Encoding::RLE_DICTIONARY: + case parquet::Encoding::PLAIN_DICTIONARY: + { + if (unlikely(!dictionary)) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, "dictionary should be existed"); + } + + // refer to: DictDecoderImpl::SetData in encoding.cc + auto bit_width = *buffer; + auto bit_reader = std::make_unique(++buffer, --max_size); + data_values_reader = createDictReader( + std::move(def_level_reader), std::make_unique(std::move(bit_reader), bit_width)); + break; + } + case parquet::Encoding::BYTE_STREAM_SPLIT: + case parquet::Encoding::DELTA_BINARY_PACKED: + case parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY: + case parquet::Encoding::DELTA_BYTE_ARRAY: + throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", page.encoding()); + + default: + throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unknown encoding type: {}", page.encoding()); + } +} + +template +void ParquetLeafColReader::readPageV2(const parquet::DataPageV2 & /*page*/) +{ + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "read page V2 is not implemented yet"); +} + +template +std::unique_ptr ParquetLeafColReader::createDictReader( + std::unique_ptr def_level_reader, std::unique_ptr rle_data_reader) +{ + if (reading_low_cardinality && std::same_as) + { + std::unique_ptr res; + visitColStrIndexType(dictionary->size(), [&](TCol *) { + res = std::make_unique>( + col_descriptor.max_definition_level(), + std::move(def_level_reader), + std::move(rle_data_reader)); + }); + return res; + } + return std::make_unique>( + col_descriptor.max_definition_level(), + std::move(def_level_reader), + std::move(rle_data_reader), + *assert_cast(dictionary.get())); +} + + +template class ParquetLeafColReader; +template class ParquetLeafColReader; +template class ParquetLeafColReader; +template class ParquetLeafColReader; +template class ParquetLeafColReader; +template class ParquetLeafColReader>; +template class ParquetLeafColReader>; +template class ParquetLeafColReader>; +template class ParquetLeafColReader>; +template class ParquetLeafColReader>; + +} diff --git a/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.h b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.h new file mode 100644 index 00000000000..f730afe40ed --- /dev/null +++ b/src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.h @@ -0,0 +1,63 @@ +#pragma once + +#include +#include +#include + +#include "ParquetColumnReader.h" +#include "ParquetDataValuesReader.h" + +namespace parquet +{ + +class ColumnDescriptor; + +} + + +namespace DB +{ + +template +class ParquetLeafColReader : public ParquetColumnReader +{ +public: + ParquetLeafColReader( + const parquet::ColumnDescriptor & col_descriptor_, + DataTypePtr base_type_, + std::unique_ptr meta_, + std::unique_ptr reader_); + + ColumnWithTypeAndName readBatch(UInt32 rows_num, const String & name) override; + +private: + const parquet::ColumnDescriptor & col_descriptor; + DataTypePtr base_data_type; + std::unique_ptr col_chunk_meta; + std::unique_ptr parquet_page_reader; + std::unique_ptr data_values_reader; + + MutableColumnPtr column; + std::unique_ptr null_map; + + ColumnPtr dictionary; + + UInt32 cur_page_values = 0; + UInt32 reading_rows_num = 0; + bool reading_low_cardinality = false; + + Poco::Logger * log; + + void resetColumn(UInt32 rows_num); + void degradeDictionary(); + ColumnWithTypeAndName releaseColumn(const String & name); + + void readPage(); + void readPageV1(const parquet::DataPageV1 & page); + void readPageV2(const parquet::DataPageV2 & page); + + std::unique_ptr createDictReader( + std::unique_ptr def_level_reader, std::unique_ptr rle_data_reader); +}; + +} diff --git a/src/Processors/Formats/Impl/Parquet/ParquetRecordReader.cpp b/src/Processors/Formats/Impl/Parquet/ParquetRecordReader.cpp new file mode 100644 index 00000000000..a5744b85174 --- /dev/null +++ b/src/Processors/Formats/Impl/Parquet/ParquetRecordReader.cpp @@ -0,0 +1,225 @@ +#include "ParquetRecordReader.h" + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "ParquetLeafColReader.h" + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; + extern const int PARQUET_EXCEPTION; +} + +// #define THROW_ARROW_NOT_OK(status) \ +// do \ +// { \ +// if (::arrow::Status _s = (status); !_s.ok()) \ +// throw Exception(_s.ToString(), ErrorCodes::BAD_ARGUMENTS); \ +// } while (false) + + +#define THROW_PARQUET_EXCEPTION(s) \ + do \ + { \ + try { (s); } \ + catch (const ::parquet::ParquetException & e) \ + { \ + throw Exception(e.what(), ErrorCodes::PARQUET_EXCEPTION); \ + } \ + } while (false) + +namespace +{ + +Int64 getTotalRows(const parquet::FileMetaData & meta_data) +{ + Int64 res = 0; + for (int i = 0; i < meta_data.num_row_groups(); i++) + { + res += meta_data.RowGroup(i)->num_rows(); + } + return res; +} + +std::unique_ptr createReader( + const parquet::ColumnDescriptor & col_descriptor, + DataTypePtr ch_type, + std::unique_ptr meta, + std::unique_ptr reader) +{ + if (col_descriptor.logical_type()->is_date() && parquet::Type::INT32 == col_descriptor.physical_type()) + { + return std::make_unique>( + col_descriptor, std::make_shared(), std::move(meta), std::move(reader)); + } + else if (col_descriptor.logical_type()->is_decimal()) + { + switch (col_descriptor.physical_type()) + { + case parquet::Type::INT32: + { + auto data_type = std::make_shared( + col_descriptor.type_precision(), col_descriptor.type_scale()); + return std::make_unique>>( + col_descriptor, data_type, std::move(meta), std::move(reader)); + } + case parquet::Type::INT64: + { + auto data_type = std::make_shared( + col_descriptor.type_precision(), col_descriptor.type_scale()); + return std::make_unique>>( + col_descriptor, data_type, std::move(meta), std::move(reader)); + } + case parquet::Type::FIXED_LEN_BYTE_ARRAY: + { + if (col_descriptor.type_length() <= static_cast(DecimalUtils::max_precision)) + { + auto data_type = std::make_shared( + col_descriptor.type_precision(), col_descriptor.type_scale()); + return std::make_unique>>( + col_descriptor, data_type, std::move(meta), std::move(reader)); + } + else + { + auto data_type = std::make_shared( + col_descriptor.type_precision(), col_descriptor.type_scale()); + return std::make_unique>>( + col_descriptor, data_type, std::move(meta), std::move(reader)); + } + } + default: + throw Exception( + ErrorCodes::PARQUET_EXCEPTION, + "Type not supported for decimal: {}", + col_descriptor.physical_type()); + } + } + else + { + switch (col_descriptor.physical_type()) + { + case parquet::Type::INT32: + return std::make_unique>( + col_descriptor, std::make_shared(), std::move(meta), std::move(reader)); + case parquet::Type::INT64: + return std::make_unique>( + col_descriptor, std::make_shared(), std::move(meta), std::move(reader)); + case parquet::Type::FLOAT: + return std::make_unique>( + col_descriptor, std::make_shared(), std::move(meta), std::move(reader)); + case parquet::Type::INT96: + { + DataTypePtr read_type = ch_type; + if (!isDateTime64(ch_type)) + { + read_type = std::make_shared(ParquetRecordReader::default_datetime64_scale); + } + return std::make_unique>>( + col_descriptor, read_type, std::move(meta), std::move(reader)); + } + case parquet::Type::DOUBLE: + return std::make_unique>( + col_descriptor, std::make_shared(), std::move(meta), std::move(reader)); + case parquet::Type::BYTE_ARRAY: + return std::make_unique>( + col_descriptor, std::make_shared(), std::move(meta), std::move(reader)); + default: + throw Exception( + ErrorCodes::PARQUET_EXCEPTION, "Type not supported: {}", col_descriptor.physical_type()); + } + } +} + +} // anonymouse namespace + +ParquetRecordReader::ParquetRecordReader( + Block header_, + std::shared_ptr<::arrow::io::RandomAccessFile> file, + const parquet::ReaderProperties& properties) + : header(std::move(header_)) +{ + // Only little endian system is supported currently + static_assert(std::endian::native == std::endian::little); + + log = &Poco::Logger::get("ParquetRecordReader"); + THROW_PARQUET_EXCEPTION(file_reader = parquet::ParquetFileReader::Open(std::move(file), properties)); + left_rows = getTotalRows(*file_reader->metadata()); + + parquet_col_indice.reserve(header.columns()); + column_readers.reserve(header.columns()); + for (const auto & col_with_name : header) + { + auto idx = file_reader->metadata()->schema()->ColumnIndex(col_with_name.name); + if (idx < 0) + { + throw Exception("can not find column with name: " + col_with_name.name, ErrorCodes::BAD_ARGUMENTS); + } + parquet_col_indice.push_back(idx); + } +} + +Chunk ParquetRecordReader::readChunk(UInt32 num_rows) +{ + if (!left_rows) + { + return Chunk{}; + } + if (!cur_row_group_left_rows) + { + loadNextRowGroup(); + } + + Columns columns(header.columns()); + auto num_rows_read = std::min(static_cast(num_rows), cur_row_group_left_rows); + for (size_t i = 0; i < header.columns(); i++) + { + columns[i] = castColumn( + column_readers[i]->readBatch(num_rows_read, header.getByPosition(i).name), + header.getByPosition(i).type); + } + left_rows -= num_rows_read; + cur_row_group_left_rows -= num_rows_read; + + return Chunk{std::move(columns), num_rows_read}; +} + +void ParquetRecordReader::loadNextRowGroup() +{ + Stopwatch watch(CLOCK_MONOTONIC); + cur_row_group_reader = file_reader->RowGroup(next_row_group_idx); + + column_readers.clear(); + for (size_t i = 0; i < parquet_col_indice.size(); i++) + { + column_readers.emplace_back(createReader( + *file_reader->metadata()->schema()->Column(parquet_col_indice[i]), + header.getByPosition(i).type, + cur_row_group_reader->metadata()->ColumnChunk(parquet_col_indice[i]), + cur_row_group_reader->GetColumnPageReader(parquet_col_indice[i]))); + } + LOG_DEBUG(log, "reading row group {} consumed {} ms", next_row_group_idx, watch.elapsedNanoseconds() / 1e6); + ++next_row_group_idx; + cur_row_group_left_rows = cur_row_group_reader->metadata()->num_rows(); +} + +} diff --git a/src/Processors/Formats/Impl/Parquet/ParquetRecordReader.h b/src/Processors/Formats/Impl/Parquet/ParquetRecordReader.h new file mode 100644 index 00000000000..d77cab6553b --- /dev/null +++ b/src/Processors/Formats/Impl/Parquet/ParquetRecordReader.h @@ -0,0 +1,48 @@ +#pragma once + +#include +#include +#include +#include + +#include +#include +#include + +#include "ParquetColumnReader.h" + +namespace DB +{ + +class ParquetRecordReader +{ +public: + ParquetRecordReader( + Block header_, + std::shared_ptr<::arrow::io::RandomAccessFile> file, + const parquet::ReaderProperties& properties); + + Chunk readChunk(UInt32 num_rows); + + // follow the scale generated by spark + static constexpr UInt8 default_datetime64_scale = 9; + +private: + std::unique_ptr file_reader; + + Block header; + + std::shared_ptr cur_row_group_reader; + ParquetColReaders column_readers; + + std::vector parquet_col_indice; + UInt64 left_rows; + UInt64 cur_row_group_left_rows = 0; + int next_row_group_idx = 0; + + Poco::Logger * log; + + void loadNextRowGroup(); +}; + +}