Fix crash when ListObjects request fails (#45371)

Anton Popov 2023-01-20 20:10:23 +01:00 committed by GitHub
parent 4bbe90f6b4
commit 41a199e175
6 changed files with 193 additions and 1 deletion


@@ -95,6 +95,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, s3_max_get_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. By default (0) equals to `s3_max_get_rps`", 0) \
M(UInt64, s3_max_put_rps, 0, "Limit on S3 PUT request per second rate before throttling. Zero means unlimited.", 0) \
M(UInt64, s3_max_put_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. By default (0) equals to `s3_max_put_rps`", 0) \
M(UInt64, s3_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \
M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \
M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \
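
The new s3_list_object_keys_size setting caps how many keys a single ListObjects call may return, so globbed S3 reads page through listings in smaller batches. A rough per-query usage sketch in the style of the integration tests further down; run_query, the instance, and the MinIO endpoint are illustrative assumptions, not part of this commit:

# Hypothetical sketch: make a globbed S3 read page through the listing 10 keys
# at a time. `run_query`, `instance`, and the endpoint are assumptions borrowed
# from the integration tests below, not part of this commit.
def count_rows_with_small_list_pages(instance, bucket):
    query = f"""
        SELECT count()
        FROM s3('http://minio1:9001/{bucket}/data_*.csv', 'CSV', 'c1 UInt32')
        SETTINGS s3_list_object_keys_size = 10
    """
    return run_query(instance, query)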


@@ -171,6 +171,7 @@ public:
request.SetBucket(globbed_uri.bucket);
request.SetPrefix(key_prefix);
request.SetMaxKeys(static_cast<int>(request_settings.list_object_keys_size));
outcome_future = listObjectsAsync();
@@ -214,7 +215,24 @@ private:
if (is_finished)
return {};
fillInternalBufferAssumeLocked();
try
{
fillInternalBufferAssumeLocked();
}
catch (...)
{
/// If an exception is thrown while listing a new batch of files,
/// the iterator may be left partially initialized, and using it further may lead to UB.
/// The iterator is used by several processors from several threads, and
/// it may take some time for the threads to stop the processors, so they
/// may still use this iterator after the exception is thrown.
/// To avoid this UB, reset the buffer and return defaults for further calls.
is_finished = true;
buffer.clear();
buffer_iter = buffer.begin();
throw;
}
return nextAssumeLocked();
}
@@ -226,9 +244,11 @@ private:
auto outcome = outcome_future.get();
if (!outcome.IsSuccess())
{
throw Exception(ErrorCodes::S3_ERROR, "Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {}",
quoteString(request.GetBucket()), quoteString(request.GetPrefix()),
backQuote(outcome.GetError().GetExceptionName()), quoteString(outcome.GetError().GetMessage()));
}
const auto & result_batch = outcome.GetResult().GetContents();
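
The comment in nextAssumeLocked() above explains the design choice: the keys iterator is shared by several processors on several threads, so when fetching the next listing batch throws, the iterator first puts itself into a safe terminal state (finished, empty buffer) and only then rethrows; racing callers simply see the end of the stream instead of partially initialized state. A minimal Python sketch of that pattern, purely illustrative (all names invented, not the actual C++ above):

import threading


class KeysIterator:
    """Illustrative stand-in for a shared, batched keys iterator (not real ClickHouse code)."""

    def __init__(self, fetch_batch):
        self._fetch_batch = fetch_batch  # callable returning the next list of keys; may raise
        self._lock = threading.Lock()
        self._buffer = []
        self._pos = 0
        self._finished = False

    def next(self):
        with self._lock:
            # Serve keys from the current batch while it lasts.
            if self._pos < len(self._buffer):
                key = self._buffer[self._pos]
                self._pos += 1
                return key
            if self._finished:
                return None
            try:
                self._buffer = self._fetch_batch()
            except Exception:
                # Same idea as the commit: move to a safe terminal state before
                # rethrowing, so later callers just get "no more keys".
                self._finished = True
                self._buffer = []
                self._pos = 0
                raise
            if not self._buffer:
                self._finished = True
                return None
            self._pos = 1
            return self._buffer[0]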


@@ -167,6 +167,7 @@ S3Settings::RequestSettings::RequestSettings(const NamedCollection & collection)
{
max_single_read_retries = collection.getOrDefault<UInt64>("max_single_read_retries", max_single_read_retries);
max_connections = collection.getOrDefault<UInt64>("max_connections", max_connections);
list_object_keys_size = collection.getOrDefault<UInt64>("list_object_keys_size", list_object_keys_size);
}
S3Settings::RequestSettings::RequestSettings(
@@ -180,6 +181,7 @@ S3Settings::RequestSettings::RequestSettings(
max_single_read_retries = config.getUInt64(key + "max_single_read_retries", settings.s3_max_single_read_retries);
max_connections = config.getUInt64(key + "max_connections", settings.s3_max_connections);
check_objects_after_upload = config.getBool(key + "check_objects_after_upload", settings.s3_check_objects_after_upload);
list_object_keys_size = config.getUInt64(key + "list_object_keys_size", settings.s3_list_object_keys_size);
/// NOTE: it would be better to reuse old throttlers to avoid losing token bucket state on every config reload,
/// which could lead to exceeding limit for short time. But it is good enough unless very high `burst` values are used.
@@ -219,6 +221,9 @@ void S3Settings::RequestSettings::updateFromSettingsImpl(const Settings & settin
if (!if_changed || settings.s3_max_unexpected_write_error_retries.changed)
max_unexpected_write_error_retries = settings.s3_max_unexpected_write_error_retries;
if (!if_changed || settings.s3_list_object_keys_size.changed)
list_object_keys_size = settings.s3_list_object_keys_size;
if ((!if_changed || settings.s3_max_get_rps.changed || settings.s3_max_get_burst.changed) && settings.s3_max_get_rps)
get_request_throttler = std::make_shared<Throttler>(
settings.s3_max_get_rps, settings.s3_max_get_burst ? settings.s3_max_get_burst : Throttler::default_burst_seconds * settings.s3_max_get_rps);


@@ -63,6 +63,7 @@ struct S3Settings
size_t max_connections = 1024;
bool check_objects_after_upload = false;
size_t max_unexpected_write_error_retries = 4;
size_t list_object_keys_size = 1000;
ThrottlerPtr get_request_throttler;
ThrottlerPtr put_request_throttler;


@@ -0,0 +1,122 @@
import http.client
import http.server
import random
import socketserver
import sys
import urllib.parse

UPSTREAM_HOST = "minio1:9001"
random.seed("No list objects/1.0")

list_request_counter = 0
list_request_max_number = 10


def request(command, url, headers={}, data=None):
    """Mini-requests."""

    class Dummy:
        pass

    parts = urllib.parse.urlparse(url)
    c = http.client.HTTPConnection(parts.hostname, parts.port)
    c.request(
        command,
        urllib.parse.urlunparse(parts._replace(scheme="", netloc="")),
        headers=headers,
        body=data,
    )
    r = c.getresponse()
    result = Dummy()
    result.status_code = r.status
    result.headers = r.headers
    result.content = r.read()
    return result


class RequestHandler(http.server.BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/":
            self.send_response(200)
            self.send_header("Content-Type", "text/plain")
            self.end_headers()
            self.wfile.write(b"OK")
            return

        query = urllib.parse.urlparse(self.path).query
        params = urllib.parse.parse_qs(query, keep_blank_values=True)

        global list_request_counter
        global list_request_max_number

        if "list-type" in params:
            if list_request_counter > list_request_max_number:
                self.send_response(501)
                self.send_header("Content-Type", "application/xml")
                self.end_headers()
                self.wfile.write(
                    b"""<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>NotImplemented</Code>
<Message>I can list object only once</Message>
<Resource>RESOURCE</Resource>
<RequestId>REQUEST_ID</RequestId>
</Error>
"""
                )
            else:
                list_request_counter += 1
                self.do_HEAD()
        else:
            self.do_HEAD()

    def do_PUT(self):
        self.do_HEAD()

    def do_DELETE(self):
        self.do_HEAD()

    def do_POST(self):
        global list_request_counter
        global list_request_max_number

        if self.path.startswith("/reset_counters"):
            query = urllib.parse.urlparse(self.path).query
            params = urllib.parse.parse_qs(query, keep_blank_values=True)

            list_request_counter = 0
            if "max" in params:
                list_request_max_number = int(params["max"][0])

            self.send_response(200)
            self.send_header("Content-Type", "text/plain")
            self.end_headers()
            self.wfile.write(b"OK")
            return

        self.do_HEAD()

    def do_HEAD(self):
        content_length = self.headers.get("Content-Length")
        data = self.rfile.read(int(content_length)) if content_length else None
        r = request(
            self.command,
            f"http://{UPSTREAM_HOST}{self.path}",
            headers=self.headers,
            data=data,
        )
        self.send_response(r.status_code)
        for k, v in r.headers.items():
            self.send_header(k, v)
        self.end_headers()
        self.wfile.write(r.content)
        self.wfile.close()


class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
    """Handle requests in a separate thread."""


httpd = ThreadedHTTPServer(("0.0.0.0", int(sys.argv[1])), RequestHandler)
httpd.serve_forever()
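
This mock proxies every request to the real MinIO endpoint, but once the number of ListObjects-style requests (those carrying a list-type query parameter) exceeds the configured maximum, it starts answering 501 NotImplemented; POST /reset_counters?max=N zeroes the counter and sets a new limit. A hedged sketch of driving it from a test, assuming the mock is reachable as resolver:8083 as wired up in run_s3_mocks below (the new test does the same thing with curl inside the container):

# Sketch only: reset the mock's ListObjects budget from Python. Host/port are
# assumptions matching the resolver container configuration below.
import http.client


def reset_list_limit(max_list_requests, host="resolver", port=8083):
    conn = http.client.HTTPConnection(host, port)
    # Zero the counter and cap how many ListObjects calls the mock will proxy
    # before it starts replying with 501 NotImplemented.
    conn.request("POST", f"/reset_counters?max={max_list_requests}")
    response = conn.getresponse()
    assert response.read() == b"OK"
    conn.close()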


@@ -703,6 +703,7 @@ def run_s3_mocks(started_cluster):
("mock_s3.py", "resolver", "8080"),
("unstable_server.py", "resolver", "8081"),
("echo.py", "resolver", "8082"),
("no_list_objects.py", "resolver", "8083"),
],
)
@@ -1711,3 +1712,45 @@ def test_environment_credentials(started_cluster):
f"select count() from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl')"
).strip()
)


def test_s3_list_objects_failure(started_cluster):
    bucket = started_cluster.minio_bucket
    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
    filename = "test_no_list_{_partition_id}.csv"

    put_query = f"""
        INSERT INTO TABLE FUNCTION
        s3('http://resolver:8083/{bucket}/{filename}', 'CSV', 'c1 UInt32')
        PARTITION BY c1 % 20
        SELECT number FROM numbers(100)
        SETTINGS s3_truncate_on_insert=1
    """

    run_query(instance, put_query)

    T = 10
    for _ in range(0, T):
        started_cluster.exec_in_container(
            started_cluster.get_container_id("resolver"),
            [
                "curl",
                "-X",
                "POST",
                f"http://localhost:8083/reset_counters?max={random.randint(1, 15)}",
            ],
        )

        get_query = """
            SELECT sleep({seconds}) FROM s3('http://resolver:8083/{bucket}/test_no_list_*', 'CSV', 'c1 UInt32')
            SETTINGS s3_list_object_keys_size = 1, max_threads = {max_threads}, enable_s3_requests_logging = 1, input_format_parallel_parsing = 0
        """.format(
            bucket=bucket, seconds=random.random(), max_threads=random.randint(2, 20)
        )

        with pytest.raises(helpers.client.QueryRuntimeException) as ei:
            result = run_query(instance, get_query)
            print(result)

        assert ei.value.returncode == 243
        assert "Could not list objects" in ei.value.stderr