因为工作中经常使用到TCP,所以会频繁使用到诸如Mina或Netty之类的通信框架,为了方便项目的逻辑调用,经常会在框架的基础上再一次进行封装,这样做其实有画蛇添足的嫌疑,但也是无奈之举。

这里主要记载使用Mina和Netty,构建适合项目的一个完整的重连逻辑。
当然,都是作为客户端,毕竟一般只有客户端才会做重连。

在这之前,需要考虑几个问题:

  • 连接行为的结果可以较为方便地获得,成功或失败,最好直接有接口回调,可以在回调中进行后续逻辑处理
  • 当前通信连接的活跃状态需要准确实时而方便地获得,这样有利于重连时对连接的判断
  • 能够较为灵活的配置Listener或Handler或Filter
  • 支持计数,无论是首次连接失败多次后不再尝试连接,还是中途断开后断线重连多次后不再尝试连接,一般不作无休止地重连

从代码层面看,框架中最好有一个类似Connector的类,能够暴露合适的接口或方法,提供各种状态与回调,使通信连接的动向能够实时把握,然而事情并不是那么美好。

连接结果

由于框架设计的一些原则,一个connector根本不足以暴露这些接口。
对于Mina而言,作为客户端一般用于连接的连接器是NioSocketConnector
对于Netty而言,则是Bootstrap

下表是一些常见的定义在两个框架中的对比,不一定准确,但意义相近;

定义 Mina Netty
连接器 SocketConnector Bootstrap
会话 IoSession Channel
连接结果 ConnectFuture ChannelFuture
逻辑处理 IoHandler ChannelHandler
过滤器 IoFilter ChannelHandler

对于Mina而言,连接操作是这样的:

            ConnectFuture future = mConnector.connect();future.awaitUninterruptibly();if (future.isConnected()) {//得到会话mSession = future.getSession();}

对于Netty来说,连接可以写成与Mina几乎相同的形式:

            ChannelFuture future = bootstrap.connect();future.awaitUninterruptibly();if(future.isSuccess()){mChannel = future.channel();}

也可以不阻塞等待,两种future都可以自行添加Listener监听异步任务是否完成:

//Minafuture.addListener(new IoFutureListener<IoFuture>() {@Overridepublic void operationComplete(IoFuture ioFuture) {}});
//Nettyfuture.addListener(new GenericFutureListener<ChannelFuture>() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {}});

毕竟是出自一人之手,部分API真是惊人的相似。
到这里,第一个连接返回结果问题算是有所结论,两种框架都可以正常返回连接的结果。

会话状态

而上述代码中,返回的mSession与mChannel就是得到的会话,这两种类各自提供了一些接口,可以用于获得通信连接的实时状态。
Mina的IoSession这里只取部分方法:

public interface IoSession {IoHandler getHandler();IoSessionConfig getConfig();IoFilterChain getFilterChain();ReadFuture read();WriteFuture write(Object var1);WriteFuture write(Object var1, SocketAddress var2);CloseFuture closeNow();boolean isConnected();boolean isActive();boolean isClosing();boolean isSecured();CloseFuture getCloseFuture();SocketAddress getRemoteAddress();SocketAddress getLocalAddress();SocketAddress getServiceAddress();boolean isIdle(IdleStatus var1);boolean isReaderIdle();boolean isWriterIdle();boolean isBothIdle();
}

再对比看下Netty提供的Channel,这里也只取部分方法展示:

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {EventLoop eventLoop();Channel parent();ChannelConfig config();boolean isOpen();boolean isRegistered();boolean isActive();SocketAddress localAddress();SocketAddress remoteAddress();boolean isWritable();Channel.Unsafe unsafe();ChannelPipeline pipeline();public interface Unsafe {SocketAddress localAddress();SocketAddress remoteAddress();void register(EventLoop var1, ChannelPromise var2);void bind(SocketAddress var1, ChannelPromise var2);void connect(SocketAddress var1, SocketAddress var2, ChannelPromise var3);void disconnect(ChannelPromise var1);void close(ChannelPromise var1);void write(Object var1, ChannelPromise var2);void flush();}
}

可以看出,无论是IoSession还是Channel,都有相关的API可以知晓通信是否活跃,所以第二个问题在可以获得IoSession或Channel的情况下,是没有问题的。

配置Handler

那么再看配置Listener或Handler的相差操作是否灵活。
二者在这方面的差别较为明显。

对于Mina而言,添加Handler可以直接利用Connector,真正的逻辑Handler只能由setHandler方法添加,且只能为一个,而相关的Filter则要通过getFilterChain()拿到的过滤器集合去添加;对于Mina来说,Handler和Filter是没有交集的,他们分属不同的接口IoHandler和IoFilter:

mConnector.setHandler(handler);
mConnector.getFilterChain().addLast(CODEC, protocolCodecFilter);

Netty有所不同,netty中所有的handler、filter都是ChannelHandller,这些handler都要在连接行为发生后才能生效,也就是挂载到Channel上的,而不是Bootstrap,一般添加是这样的:

bootstrap.handler(handler);
channel.pipeline().addLast(someHandler);
channel.pipeline().addLast(someFilter);

但handler依旧只能添加一个,如果要添加多个handler或filter,就必须获取到channel,然后进行添加,netty本身提供了一个ChannelInitializer可以用于添加多个channelHandler,一般会这么写:

            bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(handler);channel.pipeline().addLast(someHandler);channel.pipeline().addLast(someFilter);}});

对于Netty来说,Handler和Filter是同一个东西,都是ChannelHandler。

两者在这方面的区别比较明显:
一是netty将handler和filter都统一为handler了,
二是netty不能像mina一样,在未连接之前就可以配置所有的Handler或Filter,netty必须获得channel也就是连接成功后才能配置多个Filter。

这就造成了一个问题,Mina可以提前就配置监听器监听连接的状态,可以正常监听中途断开,也就是在创建Connector后就可以挂载上监听:

        mConnector.getFilterChain().addFirst("reconnect", new IoFilterAdapter() {@Overridepublic void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {              //监听到断开,可接入回调接口,做进一步的重连逻辑mConnector.connect();}});

而Netty不能,创建Connector也就是Bootstrap并不能实现类似的挂载,Bootstrap只能挂载一个Handler,而相关的过滤器或监听只能在Channel出现后再进行挂载,那么就会写成这样:

            bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {//添加其他Filter或Handler}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);//监听到断开,重连bootstrap.connect();}});

这里initChannel方法永远是最先被调用的,因为在源码中是这样的:

//ChannelInitializer.javaprotected abstract void initChannel(C var1) throws Exception;public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {if (this.initChannel(ctx)) {ctx.pipeline().fireChannelRegistered();this.removeState(ctx);} else {ctx.fireChannelRegistered();}}

在这种逻辑下,Mina可以在sessionClosed回调中使用SocketConnetor进行重连,Netty可以在channelInactive回调中使用Bootstrap进行重连。
看起来没什么毛病。

但需要注意一点,就是Handler的复用问题,也就是对Handler或Filter的检查,Mina和Netty都有对Handler的重复添加进行过检查,不过检查逻辑有细微的差别。
Mina中是这样检查的:

//DefaultIoFilterChain.javapublic synchronized void addLast(String name, IoFilter filter) {this.checkAddable(name);this.register(this.tail.prevEntry, name, filter);}private final Map<String, Entry> name2entry = new ConcurrentHashMap();private void checkAddable(String name) {if (this.name2entry.containsKey(name)) {throw new IllegalArgumentException("Other filter is using the same name '" + name + "'");}}

可以看到,Mina只会检查Filter在Map中对应的key是否被使用过,当然理论上Filter挂载在SocketConnector的FilterChain中,只要配置过一次,就无需再进行配置。

那么Netty呢?
Netty的Handler不是能随意复用的,要复用必须标明注解@Sharable,否则就会出现异常:

警告: Failed to initialize a channel. Closing: [id: 0x1caafa97]
io.netty.channel.ChannelPipelineException: io.netty.handler.timeout.IdleStateHandler is not a @Sharable handler, so can't be added or removed multiple times.

这是因为在源码进行检查时,是对Handler本身进行检查的,handler会有一个added的属性,一旦被添加使用过,就会置为true,而判断逻辑会阻止为added=true的handler添加进来 。这样一来,如果强行添加已经添加过的handler就会抛出异常:

//DefaultChannelPipeline.javapublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {AbstractChannelHandlerContext newCtx;synchronized(this) {checkMultiplicity(handler);newCtx = this.newContext(group, this.filterName(name, handler), handler);//省略部分代码}this.callHandlerAdded0(newCtx);return this;}private static void checkMultiplicity(ChannelHandler handler) {if (handler instanceof ChannelHandlerAdapter) {ChannelHandlerAdapter h = (ChannelHandlerAdapter)handler;if (!h.isSharable() && h.added) {throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");}h.added = true;}}

这也就说明使用channel.pipeline().addLast(handler)这种方法添加handler时,如果想不同的Channel添加同一个Handler实例,每种handler都必须注解了@Sharable,如果正好要使用IdleStateHandler这种源码内部的Handler,而IdleStateHandler是没有注解过@Sharable,那么就会出现上面的异常。
而实际应用中,为了实现心跳,IdleStateHandler是一般都会使用到的。

那么问题来了,Mina每次重新连接,创建新的session,但只要SocketConnector没有变,所有Handler和Filter自然就没有变,仍然可用,因为所有Handler和Filter是挂载到SocketConnector的FilterChain中,算是只和Connector相关的;
而Netty,如果重新连接的话,会创建新的Channel,然后会重新调用initChannel,然后利用channel.pipeline().addLast添加Handler,算是挂载到Channel上的,而不是Bootstrap上。

这样显示出两者最大的区别就是,Mina中配置一次即可,而Netty则需要每次产生新的Channel时对其进行重新配置。

所以Netty中的handler想复用的话,就必须加注解,否则就会报异常。如果一定要用到无法注解@Sharable的Handler,比如上面的IdleStateHandler,那就要想办法每次initChannel时,也新建一个新的IdleStateHandler…
或者,继承IdleStateHandler,然后加上注解也行,虽然也很丑就是了。

So Bad…

这样的情况下,可以想办法,每次都新建,类似这种:

            FunctionsChannelHandler functionsChannelHandler = new FunctionsChannelHandler(bootstrap){@Overridepublic ChannelHandler[] handlers() {return new ChannelHandler[]{new NormalClientEncoder(),new IdleStateHandler(20, 10, 20),this,new NormalClientHandler()};}};bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {//添加各种handlerchannel.pipeline().addLast(functionsChannelHandler.handlers());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);//监听到断开}});

因为netty把所有监听器过滤器逻辑处理都归为ChannelHandler的原因,把一个handler扩展成一个功能较为丰富的handler是一种不错的方法 。或者沿用这种思路,使其每次新加Handler时,都是new过的Handler。
应对框架自带的一些未注解@Sharable的类,也可以继承之,自行加注解:

@ChannelHandler.Sharable
public class HeartHandler extends IdleStateHandler {public HeartHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds);}public HeartHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {super(readerIdleTime, writerIdleTime, allIdleTime, unit);}public HeartHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {super(observeOutput, readerIdleTime, writerIdleTime, allIdleTime, unit);}
}

这样一来,配置Handler也勉强可算灵活了。

连接计数

对连接计数一般都是开发者编写的逻辑,主要是应对无休止地连接。
主要应用在两种场景:
一是首次连接,如果多次连接不成功,那么停止连接,或者另有逻辑;
二是断线重连,如果多次重连不成功,那么停止连接并销毁,或者另有逻辑。

因为Mina和Netty都是多线程模型的缘故,计数为了求稳可以直接使用Atom类,当然觉得大材小用也可以直接使用普通int值,毕竟理论上两次连接中间应该会有一定延时才对。

应用示例

所以最后,都可以对各自的连接器进行二次封装,然后编写对自己有利的逻辑。
对于Mina,大概可以写成这样:

public class TCPConnector {private static final int BUFFER_SIZE = 10 * 1024;private static final long CONNECT_TIMEOUT_MILLIS = 10 * 1000;private static final int KEEPALIVE_REQUEST_INTERVAL = 10;private static final int KEEPALIVE_REQUEST_TIMEOUT = 40;private static final String RECONNECT = "reconnect";private static final String CODEC = "codec";private static final String HEARTBEAT = "heartbeat";private static final String EXECUTOR = "executor";/*** 连接器*/private SocketConnector mConnector;/*** 会话*/private IoSession mSession;/*** 外用接口*/private IConnectorListener connectorListener;/*** 连接所在线程*/private ExecutorService mExecutor;/*** 重连次数*/private AtomicInteger recconnectCounter;/*** 首次连接次数*/private AtomicInteger connectCounter;private String host;private int port;public interface IConnectorListener {/*** 连接建立成功*/void connectSuccess(IoSession session);/*** 连接建立失败*/void connectFailed();/*** 连接中途断掉时*/void sessionClosed(IoSession session);}public TCPConnector() {mConnector = new NioSocketConnector();recconnectCounter = new AtomicInteger(0);connectCounter = new AtomicInteger(0);}/*** 设置目标地址与端口** @param host 目标地址* @param port 目标端口*/public void setHostPort(String host, int port) {L.d("设置地址与端口-" + host + ":" + port);this.host = host;this.port = port;}public String getHost() {return this.host;}/*** 在子线程中启用连接*/public void connectInThread() {mExecutor.execute(new Runnable() {@Overridepublic void run() {connect();}});}/*** 根据设置的参数连接*/private void connect() {//如果已经连接,则直接【连接成功】if (mSession == null || !mSession.isConnected()) {//连接mConnector.setDefaultRemoteAddress(new InetSocketAddress(host, port));L.i("连接-->" + host + ":" + port);ConnectFuture future = mConnector.connect();//阻塞,等待连接建立响应future.awaitUninterruptibly(CONNECT_TIMEOUT_MILLIS);//响应连接成功或失败if (future.isConnected()) {//得到会话mSession = future.getSession();if (connectorListener != null) {connectCounter.set(0);connectorListener.connectSuccess(mSession);}} else {if (connectorListener != null) {connectCounter.incrementAndGet();connectorListener.connectFailed();}}} else {if (connectorListener != null) {connectCounter.incrementAndGet();connectorListener.connectSuccess(mSession);}}}/*** 重连*/private void reconnect() {if (mConnector == null)throw new IllegalArgumentException("IoConnector cannot be null");//如果已经连接,则直接【连接成功】if (mSession == null || !mSession.isConnected()) {//连接ConnectFuture future = mConnector.connect();//阻塞,等待连接建立响应future.awaitUninterruptibly();//响应连接成功或失败if (future.isConnected()) {//得到会话mSession = future.getSession();}}}/*** 重连** @param reconnectTimeoutMills 连接的超时时间* @param reconnectTimes        重连次数*/public void reconnect(final long reconnectTimeoutMills, final int reconnectTimes) {try {recconnectCounter.set(0);while (mConnector != null && !(mSession != null && mSession.isConnected()) && recconnectCounter.incrementAndGet() < reconnectTimes) {reconnect();if (mSession != null && mSession.isConnected()) {break;}else{TimeUnit.MILLISECONDS.sleep(reconnectTimeoutMills);}L.w(Thread.currentThread().getName() + "," + "重连" + host + ":" + port + "(" + recconnectCounter.get() + ")次...");}} catch (InterruptedException e) {e.printStackTrace();} finally {if (mSession != null && mSession.isConnected()) {if (connectorListener != null) {connectorListener.connectSuccess(mSession);}} else {if (connectorListener != null) {connectorListener.connectFailed();}}}}public IoSession getSession() {return mSession;}public IoConnector getConnector() {return mConnector;}public boolean isActive() {return mConnector != null && mConnector.isActive() && mSession != null;}public boolean isConnected() {return mSession != null && mSession.isConnected();}public IConnectorListener getConnectorListener() {return connectorListener;}public int getRecconnectCounter() {return recconnectCounter.get();}public int getConnectCounter() {return connectCounter.get();}public void resetConnectCounter() {connectCounter.set(0);}/*** 断开连接,释放资源*/public void disconnect() {if (mConnector != null) {connectorListener = null;mConnector.getFilterChain().clear();mConnector.dispose();mConnector = null;}if (mSession != null) {mSession.closeNow();mSession = null;}if (mExecutor != null) {mExecutor.shutdown();mExecutor = null;}L.w("断开");}public static class Builder {private TCPConnector newInstance = new TCPConnector();private ProtocolCodecFilter protocolCodecFilter;private KeepAliveFilter keepAliveFilter;public Builder setExecutor(ExecutorService executor) {newInstance.mExecutor = executor;return this;}public Builder setConnectListener(IConnectorListener listener) {newInstance.connectorListener = listener;return this;}public Builder setHost(String host) {newInstance.host = host;return this;}public Builder setPort(int port) {newInstance.port = port;return this;}public Builder setProtocolCodecFilter(ProtocolCodecFactory protocolCodecFactory) {protocolCodecFilter = new ProtocolCodecFilter(protocolCodecFactory);return this;}public Builder setConnectTimeoutMillis(long connectTimeoutMillis) {newInstance.mConnector.setConnectTimeoutMillis(connectTimeoutMillis);return this;}public Builder setKeepAliveFilter(KeepAliveMessageFactory keepAliveMessageFactory, int keepAliveRequestInterval) {keepAliveFilter = new KeepAliveFilter(keepAliveMessageFactory, IdleStatus.BOTH_IDLE, KeepAliveRequestTimeoutHandler.LOG, keepAliveRequestInterval, KEEPALIVE_REQUEST_TIMEOUT);return this;}public Builder setKeepAliveFilter(KeepAliveMessageFactory keepAliveMessageFactory, KeepAliveRequestTimeoutHandler keepAliveRequestTimeoutHandler, int keepAliveRequestInterval, int keepAliveRequestTimeOut) {keepAliveFilter = new KeepAliveFilter(keepAliveMessageFactory, IdleStatus.BOTH_IDLE, keepAliveRequestTimeoutHandler, keepAliveRequestInterval, keepAliveRequestTimeOut);return this;}public Builder setHandlerAdapter(IoHandlerAdapter handler) {newInstance.mConnector.setHandler(handler);return this;}public Builder setReadBuffer(int size) {newInstance.mConnector.getSessionConfig().setReadBufferSize(size);return this;}public Builder setReceiveBuffer(int size) {newInstance.mConnector.getSessionConfig().setReceiveBufferSize(size);return this;}public Builder setSendBuffer(int size) {newInstance.mConnector.getSessionConfig().setSendBufferSize(size);return this;}public TCPConnector build() {//添加重连监听if (newInstance.connectorListener != null) {newInstance.mConnector.getFilterChain().addFirst(RECONNECT, new IoFilterAdapter() {@Overridepublic void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {if (newInstance != null && newInstance.connectorListener != null)newInstance.connectorListener.sessionClosed(session);}});}//设置编码解码if (protocolCodecFilter != null)newInstance.mConnector.getFilterChain().addLast(CODEC, protocolCodecFilter);//设置心跳if (keepAliveFilter != null)newInstance.mConnector.getFilterChain().addLast(HEARTBEAT, keepAliveFilter);//connector不允许使用OrderedThreadPoolExecutornewInstance.mConnector.getFilterChain().addLast(EXECUTOR, new ExecutorFilter(Executors.newSingleThreadExecutor()));return newInstance;}}@Overridepublic String toString() {return "MinaHelper{" +"mSession=" + mSession +", mConnector=" + mConnector +", connectorListener=" + connectorListener +", mExecutor=" + mExecutor +'}';}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;TCPConnector connector = (TCPConnector) o;return port == connector.port && (host != null ? host.equals(connector.host) : connector.host == null);}@Overridepublic int hashCode() {int result = host != null ? host.hashCode() : 0;result = 31 * result + port;return result;}

使用起来是这样的:

        HigherGateWayHandler higherGateWayHandler = new HigherGateWayHandler();TCPConnector higherGateWayClient = new TCPConnector.Builder().setExecutor(ThreadPool.singleThread("higher_gateway_client")).setHost(NC.GATEWAT_HIGHER_HOST).setPort(NC.LOWER_PORT).setConnectTimeoutMillis(10 * 1000).setReadBuffer(10 * 1024).setHandlerAdapter(higherGateWayHandler).setProtocolCodecFilter(new HigherGateWayCodecFactory()).setKeepAliveFilter(new KeepAliveHigherGateWay(), higherGateWayHandler, 10, 20).setConnectListener(new TCPConnector.IConnectorListener() {@Overridepublic void connectSuccess(IoSession session) {//连接成功后}@Overridepublic void connectFailed() {//重连失败if (higherGateWayClient.getRecconnectCounter() == 3) {//重连失败后}//非重连失败,优先级连接情况下if (higherGateWayClient.getRecconnectCounter() == 0 && higherGateWayClient.getConnectCounter() > 2) {higherGateWayClient.resetConnectCounter();} else {higherGateWayClient.connectInThread();}}@Overridepublic void sessionClosed(IoSession session) {executors.execute(new Runnable() {@Overridepublic void run() {//重连逻辑higherGateWayClient.reconnect(10 * 1000, 3);}});}}).build();

而Netty,封装起来会有一点花里胡哨,目前遇到的问题是当重连以后复用IdleStateHandler这种Handler时,就会使得其中的计时机制失效,也就是说,心跳没用了,暂时不明原因,大概率是其中的线程被销毁无法再起的原因。那么当前就只能想办法每次调用initChannel时,创建新的Handler才行:

public class NettyConnector {/*** 连接器*/private Bootstrap bootstrap;/*** 地址*/private String host;private int port;/*** 会话*/private Channel channel;private static final long TIME_OUT = 10;private long connectTimeoutMills;/*** 重连次数*/private AtomicInteger recconnectCounter;/*** 首次连接次数*/private AtomicInteger connectCounter;/*** 以接口引出通信状态*/public interface IChannelStateListener {void onConnectSuccess(Channel channel);void onConnectFailed();void onDisconnect();}private IChannelStateListener channelStateListener;private NettyConnector(final Builder builder) {recconnectCounter = new AtomicInteger(0);connectCounter = new AtomicInteger(0);connectTimeoutMills = builder.timeoutMills;bootstrap = builder.bootstrap;bootstrap.handler(new ChannelInitializer() {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(new ChannelDisconnectHandler());channel.pipeline().addLast(builder.handlerSet.handlers());}});}public void setRemoteAddress(String host, int port) {L.d("设置地址与端口-" + host + ":" + port);this.host = host;this.port = port;}public void setChannelStateListener(IChannelStateListener listener) {channelStateListener = listener;}public void connect() {if (channel == null || !channel.isActive()) {bootstrap.remoteAddress(this.host, this.port);L.d("第" + (connectCounter.get() + 1) + "次连接" + host + ":" + port + "中......");final long startMills = System.currentTimeMillis();ChannelFuture channelFuture = bootstrap.connect();channelFuture.addListener(new GenericFutureListener<ChannelFuture>() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {L.d("连接(" + bootstrap.config().remoteAddress() + ")成功");channel = f.channel();if (channelStateListener != null) {connectCounter.set(0);channelStateListener.onConnectSuccess(channel);}} else {long delay = System.currentTimeMillis() - startMills;if (delay > 0) {TimeUnit.MILLISECONDS.sleep(connectTimeoutMills - delay);}L.d("连接(" + bootstrap.config().remoteAddress() + ")失败");if (channelStateListener != null) {connectCounter.incrementAndGet();channelStateListener.onConnectFailed();}}}});}}private void reconnect() {if (bootstrap == null)throw new IllegalArgumentException("bootstrap cannot be null");//如果已经连接,则直接【连接成功】if (channel == null || !channel.isActive()) {//连接channel = bootstrap.connect().awaitUninterruptibly().channel();}}/*** 重连* @param reconnectTimeoutMills 重连超时时间* @param reconnectTimes 重连次数*/public void reconnect(final long reconnectTimeoutMills, final int reconnectTimes) {try {recconnectCounter.set(0);while (channel != null && !channel.isActive() && recconnectCounter.getAndIncrement() < reconnectTimes) {L.d(Thread.currentThread().getName() + "," + "重连" + bootstrap.config().remoteAddress() + "(" + recconnectCounter.get() + ")次...");reconnect();if (channel.isActive()) {break;} else {TimeUnit.MILLISECONDS.sleep(reconnectTimeoutMills);}L.d(channel.isActive() + "");}} catch (InterruptedException e) {e.printStackTrace();} finally {if (channel != null && channel.isActive()) {if (channelStateListener != null) {channelStateListener.onConnectSuccess(channel);}} else {if (channelStateListener != null) {channelStateListener.onConnectFailed();}}}}public Channel getChannel() {return channel;}public boolean isConnected() {return channel != null && channel.isActive();}public String getAddress() {return host + ":" + port;}public int getConnectFailedTimes() {return connectCounter.get();}public int getReconnectFailedTimes() {return recconnectCounter.get();}public static class Builder {private Bootstrap bootstrap = new Bootstrap();private HandlerSet handlerSet;private long timeoutMills = 10 * 1000;public Builder group(EventLoopGroup loopGroup) {bootstrap.group(loopGroup);return this;}@Deprecatedpublic Builder remoteAddress(String inetHost, int inetPort) {bootstrap.remoteAddress(inetHost, inetPort);return this;}public Builder setConnectTimeoutMills(long timeout) {timeoutMills = timeout;return this;}public Builder handler(HandlerSet handlers) {handlerSet = handlers;return this;}public NettyConnector build() {bootstrap.channel(NioSocketChannel.class);return new NettyConnector(this);}}/*** 主要用于监听断开*/class ChannelDisconnectHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelInactive();if (channelStateListener != null) {channelStateListener.onDisconnect();}}}/*** 主要用于创建新的handler,避免复用带来的一些问题*/@ChannelHandler.Sharablepublic static abstract class HandlerSet extends ChannelInboundHandlerAdapter {public abstract ChannelHandler[] handlers();}}

因为Netty不能像Mina直接在Connector上挂载监听sessionClosed,只能用一个ChannelDisconnectHandler这样的东西去监听是否已经断开,并通过接口引出结果;
并且因为只能在Channel.Pipeline中才能添加多个Handler的原因,这里用一个HandlerSet强行将所有需要的Handler集合,然后在创建Bootstrap的时候一次性添加进去,想要保证每次都新建,这里就使用抽象方法,让使用的时候可以自行创建。
注意,由于这里的抽象类HandlerSet每次其实并不是新建的,所有是需要复用的,所以需要加注解@Sharable,但也只需要加它一个就行了,其他都是新建出来的,无需理会。
写出来就是这样:

        NettyConnector connector = new NettyConnector.Builder().group(new NioEventLoopGroup()).handler(new NettyConnector.HandlerSet() {@Overridepublic ChannelHandler[] handlers() {return new ChannelHandler[]{new HeartHandler(10, 10, 10),new NormalClientEncoder(),new NormalClientHeartBeatHandler(),new NormalClientHandler()};}}).setConnectTimeoutMills(5 * 1000).build();connector.setRemoteAddress("192.168.0.102", 8000);connector.setChannelStateListener(new NettyConnector.IChannelStateListener() {@Overridepublic void onConnectSuccess(Channel channel) {L.d("连接" + channel.remoteAddress().toString() + "成功");}@Overridepublic void onConnectFailed() {L.d("连接" + connector.getAddress() + "失败");if (connector.getReconnectFailedTimes() == 0 && connector.getConnectFailedTimes() < 3) {connector.connect();}}@Overridepublic void onDisconnect() {L.d(connector.getChannel().remoteAddress().toString() + "已断开");connector.reconnect(5000, 5);}});

其中的HeartHandler是继承自IdleStateHandler的。

整个封装显得花里胡哨…却又很丑,不过勉强能用,水平有限。

就这样吧。

Netty的断线重连相关推荐

  1. Netty客户端断线重连实现及问题思考

    点击关注公众号,利用碎片时间学习 前言 在实现TCP长连接功能中,客户端断线重连是一个很常见的问题,当我们使用netty实现断线重连时,是否考虑过如下几个问题: 如何监听到客户端和服务端连接断开 ? ...

  2. Android 通过 NSD 服务 Netty(断线重连、心跳、黏包处理) 实现两个 Android 系统端的长连接通讯

    引言 近期需求,通过手机App端取号机(含叫号通知功能),实时连接 另一台 Android 广告机用于播放当前被叫到的号数. 这里有两种Android 机 一台「基于Sunmi版的可出小票的Andro ...

  3. 面试官问:服务的心跳机制与断线重连,Netty底层是怎么实现的?懵了

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 心跳机制 何为心跳 所谓心跳, 即在 TCP 长连接中, ...

  4. Netty实现心跳机制与断线重连

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 来源:https://www.jianshu.com/p/ ...

  5. Netty 断线重连解决方案

    Netty 断线重连解决方案 参考文章: (1)Netty 断线重连解决方案 (2)https://www.cnblogs.com/wujinsen/p/8949299.html 备忘一下.

  6. 浅析 Netty 实现心跳机制与断线重连

    基础 何为心跳 顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性. 为什么需要心跳 因为网络的不可 ...

  7. 四、Netty 实现心跳机制与断线重连

    一.概述 何为心跳 顾名思义, 所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性. 为什么需要心跳 因为网络的不 ...

  8. netty心跳过程中 发送消息失败_Netty 4.0 实现心跳检测和断线重连

    arg0.pipeline().addLast("ping", new IdleStateHandler(25, 15, 10,TimeUnit.SECONDS)); 这个处理器, ...

  9. 用Netty撸一个心跳机制和断线重连!

    来源:www.jianshu.com/p/1a28e48edd92 心跳机制 何为心跳 所谓心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确 ...

最新文章

  1. 特征交互(Feature Interaction)及多项式特征(PolynomialFeatures)
  2. 构建本地缓存java_Java8简单的本地缓存实现
  3. 面试「计算机操作系统」知识点大集合!
  4. 一篇不一样的docker原理解析
  5. 计算机应用基础 专2018秋,广东开放大学远程教育专科2018年秋计算机应用基础Word模块测试.pdf...
  6. 互联网基础知识_数字化工业网络—工业互联网的网络技术.pptx
  7. 顶级c程序员之路 基础篇 - 第一章 关键字的深度理解 number-1
  8. 用计算机打字英语单词,常用计算机专业英语词汇-前401-500单词
  9. flex自定义preloader预加载进度条
  10. EF Code First Migrations数据库迁移 (转帖)
  11. 网络 如何解决输入路由器管理地址192.168.1.1进不去
  12. Confluence 6 LDAP 用户结构设置
  13. 小白的一周学习汇总!
  14. 暗黑破坏神(DIABLOII 1.11B)BOT 及源代码公开下载
  15. 网站做渗透测试服务的步骤
  16. IGCT器件是什么?
  17. Skype和LibFetion无法输入中文的解决方法
  18. 啊哈,在PDD买了一套自己的盗版书
  19. 单片机数字钟(调时,调时闪烁,万年历,年月日)超详细解析
  20. 宏基因组理论教程1宏基因组简介

热门文章

  1. 一个印度人写的VC串口类CSerialCom(有串口基础介绍)
  2. 网易云音乐params和encSecKey生成原理
  3. mysql生产cdm文件_powerdesigner中CDM转化成PDM导出mysql脚本
  4. 闲谈“个人核心竞争力”与“危机感”
  5. 成功解决tensorflow.python.framework.errors_impl.InvalidArgumentError报错问题
  6. 3.6 51单片机-动态数码管
  7. ActivityManager: Killing *pid + 包名*: excessive cpu 21890 during 300019 dur=45344791 limit=2
  8. 经济危机===丐帮也裁员!!!(各企业裁员统计)
  9. 方正高影仪 linux驱动下载,方正Founder HD1000 驱动
  10. UE4移动平台AR开发快速预览