From b637699ccd07db2318468698705a5fa3b3e926a0 Mon Sep 17 00:00:00 2001
From: roman
Date: Thu, 13 Aug 2020 16:03:29 +0100
Subject: [PATCH 1/3] [mergeTree]: fail fast if max_rows_to_read limit exceeded
 on parts scan

The motivation behind this change is to skip the ranges scan for all selected
parts once it is clear that `max_rows_to_read` is already exceeded. The change
is quite noticeable for queries over a big number of parts.
---
 .../MergeTree/MergeTreeDataSelectExecutor.cpp | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
index 2415ce75e77..de6d1793144 100644
--- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -79,6 +79,7 @@ namespace ErrorCodes
     extern const int ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER;
     extern const int ILLEGAL_COLUMN;
     extern const int ARGUMENT_OUT_OF_BOUND;
+    extern const int TOO_MANY_ROWS;
 }
 
 
@@ -573,6 +574,8 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
 
     /// Let's find what range to read from each part.
     {
+        std::atomic<size_t> total_rows {0};
+
         auto process_part = [&](size_t part_index)
         {
             auto & part = parts[part_index];
@@ -599,7 +602,20 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
                     index_and_condition.first, index_and_condition.second, part, ranges.ranges, settings, reader_settings, log);
 
             if (!ranges.ranges.empty())
+            {
+                if (settings.read_overflow_mode == OverflowMode::THROW && settings.max_rows_to_read)
+                {
+                    /// Fail fast if estimated number of rows to read exceeds the limit
+                    size_t total_rows_estimate = total_rows.fetch_add(ranges.getRowsCount());
+                    if (total_rows_estimate > settings.max_rows_to_read)
+                        throw Exception(
+                            "Limit for rows exceeded, max rows: " + formatReadableQuantity(settings.max_rows_to_read)
+                            + ", current rows: " + formatReadableQuantity(total_rows_estimate),
+                            ErrorCodes::TOO_MANY_ROWS);
+                }
+
                 parts_with_ranges[part_index] = std::move(ranges);
+            }
         };
 
         size_t num_threads = std::min(size_t(num_streams), parts.size());

From 35e28b4c6bb26ffc44f9f34e6a7bd9b921d0e435 Mon Sep 17 00:00:00 2001
From: roman
Date: Mon, 17 Aug 2020 09:52:04 +0100
Subject: [PATCH 2/3] [mergeTree]: make exception message more clear

---
 src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
index de6d1793144..7fd0d2e7945 100644
--- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp
@@ -606,11 +606,14 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
                 if (settings.read_overflow_mode == OverflowMode::THROW && settings.max_rows_to_read)
                 {
                     /// Fail fast if estimated number of rows to read exceeds the limit
-                    size_t total_rows_estimate = total_rows.fetch_add(ranges.getRowsCount());
+                    auto current_rows_estimate = ranges.getRowsCount();
+                    size_t prev_total_rows_estimate = total_rows.fetch_add(current_rows_estimate);
+                    size_t total_rows_estimate = current_rows_estimate + prev_total_rows_estimate;
                     if (total_rows_estimate > settings.max_rows_to_read)
                         throw Exception(
-                            "Limit for rows exceeded, max rows: " + formatReadableQuantity(settings.max_rows_to_read)
-                            + ", current rows: " + formatReadableQuantity(total_rows_estimate),
+                            "Limit for rows (controlled by 'max_rows_to_read' setting) exceeded, max rows: " +
+                            formatReadableQuantity(settings.max_rows_to_read) +
+                            ", estimated rows to read (at least): " + formatReadableQuantity(total_rows_estimate),
                             ErrorCodes::TOO_MANY_ROWS);
                 }
 

From ac72148c3e6b27765a7eb80c8c438de019a9eb73 Mon Sep 17 00:00:00 2001
From: roman
Date: Tue, 18 Aug 2020 13:22:54 +0100
Subject: [PATCH 3/3] [tests]: adjust stateless/01064 test `max_rows_to_read`
 limits

With the default index_granularity=8192, CH always reads at least that number
of rows per selected part, but the limit is checked afterwards. The
optimization that interrupts query execution based on the approximate number
of rows to read breaks the test. This means that the test case potentially
contains incorrect limits.
---
 ...cremental_streaming_from_2_src_with_feedback.sql | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/tests/queries/0_stateless/01064_incremental_streaming_from_2_src_with_feedback.sql b/tests/queries/0_stateless/01064_incremental_streaming_from_2_src_with_feedback.sql
index 1216b06eeb2..a653206fe18 100644
--- a/tests/queries/0_stateless/01064_incremental_streaming_from_2_src_with_feedback.sql
+++ b/tests/queries/0_stateless/01064_incremental_streaming_from_2_src_with_feedback.sql
@@ -12,6 +12,7 @@ DROP TABLE IF EXISTS mv_checkouts2target;
 
 -- that is the final table, which is filled incrementally from 2 different sources
 CREATE TABLE target_table Engine=SummingMergeTree() ORDER BY id
+SETTINGS index_granularity=128
 AS
    SELECT
        number as id,
@@ -85,12 +86,20 @@ INSERT INTO logins   SELECT number as id, '2000-01-01 08:00:00' from numbers(50
 INSERT INTO checkouts SELECT number as id, '2000-01-01 10:00:00' from numbers(50000);
 
 -- ensure that we don't read whole target table during join
-set max_rows_to_read = 2000;
+-- by this time we should have 3 parts for target_table because of prev inserts
+-- and we plan to make two more inserts. With index_granularity=128 and max id=1000
+-- we expect to read not more than:
+-- ceil(1000/128) = 8 marks per part * (3 + 2) parts * 128 rows per mark = 5120 rows
+set max_rows_to_read = 5120;
 
 INSERT INTO logins    SELECT number as id, '2000-01-01 11:00:00' from numbers(1000);
 INSERT INTO checkouts SELECT number as id, '2000-01-01 11:10:00' from numbers(1000);
 
-set max_rows_to_read = 10;
+-- by this time we should have 5 parts for target_table because of prev inserts
+-- and we plan to make two more inserts. With index_granularity=128 and max id=1
+-- we expect to read not more than:
+-- 1 mark per part * (5 + 2) parts * 128 rows per mark = 896 rows
+set max_rows_to_read = 896;
 
 INSERT INTO logins    SELECT number+2 as id, '2001-01-01 11:10:01' from numbers(1);
 INSERT INTO checkouts SELECT number+2 as id, '2001-01-01 11:10:02' from numbers(1);
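
Note on the fail-fast pattern above: std::atomic::fetch_add returns the value the
counter held *before* the addition, which is why patch 2/3 adds the current
estimate back in to obtain the running total. Below is a minimal standalone
sketch of the same pattern, not ClickHouse code; the limit value and the
per-part row estimates are made up purely for illustration.

// fail_fast_sketch.cpp -- illustration only, not ClickHouse code.
#include <atomic>
#include <cstddef>
#include <iostream>
#include <thread>
#include <vector>

int main()
{
    const std::size_t max_rows_to_read = 5000;                               // stand-in for the setting
    const std::vector<std::size_t> per_part_rows = {1200, 1800, 1500, 900};  // hypothetical per-part estimates

    std::atomic<std::size_t> total_rows{0};
    std::atomic<bool> limit_exceeded{false};

    auto process_part = [&](std::size_t part_index)
    {
        const std::size_t current_rows_estimate = per_part_rows[part_index];
        // fetch_add returns the previous total, so the total including this part
        // is prev + current -- exactly the correction made in patch 2/3.
        const std::size_t prev_total = total_rows.fetch_add(current_rows_estimate);
        const std::size_t total_rows_estimate = prev_total + current_rows_estimate;
        if (total_rows_estimate > max_rows_to_read)
            limit_exceeded = true;  // the real code throws TOO_MANY_ROWS here
    };

    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < per_part_rows.size(); ++i)
        workers.emplace_back(process_part, i);
    for (auto & w : workers)
        w.join();

    std::cout << "estimated rows to read: " << total_rows.load()
              << (limit_exceeded ? " -> limit exceeded, fail fast\n" : " -> within limit\n");
}

Because additions from parts still being processed concurrently may not yet be
reflected in the total a given thread checks, each check sees a lower bound,
which matches the wording "estimated rows to read (at least)" in the final
exception message.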