Merge pull request #44202 from ClickHouse/fix_an_assertion_in_transactions

Fix an assertion in transactions
This commit is contained in:
Alexander Tokmakov 2022-12-14 14:25:09 +03:00 committed by GitHub
commit 816eba6149
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 19 additions and 13 deletions

View File

@ -1126,6 +1126,7 @@ void DatabaseCatalog::cleanupStoreDirectoryTask()
continue; continue;
size_t affected_dirs = 0; size_t affected_dirs = 0;
size_t checked_dirs = 0;
for (auto it = disk->iterateDirectory("store"); it->isValid(); it->next()) for (auto it = disk->iterateDirectory("store"); it->isValid(); it->next())
{ {
String prefix = it->name(); String prefix = it->name();
@ -1135,6 +1136,7 @@ void DatabaseCatalog::cleanupStoreDirectoryTask()
if (!expected_prefix_dir) if (!expected_prefix_dir)
{ {
LOG_WARNING(log, "Found invalid directory {} on disk {}, will try to remove it", it->path(), disk_name); LOG_WARNING(log, "Found invalid directory {} on disk {}, will try to remove it", it->path(), disk_name);
checked_dirs += 1;
affected_dirs += maybeRemoveDirectory(disk_name, disk, it->path()); affected_dirs += maybeRemoveDirectory(disk_name, disk, it->path());
continue; continue;
} }
@ -1150,6 +1152,7 @@ void DatabaseCatalog::cleanupStoreDirectoryTask()
if (!expected_dir) if (!expected_dir)
{ {
LOG_WARNING(log, "Found invalid directory {} on disk {}, will try to remove it", jt->path(), disk_name); LOG_WARNING(log, "Found invalid directory {} on disk {}, will try to remove it", jt->path(), disk_name);
checked_dirs += 1;
affected_dirs += maybeRemoveDirectory(disk_name, disk, jt->path()); affected_dirs += maybeRemoveDirectory(disk_name, disk, jt->path());
continue; continue;
} }
@ -1161,6 +1164,7 @@ void DatabaseCatalog::cleanupStoreDirectoryTask()
/// so it looks safe enough to remove directory if we don't have uuid mapping for it. /// so it looks safe enough to remove directory if we don't have uuid mapping for it.
/// No table or database using this directory should concurrently appear, /// No table or database using this directory should concurrently appear,
/// because creation of new table would fail with "directory already exists". /// because creation of new table would fail with "directory already exists".
checked_dirs += 1;
affected_dirs += maybeRemoveDirectory(disk_name, disk, jt->path()); affected_dirs += maybeRemoveDirectory(disk_name, disk, jt->path());
} }
} }
@ -1168,7 +1172,7 @@ void DatabaseCatalog::cleanupStoreDirectoryTask()
if (affected_dirs) if (affected_dirs)
LOG_INFO(log, "Cleaned up {} directories from store/ on disk {}", affected_dirs, disk_name); LOG_INFO(log, "Cleaned up {} directories from store/ on disk {}", affected_dirs, disk_name);
else if (checked_dirs == 0)
LOG_TEST(log, "Nothing to clean up from store/ on disk {}", disk_name); LOG_TEST(log, "Nothing to clean up from store/ on disk {}", disk_name);
} }

View File

@ -3884,9 +3884,9 @@ MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVectorInPartiti
return res; return res;
} }
MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const MergeTreePartInfo & part_info, const MergeTreeData::DataPartStates & valid_states) MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const MergeTreePartInfo & part_info, const MergeTreeData::DataPartStates & valid_states, DataPartsLock * acquired_lock)
{ {
auto lock = lockParts(); auto lock = (acquired_lock) ? DataPartsLock() : lockParts();
auto it = data_parts_by_info.find(part_info); auto it = data_parts_by_info.find(part_info);
if (it == data_parts_by_info.end()) if (it == data_parts_by_info.end())
@ -3899,9 +3899,9 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const MergeTreePartInf
return nullptr; return nullptr;
} }
MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_name, const MergeTreeData::DataPartStates & valid_states) MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_name, const MergeTreeData::DataPartStates & valid_states, DataPartsLock * acquired_lock)
{ {
return getPartIfExists(MergeTreePartInfo::fromPartName(part_name, format_version), valid_states); return getPartIfExists(MergeTreePartInfo::fromPartName(part_name, format_version), valid_states, acquired_lock);
} }

View File

@ -514,8 +514,8 @@ public:
DataPartsVector getDataPartsVectorInPartitionForInternalUsage(const DataPartStates & affordable_states, const String & partition_id, DataPartsLock * acquired_lock = nullptr) const; DataPartsVector getDataPartsVectorInPartitionForInternalUsage(const DataPartStates & affordable_states, const String & partition_id, DataPartsLock * acquired_lock = nullptr) const;
/// Returns the part with the given name and state or nullptr if no such part. /// Returns the part with the given name and state or nullptr if no such part.
DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states); DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states, DataPartsLock * acquired_lock = nullptr);
DataPartPtr getPartIfExists(const MergeTreePartInfo & part_info, const DataPartStates & valid_states); DataPartPtr getPartIfExists(const MergeTreePartInfo & part_info, const DataPartStates & valid_states, DataPartsLock * acquired_lock = nullptr);
/// Total size of active parts in bytes. /// Total size of active parts in bytes.
size_t getTotalActiveSizeInBytes() const; size_t getTotalActiveSizeInBytes() const;

View File

@ -1370,19 +1370,21 @@ MergeTreeDataPartPtr StorageMergeTree::outdatePart(MergeTreeTransaction * txn, c
{ {
/// Forcefully stop merges and make part outdated /// Forcefully stop merges and make part outdated
auto merge_blocker = stopMergesAndWait(); auto merge_blocker = stopMergesAndWait();
auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active}); auto parts_lock = lockParts();
auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active}, &parts_lock);
if (!part) if (!part)
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "Part {} not found, won't try to drop it.", part_name); throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "Part {} not found, won't try to drop it.", part_name);
removePartsFromWorkingSet(txn, {part}, true); removePartsFromWorkingSet(txn, {part}, true, &parts_lock);
return part; return part;
} }
else else
{ {
/// Wait merges selector /// Wait merges selector
std::unique_lock lock(currently_processing_in_background_mutex); std::unique_lock lock(currently_processing_in_background_mutex);
auto parts_lock = lockParts();
auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active}); auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active}, &parts_lock);
/// It's okay, part was already removed /// It's okay, part was already removed
if (!part) if (!part)
return nullptr; return nullptr;
@ -1392,7 +1394,7 @@ MergeTreeDataPartPtr StorageMergeTree::outdatePart(MergeTreeTransaction * txn, c
if (currently_merging_mutating_parts.contains(part)) if (currently_merging_mutating_parts.contains(part))
return nullptr; return nullptr;
removePartsFromWorkingSet(txn, {part}, true); removePartsFromWorkingSet(txn, {part}, true, &parts_lock);
return part; return part;
} }
} }

View File

@ -65,7 +65,7 @@ $CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '0', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '1', 1);" $CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '1', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (2, '0', 1);" $CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (2, '0', 1);"
query_with_retry "ALTER TABLE src MOVE PARTITION 1 TO TABLE dst;" &>- query_with_retry "ALTER TABLE src MOVE PARTITION 1 TO TABLE dst;" &>/dev/null
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst;" $CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM src;" $CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM src;"
@ -85,7 +85,7 @@ $CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '0', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '1', 1);" $CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (1, '1', 1);"
$CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (2, '0', 1);" $CLICKHOUSE_CLIENT --query="INSERT INTO src VALUES (2, '0', 1);"
query_with_retry "ALTER TABLE src MOVE PARTITION 1 TO TABLE dst;" &>- query_with_retry "ALTER TABLE src MOVE PARTITION 1 TO TABLE dst;" &>/dev/null
$CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst;" $CLICKHOUSE_CLIENT --query="SYSTEM SYNC REPLICA dst;"
$CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM src;" $CLICKHOUSE_CLIENT --query="SELECT count(), sum(d) FROM src;"