From e82c4800f282bb750cbd3fbec066d264c63b7d4a Mon Sep 17 00:00:00 2001 From: Daniil Rubin Date: Fri, 26 Aug 2022 18:17:32 +0000 Subject: [PATCH] Better --- src/Storages/StorageHudi.cpp | 134 ++++++++++++++++++----------------- src/Storages/StorageHudi.h | 23 +++--- 2 files changed, 80 insertions(+), 77 deletions(-) diff --git a/src/Storages/StorageHudi.cpp b/src/Storages/StorageHudi.cpp index 21086a7e085..efba2d3f85f 100644 --- a/src/Storages/StorageHudi.cpp +++ b/src/Storages/StorageHudi.cpp @@ -12,9 +12,11 @@ #include -namespace DB { +namespace DB +{ -namespace ErrorCodes { +namespace ErrorCodes +{ extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; extern const int S3_ERROR; } @@ -24,11 +26,11 @@ StorageHudi::StorageHudi( const String & access_key_, const String & secret_access_key_, const StorageID & table_id_, - const ColumnsDescription & columns_, + ColumnsDescription columns_, const ConstraintsDescription & constraints_, const String & comment, - ContextPtr context_ -) : IStorage(table_id_) + ContextPtr context_) + : IStorage(table_id_) , base_configuration({uri_, access_key_, secret_access_key_, {}, {}, {}}) , log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")")) { @@ -38,25 +40,16 @@ StorageHudi::StorageHudi( auto keys = getKeysFromS3(); auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(std::move(keys)); - + LOG_DEBUG(log, "New uri: {}", new_uri); auto s3_uri = S3::URI(Poco::URI(new_uri)); -// StorageS3::S3Configuration s3_configuration{s3_uri, access_key_, secret_access_key_, {}, {}, {}}; if (columns_.empty()) { - auto columns = StorageS3::getTableStructureFromData( - String("Parquet"), - s3_uri, - access_key_, - secret_access_key_, - "", - false, - {}, - context_ - ); - storage_metadata.setColumns(columns); + columns_ + = StorageS3::getTableStructureFromData(String("Parquet"), s3_uri, access_key_, secret_access_key_, "", false, {}, context_); + storage_metadata.setColumns(columns_); } else storage_metadata.setColumns(columns_); @@ -76,30 +69,21 @@ StorageHudi::StorageHudi( constraints_, comment, context_, - std::nullopt - ); + std::nullopt); } Pipe StorageHudi::read( - const Names & column_names, - const StorageSnapshotPtr & storage_snapshot, - SelectQueryInfo & query_info, - ContextPtr context, - QueryProcessingStage::Enum processed_stage, - size_t max_block_size, - unsigned num_streams) + const Names & column_names, + const StorageSnapshotPtr & storage_snapshot, + SelectQueryInfo & query_info, + ContextPtr context, + QueryProcessingStage::Enum processed_stage, + size_t max_block_size, + unsigned num_streams) { updateS3Configuration(context, base_configuration); - //auto keys = getKeysFromS3(); - - //auto new_uri = base_configuration.uri.uri.toString() + "/" + generateQueryFromKeys(std::forward(keys)); - //s3_configuration.uri = S3::URI(Poco::URI(new_uri)); - - return s3engine->read(column_names, storage_snapshot, - query_info, context, processed_stage, - max_block_size, num_streams); - + return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams); } void StorageHudi::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configuration & upd) @@ -128,7 +112,8 @@ void StorageHudi::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configurati S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration( settings.auth_settings.region, - ctx->getRemoteHostFilter(), ctx->getGlobalContext()->getSettingsRef().s3_max_redirects, + ctx->getRemoteHostFilter(), + ctx->getGlobalContext()->getSettingsRef().s3_max_redirects, ctx->getGlobalContext()->getSettingsRef().enable_s3_requests_logging); client_configuration.endpointOverride = upd.uri.endpoint; @@ -147,11 +132,12 @@ void StorageHudi::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configurati upd.auth_settings = std::move(settings.auth_settings); } -std::vector StorageHudi::getKeysFromS3() { +std::vector StorageHudi::getKeysFromS3() +{ std::vector keys; const auto & client = base_configuration.client; - + Aws::S3::Model::ListObjectsV2Request request; Aws::S3::Model::ListObjectsV2Outcome outcome; @@ -176,7 +162,7 @@ std::vector StorageHudi::getKeysFromS3() { const auto & result_batch = outcome.GetResult().GetContents(); for (const auto & obj : result_batch) { - const auto& filename = obj.GetKey(); + const auto & filename = obj.GetKey(); keys.push_back(filename); //LOG_DEBUG(log, "Found file: {}", filename); } @@ -188,49 +174,64 @@ std::vector StorageHudi::getKeysFromS3() { return keys; } -std::string StorageHudi::generateQueryFromKeys(std::vector&& keys) { +std::string StorageHudi::generateQueryFromKeys(std::vector && keys) +{ // filter only .parquet files - std::erase_if(keys, [](const std::string& s) { - if (s.size() >= 8) { - return s.substr(s.size() - 8) != ".parquet"; - } - return true; - }); + std::erase_if( + keys, + [](const std::string & s) + { + if (s.size() >= 8) + { + return s.substr(s.size() - 8) != ".parquet"; + } + return true; + }); // for each partition path take only latest parquet file std::unordered_map> latest_parquets; - - for (const auto& key : keys) { + for (const auto & key : keys) + { auto slash = key.find_last_of("/"); std::string path; - if (slash == std::string::npos) { + if (slash == std::string::npos) + { path = ""; - } else { + } + else + { path = key.substr(0, slash); } - uint64_t timestamp = std::stoul(key.substr(key.find_last_of("_")+1)); + uint64_t timestamp = std::stoul(key.substr(key.find_last_of("_") + 1)); auto it = latest_parquets.find(path); - - if (it != latest_parquets.end()) { - if (it->second.second < timestamp) { + + if (it != latest_parquets.end()) + { + if (it->second.second < timestamp) + { it->second = {key, timestamp}; } - } else { + } + else + { latest_parquets[path] = {key, timestamp}; } } std::vector filtered_keys; - std::transform(latest_parquets.begin(), latest_parquets.end(), std::back_inserter(filtered_keys), [](const auto& kv){return kv.second.first;}); + std::transform( + latest_parquets.begin(), latest_parquets.end(), std::back_inserter(filtered_keys), [](const auto & kv) { return kv.second.first; }); std::string new_query; - - for (auto&& key : filtered_keys) { - if (!new_query.empty()) { + + for (auto && key : filtered_keys) + { + if (!new_query.empty()) + { new_query += ","; } new_query += key; @@ -243,7 +244,9 @@ std::string StorageHudi::generateQueryFromKeys(std::vector&& keys) void registerStorageHudi(StorageFactory & factory) { - factory.registerStorage("Hudi", [](const StorageFactory::Arguments & args) + factory.registerStorage( + "Hudi", + [](const StorageFactory::Arguments & args) { auto & engine_args = args.engine_args; if (engine_args.empty()) @@ -269,11 +272,10 @@ void registerStorageHudi(StorageFactory & factory) args.getContext()); }, { - .supports_settings = true, - .supports_schema_inference = true, - .source_access_type = AccessType::S3, - }); + .supports_settings = true, + .supports_schema_inference = true, + .source_access_type = AccessType::S3, + }); } } - diff --git a/src/Storages/StorageHudi.h b/src/Storages/StorageHudi.h index 61e2af1340d..dd5cc18495e 100644 --- a/src/Storages/StorageHudi.h +++ b/src/Storages/StorageHudi.h @@ -5,30 +5,31 @@ #include #include -namespace Poco { - class Logger; +namespace Poco +{ +class Logger; } namespace Aws::S3 { - class S3Client; +class S3Client; } namespace DB { -class StorageHudi : public IStorage { +class StorageHudi : public IStorage +{ public: StorageHudi( - const S3::URI& uri_, - const String& access_key_, - const String& secret_access_key_, + const S3::URI & uri_, + const String & access_key_, + const String & secret_access_key_, const StorageID & table_id_, - const ColumnsDescription & columns_, + ColumnsDescription columns_, const ConstraintsDescription & constraints_, const String & comment, - ContextPtr context_ - ); + ContextPtr context_); String getName() const override { return "Hudi"; } @@ -46,7 +47,7 @@ private: private: std::vector getKeysFromS3(); - std::string generateQueryFromKeys(std::vector&& keys); + std::string generateQueryFromKeys(std::vector && keys); private: StorageS3::S3Configuration base_configuration;