From 92646e3c277fdb41b837abb8ac6206c56cd6098d Mon Sep 17 00:00:00 2001
From: flynn
Date: Sat, 14 Jan 2023 17:55:59 +0000
Subject: [PATCH] initial

---
 src/Storages/StorageIceberg.cpp | 313 ++++++++++++++++++++++++++++++++
 src/Storages/StorageIceberg.h   | 117 ++++++++++++
 2 files changed, 430 insertions(+)
 create mode 100644 src/Storages/StorageIceberg.cpp
 create mode 100644 src/Storages/StorageIceberg.h

diff --git a/src/Storages/StorageIceberg.cpp b/src/Storages/StorageIceberg.cpp
new file mode 100644
index 00000000000..415cedc1920
--- /dev/null
+++ b/src/Storages/StorageIceberg.cpp
@@ -0,0 +1,313 @@
+#include "config.h"
+#if USE_AWS_S3
+
+#    include <Storages/StorageIceberg.h>
+#    include <Common/logger_useful.h>
+
+#    include <IO/ReadBufferFromS3.h>
+#    include <IO/ReadHelpers.h>
+#    include <IO/ReadSettings.h>
+#    include <IO/S3Common.h>
+
+#    include <Storages/StorageFactory.h>
+#    include <Storages/StorageS3.h>
+#    include <Storages/checkAndGetLiteralArgument.h>
+
+#    include <Formats/FormatFactory.h>
+
+#    include <aws/core/auth/AWSCredentials.h>
+#    include <aws/s3/S3Client.h>
+#    include <aws/s3/model/ListObjectsV2Request.h>
+
+#    include <QueryPipeline/Pipe.h>
+
+#    include <fmt/format.h>
+#    include <fmt/ranges.h>
+#    include <ranges>
+
+#    include <filesystem>
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+    extern const int S3_ERROR;
+    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
+    extern const int INCORRECT_DATA;
+}
+
+JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context)
+    : base_configuration(configuration_), table_path(table_path_)
+{
+    init(context);
+}
+
+void JsonMetadataGetter::init(ContextPtr context)
+{
+    auto keys = getJsonLogFiles();
+
+    /// Read data from every json log file.
+    for (const String & key : keys)
+    {
+        auto buf = createS3ReadBuffer(key, context);
+
+        char c;
+        while (!buf->eof())
+        {
+            /// There may be some invalid characters before the json.
+            while (buf->peek(c) && c != '{')
+                buf->ignore();
+
+            if (buf->eof())
+                break;
+
+            String json_str;
+            readJSONObjectPossiblyInvalid(json_str, *buf);
+
+            if (json_str.empty())
+                continue;
+
+            const JSON json(json_str);
+            handleJSON(json);
+        }
+    }
+}
+
+std::vector<String> JsonMetadataGetter::getJsonLogFiles()
+{
+    std::vector<String> keys;
+
+    const auto & client = base_configuration.client;
+
+    Aws::S3::Model::ListObjectsV2Request request;
+    Aws::S3::Model::ListObjectsV2Outcome outcome;
+
+    bool is_finished{false};
+    const auto bucket{base_configuration.uri.bucket};
+
+    request.SetBucket(bucket);
+
+    /// The DeltaLake format stores all metadata json files in the _delta_log directory.
+    static constexpr auto deltalake_metadata_directory = "_delta_log";
+    request.SetPrefix(std::filesystem::path(table_path) / deltalake_metadata_directory);
+
+    while (!is_finished)
+    {
+        outcome = client->ListObjectsV2(request);
+        if (!outcome.IsSuccess())
+            throw Exception(
+                ErrorCodes::S3_ERROR,
+                "Could not list objects in bucket {} with key {}, S3 exception: {}, message: {}",
+                quoteString(bucket),
+                quoteString(table_path),
+                backQuote(outcome.GetError().GetExceptionName()),
+                quoteString(outcome.GetError().GetMessage()));
+
+        const auto & result_batch = outcome.GetResult().GetContents();
+        for (const auto & obj : result_batch)
+        {
+            const auto & filename = obj.GetKey();
+
+            /// DeltaLake metadata files have the .json extension.
+            if (std::filesystem::path(filename).extension() == ".json")
+                keys.push_back(filename);
+        }
+
+        /// Needed in case more results are available; if so, we continue reading
+        /// and do not re-read keys that were already read.
+        request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
+
+        /// IsTruncated is set to false if all of the results were returned,
+        /// and to true if more keys are available to return.
+        /// If the number of results exceeds that specified by MaxKeys,
+        /// all of the results might not be returned.
+        is_finished = !outcome.GetResult().GetIsTruncated();
+    }
+
+    return keys;
+}
+
+std::shared_ptr<ReadBufferFromS3> JsonMetadataGetter::createS3ReadBuffer(const String & key, ContextPtr context)
+{
+    /// TODO: add parallel downloads
+    S3Settings::RequestSettings request_settings;
+    request_settings.max_single_read_retries = 10;
+    return std::make_shared<ReadBufferFromS3>(
+        base_configuration.client,
+        base_configuration.uri.bucket,
+        key,
+        base_configuration.uri.version_id,
+        request_settings,
+        context->getReadSettings());
+}
+
+void JsonMetadataGetter::handleJSON(const JSON & json)
+{
+    if (json.has("add"))
+    {
+        auto path = json["add"]["path"].getString();
+        auto timestamp = json["add"]["modificationTime"].getInt();
+
+        metadata.setLastModifiedTime(path, timestamp);
+    }
+    else if (json.has("remove"))
+    {
+        auto path = json["remove"]["path"].getString();
+        auto timestamp = json["remove"]["deletionTimestamp"].getInt();
+
+        metadata.remove(path, timestamp);
+    }
+}
+
+namespace
+{
+
+StorageS3::S3Configuration getBaseConfiguration(const StorageS3Configuration & configuration)
+{
+    return {configuration.url, configuration.auth_settings, configuration.request_settings, configuration.headers};
+}
+
+/// generateQueryFromKeys constructs a glob ("{file1,file2,...}") from all data file
+/// names for the underlying StorageS3 engine.
+String generateQueryFromKeys(const std::vector<String> & keys)
+{
+    std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
+    return new_query;
+}
+
+StorageS3Configuration getAdjustedS3Configuration(
+    const ContextPtr & context,
+    StorageS3::S3Configuration & base_configuration,
+    const StorageS3Configuration & configuration,
+    const std::string & table_path,
+    Poco::Logger * log)
+{
+    JsonMetadataGetter getter{base_configuration, table_path, context};
+
+    auto keys = getter.getFiles();
+    auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys);
+
+    LOG_DEBUG(log, "New uri: {}", new_uri);
+    LOG_DEBUG(log, "Table path: {}", table_path);
+
+    /// Set the new url in the configuration.
+    StorageS3Configuration new_configuration;
+    new_configuration.url = new_uri;
+    new_configuration.auth_settings.access_key_id = configuration.auth_settings.access_key_id;
+    new_configuration.auth_settings.secret_access_key = configuration.auth_settings.secret_access_key;
+    new_configuration.format = configuration.format;
+
+    return new_configuration;
+}
+
+}
+
+StorageIceberg::StorageIceberg(
+    const StorageS3Configuration & configuration_,
+    const StorageID & table_id_,
+    ColumnsDescription columns_,
+    const ConstraintsDescription & constraints_,
+    const String & comment,
+    ContextPtr context_,
+    std::optional<FormatSettings> format_settings_)
+    : IStorage(table_id_)
+    , base_configuration{getBaseConfiguration(configuration_)}
+    , log(&Poco::Logger::get("StorageIceberg(" + table_id_.table_name + ")"))
+    , table_path(base_configuration.uri.key)
+{
+    StorageInMemoryMetadata storage_metadata;
+    StorageS3::updateS3Configuration(context_, base_configuration);
+
+    auto new_configuration = getAdjustedS3Configuration(context_, base_configuration, configuration_, table_path, log);
+
+    if (columns_.empty())
+    {
+        columns_ = StorageS3::getTableStructureFromData(
+            new_configuration, /*distributed processing*/ false, format_settings_, context_, nullptr);
+        storage_metadata.setColumns(columns_);
+    }
+    else
+        storage_metadata.setColumns(columns_);
+
+    storage_metadata.setConstraints(constraints_);
+    storage_metadata.setComment(comment);
+    setInMemoryMetadata(storage_metadata);
+
+    s3engine = std::make_shared<StorageS3>(
+        new_configuration,
+        table_id_,
+        columns_,
+        constraints_,
+        comment,
+        context_,
+        format_settings_,
+        /* distributed_processing_ */ false,
+        nullptr);
+}
+
+Pipe StorageIceberg::read(
+    const Names & column_names,
+    const StorageSnapshotPtr & storage_snapshot,
+    SelectQueryInfo & query_info,
+    ContextPtr context,
+    QueryProcessingStage::Enum processed_stage,
+    size_t max_block_size,
+    size_t num_streams)
+{
+    StorageS3::updateS3Configuration(context, base_configuration);
+
+    return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
+}
+
+ColumnsDescription StorageIceberg::getTableStructureFromData(
+    const StorageS3Configuration & configuration, const std::optional<FormatSettings> & format_settings, ContextPtr ctx)
+{
+    auto base_configuration = getBaseConfiguration(configuration);
+    StorageS3::updateS3Configuration(ctx, base_configuration);
+    auto new_configuration = getAdjustedS3Configuration(
+        ctx, base_configuration, configuration, base_configuration.uri.key, &Poco::Logger::get("StorageIceberg"));
+    return StorageS3::getTableStructureFromData(
+        new_configuration, /*distributed processing*/ false, format_settings, ctx, /*object_infos*/ nullptr);
+}
+
+void registerStorageIceberg(StorageFactory & factory)
+{
+    factory.registerStorage(
+        "Iceberg",
+        [](const StorageFactory::Arguments & args)
+        {
+            auto & engine_args = args.engine_args;
+            if (engine_args.size() < 3 || engine_args.size() > 4)
+                throw Exception(
+                    ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
+                    "Storage Iceberg requires 3 to 4 arguments: table_url, access_key, secret_access_key, [format]");
+
+            StorageS3Configuration configuration;
+
+            configuration.url = checkAndGetLiteralArgument<String>(engine_args[0], "url");
+            configuration.auth_settings.access_key_id = checkAndGetLiteralArgument<String>(engine_args[1], "access_key_id");
+            configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(engine_args[2], "secret_access_key");
+
+            if (engine_args.size() == 4)
+                configuration.format = checkAndGetLiteralArgument<String>(engine_args[3], "format");
+            else
+            {
+                /// Iceberg uses Parquet by default.
+                configuration.format = "Parquet";
+            }
+
+            return std::make_shared<StorageIceberg>(
+                configuration, args.table_id, args.columns, args.constraints, args.comment, args.getContext(), std::nullopt);
+        },
+        {
+            .supports_settings = true,
+            .supports_schema_inference = true,
+            .source_access_type = AccessType::S3,
+        });
+}
+
+}
+
+#endif
diff --git a/src/Storages/StorageIceberg.h b/src/Storages/StorageIceberg.h
new file mode 100644
index 00000000000..d1e7bc6bcbd
--- /dev/null
+++ b/src/Storages/StorageIceberg.h
@@ -0,0 +1,117 @@
+#pragma once
+
+#include "config.h"
+
+#if USE_AWS_S3
+
+#    include <Storages/IStorage.h>
+#    include <Storages/StorageS3.h>
+
+#    include <Storages/StorageDelta.h>
+#    include <base/JSON.h>
+
+namespace Poco
+{
+class Logger;
+}
+
+namespace Aws::S3
+{
+class S3Client;
+}
+
+namespace DB
+{
+
+// Class to parse Iceberg metadata and find the files needed for a query on the table.
+// Iceberg table directory layout:
+//     table/
+//         data/
+//         metadata/
+// The metadata has three layers: metadata -> manifest list -> manifest files
+class IcebergMetaParser
+{
+public:
+    IcebergMetaParser(const StorageS3Configuration & configuration_, const String & table_path_, ContextPtr context_);
+
+    void parseMeta();
+
+    String getNewestMetaFile();
+    String getManifestList(String metadata);
+    std::vector<String> getManifestFiles(String manifest_list);
+    void getFilesForRead(const std::vector<String> & manifest_files);
+
+    std::vector<String> getFiles() const { return keys; }
+
+private:
+    std::vector<String> keys;
+
+    StorageS3Configuration base_configuration;
+    String table_path;
+    ContextPtr context;
+};
+
+// Class to get DeltaLake log json files and read json from them.
+class JsonMetadataGetter
+{
+public:
+    JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context);
+
+    std::vector<String> getFiles() { return std::move(metadata).listCurrentFiles(); }
+
+private:
+    void init(ContextPtr context);
+
+    std::vector<String> getJsonLogFiles();
+
+    std::shared_ptr<ReadBufferFromS3> createS3ReadBuffer(const String & key, ContextPtr context);
+
+    void handleJSON(const JSON & json);
+
+    StorageS3::S3Configuration base_configuration;
+    String table_path;
+    DeltaLakeMetadata metadata;
+};
+
+class StorageIceberg : public IStorage
+{
+public:
+    // 1. Parses the internal file structure of the table.
+    // 2. Finds the parts with the latest version.
+    // 3. Creates a url for the underlying StorageS3 engine to handle reads.
+    StorageIceberg(
+        const StorageS3Configuration & configuration_,
+        const StorageID & table_id_,
+        ColumnsDescription columns_,
+        const ConstraintsDescription & constraints_,
+        const String & comment,
+        ContextPtr context_,
+        std::optional<FormatSettings> format_settings_);
+
+    String getName() const override { return "Iceberg"; }
+
+    // Reads the latest version of the Iceberg table.
+    Pipe read(
+        const Names & column_names,
+        const StorageSnapshotPtr & storage_snapshot,
+        SelectQueryInfo & query_info,
+        ContextPtr context,
+        QueryProcessingStage::Enum processed_stage,
+        size_t max_block_size,
+        size_t num_streams) override;
+
+    static ColumnsDescription getTableStructureFromData(
+        const StorageS3Configuration & configuration,
+        const std::optional<FormatSettings> & format_settings,
+        ContextPtr ctx);
+
+private:
+    StorageS3::S3Configuration base_configuration;
+    std::shared_ptr<StorageS3> s3engine;
+    Poco::Logger * log;
+    String table_path;
+};
+
+}
+
+#endif
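
Notes on the mechanics, with small standalone sketches; none of the code below is part of the patch.

getJsonLogFiles() pages through the bucket listing with a continuation token until IsTruncated comes back false. A minimal sketch of that loop; the helper name listAllKeys and its parameters are invented for illustration:

#include <vector>

#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>

/// Illustration of the pagination pattern used by getJsonLogFiles():
/// carry the continuation token forward until the listing is no longer truncated.
std::vector<Aws::String> listAllKeys(const Aws::S3::S3Client & client, const Aws::String & bucket, const Aws::String & prefix)
{
    std::vector<Aws::String> keys;

    Aws::S3::Model::ListObjectsV2Request request;
    request.SetBucket(bucket);
    request.SetPrefix(prefix);

    bool is_finished = false;
    while (!is_finished)
    {
        auto outcome = client.ListObjectsV2(request);
        if (!outcome.IsSuccess())
            break; /// The patch throws an S3_ERROR exception here instead of breaking.

        for (const auto & obj : outcome.GetResult().GetContents())
            keys.push_back(obj.GetKey());

        /// Resume the next request where this batch stopped.
        request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
        is_finished = !outcome.GetResult().GetIsTruncated();
    }

    return keys;
}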
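handleJSON() replays the DeltaLake transaction log: every "add" action registers a data file, every "remove" retracts it, and whatever survives the replay is the current snapshot that listCurrentFiles() returns. DeltaLakeMetadata itself is defined outside this patch; a minimal sketch of its assumed semantics:

#include <cstdint>
#include <map>
#include <string>
#include <vector>

/// Assumed shape of the DeltaLakeMetadata bookkeeping driven by handleJSON();
/// the real class is not part of this patch.
class DeltaLakeMetadataSketch
{
public:
    /// "add" action: remember the file and its modification time.
    void setLastModifiedTime(const std::string & path, int64_t timestamp) { files[path] = timestamp; }

    /// "remove" action: the file is no longer part of the current snapshot.
    void remove(const std::string & path, int64_t /*timestamp*/) { files.erase(path); }

    std::vector<std::string> listCurrentFiles() const
    {
        std::vector<std::string> result;
        result.reserve(files.size());
        for (const auto & entry : files)
            result.push_back(entry.first);
        return result;
    }

private:
    std::map<std::string, int64_t> files;
};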
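generateQueryFromKeys() folds the selected data files into one brace-expansion glob that is appended to the table URI, so the underlying StorageS3 engine reads exactly those keys. The formatting itself, runnable in isolation (same fmt call as the patch):

#include <iostream>
#include <string>
#include <vector>

#include <fmt/format.h>
#include <fmt/ranges.h>

int main()
{
    std::vector<std::string> keys = {"data/part-0.parquet", "data/part-1.parquet"};

    /// "{{{}}}" escapes to literal braces around the comma-joined keys.
    std::string glob = fmt::format("{{{}}}", fmt::join(keys, ","));

    /// Prints: {data/part-0.parquet,data/part-1.parquet}
    std::cout << glob << '\n';
}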
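Finally, the header declares IcebergMetaParser but this patch only implements the DeltaLake-style JsonMetadataGetter, so reads still go through the JSON log path. Judging from the declared helpers and the class comment, the intended resolution chain is metadata -> manifest list -> manifest files -> data files. A compilable stand-in with placeholder bodies; every body and file name below is invented for illustration:

#include <string>
#include <vector>

using String = std::string;

/// Placeholder mirror of the IcebergMetaParser declarations; the bodies are
/// fakes that only show how the declared steps would presumably chain together.
struct IcebergMetaParserSketch
{
    String getNewestMetaFile() const { return "metadata/v2.metadata.json"; }
    String getManifestList(const String & /*metadata*/) const { return "metadata/snap-1.avro"; }
    std::vector<String> getManifestFiles(const String & /*manifest_list*/) const { return {"metadata/manifest-0.avro"}; }
    std::vector<String> getFilesForRead(const std::vector<String> & /*manifest_files*/) const { return {"data/part-0.parquet"}; }

    std::vector<String> parseMeta() const
    {
        auto metadata = getNewestMetaFile();
        auto manifest_list = getManifestList(metadata);
        auto manifest_files = getManifestFiles(manifest_list);
        return getFilesForRead(manifest_files);
    }
};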