Fixes and improvements for Iceberg storage

parent 9629e255e7
commit f87938735a
@@ -104,6 +104,7 @@ if (TARGET ch_contrib::nats_io)
 endif()
 
 add_headers_and_sources(dbms Storages/DataLakes)
+add_headers_and_sources(dbms Storages/DataLakes/Iceberg)
 add_headers_and_sources(dbms Common/NamedCollections)
 
 if (TARGET ch_contrib::amqp_cpp)
@@ -204,7 +204,7 @@ public:
      * By default - the same as read.
      * Don't use for small reads.
      */
     [[nodiscard]] virtual size_t readBig(char * to, size_t n) { return read(to, n); }
 
     /** Do something to allow faster subsequent call to 'nextImpl' if possible.
      * It's used for asynchronous readers with double-buffering.
@@ -27,6 +27,12 @@ public:
         , base_configuration(configuration_)
         , log(&Poco::Logger::get(getName())) {} // NOLINT(clang-analyzer-optin.cplusplus.VirtualCall)
 
+    template <class ...Args>
+    static StoragePtr create(const Configuration & configuration_, ContextPtr context_, Args && ...args)
+    {
+        return std::make_shared<IStorageDataLake<Storage, Name, MetadataParser>>(configuration_, context_, std::forward<Args>(args)...);
+    }
+
     String getName() const override { return name; }
 
     static ColumnsDescription getTableStructureFromData(
@@ -109,8 +115,7 @@ static StoragePtr createDataLakeStorage(const StorageFactory::Arguments & args)
     if (configuration.format == "auto")
        configuration.format = "Parquet";
 
-    return std::make_shared<DataLake>(
-        configuration, args.getContext(), args.table_id, args.columns, args.constraints,
+    return DataLake::create(configuration, args.getContext(), args.table_id, args.columns, args.constraints,
         args.comment, getFormatSettings(args.getContext()));
 }
 
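Note on the change above: routing construction through a static create() lets each data-lake storage run preparatory work (for example, StorageIceberg::create below parses Iceberg metadata to infer the table schema) before the object is built, which a plain std::make_shared at the call site cannot do. A minimal standalone sketch of that pattern follows; GenericLake and IcebergLike are hypothetical names used only for illustration, not part of the commit.

#include <iostream>
#include <memory>
#include <string>
#include <utility>

/// Generic storage: create() simply forwards its arguments to the constructor.
struct GenericLake
{
    explicit GenericLake(std::string format_) : format(std::move(format_)) {}

    template <class... Args>
    static std::shared_ptr<GenericLake> create(Args &&... args)
    {
        return std::make_shared<GenericLake>(std::forward<Args>(args)...);
    }

    std::string format;
};

/// Iceberg-like storage: create() can do extra work (here: derive a schema)
/// before construction, which std::make_shared at the call site could not.
struct IcebergLike
{
    IcebergLike(std::string format_, std::string schema_)
        : format(std::move(format_)), schema(std::move(schema_)) {}

    static std::shared_ptr<IcebergLike> create(std::string format_)
    {
        std::string schema = "id Int64, data String"; /// stand-in for parsing table metadata
        return std::make_shared<IcebergLike>(std::move(format_), std::move(schema));
    }

    std::string format;
    std::string schema;
};

int main()
{
    auto generic = GenericLake::create("Parquet");
    auto iceberg = IcebergLike::create("Parquet");
    std::cout << generic->format << " / " << iceberg->schema << '\n';
}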
src/Storages/DataLakes/Iceberg/IcebergMetadata.cpp (new file, 580 lines)
@@ -0,0 +1,580 @@
#include "config.h"

#if USE_AWS_S3 && USE_AVRO

#include <Common/logger_useful.h>

#include <Columns/ColumnString.h>
#include <Columns/ColumnTuple.h>
#include <Columns/IColumn.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <Formats/FormatFactory.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Processors/Formats/Impl/AvroRowInputFormat.h>
#include <Storages/DataLakes/Iceberg/IcebergMetadata.h>
#include <Storages/DataLakes/S3MetadataReader.h>
#include <Storages/StorageS3.h>

#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int FILE_DOESNT_EXIST;
    extern const int ILLEGAL_COLUMN;
    extern const int BAD_ARGUMENTS;
    extern const int UNSUPPORTED_METHOD;
}

IcebergMetadata::IcebergMetadata(
    const StorageS3::Configuration & configuration_,
    DB::ContextPtr context_,
    DB::Int32 metadata_version_,
    DB::Int32 format_version_,
    DB::String manifest_list_file_,
    DB::Int32 current_schema_id_,
    DB::NamesAndTypesList schema_)
    : WithContext(context_)
    , configuration(configuration_)
    , metadata_version(metadata_version_)
    , format_version(format_version_)
    , manifest_list_file(std::move(manifest_list_file_))
    , current_schema_id(current_schema_id_)
    , schema(std::move(schema_))
    , log(&Poco::Logger::get("IcebergMetadata"))
{
}

namespace
{

enum class ManifestEntryStatus
{
    EXISTING = 0,
    ADDED = 1,
    DELETED = 2,
};

enum class DataFileContent
{
    DATA = 0,
    POSITION_DELETES = 1,
    EQUALITY_DELETES = 2,
};

/**
 * Iceberg supports the next data types (see https://iceberg.apache.org/spec/#schemas-and-data-types):
 * - Primitive types:
 *   - boolean
 *   - int
 *   - long
 *   - float
 *   - double
 *   - decimal(P, S)
 *   - date
 *   - time (time of day in microseconds since midnight)
 *   - timestamp (in microseconds since 1970-01-01)
 *   - timestamptz (timestamp with timezone, stores values in UTC timezone)
 *   - string
 *   - uuid
 *   - fixed(L) (fixed-length byte array of length L)
 *   - binary
 * - Complex types:
 *   - struct(field1: Type1, field2: Type2, ...) (tuple of typed values)
 *   - list(nested_type)
 *   - map(Key, Value)
 *
 * Example of table schema in metadata:
 * {
 *     "type" : "struct",
 *     "schema-id" : 0,
 *     "fields" : [
 *     {
 *         "id" : 1,
 *         "name" : "id",
 *         "required" : false,
 *         "type" : "long"
 *     },
 *     {
 *         "id" : 2,
 *         "name" : "array",
 *         "required" : false,
 *         "type" : {
 *             "type" : "list",
 *             "element-id" : 5,
 *             "element" : "int",
 *             "element-required" : false
 *     },
 *     {
 *         "id" : 3,
 *         "name" : "data",
 *         "required" : false,
 *         "type" : "binary"
 *     }
 * }
 */

DataTypePtr getSimpleTypeByName(const String & type_name)
{
    if (type_name == "boolean")
        return DataTypeFactory::instance().get("Bool");
    if (type_name == "int")
        return std::make_shared<DataTypeInt32>();
    if (type_name == "long")
        return std::make_shared<DataTypeInt64>();
    if (type_name == "float")
        return std::make_shared<DataTypeFloat32>();
    if (type_name == "double")
        return std::make_shared<DataTypeFloat64>();
    if (type_name == "date")
        return std::make_shared<DataTypeDate>();
    /// Time type represents time of the day in microseconds since midnight.
    /// We don't have similar type for it, let's use just Int64.
    if (type_name == "time")
        return std::make_shared<DataTypeInt64>();
    if (type_name == "timestamp")
        return std::make_shared<DataTypeDateTime64>(6);
    if (type_name == "timestamptz")
        return std::make_shared<DataTypeDateTime64>(6, "UTC");
    if (type_name == "string" || type_name == "binary")
        return std::make_shared<DataTypeString>();
    if (type_name == "uuid")
        return std::make_shared<DataTypeUUID>();

    if (type_name.starts_with("fixed[") && type_name.ends_with(']'))
    {
        ReadBufferFromString buf(std::string_view(type_name.begin() + 6, type_name.end() - 1));
        size_t n;
        readIntText(n, buf);
        return std::make_shared<DataTypeFixedString>(n);
    }

    if (type_name.starts_with("decimal(") && type_name.ends_with(')'))
    {
        ReadBufferFromString buf(std::string_view(type_name.begin() + 8, type_name.end() - 1));
        size_t precision;
        size_t scale;
        readIntText(precision, buf);
        skipWhitespaceIfAny(buf);
        assertChar(',', buf);
        skipWhitespaceIfAny(buf);
        tryReadIntText(scale, buf);
        return createDecimal<DataTypeDecimal>(precision, scale);
    }

    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown Iceberg type: {}", type_name);
}

DataTypePtr getFieldType(const Poco::Dynamic::Var & type, bool required);

DataTypePtr getComplexTypeFromObject(const Poco::JSON::Object::Ptr & type)
{
    String type_name = type->getValue<String>("type");
    if (type_name == "list")
    {
        bool element_required = type->getValue<bool>("element-required");
        auto element_type = getFieldType(type->get("element"), element_required);
        return std::make_shared<DataTypeArray>(element_type);
    }

    if (type_name == "map")
    {
        auto key_type = getFieldType(type->get("key"), true);
        auto value_required = type->getValue<bool>("value-required");
        auto value_type = getFieldType(type->get("value"), value_required);
        return std::make_shared<DataTypeMap>(key_type, value_type);
    }

    if (type_name == "struct")
    {
        DataTypes element_types;
        Names element_names;
        auto fields = type->get("fields").extract<Poco::JSON::Array::Ptr>();
        element_types.reserve(fields->size());
        element_names.reserve(fields->size());
        for (size_t i = 0; i != fields->size(); ++i)
        {
            auto field = fields->getObject(static_cast<Int32>(i));
            element_names.push_back(field->getValue<String>("name"));
            auto required = field->getValue<bool>("required");
            element_types.push_back(getFieldType(field->get("type"), required));
        }

        return std::make_shared<DataTypeTuple>(element_types, element_names);
    }

    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown Iceberg type: {}", type_name);
}

DataTypePtr getFieldType(const Poco::Dynamic::Var & type, bool required)
{
    if (type.isString())
    {
        const String & type_name = type.extract<String>();
        auto data_type = getSimpleTypeByName(type_name);
        return required ? data_type : makeNullable(data_type);
    }

    if (type.isStruct())
        return getComplexTypeFromObject(type.extract<Poco::JSON::Object::Ptr>());

    throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected 'type' field: {}", type.toString());

}

std::pair<NamesAndTypesList, Int32> parseTableSchema(const Poco::JSON::Object::Ptr & metadata_object, int format_version)
{
    Poco::JSON::Object::Ptr schema;
    Int32 current_schema_id;

    /// First check if schema was evolved, because we don't support it yet.
    /// For version 2 we can check it by using field schemas, but in version 1
    /// this field is optional and we will check it later during parsing manifest files
    /// (we will compare schema id from manifest file and currently used schema).
    if (format_version == 2)
    {
        current_schema_id = metadata_object->getValue<int>("current-schema-id");
        auto schemas = metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>();
        if (schemas->size() > 1)
            throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not supported");
        for (size_t i = 0; i != schemas->size(); ++i)
        {
            auto current_schema = schemas->getObject(static_cast<UInt32>(i));
            if (current_schema->getValue<int>("schema-id") == current_schema_id)
            {
                schema = current_schema;
                break;
            }
        }
    }
    else
    {
        schema = metadata_object->getObject("schema");
        current_schema_id = schema->getValue<int>("schema-id");
        /// Field "schemas" is optional for version 1, but after version 2 was introduced,
        /// in most cases this field is added for new tables in version 1 as well.
        if (metadata_object->has("schemas") && metadata_object->get("schemas").extract<Poco::JSON::Array::Ptr>()->size() > 1)
            throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not supported");
    }

    NamesAndTypesList names_and_types;
    auto fields = schema->get("fields").extract<Poco::JSON::Array::Ptr>();
    for (size_t i = 0; i != fields->size(); ++i)
    {
        auto field = fields->getObject(static_cast<UInt32>(i));
        auto name = field->getValue<String>("name");
        bool required = field->getValue<bool>("required");
        names_and_types.push_back({name, getFieldType(field->get("type"), required)});
    }

    return {std::move(names_and_types), current_schema_id};
}

MutableColumns parseAvro(
    avro::DataFileReaderBase & file_reader,
    const Block & header,
    const FormatSettings & settings)
{
    auto deserializer = std::make_unique<AvroDeserializer>(header, file_reader.dataSchema(), true, true, settings);
    MutableColumns columns = header.cloneEmptyColumns();

    file_reader.init();
    RowReadExtension ext;
    while (file_reader.hasMore())
    {
        file_reader.decr();
        deserializer->deserializeRow(columns, file_reader.decoder(), ext);
    }
    return columns;
}

/**
 * Each version of table metadata is stored in a `metadata` directory and
 * has format: v<V>.metadata.json, where V - metadata version.
 */
std::pair<Int32, String> getMetadataFileAndVersion(const StorageS3::Configuration & configuration)
{
    const auto metadata_files = S3DataLakeMetadataReadHelper::listFiles(configuration, "metadata", ".metadata.json");
    if (metadata_files.empty())
    {
        throw Exception(
            ErrorCodes::FILE_DOESNT_EXIST,
            "The metadata file for Iceberg table with path {} doesn't exist",
            configuration.url.key);
    }

    std::vector<std::pair<UInt32, String>> metadata_files_with_versions;
    metadata_files_with_versions.reserve(metadata_files.size());
    for (const auto & file : metadata_files)
    {
        String version_str(file.begin() + 1, file.begin() + file.find_first_of('.'));
        if (!std::all_of(version_str.begin(), version_str.end(), isdigit))
            throw Exception(ErrorCodes::BAD_ARGUMENTS, "Bad metadata file name: {}. Expected vN.metadata.json where N is a number", file);
        metadata_files_with_versions.emplace_back(std::stoi(version_str), file);
    }

    /// Get the latest version of metadata file: v<V>.metadata.json
    return *std::max_element(metadata_files_with_versions.begin(), metadata_files_with_versions.end());
}

}

std::unique_ptr<IcebergMetadata> parseIcebergMetadata(const StorageS3::Configuration & configuration, ContextPtr context_)
{
    const auto [metadata_version, metadata_file_path] = getMetadataFileAndVersion(configuration);
    LOG_DEBUG(&Poco::Logger::get("IcebergMetadata"), "Parse metadata {}", metadata_file_path);
    auto buf = S3DataLakeMetadataReadHelper::createReadBuffer(metadata_file_path, context_, configuration);
    String json_str;
    readJSONObjectPossiblyInvalid(json_str, *buf);

    Poco::JSON::Parser parser; /// For some reason base/base/JSON.h can not parse this json file
    Poco::Dynamic::Var json = parser.parse(json_str);
    Poco::JSON::Object::Ptr object = json.extract<Poco::JSON::Object::Ptr>();

    auto format_version = object->getValue<int>("format-version");
    auto [schema, schema_id] = parseTableSchema(object, format_version);

    auto current_snapshot_id = object->getValue<Int64>("current-snapshot-id");
    auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>();

    String manifest_list_file;
    for (size_t i = 0; i < snapshots->size(); ++i)
    {
        const auto snapshot = snapshots->getObject(static_cast<UInt32>(i));
        if (snapshot->getValue<Int64>("snapshot-id") == current_snapshot_id)
        {
            const auto path = snapshot->getValue<String>("manifest-list");
            manifest_list_file = std::filesystem::path(configuration.url.key) / "metadata" / std::filesystem::path(path).filename();
            break;
        }
    }

    return std::make_unique<IcebergMetadata>(configuration, context_, metadata_version, format_version, manifest_list_file, schema_id, schema);
}

/**
 * Manifest file has the following format: '/iceberg_data/db/table_name/metadata/c87bfec7-d36c-4075-ad04-600b6b0f2020-m0.avro'
 *
 * `manifest file` is different in format version V1 and V2 and has the following contents:
 *                          v1     v2
 * status                   req    req
 * snapshot_id              req    opt
 * sequence_number                 opt
 * file_sequence_number            opt
 * data_file                req    req
 * Example format version V1:
 * ┌─status─┬─────────snapshot_id─┬─data_file───────────────────────────────────────────────────────────────────────────────────────┐
 * │      1 │ 2819310504515118887 │ ('/iceberg_data/db/table_name/data/00000-1-3edca534-15a0-4f74-8a28-4733e0bf1270-00001.parquet','PARQUET',(),100,1070,67108864,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],0) │
 * └────────┴─────────────────────┴─────────────────────────────────────────────────────────────────────────────────────────────────┘
 * Example format version V2:
 * ┌─status─┬─────────snapshot_id─┬─sequence_number─┬─file_sequence_number─┬─data_file────────────────────────────────────────────────┐
 * │      1 │ 5887006101709926452 │            ᴺᵁᴸᴸ │                 ᴺᵁᴸᴸ │ (0,'/iceberg_data/db/table_name/data/00000-1-c8045c90-8799-4eac-b957-79a0484e223c-00001.parquet','PARQUET',(),100,1070,[(1,233),(2,210)],[(1,100),(2,100)],[(1,0),(2,0)],[],[(1,'\0'),(2,'0')],[(1,'c'),(2,'99')],NULL,[4],[],0) │
 * └────────┴─────────────────────┴─────────────────┴──────────────────────┴──────────────────────────────────────────────────────────┘
 * In case of partitioned data we'll have extra directory partition=value:
 * ─status─┬─────────snapshot_id─┬─data_file──────────────────────────────────────────────────────────────────────────────────────────┐
 * │      1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=0/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00001.parquet','PARQUET',(0),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0\0'),(2,'1')],[(1,'\0\0\0\0\0\0\0\0'),(2,'1')],NULL,[4],0) │
 * │      1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=1/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00002.parquet','PARQUET',(1),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0'),(2,'2')],[(1,'\0\0\0\0\0\0\0'),(2,'2')],NULL,[4],0) │
 * │      1 │ 2252246380142525104 │ ('/iceberg_data/db/table_name/data/a=2/00000-1-c9535a00-2f4f-405c-bcfa-6d4f9f477235-00003.parquet','PARQUET',(2),1,631,67108864,[(1,46),(2,48)],[(1,1),(2,1)],[(1,0),(2,0)],[],[(1,'\0\0\0\0\0\0\0'),(2,'3')],[(1,'\0\0\0\0\0\0\0'),(2,'3')],NULL,[4],0) │
 * └────────┴─────────────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────┘
 */
Strings IcebergMetadata::getDataFiles()
{
    if (!data_files.empty())
        return data_files;

    Strings manifest_files;
    if (manifest_list_file.empty())
        return data_files;

    LOG_TEST(log, "Collect manifest files from manifest list {}", manifest_list_file);

    auto manifest_list_buf = S3DataLakeMetadataReadHelper::createReadBuffer(manifest_list_file, getContext(), configuration);
    auto manifest_list_file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*manifest_list_buf));

    auto data_type = AvroSchemaReader::avroNodeToDataType(manifest_list_file_reader->dataSchema().root()->leafAt(0));
    Block header{{data_type->createColumn(), data_type, "manifest_path"}};
    auto columns = parseAvro(*manifest_list_file_reader, header, getFormatSettings(getContext()));
    auto & col = columns.at(0);

    if (col->getDataType() != TypeIndex::String)
    {
        throw Exception(
            ErrorCodes::ILLEGAL_COLUMN,
            "The parsed column from Avro file of `manifest_path` field should be String type, got {}",
            col->getFamilyName());
    }

    const auto * col_str = typeid_cast<ColumnString *>(col.get());
    for (size_t i = 0; i < col_str->size(); ++i)
    {
        const auto file_path = col_str->getDataAt(i).toView();
        const auto filename = std::filesystem::path(file_path).filename();
        manifest_files.emplace_back(std::filesystem::path(configuration.url.key) / "metadata_path" / filename);
    }

    NameSet files;
    LOG_TEST(log, "Collect data files");
    for (const auto & manifest_file : manifest_files)
    {
        LOG_TEST(log, "Process manifest file {}", manifest_file);

        auto buffer = S3DataLakeMetadataReadHelper::createReadBuffer(manifest_file, getContext(), configuration);
        auto manifest_file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));

        /// Manifest file should always have table schema in avro file metadata. By now we don't support tables with evolved schema,
        /// so we should check if all manifest files have the same schema as in table metadata.
        auto avro_metadata = manifest_file_reader->metadata();
        std::vector<uint8_t> schema_json = avro_metadata["schema"];
        String schema_json_string = String(reinterpret_cast<char *>(schema_json.data()), schema_json.size());
        Poco::JSON::Parser parser;
        Poco::Dynamic::Var json = parser.parse(schema_json_string);
        Poco::JSON::Object::Ptr schema_object = json.extract<Poco::JSON::Object::Ptr>();
        if (schema_object->getValue<int>("schema-id") != current_schema_id)
            throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: the table schema has been changed at least 1 time, reading tables with evolved schema is not supported");

        avro::NodePtr root_node = manifest_file_reader->dataSchema().root();
        size_t leaves_num = root_node->leaves();
        size_t expected_min_num = format_version == 1 ? 3 : 2;
        if (leaves_num < expected_min_num)
        {
            throw Exception(
                ErrorCodes::BAD_ARGUMENTS,
                "Unexpected number of columns {}. Expected at least {}",
                root_node->leaves(), expected_min_num);
        }

        avro::NodePtr status_node = root_node->leafAt(0);
        if (status_node->type() != avro::Type::AVRO_INT)
        {
            throw Exception(
                ErrorCodes::ILLEGAL_COLUMN,
                "The parsed column from Avro file of `status` field should be Int type, got {}",
                magic_enum::enum_name(status_node->type()));
        }

        avro::NodePtr data_file_node = root_node->leafAt(static_cast<int>(leaves_num) - 1);
        if (data_file_node->type() != avro::Type::AVRO_RECORD)
        {
            throw Exception(
                ErrorCodes::ILLEGAL_COLUMN,
                "The parsed column from Avro file of `data_file` field should be Tuple type, got {}",
                magic_enum::enum_name(data_file_node->type()));
        }

        auto status_col_data_type = AvroSchemaReader::avroNodeToDataType(status_node);
        auto data_col_data_type = AvroSchemaReader::avroNodeToDataType(data_file_node);
        Block manifest_file_header{
            {status_col_data_type->createColumn(), status_col_data_type, "status"},
            {data_col_data_type->createColumn(), data_col_data_type, "data_file"}};

        columns = parseAvro(*manifest_file_reader, manifest_file_header, getFormatSettings(getContext()));
        if (columns.size() != 2)
            throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Unexpected number of columns. Expected 2, got {}", columns.size());

        if (columns.at(0)->getDataType() != TypeIndex::Int32)
        {
            throw Exception(
                ErrorCodes::ILLEGAL_COLUMN,
                "The parsed column from Avro file of `status` field should be Int32 type, got {}",
                columns.at(0)->getFamilyName());
        }
        if (columns.at(1)->getDataType() != TypeIndex::Tuple)
        {
            throw Exception(
                ErrorCodes::ILLEGAL_COLUMN,
                "The parsed column from Avro file of `file_path` field should be Tuple type, got {}",
                columns.at(1)->getFamilyName());
        }

        const auto * status_int_column = assert_cast<ColumnInt32 *>(columns.at(0).get());
        const auto & data_file_tuple_type = assert_cast<const DataTypeTuple &>(*data_type.get());
        const auto * data_file_tuple_column = assert_cast<ColumnTuple *>(columns.at(1).get());

        if (status_int_column->size() != data_file_tuple_column->size())
        {
            throw Exception(
                ErrorCodes::ILLEGAL_COLUMN,
                "The parsed column from Avro file of `file_path` and `status` have different rows number: {} and {}",
                status_int_column->size(),
                data_file_tuple_column->size());
        }

        ColumnPtr file_path_column = data_file_tuple_column->getColumnPtr(data_file_tuple_type.getPositionByName("file_path"));

        if (file_path_column->getDataType() != TypeIndex::String)
        {
            throw Exception(
                ErrorCodes::ILLEGAL_COLUMN,
                "The parsed column from Avro file of `file_path` field should be String type, got {}",
                file_path_column->getFamilyName());
        }

        const auto * file_path_string_column = assert_cast<const ColumnString *>(file_path_column.get());

        ColumnPtr content_column;
        const ColumnInt32 * content_int_column = nullptr;
        if (format_version == 2)
        {
            content_column = data_file_tuple_column->getColumnPtr(data_file_tuple_type.getPositionByName("content"));
            if (content_column->getDataType() != TypeIndex::Int32)
            {
                throw Exception(
                    ErrorCodes::ILLEGAL_COLUMN,
                    "The parsed column from Avro file of `content` field should be Int type, got {}",
                    content_column->getFamilyName());
            }

            content_int_column = assert_cast<const ColumnInt32 *>(content_column.get());
        }

        for (size_t i = 0; i < data_file_tuple_column->size(); ++i)
        {
            if (format_version == 2)
            {
                Int32 content_type = content_int_column->getElement(i);
                if (DataFileContent(content_type) != DataFileContent::DATA)
                    throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Cannot read Iceberg table: positional and equality deletes are not supported");
            }

            const auto status = status_int_column->getInt(i);
            const auto data_path = std::string(file_path_string_column->getDataAt(i).toView());
            const auto pos = data_path.find(configuration.url.key);
            const auto file_path = data_path.substr(pos);
            if (pos == std::string::npos)
                throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected to find {} in data path: {}", configuration.url.key, data_path);

            if (ManifestEntryStatus(status) == ManifestEntryStatus::DELETED)
            {
                LOG_TEST(log, "Processing delete file for path: {}", file_path);
                chassert(!files.contains(file_path));
            }
            else
            {
                files.insert(file_path);
            }
        }
    }

    return data_files;
}

}

#endif
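Note: the comment and getSimpleTypeByName/getFieldType above reduce to a small name-to-name table plus Nullable wrapping for fields that are not required. The following is a standalone sketch of that mapping, not ClickHouse code; ClickHouse type names are spelled as plain strings here purely for illustration.

#include <iostream>
#include <map>
#include <stdexcept>
#include <string>

/// Rough illustration of the Iceberg -> ClickHouse primitive type mapping
/// performed by getSimpleTypeByName(); optional fields become Nullable(...).
std::string icebergToClickHouseType(const std::string & iceberg_type, bool required)
{
    static const std::map<std::string, std::string> simple = {
        {"boolean", "Bool"},   {"int", "Int32"},      {"long", "Int64"},
        {"float", "Float32"},  {"double", "Float64"}, {"date", "Date"},
        {"time", "Int64"},     {"timestamp", "DateTime64(6)"},
        {"timestamptz", "DateTime64(6, 'UTC')"},
        {"string", "String"},  {"binary", "String"},  {"uuid", "UUID"},
    };

    auto it = simple.find(iceberg_type);
    if (it == simple.end())
        throw std::runtime_error("Unknown Iceberg type: " + iceberg_type);

    return required ? it->second : "Nullable(" + it->second + ")";
}

int main()
{
    std::cout << icebergToClickHouseType("long", /*required=*/ false) << '\n';        // Nullable(Int64)
    std::cout << icebergToClickHouseType("timestamptz", /*required=*/ true) << '\n';  // DateTime64(6, 'UTC')
}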
src/Storages/DataLakes/Iceberg/IcebergMetadata.h (new file, 95 lines)
@@ -0,0 +1,95 @@
#pragma once

#if USE_AWS_S3 && USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format.

#include <Storages/StorageS3.h>
#include <Interpreters/Context_fwd.h>
#include <Core/Types.h>

namespace DB
{

/**
 * Useful links:
 * - https://iceberg.apache.org/spec/
 *
 * Iceberg has two format versions, v1 and v2. The content of metadata files depends on the version.
 *
 * Unlike DeltaLake, Iceberg has several metadata layers: `table metadata`, `manifest list` and `manifest_files`.
 * Metadata file - json file.
 * Manifest list – an Avro file that lists manifest files; one per snapshot.
 * Manifest file – an Avro file that lists data or delete files; a subset of a snapshot.
 * All changes to table state create a new metadata file and replace the old metadata with an atomic swap.
 *
 * In order to find out which data files to read, we need to find the `manifest list`
 * which corresponds to the latest snapshot. We find it by checking a list of snapshots
 * in metadata's "snapshots" section.
 *
 * Example of metadata.json file.
 * {
 *     "format-version" : 1,
 *     "table-uuid" : "ca2965ad-aae2-4813-8cf7-2c394e0c10f5",
 *     "location" : "/iceberg_data/db/table_name",
 *     "last-updated-ms" : 1680206743150,
 *     "last-column-id" : 2,
 *     "schema" : { "type" : "struct", "schema-id" : 0, "fields" : [ {<field1_info>}, {<field2_info>}, ... ] },
 *     "current-schema-id" : 0,
 *     "schemas" : [ ],
 *     ...
 *     "current-snapshot-id" : 2819310504515118887,
 *     "refs" : { "main" : { "snapshot-id" : 2819310504515118887, "type" : "branch" } },
 *     "snapshots" : [ {
 *       "snapshot-id" : 2819310504515118887,
 *       "timestamp-ms" : 1680206743150,
 *       "summary" : {
 *         "operation" : "append", "spark.app.id" : "local-1680206733239",
 *         "added-data-files" : "1", "added-records" : "100",
 *         "added-files-size" : "1070", "changed-partition-count" : "1",
 *         "total-records" : "100", "total-files-size" : "1070", "total-data-files" : "1", "total-delete-files" : "0",
 *         "total-position-deletes" : "0", "total-equality-deletes" : "0"
 *       },
 *       "manifest-list" : "/iceberg_data/db/table_name/metadata/snap-2819310504515118887-1-c87bfec7-d36c-4075-ad04-600b6b0f2020.avro",
 *       "schema-id" : 0
 *     } ],
 *     "statistics" : [ ],
 *     "snapshot-log" : [ ... ],
 *     "metadata-log" : [ ]
 * }
 */
class IcebergMetadata : WithContext
{
public:
    IcebergMetadata(const StorageS3::Configuration & configuration_,
                    ContextPtr context_,
                    Int32 metadata_version_,
                    Int32 format_version_,
                    String manifest_list_file_,
                    Int32 current_schema_id_,
                    NamesAndTypesList schema_);

    /// Get data files. On first request it reads manifest_list file and iterates through manifest files to find all data files.
    /// All subsequent calls will return saved list of files (because it cannot be changed without changing metadata file)
    Strings getDataFiles();

    /// Get table schema parsed from metadata.
    NamesAndTypesList getTableSchema() const { return schema; }

    size_t getVersion() const { return metadata_version; }

private:
    const StorageS3::Configuration configuration;
    Int32 metadata_version;
    Int32 format_version;
    String manifest_list_file;
    Int32 current_schema_id;
    NamesAndTypesList schema;
    Strings data_files;
    Poco::Logger * log;
};

std::unique_ptr<IcebergMetadata> parseIcebergMetadata(const StorageS3::Configuration & configuration, ContextPtr context);

}

#endif
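Note: as the header comment describes, a reader first selects the metadata file with the highest version (vN.metadata.json) and only then follows current-snapshot-id to the manifest list. Below is a standalone sketch of just the version-selection step, assuming an in-memory list of file names instead of an S3 listing (hypothetical helper, not part of the commit).

#include <algorithm>
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

/// Pick the newest metadata file among names like "v1.metadata.json", "v2.metadata.json", ...
/// (the same idea as getMetadataFileAndVersion(), but over a plain list of names).
std::pair<int, std::string> latestMetadataFile(const std::vector<std::string> & files)
{
    if (files.empty())
        throw std::runtime_error("No metadata files found");

    std::vector<std::pair<int, std::string>> versions;
    for (const auto & file : files)
    {
        auto dot = file.find_first_of('.');
        if (file.empty() || file[0] != 'v' || dot == std::string::npos)
            throw std::runtime_error("Bad metadata file name: " + file);
        versions.emplace_back(std::stoi(file.substr(1, dot - 1)), file);
    }
    /// Highest version wins; pairs compare by the version number first.
    return *std::max_element(versions.begin(), versions.end());
}

int main()
{
    auto [version, file] = latestMetadataFile({"v1.metadata.json", "v3.metadata.json", "v2.metadata.json"});
    std::cout << version << " -> " << file << '\n'; // 3 -> v3.metadata.json
}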
src/Storages/DataLakes/Iceberg/StorageIceberg.cpp (new file, 69 lines)
@@ -0,0 +1,69 @@
#include <Storages/DataLakes/Iceberg/StorageIceberg.h>

#if USE_AWS_S3 && USE_AVRO

namespace DB
{

StoragePtr StorageIceberg::create(
    const DB::StorageIceberg::Configuration & base_configuration,
    DB::ContextPtr context_,
    const DB::StorageID & table_id_,
    const DB::ColumnsDescription & columns_,
    const DB::ConstraintsDescription & constraints_,
    const DB::String & comment,
    std::optional<FormatSettings> format_settings_)
{
    auto configuration{base_configuration};
    configuration.update(context_);
    auto metadata = parseIcebergMetadata(configuration, context_);
    auto schema_from_metadata = metadata->getTableSchema();
    return std::make_shared<StorageIceberg>(std::move(metadata), configuration, context_, table_id_, columns_.empty() ? ColumnsDescription(schema_from_metadata) : columns_, constraints_, comment, format_settings_);
}

StorageIceberg::StorageIceberg(
    std::unique_ptr<IcebergMetadata> metadata_,
    const Configuration & configuration_,
    ContextPtr context_,
    const StorageID & table_id_,
    const ColumnsDescription & columns_,
    const ConstraintsDescription & constraints_,
    const String & comment,
    std::optional<FormatSettings> format_settings_)
    : StorageS3(configuration_, context_, table_id_, columns_, constraints_, comment, format_settings_)
    , current_metadata(std::move(metadata_))
    , base_configuration(configuration_)
{
}

ColumnsDescription StorageIceberg::getTableStructureFromMetadata(
    Configuration & base_configuration,
    const std::optional<FormatSettings> &,
    ContextPtr local_context)
{
    auto configuration{base_configuration};
    configuration.update(local_context);
    auto metadata = parseIcebergMetadata(configuration, local_context);
    return ColumnsDescription(metadata->getTableSchema());
}

void StorageIceberg::updateConfigurationImpl(ContextPtr local_context)
{
    const bool updated = base_configuration.update(local_context);
    auto new_metadata = parseIcebergMetadata(base_configuration, local_context);
    /// Check if nothing was changed.
    if (!updated && new_metadata->getVersion() == current_metadata->getVersion())
        return;

    if (new_metadata->getVersion() != current_metadata->getVersion())
        current_metadata = std::move(new_metadata);

    auto updated_configuration{base_configuration};
    /// If metadata wasn't changed, we won't list data files again.
    updated_configuration.keys = current_metadata->getDataFiles();
    StorageS3::useConfiguration(updated_configuration);
}

}

#endif
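Note: updateConfigurationImpl() above re-lists data files only when the S3 configuration changed or a newer metadata version appeared; otherwise the previously collected file list is reused. A standalone sketch of that caching rule, with hypothetical types and no ClickHouse dependencies:

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

/// Caches the data-file listing and refreshes it only when a newer metadata
/// version shows up, mirroring the intent of StorageIceberg::updateConfigurationImpl().
class CachedFileList
{
public:
    std::vector<std::string> get(size_t latest_metadata_version)
    {
        if (latest_metadata_version > cached_version)
        {
            cached_version = latest_metadata_version;
            files = listDataFiles(latest_metadata_version); /// the expensive step in the real code
        }
        return files;
    }

private:
    static std::vector<std::string> listDataFiles(size_t version)
    {
        /// Stand-in for reading the manifest list and manifest files.
        return {"data/00000-" + std::to_string(version) + ".parquet"};
    }

    size_t cached_version = 0;
    std::vector<std::string> files;
};

int main()
{
    CachedFileList cache;
    std::cout << cache.get(1).front() << '\n';
    std::cout << cache.get(1).front() << '\n'; // served from cache, no re-listing
    std::cout << cache.get(2).front() << '\n'; // version bumped, refreshed
}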
src/Storages/DataLakes/Iceberg/StorageIceberg.h (new file, 80 lines)
@@ -0,0 +1,80 @@
#pragma once

#include "config.h"

#if USE_AWS_S3 && USE_AVRO

# include <filesystem>
# include <Formats/FormatFactory.h>
# include <Storages/DataLakes/Iceberg/IcebergMetadata.h>
# include <Storages/IStorage.h>
# include <Storages/StorageFactory.h>
# include <Storages/StorageS3.h>
# include <Common/logger_useful.h>


namespace DB
{


class StorageIceberg : public StorageS3
{
public:
    static constexpr auto name = "Iceberg";

    using Configuration = StorageS3::Configuration;

    static StoragePtr create(const Configuration & base_configuration,
                             ContextPtr context_,
                             const StorageID & table_id_,
                             const ColumnsDescription & columns_,
                             const ConstraintsDescription & constraints_,
                             const String & comment,
                             std::optional<FormatSettings> format_settings_);

    StorageIceberg(
        std::unique_ptr<IcebergMetadata> metadata_,
        const Configuration & configuration_,
        ContextPtr context_,
        const StorageID & table_id_,
        const ColumnsDescription & columns_,
        const ConstraintsDescription & constraints_,
        const String & comment,
        std::optional<FormatSettings> format_settings_);

    String getName() const override { return name; }

    static ColumnsDescription getTableStructureFromMetadata(
        Configuration & base_configuration,
        const std::optional<FormatSettings> &,
        ContextPtr local_context);

    static Configuration getConfiguration(ASTs & engine_args, ContextPtr local_context)
    {
        return StorageS3::getConfiguration(engine_args, local_context, /* get_format_from_file */false);
    }

    Configuration updateConfigurationAndGetCopy(ContextPtr local_context) override
    {
        std::lock_guard lock(configuration_update_mutex);
        updateConfigurationImpl(local_context);
        return StorageS3::getConfiguration();
    }

    void updateConfiguration(ContextPtr local_context) override
    {
        std::lock_guard lock(configuration_update_mutex);
        updateConfigurationImpl(local_context);
    }

private:
    void updateConfigurationImpl(ContextPtr local_context);

    std::unique_ptr<IcebergMetadata> current_metadata;
    Configuration base_configuration;
    std::mutex configuration_update_mutex;
};

}

#endif
@@ -1,26 +0,0 @@ (deleted file)
#pragma once

#if USE_AVRO /// StorageIceberg depending on Avro to parse metadata with Avro format.

#include <Interpreters/Context_fwd.h>
#include <Core/Types.h>

namespace DB
{

template <typename Configuration, typename MetadataReadHelper>
struct IcebergMetadataParser
{
public:
    IcebergMetadataParser<Configuration, MetadataReadHelper>();

    Strings getFiles(const Configuration & configuration, ContextPtr context);

private:
    struct Impl;
    std::shared_ptr<Impl> impl;
};

}

#endif
@@ -1,25 +0,0 @@ (deleted file)
#pragma once

#include <Storages/IStorage.h>
#include <Storages/DataLakes/IStorageDataLake.h>
#include <Storages/DataLakes/IcebergMetadataParser.h>
#include "config.h"

#if USE_AWS_S3 && USE_AVRO
#include <Storages/DataLakes/S3MetadataReader.h>
#include <Storages/StorageS3.h>
#endif

namespace DB
{

struct StorageIcebergName
{
    static constexpr auto name = "Iceberg";
};

#if USE_AWS_S3 && USE_AVRO
using StorageIcebergS3 = IStorageDataLake<StorageS3, StorageIcebergName, IcebergMetadataParser<StorageS3::Configuration, S3DataLakeMetadataReadHelper>>;
#endif

}
@@ -4,7 +4,7 @@
 #if USE_AWS_S3
 
 #include <Storages/DataLakes/StorageDeltaLake.h>
-#include <Storages/DataLakes/StorageIceberg.h>
+#include <Storages/DataLakes/Iceberg/StorageIceberg.h>
 #include <Storages/DataLakes/StorageHudi.h>
 
 
@@ -35,7 +35,7 @@ void registerStorageDeltaLake(StorageFactory & factory)
 
 void registerStorageIceberg(StorageFactory & factory)
 {
-    REGISTER_DATA_LAKE_STORAGE(StorageIcebergS3, StorageIcebergName::name)
+    REGISTER_DATA_LAKE_STORAGE(StorageIceberg, StorageIceberg::name)
 }
 
 #endif
@@ -33,7 +33,7 @@ protected:
         if (TableFunction::configuration.structure != "auto")
             columns = parseColumnsListFromString(TableFunction::configuration.structure, context);
 
-        StoragePtr storage = std::make_shared<Storage>(
+        StoragePtr storage = Storage::create(
             TableFunction::configuration, context, StorageID(TableFunction::getDatabaseName(), table_name),
             columns, ConstraintsDescription{}, String{}, std::nullopt);
 
@@ -2,7 +2,7 @@
 
 #if USE_AWS_S3 && USE_AVRO
 
-#include <Storages/DataLakes/StorageIceberg.h>
+#include <Storages/DataLakes/Iceberg/StorageIceberg.h>
 #include <TableFunctions/ITableFunctionDataLake.h>
 #include <TableFunctions/TableFunctionFactory.h>
 #include <TableFunctions/TableFunctionS3.h>
@@ -17,7 +17,7 @@ struct TableFunctionIcebergName
     static constexpr auto name = "iceberg";
 };
 
-using TableFunctionIceberg = ITableFunctionDataLake<TableFunctionIcebergName, StorageIcebergS3, TableFunctionS3>;
+using TableFunctionIceberg = ITableFunctionDataLake<TableFunctionIcebergName, StorageIceberg, TableFunctionS3>;
 
 void registerTableFunctionIceberg(TableFunctionFactory & factory)
 {