fix some tests

This commit is contained in:
Alexander Tokmakov 2023-04-14 19:42:17 +02:00
parent 93368b4d55
commit 70c1000c22
6 changed files with 30 additions and 17 deletions

View File

@ -184,7 +184,7 @@ scope_guard MergeTreeTransaction::beforeCommit()
/// We should wait for mutations to finish before committing transaction, because some mutation may fail and cause rollback.
for (const auto & table_and_mutation : mutations_to_wait)
table_and_mutation.first->waitForMutation(table_and_mutation.second);
table_and_mutation.first->waitForMutation(table_and_mutation.second, /* wait_for_another_mutation */ false);
assert([&]()
{

View File

@ -503,7 +503,7 @@ public:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Mutations are not supported by storage {}", getName());
}
virtual void waitForMutation(const String & /*mutation_id*/)
virtual void waitForMutation(const String & /*mutation_id*/, bool /*wait_for_another_mutation*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Mutations are not supported by storage {}", getName());
}

View File

@ -18,6 +18,7 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/typeid_cast.h>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadFuzzer.h>
#include <Compression/CompressedReadBuffer.h>
#include <Core/QueryProcessingStage.h>
#include <DataTypes/DataTypeEnum.h>
@ -2001,6 +2002,7 @@ size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lif
{
if (isOldPartDirectory(disk, it->path(), deadline))
{
ThreadFuzzer::maybeInjectSleep();
if (temporary_parts.contains(basename))
{
/// Actually we don't rely on temporary_directories_lifetime when removing old temporaries directories,
@ -2008,6 +2010,13 @@ size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lif
LOG_INFO(LogFrequencyLimiter(log, 10), "{} is in use (by merge/mutation/INSERT) (consider increasing temporary_directories_lifetime setting)", full_path);
continue;
}
else if (!disk->exists(it->path()))
{
/// We should recheck that the dir exists, otherwise we can get "No such file or directory"
/// due to a race condition with "Renaming temporary part" (temporary part holder could be already released, so the check above is not enough)
LOG_WARNING(log, "Temporary directory {} suddenly disappeared while iterating, assuming it was concurrently renamed to persistent", it->path());
continue;
}
else
{
LOG_WARNING(log, "Removing temporary directory {}", full_path);

View File

@ -341,7 +341,7 @@ void StorageMergeTree::alter(
if (prev_mutation != 0)
{
LOG_DEBUG(log, "Cannot change metadata with barrier alter query, will wait for mutation {}", prev_mutation);
waitForMutation(prev_mutation);
waitForMutation(prev_mutation, /* from_another_mutation */ true);
LOG_DEBUG(log, "Mutation {} finished", prev_mutation);
}
}
@ -537,19 +537,19 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
mutation_wait_event.notify_all();
}
void StorageMergeTree::waitForMutation(Int64 version)
void StorageMergeTree::waitForMutation(Int64 version, bool wait_for_another_mutation)
{
String mutation_id = MergeTreeMutationEntry::versionToFileName(version);
waitForMutation(version, mutation_id);
waitForMutation(version, mutation_id, wait_for_another_mutation);
}
void StorageMergeTree::waitForMutation(const String & mutation_id)
void StorageMergeTree::waitForMutation(const String & mutation_id, bool wait_for_another_mutation)
{
Int64 version = MergeTreeMutationEntry::parseFileName(mutation_id);
waitForMutation(version, mutation_id);
waitForMutation(version, mutation_id, wait_for_another_mutation);
}
void StorageMergeTree::waitForMutation(Int64 version, const String & mutation_id)
void StorageMergeTree::waitForMutation(Int64 version, const String & mutation_id, bool wait_for_another_mutation)
{
LOG_INFO(log, "Waiting mutation: {}", mutation_id);
{
@ -569,7 +569,7 @@ void StorageMergeTree::waitForMutation(Int64 version, const String & mutation_id
std::set<String> mutation_ids;
mutation_ids.insert(mutation_id);
auto mutation_status = getIncompleteMutationsStatus(version, &mutation_ids);
auto mutation_status = getIncompleteMutationsStatus(version, &mutation_ids, wait_for_another_mutation);
checkMutationStatus(mutation_status, mutation_ids);
LOG_INFO(log, "Mutation {} done", mutation_id);
@ -619,7 +619,8 @@ bool comparator(const PartVersionWithName & f, const PartVersionWithName & s)
}
std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsStatus(Int64 mutation_version, std::set<String> * mutation_ids) const
std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsStatus(
Int64 mutation_version, std::set<String> * mutation_ids, bool from_another_mutation) const
{
std::lock_guard lock(currently_processing_in_background_mutex);
@ -633,7 +634,9 @@ std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsS
const auto & mutation_entry = current_mutation_it->second;
auto txn = tryGetTransactionForMutation(mutation_entry, log);
assert(txn || mutation_entry.tid.isPrehistoric());
/// There's no way a transaction may finish before a mutation that was started by the transaction.
/// But sometimes we need to check status of an unrelated mutation, in this case we don't care about transactions.
assert(txn || mutation_entry.tid.isPrehistoric() || from_another_mutation);
auto data_parts = getVisibleDataPartsVector(txn);
for (const auto & data_part : data_parts)
{
@ -658,7 +661,7 @@ std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsS
mutation_ids->insert(it->second.file_name);
}
}
else if (txn)
else if (txn && !from_another_mutation)
{
/// Part is locked by concurrent transaction, most likely it will never be mutated
TIDHash part_locked = data_part->version.removal_tid_lock.load();

View File

@ -183,9 +183,9 @@ private:
/// and into in-memory structures. Wake up merge-mutation task.
Int64 startMutation(const MutationCommands & commands, ContextPtr query_context);
/// Wait until mutation with version will finish mutation for all parts
void waitForMutation(Int64 version);
void waitForMutation(const String & mutation_id) override;
void waitForMutation(Int64 version, const String & mutation_id);
void waitForMutation(Int64 version, bool wait_for_another_mutation = false);
void waitForMutation(const String & mutation_id, bool wait_for_another_mutation) override;
void waitForMutation(Int64 version, const String & mutation_id, bool wait_for_another_mutation = false);
void setMutationCSN(const String & mutation_id, CSN csn) override;
@ -246,7 +246,8 @@ private:
/// because we can execute several mutations at once. Order is important for
/// better readability of exception message. If mutation was killed doesn't
/// return any ids.
std::optional<MergeTreeMutationStatus> getIncompleteMutationsStatus(Int64 mutation_version, std::set<String> * mutation_ids = nullptr) const;
std::optional<MergeTreeMutationStatus> getIncompleteMutationsStatus(Int64 mutation_version, std::set<String> * mutation_ids = nullptr,
bool from_another_mutation = true) const;
void fillNewPartName(MutableDataPartPtr & part, DataPartsLock & lock);

View File

@ -3,6 +3,6 @@
<!-- Default is 86400 (1 day), but we have protection from removal of tmp dirs that are currently in use -->
<temporary_directories_lifetime>1</temporary_directories_lifetime>
<!-- Default is 60 seconds, but let's make tests more aggressive -->
<merge_tree_clear_old_temporary_directories_interval_seconds>10</merge_tree_clear_old_temporary_directories_interval_seconds>
<merge_tree_clear_old_temporary_directories_interval_seconds>5</merge_tree_clear_old_temporary_directories_interval_seconds>
</merge_tree>
</clickhouse>