java 事件驱动 netty_Netty2-事件驱动的NIO框架(使用范例)
本文将告诉你如何使用Netty2来编一个网络应用程序(包括客户端和服务端)。我会介绍一个简单的SumUp协议,用来对整数求和。通过源代码的一步步讲解,你会了解到Netty2的每个特性。
SumUp服务会加总从客户端送来的ADD消息中的所有值,并且为每个ADD消息返回一个RESULT消息。所有消息都是由header和body两部分组成:
header包含type和sequence两个字段。type表示消息的类型(0是RESULT消息,1是ADD消息)。sequence用来表示一组对应的ADD和RESULT(也就是说,服务器回应ADD消息时,应在RESULT中使用与ADD一样的sequence值)。
ADD消息包含了要被求和的值。
RESULT具有不固定长度的消息体。当计算没问题时,body内容是加总的值(4bytes),如果有错误或溢位,则是2bytes. 见下图:
MessageRecognizer从送来的数据中重组出Message对象。这儿我们实现了一个SumUpMessageRecognizer,用于客户端和服务端的信息重组。public class SumUpMessageRecognizer implements MessageRecognizer {
public static final int CLIENT_MODE = 1;
public static final int SERVER_MODE = 2;
private int mode;
public SumUpMessageRecognizer(int mode) {
switch (mode) {
case CLIENT_MODE:
case SERVER_MODE:
this.mode = mode;
break;
default:
throw new IllegalArgumentException("invalid mode: " + mode);
}
}
public Message recognize(ByteBuffer buf) throws MessageParseException {
// return null if message type is not arrived yet.
if (buf.remaining() < Constants.TYPE_LEN)
return null;
int type = buf.getShort();
switch (mode) {
// 如果是server模式,只让它接收ADD消息.
case SERVER_MODE:
switch (type) {
case Constants.ADD:
return new AddMessage();
default:
throw new MessageParseException("unknown type: " + type);
}
//如果是客户端模式,只让它接收RESULT消息.
case CLIENT_MODE:
switch (type) {
case Constants.RESULT:
return new ResultMessage();
default:
throw new MessageParseException("unknown type: " + type);
}
default:
throw new InternalError(); // this cannot happen
}
}
}
我们必须实现ADD和RESULT消息: ADD和RESULT。 它们都有公共的header,最好的方式是实现一个AbstractMessage,并且从它继承出Add和Result消息。
源代码:
实现了Messagerecognizer和Message之后,要实现Server和Client是非常容易的事情,通过下面的代码,你会很容易理解如何去实现协议的处理流程。
实现服务端两个主要的类,一个是Server类,另一个是ServerSessionListener. Server类负责启动主程序并监听连接。而ServerSessionListener用于处理和发送消息。public class Server {
private static final int SERVER_PORT = 8080;
private static final int DISPATCHER_THREAD_POOL_SIZE = 16;
public static void main(String[] args) throws Throwable {
// 初始化 I/O processor 和 event dispatcher
IoProcessor ioProcessor = new IoProcessor();
ThreadPooledEventDispatcher eventDispatcher = new OrderedEventDispatcher();
// 启动缺省数量的I/O工作线程
ioProcessor.start();
// 启动指定数量的event dispatcher 线程
eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE);
eventDispatcher.start();
// 准备 message recognizer
MessageRecognizer recognizer = new SumUpMessageRecognizer(
SumUpMessageRecognizer.SERVER_MODE);
// 准备session监听器,用于处理通讯过程.
ServerSessionListener listener = new ServerSessionListener();
// 开启server socket通道
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(SERVER_PORT));
// 监听连接,并开始通讯
System.out.println("listening on port " + SERVER_PORT);
for (;;) {
// 接受connection
SocketChannel channel = ssc.accept();
// 建立新的session
Session session = new Session(ioProcessor, channel, recognizer, eventDispatcher);
// 添加session监听器
session.addSessionListener(listener);
// 开始通讯
session.start();
}
}
}
public class ServerSessionListener implements SessionListener {
public ServerSessionListener() {
}
public void connectionEstablished(Session session) {
System.out.println(session.getSocketAddress() + " connected");
// 设置空闲时间为60秒
session.getConfig().setIdleTime(60);
// 设置sum的初始值为0。
session.setAttachment(new Integer(0));
}
public void connectionClosed(Session session) {
System.out.println(session.getSocketAddress() + " closed");
}
// 当收到client发来的消息时,此方法被调用
public void messageReceived(Session session, Message message) {
System.out.println(session.getSocketAddress() + " RCVD: " + message);
// client端只发送AddMessage. 其它情况要另作处理
// 在这里只是简单的进行类型转换处理
AddMessage am = (AddMessage) message;
// 将收到的消息里的值加上当前sum的值.
int sum = ((Integer) session.getAttachment()).intValue();
int value = am.getValue();
long expectedSum = (long) sum + value;
if (expectedSum > Integer.MAX_VALUE || expectedSum < Integer.MIN_VALUE) {
// 如果溢位返回错误消息
ResultMessage rm = new ResultMessage();
rm.setSequence(am.getSequence()); // 从送来的Add消息中得到sequence值。
rm.setOk(false);
session.write(rm);
} else {
// 加总
sum = (int) expectedSum;
session.setAttachment(new Integer(sum));
// 返回结果消息
ResultMessage rm = new ResultMessage();
rm.setSequence(am.getSequence()); // 从送来的Add消息中得到sequence值。
rm.setOk(true);
rm.setValue(sum);
session.write(rm);
}
}
public void messageSent(Session session, Message message) {
System.out.println(session.getSocketAddress() + " SENT: " + message);
}
public void sessionIdle(Session session) {
System.out.println(session.getSocketAddress()
+ " disconnecting the idle");
// 关闭空闲的会话。
session.close();
}
// 异常发生时,将调用此方法
public void exceptionCaught(Session session, Throwable cause) {
System.out.println(Thread.currentThread().getName()
+ session.getSocketAddress() + " exception:");
cause.printStackTrace(System.out);
if (cause instanceof MessageParseException) {
// 印出错误信息内容,便于调试
MessageParseException mpe = (MessageParseException) cause;
ByteBuffer buf = mpe.getBuffer();
System.out.println(buf);
System.out.print("Buffer Content: ");
while (buf.remaining() > 0) {
System.out.print(buf.get() & 0xFF);
System.out.print(' ');
}
System.out.println();
}
// 关闭会话
session.close();
}
}
服务端运行后,其输出的内容示例如下:listening on port 8080
/127.0.0.1:4753 connected
/127.0.0.1:4753 RCVD: 0:ADD(4)
/127.0.0.1:4753 RCVD: 1:ADD(6)
/127.0.0.1:4753 RCVD: 2:ADD(2)
/127.0.0.1:4753 RCVD: 3:ADD(7)
/127.0.0.1:4753 RCVD: 4:ADD(8)
/127.0.0.1:4753 RCVD: 5:ADD(1)
/127.0.0.1:4753 SENT: 0:RESULT(4)
/127.0.0.1:4753 SENT: 1:RESULT(10)
/127.0.0.1:4753 SENT: 2:RESULT(12)
/127.0.0.1:4753 SENT: 3:RESULT(19)
/127.0.0.1:4753 SENT: 4:RESULT(27)
/127.0.0.1:4753 SENT: 5:RESULT(28)
/127.0.0.1:4753 closed
跟服务端对应,主要由Client和ClientSessionListener组成。public class Client {
private static final String HOSTNAME = "localhost";
private static final int PORT = 8080;
private static final int CONNECT_TIMEOUT = 30; // seconds
private static final int DISPATCHER_THREAD_POOL_SIZE = 4;
public static void main(String[] args) throws Throwable {
// 预备要加总的值。
int[] values = new int[args.length];
for (int i = 0; i < args.length; i++) {
values[i] = Integer.parseInt(args[i]);
}
// 初始化 I/O processor 和 event dispatcher
IoProcessor ioProcessor = new IoProcessor();
ThreadPooledEventDispatcher eventDispatcher = new OrderedEventDispatcher();
// 开始缺省数量的I/O工作线程
ioProcessor.start();
// 启动指定数量的event dispatcher线程
eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE
eventDispatcher.start();
// 准备 message recognizer
MessageRecognizer recognizer = new SumUpMessageRecognizer(
SumUpMessageRecognizer.CLIENT_MODE);
// 准备客户端会话。
Session session = new Session(ioProcessor, new InetSocketAddress(
HOSTNAME, PORT), recognizer, eventDispatcher);
session.getConfig().setConnectTimeout(CONNECT_TIMEOUT);
// 开始会话,并使用ClientSessionListener监听。
ClientSessionListener listener = new ClientSessionListener(values);
session.addSessionListener(listener);
session.start();
// 一直等到加总完成
while ( !listener.isComplete() ) {
Thread.sleep(1000);
}
// 停止 I/O processor 和 event dispatcher
eventDispatcher.stop();
ioProcessor.stop();
}
}
public class ClientSessionListener implements SessionListener {
private final int[] values;
private boolean complete;
public ClientSessionListener(int[] values) {
this.values = values;
}
public boolean isComplete() {
return complete;
}
// 当连接建立好后会调用此方法。
public void connectionEstablished(Session session) {
System.out.println("connected to " + session.getSocketAddress());
// 发送加总请求。
for (int i = 0; i < values.length; i++) {
AddMessage m = new AddMessage();
m.setSequence(i);
m.setValue(values[i]);
session.write(m);
}
}
public void connectionClosed(Session session) {
System.out.println("disconnected from " + session.getSocketAddress());
}
// 当收到server的回应信息时,会调用此方法
public void messageReceived(Session session, Message message) {
System.out.println("RCVD: " + message);
// 服务端只发送ResultMessage. 其它情况下
// 要通过instanceOf来判断它的类型.
ResultMessage rm = (ResultMessage) message;
if (rm.isOk()) {
// 如果ResultMessage是OK的.
// 根据ResultMessage的sequence值来判断如果,
// 一次消息的sequence值,则
if (rm.getSequence() == values.length - 1) {
// 打印出结果.
System.out.println("The sum: " + rm.getValue());
// 关闭会话
session.close();
complete = true;
}
} else {
// 如有错误,则打印错误信息,并结束会话.
System.out.println("server error, disconnecting...");
session.close();
complete = true;
}
}
public void messageSent(Session session, Message message) {
System.out.println("SENT: " + message);
}
public void sessionIdle(Session session) {
}
public void exceptionCaught(Session session, Throwable cause) {
cause.printStackTrace(System.out);
if (cause instanceof ConnectException) {
// 如果连接server失败, 则间隔5秒重试连接.
System.out.println("sleeping...");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
System.out.println("reconnecting... " + session.getSocketAddress());
session.start();
} else {
session.close();
}
}
}
通过上面的例子,你也许会发现实现一个自定义的协议原来如此简单。你如果用Netty试着去实现自己的smtp或pop协议,我想也不会是一件难事了。
Attachments:
java 事件驱动 netty_Netty2-事件驱动的NIO框架(使用范例)相关推荐
- 【Java进阶营】Java异步NIO框架Netty实现高性能高并发
1. 背景 1.1. 惊人的性能数据 最近一个圈内朋友通过私信告诉我,通过使用Netty4 + Thrift压缩二进制编解码技术,他们实现了10W TPS(1K的复杂POJO对象)的跨节点远程服务调用 ...
- Java NIO框架
NIO 全称是Non-Blocking IO或New IO,也就是非阻塞IO或新版IO. NIO的特性如下: 1.面向缓冲区(Buffer):每个Buffer 实质上是一个容器对象 每一种基本 Jav ...
- Java NIO框架Mina、Netty、Grizzly介绍与对比
Java NIO框架Mina.Netty.Grizzly介绍与对比 原文地址:https://blog.csdn.net/e765741668/article/details/45234711 Min ...
- Java NIO框架Mina
2019独角兽企业重金招聘Python工程师标准>>> 真羡慕java有系列的懒人框架,虽然c也有Netty, ACE等高速网络框架,但是语言的特性,决定了java框架之间更好集成, ...
- java nio 框架_几种Java NIO框架的比较(zz)
问题:生活中工作中,会有人问我javaNIO框架里面 Netty Mina xSocket Grizzly 等等哪个比较好? 在这里写一下自己的感受,也算是总结一下吧 在我的印象中.不管是什么NIO ...
- Java NIO框架(Java编程)
Java NIO框架 MINA 用netty性能和链接数.并发等压力测试参数好于mina 1.NIO弥补了原来的I/O的不足,它再标准java代码中提供了高速和面向块的I/O 原力的I/O库与NIO最 ...
- Grizzly:Java NIO框架
在Java新I/O API(NIO)出现之前,开发可扩展的服务器应用比较困难.Grizzly NIO框架的设计初衷便是帮助开发者更好地利用Java NIO API,构建强大的可扩展的服务器应用,并提供 ...
- java netty 教程_Java NIO框架Netty教程(十六)
该图是OneCoder通过阅读Netty源码,逐渐记录下来的.基本可以说明Netty服务的启动流程.这里在具体讲解一下. 首先说明,我们这次顺利的流程是基于NioSocketServer的.也就是基于 ...
- Java NIO 框架 Netty 之美:粘包与半包问题
Netty 是一个可以快速开发网络应用程序的 NIO 框架,它大大简化了 TCP 或者 UDP 服务器的网络编程.Netty 的简易和快速开发并不意味着由它开发的程序将失去可维护性或者存在性能问题,它 ...
最新文章
- 网络摄像头+net_今日 Paper | 神经网络结构搜索;视觉目标;人物识别;视频3D人体姿态估计等...
- 身为Java程序员,这些开源工具你一定要学会!
- java多线程--AtomicReference
- Luogu3092 [USACO13NOV]没有找零No Change (状压DP)
- java查看日志命令_[Java教程]【Linux】linux查看日志文件内容命令tail、cat、tac、head、echo...
- C++ STL之list具体解释
- 张樟兴策略分析:数据库营销顾客
- 集群、分布式架构与SOA架构
- mybatis 多数据源_SpringBoot+Mybatis配置多数据源及事务方案
- 插件学习笔记:搜索引擎ElasticSearch
- 利用java打印出金字塔原理_《金字塔原理》的读后感作文2500字
- Alictf2014 Writeup
- 轻松解决U盘中病毒,文件变成.exe执行文件的问题
- phyton题库+解析
- 手机剪辑视频指南:去水印、加字幕、做转场,统统一键就搞定
- Tensorflow Keras中的masking与padding的学习笔记
- 大学计算机信息技术实验教程,计算机信息技术实验教程.pdf
- 浅谈 Java 并发下的乐观锁
- 托福百日冲刺(五一记忆)(1)
- Minecraft 1.18.1、1.18.2模组开发 22.狙击枪(Sniper Rifle)
热门文章
- c语言 生成大素数,C语言实现寻找大素数
- python set union_python – set.union()抱怨它在传入生成器时没有参数
- powerbi和python区别_PowerBI和Python关于数据分析的对比
- java模拟一个军队作战_战区级联合作战仿真推演系统
- html怎么设置log区,javlog
- java中集合的区别_Java中的集合与集合之间的区别
- 硬件安全(二) 5G时代IOT环境下芯片安全风险与挑战
- java分页查询oracle_Java中实现Oracle分页查询
- php mysql记录用户行为_PHP实现用session来实现记录用户登陆信息
- cx_oracle写日志信息_MongoDB与MySQL关于写确认的异同