昨天想下载一个视频素材,但找到的资源要么无法下载,要么下载后的格式只能用指定的播放器播放,于是自己写了一个脚本来下载流媒体:视频、电影、电视剧,包括付费的和 VIP 的内容都能下载。
运行环境:Python 3.7,需要安装第三方库 requests。
直接上源码:
from multiprocessing.dummy import Pool
from urllib import parse
import re, requests, os, datetime, hashlib, json, shutil, glob, uuid
# Request headers sent with every HTTP call made through ``session``.
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36',
}

# One shared session for all requests in this script.
session = requests.Session()

# URLs of the segments that download_videos() fetched successfully.
d_su = []

# Number of download_videos() calls that have finished (success or failure).
use_date_num = 0
def api_key():
    """Return the token that get_m3u8_urls() sends as the ``z`` query parameter.

    The token is derived from today's day of the month: the seed
    ``(day + 9 + 9) ^ 10`` is rendered as a decimal string and MD5-hashed,
    then the first 10 hex characters of that digest are MD5-hashed again.

    Returns:
        The 32-character hexadecimal digest of the second MD5 round.
    """
    def _md5_hex(text):
        return hashlib.md5(text.encode(encoding='UTF-8')).hexdigest()

    day_of_month = datetime.date.today().day
    # ``+`` binds tighter than ``^`` (bitwise XOR); the parentheses only make
    # the evaluation order of the original expression explicit.
    seed = str((day_of_month + 9 + 9) ^ 10)
    return _md5_hex(_md5_hex(seed)[:10])
def get_m3u8_urls(url, v_name):
    """Query the search API for *v_name* and return its decoded ``data`` field.

    Args:
        url: Site URL requested once before the API call; its response is
            discarded (presumably this primes the session — TODO confirm).
        v_name: Title to search for; it is percent-encoded into the query.

    Returns:
        The value stored under ``'data'`` in the API's JSON response.
    """
    session.get(url, headers=headers, timeout=5)
    api_url = ''.join([
        'https://z1.m1907.cn/api/v/?z=',
        api_key(),
        '&jx=',
        parse.quote(v_name),
    ])
    reply = session.get(api_url, headers=headers, timeout=5)
    payload = json.loads(reply.text)
    return payload['data']
def get_m3u8(m3u8_url):
    """Download an m3u8 playlist into the ``bak`` folder.

    Args:
        m3u8_url: URL of the playlist to fetch.

    Returns:
        A two-item list ``[saved_file_path, base_url]`` where ``base_url`` is
        *m3u8_url* cut off after its last ``/``.
    """
    reply = session.get(m3u8_url, headers=headers, timeout=5)
    # Greedy match: everything from "http" through the final slash.
    url_prefix = re.search(r'http.+/', m3u8_url).group()
    # A random hex name keeps successive playlists from overwriting each other.
    target_path = '.\\bak\\' + uuid.uuid4().hex + '.m3u8'
    with open(target_path, 'wb') as out:
        out.write(reply.content)
    return [target_path, url_prefix]
def _read_playlist_uris(path):
    """Return the non-empty, non-tag lines (the URIs) of the playlist at *path*."""
    with open(path, 'r', encoding='utf-8', errors='replace') as fp:
        stripped = (line.strip() for line in fp)
        # Lines starting with '#' are m3u8 tags or comments, never URIs.
        return [line for line in stripped if line and not line.startswith('#')]


def parse_m3u8(m3u8_data):
    """Extract the download URLs of the video segments listed in a playlist.

    Args:
        m3u8_data: ``[playlist_file_path, base_url]`` as returned by
            get_m3u8().

    Returns:
        A list of segment URLs, in playlist order. Only URI lines ending in
        ``.ts`` count as segments. The list is empty when neither the
        playlist nor the playlist it points to contains any segment.

    Some playlists hold no segments and instead point at another playlist.
    In that case the last URI line is fetched with get_m3u8() and that second
    playlist is parsed instead (one level of indirection only).
    """
    playlist_path, base_url = m3u8_data[0], m3u8_data[1]
    uris = _read_playlist_uris(playlist_path)
    segments = [uri for uri in uris if uri.endswith('.ts')]

    if not segments and uris:
        nested = get_m3u8(parse.urljoin(base_url, uris[-1]))
        base_url = nested[1]
        segments = [
            uri for uri in _read_playlist_uris(nested[0])
            if uri.endswith('.ts')
        ]

    # urljoin leaves absolute URIs untouched and resolves root-relative ones
    # against the host; plain concatenation got both of those cases wrong.
    return [parse.urljoin(base_url, uri) for uri in segments]
def download_videos(data_url):
    """Download one video segment into the ``bak`` folder, trying up to 3 times.

    Args:
        data_url: URL of the segment. Its last 9 characters become the local
            file name.

    On success the segment is written to disk and *data_url* is appended to
    the module-level ``d_su`` list. If every attempt fails, nothing is written
    and the URL is left out of ``d_su`` so the caller can retry it. Either
    way the module-level ``use_date_num`` counter is incremented once.
    """
    global use_date_num
    try:
        reply = None
        for _ in range(3):
            try:
                reply = session.get(data_url, headers=headers, timeout=3)
                # Treat HTTP error statuses as failures so an error page is
                # never stored as segment data.
                reply.raise_for_status()
            except requests.RequestException:
                reply = None
                continue
            break
        if reply is None:
            return
        name = data_url[-9:]
        with open('.\\bak\\' + name, 'wb') as code:
            code.write(reply.content)
        # Record success only once the bytes are actually on disk.
        d_su.append(data_url)
    finally:
        # NOTE(review): this increment is not synchronised; main() runs this
        # function on pool threads, so updates could in principle be lost.
        use_date_num += 1
def merge_movie(name, movie_name):
    """Append one downloaded segment to the output file ``<movie_name>.ts``.

    Args:
        name: Segment URL or file name; only its last 9 characters are used
            to locate the segment inside the ``bak`` folder.
        movie_name: Output file name without the ``.ts`` extension.
    """
    segment_path = '.\\bak\\' + name[-9:]
    # The segment is opened first, so a missing segment raises before the
    # output file is created or touched.
    with open(segment_path, 'rb') as src, open(movie_name + '.ts', 'ab') as dst:
        shutil.copyfileobj(src, dst)
def _download_batch(urls, total):
    """Download *urls* on a 10-thread pool, printing progress as tasks finish.

    Progress is the number of ``.ts`` files in ``bak`` relative to *total*.
    """
    pool = Pool(10)
    try:
        # Consuming the iterator blocks until each task completes, so no
        # busy-wait on a shared counter is needed.
        for _ in pool.imap_unordered(download_videos, urls):
            d_num = len(glob.glob(pathname='.\\bak\\*.ts'))
            per = int(d_num * 100 / total)
            print('已经下载: {}%'.format(per), end='\r')
    finally:
        pool.close()
        pool.join()


def main():
    """Interactively search for a title, download its segments and merge them.

    Steps: prompt for a title, list the matching episodes with an ``x.y``
    index, prompt for an index, download the playlist and all of its segments
    into the temporary ``bak`` folder (retrying failed segments up to 5 more
    rounds), then append the segments in playlist order to ``<title>.ts``.
    The ``bak`` folder is removed afterwards, on failure as well.

    Raises:
        RuntimeError: If the playlist has no segments or some segments are
            still missing after all retry rounds.
    """
    url = 'https://z1.m1907.cn'
    movie_list = {}
    movie_name = input('输入电影名称: ')
    datas = get_m3u8_urls(url, movie_name)

    # Show the numbered episode list and remember each entry's playlist URL.
    for x, v_data in enumerate(datas, 1):
        name = v_data['name']
        for y, eps in enumerate(v_data['source']['eps'], 1):
            xh = str(x) + '.' + str(y)
            print(xh + name + eps['name'])
            movie_list[xh] = eps['url']
        print('*' * 50)

    in_xh = input('电影名称前的序号进行下载: ')
    m3u8_url = movie_list.get(in_xh)
    if not m3u8_url:
        print('序号输入错误')
        return

    print('正在加载中。。。')
    os.makedirs('bak', exist_ok=True)  # temporary working folder
    try:
        m3u8_data = get_m3u8(m3u8_url)
        data_list = parse_m3u8(m3u8_data)

        if data_list:
            _download_batch(data_list, len(data_list))
            # Retry whatever is still missing, at most 5 more rounds.
            for _ in range(5):
                finished = set(d_su)
                pending = [u for u in data_list if u not in finished]
                if not pending:
                    break
                _download_batch(pending, len(data_list))

        # Every segment must be on disk before merging.
        downloaded = len(glob.glob(pathname='.\\bak\\*.ts'))
        if not data_list or len(data_list) != downloaded:
            raise RuntimeError('下载失败')

        print('电影合并中...请等待...')
        for segment_url in data_list:
            merge_movie(segment_url, movie_name)
    finally:
        shutil.rmtree('./bak', ignore_errors=True)
    print('电影下载完成...')
# Start the interactive downloader only when this file is run as a script.
if __name__ == '__main__':
    main()
最后
这只是自用版本,下载的视频是 .ts 格式,需要其他格式的可以自行修改。代码仅供学习,请勿商用或用于不法活动。











网友评论