Merge pull request #71011 from MikhailBurdukov/access_header_for_s3

Add a new type of headers for S3 endpoints for custom authentication.
This commit is contained in:
Kseniia Sumarokova 2024-10-30 16:38:05 +00:00 committed by GitHub
commit 364276f53b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 279 additions and 5 deletions

View File

@ -290,6 +290,7 @@ The following settings can be specified in configuration file for given endpoint
- `expiration_window_seconds` — Grace period for checking if expiration-based credentials have expired. Optional, default value is `120`.
- `no_sign_request` - Ignore all the credentials so requests are not signed. Useful for accessing public buckets.
- `header` — Adds specified HTTP header to a request to given endpoint. Optional, can be specified multiple times.
- `access_header` - Adds specified HTTP header to a request to given endpoint, in cases where there are no other credentials from another source.
- `server_side_encryption_customer_key_base64` — If specified, required headers for accessing S3 objects with SSE-C encryption will be set. Optional.
- `server_side_encryption_kms_key_id` - If specified, required headers for accessing S3 objects with [SSE-KMS encryption](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingKMSEncryption.html) will be set. If an empty string is specified, the AWS managed S3 key will be used. Optional.
- `server_side_encryption_kms_encryption_context` - If specified alongside `server_side_encryption_kms_key_id`, the given encryption context header for SSE-KMS will be set. Optional.

View File

@ -138,6 +138,7 @@ CREATE TABLE table_with_asterisk (name String, value UInt32)
- `use_insecure_imds_request` — признак использования менее безопасного соединения при выполнении запроса к IMDS при получении учётных данных из метаданных Amazon EC2. Значение по умолчанию — `false`.
- `region` — название региона S3.
- `header` — добавляет указанный HTTP-заголовок к запросу на заданную точку приема запроса. Может быть определен несколько раз.
- `access_header` - добавляет указанный HTTP-заголовок к запросу на заданную точку приема запроса, в случая если не указаны другие способы авторизации.
- `server_side_encryption_customer_key_base64` — устанавливает необходимые заголовки для доступа к объектам S3 с шифрованием SSE-C.
- `single_read_retries` — Максимальное количество попыток запроса при единичном чтении. Значение по умолчанию — `4`.

View File

@ -177,7 +177,7 @@ std::unique_ptr<S3::Client> getClient(
auth_settings[S3AuthSetting::secret_access_key],
auth_settings[S3AuthSetting::server_side_encryption_customer_key_base64],
auth_settings.server_side_encryption_kms_config,
auth_settings.headers,
auth_settings.getHeaders(),
credentials_configuration,
auth_settings[S3AuthSetting::session_token]);
}

View File

@ -105,7 +105,9 @@ S3AuthSettings::S3AuthSettings(
}
}
headers = getHTTPHeaders(config_prefix, config);
headers = getHTTPHeaders(config_prefix, config, "header");
access_headers = getHTTPHeaders(config_prefix, config, "access_header");
server_side_encryption_kms_config = getSSEKMSConfig(config_prefix, config);
Poco::Util::AbstractConfiguration::Keys keys;
@ -119,6 +121,7 @@ S3AuthSettings::S3AuthSettings(
S3AuthSettings::S3AuthSettings(const S3AuthSettings & settings)
: headers(settings.headers)
, access_headers(settings.access_headers)
, users(settings.users)
, server_side_encryption_kms_config(settings.server_side_encryption_kms_config)
, impl(std::make_unique<S3AuthSettingsImpl>(*settings.impl))
@ -127,6 +130,7 @@ S3AuthSettings::S3AuthSettings(const S3AuthSettings & settings)
S3AuthSettings::S3AuthSettings(S3AuthSettings && settings) noexcept
: headers(std::move(settings.headers))
, access_headers(std::move(settings.access_headers))
, users(std::move(settings.users))
, server_side_encryption_kms_config(std::move(settings.server_side_encryption_kms_config))
, impl(std::make_unique<S3AuthSettingsImpl>(std::move(*settings.impl)))
@ -145,6 +149,7 @@ S3AUTH_SETTINGS_SUPPORTED_TYPES(S3AuthSettings, IMPLEMENT_SETTING_SUBSCRIPT_OPER
S3AuthSettings & S3AuthSettings::operator=(S3AuthSettings && settings) noexcept
{
headers = std::move(settings.headers);
access_headers = std::move(settings.access_headers);
users = std::move(settings.users);
server_side_encryption_kms_config = std::move(settings.server_side_encryption_kms_config);
*impl = std::move(*settings.impl);
@ -157,6 +162,9 @@ bool S3AuthSettings::operator==(const S3AuthSettings & right)
if (headers != right.headers)
return false;
if (access_headers != right.access_headers)
return false;
if (users != right.users)
return false;
@ -196,6 +204,9 @@ void S3AuthSettings::updateIfChanged(const S3AuthSettings & settings)
if (!settings.headers.empty())
headers = settings.headers;
if (!settings.access_headers.empty())
access_headers = settings.access_headers;
if (!settings.users.empty())
users.insert(settings.users.begin(), settings.users.end());
@ -205,6 +216,17 @@ void S3AuthSettings::updateIfChanged(const S3AuthSettings & settings)
server_side_encryption_kms_config = settings.server_side_encryption_kms_config;
}
HTTPHeaderEntries S3AuthSettings::getHeaders() const
{
bool auth_settings_is_default = !impl->isChanged("access_key_id");
if (access_headers.empty() || !auth_settings_is_default)
return headers;
HTTPHeaderEntries result(headers);
result.insert(result.end(), access_headers.begin(), access_headers.end());
return result;
}
}
}

View File

@ -55,8 +55,11 @@ struct S3AuthSettings
bool hasUpdates(const S3AuthSettings & other) const;
void updateIfChanged(const S3AuthSettings & settings);
bool canBeUsedByUser(const String & user) const { return users.empty() || users.contains(user); }
HTTPHeaderEntries getHeaders() const;
HTTPHeaderEntries headers;
HTTPHeaderEntries access_headers;
std::unordered_set<std::string> users;
ServerSideEncryptionKMSConfig server_side_encryption_kms_config;

View File

@ -74,14 +74,14 @@ namespace ErrorCodes
namespace S3
{
HTTPHeaderEntries getHTTPHeaders(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config)
HTTPHeaderEntries getHTTPHeaders(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config, const std::string header_key)
{
HTTPHeaderEntries headers;
Poco::Util::AbstractConfiguration::Keys subconfig_keys;
config.keys(config_elem, subconfig_keys);
for (const std::string & subkey : subconfig_keys)
{
if (subkey.starts_with("header"))
if (subkey.starts_with(header_key))
{
auto header_str = config.getString(config_elem + "." + subkey);
auto delimiter = header_str.find(':');

View File

@ -69,7 +69,7 @@ struct ProxyConfigurationResolver;
namespace S3
{
HTTPHeaderEntries getHTTPHeaders(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config);
HTTPHeaderEntries getHTTPHeaders(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config, std::string header_key = "header");
ServerSideEncryptionKMSConfig getSSEKMSConfig(const std::string & config_elem, const Poco::Util::AbstractConfiguration & config);
}

View File

@ -0,0 +1,9 @@
<clickhouse>
<named_collections>
<s3_mock>
<url>http://resolver:8081/root/test_named_colections.csv</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_mock>
</named_collections>
</clickhouse>

View File

@ -0,0 +1,8 @@
<clickhouse>
<s3>
<s3_mock>
<endpoint>http://resolver:8081/</endpoint>
<access_header>custom-auth-token: ValidToken1234</access_header>
</s3_mock>
</s3>
</clickhouse>

View File

@ -0,0 +1,9 @@
<clickhouse>
<users>
<default>
<password></password>
<profile>default</profile>
<named_collection_control>1</named_collection_control>
</default>
</users>
</clickhouse>

View File

@ -0,0 +1,97 @@
import http.client
import http.server
import random
import socketserver
import sys
import urllib.parse
UPSTREAM_HOST = "minio1:9001"
random.seed("No list objects/1.0")
def request(command, url, headers={}, data=None):
"""Mini-requests."""
class Dummy:
pass
parts = urllib.parse.urlparse(url)
c = http.client.HTTPConnection(parts.hostname, parts.port)
c.request(
command,
urllib.parse.urlunparse(parts._replace(scheme="", netloc="")),
headers=headers,
body=data,
)
r = c.getresponse()
result = Dummy()
result.status_code = r.status
result.headers = r.headers
result.content = r.read()
return result
CUSTOM_AUTH_TOKEN_HEADER = "custom-auth-token"
CUSTOM_AUTH_TOKEN_VALID_VALUE = "ValidToken1234"
class RequestHandler(http.server.BaseHTTPRequestHandler):
def do_GET(self):
if self.path == "/":
self.send_response(200)
self.send_header("Content-Type", "text/plain")
self.end_headers()
self.wfile.write(b"OK")
return
self.do_HEAD()
def do_PUT(self):
self.do_HEAD()
def do_DELETE(self):
self.do_HEAD()
def do_POST(self):
self.do_HEAD()
def do_HEAD(self):
custom_auth_token = self.headers.get(CUSTOM_AUTH_TOKEN_HEADER)
if custom_auth_token and custom_auth_token != CUSTOM_AUTH_TOKEN_VALID_VALUE:
self.send_response(403)
self.send_header("Content-Type", "application/xml")
self.end_headers()
body = f"""<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>AccessDenied</Code>
<Message>Access Denied. Custom token was {custom_auth_token}, the correct one: {CUSTOM_AUTH_TOKEN_VALID_VALUE}.</Message>
<Resource>RESOURCE</Resource>
<RequestId>REQUEST_ID</RequestId>
</Error>
"""
self.wfile.write(body.encode())
return
content_length = self.headers.get("Content-Length")
data = self.rfile.read(int(content_length)) if content_length else None
r = request(
self.command,
f"http://{UPSTREAM_HOST}{self.path}",
headers=self.headers,
data=data,
)
self.send_response(r.status_code)
for k, v in r.headers.items():
self.send_header(k, v)
self.end_headers()
self.wfile.write(r.content)
self.wfile.close()
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
"""Handle requests in a separate thread."""
httpd = ThreadedHTTPServer(("0.0.0.0", int(sys.argv[1])), RequestHandler)
httpd.serve_forever()

View File

@ -0,0 +1,124 @@
import logging
import os
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.mock_servers import start_mock_servers
from helpers.s3_tools import prepare_s3_bucket
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
def run_s3_mocks(started_cluster):
script_dir = os.path.join(os.path.dirname(__file__), "s3_mocks")
start_mock_servers(
started_cluster,
script_dir,
[
("mocker_s3.py", "resolver", "8081"),
],
)
@pytest.fixture(scope="module")
def started_cluster():
cluster = ClickHouseCluster(__file__, with_spark=True)
try:
cluster.add_instance(
"node1",
main_configs=[
"configs/config.d/named_collections.xml",
"configs/config.d/s3_headers.xml",
],
user_configs=["configs/users.d/users.xml"],
with_minio=True,
)
logging.info("Starting cluster...")
cluster.start()
prepare_s3_bucket(cluster)
logging.info("S3 bucket created")
run_s3_mocks(cluster)
yield cluster
finally:
cluster.shutdown()
CUSTOM_AUTH_TOKEN = "custom-auth-token"
CORRECT_TOKEN = "ValidToken1234"
INCORRECT_TOKEN = "InvalidToken1234"
@pytest.mark.parametrize(
"table_name, engine, query_with_invalid_token_must_fail",
[
pytest.param(
"test_access_header",
"S3('http://resolver:8081/root/test_access_header.csv', 'CSV')",
True,
id="test_access_over_custom_header",
),
pytest.param(
"test_static_override",
"S3('http://resolver:8081/root/test_static_override.csv', 'minio', 'minio123', 'CSV')",
False,
id="test_access_key_id_overrides_access_header",
),
pytest.param(
"test_named_colections",
"S3(s3_mock, format='CSV')",
False,
id="test_named_coll_overrides_access_header",
),
],
)
def test_custom_access_header(
started_cluster, table_name, engine, query_with_invalid_token_must_fail
):
instance = started_cluster.instances["node1"]
instance.query(
f"""
SET s3_truncate_on_insert=1;
INSERT INTO FUNCTION s3('http://minio1:9001/root/{table_name}.csv', 'minio', 'minio123','CSV')
SELECT number as a, toString(number) as b FROM numbers(3);
"""
)
instance.query(
f"""
DROP TABLE IF EXISTS {table_name};
CREATE TABLE {table_name} (name String, value UInt32)
ENGINE={engine};
"""
)
instance.query("SYSTEM DROP QUERY CACHE")
assert instance.query(f"SELECT count(*) FROM {table_name}") == "3\n"
config_path = "/etc/clickhouse-server/config.d/s3_headers.xml"
instance.replace_in_config(
config_path,
f"<access_header>{CUSTOM_AUTH_TOKEN}: {CORRECT_TOKEN}",
f"<access_header>{CUSTOM_AUTH_TOKEN}: {INCORRECT_TOKEN}",
)
instance.query("SYSTEM RELOAD CONFIG")
if query_with_invalid_token_must_fail:
instance.query_and_get_error(f"SELECT count(*) FROM {table_name}")
else:
assert instance.query(f"SELECT count(*) FROM {table_name}") == "3\n"
instance.replace_in_config(
config_path,
f"<access_header>{CUSTOM_AUTH_TOKEN}: {INCORRECT_TOKEN}",
f"<access_header>{CUSTOM_AUTH_TOKEN}: {CORRECT_TOKEN}",
)
instance.query("SYSTEM RELOAD CONFIG")
assert instance.query(f"SELECT count(*) FROM {table_name}") == "3\n"