一.什么是消息推送

推送的场景比较多,比如有人关注我的公众号,这时我就会收到一条推送消息,以此来吸引我点击打开应用。

  • 消息推送(push)通常是指网站的运营工作等人员,通过某种工具对用户当前网页或移动设备APP进行的主动消息推送。

    • 消息推送一般又分为web端消息推送移动端消息推送

    • 上边的这种属于移动端消息推送web端消息推送常见的诸如站内信、未读邮件数量、监控报警数量等,应用的也非常广泛。

    • 如上图所示只要触发某个事件(主动分享了资源或者后台主动推送消息),web页面的通知小红点就会实时的+1就可以了。

  • 通常在服务端会有若干张消息推送表,用来记录用户触发不同事件所推送不同类型的消息,前端主动查询(拉)或者被动接收(推)用户所有未读的消息数

    • 因此消息推送无非是推(push)和拉(pull)数据两种形式
    CREATE TABLE `message_record` (`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',`template_id` bigint unsigned NOT NULL COMMENT '消息模板ID',`type` int NOT NULL DEFAULT '1' COMMENT '推送渠道 1短信 2邮件 3微信4APP',`receiver` varchar(128) NOT NULL DEFAULT '' COMMENT '消息接收者(手机号,邮箱号,微信openid等)',`device_info` varchar(128) NOT NULL DEFAULT '' COMMENT 'APP推送终端设备信息',`content` varchar(1024) NOT NULL COMMENT '消息推送内容',`deleted` tinyint NOT NULL DEFAULT '0' COMMENT '逻辑删除标记:1删除; O未删除',`create_by` bigint unsigned NOT NULL COMMENT '创建人',`create_time` datetime NOT NULL COMMENT '创建时间',`update_by` bigint unsigned NOT NULL COMMENT '修改人',`update_time` datetime NOT NULL COMMENT '修改时间',PRIMARY KEY (`id`),KEY `idx_template_id` (`template_id`),KEY `idx receiver` (`receiver`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='消息推送记录表'
    

二.服务端推送常用方式

1.短轮询(Polling)

  • 客户端定时向服务端发起请求,服务端收到请求后立即返回,客户端再做渲染显示。

    • 使用JS定时器 间隔时间拉取服务端数据
setInterval(() => { //发起请求、处理响应} , 1000);
  • 确定:短轮询无论服务端是否发生数据变更,客户端都会进行请求,势必会对服务端造成很大压力,浪费带宽和服务器资源

2.长轮询(Long Polling)

客户端向服务端发起请求,服务器端到请求后保持连接不断开,直到数据有更新才返回响应并关闭连接,客户端处理完响应信息后再向服务端发送新的请求。

  • 原理:是servlet的异步长连接请求。即异步请求中在原始的请求返回的时并没有关闭连接,关闭的只是处理请求的那个线程(一般是回收的线程池里了),只有在异步请求全部处理完之后才会关闭连接。
  • 具体实现:如Spring的DeferredResult可以允许容器线程快速释放占用的资源,不阻塞请求线程,以此接受更多的请求提升系统的吞吐量,然后启动异步工作线程处理真正的业务逻辑,处理完成调用DeferredResult.setResult(200)提交响应结果。
    • 接口返回DeferredResult,或者调用setResult设值时不会返回,**当前Servlet容器线程会结束,由DeferredResult另起线程来进行结果处理并setResul,如果超时或设置setResult,接口会立即返回

实例1

要求:请求http://localhost:8080/get/requestId=1时,页面处于等待状态;当访问http://localhost:8080/set/requestId=1,前面的页面会返回"处理成功 1"。

@Controller
@RequestMapping(value = "/")
public class DeferredResultController {private Map<String,  DeferredResult<String>> deferredResultMap    = new ConcurrentHashMap<>();;/*** 为了方便测试,简单模拟一个   多个请求用同一个requestId会出问题*/@ResponseBody@GetMapping("/get")public DeferredResult<String> get(@RequestParam String requestId,@RequestParam(value = "timeout", required = false, defaultValue = "5000") Long timeout) {System.out.println("start get");//初始化延时对象,超时时间为5sDeferredResult<String> deferredResult = new DeferredResult<>(timeout);// 请求超时的回调函数deferredResult.onTimeout(() -> {//返回处理超时deferredResult.setResult("处理超时");//超时该处理任务deferredResultMap.remove(requestId);});//如果不存在的requestId直接抛异常Optional.ofNullable(deferredResultMap).filter(t -> !t.containsKey(requestId)).orElseThrow(() -> new IllegalArgumentException(String.format("requestId=%s is existing", requestId)));deferredResultMap.put(requestId,deferredResult);System.out.println("end get");return deferredResult;}/*** 设置DeferredResult对象的result属性,模拟异步操作*/@ResponseBody@GetMapping(value = "/set")public String settingResult(@RequestParam String requestId) {//--------------------这里相当于异步的操作方法 设置DeferredResult对象的setResult方法--------if (deferredResultMap.containsKey(requestId)) {DeferredResult<String> deferredResult = deferredResultMap.get(requestId);deferredResult.setResult("处理成功:"+requestId);deferredResultMap.remove(requestId);}return "Done";}
}

实例2

要求:接口/test 接收请求后,立即将请求入队receiveQueue后台线程自旋执行队列receiveQueue任务,任务完成后将结果入队resultQueue,如果监听器线程监听resultQueue,如果有任务结果,则将结果赋值给DeferredResult,返回结果响应。

定义Task,封装了DeferredResult对象和收到的消息对象,以及一个是否超时标记,用于任务完成后取出每个请求消息对应的DeferredResult对象,返回消息给客户端.


@Data
@AllArgsConstructor
@NoArgsConstructor
public class Task<T> {//延时返回对象private DeferredResult<String> result;//延时消息private T message;//是否超时private Boolean isTimeout;
}

定义TaskQueue,用于管理队列及处理数据:

/*** 模拟队列类*/
@Component
public class TaskQueue {/*** 接收任务队列*/private BlockingQueue<Task<String>> receiveQueue = new LinkedBlockingDeque<>(5000);/*** 任务完成结果队列*/private BlockingQueue<Task<String>> resultQueue = new LinkedBlockingDeque<>(5000);/*** 初始化任务处理线程*/public TaskQueue() {this.run();}/*** 存入请求任务** @param task task实体* @throws InterruptedException*/public void put(Task<String> task) throws InterruptedException {receiveQueue.put(task);}/*** 获取任务完成结果** @return* @throws InterruptedException*/public Task<String> get() throws InterruptedException {return resultQueue.take();}/*** 处理任务* 开启一个新线程,自旋的从接收队列中取出数据,然后处理若干秒后,将成功数据放入成功队列.*   ,如果任务超时标志isTimeout超时,可以中断该任务的进行,在正常的service中,可以替换为数据库回滚等操作.*/private void run() {new Thread(() -> {while (true) {try {//从接收队列中取出任务,处理,然后放入成功队列Task<String> task = receiveQueue.take();System.out.println("队列收到数据,处理中!");Thread.sleep(1000);task.setMessage("成功");//TODO:如果超时了,中断该任务-此处应该加锁if (task.getIsTimeout()) {System.out.println("任务超时,处理线程中断该任务");continue;}resultQueue.put(task);System.out.println("队列处理完成!");} catch (InterruptedException e) {e.printStackTrace();}}}).start();}
}

定义队列监听线程, 当spring容器加载完毕,开启新线程,自旋的从模拟队列的完成队列中获取数据,并使用ReferredResult返回

@Component
public class QueueResultListener implements ApplicationListener<ContextRefreshedEvent> {@AutowiredTaskQueue taskQueue;@Overridepublic void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {new Thread(() -> {try {Task<String> task = taskQueue.get();task.getResult().setResult(task.getMessage());System.out.println("监听器获取到结果:task=" + task);} catch (InterruptedException e) {e.printStackTrace();}}).start();}
}

实现Controller异步接口

@Controller
public class DeferredResultQueueController {@AutowiredTaskQueue taskQueue;@ResponseBody@GetMapping("/test")public DeferredResult<String> test(@RequestParam String requestId,@RequestParam(value = "timeout", required = false, defaultValue = "5000") Long timeout) throws InterruptedException {//新建延期返回对象并设置超时时间,优先级比configureAsyncSupport方法中默认配置中的高System.out.println("start test");//初始化延迟任务DeferredResult<String> deferredResult = new DeferredResult<>(timeout);//要执行的任务Task<String> task = new Task<String>(deferredResult, "任务", false);//设置超时后执行的任务,优先级比DeferredResultProcessingInterceptor拦截器中的高deferredResult.onTimeout(() -> {System.out.println("任务超时 id=" + requestId);//TODO:告知该任务已经超时-此处应该加锁task.setMessage("任务超时");task.setIsTimeout(true);});//任务入队taskQueue.put(task);System.out.println("end test");return deferredResult;}
}

参考文章
Spring MVC3.2之后支持异步请求,能够在controller中返回一个Callable或者DeferredResult

  • 高性能关键技术之—体验Spring MVC的异步模式(Callable、WebAsyncTask、DeferredResult) 基础使用篇
  • 使用DeferredResult异步处理SpringMVC请求

3.MQTT

什么是 MQTT协议?

  • MQTT 全称(Message Queue Telemetry Transport):一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议通过订阅相应的主题来获取消息,是物联网(Internet of Thing)中的一个标准传输协议。

    • 该协议将消息的发布者(publisher)订阅者(subscriber)进行分离,因此可以在不可靠的网络环境中,为远程连接的设备提供可靠的消息服务,使用方式与传统的MQ有点类似。
  • TCP协议位于传输层,MQTT 协议位于应用层,MQTT 协议构建于TCP/IP协议上,也就是说只要支持TCP/IP协议栈的地方,都可以使用MQTT协议。

MQTT协议为什么在物联网(IOT)中如此受偏爱?而不是其它协议,比如我们更为熟悉的 HTTP协议呢?

  • 首先HTTP协议它是一种同步协议,客户端请求后需要等待服务端的响应。而在物联网(IOT)环境中,设备会很受制于环境影响,比如带宽低、网络延迟高、网络通信不稳定等,显然异步消息协议更为适合IOT应用程序。

  • HTTP是单向的,如果要获取消息客户端必须发起连接而在物联网(IOT)应用程序中,设备或传感器往往都是客户端,这意味着它们无法被动地接收来自网络的命令。 通常需要将一条命令或者消息,发送到网络上的所有设备上。HTTP要实现这样的功能不但很困难,而且成本极高。

springboot+rabbitmq实现智能家居实例详解

4.SSE

**SSE( Server-sent Events )**是 WebSocket 的一种轻量代替方案,使用 HTTP 协议,在服务器和客户端之间打开一个单向通道,只能服务器向客户端发送消息,服务端响应的不再是一次性的数据包,而是text/event-stream类型的数据流信息,在有数据变更时从服务器流式传输到客户端。

  • 整体的实现思路有点类似于在线视频播放,视频流会连续不断的推送到浏览器,你也可以理解成,客户端在完成一次用时很长(网络不畅)的下载

SSE与WebSocket作用相似,都可以建立服务端与浏览器之间的通信,实现服务端向客户端推送消息,但还是有些许不同:

  • SSE 是基于HTTP协议的,它们不需要特殊的协议或服务器实现即可工作;WebSocket需单独服务器来处理协议。
  • SSE 单向通信只能由服务端向客户端单向通信;webSocket全双工通信,即通信的双方可以同时发送和接收信息
  • SSE 实现简单开发成本低,无需引入其他组件;WebSocket传输数据需做二次解析,开发门槛高一些。
  • SSE 默认支持断线重连;WebSocket则需要自己实现。
  • SSE 只能传输文本消息,二进制数据需要经过编码后传送;WebSocket默认支持传送二进制数据。

在 html5 的定义中,服务端 sse,一般需要遵循以下规范

  • Content-Type: text/event-stream;charset=UTF-8
  • Cache-Control: no-cache
  • Connection: keep-alive

SSE 如何保证数据完整性

  • 客户端在每次接收到消息时,会把消息的 id 字段作为内部属性 Last-Event-ID 储存起来。
  • SSE 默认支持断线重连机制,在连接断开时会 触发 EventSource 的 error 事件,同时自动重连。再次连接成功时 EventSource 会把 Last-Event-ID 属性作为请求头发送给服务器,这样服务器就可以根据这个 Last-Event-ID 作出相应的处理。
    • 这里需要注意的是,id 字段不是必须的,服务器有可能不会在消息中带上 id 字段,这样子

springboot集成SSE简约版

实例:客户端发送请求到服务端,服务端以流的形式不断向客户端推送数据示例,增加帅气值。

  • 服务端代码(注意响应头以及固定返回数据格式)

    @Controller
    @RequestMapping(value = "/sse")
    public class SEEController {//响应头为text/event-stream;charset=UTF-8@RequestMapping(value = "/get", produces = "text/event-stream;charset=UTF-8")public void push(HttpServletResponse response) {response.setContentType("text/event-stream");response.setCharacterEncoding("utf-8");int i = 0;while (true) {try {Thread.sleep(1000);PrintWriter pw = response.getWriter();//注意返回数据必须以data:开头,"\n\n"结尾pw.write("data:xdm帅气值加" + i + "\n\n");pw.flush();//检测异常时断开连接if (pw.checkError()) {log.error("客户端断开连接");return;}} catch (Exception e) {e.printStackTrace();}i++;}}
    }
    
  • 前端代码(重写message、open、error事件)

    <html>
    <head>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
    <title>SSE Demo</title>
    </head>
    <body><div id="msg_from_server">空白</div><script type="text/javascript" src="../js/jquery.js"></script><script type="text/javascript">if (!!window.EventSource) {var source = new EventSource('/sse/get'); s = '';//客户端收到服务器发来的数据 另一种写法:source.onmessage = function (event) {}source.addEventListener('message', function(e) {s += e.data + "<br/>"$("#msg_from_server").html(s);});// 连接一旦建立,就会触发open事件   另一种写法:source.onopen = function (event) {}source.addEventListener('open', function(e) {console.log("连接打开.");}, false);// 如果发生通信错误(比如连接中断),就会触发error事件  另一种写法:source.onerror = function (event) {}source.addEventListener('error', function(e) {if (e.readyState == EventSource.CLOSED) {console.log("连接关闭");} else {console.log(e.readyState);}}, false);} else {alert(4);console.log("没有sse");}</script>
    </body>
    </html>
    

springboot集成SSE升级版

演示SSE的连接建立、接收数据和异常情况监听处理。

  • 服务端

    @Controller
    @RequestMapping(value = "/sse")
    @Slf4j
    public class SSEPlusController {private static Map<String, SseEmitter> cache = new ConcurrentHashMap<>();String clientId;int sseId;@GetMapping("/create")public SseEmitter create(@RequestParam(name = "clientId", required = false) String clientId) {// 设置超时时间,0表示不过期。默认30000毫秒//可以在客户端一直断网、直接关闭页面但未提醒后端的情况下,服务端在一定时间等待后自动关闭网络连接SseEmitter sseEmitter = new SseEmitter(0L);// 是否需要给客户端推送IDif (Strings.isBlank(clientId)) {clientId = UUID.randomUUID().toString();}this.clientId = clientId;cache.put(clientId, sseEmitter);log.info("sse连接,当前客户端:{}", clientId);return sseEmitter;}@Scheduled(cron = "0/3 * *  * * ? ")public void pushMessage() {try {sseId++;SseEmitter sseEmitter = cache.get(clientId);sseEmitter.send(SseEmitter.event().data("帅气值暴增" + sseId).id("" + sseId).reconnectTime(3000));} catch (Exception e) {log.error(e.getMessage());sseId--;}}@GetMapping("/close")public void close(String clientId) {SseEmitter sseEmitter = cache.get(clientId);if (sseEmitter != null) {sseEmitter.complete();cache.remove(clientId);}}
    }
    

复杂代码

/*** SSE长链接*/
@RestController
@RequestMapping("/sse")
public class SseEmitterController {@Autowiredprivate SseEmitterService sseEmitterService;/*** 创建SSE长链接** @param clientId   客户端唯一ID(如果为空,则由后端生成并返回给前端)* @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter* @author re* @date 2021/12/12**/@CrossOrigin //如果nginx做了跨域处理,此处可去掉@GetMapping("/CreateSseConnect")public SseEmitter createSseConnect(@RequestParam(name = "clientId", required = false) String clientId) {return sseEmitterService.createSseConnect(clientId);}/*** 关闭SSE连接** @param clientId 客户端ID* @author re* @date 2021/12/13**/@GetMapping("/CloseSseConnect")public Result closeSseConnect(String clientId) {sseEmitterService.closeSseConnect(clientId);return ResultGenerator.genSuccessResult(true);}}
@Service
public class SseEmitterServiceImpl implements SseEmitterService {/*** 容器,保存连接,用于输出返回*/private static Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>();@Overridepublic SseEmitter createSseConnect(String clientId) {// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutExceptionSseEmitter sseEmitter = new SseEmitter(0L);// 是否需要给客户端推送IDif (StringUtils.isBlank(clientId)) {clientId = IdUtil.simpleUUID();}// 注册回调sseEmitter.onCompletion(completionCallBack(clientId));sseCache.put(clientId, sseEmitter);logger.info("创建新的sse连接,当前用户:{}", clientId);try {sseEmitter.send(SseEmitter.event().id(SseEmitterConstant.CLIENT_ID).data(clientId));} catch (IOException e) {logger.error("SseEmitterServiceImpl[createSseConnect]: 创建长链接异常,客户端ID:{}", clientId, e);throw new BusinessException("创建连接异常!", e);}return sseEmitter;}@Overridepublic void closeSseConnect(String clientId) {SseEmitter sseEmitter = sseCache.get(clientId);if (sseEmitter != null) {sseEmitter.complete();removeUser(clientId);}}// 根据客户端id获取SseEmitter对象@Overridepublic SseEmitter getSseEmitterByClientId(String clientId) {return sseCache.get(clientId);}// 推送消息到客户端,此处结合业务代码,业务中需要推送消息处调用即可向客户端主动推送消息@Overridepublic void sendMsgToClient(List<SseEmitterResultVO> sseEmitterResultVOList) {if (CollectionUtil.isEmpty(sseCache)) {return;}for (Map.Entry<String, SseEmitter> entry : sseCache.entrySet()) {sendMsgToClientByClientId(entry.getKey(), sseEmitterResultVOList, entry.getValue());}}/*** 推送消息到客户端* 此处做了推送失败后,重试推送机制,可根据自己业务进行修改** @param clientId               客户端ID* @param sseEmitterResultVOList 推送信息,此处结合具体业务,定义自己的返回值即可* @author re* @date 2022/3/30**/private void sendMsgToClientByClientId(String clientId, List<SseEmitterResultVO> sseEmitterResultVOList, SseEmitter sseEmitter) {if (sseEmitter == null) {logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:客户端{}未创建长链接,失败消息:{}",clientId, sseEmitterResultVOList.toString());return;}SseEmitter.SseEventBuilder sendData = SseEmitter.event().id(SseEmitterConstant.TASK_RESULT).data(sseEmitterResultVOList, MediaType.APPLICATION_JSON);try {sseEmitter.send(sendData);} catch (IOException e) {// 推送消息失败,记录错误日志,进行重推logger.error("SseEmitterServiceImpl[sendMsgToClient]: 推送消息失败:{},尝试进行重推", sseEmitterResultVOList.toString(), e);boolean isSuccess = true;// 推送消息失败后,每隔10s推送一次,推送5次for (int i = 0; i < 5; i++) {try {Thread.sleep(10000);sseEmitter = sseCache.get(clientId);if (sseEmitter == null) {logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败,未创建长链接", clientId, i + 1);continue;}sseEmitter.send(sendData);} catch (Exception ex) {logger.error("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推失败", clientId, i + 1, ex);continue;}logger.info("SseEmitterServiceImpl[sendMsgToClient]:{}的第{}次消息重推成功,{}", clientId, i + 1, sseEmitterResultVOList.toString());return;}}}/*** 长链接完成后回调接口(即关闭连接时调用)** @param clientId 客户端ID* @return java.lang.Runnable* @author re* @date 2021/12/14**/private Runnable completionCallBack(String clientId) {return () -> {logger.info("结束连接:{}", clientId);removeUser(clientId);};}/*** 连接超时时调用** @param clientId 客户端ID* @return java.lang.Runnable* @author re* @date 2021/12/14**/private Runnable timeoutCallBack(String clientId) {return () -> {logger.info("连接超时:{}", clientId);removeUser(clientId);};}/*** 推送消息异常时,回调方法** @param clientId 客户端ID* @return java.util.function.Consumer<java.lang.Throwable>**/private Consumer<Throwable> errorCallBack(String clientId) {return throwable -> {logger.error("SseEmitterServiceImpl[errorCallBack]:连接异常,客户端ID:{}", clientId);// 推送消息失败后,每隔10s推送一次,推送5次for (int i = 0; i < 5; i++) {try {Thread.sleep(10000);SseEmitter sseEmitter = sseCache.get(clientId);if (sseEmitter == null) {logger.error("SseEmitterServiceImpl[errorCallBack]:第{}次消息重推失败,未获取到 {} 对应的长链接", i + 1, clientId);continue;}sseEmitter.send("失败后重新推送");} catch (Exception e) {e.printStackTrace();}}};}/*** 移除用户连接* @param clientId 客户端ID* @author re**/private void removeUser(String clientId) {sseCache.remove(clientId);logger.info("SseEmitterServiceImpl[removeUser]:移除用户:{}", clientId);}

当请求超过设置的超时时间,会抛出AsyncRequestTimeoutException异常,这里直接用@ControllerAdvice全局捕获统一返回即可,前端获取约定好的状态码后再次发起长轮询请求,如此往复调用。


@ControllerAdvice
public class AsyncRequestTimeoutHandler {@ResponseStatus(HttpStatus.NOT_MODIFIED)@ResponseBody@ExceptionHandler(AsyncRequestTimeoutException.class)public String asyncRequestTimeoutHandler(AsyncRequestTimeoutException e) {System.out.println("异步请求超时");return "304";}
}
SseEmitter.event()
用来得到一个记录数据的容器。.data("帅气值暴增" + sseId)
发送给客户端的数据。.id("" + sseId)
记录发送数据的标识,服务端可以通过HttpServletRequest的请求头中拿到这个id,判断是否中间有误漏发数据。.reconnectTime(3000)
定义在网络连接断开后,客户端向后端发起重连的时间间隔(以毫秒为单位)。
  • 客户端:
    注:若浏览器不兼容在页面引入evensource.js。

    <script src=/eventsource-polyfill.js></script>
    
    <!DOCTYPE html>
    <html lang="en">
    <head><meta charset="UTF-8"><title> Springboot集成SSE升级版</title>
    </head>
    <script>let source = null;const clientId = new Date().getTime();if (!!window.EventSource) {source = new EventSource('/sse/create?clientId=' + clientId);//建立连接source.onopen = function (event) {setMessageInnerHTML("建立连接" + event);}//接收数据source.onmessage = function (event) {setMessageInnerHTML(event.data);}//错误监听source.onerror = function (event) {if (event.readyState === EventSource.CLOSED) {setMessageInnerHTML("连接关闭");} else {console.log(event);}}} else {setMessageInnerHTML("浏览器不支持SSE");}// 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据window.onbeforeunload = function () {close();};// 关闭Sse连接function close() {source.close();const httpRequest = new XMLHttpRequest();httpRequest.open('GET', '/sse/close/?clientId=' + clientId, true);httpRequest.send();console.log("close");}// 显示消息function setMessageInnerHTML(innerHTML) {document.getElementById('text').innerHTML += innerHTML + '<br/>';}
    </script>
    <body>
    <button onclick="close()">关闭连接</button>
    <div id="text"></div>
    </body>
    </html>
    

SSE常见问题

  1. 如果项目中使用nginx对后端服务做了代理,nginx代理转发后,默认会在1min的时候断掉长链接,SSE需要设置自己的长链接时间,则需要在nginx中进行配置;

    在反向代理的location块中加入如下配置
    proxy_set_header Host $http_host;  ##proxy_set_header用来重定义发往后端服务器的请求头
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;
    proxy_buffering off;
    proxy_http_version  1.1;
    proxy_read_timeout 600s; ##设置SSE长链接保持时间为 600s
    
  2. 前端报错:EventSource’s response has a MIME type (“application/json”) that is not “text/event-stream”. Aborting the connection
    前端在创建SSE长链接时,完整的请求(包括参数和参数值)都必须放在new EventSource(完整请求)中;

  3. 创建长链接时,接口状态一直处于pending,检查后端nginx是否做相应配置;

  4. 推送消息失败:检查客户端创建链接时的id,和推送消息时的id是否一致;

5.websocket

特点

  • WebSocket是html5出现的一种在·TCP连接上进行全双工通信的协议
  • 浏览器和服务器仅需一次握手,就可以建立持久性的连接,并进行双向数据传输
  • WebSocket目前支持ws和wss两种模式,对应HTTP和HTTPS。

websocket运用场景:

  • 即时通讯:多媒体聊天,你可以使用该技术开个聊天室,聊个火热。可以单独2人聊个畅快。
  • 互动游戏:现在多人游戏越来越火热,那么多人游戏必须是时时的,不考虑其他因素,只是时效性方面,也可以用该技术做多人游戏。
  • 协同合作:开发人员会有git,svn等代码管理工具,但还是会有冲突。用此技术开发一个文档协同编辑工具,使每个人的编辑内容都时时同步,将不会有冲突发生。
  • 动态数据表报:类似通知变更,如有需求,可以开发一个时时的数据报表,使用此技术,服务端数据发生变化,可在表报上立刻显示出来。如,电商平台的交易数据,每时每刻都在变化着,可以时时监控。
  • 实时工具:如导航,实时查询工具等也可使用。

支持WebSocket的主流浏览器如下:

  • Chrome
  • Firefox
  • IE >= 10
  • Sarafi >= 6
  • Android >= 4.4
  • iOS >= 8

5.1.原生WebSocket-客户端的简单示例

var ws = new WebSocket("wss://echo.websocket.org");ws.onopen = function(evt) { console.log("Connection open ..."); ws.send("Hello WebSockets!");
};ws.onmessage = function(evt) {console.log( "Received Message: " + evt.data);ws.close();
};ws.onclose = function(evt) {console.log("Connection closed.");
};

5.2.原生WebSocket-客户端的 API

1.构造函数

  • WebSocket 对象作为一个构造函数,用于新建 WebSocket 实例。
var ws = new WebSocket('ws://localhost:8080');
  • 执行上面语句之后,客户端就会与服务器进行连接。
    实例对象的所有属性和方法清单,参见这里。

2.属性

webSocket.readyState
  • 实例对象的readyState属性返回实例对象的当前状态,共有四种。
CONNECTING:值为0,表示连接尚未建立
OPEN:值为1,表示连接成功,可以通信了。
CLOSING:值为2,表示连接正在关闭。
CLOSED:值为3,表示连接已经关闭,或者打开连接失败。
webSocket.bufferedAmount
  • 实例对象的bufferedAmount属性,表示还有多少字节的二进制数据没有发送出去。它可以用来判断发送是否结束。
var data = new ArrayBuffer(10000000);
socket.send(data);if (socket.bufferedAmount === 0) {// 发送完毕
} else {// 发送还没结束
}

3.事件

webSocket.onopen
  • 实例对象的onopen属性,用于指定连接成功后的回调函数。
ws.onopen = function () {ws.send('Hello Server!');
}
  • 如果要指定多个回调函数,可以使用addEventListener方法。

ws.addEventListener('open', function (event) {ws.send('Hello Server!');
});
webSocket.onclose
  • 实例对象的onclose属性,用于指定连接关闭后的回调函数。

ws.onclose = function(event) {var code = event.code;var reason = event.reason;var wasClean = event.wasClean;// handle close event
};ws.addEventListener("close", function(event) {var code = event.code;var reason = event.reason;var wasClean = event.wasClean;// handle close event
webSocket.onmessage
  • 实例对象的onmessage属性,用于指定收到服务器数据后的回调函数。

ws.onmessage = function(event) {var data = event.data;// 处理数据
};ws.addEventListener("message", function(event) {var data = event.data;// 处理数据
});
  • 注意,服务器数据可能是文本,也可能是二进制数据(blob对象或Arraybuffer对象)。
ws.onmessage = function(event){if(typeof event.data === String) {console.log("Received data string");}if(event.data instanceof ArrayBuffer){var buffer = event.data;console.log("Received arraybuffer");}
}
  • 除了动态判断收到的数据类型,也可以使用binaryType属性,显式指定收到的二进制数据类型。

// 收到的是 blob 数据
ws.binaryType = "blob";
ws.onmessage = function(e) {console.log(e.data.size);
};// 收到的是 ArrayBuffer 数据
ws.binaryType = "arraybuffer";
ws.onmessage = function(e) {console.log(e.data.byteLength);
};
webSocket.onerror
  • 实例对象的onerror属性,用于指定报错时的回调函数。
socket.onerror = function(event) {// handle error event
};socket.addEventListener("error", function(event) {// handle error event
});

4.方法

webSocket.send()
  • 实例对象的send()方法用于向服务器发送数据。

发送文本的例子。

ws.send('your message');

发送 Blob 对象的例子。

var file = document.querySelector('input[type="file"]').files[0];
ws.send(file);

发送 ArrayBuffer 对象的例子。

// Sending canvas ImageData as ArrayBuffer
var img = canvas_context.getImageData(0, 0, 400, 320);
var binary = new Uint8Array(img.data.length);
for (var i = 0; i < img.data.length; i++) {binary[i] = img.data[i];
}
ws.send(binary.buffer);
webSocket.close()
  • 关闭连接

6.具体实现

常用的 Node 实现

  • WebSockets
  • Socket.IO
  • WebSocket-Node

常用的 Java实现

  • 使用tomcat的websocket实现

    • Tomcat的方式需要tomcat 7.x,Java7的支持。
  • 使用spring的websocket
    • spring与websocket整合需要spring 4.x,并且使用了socketjs,对不支持websocket的浏览器可以模拟websocket使用

Tomcat实现websocket

  • 使用这种方式无需任何配置,只需服务端一个处理类

服务端

使用@ServerEndpoint标注当前类为一个websocket服务器,客户端可以通过ws://localhost:8088/webSocketByTomcat/10086来连接到WebSocket服务器端。

@ServerEndpoint("/webSocketByTomcat/{username}")
public class WebSocketServer {//在线人数private static int onlineCount = 0;//存储会话private static Map<String, WebSocketServer> clients = new ConcurrentHashMap<>();//当前会话private Session session;//当前用户private String username;//建立连接@OnOpenpublic void onOpen(@PathParam("username") String username, Session session) throws IOException {this.username = username;this.session = session;//自增在线人数addOnlineCount();//存储当前会话clients.put(username, this);System.out.println("已连接");}//连接关闭@OnClosepublic void onClose() throws IOException {//移除当前会话clients.remove(username);//自减在线人数subOnlineCount();}//发送消息客户客户端@OnMessagepublic void onMessage(String message) throws IOException {JSONObject jsonTo = JSONObject.fromObject(message);//单独发if (!jsonTo.get("To").equals("All")){sendMessageTo("给一个人", jsonTo.get("To").toString());}//群发else{sendMessageAll("给所有人");}}//连接失败@OnErrorpublic void onError(Session session, Throwable error) {error.printStackTrace();}//发送消息给指定客户端public void sendMessageTo(String message, String to) throws IOException {// session.getBasicRemote().sendText(message);//session.getAsyncRemote().sendText(message);for (WebSocketServer item : clients.values()) {if (item.username.equals(to) ) {item.session.getAsyncRemote().sendText(message);}}}//发送消息给所有客户端public void sendMessageAll(String message) throws IOException {for (WebSocketServer item : clients.values()) {item.session.getAsyncRemote().sendText(message);}}//获取在线人数public static synchronized int getOnlineCount() {return onlineCount;}//自增在线人数public static synchronized void addOnlineCount() {WebSocketServer.onlineCount++;}//自减在线人数public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; }//获取所有客户端public static synchronized Map<String, WebSocketServer> getClients() {return clients;}
}

前端

客户端

  • 前端初始化WebSocket连接,并监听连接状态,接收服务端数据或向服务端发送数据。
  • 注意导入sockjs时要使用地址全称,并且连接使用的是http而不是websocket的ws
<%@ page language="java" import="java.util.*" pageEncoding="utf-8"%>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c"%>
<%@ taglib uri="http://java.sun.com/jsp/jstl/fmt" prefix="fmt"%>
<c:set var="ctx" value="${pageContext.request.contextPath}" />
<c:set var="ctxpath"value="${pageContext.request.scheme}${'://'}${pageContext.request.serverName}${':'}${pageContext.request.serverPort}${pageContext.request.contextPath}" />
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta charset=UTF-8">
<title>登录测试</title>
</head>
<body><h2>Hello World!</h2><div><span>sessionId:</span><% HttpSession s= request.getSession(); out.println(s.getId());%></div><input id="sessionId" type="hidden" value="<%=session.getId() %>" /><input id="text" type="text" /><button onclick="send()">发送消息</button><hr /><button onclick="closeWebSocket()">关闭WebSocket连接</button><hr /><div id="message"></div>
</body>
<script type="text/javascript" src="http://localhost:8088/static/js/sockjs-0.3.min.js"></script>
<script type="text/javascript">  //初始化websocket连接var websocket = null;  if('WebSocket' in window) {websocket = new WebSocket("ws://localhost:8088/websocket/webSocketByTomcat/"+document.getElementById('sessionId').value);  } else if('MozWebSocket' in window) {websocket = new MozWebSocket("ws://localhost:8088/websocket/webSocketByTomcat/"+document.getElementById('sessionId').value);} else {websocket = new SockJS("localhost:8088/websocket/webSocketByTomcat/"+document.getElementById('sessionId').value);}// 获取连接状态console.log('ws连接状态:' + ws.readyState);//连接发生错误的websocket.onerror = function () {  setMessageInnerHTML("WebSocket连接发生错误");  };  //连接成功websocket.onopen = function () {  setMessageInnerHTML("WebSocket连接成功");  }  //接收到服务端消息websocket.onmessage = function (event) {  setMessageInnerHTML(event.data);  }  //连接关闭websocket.onclose = function () {  setMessageInnerHTML("WebSocket连接关闭");  }  //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。  window.onbeforeunload = function () {  closeWebSocket();  }  //将消息显示在网页上  function setMessageInnerHTML(innerHTML) {  document.getElementById('message').innerHTML += innerHTML + '<br/>';  }  //关闭WebSocket连接  function closeWebSocket() {  websocket.close();  }  //发送消息  function send() {  var message = document.getElementById('text').value;  websocket.send(message);  }  </script>
</html>
  • websocket.send(“发送消息”),会触发服务端的onMessage()方法
  • 连接建立成功时调用send(),可以在服务器端onOpen()方法,接收到消息。
  • 关闭websocket时,触发服务器端onclose()方法,此时也可以发送消息,但是不能发送给自己,因为自己的已经关闭了连接,但是可以发送给其他人

SpringBoot整合websocket

引入依赖

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

具体实现

  • 继承TextWebSocketHandler类实现WebSocketHandler 接口进行消息处理,如是发给一个人,还是发给所有人,以及前端连接时触发的一些事件
服务端
/*** WebSocket server*/
@Service
@Slf4j
public class CustomWebSocketHandler extends TextWebSocketHandler implements WebSocketHandler {// 在线用户列表private static final Map<String, WebSocketSession> clients = new HashMap<>();// 用户标识private static final String CLIENT_ID = "mchNo";/*** 连接成功时候,onopen方法()*/@Overridepublic void afterConnectionEstablished(WebSocketSession session) throws Exception {log.info("成功建立websocket-spring连接");String clientId = getClientId(session);if (StringUtils.isNotEmpty(clientId)) {//存储会话clients.put(clientId, session);session.sendMessage(new TextMessage("成功建立websocket-spring连接"));log.info("用户标识:{},Session:{}", clientId, session.toString());}}/*** 调用websocket.send()时候,会调用该方法*/@Overridepublic void handleTextMessage(WebSocketSession session, TextMessage message) {log.info("收到客户端消息:{}", message.getPayload());JSONObject msgJson = JSONObject.parseObject(message.getPayload());//接受标识String to = msgJson.getString("to");//接受消息String msg = msgJson.getString("msg");WebSocketMessage<?> webSocketMessageServer = new TextMessage("server:" + message);try {session.sendMessage(webSocketMessageServer);//广播到所有在线用户if ("all".equals(to.toLowerCase())) {sendMessageToAllUsers(new TextMessage(getClientId(session) + ":" + msg));}//单独发送else {sendMessageToUser(to, new TextMessage(getClientId(session) + ":" + msg));}} catch (IOException e) {log.info("handleTextMessage method error:{}", e);}}@Overridepublic void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {if (session.isOpen()) {session.close();}log.info("连接出错");clients.remove(getClientId(session));}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {log.info("连接已关闭:" + status);clients.remove(getClientId(session));}@Overridepublic boolean supportsPartialMessages() {return false;}public void sendMessage(String jsonData) {log.info("收到客户端消息sendMessage:{}", jsonData);JSONObject msgJson = JSONObject.parseObject(jsonData);String clientId = StringUtils.isEmpty(msgJson.getString(CLIENT_ID)) ? "陌生人" : msgJson.getString(CLIENT_ID);String to = msgJson.getString("to");String msg = msgJson.getString("msg");if ("all".equals(to.toLowerCase())) {sendMessageToAllUsers(new TextMessage(clientId + ":" + msg));}else {sendMessageToUser(to, new TextMessage(clientId + ":" + msg));}}/*** 发送信息给指定用户*/public boolean sendMessageToUser(String clientId, TextMessage message) {if (clients.get(clientId) == null) {return false;}WebSocketSession session = clients.get(clientId);log.info("sendMessage:{} ,msg:{}", session, message.getPayload());if (!session.isOpen()) {log.info("客户端:{},已断开连接,发送消息失败", clientId);return false;}try {session.sendMessage(message);} catch (IOException e) {log.info("sendMessageToUser method error:{}", e);return false;}return true;}/*** 广播信息-给所有在线用户发送消息*/public boolean sendMessageToAllUsers(TextMessage message) {boolean allSendSuccess = true;Set<String> clientSet = clients.keySet();WebSocketSession session = null;for (String clientId : clientSet) {try {session = clients.get(clientId);if (session.isOpen()) {session.sendMessage(message);}else {log.info("客户端:{},已断开连接,发送消息失败", clientId);}} catch (IOException e) {log.info("sendMessageToAllUsers method error:{}", e);allSendSuccess = false;}}return allSendSuccess;}/*** 获取用户标识*/private String getClientId(WebSocketSession session) {try {return session.getAttributes().get(CLIENT_ID).toString();} catch (Exception e) {return null;}}
}
  • 如果把websocketSession和httpsession对应起来就能根据当前不同的session,定向对websocketSession进行数据返回

    • spring中有一个拦截器接口,HandshakeInterceptor,通过实现该接口来拦截握手过程,向其中添加属性
/*** WebSocket握手时的拦截器*/
@Slf4j
public class CustomWebSocketInterceptor implements HandshakeInterceptor {/*** 关联HeepSession和WebSocketSession,* beforeHandShake方法中的Map参数 就是对应websocketSession里的属性*/@Overridepublic boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler handler, Map<String, Object> map) throws Exception {if (request instanceof ServletServerHttpRequest) {log.info("*****beforeHandshake******");HttpServletRequest httpServletRequest = ((ServletServerHttpRequest) request).getServletRequest();HttpSession session = httpServletRequest.getSession(true);log.info("clientId:{}", httpServletRequest.getParameter("clientId"));if (session != null) {map.put("sessionId",session.getId());map.put("clientId", httpServletRequest.getParameter("clientId"));}}return true;}@Overridepublic void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {log.info("******afterHandshake******");}
}

配置类注入handler

/*** websocket的配置类*/
@Configuration
@EnableWebSocket
public class CustomWebSocketConfig implements WebSocketConfigurer {@Overridepublic void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(customWebSocketHandler(), "/webSocketBySpring/customWebSocketHandler").addInterceptors(new CustomWebSocketInterceptor()).setAllowedOrigins("*");registry.addHandler(customWebSocketHandler(), "/sockjs/webSocketBySpring/customWebSocketHandler").addInterceptors(new CustomWebSocketInterceptor()).setAllowedOrigins("*").withSockJS();}@Beanpublic WebSocketHandler customWebSocketHandler() {return new CustomWebSocketHandler();}
}
  • setAllowedOrigins("*")一定要加上,不然只有访问localhost,其他的不予许访问

    • 经查阅官方文档spring-websocket 4.1.5版本前默认支持跨域访问,之后的版本默认不支持跨域,需要设置

使用withSockJS()的原因:

  • 一些浏览器中缺少对WebSocket的支持,因此,回退选项是必要的,而Spring框架提供了基于SockJS协议的透明的回退选项。

    • SockJS的一大好处在于提供了浏览器兼容性。·优先使用原生WebSocket,如果在不支持websocket的浏览器中,会自动降为轮询的方式·。

如果代码中添加了withSockJS()如下,服务器也会自动降级为轮询

registry.addEndpoint("/coordination").withSockJS();
前端
  • 同上《Tomcat实现websocket》jsp代码,替换webSocket的请求路径即可

【JavaWeb】小白也能看懂的服务器推送技术(WebSocket和SSE)相关推荐

  1. JS 服务器推送技术 WebSocket 入门指北

    作者: 前端下午茶  公号 / SHERlocked93 最近在工作中遇到了需要服务器推送消息的场景,这里总结一下收集整理WebSocket相关资料的收获. 1. 概述 1.1 服务器推送 WebSo ...

  2. 服务器推送技术之短轮询、长轮询、SSE和Websocket

    服务器推送技术 服务器推送技术干嘛用?就是让用户在使用网络应用的时候,不需要一遍又一遍的去手动刷新就可以及时获得更新的信息.大家平时在上各种视频网站时,对视频节目进行欢乐的吐槽和评论,会看到各种弹幕, ...

  3. 网络编程五-服务器推送技术

    目录 一.服务器推送技术 1.服务器推送技术的兴起 2.应用场景 二.Ajax短轮询 1.定义 2.特点 三.Comet 3.1 AJAX 的长轮询 1.定义 2.特点 3.2 SSE 1.定义 2. ...

  4. java web 服务器推送技术--comet4j

    1.背景 首先实现服务器推送技术一直一来是B/S应用开发的一块难题,因为是基于HTTP协议的,HTTP协议为无状态,单向性的协议,即,必须由客户端发起一个请求建立连接,服务器接收请求,把数据返回给客户 ...

  5. 深入了解 cometd的服务器推送技术

    简介:服务器推送技术已经出来一段时间了,业界上也有不少基于这种技术(应该说是设计模式)的开源实现,但是要移植或者说应用到自己的项目上都比较麻烦.Dojo 这样一个大型的 Web2.0 开发框架提供了一 ...

  6. Python Web实时消息后台服务器推送技术---GoEasy

    越来越多的项目需要用到实时消息的推送与接收,怎样实现最方便呢?我这里推荐大家使用GoEasy,它是一款第三方推送服务平台,使用它的API可以轻松搞定实时推送! 浏览器兼容性:GoEasy推送 支持we ...

  7. java推送技术_java网络编程 - java服务器推送技术系列方案实战

    前言 服务器推送技术(又名Comet)是建立在ARP基础之上的一种非常实用的技术,它广泛应用于Web端,手机APP应用端等.具体很多场景都需要此技术的支撑,包括扫码登录.扫码支付.网页支付.端到端消息 ...

  8. C# Web实时消息后台服务器推送技术---GoEasy

    越来越多的项目需要用到实时消息的推送与接收,怎样实现最方便呢?我这里推荐大家使用GoEasy, 它是一款第三方推送服务平台,使用它的API可以轻松搞定实时推送! 浏览器兼容性:GoEasy推送 支持w ...

  9. ASP.NET Web实时消息后台服务器推送技术---GoEasy

    越来越多的项目需要用到实时消息的推送与接收,怎样用ASP.NET实现最方便呢?我这里推荐大家使用GoEasy, 它是一款第三方推送服务平台,使用它的API可以轻松搞定实时推送! 浏览器兼容性:GoEa ...

最新文章

  1. 造成机器学习项目失败的7个原因
  2. 分析轮子(二)- ,, (左移、右移、无符号右移)
  3. Java -- JDBC 学习--通过 ResultSet 执行查询操作
  4. java map removeall_Java删除Map中元素
  5. [Apache]安装中出现的问题
  6. 抽奖啦!量子位原创T恤任性送,夏天就要酷酷的
  7. CheckBox jsp+javaScript多项选择checkbox取值实现
  8. SharePoint 实现ajax异步加载数据的几种方式
  9. 深度探索C++对象模型读书笔记(2)
  10. 基于微信小程序的校园第二课堂活动报名系统+后台管理系统(Springboot+mysql)-JAVA.VUE【数据库设计、论文、源码、开题报告】
  11. 中国象棋棋盘java_JAVA中用程序绘制国际象棋与中国象棋棋盘
  12. 区块链重塑经济与世界
  13. Asp.NET Core+ABP框架+IdentityServer4+MySQL+Ext JS之部署到Linux
  14. 【沽泡学院07】基于ElasticSearch搜索附近的人
  15. vmware安装winxp
  16. 偏度与峰度(附python代码)
  17. c语言-求两个数的最小公倍数
  18. WebView部分源码概览
  19. 群集共享卷(CSV)
  20. 兴业银行信息科技类笔试分享

热门文章

  1. Android-GnssHal层gps.xxx.so查找与加载过程分析
  2. JVM 问题排查分析下篇(案例实战)
  3. Java项目:SSM餐厅点餐收银管理系统
  4. 如何把mkv转成mp4?
  5. 蒲福风力等级c语言编程,风力等级划分标准(蒲福风级表)
  6. 以下选项中不能用作c语言标识符,2018年3月计算机二级考试C语言考前特训习题3...
  7. Sublime Text 4 编译 LaTeX文档后总是新打开一个 Sublime Text 4 的解决方法
  8. 饱暖思*欲,一个数藏平台看出多少lsp
  9. 05UEc++【打飞艇:飞艇的运动与生成】
  10. 以太坊:普通人的电子纹身