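"""Check the reachability of the streaming servers listed in the rtmp-services
data files and remove servers (and whole services) that have been unresponsive
for roughly ten days, preparing an automated pull request with the results."""
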
import asyncio
import json
import os
import time
import sys
import zipfile

import aiohttp
import requests

from io import BytesIO
from typing import List, Dict
from collections import defaultdict

MINIMUM_PURGE_AGE = 9.75 * 24 * 60 * 60  # slightly less than 10 days
TIMEOUT = 10
SKIPPED_SERVICES = {"SHOWROOM", "Dacast"}
SERVICES_FILE = "plugins/rtmp-services/data/services.json"
PACKAGE_FILE = "plugins/rtmp-services/data/package.json"
CACHE_FILE = "other/timestamps.json"
GITHUB_OUTPUT_FILE = os.environ.get("GITHUB_OUTPUT", None)

DO_NOT_PING = {"jp9000"}
PR_MESSAGE = """This is an automatically created pull request to remove unresponsive servers and services.
|
||
|
||
| Service | Action Taken | Author(s) |
|
||
| ------- | ------------ | --------- |
|
||
{table}
|
||
|
||
If you are not responsible for an affected service and want to be excluded from future pings please let us know.
|
||
|
||
Created by workflow run: https://github.com/{repository}/actions/runs/{run_id}"""
|
||
|
||
# GQL is great isn't it
GQL_QUERY = """{
  repositoryOwner(login: "obsproject") {
    repository(name: "obs-studio") {
      object(expression: "master") {
        ... on Commit {
          blame(path: "plugins/rtmp-services/data/services.json") {
            ranges {
              startingLine
              endingLine
              commit {
                author {
                  user {
                    login
                  }
                }
              }
            }
          }
        }
      }
    }
  }
}"""


def get_last_artifact():
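    """Download the "timestamps" artifact from the most recent successful
    scheduled run of this workflow and return its parsed timestamps.json.

    Relies on GITHUB_TOKEN, WORKFLOW_RUN_ID, and REPOSITORY being set in the
    environment; returns None if the artifact contains no timestamps.json.
    """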
    s = requests.session()
    s.headers["Authorization"] = f'Bearer {os.environ["GITHUB_TOKEN"]}'

    run_id = os.environ["WORKFLOW_RUN_ID"]
    repo = os.environ["REPOSITORY"]

    # fetch run first, get workflow id from there to get workflow runs
    r = s.get(f"https://api.github.com/repos/{repo}/actions/runs/{run_id}")
    r.raise_for_status()
    workflow_id = r.json()["workflow_id"]

    r = s.get(
        f"https://api.github.com/repos/{repo}/actions/workflows/{workflow_id}/runs",
        params=dict(
            per_page=1,
            status="completed",
            branch="master",
            conclusion="success",
            event="schedule",
        ),
    )
    r.raise_for_status()
    runs = r.json()
    if not runs["workflow_runs"]:
        raise ValueError("No completed workflow runs found")

    r = s.get(runs["workflow_runs"][0]["artifacts_url"])
    r.raise_for_status()

    for artifact in r.json()["artifacts"]:
        if artifact["name"] == "timestamps":
            artifact_url = artifact["archive_download_url"]
            break
    else:
        raise ValueError("No previous artifact found.")

    r = s.get(artifact_url)
    r.raise_for_status()
    zip_data = BytesIO()
    zip_data.write(r.content)

    with zipfile.ZipFile(zip_data) as zip_ref:
        for info in zip_ref.infolist():
            if info.filename == "timestamps.json":
                return json.loads(zip_ref.read(info.filename))


def find_people_to_blame(raw_services: str, servers: list[tuple[str, str]]) -> dict:
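    """Map each affected service to the GitHub logins of the people who last
    touched its removed server lines, using the GraphQL blame data for
    services.json on master. Authors in DO_NOT_PING are skipped.
    """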
    if not servers:
        return dict()

    # Fetch Blame data from github
    s = requests.session()
    s.headers["Authorization"] = f'Bearer {os.environ["GITHUB_TOKEN"]}'

    r = s.post(
        "https://api.github.com/graphql", json=dict(query=GQL_QUERY, variables=dict())
    )
    r.raise_for_status()
    j = r.json()

    # The file is only ~2600 lines so this isn't too crazy and makes the lookup very easy
    line_author = dict()
    for blame in j["data"]["repositoryOwner"]["repository"]["object"]["blame"][
        "ranges"
    ]:
        for i in range(blame["startingLine"] - 1, blame["endingLine"]):
            if user := blame["commit"]["author"]["user"]:
                line_author[i] = user["login"]

    service_authors = defaultdict(set)
    for i, line in enumerate(raw_services.splitlines()):
        if '"url":' not in line:
            continue
        for server, service in servers:
            if server in line and (author := line_author.get(i)):
                if author not in DO_NOT_PING:
                    service_authors[service].add(author)

    return service_authors


def set_output(name, value):
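    """Append a name=value pair to the GitHub Actions output file, if one is set."""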
    if not GITHUB_OUTPUT_FILE:
        return

    try:
        with open(GITHUB_OUTPUT_FILE, "a", encoding="utf-8", newline="\n") as f:
            f.write(f"{name}={value}\n")
    except Exception as e:
        print(f"Writing to GitHub output file failed: {e!r}")


async def check_servers_task(
    session: aiohttp.ClientSession, host: str, protocol: str, servers: List[str]
) -> List[Dict]:
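    """Ask a single checker host to probe the given server URLs.

    Each returned entry is expected to carry a boolean "status" and a
    "comment" field (this shape is inferred from how the results are
    consumed in process_services), roughly:

        [{"status": false, "comment": "connection timed out"}, ...]
    """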
    query = [dict(url=h, protocol=protocol) for h in servers]

    async with session.get(
        f"http://{host}:8999/test_remote_servers", json=query
    ) as resp:
        return await resp.json()


async def process_services(session: aiohttp.ClientSession, check_servers: List[str]):
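    """Probe every server of every service via the checker hosts, drop servers
    that have been failing for longer than MINIMUM_PURGE_AGE (and services left
    with no servers), update the data/cache files, and set workflow outputs.

    Returns 1 on fatal errors, None otherwise.
    """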
    try:
        with open(SERVICES_FILE, encoding="utf-8") as services_file:
            raw_services = services_file.read()
            services = json.loads(raw_services)
        with open(PACKAGE_FILE, encoding="utf-8") as package_file:
            package = json.load(package_file)
    except OSError as e:
        print(f"❌ Could not open services/package file: {e}")
        return 1

    # attempt to load last check result cache
    try:
        with open(CACHE_FILE, encoding="utf-8") as check_file:
            fail_timestamps = json.load(check_file)
    except OSError as e:
        # cache might be evicted or not exist yet, so this is non-fatal
        print(
            f"⚠️ Could not read cache file, trying to get last artifact (Exception: {e})"
        )

        try:
            fail_timestamps = get_last_artifact()
        except Exception as e:
            print(f"⚠️ Could not fetch cache file, starting fresh. (Exception: {e})")
            fail_timestamps = dict()
        else:
            print("Fetched cache file from last run artifact.")
    else:
        print("Successfully loaded cache file:", CACHE_FILE)

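    # fail_timestamps maps a server URL to the unix timestamp of its first
    # observed failure; entries older than MINIMUM_PURGE_AGE trigger a purge
    # below, and entries are dropped again once a server responds.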
    start_time = int(time.time())
    affected_services = dict()
    removed_servers = list()

    # create temporary new list
    new_services = services.copy()
    new_services["services"] = []

    for service in services["services"]:
        # skip services that do custom stuff that we can't easily check
        if service["name"] in SKIPPED_SERVICES:
            new_services["services"].append(service)
            continue

        service_type = service.get("recommended", {}).get("output", "rtmp_output")
        if service_type not in {"rtmp_output", "ffmpeg_hls_muxer"}:
            print("Unknown service type:", service_type)
            new_services["services"].append(service)
            continue

        protocol = "rtmp" if service_type == "rtmp_output" else "hls"

        # create a copy to mess with
        new_service = service.copy()
        new_service["servers"] = []

        # run checks for all the servers, and store results in timestamp cache
        try:
            servers = [s["url"] for s in service["servers"]]
            tasks = []
            for host in check_servers:
                tasks.append(
                    asyncio.create_task(
                        check_servers_task(session, host, protocol, servers)
                    )
                )
            results = await asyncio.gather(*tasks)
        except Exception as e:
            print(
                f"❌ Querying server status for \"{service['name']}\" failed with: {e}"
            )
            return 1

        # go over results
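        # zip(*results) transposes the per-checker lists into per-server tuples,
        # so each server is paired with one result from every checker host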
        for server, result in zip(service["servers"], zip(*results)):
            failure_count = sum(not res["status"] for res in result)
            probe_count = len(result)
            # only treat server as failed if all check servers reported a failure
            is_ok = failure_count < probe_count

            if not is_ok:
                failures = {res["comment"] for res in result if not res["status"]}
                print(
                    f"⚠️ Connecting to server failed: {server['url']} (Reason(s): {failures})"
                )

                if ts := fail_timestamps.get(server["url"], None):
                    if (delta := start_time - ts) >= MINIMUM_PURGE_AGE:
                        print(
                            f'🗑️ Purging server "{server["url"]}", it has been '
                            f"unresponsive for {round(delta/60/60/24)} days."
                        )
                        removed_servers.append((server["url"], service["name"]))
                        # continuing here means not adding it to the new list, thus dropping it
                        continue
                else:
                    fail_timestamps[server["url"]] = start_time
            elif is_ok and server["url"] in fail_timestamps:
                # remove timestamp of failed check if server is back
                delta = start_time - fail_timestamps[server["url"]]
                print(
                    f'💡 Server "{server["url"]}" is back after {round(delta/60/60/24)} days!'
                )
                del fail_timestamps[server["url"]]

            new_service["servers"].append(server)

        if (diff := len(service["servers"]) - len(new_service["servers"])) > 0:
            print(f'ℹ️ Removed {diff} server(s) from {service["name"]}')
            affected_services[service["name"]] = f"{diff} servers removed"

        # remove services with no valid servers
        if not new_service["servers"]:
            print(f'💀 Service "{service["name"]}" has no valid servers left, removing!')
            affected_services[service["name"]] = "Service removed"
        else:
            new_services["services"].append(new_service)

        # wait a bit between services
        await asyncio.sleep(2.0)

    # write cache file
    try:
        os.makedirs("other", exist_ok=True)
        with open(CACHE_FILE, "w", encoding="utf-8") as cache_file:
            json.dump(fail_timestamps, cache_file)
    except OSError as e:
        print(f"❌ Could not write cache file: {e}")
        return 1
    else:
        print("Successfully wrote cache file:", CACHE_FILE)

    if removed_servers:
        # increment package version and save that as well
        package["version"] += 1
        package["files"][0]["version"] += 1

        try:
            with open(SERVICES_FILE, "w", encoding="utf-8") as services_file:
                json.dump(new_services, services_file, indent=4, ensure_ascii=False)
                services_file.write("\n")

            with open(PACKAGE_FILE, "w", encoding="utf-8") as package_file:
                json.dump(package, package_file, indent=4)
                package_file.write("\n")
        except OSError as e:
            print(f"❌ Could not write services/package file: {e}")
            return 1
        else:
            print(
                f"Successfully wrote services/package files:\n- {SERVICES_FILE}\n- {PACKAGE_FILE}"
            )

        # try to find authors to ping; this is optional and allowed to fail
        try:
            service_authors = find_people_to_blame(raw_services, removed_servers)
        except Exception as e:
            print(f"⚠️ Could not fetch blame for some reason: {e}")
            service_authors = dict()

        # set GitHub outputs
        set_output("make_pr", "true")
        msg = PR_MESSAGE.format(
            repository=os.environ["REPOSITORY"],
            run_id=os.environ["WORKFLOW_RUN_ID"],
            table="\n".join(
                "| {name} | {action} | {authors} |".format(
                    name=name.replace("|", "\\|"),
                    action=action,
                    authors=", ".join(
                        f"@{author}" for author in sorted(service_authors.get(name, []))
                    ),
                )
                for name, action in sorted(affected_services.items())
            ),
        )
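        # json.dumps() collapses the multi-line message into a single escaped
        # line so it fits the name=value format written by set_output()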
set_output("pr_message", json.dumps(msg))
|
||
else:
|
||
set_output("make_pr", "false")
|
||
|
||
|
||
async def main():
|
||
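    """Read the checker configuration from the environment and run the service
    checks. The return value is used as the process exit code."""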
    # check for environment variables
    try:
        api_key = os.environ["API_KEY"]
        servers = os.environ["API_SERVERS"].split(",")
        if not servers:
            raise ValueError("No checker servers!")
        # Mask everything except the region code
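        # e.g. for a hypothetical host "probe.us-west.example.com" this masks
        # "probe." and ".example.com" in the logs, leaving only "us-west"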
        for server in servers:
            prefix = server[: server.index(".") + 1]
            print(f"::add-mask::{prefix}")
            suffix = server[server.index(".", len(prefix)) :]
            print(f"::add-mask::{suffix}")
    except Exception as e:
        print(f"❌ Failed getting required environment variables: {e}")
        return 1

    # create aiohttp session
    async with aiohttp.ClientSession() as session:
        session.headers["Authorization"] = api_key
        session.headers["User-Agent"] = "OBS Repo Service Checker/1.0"
        return await process_services(session, servers)


if __name__ == "__main__":
    sys.exit(asyncio.run(main()))