mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 03:22:14 +00:00
Better
This commit is contained in:
parent
710ee96878
commit
1ecb6ad05d
@ -37,10 +37,10 @@ std::unique_ptr<ReadBuffer> ReadIndirectBufferFromWebServer::initialize()
|
||||
{
|
||||
Poco::URI uri(url);
|
||||
|
||||
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
|
||||
headers.emplace_back(std::make_pair("Range", fmt::format("bytes={}-", offset)));
|
||||
const auto & settings = context->getSettingsRef();
|
||||
read_settings.http_start_offset = offset;
|
||||
LOG_DEBUG(log, "Reading from offset: {}", offset);
|
||||
|
||||
const auto & settings = context->getSettingsRef();
|
||||
const auto & config = context->getConfigRef();
|
||||
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 20), 0};
|
||||
|
||||
@ -56,8 +56,7 @@ std::unique_ptr<ReadBuffer> ReadIndirectBufferFromWebServer::initialize()
|
||||
0,
|
||||
Poco::Net::HTTPBasicCredentials{},
|
||||
buf_size,
|
||||
read_settings,
|
||||
headers);
|
||||
read_settings);
|
||||
}
|
||||
|
||||
|
||||
|
@ -72,6 +72,7 @@ struct ReadSettings
|
||||
size_t http_max_tries = 1;
|
||||
size_t http_retry_initial_backoff_ms = 100;
|
||||
size_t http_retry_max_backoff_ms = 10000;
|
||||
size_t http_start_offset = 0;
|
||||
|
||||
ReadSettings adjustBufferSize(size_t file_size) const
|
||||
{
|
||||
|
@ -20,7 +20,6 @@
|
||||
#include <Common/RemoteHostFilter.h>
|
||||
#include <base/logger_useful.h>
|
||||
#include <Poco/URIStreamFactory.h>
|
||||
#include <charconv>
|
||||
|
||||
#if !defined(ARCADIA_BUILD)
|
||||
# include <Common/config.h>
|
||||
@ -108,15 +107,16 @@ namespace detail
|
||||
std::function<void(size_t)> next_callback;
|
||||
|
||||
size_t buffer_size;
|
||||
|
||||
size_t bytes_read = 0;
|
||||
/// Read from offset with range header if needed.
|
||||
size_t start_byte = 0;
|
||||
|
||||
bool with_partial_content = false;
|
||||
/// Non-empty if content-length header was received.
|
||||
std::optional<size_t> total_bytes_to_read;
|
||||
|
||||
/// Delayed exception in case retries with partial content are not satisfiable
|
||||
std::optional<Poco::Exception> exception;
|
||||
/// Delayed exception in case retries with partial content are not satisfiable.
|
||||
std::exception_ptr exception;
|
||||
|
||||
ReadSettings settings;
|
||||
Poco::Logger * log;
|
||||
|
||||
@ -137,7 +137,8 @@ namespace detail
|
||||
request.set(std::get<0>(http_header_entry), std::get<1>(http_header_entry));
|
||||
}
|
||||
|
||||
if (bytes_read && with_partial_content)
|
||||
bool with_partial_content = bytes_read && total_bytes_to_read;
|
||||
if (with_partial_content)
|
||||
request.set("Range", fmt::format("bytes={}-", start_byte + bytes_read));
|
||||
|
||||
if (!credentials.getUsername().empty())
|
||||
@ -159,9 +160,9 @@ namespace detail
|
||||
|
||||
if (with_partial_content && response.getStatus() != Poco::Net::HTTPResponse::HTTPStatus::HTTP_PARTIAL_CONTENT)
|
||||
{
|
||||
/// If we retries some request, throw error from that request.
|
||||
/// If we retried some request, throw error from that request.
|
||||
if (exception)
|
||||
exception->rethrow();
|
||||
std::rethrow_exception(exception);
|
||||
throw Exception(ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE, "Cannot read with range: {}", request.get("Range"));
|
||||
}
|
||||
|
||||
@ -200,36 +201,15 @@ namespace detail
|
||||
, http_header_entries {http_header_entries_}
|
||||
, remote_host_filter {remote_host_filter_}
|
||||
, buffer_size {buffer_size_}
|
||||
, start_byte {settings_.http_start_offset}
|
||||
, settings {settings_}
|
||||
, log(&Poco::Logger::get("ReadWriteBufferFromHTTP"))
|
||||
{
|
||||
/**
|
||||
* Get first byte from `bytes=offset-`, `bytes=offset-end`.
|
||||
* Now there are two places, where it can be set: 1. in DiskWeb (offset), 2. via config as part of named-collection.
|
||||
* Other cases not supported.
|
||||
*/
|
||||
auto range_header = std::find_if(http_header_entries_.begin(), http_header_entries_.end(),
|
||||
[&](const HTTPHeaderEntry & header) { return std::get<0>(header) == "Range"; });
|
||||
if (range_header != http_header_entries_.end())
|
||||
{
|
||||
if (method != Poco::Net::HTTPRequest::HTTP_GET)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Headers are allowed only with GET request");
|
||||
auto range = std::get<1>(*range_header).substr(std::strlen("bytes="));
|
||||
UInt64 start;
|
||||
auto parsed = tryParse<UInt64>(start, range);
|
||||
if (parsed)
|
||||
start_byte = start;
|
||||
else
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot extract start byte");
|
||||
}
|
||||
|
||||
|
||||
initialize();
|
||||
}
|
||||
|
||||
void initialize()
|
||||
{
|
||||
|
||||
Poco::Net::HTTPResponse response;
|
||||
istr = call(uri, response);
|
||||
|
||||
@ -244,14 +224,8 @@ namespace detail
|
||||
}
|
||||
|
||||
/// If it is the very first initialization.
|
||||
if (!bytes_read && !total_bytes_to_read)
|
||||
{
|
||||
/// If we do not know total size, disable retries in the middle of reading.
|
||||
if (response.hasContentLength())
|
||||
if (!bytes_read && !total_bytes_to_read && response.hasContentLength())
|
||||
total_bytes_to_read = response.getContentLength();
|
||||
else
|
||||
with_partial_content = false;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
@ -298,15 +272,14 @@ namespace detail
|
||||
}
|
||||
catch (const Poco::Exception & e)
|
||||
{
|
||||
if (bytes_read && !with_partial_content)
|
||||
bool can_retry_request = !bytes_read || total_bytes_to_read.has_value();
|
||||
if (!can_retry_request)
|
||||
throw;
|
||||
|
||||
LOG_ERROR(&Poco::Logger::get("ReadBufferFromHTTP"), "Error: {}, code: {}", e.what(), e.code());
|
||||
|
||||
exception = std::current_exception();
|
||||
impl.reset();
|
||||
|
||||
exception.reset();
|
||||
exception.emplace(e);
|
||||
|
||||
sleepForMilliseconds(milliseconds_to_wait);
|
||||
milliseconds_to_wait *= 2;
|
||||
}
|
||||
@ -315,7 +288,7 @@ namespace detail
|
||||
}
|
||||
|
||||
if (!successful_read && exception)
|
||||
exception->rethrow();
|
||||
std::rethrow_exception(exception);
|
||||
|
||||
if (!result)
|
||||
return false;
|
||||
|
@ -450,6 +450,8 @@ void registerStorageURL(StorageFactory & factory)
|
||||
for (const auto & [header, value] : configuration.headers)
|
||||
{
|
||||
auto value_literal = value.safeGet<String>();
|
||||
if (header == "Range")
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
|
||||
headers.emplace_back(std::make_pair(header, value_literal));
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user