Merge pull request #25884 from ClickHouse/fix_drop_part_in_queue

Relax `DROP PART` guarantees and turn on checks in ReplicationQueue.
This commit is contained in:
alesapin 2021-07-07 10:48:48 +03:00 committed by GitHub
commit 0d8844c510
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 249 additions and 18 deletions

View File

@ -8,6 +8,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
ActiveDataPartSet::ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, const Strings & names)
: format_version(format_version_)
@ -16,8 +21,7 @@ ActiveDataPartSet::ActiveDataPartSet(MergeTreeDataFormatVersion format_version_,
add(name);
}
/// FIXME replace warnings with logical errors
bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts, Poco::Logger * log)
bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts)
{
/// TODO make it exception safe (out_replaced_parts->push_back(...) may throw)
auto part_info = MergeTreePartInfo::fromPartName(name, format_version);
@ -38,11 +42,7 @@ bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts, P
if (!part_info.contains(it->first))
{
if (!part_info.isDisjoint(it->first))
{
if (log)
LOG_ERROR(log, "Part {} intersects previous part {}. It is a bug.", name, it->first.getPartName());
assert(false);
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects previous part {}. It is a bug.", name, it->first.getPartName());
++it;
break;
}
@ -65,11 +65,7 @@ bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts, P
}
if (it != part_info_to_name.end() && !part_info.isDisjoint(it->first))
{
if (log)
LOG_ERROR(log, "Part {} intersects next part {}. It is a bug.", name, it->first.getPartName());
assert(false);
}
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects next part {}. It is a bug.", name, it->first.getPartName());
part_info_to_name.emplace(part_info, name);
return true;

View File

@ -50,7 +50,7 @@ public:
/// Returns true if the part was actually added. If out_replaced_parts != nullptr, it will contain
/// parts that were replaced from the set by the newly added part.
bool add(const String & name, Strings * out_replaced_parts = nullptr, Poco::Logger * log = nullptr);
bool add(const String & name, Strings * out_replaced_parts = nullptr);
bool remove(const MergeTreePartInfo & part_info)
{

View File

@ -0,0 +1,65 @@
#include <Storages/MergeTree/DropPartsRanges.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
bool DropPartsRanges::isAffectedByDropRange(const std::string & new_part_name, std::string & postpone_reason) const
{
if (new_part_name.empty())
return false;
MergeTreePartInfo entry_info = MergeTreePartInfo::fromPartName(new_part_name, format_version);
for (const auto & [znode, drop_range] : drop_ranges)
{
if (!drop_range.isDisjoint(entry_info))
{
postpone_reason = fmt::format("Has DROP RANGE affecting entry {} producing part {}. Will postpone it's execution.", drop_range.getPartName(), new_part_name);
return true;
}
}
return false;
}
/// Convenience overload: performs the check for the part name stored in `entry.new_part_name`.
bool DropPartsRanges::isAffectedByDropRange(const ReplicatedMergeTreeLogEntry & entry, std::string & postpone_reason) const
{
    const auto & produced_part_name = entry.new_part_name;
    return isAffectedByDropRange(produced_part_name, postpone_reason);
}
void DropPartsRanges::addDropRange(const ReplicatedMergeTreeLogEntryPtr & entry)
{
if (entry->type != ReplicatedMergeTreeLogEntry::DROP_RANGE)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to add entry of type {} to drop ranges, expected DROP_RANGE", entry->typeToString());
MergeTreePartInfo entry_info = MergeTreePartInfo::fromPartName(*entry->getDropRange(format_version), format_version);
drop_ranges.emplace(entry->znode_name, entry_info);
}
/// Unregisters the drop range which was previously added for `entry` (looked up by znode name).
/// Throws LOGICAL_ERROR if the entry is not of type DROP_RANGE or if no range is registered
/// for its znode name.
void DropPartsRanges::removeDropRange(const ReplicatedMergeTreeLogEntryPtr & entry)
{
    if (entry->type != ReplicatedMergeTreeLogEntry::DROP_RANGE)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to remove entry of type {} from drop ranges, expected DROP_RANGE", entry->typeToString());

    auto it = drop_ranges.find(entry->znode_name);

    /// An assert is compiled out in release builds, and erasing the end() iterator is undefined
    /// behaviour, so the missing-key case is reported explicitly instead.
    if (it == drop_ranges.end())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to remove DROP_RANGE entry {} which is not present in drop ranges", entry->znode_name);

    drop_ranges.erase(it);
}
bool DropPartsRanges::hasDropRange(const MergeTreePartInfo & new_drop_range_info) const
{
for (const auto & [znode_name, drop_range] : drop_ranges)
{
if (drop_range.contains(new_drop_range_info))
return true;
}
return false;
}
}

View File

@ -0,0 +1,43 @@
#pragma once
#include <unordered_map>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
#include <Storages/MergeTree/ReplicatedMergeTreeLogEntry.h>
namespace DB
{
/// Keeps track of all DROP_RANGE entries which are currently in the replication queue.
/// Used to postpone execution of entries affected by DROP RANGE.
class DropPartsRanges
{
public:
    explicit DropPartsRanges(MergeTreeDataFormatVersion format_version_)
        : format_version(format_version_)
    {
    }

    /// Remember the drop range of a DROP_RANGE entry (keyed by the entry's znode name).
    void addDropRange(const ReplicatedMergeTreeLogEntryPtr & entry);

    /// Forget the drop range of a DROP_RANGE entry.
    void removeDropRange(const ReplicatedMergeTreeLogEntryPtr & entry);

    /// Some stored DROP_RANGE already contains the given range, so there is no need to assign a new one.
    bool hasDropRange(const MergeTreePartInfo & new_drop_range_info) const;

    /// The part with this name intersects some DROP_RANGE and must be postponed.
    /// The reason is written into postpone_reason.
    bool isAffectedByDropRange(const std::string & new_part_name, std::string & postpone_reason) const;

    /// The part produced by the entry (entry.new_part_name) intersects some DROP_RANGE
    /// and the entry must be postponed.
    bool isAffectedByDropRange(const ReplicatedMergeTreeLogEntry & entry, std::string & postpone_reason) const;

private:
    /// Format version used to parse part names into MergeTreePartInfo.
    MergeTreeDataFormatVersion format_version;

    /// znode_name -> drop_range
    std::unordered_map<std::string, MergeTreePartInfo> drop_ranges;
};
}

View File

@ -431,6 +431,16 @@ std::optional<String> ReplicatedMergeTreeLogEntryData::getDropRange(MergeTreeDat
return {};
}
/// Returns true only for a DROP_RANGE entry whose drop range (parsed from new_part_name)
/// is not a fake drop range part, i.e. the entry is DROP PART rather than DROP PARTITION.
bool ReplicatedMergeTreeLogEntryData::isDropPart(MergeTreeDataFormatVersion format_version) const
{
    /// Entries of any other type are never DROP PART.
    if (type != DROP_RANGE)
        return false;

    auto range_info = MergeTreePartInfo::fromPartName(new_part_name, format_version);
    const bool is_fake_range = range_info.isFakeDropRangePart();
    return !is_fake_range;
}
Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormatVersion format_version) const
{
/// Doesn't produce any part
@ -439,7 +449,30 @@ Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormat
/// DROP_RANGE does not add a real part, but we must disable merges in that range
if (type == DROP_RANGE)
{
auto drop_range_part_info = MergeTreePartInfo::fromPartName(new_part_name, format_version);
/// It's DROP PART and we don't want to add it into virtual parts
/// because it can lead to intersecting parts on stale replicas and this
/// problem is fundamental. So we have very weak guarantees for DROP
/// PART. If any concurrent merge will be assigned then DROP PART will
/// delete nothing and part will be successfully merged into bigger part.
///
/// dropPart used in the following cases:
/// 1) Remove empty parts after TTL.
/// 2) Remove parts after move between shards.
/// 3) User queries: ALTER TABLE DROP PART 'part_name'.
///
/// In the first case merge of empty part is even better than DROP. In
/// the second case part UUIDs used to forbid merges for moving parts so
/// there is no problem with concurrent merges. The third case is quite
/// rare and we give very weak guarantee: there will be no active part
/// with this name, but possibly it was merged to some other part.
if (!drop_range_part_info.isFakeDropRangePart())
return {};
return {new_part_name};
}
if (type == REPLACE_RANGE)
{

View File

@ -143,6 +143,10 @@ struct ReplicatedMergeTreeLogEntryData
/// Returns fake part for drop range (for DROP_RANGE and REPLACE_RANGE)
std::optional<String> getDropRange(MergeTreeDataFormatVersion format_version) const;
/// This entry is DROP PART, not DROP PARTITION. They both have the same
/// DROP_RANGE entry type, but differ in information about the drop range.
bool isDropPart(MergeTreeDataFormatVersion format_version) const;
/// Access under queue_mutex, see ReplicatedMergeTreeQueue.
bool currently_executing = false; /// Whether the action is executing now.
bool removed_by_other_entry = false;

View File

@ -26,6 +26,7 @@ ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree &
, format_version(storage.format_version)
, current_parts(format_version)
, virtual_parts(format_version)
, drop_ranges(format_version)
{
zookeeper_path = storage.zookeeper_path;
replica_path = storage.replica_path;
@ -52,8 +53,8 @@ void ReplicatedMergeTreeQueue::initialize(const MergeTreeData::DataParts & parts
std::lock_guard lock(state_mutex);
for (const auto & part : parts)
{
current_parts.add(part->name, nullptr, log);
virtual_parts.add(part->name, nullptr, log);
current_parts.add(part->name, nullptr);
virtual_parts.add(part->name, nullptr);
}
}
@ -154,7 +155,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
{
for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
{
virtual_parts.add(virtual_part_name, nullptr, log);
virtual_parts.add(virtual_part_name, nullptr);
/// Don't add drop range parts to mutations
/// they don't produce any useful parts
if (entry->type != LogEntry::DROP_RANGE)
@ -168,6 +169,13 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
}
else
{
drop_ranges.addDropRange(entry);
/// DROP PART removes parts, so we remove it from virtual parts to
/// preserve invariant virtual_parts = current_parts + queue
if (entry->isDropPart(format_version))
virtual_parts.removePartAndCoveredParts(*entry->getDropRange(format_version));
queue.push_front(entry);
}
@ -248,7 +256,7 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
{
current_parts.add(virtual_part_name, nullptr, log);
current_parts.add(virtual_part_name, nullptr);
/// These parts are already covered by newer part, we don't have to
/// mutate it.
@ -257,10 +265,23 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
if (auto drop_range_part_name = entry->getDropRange(format_version))
{
current_parts.remove(*drop_range_part_name);
MergeTreePartInfo drop_range_info = MergeTreePartInfo::fromPartName(*drop_range_part_name, format_version);
/// DROP PART doesn't have virtual parts so remove from current
/// parts all covered parts.
if (entry->isDropPart(format_version))
current_parts.removePartAndCoveredParts(*drop_range_part_name);
else
current_parts.remove(*drop_range_part_name);
virtual_parts.remove(*drop_range_part_name);
}
if (entry->type == LogEntry::DROP_RANGE)
{
drop_ranges.removeDropRange(entry);
}
if (entry->type == LogEntry::ALTER_METADATA)
{
LOG_TRACE(log, "Finishing metadata alter with version {}", entry->alter_version);
@ -269,6 +290,11 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
}
else
{
if (entry->type == LogEntry::DROP_RANGE)
{
drop_ranges.removeDropRange(entry);
}
for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
{
/// Because execution of the entry is unsuccessful,
@ -978,6 +1004,11 @@ bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & pa
{
std::lock_guard lock(state_mutex);
/// FIXME get rid of actual_part_name.
/// If new covering part jumps over DROP_RANGE we should execute drop range first
if (drop_ranges.isAffectedByDropRange(part_name, reject_reason))
return false;
if (isNotCoveredByFuturePartsImpl(entry.znode_name, part_name, reject_reason, lock))
{
CurrentlyExecuting::setActualPartName(entry, part_name, *this);
@ -1003,6 +1034,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
return false;
}
if (entry.type != LogEntry::DROP_RANGE && drop_ranges.isAffectedByDropRange(entry, out_postpone_reason))
return false;
/// Check that fetches pool is not overloaded
if ((entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART)
&& !storage.canExecuteFetch(entry, out_postpone_reason))
@ -2074,6 +2108,12 @@ bool ReplicatedMergeTreeMergePredicate::isMutationFinished(const ReplicatedMerge
return true;
}
/// Asks the queue's drop ranges whether one of them already contains `new_drop_range_info`.
/// The lookup is done while holding the queue's state_mutex.
bool ReplicatedMergeTreeMergePredicate::hasDropRange(const MergeTreePartInfo & new_drop_range_info) const
{
    std::lock_guard state_lock(queue.state_mutex);

    const bool already_has_range = queue.drop_ranges.hasDropRange(new_drop_range_info);
    return already_has_range;
}
ReplicatedMergeTreeQueue::SubscriberHandler
ReplicatedMergeTreeQueue::addSubscriber(ReplicatedMergeTreeQueue::SubscriberCallBack && callback)

View File

@ -11,6 +11,7 @@
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumAddedParts.h>
#include <Storages/MergeTree/ReplicatedMergeTreeAltersSequence.h>
#include <Storages/MergeTree/DropPartsRanges.h>
#include <Common/ZooKeeper/ZooKeeper.h>
@ -100,6 +101,10 @@ private:
*/
ActiveDataPartSet virtual_parts;
/// Dropped ranges inserted into queue
DropPartsRanges drop_ranges;
/// A set of mutations loaded from ZooKeeper.
/// mutations_by_partition is an index partition ID -> block ID -> mutation into this set.
/// Note that mutations are updated in such a way that they are always more recent than
@ -475,6 +480,8 @@ public:
/// The version of "log" node that is used to check that no new merges have appeared.
int32_t getVersion() const { return merges_version; }
bool hasDropRange(const MergeTreePartInfo & new_drop_range_info) const;
private:
const ReplicatedMergeTreeQueue & queue;

View File

@ -2104,6 +2104,10 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
try
{
String part_name = entry.actual_new_part_name.empty() ? entry.new_part_name : entry.actual_new_part_name;
if (!entry.actual_new_part_name.empty())
LOG_DEBUG(log, "Will fetch part {} instead of {}", entry.actual_new_part_name, entry.new_part_name);
if (!fetchPart(part_name, metadata_snapshot, fs::path(zookeeper_path) / "replicas" / replica, false, entry.quorum))
return false;
}
@ -6986,6 +6990,14 @@ bool StorageReplicatedMergeTree::dropPartImpl(
return false;
}
if (merge_pred.hasDropRange(part->info))
{
if (throw_if_noop)
throw Exception("Already has DROP RANGE for part " + part_name + " in queue.", ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
return false;
}
/// There isn't a lot we can do otherwise. Can't cancel merges because it is possible that a replica already
/// finished the merge.
if (partIsAssignedToBackgroundOperation(part))

View File

@ -688,6 +688,24 @@ private:
bool fetch_part,
ContextPtr query_context) override;
/// NOTE: there are no guarantees for concurrent merges. Dropping part can
/// be concurrently merged into some covering part and dropPart will do
/// nothing. There are some fundamental problems with it. But this is OK
/// because:
///
/// dropPart used in the following cases:
/// 1) Remove empty parts after TTL.
/// 2) Remove parts after move between shards.
/// 3) User queries: ALTER TABLE DROP PART 'part_name'.
///
/// In the first case merge of empty part is even better than DROP. In the
/// second case part UUIDs used to forbid merges for moving parts so there
/// is no problem with concurrent merges. The third case is quite rare and
/// we give very weak guarantee: there will be no active part with this
/// name, but possibly it was merged to some other part.
///
/// NOTE: don't rely on dropPart if you 100% need to remove non-empty part
/// and don't use any explicit locking mechanism for merges.
bool dropPartImpl(zkutil::ZooKeeperPtr & zookeeper, String part_name, LogEntry & entry, bool detach, bool throw_if_noop);
/// Check granularity of already existing replicated table in zookeeper if it exists

View File

@ -30,6 +30,7 @@ SRCS(
MergeTree/BackgroundJobsExecutor.cpp
MergeTree/BoolMask.cpp
MergeTree/DataPartsExchange.cpp
MergeTree/DropPartsRanges.cpp
MergeTree/EphemeralLockInZooKeeper.cpp
MergeTree/IMergeTreeDataPart.cpp
MergeTree/IMergeTreeDataPartWriter.cpp

View File

@ -85,12 +85,23 @@ function optimize_thread()
done
}
function drop_part_thread()
{
    # Endless loop: choose one of the 16 dst_N tables at random, pick a random
    # active part of it and try to drop that part, then sleep for a fraction of a second.
    while :; do
        REPLICA=$((RANDOM % 16))
        part=$($CLICKHOUSE_CLIENT -q "SELECT name FROM system.parts WHERE active AND database='$CLICKHOUSE_DATABASE' and table='dst_$REPLICA' ORDER BY rand() LIMIT 1")
        # stderr of the ALTER query is discarded.
        $CLICKHOUSE_CLIENT -q "ALTER TABLE dst_$REPLICA DROP PART '$part'" 2>/dev/null
        sleep "0.$RANDOM"
    done
}
#export -f create_drop_thread;
export -f insert_thread;
export -f move_partition_src_dst_thread;
export -f replace_partition_src_src_thread;
export -f drop_partition_thread;
export -f optimize_thread;
export -f drop_part_thread;
TIMEOUT=60
@ -102,6 +113,7 @@ timeout $TIMEOUT bash -c move_partition_src_dst_thread &
timeout $TIMEOUT bash -c replace_partition_src_src_thread &
timeout $TIMEOUT bash -c drop_partition_thread &
timeout $TIMEOUT bash -c optimize_thread &
timeout $TIMEOUT bash -c drop_part_thread &
wait
for ((i=0; i<16; i++)) do