文章目录

  • 小技巧(如何看开源框架的源码)
  • 源码解析
  • 阅读源码技巧
    • 打印查看
    • 通过打断点调试
    • 查看调用栈

小技巧(如何看开源框架的源码)

一断点
二打印
三看调用栈
四搜索

源码解析

//设置niosocket工厂
//NioServerSocketChannelFactory看下面bootstrap.setFactory(new NioServerSocketChannelFactory(boss, worker));

NioServerSocketChannelFactory.java

public NioServerSocketChannelFactory(Executor bossExecutor, Executor workerExecutor) {//首先获取当前worker的数量,代码看下面的SelectorUtil.java//接着会调用下面三个参数的构造方法NioServerSocketChannelFactorythis(bossExecutor, workerExecutor, getMaxThreads(workerExecutor));
}
public NioServerSocketChannelFactory(Executor bossExecutor, Executor workerExecutor,int workerCount) {//接着调用下面四个参数的构造方法NioServerSocketChannelFactory//boss默认给的1this(bossExecutor, 1, workerExecutor, workerCount);
}
public NioServerSocketChannelFactory(Executor bossExecutor, int bossCount, Executor workerExecutor,int workerCount) {//开始new一个worker的池子//代码看下面的NioWorkerPool.javathis(bossExecutor, bossCount, new NioWorkerPool(workerExecutor, workerCount));
}

SelectorUtil.java

//默认数量是当前的核数成2
static final int DEFAULT_IO_THREADS = Runtime.getRuntime().availableProcessors() * 2;private static int getMaxThreads(Executor executor) {//MaxThreads最大池大小if (executor instanceof ThreadPoolExecutor) {final int maxThreads = ((ThreadPoolExecutor) executor).getMaximumPoolSize();//取maxThreads和DEFAULT_IO_THREADS两者的最小值return Math.min(maxThreads, SelectorUtil.DEFAULT_IO_THREADS);}//因为我们之前的例子中是给的无限大小的池子,所以这里返回DEFAULT_IO_THREADSreturn SelectorUtil.DEFAULT_IO_THREADS;}

NioWorkerPool.java

public NioWorkerPool(Executor workerExecutor, int workerCount) {//调用自己的NioWorkerPool方法,在下面this(workerExecutor, workerCount, null);
}
public NioWorkerPool(Executor workerExecutor, int workerCount, ThreadNameDeterminer determiner) {//调用父类的构造方法super(workerExecutor, workerCount, false);this.determiner = determiner;//init了一次,代码看下面init方法init();
}
//实现了抽象方法newWorker
@Override
protected NioWorker newWorker(Executor executor) {//new了一个NioWorkerreturn new NioWorker(executor, determiner);
}

NioWOrker.java

public NioWorker(Executor executor, ThreadNameDeterminer determiner) {//调用父类方法,看下面AbstractNioWorker.javasuper(executor, determiner);
}
@Overrideprotected boolean read(SelectionKey k) {final SocketChannel ch = (SocketChannel) k.channel();final NioSocketChannel channel = (NioSocketChannel) k.attachment();final ReceiveBufferSizePredictor predictor =channel.getConfig().getReceiveBufferSizePredictor();final int predictedRecvBufSize = predictor.nextReceiveBufferSize();final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory();int ret = 0;int readBytes = 0;boolean failure = true;ByteBuffer bb = recvBufferPool.get(predictedRecvBufSize).order(bufferFactory.getDefaultOrder());try {while ((ret = ch.read(bb)) > 0) {readBytes += ret;if (!bb.hasRemaining()) {break;}}failure = false;} catch (ClosedChannelException e) {// Can happen, and does not need a user attention.} catch (Throwable t) {fireExceptionCaught(channel, t);}if (readBytes > 0) {bb.flip();
//在这里封装成了ChannelBufferfinal ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);buffer.setBytes(0, bb);buffer.writerIndex(readBytes);// Update the predictor.predictor.previousReceiveBufferSize(readBytes);// Fire the event.产生一个上传的事件//channels里面的一个方法//   * @param message  the received message  //public static void fireMessageReceived(Channel channel, Object message) {//fireMessageReceived(channel, message, null);//}fireMessageReceived(channel, buffer);}if (ret < 0 || failure) {k.cancel(); // Some JDK implementations run into an infinite loop without this.close(channel, succeededFuture(channel));return false;}return true;}

AbstractNioWorker.java

AbstractNioWorker(Executor executor, ThreadNameDeterminer determiner) {//又调用了父类的抽象方法,看下面AbstractNioSelector.javasuper(executor, determiner);}@Overrideprotected void process(Selector selector) throws IOException {Set<SelectionKey> selectedKeys = selector.selectedKeys();// check if the set is empty and if so just return to not create garbage by// creating a new Iterator every time even if there is nothing to process.// See https://github.com/netty/netty/issues/597if (selectedKeys.isEmpty()) {return;}for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {SelectionKey k = i.next();i.remove();try {int readyOps = k.readyOps();if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {//读数据,向上看NioWorker中的read方法if (!read(k)) {// Connection already closed - no need to handle write.continue;}}if ((readyOps & SelectionKey.OP_WRITE) != 0) {writeFromSelectorLoop(k);}} catch (CancelledKeyException e) {close(k);}if (cleanUpCancelledKeys()) {break; // break the loop to avoid ConcurrentModificationException}}}

AbstractNioSelector.java

AbstractNioSelector(Executor executor, ThreadNameDeterminer determiner) {//给了一个线程池this.executor = executor;
//openSelector看下面openSelector方法openSelector(determiner);
}
private void openSelector(ThreadNameDeterminer determiner) {try {//设置当前的selectorselector = SelectorUtil.open();} catch (Throwable t) {throw new ChannelException("Failed to create a selector.", t);}// Start the worker thread with the new Selector.boolean success = false;try {//把这个Nioworker跑起来,因为NioWorker本身是继承AbstractNioSelector这个类的//所以跑的是这个类的run方法//从哪里启动呢?往下看DeadLockProofWorker.java//也就是调用的newThreadRenamingRunnable(id, determiner)//newThreadRenamingRunnable看下面AbstractNioWorker.javaDeadLockProofWorker.start(executor, newThreadRenamingRunnable(id, determiner));success = true;} finally {if (!success) {// Release the Selector if the execution fails.try {selector.close();} catch (Throwable t) {logger.warn("Failed to close a selector.", t);}selector = null;// The method will return to the caller at this point.}}assert selector != null && selector.isOpen();}
public void run() {for (;;) {//标记wakenup状态wakenUp.set(false);//状态监测的代码...//取任务processTaskQueue();//业务处理,这是一个抽象方法,被三个类实现AbstractNioWorker、NioClientBoss、//AbstractNioWorker中的process在上方//NioServerBoss中的process在下方process(selector);
}

NioServerBoss.java

  @Overrideprotected void process(Selector selector) {Set<SelectionKey> selectedKeys = selector.selectedKeys();if (selectedKeys.isEmpty()) {return;}for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {SelectionKey k = i.next();i.remove();NioServerSocketChannel channel = (NioServerSocketChannel) k.attachment();try {// accept connections in a for loop until no new connection is readyfor (;;) {//accept事件SocketChannel acceptedSocket = channel.socket.accept();if (acceptedSocket == null) {break;}//注册的方法在本类的下方,向worker线程里面注册任务registerAcceptedChannel(channel, acceptedSocket, thread);}} catch (CancelledKeyException e) {// Raised by accept() when the server socket was closed.k.cancel();channel.close();} catch (SocketTimeoutException e) {// Thrown every second to get ClosedChannelException// raised.} catch (ClosedChannelException e) {// Closed as requested.} catch (Throwable t) {if (logger.isWarnEnabled()) {logger.warn("Failed to accept a connection.", t);}try {Thread.sleep(1000);} catch (InterruptedException e1) {// Ignore}}}}private static void registerAcceptedChannel(NioServerSocketChannel parent, SocketChannel acceptedSocket,Thread currentThread) {try {ChannelSink sink = parent.getPipeline().getSink();ChannelPipeline pipeline =parent.getConfig().getPipelineFactory().getPipeline();//找到一个worker,通过下面的方法把每个工作均匀的分配给每一个worker// public E nextWorker() {//return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];//  }NioWorker worker = parent.workerPool.nextWorker();//向worker里面注册任务,而不是直接去操作worker//让他关注一下socketchannel,即acceptedSocket注册这个东西//因为register这个方法是继承AbstractNioSelector的,即完成之后需要提交任务并记过wakenup//代码粘贴过来了//public void register(Channel channel, ChannelFuture future) {//Runnable task = createRegisterTask(channel, future);//registerTask(task);// }// protected final void registerTask(Runnable task) {// taskQueue.add(task);//Selector selector = this.selector;//if (selector != null) {//if (wakenUp.compareAndSet(false, true)) {//selector.wakenup();//}//} else {//if (taskQueue.remove(task)) {// the selector was null this means the Worker has already been shutdown.//throw new RejectedExecutionException("Worker has already been shutdown");//}// }// }worker.register(new NioAcceptedSocketChannel(parent.getFactory(), pipeline, parent, sink, acceptedSocket,worker, currentThread), null);} catch (Exception e) {if (logger.isWarnEnabled()) {logger.warn("Failed to initialize an accepted socket.", e);}try {acceptedSocket.close();} catch (IOException e2) {if (logger.isWarnEnabled()) {logger.warn("Failed to close a partially accepted socket.",e2);}}}}

AbstractNioWorker.java

protected ThreadRenamingRunnable newThreadRenamingRunnable(int id, ThreadNameDeterminer determiner) {//this当前的NioWorker,然后给了一个线程的名称return new ThreadRenamingRunnable(this, "New I/O worker #" + id, determiner);}
public ThreadRenamingRunnable(Runnable runnable, String proposedThreadName, ThreadNameDeterminer determiner) {if (runnable == null) {throw new NullPointerException("runnable");}if (proposedThreadName == null) {throw new NullPointerException("proposedThreadName");}this.runnable = runnable;this.determiner = determiner;this.proposedThreadName = proposedThreadName;}
try {//这里runnable执行其实就是NioWorker进行了run
//NioWorker.run其实是运行父类的AbstractNioWorker run
//AbstractNioWorker 的run又调用的是父类AbstractNioSelector的run方法
//AbstractNioSelector的run方法,往上面看runnable.run();}

DeadLockProofWorker.java

public final class DeadLockProofWorker {/*** An <em>internal use only</em> thread-local variable that tells the* {@link Executor} that this worker acquired a worker thread from.*/public static final ThreadLocal<Executor> PARENT = new ThreadLocal<Executor>();public static void start(final Executor parent, final Runnable runnable) {if (parent == null) {throw new NullPointerException("parent");}if (runnable == null) {throw new NullPointerException("runnable");}//通过线程池启动一个Runnableparent.execute(new Runnable() {public void run() {PARENT.set(parent);try {//调用Runnable的run方法,然后返回去看runnable.run();} finally {PARENT.remove();}}});}private DeadLockProofWorker() {}
}

AbstractNioWorkerPool.java

//这里到了他的父类AbstractNioWorkerPool
blic void run() {thread = Thread.currentThread();startupLatch.countDown();int selectReturnsImmediately = 0;Selector selector = this.selector;
...
}
//数组workers
private final AbstractNioWorker[] workers;
//父类的构造方法方法
AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean autoInit) {if (workerExecutor == null) {throw new NullPointerException("workerExecutor");}if (workerCount <= 0) {throw new IllegalArgumentException("workerCount (" + workerCount + ") " + "must be a positive integer.");}//有一个workers的数组,new了workerCount的数组workers = new AbstractNioWorker[workerCount];this.workerExecutor = workerExecutor;if (autoInit) {init();}}
protected void init() {if (!initialized.compareAndSet(false, true)) {throw new IllegalStateException("initialized already");}for (int i = 0; i < workers.length; i++) {//对worker进行初始化,具体实现往下看workers[i] = newWorker(workerExecutor);}waitForWorkerThreads();}//是一个抽象方法,具体实现看上面的NioWorkerPool.java中的newWorker方法
protected abstract E newWorker(Executor executor);@SuppressWarnings("unchecked")public E nextWorker() {return (E) workers[Math.abs(workerIndex.getAndIncrement() % workers.length)];}public void rebuildSelectors() {for (AbstractNioWorker worker: workers) {worker.rebuildSelector();}}

阅读源码技巧

打印查看

for(;;){sout(Thread.currentThread().getName()+" "+wakenup);wakenUp.set(false);...sout(Thread.currentThread().getName()+" "+select);int selected = select(selector);...sout(Thread.currentThread().getName()+" "+processTaskQueue);processTaskQueue();...sout(Thread.currentThread().getName()+" "+process);process(selector);
}

输出

...
New I/O server boss #2 process
New I/O server boss #2 wakenup
New I/O server boss #2 select
New I/O server boss #2 processTaskQueue
New I/O server boss #2 process
New I/O server boss #2 wakenup
New I/O server boss #2 select
...
New I/O worker #1 wakenup
New I/O worker #1 select
New I/O worker #1 processTaskQueue
New I/O worker #1 process
New I/O worker #1 wakenup
New I/O worker #1 select
New I/O worker #1 processTaskQueue
New I/O worker #1 process
...

可以看到worker是四个四个循环往复,可是boss线程为什么在select就没有继续执行了?

通过打断点调试

将断点打在wakenup.set
为了只看boss线程,eclipse进入断点的控制页面–》右上角breanpoints–》选中本类右击–》BreakPoint Properties–》新的页面中勾选Conditional–》下面输入“Thread.currentThread().getName().contains(“boss”)”–》点击ok

通过打断点发现到了select方法的时候点进去,然后没有执行默认的select(500),这样的方法,而是执行的select()这种阻塞的方式,所以阻塞住了。

查看调用栈

通过查看调用栈的方式查看某个方法具体调用的堆栈信息
eclipse在debug界面的左上角
idea在debug界面的左下角

转载于:https://www.cnblogs.com/LeesinDong/p/10835360.html

基于Netty的RPC架构学习笔记(五):netty线程模型源码分析(二)相关推荐

  1. Netty网络框架学习笔记-16(心跳(heartbeat)服务源码分析)

    Netty网络框架学习笔记-16(心跳(heartbeat)服务源码分析_2020.06.25) 前言: Netty 作为一个网络框架,提供了诸多功能,比如编码解码等,Netty 还提供了非常重要的一 ...

  2. Ui学习笔记---EasyUI的EasyLoader组件源码分析

    Ui学习笔记---EasyUI的EasyLoader组件源码分析 技术qq交流群:JavaDream:251572072   1.问题1:为什么只使用了dialog却加载了那么多的js   http: ...

  3. Kubernetes学习笔记之Calico CNI Plugin源码解析(二)

    女主宣言 今天小编继续为大家分享Kubernetes Calico CNI Plugin学习笔记,希望能对大家有所帮助. PS:丰富的一线技术.多元化的表现形式,尽在"360云计算" ...

  4. FreeRTOS学习笔记---动态创建任务 xTaskCreate() 源码分析

    在看FreeRTOS源码的时候,各个函数相互调用,各种参数相互传递,看的人云里雾里,越看越糊涂.为了搞清楚各个函数之间的相互关系,就边看源码,边画思维导图,用文字将函数功能描述出来,搞清楚整个函数框架 ...

  5. Python数据挖掘学习笔记】九.回归模型LinearRegression简单分析氧化物数据

    #2018-03-23 16:26:20 March Friday the 12 week, the 082 day SZ SSMR [Python数据挖掘学习笔记]九.回归模型LinearRegre ...

  6. 十年老架构师神级推荐,MyBatis源码分析,再也不用为源码担忧了

    十年老架构师神级推荐,MyBatis源码分析,再也不用为源码担忧了 前言 MyBatis是一个优秀的持久层ORM框架,它对jdbc的操作数据库的过程进行封装,使开发者只需要关注SQL 本身,而不需要花 ...

  7. 10年大厂程序员是如何高效学习使用redis的丨redis源码分析丨redis存储原理

    10年大厂程序员是怎么学习使用redis的 1. redis存储原理分析 2. redis源码学习分享 3. redis跳表和B+树详细对比分析 视频讲解如下,点击观看: 10年大厂程序员是如何高效学 ...

  8. Java并发编程笔记之 CountDownLatch闭锁的源码分析

    转 自: Java并发编程笔记之 CountDownLatch闭锁的源码分析 ​ JUC 中倒数计数器 CountDownLatch 的使用与原理分析,当需要等待多个线程执行完毕后在做一件事情时候 C ...

  9. MyBatis源码学习笔记(从设计模式看源码)

    文章目录 1.源码分析概述 ①.Mybatis架构分析 ②.门面模式 ③.设计模式的原则 2.日志模块分析 ①.适配器模型 ②.动态代理 ③.日志模块分析 3.数据源模块分析 ①.工厂模式 ②.数据源 ...

  10. 终于拿到了阿里P8架构师分享的JCF和JUC源码分析与实现笔记java岗

    时代的一粒尘,落在每个人身上,就是一座山". 时代更迭变换,我们好像都知道今天与昨天不同,又好像肉眼看不出哪里不同. 但其实它就正在以各种各样的方式体现在每一个普通人身上. 疫情爆发三个月的 ...

最新文章

  1. python难度大的题_早看少被坑!Python 最难的问题
  2. Python--状态码的简介与获取方法
  3. Linux怎么对当前目录提权,linux提权方法(不断总结更新)
  4. Spark任务执行期间写临时文件报错导致失败
  5. 量化指标公式源码_通达信指标公式源码线上阴线指标公式
  6. 23 | 二叉树基础(上):什么样的二叉树适合用数组来存储?
  7. iOS活动倒计时的两种实现方式
  8. 移动端html右滑空白,BootStrap.css 在手机端滑动时右侧出现空白的原因及解决办法...
  9. scala 隐式参数入门及应用
  10. vmware 里MAC 鼠标能移动 无法单击
  11. 【光通信-2】多模单模区别/多模光纤颜色区分/光纤跳线头区分
  12. Silverlight IReader阅读器第二版
  13. 软件构造第一次实验感想总结
  14. Bootstrap 框架-下拉菜单
  15. 汽车自主品牌与国际品牌差距为十年来最小;智选假日酒店大中华区已开业200家 | 美通企业日报...
  16. java对齐_java字符串对齐方法
  17. 广告主流量主怎么申请(微信)
  18. 信捷伺服刚性调整_信捷伺服常见问题分析解答.pdf
  19. 大数相乘(数组表示)
  20. 梯度下降—Python实现

热门文章

  1. Tomcat使用总结
  2. OpenCV中踩过的坑系列 01- Mat(int rows, int cols, int type, void* data, size_t step=AUTO_STEP)
  3. c语言程序运行超时是怎么回事,这个运行超时是什么原因?求助~
  4. QQ推广,QQ在线代码
  5. MFC对话框/控件下属性中的事件变成空白?
  6. 2019 ICPC 沈阳站 游记
  7. 三菱plc传送文件到服务器,三菱Q系列PLC通过FTP文件传输案例介绍
  8. java计算机毕业设计基于Web的上门家教系统的设计与实现源码+数据库+系统+lw文档+mybatis+运行部署
  9. 使用 teredo 穿透NAT访问 ipv6
  10. ubuntu编辑只读文件_ubuntu怎么样修改只读文件