feat:url more info

guorong.zheng 2024-10-29 18:11:54 +08:00
parent c1a135ccff
commit 8e8d3dd7ff
9 changed files with 147 additions and 117 deletions

@@ -130,7 +130,7 @@ class UpdateSource:
def get_urls_len(self, filter=False):
data = copy.deepcopy(self.channel_data)
if filter:
process_nested_dict(data, seen=set(), flag="$cache:")
process_nested_dict(data, seen=set(), flag=r"cache:(.*)")
processed_urls = set(
url_info[0]
for channel_obj in data.values()

Binary file not shown.

@@ -8,7 +8,7 @@ from driver.setup import setup_driver
import re
from utils.retry import retry_func
from utils.channel import format_channel_name
from utils.tools import merge_objects, get_pbar_remaining
from utils.tools import merge_objects, get_pbar_remaining, add_url_info
from updates.proxy import get_proxy, get_proxy_next
from requests_custom.utils import get_source_requests, close_session
from collections import defaultdict
@@ -32,11 +32,16 @@ def get_fofa_urls_from_region_list():
urls = []
region_url = getattr(fofa_map, "region_url")
if "all" in region_list or "ALL" in region_list or "全部" in region_list:
urls = [url for url_list in region_url.values() for url in url_list if url]
urls = [
(url, region)
for region, url_list in region_url.items()
for url in url_list
if url
]
else:
for region in region_list:
if region in region_url:
urls.append(region_url[region])
urls.extend([(url, region) for url in region_url[region] if url])
return urls
@@ -56,14 +61,16 @@ def update_fofa_region_result_tmp(result, multicast=False):
def get_fofa_region_result_tmp(multicast: False):
with open(
resource_path(
f"updates/fofa/fofa_{'multicast' if multicast else 'hotel'}_region_result.pkl"
),
"rb",
) as file:
result = pickle.load(file)
return result
try:
with open(
resource_path(
f"updates/fofa/fofa_{'multicast' if multicast else 'hotel'}_region_result.pkl"
),
"rb",
) as file:
return pickle.load(file)
except:
return {}
async def get_channels_by_fofa(urls=None, multicast=False, callback=None):
@@ -89,7 +96,7 @@ async def get_channels_by_fofa(urls=None, multicast=False, callback=None):
open_driver = config.getboolean("Settings", "open_driver", fallback=True)
open_sort = config.getboolean("Settings", "open_sort", fallback=True)
if open_proxy:
test_url = fofa_urls[0][0] if multicast else fofa_urls[0]
test_url = fofa_urls[0][0]
proxy = await get_proxy(test_url, best=True, with_test=True)
cancel_event = threading.Event()
@@ -97,7 +104,7 @@ async def get_channels_by_fofa(urls=None, multicast=False, callback=None):
nonlocal proxy, fofa_urls_len, open_driver, open_sort, cancel_event
if cancel_event.is_set():
return {}
fofa_url = fofa_info[0] if multicast else fofa_info
fofa_url = fofa_info[0]
results = defaultdict(lambda: defaultdict(list))
driver = None
try:
@@ -130,7 +137,9 @@ async def get_channels_by_fofa(urls=None, multicast=False, callback=None):
else:
with ThreadPoolExecutor(max_workers=100) as executor:
futures = [
executor.submit(process_fofa_json_url, url, open_sort)
executor.submit(
process_fofa_json_url, url, fofa_info[1], open_sort
)
for url in urls
]
for future in futures:
@@ -183,7 +192,7 @@ async def get_channels_by_fofa(urls=None, multicast=False, callback=None):
return fofa_results
def process_fofa_json_url(url, open_sort):
def process_fofa_json_url(url, region, open_sort):
"""
Process the FOFA json url
"""
@@ -205,9 +214,14 @@ def process_fofa_json_url(url, open_sort):
item_url = item.get("url").strip()
if item_name and item_url:
total_url = (
f"{url}{item_url}$cache:{url}"
add_url_info(
f"{url}{item_url}",
f"{region}酒店源|cache:{url}",
)
if open_sort
else f"{url}{item_url}"
else add_url_info(
f"{url}{item_url}", f"{region}酒店源"
)
)
if item_name not in channels:
channels[item_name] = [(total_url, None, None)]
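With this change each FOFA hotel URL carries readable info after "$" and, when open_sort is enabled, a cache key after "|cache:". A minimal sketch of the resulting string, using a hypothetical host and region (the format follows add_url_info from utils.tools as shown further below):

    # hypothetical inputs
    url, item_url, region = "http://1.2.3.4:8080", "/hls/1/index.m3u8", "广东"
    # open_sort=True:
    #   "http://1.2.3.4:8080/hls/1/index.m3u8$广东酒店源|cache:http://1.2.3.4:8080"
    # open_sort=False:
    #   "http://1.2.3.4:8080/hls/1/index.m3u8$广东酒店源"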

@@ -1,5 +1,3 @@
from asyncio import create_task, gather
from utils.speed import get_speed
from utils.channel import (
get_results_from_multicast_soup,
get_results_from_multicast_soup_requests,
@@ -43,8 +41,7 @@ async def get_channels_by_hotel(callback=None):
if region.strip()
]
if "all" in region_list or "ALL" in region_list or "全部" in region_list:
fofa_region_name_list = list(getattr(fofa_map, "region_url").keys())
region_list = fofa_region_name_list
region_list = list(getattr(fofa_map, "region_url").keys())
if open_proxy:
proxy = await get_proxy(pageUrl, best=True, with_test=True)
start_time = time()
@@ -85,7 +82,7 @@ async def get_channels_by_hotel(callback=None):
page_soup = get_soup_requests(pageUrl, data=post_form, proxy=proxy)
if not page_soup:
print(f"{name}:Request fail.")
return {"region": region, "type": type, "data": info_list}
return info_list
else:
a_tags = page_soup.find_all("a", href=True)
for a_tag in a_tags:
@@ -156,7 +153,7 @@ async def get_channels_by_hotel(callback=None):
f"正在获取Tonkiang酒店源, 剩余{region_list_len - pbar.n}个地区待查询, 预计剩余时间: {get_pbar_remaining(n=pbar.n, total=pbar.total, start_time=start_time)}",
int((pbar.n / region_list_len) * 100),
)
return {"region": region, "type": type, "data": info_list}
return info_list
region_list_len = len(region_list)
pbar = tqdm_asyncio(total=region_list_len, desc="Tonkiang hotel search")
@@ -172,22 +169,20 @@ async def get_channels_by_hotel(callback=None):
for future in as_completed(futures):
region = futures[future]
result = future.result()
data = result.get("data")
if data:
for item in data:
if result:
for item in result:
url = item.get("url")
date = item.get("date")
if url:
search_region_result[region].append((url, date, None))
urls = [
f"http://{url}/ZHGXTV/Public/json/live_interface.txt"
for result in search_region_result.values()
{"region": region, "url": f"http://{url}/ZHGXTV/Public/json/live_interface.txt"}
for region, result in search_region_result.items()
for url, _, _ in result
]
open_sort = config.getboolean("Settings", "open_sort", fallback=True)
channels = await get_channels_by_subscribe_urls(
urls, hotel=True, retry=False, error_print=False, with_cache=open_sort
urls, hotel=True, retry=False, error_print=False
)
if not open_driver:
close_session()

@@ -4,7 +4,12 @@ from time import time
from requests import Session, exceptions
from utils.retry import retry_func
from utils.channel import get_name_url, format_channel_name
from utils.tools import merge_objects, get_pbar_remaining, format_url_with_cache
from utils.tools import (
merge_objects,
get_pbar_remaining,
format_url_with_cache,
add_url_info,
)
from concurrent.futures import ThreadPoolExecutor
from collections import defaultdict
@@ -17,7 +22,6 @@ async def get_channels_by_subscribe_urls(
hotel=False,
retry=True,
error_print=True,
with_cache=False,
callback=None,
):
"""
@@ -44,9 +48,9 @@
session = Session()
def process_subscribe_channels(subscribe_info):
if multicast and isinstance(subscribe_info, dict):
if (multicast or hotel) and isinstance(subscribe_info, dict):
region = subscribe_info.get("region")
type = subscribe_info.get("type")
type = subscribe_info.get("type", "")
subscribe_url = subscribe_info.get("url")
else:
subscribe_url = subscribe_info
@@ -72,8 +76,15 @@
name = item["name"]
url = item["url"]
if name and url:
if not multicast:
info = (
f"{region}酒店源"
if hotel
else "组播源" if "/rtp/" in url else "订阅源"
)
url = add_url_info(url, info)
url = format_url_with_cache(
url, cache=subscribe_url if with_cache else None
url, cache=subscribe_url if (multicast or hotel) else None
)
value = url if multicast else (url, None, None)
name = format_channel_name(name)

@@ -4,11 +4,12 @@ from utils.tools import (
get_total_urls_from_info_list,
process_nested_dict,
get_resolution_value,
add_url_info,
remove_cache_info,
)
from utils.speed import (
sort_urls_by_speed_and_resolution,
is_ffmpeg_installed,
add_info_url,
speed_cache,
)
import os
@@ -294,9 +295,14 @@ def get_channel_multicast_result(result, search_result):
info_list = [
(
(
f"http://{url}/rtp/{ip}$cache:{url}"
add_url_info(
f"http://{url}/rtp/{ip}",
f"{result_region}{result_type}组播源|cache:{url}",
)
if open_sort
else f"http://{url}/rtp/{ip}"
else add_url_info(
f"http://{url}/rtp/{ip}", f"{result_region}{result_type}组播源"
)
),
date,
resolution,
@@ -468,7 +474,7 @@ def get_channel_url(text):
text,
)
if url_search:
url = url_search.group().strip()
url = url_search.group()
return url
@@ -709,7 +715,7 @@ async def process_sort_channel_list(data, ipv6=False, callback=None):
is_ffmpeg = open_ffmpeg and ffmpeg_installed
semaphore = asyncio.Semaphore(5)
need_sort_data = copy.deepcopy(data)
process_nested_dict(need_sort_data, seen=set(), flag="$cache:")
process_nested_dict(need_sort_data, seen=set(), flag=r"cache:(.*)")
tasks = [
asyncio.create_task(
sort_channel_list(
@@ -737,39 +743,41 @@
for name, info_list in obj.items():
sort_info_list = sort_data.get(cate, {}).get(name, [])
sort_urls = {
sort_url[0].split("$")[0]
remove_cache_info(sort_url[0])
for sort_url in sort_info_list
if sort_url and sort_url[0]
}
for url, date, resolution, origin in info_list:
url_rsplit = url.rsplit("$cache:", 1)
if len(url_rsplit) != 2:
continue
url, cache_key = url_rsplit
url = url.split("$")[0]
if url in sort_urls or cache_key not in speed_cache:
continue
cache = speed_cache[cache_key]
if not cache:
continue
response_time, resolution = cache
if response_time and response_time != float("inf"):
if resolution:
url = add_info_url(url, resolution)
if open_filter_resolution:
resolution_value = get_resolution_value(resolution)
if resolution_value < min_resolution:
continue
append_data_to_info_data(
sort_data,
cate,
name,
[(url, date, resolution, origin)],
check=False,
)
logging.info(
f"Name: {name}, URL: {url}, Date: {date}, Resolution: {resolution}, Response Time: {response_time} ms"
)
if "$" in url:
matcher = re.search(r"cache:(.*)", url)
if matcher:
cache_key = matcher.group(1)
if not cache_key:
continue
url = remove_cache_info(url)
if url in sort_urls or cache_key not in speed_cache:
continue
cache = speed_cache[cache_key]
if not cache:
continue
response_time, resolution = cache
if response_time and response_time != float("inf"):
if resolution:
if open_filter_resolution:
resolution_value = get_resolution_value(resolution)
if resolution_value < min_resolution:
continue
url = add_url_info(url, resolution)
append_data_to_info_data(
sort_data,
cate,
name,
[(url, date, resolution, origin)],
check=False,
)
logging.info(
f"Name: {name}, URL: {url}, Date: {date}, Resolution: {resolution}, Response Time: {response_time} ms"
)
return sort_data
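A hedged trace of the new cache lookup in this loop, with a hypothetical URL and speed_cache entry:

    url = "http://1.2.3.4/hls/1.m3u8$广东酒店源|cache:http://1.2.3.4"  # hypothetical
    cache_key = re.search(r"cache:(.*)", url).group(1)  # "http://1.2.3.4"
    url = remove_cache_info(url)                         # "http://1.2.3.4/hls/1.m3u8$广东酒店源"
    response_time, resolution = speed_cache[cache_key]   # e.g. (120, "1920x1080")
    # after the optional min-resolution filter, the measured resolution is re-attached:
    url = add_url_info(url, resolution)                  # "...$广东酒店源|1920x1080"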
@@ -881,7 +889,7 @@ def format_channel_url_info(data):
for obj in data.values():
for url_info in obj.values():
for i, (url, date, resolution, origin) in enumerate(url_info):
url = url.split("$", 1)[0]
url = remove_cache_info(url)
if resolution:
url = add_info_url(url, resolution)
url = add_url_info(url, resolution)
url_info[i] = (url, date, resolution, origin)

@@ -3,7 +3,7 @@ from time import time
import asyncio
import re
from utils.config import config
from utils.tools import is_ipv6, get_resolution_value
from utils.tools import is_ipv6, get_resolution_value, add_url_info, remove_cache_info
import subprocess
timeout = config.getint("Settings", "sort_timeout", fallback=5)
@@ -106,23 +106,14 @@ async def check_stream_speed(url_info):
if frame is None or frame == float("inf"):
return float("inf")
if resolution:
url_info[0] = add_info_url(url, resolution)
url_info[0] = add_url_info(url, resolution)
url_info[2] = resolution
return (tuple(url_info), frame)
return (url_info, frame)
except Exception as e:
print(e)
return float("inf")
def add_info_url(url, info):
"""
Format the url
"""
separator = "|" if "$" in url else "$"
url += f"{separator}{info}"
return url
speed_cache = {}
@@ -138,13 +129,13 @@ async def get_speed_by_info(
cache_key = None
if "$" in url:
url, cache_info = url.split("$", 1)
if "cache:" in cache_info:
matcher = re.search(r"cache:(.*)", cache_info)
if matcher:
cache_key = matcher.group(1)
matcher = re.search(r"cache:(.*)", cache_info)
if matcher:
cache_key = matcher.group(1)
url_show_info = remove_cache_info(cache_info)
url_is_ipv6 = is_ipv6(url)
if url_is_ipv6:
url = add_info_url(url, "IPv6")
url = add_url_info(url, "IPv6")
url_info[0] = url
if cache_key in speed_cache:
speed = speed_cache[cache_key][0]
@@ -162,12 +153,13 @@
else:
url_speed = await get_speed(url)
speed = (
(tuple(url_info), url_speed)
if url_speed != float("inf")
else float("inf")
(url_info, url_speed) if url_speed != float("inf") else float("inf")
)
if cache_key and cache_key not in speed_cache:
speed_cache[cache_key] = (url_speed, resolution)
if url_show_info:
speed[0][0] = add_url_info(speed[0][0], url_show_info)
speed = (tuple(speed[0]), speed[1])
return speed
except Exception:
return float("inf")

@@ -203,14 +203,13 @@ def get_total_urls_from_info_list(infoList, ipv6=False):
if len(total_urls) < urls_limit:
for origin in origin_type_prefer:
for ipv_type in ipv_type_total:
if ipv_num[ipv_type] < ipv_limit[ipv_type]:
extra_urls = (
categorized_urls[origin][ipv_type][source_limits[origin] :]
if ipv_type in ipv_type_prefer
else categorized_urls[origin][ipv_type][: source_limits[origin]]
)
if len(total_urls) < urls_limit:
extra_urls = categorized_urls[origin][ipv_type][
: source_limits[origin]
]
total_urls.extend(extra_urls)
ipv_num[ipv_type] += len(extra_urls)
total_urls = list(dict.fromkeys(total_urls))[:urls_limit]
ipv_num[ipv_type] += urls_limit - len(total_urls)
if len(total_urls) >= urls_limit:
break
if len(total_urls) >= urls_limit:
@@ -436,7 +435,11 @@ def remove_duplicates_from_tuple_list(tuple_list, seen, flag=None):
"""
unique_list = []
for item in tuple_list:
part = item[0] if flag is None else item[0].rsplit(flag, 1)[-1]
if flag:
matcher = re.search(flag, item[0])
part = matcher.group(1) if matcher else item[0]
else:
part = item[0]
if part not in seen:
seen.add(part)
unique_list.append(item)
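A small sketch (hypothetical tuples) of how the regex flag now dedupes by cache key instead of by the literal "$cache:" suffix:

    seen = set()
    items = [
        ("http://a/1.m3u8$酒店源|cache:http://a", None, None),
        ("http://a/2.m3u8$酒店源|cache:http://a", None, None),  # same cache key, dropped
        ("http://b/1.m3u8$酒店源|cache:http://b", None, None),
    ]
    unique = remove_duplicates_from_tuple_list(items, seen, flag=r"cache:(.*)")
    # unique keeps one tuple per cache key: http://a and http://b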
@@ -454,34 +457,41 @@ def process_nested_dict(data, seen, flag=None):
data[key] = remove_duplicates_from_tuple_list(value, seen, flag)
ip_pattern = re.compile(
r"""
(
(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}) # IPv4
|([a-zA-Z0-9.-]+\.[a-zA-Z]{2,}) # Domain
|(\[([0-9a-fA-F:]+)\]) # IPv6
)
(?::(\d+))? # Port
""",
re.VERBOSE,
url_domain_pattern = re.compile(
r"\b((https?):\/\/)?(\[[0-9a-fA-F:]+\]|([\w-]+\.)+[\w-]+)(:[0-9]{1,5})?\b"
)
def get_ip(url):
def get_url_domain(url):
"""
Get the IP address with flags
Get the url domain
"""
matcher = ip_pattern.search(url)
matcher = url_domain_pattern.search(url)
if matcher:
return matcher.group(1)
return matcher.group()
return None
def add_url_info(url, info):
"""
Add url info to the URL
"""
if info:
separator = "|" if "$" in url else "$"
url += f"{separator}{info}"
return url
def format_url_with_cache(url, cache=None):
"""
Format the URL with cache
"""
if not cache:
cache = get_ip(url) or ""
cache = cache or get_url_domain(url) or ""
return add_url_info(url, f"cache:{cache}") if cache else url
return f"{url}$cache:{cache}"
def remove_cache_info(str):
"""
Remove the cache info from the string
"""
return re.sub(r"cache:.*|\|cache:.*", "", str)