2019独角兽企业重金招聘Python工程师标准>>>

上一篇我们分析了Metadata的更新机制,其中涉及到一个问题,就是Sender如何跟服务器通信,也就是网络层。同很多Java项目一样,Kafka client的网络层也是用的Java NIO,然后在上面做了一层封装。

下面首先看一下,在Sender和服务器之间的部分:

可以看到,Kafka client基于Java NIO封装了一个网络层,这个网络层最上层的接口是KakfaClient。其层次关系如下:

在本篇中,先详细对最底层的Java NIO进行讲述。< 喎�"/kf/ware/vc/" target="_blank" class="keylink">vcD4NCjxoMSBpZD0="nio的4大组件">NIO的4大组件

Buffer与Channel

Channel: 在通常的Java网络编程中,我们知道有一对Socket/ServerSocket对象,每1个socket对象表示一个connection,ServerSocket用于服务器监听新的连接。
在NIO中,与之相对应的一对是SocketChannel/ServerSocketChannel。

下图展示了SocketChannel/ServerSocketChannel的类继承层次

?

1

2

3

4

5

6

7

8

9

10

11

12

public interface Channel extends Closeable {

    public boolean isOpen();

    public void close() throws IOException;

}

public interface ReadableByteChannel extends Channel {

    public int read(ByteBuffer dst) throws IOException;

}

public interface WritableByteChannel extends Channel {

    public int write(ByteBuffer src) throws IOException;

}

从代码可以看出,一个Channel最基本的操作就是read/write,并且其传进去的必须是ByteBuffer类型,而不是普通的内存buffer。

Buffer:在NIO中,也有1套围绕Buffer的类继承层次,在此就不详述了。只需知道Buffer就是用来封装channel发送/接收的数据。

Selector

Selector的主要目的是网络事件的 loop 循环,通过调用selector.poll,不断轮询每个Channel上读写事件

SelectionKey

SelectionKey用来记录一个Channel上的事件集合,每个Channel对应一个SelectionKey。
SelectionKey也是Selector和Channel之间的关联,通过SelectionKey可以取到对应的Selector和Channel。

关于这4大组件的协作、配合,下面来详细讲述。

4种网络IO模型

epoll与IOCP

在《Unix环境高级编程》中介绍了以下4种IO模型(实际不止4种,但常用的就这4种):

阻塞IO: read/write的时候,阻塞调用

非阻塞IO: read/write,没有数据,立马返回,轮询

IO复用:read/write一次都只能监听一个socket,但对于服务器来讲,有成千上完个socket连接,如何用一个函数,可以监听所有的socket上面的读写事件呢?这就是IO复用模型,对应linux上面,就是select/poll/epoll3种技术。

异步IO:linux上没有,windows上对应的是IOCP。

Reactor模式 vs. Preactor模式

相信很多人都听说过网络IO的2种设计模式,关于这2种模式的具体阐述,可以自行google之。

在此处,只想对这2种模式做一个“最通俗的解释“:

Reactor模式:主动模式,所谓主动,是指应用程序不断去轮询,问操作系统,IO是否就绪。Linux下的select/poll/epooll就属于主动模式,需要应用程序中有个循环,一直去poll。
在这种模式下,实际的IO操作还是应用程序做的。

Proactor模式:被动模式,你把read/write全部交给操作系统,实际的IO操作由操作系统完成,完成之后,再callback你的应用程序。Windows下的IOCP就属于这种模式,再比如C++ Boost中的Asio库,就是典型的Proactor模式。

epoll的编程模型--3个阶段

在Linux平台上,Java NIO就是基于epoll来实现的。所有基于epoll的框架,都有3个阶段:
注册事件(connect,accept,read,write), 轮询IO是否就绪,执行实际IO操作。

下面的代码展示了在linux下,用c语言epoll编程的基本框架:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

//阶段1: 调用epoll_ctl(xx) 注册事件

for( ; ; )

    {

        nfds = epoll_wait(epfd,events,20,500);     //阶段2:轮询所有的socket

        for(i=0;i<nfds;++i) .data.fd="=listenfd)" 0="" accept="" connfd="accept(listenfd,(sockaddr" else="" epollin="" ev.data.fd="connfd;" ev.data.ptr="md;" ev.events="EPOLLIN|EPOLLET;" md="(myepoll_data*)events[i].data.ptr;" n="read(sockfd," sockfd="md-" struct="">fd;

                send( sockfd, md->ptr, strlen((char*)md->ptr), 0 );        //阶段3: 执行实际的io操作

                ev.data.fd=sockfd;

                ev.events=EPOLLIN|EPOLLET;

                epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //回到阶段1,重新注册事件

            }

            else

            {

                //其他的处理

            }

        }

    }</nfds;++i)>

同样, NIO中的Selector同样有以下3个阶段,下面把Selector和epoll的使用做个对比:

可以看到,2者只是写法不同,同样的, 都有这3个阶段。

下面的表格展示了connect, accept, read, write 这4种事件,分别在这3个阶段对应的函数:

下面看一下Kafka client中Selector的核心实现:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

@Override

public void poll(long timeout) throws IOException {

    。。。

    clear(); //清空各种状态

    if (hasStagedReceives())

        timeout = 0;

    long startSelect = time.nanoseconds();

    int readyKeys = select(timeout);  //轮询

    long endSelect = time.nanoseconds();

    currentTimeNanos = endSelect;

    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (readyKeys > 0) {

        Set<selectionkey> keys = this.nioSelector.selectedKeys();

        Iterator<selectionkey> iter = keys.iterator();

        while (iter.hasNext()) {

            SelectionKey key = iter.next();

            iter.remove();

            KafkaChannel channel = channel(key);

            // register all per-connection metrics at once

            sensors.maybeRegisterConnectionMetrics(channel.id());

            lruConnections.put(channel.id(), currentTimeNanos);

            try {

                if (key.isConnectable()) {  //有连接事件

                    channel.finishConnect();

                    this.connected.add(channel.id());

                    this.sensors.connectionCreated.record();

                }

                if (channel.isConnected() && !channel.ready())

                    channel.prepare(); //这个只有需要安全检查的SSL需求,普通的不加密的channel,prepare()为空实现

                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { //读就绪

                    NetworkReceive networkReceive;

                    while ((networkReceive = channel.read()) != null)

                        addToStagedReceives(channel, networkReceive); //实际的读动作

                }

                if (channel.ready() && key.isWritable()) {  //写就绪

                    Send send = channel.write(); //实际的写动作

                    if (send != null) {

                        this.completedSends.add(send);

                        this.sensors.recordBytesSent(channel.id(), send.size());

                    }

                }

                /* cancel any defunct sockets */

                if (!key.isValid()) {

                    close(channel);

                    this.disconnected.add(channel.id());

                }

            } catch (Exception e) {

                String desc = channel.socketDescription();

                if (e instanceof IOException)

                    log.debug("Connection with {} disconnected", desc, e);

                else

                    log.warn("Unexpected error from {}; closing connection", desc, e);

                close(channel);

                this.disconnected.add(channel.id());

            }

        }

    }

    addToCompletedReceives();

    long endIo = time.nanoseconds();

    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

    maybeCloseOldestConnection();

}</selectionkey></selectionkey>

epoll和selector在注册上的差别

从代码可以看出, Selector和epoll在代码结构上基本一样,但在事件的注册上面有区别:

epoll: 每次read/write之后,都要调用epoll_ctl重新注册

Selector: 注册一次,一直有效,一直会有事件产生,因此需要取消注册。下面来详细分析一下:

connect事件的注册

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

//Selector

    public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {

        if (this.channels.containsKey(id))

            throw new IllegalStateException("There is already a connection for id " + id);

        SocketChannel socketChannel = SocketChannel.open();

        。。。

        try {

            socketChannel.connect(address);

        } catch (UnresolvedAddressException e) {

            socketChannel.close();

            throw new IOException("Can't resolve address: " + address, e);

        } catch (IOException e) {

            socketChannel.close();

            throw e;

        }

        SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);  //构造channel的时候,注册connect事件

        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);

        key.attach(channel);

        this.channels.put(id, channel);

    }

connect事件的取消

?

1

2

3

4

5

6

7

8

9

10

11

//在上面的poll函数中,connect事件就绪,也就是指connect连接完成,连接简历

 if (key.isConnectable()) {  //有连接事件

       channel.finishConnect();

                        ...

     }

 //PlainTransportLayer

 public void finishConnect() throws IOException {

        socketChannel.finishConnect();  //调用channel的finishConnect()

        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ); //取消connect事件,新加read事件组册

    }

read事件的注册

从上面也可以看出,read事件的注册和connect事件的取消,是同时进行的

read事件的取消

因为read是要一直监听远程,是否有新数据到来,所以不会取消,一直监听

write事件的注册

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

//Selector

    public void send(Send send) {

        KafkaChannel channel = channelOrFail(send.destination());

        try {

            channel.setSend(send);

        } catch (CancelledKeyException e) {

            this.failedSends.add(send.destination());

            close(channel);

        }

    }

//KafkaChannel

    public void setSend(Send send) {

        if (this.send != null)

            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");

        this.send = send;

        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);  //每调用一次Send,注册一次Write事件

    }

Write事件的取消

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

//上面的poll函数里面

                    if (channel.ready() && key.isWritable()) { //write事件就绪

                        Send send = channel.write(); //在这个write里面,取消了write事件

                        if (send != null) {

                            this.completedSends.add(send);

                            this.sensors.recordBytesSent(channel.id(), send.size());

                        }

                    }

    private boolean send(Send send) throws IOException {

        send.writeTo(transportLayer);

        if (send.completed())

            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);  //取消write事件

        return send.completed();

    }

总结一下:
(1)“事件就绪“这个概念,对于不同事件类型,还是有点歧义的

read事件就绪:这个最好理解,就是远程有新数据到来,需要去read

write事件就绪:这个指什么呢? 其实指本地的socket缓冲区有没有满。没有满的话,应该就是一直就绪的,可写

connect事件就绪: 指connect连接完成

accept事件就绪:有新的连接进来,调用accept处理

(2)不同类型事件,处理方式是不一样的:

connect事件:注册1次,成功之后,就取消了。有且仅有1次

read事件:注册之后不取消,一直监听

write事件: 每调用一次send,注册1次,send成功,取消注册

转载于:https://my.oschina.net/tantexian/blog/1563936

Kafka源码分析-序列3 -Producer -Java NIO(Reactor VS Peactor)相关推荐

  1. Kafka 源码分析之网络层(二)

    上一篇介绍了概述和网络层模型实现<Kafka 源码分析之网络层(一)>,本编主要介绍在Processor中使用的nio selector的又一封装,负责具体数据的接收和发送. PS:丰富的 ...

  2. Kafka源码分析10:副本状态机ReplicaStateMachine详解 (图解+秒懂+史上最全)

    文章很长,建议收藏起来,慢慢读! Java 高并发 发烧友社群:疯狂创客圈 奉上以下珍贵的学习资源: 免费赠送 经典图书:<Java高并发核心编程(卷1)> 面试必备 + 大厂必备 +涨薪 ...

  3. kafka源码分析-consumer的分区策略

    kafka源码分析-consumer的分区策略 1.AbstractPartitionAssignor 2.RangeAssignor 3.RoundRobinAssignor 4.StickyAss ...

  4. apache kafka源码分析-Producer分析---转载

    原文地址:http://www.aboutyun.com/thread-9938-1-1.html 问题导读 1.Kafka提供了Producer类作为java producer的api,此类有几种发 ...

  5. kafka源码分析之一server启动分析

    0. 关键概念 关键概念 Concepts Function Topic 用于划分Message的逻辑概念,一个Topic可以分布在多个Broker上. Partition 是Kafka中横向扩展和一 ...

  6. Kafka 源码分析之网络层(一)

    Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据.小编会给大家带来几期 Kafka 相关的源码分析文章.这一系列文章是基于kafka 0.9.1版本,今天 ...

  7. 终于拿到了阿里P8架构师分享的JCF和JUC源码分析与实现笔记java岗

    时代的一粒尘,落在每个人身上,就是一座山". 时代更迭变换,我们好像都知道今天与昨天不同,又好像肉眼看不出哪里不同. 但其实它就正在以各种各样的方式体现在每一个普通人身上. 疫情爆发三个月的 ...

  8. 从源码分析RocketMQ系列-Producer的SendResult来自哪里?

    导语   对于消息中间件大家都应该不陌生,现在比较主流的消息中间件有Kafka.RabbitMQ.RocketMQ.ActiveMQ等等.前段时间花了很长时间分析了关于RocketMQ源码,之前也分享 ...

  9. 【源码分析设计模式 5】Java I/O系统中的装饰器模式

    一.基本介绍 动态地将责任附加到对象上.若要扩展功能,装饰者提供了比继承更有弹性的替代方案. 二.装饰器模式的结构 1.Component,抽象构件 Component是一个接口或者抽象类,是定义我们 ...

最新文章

  1. 裁员、亏损、倒闭,2019 十大 AI 失败案例回顾
  2. ​采访了14位技术公司的创始人,他们如何看待2020年的AI行业?
  3. 运算符 - PHP手册笔记
  4. 一些stl格式的点云的显示结果
  5. Quartz 框架快速入门(三)
  6. oracle索引建立
  7. Python 数据类型--Bytes类型
  8. Django 页面报错 Maximum recursion depth exceeded
  9. 认识Javascript数组
  10. leetcode141. 环形链表
  11. 从企金的授信方案延申到个金授信的思考
  12. Eclipse取消Process Validating
  13. 使用PXE+DHCP+Apache+Kickstart批量安装CentOS5.4 x86_64
  14. 启动服务提示-bash: mongod: command not found
  15. Hive 内部表外部表
  16. 前端笔记 | CSS进阶
  17. 实际应用中installshield的事件处理
  18. no override found for vtkpolydatamapper解决方法
  19. ResNet网络结构解析--Pytorch
  20. linux忘记root密码,单用户模式修改密码

热门文章

  1. python数据结构-如何统计序列中元素的频度
  2. 缓冲区,粘包,解决粘包的方法,
  3. JAVA设计模式之【外观模式】
  4. Date、String、Calendar类型之间的转化
  5. Dapper基础用法
  6. 11.22Daily Scrum(2)
  7. Visual Studio统计有效代码行数
  8. 关于sizeof的一些东西
  9. Linux命令执行顺序
  10. Springboot2 自定义异常处理