fix conflict

attack204 2023-01-19 13:18:07 +08:00
parent 8caa2a0ee9
commit 7bd0010c50


@@ -199,44 +199,44 @@ StorageURLSource::StorageURLSource(
    const URIParams & params,
    bool glob_url)
    : ISource(sample_block), name(std::move(name_)), uri_info(uri_info_)
{
    auto headers = getHeaders(headers_);

    /// Lazy initialization. We should not perform requests in constructor, because we need to do it in query pipeline.
    initialize = [=, this](const URIInfo::FailoverOptions & uri_options)
    {
        if (uri_options.empty())
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Got empty url list");

        auto first_option = uri_options.begin();
        read_buf = getFirstAvailableURLReadBuffer(
            first_option,
            uri_options.end(),
            context,
            params,
            http_method,
            callback,
            timeouts,
            compression_method,
            credentials,
            headers,
            glob_url,
            uri_options.size() == 1,
            download_threads);

        auto input_format
            = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size, format_settings);

        QueryPipelineBuilder builder;
        builder.init(Pipe(input_format));

        builder.addSimpleTransform(
            [&](const Block & cur_header)
            { return std::make_shared<AddingDefaultsTransform>(cur_header, columns, *input_format, context); });

        pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
        reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
    };
}
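
The constructor above deliberately performs no network I/O: all of the real setup (opening the URL, building the input format and the query pipeline) is packed into the `initialize` lambda, which the source only invokes later, from inside the query pipeline. Below is a minimal, self-contained sketch of that deferred-initialization pattern; the class and member names are illustrative stand-ins, not ClickHouse APIs.

#include <functional>
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a source that must not touch the network in its constructor.
class LazySource
{
public:
    explicit LazySource(std::vector<std::string> urls)
    {
        // Capture by value so the callable stays valid after the constructor returns;
        // the expensive work (HTTP request, pipeline construction) is deferred until first use.
        initialize = [urls = std::move(urls)]()
        {
            if (urls.empty())
                throw std::runtime_error("Got empty url list");
            std::cout << "Opening " << urls.front() << "\n"; // the "request" happens here, not in the constructor
        };
    }

    void read()
    {
        if (initialize)
        {
            initialize();          // run the deferred setup on first use
            initialize = nullptr;
        }
        // ... actual reading would follow ...
    }

private:
    std::function<void()> initialize;
};

int main()
{
    LazySource source({"https://example.com/data.csv"});
    source.read(); // no work happened before this call
}
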
std::unique_ptr<ReadBuffer> StorageURLSource::getFirstAvailableURLReadBuffer(
std::vector<String>::const_iterator & option,
@@ -297,92 +297,8 @@ std::unique_ptr<ReadBuffer> StorageURLSource::getFirstAvailableURLReadBuffer(
{
    try
    {
-        ReadWriteBufferFromHTTP buffer(
-            request_uri,
-            Poco::Net::HTTPRequest::HTTP_HEAD,
-            callback,
-            timeouts,
-            credentials,
-            settings.max_http_get_redirects,
-            settings.max_read_buffer_size,
-            read_settings,
-            headers,
-            ReadWriteBufferFromHTTP::Range{0, std::nullopt},
-            &context->getRemoteHostFilter(),
-            true,
-            /* use_external_buffer */ false,
-            /* skip_url_not_found_error */ skip_url_not_found_error);
-        Poco::Net::HTTPResponse res;
-        for (size_t i = 0; i < settings.http_max_tries; ++i)
-        {
-            try
-            {
-                buffer.callWithRedirects(res, Poco::Net::HTTPRequest::HTTP_HEAD, true);
-                break;
-            }
-            catch (const Poco::Exception & e)
-            {
-                LOG_TRACE(
-                    &Poco::Logger::get("StorageURLSource"),
-                    "HTTP HEAD request to `{}` failed at try {}/{}. "
-                    "Error: {}.",
-                    request_uri.toString(),
-                    i + 1,
-                    settings.http_max_tries,
-                    e.displayText());
-                if (!ReadWriteBufferFromHTTP::isRetriableError(res.getStatus()))
-                {
-                    throw;
-                }
-            }
-        }
-        // to check if Range header is supported, we need to send a request with it set
-        const bool supports_ranges = (res.has("Accept-Ranges") && res.get("Accept-Ranges") == "bytes")
-            || (res.has("Content-Range") && res.get("Content-Range").starts_with("bytes"));
-        if (supports_ranges)
-            LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "HTTP Range is supported");
-        else
-            LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "HTTP Range is not supported");
-        if (supports_ranges && res.getStatus() == Poco::Net::HTTPResponse::HTTP_PARTIAL_CONTENT
-            && res.hasContentLength())
-        {
-            LOG_TRACE(
-                &Poco::Logger::get("StorageURLSource"),
-                "Using ParallelReadBuffer with {} workers with chunks of {} bytes",
-                download_threads,
-                settings.max_download_buffer_size);
-            auto read_buffer_factory = std::make_unique<RangedReadWriteBufferFromHTTPFactory>(
-                res.getContentLength(),
-                settings.max_download_buffer_size,
-                request_uri,
-                http_method,
-                callback,
-                timeouts,
-                credentials,
-                settings.max_http_get_redirects,
-                settings.max_read_buffer_size,
-                read_settings,
-                headers,
-                &context->getRemoteHostFilter(),
-                delay_initialization,
-                /* use_external_buffer */ false,
-                /* skip_url_not_found_error */ skip_url_not_found_error);
-            return wrapReadBufferWithCompressionMethod(
-                std::make_unique<ParallelReadBuffer>(
-                    std::move(read_buffer_factory),
-                    threadPoolCallbackRunner<void>(IOThreadPool::get(), "URLParallelRead"),
-                    download_threads),
-                compression_method,
-                zstd_window_log_max);
-        }
        buffer.callWithRedirects(res, Poco::Net::HTTPRequest::HTTP_HEAD, true);
        break;
    }
    catch (const Poco::Exception & e)
    {
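
The block removed above (its surviving copy continues in the next hunk) is the probe for parallel download: it issues an HTTP HEAD request with retries, treats the target as range-capable when the response carries `Accept-Ranges: bytes` or a `bytes` `Content-Range` (and, for the parallel path, a 206 status plus a Content-Length), and then hands `download_threads` workers chunks of `settings.max_download_buffer_size` bytes via `RangedReadWriteBufferFromHTTPFactory` and `ParallelReadBuffer`. A rough standalone sketch of just the chunking step follows; `splitIntoRanges` and its parameters are made up for illustration and are not part of the ClickHouse or Poco APIs.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <utility>
#include <vector>

// Made-up helper for illustration: given the Content-Length reported by the HEAD
// request and a chunk size (settings.max_download_buffer_size above), produce the
// [begin, end) byte ranges that parallel workers would fetch with
// "Range: bytes=begin-(end-1)" request headers.
std::vector<std::pair<uint64_t, uint64_t>> splitIntoRanges(uint64_t content_length, uint64_t chunk_size)
{
    std::vector<std::pair<uint64_t, uint64_t>> ranges;
    for (uint64_t begin = 0; begin < content_length; begin += chunk_size)
        ranges.emplace_back(begin, std::min(begin + chunk_size, content_length));
    return ranges;
}

int main()
{
    // e.g. a 10 MiB file downloaded in 4 MiB chunks -> two full ranges and one short tail
    for (const auto & [begin, end] : splitIntoRanges(10 * 1024 * 1024, 4 * 1024 * 1024))
        std::cout << "bytes=" << begin << "-" << end - 1 << "\n";
}

ParallelReadBuffer then consumes such ranges with a pool of workers and exposes the downloaded chunks to the caller in their original order.
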
@@ -404,9 +320,11 @@ std::unique_ptr<ReadBuffer> StorageURLSource::getFirstAvailableURLReadBuffer(
// to check if Range header is supported, we need to send a request with it set
const bool supports_ranges = (res.has("Accept-Ranges") && res.get("Accept-Ranges") == "bytes")
    || (res.has("Content-Range") && res.get("Content-Range").starts_with("bytes"));
-LOG_TRACE(
-    &Poco::Logger::get("StorageURLSource"),
-    fmt::runtime(supports_ranges ? "HTTP Range is supported" : "HTTP Range is not supported"));
+if (supports_ranges)
+    LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "HTTP Range is supported");
+else
+    LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "HTTP Range is not supported");
if (supports_ranges && res.getStatus() == Poco::Net::HTTPResponse::HTTP_PARTIAL_CONTENT