Fix url with failover case

This commit is contained in:
kssenii 2021-10-03 13:53:24 +00:00
parent e3e23f0923
commit 0cb7193fde
2 changed files with 86 additions and 87 deletions

View File

@ -57,10 +57,36 @@ IStorageURLBase::IStorageURLBase(
namespace
{
ReadWriteBufferFromHTTP::HTTPHeaderEntries getHeaders(
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_)
{
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers(headers_.begin(), headers_.end());
// Propagate OpenTelemetry trace context, if any, downstream.
if (CurrentThread::isInitialized())
{
const auto & thread_trace_context = CurrentThread::get().thread_trace_context;
if (thread_trace_context.trace_id != UUID())
{
headers.emplace_back("traceparent",
thread_trace_context.composeTraceparentHeader());
if (!thread_trace_context.tracestate.empty())
{
headers.emplace_back("tracestate",
thread_trace_context.tracestate);
}
}
}
return headers;
}
class StorageURLSource : public SourceWithProgress
{
using URIParams = std::vector<std::pair<String, String>>;
public:
StorageURLSource(const Poco::URI & uri,
StorageURLSource(
const std::vector<Poco::URI> & uri_options,
const std::string & method,
std::function<void(std::ostream &)> callback,
const String & format,
@ -71,47 +97,49 @@ namespace
const ColumnsDescription & columns,
UInt64 max_block_size,
const ConnectionTimeouts & timeouts,
const CompressionMethod compression_method,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {})
const String & compression_method,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {},
const URIParams & params = {})
: SourceWithProgress(sample_block), name(std::move(name_))
{
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & header : headers_)
headers.emplace_back(header);
// Propagate OpenTelemetry trace context, if any, downstream.
if (CurrentThread::isInitialized())
{
const auto & thread_trace_context = CurrentThread::get().thread_trace_context;
if (thread_trace_context.trace_id != UUID())
{
headers.emplace_back("traceparent",
thread_trace_context.composeTraceparentHeader());
if (!thread_trace_context.tracestate.empty())
{
headers.emplace_back("tracestate",
thread_trace_context.tracestate);
}
}
}
auto headers = getHeaders(headers_);
/// Lazy initialization. We should not perform requests in constructor, because we need to do it in query pipeline.
initialize = [=, this]
{
read_buf = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
uri,
method,
std::move(callback),
timeouts,
context->getSettingsRef().max_http_get_redirects,
Poco::Net::HTTPBasicCredentials{},
DBMS_DEFAULT_BUFFER_SIZE,
headers,
context->getRemoteHostFilter()),
compression_method);
WriteBufferFromOwnString error_message;
for (auto option = uri_options.begin(); option < uri_options.end(); ++option)
{
auto request_uri = *option;
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
try
{
read_buf = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
request_uri,
method,
std::move(callback),
timeouts,
context->getSettingsRef().max_http_get_redirects,
Poco::Net::HTTPBasicCredentials{},
DBMS_DEFAULT_BUFFER_SIZE,
headers,
context->getRemoteHostFilter()),
chooseCompressionMethod(request_uri.getPath(), compression_method));
}
catch (...)
{
if (uri_options.size() == 1)
throw;
if (option == uri_options.end() - 1)
throw Exception(ErrorCodes::NETWORK_ERROR, "All uri options are unreachable. {}", error_message.str());
error_message << option->toString() << " error: " << getCurrentExceptionMessage(false) << "\n";
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size, format_settings);
QueryPipelineBuilder builder;
@ -237,13 +265,10 @@ Pipe IStorageURLBase::read(
size_t max_block_size,
unsigned /*num_streams*/)
{
auto request_uri = uri;
auto params = getReadURIParams(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size);
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
std::vector<Poco::URI> uri_options{uri};
return Pipe(std::make_shared<StorageURLSource>(
request_uri,
uri_options,
getReadMethod(),
getReadPOSTDataCallback(
column_names, metadata_snapshot, query_info,
@ -256,8 +281,7 @@ Pipe IStorageURLBase::read(
metadata_snapshot->getColumns(),
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
chooseCompressionMethod(request_uri.getPath(), compression_method),
headers));
compression_method, headers, params));
}
@ -271,48 +295,23 @@ Pipe StorageURLWithFailover::read(
unsigned /*num_streams*/)
{
auto params = getReadURIParams(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size);
WriteBufferFromOwnString error_message;
error_message << "Detailed description:";
for (const auto & uri_option : uri_options)
{
auto request_uri = uri_option;
for (const auto & [param, value] : params)
request_uri.addQueryParameter(param, value);
try
{
/// Check for uri accessibility is done in constructor of ReadWriteBufferFromHTTP while creating StorageURLSource.
auto url_source = std::make_shared<StorageURLSource>(
request_uri,
getReadMethod(),
getReadPOSTDataCallback(
column_names, metadata_snapshot, query_info,
local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
getHeaderBlock(column_names, metadata_snapshot),
local_context,
metadata_snapshot->getColumns(),
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
chooseCompressionMethod(request_uri.getPath(), compression_method));
std::shuffle(uri_options.begin(), uri_options.end(), thread_local_rng);
return Pipe(url_source);
}
catch (...)
{
error_message << " Host: " << uri_option.getHost() << ", post: " << uri_option.getPort() << ", path: " << uri_option.getPath();
error_message << ", error: " << getCurrentExceptionMessage(false) << ";";
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
throw Exception(ErrorCodes::NETWORK_ERROR, "All uri options are unreachable. {}", error_message.str());
auto pipe = Pipe(std::make_shared<StorageURLSource>(
uri_options,
getReadMethod(),
getReadPOSTDataCallback(
column_names, metadata_snapshot, query_info,
local_context, processed_stage, max_block_size),
format_name,
format_settings,
getName(),
getHeaderBlock(column_names, metadata_snapshot),
local_context,
metadata_snapshot->getColumns(),
max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params));
std::shuffle(uri_options.begin(), uri_options.end(), thread_local_rng);
return pipe;
}

View File

@ -54,7 +54,7 @@ def test_url_with_globs_and_failover(started_cluster):
result = node1.query(
"select * from url('http://hdfs1:50075/webhdfs/v1/simple_storage_{0|1|2|3}_{1..3}?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', 'TSV', 'data String') as data order by data")
assert result == "1\n2\n3\n"
assert result == "1\n2\n3\n" or result == "4\n5\n6\n"
def test_url_with_redirect_not_allowed(started_cluster):