From d3517140d9c471411fba8d7d98a6f7da7bd48172 Mon Sep 17 00:00:00 2001 From: Jianfei Hu Date: Mon, 13 Nov 2023 19:03:27 +0000 Subject: [PATCH 1/8] Revert "Revert "Add /keeper/availability-zone node to allow server load balancing within AZ."" This reverts commit e5aad4a6fd308e49c16d36a4bb04462cc2574bb8. --- src/Common/ErrorCodes.cpp | 1 + src/Coordination/KeeperConstants.h | 1 + src/Coordination/KeeperContext.cpp | 10 +- src/Coordination/KeeperContext.h | 3 +- src/Coordination/KeeperDispatcher.cpp | 12 +- src/Coordination/KeeperStorage.cpp | 3 +- src/IO/S3/Credentials.cpp | 155 ++++++++++++++---- src/IO/S3/Credentials.h | 23 ++- tests/integration/helpers/keeper_config1.xml | 1 + tests/integration/helpers/keeper_config2.xml | 1 + .../test_keeper_availability_zone/__init__.py | 0 .../configs/keeper_config.xml | 2 + .../test_keeper_availability_zone/test.py | 31 ++++ .../test_keeper_four_word_command/test.py | 8 +- 14 files changed, 212 insertions(+), 39 deletions(-) create mode 100644 tests/integration/test_keeper_availability_zone/__init__.py create mode 100644 tests/integration/test_keeper_availability_zone/configs/keeper_config.xml create mode 100644 tests/integration/test_keeper_availability_zone/test.py diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 28f8e6c6021..8e0d7fabf6b 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -586,6 +586,7 @@ M(704, CANNOT_USE_QUERY_CACHE_WITH_NONDETERMINISTIC_FUNCTIONS) \ M(705, TABLE_NOT_EMPTY) \ M(706, LIBSSH_ERROR) \ + M(707, GCP_ERROR) \ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ M(1001, STD_EXCEPTION) \ diff --git a/src/Coordination/KeeperConstants.h b/src/Coordination/KeeperConstants.h index 5a52fbf272b..08a7c85585a 100644 --- a/src/Coordination/KeeperConstants.h +++ b/src/Coordination/KeeperConstants.h @@ -17,5 +17,6 @@ const String keeper_system_path = "/keeper"; const String keeper_api_version_path = keeper_system_path + "/api_version"; const String keeper_api_feature_flags_path = keeper_system_path + "/feature_flags"; const String keeper_config_path = keeper_system_path + "/config"; +const String keeper_availability_zone_path = keeper_system_path + "/availability_zone"; } diff --git a/src/Coordination/KeeperContext.cpp b/src/Coordination/KeeperContext.cpp index 25bfb6c6384..1cee2a8e446 100644 --- a/src/Coordination/KeeperContext.cpp +++ b/src/Coordination/KeeperContext.cpp @@ -32,9 +32,17 @@ KeeperContext::KeeperContext(bool standalone_keeper_) system_nodes_with_data[keeper_api_version_path] = toString(static_cast(KeeperApiVersion::WITH_MULTI_READ)); } -void KeeperContext::initialize(const Poco::Util::AbstractConfiguration & config, KeeperDispatcher * dispatcher_) +void KeeperContext::initialize(const Poco::Util::AbstractConfiguration & config, KeeperDispatcher * dispatcher_, const std::string & environment_az) { dispatcher = dispatcher_; + + /// We only use the environment availability zone when configuration option is missing. + auto keeper_az = config.getString("keeper_server.availability_zone", environment_az); + if (!keeper_az.empty()) + system_nodes_with_data[keeper_availability_zone_path] = keeper_az; + LOG_INFO(&Poco::Logger::get("KeeperContext"), + "Initialize the KeeperContext with availability zone: '{}', environment availability zone '{}'. ", keeper_az, environment_az); + digest_enabled = config.getBool("keeper_server.digest_enabled", false); ignore_system_path_on_startup = config.getBool("keeper_server.ignore_system_path_on_startup", false); diff --git a/src/Coordination/KeeperContext.h b/src/Coordination/KeeperContext.h index 2485f3d6c55..71f00ce5a93 100644 --- a/src/Coordination/KeeperContext.h +++ b/src/Coordination/KeeperContext.h @@ -3,7 +3,6 @@ #include #include #include - #include #include @@ -24,7 +23,7 @@ public: SHUTDOWN }; - void initialize(const Poco::Util::AbstractConfiguration & config, KeeperDispatcher * dispatcher_); + void initialize(const Poco::Util::AbstractConfiguration & config, KeeperDispatcher * dispatcher_, const std::string & environment_az); Phase getServerState() const; void setServerState(Phase server_state_); diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index ca454c18084..6d43445474d 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -370,7 +371,16 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf configuration_and_settings = KeeperConfigurationAndSettings::loadFromConfig(config, standalone_keeper); keeper_context = std::make_shared(standalone_keeper); - keeper_context->initialize(config, this); + String availability_zone; + try + { + availability_zone = DB::S3::getRunningAvailabilityZone(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + keeper_context->initialize(config, this, availability_zone); requests_queue = std::make_unique(configuration_and_settings->coordination_settings->max_request_queue_size); request_thread = ThreadFromGlobalPool([this] { requestThread(); }); diff --git a/src/Coordination/KeeperStorage.cpp b/src/Coordination/KeeperStorage.cpp index a4bb65a2a72..0d1d07ec7c5 100644 --- a/src/Coordination/KeeperStorage.cpp +++ b/src/Coordination/KeeperStorage.cpp @@ -1081,7 +1081,8 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce Coordination::ZooKeeperGetRequest & request = dynamic_cast(*zk_request); if (request.path == Coordination::keeper_api_feature_flags_path - || request.path == Coordination::keeper_config_path) + || request.path == Coordination::keeper_config_path + || request.path == Coordination::keeper_availability_zone_path) return {}; if (!storage.uncommitted_state.getNode(request.path)) diff --git a/src/IO/S3/Credentials.cpp b/src/IO/S3/Credentials.cpp index 1eea167e1b9..eb87568c8ab 100644 --- a/src/IO/S3/Credentials.cpp +++ b/src/IO/S3/Credentials.cpp @@ -1,4 +1,9 @@ +#include +#include #include +#include +#include +#include "Common/Exception.h" #if USE_AWS_S3 @@ -11,6 +16,7 @@ # include # include +# include # include # include @@ -22,6 +28,16 @@ # include # include +#include +#include + + +#include +#include +#include +#include +#include + namespace DB { @@ -29,6 +45,8 @@ namespace DB namespace ErrorCodes { extern const int AWS_ERROR; + extern const int GCP_ERROR; + extern const int UNSUPPORTED_METHOD; } namespace S3 @@ -151,30 +169,6 @@ Aws::String AWSEC2MetadataClient::getDefaultCredentialsSecurely() const return GetResourceWithAWSWebServiceResult(credentials_request).GetPayload(); } -Aws::String AWSEC2MetadataClient::getCurrentAvailabilityZone() const -{ - String user_agent_string = awsComputeUserAgentString(); - auto [new_token, response_code] = getEC2MetadataToken(user_agent_string); - if (response_code != Aws::Http::HttpResponseCode::OK || new_token.empty()) - throw DB::Exception(ErrorCodes::AWS_ERROR, - "Failed to make token request. HTTP response code: {}", response_code); - - token = std::move(new_token); - const String url = endpoint + EC2_AVAILABILITY_ZONE_RESOURCE; - std::shared_ptr profile_request( - Aws::Http::CreateHttpRequest(url, Aws::Http::HttpMethod::HTTP_GET, Aws::Utils::Stream::DefaultResponseStreamFactoryMethod)); - - profile_request->SetHeaderValue(EC2_IMDS_TOKEN_HEADER, token); - profile_request->SetUserAgent(user_agent_string); - - const auto result = GetResourceWithAWSWebServiceResult(profile_request); - if (result.GetResponseCode() != Aws::Http::HttpResponseCode::OK) - throw DB::Exception(ErrorCodes::AWS_ERROR, - "Failed to get availability zone. HTTP response code: {}", result.GetResponseCode()); - - return Aws::Utils::StringUtils::Trim(result.GetPayload().c_str()); -} - std::pair AWSEC2MetadataClient::getEC2MetadataToken(const std::string & user_agent_string) const { std::lock_guard locker(token_mutex); @@ -199,10 +193,10 @@ Aws::String AWSEC2MetadataClient::getCurrentRegion() const return Aws::Region::AWS_GLOBAL; } -std::shared_ptr InitEC2MetadataClient(const Aws::Client::ClientConfiguration & client_configuration) +static Aws::String getAWSMetadataEndpoint() { - Aws::String ec2_metadata_service_endpoint = Aws::Environment::GetEnv("AWS_EC2_METADATA_SERVICE_ENDPOINT"); auto * logger = &Poco::Logger::get("AWSEC2InstanceProfileConfigLoader"); + Aws::String ec2_metadata_service_endpoint = Aws::Environment::GetEnv("AWS_EC2_METADATA_SERVICE_ENDPOINT"); if (ec2_metadata_service_endpoint.empty()) { Aws::String ec2_metadata_service_endpoint_mode = Aws::Environment::GetEnv("AWS_EC2_METADATA_SERVICE_ENDPOINT_MODE"); @@ -233,8 +227,95 @@ std::shared_ptr InitEC2MetadataClient(const Aws::Client::C } } } - LOG_INFO(logger, "Using IMDS endpoint: {}", ec2_metadata_service_endpoint); - return std::make_shared(client_configuration, ec2_metadata_service_endpoint.c_str()); + return ec2_metadata_service_endpoint; +} + +std::shared_ptr InitEC2MetadataClient(const Aws::Client::ClientConfiguration & client_configuration) +{ + auto endpoint = getAWSMetadataEndpoint(); + return std::make_shared(client_configuration, endpoint.c_str()); +} + +String AWSEC2MetadataClient::getAvailabilityZoneOrException() +{ + Poco::URI uri(getAWSMetadataEndpoint() + EC2_AVAILABILITY_ZONE_RESOURCE); + Poco::Net::HTTPClientSession session(uri.getHost(), uri.getPort()); + + Poco::Net::HTTPResponse response; + Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, uri.getPath()); + session.sendRequest(request); + + std::istream & rs = session.receiveResponse(response); + if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK) + throw DB::Exception(ErrorCodes::AWS_ERROR, "Failed to get AWS availability zone. HTTP response code: {}", response.getStatus()); + String response_data; + Poco::StreamCopier::copyToString(rs, response_data); + return response_data; +} + +String getGCPAvailabilityZoneOrException() +{ + Poco::URI uri(String(GCP_METADATA_SERVICE_ENDPOINT) + "/computeMetadata/v1/instance/zone"); + Poco::Net::HTTPClientSession session(uri.getHost(), uri.getPort()); + Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, uri.getPath()); + Poco::Net::HTTPResponse response; + request.set("Metadata-Flavor", "Google"); + session.sendRequest(request); + std::istream & rs = session.receiveResponse(response); + if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK) + throw DB::Exception(ErrorCodes::GCP_ERROR, "Failed to get GCP availability zone. HTTP response code: {}", response.getStatus()); + String response_data; + Poco::StreamCopier::copyToString(rs, response_data); + Strings zone_info; + boost::split(zone_info, response_data, boost::is_any_of("/")); + /// We expect GCP returns a string as "projects/123456789/zones/us-central1a". + if (zone_info.size() != 4) + throw DB::Exception(ErrorCodes::GCP_ERROR, "Invalid format of GCP zone information, expect projects//zones/, got {}", response_data); + return zone_info[3]; +} + +String getRunningAvailabilityZoneImpl() +{ + LOG_INFO(&Poco::Logger::get("Application"), "Trying to detect the availability zone."); + try + { + auto aws_az = AWSEC2MetadataClient::getAvailabilityZoneOrException(); + return aws_az; + } + catch (const DB::Exception & aws_ex) + { + try + { + auto gcp_zone = getGCPAvailabilityZoneOrException(); + return gcp_zone; + } + catch (const DB::Exception & gcp_ex) + { + throw DB::Exception(ErrorCodes::UNSUPPORTED_METHOD, + "Failed to find the availability zone, tried AWS and GCP. AWS Error: {}\nGCP Error: {}", aws_ex.displayText(), gcp_ex.displayText()); + } + } +} + +std::variant getRunningAvailabilityZoneImplOrException() +{ + try + { + return getRunningAvailabilityZoneImpl(); + } + catch (...) + { + return std::current_exception(); + } +} + +String getRunningAvailabilityZone() +{ + static auto az_or_exception = getRunningAvailabilityZoneImplOrException(); + if (const auto * az = std::get_if(&az_or_exception)) + return *az; + else + std::rethrow_exception(std::get(az_or_exception)); } AWSEC2InstanceProfileConfigLoader::AWSEC2InstanceProfileConfigLoader(const std::shared_ptr & client_, bool use_secure_pull_) @@ -703,7 +784,6 @@ S3CredentialsProviderChain::S3CredentialsProviderChain( aws_client_configuration.requestTimeoutMs = 1000; aws_client_configuration.retryStrategy = std::make_shared(1, 1000); - auto ec2_metadata_client = InitEC2MetadataClient(aws_client_configuration); auto config_loader = std::make_shared(ec2_metadata_client, !credentials_configuration.use_insecure_imds_request); @@ -721,4 +801,21 @@ S3CredentialsProviderChain::S3CredentialsProviderChain( } +#else + +namespace DB +{ + +namespace S3 +{ + +String getRunningAvailabilityZone() +{ + throw Poco::Exception("Does not support availability zone detection for non-cloud environment"); +} + +} + +} + #endif diff --git a/src/IO/S3/Credentials.h b/src/IO/S3/Credentials.h index 0243e8e4986..a978679348f 100644 --- a/src/IO/S3/Credentials.h +++ b/src/IO/S3/Credentials.h @@ -1,5 +1,8 @@ #pragma once +#include +#include +#include #include "config.h" #if USE_AWS_S3 @@ -18,6 +21,12 @@ namespace DB::S3 inline static constexpr uint64_t DEFAULT_EXPIRATION_WINDOW_SECONDS = 120; +/// In GCP metadata service can be accessed via DNS regardless of IPv4 or IPv6. +static constexpr char GCP_METADATA_SERVICE_ENDPOINT[] = "http://metadata.google.internal"; + +/// getRunningAvailabilityZone returns the availability zone of the underlying compute resources where the current process runs. +String getRunningAvailabilityZone(); + class AWSEC2MetadataClient : public Aws::Internal::AWSHttpResourceClient { static constexpr char EC2_SECURITY_CREDENTIALS_RESOURCE[] = "/latest/meta-data/iam/security-credentials"; @@ -50,10 +59,11 @@ public: virtual Aws::String getCurrentRegion() const; - virtual Aws::String getCurrentAvailabilityZone() const; + friend String getRunningAvailabilityZoneImpl(); private: std::pair getEC2MetadataToken(const std::string & user_agent_string) const; + static String getAvailabilityZoneOrException(); const Aws::String endpoint; mutable std::recursive_mutex token_mutex; @@ -177,4 +187,15 @@ public: } +#else + +namespace DB +{ + +namespace S3 +{ +String getRunningAvailabilityZone(); +} + +} #endif diff --git a/tests/integration/helpers/keeper_config1.xml b/tests/integration/helpers/keeper_config1.xml index 7702aecba9c..29232989084 100644 --- a/tests/integration/helpers/keeper_config1.xml +++ b/tests/integration/helpers/keeper_config1.xml @@ -11,6 +11,7 @@ 2181 + az-zoo1 1 diff --git a/tests/integration/helpers/keeper_config2.xml b/tests/integration/helpers/keeper_config2.xml index 2a1a1c1003c..2601efbb313 100644 --- a/tests/integration/helpers/keeper_config2.xml +++ b/tests/integration/helpers/keeper_config2.xml @@ -12,6 +12,7 @@ 2181 2 + az-zoo2 10000 diff --git a/tests/integration/test_keeper_availability_zone/__init__.py b/tests/integration/test_keeper_availability_zone/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_keeper_availability_zone/configs/keeper_config.xml b/tests/integration/test_keeper_availability_zone/configs/keeper_config.xml new file mode 100644 index 00000000000..3cbf717bb67 --- /dev/null +++ b/tests/integration/test_keeper_availability_zone/configs/keeper_config.xml @@ -0,0 +1,2 @@ + + diff --git a/tests/integration/test_keeper_availability_zone/test.py b/tests/integration/test_keeper_availability_zone/test.py new file mode 100644 index 00000000000..1836f0e679b --- /dev/null +++ b/tests/integration/test_keeper_availability_zone/test.py @@ -0,0 +1,31 @@ +import pytest +from helpers.cluster import ClickHouseCluster +from helpers.keeper_utils import KeeperClient + + +cluster = ClickHouseCluster(__file__) + +node = cluster.add_instance( + "node", + main_configs=["configs/keeper_config.xml"], + with_zookeeper=True, + stay_alive=True, +) + + +@pytest.fixture(scope="module", autouse=True) +def started_cluster(): + try: + cluster.start() + yield cluster + + finally: + cluster.shutdown() + + +def test_get_availability_zone(): + with KeeperClient.from_cluster(cluster, "zoo1") as client1: + assert client1.get("/keeper/availability_zone") == "az-zoo1" + + with KeeperClient.from_cluster(cluster, "zoo2") as client2: + assert client2.get("/keeper/availability_zone") == "az-zoo2" diff --git a/tests/integration/test_keeper_four_word_command/test.py b/tests/integration/test_keeper_four_word_command/test.py index 71501133ae7..25c4bc55327 100644 --- a/tests/integration/test_keeper_four_word_command/test.py +++ b/tests/integration/test_keeper_four_word_command/test.py @@ -183,8 +183,8 @@ def test_cmd_mntr(started_cluster): # contains: # 10 nodes created by test # 3 nodes created by clickhouse "/clickhouse/task_queue/ddl" - # 1 root node, 3 keeper system nodes - assert int(result["zk_znode_count"]) == 14 + # 1 root node, 4 keeper system nodes + assert int(result["zk_znode_count"]) == 15 assert int(result["zk_watch_count"]) == 2 assert int(result["zk_ephemerals_count"]) == 2 assert int(result["zk_approximate_data_size"]) > 0 @@ -333,7 +333,7 @@ def test_cmd_srvr(started_cluster): assert int(result["Connections"]) == 1 assert int(result["Zxid"], 16) > 10 assert result["Mode"] == "leader" - assert result["Node count"] == "14" + assert result["Node count"] == "15" finally: destroy_zk_client(zk) @@ -373,7 +373,7 @@ def test_cmd_stat(started_cluster): assert int(result["Connections"]) == 1 assert int(result["Zxid"], 16) >= 10 assert result["Mode"] == "leader" - assert result["Node count"] == "14" + assert result["Node count"] == "15" # filter connection statistics cons = [n for n in data.split("\n") if "=" in n] From 554d9071898f13082576145e6c7409c1964ef2e6 Mon Sep 17 00:00:00 2001 From: Jianfei Hu Date: Mon, 13 Nov 2023 23:42:51 +0000 Subject: [PATCH 2/8] Fix the keeper_server availability zone configuration. Signed-off-by: Jianfei Hu --- src/Coordination/KeeperContext.cpp | 29 ++++++++++++++----- src/Coordination/KeeperContext.h | 2 +- src/Coordination/KeeperDispatcher.cpp | 12 +------- src/IO/S3/Credentials.cpp | 5 ++++ tests/integration/helpers/keeper_config1.xml | 4 ++- tests/integration/helpers/keeper_config2.xml | 4 ++- .../test_keeper_availability_zone/test.py | 5 ++++ 7 files changed, 40 insertions(+), 21 deletions(-) diff --git a/src/Coordination/KeeperContext.cpp b/src/Coordination/KeeperContext.cpp index 1cee2a8e446..9745a53d1ab 100644 --- a/src/Coordination/KeeperContext.cpp +++ b/src/Coordination/KeeperContext.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -32,16 +33,30 @@ KeeperContext::KeeperContext(bool standalone_keeper_) system_nodes_with_data[keeper_api_version_path] = toString(static_cast(KeeperApiVersion::WITH_MULTI_READ)); } -void KeeperContext::initialize(const Poco::Util::AbstractConfiguration & config, KeeperDispatcher * dispatcher_, const std::string & environment_az) +void KeeperContext::initialize(const Poco::Util::AbstractConfiguration & config, KeeperDispatcher * dispatcher_) { dispatcher = dispatcher_; - /// We only use the environment availability zone when configuration option is missing. - auto keeper_az = config.getString("keeper_server.availability_zone", environment_az); - if (!keeper_az.empty()) - system_nodes_with_data[keeper_availability_zone_path] = keeper_az; - LOG_INFO(&Poco::Logger::get("KeeperContext"), - "Initialize the KeeperContext with availability zone: '{}', environment availability zone '{}'. ", keeper_az, environment_az); + if (config.hasProperty("keeper_server.availability_zone")) + { + auto keeper_az = config.getString("keeper_server.availability_zone.value"); + if (config.getBool("keeper_server.availability_zone.enable_auto_detection_on_cloud", false)) + { + try + { + keeper_az = DB::S3::getRunningAvailabilityZone(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } + if (!keeper_az.empty()) + { + system_nodes_with_data[keeper_availability_zone_path] = keeper_az; + LOG_INFO(&Poco::Logger::get("KeeperContext"), "Initialize the KeeperContext with availability zone: '{}'.'. ", keeper_az); + } + } digest_enabled = config.getBool("keeper_server.digest_enabled", false); ignore_system_path_on_startup = config.getBool("keeper_server.ignore_system_path_on_startup", false); diff --git a/src/Coordination/KeeperContext.h b/src/Coordination/KeeperContext.h index 71f00ce5a93..1af34b19ccf 100644 --- a/src/Coordination/KeeperContext.h +++ b/src/Coordination/KeeperContext.h @@ -23,7 +23,7 @@ public: SHUTDOWN }; - void initialize(const Poco::Util::AbstractConfiguration & config, KeeperDispatcher * dispatcher_, const std::string & environment_az); + void initialize(const Poco::Util::AbstractConfiguration & config, KeeperDispatcher * dispatcher_); Phase getServerState() const; void setServerState(Phase server_state_); diff --git a/src/Coordination/KeeperDispatcher.cpp b/src/Coordination/KeeperDispatcher.cpp index 6d43445474d..ca454c18084 100644 --- a/src/Coordination/KeeperDispatcher.cpp +++ b/src/Coordination/KeeperDispatcher.cpp @@ -11,7 +11,6 @@ #include #include #include -#include #include #include @@ -371,16 +370,7 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf configuration_and_settings = KeeperConfigurationAndSettings::loadFromConfig(config, standalone_keeper); keeper_context = std::make_shared(standalone_keeper); - String availability_zone; - try - { - availability_zone = DB::S3::getRunningAvailabilityZone(); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - } - keeper_context->initialize(config, this, availability_zone); + keeper_context->initialize(config, this); requests_queue = std::make_unique(configuration_and_settings->coordination_settings->max_request_queue_size); request_thread = ThreadFromGlobalPool([this] { requestThread(); }); diff --git a/src/IO/S3/Credentials.cpp b/src/IO/S3/Credentials.cpp index eb87568c8ab..4ba14572589 100644 --- a/src/IO/S3/Credentials.cpp +++ b/src/IO/S3/Credentials.cpp @@ -65,6 +65,7 @@ bool areCredentialsEmptyOrExpired(const Aws::Auth::AWSCredentials & credentials, } const char SSO_CREDENTIALS_PROVIDER_LOG_TAG[] = "SSOCredentialsProvider"; +const int AVAILABILITY_ZONE_REQUEST_TIMEOUT_SECONDS = 5; } @@ -244,6 +245,7 @@ String AWSEC2MetadataClient::getAvailabilityZoneOrException() Poco::Net::HTTPResponse response; Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, uri.getPath()); session.sendRequest(request); + session.setTimeout(Poco::Timespan(AVAILABILITY_ZONE_REQUEST_TIMEOUT_SECONDS, 0)); std::istream & rs = session.receiveResponse(response); if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK) @@ -257,9 +259,12 @@ String getGCPAvailabilityZoneOrException() { Poco::URI uri(String(GCP_METADATA_SERVICE_ENDPOINT) + "/computeMetadata/v1/instance/zone"); Poco::Net::HTTPClientSession session(uri.getHost(), uri.getPort()); + session.setTimeout(Poco::Timespan(AVAILABILITY_ZONE_REQUEST_TIMEOUT_SECONDS, 0)); + Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, uri.getPath()); Poco::Net::HTTPResponse response; request.set("Metadata-Flavor", "Google"); + session.sendRequest(request); std::istream & rs = session.receiveResponse(response); if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK) diff --git a/tests/integration/helpers/keeper_config1.xml b/tests/integration/helpers/keeper_config1.xml index 29232989084..12c6c0b78b6 100644 --- a/tests/integration/helpers/keeper_config1.xml +++ b/tests/integration/helpers/keeper_config1.xml @@ -11,7 +11,9 @@ 2181 - az-zoo1 + + az-zoo1 + 1 diff --git a/tests/integration/helpers/keeper_config2.xml b/tests/integration/helpers/keeper_config2.xml index 2601efbb313..0c58aaceb1c 100644 --- a/tests/integration/helpers/keeper_config2.xml +++ b/tests/integration/helpers/keeper_config2.xml @@ -12,7 +12,9 @@ 2181 2 - az-zoo2 + + az-zoo2 + 10000 diff --git a/tests/integration/test_keeper_availability_zone/test.py b/tests/integration/test_keeper_availability_zone/test.py index 1836f0e679b..b78e776f3c6 100644 --- a/tests/integration/test_keeper_availability_zone/test.py +++ b/tests/integration/test_keeper_availability_zone/test.py @@ -29,3 +29,8 @@ def test_get_availability_zone(): with KeeperClient.from_cluster(cluster, "zoo2") as client2: assert client2.get("/keeper/availability_zone") == "az-zoo2" + + # keeper3 is not configured with availability_zone value. + with KeeperClient.from_cluster(cluster, "zoo3") as client3: + with pytest.raises(Exception): + client3.get("/keeper/availability_zone") \ No newline at end of file From 9df2775f08d5ab377ba3aa2dd05010329b67ef20 Mon Sep 17 00:00:00 2001 From: Jianfei Hu Date: Tue, 14 Nov 2023 02:28:09 +0000 Subject: [PATCH 3/8] reduce timeout and setTimeout earlier. Signed-off-by: Jianfei Hu --- src/Coordination/KeeperContext.cpp | 7 ++++--- src/IO/S3/Credentials.cpp | 10 +++++----- tests/integration/helpers/keeper_config2.xml | 1 + .../integration/test_keeper_availability_zone/test.py | 6 ++++-- .../integration/test_keeper_four_word_command/test.py | 8 ++++---- 5 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/Coordination/KeeperContext.cpp b/src/Coordination/KeeperContext.cpp index 9745a53d1ab..c3cb166abee 100644 --- a/src/Coordination/KeeperContext.cpp +++ b/src/Coordination/KeeperContext.cpp @@ -39,8 +39,9 @@ void KeeperContext::initialize(const Poco::Util::AbstractConfiguration & config, if (config.hasProperty("keeper_server.availability_zone")) { - auto keeper_az = config.getString("keeper_server.availability_zone.value"); - if (config.getBool("keeper_server.availability_zone.enable_auto_detection_on_cloud", false)) + auto keeper_az = config.getString("keeper_server.availability_zone.value", ""); + const auto auto_detect_for_cloud = config.getBool("keeper_server.availability_zone.enable_auto_detection_on_cloud", false); + if (keeper_az.empty() && auto_detect_for_cloud) { try { @@ -54,7 +55,7 @@ void KeeperContext::initialize(const Poco::Util::AbstractConfiguration & config, if (!keeper_az.empty()) { system_nodes_with_data[keeper_availability_zone_path] = keeper_az; - LOG_INFO(&Poco::Logger::get("KeeperContext"), "Initialize the KeeperContext with availability zone: '{}'.'. ", keeper_az); + LOG_INFO(&Poco::Logger::get("KeeperContext"), "Initialize the KeeperContext with availability zone: '{}'", keeper_az); } } diff --git a/src/IO/S3/Credentials.cpp b/src/IO/S3/Credentials.cpp index 4ba14572589..7d6ed094486 100644 --- a/src/IO/S3/Credentials.cpp +++ b/src/IO/S3/Credentials.cpp @@ -65,7 +65,7 @@ bool areCredentialsEmptyOrExpired(const Aws::Auth::AWSCredentials & credentials, } const char SSO_CREDENTIALS_PROVIDER_LOG_TAG[] = "SSOCredentialsProvider"; -const int AVAILABILITY_ZONE_REQUEST_TIMEOUT_SECONDS = 5; +const int AVAILABILITY_ZONE_REQUEST_TIMEOUT_SECONDS = 3; } @@ -241,11 +241,11 @@ String AWSEC2MetadataClient::getAvailabilityZoneOrException() { Poco::URI uri(getAWSMetadataEndpoint() + EC2_AVAILABILITY_ZONE_RESOURCE); Poco::Net::HTTPClientSession session(uri.getHost(), uri.getPort()); + session.setTimeout(Poco::Timespan(AVAILABILITY_ZONE_REQUEST_TIMEOUT_SECONDS, 0)); Poco::Net::HTTPResponse response; Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, uri.getPath()); session.sendRequest(request); - session.setTimeout(Poco::Timespan(AVAILABILITY_ZONE_REQUEST_TIMEOUT_SECONDS, 0)); std::istream & rs = session.receiveResponse(response); if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK) @@ -287,17 +287,17 @@ String getRunningAvailabilityZoneImpl() auto aws_az = AWSEC2MetadataClient::getAvailabilityZoneOrException(); return aws_az; } - catch (const DB::Exception & aws_ex) + catch (const std::exception & aws_ex) { try { auto gcp_zone = getGCPAvailabilityZoneOrException(); return gcp_zone; } - catch (const DB::Exception & gcp_ex) + catch (const std::exception & gcp_ex) { throw DB::Exception(ErrorCodes::UNSUPPORTED_METHOD, - "Failed to find the availability zone, tried AWS and GCP. AWS Error: {}\nGCP Error: {}", aws_ex.displayText(), gcp_ex.displayText()); + "Failed to find the availability zone, tried AWS and GCP. AWS Error: {}\nGCP Error: {}", aws_ex.what(), gcp_ex.what()); } } } diff --git a/tests/integration/helpers/keeper_config2.xml b/tests/integration/helpers/keeper_config2.xml index 0c58aaceb1c..2afff2f5e59 100644 --- a/tests/integration/helpers/keeper_config2.xml +++ b/tests/integration/helpers/keeper_config2.xml @@ -14,6 +14,7 @@ 2 az-zoo2 + 1 diff --git a/tests/integration/test_keeper_availability_zone/test.py b/tests/integration/test_keeper_availability_zone/test.py index b78e776f3c6..a2003f8539e 100644 --- a/tests/integration/test_keeper_availability_zone/test.py +++ b/tests/integration/test_keeper_availability_zone/test.py @@ -27,10 +27,12 @@ def test_get_availability_zone(): with KeeperClient.from_cluster(cluster, "zoo1") as client1: assert client1.get("/keeper/availability_zone") == "az-zoo1" + # Keeper2 set enable_auto_detection_on_cloud to true, but is ignored and az-zoo2 is used. with KeeperClient.from_cluster(cluster, "zoo2") as client2: assert client2.get("/keeper/availability_zone") == "az-zoo2" - + assert "availability_zone" in client2.ls("/keeper") + # keeper3 is not configured with availability_zone value. with KeeperClient.from_cluster(cluster, "zoo3") as client3: with pytest.raises(Exception): - client3.get("/keeper/availability_zone") \ No newline at end of file + client3.get("/keeper/availability_zone") diff --git a/tests/integration/test_keeper_four_word_command/test.py b/tests/integration/test_keeper_four_word_command/test.py index 25c4bc55327..71501133ae7 100644 --- a/tests/integration/test_keeper_four_word_command/test.py +++ b/tests/integration/test_keeper_four_word_command/test.py @@ -183,8 +183,8 @@ def test_cmd_mntr(started_cluster): # contains: # 10 nodes created by test # 3 nodes created by clickhouse "/clickhouse/task_queue/ddl" - # 1 root node, 4 keeper system nodes - assert int(result["zk_znode_count"]) == 15 + # 1 root node, 3 keeper system nodes + assert int(result["zk_znode_count"]) == 14 assert int(result["zk_watch_count"]) == 2 assert int(result["zk_ephemerals_count"]) == 2 assert int(result["zk_approximate_data_size"]) > 0 @@ -333,7 +333,7 @@ def test_cmd_srvr(started_cluster): assert int(result["Connections"]) == 1 assert int(result["Zxid"], 16) > 10 assert result["Mode"] == "leader" - assert result["Node count"] == "15" + assert result["Node count"] == "14" finally: destroy_zk_client(zk) @@ -373,7 +373,7 @@ def test_cmd_stat(started_cluster): assert int(result["Connections"]) == 1 assert int(result["Zxid"], 16) >= 10 assert result["Mode"] == "leader" - assert result["Node count"] == "15" + assert result["Node count"] == "14" # filter connection statistics cons = [n for n in data.split("\n") if "=" in n] From d862dfdf9c753e17896f1c3a9d5cb01e71a5cee3 Mon Sep 17 00:00:00 2001 From: Jianfei Hu Date: Wed, 15 Nov 2023 18:38:23 +0000 Subject: [PATCH 4/8] fix comments Signed-off-by: Jianfei Hu --- src/IO/S3/Credentials.cpp | 53 +++++++++++---------------------------- src/IO/S3/Credentials.h | 11 ++++---- 2 files changed, 21 insertions(+), 43 deletions(-) diff --git a/src/IO/S3/Credentials.cpp b/src/IO/S3/Credentials.cpp index 7d6ed094486..bc336634114 100644 --- a/src/IO/S3/Credentials.cpp +++ b/src/IO/S3/Credentials.cpp @@ -1,9 +1,4 @@ -#include -#include #include -#include -#include -#include "Common/Exception.h" #if USE_AWS_S3 @@ -21,22 +16,24 @@ # include # include - +# include # include # include +# include +# include # include # include -#include -#include - - -#include -#include -#include -#include -#include +# include +# include +# include +# include +# include +# include +# include +# include +# include namespace DB @@ -65,7 +62,7 @@ bool areCredentialsEmptyOrExpired(const Aws::Auth::AWSCredentials & credentials, } const char SSO_CREDENTIALS_PROVIDER_LOG_TAG[] = "SSOCredentialsProvider"; -const int AVAILABILITY_ZONE_REQUEST_TIMEOUT_SECONDS = 3; +constexpr int AVAILABILITY_ZONE_REQUEST_TIMEOUT_SECONDS = 3; } @@ -275,11 +272,11 @@ String getGCPAvailabilityZoneOrException() boost::split(zone_info, response_data, boost::is_any_of("/")); /// We expect GCP returns a string as "projects/123456789/zones/us-central1a". if (zone_info.size() != 4) - throw DB::Exception(ErrorCodes::GCP_ERROR, "Invalid format of GCP zone information, expect projects//zones/, got {}", response_data); + throw DB::Exception(ErrorCodes::GCP_ERROR, "Invalid format of GCP zone information, expect projects//zones/"); return zone_info[3]; } -String getRunningAvailabilityZoneImpl() +String getRunningAvailabilityZone() { LOG_INFO(&Poco::Logger::get("Application"), "Trying to detect the availability zone."); try @@ -302,26 +299,6 @@ String getRunningAvailabilityZoneImpl() } } -std::variant getRunningAvailabilityZoneImplOrException() -{ - try - { - return getRunningAvailabilityZoneImpl(); - } - catch (...) - { - return std::current_exception(); - } -} - -String getRunningAvailabilityZone() -{ - static auto az_or_exception = getRunningAvailabilityZoneImplOrException(); - if (const auto * az = std::get_if(&az_or_exception)) - return *az; - else - std::rethrow_exception(std::get(az_or_exception)); -} AWSEC2InstanceProfileConfigLoader::AWSEC2InstanceProfileConfigLoader(const std::shared_ptr & client_, bool use_secure_pull_) : client(client_) diff --git a/src/IO/S3/Credentials.h b/src/IO/S3/Credentials.h index a978679348f..b1666e13757 100644 --- a/src/IO/S3/Credentials.h +++ b/src/IO/S3/Credentials.h @@ -1,12 +1,13 @@ #pragma once -#include -#include -#include #include "config.h" #if USE_AWS_S3 +# include +# include +# include + # include # include # include @@ -22,7 +23,7 @@ namespace DB::S3 inline static constexpr uint64_t DEFAULT_EXPIRATION_WINDOW_SECONDS = 120; /// In GCP metadata service can be accessed via DNS regardless of IPv4 or IPv6. -static constexpr char GCP_METADATA_SERVICE_ENDPOINT[] = "http://metadata.google.internal"; +static inline constexpr char GCP_METADATA_SERVICE_ENDPOINT[] = "http://metadata.google.internal"; /// getRunningAvailabilityZone returns the availability zone of the underlying compute resources where the current process runs. String getRunningAvailabilityZone(); @@ -59,7 +60,7 @@ public: virtual Aws::String getCurrentRegion() const; - friend String getRunningAvailabilityZoneImpl(); + friend String getRunningAvailabilityZone(); private: std::pair getEC2MetadataToken(const std::string & user_agent_string) const; From d0398e3c1d1f55281e65633a4947c807d3c0c022 Mon Sep 17 00:00:00 2001 From: Jianfei Hu Date: Wed, 15 Nov 2023 18:47:28 +0000 Subject: [PATCH 5/8] remove variant header Signed-off-by: Jianfei Hu --- src/IO/S3/Credentials.cpp | 1 - src/IO/S3/Credentials.h | 1 - 2 files changed, 2 deletions(-) diff --git a/src/IO/S3/Credentials.cpp b/src/IO/S3/Credentials.cpp index bc336634114..9ab21465593 100644 --- a/src/IO/S3/Credentials.cpp +++ b/src/IO/S3/Credentials.cpp @@ -21,7 +21,6 @@ # include # include -# include # include # include diff --git a/src/IO/S3/Credentials.h b/src/IO/S3/Credentials.h index b1666e13757..1f35443adf4 100644 --- a/src/IO/S3/Credentials.h +++ b/src/IO/S3/Credentials.h @@ -6,7 +6,6 @@ # include # include -# include # include # include From ea92dbb1c74c700b4df4172999d6ca504ff593bf Mon Sep 17 00:00:00 2001 From: Jianfei Hu Date: Wed, 15 Nov 2023 19:18:38 +0000 Subject: [PATCH 6/8] fix build for non USE_S3 case Signed-off-by: Jianfei Hu --- src/IO/S3/Credentials.cpp | 6 ++++-- src/IO/S3/Credentials.h | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/IO/S3/Credentials.cpp b/src/IO/S3/Credentials.cpp index 9ab21465593..e25f4551723 100644 --- a/src/IO/S3/Credentials.cpp +++ b/src/IO/S3/Credentials.cpp @@ -784,15 +784,17 @@ S3CredentialsProviderChain::S3CredentialsProviderChain( #else +# include + namespace DB { namespace S3 { -String getRunningAvailabilityZone() +std::string getRunningAvailabilityZone() { - throw Poco::Exception("Does not support availability zone detection for non-cloud environment"); + throw std::runtime_error("Does not support availability zone detection for non-cloud environment"); } } diff --git a/src/IO/S3/Credentials.h b/src/IO/S3/Credentials.h index 1f35443adf4..d8d103a847a 100644 --- a/src/IO/S3/Credentials.h +++ b/src/IO/S3/Credentials.h @@ -25,7 +25,7 @@ inline static constexpr uint64_t DEFAULT_EXPIRATION_WINDOW_SECONDS = 120; static inline constexpr char GCP_METADATA_SERVICE_ENDPOINT[] = "http://metadata.google.internal"; /// getRunningAvailabilityZone returns the availability zone of the underlying compute resources where the current process runs. -String getRunningAvailabilityZone(); +std::string getRunningAvailabilityZone(); class AWSEC2MetadataClient : public Aws::Internal::AWSHttpResourceClient { @@ -189,12 +189,14 @@ public: #else +# include + namespace DB { namespace S3 { -String getRunningAvailabilityZone(); +std::string getRunningAvailabilityZone(); } } From cffc6611e000faa19cac35dccfc093d77ae7e315 Mon Sep 17 00:00:00 2001 From: Jianfei Hu Date: Thu, 16 Nov 2023 07:05:41 +0000 Subject: [PATCH 7/8] Empty commit. From 69f214cdbcc85808d77893112f7c560285747a09 Mon Sep 17 00:00:00 2001 From: Jianfei Hu Date: Thu, 16 Nov 2023 08:04:57 +0000 Subject: [PATCH 8/8] fix comments. Signed-off-by: Jianfei Hu --- src/IO/S3/Credentials.cpp | 32 +++++++++++++++++++------------- src/IO/S3/Credentials.h | 1 - 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/src/IO/S3/Credentials.cpp b/src/IO/S3/Credentials.cpp index e25f4551723..73763853713 100644 --- a/src/IO/S3/Credentials.cpp +++ b/src/IO/S3/Credentials.cpp @@ -1,4 +1,15 @@ #include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int UNSUPPORTED_METHOD; +} + +} #if USE_AWS_S3 @@ -16,11 +27,9 @@ # include # include -# include # include # include -# include # include # include @@ -42,7 +51,6 @@ namespace ErrorCodes { extern const int AWS_ERROR; extern const int GCP_ERROR; - extern const int UNSUPPORTED_METHOD; } namespace S3 @@ -280,20 +288,20 @@ String getRunningAvailabilityZone() LOG_INFO(&Poco::Logger::get("Application"), "Trying to detect the availability zone."); try { - auto aws_az = AWSEC2MetadataClient::getAvailabilityZoneOrException(); - return aws_az; + return AWSEC2MetadataClient::getAvailabilityZoneOrException(); } - catch (const std::exception & aws_ex) + catch (...) { + auto aws_ex_msg = getExceptionMessage(std::current_exception(), false); try { - auto gcp_zone = getGCPAvailabilityZoneOrException(); - return gcp_zone; + return getGCPAvailabilityZoneOrException(); } - catch (const std::exception & gcp_ex) + catch (...) { + auto gcp_ex_msg = getExceptionMessage(std::current_exception(), false); throw DB::Exception(ErrorCodes::UNSUPPORTED_METHOD, - "Failed to find the availability zone, tried AWS and GCP. AWS Error: {}\nGCP Error: {}", aws_ex.what(), gcp_ex.what()); + "Failed to find the availability zone, tried AWS and GCP. AWS Error: {}\nGCP Error: {}", aws_ex_msg, gcp_ex_msg); } } } @@ -784,8 +792,6 @@ S3CredentialsProviderChain::S3CredentialsProviderChain( #else -# include - namespace DB { @@ -794,7 +800,7 @@ namespace S3 std::string getRunningAvailabilityZone() { - throw std::runtime_error("Does not support availability zone detection for non-cloud environment"); + throw DB::Exception(ErrorCodes::UNSUPPORTED_METHOD, "Does not support availability zone detection for non-cloud environment"); } } diff --git a/src/IO/S3/Credentials.h b/src/IO/S3/Credentials.h index d8d103a847a..ad73de23486 100644 --- a/src/IO/S3/Credentials.h +++ b/src/IO/S3/Credentials.h @@ -4,7 +4,6 @@ #if USE_AWS_S3 -# include # include # include