From 7024f51b55d39aad7d6ef48ebc32ed1872278743 Mon Sep 17 00:00:00 2001 From: kssenii Date: Wed, 27 Oct 2021 18:30:25 +0000 Subject: [PATCH] Review fixes and add test --- src/Bridge/LibraryBridgeHelper.cpp | 2 +- src/Bridge/LibraryBridgeHelper.h | 1 + src/Disks/ReadIndirectBufferFromWebServer.cpp | 2 +- src/Disks/ReadIndirectBufferFromWebServer.h | 3 + src/IO/ReadWriteBufferFromHTTP.h | 43 ++++++++++--- src/Storages/StorageURL.cpp | 4 +- .../test_redirect_url_storage/test.py | 62 +++++++++++++++++++ 7 files changed, 106 insertions(+), 11 deletions(-) diff --git a/src/Bridge/LibraryBridgeHelper.cpp b/src/Bridge/LibraryBridgeHelper.cpp index bd0604ec8e0..932411b1e1c 100644 --- a/src/Bridge/LibraryBridgeHelper.cpp +++ b/src/Bridge/LibraryBridgeHelper.cpp @@ -256,7 +256,7 @@ Pipe LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWriteBufferFromHTT std::move(out_stream_callback), http_timeouts, 0, - Poco::Net::HTTPBasicCredentials{}, + credentials, DBMS_DEFAULT_BUFFER_SIZE, getContext()->getReadSettings(), ReadWriteBufferFromHTTP::HTTPHeaderEntries{}); diff --git a/src/Bridge/LibraryBridgeHelper.h b/src/Bridge/LibraryBridgeHelper.h index a209fff6ca0..393cadebfb5 100644 --- a/src/Bridge/LibraryBridgeHelper.h +++ b/src/Bridge/LibraryBridgeHelper.h @@ -101,6 +101,7 @@ private: size_t bridge_port; bool library_initialized = false; ConnectionTimeouts http_timeouts; + Poco::Net::HTTPBasicCredentials credentials{}; }; } diff --git a/src/Disks/ReadIndirectBufferFromWebServer.cpp b/src/Disks/ReadIndirectBufferFromWebServer.cpp index 4ed93438693..2c51e1ff5c7 100644 --- a/src/Disks/ReadIndirectBufferFromWebServer.cpp +++ b/src/Disks/ReadIndirectBufferFromWebServer.cpp @@ -54,7 +54,7 @@ std::unique_ptr ReadIndirectBufferFromWebServer::initialize() settings.tcp_keep_alive_timeout, http_keep_alive_timeout), 0, - Poco::Net::HTTPBasicCredentials{}, + credentials, buf_size, read_settings); } diff --git a/src/Disks/ReadIndirectBufferFromWebServer.h b/src/Disks/ReadIndirectBufferFromWebServer.h index 0a4fc95eb19..3c35e15d4ef 100644 --- a/src/Disks/ReadIndirectBufferFromWebServer.h +++ b/src/Disks/ReadIndirectBufferFromWebServer.h @@ -4,6 +4,7 @@ #include #include #include +#include namespace DB @@ -43,6 +44,8 @@ private: off_t offset = 0; ReadSettings read_settings; + + Poco::Net::HTTPBasicCredentials credentials{}; }; } diff --git a/src/IO/ReadWriteBufferFromHTTP.h b/src/IO/ReadWriteBufferFromHTTP.h index 3dd3edd251c..37946f7cfc4 100644 --- a/src/IO/ReadWriteBufferFromHTTP.h +++ b/src/IO/ReadWriteBufferFromHTTP.h @@ -107,15 +107,16 @@ namespace detail std::function next_callback; size_t buffer_size; - size_t bytes_read = 0; - /// Read from offset with range header if needed. - size_t start_byte = 0; + size_t bytes_read = 0; + /// Read from offset with range header if needed (for disk web). + size_t start_byte = 0; /// Non-empty if content-length header was received. std::optional total_bytes_to_read; /// Delayed exception in case retries with partial content are not satisfiable. std::exception_ptr exception; + bool retry_with_range_header = false; ReadSettings settings; Poco::Logger * log; @@ -137,14 +138,18 @@ namespace detail request.set(std::get<0>(http_header_entry), std::get<1>(http_header_entry)); } - bool with_partial_content = bytes_read && total_bytes_to_read; + /** + * Add range header if we have start offset (for disk web) + * or if we want to retry GET request on purpose. + */ + bool with_partial_content = start_byte || retry_with_range_header; if (with_partial_content) request.set("Range", fmt::format("bytes={}-", start_byte + bytes_read)); if (!credentials.getUsername().empty()) credentials.authenticate(request); - LOG_TRACE((&Poco::Logger::get("ReadWriteBufferFromHTTP")), "Sending request to {}", uri_.toString()); + LOG_TRACE(log, "Sending request to {}", uri_.toString()); auto sess = session->getSession(); @@ -205,7 +210,13 @@ namespace detail , settings {settings_} , log(&Poco::Logger::get("ReadWriteBufferFromHTTP")) { - initialize(); + if (settings.http_max_tries <= 0 || settings.http_retry_initial_backoff_ms <= 0 + || settings.http_retry_initial_backoff_ms >= settings.http_retry_max_backoff_ms) + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Invalid setting for http backoff, " + "must be http_max_tries >= 1 (current is {}) and " + "0 < http_retry_initial_backoff_ms < settings.http_retry_max_backoff_ms (but now {} > {})", + settings.http_max_tries, settings.http_retry_initial_backoff_ms, settings.http_retry_max_backoff_ms) } void initialize() @@ -272,12 +283,28 @@ namespace detail } catch (const Poco::Exception & e) { - bool can_retry_request = !bytes_read || total_bytes_to_read.has_value(); + /** + * Retry request unconditionally if nothing has beed read yet. + * Otherwise if it is GET method retry with range header starting from bytes_read. + */ + bool can_retry_request = !bytes_read || method == Poco::Net::HTTPRequest::HTTP_GET; if (!can_retry_request) throw; - LOG_ERROR(&Poco::Logger::get("ReadBufferFromHTTP"), "Error: {}, code: {}", e.what(), e.code()); + /** + * if total_size is not known, last write can fail if we retry with + * bytes_read == total_size and header `bytes=bytes_read-` + * (we will get an error code 416 - range not satisfiable). + * In this case rethrow previous exception. + */ + if (exception && !total_bytes_to_read.has_value() && e.code() == 416) + std::rethrow_exception(exception); + LOG_ERROR(log, "HTTP request to `{}` failed at try {}/{}. Error: {}, code: {}. (Current backoff wait is {}/{} ms)", + uri.toString(), i, settings.http_max_tries, e.what(), e.code(), + milliseconds_to_wait, settings.http_retry_max_backoff_ms); + + retry_with_range_header = true; exception = std::current_exception(); impl.reset(); sleepForMilliseconds(milliseconds_to_wait); diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index f1531279c2b..f1b737ec443 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -122,7 +122,7 @@ namespace callback, timeouts, context->getSettingsRef().max_http_get_redirects, - Poco::Net::HTTPBasicCredentials{}, + credentials, DBMS_DEFAULT_BUFFER_SIZE, context->getReadSettings(), headers, @@ -189,6 +189,8 @@ namespace std::unique_ptr read_buf; std::unique_ptr pipeline; std::unique_ptr reader; + + Poco::Net::HTTPBasicCredentials credentials{}; }; } diff --git a/tests/integration/test_redirect_url_storage/test.py b/tests/integration/test_redirect_url_storage/test.py index d3808cd890d..7fe1d9627e5 100644 --- a/tests/integration/test_redirect_url_storage/test.py +++ b/tests/integration/test_redirect_url_storage/test.py @@ -1,6 +1,10 @@ import pytest from helpers.cluster import ClickHouseCluster +from helpers.network import PartitionManager +import threading +import time + cluster = ClickHouseCluster(__file__) node1 = cluster.add_instance('node1', main_configs=['configs/named_collections.xml'], with_zookeeper=False, with_hdfs=True) @@ -95,3 +99,61 @@ def test_predefined_connection_configuration(started_cluster): result = node1.query("SET max_http_get_redirects=1; select * from url(url1, url='http://hdfs1:50070/webhdfs/v1/simple_storage?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', format='TSV', structure='id UInt32, name String, weight Float64')") assert(result == "1\tMark\t72.53\n") node1.query("drop table WebHDFSStorageWithRedirect") + + +result = '' +def test_url_reconnect_at_start(started_cluster): + hdfs_api = started_cluster.hdfs_api + + with PartitionManager() as pm: + node1.query( + "insert into table function hdfs('hdfs://hdfs1:9000/storage_big', 'TSV', 'id Int32') select number from numbers(500000)") + + pm._add_rule({'probability': 1, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'}) + + def select(): + global result + print("reading") + result = node1.query( + "select sum(cityHash64(id)) from url('http://hdfs1:50075/webhdfs/v1/storage_big?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', 'TSV', 'id Int32')") + print(result) + + thread = threading.Thread(target=select) + thread.start() + time.sleep(3) + print("delete rule") + pm._delete_rule({'probability': 1, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'}) + + thread.join() + assert node1.contains_in_log("Error: Timeout, code:") + print(result) + +result = '' +def test_url_reconnect_in_the_middle(started_cluster): + hdfs_api = started_cluster.hdfs_api + + with PartitionManager() as pm: + node1.query( + "insert into table function hdfs('hdfs://hdfs1:9000/storage_big2', 'TSV', 'id Int32') select number from numbers(10000000)") + + def select(): + global result + print("reading") + result = node1.query( + "select sum(cityHash64(id)) from url('http://hdfs1:50075/webhdfs/v1/storage_big2?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', 'TSV', 'id Int32')") + print(result) + + thread = threading.Thread(target=select) + print("add rule") + pm._add_rule({'probability': 0.3, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'}) + thread.start() + time.sleep(0.5) + pm._add_rule({'probability': 1, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'}) + time.sleep(3) + print("delete rule") + pm._delete_rule({'probability': 0.3, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'}) + pm._delete_rule({'probability': 1, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'}) + + thread.join() + assert node1.contains_in_log("Error: Timeout, code:") + print(result)