diff --git a/src/Storages/StorageDelta.cpp b/src/Storages/StorageDelta.cpp
index 7ab522860e8..59e7c3d3187 100644
--- a/src/Storages/StorageDelta.cpp
+++ b/src/Storages/StorageDelta.cpp
@@ -60,20 +60,20 @@ std::vector<String> DeltaLakeMetadata::ListCurrentFiles() &&
     return keys;
 }
 
-JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_)
+JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context)
     : base_configuration(configuration_), table_path(table_path_)
 {
-    Init();
+    Init(context);
 }
 
-void JsonMetadataGetter::Init()
+void JsonMetadataGetter::Init(ContextPtr context)
 {
     auto keys = getJsonLogFiles();
 
     // read data from every json log file
     for (const String & key : keys)
     {
-        auto buf = createS3ReadBuffer(key);
+        auto buf = createS3ReadBuffer(key, context);
 
         while (!buf->eof())
         {
@@ -110,6 +110,8 @@ std::vector<String> JsonMetadataGetter::getJsonLogFiles()
     const auto bucket{base_configuration.uri.bucket};
 
     request.SetBucket(bucket);
+
+    // The DeltaLake format stores all metadata json files in the _delta_log directory
     request.SetPrefix(std::filesystem::path(table_path) / "_delta_log");
 
     while (!is_finished)
@@ -129,18 +131,25 @@ std::vector<String> JsonMetadataGetter::getJsonLogFiles()
         {
             const auto & filename = obj.GetKey();
 
+            // DeltaLake metadata files have the json extension
            if (std::filesystem::path(filename).extension() == ".json")
                 keys.push_back(filename);
         }
 
+        // Needed in case more results are available:
+        // if so, we continue reading, without re-reading keys we already have
         request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
+
+        /// Set to false if all of the results were returned. Set to true if more keys
+        /// are available to return. If the number of results exceeds that specified by
+        /// MaxKeys, all of the results might not be returned
         is_finished = !outcome.GetResult().GetIsTruncated();
     }
 
     return keys;
 }
 
-std::shared_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String & key)
+std::shared_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String & key, ContextPtr context)
 {
     // TBD: add parallel downloads
     return std::make_shared<ReadBufferFromS3>(
@@ -149,7 +158,7 @@ std::shared_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String
         key,
         base_configuration.uri.version_id,
         /* max single read retries */ 10,
-        ReadSettings{});
+        context->getReadSettings());
 }
 
 void JsonMetadataGetter::handleJSON(const JSON & json)
@@ -186,7 +195,7 @@ StorageDelta::StorageDelta(
     StorageInMemoryMetadata storage_metadata;
     StorageS3::updateS3Configuration(context_, base_configuration);
 
-    JsonMetadataGetter getter{base_configuration, table_path};
+    JsonMetadataGetter getter{base_configuration, table_path, context_};
 
     auto keys = getter.getFiles();
 
@@ -245,6 +254,9 @@ Pipe StorageDelta::read(
 
 String StorageDelta::generateQueryFromKeys(std::vector<String> && keys)
 {
+    // DeltaLake stores data parts in different files;
+    // keys are the filenames of those parts.
+    // For StorageS3 to read all parts we need the format {key1,key2,key3,...keyn}
     std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
     return new_query;
 }
@@ -270,6 +282,7 @@ void registerStorageDelta(StorageFactory & factory)
         if (engine_args.size() == 4)
             configuration.format = checkAndGetLiteralArgument<String>(engine_args[3], "format");
 
+        // DeltaLake uses Parquet by default
         if (configuration.format == "auto")
             configuration.format = "Parquet";
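For context, the continuation-token loop that the new comments in `getJsonLogFiles` describe is the standard AWS SDK pagination pattern. Below is a minimal standalone sketch of that pattern using the AWS SDK for C++ directly rather than ClickHouse's S3 wrappers; `listAllKeys` and its parameters are illustrative, not part of the patch:

```cpp
#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>

#include <string>
#include <vector>

// Collect every key under `prefix`, following continuation tokens until the
// listing is no longer truncated (ListObjectsV2 returns at most MaxKeys
// results per call, 1000 by default).
std::vector<std::string> listAllKeys(const Aws::S3::S3Client & client, const std::string & bucket, const std::string & prefix)
{
    std::vector<std::string> keys;

    Aws::S3::Model::ListObjectsV2Request request;
    request.SetBucket(bucket.c_str());
    request.SetPrefix(prefix.c_str());

    bool is_finished = false;
    while (!is_finished)
    {
        auto outcome = client.ListObjectsV2(request);
        if (!outcome.IsSuccess())
            break; // production code should surface the error instead

        for (const auto & obj : outcome.GetResult().GetContents())
            keys.emplace_back(obj.GetKey().c_str(), obj.GetKey().size());

        // Resume after the last returned key on the next iteration
        request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
        // GetIsTruncated() is false once all results have been returned
        is_finished = !outcome.GetResult().GetIsTruncated();
    }

    return keys;
}
```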
diff --git a/src/Storages/StorageDelta.h b/src/Storages/StorageDelta.h
index d0992dd4cb7..f759d0086a2 100644
--- a/src/Storages/StorageDelta.h
+++ b/src/Storages/StorageDelta.h
@@ -42,16 +42,16 @@ private:
 class JsonMetadataGetter
 {
 public:
-    JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_);
+    JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context);
 
     std::vector<String> getFiles() { return std::move(metadata).ListCurrentFiles(); }
 
 private:
-    void Init();
+    void Init(ContextPtr context);
 
     std::vector<String> getJsonLogFiles();
 
-    std::shared_ptr<ReadBuffer> createS3ReadBuffer(const String & key);
+    std::shared_ptr<ReadBuffer> createS3ReadBuffer(const String & key, ContextPtr context);
 
     void handleJSON(const JSON & json);
 
@@ -63,6 +63,9 @@ private:
 class StorageDelta : public IStorage
 {
 public:
+    // 1. Parses the internal file structure of the table
+    // 2. Finds the parts with the latest version
+    // 3. Creates a URL for the underlying StorageS3 engine to handle reads
     StorageDelta(
         const StorageS3Configuration & configuration_,
         const StorageID & table_id_,
@@ -74,6 +77,7 @@ public:
 
     String getName() const override { return "DeltaLake"; }
 
+    // Reads the latest version of a DeltaLake table
     Pipe read(
         const Names & column_names,
         const StorageSnapshotPtr & storage_snapshot,
@@ -85,6 +89,11 @@ public:
 
 private:
     void Init();
+
+    // DeltaLake stores data parts in different files;
+    // keys is the vector of parts with the latest version.
+    // generateQueryFromKeys constructs a query from the part filenames for
+    // the underlying StorageS3 engine
     static String generateQueryFromKeys(std::vector<String> && keys);
 
     StorageS3::S3Configuration base_configuration;
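To make the `{key1,key2,...}` contract mentioned in these comments concrete, here is a minimal self-contained sketch of the string `generateQueryFromKeys` builds (the part names are invented):

```cpp
#include <fmt/format.h>
#include <fmt/ranges.h>

#include <string>
#include <vector>

int main()
{
    std::vector<std::string> keys = {"part-0000.parquet", "part-0001.parquet", "part-0002.parquet"};

    // "{{" and "}}" are fmt escapes for literal braces, so the result is
    //   {part-0000.parquet,part-0001.parquet,part-0002.parquet}
    std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
    fmt::print("{}\n", new_query);
}
```

StorageS3 already understands this brace-glob syntax in paths, so no further parsing is needed on the read side.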
diff --git a/src/Storages/StorageHudi.cpp b/src/Storages/StorageHudi.cpp
index e2a46fad2d0..46016771f49 100644
--- a/src/Storages/StorageHudi.cpp
+++ b/src/Storages/StorageHudi.cpp
@@ -142,8 +142,7 @@ std::string StorageHudi::generateQueryFromKeys(std::vector<std::string> && keys,
     std::erase_if(keys, [&format](const std::string & s) { return std::filesystem::path(s).extension() != "." + format; });
 
     // for each partition path take only latest file
-
-    std::unordered_map<std::string, std::pair<std::string, uint64_t>> latest_parquets;
+    std::unordered_map<std::string, std::pair<std::string, uint64_t>> latest_parts;
 
     for (const auto & key : keys)
     {
@@ -161,9 +160,9 @@ std::string StorageHudi::generateQueryFromKeys(std::vector<std::string> && keys,
         // every filename contains metadata split by "_", timestamp is after last "_"
         uint64_t timestamp = std::stoul(key.substr(key.find_last_of("_") + 1));
 
-        auto it = latest_parquets.find(path);
+        auto it = latest_parts.find(path);
 
-        if (it != latest_parquets.end())
+        if (it != latest_parts.end())
         {
             if (it->second.second < timestamp)
             {
@@ -172,13 +171,13 @@ std::string StorageHudi::generateQueryFromKeys(std::vector<std::string> && keys,
         }
         else
         {
-            latest_parquets[path] = {key, timestamp};
+            latest_parts[path] = {key, timestamp};
         }
     }
 
     std::vector<std::string> filtered_keys;
     std::transform(
-        latest_parquets.begin(), latest_parquets.end(), std::back_inserter(filtered_keys), [](const auto & kv) { return kv.second.first; });
+        latest_parts.begin(), latest_parts.end(), std::back_inserter(filtered_keys), [](const auto & kv) { return kv.second.first; });
 
     std::string new_query;
 
@@ -218,6 +217,7 @@ void registerStorageHudi(StorageFactory & factory)
         if (engine_args.size() == 4)
             configuration.format = checkAndGetLiteralArgument<String>(engine_args[3], "format");
 
+        // Apache Hudi uses Parquet by default
         if (configuration.format == "auto")
             configuration.format = "Parquet";
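The `latest_parquets` → `latest_parts` rename above is purely cosmetic; the selection logic itself is a keep-the-maximum fold per partition. A condensed standalone sketch of that logic, with invented file names that follow the Hudi convention of the commit timestamp sitting after the last `_`:

```cpp
#include <cstdint>
#include <filesystem>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

int main()
{
    // Two versions of the same file group in partition "year=2022".
    // std::stoul stops at the first non-digit, so the ".parquet"
    // extension after the timestamp is ignored.
    std::vector<std::string> keys = {
        "year=2022/fileid1_0-7-21_20220905000000.parquet",
        "year=2022/fileid1_0-9-42_20220906000000.parquet",
        "year=2021/fileid2_0-3-11_20220901000000.parquet",
    };

    // partition path -> (key of the latest part, its timestamp)
    std::unordered_map<std::string, std::pair<std::string, uint64_t>> latest_parts;

    for (const auto & key : keys)
    {
        auto path = std::filesystem::path(key).parent_path().string();
        uint64_t timestamp = std::stoul(key.substr(key.find_last_of('_') + 1));

        auto it = latest_parts.find(path);
        if (it == latest_parts.end())
            latest_parts[path] = {key, timestamp};
        else if (it->second.second < timestamp)
            it->second = {key, timestamp};
    }

    // Prints one winning part per partition
    for (const auto & [path, part] : latest_parts)
        std::cout << path << " -> " << part.first << '\n';
}
```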
diff --git a/src/Storages/StorageHudi.h b/src/Storages/StorageHudi.h
index 91a77ec83ff..c25c347f4c2 100644
--- a/src/Storages/StorageHudi.h
+++ b/src/Storages/StorageHudi.h
@@ -23,6 +23,9 @@ namespace DB
 class StorageHudi : public IStorage
 {
 public:
+    // 1. Parses the internal file structure of the table
+    // 2. Finds the parts with the latest version
+    // 3. Creates a URL for the underlying StorageS3 engine to handle reads
     StorageHudi(
         const StorageS3Configuration & configuration_,
         const StorageID & table_id_,
@@ -34,6 +37,8 @@ public:
 
     String getName() const override { return "Hudi"; }
 
+
+    // Reads the latest version of an Apache Hudi table
     Pipe read(
         const Names & column_names,
         const StorageSnapshotPtr & storage_snapshot,
@@ -45,6 +50,12 @@ public:
 
 private:
     std::vector<std::string> getKeysFromS3();
+
+    // Apache Hudi stores parts of data in different files.
+    // Every part file has a timestamp in it, and every partition (directory)
+    // in Apache Hudi has different versions of a part.
+    // To find the needed parts we find the latest part file for every partition.
+    // The part format is usually Parquet, but can differ
     static std::string generateQueryFromKeys(std::vector<std::string> && keys, String format);
 
     StorageS3::S3Configuration base_configuration;
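The last comment above notes that the part format is usually Parquet but can differ; the extension filter at the top of `generateQueryFromKeys` is what enforces the chosen format. A self-contained sketch of that filtering step (file names invented):

```cpp
#include <filesystem>
#include <iostream>
#include <string>
#include <vector>

int main()
{
    std::string format = "parquet"; // taken from the engine's format argument
    std::vector<std::string> keys = {
        "year=2022/a_0-1-2_20220905000000.parquet",
        "year=2022/.hoodie_partition_metadata",
        "year=2022/b_0-1-3_20220906000000.orc",
    };

    // Drop every key whose extension does not match the requested format;
    // this discards Hudi bookkeeping files (which have no extension) and
    // parts written in other formats.
    std::erase_if(keys, [&format](const std::string & s)
                  { return std::filesystem::path(s).extension() != "." + format; });

    for (const auto & key : keys)
        std::cout << key << '\n'; // only the .parquet part survives
}
```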