import hashlib
import hmac
import json
import os
from contextlib import asynccontextmanager
from datetime import datetime
from random import choice
from secrets import token_hex
from typing import Dict, List

import cv2
import httpx
import uvicorn
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from cryptography.fernet import Fernet
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, RedirectResponse
from prisma import Prisma
from slack_bolt.adapter.fastapi.async_handler import AsyncSlackRequestHandler
from slack_bolt.async_app import AsyncAck, AsyncApp
from yarl import URL

load_dotenv(dotenv_path="./.env")

# The stream currently "in focus" ({} when nothing is live) and every stream
# the media server last reported as ready. Both are mutated by the scheduled
# jobs below and read by the HTTP handlers.
active_stream: Dict[str, str | bool] = {}
active_streams: List[Dict[str, str | bool]] = []

scheduler = AsyncIOScheduler()

FERNET = Fernet(os.environ["FERNET_KEY"])

# Slack channel that receives the "in focus" / "no longer in focus" messages.
_FOCUS_CHANNEL_ID = "C07ERCGG989"
# Control API endpoint of the local media server that lists all paths.
_PATHS_LIST_URL = "http://localhost:9997/v3/paths/list"


def get_recording_duration(timestamp, stream_key):
    """Return the length of a recording in whole minutes.

    Args:
        timestamp: recording start time formatted as
            ``%Y-%m-%dT%H:%M:%S.%fZ``.
        stream_key: key of the stream the recording belongs to; used as the
            directory name under ``/home/onboard/recordings``.

    Returns:
        The duration in minutes, truncated to an int. ``0`` when the file
        reports no frame rate (OpenCV yields 0 for a file it cannot open),
        instead of raising ``ZeroDivisionError``.

    Raises:
        ValueError: if *timestamp* does not match the expected format.
    """
    started = datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
    vid = cv2.VideoCapture(
        f"/home/onboard/recordings/{stream_key}/"
        f"{started.strftime('%Y-%m-%d_%H-%M-%S-%f')}.mp4"
    )
    try:
        fps = vid.get(cv2.CAP_PROP_FPS)
        if not fps or fps <= 0:
            return 0
        seconds = vid.get(cv2.CAP_PROP_FRAME_COUNT) / fps
        return int(seconds / 60)  # seconds to minutes
    finally:
        # Always free the underlying decoder/file handle.
        vid.release()


def verify_gh_signature(payload_body, secret_token, signature_header):
    """Verify that the payload was sent from GitHub by validating SHA256.

    Raise and return 403 if not authorized.

    Args:
        payload_body: original request body to verify (request.body())
        secret_token: GitHub app webhook token (WEBHOOK_SECRET)
        signature_header: header received from GitHub (x-hub-signature-256)

    Raises:
        HTTPException: 403 when the header is missing or does not match the
            HMAC-SHA256 of the body.
    """
    if not signature_header:
        raise HTTPException(
            status_code=403, detail="x-hub-signature-256 header is missing!"
        )
    hash_object = hmac.new(
        secret_token.encode("utf-8"), msg=payload_body, digestmod=hashlib.sha256
    )
    expected_signature = "sha256=" + hash_object.hexdigest()
    # compare_digest keeps the comparison constant-time.
    if not hmac.compare_digest(expected_signature, signature_header):
        raise HTTPException(status_code=403, detail="Request signatures didn't match!")


async def get_recording_list(stream_key: str) -> List[str]:
    """Return the start timestamps of every recorded segment of a stream.

    Args:
        stream_key: key (path name) of the stream on the media server.

    Returns:
        The ``start`` value of each segment, in the order the server lists
        them; an empty list when the response carries no ``segments`` key
        (callers compare the result against ``[]``).
    """
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"http://localhost:9997/v3/recordings/get/{stream_key}"
        )
    return [segment["start"] for segment in response.json().get("segments", [])]


async def _fetch_paths() -> list:
    """Return the ``items`` list from the media server's path listing."""
    async with httpx.AsyncClient() as client:
        return (await client.get(_PATHS_LIST_URL)).json()["items"]


async def _find_stream_owner(stream_key: str):
    """Return the user owning the stream with *stream_key*, or None."""
    stream = await db.stream.find_first(where={"key": stream_key})
    if stream is None:
        return None
    return await db.user.find_first(where={"id": stream.user_id})


async def update_active():
    """Rotate the focused stream to a different live stream and notify Slack.

    Ready streams reported by the media server are added to
    ``active_streams``. If nothing is in focus yet, one is picked at random
    without any notification. Otherwise, when more than one stream is known,
    focus moves to a randomly chosen *other* stream and both the previous and
    the new owner are pinged in Slack (a missing database record simply skips
    that ping instead of aborting the switch).

    Returns:
        True when focus was switched between two streams, otherwise None.
    """
    global active_stream
    global active_streams
    for path in await _fetch_paths():
        entry = {"name": path["name"], "ready": path["ready"]}
        if entry["ready"] and entry not in active_streams:
            active_streams.append(entry)
    if not active_streams:
        print("No active streams")
        return
    if active_stream == {}:
        print("No current active stream, picking new one...")
        active_stream = choice(active_streams)
        return
    if len(active_streams) == 1:
        return
    print(
        f"starting to pick new active stream (switching away from {active_stream['name']})"
    )
    # Choose among the other streams directly rather than re-rolling until a
    # different one comes up, which could spin forever.
    candidates = [s for s in active_streams if s["name"] != active_stream["name"]]
    if not candidates:
        return
    new_stream = choice(candidates)
    print(f"found new stream to make active: {new_stream}")
    print(f"trying to find user associated with stream {active_stream['name']}")
    old_active_stream_user = await _find_stream_owner(str(active_stream["name"]))
    if old_active_stream_user is not None:
        await bolt.client.chat_postMessage(
            channel=_FOCUS_CHANNEL_ID,
            text=f"Hey <@{old_active_stream_user.slack_id}>, you're no longer in focus!",
        )
    active_stream = new_stream
    active_stream_user = await _find_stream_owner(str(active_stream["name"]))
    if active_stream_user is not None:
        await bolt.client.chat_postMessage(
            channel=_FOCUS_CHANNEL_ID,
            text=f"Hey <@{active_stream_user.slack_id}>, you're in focus! Make sure to tell us what you're working on!",
        )
    return True


async def check_for_new():
    """Synchronise ``active_streams`` with the media server's ready paths.

    Streams that are no longer ready are dropped and newly ready ones are
    appended. Focus is only re-assigned when nothing is in focus or the
    focused stream itself went away; it is cleared when no stream is live.
    """
    global active_stream
    global active_streams
    live_names = [path["name"] for path in await _fetch_paths() if path["ready"]]
    live_lookup = set(live_names)
    known_names = {entry["name"] for entry in active_streams}

    # Mutate in place so the module-level list object stays the same.
    active_streams[:] = [e for e in active_streams if e["name"] in live_lookup]
    active_streams.extend(
        {"name": name, "ready": True} for name in live_names if name not in known_names
    )

    if not active_streams:
        print("No active streams")
        active_stream = {}
        return
    # Covers both "nothing in focus" ({} has no name) and "focused stream
    # dropped"; an unrelated stream going offline leaves focus untouched.
    if active_stream.get("name") not in live_lookup:
        active_stream = choice(active_streams)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start background jobs and the database for the lifetime of the app.

    On startup: pick an initial focused stream, schedule ``update_active``
    every five minutes and ``check_for_new`` every three seconds, connect to
    the database and register every stored stream key as a path on the media
    server. On shutdown (including shutdown caused by an error) the scheduler
    is stopped and the database connection closed.
    """
    await update_active()
    scheduler.start()
    scheduler.add_job(update_active, IntervalTrigger(seconds=5 * 60))
    scheduler.add_job(check_for_new, IntervalTrigger(seconds=3))
    await db.connect()
    async with httpx.AsyncClient() as client:
        for stream in await db.stream.find_many():
            await client.post(
                "http://127.0.0.1:9997/v3/config/paths/add/" + stream.key,
                json={"name": stream.key},
            )
    try:
        yield
    finally:
        scheduler.shutdown()
        await db.disconnect()


api = FastAPI(lifespan=lifespan)  # type: ignore

api.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

db = Prisma()

bolt = AsyncApp(
    token=os.environ["SLACK_TOKEN"], signing_secret=os.environ["SLACK_SIGNING_SECRET"]
)

bolt_handler = AsyncSlackRequestHandler(bolt)
@api.get("/auth/github/login") async def github_redirect(request: Request): return RedirectResponse( str( URL.build( scheme="https", host="github.com", path="/login/oauth/authorize", query={ "client_id": os.environ["GH_CLIENT_ID"], "redirect_uri": "https://live.onboard.hackclub.com/auth/github/callback", "scopes": "read:user", "state": request.query_params["state"], }, ) ) ) @api.get("/auth/github/callback") async def github_callback(request: Request): code: str = request.query_params["code"] state: str = request.query_params["state"] user_id, pr_id = FERNET.decrypt(bytes.fromhex(state)).decode().split("+") db_user = await db.user.find_first_or_raise(where={"slack_id": user_id}) user_stream_key = ( await db.stream.find_first_or_raise(where={"user_id": db_user.id}) ).key db_pr = await db.pullrequest.find_first_or_raise(where={"github_id": int(pr_id)}) async with httpx.AsyncClient() as client: token = ( await client.post( "https://github.com/login/oauth/access_token", json={ "client_id": os.environ["GH_CLIENT_ID"], "client_secret": os.environ["GH_CLIENT_SECRET"], "code": code, "redirect_uri": "https://live.onboard.hackclub.com/auth/github/callback", }, headers={"Accept": "application/json"}, ) ).json()["access_token"] gh_user: int = ( await client.get( "https://api.github.com/user", headers={ "Accept": "application/vnd.github.v3+json", "Authorization": f"Bearer {token}", }, ) ).json()["id"] if gh_user == db_pr.gh_user_id: await db.pullrequest.update( {"user": {"connect": {"id": db_user.id}}, "gh_user_id": gh_user}, {"id": db_pr.id}, ) stream_recs = await get_recording_list(user_stream_key) if stream_recs == []: return HTMLResponse( "
This info might be of use to them: {FERNET.encrypt(bytes(str(db_pr.gh_user_id) + " " + str(gh_user) + " " + user_id + " " + pr_id + " " + state, encoding='utf-8'))}
", status_code=403, ) @api.post("/api/v1/github/pr_event") async def pr_event(request: Request): verify_gh_signature( await request.body(), os.environ["GH_HOOK_SECRET"], request.headers.get("x-hub-signature-256"), ) body = json.loads(await request.body()) if body["action"] == "labeled": if body["label"]["id"] == 7336079497: print("Added label has same id as OBL label!") await db.pullrequest.create( { "github_id": body["pull_request"]["number"], "gh_user_id": body["pull_request"]["user"]["id"], } ) return @api.get("/api/v1/stream_key/{stream_key}") async def get_stream_by_key(stream_key: str): stream = await db.stream.find_first(where={"key": stream_key}) return ( stream if stream else Response(status_code=404, content="404: Stream not found") ) @api.get("/api/v1/active_stream") async def get_active_stream(): return active_stream["name"] if "name" in active_stream else "" @bolt.event("app_home_opened") async def handle_app_home_opened_events(body, logger, event, client): await client.views_publish( user_id=event["user"], # the view object that appears in the app home view={ "type": "home", "callback_id": "home_view", # body of the view "blocks": [ { "type": "section", "text": { "type": "mrkdwn", "text": "Welcome to OnBoard Live! Try sending `/onboard-live-apply` in the #onboard-live channel to get started!", }, }, ], }, ) @bolt.action("deny") async def deny(ack, body): await ack() message = body["message"] applicant_slack_id = message["blocks"][len(message) - 3]["text"]["text"].split( ": " )[ 1 ] # I hate it. You hate it. We all hate it. Carry on. applicant_name = message["blocks"][len(message) - 7]["text"]["text"].split( "Name: " )[ 1 ] # oops i did it again await bolt.client.chat_delete( channel=body["container"]["channel_id"], ts=message["ts"] ) await bolt.client.chat_postMessage( channel=body["container"]["channel_id"], text=f"{applicant_name}'s application has been denied! Remember to reach out to them if this is a fixable issue. 
Their username is <@{applicant_slack_id}>.", ) @bolt.action("approve") async def approve(ack, body): await ack() message = body["message"] applicant_slack_id = message["blocks"][len(message) - 3]["text"]["text"].split( ": " )[ 1 ] # I hate it. You hate it. We all hate it. Carry on. applicant_name = message["blocks"][len(message) - 7]["text"]["text"].split( "Name: " )[ 1 ] # oops i did it again await bolt.client.chat_delete( channel=body["container"]["channel_id"], ts=message["ts"] ) await bolt.client.chat_postMessage( channel=body["container"]["channel_id"], text=f"{applicant_name}'s application has been approved! Their username is <@{applicant_slack_id}>.", ) if applicant_slack_id in [d.slack_id for d in await db.user.find_many()]: # type: ignore return new_user = await db.user.create( {"slack_id": applicant_slack_id, "name": applicant_name} ) new_stream = await db.stream.create( {"user": {"connect": {"id": new_user.id}}, "key": token_hex(16)} ) sumbitter_convo = await bolt.client.conversations_open( users=applicant_slack_id, return_im=True ) async with httpx.AsyncClient() as client: await client.post( "http://127.0.0.1:9997/v3/config/paths/add/" + new_stream.key, json={"name": new_stream.key}, ) await bolt.client.chat_postMessage( channel=sumbitter_convo["channel"]["id"], text=f"Welcome to OnBoard Live! Your stream key is {new_stream.key}. To use your stream key the easy way, go to