Merge pull request #12589 from bobrik/ivan/parallel-ranges

Parallelize PK range and skipping index stages
This commit is contained in:
Nikolai Kochetov 2020-07-27 19:23:00 +03:00 committed by GitHub
commit abdd160db1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 101 additions and 52 deletions

View File

@ -473,18 +473,9 @@ MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector<K
size_t tuple_size = indexes_mapping.size();
ordered_set.resize(tuple_size);
/// Create columns for points here to avoid extra allocations at 'checkInRange'.
left_point.reserve(tuple_size);
right_point.reserve(tuple_size);
for (size_t i = 0; i < tuple_size; ++i)
{
ordered_set[i] = set_elements[indexes_mapping[i].tuple_index];
left_point.emplace_back(ordered_set[i]->cloneEmpty());
right_point.emplace_back(ordered_set[i]->cloneEmpty());
}
Block block_to_sort;
SortDescription sort_description;
for (size_t i = 0; i < tuple_size; ++i)
@ -504,10 +495,21 @@ MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector<K
* 1: the intersection of the set and the range is non-empty
* 2: the range contains elements not in the set
*/
BoolMask MergeTreeSetIndex::checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types)
BoolMask MergeTreeSetIndex::checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types) const
{
size_t tuple_size = indexes_mapping.size();
ColumnsWithInfinity left_point;
ColumnsWithInfinity right_point;
left_point.reserve(tuple_size);
right_point.reserve(tuple_size);
for (size_t i = 0; i < tuple_size; ++i)
{
left_point.emplace_back(ordered_set[i]->cloneEmpty());
right_point.emplace_back(ordered_set[i]->cloneEmpty());
}
bool invert_left_infinities = false;
bool invert_right_infinities = false;

View File

@ -234,16 +234,13 @@ public:
bool hasMonotonicFunctionsChain() const;
BoolMask checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types);
BoolMask checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types) const;
private:
Columns ordered_set;
std::vector<KeyTuplePositionMapping> indexes_mapping;
using ColumnsWithInfinity = std::vector<ValueWithInfinity>;
ColumnsWithInfinity left_point;
ColumnsWithInfinity right_point;
};
}

View File

@ -463,7 +463,7 @@ static Field applyFunctionForField(
return (*block.safeGetByPosition(1).column)[0];
}
static FieldRef applyFunction(FunctionBasePtr & func, const DataTypePtr & current_type, const FieldRef & field)
static FieldRef applyFunction(const FunctionBasePtr & func, const DataTypePtr & current_type, const FieldRef & field)
{
/// Fallback for fields without block reference.
if (field.isExplicit())
@ -1098,10 +1098,10 @@ BoolMask KeyCondition::checkInRange(
std::optional<Range> KeyCondition::applyMonotonicFunctionsChainToRange(
Range key_range,
MonotonicFunctionsChain & functions,
const MonotonicFunctionsChain & functions,
DataTypePtr current_type)
{
for (auto & func : functions)
for (const auto & func : functions)
{
/// We check the monotonicity of each function on a specific range.
IFunction::Monotonicity monotonicity = func->getMonotonicityForRange(

View File

@ -306,7 +306,7 @@ public:
static std::optional<Range> applyMonotonicFunctionsChainToRange(
Range key_range,
MonotonicFunctionsChain & functions,
const MonotonicFunctionsChain & functions,
DataTypePtr current_type);
bool matchesExactContinuousRange() const;
@ -346,10 +346,10 @@ private:
Range range;
size_t key_column = 0;
/// For FUNCTION_IN_SET, FUNCTION_NOT_IN_SET
using MergeTreeSetIndexPtr = std::shared_ptr<MergeTreeSetIndex>;
using MergeTreeSetIndexPtr = std::shared_ptr<const MergeTreeSetIndex>;
MergeTreeSetIndexPtr set_index;
mutable MonotonicFunctionsChain monotonic_functions_chain; /// The function execution does not violate the constancy.
MonotonicFunctionsChain monotonic_functions_chain;
};
using RPN = std::vector<RPNElement>;

View File

@ -112,7 +112,7 @@ size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead(
for (const auto & part : parts)
{
MarkRanges ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings);
MarkRanges ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings, log);
/** In order to get a lower bound on the number of rows that match the condition on PK,
* consider only guaranteed full marks.
@ -173,8 +173,6 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
const unsigned num_streams,
const PartitionIdToMaxBlock * max_block_numbers_to_read) const
{
size_t part_index = 0;
/// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it.
/// The virtual column `_sample_factor` (which is equal to 1 / used sample rate) can be requested in the query.
Names virt_column_names;
@ -557,8 +555,6 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
if (select.prewhere())
prewhere_column = select.prewhere()->getColumnName();
RangesInDataParts parts_with_ranges;
std::vector<std::pair<MergeTreeIndexPtr, MergeTreeIndexConditionPtr>> useful_indices;
for (const auto & index : metadata_snapshot->getSecondaryIndices())
@ -569,37 +565,75 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
useful_indices.emplace_back(index_helper, condition);
}
/// Let's find what range to read from each part.
RangesInDataParts parts_with_ranges(parts.size());
size_t sum_marks = 0;
size_t sum_ranges = 0;
for (auto & part : parts)
{
RangesInDataPart ranges(part, part_index++);
if (metadata_snapshot->hasPrimaryKey())
ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings);
/// Let's find what range to read from each part.
{
auto process_part = [&](size_t part_index)
{
auto & part = parts[part_index];
RangesInDataPart ranges(part, part_index);
if (metadata_snapshot->hasPrimaryKey())
ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings, log);
else
{
size_t total_marks_count = part->getMarksCount();
if (total_marks_count)
{
if (part->index_granularity.hasFinalMark())
--total_marks_count;
ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}};
}
}
for (const auto & index_and_condition : useful_indices)
ranges.ranges = filterMarksUsingIndex(
index_and_condition.first, index_and_condition.second, part, ranges.ranges, settings, reader_settings, log);
if (!ranges.ranges.empty())
parts_with_ranges[part_index] = std::move(ranges);
};
size_t num_threads = std::min(size_t(num_streams), parts.size());
if (num_threads <= 1)
{
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
process_part(part_index);
}
else
{
size_t total_marks_count = part->getMarksCount();
if (total_marks_count)
{
if (part->index_granularity.hasFinalMark())
--total_marks_count;
ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}};
}
/// Parallel loading of data parts.
ThreadPool pool(num_threads);
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
pool.scheduleOrThrowOnError([&, part_index] { process_part(part_index); });
pool.wait();
}
for (const auto & index_and_condition : useful_indices)
ranges.ranges = filterMarksUsingIndex(
index_and_condition.first, index_and_condition.second, part, ranges.ranges, settings, reader_settings);
if (!ranges.ranges.empty())
/// Skip empty ranges.
size_t next_part = 0;
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
{
parts_with_ranges.push_back(ranges);
auto & part = parts_with_ranges[part_index];
if (!part.data_part)
continue;
sum_ranges += ranges.ranges.size();
sum_marks += ranges.getMarksCount();
sum_ranges += part.ranges.size();
sum_marks += part.getMarksCount();
if (next_part != part_index)
std::swap(parts_with_ranges[next_part], part);
++next_part;
}
parts_with_ranges.resize(next_part);
}
LOG_DEBUG(log, "Selected {} parts by date, {} parts by key, {} marks to read from {} ranges", parts.size(), parts_with_ranges.size(), sum_marks, sum_ranges);
@ -1292,7 +1326,8 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
const MergeTreeData::DataPartPtr & part,
const StorageMetadataPtr & metadata_snapshot,
const KeyCondition & key_condition,
const Settings & settings) const
const Settings & settings,
Poco::Logger * log)
{
MarkRanges res;
@ -1499,7 +1534,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
MergeTreeData::DataPartPtr part,
const MarkRanges & ranges,
const Settings & settings,
const MergeTreeReaderSettings & reader_settings) const
const MergeTreeReaderSettings & reader_settings,
Poco::Logger * log)
{
if (!part->volume->getDisk()->exists(part->getFullRelativePath() + index_helper->getFileName() + ".idx"))
{

View File

@ -95,19 +95,21 @@ private:
const KeyCondition & key_condition,
const Settings & settings) const;
MarkRanges markRangesFromPKRange(
static MarkRanges markRangesFromPKRange(
const MergeTreeData::DataPartPtr & part,
const StorageMetadataPtr & metadata_snapshot,
const KeyCondition & key_condition,
const Settings & settings) const;
const Settings & settings,
Poco::Logger * log);
MarkRanges filterMarksUsingIndex(
static MarkRanges filterMarksUsingIndex(
MergeTreeIndexPtr index_helper,
MergeTreeIndexConditionPtr condition,
MergeTreeData::DataPartPtr part,
const MarkRanges & ranges,
const Settings & settings,
const MergeTreeReaderSettings & reader_settings) const;
const MergeTreeReaderSettings & reader_settings,
Poco::Logger * log);
};
}

View File

@ -0,0 +1,12 @@
<test>
<create_query>create table test_parallel_index (x UInt64, y UInt64, z UInt64, INDEX a (y) TYPE minmax GRANULARITY 2,
INDEX b (z) TYPE set(8) GRANULARITY 2) engine = MergeTree order by x partition by bitAnd(x, 63 * 64) settings index_granularity = 4;</create_query>
<fill_query>insert into test_parallel_index select number, number, number from numbers(1048576);</fill_query>
<query>select sum(x) from test_parallel_index where toStartOfDay(toStartOfDay(toStartOfDay(toStartOfDay(toStartOfDay(toStartOfDay(toStartOfDay(toDateTime(x)))))))) in (select toDateTime(number * 8) from numbers(131072));</query>
<query>select sum(y) from test_parallel_index where toStartOfDay(toStartOfDay(toStartOfDay(toStartOfDay(toStartOfDay(toStartOfDay(toStartOfDay(toDateTime(y)))))))) in (select toDateTime(number * 8) from numbers(131072));</query>
<query>select sum(z) from test_parallel_index where z = 2 or z = 7 or z = 13 or z = 17 or z = 19 or z = 23;</query>
<drop_query>drop table if exists test_parallel_index;</drop_query>
</test>