Apache MINA 2 是一个开发高性能和高可伸缩性网络应用程序的网络应用框架。它提供了一个抽象的事件驱动的异步 API,可以使用 TCP/IP、UDP/IP、串口和虚拟机内部的管道等传输方式。

首先,mina server端acceptor启动方法:

1、NioSocketAcceptor.bind(InetSocketAddress)或者NioSocketAcceptor.bind(SocketAddress...)方法。

例如:

acceptor.bind(new InetSocketAddress(1234));  

mina底层的调用链:

NioSocketAcceptor.bind(InetSocketAddress)-->
AbstractIoAcceptor.bind(SocketAddress localAddress) -->
AbstractIoAcceptor.bind(Iterable<? extends SocketAddress> localAddresses)-->
AbstractPollingIoAcceptor.bindInternal(List<? extends SocketAddress> localAddresses)-->
AbstractPollingIoAcceptor.startupAcceptor()

1、创建线程Acceptor线程-->

2、AbstractIoService.executeWorker(Runnable worker)(提交线程到线程池)

acceptor线程启动run()

1、初始化acceptor端的Selector,即NioSocketAcceptor.Selector

2、NioSocketAcceptor.open(SocketAddress localAddress)

// Register the channel within the selector for ACCEPT event
channel.register(selector, SelectionKey.OP_ACCEPT);

acceptor工作流程:

一、IoService类图

如上图所示,IOService层根据不同的角色又分为IOAcceptor(服务端左半部分)和IOConnector (客户端右半部分),分别用于接受连接与请求连接操作。

二、服务端

2.1、IoAcceptor接口

IoAcceptor相当于是对ServerSocketChannel的封装,最重要的两个操作是绑定(解绑)与接受连接,IoAcceptor接口中有多个重载的bind()方法,当然呼应有多个重载的unbind()方法。

public interface IoAcceptor extends IoService {
void bind() throws IOException;
void bind(SocketAddress localAddress) throws IOException;
void bind(SocketAddress firstLocalAddress, SocketAddress... addresses) throws IOException;
void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException;
void unbind();
void unbind(SocketAddress localAddress);
void unbind(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses);
void unbind(Iterable<? extends SocketAddress> localAddresses);
} 

2.2、IoAcceptor的实现类AbstractIOAcceptor

  IoAcceptor其方法的实现在抽象类AbstractIOAcceptor的bind方法中,这个方法在做了参数检查等操作后,将真正的绑定操作交给abstract bindInternal()来完成。对于bindInternal有基于TCP/IP,UDP/IP,VMPipe三种实现类,分别对应

  • AbstractPollingIoAcceptor对应TCP/IP
  • AbstractPollingConnectionlessIoAcceptor对应UDP/IP
  • VmPipeAcceptor对应串口和虚拟机内部的管道

以TCP/IP为例来看绑定过程,见AbstractPollingIoAcceptor.java的源码:

protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {// Create a bind request as a Future operation. When the selector// have handled the registration, it will signal this future.AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);// adds the Registration request to the queue for the Workers// to handle
        registerQueue.add(request);// creates the Acceptor instance and has the local// executor kick it off.
        startupAcceptor();// As we just started the acceptor, we have to unblock the select()// in order to process the bind request we just have added to the// registerQueue.try {lock.acquire();// Wait a bit to give a chance to the Acceptor thread to do the select()Thread.sleep(10);wakeup();} finally {lock.release();}// Now, we wait until this request is completed.
        request.awaitUninterruptibly();if (request.getException() != null) {throw request.getException();}// Update the local addresses.// setLocalAddresses() shouldn't be called from the worker thread// because of deadlock.Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();for (H handle : boundHandles.values()) {newLocalAddresses.add(localAddress(handle));}return newLocalAddresses;

主要做了以下几件事情:
1、将绑定请求放入registerQueue中
2、启动Acceptor,从Acceptor类的run方法可以看到,这一步会阻塞在Acceptor选择器的选择操作中
3、调用wakeup让选择器返回
4、等待请求处理完成,这一步会阻塞在ready变量中,当ready变量为true时才会返回,当接受连接后ready会被设置为true.

现在重点看一下AbstractPollingIoAcceptor$Acceptor的run方法:

        public void run() {assert (acceptorRef.get() == this);int nHandles = 0;// Release the lock
            lock.release();while (selectable) {try {// Detect if we have some keys ready to be processed// The select() will be woke up if some new connection// have occurred, or if the selector has been explicitly// woke upint selected = select();// this actually sets the selector to OP_ACCEPT,// and binds to the port on which this class will// listen onnHandles += registerHandles();// Now, if the number of registred handles is 0, we can// quit the loop: we don't have any socket listening// for incoming connection.if (nHandles == 0) {acceptorRef.set(null);if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {assert (acceptorRef.get() != this);break;}if (!acceptorRef.compareAndSet(null, this)) {assert (acceptorRef.get() != this);break;}assert (acceptorRef.get() == this);}if (selected > 0) {// We have some connection request, let's process// them here.
                        processHandles(selectedHandles());}// check to see if any cancellation request has been made.nHandles -= unregisterHandles();} catch (ClosedSelectorException cse) {// If the selector has been closed, we can exit the loopbreak;} catch (Throwable e) {ExceptionMonitor.getInstance().exceptionCaught(e);try {Thread.sleep(1000);} catch (InterruptedException e1) {ExceptionMonitor.getInstance().exceptionCaught(e1);}}}// Cleanup all the processors, and shutdown the acceptor.if (selectable && isDisposing()) {selectable = false;try {if (createdProcessor) {processor.dispose();}} finally {try {synchronized (disposalLock) {if (isDisposing()) {destroy();}}} catch (Exception e) {ExceptionMonitor.getInstance().exceptionCaught(e);} finally {disposalFuture.setDone();}}}}

(1)、selector被wakeup唤醒后,调用registerHandles方法从registerQueue中取出请求依次调用open方法

    private int registerHandles() {for (;;) {// The register queue contains the list of services to manage// in this acceptor.AcceptorOperationFuture future = registerQueue.poll();if (future == null) {return 0;}// We create a temporary map to store the bound handles,// as we may have to remove them all if there is an exception// during the sockets opening.Map<SocketAddress, H> newHandles = new ConcurrentHashMap<SocketAddress, H>();List<SocketAddress> localAddresses = future.getLocalAddresses();try {// Process all the addressesfor (SocketAddress a : localAddresses) {H handle = open(a);newHandles.put(localAddress(handle), handle);}// Everything went ok, we can now update the map storing// all the bound sockets.
                boundHandles.putAll(newHandles);// and notify.
                future.setDone();return newHandles.size();} catch (Exception e) {// We store the exception in the future
                future.setException(e);} finally {// Roll back if failed to bind all addresses.if (future.getException() != null) {for (H handle : newHandles.values()) {try {close(handle);} catch (Exception e) {ExceptionMonitor.getInstance().exceptionCaught(e);}}// TODO : add some comment : what is the wakeup() waking up ?
                    wakeup();}}}}

open方法完成了ServerSocket的绑定和注册(NioSocketAcceptor.open(SocketAddress localAddress)方法如下)

    protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {// Creates the listening ServerSocketServerSocketChannel channel = ServerSocketChannel.open();boolean success = false;try {// This is a non blocking socket channelchannel.configureBlocking(false);// Configure the server socket,ServerSocket socket = channel.socket();// Set the reuseAddress flag accordingly with the setting
            socket.setReuseAddress(isReuseAddress());// and bind.
            socket.bind(localAddress, getBacklog());// Register the channel within the selector for ACCEPT event
            channel.register(selector, SelectionKey.OP_ACCEPT);success = true;} finally {if (!success) {close(channel);}}return channel;}

(2)、从(1)中可以知道selector上注册了ServerSocketChannel的OP_ACCEPT键,注册后nHandles==0,selected==0,进行下一次循环,同样是阻塞在select方法上
(3)、当连接到来时,select方法返回,selected>0,执行processHandles方法

        private void processHandles(Iterator<H> handles) throws Exception {while (handles.hasNext()) {H handle = handles.next();handles.remove();// Associates a new created connection to a processor,// and get back a sessionS session = accept(processor, handle);if (session == null) {continue;}initSession(session, null, null);// add the session to the SocketIoProcessor
                session.getProcessor().add(session);}}

该方法在完成真正的接受连接操作后,创建session并扔到processor中,后续的工作交给processor来完成。每个session中其实有一个SocketChannel,这个socketChannel实际上是被注册到了processor的selector上。注册代码在NioProcessor类中可以找到

    protected void init(NioSession session) throws Exception {SelectableChannel ch = (SelectableChannel) session.getChannel();ch.configureBlocking(false);session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));}

总结一下:Acceptor线程专门负责接受连接,在其上有一个selector,轮询是否有连接建立上来,当有连接建立上来,调用ServerSocketChannel.accept方法来接受连接,这个方法返回一个session对象,然后将这个session对象加入processor中,由processor遍历每个session来完成真正的IO操作。processor上也有一个selector与一个Processor线程,selector用于轮询session,Processor线程处理每个session的IO操作。

转载于:https://www.cnblogs.com/duanxz/p/3554658.html

Mina2中IoService相关推荐

  1. GNU Make 使用手册(于凤昌中译版)

    GNU Make 使用手册(中译版) 翻译:于凤昌 GNU make Version 3.79 April 2000 Richard M. Stallman and Roland McGrath 1 ...

  2. Apache MiNa 2 学习笔记

    http://blog.csdn.net/cgwcgw_/article/details/18402769 http://download.csdn.net/detail/xiaozhu_1986/2 ...

  3. Apache-mina学习笔记,非常全都资料,附带大量实例

    Apache Mina2 学习笔记 目录 引言. 3 一.    Mina入门. 3 第一步.下载使用的Jar包. 3 第二步.工程创建配置. 4 第三步.服务端程序. 4 第四步.使用telnet命 ...

  4. 面试:第十二章:所有总结

    Java基础 java基本类型哪些,所占字节 byte :1个字节 short :2个字节 char :2个字节 int :4个字节 long :8个字节 float :4个字节 double :8个 ...

  5. linux内核分析(转自某位大哥网上的笔记)

    启动 当PC启动时,Intel系列的CPU首先进入的是实模式,并开始执行位于地址0xFFFF0处的代码,也就是ROM-BIOS起始位置的代码.BIOS先进行一系列的系统自检,然后初始化位于地址0的中断 ...

  6. Apache Mina2.x网络通信框架使用入门

    关于Apache Mina的文章,资料已经非常多了,我想再多一篇也不过为.另外Main现在3.x版本正在开发中,且已经有M2(里程碑)发布了. 本文中主要针对Mina2.0.9(这个版本也是最后一个2 ...

  7. Android端IM应用中的@人功能实现:仿微博、QQ、微信,零入侵、高可扩展

    本文由"猫爸iYao"原创分享,感谢作者. 1.引言 最近有个需求:评论@人(没错,就是IM聊天或者微博APP里的@人功能),就像下图这样: ▲ 微信群聊界面里的@人功能  ▲ Q ...

  8. 类实现Java模板方法模式中的HookMethod实现

    近期一直在学习类实现之类的问题,今天正好有机会和大家共享一下. 这里说的就是Java里的钩子用法,Apache Mina2.x就是这么用的 首先是抽象类 package com.jadyer.hook ...

  9. nio框架中的多个Selector结构

    随着并发数量的提高,传统nio框架采用一个Selector来支撑大量连接事件的管理和触发已经遇到瓶颈,因此现在各种nio框架的新版本都采用多个Selector并存的结构,由多个Selector均衡地去 ...

最新文章

  1. vs项目移植到linux运行,VS2008项目移植到Linux
  2. 自己对多线程的一点思考
  3. Python基础教程:类的特殊成员及高级特性
  4. JavaFX游戏(四连环)
  5. SQL Server 索引和表体系结构(三)
  6. android oreo 源码,android – Oreo:如何在源代码中找到所有受限制的系统调用?
  7. 剑指offer的前16题-java版
  8. 杭电2112(SPFA)
  9. js调用php会提前加载,为什么js代码里调用了php变量运行就明显变慢了
  10. ROBOTSTXT_OBEY
  11. 华为手机怎么隐藏按键图标_华为手机如何隐藏桌面图标
  12. 我对TCP协议的一点形而上的看法
  13. Windows虚拟主机有哪些优势?
  14. Objective C Bridging header —— swift MD5
  15. java 将多个文件打包成压缩包下载
  16. java caller_callee和caller属性的区别
  17. 华为Mate 20 Pro更新EMUI9.1系统,系统流畅度稳步提高
  18. aplayer得使用,记一次aplayer/Aplayer开发心得
  19. 安卓源码集合,视频播放器手机屏幕助手点融投资悬浮窗抽奖转盘
  20. 从零开始:Python学习成长路线

热门文章

  1. TTL怎么计算拉电流和灌电流_UPS输入输出电流如何计算?电缆如何选择?
  2. html 根作用域,AngularJS入门教程之Scope(作用域)
  3. php操作外部文件,php文件操作-将其他文件的数据添加到本文件中
  4. android cpu绑核
  5. 子集—leetcode78
  6. 四针手表指的是什么_1000~1500元的男士手表推荐
  7. 量子计算入门-第二部分
  8. GCC如何编译内嵌汇编代码
  9. Thumb mode 与 ARM mode比较
  10. linux基础-1.1USB设备(USB1.0以上)连接使用