fix build

Change-Id: I57f025b17a04e2c5dded3f18e7f477841287a2c2
This commit is contained in:
copperybean 2024-01-14 12:01:23 +08:00
parent 9d0ad7ba67
commit 8fb89cec9f
12 changed files with 71 additions and 56 deletions

View File

@ -44,6 +44,10 @@ concept is_over_big_int =
|| std::is_same_v<T, UInt256>
|| std::is_same_v<T, Decimal128>
|| std::is_same_v<T, Decimal256>;
/// Satisfied when T models is_decimal and its nested T::NativeType models is_over_big_int.
template <class T>
concept is_over_big_decimal = is_decimal<T> && is_over_big_int<typename T::NativeType>;
}
template <> struct is_signed<DB::Decimal32> { static constexpr bool value = true; };

View File

@ -141,6 +141,14 @@ protected:
UInt32 scale;
};
/// Satisfied when TCol is exactly ColumnDecimal<TCol::ValueType>, that value type models
/// is_decimal, and the column's nested TCol::NativeT models is_over_big_int.
template <class TCol>
concept is_col_over_big_decimal = std::is_same_v<TCol, ColumnDecimal<typename TCol::ValueType>>
&& is_decimal<typename TCol::ValueType> && is_over_big_int<typename TCol::NativeT>;
/// Satisfied when TCol is exactly ColumnDecimal<TCol::ValueType>, that value type models
/// is_decimal, and the column's nested TCol::NativeT is a built-in integral type
/// (std::is_integral_v).
template <class TCol>
concept is_col_int_decimal = std::is_same_v<TCol, ColumnDecimal<typename TCol::ValueType>>
&& is_decimal<typename TCol::ValueType> && std::is_integral_v<typename TCol::NativeT>;
template <class> class ColumnVector;
template <class T> struct ColumnVectorOrDecimalT { using Col = ColumnVector<T>; };
template <is_decimal T> struct ColumnVectorOrDecimalT<T> { using Col = ColumnDecimal<T>; };

View File

@ -441,6 +441,9 @@ ColumnPtr ColumnVector<T>::indexImpl(const PaddedPODArray<Type> & indexes, size_
return res;
}
/// Satisfied when TCol is exactly ColumnVector instantiated with its own nested ValueType.
template <class TCol>
concept is_col_vector = std::is_same_v<TCol, ColumnVector<typename TCol::ValueType>>;
/// Prevent implicit template instantiation of ColumnVector for common types
extern template class ColumnVector<UInt8>;

View File

@ -600,6 +600,7 @@
M(719, QUERY_CACHE_USED_WITH_SYSTEM_TABLE) \
M(720, USER_EXPIRED) \
M(721, DEPRECATED_FUNCTION) \
M(722, PARQUET_EXCEPTION) \
\
M(900, DISTRIBUTED_CACHE_ERROR) \
M(901, CANNOT_USE_DISTRIBUTED_CACHE) \

View File

@ -1,6 +1,7 @@
#pragma once
#include <Columns/IColumn.h>
#include <Core/ColumnWithTypeAndName.h>
namespace parquet
{
@ -18,7 +19,7 @@ namespace DB
/// Abstract interface for Parquet column readers: readBatch is pure virtual and the
/// destructor is virtual, so implementations are used and destroyed through this base.
class ParquetColumnReader
{
public:
/// Produces a ColumnWithTypeAndName for the requested batch.
/// NOTE(review): presumably reads up to rows_num rows and labels the result with `name` — confirm against implementations.
/// The two declarations below differ only in the integer width of rows_num (UInt32 vs UInt64).
virtual ColumnWithTypeAndName readBatch(UInt32 rows_num, const String & name) = 0;
virtual ColumnWithTypeAndName readBatch(UInt64 rows_num, const String & name) = 0;
virtual ~ParquetColumnReader() = default;
};

View File

@ -142,15 +142,19 @@ private:
class LazyNullMap
{
public:
LazyNullMap(UInt32 size_) : size(size_), col_nullable(nullptr) {}
LazyNullMap(UInt64 size_) : size(size_), col_nullable(nullptr) {}
void setNull(UInt32 cursor)
template <typename T>
requires std::is_integral_v<T>
void setNull(T cursor)
{
initialize();
null_map[cursor] = 1;
}
void setNull(UInt32 cursor, UInt32 count)
template <typename T>
requires std::is_integral_v<T>
void setNull(T cursor, UInt32 count)
{
initialize();
memset(null_map + cursor, 1, count);
@ -159,7 +163,7 @@ public:
ColumnPtr getNullableCol() { return col_nullable; }
private:
UInt32 size;
UInt64 size;
UInt8 * null_map;
ColumnPtr col_nullable;

View File

@ -189,7 +189,7 @@ void RleValuesReader::setValueBySteps(
res_values += *(step_iterator++);
visitValues(
col_data_steps.size() - 1,
static_cast<UInt32>(col_data_steps.size() - 1),
/* individual_visitor */ [&](Int32 val)
{
*res_values = val_getter(val);
@ -394,14 +394,14 @@ void ParquetRleLCReader<TColumnVector>::readBatch(
cursor,
num_values,
max_def_level,
/* individual_null_visitor */ [&](UInt32 nest_cursor) {
/* individual_null_visitor */ [&](size_t nest_cursor) {
column_data[nest_cursor] = 0;
has_null = true;
},
/* stepped_valid_visitor */ [&](UInt32 nest_cursor, const std::vector<UInt8> & valid_index_steps) {
/* stepped_valid_visitor */ [&](size_t nest_cursor, const std::vector<UInt8> & valid_index_steps) {
rle_data_reader->setValueBySteps(column_data + nest_cursor, valid_index_steps, val_getter);
},
/* repeated_visitor */ [&](bool is_valid, UInt32 nest_cursor, UInt32 count) {
/* repeated_visitor */ [&](bool is_valid, size_t nest_cursor, UInt32 count) {
if (is_valid)
{
rle_data_reader->setValues(column_data + nest_cursor, count, val_getter);
@ -461,10 +461,11 @@ void ParquetRleDictReader<ColumnString>::readBatch(
cursor,
num_values,
max_def_level,
/* individual_null_visitor */ [&](UInt32) {},
/* stepped_valid_visitor */ [&](UInt32, const std::vector<UInt8> & valid_index_steps) {
/* individual_null_visitor */ [&](size_t) {},
/* stepped_valid_visitor */ [&](size_t, const std::vector<UInt8> & valid_index_steps) {
value_cache.resize(valid_index_steps.size());
rle_data_reader->setValues(value_cache.data() + 1, valid_index_steps.size() - 1, val_getter);
rle_data_reader->setValues(
value_cache.data() + 1, static_cast<UInt32>(valid_index_steps.size() - 1), val_getter);
append_nulls(valid_index_steps[0]);
for (size_t i = 1; i < valid_index_steps.size(); i++)
@ -473,7 +474,7 @@ void ParquetRleDictReader<ColumnString>::readBatch(
append_nulls(valid_index_steps[i] - 1);
}
},
/* repeated_visitor */ [&](bool is_valid, UInt32, UInt32 count) {
/* repeated_visitor */ [&](bool is_valid, size_t, UInt32 count) {
if (is_valid)
{
value_cache.resize(count);
@ -504,13 +505,13 @@ void ParquetRleDictReader<TColumnVector>::readBatch(
cursor,
num_values,
max_def_level,
/* individual_null_visitor */ [&](UInt32 nest_cursor) {
/* individual_null_visitor */ [&](size_t nest_cursor) {
null_map.setNull(nest_cursor);
},
/* stepped_valid_visitor */ [&](UInt32 nest_cursor, const std::vector<UInt8> & valid_index_steps) {
/* stepped_valid_visitor */ [&](size_t nest_cursor, const std::vector<UInt8> & valid_index_steps) {
rle_data_reader->setValueBySteps(column_data + nest_cursor, valid_index_steps, val_getter);
},
/* repeated_visitor */ [&](bool is_valid, UInt32 nest_cursor, UInt32 count) {
/* repeated_visitor */ [&](bool is_valid, size_t nest_cursor, UInt32 count) {
if (is_valid)
{
rle_data_reader->setValues(column_data + nest_cursor, count, val_getter);

View File

@ -3,7 +3,6 @@
#include <functional>
#include <concepts>
#include <base/logger_useful.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
@ -25,7 +24,7 @@ namespace ErrorCodes
class RleValuesReader
{
public:
RleValuesReader(std::unique_ptr<arrow::BitUtil::BitReader> bit_reader_, Int32 bit_width_)
RleValuesReader(std::unique_ptr<arrow::bit_util::BitReader> bit_reader_, Int32 bit_width_)
: bit_reader(std::move(bit_reader_)), bit_width(bit_width_) {}
/**
@ -45,7 +44,7 @@ public:
* @brief Visit num_values elements.
* For RLE encoding, all values within the same group are equal, so they can be visited repeatedly.
* For BitPacked encoding, the values may differ from each other, so they must be visited individually.
*
*
* @tparam IndividualVisitor A callback with signature: void(Int32 val)
* @tparam RepeatedVisitor A callback with signature: void(UInt32 count, Int32 val)
*/
@ -55,10 +54,10 @@ public:
/**
* @brief Visit num_values elements by parsed nullability.
* If the parsed value is same as max_def_level, then it is processed as null value.
*
*
* @tparam IndividualVisitor A callback with signature: void(size_t cursor)
* @tparam RepeatedVisitor A callback with signature: void(size_t cursor, UInt32 count)
*
*
* Because the null map is already processed, the callbacks only need to process the valid data.
*/
template <typename IndividualVisitor, typename RepeatedVisitor>
@ -74,18 +73,18 @@ public:
* @brief Visit num_values elements by parsed nullability.
* It may be inefficient to process the valid data individually like in visitNullableValues,
* so a valid_index_steps index array is generated first, in order to process valid data continuously.
*
*
* @tparam IndividualNullVisitor A callback with signature: void(size_t cursor), used to process null value
* @tparam SteppedValidVisitor A callback with signature:
* void(size_t cursor, const std::vector<UInt8> & valid_index_steps)
* for n valid elements with null value interleaved in a BitPacked group,
* i-th item in valid_index_steps describes how many elements in column there are after (i-1)-th valid element.
*
*
* take following BitPacked group with 2 valid elements for example:
* null valid null null valid null
* then the valid_index_steps has values [1, 3, 2].
* Please note that the sum of valid_index_steps is the same as the number of elements in this group.
*
*
* @tparam RepeatedVisitor A callback with signature: void(bool is_valid, size_t cursor, UInt32 count)
*/
template <typename IndividualNullVisitor, typename SteppedValidVisitor, typename RepeatedVisitor>
@ -99,7 +98,7 @@ public:
/**
* @brief Set the Values to column_data directly
*
*
* @tparam TValue The type of column data.
* @tparam ValueGetter A callback with signature: TValue(Int32 val)
*/
@ -118,7 +117,7 @@ public:
ValueGetter && val_getter);
private:
std::unique_ptr<arrow::BitUtil::BitReader> bit_reader;
std::unique_ptr<arrow::bit_util::BitReader> bit_reader;
std::vector<Int32> cur_packed_bit_values;
std::vector<UInt8> valid_index_steps;
@ -203,7 +202,7 @@ private:
/**
* Read data according to the ColumnLowCardinality format.
*
*
* Only index and null column are processed in this class.
* All null values are mapped to the first index in the dictionary,
* so each resulting index value is incremented by one.
@ -232,7 +231,7 @@ private:
/**
* The definition level is RLE or BitPacked encoded,
* and the index of dictionary is also RLE or BitPacked encoded.
*
*
* while the result is not parsed as a low cardinality column,
* instead, a normal column is generated.
*/

View File

@ -7,6 +7,7 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnUnique.h>
#include <Columns/ColumnsNumber.h>
#include <Common/logger_useful.h>
#include <Core/ColumnWithTypeAndName.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeLowCardinality.h>
@ -58,7 +59,7 @@ void visitColStrIndexType(size_t data_size, TypeVisitor && visitor)
}
}
void reserveColumnStrRows(MutableColumnPtr & col, UInt32 rows_num)
void reserveColumnStrRows(MutableColumnPtr & col, UInt64 rows_num)
{
col->reserve(rows_num);
@ -212,7 +213,7 @@ ParquetLeafColReader<TColumn>::ParquetLeafColReader(
}
template <typename TColumn>
ColumnWithTypeAndName ParquetLeafColReader<TColumn>::readBatch(UInt32 rows_num, const String & name)
ColumnWithTypeAndName ParquetLeafColReader<TColumn>::readBatch(UInt64 rows_num, const String & name)
{
reading_rows_num = rows_num;
auto readPageIfEmpty = [&]() {
@ -228,7 +229,7 @@ ColumnWithTypeAndName ParquetLeafColReader<TColumn>::readBatch(UInt32 rows_num,
// if dictionary page encountered, another page should be read
readPageIfEmpty();
auto read_values = std::min(rows_num, cur_page_values);
auto read_values = static_cast<UInt32>(std::min(rows_num, static_cast<UInt64>(cur_page_values)));
data_values_reader->readBatch(column, *null_map, read_values);
cur_page_values -= read_values;
@ -239,7 +240,7 @@ ColumnWithTypeAndName ParquetLeafColReader<TColumn>::readBatch(UInt32 rows_num,
}
template <>
void ParquetLeafColReader<ColumnString>::resetColumn(UInt32 rows_num)
void ParquetLeafColReader<ColumnString>::resetColumn(UInt64 rows_num)
{
if (reading_low_cardinality)
{
@ -261,7 +262,7 @@ void ParquetLeafColReader<ColumnString>::resetColumn(UInt32 rows_num)
}
template <typename TColumn>
void ParquetLeafColReader<TColumn>::resetColumn(UInt32 rows_num)
void ParquetLeafColReader<TColumn>::resetColumn(UInt64 rows_num)
{
assert(!reading_low_cardinality);
@ -403,9 +404,9 @@ void ParquetLeafColReader<TColumn>::readPageV1(const parquet::DataPageV1 & page)
assert(col_descriptor.max_definition_level() >= 0);
std::unique_ptr<RleValuesReader> def_level_reader;
if (col_descriptor.max_definition_level() > 0) {
auto bit_width = arrow::BitUtil::Log2(col_descriptor.max_definition_level() + 1);
auto bit_width = arrow::bit_util::Log2(col_descriptor.max_definition_level() + 1);
auto num_bytes = ::arrow::util::SafeLoadAs<int32_t>(buffer);
auto bit_reader = std::make_unique<arrow::BitUtil::BitReader>(buffer + 4, num_bytes);
auto bit_reader = std::make_unique<arrow::bit_util::BitReader>(buffer + 4, num_bytes);
num_bytes += 4;
buffer += num_bytes;
max_size -= num_bytes;
@ -447,7 +448,7 @@ void ParquetLeafColReader<TColumn>::readPageV1(const parquet::DataPageV1 & page)
// refer to: DictDecoderImpl::SetData in encoding.cc
auto bit_width = *buffer;
auto bit_reader = std::make_unique<arrow::BitUtil::BitReader>(++buffer, --max_size);
auto bit_reader = std::make_unique<arrow::bit_util::BitReader>(++buffer, --max_size);
data_values_reader = createDictReader(
std::move(def_level_reader), std::make_unique<RleValuesReader>(std::move(bit_reader), bit_width));
break;

View File

@ -1,6 +1,5 @@
#pragma once
#include <base/logger_useful.h>
#include <Columns/IColumn.h>
#include <DataTypes/Serializations/ISerialization.h>
@ -28,7 +27,7 @@ public:
std::unique_ptr<parquet::ColumnChunkMetaData> meta_,
std::unique_ptr<parquet::PageReader> reader_);
ColumnWithTypeAndName readBatch(UInt32 rows_num, const String & name) override;
ColumnWithTypeAndName readBatch(UInt64 rows_num, const String & name) override;
private:
const parquet::ColumnDescriptor & col_descriptor;
@ -42,13 +41,13 @@ private:
ColumnPtr dictionary;
UInt64 reading_rows_num = 0;
UInt32 cur_page_values = 0;
UInt32 reading_rows_num = 0;
bool reading_low_cardinality = false;
Poco::Logger * log;
void resetColumn(UInt32 rows_num);
void resetColumn(UInt64 rows_num);
void degradeDictionary();
ColumnWithTypeAndName releaseColumn(const String & name);

View File

@ -5,6 +5,7 @@
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Common/logger_useful.h>
#include <Common/Stopwatch.h>
#include <Core/Types.h>
#include <DataTypes/DataTypeDate32.h>
@ -30,21 +31,14 @@ namespace ErrorCodes
extern const int PARQUET_EXCEPTION;
}
// #define THROW_ARROW_NOT_OK(status) \
// do \
// { \
// if (::arrow::Status _s = (status); !_s.ok()) \
// throw Exception(_s.ToString(), ErrorCodes::BAD_ARGUMENTS); \
// } while (false)
/// Evaluates the expression `s`; a ::parquet::ParquetException escaping it is caught and
/// rethrown as Exception with ErrorCodes::PARQUET_EXCEPTION, carrying the original what() text.
/// The do { ... } while (false) wrapper makes the macro usable as a single statement.
/// NOTE(review): the message literal reads "Excepted when reading parquet" — probably meant
/// "Exception"; confirm before changing, since it is user-visible text.
#define THROW_PARQUET_EXCEPTION(s) \
do \
{ \
try { (s); } \
catch (const ::parquet::ParquetException & e) \
{ \
throw Exception(e.what(), ErrorCodes::PARQUET_EXCEPTION); \
auto msg = PreformattedMessage::create("Excepted when reading parquet: {}", e.what()); \
throw Exception(std::move(msg), ErrorCodes::PARQUET_EXCEPTION); \
} \
} while (false)
@ -172,13 +166,14 @@ ParquetRecordReader::ParquetRecordReader(
auto idx = file_reader->metadata()->schema()->ColumnIndex(col_with_name.name);
if (idx < 0)
{
throw Exception("can not find column with name: " + col_with_name.name, ErrorCodes::BAD_ARGUMENTS);
auto msg = PreformattedMessage::create("can not find column with name: {}", col_with_name.name);
throw Exception(std::move(msg), ErrorCodes::BAD_ARGUMENTS);
}
parquet_col_indice.push_back(idx);
}
}
Chunk ParquetRecordReader::readChunk(UInt32 num_rows)
Chunk ParquetRecordReader::readChunk(size_t num_rows)
{
if (!left_rows)
{
@ -190,7 +185,7 @@ Chunk ParquetRecordReader::readChunk(UInt32 num_rows)
}
Columns columns(header.columns());
auto num_rows_read = std::min(static_cast<UInt64>(num_rows), cur_row_group_left_rows);
auto num_rows_read = std::min(num_rows, cur_row_group_left_rows);
for (size_t i = 0; i < header.columns(); i++)
{
columns[i] = castColumn(

View File

@ -1,9 +1,8 @@
#pragma once
#include <base/logger_useful.h>
#include <Core/Block.h>
#include <Processors/Chunk.h>
#include <Storages/Parquet/ParquetColumnReader.h>
#include <Processors/Formats/Impl/Parquet/ParquetColumnReader.h>
#include <arrow/io/interfaces.h>
#include <parquet/file_reader.h>
@ -22,8 +21,8 @@ public:
std::shared_ptr<::arrow::io::RandomAccessFile> file,
const parquet::ReaderProperties& properties);
Chunk readChunk(UInt32 num_rows);
Chunk readChunk(size_t num_rows);
// follow the scale generated by spark
static constexpr UInt8 default_datetime64_scale = 9;