mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-14 18:32:29 +00:00
Fix
This commit is contained in:
parent
5057a31bac
commit
a7f389a760
@ -154,13 +154,12 @@ struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
|
||||
*/
|
||||
void processManifestList(Metadata & metadata, const Configuration & configuration, ContextPtr context)
|
||||
{
|
||||
static constexpr auto manifest_path = "manifest_path";
|
||||
|
||||
auto buf = MetadataReadHelper::createReadBuffer(metadata.manifest_list, context, configuration);
|
||||
auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buf));
|
||||
|
||||
auto data_type = AvroSchemaReader::avroNodeToDataType(file_reader->dataSchema().root()->leafAt(0));
|
||||
auto columns = parseAvro(*file_reader, data_type, manifest_path, getFormatSettings(context));
|
||||
Block header{{data_type->createColumn(), data_type, "manifest_path"}};
|
||||
auto columns = parseAvro(*file_reader, header, getFormatSettings(context));
|
||||
auto & col = columns.at(0);
|
||||
|
||||
if (col->getDataType() != TypeIndex::String)
|
||||
@ -207,9 +206,7 @@ struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
|
||||
*/
|
||||
Strings getFilesForRead(const Metadata & metadata, const Configuration & configuration, ContextPtr context)
|
||||
{
|
||||
static constexpr auto manifest_path = "data_file";
|
||||
|
||||
Strings keys;
|
||||
NameSet keys;
|
||||
for (const auto & manifest_file : metadata.manifest_files)
|
||||
{
|
||||
auto buffer = MetadataReadHelper::createReadBuffer(manifest_file, context, configuration);
|
||||
@ -226,59 +223,100 @@ struct IcebergMetadataParser<Configuration, MetadataReadHelper>::Impl
|
||||
root_node->leaves(), expected_min_num);
|
||||
}
|
||||
|
||||
avro::NodePtr status_node = root_node->leafAt(0);
|
||||
if (status_node->type() != avro::Type::AVRO_INT)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::ILLEGAL_COLUMN,
|
||||
"The parsed column from Avro file of `status` field should be Int type, got {}",
|
||||
magic_enum::enum_name(status_node->type()));
|
||||
}
|
||||
|
||||
avro::NodePtr data_file_node = root_node->leafAt(static_cast<int>(leaves_num) - 1);
|
||||
if (data_file_node->type() != avro::Type::AVRO_RECORD)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::ILLEGAL_COLUMN,
|
||||
"The parsed column from Avro file of `data_file` field should be Tuple type, got {}",
|
||||
data_file_node->type());
|
||||
}
|
||||
auto data_type = AvroSchemaReader::avroNodeToDataType(data_file_node);
|
||||
const auto columns = parseAvro(*file_reader, data_type, manifest_path, getFormatSettings(context));
|
||||
const auto col_tuple = typeid_cast<ColumnTuple *>(columns.at(0).get());
|
||||
|
||||
ColumnPtr col_str;
|
||||
if (metadata.format_version == 1)
|
||||
col_str = col_tuple->getColumnPtr(0);
|
||||
else
|
||||
col_str = col_tuple->getColumnPtr(1);
|
||||
|
||||
if (col_str->getDataType() != TypeIndex::String)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::ILLEGAL_COLUMN,
|
||||
"The parsed column from Avro file of `file_path` field should be String type, got {}",
|
||||
col_str->getFamilyName());
|
||||
magic_enum::enum_name(data_file_node->type()));
|
||||
}
|
||||
|
||||
const auto * str_col = assert_cast<const ColumnString *>(col_str.get());
|
||||
for (size_t i = 0; i < str_col->size(); ++i)
|
||||
auto status_col_data_type = AvroSchemaReader::avroNodeToDataType(status_node);
|
||||
auto data_col_data_type = AvroSchemaReader::avroNodeToDataType(data_file_node);
|
||||
Block header{
|
||||
{status_col_data_type->createColumn(), status_col_data_type, "status"},
|
||||
{data_col_data_type->createColumn(), data_col_data_type, "data_file"}};
|
||||
|
||||
const auto columns = parseAvro(*file_reader, header, getFormatSettings(context));
|
||||
if (columns.size() != 2)
|
||||
{
|
||||
const auto data_path = std::string(str_col->getDataAt(i).toView());
|
||||
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
|
||||
"Unexpected number of columns. Expected 2, got {}", columns.size());
|
||||
}
|
||||
|
||||
if (columns.at(0)->getDataType() != TypeIndex::Int32)
|
||||
{
|
||||
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
|
||||
"The parsed column from Avro file of `status` field should be Int32 type, got {}",
|
||||
columns.at(0)->getFamilyName());
|
||||
}
|
||||
if (columns.at(1)->getDataType() != TypeIndex::Tuple)
|
||||
{
|
||||
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
|
||||
"The parsed column from Avro file of `file_path` field should be Tuple type, got {}",
|
||||
columns.at(1)->getFamilyName());
|
||||
}
|
||||
|
||||
const auto status_int_column = assert_cast<ColumnInt32 *>(columns.at(0).get());
|
||||
const auto data_file_tuple_column = assert_cast<ColumnTuple *>(columns.at(1).get());
|
||||
|
||||
if (status_int_column->size() != data_file_tuple_column->size())
|
||||
{
|
||||
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
|
||||
"The parsed column from Avro file of `file_path` and `status` have different rows number: {} and {}",
|
||||
status_int_column->size(), data_file_tuple_column->size());
|
||||
}
|
||||
|
||||
const auto * data_file_name_column = metadata.format_version == 1
|
||||
? data_file_tuple_column->getColumnPtr(0).get()
|
||||
: data_file_tuple_column->getColumnPtr(1).get();
|
||||
|
||||
if (data_file_name_column->getDataType() != TypeIndex::String)
|
||||
{
|
||||
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
|
||||
"The parsed column from Avro file of `file_path` field should be String type, got {}",
|
||||
data_file_name_column->getFamilyName());
|
||||
}
|
||||
auto file_name_str_column = assert_cast<const ColumnString *>(data_file_name_column);
|
||||
|
||||
for (size_t i = 0; i < status_int_column->size(); ++i)
|
||||
{
|
||||
const auto status = status_int_column->getInt(i);
|
||||
const auto data_path = std::string(file_name_str_column->getDataAt(i).toView());
|
||||
const auto pos = data_path.find(configuration.url.key);
|
||||
const auto file_path = data_path.substr(pos);
|
||||
if (pos == std::string::npos)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected to find {} in data path: {}", configuration.url.key, data_path);
|
||||
keys.emplace_back(data_path.substr(pos));
|
||||
|
||||
if (status == 2)
|
||||
keys.erase(file_path);
|
||||
else
|
||||
keys.insert(file_path);
|
||||
}
|
||||
}
|
||||
|
||||
return keys;
|
||||
return std::vector<std::string>(keys.begin(), keys.end());
|
||||
}
|
||||
|
||||
MutableColumns parseAvro(
|
||||
avro::DataFileReaderBase & file_reader,
|
||||
const DataTypePtr & data_type,
|
||||
const String & field_name,
|
||||
const Block & header,
|
||||
const FormatSettings & settings)
|
||||
{
|
||||
auto deserializer = std::make_unique<AvroDeserializer>(
|
||||
Block{{data_type->createColumn(), data_type, field_name}}, file_reader.dataSchema(), true, true, settings);
|
||||
auto deserializer = std::make_unique<AvroDeserializer>(header, file_reader.dataSchema(), true, true, settings);
|
||||
MutableColumns columns = header.cloneEmptyColumns();
|
||||
|
||||
file_reader.init();
|
||||
MutableColumns columns;
|
||||
columns.emplace_back(data_type->createColumn());
|
||||
|
||||
RowReadExtension ext;
|
||||
while (file_reader.hasMore())
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user