fix intersecting parts, better fault injections

This commit is contained in:
Alexander Tokmakov 2023-03-16 21:28:07 +01:00
parent 8495deb7e3
commit d621b2c4ad
13 changed files with 138 additions and 5 deletions

View File

@ -41,6 +41,9 @@ if [ "$is_tsan_build" -eq "0" ]; then
export THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_TIME_US=10000
export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_TIME_US=10000
export THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_TIME_US=10000
export THREAD_FUZZER_EXPLICIT_SLEEP_PROBABILITY=0.01
export THREAD_FUZZER_EXPLICIT_MEMORY_EXCEPTION_PROBABILITY=0.01
fi
export ZOOKEEPER_FAULT_INJECTION=1

View File

@ -118,3 +118,9 @@ void CurrentMemoryTracker::free(Int64 size)
}
}
void CurrentMemoryTracker::injectFault()
{
if (auto * memory_tracker = getMemoryTracker())
memory_tracker->injectFault();
}

View File

@ -14,6 +14,9 @@ struct CurrentMemoryTracker
static void free(Int64 size);
static void check();
/// Throws MEMORY_LIMIT_EXCEEDED (if it's allowed to throw exceptions)
static void injectFault();
private:
static void allocImpl(Int64 size, bool throw_if_memory_exceeded);
};

View File

@ -134,6 +134,27 @@ void MemoryTracker::logMemoryUsage(Int64 current) const
"Current memory usage{}: {}.", (description ? " " + std::string(description) : ""), ReadableSize(current));
}
void MemoryTracker::injectFault() const
{
if (!memoryTrackerCanThrow(level, true))
{
LOG_WARNING(&Poco::Logger::get("MemoryTracker"),
"Cannot inject fault at specific point. Uncaught exceptions: {}, stack trace:\n{}",
std::uncaught_exceptions(), StackTrace().toString());
return;
}
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
const auto * description = description_ptr.load(std::memory_order_relaxed);
throw DB::Exception(
DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED,
"Memory tracker{}{}: fault injected (at specific point)",
description ? " " : "",
description ? description : "");
}
void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryTracker * query_tracker)
{

View File

@ -141,6 +141,8 @@ public:
fault_probability = value;
}
void injectFault() const;
void setSampleProbability(double value)
{
sample_probability = value;

View File

@ -109,6 +109,8 @@ void ThreadFuzzer::initConfiguration()
initFromEnv(migrate_probability, "THREAD_FUZZER_MIGRATE_PROBABILITY");
initFromEnv(sleep_probability, "THREAD_FUZZER_SLEEP_PROBABILITY");
initFromEnv(sleep_time_us, "THREAD_FUZZER_SLEEP_TIME_US");
initFromEnv(explicit_sleep_probability, "THREAD_FUZZER_EXPLICIT_SLEEP_PROBABILITY");
initFromEnv(explicit_memory_exception_probability, "THREAD_FUZZER_EXPLICIT_MEMORY_EXCEPTION_PROBABILITY");
#if THREAD_FUZZER_WRAP_PTHREAD
# define INIT_WRAPPER_PARAMS(RET, NAME, ...) \
@ -225,14 +227,28 @@ static void injection(
void ThreadFuzzer::maybeInjectSleep()
{
auto & fuzzer = ThreadFuzzer::instance();
injection(fuzzer.yield_probability, fuzzer.migrate_probability, fuzzer.sleep_probability, fuzzer.sleep_time_us);
injection(fuzzer.yield_probability, fuzzer.migrate_probability, fuzzer.explicit_sleep_probability, fuzzer.sleep_time_us);
}
/// Sometimes maybeInjectSleep() is not enough and we need to inject an exception.
/// The most suitable exception for this purpose is MEMORY_LIMIT_EXCEEDED: it can be thrown almost from everywhere.
/// NOTE We also have a query setting fault_probability, but it does not work for background operations (maybe we should fix it).
void ThreadFuzzer::maybeInjectMemoryLimitException()
{
auto & fuzzer = ThreadFuzzer::instance();
if (fuzzer.explicit_memory_exception_probability <= 0.0)
return;
std::bernoulli_distribution fault(fuzzer.explicit_memory_exception_probability);
if (fault(thread_local_rng))
CurrentMemoryTracker::injectFault();
}
void ThreadFuzzer::signalHandler(int)
{
DENY_ALLOCATIONS_IN_SCOPE;
auto saved_errno = errno;
maybeInjectSleep();
auto & fuzzer = ThreadFuzzer::instance();
injection(fuzzer.yield_probability, fuzzer.migrate_probability, fuzzer.sleep_probability, fuzzer.sleep_time_us);
errno = saved_errno;
}

View File

@ -59,12 +59,16 @@ public:
static bool isStarted();
static void maybeInjectSleep();
static void maybeInjectMemoryLimitException();
private:
uint64_t cpu_time_period_us = 0;
double yield_probability = 0;
double migrate_probability = 0;
double sleep_probability = 0;
double sleep_time_us = 0;
double explicit_sleep_probability = 0;
double explicit_memory_exception_probability = 0;
inline static std::atomic<bool> started{true};

View File

@ -220,6 +220,10 @@ public:
/// Frozen by ALTER TABLE ... FREEZE ... It is used for information purposes in system.parts table.
mutable std::atomic<bool> is_frozen {false};
/// Indicated that the part was marked Outdated because it's broken, not because it's actually outdated
/// See outdateBrokenPartAndCloneToDetached(...)
mutable bool outdated_because_broken = false;
/// Flag for keep S3 data when zero-copy replication over S3 turned on.
mutable bool force_keep_shared_data = false;

View File

@ -3836,7 +3836,10 @@ void MergeTreeData::outdateBrokenPartAndCloneToDetached(const DataPartPtr & part
DataPartsLock lock = lockParts();
if (part_to_detach->getState() == DataPartState::Active)
{
part_to_detach->outdated_because_broken = true;
removePartsFromWorkingSet(NO_TRANSACTION_RAW, {part_to_detach}, true, &lock);
}
}
void MergeTreeData::forcefullyMovePartToDetachedAndRemoveFromMemory(const MergeTreeData::DataPartPtr & part_to_detach, const String & prefix, bool restore_covered)

View File

@ -2,6 +2,7 @@
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/MergeTree/ReplicatedMergeTreePartHeader.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/ThreadFuzzer.h>
#include <Interpreters/Context.h>
@ -263,6 +264,8 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPartAndFetchIfPossible(
}
}
ThreadFuzzer::maybeInjectSleep();
if (storage.createEmptyPartInsteadOfLost(zookeeper, part_name))
{
/** This situation is possible if on all the replicas where the part was, it deteriorated.
@ -383,6 +386,9 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
/// Delete part locally.
storage.outdateBrokenPartAndCloneToDetached(part, "broken");
ThreadFuzzer::maybeInjectMemoryLimitException();
ThreadFuzzer::maybeInjectSleep();
/// Part is broken, let's try to find it and fetch.
searchForMissingPartAndFetchIfPossible(part_name, exists_in_zookeeper);
@ -399,6 +405,7 @@ CheckResult ReplicatedMergeTreePartCheckThread::checkPart(const String & part_na
String message = fmt::format(fmt_string, part_name);
LOG_ERROR(log, fmt_string, part_name);
storage.outdateBrokenPartAndCloneToDetached(part, "unexpected");
ThreadFuzzer::maybeInjectSleep();
return {part_name, false, message};
}
else

View File

@ -13,6 +13,7 @@
#include <Common/formatReadable.h>
#include <Common/thread_local_rng.h>
#include <Common/typeid_cast.h>
#include <Common/ThreadFuzzer.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
@ -3508,6 +3509,8 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
part->makeCloneInDetached("covered-by-broken", getInMemoryMetadataPtr());
}
ThreadFuzzer::maybeInjectSleep();
/// It's possible that queue contains entries covered by part_name.
/// For example, we had GET_PART all_1_42_5 and MUTATE_PART all_1_42_5_63,
/// then all_1_42_5_63 was executed by fetching, but part was written to disk incorrectly.
@ -3520,6 +3523,8 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
/// because we may have some covered parts (more precisely, parts with the same min and max blocks)
queue.removePartProducingOpsInRange(zookeeper, broken_part_info, /* covering_entry= */ {});
ThreadFuzzer::maybeInjectSleep();
String part_path = fs::path(replica_path) / "parts" / part_name;
while (true)
@ -6513,10 +6518,12 @@ void StorageReplicatedMergeTree::clearOldPartsAndRemoveFromZK()
for (const auto & part : parts)
{
if (!part->is_duplicate)
parts_to_delete_completely.emplace_back(part);
else
/// Broken part can be removed from zk by removePartAndEnqueueFetch(...) only.
/// Removal without enqueueing a fetch leads to intersecting parts.
if (part->is_duplicate || part->outdated_because_broken)
parts_to_delete_only_from_filesystem.emplace_back(part);
else
parts_to_delete_completely.emplace_back(part);
}
parts.clear();

View File

@ -0,0 +1,8 @@
1 0 all_0_0_0
1 1 all_1_2_1
1 2 all_1_2_1
0
3 0 all_0_3_2
3 1 all_0_3_2
3 2 all_0_3_2
3 3 all_0_3_2

View File

@ -0,0 +1,49 @@
#!/usr/bin/env bash
# Tags: long, zookeeper
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "drop table if exists rmt1 sync;"
$CLICKHOUSE_CLIENT -q "drop table if exists rmt2 sync;"
$CLICKHOUSE_CLIENT -q "create table rmt1 (n int) engine=ReplicatedMergeTree('/test/02369/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{database}', '1') order by n
settings cleanup_delay_period=0, cleanup_delay_period_random_add=0, old_parts_lifetime=0"
$CLICKHOUSE_CLIENT -q "create table rmt2 (n int) engine=ReplicatedMergeTree('/test/02369/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/{database}', '2') order by n"
$CLICKHOUSE_CLIENT -q "system stop replicated sends rmt2"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "insert into rmt2 values (0);"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "insert into rmt1 values (1);"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "insert into rmt1 values (2);"
$CLICKHOUSE_CLIENT --receive_timeout=3 -q "system sync replica rmt1;" 2>/dev/null 1>/dev/null
$CLICKHOUSE_CLIENT --optimize_throw_if_noop=1 -q "optimize table rmt1;"
$CLICKHOUSE_CLIENT -q "system start replicated sends rmt2"
$CLICKHOUSE_CLIENT -q "system sync replica rmt1;"
$CLICKHOUSE_CLIENT -q "select 1, *, _part from rmt1 order by n;"
path=$($CLICKHOUSE_CLIENT -q "select path from system.parts where database='$CLICKHOUSE_DATABASE' and table='rmt1' and name='all_1_2_1'")
# ensure that path is absolute before removing
$CLICKHOUSE_CLIENT -q "select throwIf(substring('$path', 1, 1) != '/', 'Path is relative: $path')" || exit
rm -rf $path
$CLICKHOUSE_CLIENT -q "select * from rmt1;" 2>&1 | grep LOGICAL_ERROR
$CLICKHOUSE_CLIENT --min_bytes_to_use_direct_io=1 --local_filesystem_read_method=pread_threadpool -q "select * from rmt1;" 2>&1 | grep LOGICAL_ERROR
$CLICKHOUSE_CLIENT -q "select sleep(0.1) from numbers($(($RANDOM % 30))) settings max_block_size=1 format Null"
$CLICKHOUSE_CLIENT -q "detach table rmt1;"
$CLICKHOUSE_CLIENT -q "attach table rmt1;"
$CLICKHOUSE_CLIENT --insert_keeper_fault_injection_probability=0 -q "insert into rmt1 values (3);"
$CLICKHOUSE_CLIENT -q "system sync replica rmt1;"
$CLICKHOUSE_CLIENT -q "optimize table rmt1 final;"
$CLICKHOUSE_CLIENT -q "system sync replica rmt1;"
$CLICKHOUSE_CLIENT -q "select 3, *, _part from rmt1 order by n;"
$CLICKHOUSE_CLIENT -q "drop table rmt1 sync;"
$CLICKHOUSE_CLIENT -q "drop table rmt2 sync;"