如何在 OpenResty 中使用 websocket

https://hambut.com/2016/10/13/how-to-use-websocket-in-openresty/

前言

作为一个游戏从业者不可能不使用推方案,以前一直使用 nginx-push-stream-module 这个模块的 Forever Iframe 模式来实现推方案。

最近决定研究下 lua-resty-websocket 来实现一个更加高效好用推方案

不推荐使用的场景

由于 OpenResty 目前还不能做到跨 worker 通信,所以想到实现指定推送需要中转一次,效率上可能不如其他语言如 golang 等

  • 过于复杂的业务逻辑
  • 频繁的指定推送(单对单、组、Tag等)
  • 广播过多

一起工作的好基友们

想要推的优雅以下几个基友的帮忙是不可或缺的

ngx.semaphore

它可以让你在想要发消息的地方优雅的进入到发送阶段,也可以让你来优雅的控制一个链接超时的关闭。

ngx.shared

由于目前我们无法做到跨 worker 的通信,所以必须借助共享内存来中转不属于当前 worker 的消息。

lua-resty-websocket

由于贪图方便还是直接使用了现成的库,喜欢折腾的小伙伴请移步 stream-lua-nginx-module

大概的思路

由于不能跨 worker 通信所以我给每个 worker 申请了一个 shared 共享内存来保存消息。 理论上 shared 的数量等于 worker 的数量最佳。

然后每个 worker 启动一个 timer 来判断当前 worker 的 message id 和 shared 中的 message id 是否有变化。 这里为什么不用 shared 的有序列表来做,容我先卖个关子。

当发生变化时,判断消息的目标是否在自己的 session hash 中,如果在则发之。

开始准备工作

修改配置文件

首先修改 nginx.conf 配置,增加以下设置

1
2
3
4
5
lua_shared_dict message_1 10m;
lua_shared_dict message_2 10m;
lua_shared_dict message_n 10m;init_worker_by_lua_file scripts/init_worker_by_lua.lua;

init_worker_by_lua

 123456789
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
local ngx = ngx
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_timer_at = ngx.timer.at
local require = requirelocal socketMgr = require("socketMgr")local delay = 1
local loopMessageloopMessage = function(premature)if premature thenngx_log(ngx_ERR, "timer was shut: ", err)returnendsocketMgr:loopMessages()local ok, err = ngx_timer_at(delay, loopMessage)if not ok thenngx_log(ngx_ERR, "failed to create the timer: ", err)returnend
endloopMessage()

loopMessages

判断 local message id 和 shared message id是否不等。 随后每次 local message id+ 1 从 shared 拉取数据,进行消息推送逻辑。

建立连接

不做过多说明,自行查看 lua-resty-websocket 的 wiki

当连接监听好之后,要进行一系列的管理。如:

  • session id 和 user id 的双向映射
  • session id 和 group name 的双向映射

后面再详细说明

生成 session id

我是用 (worker id + 1) * 100000 + worker's local incr id 来生成唯一 session id比较简陋,但是够用。

这么做的原因是,通过对 session id 进行取余可以很方便的得知 worker id,可以方便的给 shared 写消息。

1
2
3
4
5
6
7
local ngx_worker_id = ngx.worker.id()
local _incr_id = 0local _gen_session_id = function()_incr_id = _incr_id + 1return (ngx_worker_id + 1) * 100000 + _incr_id
end

设置消息映射

这个可以用于收到当前 worker 所属的 shared message 判断是否在当前进程。

1
2
_messages[session_id] = {}
_semaphores[session_id] = semaphore.new(0)

接收消息&发送消息

代码和 官方例子 类同不做过多说明,只说我改了什么。

在接收消息中管理了一个变量即 close_flag 用于管理 send message 轻线程的退出。

以下是一段伪代码,含义的话请联系上下文。

 123456789
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
local session_id = sessionMgr:gen_session_id()
local send_semaphore = sessionMgr:get_semaphore(session_id)
local close_flag = falselocal function _push_thread_function()while close_flag == false dolocal ok, err = send_semaphore:wait(300)if ok thenlocal messages = socketMgr:getMessages(session_id)while messages and #messages > 0 dolocal message = messages[1]table_remove(messages, 1)--- your send message function handlerendendif close_flag thensocketMgr:destory(session_id)breakendend
endlocal push_thread = ngx_thread_spawn(_push_thread_function)while true dolocal data, typ, err = wbsocket:recv_frame()while err == "again" dolocal cut_datacut_data, _, err = wbsocket:recv_frame()data = data .. cut_dataendif not data thenclose_flag = truesend_semaphore:post(1)breakelseif typ == 'close' thenclose_flag = truesend_semaphore:post(1)breakelseif typ == 'ping' thenlocal bytes, err = wbsocket:send_pong(data)if not bytes thenclose_flag = truesend_semaphore:post(1)breakendelseif typ == 'pong' thenelseif typ == 'text' then-- your receive function handlerelseif typ == 'continuation' thenelseif typ == 'binary' thenend
endngx_thread_wait(push_thread)
wbsocket:send_close()

消息推送

现在说说为什么不用 shared 的有序列表来存储消息,我是使用了 shared 的 set 方法中的 flag 属性来存放 session id。 这样在获得一个消息的时候,能很方便的知道消息是发给哪个 session id的。

继续一段伪代码。

 123456789
10
11
12
13
14
15
16
17
18
19
20
21
22
23
local ngx_shared = ngx.sharedlocal _shared_dicts = {ngx_shared.message_1,ngx_shared.message_2,ngx_shared.message_n,
}local current_shared = _shared_dicts[ngx_worker_id + 1]
local current_message_id = 1--- 如果在当前进程
if _messages[session_id] thentable.insert(_messages[session_id], "message data")_semaphores[session_id]:post(1)--- 会进入到,上述 _push_thread_function 方法中,进行发送逻辑
elselocal shared_id = session_id % 100000local message_shared = _shared_dicts[shared_id]local message_id = message_shared:incr("id", 1, 0)message_shared:set("message." .. message_id, "message data", 60, session_id)
end

其他

https://github.com/chenxiaofa/p

借鉴了这位同学的设计思路,实现了额外逻辑。如:

  • 加入、退出、销毁组
  • 各 worker 之间的 cmd 内部命令执行
  • 热更新的特殊处理

如何在 OpenResty 中使用 websocket相关推荐

  1. 如何在 Knative 中部署 WebSocket 和 gRPC 服务?

    作者 | 冬岛 阿里云容器平台工程师 导读:虽然说 Knative 默认就支持 WebSocket 和 gRPC,但在使用中会发现,有时想要把自己的 WebSocket 或 gRPC 部署到 Knat ...

  2. 如何在Vue中使用websocket?

    什么是WebSocket : WebSocket是一种在单个TCP连接上进行全双工通信的协议.WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范. W ...

  3. 居然还有人不知道如何在 IDEA 中生成 Maven 依赖关系图?

    点击上方"Java基基",选择"设为星标" 做积极的人,而不是积极废人! 每天 14:00 更新文章,每天掉亿点点头发... 源码精品专栏 原创 | Java ...

  4. 如何在SharePoint2010中添加Deep Zoom Image

    如何在SharePoint2010中添加Deep Zoom Image 应用范围 SharePoint 2010 Foundation:SharePoint 2010 Standard:SharePo ...

  5. php如何对数组进行分组,如何在PHP中对数组进行分组排序

    如何在PHP中对数组进行分组排序 发布时间:2021-01-04 16:28:51 来源:亿速云 阅读:98 作者:Leah 这篇文章将为大家详细讲解有关如何在PHP中对数组进行分组排序,文章内容质量 ...

  6. html中section与div,如何在html中的section标签内包含div标签

    我正在制作一个完整版块的页面网站,如this.每个页面都有自己的标签.目前我的网页有4个部分(呈现不同的背景颜色).如何在html中的section标签内包含div标签 我的第一部分有一个容器div, ...

  7. html img调用js,html调用js变量 如何在html中输出js文件中的变量

    html页面代码中怎么调用js变量?html页面代码中怎么调用js变量,例如 在html代码中插入js代码: a=取浏览你把index1.js 中的onReady 去掉,把index1.js改成 fu ...

  8. 如何在OpenCV中为InRange阈值选择颜色的最佳HSV值

    如何在OpenCV中为InRange阈值选择颜色的最佳HSV值 1. 效果图 2. 源码 参考 之前的博客介绍了如何使用Python,OpenCV通过HSV颜色空间转换检测对象,并进行轨迹追踪.怎么选 ...

  9. 管理 zabbix_Zabbix 2019 峰会丨看睿象云如何在 Zabbix 中玩转告警

    2019年11月29日-30日,为期两天的 Zabbix 大会中国站在北京盛大召开,本届 Zabbix 大会以"新视界,新技术,共建未来新监控!"为主题,为与会人员提供前沿的监控技 ...

最新文章

  1. 爬虫核心原理:一次 HTTP 请求到底是如何完成的?
  2. 《Java工程师修炼之道》内容概览
  3. [转]Python测试框架对比----unittest, pytest, nose, robot framework对比
  4. ubuntu18 激活 pycharm
  5. Arrays对数组,二分查找,冒泡排序
  6. 前端大神的离逝,让我们不得不有所反思。
  7. JS中var、let、const区别? 用3句话概括
  8. Codeforces Round #243 (Div. 1)
  9. Python爬虫入门教程21:文档的爬取
  10. SPSS教程:手把手教你设置哑变量以及解读结果
  11. Unity遮罩简单复刻2D平台《Unbound: Worlds Apart》游离于世界之海的双重世界效果
  12. 弹力球C语言课程设计,弹力球游戏c语言代码
  13. 脑机接口取得重大突破
  14. 面试问遇到最难的事情_太难的事情
  15. 我的世界服务器物品管道,物品导管 (Item Conduit)
  16. 脚本启动shipyard
  17. try catch 用法
  18. rk3066 android4.4,Rooting the Cube U30GT rk3066 android tablet
  19. Linux面试题--1
  20. 【LIS】【打地鼠】

热门文章

  1. ros indigo 学习笔记
  2. abb伺服电机如何进行挑选_PLC是怎么控制伺服电机的?如何设计一个伺服系统?...
  3. java时间控件jar包_maven打jar包插件
  4. 【Spring第五篇】Autowired:自动装配
  5. JavaScript中的数字型
  6. 实验4-1-3 找出最小值 (20 分)
  7. JavaScript:数据的存储
  8. git之配置在Terminal显示git分支
  9. 03 聚类算法 - K-means聚类
  10. Verilog实现交通灯(数电课设)----------旧