do not crash on intersecting parts

This commit is contained in:
Alexander Tokmakov 2021-06-01 16:25:23 +03:00
parent cdd46aa117
commit 5969891611
9 changed files with 93 additions and 68 deletions

View File

@ -1,6 +1,8 @@
#include <Storages/MergeTree/ActiveDataPartSet.h>
#include <Common/Exception.h>
#include <common/logger_useful.h>
#include <algorithm>
#include <cassert>
namespace DB
@ -18,8 +20,8 @@ ActiveDataPartSet::ActiveDataPartSet(MergeTreeDataFormatVersion format_version_,
add(name);
}
bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts)
/// FIXME replace warnings with logical errors
bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts, Poco::Logger * log)
{
/// TODO make it exception safe (out_replaced_parts->push_back(...) may throw)
auto part_info = MergeTreePartInfo::fromPartName(name, format_version);
@ -40,7 +42,10 @@ bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts)
if (!part_info.contains(it->first))
{
if (!part_info.isDisjoint(it->first))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects previous part {}. It is a bug.", name, it->first.getPartName());
{
LOG_ERROR(log, "Part {} intersects previous part {}. It is a bug.", name, it->first.getPartName());
assert(false);
}
++it;
break;
}
@ -56,15 +61,17 @@ bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts)
/// Let's go to the right.
while (it != part_info_to_name.end() && part_info.contains(it->first))
{
if (part_info == it->first)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected duplicate part {}. It is a bug.", name);
assert(part_info != it->first);
if (out_replaced_parts)
out_replaced_parts->push_back(it->second);
part_info_to_name.erase(it++);
}
if (it != part_info_to_name.end() && !part_info.isDisjoint(it->first))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects next part {}. It is a bug.", name, it->first.getPartName());
{
LOG_ERROR(log, "Part {} intersects next part {}. It is a bug.", name, it->first.getPartName());
assert(false);
}
part_info_to_name.emplace(part_info, name);
return true;

View File

@ -5,6 +5,10 @@
#include <map>
#include <vector>
namespace Poco
{
class Logger;
}
namespace DB
{
@ -46,7 +50,7 @@ public:
/// Returns true if the part was actually added. If out_replaced_parts != nullptr, it will contain
/// parts that were replaced from the set by the newly added part.
bool add(const String & name, Strings * out_replaced_parts = nullptr);
bool add(const String & name, Strings * out_replaced_parts = nullptr, Poco::Logger * log = nullptr);
bool remove(const MergeTreePartInfo & part_info)
{

View File

@ -4897,7 +4897,11 @@ void MergeTreeData::removeQueryId(const String & query_id) const
{
std::lock_guard lock(query_id_set_mutex);
if (query_id_set.find(query_id) == query_id_set.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "We have query_id removed but it's not recorded. This is a bug");
{
/// Do not throw exception, because this method is used in destructor.
LOG_WARNING(log, "We have query_id removed but it's not recorded. This is a bug");
assert(false);
}
else
query_id_set.erase(query_id);
}

View File

@ -413,6 +413,24 @@ ReplicatedMergeTreeLogEntry::Ptr ReplicatedMergeTreeLogEntry::parse(const String
return res;
}
std::optional<String> ReplicatedMergeTreeLogEntryData::getDropRange(MergeTreeDataFormatVersion format_version) const
{
if (type == DROP_RANGE)
return new_part_name;
if (type == REPLACE_RANGE)
{
auto drop_range_info = MergeTreePartInfo::fromPartName(replace_range_entry->drop_range_part_name, format_version);
if (!ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range_info))
{
/// It's REPLACE, not MOVE or ATTACH, so drop range is real
return replace_range_entry->drop_range_part_name;
}
}
return {};
}
Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormatVersion format_version) const
{
/// Doesn't produce any part
@ -431,11 +449,8 @@ Strings ReplicatedMergeTreeLogEntryData::getVirtualPartNames(MergeTreeDataFormat
{
Strings res = replace_range_entry->new_part_names;
auto drop_range_info = MergeTreePartInfo::fromPartName(replace_range_entry->drop_range_part_name, format_version);
if (!ReplaceRangeEntry::isMovePartitionOrAttachFrom(drop_range_info))
{
/// It's REPLACE, not MOVE or ATTACH, so drop range is real
res.emplace_back(replace_range_entry->drop_range_part_name);
}
if (auto drop_range = getDropRange(format_version))
res.emplace_back(*drop_range);
return res;
}

View File

@ -152,6 +152,9 @@ struct ReplicatedMergeTreeLogEntryData
return res;
}
/// Returns fake part for drop range (for DROP_RANGE and REPLACE_RANGE)
std::optional<String> getDropRange(MergeTreeDataFormatVersion format_version) const;
/// Access under queue_mutex, see ReplicatedMergeTreeQueue.
bool currently_executing = false; /// Whether the action is executing now.
bool removed_by_other_entry = false;

View File

@ -191,7 +191,7 @@ void ReplicatedMergeTreePartCheckThread::searchForMissingPartAndFetchIfPossible(
if (missing_part_search_result == MissingPartSearchResult::LostForever)
{
/// Is it in the replication queue? If there is - delete, because the task can not be processed.
if (!storage.queue.markPartAsLostForever(zookeeper, part_name))
if (!storage.queue.remove(zookeeper, part_name))
{
/// The part was not in our queue.
LOG_WARNING(log, "Missing part {} is not in our queue, this can happen rarely.", part_name);

View File

@ -34,24 +34,29 @@ ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree &
}
void ReplicatedMergeTreeQueue::clear()
{
auto locks = lockQueue();
assert(future_parts.empty());
current_parts.clear();
virtual_parts.clear();
queue.clear();
inserts_by_time.clear();
mutations_by_znode.clear();
mutations_by_partition.clear();
mutation_pointer.clear();
}
void ReplicatedMergeTreeQueue::initialize(const MergeTreeData::DataParts & parts)
{
addVirtualParts(parts);
}
void ReplicatedMergeTreeQueue::addVirtualParts(const MergeTreeData::DataParts & parts)
{
std::lock_guard lock(state_mutex);
for (const auto & part : parts)
{
current_parts.add(part->name);
virtual_parts.add(part->name);
current_parts.add(part->name, nullptr, log);
virtual_parts.add(part->name, nullptr, log);
}
}
bool ReplicatedMergeTreeQueue::isVirtualPart(const MergeTreeData::DataPartPtr & data_part) const
{
std::lock_guard lock(state_mutex);
@ -74,9 +79,6 @@ bool ReplicatedMergeTreeQueue::load(zkutil::ZooKeeperPtr zookeeper)
/// Reset batch size on initialization to recover from possible errors of too large batch size.
current_multi_batch_size = 1;
String log_pointer_str = zookeeper->get(replica_path + "/log_pointer");
log_pointer = log_pointer_str.empty() ? 0 : parse<UInt64>(log_pointer_str);
std::unordered_set<String> already_loaded_paths;
{
std::lock_guard lock(state_mutex);
@ -134,7 +136,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
{
for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
{
virtual_parts.add(virtual_part_name);
virtual_parts.add(virtual_part_name, nullptr, log);
addPartToMutations(virtual_part_name);
}
@ -221,23 +223,17 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
for (const String & virtual_part_name : entry->getVirtualPartNames(format_version))
{
current_parts.add(virtual_part_name);
current_parts.add(virtual_part_name, nullptr, log);
/// These parts are already covered by newer part, we don't have to
/// mutate it.
removeCoveredPartsFromMutations(virtual_part_name, /*remove_part = */ false, /*remove_covered_parts = */ true);
}
String drop_range_part_name;
if (entry->type == LogEntry::DROP_RANGE)
drop_range_part_name = entry->new_part_name;
else if (entry->type == LogEntry::REPLACE_RANGE)
drop_range_part_name = entry->replace_range_entry->drop_range_part_name;
if (!drop_range_part_name.empty())
if (auto drop_range_part_name = entry->getDropRange(format_version))
{
current_parts.remove(drop_range_part_name);
virtual_parts.remove(drop_range_part_name);
current_parts.remove(*drop_range_part_name);
virtual_parts.remove(*drop_range_part_name);
}
if (entry->type == LogEntry::ALTER_METADATA)
@ -302,9 +298,7 @@ void ReplicatedMergeTreeQueue::addPartToMutations(const String & part_name)
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
/// Do not add special virtual parts to parts_to_do
auto max_level = MergeTreePartInfo::MAX_LEVEL; /// DROP/DETACH PARTITION
auto another_max_level = std::numeric_limits<decltype(part_info.level)>::max(); /// REPLACE/MOVE PARTITION
if (part_info.level == max_level || part_info.level == another_max_level)
if (part_info.isFakeDropRangePart())
return;
auto in_partition = mutations_by_partition.find(part_info.partition_id);
@ -344,7 +338,9 @@ void ReplicatedMergeTreeQueue::updateTimesInZooKeeper(
auto code = zookeeper->tryMulti(ops, responses);
if (code != Coordination::Error::ZOK)
LOG_ERROR(log, "Couldn't set value of nodes for insert times ({}/min_unprocessed_insert_time, max_processed_insert_time): {}. This shouldn't happen often.", replica_path, Coordination::errorMessage(code));
LOG_ERROR(log, "Couldn't set value of nodes for insert times "
"({}/min_unprocessed_insert_time, max_processed_insert_time): {}. "
"This shouldn't happen often.", replica_path, Coordination::errorMessage(code));
}
}
@ -392,7 +388,8 @@ void ReplicatedMergeTreeQueue::removeProcessedEntry(zkutil::ZooKeeperPtr zookeep
}
if (!found && need_remove_from_zk)
throw Exception("Can't find " + entry->znode_name + " in the memory queue. It is a bug", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find {} in the memory queue. It is a bug. Entry: {}",
entry->znode_name, entry->toString());
notifySubscribers(queue_size);
@ -434,7 +431,7 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri
{
auto part_in_current_parts = current_parts.getContainingPart(source_part);
if (part_in_current_parts == source_part)
virtual_parts.add(source_part);
virtual_parts.add(source_part, nullptr, log);
}
}
@ -462,8 +459,9 @@ bool ReplicatedMergeTreeQueue::remove(zkutil::ZooKeeperPtr zookeeper, const Stri
}
bool ReplicatedMergeTreeQueue::removeFromVirtualParts(const MergeTreePartInfo & part_info)
bool ReplicatedMergeTreeQueue::removeFailedQuorumPart(const MergeTreePartInfo & part_info)
{
assert(part_info.level == 0);
std::lock_guard lock(state_mutex);
return virtual_parts.remove(part_info);
}
@ -587,8 +585,6 @@ int32_t ReplicatedMergeTreeQueue::pullLogsToQueue(zkutil::ZooKeeperPtr zookeeper
{
std::lock_guard state_lock(state_mutex);
log_pointer = last_entry_index + 1;
for (size_t copied_entry_idx = 0, num_copied_entries = copied_entries.size(); copied_entry_idx < num_copied_entries; ++copied_entry_idx)
{
String path_created = dynamic_cast<const Coordination::CreateResponse &>(*responses[copied_entry_idx]).path_created;
@ -758,9 +754,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
/// Such parts do not exist and will never appear, so we should not add virtual parts to parts_to_do list.
/// Fortunately, it's easy to distinguish virtual parts from normal parts by part level.
/// See StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(...)
auto max_level = MergeTreePartInfo::MAX_LEVEL; /// DROP/DETACH PARTITION
auto another_max_level = std::numeric_limits<decltype(part_info.level)>::max(); /// REPLACE/MOVE PARTITION
if (part_info.level == max_level || part_info.level == another_max_level)
if (part_info.isFakeDropRangePart())
continue;
auto it = entry->block_numbers.find(part_info.partition_id);
@ -941,9 +935,6 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
if ((*it)->currently_executing)
to_wait.push_back(*it);
auto code = zookeeper->tryRemove(replica_path + "/queue/" + (*it)->znode_name);
/// FIXME it's probably unsafe to remove entries non-atomically
/// when this method called directly from alter query (not from replication queue task),
/// because entries will be lost if ALTER fails.
if (code != Coordination::Error::ZOK)
LOG_INFO(log, "Couldn't remove {}: {}", replica_path + "/queue/" + (*it)->znode_name, Coordination::errorMessage(code));
@ -1259,7 +1250,9 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::CurrentlyExecuting(const Replicate
for (const String & new_part_name : entry->getBlockingPartNames(queue.format_version))
{
if (!queue.future_parts.emplace(new_part_name, entry).second)
throw Exception("Tagging already tagged future part " + new_part_name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Tagging already tagged future part {}. This is a bug. "
"It happened on attempt to execute {}: {}",
new_part_name, entry->znode_name, entry->toString());
}
}
@ -1277,7 +1270,9 @@ void ReplicatedMergeTreeQueue::CurrentlyExecuting::setActualPartName(ReplicatedM
return;
if (!queue.future_parts.emplace(entry.actual_new_part_name, entry.shared_from_this()).second)
throw Exception("Attaching already existing future part " + entry.actual_new_part_name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attaching already existing future part {}. This is a bug. "
"It happened on attempt to execute {}: {}",
entry.actual_new_part_name, entry.znode_name, entry.toString());
}
@ -1296,13 +1291,19 @@ ReplicatedMergeTreeQueue::CurrentlyExecuting::~CurrentlyExecuting()
for (const String & new_part_name : entry->getBlockingPartNames(queue.format_version))
{
if (!queue.future_parts.erase(new_part_name))
{
LOG_ERROR(queue.log, "Untagging already untagged future part {}. This is a bug.", new_part_name);
assert(false);
}
}
if (!entry->actual_new_part_name.empty())
{
if (entry->actual_new_part_name != entry->new_part_name && !queue.future_parts.erase(entry->actual_new_part_name))
{
LOG_ERROR(queue.log, "Untagging already untagged future part {}. This is a bug.", entry->actual_new_part_name);
assert(false);
}
entry->actual_new_part_name.clear();
}

View File

@ -92,9 +92,6 @@ private:
using FuturePartsSet = std::map<String, LogEntryPtr>;
FuturePartsSet future_parts;
/// Index of the first log entry that we didn't see yet.
Int64 log_pointer = 0;
/// Avoid parallel execution of queue enties, which may remove other entries from the queue.
bool currently_executing_drop_or_replace_range = false;
@ -183,9 +180,6 @@ private:
/// Ensures that only one thread is simultaneously updating mutations.
std::mutex update_mutations_mutex;
/// Put a set of (already existing) parts in virtual_parts.
void addVirtualParts(const MergeTreeData::DataParts & parts);
/// Insert new entry from log into queue
void insertUnlocked(
const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
@ -275,7 +269,10 @@ public:
ReplicatedMergeTreeQueue(StorageReplicatedMergeTree & storage_, ReplicatedMergeTreeMergeStrategyPicker & merge_strategy_picker_);
~ReplicatedMergeTreeQueue();
/// Clears queue state
void clear();
/// Put a set of (already existing) parts in virtual_parts.
void initialize(const MergeTreeData::DataParts & parts);
/** Inserts an action to the end of the queue.
@ -295,7 +292,7 @@ public:
*/
bool load(zkutil::ZooKeeperPtr zookeeper);
bool removeFromVirtualParts(const MergeTreePartInfo & part_info);
bool removeFailedQuorumPart(const MergeTreePartInfo & part_info);
/** Copy the new entries from the shared log to the queue of this replica. Set the log_pointer to the appropriate value.
* If watch_callback is not empty, will call it when new entries appear in the log.

View File

@ -1465,12 +1465,6 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
return true;
}
if (entry.type == LogEntry::PART_IS_LOST)
{
queue.executePartIsLost(getZooKeeper(), entry);
return true;
}
const bool is_get_or_attach = entry.type == LogEntry::GET_PART || entry.type == LogEntry::ATTACH_PART;
if (is_get_or_attach || entry.type == LogEntry::MERGE_PARTS || entry.type == LogEntry::MUTATE_PART)