三、wss连接B站弹幕
环境
pip install ws4py
from ws4py.client.threadedclient import WebSocketClient
一、websocket协议
- 先建立连接 wss://broadcastlv.chat.bilibili.com/sub
- 发送登录包
{
"uid": 0表示未登录,否则为用户ID,
"roomid": 房间ID,
"protover": 1,
"platform": "web",
"clientver": "1.4.0"
} - 每隔一段时间发送心跳包
- 接收响应
响应由头部和数据组成
- 解析响应得到数据
- 注册事件分发事件
二、工具层 utils.py
- 定时器类
可取消 - 事件类
注册事件 (可重复)
分发事件
取消事件
class Timer():def __init__(self,delay,fun):self.delay,self.f=delay,funself.t=threading.Timer(self.delay,self.fun)self.t.start()def fun(self):if self.f:self.f()self.t=threading.Timer(self.delay,self.fun)self.t.start()def cancel(self):self.t.cancel()print("threading cancel")class Event():def __init__(self):self.map=[]self.keys=[]def index(self,k):i=-1for key in self.keys:i+=1if key==k:return ireturn -1def on(self,key,fun):i=self.index(key)if i==-1:self.map.append({"key":key,"funs":[fun]})self.keys.append(key)else:self.map[i]["funs"].append(fun)def emit(self,key,data=None):i=self.index(key)if i==-1:print("no regist event:"+str(key))returnfor f in self.map[i]["funs"]:f(data)def rm(self,key,fun):i=self.index(key)if i==-1:print("no regist event:"+str(key))returnfuns=self.map[i]["funs"]for j in range(len(funs)):if funs[j]==fun:funs[j]=Noneself.map[i]["funs"]=list(filter(None,funs))
三、服务层 DanmuWS.py
import threading
import json
import struct
from ws4py.client.threadedclient import WebSocketClient
from utils import Event,Timer
event=Event()
class DanmuWebSocket(WebSocketClient):def __init__(self,info,serveraddress='wss://broadcastlv.chat.bilibili.com/sub'):self.serveraddress=serveraddressWebSocketClient.__init__(self,serveraddress)DanmuWebSocket.event=eventDanmuWebSocket.headerLength=16self.Info=infodef opened(self):self.sendLoginPacket(self.Info['uid'],self.Info['roomid'],self.Info['protover'],self.Info['platform'],self.Info['clientver'])self.sendHeartBeatPacket();self.heartBeatHandler = Timer(20,self.sendHeartBeatPacket)print("opened")def delay_close(self):dws=DanmuWebSocket(self.Info,self.serveraddress)event.emit('reconnect',dws);def closed(self, code, reason=None):print("Closed", code, reason)if hasattr(self,"heartBeatHandler"):self.heartBeatHandler.cancel();if code == 1000: returnthreading.Timer(5,self.delay_close).start()print("Closed", code, reason)def received_message(self, message):position,length=0,len(message.data)-1while position<length:header_pack=struct.unpack(">IHHII",message.data[position:position+16])length_pack=header_pack[0]operation=header_pack[3]if operation==3:num=header_pack[1]+positionnum=struct.unpack(">I",message.data[num:num+4])[0]event.emit('heartbeat',num)elif operation==5:data=json.loads(message.data[position+16:position+length_pack])event.emit('cmd',data)#print("recv:"+data["cmd"])else:event.emit('login');position+=length_packdef sendData(self,data, protover = 1, operation = 2, sequence = 1):if type(data)==dict:data=json.dumps(data).encode()elif type(data)==str:data=data.encode()header=struct.pack(">IHHII",DanmuWebSocket.headerLength+len(data),DanmuWebSocket.headerLength,protover,operation,sequence)self.send(header+data)def sendLoginPacket(self,uid, roomid, protover = 1, platform = 'web', clientver = '1.4.6'):# Uint(4byte) + 00 10 + 00 01 + 00 00 00 07 + 00 00 00 01 + Data 登录数据包data = {'uid': int(uid),'roomid': int(roomid),'protover': protover,'platform': platform,'clientver': clientver}print("sendLoginPacket")data=json.dumps(data)data=data.replace(' ','')self.sendData(data.encode(),1,7,1)def sendHeartBeatPacket(self):# Uint(4byte) + 00 10 + 00 01 + 00 00 00 02 + 00 00 00 01 + Data 心跳数据包self.sendData(b'[object Object]', 1, 2, 1);def bind(self,onreconnect=None,onlogin=None,onheartbeat=None,oncmd=None,onreceive =None):if "cmd" in event.keys:returnif hasattr(onreconnect,"__call__"):event.on("reconnect",onreconnect)if hasattr(onlogin,"__call__"):event.on("login",onlogin)if hasattr(onheartbeat,"__call__"):event.on("heartbeat",onheartbeat)if hasattr(oncmd,"__call__"):event.on("cmd",oncmd)if hasattr(onreceive,"__call__"):event.on("receive",onreceive)
四、测试代码
from server import Login
headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:62.0) Gecko/20100101 Firefox/62.0','Accept': 'application/json, text/plain, */*','Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2','Accept-Encoding': 'gzip, deflate, br','Referer': 'https://live.bilibili.com/','Origin': 'https://live.bilibili.com','Connection': 'keep-alive'}
s=session(headers,'cookie.txt')
login=Login(s)
while not login.isLogin():login.get_vdcode()login.loop_vdcode()
info={"uid": login.info['uid'],"roomid": 7603080,"protover": 1,"platform": "web","clientver": "1.4.0"
}
from DanmuWS import DanmuWebSocket
def oncmd(data):cmd=data["cmd"]if cmd=="SYS_MSG":print(data)elif cmd=="SPECIAL_GIFT":print(data)else:print(data)
def onlogin(data):print("login success")
def onreconnect(dws):global wsws=dws
def onheartbeat(num):print(num)
try:ws = DanmuWebSocket(info,'wss://broadcastlv.chat.bilibili.com/sub')ws.connect()ws.bind(None,onlogin,onheartbeat,oncmd)ws.run_forever()
except:ws.close()
转载于:https://www.cnblogs.com/blogs-qq2685054765/p/9727486.html
三、wss连接B站弹幕相关推荐
- 关于《后浪》的B站弹幕分析总结(三)——怎么制作好看的交互式词云
目录 一.对分词做词频统计 二.使用wordcloud展示词云 二.使用pyecharts绘制词云 三.使用词云制作工具 与本文相关内容链接: B站视频<[数说弹幕]我不小心看了后浪弹幕> ...
- python爬虫和定位_Python网络爬虫实战(三)照片定位与B站弹幕
之前两篇已经说完了如何爬取网页以及如何解析其中的数据,那么今天我们就可以开始第一次实战了. 这篇实战包含两个内容. * 利用爬虫调用Api来解析照片的拍摄位置 * 利用爬虫爬取Bilibili视频中的 ...
- B站直播弹幕获取 - 用python写一个B站弹幕姬吧
前言 关于这个小项目的由来. 最开始是想要利用b站的弹幕进行一些互动之类的.原本也有想过可以利用现有的弹幕姬做个插件来解决的,但无奈不会C#,所以只能自己研究b站的弹幕协议. 后来有写过一个C++版本 ...
- 萌新学习Python爬取B站弹幕+R语言分词demo说明
代码地址如下: http://www.demodashi.com/demo/11578.html 一.写在前面 之前在简书首页看到了Python爬虫的介绍,于是就想着爬取B站弹幕并绘制词云,因此有了这 ...
- python弹幕分析_《用python 玩转数据》项目——B站弹幕数据分析
1. 背景 在视频网站上,一边看视频一边发弹幕已经是网友的习惯.在B站上有很多种类的视频,也聚集了各种爱好的网友.本项目,就是对B站弹幕数据进行分析.选取分析的对象是B站上点播量过1.4亿的一部剧&l ...
- B站弹幕姬,弹幕礼物感谢,关注感谢,自动回复,房管工具,房管助手,基于java
运行环境 可在所有主要操作系统上运行,并且仅需要安装Java JDK或JRE版本8或更高版本.要检查,请运行java -version: $ java -version java version &q ...
- python-正则表达式及应用(b站弹幕屏蔽)
目录 简介 正则表达式常用字符 定位符 元字符 修饰符 字符组 重复限定符 re模块 手机号判断 昵称判断 b站弹幕礼仪 b站弹幕屏蔽正则 长弹幕 个人信息 手机号 QQ号 日期 无意义 现场怪 视频 ...
- 用Python爬取b站弹幕,看大家还会接受《爱情公寓5》吗?
尽管抄袭傍身,也没能阻挡<爱情公寓5>进击的脚步. 最近爱情公寓电视剧微博发布了长达8分钟的揭幕视频,官宣新季将在2020正式开播. 几位主演纷纷转发宣传,将#爱情公寓5揭幕#的话题送上了 ...
- Python爬取B站弹幕方法介绍
Python爬取B站弹幕方法介绍 文章目录 Python爬取B站弹幕方法介绍 前言 寻找弹幕数据 编写爬虫 B站弹幕数量 新技术介绍 参考文章 前言 最近同学要做东西,需要用 B 站的视频对应的弹幕数 ...
最新文章
- 认识flask框架-2
- linux program HEAP tracker
- Python time 100 天以后的日期
- Java中finalize()用法
- long logn的大小c语言,基本排序(C语言版) - ________MX的个人页面 - OSCHINA - 中文开源技术交流社区...
- Facebook陷入史上最大危机;华为5G设备欧洲大卖,美国指责欧盟;红帽宣布OpenShift可用于AWS中国……...
- 每天学一点flash(75) ToolTip 提示
- Oracle安装出现报错
- python输出时间格式_python中日期和时间格式化输出的方法小结_python
- 为什么JavaScript仅在IE中打开开发人员工具一次后才能工作?
- gbk汉字编码拼音对照表_预习部分:汉字编码方案
- 《Head First设计模式》
- Android源代码中引用@hide类出现引用异常的问题error: cannot find symbol
- start()和run()方法的区别
- 老旧计算机桌面,四种旧电脑改造桌面虚拟化的方案
- TensorFlow系列——一些api的使用场景及方式
- 系统进程里的edpa.exe是什么?
- 程序员微信名昵称_独一无二的情侣微信名-心有所属,名花有主
- 关于Chrome浏览器调用IE浏览器
- 武铁机械电子工程_机械电子工程
热门文章
- 上海计算机一级ps教程,2017年计算机一级考试PS学习妙招
- 计算机网络工程的发展,谈计算机网络工程全面信息化管理的应用与发展
- 嵌入式linux 硬盘录像机,嵌入式硬盘录像机技术的简单介绍
- 基于esp32的ic卡考勤系统
- 计算机毕业设计JAVA钢材销售平台登录mybatis+源码+调试部署+系统+数据库+lw
- IDC:爱数再次蝉联备份一体机市场中国品牌第一
- 数据结构中的elem,elemtype是什么
- Android获取String的MD5值
- 肩部固定武器-市场现状及未来发展趋势
- sqlserver 2008 varchar字段 转 nvarchar 存储过程