This commit is contained in:
avogar 2023-10-17 12:12:10 +00:00
parent 527e96db3f
commit 032d82e004
2 changed files with 62 additions and 54 deletions

View File

@ -2,34 +2,34 @@
#if USE_AWS_S3 && USE_AVRO
#include <Common/logger_useful.h>
# include <Common/logger_useful.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnTuple.h>
#include <Columns/IColumn.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <Formats/FormatFactory.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Processors/Formats/Impl/AvroRowInputFormat.h>
#include <Storages/DataLakes/Iceberg/IcebergMetadata.h>
#include <Storages/DataLakes/S3MetadataReader.h>
#include <Storages/StorageS3.h>
# include <Columns/ColumnString.h>
# include <Columns/ColumnTuple.h>
# include <Columns/IColumn.h>
# include <DataTypes/DataTypeArray.h>
# include <DataTypes/DataTypeDate.h>
# include <DataTypes/DataTypeDateTime64.h>
# include <DataTypes/DataTypeFactory.h>
# include <DataTypes/DataTypeFixedString.h>
# include <DataTypes/DataTypeMap.h>
# include <DataTypes/DataTypeNullable.h>
# include <DataTypes/DataTypeString.h>
# include <DataTypes/DataTypeTuple.h>
# include <DataTypes/DataTypeUUID.h>
# include <DataTypes/DataTypesDecimal.h>
# include <DataTypes/DataTypesNumber.h>
# include <Formats/FormatFactory.h>
# include <IO/ReadBufferFromString.h>
# include <IO/ReadHelpers.h>
# include <Processors/Formats/Impl/AvroRowInputFormat.h>
# include <Storages/DataLakes/Iceberg/IcebergMetadata.h>
# include <Storages/DataLakes/S3MetadataReader.h>
# include <Storages/StorageS3.h>
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
# include <Poco/JSON/Array.h>
# include <Poco/JSON/Object.h>
# include <Poco/JSON/Parser.h>
namespace DB
@ -37,10 +37,10 @@ namespace DB
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
extern const int ILLEGAL_COLUMN;
extern const int BAD_ARGUMENTS;
extern const int UNSUPPORTED_METHOD;
extern const int FILE_DOESNT_EXIST;
extern const int ILLEGAL_COLUMN;
extern const int BAD_ARGUMENTS;
extern const int UNSUPPORTED_METHOD;
}
IcebergMetadata::IcebergMetadata(
@ -236,7 +236,6 @@ DataTypePtr getFieldType(const Poco::Dynamic::Var & type, bool required)
return getComplexTypeFromObject(type.extract<Poco::JSON::Object::Ptr>());
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected 'type' field: {}", type.toString());
}
std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version)
@ -253,7 +252,10 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
current_schema_id = metadata_object->getValue<int>("current-schema-id");
auto schemas = metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>();
if (schemas->size() != 1)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not supported");
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
"supported");
/// Now we sure that there is only one schema.
schema = schemas->getObject(0);
@ -267,7 +269,10 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
/// Field "schemas" is optional for version 1, but after version 2 was introduced,
/// in most cases this field is added for new tables in version 1 as well.
if (metadata_object->has("schemas") && metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not supported");
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
"supported");
}
NamesAndTypesList names_and_types;
@ -283,10 +288,7 @@ std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::P
return {std::move(names_and_types), current_schema_id};
}
MutableColumns parseAvro(
avro::DataFileReaderBase & file_reader,
const Block & header,
const FormatSettings & settings)
MutableColumns parseAvro(avro::DataFileReaderBase & file_reader, const Block & header, const FormatSettings & settings)
{
auto deserializer = std::make_unique<AvroDeserializer>(header, file_reader.dataSchema(), true, true, settings);
MutableColumns columns = header.cloneEmptyColumns();
@ -311,19 +313,19 @@ std::pair<Int32, String> getMetadataFileAndVersion(const StorageS3::Configuratio
if (metadata_files.empty())
{
throw Exception(
ErrorCodes::FILE_DOESNT_EXIST,
"The metadata file for Iceberg table with path {} doesn't exist",
configuration.url.key);
ErrorCodes::FILE_DOESNT_EXIST, "The metadata file for Iceberg table with path {} doesn't exist", configuration.url.key);
}
std::vector<std::pair<UInt32, String>> metadata_files_with_versions;
metadata_files_with_versions.reserve(metadata_files.size());
for (const auto & file : metadata_files)
for (const auto & path : metadata_files)
{
String version_str(file.begin() + 1, file.begin() + file.find_first_of('.'));
String file_name(path.begin() + path.find_last_of('/') + 1, path.end());
String version_str(file_name.begin() + 1, file_name.begin() + file_name.find_first_of('.'));
if (!std::all_of(version_str.begin(), version_str.end(), isdigit))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file);
metadata_files_with_versions.emplace_back(std::stoi(version_str), file);
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file_name);
metadata_files_with_versions.emplace_back(std::stoi(version_str), path);
}
/// Get the latest version of metadata file: v<V>.metadata.json
@ -362,7 +364,8 @@ std::unique_ptr<IcebergMetadata> parseIcebergMetadata(const StorageS3::Configura
}
}
return std::make_unique<IcebergMetadata>(configuration, context_, metadata_version, format_version, manifest_list_file, schema_id, schema);
return std::make_unique<IcebergMetadata>(
configuration, context_, metadata_version, format_version, manifest_list_file, schema_id, schema);
}
/**
@ -402,7 +405,8 @@ Strings IcebergMetadata::getDataFiles()
LOG_TEST(log, "Collect manifest files from manifest list {}", manifest_list_file);
auto manifest_list_buf = S3DataLakeMetadataReadHelper::createReadBuffer(manifest_list_file, getContext(), configuration);
auto manifest_list_file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*manifest_list_buf));
auto manifest_list_file_reader
= std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*manifest_list_buf));
auto data_type = AvroSchemaReader::avroNodeToDataType(manifest_list_file_reader->dataSchema().root()->leafAt(0));
Block header{{data_type->createColumn(), data_type, "manifest_path"}};
@ -422,7 +426,7 @@ Strings IcebergMetadata::getDataFiles()
{
const auto file_path = col_str->getDataAt(i).toView();
const auto filename = std::filesystem::path(file_path).filename();
manifest_files.emplace_back(std::filesystem::path(configuration.url.key) / "metadata_path" / filename);
manifest_files.emplace_back(std::filesystem::path(configuration.url.key) / "metadata" / filename);
}
NameSet files;
@ -443,7 +447,10 @@ Strings IcebergMetadata::getDataFiles()
Poco::Dynamic::Var json = parser.parse(schema_json_string);
Poco::JSON::Object::Ptr schema_object = json.extract<Poco::JSON::Object::Ptr>();
if (schema_object->getValue<int>("schema-id") != current_schema_id)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not supported");
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD,
"Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not "
"supported");
avro::NodePtr root_node = manifest_file_reader->dataSchema().root();
size_t leaves_num = root_node->leaves();
@ -451,9 +458,7 @@ Strings IcebergMetadata::getDataFiles()
if (leaves_num < expected_min_num)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Unexpected number of columns {}. Expected at least {}",
root_node->leaves(), expected_min_num);
ErrorCodes::BAD_ARGUMENTS, "Unexpected number of columns {}. Expected at least {}", root_node->leaves(), expected_min_num);
}
avro::NodePtr status_node = root_node->leafAt(0);
@ -500,7 +505,7 @@ Strings IcebergMetadata::getDataFiles()
}
const auto * status_int_column = assert_cast<ColumnInt32 *>(columns.at(0).get());
const auto & data_file_tuple_type = assert_cast<const DataTypeTuple &>(*data_type.get());
const auto & data_file_tuple_type = assert_cast<const DataTypeTuple &>(*data_col_data_type.get());
const auto * data_file_tuple_column = assert_cast<ColumnTuple *>(columns.at(1).get());
if (status_int_column->size() != data_file_tuple_column->size())
@ -546,7 +551,8 @@ Strings IcebergMetadata::getDataFiles()
{
Int32 content_type = content_int_column->getElement(i);
if (DataFileContent(content_type) != DataFileContent::DATA)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: positional and equality deletes are not supported");
throw Exception(
ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: positional and equality deletes are not supported");
}
const auto status = status_int_column->getInt(i);
@ -563,6 +569,7 @@ Strings IcebergMetadata::getDataFiles()
}
else
{
LOG_TEST(log, "Processing data file for path: {}", file_path);
files.insert(file_path);
}
}

View File

@ -18,6 +18,7 @@ StoragePtr StorageIceberg::create(
configuration.update(context_);
auto metadata = parseIcebergMetadata(configuration, context_);
auto schema_from_metadata = metadata->getTableSchema();
configuration.keys = metadata->getDataFiles();
return std::make_shared<StorageIceberg>(std::move(metadata), configuration, context_, table_id_, columns_.empty() ? ColumnsDescription(schema_from_metadata) : columns_, constraints_, comment, format_settings_);
}
@ -52,7 +53,7 @@ void StorageIceberg::updateConfigurationImpl(ContextPtr local_context)
const bool updated = base_configuration.update(local_context);
auto new_metadata = parseIcebergMetadata(base_configuration, local_context);
/// Check if nothing was changed.
if (!updated && new_metadata->getVersion() == current_metadata->getVersion())
if (updated && new_metadata->getVersion() == current_metadata->getVersion())
return;
if (new_metadata->getVersion() != current_metadata->getVersion())