性能测试

在48core,128G内存的物理服务器上测试协议解析效率:35K条/s, cpu使用率25%.

Build

执行mvn package . jdk1.6以上.

增加了业务处理API

业务层实现接口:BusinessHandlerInterface,或者继承AbstractBusinessHandler抽象类实现业务即可。 连接保活,消息重发,消息持久化,连接鉴权都已封装,不须要业务层再实现。

如何实现自己的Handler,比如按短短信计费

参考 CMPPChargingDemoTest 里的扩展位置

实体类说明

CMPP的连接端口

com.zx.sms.connect.manager.cmpp.CMPPEndpointEntity 表示一个Tcp连接的发起端,或者接收端。用来记录连接的IP.port,以及CMPP协议的用户名,密码,业务处理的ChannelHandler集合等其它端口参数。包含三个子类:

com.zx.sms.connect.manager.cmpp.CMPPServerEndpointEntity 服务监听端口,包含一个List属性。 一个服务端口包含多个CMPPServerChildEndpointEntity端口

com.zx.sms.connect.manager.cmpp.CMPPServerChildEndpointEntity 服务接收端口,包含CMPP连接用户名,密码,以及协议版本等信息

com.zx.sms.connect.manager.cmpp.CMPPClientEndpointEntity 客户端端口,包含CMPP连接用户名,密码,以及协议版本,以及服务端IP.port. 用于连接服务端

端口连接器接口

com.zx.sms.connect.manager.EndpointConnector 负责一个端口的打开,关闭,查看当前连接数,新增连接,移除连接。每个端口的实体类都对应一个EndpointConnector.当CMPP连接建立完成,将连接加入连接器管理,并给pipeLine上挂载业务处理的ChannelHandler.

com.zx.sms.connect.manager.cmpp.CMPPServerEndpointConnector 这个类的open()调用netty的ServerBootstrap.bind()开一个服务监听

com.zx.sms.connect.manager.cmpp.CMPPServerChildEndpointConnector 用来收集CMPPServerChildEndpointEntity端口下的所有连接。它的open()方法为空.

com.zx.sms.connect.manager.cmpp.CMPPClientEndpointConnector 这个类open()调用netty的Bootstrap.connect()开始一个TCP连接

端口管理器

com.zx.sms.connect.manager.EndpointManager 该类是单例模式,管理所有端口,并负责所有端口的打开,关闭,以及端口信息保存,以及连接断线重连。

CMPP协议的连接登陆管理

com.zx.sms.session.cmpp.SessionLoginManager 这是一个netty的ChannelHandler实现,主要负责CMPP连接的建立。当CMPP连接建立完成后,会调用EndpointConnector.addChannel(channel)方法,把连接加入连接器管理,连接器负责给channel的pipeline上挂载业务处理的Handler,最后触发 SessionState.Connect事件,通知业务处理Handler连接已建立成功。

CMPP的连接状态管理器

com.zx.sms.session.cmpp.SessionStateManager 这是一个netty的ChannelHandler实现。负责每个连接上CMPP消息的存储,短信重发,流量窗口控制,过期短信的处理

CMPP协议解析器

CMPP20MessageCodecAggregator [2.0协议] CMPPMessageCodecAggregator [这是3.0协议] 聚合了CMPP主要消息协议的解析,编码,长短信拆分,合并处理。

短信持久化存储实现 StoredMapFactory

使用BDB的StoreMap实现消息持久化,防止系统意外丢失短信。

程序启动处理流程

程序启动类 new 一个CMPPEndpointEntity的实体类并设置IP,port,用户名,密码,业务处理的Handler等参数,

程序启动类 调用EndpointManager.addEndpointEntity(endpoint)方法,将端口加入管理器

程序启动类 调用EndpointManager.openAll()或者EndpointManager.openEndpoint()方法打开端口。

EndpointManager会调用EndpointEntity.buildConnector()创建一个端口连接器,并调用EndpointConnector.open()方法打开端口。

如果是CMPPClientEndpointEntity的话,就会向服务器发起TCP连接请求,如果是CMPPServerEndpointEntity则会在本机开启一个服务端口等客户端连接。

TCP连接建立完成后。netty会调用EndpointConnector.initPipeLine()方法初始化PipeLine,把CMPP协议解析器,SessionLoginManager加到PipeLine里去,然后netty触发ChannelActive事件。

在SessionLoginManager类里,客户端收到ChannelActive事件后会发送一个CMPPConnnect消息,请求建立CMPP连接.

同样在SessionLoginManager.channelRead()方法里,服务端会收到CMPPConnnect消息,开始对用户名,密码进行鉴权,并给客户端鉴权结果。

鉴权通过后,SessionLoginManager调用EndpointConnector.addChannel(channel)方法,把channel加入ArrayList,并给pipeLine上挂载SessionStateManager和业务处理的ChannelHandler,如心跳处理,日志记录,长短信合并拆分处理类。

EndpointConnector.addChannel(channel)完成后,SessionLoginManager调用ctx.fireUserEventTriggered()方法,触发 SessionState.Connect事件。

以上CMPP连接建立完成。

业务处理类收到SessionState.Connect事件,开始业务处理,如从MQ获取短信下发,或开启Consumer接收MQ推送的消息。

SessionStateManager会拦截所有read()和write()的消息,进行消息持久化,消息重发,流量控制。

增加同步调用api

smsgate自开发以来,一直使用netty的异步发送消息,但实际使用场景中同步发送消息的更方便,或者能方便的取到response。因此增加一个同步调用的api。即:发送消息后等接收到对应的响应后才返回。 使用方法如下:

//因为长短信要拆分,因此返回一个promiseList.每个拆分后的短信对应一个promise

List futures = ChannelUtil.syncWriteLongMsgToEntity("client",submitmessage);

for(Promise future: futures){

//调用sync()方法,阻塞线程。等待接收response

future.sync();

//接收成功,如果失败可以获取失败原因,比如遇到连接突然中断错误等等

if(future.isSuccess()){

//打印收到的response消息

logger.info("response:{}",future.get());

}else{

打印错误原因

logger.error("response:{}",future.cause());

}

}

//或者不阻塞进程,不调用sync()方法。

List promises = ChannelUtil.syncWriteLongMsgToEntity("client",submitmessage);

for(Promise promise: promises){

//接收到response后回调Listener方法

promise.addListener(new GenericFutureListener() {

@Override

public void operationComplete(Future future) throws Exception {

//接收成功,如果失败可以获取失败原因,比如遇到连接突然中断错误等等

if(future.isSuccess()){

//打印收到的response消息

logger.info("response:{}",future.get());

}else{

打印错误原因

logger.error("response:{}",future.cause());

}

}

});

}

CMPP Api使用举例

public class TestCMPPEndPoint {

private static final Logger logger = LoggerFactory.getLogger(TestCMPPEndPoint.class);

@Test

public void testCMPPEndpoint() throws Exception {

ResourceLeakDetector.setLevel(Level.ADVANCED);

final EndpointManager manager = EndpointManager.INS;

CMPPServerEndpointEntity server = new CMPPServerEndpointEntity();

server.setId("server");

server.setHost("127.0.0.1");

server.setPort(7890);

server.setValid(true);

//使用ssl加密数据流

server.setUseSSL(false);

CMPPServerChildEndpointEntity child = new CMPPServerChildEndpointEntity();

child.setId("child");

child.setChartset(Charset.forName("utf-8"));

child.setGroupName("test");

child.setUserName("901783");

child.setPassword("ICP001");

child.setValid(true);

child.setVersion((short)0x30);

child.setMaxChannels((short)4);

child.setRetryWaitTimeSec((short)30);

child.setMaxRetryCnt((short)3);

child.setReSendFailMsg(true);

//child.setWriteLimit(200);

//child.setReadLimit(200);

List serverhandlers = new ArrayList();

serverhandlers.add(new CMPPMessageReceiveHandler()); //在这个handler里接收短信

child.setBusinessHandlerSet(serverhandlers);

server.addchild(child);

manager.addEndpointEntity(server);

CMPPClientEndpointEntity client = new CMPPClientEndpointEntity();

client.setId("client");

client.setHost("127.0.0.1");

//client.setLocalhost("127.0.0.1");

//client.setLocalport(65521);

client.setPort(7890);

client.setChartset(Charset.forName("utf-8"));

client.setGroupName("test");

client.setUserName("901783");

client.setPassword("ICP001");

client.setMaxChannels((short)10);

client.setVersion((short)0x30);

client.setRetryWaitTimeSec((short)30);

client.setUseSSL(false);

//client.setWriteLimit(100);

client.setReSendFailMsg(true);

client.setSupportLongmsg(SupportLongMessage.BOTH);

List clienthandlers = new ArrayList();

clienthandlers.add( new CMPPSessionConnectedHandler(10000)); //在这个handler里发送短信

client.setBusinessHandlerSet(clienthandlers);

manager.addEndpointEntity(client);

manager.openEndpoint(server);

Thread.sleep(1000);

for(int i=0;i<=child.getMaxChannels()+1;i++)

manager.openEndpoint(client);

System.out.println("start.....");

//Thread.sleep(300000);

LockSupport.park();

EndpointManager.INS.close();

}

}

SMPP Api使用举例

public class TestSMPPEndPoint {

private static final Logger logger = LoggerFactory.getLogger(TestSMPPEndPoint.class);

@Test

public void testSMPPEndpoint() throws Exception {

final EndpointManager manager = EndpointManager.INS;

SMPPServerEndpointEntity server = new SMPPServerEndpointEntity();

server.setId("smppserver");

server.setHost("127.0.0.1");

server.setPort(2776);

server.setValid(true);

//使用ssl加密数据流

server.setUseSSL(false);

SMPPServerChildEndpointEntity child = new SMPPServerChildEndpointEntity();

child.setId("smppchild");

child.setSystemId("901782");

child.setPassword("ICP");

child.setValid(true);

child.setChannelType(ChannelType.DUPLEX);

child.setMaxChannels((short)3);

child.setRetryWaitTimeSec((short)30);

child.setMaxRetryCnt((short)3);

child.setReSendFailMsg(true);

child.setIdleTimeSec((short)15);

//child.setWriteLimit(200);

//child.setReadLimit(200);

List serverhandlers = new ArrayList();

serverhandlers.add(new SMPPSessionConnectedHandler(10000));

child.setBusinessHandlerSet(serverhandlers);

server.addchild(child);

SMPPClientEndpointEntity client = new SMPPClientEndpointEntity();

client.setId("smppclient");

client.setHost("127.0.0.1");

client.setPort(2776);

client.setSystemId("901782");

client.setPassword("ICP");

client.setChannelType(ChannelType.DUPLEX);

client.setMaxChannels((short)12);

client.setRetryWaitTimeSec((short)100);

client.setUseSSL(false);

client.setReSendFailMsg(true);

//client.setWriteLimit(200);

//client.setReadLimit(200);

client.setSupportLongmsg(SupportLongMessage.SEND); //接收长短信时不自动合并

List clienthandlers = new ArrayList();

clienthandlers.add( new SMPPMessageReceiveHandler());

client.setBusinessHandlerSet(clienthandlers);

manager.addEndpointEntity(server);

manager.addEndpointEntity(client);

manager.openAll();

manager.startConnectionCheckTask();

Thread.sleep(1000);

for(int i=0;i

manager.openEndpoint(client);

System.out.println("start.....");

LockSupport.park();

EndpointManager.INS.close();

}

}

SGIP Api使用举例

public class TestSgipEndPoint {

private static final Logger logger = LoggerFactory.getLogger(TestSgipEndPoint.class);

@Test

public void testsgipEndpoint() throws Exception {

ResourceLeakDetector.setLevel(Level.ADVANCED);

final EndpointManager manager = EndpointManager.INS;

SgipServerEndpointEntity server = new SgipServerEndpointEntity();

server.setId("sgipserver");

server.setHost("127.0.0.1");

server.setPort(8001);

server.setValid(true);

//使用ssl加密数据流

server.setUseSSL(false);

SgipServerChildEndpointEntity child = new SgipServerChildEndpointEntity();

child.setId("sgipchild");

child.setLoginName("333");

child.setLoginPassowrd("0555");

child.setValid(true);

child.setChannelType(ChannelType.DUPLEX);

child.setMaxChannels((short)3);

child.setRetryWaitTimeSec((short)30);

child.setMaxRetryCnt((short)3);

child.setReSendFailMsg(false);

child.setIdleTimeSec((short)30);

//child.setWriteLimit(200);

//child.setReadLimit(200);

child.setSupportLongmsg(SupportLongMessage.SEND); //接收长短信时不自动合并

List serverhandlers = new ArrayList();

serverhandlers.add(new SgipReportRequestMessageHandler());

serverhandlers.add(new SGIPMessageReceiveHandler());

child.setBusinessHandlerSet(serverhandlers);

server.addchild(child);

manager.addEndpointEntity(server);

SgipClientEndpointEntity client = new SgipClientEndpointEntity();

client.setId("sgipclient");

client.setHost("127.0.0.1");

client.setPort(8001);

client.setLoginName("333");

client.setLoginPassowrd("0555");

client.setChannelType(ChannelType.DUPLEX);

client.setMaxChannels((short)10);

client.setRetryWaitTimeSec((short)100);

client.setUseSSL(false);

client.setReSendFailMsg(true);

//client.setWriteLimit(200);

//client.setReadLimit(200);

List clienthandlers = new ArrayList();

clienthandlers.add(new SGIPSessionConnectedHandler(10000));

client.setBusinessHandlerSet(clienthandlers);

manager.addEndpointEntity(client);

manager.openAll();

Thread.sleep(1000);

for(int i=0;i

manager.openEndpoint(client);

System.out.println("start.....");

LockSupport.park();

EndpointManager.INS.close();

}

}

Demo 执行日志

11:31:52.842 [workGroup2] INFO c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.codec.smpp.SMPP2CMPPBusinessHandler@1d7059df

11:31:52.852 [workGroup1] INFO c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.codec.smpp.SMPP2CMPPBusinessHandler@75e134be

11:31:52.852 [workGroup1] INFO c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.handler.api.gate.SessionConnectedHandler@aa80b58

11:31:52.869 [workGroup1] INFO c.z.s.s.AbstractSessionLoginManager - login in success on channel [id: 0xfdc7b81e, L:/127.0.0.1:11481 - R:/127.0.0.1:2776]

11:31:52.867 [workGroup2] INFO c.z.s.s.AbstractSessionLoginManager - login in success on channel [id: 0x1fba3767, L:/127.0.0.1:2776 - R:/127.0.0.1:11481]

11:31:53.863 [busiWork-3] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:343, speed : 343/s

11:31:54.872 [busiWork-1] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:1381, speed : 1038/s

11:31:55.873 [busiWork-8] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:2704, speed : 1323/s

11:31:56.875 [busiWork-2] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:4010, speed : 1306/s

11:31:57.880 [busiWork-5] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:5416, speed : 1406/s

11:31:58.881 [busiWork-7] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:7442, speed : 2026/s

11:31:59.882 [busiWork-8] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:9581, speed : 2139/s

11:32:00.883 [busiWork-2] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:12865, speed : 3284/s

11:32:01.884 [busiWork-5] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:15937, speed : 3072/s

11:32:02.886 [busiWork-5] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:19489, speed : 3552/s

11:32:03.887 [busiWork-6] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:23065, speed : 3576/s

11:32:04.888 [busiWork-2] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:26337, speed : 3272/s

android 短信编解码方式,中移短信cmpp协议/smpp协议 netty实现编解码相关推荐

  1. 中移ML302模组通过MQTT协议接入oneNT平台

    @中移ML302模组通过MQTT协议接入oneNT平台 ML302 是中国移动最新推出的 LTE Cat.1 模块. 丰富的 Internet 协议.行业标准接口和功能,支持 Windows.Linu ...

  2. Android-插入短信及备份手机中的短信到SD卡

    短信数据库 只需要关注sms表 只需要关注4个字段 body:短信内容 address:短信的发件人或收件人号码 date:短信时间 type:1为收到,2为发送 读取系统短信,首先查询源码获得短信数 ...

  3. java跟onenet平台交互_中移物联OneNET平台HTTP协议接入

    HTTP协议接入 登录后进入开发者中心 登录成功,点击进入"开发者中心". 点击左侧菜单 这里我们点击箭头所指的"多协议接入" 可以看到这里可以创建基于MQTT ...

  4. 短信ui--短信设置界面之sim卡短信管理

    sim卡短信管理 1.前言           对于sim卡的短信管理,其功能包含了将存在手机上的短信保存到sim卡.将存储位置设置为sim卡时自动将短信保存到sim卡.将sim卡中的短信导入到电话中 ...

  5. 中移路由怎么调虚拟服务器,用手机怎么设置中移禹路由器?

    问:请问中国移动无线路由器禹路由器(zy366)支持用手机设置吗?电脑开不了机了,用手机怎么设置中移禹路由器呢?求教程! 答:中国移动(铁通)的中移禹路由器ZY-366支持用手机设置,在本文家用路由器 ...

  6. Android开发 亲测可用--多种方式获取手机短信验证码自动填入

    Android开发 静态注册.动态注册.短信中心库监控获取手机验证码,自动复制到剪切板或或填入输入框. 友情提醒初学者:这是广播接收器的类,写在xml中静态注册或写在启动类的Oncreate方法下动态 ...

  7. android发送短信的两种方式,发送长短信的两种方式,群发短信

    android 发送短信的方法 方法一:调用系统的短信APP,发送短信. Intent smsIntent = new Intent(Intent.ACTION_VIEW);smsIntent.set ...

  8. android短信iphone,不越狱教你向iPhone中导入短信_手机生活评测-中关村在线

    2.短信 微信只有备份通讯录的功能,第三方软件包括QQ同步助手等虽然支持通话记录.短信的备份,但是无法再Android和iOS之间互通,想要在不越狱的情况下完成短信的备份有些繁琐,接下来我们就为大家演 ...

  9. Android中读取短信信息

    Android中读取的短信文件有 /*** 所有的短信*/public static final String SMS_URI_ALL = "content://sms/";/** ...

最新文章

  1. CSS自学教程--一天搞定CSS(终篇总结)
  2. 需求评审五个维度框架分析及其带来的启示-总起
  3. [转]自用类库整理之SqlHelper和MySqlHelper
  4. Graph Without Long Directed Paths
  5. 批处理延时启动的几个方法
  6. Linux下最简单的修改文件名后缀的命令行技巧
  7. mybatis 不生效 参数_Mybatis-日志配置
  8. python程序怎么修改_python文件如何修改
  9. ArcUser 2006第2期拾零
  10. 在VS 2010中查询和导航代码
  11. phpcms v9二次开发之模型类的应用(2)
  12. 搭建steam游戏服务器
  13. 计算机买什么固态硬盘,固态硬盘买什么接口好?那么多接口到底选哪种?看完这个秒懂...
  14. html标签不使用css样式,html – 忽略CSS样式
  15. java实现找一条转乘次数最少的公交线路?,基于最优换乘次数的城市公交查询算法...
  16. Android Studio中模拟器如何输入中文、将模拟器语言设置为中文
  17. 360加速插件谷歌字体服务停止运行
  18. win10装的AutoCAD 2012版,右上角最小化不显示的解决办法
  19. perl中bless的理解(zz) z
  20. 离散题目13(判断自反关系)

热门文章

  1. 使用详解_Log4j2使用详解
  2. python docker_Docker实践:python应用容器化
  3. android微信支付坑,微信支付踏坑之旅
  4. php response响应,9. 响应 (Response)
  5. 小程序弹出层禁止列表滑动_是时候展现真正的技术了!小程序教程来了——百战Web前端课程更新05.07...
  6. 【Linux】查看文件内容的相关命令总结
  7. 树——通用树到二叉树的转换
  8. MySQL备份和还原数据库及慢查询日志使用
  9. ADB 基础命令使用
  10. sudo: Cannot execute /usr/local/bin/zsh: No such file or directory 问题