From a8e136315159ab28a5484344f75eeba4a2d09747 Mon Sep 17 00:00:00 2001
From: flynn
Date: Wed, 18 Jan 2023 03:43:20 +0000
Subject: [PATCH] implement storage iceberg

---
 .../Formats/Impl/AvroRowInputFormat.cpp       |  50 ++--
 .../Formats/Impl/AvroRowInputFormat.h         |  19 +-
 src/Storages/StorageIceberg.cpp               | 232 +++++++++++++-----
 src/Storages/StorageIceberg.h                 |  35 +--
 4 files changed, 216 insertions(+), 120 deletions(-)

diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp
index 8c6cd8bd91b..5dfcf1ea589 100644
--- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp
+++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp
@@ -68,35 +68,35 @@ namespace ErrorCodes
     extern const int CANNOT_READ_ALL_DATA;
 }
 
-class InputStreamReadBufferAdapter : public avro::InputStream
+bool AvroInputStreamReadBufferAdapter::next(const uint8_t ** data, size_t * len)
 {
-public:
-    explicit InputStreamReadBufferAdapter(ReadBuffer & in_) : in(in_) {}
-
-    bool next(const uint8_t ** data, size_t * len) override
+    if (in.eof())
     {
-        if (in.eof())
-        {
-            *len = 0;
-            return false;
-        }
-
-        *data = reinterpret_cast<const uint8_t *>(in.position());
-        *len = in.available();
-
-        in.position() += in.available();
-        return true;
+        *len = 0;
+        return false;
     }
 
-    void backup(size_t len) override { in.position() -= len; }
+    *data = reinterpret_cast<const uint8_t *>(in.position());
+    *len = in.available();
 
-    void skip(size_t len) override { in.tryIgnore(len); }
+    in.position() += in.available();
+    return true;
+}
 
-    size_t byteCount() const override { return in.count(); }
+void AvroInputStreamReadBufferAdapter::backup(size_t len)
+{
+    in.position() -= len;
+}
 
-private:
-    ReadBuffer & in;
-};
+void AvroInputStreamReadBufferAdapter::skip(size_t len)
+{
+    in.tryIgnore(len);
+}
+
+size_t AvroInputStreamReadBufferAdapter::byteCount() const
+{
+    return in.count();
+}
 
 /// Insert value with conversion to the column of target type.
 template <typename T>
@@ -758,7 +758,7 @@ AvroRowInputFormat::AvroRowInputFormat(const Block & header_, ReadBuffer & in_,
 
 void AvroRowInputFormat::readPrefix()
 {
-    file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<InputStreamReadBufferAdapter>(*in));
+    file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*in));
     deserializer_ptr = std::make_unique<AvroDeserializer>(
         output.getHeader(), file_reader_ptr->dataSchema(), format_settings.avro.allow_missing_fields, format_settings.avro.null_as_default);
     file_reader_ptr->init();
@@ -915,7 +915,7 @@ AvroConfluentRowInputFormat::AvroConfluentRowInputFormat(
 
 void AvroConfluentRowInputFormat::readPrefix()
 {
-    input_stream = std::make_unique<InputStreamReadBufferAdapter>(*in);
+    input_stream = std::make_unique<AvroInputStreamReadBufferAdapter>(*in);
     decoder = avro::binaryDecoder();
     decoder->init(*input_stream);
 }
@@ -972,7 +972,7 @@ NamesAndTypesList AvroSchemaReader::readSchema()
     }
     else
     {
-        auto file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<InputStreamReadBufferAdapter>(in));
+        auto file_reader_ptr = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(in));
         root_node = file_reader_ptr->dataSchema().root();
     }
 
diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.h b/src/Processors/Formats/Impl/AvroRowInputFormat.h
index 4525d7d33b0..cef792da546 100644
--- a/src/Processors/Formats/Impl/AvroRowInputFormat.h
+++ b/src/Processors/Formats/Impl/AvroRowInputFormat.h
@@ -28,6 +28,23 @@ namespace ErrorCodes
     extern const int INCORRECT_DATA;
 }
 
+class AvroInputStreamReadBufferAdapter : public avro::InputStream
+{
+public:
+    explicit AvroInputStreamReadBufferAdapter(ReadBuffer & in_) : in(in_) {}
+
+    bool next(const uint8_t ** data, size_t * len) override;
+
+    void backup(size_t len) override;
+
+    void skip(size_t len) override;
+
+    size_t byteCount() const override;
+
+private:
+    ReadBuffer & in;
+};
+
 class AvroDeserializer
 {
 public:
@@ -185,8 +202,8 @@ public:
 
     NamesAndTypesList readSchema() override;
 
+    static DataTypePtr avroNodeToDataType(avro::NodePtr node);
 private:
-    DataTypePtr avroNodeToDataType(avro::NodePtr node);
     bool confluent;
     const FormatSettings format_settings;
 
diff --git a/src/Storages/StorageIceberg.cpp b/src/Storages/StorageIceberg.cpp
index 415cedc1920..94c013a404b 100644
--- a/src/Storages/StorageIceberg.cpp
+++ b/src/Storages/StorageIceberg.cpp
@@ -25,7 +25,11 @@
 #    include <IO/ReadBufferFromS3.h>
 #    include <IO/ReadHelpers.h>
 
-#    include <base/JSON.h>
+#    include <Processors/Formats/Impl/AvroRowInputFormat.h>
+
+#    include <Poco/JSON/Array.h>
+#    include <Poco/JSON/Object.h>
+#    include <Poco/JSON/Parser.h>
 
 namespace DB
 {
@@ -35,48 +39,35 @@ namespace ErrorCodes
     extern const int S3_ERROR;
     extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
     extern const int INCORRECT_DATA;
+    extern const int FILE_DOESNT_EXIST;
+    extern const int ILLEGAL_COLUMN;
 }
 
-JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context)
-    : base_configuration(configuration_), table_path(table_path_)
+IcebergMetaParser::IcebergMetaParser(const StorageS3Configuration & configuration_, const String & table_path_, ContextPtr context_)
+    : base_configuration(configuration_), table_path(table_path_), context(context_)
 {
-    init(context);
 }
 
-void JsonMetadataGetter::init(ContextPtr context)
+std::vector<String> IcebergMetaParser::getFiles() const
 {
-    auto keys = getJsonLogFiles();
+    auto metadata = getNewestMetaFile();
+    auto manifest_list = getManiFestList(metadata);
 
-    // read data from every json log file
-    for (const String & key : keys)
+    /// A newly created table does not have any data yet.
+    if (manifest_list.empty())
     {
-        auto buf = createS3ReadBuffer(key, context);
-
-        char c;
-        while (!buf->eof())
-        {
-            /// May be some invalid characters before json.
-            while (buf->peek(c) && c != '{')
-                buf->ignore();
-
-            if (buf->eof())
-                break;
-
-            String json_str;
-            readJSONObjectPossiblyInvalid(json_str, *buf);
-
-            if (json_str.empty())
-                continue;
-
-            const JSON json(json_str);
-            handleJSON(json);
-        }
+        return {};
     }
+
+    auto manifest_files = getManifestFiles(manifest_list);
+    return getFilesForRead(manifest_files);
 }
 
-std::vector<String> JsonMetadataGetter::getJsonLogFiles()
+String IcebergMetaParser::getNewestMetaFile() const
 {
-    std::vector<String> keys;
+    /// Iceberg stores all its metadata.json files in the metadata directory,
+    /// and the newest version has the lexicographically largest name, so we
+    /// list all of them and then pick the newest one.
+    std::vector<String> metadata_files;
 
     const auto & client = base_configuration.client;
 
@@ -88,9 +79,8 @@
 
     request.SetBucket(bucket);
 
-    /// DeltaLake format stores all metadata json files in _delta_log directory
-    static constexpr auto deltalake_metadata_directory = "_delta_log";
-    request.SetPrefix(std::filesystem::path(table_path) / deltalake_metadata_directory);
+    static constexpr auto metadata_directory = "metadata";
+    request.SetPrefix(std::filesystem::path(table_path) / metadata_directory);
 
     while (!is_finished)
     {
@@ -109,27 +99,156 @@
         {
             const auto & filename = obj.GetKey();
 
-            // DeltaLake metadata files have json extension
             if (std::filesystem::path(filename).extension() == ".json")
-                keys.push_back(filename);
+                metadata_files.push_back(filename);
         }
 
-        /// Needed in case any more results are available
-        /// if so, we will continue reading, and not read keys that were already read
         request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
 
-        /// Set to false if all of the results were returned. Set to true if more keys
-        /// are available to return. If the number of results exceeds that specified by
-        /// MaxKeys, all of the results might not be returned
         is_finished = !outcome.GetResult().GetIsTruncated();
     }
 
-    return keys;
+    if (metadata_files.empty())
+        throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "The metadata file for Iceberg table with path {} doesn't exist", table_path);
+
+    auto it = std::max_element(metadata_files.begin(), metadata_files.end());
+    return *it;
 }
 
-std::shared_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String & key, ContextPtr context)
+String IcebergMetaParser::getManiFestList(const String & metadata_name) const
+{
+    auto buffer = createS3ReadBuffer(metadata_name, context);
+    String json_str;
+    readJSONObjectPossiblyInvalid(json_str, *buffer);
+
+    /// Looks like base/base/JSON.h cannot parse this json file.
+    Poco::JSON::Parser parser;
+    Poco::Dynamic::Var json = parser.parse(json_str);
+    Poco::JSON::Object::Ptr object = json.extract<Poco::JSON::Object::Ptr>();
+
+    auto current_snapshot_id = object->getValue<Int64>("current-snapshot-id");
+
+    auto snapshots = object->get("snapshots").extract<Poco::JSON::Array::Ptr>();
+
+    for (size_t i = 0; i < snapshots->size(); ++i)
+    {
+        auto snapshot = snapshots->getObject(static_cast<UInt32>(i));
+        if (snapshot->getValue<Int64>("snapshot-id") == current_snapshot_id)
+            return snapshot->getValue<String>("manifest-list");
+    }
+
+    return {};
+}
+
+static ColumnPtr
+parseAvro(const std::unique_ptr<avro::DataFileReaderBase> & file_reader, const DataTypePtr & data_type, const String & field_name)
+{
+    auto deserializer = std::make_unique<AvroDeserializer>(
+        Block{{data_type->createColumn(), data_type, field_name}}, file_reader->dataSchema(), true, true);
+    file_reader->init();
+    MutableColumns columns;
+    columns.emplace_back(data_type->createColumn());
+
+    RowReadExtension ext;
+    while (file_reader->hasMore())
+    {
+        file_reader->decr();
+        deserializer->deserializeRow(columns, file_reader->decoder(), ext);
+    }
+    return std::move(columns.at(0));
+}
+
+std::vector<String> IcebergMetaParser::getManifestFiles(const String & manifest_list) const
+{
+    auto buffer = createS3ReadBuffer(manifest_list, context);
+
+    auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));
+
+    static constexpr auto manifest_path = "manifest_path";
+
+    /// The manifest_path is the first field in the manifest list file,
+    /// and it has String data type:
+    /// {'manifest_path': 'xxx', ...}
+    auto data_type = AvroSchemaReader::avroNodeToDataType(file_reader->dataSchema().root()->leafAt(0));
+    auto col = parseAvro(file_reader, data_type, manifest_path);
+
+    std::vector<String> res;
+    if (col->getDataType() == TypeIndex::String)
+    {
+        const auto * col_str = typeid_cast<const ColumnString *>(col.get());
+        size_t col_size = col_str->size();
+        for (size_t i = 0; i < col_size; ++i)
+        {
+            auto file_path = col_str->getDataAt(i).toString();
+            /// We just need to obtain the file name.
+            std::filesystem::path path(file_path);
+            res.emplace_back(path.filename());
+        }
+
+        return res;
+    }
+    throw Exception(
+        ErrorCodes::ILLEGAL_COLUMN,
+        "The parsed column from Avro file for manifest_path should have data type String, but got {}",
+        col->getFamilyName());
+}
+
+std::vector<String> IcebergMetaParser::getFilesForRead(const std::vector<String> & manifest_files) const
+{
+    std::vector<String> keys;
+    for (const auto & manifest_file : manifest_files)
+    {
+        auto buffer = createS3ReadBuffer(manifest_file, context);
+
+        auto file_reader = std::make_unique<avro::DataFileReaderBase>(std::make_unique<AvroInputStreamReadBufferAdapter>(*buffer));
+
+        static constexpr auto data_file = "data_file";
+
+        /// The data_file field is at the 3rd position of the manifest file:
+        /// {'status': xx, 'snapshot_id': xx, 'data_file': {'file_path': 'xxx', ...}, ...}
+        /// and it's also a nested record, so its result type is a nested Tuple.
+        auto data_type = AvroSchemaReader::avroNodeToDataType(file_reader->dataSchema().root()->leafAt(2));
+        auto col = parseAvro(file_reader, data_type, data_file);
+
+        if (col->getDataType() == TypeIndex::Tuple)
+        {
+            const auto * col_tuple = typeid_cast<const ColumnTuple *>(col.get());
+            const auto & col_str = col_tuple->getColumnPtr(0);
+            if (col_str->getDataType() == TypeIndex::String)
+            {
+                const auto * str_col = typeid_cast<const ColumnString *>(col_str.get());
+                size_t col_size = str_col->size();
+                for (size_t i = 0; i < col_size; ++i)
+                {
+                    auto file_path = str_col->getDataAt(i).toString();
+                    /// We just need to obtain the partition/file name.
+                    std::filesystem::path path(file_path);
+                    keys.emplace_back(path.parent_path().filename().string() + '/' + path.filename().string());
+                }
+            }
+            else
+            {
+                throw Exception(
+                    ErrorCodes::ILLEGAL_COLUMN,
+                    "The parsed column from Avro file for file_path should have data type String, got {}",
+                    col_str->getFamilyName());
+            }
+        }
+        else
+        {
+            throw Exception(
+                ErrorCodes::ILLEGAL_COLUMN,
+                "The parsed column from Avro file for data_file field should have data type Tuple, got {}",
+                col->getFamilyName());
+        }
+    }
+
+    return keys;
+}
+
+std::shared_ptr<ReadBuffer> IcebergMetaParser::createS3ReadBuffer(const String & key, ContextPtr context)
 {
-    /// TODO: add parallel downloads
     S3Settings::RequestSettings request_settings;
     request_settings.max_single_read_retries = 10;
     return std::make_shared<ReadBufferFromS3>(
@@ -141,24 +260,6 @@ std::shared_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String
         context->getReadSettings());
 }
 
-void JsonMetadataGetter::handleJSON(const JSON & json)
-{
-    if (json.has("add"))
-    {
-        auto path = json["add"]["path"].getString();
-        auto timestamp = json["add"]["modificationTime"].getInt();
-
-        metadata.setLastModifiedTime(path, timestamp);
-    }
-    else if (json.has("remove"))
-    {
-        auto path = json["remove"]["path"].getString();
-        auto timestamp = json["remove"]["deletionTimestamp"].getInt();
-
-        metadata.remove(path, timestamp);
-    }
-}
-
 namespace
 {
 
@@ -183,10 +284,11 @@ StorageS3Configuration getAdjustedS3Configuration(
     const std::string & table_path,
     Poco::Logger * log)
 {
-    JsonMetadataGetter getter{base_configuration, table_path, context};
+    IcebergMetaParser parser{base_configuration, table_path, context};
 
-    auto keys = getter.getFiles();
-    auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(keys);
+    auto keys = parser.getFiles();
+    static constexpr auto iceberg_data_directory = "data";
+    auto new_uri = (std::filesystem::path(base_configuration.uri.uri.toString()) / iceberg_data_directory / generateQueryFromKeys(keys)).string();
 
     LOG_DEBUG(log, "New uri: {}", new_uri);
     LOG_DEBUG(log, "Table path: {}", table_path);
 
diff --git a/src/Storages/StorageIceberg.h b/src/Storages/StorageIceberg.h
index d1e7bc6bcbd..a139a40fc29 100644
--- a/src/Storages/StorageIceberg.h
+++ b/src/Storages/StorageIceberg.h
@@ -34,43 +34,20 @@ class IcebergMetaParser
 public:
     IcebergMetaParser(const StorageS3Configuration & configuration_, const String & table_path_, ContextPtr context_);
 
-    void parseMeta();
-
-    String getNewestMetaFile();
-    String getManiFestList(String metadata);
-    std::vector<String> getManifestFiles(String manifest_list);
-    void getFilesForRead(const std::vector<String> manifest_files);
-
-    auto getFiles() const { return keys; }
+    std::vector<String> getFiles() const;
 
 private:
-    std::vector<String> keys;
-
     StorageS3Configuration base_configuration;
     String table_path;
     ContextPtr context;
-};
 
-// class to get deltalake log json files and read json from them
-class JsonMetadataGetter
-{
-public:
-    JsonMetadataGetter(StorageS3::S3Configuration & configuration_,
-        const String & table_path_, ContextPtr context);
-
-    std::vector<String> getFiles() { return std::move(metadata).listCurrentFiles(); }
-
-private:
-    void init(ContextPtr context);
-
-    std::vector<String> getJsonLogFiles();
+    /// These helpers return just file names, not full paths.
+    String getNewestMetaFile() const;
+    String getManiFestList(const String & metadata_name) const;
+    std::vector<String> getManifestFiles(const String & manifest_list) const;
+    std::vector<String> getFilesForRead(const std::vector<String> & manifest_files) const;
 
     std::shared_ptr<ReadBuffer> createS3ReadBuffer(const String & key, ContextPtr context);
-
-    void handleJSON(const JSON & json);
-
-    StorageS3::S3Configuration base_configuration;
-    String table_path;
-    DeltaLakeMetadata metadata;
 };
 
 class StorageIceberg : public IStorage
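
--
Reviewer note, not part of the patch: a minimal sketch of how the new IcebergMetaParser
is meant to be driven, assuming `base_configuration`, `table_path` and `context` are
already prepared by the caller, as in getAdjustedS3Configuration() above:

    // Sketch only: the resolution chain performed inside getFiles():
    //   1. getNewestMetaFile()  - list <table_path>/metadata/*.json, take the max name
    //   2. getManiFestList()    - read current-snapshot-id, return its manifest-list (Avro)
    //   3. getManifestFiles()   - parse the manifest_path column of the manifest list
    //   4. getFilesForRead()    - parse data_file.file_path, yield "partition/file" keys
    IcebergMetaParser parser{base_configuration, table_path, context};
    std::vector<String> keys = parser.getFiles();
    // The keys are then appended under the "data" directory to build the new S3 URI.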