TV/updates/fofa/request.py
2024-12-11 17:19:33 +08:00

243 lines
8.9 KiB
Python

import pickle
import re
import threading
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import time
from requests import get
from tqdm.asyncio import tqdm_asyncio
import updates.fofa.fofa_map as fofa_map
import utils.constants as constants
from requests_custom.utils import get_source_requests, close_session
from updates.proxy import get_proxy, get_proxy_next
from utils.channel import format_channel_name
from utils.config import config
from utils.retry import retry_func
from utils.tools import merge_objects, get_pbar_remaining, add_url_info, resource_path
def get_fofa_urls_from_region_list():
    """
    Get the FOFA url from region

    Returns a list of (url, region) pairs built from the configured
    hotel region list; the special values "all"/"ALL"/"全部" select
    every region known to fofa_map.
    """
    region_url = getattr(fofa_map, "region_url")
    configured_regions = config.hotel_region_list
    wants_everything = any(
        keyword in configured_regions for keyword in ("all", "ALL", "全部")
    )
    if wants_everything:
        # Flatten every region's url list, skipping empty entries.
        return [
            (url, region)
            for region, url_list in region_url.items()
            for url in url_list
            if url
        ]
    pairs = []
    for region in configured_regions:
        for url in region_url.get(region, []):
            if url:
                pairs.append((url, region))
    return pairs
def update_fofa_region_result_tmp(result, multicast=False):
    """
    Update fofa region result tmp

    Merges *result* into the on-disk cached result for the chosen mode
    (multicast vs hotel) and writes the merged object back as a pickle.
    """
    cache_path = resource_path(
        f"updates/fofa/fofa_{'multicast' if multicast else 'hotel'}_region_result.pkl"
    )
    # Load whatever was cached before and fold the new result into it.
    merged = merge_objects(get_fofa_region_result_tmp(multicast=multicast), result)
    with open(cache_path, "wb") as file:
        pickle.dump(merged, file)
def get_fofa_region_result_tmp(multicast: bool = False):
    """
    Load the cached fofa region result from disk.

    :param multicast: select the multicast cache file instead of the hotel one
    :return: the unpickled cached result, or an empty dict when the cache
             file is missing or unreadable

    Note: the original signature was ``multicast: False`` — a literal used
    as an annotation with no default, so calling this without an argument
    raised TypeError. Fixed to a proper keyword default.
    """
    cache_path = resource_path(
        f"updates/fofa/fofa_{'multicast' if multicast else 'hotel'}_region_result.pkl"
    )
    try:
        with open(cache_path, "rb") as file:
            # NOTE(review): pickle.load of a locally-written cache file;
            # assumed trusted since only this module writes it.
            return pickle.load(file)
    except Exception:
        # Missing or corrupt cache falls back to an empty result
        # (was a bare `except:`, which also swallowed KeyboardInterrupt).
        return {}
async def get_channels_by_fofa(urls=None, multicast=False, callback=None):
    """
    Get the channel by FOFA

    :param urls: optional pre-built list of fofa query tuples; when None the
                 list is derived from the configured region list
    :param multicast: select multicast mode (affects cache file, labels and
                      result layout) instead of hotel mode
    :param callback: optional progress callback(message, percent)
    :return: dict of accumulated results (cached + freshly fetched)
    """
    # Seed results from the on-disk cache when caching is enabled.
    fofa_results = {}
    if config.open_use_cache:
        fofa_results = get_fofa_region_result_tmp(multicast=multicast)
    if config.open_request:
        fofa_urls = urls if urls else get_fofa_urls_from_region_list()
        fofa_urls_len = len(fofa_urls)
        pbar = tqdm_asyncio(
            total=fofa_urls_len,
            desc=f"Processing fofa for {'multicast' if multicast else 'hotel'}",
        )
        start_time = time()
        mode_name = "组播" if multicast else "酒店"
        if callback:
            callback(
                f"正在获取Fofa{mode_name}源, 共{fofa_urls_len}个查询地址",
                0,
            )
        proxy = None
        open_proxy = config.open_proxy
        open_driver = config.open_driver
        if open_driver:
            # Imported lazily so the selenium stack is only required
            # when browser-driven fetching is actually enabled.
            from driver.setup import setup_driver
        open_sort = config.open_sort
        if open_proxy:
            # Probe with the first query url to select a working proxy.
            test_url = fofa_urls[0][0]
            proxy = await get_proxy(test_url, best=True, with_test=True)
        # Set once fofa blocks us, so remaining workers return early.
        cancel_event = threading.Event()
        hotel_name = constants.origin_map["hotel"]
        def process_fofa_channels(fofa_info):
            # Worker: fetch one fofa query page and extract host:port urls.
            # fofa_info is (url, region) — or (url, region, type) in
            # multicast mode (TODO confirm: the third item is only read
            # when `multicast` is true).
            nonlocal proxy
            if cancel_event.is_set():
                return {}
            fofa_url = fofa_info[0]
            results = defaultdict(lambda: defaultdict(list))
            driver = None
            try:
                if open_driver:
                    driver = setup_driver(proxy)
                    try:
                        retry_func(lambda: driver.get(fofa_url), name=fofa_url)
                    except Exception as e:
                        # Rotate to the next proxy and retry once with a
                        # fresh driver instance.
                        if open_proxy:
                            proxy = get_proxy_next()
                        driver.close()
                        driver.quit()
                        driver = setup_driver(proxy)
                        driver.get(fofa_url)
                    page_source = driver.page_source
                else:
                    page_source = retry_func(
                        lambda: get_source_requests(fofa_url), name=fofa_url
                    )
                # These phrases appear in fofa's block/rate-limit pages;
                # abort the whole run when any of them shows up.
                if any(keyword in page_source for keyword in ["访问异常", "禁止访问", "资源访问每天限制"]):
                    cancel_event.set()
                    raise ValueError("Limited access to fofa page")
                # Strip HTML comments, then scan for http(s)://host:port.
                fofa_source = re.sub(r"<!--.*?-->", "", page_source, flags=re.DOTALL)
                urls = set(re.findall(r"https?://[\w\.-]+:\d+", fofa_source))
                if multicast:
                    region = fofa_info[1]
                    type = fofa_info[2]
                    # Result tuples are (url, None, None); the two Nones are
                    # placeholders matching the hotel-mode tuple shape.
                    multicast_result = [(url, None, None) for url in urls]
                    results[region][type] = multicast_result
                else:
                    # Hotel mode: probe each candidate host's JSON channel
                    # list concurrently and merge the per-host results.
                    with ThreadPoolExecutor(max_workers=100) as executor:
                        futures = [
                            executor.submit(
                                process_fofa_json_url,
                                url,
                                fofa_info[1],
                                open_sort,
                                hotel_name,
                            )
                            for url in urls
                        ]
                        for future in futures:
                            results = merge_objects(results, future.result())
                return results
            except ValueError as e:
                # Re-raise the rate-limit signal so the outer as_completed
                # loop can cancel the remaining futures.
                raise e
            except Exception as e:
                print(e)
            finally:
                if driver:
                    driver.close()
                    driver.quit()
                # Progress is updated even on failure so the bar completes.
                pbar.update()
                remain = fofa_urls_len - pbar.n
                if callback:
                    callback(
                        f"正在获取Fofa{mode_name}源, 剩余{remain}个查询地址待获取, 预计剩余时间: {get_pbar_remaining(n=pbar.n, total=pbar.total, start_time=start_time)}",
                        int((pbar.n / fofa_urls_len) * 100),
                    )
        # A browser per worker is heavy; use fewer threads with the driver.
        max_workers = 3 if open_driver else 10
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(process_fofa_channels, fofa_url) for fofa_url in fofa_urls
            ]
            try:
                for future in as_completed(futures):
                    result = future.result()
                    if result:
                        fofa_results = merge_objects(fofa_results, result)
            except ValueError as e:
                # Rate-limited: cancel every not-yet-started query.
                if "Limited access to fofa page" in str(e):
                    for future in futures:
                        future.cancel()
        if fofa_results:
            update_fofa_region_result_tmp(fofa_results, multicast=multicast)
        # Force the progress bar to 100% regardless of failures.
        pbar.n = fofa_urls_len
        pbar.update(0)
        if callback:
            callback(
                f"正在获取Fofa{mode_name}",
                100,
            )
        if not open_driver:
            close_session()
        pbar.close()
    return fofa_results
def process_fofa_json_url(url, region, open_sort, hotel_name="酒店源"):
    """
    Process the FOFA json url

    Fetches ``{url}/iptv/live/1000.json?key=txiptv`` and collects the
    channels the host advertises.

    :param url: base "http(s)://host:port" of a candidate IPTV server
    :param region: region label used to tag each channel url
    :param open_sort: when truthy, append a "-cache:{url}" marker to the
                      url info (consumed by the sorting stage)
    :param hotel_name: origin label appended after the region
    :return: dict mapping formatted channel name -> list of
             (tagged_url, None, None) tuples; empty on any failure
             (best-effort probe — this function never raises)
    """
    channels = {}
    try:
        final_url = url + "/iptv/live/1000.json?key=txiptv"
        response = get(final_url, timeout=config.request_timeout)
        json_data = response.json()
        # A missing or non-zero "code" means the host did not answer with
        # a usable channel list.
        if json_data.get("code") != 0:
            return channels
        for item in json_data.get("data", []):
            if not isinstance(item, dict):
                continue
            item_name = format_channel_name(item.get("name"))
            # Guard against a null "url" field: the original called
            # .strip() on it directly, which aborted the whole item loop
            # on the first bad entry.
            item_url = (item.get("url") or "").strip()
            if not (item_name and item_url):
                continue
            info = (
                f"{region}{hotel_name}-cache:{url}"
                if open_sort
                else f"{region}{hotel_name}"
            )
            total_url = add_url_info(f"{url}{item_url}", info)
            channels.setdefault(item_name, []).append((total_url, None, None))
    except Exception:
        # Best-effort: unreachable hosts, timeouts and malformed JSON
        # simply contribute no channels. (Replaces the original
        # `return` inside `finally`, which silently swallowed *every*
        # exception, and three nested identical except blocks.)
        pass
    return channels