抖音python上的代码视频_资深程序员:十行Python代码教你爬取抖音视频!
环境说明
环境:
python 3.7.1
centos 7.4
pip 10.0.1
部署
[root@localhost ~]# python3.7 --version
Python 3.7.1
[root@localhost ~]#
[root@localhost ~]# pip3 install douyin
有时候因为网络原因会安装失败,这时重新执行上面的命令即可,直到安装完成。
导入douyin模块
[root@localhost ~]# python3.7
>>>import douyin
>>>
导入如果报错的话,可能douyin模块没有安装成功。
下面我们开始爬…爬抖音小视频和音乐咯
[root@localhost douyin]# python3.7 dou.py
几分钟后…我们来看看爬的成果
可以看到视频配的音乐被存储成了 mp3 格式的文件,抖音视频存储成了 mp4 文件。
嗯…不错,哈哈。
py脚本
作者说,能爬抖音上所有热门话题和音乐下的相关视频都爬取到,并且将爬到的视频下载下来,同时还要把视频所配的音乐也单独下载下来,不仅如此,所有视频的相关信息如发布人、点赞数、评论数、发布时间、发布人、发布地点等等信息都需要爬取下来,并存储到 MongoDB 数据库。
import douyin
from douyin.structures import Topic, Music
# 定义视频下载、音频下载、MongoDB 存储的处理器
video_file_handler = douyin.handlers.VideoFileHandler(folder='./videos')
music_file_handler = douyin.handlers.MusicFileHandler(folder='./musics')
#mongo_handler = douyin.handlers.MongoHandler()
# 定义下载器,并将三个处理器当做参数传递
#downloader = douyin.downloaders.VideoDownloader([mongo_handler, video_file_handler, music_
file_handler])
downloader = douyin.downloaders.VideoDownloader([video_file_handler, music_file_handler])
# 循环爬取抖音热榜信息并下载存储
for result in douyin.hot.trend():
for item in result.data:
# 爬取热门话题和热门音乐下面的所有视频,每个话题或音乐最多爬取 10 个相关视频。
downloader.download(item.videos(max=10))
由于我这里没有mongodb所以,把这mongodb相关的配置给注释掉了。
代码解读
本库依赖的其他库有:
aiohttp:利用它可以完成异步数据下载,加快下载速度
dateparser:利用它可以完成任意格式日期的转化
motor:利用它可以完成异步 MongoDB 存储,加快存储速度
requests:利用它可以完成最基本的 HTTP 请求模拟
tqdm:利用它可以进行进度条的展示
数据结构定义
如果要做一个库的话,一个很重要的点就是对一些关键的信息进行结构化的定义,使用面向对象的思维对某些对象进行封装,抖音的爬取也不例外。
在抖音中,其实有很多种对象,比如视频、音乐、话题、用户、评论等等,它们之间通过某种关系联系在一起,例如视频中使用了某个配乐,那么视频和音乐就存在使用关系;比如用户发布了视频,那么用户和视频就存在发布关系,我们可以使用面向对象的思维对每个对象进行封装,比如视频的话,就可以定义成如下结构:
class Video(Base):
def __init__(self, **kwargs):
"""
init video object
:param kwargs:
"""
super().__init__()
self.id = kwargs.get('id')
self.desc = kwargs.get('desc')
self.author = kwargs.get('author')
self.music = kwargs.get('music')
self.like_count = kwargs.get('like_count')
self.comment_count = kwargs.get('comment_count')
self.share_count = kwargs.get('share_count')
self.hot_count = kwargs.get('hot_count')
...
self.address = kwargs.get('address')
def __repr__(self):
"""
video to str
:return: str
"""
return '>' % (self.id, self.desc[:10].strip() if self.desc else None)
这里将一些关键的属性定义成 Video 类的一部分,包括 id 索引、desc 描述、author 发布人、music 配乐等等,其中 author 和 music 并不是简单的字符串的形式,它也是单独定义的数据结构,比如 author 就是 User 类型的对象,而 User 的定义又是如下结构:
class User(Base):
def __init__(self, **kwargs):
"""
init user object
:param kwargs:
"""
super().__init__()
self.id = kwargs.get('id')
self.gender = kwargs.get('gender')
self.name = kwargs.get('name')
self.create_time = kwargs.get('create_time')
self.birthday = kwargs.get('birthday')
...
def __repr__(self):
"""
user to str
:return:
"""
return '>' % (self.alias, self.name)
所以说,通过属性之间的关联,我们就可以将不同的对象关联起来,这样显得逻辑架构清晰,而且我们也不用一个个单独维护字典来存储了,其实这就和 Scrapy 里面的 Item 的定义是类似的。
请求和重试
实现爬取的过程就不必多说了,这里面其实用到的就是最简单的抓包技巧,使用 Charles 直接进行抓包即可。抓包之后便可以观察到对应的接口请求,然后进行模拟即可。
所以问题就来了,难道我要一个接口写一个请求方法吗?另外还要配置 Headers、超时时间等等的内容,那岂不是太费劲了,所以,我们可以将请求的方法进行单独的封装,这里我定义了一个 fetch 方法:
def _fetch(url, **kwargs):
"""
fetch api response
:param url: fetch url
:param kwargs: other requests params
:return: json of response
"""
response = requests.get(url, **kwargs)
if response.status_code != 200:
raise requests.ConnectionError('Expected status code 200, but got {}'.format(response.status_code))
return response.json()
这个方法留了一个必要参数,即 url,另外其他的配置我留成了 kwargs,也就是可以任意传递,传递之后,它会依次传递给 requests 的请求方法,然后这里还做了异常处理,如果成功请求,即可返回正常的请求结果。
定义了这个方法,在其他的调用方法里面我们只需要单独调用这个 fetch 方法即可,而不需要再去关心异常处理,返回类型了。
好,那么定义好了请求之后,如果出现了请求失败怎么办呢?按照常规的方法,我们可能就会在外面套一层方法,然后记录调用 fetch 方法请求失败的次数,然后重新调用 fetch 方法进行重试,但这里可以告诉大家一个更好用的库,叫做 retrying,使用它我们可以通过定义一个装饰器来完成重试的操作。
比如我可以使用 retry 装饰器这么装饰 fetch 方法:
from retrying import retry
@retry(stop_max_attempt_number=retry_max_number, wait_random_min=retry_min_random_wait,
wait_random_max=retry_max_random_wait, retry_on_exception=need_retry)
def _fetch(url, **kwargs):
pass
这里使用了装饰器的四个参数:
stop_max_attempt_number:最大重试次数,如果重试次数达到该次数则放弃重试
wait_random_min:下次重试之前随机等待时间的最小值
wait_random_max:下次重试之前随机等待时间的最大值
retry_on_exception:判断出现了怎样的异常才重试
这里 retry_on_exception 参数指定了一个方法,叫做 need_retry,方法定义如下:
def need_retry(exception):
"""
need to retry
:param exception:
:return:
"""
result = isinstance(exception, (requests.ConnectionError, requests.ReadTimeout))
if result:
print('Exception', type(exception), 'occurred, retrying...')
return result
这里判断了如果是 requests 的 ConnectionError 和 ReadTimeout 异常的话,就会抛出异常进行重试,否则不予重试。
所以,这样我们就实现了请求的封装和自动重试,是不是非常 Pythonic?
下载处理器的设计
为了下载视频,我们需要设计一个下载处理器来下载已经爬取到的视频链接,所以下载处理器的输入就是一批批的视频链接,下载器接收到这些链接,会将其进行下载处理,并将视频存储到对应的位置,另外也可以完成一些信息存储操作。
在设计时,下载处理器的要求有两个,一个是保证高速的下载,另一个就是可扩展性要强,下面我们分别来针对这两个特点进行设计:
高速下载,为了实现高速的下载,要么可以使用多线程或多进程,要么可以用异步下载,很明显,后者是更有优势的。
扩展性强,下载处理器要能下载音频、视频,另外还可以支持数据库等存储,所以为了解耦合,我们可以将视频下载、音频下载、数据库存储的功能独立出来,下载处理器只负责视频链接的主要逻辑处理和分配即可。
为了实现高速下载,这里我们可以使用 aiohttp 库来完成,另外异步下载我们也不能一下子下载太多,不然网络波动太大,所以我们可以设置 batch 式下载,可以避免同时大量的请求和网络拥塞,主要的下载函数如下:
def download(self, inputs):
"""
download video or video lists
:param data:
:return:
"""
if isinstance(inputs, types.GeneratorType):
temps = []
for result in inputs:
print('Processing', result, '...')
temps.append(result)
if len(temps) == self.batch:
self.process_items(temps)
temps = []
else:
inputs = inputs if isinstance(inputs, list) else [inputs]
self.process_items(inputs)
这个 download 方法设计了多种数据接收类型,可以接收一个生成器,也可以接收单个或列表形式的视频对象数据,接着调用了 process_items 方法进行了异步下载,其方法实现如下:
def process_items(self, objs):
"""
process items
:param objs: objs
:return:
"""
# define progress bar
with tqdm(total=len(objs)) as self.bar:
# init event loop
loop = asyncio.get_event_loop()
# get num of batches
total_step = int(math.ceil(len(objs) / self.batch))
# for every batch
for step in range(total_step):
start, end = step * self.batch, (step + 1) * self.batch
print('Processing %d-%d of files' % (start + 1, end))
# get batch of objs
objs_batch = objs[start: end]
# define tasks and run loop
tasks = [asyncio.ensure_future(self.process_item(obj)) for obj in objs_batch]
for task in tasks:
task.add_done_callback(self.update_progress)
loop.run_until_complete(asyncio.wait(tasks))
这里使用了 asyncio 实现了异步处理,并通过对视频链接进行分批处理保证了流量的稳定性,另外还使用了 tqdm 实现了进度条的显示。
我们可以看到,真正的处理下载的方法是 process_item,这里面会调用视频下载、音频下载、数据库存储的一些组件来完成处理,由于我们使用了 asyncio 进行了异步处理,所以 process_item 也需要是一个支持异步处理的方法,定义如下:
async def process_item(self, obj):
"""
process item
:param obj: single obj
:return:
"""
if isinstance(obj, Video):
print('Processing', obj, '...')
for handler in self.handlers:
if isinstance(handler, Handler):
await handler.process(obj)
这里我们可以看到,真正的处理逻辑都在一个个 handler 里面,我们将每个单独的功能进行了抽离,定义成了一个个 Handler,这样可以实现良好的解耦合,如果我们要增加和关闭某些功能,只需要配置不同的 Handler 即可,而不需要去改动代码,这也是设计模式的一个解耦思想,类似工厂模式。
Handler 的设计
刚才我们讲了,Handler 就负责一个个具体功能的实现,比如视频下载、音频下载、数据存储等等,所以我们可以将它们定义成不同的 Handler,而视频下载、音频下载又都是文件下载,所以又可以利用继承的思想设计一个文件下载的 Handler,定义如下:
from os.path import join, exists
from os import makedirs
from douyin.handlers import Handler
from douyin.utils.type import mime_to_ext
import aiohttp
class FileHandler(Handler):
def __init__(self, folder):
"""
init save folder
:param folder:
"""
super().__init__()
self.folder = folder
if not exists(self.folder):
makedirs(self.folder)
async def _process(self, obj, **kwargs):
"""
download to file
:param url: resource url
:param name: save name
:param kwargs:
:return:
"""
print('Downloading', obj, '...')
kwargs.update({'ssl': False})
kwargs.update({'timeout': 10})
async with aiohttp.ClientSession() as session:
async with session.get(obj.play_url, **kwargs) as response:
if response.status == 200:
extension = mime_to_ext(response.headers.get('Content-Type'))
full_path = join(self.folder, '%s.%s' % (obj.id, extension))
with open(full_path, 'wb') as f:
f.write(await response.content.read())
print('Downloaded file to', full_path)
else:
print('Cannot download %s, response status %s' % (obj.id, response.status))
async def process(self, obj, **kwargs):
"""
process obj
:param obj:
:param kwargs:
:return:
"""
return await self._process(obj, **kwargs)
这里我们还是使用了 aiohttp,因为在下载处理器中需要 Handler 支持异步操作,这里下载的时候就是直接请求了文件链接,然后判断了文件的类型,并完成了文件保存。
视频下载的 Handler 只需要继承当前的 FileHandler 即可:
from douyin.handlers import FileHandler
from douyin.structures import Video
class VideoFileHandler(FileHandler):
async def process(self, obj, **kwargs):
"""
process video obj
:param obj:
:param kwargs:
:return:
"""
if isinstance(obj, Video):
return await self._process(obj, **kwargs)
这里其实就是加了类别判断,确保数据类型的一致性,当然音频下载也是一样的。
异步 MongoDB 存储
上面介绍了视频和音频处理的 Handler,另外还有一个存储的 Handler 没有介绍,那就是 MongoDB 存储,平常我们可能习惯使用 PyMongo 来完成存储,但这里我们为了加速,需要支持异步操作,所以这里有一个可以实现异步 MongoDB 存储的库,叫做 Motor,其实使用的方法差不太多,MongoDB 的连接对象不再是 PyMongo 的 MongoClient 了,而是 Motor 的 AsyncIOMotorClient,其他的配置基本类似。
在存储时使用的是 update_one 方法并开启了 upsert 参数,这样可以做到存在即更新,不存在即插入的功能,保证数据的不重复性。
整个 MongoDB 存储的 Handler 定义如下:
from douyin.handlers import Handler
from motor.motor_asyncio import AsyncIOMotorClient
from douyin.structures import *
class MongoHandler(Handler):
def __init__(self, conn_uri=None, db='douyin'):
"""
init save folder
:param folder:
"""
super().__init__()
if not conn_uri:
conn_uri = 'localhost'
self.client = AsyncIOMotorClient(conn_uri)
self.db = self.client[db]
async def process(self, obj, **kwargs):
"""
download to file
:param url: resource url
:param name: save name
:param kwargs:
:return:
"""
collection_name = 'default'
if isinstance(obj, Video):
collection_name = 'videos'
elif isinstance(obj, Music):
collection_name = 'musics'
collection = self.db[collection_name]
# save to mongodb
print('Saving', obj, 'to mongodb...')
if await collection.update_one({'id': obj.id}, {'$set': obj.json()}, upsert=True):
print('Saved', obj, 'to mongodb successfully')
else:
print('Error occurred while saving', obj)
可以看到我们在类中定义了 AsyncIOMotorClient 对象,并暴露了 conn_uri 连接字符串和 db 数据库名称,可以在声明 MongoHandler 类的时候指定 MongoDB 的链接地址和数据库名。
同样的 process 方法,这里使用 await 修饰了 update_one 方法,完成了异步 MongoDB 存储。
很多小伙伴在学习Python的过程中往往因为没有资料或者没人指导从而导致自己不想学下去了,因此我特意准备了大量的PDF书籍、视频教程,都免费送给大家!不管你是零基础还是有基础都可以获取到自己相对应的学习礼包!包括Python软件工具和2019最新入门到实战教程,(https://url.cn/59RWE1Z)复制到浏览器打开!
好,以上便是 douyin 库的所有的关键部分介绍,这部分内容可以帮助大家理解这个库的核心部分实现,另外可能对设计模式、面向对象思维以及一些实用库的使用有一定的帮助。
抖音python上的代码视频_资深程序员:十行Python代码教你爬取抖音视频!相关推荐
- python面向对象实例王者荣耀_大牛程序员利用Python开发王者荣耀带妹神器,一路直奔上王者...
王者荣耀 -很火的手游-简直老少通吃-令人发指-虽然操作简单-但为什么你还是会被虐, 其实 是有技巧的--本文Python大神带你研究王者荣耀各类英雄的出装小技巧,让你成为大神般的存在 前期准备 环境 ...
- python接私活王者_大牛程序员利用Python开发王者荣耀带妹神器,一路直奔上王者...
王者荣耀 -很火的手游-简直老少通吃-令人发指-虽然操作简单-但为什么你还是会被虐, 其实 是有技巧的--本文Python大神带你研究王者荣耀各类英雄的出装小技巧,让你成为大神般的存在 前期准备 环境 ...
- python程序员怎么建议_资深程序员对Python新手的八个建议,超级实用!
1. 项目文件事先做好归档 每次开始一个新工作的时候,以前的我总是贪图方便,Code.Data.文档都集中放在一个文件夹内,看起来很乱,一度让回溯过程十分痛苦,或者是换了部电脑,文件全都运行不行了,需 ...
- gz键盘增强小工具_资深程序员:Python中你不知道的那些小工具
python作为越来越流行的一种编程语言,不仅仅是因为它语言简单,有许多现成的包可以直接调用. python中还有大量的小工具,让你的python工作更有效率. 1. 快速共享 HTTP服务器 Sim ...
- 不会写代码也能当程序员?无代码来了,是福还是祸?
现阶段程序员都是自己写代码,"无代码"这种技术尚未引起太大的关注,有的人认为无代码编程会把简单的问题搞复杂,有的人认为无代码的发展可能会断送程序员的饭碗.那么无代码到底意味着什么? ...
- python海龟画图代码大全_【程序源代码】python 海龟画图
关键字: 正文 | 内容 今天这篇文章主要是介绍:python 海龟画图画一个正方形图案 01 - Turtle库是Python语言中一个很流行的绘制图像的函数库,想象一个小乌龟,在一个横轴为x.纵轴 ...
- python爬图片代码大全_爬虫入门教程⑩— 用漂亮的图表展示爬取到的数据
经过了前面的努力,我们成功获取到了数据,并且学会了保存,但是只是用网页展示出来,是不是有一些不够美观呢? 所以本节的内容是:数据的可视化.拿到了数据却不能使其简单易懂并且足够突出,那就是不是好的数据工 ...
- python全栈人工智能192集视频_黑马程序员分享:python全栈开发环境构建
Sublime简介 Sublime Text是一个代码编辑器.也是HTML和散文先进的文本编辑器.漂亮的用户界面和非凡的功能,例如:多选择,Python插件,代码段等等.完全可自定义键绑定,菜单和工具 ...
- python写小猪佩奇_这个程序员用 Python 20 秒画完小猪佩奇“社会人”!
点击上方"CSDN",选择"置顶公众号" 关键时刻,第一时间送达! 作者 | 丁彦军 责编 | 唐小引 每天写代码的程序员,你们知道今年社交平台上最火的带货女王 ...
最新文章
- oracle实现数据目录共享,为共享文件系统创建特定于节点的文件和目录
- 2021年加拿大工程院院士名单出炉,杨强、张大鹏、刘学等多位华人入选
- JAVA——使用Spring Boot Scheduled时注入simple-robot Bot解决方案
- 对Spring框架的理解(转)
- Java System.arraycopy()方法示例
- 2020收官--Filter4Go
- Mac创建一个vue项目
- 微信端和手机qq浏览器输入框不能输入汉字
- EXCLE为什么双击横杠日期才能变成斜杠日期
- Smart210学习记录-------内存初始化
- 我的阿里巴巴图标公开库
- android 华为mate 获取定位权限,【严重安全问题】开启了定位权限,但是软件还是没权限...
- 目前计算机主流配置及选购,计算机主流配置及选购.doc
- 热用图片怎么表示简笔画,网络简笔画图片大全
- 《Pytorch 模型推理及多任务通用范式》_第3节课
- Python PIL.Image之制作GIF图片
- 基于docker jenkins nginx gitee实现前端自动化部署
- 超实用的15个shell脚本,有手就会,拿走不谢
- python股票全套系统_GitHub - hyspider/stock: stock,股票系统。使用python进行开发。
- “陀螺财经产业区块链平台”上线,现正式开放入驻!