1
0
This commit is contained in:
Such 2024-07-06 18:52:14 +08:00
parent d70b24b2f7
commit 0864950811
7 changed files with 81 additions and 76 deletions

@ -10,6 +10,7 @@ import re
from utils.retry import retry_func
from utils.channel import format_channel_name
from utils.utils import merge_objects, get_pbar_remaining
from proxy import get_proxy
config = get_config()
timeout = 10
@ -37,19 +38,22 @@ async def get_channels_by_fofa(callback):
"""
fofa_urls = get_fofa_urls_from_region_list()
fofa_urls_len = len(fofa_urls)
pbar = tqdm_asyncio(total=fofa_urls_len)
pbar = tqdm_asyncio(total=fofa_urls_len, desc="Processing multicast")
start_time = time()
fofa_results = {}
pbar.set_description(f"Processing multicast, {fofa_urls_len} regions remaining")
callback(f"正在获取组播源更新, 共{fofa_urls_len}个地区", 0)
fofa_queue = Queue()
for fofa_url in fofa_urls:
await fofa_queue.put(fofa_url)
proxy = None
if config.open_proxy:
proxy = await get_proxy(fofa_urls[0], best=True, with_test=True)
driver = setup_driver(proxy)
def process_fofa_channels(fofa_url, fofa_urls_len, callback):
driver = None
def process_fofa_channels(fofa_url, fofa_urls_len, proxy=None):
# driver = None
try:
driver = setup_driver()
# driver = setup_driver(proxy)
retry_func(lambda: driver.get(fofa_url), name=fofa_url)
fofa_source = re.sub(r"<!--.*?-->", "", driver.page_source, flags=re.DOTALL)
urls = set(re.findall(r"https?://[\w\.-]+:\d+", fofa_source))
@ -89,15 +93,13 @@ async def get_channels_by_fofa(callback):
continue
merge_objects(fofa_results, channels)
except Exception as e:
# print(e)
pass
print(e)
finally:
if driver:
driver.quit()
# if driver:
# driver.quit()
fofa_queue.task_done()
pbar.update()
remain = fofa_urls_len - pbar.n
pbar.set_description(f"Processing multicast, {remain} regions remaining")
callback(
f"正在获取组播源更新, 剩余{remain}个地区待获取, 预计剩余时间: {get_pbar_remaining(pbar, start_time)}",
int((pbar.n / fofa_urls_len) * 100),
@ -105,13 +107,14 @@ async def get_channels_by_fofa(callback):
if config.open_online_search and pbar.n / fofa_urls_len == 1:
callback("正在获取在线搜索结果, 请耐心等待", 0)
with ThreadPoolExecutor(max_workers=5) as pool:
while not fofa_queue.empty():
loop = get_running_loop()
fofa_url = await fofa_queue.get()
loop.run_in_executor(
pool, process_fofa_channels, fofa_url, fofa_urls_len, callback
)
print("Finish processing fofa url")
# with ThreadPoolExecutor(max_workers=5) as pool:
while not fofa_queue.empty():
# loop = get_running_loop()
fofa_url = await fofa_queue.get()
process_fofa_channels(fofa_url, fofa_urls_len, proxy)
# loop.run_in_executor(
# pool, process_fofa_channels, fofa_url, fofa_urls_len, proxy
# )
driver.quit()
pbar.close()
return fofa_results

14
main.py

@ -88,9 +88,6 @@ class UpdateSource:
logging.error(f"Error: {e}")
finally:
self.pbar.update()
self.pbar.set_description(
f"Sorting, {self.pbar.total - self.pbar.n} channels remaining"
)
self.update_progress(
f"正在测速排序, 剩余{self.pbar.total - self.pbar.n}个频道, 预计剩余时间: {get_pbar_remaining(self.pbar, self.start_time)}",
int((self.pbar.n / self.total) * 100),
@ -140,8 +137,7 @@ class UpdateSource:
)
def write_channel_to_file(self):
self.pbar = tqdm(total=self.total)
self.pbar.set_description(f"Writing, {self.total} channels remaining")
self.pbar = tqdm(total=self.total, desc="Writing")
self.start_time = time()
for cate, channel_obj in self.channel_items.items():
for name in channel_obj.keys():
@ -152,9 +148,6 @@ class UpdateSource:
update_channel_urls_txt(cate, name, channel_urls)
finally:
self.pbar.update()
self.pbar.set_description(
f"Writing, {self.pbar.total - self.pbar.n} channels remaining"
)
self.update_progress(
f"正在写入结果, 剩余{self.pbar.total - self.pbar.n}个接口, 预计剩余时间: {get_pbar_remaining(self.pbar, self.start_time)}",
int((self.pbar.n / self.total) * 100),
@ -200,10 +193,7 @@ class UpdateSource:
for cate, channel_obj in self.channel_data.items()
for name, info_list in channel_obj.items()
]
self.pbar = tqdm_asyncio(total=len(self.tasks))
self.pbar.set_description(
f"Sorting, {len(self.tasks)} channels remaining"
)
self.pbar = tqdm_asyncio(total=len(self.tasks), desc="Sorting")
self.update_progress(
f"正在测速排序, 共{len(self.tasks)}个频道",
int((self.pbar.n / len(self.tasks)) * 100),

@ -7,7 +7,7 @@ from utils.utils import (
get_results_from_soup,
)
from utils.config import get_config
from proxy.request import get_proxy_list, get_proxy_list_with_test
from proxy import get_proxy
from time import time, sleep
from driver.setup import setup_driver
from utils.retry import (
@ -52,19 +52,17 @@ async def get_channels_by_online_search(names, callback):
pageUrl = await use_accessible_url(callback)
if not pageUrl:
return channels
proxy = None
if config.open_proxy:
proxy_list = await get_proxy_list(3)
proxy_list_test = (
await get_proxy_list_with_test(pageUrl, proxy_list) if proxy_list else []
)
proxy_index = 0
proxy = await get_proxy(pageUrl, best=True, with_test=True)
start_time = time()
driver = setup_driver(proxy)
def process_channel_by_online_search(name, proxy=None):
info_list = []
driver = None
# driver = None
try:
driver = setup_driver(proxy)
# driver = setup_driver(proxy)
retry_func(lambda: driver.get(pageUrl), name=f"online search:{name}")
search_box = locate_element_with_retry(
driver, (By.XPATH, '//input[@type="text"]')
@ -140,14 +138,11 @@ async def get_channels_by_online_search(names, callback):
print(f"{name}:Error on search: {e}")
pass
finally:
if driver:
driver.quit()
# if driver:
# driver.quit()
channels[format_channel_name(name)] = info_list
names_queue.task_done()
pbar.update()
pbar.set_description(
f"Processing online search, {names_len - pbar.n} channels remaining"
)
callback(
f"正在线上查询更新, 剩余{names_len - pbar.n}个频道待查询, 预计剩余时间: {get_pbar_remaining(pbar, start_time)}",
int((pbar.n / names_len) * 100),
@ -157,21 +152,14 @@ async def get_channels_by_online_search(names, callback):
for name in names:
await names_queue.put(name)
names_len = names_queue.qsize()
pbar = tqdm_asyncio(total=names_len)
pbar.set_description(f"Processing online search, {names_len} channels remaining")
pbar = tqdm_asyncio(total=names_len, desc="Online search")
callback(f"正在线上查询更新, 共{names_len}个频道", 0)
with ThreadPoolExecutor(max_workers=5) as pool:
while not names_queue.empty():
loop = get_running_loop()
name = await names_queue.get()
proxy = (
proxy_list_test[proxy_index]
if config.open_proxy and proxy_list_test
else None
)
if config.open_proxy and proxy_list_test:
proxy_index = (proxy_index + 1) % len(proxy_list_test)
loop.run_in_executor(pool, process_channel_by_online_search, name, proxy)
print("Finished processing online search")
# with ThreadPoolExecutor(max_workers=5) as pool:
while not names_queue.empty():
# loop = get_running_loop()
name = await names_queue.get()
process_channel_by_online_search(name)
# loop.run_in_executor(pool, process_channel_by_online_search, name, proxy)
driver.quit()
pbar.close()
return channels

@ -0,0 +1,22 @@
from .request import get_proxy_list, get_proxy_list_with_test

# Module-level cache so repeated calls reuse already-fetched/tested proxies.
proxy_list = []  # raw proxies scraped from the provider pages
proxy_list_test = []  # proxies that passed the speed test, sorted fastest-first
proxy_index = 0  # round-robin cursor into proxy_list_test


async def get_proxy(url=None, best=False, with_test=False):
    """
    Return a proxy URL string, or None when no working proxy is available.

    Args:
        url: Target URL used to speed-test candidate proxies; falls back
            to "https://www.baidu.com" when not given.
        best: When True, return the fastest tested proxy (list head);
            otherwise rotate through the tested proxies round-robin.
        with_test: When True, force a re-test even if cached results exist.

    Returns:
        A proxy URL string, or None when no proxy passed the test.
    """
    global proxy_list, proxy_list_test, proxy_index
    if not proxy_list:
        proxy_list = await get_proxy_list(3)
    if not proxy_list_test or with_test:
        proxy_list_test = await get_proxy_list_with_test(
            url or "https://www.baidu.com", proxy_list
        )
        # A re-test may return a shorter list than before; a stale cursor
        # could then index past the end. Reset it to stay in range.
        proxy_index = 0
    if not proxy_list_test:
        return None
    if best:
        return proxy_list_test[0]
    # Round-robin over the tested proxies. The modulo is a second line of
    # defense in case the list was replaced between calls.
    proxy = proxy_list_test[proxy_index % len(proxy_list_test)]
    proxy_index = (proxy_index + 1) % len(proxy_list_test)
    return proxy

@ -25,11 +25,12 @@ async def get_proxy_list(page_count=1):
url = pattern.format(page_index)
await url_queue.put(url)
pbar = tqdm_asyncio(total=url_queue.qsize(), desc="Getting proxy list")
driver = setup_driver()
def get_proxy(url):
driver = None
# driver = None
try:
driver = setup_driver()
# driver = setup_driver()
url = pattern.format(page_index)
retry_func(lambda: driver.get(url), name=url)
sleep(1)
@ -49,16 +50,18 @@ async def get_proxy_list(page_count=1):
proxy = f"http://{ip}:{port}"
proxy_list.append(proxy)
finally:
if driver:
driver.quit()
# if driver:
# driver.quit()
url_queue.task_done()
pbar.update()
with ThreadPoolExecutor(max_workers=5) as executor:
while not url_queue.empty():
loop = get_running_loop()
url = await url_queue.get()
loop.run_in_executor(executor, get_proxy, url)
# with ThreadPoolExecutor(max_workers=5) as executor:
while not url_queue.empty():
# loop = get_running_loop()
url = await url_queue.get()
get_proxy(url)
# loop.run_in_executor(executor, get_proxy, url)
driver.quit()
pbar.close()
return proxy_list
@ -89,5 +92,5 @@ async def get_proxy_list_with_test(base_url, proxy_list):
return []
proxy_list_with_test.sort(key=lambda x: x[1])
proxy_urls = [url for url, _ in proxy_list_with_test]
print(f"{len(proxy_urls)} valid proxy found")
print(f"Valid proxy found: {len(proxy_urls)}")
return proxy_urls

@ -22,9 +22,8 @@ async def get_channels_by_subscribe_urls(callback):
channels = {}
pattern = r"^(.*?),(?!#genre#)(.*?)$"
subscribe_urls_len = len(config.subscribe_urls)
pbar = tqdm_asyncio(total=subscribe_urls_len)
pbar = tqdm_asyncio(total=subscribe_urls_len, desc="Processing subscribe")
start_time = time()
pbar.set_description(f"Processing subscribe, {subscribe_urls_len} urls remaining")
callback(f"正在获取订阅源更新, 共{subscribe_urls_len}个订阅源", 0)
subscribe_queue = Queue()
for subscribe_url in config.subscribe_urls:
@ -67,7 +66,6 @@ async def get_channels_by_subscribe_urls(callback):
subscribe_queue.task_done()
pbar.update()
remain = subscribe_urls_len - pbar.n
pbar.set_description(f"Processing subscribe, {remain} urls remaining")
callback(
f"正在获取订阅源更新, 剩余{remain}个订阅源待获取, 预计剩余时间: {get_pbar_remaining(pbar, start_time)}",
int((pbar.n / subscribe_urls_len) * 100),
@ -75,10 +73,11 @@ async def get_channels_by_subscribe_urls(callback):
if config.open_online_search and pbar.n / subscribe_urls_len == 1:
callback("正在获取在线搜索结果, 请耐心等待", 0)
with ThreadPoolExecutor(max_workers=5) as pool:
loop = get_running_loop()
# with ThreadPoolExecutor(max_workers=10) as pool:
while not subscribe_queue.empty():
# loop = get_running_loop()
subscribe_url = await subscribe_queue.get()
loop.run_in_executor(pool, process_subscribe_channels, subscribe_url)
print("Finished processing subscribe urls")
process_subscribe_channels(subscribe_url)
# loop.run_in_executor(pool, process_subscribe_channels, subscribe_url)
pbar.close()
return channels

@ -3,7 +3,7 @@ from tkinter import messagebox
from tkinter import scrolledtext
from tkinter import ttk
from tkinter import filedialog
from utils import resource_path, load_external_config
from utils.config import resource_path, load_external_config
from main import UpdateSource
import os
import asyncio