环境

pip install ws4py
from ws4py.client.threadedclient import WebSocketClient

一、websocket协议

  1. 先建立连接 wss://broadcastlv.chat.bilibili.com/sub
  2. 发送登录包
    {
    "uid": 0表示未登录,否则为用户ID,
    "roomid": 房间ID,
    "protover": 1,
    "platform": "web",
    "clientver": "1.4.0"
    }
  3. 每隔一段时间发送心跳包
  4. 接收响应
    响应由头部和数据组成

  5. 解析响应得到数据
  6. 注册事件分发事件

二、工具层 utils.py

  1. 定时器类
    可取消
  2. 事件类
    注册事件 (可重复)
    分发事件
    取消事件
class Timer():def __init__(self,delay,fun):self.delay,self.f=delay,funself.t=threading.Timer(self.delay,self.fun)self.t.start()def fun(self):if self.f:self.f()self.t=threading.Timer(self.delay,self.fun)self.t.start()def cancel(self):self.t.cancel()print("threading cancel")class Event():def __init__(self):self.map=[]self.keys=[]def index(self,k):i=-1for key in self.keys:i+=1if key==k:return ireturn -1def on(self,key,fun):i=self.index(key)if i==-1:self.map.append({"key":key,"funs":[fun]})self.keys.append(key)else:self.map[i]["funs"].append(fun)def emit(self,key,data=None):i=self.index(key)if i==-1:print("no regist event:"+str(key))returnfor f in self.map[i]["funs"]:f(data)def rm(self,key,fun):i=self.index(key)if i==-1:print("no regist event:"+str(key))returnfuns=self.map[i]["funs"]for j in range(len(funs)):if funs[j]==fun:funs[j]=Noneself.map[i]["funs"]=list(filter(None,funs))

三、服务层 DanmuWS.py

import threading
import json
import struct
from ws4py.client.threadedclient import WebSocketClient
from utils import Event,Timer
event=Event()
class DanmuWebSocket(WebSocketClient):def __init__(self,info,serveraddress='wss://broadcastlv.chat.bilibili.com/sub'):self.serveraddress=serveraddressWebSocketClient.__init__(self,serveraddress)DanmuWebSocket.event=eventDanmuWebSocket.headerLength=16self.Info=infodef opened(self):self.sendLoginPacket(self.Info['uid'],self.Info['roomid'],self.Info['protover'],self.Info['platform'],self.Info['clientver'])self.sendHeartBeatPacket();self.heartBeatHandler = Timer(20,self.sendHeartBeatPacket)print("opened")def delay_close(self):dws=DanmuWebSocket(self.Info,self.serveraddress)event.emit('reconnect',dws);def closed(self, code, reason=None):print("Closed", code, reason)if hasattr(self,"heartBeatHandler"):self.heartBeatHandler.cancel();if code == 1000: returnthreading.Timer(5,self.delay_close).start()print("Closed", code, reason)def received_message(self, message):position,length=0,len(message.data)-1while position<length:header_pack=struct.unpack(">IHHII",message.data[position:position+16])length_pack=header_pack[0]operation=header_pack[3]if operation==3:num=header_pack[1]+positionnum=struct.unpack(">I",message.data[num:num+4])[0]event.emit('heartbeat',num)elif operation==5:data=json.loads(message.data[position+16:position+length_pack])event.emit('cmd',data)#print("recv:"+data["cmd"])else:event.emit('login');position+=length_packdef sendData(self,data, protover = 1, operation = 2, sequence = 1):if type(data)==dict:data=json.dumps(data).encode()elif type(data)==str:data=data.encode()header=struct.pack(">IHHII",DanmuWebSocket.headerLength+len(data),DanmuWebSocket.headerLength,protover,operation,sequence)self.send(header+data)def sendLoginPacket(self,uid, roomid, protover = 1, platform = 'web', clientver = '1.4.6'):# Uint(4byte) + 00 10 + 00 01 + 00 00 00 07 + 00 00 00 01 + Data 登录数据包data = {'uid': int(uid),'roomid': int(roomid),'protover': protover,'platform': platform,'clientver': clientver}print("sendLoginPacket")data=json.dumps(data)data=data.replace(' ','')self.sendData(data.encode(),1,7,1)def sendHeartBeatPacket(self):# Uint(4byte) + 00 10 + 00 01 + 00 00 00 02 + 00 00 00 01 + Data 心跳数据包self.sendData(b'[object Object]', 1, 2, 1);def bind(self,onreconnect=None,onlogin=None,onheartbeat=None,oncmd=None,onreceive =None):if "cmd" in event.keys:returnif hasattr(onreconnect,"__call__"):event.on("reconnect",onreconnect)if hasattr(onlogin,"__call__"):event.on("login",onlogin)if hasattr(onheartbeat,"__call__"):event.on("heartbeat",onheartbeat)if hasattr(oncmd,"__call__"):event.on("cmd",oncmd)if hasattr(onreceive,"__call__"):event.on("receive",onreceive)

四、测试代码

from server import Login
headers={'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:62.0) Gecko/20100101 Firefox/62.0','Accept': 'application/json, text/plain, */*','Accept-Language': 'zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2','Accept-Encoding': 'gzip, deflate, br','Referer': 'https://live.bilibili.com/','Origin': 'https://live.bilibili.com','Connection': 'keep-alive'}
s=session(headers,'cookie.txt')
login=Login(s)
while not login.isLogin():login.get_vdcode()login.loop_vdcode()
info={"uid": login.info['uid'],"roomid": 7603080,"protover": 1,"platform": "web","clientver": "1.4.0"
}
from DanmuWS import DanmuWebSocket
def oncmd(data):cmd=data["cmd"]if cmd=="SYS_MSG":print(data)elif cmd=="SPECIAL_GIFT":print(data)else:print(data)
def onlogin(data):print("login success")
def onreconnect(dws):global wsws=dws
def onheartbeat(num):print(num)
try:ws = DanmuWebSocket(info,'wss://broadcastlv.chat.bilibili.com/sub')ws.connect()ws.bind(None,onlogin,onheartbeat,oncmd)ws.run_forever()
except:ws.close()

转载于:https://www.cnblogs.com/blogs-qq2685054765/p/9727486.html

三、wss连接B站弹幕相关推荐

  1. 关于《后浪》的B站弹幕分析总结(三)——怎么制作好看的交互式词云

    目录 一.对分词做词频统计 二.使用wordcloud展示词云 二.使用pyecharts绘制词云 三.使用词云制作工具 与本文相关内容链接: B站视频<[数说弹幕]我不小心看了后浪弹幕> ...

  2. python爬虫和定位_Python网络爬虫实战(三)照片定位与B站弹幕

    之前两篇已经说完了如何爬取网页以及如何解析其中的数据,那么今天我们就可以开始第一次实战了. 这篇实战包含两个内容. * 利用爬虫调用Api来解析照片的拍摄位置 * 利用爬虫爬取Bilibili视频中的 ...

  3. B站直播弹幕获取 - 用python写一个B站弹幕姬吧

    前言 关于这个小项目的由来. 最开始是想要利用b站的弹幕进行一些互动之类的.原本也有想过可以利用现有的弹幕姬做个插件来解决的,但无奈不会C#,所以只能自己研究b站的弹幕协议. 后来有写过一个C++版本 ...

  4. 萌新学习Python爬取B站弹幕+R语言分词demo说明

    代码地址如下: http://www.demodashi.com/demo/11578.html 一.写在前面 之前在简书首页看到了Python爬虫的介绍,于是就想着爬取B站弹幕并绘制词云,因此有了这 ...

  5. python弹幕分析_《用python 玩转数据》项目——B站弹幕数据分析

    1. 背景 在视频网站上,一边看视频一边发弹幕已经是网友的习惯.在B站上有很多种类的视频,也聚集了各种爱好的网友.本项目,就是对B站弹幕数据进行分析.选取分析的对象是B站上点播量过1.4亿的一部剧&l ...

  6. B站弹幕姬,弹幕礼物感谢,关注感谢,自动回复,房管工具,房管助手,基于java

    运行环境 可在所有主要操作系统上运行,并且仅需要安装Java JDK或JRE版本8或更高版本.要检查,请运行java -version: $ java -version java version &q ...

  7. python-正则表达式及应用(b站弹幕屏蔽)

    目录 简介 正则表达式常用字符 定位符 元字符 修饰符 字符组 重复限定符 re模块 手机号判断 昵称判断 b站弹幕礼仪 b站弹幕屏蔽正则 长弹幕 个人信息 手机号 QQ号 日期 无意义 现场怪 视频 ...

  8. 用Python爬取b站弹幕,看大家还会接受《爱情公寓5》吗?

    尽管抄袭傍身,也没能阻挡<爱情公寓5>进击的脚步. 最近爱情公寓电视剧微博发布了长达8分钟的揭幕视频,官宣新季将在2020正式开播. 几位主演纷纷转发宣传,将#爱情公寓5揭幕#的话题送上了 ...

  9. Python爬取B站弹幕方法介绍

    Python爬取B站弹幕方法介绍 文章目录 Python爬取B站弹幕方法介绍 前言 寻找弹幕数据 编写爬虫 B站弹幕数量 新技术介绍 参考文章 前言 最近同学要做东西,需要用 B 站的视频对应的弹幕数 ...

最新文章

  1. 认识flask框架-2
  2. linux program HEAP tracker
  3. Python time 100 天以后的日期
  4. Java中finalize()用法
  5. long logn的大小c语言,基本排序(C语言版) - ________MX的个人页面 - OSCHINA - 中文开源技术交流社区...
  6. Facebook陷入史上最大危机;华为5G设备欧洲大卖,美国指责欧盟;红帽宣布OpenShift可用于AWS中国……...
  7. 每天学一点flash(75) ToolTip 提示
  8. Oracle安装出现报错
  9. python输出时间格式_python中日期和时间格式化输出的方法小结_python
  10. 为什么JavaScript仅在IE中打开开发人员工具一次后才能工作?
  11. gbk汉字编码拼音对照表_预习部分:汉字编码方案
  12. 《Head First设计模式》
  13. Android源代码中引用@hide类出现引用异常的问题error: cannot find symbol
  14. start()和run()方法的区别
  15. 老旧计算机桌面,四种旧电脑改造桌面虚拟化的方案
  16. TensorFlow系列——一些api的使用场景及方式
  17. 系统进程里的edpa.exe是什么?
  18. 程序员微信名昵称_独一无二的情侣微信名-心有所属,名花有主
  19. 关于Chrome浏览器调用IE浏览器
  20. 武铁机械电子工程_机械电子工程

热门文章

  1. 上海计算机一级ps教程,2017年计算机一级考试PS学习妙招
  2. 计算机网络工程的发展,谈计算机网络工程全面信息化管理的应用与发展
  3. 嵌入式linux 硬盘录像机,嵌入式硬盘录像机技术的简单介绍
  4. 基于esp32的ic卡考勤系统
  5. 计算机毕业设计JAVA钢材销售平台登录mybatis+源码+调试部署+系统+数据库+lw
  6. IDC:爱数再次蝉联备份一体机市场中国品牌第一
  7. 数据结构中的elem,elemtype是什么
  8. Android获取String的MD5值
  9. 肩部固定武器-市场现状及未来发展趋势
  10. sqlserver 2008 varchar字段 转 nvarchar 存储过程