Parallelize PK range and skipping index stages

This runs the PK lookup and skipping index stages on parts
in parallel, as described in #11564.

While #12277 sped up PK lookups, the skipping index stage
may still be a bottleneck in a select query. Here we
parallelize both stages across parts.
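To illustrate the pattern, here is a minimal standalone sketch (an assumption for illustration, not the actual ClickHouse code): one task per part runs both stages, and a mutex serializes only the cheap merge into the shared results. The Part type and the two stage functions are hypothetical stand-ins for markRangesFromPKRange and filterMarksUsingIndex, and std::thread replaces ClickHouse's internal ThreadPool, which additionally caps concurrency at min(num_streams, parts.size()).

// Minimal sketch of per-part parallelization of both index stages.
#include <cstddef>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

struct Part { size_t marks = 0; };
struct MarkRanges { std::vector<std::pair<size_t, size_t>> ranges; };

// Hypothetical stand-ins for markRangesFromPKRange / filterMarksUsingIndex.
MarkRanges primaryKeyStage(const Part & part) { return {{{0, part.marks}}}; }
MarkRanges skippingIndexStage(const Part &, MarkRanges ranges) { return ranges; }

int main()
{
    std::vector<Part> parts(8, Part{100});
    std::vector<MarkRanges> parts_with_ranges;
    size_t sum_ranges = 0;
    std::mutex mutex;

    std::vector<std::thread> pool;
    for (size_t i = 0; i < parts.size(); ++i)
    {
        pool.emplace_back([&, i]
        {
            // Both stages are independent per part, so parts proceed in parallel.
            MarkRanges ranges = skippingIndexStage(parts[i], primaryKeyStage(parts[i]));
            if (!ranges.ranges.empty())
            {
                std::lock_guard<std::mutex> lock(mutex); // guard shared state only
                sum_ranges += ranges.ranges.size();
                parts_with_ranges.push_back(std::move(ranges));
            }
        });
    }
    for (auto & t : pool)
        t.join(); // analogous to pool.wait() before logging the totals
}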

On a query that uses a bloom filter skipping index to pick
2,688 rows out of 8,273,114,994 over a two-day time span,
this change reduces latency from 10.5s to 1.5s.
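For intuition on where that time goes: a bloom filter skipping index stores one small filter per granule of rows, and the stage walks every granule in the surviving mark ranges, keeping only granules whose filter might contain the value. Below is a hedged sketch of that pruning loop, with a hypothetical mayContain predicate standing in for the real bloom filter lookup:

#include <cstddef>
#include <functional>
#include <vector>

struct MarkRange { size_t begin = 0, end = 0; };

// Prune mark ranges granule by granule: keep only granules whose filter
// might contain the queried value. mayContain is a hypothetical stand-in
// for a per-granule bloom filter check (false positives allowed,
// false negatives not).
std::vector<MarkRange> filterMarks(
    const std::vector<MarkRange> & ranges,
    const std::function<bool(size_t)> & mayContain)
{
    std::vector<MarkRange> result;
    for (const auto & range : ranges)
        for (size_t mark = range.begin; mark < range.end; ++mark)
            if (mayContain(mark))
            {
                // Extend the previous kept range if contiguous, else start a new one.
                if (!result.empty() && result.back().end == mark)
                    ++result.back().end;
                else
                    result.push_back({mark, mark + 1});
            }
    return result;
}

int main()
{
    // 1000 granules, only granules 41 and 42 might contain the value:
    // the read shrinks from 1000 marks to 2.
    std::vector<MarkRange> all{{0, 1000}};
    auto pruned = filterMarks(all, [](size_t mark) { return mark == 41 || mark == 42; });
    (void)pruned; // pruned == {{41, 43}}
}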
Ivan Babrou 2020-07-19 21:43:10 -07:00
parent b8cc2bee53
commit 72622a9b00

@@ -569,39 +569,55 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
         useful_indices.emplace_back(index_helper, condition);
     }
 
+    /// Parallel loading of data parts.
+    size_t num_threads = std::min(size_t(num_streams), parts.size());
+    std::mutex mutex;
+    ThreadPool pool(num_threads);
+
     /// Let's find what range to read from each part.
     size_t sum_marks = 0;
     size_t sum_ranges = 0;
 
-    for (auto & part : parts)
+    for (size_t i = 0; i < parts.size(); ++i)
     {
-        RangesInDataPart ranges(part, part_index++);
-
-        if (metadata_snapshot->hasPrimaryKey())
-            ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings);
-        else
-        {
-            size_t total_marks_count = part->getMarksCount();
-            if (total_marks_count)
+        pool.scheduleOrThrowOnError([&, i]
+        {
+            auto & part = parts[i];
+            RangesInDataPart ranges(part, part_index++);
+
+            if (metadata_snapshot->hasPrimaryKey())
+                ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings);
+            else
             {
-                if (part->index_granularity.hasFinalMark())
-                    --total_marks_count;
-                ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}};
+                size_t total_marks_count = part->getMarksCount();
+                if (total_marks_count)
+                {
+                    if (part->index_granularity.hasFinalMark())
+                        --total_marks_count;
+                    ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}};
+                }
            }
-        }
 
-        for (const auto & index_and_condition : useful_indices)
-            ranges.ranges = filterMarksUsingIndex(
-                    index_and_condition.first, index_and_condition.second, part, ranges.ranges, settings, reader_settings);
+            for (const auto & index_and_condition : useful_indices)
+                ranges.ranges = filterMarksUsingIndex(
+                        index_and_condition.first, index_and_condition.second, part, ranges.ranges, settings, reader_settings);
 
-        if (!ranges.ranges.empty())
-        {
-            parts_with_ranges.push_back(ranges);
+            if (!ranges.ranges.empty())
+            {
+                std::lock_guard loading_lock(mutex);
 
-            sum_ranges += ranges.ranges.size();
-            sum_marks += ranges.getMarksCount();
-        }
+                parts_with_ranges.push_back(ranges);
+
+                sum_ranges += ranges.ranges.size();
+                sum_marks += ranges.getMarksCount();
+            }
+        });
     }
 
+    pool.wait();
+
     LOG_DEBUG(log, "Selected {} parts by date, {} parts by key, {} marks to read from {} ranges", parts.size(), parts_with_ranges.size(), sum_marks, sum_ranges);
 
     if (parts_with_ranges.empty())
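A design note on the diff above: the lock_guard covers only the push into parts_with_ranges and the two counter updates, so the expensive index scans run lock-free, and pool.wait() makes the totals safe to read in LOG_DEBUG. An alternative sketch (an assumption, not what this commit does) returns per-part results through std::future and merges them single-threaded, trading the mutex for a serial join; note that unlike the capped ThreadPool above, plain std::async does not limit concurrency to num_streams:

// Alternative sketch (hypothetical): one future per part, lock-free merge.
#include <cstddef>
#include <future>
#include <utility>
#include <vector>

struct MarkRanges { std::vector<std::pair<size_t, size_t>> ranges; };

// Hypothetical per-part computation combining both index stages.
MarkRanges computeRanges(size_t part_marks) { return {{{0, part_marks}}}; }

int main()
{
    std::vector<size_t> part_marks{100, 200, 300};

    std::vector<std::future<MarkRanges>> futures;
    futures.reserve(part_marks.size());
    for (size_t marks : part_marks)
        futures.push_back(std::async(std::launch::async, computeRanges, marks));

    std::vector<MarkRanges> parts_with_ranges;
    for (auto & f : futures)
    {
        MarkRanges r = f.get(); // merge happens on one thread, no mutex needed
        if (!r.ranges.empty())
            parts_with_ranges.push_back(std::move(r));
    }
}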