【摘要】 最近研究了一下抖音的爬虫,目前实现了热门话题和热门音乐下面所有相关视频的爬取,并且我已经将该爬虫打包成了一个 Python 库并发布,名称就叫做 douyin,利用该库可以使用不到 10 行代码完成热门视频的下载、相关音乐的下载以及结构化信息的存储。本文就来详细介绍一下这个库的用法和一些核心逻辑实现。

实例演示

在开始介绍之前,我们就先看看这个库能达到怎样的爬取效果吧,这里我们想要爬取的部分是这这样的:

这里是抖音搜索界面热门话题和热门音乐部分,每一个话题或音乐都有着非常高的热度,而且每个热门话题或音乐下面都是相关的抖音视频。

下面我们要做的就是把所有热门话题和音乐下的相关视频都爬取到,并且将爬到的视频下载下来,同时还要把视频所配的音乐也单独下载下来,不仅如此,所有视频的相关信息如发布人、点赞数、评论数、发布时间、发布人、发布地点等等信息都需要爬取下来,并存储到 MongoDB 数据库。

听起来似乎挺繁琐的是吧?其实有了 douyin 这个库,我们不到 10 行代码就可以完成上面的任务了!其 GitHub 地址是:https://github.com/Python3WebSpider/DouYin。

首先第一步我们需要安装一下 douyin 库,命令如下:

pip3 install douyin

使用示例如下:

import douyin
from douyin.structures import Topic, Music# 定义视频下载、音频下载、MongoDB 存储的处理器
video_file_handler = douyin.handlers.VideoFileHandler(folder='./videos')
music_file_handler = douyin.handlers.MusicFileHandler(folder='./musics')
mongo_handler = douyin.handlers.MongoHandler()
# 定义下载器,并将三个处理器当做参数传递
downloader = douyin.downloaders.VideoDownloader([mongo_handler, video_file_handler, music_file_handler])
# 循环爬取抖音热榜信息并下载存储
for result in douyin.hot.trend():for item in result.data:# 爬取热门话题和热门音乐下面的所有视频,每个话题或音乐最多爬取 100 个相关视频。downloader.download(item.videos(max=100))

好,这样就完成了,运行这段代码,即可以完成热门话题、热门音乐下面所有视频和音乐的爬取,并将相关信息存储到 MongoDB 数据库。

另外值得注意的是,在运行这段代码之前首先需要安装好 MongoDB 数据库并成功开启服务,这样才能确保代码可以正常连接数据库并把数据成功存储。

我们看下运行效果:

运行截图如下:

在这里我们可以看到视频被成功存储到了 MongoDB 数据库,并且执行了下载,将视频存储到了本地(音频的的存储没有显示)。

最后我们看下爬取结果是怎样的,下面是爬取到的音频、视频和视频相关信息:

可以看到视频配的音乐被存储成了 mp3 格式的文件,抖音视频存储成了 mp4 文件,另外视频相关信息如视频描述、作者、音乐、点赞数、评论数等等的信息都已经存储到了 MongoDB 数据库,另外里面还包括了爬取时间、视频链接、分辨率等等额外的信息。

对!就是这么简单,通过这几行代码,我们就得到了如上的三部分结果,而这只需要安装 douyin 这个库即可实现。

代码解读

下面我们来剖析一下这个库的关键技术部分的实现,代码的地址是在:https://github.com/Python3WebSpider/DouYin,在此之前大家可以先将代码下载下来大体浏览一下。

本库依赖的其他库有:

aiohttp:利用它可以完成异步数据下载,加快下载速度。

dateparser:利用它可以完成任意格式日期的转化。

motor:利用它可以完成异步 MongoDB 存储,加快存储速度。

requests:利用它可以完成最基本的 HTTP 请求模拟。

tqdm:利用它可以进行进度条的展示。

下面我就几个部分的关键实现对库的实现进行代码说明。

数据结构定义

如果要做一个库的话,一个很重要的点就是对一些关键的信息进行结构化的定义,使用面向对象的思维对某些对象进行封装,抖音的爬取也不例外。

在抖音中,其实有很多种对象,比如视频、音乐、话题、用户、评论等等,它们之间通过某种关系联系在一起,例如视频中使用了某个配乐,那么视频和音乐就存在使用关系;比如用户发布了视频,那么用户和视频就存在发布关系,我们可以使用面向对象的思维对每个对象进行封装,比如视频的话,就可以定义成如下结构:

class Video(Base):def __init__(self, **kwargs):"""init video object:param kwargs:"""super().__init__()self.id = kwargs.get('id')self.desc = kwargs.get('desc')self.author = kwargs.get('author')self.music = kwargs.get('music')self.like_count = kwargs.get('like_count')self.comment_count = kwargs.get('comment_count')self.share_count = kwargs.get('share_count')self.hot_count = kwargs.get('hot_count')...self.address = kwargs.get('address')def __repr__(self):"""video to str:return: str"""return '<Video: <%s, %s>>' % (self.id, self.desc[:10].strip() if self.desc else None)

这里将一些关键的属性定义成 Video 类的一部分,包括 id 索引、desc 描述、author 发布人、music 配乐等等,其中 author 和 music 并不是简单的字符串的形式,它也是单独定义的数据结构,比如 author 就是 User 类型的对象,而 User 的定义又是如下结构:

class User(Base):def __init__(self, **kwargs):"""init user object:param kwargs:"""super().__init__()self.id = kwargs.get('id')self.gender = kwargs.get('gender')self.name = kwargs.get('name')self.create_time = kwargs.get('create_time')self.birthday = kwargs.get('birthday')...def __repr__(self):"""user to str:return:"""return '<User: <%s, %s>>' % (self.alias, self.name)
所以说,通过属性之间的关联,我们就可以将不同的对象关联起来,这样显得逻辑架构清晰,而且我们也不用一个个单独维护字典来存储了,其实这就和 Scrapy 里面的 Item 的定义是类似的。
请求和重试
实现爬取的过程就不必多说了,这里面其实用到的就是最简单的抓包技巧,使用 Charles 直接进行抓包即可。抓包之后便可以观察到对应的接口请求,然后进行模拟即可。
所以问题就来了,难道我要一个接口写一个请求方法吗?另外还要配置 Headers、超时时间等等的内容,那岂不是太费劲了,所以,我们可以将请求的方法进行单独的封装,这里我定义了一个 fetch 方法:
def _fetch(url, **kwargs):"""fetch api response:param url: fetch url:param kwargs: other requests params:return: json of response"""response = requests.get(url, **kwargs)if response.status_code != 200:raise requests.ConnectireplaceString('Expected status code 200, but got {}'.format(response.status_code))return response.json()这个方法留了一个必要参数,即 url,另外其他的配置我留成了 kwargs,也就是可以任意传递,传递之后,它会依次传递给 requests 的请求方法,然后这里还做了异常处理,如果成功请求,即可返回正常的请求结果。

定义了这个方法,在其他的调用方法里面我们只需要单独调用这个 fetch 方法即可,而不需要再去关心异常处理,返回类型了。

好,那么定义好了请求之后,如果出现了请求失败怎么办呢?按照常规的方法,我们可能就会在外面套一层方法,然后记录调用 fetch 方法请求失败的次数,然后重新调用 fetch 方法进行重试,但这里可以告诉大家一个更好用的库,叫做 retrying,使用它我们可以通过定义一个装饰器来完成重试的操作。

比如我可以使用 retry 装饰器这么装饰 fetch 方法:

from retrying import retry
@retry(stop_max_attempt_number=retry_max_number, wait_random_min=retry_min_random_wait,wait_random_max=retry_max_random_wait, retry_on_exception=need_retry)
def _fetch(url, **kwargs):pass
这里使用了装饰器的四个参数:
stop_max_attempt_number:最大重试次数,如果重试次数达到该次数则放弃重试。
wait_random_min:下次重试之前随机等待时间的最小值。
wait_random_max:下次重试之前随机等待时间的最大值。
retry_on_exception:判断出现了怎样的异常才重试。
这里 retry_on_exception 参数指定了一个方法,叫做 need_retry,方法定义如下:
def need_retry(exception):"""need to retry:param exception::return:"""result = isinstance(exception, (requests.ConnectireplaceString, requests.ReadTimeout))if result:print('Exception', type(exception), 'occurred, retrying...')return result

这里判断了如果是 requests 的 ConnectireplaceString 和 ReadTimeout 异常的话,就会抛出异常进行重试,否则不予重试。

所以,这样我们就实现了请求的封装和自动重试,是不是非常 Pythonic?

下载处理器的设计

为了下载视频,我们需要设计一个下载处理器来下载已经爬取到的视频链接,所以下载处理器的输入就是一批批的视频链接,下载器接收到这些链接,会将其进行下载处理,并将视频存储到对应的位置,另外也可以完成一些信息存储操作。

在设计时,下载处理器的要求有两个,一个是保证高速的下载,另一个就是可扩展性要强,下面我们分别来针对这两个特点进行设计:

高速下载,为了实现高速的下载,要么可以使用多线程或多进程,要么可以用异步下载,很明显,后者是更有优势的。

扩展性强,下载处理器要能下载音频、视频,另外还可以支持数据库等存储,所以为了解耦合,我们可以将视频下载、音频下载、数据库存储的功能独立出来,下载处理器只负责视频链接的主要逻辑处理和分配即可。

为了实现高速下载,这里我们可以使用 aiohttp 库来完成,另外异步下载我们也不能一下子下载太多,不然网络波动太大,所以我们可以设置 batch 式下载,可以避免同时大量的请求和网络拥塞,主要的下载函数如下:

def download(self, inputs):"""download video or video lists:param data::return:"""if isinstance(inputs, types.GeneratorType):temps = []for result in inputs:print('Processing', result, '...')temps.append(result)if len(temps) == self.batch:self.process_items(temps)temps = []else:inputs = inputs if isinstance(inputs, list) else [inputs]self.process_items(inputs)
这个 download 方法设计了多种数据接收类型,可以接收一个生成器,也可以接收单个或列表形式的视频对象数据,接着调用了 process_items 方法进行了异步下载,其方法实现如下:
def process_items(self, objs):"""process items:param objs: objs:return:"""# define progress barwith tqdm(total=len(objs)) as self.bar:# init event looploop = asyncio.get_event_loop()# get num of batchestotal_step = int(math.ceil(len(objs) / self.batch))# for every batchfor step in range(total_step):start, end = step * self.batch, (step + 1) * self.batchprint('Processing %d-%d of files' % (start + 1, end))# get batch of objsobjs_batch = objs[start: end]# define tasks and run looptasks = [asyncio.ensure_future(self.process_item(obj)) for obj in objs_batch]for task in tasks:task.add_done_callback(self.update_progress)loop.run_until_complete(asyncio.wait(tasks))
这里使用了 asyncio 实现了异步处理,并通过对视频链接进行分批处理保证了流量的稳定性,另外还使用了 tqdm 实现了进度条的显示。
我们可以看到,真正的处理下载的方法是 process_item,这里面会调用视频下载、音频下载、数据库存储的一些组件来完成处理,由于我们使用了 asyncio 进行了异步处理,所以 process_item 也需要是一个支持异步处理的方法,定义如下:
async def process_item(self, obj):"""process item:param obj: single obj:return:"""if isinstance(obj, Video):print('Processing', obj, '...')for handler in self.handlers:if isinstance(handler, Handler):await handler.process(obj)

这里我们可以看到,真正的处理逻辑都在一个个 handler 里面,我们将每个单独的功能进行了抽离,定义成了一个个 Handler,这样可以实现良好的解耦合,如果我们要增加和关闭某些功能,只需要配置不同的 Handler 即可,而不需要去改动代码,这也是设计模式的一个解耦思想,类似工厂模式。

Handler 的设计

刚才我们讲了,Handler 就负责一个个具体功能的实现,比如视频下载、音频下载、数据存储等等,所以我们可以将它们定义成不同的 Handler,而视频下载、音频下载又都是文件下载,所以又可以利用继承的思想设计一个文件下载的 Handler,定义如下:

from os.path import join, exists
from os import makedirs
from douyin.handlers import Handler
from douyin.utils.type import mime_to_ext
import aiohttpclass FileHandler(Handler):def __init__(self, folder):"""init save folder:param folder:"""super().__init__()self.folder = folderif not exists(self.folder):makedirs(self.folder)async def _process(self, obj, **kwargs):"""download to file:param url: resource url:param name: save name:param kwargs::return:"""print('Downloading', obj, '...')kwargs.update({'ssl': False})kwargs.update({'timeout': 10})async with aiohttp.ClientSession() as session:async with session.get(obj.play_url, **kwargs) as response:if response.status == 200:extension = mime_to_ext(response.headers.get('Content-Type'))full_path = join(self.folder, '%s.%s' % (obj.id, extension))with open(full_path, 'wb') as f:f.write(await response.content.read())print('Downloaded file to', full_path)else:print('Cannot download %s, response status %s' % (obj.id, response.status))async def process(self, obj, **kwargs):"""process obj:param obj::param kwargs::return:"""return await self._process(obj, **kwargs)

这里我们还是使用了 aiohttp,因为在下载处理器中需要 Handler 支持异步操作,这里下载的时候就是直接请求了文件链接,然后判断了文件的类型,并完成了文件保存。

视频下载的 Handler 只需要继承当前的 FileHandler 即可:

from douyin.handlers import FileHandler
from douyin.structures import Videoclass VideoFileHandler(FileHandler):async def process(self, obj, **kwargs):"""process video obj:param obj::param kwargs::return:"""if isinstance(obj, Video):return await self._process(obj, **kwargs)

这里其实就是加了类别判断,确保数据类型的一致性,当然音频下载也是一样的。

异步 MongoDB 存储

上面介绍了视频和音频处理的 Handler,另外还有一个存储的 Handler 没有介绍,那就是 MongoDB 存储,平常我们可能习惯使用 PyMongo 来完成存储,但这里我们为了加速,需要支持异步操作,所以这里有一个可以实现异步 MongoDB 存储的库,叫做 Motor,其实使用的方法差不太多,MongoDB 的连接对象不再是 PyMongo 的 MongoClient 了,而是 Motor 的 AsyncIOMotorClient,其他的配置基本类似。

在存储时使用的是 update_one 方法并开启了 upsert 参数,这样可以做到存在即更新,不存在即插入的功能,保证数据的不重复性。

整个 MongoDB 存储的 Handler 定义如下:

from douyin.handlers import Handler
from motor.motor_asyncio import AsyncIOMotorClient
from douyin.structures import *class MongoHandler(Handler):def __init__(self, conn_uri=None, db='douyin'):"""init save folder:param folder:"""super().__init__()if not conn_uri:conn_uri = 'localhost'self.client = AsyncIOMotorClient(conn_uri)self.db = self.client[db]async def process(self, obj, **kwargs):"""download to file:param url: resource url:param name: save name:param kwargs::return:"""collection_name = 'default'if isinstance(obj, Video):collection_name = 'videos'elif isinstance(obj, Music):collection_name = 'musics'collection = self.db[collection_name]# save to mongodbprint('Saving', obj, 'to mongodb...')if await collection.update_one({'id': obj.id}, {'$set': obj.json()}, upsert=True):print('Saved', obj, 'to mongodb successfully')else:print('Error occurred while saving', obj)

可以看到我们在类中定义了 AsyncIOMotorClient 对象,并暴露了 conn_uri 连接字符串和 db 数据库名称,可以在声明 MongoHandler 类的时候指定 MongoDB 的链接地址和数据库名。

同样的 process 方法,这里使用 await 修饰了 update_one 方法,完成了异步 MongoDB 存储。

好,以上便是 douyin 库的所有的关键部分介绍,这部分内容可以帮助大家理解这个库的核心部分实现,另外可能对设计模式、面向对象思维以及一些实用库的使用有一定的帮助。

总结

本文介绍了一个可以用来爬取抖音热门视频的 Python 库,并介绍了该库的基本用法和核心部分实现,希望对大家有所帮助。

本抖音库的 GitHub 地址是:https://github.com/Python3WebSpider/DouYin,如果你对你有帮助,还请赐予一个 Star!非常感谢!

来源:华为云社区  作者:崔庆才丨静觅

相关推荐

Python爬虫偷懒神器 — 快速构造请求头!

用前考虑清楚,伤敌一千自损八百的字体反爬虫

一线大厂在用的反爬虫方法,看我如何破了它!

Python爬虫帮你打包下载所有抖音好听的背景音乐,还不快收藏一起听歌

Python爬虫从入门到精通——基本库re的使用:正则表达式

对你没有看错!不到 10 行代码完成抖音热门视频的爬取!相关推荐

  1. 爬虫python代码-Python爬虫入门(01) -- 10行代码实现一个爬虫

    跟我学习Python爬虫系列开始啦.带你简单快速高效学习Python爬虫. 一.快速体验一个简单爬虫 以抓取简书首页文章标题和链接为例 简书首页 就是以上红色框内文章的标签,和这个标题对应的url链接 ...

  2. python爬虫代码-Python爬虫入门(01) -- 10行代码实现一个爬虫

    跟我学习Python爬虫系列开始啦.带你简单快速高效学习Python爬虫. 一.快速体验一个简单爬虫 以抓取简书首页文章标题和链接为例 简书首页 就是以上红色框内文章的标签,和这个标题对应的url链接 ...

  3. 10 行代码,9 行报错,8 个警告…

    (给程序员的那些事加星标) 配文:程序员的那些事(id:iProgrammer) 10 行代码,9 行报错,8 个警告, 吓得我的七魂六魄,已五零四散了, 这感觉岂能是三言两语能说清, 一气之下归零了 ...

  4. 什么,PyTorch还能开发新药?哈佛推出这款工具包,10行代码训练“药神”模型...

    萧箫 编辑整理 量子位 报道 | 公众号 QbitAI 最近,来自哈佛大学等机构的研究人员,开发出了一个AI"药神"工具包,为加速新冠疫情下的新药研发助力. 这款名为DeepPur ...

  5. 我是如何用10行代码搬运目标图片的?

    嗯呢,你没看错,就是教你把一个路径下的所有目标图片搬运到制定路径下.有读者说:小詹你忽悠人吧,要搬运目标图片复制粘贴不就好了嘛,要什么代码,搬砖脑子秀逗了? 咳咳,对于目标文件夹复制粘贴当然可以,还简 ...

  6. 哈佛推出这款PyTorch工具包,10行代码训练“AI药神”模型

    点上方蓝字计算机视觉联盟获取更多干货 在右上方 ··· 设为星标 ★,与你不见不散 仅作学术分享,不代表本公众号立场,侵权联系删除 转载于:量子位 AI博士笔记系列推荐 周志华<机器学习> ...

  7. 万万想不到 10行代码搞定一个决策树

    01决策树模拟实验 文章目录 01决策树模拟实验 要求 决策树简单介绍 搭建环境 产生数据集 划分训练集和测试集 生成决策树 Cross-Validation法 可视化决策树 10行代码搞定决策树 要 ...

  8. 10 行代码玩转 NumPy!

    作者 | 天元浪子 来源 | Python作业辅导员 NumPy也可以画图吗?当然!NumPy不仅可以画,还可以画得更好.画得更快!比如下面这幅画,只需要10行代码就可以画出来.若能整明白这10行代码 ...

  9. [Unity Editor]10行代码搞定Hierarchy排序

    在日常的工作和研究中,当给我们的场景摆放过多的物件的时候,Hierarchy面板就会变得杂乱不堪.比如这样:     过多的层次结构充斥在里面,根层的物件毫无序列可言,整个层次面板显示非常的杂乱不堪, ...

最新文章

  1. GitHub上的“金矿”(236个Python开源项目,涵盖了15个领域)
  2. 10 款可以找回删除文件的好软件
  3. python之内置函数(二)与匿名函数、递归函数初识
  4. alibaba sentinel限流组件 源码分析
  5. C#反射与特性(一):反射基础
  6. hibernate 标识符_Hibernate中的标识符
  7. 超简单:解析 yml 类型(application.yml)配置文件 、springboot 工程读取 yml 文件中的值
  8. Python字符串与编码
  9. android开发比例图表,Android开发中如何使用绘制图表
  10. 如何设计自动化测试的代码结构
  11. [附源码]Java计算机毕业设计SSM宠物管理系统
  12. STM32F205 STM32F207 STM32F215 STM332F217 用户手册,使用手册,编程手册三合一下载地址
  13. Snipaste 屏幕截图软件超级利器
  14. VS如何导入已有项目文件夹
  15. SAP工厂日历的应用
  16. QT概念详解及开发入门简介
  17. 评说于国富律师的“免费正版化”
  18. ROS安装/// rosdep update/the read operation is timed out
  19. 聊天机器人框架Rasa资源整理
  20. 如何看linux是ubuntu还是centos

热门文章

  1. 江西省中小学生学籍管理-初(高)中学校本省内进行自主招生(6)
  2. 改写 cublas LU分解为 cublas matinv的示例
  3. 华为云 - 图片内容审核(涉黄涉暴涉政涉广告检测)
  4. 双绞线的制作(常用568B)
  5. FS2956A 输入8-120V 用于液晶仪表5V-USB 充电口方案
  6. 法国推出新型卫星地图Geoportail!欲挑战Google
  7. python3使用requests和csv库抓取某地市肯德基门店地址并存入csv文件
  8. 华为任正非:活下来!
  9. 海口计算机中专学院,海口市第一职业中学专业_2021年海口市第一职业中学招生专业_中专专业_2020年技校专业_技校网...
  10. IBM p570更换硬盘