c++ websocket客户端_websocket使用
一、介绍
在项目开发过程中,很多时候,我们不可避免的需要实现的一个功能:
服务端实时发送信息给客户端。比如实时公告、实时订单通知、实时报警推送等等,登录后的客户端需要知道与它相关的实时信息,以便进行下一步处理。
从事服务端开发的特别是C/C++开发的技术人员都知道,客户端可以通过套接口与服务端保持套接口长连接。这样就服务端就可以实时给客户端推送信息了,但是这是针对TCP的长连接,如果是针对HTTP协议(在TCP层之上的实现了超文本协议的短链接--一般情况下短链接),实现服务端与客户端通知一般有一下两种方式:
1、HTTP轮询
一般情况下,http是短链接,也就是请求响应式的,每一次请求都对应一次回复,回复完成后连接断开,这样做的好处就是不需要保持与服务端的长连接,因为HTTP协议底层还是TCP协议,服务端根据机器性能都有一个最大的套接口连接数限制。以windows为例子,如windows下TCP连接数受多个参数影响:
最大tcp连接数
[HKEY_LOCAL_MACHINE\System\CurrentControlSet\Services\Tcpip\Parameters]TcpNumConnections = 0x00fffffe (Default = 16,777,214)
以上注册表信息配置单机的最大允许的TCP连接数,默认为 16M。这个数值看似很大,这个并不是限制最大连接数的唯一条件,还有其他条件会限制到TCP 连接的最大连接数。
最大动态端口数
TCP客户端和服务器连接时,客户端必须分配一个动态端口,默认情况下这个动态端口的分配范围为 1024-5000 ,也就是说默认情况下,客户端最多可以同时发起3977个Socket连接。我们可以修改如下注册表来调整这个动态端口的范围.
[HKEY_LOCAL_MACHINE\System\CurrentControlSet\Services\Tcpip\Parameters]MaxUserPort = 5000 (Default = 5000, Max = 65534)
最大调整值65535,也就是最大能有6w多tcp连接。
最大TCB数量
系统为每个TCP连接分配一个TCP控制块(TCP control block or TCB),这个控制块用于缓存TCP连接的一些参数,每个TCB需要分配 0.5 KB的pagepool 和 0.5KB 的Non-pagepool,也就说,每个TCP连接会占用 1KB 的系统内存。换句话说TCP的连接数也受到系统的内存的限制。系统的最大TCB数量由如下注册表设置决定:
[HKEY_LOCAL_MACHINE\System\CurrentControlSet\Services\Tcpip\Parameters]MaxFreeTcbs = 2000 (Default = RAM dependent, but usual Pro = 1000, Srv=2000)
非Server版本,MaxFreeTcbs 的默认值为1000(64M 以上物理内存),Server 版本,这个的默认值为 2000。也就是说,默认情况下,Server版本最多同时可以建立并保持2000个TCP连接。
最大TCB Hash table数量
TCB是通过Hash table来管理的,下面注册表设置决定了这个Hash table的大小
HKEY_LOCAL_MACHINE \System \CurrentControlSet \services \Tcpip \Parameters]MaxHashTableSize = 512 (Default = 512, Range = 64-65536)
指明分配pagepool内存的数量,也就是说,如果MaxFreeTcbs = 1000, 则pagepool的内存数量为500KB那么 MaxHashTableSize应大于500才行。这个数量越大,则Hash table的冗余度就越高,每次分配和查找TCP.这里 MaxHashTableSize被配置为比MaxFreeTcbs大4倍,这样可以大大增加TCP建立的速度。
知道了底层TCP限制之后,我们可以知道实际上长连接在普通的windows机器上最多大概是1000路左右,也就并发是1000个http长连接。如果将长连接改为http短链接,http请求完成后立即释放,那么服务端的并发就会大大增加,如果请求速度不太耗时,服务端的并发量有可能达到1w或者更大!!!c
使用HTTP轮询,就是使用HTTP短链接模式,定期与服务端进行通信主动获取服务端信息的方式实现服务端“推送”信息至客户端,它有如下特点:
避免与服务端的长连接,减低服务端压力,提升服务端的并发访问能力
客户端主动与服务端通信,需要定期与服务端进行轮询查询获取信息,但对客户端而言存在延迟,延迟时间最大为轮询时间。
服务端需要做额外的工作包保存一些实时数据,等待客户端拉取。
2、websocket
http长轮询因为存在信息延迟的问题,有时候,我们需要实时收到服务端推送的信息就无法避免使用websocket了。在前面我已经说到,websocket实际上也是http升级upgrade之后的tcp长连接,长连接的数量限制经过调整后最大能有(65525-1024)= 64501个长连接(在内存、句柄数等不设限情况下)。但实际测试,可能服务端的websocket连接数可能维持的2w左右(经过实际测试),如果改为云主机,连接数可能达到6w左右。如果需要更多了连接,我们可以考虑集群的方式,如n台高性能机器能支持最大n*6w的websocket连接!!它有如下优点:
WebSocket一次握手就可以使客户端和服务端建立长连接,并进行双向数据传输
服务端可主动向客户端发送信息,实时性很高
与HTTP协议比起来,WebSocket协议每次数据传输的头信息都较小,节约带宽
针对浏览器本身,连接后台最大的websocket数量也是有限制的,以下是我搜索到的各个浏览器支持的最大websocket连接数:
IE 6个chrome 256个Firefox 200个safari 1273个(MAC版本)
超过各个浏览器最大数,后台就收不到请求。
二、 websocket实现
说明了原理之后,接下来就是如何实现websocket,这里我提供了几种实现方式
1、J2EE7自带的原始实现
服务端实现
WebSocket是JavaEE7新支持的,Javax.websocket.server包含注解、类、接口用于创建和配置服务端点;Javax.websocket包则包含服务端点和客户断电公用的注解、类、接口、异常,创建一个注解式的端点,将自己的写的类以及类中的一些方法用前面提到的包中的注解装饰,这里我提供了一个基本的websocket实现接口,提供了连接、关闭、接收消息、发送消息接口:
package com.easystudy.websocket;
import java.io.IOException;import javax.websocket.Session;import lombok.extern.slf4j.Slf4j;
/** * @欢迎加入群聊,一起分享,一起合作,一起进步 * QQ交流群:961179337 * 微信账号:lixiang6153 * 微信公众号:IT技术快餐 * 电子邮箱:lixx2048@163.com */@Slf4jpublic abstract class BaseWS { /** * 终端初次连接 * @param userId userId * @param session session * @throws IOException IOException */ abstract void onOpen(Session session, Long userId) throws IOException;
/** * 终端断开连接 */ abstract void onClose();
/** * 终端传递参数 * @param session session * @param message message */ abstract void onMessage(String message, Session session);
/** * 报错 * @param session session * @param error error */ abstract void onError(Session session, Throwable error);
/** * 向终端发送 * @param message message * @throws IOException IOException */ abstract void sendMessage(String message) throws IOException;
void heartBeat(Long user, String signal, Session session) { if ("ping".equalsIgnoreCase(signal)) { try { log.info("heart beat=====> {},user:{}, sessionId:{}", signal, user, session.getId()); session.getBasicRemote().sendText("pong"); log.info("heart beat<====> {},user:{}, sessionId:{}", "pong", user, session.getId()); } catch (IOException e) { e.printStackTrace(); } } }}
websocket端点实现:
package com.easystudy.websocket;
import java.io.IOException;import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.OnClose;import javax.websocket.OnError;import javax.websocket.OnMessage;import javax.websocket.OnOpen;import javax.websocket.Session;import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;
import org.springframework.stereotype.Component;
/** * @ServerEndpoint 该注解可以将类定义成一个WebSocket服务器端, * @OnOpen 表示有浏览器链接过来的时候被调用 * @OnClose 表示浏览器发出关闭请求的时候被调用 * @OnMessage 表示浏览器发消息的时候被调用 * @OnError 表示报错了 * @欢迎加入群聊,一起分享,一起合作,一起进步 * QQ交流群:961179337 * 微信账号:lixiang6153 * 微信公众号:IT技术快餐 * 电子邮箱:lixx2048@163.com */@Component@ServerEndpoint("/ws/msg/{userid}")public class MessageEndPoint extends BaseWS { // concurrent包下线程安全的Set private static final CopyOnWriteArraySet SESSIONS = new CopyOnWriteArraySet<>(); private Session session; @Override @OnOpen public void onOpen(Session session, @PathParam("userid") Long userid) { this.session = session; SESSIONS.add(this); System.out.println(String.format(userid + "成功建立连接~ 当前总连接数为:%s", SESSIONS.size())); System.out.println(this); } @Override @OnClose public void onClose() { SESSIONS.remove(this); System.out.println(String.format("成功关闭连接~ 当前总连接数为:%s", SESSIONS.size())); } @Override @OnMessage public void onMessage(String message, Session session) { System.out.println("收到客户端【" +session.getId()+ "】消息:" + message); } @Override @OnError public void onError(Session session, Throwable error) { System.out.println("发生错误"); error.printStackTrace(); } /** * 指定发消息 * @param message */ public void sendMessage(String message) { try { this.session.getBasicRemote().sendText(message); } catch (IOException e) { e.printStackTrace(); } } /** * 群发消息: 静态方法 * @param message */ public static void fanoutMessage(String message) { SESSIONS.forEach(ws -> ws.sendMessage(message)); }}
我们监听的端点是
/ws/msg/{userid}
端点携带了一个参数userid,表示长连接的用户id,我们在对应的方法实现中通过注解引用对应参数即可。
我们使用J2EE7标准注解,必须注入相应的ServerEndpointExporter类:
@Configurationpublic class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); }}
最后,我们提供一个controller用于测试服务端往发送客户端信息:
package com.easystudy.controller;
import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;
import com.easystudy.websocket.MessageEndPoint;
@RestController@RequestMapping("/response")public class TestController {
@GetMapping("/send") public String reponseMsgToClient(@RequestParam(name="content", required = true)String content){ System.out.println("发送消息:[" + content + "]给客户端!");
MessageEndPoint.fanoutMessage(content);
return "消息【" +content+ "】发送成功!"; }}
客户端实现
在实现websocket服务端之后,我们就需要实现websocket客户端,连接到服务器,接收服务端消息,实现代码如下所示:
websocket测试
WebSocket Demo
服务器回复内容:
发送
启动浏览器,打开控制台,可以看到连接到服务器字样,输入内容,点击发送,服务端后台打印接收到客户端信息并广播到客户端,客户端控制台也打印了相同字样。
2、springboot实现
除了J2EE原始实现之外,使用springboot之后,功能就更强大了,它提供了一个核心的配置类WebSocketConfigurer用于注册各种websocket端点、拦截器、处理器信息。如下我们通过继承WebSocketConfigurer配置对应端点、处理器和拦截器:
package com.easystudy.config;
import org.springframework.context.annotation.Configuration;import org.springframework.web.socket.config.annotation.EnableWebSocket;import org.springframework.web.socket.config.annotation.WebSocketConfigurer;import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import com.easystudy.websocket.MsgWebSocketHandler;import com.easystudy.websocket.MsgWebSocketInterceptor;
@Configuration@EnableWebSocketpublic class WebsocketConfig implements WebSocketConfigurer {
@Override public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { // 设置端点连接路径和处理器 registry.addHandler(new MsgWebSocketHandler(), "/ws/msg/{userid}") .setAllowedOrigins("*") // 设置拦截器 .addInterceptors(new MsgWebSocketInterceptor()); }
}
我们配置了自己的处理器处理对应端点、配置了拦截器进行信息拦截。
我这里的拦截器主要拦截请求参数,限定请求参数必须携带用户名作为连接的唯一标识:
package com.easystudy.websocket;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import org.springframework.http.server.ServerHttpRequest;import org.springframework.http.server.ServerHttpResponse;import org.springframework.http.server.ServletServerHttpRequest;import org.springframework.web.socket.WebSocketHandler;import org.springframework.web.socket.server.HandshakeInterceptor;
/** * 自定义拦截器拦截WebSocket请求 * @author Administrator * QQ交流群:961179337 * 微信账号:lixiang6153 * 微信公众号:IT技术快餐 * 电子邮箱:lixx2048@163.com */public class MsgWebSocketInterceptor implements HandshakeInterceptor{
/** * 前置拦截一般用来注册用户信息,绑定 WebSocketSession */ @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) throws Exception { System.out.println("前置拦截~~"); if (!(request instanceof ServletServerHttpRequest)) return true; // 获取用户名信息 HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest(); String path = servletRequest.getServletPath(); System.out.println("path:" + path); String userName = servletRequest.getParameter("userName"); //String userName = (String) servletRequest.getSession().getAttribute("userName"); if (null == userName) { userName = "lixx"; } // 保存属性到session属性信息中 attributes.put("userName", userName); return true; } /** * 后置拦截器 */ @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { System.out.println("后置拦截~~"); }}
拦截器获取到对应属性之后存入到session的会话属性之中,连接之后可以通过session获取会话属性。
处理器实现:
package com.easystudy.websocket;
import org.springframework.web.socket.CloseStatus;import org.springframework.web.socket.WebSocketHandler;import org.springframework.web.socket.WebSocketMessage;import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.*;import java.io.IOException;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;
/** * QQ交流群:961179337 * 微信账号:lixiang6153 * 微信公众号:IT技术快餐 * 电子邮箱:lixx2048@163.com */public class MsgWebSocketHandler implements WebSocketHandler{ private static final Map SESSIONS = new ConcurrentHashMap<>(); /** * 建立新的 socket 连接后回调的方法 */ @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { String userName = session.getAttributes().get("userName").toString(); SESSIONS.put(userName, session); System.out.println(String.format("成功建立连接~ userName: %s", userName)); } /** * 连接关闭时,回调的方法 */ @Override public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception { System.out.println("连接已关闭,status:" + closeStatus); } /** * 接收客户端发送的 Socket */ @Override public void handleMessage(WebSocketSession session, WebSocketMessage> message) throws Exception { String msg = message.getPayload().toString(); System.out.println("接收到消息:" + msg); } /** * 连接出错时,回调的方法 */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { System.out.println("连接出错"); if (session.isOpen()) { session.close(); } } /** * 这个是 WebSocketHandler是否处理部分消息,返回 false就完事了 */ @Override public boolean supportsPartialMessages() { return false; } /** * 指定发消息 * @param userName * @param message */ public static void sendMessage(String userName, String message) { WebSocketSession webSocketSession = SESSIONS.get(userName); if (webSocketSession == null || !webSocketSession.isOpen()) return; try { webSocketSession.sendMessage(new TextMessage(message)); } catch (IOException e) { e.printStackTrace(); } } /** * 群发消息 * @param message */ public static void fanoutMessage(String message) { SESSIONS.keySet().forEach(us -> sendMessage(us, message)); }}
在建立连接之后,我们通过:session.getAttributes().get("userName").toString();获取到连接时候提供的用户参数,用于后续指定用户P2P发送信息。
客户端实现代码如下:
测试
WebSocket Demo
服务器回复内容:
发送
最后测试发送信息如下:
3、socketJS实现
一些浏览器中缺少对WebSocket的支持,因此,回退选项是必要的,而Spring框架提供了基于SockJS协议的透明的回退选项。SockJS的一大好处在于提供了浏览器兼容性。优先使用原生WebSocket,如果在不支持websocket的浏览器中,会自动降为轮询的方式。
SockJS是一个浏览器JavaScript库,它提供了一个类似于网络的对象。SockJS提供了一个连贯的、跨浏览器的Javascript API,它在浏览器和web服务器之间创建了一个低延迟、全双工、跨域通信通道。除此之外,spring也对socketJS提供了支持。此处实现与springboot实现相似,这里不具体介绍,只给出对应代码,实现如下(后续提供的代码是实际项目上的代码,请各位保持修改个更新,谢谢!!!)。
端点、拦截器、通道等配置如下
package com.donwait.websocket;
import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.messaging.simp.config.ChannelRegistration;import org.springframework.messaging.simp.config.MessageBrokerRegistry;import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;import org.springframework.web.socket.config.annotation.StompEndpointRegistry;import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;
/** * 同 HTTP 在 TCP 套接字上添加 请求-响应 模型层一样,STOMP 在 WebSocket 之上提供了一个基于 帧的线路格式层,用来定义消息语义; * (STOMP在 WebSocket 之上提供了一个基于 帧的线路格式层,用来定义消息语义) * STOMP 帧:该帧由命令,一个或多个 头信息 以及 负载所组成。如下就是发送 数据的一个 STOMP帧: * SEND * destination:/app/marco * content-length:20 * * {\"message\":\"Marco!\"} * * 分析: * A1)SEND:STOMP命令,表明会发送一些内容; * A2)destination:头信息,用来表示消息发送到哪里; * A3)content-length:头信息,用来表示 负载内容的 大小; * A4)空行: * A5)帧内容(负载)内容: */@Configuration@EnableWebSocketMessageBroker // 能够在 WebSocket 上启用 STOMPpublic class WebSocketAutoConfig implements WebSocketMessageBrokerConfigurer { /* * 将 "/dys" 路径 注册为 STOMP 端点,即客户端在订阅或发布消息 到目的地址前,要连接该端点, * 就是说用户发送请求 url='/项目名/dys'与 STOMP server进行连接,之后再转发到订阅url * 端点的作用:客户端在订阅或发布消息 到目的地址前,要连接该端点 * 备注:client连接地址和发送地址是不同的,以本例为例,前者是/项目名/dys, 后者是/项目名/app/XX,先连接后发送 */ @Override public void registerStompEndpoints(StompEndpointRegistry registry) { // 在网页上我们就可以通过这个链接 /demon/websocket ==来和服务器的WebSocket连接 // 连接:new SockJS("http://127.0.0.1:7019/websocket/dys"); registry.addEndpoint("/dys") // 开启 /dys端点 .setAllowedOrigins("*") // 允许跨域访问 .setHandshakeHandler(new HandshakeHandler()) // 握手处理器 .addInterceptors(new HandshakeInterceptor()) // 握手拦截器 .withSockJS(); // 允许使用socketJs方式访问 } /* * 消息传输参数配置 */ @Override public void configureWebSocketTransport(WebSocketTransportRegistration registry) { registry.setMessageSizeLimit(8192) // 设置消息字节数大小 .setSendBufferSizeLimit(8192) // 设置消息缓存大小 .setSendTimeLimit(10000); // 设置消息发送时间限制毫秒 } /* * 输入通道参数设置 */ @Override public void configureClientInboundChannel(ChannelRegistration registration) { registration.taskExecutor().corePoolSize(8) // 设置消息输入通道的线程池线程数 .maxPoolSize(16) // 最大线程数 .keepAliveSeconds(60); // 线程活动时间 registration.interceptors(createUserInterceptor()); // 注入用户入站通道拦截器 } /* * 输出通道参数设置 */ @Override public void configureClientOutboundChannel(ChannelRegistration registration) { registration.taskExecutor().corePoolSize(8) .maxPoolSize(16); } /* * 配置broker: * 配置了一个 简单的消息代理。如果不重载,默认case下,会自动配置一个简单的内存消息代理, * 用来处理 "/topic"为前缀的消息。但经过重载后,消息代理将会处理前缀为 "/topic" and "/queue"消息 * 分析: * (1)应用程序的目的地 以 "/app" 为前缀,而代理的目的地以 "/topic" 和 "/queue" 作为前缀 * (2)以应用程序为目的地的消息将会直接路由到 带有 @MessageMapping注解的控制器方法中 * (3)而发送到代理上的消息,包括 @MessageMapping注解方法的返回值所形成的消息,将会路由到代理上,并最终发送到订阅这些目的地客户端 */ @Override public void configureMessageBroker(MessageBrokerRegistry registry) { // 代理的目的地址为topic或queque(代理目的地以 /topic为前缀) // 广播消息订阅:stompClient.subscribe('/topic/alarm', function (response) registry.enableSimpleBroker("/topic", "/queue"); // 全局使用的消息前缀(客户端订阅路径上会体现出来):应用程序前缀:js.url = "/demon/app/hello" -> @MessageMapping("/hello") 注释的方法. // 客户端发送端点前缀:stompClient.send("/app/hello", {}, JSON.stringify({ 'name': name })); registry.setApplicationDestinationPrefixes("/app"); // 点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/ // registry.setUserDestinationPrefix("/user/"); /* // 启用了STOMP代理中继功能,并将其代理目的地前缀设置为 /topic and /queue,并将所有目的地前缀为 "/topic" or "/queue"的消息都会发送到STOMP代理中[真正消息代理activeMQ或RabbitMQ] registry.enableStompBrokerRelay("/topic", "/queue") // 设置可以订阅的地址,也就是服务器可以发送的地址 .setRelayHost("192.168.12.18") .setRelayPort(5672) .setClientLogin("admin") .setClientPasscode("admin") .setSystemHeartbeatReceiveInterval(2000) // 设置心跳信息接收时间间隔 .setSystemHeartbeatSendInterval(2000); // 设置心跳信息发送时间间隔 // 应用程序前缀:js.url = "/demon/app/hello" -> @MessageMapping("/hello") 注释的方法. registry.setApplicationDestinationPrefixes("/app"); */ } /** * * @Title: createUserInterceptor * @Description: 将客户端渠道拦截器加入spring ioc容器 * @return */ @Bean public UserInterceptor createUserInterceptor() { return new UserInterceptor(); }}
握手拦截器配置:
package com.donwait.websocket;
import java.util.Map;
import org.springframework.http.server.ServerHttpRequest;import org.springframework.http.server.ServerHttpResponse;import org.springframework.web.socket.WebSocketHandler;import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;
import lombok.extern.slf4j.Slf4j;
@Slf4jpublic class HandshakeInterceptor extends HttpSessionHandshakeInterceptor {
@Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,Map attributes) throws Exception { log.info("============握手前==========="); /* // 解决The extension [x-webkit-deflate-frame] is not supported问题 if(request.getHeaders().containsKey("Sec-WebSocket-Extensions")) { request.getHeaders().set("Sec-WebSocket-Extensions", "permessage-deflate"); } // 检查session的值是否存在 if (request instanceof ServletServerHttpRequest) { ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request; HttpSession session = servletRequest.getServletRequest().getSession(false); String accountId = (String) session.getAttribute(Constants.SKEY_ACCOUNT_ID); //把session和accountId存放起来 attributes.put(Constants.SESSIONID, session.getId()); attributes.put(Constants.SKEY_ACCOUNT_ID, accountId); } */ return super.beforeHandshake(request, response, wsHandler, attributes); } @Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,Exception ex) { log.info("============握手后==========="); super.afterHandshake(request, response, wsHandler, ex); }}
用户拦截器配置:
package com.donwait.websocket;
import java.util.LinkedList;import java.util.List;import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.simp.SimpMessageHeaderAccessor;import org.springframework.messaging.simp.stomp.StompCommand;import org.springframework.messaging.simp.stomp.StompHeaderAccessor;import org.springframework.messaging.support.ChannelInterceptorAdapter;import org.springframework.messaging.support.MessageHeaderAccessor;
import com.donwait.amqp.RabbitMQ;import com.donwait.model.RtmpInviteInfo;import com.donwait.model.User;import com.donwait.protobuf.RTMP_INVITE_PARAM;import com.donwait.redis.RtmpInviteService;
/** * @ClassName: UserInterceptor * @Description: 客户端渠道拦截适配器 */@SuppressWarnings("deprecation")public class UserInterceptor extends ChannelInterceptorAdapter { @Autowired private RtmpInviteService redisRtmpInviteService; @Autowired private RabbitMQ rabbitMQ; //@Autowired //private UserCacheService userCacheService;
/** * 获取包含在stomp中的用户信息 */ @SuppressWarnings("rawtypes") @Override public Message> preSend(Message> message, MessageChannel channel) { StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (StompCommand.CONNECT.equals(accessor.getCommand())) { Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS); if (raw instanceof Map) { Object name = ((Map) raw).get("name"); if (name instanceof LinkedList) { // 设置当前访问器的认证用户 accessor.setUser(new User(((LinkedList) name).get(0).toString())); } } } return message; }
@Override public void postSend(Message> message, MessageChannel channel, boolean sent) { StompHeaderAccessor sha = StompHeaderAccessor.wrap(message); // ignore non-STOMP messages like heartbeat messages if(sha.getCommand() == null) { return; }
//这里的sessionId和accountId对应HttpSessionIdHandshakeInterceptor拦截器的存放key //String sessionId = sha.getSessionAttributes().get(Constants.SESSIONID).toString(); //String accountId = sha.getSessionAttributes().get(Constants.SKEY_ACCOUNT_ID).toString(); //判断客户端的连接状态 switch(sha.getCommand()) { case CONNECT: connect(sha); break; case CONNECTED: break; case DISCONNECT: disconnect(sha); break; default: break; } }
// 连接成功 private void connect(StompHeaderAccessor sha){ System.out.println(" STOMP 连接成功:" + sha.getUser().getName()); }
// 断开连接 private void disconnect(StompHeaderAccessor sha){ System.out.println(" STOMP 连接断开" + sha.getUser().getName());
// 移除用户信息 //userCacheService.delete(sha.getUser().getName());
String strKey = String.format("rtmp_invite_info::%s_*", sha.getUser().getName()); List invite_list = redisRtmpInviteService.findByKeyEx(strKey); if (invite_list != null) { for(RtmpInviteInfo rtmpInviteInfo : invite_list){ // 通知接入服务器 RTMP_INVITE_PARAM.Builder builder = RTMP_INVITE_PARAM.newBuilder(); builder.setRtmpIP(rtmpInviteInfo.getRtmpIp()); builder.setRtmpPort(rtmpInviteInfo.getRtmpPort()); builder.setDevID(rtmpInviteInfo.getDevId()); builder.setProtocolType(rtmpInviteInfo.getProtoType()); builder.setStreamType(rtmpInviteInfo.getStreamType()); rabbitMQ.send(rtmpInviteInfo.getExchangeName(), rtmpInviteInfo.getRouteKey(), builder.build().toByteArray()); strKey = String.format("%s::%s_%s_%d_%d_%d", rtmpInviteInfo.getCacheName(), sha.getUser().getName(), rtmpInviteInfo.getDevId(), rtmpInviteInfo.getChannelNum().longValue(), rtmpInviteInfo.getProtoType().longValue(), rtmpInviteInfo.getStreamType().longValue()); redisRtmpInviteService.deleteByKey(strKey); } } }}
处理器代码:
package com.donwait.websocket;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4jpublic class HandshakeHandler extends DefaultHandshakeHandler{
public HandshakeHandler(){ log.debug("new HandshakeHandler"); }}
配置完成之后,需要封装一个消息服务实现点对点和广播形式发送:
package com.donwait.service;
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.simp.SimpMessageSendingOperations;import org.springframework.messaging.simp.user.SimpUser;import org.springframework.messaging.simp.user.SimpUserRegistry;import org.springframework.stereotype.Service;
/** * websocket广播推送服务 * @author Administrator * */@Servicepublic class MessageService { @Autowired SimpMessageSendingOperations sendOperation; // 消息发送模板 @Autowired private SimpUserRegistry userRegistry; // 用户列表【连接的客户端信息】
/** * 广播形式发送报警信息 * @param */ public void broadcast(String destination,String message) { sendOperation.convertAndSend(destination, message); System.out.println("路由:"+ destination + " 推送消息:" + message); }
/** * 单独发送信息给某用户 * 客户端发起连接时候必须携带用户名参数 * stompClient.connect( * { * name: 'lixx' // 携带客户端信息 * } * @param */ public void send(String destination,String username, String message) { for (SimpUser user : userRegistry.getUsers()) { if (user.getName().equals(username)){ sendOperation.convertAndSendToUser(username, destination, message); System.out.println("路由:"+ destination + " 推送消息:" + message); break; } } }}
最后,送上测试的html客户端页面:
stomp
Welcome
发送消息订阅用户消息/user/queue/message订阅报警消息/topic/alarm
至此,websocket的具体介绍与实例都已送上,如果需要源码或者技术交流或者合作请联系一下方式
源码获取、合作、技术交流请获取如下联系方式:
QQ交流群:961179337
微信账号:lixiang6153
公众号:IT技术快餐
电子邮箱:lixx2048@163.com
c++ websocket客户端_websocket使用相关推荐
- netty websocket客户端_Websocket操作字节序 之 服务端
Websocket在JavaScript中操作字节序 之 客户端 在上一篇文章中,把页面的websocket编码写好了,那么服务端又该如何实现呢?由于该文是在上上篇demo中修改的,所以不全的代码还请 ...
- c++ websocket客户端_WebSocket协议详解与c++amp;c#实现
摘要: 随着手机游戏.H5游戏以及微信小游戏的普及,越来越多的客户端-服务器端的通讯采用websocket协议.Websocket协议是全双工的.基于数据帧的.建立在tcp之上的长连接协议.Webso ...
- 火币网行情获取的websocket客户端
从验证结果看应该是网络关闭了,不过程序写的不错,可以作为其它websocket客户端的测试程序 # !/usr/bin/env python # -*- coding: utf-8 -*- # aut ...
- c++ websocket客户端_你要的websocket都在这,收好不谢~~~
此号已经沉寂多时,似乎已经忘了上一次更新是什么时候了!这一次重拾旧爱,希望能够一直保持下去,坚持写作,快乐你我他 今天的主题是websocket,相信搞研发的兄弟对websocket并不陌生,都202 ...
- webscoket绑定php uid,Think-Swoole之WebSocket客户端消息解析与使用SocketIO处理用户UID与fd关联...
WebSocket 客户端消息的解析 前面我们演示了当客户端连接服务端,会触发连接事件,事件中我们要求返回当前客户端的 fd.当客户端发送消息给服务端,服务端会根据我们的规则将消息发送给指定 fd 的 ...
- 基于Boost::beast模块的异步WebSocket客户端
基于Boost::beast模块的异步WebSocket客户端 实现功能 C++实现代码 实现功能 基于Boost::beast模块的异步WebSocket客户端 C++实现代码 #include & ...
- 基于Boost::beast模块的协程WebSocket客户端
基于Boost::beast模块的协程WebSocket客户端 实现功能 C++实现代码 实现功能 基于Boost::beast模块的协程WebSocket客户端 C++实现代码 #include & ...
- 基于Boost::beast模块的同步WebSocket客户端
Boost:基于Boost::beast模块的同步WebSocket客户端 实现功能 C++实现代码 实现功能 基于Boost::beast模块的同步WebSocket客户端 C++实现代码 #inc ...
- netty系列之:使用netty搭建websocket客户端
文章目录 简介 浏览器客户端 netty对websocket客户端的支持 WebSocketClientHandshaker WebSocketClientCompressionHandler net ...
最新文章
- 微软终于想通把Script56文档更新了
- Sublime text 2/3 中 Package Control 的安装与使用方法
- 【深度学习】快照集成等网络训练优化算法系列
- 文件系统写入100G文件需要多久
- idea设置java scala等代码自动换行
- 傅里叶变换的更多性质:相位展开、零相位窗等
- jmeter JDBC Request
- 区块链软件:区块链的迅猛发展
- 简述 Java 垃圾回收机制
- 诺基亚pc远程服务器,用远程桌面把win10装进iphone —-40核256G内存的生产力工具随身带...
- k-平均算法(k-means算法)(k均值算法)例题
- MessageDigest实现MD5加密算法
- evga x58服务器芯片组,EVGA发布X58主板 首次涉足Intel芯片组
- 什么是P = NP?问题
- Linux WiFi使用
- ORA-01652: 无法通过 128 (在表空间 LTE_PM_TEMP 中) 扩展 temp 段
- scratch编程石头剪刀布
- 计算机音乐文献,论音乐文献计算机编郭小株.pdf
- 本地直播平台的搭建—四种方式(转载)
- Quartus II开发软件中的宏模块 (转摘)
热门文章
- linux中权限765啥意思,Linux中的文件权限
- pat乙级相当于什么水平_雅思6.5是什么水平?相当于托福多少分?
- python案例实操_用案例实操学习Python ,培养编程逻辑思维
- esxi挂载Linux的nfs盘,ESXi安装centos7挂载群晖NFS
- 在c语言程序中 对文件进行操作首先要,《C语言程序设计》试题八及答案
- linux 获取设备树源文件(dts)里描述的资源,Linux 获取设备树源文件(DTS)里描述的资源...
- linux 调优系列(续)
- 深度学习框架PyTorch一书的学习-第四章-神经网络工具箱nn
- 使用Mybatis Generator结合Ant脚本快速自动生成Model、Mapper等文件的方法
- Java高质量代码之 — 泛型与反射