diff --git a/src/IO/ReadWriteBufferFromHTTP.h b/src/IO/ReadWriteBufferFromHTTP.h
index aee2cd6c3bd..6a644a22289 100644
--- a/src/IO/ReadWriteBufferFromHTTP.h
+++ b/src/IO/ReadWriteBufferFromHTTP.h
@@ -150,10 +150,13 @@ namespace detail
 
             if (withPartialContent())
             {
+                String range_header_value;
                 if (read_range.end)
-                    request.set("Range", fmt::format("bytes={}-{}", read_range.begin + bytes_read, *read_range.end));
+                    range_header_value = fmt::format("bytes={}-{}", read_range.begin + bytes_read, *read_range.end);
                 else
-                    request.set("Range", fmt::format("bytes={}-", read_range.begin + bytes_read));
+                    range_header_value = fmt::format("bytes={}-", read_range.begin + bytes_read);
+                LOG_TRACE(log, "Adding header: Range: {}", range_header_value);
+                request.set("Range", range_header_value);
             }
 
             if (!credentials.getUsername().empty())
@@ -360,7 +363,7 @@ namespace detail
                 catch (const Poco::Exception & e)
                 {
                     /**
-                     * Retry request unconditionally if nothing has beed read yet.
+                     * Retry request unconditionally if nothing has been read yet.
                      * Otherwise if it is GET method retry with range header starting from bytes_read.
                      */
                     bool can_retry_request = !bytes_read || method == Poco::Net::HTTPRequest::HTTP_GET;
@@ -368,9 +371,11 @@ namespace detail
                         throw;
 
                     LOG_ERROR(log,
-                              "HTTP request to `{}` failed at try {}/{} with bytes read: {}. "
+                              "HTTP request to `{}` failed at try {}/{} with bytes read: {}/{}. "
                               "Error: {}. (Current backoff wait is {}/{} ms)",
-                              uri.toString(), i, settings.http_max_tries, bytes_read, e.displayText(),
+                              uri.toString(), i, settings.http_max_tries,
+                              bytes_read, read_range.end ? toString(*read_range.end) : "unknown",
+                              e.displayText(),
                               milliseconds_to_wait, settings.http_retry_max_backoff_ms);
 
                     retry_with_range_header = true;
diff --git a/tests/integration/helpers/network.py b/tests/integration/helpers/network.py
index decb406879e..2bf0867c847 100644
--- a/tests/integration/helpers/network.py
+++ b/tests/integration/helpers/network.py
@@ -151,7 +151,7 @@ class _NetworkManager:
     def _iptables_cmd_suffix(
             source=None, destination=None,
             source_port=None, destination_port=None,
-            action=None, probability=None):
+            action=None, probability=None, custom_args=None):
         ret = []
         if probability is not None:
             ret.extend(['-m', 'statistic', '--mode', 'random', '--probability', str(probability)])
@@ -166,6 +166,8 @@ class _NetworkManager:
             ret.extend(['--dport', str(destination_port)])
         if action is not None:
             ret.extend(['-j'] + action.split())
+        if custom_args is not None:
+            ret.extend(custom_args)
         return ret
 
     def __init__(
diff --git a/tests/integration/test_redirect_url_storage/test.py b/tests/integration/test_redirect_url_storage/test.py
index 080d10e9cf4..061920954b6 100644
--- a/tests/integration/test_redirect_url_storage/test.py
+++ b/tests/integration/test_redirect_url_storage/test.py
@@ -109,7 +109,8 @@ def test_url_reconnect(started_cluster):
         node1.query(
             "insert into table function hdfs('hdfs://hdfs1:9000/storage_big', 'TSV', 'id Int32') select number from numbers(500000)")
 
-        pm._add_rule({'destination': node1.ip_address, 'source_port': 50075, 'action': 'REJECT'})
+        pm_rule = {'destination': node1.ip_address, 'source_port': 50075, 'action': 'REJECT'}
+        pm._add_rule(pm_rule)
 
         def select():
             global result
@@ -121,7 +122,7 @@ def test_url_reconnect(started_cluster):
         thread.start()
         time.sleep(4)
 
-        pm._delete_rule({'destination': node1.ip_address, 'source_port': 50075, 'action': 'REJECT'})
+        pm._delete_rule(pm_rule)
 
         thread.join()
 
diff --git a/tests/integration/test_storage_s3/test.py b/tests/integration/test_storage_s3/test.py
index bd918144935..74ce3ed0f6c 100644
--- a/tests/integration/test_storage_s3/test.py
+++ b/tests/integration/test_storage_s3/test.py
@@ -10,6 +10,7 @@ import time
 import helpers.client
 import pytest
 from helpers.cluster import ClickHouseCluster, ClickHouseInstance, get_instances_dir
+from helpers.network import PartitionManager
 
 MINIO_INTERNAL_PORT = 9001
 
@@ -757,3 +758,49 @@ def test_predefined_connection_configuration(started_cluster):
 
     result = instance.query("SELECT * FROM s3(s3_conf1, format='CSV', structure='id UInt32')")
     assert result == instance.query("SELECT number FROM numbers(10)")
+
+result = ""
+def test_url_reconnect_in_the_middle(started_cluster):
+    """Read a large file through url() while the connection to MinIO is disturbed.
+
+    Packets arriving from the MinIO port are first reset at random (probability 0.02)
+    and then dropped entirely for about two seconds; the query runs with HTTP retries
+    enabled and must still return the expected checksum.
+    """
+    bucket = started_cluster.minio_bucket
+    instance = started_cluster.instances["dummy"]
+    table_format = "id String, data String"
+    filename = "test_url_reconnect_{}.tsv".format(random.randint(0, 1000))
+
+    instance.query(f"""insert into table function
+                   s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{filename}', 'TSV', '{table_format}')
+                   select number, randomPrintableASCII(number % 1000) from numbers(1000000)""")
+
+    with PartitionManager() as pm:
+        pm_rule_reject = {'probability': 0.02, 'destination': instance.ip_address, 'source_port': started_cluster.minio_port, 'action': 'REJECT --reject-with tcp-reset'}
+        pm_rule_drop_all = {'destination': instance.ip_address, 'source_port': started_cluster.minio_port, 'action': 'DROP'}
+        pm._add_rule(pm_rule_reject)
+
+        def select():
+            global result
+            result = instance.query(
+                f"""select sum(cityHash64(x)) from (select toUInt64(id) + sleep(0.1) as x from
+                url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{filename}', 'TSV', '{table_format}')
+                settings http_max_tries = 10, http_retry_max_backoff_ms=2000, http_send_timeout=1, http_receive_timeout=1)""")
+            # A parenthesised "assert(a, b)" tests a non-empty tuple and never fails; compare explicitly.
+            assert int(result) == 3914219105369203805
+
+        thread = threading.Thread(target=select)
+        thread.start()
+        time.sleep(4)
+        # Drop all packets coming from the MinIO port for the next two seconds.
+        pm._add_rule(pm_rule_drop_all)
+
+        time.sleep(2)
+        pm._delete_rule(pm_rule_drop_all)
+        pm._delete_rule(pm_rule_reject)
+
+        thread.join()
+
+        # An assertion failing inside the worker thread does not fail the test, so check again here.
+        assert int(result) == 3914219105369203805