Do not create ThreadPool for single thread.

This commit is contained in:
Nikolai Kochetov 2020-07-22 14:51:35 +03:00
parent da4a66b443
commit b27066389a

View File

@ -571,42 +571,50 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
/// Let's find what range to read from each part.
{
/// Parallel loading of data parts.
size_t num_threads = std::min(size_t(num_streams), parts.size());
ThreadPool pool(num_threads);
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
auto process_part = [&](size_t part_index)
{
pool.scheduleOrThrowOnError([&, part_index]
auto & part = parts[part_index];
RangesInDataPart ranges(part, part_index);
if (metadata_snapshot->hasPrimaryKey())
ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings, log);
else
{
auto & part = parts[part_index];
RangesInDataPart ranges(part, part_index);
if (metadata_snapshot->hasPrimaryKey())
ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings, log);
else
size_t total_marks_count = part->getMarksCount();
if (total_marks_count)
{
size_t total_marks_count = part->getMarksCount();
if (total_marks_count)
{
if (part->index_granularity.hasFinalMark())
--total_marks_count;
ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}};
}
if (part->index_granularity.hasFinalMark())
--total_marks_count;
ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}};
}
}
for (const auto & index_and_condition : useful_indices)
ranges.ranges = filterMarksUsingIndex(
for (const auto & index_and_condition : useful_indices)
ranges.ranges = filterMarksUsingIndex(
index_and_condition.first, index_and_condition.second, part, ranges.ranges, settings, reader_settings, log);
if (!ranges.ranges.empty())
parts_with_ranges[part_index] = std::move(ranges);
});
}
if (!ranges.ranges.empty())
parts_with_ranges[part_index] = std::move(ranges);
};
pool.wait();
size_t num_threads = std::min(size_t(num_streams), parts.size());
if (num_threads <= 1)
{
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
process_part(part_index);
}
else
{
/// Parallel loading of data parts.
ThreadPool pool(num_threads);
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
pool.scheduleOrThrowOnError([&, part_index] { process_part(part_index); });
pool.wait();
}
/// Skip empty ranges.
size_t next_part = 0;