Merge pull request #56715 from incfly/keeper-az-fix

Fix keeper availability zone configuration.
This commit is contained in:
Antonio Andelic 2023-11-18 19:24:24 +01:00 committed by GitHub
commit 9eaafff3d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 214 additions and 33 deletions

View File

@ -586,6 +586,7 @@
M(704, QUERY_CACHE_USED_WITH_NONDETERMINISTIC_FUNCTIONS) \
M(705, TABLE_NOT_EMPTY) \
M(706, LIBSSH_ERROR) \
M(707, GCP_ERROR) \
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \
M(1001, STD_EXCEPTION) \

View File

@ -17,5 +17,6 @@ const String keeper_system_path = "/keeper";
const String keeper_api_version_path = keeper_system_path + "/api_version";
const String keeper_api_feature_flags_path = keeper_system_path + "/feature_flags";
const String keeper_config_path = keeper_system_path + "/config";
const String keeper_availability_zone_path = keeper_system_path + "/availability_zone";
}

View File

@ -3,6 +3,7 @@
#include <Coordination/Defines.h>
#include <Disks/DiskLocal.h>
#include <Interpreters/Context.h>
#include <IO/S3/Credentials.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Coordination/KeeperConstants.h>
#include <Common/logger_useful.h>
@ -35,6 +36,29 @@ KeeperContext::KeeperContext(bool standalone_keeper_)
void KeeperContext::initialize(const Poco::Util::AbstractConfiguration & config, KeeperDispatcher * dispatcher_)
{
dispatcher = dispatcher_;
if (config.hasProperty("keeper_server.availability_zone"))
{
auto keeper_az = config.getString("keeper_server.availability_zone.value", "");
const auto auto_detect_for_cloud = config.getBool("keeper_server.availability_zone.enable_auto_detection_on_cloud", false);
if (keeper_az.empty() && auto_detect_for_cloud)
{
try
{
keeper_az = DB::S3::getRunningAvailabilityZone();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
if (!keeper_az.empty())
{
system_nodes_with_data[keeper_availability_zone_path] = keeper_az;
LOG_INFO(&Poco::Logger::get("KeeperContext"), "Initialize the KeeperContext with availability zone: '{}'", keeper_az);
}
}
digest_enabled = config.getBool("keeper_server.digest_enabled", false);
ignore_system_path_on_startup = config.getBool("keeper_server.ignore_system_path_on_startup", false);

View File

@ -3,7 +3,6 @@
#include <Disks/DiskSelector.h>
#include <IO/WriteBufferFromString.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <cstdint>
#include <memory>

View File

@ -1081,7 +1081,8 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
Coordination::ZooKeeperGetRequest & request = dynamic_cast<Coordination::ZooKeeperGetRequest &>(*zk_request);
if (request.path == Coordination::keeper_api_feature_flags_path
|| request.path == Coordination::keeper_config_path)
|| request.path == Coordination::keeper_config_path
|| request.path == Coordination::keeper_availability_zone_path)
return {};
if (!storage.uncommitted_state.getNode(request.path))

View File

@ -1,4 +1,15 @@
#include <IO/S3/Credentials.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNSUPPORTED_METHOD;
}
}
#if USE_AWS_S3
@ -11,17 +22,27 @@
# include <aws/core/utils/UUID.h>
# include <aws/core/http/HttpClientFactory.h>
# include <IO/S3/PocoHTTPClientFactory.h>
# include <aws/core/utils/HashingUtils.h>
# include <aws/core/platform/FileSystem.h>
# include <Common/logger_useful.h>
# include <IO/S3/PocoHTTPClient.h>
# include <IO/S3/Client.h>
# include <fstream>
# include <base/EnumReflection.h>
# include <boost/algorithm/string.hpp>
# include <boost/algorithm/string/split.hpp>
# include <boost/algorithm/string/classification.hpp>
# include <Poco/Exception.h>
# include <Poco/URI.h>
# include <Poco/Net/HTTPClientSession.h>
# include <Poco/Net/HTTPRequest.h>
# include <Poco/Net/HTTPResponse.h>
# include <Poco/StreamCopier.h>
namespace DB
{
@ -29,6 +50,7 @@ namespace DB
namespace ErrorCodes
{
extern const int AWS_ERROR;
extern const int GCP_ERROR;
}
namespace S3
@ -47,6 +69,7 @@ bool areCredentialsEmptyOrExpired(const Aws::Auth::AWSCredentials & credentials,
}
const char SSO_CREDENTIALS_PROVIDER_LOG_TAG[] = "SSOCredentialsProvider";
constexpr int AVAILABILITY_ZONE_REQUEST_TIMEOUT_SECONDS = 3;
}
@ -151,30 +174,6 @@ Aws::String AWSEC2MetadataClient::getDefaultCredentialsSecurely() const
return GetResourceWithAWSWebServiceResult(credentials_request).GetPayload();
}
/// Returns the availability zone reported by the EC2 metadata service, with surrounding whitespace trimmed.
/// A metadata token is requested first and attached to the availability-zone request.
/// Throws DB::Exception(AWS_ERROR) if the token request does not return 200 OK with a non-empty token,
/// or if the availability-zone request does not return 200 OK.
Aws::String AWSEC2MetadataClient::getCurrentAvailabilityZone() const
{
String user_agent_string = awsComputeUserAgentString();
/// Step 1: obtain a fresh metadata token.
auto [new_token, response_code] = getEC2MetadataToken(user_agent_string);
if (response_code != Aws::Http::HttpResponseCode::OK || new_token.empty())
throw DB::Exception(ErrorCodes::AWS_ERROR,
"Failed to make token request. HTTP response code: {}", response_code);
/// Remember the token in the member so the request below (and later calls) can use it.
token = std::move(new_token);
/// Step 2: GET <endpoint><availability-zone resource>, authenticated with the token header.
const String url = endpoint + EC2_AVAILABILITY_ZONE_RESOURCE;
std::shared_ptr<Aws::Http::HttpRequest> profile_request(
Aws::Http::CreateHttpRequest(url, Aws::Http::HttpMethod::HTTP_GET, Aws::Utils::Stream::DefaultResponseStreamFactoryMethod));
profile_request->SetHeaderValue(EC2_IMDS_TOKEN_HEADER, token);
profile_request->SetUserAgent(user_agent_string);
const auto result = GetResourceWithAWSWebServiceResult(profile_request);
if (result.GetResponseCode() != Aws::Http::HttpResponseCode::OK)
throw DB::Exception(ErrorCodes::AWS_ERROR,
"Failed to get availability zone. HTTP response code: {}", result.GetResponseCode());
/// The payload is returned after trimming leading/trailing whitespace.
return Aws::Utils::StringUtils::Trim(result.GetPayload().c_str());
}
std::pair<Aws::String, Aws::Http::HttpResponseCode> AWSEC2MetadataClient::getEC2MetadataToken(const std::string & user_agent_string) const
{
std::lock_guard locker(token_mutex);
@ -199,10 +198,10 @@ Aws::String AWSEC2MetadataClient::getCurrentRegion() const
return Aws::Region::AWS_GLOBAL;
}
std::shared_ptr<AWSEC2MetadataClient> InitEC2MetadataClient(const Aws::Client::ClientConfiguration & client_configuration)
static Aws::String getAWSMetadataEndpoint()
{
Aws::String ec2_metadata_service_endpoint = Aws::Environment::GetEnv("AWS_EC2_METADATA_SERVICE_ENDPOINT");
auto * logger = &Poco::Logger::get("AWSEC2InstanceProfileConfigLoader");
Aws::String ec2_metadata_service_endpoint = Aws::Environment::GetEnv("AWS_EC2_METADATA_SERVICE_ENDPOINT");
if (ec2_metadata_service_endpoint.empty())
{
Aws::String ec2_metadata_service_endpoint_mode = Aws::Environment::GetEnv("AWS_EC2_METADATA_SERVICE_ENDPOINT_MODE");
@ -233,10 +232,81 @@ std::shared_ptr<AWSEC2MetadataClient> InitEC2MetadataClient(const Aws::Client::C
}
}
}
LOG_INFO(logger, "Using IMDS endpoint: {}", ec2_metadata_service_endpoint);
return std::make_shared<AWSEC2MetadataClient>(client_configuration, ec2_metadata_service_endpoint.c_str());
return ec2_metadata_service_endpoint;
}
/// Creates an EC2 metadata client bound to the endpoint resolved by getAWSMetadataEndpoint().
std::shared_ptr<AWSEC2MetadataClient> InitEC2MetadataClient(const Aws::Client::ClientConfiguration & client_configuration)
{
    /// The temporary endpoint string stays alive until the end of the full expression,
    /// i.e. for the whole duration of the constructor call that receives its c_str().
    return std::make_shared<AWSEC2MetadataClient>(client_configuration, getAWSMetadataEndpoint().c_str());
}
/// Asks the EC2 metadata endpoint (resolved by getAWSMetadataEndpoint()) for the availability zone
/// with a single HTTP GET and returns the response body unchanged.
/// The session timeout is AVAILABILITY_ZONE_REQUEST_TIMEOUT_SECONDS.
/// Throws DB::Exception(AWS_ERROR) if the reply status is not 200 OK.
String AWSEC2MetadataClient::getAvailabilityZoneOrException()
{
    const Poco::URI metadata_uri(getAWSMetadataEndpoint() + EC2_AVAILABILITY_ZONE_RESOURCE);

    Poco::Net::HTTPClientSession http_session(metadata_uri.getHost(), metadata_uri.getPort());
    http_session.setTimeout(Poco::Timespan(AVAILABILITY_ZONE_REQUEST_TIMEOUT_SECONDS, 0));

    Poco::Net::HTTPRequest az_request(Poco::Net::HTTPRequest::HTTP_GET, metadata_uri.getPath());
    http_session.sendRequest(az_request);

    Poco::Net::HTTPResponse az_response;
    std::istream & body_stream = http_session.receiveResponse(az_response);

    const auto status = az_response.getStatus();
    if (status != Poco::Net::HTTPResponse::HTTP_OK)
        throw DB::Exception(ErrorCodes::AWS_ERROR, "Failed to get AWS availability zone. HTTP response code: {}", status);

    /// Read the whole body; it is returned as received.
    String availability_zone;
    Poco::StreamCopier::copyToString(body_stream, availability_zone);
    return availability_zone;
}
/// Asks the GCP metadata server for the zone of the current instance and returns only the zone name.
/// The request is a plain HTTP GET to GCP_METADATA_SERVICE_ENDPOINT + "/computeMetadata/v1/instance/zone"
/// carrying the "Metadata-Flavor: Google" header, with a timeout of AVAILABILITY_ZONE_REQUEST_TIMEOUT_SECONDS.
/// Throws DB::Exception(GCP_ERROR) if the reply status is not 200 OK, or if the payload is not of the form
/// "projects/<project-number>/zones/<zone-value>" with a non-empty zone.
String getGCPAvailabilityZoneOrException()
{
    Poco::URI uri(String(GCP_METADATA_SERVICE_ENDPOINT) + "/computeMetadata/v1/instance/zone");
    Poco::Net::HTTPClientSession session(uri.getHost(), uri.getPort());
    session.setTimeout(Poco::Timespan(AVAILABILITY_ZONE_REQUEST_TIMEOUT_SECONDS, 0));

    Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, uri.getPath());
    Poco::Net::HTTPResponse response;
    request.set("Metadata-Flavor", "Google");
    session.sendRequest(request);

    std::istream & rs = session.receiveResponse(response);
    if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK)
        throw DB::Exception(ErrorCodes::GCP_ERROR, "Failed to get GCP availability zone. HTTP response code: {}", response.getStatus());

    String response_data;
    Poco::StreamCopier::copyToString(rs, response_data);
    /// Drop surrounding whitespace (e.g. a trailing newline) so it cannot end up inside the zone name.
    boost::trim(response_data);

    Strings zone_info;
    boost::split(zone_info, response_data, boost::is_any_of("/"));
    /// We expect GCP to return a string like "projects/123456789/zones/us-central1-a".
    /// An empty last segment ("projects/1/zones/") is rejected too: it still splits into 4 parts.
    if (zone_info.size() != 4 || zone_info[3].empty())
        throw DB::Exception(
            ErrorCodes::GCP_ERROR,
            "Invalid format of GCP zone information, expect projects/<project-number>/zones/<zone-value>, got '{}'",
            response_data);
    return zone_info[3];
}
/// Detects the availability zone of the machine this process runs on.
/// AWS is tried first; if that attempt throws, GCP is tried next.
/// When both attempts fail, throws DB::Exception(UNSUPPORTED_METHOD) carrying both error messages.
String getRunningAvailabilityZone()
{
    LOG_INFO(&Poco::Logger::get("Application"), "Trying to detect the availability zone.");

    /// First attempt: AWS. On failure keep only the message (without stack trace) for the final report.
    String aws_ex_msg;
    try
    {
        return AWSEC2MetadataClient::getAvailabilityZoneOrException();
    }
    catch (...)
    {
        aws_ex_msg = getExceptionMessage(std::current_exception(), false);
    }

    /// Second attempt: GCP.
    String gcp_ex_msg;
    try
    {
        return getGCPAvailabilityZoneOrException();
    }
    catch (...)
    {
        gcp_ex_msg = getExceptionMessage(std::current_exception(), false);
    }

    /// Neither provider answered: report both failures together.
    throw DB::Exception(ErrorCodes::UNSUPPORTED_METHOD,
        "Failed to find the availability zone, tried AWS and GCP. AWS Error: {}\nGCP Error: {}", aws_ex_msg, gcp_ex_msg);
}
AWSEC2InstanceProfileConfigLoader::AWSEC2InstanceProfileConfigLoader(const std::shared_ptr<AWSEC2MetadataClient> & client_, bool use_secure_pull_)
: client(client_)
, use_secure_pull(use_secure_pull_)
@ -703,7 +773,6 @@ S3CredentialsProviderChain::S3CredentialsProviderChain(
aws_client_configuration.requestTimeoutMs = 1000;
aws_client_configuration.retryStrategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(1, 1000);
auto ec2_metadata_client = InitEC2MetadataClient(aws_client_configuration);
auto config_loader = std::make_shared<AWSEC2InstanceProfileConfigLoader>(ec2_metadata_client, !credentials_configuration.use_insecure_imds_request);
@ -721,4 +790,21 @@ S3CredentialsProviderChain::S3CredentialsProviderChain(
}
#else
namespace DB
{
namespace S3
{
/// Stub used on the #else side of the build-time switch in this file: it performs no detection
/// and unconditionally throws DB::Exception(UNSUPPORTED_METHOD).
std::string getRunningAvailabilityZone()
{
throw DB::Exception(ErrorCodes::UNSUPPORTED_METHOD, "Does not support availability zone detection for non-cloud environment");
}
}
}
#endif

View File

@ -4,6 +4,8 @@
#if USE_AWS_S3
# include <base/types.h>
# include <aws/core/client/ClientConfiguration.h>
# include <aws/core/internal/AWSHttpResourceClient.h>
# include <aws/core/config/AWSProfileConfigLoader.h>
@ -18,6 +20,12 @@ namespace DB::S3
inline static constexpr uint64_t DEFAULT_EXPIRATION_WINDOW_SECONDS = 120;
/// On GCP, the metadata service can be reached by its DNS name regardless of whether IPv4 or IPv6 is used.
static inline constexpr char GCP_METADATA_SERVICE_ENDPOINT[] = "http://metadata.google.internal";
/// getRunningAvailabilityZone returns the availability zone of the underlying compute resources where the current process runs.
std::string getRunningAvailabilityZone();
class AWSEC2MetadataClient : public Aws::Internal::AWSHttpResourceClient
{
static constexpr char EC2_SECURITY_CREDENTIALS_RESOURCE[] = "/latest/meta-data/iam/security-credentials";
@ -50,10 +58,11 @@ public:
virtual Aws::String getCurrentRegion() const;
virtual Aws::String getCurrentAvailabilityZone() const;
friend String getRunningAvailabilityZone();
private:
std::pair<Aws::String, Aws::Http::HttpResponseCode> getEC2MetadataToken(const std::string & user_agent_string) const;
static String getAvailabilityZoneOrException();
const Aws::String endpoint;
mutable std::recursive_mutex token_mutex;
@ -177,4 +186,17 @@ public:
}
#else
# include <string>
namespace DB
{
namespace S3
{
std::string getRunningAvailabilityZone();
}
}
#endif

View File

@ -11,6 +11,9 @@
<keeper_server>
<tcp_port>2181</tcp_port>
<availability_zone>
<value>az-zoo1</value>
</availability_zone>
<server_id>1</server_id>
<coordination_settings>

View File

@ -12,6 +12,10 @@
<keeper_server>
<tcp_port>2181</tcp_port>
<server_id>2</server_id>
<availability_zone>
<value>az-zoo2</value>
<enable_auto_detection_on_cloud>1</enable_auto_detection_on_cloud>
</availability_zone>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>

View File

@ -0,0 +1,2 @@
<clickhouse>
</clickhouse>

View File

@ -0,0 +1,38 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.keeper_utils import KeeperClient
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/keeper_config.xml"],
with_zookeeper=True,
stay_alive=True,
)
# Module-scoped, auto-used fixture: starts the cluster once before the tests in this module,
# yields it, and always shuts it down afterwards (the shutdown sits in `finally`).
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
# Checks what each keeper node exposes under /keeper/availability_zone.
def test_get_availability_zone():
# zoo1 is expected to report the explicitly configured value "az-zoo1".
with KeeperClient.from_cluster(cluster, "zoo1") as client1:
assert client1.get("/keeper/availability_zone") == "az-zoo1"
# zoo2 also sets enable_auto_detection_on_cloud, but auto-detection is not used
# because an explicit <value>az-zoo2</value> is configured; that value must be reported.
with KeeperClient.from_cluster(cluster, "zoo2") as client2:
assert client2.get("/keeper/availability_zone") == "az-zoo2"
assert "availability_zone" in client2.ls("/keeper")
# zoo3 has no availability_zone configured, so reading the node is expected to raise.
with KeeperClient.from_cluster(cluster, "zoo3") as client3:
with pytest.raises(Exception):
client3.get("/keeper/availability_zone")