135 lines
4.3 KiB
Python
135 lines
4.3 KiB
Python
from yt_dlp import YoutubeDL
|
|
from importlib.metadata import version
|
|
print("yt-dlp version", version("yt_dlp"))
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from urllib.parse import unquote, urlparse, parse_qs
|
|
from threading import Event, Thread
|
|
from datetime import datetime, timedelta
|
|
from time import sleep
|
|
from os import environ
|
|
import logging
|
|
import re
|
|
from textvid import generate_video_from_text
|
|
|
|
ctx_cache = {}
|
|
ips_running_ytdl = []
|
|
|
|
def cache_prune_loop():
|
|
while True:
|
|
sleep(3600)
|
|
for key in ctx_cache:
|
|
if datetime.now() >= ctx_cache[key]['expire']:
|
|
del ctx_cache[key]
|
|
Thread(target=cache_prune_loop, daemon=True).start()
|
|
|
|
class Handler(BaseHTTPRequestHandler):
|
|
def address_string(self):
|
|
return getattr(self, 'headers', {}).get('X-Forwarded-For', '').split(',')[0] or self.client_address[0]
|
|
|
|
def is_pc_vrchat(self):
|
|
ua = self.headers.get('User-Agent', '')
|
|
ae = self.headers.get('Accept-Encoding', '')
|
|
return ua.startswith("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/") and ua.endswith(" Safari/537.36") and ae == "identity"
|
|
|
|
def send_error(self, code, message=""):
|
|
body = bytes(message, "utf-8")
|
|
self.send_response(code)
|
|
self.send_header("Content-Type", "text/plain")
|
|
self.send_header("Content-Length", str(len(body)))
|
|
self.end_headers()
|
|
self.wfile.write(body)
|
|
|
|
def do_GET(self):
|
|
# block other bot junk in reverse proxy
|
|
if self.path in ["/", "/favicon.ico"] or self.path.startswith("/."):
|
|
self.send_error(404)
|
|
return
|
|
|
|
path = unquote(self.path)
|
|
match = re.match("\/(?:id\/|(?:https?:\/\/)?(?:(?:www\.|music\.|m\.)?youtube\.com\/(?:watch\?v=|shorts\/)|youtu\.be\/))([A-Za-z0-9_-]{11})", path)
|
|
if match:
|
|
if self.is_pc_vrchat():
|
|
self.send_response(302)
|
|
self.send_header("Location", "https://www.youtube.com/watch?v=" + match[1])
|
|
self.end_headers()
|
|
return
|
|
query = match[1]
|
|
else:
|
|
query = "ytsearch:" + path[1:]
|
|
|
|
ctx = ctx_cache.get(query)
|
|
|
|
if not ctx or 'expire' in ctx and datetime.now() >= ctx['expire']:
|
|
client_ip = self.address_string()
|
|
if client_ip in ips_running_ytdl:
|
|
self.send_error(429)
|
|
return
|
|
try:
|
|
ips_running_ytdl.append(client_ip)
|
|
ctx_cache[query] = ctx = {
|
|
'event': Event(),
|
|
'expire': datetime.now() + timedelta(hours=5)
|
|
}
|
|
with YoutubeDL() as ydl:
|
|
info = ydl.extract_info(query, download=False)
|
|
|
|
selection = info
|
|
if "entries" in info:
|
|
if not info["entries"]:
|
|
raise Exception("ERROR: No videos found!")
|
|
else:
|
|
selection = info["entries"][0]
|
|
|
|
ctx['id'] = selection['id']
|
|
|
|
suitable_formats = list(filter(lambda x: x['ext'] == "mp4" and x['vcodec'] != 'none' and x['acodec'] != 'none', selection["formats"]))
|
|
if not suitable_formats:
|
|
raise Exception(f"ERROR: {selection['id']}: No suitable formats of this video available!")
|
|
best_format = max(suitable_formats, key=lambda x: x['height'])
|
|
|
|
ctx['url'] = best_format['url']
|
|
|
|
expire = parse_qs(urlparse(best_format['url']).query).get('expire', [])[0]
|
|
if expire:
|
|
expire = datetime.fromtimestamp(int(expire))
|
|
if expire < ctx['expire']: ctx['expire'] = expire
|
|
except Exception as e:
|
|
logging.exception(e)
|
|
ctx['exception'] = e
|
|
ctx['error_vid'] = generate_video_from_text(re.sub("(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]", '', str(e)))
|
|
finally:
|
|
ips_running_ytdl.remove(client_ip)
|
|
ctx['event'].set()
|
|
elif 'url' not in ctx:
|
|
ctx['event'].wait(60)
|
|
|
|
if self.is_pc_vrchat():
|
|
if ctx.get('id'):
|
|
self.send_response(302)
|
|
self.send_header("Location", "https://www.youtube.com/watch?v=" + ctx['id'])
|
|
self.end_headers()
|
|
return
|
|
|
|
if not ctx.get('url'):
|
|
if 'exception' in ctx:
|
|
if 'error_vid' in ctx:
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "video/mp4")
|
|
self.send_header("Content-Length", str(len(ctx['error_vid'])))
|
|
self.end_headers()
|
|
self.wfile.write(ctx['error_vid'])
|
|
else:
|
|
self.send_error(500, message=str(ctx['exception']))
|
|
else:
|
|
self.send_error(404)
|
|
else:
|
|
url = ctx['url']
|
|
if 'PROXY' in environ:
|
|
url = environ['PROXY'] + url.replace("https://",'')
|
|
self.send_response(302)
|
|
self.send_header("Location", url)
|
|
self.end_headers()
|
|
|
|
with ThreadingHTTPServer((environ.get('ADDRESS', ''), int(environ.get('PORT', 80))), Handler) as server:
|
|
server.serve_forever()
|