mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Retry on Yandex.S3 throttle
This commit is contained in:
parent
18507cf1d7
commit
55e887728e
2
contrib/aws
vendored
2
contrib/aws
vendored
@ -1 +1 @@
|
|||||||
Subproject commit 06aa8759d17f2032ffd5efa83969270ca9ac727b
|
Subproject commit 00b03604543367d7e310cb0993973fdcb723ea79
|
@ -12,6 +12,15 @@
|
|||||||
<!-- Avoid extra retries to speed up tests -->
|
<!-- Avoid extra retries to speed up tests -->
|
||||||
<retry_attempts>0</retry_attempts>
|
<retry_attempts>0</retry_attempts>
|
||||||
</s3>
|
</s3>
|
||||||
|
<s3_retryable>
|
||||||
|
<type>s3</type>
|
||||||
|
<!-- Use custom S3 endpoint -->
|
||||||
|
<endpoint>http://resolver:8080/root/data/</endpoint>
|
||||||
|
<access_key_id>minio</access_key_id>
|
||||||
|
<secret_access_key>minio123</secret_access_key>
|
||||||
|
<!-- ClickHouse starts earlier than custom S3 endpoint. Skip access check to avoid fail on start-up -->
|
||||||
|
<skip_access_check>true</skip_access_check>
|
||||||
|
</s3_retryable>
|
||||||
<default/>
|
<default/>
|
||||||
</disks>
|
</disks>
|
||||||
<policies>
|
<policies>
|
||||||
@ -32,6 +41,13 @@
|
|||||||
</external>
|
</external>
|
||||||
</volumes>
|
</volumes>
|
||||||
</s3_cold>
|
</s3_cold>
|
||||||
|
<s3_retryable>
|
||||||
|
<volumes>
|
||||||
|
<main>
|
||||||
|
<disk>s3_retryable</disk>
|
||||||
|
</main>
|
||||||
|
</volumes>
|
||||||
|
</s3_retryable>
|
||||||
</policies>
|
</policies>
|
||||||
</storage_configuration>
|
</storage_configuration>
|
||||||
</clickhouse>
|
</clickhouse>
|
||||||
|
@ -18,6 +18,16 @@ def fail_request(_request_number):
|
|||||||
return 'OK'
|
return 'OK'
|
||||||
|
|
||||||
|
|
||||||
|
@route('/throttle_request/<_request_number>')
|
||||||
|
def fail_request(_request_number):
|
||||||
|
request_number = int(_request_number)
|
||||||
|
if request_number > 0:
|
||||||
|
cache['throttle_request_number'] = request_number
|
||||||
|
else:
|
||||||
|
cache.pop('throttle_request_number', None)
|
||||||
|
return 'OK'
|
||||||
|
|
||||||
|
|
||||||
# Handle for MultipleObjectsDelete.
|
# Handle for MultipleObjectsDelete.
|
||||||
@route('/<_bucket>', ['POST'])
|
@route('/<_bucket>', ['POST'])
|
||||||
def delete(_bucket):
|
def delete(_bucket):
|
||||||
@ -37,6 +47,15 @@ def server(_bucket, _path):
|
|||||||
response.content_type = 'text/xml'
|
response.content_type = 'text/xml'
|
||||||
return '<?xml version="1.0" encoding="UTF-8"?><Error><Code>ExpectedError</Code><Message>Expected Error</Message><RequestId>txfbd566d03042474888193-00608d7537</RequestId></Error>'
|
return '<?xml version="1.0" encoding="UTF-8"?><Error><Code>ExpectedError</Code><Message>Expected Error</Message><RequestId>txfbd566d03042474888193-00608d7537</RequestId></Error>'
|
||||||
|
|
||||||
|
if cache.get('throttle_request_number', None):
|
||||||
|
request_number = cache.pop('throttle_request_number') - 1
|
||||||
|
if request_number > 0:
|
||||||
|
cache['throttle_request_number'] = request_number
|
||||||
|
else:
|
||||||
|
response.status = 429
|
||||||
|
response.content_type = 'text/xml'
|
||||||
|
return '<?xml version="1.0" encoding="UTF-8"?><Error><Code>TooManyRequestsException</Code><Message>Please reduce your request rate.</Message><RequestId>txfbd566d03042474888193-00608d7538</RequestId></Error>'
|
||||||
|
|
||||||
response.set_header("Location", "http://minio1:9001/" + _bucket + '/' + _path)
|
response.set_header("Location", "http://minio1:9001/" + _bucket + '/' + _path)
|
||||||
response.status = 307
|
response.status = 307
|
||||||
return 'Redirected'
|
return 'Redirected'
|
||||||
|
@ -38,6 +38,12 @@ def fail_request(cluster, request):
|
|||||||
assert response == 'OK', 'Expected "OK", but got "{}"'.format(response)
|
assert response == 'OK', 'Expected "OK", but got "{}"'.format(response)
|
||||||
|
|
||||||
|
|
||||||
|
def throttle_request(cluster, request):
|
||||||
|
response = cluster.exec_in_container(cluster.get_container_id('resolver'),
|
||||||
|
["curl", "-s", "http://resolver:8080/throttle_request/{}".format(request)])
|
||||||
|
assert response == 'OK', 'Expected "OK", but got "{}"'.format(response)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="module")
|
@pytest.fixture(scope="module")
|
||||||
def cluster():
|
def cluster():
|
||||||
try:
|
try:
|
||||||
@ -186,3 +192,27 @@ def test_move_failover(cluster):
|
|||||||
# Ensure data is not corrupted.
|
# Ensure data is not corrupted.
|
||||||
assert node.query("CHECK TABLE s3_failover_test") == '1\n'
|
assert node.query("CHECK TABLE s3_failover_test") == '1\n'
|
||||||
assert node.query("SELECT id,data FROM s3_failover_test FORMAT Values") == "(0,'data'),(1,'data')"
|
assert node.query("SELECT id,data FROM s3_failover_test FORMAT Values") == "(0,'data'),(1,'data')"
|
||||||
|
|
||||||
|
|
||||||
|
# Check that throttled request retries and does not cause an error on disk with default `retry_attempts` (>0)
|
||||||
|
def test_throttle_retry(cluster):
|
||||||
|
node = cluster.instances["node"]
|
||||||
|
|
||||||
|
node.query(
|
||||||
|
"""
|
||||||
|
CREATE TABLE s3_throttle_retry_test (
|
||||||
|
id Int64
|
||||||
|
) ENGINE=MergeTree()
|
||||||
|
ORDER BY id
|
||||||
|
SETTINGS storage_policy='s3_retryable'
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
data = "(42)"
|
||||||
|
node.query("INSERT INTO s3_throttle_retry_test VALUES {}".format(data))
|
||||||
|
|
||||||
|
throttle_request(cluster, 1)
|
||||||
|
|
||||||
|
assert node.query("""
|
||||||
|
SELECT * FROM s3_throttle_retry_test
|
||||||
|
""") == '42\n'
|
||||||
|
Loading…
Reference in New Issue
Block a user