1
0

Merge pull request from Guovin/dev

Feat
This commit is contained in:
Govin 2024-10-30 17:52:30 +08:00 committed by GitHub
commit 234948369a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 385 additions and 399 deletions

@ -1,5 +1,6 @@
import tkinter as tk
from utils.config import config
import utils.constants as constants
from tkinter import ttk
from tkinter import scrolledtext
from tkinter import filedialog
@ -23,9 +24,7 @@ class DefaultUI:
frame_default_open_update_column1, text="开启更新:", width=8
)
self.open_update_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_update_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_update", fallback=True)
)
self.open_update_var = tk.BooleanVar(value=constants.open_update)
self.open_update_checkbutton = ttk.Checkbutton(
frame_default_open_update_column1,
variable=self.open_update_var,
@ -41,7 +40,7 @@ class DefaultUI:
)
self.open_use_old_result_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_use_old_result_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_use_old_result", fallback=True)
value=constants.open_use_old_result
)
self.open_use_old_result_checkbutton = ttk.Checkbutton(
frame_default_open_update_column2,
@ -66,9 +65,7 @@ class DefaultUI:
self.source_file_entry = tk.Entry(frame_default_source_file_column1)
self.source_file_label.pack(side=tk.LEFT, padx=4, pady=8)
self.source_file_entry.pack(fill=tk.X, padx=4, expand=True)
self.source_file_entry.insert(
0, config.get("Settings", "source_file", fallback="config/demo.txt")
)
self.source_file_entry.insert(0, constants.source_file)
self.source_file_button = tk.ttk.Button(
frame_default_source_file_column2,
@ -90,9 +87,7 @@ class DefaultUI:
self.final_file_entry = tk.Entry(frame_default_final_file_column1)
self.final_file_label.pack(side=tk.LEFT, padx=4, pady=8)
self.final_file_entry.pack(fill=tk.X, padx=4, expand=True)
self.final_file_entry.insert(
0, config.get("Settings", "final_file", fallback="output/result.txt")
)
self.final_file_entry.insert(0, constants.final_file)
self.final_file_button = tk.ttk.Button(
frame_default_final_file_column2,
@ -112,9 +107,7 @@ class DefaultUI:
frame_default_mode_params_column1, text="浏览器模式:", width=12
)
self.open_driver_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_driver_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_driver", fallback=True)
)
self.open_driver_var = tk.BooleanVar(value=constants.open_driver)
self.open_driver_checkbutton = ttk.Checkbutton(
frame_default_mode_params_column1,
variable=self.open_driver_var,
@ -129,9 +122,7 @@ class DefaultUI:
frame_default_mode_params_column2, text="开启代理:", width=12
)
self.open_proxy_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_proxy_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_proxy", fallback=False)
)
self.open_proxy_var = tk.BooleanVar(value=constants.open_proxy)
self.open_proxy_checkbutton = ttk.Checkbutton(
frame_default_mode_params_column2,
variable=self.open_proxy_var,
@ -155,9 +146,7 @@ class DefaultUI:
self.urls_limit_label.pack(side=tk.LEFT, padx=4, pady=8)
self.urls_limit_entry = tk.Entry(frame_default_channel_column1)
self.urls_limit_entry.pack(side=tk.LEFT, padx=4, pady=8)
self.urls_limit_entry.insert(
15, config.getint("Settings", "urls_limit", fallback=30)
)
self.urls_limit_entry.insert(0, constants.urls_limit)
self.urls_limit_entry.bind("<KeyRelease>", self.update_urls_limit)
self.ipv_type_label = tk.Label(
@ -167,10 +156,9 @@ class DefaultUI:
self.ipv_type_combo = ttk.Combobox(frame_default_channel_column2)
self.ipv_type_combo.pack(side=tk.LEFT, padx=4, pady=8)
self.ipv_type_combo["values"] = ("IPv4", "IPv6", "全部")
ipv_type = config.get("Settings", "ipv_type", fallback="全部").lower()
if ipv_type == "ipv4":
if constants.ipv_type == "ipv4":
self.ipv_type_combo.current(0)
elif ipv_type == "ipv6":
elif constants.ipv_type == "ipv6":
self.ipv_type_combo.current(1)
else:
self.ipv_type_combo.current(2)
@ -189,16 +177,14 @@ class DefaultUI:
frame_default_sort_column1, text="保留模式:", width=12
)
self.open_keep_all_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_keep_all_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_keep_all", fallback=False)
)
self.open_keep_all_var = tk.BooleanVar(value=constants.open_keep_all)
self.open_keep_all_checkbutton = ttk.Checkbutton(
frame_default_sort_column1,
variable=self.open_keep_all_var,
onvalue=True,
offvalue=False,
command=self.update_open_keep_all,
text="(非严格匹配)",
text="(保留所有查询记录)",
)
self.open_keep_all_checkbutton.pack(side=tk.LEFT, padx=4, pady=8)
@ -206,9 +192,7 @@ class DefaultUI:
frame_default_sort_column2, text="测速排序:", width=12
)
self.open_sort_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_sort_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_sort", fallback=True)
)
self.open_sort_var = tk.BooleanVar(value=constants.open_sort)
self.open_sort_checkbutton = ttk.Checkbutton(
frame_default_sort_column2,
variable=self.open_sort_var,
@ -224,9 +208,7 @@ class DefaultUI:
self.sort_timeout_label.pack(side=tk.LEFT, padx=4, pady=8)
self.sort_timeout_entry = tk.Entry(frame_default_sort_column3)
self.sort_timeout_entry.pack(side=tk.LEFT, padx=4, pady=8)
self.sort_timeout_entry.insert(
0, config.getint("Settings", "sort_timeout", fallback=5)
)
self.sort_timeout_entry.insert(0, constants.sort_timeout)
self.sort_timeout_entry.bind("<KeyRelease>", self.update_sort_timeout)
frame_default_sort_mode = tk.Frame(root)
@ -240,9 +222,7 @@ class DefaultUI:
frame_default_sort_mode_column1, text="FFmpeg测速:", width=12
)
self.open_ffmpeg_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_ffmpeg_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_ffmpeg", fallback=True)
)
self.open_ffmpeg_var = tk.BooleanVar(value=constants.open_ffmpeg)
self.open_ffmpeg_checkbutton = ttk.Checkbutton(
frame_default_sort_mode_column1,
variable=self.open_ffmpeg_var,
@ -257,9 +237,7 @@ class DefaultUI:
frame_default_sort_mode_column2, text="M3U转换:", width=12
)
self.open_m3u_result_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_m3u_result_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_m3u_result", fallback=True)
)
self.open_m3u_result_var = tk.BooleanVar(value=constants.open_m3u_result)
self.open_m3u_result_checkbutton = ttk.Checkbutton(
frame_default_sort_mode_column2,
variable=self.open_m3u_result_var,
@ -286,7 +264,7 @@ class DefaultUI:
)
self.open_filter_resolution_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_filter_resolution_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_filter_resolution", fallback=True)
value=constants.open_filter_resolution
)
self.open_filter_resolution_checkbutton = ttk.Checkbutton(
frame_default_resolution_params_column1,
@ -304,9 +282,7 @@ class DefaultUI:
self.min_resolution_label.pack(side=tk.LEFT, padx=4, pady=8)
self.min_resolution_entry = tk.Entry(frame_default_resolution_params_column2)
self.min_resolution_entry.pack(side=tk.LEFT, padx=4, pady=8)
self.min_resolution_entry.insert(
0, config.get("Settings", "min_resolution", fallback="1920x1080")
)
self.min_resolution_entry.insert(0, constants.min_resolution)
self.min_resolution_entry.bind("<KeyRelease>", self.update_min_resolution)
frame_default_sort_params = tk.Frame(root)
@ -329,9 +305,7 @@ class DefaultUI:
command=self.update_response_time_weight,
)
self.response_time_weight_scale.pack(side=tk.LEFT, padx=4, pady=8)
self.response_time_weight_scale.set(
config.getfloat("Settings", "response_time_weight", fallback=0.5)
)
self.response_time_weight_scale.set(constants.response_time_weight)
self.resolution_weight_label = tk.Label(
frame_default_sort_params_column2, text="分辨率权重:", width=12
@ -346,9 +320,7 @@ class DefaultUI:
command=self.update_resolution_weight,
)
self.resolution_weight_scale.pack(side=tk.LEFT, padx=4, pady=8)
self.resolution_weight_scale.set(
config.getfloat("Settings", "resolution_weight", fallback=0.5)
)
self.resolution_weight_scale.set(constants.resolution_weight)
frame_default_open_update_info = tk.Frame(root)
frame_default_open_update_info.pack(fill=tk.X)
@ -365,9 +337,7 @@ class DefaultUI:
frame_default_open_update_info_column1, text="显示更新时间:", width=12
)
self.open_update_time_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_update_time_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_update_time", fallback=True)
)
self.open_update_time_var = tk.BooleanVar(value=constants.open_update_time)
self.open_update_time_checkbutton = ttk.Checkbutton(
frame_default_open_update_info_column1,
variable=self.open_update_time_var,
@ -382,9 +352,7 @@ class DefaultUI:
frame_default_open_update_info_column2, text="显示接口信息:", width=12
)
self.open_url_info_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_url_info_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_url_info", fallback=True)
)
self.open_url_info_var = tk.BooleanVar(value=constants.open_url_info)
self.open_url_info_checkbutton = ttk.Checkbutton(
frame_default_open_update_info_column2,
variable=self.open_url_info_var,
@ -408,9 +376,7 @@ class DefaultUI:
self.domain_blacklist_text.pack(
side=tk.LEFT, padx=4, pady=8, expand=True, fill=tk.BOTH
)
self.domain_blacklist_text.insert(
tk.END, config.get("Settings", "domain_blacklist", fallback="")
)
self.domain_blacklist_text.insert(tk.END, ",".join(constants.domain_blacklist))
self.domain_blacklist_text.bind("<KeyRelease>", self.update_domain_blacklist)
frame_default_url_keywords_blacklist = tk.Frame(root)
@ -427,7 +393,7 @@ class DefaultUI:
side=tk.LEFT, padx=4, pady=8, expand=True, fill=tk.BOTH
)
self.url_keywords_blacklist_text.insert(
tk.END, config.get("Settings", "url_keywords_blacklist", fallback="")
tk.END, ",".join(constants.url_keywords_blacklist)
)
self.url_keywords_blacklist_text.bind(
"<KeyRelease>", self.update_url_keywords_blacklist

@ -1,6 +1,7 @@
import tkinter as tk
from tkinter import ttk
from utils.config import config
import utils.constants as constants
from select_combobox import SelectCombobox
import updates.fofa.fofa_map as fofa_map
@ -17,9 +18,7 @@ class HotelUI:
frame_hotel_open_hotel, text="开启酒店源:", width=9
)
self.open_hotel_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_hotel_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_hotel", fallback=True)
)
self.open_hotel_var = tk.BooleanVar(value=constants.open_hotel)
self.open_hotel_checkbutton = ttk.Checkbutton(
frame_hotel_open_hotel,
variable=self.open_hotel_var,
@ -37,7 +36,7 @@ class HotelUI:
)
self.open_hotel_mode_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_hotel_tonkiang_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_hotel_tonkiang", fallback=False)
value=constants.open_hotel_tonkiang
)
self.open_hotel_tonkiang_checkbutton = ttk.Checkbutton(
frame_hotel_mode,
@ -49,9 +48,7 @@ class HotelUI:
)
self.open_hotel_tonkiang_checkbutton.pack(side=tk.LEFT, padx=4, pady=8)
self.open_hotel_fofa_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_hotel_fofa", fallback=True)
)
self.open_hotel_fofa_var = tk.BooleanVar(value=constants.open_hotel_fofa)
self.open_hotel_fofa_checkbutton = ttk.Checkbutton(
frame_hotel_mode,
variable=self.open_hotel_fofa_var,
@ -70,17 +67,10 @@ class HotelUI:
)
self.region_list_label.pack(side=tk.LEFT, padx=4, pady=8)
regions = ["全部"] + list(getattr(fofa_map, "region_url").keys())
region_selected_values = [
value.strip()
for value in config.get(
"Settings", "hotel_region_list", fallback="全部"
).split(",")
if value.strip()
]
self.region_list_combo = SelectCombobox(
frame_hotel_region_list,
values=regions,
selected_values=region_selected_values,
selected_values=constants.hotel_region_list,
height=10,
command=self.update_region_list,
)
@ -95,9 +85,7 @@ class HotelUI:
self.page_num_label.pack(side=tk.LEFT, padx=4, pady=8)
self.page_num_entry = tk.Entry(frame_hotel_page_num)
self.page_num_entry.pack(side=tk.LEFT, padx=4, pady=8)
self.page_num_entry.insert(
0, config.getint("Settings", "hotel_page_num", fallback=3)
)
self.page_num_entry.insert(0, constants.hotel_page_num)
self.page_num_entry.bind("<KeyRelease>", self.update_page_num)
def update_open_hotel(self):

@ -1,6 +1,7 @@
import tkinter as tk
from tkinter import ttk
from utils.config import config, resource_path
import utils.constants as constants
from select_combobox import SelectCombobox
import os
@ -18,9 +19,7 @@ class MulticastUI:
frame_multicast_multicast, text="开启组播源:", width=9
)
self.open_multicast_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_multicast_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_multicast", fallback=True)
)
self.open_multicast_var = tk.BooleanVar(value=constants.open_multicast)
self.open_multicast_checkbutton = ttk.Checkbutton(
frame_multicast_multicast,
variable=self.open_multicast_var,
@ -38,9 +37,7 @@ class MulticastUI:
)
self.open_multicast_mode_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_multicast_tonkiang_var = tk.BooleanVar(
value=config.getboolean(
"Settings", "open_multicast_tonkiang", fallback=True
)
value=constants.open_multicast_tonkiang
)
self.open_multicast_tonkiang_checkbutton = ttk.Checkbutton(
frame_multicast_mode,
@ -53,7 +50,7 @@ class MulticastUI:
self.open_multicast_tonkiang_checkbutton.pack(side=tk.LEFT, padx=4, pady=8)
self.open_multicast_fofa_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_multicast_fofa", fallback=True)
value=constants.open_multicast_fofa
)
self.open_multicast_fofa_checkbutton = ttk.Checkbutton(
frame_multicast_mode,
@ -86,17 +83,10 @@ class MulticastUI:
if "全部" in regions:
regions.remove("全部")
regions.insert(0, "全部")
region_selected_values = [
value.strip()
for value in config.get(
"Settings", "multicast_region_list", fallback="全部"
).split(",")
if value.strip()
]
self.region_list_combo = SelectCombobox(
frame_multicast_region_list,
values=regions,
selected_values=region_selected_values,
selected_values=constants.multicast_region_list,
height=10,
command=self.update_region_list,
)
@ -113,9 +103,7 @@ class MulticastUI:
self.page_num_label.pack(side=tk.LEFT, padx=4, pady=8)
self.page_num_entry = tk.Entry(frame_multicast_page_num)
self.page_num_entry.pack(side=tk.LEFT, padx=4, pady=8)
self.page_num_entry.insert(
0, config.getint("Settings", "multicast_page_num", fallback=3)
)
self.page_num_entry.insert(0, constants.multicast_page_num)
self.page_num_entry.bind("<KeyRelease>", self.update_page_num)
def update_open_multicast(self):

@ -1,6 +1,7 @@
import tkinter as tk
from tkinter import ttk
from utils.config import config
import utils.constants as constants
class OnlineSearchUI:
@ -16,9 +17,7 @@ class OnlineSearchUI:
frame_online_search_open_online_search, text="开启关键字搜索:", width=13
)
self.open_online_search_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_online_search_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_online_search", fallback=False)
)
self.open_online_search_var = tk.BooleanVar(value=constants.open_online_search)
self.open_online_search_checkbutton = ttk.Checkbutton(
frame_online_search_open_online_search,
variable=self.open_online_search_var,
@ -37,9 +36,7 @@ class OnlineSearchUI:
self.page_num_label.pack(side=tk.LEFT, padx=4, pady=8)
self.page_num_entry = tk.Entry(frame_online_search_page_num)
self.page_num_entry.pack(side=tk.LEFT, padx=4, pady=8)
self.page_num_entry.insert(
0, config.getint("Settings", "online_search_page_num", fallback=3)
)
self.page_num_entry.insert(0, constants.online_search_page_num)
self.page_num_entry.bind("<KeyRelease>", self.update_page_num)
frame_online_search_recent_days = tk.Frame(root)
@ -51,9 +48,7 @@ class OnlineSearchUI:
self.recent_days_label.pack(side=tk.LEFT, padx=4, pady=8)
self.recent_days_entry = tk.Entry(frame_online_search_recent_days)
self.recent_days_entry.pack(side=tk.LEFT, padx=4, pady=8)
self.recent_days_entry.insert(
30, config.getint("Settings", "recent_days", fallback=30)
)
self.recent_days_entry.insert(30, constants.recent_days)
self.recent_days_entry.bind("<KeyRelease>", self.update_recent_days)
def update_open_online_search(self):

@ -1,6 +1,7 @@
import tkinter as tk
from tkinter import ttk
from utils.config import config
import utils.constants as constants
class PreferUI:
@ -8,14 +9,7 @@ class PreferUI:
"""
Init prefer UI
"""
origin_type_prefer = [
item.lower()
for item in config.get(
"Settings",
"origin_type_prefer",
fallback="subscribe,hotel,multicast,online_search",
).split(",")
]
origin_type_prefer = [item.lower() for item in constants.origin_type_prefer]
config_options = [
{"label_text": f"结果来源优先{i+1}:", "combo_box_value": i}
for i in range(len(origin_type_prefer))
@ -39,12 +33,10 @@ class PreferUI:
self.prefer_ipv_type_combo = ttk.Combobox(frame_prefer_ipv_type)
self.prefer_ipv_type_combo.pack(side=tk.LEFT, padx=4, pady=8)
self.prefer_ipv_type_combo["values"] = ("IPv4", "IPv6", "自动")
ipv_type_prefer = config.get(
"Settings", "ipv_type_prefer", fallback="IPv4"
).lower()
if ipv_type_prefer == "ipv4":
ipv_type_prefer = constants.ipv_type_prefer
if ipv_type_prefer[0] == "ipv4":
self.prefer_ipv_type_combo.current(0)
elif ipv_type_prefer == "ipv6":
elif ipv_type_prefer[0] == "ipv6":
self.prefer_ipv_type_combo.current(1)
else:
self.prefer_ipv_type_combo.current(2)
@ -90,14 +82,7 @@ class IpvNumInput:
self.entry_label.pack(side=tk.LEFT, padx=4, pady=8)
self.entry = tk.Entry(self.frame_column1)
self.entry.insert(
0,
config.getint(
"Settings",
f"{ipv_type}_num",
fallback=15,
),
)
self.entry.insert(0, constants.ipv_limit[ipv_type])
self.entry.pack(side=tk.LEFT, padx=4, pady=8)
def update_input(self, event):
@ -147,22 +132,13 @@ class ConfigOption:
self.entry = tk.Entry(self.column2)
self.entry.insert(
0,
config.getint(
"Settings",
f"{self.origin_type_prefer_obj[self.combo_box.get()]}_num",
fallback=10,
),
constants.source_limits[self.origin_type_prefer_obj[self.combo_box.get()]],
)
self.entry.pack(side=tk.LEFT, padx=4, pady=8)
def update_select(self, key):
origin_type_prefer_list = [
item.lower()
for item in config.get(
"Settings",
"origin_type_prefer",
fallback="subscribe,hotel,multicast,online_search",
).split(",")
item.lower() for item in constants.origin_type_prefer
]
origin_type_prefer_list[self.combo_box_value] = self.origin_type_prefer_obj[
self.combo_box.get()

@ -1,6 +1,7 @@
import tkinter as tk
from tkinter import ttk
from utils.config import config
import utils.constants as constants
from tkinter import scrolledtext
@ -16,9 +17,7 @@ class SubscribeUI:
frame_subscribe_open_subscribe, text="开启订阅源:", width=9
)
self.open_subscribe_label.pack(side=tk.LEFT, padx=4, pady=8)
self.open_subscribe_var = tk.BooleanVar(
value=config.getboolean("Settings", "open_subscribe", fallback=True)
)
self.open_subscribe_var = tk.BooleanVar(value=constants.open_subscribe)
self.open_subscribe_checkbutton = ttk.Checkbutton(
frame_subscribe_open_subscribe,
variable=self.open_subscribe_var,
@ -36,14 +35,12 @@ class SubscribeUI:
)
self.subscribe_urls_label.pack(side=tk.LEFT, padx=4, pady=8)
self.subscribe_urls_text = scrolledtext.ScrolledText(
frame_subscribe_subscribe_urls, height=5
frame_subscribe_subscribe_urls, height=40
)
self.subscribe_urls_text.pack(
side=tk.LEFT, padx=4, pady=8, expand=True, fill=tk.BOTH
)
self.subscribe_urls_text.insert(
tk.END, config.get("Settings", "subscribe_urls", fallback="")
)
self.subscribe_urls_text.insert(tk.END, ",".join(constants.subscribe_urls))
self.subscribe_urls_text.bind("<KeyRelease>", self.update_subscribe_urls)
def update_open_subscribe(self):

@ -1,4 +1,5 @@
from utils.config import config, resource_path
from utils.config import resource_path
import utils.constants as constants
from tqdm.asyncio import tqdm_asyncio
from time import time
from requests import get
@ -15,22 +16,14 @@ from collections import defaultdict
import pickle
import threading
timeout = config.getint("Settings", "request_timeout", fallback=10)
def get_fofa_urls_from_region_list():
"""
Get the FOFA url from region
"""
region_list = [
region.strip()
for region in config.get(
"Settings", "hotel_region_list", fallback="全部"
).split(",")
if region.strip()
]
urls = []
region_url = getattr(fofa_map, "region_url")
region_list = constants.hotel_region_list
if "all" in region_list or "ALL" in region_list or "全部" in region_list:
urls = [
(url, region)
@ -92,9 +85,9 @@ async def get_channels_by_fofa(urls=None, multicast=False, callback=None):
0,
)
proxy = None
open_proxy = config.getboolean("Settings", "open_proxy", fallback=False)
open_driver = config.getboolean("Settings", "open_driver", fallback=True)
open_sort = config.getboolean("Settings", "open_sort", fallback=True)
open_proxy = constants.open_proxy
open_driver = constants.open_driver
open_sort = constants.open_sort
if open_proxy:
test_url = fofa_urls[0][0]
proxy = await get_proxy(test_url, best=True, with_test=True)
@ -203,7 +196,7 @@ def process_fofa_json_url(url, region, open_sort):
# lambda: get(final_url, timeout=timeout),
# name=final_url,
# )
response = get(final_url, timeout=timeout)
response = get(final_url, timeout=constants.request_timeout)
try:
json_data = response.json()
if json_data["code"] == 0:

@ -3,7 +3,7 @@ from utils.channel import (
get_results_from_multicast_soup_requests,
)
from utils.tools import get_pbar_remaining, get_soup
from utils.config import config
import utils.constants as constants
from updates.proxy import get_proxy, get_proxy_next
from time import time
from driver.setup import setup_driver
@ -30,16 +30,10 @@ async def get_channels_by_hotel(callback=None):
channels = {}
pageUrl = "http://tonkiang.us/hoteliptv.php"
proxy = None
open_proxy = config.getboolean("Settings", "open_proxy", fallback=False)
open_driver = config.getboolean("Settings", "open_driver", fallback=True)
page_num = config.getint("Settings", "hotel_page_num", fallback=3)
region_list = [
region.strip()
for region in config.get(
"Settings", "hotel_region_list", fallback="全部"
).split(",")
if region.strip()
]
open_proxy = constants.open_proxy
open_driver = constants.open_driver
page_num = constants.hotel_page_num
region_list = constants.hotel_region_list
if "all" in region_list or "ALL" in region_list or "全部" in region_list:
region_list = list(getattr(fofa_map, "region_url").keys())
if open_proxy:

@ -7,7 +7,7 @@ from utils.channel import (
get_multicast_fofa_search_urls,
)
from utils.tools import get_pbar_remaining, get_soup, merge_objects
from utils.config import config
import utils.constants as constants
from updates.proxy import get_proxy, get_proxy_next
from updates.fofa import get_channels_by_fofa
from time import time
@ -34,15 +34,9 @@ async def get_channels_by_multicast(names, callback=None):
channels = {}
pageUrl = "http://tonkiang.us/hoteliptv.php"
proxy = None
open_multicast_tonkiang = config.getboolean(
"Settings", "open_multicast_tonkiang", fallback=True
)
open_multicast_fofa = config.getboolean(
"Settings", "open_multicast_fofa", fallback=True
)
open_proxy = config.getboolean("Settings", "open_proxy", fallback=False)
open_driver = config.getboolean("Settings", "open_driver", fallback=True)
page_num = config.getint("Settings", "multicast_page_num", fallback=3)
open_proxy = constants.open_proxy
open_driver = constants.open_driver
page_num = constants.multicast_page_num
if open_proxy:
proxy = await get_proxy(pageUrl, best=True, with_test=True)
multicast_region_result = get_multicast_region_result_by_rtp_txt(callback=callback)
@ -51,7 +45,7 @@ async def get_channels_by_multicast(names, callback=None):
)
region_type_list = get_channel_multicast_region_type_list(name_region_type_result)
search_region_type_result = defaultdict(lambda: defaultdict(list))
if open_multicast_fofa:
if constants.open_multicast_fofa:
fofa_search_urls = get_multicast_fofa_search_urls()
fofa_result = await get_channels_by_fofa(
fofa_search_urls, multicast=True, callback=callback
@ -159,7 +153,7 @@ async def get_channels_by_multicast(names, callback=None):
)
return {"region": region, "type": type, "data": info_list}
if open_multicast_tonkiang:
if constants.open_multicast_tonkiang:
region_type_list_len = len(region_type_list)
pbar = tqdm_asyncio(total=region_type_list_len, desc="Multicast search")
if callback:

@ -5,7 +5,8 @@ sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../.
from updates.subscribe import get_channels_by_subscribe_urls
from driver.utils import get_soup_driver
from utils.config import resource_path, config
from utils.config import resource_path
import utils.constants as constants
from utils.channel import format_channel_name, get_name_url
from utils.tools import get_pbar_remaining
import json
@ -103,13 +104,7 @@ def get_multicast_region_result_by_rtp_txt(callback=None):
Get multicast region result by rtp txt
"""
rtp_path = resource_path("config/rtp")
config_region_list = set(
region.strip()
for region in config.get(
"Settings", "multicast_region_list", fallback="全部"
).split(",")
if region.strip()
)
config_region_list = constants.multicast_region_list
rtp_file_list = [
filename.rsplit(".", 1)[0]
for filename in os.listdir(rtp_path)

@ -11,7 +11,7 @@ from utils.tools import (
get_soup,
format_url_with_cache,
)
from utils.config import config
import utils.constants as constants
from updates.proxy import get_proxy, get_proxy_next
from time import time
from driver.setup import setup_driver
@ -25,8 +25,6 @@ from tqdm.asyncio import tqdm_asyncio
from concurrent.futures import ThreadPoolExecutor
from requests_custom.utils import get_soup_requests, close_session
timeout = config.getint("Settings", "request_timeout", fallback=10)
async def use_accessible_url(callback):
"""
@ -35,8 +33,8 @@ async def use_accessible_url(callback):
callback(f"正在获取最优的关键字搜索节点", 0)
baseUrl1 = "https://www.foodieguide.com/iptvsearch/"
baseUrl2 = "http://tonkiang.us/"
task1 = create_task(get_speed(baseUrl1, timeout=timeout))
task2 = create_task(get_speed(baseUrl2, timeout=timeout))
task1 = create_task(get_speed(baseUrl1, timeout=constants.request_timeout))
task2 = create_task(get_speed(baseUrl2, timeout=constants.request_timeout))
task_results = await gather(task1, task2)
callback(f"获取关键字搜索节点完成", 100)
if task_results[0] == float("inf") and task_results[1] == float("inf"):
@ -57,9 +55,9 @@ async def get_channels_by_online_search(names, callback=None):
if not pageUrl:
return channels
proxy = None
open_proxy = config.getboolean("Settings", "open_proxy", fallback=False)
open_driver = config.getboolean("Settings", "open_driver", fallback=True)
page_num = config.getint("Settings", "online_search_page_num", fallback=3)
open_proxy = constants.open_proxy
open_driver = constants.open_driver
page_num = constants.online_search_page_num
if open_proxy:
proxy = await get_proxy(pageUrl, best=True, with_test=True)
start_time = time()

@ -3,7 +3,7 @@ from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio
from utils.speed import get_speed
from concurrent.futures import ThreadPoolExecutor
from utils.config import config
import utils.constants as constants
from driver.utils import get_soup_driver
from requests_custom.utils import get_soup_requests, close_session
from utils.retry import retry_func
@ -20,7 +20,7 @@ def get_proxy_list(page_count=1):
]
proxy_list = []
urls = []
open_driver = config.getboolean("Settings", "open_driver", fallback=True)
open_driver = constants.open_driver
for page_index in range(1, page_count + 1):
for pattern in url_pattern:
url = pattern.format(page_index)

@ -1,4 +1,4 @@
from utils.config import config
import utils.constants as constants
from tqdm.asyncio import tqdm_asyncio
from time import time
from requests import Session, exceptions
@ -13,8 +13,6 @@ from utils.tools import (
from concurrent.futures import ThreadPoolExecutor
from collections import defaultdict
timeout = config.getint("Settings", "request_timeout", fallback=10)
async def get_channels_by_subscribe_urls(
urls,
@ -28,12 +26,7 @@ async def get_channels_by_subscribe_urls(
Get the channels by subscribe urls
"""
subscribe_results = {}
subscribe_urls = [
url.strip()
for url in config.get("Settings", "subscribe_urls", fallback="").split(",")
if url.strip()
]
subscribe_urls_len = len(urls if urls else subscribe_urls)
subscribe_urls_len = len(urls if urls else constants.subscribe_urls)
pbar = tqdm_asyncio(
total=subscribe_urls_len,
desc=f"Processing subscribe {'for multicast' if multicast else ''}",
@ -60,11 +53,13 @@ async def get_channels_by_subscribe_urls(
try:
response = (
retry_func(
lambda: session.get(subscribe_url, timeout=timeout),
lambda: session.get(
subscribe_url, timeout=constants.request_timeout
),
name=subscribe_url,
)
if retry
else session.get(subscribe_url, timeout=timeout)
else session.get(subscribe_url, timeout=constants.request_timeout)
)
except exceptions.Timeout:
print(f"Timeout on subscribe: {subscribe_url}")
@ -115,7 +110,7 @@ async def get_channels_by_subscribe_urls(
with ThreadPoolExecutor(max_workers=100) as executor:
futures = [
executor.submit(process_subscribe_channels, subscribe_url)
for subscribe_url in (urls if urls else subscribe_urls)
for subscribe_url in (urls if urls else constants.subscribe_urls)
]
for future in futures:
subscribe_results = merge_objects(subscribe_results, future.result())

@ -1,9 +1,10 @@
from utils.config import config, resource_path
import utils.constants as constants
from utils.constants import get_resolution_value
from utils.tools import (
check_url_by_patterns,
get_total_urls_from_info_list,
process_nested_dict,
get_resolution_value,
add_url_info,
remove_cache_info,
)
@ -107,20 +108,19 @@ def get_channel_items():
"""
Get the channel items from the source file
"""
user_source_file = config.get("Settings", "source_file", fallback="config/demo.txt")
user_source_file = resource_path(constants.source_file)
channels = defaultdict(lambda: defaultdict(list))
open_use_old_result = config.getboolean(
"Settings", "open_use_old_result", fallback=True
)
if os.path.exists(resource_path(user_source_file)):
with open(resource_path(user_source_file), "r", encoding="utf-8") as file:
channels = get_channel_data_from_file(channels, file, open_use_old_result)
if os.path.exists(user_source_file):
with open(user_source_file, "r", encoding="utf-8") as file:
channels = get_channel_data_from_file(
channels, file, constants.open_use_old_result
)
if open_use_old_result:
if constants.open_use_old_result:
result_cache_path = resource_path("output/result_cache.pkl")
if os.path.exists(result_cache_path):
with open(resource_path("output/result_cache.pkl"), "rb") as file:
with open(result_cache_path, "rb") as file:
old_result = pickle.load(file)
for cate, data in channels.items():
if cate in old_result:
@ -132,14 +132,11 @@ def get_channel_items():
return channels
open_keep_all = config.getboolean("Settings", "open_keep_all", fallback=False)
def format_channel_name(name):
"""
Format the channel name with sub and replace and lower
"""
if open_keep_all:
if constants.open_keep_all:
return name
cc = OpenCC("t2s")
name = cc.convert(name)
@ -187,7 +184,7 @@ def channel_name_is_equal(name1, name2):
"""
Check if the channel name is equal
"""
if open_keep_all:
if constants.open_keep_all:
return True
name1_format = format_channel_name(name1)
name2_format = format_channel_name(name2)
@ -265,21 +262,15 @@ def get_channel_multicast_region_type_list(result):
"""
Get the channel multicast region type list from result
"""
config_region_list = set(
region.strip()
for region in config.get(
"Settings", "multicast_region_list", fallback="全部"
).split(",")
if region.strip()
)
region_list = constants.multicast_region_list
region_type_list = {
(region, type)
for region_type in result.values()
for region, types in region_type.items()
if "all" in config_region_list
or "ALL" in config_region_list
or "全部" in config_region_list
or region in config_region_list
if "all" in region_list
or "ALL" in region_list
or "全部" in region_list
or region in region_list
for type in types
}
return list(region_type_list)
@ -290,7 +281,6 @@ def get_channel_multicast_result(result, search_result):
Get the channel multicast info result by result and search result
"""
info_result = {}
open_sort = config.getboolean("Settings", "open_sort", fallback=True)
for name, result_obj in result.items():
info_list = [
(
@ -299,7 +289,7 @@ def get_channel_multicast_result(result, search_result):
f"http://{url}/rtp/{ip}",
f"{result_region}{result_type}组播源|cache:{url}",
)
if open_sort
if constants.open_sort
else add_url_info(
f"http://{url}/rtp/{ip}", f"{result_region}{result_type}组播源"
)
@ -539,7 +529,7 @@ def append_total_data(*args, **kwargs):
"""
Append total channel data
"""
if open_keep_all:
if constants.open_keep_all:
append_all_method_data_keep_all(*args, **kwargs)
else:
append_all_method_data(*args, **kwargs)
@ -567,12 +557,10 @@ def append_all_method_data(
("subscribe", subscribe_result),
("online_search", online_search_result),
]:
if config.getboolean("Settings", f"open_{method}", fallback=None):
if constants.open_method[method]:
if (
method == "hotel_tonkiang" or method == "hotel_fofa"
) and config.getboolean(
"Settings", f"open_hotel", fallback=True
) == False:
) and constants.open_hotel == False:
continue
name_results = get_channel_results_by_name(name, result)
origin_method = (
@ -585,9 +573,7 @@ def append_all_method_data(
)
print(f"{method.capitalize()}:", len(name_results), end=", ")
total_channel_data_len = len(data.get(cate, {}).get(name, []))
if total_channel_data_len == 0 or config.getboolean(
"Settings", "open_use_old_result", fallback=True
):
if total_channel_data_len == 0 or constants.open_use_old_result:
append_data_to_info_data(
data,
cate,
@ -621,14 +607,10 @@ def append_all_method_data_keep_all(
("subscribe", subscribe_result),
("online_search", online_search_result),
]:
if result and config.getboolean(
"Settings", f"open_{method}", fallback=None
):
if result and constants.open_method[method]:
if (
method == "hotel_tonkiang" or method == "hotel_fofa"
) and config.getboolean(
"Settings", f"open_hotel", fallback=True
) == False:
) and constants.open_hotel == False:
continue
origin_method = (
"hotel"
@ -641,9 +623,7 @@ def append_all_method_data_keep_all(
data, cate, name, urls, origin=origin_method
)
print(name, f"{method.capitalize()}:", len(urls), end=", ")
if config.getboolean(
"Settings", "open_use_old_result", fallback=True
):
if constants.open_use_old_result:
old_info_list = channel_obj.get(name, [])
append_data_to_info_data(
data,
@ -699,20 +679,16 @@ async def process_sort_channel_list(data, ipv6=False, callback=None):
"""
Processs the sort channel list
"""
open_ffmpeg = config.getboolean("Settings", "open_ffmpeg", fallback=True)
ipv_type = config.get("Settings", "ipv_type", fallback="全部").lower()
open_filter_resolution = config.getboolean(
"Settings", "open_filter_resolution", fallback=True
open_ipv6 = (
"ipv6" in constants.ipv_type
or "all" in constants.ipv_type
or "全部" in constants.ipv_type
)
min_resolution = get_resolution_value(
config.get("Settings", "min_resolution", fallback="1920x1080")
)
open_ipv6 = "ipv6" in ipv_type or "all" in ipv_type or "全部" in ipv_type
ipv6_proxy = None if not open_ipv6 or ipv6 else "http://www.ipv6proxy.net/go.php?u="
ffmpeg_installed = is_ffmpeg_installed()
if open_ffmpeg and not ffmpeg_installed:
if constants.open_ffmpeg and not ffmpeg_installed:
print("FFmpeg is not installed, using requests for sorting.")
is_ffmpeg = open_ffmpeg and ffmpeg_installed
is_ffmpeg = constants.open_ffmpeg and ffmpeg_installed
semaphore = asyncio.Semaphore(5)
need_sort_data = copy.deepcopy(data)
process_nested_dict(need_sort_data, seen=set(), flag=r"cache:(.*)")
@ -725,8 +701,8 @@ async def process_sort_channel_list(data, ipv6=False, callback=None):
semaphore,
ffmpeg=is_ffmpeg,
ipv6_proxy=ipv6_proxy,
filter_resolution=open_filter_resolution,
min_resolution=min_resolution,
filter_resolution=constants.open_filter_resolution,
min_resolution=constants.min_resolution_value,
callback=callback,
)
)
@ -763,9 +739,9 @@ async def process_sort_channel_list(data, ipv6=False, callback=None):
response_time, resolution = cache
if response_time and response_time != float("inf"):
if resolution:
if open_filter_resolution:
if constants.open_filter_resolution:
resolution_value = get_resolution_value(resolution)
if resolution_value < min_resolution:
if resolution_value < constants.min_resolution_value:
continue
url = add_url_info(url, resolution)
append_data_to_info_data(
@ -785,18 +761,13 @@ def write_channel_to_file(items, data, ipv6=False, callback=None):
"""
Write channel to file
"""
open_update_time = config.getboolean("Settings", "open_update_time", fallback=True)
if open_update_time:
if constants.open_update_time:
now = datetime.datetime.now()
if os.environ.get("GITHUB_ACTIONS"):
now += datetime.timedelta(hours=8)
update_time = now.strftime("%Y-%m-%d %H:%M:%S")
update_channel_urls_txt("更新时间", f"{update_time}", ["url"])
result_items = (
data.items()
if config.getboolean("Settings", "open_keep_all", fallback=False)
else items
)
result_items = data.items() if constants.open_keep_all else items
for cate, channel_obj in result_items:
print(f"\n{cate}:", end=" ")
channel_obj_keys = channel_obj.keys()
@ -830,25 +801,19 @@ def get_multicast_fofa_search_urls():
"""
Get the fofa search urls for multicast
"""
config_region_list = [
region.strip()
for region in config.get(
"Settings", "multicast_region_list", fallback="全部"
).split(",")
if region.strip()
]
rtp_file_names = []
for filename in os.listdir(resource_path("config/rtp")):
if filename.endswith(".txt") and "_" in filename:
filename = filename.replace(".txt", "")
rtp_file_names.append(filename)
region_list = constants.multicast_region_list
region_type_list = [
(parts[0], parts[1])
for name in rtp_file_names
if (parts := name.split("_"))[0] in config_region_list
or "all" in config_region_list
or "ALL" in config_region_list
or "全部" in config_region_list
if (parts := name.split("_"))[0] in region_list
or "all" in region_list
or "ALL" in region_list
or "全部" in region_list
]
search_urls = []
for region, type in region_type_list:

186
utils/constants.py Normal file

@ -0,0 +1,186 @@
from utils.config import config
import re
def get_resolution_value(resolution_str):
"""
Get resolution value from string
"""
pattern = r"(\d+)[xX*](\d+)"
match = re.search(pattern, resolution_str)
if match:
width, height = map(int, match.groups())
return width * height
else:
return 0
open_update = config.getboolean("Settings", "open_update", fallback=True)
open_filter_resolution = config.getboolean(
"Settings", "open_filter_resolution", fallback=True
)
ipv_type = config.get("Settings", "ipv_type", fallback="全部").lower()
ipv_type_prefer = [
type.strip().lower()
for type in config.get("Settings", "ipv_type_prefer", fallback="ipv4").split(",")
]
ipv4_num = config.getint("Settings", "ipv4_num", fallback=15)
ipv6_num = config.getint("Settings", "ipv6_num", fallback=15)
ipv_limit = {
"ipv4": ipv4_num,
"ipv6": ipv6_num,
}
origin_type_prefer = [
origin.strip().lower()
for origin in config.get(
"Settings",
"origin_type_prefer",
fallback="subscribe,hotel,multicast,online_search",
).split(",")
if origin.strip().lower()
]
hotel_num = config.getint("Settings", "hotel_num", fallback=10)
multicast_num = config.getint("Settings", "multicast_num", fallback=10)
subscribe_num = config.getint("Settings", "subscribe_num", fallback=10)
online_search_num = config.getint("Settings", "online_search_num", fallback=10)
source_limits = {
"hotel": hotel_num,
"multicast": multicast_num,
"subscribe": subscribe_num,
"online_search": online_search_num,
}
min_resolution = config.get("Settings", "min_resolution", fallback="1920x1080")
min_resolution_value = get_resolution_value(
config.get("Settings", "min_resolution", fallback="1920x1080")
)
urls_limit = config.getint("Settings", "urls_limit", fallback=30)
open_url_info = config.getboolean("Settings", "open_url_info", fallback=True)
recent_days = config.getint("Settings", "recent_days", fallback=30)
domain_blacklist = [
domain.strip()
for domain in config.get("Settings", "domain_blacklist", fallback="").split(",")
if domain.strip()
]
url_keywords_blacklist = [
keyword.strip()
for keyword in config.get("Settings", "url_keywords_blacklist", fallback="").split(
","
)
if keyword.strip()
]
source_file = config.get("Settings", "source_file", fallback="config/demo.txt")
final_file = config.get("Settings", "final_file", fallback="output/result.txt")
open_m3u_result = config.getboolean("Settings", "open_m3u_result", fallback=True)
open_keep_all = config.getboolean("Settings", "open_keep_all", fallback=False)
open_subscribe = config.getboolean("Settings", f"open_subscribe", fallback=True)
open_hotel = config.getboolean("Settings", f"open_hotel", fallback=True)
open_hotel_fofa = config.getboolean("Settings", f"open_hotel_fofa", fallback=True)
open_hotel_tonkiang = config.getboolean(
"Settings", f"open_hotel_tonkiang", fallback=True
)
open_multicast = config.getboolean("Settings", f"open_multicast", fallback=True)
open_multicast_tonkiang = config.getboolean(
"Settings", "open_multicast_tonkiang", fallback=True
)
open_multicast_fofa = config.getboolean(
"Settings", "open_multicast_fofa", fallback=True
)
open_online_search = config.getboolean("Settings", f"open_online_search", fallback=True)
open_method = {
"subscribe": open_subscribe,
"hotel": open_hotel,
"multicast": open_multicast,
"online_search": open_online_search,
"hotel_fofa": open_hotel_fofa,
"hotel_tonkiang": open_hotel_tonkiang,
"multicast_fofa": open_multicast_fofa,
"multicast_tonkiang": open_multicast_tonkiang,
}
open_use_old_result = config.getboolean(
"Settings", "open_use_old_result", fallback=True
)
open_sort = config.getboolean("Settings", "open_sort", fallback=True)
open_ffmpeg = config.getboolean("Settings", "open_ffmpeg", fallback=True)
ipv_type = config.get("Settings", "ipv_type", fallback="全部").lower()
open_update_time = config.getboolean("Settings", "open_update_time", fallback=True)
multicast_region_list = [
region.strip()
for region in config.get(
"Settings", "multicast_region_list", fallback="全部"
).split(",")
if region.strip()
]
hotel_region_list = [
region.strip()
for region in config.get("Settings", "hotel_region_list", fallback="全部").split(
","
)
if region.strip()
]
request_timeout = config.getint("Settings", "request_timeout", fallback=10)
sort_timeout = config.getint("Settings", "sort_timeout", fallback=10)
open_proxy = config.getboolean("Settings", "open_proxy", fallback=False)
open_driver = config.getboolean("Settings", "open_driver", fallback=True)
hotel_page_num = config.getint("Settings", "hotel_page_num", fallback=1)
multicast_page_num = config.getint("Settings", "multicast_page_num", fallback=1)
online_search_page_num = config.getint("Settings", "online_search_page_num", fallback=1)
subscribe_urls = [
url.strip()
for url in config.get("Settings", "subscribe_urls", fallback="").split(",")
if url.strip()
]
response_time_weight = config.getfloat("Settings", "response_time_weight", fallback=0.5)
resolution_weight = config.getfloat("Settings", "resolution_weight", fallback=0.5)
open_update_time = config.getboolean("Settings", "open_update_time", fallback=True)
open_url_info = config.getboolean("Settings", "open_url_info", fallback=True)

@ -2,10 +2,9 @@ from time import sleep
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
from utils.config import config
import utils.constants as constants
max_retries = 2
timeout = config.getint("Settings", "request_timeout", fallback=10)
def retry_func(func, retries=max_retries, name=""):
@ -26,7 +25,9 @@ def retry_func(func, retries=max_retries, name=""):
raise Exception(f"Failed to connect to the {name} reached the maximum retries.")
def locate_element_with_retry(driver, locator, timeout=timeout, retries=max_retries):
def locate_element_with_retry(
driver, locator, timeout=constants.request_timeout, retries=max_retries
):
"""
Locate the element with retry
"""
@ -40,7 +41,7 @@ def locate_element_with_retry(driver, locator, timeout=timeout, retries=max_retr
def find_clickable_element_with_retry(
driver, locator, timeout=timeout, retries=max_retries
driver, locator, timeout=constants.request_timeout, retries=max_retries
):
"""
Find the clickable element with retry

@ -3,7 +3,8 @@ from time import time
import asyncio
import re
from utils.config import config
from utils.tools import is_ipv6, get_resolution_value, add_url_info, remove_cache_info
from utils.constants import get_resolution_value
from utils.tools import is_ipv6, add_url_info, remove_cache_info
import subprocess
timeout = config.getint("Settings", "sort_timeout", fallback=5)
@ -127,20 +128,23 @@ async def get_speed_by_info(
url, _, resolution, _ = url_info
url_info = list(url_info)
cache_key = None
url_is_ipv6 = is_ipv6(url)
if "$" in url:
url, cache_info = url.split("$", 1)
matcher = re.search(r"cache:(.*)", cache_info)
if matcher:
cache_key = matcher.group(1)
url_show_info = remove_cache_info(cache_info)
url_is_ipv6 = is_ipv6(url)
if url_is_ipv6:
url = add_url_info(url, "IPv6")
url_info[0] = url
if cache_key in speed_cache:
speed = speed_cache[cache_key][0]
url_info[2] = speed_cache[cache_key][1]
return (tuple(url_info), speed) if speed != float("inf") else float("inf")
if speed != float("inf"):
if url_show_info:
url_info[0] = add_url_info(url, url_show_info)
return (tuple(url_info), speed)
else:
return float("inf")
try:
if ipv6_proxy and url_is_ipv6:
url = ipv6_proxy + url

@ -5,7 +5,9 @@ import urllib.parse
import ipaddress
from urllib.parse import urlparse
import socket
from utils.config import config, resource_path
from utils.config import resource_path
from utils.constants import get_resolution_value
import utils.constants as constants
import re
from bs4 import BeautifulSoup
from flask import render_template_string, send_file
@ -70,7 +72,7 @@ def filter_by_date(data):
Filter by date and limit
"""
default_recent_days = 30
use_recent_days = config.getint("Settings", "recent_days", fallback=30)
use_recent_days = constants.recent_days
if not isinstance(use_recent_days, int) or use_recent_days <= 0:
use_recent_days = default_recent_days
start_date = datetime.datetime.now() - datetime.timedelta(days=use_recent_days)
@ -87,11 +89,10 @@ def filter_by_date(data):
else:
unrecent_data.append(item)
recent_data_len = len(recent_data)
urls_limit = config.getint("Settings", "urls_limit", fallback=30)
if recent_data_len == 0:
recent_data = unrecent_data
elif recent_data_len < urls_limit:
recent_data.extend(unrecent_data[: urls_limit - len(recent_data)])
elif recent_data_len < constants.urls_limit:
recent_data.extend(unrecent_data[: constants.urls_limit - len(recent_data)])
return recent_data
@ -109,116 +110,78 @@ def get_soup(source):
return soup
def get_resolution_value(resolution_str):
"""
Get resolution value from string
"""
pattern = r"(\d+)[xX*](\d+)"
match = re.search(pattern, resolution_str)
if match:
width, height = map(int, match.groups())
return width * height
else:
return 0
def get_total_urls_from_info_list(infoList, ipv6=False):
"""
Get the total urls from info list
"""
open_filter_resolution = config.getboolean(
"Settings", "open_filter_resolution", fallback=True
)
ipv_type_prefer = [
type.strip().lower()
for type in config.get("Settings", "ipv_type_prefer", fallback="ipv4").split(
","
)
]
ipv_limit = {
"ipv4": config.getint("Settings", "ipv4_num", fallback=15),
"ipv6": config.getint("Settings", "ipv6_num", fallback=15),
}
origin_type_prefer = [
origin.strip().lower()
for origin in config.get(
"Settings",
"origin_type_prefer",
fallback="subscribe,hotel,multicast,online_search",
).split(",")
]
source_limits = {
"hotel": config.getint("Settings", "hotel_num", fallback=10),
"multicast": config.getint("Settings", "multicast_num", fallback=10),
"subscribe": config.getint("Settings", "subscribe_num", fallback=10),
"online_search": config.getint("Settings", "online_search_num", fallback=10),
}
min_resolution = get_resolution_value(
config.get("Settings", "min_resolution", fallback="1920x1080")
)
ipv_type_prefer = list(constants.ipv_type_prefer)
if "自动" in ipv_type_prefer or "auto" in ipv_type_prefer or not ipv_type_prefer:
ipv_type_prefer = ["ipv6", "ipv4"] if ipv6 else ["ipv4", "ipv6"]
origin_type_prefer = constants.origin_type_prefer
categorized_urls = {
origin: {"ipv4": [], "ipv6": []} for origin in origin_type_prefer
}
for url, _, resolution, origin in infoList:
if open_filter_resolution and resolution:
if constants.open_filter_resolution and resolution:
resolution_value = get_resolution_value(resolution)
if resolution_value < min_resolution:
if resolution_value < constants.min_resolution_value:
continue
if not origin or origin.lower() not in origin_type_prefer:
if not origin or (origin.lower() not in origin_type_prefer):
continue
if origin == "subscribe" and "/rtp/" in url:
origin = "multicast"
if (
("ipv6" in ipv_type_prefer)
or "自动" in ipv_type_prefer
or "random" in ipv_type_prefer
) and "IPv6" in url:
url_is_ipv6 = is_ipv6(url)
if url_is_ipv6:
url += "|IPv6"
if url_is_ipv6:
categorized_urls[origin]["ipv6"].append(url)
else:
categorized_urls[origin]["ipv4"].append(url)
total_urls = []
ipv_num = {
"ipv4": 0,
"ipv6": 0,
}
if "自动" in ipv_type_prefer or "auto" in ipv_type_prefer:
ipv_type_prefer = ["ipv6", "ipv4"] if ipv6 else ["ipv4", "ipv6"]
urls_limit = constants.urls_limit
for origin in origin_type_prefer:
if len(total_urls) >= urls_limit:
break
for ipv_type in ipv_type_prefer:
if ipv_num[ipv_type] < ipv_limit[ipv_type]:
urls = categorized_urls[origin][ipv_type][: source_limits[origin]]
if len(total_urls) >= urls_limit:
break
if ipv_num[ipv_type] < constants.ipv_limit[ipv_type]:
limit = min(
constants.source_limits[origin] - ipv_num[ipv_type],
constants.ipv_limit[ipv_type] - ipv_num[ipv_type],
)
urls = categorized_urls[origin][ipv_type][:limit]
total_urls.extend(urls)
ipv_num[ipv_type] += len(urls)
else:
continue
urls_limit = config.getint("Settings", "urls_limit", fallback=30)
ipv_type_total = list(dict.fromkeys(ipv_type_prefer + ["ipv4", "ipv6"]))
if len(total_urls) < urls_limit:
for origin in origin_type_prefer:
for ipv_type in ipv_type_total:
if len(total_urls) < urls_limit:
extra_urls = categorized_urls[origin][ipv_type][
: source_limits[origin]
]
total_urls.extend(extra_urls)
total_urls = list(dict.fromkeys(total_urls))[:urls_limit]
ipv_num[ipv_type] += urls_limit - len(total_urls)
if len(total_urls) >= urls_limit:
break
if len(total_urls) >= urls_limit:
break
for ipv_type in ipv_type_total:
if len(total_urls) >= urls_limit:
break
extra_urls = categorized_urls[origin][ipv_type][
: constants.source_limits[origin]
]
total_urls.extend(extra_urls)
total_urls = list(dict.fromkeys(total_urls))[:urls_limit]
total_urls = list(dict.fromkeys(total_urls))[:urls_limit]
open_url_info = config.getboolean("Settings", "open_url_info", fallback=True)
if not open_url_info:
if not constants.open_url_info:
return [url.split("$", 1)[0] for url in total_urls]
else:
return total_urls
@ -229,12 +192,11 @@ def get_total_urls_from_sorted_data(data):
Get the total urls with filter by date and depulicate from sorted data
"""
total_urls = []
urls_limit = config.getint("Settings", "urls_limit", fallback=30)
if len(data) > urls_limit:
if len(data) > constants.urls_limit:
total_urls = [url for (url, _, _, _), _ in filter_by_date(data)]
else:
total_urls = [url for (url, _, _, _), _ in data]
return list(dict.fromkeys(total_urls))[:urls_limit]
return list(dict.fromkeys(total_urls))[: constants.urls_limit]
def is_ipv6(url):
@ -266,14 +228,12 @@ def check_ipv6_support():
return False
ipv_type = config.get("Settings", "ipv_type", fallback="全部").lower()
def check_url_ipv_type(url):
"""
Check if the url is compatible with the ipv type in the config
"""
ipv6 = is_ipv6(url)
ipv_type = constants.ipv_type
return (
(ipv_type == "ipv4" and not ipv6)
or (ipv_type == "ipv6" and ipv6)
@ -286,12 +246,10 @@ def check_by_domain_blacklist(url):
"""
Check by domain blacklist
"""
domain_blacklist = [
(parsed_domain.netloc if parsed_domain.scheme else stripped_domain)
for domain in config.get("Settings", "domain_blacklist", fallback="").split(",")
if (stripped_domain := domain.strip())
and (parsed_domain := urlparse(stripped_domain))
]
domain_blacklist = {
(urlparse(domain).netloc if urlparse(domain).scheme else domain)
for domain in constants.domain_blacklist
}
return urlparse(url).netloc not in domain_blacklist
@ -299,14 +257,7 @@ def check_by_url_keywords_blacklist(url):
"""
Check by URL blacklist keywords
"""
url_keywords_blacklist = [
keyword.strip()
for keyword in config.get(
"Settings", "url_keywords_blacklist", fallback=""
).split(",")
if keyword.strip()
]
return not any(keyword in url for keyword in url_keywords_blacklist)
return not any(keyword in url for keyword in constants.url_keywords_blacklist)
def check_url_by_patterns(url):
@ -379,9 +330,9 @@ def convert_to_m3u():
"""
Convert result txt to m3u format
"""
user_final_file = config.get("Settings", "final_file", fallback="output/result.txt")
if os.path.exists(resource_path(user_final_file)):
with open(resource_path(user_final_file), "r", encoding="utf-8") as file:
user_final_file = resource_path(constants.final_file)
if os.path.exists(user_final_file):
with open(user_final_file, "r", encoding="utf-8") as file:
m3u_output = '#EXTM3U x-tvg-url="https://live.fanmingming.com/e.xml"\n'
current_group = None
for line in file:
@ -406,7 +357,7 @@ def convert_to_m3u():
if current_group:
m3u_output += f' group-title="{current_group}"'
m3u_output += f",{original_channel_name}\n{channel_link}\n"
m3u_file_path = os.path.splitext(resource_path(user_final_file))[0] + ".m3u"
m3u_file_path = os.path.splitext(user_final_file)[0] + ".m3u"
with open(m3u_file_path, "w", encoding="utf-8") as m3u_file:
m3u_file.write(m3u_output)
print(f"Result m3u file generated at: {m3u_file_path}")
@ -416,9 +367,9 @@ def get_result_file_content(show_result=False):
"""
Get the content of the result file
"""
user_final_file = config.get("Settings", "final_file", fallback="output/result.txt")
if config.getboolean("Settings", "open_m3u_result", fallback=True):
user_final_file = os.path.splitext(resource_path(user_final_file))[0] + ".m3u"
user_final_file = resource_path(constants.final_file)
if constants.open_m3u_result:
user_final_file = os.path.splitext(user_final_file)[0] + ".m3u"
if show_result == False:
return send_file(user_final_file, as_attachment=True)
with open(user_final_file, "r", encoding="utf-8") as file: