"""HTTP redirect service backed by yt-dlp.

A GET path is interpreted as a YouTube video id / URL or, failing that, as a
search term.  The query is resolved with yt-dlp and the client is redirected
to a direct mp4 URL.  Results (including failures) are cached in memory.
"""
from datetime import datetime, timedelta
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from importlib.metadata import version
import logging
from os import environ
import re
from threading import Event, Thread
from time import sleep
from urllib.parse import parse_qs, unquote, urlparse

from yt_dlp import YoutubeDL

from textvid import generate_video_from_text

print("yt-dlp version", version("yt_dlp"))

# Matches "/id/<id>" or "/<youtube url>" and captures the 11-character video id.
_VIDEO_PATH_RE = re.compile(
    r"\/(?:id\/|(?:https?:\/\/)?(?:(?:www\.|music\.|m\.)?youtube\.com\/(?:watch\?v=|shorts\/)|youtu\.be\/))([A-Za-z0-9_-]{11})"
)
# Matches ANSI CSI escape sequences so they can be stripped from error text.
_ANSI_ESCAPE_RE = re.compile(r"(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]")

# query -> context dict with keys 'event', 'expire' and, once resolved,
# 'id' / 'url' or 'exception' / 'error_vid'.
ctx_cache = {}
# Client addresses that currently have a yt-dlp extraction in flight.
ips_running_ytdl = []


def cache_prune_loop():
    """Once an hour, drop every cache entry whose 'expire' time has passed."""
    while True:
        sleep(3600)
        now = datetime.now()
        # Iterate over a snapshot: deleting from a dict while iterating it
        # raises RuntimeError and would kill this thread.
        for key, ctx in list(ctx_cache.items()):
            # Only delete if the entry was not replaced since the snapshot.
            if now >= ctx['expire'] and ctx_cache.get(key) is ctx:
                del ctx_cache[key]


Thread(target=cache_prune_loop, daemon=True).start()


class Handler(BaseHTTPRequestHandler):
    """Request handler that redirects GET requests to resolved video URLs."""

    def address_string(self):
        """Return the first X-Forwarded-For entry, else the socket peer address."""
        # 'headers' may not be set yet when an error is logged early in parsing.
        forwarded = getattr(self, 'headers', {}).get('X-Forwarded-For', '')
        return forwarded.split(',')[0] or self.client_address[0]

    def is_pc_vrchat(self):
        """Return True when User-Agent and Accept-Encoding match the expected
        PC VRChat fingerprint (Chrome-on-Windows UA plus "identity" encoding)."""
        ua = self.headers.get('User-Agent', '')
        ae = self.headers.get('Accept-Encoding', '')
        return (
            ua.startswith("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/")
            and ua.endswith(" Safari/537.36")
            and ae == "identity"
        )

    def send_error(self, code, message="", explain=None):
        """Send a plain-text error response whose body is *message*.

        *explain* is accepted (and ignored) because BaseHTTPRequestHandler
        itself calls ``send_error(code, message, explain)`` positionally for
        some malformed requests; without it those calls raise TypeError.
        """
        body = bytes(message or "", "utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _redirect(self, location):
        """Send a 302 response pointing at *location*."""
        self.send_response(302)
        self.send_header("Location", location)
        self.end_headers()

    @staticmethod
    def _url_expiry(url):
        """Return the datetime in *url*'s ``expire`` query parameter, or None
        when the parameter is missing or not a usable timestamp."""
        raw = parse_qs(urlparse(url).query).get('expire', [None])[0]
        if not raw:
            return None
        try:
            return datetime.fromtimestamp(int(raw))
        except (ValueError, OverflowError, OSError):
            return None

    def _resolve(self, query, client_ip):
        """Run yt-dlp for *query*, store the outcome in a fresh cache entry
        and return that entry.

        On failure the entry carries 'exception' and, if it could be
        rendered, 'error_vid'.  The entry's event is always set at the end so
        that requests waiting on the same query wake up.
        """
        ips_running_ytdl.append(client_ip)
        ctx_cache[query] = ctx = {
            'event': Event(),
            'expire': datetime.now() + timedelta(hours=5),
        }
        try:
            with YoutubeDL() as ydl:
                info = ydl.extract_info(query, download=False)

            selection = info
            if "entries" in info:
                if not info["entries"]:
                    raise Exception("ERROR: No videos found!")
                selection = info["entries"][0]
            ctx['id'] = selection['id']

            # Only mp4 formats that carry both a video and an audio stream.
            suitable_formats = [
                fmt for fmt in selection["formats"]
                if fmt.get('ext') == "mp4"
                and fmt.get('vcodec', 'none') != 'none'
                and fmt.get('acodec', 'none') != 'none'
            ]
            if not suitable_formats:
                raise Exception(f"ERROR: {selection['id']}: No suitable formats of this video available!")

            # 'height' may be missing or None; treat that as the lowest.
            best_format = max(suitable_formats, key=lambda fmt: fmt.get('height') or 0)
            ctx['url'] = best_format['url']

            # Never cache the URL longer than the URL itself claims to live.
            expire = self._url_expiry(best_format['url'])
            if expire and expire < ctx['expire']:
                ctx['expire'] = expire
        except Exception as e:
            logging.exception(e)
            ctx['exception'] = e
            try:
                ctx['error_vid'] = generate_video_from_text(_ANSI_ESCAPE_RE.sub('', str(e)))
            except Exception:
                # Without 'error_vid' the response falls back to a plain 500.
                logging.exception("Failed to render error video")
        finally:
            ips_running_ytdl.remove(client_ip)
            ctx['event'].set()
        return ctx

    def do_GET(self):
        """Resolve the request path and answer with a redirect, an error
        video, or an error status."""
        # block other bot junk in reverse proxy
        if self.path in ["/", "/favicon.ico"] or self.path.startswith("/."):
            self.send_error(404)
            return

        path = unquote(self.path)
        match = _VIDEO_PATH_RE.match(path)
        if match:
            if self.is_pc_vrchat():
                # The id is already known; no extraction needed.
                self._redirect("https://www.youtube.com/watch?v=" + match[1])
                return
            query = match[1]
        else:
            query = "ytsearch:" + path[1:]

        ctx = ctx_cache.get(query)
        if not ctx or 'expire' in ctx and datetime.now() >= ctx['expire']:
            client_ip = self.address_string()
            # One extraction at a time per client address.
            if client_ip in ips_running_ytdl:
                self.send_error(429)
                return
            ctx = self._resolve(query, client_ip)
        elif 'url' not in ctx:
            # Another request is resolving this query (or it already failed,
            # in which case the event is set and this returns immediately).
            ctx['event'].wait(60)

        if self.is_pc_vrchat() and ctx.get('id'):
            self._redirect("https://www.youtube.com/watch?v=" + ctx['id'])
            return

        if ctx.get('url'):
            url = ctx['url']
            if 'PROXY' in environ:
                url = environ['PROXY'] + url.replace("https://", '')
            self._redirect(url)
            return

        if 'exception' not in ctx:
            self.send_error(404)
        elif 'error_vid' in ctx:
            self.send_response(200)
            self.send_header("Content-Type", "video/mp4")
            self.send_header("Content-Length", str(len(ctx['error_vid'])))
            self.end_headers()
            self.wfile.write(ctx['error_vid'])
        else:
            self.send_error(500, message=str(ctx['exception']))


with ThreadingHTTPServer((environ.get('ADDRESS', ''), int(environ.get('PORT', 80))), Handler) as server:
    server.serve_forever()