TV/updates/multicast/update_tmp.py
2024-12-12 16:18:14 +08:00

163 lines
5.5 KiB
Python

import os
import sys
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
from updates.subscribe import get_channels_by_subscribe_urls
from driver.utils import get_soup_driver
from utils.config import config
import utils.constants as constants
from utils.channel import format_channel_name
from utils.tools import get_pbar_remaining, resource_path, get_name_url
import json
# import asyncio
from requests import Session
from collections import defaultdict
from time import time
from tqdm import tqdm
def get_region_urls_from_IPTV_Multicast_source():
"""
Get the region urls from IPTV_Multicast_source
"""
region_url = {}
origin_url = "https://github.com/xisohi/IPTV-Multicast-source/blob/main/README.md"
soup = get_soup_driver(origin_url)
tbody = soup.find("tbody")
trs = tbody.find_all("tr") if tbody else []
for tr in trs:
tds = tr.find_all("td")
name = tds[0].get_text().strip()
unicom = tds[1].find("a", href=True).get("href")
mobile = tds[2].find("a", href=True).get("href")
telecom = tds[3].find("a", href=True).get("href")
if name not in region_url:
region_url[name] = {}
region_url[name]["联通"] = unicom
region_url[name]["移动"] = mobile
region_url[name]["电信"] = telecom
with open(
resource_path("updates/multicast/multicast_map.json"), "w", encoding="utf-8"
) as f:
json.dump(region_url, f, ensure_ascii=False, indent=4)
def get_multicast_urls_info_from_region_list():
"""
Get the multicast urls info from region
"""
urls_info = []
with open(
resource_path("updates/multicast/multicast_map.json"), "r", encoding="utf-8"
) as f:
region_url = json.load(f)
urls_info = [
{"region": region, "type": type, "url": url}
for region, value in region_url.items()
for type, url in value.items()
]
return urls_info
async def get_multicast_region_result():
"""
Get multicast region result
"""
multicast_region_urls_info = get_multicast_urls_info_from_region_list()
multicast_result = await get_channels_by_subscribe_urls(
multicast_region_urls_info, multicast=True
)
with open(
resource_path("updates/multicast/multicast_region_result.json"),
"w",
encoding="utf-8",
) as f:
json.dump(multicast_result, f, ensure_ascii=False, indent=4)
def get_multicast_region_type_result_txt():
"""
Get multicast region type result txt
"""
with open(
resource_path("updates/multicast/multicast_map.json"), "r", encoding="utf-8"
) as f:
region_url = json.load(f)
session = Session()
for region, value in region_url.items():
for type, url in value.items():
response = session.get(url)
content = response.text
with open(
resource_path(f"config/rtp/{region}_{type}.txt"),
"w",
encoding="utf-8",
) as f:
f.write(content)
def get_multicast_region_result_by_rtp_txt(callback=None):
"""
Get multicast region result by rtp txt
"""
rtp_path = resource_path("config/rtp")
config_region_list = set(config.multicast_region_list)
rtp_file_list = [
filename.rsplit(".", 1)[0]
for filename in os.listdir(rtp_path)
if filename.endswith(".txt")
and "_" in filename
and (
filename.rsplit(".", 1)[0].partition("_")[0] in config_region_list
or config_region_list & {"all", "ALL", "全部"}
)
]
total_files = len(rtp_file_list)
if callback:
callback(f"正在读取本地组播数据, 共{total_files}个文件", 0)
pbar = tqdm(total=total_files, desc="Loading local multicast rtp files")
multicast_result = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
start_time = time()
for filename in rtp_file_list:
region, _, type = filename.partition("_")
with open(
os.path.join(rtp_path, f"{filename}.txt"), "r", encoding="utf-8"
) as f:
for line in f:
name_url = get_name_url(line, pattern=constants.rtp_pattern)
if name_url and name_url[0]:
channel_name = format_channel_name(name_url[0]["name"])
url = name_url[0]["url"]
if url not in multicast_result[channel_name][region][type]:
multicast_result[channel_name][region][type].append(url)
pbar.update()
if callback:
remaining_files = total_files - pbar.n
estimated_time = get_pbar_remaining(pbar.n, total_files, start_time)
callback(
f"正在读取{region}_{type}的组播数据, 剩余{remaining_files}个文件, 预计剩余时间: {estimated_time}",
int((pbar.n / total_files) * 100),
)
with open(
resource_path("updates/multicast/multicast_region_result.json"),
"w",
encoding="utf-8",
) as f:
json.dump(multicast_result, f, ensure_ascii=False, indent=4)
pbar.close()
return multicast_result
if __name__ == "__main__":
get_region_urls_from_IPTV_Multicast_source()
# asyncio.run(get_multicast_region_result())
get_multicast_region_type_result_txt()
# get_multicast_region_result_by_rtp_txt()