Revert "Add /keeper/availability-zone node to allow server load balancing within AZ."

This commit is contained in:
Alexey Milovidov 2023-11-11 05:43:58 +03:00 committed by GitHub
parent c095b1e9ef
commit e5aad4a6fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 39 additions and 212 deletions

View File

@ -586,7 +586,6 @@
M(704, CANNOT_USE_QUERY_CACHE_WITH_NONDETERMINISTIC_FUNCTIONS) \
M(705, TABLE_NOT_EMPTY) \
M(706, LIBSSH_ERROR) \
M(707, GCP_ERROR) \
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \
M(1001, STD_EXCEPTION) \

View File

@ -17,6 +17,5 @@ const String keeper_system_path = "/keeper";
const String keeper_api_version_path = keeper_system_path + "/api_version";
const String keeper_api_feature_flags_path = keeper_system_path + "/feature_flags";
const String keeper_config_path = keeper_system_path + "/config";
const String keeper_availability_zone_path = keeper_system_path + "/availability_zone";
}

View File

@ -32,17 +32,9 @@ KeeperContext::KeeperContext(bool standalone_keeper_)
system_nodes_with_data[keeper_api_version_path] = toString(static_cast<uint8_t>(KeeperApiVersion::WITH_MULTI_READ));
}
void KeeperContext::initialize(const Poco::Util::AbstractConfiguration & config, KeeperDispatcher * dispatcher_, const std::string & environment_az)
void KeeperContext::initialize(const Poco::Util::AbstractConfiguration & config, KeeperDispatcher * dispatcher_)
{
dispatcher = dispatcher_;
/// We only use the environment availability zone when configuration option is missing.
auto keeper_az = config.getString("keeper_server.availability_zone", environment_az);
if (!keeper_az.empty())
system_nodes_with_data[keeper_availability_zone_path] = keeper_az;
LOG_INFO(&Poco::Logger::get("KeeperContext"),
"Initialize the KeeperContext with availability zone: '{}', environment availability zone '{}'. ", keeper_az, environment_az);
digest_enabled = config.getBool("keeper_server.digest_enabled", false);
ignore_system_path_on_startup = config.getBool("keeper_server.ignore_system_path_on_startup", false);

View File

@ -3,6 +3,7 @@
#include <Disks/DiskSelector.h>
#include <IO/WriteBufferFromString.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <cstdint>
#include <memory>
@ -23,7 +24,7 @@ public:
SHUTDOWN
};
void initialize(const Poco::Util::AbstractConfiguration & config, KeeperDispatcher * dispatcher_, const std::string & environment_az);
void initialize(const Poco::Util::AbstractConfiguration & config, KeeperDispatcher * dispatcher_);
Phase getServerState() const;
void setServerState(Phase server_state_);

View File

@ -11,7 +11,6 @@
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>
#include <IO/S3/Credentials.h>
#include <atomic>
#include <future>
@ -371,16 +370,7 @@ void KeeperDispatcher::initialize(const Poco::Util::AbstractConfiguration & conf
configuration_and_settings = KeeperConfigurationAndSettings::loadFromConfig(config, standalone_keeper);
keeper_context = std::make_shared<KeeperContext>(standalone_keeper);
String availability_zone;
try
{
availability_zone = DB::S3::getRunningAvailabilityZone();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
keeper_context->initialize(config, this, availability_zone);
keeper_context->initialize(config, this);
requests_queue = std::make_unique<RequestsQueue>(configuration_and_settings->coordination_settings->max_request_queue_size);
request_thread = ThreadFromGlobalPool([this] { requestThread(); });

View File

@ -1081,8 +1081,7 @@ struct KeeperStorageGetRequestProcessor final : public KeeperStorageRequestProce
Coordination::ZooKeeperGetRequest & request = dynamic_cast<Coordination::ZooKeeperGetRequest &>(*zk_request);
if (request.path == Coordination::keeper_api_feature_flags_path
|| request.path == Coordination::keeper_config_path
|| request.path == Coordination::keeper_availability_zone_path)
|| request.path == Coordination::keeper_config_path)
return {};
if (!storage.uncommitted_state.getNode(request.path))

View File

@ -1,9 +1,4 @@
#include <exception>
#include <variant>
#include <IO/S3/Credentials.h>
#include <boost/algorithm/string/classification.hpp>
#include <Poco/Exception.h>
#include "Common/Exception.h"
#if USE_AWS_S3
@ -16,7 +11,6 @@
# include <aws/core/utils/UUID.h>
# include <aws/core/http/HttpClientFactory.h>
# include <IO/S3/PocoHTTPClientFactory.h>
# include <aws/core/utils/HashingUtils.h>
# include <aws/core/platform/FileSystem.h>
@ -28,16 +22,6 @@
# include <fstream>
# include <base/EnumReflection.h>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/split.hpp>
#include <Poco/URI.h>
#include <Poco/Net/HTTPClientSession.h>
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/StreamCopier.h>
namespace DB
{
@ -45,8 +29,6 @@ namespace DB
namespace ErrorCodes
{
extern const int AWS_ERROR;
extern const int GCP_ERROR;
extern const int UNSUPPORTED_METHOD;
}
namespace S3
@ -169,6 +151,30 @@ Aws::String AWSEC2MetadataClient::getDefaultCredentialsSecurely() const
return GetResourceWithAWSWebServiceResult(credentials_request).GetPayload();
}
Aws::String AWSEC2MetadataClient::getCurrentAvailabilityZone() const
{
String user_agent_string = awsComputeUserAgentString();
auto [new_token, response_code] = getEC2MetadataToken(user_agent_string);
if (response_code != Aws::Http::HttpResponseCode::OK || new_token.empty())
throw DB::Exception(ErrorCodes::AWS_ERROR,
"Failed to make token request. HTTP response code: {}", response_code);
token = std::move(new_token);
const String url = endpoint + EC2_AVAILABILITY_ZONE_RESOURCE;
std::shared_ptr<Aws::Http::HttpRequest> profile_request(
Aws::Http::CreateHttpRequest(url, Aws::Http::HttpMethod::HTTP_GET, Aws::Utils::Stream::DefaultResponseStreamFactoryMethod));
profile_request->SetHeaderValue(EC2_IMDS_TOKEN_HEADER, token);
profile_request->SetUserAgent(user_agent_string);
const auto result = GetResourceWithAWSWebServiceResult(profile_request);
if (result.GetResponseCode() != Aws::Http::HttpResponseCode::OK)
throw DB::Exception(ErrorCodes::AWS_ERROR,
"Failed to get availability zone. HTTP response code: {}", result.GetResponseCode());
return Aws::Utils::StringUtils::Trim(result.GetPayload().c_str());
}
std::pair<Aws::String, Aws::Http::HttpResponseCode> AWSEC2MetadataClient::getEC2MetadataToken(const std::string & user_agent_string) const
{
std::lock_guard locker(token_mutex);
@ -193,10 +199,10 @@ Aws::String AWSEC2MetadataClient::getCurrentRegion() const
return Aws::Region::AWS_GLOBAL;
}
static Aws::String getAWSMetadataEndpoint()
std::shared_ptr<AWSEC2MetadataClient> InitEC2MetadataClient(const Aws::Client::ClientConfiguration & client_configuration)
{
auto * logger = &Poco::Logger::get("AWSEC2InstanceProfileConfigLoader");
Aws::String ec2_metadata_service_endpoint = Aws::Environment::GetEnv("AWS_EC2_METADATA_SERVICE_ENDPOINT");
auto * logger = &Poco::Logger::get("AWSEC2InstanceProfileConfigLoader");
if (ec2_metadata_service_endpoint.empty())
{
Aws::String ec2_metadata_service_endpoint_mode = Aws::Environment::GetEnv("AWS_EC2_METADATA_SERVICE_ENDPOINT_MODE");
@ -227,95 +233,8 @@ static Aws::String getAWSMetadataEndpoint()
}
}
}
return ec2_metadata_service_endpoint;
}
std::shared_ptr<AWSEC2MetadataClient> InitEC2MetadataClient(const Aws::Client::ClientConfiguration & client_configuration)
{
auto endpoint = getAWSMetadataEndpoint();
return std::make_shared<AWSEC2MetadataClient>(client_configuration, endpoint.c_str());
}
String AWSEC2MetadataClient::getAvailabilityZoneOrException()
{
Poco::URI uri(getAWSMetadataEndpoint() + EC2_AVAILABILITY_ZONE_RESOURCE);
Poco::Net::HTTPClientSession session(uri.getHost(), uri.getPort());
Poco::Net::HTTPResponse response;
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, uri.getPath());
session.sendRequest(request);
std::istream & rs = session.receiveResponse(response);
if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK)
throw DB::Exception(ErrorCodes::AWS_ERROR, "Failed to get AWS availability zone. HTTP response code: {}", response.getStatus());
String response_data;
Poco::StreamCopier::copyToString(rs, response_data);
return response_data;
}
String getGCPAvailabilityZoneOrException()
{
Poco::URI uri(String(GCP_METADATA_SERVICE_ENDPOINT) + "/computeMetadata/v1/instance/zone");
Poco::Net::HTTPClientSession session(uri.getHost(), uri.getPort());
Poco::Net::HTTPRequest request(Poco::Net::HTTPRequest::HTTP_GET, uri.getPath());
Poco::Net::HTTPResponse response;
request.set("Metadata-Flavor", "Google");
session.sendRequest(request);
std::istream & rs = session.receiveResponse(response);
if (response.getStatus() != Poco::Net::HTTPResponse::HTTP_OK)
throw DB::Exception(ErrorCodes::GCP_ERROR, "Failed to get GCP availability zone. HTTP response code: {}", response.getStatus());
String response_data;
Poco::StreamCopier::copyToString(rs, response_data);
Strings zone_info;
boost::split(zone_info, response_data, boost::is_any_of("/"));
/// We expect GCP returns a string as "projects/123456789/zones/us-central1a".
if (zone_info.size() != 4)
throw DB::Exception(ErrorCodes::GCP_ERROR, "Invalid format of GCP zone information, expect projects/<project-number>/zones/<zone-value>, got {}", response_data);
return zone_info[3];
}
String getRunningAvailabilityZoneImpl()
{
LOG_INFO(&Poco::Logger::get("Application"), "Trying to detect the availability zone.");
try
{
auto aws_az = AWSEC2MetadataClient::getAvailabilityZoneOrException();
return aws_az;
}
catch (const DB::Exception & aws_ex)
{
try
{
auto gcp_zone = getGCPAvailabilityZoneOrException();
return gcp_zone;
}
catch (const DB::Exception & gcp_ex)
{
throw DB::Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Failed to find the availability zone, tried AWS and GCP. AWS Error: {}\nGCP Error: {}", aws_ex.displayText(), gcp_ex.displayText());
}
}
}
std::variant<String, std::exception_ptr> getRunningAvailabilityZoneImplOrException()
{
try
{
return getRunningAvailabilityZoneImpl();
}
catch (...)
{
return std::current_exception();
}
}
String getRunningAvailabilityZone()
{
static auto az_or_exception = getRunningAvailabilityZoneImplOrException();
if (const auto * az = std::get_if<String>(&az_or_exception))
return *az;
else
std::rethrow_exception(std::get<std::exception_ptr>(az_or_exception));
LOG_INFO(logger, "Using IMDS endpoint: {}", ec2_metadata_service_endpoint);
return std::make_shared<AWSEC2MetadataClient>(client_configuration, ec2_metadata_service_endpoint.c_str());
}
AWSEC2InstanceProfileConfigLoader::AWSEC2InstanceProfileConfigLoader(const std::shared_ptr<AWSEC2MetadataClient> & client_, bool use_secure_pull_)
@ -784,6 +703,7 @@ S3CredentialsProviderChain::S3CredentialsProviderChain(
aws_client_configuration.requestTimeoutMs = 1000;
aws_client_configuration.retryStrategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(1, 1000);
auto ec2_metadata_client = InitEC2MetadataClient(aws_client_configuration);
auto config_loader = std::make_shared<AWSEC2InstanceProfileConfigLoader>(ec2_metadata_client, !credentials_configuration.use_insecure_imds_request);
@ -801,21 +721,4 @@ S3CredentialsProviderChain::S3CredentialsProviderChain(
}
#else
namespace DB
{
namespace S3
{
String getRunningAvailabilityZone()
{
throw Poco::Exception("Does not support availability zone detection for non-cloud environment");
}
}
}
#endif

View File

@ -1,8 +1,5 @@
#pragma once
#include <exception>
#include <base/types.h>
#include <variant>
#include "config.h"
#if USE_AWS_S3
@ -21,12 +18,6 @@ namespace DB::S3
inline static constexpr uint64_t DEFAULT_EXPIRATION_WINDOW_SECONDS = 120;
/// In GCP metadata service can be accessed via DNS regardless of IPv4 or IPv6.
static constexpr char GCP_METADATA_SERVICE_ENDPOINT[] = "http://metadata.google.internal";
/// getRunningAvailabilityZone returns the availability zone of the underlying compute resources where the current process runs.
String getRunningAvailabilityZone();
class AWSEC2MetadataClient : public Aws::Internal::AWSHttpResourceClient
{
static constexpr char EC2_SECURITY_CREDENTIALS_RESOURCE[] = "/latest/meta-data/iam/security-credentials";
@ -59,11 +50,10 @@ public:
virtual Aws::String getCurrentRegion() const;
friend String getRunningAvailabilityZoneImpl();
virtual Aws::String getCurrentAvailabilityZone() const;
private:
std::pair<Aws::String, Aws::Http::HttpResponseCode> getEC2MetadataToken(const std::string & user_agent_string) const;
static String getAvailabilityZoneOrException();
const Aws::String endpoint;
mutable std::recursive_mutex token_mutex;
@ -187,15 +177,4 @@ public:
}
#else
namespace DB
{
namespace S3
{
String getRunningAvailabilityZone();
}
}
#endif

View File

@ -11,7 +11,6 @@
<keeper_server>
<tcp_port>2181</tcp_port>
<availability_zone>az-zoo1</availability_zone>
<server_id>1</server_id>
<coordination_settings>

View File

@ -12,7 +12,6 @@
<keeper_server>
<tcp_port>2181</tcp_port>
<server_id>2</server_id>
<availability_zone>az-zoo2</availability_zone>
<coordination_settings>
<operation_timeout_ms>10000</operation_timeout_ms>

View File

@ -1,2 +0,0 @@
<clickhouse>
</clickhouse>

View File

@ -1,31 +0,0 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.keeper_utils import KeeperClient
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance(
"node",
main_configs=["configs/keeper_config.xml"],
with_zookeeper=True,
stay_alive=True,
)
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_get_availability_zone():
with KeeperClient.from_cluster(cluster, "zoo1") as client1:
assert client1.get("/keeper/availability_zone") == "az-zoo1"
with KeeperClient.from_cluster(cluster, "zoo2") as client2:
assert client2.get("/keeper/availability_zone") == "az-zoo2"

View File

@ -183,8 +183,8 @@ def test_cmd_mntr(started_cluster):
# contains:
# 10 nodes created by test
# 3 nodes created by clickhouse "/clickhouse/task_queue/ddl"
# 1 root node, 4 keeper system nodes
assert int(result["zk_znode_count"]) == 15
# 1 root node, 3 keeper system nodes
assert int(result["zk_znode_count"]) == 14
assert int(result["zk_watch_count"]) == 2
assert int(result["zk_ephemerals_count"]) == 2
assert int(result["zk_approximate_data_size"]) > 0
@ -333,7 +333,7 @@ def test_cmd_srvr(started_cluster):
assert int(result["Connections"]) == 1
assert int(result["Zxid"], 16) > 10
assert result["Mode"] == "leader"
assert result["Node count"] == "15"
assert result["Node count"] == "14"
finally:
destroy_zk_client(zk)
@ -373,7 +373,7 @@ def test_cmd_stat(started_cluster):
assert int(result["Connections"]) == 1
assert int(result["Zxid"], 16) >= 10
assert result["Mode"] == "leader"
assert result["Node count"] == "15"
assert result["Node count"] == "14"
# filter connection statistics
cons = [n for n in data.split("\n") if "=" in n]