diff --git a/src/Storages/StorageDelta.cpp b/src/Storages/StorageDelta.cpp index 52f955dee0a..5fa60206360 100644 --- a/src/Storages/StorageDelta.cpp +++ b/src/Storages/StorageDelta.cpp @@ -2,9 +2,9 @@ #include #include -#include #include #include +#include #include #include @@ -24,49 +24,52 @@ namespace ErrorCodes extern const int S3_ERROR; } -void DeltaLakeMetadata::add(const String & key, uint64_t timestamp) { +void DeltaLakeMetadata::add(const String & key, uint64_t timestamp) +{ file_update_time[key] = timestamp; } -void DeltaLakeMetadata::remove(const String & key, uint64_t /*timestamp */) { +void DeltaLakeMetadata::remove(const String & key, uint64_t /*timestamp */) +{ file_update_time.erase(key); } -std::vector DeltaLakeMetadata::ListCurrentFiles() && { +std::vector DeltaLakeMetadata::ListCurrentFiles() && +{ std::vector keys; keys.reserve(file_update_time.size()); - for (auto && [k, _] : file_update_time) { + for (auto && [k, _] : file_update_time) + { keys.push_back(k); } return keys; } -JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_, - const String & table_path_, - Poco::Logger * log_) : - base_configuration(configuration_) - , table_path(table_path_) - , metadata() - , log(log_) - { - Init(); - } +JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, Poco::Logger * log_) + : base_configuration(configuration_), table_path(table_path_), metadata(), log(log_) +{ + Init(); +} -void JsonMetadataGetter::Init() { +void JsonMetadataGetter::Init() +{ auto keys = getJsonLogFiles(); - + // read data from every json log file - for (const String & key : keys) { + for (const String & key : keys) + { auto buf = createS3ReadBuffer(key); String json_str; size_t opening(0), closing(0); char c; - while (buf->read(c)) { + while (buf->read(c)) + { // skip all space characters for JSON to parse correctly - if (isspace(c)) { + if (isspace(c)) + { continue; } @@ -77,13 +80,14 @@ void JsonMetadataGetter::Init() { else if (c == '}') closing++; - if (opening == closing) { - + if (opening == closing) + { LOG_DEBUG(log, "JSON {}, {}", json_str, json_str.size()); JSON json(json_str); - if (json.has("add")) { + if (json.has("add")) + { auto path = json["add"]["path"].getString(); auto timestamp = json["add"]["modificationTime"].getInt(); @@ -91,7 +95,9 @@ void JsonMetadataGetter::Init() { LOG_DEBUG(log, "Path {}", path); LOG_DEBUG(log, "Timestamp {}", timestamp); - } else if (json.has("remove")) { + } + else if (json.has("remove")) + { auto path = json["remove"]["path"].getString(); auto timestamp = json["remove"]["deletionTimestamp"].getInt(); @@ -100,20 +106,18 @@ void JsonMetadataGetter::Init() { LOG_DEBUG(log, "Path {}", path); LOG_DEBUG(log, "Timestamp {}", timestamp); } - + // reset opening = 0; closing = 0; json_str.clear(); - } - } } - } -std::vector JsonMetadataGetter::getJsonLogFiles() { +std::vector JsonMetadataGetter::getJsonLogFiles() +{ std::vector keys; const auto & client = base_configuration.client; @@ -143,7 +147,7 @@ std::vector JsonMetadataGetter::getJsonLogFiles() { for (const auto & obj : result_batch) { const auto & filename = obj.GetKey(); - + if (filename.substr(filename.size() - 5) == ".json") keys.push_back(filename); } @@ -155,11 +159,17 @@ std::vector JsonMetadataGetter::getJsonLogFiles() { return keys; } -std::unique_ptr JsonMetadataGetter::createS3ReadBuffer(const String & key) { - // size_t object_size = DB::S3::getObjectSize(base_configuration.client, base_configuration.uri.bucket, key, base_configuration.uri.version_id, false); +std::unique_ptr JsonMetadataGetter::createS3ReadBuffer(const String & key) +{ // TBD: add parallel downloads - return std::make_unique(base_configuration.client, base_configuration.uri.bucket, key, base_configuration.uri.version_id, /* max single read retries */ 10, ReadSettings{}); + return std::make_unique( + base_configuration.client, + base_configuration.uri.bucket, + key, + base_configuration.uri.version_id, + /* max single read retries */ 10, + ReadSettings{}); } StorageDelta::StorageDelta( @@ -178,12 +188,13 @@ StorageDelta::StorageDelta( { StorageInMemoryMetadata storage_metadata; updateS3Configuration(context_, base_configuration); - + JsonMetadataGetter getter{base_configuration, table_path, log}; auto keys = getter.getFiles(); - for (const String & path : keys) { + for (const String & path : keys) + { LOG_DEBUG(log, "{}", path); } @@ -200,7 +211,7 @@ StorageDelta::StorageDelta( storage_metadata.setColumns(columns_); } else - storage_metadata.setColumns(columns_); + storage_metadata.setColumns(columns_); storage_metadata.setConstraints(constraints_); storage_metadata.setComment(comment); diff --git a/src/Storages/StorageDelta.h b/src/Storages/StorageDelta.h index 2aec4f815f3..4e9199a5faa 100644 --- a/src/Storages/StorageDelta.h +++ b/src/Storages/StorageDelta.h @@ -22,7 +22,8 @@ namespace DB { // class to parse json deltalake metadata and find files needed for query in table -class DeltaLakeMetadata { +class DeltaLakeMetadata +{ public: DeltaLakeMetadata() = default; @@ -38,13 +39,10 @@ private: }; // class to get deltalake log json files and read json from them -class JsonMetadataGetter +class JsonMetadataGetter { public: - JsonMetadataGetter(StorageS3::S3Configuration & configuration_, - const String & table_path_, - Poco::Logger * log_ - ); + JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, Poco::Logger * log_); private: void Init(); @@ -91,7 +89,7 @@ public: private: void Init(); static void updateS3Configuration(ContextPtr, StorageS3::S3Configuration &); - + private: String generateQueryFromKeys(std::vector && keys);