Merge pull request #48792 from ClickHouse/fix_some_tests3

Fix some tests
This commit is contained in:
Alexey Milovidov 2023-04-29 22:04:11 +03:00 committed by GitHub
commit 15d0379e2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 32 additions and 19 deletions

View File

@ -184,7 +184,7 @@ scope_guard MergeTreeTransaction::beforeCommit()
/// We should wait for mutations to finish before committing transaction, because some mutation may fail and cause rollback.
for (const auto & table_and_mutation : mutations_to_wait)
table_and_mutation.first->waitForMutation(table_and_mutation.second);
table_and_mutation.first->waitForMutation(table_and_mutation.second, /* wait_for_another_mutation */ false);
assert([&]()
{

View File

@ -512,7 +512,7 @@ public:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Mutations are not supported by storage {}", getName());
}
virtual void waitForMutation(const String & /*mutation_id*/)
virtual void waitForMutation(const String & /*mutation_id*/, bool /*wait_for_another_mutation*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Mutations are not supported by storage {}", getName());
}

View File

@ -18,6 +18,7 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/typeid_cast.h>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadFuzzer.h>
#include <Compression/CompressedReadBuffer.h>
#include <Core/QueryProcessingStage.h>
#include <DataTypes/DataTypeEnum.h>
@ -2044,6 +2045,7 @@ size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lif
{
if (isOldPartDirectory(disk, it->path(), deadline))
{
ThreadFuzzer::maybeInjectSleep();
if (temporary_parts.contains(basename))
{
/// Actually we don't rely on temporary_directories_lifetime when removing old temporaries directories,
@ -2051,6 +2053,13 @@ size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lif
LOG_INFO(LogFrequencyLimiter(log, 10), "{} is in use (by merge/mutation/INSERT) (consider increasing temporary_directories_lifetime setting)", full_path);
continue;
}
else if (!disk->exists(it->path()))
{
/// We should recheck that the dir exists, otherwise we can get "No such file or directory"
/// due to a race condition with "Renaming temporary part" (temporary part holder could be already released, so the check above is not enough)
LOG_WARNING(log, "Temporary directory {} suddenly disappeared while iterating, assuming it was concurrently renamed to persistent", it->path());
continue;
}
else
{
LOG_WARNING(log, "Removing temporary directory {}", full_path);

View File

@ -339,7 +339,7 @@ void StorageMergeTree::alter(
if (prev_mutation != 0)
{
LOG_DEBUG(log, "Cannot change metadata with barrier alter query, will wait for mutation {}", prev_mutation);
waitForMutation(prev_mutation);
waitForMutation(prev_mutation, /* from_another_mutation */ true);
LOG_DEBUG(log, "Mutation {} finished", prev_mutation);
}
}
@ -535,19 +535,19 @@ void StorageMergeTree::updateMutationEntriesErrors(FutureMergedMutatedPartPtr re
mutation_wait_event.notify_all();
}
void StorageMergeTree::waitForMutation(Int64 version)
void StorageMergeTree::waitForMutation(Int64 version, bool wait_for_another_mutation)
{
String mutation_id = MergeTreeMutationEntry::versionToFileName(version);
waitForMutation(version, mutation_id);
waitForMutation(version, mutation_id, wait_for_another_mutation);
}
void StorageMergeTree::waitForMutation(const String & mutation_id)
void StorageMergeTree::waitForMutation(const String & mutation_id, bool wait_for_another_mutation)
{
Int64 version = MergeTreeMutationEntry::parseFileName(mutation_id);
waitForMutation(version, mutation_id);
waitForMutation(version, mutation_id, wait_for_another_mutation);
}
void StorageMergeTree::waitForMutation(Int64 version, const String & mutation_id)
void StorageMergeTree::waitForMutation(Int64 version, const String & mutation_id, bool wait_for_another_mutation)
{
LOG_INFO(log, "Waiting mutation: {}", mutation_id);
{
@ -567,7 +567,7 @@ void StorageMergeTree::waitForMutation(Int64 version, const String & mutation_id
std::set<String> mutation_ids;
mutation_ids.insert(mutation_id);
auto mutation_status = getIncompleteMutationsStatus(version, &mutation_ids);
auto mutation_status = getIncompleteMutationsStatus(version, &mutation_ids, wait_for_another_mutation);
checkMutationStatus(mutation_status, mutation_ids);
LOG_INFO(log, "Mutation {} done", mutation_id);
@ -617,7 +617,8 @@ bool comparator(const PartVersionWithName & f, const PartVersionWithName & s)
}
std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsStatus(Int64 mutation_version, std::set<String> * mutation_ids) const
std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsStatus(
Int64 mutation_version, std::set<String> * mutation_ids, bool from_another_mutation) const
{
std::lock_guard lock(currently_processing_in_background_mutex);
@ -631,7 +632,9 @@ std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsS
const auto & mutation_entry = current_mutation_it->second;
auto txn = tryGetTransactionForMutation(mutation_entry, log);
assert(txn || mutation_entry.tid.isPrehistoric());
/// There's no way a transaction may finish before a mutation that was started by the transaction.
/// But sometimes we need to check status of an unrelated mutation, in this case we don't care about transactions.
assert(txn || mutation_entry.tid.isPrehistoric() || from_another_mutation);
auto data_parts = getVisibleDataPartsVector(txn);
for (const auto & data_part : data_parts)
{
@ -656,7 +659,7 @@ std::optional<MergeTreeMutationStatus> StorageMergeTree::getIncompleteMutationsS
mutation_ids->insert(it->second.file_name);
}
}
else if (txn)
else if (txn && !from_another_mutation)
{
/// Part is locked by concurrent transaction, most likely it will never be mutated
TIDHash part_locked = data_part->version.removal_tid_lock.load();

View File

@ -190,9 +190,9 @@ private:
/// and into in-memory structures. Wake up merge-mutation task.
Int64 startMutation(const MutationCommands & commands, ContextPtr query_context);
/// Wait until mutation with version will finish mutation for all parts
void waitForMutation(Int64 version);
void waitForMutation(const String & mutation_id) override;
void waitForMutation(Int64 version, const String & mutation_id);
void waitForMutation(Int64 version, bool wait_for_another_mutation = false);
void waitForMutation(const String & mutation_id, bool wait_for_another_mutation) override;
void waitForMutation(Int64 version, const String & mutation_id, bool wait_for_another_mutation = false);
void setMutationCSN(const String & mutation_id, CSN csn) override;
@ -253,7 +253,8 @@ private:
/// because we can execute several mutations at once. Order is important for
/// better readability of exception message. If mutation was killed doesn't
/// return any ids.
std::optional<MergeTreeMutationStatus> getIncompleteMutationsStatus(Int64 mutation_version, std::set<String> * mutation_ids = nullptr) const;
std::optional<MergeTreeMutationStatus> getIncompleteMutationsStatus(Int64 mutation_version, std::set<String> * mutation_ids = nullptr,
bool from_another_mutation = false) const;
void fillNewPartName(MutableDataPartPtr & part, DataPartsLock & lock);

View File

@ -3,6 +3,6 @@
<!-- Default is 86400 (1 day), but we have protection from removal of tmp dirs that are currently in use -->
<temporary_directories_lifetime>1</temporary_directories_lifetime>
<!-- Default is 60 seconds, but let's make tests more aggressive -->
<merge_tree_clear_old_temporary_directories_interval_seconds>10</merge_tree_clear_old_temporary_directories_interval_seconds>
<merge_tree_clear_old_temporary_directories_interval_seconds>5</merge_tree_clear_old_temporary_directories_interval_seconds>
</merge_tree>
</clickhouse>

View File

@ -13,8 +13,8 @@ $CLICKHOUSE_CLIENT -n -q "
DROP TABLE IF EXISTS alter_table0;
DROP TABLE IF EXISTS alter_table1;
CREATE TABLE alter_table0 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r1') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50));
CREATE TABLE alter_table1 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r2') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50));
CREATE TABLE alter_table0 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r1') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50 + 100));
CREATE TABLE alter_table1 (a UInt8, b Int16, c Float32, d String, e Array(UInt8), f Nullable(UUID), g Tuple(UInt8, UInt16)) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/alter_table', 'r2') ORDER BY a PARTITION BY b % 10 SETTINGS old_parts_lifetime = 1, cleanup_delay_period = 1, cleanup_delay_period_random_add = 0, replicated_max_mutations_in_one_entry = $(($RANDOM / 50 + 200));
"
function thread1()