diff --git a/src/Storages/StorageDelta.cpp b/src/Storages/StorageDelta.cpp index 46db33279ab..a79762e29db 100644 --- a/src/Storages/StorageDelta.cpp +++ b/src/Storages/StorageDelta.cpp @@ -18,6 +18,9 @@ #include +#include +#include + namespace DB { @@ -25,6 +28,7 @@ namespace ErrorCodes { extern const int S3_ERROR; extern const int BAD_ARGUMENTS; + extern const int INCORRECT_DATA; } void DeltaLakeMetadata::add(const String & filename, uint64_t timestamp) @@ -34,7 +38,9 @@ void DeltaLakeMetadata::add(const String & filename, uint64_t timestamp) void DeltaLakeMetadata::remove(const String & filename, uint64_t /*timestamp */) { - file_update_time.erase(filename); + bool erase = file_update_time.erase(filename); + if (!erase) + throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid table metadata, tried to remove {} before adding it", filename); } std::vector DeltaLakeMetadata::ListCurrentFiles() && @@ -95,9 +101,6 @@ void JsonMetadataGetter::Init() auto timestamp = json["add"]["modificationTime"].getInt(); metadata.add(path, timestamp); - - LOG_DEBUG(log, "Path {}", path); - LOG_DEBUG(log, "Timestamp {}", timestamp); } else if (json.has("remove")) { @@ -105,9 +108,6 @@ void JsonMetadataGetter::Init() auto timestamp = json["remove"]["deletionTimestamp"].getInt(); metadata.remove(path, timestamp); - - LOG_DEBUG(log, "Path {}", path); - LOG_DEBUG(log, "Timestamp {}", timestamp); } // reset @@ -132,7 +132,7 @@ std::vector JsonMetadataGetter::getJsonLogFiles() const auto bucket{base_configuration.uri.bucket}; request.SetBucket(bucket); - request.SetPrefix(table_path + "_delta_log"); + request.SetPrefix(std::filesystem::path(table_path) / "_delta_log"); while (!is_finished) { @@ -151,7 +151,7 @@ std::vector JsonMetadataGetter::getJsonLogFiles() { const auto & filename = obj.GetKey(); - if (filename.substr(filename.size() - 5) == ".json") + if (std::filesystem::path(filename).extension() == ".json") keys.push_back(filename); } @@ -164,7 +164,6 @@ std::vector JsonMetadataGetter::getJsonLogFiles() std::unique_ptr JsonMetadataGetter::createS3ReadBuffer(const String & key) { - // TBD: add parallel downloads return std::make_unique( base_configuration.client, @@ -196,11 +195,6 @@ StorageDelta::StorageDelta( auto keys = getter.getFiles(); - for (const String & path : keys) - { - LOG_DEBUG(log, "{}", path); - } - auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(std::move(keys)); LOG_DEBUG(log, "New uri: {}", new_uri); @@ -225,7 +219,7 @@ StorageDelta::StorageDelta( access_key_, secret_access_key_, table_id_, - String("Parquet"), // format name + "Parquet", // format name base_configuration.rw_settings, columns_, constraints_, @@ -297,18 +291,7 @@ void StorageDelta::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configurat String StorageDelta::generateQueryFromKeys(std::vector && keys) { - String new_query; - - for (auto && key : keys) - { - if (!new_query.empty()) - { - new_query += ","; - } - new_query += key; - } - new_query = "{" + new_query + "}"; - + std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ",")); return new_query; } diff --git a/src/Storages/StorageDelta.h b/src/Storages/StorageDelta.h index f4547666c8c..f7fa4120495 100644 --- a/src/Storages/StorageDelta.h +++ b/src/Storages/StorageDelta.h @@ -29,11 +29,9 @@ class DeltaLakeMetadata public: DeltaLakeMetadata() = default; -public: void add(const String & filename, uint64_t timestamp); void remove(const String & filename, uint64_t timestamp); -public: std::vector ListCurrentFiles() &&; private: @@ -46,18 +44,15 @@ class JsonMetadataGetter public: JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, Poco::Logger * log_); + std::vector getFiles() { return std::move(metadata).ListCurrentFiles(); } + private: void Init(); std::vector getJsonLogFiles(); -private: std::unique_ptr createS3ReadBuffer(const String & key); -public: - std::vector getFiles() { return std::move(metadata).ListCurrentFiles(); } - -private: StorageS3::S3Configuration base_configuration; String table_path; DeltaLakeMetadata metadata; @@ -92,11 +87,8 @@ private: void Init(); static void updateS3Configuration(ContextPtr, StorageS3::S3Configuration &); - -private: static String generateQueryFromKeys(std::vector && keys); -private: StorageS3::S3Configuration base_configuration; std::shared_ptr s3engine; Poco::Logger * log;