android 短信编解码方式,中移短信cmpp协议/smpp协议 netty实现编解码
性能测试
在48core,128G内存的物理服务器上测试协议解析效率:35K条/s, cpu使用率25%.
Build
执行mvn package . jdk1.6以上.
增加了业务处理API
业务层实现接口:BusinessHandlerInterface,或者继承AbstractBusinessHandler抽象类实现业务即可。 连接保活,消息重发,消息持久化,连接鉴权都已封装,不须要业务层再实现。
如何实现自己的Handler,比如按短短信计费
参考 CMPPChargingDemoTest 里的扩展位置
实体类说明
CMPP的连接端口
com.zx.sms.connect.manager.cmpp.CMPPEndpointEntity 表示一个Tcp连接的发起端,或者接收端。用来记录连接的IP.port,以及CMPP协议的用户名,密码,业务处理的ChannelHandler集合等其它端口参数。包含三个子类:
com.zx.sms.connect.manager.cmpp.CMPPServerEndpointEntity 服务监听端口,包含一个List属性。 一个服务端口包含多个CMPPServerChildEndpointEntity端口
com.zx.sms.connect.manager.cmpp.CMPPServerChildEndpointEntity 服务接收端口,包含CMPP连接用户名,密码,以及协议版本等信息
com.zx.sms.connect.manager.cmpp.CMPPClientEndpointEntity 客户端端口,包含CMPP连接用户名,密码,以及协议版本,以及服务端IP.port. 用于连接服务端
端口连接器接口
com.zx.sms.connect.manager.EndpointConnector 负责一个端口的打开,关闭,查看当前连接数,新增连接,移除连接。每个端口的实体类都对应一个EndpointConnector.当CMPP连接建立完成,将连接加入连接器管理,并给pipeLine上挂载业务处理的ChannelHandler.
com.zx.sms.connect.manager.cmpp.CMPPServerEndpointConnector 这个类的open()调用netty的ServerBootstrap.bind()开一个服务监听
com.zx.sms.connect.manager.cmpp.CMPPServerChildEndpointConnector 用来收集CMPPServerChildEndpointEntity端口下的所有连接。它的open()方法为空.
com.zx.sms.connect.manager.cmpp.CMPPClientEndpointConnector 这个类open()调用netty的Bootstrap.connect()开始一个TCP连接
端口管理器
com.zx.sms.connect.manager.EndpointManager 该类是单例模式,管理所有端口,并负责所有端口的打开,关闭,以及端口信息保存,以及连接断线重连。
CMPP协议的连接登陆管理
com.zx.sms.session.cmpp.SessionLoginManager 这是一个netty的ChannelHandler实现,主要负责CMPP连接的建立。当CMPP连接建立完成后,会调用EndpointConnector.addChannel(channel)方法,把连接加入连接器管理,连接器负责给channel的pipeline上挂载业务处理的Handler,最后触发 SessionState.Connect事件,通知业务处理Handler连接已建立成功。
CMPP的连接状态管理器
com.zx.sms.session.cmpp.SessionStateManager 这是一个netty的ChannelHandler实现。负责每个连接上CMPP消息的存储,短信重发,流量窗口控制,过期短信的处理
CMPP协议解析器
CMPP20MessageCodecAggregator [2.0协议] CMPPMessageCodecAggregator [这是3.0协议] 聚合了CMPP主要消息协议的解析,编码,长短信拆分,合并处理。
短信持久化存储实现 StoredMapFactory
使用BDB的StoreMap实现消息持久化,防止系统意外丢失短信。
程序启动处理流程
程序启动类 new 一个CMPPEndpointEntity的实体类并设置IP,port,用户名,密码,业务处理的Handler等参数,
程序启动类 调用EndpointManager.addEndpointEntity(endpoint)方法,将端口加入管理器
程序启动类 调用EndpointManager.openAll()或者EndpointManager.openEndpoint()方法打开端口。
EndpointManager会调用EndpointEntity.buildConnector()创建一个端口连接器,并调用EndpointConnector.open()方法打开端口。
如果是CMPPClientEndpointEntity的话,就会向服务器发起TCP连接请求,如果是CMPPServerEndpointEntity则会在本机开启一个服务端口等客户端连接。
TCP连接建立完成后。netty会调用EndpointConnector.initPipeLine()方法初始化PipeLine,把CMPP协议解析器,SessionLoginManager加到PipeLine里去,然后netty触发ChannelActive事件。
在SessionLoginManager类里,客户端收到ChannelActive事件后会发送一个CMPPConnnect消息,请求建立CMPP连接.
同样在SessionLoginManager.channelRead()方法里,服务端会收到CMPPConnnect消息,开始对用户名,密码进行鉴权,并给客户端鉴权结果。
鉴权通过后,SessionLoginManager调用EndpointConnector.addChannel(channel)方法,把channel加入ArrayList,并给pipeLine上挂载SessionStateManager和业务处理的ChannelHandler,如心跳处理,日志记录,长短信合并拆分处理类。
EndpointConnector.addChannel(channel)完成后,SessionLoginManager调用ctx.fireUserEventTriggered()方法,触发 SessionState.Connect事件。
以上CMPP连接建立完成。
业务处理类收到SessionState.Connect事件,开始业务处理,如从MQ获取短信下发,或开启Consumer接收MQ推送的消息。
SessionStateManager会拦截所有read()和write()的消息,进行消息持久化,消息重发,流量控制。
增加同步调用api
smsgate自开发以来,一直使用netty的异步发送消息,但实际使用场景中同步发送消息的更方便,或者能方便的取到response。因此增加一个同步调用的api。即:发送消息后等接收到对应的响应后才返回。 使用方法如下:
//因为长短信要拆分,因此返回一个promiseList.每个拆分后的短信对应一个promise
List futures = ChannelUtil.syncWriteLongMsgToEntity("client",submitmessage);
for(Promise future: futures){
//调用sync()方法,阻塞线程。等待接收response
future.sync();
//接收成功,如果失败可以获取失败原因,比如遇到连接突然中断错误等等
if(future.isSuccess()){
//打印收到的response消息
logger.info("response:{}",future.get());
}else{
打印错误原因
logger.error("response:{}",future.cause());
}
}
//或者不阻塞进程,不调用sync()方法。
List promises = ChannelUtil.syncWriteLongMsgToEntity("client",submitmessage);
for(Promise promise: promises){
//接收到response后回调Listener方法
promise.addListener(new GenericFutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
//接收成功,如果失败可以获取失败原因,比如遇到连接突然中断错误等等
if(future.isSuccess()){
//打印收到的response消息
logger.info("response:{}",future.get());
}else{
打印错误原因
logger.error("response:{}",future.cause());
}
}
});
}
CMPP Api使用举例
public class TestCMPPEndPoint {
private static final Logger logger = LoggerFactory.getLogger(TestCMPPEndPoint.class);
@Test
public void testCMPPEndpoint() throws Exception {
ResourceLeakDetector.setLevel(Level.ADVANCED);
final EndpointManager manager = EndpointManager.INS;
CMPPServerEndpointEntity server = new CMPPServerEndpointEntity();
server.setId("server");
server.setHost("127.0.0.1");
server.setPort(7890);
server.setValid(true);
//使用ssl加密数据流
server.setUseSSL(false);
CMPPServerChildEndpointEntity child = new CMPPServerChildEndpointEntity();
child.setId("child");
child.setChartset(Charset.forName("utf-8"));
child.setGroupName("test");
child.setUserName("901783");
child.setPassword("ICP001");
child.setValid(true);
child.setVersion((short)0x30);
child.setMaxChannels((short)4);
child.setRetryWaitTimeSec((short)30);
child.setMaxRetryCnt((short)3);
child.setReSendFailMsg(true);
//child.setWriteLimit(200);
//child.setReadLimit(200);
List serverhandlers = new ArrayList();
serverhandlers.add(new CMPPMessageReceiveHandler()); //在这个handler里接收短信
child.setBusinessHandlerSet(serverhandlers);
server.addchild(child);
manager.addEndpointEntity(server);
CMPPClientEndpointEntity client = new CMPPClientEndpointEntity();
client.setId("client");
client.setHost("127.0.0.1");
//client.setLocalhost("127.0.0.1");
//client.setLocalport(65521);
client.setPort(7890);
client.setChartset(Charset.forName("utf-8"));
client.setGroupName("test");
client.setUserName("901783");
client.setPassword("ICP001");
client.setMaxChannels((short)10);
client.setVersion((short)0x30);
client.setRetryWaitTimeSec((short)30);
client.setUseSSL(false);
//client.setWriteLimit(100);
client.setReSendFailMsg(true);
client.setSupportLongmsg(SupportLongMessage.BOTH);
List clienthandlers = new ArrayList();
clienthandlers.add( new CMPPSessionConnectedHandler(10000)); //在这个handler里发送短信
client.setBusinessHandlerSet(clienthandlers);
manager.addEndpointEntity(client);
manager.openEndpoint(server);
Thread.sleep(1000);
for(int i=0;i<=child.getMaxChannels()+1;i++)
manager.openEndpoint(client);
System.out.println("start.....");
//Thread.sleep(300000);
LockSupport.park();
EndpointManager.INS.close();
}
}
SMPP Api使用举例
public class TestSMPPEndPoint {
private static final Logger logger = LoggerFactory.getLogger(TestSMPPEndPoint.class);
@Test
public void testSMPPEndpoint() throws Exception {
final EndpointManager manager = EndpointManager.INS;
SMPPServerEndpointEntity server = new SMPPServerEndpointEntity();
server.setId("smppserver");
server.setHost("127.0.0.1");
server.setPort(2776);
server.setValid(true);
//使用ssl加密数据流
server.setUseSSL(false);
SMPPServerChildEndpointEntity child = new SMPPServerChildEndpointEntity();
child.setId("smppchild");
child.setSystemId("901782");
child.setPassword("ICP");
child.setValid(true);
child.setChannelType(ChannelType.DUPLEX);
child.setMaxChannels((short)3);
child.setRetryWaitTimeSec((short)30);
child.setMaxRetryCnt((short)3);
child.setReSendFailMsg(true);
child.setIdleTimeSec((short)15);
//child.setWriteLimit(200);
//child.setReadLimit(200);
List serverhandlers = new ArrayList();
serverhandlers.add(new SMPPSessionConnectedHandler(10000));
child.setBusinessHandlerSet(serverhandlers);
server.addchild(child);
SMPPClientEndpointEntity client = new SMPPClientEndpointEntity();
client.setId("smppclient");
client.setHost("127.0.0.1");
client.setPort(2776);
client.setSystemId("901782");
client.setPassword("ICP");
client.setChannelType(ChannelType.DUPLEX);
client.setMaxChannels((short)12);
client.setRetryWaitTimeSec((short)100);
client.setUseSSL(false);
client.setReSendFailMsg(true);
//client.setWriteLimit(200);
//client.setReadLimit(200);
client.setSupportLongmsg(SupportLongMessage.SEND); //接收长短信时不自动合并
List clienthandlers = new ArrayList();
clienthandlers.add( new SMPPMessageReceiveHandler());
client.setBusinessHandlerSet(clienthandlers);
manager.addEndpointEntity(server);
manager.addEndpointEntity(client);
manager.openAll();
manager.startConnectionCheckTask();
Thread.sleep(1000);
for(int i=0;i
manager.openEndpoint(client);
System.out.println("start.....");
LockSupport.park();
EndpointManager.INS.close();
}
}
SGIP Api使用举例
public class TestSgipEndPoint {
private static final Logger logger = LoggerFactory.getLogger(TestSgipEndPoint.class);
@Test
public void testsgipEndpoint() throws Exception {
ResourceLeakDetector.setLevel(Level.ADVANCED);
final EndpointManager manager = EndpointManager.INS;
SgipServerEndpointEntity server = new SgipServerEndpointEntity();
server.setId("sgipserver");
server.setHost("127.0.0.1");
server.setPort(8001);
server.setValid(true);
//使用ssl加密数据流
server.setUseSSL(false);
SgipServerChildEndpointEntity child = new SgipServerChildEndpointEntity();
child.setId("sgipchild");
child.setLoginName("333");
child.setLoginPassowrd("0555");
child.setValid(true);
child.setChannelType(ChannelType.DUPLEX);
child.setMaxChannels((short)3);
child.setRetryWaitTimeSec((short)30);
child.setMaxRetryCnt((short)3);
child.setReSendFailMsg(false);
child.setIdleTimeSec((short)30);
//child.setWriteLimit(200);
//child.setReadLimit(200);
child.setSupportLongmsg(SupportLongMessage.SEND); //接收长短信时不自动合并
List serverhandlers = new ArrayList();
serverhandlers.add(new SgipReportRequestMessageHandler());
serverhandlers.add(new SGIPMessageReceiveHandler());
child.setBusinessHandlerSet(serverhandlers);
server.addchild(child);
manager.addEndpointEntity(server);
SgipClientEndpointEntity client = new SgipClientEndpointEntity();
client.setId("sgipclient");
client.setHost("127.0.0.1");
client.setPort(8001);
client.setLoginName("333");
client.setLoginPassowrd("0555");
client.setChannelType(ChannelType.DUPLEX);
client.setMaxChannels((short)10);
client.setRetryWaitTimeSec((short)100);
client.setUseSSL(false);
client.setReSendFailMsg(true);
//client.setWriteLimit(200);
//client.setReadLimit(200);
List clienthandlers = new ArrayList();
clienthandlers.add(new SGIPSessionConnectedHandler(10000));
client.setBusinessHandlerSet(clienthandlers);
manager.addEndpointEntity(client);
manager.openAll();
Thread.sleep(1000);
for(int i=0;i
manager.openEndpoint(client);
System.out.println("start.....");
LockSupport.park();
EndpointManager.INS.close();
}
}
Demo 执行日志
11:31:52.842 [workGroup2] INFO c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.codec.smpp.SMPP2CMPPBusinessHandler@1d7059df
11:31:52.852 [workGroup1] INFO c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.codec.smpp.SMPP2CMPPBusinessHandler@75e134be
11:31:52.852 [workGroup1] INFO c.z.s.c.m.AbstractEndpointConnector - handlers is not shareable . clone it success. com.zx.sms.handler.api.gate.SessionConnectedHandler@aa80b58
11:31:52.869 [workGroup1] INFO c.z.s.s.AbstractSessionLoginManager - login in success on channel [id: 0xfdc7b81e, L:/127.0.0.1:11481 - R:/127.0.0.1:2776]
11:31:52.867 [workGroup2] INFO c.z.s.s.AbstractSessionLoginManager - login in success on channel [id: 0x1fba3767, L:/127.0.0.1:2776 - R:/127.0.0.1:11481]
11:31:53.863 [busiWork-3] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:343, speed : 343/s
11:31:54.872 [busiWork-1] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:1381, speed : 1038/s
11:31:55.873 [busiWork-8] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:2704, speed : 1323/s
11:31:56.875 [busiWork-2] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:4010, speed : 1306/s
11:31:57.880 [busiWork-5] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:5416, speed : 1406/s
11:31:58.881 [busiWork-7] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:7442, speed : 2026/s
11:31:59.882 [busiWork-8] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:9581, speed : 2139/s
11:32:00.883 [busiWork-2] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:12865, speed : 3284/s
11:32:01.884 [busiWork-5] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:15937, speed : 3072/s
11:32:02.886 [busiWork-5] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:19489, speed : 3552/s
11:32:03.887 [busiWork-6] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:23065, speed : 3576/s
11:32:04.888 [busiWork-2] INFO c.z.s.h.a.s.MessageReceiveHandler - Totle Receive Msg Num:26337, speed : 3272/s
android 短信编解码方式,中移短信cmpp协议/smpp协议 netty实现编解码相关推荐
- 中移ML302模组通过MQTT协议接入oneNT平台
@中移ML302模组通过MQTT协议接入oneNT平台 ML302 是中国移动最新推出的 LTE Cat.1 模块. 丰富的 Internet 协议.行业标准接口和功能,支持 Windows.Linu ...
- Android-插入短信及备份手机中的短信到SD卡
短信数据库 只需要关注sms表 只需要关注4个字段 body:短信内容 address:短信的发件人或收件人号码 date:短信时间 type:1为收到,2为发送 读取系统短信,首先查询源码获得短信数 ...
- java跟onenet平台交互_中移物联OneNET平台HTTP协议接入
HTTP协议接入 登录后进入开发者中心 登录成功,点击进入"开发者中心". 点击左侧菜单 这里我们点击箭头所指的"多协议接入" 可以看到这里可以创建基于MQTT ...
- 短信ui--短信设置界面之sim卡短信管理
sim卡短信管理 1.前言 对于sim卡的短信管理,其功能包含了将存在手机上的短信保存到sim卡.将存储位置设置为sim卡时自动将短信保存到sim卡.将sim卡中的短信导入到电话中 ...
- 中移路由怎么调虚拟服务器,用手机怎么设置中移禹路由器?
问:请问中国移动无线路由器禹路由器(zy366)支持用手机设置吗?电脑开不了机了,用手机怎么设置中移禹路由器呢?求教程! 答:中国移动(铁通)的中移禹路由器ZY-366支持用手机设置,在本文家用路由器 ...
- Android开发 亲测可用--多种方式获取手机短信验证码自动填入
Android开发 静态注册.动态注册.短信中心库监控获取手机验证码,自动复制到剪切板或或填入输入框. 友情提醒初学者:这是广播接收器的类,写在xml中静态注册或写在启动类的Oncreate方法下动态 ...
- android发送短信的两种方式,发送长短信的两种方式,群发短信
android 发送短信的方法 方法一:调用系统的短信APP,发送短信. Intent smsIntent = new Intent(Intent.ACTION_VIEW);smsIntent.set ...
- android短信iphone,不越狱教你向iPhone中导入短信_手机生活评测-中关村在线
2.短信 微信只有备份通讯录的功能,第三方软件包括QQ同步助手等虽然支持通话记录.短信的备份,但是无法再Android和iOS之间互通,想要在不越狱的情况下完成短信的备份有些繁琐,接下来我们就为大家演 ...
- Android中读取短信信息
Android中读取的短信文件有 /*** 所有的短信*/public static final String SMS_URI_ALL = "content://sms/";/** ...
最新文章
- CSS自学教程--一天搞定CSS(终篇总结)
- 需求评审五个维度框架分析及其带来的启示-总起
- [转]自用类库整理之SqlHelper和MySqlHelper
- Graph Without Long Directed Paths
- 批处理延时启动的几个方法
- Linux下最简单的修改文件名后缀的命令行技巧
- mybatis 不生效 参数_Mybatis-日志配置
- python程序怎么修改_python文件如何修改
- ArcUser 2006第2期拾零
- 在VS 2010中查询和导航代码
- phpcms v9二次开发之模型类的应用(2)
- 搭建steam游戏服务器
- 计算机买什么固态硬盘,固态硬盘买什么接口好?那么多接口到底选哪种?看完这个秒懂...
- html标签不使用css样式,html – 忽略CSS样式
- java实现找一条转乘次数最少的公交线路?,基于最优换乘次数的城市公交查询算法...
- Android Studio中模拟器如何输入中文、将模拟器语言设置为中文
- 360加速插件谷歌字体服务停止运行
- win10装的AutoCAD 2012版,右上角最小化不显示的解决办法
- perl中bless的理解(zz) z
- 离散题目13(判断自反关系)
热门文章
- 使用详解_Log4j2使用详解
- python docker_Docker实践:python应用容器化
- android微信支付坑,微信支付踏坑之旅
- php response响应,9. 响应 (Response)
- 小程序弹出层禁止列表滑动_是时候展现真正的技术了!小程序教程来了——百战Web前端课程更新05.07...
- 【Linux】查看文件内容的相关命令总结
- 树——通用树到二叉树的转换
- MySQL备份和还原数据库及慢查询日志使用
- ADB 基础命令使用
- sudo: Cannot execute /usr/local/bin/zsh: No such file or directory 问题