mirror of
https://github.com/xmbjm/TV.git
synced 2025-01-21 08:55:37 -05:00
243 lines
8.9 KiB
Python
243 lines
8.9 KiB
Python
import pickle
|
|
import re
|
|
import threading
|
|
from collections import defaultdict
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from time import time
|
|
|
|
from requests import get
|
|
from tqdm.asyncio import tqdm_asyncio
|
|
|
|
import updates.fofa.fofa_map as fofa_map
|
|
import utils.constants as constants
|
|
from requests_custom.utils import get_source_requests, close_session
|
|
from updates.proxy import get_proxy, get_proxy_next
|
|
from utils.channel import format_channel_name
|
|
from utils.config import config
|
|
from utils.retry import retry_func
|
|
from utils.tools import merge_objects, get_pbar_remaining, add_url_info, resource_path
|
|
|
|
|
|
def get_fofa_urls_from_region_list():
    """
    Build the list of FOFA query urls for the configured hotel regions.

    Returns a list of ``(url, region)`` tuples. When the configured region
    list contains "all", "ALL" or "全部", every region of the FOFA map is
    used; otherwise only the configured regions that exist in the map are.
    Empty urls are left out.
    """
    region_map = fofa_map.region_url
    wanted_regions = config.hotel_region_list
    if any(flag in wanted_regions for flag in ("all", "ALL", "全部")):
        selected = region_map.items()
    else:
        # Keep the configured order and silently ignore unknown regions.
        selected = (
            (name, region_map[name])
            for name in wanted_regions
            if name in region_map
        )
    return [(link, name) for name, links in selected for link in links if link]
|
|
|
|
|
|
def update_fofa_region_result_tmp(result, multicast=False):
    """
    Merge ``result`` into the cached FOFA region result and write it back.

    The cache file is the multicast one when ``multicast`` is truthy and
    the hotel one otherwise; it is rewritten with the pickled merge of the
    previously cached data and ``result``.
    """
    cached = get_fofa_region_result_tmp(multicast=multicast)
    merged = merge_objects(cached, result)
    cache_path = resource_path(
        f"updates/fofa/fofa_{'multicast' if multicast else 'hotel'}_region_result.pkl"
    )
    with open(cache_path, "wb") as cache_file:
        pickle.dump(merged, cache_file)
|
|
|
|
|
|
def get_fofa_region_result_tmp(multicast: bool = False):
    """
    Load the cached FOFA region result.

    :param multicast: read the multicast cache file when truthy, the hotel
        cache file otherwise
    :return: the unpickled cache content, or an empty dict when the file
        cannot be opened or unpickled
    """
    try:
        with open(
            resource_path(
                f"updates/fofa/fofa_{'multicast' if multicast else 'hotel'}_region_result.pkl"
            ),
            "rb",
        ) as file:
            # NOTE(review): pickle must only be fed trusted data; this file
            # is expected to be the one written by
            # update_fofa_region_result_tmp - confirm it cannot be replaced
            # by external input.
            return pickle.load(file)
    except Exception:
        # Best effort: a missing or corrupted cache simply means "no cache".
        # Unlike a bare ``except`` this lets KeyboardInterrupt/SystemExit
        # propagate.
        return {}
|
|
|
|
|
|
async def get_channels_by_fofa(urls=None, multicast=False, callback=None):
    """
    Get the channel by FOFA

    Starts from the cached region result when ``config.open_use_cache`` is
    set. When ``config.open_request`` is set, every FOFA query page is
    fetched in a thread pool, the ``http(s)://host:port`` addresses found in
    it are collected and merged into the result, and the merged result is
    written back to the cache.

    :param urls: query entries; each entry is indexable with the query url
        at index 0 and the region at index 1 (multicast entries also carry
        a type at index 2). Falls back to get_fofa_urls_from_region_list()
        when empty.
    :param multicast: collect the raw addresses per region/type instead of
        querying each address for its hotel channel list
    :param callback: optional callable invoked with (message, percent) to
        report progress
    :return: the merged result object
    """
    fofa_results = {}
    if config.open_use_cache:
        fofa_results = get_fofa_region_result_tmp(multicast=multicast)
    if config.open_request:
        fofa_urls = urls if urls else get_fofa_urls_from_region_list()
        fofa_urls_len = len(fofa_urls)
        pbar = tqdm_asyncio(
            total=fofa_urls_len,
            desc=f"Processing fofa for {'multicast' if multicast else 'hotel'}",
        )
        start_time = time()
        mode_name = "组播" if multicast else "酒店"
        if callback:
            callback(
                f"正在获取Fofa{mode_name}源, 共{fofa_urls_len}个查询地址",
                0,
            )
        proxy = None
        open_proxy = config.open_proxy
        open_driver = config.open_driver
        if open_driver:
            # Imported lazily so the driver module is only needed when the
            # driver mode is enabled.
            from driver.setup import setup_driver
        open_sort = config.open_sort
        # The proxy is tested against the first query url; skip the test
        # when there is nothing to query instead of failing on an empty list.
        if open_proxy and fofa_urls:
            test_url = fofa_urls[0][0]
            proxy = await get_proxy(test_url, best=True, with_test=True)
        # Set by the first worker that hits the FOFA access limit so that
        # workers starting afterwards return immediately.
        cancel_event = threading.Event()
        hotel_name = constants.origin_map["hotel"]

        def process_fofa_channels(fofa_info):
            """Fetch one FOFA query page and return the results found in it."""
            nonlocal proxy
            if cancel_event.is_set():
                return {}
            fofa_url = fofa_info[0]
            results = defaultdict(lambda: defaultdict(list))
            driver = None
            try:
                if open_driver:
                    driver = setup_driver(proxy)
                    try:
                        retry_func(lambda: driver.get(fofa_url), name=fofa_url)
                    except Exception:
                        # Retries exhausted: switch proxy (if enabled) and
                        # try once more with a fresh driver.
                        if open_proxy:
                            proxy = get_proxy_next()
                        driver.close()
                        driver.quit()
                        driver = setup_driver(proxy)
                        driver.get(fofa_url)
                    page_source = driver.page_source
                else:
                    page_source = retry_func(
                        lambda: get_source_requests(fofa_url), name=fofa_url
                    )
                if any(keyword in page_source for keyword in ["访问异常", "禁止访问", "资源访问每天限制"]):
                    cancel_event.set()
                    raise ValueError("Limited access to fofa page")
                # Drop HTML comments before looking for host:port addresses.
                fofa_source = re.sub(r"<!--.*?-->", "", page_source, flags=re.DOTALL)
                found_urls = set(re.findall(r"https?://[\w\.-]+:\d+", fofa_source))
                if multicast:
                    region = fofa_info[1]
                    multicast_type = fofa_info[2]
                    multicast_result = [(url, None, None) for url in found_urls]
                    results[region][multicast_type] = multicast_result
                else:
                    with ThreadPoolExecutor(max_workers=100) as executor:
                        futures = [
                            executor.submit(
                                process_fofa_json_url,
                                url,
                                fofa_info[1],
                                open_sort,
                                hotel_name,
                            )
                            for url in found_urls
                        ]
                        for future in futures:
                            results = merge_objects(results, future.result())
                return results
            except ValueError:
                # Propagated to the caller, which stops on the access limit.
                raise
            except Exception as e:
                print(e)
            finally:
                if driver:
                    driver.close()
                    driver.quit()
                pbar.update()
                remain = fofa_urls_len - pbar.n
                if callback:
                    callback(
                        f"正在获取Fofa{mode_name}源, 剩余{remain}个查询地址待获取, 预计剩余时间: {get_pbar_remaining(n=pbar.n, total=pbar.total, start_time=start_time)}",
                        int((pbar.n / fofa_urls_len) * 100),
                    )

        max_workers = 3 if open_driver else 10
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = [
                executor.submit(process_fofa_channels, fofa_url) for fofa_url in fofa_urls
            ]
            try:
                for future in as_completed(futures):
                    result = future.result()
                    if result:
                        fofa_results = merge_objects(fofa_results, result)
            except ValueError as e:
                if "Limited access to fofa page" in str(e):
                    # Drop the queries that have not started yet.
                    for future in futures:
                        future.cancel()
        if fofa_results:
            update_fofa_region_result_tmp(fofa_results, multicast=multicast)
        # Force the progress bar to its final state even when queries were
        # cancelled.
        pbar.n = fofa_urls_len
        pbar.update(0)
        if callback:
            callback(
                f"正在获取Fofa{mode_name}源",
                100,
            )
        if not open_driver:
            close_session()
        pbar.close()
    return fofa_results
|
|
|
|
|
|
def process_fofa_json_url(url, region, open_sort, hotel_name="酒店源"):
    """
    Process the FOFA json url

    Requests ``{url}/iptv/live/1000.json?key=txiptv`` and collects the
    channels listed in the response.

    :param url: base address of the host to query; it is also used as the
        prefix of every channel url found
    :param region: region label placed in front of ``hotel_name`` in the
        info attached to each channel url
    :param open_sort: when truthy, the attached info additionally carries
        a ``-cache:{url}`` suffix
    :param hotel_name: label placed after the region in the attached info
    :return: dict mapping the formatted channel name to a list of
        ``(url, None, None)`` tuples; empty when the request fails, the
        response is not the expected JSON or its ``code`` is not 0
    """
    channels = {}
    try:
        final_url = url + "/iptv/live/1000.json?key=txiptv"
        response = get(final_url, timeout=config.request_timeout)
        json_data = response.json()
        if json_data["code"] != 0:
            return channels
        url_info = (
            f"{region}{hotel_name}-cache:{url}"
            if open_sort
            else f"{region}{hotel_name}"
        )
        for item in json_data["data"]:
            if not isinstance(item, dict):
                continue
            try:
                item_name = format_channel_name(item.get("name"))
                # A missing/None url must not abort the remaining items.
                item_url = (item.get("url") or "").strip()
                if item_name and item_url:
                    total_url = add_url_info(f"{url}{item_url}", url_info)
                    channels.setdefault(item_name, []).append(
                        (total_url, None, None)
                    )
            except Exception:
                # Skip a malformed entry but keep processing the others.
                continue
    except Exception:
        # Best effort: unreachable hosts and unexpected payloads are common
        # here, so whatever was collected so far is returned. Unlike a
        # ``return`` inside ``finally`` this does not swallow
        # KeyboardInterrupt/SystemExit.
        pass
    return channels
|