mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-26 17:41:59 +00:00
Better
This commit is contained in:
parent
710ee96878
commit
1ecb6ad05d
@ -37,10 +37,10 @@ std::unique_ptr<ReadBuffer> ReadIndirectBufferFromWebServer::initialize()
|
|||||||
{
|
{
|
||||||
Poco::URI uri(url);
|
Poco::URI uri(url);
|
||||||
|
|
||||||
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
|
read_settings.http_start_offset = offset;
|
||||||
headers.emplace_back(std::make_pair("Range", fmt::format("bytes={}-", offset)));
|
|
||||||
const auto & settings = context->getSettingsRef();
|
|
||||||
LOG_DEBUG(log, "Reading from offset: {}", offset);
|
LOG_DEBUG(log, "Reading from offset: {}", offset);
|
||||||
|
|
||||||
|
const auto & settings = context->getSettingsRef();
|
||||||
const auto & config = context->getConfigRef();
|
const auto & config = context->getConfigRef();
|
||||||
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 20), 0};
|
Poco::Timespan http_keep_alive_timeout{config.getUInt("keep_alive_timeout", 20), 0};
|
||||||
|
|
||||||
@ -56,8 +56,7 @@ std::unique_ptr<ReadBuffer> ReadIndirectBufferFromWebServer::initialize()
|
|||||||
0,
|
0,
|
||||||
Poco::Net::HTTPBasicCredentials{},
|
Poco::Net::HTTPBasicCredentials{},
|
||||||
buf_size,
|
buf_size,
|
||||||
read_settings,
|
read_settings);
|
||||||
headers);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -72,6 +72,7 @@ struct ReadSettings
|
|||||||
size_t http_max_tries = 1;
|
size_t http_max_tries = 1;
|
||||||
size_t http_retry_initial_backoff_ms = 100;
|
size_t http_retry_initial_backoff_ms = 100;
|
||||||
size_t http_retry_max_backoff_ms = 10000;
|
size_t http_retry_max_backoff_ms = 10000;
|
||||||
|
size_t http_start_offset = 0;
|
||||||
|
|
||||||
ReadSettings adjustBufferSize(size_t file_size) const
|
ReadSettings adjustBufferSize(size_t file_size) const
|
||||||
{
|
{
|
||||||
|
@ -20,7 +20,6 @@
|
|||||||
#include <Common/RemoteHostFilter.h>
|
#include <Common/RemoteHostFilter.h>
|
||||||
#include <base/logger_useful.h>
|
#include <base/logger_useful.h>
|
||||||
#include <Poco/URIStreamFactory.h>
|
#include <Poco/URIStreamFactory.h>
|
||||||
#include <charconv>
|
|
||||||
|
|
||||||
#if !defined(ARCADIA_BUILD)
|
#if !defined(ARCADIA_BUILD)
|
||||||
# include <Common/config.h>
|
# include <Common/config.h>
|
||||||
@ -108,15 +107,16 @@ namespace detail
|
|||||||
std::function<void(size_t)> next_callback;
|
std::function<void(size_t)> next_callback;
|
||||||
|
|
||||||
size_t buffer_size;
|
size_t buffer_size;
|
||||||
|
|
||||||
size_t bytes_read = 0;
|
size_t bytes_read = 0;
|
||||||
|
/// Read from offset with range header if needed.
|
||||||
size_t start_byte = 0;
|
size_t start_byte = 0;
|
||||||
|
|
||||||
bool with_partial_content = false;
|
/// Non-empty if content-length header was received.
|
||||||
std::optional<size_t> total_bytes_to_read;
|
std::optional<size_t> total_bytes_to_read;
|
||||||
|
|
||||||
/// Delayed exception in case retries with partial content are not satisfiable
|
/// Delayed exception in case retries with partial content are not satisfiable.
|
||||||
std::optional<Poco::Exception> exception;
|
std::exception_ptr exception;
|
||||||
|
|
||||||
ReadSettings settings;
|
ReadSettings settings;
|
||||||
Poco::Logger * log;
|
Poco::Logger * log;
|
||||||
|
|
||||||
@ -137,7 +137,8 @@ namespace detail
|
|||||||
request.set(std::get<0>(http_header_entry), std::get<1>(http_header_entry));
|
request.set(std::get<0>(http_header_entry), std::get<1>(http_header_entry));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (bytes_read && with_partial_content)
|
bool with_partial_content = bytes_read && total_bytes_to_read;
|
||||||
|
if (with_partial_content)
|
||||||
request.set("Range", fmt::format("bytes={}-", start_byte + bytes_read));
|
request.set("Range", fmt::format("bytes={}-", start_byte + bytes_read));
|
||||||
|
|
||||||
if (!credentials.getUsername().empty())
|
if (!credentials.getUsername().empty())
|
||||||
@ -159,9 +160,9 @@ namespace detail
|
|||||||
|
|
||||||
if (with_partial_content && response.getStatus() != Poco::Net::HTTPResponse::HTTPStatus::HTTP_PARTIAL_CONTENT)
|
if (with_partial_content && response.getStatus() != Poco::Net::HTTPResponse::HTTPStatus::HTTP_PARTIAL_CONTENT)
|
||||||
{
|
{
|
||||||
/// If we retries some request, throw error from that request.
|
/// If we retried some request, throw error from that request.
|
||||||
if (exception)
|
if (exception)
|
||||||
exception->rethrow();
|
std::rethrow_exception(exception);
|
||||||
throw Exception(ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE, "Cannot read with range: {}", request.get("Range"));
|
throw Exception(ErrorCodes::HTTP_RANGE_NOT_SATISFIABLE, "Cannot read with range: {}", request.get("Range"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -200,36 +201,15 @@ namespace detail
|
|||||||
, http_header_entries {http_header_entries_}
|
, http_header_entries {http_header_entries_}
|
||||||
, remote_host_filter {remote_host_filter_}
|
, remote_host_filter {remote_host_filter_}
|
||||||
, buffer_size {buffer_size_}
|
, buffer_size {buffer_size_}
|
||||||
|
, start_byte {settings_.http_start_offset}
|
||||||
, settings {settings_}
|
, settings {settings_}
|
||||||
, log(&Poco::Logger::get("ReadWriteBufferFromHTTP"))
|
, log(&Poco::Logger::get("ReadWriteBufferFromHTTP"))
|
||||||
{
|
{
|
||||||
/**
|
|
||||||
* Get first byte from `bytes=offset-`, `bytes=offset-end`.
|
|
||||||
* Now there are two places, where it can be set: 1. in DiskWeb (offset), 2. via config as part of named-collection.
|
|
||||||
* Other cases not supported.
|
|
||||||
*/
|
|
||||||
auto range_header = std::find_if(http_header_entries_.begin(), http_header_entries_.end(),
|
|
||||||
[&](const HTTPHeaderEntry & header) { return std::get<0>(header) == "Range"; });
|
|
||||||
if (range_header != http_header_entries_.end())
|
|
||||||
{
|
|
||||||
if (method != Poco::Net::HTTPRequest::HTTP_GET)
|
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Headers are allowed only with GET request");
|
|
||||||
auto range = std::get<1>(*range_header).substr(std::strlen("bytes="));
|
|
||||||
UInt64 start;
|
|
||||||
auto parsed = tryParse<UInt64>(start, range);
|
|
||||||
if (parsed)
|
|
||||||
start_byte = start;
|
|
||||||
else
|
|
||||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot extract start byte");
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
initialize();
|
initialize();
|
||||||
}
|
}
|
||||||
|
|
||||||
void initialize()
|
void initialize()
|
||||||
{
|
{
|
||||||
|
|
||||||
Poco::Net::HTTPResponse response;
|
Poco::Net::HTTPResponse response;
|
||||||
istr = call(uri, response);
|
istr = call(uri, response);
|
||||||
|
|
||||||
@ -244,14 +224,8 @@ namespace detail
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// If it is the very first initialization.
|
/// If it is the very first initialization.
|
||||||
if (!bytes_read && !total_bytes_to_read)
|
if (!bytes_read && !total_bytes_to_read && response.hasContentLength())
|
||||||
{
|
total_bytes_to_read = response.getContentLength();
|
||||||
/// If we do not know total size, disable retries in the middle of reading.
|
|
||||||
if (response.hasContentLength())
|
|
||||||
total_bytes_to_read = response.getContentLength();
|
|
||||||
else
|
|
||||||
with_partial_content = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
@ -298,15 +272,14 @@ namespace detail
|
|||||||
}
|
}
|
||||||
catch (const Poco::Exception & e)
|
catch (const Poco::Exception & e)
|
||||||
{
|
{
|
||||||
if (bytes_read && !with_partial_content)
|
bool can_retry_request = !bytes_read || total_bytes_to_read.has_value();
|
||||||
|
if (!can_retry_request)
|
||||||
throw;
|
throw;
|
||||||
|
|
||||||
LOG_ERROR(&Poco::Logger::get("ReadBufferFromHTTP"), "Error: {}, code: {}", e.what(), e.code());
|
LOG_ERROR(&Poco::Logger::get("ReadBufferFromHTTP"), "Error: {}, code: {}", e.what(), e.code());
|
||||||
|
|
||||||
|
exception = std::current_exception();
|
||||||
impl.reset();
|
impl.reset();
|
||||||
|
|
||||||
exception.reset();
|
|
||||||
exception.emplace(e);
|
|
||||||
|
|
||||||
sleepForMilliseconds(milliseconds_to_wait);
|
sleepForMilliseconds(milliseconds_to_wait);
|
||||||
milliseconds_to_wait *= 2;
|
milliseconds_to_wait *= 2;
|
||||||
}
|
}
|
||||||
@ -315,7 +288,7 @@ namespace detail
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!successful_read && exception)
|
if (!successful_read && exception)
|
||||||
exception->rethrow();
|
std::rethrow_exception(exception);
|
||||||
|
|
||||||
if (!result)
|
if (!result)
|
||||||
return false;
|
return false;
|
||||||
|
@ -450,6 +450,8 @@ void registerStorageURL(StorageFactory & factory)
|
|||||||
for (const auto & [header, value] : configuration.headers)
|
for (const auto & [header, value] : configuration.headers)
|
||||||
{
|
{
|
||||||
auto value_literal = value.safeGet<String>();
|
auto value_literal = value.safeGet<String>();
|
||||||
|
if (header == "Range")
|
||||||
|
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
|
||||||
headers.emplace_back(std::make_pair(header, value_literal));
|
headers.emplace_back(std::make_pair(header, value_literal));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user