mirror of
https://github.com/xmbjm/TV.git
synced 2025-01-21 00:45:38 -05:00
758 lines
26 KiB
Python
import asyncio
import base64
import copy
import os
import pickle
import re
from collections import defaultdict
from logging import INFO

from bs4 import NavigableString
from opencc import OpenCC

import utils.constants as constants
from utils.config import config
from utils.speed import (
    get_speed,
    sort_urls,
    check_ffmpeg_installed_status
)
from utils.tools import (
    get_name_url,
    check_url_by_keywords,
    check_url_ipv_type,
    get_total_urls,
    process_nested_dict,
    add_url_info,
    remove_cache_info,
    resource_path,
    get_urls_from_file,
    get_name_urls_from_file,
    get_logger,
    get_datetime_now
)
def get_channel_data_from_file(channels, file, use_old, whitelist):
    """
    Get the channel data from the file
    """
    current_category = ""

    for line in file:
        line = line.strip()
        if "#genre#" in line:
            current_category = line.partition(",")[0]
        else:
            name_url = get_name_url(
                line, pattern=constants.demo_txt_pattern, check_url=False
            )
            if name_url and name_url[0]:
                name = name_url[0]["name"]
                url = name_url[0]["url"]
                category_dict = channels[current_category]
                if name not in category_dict:
                    category_dict[name] = []
                if name in whitelist:
                    for whitelist_url in whitelist[name]:
                        category_dict[name].append((whitelist_url, None, None, "whitelist"))
                if use_old and url:
                    info = url.partition("$")[2]
                    origin = None
                    if info and info.startswith("!"):
                        origin = "whitelist"
                    data = (url, None, None, origin)
                    if data not in category_dict[name]:
                        category_dict[name].append(data)
    return channels
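# Usage sketch (hypothetical file contents; the accepted line format is
# whatever constants.demo_txt_pattern matches):
#
#     央视频道,#genre#
#     CCTV-1,http://example.com/stream.m3u8
#
#     channels = defaultdict(lambda: defaultdict(list))
#     with open("demo.txt", encoding="utf-8") as f:
#         get_channel_data_from_file(channels, f, use_old=False, whitelist={})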
def get_channel_items():
    """
    Get the channel items from the source file
    """
    user_source_file = resource_path(config.source_file)
    channels = defaultdict(lambda: defaultdict(list))
    whitelist = get_name_urls_from_file(constants.whitelist_path)
    whitelist_urls = get_urls_from_file(constants.whitelist_path)
    whitelist_len = len(whitelist)
    if whitelist_len:
        print(f"Found {whitelist_len} channel(s) in whitelist")

    if os.path.exists(user_source_file):
        with open(user_source_file, "r", encoding="utf-8") as file:
            channels = get_channel_data_from_file(
                channels, file, config.open_use_old_result, whitelist
            )

    if config.open_use_old_result:
        result_cache_path = resource_path(constants.cache_path)
        if os.path.exists(result_cache_path):
            with open(result_cache_path, "rb") as file:
                old_result = pickle.load(file)
                for cate, data in channels.items():
                    if cate in old_result:
                        for name, info_list in data.items():
                            urls = [
                                item[0].partition("$")[0]
                                for item in info_list
                                if item[0]
                            ]
                            if name in old_result[cate]:
                                for info in old_result[cate][name]:
                                    if info:
                                        try:
                                            # Skip stale whitelist entries that no
                                            # longer match any current whitelist url
                                            if info[3] == "whitelist" and not any(
                                                    url in info[0] for url in whitelist_urls):
                                                continue
                                        except IndexError:
                                            pass
                                        pure_url = info[0].partition("$")[0]
                                        if pure_url not in urls:
                                            channels[cate][name].append(info)
    return channels
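# The pickle merged above is written by a previous run; a minimal sketch of
# its assumed shape:
#     {"央视频道": {"CCTV-1": [(url, date, resolution, origin), ...]}}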
def format_channel_name(name):
    """
    Normalize the channel name: convert Traditional to Simplified Chinese,
    strip region prefixes, apply the substitution rules and lower-case
    """
    if config.open_keep_all:
        return name
    cc = OpenCC("t2s")
    name = cc.convert(name)
    for region in constants.region_list:
        name = name.replace(f"{region}|", "")
    name = re.sub(constants.sub_pattern, "", name)
    for old, new in constants.replace_dict.items():
        name = name.replace(old, new)
    return name.lower()
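# Illustrative example only (the exact output depends on constants.sub_pattern
# and constants.replace_dict):
#     format_channel_name("北京|CCTV-1綜合") -> "cctv-1综合" or similar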
def channel_name_is_equal(name1, name2):
    """
    Check whether two channel names are equal after formatting
    """
    if config.open_keep_all:
        return True
    name1_format = format_channel_name(name1)
    name2_format = format_channel_name(name2)
    return name1_format == name2_format


def get_channel_results_by_name(name, data):
    """
    Get channel results from data by formatted name
    """
    format_name = format_channel_name(name)
    results = data.get(format_name, [])
    return results


def get_element_child_text_list(element, child_name):
    """
    Get the text of the element's children
    """
    text_list = []
    children = element.find_all(child_name)
    if children:
        for child in children:
            text = child.get_text(strip=True)
            if text:
                text_list.append(text)
    return text_list
def get_multicast_ip_list(urls):
    """
    Get the multicast ip list from urls
    """
    ip_list = []
    # Matches "rtp://ip[:port]"; group(1) keeps the optional port
    pattern = r"rtp://((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})(?::(\d+))?)"
    for url in urls:
        matcher = re.search(pattern, url)
        if matcher:
            ip_list.append(matcher.group(1))
    return ip_list
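# Example: get_multicast_ip_list(["rtp://239.3.1.241:8000/x"]) returns
# ["239.3.1.241:8000"] (the port is kept because it sits inside group 1).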
def get_channel_multicast_region_ip_list(result, channel_region, channel_type):
    """
    Collect multicast ips from result, filtered by channel region and type
    """
    return [
        ip
        for result_region, result_obj in result.items()
        if result_region in channel_region
        for type, urls in result_obj.items()
        if type in channel_type
        for ip in get_multicast_ip_list(urls)
    ]


def get_channel_multicast_name_region_type_result(result, names):
    """
    Filter the region/type result down to the given channel names
    """
    name_region_type_result = {}
    for name in names:
        data = result.get(name)
        if data:
            name_region_type_result[name] = data
    return name_region_type_result


def get_channel_multicast_region_type_list(result):
    """
    Get the channel multicast (region, type) list from result
    """
    region_list = config.multicast_region_list
    region_type_list = {
        (region, type)
        for region_type in result.values()
        for region, types in region_type.items()
        if "all" in region_list
        or "ALL" in region_list
        or "全部" in region_list  # "全部" means "all"
        or region in region_list
        for type in types
    }
    return list(region_type_list)
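# Sketch: with result = {"CCTV-1": {"北京": {"联通": [...]}}} and
# config.multicast_region_list containing "all" (or "全部"), this returns
# [("北京", "联通")].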
def get_channel_multicast_result(result, search_result):
    """
    Build the channel multicast info result from the region/type result
    and the search result
    """
    info_result = {}
    multicast_name = constants.origin_map["multicast"]
    whitelist = get_urls_from_file(constants.whitelist_path)
    blacklist = get_urls_from_file(constants.blacklist_path)
    for name, result_obj in result.items():
        info_list = [
            (
                (
                    add_url_info(
                        f"http://{url}/rtp/{ip}",
                        f"{result_region}{result_type}{multicast_name}-cache:{url}",
                    )
                    if config.open_sort
                    else add_url_info(
                        f"http://{url}/rtp/{ip}",
                        f"{result_region}{result_type}{multicast_name}",
                    )
                ),
                date,
                resolution,
            )
            for result_region, result_types in result_obj.items()
            if result_region in search_result
            for result_type, result_type_urls in result_types.items()
            if result_type in search_result[result_region]
            for ip in get_multicast_ip_list(result_type_urls) or []
            for url, date, resolution in search_result[result_region][result_type]
            if (whitelist and check_url_by_keywords(f"http://{url}/rtp/{ip}", whitelist)) or
               (check_url_ipv_type(f"http://{url}/rtp/{ip}") and not check_url_by_keywords(
                   f"http://{url}/rtp/{ip}", blacklist))
        ]
        info_result[name] = info_list
    return info_result
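# Each accepted entry pairs a udpxy gateway from the search result with a
# multicast address, i.e. urls of the form "http://{gateway}/rtp/{ip:port}".
# The "-cache:{url}" suffix added when config.open_sort is on is consumed
# later by the sort step (see process_nested_dict / remove_cache_info).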
def get_results_from_soup(soup, name):
    """
    Get the results from the soup
    """
    results = []
    if not soup.descendants:
        return results
    for element in soup.descendants:
        if isinstance(element, NavigableString):
            text = element.get_text(strip=True)
            url = get_channel_url(text)
            if url and not any(item[0] == url for item in results):
                url_element = soup.find(lambda tag: tag.get_text(strip=True) == url)
                if url_element:
                    name_element = url_element.find_previous_sibling()
                    if name_element:
                        channel_name = name_element.get_text(strip=True)
                        if channel_name_is_equal(name, channel_name):
                            # find_next_sibling() can return None; guard before
                            # reading its text
                            info_element = url_element.find_next_sibling()
                            date, resolution = (
                                get_channel_info(info_element.get_text(strip=True))
                                if info_element
                                else (None, None)
                            )
                            results.append((url, date, resolution))
    return results
def get_results_from_multicast_soup(soup, hotel=False):
    """
    Get the results from the multicast soup
    """
    results = []
    if not soup.descendants:
        return results
    for element in soup.descendants:
        if isinstance(element, NavigableString):
            text = element.strip()
            if "失效" in text:  # "失效" marks an expired/dead link
                continue
            url = get_channel_url(text)
            if url and not any(item["url"] == url for item in results):
                url_element = soup.find(lambda tag: tag.get_text(strip=True) == url)
                if not url_element:
                    continue
                parent_element = url_element.find_parent()
                # The info div is the parent's last direct child; guard against
                # an empty child list before indexing
                children = parent_element.find_all(recursive=False)
                if not children:
                    continue
                info_element = children[-1]
                info_text = info_element.get_text(strip=True)
                if "上线" in info_text and " " in info_text:  # "上线" marks a live entry
                    date, region, type = get_multicast_channel_info(info_text)
                    # region can be None when the info line could not be parsed
                    if hotel and (not region or "酒店" not in region):
                        continue
                    results.append(
                        {
                            "url": url,
                            "date": date,
                            "region": region,
                            "type": type,
                        }
                    )
    return results
def get_results_from_soup_requests(soup, name):
    """
    Get the results from the soup by requests
    """
    results = []
    elements = soup.find_all("div", class_="resultplus") if soup else []
    for element in elements:
        name_element = element.find("div", class_="channel")
        if name_element:
            channel_name = name_element.get_text(strip=True)
            if channel_name_is_equal(name, channel_name):
                text_list = get_element_child_text_list(element, "div")
                url = date = resolution = None
                for text in text_list:
                    text_url = get_channel_url(text)
                    if text_url:
                        url = text_url
                    if " " in text:
                        text_info = get_channel_info(text)
                        date, resolution = text_info
                if url:
                    results.append((url, date, resolution))
    return results
def get_results_from_multicast_soup_requests(soup, hotel=False):
    """
    Get the results from the multicast soup by requests
    """
    results = []
    if not soup:
        return results

    elements = soup.find_all("div", class_="result")
    for element in elements:
        name_element = element.find("div", class_="channel")
        if not name_element:
            continue

        text_list = get_element_child_text_list(element, "div")
        url, date, region, type = None, None, None, None
        valid = True

        for text in text_list:
            if "失效" in text:  # "失效" marks an expired/dead link
                valid = False
                break

            text_url = get_channel_url(text)
            if text_url:
                url = text_url

            if url and "上线" in text and " " in text:
                date, region, type = get_multicast_channel_info(text)

        if url and valid:
            # region can be None when no info line was parsed; treat that as
            # a non-hotel source
            if hotel and (not region or "酒店" not in region):
                continue
            results.append({"url": url, "date": date, "region": region, "type": type})

    return results
def get_channel_url(text):
    """
    Get the url from text
    """
    url = None
    url_search = re.search(
        constants.url_pattern,
        text,
    )
    if url_search:
        url = url_search.group()
    return url
def get_channel_info(text):
    """
    Get the channel info (date and resolution) from text
    """
    date, resolution = None, None
    if text:
        # "date rest•resolution": date is the part before the first space,
        # resolution the part after "•" in the remainder
        head, _, tail = text.partition(" ")
        date = head or None
        resolution = tail.partition("•")[2] or None
    return date, resolution
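# Example with an illustrative info string:
#     get_channel_info("2024-01-20 某地•1920x1080") -> ("2024-01-20", "1920x1080")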
def get_multicast_channel_info(text):
    """
    Get the multicast channel info from text
    """
    date, region, type = None, None, None
    if text:
        text_split = text.split(" ")
        filtered_data = [x for x in text_split if x.strip() != ""]
        if len(filtered_data) == 4:
            date = filtered_data[0]
            region = filtered_data[2]
            type = filtered_data[3]
    return date, region, type
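# Example with an illustrative info string (field 1 is the "上线" marker):
#     get_multicast_channel_info("2024-01-20 上线 北京 联通")
#     -> ("2024-01-20", "北京", "联通")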
def init_info_data(data, cate, name):
    """
    Init channel info data
    """
    if data.get(cate) is None:
        data[cate] = {}
    if data[cate].get(name) is None:
        data[cate][name] = []
def append_data_to_info_data(info_data, cate, name, data, origin=None, check=True, whitelist=None, blacklist=None):
    """
    Append channel data to total info data
    """
    init_info_data(info_data, cate, name)
    urls = [x[0].partition("$")[0] for x in info_data[cate][name] if x[0]]
    for item in data:
        try:
            url, date, resolution, *rest = item
            url_origin = origin or (rest[0] if rest else None)
            if not url_origin:
                continue
            if url:
                url_partition = url.partition("$")
                pure_url = url_partition[0]
                url_info = url_partition[2]
                white_info = url_info and url_info.startswith("!")
                if (pure_url in urls) and not white_info:
                    continue
                if white_info or (whitelist and check_url_by_keywords(url, whitelist)):
                    url_origin = "whitelist"
                if (
                        url_origin == "whitelist"
                        or (not check)
                        or (check and check_url_ipv_type(pure_url) and not check_url_by_keywords(url, blacklist))
                ):
                    info_data[cate][name].append((url, date, resolution, url_origin))
                    urls.append(pure_url)
        except (ValueError, TypeError):
            # Skip malformed items (wrong shape or non-sequence)
            continue
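# Precedence sketch: an entry whose "$..." info suffix starts with "!" (or
# whose url matches a whitelist keyword) is tagged "whitelist" and skips both
# the duplicate check and the IP-type/blacklist filters.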
def get_origin_method_name(method):
    """
    Get the origin method name
    """
    return "hotel" if method.startswith("hotel_") else method


def append_old_data_to_info_data(info_data, cate, name, data, whitelist=None, blacklist=None):
    """
    Append history channel data to total info data
    """
    append_data_to_info_data(
        info_data,
        cate,
        name,
        data,
        whitelist=whitelist,
        blacklist=blacklist
    )
    print("History:", len(data), end=", ")
def append_total_data(
        items,
        names,
        data,
        hotel_fofa_result=None,
        multicast_result=None,
        hotel_foodie_result=None,
        subscribe_result=None,
        online_search_result=None,
):
    """
    Append all method data to total info data
    """
    total_result = [
        ("hotel_fofa", hotel_fofa_result),
        ("multicast", multicast_result),
        ("hotel_foodie", hotel_foodie_result),
        ("subscribe", subscribe_result),
        ("online_search", online_search_result),
    ]
    whitelist = get_urls_from_file(constants.whitelist_path)
    blacklist = get_urls_from_file(constants.blacklist_path)
    for cate, channel_obj in items:
        for name, old_info_list in channel_obj.items():
            print(f"{name}:", end=" ")
            if config.open_use_old_result and old_info_list:
                append_old_data_to_info_data(data, cate, name, old_info_list, whitelist=whitelist, blacklist=blacklist)
            for method, result in total_result:
                if config.open_method[method]:
                    origin_method = get_origin_method_name(method)
                    if not origin_method:
                        continue
                    name_results = get_channel_results_by_name(name, result)
                    append_data_to_info_data(
                        data, cate, name, name_results, origin=origin_method, whitelist=whitelist, blacklist=blacklist
                    )
                    print(f"{method.capitalize()}:", len(name_results), end=", ")
            print(
                "Total:",
                len(data.get(cate, {}).get(name, [])),
            )
    if config.open_keep_all:
        extra_cate = "📥其它频道"
        for method, result in total_result:
            if config.open_method[method]:
                origin_method = get_origin_method_name(method)
                if not origin_method:
                    continue
                for name, urls in result.items():
                    if name in names:
                        continue
                    print(f"{name}:", end=" ")
                    if config.open_use_old_result:
                        # Note: channel_obj still refers to the last category
                        # iterated above; kept as-is from upstream
                        old_info_list = channel_obj.get(name, [])
                        if old_info_list:
                            append_old_data_to_info_data(
                                data, extra_cate, name, old_info_list
                            )
                    append_data_to_info_data(
                        data, extra_cate, name, urls, origin=origin_method, whitelist=whitelist, blacklist=blacklist
                    )
                    print(name, f"{method.capitalize()}:", len(urls), end=", ")
                    print(
                        "Total:",
                        # These entries were appended under extra_cate, so count
                        # there (counting under cate was a slip)
                        len(data.get(extra_cate, {}).get(name, [])),
                    )
async def process_sort_channel_list(data, ipv6=False, callback=None):
    """
    Process the sort channel list
    """
    ipv6_proxy = None if (not config.open_ipv6 or ipv6) else constants.ipv6_proxy
    open_filter_resolution = config.open_filter_resolution
    get_resolution = open_filter_resolution and check_ffmpeg_installed_status()
    sort_timeout = config.sort_timeout
    need_sort_data = copy.deepcopy(data)
    process_nested_dict(need_sort_data, seen=set(), flag=r"cache:(.*)", force_str="!")
    result = {}
    # Limit speed tests to 10 concurrent requests
    semaphore = asyncio.Semaphore(10)

    async def limited_get_speed(info, ipv6_proxy, filter_resolution, timeout, callback):
        async with semaphore:
            return await get_speed(info[0], ipv6_proxy=ipv6_proxy, filter_resolution=filter_resolution, timeout=timeout,
                                   callback=callback)

    tasks = [
        asyncio.create_task(
            limited_get_speed(
                info,
                ipv6_proxy=ipv6_proxy,
                filter_resolution=get_resolution,
                timeout=sort_timeout,
                callback=callback,
            )
        )
        for channel_obj in need_sort_data.values()
        for info_list in channel_obj.values()
        for info in info_list
    ]
    await asyncio.gather(*tasks)
    logger = get_logger(constants.sort_log_path, level=INFO, init=True)
    open_supply = config.open_supply
    open_filter_speed = config.open_filter_speed
    min_speed = config.min_speed
    min_resolution = config.min_resolution_value
    for cate, obj in data.items():
        for name, info_list in obj.items():
            info_list = sort_urls(name, info_list, supply=open_supply, filter_speed=open_filter_speed,
                                  min_speed=min_speed, filter_resolution=open_filter_resolution,
                                  min_resolution=min_resolution, logger=logger)
            append_data_to_info_data(
                result,
                cate,
                name,
                info_list,
                check=False,
            )
    logger.handlers.clear()
    return result
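# This is a coroutine, so it has to be driven from an event loop; a minimal
# usage sketch:
#     sorted_data = asyncio.run(process_sort_channel_list(data))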
def write_channel_to_file(data, ipv6=False, callback=None):
    """
    Write channel to file
    """
    try:
        path = constants.result_path
        if not os.path.exists("output"):
            os.makedirs("output")
        no_result_name = []
        open_empty_category = config.open_empty_category
        ipv_type_prefer = list(config.ipv_type_prefer)
        if any(pref in ipv_type_prefer for pref in ["自动", "auto"]) or not ipv_type_prefer:
            ipv_type_prefer = ["ipv6", "ipv4"] if ipv6 else ["ipv4", "ipv6"]
        origin_type_prefer = config.origin_type_prefer
        first_cate = True
        content = ""
        for cate, channel_obj in data.items():
            print(f"\n{cate}:", end=" ")
            # Keep the backslash out of the f-string expression for
            # compatibility with Python < 3.12
            content += ("" if first_cate else "\n\n") + f"{cate},#genre#"
            first_cate = False
            channel_obj_keys = channel_obj.keys()
            names_len = len(channel_obj_keys)
            for i, name in enumerate(channel_obj_keys):
                info_list = data.get(cate, {}).get(name, [])
                channel_urls = get_total_urls(info_list, ipv_type_prefer, origin_type_prefer)
                end_char = ", " if i < names_len - 1 else ""
                print(f"{name}:", len(channel_urls), end=end_char)
                if not channel_urls:
                    if open_empty_category:
                        no_result_name.append(name)
                    continue
                for url in channel_urls:
                    content += f"\n{name},{url}"
                    if callback:
                        callback()
            print()
        if open_empty_category and no_result_name:
            print("\n🈳 No result channel name:")
            content += "\n\n🈳无结果频道,#genre#"
            for i, name in enumerate(no_result_name):
                end_char = ", " if i < len(no_result_name) - 1 else ""
                print(name, end=end_char)
                content += f"\n{name},url"
            print()
        if config.open_update_time:
            update_time_url = next(
                (get_total_urls(info_list, ipv_type_prefer, origin_type_prefer)[0]
                 for channel_obj in data.values()
                 for info_list in channel_obj.values() if info_list),
                "url"
            )
            if config.update_time_position == "top":
                content = f"🕘️更新时间,#genre#\n{get_datetime_now()},{update_time_url}\n\n{content}"
            else:
                content += f"\n\n🕘️更新时间,#genre#\n{get_datetime_now()},{update_time_url}"
        with open(path, "w", encoding="utf-8") as f:
            f.write(content)
    except Exception as e:
        print(f"❌ Write channel to file failed: {e}")
def get_multicast_fofa_search_org(region, type):
    """
    Get the fofa search organization for multicast
    """
    org = None
    if region == "北京" and type == "联通":
        org = "China Unicom Beijing Province Network"
    elif type == "联通":
        org = "CHINA UNICOM China169 Backbone"
    elif type == "电信":
        org = "Chinanet"
    elif type == "移动":
        org = "China Mobile communications corporation"
    return org
def get_multicast_fofa_search_urls():
    """
    Get the fofa search urls for multicast
    """
    rtp_file_names = []
    for filename in os.listdir(resource_path("config/rtp")):
        if filename.endswith(".txt") and "_" in filename:
            filename = filename.replace(".txt", "")
            rtp_file_names.append(filename)
    region_list = config.multicast_region_list
    region_type_list = [
        # partition("_") returns (region, "_", type); index 2 is the type
        (parts[0], parts[2])
        for name in rtp_file_names
        if (parts := name.partition("_"))[0] in region_list
        or "all" in region_list
        or "ALL" in region_list
        or "全部" in region_list
    ]
    search_urls = []
    for region, type in region_type_list:
        search_url = "https://fofa.info/result?qbase64="
        search_txt = f'"udpxy" && country="CN" && region="{region}" && org="{get_multicast_fofa_search_org(region, type)}"'
        bytes_string = search_txt.encode("utf-8")
        search_txt = base64.b64encode(bytes_string).decode("utf-8")
        search_url += search_txt
        search_urls.append((search_url, region, type))
    return search_urls
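# The qbase64 parameter is simply the base64-encoded fofa query, e.g.:
#     base64.b64encode('"udpxy" && country="CN" ...'.encode()).decode()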
def get_channel_data_cache_with_compare(data, new_data):
    """
    Refresh the cached channel data with the resolutions measured for new data
    """
    for cate, obj in new_data.items():
        for name, url_info in obj.items():
            if url_info and cate in data and name in data[cate]:
                new_urls = {
                    new_url.partition("$")[0]: new_resolution
                    for new_url, _, new_resolution, _ in url_info
                }
                updated_data = []
                for info in data[cate][name]:
                    url, date, resolution, origin = info
                    base_url = url.partition("$")[0]
                    if base_url in new_urls:
                        resolution = new_urls[base_url]
                    updated_data.append((url, date, resolution, origin))
                data[cate][name] = updated_data
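# Only cached entries whose base url (the part before "$") also appears in
# new_data get their resolution refreshed; all other entries pass through
# unchanged.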
def format_channel_url_info(data):
    """
    Format channel url info: remove the cache marker from the url
    """
    for obj in data.values():
        for url_info in obj.values():
            for i, (url, date, resolution, origin) in enumerate(url_info):
                url = remove_cache_info(url)
                url_info[i] = (url, date, resolution, origin)