Kafka.utils,顾名思义,就是一个工具套件包,里面的类封装了很多常见的功能实现——说到这里,笔者有一个感触:当初为了阅读Kafka源代码而学习了Scala语言,本以为Kafka的实现会用到很多函数编程(Functional Programming, FP),结果目前来看,大部分还是很朴素地以面向对象的方式来实现的,只有很少一部分集合的处理使用诸如map,reduce这样的FP方式。不能不说有点小小的遗憾。——当然也许后面Kafka的核心代码中会看到更多FP的身影。

下图就是kafka.utils包的所有代码:
因为很难像其他包代码之间有逻辑关系,我们就一个一个说吧:
一、Annotations.scala
这个源代码文件中定义了3个注释类:threadsafe、nonthreadsafe和immutable。它们都继承了StaticAnnotation——Scala提供的StaticAnnotation类似于Java中的@Target(ElementType.TYPE),因此主要的作用域是类和接口。具体到这三个元注解(meta-annotation),很容易知道它们的含义:分别标记线程安全、非线程安全和不可变性。Kafka开发中常用到的SimpleConsumer类就是被标记为@threadsafe的。
二. CommandLineUtils.scala
这个文件使用JOpt Simple库负责解析命令行参数,具体使用用法参见官网:http://pholser.github.io/jopt-simple/
Kafka在这个文件中提供了一个object:CommandLineUtils。具体包含的方法有:
1. printUsageAndDie: 打印命令使用方法并终止程序
2. checkRequiredArgs:使用Jopts Simple的API(以下皆同)检查是否缺少必要参数
3. checkInvalidArgs:检查指定的参数是否存在不兼容情况,即哪些参数不能同时使用
4. parseKeyValueArgs:解析key=value格式的参数对,并返回一个Properties对象
三、Crc32.scala
这个类就是CRC32校验码的实现类,来自于Hadoop提供的PureJavaCrc32类——CRC32校验码的纯Java实现版本。这个类很长,里面有很多位操作,由于CRC32计算不在本次研究范围,所以就了解到这吧。
四、DelayedItem.scala
这个类是个泛型类,实现了java.util.Delayed接口。用于标记那些在给定延迟时间之后执行的对象。该类接收一个泛型T,一个延迟时间以及延迟时间的单位。另外,实现这个接口的话必须要实现一个compareTo和getDelay方法。
1. getDelay: 计算距离触发时间还剩下多长时间
2. compareTo: 比较2个Delayed对象的延迟触发时间
五、FileLock.scala
顾名思义,FileLock就是一个文件锁,它的构造函数接收一个文件对象,并总是先尝试创建这个文件(如果不存在的话),然后创建一个FileChannel对象对该文件进行随机读写操作。同时创建一个java.nio.channel.FlieLock文件锁对象用于实现下面的方法:
1. lock: 对文件加锁,如果该文件上已有锁抛出异常
2. tryLock: 尝试对文件加锁,如果成功返回true,否则返回false
3. unlock: 如果持有锁使用FileLock.release方法释放锁
4. destroy: 先释放锁然后调用FileChannel的close方法销毁该channel
六、IteratorTemplate.scala
这个文件视图定义一个迭代器模板,主要为遍历消息集合使用。迭代器模板有一个状态字段,因此在定义迭代器模板抽象类之前首先定义了一个State状态object,以及一组具体的状态object:完成(DONE),READY(准备就绪),NOT_READY(未准备)和FAILED(失败)。
之后就是定义IteratorTemplate抽象类了,它同时实现了trait Iterator和java Iterator接口——可谓迭代器领域的集大成者:)
 
如前所述,该类有个字段表明了迭代器的状态:state,还有一个nextItem字段执行遍历中的下一个对象,当然初始化为null——说起null,想到一个题外话。我很怀疑Kafka的开发人员是深度的Java编程人员亦或是强面向对象开发人员,Scala推荐使用Option来代替null的,可Kafka的代码中null还是随处可见,当然可能也是为了更好更自然地与Java集成。
 
这个抽象类提供很多方法,但似乎只有一个抽象方法:makeNext,其他全是具体方法:
1. next:如果迭代器已遍历完并无法找到下一项或下一项为空,直接抛出异常;否则将状态置为NOT_READY并返回下一项
2. peek:只是探查一下迭代器是否遍历完,如果是抛出异常,否则直接返回下一项,并不做非空判断,也不做状态设置
3. hasNext: 如果状态为FAILED直接抛出异常,如果是DONE返回false,如果是READY返回true,否则调用maybeComputeNext方法
4. makeNext: 返回下一项,这是你需要唯一需要实现的抽象方法。同时你还需要在该方法中对状态字段进行更新
5. maybeComputeNext:调用makeNext获取到下一项,如果状态是DONE返回false,否则返回true并将状态置为READY
6. allDone: 将状态置为DONE并返回null
7. resetStatus:顾名思义,就是重置状态字段为NOT_READY
七、JSON.scala
JSON的一个封装类,用于JSON到String的相互转换,该类不是线程安全的。Scala提供的JSON是将数字型的字符串转化为Double,不过该类创建一个简单函数用于将数字型字符串转为换Integer,并指定其为JSON.globalNumberParser。该类只有2个方法:
1. parseFull: 调用scala JSON的parseFull方法将一个json字符串转化为一个对象,如果出错则抛出异常
2. encode: 讲一个对象编码成json字符串。这个对象只能是null,Boolean,String,Number,Map[String, T],Array[T]或Iterable[T]中的一种,否则会报错

转载于:https://www.cnblogs.com/huxi2b/p/4378439.html

【原创】Kakfa utils源代码分析(一)相关推荐

  1. 【原创】Kakfa utils源代码分析(三)

    Kafka utils包最后一篇~~~ 十五.ShutdownableThread.scala 可关闭的线程抽象类! 继承自Thread同时还接收一个boolean变量isInterruptible表 ...

  2. 【原创】Kakfa utils源代码分析(二)

    我们继续研究kafka.utils包 八.KafkaScheduler.scala 首先该文件定义了一个trait:Scheduler--它就是运行任务的一个调度器.任务调度的方式支持重复执行的后台任 ...

  3. 【原创】k8s源代码分析-----kubelet(8)pod管理

    本文QQ空间链接:http://user.qzone.qq.com/29185807/blog/1460540474 本文csdn博客链接:http://blog.csdn.net/screscent ...

  4. Spark SQL之External DataSource外部数据源(二)源代码分析

    上周Spark1.2刚公布,周末在家没事,把这个特性给了解一下,顺便分析下源代码,看一看这个特性是怎样设计及实现的. /** Spark SQL源代码分析系列文章*/ (Ps: External Da ...

  5. 区块链教程Fabric1.0源代码分析Peer peer channel命令及子命令实现

    区块链教程Fabric1.0源代码分析Peer peer channel命令及子命令实现,2018年下半年,区块链行业正逐渐褪去发展之初的浮躁.回归理性,表面上看相关人才需求与身价似乎正在回落.但事实 ...

  6. spark(1.1) mllib 源代码分析

    在spark mllib 1.1加入版本stat包,其中包括一些统计数据有关的功能.本文分析中卡方检验和实施的主要原则: 一个.根本 在stat包实现Pierxunka方检验,它包括以下类别 (1)适 ...

  7. 区块链教程Fabric1.0源代码分析Tx(Transaction 交易)一

    区块链教程Fabric1.0源代码分析Tx(Transaction 交易)一,2018年下半年,区块链行业正逐渐褪去发展之初的浮躁.回归理性,表面上看相关人才需求与身价似乎正在回落.但事实上,正是初期 ...

  8. Spark SQL 源代码分析系列

    从决定写Spark SQL文章的源代码分析,到现在一个月的时间,一个又一个几乎相同的结束很快,在这里也做了一个综合指数,方便阅读,下面是读取顺序 :) 第一章 Spark SQL源代码分析之核心流程 ...

  9. Spark SQL Catalyst源代码分析之TreeNode Library

    /** Spark SQL源代码分析系列文章*/ 前几篇文章介绍了Spark SQL的Catalyst的核心执行流程.SqlParser,和Analyzer,本来打算直接写Optimizer的,可是发 ...

最新文章

  1. 2015级C++第14周实践项目 模板
  2. 2.27 MapReduce Shuffle过程如何在Job中进行设置
  3. 结构体struct 联合体 union
  4. AppBox升级进行时 - 拥抱Entity Framework的Code First开发模式
  5. Windows内核符号表学习总结
  6. 【链接】Linux C/C++ 学习路线-已拿腾讯、百度 offer
  7. JS之Boolean的valueOf方法
  8. 一文详解 Dubbo 注册发现流程
  9. 温故10个经典排序算法(Java版)
  10. Eos的Wasm智能合约的局限性
  11. php常用技术与thinkphp5,thinkPHP5框架路由常用知识点汇总
  12. 2012Android开发热门资料110
  13. 【To Do 难点】最大搜索二叉树
  14. 【hdu1018】Big Number(求n!的位数----斯大林公式/log函数)
  15. Visio2019自动闪退问题
  16. 计算机专业研究生核心能力培养(2)——如何更好的写代码
  17. C语言学习之请编程序将“China”译成密码,密码规律是:用原来的字母后面第4个字母代替原来的字母。
  18. JAVA有percentile函数吗_Python numpy.percentile函数方法的使用
  19. mysql netcdf_NetCDF Overview
  20. 学习编程,你真正需要的是什么样配置的电脑?

热门文章

  1. ocx控件 postmessage消息会消失_实战经验:如何检测CMFCTabCtrl控件标签页切换事件...
  2. Linux下出现ping:unknown host xxx.xxx问题的解决办法
  3. 选项卡,下拉菜单操做时的页面数据更新,highcharts,d3 结合。
  4. WPF中的动画——(四)缓动函数
  5. Java poi读取,写入Excel2003
  6. VS 2012 找回消失的“创建单元测试”
  7. WCF 第八章 安全 客户端认证
  8. ASP.NET站点导航(五)
  9. springboot 整合 kafka demo 顺便看一下源码
  10. Android 人脸照片对比,人脸对比