Fix drop part

This commit is contained in:
alesapin 2021-07-06 13:58:53 +03:00
parent 2600922786
commit 53b23775a9
3 changed files with 23 additions and 14 deletions

View File

@ -1,5 +1,4 @@
#include <Storages/MergeTree/DropPartsRanges.h> #include <Storages/MergeTree/DropPartsRanges.h>
#include <common/logger_useful.h>
namespace DB namespace DB
{ {
@ -20,7 +19,7 @@ bool DropPartsRanges::isAffectedByDropRange(const std::string & new_part_name, s
{ {
if (!drop_range.isDisjoint(entry_info)) if (!drop_range.isDisjoint(entry_info))
{ {
postpone_reason = fmt::format("Has DROP RANGE with entry. Will postpone it's execution.", drop_range.getPartName()); postpone_reason = fmt::format("Has DROP RANGE affecting entry {} producing part {}. Will postpone it's execution.", drop_range.getPartName(), new_part_name);
return true; return true;
} }
} }
@ -33,22 +32,20 @@ bool DropPartsRanges::isAffectedByDropRange(const ReplicatedMergeTreeLogEntry &
return isAffectedByDropRange(entry.new_part_name, postpone_reason); return isAffectedByDropRange(entry.new_part_name, postpone_reason);
} }
void DropPartsRanges::addDropRange(const ReplicatedMergeTreeLogEntryPtr & entry, Poco::Logger * /*log*/) void DropPartsRanges::addDropRange(const ReplicatedMergeTreeLogEntryPtr & entry)
{ {
if (entry->type != ReplicatedMergeTreeLogEntry::DROP_RANGE) if (entry->type != ReplicatedMergeTreeLogEntry::DROP_RANGE)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to add entry of type {} to drop ranges, expected DROP_RANGE", entry->typeToString()); throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to add entry of type {} to drop ranges, expected DROP_RANGE", entry->typeToString());
//LOG_DEBUG(log, "ADD DROP RANGE {}", *entry->getDropRange(format_version));
MergeTreePartInfo entry_info = MergeTreePartInfo::fromPartName(*entry->getDropRange(format_version), format_version); MergeTreePartInfo entry_info = MergeTreePartInfo::fromPartName(*entry->getDropRange(format_version), format_version);
drop_ranges.emplace(entry->znode_name, entry_info); drop_ranges.emplace(entry->znode_name, entry_info);
} }
void DropPartsRanges::removeDropRange(const ReplicatedMergeTreeLogEntryPtr & entry, Poco::Logger * /*log*/) void DropPartsRanges::removeDropRange(const ReplicatedMergeTreeLogEntryPtr & entry)
{ {
if (entry->type != ReplicatedMergeTreeLogEntry::DROP_RANGE) if (entry->type != ReplicatedMergeTreeLogEntry::DROP_RANGE)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to remove entry of type {} from drop ranges, expected DROP_RANGE", entry->typeToString()); throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to remove entry of type {} from drop ranges, expected DROP_RANGE", entry->typeToString());
//LOG_DEBUG(log, "REMOVE DROP RANGE {}", *entry->getDropRange(format_version));
drop_ranges.erase(entry->znode_name); drop_ranges.erase(entry->znode_name);
} }

View File

@ -1,6 +1,6 @@
#pragma once #pragma once
#include <map> #include <unordered_map>
#include <Storages/MergeTree/MergeTreePartInfo.h> #include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h> #include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h> #include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
@ -8,26 +8,35 @@
namespace DB namespace DB
{ {
/// All drop ranges in ReplicatedQueue.
/// Used to postpone execution of entries affected by DROP RANGE
class DropPartsRanges class DropPartsRanges
{ {
private: private:
MergeTreeDataFormatVersion format_version; MergeTreeDataFormatVersion format_version;
std::map<std::string, MergeTreePartInfo> drop_ranges; /// znode_name -> drop_range
std::unordered_map<std::string, MergeTreePartInfo> drop_ranges;
public: public:
explicit DropPartsRanges(MergeTreeDataFormatVersion format_version_) explicit DropPartsRanges(MergeTreeDataFormatVersion format_version_)
: format_version(format_version_) : format_version(format_version_)
{} {}
/// Entry is affected by DROP_RANGE and must be postponed
bool isAffectedByDropRange(const ReplicatedMergeTreeLogEntry & entry, std::string & postpone_reason) const; bool isAffectedByDropRange(const ReplicatedMergeTreeLogEntry & entry, std::string & postpone_reason) const;
/// Part is affected by DROP_RANGE and must be postponed
bool isAffectedByDropRange(const std::string & new_part_name, std::string & postpone_reason) const; bool isAffectedByDropRange(const std::string & new_part_name, std::string & postpone_reason) const;
/// Already has equal DROP_RANGE. Don't need to assign new one
bool hasDropRange(const MergeTreePartInfo & new_drop_range_info) const; bool hasDropRange(const MergeTreePartInfo & new_drop_range_info) const;
void addDropRange(const ReplicatedMergeTreeLogEntryPtr & entry, Poco::Logger * log); /// Add DROP_RANGE to map
void addDropRange(const ReplicatedMergeTreeLogEntryPtr & entry);
void removeDropRange(const ReplicatedMergeTreeLogEntryPtr & entry, Poco::Logger * log); /// Remove DROP_RANGE from map
void removeDropRange(const ReplicatedMergeTreeLogEntryPtr & entry);
}; };

View File

@ -169,9 +169,10 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
} }
else else
{ {
drop_ranges.addDropRange(entry, log); drop_ranges.addDropRange(entry);
auto drop_range = *entry->getDropRange(format_version); auto drop_range = *entry->getDropRange(format_version);
/// DROP PARTS removes parts from virtual parts
/// DROP PARTS (not DROP PARTITIONS) removes parts from virtual parts.
MergeTreePartInfo drop_range_info = MergeTreePartInfo::fromPartName(drop_range, format_version); MergeTreePartInfo drop_range_info = MergeTreePartInfo::fromPartName(drop_range, format_version);
if (!drop_range_info.isFakeDropRangePart() && virtual_parts.getContainingPart(drop_range_info) == drop_range) if (!drop_range_info.isFakeDropRangePart() && virtual_parts.getContainingPart(drop_range_info) == drop_range)
virtual_parts.removePartAndCoveredParts(drop_range); virtual_parts.removePartAndCoveredParts(drop_range);
@ -271,7 +272,7 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
if (entry->type == LogEntry::DROP_RANGE) if (entry->type == LogEntry::DROP_RANGE)
{ {
drop_ranges.removeDropRange(entry, log); drop_ranges.removeDropRange(entry);
} }
if (entry->type == LogEntry::ALTER_METADATA) if (entry->type == LogEntry::ALTER_METADATA)
@ -284,7 +285,7 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
{ {
if (entry->type == LogEntry::DROP_RANGE) if (entry->type == LogEntry::DROP_RANGE)
{ {
drop_ranges.removeDropRange(entry, log); drop_ranges.removeDropRange(entry);
} }
for (const String & virtual_part_name : entry->getVirtualPartNames(format_version)) for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
@ -996,6 +997,8 @@ bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & pa
{ {
std::lock_guard lock(state_mutex); std::lock_guard lock(state_mutex);
/// FIXME get rid of actual_part_name.
/// If new covering part jumps over DROP_RANGE we should execute drop range first
if (drop_ranges.isAffectedByDropRange(part_name, reject_reason)) if (drop_ranges.isAffectedByDropRange(part_name, reject_reason))
return false; return false;