Merge pull request #53668 from ClickHouse/pufit/fix_s3_threads
Limiting the number of parsing threads for the S3 source
Commit: 926533306c
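What changed, in short: each S3/URL source used to size its parser thread pool without regard to how many parallel sources the query created, so the total number of parsing threads could scale with the stream count. This PR instead divides the query-level max_threads setting across the num_streams sources. Below is a minimal standalone sketch of that budgeting arithmetic (an illustration with a made-up helper name, not ClickHouse's API):

#include <cstddef>
#include <iostream>

// Per-stream parsing-thread budget used throughout this diff:
// split max_threads evenly across the streams, and fall back to
// one thread per stream once streams outnumber threads.
static size_t parsingThreadsPerStream(size_t max_threads, size_t num_streams)
{
    return num_streams >= max_threads ? 1 : (max_threads / num_streams);
}

int main()
{
    std::cout << parsingThreadsPerStream(16, 4) << '\n';  // 4 per stream, 16 total
    std::cout << parsingThreadsPerStream(16, 32) << '\n'; // 1 per stream (lower bound)
    return 0;
}

Each source then hands its share to FormatFactory::instance().getInput() as max_parsing_threads, as the hunks below show.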
src/Storages/StorageS3.cpp

@@ -527,7 +527,7 @@ StorageS3Source::StorageS3Source(
     const String & bucket_,
     const String & version_id_,
     std::shared_ptr<IIterator> file_iterator_,
-    const size_t download_thread_num_,
+    const size_t max_parsing_threads_,
     std::optional<SelectQueryInfo> query_info_)
     : ISource(info.source_header, false)
     , WithContext(context_)
@@ -546,7 +546,7 @@ StorageS3Source::StorageS3Source(
     , query_info(std::move(query_info_))
     , requested_virtual_columns(info.requested_virtual_columns)
     , file_iterator(file_iterator_)
-    , download_thread_num(download_thread_num_)
+    , max_parsing_threads(max_parsing_threads_)
     , create_reader_pool(CurrentMetrics::StorageS3Threads, CurrentMetrics::StorageS3ThreadsActive, 1)
     , create_reader_scheduler(threadPoolCallbackRunner<ReaderHolder>(create_reader_pool, "CreateS3Reader"))
 {
@@ -573,9 +573,17 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()

     auto read_buf = createS3ReadBuffer(key_with_info.key, object_size);
     auto input_format = FormatFactory::instance().getInput(
-        format, *read_buf, sample_block, getContext(), max_block_size,
-        format_settings, std::nullopt, std::nullopt,
-        /* is_remote_fs */ true, compression_method);
+        format,
+        *read_buf,
+        sample_block,
+        getContext(),
+        max_block_size,
+        format_settings,
+        max_parsing_threads,
+        /* max_download_threads= */ std::nullopt,
+        /* is_remote_fs */ true,
+        compression_method);

     if (query_info.has_value())
         input_format->setQueryInfo(query_info.value(), getContext());
@@ -1035,7 +1043,9 @@ Pipe StorageS3::read(

     auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());

-    const size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
+    const size_t max_threads = local_context->getSettingsRef().max_threads;
+    const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / num_streams);
+
     for (size_t i = 0; i < num_streams; ++i)
     {
         pipes.emplace_back(std::make_shared<StorageS3Source>(
@@ -1051,7 +1061,7 @@ Pipe StorageS3::read(
             query_configuration.url.bucket,
             query_configuration.url.version_id,
             iterator_wrapper,
-            max_download_threads,
+            max_parsing_threads,
             query_info));
     }
src/Storages/StorageS3.h

@@ -130,7 +130,7 @@ public:
         const String & bucket,
         const String & version_id,
         std::shared_ptr<IIterator> file_iterator_,
-        size_t download_thread_num,
+        size_t max_parsing_threads,
         std::optional<SelectQueryInfo> query_info);

     ~StorageS3Source() override;
@@ -218,7 +218,7 @@ private:

     NamesAndTypesList requested_virtual_columns;
     std::shared_ptr<IIterator> file_iterator;
-    size_t download_thread_num = 1;
+    size_t max_parsing_threads = 1;

     Poco::Logger * log = &Poco::Logger::get("StorageS3Source");
src/Storages/StorageURL.cpp

@@ -221,7 +221,7 @@ StorageURLSource::StorageURLSource(
     UInt64 max_block_size,
     const ConnectionTimeouts & timeouts,
     CompressionMethod compression_method,
-    size_t download_threads,
+    size_t max_parsing_threads,
     const SelectQueryInfo & query_info,
     const HTTPHeaderEntries & headers_,
     const URIParams & params,
@@ -275,7 +275,6 @@ StorageURLSource::StorageURLSource(
         file_progress_callback(FileProgress(0, file_size));
     }

-    // TODO: Pass max_parsing_threads and max_download_threads adjusted for num_streams.
     input_format = FormatFactory::instance().getInput(
         format,
         *read_buf,
@@ -283,9 +282,9 @@ StorageURLSource::StorageURLSource(
         context,
         max_block_size,
         format_settings,
-        download_threads,
-        /*max_download_threads*/ std::nullopt,
-        /* is_remote_fs */ true,
+        max_parsing_threads,
+        /* max_download_threads= */ std::nullopt,
+        /* is_remote_fs= */ true,
         compression_method);
     input_format->setQueryInfo(query_info, context);

@@ -706,8 +705,6 @@ Pipe IStorageURLBase::read(
 {
     auto params = getReadURIParams(column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size);

-    size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
-
     std::shared_ptr<StorageURLSource::IteratorWrapper> iterator_wrapper{nullptr};
     bool is_url_with_globs = urlWithGlobs(uri);
     size_t max_addresses = local_context->getSettingsRef().glob_expansion_max_elements;
@@ -754,7 +751,9 @@ Pipe IStorageURLBase::read(
     Pipes pipes;
     pipes.reserve(num_streams);

-    size_t download_threads = num_streams >= max_download_threads ? 1 : (max_download_threads / num_streams);
+    const size_t max_threads = local_context->getSettingsRef().max_threads;
+    const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / num_streams);
+
     for (size_t i = 0; i < num_streams; ++i)
     {
         pipes.emplace_back(std::make_shared<StorageURLSource>(
@@ -775,7 +774,7 @@ Pipe IStorageURLBase::read(
             max_block_size,
             getHTTPTimeouts(local_context),
             compression_method,
-            download_threads,
+            max_parsing_threads,
             query_info,
             headers,
             params,
@@ -793,7 +792,7 @@ Pipe StorageURLWithFailover::read(
     ContextPtr local_context,
     QueryProcessingStage::Enum processed_stage,
     size_t max_block_size,
-    size_t /*num_streams*/)
+    size_t num_streams)
 {
     auto params = getReadURIParams(column_names, storage_snapshot, query_info, local_context, processed_stage, max_block_size);

@@ -807,6 +806,9 @@ Pipe StorageURLWithFailover::read(

     auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(), getVirtuals());

+    const size_t max_threads = local_context->getSettingsRef().max_threads;
+    const size_t max_parsing_threads = num_streams >= max_threads ? 1 : (max_threads / num_streams);
+
     auto pipe = Pipe(std::make_shared<StorageURLSource>(
         read_from_format_info,
         iterator_wrapper,
@@ -819,7 +821,7 @@ Pipe StorageURLWithFailover::read(
         max_block_size,
         getHTTPTimeouts(local_context),
         compression_method,
-        local_context->getSettingsRef().max_download_threads,
+        max_parsing_threads,
         query_info,
         headers,
         params));
src/Storages/StorageURL.h

@@ -170,7 +170,7 @@ public:
         UInt64 max_block_size,
         const ConnectionTimeouts & timeouts,
         CompressionMethod compression_method,
-        size_t download_threads,
+        size_t max_parsing_threads,
         const SelectQueryInfo & query_info,
         const HTTPHeaderEntries & headers_ = {},
         const URIParams & params = {},