import asyncio import http.cookies import json import re import subprocess from time import time from urllib.parse import quote, urlparse import m3u8 from aiohttp import ClientSession, TCPConnector from multidict import CIMultiDictProxy import utils.constants as constants from utils.config import config from utils.tools import is_ipv6, remove_cache_info, get_resolution_value http.cookies._is_legal_key = lambda _: True async def get_speed_with_download(url: str, session: ClientSession = None, timeout: int = config.sort_timeout) -> dict[ str, float | None]: """ Get the speed of the url with a total timeout """ start_time = time() total_size = 0 total_time = 0 info = {'speed': None, 'delay': None} if session is None: session = ClientSession(connector=TCPConnector(ssl=False), trust_env=True) created_session = True else: created_session = False try: async with session.get(url, timeout=timeout) as response: if response.status != 200: raise Exception("Invalid response") info['delay'] = int(round((time() - start_time) * 1000)) async for chunk in response.content.iter_any(): if chunk: total_size += len(chunk) except: pass finally: if total_size > 0: total_time += time() - start_time info['speed'] = ((total_size / total_time) if total_time > 0 else 0) / 1024 / 1024 if created_session: await session.close() return info async def get_m3u8_headers(url: str, session: ClientSession = None, timeout: int = 5) -> CIMultiDictProxy[str] | dict[ any, any]: """ Get the headers of the m3u8 url """ if session is None: session = ClientSession(connector=TCPConnector(ssl=False), trust_env=True) created_session = True else: created_session = False headers = {} try: async with session.head(url, timeout=timeout) as response: headers = response.headers except: pass finally: if created_session: await session.close() return headers def check_m3u8_valid(headers: CIMultiDictProxy[str] | dict[any, any]) -> bool: """ Check if the m3u8 url is valid """ content_type = headers.get('Content-Type', '').lower() if not content_type: return False return any(item in content_type for item in ['application/vnd.apple.mpegurl', 'audio/mpegurl', 'audio/x-mpegurl']) async def get_speed_m3u8(url: str, filter_resolution: bool = config.open_filter_resolution, timeout: int = config.sort_timeout) -> dict[str, float | None]: """ Get the speed of the m3u8 url with a total timeout """ info = {'speed': None, 'delay': None, 'resolution': None} location = None try: url = quote(url, safe=':/?$&=@[]').partition('$')[0] async with ClientSession(connector=TCPConnector(ssl=False), trust_env=True) as session: headers = await get_m3u8_headers(url, session) location = headers.get('Location') if location: info.update(await get_speed_m3u8(location, filter_resolution, timeout)) elif check_m3u8_valid(headers): m3u8_obj = m3u8.load(url, timeout=2) playlists = m3u8_obj.data.get('playlists') segments = m3u8_obj.segments if not segments and playlists: parsed_url = urlparse(url) uri = f"{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path.rsplit('/', 1)[0]}/{playlists[0].get('uri', '')}" uri_headers = await get_m3u8_headers(uri, session) if not check_m3u8_valid(uri_headers): if uri_headers.get('Content-Length'): info.update(await get_speed_with_download(uri, session, timeout)) raise Exception("Invalid m3u8") m3u8_obj = m3u8.load(uri, timeout=2) segments = m3u8_obj.segments if not segments: raise Exception("Segments not found") ts_urls = [segment.absolute_uri for segment in segments] speed_list = [] start_time = time() for ts_url in ts_urls: if time() - start_time > timeout: break download_info = await get_speed_with_download(ts_url, session, timeout) speed_list.append(download_info['speed']) if info['delay'] is None and download_info['delay'] is not None: info['delay'] = download_info['delay'] info['speed'] = (sum(speed_list) / len(speed_list)) if speed_list else 0 elif headers.get('Content-Length'): info.update(await get_speed_with_download(url, session, timeout)) except: pass finally: if filter_resolution and not location and info['delay'] is not None: info['resolution'] = await get_resolution_ffprobe(url, timeout) return info async def get_delay_requests(url, timeout=config.sort_timeout, proxy=None): """ Get the delay of the url by requests """ async with ClientSession( connector=TCPConnector(ssl=False), trust_env=True ) as session: start = time() end = None try: async with session.get(url, timeout=timeout, proxy=proxy) as response: if response.status == 404: return float("inf") content = await response.read() if content: end = time() else: return float("inf") except Exception as e: return float("inf") return int(round((end - start) * 1000)) if end else float("inf") def check_ffmpeg_installed_status(): """ Check ffmpeg is installed """ status = False try: result = subprocess.run( ["ffmpeg", "-version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE ) status = result.returncode == 0 except FileNotFoundError: status = False except Exception as e: print(e) finally: return status async def ffmpeg_url(url, timeout=config.sort_timeout): """ Get url info by ffmpeg """ args = ["ffmpeg", "-t", str(timeout), "-stats", "-i", url, "-f", "null", "-"] proc = None res = None try: proc = await asyncio.create_subprocess_exec( *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) out, err = await asyncio.wait_for(proc.communicate(), timeout=timeout + 2) if out: res = out.decode("utf-8") if err: res = err.decode("utf-8") return None except asyncio.TimeoutError: if proc: proc.kill() return None except Exception: if proc: proc.kill() return None finally: if proc: await proc.wait() return res async def get_resolution_ffprobe(url: str, timeout: int = config.sort_timeout) -> str | None: """ Get the resolution of the url by ffprobe """ resolution = None proc = None try: probe_args = [ 'ffprobe', '-v', 'error', '-select_streams', 'v:0', '-show_entries', 'stream=width,height', "-of", 'json', url ] proc = await asyncio.create_subprocess_exec(*probe_args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) out, _ = await asyncio.wait_for(proc.communicate(), timeout) video_stream = json.loads(out.decode('utf-8'))["streams"][0] resolution = f"{video_stream['width']}x{video_stream['height']}" except: if proc: proc.kill() finally: if proc: await proc.wait() return resolution def get_video_info(video_info): """ Get the video info """ frame_size = float("inf") resolution = None if video_info is not None: info_data = video_info.replace(" ", "") matches = re.findall(r"frame=(\d+)", info_data) if matches: frame_size = int(matches[-1]) match = re.search(r"(\d{3,4}x\d{3,4})", video_info) if match: resolution = match.group(0) return frame_size, resolution async def check_stream_delay(url_info): """ Check the stream delay """ try: url = url_info[0] video_info = await ffmpeg_url(url) if video_info is None: return float("inf") frame, resolution = get_video_info(video_info) if frame is None or frame == float("inf"): return float("inf") url_info[2] = resolution return url_info, frame except Exception as e: print(e) return float("inf") cache = {} async def get_speed(url, ipv6_proxy=None, filter_resolution=config.open_filter_resolution, timeout=config.sort_timeout, callback=None): """ Get the speed (response time and resolution) of the url """ data = {'speed': None, 'delay': None, 'resolution': None} try: cache_key = None url_is_ipv6 = is_ipv6(url) if "$" in url: url, _, cache_info = url.partition("$") matcher = re.search(r"cache:(.*)", cache_info) if matcher: cache_key = matcher.group(1) if cache_key in cache: return cache[cache_key][0] if ipv6_proxy and url_is_ipv6: data['speed'] = float("inf") data['delay'] = float("-inf") data['resolution'] = "1920x1080" elif re.match(constants.rtmp_url_pattern, url) is not None: start_time = time() data['resolution'] = await get_resolution_ffprobe(url, timeout) data['delay'] = int(round((time() - start_time) * 1000)) data['speed'] = float("inf") if data['resolution'] is not None else 0 else: data.update(await get_speed_m3u8(url, filter_resolution, timeout)) if cache_key and cache_key not in cache: cache[cache_key] = data return data except: return data finally: if callback: callback() def sort_urls_key(item): """ Sort the urls with key """ speed, resolution, origin = item["speed"], item["resolution"], item["origin"] if origin == "whitelist": return float("inf") else: return (speed if speed is not None else float("-inf")) + get_resolution_value(resolution) def sort_urls(name, data, supply=config.open_supply, filter_speed=config.open_filter_speed, min_speed=config.min_speed, filter_resolution=config.open_filter_resolution, min_resolution=config.min_resolution_value, logger=None): """ Sort the urls with info """ filter_data = [] for url, date, resolution, origin in data: result = { "url": remove_cache_info(url), "date": date, "delay": None, "speed": None, "resolution": resolution, "origin": origin } if origin == "whitelist": filter_data.append(result) continue cache_key_match = re.search(r"cache:(.*)", url.partition("$")[2]) cache_key = cache_key_match.group(1) if cache_key_match else None if cache_key and cache_key in cache: cache_item = cache[cache_key] if cache_item: speed, delay, cache_resolution = cache_item['speed'], cache_item['delay'], cache_item['resolution'] resolution = cache_resolution or resolution if speed is not None: try: if logger: logger.info( f"Name: {name}, URL: {result["url"]}, Date: {date}, Delay: {delay} ms, Speed: {speed:.2f} M/s, Resolution: {resolution}" ) except Exception as e: print(e) if (not supply and filter_speed and speed < min_speed) or ( not supply and filter_resolution and get_resolution_value(resolution) < min_resolution) or ( supply and delay is None): continue result["delay"] = delay result["speed"] = speed result["resolution"] = resolution filter_data.append(result) filter_data.sort(key=sort_urls_key, reverse=True) return [ (item["url"], item["date"], item["resolution"], item["origin"]) for item in filter_data ]