Gin + gRPC + sse实现grpc客户端到前端界面的消息推送

功能需求

  • 工厂客户端与服务端通过gRPC连接
  • 突破工厂防火墙(不需要额外开启端口)
  • 前端可以控制数据推送的开启和停止
  • 前端数据大屏显示(实时数据)

实现

定义gskmes3.proto

syntax = "proto3"; // 语法使用 protocol buffer proto3// 包名: gskmes3
package gskmes3;
option go_package = "./gskmes3";/*服务名: gskmes3,其中只有 名为“ControlSwitch”的一个双向RPC服务,输入是 Request格式的数据流, 输出是 Response 格式的数据流
*/
service Control {// 定义双向开关控制函数rpc ControlSwitch(stream Request) returns (stream Response) {}
}// 请求数据 Request格式定义
message Request {string input = 1;
}// 响应数据Response格式定义
message Response {string output = 1;
}

生成gskmes3.pb.go

powershell中执行:

protoc -I=. --go_out=plugins=grpc:. gskmes3.proto

客户端实现

/*** @Time    :2021/9/2 10:26* @Author  :ZhangXiaoyu* @Email   :zhuanzhuan1999@126.com* @FileName:main.py* @Software:GoLand* @Blog    :https://blog.csdn.net/qq_29537269* @Guide   :https://guide.melf.space* @Information:**/package mainimport ("bufio""context""fmt""google.golang.org/grpc"proto "grpcClient/gskmes3" // 根据proto文件自动生成的代码"io""log""os""time"
)var needSend bool
var seedFunCount intfunc main() {// 创建连接conn, err := grpc.Dial("localhost:3000", grpc.WithInsecure())if err != nil {log.Printf("连接失败: [%v]\n", err)return}defer conn.Close()// 声明客户端client := proto.NewControlClient(conn)// 声明 contextctx := context.Background()// 创建双向数据流stream, err := client.ControlSwitch(ctx)if err != nil {log.Printf("创建数据流失败: [%v]\n", err)}// 启动一个 goroutine 接收命令行输入的指令go func() {log.Println("请输入消息...")输入 := bufio.NewReader(os.Stdin)for {// 获取 命令行输入的字符串, 以回车 \n 作为结束标志命令行输入的字符串, _ := 输入.ReadString('\n')// 向服务端发送 指令if err := stream.Send(&proto.Request{Input: 命令行输入的字符串}); err != nil {return}}}()for {// 接收从 服务端返回的数据流响应, err := stream.Recv()if err == io.EOF {log.Println("⚠️ 收到服务端的结束信号")break //如果收到结束信号,则退出“接收循环”,结束客户端程序}if err != nil {// TODO: 处理接收错误log.Println("接收数据出错:", err)}if 响应.Output == "开始\n" {fmt.Println("收到开始命令")needSend = truego func() {if seedFunCount > 0 {return}seedFunCount = seedFunCount + 1fmt.Println(seedFunCount)fmt.Println("启动推送函数:", seedFunCount)for {fmt.Println("循环推送")time.Sleep(time.Second * 1)if err := stream.Send(&proto.Request{Input: time.Now().Format("2006-01-02 15:04:05")}); err != nil {return}if needSend == false {fmt.Println("退出推送函数")seedFunCount = seedFunCount - 1return}}}()} else if 响应.Output == "结束\n" {// 没有错误的情况下,打印来自服务端的消息log.Printf("[客户端收到]: %s", 响应.Output)needSend = false} else {// 没有错误的情况下,打印来自服务端的消息log.Printf("[客户端收到]: %s", 响应.Output)//needSend = false}}
}

服务端实现

/*** @Time    :2021/9/2 10:15* @Author  :ZhangXiaoyu* @Email   :zhuanzhuan1999@126.com* @FileName:main.py* @Software:GoLand* @Blog    :https://blog.csdn.net/qq_29537269* @Guide   :https://guide.melf.space* @Information:**/package mainimport ("fmt""github.com/gin-gonic/gin""google.golang.org/grpc"proto "grpcServer/gskmes3""io""log""net""net/http""runtime""strconv""time"
)// Streamer 服务端
type Streamer struct{}var sendMessage string
var getInput stringvar ch1 = make(chan string)// ControlSwitch 实现了 ChatServer 接口中定义的 BidStream 方法
func (s *Streamer) ControlSwitch(stream proto.Control_ControlSwitchServer) error {ctx := stream.Context()runtime.GOMAXPROCS(1)// 启动一个 goroutine 接收命令行输入的指令go func() {//for {// log.Println("请输入消息...")// input := bufio.NewReader(os.Stdin)//   // 获取 命令行输入的字符串, 以回车 \n 作为结束标志// sendMessage, _ = input.ReadString('\n')//    if err := stream.Send(&proto.Response{Output: sendMessage}); err != nil {//       return//    }// fmt.Println("发送:" + sendMessage)//    //time.Sleep(time.Second * 1)//}for {select {case data := <-ch1:switch data {case "开始\n":// 如果从 ch1 信道成功接收数据,则执行该分支代码fmt.Println("如果从 ch1 信道成功接收数据,则执行该分支代码")if err := stream.Send(&proto.Response{Output: "开始\n"}); err != nil {return}fmt.Println("发送:" + "开始\n")case "结束\n":// 如果从 ch1 信道成功接收数据,则执行该分支代码fmt.Println("如果从 ch1 信道成功接收数据,则执行该分支代码")if err := stream.Send(&proto.Response{Output: "结束\n"}); err != nil {return}fmt.Println("发送:" + "结束\n")getInput = "结束"}default://fmt.Printf("没有操作\n")}//fmt.Println("检测")time.Sleep(time.Second * 1)}}()//go func() {//    for {//     fmt.Println("检测sendMessage")//        if sendMessage == "开始\n" {//            if err := stream.Send(&proto.Response{Output: sendMessage}); err != nil {//               return//            }//         fmt.Println("发送:" + sendMessage)//            return//        }//     time.Sleep(time.Second * 1)//   }//}()for {select {case <-ctx.Done():log.Println("收到客户端通过context发出的终止信号")return ctx.Err()default:// 接收从客户端发来的消息输入, err := stream.Recv()if err == io.EOF {log.Println("客户端发送的数据流结束")return nil}if err != nil {log.Println("接收数据出错:", err)return err}// 如果接收正常,则根据接收到的 字符串 执行相应的指令switch 输入.Input {case "结束对话\n":log.Println("收到'结束对话'指令")if err := stream.Send(&proto.Response{Output: "收到结束指令"}); err != nil {return err}// 收到结束指令时,通过 return nil 终止双向数据流return nilcase "返回数据流\n":log.Println("收到'返回数据流'指令")// 收到 收到'返回数据流'指令, 连续返回 10 条数据for i := 0; i < 10; i++ {if err := stream.Send(&proto.Response{Output: "数据流 #" + strconv.Itoa(i)}); err != nil {return err}}default:// 缺省情况下, 返回 '服务端返回: ' + 输入信息log.Printf("[收到消息]: %s", 输入.Input)getInput = 输入.Inputif err := stream.Send(&proto.Response{Output: sendMessage}); err != nil {return err}//if err := stream.Send(&proto.Response{Output: "服务端返回: " + time.Now().Format("2006-01-02 15:04:05")}); err != nil {// return err//}}}}
}func esSSE(c *gin.Context) {w := c.Writerr := c.Requestfmt.Println(r.Method)appId := r.URL.Query()["appId"]page := r.URL.Query()["page"]pageSize := r.URL.Query()["pageSize"]fmt.Println("获取到参数")fmt.Println(appId)fmt.Println(page)fmt.Println(pageSize)w.Header().Set("Content-Type", "text/event-stream")w.Header().Set("Cache-Control", "no-cache")w.Header().Set("Connection", "keep-alive")w.Header().Set("Access-Control-Allow-Origin", "*")flusher, ok := w.(http.Flusher)if !ok {log.Panic("server not support")}for {if getInput == "" || getInput == "结束" {return}time.Sleep(1 * time.Second)//fmt.Fprintf(w, "data: %d%s%s%s\n\n", i, appId[0], page[0], pageSize[0])//now := time.Now()//timeStr := now.Format("2006-01-02 15:04:05")fmt.Fprintf(w, "data: %s\n\n", getInput)flusher.Flush()}fmt.Fprintf(w, "event: close\ndata: close\n\n") // 一定要带上data,否则无效
}func main() {r := gin.Default()r.GET("/", func(c *gin.Context) {start := c.Query("start")if start != "" {ch1 <- "开始\n"//sendMessage = "开始\n"} else {ch1 <- "结束\n"//sendMessage = "结束\n"}c.String(http.StatusOK, "hello word")})r.GET("/event", esSSE)go r.Run(":8000")log.Println("启动服务端...")server := grpc.NewServer()// 注册 ChatServerproto.RegisterControlServer(server, &Streamer{})address, err := net.Listen("tcp", ":3000")if err != nil {panic(err)}if err := server.Serve(address); err != nil {panic(err)}
}

网页实现

<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><title>Title</title>
</head>
<body>
<script>document.addEventListener("DOMContentLoaded", ready);function ready() {console.log("触发生命周期")// var evsrc = new EventSource("http://localhost:8080/evebts")var page = "1"var pageSize = "10"var appid = "20"// var url = 'http://127.0.0.1:8000/event?page=' + page + '&pageSize=' + pageSize + '&appId=' + appidvar url = 'http://192.168.10.6:8000/event?page=' + page + '&pageSize=' + pageSize + '&appId=' + appidvar evsrc = new EventSource(url)evsrc.onmessage = function (ev) {console.log("收到")console.log(ev.data)}evsrc.onerror = function (ev) {// console.log("出错")console.log(ev.currentTarget)}}
</script></body>
</html>

Gin + gRPC双向流模式 + sse实现grpc客户端到前端界面的消息推送相关推荐

  1. Web消息推送之SSE

    文章目录 一.消息推送简介 1.消息推送介绍 2.几种方式介绍 二.SSE原理介绍 1.SSE基础概念 2.SSE特点 3.SSE与WebSocket异同 三.SSE推送实现 1.概述 1.1 使用S ...

  2. ASP.NET Core 3.0 gRPC 双向流

    目录 ASP.NET Core 3.0 使用gRPC ASP.NET Core 3.0 gRPC 双向流 ASP.NET Core 3.0 gRPC 认证授权 一.前言 在前一文 < 二. 什么 ...

  3. grpc双向流究竟是什么情况?2段代码告诉你

    本文分享自华为云社区<grpc双向流究竟是什么情况?2段代码告诉你>,作者:breakDawn. 为什么需要grpc双向流? 有时候请求调用和返回过程,并不是简单的一问一答形式,可能会涉及 ...

  4. 干货 | Reactive模式在Trip.com消息推送平台上的实践

    作者简介 KevinTen,携程后端开发工程师,关注Reactive和RPC领域,深度参与开源社区,对Reactive技术有浓厚兴趣. Pin,携程技术专家,Apache Dubbo贡献者,关注RPC ...

  5. sse java8_HTTP 服务器消息推送之SSE

    HTTP 服务器推送也称 HTTP 流,是一种客户端-服务器通信模式,它将信息从 HTTP 服务器异步推送到客户端,而无需客户端请求.现在的 web 和 app 中,越来越多的场景使用这种通信模式,比 ...

  6. SSE(Server-sent Events)实现Web消息推送(SpringBoot)

    本文参考自: Web消息推送之SSE_魅Lemon的博客-CSDN博客_sse推送 [IT老齐237]超好用Web服务端主动推送技术SSE_哔哩哔哩_bilibili 1.Web消息推送简介 短轮询 ...

  7. SSE 服务端消息推送

    SSE(Server-sent events) SSE 它是基于 HTTP 协议的,一般意义上的 HTTP 协议是无法做到服务端主动向客户端推送消息的.有一种变通方法,就是服务器向客户端声明,发送的是 ...

  8. grpc双向流 python_gRPC Golang/Python使用

    gRPC Golang/Python使用 以前开发网站都是用http协议,学过TCP/IP协议的人都知道,在传输层TCP的基础上,应用层HTTP就是填充了一定规则的文本. 1.gRPC使用和介绍 工作 ...

  9. c# 实现 Server-Sent Events (SSE),服务器单方面消息推送 [案例版]

    游览器通讯技术其实有很多,相较于 WebSocket 而言,Server-Sent Events (简称SSE)更少被人知晓,具体实践也较少. 但是,实现却是简单的,其中 IE / Edge 几乎根本 ...

最新文章

  1. mySql中The user specified as a definer ('root'@'%') does not exist
  2. anaconda3下opencv安装
  3. 设计模式总结 (2)创建型模式
  4. 【Mark 常用方法】Html中<form>标签作用和属性详解
  5. Ubuntu使用技巧(一)
  6. ARTS打卡计划第6周-REVIEW-超越编码的避免项目失败的软技能
  7. 浅谈-tomcat中的项目之间的访问
  8. 分盘存储:实现数据库备集群备份文件分散存储
  9. dialog能提交数据吗_硬盘坏了能恢复数据吗?实用硬盘修复软件
  10. 常用电脑端口作用大曝光
  11. 获取微信公众号文章封面图的方法
  12. echarts实现复合饼图
  13. 如何下载jQuery
  14. 【转载】Java分布式键-值缓存系统Voldemort
  15. 老厉害了!2600亿,紫光集团南京再投半导体生产线
  16. tensorflow2.x实现人脸关键点检测
  17. Linux系统编程-信号入门2
  18. 如何发好外贸邮件,看亚马逊SES邮件服务商怎么说?
  19. 微信小程序 修改button为圆形按钮并设置图片
  20. 【这些题我一拿到手就会】C指针和数组试题详解(上)

热门文章

  1. 什么叫反光识别读数识别_回老家前 微信这个超实用小技巧一定要学会
  2. 计算机应用基础考证,计算机应用基础准考证
  3. 而多乐在线书签导入html文件,,简单介绍HTML5中的文件导入
  4. x264源码分析-psy-rd参数
  5. 如何让照片保持一百年不变色——富士instax SHARE SP-2体验探秘
  6. 用计算机写作与用笔写作,电脑写作vs手写作文,技术真能提高写作成绩吗?
  7. 2020年第十届C/C++ B组第一场蓝桥杯省赛真题
  8. UnixLinux 索引
  9. WPS Word表格转成文字或文字制成表格的快捷方法
  10. MySql 5.7 详细参数说明