dr_py/txt/hipy/哔滴影视.py
晚风拂柳颜 c7638e10e2 更新bidi
2024-01-14 16:12:43 +08:00

481 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File : 哔滴影视.py
# Author: DaShenHan&道长-----先苦后甜,任凭晚风拂柳颜------
# Author's Blog: https://blog.csdn.net/qq_32394351
# Date : 2024/1/10
import os.path
import sys
import requests
sys.path.append('..')
try:
# from base.spider import Spider as BaseSpider
from base.spider import BaseSpider
except ImportError:
from t4.base.spider import BaseSpider
from pathlib import Path
import base64
from cachetools import cached, TTLCache # 可以缓存curd的函数指定里面的key
"""
配置示例:
t4的配置里ext节点会自动变成api对应query参数extend,但t4的ext字符串不支持路径格式比如./开头或者.json结尾
api里会自动含有ext参数是base64编码后的选中的筛选条件
{
"key":"hipy_t4_哔滴影视",
"name":"哔滴影视(hipy_t4)",
"type":4,
"api":"http://192.168.31.49:5707/api/v1/vod/哔滴影视?api_ext={{host}}/txt/hipy/bidi.jar",
"searchable":1,
"quickSearch":0,
"filterable":1,
"ext":"{{host}}/txt/hipy/bidi.jar"
},
{
"key": "hipy_t3_哔滴影视",
"name": "哔滴影视(hipy_t3)",
"type": 3,
"api": "{{host}}/txt/hipy/哔滴影视.py",
"searchable": 1,
"quickSearch": 0,
"filterable": 1,
"ext": "{{host}}/txt/hipy/bidi.jar"
},
"""
def envkey(self, url: str):
return url
# 全局变量
gParam = {
"inited": False,
}
class Spider(BaseSpider): # 元类 默认的元类 type
api: str = 'https://www.bdys03.com/api/v1'
javar = None
def getDependence(self):
return ['base_java_loader']
def getName(self):
return "哔滴影视"
@cached(cache=TTLCache(maxsize=3, ttl=3600), key=envkey)
def get_init_api(self, url):
try:
print('get_init_api请求URL:', url)
r = self.fetch(url)
ret = None
if r.status_code == 200:
self.log(f'url:{url},文件体积:{len(r.content)}')
ret = r.content
return ret
except Exception as e:
print(f'get_init_api请求URL发生错误:{e}')
return {}
def init_api_ext_file(self):
"""
这个函数用于初始化py文件对应的json文件用于存筛选规则。
执行此函数会自动生成筛选文件
@return:
"""
pass
def init(self, extend=""):
"""
初始化加载extend一般与py文件名同名的json文件作为扩展筛选
@param extend:
@return:
"""
global gParam
ext = self.extend
if isinstance(ext, str) and ext:
if ext.endswith('.jar'):
jar_path = os.path.join(os.path.dirname(__file__), './jars')
os.makedirs(jar_path, exist_ok=True)
# jar_file = os.path.join(os.path.dirname(__file__), './jars/bdys.jar')
jar_file = os.path.join(os.path.dirname(__file__), './jars/bidi.jar')
jar_file = Path(jar_file).as_posix()
need_down = False
msg = ''
if not gParam['inited'] and not os.path.exists(jar_file):
need_down = True
msg = f'未inited,且文件不存在。开始下载文件'
elif gParam['inited'] and not os.path.exists(jar_file):
need_down = True
msg = f'已inited,但文件不存在。开始下载文件'
# elif not gParam['inited'] and os.path.exists(jar_file):
# need_down = True
# msg = f'未inited,但文件已存在。重新下载文件'
if need_down:
self.log(msg)
if self.ENV.lower() == 't3':
# ext = ext.replace('.jar', '.dex')
pass
content = self.get_init_api(ext)
with open(jar_file, mode='wb+') as f:
f.write(content)
# 装载模块,这里只要一个就够了
if isinstance(extend, list):
for lib in extend:
if '.Spider' in str(type(lib)):
self.javar = lib
break
if self.javar:
# jar_file = os.path.join(os.path.dirname(__file__), './jars/bdys.jar')
jar_file = os.path.join(os.path.dirname(__file__), './jars/bidi.jar')
jar_file = Path(jar_file).as_posix()
self.javar.init_jar(jar_file)
# self.class1 = self.javar.jClass('com.C4355b')
self.token = str(self.javar.call_java('com.EncryptionUtils', 'getToken'))
# self.class1 = self.javar.jClass('com.EncryptionUtils')
# # class1 = self.class1() # 类实例化
# class1 = self.class1
# self.token = str(class1.getToken())
# print(self.token)
# # self.token = str(self.class1.getToken())
self.headers.update({'token': self.token})
gParam['inited'] = True
def isVideo(self):
"""
返回是否为视频的匹配字符串
@return: None空 reg:正则表达式 js:input js代码
"""
return 'js:input.includes(".m3u8)?true:false'
def isVideoFormat(self, url):
pass
def manualVideoCheck(self):
pass
def homeContent(self, filterable=False):
"""
获取首页分类及筛选数据
@param filterable: 能否筛选跟t3/t4配置里的filterable参数一致
@return:
"""
class_name = '电影&电视剧&动漫&综艺' # 静态分类名称拼接
class_url = '0&1001&21&35' # 静态分类标识拼接
result = {}
classes = []
if all([class_name, class_url]):
class_names = class_name.split('&')
class_urls = class_url.split('&')
cnt = min(len(class_urls), len(class_names))
for i in range(cnt):
classes.append({
'type_name': class_names[i],
'type_id': class_urls[i]
})
result['class'] = classes
if filterable:
result['filters'] = self.config['filter']
return result
def homeVideoContent(self):
"""
首页推荐列表
@return:
"""
d = []
d.append({
'vod_name': '测试',
'vod_id': 'index.html',
'vod_pic': 'https://gitee.com/CherishRx/imagewarehouse/raw/master/image/13096725fe56ce9cf643a0e4cd0c159c.gif',
'vod_remarks': '原始hipy',
})
result = {
'list': d
}
return result
def categoryContent(self, tid, pg, filterable, extend):
"""
返回一级列表页数据
@param tid: 分类id
@param pg: 当前页数
@param filterable: 能否筛选
@param extend: 当前筛选数据
@return:
"""
url = self.api + f'/category/{tid}/{pg}?type=0'
r = self.fetch(url, headers=self.headers)
ret = r.json()
data = self.decode(ret['data'])
# print(data)
page_count = 12 # 默认赋值一页列表12条数据|这个值一定要写正确看他默认一页多少条
d = [{
'vod_name': vod['movieName'],
'vod_id': vod['id'],
'vod_pic': vod['cdnCover'],
'vod_remarks': vod['rank'],
'vod_content': vod['title'],
} for vod in data['list']]
result = {
'list': d,
'page': pg,
'pagecount': 9999 if len(d) >= page_count else pg,
'limit': 90,
'total': 999999,
}
return result
def detailContent(self, ids):
"""
返回二级详情页数据
@param ids: 一级传过来的vod_id列表
@return:
"""
vod_id = ids[0]
url = self.api + f'/detail/{vod_id}'
r = self.fetch(url, headers=self.headers)
ret = r.json()
data = self.decode(ret['data'])
# print(self.json2str(data))
vod = data['movie']
playlist = data['playlist']
titles = []
plays = {}
for p in playlist: # 选集列表
title = p['title']
titles.append(title)
if not plays.get(title):
plays[title] = []
_type = '1' if p.get('tosId') else '0'
purl = self.api + '/playurl/' + str(p['id']) + '?type=' + _type
plays[title].append({'name': '至尊线路', 'url': f'vip://{purl}'})
# if p.get('tosId'):
# purl = self.api + '/playurl/' + str(p['id']) + '?type=' + str(p.get('tosId') or '0')
# plays[title].append({'name': '至尊线路', 'url': f'vip://{purl}'})
if p.get('url'):
for p0 in p['url'].split(','):
plays[title].append(
{'name': p0.split('#')[1] if len(p0.split('#')) > 1 else '道长线路', 'url': p0.split('#')[0]})
if p.get('url1'):
for p1 in p['url1'].split(','):
plays[title].append(
{'name': p1.split('#')[1] if len(p1.split('#')) > 1 else '道长线路', 'url': p1.split('#')[0]})
if p.get('url2'):
for p2 in p['url2'].split(','):
plays[title].append(
{'name': p2.split('#')[1] if len(p2.split('#')) > 1 else '道长线路', 'url': p2.split('#')[0]})
tabs = {}
# key 选集列表 value是线路列表
for key, value in plays.items():
for tab in value:
if not tab['name'] in tabs:
tabs[tab['name']] = []
tabs[tab['name']].append(f"{key}${tab['url']}")
vod_play_from = '$$$'.join(tabs.keys())
vod_play_urls = []
for key, value in tabs.items():
vod_play_urls.append('#'.join(value))
vod_play_url = '$$$'.join(vod_play_urls)
vod = {"vod_id": vod_id,
"vod_name": vod['title'],
"vod_pic": vod['cdnCover'],
"type_name": ','.join(vod['m_type']),
"vod_year": '',
"vod_area": vod['area'],
"vod_remarks": f"{vod['movieName']} {vod['rank']}",
"vod_actor": ','.join(vod['m_performer']),
"vod_director": ','.join(vod['m_director']),
"vod_content": vod['intro'],
"vod_play_from": vod_play_from,
"vod_play_url": vod_play_url}
result = {
'list': [vod]
}
return result
def searchContent(self, wd, quick=False, pg=1):
"""
返回搜索列表
@param wd: 搜索关键词
@param quick: 是否来自快速搜索。t3/t4配置里启用了快速搜索在快速搜索在执行才会是True
@return:
"""
url = self.api + f'/search/{wd}/{pg}'
r = self.fetch(url, headers=self.headers)
ret = r.json()
data = self.decode(ret['data'])
# print(data)
d = []
for li in data['list']:
d.append({
'vod_name': li['movieName'],
'vod_id': li['id'],
'vod_pic': li['cdnCover'],
'vod_remarks': li['curEp'],
'vod_content': li['intro'],
})
result = {
'list': d
}
# print(result)
return result
def playerContent(self, flag, id, vipFlags):
"""
解析播放,返回json。壳子视情况播放直链或进行嗅探
@param flag: vod_play_from 播放来源线路
@param id: vod_play_url 播放的链接
@param vipFlags: vip标识
@return:
"""
url = str(id)
# 至尊线路
if url.startswith('vip://'):
purl = url.split('vip://')[1]
# print(purl)
r = self.fetch(purl, headers=self.headers)
ret = r.json()
data = self.decode(ret['data'])
# print(data)
url = data.get('url') or ''
if not url:
self.log(data)
headers = {
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1'
}
parse = 0
if 'm3u8' in url:
proxyUrl = self.getProxyUrl()
if proxyUrl:
url = proxyUrl + '&url=' + url + '&name=1.m3u8'
elif '/obj/' in url:
headers.update({
'Cookie': 'm=1',
'app': '1',
'Referer': 'https://doc.weixin.qq.com/',
})
result = {
'parse': parse, # 1=嗅探,0=播放
'playUrl': '', # 解析链接
'url': url, # 直链或待嗅探地址
'header': headers, # 播放UA
}
# print(result)
return result
config = {
"player": {},
"filter": {}
}
headers = {
"User-Agent": "Dalvik/2.1.0 (Linux; U; Android 7.0; HUAWEI MLA-AL10 Build/HUAWEIMLA-AL10)",
"token": ""
}
def localProxy(self, params):
# print(params)
url = params.get('url')
if not url:
# return [302, 'text/html', None, {'location': 'https://www.baidu.com'}]
# return [404, 'text/plain', 'Not Found']
return [403, 'text/plain', '403 forbidden. url is required']
name = params.get('name') or 'm3u8'
burl = 'https://www.bdys03.com'
new_url = url.replace("www.bde4.cc", "www.bdys03.com")
self.log(f'原始url:{url},替换域名后url:{new_url}')
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3947.100 Safari/537.36",
"Referer": burl,
"Origin": burl,
}
r = self.fetch(new_url, headers=headers)
pdata = self.process_data(r.content).decode('utf-8')
# pdata = re.sub(r'(.*?ts)', r'https://www.bdys03.com/\1', pdata)
pdata = self.replaceAll(pdata, r'(.*?ts)', r'https://www.bdys03.com/\1')
content = pdata.strip()
media_type = 'text/plain' if 'txt' in name else 'video/MP2T'
return [200, media_type, content]
# -----------------------------------------------自定义函数-----------------------------------------------
def decode(self, text):
bt = base64.b64decode(text)
# self.log(self.headers)
if self.ENV.lower() == 't3':
bt = self.javar.jarBytes(bt)
res = self.javar.call_java('com.EncryptionUtils', 'dec', bt)
# res = self.class1.dec(bt)
# print(str(res))
return self.str2json(str(res)) if res else None
def process_data(self, req_bytes):
"""
个性化方法:跳过req返回的content 3354之前的字节并进行gzip解压
@param req_bytes:
@return:
"""
stream = self.skip_bytes(req_bytes, 3354)
decrypted_data = self.gzipCompress(stream)
return decrypted_data
if __name__ == '__main__':
from t4.core.loader import t4_spider_init
spider = Spider()
t4_spider_init(spider)
print(spider.ENV)
# spider.init_api_ext_file() # 生成筛选对应的json文件
# spider.log({'key': 'value'})
# spider.log('====文本内容====')
# print(spider.homeContent(True))
# print(spider.homeVideoContent())
r = requests.head(
'http://192.168.31.49:5707/api/v1/vod/%E5%93%94%E6%BB%B4%E5%BD%B1%E8%A7%86?proxy=1&do=py&url=https://www.bde4.cc/10E79044B82A84F70BE1308FFA5232E4DC3D0CA9EC2BF6B1D4EF56B2CE5B67CF238965CCAE17F859665B7E166720986D.m3u8')
print(r.headers, r.content)
r = requests.get('https://www.bdys10.com/obj/63BEE3B148E464F16EE62435C53087B994902679D844EA9CC3615658CF55E01D',
headers={
'Cookie': 'm=1',
'app': '1',
'Referer': 'https://doc.weixin.qq.com/',
})
print(r.text)
# print(spider.categoryContent('0', 1, False, None))
# print(spider.detailContent([24420]))
# spider.searchContent('斗罗大陆')
# print(spider.playerContent('至尊线路', 'vip://https://www.bdys03.com/api/v1/playurl/174296?type=1', None))
# print(spider.playerContent('需要解析',
# 'https://www.bde4.cc/10E79044B82A84F70BE1308FFA5232E4DC3D0CA9EC2BF6B1D4EF56B2CE5B67CF238965CCAE17F859665B7E166720986D.m3u8',
# None))