This commit is contained in:
kssenii 2023-06-14 11:31:28 +02:00
parent e03cd725b0
commit 2a3ef3941e

View File

@ -184,11 +184,17 @@ struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
* Manifest file has the following format: '/iceberg_data/db/table_name/metadata/c87bfec7-d36c-4075-ad04-600b6b0f2020-m0.avro' * Manifest file has the following format: '/iceberg_data/db/table_name/metadata/c87bfec7-d36c-4075-ad04-600b6b0f2020-m0.avro'
* *
* `manifest file` is different in format version V1 and V2 and has the following contents: * `manifest file` is different in format version V1 and V2 and has the following contents:
* Format version V1: * v1 v2
* status req req
* snapshot_id req opt
* sequence_number opt
* file_sequence_number opt
* data_file req req
* Example format version V1:
* status snapshot_id data_file * status snapshot_id data_file
* 1 2819310504515118887 ('/iceberg_data/db/table_name/data/00000-1-3edca534-15a0-4f74-8a28-4733e0bf1270-00001.parquet','PARQUET',(),100,1070,67108864,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],0) * 1 2819310504515118887 ('/iceberg_data/db/table_name/data/00000-1-3edca534-15a0-4f74-8a28-4733e0bf1270-00001.parquet','PARQUET',(),100,1070,67108864,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],0)
* *
* Format version V2: * Example format version V2:
* status snapshot_id sequence_number file_sequence_number data_file * status snapshot_id sequence_number file_sequence_number data_file
* 1 5887006101709926452 (0,'/iceberg_data/db/table_name/data/00000-1-c8045c90-8799-4eac-b957-79a0484e223c-00001.parquet','PARQUET',(),100,1070,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],[],0) * 1 5887006101709926452 (0,'/iceberg_data/db/table_name/data/00000-1-c8045c90-8799-4eac-b957-79a0484e223c-00001.parquet','PARQUET',(),100,1070,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],[],0)
* *
@ -209,22 +215,25 @@ struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
auto buffer = MetadataReadHelper::createReadBuffer(manifest_file, context, configuration); auto buffer = MetadataReadHelper::createReadBuffer(manifest_file, context, configuration);
auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer)); auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));
avro::NodePtr node; avro::NodePtr root_node = file_reader->dataSchema().root();
if (metadata.format_version == 1) int leaves_num = static_cast<int>(root_node->leaves());
node = file_reader->dataSchema().root()->leafAt(2); if (leaves_num < 2)
else if (metadata.format_version == 2) {
node = file_reader->dataSchema().root()->leafAt(4); throw Exception(
else ErrorCodes::BAD_ARGUMENTS,
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected format version: {}", metadata.format_version); "Unexpected number of columns {}. Expected at least 2",
root_node->leaves());
}
if (node->type() != avro::Type::AVRO_RECORD) avro::NodePtr data_file_node = root_node->leafAt(leaves_num - 1);
if (data_file_node->type() != avro::Type::AVRO_RECORD)
{ {
throw Exception( throw Exception(
ErrorCodes::ILLEGAL_COLUMN, ErrorCodes::ILLEGAL_COLUMN,
"The parsed column from Avro file of `data_file` field should be Tuple type, got {}", "The parsed column from Avro file of `data_file` field should be Tuple type, got {}",
node->type()); data_file_node->type());
} }
auto data_type = AvroSchemaReader::avroNodeToDataType(node); auto data_type = AvroSchemaReader::avroNodeToDataType(data_file_node);
const auto columns = parseAvro(*file_reader, data_type, manifest_path, getFormatSettings(context)); const auto columns = parseAvro(*file_reader, data_type, manifest_path, getFormatSettings(context));
const auto col_tuple = typeid_cast<ColumnTuple *>(columns.at(0).get()); const auto col_tuple = typeid_cast<ColumnTuple *>(columns.at(0).get());