fix reader type, update comment

Change-Id: Iefec91bca223eedaabe302b7891808c6d94eed9d
copperybean 2024-05-20 16:28:21 +08:00
parent 5d848aa32f
commit ad5f6f27df
2 changed files with 22 additions and 8 deletions


@@ -80,6 +80,7 @@ public:
      * the second line shows the corresponding validation state,
      * then the valid_index_steps has values [1, 3, 2].
      * Please note that the the sum of valid_index_steps is same as elements number in this group.
+     * TODO the definition of valid_index_steps should be updated when supporting nested types
      *
      * @tparam RepeatedVisitor A callback with signature: void(bool is_valid, UInt32 cursor, UInt32 count)
      */


@@ -27,6 +27,7 @@ namespace DB

 namespace ErrorCodes
 {
+    extern const int NOT_IMPLEMENTED;
     extern const int PARQUET_EXCEPTION;
 }
@@ -225,7 +226,7 @@ std::unique_ptr<ParquetColumnReader> ColReaderFactory::fromInt32INT(const parque
 {
     switch (int_type.bit_width())
     {
-        case sizeof(Int32):
+        case 32:
         {
             if (int_type.is_signed())
                 return makeLeafReader<DataTypeInt32>();
@@ -241,7 +242,7 @@ std::unique_ptr<ParquetColumnReader> ColReaderFactory::fromInt64INT(const parque
 {
     switch (int_type.bit_width())
     {
-        case sizeof(Int64):
+        case 64:
         {
             if (int_type.is_signed())
                 return makeLeafReader<DataTypeInt64>();
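
Note on the two case-label fixes in fromInt32INT and fromInt64INT: parquet::IntLogicalType::bit_width() reports the integer width in bits (8, 16, 32 or 64), while sizeof(Int32) and sizeof(Int64) are byte counts (4 and 8). The old labels therefore compared bits against bytes: case sizeof(Int32) (4) could never match a 32-bit width, and case sizeof(Int64) (8) would match an 8-bit width rather than 64. A minimal standalone sketch of the mismatch (plain C++, not part of this patch; the Int32/Int64 aliases below are assumed stand-ins for ClickHouse's fixed-width types):

#include <cstdint>
#include <iostream>

// Assumed stand-ins for ClickHouse's fixed-width integer aliases.
using Int32 = int32_t;
using Int64 = int64_t;

int main()
{
    // For an INT(32, ...) / INT(64, ...) logical type, bit_width() reports 32 or 64 ...
    const int bit_width_32 = 32;
    const int bit_width_64 = 64;

    // ... while sizeof() counts bytes, so the old case labels evaluated to 4 and 8.
    std::cout << "sizeof(Int32) = " << sizeof(Int32) << "\n";  // 4, never equal to a bit width of 32
    std::cout << "sizeof(Int64) = " << sizeof(Int64) << "\n";  // 8, would match a bit width of 8, not 64
    std::cout << (bit_width_32 == static_cast<int>(sizeof(Int32))) << " vs " << (bit_width_32 == 32) << "\n";  // prints "0 vs 1"
    std::cout << (bit_width_64 == static_cast<int>(sizeof(Int64))) << " vs " << (bit_width_64 == 64) << "\n";  // prints "0 vs 1"
    return 0;
}
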
@@ -312,16 +313,28 @@ ParquetRecordReader::ParquetRecordReader(
 {
     log = &Poco::Logger::get("ParquetRecordReader");

+    std::unordered_map<String, parquet::schema::NodePtr> parquet_columns;
+    auto root = file_reader->metadata()->schema()->group_node();
+    for (int i = 0; i < root->field_count(); ++i)
+    {
+        auto & node = root->field(i);
+        parquet_columns.emplace(node->name(), node);
+    }
+
     parquet_col_indice.reserve(header.columns());
     column_readers.reserve(header.columns());
     for (const auto & col_with_name : header)
     {
-        auto idx = file_reader->metadata()->schema()->ColumnIndex(col_with_name.name);
-        if (idx < 0)
-        {
-            auto msg = PreformattedMessage::create("can not find column with name: {}", col_with_name.name);
-            throw Exception(std::move(msg), ErrorCodes::PARQUET_EXCEPTION);
-        }
+        auto it = parquet_columns.find(col_with_name.name);
+        if (it == parquet_columns.end())
+            throw Exception(ErrorCodes::PARQUET_EXCEPTION, "no column with '{}' in parquet file", col_with_name.name);
+
+        auto node = it->second;
+        if (!node->is_primitive())
+            throw Exception(ErrorCodes::NOT_IMPLEMENTED, "arrays and maps are not implemented in native parquet reader");
+
+        auto idx = file_reader->metadata()->schema()->ColumnIndex(*node);
+        chassert(idx >= 0);
         parquet_col_indice.push_back(idx);
     }
     if (reader_properties.pre_buffer())
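
The constructor change above replaces the by-name SchemaDescriptor::ColumnIndex lookup with a map of top-level schema fields, so nested (non-primitive) columns are detected and rejected with NOT_IMPLEMENTED before the leaf column index is resolved from the node itself. A standalone sketch of the same lookup pattern against a hand-built schema, using the Arrow Parquet C++ schema API (the toy schema and field names are illustrative assumptions, not taken from this patch):

#include <iostream>
#include <string>
#include <unordered_map>

#include <parquet/schema.h>

int main()
{
    using namespace parquet;
    using namespace parquet::schema;

    // Toy schema: one plain INT64 leaf and one nested group column.
    NodeVector fields;
    fields.push_back(PrimitiveNode::Make("id", Repetition::REQUIRED, Type::INT64));
    fields.push_back(GroupNode::Make(
        "tags", Repetition::OPTIONAL,
        {PrimitiveNode::Make("element", Repetition::REPEATED, Type::BYTE_ARRAY)}));
    auto root = GroupNode::Make("schema", Repetition::REQUIRED, fields);

    SchemaDescriptor descr;
    descr.Init(root);

    // Same idea as the patch: index the top-level fields by name.
    std::unordered_map<std::string, NodePtr> by_name;
    auto group = std::static_pointer_cast<GroupNode>(root);
    for (int i = 0; i < group->field_count(); ++i)
        by_name.emplace(group->field(i)->name(), group->field(i));

    for (std::string name : {"id", "tags", "missing"})
    {
        auto it = by_name.find(name);
        if (it == by_name.end())
            std::cout << name << ": not found in file schema\n";
        else if (!it->second->is_primitive())
            std::cout << name << ": nested column, not supported\n";
        else
            std::cout << name << ": leaf column index " << descr.ColumnIndex(*it->second) << "\n";
    }
    return 0;
}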