Netty的断线重连
因为工作中经常使用到TCP,所以会频繁使用到诸如Mina或Netty之类的通信框架,为了方便项目的逻辑调用,经常会在框架的基础上再一次进行封装,这样做其实有画蛇添足的嫌疑,但也是无奈之举。
这里主要记载使用Mina和Netty,构建适合项目的一个完整的重连逻辑。
当然,都是作为客户端,毕竟一般只有客户端才会做重连。
在这之前,需要考虑几个问题:
- 连接行为的结果可以较为方便地获得,成功或失败,最好直接有接口回调,可以在回调中进行后续逻辑处理
- 当前通信连接的活跃状态需要准确实时而方便地获得,这样有利于重连时对连接的判断
- 能够较为灵活的配置Listener或Handler或Filter
- 支持计数,无论是首次连接失败多次后不再尝试连接,还是中途断开后断线重连多次后不再尝试连接,一般不作无休止地重连
从代码层面看,框架中最好有一个类似Connector的类,能够暴露合适的接口或方法,提供各种状态与回调,使通信连接的动向能够实时把握,然而事情并不是那么美好。
连接结果
由于框架设计的一些原则,一个connector根本不足以暴露这些接口。
对于Mina而言,作为客户端一般用于连接的连接器是NioSocketConnector;
对于Netty而言,则是Bootstrap;
下表是一些常见的定义在两个框架中的对比,不一定准确,但意义相近;
定义 | Mina | Netty |
---|---|---|
连接器 | SocketConnector | Bootstrap |
会话 | IoSession | Channel |
连接结果 | ConnectFuture | ChannelFuture |
逻辑处理 | IoHandler | ChannelHandler |
过滤器 | IoFilter | ChannelHandler |
对于Mina而言,连接操作是这样的:
ConnectFuture future = mConnector.connect();future.awaitUninterruptibly();if (future.isConnected()) {//得到会话mSession = future.getSession();}
对于Netty来说,连接可以写成与Mina几乎相同的形式:
ChannelFuture future = bootstrap.connect();future.awaitUninterruptibly();if(future.isSuccess()){mChannel = future.channel();}
也可以不阻塞等待,两种future都可以自行添加Listener监听异步任务是否完成:
//Minafuture.addListener(new IoFutureListener<IoFuture>() {@Overridepublic void operationComplete(IoFuture ioFuture) {}});
//Nettyfuture.addListener(new GenericFutureListener<ChannelFuture>() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {}});
毕竟是出自一人之手,部分API真是惊人的相似。
到这里,第一个连接返回结果问题算是有所结论,两种框架都可以正常返回连接的结果。
会话状态
而上述代码中,返回的mSession与mChannel就是得到的会话,这两种类各自提供了一些接口,可以用于获得通信连接的实时状态。
Mina的IoSession这里只取部分方法:
public interface IoSession {IoHandler getHandler();IoSessionConfig getConfig();IoFilterChain getFilterChain();ReadFuture read();WriteFuture write(Object var1);WriteFuture write(Object var1, SocketAddress var2);CloseFuture closeNow();boolean isConnected();boolean isActive();boolean isClosing();boolean isSecured();CloseFuture getCloseFuture();SocketAddress getRemoteAddress();SocketAddress getLocalAddress();SocketAddress getServiceAddress();boolean isIdle(IdleStatus var1);boolean isReaderIdle();boolean isWriterIdle();boolean isBothIdle();
}
再对比看下Netty提供的Channel,这里也只取部分方法展示:
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {EventLoop eventLoop();Channel parent();ChannelConfig config();boolean isOpen();boolean isRegistered();boolean isActive();SocketAddress localAddress();SocketAddress remoteAddress();boolean isWritable();Channel.Unsafe unsafe();ChannelPipeline pipeline();public interface Unsafe {SocketAddress localAddress();SocketAddress remoteAddress();void register(EventLoop var1, ChannelPromise var2);void bind(SocketAddress var1, ChannelPromise var2);void connect(SocketAddress var1, SocketAddress var2, ChannelPromise var3);void disconnect(ChannelPromise var1);void close(ChannelPromise var1);void write(Object var1, ChannelPromise var2);void flush();}
}
可以看出,无论是IoSession还是Channel,都有相关的API可以知晓通信是否活跃,所以第二个问题在可以获得IoSession或Channel的情况下,是没有问题的。
配置Handler
那么再看配置Listener或Handler的相差操作是否灵活。
二者在这方面的差别较为明显。
对于Mina而言,添加Handler可以直接利用Connector,真正的逻辑Handler只能由setHandler方法添加,且只能为一个,而相关的Filter则要通过getFilterChain()拿到的过滤器集合去添加;对于Mina来说,Handler和Filter是没有交集的,他们分属不同的接口IoHandler和IoFilter:
mConnector.setHandler(handler);
mConnector.getFilterChain().addLast(CODEC, protocolCodecFilter);
Netty有所不同,netty中所有的handler、filter都是ChannelHandller,这些handler都要在连接行为发生后才能生效,也就是挂载到Channel上的,而不是Bootstrap,一般添加是这样的:
bootstrap.handler(handler);
channel.pipeline().addLast(someHandler);
channel.pipeline().addLast(someFilter);
但handler依旧只能添加一个,如果要添加多个handler或filter,就必须获取到channel,然后进行添加,netty本身提供了一个ChannelInitializer可以用于添加多个channelHandler,一般会这么写:
bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(handler);channel.pipeline().addLast(someHandler);channel.pipeline().addLast(someFilter);}});
对于Netty来说,Handler和Filter是同一个东西,都是ChannelHandler。
两者在这方面的区别比较明显:
一是netty将handler和filter都统一为handler了,
二是netty不能像mina一样,在未连接之前就可以配置所有的Handler或Filter,netty必须获得channel也就是连接成功后才能配置多个Filter。
这就造成了一个问题,Mina可以提前就配置监听器监听连接的状态,可以正常监听中途断开,也就是在创建Connector后就可以挂载上监听:
mConnector.getFilterChain().addFirst("reconnect", new IoFilterAdapter() {@Overridepublic void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { //监听到断开,可接入回调接口,做进一步的重连逻辑mConnector.connect();}});
而Netty不能,创建Connector也就是Bootstrap并不能实现类似的挂载,Bootstrap只能挂载一个Handler,而相关的过滤器或监听只能在Channel出现后再进行挂载,那么就会写成这样:
bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {//添加其他Filter或Handler}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);//监听到断开,重连bootstrap.connect();}});
这里initChannel方法永远是最先被调用的,因为在源码中是这样的:
//ChannelInitializer.javaprotected abstract void initChannel(C var1) throws Exception;public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {if (this.initChannel(ctx)) {ctx.pipeline().fireChannelRegistered();this.removeState(ctx);} else {ctx.fireChannelRegistered();}}
在这种逻辑下,Mina可以在sessionClosed回调中使用SocketConnetor进行重连,Netty可以在channelInactive回调中使用Bootstrap进行重连。
看起来没什么毛病。
但需要注意一点,就是Handler的复用问题,也就是对Handler或Filter的检查,Mina和Netty都有对Handler的重复添加进行过检查,不过检查逻辑有细微的差别。
Mina中是这样检查的:
//DefaultIoFilterChain.javapublic synchronized void addLast(String name, IoFilter filter) {this.checkAddable(name);this.register(this.tail.prevEntry, name, filter);}private final Map<String, Entry> name2entry = new ConcurrentHashMap();private void checkAddable(String name) {if (this.name2entry.containsKey(name)) {throw new IllegalArgumentException("Other filter is using the same name '" + name + "'");}}
可以看到,Mina只会检查Filter在Map中对应的key是否被使用过,当然理论上Filter挂载在SocketConnector的FilterChain中,只要配置过一次,就无需再进行配置。
那么Netty呢?
Netty的Handler不是能随意复用的,要复用必须标明注解@Sharable,否则就会出现异常:
警告: Failed to initialize a channel. Closing: [id: 0x1caafa97]
io.netty.channel.ChannelPipelineException: io.netty.handler.timeout.IdleStateHandler is not a @Sharable handler, so can't be added or removed multiple times.
这是因为在源码进行检查时,是对Handler本身进行检查的,handler会有一个added的属性,一旦被添加使用过,就会置为true,而判断逻辑会阻止为added=true的handler添加进来 。这样一来,如果强行添加已经添加过的handler就会抛出异常:
//DefaultChannelPipeline.javapublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {AbstractChannelHandlerContext newCtx;synchronized(this) {checkMultiplicity(handler);newCtx = this.newContext(group, this.filterName(name, handler), handler);//省略部分代码}this.callHandlerAdded0(newCtx);return this;}private static void checkMultiplicity(ChannelHandler handler) {if (handler instanceof ChannelHandlerAdapter) {ChannelHandlerAdapter h = (ChannelHandlerAdapter)handler;if (!h.isSharable() && h.added) {throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");}h.added = true;}}
这也就说明使用channel.pipeline().addLast(handler)这种方法添加handler时,如果想不同的Channel添加同一个Handler实例,每种handler都必须注解了@Sharable,如果正好要使用IdleStateHandler这种源码内部的Handler,而IdleStateHandler是没有注解过@Sharable,那么就会出现上面的异常。
而实际应用中,为了实现心跳,IdleStateHandler是一般都会使用到的。
那么问题来了,Mina每次重新连接,创建新的session,但只要SocketConnector没有变,所有Handler和Filter自然就没有变,仍然可用,因为所有Handler和Filter是挂载到SocketConnector的FilterChain中,算是只和Connector相关的;
而Netty,如果重新连接的话,会创建新的Channel,然后会重新调用initChannel,然后利用channel.pipeline().addLast添加Handler,算是挂载到Channel上的,而不是Bootstrap上。
这样显示出两者最大的区别就是,Mina中配置一次即可,而Netty则需要每次产生新的Channel时对其进行重新配置。
所以Netty中的handler想复用的话,就必须加注解,否则就会报异常。如果一定要用到无法注解@Sharable的Handler,比如上面的IdleStateHandler,那就要想办法每次initChannel时,也新建一个新的IdleStateHandler…
或者,继承IdleStateHandler,然后加上注解也行,虽然也很丑就是了。
So Bad…
这样的情况下,可以想办法,每次都新建,类似这种:
FunctionsChannelHandler functionsChannelHandler = new FunctionsChannelHandler(bootstrap){@Overridepublic ChannelHandler[] handlers() {return new ChannelHandler[]{new NormalClientEncoder(),new IdleStateHandler(20, 10, 20),this,new NormalClientHandler()};}};bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {//添加各种handlerchannel.pipeline().addLast(functionsChannelHandler.handlers());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);//监听到断开}});
因为netty把所有监听器过滤器逻辑处理都归为ChannelHandler的原因,把一个handler扩展成一个功能较为丰富的handler是一种不错的方法 。或者沿用这种思路,使其每次新加Handler时,都是new过的Handler。
应对框架自带的一些未注解@Sharable的类,也可以继承之,自行加注解:
@ChannelHandler.Sharable
public class HeartHandler extends IdleStateHandler {public HeartHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds);}public HeartHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {super(readerIdleTime, writerIdleTime, allIdleTime, unit);}public HeartHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {super(observeOutput, readerIdleTime, writerIdleTime, allIdleTime, unit);}
}
这样一来,配置Handler也勉强可算灵活了。
连接计数
对连接计数一般都是开发者编写的逻辑,主要是应对无休止地连接。
主要应用在两种场景:
一是首次连接,如果多次连接不成功,那么停止连接,或者另有逻辑;
二是断线重连,如果多次重连不成功,那么停止连接并销毁,或者另有逻辑。
因为Mina和Netty都是多线程模型的缘故,计数为了求稳可以直接使用Atom类,当然觉得大材小用也可以直接使用普通int值,毕竟理论上两次连接中间应该会有一定延时才对。
应用示例
所以最后,都可以对各自的连接器进行二次封装,然后编写对自己有利的逻辑。
对于Mina,大概可以写成这样:
public class TCPConnector {private static final int BUFFER_SIZE = 10 * 1024;private static final long CONNECT_TIMEOUT_MILLIS = 10 * 1000;private static final int KEEPALIVE_REQUEST_INTERVAL = 10;private static final int KEEPALIVE_REQUEST_TIMEOUT = 40;private static final String RECONNECT = "reconnect";private static final String CODEC = "codec";private static final String HEARTBEAT = "heartbeat";private static final String EXECUTOR = "executor";/*** 连接器*/private SocketConnector mConnector;/*** 会话*/private IoSession mSession;/*** 外用接口*/private IConnectorListener connectorListener;/*** 连接所在线程*/private ExecutorService mExecutor;/*** 重连次数*/private AtomicInteger recconnectCounter;/*** 首次连接次数*/private AtomicInteger connectCounter;private String host;private int port;public interface IConnectorListener {/*** 连接建立成功*/void connectSuccess(IoSession session);/*** 连接建立失败*/void connectFailed();/*** 连接中途断掉时*/void sessionClosed(IoSession session);}public TCPConnector() {mConnector = new NioSocketConnector();recconnectCounter = new AtomicInteger(0);connectCounter = new AtomicInteger(0);}/*** 设置目标地址与端口** @param host 目标地址* @param port 目标端口*/public void setHostPort(String host, int port) {L.d("设置地址与端口-" + host + ":" + port);this.host = host;this.port = port;}public String getHost() {return this.host;}/*** 在子线程中启用连接*/public void connectInThread() {mExecutor.execute(new Runnable() {@Overridepublic void run() {connect();}});}/*** 根据设置的参数连接*/private void connect() {//如果已经连接,则直接【连接成功】if (mSession == null || !mSession.isConnected()) {//连接mConnector.setDefaultRemoteAddress(new InetSocketAddress(host, port));L.i("连接-->" + host + ":" + port);ConnectFuture future = mConnector.connect();//阻塞,等待连接建立响应future.awaitUninterruptibly(CONNECT_TIMEOUT_MILLIS);//响应连接成功或失败if (future.isConnected()) {//得到会话mSession = future.getSession();if (connectorListener != null) {connectCounter.set(0);connectorListener.connectSuccess(mSession);}} else {if (connectorListener != null) {connectCounter.incrementAndGet();connectorListener.connectFailed();}}} else {if (connectorListener != null) {connectCounter.incrementAndGet();connectorListener.connectSuccess(mSession);}}}/*** 重连*/private void reconnect() {if (mConnector == null)throw new IllegalArgumentException("IoConnector cannot be null");//如果已经连接,则直接【连接成功】if (mSession == null || !mSession.isConnected()) {//连接ConnectFuture future = mConnector.connect();//阻塞,等待连接建立响应future.awaitUninterruptibly();//响应连接成功或失败if (future.isConnected()) {//得到会话mSession = future.getSession();}}}/*** 重连** @param reconnectTimeoutMills 连接的超时时间* @param reconnectTimes 重连次数*/public void reconnect(final long reconnectTimeoutMills, final int reconnectTimes) {try {recconnectCounter.set(0);while (mConnector != null && !(mSession != null && mSession.isConnected()) && recconnectCounter.incrementAndGet() < reconnectTimes) {reconnect();if (mSession != null && mSession.isConnected()) {break;}else{TimeUnit.MILLISECONDS.sleep(reconnectTimeoutMills);}L.w(Thread.currentThread().getName() + "," + "重连" + host + ":" + port + "(" + recconnectCounter.get() + ")次...");}} catch (InterruptedException e) {e.printStackTrace();} finally {if (mSession != null && mSession.isConnected()) {if (connectorListener != null) {connectorListener.connectSuccess(mSession);}} else {if (connectorListener != null) {connectorListener.connectFailed();}}}}public IoSession getSession() {return mSession;}public IoConnector getConnector() {return mConnector;}public boolean isActive() {return mConnector != null && mConnector.isActive() && mSession != null;}public boolean isConnected() {return mSession != null && mSession.isConnected();}public IConnectorListener getConnectorListener() {return connectorListener;}public int getRecconnectCounter() {return recconnectCounter.get();}public int getConnectCounter() {return connectCounter.get();}public void resetConnectCounter() {connectCounter.set(0);}/*** 断开连接,释放资源*/public void disconnect() {if (mConnector != null) {connectorListener = null;mConnector.getFilterChain().clear();mConnector.dispose();mConnector = null;}if (mSession != null) {mSession.closeNow();mSession = null;}if (mExecutor != null) {mExecutor.shutdown();mExecutor = null;}L.w("断开");}public static class Builder {private TCPConnector newInstance = new TCPConnector();private ProtocolCodecFilter protocolCodecFilter;private KeepAliveFilter keepAliveFilter;public Builder setExecutor(ExecutorService executor) {newInstance.mExecutor = executor;return this;}public Builder setConnectListener(IConnectorListener listener) {newInstance.connectorListener = listener;return this;}public Builder setHost(String host) {newInstance.host = host;return this;}public Builder setPort(int port) {newInstance.port = port;return this;}public Builder setProtocolCodecFilter(ProtocolCodecFactory protocolCodecFactory) {protocolCodecFilter = new ProtocolCodecFilter(protocolCodecFactory);return this;}public Builder setConnectTimeoutMillis(long connectTimeoutMillis) {newInstance.mConnector.setConnectTimeoutMillis(connectTimeoutMillis);return this;}public Builder setKeepAliveFilter(KeepAliveMessageFactory keepAliveMessageFactory, int keepAliveRequestInterval) {keepAliveFilter = new KeepAliveFilter(keepAliveMessageFactory, IdleStatus.BOTH_IDLE, KeepAliveRequestTimeoutHandler.LOG, keepAliveRequestInterval, KEEPALIVE_REQUEST_TIMEOUT);return this;}public Builder setKeepAliveFilter(KeepAliveMessageFactory keepAliveMessageFactory, KeepAliveRequestTimeoutHandler keepAliveRequestTimeoutHandler, int keepAliveRequestInterval, int keepAliveRequestTimeOut) {keepAliveFilter = new KeepAliveFilter(keepAliveMessageFactory, IdleStatus.BOTH_IDLE, keepAliveRequestTimeoutHandler, keepAliveRequestInterval, keepAliveRequestTimeOut);return this;}public Builder setHandlerAdapter(IoHandlerAdapter handler) {newInstance.mConnector.setHandler(handler);return this;}public Builder setReadBuffer(int size) {newInstance.mConnector.getSessionConfig().setReadBufferSize(size);return this;}public Builder setReceiveBuffer(int size) {newInstance.mConnector.getSessionConfig().setReceiveBufferSize(size);return this;}public Builder setSendBuffer(int size) {newInstance.mConnector.getSessionConfig().setSendBufferSize(size);return this;}public TCPConnector build() {//添加重连监听if (newInstance.connectorListener != null) {newInstance.mConnector.getFilterChain().addFirst(RECONNECT, new IoFilterAdapter() {@Overridepublic void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {if (newInstance != null && newInstance.connectorListener != null)newInstance.connectorListener.sessionClosed(session);}});}//设置编码解码if (protocolCodecFilter != null)newInstance.mConnector.getFilterChain().addLast(CODEC, protocolCodecFilter);//设置心跳if (keepAliveFilter != null)newInstance.mConnector.getFilterChain().addLast(HEARTBEAT, keepAliveFilter);//connector不允许使用OrderedThreadPoolExecutornewInstance.mConnector.getFilterChain().addLast(EXECUTOR, new ExecutorFilter(Executors.newSingleThreadExecutor()));return newInstance;}}@Overridepublic String toString() {return "MinaHelper{" +"mSession=" + mSession +", mConnector=" + mConnector +", connectorListener=" + connectorListener +", mExecutor=" + mExecutor +'}';}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;TCPConnector connector = (TCPConnector) o;return port == connector.port && (host != null ? host.equals(connector.host) : connector.host == null);}@Overridepublic int hashCode() {int result = host != null ? host.hashCode() : 0;result = 31 * result + port;return result;}
使用起来是这样的:
HigherGateWayHandler higherGateWayHandler = new HigherGateWayHandler();TCPConnector higherGateWayClient = new TCPConnector.Builder().setExecutor(ThreadPool.singleThread("higher_gateway_client")).setHost(NC.GATEWAT_HIGHER_HOST).setPort(NC.LOWER_PORT).setConnectTimeoutMillis(10 * 1000).setReadBuffer(10 * 1024).setHandlerAdapter(higherGateWayHandler).setProtocolCodecFilter(new HigherGateWayCodecFactory()).setKeepAliveFilter(new KeepAliveHigherGateWay(), higherGateWayHandler, 10, 20).setConnectListener(new TCPConnector.IConnectorListener() {@Overridepublic void connectSuccess(IoSession session) {//连接成功后}@Overridepublic void connectFailed() {//重连失败if (higherGateWayClient.getRecconnectCounter() == 3) {//重连失败后}//非重连失败,优先级连接情况下if (higherGateWayClient.getRecconnectCounter() == 0 && higherGateWayClient.getConnectCounter() > 2) {higherGateWayClient.resetConnectCounter();} else {higherGateWayClient.connectInThread();}}@Overridepublic void sessionClosed(IoSession session) {executors.execute(new Runnable() {@Overridepublic void run() {//重连逻辑higherGateWayClient.reconnect(10 * 1000, 3);}});}}).build();
而Netty,封装起来会有一点花里胡哨,目前遇到的问题是当重连以后复用IdleStateHandler这种Handler时,就会使得其中的计时机制失效,也就是说,心跳没用了,暂时不明原因,大概率是其中的线程被销毁无法再起的原因。那么当前就只能想办法每次调用initChannel时,创建新的Handler才行:
public class NettyConnector {/*** 连接器*/private Bootstrap bootstrap;/*** 地址*/private String host;private int port;/*** 会话*/private Channel channel;private static final long TIME_OUT = 10;private long connectTimeoutMills;/*** 重连次数*/private AtomicInteger recconnectCounter;/*** 首次连接次数*/private AtomicInteger connectCounter;/*** 以接口引出通信状态*/public interface IChannelStateListener {void onConnectSuccess(Channel channel);void onConnectFailed();void onDisconnect();}private IChannelStateListener channelStateListener;private NettyConnector(final Builder builder) {recconnectCounter = new AtomicInteger(0);connectCounter = new AtomicInteger(0);connectTimeoutMills = builder.timeoutMills;bootstrap = builder.bootstrap;bootstrap.handler(new ChannelInitializer() {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(new ChannelDisconnectHandler());channel.pipeline().addLast(builder.handlerSet.handlers());}});}public void setRemoteAddress(String host, int port) {L.d("设置地址与端口-" + host + ":" + port);this.host = host;this.port = port;}public void setChannelStateListener(IChannelStateListener listener) {channelStateListener = listener;}public void connect() {if (channel == null || !channel.isActive()) {bootstrap.remoteAddress(this.host, this.port);L.d("第" + (connectCounter.get() + 1) + "次连接" + host + ":" + port + "中......");final long startMills = System.currentTimeMillis();ChannelFuture channelFuture = bootstrap.connect();channelFuture.addListener(new GenericFutureListener<ChannelFuture>() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {L.d("连接(" + bootstrap.config().remoteAddress() + ")成功");channel = f.channel();if (channelStateListener != null) {connectCounter.set(0);channelStateListener.onConnectSuccess(channel);}} else {long delay = System.currentTimeMillis() - startMills;if (delay > 0) {TimeUnit.MILLISECONDS.sleep(connectTimeoutMills - delay);}L.d("连接(" + bootstrap.config().remoteAddress() + ")失败");if (channelStateListener != null) {connectCounter.incrementAndGet();channelStateListener.onConnectFailed();}}}});}}private void reconnect() {if (bootstrap == null)throw new IllegalArgumentException("bootstrap cannot be null");//如果已经连接,则直接【连接成功】if (channel == null || !channel.isActive()) {//连接channel = bootstrap.connect().awaitUninterruptibly().channel();}}/*** 重连* @param reconnectTimeoutMills 重连超时时间* @param reconnectTimes 重连次数*/public void reconnect(final long reconnectTimeoutMills, final int reconnectTimes) {try {recconnectCounter.set(0);while (channel != null && !channel.isActive() && recconnectCounter.getAndIncrement() < reconnectTimes) {L.d(Thread.currentThread().getName() + "," + "重连" + bootstrap.config().remoteAddress() + "(" + recconnectCounter.get() + ")次...");reconnect();if (channel.isActive()) {break;} else {TimeUnit.MILLISECONDS.sleep(reconnectTimeoutMills);}L.d(channel.isActive() + "");}} catch (InterruptedException e) {e.printStackTrace();} finally {if (channel != null && channel.isActive()) {if (channelStateListener != null) {channelStateListener.onConnectSuccess(channel);}} else {if (channelStateListener != null) {channelStateListener.onConnectFailed();}}}}public Channel getChannel() {return channel;}public boolean isConnected() {return channel != null && channel.isActive();}public String getAddress() {return host + ":" + port;}public int getConnectFailedTimes() {return connectCounter.get();}public int getReconnectFailedTimes() {return recconnectCounter.get();}public static class Builder {private Bootstrap bootstrap = new Bootstrap();private HandlerSet handlerSet;private long timeoutMills = 10 * 1000;public Builder group(EventLoopGroup loopGroup) {bootstrap.group(loopGroup);return this;}@Deprecatedpublic Builder remoteAddress(String inetHost, int inetPort) {bootstrap.remoteAddress(inetHost, inetPort);return this;}public Builder setConnectTimeoutMills(long timeout) {timeoutMills = timeout;return this;}public Builder handler(HandlerSet handlers) {handlerSet = handlers;return this;}public NettyConnector build() {bootstrap.channel(NioSocketChannel.class);return new NettyConnector(this);}}/*** 主要用于监听断开*/class ChannelDisconnectHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelInactive();if (channelStateListener != null) {channelStateListener.onDisconnect();}}}/*** 主要用于创建新的handler,避免复用带来的一些问题*/@ChannelHandler.Sharablepublic static abstract class HandlerSet extends ChannelInboundHandlerAdapter {public abstract ChannelHandler[] handlers();}}
因为Netty不能像Mina直接在Connector上挂载监听sessionClosed,只能用一个ChannelDisconnectHandler这样的东西去监听是否已经断开,并通过接口引出结果;
并且因为只能在Channel.Pipeline中才能添加多个Handler的原因,这里用一个HandlerSet强行将所有需要的Handler集合,然后在创建Bootstrap的时候一次性添加进去,想要保证每次都新建,这里就使用抽象方法,让使用的时候可以自行创建。
注意,由于这里的抽象类HandlerSet每次其实并不是新建的,所有是需要复用的,所以需要加注解@Sharable,但也只需要加它一个就行了,其他都是新建出来的,无需理会。
写出来就是这样:
NettyConnector connector = new NettyConnector.Builder().group(new NioEventLoopGroup()).handler(new NettyConnector.HandlerSet() {@Overridepublic ChannelHandler[] handlers() {return new ChannelHandler[]{new HeartHandler(10, 10, 10),new NormalClientEncoder(),new NormalClientHeartBeatHandler(),new NormalClientHandler()};}}).setConnectTimeoutMills(5 * 1000).build();connector.setRemoteAddress("192.168.0.102", 8000);connector.setChannelStateListener(new NettyConnector.IChannelStateListener() {@Overridepublic void onConnectSuccess(Channel channel) {L.d("连接" + channel.remoteAddress().toString() + "成功");}@Overridepublic void onConnectFailed() {L.d("连接" + connector.getAddress() + "失败");if (connector.getReconnectFailedTimes() == 0 && connector.getConnectFailedTimes() < 3) {connector.connect();}}@Overridepublic void onDisconnect() {L.d(connector.getChannel().remoteAddress().toString() + "已断开");connector.reconnect(5000, 5);}});
其中的HeartHandler是继承自IdleStateHandler的。
整个封装显得花里胡哨…却又很丑,不过勉强能用,水平有限。
就这样吧。
Netty的断线重连相关推荐
- Netty客户端断线重连实现及问题思考
点击关注公众号,利用碎片时间学习 前言 在实现TCP长连接功能中,客户端断线重连是一个很常见的问题,当我们使用netty实现断线重连时,是否考虑过如下几个问题: 如何监听到客户端和服务端连接断开 ? ...
- Android 通过 NSD 服务 Netty(断线重连、心跳、黏包处理) 实现两个 Android 系统端的长连接通讯
引言 近期需求,通过手机App端取号机(含叫号通知功能),实时连接 另一台 Android 广告机用于播放当前被叫到的号数. 这里有两种Android 机 一台「基于Sunmi版的可出小票的Andro ...
- 面试官问:服务的心跳机制与断线重连,Netty底层是怎么实现的?懵了
点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 心跳机制 何为心跳 所谓心跳, 即在 TCP 长连接中, ...
- Netty实现心跳机制与断线重连
点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 来源:https://www.jianshu.com/p/ ...
- Netty 断线重连解决方案
Netty 断线重连解决方案 参考文章: (1)Netty 断线重连解决方案 (2)https://www.cnblogs.com/wujinsen/p/8949299.html 备忘一下.
- 浅析 Netty 实现心跳机制与断线重连
基础 何为心跳 顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性. 为什么需要心跳 因为网络的不可 ...
- 四、Netty 实现心跳机制与断线重连
一.概述 何为心跳 顾名思义, 所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性. 为什么需要心跳 因为网络的不 ...
- netty心跳过程中 发送消息失败_Netty 4.0 实现心跳检测和断线重连
arg0.pipeline().addLast("ping", new IdleStateHandler(25, 15, 10,TimeUnit.SECONDS)); 这个处理器, ...
- 用Netty撸一个心跳机制和断线重连!
来源:www.jianshu.com/p/1a28e48edd92 心跳机制 何为心跳 所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确 ...
最新文章
- 特征交互(Feature Interaction)及多项式特征(PolynomialFeatures)
- 构建本地缓存java_Java8简单的本地缓存实现
- 面试「计算机操作系统」知识点大集合!
- 一篇不一样的docker原理解析
- 计算机应用基础 专2018秋,广东开放大学远程教育专科2018年秋计算机应用基础Word模块测试.pdf...
- 互联网基础知识_数字化工业网络—工业互联网的网络技术.pptx
- 顶级c程序员之路 基础篇 - 第一章 关键字的深度理解 number-1
- 用计算机打字英语单词,常用计算机专业英语词汇-前401-500单词
- flex自定义preloader预加载进度条
- EF Code First Migrations数据库迁移 (转帖)
- 网络 如何解决输入路由器管理地址192.168.1.1进不去
- Confluence 6 LDAP 用户结构设置
- 小白的一周学习汇总!
- 暗黑破坏神(DIABLOII 1.11B)BOT 及源代码公开下载
- 网站做渗透测试服务的步骤
- IGCT器件是什么?
- Skype和LibFetion无法输入中文的解决方法
- 啊哈,在PDD买了一套自己的盗版书
- 单片机数字钟(调时,调时闪烁,万年历,年月日)超详细解析
- 宏基因组理论教程1宏基因组简介
热门文章
- 一个印度人写的VC串口类CSerialCom(有串口基础介绍)
- 网易云音乐params和encSecKey生成原理
- mysql生产cdm文件_powerdesigner中CDM转化成PDM导出mysql脚本
- 闲谈“个人核心竞争力”与“危机感”
- 成功解决tensorflow.python.framework.errors_impl.InvalidArgumentError报错问题
- 3.6 51单片机-动态数码管
- ActivityManager: Killing *pid + 包名*: excessive cpu 21890 during 300019 dur=45344791 limit=2
- 经济危机===丐帮也裁员!!!(各企业裁员统计)
- 方正高影仪 linux驱动下载,方正Founder HD1000 驱动
- UE4移动平台AR开发快速预览