Use Dr.CI results to classify flaky failures in trymerge (#110054)

After https://github.com/pytorch/test-infra/pull/4589, we can now query Dr.CI to get the list of flaky failures it has detected.  This change queries the Dr.CI API endpoint and checks whether a failure is flaky using the `is_flaky` function.
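For orientation, here is a minimal sketch of the flow this PR adds. The endpoint URL, auth header, and the `FLAKY` classification key follow the `get_drci_classifications` and `is_flaky` code in the diff below, but the helper names here are illustrative rather than the exact implementation:

```python
import json
import os
from typing import Any, Dict
from urllib.request import Request, urlopen


def fetch_drci_classifications(pr_num: int, project: str = "pytorch") -> Dict[str, Any]:
    # POST to the Dr.CI (HUD) endpoint; DRCI_BOT_KEY must be set in the environment.
    req = Request(
        f"https://hud.pytorch.org/api/drci/drci?prNumber={pr_num}",
        data=f"repo={project}".encode(),
        headers={"Authorization": os.getenv("DRCI_BOT_KEY", "")},
        method="POST",
    )
    with urlopen(req) as conn:
        failures = json.load(conn)
    # The response is keyed by PR number; each entry holds lists such as FLAKY and BROKEN_TRUNK.
    return failures.get(str(pr_num), {}) if failures else {}


def is_flaky(job_name: str, drci_classifications: Dict[str, Any]) -> bool:
    # A failed job is treated as flaky if Dr.CI lists a job with the same name under FLAKY.
    return any(
        job_name == flaky["name"] for flaky in drci_classifications.get("FLAKY", [])
    )
```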

Because the change is relatively large, I'm breaking it down into several smaller PRs, in this order:

* [x] This PR queries Dr.CI and adds `is_flaky` check
* [ ] Clean up the flaky rules logic because it is already implemented in Dr.CI
* [ ] Clean up the broken trunk logic for the same reason

### Testing

* Create a new `drci_mocks.json.gz` file to cache the JSON response from the Dr.CI API endpoint (the API requires `DRCI_BOT_KEY`); the caching pattern is sketched below.
* `pytest -v test_trymerge.py`
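The new mock file follows the same gzipped-cache pattern as `gql_mocks.json.gz` and `rockset_mocks.json.gz`: serve a recorded response when one exists, otherwise fall back to the live query (which needs the API keys) and record the result. A simplified sketch of that pattern, assuming the same call shape as the `mock_query` helper in the diff below:

```python
import gzip
import json
import os
from typing import Any, Callable


def cached_query(
    fallback: Callable[..., Any],
    file_name: str,
    key_function: Callable[..., str],
    *args: Any,
) -> Any:
    # Load the existing gzipped JSON cache, if any.
    cache: dict = {}
    if os.path.exists(file_name):
        with gzip.open(file_name, mode="rt", encoding="utf-8") as f:
            cache = json.load(f)

    key = key_function(*args)
    if key in cache:
        return cache[key]

    # Cache miss: call the real endpoint (requires GITHUB_TOKEN / ROCKSET_API_KEY /
    # DRCI_BOT_KEY as appropriate) and persist the response for future runs.
    result = fallback(*args)
    cache[key] = result
    with gzip.open(file_name, mode="wt", encoding="utf-8") as f:
        json.dump(cache, f)
    return result
```

With this in place, deleting `drci_mocks.json.gz` and re-running the tests with `DRCI_BOT_KEY` set would repopulate the cache.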

Pull Request resolved: https://github.com/pytorch/pytorch/pull/110054
Approved by: https://github.com/clee2000
Huy Do, 2023-09-27 21:21:29 +00:00 (committed by PyTorch MergeBot)
parent 213badf632
commit 955298bc40
8 changed files with 147 additions and 24 deletions

.github/scripts/drci_mocks.json.gz (new vendored binary file; contents not shown)

.github/scripts/github_utils.py

@@ -5,7 +5,7 @@ import os
import warnings
from dataclasses import dataclass
from typing import Any, Callable, cast, Dict, List, Optional, Tuple
from typing import Any, Callable, cast, Dict, List, Optional, Tuple, Union
from urllib.error import HTTPError
from urllib.parse import quote
from urllib.request import Request, urlopen
@@ -26,7 +26,7 @@ def gh_fetch_url_and_headers(
url: str,
*,
headers: Optional[Dict[str, str]] = None,
data: Optional[Dict[str, Any]] = None,
data: Union[Optional[Dict[str, Any]], str] = None,
method: Optional[str] = None,
reader: Callable[[Any], Any] = lambda x: x.read(),
) -> Tuple[Any, Any]:
@@ -35,7 +35,11 @@ def gh_fetch_url_and_headers(
token = os.environ.get("GITHUB_TOKEN")
if token is not None and url.startswith("https://api.github.com/"):
headers["Authorization"] = f"token {token}"
data_ = json.dumps(data).encode() if data is not None else None
data_ = None
if data is not None:
data_ = data.encode() if isinstance(data, str) else json.dumps(data).encode()
try:
with urlopen(Request(url, headers=headers, data=data_, method=method)) as conn:
return conn.headers, reader(conn)
@@ -57,7 +61,7 @@ def gh_fetch_url(
url: str,
*,
headers: Optional[Dict[str, str]] = None,
data: Optional[Dict[str, Any]] = None,
data: Union[Optional[Dict[str, Any]], str] = None,
method: Optional[str] = None,
reader: Callable[[Any], Any] = lambda x: x.read(),
) -> Any:

Two binary files changed (contents not shown).

.github/scripts/test_trymerge.py

@@ -23,6 +23,7 @@ from trymerge import (
find_matching_merge_rule,
FlakyRule,
get_classifications,
get_drci_classifications,
get_rockset_results,
gh_get_team_members,
gh_graphql,
@@ -42,6 +43,7 @@ if "GIT_REMOTE_URL" not in os.environ:
GQL_MOCKS = "gql_mocks.json.gz"
ROCKSET_MOCKS = "rockset_mocks.json.gz"
DRCI_MOCKS = "drci_mocks.json.gz"
def mock_query(
@@ -72,19 +74,20 @@ def mock_query(
try:
rc = fallback_function(*args)
except HTTPError as err:
if err.code == 401:
if err.code == 401 or err.code == 403:
err_msg = f"If you are seeing this message during workflow run, please make sure to update {file_name}"
err_msg += f" locally, by deleting it and running {os.path.basename(__file__)} with "
err_msg += " GitHub Personal Access Token passed via GITHUB_TOKEN environment variable"
err_msg += (
" the rockset api key passed via ROCKSET_API_KEY environment variable"
)
err_msg += f" locally, by deleting it and running {os.path.basename(__file__)} with"
err_msg += " GitHub Personal Access Token passed via GITHUB_TOKEN,"
err_msg += " the rockset api key passed via ROCKSET_API_KEY,"
err_msg += " and drci api key passed via DRCI_BOT_KEY environment variables"
if (
os.getenv("GITHUB_TOKEN") is None
or os.getenv("ROCKSET_API_KEY") is None
or os.getenv("DRCI_BOT_KEY") is None
):
err_msg = (
"Failed to update cached GraphQL queries as GITHUB_TOKEN or ROCKSET_API_KEY is not defined."
"Failed to update cached queries as GITHUB_TOKEN or ROCKSET_API_KEY or DRCI_BOT_KEY "
+ "is not defined. "
+ err_msg
)
raise RuntimeError(err_msg) from err
@@ -117,6 +120,16 @@ def mocked_rockset_results(head_sha: str, merge_base: str, num_retries: int = 3)
)
def mocked_drci_classifications(pr_num: int, project: str, num_retries: int = 3) -> Any:
return mock_query(
get_drci_classifications,
DRCI_MOCKS,
lambda x, y: f"{x} {y}",
pr_num,
project,
)
def mock_parse_args(revert: bool = False, force: bool = False) -> Any:
class Object:
def __init__(self) -> None:
@@ -245,6 +258,9 @@ class DummyGitRepo(GitRepo):
@mock.patch("trymerge.read_flaky_rules", side_effect=empty_flaky_rules)
@mock.patch("trymerge.get_rockset_results", side_effect=empty_rockset_results)
@mock.patch("trymerge.gh_graphql", side_effect=mocked_gh_graphql)
@mock.patch(
"trymerge.get_drci_classifications", side_effect=mocked_drci_classifications
)
class TestTryMerge(TestCase):
def test_merge_rules_valid(self, *args: Any) -> None:
"Test that merge_rules.yaml can be parsed"
@@ -623,12 +639,7 @@ class TestTryMerge(TestCase):
case["expected"], is_broken_trunk(case["head_job"], case["base_jobs"])
)
def test_get_merge_base(
self,
mock_gh_graphql: Any,
mock_get_rockset_results: Any,
mock_read_flaky_rules: Any,
) -> None:
def test_get_merge_base(self, *args: Any) -> None:
pr = GitHubPR("pytorch", "pytorch", 104121)
mock_merge_base = "mocked-sha"
@@ -646,6 +657,9 @@ class TestTryMerge(TestCase):
@mock.patch("trymerge.get_rockset_results", side_effect=mocked_rockset_results)
@mock.patch("trymerge.gh_graphql", side_effect=mocked_gh_graphql)
@mock.patch("trymerge.gh_fetch_merge_base", return_value="")
@mock.patch(
"trymerge.get_drci_classifications", side_effect=mocked_drci_classifications
)
class TestBypassFailures(TestCase):
def test_get_classifications(self, *args: Any) -> None:
flaky_rules = [
@@ -655,7 +669,13 @@ class TestBypassFailures(TestCase):
pr = GitHubPR("pytorch", "pytorch", 92863)
checks = pr.get_checkrun_conclusions()
checks = get_classifications(
checks, pr.last_commit()["oid"], pr.get_merge_base(), flaky_rules, []
pr.pr_num,
pr.project,
checks,
pr.last_commit()["oid"],
pr.get_merge_base(),
flaky_rules,
[],
)
self.assertTrue(
checks[
@@ -692,11 +712,34 @@ class TestBypassFailures(TestCase):
self.assertTrue(len(ignorable["FLAKY"]) == 1)
self.assertTrue(len(ignorable["BROKEN_TRUNK"]) == 1)
def test_get_classifications_similar_failures(self, *args: Any) -> None:
pr = GitHubPR("pytorch", "pytorch", 109750)
checks = pr.get_checkrun_conclusions()
checks = get_classifications(
pr.pr_num,
pr.project,
checks,
pr.last_commit()["oid"],
pr.get_merge_base(),
[],
[],
)
pending, failed, ignorable = categorize_checks(checks, list(checks.keys()))
self.assertTrue(len(pending) == 0)
self.assertTrue(len(failed) == 0)
self.assertTrue(len(ignorable["FLAKY"]) == 1)
def test_get_classifications_unstable(self, *args: Any) -> None:
pr = GitHubPR("pytorch", "pytorch", 104312)
checks = pr.get_checkrun_conclusions()
checks = get_classifications(
checks, pr.last_commit()["oid"], pr.get_merge_base(), [], []
pr.pr_num,
pr.project,
checks,
pr.last_commit()["oid"],
pr.get_merge_base(),
[],
[],
)
workflow_name = "linux-bionic-cuda12.1-py3.10-gcc9-bazel-test"
job_name = "build-and-test (default, 1, 1, linux.4xlarge.nvidia.gpu, unstable)"
@@ -714,7 +757,13 @@ class TestBypassFailures(TestCase):
pr = GitHubPR("pytorch", "pytorch", 105998)
checks = pr.get_checkrun_conclusions()
checks = get_classifications(
checks, pr.last_commit()["oid"], pr.get_merge_base(), [], []
pr.pr_num,
pr.project,
checks,
pr.last_commit()["oid"],
pr.get_merge_base(),
[],
[],
)
pending, failed, ignorable = categorize_checks(
checks, list(checks.keys()), ok_failed_checks_threshold=1
@@ -761,7 +810,13 @@ class TestBypassFailures(TestCase):
) as mocked_gh_fetch_merge_base:
checks = pr.get_checkrun_conclusions()
checks = get_classifications(
checks, pr.last_commit()["oid"], pr.get_merge_base(), [], []
pr.pr_num,
pr.project,
checks,
pr.last_commit()["oid"],
pr.get_merge_base(),
[],
[],
)
pending, failed, _ = categorize_checks(checks, list(checks.keys()))
@@ -796,7 +851,13 @@ class TestBypassFailures(TestCase):
# No broken trunk or flaky rules, then all failures are ignored when ic is used
checks = get_classifications(
checks, pr.last_commit()["oid"], None, [], [broken_trunk, flaky]
pr.pr_num,
pr.project,
checks,
pr.last_commit()["oid"],
None,
[],
[broken_trunk, flaky],
)
self.assertTrue(checks[flaky].classification == "IGNORE_CURRENT_CHECK")
self.assertTrue(checks[broken_trunk].classification == "IGNORE_CURRENT_CHECK")
@@ -812,6 +873,8 @@ class TestBypassFailures(TestCase):
# merge base here to get the results from Rockset, and that categorize the
# broken trunk failure too
checks = get_classifications(
pr.pr_num,
pr.project,
checks,
pr.last_commit()["oid"],
pr.get_merge_base(),
@@ -830,6 +893,8 @@ class TestBypassFailures(TestCase):
# Broken trunk takes precedence over ignore current (no flaky rule is set here)
checks = get_classifications(
pr.pr_num,
pr.project,
checks,
pr.last_commit()["oid"],
pr.get_merge_base(),
@@ -849,7 +914,9 @@ class TestBypassFailures(TestCase):
@mock.patch("trymerge.read_flaky_rules", side_effect=xla_is_flaky_rules)
@mock.patch("trymerge.read_merge_rules", side_effect=xla_merge_rules)
def test_dont_ignore_flaky_failures(self, *args: Any) -> None:
"""Regression test for https://github.com/pytorch/test-infra/issues/4126"""
"""
Regression test for https://github.com/pytorch/test-infra/issues/4126
"""
pr = GitHubPR("pytorch", "pytorch", 100369)
repo = DummyGitRepo()
# Check that failure is classified as flaky but still raises exception
@@ -865,6 +932,9 @@ class TestBypassFailures(TestCase):
@mock.patch("trymerge.get_rockset_results", side_effect=mocked_rockset_results)
@mock.patch("trymerge.gh_graphql", side_effect=mocked_gh_graphql)
@mock.patch("trymerge.gh_fetch_merge_base", return_value="")
@mock.patch(
"trymerge.get_drci_classifications", side_effect=mocked_drci_classifications
)
class TestGitHubPRGhstackDependencies2(TestCase):
def test_pr_dependencies(self, *args: Any) -> None:
pr = GitHubPR("pytorch", "pytorch", 106068)

.github/scripts/trymerge.py

@@ -1313,6 +1313,8 @@ def find_matching_merge_rule(
f"{type(e)}\n{e}"
)
checks = get_classifications(
pr.pr_num,
pr.project,
checks,
pr.last_commit()["oid"],
base_rev,
@@ -1575,6 +1577,27 @@ where
return []
@retries_decorator()
def get_drci_classifications(pr_num: int, project: str = "pytorch") -> Any:
"""
Query HUD API to find similar failures to decide if they are flaky
"""
# NB: This doesn't work internally atm because this requires making an
# external API call to HUD
failures = gh_fetch_url(
f"https://hud.pytorch.org/api/drci/drci?prNumber={pr_num}",
data=f"repo={project}",
headers={
"Authorization": os.getenv("DRCI_BOT_KEY", ""),
"Accept": "application/vnd.github.v3+json",
},
method="POST",
reader=json.load,
)
return failures.get(str(pr_num), {}) if failures else {}
REMOVE_JOB_NAME_SUFFIX_REGEX = re.compile(r", [0-9]+, [0-9]+, .+\)$")
@@ -1595,7 +1618,24 @@ def is_broken_trunk(
)
def is_flaky(
head_job: Optional[Dict[str, Any]],
flaky_rules: List[FlakyRule],
drci_classifications: Any,
) -> bool:
if not head_job or not drci_classifications:
return False
# Consult the list of flaky failures from Dr.CI
return any(
head_job["name"] == flaky["name"]
for flaky in drci_classifications.get("FLAKY", [])
) or any(rule.matches(head_job) for rule in flaky_rules)
def get_classifications(
pr_num: int,
project: str,
checks: Dict[str, JobCheckState],
head_sha: str,
merge_base: Optional[str],
@@ -1652,6 +1692,11 @@ def get_classifications(
overwrite_failed_run_attempt=False,
)
# Get the failure classification from Dr.CI, which is the source of truth
# going forward
drci_classifications = get_drci_classifications(pr_num=pr_num, project=project)
print(f"From Dr.CI: {json.dumps(drci_classifications)}")
checks_with_classifications = checks.copy()
for name, check in checks.items():
if check.status == "SUCCESS":
@@ -1682,7 +1727,7 @@ def get_classifications(
)
continue
elif any(rule.matches(head_sha_job) for rule in flaky_rules):
elif is_flaky(head_sha_job, flaky_rules, drci_classifications):
checks_with_classifications[name] = JobCheckState(
check.name, check.url, check.status, "FLAKY", check.job_id, check.title
)
@@ -2007,6 +2052,8 @@ def merge(
checks = pr.get_checkrun_conclusions()
checks = get_classifications(
pr.pr_num,
pr.project,
checks,
pr.last_commit()["oid"],
pr.get_merge_base(),

.github/workflows/trymerge.yml

@@ -41,6 +41,7 @@ jobs:
REBASE: ${{ github.event.client_payload.rebase }}
IGNORE_CURRENT: ${{ github.event.client_payload.ignore_current }}
ROCKSET_API_KEY: ${{ secrets.ROCKSET_API_KEY }}
DRCI_BOT_KEY: ${{ secrets.DRCI_BOT_KEY }}
run: |
set -ex
if [ -n "${REBASE}" ]; then

.lintrunner.toml

@@ -391,6 +391,7 @@ exclude_patterns=[
'test/cpp/jit/upgrader_models/*.ptl',
'test/cpp/jit/upgrader_models/*.ptl.ff',
'**/*.png',
'**/*.gz',
]
command = [
'python3',