Merge pull request #32907 from kssenii/url-better

Better handling of globs for url storage
This commit is contained in:
Kseniia Sumarokova 2021-12-17 19:08:38 +03:00 committed by GitHub
commit f6e7e11742
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -95,11 +95,20 @@ namespace
class StorageURLSource : public SourceWithProgress
{
using URIParams = std::vector<std::pair<String, String>>;
public:
struct URIInfo
{
using FailoverOptions = std::vector<String>;
std::vector<FailoverOptions> uri_list_to_read;
std::atomic<size_t> next_uri_to_read = 0;
};
using URIInfoPtr = std::shared_ptr<URIInfo>;
StorageURLSource(
const std::vector<String> & uri_options,
URIInfoPtr uri_info_,
const std::string & http_method,
std::function<void(std::ostream &)> callback,
const String & format,
@ -114,10 +123,12 @@ namespace
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {},
const URIParams & params = {})
: SourceWithProgress(sample_block), name(std::move(name_))
, uri_info(uri_info_)
{
auto headers = getHeaders(headers_);
/// Lazy initialization. We should not perform requests in constructor, because we need to do it in query pipeline.
initialize = [=, this]
initialize = [=, this](const URIInfo::FailoverOptions & uri_options)
{
WriteBufferFromOwnString error_message;
for (auto option = uri_options.begin(); option < uri_options.end(); ++option)
@ -135,10 +146,11 @@ namespace
if (n != std::string::npos)
{
credentials.setUsername(user_info.substr(0, n));
credentials.setPassword(user_info.substr(n+1));
credentials.setPassword(user_info.substr(n + 1));
}
}
/// Get first alive uri.
read_buf = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>(
request_uri,
@ -188,29 +200,34 @@ namespace
Chunk generate() override
{
if (initialize)
while (true)
{
initialize();
initialize = {};
if (!reader)
{
auto current_uri_pos = uri_info->next_uri_to_read.fetch_add(1);
if (current_uri_pos >= uri_info->uri_list_to_read.size())
return {};
auto current_uri = uri_info->uri_list_to_read[current_uri_pos];
initialize(current_uri);
}
Chunk chunk;
if (reader->pull(chunk))
return chunk;
pipeline->reset();
reader.reset();
}
if (!reader)
return {};
Chunk chunk;
if (reader->pull(chunk))
return chunk;
pipeline->reset();
reader.reset();
return {};
}
private:
std::function<void()> initialize;
using InitializeFunc = std::function<void(const URIInfo::FailoverOptions &)>;
InitializeFunc initialize;
String name;
URIInfoPtr uri_info;
std::unique_ptr<ReadBuffer> read_buf;
std::unique_ptr<QueryPipeline> pipeline;
std::unique_ptr<PullingPipelineExecutor> reader;
@ -332,7 +349,7 @@ Pipe IStorageURLBase::read(
ContextPtr local_context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
unsigned /*num_streams*/)
unsigned num_streams)
{
auto params = getReadURIParams(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size);
bool with_globs = (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos)
@ -341,18 +358,23 @@ Pipe IStorageURLBase::read(
if (with_globs)
{
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
std::vector<String> url_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses);
std::vector<String> uri_options;
auto uri_descriptions = parseRemoteDescription(uri, 0, uri.size(), ',', max_addresses);
if (num_streams > uri_descriptions.size())
num_streams = uri_descriptions.size();
/// For each uri (which acts like shard) check if it has failover options
auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
for (const auto & description : uri_descriptions)
uri_info->uri_list_to_read.emplace_back(parseRemoteDescription(description, 0, description.size(), '|', max_addresses));
Pipes pipes;
for (const auto & url_description : url_descriptions)
{
/// For each uri (which acts like shard) check if it has failover options
uri_options = parseRemoteDescription(url_description, 0, url_description.size(), '|', max_addresses);
StoragePtr shard;
pipes.reserve(num_streams);
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageURLSource>(
uri_options,
uri_info,
getReadMethod(),
getReadPOSTDataCallback(
column_names, metadata_snapshot, query_info,
@ -371,9 +393,10 @@ Pipe IStorageURLBase::read(
}
else
{
std::vector<String> uri_options{uri};
auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
uri_info->uri_list_to_read.emplace_back(std::vector<String>{uri});
return Pipe(std::make_shared<StorageURLSource>(
uri_options,
uri_info,
getReadMethod(),
getReadPOSTDataCallback(
column_names, metadata_snapshot, query_info,
@ -402,8 +425,10 @@ Pipe StorageURLWithFailover::read(
{
auto params = getReadURIParams(column_names, metadata_snapshot, query_info, local_context, processed_stage, max_block_size);
auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
uri_info->uri_list_to_read.emplace_back(uri_options);
auto pipe = Pipe(std::make_shared<StorageURLSource>(
uri_options,
uri_info,
getReadMethod(),
getReadPOSTDataCallback(
column_names, metadata_snapshot, query_info,