Add comments

This commit is contained in:
kssenii 2024-11-20 17:32:46 +01:00
parent 599d977d42
commit fd5023e2a6
4 changed files with 94 additions and 69 deletions

View File

@ -189,6 +189,9 @@
M(BuildVectorSimilarityIndexThreads, "Number of threads in the build vector similarity index thread pool.") \ M(BuildVectorSimilarityIndexThreads, "Number of threads in the build vector similarity index thread pool.") \
M(BuildVectorSimilarityIndexThreadsActive, "Number of threads in the build vector similarity index thread pool running a task.") \ M(BuildVectorSimilarityIndexThreadsActive, "Number of threads in the build vector similarity index thread pool running a task.") \
M(BuildVectorSimilarityIndexThreadsScheduled, "Number of queued or active jobs in the build vector similarity index thread pool.") \ M(BuildVectorSimilarityIndexThreadsScheduled, "Number of queued or active jobs in the build vector similarity index thread pool.") \
M(IcebergCatalogThreads, "Number of threads in the IcebergCatalog thread pool.") \
M(IcebergCatalogThreadsActive, "Number of threads in the IcebergCatalog thread pool running a task.") \
M(IcebergCatalogThreadsScheduled, "Number of queued or active jobs in the IcebergCatalog thread pool.") \
\ \
M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \ M(DiskPlainRewritableAzureDirectoryMapSize, "Number of local-to-remote path entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \
M(DiskPlainRewritableAzureFileCount, "Number of file entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \ M(DiskPlainRewritableAzureFileCount, "Number of file entries in the 'plain_rewritable' in-memory map for AzureObjectStorage.") \

View File

@ -25,9 +25,9 @@
namespace CurrentMetrics namespace CurrentMetrics
{ {
extern const Metric MergeTreeDataSelectExecutorThreads; extern const Metric IcebergCatalogThreads;
extern const Metric MergeTreeDataSelectExecutorThreadsActive; extern const Metric IcebergCatalogThreadsActive;
extern const Metric MergeTreeDataSelectExecutorThreadsScheduled; extern const Metric IcebergCatalogThreadsScheduled;
} }
@ -236,9 +236,9 @@ DatabaseTablesIteratorPtr DatabaseIceberg::getTablesIterator(
auto iceberg_tables = catalog->getTables(); auto iceberg_tables = catalog->getTables();
size_t num_threads = std::min<size_t>(10, iceberg_tables.size()); size_t num_threads = std::min<size_t>(10, iceberg_tables.size());
ThreadPool pool( ThreadPool pool(
CurrentMetrics::MergeTreeDataSelectExecutorThreads, CurrentMetrics::IcebergCatalogThreads,
CurrentMetrics::MergeTreeDataSelectExecutorThreadsActive, CurrentMetrics::IcebergCatalogThreadsActive,
CurrentMetrics::MergeTreeDataSelectExecutorThreadsScheduled, CurrentMetrics::IcebergCatalogThreadsScheduled,
num_threads); num_threads);
DB::ThreadPoolCallbackRunnerLocal<void> runner(pool, "RestCatalog"); DB::ThreadPoolCallbackRunnerLocal<void> runner(pool, "RestCatalog");

View File

@ -26,9 +26,9 @@ namespace DB::ErrorCodes
namespace CurrentMetrics namespace CurrentMetrics
{ {
extern const Metric MergeTreeDataSelectExecutorThreads; extern const Metric IcebergCatalogThreads;
extern const Metric MergeTreeDataSelectExecutorThreadsActive; extern const Metric IcebergCatalogThreadsActive;
extern const Metric MergeTreeDataSelectExecutorThreadsScheduled; extern const Metric IcebergCatalogThreadsScheduled;
} }
namespace Iceberg namespace Iceberg
@ -37,20 +37,14 @@ namespace Iceberg
static constexpr auto config_endpoint = "config"; static constexpr auto config_endpoint = "config";
static constexpr auto namespaces_endpoint = "namespaces"; static constexpr auto namespaces_endpoint = "namespaces";
std::string RestCatalog::Config::toString() const namespace
{ {
DB::WriteBufferFromOwnString wb;
if (!prefix.empty()) std::pair<std::string, std::string> parseCatalogCredential(const std::string & catalog_credential)
wb << "prefix: " << prefix.string() << ", ";
if (!default_base_location.empty())
wb << "default_base_location: " << default_base_location << ", ";
return wb.str();
}
static std::pair<std::string, std::string> parseCatalogCredential(const std::string & catalog_credential)
{ {
/// Parse a string of format "<client_id>:<client_secret>"
/// into separare strings client_id and client_secret.
std::string client_id, client_secret; std::string client_id, client_secret;
if (!catalog_credential.empty()) if (!catalog_credential.empty())
{ {
@ -67,6 +61,33 @@ static std::pair<std::string, std::string> parseCatalogCredential(const std::str
return std::pair(client_id, client_secret); return std::pair(client_id, client_secret);
} }
DB::HTTPHeaderEntry parseAuthHeader(const std::string & auth_header)
{
/// Parse a string of format "Authorization: <auth_scheme> <auth_token>"
/// into a key-value header "Authorization", "<auth_scheme> <auth_token>"
auto pos = auth_header.find(':');
if (pos == std::string::npos)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected format of auth header");
return DB::HTTPHeaderEntry(auth_header.substr(0, pos), auth_header.substr(pos + 1));
}
}
std::string RestCatalog::Config::toString() const
{
DB::WriteBufferFromOwnString wb;
if (!prefix.empty())
wb << "prefix: " << prefix.string() << ", ";
if (!default_base_location.empty())
wb << "default_base_location: " << default_base_location << ", ";
return wb.str();
}
RestCatalog::RestCatalog( RestCatalog::RestCatalog(
const std::string & warehouse_, const std::string & warehouse_,
const std::string & base_url_, const std::string & base_url_,
@ -75,18 +96,14 @@ RestCatalog::RestCatalog(
DB::ContextPtr context_) DB::ContextPtr context_)
: ICatalog(warehouse_) : ICatalog(warehouse_)
, DB::WithContext(context_) , DB::WithContext(context_)
, log(getLogger("RestCatalog(" + warehouse_ + ")"))
, base_url(base_url_) , base_url(base_url_)
, log(getLogger("RestCatalog(" + warehouse_ + ")"))
{ {
std::tie(client_id, client_secret) = parseCatalogCredential(catalog_credential_); if (!catalog_credential_.empty())
if (!auth_header_.empty()) std::tie(client_id, client_secret) = parseCatalogCredential(catalog_credential_);
{ else if (!auth_header_.empty())
auto pos = auth_header_.find(':'); auth_header = parseAuthHeader(auth_header_);
if (pos == std::string::npos)
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Unexpected format of auth header");
auth_header = DB::HTTPHeaderEntry(auth_header_.substr(0, pos), auth_header_.substr(pos + 1));
}
config = loadConfig(); config = loadConfig();
} }
@ -98,7 +115,7 @@ RestCatalog::Config RestCatalog::loadConfig()
std::string json_str; std::string json_str;
readJSONObjectPossiblyInvalid(json_str, *buf); readJSONObjectPossiblyInvalid(json_str, *buf);
LOG_TEST(log, "Received config result: {}", json_str); LOG_TEST(log, "Received catalog configuration settings: {}", json_str);
Poco::JSON::Parser parser; Poco::JSON::Parser parser;
Poco::Dynamic::Var json = parser.parse(json_str); Poco::Dynamic::Var json = parser.parse(json_str);
@ -107,16 +124,16 @@ RestCatalog::Config RestCatalog::loadConfig()
Config result; Config result;
auto defaults_object = object->get("defaults").extract<Poco::JSON::Object::Ptr>(); auto defaults_object = object->get("defaults").extract<Poco::JSON::Object::Ptr>();
parseConfig(defaults_object, result); parseCatalogConfigurationSettings(defaults_object, result);
auto overrides_object = object->get("overrides").extract<Poco::JSON::Object::Ptr>(); auto overrides_object = object->get("overrides").extract<Poco::JSON::Object::Ptr>();
parseConfig(overrides_object, result); parseCatalogConfigurationSettings(overrides_object, result);
LOG_TEST(log, "Parsed config: {}", result.toString()); LOG_TEST(log, "Parsed catalog configuration settings: {}", result.toString());
return result; return result;
} }
void RestCatalog::parseConfig(const Poco::JSON::Object::Ptr & object, Config & result) void RestCatalog::parseCatalogConfigurationSettings(const Poco::JSON::Object::Ptr & object, Config & result)
{ {
if (!object) if (!object)
return; return;
@ -135,34 +152,35 @@ DB::HTTPHeaderEntries RestCatalog::getHeaders(bool update_token) const
return DB::HTTPHeaderEntries{auth_header.value()}; return DB::HTTPHeaderEntries{auth_header.value()};
} }
if (!access_token.has_value() || update_token) if (!client_id.empty())
{ {
access_token = retrieveAccessToken(); if (!access_token.has_value() || update_token)
{
access_token = retrieveAccessToken();
}
DB::HTTPHeaderEntries headers;
headers.emplace_back("Authorization", "Bearer " + access_token.value());
return headers;
} }
DB::HTTPHeaderEntries headers; return {};
headers.emplace_back("Authorization", "Bearer " + access_token.value());
return headers;
} }
std::string RestCatalog::retrieveAccessToken() const std::string RestCatalog::retrieveAccessToken() const
{ {
static constexpr auto oauth_tokens_endpoint = "oauth/tokens"; static constexpr auto oauth_tokens_endpoint = "oauth/tokens";
const auto & context = getContext(); /// TODO:
const auto timeouts = DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings()); /// 1. support oauth2-server-uri
/// https://github.com/apache/iceberg/blob/918f81f3c3f498f46afcea17c1ac9cdc6913cb5c/open-api/rest-catalog-open-api.yaml#L183C82-L183C99
Poco::JSON::Object json; Poco::JSON::Object json;
json.set("grant_type", "client_credentials"); json.set("grant_type", "client_credentials");
json.set("scope", "PRINCIPAL_ROLE:ALL"); json.set("scope", "PRINCIPAL_ROLE:ALL"); /// TODO: add it into setting.
json.set("client_id", client_id); json.set("client_id", client_id);
json.set("client_secret", client_secret); json.set("client_secret", client_secret);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
std::string json_str = oss.str();
DB::HTTPHeaderEntries headers; DB::HTTPHeaderEntries headers;
headers.emplace_back("Content-Type", "application/x-www-form-urlencoded"); headers.emplace_back("Content-Type", "application/x-www-form-urlencoded");
headers.emplace_back("Accepts", "application/json; charset=UTF-8"); headers.emplace_back("Accepts", "application/json; charset=UTF-8");
@ -176,23 +194,20 @@ std::string RestCatalog::retrieveAccessToken() const
}; };
url.setQueryParameters(params); url.setQueryParameters(params);
LOG_TEST(log, "Writing {}: {}", url.toString(), json_str); const auto & context = getContext();
auto wb = DB::BuilderRWBufferFromHTTP(url) auto wb = DB::BuilderRWBufferFromHTTP(url)
.withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
.withMethod(Poco::Net::HTTPRequest::HTTP_POST) .withMethod(Poco::Net::HTTPRequest::HTTP_POST)
.withSettings(getContext()->getReadSettings()) .withSettings(context->getReadSettings())
.withTimeouts(timeouts) .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings()))
.withHostFilter(&getContext()->getRemoteHostFilter()) .withHostFilter(&context->getRemoteHostFilter())
.withSkipNotFound(false) .withSkipNotFound(false)
.withHeaders(headers) .withHeaders(headers)
.create(credentials); .create(credentials);
json_str.clear(); std::string json_str;
readJSONObjectPossiblyInvalid(json_str, *wb); readJSONObjectPossiblyInvalid(json_str, *wb);
LOG_TEST(log, "Received token result: {}", json_str);
Poco::JSON::Parser parser; Poco::JSON::Parser parser;
Poco::Dynamic::Var res_json = parser.parse(json_str); Poco::Dynamic::Var res_json = parser.parse(json_str);
const Poco::JSON::Object::Ptr & object = res_json.extract<Poco::JSON::Object::Ptr>(); const Poco::JSON::Object::Ptr & object = res_json.extract<Poco::JSON::Object::Ptr>();
@ -212,7 +227,6 @@ DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer(
const Poco::URI::QueryParameters & params) const const Poco::URI::QueryParameters & params) const
{ {
const auto & context = getContext(); const auto & context = getContext();
const auto timeouts = DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings());
Poco::URI url(base_url / endpoint); Poco::URI url(base_url / endpoint);
if (!params.empty()) if (!params.empty())
@ -227,7 +241,7 @@ DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer(
return DB::BuilderRWBufferFromHTTP(url) return DB::BuilderRWBufferFromHTTP(url)
.withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
.withSettings(getContext()->getReadSettings()) .withSettings(getContext()->getReadSettings())
.withTimeouts(timeouts) .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings()))
.withHeaders(headers) .withHeaders(headers)
.withHostFilter(&getContext()->getRemoteHostFilter()) .withHostFilter(&getContext()->getRemoteHostFilter())
.withDelayInit(false) .withDelayInit(false)
@ -240,7 +254,7 @@ DB::ReadWriteBufferFromHTTPPtr RestCatalog::createReadBuffer(
return DB::BuilderRWBufferFromHTTP(url) return DB::BuilderRWBufferFromHTTP(url)
.withConnectionGroup(DB::HTTPConnectionGroupType::HTTP) .withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
.withSettings(getContext()->getReadSettings()) .withSettings(getContext()->getReadSettings())
.withTimeouts(timeouts) .withTimeouts(DB::ConnectionTimeouts::getHTTPTimeouts(context->getSettingsRef(), context->getServerSettings()))
.withHeaders(new_headers) .withHeaders(new_headers)
.withHostFilter(&getContext()->getRemoteHostFilter()) .withHostFilter(&getContext()->getRemoteHostFilter())
.withDelayInit(false) .withDelayInit(false)
@ -277,9 +291,9 @@ RestCatalog::Tables RestCatalog::getTables() const
{ {
size_t num_threads = 10; size_t num_threads = 10;
ThreadPool pool( ThreadPool pool(
CurrentMetrics::MergeTreeDataSelectExecutorThreads, CurrentMetrics::IcebergCatalogThreads,
CurrentMetrics::MergeTreeDataSelectExecutorThreadsActive, CurrentMetrics::IcebergCatalogThreadsActive,
CurrentMetrics::MergeTreeDataSelectExecutorThreadsScheduled, CurrentMetrics::IcebergCatalogThreadsScheduled,
num_threads); num_threads);
DB::ThreadPoolCallbackRunnerLocal<void> runner(pool, "RestCatalog"); DB::ThreadPoolCallbackRunnerLocal<void> runner(pool, "RestCatalog");

View File

@ -46,25 +46,33 @@ public:
std::optional<StorageType> getStorageType() const override; std::optional<StorageType> getStorageType() const override;
private: private:
LoggerPtr log;
struct Config struct Config
{ {
/// Prefix is a path of the catalog enpoint,
/// e.g. /v1/{prefix}/namespaces/{namespace}/tables/{table}
std::filesystem::path prefix; std::filesystem::path prefix;
/// Base location is location of data in storage
/// (in filesystem or object storage).
std::string default_base_location; std::string default_base_location;
std::string toString() const; std::string toString() const;
}; };
const std::filesystem::path base_url; const std::filesystem::path base_url;
std::optional<DB::HTTPHeaderEntry> auth_header; const LoggerPtr log;
mutable std::optional<std::string> access_token;
std::string client_id; /// Catalog configuration settings from /v1/config endpoint.
std::string client_secret;
Config config; Config config;
Poco::Net::HTTPBasicCredentials credentials{}; /// Auth headers of format: "Authorization": "<auth_scheme> <token>"
std::optional<DB::HTTPHeaderEntry> auth_header;
/// Parameters for OAuth.
std::string client_id;
std::string client_secret;
mutable std::optional<std::string> access_token;
Poco::Net::HTTPBasicCredentials credentials{};
DB::ReadWriteBufferFromHTTPPtr createReadBuffer( DB::ReadWriteBufferFromHTTPPtr createReadBuffer(
const std::string & endpoint, const std::string & endpoint,
@ -97,7 +105,7 @@ private:
Config loadConfig(); Config loadConfig();
std::string retrieveAccessToken() const; std::string retrieveAccessToken() const;
DB::HTTPHeaderEntries getHeaders(bool update_token = false) const; DB::HTTPHeaderEntries getHeaders(bool update_token = false) const;
static void parseConfig(const Poco::JSON::Object::Ptr & object, Config & result); static void parseCatalogConfigurationSettings(const Poco::JSON::Object::Ptr & object, Config & result);
}; };
} }