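1. Overview

m3u8 is the playlist format used to deliver video in small pieces. A complete 20-minute episode, for example, is split into more than a thousand clips of one or two seconds each. Playback on the client feels continuous, but to download the episode you have to fetch every one of those clips and stitch them into a single video yourself. Stitching is easy, and many tools such as Format Factory handle it without trouble. Downloading the segments one by one is the tedious part, so I use Python with multiple threads, one thread per segment. With a fast enough connection, downloading an episode in a few seconds is no problem.

2. Approach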

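2.1. The m3u8 file. The playlist is usually a file whose name ends in .m3u8. In a browser, press F12 to open DevTools and watch the network traffic to find the full m3u8 URL. After downloading the playlist, extract the URIs of all the video segments. The m3u8 library makes this convenient.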
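To make the extraction step concrete, here is a minimal standard-library sketch of what "extract the segment URIs" amounts to. It is not how the m3u8 library is implemented; the function name `extract_segment_urls`, the sample playlist, and the example.com URLs are made up for illustration.

```python
from urllib.parse import urljoin


def extract_segment_urls(playlist_text, playlist_url):
    """Return absolute URLs of the media segments listed in a playlist."""
    urls = []
    for line in playlist_text.splitlines():
        line = line.strip()
        # Blank lines and lines starting with '#' (tags or comments) are not segments.
        if not line or line.startswith("#"):
            continue
        # Relative URIs are resolved against the playlist's own URL.
        urls.append(urljoin(playlist_url, line))
    return urls


# Hypothetical playlist content, for illustration only.
SAMPLE = """#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:2
#EXTINF:2.0,
seg0.ts
#EXTINF:2.0,
https://cdn.example.com/v/seg1.ts
#EXT-X-ENDLIST
"""

segment_urls = extract_segment_urls(SAMPLE, "https://example.com/video/index.m3u8")
print(segment_urls)
```

In practice the m3u8 library also handles keys, nested playlists, and other tags, so a hand-rolled parser like this is only useful as a fallback.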

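2.2. Encryption and decryption. Some m3u8 streams are encrypted, but the playlist gives the URL of the key. Requesting that URL returns the key, which is usually a string of digits and letters. The usual encryption algorithm is AES-128, and we need the pycryptodome library to decrypt the encrypted segments.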
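The key information lives in an `#EXT-X-KEY` tag inside the playlist. The sketch below, using only the standard library, shows one way to read that tag and turn the IV into the 16 raw bytes AES expects. The helper names `parse_key_tag` and `resolve_iv` and the sample tag are hypothetical. The fallback to the media sequence number when no IV is given follows the HLS specification (RFC 8216).

```python
import re

# Matches NAME=value pairs where the value is either quoted or unquoted.
_ATTR_RE = re.compile(r'([A-Z0-9-]+)=("[^"]*"|[^,]*)')


def parse_key_tag(line):
    """Parse an #EXT-X-KEY line into a dict of its attributes."""
    prefix = "#EXT-X-KEY:"
    if not line.startswith(prefix):
        raise ValueError("not an EXT-X-KEY tag")
    attrs = {}
    for name, value in _ATTR_RE.findall(line[len(prefix):]):
        attrs[name] = value.strip('"')
    return attrs


def resolve_iv(attrs, media_sequence):
    """Return the 16-byte IV for a segment with the given sequence number."""
    iv = attrs.get("IV")
    if iv:
        # The IV attribute is a hex string such as 0x0123...; int() accepts the 0x prefix.
        return int(iv, 16).to_bytes(16, "big")
    # No IV attribute: the segment's media sequence number is used, big-endian.
    return media_sequence.to_bytes(16, "big")


# Hypothetical tag, for illustration only.
tag = '#EXT-X-KEY:METHOD=AES-128,URI="https://example.com/key.key",IV=0x00000000000000000000000000000001'
key_attrs = parse_key_tag(tag)
print(key_attrs["METHOD"], key_attrs["URI"], resolve_iv(key_attrs, 0).hex())
```

With the key bytes downloaded from `URI`, a segment can then be decrypted with pycryptodome via `AES.new(key, AES.MODE_CBC, iv).decrypt(data)`.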

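2.3. Merging the video. The copy command built into Windows can also merge the segments, but in my tests the merged video sometimes came out scrambled. If there are only a few segments, a tool such as Format Factory works. If there are many, drive FFmpeg from Python.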
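Driving FFmpeg from Python mostly means writing a list file for its concat demuxer and building the command line. The following is a small sketch under stated assumptions: `build_concat_command` is a hypothetical helper, the segments are assumed to be zero-padded `.ts` files in one directory, and `ffmpeg` is assumed to be on the PATH. The flags are the same ones the full script uses (`-f concat -safe 0 -i <list> -c copy <output>`).

```python
import os
import tempfile


def build_concat_command(segment_dir, output_name, ffmpeg="ffmpeg"):
    """Write an FFmpeg concat list for the .ts files in segment_dir
    and return the merge command as an argument list."""
    # Zero-padded file names sort into playback order with a plain sort.
    segments = sorted(
        f for f in os.listdir(segment_dir)
        if f.endswith(".ts") and f != output_name
    )
    list_path = os.path.join(segment_dir, "merge_file_list.txt")
    with open(list_path, "w", encoding="utf-8") as f:
        for name in segments:
            f.write(f"file '{os.path.join(segment_dir, name)}'\n")
    return [ffmpeg, "-f", "concat", "-safe", "0", "-i", list_path,
            "-c", "copy", os.path.join(segment_dir, output_name)]


# Demo with empty placeholder files; FFmpeg itself is not invoked here.
demo_dir = tempfile.mkdtemp()
for i in range(3):
    open(os.path.join(demo_dir, f"{i:08d}.ts"), "wb").close()

cmd = build_concat_command(demo_dir, "merge.ts")
print(cmd)
```

Passing the returned list to `subprocess.run(cmd, check=True)` runs the merge and avoids the shell-quoting problems a single command string can have with paths that contain spaces.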

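2.4. Rate limiting. Many sites throttle requests: the number of simultaneous requests must stay below some threshold, otherwise the server stops responding normally. So we may need to cap the number of threads running at the same time. Python's built-in threading.BoundedSemaphore is enough for this.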
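As a rough illustration of the idea, the sketch below starts twenty threads but lets only a few of them do work at once. The limit of 3, the `fake_download` function, and the `sleep` standing in for a network request are all assumptions made for the demo.

```python
import threading
import time

MAX_CONCURRENT = 3  # hypothetical limit; tune it to what the server tolerates
semaphore = threading.BoundedSemaphore(MAX_CONCURRENT)
lock = threading.Lock()
active = 0  # workers currently inside the guarded section
peak = 0    # highest value of `active` observed


def fake_download(index):
    global active, peak
    # At most MAX_CONCURRENT threads can hold the semaphore at once;
    # the `with` block releases it even if the body raises.
    with semaphore:
        with lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.01)  # stands in for the network request
        with lock:
            active -= 1


threads = [threading.Thread(target=fake_download, args=(i,)) for i in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"peak concurrency: {peak}")
```

Using the semaphore as a context manager guarantees the release, which matters here: a worker that fails without releasing would permanently reduce the number of available slots.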

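2.5. Progress bar. Simply printing the current progress does not look good. A library such as tqdm can display a proper progress bar.

3. Implementation

Third-party libraries required by this script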

pip install requests
pip install fake-useragent==0.1.11
pip install m3u8
pip install pycryptodome
pip install tqdm

Reference code

import logging
import os.path
import sys
import time
from datetime import datetime
from threading import Thread, BoundedSemaphore

import requests
from Crypto.Cipher import AES
from fake_useragent import UserAgent
import m3u8
from tqdm import tqdm


# pip install requests
# pip install fake-useragent==0.1.11
# pip install m3u8
# pip install pycryptodome
# pip install tqdm


class M3U8Loader:
    """Fallback line-based playlist loader, used when the m3u8 library finds no segments."""

    def __init__(self, uri, base_url, segments):
        self.uri = uri
        self.base_url = base_url
        self.segments = segments

    @classmethod
    def load(cls, uri, base_url=None):
        if uri.startswith("http"):
            res = requests.get(uri, headers={"User-Agent": get_user_agent()})
            if res.status_code != 200:
                raise Exception(f"load m3u8 failed when download file, uri: {uri}")
            text = res.text.encode().decode('unicode_escape')
            segments = text.split("\n")
        else:
            with open(uri, encoding="utf-8") as f:
                segments = f.read()
            segments = segments.encode().decode('unicode_escape')
            segments = segments.split("\n")
        # Guess base_url from the first absolute segment URL, then from the playlist URL.
        if not base_url:
            for line in segments:
                if line.startswith("http"):
                    base_url = os.path.split(line.split("?")[0])[0]
                    break
        if not base_url and uri.startswith("http"):
            base_url = str(uri).split("?")[0].rsplit("/", maxsplit=1)[0]
        segments = [s.strip() for s in segments]
        return M3U8Loader(uri, base_url, segments)


def decode_video(video_stream, key, iv):
    key_bytes = bytes(key, encoding='utf8')
    if iv and str(iv).lower().startswith("0x") and int(iv, 16):
        # The playlist gives the IV as a hex string (0x...); AES needs it as 16 raw bytes.
        iv_bytes = int(iv, 16).to_bytes(16, "big")
    else:
        # No usable IV in the playlist: fall back to using the key as the IV.
        iv_bytes = key_bytes
    aes = AES.new(key_bytes, AES.MODE_CBC, iv_bytes)
    return aes.decrypt(video_stream)


def get_datetime_num():
    return datetime.strftime(datetime.now(), "%Y%m%d%H%M%S")


def get_user_agent():
    return UserAgent(path="./utils/fake_useragent_0.1.11.json").random


class M3U8Downloader:
    def __init__(self, m3u8_url, base_url, save_dir, video_folder, headers, if_random_ug, merge_name, ffmpeg_path,
                 sp_count, if_tqdm):
        self.tqdm = None
        self.if_tqdm = if_tqdm
        self.m3u8_url = m3u8_url
        self.base_url = base_url if base_url else ""
        self.to_download_url = list()
        self.download_failed_dict = dict()
        self.key_method = None
        self.key_iv = None
        self.key_str = None
        self.current_file_path = os.path.dirname(os.path.abspath(__file__))
        self.save_dir = save_dir if save_dir else os.path.join(self.current_file_path, "m3u8_download")
        self.video_folder = video_folder if video_folder else get_datetime_num()
        if not os.path.isabs(ffmpeg_path):
            ffmpeg_path = os.path.join(self.current_file_path, ffmpeg_path)
        self.headers = headers if isinstance(headers, dict) else dict()
        self.if_random_ug = if_random_ug if isinstance(if_random_ug, bool) else True
        self.ffmpeg_path = ffmpeg_path
        self.merge_name = merge_name if merge_name else "merge.ts"
        self.file_type = ".ts"
        # sp_count caps how many download threads may run at the same time.
        self.semaphore = BoundedSemaphore(sp_count) if sp_count else None
        self.logger = self.get_logger()
        self.normalize_m3u8_file(self.m3u8_url)
        self.normalize_base_url()
        self.logger.info(f"init info m3u8_url: {self.m3u8_url}")
        self.logger.info(f"init info base_url: {self.base_url}")
        self.logger.info(f"init info if_random_ug: {self.if_random_ug}")
        self.logger.info(f"init info headers: {self.headers}")
        self.logger.info(f"init info save_dir: {self.save_dir}")
        self.logger.info(f"init info video_folder: {self.video_folder}")
        self.logger.info(f"init info current_file_path: {self.current_file_path}")
        self.logger.info(f"init info ffmpeg_path: {self.ffmpeg_path}")
        self.logger.info(f"init info merge_name: {self.merge_name}")

    def __del__(self):
        if self.tqdm:
            self.tqdm.close()

    def get_headers(self):
        headers = self.headers
        if self.if_random_ug:
            headers.update({"User-Agent": get_user_agent()})
        return headers

    def get_logger(self):
        logger = logging.getLogger("M3U8Downloader")
        logger.setLevel(logging.INFO)
        formatter = logging.Formatter(
            "%(asctime)s-%(filename)s-line:%(lineno)d-%(levelname)s-%(process)s: %(message)s")
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(formatter)
        if not os.path.exists(self.save_dir):
            os.mkdir(self.save_dir)
        file_handler = logging.FileHandler(os.path.join(self.save_dir, "m3u8_download.log"), encoding="utf-8")
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
        logger.addHandler(file_handler)
        return logger

    def get_m3u8_info(self):
        m3u8_obj = m3u8.load(self.m3u8_url, timeout=10, headers=self.get_headers())
        keys = m3u8_obj.keys
        if keys and keys[-1]:
            key_alg = keys[-1].method
            if key_alg != "AES-128":
                raise Exception(f"matched key but algorithm ({key_alg}) is not AES-128")
            self.key_method = key_alg
            self.key_iv = keys[-1].iv
            self.get_key(self.normalize_url(keys[-1].absolute_uri))
        self.to_download_url = [self.normalize_url(segment.uri) for segment in m3u8_obj.segments]
        self.to_download_url = [d_url for d_url in self.to_download_url if d_url]
        if not self.to_download_url:
            # The m3u8 library found nothing; fall back to the line-based loader.
            loader_obj = M3U8Loader.load(self.m3u8_url, self.base_url)
            self.to_download_url = [self.normalize_url(segment) for segment in loader_obj.segments]
            self.to_download_url = [d_url for d_url in self.to_download_url if d_url]
        self.logger.info(f"to_download_url: {len(self.to_download_url)} {self.to_download_url[:5]}, ...")
        self.tqdm = tqdm(total=len(self.to_download_url), desc="download progress") if self.if_tqdm else None
        if self.to_download_url:
            self.file_type = os.path.splitext(self.to_download_url[0].split("?")[0])[1]

    def get_key(self, key_url):
        self.logger.info(f"key_url: {key_url}")
        res = requests.get(key_url, headers=self.get_headers(), timeout=10)
        self.key_str = res.text
        if not self.key_str:
            raise Exception("get key error, key: {}".format(self.key_str))
        self.logger.info(f"get_key key_str: {self.key_str}")

    def test_download(self, d_url):
        self.logger.info(f"test download url: {d_url}")
        try:
            res = requests.get(d_url, timeout=30, headers=self.get_headers(), stream=True)
            return True if res.status_code < 300 else False
        except Exception as e:
            self.logger.error(f"test_download meet error: {e}")
            return False

    def download_video(self, number, url):
        if self.semaphore:
            self.semaphore.acquire()
        try:
            try_times = 10
            res_content = None
            while try_times > 0:
                try:
                    # Send the same headers as test_download so both requests are treated alike.
                    res = requests.get(url, timeout=10, headers=self.get_headers(), stream=True)
                    if res.status_code == 200:
                        res_content = res.content
                        break
                except Exception as e:
                    self.logger.error(f"download failed, will try again: url:{url} ,error:{e}")
                    res_content = None
                try_times -= 1
                time.sleep(1)
            if res_content:
                if self.key_str:
                    res_content = decode_video(res_content, self.key_str, self.key_iv)
                # Zero-padded file names keep the segments in playback order when sorted.
                path = os.path.join(self.save_dir, self.video_folder, "{0:0>8}".format(number) + str(self.file_type))
                with open(path, "wb+") as f:
                    f.write(res_content)
                # self.logger.info(f"download video {path} (total: {len(self.to_download_url)}) success, url: {url}")
                if self.tqdm:
                    self.tqdm.update(1)
            else:
                self.logger.warning(f"download video failed, number:{number},url:{url}")
                self.download_failed_dict.update({number: url})
        finally:
            # Always release the slot, even if decryption or writing fails.
            if self.semaphore:
                self.semaphore.release()

    def merge_videos(self):
        if os.name != "nt":
            self.logger.warning(f"current system {os.name} is not Windows, can't merge.")
            return
        self.logger.info("start merge")
        path = self.save_dir
        if os.path.isabs(path):
            path = self.save_dir + os.sep + self.video_folder
        else:
            path = self.current_file_path + os.sep + os.path.basename(self.save_dir) + os.sep + self.video_folder
        if not os.path.exists(path):
            self.logger.warning(f"merge_videos canceled, the path({path}) is not exist")
            return
        self.logger.info(f"ffmpeg path: {self.ffmpeg_path}")
        all_ts_files = os.listdir(path)
        all_ts_files = [ts for ts in all_ts_files if ts.startswith("0") and ts.endswith(self.file_type)]
        if not all_ts_files:
            self.logger.warning(f"there is no {self.file_type} file need to merge")
            return
        all_ts_files.sort()
        if self.ffmpeg_path and os.path.exists(self.ffmpeg_path):
            self.logger.info("use ffmpeg to merge")
            # FFmpeg's concat demuxer reads the list of files to join from a text file.
            with open(path + os.sep + "merge_file_list.txt", "w") as f:
                for file in all_ts_files:
                    f.write("file " + "'" + path + os.sep + file + "'" + "\n")
            cmd = "{} -f concat -safe 0 -i {} -c copy {}".format(
                self.ffmpeg_path, path + os.sep + 'merge_file_list.txt', path + os.sep + self.merge_name)
            self.logger.info(f"merge cmd: {cmd}")
            res = os.system(cmd)
            if res:
                self.logger.error("merge failed")
            else:
                self.logger.info("merge success")
        else:
            self.logger.warning("ffmpeg not exist, will merge by python")
            try:
                with open(path + os.sep + self.merge_name, "wb+") as f:
                    for ts_file in all_ts_files:
                        with open(path + os.sep + ts_file, "rb+") as t:
                            f.write(t.read())
            except Exception as e:
                self.logger.error(f"merge failed: {e}")
            else:
                self.logger.info("merge success")

    def mkdir(self):
        if not os.path.exists(self.save_dir):
            os.mkdir(self.save_dir)
            self.logger.info(f"make save_dir({self.save_dir}) success.")
        video_folder = os.path.join(self.save_dir, self.video_folder)
        if not os.path.exists(video_folder):
            os.mkdir(video_folder)
            self.logger.info(f"make video_folder({video_folder}) success.")

    def normalize_url(self, raw_url):
        raw_url = raw_url.strip()
        if raw_url.startswith("#"):
            return
        if raw_url and raw_url.startswith("http") and any(
                [raw_url.split("?")[0].endswith(".ts"), raw_url.split("?")[0].endswith(".key")]):
            return raw_url
        if raw_url and not str(raw_url).startswith("http"):
            # Find the longest prefix of raw_url that also occurs in base_url,
            # so a path component shared by both is not repeated when joining.
            last_find_str = ""
            for i in range(1, len(raw_url) + 1):
                start_str = raw_url[:i]
                if self.base_url.rfind(start_str) == -1:
                    break
                else:
                    last_find_str = start_str
            sep = "" if self.base_url.endswith("/") or raw_url.startswith("/") else "/"
            if len(last_find_str) > 2 and self.base_url.endswith(last_find_str):
                raw_url = f"{self.base_url}{sep}{raw_url.replace(last_find_str, '')}"
            else:
                raw_url = f"{self.base_url}{sep}{raw_url}"
        return raw_url

    def normalize_m3u8_file(self, path):
        # Only applies when m3u8_url is a local file: unescape it and strip quotes in place.
        if not os.path.exists(path):
            return
        with open(path, "r", encoding="utf-8") as f:
            contents = f.read()
        contents = contents.encode().decode('unicode_escape')
        with open(path, 'w', encoding='utf-8') as w:
            w.write(contents.replace("'", "").replace('"', ''))
        self.logger.info(f"normalize m3u8 file success, path: {path}")
        return contents

    def normalize_base_url(self):
        if self.base_url and self.base_url.startswith('http'):
            return
        base_url = M3U8Loader.load(self.m3u8_url).base_url
        if base_url:
            self.base_url = base_url
        else:
            raise Exception("automatically identify base_url failed, please fill in manually")

    def run(self):
        start_time = time.time()
        self.get_m3u8_info()
        if not self.to_download_url:
            self.logger.warning("there is no url to download, self.to_download_url is empty, please check url")
            return
        self.logger.info(f"self.key_str: {self.key_str}")
        self.logger.info(f"self.key_method: {self.key_method}")
        if not self.test_download(self.to_download_url[0]):
            self.logger.warning(
                f"test download failed, pls check whether the url is valid ({self.to_download_url[0]})")
            return
        self.mkdir()
        # One thread per segment; the semaphore limits how many run at once.
        threads = [Thread(target=self.download_video, args=(idx, url)) for idx, url in
                   enumerate(self.to_download_url)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        self.logger.info(f"all download finish, spent time: {time.time() - start_time:.2f} second")
        self.logger.info(f"total video count: {len(self.to_download_url)}")
        self.logger.info(f"download_failed_dict: {self.download_failed_dict}")
        if self.download_failed_dict:
            self.logger.warning(f"{len(self.download_failed_dict)} video file download failed.")
            raise Exception(f"{len(self.download_failed_dict)} video file download failed.")
        if self.ffmpeg_path:
            self.merge_videos()
        if self.tqdm:
            self.tqdm.close()


if __name__ == '__main__':
    url = "https://test/index.m3u8"
    if len(sys.argv) > 1 and str(sys.argv[1]).startswith("http"):
        url = sys.argv[1]
    if not url:
        raise Exception("missing download url")
    params_dict = {
        "m3u8_url": url,
        "base_url": "",
        "save_dir": "",
        "video_folder": "",
        "headers": {
            # "Host": "",
            # "Cookie": "",
            # "Referer": "",
            # "User-Agent": "",
        },
        "if_random_ug": True,
        "ffmpeg_path": "./utils/ffmpeg.exe",
        "merge_name": "",
        "sp_count": 2,
        "if_tqdm": True,
    }
    # if os.path.isfile(params_dict["m3u8_url"]) and not params_dict["base_url"]:
    #     raise Exception("the m3u8 file is a local file but miss base_url")
    downloader = M3U8Downloader(**params_dict)
    downloader.run()

The code has been uploaded to GitHub.
GitHub link: https://github.com/panmeibing/python_downloader
