Gin + gRPC双向流模式 + sse实现grpc客户端到前端界面的消息推送
Gin + gRPC + sse实现grpc客户端到前端界面的消息推送
功能需求
- 工厂客户端与服务端通过
gRPC
连接 - 突破工厂防火墙(不需要额外开启端口)
- 前端可以控制数据推送的开启和停止
- 前端数据大屏显示(实时数据)
实现
定义gskmes3.proto
syntax = "proto3"; // 语法使用 protocol buffer proto3// 包名: gskmes3
package gskmes3;
option go_package = "./gskmes3";/*服务名: gskmes3,其中只有 名为“ControlSwitch”的一个双向RPC服务,输入是 Request格式的数据流, 输出是 Response 格式的数据流
*/
service Control {// 定义双向开关控制函数rpc ControlSwitch(stream Request) returns (stream Response) {}
}// 请求数据 Request格式定义
message Request {string input = 1;
}// 响应数据Response格式定义
message Response {string output = 1;
}
生成gskmes3.pb.go
在powershell
中执行:
protoc -I=. --go_out=plugins=grpc:. gskmes3.proto
客户端实现
/*** @Time :2021/9/2 10:26* @Author :ZhangXiaoyu* @Email :zhuanzhuan1999@126.com* @FileName:main.py* @Software:GoLand* @Blog :https://blog.csdn.net/qq_29537269* @Guide :https://guide.melf.space* @Information:**/package mainimport ("bufio""context""fmt""google.golang.org/grpc"proto "grpcClient/gskmes3" // 根据proto文件自动生成的代码"io""log""os""time"
)var needSend bool
var seedFunCount intfunc main() {// 创建连接conn, err := grpc.Dial("localhost:3000", grpc.WithInsecure())if err != nil {log.Printf("连接失败: [%v]\n", err)return}defer conn.Close()// 声明客户端client := proto.NewControlClient(conn)// 声明 contextctx := context.Background()// 创建双向数据流stream, err := client.ControlSwitch(ctx)if err != nil {log.Printf("创建数据流失败: [%v]\n", err)}// 启动一个 goroutine 接收命令行输入的指令go func() {log.Println("请输入消息...")输入 := bufio.NewReader(os.Stdin)for {// 获取 命令行输入的字符串, 以回车 \n 作为结束标志命令行输入的字符串, _ := 输入.ReadString('\n')// 向服务端发送 指令if err := stream.Send(&proto.Request{Input: 命令行输入的字符串}); err != nil {return}}}()for {// 接收从 服务端返回的数据流响应, err := stream.Recv()if err == io.EOF {log.Println("⚠️ 收到服务端的结束信号")break //如果收到结束信号,则退出“接收循环”,结束客户端程序}if err != nil {// TODO: 处理接收错误log.Println("接收数据出错:", err)}if 响应.Output == "开始\n" {fmt.Println("收到开始命令")needSend = truego func() {if seedFunCount > 0 {return}seedFunCount = seedFunCount + 1fmt.Println(seedFunCount)fmt.Println("启动推送函数:", seedFunCount)for {fmt.Println("循环推送")time.Sleep(time.Second * 1)if err := stream.Send(&proto.Request{Input: time.Now().Format("2006-01-02 15:04:05")}); err != nil {return}if needSend == false {fmt.Println("退出推送函数")seedFunCount = seedFunCount - 1return}}}()} else if 响应.Output == "结束\n" {// 没有错误的情况下,打印来自服务端的消息log.Printf("[客户端收到]: %s", 响应.Output)needSend = false} else {// 没有错误的情况下,打印来自服务端的消息log.Printf("[客户端收到]: %s", 响应.Output)//needSend = false}}
}
服务端实现
/*** @Time :2021/9/2 10:15* @Author :ZhangXiaoyu* @Email :zhuanzhuan1999@126.com* @FileName:main.py* @Software:GoLand* @Blog :https://blog.csdn.net/qq_29537269* @Guide :https://guide.melf.space* @Information:**/package mainimport ("fmt""github.com/gin-gonic/gin""google.golang.org/grpc"proto "grpcServer/gskmes3""io""log""net""net/http""runtime""strconv""time"
)// Streamer 服务端
type Streamer struct{}var sendMessage string
var getInput stringvar ch1 = make(chan string)// ControlSwitch 实现了 ChatServer 接口中定义的 BidStream 方法
func (s *Streamer) ControlSwitch(stream proto.Control_ControlSwitchServer) error {ctx := stream.Context()runtime.GOMAXPROCS(1)// 启动一个 goroutine 接收命令行输入的指令go func() {//for {// log.Println("请输入消息...")// input := bufio.NewReader(os.Stdin)// // 获取 命令行输入的字符串, 以回车 \n 作为结束标志// sendMessage, _ = input.ReadString('\n')// if err := stream.Send(&proto.Response{Output: sendMessage}); err != nil {// return// }// fmt.Println("发送:" + sendMessage)// //time.Sleep(time.Second * 1)//}for {select {case data := <-ch1:switch data {case "开始\n":// 如果从 ch1 信道成功接收数据,则执行该分支代码fmt.Println("如果从 ch1 信道成功接收数据,则执行该分支代码")if err := stream.Send(&proto.Response{Output: "开始\n"}); err != nil {return}fmt.Println("发送:" + "开始\n")case "结束\n":// 如果从 ch1 信道成功接收数据,则执行该分支代码fmt.Println("如果从 ch1 信道成功接收数据,则执行该分支代码")if err := stream.Send(&proto.Response{Output: "结束\n"}); err != nil {return}fmt.Println("发送:" + "结束\n")getInput = "结束"}default://fmt.Printf("没有操作\n")}//fmt.Println("检测")time.Sleep(time.Second * 1)}}()//go func() {// for {// fmt.Println("检测sendMessage")// if sendMessage == "开始\n" {// if err := stream.Send(&proto.Response{Output: sendMessage}); err != nil {// return// }// fmt.Println("发送:" + sendMessage)// return// }// time.Sleep(time.Second * 1)// }//}()for {select {case <-ctx.Done():log.Println("收到客户端通过context发出的终止信号")return ctx.Err()default:// 接收从客户端发来的消息输入, err := stream.Recv()if err == io.EOF {log.Println("客户端发送的数据流结束")return nil}if err != nil {log.Println("接收数据出错:", err)return err}// 如果接收正常,则根据接收到的 字符串 执行相应的指令switch 输入.Input {case "结束对话\n":log.Println("收到'结束对话'指令")if err := stream.Send(&proto.Response{Output: "收到结束指令"}); err != nil {return err}// 收到结束指令时,通过 return nil 终止双向数据流return nilcase "返回数据流\n":log.Println("收到'返回数据流'指令")// 收到 收到'返回数据流'指令, 连续返回 10 条数据for i := 0; i < 10; i++ {if err := stream.Send(&proto.Response{Output: "数据流 #" + strconv.Itoa(i)}); err != nil {return err}}default:// 缺省情况下, 返回 '服务端返回: ' + 输入信息log.Printf("[收到消息]: %s", 输入.Input)getInput = 输入.Inputif err := stream.Send(&proto.Response{Output: sendMessage}); err != nil {return err}//if err := stream.Send(&proto.Response{Output: "服务端返回: " + time.Now().Format("2006-01-02 15:04:05")}); err != nil {// return err//}}}}
}func esSSE(c *gin.Context) {w := c.Writerr := c.Requestfmt.Println(r.Method)appId := r.URL.Query()["appId"]page := r.URL.Query()["page"]pageSize := r.URL.Query()["pageSize"]fmt.Println("获取到参数")fmt.Println(appId)fmt.Println(page)fmt.Println(pageSize)w.Header().Set("Content-Type", "text/event-stream")w.Header().Set("Cache-Control", "no-cache")w.Header().Set("Connection", "keep-alive")w.Header().Set("Access-Control-Allow-Origin", "*")flusher, ok := w.(http.Flusher)if !ok {log.Panic("server not support")}for {if getInput == "" || getInput == "结束" {return}time.Sleep(1 * time.Second)//fmt.Fprintf(w, "data: %d%s%s%s\n\n", i, appId[0], page[0], pageSize[0])//now := time.Now()//timeStr := now.Format("2006-01-02 15:04:05")fmt.Fprintf(w, "data: %s\n\n", getInput)flusher.Flush()}fmt.Fprintf(w, "event: close\ndata: close\n\n") // 一定要带上data,否则无效
}func main() {r := gin.Default()r.GET("/", func(c *gin.Context) {start := c.Query("start")if start != "" {ch1 <- "开始\n"//sendMessage = "开始\n"} else {ch1 <- "结束\n"//sendMessage = "结束\n"}c.String(http.StatusOK, "hello word")})r.GET("/event", esSSE)go r.Run(":8000")log.Println("启动服务端...")server := grpc.NewServer()// 注册 ChatServerproto.RegisterControlServer(server, &Streamer{})address, err := net.Listen("tcp", ":3000")if err != nil {panic(err)}if err := server.Serve(address); err != nil {panic(err)}
}
网页实现
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title>
</head>
<body>
<script>document.addEventListener("DOMContentLoaded", ready);function ready() {console.log("触发生命周期")// var evsrc = new EventSource("http://localhost:8080/evebts")var page = "1"var pageSize = "10"var appid = "20"// var url = 'http://127.0.0.1:8000/event?page=' + page + '&pageSize=' + pageSize + '&appId=' + appidvar url = 'http://192.168.10.6:8000/event?page=' + page + '&pageSize=' + pageSize + '&appId=' + appidvar evsrc = new EventSource(url)evsrc.onmessage = function (ev) {console.log("收到")console.log(ev.data)}evsrc.onerror = function (ev) {// console.log("出错")console.log(ev.currentTarget)}}
</script></body>
</html>
Gin + gRPC双向流模式 + sse实现grpc客户端到前端界面的消息推送相关推荐
- Web消息推送之SSE
文章目录 一.消息推送简介 1.消息推送介绍 2.几种方式介绍 二.SSE原理介绍 1.SSE基础概念 2.SSE特点 3.SSE与WebSocket异同 三.SSE推送实现 1.概述 1.1 使用S ...
- ASP.NET Core 3.0 gRPC 双向流
目录 ASP.NET Core 3.0 使用gRPC ASP.NET Core 3.0 gRPC 双向流 ASP.NET Core 3.0 gRPC 认证授权 一.前言 在前一文 < 二. 什么 ...
- grpc双向流究竟是什么情况?2段代码告诉你
本文分享自华为云社区<grpc双向流究竟是什么情况?2段代码告诉你>,作者:breakDawn. 为什么需要grpc双向流? 有时候请求调用和返回过程,并不是简单的一问一答形式,可能会涉及 ...
- 干货 | Reactive模式在Trip.com消息推送平台上的实践
作者简介 KevinTen,携程后端开发工程师,关注Reactive和RPC领域,深度参与开源社区,对Reactive技术有浓厚兴趣. Pin,携程技术专家,Apache Dubbo贡献者,关注RPC ...
- sse java8_HTTP 服务器消息推送之SSE
HTTP 服务器推送也称 HTTP 流,是一种客户端-服务器通信模式,它将信息从 HTTP 服务器异步推送到客户端,而无需客户端请求.现在的 web 和 app 中,越来越多的场景使用这种通信模式,比 ...
- SSE(Server-sent Events)实现Web消息推送(SpringBoot)
本文参考自: Web消息推送之SSE_魅Lemon的博客-CSDN博客_sse推送 [IT老齐237]超好用Web服务端主动推送技术SSE_哔哩哔哩_bilibili 1.Web消息推送简介 短轮询 ...
- SSE 服务端消息推送
SSE(Server-sent events) SSE 它是基于 HTTP 协议的,一般意义上的 HTTP 协议是无法做到服务端主动向客户端推送消息的.有一种变通方法,就是服务器向客户端声明,发送的是 ...
- grpc双向流 python_gRPC Golang/Python使用
gRPC Golang/Python使用 以前开发网站都是用http协议,学过TCP/IP协议的人都知道,在传输层TCP的基础上,应用层HTTP就是填充了一定规则的文本. 1.gRPC使用和介绍 工作 ...
- c# 实现 Server-Sent Events (SSE),服务器单方面消息推送 [案例版]
游览器通讯技术其实有很多,相较于 WebSocket 而言,Server-Sent Events (简称SSE)更少被人知晓,具体实践也较少. 但是,实现却是简单的,其中 IE / Edge 几乎根本 ...
最新文章
- mySql中The user specified as a definer ('root'@'%') does not exist
- anaconda3下opencv安装
- 设计模式总结 (2)创建型模式
- 【Mark 常用方法】Html中<form>标签作用和属性详解
- Ubuntu使用技巧(一)
- ARTS打卡计划第6周-REVIEW-超越编码的避免项目失败的软技能
- 浅谈-tomcat中的项目之间的访问
- 分盘存储:实现数据库备集群备份文件分散存储
- dialog能提交数据吗_硬盘坏了能恢复数据吗?实用硬盘修复软件
- 常用电脑端口作用大曝光
- 获取微信公众号文章封面图的方法
- echarts实现复合饼图
- 如何下载jQuery
- 【转载】Java分布式键-值缓存系统Voldemort
- 老厉害了!2600亿,紫光集团南京再投半导体生产线
- tensorflow2.x实现人脸关键点检测
- Linux系统编程-信号入门2
- 如何发好外贸邮件,看亚马逊SES邮件服务商怎么说?
- 微信小程序 修改button为圆形按钮并设置图片
- 【这些题我一拿到手就会】C指针和数组试题详解(上)
热门文章
- 什么叫反光识别读数识别_回老家前 微信这个超实用小技巧一定要学会
- 计算机应用基础考证,计算机应用基础准考证
- 而多乐在线书签导入html文件,,简单介绍HTML5中的文件导入
- x264源码分析-psy-rd参数
- 如何让照片保持一百年不变色——富士instax SHARE SP-2体验探秘
- 用计算机写作与用笔写作,电脑写作vs手写作文,技术真能提高写作成绩吗?
- 2020年第十届C/C++ B组第一场蓝桥杯省赛真题
- UnixLinux 索引
- WPS Word表格转成文字或文字制成表格的快捷方法
- MySql 5.7 详细参数说明