Use keep_s3_on_delete flag instead of DeleteOnDestroyKeepS3 state

This commit is contained in:
Anton Ivashkin 2021-05-19 18:46:27 +03:00
parent d256cf4edf
commit 29336a4a34
4 changed files with 27 additions and 27 deletions

View File

@@ -483,6 +483,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
= data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true); = data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true);
if (reservation) if (reservation)
{ {
/// When we have multi-volume storage, one of them was chosen, depends on TTL, free space, etc.
/// Chosen one may be S3 or not.
DiskPtr disk = reservation->getDisk(); DiskPtr disk = reservation->getDisk();
if (disk && disk->getType() == DiskType::Type::S3) if (disk && disk->getType() == DiskType::Type::S3)
{ {

View File

@@ -187,7 +187,6 @@ static void incrementStateMetric(IMergeTreeDataPart::State state)
CurrentMetrics::add(CurrentMetrics::PartsDeleting); CurrentMetrics::add(CurrentMetrics::PartsDeleting);
return; return;
case IMergeTreeDataPart::State::DeleteOnDestroy: case IMergeTreeDataPart::State::DeleteOnDestroy:
case IMergeTreeDataPart::State::DeleteOnDestroyKeepS3:
CurrentMetrics::add(CurrentMetrics::PartsDeleteOnDestroy); CurrentMetrics::add(CurrentMetrics::PartsDeleteOnDestroy);
return; return;
} }
@@ -213,7 +212,6 @@ static void decrementStateMetric(IMergeTreeDataPart::State state)
CurrentMetrics::sub(CurrentMetrics::PartsDeleting); CurrentMetrics::sub(CurrentMetrics::PartsDeleting);
return; return;
case IMergeTreeDataPart::State::DeleteOnDestroy: case IMergeTreeDataPart::State::DeleteOnDestroy:
case IMergeTreeDataPart::State::DeleteOnDestroyKeepS3:
CurrentMetrics::sub(CurrentMetrics::PartsDeleteOnDestroy); CurrentMetrics::sub(CurrentMetrics::PartsDeleteOnDestroy);
return; return;
} }
@@ -407,7 +405,7 @@ void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns)
void IMergeTreeDataPart::removeIfNeeded() void IMergeTreeDataPart::removeIfNeeded()
{ {
if (state == State::DeleteOnDestroy || state == State::DeleteOnDestroyKeepS3 || is_temp) if (state == State::DeleteOnDestroy || is_temp)
{ {
try try
{ {
@@ -431,11 +429,11 @@ void IMergeTreeDataPart::removeIfNeeded()
} }
if (parent_part) if (parent_part)
projectionRemove(parent_part->getFullRelativePath(), state == State::DeleteOnDestroyKeepS3); projectionRemove(parent_part->getFullRelativePath(), keep_s3_on_delete);
else else
remove(state == State::DeleteOnDestroyKeepS3); remove(keep_s3_on_delete);
if (state == State::DeleteOnDestroy || state == State::DeleteOnDestroyKeepS3) if (state == State::DeleteOnDestroy)
{ {
LOG_TRACE(storage.log, "Removed part from old location {}", path); LOG_TRACE(storage.log, "Removed part from old location {}", path);
} }
@@ -480,8 +478,6 @@ String IMergeTreeDataPart::stateToString(IMergeTreeDataPart::State state)
return "Deleting"; return "Deleting";
case State::DeleteOnDestroy: case State::DeleteOnDestroy:
return "DeleteOnDestroy"; return "DeleteOnDestroy";
case State::DeleteOnDestroyKeepS3:
return "DeleteOnDestroyKeepS3";
} }
__builtin_unreachable(); __builtin_unreachable();

View File

@@ -205,29 +205,30 @@ public:
/// Frozen by ALTER TABLE ... FREEZE ... It is used for information purposes in system.parts table. /// Frozen by ALTER TABLE ... FREEZE ... It is used for information purposes in system.parts table.
mutable std::atomic<bool> is_frozen {false}; mutable std::atomic<bool> is_frozen {false};
/// Flag for keep S3 data when zero-copy replication over S3 turned on.
mutable bool keep_s3_on_delete = false;
/** /**
* Part state is a stage of its lifetime. States are ordered and state of a part could be increased only. * Part state is a stage of its lifetime. States are ordered and state of a part could be increased only.
* Part state should be modified under data_parts mutex. * Part state should be modified under data_parts mutex.
* *
* Possible state transitions: * Possible state transitions:
* Temporary -> Precommitted: we are trying to commit a fetched, inserted or merged part to active set * Temporary -> Precommitted: we are trying to commit a fetched, inserted or merged part to active set
* Precommitted -> Outdated: we could not add a part to active set and are doing a rollback (for example it is duplicated part) * Precommitted -> Outdated: we could not add a part to active set and are doing a rollback (for example it is duplicated part)
* Precommitted -> Committed: we successfully committed a part to active dataset * Precommitted -> Committed: we successfully committed a part to active dataset
* Precommitted -> Outdated: a part was replaced by a covering part or DROP PARTITION * Precommitted -> Outdated: a part was replaced by a covering part or DROP PARTITION
* Outdated -> Deleting: a cleaner selected this part for deletion * Outdated -> Deleting: a cleaner selected this part for deletion
* Deleting -> Outdated: if an ZooKeeper error occurred during the deletion, we will retry deletion * Deleting -> Outdated: if an ZooKeeper error occurred during the deletion, we will retry deletion
* Committed -> DeleteOnDestroy if part was moved to another disk * Committed -> DeleteOnDestroy: if part was moved to another disk
* Committed -> DeleteOnDestroyKeepS3 if part was moved to another disk but shared data on S3
*/ */
enum class State enum class State
{ {
Temporary, /// the part is generating now, it is not in data_parts list Temporary, /// the part is generating now, it is not in data_parts list
PreCommitted, /// the part is in data_parts, but not used for SELECTs PreCommitted, /// the part is in data_parts, but not used for SELECTs
Committed, /// active data part, used by current and upcoming SELECTs Committed, /// active data part, used by current and upcoming SELECTs
Outdated, /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes Outdated, /// not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes
Deleting, /// not active data part with identity refcounter, it is deleting right now by a cleaner Deleting, /// not active data part with identity refcounter, it is deleting right now by a cleaner
DeleteOnDestroy, /// part was moved to another disk and should be deleted in own destructor DeleteOnDestroy, /// part was moved to another disk and should be deleted in own destructor
DeleteOnDestroyKeepS3, /// same as DeleteOnDestroy but shared S3 data should be keeped
}; };
using TTLInfo = MergeTreeDataPartTTLInfo; using TTLInfo = MergeTreeDataPartTTLInfo;

View File

@@ -2727,20 +2727,21 @@ void MergeTreeData::swapActivePart(MergeTreeData::DataPartPtr part_copy)
/// We do not check allow_s3_zero_copy_replication here because data may be shared /// We do not check allow_s3_zero_copy_replication here because data may be shared
/// when allow_s3_zero_copy_replication turned on and off again /// when allow_s3_zero_copy_replication turned on and off again
bool keep_s3 = false;
original_active_part->keep_s3_on_delete = false;
if (original_active_part->volume->getDisk()->getType() == DiskType::Type::S3) if (original_active_part->volume->getDisk()->getType() == DiskType::Type::S3)
{ {
if (part_copy->volume->getDisk()->getType() == DiskType::Type::S3 if (part_copy->volume->getDisk()->getType() == DiskType::Type::S3
&& original_active_part->getUniqueId() == part_copy->getUniqueId()) && original_active_part->getUniqueId() == part_copy->getUniqueId())
{ /// May be when several volumes use the same S3 storage { /// May be when several volumes use the same S3 storage
keep_s3 = true; original_active_part->keep_s3_on_delete = true;
} }
else else
keep_s3 = !unlockSharedData(*original_active_part); original_active_part->keep_s3_on_delete = !unlockSharedData(*original_active_part);
} }
modifyPartState(original_active_part, keep_s3 ? DataPartState::DeleteOnDestroyKeepS3 : DataPartState::DeleteOnDestroy); modifyPartState(original_active_part, DataPartState::DeleteOnDestroy);
data_parts_indexes.erase(active_part_it); data_parts_indexes.erase(active_part_it);
auto part_it = data_parts_indexes.insert(part_copy).first; auto part_it = data_parts_indexes.insert(part_copy).first;