dr_py/utils/download_progress.py
2023-11-22 23:44:12 +08:00

192 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File : download_progress.py
# Author: DaShenHan&道长-----先苦后甜,任凭晚风拂柳颜------
# Date : 2023/10/30
# Download utility with progress-bar logging
import os
import time
# import logging
import requests
from urllib.parse import unquote
from contextlib import closing
from utils.log import logger
# Chunk size in bytes (1 MiB) passed to iter_content; sizes and speeds are
# also divided by this value so that they are reported in MiB.
chunkSize = 1024 * 1024
# Upper bound on the number of passes of the download loop in file_download.
loop = 5
# Request headers (a browser-like User-Agent) passed to requests.get.
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36"
}
def speed_handle(process, file_length):
    """Log a one-line textual progress bar.

    :param process: amount transferred so far
    :param file_length: total amount expected, in the same unit as ``process``
    :return: None; the rendered bar is written with ``logger.info``
    """
    bar_width = 20  # one width for both the partial and the complete bar
    if file_length > 0 and process != file_length:
        # Clamp the ratio so an overshoot or a negative value cannot yield a
        # bar that is longer or shorter than bar_width.
        num = min(max(process / file_length, 0), 1)
        filled = round(num * bar_width)
        progress = ': \033[1;33m{:.2f}\033[0m%|{}{}| '.format(
            float(num * 100), '#' * filled, '-' * (bar_width - filled))
    else:
        # Also reached when file_length is not positive, which avoids a
        # division by zero.
        progress = ' \033[1;33m{}\033[0m% |{}|'.format(100, '#' * bar_width)
    logger.info(progress)
def get_file_name(url, headers):
    """Work out a file name for a download.

    The ``filename`` parameter of a ``Content-Disposition`` header is preferred;
    otherwise the last path segment of ``url`` is used; if neither yields a
    name, the current timestamp is returned.

    :param url: URL the file is fetched from
    :param headers: mapping of header names to values (may be empty or None)
    :return: a non-empty file name without directory components, always a str
    """
    # Plain dicts are case-sensitive, HTTP header names are not.
    disposition = ''
    for key, value in (headers or {}).items():
        if str(key).lower() == 'content-disposition' and value:
            disposition = value
            break

    # The first segment is the disposition type (e.g. "attachment"); the
    # filename may be any of the parameters that follow it.
    for param in disposition.split(';')[1:]:
        name, sep, value = param.strip().partition('=')
        if not sep or name.strip().lower() != 'filename':
            continue
        # partition keeps '=' characters that belong to the name itself;
        # surrounding quotes are part of the header syntax, not of the name.
        candidate = unquote(value.strip().strip('"\''))
        # The header is server-controlled: drop any directory part so the name
        # cannot escape the directory it is later joined to.
        candidate = os.path.basename(candidate.replace('\\', '/'))
        if candidate:
            return candidate

    # Strip fragment and query before taking the basename, otherwise a '/'
    # inside the query string would be mistaken for a path separator.
    path = url.split('#', 1)[0].split('?', 1)[0]
    filename = os.path.basename(path)
    if filename:
        return filename

    # Return a string so callers can pass the result straight to os.path.join.
    return str(time.time())
def file_download(fileUrl, filePath):
    """Download ``fileUrl`` to ``filePath``, resuming across up to ``loop`` attempts.

    Any existing file at ``filePath`` is removed first. A probe request reads
    the response headers to learn the total size; the body is then fetched
    with ``Range`` requests so that an interrupted attempt continues from the
    bytes already on disk. Progress, average speed and estimated remaining
    time are logged after every chunk.

    :param fileUrl: URL to download
    :param filePath: destination path on disk
    :return: True when the file on disk has reached the advertised size;
             False when the probe returns an error status, the response uses
             chunked transfer encoding, no positive Content-Length is
             advertised, or the file is still incomplete after ``loop``
             attempts
    :raises requests.exceptions.RequestException: if the probe request fails
    """
    if os.path.exists(filePath):
        os.remove(filePath)

    # Probe request: only the headers are read, so close the streamed
    # connection right away instead of leaving it open.
    with closing(requests.get(fileUrl, headers=headers, stream=True)) as response:
        probe_ok = response.ok
        probe_status = response.status_code
        is_chunked = (response.headers.get('Transfer-Encoding') or '').strip().lower()
        content_length = response.headers.get('content-length') or 0

    if not probe_ok:
        # Otherwise the body of an error page would be saved as the file.
        logger.info(f'{fileUrl} returned HTTP status {probe_status}')
        return False
    if is_chunked == 'chunked':
        logger.info('chunked文件不支持获取总文件大小tqdm模块才能分段下载')
        return False
    fileSize = int(content_length)  # total file size in bytes
    logger.info(f'fileSize:{fileSize}')
    if fileSize < 1:
        return False

    filename = os.path.basename(filePath)
    total_mib = fileSize / chunkSize
    for _ in range(loop):
        tmpSize = os.path.getsize(filePath) if os.path.exists(filePath) else 0
        if tmpSize >= fileSize:
            return True

        # HTTP byte ranges are inclusive, so the last byte is fileSize - 1.
        _headers = dict(headers, Range="bytes={}-{}".format(tmpSize, fileSize - 1))
        st = time.perf_counter()
        try:
            with closing(requests.get(fileUrl, headers=_headers, stream=True)) as _response:
                if not _response.ok:
                    logger.info(f'{fileUrl} returned HTTP status {_response.status_code}')
                    continue
                if tmpSize and _response.status_code != 206:
                    # The server ignored the Range header and is sending the
                    # whole file again; appending it to the partial file
                    # would corrupt it, so start over from byte 0.
                    tmpSize = 0
                    mode = "wb"
                else:
                    mode = "ab"
                remainSize = (fileSize - tmpSize) / chunkSize  # MiB left to fetch
                contentSize = 0  # MiB received during this attempt
                with open(filePath, mode) as file:
                    for content in _response.iter_content(chunk_size=chunkSize):
                        if not content:
                            # An empty chunk would make the ETA divide by zero.
                            continue
                        file.write(content)
                        timeTook = time.perf_counter() - st
                        contentSize += len(content) / chunkSize
                        logger.info(f'文件{filename}下载中...')
                        speed_handle(contentSize + tmpSize / chunkSize, total_mib)
                        # Average download speed of this attempt.
                        downloadSpeed = contentSize / timeTook if timeTook > 0 else 0.0
                        # Estimated remaining time, extrapolated from the
                        # fraction of this attempt that is already done.
                        remainingTime = int(timeTook / (contentSize / remainSize) - timeTook)
                        logger.info(
                            '[' + 'average speed: \033[1;31m{:.2f}MiB/s\033[0m, remaining time: \033[1;32m{}s\033[0m, file size: \033[1;34m{:.2f}MiB\033[0m'.format(
                                downloadSpeed,
                                remainingTime,
                                fileSize / chunkSize) + ']'
                        )
        except requests.exceptions.RequestException as e:
            # Keep the partial file; the next attempt resumes from its size.
            logger.info(f'{filename}: download attempt interrupted: {e}')

    # The last allowed attempt may have completed the file.
    return os.path.exists(filePath) and os.path.getsize(filePath) >= fileSize
def file_downloads(files, save_path='download'):
    """
    Download every entry of ``files`` into ``save_path`` and log a summary.

    Example::

        files = [{'url':'https://ghproxy.liuzhicong.com/https://github.com/hjdhnx/dr_py/archive/refs/heads/main.zip','name':'dr_py.zip'}]

    URLs whose download failed or raised are written, one per line, to
    ``failedUrl.txt`` inside ``save_path``.

    :param files: list of dicts; ``'url'`` is required, ``'name'`` is an
                  optional file name (derived from the URL when absent).
                  Entries without a URL are logged and skipped.
    :param save_path: target directory, created if it does not exist
    :return: True when no attempted download failed, otherwise False
    """
    os.makedirs(save_path, exist_ok=True)
    localtime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    logger.info(localtime + ': Start downloading task: {}'.format(files))

    failedUrl = []
    succeeded = 0
    for file in files:
        fileUrl = file.get('url')
        if not fileUrl:
            logger.info('file error:no url')
            continue
        # Coerce to str so os.path.join always receives a path string.
        filename = str(file.get('name') or get_file_name(fileUrl, headers))
        logger.info(f'开始下载{filename}: {fileUrl}')
        try:
            t0 = time.perf_counter()
            isDload = file_download(fileUrl, os.path.join(save_path, filename))
            t1 = time.perf_counter()
            localtime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
            if isDload:
                succeeded += 1
                logger.info(
                    localtime + ': {} download successfully! Time consuming: {:.3f}s'.format(filename, t1 - t0))
            else:
                logger.info(localtime + ': {} download failed! Url: {}'.format(filename, fileUrl))
                failedUrl.append(fileUrl)
        except Exception as e:
            # Boundary handler: one broken download must not abort the batch.
            logger.info(f'链接{fileUrl}下载失败,由于出现了错误:{e}')
            failedUrl.append(fileUrl)

    if failedUrl:
        # Explicit encoding: URLs may contain non-ASCII characters.
        with open(os.path.join(save_path, 'failedUrl.txt'), 'w', encoding='utf-8') as p:
            for url in failedUrl:
                p.write(url + '\n')

    fn = len(failedUrl)
    # Count real successes; skipped entries (no URL) are neither success nor
    # failure, so len(files) - fn would over-report.
    sn = succeeded
    logger.info(
        "\n{} file{} download successfully, {} file{} download failed!".format(sn, 's' * (sn > 1), fn, 's' * (fn > 1)))
    return fn == 0
if __name__ == '__main__':
    # Manual smoke test: fetch only the response headers of a sample archive
    # and report the advertised size.
    # urlTxt = 'download/urls.txt'
    # with open(urlTxt, "r") as f:
    # fileUrls = [line.strip() for line in f.readlines()]
    with closing(requests.get('https://ghproxy.liuzhicong.com/https://github.com/hjdhnx/dr_py/archive/refs/heads/main.zip', headers=headers, stream=True)) as response:
        print(response.headers)
        # A missing Content-Length is reported as 0 instead of raising
        # TypeError from int(None).
        fileSize = int(response.headers.get('content-length') or 0)  # file size in bytes
    logger.info(f'fileSize:{fileSize}')
    # files = [{'url': 'https://ghproxy.liuzhicong.com/https://github.com/hjdhnx/dr_py/archive/refs/heads/main.zip',
    # 'name': 'dr_py.zip'}]
    # file_downloads(files, 'tmp')