This commit is contained in:
Alexander Tokmakov 2021-11-09 23:11:02 +03:00
parent 36f213e51c
commit 4f542ab019
4 changed files with 54 additions and 8 deletions

View File

@ -150,10 +150,13 @@ namespace detail
if (withPartialContent())
{
String range_header_value;
if (read_range.end)
request.set("Range", fmt::format("bytes={}-{}", read_range.begin + bytes_read, *read_range.end));
range_header_value = fmt::format("bytes={}-{}", read_range.begin + bytes_read, *read_range.end);
else
request.set("Range", fmt::format("bytes={}-", read_range.begin + bytes_read));
range_header_value = fmt::format("bytes={}-", read_range.begin + bytes_read);
LOG_TRACE(log, "Adding header: Range: {}", range_header_value);
request.set("Range", range_header_value);
}
if (!credentials.getUsername().empty())
@ -360,7 +363,7 @@ namespace detail
catch (const Poco::Exception & e)
{
/**
* Retry request unconditionally if nothing has beed read yet.
* Retry request unconditionally if nothing has been read yet.
* Otherwise if it is GET method retry with range header starting from bytes_read.
*/
bool can_retry_request = !bytes_read || method == Poco::Net::HTTPRequest::HTTP_GET;
@ -368,9 +371,11 @@ namespace detail
throw;
LOG_ERROR(log,
"HTTP request to `{}` failed at try {}/{} with bytes read: {}. "
"HTTP request to `{}` failed at try {}/{} with bytes read: {}/{}. "
"Error: {}. (Current backoff wait is {}/{} ms)",
uri.toString(), i, settings.http_max_tries, bytes_read, e.displayText(),
uri.toString(), i, settings.http_max_tries,
bytes_read, read_range.end ? toString(*read_range.end) : "unknown",
e.displayText(),
milliseconds_to_wait, settings.http_retry_max_backoff_ms);
retry_with_range_header = true;

View File

@ -151,7 +151,7 @@ class _NetworkManager:
def _iptables_cmd_suffix(
source=None, destination=None,
source_port=None, destination_port=None,
action=None, probability=None):
action=None, probability=None, custom_args=None):
ret = []
if probability is not None:
ret.extend(['-m', 'statistic', '--mode', 'random', '--probability', str(probability)])
@ -166,6 +166,8 @@ class _NetworkManager:
ret.extend(['--dport', str(destination_port)])
if action is not None:
ret.extend(['-j'] + action.split())
if custom_args is not None:
ret.extend(custom_args)
return ret
def __init__(

View File

@ -109,7 +109,8 @@ def test_url_reconnect(started_cluster):
node1.query(
"insert into table function hdfs('hdfs://hdfs1:9000/storage_big', 'TSV', 'id Int32') select number from numbers(500000)")
pm._add_rule({'destination': node1.ip_address, 'source_port': 50075, 'action': 'REJECT'})
pm_rule = {'destination': node1.ip_address, 'source_port': 50075, 'action': 'REJECT'}
pm._add_rule(pm_rule)
def select():
global result
@ -121,7 +122,7 @@ def test_url_reconnect(started_cluster):
thread.start()
time.sleep(4)
pm._delete_rule({'destination': node1.ip_address, 'source_port': 50075, 'action': 'REJECT'})
pm._delete_rule(pm_rule)
thread.join()

View File

@ -10,6 +10,7 @@ import time
import helpers.client
import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance, get_instances_dir
from helpers.network import PartitionManager
MINIO_INTERNAL_PORT = 9001
@ -757,3 +758,40 @@ def test_predefined_connection_configuration(started_cluster):
result = instance.query("SELECT * FROM s3(s3_conf1, format='CSV', structure='id UInt32')")
assert result == instance.query("SELECT number FROM numbers(10)")
result = ""
def test_url_reconnect_in_the_middle(started_cluster):
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["dummy"]
table_format = "id String, data String"
filename = "test_url_reconnect_{}.tsv".format(random.randint(0, 1000))
instance.query(f"""insert into table function
s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{filename}', 'TSV', '{table_format}')
select number, randomPrintableASCII(number % 1000) from numbers(1000000)""")
with PartitionManager() as pm:
pm_rule_reject = {'probability': 0.02, 'destination': instance.ip_address, 'source_port': started_cluster.minio_port, 'action': 'REJECT --reject-with tcp-reset'}
pm_rule_drop_all = {'destination': instance.ip_address, 'source_port': started_cluster.minio_port, 'action': 'DROP'}
pm._add_rule(pm_rule_reject)
def select():
global result
result = instance.query(
f"""select sum(cityHash64(x)) from (select toUInt64(id) + sleep(0.1) as x from
url('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{filename}', 'TSV', '{table_format}')
settings http_max_tries = 10, http_retry_max_backoff_ms=2000, http_send_timeout=1, http_receive_timeout=1)""")
assert(int(result), 3914219105369203805)
thread = threading.Thread(target=select)
thread.start()
time.sleep(4)
pm._add_rule(pm_rule_drop_all)
time.sleep(2)
pm._delete_rule(pm_rule_drop_all)
pm._delete_rule(pm_rule_reject)
thread.join()
assert(int(result), 3914219105369203805)