TV/updates/subscribe/request.py
2024-12-31 17:49:45 +08:00

146 lines
5.4 KiB
Python

from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from time import time
from requests import Session, exceptions
from tqdm.asyncio import tqdm_asyncio
import utils.constants as constants
from utils.channel import format_channel_name
from utils.config import config
from utils.retry import retry_func
from utils.tools import (
merge_objects,
get_pbar_remaining,
format_url_with_cache,
add_url_info,
get_name_url
)
async def get_channels_by_subscribe_urls(
    urls,
    multicast=False,
    hotel=False,
    retry=True,
    error_print=True,
    whitelist=None,
    callback=None,
):
    """
    Get the channels by subscribe urls.

    Every source is fetched in a thread pool (up to 100 workers). The body is
    parsed as M3U when it contains ``#EXTM3U``, otherwise as plain text. The
    per-source results are folded together with ``merge_objects`` in
    submission order.

    :param urls: Subscribe sources. Each item is a URL string or, in multicast
        or hotel mode, a dict with a ``url`` key and optional ``region`` and
        ``type`` keys. The caller's sequence is not modified.
    :param multicast: Collect multicast sources. Results are keyed
        ``name -> region -> type -> [url]`` and no info is attached to URLs.
    :param hotel: Collect hotel sources. Each URL gets the info
        ``<region><hotel origin name>`` via ``add_url_info``.
    :param retry: Wrap each request in ``retry_func``.
    :param error_print: Print per-source errors. Timeouts are printed
        regardless of this flag.
    :param whitelist: Sources in this list are ordered first (in whitelist
        order). Outside multicast mode, their URLs get the info ``"!"``.
    :param callback: Optional ``callback(message, percent)`` progress hook.
    :return: The merged channel mapping. Outside multicast mode, each value is
        a list of ``(url, None, None)`` tuples.

    NOTE(review): despite being ``async`` this coroutine never awaits. The
    thread pool is joined synchronously, so the calling event loop is blocked
    until every source has been processed.
    """
    if whitelist:
        # Whitelisted sources first, in whitelist order. sorted() is stable and,
        # unlike list.sort(), does not reorder the caller's list in place.
        urls = sorted(
            urls,
            key=lambda url: (
                whitelist.index(url) if url in whitelist else len(whitelist)
            ),
        )
    subscribe_results = {}
    subscribe_urls_len = len(urls)
    pbar = tqdm_asyncio(
        total=subscribe_urls_len,
        desc=f"Processing subscribe {'for multicast' if multicast else ''}",
    )
    start_time = time()
    mode_name = "组播" if multicast else "酒店" if hotel else "订阅"
    if callback:
        callback(
            f"正在获取{mode_name}源, 共{subscribe_urls_len}{mode_name}",
            0,
        )
    hotel_name = constants.origin_map["hotel"]
    multicast_name = constants.origin_map["multicast"]
    subscribe_name = constants.origin_map["subscribe"]

    def process_subscribe_channels(subscribe_info):
        """Fetch and parse one source; always advances the progress bar."""
        # Defaults keep plain-string entries in multicast/hotel mode from hitting
        # an unbound local. Previously that raised UnboundLocalError, which the
        # broad except below swallowed, silently dropping the whole source.
        region = None
        channel_type = ""
        if (multicast or hotel) and isinstance(subscribe_info, dict):
            region = subscribe_info.get("region")
            channel_type = subscribe_info.get("type", "")
            subscribe_url = subscribe_info.get("url")
        else:
            subscribe_url = subscribe_info
        channels = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
        in_whitelist = whitelist and (subscribe_url in whitelist)
        try:
            with Session() as session:
                response = None
                try:
                    if retry:
                        response = retry_func(
                            lambda: session.get(
                                subscribe_url, timeout=config.request_timeout
                            ),
                            name=subscribe_url,
                        )
                    else:
                        response = session.get(
                            subscribe_url, timeout=config.request_timeout
                        )
                except exceptions.Timeout:
                    print(f"Timeout on subscribe: {subscribe_url}")
                # A requests Response is falsy for 4xx/5xx status codes, so
                # error pages are skipped as well as missing responses.
                if response:
                    response.encoding = "utf-8"
                    content = response.text
                    data = get_name_url(
                        content,
                        pattern=(
                            constants.m3u_pattern
                            if "#EXTM3U" in content
                            else constants.txt_pattern
                        ),
                        multiline=True,
                    )
                    for item in data:
                        name = item["name"]
                        url = item["url"]
                        if not (name and url):
                            continue
                        # Drop anything after "$" (existing per-URL info).
                        url = url.partition("$")[0]
                        if not multicast:
                            if in_whitelist:
                                info = "!"
                            elif hotel:
                                # `region or ''` avoids a literal "None" prefix
                                # when the source has no region.
                                info = f"{region or ''}{hotel_name}"
                            elif "/rtp/" in url:
                                info = f"{multicast_name}"
                            else:
                                info = f"{subscribe_name}"
                            url = add_url_info(url, info)
                        url = format_url_with_cache(
                            url, cache=subscribe_url if (multicast or hotel) else None
                        )
                        value = url if multicast else (url, None, None)
                        name = format_channel_name(name)
                        if multicast:
                            bucket = channels[name][region][channel_type]
                        else:
                            # dict.setdefault bypasses the defaultdict factory,
                            # so non-multicast entries are plain lists.
                            bucket = channels.setdefault(name, [])
                        if value not in bucket:
                            bucket.append(value)
        except Exception as e:
            if error_print:
                print(f"Error on {subscribe_url}: {e}")
        finally:
            pbar.update()
            if callback:
                remain = subscribe_urls_len - pbar.n
                callback(
                    f"正在获取{mode_name}源, 剩余{remain}{mode_name}源待获取, 预计剩余时间: {get_pbar_remaining(n=pbar.n, total=pbar.total, start_time=start_time)}",
                    int((pbar.n / subscribe_urls_len) * 100),
                )
        return channels

    try:
        with ThreadPoolExecutor(max_workers=100) as executor:
            futures = [
                executor.submit(process_subscribe_channels, subscribe_url)
                for subscribe_url in urls
            ]
            # Merge in submission order (not completion order) so whitelisted
            # sources are always merged first.
            for future in futures:
                subscribe_results = merge_objects(subscribe_results, future.result())
    finally:
        pbar.close()
    return subscribe_results