爬虫入门实战:斗鱼弹幕数据抓取,附送11节入门笔记
爬虫学习 | 详细内容 | 详细笔记 |
---|---|---|
第一课 | 代理池概述以及开发环境 | https://blog.csdn.net/itcast_cn/article/details/123678415 |
第二课 | 代理池的设计 | https://blog.csdn.net/itcast_cn/article/details/123728552 |
第三课 | 实现代理池思路 | https://blog.csdn.net/itcast_cn/article/details/123729170 |
第四课 | 定义代理IP的数据模型类 | https://blog.csdn.net/itcast_cn/article/details/123789171 |
第五课 | 实现代理池工具模块 | https://blog.csdn.net/itcast_cn/article/details/123678415 |
第六课 | 实现代理池的校验模块 | https://blog.csdn.net/itcast_cn/article/details/123817860 |
第七课 | 实现代理池的数据库模块 | https://blog.csdn.net/itcast_cn/article/details/123817955 |
第八课 | 实现代理池的爬虫模块 | https://blog.csdn.net/itcast_cn/article/details/123852861 |
第九课 | 实现代理池的检测模块 | https://blog.csdn.net/itcast_cn/article/details/123880187 |
第十课 | 实现代理池的API模块 | https://blog.csdn.net/itcast_cn/article/details/123957650 |
第十一课 | 实现代理池的启动入口 | https://blog.csdn.net/itcast_cn/article/details/124015476 |
斗鱼弹幕
学习目标
- 掌握
asyncore
模块使用 - 实现斗鱼弹幕数据抓取
预备知识
asyncore
模块
介绍
这个模块为异步socket的服务器客户端通信提供简单的接口。该模块提供了异步socket服务客户端和服务器的基础架构。
相比python原生的socket api,asyncore具备有很大的优势,asyncore对原生的socket进行封装,提供非常简洁优秀的接口,利用asyncore覆写相关需要处理的接口方法,就可以完成一个socket的网络编程,从而不需要处理复杂的socket网络状况以及多线程处理等等。
实现流程
搭建 Socket 服务器环境
nc -l 9000
客户端 Socket 开发基本使用
定义类继承自
asyncore.dispatcher
class SocketClient(asyncore.dispatcher):
实现类中的回调代码
实现构造函数
调用父类方法
asyncore.dispatcher.__init__(self)
创建 Socket 对象
self.create_socket()
连接服务器
address = (host,port) self.connect(address)
实现
handle_connect
回调函数当
Socket
连接服务器成功时回调该函数def handle_connect(self):print("连接成功")
实现
writable
回调函数描述是否有数据需要被发送到服务器。返回值为
True
表示可写,False
表示不可写,如果不实现默认返回为True
,当返回True
时,回调函数handle_write
将被触发def writable(self):return True
实现
handle_write
回调函数当有数据需要发送时(
writable
回调函数返回True时),该函数被触发,通常情况下在该函数中编写 send 方法发送数据def handle_write(self):# 内部实现对服务器发送数据的代码# 调用 send 方法发送数据,参数是字节数据self.send('hello world\n'.encode('utf-8'))
实现
readable
回调函数描述是否有数据从服务端读取。返回
True
表示有数据需要读取,False
表示没有数据需要被读取,当不实现默认返回为True
,当返回True
时,回调函数handle_read
将被触发def readable(self):return True
实现
handle_read
回调函数当有数据需要读取时触发(
readable
回调函数返回True
时),该函数被触发,通常情况下在该函数中编写recv
方法接收数据def handle_read(self):# 主动接收数据,参数是需要接收数据的长度# 返回的数据是字节数据result = self.recv(1024)print(result)
实现
handle_error
回调函数当程序运行过程发生异常时回调
def handle_error(self):# 编写处理错误方法t,e,trace = sys.exc_info()self.close()
实现
handle_close
回调函数当连接被关闭时触发
def handle_close(self):print("连接关闭")self.close()
创建对象并且执行
asyncore.loop
进入运行循环timeout
表示一次循环所需要的时长
client = SocketClient('127.0.0.1',9000) # 开始启动运行循环 asyncore.loop(timeout=5)
斗鱼弹幕实战
文档资料
斗鱼弹幕服务器第三方接入协议v1.6.2.pdf
官方提供协议文档
弹幕客户端开发流程
- 连接初始化
- 使用TCP连接服务器
- IP地址:openbarrage.douyutv.com
- 端口:8601
- 客户端向弹幕服务器发送登录请求,登录弹幕服务器
- 弹幕服务器收到客户端登录请求并完成登录后,返回登录成功消息给客户端
- 客户端收到登录成功消息后发送进入弹幕分组请求给弹幕服务器
- 弹幕服务器接受到客户端弹幕分组请求后将客户端添加到请求指定的弹幕分组中
- 使用TCP连接服务器
- 服务过程
- 客户端每隔 45 秒发送心跳给弹幕服务器,弹幕服务器回复心跳信息给客户端
- 弹幕服务器如有广播信息,则推送给客户端,服务器消息协议
- 断开连接
- 客户端发送登出消息
- 客户端关闭 TCP 连接
数据发送和接收流程
数据包讲解
- 消息长度:4 字节小端整数,表示整条消息(包括自身)长度(字节数)。 消息长度出现两遍,二者相同。
- 消息类型:2 字节小端整数,表示消息类型。取值如下:
- 689 客户端发送给弹幕服务器的文本格式数据
- 690 弹幕服务器发送给客户端的文本格式数据。
- 加密字段:暂时未用,默认为 0。
- 保留字段:暂时未用,默认为 0。
- 数据部分:斗鱼独创序列化文本数据,结尾必须为‘\0’。详细序列化、反
序列化算法见下节。(所有协议内容均为 UTF-8 编码)
数据包封装
对数据包进行对象化封装,对数据的封装方便以后使用,实现对象和二进制数据之间的转换
- 通过参数构建数据包对象
- 实现获取数据包长度的方法
- 实现获取二进制数据的方法
实现发送数据包
构建发送数据包的容器
self.send_queue = Queue()
实现回调函数,判断容器中有数据就发送没有数据不发送
def writable(self):return self.send_queue.qsize() > 0def handle_write(self):# 从发送数据包队列中获取数据包对象dp = self.send_queue.get()# 获取数据包的长度,并且发送给服务器dp_length = dp.get_length()dp_length_data = dp_length.to_bytes(4,byteorder='little',signed=False)self.send(dp_length_data)# 发送数据包二进制数据self.send(dp.get_bytes())self.send_queue.task_done()pass
实现登录函数
构建登录数据包
content = "type@=loginreq/roomid@={}/".format(room_id) login_dp = DataPacket(DATA_PACKET_TYPE_SEND,content=content)
把数据包添加到发送数据包容器中
# 把数据包添加到发送数据包容器中self.send_queue.put(login_dp)
实现接收数据
构建接收数据包队列
# 存放接收的数据包对象 self.recv_queue = Queue()
读取回调函数中读取数据
读取长度
# 读取长度,二进制数据 data_length_data = self.recv(4) # 通过二进制获取length 具体数据 data_length = int.from_bytes(data_length_data,byteorder='little',signed=False)
读取内容
# 通过数据包的长度获取数据 data = self.recv(data_length)
构建数据包对象
数据包构造函数中解析二进制来构建数据包对象
self.type = int.from_bytes(data_bytes[4:6],byteorder='little',signed=False)self.encrypt_flag = int.from_bytes(data_bytes[6:7],byteorder='little',signed=False)self.preserve_flag = int.from_bytes(data_bytes[7:8],byteorder='little',signed=False)# 构建数据部分self.content = str(data_bytes[8:-1],encoding='utf-8')
通过二进制数据构建数据包对象
# 通过二进制数据构建数据包对象 dp = DataPacket(data_bytes=data)
把数据包放入接收数据包容器中
# 把数据包放入接收数据包容器中 self.recv_queue.put(dp)
构建处理线程专门处理接收数据包容器中数据
构建线程
# 构建一个专门处理接收数据包容器中的数据包的线程 self.callback_thread = threading.Thread(target=self.do_callback) self.callback_thread.setDaemon(True) self.callback_thread.start()
实现回调函数处理接收的数据包
def do_callback(self):'''专门负责处理接收数据包容器中的数据:return: '''while True:# 从接收数据包容器中获取数据包dp = self.recv_queue.get()# 对数据进行处理print(dp.content)pass
实现外部传入回调函数
通过外部指定回调函数实现自定义数据处理
添加参数
callback
构造函数中添加参数
def __init__(self,host,port,callback=None):# 定义外部传入的自定义回调函数self.callback = callback
外部传入自定义回调函数
def data_callback(dp):'''自定义回调函数:param dp: 数据包对象:return: '''print("data_callback:",dp.content)passif __name__ == '__main__':client = DouyuClient('openbarrage.douyutv.com',8601,callback=data_callback)client.login_room_id(4494106)asyncore.loop(timeout=10)
在处理接收数据包的线程中调用回调函数
def do_callback(self):'''专门负责处理接收数据包容器中的数据:return: '''while True:# 从接收数据包容器中获取数据包dp = self.recv_queue.get()# 对数据进行处理if self.callback is not None:self.callback(dp)self.recv_queue.task_done()
数据内容序列化与反序列化
- 键 key 和值 value 直接采用‘@=’分割
- 数组采用‘/’分割
- 如果 key 或者 value 中含有字符‘/’,则使用‘@S’转义
- 如果 key 或者 value 中含有字符‘@’,使用‘@A’转义
例子
多个键值对数据:key1@=value1/key2@=value2/key3@=value3/
数组数据:value1/value2/value3/
登录
参看斗鱼弹幕文档
加入弹幕分组
参看斗鱼弹幕文档,
-9999
为海量弹幕
心跳机制
作用是让服务器解决假死连接问题,客户端必须每隔45秒发送一次请求,否则就会被主动断开。
- 实现发送心跳函数
- 构建心跳数据包
- 把数据包添加到发送数据包容器队列中
- 构建心跳线程
- 构建心跳线程
- 添加触发机制
- 添加暂停机制
代码实现
#!/usr/bin/python3
# -*- coding: utf-8 -*-import asyncore
import sys
import threading
import timefrom queue import QueueDATA_PACKET_TYPE_SEND = 689
DATA_PACKET_TYPE_RECV = 690def encode_content(content):'''序列化函数:param content: 需要序列化的内容 :return: '''if isinstance(content,str):return content.replace(r'@',r'@A').replace(r'/',r'@S')elif isinstance(content,dict):return r'/'.join(["{}@={}".format(encode_content(k),encode_content(v)) for k,v in content.items()]) + r'/'elif isinstance(content,list):return r'/'.join([encode_content(data) for data in content]) + r'/'return ""def decode_to_str(content):'''反序列化字符串:param content:字符串数据 :return: '''if isinstance(content,str):return content.replace(r'@S',r'/').replace('@A',r'@')return ""def decode_to_dict(content):'''反序列化字典数据:param content: 被序列化的字符串:return: '''ret_dict = dict()if isinstance(content,str):item_strings = content.split(r'/')for item_string in item_strings:k_v_list = item_string.split(r'@=')if k_v_list is not None and len(k_v_list) > 1:k = k_v_list[0]v = k_v_list[1]ret_dict[decode_to_str(k)] = decode_to_str(v)return ret_dictdef decode_to_list(content):'''反序列化列表数据:param content: 被序列化的字符串:return: '''ret_list = []if isinstance(content,str):items = content.split(r'/')for idx,item in enumerate(items):if idx < len(items) - 1:ret_list.append(decode_to_str(item))return ret_listclass DataPacket():def __init__(self,type=DATA_PACKET_TYPE_SEND,content="",data_bytes=None):if data_bytes is None:# 数据包的类型self.type = type# 数据部分内容self.content = contentself.encrypt_flag = 0self.preserve_flag = 0else:self.type = int.from_bytes(data_bytes[4:6],byteorder='little',signed=False)self.encrypt_flag = int.from_bytes(data_bytes[6:7],byteorder='little',signed=False)self.preserve_flag = int.from_bytes(data_bytes[7:8],byteorder='little',signed=False)# 构建数据部分self.content = str(data_bytes[8:-1],encoding='utf-8')def get_length(self):'''获取当前数据包的长度,为以后需要发送数据包做准备:return: '''return 4 + 2 + 1 + 1 + len(self.content.encode('utf-8')) + 1def get_bytes(self):'''通过数据包转换成 二进制数据类型:return: '''data = bytes()# 构建 4 个字节的消息长度数据data_packet_length = self.get_length()# to_bytes 把一个整型数据转换成二进制数据# 第一个参数 表示需要转换的二进制数据占几个字节# byteorder 第二个参数 描述字节序# signed 设置是否有符号# 处理消息长度data += data_packet_length.to_bytes(4,byteorder='little',signed=False)# 处理消息类型data += self.type.to_bytes(2,byteorder='little',signed=False)# 处理加密字段data += self.encrypt_flag.to_bytes(1,byteorder='little',signed=False)# 处理保留字段data += self.preserve_flag.to_bytes(1,byteorder='little',signed=False)# 处理数据内容data += self.content.encode('utf-8')# 添加 \0 数据data += b'\0'return dataclass DouyuClient(asyncore.dispatcher):def __init__(self,host,port,callback=None):# 构建发送数据包的队列容器# 存放了数据包对象self.send_queue = Queue()# 存放接收的数据包对象self.recv_queue = Queue()# 定义外部传入的自定义回调函数self.callback = callbackasyncore.dispatcher.__init__(self)self.create_socket()address = (host,port)self.connect(address)# 构建一个专门处理接收数据包容器中的数据包的线程self.callback_thread = threading.Thread(target=self.do_callback)self.callback_thread.setDaemon(True)self.callback_thread.start()# 构建心跳线程self.heart_thread = threading.Thread(target=self.do_ping)self.heart_thread.setDaemon(True)self.ping_runing = Falsepassdef handle_connect(self):print("连接成功")self.start_ping()def writable(self):return self.send_queue.qsize() > 0def handle_write(self):# 从发送数据包队列中获取数据包对象dp = self.send_queue.get()# 获取数据包的长度,并且发送给服务器dp_length = dp.get_length()dp_length_data = dp_length.to_bytes(4,byteorder='little',signed=False)self.send(dp_length_data)# 发送数据包二进制数据self.send(dp.get_bytes())self.send_queue.task_done()passdef readable(self):return Truedef handle_read(self):# 读取长度,二进制数据data_length_data = self.recv(4)# 通过二进制获取length 具体数据data_length = int.from_bytes(data_length_data,byteorder='little',signed=False)# 通过数据包的长度获取数据data = self.recv(data_length)# 通过二进制数据构建数据包对象dp = DataPacket(data_bytes=data)# 把数据包放入接收数据包容器中self.recv_queue.put(dp)def handle_error(self):t, e, trace = sys.exc_info()print(e)self.close()def handle_close(self):self.stop_ping()print("连接关闭")self.close()def login_room_id(self,room_id):self.room_id = room_idsend_data = {"type":"loginreq","roomid":str(room_id)}# 构建登录数据包content = encode_content(send_data)login_dp = DataPacket(DATA_PACKET_TYPE_SEND,content=content)# 把数据包添加到发送数据包容器中self.send_queue.put(login_dp)def join_room_group(self):'''加入弹幕分组:return: '''send_data = {"type":"joingroup","rid":str(self.room_id),"gid":'-9999'}content = encode_content(send_data)dp = DataPacket(type=DATA_PACKET_TYPE_SEND,content=content)self.send_queue.put(dp)passdef send_heart_data_packet(self):send_data = {"type":"mrkl"}content = encode_content(send_data)dp = DataPacket(type=DATA_PACKET_TYPE_SEND,content=content)self.send_queue.put(dp)def start_ping(self):'''开启心跳:return: '''self.ping_runing = Truedef stop_ping(self):'''结束心跳:return: '''self.ping_runing = Falsedef do_ping(self):'''执行心跳:return: '''while True:if self.ping_runing:self.send_heart_data_packet()time.sleep(40)def do_callback(self):'''专门负责处理接收数据包容器中的数据:return: '''while True:# 从接收数据包容器中获取数据包dp = self.recv_queue.get()# 对数据进行处理if self.callback is not None:self.callback(self,dp)self.recv_queue.task_done()passdef data_callback(client,dp):'''自定义回调函数:param dp: 数据包对象:return: '''resp_data = decode_to_dict(dp.content)if resp_data["type"] == "loginres":#调用加入分组请求print("登录成功:",resp_data)client.join_room_group()elif resp_data["type"] == "chatmsg":print("{}:{}".format(resp_data["nn"],resp_data["txt"]))elif resp_data["type"] == 'onlinegift':print("暴击鱼丸")elif resp_data["type"] == "uenter":print("{} 进入了房间".format(resp_data["nn"]))passif __name__ == '__main__':client = DouyuClient('openbarrage.douyutv.com',8601,callback=data_callback)client.login_room_id(4494106)asyncore.loop(timeout=10)
爬虫入门实战:斗鱼弹幕数据抓取,附送11节入门笔记相关推荐
- python爬取app播放的视频,Python爬虫工程师必学——App数据抓取实战视频教程
爬虫分为几大方向,WEB网页数据抓取.APP数据抓取.软件系统数据抓取.本课程主要为同学讲解如何用python实现App数据抓取,课程从开发环境搭建,App爬虫必备利器详解,项目实战,到最后的多App ...
- 爬虫—01-爬虫原理与数据抓取
爬虫的更多用途 12306抢票 网站上的头票 短信轰炸 关于Python网络爬虫,我们需要学习的有: Python基础语法学习(基础知识) 对HTML页面的内容抓取(数据抓取) 对HTML页面的数据提 ...
- 【RPA入门教程】UiBot数据抓取功能使用教学(二)
数据抓取功能使用说明 点击 UiBot 编辑器工具栏的[数据抓取]按钮,打开数据抓取工具 数据抓取工具需要先选取一个目标,点击选择目标按钮即可. 这个目标就是要采集的数据字段,如果要采集商品名,则先选 ...
- python chrome headless_实战Chrome Headless数据抓取(上)
先聊聊数据抓取技术选型 在我看来数据抓取可以分为三种场景: 基本稳定的源站格式或者大量的数据抓取.需要蜘蛛集群调度:使用Java比较方便,可以用WebMagic抓取配合Hadoop调度,如果源站经常改 ...
- [知识图谱实战篇] 一.数据抓取之Python3抓取JSON格式的电影实体
前面作者讲解了很多知识图谱相关的原理知识,包括知识图谱相关技术.Neo4j绘制关系图谱等,但还是缺少一个系统全面的实例.为了加深自己对知识图谱构建的认识,为后续创建贵州旅游知识图谱打下基础,作者学习了 ...
- python爬虫学习之电视剧弹幕的抓取
近期<你是我的荣耀>这部剧正在热播中,在首播当天收视率就上亿了,足以见得观众们对这部剧的一个期待程度.在连放了八集之后,这部剧目前也是好评满满,不少人都被杨洋和迪丽热巴的颜值所吸引了.小编 ...
- python实现食品推荐_通过Python语言实现美团美食商家数据抓取
首先,我们先来打开美团美食商家页面,来分析一下. 如上面所提供的URL即为美团美食商家页面.或者我们通过美团官网打开一个美团美食商家页面,打开步骤如下:1.打开浏览器,输入 即可打开美团北京首页 2. ...
- RPA机器人数据抓取典型案例全流程详解
数据抓取是实现流程自动化最关键的技能之一,尤其是Web数据抓取,但面对每个具体的业务场景和网站,如何稳定.高效地实现数据抓取? 在实战中进行数据抓取时,需要注意哪些问题? 这篇文章我们就来通过一个企查 ...
- Python爬虫入门实战之猫眼电影数据抓取(理论篇)
前言 本文可能篇幅较长,但是绝对干货满满,提供了大量的学习资源和途径.达到让读者独立自主的编写基础网络爬虫的目标,这也是本文的主旨,输出有价值能够真正帮助到读者的知识,即授人以鱼不如授人以渔,让我们直 ...
- python编程理论篇_Python爬虫入门实战之猫眼电影数据抓取(理论篇)
前言 本文可能篇幅较长,但是绝对干货满满,提供了大量的学习资源和途径.达到让读者独立自主的编写基础网络爬虫的目标,这也是本文的主旨,输出有价值能够真正帮助到读者的知识,即授人以鱼不如授人以渔,让我们直 ...
最新文章
- test markdown
- 服务发现存储仓库 etcd 使用简介
- Pod定义YAML文件详解
- postconstruct_@PostConstruct注解,你该好好看看
- Deque(双向队列 c++模版实现 算法导论第三版第十章10.1-5题)
- HTML+CSS+JS实现 ❤️动态散花背景❤️
- 这些深度学习术语,你了解多少?(上)
- 布局:多列等高布局方法
- MySQL主从复制的原理及配置方法(比较详细)
- ecshop首页显示折扣的方法,ecshop商品显示折扣的方法
- 时序数据库InfluxDB 2.0 alpha 发布:主推新的Flux查询语言,TICK栈将成为整体
- 位移密码算法(js)
- python-贪心算法
- hello.java_hello java !
- C++对数计算log
- 《单基因疾病的遗传》学习笔记
- MySQL 一键安装脚本
- 《惢客创业日记》2021.01.21(周四)什么是产品逻辑?
- ecshop linux 大小写,ecshop敏感词管理
- 如何彻底卸载Oracle数据库
热门文章
- 【转】浏览器中的data类型的Url格式,data:image/png,data:image/jpeg!
- 鱼眼摄像机弊端及应用
- dnf服务器维护2018,2018年更新dnf游戏登陆不上 | 手游网游页游攻略大全
- 蒙特卡罗方法(Monte Carlo)
- Autodesk BIM 360 全球在线骇客马拉松
- 天猫精灵服务器修改密码,天猫精灵怎么解绑 天猫精灵解绑账号方法
- ora-01652无法通过128(在表空间temp中)扩展temp段
- GetLastError 错误代码
- 国外漂亮html5网页设,高端大气上档次!10个精美的国外HTML5网站欣赏_html/css_WEB-ITnose...
- Excel如何复制一摸一样的表格或建立副本