feat: multicast update

guorong.zheng 2024-08-09 18:00:07 +08:00
parent 884c99a355
commit c3bfaf4b14
5 changed files with 133 additions and 115 deletions

@@ -35,7 +35,7 @@ subscribe_urls = [
     "https://github.moeyy.xyz/https://raw.githubusercontent.com/PizazzGY/TVBox/main/live.txt",
 ]
 open_multicast = True
-region_list = ["all"]
+region_list = ["广东"]
 open_proxy = False
 open_driver = False
 open_use_old_result = True

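Narrowing region_list from "all" to an explicit list is what the new config_region_list filter in get_channel_multicast_region_type_list (further down in this diff) consumes; "all" preserves the old search-everything behavior. A minimal sketch of the intended semantics, with a hypothetical helper name:

def is_region_enabled(region, region_list):
    # "all" keeps the pre-change behavior of searching every region
    return "all" in region_list or region in region_list

assert is_region_enabled("广东", ["广东"])
assert not is_region_enabled("北京", ["广东"])
assert is_region_enabled("北京", ["all"])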
@@ -7,7 +7,7 @@ from utils.channel import (
     get_channel_multicast_region_type_list,
     get_channel_multicast_result,
 )
-from utils.tools import check_url_by_patterns, get_pbar_remaining, get_soup
+from utils.tools import get_pbar_remaining, get_soup
 from utils.config import get_config
 from proxy import get_proxy, get_proxy_next
 from time import time, sleep
@@ -19,7 +19,7 @@ from utils.retry import (
 )
 from selenium.webdriver.common.by import By
 from tqdm.asyncio import tqdm_asyncio
-from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from requests_custom.utils import get_soup_requests, close_session
 import urllib.parse as urlparse
 from urllib.parse import parse_qs
@@ -148,10 +148,8 @@ async def get_channels_by_multicast(names, callback):
         multicast_region_result, names
     )
     region_type_list = get_channel_multicast_region_type_list(name_region_type_result)
-    search_region_type_result = defaultdict(lambda: defaultdict(list))
 
-    def process_channel_by_multicast(params):
-        region, type = params
+    def process_channel_by_multicast(region, type):
         name = f"{region}{type}"
         info_list = []
         nonlocal proxy
@@ -172,17 +170,17 @@ async def get_channels_by_multicast(names, callback):
                 search_submit(driver, name)
             else:
                 page_soup = None
-                request_url = f"{pageUrl}?net={name}"
+                post_form = {"saerch": name}
                 code = None
                 try:
                     page_soup = retry_func(
-                        lambda: get_soup_requests(request_url, proxy=proxy),
+                        lambda: get_soup_requests(pageUrl, data=post_form, proxy=proxy),
                         name=f"multicast search:{name}",
                     )
                 except Exception as e:
                     if config.open_proxy:
                         proxy = get_proxy_next()
-                    page_soup = get_soup_requests(request_url, proxy=proxy)
+                    page_soup = get_soup_requests(pageUrl, data=post_form, proxy=proxy)
                 if not page_soup:
                     print(f"{name}:Request fail.")
                     return {"region": region, "type": type, "data": info_list}
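The search request switches from a GET query string (?net=<name>) to a form-encoded POST; the form key "saerch" is carried verbatim from the diff. A standalone before/after sketch, with a placeholder pageUrl:

import requests

pageUrl = "http://example.com/multicast/search"  # placeholder URL
name = "广东电信"

# before: the search term travelled in the query string
old_response = requests.get(pageUrl, params={"net": name}, timeout=30)

# after: the search term travels in a form-encoded POST body
post_form = {"saerch": name}
new_response = requests.post(pageUrl, data=post_form, timeout=30)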
@@ -265,10 +263,7 @@ async def get_channels_by_multicast(names, callback):
                 # search_submit(driver, name)
                 # retries += 1
                 # continue
-                for result in results:
-                    url, date, _, _ = result
-                    if url:
-                        info_list.append((url, date, None))
+                info_list = info_list + results
                 # break
             else:
                 print(f"{name}:No results found")
@@ -301,18 +296,25 @@ async def get_channels_by_multicast(names, callback):
     callback(
         f"正在进行组播更新, {len(names)}个频道, 共{region_type_list_len}个地区组播源", 0
     )
+    search_region_type_result = defaultdict(lambda: defaultdict(list))
     with ThreadPoolExecutor(max_workers=3) as executor:
-        futures = [
-            executor.submit(process_channel_by_multicast, (region, type))
-            for (region, type) in region_type_list
-        ]
-        for future in futures:
+        futures = {
+            executor.submit(process_channel_by_multicast, region, type): (region, type)
+            for region, type in region_type_list
+        }
+        for future in as_completed(futures):
+            region, type = futures[future]
             result = future.result()
-            region = result.get("region")
-            type = result.get("type")
-            data = result.get("data", [])
-            if region and type and data:
-                search_region_type_result[region][type] = data
+            data = result.get("data")
+            if data:
+                region_type_results = search_region_type_result[region][type]
+                for item in data:
+                    url = item.get("url")
+                    date = item.get("date")
+                    if url:
+                        region_type_results.append((url, date, None))
     channels = get_channel_multicast_result(
         name_region_type_result, search_region_type_result
     )
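Keying each future by its (region, type) pair means the submit-time arguments no longer have to round-trip through the worker's return value, and as_completed consumes results as they finish rather than in submission order. A self-contained sketch of the pattern, with a stand-in worker:

from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(region, type):  # stand-in for process_channel_by_multicast
    return {"data": [{"url": f"http://{region}-{type}.example", "date": "2024-08-09"}]}

region_type_list = [("广东", "电信"), ("广东", "联通")]
search_region_type_result = defaultdict(lambda: defaultdict(list))

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {
        executor.submit(fetch, region, type): (region, type)
        for region, type in region_type_list
    }
    for future in as_completed(futures):
        region, type = futures[future]  # recover the submit-time key
        for item in future.result().get("data") or []:
            if item.get("url"):
                search_region_type_result[region][type].append(
                    (item["url"], item.get("date"), None)
                )

print(dict(search_region_type_result))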

@@ -14,14 +14,19 @@ headers = {
 session = requests.Session()
 
 
-def get_source_requests(url, proxy=None, timeout=30):
+def get_source_requests(url, data=None, proxy=None, timeout=30):
     """
     Get the source by requests
     """
     proxies = {"http": proxy}
     ua = UserAgent()
     headers["User-Agent"] = ua.random
-    response = session.get(url, headers=headers, proxies=proxies, timeout=timeout)
+    if data:
+        response = session.post(
+            url, headers=headers, data=data, proxies=proxies, timeout=timeout
+        )
+    else:
+        response = session.get(url, headers=headers, proxies=proxies, timeout=timeout)
     source = re.sub(
         r"<!--.*?-->",
         "",
@@ -31,11 +36,11 @@ def get_source_requests(url, proxy=None, timeout=30):
     return source
 
 
-def get_soup_requests(url, proxy=None, timeout=30):
+def get_soup_requests(url, data=None, proxy=None, timeout=30):
     """
     Get the soup by requests
     """
-    source = get_source_requests(url, proxy, timeout)
+    source = get_source_requests(url, data, proxy, timeout)
     soup = BeautifulSoup(source, "html.parser")
     return soup
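One wrapper now serves both verbs: a truthy data selects session.post with a form body, while data=None falls through to the original session.get, so existing call sites keep working unchanged. A usage sketch with placeholder URLs:

page = get_soup_requests("http://example.com/page")  # GET, unchanged behavior
result = get_soup_requests(
    "http://example.com/search", data={"saerch": "广东电信"}  # form-encoded POST
)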

@@ -176,16 +176,14 @@ def get_channel_multicast_region_ip_list(result, channel_region, channel_type):
     """
     Get the channel multicast region ip list by region and type from result
     """
-    ip_list = []
-    if result and channel_region and channel_type:
-        for result_region, result_obj in result.items():
-            if result_region in channel_region:
-                types = result_obj.keys()
-                for type in types:
-                    if type in channel_type:
-                        urls = result_obj[type]
-                        ip_list = ip_list + get_multicast_ip_list(urls)
-    return ip_list
+    return [
+        ip
+        for result_region, result_obj in result.items()
+        if result_region in channel_region
+        for type, urls in result_obj.items()
+        if type in channel_type
+        for ip in get_multicast_ip_list(urls)
+    ]
 
 
 def get_channel_multicast_total_url_list(url, ip_list):
@@ -216,11 +214,14 @@ def get_channel_multicast_region_type_list(result):
     """
     Get the channel multicast region type list from result
     """
-    region_type_list = set()
-    for region_type in result.values():
-        for region, types in region_type.items():
-            for type in types.keys():
-                region_type_list.add((region, type))
+    config_region_list = set(getattr(config, "region_list", []))
+    region_type_list = {
+        (region, type)
+        for region_type in result.values()
+        for region, types in region_type.items()
+        if "all" in config_region_list or region in config_region_list
+        for type in types
+    }
     return list(region_type_list)
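Two properties of this rewrite are worth noting: getattr(config, "region_list", []) degrades to an empty set when the option is missing, so nothing matches unless regions (or "all") are configured, and the set comprehension de-duplicates (region, type) pairs for free. A sketch of the filter over inline sample data:

config_region_list = {"广东"}  # stands in for set(getattr(config, "region_list", []))
result = {
    "CCTV-1": {"广东": {"电信": ["..."]}, "北京": {"联通": ["..."]}},
}
region_type_list = {
    (region, type)
    for region_type in result.values()
    for region, types in region_type.items()
    if "all" in config_region_list or region in config_region_list
    for type in types
}
print(region_type_list)  # {('广东', '电信')}; the 北京 pair is filtered out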
@@ -229,25 +230,19 @@ def get_channel_multicast_result(result, search_result):
     """
     Get the channel multicast info result by result and search result
     """
     info_result = {}
-    for region, type_obj in search_result.items():
-        for name, result_obj in result.items():
-            info_list = []
-            result_type_obj = result_obj.get(region)
-            if result_type_obj:
-                for type, data_list in type_obj.items():
-                    urls = result_type_obj.get(type)
-                    if urls:
-                        ip_list = get_multicast_ip_list(urls)
-                        if ip_list:
-                            for data in data_list:
-                                url, date, resolution = data
-                                total_urls = get_channel_multicast_total_url_list(
-                                    url, ip_list
-                                )
-                                for total_url in total_urls:
-                                    if check_url_by_patterns(total_url):
-                                        info_list.append((total_url, date, resolution))
-            info_result[name] = info_list
+    for name, result_obj in result.items():
+        info_list = [
+            (total_url, date, resolution)
+            for result_region, result_types in result_obj.items()
+            if result_region in search_result
+            for result_type, result_type_urls in result_types.items()
+            if result_type in search_result[result_region]
+            for ip in get_multicast_ip_list(result_type_urls) or []
+            for url, date, resolution in search_result[result_region][result_type]
+            for total_url in get_channel_multicast_total_url_list(url, [ip])
+            if check_url_by_patterns(total_url)
+        ]
+        info_result[name] = info_list
     return info_result
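Two details here are easy to miss: iterating names in the outer loop assigns info_result[name] exactly once per channel (the old region-outer loop rebuilt and overwrote it on every region pass), and the "or []" guard keeps the comprehension from raising when get_multicast_ip_list returns a falsy value. A toy run, with simplified stand-ins for the helpers:

def get_multicast_ip_list(urls):
    return [u.split("/")[2] for u in urls]  # "http://1.2.3.4/x" -> "1.2.3.4"

def get_channel_multicast_total_url_list(url, ip_list):
    return [f"{url}?ip={ip}" for ip in ip_list]  # hypothetical expansion

def check_url_by_patterns(url):
    return True  # accept everything in this sketch

result = {"CCTV-1": {"广东": {"电信": ["http://1.2.3.4/stream"]}}}
search_result = {"广东": {"电信": [("http://base/rtp", "2024-08-09", None)]}}

info_result = {}
for name, result_obj in result.items():
    info_result[name] = [
        (total_url, date, resolution)
        for result_region, result_types in result_obj.items()
        if result_region in search_result
        for result_type, result_type_urls in result_types.items()
        if result_type in search_result[result_region]
        for ip in get_multicast_ip_list(result_type_urls) or []
        for url, date, resolution in search_result[result_region][result_type]
        for total_url in get_channel_multicast_total_url_list(url, [ip])
        if check_url_by_patterns(total_url)
    ]
print(info_result)  # {'CCTV-1': [('http://base/rtp?ip=1.2.3.4', '2024-08-09', None)]}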
@@ -282,29 +277,36 @@ def get_results_from_multicast_soup(soup):
     results = []
     for element in soup.descendants:
         if isinstance(element, NavigableString):
-            text = element.get_text(strip=True)
+            text = element.strip()
             url = get_channel_url(text)
-            if url and not any(item[0] == url for item in results):
+            if url and not any(item["url"] == url for item in results):
                 url_element = soup.find(lambda tag: tag.get_text(strip=True) == url)
-                if url_element:
-                    next_first_element = url_element.find_next_sibling()
-                    if next_first_element:
-                        valid_element = next_first_element.find_next_sibling()
-                        valid = False
-                        if valid_element:
-                            valid_text = valid_element.get_text(strip=True)
-                            if "失效" not in valid_text:
-                                valid = True
-                        if valid:
-                            both_element = valid_element.find_next_sibling()
-                            info_element = both_element.find_next_sibling()
-                            if info_element:
-                                info_text = info_element.get_text(strip=True)
-                                if "上线" in info_text and " " in info_text:
-                                    date, region, type = get_multicast_channel_info(
-                                        info_text
-                                    )
-                                    results.append((url, date, region, type))
+                if not url_element:
+                    continue
+                valid_element = url_element.find_next_sibling()
+                if not valid_element:
+                    continue
+                valid_text = valid_element.get_text(strip=True)
+                if "失效" in valid_text:
+                    continue
+                info_element = valid_element.find_next_sibling().find_next_sibling()
+                if not info_element:
+                    continue
+                info_text = info_element.get_text(strip=True)
+                if "上线" in info_text and " " in info_text:
+                    date, region, type = get_multicast_channel_info(info_text)
+                    results.append(
+                        {
+                            "url": url,
+                            "date": date,
+                            "region": region,
+                            "type": type,
+                        }
+                    )
     return results
@@ -338,23 +340,34 @@ def get_results_from_multicast_soup_requests(soup):
     Get the results from the multicast soup by requests
     """
     results = []
-    elements = soup.find_all("div", class_="result") if soup else []
+    if not soup:
+        return results
+    elements = soup.find_all("div", class_="result")
     for element in elements:
         name_element = element.find("div", class_="channel")
-        if name_element:
-            text_list = get_element_child_text_list(element, "div")
-            url = date = region = type = None
-            valid = True
-            for text in text_list:
-                text_url = get_channel_url(text)
-                if text_url:
-                    url = text_url
-                if "失效" in text:
-                    valid = False
-                if url and "上线" in text and " " in text:
-                    date, region, type = get_multicast_channel_info(text)
-            if url and valid:
-                results.append((url, date, region, type))
+        if not name_element:
+            continue
+        text_list = get_element_child_text_list(element, "div")
+        url, date, region, type = None, None, None, None
+        valid = True
+        for text in text_list:
+            if "失效" in text:
+                valid = False
+                break
+            text_url = get_channel_url(text)
+            if text_url:
+                url = text_url
+            if url and "上线" in text and " " in text:
+                date, region, type = get_multicast_channel_info(text)
+        if url and valid:
+            results.append({"url": url, "date": date, "region": region, "type": type})
     return results
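Both parsers now emit dicts rather than positional tuples, which is what enables item.get("url") and item.get("date") in the caller; the requests variant also bails out of the text scan as soon as a "失效" (dead/expired) marker appears, while "上线" marks the go-online date line. A sketch of the shared result shape, with a TypedDict for illustration only (the real code uses plain dicts):

from typing import Optional, TypedDict

class MulticastResult(TypedDict):
    url: str
    date: Optional[str]
    region: Optional[str]
    type: Optional[str]

sample: MulticastResult = {
    "url": "http://1.2.3.4:8000/rtp/239.0.0.1:5000",
    "date": "2024-08-09",
    "region": "广东",
    "type": "电信",
}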

@@ -45,7 +45,7 @@ def is_ffmpeg_installed():
         return False
 
 
-async def ffmpeg_url(url, timeout):
+async def ffmpeg_url(url, timeout=timeout):
     """
     Get url info by ffmpeg
     """
@@ -56,7 +56,7 @@ async def ffmpeg_url(url, timeout):
     proc = await asyncio.create_subprocess_exec(
         *args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
     )
-    out, err = await asyncio.wait_for(proc.communicate(), timeout=timeout)
+    out, err = await asyncio.wait_for(proc.communicate(), timeout=timeout + 15)
     if out:
         res = out.decode("utf-8")
     if err:
@@ -99,7 +99,7 @@ async def check_stream_speed(url_info):
     """
     try:
         url = url_info[0]
-        video_info = await ffmpeg_url(url, timeout)
+        video_info = await ffmpeg_url(url, timeout=timeout)
         if video_info is None:
             return float("inf")
         frame, resolution = get_video_info(video_info)
@@ -114,22 +114,21 @@ async def check_stream_speed(url_info):
         return float("inf")
 
 
-async def get_info_with_speed(url_info, semaphore):
+async def get_info_with_speed(url_info):
     """
     Get the info with speed
     """
-    async with semaphore:
-        url, _, _ = url_info
-        url_info = list(url_info)
-        if "$" in url:
-            url = url.split("$")[0]
-        url = quote(url, safe=":/?&=$[]")
-        url_info[0] = url
-        try:
-            speed = await check_stream_speed(url_info)
-            return speed
-        except Exception:
-            return float("inf")
+    url, _, _ = url_info
+    url_info = list(url_info)
+    if "$" in url:
+        url = url.split("$")[0]
+    url = quote(url, safe=":/?&=$[]")
+    url_info[0] = url
+    try:
+        speed = await check_stream_speed(url_info)
+        return speed
+    except Exception:
+        return float("inf")
 
 
 async def sort_urls_by_speed_and_resolution(infoList, ffmpeg=False):
@@ -137,9 +136,8 @@ async def sort_urls_by_speed_and_resolution(infoList, ffmpeg=False):
     Sort by speed and resolution
     """
     if ffmpeg:
-        semaphore = asyncio.Semaphore(10)
         response = await asyncio.gather(
-            *(get_info_with_speed(url_info, semaphore) for url_info in infoList)
+            *(get_info_with_speed(url_info) for url_info in infoList)
         )
         valid_response = [res for res in response if res != float("inf")]
     else:
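Dropping the Semaphore(10) removes the cap on concurrent ffmpeg probes, so asyncio.gather now launches one probe per URL at once; per-probe runtime is instead bounded by the module-level timeout default plus the 15-second grace given to wait_for. A sketch of the resulting control flow, where probe() stands in for check_stream_speed and the URLs are placeholders:

import asyncio

timeout = 10  # module-level default, as assumed by ffmpeg_url(url, timeout=timeout)

async def probe(url):
    try:
        # wait_for gets a grace period beyond the probe's own timeout,
        # mirroring "timeout=timeout + 15" in the diff
        return await asyncio.wait_for(asyncio.sleep(0, result=1.0), timeout=timeout + 15)
    except Exception:
        return float("inf")

async def main(urls):
    # no semaphore: every URL is probed concurrently
    response = await asyncio.gather(*(probe(url) for url in urls))
    return [res for res in response if res != float("inf")]

print(asyncio.run(main(["http://example.com/a", "http://example.com/b"])))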