Review fixes and add test

This commit is contained in:
kssenii 2021-10-27 18:30:25 +00:00
parent 7de5fca075
commit 7024f51b55
7 changed files with 106 additions and 11 deletions

View File

@ -256,7 +256,7 @@ Pipe LibraryBridgeHelper::loadBase(const Poco::URI & uri, ReadWriteBufferFromHTT
std::move(out_stream_callback),
http_timeouts,
0,
Poco::Net::HTTPBasicCredentials{},
credentials,
DBMS_DEFAULT_BUFFER_SIZE,
getContext()->getReadSettings(),
ReadWriteBufferFromHTTP::HTTPHeaderEntries{});

View File

@ -101,6 +101,7 @@ private:
size_t bridge_port;
bool library_initialized = false;
ConnectionTimeouts http_timeouts;
Poco::Net::HTTPBasicCredentials credentials{};
};
}

View File

@ -54,7 +54,7 @@ std::unique_ptr<ReadBuffer> ReadIndirectBufferFromWebServer::initialize()
settings.tcp_keep_alive_timeout,
http_keep_alive_timeout),
0,
Poco::Net::HTTPBasicCredentials{},
credentials,
buf_size,
read_settings);
}

View File

@ -4,6 +4,7 @@
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadSettings.h>
#include <Interpreters/Context.h>
#include <Poco/Net/HTTPBasicCredentials.h>
namespace DB
@ -43,6 +44,8 @@ private:
off_t offset = 0;
ReadSettings read_settings;
Poco::Net::HTTPBasicCredentials credentials{};
};
}

View File

@ -107,15 +107,16 @@ namespace detail
std::function<void(size_t)> next_callback;
size_t buffer_size;
size_t bytes_read = 0;
/// Read from offset with range header if needed.
size_t start_byte = 0;
size_t bytes_read = 0;
/// Read from offset with range header if needed (for disk web).
size_t start_byte = 0;
/// Non-empty if content-length header was received.
std::optional<size_t> total_bytes_to_read;
/// Delayed exception in case retries with partial content are not satisfiable.
std::exception_ptr exception;
bool retry_with_range_header = false;
ReadSettings settings;
Poco::Logger * log;
@ -137,14 +138,18 @@ namespace detail
request.set(std::get<0>(http_header_entry), std::get<1>(http_header_entry));
}
bool with_partial_content = bytes_read && total_bytes_to_read;
/**
* Add range header if we have start offset (for disk web)
* or if we want to retry GET request on purpose.
*/
bool with_partial_content = start_byte || retry_with_range_header;
if (with_partial_content)
request.set("Range", fmt::format("bytes={}-", start_byte + bytes_read));
if (!credentials.getUsername().empty())
credentials.authenticate(request);
LOG_TRACE((&Poco::Logger::get("ReadWriteBufferFromHTTP")), "Sending request to {}", uri_.toString());
LOG_TRACE(log, "Sending request to {}", uri_.toString());
auto sess = session->getSession();
@ -205,7 +210,13 @@ namespace detail
, settings {settings_}
, log(&Poco::Logger::get("ReadWriteBufferFromHTTP"))
{
initialize();
if (settings.http_max_tries <= 0 || settings.http_retry_initial_backoff_ms <= 0
|| settings.http_retry_initial_backoff_ms >= settings.http_retry_max_backoff_ms)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Invalid setting for http backoff, "
"must be http_max_tries >= 1 (current is {}) and "
"0 < http_retry_initial_backoff_ms < settings.http_retry_max_backoff_ms (but now {} > {})",
settings.http_max_tries, settings.http_retry_initial_backoff_ms, settings.http_retry_max_backoff_ms)
}
void initialize()
@ -272,12 +283,28 @@ namespace detail
}
catch (const Poco::Exception & e)
{
bool can_retry_request = !bytes_read || total_bytes_to_read.has_value();
/**
* Retry request unconditionally if nothing has been read yet.
* Otherwise if it is GET method retry with range header starting from bytes_read.
*/
bool can_retry_request = !bytes_read || method == Poco::Net::HTTPRequest::HTTP_GET;
if (!can_retry_request)
throw;
LOG_ERROR(&Poco::Logger::get("ReadBufferFromHTTP"), "Error: {}, code: {}", e.what(), e.code());
/**
* if total_size is not known, last write can fail if we retry with
* bytes_read == total_size and header `bytes=bytes_read-`
* (we will get an error code 416 - range not satisfiable).
* In this case rethrow previous exception.
*/
if (exception && !total_bytes_to_read.has_value() && e.code() == 416)
std::rethrow_exception(exception);
LOG_ERROR(log, "HTTP request to `{}` failed at try {}/{}. Error: {}, code: {}. (Current backoff wait is {}/{} ms)",
uri.toString(), i, settings.http_max_tries, e.what(), e.code(),
milliseconds_to_wait, settings.http_retry_max_backoff_ms);
retry_with_range_header = true;
exception = std::current_exception();
impl.reset();
sleepForMilliseconds(milliseconds_to_wait);

View File

@ -122,7 +122,7 @@ namespace
callback,
timeouts,
context->getSettingsRef().max_http_get_redirects,
Poco::Net::HTTPBasicCredentials{},
credentials,
DBMS_DEFAULT_BUFFER_SIZE,
context->getReadSettings(),
headers,
@ -189,6 +189,8 @@ namespace
std::unique_ptr<ReadBuffer> read_buf;
std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
Poco::Net::HTTPBasicCredentials credentials{};
};
}

View File

@ -1,6 +1,10 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.network import PartitionManager
import threading
import time
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/named_collections.xml'], with_zookeeper=False, with_hdfs=True)
@ -95,3 +99,61 @@ def test_predefined_connection_configuration(started_cluster):
result = node1.query("SET max_http_get_redirects=1; select * from url(url1, url='http://hdfs1:50070/webhdfs/v1/simple_storage?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', format='TSV', structure='id UInt32, name String, weight Float64')")
assert(result == "1\tMark\t72.53\n")
node1.query("drop table WebHDFSStorageWithRedirect")
result = ''
def test_url_reconnect_at_start(started_cluster):
    """Check that a `url()` read recovers when the server is unreachable at the start.

    A file is written through the `hdfs` table function, then every packet
    coming from port 50075 (the port the `url()` query below reads from) to
    node1 is dropped. The query is started in a background thread while the
    rule is active; after 3 seconds the rule is removed and the query is
    expected to finish, leaving a timeout error in the server log.
    """
    # Rule passed to PartitionManager; presumably translated into an iptables
    # DROP rule by the helper -- confirm against helpers/network.py.
    drop_all = {'probability': 1, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'}

    # Filled in by the reader thread. A local container is used instead of a
    # module-level global so that a value left over from another test can never
    # be mistaken for this test's result.
    outcome = {}

    with PartitionManager() as pm:
        node1.query(
            "insert into table function hdfs('hdfs://hdfs1:9000/storage_big', 'TSV', 'id Int32') select number from numbers(500000)")

        pm._add_rule(drop_all)

        def select():
            print("reading")
            outcome['result'] = node1.query(
                "select sum(cityHash64(id)) from url('http://hdfs1:50075/webhdfs/v1/storage_big?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', 'TSV', 'id Int32')")
            print(outcome['result'])

        thread = threading.Thread(target=select)
        thread.start()

        time.sleep(3)
        print("delete rule")
        pm._delete_rule(drop_all)

        thread.join()

        # An exception raised inside the thread does not propagate through
        # join(); a missing result is the only sign that the query failed.
        assert 'result' in outcome, "url() query did not complete after the network rule was removed"
        assert node1.contains_in_log("Error: Timeout, code:")
        print(outcome['result'])
result = ''
def test_url_reconnect_in_the_middle(started_cluster):
    """Check that a `url()` read recovers when the connection breaks mid-transfer.

    A larger file is written through the `hdfs` table function. The reader
    thread is started while 30% of the packets coming from port 50075 to node1
    are dropped; half a second later all of them are dropped for 3 seconds.
    Both rules are then removed and the query is expected to finish, leaving a
    timeout error in the server log.
    """
    # Rules passed to PartitionManager; presumably translated into iptables
    # DROP rules by the helper -- confirm against helpers/network.py.
    drop_some = {'probability': 0.3, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'}
    drop_all = {'probability': 1, 'destination': node1.ip_address, 'source_port': 50075, 'action': 'DROP'}

    # Filled in by the reader thread. A local container is used instead of a
    # module-level global so that a value left over from another test can never
    # be mistaken for this test's result.
    outcome = {}

    with PartitionManager() as pm:
        node1.query(
            "insert into table function hdfs('hdfs://hdfs1:9000/storage_big2', 'TSV', 'id Int32') select number from numbers(10000000)")

        def select():
            print("reading")
            outcome['result'] = node1.query(
                "select sum(cityHash64(id)) from url('http://hdfs1:50075/webhdfs/v1/storage_big2?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', 'TSV', 'id Int32')")
            print(outcome['result'])

        thread = threading.Thread(target=select)
        print("add rule")
        pm._add_rule(drop_some)
        thread.start()

        # Let the read make some progress before cutting the connection completely.
        time.sleep(0.5)
        pm._add_rule(drop_all)

        time.sleep(3)
        print("delete rule")
        pm._delete_rule(drop_some)
        pm._delete_rule(drop_all)

        thread.join()

        # An exception raised inside the thread does not propagate through
        # join(); a missing result is the only sign that the query failed.
        assert 'result' in outcome, "url() query did not complete after the network rules were removed"
        assert node1.contains_in_log("Error: Timeout, code:")
        print(outcome['result'])