This commit is contained in:
kssenii 2021-10-23 18:53:59 +00:00
parent 710ee96878
commit 1ecb6ad05d
4 changed files with 24 additions and 49 deletions

View File

@ -37,10 +37,10 @@ std::unique_ptr<ReadBuffer> ReadIndirectBufferFromWebServer::initialize()
{
Poco::URI uri(url);
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
headers.emplace_back(std::make_pair("Range", fmt::format("bytes={}-", offset)));
const auto & settings = context->getSettingsRef();
read_settings.http_start_offset = offset;
LOG_DEBUG(log, "Reading from offset: {}", offset);
const auto & settings = context->getSettingsRef();
const auto & config = context->getConfigRef();
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 20), 0};
@ -56,8 +56,7 @@ std::unique_ptr<ReadBuffer> ReadIndirectBufferFromWebServer::initialize()
0,
Poco::Net::HTTPBasicCredentials{},
buf_size,
read_settings,
headers);
read_settings);
}

View File

@ -72,6 +72,7 @@ struct ReadSettings
size_t http_max_tries = 1;
size_t http_retry_initial_backoff_ms = 100;
size_t http_retry_max_backoff_ms = 10000;
size_t http_start_offset = 0;
ReadSettings adjustBufferSize(size_t file_size) const
{

View File

@ -20,7 +20,6 @@
#include <Common/RemoteHostFilter.h>
#include <base/logger_useful.h>
#include <Poco/URIStreamFactory.h>
#include <charconv>
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
@ -108,15 +107,16 @@ namespace detail
std::function<void(size_t)> next_callback;
size_t buffer_size;
size_t bytes_read = 0;
/// Read from offset with range header if needed.
size_t start_byte = 0;
bool with_partial_content = false;
/// Non-empty if content-length header was received.
std::optional<size_t> total_bytes_to_read;
/// Delayed exception in case retries with partial content are not satisfiable
std::optional<Poco::Exception> exception;
/// Delayed exception in case retries with partial content are not satisfiable.
std::exception_ptr exception;
ReadSettings settings;
Poco::Logger * log;
@ -137,7 +137,8 @@ namespace detail
request.set(std::get<0>(http_header_entry), std::get<1>(http_header_entry));
}
if (bytes_read && with_partial_content)
bool with_partial_content = bytes_read && total_bytes_to_read;
if (with_partial_content)
request.set("Range", fmt::format("bytes={}-", start_byte + bytes_read));
if (!credentials.getUsername().empty())
@ -159,9 +160,9 @@ namespace detail
if (with_partial_content && response.getStatus() != Poco::Net::HTTPResponse::HTTPStatus::HTTP_PARTIAL_CONTENT)
{
/// If we retries some request, throw error from that request.
/// If we retried some request, throw error from that request.
if (exception)
exception->rethrow();
std::rethrow_exception(exception);
throw Exception(ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE, "Cannot read with range: {}", request.get("Range"));
}
@ -200,36 +201,15 @@ namespace detail
, http_header_entries {http_header_entries_}
, remote_host_filter {remote_host_filter_}
, buffer_size {buffer_size_}
, start_byte {settings_.http_start_offset}
, settings {settings_}
, log(&Poco::Logger::get("ReadWriteBufferFromHTTP"))
{
/**
* Get first byte from `bytes=offset-`, `bytes=offset-end`.
* Now there are two places, where it can be set: 1. in DiskWeb (offset), 2. via config as part of named-collection.
* Other cases not supported.
*/
auto range_header = std::find_if(http_header_entries_.begin(), http_header_entries_.end(),
[&](const HTTPHeaderEntry & header) { return std::get<0>(header) == "Range"; });
if (range_header != http_header_entries_.end())
{
if (method != Poco::Net::HTTPRequest::HTTP_GET)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Headers are allowed only with GET request");
auto range = std::get<1>(*range_header).substr(std::strlen("bytes="));
UInt64 start;
auto parsed = tryParse<UInt64>(start, range);
if (parsed)
start_byte = start;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot extract start byte");
}
initialize();
}
void initialize()
{
Poco::Net::HTTPResponse response;
istr = call(uri, response);
@ -244,14 +224,8 @@ namespace detail
}
/// If it is the very first initialization.
if (!bytes_read && !total_bytes_to_read)
{
/// If we do not know total size, disable retries in the middle of reading.
if (response.hasContentLength())
if (!bytes_read && !total_bytes_to_read && response.hasContentLength())
total_bytes_to_read = response.getContentLength();
else
with_partial_content = false;
}
try
{
@ -298,15 +272,14 @@ namespace detail
}
catch (const Poco::Exception & e)
{
if (bytes_read && !with_partial_content)
bool can_retry_request = !bytes_read || total_bytes_to_read.has_value();
if (!can_retry_request)
throw;
LOG_ERROR(&Poco::Logger::get("ReadBufferFromHTTP"), "Error: {}, code: {}", e.what(), e.code());
exception = std::current_exception();
impl.reset();
exception.reset();
exception.emplace(e);
sleepForMilliseconds(milliseconds_to_wait);
milliseconds_to_wait *= 2;
}
@ -315,7 +288,7 @@ namespace detail
}
if (!successful_read && exception)
exception->rethrow();
std::rethrow_exception(exception);
if (!result)
return false;

View File

@ -450,6 +450,8 @@ void registerStorageURL(StorageFactory & factory)
for (const auto & [header, value] : configuration.headers)
{
auto value_literal = value.safeGet<String>();
if (header == "Range")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
headers.emplace_back(std::make_pair(header, value_literal));
}