ClickHouse/tests/ci/cherry_pick_utils/query.py

533 lines
15 KiB
Python

# -*- coding: utf-8 -*-
import json
import inspect
import logging
import time
from urllib3.util.retry import Retry # type: ignore
import requests # type: ignore
from requests.adapters import HTTPAdapter # type: ignore
class Query:
"""
Implements queries to the Github API using GraphQL
"""
_PULL_REQUEST = """
author {{
... on User {{
id
login
}}
}}
baseRepository {{
nameWithOwner
}}
mergeCommit {{
oid
parents(first: {min_page_size}) {{
totalCount
nodes {{
oid
}}
}}
}}
mergedBy {{
... on User {{
id
login
}}
}}
baseRefName
closed
headRefName
id
mergeable
merged
number
title
url
"""
def __init__(self, token, owner, name, team, max_page_size=100, min_page_size=10):
self._PULL_REQUEST = Query._PULL_REQUEST.format(min_page_size=min_page_size)
self._token = token
self._owner = owner
self._name = name
self._team = team
self._session = None
self._max_page_size = max_page_size
self._min_page_size = min_page_size
self.api_costs = {}
repo = self.get_repository()
self._id = repo["id"]
self.ssh_url = repo["sshUrl"]
self.default_branch = repo["defaultBranchRef"]["name"]
self.members = set(self.get_members())
def get_repository(self):
_QUERY = """
repository(owner: "{owner}" name: "{name}") {{
defaultBranchRef {{
name
}}
id
sshUrl
}}
"""
query = _QUERY.format(owner=self._owner, name=self._name)
return self._run(query)["repository"]
def get_members(self):
"""Get all team members for organization
Returns:
members: a map of members' logins to ids
"""
_QUERY = """
organization(login: "{organization}") {{
team(slug: "{team}") {{
members(first: {max_page_size} {next}) {{
pageInfo {{
hasNextPage
endCursor
}}
nodes {{
id
login
}}
}}
}}
}}
"""
members = {}
not_end = True
query = _QUERY.format(
organization=self._owner,
team=self._team,
max_page_size=self._max_page_size,
next="",
)
while not_end:
result = self._run(query)["organization"]["team"]
if result is None:
break
result = result["members"]
not_end = result["pageInfo"]["hasNextPage"]
query = _QUERY.format(
organization=self._owner,
team=self._team,
max_page_size=self._max_page_size,
next=f'after: "{result["pageInfo"]["endCursor"]}"',
)
# Update members with new nodes compatible with py3.8-py3.10
members = {
**members,
**{node["login"]: node["id"] for node in result["nodes"]},
}
return members
def get_pull_request(self, number):
_QUERY = """
repository(owner: "{owner}" name: "{name}") {{
pullRequest(number: {number}) {{
{pull_request_data}
}}
}}
"""
query = _QUERY.format(
owner=self._owner,
name=self._name,
number=number,
pull_request_data=self._PULL_REQUEST,
min_page_size=self._min_page_size,
)
return self._run(query)["repository"]["pullRequest"]
def find_pull_request(self, base, head):
_QUERY = """
repository(owner: "{owner}" name: "{name}") {{
pullRequests(
first: {min_page_size} baseRefName: "{base}" headRefName: "{head}"
) {{
nodes {{
{pull_request_data}
}}
totalCount
}}
}}
"""
query = _QUERY.format(
owner=self._owner,
name=self._name,
base=base,
head=head,
pull_request_data=self._PULL_REQUEST,
min_page_size=self._min_page_size,
)
result = self._run(query)["repository"]["pullRequests"]
if result["totalCount"] > 0:
return result["nodes"][0]
else:
return {}
def find_pull_requests(self, label_name):
"""
Get all pull-requests filtered by label name
"""
_QUERY = """
repository(owner: "{owner}" name: "{name}") {{
pullRequests(first: {min_page_size} labels: "{label_name}" states: OPEN) {{
nodes {{
{pull_request_data}
}}
}}
}}
"""
query = _QUERY.format(
owner=self._owner,
name=self._name,
label_name=label_name,
pull_request_data=self._PULL_REQUEST,
min_page_size=self._min_page_size,
)
return self._run(query)["repository"]["pullRequests"]["nodes"]
def get_pull_requests(self, before_commit):
"""
Get all merged pull-requests from the HEAD of default branch to the last commit (excluding)
"""
_QUERY = """
repository(owner: "{owner}" name: "{name}") {{
defaultBranchRef {{
target {{
... on Commit {{
history(first: {max_page_size} {next}) {{
pageInfo {{
hasNextPage
endCursor
}}
nodes {{
oid
associatedPullRequests(first: {min_page_size}) {{
totalCount
nodes {{
... on PullRequest {{
{pull_request_data}
labels(first: {min_page_size}) {{
totalCount
pageInfo {{
hasNextPage
endCursor
}}
nodes {{
name
color
}}
}}
}}
}}
}}
}}
}}
}}
}}
}}
}}
"""
pull_requests = []
not_end = True
query = _QUERY.format(
owner=self._owner,
name=self._name,
max_page_size=self._max_page_size,
min_page_size=self._min_page_size,
pull_request_data=self._PULL_REQUEST,
next="",
)
while not_end:
result = self._run(query)["repository"]["defaultBranchRef"]["target"][
"history"
]
not_end = result["pageInfo"]["hasNextPage"]
query = _QUERY.format(
owner=self._owner,
name=self._name,
max_page_size=self._max_page_size,
min_page_size=self._min_page_size,
pull_request_data=self._PULL_REQUEST,
next=f'after: "{result["pageInfo"]["endCursor"]}"',
)
for commit in result["nodes"]:
# FIXME: maybe include `before_commit`?
if str(commit["oid"]) == str(before_commit):
not_end = False
break
# TODO: fetch all pull-requests that were merged in a single commit.
assert (
commit["associatedPullRequests"]["totalCount"]
<= self._min_page_size
)
for pull_request in commit["associatedPullRequests"]["nodes"]:
if (
pull_request["baseRepository"]["nameWithOwner"]
== f"{self._owner}/{self._name}"
and pull_request["baseRefName"] == self.default_branch
and pull_request["mergeCommit"]["oid"] == commit["oid"]
):
pull_requests.append(pull_request)
return pull_requests
def create_pull_request(
self, source, target, title, description="", draft=False, can_modify=True
):
_QUERY = """
createPullRequest(input: {{
baseRefName: "{target}",
headRefName: "{source}",
repositoryId: "{id}",
title: "{title}",
body: "{body}",
draft: {draft},
maintainerCanModify: {modify}
}}) {{
pullRequest {{
{pull_request_data}
}}
}}
"""
query = _QUERY.format(
target=target,
source=source,
id=self._id,
title=title,
body=description,
draft="true" if draft else "false",
modify="true" if can_modify else "false",
pull_request_data=self._PULL_REQUEST,
)
return self._run(query, is_mutation=True)["createPullRequest"]["pullRequest"]
def merge_pull_request(self, pr_id):
_QUERY = """
mergePullRequest(input: {{
pullRequestId: "{pr_id}"
}}) {{
pullRequest {{
{pull_request_data}
}}
}}
"""
query = _QUERY.format(pr_id=pr_id, pull_request_data=self._PULL_REQUEST)
return self._run(query, is_mutation=True)["mergePullRequest"]["pullRequest"]
# FIXME: figure out how to add more assignees at once
def add_assignee(self, pr, assignee):
_QUERY = """
addAssigneesToAssignable(input: {{
assignableId: "{id1}",
assigneeIds: "{id2}"
}}) {{
clientMutationId
}}
"""
query = _QUERY.format(id1=pr["id"], id2=assignee["id"])
self._run(query, is_mutation=True)
def set_label(self, pull_request, label_name):
"""
Set label by name to the pull request
Args:
pull_request: JSON object returned by `get_pull_requests()`
label_name (string): label name
"""
_GET_LABEL = """
repository(owner: "{owner}" name: "{name}") {{
labels(first: {max_page_size} {next} query: "{label_name}") {{
pageInfo {{
hasNextPage
endCursor
}}
nodes {{
id
name
color
}}
}}
}}
"""
_SET_LABEL = """
addLabelsToLabelable(input: {{
labelableId: "{pr_id}",
labelIds: "{label_id}"
}}) {{
clientMutationId
}}
"""
labels = []
not_end = True
query = _GET_LABEL.format(
owner=self._owner,
name=self._name,
label_name=label_name,
max_page_size=self._max_page_size,
next="",
)
while not_end:
result = self._run(query)["repository"]["labels"]
not_end = result["pageInfo"]["hasNextPage"]
query = _GET_LABEL.format(
owner=self._owner,
name=self._name,
label_name=label_name,
max_page_size=self._max_page_size,
next=f'after: "{result["pageInfo"]["endCursor"]}"',
)
labels += list(result["nodes"])
if not labels:
return
query = _SET_LABEL.format(pr_id=pull_request["id"], label_id=labels[0]["id"])
self._run(query, is_mutation=True)
@property
def session(self):
if self._session is not None:
return self._session
retries = 5
self._session = requests.Session()
retry = Retry(
total=retries,
read=retries,
connect=retries,
backoff_factor=1,
status_forcelist=(403, 500, 502, 504),
)
adapter = HTTPAdapter(max_retries=retry)
self._session.mount("http://", adapter)
self._session.mount("https://", adapter)
return self._session
def _run(self, query, is_mutation=False):
# Get caller and parameters from the stack to track the progress
frame = inspect.getouterframes(inspect.currentframe(), 2)[1]
caller = frame[3]
f_parameters = inspect.signature(getattr(self, caller)).parameters
parameters = ", ".join(str(frame[0].f_locals[p]) for p in f_parameters)
mutation = ""
if is_mutation:
mutation = ", is mutation"
print(f"---GraphQL request for {caller}({parameters}){mutation}---")
headers = {"Authorization": f"bearer {self._token}"}
if is_mutation:
query = f"""
mutation {{
{query}
}}
"""
else:
query = f"""
query {{
{query}
rateLimit {{
cost
remaining
}}
}}
"""
def request_with_retry(retry=0):
max_retries = 5
# From time to time we face some concrete errors, when it worth to
# retry instead of failing competely
# We should sleep progressively
progressive_sleep = 5 * sum(i + 1 for i in range(retry))
if progressive_sleep:
logging.warning(
"Retry GraphQL request %s time, sleep %s seconds",
retry,
progressive_sleep,
)
time.sleep(progressive_sleep)
response = self.session.post(
"https://api.github.com/graphql", json={"query": query}, headers=headers
)
result = response.json()
if response.status_code == 200:
if "errors" in result:
raise Exception(
f"Errors occurred: {result['errors']}\nOriginal query: {query}"
)
if not is_mutation:
if caller not in self.api_costs:
self.api_costs[caller] = 0
self.api_costs[caller] += result["data"]["rateLimit"]["cost"]
return result["data"]
elif (
response.status_code == 403
and "secondary rate limit" in result["message"]
):
if retry <= max_retries:
logging.warning("Secondary rate limit reached")
return request_with_retry(retry + 1)
elif response.status_code == 502 and "errors" in result:
too_many_data = any(
True
for err in result["errors"]
if "message" in err
and "This may be the result of a timeout" in err["message"]
)
if too_many_data:
logging.warning(
"Too many data is requested, decreasing page size %s by 10%%",
self._max_page_size,
)
self._max_page_size = int(self._max_page_size * 0.9)
return request_with_retry(retry)
data = json.dumps(result, indent=4)
raise Exception(f"Query failed with code {response.status_code}:\n{data}")
return request_with_retry()