From 72622a9b0074b88853bb111dbeb68394e5fbcd56 Mon Sep 17 00:00:00 2001
From: Ivan Babrou
Date: Sun, 19 Jul 2020 21:43:10 -0700
Subject: [PATCH 1/9] Parallelize PK range and skipping index stages

This runs PK lookup and skipping index stages on parts in parallel, as
described in #11564.

While #12277 sped up PK lookups, the skipping index stage may still be a
bottleneck in a select query. Here we parallelize both stages between parts.

On a query that uses a bloom filter skipping index to pick 2,688 rows out of
8,273,114,994 on a two-day time span, this change reduces latency from 10.5s
to 1.5s.
---
 .../MergeTree/MergeTreeDataSelectExecutor.cpp | 58 ++++++++++++-------
 1 file changed, 37 insertions(+), 21 deletions(-)

diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
index 306bcd9000a..e2294ea91ad 100644
--- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -569,39 +569,55 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
             useful_indices.emplace_back(index_helper, condition);
     }
 
+    /// Parallel loading of data parts.
+    size_t num_threads = std::min(size_t(num_streams), parts.size());
+
+    std::mutex mutex;
+
+    ThreadPool pool(num_threads);
+
     /// Let's find what range to read from each part.
     size_t sum_marks = 0;
     size_t sum_ranges = 0;
-    for (auto & part : parts)
+    for (size_t i = 0; i < parts.size(); ++i)
     {
-        RangesInDataPart ranges(part, part_index++);
-
-        if (metadata_snapshot->hasPrimaryKey())
-            ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings);
-        else
+        pool.scheduleOrThrowOnError([&, i]
         {
-            size_t total_marks_count = part->getMarksCount();
-            if (total_marks_count)
+            auto & part = parts[i];
+
+            RangesInDataPart ranges(part, part_index++);
+
+            if (metadata_snapshot->hasPrimaryKey())
+                ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings);
+            else
             {
-                if (part->index_granularity.hasFinalMark())
-                    --total_marks_count;
-                ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}};
+                size_t total_marks_count = part->getMarksCount();
+                if (total_marks_count)
+                {
+                    if (part->index_granularity.hasFinalMark())
+                        --total_marks_count;
+                    ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}};
+                }
             }
-        }
 
-        for (const auto & index_and_condition : useful_indices)
-            ranges.ranges = filterMarksUsingIndex(
-                index_and_condition.first, index_and_condition.second, part, ranges.ranges, settings, reader_settings);
+            for (const auto & index_and_condition : useful_indices)
+                ranges.ranges = filterMarksUsingIndex(
+                    index_and_condition.first, index_and_condition.second, part, ranges.ranges, settings, reader_settings);
 
-        if (!ranges.ranges.empty())
-        {
-            parts_with_ranges.push_back(ranges);
+            if (!ranges.ranges.empty())
+            {
+                std::lock_guard loading_lock(mutex);
 
-            sum_ranges += ranges.ranges.size();
-            sum_marks += ranges.getMarksCount();
-        }
+                parts_with_ranges.push_back(ranges);
+
+                sum_ranges += ranges.ranges.size();
+                sum_marks += ranges.getMarksCount();
+            }
+        });
     }
 
+    pool.wait();
+
     LOG_DEBUG(log, "Selected {} parts by date, {} parts by key, {} marks to read from {} ranges", parts.size(), parts_with_ranges.size(), sum_marks, sum_ranges);
 
     if (parts_with_ranges.empty())

From 0cc55781d88e83fdd3b8c653f5b61129fc8f02c6 Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Mon, 20 Jul 2020 18:09:00 +0300
Subject: [PATCH 2/9] Try fix tests.
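
This restructures the parallel loop from the previous commit: each task now
writes its result into its own slot of a vector sized to parts.size(), and
the non-empty slots are compacted afterwards, so no mutex is needed around
shared state. A minimal, self-contained sketch of that pattern follows
(illustrative only: plain std::thread instead of ClickHouse's ThreadPool,
and a placeholder PartRanges type standing in for RangesInDataPart):

    #include <algorithm>
    #include <cstddef>
    #include <thread>
    #include <utility>
    #include <vector>

    // Placeholder for the per-part result; the real code stores RangesInDataPart.
    struct PartRanges
    {
        std::vector<std::pair<std::size_t, std::size_t>> ranges;   // selected mark ranges
        bool empty() const { return ranges.empty(); }
    };

    int main()
    {
        const std::size_t num_parts = 16;
        const std::size_t num_streams = 4;

        // One slot per part: each task touches only its own slot, no locking.
        std::vector<PartRanges> parts_with_ranges(num_parts);

        auto process_part = [&](std::size_t part_index)
        {
            // Stand-in for the PK range and skipping index stages.
            if (part_index % 2 == 0)
                parts_with_ranges[part_index].ranges.emplace_back(0, part_index + 1);
        };

        const std::size_t num_threads = std::min(num_streams, num_parts);
        std::vector<std::thread> workers;
        for (std::size_t t = 0; t < num_threads; ++t)
            workers.emplace_back([&, t]
            {
                for (std::size_t part_index = t; part_index < num_parts; part_index += num_threads)
                    process_part(part_index);
            });
        for (auto & worker : workers)
            worker.join();

        // Compact: keep only parts that still have ranges to read, preserving order.
        std::size_t next_part = 0;
        for (std::size_t part_index = 0; part_index < num_parts; ++part_index)
        {
            if (parts_with_ranges[part_index].empty())
                continue;
            if (next_part != part_index)
                std::swap(parts_with_ranges[next_part], parts_with_ranges[part_index]);
            ++next_part;
        }
        parts_with_ranges.resize(next_part);
    }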
---
 .../MergeTree/MergeTreeDataSelectExecutor.cpp | 96 ++++++++++---------
 .../MergeTree/MergeTreeDataSelectExecutor.h   | 10 +-
 2 files changed, 59 insertions(+), 47 deletions(-)

diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
index e2294ea91ad..3033bc8a5f1 100644
--- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -112,7 +112,7 @@ size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead(
 
     for (const auto & part : parts)
     {
-        MarkRanges ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings);
+        MarkRanges ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings, log);
 
         /** In order to get a lower bound on the number of rows that match the condition on PK,
           *  consider only guaranteed full marks.
@@ -173,8 +173,6 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
     const unsigned num_streams,
     const PartitionIdToMaxBlock * max_block_numbers_to_read) const
 {
-    size_t part_index = 0;
-
     /// If query contains restrictions on the virtual column `_part` or `_part_index`, select only parts suitable for it.
     /// The virtual column `_sample_factor` (which is equal to 1 / used sample rate) can be requested in the query.
     Names virt_column_names;
@@ -557,8 +555,6 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
     if (select.prewhere())
         prewhere_column = select.prewhere()->getColumnName();
 
-    RangesInDataParts parts_with_ranges;
-
     std::vector<std::pair<MergeTreeIndexPtr, MergeTreeIndexConditionPtr>> useful_indices;
 
     for (const auto & index : metadata_snapshot->getSecondaryIndices())
@@ -569,55 +565,67 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
         useful_indices.emplace_back(index_helper, condition);
     }
 
-    /// Parallel loading of data parts.
-    size_t num_threads = std::min(size_t(num_streams), parts.size());
-
-    std::mutex mutex;
-
-    ThreadPool pool(num_threads);
-
-    /// Let's find what range to read from each part.
+    RangesInDataParts parts_with_ranges(parts.size());
     size_t sum_marks = 0;
     size_t sum_ranges = 0;
-    for (size_t i = 0; i < parts.size(); ++i)
+
+    /// Let's find what range to read from each part.
     {
-        pool.scheduleOrThrowOnError([&, i]
+
+        /// Parallel loading of data parts.
+        size_t num_threads = std::min(size_t(num_streams), parts.size());
+        ThreadPool pool(num_threads);
+
+        for (size_t part_index = 0; part_index < parts.size(); ++part_index)
         {
-            auto & part = parts[i];
-
-            RangesInDataPart ranges(part, part_index++);
-
-            if (metadata_snapshot->hasPrimaryKey())
-                ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings);
-            else
+            pool.scheduleOrThrowOnError([&, part_index]
             {
-                size_t total_marks_count = part->getMarksCount();
-                if (total_marks_count)
+                auto & part = parts[part_index];
+
+                RangesInDataPart ranges(part, part_index);
+
+                if (metadata_snapshot->hasPrimaryKey())
+                    ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings, log);
+                else
                 {
-                    if (part->index_granularity.hasFinalMark())
-                        --total_marks_count;
-                    ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}};
+                    size_t total_marks_count = part->getMarksCount();
+                    if (total_marks_count)
+                    {
+                        if (part->index_granularity.hasFinalMark())
+                            --total_marks_count;
+                        ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}};
+                    }
                 }
-            }
 
-            for (const auto & index_and_condition : useful_indices)
-                ranges.ranges = filterMarksUsingIndex(
-                    index_and_condition.first, index_and_condition.second, part, ranges.ranges, settings, reader_settings);
+                for (const auto & index_and_condition : useful_indices)
+                    ranges.ranges = filterMarksUsingIndex(
+                        index_and_condition.first, index_and_condition.second, part, ranges.ranges, settings, reader_settings, log);
 
-            if (!ranges.ranges.empty())
-            {
-                std::lock_guard loading_lock(mutex);
+                if (!ranges.ranges.empty())
+                    parts_with_ranges[part_index] = std::move(ranges);
+            });
+        }
 
-                parts_with_ranges.push_back(ranges);
+        pool.wait();
 
-                sum_ranges += ranges.ranges.size();
-                sum_marks += ranges.getMarksCount();
-            }
-        });
+        /// Skip empty ranges.
+        size_t next_part = 0;
+        for (size_t part_index = 0; part_index < parts.size(); ++part_index)
+        {
+            auto & part = parts_with_ranges[part_index];
+            if (!part.data_part)
+                continue;
+
+            sum_ranges += part.ranges.size();
+            sum_marks += part.getMarksCount();
+
+            if (next_part != part_index)
+                std::swap(parts_with_ranges[next_part], part);
+
+            ++next_part;
+        }
     }
 
-    pool.wait();
-
     LOG_DEBUG(log, "Selected {} parts by date, {} parts by key, {} marks to read from {} ranges", parts.size(), parts_with_ranges.size(), sum_marks, sum_ranges);
 
     if (parts_with_ranges.empty())
@@ -1308,7 +1316,8 @@ MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
     const MergeTreeData::DataPartPtr & part,
     const StorageMetadataPtr & metadata_snapshot,
     const KeyCondition & key_condition,
-    const Settings & settings) const
+    const Settings & settings,
+    Poco::Logger * log)
 {
     MarkRanges res;
 
@@ -1515,7 +1524,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
     MergeTreeData::DataPartPtr part,
     const MarkRanges & ranges,
     const Settings & settings,
-    const MergeTreeReaderSettings & reader_settings) const
+    const MergeTreeReaderSettings & reader_settings,
+    Poco::Logger * log)
 {
     if (!part->volume->getDisk()->exists(part->getFullRelativePath() + index_helper->getFileName() + ".idx"))
     {
diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h
index 52d00546a05..9e37d150bd1 100644
--- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h
+++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h
@@ -95,19 +95,21 @@ private:
         const KeyCondition & key_condition,
         const Settings & settings) const;
 
-    MarkRanges markRangesFromPKRange(
+    static MarkRanges markRangesFromPKRange(
        const MergeTreeData::DataPartPtr & part,
        const StorageMetadataPtr & metadata_snapshot,
        const KeyCondition & key_condition,
-        const Settings & settings) const;
+        const Settings & settings,
+        Poco::Logger * log);
 
-    MarkRanges filterMarksUsingIndex(
+    static MarkRanges filterMarksUsingIndex(
        MergeTreeIndexPtr index_helper,
        MergeTreeIndexConditionPtr condition,
        MergeTreeData::DataPartPtr part,
        const MarkRanges & ranges,
        const Settings & settings,
-        const MergeTreeReaderSettings & reader_settings) const;
+        const MergeTreeReaderSettings & reader_settings,
+        Poco::Logger * log);
 };
 
 }

From 12c5e376c60e275f9a1a36f1c6fca4c321e4db7f Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Tue, 21 Jul 2020 14:02:58 +0300
Subject: [PATCH 3/9] Remove mutable from RPNElement.

---
 src/Storages/MergeTree/KeyCondition.cpp | 6 +++---
 src/Storages/MergeTree/KeyCondition.h   | 4 ++--
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/Storages/MergeTree/KeyCondition.cpp b/src/Storages/MergeTree/KeyCondition.cpp
index 79bbc0e7216..970e32f2a70 100644
--- a/src/Storages/MergeTree/KeyCondition.cpp
+++ b/src/Storages/MergeTree/KeyCondition.cpp
@@ -463,7 +463,7 @@ static Field applyFunctionForField(
     return (*block.safeGetByPosition(1).column)[0];
 }
 
-static FieldRef applyFunction(FunctionBasePtr & func, const DataTypePtr & current_type, const FieldRef & field)
+static FieldRef applyFunction(const FunctionBasePtr & func, const DataTypePtr & current_type, const FieldRef & field)
 {
     /// Fallback for fields without block reference.
     if (field.isExplicit())
@@ -1098,10 +1098,10 @@ BoolMask KeyCondition::checkInRange(
 
 std::optional<Range> KeyCondition::applyMonotonicFunctionsChainToRange(
     Range key_range,
-    MonotonicFunctionsChain & functions,
+    const MonotonicFunctionsChain & functions,
     DataTypePtr current_type)
 {
-    for (auto & func : functions)
+    for (const auto & func : functions)
     {
         /// We check the monotonicity of each function on a specific range.
         IFunction::Monotonicity monotonicity = func->getMonotonicityForRange(
diff --git a/src/Storages/MergeTree/KeyCondition.h b/src/Storages/MergeTree/KeyCondition.h
index a37af2d677b..1f2d2f41718 100644
--- a/src/Storages/MergeTree/KeyCondition.h
+++ b/src/Storages/MergeTree/KeyCondition.h
@@ -306,7 +306,7 @@ public:
 
     static std::optional<Range> applyMonotonicFunctionsChainToRange(
         Range key_range,
-        MonotonicFunctionsChain & functions,
+        const MonotonicFunctionsChain & functions,
         DataTypePtr current_type);
 
     bool matchesExactContinuousRange() const;
@@ -349,7 +349,7 @@ private:
         using MergeTreeSetIndexPtr = std::shared_ptr<MergeTreeSetIndex>;
         MergeTreeSetIndexPtr set_index;
 
-        mutable MonotonicFunctionsChain monotonic_functions_chain; /// The function execution does not violate the constancy.
+        MonotonicFunctionsChain monotonic_functions_chain;
     };
 
     using RPN = std::vector<RPNElement>;

From 2759d5b6ad80cfd49238638f152c817c5d66f8f9 Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Tue, 21 Jul 2020 14:13:35 +0300
Subject: [PATCH 4/9] Make MergeTreeSetIndex::checkInRange const.

---
 src/Interpreters/Set.cpp | 20 ++++++++++----------
 src/Interpreters/Set.h   |  5 +----
 2 files changed, 11 insertions(+), 14 deletions(-)

diff --git a/src/Interpreters/Set.cpp b/src/Interpreters/Set.cpp
index f331f3cecb3..9f7d0606e4b 100644
--- a/src/Interpreters/Set.cpp
+++ b/src/Interpreters/Set.cpp
@@ -473,18 +473,9 @@ MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector
-        left_point.emplace_back(ordered_set[i]->cloneEmpty());
-        right_point.emplace_back(ordered_set[i]->cloneEmpty());
-    }
-
     Block block_to_sort;
     SortDescription sort_description;
     for (size_t i = 0; i < tuple_size; ++i)
@@ -504,10 +495,19 @@ MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector
-BoolMask MergeTreeSetIndex::checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types)
+BoolMask MergeTreeSetIndex::checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types) const
 {
     size_t tuple_size = indexes_mapping.size();
 
+    ColumnsWithInfinity left_point(tuple_size);
+    ColumnsWithInfinity right_point(tuple_size);
+
+    for (size_t i = 0; i < tuple_size; ++i)
+    {
+        left_point.emplace_back(ordered_set[i]->cloneEmpty());
+        right_point.emplace_back(ordered_set[i]->cloneEmpty());
+    }
+
     bool invert_left_infinities = false;
     bool invert_right_infinities = false;
 
diff --git a/src/Interpreters/Set.h b/src/Interpreters/Set.h
index 933bace5e45..4df89831896 100644
--- a/src/Interpreters/Set.h
+++ b/src/Interpreters/Set.h
@@ -234,16 +234,13 @@ public:
 
     bool hasMonotonicFunctionsChain() const;
 
-    BoolMask checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types);
+    BoolMask checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types) const;
 
 private:
     Columns ordered_set;
     std::vector<KeyTuplePositionMapping> indexes_mapping;
 
     using ColumnsWithInfinity = std::vector<ValueWithInfinity>;
-
-    ColumnsWithInfinity left_point;
-    ColumnsWithInfinity right_point;
 };
 
 }

From 62a2f9819e372b0e6728297267c3d8deb5719fb8 Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Tue, 21 Jul 2020 14:15:40 +0300
Subject: [PATCH 5/9] Make MergeTreeSetIndex::checkInRange const.
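
For context on the change below (a standalone illustration, not ClickHouse
code): constructing a std::vector with a size argument creates that many
value-initialized elements, so following it with emplace_back in a loop ends
up with 2 * tuple_size entries, the first tuple_size of them empty. reserve()
allocates capacity while keeping the size at zero, which is what the loop
needs here. A small example of the difference:

    #include <cassert>
    #include <cstddef>
    #include <vector>

    int main()
    {
        const std::size_t tuple_size = 3;

        std::vector<int> sized(tuple_size);        // already holds 3 value-initialized elements
        for (std::size_t i = 0; i < tuple_size; ++i)
            sized.emplace_back(static_cast<int>(i));
        assert(sized.size() == 2 * tuple_size);    // 6 elements: 0, 0, 0, 0, 1, 2

        std::vector<int> reserved;
        reserved.reserve(tuple_size);              // capacity only, size stays 0
        for (std::size_t i = 0; i < tuple_size; ++i)
            reserved.emplace_back(static_cast<int>(i));
        assert(reserved.size() == tuple_size);     // exactly the 3 intended elements
    }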
---
 src/Interpreters/Set.cpp | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/src/Interpreters/Set.cpp b/src/Interpreters/Set.cpp
index 9f7d0606e4b..07bd574a197 100644
--- a/src/Interpreters/Set.cpp
+++ b/src/Interpreters/Set.cpp
@@ -499,8 +499,10 @@ BoolMask MergeTreeSetIndex::checkInRange(const std::vector<Range> & key_ranges,
 {
     size_t tuple_size = indexes_mapping.size();
 
-    ColumnsWithInfinity left_point(tuple_size);
-    ColumnsWithInfinity right_point(tuple_size);
+    ColumnsWithInfinity left_point;
+    ColumnsWithInfinity right_point;
+    left_point.reserve(tuple_size);
+    right_point.reserve(tuple_size);
 
     for (size_t i = 0; i < tuple_size; ++i)
     {

From 755f15def3206ad4e670cec0908b3e8b56aac532 Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Tue, 21 Jul 2020 14:22:45 +0300
Subject: [PATCH 6/9] Make MergeTreeSetIndex::checkInRange const.

---
 src/Storages/MergeTree/KeyCondition.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/Storages/MergeTree/KeyCondition.h b/src/Storages/MergeTree/KeyCondition.h
index 1f2d2f41718..39b6368cddc 100644
--- a/src/Storages/MergeTree/KeyCondition.h
+++ b/src/Storages/MergeTree/KeyCondition.h
@@ -346,7 +346,7 @@ private:
         Range range;
         size_t key_column = 0;
         /// For FUNCTION_IN_SET, FUNCTION_NOT_IN_SET
-        using MergeTreeSetIndexPtr = std::shared_ptr<MergeTreeSetIndex>;
+        using MergeTreeSetIndexPtr = std::shared_ptr<const MergeTreeSetIndex>;
         MergeTreeSetIndexPtr set_index;
 
         MonotonicFunctionsChain monotonic_functions_chain;
     };
 
     using RPN = std::vector<RPNElement>;

From 486a4932c39715568b1a8a37df64d141ab144b4a Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Tue, 21 Jul 2020 17:08:18 +0300
Subject: [PATCH 7/9] Fix tests.

---
 src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
index 3033bc8a5f1..4d5c919311a 100644
--- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -624,6 +624,8 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
 
             ++next_part;
         }
+
+        parts_with_ranges.resize(next_part);
     }
 
     LOG_DEBUG(log, "Selected {} parts by date, {} parts by key, {} marks to read from {} ranges", parts.size(), parts_with_ranges.size(), sum_marks, sum_ranges);
 
     if (parts_with_ranges.empty())

From da4a66b4436dc3ba750742dd8fbca36bb229e6c1 Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Tue, 21 Jul 2020 19:28:30 +0300
Subject: [PATCH 8/9] Added perftest.

---
 tests/performance/parallel_index.xml | 12 ++++++++++++
 1 file changed, 12 insertions(+)
 create mode 100644 tests/performance/parallel_index.xml

diff --git a/tests/performance/parallel_index.xml b/tests/performance/parallel_index.xml
new file mode 100644
index 00000000000..033e47ee8ee
--- /dev/null
+++ b/tests/performance/parallel_index.xml
@@ -0,0 +1,12 @@
+
+    create table test_parallel_index (x UInt64, y UInt64, z UInt64, INDEX a (y) TYPE minmax GRANULARITY 2,
+        INDEX b (z) TYPE set(8) GRANULARITY 2) engine = MergeTree order by x partition by bitAnd(x, 63 * 64) settings index_granularity = 4;
+
+    insert into test_parallel_index select number, number, number from numbers(1048576);
+
+    select sum(x) from test_parallel_index where toStartOfDay(toStartOfDay(toStartOfDay(toStartOfDay(toStartOfDay(toStartOfDay(toStartOfDay(toDateTime(x)))))))) in (select toDateTime(number * 8) from numbers(131072));
+    select sum(y) from test_parallel_index where toStartOfDay(toStartOfDay(toStartOfDay(toStartOfDay(toStartOfDay(toStartOfDay(toStartOfDay(toDateTime(y)))))))) in (select toDateTime(number * 8) from numbers(131072));
+    select sum(z) from test_parallel_index where z = 2 or z = 7 or z = 13 or z = 17 or z = 19 or z = 23;
+
+    drop table if exists test_parallel_index;
+

From b27066389ab8c5727483a1d65f6ff169724c2ec5 Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Wed, 22 Jul 2020 14:51:35 +0300
Subject: [PATCH 9/9] Do not create ThreadPool for single thread.

---
 .../MergeTree/MergeTreeDataSelectExecutor.cpp | 64 +++++++++++--------
 1 file changed, 36 insertions(+), 28 deletions(-)

diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
index 4d5c919311a..6df0d0ceabe 100644
--- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -571,42 +571,50 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
 
     /// Let's find what range to read from each part.
     {
-
-        /// Parallel loading of data parts.
-        size_t num_threads = std::min(size_t(num_streams), parts.size());
-        ThreadPool pool(num_threads);
-
-        for (size_t part_index = 0; part_index < parts.size(); ++part_index)
+        auto process_part = [&](size_t part_index)
         {
-            pool.scheduleOrThrowOnError([&, part_index]
+            auto & part = parts[part_index];
+
+            RangesInDataPart ranges(part, part_index);
+
+            if (metadata_snapshot->hasPrimaryKey())
+                ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings, log);
+            else
             {
-                auto & part = parts[part_index];
-
-                RangesInDataPart ranges(part, part_index);
-
-                if (metadata_snapshot->hasPrimaryKey())
-                    ranges.ranges = markRangesFromPKRange(part, metadata_snapshot, key_condition, settings, log);
-                else
+                size_t total_marks_count = part->getMarksCount();
+                if (total_marks_count)
                 {
-                    size_t total_marks_count = part->getMarksCount();
-                    if (total_marks_count)
-                    {
-                        if (part->index_granularity.hasFinalMark())
-                            --total_marks_count;
-                        ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}};
-                    }
+                    if (part->index_granularity.hasFinalMark())
+                        --total_marks_count;
+                    ranges.ranges = MarkRanges{MarkRange{0, total_marks_count}};
                 }
+            }
 
-                for (const auto & index_and_condition : useful_indices)
-                    ranges.ranges = filterMarksUsingIndex(
+            for (const auto & index_and_condition : useful_indices)
+                ranges.ranges = filterMarksUsingIndex(
                        index_and_condition.first, index_and_condition.second, part, ranges.ranges, settings, reader_settings, log);
 
-                if (!ranges.ranges.empty())
-                    parts_with_ranges[part_index] = std::move(ranges);
-            });
-        }
+            if (!ranges.ranges.empty())
+                parts_with_ranges[part_index] = std::move(ranges);
+        };
 
-        pool.wait();
+        size_t num_threads = std::min(size_t(num_streams), parts.size());
+
+        if (num_threads <= 1)
+        {
+            for (size_t part_index = 0; part_index < parts.size(); ++part_index)
+                process_part(part_index);
+        }
+        else
+        {
+            /// Parallel loading of data parts.
+            ThreadPool pool(num_threads);
+
+            for (size_t part_index = 0; part_index < parts.size(); ++part_index)
+                pool.scheduleOrThrowOnError([&, part_index] { process_part(part_index); });
+
+            pool.wait();
+        }
 
         /// Skip empty ranges.
         size_t next_part = 0;