From 41a199e17534ed68811f7bfd7d02f2fe12a2c14d Mon Sep 17 00:00:00 2001
From: Anton Popov
Date: Fri, 20 Jan 2023 20:10:23 +0100
Subject: [PATCH] Fix crash when `ListObjects` request fails (#45371)

---
 src/Core/Settings.h                            |   1 +
 src/Storages/StorageS3.cpp                     |  22 +++-
 src/Storages/StorageS3Settings.cpp             |   5 +
 src/Storages/StorageS3Settings.h               |   1 +
 .../s3_mocks/no_list_objects.py                | 122 ++++++++++++++++++
 tests/integration/test_storage_s3/test.py      |  43 ++++++
 6 files changed, 193 insertions(+), 1 deletion(-)
 create mode 100644 tests/integration/test_storage_s3/s3_mocks/no_list_objects.py

diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 1ade4ba2868..8f81684a1e3 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -95,6 +95,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
     M(UInt64, s3_max_get_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. By default (0) equals to `s3_max_get_rps`", 0) \
     M(UInt64, s3_max_put_rps, 0, "Limit on S3 PUT request per second rate before throttling. Zero means unlimited.", 0) \
     M(UInt64, s3_max_put_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. By default (0) equals to `s3_max_put_rps`", 0) \
+    M(UInt64, s3_list_object_keys_size, 1000, "Maximum number of files that could be returned in a single batch by a ListObjects request", 0) \
     M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
     M(Bool, s3_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in s3 engine tables", 0) \
     M(Bool, s3_check_objects_after_upload, false, "Check each uploaded object to s3 with head request to be sure that upload was successful", 0) \
diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp
index 5c393f3864b..736aa0d8a5c 100644
--- a/src/Storages/StorageS3.cpp
+++ b/src/Storages/StorageS3.cpp
@@ -171,6 +171,7 @@ public:
         request.SetBucket(globbed_uri.bucket);
         request.SetPrefix(key_prefix);
+        request.SetMaxKeys(static_cast<int>(request_settings.list_object_keys_size));
 
         outcome_future = listObjectsAsync();
 
@@ -214,7 +215,24 @@ private:
         if (is_finished)
             return {};
 
-        fillInternalBufferAssumeLocked();
+        try
+        {
+            fillInternalBufferAssumeLocked();
+        }
+        catch (...)
+        {
+            /// If an exception is thrown while listing a new batch of files,
+            /// the iterator may be left partially initialized, and further use of it may lead to UB.
+            /// The iterator is used by several processors from several threads, and
+            /// it may take some time for the threads to stop the processors, so they
+            /// may still use the iterator after the exception has been thrown.
+            /// To avoid this UB, reset the buffer and return defaults for further calls.
+            is_finished = true;
+            buffer.clear();
+            buffer_iter = buffer.begin();
+            throw;
+        }
+
         return nextAssumeLocked();
     }
 
@@ -226,9 +244,11 @@ private:
         auto outcome = outcome_future.get();
 
         if (!outcome.IsSuccess())
+        {
             throw Exception(ErrorCodes::S3_ERROR, "Could not list objects in bucket {} with prefix {}, S3 exception: {}, message: {}",
                             quoteString(request.GetBucket()), quoteString(request.GetPrefix()),
                             backQuote(outcome.GetError().GetExceptionName()), quoteString(outcome.GetError().GetMessage()));
+        }
 
         const auto & result_batch = outcome.GetResult().GetContents();
 
diff --git a/src/Storages/StorageS3Settings.cpp b/src/Storages/StorageS3Settings.cpp
index 2e6e6acdffd..be83d60131a 100644
--- a/src/Storages/StorageS3Settings.cpp
+++ b/src/Storages/StorageS3Settings.cpp
@@ -167,6 +167,7 @@ S3Settings::RequestSettings::RequestSettings(const NamedCollection & collection)
 {
     max_single_read_retries = collection.getOrDefault<UInt64>("max_single_read_retries", max_single_read_retries);
     max_connections = collection.getOrDefault<UInt64>("max_connections", max_connections);
+    list_object_keys_size = collection.getOrDefault<UInt64>("list_object_keys_size", list_object_keys_size);
 }
 
 S3Settings::RequestSettings::RequestSettings(
@@ -180,6 +181,7 @@ S3Settings::RequestSettings::RequestSettings(
     max_single_read_retries = config.getUInt64(key + "max_single_read_retries", settings.s3_max_single_read_retries);
     max_connections = config.getUInt64(key + "max_connections", settings.s3_max_connections);
     check_objects_after_upload = config.getBool(key + "check_objects_after_upload", settings.s3_check_objects_after_upload);
+    list_object_keys_size = config.getUInt64(key + "list_object_keys_size", settings.s3_list_object_keys_size);
 
     /// NOTE: it would be better to reuse old throttlers to avoid losing token bucket state on every config reload,
     /// which could lead to exceeding limit for short time. But it is good enough unless very high `burst` values are used.
@@ -219,6 +221,9 @@ void S3Settings::RequestSettings::updateFromSettingsImpl(const Settings & settings, bool if_changed)
     if (!if_changed || settings.s3_max_unexpected_write_error_retries.changed)
         max_unexpected_write_error_retries = settings.s3_max_unexpected_write_error_retries;
 
+    if (!if_changed || settings.s3_list_object_keys_size.changed)
+        list_object_keys_size = settings.s3_list_object_keys_size;
+
     if ((!if_changed || settings.s3_max_get_rps.changed || settings.s3_max_get_burst.changed) && settings.s3_max_get_rps)
         get_request_throttler = std::make_shared<Throttler>(
             settings.s3_max_get_rps, settings.s3_max_get_burst ? settings.s3_max_get_burst : Throttler::default_burst_seconds * settings.s3_max_get_rps);
diff --git a/src/Storages/StorageS3Settings.h b/src/Storages/StorageS3Settings.h
index 98914d5bf25..bce772859f0 100644
--- a/src/Storages/StorageS3Settings.h
+++ b/src/Storages/StorageS3Settings.h
@@ -63,6 +63,7 @@ struct S3Settings
         size_t max_connections = 1024;
         bool check_objects_after_upload = false;
         size_t max_unexpected_write_error_retries = 4;
+        size_t list_object_keys_size = 1000;
 
         ThrottlerPtr get_request_throttler;
         ThrottlerPtr put_request_throttler;
diff --git a/tests/integration/test_storage_s3/s3_mocks/no_list_objects.py b/tests/integration/test_storage_s3/s3_mocks/no_list_objects.py
new file mode 100644
index 00000000000..1f6d0435872
--- /dev/null
+++ b/tests/integration/test_storage_s3/s3_mocks/no_list_objects.py
@@ -0,0 +1,122 @@
+import http.client
+import http.server
+import random
+import socketserver
+import sys
+import urllib.parse
+
+
+UPSTREAM_HOST = "minio1:9001"
+random.seed("No list objects/1.0")
+
+list_request_counter = 0
+list_request_max_number = 10
+
+
+def request(command, url, headers={}, data=None):
+    """Mini-requests."""
+
+    class Dummy:
+        pass
+
+    parts = urllib.parse.urlparse(url)
+    c = http.client.HTTPConnection(parts.hostname, parts.port)
+    c.request(
+        command,
+        urllib.parse.urlunparse(parts._replace(scheme="", netloc="")),
+        headers=headers,
+        body=data,
+    )
+    r = c.getresponse()
+    result = Dummy()
+    result.status_code = r.status
+    result.headers = r.headers
+    result.content = r.read()
+    return result
+
+
+class RequestHandler(http.server.BaseHTTPRequestHandler):
+    def do_GET(self):
+        if self.path == "/":
+            self.send_response(200)
+            self.send_header("Content-Type", "text/plain")
+            self.end_headers()
+            self.wfile.write(b"OK")
+            return
+
+        query = urllib.parse.urlparse(self.path).query
+        params = urllib.parse.parse_qs(query, keep_blank_values=True)
+
+        global list_request_counter
+        global list_request_max_number
+
+        if "list-type" in params:
+            if list_request_counter > list_request_max_number:
+                self.send_response(501)
+                self.send_header("Content-Type", "application/xml")
+                self.end_headers()
+                self.wfile.write(
+                    b"""<?xml version="1.0" encoding="UTF-8"?>
+                        <Error>
+                            <Code>NotImplemented</Code>
+                            <Message>I can list objects only once</Message>
+                            <Resource>RESOURCE</Resource>
+                            <RequestId>REQUEST_ID</RequestId>
+                        </Error>
+                        """
+                )
+            else:
+                list_request_counter += 1
+                self.do_HEAD()
+        else:
+            self.do_HEAD()
+
+    def do_PUT(self):
+        self.do_HEAD()
+
+    def do_DELETE(self):
+        self.do_HEAD()
+
+    def do_POST(self):
+        global list_request_counter
+        global list_request_max_number
+
+        if self.path.startswith("/reset_counters"):
+            query = urllib.parse.urlparse(self.path).query
+            params = urllib.parse.parse_qs(query, keep_blank_values=True)
+
+            list_request_counter = 0
+            if "max" in params:
+                list_request_max_number = int(params["max"][0])
+
+            self.send_response(200)
+            self.send_header("Content-Type", "text/plain")
+            self.end_headers()
+            self.wfile.write(b"OK")
+            return
+
+        self.do_HEAD()
+
+    def do_HEAD(self):
+        content_length = self.headers.get("Content-Length")
+        data = self.rfile.read(int(content_length)) if content_length else None
+        r = request(
+            self.command,
+            f"http://{UPSTREAM_HOST}{self.path}",
+            headers=self.headers,
+            data=data,
+        )
+        self.send_response(r.status_code)
+        for k, v in r.headers.items():
+            self.send_header(k, v)
+        self.end_headers()
+        self.wfile.write(r.content)
+        self.wfile.close()
+
+
+class ThreadedHTTPServer(socketserver.ThreadingMixIn, http.server.HTTPServer):
+    """Handle requests in a separate thread."""
+
+
+httpd = ThreadedHTTPServer(("0.0.0.0", int(sys.argv[1])), RequestHandler)
+httpd.serve_forever()
diff --git a/tests/integration/test_storage_s3/test.py b/tests/integration/test_storage_s3/test.py
index 2e959a4d0ed..21d41ec2a38 100644
--- a/tests/integration/test_storage_s3/test.py
+++ b/tests/integration/test_storage_s3/test.py
@@ -703,6 +703,7 @@ def run_s3_mocks(started_cluster):
             ("mock_s3.py", "resolver", "8080"),
             ("unstable_server.py", "resolver", "8081"),
             ("echo.py", "resolver", "8082"),
+            ("no_list_objects.py", "resolver", "8083"),
         ],
     )
 
@@ -1711,3 +1712,45 @@ def test_environment_credentials(started_cluster):
             f"select count() from s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_cache3.jsonl')"
         ).strip()
     )
+
+
+def test_s3_list_objects_failure(started_cluster):
+    bucket = started_cluster.minio_bucket
+    instance = started_cluster.instances["dummy"]  # type: ClickHouseInstance
+    filename = "test_no_list_{_partition_id}.csv"
+
+    put_query = f"""
+        INSERT INTO TABLE FUNCTION
+        s3('http://resolver:8083/{bucket}/{filename}', 'CSV', 'c1 UInt32')
+        PARTITION BY c1 % 20
+        SELECT number FROM numbers(100)
+        SETTINGS s3_truncate_on_insert=1
+    """
+
+    run_query(instance, put_query)
+
+    T = 10
+    for _ in range(0, T):
+        started_cluster.exec_in_container(
+            started_cluster.get_container_id("resolver"),
+            [
+                "curl",
+                "-X",
+                "POST",
+                f"http://localhost:8083/reset_counters?max={random.randint(1, 15)}",
+            ],
+        )
+
+        get_query = """
+            SELECT sleep({seconds}) FROM s3('http://resolver:8083/{bucket}/test_no_list_*', 'CSV', 'c1 UInt32')
+            SETTINGS s3_list_object_keys_size = 1, max_threads = {max_threads}, enable_s3_requests_logging = 1, input_format_parallel_parsing = 0
+        """.format(
+            bucket=bucket, seconds=random.random(), max_threads=random.randint(2, 20)
+        )
+
+        with pytest.raises(helpers.client.QueryRuntimeException) as ei:
+            result = run_query(instance, get_query)
+            print(result)
+
+        assert ei.value.returncode == 243
+        assert "Could not list objects" in ei.value.stderr
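
For illustration, below is a minimal, self-contained sketch of the exception-safety pattern
the patch applies in nextAssumeLocked(). It is not ClickHouse code: BatchedKeyIterator,
fillBufferAssumeLocked() and pages_served are invented stand-ins, while is_finished, buffer
and buffer_iter mirror the fields the patch touches. Several threads call next() concurrently;
if a page fetch throws, the iterator is put into a safe "exhausted" state before the exception
propagates, so later callers see an empty result instead of a partially initialized buffer.

#include <iostream>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

class BatchedKeyIterator
{
public:
    std::optional<std::string> next()
    {
        std::lock_guard lock(mutex);
        return nextAssumeLocked();
    }

private:
    std::optional<std::string> nextAssumeLocked()
    {
        if (buffer_iter != buffer.end())
            return *buffer_iter++;

        if (is_finished)
            return std::nullopt;

        try
        {
            fillBufferAssumeLocked();
        }
        catch (...)
        {
            /// The crux of the fix: reset to a safe terminal state before
            /// propagating the error, so threads that call next() afterwards
            /// observe "exhausted" instead of a partially initialized buffer.
            is_finished = true;
            buffer.clear();
            buffer_iter = buffer.begin();
            throw;
        }

        return nextAssumeLocked();
    }

    void fillBufferAssumeLocked()
    {
        /// Stand-in for one ListObjects page; the real code parses the
        /// response and sets is_finished when the listing is exhausted.
        /// Here the second page deliberately fails.
        if (pages_served++ > 0)
            throw std::runtime_error("Could not list objects");
        buffer = {"key1", "key2"};
        buffer_iter = buffer.begin();
    }

    std::vector<std::string> buffer;
    std::vector<std::string>::iterator buffer_iter = buffer.begin();
    bool is_finished = false;
    int pages_served = 0;
    std::mutex mutex;
};

int main()
{
    BatchedKeyIterator it;
    std::cout << *it.next() << '\n';  /// key1
    std::cout << *it.next() << '\n';  /// key2
    try
    {
        it.next();  /// second page fails and throws
    }
    catch (const std::exception & e)
    {
        std::cout << "error: " << e.what() << '\n';
    }
    std::cout << "finished: " << !it.next().has_value() << '\n';  /// 1
}

The catch (...) plus bare throw; shape is deliberate: the caller that triggered the listing
still receives the original error, while every other thread only ever observes the
is_finished tombstone. In the sketch, as in the patch, the reset happens under the same
mutex that serializes next() calls, so readers never see a half-reset iterator.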