diff --git a/src/Common/FailPoint.cpp b/src/Common/FailPoint.cpp index bc4f2cb43d2..b36d438d1e3 100644 --- a/src/Common/FailPoint.cpp +++ b/src/Common/FailPoint.cpp @@ -64,6 +64,7 @@ static struct InitFiu REGULAR(lazy_pipe_fds_fail_close) \ PAUSEABLE(infinite_sleep) \ PAUSEABLE(stop_moving_part_before_swap_with_active) \ + REGULAR(slowdown_index_analysis) \ namespace FailPoints diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index 03db96dd016..6e0ae8f7cca 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -40,6 +41,8 @@ #include #include #include +#include +#include #include #include #include @@ -74,6 +77,11 @@ namespace ErrorCodes extern const int DUPLICATED_PART_UUIDS; } +namespace FailPoints +{ + extern const char slowdown_index_analysis[]; +} + MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & data_) : data(data_), log(getLogger(data.getLogName() + " (SelectExecutor)")) @@ -528,6 +536,8 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition( } auto query_context = context->hasQueryContext() ? 
context->getQueryContext() : context; + QueryStatusPtr query_status = context->getProcessListElement(); + PartFilterCounters part_filter_counters; if (query_context->getSettingsRef().allow_experimental_query_deduplication) selectPartsToReadWithUUIDFilter( @@ -549,7 +559,8 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition( minmax_columns_types, partition_pruner, max_block_numbers_to_read, - part_filter_counters); + part_filter_counters, + query_status); index_stats.emplace_back(ReadFromMergeTree::IndexStat{ .type = ReadFromMergeTree::IndexType::None, @@ -649,8 +660,13 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd auto mark_cache = context->getIndexMarkCache(); auto uncompressed_cache = context->getIndexUncompressedCache(); + auto query_status = context->getProcessListElement(); + auto process_part = [&](size_t part_index) { + if (query_status) + query_status->checkTimeLimit(); + auto & part = parts[part_index]; RangesInDataPart ranges(part, part_index); @@ -1545,13 +1561,22 @@ void MergeTreeDataSelectExecutor::selectPartsToRead( const DataTypes & minmax_columns_types, const std::optional & partition_pruner, const PartitionIdToMaxBlock * max_block_numbers_to_read, - PartFilterCounters & counters) + PartFilterCounters & counters, + QueryStatusPtr query_status) { MergeTreeData::DataPartsVector prev_parts; std::swap(prev_parts, parts); for (const auto & part_or_projection : prev_parts) { + if (query_status) + query_status->checkTimeLimit(); + + fiu_do_on(FailPoints::slowdown_index_analysis, + { + sleepForMilliseconds(1000); + }); + const auto * part = part_or_projection->isProjectionPart() ? 
part_or_projection->getParentPart() : part_or_projection.get(); if (part_values && part_values->find(part->name) == part_values->end()) continue; diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h index 3668eb0ad90..70536b7aa54 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.h @@ -126,7 +126,8 @@ private: const DataTypes & minmax_columns_types, const std::optional & partition_pruner, const PartitionIdToMaxBlock * max_block_numbers_to_read, - PartFilterCounters & counters); + PartFilterCounters & counters, + QueryStatusPtr query_status); /// Same as previous but also skip parts uuids if any to the query context, or skip parts which uuids marked as excluded. static void selectPartsToReadWithUUIDFilter( diff --git a/tests/queries/0_stateless/03176_check_timeout_in_index_analysis.reference b/tests/queries/0_stateless/03176_check_timeout_in_index_analysis.reference new file mode 100644 index 00000000000..05a83c81dae --- /dev/null +++ b/tests/queries/0_stateless/03176_check_timeout_in_index_analysis.reference @@ -0,0 +1,4 @@ +5 +03176_q1 1 0 0 +03176_q2 1 2 0 +03176_q3 0 1 1 diff --git a/tests/queries/0_stateless/03176_check_timeout_in_index_analysis.sql b/tests/queries/0_stateless/03176_check_timeout_in_index_analysis.sql new file mode 100644 index 00000000000..4163ad58c4e --- /dev/null +++ b/tests/queries/0_stateless/03176_check_timeout_in_index_analysis.sql @@ -0,0 +1,32 @@ +-- Tags: no-parallel, no-tsan, no-asan, no-ubsan, no-msan, no-debug, no-fasttest +-- no-parallel because the test uses failpoint + +CREATE TABLE t_03176(k UInt64, v UInt64) ENGINE=MergeTree() ORDER BY k PARTITION BY k; + +INSERT INTO t_03176 SELECT number, number FROM numbers(5); + +-- Table is partitioned by k, so it will have 5 partitions +SELECT count() FROM system.parts WHERE database = currentDatabase() AND table = 't_03176' AND active; + +-- This 
query is fast without failpoint: should take < 1 sec +EXPLAIN indexes = 1 SELECT * FROM t_03176 ORDER BY k LIMIT 5 SETTINGS log_comment = '03176_q1' FORMAT Null; + +SYSTEM ENABLE FAILPOINT slowdown_index_analysis; + +-- Check that failpoint actually works: the query should take >= 5 sec +EXPLAIN indexes = 1 SELECT * FROM t_03176 ORDER BY k LIMIT 5 SETTINGS log_comment = '03176_q2' FORMAT Null; + +-- Now the query should be cancelled at the first time limit check after 1.1 sec, i.e. after about 2 sec +EXPLAIN indexes = 1 SELECT * FROM t_03176 ORDER BY k LIMIT 5 SETTINGS log_comment = '03176_q3', max_execution_time = 1.1 FORMAT Null; -- { serverError TIMEOUT_EXCEEDED } + +SYSTEM DISABLE FAILPOINT slowdown_index_analysis; + +SYSTEM FLUSH LOGS; + +-- Check that q1 was fast, q2 was slow and q3 had timeout +SELECT log_comment, type = 'QueryFinish', intDiv(query_duration_ms, 2000), length(exception) > 0 +FROM system.query_log +WHERE current_database = currentDatabase() AND log_comment LIKE '03176_q_' AND type IN ('QueryFinish', 'ExceptionBeforeStart') +ORDER BY log_comment; + +DROP TABLE t_03176;