remove an unused argument

This commit is contained in:
Alexander Tokmakov 2023-02-13 14:46:46 +01:00
parent 8265db80ff
commit 69f579b8ec
3 changed files with 12 additions and 32 deletions

View File

@ -1107,8 +1107,7 @@ bool ReplicatedMergeTreeQueue::checkReplaceRangeCanBeRemoved(const MergeTreePart
void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
zkutil::ZooKeeperPtr zookeeper,
const MergeTreePartInfo & part_info,
const std::optional<ReplicatedMergeTreeLogEntryData> & covering_entry,
const String & fetch_entry_znode)
const std::optional<ReplicatedMergeTreeLogEntryData> & covering_entry)
{
/// TODO is it possible to simplify it?
Queue to_wait;
@ -1122,40 +1121,22 @@ void ReplicatedMergeTreeQueue::removePartProducingOpsInRange(
[[maybe_unused]] bool called_from_alter_query_directly = covering_entry && covering_entry->replace_range_entry
&& covering_entry->replace_range_entry->columns_version < 0;
[[maybe_unused]] bool called_for_broken_part = !covering_entry;
assert(currently_executing_drop_replace_ranges.contains(part_info) || called_from_alter_query_directly || called_for_broken_part || !fetch_entry_znode.empty());
auto is_simple_part_producing_op = [](const ReplicatedMergeTreeLogEntryData & data)
{
return data.type == LogEntry::GET_PART ||
data.type == LogEntry::ATTACH_PART ||
data.type == LogEntry::MERGE_PARTS ||
data.type == LogEntry::MUTATE_PART;
};
assert(currently_executing_drop_replace_ranges.contains(part_info) || called_from_alter_query_directly || called_for_broken_part);
for (Queue::iterator it = queue.begin(); it != queue.end();)
{
/// Skipping currently processing entry
if (!fetch_entry_znode.empty() && (*it)->znode_name == fetch_entry_znode)
{
++it;
continue;
}
bool is_simple_producing_op = is_simple_part_producing_op(**it);
auto type = (*it)->type;
bool is_simple_producing_op = type == LogEntry::GET_PART ||
type == LogEntry::ATTACH_PART ||
type == LogEntry::MERGE_PARTS ||
type == LogEntry::MUTATE_PART;
bool simple_op_covered = is_simple_producing_op && part_info.contains(MergeTreePartInfo::fromPartName((*it)->new_part_name, format_version));
bool replace_range_covered = covering_entry && checkReplaceRangeCanBeRemoved(part_info, *it, *covering_entry);
if (simple_op_covered || replace_range_covered)
{
if ((*it)->currently_executing)
{
bool is_covered_by_simple_op = covering_entry && is_simple_part_producing_op(*covering_entry);
bool is_fetching_covering_part = !fetch_entry_znode.empty();
if (is_covered_by_simple_op || is_fetching_covering_part)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot remove covered entry {} producing parts {}, it's a bug",
(*it)->znode_name, fmt::join((*it)->getVirtualPartNames(format_version), ", "));
to_wait.push_back(*it);
}
auto code = zookeeper->tryRemove(fs::path(replica_path) / "queue" / (*it)->znode_name);
if (code != Coordination::Error::ZOK)
LOG_INFO(log, "Couldn't remove {}: {}", (fs::path(replica_path) / "queue" / (*it)->znode_name).string(), Coordination::errorMessage(code));

View File

@ -340,8 +340,7 @@ public:
*/
void removePartProducingOpsInRange(zkutil::ZooKeeperPtr zookeeper,
const MergeTreePartInfo & part_info,
const std::optional<ReplicatedMergeTreeLogEntryData> & covering_entry,
const String & fetch_entry_znode);
const std::optional<ReplicatedMergeTreeLogEntryData> & covering_entry);
/** In the case where there are not enough parts to perform the merge in part_name
* - move actions with merged parts to the end of the queue

View File

@ -1888,7 +1888,7 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range_info.partition_id, drop_range_info.max_block);
{
auto pause_checking_parts = part_check_thread.pausePartsCheck();
queue.removePartProducingOpsInRange(getZooKeeper(), drop_range_info, entry, /* fetch_entry_znode= */ {});
queue.removePartProducingOpsInRange(getZooKeeper(), drop_range_info, entry);
part_check_thread.cancelRemovedPartsCheck(drop_range_info);
}
@ -1967,7 +1967,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
{
getContext()->getMergeList().cancelInPartition(getStorageID(), drop_range.partition_id, drop_range.max_block);
auto pause_checking_parts = part_check_thread.pausePartsCheck();
queue.removePartProducingOpsInRange(getZooKeeper(), drop_range, entry, /* fetch_entry_znode= */ {});
queue.removePartProducingOpsInRange(getZooKeeper(), drop_range, entry);
part_check_thread.cancelRemovedPartsCheck(drop_range);
}
else
@ -3512,7 +3512,7 @@ void StorageReplicatedMergeTree::removePartAndEnqueueFetch(const String & part_n
/// so GET_PART all_1_42_5 (and all source parts) is useless. The only thing we can do is to fetch all_1_42_5_63.
/// 2. If all_1_42_5_63 is lost, then replication may stuck waiting for all_1_42_5_63 to appear,
/// because we may have some covered parts (more precisely, parts with the same min and max blocks)
queue.removePartProducingOpsInRange(zookeeper, broken_part_info, /* covering_entry= */ {}, /* fetch_entry_znode= */ {});
queue.removePartProducingOpsInRange(zookeeper, broken_part_info, /* covering_entry= */ {});
String part_path = fs::path(replica_path) / "parts" / part_name;