mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 01:22:04 +00:00
Added test for missing DeleteObjects request.
This commit is contained in:
parent
92880d7840
commit
9ca8c035f2
@ -15,6 +15,13 @@
|
||||
<secret_access_key>minio123</secret_access_key>
|
||||
<s3_max_single_read_retries>10</s3_max_single_read_retries>
|
||||
</unstable_s3>
|
||||
<no_delete_objects_s3>
|
||||
<type>s3</type>
|
||||
<endpoint>http://resolver:8082/root/data/</endpoint>
|
||||
<access_key_id>minio</access_key_id>
|
||||
<secret_access_key>minio123</secret_access_key>
|
||||
<s3_max_single_read_retries>10</s3_max_single_read_retries>
|
||||
</no_delete_objects_s3>
|
||||
<hdd>
|
||||
<type>local</type>
|
||||
<path>/</path>
|
||||
@ -46,6 +53,13 @@
|
||||
</main>
|
||||
</volumes>
|
||||
</unstable_s3>
|
||||
<no_delete_objects_s3>
|
||||
<volumes>
|
||||
<main>
|
||||
<disk>no_delete_objects_s3</disk>
|
||||
</main>
|
||||
</volumes>
|
||||
</no_delete_objects_s3>
|
||||
<s3_cache>
|
||||
<volumes>
|
||||
<main>
|
||||
|
@ -0,0 +1,90 @@
|
||||
import http.client
|
||||
import http.server
|
||||
import random
|
||||
import socketserver
|
||||
import sys
|
||||
import urllib.parse
|
||||
|
||||
|
||||
UPSTREAM_HOST = "minio1:9001"
|
||||
random.seed("No delete objects/1.0")
|
||||
|
||||
|
||||
def request(command, url, headers=None, data=None):
    """Mini-requests: perform one HTTP request and return a
    requests-library-shaped result.

    Args:
        command: HTTP method name, e.g. "GET" or "POST".
        url: full URL; host/port come from it, the remainder becomes the
            request target sent to the server.
        headers: optional mapping of request headers.
        data: optional request body (bytes or file-like object).

    Returns:
        An object with ``status_code``, ``headers`` and ``content``
        attributes, mirroring the ``requests`` response interface.
    """

    class Dummy:
        # Plain attribute holder for the response fields.
        pass

    # Fix: the original signature used a mutable default (headers={}),
    # which would be shared across calls; use the None sentinel instead.
    if headers is None:
        headers = {}

    parts = urllib.parse.urlparse(url)
    c = http.client.HTTPConnection(parts.hostname, parts.port)
    c.request(
        command,
        # Strip scheme and authority so only the path?query is sent.
        urllib.parse.urlunparse(parts._replace(scheme="", netloc="")),
        headers=headers,
        body=data,
    )
    r = c.getresponse()
    result = Dummy()
    result.status_code = r.status
    result.headers = r.headers
    result.content = r.read()
    return result
|
||||
|
||||
|
||||
class RequestHandler(http.server.BaseHTTPRequestHandler):
    """Transparent proxy to ``UPSTREAM_HOST`` that refuses the batch
    S3 ``DeleteObjects`` call with ``NotImplemented``, emulating storage
    backends (e.g. GCS) that lack this request — see the linked issue in
    the response body.
    """

    def do_GET(self):
        # "/" is a plain health-check; everything else is proxied.
        if self.path != "/":
            self.do_HEAD()
            return
        self.send_response(200)
        self.send_header("Content-Type", "text/plain")
        self.end_headers()
        self.wfile.write(b"OK")

    def do_PUT(self):
        self.do_HEAD()

    def do_DELETE(self):
        self.do_HEAD()

    def do_POST(self):
        # A POST carrying a "delete" query parameter is the S3 batch
        # DeleteObjects request — reject it with 501 so the client must
        # fall back to per-object deletes.
        raw_query = urllib.parse.urlparse(self.path).query
        if "delete" in urllib.parse.parse_qs(raw_query, keep_blank_values=True):
            self.send_response(501)
            self.send_header("Content-Type", "application/xml")
            self.end_headers()
            self.wfile.write(b"""<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>NotImplemented</Code>
<Message>Ima GCP and I can't do `DeleteObjects` request for ya. See https://issuetracker.google.com/issues/162653700 .</Message>
<Resource>RESOURCE</Resource>
<RequestId>REQUEST_ID</RequestId>
</Error>""")
        else:
            self.do_HEAD()

    def do_HEAD(self):
        # Forward the request (with its body, when Content-Length says
        # there is one) to the real endpoint and relay the response.
        body_size = self.headers.get("Content-Length")
        payload = self.rfile.read(int(body_size)) if body_size else None
        upstream = request(
            self.command,
            f"http://{UPSTREAM_HOST}{self.path}",
            headers=self.headers,
            data=payload,
        )
        self.send_response(upstream.status_code)
        for name, value in upstream.headers.items():
            self.send_header(name, value)
        self.end_headers()
        self.wfile.write(upstream.content)
        self.wfile.close()
|
||||
|
||||
|
||||
class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """HTTP server that dispatches each incoming request to its own thread."""
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Fix: guard the server startup so importing this module does not
    # bind a socket and block forever as a side effect.
    # The listening port is the single CLI argument (presumably supplied
    # by the mock launcher — confirm against run_s3_mocks).
    httpd = ThreadedHTTPServer(("0.0.0.0", int(sys.argv[1])), RequestHandler)
    httpd.serve_forever()
|
@ -67,7 +67,10 @@ def create_table(node, table_name, **additional_settings):
|
||||
|
||||
def run_s3_mocks(cluster):
|
||||
logging.info("Starting s3 mocks")
|
||||
mocks = (("unstable_proxy.py", "resolver", "8081"),)
|
||||
mocks = (
|
||||
("unstable_proxy.py", "resolver", "8081"),
|
||||
("no_delete_objects.py", "resolver", "8082")
|
||||
)
|
||||
for mock_filename, container, port in mocks:
|
||||
container_id = cluster.get_container_id(container)
|
||||
current_dir = os.path.dirname(__file__)
|
||||
@ -637,6 +640,15 @@ def test_s3_disk_restart_during_load(cluster, node_name):
|
||||
thread.join()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("node_name", ["node"])
def test_s3_no_delete_objects(cluster, node_name):
    """Create and drop a table on the no-DeleteObjects storage policy;
    the drop must succeed even though the mock rejects batch deletes."""
    instance = cluster.instances[node_name]
    create_table(
        instance, "s3_test_no_delete_objects", storage_policy="no_delete_objects_s3"
    )
    instance.query("DROP TABLE s3_test_no_delete_objects SYNC")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("node_name", ["node"])
|
||||
def test_s3_disk_reads_on_unstable_connection(cluster, node_name):
|
||||
node = cluster.instances[node_name]
|
||||
|
Loading…
Reference in New Issue
Block a user