from yt_dlp import YoutubeDL from importlib.metadata import version print("yt-dlp version", version("yt_dlp")) from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from urllib.parse import unquote, urlparse, parse_qs from threading import Event, Thread from datetime import datetime, timedelta from time import sleep from os import environ, makedirs, stat import logging import re from ffmpeg import FFmpeg import textwrap from pathlib import Path from hashlib import sha256 from shutil import copyfileobj from math import ceil if 'DEBUG' in environ: logging.basicConfig(level=logging.DEBUG) class Ratelimit(Exception): pass class CachedException(Exception): pass class Handler(BaseHTTPRequestHandler): def address_string(self): return getattr(self, 'headers', {}).get('X-Forwarded-For', '').split(',')[0] or self.client_address[0] def send_error(self, code, message=""): body = bytes(message, "utf-8") self.send_response(code) self.send_header("Content-Type", "text/plain") self.send_header("Content-Length", str(len(body))) self.end_headers() self.wfile.write(body) def send_error_video(self, text: str): makedirs("errors", exist_ok=True) hash = sha256(bytes(text, "utf8")).hexdigest() file = Path(f"errors/{hash}.mp4") if not file.exists(): text = re.sub("(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]", '', text) text = text.replace("\\", "\\\\").replace('"', '""').replace("'", "''").replace("%", "\\%").replace(":", "\\:") text = textwrap.fill(text, 90) peg = FFmpeg().option("y").input("bg.png", { "framerate": "0.1" }).output(str(file), { "f": "mp4", "t": "10", "c:v": "libx264", "pix_fmt": "yuv420p", "vf": "drawtext=font=monospace:fontsize=24:x=10:y=10:text='"+text+"':" }) @peg.on("start") def on_start(arguments): logging.debug("cmd:" + ' '.join(arguments)) @peg.on("stderr") def on_stderr(line): logging.debug("stderr:" + line) peg.execute() with file.open('rb') as f: fs = stat(f.fileno()) self.send_response(200) self.send_header("Content-Type", "video/mp4") self.send_header("Content-Length", str(fs.st_size)) self.end_headers() copyfileobj(f, self.wfile) def do_GET(self): try: if self.path in ["/", "/favicon.ico"] or self.path.startswith("/."): self.send_error(404) return path = unquote(self.path) id_match = re.match("\/(?:id\/|(?:https?:\/\/)?(?:(?:www\.|music\.|m\.)?youtube\.com\/(?:watch\?v=|shorts\/|live\/)|youtu\.be\/))([A-Za-z0-9_-]{11})", path) if id_match: video_id = id_match[1] else: search_match = re.match("^\/(.+?)(?:\/(\d*))?$", path) if not search_match: self.send_error(404) return search_query = search_match[1] search_index = int(search_match[2]) if search_match[2] and search_match[2].isdigit() else 1 search_index = max(min(search_index, 100), 1) video_id = ytdl_search_to_id(self, search_query, search_index) self.send_response(302) self.send_header("Location", f"https://www.youtube.com/watch?v={video_id}") self.end_headers() #half this code now defunct return video_url = ytdl_resolve_mp4_url(self, video_id) if 'PROXY' in environ: video_url = environ['PROXY'] + video_url.replace("https://",'') self.send_response(302) self.send_header("Location", video_url) self.end_headers() except Ratelimit: self.send_error(429) except CachedException as e: self.send_error_video(str(e)) except Exception as e: logging.exception(e) self.send_error_video(str(e)) ips_running_ytdl = [] def invoke_youtubedl(self: Handler, input: str) -> dict: ip = self.address_string() if ip in ips_running_ytdl: raise Ratelimit() ips_running_ytdl.append(ip) try: with YoutubeDL({'extractor_args': {'youtube': {'skip': ['dash', 'hls']}}}) as ydl: return ydl.extract_info(input, download=False, process=False) finally: ips_running_ytdl.remove(ip) search_cache = {} def ytdl_search_to_id(self: Handler, query: str, index: int) -> str: ctx = search_cache.get(query) if ctx: ctx['event'].wait(60) if 'error' in ctx: raise CachedException(ctx['error']) results = ctx.get('results') else: results = None if results == None or results['count'] < index or datetime.now() >= ctx['expires_at']: search_cache[query] = ctx = { 'event': Event(), 'expires_at': datetime.now() + timedelta(hours=5) } try: count = ceil(index/10)*10 info = invoke_youtubedl(self, f"ytsearch{count}:{query}") entries = list(info['entries']) if not entries: raise Exception("ERROR: No results!") ids = [video['id'] for video in entries] ctx['results'] = results = {'ids': ids, 'count': count} except Exception as e: ctx['error'] = str(e) raise finally: ctx['event'].set() return results['ids'][min(index, len(results['ids'])) - 1] resolve_cache = {} def ytdl_resolve_mp4_url(self: Handler, input: str) -> str: ctx = resolve_cache.get(input) if ctx and datetime.now() <= ctx['expires_at']: ctx['event'].wait(60) if 'error' in ctx: raise CachedException(ctx['error']) return ctx['url'] resolve_cache[input] = ctx = { 'event': Event(), 'expires_at': datetime.now() + timedelta(hours=5) } try: info = invoke_youtubedl(self, input) selection = info if "entries" in info: if not info["entries"]: raise Exception("ERROR: No video found!") else: selection = info["entries"][0] suitable_formats = list(filter(lambda x: x['ext'] == "mp4" and x['vcodec'] != 'none' and x['acodec'] != 'none', selection["formats"])) if not suitable_formats: raise Exception(f"ERROR: {selection['id']}: No suitable formats of this video available!") best_format = max(suitable_formats, key=lambda x: x['height']) ctx['url'] = url = best_format['url'] try: expire = parse_qs(urlparse(url).query).get('expire', [])[0] if expire: expire = datetime.fromtimestamp(int(expire)) if expire < ctx['expires_at']: ctx['expires_at'] = expire except Exception as e: logging.exception("failed parsing expire", e) except Exception as e: ctx['error'] = str(e) raise finally: ctx['event'].set() return url def cache_prune_loop(): while True: sleep(3600) for key in list(search_cache.keys()): if datetime.now() >= search_cache[key]['expires_at']: del search_cache[key] for key in list(resolve_cache.keys()): if datetime.now() >= resolve_cache[key]['expires_at']: del resolve_cache[key] Thread(target=cache_prune_loop, daemon=True).start() with ThreadingHTTPServer((environ.get('ADDRESS', ''), int(environ.get('PORT', 80))), Handler) as server: server.serve_forever()