fix style

Change-Id: I8f7ebd173558b16d94d3161cb0b5300e7e78833d
This commit is contained in:
copperybean 2024-02-24 22:47:53 +08:00
parent 18b3ebcda3
commit e1fcdba4dd
5 changed files with 54 additions and 34 deletions

View File

@ -9,6 +9,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int PARQUET_EXCEPTION;
}
template <typename T> struct ToArrowDecimal;
template <> struct ToArrowDecimal<Decimal<wide::integer<128, signed>>>
@ -27,8 +32,8 @@ class ParquetDataBuffer
private:
public:
ParquetDataBuffer(const uint8_t * data_, UInt64 avaible_, UInt8 datetime64_scale_ = DataTypeDateTime64::default_scale)
: data(reinterpret_cast<const Int8 *>(data_)), avaible(avaible_), datetime64_scale(datetime64_scale_) {}
ParquetDataBuffer(const uint8_t * data_, UInt64 available_, UInt8 datetime64_scale_ = DataTypeDateTime64::default_scale)
: data(reinterpret_cast<const Int8 *>(data_)), available(available_), datetime64_scale(datetime64_scale_) {}
template <typename TValue>
void ALWAYS_INLINE readValue(TValue & dst)
@ -84,7 +89,7 @@ public:
auto value_len = ::arrow::util::SafeLoadAs<Int32>(getArrowData());
if (unlikely(value_len < 0 || value_len > INT32_MAX - 4))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid or corrupted value_len '{}'", value_len);
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Invalid or corrupted value_len '{}'", value_len);
}
consume(4);
checkAvaible(value_len);
@ -110,7 +115,7 @@ public:
auto status = TArrowDecimal::FromBigEndian(getArrowData(), elem_bytes_num);
if (unlikely(!status.ok()))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Read parquet decimal failed: {}", status.status().ToString());
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Read parquet decimal failed: {}", status.status().ToString());
}
status.ValueUnsafe().ToBytes(reinterpret_cast<uint8_t *>(out));
consume(elem_bytes_num);
@ -118,14 +123,14 @@ public:
private:
const Int8 * data;
UInt64 avaible;
UInt64 available;
const UInt8 datetime64_scale;
void ALWAYS_INLINE checkAvaible(UInt64 num)
{
if (unlikely(avaible < num))
if (unlikely(available < num))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Consuming {} bytes while {} avaible", num, avaible);
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Consuming {} bytes while {} available", num, available);
}
}
@ -134,7 +139,7 @@ private:
void ALWAYS_INLINE consume(UInt64 num)
{
data += num;
avaible -= num;
available -= num;
}
};

View File

@ -8,6 +8,12 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int PARQUET_EXCEPTION;
}
void RleValuesReader::nextGroup()
{
// refer to:
@ -142,7 +148,7 @@ void RleValuesReader::visitNullableBySteps(
individual_null_visitor(null_map_cursor);
if (unlikely(valid_index_steps[step_idx] == UINT8_MAX))
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "unsupported packed values number");
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "unsupported packed values number");
}
valid_index_steps[step_idx]++;
}
@ -270,7 +276,7 @@ void ParquetPlainValuesReader<ColumnString>::readBatch(
auto idx = cursor;
cursor += count;
// the type of offset_data is PaddedPODArray, which makes sure that the -1 index is avaible
// the type of offset_data is PaddedPODArray, which makes sure that the -1 index is available
for (auto val_offset = offset_data[idx - 1]; idx < cursor; idx++)
{
offset_data[idx] = ++val_offset;
@ -394,14 +400,17 @@ void ParquetRleLCReader<TColumnVector>::readBatch(
cursor,
num_values,
max_def_level,
/* individual_null_visitor */ [&](size_t nest_cursor) {
/* individual_null_visitor */ [&](size_t nest_cursor)
{
column_data[nest_cursor] = 0;
has_null = true;
},
/* stepped_valid_visitor */ [&](size_t nest_cursor, const std::vector<UInt8> & valid_index_steps) {
/* stepped_valid_visitor */ [&](size_t nest_cursor, const std::vector<UInt8> & valid_index_steps)
{
rle_data_reader->setValueBySteps(column_data + nest_cursor, valid_index_steps, val_getter);
},
/* repeated_visitor */ [&](bool is_valid, size_t nest_cursor, UInt32 count) {
/* repeated_visitor */ [&](bool is_valid, size_t nest_cursor, UInt32 count)
{
if (is_valid)
{
rle_data_reader->setValues(column_data + nest_cursor, count, val_getter);
@ -435,7 +444,8 @@ void ParquetRleDictReader<ColumnString>::readBatch(
auto * offset_data = column.getOffsets().data();
auto & chars = column.getChars();
auto append_nulls = [&](UInt8 num) {
auto append_nulls = [&](UInt8 num)
{
for (auto limit = cursor + num; cursor < limit; cursor++)
{
chars.push_back(0);
@ -444,7 +454,8 @@ void ParquetRleDictReader<ColumnString>::readBatch(
}
};
auto append_string = [&](Int32 dict_idx) {
auto append_string = [&](Int32 dict_idx)
{
auto dict_chars_cursor = dict_offsets[dict_idx - 1];
auto value_len = dict_offsets[dict_idx] - dict_chars_cursor;
auto chars_cursor = chars.size();
@ -462,7 +473,8 @@ void ParquetRleDictReader<ColumnString>::readBatch(
num_values,
max_def_level,
/* individual_null_visitor */ [&](size_t) {},
/* stepped_valid_visitor */ [&](size_t, const std::vector<UInt8> & valid_index_steps) {
/* stepped_valid_visitor */ [&](size_t, const std::vector<UInt8> & valid_index_steps)
{
value_cache.resize(valid_index_steps.size());
rle_data_reader->setValues(
value_cache.data() + 1, static_cast<UInt32>(valid_index_steps.size() - 1), val_getter);
@ -474,7 +486,8 @@ void ParquetRleDictReader<ColumnString>::readBatch(
append_nulls(valid_index_steps[i] - 1);
}
},
/* repeated_visitor */ [&](bool is_valid, size_t, UInt32 count) {
/* repeated_visitor */ [&](bool is_valid, size_t, UInt32 count)
{
if (is_valid)
{
value_cache.resize(count);
@ -505,13 +518,16 @@ void ParquetRleDictReader<TColumnVector>::readBatch(
cursor,
num_values,
max_def_level,
/* individual_null_visitor */ [&](size_t nest_cursor) {
/* individual_null_visitor */ [&](size_t nest_cursor)
{
null_map.setNull(nest_cursor);
},
/* stepped_valid_visitor */ [&](size_t nest_cursor, const std::vector<UInt8> & valid_index_steps) {
/* stepped_valid_visitor */ [&](size_t nest_cursor, const std::vector<UInt8> & valid_index_steps)
{
rle_data_reader->setValueBySteps(column_data + nest_cursor, valid_index_steps, val_getter);
},
/* repeated_visitor */ [&](bool is_valid, size_t nest_cursor, UInt32 count) {
/* repeated_visitor */ [&](bool is_valid, size_t nest_cursor, UInt32 count)
{
if (is_valid)
{
rle_data_reader->setValues(column_data + nest_cursor, count, val_getter);

View File

@ -15,12 +15,6 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int PARQUET_EXCEPTION;
}
class RleValuesReader
{
public:

View File

@ -216,7 +216,8 @@ template <typename TColumn>
ColumnWithTypeAndName ParquetLeafColReader<TColumn>::readBatch(UInt64 rows_num, const String & name)
{
reading_rows_num = rows_num;
auto readPageIfEmpty = [&]() {
auto readPageIfEmpty = [&]()
{
while (!cur_page_values) readPage();
};
@ -245,7 +246,8 @@ void ParquetLeafColReader<ColumnString>::resetColumn(UInt64 rows_num)
if (reading_low_cardinality)
{
assert(dictionary);
visitColStrIndexType(dictionary->size(), [&]<typename TColVec>(TColVec *) {
visitColStrIndexType(dictionary->size(), [&]<typename TColVec>(TColVec *)
{
column = TColVec::create();
});
@ -289,7 +291,8 @@ void ParquetLeafColReader<TColumn>::degradeDictionary()
ColumnString & col_dest = *static_cast<ColumnString *>(column.get());
const ColumnString & col_dict_str = *static_cast<const ColumnString *>(dictionary.get());
visitColStrIndexType(dictionary->size(), [&]<typename TColVec>(TColVec *) {
visitColStrIndexType(dictionary->size(), [&]<typename TColVec>(TColVec *)
{
const TColVec & col_src = *static_cast<const TColVec *>(col_existing.get());
reserveColumnStrRows(column, reading_rows_num);
@ -411,7 +414,8 @@ void ParquetLeafColReader<TColumn>::readPageV1(const parquet::DataPageV1 & page)
assert(col_descriptor.max_definition_level() >= 0);
std::unique_ptr<RleValuesReader> def_level_reader;
if (col_descriptor.max_definition_level() > 0) {
if (col_descriptor.max_definition_level() > 0)
{
auto bit_width = arrow::bit_util::Log2(col_descriptor.max_definition_level() + 1);
auto num_bytes = ::arrow::util::SafeLoadAs<int32_t>(buffer);
auto bit_reader = std::make_unique<arrow::bit_util::BitReader>(buffer + 4, num_bytes);
@ -435,7 +439,8 @@ void ParquetLeafColReader<TColumn>::readPageV1(const parquet::DataPageV1 & page)
degradeDictionary();
}
ParquetDataBuffer parquet_buffer = [&]() {
ParquetDataBuffer parquet_buffer = [&]()
{
if constexpr (!std::is_same_v<ColumnDecimal<DateTime64>, TColumn>)
return ParquetDataBuffer(buffer, max_size);
@ -485,7 +490,8 @@ std::unique_ptr<ParquetDataValuesReader> ParquetLeafColReader<TColumn>::createDi
if (reading_low_cardinality && std::same_as<TColumn, ColumnString>)
{
std::unique_ptr<ParquetDataValuesReader> res;
visitColStrIndexType(dictionary->size(), [&]<typename TCol>(TCol *) {
visitColStrIndexType(dictionary->size(), [&]<typename TCol>(TCol *)
{
res = std::make_unique<ParquetRleLCReader<TCol>>(
col_descriptor.max_definition_level(),
std::move(def_level_reader),

View File

@ -27,7 +27,6 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int PARQUET_EXCEPTION;
}
@ -142,7 +141,7 @@ std::unique_ptr<ParquetColumnReader> createColReader(
}
}
} // anonymouse namespace
} // anonymous namespace
ParquetRecordReader::ParquetRecordReader(
Block header_,