# u2b.cx/server.py
#
# 218 lines
# 6.5 KiB
# Python

from yt_dlp import YoutubeDL
from importlib.metadata import version
print("yt-dlp version", version("yt_dlp"))
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import unquote, urlparse, parse_qs
from threading import Event, Thread
from datetime import datetime, timedelta
from time import sleep
from os import environ, makedirs, stat
import logging
import re
from ffmpeg import FFmpeg
import textwrap
from pathlib import Path
from hashlib import sha256
from shutil import copyfileobj
from math import ceil
# Opt-in verbose logging: the mere presence of a DEBUG environment variable
# (whatever its value) switches the root logger to DEBUG level.
if environ.get('DEBUG') is not None:
    logging.basicConfig(level=logging.DEBUG)
class Ratelimit(Exception):
    """Raised when the requesting IP already has a yt-dlp call in flight.

    The HTTP handler answers this with a 429 response.
    """
class CachedException(Exception):
    """Replays the message of a failure that was stored in a cache entry.

    The HTTP handler renders it as an error video without logging a
    traceback.
    """
class Handler(BaseHTTPRequestHandler):
    """Request handler that turns ``/<id-or-url-or-search>`` into a redirect.

    A path is either a video reference (``/id/<id>`` or a YouTube URL pasted
    after the slash) or a free-text search, optionally followed by
    ``/<n>`` to pick the n-th result. Failures are reported either as plain
    text (404/429) or as a short generated MP4 carrying the error message.
    """

    # "/id/<id>" or "/<youtube url>" (scheme and www./music./m. optional);
    # group 1 is the 11-character video id.
    _VIDEO_ID_RE = re.compile(
        r"/(?:id/|(?:https?://)?(?:(?:www\.|music\.|m\.)?youtube\.com/"
        r"(?:watch\?v=|shorts/|live/)|youtu\.be/))([A-Za-z0-9_-]{11})"
    )
    # "/<query>" with an optional trailing "/<digits>" result index.
    _SEARCH_RE = re.compile(r"^/(.+?)(?:/(\d*))?$")
    # ANSI CSI escape sequences (e.g. terminal colour codes in error text).
    _ANSI_ESCAPE_RE = re.compile(r"(\x9B|\x1B\[)[0-?]*[ -/]*[@-~]")

    def address_string(self):
        """Return the client IP, preferring the first X-Forwarded-For entry.

        ``headers`` may not exist yet when the base class logs an early
        parse error, hence the ``getattr`` fallback to an empty mapping.
        """
        forwarded = getattr(self, 'headers', {}).get('X-Forwarded-For', '')
        return forwarded.split(',')[0] or self.client_address[0]

    def send_error(self, code, message=None, explain=None):
        """Send ``message`` as a plain-text body with status ``code``.

        The signature mirrors ``BaseHTTPRequestHandler.send_error`` so the
        base class can call it with its usual ``(code, message, explain)``
        arguments; ``explain`` is accepted but not included in the body.
        A missing message yields an empty body.
        """
        body = (message or "").encode("utf-8")
        self.send_response(code)
        self.send_header("Content-Type", "text/plain")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def send_error_video(self, text: str):
        """Respond 200 with a 10-second MP4 showing ``text`` over ``bg.png``.

        Videos are rendered with ffmpeg on first use and stored as
        ``errors/<sha256 of text>.mp4``; later requests for the same text
        reuse the file.
        """
        makedirs("errors", exist_ok=True)
        digest = sha256(bytes(text, "utf8")).hexdigest()
        video_path = Path(f"errors/{digest}.mp4")
        if not video_path.exists():
            text = self._ANSI_ESCAPE_RE.sub('', text)
            # Escaping intended for ffmpeg's drawtext/filter syntax, where
            # backslash, quotes, '%' and ':' are special.
            text = (
                text.replace("\\", "\\\\")
                .replace('"', '""')
                .replace("'", "''")
                .replace("%", "\\%")
                .replace(":", "\\:")
            )
            text = textwrap.fill(text, 90)
            peg = FFmpeg().option("y").input("bg.png", {
                "framerate": "0.1"
            }).output(str(video_path), {
                "f": "mp4",
                "t": "10",
                "c:v": "libx264",
                "pix_fmt": "yuv420p",
                "vf": "drawtext=font=monospace:fontsize=24:x=10:y=10:text='" + text + "':"
            })

            @peg.on("start")
            def on_start(arguments):
                logging.debug("cmd:" + ' '.join(arguments))

            @peg.on("stderr")
            def on_stderr(line):
                logging.debug("stderr:" + line)

            peg.execute()
        with video_path.open('rb') as f:
            fs = stat(f.fileno())
            self.send_response(200)
            self.send_header("Content-Type", "video/mp4")
            self.send_header("Content-Length", str(fs.st_size))
            self.end_headers()
            copyfileobj(f, self.wfile)

    def do_GET(self):
        """Resolve the request path to a video id and redirect to YouTube.

        Responds 404 for the root, favicon and dot-paths, 429 when the
        client is rate limited, and an error video for any other failure.
        """
        try:
            if self.path in ("/", "/favicon.ico") or self.path.startswith("/."):
                self.send_error(404)
                return
            path = unquote(self.path)
            id_match = self._VIDEO_ID_RE.match(path)
            if id_match:
                video_id = id_match[1]
            else:
                search_match = self._SEARCH_RE.match(path)
                if not search_match:
                    self.send_error(404)
                    return
                search_query = search_match[1]
                raw_index = search_match[2]
                search_index = int(raw_index) if raw_index and raw_index.isdigit() else 1
                # Result indices are 1-based and capped at 100.
                search_index = max(min(search_index, 100), 1)
                video_id = ytdl_search_to_id(self, search_query, search_index)
            self.send_response(302)
            self.send_header("Location", f"https://www.youtube.com/watch?v={video_id}")
            self.end_headers()
            # Half this code is now defunct: the handler redirects to the
            # YouTube watch page and returns, so the direct-MP4 resolution
            # below is never reached.
            # NOTE(review): assumes this redirect applies to both the id and
            # the search branch - confirm against the original indentation.
            return
            video_url = ytdl_resolve_mp4_url(self, video_id)
            if 'PROXY' in environ:
                video_url = environ['PROXY'] + video_url.replace("https://", '')
            self.send_response(302)
            self.send_header("Location", video_url)
            self.end_headers()
        except Ratelimit:
            self.send_error(429)
        except CachedException as e:
            # Already-known failure replayed from a cache: no traceback.
            self.send_error_video(str(e))
        except Exception as e:
            logging.exception(e)
            self.send_error_video(str(e))
# Client IPs that currently have a yt-dlp extraction running.
ips_running_ytdl = []


def invoke_youtubedl(self: Handler, input: str) -> dict:
    """Run a metadata-only yt-dlp extraction for ``input``.

    Only one extraction per client IP may run at a time; a second request
    from an IP that is already listed raises ``Ratelimit``. The IP is
    removed from the list again whether the extraction succeeds or fails.
    """
    client_ip = self.address_string()
    if client_ip in ips_running_ytdl:
        raise Ratelimit()
    ips_running_ytdl.append(client_ip)

    ydl_options = {'extractor_args': {'youtube': {'skip': ['dash', 'hls']}}}
    try:
        with YoutubeDL(ydl_options) as ydl:
            return ydl.extract_info(input, download=False, process=False)
    finally:
        ips_running_ytdl.remove(client_ip)
# query -> {'event', 'expires_at', and then either 'results' or 'error'}
search_cache = {}


def ytdl_search_to_id(self: Handler, query: str, index: int) -> str:
    """Return the id of the ``index``-th (1-based) search result for ``query``.

    Results are cached per query for five hours. A request that arrives
    while another one is already searching the same query waits up to 60
    seconds for that search instead of starting its own. The search is
    (re)run when there is no usable entry, the entry has expired, or it
    holds fewer results than ``index`` needs.

    Raises:
        CachedException: a previous search for this query failed and the
            failure is still cached.
        Ratelimit: propagated from ``invoke_youtubedl``; never cached.
        Exception: any other search failure (also stored in the cache).
    """
    ctx = search_cache.get(query)
    results = None
    # Expired entries - including cached errors - are ignored so that a
    # stale failure is retried rather than replayed until the next prune.
    if ctx and datetime.now() < ctx['expires_at']:
        ctx['event'].wait(60)
        if 'error' in ctx:
            raise CachedException(ctx['error'])
        results = ctx.get('results')

    if results is None or results['count'] < index:
        search_cache[query] = ctx = {
            'event': Event(),
            'expires_at': datetime.now() + timedelta(hours=5)
        }
        try:
            # Fetch in pages of ten so nearby indices share one search.
            count = ceil(index / 10) * 10
            info = invoke_youtubedl(self, f"ytsearch{count}:{query}")
            entries = list(info['entries'])
            if not entries:
                raise Exception("ERROR: No results!")
            results = {'ids': [video['id'] for video in entries], 'count': count}
            ctx['results'] = results
        except Ratelimit:
            # A rate limit concerns only the requesting client; caching it
            # would fail this query for every other client as well.
            raise
        except Exception as e:
            ctx['error'] = str(e)
            raise
        finally:
            # Wake waiters on success and on failure alike.
            ctx['event'].set()

    # Clamp to the last result when fewer than ``index`` came back.
    return results['ids'][min(index, len(results['ids'])) - 1]
# input -> {'event', 'expires_at', and then either 'url' or 'error'}
resolve_cache = {}


def ytdl_resolve_mp4_url(self: Handler, input: str) -> str:
    """Return a direct URL to the tallest MP4 format that has video and audio.

    Results are cached per input for up to five hours, shortened to the
    URL's own ``expire`` query parameter when that is earlier. A request
    that arrives while another one is resolving the same input waits up to
    60 seconds for it.

    Raises:
        CachedException: a previous resolution of this input failed and
            the failure is still cached.
        Ratelimit: propagated from ``invoke_youtubedl``; never cached.
        Exception: any other failure (also stored in the cache).
    """
    ctx = resolve_cache.get(input)
    if ctx and datetime.now() <= ctx['expires_at']:
        ctx['event'].wait(60)
        if 'error' in ctx:
            raise CachedException(ctx['error'])
        if 'url' in ctx:
            return ctx['url']
        # Neither outcome recorded (wait timed out, or the other request
        # was rate limited): fall through and resolve again.

    resolve_cache[input] = ctx = {
        'event': Event(),
        'expires_at': datetime.now() + timedelta(hours=5)
    }
    try:
        info = invoke_youtubedl(self, input)
        selection = info
        if "entries" in info:
            # ``entries`` may be a list or a lazy iterable; take the first.
            selection = next(iter(info["entries"] or ()), None)
            if selection is None:
                raise Exception("ERROR: No video found!")

        # A missing codec field is treated like an unknown codec and is
        # not excluded; only an explicit 'none' is.
        suitable_formats = [
            fmt for fmt in (selection.get("formats") or [])
            if fmt.get('ext') == "mp4"
            and fmt.get('vcodec') != 'none'
            and fmt.get('acodec') != 'none'
        ]
        if not suitable_formats:
            raise Exception(f"ERROR: {selection.get('id', input)}: No suitable formats of this video available!")
        # Formats without a known height sort last.
        best_format = max(suitable_formats, key=lambda fmt: fmt.get('height') or 0)
        ctx['url'] = url = best_format['url']

        # Best effort: never keep the URL cached past its own expiry.
        try:
            expire_values = parse_qs(urlparse(url).query).get('expire')
            if expire_values:
                expire = datetime.fromtimestamp(int(expire_values[0]))
                if expire < ctx['expires_at']:
                    ctx['expires_at'] = expire
        except Exception:
            logging.exception("failed parsing expire")
    except Ratelimit:
        # A rate limit concerns only the requesting client; caching it
        # would fail this input for every other client as well.
        raise
    except Exception as e:
        ctx['error'] = str(e)
        raise
    finally:
        # Wake waiters on success and on failure alike.
        ctx['event'].set()
    return url
def cache_prune_loop():
    """Once an hour, forever, drop expired entries from both caches."""
    while True:
        sleep(3600)
        for cache in (search_cache, resolve_cache):
            # Walk a snapshot of the keys so deleting is safe mid-loop.
            for key in tuple(cache):
                if cache[key]['expires_at'] <= datetime.now():
                    del cache[key]
# Background sweeper for the caches; daemon so it never blocks shutdown.
Thread(target=cache_prune_loop, daemon=True).start()

# Bind address and port come from the environment (all interfaces, port 80
# by default), then serve requests until the process is stopped.
with ThreadingHTTPServer(
    (environ.get('ADDRESS', ''), int(environ.get('PORT', 80))),
    Handler,
) as server:
    server.serve_forever()