u2b.cx/server.py

135 lines
4.3 KiB
Python

from yt_dlp import YoutubeDL
from importlib.metadata import version
print("yt-dlp version", version("yt_dlp"))
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import unquote, urlparse, parse_qs
from threading import Event, Thread
from datetime import datetime, timedelta
from time import sleep
from os import environ
import logging
import re
from textvid import generate_video_from_text
ctx_cache = {}
ips_running_ytdl = []
def cache_prune_loop():
while True:
sleep(3600)
for key in ctx_cache:
if datetime.now() >= ctx_cache[key]['expire']:
del ctx_cache[key]
Thread(target=cache_prune_loop, daemon=True).start()
class Handler(BaseHTTPRequestHandler):
def address_string(self):
return getattr(self, 'headers', {}).get('X-Forwarded-For', '').split(',')[0] or self.client_address[0]
def is_pc_vrchat(self):
ua = self.headers.get('User-Agent', '')
ae = self.headers.get('Accept-Encoding', '')
return ua.startswith("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/") and ua.endswith(" Safari/537.36") and ae == "identity"
def send_error(self, code, message=""):
body = bytes(message, "utf-8")
self.send_response(code)
self.send_header("Content-Type", "text/plain")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_GET(self):
# block other bot junk in reverse proxy
if self.path in ["/", "/favicon.ico"] or self.path.startswith("/."):
self.send_error(404)
return
path = unquote(self.path)
match = re.match("\/(?:id\/|(?:https?:\/\/)?(?:(?:www\.|music\.|m\.)?youtube\.com\/(?:watch\?v=|shorts\/)|youtu\.be\/))([A-Za-z0-9_-]{11})", path)
if match:
if self.is_pc_vrchat():
self.send_response(302)
self.send_header("Location", "https://www.youtube.com/watch?v=" + match[1])
self.end_headers()
return
query = match[1]
else:
query = "ytsearch:" + path[1:]
ctx = ctx_cache.get(query)
if not ctx or 'expire' in ctx and datetime.now() >= ctx['expire']:
client_ip = self.address_string()
if client_ip in ips_running_ytdl:
self.send_error(429)
return
try:
ips_running_ytdl.append(client_ip)
ctx_cache[query] = ctx = {
'event': Event(),
'expire': datetime.now() + timedelta(hours=5)
}
with YoutubeDL() as ydl:
info = ydl.extract_info(query, download=False)
selection = info
if "entries" in info:
if not info["entries"]:
raise Exception("ERROR: No videos found!")
else:
selection = info["entries"][0]
ctx['id'] = selection['id']
suitable_formats = list(filter(lambda x: x['ext'] == "mp4" and x['vcodec'] != 'none' and x['acodec'] != 'none', selection["formats"]))
if not suitable_formats:
raise Exception(f"ERROR: {selection['id']}: No suitable formats of this video available!")
best_format = max(suitable_formats, key=lambda x: x['height'])
ctx['url'] = best_format['url']
expire = parse_qs(urlparse(best_format['url']).query).get('expire', [])[0]
if expire:
expire = datetime.fromtimestamp(int(expire))
if expire < ctx['expire']: ctx['expire'] = expire
except Exception as e:
logging.exception(e)
ctx['exception'] = e
ctx['error_vid'] = generate_video_from_text(re.sub("(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]", '', str(e)))
finally:
ips_running_ytdl.remove(client_ip)
ctx['event'].set()
elif 'url' not in ctx:
ctx['event'].wait(60)
if self.is_pc_vrchat():
if ctx.get('id'):
self.send_response(302)
self.send_header("Location", "https://www.youtube.com/watch?v=" + ctx['id'])
self.end_headers()
return
if not ctx.get('url'):
if 'exception' in ctx:
if 'error_vid' in ctx:
self.send_response(200)
self.send_header("Content-Type", "video/mp4")
self.send_header("Content-Length", str(len(ctx['error_vid'])))
self.end_headers()
self.wfile.write(ctx['error_vid'])
else:
self.send_error(500, message=str(ctx['exception']))
else:
self.send_error(404)
else:
url = ctx['url']
if 'PROXY' in environ:
url = environ['PROXY'] + url.replace("https://",'')
self.send_response(302)
self.send_header("Location", url)
self.end_headers()
with ThreadingHTTPServer((environ.get('ADDRESS', ''), int(environ.get('PORT', 80))), Handler) as server:
server.serve_forever()