mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-12 01:12:12 +00:00
Refactor code
This commit is contained in:
parent
0a1a3a230e
commit
29c32ed831
@ -82,7 +82,7 @@ void ParallelReadBuffer::processor()
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Start processing
|
/// Start processing
|
||||||
readerThreadFunction(worker);
|
readerThreadFunction(std::move(worker));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -162,7 +162,7 @@ namespace detail
|
|||||||
range_header_value = fmt::format("bytes={}-{}", getOffset(), *read_range.end);
|
range_header_value = fmt::format("bytes={}-{}", getOffset(), *read_range.end);
|
||||||
else
|
else
|
||||||
range_header_value = fmt::format("bytes={}-", getOffset());
|
range_header_value = fmt::format("bytes={}-", getOffset());
|
||||||
LOG_ERROR(log, "Adding header: Range: {}", range_header_value);
|
LOG_TEST(log, "Adding header: Range: {}", range_header_value);
|
||||||
request.set("Range", range_header_value);
|
request.set("Range", range_header_value);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -430,7 +430,8 @@ namespace detail
|
|||||||
if (next_callback)
|
if (next_callback)
|
||||||
next_callback(count());
|
next_callback(count());
|
||||||
|
|
||||||
if (read_range.end && getOffset() > read_range.end.value()) {
|
if (read_range.end && getOffset() > read_range.end.value())
|
||||||
|
{
|
||||||
assert(getOffset() == read_range.end.value() + 1);
|
assert(getOffset() == read_range.end.value() + 1);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -620,6 +621,7 @@ public:
|
|||||||
|
|
||||||
using Range = std::pair<size_t, size_t>;
|
using Range = std::pair<size_t, size_t>;
|
||||||
|
|
||||||
|
// return upper exclusive range of values, i.e. [from_range, to_range>
|
||||||
std::optional<Range> nextRange()
|
std::optional<Range> nextRange()
|
||||||
{
|
{
|
||||||
if (from_range >= total_size)
|
if (from_range >= total_size)
|
||||||
@ -627,14 +629,14 @@ public:
|
|||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto to_range = from_range + range_step - 1;
|
auto to_range = from_range + range_step;
|
||||||
if (to_range >= total_size)
|
if (to_range >= total_size)
|
||||||
{
|
{
|
||||||
to_range = total_size - 1;
|
to_range = total_size - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
Range range{from_range, to_range};
|
Range range{from_range, to_range};
|
||||||
from_range = to_range + 1;
|
from_range = to_range;
|
||||||
return std::move(range);
|
return std::move(range);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -738,7 +740,8 @@ public:
|
|||||||
buffer_size,
|
buffer_size,
|
||||||
settings,
|
settings,
|
||||||
http_header_entries,
|
http_header_entries,
|
||||||
ReadWriteBufferFromHTTP::Range{next_range->first, next_range->second},
|
// HTTP Range has inclusive bounds, i.e. [from, to]
|
||||||
|
ReadWriteBufferFromHTTP::Range{next_range->first, next_range->second - 1},
|
||||||
remote_host_filter,
|
remote_host_filter,
|
||||||
delay_initialization,
|
delay_initialization,
|
||||||
use_external_buffer,
|
use_external_buffer,
|
||||||
|
@ -259,25 +259,34 @@ namespace
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
auto http_session = makeHTTPSession(request_uri, timeouts);
|
bool supports_ranges = false;
|
||||||
auto request = Poco::Net::HTTPRequest(Poco::Net::HTTPRequest::HTTP_HEAD, request_uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
|
std::optional<size_t> content_length;
|
||||||
request.setHost(request_uri.getHost()); // use original, not resolved host name in header
|
try
|
||||||
request.set("Range", "bytes=0-");
|
{
|
||||||
http_session->sendRequest(request);
|
auto http_session = makeHTTPSession(request_uri, timeouts);
|
||||||
Poco::Net::HTTPResponse res;
|
auto request = Poco::Net::HTTPRequest(Poco::Net::HTTPRequest::HTTP_HEAD, request_uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
|
||||||
receiveResponse(*http_session, request, res, true);
|
request.setHost(request_uri.getHost()); // use original, not resolved host name in header
|
||||||
bool supports_ranges = res.has("Accept-Ranges") && res.get("Accept-Ranges") == "bytes";
|
// to check if Range header is supported, we need to send a request with it set
|
||||||
if (supports_ranges) {
|
request.set("Range", "bytes=0-");
|
||||||
LOG_ERROR(&Poco::Logger::get(__PRETTY_FUNCTION__), "Ranges supported");
|
http_session->sendRequest(request);
|
||||||
} else {
|
Poco::Net::HTTPResponse res;
|
||||||
LOG_ERROR(&Poco::Logger::get(__PRETTY_FUNCTION__), "Ranges not supported");
|
receiveResponse(*http_session, request, res, true);
|
||||||
|
supports_ranges = res.has("Accept-Ranges") && res.get("Accept-Ranges") == "bytes";
|
||||||
|
LOG_TRACE(&Poco::Logger::get("StorageURLSource"), fmt::runtime(supports_ranges ? "HTTP Range is supported" : "HTTP Range is not supported"));
|
||||||
|
|
||||||
|
if (res.hasContentLength())
|
||||||
|
{
|
||||||
|
content_length.emplace(res.getContentLength());
|
||||||
|
}
|
||||||
|
} catch (...)
|
||||||
|
{
|
||||||
|
LOG_TRACE(&Poco::Logger::get(__PRETTY_FUNCTION__), "HEAD request failed. HTTP Range cannot be used.");
|
||||||
}
|
}
|
||||||
|
|
||||||
//if (!supports_ranges)
|
if (supports_ranges && content_length)
|
||||||
if (supports_ranges && res.hasContentLength())
|
|
||||||
{
|
{
|
||||||
auto read_buffer_factory = std::make_unique<RangedReadWriteBufferFromHTTPFactory>(
|
auto read_buffer_factory = std::make_unique<RangedReadWriteBufferFromHTTPFactory>(
|
||||||
res.getContentLength(),
|
*content_length,
|
||||||
10 * 1024 * 1024,
|
10 * 1024 * 1024,
|
||||||
request_uri,
|
request_uri,
|
||||||
http_method,
|
http_method,
|
||||||
|
Loading…
Reference in New Issue
Block a user