Merge pull request #50974 from kssenii/iceberg-metadata-fix

Fix iceberg V2 optional metadata parsing
This commit is contained in:
Kseniia Sumarokova 2023-06-15 09:31:36 +02:00 committed by GitHub
commit c8619ee6e4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -184,11 +184,17 @@ struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
* Manifest file has the following format: '/iceberg_data/db/table_name/metadata/c87bfec7-d36c-4075-ad04-600b6b0f2020-m0.avro'
*
* `manifest file` is different in format version V1 and V2 and has the following contents:
* Format version V1:
* v1 v2
* status req req
* snapshot_id req opt
* sequence_number opt
* file_sequence_number opt
* data_file req req
* Example format version V1:
* statussnapshot_iddata_file
* 1 2819310504515118887 ('/iceberg_data/db/table_name/data/00000-1-3edca534-15a0-4f74-8a28-4733e0bf1270-00001.parquet','PARQUET',(),100,1070,67108864,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],0)
*
* Format version V2:
* Example format version V2:
* statussnapshot_idsequence_numberfile_sequence_numberdata_file
* 1 5887006101709926452 (0,'/iceberg_data/db/table_name/data/00000-1-c8045c90-8799-4eac-b957-79a0484e223c-00001.parquet','PARQUET',(),100,1070,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],[],0)
*
@ -209,22 +215,26 @@ struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
auto buffer = MetadataReadHelper::createReadBuffer(manifest_file, context, configuration);
auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));
avro::NodePtr node;
if (metadata.format_version == 1)
node = file_reader->dataSchema().root()->leafAt(2);
else if (metadata.format_version == 2)
node = file_reader->dataSchema().root()->leafAt(4);
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected format version: {}", metadata.format_version);
avro::NodePtr root_node = file_reader->dataSchema().root();
size_t leaves_num = root_node->leaves();
size_t expected_min_num = metadata.format_version == 1 ? 3 : 2;
if (leaves_num < expected_min_num)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Unexpected number of columns {}. Expected at least {}",
root_node->leaves(), expected_min_num);
}
if (node->type() != avro::Type::AVRO_RECORD)
avro::NodePtr data_file_node = root_node->leafAt(static_cast<int>(leaves_num) - 1);
if (data_file_node->type() != avro::Type::AVRO_RECORD)
{
throw Exception(
ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `data_file` field should be Tuple type, got {}",
node->type());
data_file_node->type());
}
auto data_type = AvroSchemaReader::avroNodeToDataType(node);
auto data_type = AvroSchemaReader::avroNodeToDataType(data_file_node);
const auto columns = parseAvro(*file_reader, data_type, manifest_path, getFormatSettings(context));
const auto col_tuple = typeid_cast<ColumnTuple *>(columns.at(0).get());