diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index d073e477593..33f0e10156f 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -189,6 +189,9 @@ M(BuildVectorSimilarityIndexThreads, "Number of threads in the build vector similarity index thread pool.") \ M(BuildVectorSimilarityIndexThreadsActive, "Number of threads in the build vector similarity index thread pool running a task.") \ M(BuildVectorSimilarityIndexThreadsScheduled, "Number of queued or active jobs in the build vector similarity index thread pool.") \ + M(IcebergCatalogThreads, "Number of threads in the IcebergCatalog thread pool.") \ + M(IcebergCatalogThreadsActive, "Number of threads in the IcebergCatalog thread pool running a task.") \ + M(IcebergCatalogThreadsScheduled, "Number of queued or active jobs in the IcebergCatalog thread pool.") \ \ M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \ M(DiskPlainRewritableAzureFileCount, "Number of file entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \ diff --git a/src/Databases/Iceberg/DatabaseIceberg.cpp b/src/Databases/Iceberg/DatabaseIceberg.cpp index 6241c0d8423..c672d805e5e 100644 --- a/src/Databases/Iceberg/DatabaseIceberg.cpp +++ b/src/Databases/Iceberg/DatabaseIceberg.cpp @@ -25,9 +25,9 @@ namespace CurrentMetrics { - extern const Metric MergeTreeDataSelectExecutorThreads; - extern const Metric MergeTreeDataSelectExecutorThreadsActive; - extern const Metric MergeTreeDataSelectExecutorThreadsScheduled; + extern const Metric IcebergCatalogThreads; + extern const Metric IcebergCatalogThreadsActive; + extern const Metric IcebergCatalogThreadsScheduled; } @@ -236,9 +236,9 @@ DatabaseTablesIteratorPtr DatabaseIceberg::getTablesIterator( auto iceberg_tables = catalog->getTables(); size_t num_threads = std::min(10, iceberg_tables.size()); ThreadPool pool( - CurrentMetrics::MergeTreeDataSelectExecutorThreads, - CurrentMetrics::MergeTreeDataSelectExecutorThreadsActive, - CurrentMetrics::MergeTreeDataSelectExecutorThreadsScheduled, + CurrentMetrics::IcebergCatalogThreads, + CurrentMetrics::IcebergCatalogThreadsActive, + CurrentMetrics::IcebergCatalogThreadsScheduled, num_threads); DB::ThreadPoolCallbackRunnerLocal runner(pool, "RestCatalog"); diff --git a/src/Databases/Iceberg/RestCatalog.cpp b/src/Databases/Iceberg/RestCatalog.cpp index d336c4af707..0fe9e90e46f 100644 --- a/src/Databases/Iceberg/RestCatalog.cpp +++ b/src/Databases/Iceberg/RestCatalog.cpp @@ -26,9 +26,9 @@ namespace DB::ErrorCodes namespace CurrentMetrics { - extern const Metric MergeTreeDataSelectExecutorThreads; - extern const Metric MergeTreeDataSelectExecutorThreadsActive; - extern const Metric MergeTreeDataSelectExecutorThreadsScheduled; + extern const Metric IcebergCatalogThreads; + extern const Metric IcebergCatalogThreadsActive; + extern const Metric IcebergCatalogThreadsScheduled; } namespace Iceberg @@ -37,20 +37,14 @@ namespace Iceberg static constexpr auto config_endpoint = "config"; static constexpr auto namespaces_endpoint = "namespaces"; -std::string RestCatalog::Config::toString() const +namespace { - DB::WriteBufferFromOwnString wb; - if (!prefix.empty()) - wb << "prefix: " << prefix.string() << ", "; - if (!default_base_location.empty()) - wb << "default_base_location: " << default_base_location << ", "; - - return wb.str(); -} - -static std::pair parseCatalogCredential(const std::string & catalog_credential) +std::pair parseCatalogCredential(const std::string & catalog_credential) { + /// Parse a string of format ":" + /// into separare strings client_id and client_secret. + std::string client_id, client_secret; if (!catalog_credential.empty()) { @@ -67,6 +61,33 @@ static std::pair parseCatalogCredential(const std::str return std::pair(client_id, client_secret); } +DB::HTTPHeaderEntry parseAuthHeader(const std::string & auth_header) +{ + /// Parse a string of format "Authorization: " + /// into a key-value header "Authorization", " " + + auto pos = auth_header.find(':'); + if (pos == std::string::npos) + throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected format of auth header"); + + return DB::HTTPHeaderEntry(auth_header.substr(0, pos), auth_header.substr(pos + 1)); +} + +} + +std::string RestCatalog::Config::toString() const +{ + DB::WriteBufferFromOwnString wb; + + if (!prefix.empty()) + wb << "prefix: " << prefix.string() << ", "; + + if (!default_base_location.empty()) + wb << "default_base_location: " << default_base_location << ", "; + + return wb.str(); +} + RestCatalog::RestCatalog( const std::string & warehouse_, const std::string & base_url_, @@ -75,18 +96,14 @@ RestCatalog::RestCatalog( DB::ContextPtr context_) : ICatalog(warehouse_) , DB::WithContext(context_) - , log(getLogger("RestCatalog(" + warehouse_ + ")")) , base_url(base_url_) + , log(getLogger("RestCatalog(" + warehouse_ + ")")) { - std::tie(client_id, client_secret) = parseCatalogCredential(catalog_credential_); - if (!auth_header_.empty()) - { - auto pos = auth_header_.find(':'); - if (pos == std::string::npos) - throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected format of auth header"); + if (!catalog_credential_.empty()) + std::tie(client_id, client_secret) = parseCatalogCredential(catalog_credential_); + else if (!auth_header_.empty()) + auth_header = parseAuthHeader(auth_header_); - auth_header = DB::HTTPHeaderEntry(auth_header_.substr(0, pos), auth_header_.substr(pos + 1)); - } config = loadConfig(); } @@ -98,7 +115,7 @@ RestCatalog::Config RestCatalog::loadConfig() std::string json_str; readJSONObjectPossiblyInvalid(json_str, *buf); - LOG_TEST(log, "Received config result: {}", json_str); + LOG_TEST(log, "Received catalog configuration settings: {}", json_str); Poco::JSON::Parser parser; Poco::Dynamic::Var json = parser.parse(json_str); @@ -107,16 +124,16 @@ RestCatalog::Config RestCatalog::loadConfig() Config result; auto defaults_object = object->get("defaults").extract(); - parseConfig(defaults_object, result); + parseCatalogConfigurationSettings(defaults_object, result); auto overrides_object = object->get("overrides").extract(); - parseConfig(overrides_object, result); + parseCatalogConfigurationSettings(overrides_object, result); - LOG_TEST(log, "Parsed config: {}", result.toString()); + LOG_TEST(log, "Parsed catalog configuration settings: {}", result.toString()); return result; } -void RestCatalog::parseConfig(const Poco::JSON::Object::Ptr & object, Config & result) +void RestCatalog::parseCatalogConfigurationSettings(const Poco::JSON::Object::Ptr & object, Config & result) { if (!object) return; @@ -135,34 +152,35 @@ DB::HTTPHeaderEntries RestCatalog::getHeaders(bool update_token) const return DB::HTTPHeaderEntries{auth_header.value()}; } - if (!access_token.has_value() || update_token) + if (!client_id.empty()) { - access_token = retrieveAccessToken(); + if (!access_token.has_value() || update_token) + { + access_token = retrieveAccessToken(); + } + + DB::HTTPHeaderEntries headers; + headers.emplace_back("Authorization", "Bearer " + access_token.value()); + return headers; } - DB::HTTPHeaderEntries headers; - headers.emplace_back("Authorization", "Bearer " + access_token.value()); - return headers; + return {}; } std::string RestCatalog::retrieveAccessToken() const { static constexpr auto oauth_tokens_endpoint = "oauth/tokens"; - const auto & context = getContext(); - const auto timeouts = DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings()); + /// TODO: + /// 1. support oauth2-server-uri + /// https://github.com/apache/iceberg/blob/918f81f3c3f498f46afcea17c1ac9cdc6913cb5c/open-api/rest-catalog-open-api.yaml#L183C82-L183C99 Poco::JSON::Object json; json.set("grant_type", "client_credentials"); - json.set("scope", "PRINCIPAL_ROLE:ALL"); + json.set("scope", "PRINCIPAL_ROLE:ALL"); /// TODO: add it into setting. json.set("client_id", client_id); json.set("client_secret", client_secret); - std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM - oss.exceptions(std::ios::failbit); - Poco::JSON::Stringifier::stringify(json, oss); - std::string json_str = oss.str(); - DB::HTTPHeaderEntries headers; headers.emplace_back("Content-Type", "application/x-www-form-urlencoded"); headers.emplace_back("Accepts", "application/json; charset=UTF-8"); @@ -176,23 +194,20 @@ std::string RestCatalog::retrieveAccessToken() const }; url.setQueryParameters(params); - LOG_TEST(log, "Writing {}: {}", url.toString(), json_str); - + const auto & context = getContext(); auto wb = DB::BuilderRWBufferFromHTTP(url) .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) .withMethod(Poco::Net::HTTPRequest::HTTP_POST) - .withSettings(getContext()->getReadSettings()) - .withTimeouts(timeouts) - .withHostFilter(&getContext()->getRemoteHostFilter()) + .withSettings(context->getReadSettings()) + .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings())) + .withHostFilter(&context->getRemoteHostFilter()) .withSkipNotFound(false) .withHeaders(headers) .create(credentials); - json_str.clear(); + std::string json_str; readJSONObjectPossiblyInvalid(json_str, *wb); - LOG_TEST(log, "Received token result: {}", json_str); - Poco::JSON::Parser parser; Poco::Dynamic::Var res_json = parser.parse(json_str); const Poco::JSON::Object::Ptr & object = res_json.extract(); @@ -212,7 +227,6 @@ DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer( const Poco::URI::QueryParameters & params) const { const auto & context = getContext(); - const auto timeouts = DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings()); Poco::URI url(base_url / endpoint); if (!params.empty()) @@ -227,7 +241,7 @@ DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer( return DB::BuilderRWBufferFromHTTP(url) .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) .withSettings(getContext()->getReadSettings()) - .withTimeouts(timeouts) + .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings())) .withHeaders(headers) .withHostFilter(&getContext()->getRemoteHostFilter()) .withDelayInit(false) @@ -240,7 +254,7 @@ DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer( return DB::BuilderRWBufferFromHTTP(url) .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) .withSettings(getContext()->getReadSettings()) - .withTimeouts(timeouts) + .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings())) .withHeaders(new_headers) .withHostFilter(&getContext()->getRemoteHostFilter()) .withDelayInit(false) @@ -277,9 +291,9 @@ RestCatalog::Tables RestCatalog::getTables() const { size_t num_threads = 10; ThreadPool pool( - CurrentMetrics::MergeTreeDataSelectExecutorThreads, - CurrentMetrics::MergeTreeDataSelectExecutorThreadsActive, - CurrentMetrics::MergeTreeDataSelectExecutorThreadsScheduled, + CurrentMetrics::IcebergCatalogThreads, + CurrentMetrics::IcebergCatalogThreadsActive, + CurrentMetrics::IcebergCatalogThreadsScheduled, num_threads); DB::ThreadPoolCallbackRunnerLocal runner(pool, "RestCatalog"); diff --git a/src/Databases/Iceberg/RestCatalog.h b/src/Databases/Iceberg/RestCatalog.h index 116b4253684..6c842e1dff7 100644 --- a/src/Databases/Iceberg/RestCatalog.h +++ b/src/Databases/Iceberg/RestCatalog.h @@ -46,25 +46,33 @@ public: std::optional getStorageType() const override; private: - LoggerPtr log; - struct Config { + /// Prefix is a path of the catalog enpoint, + /// e.g. /v1/{prefix}/namespaces/{namespace}/tables/{table} std::filesystem::path prefix; + /// Base location is location of data in storage + /// (in filesystem or object storage). std::string default_base_location; std::string toString() const; }; const std::filesystem::path base_url; - std::optional auth_header; - mutable std::optional access_token; - std::string client_id; - std::string client_secret; + const LoggerPtr log; + + /// Catalog configuration settings from /v1/config endpoint. Config config; - Poco::Net::HTTPBasicCredentials credentials{}; + /// Auth headers of format: "Authorization": " " + std::optional auth_header; + /// Parameters for OAuth. + std::string client_id; + std::string client_secret; + mutable std::optional access_token; + + Poco::Net::HTTPBasicCredentials credentials{}; DB::ReadWriteBufferFromHTTPPtr createReadBuffer( const std::string & endpoint, @@ -97,7 +105,7 @@ private: Config loadConfig(); std::string retrieveAccessToken() const; DB::HTTPHeaderEntries getHeaders(bool update_token = false) const; - static void parseConfig(const Poco::JSON::Object::Ptr & object, Config & result); + static void parseCatalogConfigurationSettings(const Poco::JSON::Object::Ptr & object, Config & result); }; }