Split download threads when multiple URLs are used

This commit is contained in:
Antonio Andelic 2022-03-14 09:27:09 +00:00
parent f5d3a8a31d
commit 9dda2863d3

View File

@ -3,34 +3,34 @@
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h> #include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTCreateQuery.h> #include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTInsertQuery.h> #include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadHelpers.h>
#include <IO/ParallelReadBuffer.h>
#include <IO/WriteBufferFromHTTP.h>
#include <IO/WriteHelpers.h>
#include <IO/ConnectionTimeouts.h> #include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h> #include <IO/ConnectionTimeoutsContext.h>
#include <IO/IOThreadPool.h> #include <IO/IOThreadPool.h>
#include <IO/ParallelReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromHTTP.h>
#include <IO/WriteHelpers.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <Formats/ReadSchemaUtils.h> #include <Formats/ReadSchemaUtils.h>
#include <Processors/Formats/IInputFormat.h> #include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/IOutputFormat.h> #include <Processors/Formats/IOutputFormat.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Storages/PartitionedSink.h>
#include <Common/parseRemoteDescription.h> #include <Common/parseRemoteDescription.h>
#include "IO/HTTPCommon.h" #include "IO/HTTPCommon.h"
#include "IO/ReadWriteBufferFromHTTP.h" #include "IO/ReadWriteBufferFromHTTP.h"
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Storages/PartitionedSink.h>
#include <Poco/Net/HTTPRequest.h> #include <algorithm>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Sources/SourceWithProgress.h> #include <Processors/Sources/SourceWithProgress.h>
#include <QueryPipeline/QueryPipelineBuilder.h> #include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <base/logger_useful.h> #include <base/logger_useful.h>
#include <algorithm> #include <Poco/Net/HTTPRequest.h>
namespace DB namespace DB
@ -47,8 +47,7 @@ namespace ErrorCodes
static bool urlWithGlobs(const String & uri) static bool urlWithGlobs(const String & uri)
{ {
return (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos) return (uri.find('{') != std::string::npos && uri.find('}') != std::string::npos) || uri.find('|') != std::string::npos;
|| uri.find('|') != std::string::npos;
} }
@ -92,8 +91,7 @@ IStorageURLBase::IStorageURLBase(
namespace namespace
{ {
ReadWriteBufferFromHTTP::HTTPHeaderEntries getHeaders( ReadWriteBufferFromHTTP::HTTPHeaderEntries getHeaders(const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_)
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_)
{ {
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers(headers_.begin(), headers_.end()); ReadWriteBufferFromHTTP::HTTPHeaderEntries headers(headers_.begin(), headers_.end());
// Propagate OpenTelemetry trace context, if any, downstream. // Propagate OpenTelemetry trace context, if any, downstream.
@ -102,13 +100,11 @@ namespace
const auto & thread_trace_context = CurrentThread::get().thread_trace_context; const auto & thread_trace_context = CurrentThread::get().thread_trace_context;
if (thread_trace_context.trace_id != UUID()) if (thread_trace_context.trace_id != UUID())
{ {
headers.emplace_back("traceparent", headers.emplace_back("traceparent", thread_trace_context.composeTraceparentHeader());
thread_trace_context.composeTraceparentHeader());
if (!thread_trace_context.tracestate.empty()) if (!thread_trace_context.tracestate.empty())
{ {
headers.emplace_back("tracestate", headers.emplace_back("tracestate", thread_trace_context.tracestate);
thread_trace_context.tracestate);
} }
} }
} }
@ -118,8 +114,7 @@ namespace
class StorageURLSource : public SourceWithProgress class StorageURLSource : public SourceWithProgress
{ {
using URIParams = std::vector<std::pair<String, String>>;
using URIParams = std::vector<std::pair<String, String>>;
public: public:
struct URIInfo struct URIInfo
@ -164,11 +159,11 @@ namespace
UInt64 max_block_size, UInt64 max_block_size,
const ConnectionTimeouts & timeouts, const ConnectionTimeouts & timeouts,
const String & compression_method, const String & compression_method,
size_t download_threads,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {}, const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_ = {},
const URIParams & params = {}, const URIParams & params = {},
bool glob_url = false) bool glob_url = false)
: SourceWithProgress(sample_block), name(std::move(name_)) : SourceWithProgress(sample_block), name(std::move(name_)), uri_info(uri_info_)
, uri_info(uri_info_)
{ {
auto headers = getHeaders(headers_); auto headers = getHeaders(headers_);
@ -180,33 +175,40 @@ namespace
auto first_option = uri_options.begin(); auto first_option = uri_options.begin();
read_buf = getFirstAvailableURLReadBuffer( read_buf = getFirstAvailableURLReadBuffer(
first_option, uri_options.end(), context, params, http_method, first_option,
callback, timeouts, compression_method, credentials, headers, glob_url, uri_options.size() == 1); uri_options.end(),
context,
params,
http_method,
callback,
timeouts,
compression_method,
credentials,
headers,
glob_url,
uri_options.size() == 1,
download_threads);
auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size, format_settings); auto input_format
= FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size, format_settings);
QueryPipelineBuilder builder; QueryPipelineBuilder builder;
builder.init(Pipe(input_format)); builder.init(Pipe(input_format));
builder.addSimpleTransform([&](const Block & cur_header) builder.addSimpleTransform(
{ [&](const Block & cur_header)
return std::make_shared<AddingDefaultsTransform>(cur_header, columns, *input_format, context); { return std::make_shared<AddingDefaultsTransform>(cur_header, columns, *input_format, context); });
});
pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder))); pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
reader = std::make_unique<PullingPipelineExecutor>(*pipeline); reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
}; };
} }
String getName() const override String getName() const override { return name; }
{
return name;
}
Chunk generate() override Chunk generate() override
{ {
while (true) while (true)
{ {
if (!reader) if (!reader)
{ {
auto current_uri_pos = uri_info->next_uri_to_read.fetch_add(1); auto current_uri_pos = uri_info->next_uri_to_read.fetch_add(1);
@ -243,7 +245,8 @@ namespace
Poco::Net::HTTPBasicCredentials & credentials, Poco::Net::HTTPBasicCredentials & credentials,
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers, const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers,
bool glob_url, bool glob_url,
bool delay_initialization) bool delay_initialization,
size_t download_threads)
{ {
String first_exception_message; String first_exception_message;
ReadSettings read_settings = context->getReadSettings(); ReadSettings read_settings = context->getReadSettings();
@ -262,7 +265,7 @@ namespace
const auto settings = context->getSettings(); const auto settings = context->getSettings();
try try
{ {
if (settings.max_download_threads > 1) if (download_threads > 1)
{ {
try try
{ {
@ -296,7 +299,7 @@ namespace
LOG_TRACE( LOG_TRACE(
&Poco::Logger::get("StorageURLSource"), &Poco::Logger::get("StorageURLSource"),
"Using ParallelReadBuffer with {} workers with chunks of {} bytes", "Using ParallelReadBuffer with {} workers with chunks of {} bytes",
settings.max_download_threads, download_threads,
settings.max_download_buffer_size); settings.max_download_buffer_size);
auto read_buffer_factory = std::make_unique<RangedReadWriteBufferFromHTTPFactory>( auto read_buffer_factory = std::make_unique<RangedReadWriteBufferFromHTTPFactory>(
@ -317,7 +320,7 @@ namespace
/* skip_url_not_found_error */ skip_url_not_found_error); /* skip_url_not_found_error */ skip_url_not_found_error);
return wrapReadBufferWithCompressionMethod( return wrapReadBufferWithCompressionMethod(
std::make_unique<ParallelReadBuffer>( std::make_unique<ParallelReadBuffer>(
std::move(read_buffer_factory), &IOThreadPool::get(), settings.max_download_threads), std::move(read_buffer_factory), &IOThreadPool::get(), download_threads),
chooseCompressionMethod(request_uri.getPath(), compression_method)); chooseCompressionMethod(request_uri.getPath(), compression_method));
} }
} }
@ -329,11 +332,7 @@ namespace
} }
} }
LOG_TRACE( LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "Using single-threaded read buffer");
&Poco::Logger::get("StorageURLSource"),
"Using single-threaded read buffer",
settings.max_download_threads,
settings.max_download_buffer_size);
return wrapReadBufferWithCompressionMethod( return wrapReadBufferWithCompressionMethod(
std::make_unique<ReadWriteBufferFromHTTP>( std::make_unique<ReadWriteBufferFromHTTP>(
@ -401,10 +400,10 @@ StorageURLSink::StorageURLSink(
std::string content_encoding = toContentEncodingName(compression_method); std::string content_encoding = toContentEncodingName(compression_method);
write_buf = wrapWriteBufferWithCompressionMethod( write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromHTTP>(Poco::URI(uri), http_method, content_type, content_encoding, timeouts), std::make_unique<WriteBufferFromHTTP>(Poco::URI(uri), http_method, content_type, content_encoding, timeouts),
compression_method, 3); compression_method,
writer = FormatFactory::instance().getOutputFormat(format, *write_buf, sample_block, 3);
context, {} /* write callback */, format_settings); writer = FormatFactory::instance().getOutputFormat(format, *write_buf, sample_block, context, {} /* write callback */, format_settings);
} }
@ -433,15 +432,15 @@ public:
const ConnectionTimeouts & timeouts_, const ConnectionTimeouts & timeouts_,
const CompressionMethod compression_method_, const CompressionMethod compression_method_,
const String & http_method_) const String & http_method_)
: PartitionedSink(partition_by, context_, sample_block_) : PartitionedSink(partition_by, context_, sample_block_)
, uri(uri_) , uri(uri_)
, format(format_) , format(format_)
, format_settings(format_settings_) , format_settings(format_settings_)
, sample_block(sample_block_) , sample_block(sample_block_)
, context(context_) , context(context_)
, timeouts(timeouts_) , timeouts(timeouts_)
, compression_method(compression_method_) , compression_method(compression_method_)
, http_method(http_method_) , http_method(http_method_)
{ {
} }
@ -449,8 +448,8 @@ public:
{ {
auto partition_path = PartitionedSink::replaceWildcards(uri, partition_id); auto partition_path = PartitionedSink::replaceWildcards(uri, partition_id);
context->getRemoteHostFilter().checkURL(Poco::URI(partition_path)); context->getRemoteHostFilter().checkURL(Poco::URI(partition_path));
return std::make_shared<StorageURLSink>(partition_path, format, return std::make_shared<StorageURLSink>(
format_settings, sample_block, context, timeouts, compression_method, http_method); partition_path, format, format_settings, sample_block, context, timeouts, compression_method, http_method);
} }
private: private:
@ -540,7 +539,8 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
credentials, credentials,
headers, headers,
false, false,
false); false,
context->getSettingsRef().max_download_threads);
}; };
try try
@ -557,7 +557,10 @@ ColumnsDescription IStorageURLBase::getTableStructureFromData(
} while (++option < urls_to_check.end()); } while (++option < urls_to_check.end());
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "All attempts to extract table structure from urls failed. Errors:\n{}", exception_messages); throw Exception(
ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE,
"All attempts to extract table structure from urls failed. Errors:\n{}",
exception_messages);
} }
bool IStorageURLBase::isColumnOriented() const bool IStorageURLBase::isColumnOriented() const
@ -590,6 +593,8 @@ Pipe IStorageURLBase::read(
block_for_format = metadata_snapshot->getSampleBlock(); block_for_format = metadata_snapshot->getSampleBlock();
} }
size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
if (urlWithGlobs(uri)) if (urlWithGlobs(uri))
{ {
size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements; size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
@ -606,14 +611,19 @@ Pipe IStorageURLBase::read(
Pipes pipes; Pipes pipes;
pipes.reserve(num_streams); pipes.reserve(num_streams);
size_t remaining_download_threads = max_download_threads;
for (size_t i = 0; i < num_streams; ++i) for (size_t i = 0; i < num_streams; ++i)
{ {
size_t current_need_download_threads = num_streams >= max_download_threads ? 1 : (max_download_threads / num_streams);
size_t current_download_threads = std::min(current_need_download_threads, remaining_download_threads);
remaining_download_threads -= current_download_threads;
current_download_threads = std::max(static_cast<size_t>(1), current_download_threads);
pipes.emplace_back(std::make_shared<StorageURLSource>( pipes.emplace_back(std::make_shared<StorageURLSource>(
uri_info, uri_info,
getReadMethod(), getReadMethod(),
getReadPOSTDataCallback( getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
column_names, columns_description, query_info,
local_context, processed_stage, max_block_size),
format_name, format_name,
format_settings, format_settings,
getName(), getName(),
@ -622,7 +632,11 @@ Pipe IStorageURLBase::read(
columns_description, columns_description,
max_block_size, max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context), ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params, /* glob_url */true)); compression_method,
current_download_threads,
headers,
params,
/* glob_url */ true));
} }
return Pipe::unitePipes(std::move(pipes)); return Pipe::unitePipes(std::move(pipes));
} }
@ -633,9 +647,7 @@ Pipe IStorageURLBase::read(
return Pipe(std::make_shared<StorageURLSource>( return Pipe(std::make_shared<StorageURLSource>(
uri_info, uri_info,
getReadMethod(), getReadMethod(),
getReadPOSTDataCallback( getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
column_names, columns_description, query_info,
local_context, processed_stage, max_block_size),
format_name, format_name,
format_settings, format_settings,
getName(), getName(),
@ -644,7 +656,10 @@ Pipe IStorageURLBase::read(
columns_description, columns_description,
max_block_size, max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context), ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params)); compression_method,
max_download_threads,
headers,
params));
} }
} }
@ -676,12 +691,10 @@ Pipe StorageURLWithFailover::read(
auto uri_info = std::make_shared<StorageURLSource::URIInfo>(); auto uri_info = std::make_shared<StorageURLSource::URIInfo>();
uri_info->uri_list_to_read.emplace_back(uri_options); uri_info->uri_list_to_read.emplace_back(uri_options);
auto pipe = Pipe(std::make_shared<StorageURLSource>( auto pipe = Pipe(std::make_shared<StorageURLSource>(
uri_info, uri_info,
getReadMethod(), getReadMethod(),
getReadPOSTDataCallback( getReadPOSTDataCallback(column_names, columns_description, query_info, local_context, processed_stage, max_block_size),
column_names, columns_description, query_info,
local_context, processed_stage, max_block_size),
format_name, format_name,
format_settings, format_settings,
getName(), getName(),
@ -690,7 +703,10 @@ Pipe StorageURLWithFailover::read(
columns_description, columns_description,
max_block_size, max_block_size,
ConnectionTimeouts::getHTTPTimeouts(local_context), ConnectionTimeouts::getHTTPTimeouts(local_context),
compression_method, headers, params)); compression_method,
local_context->getSettingsRef().max_download_threads,
headers,
params));
std::shuffle(uri_options.begin(), uri_options.end(), thread_local_rng); std::shuffle(uri_options.begin(), uri_options.end(), thread_local_rng);
return pipe; return pipe;
} }
@ -710,17 +726,26 @@ SinkToStoragePtr IStorageURLBase::write(const ASTPtr & query, const StorageMetad
{ {
return std::make_shared<PartitionedStorageURLSink>( return std::make_shared<PartitionedStorageURLSink>(
partition_by_ast, partition_by_ast,
uri, format_name, uri,
format_settings, metadata_snapshot->getSampleBlock(), context, format_name,
format_settings,
metadata_snapshot->getSampleBlock(),
context,
ConnectionTimeouts::getHTTPTimeouts(context), ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(uri, compression_method), http_method); chooseCompressionMethod(uri, compression_method),
http_method);
} }
else else
{ {
return std::make_shared<StorageURLSink>(uri, format_name, return std::make_shared<StorageURLSink>(
format_settings, metadata_snapshot->getSampleBlock(), context, uri,
format_name,
format_settings,
metadata_snapshot->getSampleBlock(),
context,
ConnectionTimeouts::getHTTPTimeouts(context), ConnectionTimeouts::getHTTPTimeouts(context),
chooseCompressionMethod(uri, compression_method), http_method); chooseCompressionMethod(uri, compression_method),
http_method);
} }
} }
@ -737,8 +762,19 @@ StorageURL::StorageURL(
const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_, const ReadWriteBufferFromHTTP::HTTPHeaderEntries & headers_,
const String & http_method_, const String & http_method_,
ASTPtr partition_by_) ASTPtr partition_by_)
: IStorageURLBase(uri_, context_, table_id_, format_name_, format_settings_, : IStorageURLBase(
columns_, constraints_, comment, compression_method_, headers_, http_method_, partition_by_) uri_,
context_,
table_id_,
format_name_,
format_settings_,
columns_,
constraints_,
comment,
compression_method_,
headers_,
http_method_,
partition_by_)
{ {
context_->getRemoteHostFilter().checkURL(Poco::URI(uri)); context_->getRemoteHostFilter().checkURL(Poco::URI(uri));
} }
@ -789,8 +825,7 @@ FormatSettings StorageURL::getFormatSettingsFromArgs(const StorageFactory::Argum
// Apply changes from SETTINGS clause, with validation. // Apply changes from SETTINGS clause, with validation.
user_format_settings.applyChanges(args.storage_def->settings->changes); user_format_settings.applyChanges(args.storage_def->settings->changes);
format_settings = getFormatSettings(args.getContext(), format_settings = getFormatSettings(args.getContext(), user_format_settings);
user_format_settings);
} }
else else
{ {
@ -809,12 +844,12 @@ URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, Contex
auto [common_configuration, storage_specific_args] = named_collection.value(); auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration); configuration.set(common_configuration);
if (!configuration.http_method.empty() if (!configuration.http_method.empty() && configuration.http_method != Poco::Net::HTTPRequest::HTTP_POST
&& configuration.http_method != Poco::Net::HTTPRequest::HTTP_POST
&& configuration.http_method != Poco::Net::HTTPRequest::HTTP_PUT) && configuration.http_method != Poco::Net::HTTPRequest::HTTP_PUT)
throw Exception(ErrorCodes::BAD_ARGUMENTS, throw Exception(
"Http method can be POST or PUT (current: {}). For insert default is POST, for select GET", ErrorCodes::BAD_ARGUMENTS,
configuration.http_method); "Http method can be POST or PUT (current: {}). For insert default is POST, for select GET",
configuration.http_method);
if (!storage_specific_args.empty()) if (!storage_specific_args.empty())
{ {
@ -832,7 +867,8 @@ URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, Contex
{ {
if (args.empty() || args.size() > 3) if (args.empty() || args.size() > 3)
throw Exception( throw Exception(
"Storage URL requires 1, 2 or 3 arguments: url, name of used format (taken from file extension by default) and optional compression method.", "Storage URL requires 1, 2 or 3 arguments: url, name of used format (taken from file extension by default) and optional "
"compression method.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & arg : args) for (auto & arg : args)
@ -854,43 +890,45 @@ URLBasedDataSourceConfiguration StorageURL::getConfiguration(ASTs & args, Contex
void registerStorageURL(StorageFactory & factory) void registerStorageURL(StorageFactory & factory)
{ {
factory.registerStorage("URL", [](const StorageFactory::Arguments & args) factory.registerStorage(
{ "URL",
ASTs & engine_args = args.engine_args; [](const StorageFactory::Arguments & args)
auto configuration = StorageURL::getConfiguration(engine_args, args.getLocalContext());
auto format_settings = StorageURL::getFormatSettingsFromArgs(args);
ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
for (const auto & [header, value] : configuration.headers)
{ {
auto value_literal = value.safeGet<String>(); ASTs & engine_args = args.engine_args;
if (header == "Range") auto configuration = StorageURL::getConfiguration(engine_args, args.getLocalContext());
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed"); auto format_settings = StorageURL::getFormatSettingsFromArgs(args);
headers.emplace_back(std::make_pair(header, value_literal));
}
ASTPtr partition_by; ReadWriteBufferFromHTTP::HTTPHeaderEntries headers;
if (args.storage_def->partition_by) for (const auto & [header, value] : configuration.headers)
partition_by = args.storage_def->partition_by->clone(); {
auto value_literal = value.safeGet<String>();
if (header == "Range")
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Range headers are not allowed");
headers.emplace_back(std::make_pair(header, value_literal));
}
return StorageURL::create( ASTPtr partition_by;
configuration.url, if (args.storage_def->partition_by)
args.table_id, partition_by = args.storage_def->partition_by->clone();
configuration.format,
format_settings, return StorageURL::create(
args.columns, configuration.url,
args.constraints, args.table_id,
args.comment, configuration.format,
args.getContext(), format_settings,
configuration.compression_method, args.columns,
headers, args.constraints,
configuration.http_method, args.comment,
partition_by); args.getContext(),
}, configuration.compression_method,
{ headers,
.supports_settings = true, configuration.http_method,
.supports_schema_inference = true, partition_by);
.source_access_type = AccessType::URL, },
}); {
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::URL,
});
} }
} }