websocket使用

一、介绍

在项目开发过程中,很多时候,我们不可避免的需要实现的一个功能:

服务端实时发送信息给客户端。比如实时公告、实时订单通知、实时报警推送等等,登录后的客户端需要知道与它相关的实时信息,以便进行下一步处理。

从事服务端开发的特别是C/C++开发的技术人员都知道,客户端可以通过套接口与服务端保持套接口长连接。这样就服务端就可以实时给客户端推送信息了,但是这是针对TCP的长连接,如果是针对HTTP协议(在TCP层之上的实现了超文本协议的短链接--一般情况下短链接),实现服务端与客户端通知一般有一下两种方式:

1、HTTP轮询

一般情况下,http是短链接,也就是请求响应式的,每一次请求都对应一次回复,回复完成后连接断开,这样做的好处就是不需要保持与服务端的长连接,因为HTTP协议底层还是TCP协议,服务端根据机器性能都有一个最大的套接口连接数限制。以windows为例子,如windows下TCP连接数受多个参数影响:

  • 最大tcp连接数

[HKEY_LOCAL_MACHINE\System\CurrentControlSet\Services\Tcpip\Parameters]TcpNumConnections = 0x00fffffe (Default = 16,777,214)

以上注册表信息配置单机的最大允许的TCP连接数,默认为 16M。这个数值看似很大,这个并不是限制最大连接数的唯一条件,还有其他条件会限制到TCP 连接的最大连接数。

  • 最大动态端口数
    TCP客户端和服务器连接时,客户端必须分配一个动态端口,默认情况下这个动态端口的分配范围为 1024-5000 ,也就是说默认情况下,客户端最多可以同时发起3977个Socket连接。我们可以修改如下注册表来调整这个动态端口的范围.

[HKEY_LOCAL_MACHINE\System\CurrentControlSet\Services\Tcpip\Parameters]MaxUserPort = 5000 (Default = 5000, Max = 65534)

最大调整值65535,也就是最大能有6w多tcp连接

  • 最大TCB数量
    系统为每个TCP连接分配一个TCP控制块(TCP control block or TCB),这个控制块用于缓存TCP连接的一些参数,每个TCB需要分配 0.5 KB的pagepool 和 0.5KB 的Non-pagepool,也就说,每个TCP连接会占用 1KB 的系统内存。换句话说TCP的连接数也受到系统的内存的限制。系统的最大TCB数量由如下注册表设置决定:

[HKEY_LOCAL_MACHINE\System\CurrentControlSet\Services\Tcpip\Parameters]MaxFreeTcbs = 2000 (Default = RAM dependent, but usual Pro = 1000, Srv=2000)

非Server版本,MaxFreeTcbs 的默认值为1000(64M 以上物理内存),Server 版本,这个的默认值为 2000。也就是说,默认情况下,Server版本最多同时可以建立并保持2000个TCP连接

  • 最大TCB Hash table数量
    TCB是通过Hash table来管理的,下面注册表设置决定了这个Hash table的大小

HKEY_LOCAL_MACHINE \System \CurrentControlSet \services \Tcpip \Parameters]MaxHashTableSize = 512 (Default = 512, Range = 64-65536)

指明分配pagepool内存的数量,也就是说,如果MaxFreeTcbs = 1000, 则pagepool的内存数量为500KB那么 MaxHashTableSize应大于500才行。这个数量越大,则Hash table的冗余度就越高,每次分配和查找TCP.这里 MaxHashTableSize被配置为比MaxFreeTcbs大4倍,这样可以大大增加TCP建立的速度。

知道了底层TCP限制之后,我们可以知道实际上长连接在普通的windows机器上最多大概是1000路左右,也就并发是1000个http长连接。如果将长连接改为http短链接,http请求完成后立即释放,那么服务端的并发就会大大增加,如果请求速度不太耗时,服务端的并发量有可能达到1w或者更大!!!c
    使用HTTP轮询,就是使用HTTP短链接模式,定期与服务端进行通信主动获取服务端信息的方式实现服务端“推送”信息至客户端,它有如下特点:

  • 避免与服务端的长连接,减低服务端压力,提升服务端的并发访问能力

  • 客户端主动与服务端通信,需要定期与服务端进行轮询查询获取信息,但对客户端而言存在延迟,延迟时间最大为轮询时间。

  • 服务端需要做额外的工作包保存一些实时数据,等待客户端拉取。

2、websocket

http长轮询因为存在信息延迟的问题,有时候,我们需要实时收到服务端推送的信息就无法避免使用websocket了。在前面我已经说到,websocket实际上也是http升级upgrade之后的tcp长连接,长连接的数量限制经过调整后最大能有(65525-1024)= 64501个长连接(在内存、句柄数等不设限情况下)。但实际测试,可能服务端的websocket连接数可能维持的2w左右(经过实际测试),如果改为云主机,连接数可能达到6w左右。如果需要更多了连接,我们可以考虑集群的方式,如n台高性能机器能支持最大n*6w的websocket连接!!它有如下优点:

  • WebSocket一次握手就可以使客户端和服务端建立长连接,并进行双向数据传输

  • 服务端可主动向客户端发送信息,实时性很高

  • 与HTTP协议比起来,WebSocket协议每次数据传输的头信息都较小,节约带宽

针对浏览器本身,连接后台最大的websocket数量也是有限制的,以下是我搜索到的各个浏览器支持的最大websocket连接数:

IE        6个chrome  256个Firefox 200个safari  1273个(MAC版本)

超过各个浏览器最大数,后台就收不到请求。

二、 websocket实现

说明了原理之后,接下来就是如何实现websocket,这里我提供了几种实现方式

1、J2EE7自带的原始实现

服务端实现
WebSocket是JavaEE7新支持的,Javax.websocket.server包含注解、类、接口用于创建和配置服务端点;Javax.websocket包则包含服务端点和客户断电公用的注解、类、接口、异常,创建一个注解式的端点,将自己的写的类以及类中的一些方法用前面提到的包中的注解装饰,这里我提供了一个基本的websocket实现接口,提供了连接、关闭、接收消息、发送消息接口:

package com.easystudy.websocket;

import java.io.IOException;import javax.websocket.Session;import lombok.extern.slf4j.Slf4j;

/** * @欢迎加入群聊,一起分享,一起合作,一起进步 * QQ交流群:961179337 * 微信账号:lixiang6153 * 微信公众号:IT技术快餐 * 电子邮箱:lixx2048@163.com */@Slf4jpublic abstract class BaseWS {    /**     * 终端初次连接     * @param userId  userId     * @param session session     * @throws IOException IOException     */    abstract void onOpen(Session session, Long userId) throws IOException;

    /**     * 终端断开连接     */    abstract void onClose();

    /**     * 终端传递参数     * @param session session     * @param message message     */    abstract void onMessage(String message, Session session);

    /**     * 报错     * @param session session     * @param error   error     */    abstract void onError(Session session, Throwable error);

    /**     * 向终端发送     * @param message message     * @throws IOException IOException     */    abstract void sendMessage(String message) throws IOException;

    void heartBeat(Long user, String signal, Session session) {        if ("ping".equalsIgnoreCase(signal)) {            try {                log.info("heart beat=====> {},user:{}, sessionId:{}", signal, user, session.getId());                session.getBasicRemote().sendText("pong");                log.info("heart beat<====> {},user:{}, sessionId:{}", "pong", user, session.getId());            } catch (IOException e) {                e.printStackTrace();            }        }    }}

websocket端点实现:

package com.easystudy.websocket;

import java.io.IOException;import java.util.concurrent.CopyOnWriteArraySet;

import javax.websocket.OnClose;import javax.websocket.OnError;import javax.websocket.OnMessage;import javax.websocket.OnOpen;import javax.websocket.Session;import javax.websocket.server.PathParam;import javax.websocket.server.ServerEndpoint;

import org.springframework.stereotype.Component;

/** * @ServerEndpoint 该注解可以将类定义成一个WebSocket服务器端, * @OnOpen 表示有浏览器链接过来的时候被调用 * @OnClose 表示浏览器发出关闭请求的时候被调用 * @OnMessage 表示浏览器发消息的时候被调用 * @OnError 表示报错了 * @欢迎加入群聊,一起分享,一起合作,一起进步 * QQ交流群:961179337 * 微信账号:lixiang6153 * 微信公众号:IT技术快餐 * 电子邮箱:lixx2048@163.com */@Component@ServerEndpoint("/ws/msg/{userid}")public class MessageEndPoint extends BaseWS {    // concurrent包下线程安全的Set    private static final CopyOnWriteArraySet SESSIONS = new CopyOnWriteArraySet<>();    private Session session;    @Override    @OnOpen    public void onOpen(Session session, @PathParam("userid") Long userid) {        this.session = session;        SESSIONS.add(this);        System.out.println(String.format(userid + "成功建立连接~ 当前总连接数为:%s", SESSIONS.size()));        System.out.println(this);    }    @Override    @OnClose    public void onClose() {        SESSIONS.remove(this);        System.out.println(String.format("成功关闭连接~ 当前总连接数为:%s", SESSIONS.size()));    }    @Override    @OnMessage    public void onMessage(String message, Session session) {        System.out.println("收到客户端【" +session.getId()+ "】消息:" + message);    }    @Override    @OnError    public void onError(Session session, Throwable error) {        System.out.println("发生错误");        error.printStackTrace();    }    /**     * 指定发消息     * @param message     */    public void sendMessage(String message) {        try {            this.session.getBasicRemote().sendText(message);        } catch (IOException e) {            e.printStackTrace();        }    }    /**     * 群发消息: 静态方法     * @param message     */    public static void fanoutMessage(String message) {        SESSIONS.forEach(ws -> ws.sendMessage(message));    }}

我们监听的端点是

/ws/msg/{userid}

端点携带了一个参数userid,表示长连接的用户id,我们在对应的方法实现中通过注解引用对应参数即可。

我们使用J2EE7标准注解,必须注入相应的ServerEndpointExporter类:

@Configurationpublic class WebSocketConfig {    @Bean    public ServerEndpointExporter serverEndpointExporter() {        return new ServerEndpointExporter();    }}

最后,我们提供一个controller用于测试服务端往发送客户端信息:

package com.easystudy.controller;

import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;

import com.easystudy.websocket.MessageEndPoint;

@RestController@RequestMapping("/response")public class TestController {

    @GetMapping("/send")    public String reponseMsgToClient(@RequestParam(name="content", required = true)String content){        System.out.println("发送消息:[" + content + "]给客户端!");

        MessageEndPoint.fanoutMessage(content);

        return "消息【" +content+ "】发送成功!";    }}

客户端实现
在实现websocket服务端之后,我们就需要实现websocket客户端,连接到服务器,接收服务端消息,实现代码如下所示:



websocket测试

WebSocket Demo

服务器回复内容:

发送

启动浏览器,打开控制台,可以看到连接到服务器字样,输入内容,点击发送,服务端后台打印接收到客户端信息并广播到客户端,客户端控制台也打印了相同字样。

2、springboot实现

除了J2EE原始实现之外,使用springboot之后,功能就更强大了,它提供了一个核心的配置类WebSocketConfigurer用于注册各种websocket端点、拦截器、处理器信息。如下我们通过继承WebSocketConfigurer配置对应端点、处理器和拦截器:

package com.easystudy.config;

import org.springframework.context.annotation.Configuration;import org.springframework.web.socket.config.annotation.EnableWebSocket;import org.springframework.web.socket.config.annotation.WebSocketConfigurer;import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

import com.easystudy.websocket.MsgWebSocketHandler;import com.easystudy.websocket.MsgWebSocketInterceptor;

@Configuration@EnableWebSocketpublic class WebsocketConfig implements WebSocketConfigurer {

    @Override    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {        // 设置端点连接路径和处理器        registry.addHandler(new MsgWebSocketHandler(), "/ws/msg/{userid}")                .setAllowedOrigins("*")                // 设置拦截器                .addInterceptors(new MsgWebSocketInterceptor());    }

}

我们配置了自己的处理器处理对应端点、配置了拦截器进行信息拦截。

我这里的拦截器主要拦截请求参数,限定请求参数必须携带用户名作为连接的唯一标识:

package com.easystudy.websocket;

import java.util.Map;

import javax.servlet.http.HttpServletRequest;

import org.springframework.http.server.ServerHttpRequest;import org.springframework.http.server.ServerHttpResponse;import org.springframework.http.server.ServletServerHttpRequest;import org.springframework.web.socket.WebSocketHandler;import org.springframework.web.socket.server.HandshakeInterceptor;

/** * 自定义拦截器拦截WebSocket请求 * @author Administrator * QQ交流群:961179337 * 微信账号:lixiang6153 * 微信公众号:IT技术快餐 * 电子邮箱:lixx2048@163.com */public class MsgWebSocketInterceptor implements HandshakeInterceptor{

    /**     * 前置拦截一般用来注册用户信息,绑定 WebSocketSession     */    @Override    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,            Map attributes) throws Exception {         System.out.println("前置拦截~~");         if (!(request instanceof ServletServerHttpRequest))             return true;         // 获取用户名信息         HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();         String path = servletRequest.getServletPath();         System.out.println("path:" + path);         String userName = servletRequest.getParameter("userName");         //String userName = (String) servletRequest.getSession().getAttribute("userName");         if (null == userName) {             userName = "lixx";         }         // 保存属性到session属性信息中         attributes.put("userName", userName);         return true;    }    /**     * 后置拦截器     */    @Override    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {        System.out.println("后置拦截~~");    }}

拦截器获取到对应属性之后存入到session的会话属性之中,连接之后可以通过session获取会话属性。

处理器实现:

package com.easystudy.websocket;

import org.springframework.web.socket.CloseStatus;import org.springframework.web.socket.WebSocketHandler;import org.springframework.web.socket.WebSocketMessage;import org.springframework.web.socket.WebSocketSession;

import org.springframework.web.socket.*;import java.io.IOException;import java.util.Map;import java.util.concurrent.ConcurrentHashMap;

/** * QQ交流群:961179337 * 微信账号:lixiang6153 * 微信公众号:IT技术快餐 * 电子邮箱:lixx2048@163.com */public class MsgWebSocketHandler implements WebSocketHandler{    private static final Map SESSIONS = new ConcurrentHashMap<>();    /**     * 建立新的 socket 连接后回调的方法     */    @Override    public void afterConnectionEstablished(WebSocketSession session) throws Exception {        String userName = session.getAttributes().get("userName").toString();        SESSIONS.put(userName, session);        System.out.println(String.format("成功建立连接~ userName: %s", userName));    }    /**     * 连接关闭时,回调的方法     */    @Override    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {        System.out.println("连接已关闭,status:" + closeStatus);    }    /**     * 接收客户端发送的 Socket     */    @Override    public void handleMessage(WebSocketSession session, WebSocketMessage> message) throws Exception {        String msg = message.getPayload().toString();        System.out.println("接收到消息:" + msg);    }    /**     * 连接出错时,回调的方法     */    @Override    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {        System.out.println("连接出错");        if (session.isOpen()) {            session.close();        }    }    /**     * 这个是 WebSocketHandler是否处理部分消息,返回 false就完事了     */    @Override    public boolean supportsPartialMessages() {        return false;    }    /**     * 指定发消息     * @param userName     * @param message     */    public static void sendMessage(String userName, String message) {        WebSocketSession webSocketSession = SESSIONS.get(userName);        if (webSocketSession == null || !webSocketSession.isOpen())            return;        try {            webSocketSession.sendMessage(new TextMessage(message));        } catch (IOException e) {            e.printStackTrace();        }    }    /**     * 群发消息     * @param message     */    public static void fanoutMessage(String message) {        SESSIONS.keySet().forEach(us -> sendMessage(us, message));    }}

在建立连接之后,我们通过:session.getAttributes().get("userName").toString();获取到连接时候提供的用户参数,用于后续指定用户P2P发送信息。

客户端实现代码如下:



测试

WebSocket Demo

服务器回复内容:

发送

最后测试发送信息如下:

3、socketJS实现

一些浏览器中缺少对WebSocket的支持,因此,回退选项是必要的,而Spring框架提供了基于SockJS协议的透明的回退选项。SockJS的一大好处在于提供了浏览器兼容性。优先使用原生WebSocket,如果在不支持websocket的浏览器中,会自动降为轮询的方式。

SockJS是一个浏览器JavaScript库,它提供了一个类似于网络的对象。SockJS提供了一个连贯的、跨浏览器的Javascript API,它在浏览器和web服务器之间创建了一个低延迟、全双工、跨域通信通道。除此之外,spring也对socketJS提供了支持。此处实现与springboot实现相似,这里不具体介绍,只给出对应代码,实现如下(后续提供的代码是实际项目上的代码,请各位保持修改个更新,谢谢!!!)。

端点、拦截器、通道等配置如下

package com.donwait.websocket;

import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.messaging.simp.config.ChannelRegistration;import org.springframework.messaging.simp.config.MessageBrokerRegistry;import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;import org.springframework.web.socket.config.annotation.StompEndpointRegistry;import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;

/** * 同 HTTP 在 TCP 套接字上添加 请求-响应 模型层一样,STOMP 在 WebSocket 之上提供了一个基于 帧的线路格式层,用来定义消息语义; * (STOMP在 WebSocket 之上提供了一个基于 帧的线路格式层,用来定义消息语义) * STOMP 帧:该帧由命令,一个或多个 头信息 以及 负载所组成。如下就是发送 数据的一个 STOMP帧: * SEND * destination:/app/marco * content-length:20 * * {\"message\":\"Marco!\"} * * 分析: * A1)SEND:STOMP命令,表明会发送一些内容; * A2)destination:头信息,用来表示消息发送到哪里; * A3)content-length:头信息,用来表示 负载内容的 大小; * A4)空行: * A5)帧内容(负载)内容: */@Configuration@EnableWebSocketMessageBroker       // 能够在 WebSocket 上启用 STOMPpublic class WebSocketAutoConfig implements WebSocketMessageBrokerConfigurer {    /*     * 将 "/dys" 路径 注册为 STOMP 端点,即客户端在订阅或发布消息 到目的地址前,要连接该端点,     * 就是说用户发送请求 url='/项目名/dys'与 STOMP server进行连接,之后再转发到订阅url     * 端点的作用:客户端在订阅或发布消息 到目的地址前,要连接该端点     * 备注:client连接地址和发送地址是不同的,以本例为例,前者是/项目名/dys, 后者是/项目名/app/XX,先连接后发送     */    @Override    public void registerStompEndpoints(StompEndpointRegistry registry) {        // 在网页上我们就可以通过这个链接 /demon/websocket ==来和服务器的WebSocket连接        // 连接:new SockJS("http://127.0.0.1:7019/websocket/dys");        registry.addEndpoint("/dys")                                // 开启 /dys端点                .setAllowedOrigins("*")                             // 允许跨域访问                .setHandshakeHandler(new HandshakeHandler())        // 握手处理器                .addInterceptors(new HandshakeInterceptor())        // 握手拦截器                .withSockJS();                                      // 允许使用socketJs方式访问    }    /*     * 消息传输参数配置     */    @Override    public void configureWebSocketTransport(WebSocketTransportRegistration registry) {        registry.setMessageSizeLimit(8192)                          // 设置消息字节数大小                .setSendBufferSizeLimit(8192)                       // 设置消息缓存大小                .setSendTimeLimit(10000);                           // 设置消息发送时间限制毫秒    }    /*     * 输入通道参数设置     */    @Override    public void configureClientInboundChannel(ChannelRegistration registration) {        registration.taskExecutor().corePoolSize(8)                 // 设置消息输入通道的线程池线程数                    .maxPoolSize(16)                                // 最大线程数                    .keepAliveSeconds(60);                          // 线程活动时间        registration.interceptors(createUserInterceptor());         // 注入用户入站通道拦截器    }    /*     * 输出通道参数设置     */    @Override    public void configureClientOutboundChannel(ChannelRegistration registration) {        registration.taskExecutor().corePoolSize(8)                    .maxPoolSize(16);    }    /*     * 配置broker:     * 配置了一个 简单的消息代理。如果不重载,默认case下,会自动配置一个简单的内存消息代理,     * 用来处理 "/topic"为前缀的消息。但经过重载后,消息代理将会处理前缀为 "/topic" and "/queue"消息     * 分析:     * (1)应用程序的目的地 以 "/app" 为前缀,而代理的目的地以 "/topic" 和 "/queue" 作为前缀     * (2)以应用程序为目的地的消息将会直接路由到 带有 @MessageMapping注解的控制器方法中     * (3)而发送到代理上的消息,包括 @MessageMapping注解方法的返回值所形成的消息,将会路由到代理上,并最终发送到订阅这些目的地客户端     */    @Override    public void configureMessageBroker(MessageBrokerRegistry registry) {        // 代理的目的地址为topic或queque(代理目的地以 /topic为前缀)        // 广播消息订阅:stompClient.subscribe('/topic/alarm', function (response)        registry.enableSimpleBroker("/topic", "/queue");        // 全局使用的消息前缀(客户端订阅路径上会体现出来):应用程序前缀:js.url = "/demon/app/hello" -> @MessageMapping("/hello") 注释的方法.        // 客户端发送端点前缀:stompClient.send("/app/hello", {}, JSON.stringify({ 'name': name }));        registry.setApplicationDestinationPrefixes("/app");        // 点对点使用的订阅前缀(客户端订阅路径上会体现出来),不设置的话,默认也是/user/        // registry.setUserDestinationPrefix("/user/");        /*        // 启用了STOMP代理中继功能,并将其代理目的地前缀设置为 /topic and /queue,并将所有目的地前缀为 "/topic" or "/queue"的消息都会发送到STOMP代理中[真正消息代理activeMQ或RabbitMQ]        registry.enableStompBrokerRelay("/topic", "/queue")                             // 设置可以订阅的地址,也就是服务器可以发送的地址                .setRelayHost("192.168.12.18")                .setRelayPort(5672)                .setClientLogin("admin")                .setClientPasscode("admin")                .setSystemHeartbeatReceiveInterval(2000)                                // 设置心跳信息接收时间间隔                .setSystemHeartbeatSendInterval(2000);                                  // 设置心跳信息发送时间间隔        // 应用程序前缀:js.url = "/demon/app/hello" -> @MessageMapping("/hello") 注释的方法.        registry.setApplicationDestinationPrefixes("/app");        */    }    /**     *     * @Title: createUserInterceptor     * @Description: 将客户端渠道拦截器加入spring ioc容器     * @return     */    @Bean    public UserInterceptor createUserInterceptor() {        return new UserInterceptor();    }}

握手拦截器配置:

package com.donwait.websocket;

import java.util.Map;

import org.springframework.http.server.ServerHttpRequest;import org.springframework.http.server.ServerHttpResponse;import org.springframework.web.socket.WebSocketHandler;import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;

import lombok.extern.slf4j.Slf4j;

@Slf4jpublic class HandshakeInterceptor extends HttpSessionHandshakeInterceptor {

    @Override    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,Map attributes) throws Exception {        log.info("============握手前===========");        /*        // 解决The extension [x-webkit-deflate-frame] is not supported问题        if(request.getHeaders().containsKey("Sec-WebSocket-Extensions")) {            request.getHeaders().set("Sec-WebSocket-Extensions", "permessage-deflate");        }        // 检查session的值是否存在        if (request instanceof ServletServerHttpRequest) {            ServletServerHttpRequest servletRequest = (ServletServerHttpRequest) request;            HttpSession session = servletRequest.getServletRequest().getSession(false);            String accountId = (String) session.getAttribute(Constants.SKEY_ACCOUNT_ID);            //把session和accountId存放起来            attributes.put(Constants.SESSIONID, session.getId());            attributes.put(Constants.SKEY_ACCOUNT_ID, accountId);        }        */        return super.beforeHandshake(request, response, wsHandler, attributes);    }    @Override    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,Exception ex) {        log.info("============握手后===========");        super.afterHandshake(request, response, wsHandler, ex);    }}

用户拦截器配置:

package com.donwait.websocket;

import java.util.LinkedList;import java.util.List;import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.Message;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.simp.SimpMessageHeaderAccessor;import org.springframework.messaging.simp.stomp.StompCommand;import org.springframework.messaging.simp.stomp.StompHeaderAccessor;import org.springframework.messaging.support.ChannelInterceptorAdapter;import org.springframework.messaging.support.MessageHeaderAccessor;

import com.donwait.amqp.RabbitMQ;import com.donwait.model.RtmpInviteInfo;import com.donwait.model.User;import com.donwait.protobuf.RTMP_INVITE_PARAM;import com.donwait.redis.RtmpInviteService;

/** * @ClassName: UserInterceptor * @Description: 客户端渠道拦截适配器 */@SuppressWarnings("deprecation")public class UserInterceptor extends ChannelInterceptorAdapter {    @Autowired    private RtmpInviteService redisRtmpInviteService;    @Autowired    private RabbitMQ rabbitMQ;    //@Autowired    //private UserCacheService userCacheService;

    /**     * 获取包含在stomp中的用户信息     */    @SuppressWarnings("rawtypes")    @Override    public Message> preSend(Message> message, MessageChannel channel) {        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);        if (StompCommand.CONNECT.equals(accessor.getCommand())) {            Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS);            if (raw instanceof Map) {                Object name = ((Map) raw).get("name");                if (name instanceof LinkedList) {                    // 设置当前访问器的认证用户                    accessor.setUser(new User(((LinkedList) name).get(0).toString()));                }            }        }        return message;    }

    @Override    public void postSend(Message> message, MessageChannel channel, boolean sent) {        StompHeaderAccessor sha = StompHeaderAccessor.wrap(message);        // ignore non-STOMP messages like heartbeat messages        if(sha.getCommand() == null) {            return;        }

        //这里的sessionId和accountId对应HttpSessionIdHandshakeInterceptor拦截器的存放key        //String sessionId = sha.getSessionAttributes().get(Constants.SESSIONID).toString();        //String accountId = sha.getSessionAttributes().get(Constants.SKEY_ACCOUNT_ID).toString();        //判断客户端的连接状态        switch(sha.getCommand()) {            case CONNECT:                connect(sha);                break;            case CONNECTED:                break;            case DISCONNECT:                disconnect(sha);                break;            default:                break;        }    }

    // 连接成功    private void connect(StompHeaderAccessor sha){        System.out.println(" STOMP 连接成功:" + sha.getUser().getName());    }

    // 断开连接    private void disconnect(StompHeaderAccessor sha){        System.out.println(" STOMP 连接断开" + sha.getUser().getName());

        // 移除用户信息        //userCacheService.delete(sha.getUser().getName());

        String strKey = String.format("rtmp_invite_info::%s_*", sha.getUser().getName());        List invite_list = redisRtmpInviteService.findByKeyEx(strKey);        if (invite_list != null) {            for(RtmpInviteInfo rtmpInviteInfo : invite_list){                // 通知接入服务器                RTMP_INVITE_PARAM.Builder builder = RTMP_INVITE_PARAM.newBuilder();                builder.setRtmpIP(rtmpInviteInfo.getRtmpIp());                builder.setRtmpPort(rtmpInviteInfo.getRtmpPort());                builder.setDevID(rtmpInviteInfo.getDevId());                builder.setProtocolType(rtmpInviteInfo.getProtoType());                builder.setStreamType(rtmpInviteInfo.getStreamType());                rabbitMQ.send(rtmpInviteInfo.getExchangeName(), rtmpInviteInfo.getRouteKey(), builder.build().toByteArray());                strKey = String.format("%s::%s_%s_%d_%d_%d", rtmpInviteInfo.getCacheName(), sha.getUser().getName(), rtmpInviteInfo.getDevId(), rtmpInviteInfo.getChannelNum().longValue(), rtmpInviteInfo.getProtoType().longValue(), rtmpInviteInfo.getStreamType().longValue());                redisRtmpInviteService.deleteByKey(strKey);            }        }    }}

处理器代码:

package com.donwait.websocket;

import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

import lombok.extern.slf4j.Slf4j;

@Slf4jpublic class HandshakeHandler extends DefaultHandshakeHandler{

    public HandshakeHandler(){        log.debug("new HandshakeHandler");    }}

配置完成之后,需要封装一个消息服务实现点对点和广播形式发送:

package com.donwait.service;

import org.springframework.beans.factory.annotation.Autowired;import org.springframework.messaging.simp.SimpMessageSendingOperations;import org.springframework.messaging.simp.user.SimpUser;import org.springframework.messaging.simp.user.SimpUserRegistry;import org.springframework.stereotype.Service;

/** * websocket广播推送服务 * @author Administrator * */@Servicepublic class MessageService {    @Autowired    SimpMessageSendingOperations sendOperation;         // 消息发送模板    @Autowired    private SimpUserRegistry userRegistry;              // 用户列表【连接的客户端信息】

    /**     * 广播形式发送报警信息     * @param     */    public void broadcast(String destination,String message) {        sendOperation.convertAndSend(destination, message);        System.out.println("路由:"+ destination + "   推送消息:" + message);    }

    /**     * 单独发送信息给某用户     * 客户端发起连接时候必须携带用户名参数     * stompClient.connect(     * {     *      name: 'lixx' // 携带客户端信息     * }     * @param     */    public void send(String destination,String username, String message) {        for (SimpUser user : userRegistry.getUsers()) {            if (user.getName().equals(username)){                sendOperation.convertAndSendToUser(username, destination, message);                System.out.println("路由:"+ destination + "   推送消息:" + message);                break;            }        }    }}

最后,送上测试的html客户端页面:



stomp

        Welcome

发送消息订阅用户消息/user/queue/message订阅报警消息/topic/alarm

至此,websocket的具体介绍与实例都已送上,如果需要源码或者技术交流或者合作请联系一下方式

源码获取、合作、技术交流请获取如下联系方式:

QQ交流群:961179337

微信账号:lixiang6153
公众号:IT技术快餐
电子邮箱:lixx2048@163.com

c++ websocket客户端_websocket使用相关推荐

  1. netty websocket客户端_Websocket操作字节序 之 服务端

    Websocket在JavaScript中操作字节序 之 客户端 在上一篇文章中,把页面的websocket编码写好了,那么服务端又该如何实现呢?由于该文是在上上篇demo中修改的,所以不全的代码还请 ...

  2. c++ websocket客户端_WebSocket协议详解与c++amp;c#实现

    摘要: 随着手机游戏.H5游戏以及微信小游戏的普及,越来越多的客户端-服务器端的通讯采用websocket协议.Websocket协议是全双工的.基于数据帧的.建立在tcp之上的长连接协议.Webso ...

  3. 火币网行情获取的websocket客户端

    从验证结果看应该是网络关闭了,不过程序写的不错,可以作为其它websocket客户端的测试程序 # !/usr/bin/env python # -*- coding: utf-8 -*- # aut ...

  4. c++ websocket客户端_你要的websocket都在这,收好不谢~~~

    此号已经沉寂多时,似乎已经忘了上一次更新是什么时候了!这一次重拾旧爱,希望能够一直保持下去,坚持写作,快乐你我他 今天的主题是websocket,相信搞研发的兄弟对websocket并不陌生,都202 ...

  5. webscoket绑定php uid,Think-Swoole之WebSocket客户端消息解析与使用SocketIO处理用户UID与fd关联...

    WebSocket 客户端消息的解析 前面我们演示了当客户端连接服务端,会触发连接事件,事件中我们要求返回当前客户端的 fd.当客户端发送消息给服务端,服务端会根据我们的规则将消息发送给指定 fd 的 ...

  6. 基于Boost::beast模块的异步WebSocket客户端

    基于Boost::beast模块的异步WebSocket客户端 实现功能 C++实现代码 实现功能 基于Boost::beast模块的异步WebSocket客户端 C++实现代码 #include & ...

  7. 基于Boost::beast模块的协程WebSocket客户端

    基于Boost::beast模块的协程WebSocket客户端 实现功能 C++实现代码 实现功能 基于Boost::beast模块的协程WebSocket客户端 C++实现代码 #include & ...

  8. 基于Boost::beast模块的同步WebSocket客户端

    Boost:基于Boost::beast模块的同步WebSocket客户端 实现功能 C++实现代码 实现功能 基于Boost::beast模块的同步WebSocket客户端 C++实现代码 #inc ...

  9. netty系列之:使用netty搭建websocket客户端

    文章目录 简介 浏览器客户端 netty对websocket客户端的支持 WebSocketClientHandshaker WebSocketClientCompressionHandler net ...

最新文章

  1. 微软终于想通把Script56文档更新了
  2. Sublime text 2/3 中 Package Control 的安装与使用方法
  3. 【深度学习】快照集成等网络训练优化算法系列
  4. 文件系统写入100G文件需要多久
  5. idea设置java scala等代码自动换行
  6. 傅里叶变换的更多性质:相位展开、零相位窗等
  7. jmeter JDBC Request
  8. 区块链软件:区块链的迅猛发展
  9. 简述 Java 垃圾回收机制
  10. 诺基亚pc远程服务器,用远程桌面把win10装进iphone —-40核256G内存的生产力工具随身带...
  11. k-平均算法(k-means算法)(k均值算法)例题
  12. MessageDigest实现MD5加密算法
  13. evga x58服务器芯片组,EVGA发布X58主板 首次涉足Intel芯片组
  14. 什么是P = NP?问题
  15. Linux WiFi使用
  16. ORA-01652: 无法通过 128 (在表空间 LTE_PM_TEMP 中) 扩展 temp 段
  17. scratch编程石头剪刀布
  18. 计算机音乐文献,论音乐文献计算机编郭小株.pdf
  19. 本地直播平台的搭建—四种方式(转载)
  20. Quartus II开发软件中的宏模块 (转摘)

热门文章

  1. linux中权限765啥意思,Linux中的文件权限
  2. pat乙级相当于什么水平_雅思6.5是什么水平?相当于托福多少分?
  3. python案例实操_用案例实操学习Python ,培养编程逻辑思维
  4. esxi挂载Linux的nfs盘,ESXi安装centos7挂载群晖NFS
  5. 在c语言程序中 对文件进行操作首先要,《C语言程序设计》试题八及答案
  6. linux 获取设备树源文件(dts)里描述的资源,Linux 获取设备树源文件(DTS)里描述的资源...
  7. linux 调优系列(续)
  8. 深度学习框架PyTorch一书的学习-第四章-神经网络工具箱nn
  9. 使用Mybatis Generator结合Ant脚本快速自动生成Model、Mapper等文件的方法
  10. Java高质量代码之 — 泛型与反射