[ci] write test suites to rockset

Currently we upload all `testcase` elements as individual test runs to
Rockset. It would be nice to upload the `testsuite` elements as well, since
they aggregate high-level information.

These aggregations could technically be performed in the backend, but it's
faster to just log the data since we already have it in the XML test
report.

Pull Request resolved: https://github.com/pytorch/pytorch/pull/79265

Approved by: https://github.com/seemethere
This commit is contained in:
Michael Suo 2022-06-09 23:17:32 -07:00 committed by PyTorch MergeBot
parent cec251fc4b
commit eaaa34daef
2 changed files with 37 additions and 22 deletions

View file

@ -4,7 +4,7 @@ import requests
import zipfile
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Dict, List, Any
from typing import Dict, List, Any, Tuple
from tempfile import TemporaryDirectory
import rockset # type: ignore[import]
@ -22,10 +22,10 @@ def get_request_headers() -> Dict[str, str]:
def parse_xml_report(
report: Path, workflow_id: int, workflow_run_attempt: int
tag: str, report: Path, workflow_id: int, workflow_run_attempt: int
) -> List[Dict[str, Any]]:
"""Convert a test report xml file into a JSON-serializable list of test cases."""
print(f"Parsing test report: {report}")
print(f"Parsing {tag}s for test report: {report}")
# [Job id in artifacts]
# Retrieve the job id from the report path. In our GHA workflows, we append
# the job id to the end of the report name, so `report` looks like:
@ -37,7 +37,7 @@ def parse_xml_report(
root = ET.parse(report)
test_cases = []
for test_case in root.iter("testcase"):
for test_case in root.iter(tag):
case = process_xml_element(test_case)
case["workflow_id"] = workflow_id
case["workflow_run_attempt"] = workflow_run_attempt
@ -58,14 +58,17 @@ def process_xml_element(element: ET.Element) -> Dict[str, Any]:
# {"name": "test_foo", "classname": "test_bar"}
ret.update(element.attrib)
# By default, all attributes are strings. Apply a few special conversions
# here for well-known attributes so that they are the right type in Rockset.
line = ret.get("line")
if line:
ret["line"] = int(line)
time = ret.get("time")
if time:
ret["time"] = float(time)
# The XML format encodes every attribute value as a string. Convert each one
# to a number where possible so the values can be aggregated in Rockset.
# Try int first and fall back to float only when int parsing fails, so that
# integral values such as "12" stay ints rather than being re-parsed as 12.0.
# Values that are neither (e.g. test names) are left as the original strings.
# Only existing keys are reassigned, so updating `ret` while iterating is safe.
for k, v in ret.items():
    try:
        ret[k] = int(v)
    except ValueError:
        try:
            ret[k] = float(v)
        except ValueError:
            pass
# Convert inner and outer text into special dict elements.
# e.g.
@ -181,18 +184,18 @@ def download_and_extract_gha_artifacts(
download_and_extract_artifact(Path(name), url, workflow_run_attempt)
def upload_to_rockset(test_cases: List[Any]) -> None:
print(f"Writing {len(test_cases)} test cases to Rockset")
def upload_to_rockset(collection: str, docs: List[Any]) -> None:
print(f"Writing {len(docs)} documents to Rockset")
client = rockset.Client(
api_server="api.rs2.usw2.rockset.com", api_key=os.environ["ROCKSET_API_KEY"]
)
client.Collection.retrieve("test_run").add_docs(test_cases)
client.Collection.retrieve(collection).add_docs(docs)
print("Done!")
def get_test_cases(
def get_tests(
workflow_run_id: int, workflow_run_attempt: int
) -> List[Dict[str, Any]]:
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
with TemporaryDirectory() as temp_dir:
print("Using temporary directory:", temp_dir)
os.chdir(temp_dir)
@ -203,16 +206,26 @@ def get_test_cases(
# Parse the reports and transform them to JSON
test_cases = []
test_suites = []
for xml_report in Path(".").glob("**/*.xml"):
test_cases.extend(
parse_xml_report(
"testcase",
xml_report,
workflow_run_id,
workflow_run_attempt,
)
)
test_suites.extend(
parse_xml_report(
"testsuite",
xml_report,
workflow_run_id,
workflow_run_attempt,
)
)
return test_cases
return test_cases, test_suites
if __name__ == "__main__":
@ -230,5 +243,6 @@ if __name__ == "__main__":
help="which retry of the workflow this is",
)
args = parser.parse_args()
test_cases = get_test_cases(args.workflow_run_id, args.workflow_run_attempt)
upload_to_rockset(test_cases)
test_cases, test_suites = get_tests(args.workflow_run_id, args.workflow_run_attempt)
upload_to_rockset("test_run", test_cases)
upload_to_rockset("test_suite", test_suites)

View file

@ -3,7 +3,7 @@ import os
IN_CI = os.environ.get("CI")
from tools.stats.upload_test_stats import get_test_cases
from tools.stats.upload_test_stats import get_tests
class TestUploadTestStats(unittest.TestCase):
@ -13,8 +13,9 @@ class TestUploadTestStats(unittest.TestCase):
)
def test_existing_job(self) -> None:
"""Run on a known-good job and make sure we don't error and get basically okay results."""
test_cases = get_test_cases(2465214458, 1)
test_cases, test_suites = get_tests(2465214458, 1)
self.assertEqual(len(test_cases), 731457)
self.assertEqual(len(test_suites), 7781)
if __name__ == "__main__":