2019独角兽企业重金招聘Python工程师标准>>>

今天心情很不好!!! 原因保密。

这篇是基于"netty与websocket通信demo"。

错误想法:大量客户请求,共用一个worker,来实现推送。

正确作法:应该是对Channel对应的ChannelGroup进行操作,来实现推送。

一个Channel可以划分到多个ChannelGroup中。

PushServerChannelHandler和DynMessage这两个类最重要,其实类基本没变。

package org.sl.demo.chatserver;import java.util.List;
import java.util.Map;import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;public class PushServerChannelHandler extends SimpleChannelHandler {static boolean debug = true;@Overridepublic void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e){if(debug){System.out.println("channelOpen");}DynMessage.addAudience(e.getChannel());}@Overridepublic void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception{Channel ch = e.getChannel();Object msg = e.getMessage();if(debug){System.out.println("---------------");System.out.println("message: "+msg.getClass());}try{if(msg instanceof HttpRequest){processHttpRequest(ch, (HttpRequest)msg);}else if(msg instanceof WebSocketFrame){processWebsocketRequest(ch,(WebSocketFrame)msg);}else{//未处理的请求类型}}catch(Exception ex){ch.close().sync();}super.messageReceived(ctx, e);}@Overridepublic void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e){if(debug){System.out.println("channelClosed");}if(e instanceof MessageEvent){MessageEvent me = (MessageEvent) e;            }DynMessage.removeAudience(e.getChannel());e.getChannel().close();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e){if(debug){System.out.println("channelClosed");}DynMessage.removeAudience(e.getChannel());e.getCause().printStackTrace();e.getChannel().close();try {super.exceptionCaught(ctx, e);} catch (Exception e1) {      e1.printStackTrace();}}void processHttpRequest(Channel channel,HttpRequest request){HttpHeaders headers = request.headers();if(debug){List<Map.Entry<String,String>> ls = headers.entries();for(Map.Entry<String,String> i: ls){System.out.println("header  "+i.getKey()+":"+i.getValue());}}    //non-get requestif(!HttpMethod.GET.equals(request.getMethod())){DefaultHttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.BAD_REQUEST);channel.write(resp);          channel.close();return;}WebSocketServerHandshakerFactory wsShakerFactory = new WebSocketServerHandshakerFactory("ws://"+request.headers().get(HttpHeaders.Names.HOST),null,false );WebSocketServerHandshaker wsShakerHandler = wsShakerFactory.newHandshaker(request);if(null==wsShakerHandler){//无法处理的websocket版本wsShakerFactory.sendUnsupportedWebSocketVersionResponse(channel);}else{//向客户端发送websocket握手,完成握手//客户端收到的状态是101 sitching protocolwsShakerHandler.handshake(channel, request);}       }void processWebsocketRequest(Channel channel, WebSocketFrame request) throws Exception{        if(request instanceof CloseWebSocketFrame){DynMessage.removeAudience(channel);channel.close().sync();}else if(request instanceof PingWebSocketFrame){           channel.write(new PongWebSocketFrame(request.getBinaryData()));  }else if(request instanceof TextWebSocketFrame){//这个地方 可以根据需求,加上一些业务逻辑TextWebSocketFrame txtReq = (TextWebSocketFrame) request;        if(debug){ System.out.println("txtReq:"+txtReq.getText());}if("disconnect".equalsIgnoreCase(txtReq.getText())){DynMessage.removeAudience(channel);channel.close().sync();return;}//把符合条件的channel添加到DynMessage的channelGroup中DynMessage.addAudience(channel);}else{//WebSocketFrame还有一些}}
}
package org.sl.demo.chatserver;import java.util.Random;import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;/**
*动态产生消息,并向Channel组推送。
*/
public class DynMessage implements Runnable{public static ChannelGroup audiences = new DefaultChannelGroup("msg-group");static public void addAudience(Channel ch){      audiences.add(ch);}static public void removeAudience(Channel ch){audiences.remove(ch);}static String[] names = {"Tom", "Jerry","Terry", "Looney","Merrie", "William","Joseph", "Hanna","Speike", "Tyke","Tuffy", "Lightning",};static String message = "";public static String getMessage(){StringBuffer sb = new StringBuffer();sb.append("hello,my name is ");sb.append(names[new Random().nextInt(names.length)]);sb.append(".");       return sb.toString();
//      return message;}@Overridepublic void run() {       System.out.println("DynMessage start");for(;;){String msg = getMessage();            radiate(msg);try{Thread.sleep(1000); }catch(Exception ex){}}}void radiate(String msg){audiences.write(new TextWebSocketFrame(msg));}
}
<html>
<head>
<script src="jquery-1.9.1.js"></script>
<script src="messagepush.js"></script>
<script >
function doStop(){stopMsgPush();
}function doWsStart(){var  r6 = generateMixed(6);$("#txtReq").val(r6);var  params = $("#txtReq").val();doStop();wsMsgPush('127.0.0.1',params,function(data){$("#txtResp").val(data);          },function(){$("#txtResp").val("ws close...");} ,function(){$("#txtResp").val("ws error...");} );
}
</script>
</head><body><br/>
<br/><br/>
send: <input id="txtReq" readonly="readonly" type="text" value="" />
<input type="button" value="start" onclick="doWsStart()">
<input type="button" value="stop" onclick="doStop()"/>
<br/>recv: <input id="txtResp" type="text" value=""  size="50"/>
</body>
</html>
var _mp_ws = null;
var _mp_ajax_it = null;function msgPush(url, params,onmessage,onclose,onerror){wsMsgPush(url,params,onmessage,onclose,onerror);if(!_mp_ws){ajaxMsgPush(url,params,10000,onmessage,onclose,onerror);}
}function old_wsMsgPush(url, params,onmessage,onclose,onerror){ var ws = new WebSocket("ws://"+url); ws.onopen = function(){ws.send('1111')};ws.onmessage = function(evt){ onmessage(evt.data);};
}function wsMsgPush(url, params,onmessage,onclose,onerror){ _mp_ws = new WebSocket("ws://"+url); if(!_mp_ws){ return; }_mp_ws.onopen = function(){ _mp_ws.send(params); };if(onmessage) _mp_ws.onmessage = function(evt){ onmessage(evt.data); }if(onerror) _mp_ws.onerror = function (evt){ onerror(); }if(onclose) _mp_ws.onclose = function (evt){ onclose(); }
}function ajaxMsgPush(url, params,interval,onmessage,onclose,onerror){  function __getmsg(){$.ajax({url:                url,data:           params,cache:           true,type:          "get",dataType:       "text",       success:        function(data, textStatus, jqXHR){ if(onmessage) onmessage(data);},error:           function(jqXHR, textStatus, errorThrown){if(onerror) onerror();},complete:      function(jqXHR, textStatus){if(onclose) onclose();}});} _mp_ajax_it = setInterval("__getmsg()",interval);
}function stopMsgPush(){if(_mp_ws){_mp_ws.send("disconnect");_mp_ws.close();}if(_mp_ajax_it){clearInterval(_mp_ajax_it);}
}var chars = ['0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z'];
function generateMixed(n) {var res = "";for(var i = 0; i < n ; i ++) {var id = Math.ceil(Math.random()*35);res += chars[id];}return res;
}
package org.sl.demo.chatserver;import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.timeout.WriteTimeoutHandler;
import org.jboss.netty.util.HashedWheelTimer;public class PushServerChannelPiplelineFactory  implements ChannelPipelineFactory{@Overridepublic ChannelPipeline getPipeline() throws Exception {ChannelPipeline cp = Channels.pipeline();cp.addLast("decoder", new HttpRequestDecoder());cp.addLast("encoder", new HttpResponseEncoder());cp.addLast("writeTimeout", new WriteTimeoutHandler(new HashedWheelTimer(),10));cp.addLast("handler", new PushServerChannelHandler());return cp;}}
package org.sl.demo.chatserver;import java.net.InetSocketAddress;
import java.util.concurrent.Executors;import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;public class PushServer implements Runnable{int port = 80;public PushServer(int port){this.port = port;}@Overridepublic void run() {System.out.println("ChatServer "+port);ServerBootstrap b = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),Executors.newCachedThreadPool()));b.setOption("child.tcpNoDelay", true);  b.setOption("child.keepAlive", true);b.setPipelineFactory(new PushServerChannelPiplelineFactory());b.bind(new InetSocketAddress(port));}public static void main(String[] args){Thread t = new Thread(new DynMessage(),"DynMessage");t.start();new PushServer(80).run();}
}

转载于:https://my.oschina.net/tangcoffee/blog/340246

netty websocket 简单消息推送demo相关推荐

  1. springboot定时发送短信_springboot 整合websocket实现消息推送(主动推送,具体用户推送,群发,定时推送)...

    websocket springboot 整合websocket实现消息推送(主动推送,具体用户推送,群发,定时推送) 使用WebSocket构建交互式Web应用程序 本指南将引导您完成创建" ...

  2. python websocket实现消息推送_python Django websocket 实时消息推送

    [实例简介] Django websocket 实时消息推送 服务端主动推送 调用 send(username, title, data, url) username:用户名 title:消息标题 d ...

  3. php通知websocket,php实现websocket实时消息推送

    php实现websocket实时消息推送,供大家参考,具体内容如下 SocketService.php /** * Created by xwx * Date: 2017/10/18 * Time: ...

  4. Activemq MQTT 简单消息推送示例

    Activemq MQTT 简单消息推送示例 简介     简单使用 MQTT 连接 Activemq 进行消息推送的示例代码 编写详情 环境准备     使用docker启动Activemq,查看M ...

  5. python websocket实时消息推送

    python websocket实时消息推送 十分想念顺店杂可... 本人写的渣,大神勿喷. 转载请附带本文链接,谢谢. 服务端代码 # -*- coding: utf-8 -*- # @Time : ...

  6. vue-admin websocket接收消息推送+语音提示(详细代码)

    websocket接收消息推送+语音提示 这个是同事的代码,我拿来记录一下,希望以后可以看得懂-- utils/websocket.js const audioUrl = require('@/ass ...

  7. java整合消息推送_SpringMVC整合websocket实现消息推送及触发功能

    本文为大家分享了SpringMVC整合websocket实现消息推送,供大家参考,具体内容如下 1.创建websocket握手协议的后台 (1)HandShake的实现类 /** *Project N ...

  8. android消息推送demo

    android消息推送demo 1.消息推送机制  服务器器端需要变被动为主动,通知客户一些开发商认为重要的信息,无论应用程序是否正在运行或者关闭.  我想到了一句话:don't call me,i ...

  9. uniapp接收服务器消息,【教程】uniapp websocket实现消息推送

    部分开发者在使用uniapp的过程中会用到websocket,但是uniapp框架提供的websocket服务并不是尽善尽美. 我在这里为大家介绍一款第三方的websocket推送服务:GoEasy, ...

最新文章

  1. Java项目:茶叶售卖商城系统(java+SSM+JSP+EasyUi+mysql)
  2. iptables规则备份和恢复、firewalld的9个zone、firewalld关于zone和service操作
  3. node 创建静态web服务器(下)(处理异步获取数据的两种方式)
  4. 【CyberSecurityLearning 31】Linux网络信息查看与配置、日志文件的管理、备份及日志服务器的搭建
  5. java生成数据插入hbase_hbase实战之javaAPI插入数据
  6. P4198-楼房重建【线段树】
  7. 信息学奥赛一本通 1113:不与最大数相同的数字之和 | OpenJudge NOI 1.9 07
  8. linux内核模块的程序结构
  9. matlab2013基础教程,Matlab2013a教程
  10. 一步一步跟着杨中科.net视频学c#基础(1)
  11. Silverlight 2.“.NET研究”5D RPG游戏技巧与特效处理:(二)纸娃娃系统
  12. beanshell断言_Jmeter之BeanShell断言使用(示例代码)
  13. J2Cache的学习
  14. Mybatis 中事务提交方式
  15. 如何使用python 给PDF生成目录
  16. 放大器的频率特性(2)-- 共源极的频率特性
  17. C#.Net实现AutoCAD块属性提取
  18. android看黑白电子书软件,如何优雅解决App启动黑白屏
  19. OCR--苹果ios安卓android身份证拍照扫描识别sdk
  20. ps教程300集,让你入门就精通(内附资源)

热门文章

  1. 如何关闭借呗订阅开通通知_支付宝花呗借呗隐藏规则,芝麻分600以上,花呗3.6万,借呗12万!...
  2. mysql 常用数据库连接池_常见的数据库连接池
  3. mysql tree_MySQL树形遍历(二)
  4. linux关闭gvim命令,Linux 下 8 种退出 vim 编辑器的方法
  5. linux ftp用户指定多个目录,linux ftp服务器下用户限制目录的方法
  6. java安装 hello_安装JAVA步骤,并编写HELLOWORLD程序
  7. 四大基本反应类型的关系_如何进入四大的咨询部门?
  8. 《软件需求分析(第二版)》第 7 章——聆听客户的需求 重点部分总结
  9. Ubuntu设置root登录
  10. python布局管理_Python基础=== Tkinter Grid布局管理器详解