218 lines
6.5 KiB
Python
218 lines
6.5 KiB
Python
from yt_dlp import YoutubeDL
|
|
from importlib.metadata import version
|
|
print("yt-dlp version", version("yt_dlp"))
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from urllib.parse import unquote, urlparse, parse_qs
|
|
from threading import Event, Thread
|
|
from datetime import datetime, timedelta
|
|
from time import sleep
|
|
from os import environ, makedirs, stat
|
|
import logging
|
|
import re
|
|
from ffmpeg import FFmpeg
|
|
import textwrap
|
|
from pathlib import Path
|
|
from hashlib import sha256
|
|
from shutil import copyfileobj
|
|
from math import ceil
|
|
|
|
if 'DEBUG' in environ:
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
|
|
class Ratelimit(Exception): pass
|
|
class CachedException(Exception): pass
|
|
class Handler(BaseHTTPRequestHandler):
|
|
def address_string(self):
|
|
return getattr(self, 'headers', {}).get('X-Forwarded-For', '').split(',')[0] or self.client_address[0]
|
|
|
|
def send_error(self, code, message=""):
|
|
body = bytes(message, "utf-8")
|
|
self.send_response(code)
|
|
self.send_header("Content-Type", "text/plain")
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
def send_error_video(self, text: str):
|
|
makedirs("errors", exist_ok=True)
|
|
hash = sha256(bytes(text, "utf8")).hexdigest()
|
|
file = Path(f"errors/{hash}.mp4")
|
|
|
|
if not file.exists():
|
|
text = re.sub("(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]", '', text)
|
|
text = text.replace("\\", "\\\\").replace('"', '""').replace("'", "''").replace("%", "\\%").replace(":", "\\:")
|
|
text = textwrap.fill(text, 90)
|
|
peg = FFmpeg().option("y").input("bg.png", {
|
|
"framerate": "0.1"
|
|
}).output(str(file), {
|
|
"f": "mp4",
|
|
"t": "10",
|
|
"c:v": "libx264",
|
|
"pix_fmt": "yuv420p",
|
|
"vf": "drawtext=font=monospace:fontsize=24:x=10:y=10:text='"+text+"':"
|
|
})
|
|
@peg.on("start")
|
|
def on_start(arguments):
|
|
logging.debug("cmd:" + ' '.join(arguments))
|
|
@peg.on("stderr")
|
|
def on_stderr(line):
|
|
logging.debug("stderr:" + line)
|
|
peg.execute()
|
|
|
|
with file.open('rb') as f:
|
|
fs = stat(f.fileno())
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "video/mp4")
|
|
self.send_header("Content-Length", str(fs.st_size))
|
|
self.end_headers()
|
|
copyfileobj(f, self.wfile)
|
|
|
|
def do_GET(self):
|
|
try:
|
|
if self.path in ["/", "/favicon.ico"] or self.path.startswith("/."):
|
|
self.send_error(404)
|
|
return
|
|
|
|
path = unquote(self.path)
|
|
id_match = re.match("\/(?:id\/|(?:https?:\/\/)?(?:(?:www\.|music\.|m\.)?youtube\.com\/(?:watch\?v=|shorts\/|live\/)|youtu\.be\/))([A-Za-z0-9_-]{11})", path)
|
|
|
|
if id_match:
|
|
video_id = id_match[1]
|
|
else:
|
|
search_match = re.match("^\/(.+?)(?:\/(\d*))?$", path)
|
|
if not search_match:
|
|
self.send_error(404)
|
|
return
|
|
search_query = search_match[1]
|
|
search_index = int(search_match[2]) if search_match[2] and search_match[2].isdigit() else 1
|
|
search_index = max(min(search_index, 100), 1)
|
|
video_id = ytdl_search_to_id(self, search_query, search_index)
|
|
|
|
self.send_response(302)
|
|
self.send_header("Location", f"https://www.youtube.com/watch?v={video_id}")
|
|
self.end_headers()
|
|
#half this code now defunct
|
|
return
|
|
|
|
video_url = ytdl_resolve_mp4_url(self, video_id)
|
|
|
|
if 'PROXY' in environ:
|
|
video_url = environ['PROXY'] + video_url.replace("https://",'')
|
|
self.send_response(302)
|
|
self.send_header("Location", video_url)
|
|
self.end_headers()
|
|
except Ratelimit:
|
|
self.send_error(429)
|
|
except CachedException as e:
|
|
self.send_error_video(str(e))
|
|
except Exception as e:
|
|
logging.exception(e)
|
|
self.send_error_video(str(e))
|
|
|
|
|
|
ips_running_ytdl = []
|
|
def invoke_youtubedl(self: Handler, input: str) -> dict:
|
|
ip = self.address_string()
|
|
if ip in ips_running_ytdl:
|
|
raise Ratelimit()
|
|
ips_running_ytdl.append(ip)
|
|
try:
|
|
with YoutubeDL({'extractor_args': {'youtube': {'skip': ['dash', 'hls']}}}) as ydl:
|
|
return ydl.extract_info(input, download=False, process=False)
|
|
finally:
|
|
ips_running_ytdl.remove(ip)
|
|
|
|
|
|
search_cache = {}
|
|
def ytdl_search_to_id(self: Handler, query: str, index: int) -> str:
|
|
ctx = search_cache.get(query)
|
|
if ctx:
|
|
ctx['event'].wait(60)
|
|
if 'error' in ctx:
|
|
raise CachedException(ctx['error'])
|
|
results = ctx.get('results')
|
|
else:
|
|
results = None
|
|
|
|
if results == None or results['count'] < index or datetime.now() >= ctx['expires_at']:
|
|
search_cache[query] = ctx = {
|
|
'event': Event(),
|
|
'expires_at': datetime.now() + timedelta(hours=5)
|
|
}
|
|
try:
|
|
count = ceil(index/10)*10
|
|
info = invoke_youtubedl(self, f"ytsearch{count}:{query}")
|
|
entries = list(info['entries'])
|
|
if not entries:
|
|
raise Exception("ERROR: No results!")
|
|
ids = [video['id'] for video in entries]
|
|
ctx['results'] = results = {'ids': ids, 'count': count}
|
|
except Exception as e:
|
|
ctx['error'] = str(e)
|
|
raise
|
|
finally:
|
|
ctx['event'].set()
|
|
|
|
return results['ids'][min(index, len(results['ids'])) - 1]
|
|
|
|
|
|
resolve_cache = {}
|
|
def ytdl_resolve_mp4_url(self: Handler, input: str) -> str:
|
|
ctx = resolve_cache.get(input)
|
|
if ctx and datetime.now() <= ctx['expires_at']:
|
|
ctx['event'].wait(60)
|
|
if 'error' in ctx:
|
|
raise CachedException(ctx['error'])
|
|
return ctx['url']
|
|
|
|
resolve_cache[input] = ctx = {
|
|
'event': Event(),
|
|
'expires_at': datetime.now() + timedelta(hours=5)
|
|
}
|
|
try:
|
|
info = invoke_youtubedl(self, input)
|
|
selection = info
|
|
if "entries" in info:
|
|
if not info["entries"]:
|
|
raise Exception("ERROR: No video found!")
|
|
else:
|
|
selection = info["entries"][0]
|
|
|
|
suitable_formats = list(filter(lambda x: x['ext'] == "mp4" and x['vcodec'] != 'none' and x['acodec'] != 'none', selection["formats"]))
|
|
if not suitable_formats:
|
|
raise Exception(f"ERROR: {selection['id']}: No suitable formats of this video available!")
|
|
best_format = max(suitable_formats, key=lambda x: x['height'])
|
|
|
|
ctx['url'] = url = best_format['url']
|
|
|
|
try:
|
|
expire = parse_qs(urlparse(url).query).get('expire', [])[0]
|
|
if expire:
|
|
expire = datetime.fromtimestamp(int(expire))
|
|
if expire < ctx['expires_at']:
|
|
ctx['expires_at'] = expire
|
|
except Exception as e:
|
|
logging.exception("failed parsing expire", e)
|
|
except Exception as e:
|
|
ctx['error'] = str(e)
|
|
raise
|
|
finally:
|
|
ctx['event'].set()
|
|
|
|
return url
|
|
|
|
|
|
def cache_prune_loop():
|
|
while True:
|
|
sleep(3600)
|
|
for key in list(search_cache.keys()):
|
|
if datetime.now() >= search_cache[key]['expires_at']:
|
|
del search_cache[key]
|
|
for key in list(resolve_cache.keys()):
|
|
if datetime.now() >= resolve_cache[key]['expires_at']:
|
|
del resolve_cache[key]
|
|
Thread(target=cache_prune_loop, daemon=True).start()
|
|
|
|
with ThreadingHTTPServer((environ.get('ADDRESS', ''), int(environ.get('PORT', 80))), Handler) as server:
|
|
server.serve_forever()
|