This commit is contained in:
kssenii 2021-10-23 18:53:59 +00:00
parent 710ee96878
commit 1ecb6ad05d
4 changed files with 24 additions and 49 deletions

View File

@ -37,10 +37,10 @@ std::unique_ptr<ReadBuffer> ReadIndirectBufferFromWebServer::initialize()
{ {
Poco::URI uri(url); Poco::URI uri(url);
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers; read_settings.http_start_offset = offset;
headers.emplace_back(std::make_pair("Range", fmt::format("bytes={}-", offset)));
const auto & settings = context->getSettingsRef();
LOG_DEBUG(log, "Reading from offset: {}", offset); LOG_DEBUG(log, "Reading from offset: {}", offset);
const auto & settings = context->getSettingsRef();
const auto & config = context->getConfigRef(); const auto & config = context->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 20), 0}; Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 20), 0};
@ -56,8 +56,7 @@ std::unique_ptr<ReadBuffer> ReadIndirectBufferFromWebServer::initialize()
0, 0,
Poco::Net::HTTPBasicCredentials{}, Poco::Net::HTTPBasicCredentials{},
buf_size, buf_size,
read_settings, read_settings);
headers);
} }

View File

@ -72,6 +72,7 @@ struct ReadSettings
size_t http_max_tries = 1; size_t http_max_tries = 1;
size_t http_retry_initial_backoff_ms = 100; size_t http_retry_initial_backoff_ms = 100;
size_t http_retry_max_backoff_ms = 10000; size_t http_retry_max_backoff_ms = 10000;
size_t http_start_offset = 0;
ReadSettings adjustBufferSize(size_t file_size) const ReadSettings adjustBufferSize(size_t file_size) const
{ {

View File

@ -20,7 +20,6 @@
#include <Common/RemoteHostFilter.h> #include <Common/RemoteHostFilter.h>
#include <base/logger_useful.h> #include <base/logger_useful.h>
#include <Poco/URIStreamFactory.h> #include <Poco/URIStreamFactory.h>
#include <charconv>
#if !defined(ARCADIA_BUILD) #if !defined(ARCADIA_BUILD)
# include <Common/config.h> # include <Common/config.h>
@ -108,15 +107,16 @@ namespace detail
std::function<void(size_t)> next_callback; std::function<void(size_t)> next_callback;
size_t buffer_size; size_t buffer_size;
size_t bytes_read = 0; size_t bytes_read = 0;
/// Read from offset with range header if needed.
size_t start_byte = 0; size_t start_byte = 0;
bool with_partial_content = false; /// Non-empty if content-length header was received.
std::optional<size_t> total_bytes_to_read; std::optional<size_t> total_bytes_to_read;
/// Delayed exception in case retries with partial content are not satisfiable /// Delayed exception in case retries with partial content are not satisfiable.
std::optional<Poco::Exception> exception; std::exception_ptr exception;
ReadSettings settings; ReadSettings settings;
Poco::Logger * log; Poco::Logger * log;
@ -137,7 +137,8 @@ namespace detail
request.set(std::get<0>(http_header_entry), std::get<1>(http_header_entry)); request.set(std::get<0>(http_header_entry), std::get<1>(http_header_entry));
} }
if (bytes_read && with_partial_content) bool with_partial_content = bytes_read && total_bytes_to_read;
if (with_partial_content)
request.set("Range", fmt::format("bytes={}-", start_byte + bytes_read)); request.set("Range", fmt::format("bytes={}-", start_byte + bytes_read));
if (!credentials.getUsername().empty()) if (!credentials.getUsername().empty())
@ -159,9 +160,9 @@ namespace detail
if (with_partial_content && response.getStatus() != Poco::Net::HTTPResponse::HTTPStatus::HTTP_PARTIAL_CONTENT) if (with_partial_content && response.getStatus() != Poco::Net::HTTPResponse::HTTPStatus::HTTP_PARTIAL_CONTENT)
{ {
/// If we retries some request, throw error from that request. /// If we retried some request, throw error from that request.
if (exception) if (exception)
exception->rethrow(); std::rethrow_exception(exception);
throw Exception(ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE, "Cannot read with range: {}", request.get("Range")); throw Exception(ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE, "Cannot read with range: {}", request.get("Range"));
} }
@ -200,36 +201,15 @@ namespace detail
, http_header_entries {http_header_entries_} , http_header_entries {http_header_entries_}
, remote_host_filter {remote_host_filter_} , remote_host_filter {remote_host_filter_}
, buffer_size {buffer_size_} , buffer_size {buffer_size_}
, start_byte {settings_.http_start_offset}
, settings {settings_} , settings {settings_}
, log(&Poco::Logger::get("ReadWriteBufferFromHTTP")) , log(&Poco::Logger::get("ReadWriteBufferFromHTTP"))
{ {
/**
* Get first byte from `bytes=offset-`, `bytes=offset-end`.
* Now there are two places, where it can be set: 1. in DiskWeb (offset), 2. via config as part of named-collection.
* Other cases not supported.
*/
auto range_header = std::find_if(http_header_entries_.begin(), http_header_entries_.end(),
[&](const HTTPHeaderEntry & header) { return std::get<0>(header) == "Range"; });
if (range_header != http_header_entries_.end())
{
if (method != Poco::Net::HTTPRequest::HTTP_GET)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Headers are allowed only with GET request");
auto range = std::get<1>(*range_header).substr(std::strlen("bytes="));
UInt64 start;
auto parsed = tryParse<UInt64>(start, range);
if (parsed)
start_byte = start;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot extract start byte");
}
initialize(); initialize();
} }
void initialize() void initialize()
{ {
Poco::Net::HTTPResponse response; Poco::Net::HTTPResponse response;
istr = call(uri, response); istr = call(uri, response);
@ -244,14 +224,8 @@ namespace detail
} }
/// If it is the very first initialization. /// If it is the very first initialization.
if (!bytes_read && !total_bytes_to_read) if (!bytes_read && !total_bytes_to_read && response.hasContentLength())
{ total_bytes_to_read = response.getContentLength();
/// If we do not know total size, disable retries in the middle of reading.
if (response.hasContentLength())
total_bytes_to_read = response.getContentLength();
else
with_partial_content = false;
}
try try
{ {
@ -298,15 +272,14 @@ namespace detail
} }
catch (const Poco::Exception & e) catch (const Poco::Exception & e)
{ {
if (bytes_read && !with_partial_content) bool can_retry_request = !bytes_read || total_bytes_to_read.has_value();
if (!can_retry_request)
throw; throw;
LOG_ERROR(&Poco::Logger::get("ReadBufferFromHTTP"), "Error: {}, code: {}", e.what(), e.code()); LOG_ERROR(&Poco::Logger::get("ReadBufferFromHTTP"), "Error: {}, code: {}", e.what(), e.code());
exception = std::current_exception();
impl.reset(); impl.reset();
exception.reset();
exception.emplace(e);
sleepForMilliseconds(milliseconds_to_wait); sleepForMilliseconds(milliseconds_to_wait);
milliseconds_to_wait *= 2; milliseconds_to_wait *= 2;
} }
@ -315,7 +288,7 @@ namespace detail
} }
if (!successful_read && exception) if (!successful_read && exception)
exception->rethrow(); std::rethrow_exception(exception);
if (!result) if (!result)
return false; return false;

View File

@ -450,6 +450,8 @@ void registerStorageURL(StorageFactory & factory)
for (const auto & [header, value] : configuration.headers) for (const auto & [header, value] : configuration.headers)
{ {
auto value_literal = value.safeGet<String>(); auto value_literal = value.safeGet<String>();
if (header == "Range")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
headers.emplace_back(std::make_pair(header, value_literal)); headers.emplace_back(std::make_pair(header, value_literal));
} }