闲来无事,写了个m3u8视频下载器,分享给各位(好处不多说!都懂!),如果有什么不对的地方,还请指正。另外还有m3u8视频解析器,通过视频播放链接(非商业性网站)解析出m3u8地址,然后再通过m3u8下载器进行下载,如果有需要的小伙伴请私信。

中间可能会有些看上去冗余的代码,主要是为了兼容各种稀奇古怪的m3u8内容。

脚本仅用于技术学习与研究,请勿用于任何非法用途,否则后果自负,本作者不承担任何法律责任。


原创文章,转载请注明出处,谢谢!https://blog.csdn.net/weixin_36381802/article/details/113694338


环境: pip install gevent requests loguru pycryptodome


# -*- coding:utf-8 -*-"""
NAME: m3u8视频下载器
VERSION: v1.0
DATE: 2021-02-05
TIPS:1.若部分视频无法播放,建议更改文件名后缀或切换其它播放器(QuickTime、WindowsMedia等)进行尝试;2.在MacOS或Linux系统上运行前请确认已安装合并视频片段所使用的工具ffmpeg(Windows无视);3.仅支持下载m3u8类型视频,mp4等链接暂不支持(普通下载器满大街都是);4.脚本仅用于技术学习与研究,请勿用于任何非法用途,否则后果自负,本作者不承担任何责任。
"""import argparse
import os
import platform
import re
import shutil
import time
from datetime import datetime
from urllib.parse import urljoinimport gevent
from gevent.pool import Pool
from gevent import monkey; monkey.patch_all()import requests
import urllib3
from Crypto.Cipher import AES# 自定义日志显示格式
from os import environ
environ['LOGURU_FORMAT'] = "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level:<5}</level> | <level>{message}</level>"
from loguru import logger# logger.add('m3u8.log', level='DEBUG', format='{time:YYYY-MM-DD HH:mm:ss} {level:<5} {line} {message}')
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)class M3u8VideoDownloader:headers = {'Accept': '*/*','Accept-Encoding': 'gzip, deflate, br','Accept-Language': 'zh-CN,zh;q=0.9','User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36',}def __init__(self, m3u8_url, download_path=None, video_name=None, is_del_clip=True, test_download_num=0,retry_count=10, thread_num=30, dec_func=None, m3u8_content_plaintext=None):""":param m3u8_url: m3u8链接:param download_path: 下载路径:param video_name: 视频名称(不能出现括号):param is_del_clip: 合并视频完成后是否删除原片段:param test_download_num: 测试下载视频数量:param retry_count: 单个视频片段下载失败重试次数:param thread_num: 下载线程数:param dec_func: m3u8内容解密函数(内容被加密时可传入解密函数,或直接将解密后的明文内容传递给参数m3u8_content_plaintext):param m3u8_content_plaintext: 已解密的m3u8明文内容"""self.m3u8_url = m3u8_urlself.download_path = download_pathself.cache_path = None         # 临时缓存路径self.video_name = video_name or str(int(time.time()))self.video_name_suffix = '.mp4'  # 文件类型后缀self.is_del_clip = is_del_clipself.test_download_num = test_download_numself.retry_count = retry_countself.thread_num = min(thread_num, 50)self.max_merge_num = 500       # 单次合并文件最大数量self.dec_func = dec_funcself.m3u8_content_plaintext = m3u8_content_plaintextself.key_url = Noneself.key = Noneself.iv = Noneself.decipher = Noneself.video_clip_list = []      # 视频片段名称列表self.total_duration = 0        # 视频总时间(分钟)self.total_video_clip_num = 0  # 视频片段数量self.download_num = 0          # 已下载数量self.total_download_size = 0   # 总下载大小self.is_special_link = False   # 视频片段链接未带后缀(例`.ts`)时为True,一般出现在m3u8内容被加密的视频网站def fetch(self, url, binary=False):resp = requests.get(url, headers=self.headers, timeout=30, verify=False)status_code = resp.status_codeif status_code != 200:raise Exception(f'请求失败({status_code}):{url}')if binary:return resp.contentreturn resp.content.decode()def get_m3u8_content(self):"""获取m3u8内容"""logger.info(f'M3U8链接:{self.m3u8_url}')try:m3u8_content = self.fetch(self.m3u8_url)except Exception as e:raise Exception(f'获取m3u8内容失败({self.m3u8_url}):{repr(e)}')# 如果内容被加密,需要通过传入的解密函数进行解密if self.dec_func:try:m3u8_content = self.dec_func(m3u8_content)except Exception as e:raise Exception(f'解密m3u8内容失败({self.m3u8_url}):{repr(e)}')if '#EXTM3U' not in m3u8_content:raise Exception(f'错误的M3U8信息,请确认链接是否正确:{self.m3u8_url}<{m3u8_content}>')if '#EXT-X-STREAM-INF' in m3u8_content:m3u8_url_list = [line for line in m3u8_content.split('\n') if line.find('.m3u8') != -1]if len(m3u8_url_list) > 1:logger.info(f'发现{len(m3u8_url_list)}个m3u8地址:{m3u8_url_list}')self.m3u8_url = urljoin(self.m3u8_url, m3u8_url_list[0])return self.get_m3u8_content()# logger.info(f'M3U8内容已获取完成:{self.m3u8_url}')return m3u8_contentdef parse_m3u8_info(self, m3u8_content):"""解析m3u8文件:获取解密key、iv、视频url列表"""all_lines = m3u8_content.strip('\n').split('\n')is_updated_base_url = Falseis_exist_clip = Falseoffset = 0for index, line in enumerate(all_lines):if '#EXT-X-KEY' in line:# 避免重复解析key与ivif not (self.key_url and self.iv):method, key_url_part, self.iv = self.parse_ext_x_key(line)self.key_url = urljoin(self.m3u8_url, key_url_part)logger.info(f'视频已加密:{method}  Key地址:{key_url_part}')elif '#EXTINF' in line:for i in range(5):_index = index + i + 1# 过滤标签if not all_lines[_index].startswith('#'):next_line = all_lines[_index].rstrip()breakelse:raise Exception('未发现有效的下载链接')if not is_updated_base_url:is_exist_clip = Trueis_updated_base_url = Trueif next_line.startswith('http') or next_line.startswith('/'):suffix = next_line.rsplit('/', 1)[-1]if '.ts' in suffix or '.' in suffix:# 将下载地址更新到m3u8_urlself.m3u8_url = urljoin(self.m3u8_url, next_line)else:if len(next_line.split('//')[-1].split('/')) <= 2:offset = 1self.m3u8_url = next_line[:next_line.rfind('/', 0, next_line.rfind('/') + offset) + 1]self.is_special_link = Truelogger.debug(f'视频下载主地址已更新:{self.m3u8_url.rsplit("/", 1)[0]}')# 计算视频总时长duration_str = line.split(':')[-1].rstrip()try:self.total_duration += float(duration_str[:-1])except ValueError:pass# 添加视频到视频片段名称列表if self.is_special_link:clip_name = next_line[next_line.rfind('/', 0, next_line.rfind('/') + offset) + 1:].replace('/', '@@')clip_name = '.ts?'.join(clip_name.split('?')) if '?' in clip_name else clip_name + '.ts'self.video_clip_list.append(clip_name)else:clip_name = next_line.rsplit('/', 1)[-1]self.video_clip_list.append(clip_name)if not is_exist_clip:raise Exception('未发现视频下载链接')self.total_duration = int(self.total_duration) // 60 + 1self.total_video_clip_num = len(self.video_clip_list)logger.info(f'M3U8内容解析已完成,视频片段数量:{self.total_video_clip_num},视频时长:{self.total_duration}分钟,下载主地址:{self.m3u8_url.rsplit("/", 1)[0]}')@staticmethoddef parse_ext_x_key(ext_x_key: str) -> (str, str, bytes):"""解析#EXT-X-KEY中的key链接与iv"""ret = re.search(r'METHOD=(.*?),URI="(.*?)"(?:,IV=(\w+))?', ext_x_key)method, key_url, iv = ret.groups()iv = iv.replace('0x', '')[:16].encode() if iv else b''return method, key_url, ivdef get_key(self):try:self.key = self.fetch(self.key_url, binary=True)except Exception as e:raise Exception(f'获取key失败({self.key_url}):{repr(e)}')logger.info(f'key解析已完成:{self.key}  iv:{self.iv or "无"}')def init_decipher(self):self.decipher = AES.new(self.key, AES.MODE_CBC, self.iv or self.key[:16])def download_all_videos(self):# 重试时重新初始化已下载数量if self.cache_path:self.download_num = 0else:# 默认保存在用户目录下的Downloads/videos文件夹内if self.download_path is None:self.download_path = os.path.join(os.path.expanduser('~'), 'Downloads')self.download_path = os.path.join(self.download_path, 'Videos')if not os.path.exists(self.download_path):os.makedirs(self.download_path)file_list = os.listdir(self.download_path)if f'{self.video_name}{self.video_name_suffix}' in file_list or f'{self.video_name}.ts' in file_list:logger.info(f'视频已经存在:{self.video_name}')returnlogger.info(f'视频保存目录:{self.download_path}')# 临时缓存目录if not self.cache_path:self.cache_path = os.path.join(self.download_path, datetime.now().strftime('%Y%m%d'))if not os.path.exists(self.cache_path):os.makedirs(self.cache_path)# 测试下载部分视频if self.test_download_num > 0:self.video_clip_list = self.video_clip_list[:self.test_download_num]logger.info(f'当前为测试模式,设置下载视频片段数量:{self.test_download_num}')logger.info(f'即将开始下载视频:{self.video_name}{self.video_name_suffix}')start_time = int(time.time())# 协程池pool = Pool(self.thread_num)for clip in self.video_clip_list:pool.add(gevent.spawn(self.download_decode_save_video, clip))pool.join()# 线程池# from concurrent.futures.thread import ThreadPoolExecutor# with ThreadPoolExecutor(max_workers=self.thread_num) as pool:#     pool.map(self.download_decode_save_video, self.video_clip_list)spend_time = int(time.time()) - start_timelogger.info(f'下载视频耗时:{spend_time}秒')def download_decode_save_video(self, clip):"""下载、解码、保存视频"""url = urljoin(self.m3u8_url, clip)# 删除文件名中的参数部分,但url中的参数不能少clip = clip.split('?')[0]full_path_filename = os.path.join(self.cache_path, clip)if os.path.exists(full_path_filename):self.download_num += 1logger.debug(f'视频片段已存在({self.download_num}):{clip}')returnif self.is_special_link:url = url.replace('@@', '/').replace('.ts', '')# 下载单个视频raw_data = self.download_single_video(url)# 解码视频data = self.decode_video_clip(clip, raw_data)# 保存视频self.save_video_clip(clip, full_path_filename, data)def download_single_video(self, url):status_code = 0for i in range(self.retry_count):try:response = requests.get(url, headers=self.headers, timeout=30, verify=False)except Exception as e:if i == self.retry_count - 1:raise Exception(f'下载失败({url}):{repr(e)}')else:status_code = response.status_codeif status_code == 200:data = response.contentbreaktime.sleep(0.3)else:raise Exception(f'多次尝试下载失败({url}):{status_code}')return datadef decode_video_clip(self, clip, data):if self.decipher is not None:try:data = self.decipher.decrypt(data)except Exception as e:raise Exception(f'数据解密失败({clip}):{repr(e)}<{len(data)}>')return datadef save_video_clip(self, filename, full_path_filename, data):with open(full_path_filename, 'wb') as f:f.write(data)file_size = len(data)self.total_download_size += file_sizeself.download_num += 1file_size_m = round(file_size / float(1024*1024), 2)total_download_size_m = round(self.total_download_size/float(1024*1024), 2)total_num = self.test_download_num if 0 < self.test_download_num < self.total_video_clip_num else self.total_video_clip_numremainder = total_num - self.download_numlogger.debug(f'已完成({self.download_num})-剩余({remainder}):{filename} <{file_size_m:0<4}M - {total_download_size_m}M>')def win_merge(self):"""Windows平台合并视频"""cur_path = os.getcwd()os.chdir(self.cache_path)merge_num = 1merge_video_list = []start_index, end_index = 0, self.max_merge_numwhile 1:cur_merge_list = [clip.split('?')[0] for clip in self.video_clip_list[start_index:end_index]]if not cur_merge_list:video_filename = f'{self.video_name}{self.video_name_suffix}'if not merge_video_list:logger.error('视频合并失败')os.chdir(cur_path)return Falseelif len(merge_video_list) == 1:os.rename(merge_video_list[0], video_filename)if self.is_del_clip:os.system('del /Q *.ts*')if self.is_special_link:os.rename(video_filename, video_filename.replace(self.video_name_suffix, '.ts'))os.chdir(cur_path)video_filename = self.move_del_file(video_filename)logger.info(f'视频合并已全部完成:{video_filename}')else:status = os.system(f"copy /b {'+'.join(merge_video_list)} {video_filename} >> merge.log")if status == 0:if self.is_del_clip:os.system('del /Q *.ts*')if self.is_special_link:os.rename(video_filename, video_filename.replace(self.video_name_suffix, '.ts'))os.chdir(cur_path)video_filename = self.move_del_file(video_filename)logger.info(f'视频合并已全部完成:{video_filename}')else:os.chdir(cur_path)logger.error(f'最后一次合并失败:{merge_video_list}')return Truecur_video_name = f'{self.video_name}_temp{merge_num}.ts'cmd_name = '+'.join(cur_merge_list)status = os.system(f"copy /b {cmd_name} {cur_video_name} >> merge.log")if status == 0:merge_num += 1start_index, end_index = end_index, end_index + self.max_merge_nummerge_video_list.append(cur_video_name)logger.info(f'本次合并{len(cur_merge_list)}个视频完成:{cur_video_name}')else:logger.error('视频合并失败')os.chdir(cur_path)return Falsedef linux_merge(self):"""Linux或MacOS平台合并视频(需要使用ffmpeg)"""video_file_list = [os.path.join(self.cache_path, filename.split('?')[0]) for filename in self.video_clip_list]# 将video路径并合成一个字符参数file_argv = '|'.join(video_file_list)# 指定输出文件名称mp4_filename = os.path.join(self.cache_path, f'{self.video_name}{self.video_name_suffix}')# 调取系统命令使用ffmpeg将ts合成mp4文件cmd = f'ffmpeg -i "concat:{file_argv}" -c copy {mp4_filename}'status = os.system(cmd)if status == 0:# 删除原ts文件if self.is_del_clip:os.system(f'rm {os.path.join(self.cache_path, "*.ts*")}')if self.is_special_link:os.rename(mp4_filename, mp4_filename.replace(self.video_name_suffix, '.ts'))mp4_filename = self.move_del_file(mp4_filename)logger.info(f'视频已全部合并完成:{mp4_filename}')return Trueelse:logger.error('视频合并失败')return Falsedef move_del_file(self, video_filename):"""移动文件并删除临时文件夹"""if self.is_special_link:video_filename = video_filename.replace(self.video_name_suffix, '.ts')shutil.move(os.path.join(self.cache_path, video_filename), self.cache_path.rsplit(os.sep, 1)[0])shutil.rmtree(self.cache_path)return video_filenamedef merge_video_file(self):"""合并视频片段"""if self.test_download_num == 0:total_video_clip_num = self.total_video_clip_numelse:if self.test_download_num < self.total_video_clip_num:total_video_clip_num = self.test_download_numelse:total_video_clip_num = self.total_video_clip_numif self.download_num != total_video_clip_num:logger.error(f'视频信息不完整,取消合并:{self.download_num}-{total_video_clip_num}')return Falselogger.info(f'视频已全部下载完成,即将合并{self.download_num}个视频...')# 根据系统选择相应的合并方式sys_info = platform.system()if 'Windows' in sys_info:   # Windowsstatus = self.win_merge()elif 'Linux' in sys_info:   # Linuxstatus = self.linux_merge()elif 'Darwin' in sys_info:  # MacOSstatus = self.linux_merge()else:raise Exception(f'其它系统信息:{sys_info}')return statusdef start(self):# 1.获取m3u8内容m3u8_content = self.m3u8_content_plaintext or self.get_m3u8_content()# 2.解析m3u8内容self.parse_m3u8_info(m3u8_content)if not self.video_clip_list:logger.error('解析未发现有效的视频片段')return# 3.如果存在加密,获取解密key,并初始化解密器if self.key_url:self.get_key()self.init_decipher()# 下载/合并失败或视频片段不完整时重试3次for _ in range(3):# 4.下载视频self.download_all_videos()if self.download_num == 0:return# 5.合并视频if self.merge_video_file():breakdef parse_args():"""获取命令行参数信息"""arg_parser = argparse.ArgumentParser(description='========== M3U8下载器 ==========')arg_parser.add_argument('url', help='m3u8地址')arg_parser.add_argument('-p', '--path', help='下载路径')arg_parser.add_argument('-n', '--name', help='视频名称')arg_parser.add_argument('-c', '--count', type=int, help='测试下载视频片段数量', default=0)args = arg_parser.parse_args()return args.url, args.path, args.name, args.countdef download(m3u8_url, download_path=None, custom_video_name=None, test_download_num=0, m3u8_content=None):""":param m3u8_url: m3u8链接:param download_path: 下载路径:param custom_video_name: 自定义视频名称:param test_download_num: 测试下载数量(为0时下载全部):param m3u8_content: m3u8明文内容:return:"""if not (m3u8_url and m3u8_url.startswith('http')):logger.error(f'url不正确:{m3u8_url}')returnif '.mp4' in m3u8_url:logger.error(f'当前为mp4链接(暂不支持下载):{m3u8_url}')return# 下载视频downloader = M3u8VideoDownloader(m3u8_url=m3u8_url,download_path=download_path,video_name=custom_video_name,test_download_num=test_download_num,m3u8_content_plaintext=m3u8_content)try:downloader.start()except Exception as e:logger.exception(f'视频下载失败({repr(e)}):{m3u8_url}')if __name__ == '__main__':# 重点提醒:此链接必须是m3u8地址,而非视频播放地址!!!m3u8_url = 'https://baikevideo.cdn.bcebos.com/media/mda-Ogtg6GwqTr85eadR/ebf6891cf5cccea366d510589ff04edc.m3u8'# 以下3项为可选参数download_path = Nonevideo_title = Nonetest_num = 0  # 测试下载数量(为0时下载全部)# 如何要通过命令行执行,请将m3u8_url设置为Noneif not m3u8_url:m3u8_url, download_path, video_title, test_num = parse_args()download(m3u8_url, download_path, video_title, test_num)

脚本仅用于技术学习与研究,请勿用于任何非法用途,否则后果自负,本作者不承担任何法律责任。


原创文章,转载请注明出处,谢谢!https://blog.csdn.net/weixin_36381802/article/details/113694338

m3u8视频通用下载器相关推荐

  1. 哔哩哔哩视频免费下载器

    BilibiliVideoDownload功能介绍哔哩哔哩视频免费下载器-BilibiliVideoDownload for mac(哔哩哔哩B站视频下载) v3.1.6中文版 - 未来Mac下载 功 ...

  2. python去除抖音水印_Python爬虫:多平台短视频去水印下载器

    Python爬虫:多平台短视频去水印下载器 本教程描述的爬取方案定档与2020年10月26日 郑重申明:该文章介绍的技术仅供用于学习,不可恶意攻击各大短视频平台.对各大短视频平台服务器造成的任何损失, ...

  3. Python爬虫:多平台短视频去水印下载器

    Python爬虫:多平台短视频去水印下载器 功能介绍 各平台分解 抖音 快手 微视 皮皮搞笑 总结 源码仓库 本教程描述的爬取方案定档与2020年10月26日 郑重申明:该文章介绍的技术仅供用于学习, ...

  4. Hls.js播放m3u8视频 DPlayer视频播放器(easypan) MSE简介

    文章目录 学习链接 hls.js播放m3u8视频 效果 代码 前端代码 安装hls.js App.vue 后台代码 准备文件 mp4文件切片java实现 TsController TsService ...

  5. m3u8视频爬虫下载及合并(二)

    前言 爬虫获取m3u8视频资源的步骤 目前所要作的流程处理先把m3u8里下载链接批量提取.png把这几百个切片链接先批量下载.png再批量改文件后缀为.ts 再按照m3u8文件提取所有不规则链接文件的 ...

  6. python合并ts视频_python爬取视频网站m3u8视频,下载.ts后缀文件,合并成整视频

    最近发现一些网站,可以解析各大视频网站的vip.仔细想了想,这也算是爬虫呀,爬的是视频数据. 首先选取一个视频网站,我选的是 影视大全 ,然后选择上映不久的电影 "一出好戏" . ...

  7. Android,播放m3u8视频和下载m3u8的视频

    因最近项目需要,研究了一个礼拜的m3u8.格式为m3u8的视频,其实是由多个.ts文件组成在一起播放的.下面有些资料是参考了网上的,总体概括下实现思路: 1.根据后台给出的m3u8的地址,实现播放,核 ...

  8. m3u8视频格式下载并转换为mp4(ffmpeg)

    通过在网络上查询资料以及博客后发现了下载m3u8转换为mp4得方法,但是在使用后发现转换后的mp4文件信息不完善.如果您使用过网上流传最广的转换方法会发现当你查看mp4文件详细信息时是没有播放时长等数 ...

  9. Python 实现 m3u8 视频下载

    Python 实现 m3u8 视频下载 m3u8 是一种基于文本的媒体播放列表文件格式,通常用于指定流媒体播放器播放在线媒体流.它是一个简单的文本文件,其中包含多个由 URI 引用的媒体资源文件的 U ...

  10. 犀牛视频下载器丨钜惠惊喜不断

    亲爱的朋友们: 犀牛视频超级下载器来发福利啦!双十一马上就要来了,各位做好"购物清单"了吗?活动太丰富了,小编贴心为你做了一个大合集,快点来抄作业吧! 犀牛视频下载器-提供无水印. ...

最新文章

  1. mongodb在mysql中怎么用,mongoDB数据库基本操作
  2. 初识Tcl(二):Tcl 数据类型
  3. java 待在原页面 代码_现在java后台,只要修改一点点代码,前段页面就报500,必须重新登录才行?...
  4. angularJS--多个控制器之间的数据共享
  5. Mysql(8)——as和distinct和where的用法
  6. 壁式框架内力计算_剪力墙结构设计计算要点和实例
  7. win2008r2 mysql 远程_SQL SERVER 2008 R2如何开启数据库的远程连接(转)
  8. python语言适合哪些领域的计算问题数据处理和文本挖掘_R和Python中文本挖掘8大入门指南...
  9. 何以笙箫默,一部有剧情的创意广告集?
  10. Mybatis原理分析之二:框架整体设计
  11. 隐马尔可夫模型HMM[转载牛人,看了半天没看懂]
  12. 树链剖分+线段树 CF 593D Happy Tree Party(快乐树聚会)
  13. 入门排序(冒泡、选择、直接)
  14. 万年历单片机课程设计百度文库_单片机课程设计电子万年历设计
  15. Java设计文本编辑器
  16. Visual Studio2010安装步骤
  17. win7由于无法确定计算机,nvme固态硬盘安装win7教程((解决硬盘无法识别)
  18. 图像工作回顾之三:极线匹配
  19. 对d3d9里面的函数挂钩实现透视
  20. Causality Inspired Representation Learning for Domain Generalization 阅读笔记

热门文章

  1. 毕业设计html5作品,基于HTML5的年货购物网站的设计与实现毕业论文+任务书+开题报告+设计源码...
  2. 极光笔记|极光推送在APICloud平台的使用教程
  3. 计算机学stata,stata(统计学软件)
  4. OSx86的来龙去脉
  5. CentOS7恢复rm -rf 误删的xfs系统
  6. 客户关系管理软件的作用是什么?
  7. python批量转换图片格式_利用Python批量把PDF文件文件转换成图片格式
  8. tenda无线网卡Linux驱动,腾达Tenda W311MA无线网卡Linux下驱动安装
  9. 2020 智慧旅游系统总体设计方案
  10. 如何将pdf中的矢量图另存为图片