TV/utils/channel.py
2025-01-08 15:50:39 +08:00

758 lines
26 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import base64
import copy
import os
import pickle
import re
from collections import defaultdict
from logging import INFO
from bs4 import NavigableString
from opencc import OpenCC
import utils.constants as constants
from utils.config import config
from utils.speed import (
get_speed,
sort_urls,
check_ffmpeg_installed_status
)
from utils.tools import (
get_name_url,
check_url_by_keywords,
check_url_ipv_type,
get_total_urls,
process_nested_dict,
add_url_info,
remove_cache_info,
resource_path,
get_urls_from_file,
get_name_urls_from_file,
get_logger,
get_datetime_now
)
def get_channel_data_from_file(channels, file, use_old, whitelist):
"""
Get the channel data from the file
"""
current_category = ""
for line in file:
line = line.strip()
if "#genre#" in line:
current_category = line.partition(",")[0]
else:
name_url = get_name_url(
line, pattern=constants.demo_txt_pattern, check_url=False
)
if name_url and name_url[0]:
name = name_url[0]["name"]
url = name_url[0]["url"]
category_dict = channels[current_category]
if name not in category_dict:
category_dict[name] = []
if name in whitelist:
for whitelist_url in whitelist[name]:
category_dict[name].append((whitelist_url, None, None, "whitelist"))
if use_old and url:
info = url.partition("$")[2]
origin = None
if info and info.startswith("!"):
origin = "whitelist"
data = (url, None, None, origin)
if data not in category_dict[name]:
category_dict[name].append(data)
return channels
def get_channel_items():
"""
Get the channel items from the source file
"""
user_source_file = resource_path(config.source_file)
channels = defaultdict(lambda: defaultdict(list))
whitelist = get_name_urls_from_file(constants.whitelist_path)
whitelist_urls = get_urls_from_file(constants.whitelist_path)
whitelist_len = len(list(whitelist.keys()))
if whitelist_len:
print(f"Found {whitelist_len} channel in whitelist")
if os.path.exists(user_source_file):
with open(user_source_file, "r", encoding="utf-8") as file:
channels = get_channel_data_from_file(
channels, file, config.open_use_old_result, whitelist
)
if config.open_use_old_result:
result_cache_path = resource_path(constants.cache_path)
if os.path.exists(result_cache_path):
with open(result_cache_path, "rb") as file:
old_result = pickle.load(file)
for cate, data in channels.items():
if cate in old_result:
for name, info_list in data.items():
urls = [
item[0].partition("$")[0]
for item in info_list
if item[0]
]
if name in old_result[cate]:
for info in old_result[cate][name]:
if info:
try:
if info[3] == "whitelist" and not any(
url in info[0] for url in whitelist_urls):
continue
except:
pass
pure_url = info[0].partition("$")[0]
if pure_url not in urls:
channels[cate][name].append(info)
return channels
def format_channel_name(name):
"""
Format the channel name with sub and replace and lower
"""
if config.open_keep_all:
return name
cc = OpenCC("t2s")
name = cc.convert(name)
for region in constants.region_list:
name = name.replace(f"{region}", "")
name = re.sub(constants.sub_pattern, "", name)
for old, new in constants.replace_dict.items():
name = name.replace(old, new)
return name.lower()
def channel_name_is_equal(name1, name2):
"""
Check if the channel name is equal
"""
if config.open_keep_all:
return True
name1_format = format_channel_name(name1)
name2_format = format_channel_name(name2)
return name1_format == name2_format
def get_channel_results_by_name(name, data):
"""
Get channel results from data by name
"""
format_name = format_channel_name(name)
results = data.get(format_name, [])
return results
def get_element_child_text_list(element, child_name):
"""
Get the child text of the element
"""
text_list = []
children = element.find_all(child_name)
if children:
for child in children:
text = child.get_text(strip=True)
if text:
text_list.append(text)
return text_list
def get_multicast_ip_list(urls):
"""
Get the multicast ip list from urls
"""
ip_list = []
for url in urls:
pattern = r"rtp://((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})(?::(\d+))?)"
matcher = re.search(pattern, url)
if matcher:
ip_list.append(matcher.group(1))
return ip_list
def get_channel_multicast_region_ip_list(result, channel_region, channel_type):
"""
Get the channel multicast region ip list by region and type from result
"""
return [
ip
for result_region, result_obj in result.items()
if result_region in channel_region
for type, urls in result_obj.items()
if type in channel_type
for ip in get_multicast_ip_list(urls)
]
def get_channel_multicast_name_region_type_result(result, names):
"""
Get the multicast name and region and type result by names from result
"""
name_region_type_result = {}
for name in names:
data = result.get(name)
if data:
name_region_type_result[name] = data
return name_region_type_result
def get_channel_multicast_region_type_list(result):
"""
Get the channel multicast region type list from result
"""
region_list = config.multicast_region_list
region_type_list = {
(region, type)
for region_type in result.values()
for region, types in region_type.items()
if "all" in region_list
or "ALL" in region_list
or "全部" in region_list
or region in region_list
for type in types
}
return list(region_type_list)
def get_channel_multicast_result(result, search_result):
"""
Get the channel multicast info result by result and search result
"""
info_result = {}
multicast_name = constants.origin_map["multicast"]
whitelist = get_urls_from_file(constants.whitelist_path)
blacklist = get_urls_from_file(constants.blacklist_path)
for name, result_obj in result.items():
info_list = [
(
(
add_url_info(
f"http://{url}/rtp/{ip}",
f"{result_region}{result_type}{multicast_name}-cache:{url}",
)
if config.open_sort
else add_url_info(
f"http://{url}/rtp/{ip}",
f"{result_region}{result_type}{multicast_name}",
)
),
date,
resolution,
)
for result_region, result_types in result_obj.items()
if result_region in search_result
for result_type, result_type_urls in result_types.items()
if result_type in search_result[result_region]
for ip in get_multicast_ip_list(result_type_urls) or []
for url, date, resolution in search_result[result_region][result_type]
if (whitelist and check_url_by_keywords(f"http://{url}/rtp/{ip}", whitelist)) or
(
check_url_ipv_type(f"http://{url}/rtp/{ip}") and not check_url_by_keywords(
f"http://{url}/rtp/{ip}", blacklist))
]
info_result[name] = info_list
return info_result
def get_results_from_soup(soup, name):
"""
Get the results from the soup
"""
results = []
if not soup.descendants:
return results
for element in soup.descendants:
if isinstance(element, NavigableString):
text = element.get_text(strip=True)
url = get_channel_url(text)
if url and not any(item[0] == url for item in results):
url_element = soup.find(lambda tag: tag.get_text(strip=True) == url)
if url_element:
name_element = url_element.find_previous_sibling()
if name_element:
channel_name = name_element.get_text(strip=True)
if channel_name_is_equal(name, channel_name):
info_element = url_element.find_next_sibling()
date, resolution = get_channel_info(
info_element.get_text(strip=True)
)
results.append((url, date, resolution))
return results
def get_results_from_multicast_soup(soup, hotel=False):
"""
Get the results from the multicast soup
"""
results = []
if not soup.descendants:
return results
for element in soup.descendants:
if isinstance(element, NavigableString):
text = element.strip()
if "失效" in text:
continue
url = get_channel_url(text)
if url and not any(item["url"] == url for item in results):
url_element = soup.find(lambda tag: tag.get_text(strip=True) == url)
if not url_element:
continue
parent_element = url_element.find_parent()
info_element = parent_element.find_all(recursive=False)[-1]
if not info_element:
continue
info_text = info_element.get_text(strip=True)
if "上线" in info_text and " " in info_text:
date, region, type = get_multicast_channel_info(info_text)
if hotel and "酒店" not in region:
continue
results.append(
{
"url": url,
"date": date,
"region": region,
"type": type,
}
)
return results
def get_results_from_soup_requests(soup, name):
"""
Get the results from the soup by requests
"""
results = []
elements = soup.find_all("div", class_="resultplus") if soup else []
for element in elements:
name_element = element.find("div", class_="channel")
if name_element:
channel_name = name_element.get_text(strip=True)
if channel_name_is_equal(name, channel_name):
text_list = get_element_child_text_list(element, "div")
url = date = resolution = None
for text in text_list:
text_url = get_channel_url(text)
if text_url:
url = text_url
if " " in text:
text_info = get_channel_info(text)
date, resolution = text_info
if url:
results.append((url, date, resolution))
return results
def get_results_from_multicast_soup_requests(soup, hotel=False):
"""
Get the results from the multicast soup by requests
"""
results = []
if not soup:
return results
elements = soup.find_all("div", class_="result")
for element in elements:
name_element = element.find("div", class_="channel")
if not name_element:
continue
text_list = get_element_child_text_list(element, "div")
url, date, region, type = None, None, None, None
valid = True
for text in text_list:
if "失效" in text:
valid = False
break
text_url = get_channel_url(text)
if text_url:
url = text_url
if url and "上线" in text and " " in text:
date, region, type = get_multicast_channel_info(text)
if url and valid:
if hotel and "酒店" not in region:
continue
results.append({"url": url, "date": date, "region": region, "type": type})
return results
def get_channel_url(text):
"""
Get the url from text
"""
url = None
url_search = re.search(
constants.url_pattern,
text,
)
if url_search:
url = url_search.group()
return url
def get_channel_info(text):
"""
Get the channel info from text
"""
date, resolution = None, None
if text:
date, resolution = (
(text.partition(" ")[0] if text.partition(" ")[0] else None),
(
text.partition(" ")[2].partition("")[2]
if text.partition(" ")[2].partition("")[2]
else None
),
)
return date, resolution
def get_multicast_channel_info(text):
"""
Get the multicast channel info from text
"""
date, region, type = None, None, None
if text:
text_split = text.split(" ")
filtered_data = list(filter(lambda x: x.strip() != "", text_split))
if filtered_data and len(filtered_data) == 4:
date = filtered_data[0]
region = filtered_data[2]
type = filtered_data[3]
return date, region, type
def init_info_data(data, cate, name):
"""
Init channel info data
"""
if data.get(cate) is None:
data[cate] = {}
if data[cate].get(name) is None:
data[cate][name] = []
def append_data_to_info_data(info_data, cate, name, data, origin=None, check=True, whitelist=None, blacklist=None):
"""
Append channel data to total info data
"""
init_info_data(info_data, cate, name)
urls = [x[0].partition("$")[0] for x in info_data[cate][name] if x[0]]
for item in data:
try:
url, date, resolution, *rest = item
url_origin = origin or (rest[0] if rest else None)
if not url_origin:
continue
if url:
url_partition = url.partition("$")
pure_url = url_partition[0]
url_info = url_partition[2]
white_info = url_info and url_info.startswith("!")
if (pure_url in urls) and not white_info:
continue
if white_info or (whitelist and check_url_by_keywords(url, whitelist)):
url_origin = "whitelist"
if (
url_origin == "whitelist"
or (not check)
or (
check and check_url_ipv_type(pure_url) and not check_url_by_keywords(url, blacklist))
):
info_data[cate][name].append((url, date, resolution, url_origin))
urls.append(pure_url)
except:
continue
def get_origin_method_name(method):
"""
Get the origin method name
"""
return "hotel" if method.startswith("hotel_") else method
def append_old_data_to_info_data(info_data, cate, name, data, whitelist=None, blacklist=None):
"""
Append history channel data to total info data
"""
append_data_to_info_data(
info_data,
cate,
name,
data,
whitelist=whitelist,
blacklist=blacklist
)
print("History:", len(data), end=", ")
def append_total_data(
items,
names,
data,
hotel_fofa_result=None,
multicast_result=None,
hotel_foodie_result=None,
subscribe_result=None,
online_search_result=None,
):
"""
Append all method data to total info data
"""
total_result = [
("hotel_fofa", hotel_fofa_result),
("multicast", multicast_result),
("hotel_foodie", hotel_foodie_result),
("subscribe", subscribe_result),
("online_search", online_search_result),
]
whitelist = get_urls_from_file(constants.whitelist_path)
blacklist = get_urls_from_file(constants.blacklist_path)
for cate, channel_obj in items:
for name, old_info_list in channel_obj.items():
print(f"{name}:", end=" ")
if config.open_use_old_result and old_info_list:
append_old_data_to_info_data(data, cate, name, old_info_list, whitelist=whitelist, blacklist=blacklist)
for method, result in total_result:
if config.open_method[method]:
origin_method = get_origin_method_name(method)
if not origin_method:
continue
name_results = get_channel_results_by_name(name, result)
append_data_to_info_data(
data, cate, name, name_results, origin=origin_method, whitelist=whitelist, blacklist=blacklist
)
print(f"{method.capitalize()}:", len(name_results), end=", ")
print(
"Total:",
len(data.get(cate, {}).get(name, [])),
)
if config.open_keep_all:
extra_cate = "📥其它频道"
for method, result in total_result:
if config.open_method[method]:
origin_method = get_origin_method_name(method)
if not origin_method:
continue
for name, urls in result.items():
if name in names:
continue
print(f"{name}:", end=" ")
if config.open_use_old_result:
old_info_list = channel_obj.get(name, [])
if old_info_list:
append_old_data_to_info_data(
data, extra_cate, name, old_info_list
)
append_data_to_info_data(
data, extra_cate, name, urls, origin=origin_method, whitelist=whitelist, blacklist=blacklist
)
print(name, f"{method.capitalize()}:", len(urls), end=", ")
print(
"Total:",
len(data.get(cate, {}).get(name, [])),
)
async def process_sort_channel_list(data, ipv6=False, callback=None):
"""
Process the sort channel list
"""
ipv6_proxy = None if (not config.open_ipv6 or ipv6) else constants.ipv6_proxy
open_filter_resolution = config.open_filter_resolution
get_resolution = open_filter_resolution and check_ffmpeg_installed_status()
sort_timeout = config.sort_timeout
need_sort_data = copy.deepcopy(data)
process_nested_dict(need_sort_data, seen=set(), flag=r"cache:(.*)", force_str="!")
result = {}
semaphore = asyncio.Semaphore(10)
async def limited_get_speed(info, ipv6_proxy, filter_resolution, timeout, callback):
async with semaphore:
return await get_speed(info[0], ipv6_proxy=ipv6_proxy, filter_resolution=filter_resolution, timeout=timeout,
callback=callback)
tasks = [
asyncio.create_task(
limited_get_speed(
info,
ipv6_proxy=ipv6_proxy,
filter_resolution=get_resolution,
timeout=sort_timeout,
callback=callback,
)
)
for channel_obj in need_sort_data.values()
for info_list in channel_obj.values()
for info in info_list
]
await asyncio.gather(*tasks)
logger = get_logger(constants.sort_log_path, level=INFO, init=True)
open_supply = config.open_supply
open_filter_speed = config.open_filter_speed
min_speed = config.min_speed
min_resolution = config.min_resolution_value
for cate, obj in data.items():
for name, info_list in obj.items():
info_list = sort_urls(name, info_list, supply=open_supply, filter_speed=open_filter_speed,
min_speed=min_speed, filter_resolution=open_filter_resolution,
min_resolution=min_resolution, logger=logger)
append_data_to_info_data(
result,
cate,
name,
info_list,
check=False,
)
logger.handlers.clear()
return result
def write_channel_to_file(data, ipv6=False, callback=None):
"""
Write channel to file
"""
try:
path = constants.result_path
if not os.path.exists("output"):
os.makedirs("output")
no_result_name = []
open_empty_category = config.open_empty_category
ipv_type_prefer = list(config.ipv_type_prefer)
if any(pref in ipv_type_prefer for pref in ["自动", "auto"]) or not ipv_type_prefer:
ipv_type_prefer = ["ipv6", "ipv4"] if ipv6 else ["ipv4", "ipv6"]
origin_type_prefer = config.origin_type_prefer
first_cate = True
content = ""
for cate, channel_obj in data.items():
print(f"\n{cate}:", end=" ")
content += f"{'\n\n' if not first_cate else ''}{cate},#genre#"
first_cate = False
channel_obj_keys = channel_obj.keys()
names_len = len(list(channel_obj_keys))
for i, name in enumerate(channel_obj_keys):
info_list = data.get(cate, {}).get(name, [])
channel_urls = get_total_urls(info_list, ipv_type_prefer, origin_type_prefer)
end_char = ", " if i < names_len - 1 else ""
print(f"{name}:", len(channel_urls), end=end_char)
if not channel_urls:
if open_empty_category:
no_result_name.append(name)
continue
for url in channel_urls:
content += f"\n{name},{url}"
if callback:
callback()
print()
if open_empty_category and no_result_name:
print("\n🈳 No result channel name:")
content += "\n\n🈳无结果频道,#genre#"
for i, name in enumerate(no_result_name):
end_char = ", " if i < len(no_result_name) - 1 else ""
print(name, end=end_char)
content += f"\n{name},url"
print()
if config.open_update_time:
update_time_url = next(
(get_total_urls(info_list, ipv_type_prefer, origin_type_prefer)[0]
for channel_obj in data.values()
for info_list in channel_obj.values() if info_list),
"url"
)
if config.update_time_position == "top":
content = f"🕘️更新时间,#genre#\n{get_datetime_now()},{update_time_url}\n\n{content}"
else:
content += f"\n\n🕘️更新时间,#genre#\n{get_datetime_now()},{update_time_url}"
with open(path, "w", encoding="utf-8") as f:
f.write(content)
except Exception as e:
print(f"❌ Write channel to file failed: {e}")
def get_multicast_fofa_search_org(region, type):
"""
Get the fofa search organization for multicast
"""
org = None
if region == "北京" and type == "联通":
org = "China Unicom Beijing Province Network"
elif type == "联通":
org = "CHINA UNICOM China169 Backbone"
elif type == "电信":
org = "Chinanet"
elif type == "移动":
org = "China Mobile communications corporation"
return org
def get_multicast_fofa_search_urls():
"""
Get the fofa search urls for multicast
"""
rtp_file_names = []
for filename in os.listdir(resource_path("config/rtp")):
if filename.endswith(".txt") and "_" in filename:
filename = filename.replace(".txt", "")
rtp_file_names.append(filename)
region_list = config.multicast_region_list
region_type_list = [
(parts[0], parts[1])
for name in rtp_file_names
if (parts := name.partition("_"))[0] in region_list
or "all" in region_list
or "ALL" in region_list
or "全部" in region_list
]
search_urls = []
for region, type in region_type_list:
search_url = "https://fofa.info/result?qbase64="
search_txt = f'"udpxy" && country="CN" && region="{region}" && org="{get_multicast_fofa_search_org(region, type)}"'
bytes_string = search_txt.encode("utf-8")
search_txt = base64.b64encode(bytes_string).decode("utf-8")
search_url += search_txt
search_urls.append((search_url, region, type))
return search_urls
def get_channel_data_cache_with_compare(data, new_data):
"""
Get channel data with cache compare new data
"""
for cate, obj in new_data.items():
for name, url_info in obj.items():
if url_info and cate in data and name in data[cate]:
new_urls = {
new_url.partition("$")[0]: new_resolution
for new_url, _, new_resolution, _ in url_info
}
updated_data = []
for info in data[cate][name]:
url, date, resolution, origin = info
base_url = url.partition("$")[0]
if base_url in new_urls:
resolution = new_urls[base_url]
updated_data.append((url, date, resolution, origin))
data[cate][name] = updated_data
def format_channel_url_info(data):
"""
Format channel url info, remove cache, add resolution to url
"""
for obj in data.values():
for url_info in obj.values():
for i, (url, date, resolution, origin) in enumerate(url_info):
url = remove_cache_info(url)
url_info[i] = (url, date, resolution, origin)