This commit is contained in:
Alexander Gololobov 2024-09-16 15:25:37 +02:00 committed by GitHub
commit 996caaa06f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 66 additions and 3 deletions

View File

@ -64,6 +64,7 @@ static struct InitFiu
REGULAR(lazy_pipe_fds_fail_close) \
PAUSEABLE(infinite_sleep) \
PAUSEABLE(stop_moving_part_before_swap_with_active) \
REGULAR(slowdown_index_analysis) \
namespace FailPoints

View File

@ -23,6 +23,7 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProcessList.h>
#include <Processors/ConcatProcessor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
@ -40,6 +41,8 @@
#include <Core/UUID.h>
#include <Core/Settings.h>
#include <Common/CurrentMetrics.h>
#include <Common/FailPoint.h>
#include <base/sleep.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeUUID.h>
@ -74,6 +77,11 @@ namespace ErrorCodes
extern const int DUPLICATED_PART_UUIDS;
}
namespace FailPoints
{
extern const char slowdown_index_analysis[];
}
MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(const MergeTreeData & data_)
: data(data_), log(getLogger(data.getLogName() + " (SelectExecutor)"))
@ -528,6 +536,8 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition(
}
auto query_context = context->hasQueryContext() ? context->getQueryContext() : context;
QueryStatusPtr query_status = context->getProcessListElement();
PartFilterCounters part_filter_counters;
if (query_context->getSettingsRef().allow_experimental_query_deduplication)
selectPartsToReadWithUUIDFilter(
@ -549,7 +559,8 @@ void MergeTreeDataSelectExecutor::filterPartsByPartition(
minmax_columns_types,
partition_pruner,
max_block_numbers_to_read,
part_filter_counters);
part_filter_counters,
query_status);
index_stats.emplace_back(ReadFromMergeTree::IndexStat{
.type = ReadFromMergeTree::IndexType::None,
@ -649,8 +660,13 @@ RangesInDataParts MergeTreeDataSelectExecutor::filterPartsByPrimaryKeyAndSkipInd
auto mark_cache = context->getIndexMarkCache();
auto uncompressed_cache = context->getIndexUncompressedCache();
auto query_status = context->getProcessListElement();
auto process_part = [&](size_t part_index)
{
if (query_status)
query_status->checkTimeLimit();
auto & part = parts[part_index];
RangesInDataPart ranges(part, part_index);
@ -1545,13 +1561,22 @@ void MergeTreeDataSelectExecutor::selectPartsToRead(
const DataTypes & minmax_columns_types,
const std::optional<PartitionPruner> & partition_pruner,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
PartFilterCounters & counters)
PartFilterCounters & counters,
QueryStatusPtr query_status)
{
MergeTreeData::DataPartsVector prev_parts;
std::swap(prev_parts, parts);
for (const auto & part_or_projection : prev_parts)
{
if (query_status)
query_status->checkTimeLimit();
fiu_do_on(FailPoints::slowdown_index_analysis,
{
sleepForMilliseconds(1000);
});
const auto * part = part_or_projection->isProjectionPart() ? part_or_projection->getParentPart() : part_or_projection.get();
if (part_values && part_values->find(part->name) == part_values->end())
continue;

View File

@ -126,7 +126,8 @@ private:
const DataTypes & minmax_columns_types,
const std::optional<PartitionPruner> & partition_pruner,
const PartitionIdToMaxBlock * max_block_numbers_to_read,
PartFilterCounters & counters);
PartFilterCounters & counters,
QueryStatusPtr query_status);
/// Same as previous but also skip parts uuids if any to the query context, or skip parts which uuids marked as excluded.
static void selectPartsToReadWithUUIDFilter(

View File

@ -0,0 +1,4 @@
5
03176_q1 1 0 0
03176_q2 1 2 0
03176_q3 0 1 1

View File

@ -0,0 +1,32 @@
-- Tags: no-parallel, no-tsan, no-asan, no-ubsan, no-msan, no-debug, no-fasttest
-- no-parallel because the test uses failpoint
CREATE TABLE t_03176(k UInt64, v UInt64) ENGINE=MergeTree() ORDER BY k PARTITION BY k;
INSERT INTO t_03176 SELECT number, number FROM numbers(5);
-- Table is partitioned by k to so it will have 5 partitions
SELECT count() FROM system.parts WHERE database = currentDatabase() AND table = 't_03176' AND active;
-- This query is fast without failpoint: should take < 1 sec
EXPLAIN indexes = 1 SELECT * FROM t_03176 ORDER BY k LIMIT 5 SETTINGS log_comment = '03176_q1' FORMAT Null;
SYSTEM ENABLE FAILPOINT slowdown_index_analysis;
-- Check that failpont actually works: the query should take >= 5 sec
EXPLAIN indexes = 1 SELECT * FROM t_03176 ORDER BY k LIMIT 5 SETTINGS log_comment = '03176_q2' FORMAT Null;
-- Now the query should be cancelled after about 1 sec
EXPLAIN indexes = 1 SELECT * FROM t_03176 ORDER BY k LIMIT 5 SETTINGS log_comment = '03176_q3', max_execution_time = 1.1 FORMAT Null; -- { serverError TIMEOUT_EXCEEDED }
SYSTEM DISABLE FAILPOINT slowdown_index_analysis;
SYSTEM FLUSH LOGS;
-- Check that q1 was fast, q2 was slow and q3 had timeout
SELECT log_comment, type = 'QueryFinish', intDiv(query_duration_ms, 2000), length(exception) > 0
FROM system.query_log
WHERE current_database = currentDatabase() AND log_comment LIKE '03176_q_' AND type IN ('QueryFinish', 'ExceptionBeforeStart')
ORDER BY log_comment;
DROP TABLE t_03176;