Merge pull request from Guovin/master

Refactor
This commit is contained in:
Govin 2024-07-08 10:19:26 +08:00 committed by GitHub
commit 692f238ab2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 1259 additions and 1130 deletions

@@ -2,7 +2,7 @@ name: 'update schedule'
on:
schedule:
- cron: '0 0 * * *'
- cron: '0 22 * * *'
workflow_dispatch:
branches:
- master

@@ -13,7 +13,7 @@ RUN sed -i 's@deb.debian.org@mirrors.aliyun.com@g' /etc/apt/sources.list \
RUN apt-get update && apt-get install -y chromium chromium-driver cron
RUN (crontab -l 2>/dev/null; echo "0 0 * * * cd /app && pipenv run python main.py scheduled_task") | crontab -
RUN (crontab -l 2>/dev/null; echo "0 22 * * * cd /app && pipenv run python main.py scheduled_task") | crontab -
EXPOSE 8000
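Reviewer note: both the workflow and the container cron run in UTC, so moving the schedule from '0 0 * * *' to '0 22 * * *' shifts the daily update from 08:00 to 06:00 Beijing time (fired at 22:00 UTC the previous day). A standalone sanity check of the conversion, not part of the repo:
from datetime import datetime, timedelta, timezone

utc_run = datetime(2024, 7, 8, 22, 0, tzinfo=timezone.utc)  # the new cron fires at 22:00 UTC
beijing = utc_run.astimezone(timezone(timedelta(hours=8)))   # Beijing is UTC+8, no DST
print(beijing.strftime("%Y-%m-%d %H:%M"))                    # 2024-07-09 06:00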

@@ -14,7 +14,6 @@ requests = "*"
feedparser = "*"
pytz = "*"
selenium = "*"
selenium-stealth = "*"
bs4 = "*"
tqdm = "*"
async-timeout = "*"

10
Pipfile.lock generated

@@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
"sha256": "513ed2d5439904fe7a8043886b946726a4e704a202910667678927224f65e98e"
"sha256": "f2e414a925be2ea62fcf7660090d518ea80826d1e510d62732a75b5894b58730"
},
"pipfile-spec": 6,
"requires": {
@@ -860,14 +860,6 @@
"markers": "python_version >= '3.8'",
"version": "==4.22.0"
},
"selenium-stealth": {
"hashes": [
"sha256:b62da5452aa4a84f29a4dfb21a9696aff20788a7c570dd0b81bc04a940848b97"
],
"index": "aliyun",
"markers": "python_version >= '3' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'",
"version": "==1.0.6"
},
"setuptools": {
"hashes": [
"sha256:937a48c7cdb7a21eb53cd7f9b59e525503aa8abaf3584c730dc5f7a5bec3a650",

@@ -9,7 +9,7 @@
- 自定义模板,生成您想要的频道分类与频道顺序
- 支持多种获取源方式:线上检索、组播源、酒店源、订阅源
- 接口测速验效,响应时间、分辨率优先级,过滤无效接口
- 定时执行,北京时间每日 8:00 执行更新
- 定时执行,北京时间每日 6:00 执行更新
- 支持多种运行方式:工作流、命令行、界面软件、Docker
- 更多功能请见[配置参数](./docs/config.md)
@@ -62,22 +62,6 @@ Fork 本项目并开启工作流更新
[更新日志](./CHANGELOG.md)
## 免责声明
本项目是为了提供编程学习和研究的资源。项目中收集的数据来源于网络,开发者不对数据的准确性、完整性或可靠性做任何保证。
开发者不对任何可能因使用这些代码或数据而产生的任何直接或间接损失负责。使用者应自行判断其使用的合法性和风险。
本项目的代码和数据仅供学习和研究使用,不得用于任何商业用途。任何人或组织在使用时,应遵守相关法律法规,尊重并保护开发者的权益。
如果您使用了本项目的代码或数据,即表示您已了解并同意此免责声明。如果您不同意此免责声明,您应立即停止使用本项目的代码和数据。
此外,本项目的代码和数据可能会不定期进行更新,但不保证更新的及时性和准确性,也不保证代码的稳定性和功能性。
在任何情况下,因使用或无法使用本项目的代码或数据所产生的任何损害或其他责任,开发者和任何贡献者都不承担任何责任。
使用本项目的代码或数据即表示您已经了解并接受这些条款。
## 许可证
[MIT](./LICENSE) License © 2024-PRESENT [Govin](https://github.com/guovin)

@@ -9,7 +9,7 @@ Customize channel menus and automatically obtain and update the latest live sour
- Custom templates for creating desired channel categories and order
- Supports multiple source acquisition methods: online search, multicast source, hotel source, subscription source
- Interface speed testing and verification, with priority on response time and resolution, filtering out ineffective interfaces
- Scheduled execution at 8:00 AM Beijing time daily
- Scheduled execution at 6:00 AM Beijing time daily
- Supports various execution methods: workflows, command line, GUI software, Docker
- For more features, see [Config parameter](./docs/config_en.md)
@@ -62,22 +62,6 @@ If you don't want to bother, and my configuration just meets your needs, you can
[Changelog](./CHANGELOG.md)
## Disclaimer
This project provides resources for programming learning and research. The data collected in this project comes from the public internet, and the developer makes no guarantee of its accuracy, completeness, or reliability.
The developer is not responsible for any direct or indirect losses that may arise from the use of this code or data. Users should assess the legality and risks of their own use.
The code and data of this project are for learning and research only and must not be used for any commercial purpose. Any person or organization using them must comply with relevant laws and regulations and respect and protect the rights and interests of the developer.
By using the code or data of this project, you acknowledge that you have read and agreed to this disclaimer. If you do not agree, stop using the code and data of this project immediately.
In addition, the code and data of this project may be updated from time to time, with no guarantee of the timeliness or accuracy of updates, nor of the stability or functionality of the code.
Under no circumstances shall the developer or any contributor be liable for any damage or other liability arising from the use of, or inability to use, the code or data of this project.
Using the code or data of this project indicates that you have understood and accepted these terms.
## License
[MIT](./LICENSE) License © 2024-PRESENT [Govin](https://github.com/guovin)

@@ -176,12 +176,12 @@ https://mirror.ghproxy.com/raw.githubusercontent.com/您的github用户名/仓
如果访问该链接能正常返回更新后的接口内容,说明您的直播源接口链接已经大功告成了!将该链接复制粘贴到 TVBox 等软件配置栏中即可使用~
- 注意:除了首次执行工作流需要您手动触发,后续执行(默认北京时间每日 8:00)将自动触发。如果您修改了模板或配置文件,想立刻执行更新,可手动触发2中的 Run workflow 即可。
- 注意:除了首次执行工作流需要您手动触发,后续执行(默认北京时间每日 6:00)将自动触发。如果您修改了模板或配置文件,想立刻执行更新,可手动触发2中的 Run workflow 即可。
## 步骤七:修改工作流更新频率
![.github/workflows/main.yml](./images/schedule-cron.png '.github/workflows/main.yml')
如果您想修改更新频率(默认北京时间每日 8:00),可修改 on:schedule:- cron 字段。
如果您想修改更新频率(默认北京时间每日 6:00),可修改 on:schedule:- cron 字段。
### 1. 强烈不建议修改,因为短时间内的接口内容并无差异,过高的更新频率与高耗时运行的工作流都有可能被判定为资源滥用,导致仓库与账户被封禁的风险。

@@ -176,12 +176,12 @@ https://mirror.ghproxy.com/raw.githubusercontent.com/your github username/reposi
If you can access this link and it returns the updated interface content, then your live source interface link has been successfully created! Simply copy and paste this link into software like TVBox in the configuration field to use~
- Note: Except for the first execution of the workflow, which requires you to manually trigger it, subsequent executions (default: daily at 8:00 am Beijing time) will be automatically triggered. If you have modified the template or configuration files and want to execute the update immediately, you can manually trigger (2) Run workflow.
- Note: Except for the first execution of the workflow, which requires you to manually trigger it, subsequent executions (default: daily at 6:00 am Beijing time) will be automatically triggered. If you have modified the template or configuration files and want to execute the update immediately, you can manually trigger (2) Run workflow.
## Step 7: Modify Workflow Update Frequency
![.github/workflows/main.yml](./images/schedule-cron.png '.github/workflows/main.yml')
If you want to modify the update frequency (default: daily at 8:00 am Beijing time), you can modify the on:schedule:- cron field.
If you want to modify the update frequency (default: daily at 6:00 am Beijing time), you can modify the on:schedule:- cron field.
### 1. Modifying this is strongly discouraged: the interface content barely changes over short periods, and both overly frequent updates and long-running workflows may be judged as resource abuse, putting the repository and account at risk of being banned.

0
driver/__init__.py Normal file

28
driver/setup.py Normal file

@@ -0,0 +1,28 @@
from selenium import webdriver
from utils.config import get_config
config = get_config()
def setup_driver(proxy=None):
"""
Setup the driver for selenium
"""
options = webdriver.ChromeOptions()
options.add_argument("start-maximized")
options.add_argument("--headless")
options.add_argument("--disable-gpu")
options.add_experimental_option("excludeSwitches", ["enable-logging"])
options.add_experimental_option("useAutomationExtension", False)
options.add_argument("blink-settings=imagesEnabled=false")
options.add_argument("--log-level=3")
options.add_argument("--ignore-certificate-errors")
options.add_argument("--allow-running-insecure-content")
options.add_argument("blink-settings=imagesEnabled=false")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
options.add_argument("--disable-extensions")
if proxy:
options.add_argument("--proxy-server=%s" % proxy)
driver = webdriver.Chrome(options=options)
return driver
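Reviewer note: a minimal usage sketch of the new helper, assuming Chromium and chromedriver are installed as in the Dockerfile; the proxy URL is a hypothetical example.
from driver.setup import setup_driver

driver = setup_driver(proxy="http://127.0.0.1:8080")  # proxy is optional; omit it to connect directly
try:
    driver.get("https://example.com")
    print(driver.title)
finally:
    driver.quit()  # always release the headless Chromium process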

1
fofa/__init__.py Normal file

@@ -0,0 +1 @@
from .request import get_channels_by_fofa

114
fofa/request.py Normal file

@@ -0,0 +1,114 @@
from utils.config import get_config
from tqdm.asyncio import tqdm_asyncio
from time import time
from requests import get
from concurrent.futures import ThreadPoolExecutor
import fofa_map
from driver.setup import setup_driver
import re
from utils.retry import retry_func
from utils.channel import format_channel_name
from utils.tools import merge_objects, get_pbar_remaining
from proxy import get_proxy
config = get_config()
timeout = 10
def get_fofa_urls_from_region_list():
"""
Get the FOFA url from region
"""
region_list = getattr(config, "region_list", [])
urls = []
region_url = getattr(fofa_map, "region_url")
if "all" in region_list:
urls = [url for url in region_url.values() if url]
else:
for region in region_list:
if region in region_url:
urls.append(region_url[region])
return urls
async def get_channels_by_fofa(callback):
"""
Get the channel by FOFA
"""
fofa_urls = get_fofa_urls_from_region_list()
fofa_urls_len = len(fofa_urls)
pbar = tqdm_asyncio(total=fofa_urls_len, desc="Processing multicast")
start_time = time()
fofa_results = {}
callback(f"正在获取组播源更新, 共{fofa_urls_len}个地区", 0)
proxy = None
if config.open_proxy:
proxy = await get_proxy(fofa_urls[0], best=True, with_test=True)
driver = setup_driver(proxy)
def process_fofa_channels(fofa_url, fofa_urls_len):
try:
retry_func(lambda: driver.get(fofa_url), name=fofa_url)
fofa_source = re.sub(r"<!--.*?-->", "", driver.page_source, flags=re.DOTALL)
urls = set(re.findall(r"https?://[\w\.-]+:\d+", fofa_source))
with ThreadPoolExecutor(max_workers=100) as executor:
futures = [executor.submit(process_fofa_json_url, url) for url in urls]
for future in futures:
merge_objects(fofa_results, future.result())
except Exception as e:
print(e)
finally:
pbar.update()
remain = fofa_urls_len - pbar.n
callback(
f"正在获取组播源更新, 剩余{remain}个地区待获取, 预计剩余时间: {get_pbar_remaining(pbar, start_time)}",
int((pbar.n / fofa_urls_len) * 100),
)
if config.open_online_search and pbar.n / fofa_urls_len == 1:
callback("正在获取在线搜索结果, 请耐心等待", 0)
for fofa_url in fofa_urls:
process_fofa_channels(fofa_url, fofa_urls_len)
driver.quit()
pbar.close()
return fofa_results
def process_fofa_json_url(url):
"""
Process the FOFA json url
"""
channels = {}
try:
final_url = url + "/iptv/live/1000.json?key=txiptv"
# response = retry_func(
# lambda: get(final_url, timeout=timeout),
# name=final_url,
# )
response = get(final_url, timeout=timeout)
try:
json_data = response.json()
if json_data["code"] == 0:
try:
for item in json_data["data"]:
if isinstance(item, dict):
item_name = format_channel_name(item.get("name"))
item_url = item.get("url").strip()
if item_name and item_url:
total_url = url + item_url
if item_name not in channels:
channels[item_name] = [total_url]
else:
channels[item_name].append(total_url)
except Exception as e:
# print(f"Error on fofa: {e}")
pass
except Exception as e:
# print(f"{url}: {e}")
pass
except Exception as e:
# print(f"{url}: {e}")
pass
finally:
return channels
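Reviewer note: each discovered host is probed at /iptv/live/1000.json?key=txiptv and the payload is flattened into a name → URL-list dict. A sketch of that mapping with an illustrative host and payload (not real data):
from utils.channel import format_channel_name

base = "http://1.2.3.4:8080"  # hypothetical host taken from the FOFA result page
payload = {"code": 0, "data": [{"name": "CCTV-1 综合", "url": "/hls/1/index.m3u8"}]}

channels = {}
for item in payload["data"]:
    name = format_channel_name(item["name"])  # normalizes to "cctv1"
    channels.setdefault(name, []).append(base + item["url"])
print(channels)  # {'cctv1': ['http://1.2.3.4:8080/hls/1/index.m3u8']}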

243
main.py

@@ -1,22 +1,20 @@
import asyncio
from utils import (
from utils.config import get_config
from utils.channel import (
get_channel_items,
update_channel_urls_txt,
append_data_to_info_data,
append_all_method_data,
sort_channel_list,
write_channel_to_file,
)
from utils.tools import (
update_file,
sort_urls_by_speed_and_resolution,
get_total_urls_from_info_list,
get_channels_by_subscribe_urls,
check_url_by_patterns,
get_channels_by_fofa,
get_channels_by_online_search,
format_channel_name,
resource_path,
load_external_config,
get_pbar_remaining,
get_ip_address,
)
import logging
from logging.handlers import RotatingFileHandler
from subscribe import get_channels_by_subscribe_urls
from fofa import get_channels_by_fofa
from online_search import get_channels_by_online_search
import os
from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio
@@ -24,13 +22,7 @@ from time import time
from flask import Flask, render_template_string
import sys
config_path = resource_path("user_config.py")
default_config_path = resource_path("config.py")
config = (
load_external_config("user_config.py")
if os.path.exists(config_path)
else load_external_config("config.py")
)
config = get_config()
app = Flask(__name__)
@@ -49,146 +41,43 @@ class UpdateSource:
self.run_ui = False
self.tasks = []
self.channel_items = get_channel_items()
self.results = {}
self.subscribe_result = {}
self.multicast_result = {}
self.online_search_result = {}
self.channel_data = {}
self.pbar = None
self.total = 0
self.start_time = None
def check_info_data(self, cate, name):
if self.channel_data.get(cate) is None:
self.channel_data[cate] = {}
if self.channel_data[cate].get(name) is None:
self.channel_data[cate][name] = []
def append_data_to_info_data(self, cate, name, data, check=True):
self.check_info_data(cate, name)
for url, date, resolution in data:
if (url and not check) or (url and check and check_url_by_patterns(url)):
self.channel_data[cate][name].append((url, date, resolution))
async def sort_channel_list(self, cate, name, info_list):
try:
sorted_data = await sort_urls_by_speed_and_resolution(info_list)
if sorted_data:
self.check_info_data(cate, name)
self.channel_data[cate][name] = []
for (
url,
date,
resolution,
), response_time in sorted_data:
logging.info(
f"Name: {name}, URL: {url}, Date: {date}, Resolution: {resolution}, Response Time: {response_time}ms"
)
data = [
(url, date, resolution)
for (url, date, resolution), _ in sorted_data
]
self.append_data_to_info_data(cate, name, data, False)
except Exception as e:
logging.error(f"Error: {e}")
finally:
self.pbar.update()
self.pbar.set_description(
f"Sorting, {self.pbar.total - self.pbar.n} channels remaining"
)
self.update_progress(
f"正在测速排序, 剩余{self.pbar.total - self.pbar.n}个频道, 预计剩余时间: {get_pbar_remaining(self.pbar, self.start_time)}",
int((self.pbar.n / self.total) * 100),
)
def process_channel(self):
for cate, channel_obj in self.channel_items.items():
for name, old_urls in channel_obj.items():
formatName = format_channel_name(name)
if config.open_subscribe:
self.append_data_to_info_data(
cate, name, self.results["open_subscribe"].get(formatName, [])
)
print(
name,
"subscribe num:",
len(self.results["open_subscribe"].get(formatName, [])),
)
if config.open_multicast:
self.append_data_to_info_data(
cate, name, self.results["open_multicast"].get(formatName, [])
)
print(
name,
"multicast num:",
len(self.results["open_multicast"].get(formatName, [])),
)
if config.open_online_search:
self.append_data_to_info_data(
cate,
name,
self.results["open_online_search"].get(formatName, []),
)
print(
name,
"online search num:",
len(self.results["open_online_search"].get(formatName, [])),
)
print(
name,
"total num:",
len(self.channel_data.get(cate, {}).get(name, [])),
)
if len(self.channel_data.get(cate, {}).get(name, [])) == 0:
self.append_data_to_info_data(
cate, name, [(url, None, None) for url in old_urls]
)
def write_channel_to_file(self):
self.pbar = tqdm(total=self.total)
self.pbar.set_description(f"Writing, {self.total} channels remaining")
self.start_time = time()
for cate, channel_obj in self.channel_items.items():
for name in channel_obj.keys():
info_list = self.channel_data.get(cate, {}).get(name, [])
try:
channel_urls = get_total_urls_from_info_list(info_list)
print("write:", cate, name, "num:", len(channel_urls))
update_channel_urls_txt(cate, name, channel_urls)
finally:
self.pbar.update()
self.pbar.set_description(
f"Writing, {self.pbar.total - self.pbar.n} channels remaining"
)
self.update_progress(
f"正在写入结果, 剩余{self.pbar.total - self.pbar.n}个接口, 预计剩余时间: {get_pbar_remaining(self.pbar, self.start_time)}",
int((self.pbar.n / self.total) * 100),
)
async def visit_page(self, channel_names=None):
task_dict = {
"open_subscribe": get_channels_by_subscribe_urls,
"open_multicast": get_channels_by_fofa,
"open_online_search": get_channels_by_online_search,
}
for config_name, task_func in task_dict.items():
if getattr(config, config_name):
task = None
if config_name == "open_subscribe" or config_name == "open_multicast":
task = asyncio.create_task(task_func(self.update_progress))
else:
task = asyncio.create_task(
task_func(channel_names, self.update_progress)
)
if task:
self.tasks.append(task)
task_results = await tqdm_asyncio.gather(*self.tasks, disable=True)
self.tasks = []
for i, config_name in enumerate(
[name for name in task_dict if getattr(config, name)]
):
self.results[config_name] = task_results[i]
if config.open_subscribe:
subscribe_task = asyncio.create_task(
get_channels_by_subscribe_urls(self.update_progress)
)
self.tasks.append(subscribe_task)
self.subscribe_result = await subscribe_task
if config.open_multicast:
multicast_task = asyncio.create_task(
get_channels_by_fofa(self.update_progress)
)
self.tasks.append(multicast_task)
self.multicast_result = await multicast_task
if config.open_online_search:
online_search_task = asyncio.create_task(
get_channels_by_online_search(channel_names, self.update_progress)
)
self.tasks.append(online_search_task)
self.online_search_result = await online_search_task
def pbar_update(self, name=""):
self.pbar.update()
self.update_progress(
f"正在进行{name}, 剩余{self.pbar.total - self.pbar.n}个接口, 预计剩余时间: {get_pbar_remaining(self.pbar, self.start_time)}",
int((self.pbar.n / self.total) * 100),
)
async def main(self):
try:
self.tasks = []
channel_names = [
name
for channel_obj in self.channel_items.values()
@@ -196,29 +85,53 @@ class UpdateSource:
]
self.total = len(channel_names)
await self.visit_page(channel_names)
self.process_channel()
self.tasks = []
self.channel_data = append_all_method_data(
self.channel_items.items(),
self.channel_data,
self.subscribe_result,
self.multicast_result,
self.online_search_result,
)
if config.open_sort:
semaphore = asyncio.Semaphore(100)
self.tasks = [
asyncio.create_task(self.sort_channel_list(cate, name, info_list))
asyncio.create_task(
sort_channel_list(
semaphore,
cate,
name,
info_list,
lambda: self.pbar_update("测速排序"),
)
)
for cate, channel_obj in self.channel_data.items()
for name, info_list in channel_obj.items()
]
self.pbar = tqdm_asyncio(total=len(self.tasks))
self.pbar.set_description(
f"Sorting, {len(self.tasks)} channels remaining"
)
self.pbar = tqdm_asyncio(total=len(self.tasks), desc="Sorting")
self.update_progress(
f"正在测速排序, 共{len(self.tasks)}个频道",
int((self.pbar.n / len(self.tasks)) * 100),
)
self.start_time = time()
sort_results = await tqdm_asyncio.gather(*self.tasks, disable=True)
self.channel_data = {}
await tqdm_asyncio.gather(*self.tasks, disable=True)
self.write_channel_to_file()
for result in sort_results:
if result:
cate = result.get("cate")
name = result.get("name")
data = result.get("data")
self.channel_data = append_data_to_info_data(
self.channel_data, cate, name, data, False
)
self.pbar = tqdm(total=self.total, desc="Writing")
self.start_time = time()
write_channel_to_file(
self.channel_items.items(),
self.channel_data,
lambda: self.pbar_update("写入结果"),
)
self.pbar.close()
for handler in logging.root.handlers[:]:
handler.close()
logging.root.removeHandler(handler)
user_final_file = getattr(config, "final_file", "result.txt")
update_file(user_final_file, "result_new.txt")
if config.open_sort:
@@ -247,12 +160,6 @@ class UpdateSource:
self.update_progress = callback or default_callback
self.run_ui = True if callback else False
handler = RotatingFileHandler("result_new.log", encoding="utf-8")
logging.basicConfig(
handlers=[handler],
format="%(message)s",
level=logging.INFO,
)
await self.main()
if self.run_ui:
app.run(host="0.0.0.0", port=8000)
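Reviewer note: each sort task now resolves to a small dict with string keys that main.py folds back into channel_data via append_data_to_info_data; a sketch of the expected shape with illustrative values:
from utils.channel import append_data_to_info_data

result = {
    "cate": "央视频道",
    "name": "CCTV-1",
    "data": [("http://1.2.3.4:8080/hls/1/index.m3u8", None, "1920x1080")],  # (url, date, resolution)
}
channel_data = append_data_to_info_data({}, result["cate"], result["name"], result["data"], False)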

@@ -0,0 +1 @@
from .request import get_channels_by_online_search

157
online_search/request.py Normal file

@@ -0,0 +1,157 @@
from asyncio import create_task, gather
from utils.speed import get_speed
from utils.channel import format_channel_name, get_results_from_soup
from utils.tools import check_url_by_patterns, get_pbar_remaining, get_soup
from utils.config import get_config
from proxy import get_proxy
from time import time, sleep
from driver.setup import setup_driver
from utils.retry import (
retry_func,
locate_element_with_retry,
find_clickable_element_with_retry,
)
from selenium.webdriver.common.by import By
from tqdm.asyncio import tqdm_asyncio
config = get_config()
async def use_accessible_url(callback):
"""
Check if the url is accessible
"""
callback(f"正在获取最优的在线检索节点", 0)
baseUrl1 = "https://www.foodieguide.com/iptvsearch/"
baseUrl2 = "http://tonkiang.us/"
task1 = create_task(get_speed(baseUrl1, timeout=30))
task2 = create_task(get_speed(baseUrl2, timeout=30))
task_results = await gather(task1, task2)
callback(f"获取在线检索节点完成", 100)
if task_results[0] == float("inf") and task_results[1] == float("inf"):
return None
if task_results[0] < task_results[1]:
return baseUrl1
else:
return baseUrl2
def search_submit(driver, name):
"""
Input key word and submit with driver
"""
search_box = locate_element_with_retry(driver, (By.XPATH, '//input[@type="text"]'))
if not search_box:
return
search_box.clear()
search_box.send_keys(name)
submit_button = find_clickable_element_with_retry(
driver, (By.XPATH, '//input[@type="submit"]')
)
if not submit_button:
return
sleep(3)
driver.execute_script("arguments[0].click();", submit_button)
async def get_channels_by_online_search(names, callback):
"""
Get the channels by online search
"""
channels = {}
pageUrl = await use_accessible_url(callback)
if not pageUrl:
return channels
proxy = None
if config.open_proxy:
proxy = await get_proxy(pageUrl, best=True, with_test=True)
start_time = time()
driver = setup_driver(proxy)
def process_channel_by_online_search(name):
info_list = []
try:
retry_func(lambda: driver.get(pageUrl), name=f"online search:{name}")
search_submit(driver, name)
isFavorite = name in config.favorite_list
pageNum = (
config.favorite_page_num if isFavorite else config.default_page_num
)
retry_limit = 3
for page in range(1, pageNum + 1):
retries = 0
while retries < retry_limit:
try:
if page > 1:
page_link = find_clickable_element_with_retry(
driver,
(
By.XPATH,
f'//a[contains(@href, "={page}") and contains(@href, "{name}")]',
),
)
if not page_link:
break
sleep(3)
driver.execute_script("arguments[0].click();", page_link)
sleep(3)
soup = get_soup(driver.page_source)
if soup:
results = get_results_from_soup(soup, name)
print(name, "page:", page, "results num:", len(results))
if len(results) == 0:
print(
f"{name}:No results found, refreshing page and retrying..."
)
driver.refresh()
retries += 1
continue
elif len(results) <= 3:
next_page_link = find_clickable_element_with_retry(
driver,
(
By.XPATH,
f'//a[contains(@href, "={page+1}") and contains(@href, "{name}")]',
),
retries=1,
)
if next_page_link:
search_submit(driver, name)
retries += 1
continue
for result in results:
url, date, resolution = result
if url and check_url_by_patterns(url):
info_list.append((url, date, resolution))
break
else:
print(
f"{name}:No results found, refreshing page and retrying..."
)
driver.refresh()
retries += 1
continue
except Exception as e:
print(f"{name}:Error on page {page}: {e}")
break
if retries == retry_limit:
print(f"{name}:Reached retry limit, moving to next page")
except Exception as e:
print(f"{name}:Error on search: {e}")
pass
finally:
channels[format_channel_name(name)] = info_list
pbar.update()
callback(
f"正在线上查询更新, 剩余{names_len - pbar.n}个频道待查询, 预计剩余时间: {get_pbar_remaining(pbar, start_time)}",
int((pbar.n / names_len) * 100),
)
names_len = len(names)
pbar = tqdm_asyncio(total=names_len, desc="Online search")
callback(f"正在线上查询更新, 共{names_len}个频道", 0)
for name in names:
process_channel_by_online_search(name)
driver.quit()
pbar.close()
return channels
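Reviewer note: the search depth and proxy behaviour are driven by config attributes referenced above. A hypothetical user_config.py excerpt; only the attribute names come from the code, the values are examples:
open_online_search = True
open_proxy = False                   # set True to route the Selenium driver through a tested proxy
favorite_list = ["CCTV1", "CCTV5+"]  # channels crawled favorite_page_num pages deep
favorite_page_num = 5
default_page_num = 3                 # page depth for every other channel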

22
proxy/__init__.py Normal file

@@ -0,0 +1,22 @@
from .request import get_proxy_list, get_proxy_list_with_test
proxy_list = []
proxy_list_test = []
proxy_index = 0
async def get_proxy(url=None, best=False, with_test=False):
"""
Get the proxy
"""
global proxy_list, proxy_list_test, proxy_index
if not proxy_list:
proxy_list = get_proxy_list(3)
if not proxy_list_test or with_test:
proxy_list_test = await get_proxy_list_with_test(url or "https://www.baidu.com", proxy_list)
if not proxy_list_test:
return None
if best:
return proxy_list_test[0]
else:
proxy = proxy_list_test[proxy_index]
proxy_index = (proxy_index + 1) % len(proxy_list_test)
return proxy
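Reviewer note: a usage sketch of the new proxy pool (needs network access to the free-proxy sites; the test URL is illustrative). best=True returns the fastest tested proxy; later calls without it rotate through the tested list.
import asyncio
from proxy import get_proxy

async def demo():
    fastest = await get_proxy("https://www.example.com", best=True, with_test=True)
    rotated = await get_proxy()  # subsequent calls walk the tested list round-robin, starting from the fastest
    print(fastest, rotated)

asyncio.run(demo())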

87
proxy/request.py Normal file

@@ -0,0 +1,87 @@
from asyncio import Semaphore
import re
from bs4 import BeautifulSoup
from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio
from driver.setup import setup_driver
from utils.retry import retry_func
from time import sleep
from utils.speed import get_speed
def get_proxy_list(page_count=1):
"""
Get proxy list, parameter page_count is the number of pages to get
"""
url_pattern = [
"https://www.zdaye.com/free/{}/",
"https://www.kuaidaili.com/free/inha/{}/",
"https://www.kuaidaili.com/free/intr/{}/",
]
proxy_list = []
urls = []
for page_index in range(1, page_count + 1):
for pattern in url_pattern:
url = pattern.format(page_index)
urls.append(url)
pbar = tqdm(total=len(urls), desc="Getting proxy list")
driver = setup_driver()
def get_proxy(url):
try:
retry_func(lambda: driver.get(url), name=url)
sleep(1)
source = re.sub(
r"<!--.*?-->",
"",
driver.page_source,
flags=re.DOTALL,
)
soup = BeautifulSoup(source, "html.parser")
table = soup.find("table")
trs = table.find_all("tr") if table else []
for tr in trs[1:]:
tds = tr.find_all("td")
ip = tds[0].get_text().strip()
port = tds[1].get_text().strip()
proxy = f"http://{ip}:{port}"
proxy_list.append(proxy)
finally:
pbar.update()
for url in urls:
get_proxy(url)
driver.quit()
pbar.close()
return proxy_list
async def get_proxy_list_with_test(base_url, proxy_list):
"""
Get the proxy list with speed test
"""
if not proxy_list:
return []
semaphore = Semaphore(100)
async def get_speed_task(url, timeout, proxy):
async with semaphore:
return await get_speed(url, timeout=timeout, proxy=proxy)
response_times = await tqdm_asyncio.gather(
*(get_speed_task(base_url, timeout=30, proxy=url) for url in proxy_list),
desc="Testing proxy speed",
)
proxy_list_with_test = [
(proxy, response_time)
for proxy, response_time in zip(proxy_list, response_times)
if response_time != float("inf")
]
if not proxy_list_with_test:
print("No valid proxy found")
return []
proxy_list_with_test.sort(key=lambda x: x[1])
proxy_urls = [url for url, _ in proxy_list_with_test]
print(f"Valid proxy found: {len(proxy_urls)}")
return proxy_urls

1
subscribe/__init__.py Normal file

@@ -0,0 +1 @@
from .request import get_channels_by_subscribe_urls

80
subscribe/request.py Normal file

@@ -0,0 +1,80 @@
from utils.config import get_config
from tqdm.asyncio import tqdm_asyncio
from time import time
from requests import get, exceptions
from utils.retry import retry_func
import re
from utils.channel import format_channel_name
from utils.tools import merge_objects, get_pbar_remaining
from concurrent.futures import ThreadPoolExecutor
config = get_config()
timeout = 10
async def get_channels_by_subscribe_urls(callback):
"""
Get the channels by subscribe urls
"""
subscribe_results = {}
pattern = r"^(.*?),(?!#genre#)(.*?)$"
subscribe_urls_len = len(config.subscribe_urls)
pbar = tqdm_asyncio(total=subscribe_urls_len, desc="Processing subscribe")
start_time = time()
callback(f"正在获取订阅源更新, 共{subscribe_urls_len}个订阅源", 0)
def process_subscribe_channels(subscribe_url):
channels = {}
try:
response = None
try:
response = retry_func(
lambda: get(subscribe_url, timeout=timeout),
name=subscribe_url,
)
except exceptions.Timeout:
print(f"Timeout on subscribe: {subscribe_url}")
if response:
content = response.text
lines = content.split("\n")
for line in lines:
matcher = re.match(pattern, line)
if matcher is not None:
key = matcher.group(1)
resolution_match = re.search(r"_(\((.*?)\))", key)
resolution = (
resolution_match.group(2)
if resolution_match is not None
else None
)
url = matcher.group(2)
value = (url, None, resolution)
name = format_channel_name(key)
if name in channels:
if value not in channels[name]:
channels[name].append(value)
else:
channels[name] = [value]
except Exception as e:
print(f"Error on {subscribe_url}: {e}")
finally:
pbar.update()
remain = subscribe_urls_len - pbar.n
callback(
f"正在获取订阅源更新, 剩余{remain}个订阅源待获取, 预计剩余时间: {get_pbar_remaining(pbar, start_time)}",
int((pbar.n / subscribe_urls_len) * 100),
)
if config.open_online_search and pbar.n / subscribe_urls_len == 1:
callback("正在获取在线搜索结果, 请耐心等待", 0)
return channels
with ThreadPoolExecutor(max_workers=100) as executor:
futures = [
executor.submit(process_subscribe_channels, subscribe_url)
for subscribe_url in config.subscribe_urls
]
for future in futures:
merge_objects(subscribe_results, future.result())
pbar.close()
return subscribe_results
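Reviewer note: subscribe sources are plain name,url lists; the regex skips category headers ending in #genre#, and an optional _(resolution) suffix is pulled out of the name. A standalone check with an illustrative line:
import re

pattern = r"^(.*?),(?!#genre#)(.*?)$"
line = "CCTV-1_(1920x1080),http://1.2.3.4:8080/hls/1/index.m3u8"  # illustrative subscribe entry
matcher = re.match(pattern, line)
key, url = matcher.group(1), matcher.group(2)
resolution_match = re.search(r"_(\((.*?)\))", key)
resolution = resolution_match.group(2) if resolution_match else None
print(key, url, resolution)                  # CCTV-1_(1920x1080) http://1.2.3.4:8080/... 1920x1080
print(re.match(pattern, "央视频道,#genre#"))  # None: genre headers are skipped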

@@ -3,7 +3,7 @@ from tkinter import messagebox
from tkinter import scrolledtext
from tkinter import ttk
from tkinter import filedialog
from utils import resource_path, load_external_config
from utils.config import resource_path, load_external_config
from main import UpdateSource
import os
import asyncio

911
utils.py

@@ -1,911 +0,0 @@
from selenium import webdriver
import aiohttp
import asyncio
from time import time
import re
import datetime
import os
import urllib.parse
import ipaddress
from urllib.parse import urlparse
import requests
import re
from bs4 import BeautifulSoup
from bs4 import NavigableString
import fofa_map
from collections import defaultdict
from tqdm.asyncio import tqdm_asyncio
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import concurrent.futures
import sys
import importlib.util
from time import sleep
import socket
timeout = 10
max_retries = 3
def retry_func(func, retries=max_retries + 1, name=""):
"""
Retry the function
"""
for i in range(retries):
try:
sleep(3)
return func()
except Exception as e:
count = retries - 1
if name and i < count:
print(f"Failed to connect to the {name}. Retrying {i+1}...")
if i == count:
break
else:
continue
def locate_element_with_retry(driver, locator, timeout=timeout, retries=max_retries):
"""
Locate the element with retry
"""
wait = WebDriverWait(driver, timeout)
for _ in range(retries):
try:
return wait.until(EC.presence_of_element_located(locator))
except TimeoutException:
driver.refresh()
return None
def find_clickable_element_with_retry(
driver, locator, timeout=timeout, retries=max_retries
):
"""
Find the clickable element with retry
"""
wait = WebDriverWait(driver, timeout)
for _ in range(retries):
try:
return wait.until(EC.element_to_be_clickable(locator))
except TimeoutException:
driver.refresh()
return None
def resource_path(relative_path, persistent=False):
"""
Get the resource path
"""
base_path = os.path.abspath(".")
total_path = os.path.join(base_path, relative_path)
if persistent or os.path.exists(total_path):
return total_path
else:
try:
base_path = sys._MEIPASS
return os.path.join(base_path, relative_path)
except Exception:
return total_path
def load_external_config(name):
"""
Load the external config file
"""
config = None
config_path = name
config_filename = os.path.join(os.path.dirname(sys.executable), config_path)
if os.path.exists(config_filename):
spec = importlib.util.spec_from_file_location(name, config_filename)
config = importlib.util.module_from_spec(spec)
spec.loader.exec_module(config)
else:
import config
return config
config_path = resource_path("user_config.py")
default_config_path = resource_path("config.py")
config = (
load_external_config("user_config.py")
if os.path.exists(config_path)
else load_external_config("config.py")
)
def setup_driver(proxy=None):
"""
Setup the driver for selenium
"""
options = webdriver.ChromeOptions()
options.add_argument("start-maximized")
options.add_argument("--headless")
options.add_experimental_option("excludeSwitches", ["enable-logging"])
options.add_experimental_option("useAutomationExtension", False)
options.add_argument("blink-settings=imagesEnabled=false")
options.add_argument("--log-level=3")
options.add_argument("--ignore-certificate-errors")
options.add_argument("--allow-running-insecure-content")
options.add_argument("blink-settings=imagesEnabled=false")
options.add_argument("--no-sandbox")
options.add_argument("--disable-dev-shm-usage")
if proxy:
options.add_argument("--proxy-server=%s" % proxy)
driver = webdriver.Chrome(options=options)
return driver
def get_proxy_list(page_count=1):
"""
Get proxy list, parameter page_count is the number of pages to get
"""
url_pattern = [
"https://www.zdaye.com/free/{}/",
"https://www.kuaidaili.com/free/inha/{}/",
"https://www.kuaidaili.com/free/intr/{}/",
]
proxy_list = []
driver = setup_driver()
pbar = tqdm_asyncio(total=page_count, desc="Getting proxy list")
for page_index in range(1, page_count + 1):
for pattern in url_pattern:
url = pattern.format(page_index)
retry_func(lambda: driver.get(url), name=url)
sleep(1)
source = re.sub(
r"<!--.*?-->",
"",
driver.page_source,
flags=re.DOTALL,
)
soup = BeautifulSoup(source, "html.parser")
table = soup.find("table")
trs = table.find_all("tr") if table else []
for tr in trs[1:]:
tds = tr.find_all("td")
ip = tds[0].get_text().strip()
port = tds[1].get_text().strip()
proxy = f"http://{ip}:{port}"
proxy_list.append(proxy)
pbar.update()
pbar.close()
return proxy_list
async def get_proxy_list_with_test(base_url, proxy_list):
"""
Get the proxy list with speed test
"""
if not proxy_list:
return []
response_times = await tqdm_asyncio.gather(
*(get_speed(base_url, timeout=30, proxy=url) for url in proxy_list),
desc="Testing proxy speed",
)
proxy_list_with_test = [
(proxy, response_time)
for proxy, response_time in zip(proxy_list, response_times)
if response_time != float("inf")
]
if not proxy_list_with_test:
print("No valid proxy found")
return []
proxy_list_with_test.sort(key=lambda x: x[1])
proxy_urls = [url for url, _ in proxy_list_with_test]
print(f"{len(proxy_urls)} valid proxy found")
return proxy_urls
def format_channel_name(name):
"""
Format the channel name with sub and replace and lower
"""
sub_pattern = (
r"-|_|\((.*?)\)|\[(.*?)\]| |频道|标清|高清|HD|hd|超清|超高|超高清|中央|央视|台"
)
name = re.sub(sub_pattern, "", name)
name = name.replace("plus", "+")
name = name.replace("PLUS", "+")
name = name.replace("+", "+")
name = name.replace("CCTV1综合", "CCTV1")
name = name.replace("CCTV2财经", "CCTV2")
name = name.replace("CCTV3综艺", "CCTV3")
name = name.replace("CCTV4国际", "CCTV4")
name = name.replace("CCTV4中文国际", "CCTV4")
name = name.replace("CCTV4欧洲", "CCTV4")
name = name.replace("CCTV5体育", "CCTV5")
name = name.replace("CCTV5+体育赛视", "CCTV5+")
name = name.replace("CCTV5+体育赛事", "CCTV5+")
name = name.replace("CCTV5+体育", "CCTV5+")
name = name.replace("CCTV6电影", "CCTV6")
name = name.replace("CCTV7军事", "CCTV7")
name = name.replace("CCTV7军农", "CCTV7")
name = name.replace("CCTV7农业", "CCTV7")
name = name.replace("CCTV7国防军事", "CCTV7")
name = name.replace("CCTV8电视剧", "CCTV8")
name = name.replace("CCTV9记录", "CCTV9")
name = name.replace("CCTV9纪录", "CCTV9")
name = name.replace("CCTV10科教", "CCTV10")
name = name.replace("CCTV11戏曲", "CCTV11")
name = name.replace("CCTV12社会与法", "CCTV12")
name = name.replace("CCTV13新闻", "CCTV13")
name = name.replace("CCTV新闻", "CCTV13")
name = name.replace("CCTV14少儿", "CCTV14")
name = name.replace("CCTV15音乐", "CCTV15")
name = name.replace("CCTV16奥林匹克", "CCTV16")
name = name.replace("CCTV17农业农村", "CCTV17")
name = name.replace("CCTV17农业", "CCTV17")
return name.lower()
def get_channel_items():
"""
Get the channel items from the source file
"""
# Open the source file and read all lines.
user_source_file = (
"user_" + config.source_file
if os.path.exists("user_" + config.source_file)
else getattr(config, "source_file", "demo.txt")
)
# Create a dictionary to store the channels.
channels = defaultdict(lambda: defaultdict(list))
current_category = ""
pattern = r"^(.*?),(?!#genre#)(.*?)$"
with open(resource_path(user_source_file), "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if "#genre#" in line:
# This is a new channel, create a new key in the dictionary.
current_category = line.split(",")[0]
else:
# This is a url, add it to the list of urls for the current channel.
match = re.search(pattern, line)
if match is not None:
name = match.group(1).strip()
url = match.group(2).strip()
if url and url not in channels[current_category][name]:
channels[current_category][name].append(url)
return channels
def get_pbar_remaining(pbar, start_time):
"""
Get the remaining time of the progress bar
"""
try:
elapsed = time() - start_time
completed_tasks = pbar.n
if completed_tasks > 0:
avg_time_per_task = elapsed / completed_tasks
remaining_tasks = pbar.total - completed_tasks
remaining_time = pbar.format_interval(avg_time_per_task * remaining_tasks)
else:
remaining_time = "未知"
return remaining_time
except Exception as e:
print(f"Error: {e}")
async def get_channels_by_subscribe_urls(callback):
"""
Get the channels by subscribe urls
"""
channels = {}
pattern = r"^(.*?),(?!#genre#)(.*?)$"
subscribe_urls_len = len(config.subscribe_urls)
pbar = tqdm_asyncio(total=subscribe_urls_len)
start_time = time()
def process_subscribe_channels(subscribe_url):
try:
response = None
try:
response = retry_func(
lambda: requests.get(subscribe_url, timeout=timeout),
name=subscribe_url,
)
except requests.exceptions.Timeout:
print(f"Timeout on subscribe: {subscribe_url}")
if response:
content = response.text
lines = content.split("\n")
for line in lines:
matcher = re.match(pattern, line)
if matcher is not None:
key = matcher.group(1)
resolution_match = re.search(r"_(\((.*?)\))", key)
resolution = (
resolution_match.group(2)
if resolution_match is not None
else None
)
url = matcher.group(2)
value = (url, None, resolution)
name = format_channel_name(key)
if name in channels:
if value not in channels[name]:
channels[name].append(value)
else:
channels[name] = [value]
except Exception as e:
print(f"Error on {subscribe_url}: {e}")
finally:
pbar.update()
remain = subscribe_urls_len - pbar.n
pbar.set_description(f"Processing subscribe, {remain} urls remaining")
callback(
f"正在获取订阅源更新, 剩余{remain}个订阅源待获取, 预计剩余时间: {get_pbar_remaining(pbar, start_time)}",
int((pbar.n / subscribe_urls_len) * 100),
)
if config.open_online_search and pbar.n / subscribe_urls_len == 1:
callback("正在获取在线搜索结果, 请耐心等待", 0)
pbar.set_description(f"Processing subscribe, {subscribe_urls_len} urls remaining")
callback(f"正在获取订阅源更新, 共{subscribe_urls_len}个订阅源", 0)
with concurrent.futures.ThreadPoolExecutor() as pool:
loop = asyncio.get_running_loop()
tasks = []
for subscribe_url in config.subscribe_urls:
task = loop.run_in_executor(pool, process_subscribe_channels, subscribe_url)
tasks.append(task)
await tqdm_asyncio.gather(*tasks, disable=True)
print("Finished processing subscribe urls")
pbar.close()
return channels
async def get_channels_by_online_search(names, callback):
"""
Get the channels by online search
"""
channels = {}
pageUrl = await use_accessible_url(callback)
if not pageUrl:
return channels
if config.open_proxy:
proxy_list = get_proxy_list(3)
proxy_list_test = (
await get_proxy_list_with_test(pageUrl, proxy_list) if proxy_list else []
)
proxy_index = 0
start_time = time()
def process_channel_by_online_search(name, proxy=None):
driver = setup_driver(proxy)
info_list = []
try:
retry_func(lambda: driver.get(pageUrl), name=f"online search:{name}")
search_box = locate_element_with_retry(
driver, (By.XPATH, '//input[@type="text"]')
)
if not search_box:
return
search_box.clear()
search_box.send_keys(name)
submit_button = find_clickable_element_with_retry(
driver, (By.XPATH, '//input[@type="submit"]')
)
if not submit_button:
return
sleep(3)
driver.execute_script("arguments[0].click();", submit_button)
isFavorite = name in config.favorite_list
pageNum = (
config.favorite_page_num if isFavorite else config.default_page_num
)
retry_limit = 3
for page in range(1, pageNum + 1):
retries = 0
while retries < retry_limit:
try:
if page > 1:
page_link = find_clickable_element_with_retry(
driver,
(
By.XPATH,
f'//a[contains(@href, "={page}") and contains(@href, "{name}")]',
),
)
if not page_link:
break
sleep(3)
driver.execute_script("arguments[0].click();", page_link)
sleep(3)
source = re.sub(
r"<!--.*?-->",
"",
driver.page_source,
flags=re.DOTALL,
)
soup = BeautifulSoup(source, "html.parser")
if soup:
results = get_results_from_soup(soup, name)
print(name, "page:", page, "results num:", len(results))
if len(results) == 0 and retries < retry_limit - 1:
print(
f"{name}:No results found, refreshing page and retrying..."
)
driver.refresh()
retries += 1
continue
for result in results:
url, date, resolution = result
if url and check_url_by_patterns(url):
info_list.append((url, date, resolution))
break
else:
print(
f"{name}:No results found, refreshing page and retrying..."
)
driver.refresh()
retries += 1
continue
except Exception as e:
print(f"{name}:Error on page {page}: {e}")
break
if retries == retry_limit:
print(f"{name}:Reached retry limit, moving to next page")
except Exception as e:
print(f"{name}:Error on search: {e}")
pass
finally:
channels[format_channel_name(name)] = info_list
names_queue.task_done()
pbar.update()
pbar.set_description(
f"Processing online search, {names_len - pbar.n} channels remaining"
)
callback(
f"正在线上查询更新, 剩余{names_len - pbar.n}个频道待查询, 预计剩余时间: {get_pbar_remaining(pbar, start_time)}",
int((pbar.n / names_len) * 100),
)
driver.quit()
names_queue = asyncio.Queue()
for name in names:
await names_queue.put(name)
names_len = names_queue.qsize()
pbar = tqdm_asyncio(total=names_len)
pbar.set_description(f"Processing online search, {names_len} channels remaining")
callback(f"正在线上查询更新, 共{names_len}个频道", 0)
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
while not names_queue.empty():
loop = asyncio.get_running_loop()
name = await names_queue.get()
proxy = (
proxy_list_test[0] if config.open_proxy and proxy_list_test else None
)
if config.open_proxy and proxy_list_test:
proxy_index = (proxy_index + 1) % len(proxy_list_test)
loop.run_in_executor(pool, process_channel_by_online_search, name, proxy)
print("Finished processing online search")
pbar.close()
return channels
def update_channel_urls_txt(cate, name, urls):
"""
Update the category and channel urls to the final file
"""
genre_line = cate + ",#genre#\n"
filename = "result_new.txt"
if not os.path.exists(filename):
open(filename, "w").close()
with open(filename, "r", encoding="utf-8") as f:
content = f.read()
with open(filename, "a", encoding="utf-8") as f:
if genre_line not in content:
f.write(genre_line)
for url in urls:
if url is not None:
f.write(name + "," + url + "\n")
def update_file(final_file, old_file):
"""
Update the file
"""
old_file_path = resource_path(old_file, persistent=True)
final_file_path = resource_path(final_file, persistent=True)
if os.path.exists(old_file_path):
os.replace(old_file_path, final_file_path)
def get_channel_url(element):
"""
Get the url, date and resolution
"""
url = None
urlRegex = r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
url_search = re.search(
urlRegex,
element.get_text(strip=True),
)
if url_search:
url = url_search.group()
return url
def get_channel_info(element):
"""
Get the channel info
"""
date, resolution = None, None
info_text = element.get_text(strip=True)
if info_text:
date, resolution = (
(info_text.partition(" ")[0] if info_text.partition(" ")[0] else None),
(
info_text.partition(" ")[2].partition("")[2]
if info_text.partition(" ")[2].partition("")[2]
else None
),
)
return date, resolution
def get_results_from_soup(soup, name):
"""
Get the results from the soup
"""
results = []
for element in soup.descendants:
if isinstance(element, NavigableString):
url = get_channel_url(element)
if url and not any(item[0] == url for item in results):
url_element = soup.find(lambda tag: tag.get_text(strip=True) == url)
if url_element:
name_element = url_element.find_previous_sibling()
if name_element:
channel_name = name_element.get_text(strip=True)
if format_channel_name(name) == format_channel_name(
channel_name
):
info_element = url_element.find_next_sibling()
date, resolution = get_channel_info(info_element)
results.append((url, date, resolution))
return results
async def get_speed(url, timeout=timeout, proxy=None):
"""
Get the speed of the url
"""
async with aiohttp.ClientSession(
connector=aiohttp.TCPConnector(verify_ssl=False), trust_env=True
) as session:
start = time()
end = None
try:
async with session.get(url, timeout=timeout, proxy=proxy) as response:
resStatus = response.status
if resStatus == 200:
end = time()
else:
return float("inf")
except Exception as e:
return float("inf")
return int(round((end - start) * 1000)) if end else float("inf")
async def sort_urls_by_speed_and_resolution(infoList):
"""
Sort by speed and resolution
"""
response_times = await asyncio.gather(*(get_speed(url) for url, _, _ in infoList))
valid_responses = [
(info, rt) for info, rt in zip(infoList, response_times) if rt != float("inf")
]
def extract_resolution(resolution_str):
numbers = re.findall(r"\d+x\d+", resolution_str)
if numbers:
width, height = map(int, numbers[0].split("x"))
return width * height
else:
return 0
default_response_time_weight = 0.5
default_resolution_weight = 0.5
response_time_weight = getattr(
config, "response_time_weight", default_response_time_weight
)
resolution_weight = getattr(config, "resolution_weight", default_resolution_weight)
# Check if weights are valid
if not (
0 <= response_time_weight <= 1
and 0 <= resolution_weight <= 1
and response_time_weight + resolution_weight == 1
):
response_time_weight = default_response_time_weight
resolution_weight = default_resolution_weight
def combined_key(item):
(_, _, resolution), response_time = item
resolution_value = extract_resolution(resolution) if resolution else 0
return (
-(response_time_weight * response_time)
+ resolution_weight * resolution_value
)
sorted_res = sorted(valid_responses, key=combined_key, reverse=True)
return sorted_res
def filter_by_date(data):
"""
Filter by date and limit
"""
default_recent_days = 30
use_recent_days = getattr(config, "recent_days", 30)
if not isinstance(use_recent_days, int) or use_recent_days <= 0:
use_recent_days = default_recent_days
start_date = datetime.datetime.now() - datetime.timedelta(days=use_recent_days)
recent_data = []
unrecent_data = []
for (url, date, resolution), response_time in data:
item = ((url, date, resolution), response_time)
if date:
date = datetime.datetime.strptime(date, "%m-%d-%Y")
if date >= start_date:
recent_data.append(item)
else:
unrecent_data.append(item)
else:
unrecent_data.append(item)
recent_data_len = len(recent_data)
if recent_data_len == 0:
recent_data = unrecent_data
elif recent_data_len < config.urls_limit:
recent_data.extend(unrecent_data[: config.urls_limit - len(recent_data)])
return recent_data
def get_total_urls_from_info_list(infoList):
"""
Get the total urls from info list
"""
total_urls = [url for url, _, _ in infoList]
return list(dict.fromkeys(total_urls))[: int(config.urls_limit)]
def get_total_urls_from_sorted_data(data):
"""
Get the total urls with filter by date and depulicate from sorted data
"""
total_urls = []
if len(data) > config.urls_limit:
total_urls = [url for (url, _, _), _ in filter_by_date(data)]
else:
total_urls = [url for (url, _, _), _ in data]
return list(dict.fromkeys(total_urls))[: config.urls_limit]
def is_ipv6(url):
"""
Check if the url is ipv6
"""
try:
host = urllib.parse.urlparse(url).hostname
ipaddress.IPv6Address(host)
return True
except ValueError:
return False
def check_url_ipv_type(url):
"""
Check if the url is compatible with the ipv type in the config
"""
ipv_type = getattr(config, "ipv_type", "ipv4")
if ipv_type == "ipv4":
return not is_ipv6(url)
elif ipv_type == "ipv6":
return is_ipv6(url)
else:
return True
def check_by_domain_blacklist(url):
"""
Check by domain blacklist
"""
domain_blacklist = [
urlparse(domain).netloc if urlparse(domain).scheme else domain
for domain in getattr(config, "domain_blacklist", [])
]
return urlparse(url).netloc not in domain_blacklist
def check_by_url_keywords_blacklist(url):
"""
Check by URL blacklist keywords
"""
url_keywords_blacklist = getattr(config, "url_keywords_blacklist", [])
return not any(keyword in url for keyword in url_keywords_blacklist)
def check_url_by_patterns(url):
"""
Check the url by patterns
"""
return (
check_url_ipv_type(url)
and check_by_domain_blacklist(url)
and check_by_url_keywords_blacklist(url)
)
def filter_urls_by_patterns(urls):
"""
Filter urls by patterns
"""
urls = [url for url in urls if check_url_ipv_type(url)]
urls = [url for url in urls if check_by_domain_blacklist(url)]
urls = [url for url in urls if check_by_url_keywords_blacklist(url)]
return urls
async def use_accessible_url(callback):
"""
Check if the url is accessible
"""
callback(f"正在获取最优的在线检索节点", 0)
baseUrl1 = "https://www.foodieguide.com/iptvsearch/"
baseUrl2 = "http://tonkiang.us/"
task1 = asyncio.create_task(get_speed(baseUrl1, timeout=30))
task2 = asyncio.create_task(get_speed(baseUrl2, timeout=30))
task_results = await asyncio.gather(task1, task2)
callback(f"获取在线检索节点完成", 100)
if task_results[0] == float("inf") and task_results[1] == float("inf"):
return None
if task_results[0] < task_results[1]:
return baseUrl1
else:
return baseUrl2
def get_fofa_urls_from_region_list():
"""
Get the FOFA url from region
"""
region_list = getattr(config, "region_list", [])
urls = []
region_url = getattr(fofa_map, "region_url")
if "all" in region_list:
urls = [url for url in region_url.values() if url]
else:
for region in region_list:
if region in region_url:
urls.append(region_url[region])
return urls
async def get_channels_by_fofa(callback):
"""
Get the channel by FOFA
"""
fofa_urls = get_fofa_urls_from_region_list()
fofa_urls_len = len(fofa_urls)
pbar = tqdm_asyncio(total=fofa_urls_len)
start_time = time()
fofa_results = {}
def process_fofa_channels(fofa_url, pbar, fofa_urls_len, callback):
driver = setup_driver()
try:
retry_func(lambda: driver.get(fofa_url), name=fofa_url)
fofa_source = re.sub(r"<!--.*?-->", "", driver.page_source, flags=re.DOTALL)
urls = set(re.findall(r"https?://[\w\.-]+:\d+", fofa_source))
channels = {}
for url in urls:
try:
final_url = url + "/iptv/live/1000.json?key=txiptv"
response = retry_func(
lambda: requests.get(final_url, timeout=timeout),
name=final_url,
)
try:
json_data = response.json()
if json_data["code"] == 0:
try:
for item in json_data["data"]:
if isinstance(item, dict):
item_name = format_channel_name(
item.get("name")
)
item_url = item.get("url").strip()
if item_name and item_url:
total_url = url + item_url
if item_name not in channels:
channels[item_name] = [total_url]
else:
channels[item_name].append(total_url)
except Exception as e:
# print(f"Error on fofa: {e}")
continue
except Exception as e:
# print(f"{url}: {e}")
continue
except Exception as e:
# print(f"{url}: {e}")
continue
merge_objects(fofa_results, channels)
except Exception as e:
# print(e)
pass
finally:
pbar.update()
remain = fofa_urls_len - pbar.n
pbar.set_description(f"Processing multicast, {remain} regions remaining")
callback(
f"正在获取组播源更新, 剩余{remain}个地区待获取, 预计剩余时间: {get_pbar_remaining(pbar, start_time)}",
int((pbar.n / fofa_urls_len) * 100),
)
if config.open_online_search and pbar.n / fofa_urls_len == 1:
callback("正在获取在线搜索结果, 请耐心等待", 0)
driver.quit()
pbar.set_description(f"Processing multicast, {fofa_urls_len} regions remaining")
callback(f"正在获取组播源更新, 共{fofa_urls_len}个地区", 0)
with concurrent.futures.ThreadPoolExecutor() as pool:
loop = asyncio.get_running_loop()
tasks = []
for fofa_url in fofa_urls:
task = loop.run_in_executor(
pool, process_fofa_channels, fofa_url, pbar, fofa_urls_len, callback
)
tasks.append(task)
await tqdm_asyncio.gather(*tasks, disable=True)
pbar.close()
return fofa_results
def merge_objects(*objects):
"""
Merge objects
"""
merged_dict = {}
for obj in objects:
if not isinstance(obj, dict):
raise TypeError("All input objects must be dictionaries")
for key, value in obj.items():
if key not in merged_dict:
merged_dict[key] = set()
if isinstance(value, set):
merged_dict[key].update(value)
elif isinstance(value, list):
for item in value:
merged_dict[key].add(item)
else:
merged_dict[key].add(value)
for key, value in merged_dict.items():
merged_dict[key] = list(value)
return merged_dict
def get_ip_address():
"""
Get the IP address
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(("10.255.255.255", 1))
IP = s.getsockname()[0]
except Exception:
IP = "127.0.0.1"
finally:
s.close()
return f"http://{IP}:8000"

0
utils/__init__.py Normal file

301
utils/channel.py Normal file

@@ -0,0 +1,301 @@
from utils.config import get_config, resource_path
from utils.tools import check_url_by_patterns, get_total_urls_from_info_list
from utils.speed import sort_urls_by_speed_and_resolution
import os
from collections import defaultdict
import re
from bs4 import NavigableString
import logging
from logging.handlers import RotatingFileHandler
config = get_config()
handler = RotatingFileHandler("result_new.log", encoding="utf-8")
logging.basicConfig(
handlers=[handler],
format="%(message)s",
level=logging.INFO,
)
def get_channel_items():
"""
Get the channel items from the source file
"""
# Open the source file and read all lines.
user_source_file = (
"user_" + config.source_file
if os.path.exists("user_" + config.source_file)
else getattr(config, "source_file", "demo.txt")
)
# Create a dictionary to store the channels.
channels = defaultdict(lambda: defaultdict(list))
current_category = ""
pattern = r"^(.*?),(?!#genre#)(.*?)$"
with open(resource_path(user_source_file), "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if "#genre#" in line:
# This is a new channel, create a new key in the dictionary.
current_category = line.split(",")[0]
else:
# This is a url, add it to the list of urls for the current channel.
match = re.search(pattern, line)
if match is not None:
name = match.group(1).strip()
url = match.group(2).strip()
if url and url not in channels[current_category][name]:
channels[current_category][name].append(url)
return channels
def format_channel_name(name):
"""
Format the channel name with sub and replace and lower
"""
sub_pattern = (
r"-|_|\((.*?)\)|\[(.*?)\]| |频道|标清|高清|HD|hd|超清|超高|超高清|中央|央视|台"
)
name = re.sub(sub_pattern, "", name)
name = name.replace("plus", "+")
name = name.replace("PLUS", "+")
name = name.replace("+", "+")
name = name.replace("CCTV1综合", "CCTV1")
name = name.replace("CCTV2财经", "CCTV2")
name = name.replace("CCTV3综艺", "CCTV3")
name = name.replace("CCTV4国际", "CCTV4")
name = name.replace("CCTV4中文国际", "CCTV4")
name = name.replace("CCTV4欧洲", "CCTV4")
name = name.replace("CCTV5体育", "CCTV5")
name = name.replace("CCTV5+体育赛视", "CCTV5+")
name = name.replace("CCTV5+体育赛事", "CCTV5+")
name = name.replace("CCTV5+体育", "CCTV5+")
name = name.replace("CCTV6电影", "CCTV6")
name = name.replace("CCTV7军事", "CCTV7")
name = name.replace("CCTV7军农", "CCTV7")
name = name.replace("CCTV7农业", "CCTV7")
name = name.replace("CCTV7国防军事", "CCTV7")
name = name.replace("CCTV8电视剧", "CCTV8")
name = name.replace("CCTV9记录", "CCTV9")
name = name.replace("CCTV9纪录", "CCTV9")
name = name.replace("CCTV10科教", "CCTV10")
name = name.replace("CCTV11戏曲", "CCTV11")
name = name.replace("CCTV12社会与法", "CCTV12")
name = name.replace("CCTV13新闻", "CCTV13")
name = name.replace("CCTV新闻", "CCTV13")
name = name.replace("CCTV14少儿", "CCTV14")
name = name.replace("CCTV15音乐", "CCTV15")
name = name.replace("CCTV16奥林匹克", "CCTV16")
name = name.replace("CCTV17农业农村", "CCTV17")
name = name.replace("CCTV17农业", "CCTV17")
return name.lower()
def get_results_from_soup(soup, name):
"""
Get the results from the soup
"""
results = []
for element in soup.descendants:
if isinstance(element, NavigableString):
url = get_channel_url(element)
if url and not any(item[0] == url for item in results):
url_element = soup.find(lambda tag: tag.get_text(strip=True) == url)
if url_element:
name_element = url_element.find_previous_sibling()
if name_element:
channel_name = name_element.get_text(strip=True)
if format_channel_name(name) == format_channel_name(
channel_name
):
info_element = url_element.find_next_sibling()
date, resolution = get_channel_info(info_element)
results.append((url, date, resolution))
return results
def update_channel_urls_txt(cate, name, urls):
"""
Update the category and channel urls to the final file
"""
genre_line = cate + ",#genre#\n"
filename = "result_new.txt"
if not os.path.exists(filename):
open(filename, "w").close()
with open(filename, "r", encoding="utf-8") as f:
content = f.read()
with open(filename, "a", encoding="utf-8") as f:
if genre_line not in content:
f.write(genre_line)
for url in urls:
if url is not None:
f.write(name + "," + url + "\n")
def get_channel_url(element):
"""
Get the url, date and resolution
"""
url = None
urlRegex = r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
url_search = re.search(
urlRegex,
element.get_text(strip=True),
)
if url_search:
url = url_search.group()
return url
def get_channel_info(element):
"""
Get the channel info
"""
date, resolution = None, None
info_text = element.get_text(strip=True)
if info_text:
date, resolution = (
(info_text.partition(" ")[0] if info_text.partition(" ")[0] else None),
(
info_text.partition(" ")[2].partition("")[2]
if info_text.partition(" ")[2].partition("")[2]
else None
),
)
return date, resolution
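A hedged example of the parsing above; the exact layout of the info element is an assumption here (a date, a space, then a resolution after a "•" marker):

sample = "05-12-2024 info•1920x1080"  # hypothetical scraped text
sample.partition(" ")[0]  # -> "05-12-2024" (date)
sample.partition(" ")[2].partition("•")[2]  # -> "1920x1080" (resolution)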
def init_info_data(data, cate, name):
"""
Init channel info data
"""
if data.get(cate) is None:
data[cate] = {}
if data[cate].get(name) is None:
data[cate][name] = []
return data
def append_data_to_info_data(info_data, cate, name, data, check=True):
"""
Append channel data to total info data
"""
info_data = init_info_data(info_data, cate, name)
for url, date, resolution in data:
if (url and not check) or (url and check and check_url_by_patterns(url)):
info_data[cate][name].append((url, date, resolution))
return info_data
def append_all_method_data(
items, data, subscribe_result=None, multicast_result=None, online_search_result=None
):
"""
Append all method data to total info data
"""
for cate, channel_obj in items:
for name, old_urls in channel_obj.items():
formatName = format_channel_name(name)
if config.open_subscribe:
data = append_data_to_info_data(
data,
cate,
name,
subscribe_result.get(formatName, []),
)
print(
name,
"subscribe num:",
len(subscribe_result.get(formatName, [])),
)
if config.open_multicast:
data = append_data_to_info_data(
data,
cate,
name,
multicast_result.get(formatName, []),
)
print(
name,
"multicast num:",
len(multicast_result.get(formatName, [])),
)
if config.open_online_search:
data = append_data_to_info_data(
data,
cate,
name,
online_search_result.get(formatName, []),
)
print(
name,
"online search num:",
len(online_search_result.get(formatName, [])),
)
total_channel_data_len = len(data.get(cate, {}).get(name, []))
if total_channel_data_len == 0:
data = append_data_to_info_data(
data,
cate,
name,
[(url, None, None) for url in old_urls],
)
print(
name,
"total num:",
len(data.get(cate, {}).get(name, [])),
)
return data
async def sort_channel_list(semaphore, cate, name, info_list, callback):
"""
Sort the channel list
"""
async with semaphore:
data = []
try:
if info_list:
sorted_data = await sort_urls_by_speed_and_resolution(info_list)
if sorted_data:
for (
url,
date,
resolution,
), response_time in sorted_data:
logging.info(
f"Name: {name}, URL: {url}, Date: {date}, Resolution: {resolution}, Response Time: {response_time}ms"
)
data = [
(url, date, resolution)
for (url, date, resolution), _ in sorted_data
]
except Exception as e:
logging.error(f"Error: {e}")
finally:
callback()
return {"cate": cate, "name": name, "data": data}
def write_channel_to_file(items, data, callback):
"""
Write channel to file
"""
for cate, channel_obj in items:
for name in channel_obj.keys():
info_list = data.get(cate, {}).get(name, [])
try:
channel_urls = get_total_urls_from_info_list(info_list)
print("write:", cate, name, "num:", len(channel_urls))
update_channel_urls_txt(cate, name, channel_urls)
finally:
callback()
for handler in logging.root.handlers[:]:
handler.close()
logging.root.removeHandler(handler)
50
utils/config.py Normal file
@ -0,0 +1,50 @@
from os import path
import sys
from importlib import util
def resource_path(relative_path, persistent=False):
"""
Get the resource path
"""
base_path = path.abspath(".")
total_path = path.join(base_path, relative_path)
if persistent or path.exists(total_path):
return total_path
else:
try:
base_path = sys._MEIPASS
return path.join(base_path, relative_path)
except Exception:
return total_path
def load_external_config(name):
"""
Load the external config file
"""
config = None
config_path = name
config_filename = path.join(path.dirname(sys.executable), config_path)
if path.exists(config_filename):
spec = util.spec_from_file_location(name, config_filename)
config = util.module_from_spec(spec)
spec.loader.exec_module(config)
else:
import config
return config
def get_config():
"""
Get the config
"""
config_path = resource_path("user_config.py")
config = (
load_external_config("user_config.py")
if path.exists(config_path)
else load_external_config("config.py")
)
return config
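get_config prefers a user_config.py placed alongside the executable or project root over the bundled config.py. A minimal sketch of such an override follows; the attribute names come from how they are read via getattr elsewhere in this commit, while the values are only placeholders, not defaults shipped by the project:

# user_config.py — hypothetical override picked up by get_config()
source_file = "demo.txt"  # template file read by get_channel_items
urls_limit = 10  # max urls kept per channel
open_subscribe = True  # toggles for the acquisition methods
open_multicast = True
open_online_search = False
response_time_weight = 0.5  # sorting weights, must sum to 1
resolution_weight = 0.5
recent_days = 30  # prefer results newer than this many days
ipv_type = "ipv4"  # "ipv4", "ipv6", or anything else to accept both
domain_blacklist = []
url_keywords_blacklist = []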
53
utils/retry.py Normal file
@ -0,0 +1,53 @@
from time import sleep
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
max_retries = 3
timeout = 10
def retry_func(func, retries=max_retries + 1, name=""):
"""
Retry the function
"""
for i in range(retries):
try:
sleep(3)
return func()
except Exception as e:
count = retries - 1
if name and i < count:
print(f"Failed to connect to the {name}. Retrying {i+1}...")
if i == count:
return False
else:
continue
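A usage sketch of retry_func with a hypothetical zero-argument callable (the name and body are placeholders): it sleeps 3 seconds before each attempt, returns the callable's result on the first success, and returns False once every attempt has raised.

def fetch_results():  # hypothetical callable that may raise on network errors
    ...

results = retry_func(fetch_results, name="online search")
if results is False:
    print("online search still failing after all retries")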
def locate_element_with_retry(driver, locator, timeout=timeout, retries=max_retries):
"""
Locate the element with retry
"""
wait = WebDriverWait(driver, timeout)
for _ in range(retries):
try:
return wait.until(EC.presence_of_element_located(locator))
except TimeoutException:
driver.refresh()
return None
def find_clickable_element_with_retry(
driver, locator, timeout=timeout, retries=max_retries
):
"""
Find the clickable element with retry
"""
wait = WebDriverWait(driver, timeout)
for _ in range(retries):
try:
return wait.until(EC.element_to_be_clickable(locator))
except TimeoutException:
driver.refresh()
return None
74
utils/speed.py Normal file
@ -0,0 +1,74 @@
from aiohttp import ClientSession, TCPConnector
from time import time
from asyncio import gather
import re
from utils.config import get_config
config = get_config()
timeout = 10
async def get_speed(url, timeout=timeout, proxy=None):
"""
Get the speed of the url
"""
async with ClientSession(
connector=TCPConnector(verify_ssl=False), trust_env=True
) as session:
start = time()
end = None
try:
async with session.get(url, timeout=timeout, proxy=proxy) as response:
resStatus = response.status
if resStatus == 200:
end = time()
else:
return float("inf")
except Exception as e:
return float("inf")
return int(round((end - start) * 1000)) if end else float("inf")
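A hedged usage sketch: get_speed returns the round-trip time in milliseconds for a 200 response and float("inf") otherwise, so a one-off measurement can be run like this (the url is a placeholder):

from asyncio import run

latency = run(get_speed("http://example.com/stream.m3u8", timeout=5))
print("unreachable" if latency == float("inf") else f"{latency}ms")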
async def sort_urls_by_speed_and_resolution(infoList):
"""
Sort by speed and resolution
"""
response_times = await gather(*(get_speed(url) for url, _, _ in infoList))
valid_responses = [
(info, rt) for info, rt in zip(infoList, response_times) if rt != float("inf")
]
def extract_resolution(resolution_str):
numbers = re.findall(r"\d+x\d+", resolution_str)
if numbers:
width, height = map(int, numbers[0].split("x"))
return width * height
else:
return 0
default_response_time_weight = 0.5
default_resolution_weight = 0.5
response_time_weight = getattr(
config, "response_time_weight", default_response_time_weight
)
resolution_weight = getattr(config, "resolution_weight", default_resolution_weight)
# Check if weights are valid
if not (
0 <= response_time_weight <= 1
and 0 <= resolution_weight <= 1
and response_time_weight + resolution_weight == 1
):
response_time_weight = default_response_time_weight
resolution_weight = default_resolution_weight
def combined_key(item):
(_, _, resolution), response_time = item
resolution_value = extract_resolution(resolution) if resolution else 0
return (
-(response_time_weight * response_time)
+ resolution_weight * resolution_value
)
sorted_res = sorted(valid_responses, key=combined_key, reverse=True)
return sorted_res
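Note that combined_key mixes raw milliseconds with raw pixel counts, so with the default 0.5/0.5 weights the resolution term dominates whenever a resolution is reported; urls without a resolution fall back to 0. A worked example with made-up values:

score_1080p = -(0.5 * 100) + 0.5 * (1920 * 1080)  # 100 ms, 1080p -> 1036750.0
score_720p = -(0.5 * 20) + 0.5 * (1280 * 720)  # 20 ms, 720p -> 460790.0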
205
utils/tools.py Normal file
@ -0,0 +1,205 @@
from time import time
import datetime
import os
import urllib.parse
import ipaddress
from urllib.parse import urlparse
import socket
from utils.config import get_config, resource_path
import re
from bs4 import BeautifulSoup
config = get_config()
timeout = 10
def get_pbar_remaining(pbar, start_time):
"""
Get the remaining time of the progress bar
"""
try:
elapsed = time() - start_time
completed_tasks = pbar.n
if completed_tasks > 0:
avg_time_per_task = elapsed / completed_tasks
remaining_tasks = pbar.total - completed_tasks
remaining_time = pbar.format_interval(avg_time_per_task * remaining_tasks)
else:
remaining_time = "未知"
return remaining_time
except Exception as e:
print(f"Error: {e}")
def update_file(final_file, old_file):
"""
Update the file
"""
old_file_path = resource_path(old_file, persistent=True)
final_file_path = resource_path(final_file, persistent=True)
if os.path.exists(old_file_path):
os.replace(old_file_path, final_file_path)
def filter_by_date(data):
"""
Filter by date and limit
"""
default_recent_days = 30
use_recent_days = getattr(config, "recent_days", default_recent_days)
if not isinstance(use_recent_days, int) or use_recent_days <= 0:
use_recent_days = default_recent_days
start_date = datetime.datetime.now() - datetime.timedelta(days=use_recent_days)
recent_data = []
unrecent_data = []
for (url, date, resolution), response_time in data:
item = ((url, date, resolution), response_time)
if date:
date = datetime.datetime.strptime(date, "%m-%d-%Y")
if date >= start_date:
recent_data.append(item)
else:
unrecent_data.append(item)
else:
unrecent_data.append(item)
recent_data_len = len(recent_data)
if recent_data_len == 0:
recent_data = unrecent_data
elif recent_data_len < config.urls_limit:
recent_data.extend(unrecent_data[: config.urls_limit - len(recent_data)])
return recent_data
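An illustrative call with placeholder urls, showing the expected input shape: items are ((url, date, resolution), response_time) tuples and dates must be "%m-%d-%Y" strings or None:

sorted_data = [
    (("http://a/stream", "06-30-2024", "1920x1080"), 120),
    (("http://b/stream", None, None), 45),
]
kept = filter_by_date(sorted_data)  # recent entries kept, topped up from the rest up to config.urls_limit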
def get_soup(source):
"""
Get soup from source
"""
source = re.sub(
r"<!--.*?-->",
"",
source,
flags=re.DOTALL,
)
soup = BeautifulSoup(source, "html.parser")
return soup
def get_total_urls_from_info_list(infoList):
"""
Get the total urls from info list
"""
total_urls = [url for url, _, _ in infoList]
return list(dict.fromkeys(total_urls))[: int(config.urls_limit)]
def get_total_urls_from_sorted_data(data):
"""
Get the total urls from sorted data, filtered by date and deduplicated
"""
total_urls = []
if len(data) > config.urls_limit:
total_urls = [url for (url, _, _), _ in filter_by_date(data)]
else:
total_urls = [url for (url, _, _), _ in data]
return list(dict.fromkeys(total_urls))[: config.urls_limit]
def is_ipv6(url):
"""
Check if the url is ipv6
"""
try:
host = urllib.parse.urlparse(url).hostname
ipaddress.IPv6Address(host)
return True
except ValueError:
return False
def check_url_ipv_type(url):
"""
Check if the url is compatible with the ipv type in the config
"""
ipv_type = getattr(config, "ipv_type", "ipv4")
if ipv_type == "ipv4":
return not is_ipv6(url)
elif ipv_type == "ipv6":
return is_ipv6(url)
else:
return True
def check_by_domain_blacklist(url):
"""
Check by domain blacklist
"""
domain_blacklist = [
urlparse(domain).netloc if urlparse(domain).scheme else domain
for domain in getattr(config, "domain_blacklist", [])
]
return urlparse(url).netloc not in domain_blacklist
def check_by_url_keywords_blacklist(url):
"""
Check by URL blacklist keywords
"""
url_keywords_blacklist = getattr(config, "url_keywords_blacklist", [])
return not any(keyword in url for keyword in url_keywords_blacklist)
def check_url_by_patterns(url):
"""
Check the url by patterns
"""
return (
check_url_ipv_type(url)
and check_by_domain_blacklist(url)
and check_by_url_keywords_blacklist(url)
)
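An illustrative run of the combined check, assuming hypothetical blacklist values in the config (domain_blacklist = ["bad.example.com"], url_keywords_blacklist = ["/ads/"]) and the default "ipv4" ipv_type:

check_url_by_patterns("http://good.example.com/live.m3u8")  # True
check_url_by_patterns("http://bad.example.com/live.m3u8")  # False, domain blacklisted
check_url_by_patterns("http://good.example.com/ads/live.m3u8")  # False, keyword blacklisted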
def filter_urls_by_patterns(urls):
"""
Filter urls by patterns
"""
urls = [url for url in urls if check_url_ipv_type(url)]
urls = [url for url in urls if check_by_domain_blacklist(url)]
urls = [url for url in urls if check_by_url_keywords_blacklist(url)]
return urls
def merge_objects(*objects):
"""
Merge objects
"""
merged_dict = {}
for obj in objects:
if not isinstance(obj, dict):
raise TypeError("All input objects must be dictionaries")
for key, value in obj.items():
if key not in merged_dict:
merged_dict[key] = set()
if isinstance(value, set):
merged_dict[key].update(value)
elif isinstance(value, list):
for item in value:
merged_dict[key].add(item)
else:
merged_dict[key].add(value)
for key, value in merged_dict.items():
merged_dict[key] = list(value)
return merged_dict
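An illustrative merge with placeholder data; values for the same key are unioned through a set, so duplicates collapse and the order of the returned lists is not guaranteed:

merge_objects({"cctv1": ["http://a"]}, {"cctv1": ["http://a", "http://b"]})
# -> {"cctv1": ["http://a", "http://b"]} (order may differ)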
def get_ip_address():
"""
Get the IP address
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(("10.255.255.255", 1))
IP = s.getsockname()[0]
except Exception:
IP = "127.0.0.1"
finally:
s.close()
return f"http://{IP}:8000"