Upload external contribution data to s3 (#95747)

Context: We want to create a metric panel to track external contributions to the PyTorch repo

This PR creates a daily job that tracks how many external contributions occurred the day before and uploads the count to an S3 collection that is accessible from Rockset.

`upload_external_contrib_stats.py` is a Python script that grabs the necessary stats from GitHub and writes them into an S3 bucket. It is used here for the daily uploads, but it can also be used for larger backfill queries; a sketch of such an invocation is below.
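For reference, a larger one-off backfill could look like the sketch below (run from the repository root; assumes a `GITHUB_TOKEN` with read access and AWS credentials for the target bucket are already available in the environment, and the dates/values are illustrative):

```bash
# Hypothetical backfill: upload one data point per day for 30 days,
# starting at 2023-02-01, grouping each data point over a single day.
export GITHUB_TOKEN="<token with public repo read access>"
python3 -m tools.stats.upload_external_contrib_stats \
    --startDate 2023-02-01 \
    --length 30 \
    --period-length 1
```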

Pull Request resolved: https://github.com/pytorch/pytorch/pull/95747
Approved by: https://github.com/huydhn, https://github.com/kit1980
PaliC 2023-03-02 10:36:20 -08:00 committed by PyTorch MergeBot
parent 02fa2291f7
commit 3df1a9baca
5 changed files with 189 additions and 13 deletions

View file

@@ -0,0 +1,27 @@
name: Upload contribution stats
on:
schedule:
# Choose an arbitrary time near midnight PST (cron is in UTC), since scheduled runs may be delayed under high load
- cron: 37 7 * * *
jobs:
upload-contribution-stats:
runs-on: ubuntu-latest
steps:
- name: Checkout PyTorch
uses: pytorch/pytorch/.github/actions/checkout-pytorch@master
- run: |
pip3 install requests==2.26
pip3 install rockset==1.0.3
pip3 install boto3==1.19.12
- name: Upload external contribution stats
env:
ROCKSET_API_KEY: ${{ secrets.ROCKSET_API_KEY }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
echo "Uploading external contribution stats for $(date -v-1d +%F)"
python3 -m tools.stats.upload_external_contrib_stats --startDate "$(date -v-1d +%F)"

View file

@@ -11,7 +11,7 @@ from tools.stats.upload_stats_lib import (
download_s3_artifacts,
is_rerun_disabled_tests,
unzip,
upload_to_s3,
upload_workflow_stats_to_s3,
)
from tools.stats.upload_test_stats import process_xml_element
@@ -218,7 +218,7 @@ def save_results(
f" {disabled_test_name} from {filename}, failing {num_red}/{num_red + num_green}"
)
upload_to_s3(
upload_workflow_stats_to_s3(
workflow_id,
workflow_run_attempt,
"rerun_disabled_tests",

View file

@@ -0,0 +1,142 @@
import argparse
import datetime
import json
import os
import urllib.parse
from typing import Any, Callable, cast, Dict, List, Optional, Set
from urllib.error import HTTPError
from urllib.request import Request, urlopen
# import time
from tools.stats.upload_stats_lib import upload_to_s3
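# Bot accounts whose PRs should not be counted as external contributions.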
FILTER_OUT_USERS = {"pytorchmergebot", "facebook-github-bot", "pytorch-bot[bot]"}
def _fetch_url(
url: str,
headers: Dict[str, str],
data: Optional[Dict[str, Any]] = None,
method: Optional[str] = None,
reader: Callable[[Any], Any] = lambda x: x.read(),
) -> Any:
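# Attach the GitHub token (if set) to api.github.com requests; on an HTTP
# error, print the response headers and flag 403s caused by rate limiting.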
token = os.environ.get("GITHUB_TOKEN")
if token is not None and url.startswith("https://api.github.com/"):
headers["Authorization"] = f"token {token}"
data_ = json.dumps(data).encode() if data is not None else None
try:
with urlopen(Request(url, headers=headers, data=data_, method=method)) as conn:
return reader(conn)
except HTTPError as err:
print(err.reason)
print(err.headers)
if err.code == 403 and all(
key in err.headers for key in ["X-RateLimit-Limit", "X-RateLimit-Used"]
):
print(
f"Rate limit exceeded: {err.headers['X-RateLimit-Used']}/{err.headers['X-RateLimit-Limit']}"
)
raise
def fetch_json(
url: str,
params: Optional[Dict[str, Any]] = None,
data: Optional[Dict[str, Any]] = None,
) -> List[Dict[str, Any]]:
headers = {"Accept": "application/vnd.github.v3+json"}
if params is not None and len(params) > 0:
url += "?" + "&".join(
f"{name}={urllib.parse.quote(str(val))}" for name, val in params.items()
)
return cast(
List[Dict[str, Any]],
_fetch_url(url, headers=headers, data=data, reader=json.load),
)
def get_external_pr_data(
start_date: datetime.date, end_date: datetime.date, period_length: int = 1
) -> List[Dict[str, Any]]:
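# Count merged PRs labeled "open source" (and not Reverted) that were closed in
# each period, skipping the bot accounts listed in FILTER_OUT_USERS.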
pr_info = []
period_begin_date = start_date
pr_count = 0
users: Set[str] = set()
while period_begin_date < end_date:
period_end_date = period_begin_date + datetime.timedelta(days=period_length - 1)
page = 1
responses: List[Dict[str, Any]] = []
while len(responses) > 0 or page == 1:
response = cast(
Dict[str, Any],
fetch_json(
"https://api.github.com/search/issues",
params={
"q": f'repo:pytorch/pytorch is:pr is:closed \
label:"open source" label:Merged -label:Reverted closed:{period_begin_date}..{period_end_date}',
"per_page": "100",
"page": str(page),
},
),
)
# Keep paging until the search returns an empty page of results.
responses = response["items"]
for item in responses:
u = item["user"]["login"]
if u not in FILTER_OUT_USERS:
pr_count += 1
users.add(u)
page += 1
pr_info.append(
{
"date": str(period_begin_date),
"pr_count": pr_count,
"user_count": len(users),
}
)
period_begin_date = period_end_date + datetime.timedelta(days=1)
return pr_info
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Upload external contribution stats to Rockset"
)
parser.add_argument(
"--startDate",
type=datetime.date.fromisoformat,
required=True,
help="the first date to upload data for in any valid ISO 8601 format format (eg. YYYY-MM-DD).",
)
parser.add_argument(
"--length",
type=int,
required=False,
help="the number of days to upload data for. Default is 1.",
default=1,
)
parser.add_argument(
"--period-length",
type=int,
required=False,
help="the number of days to group data for. Default is 1.",
default=1,
)
args = parser.parse_args()
for i in range(args.length):
startdate = args.startDate + datetime.timedelta(days=i)
data = get_external_pr_data(
startdate,
startdate + datetime.timedelta(days=args.period_length),
period_length=args.period_length,
)
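# Requires AWS credentials with write access to the torchci-contribution-data
# bucket; boto3 resolves them from the usual environment/instance sources.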
upload_to_s3(
bucket_name="torchci-contribution-data",
key=f"external_contribution_counts/{str(startdate)}",
docs=data,
)
# uncomment when running large queries locally to avoid github's rate limiting
#
# import time
# time.sleep(20)

View file

@@ -116,9 +116,8 @@ def upload_to_rockset(collection: str, docs: List[Any]) -> None:
def upload_to_s3(
workflow_run_id: int,
workflow_run_attempt: int,
collection: str,
bucket_name: str,
key: str,
docs: List[Dict[str, Any]],
) -> None:
print(f"Writing {len(docs)} documents to S3")
@@ -127,10 +126,7 @@ def upload_to_s3(
json.dump(doc, body)
body.write("\n")
S3_RESOURCE.Object(
"ossci-raw-job-status",
f"{collection}/{workflow_run_id}/{workflow_run_attempt}",
).put(
S3_RESOURCE.Object(f"{bucket_name}", f"{key}",).put(
Body=gzip.compress(body.getvalue().encode()),
ContentEncoding="gzip",
ContentType="application/json",
@@ -138,6 +134,17 @@ def upload_to_s3(
print("Done!")
def upload_workflow_stats_to_s3(
workflow_run_id: int,
workflow_run_attempt: int,
collection: str,
docs: List[Dict[str, Any]],
) -> None:
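# Thin wrapper preserving the previous upload_to_s3 behavior for workflow stats:
# documents still go to the fixed ossci-raw-job-status bucket, keyed by
# collection/workflow run id/attempt.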
bucket_name = "ossci-raw-job-status"
key = f"{collection}/{workflow_run_id}/{workflow_run_attempt}"
upload_to_s3(bucket_name, key, docs)
def upload_file_to_s3(
file_name: str,
bucket: str,

View file

@@ -11,7 +11,7 @@ from tools.stats.upload_stats_lib import (
download_s3_artifacts,
is_rerun_disabled_tests,
unzip,
upload_to_s3,
upload_workflow_stats_to_s3,
)
@@ -340,14 +340,14 @@ if __name__ == "__main__":
test_case_summary, pytest_parallel_times
)
upload_to_s3(
upload_workflow_stats_to_s3(
args.workflow_run_id,
args.workflow_run_attempt,
"test_run_summary",
test_case_summary,
)
upload_to_s3(
upload_workflow_stats_to_s3(
args.workflow_run_id,
args.workflow_run_attempt,
"invoking_file_times",
@@ -356,6 +356,6 @@ if __name__ == "__main__":
if args.head_branch == "master":
# For master jobs, upload everything.
upload_to_s3(
upload_workflow_stats_to_s3(
args.workflow_run_id, args.workflow_run_attempt, "test_run", test_cases
)