目录

  • kclient消息中间件

    • kclient-processor

      • top.ninwoo.kclient.app.KClientApplication
      • top.ninwoo.kclient.app.KClientController
      • top.ninwoo.kclient.app.handler.AnimalsHandler
      • top.ninwoo.kclient.app.domain
      • 总结
    • kclient-core
      • top.ninwoo.kafka.kclient.boot.KClientBoot
      • createObjectHandler
      • createObjectsHandler
      • createDocumentHandler
      • createBeanHandler
      • createBeansHandler
      • invokeHandler
      • 生产者和消费者创建方法
      • 小结
    • top.ninwoo.kafka.kclient.boot.KafkaHandlerMeta
    • top.ninwoo.kafka.kclient.core.KafkaProducer
    • top.ninwoo.kafka.kclient.core.KafkaConsumer
      • init()
      • initAsyncThreadPool()
    • initKafka
    • startup()
    • AbstractMessageTask
      • SequentialMessageTask && SequentialMessageTask
    • 总结:

最近在拜读李艳鹏的《可伸缩服务架构——框架与中间件》,该篇随笔,针对第二章的KClient(kafka消息中间件)源码解读项目,进行学习。

kclient消息中间件

从使用角度上开始入手学习

kclient-processor

该项目使用springboot调用kclient库,程序目录如下:

  • domain

    • Cat : 定义了一个cat对象
    • Dog : 定义了一个Dog对象
  • handler : 消息处理器
    • AnimalsHandler : 定义了Cat和Dog的具体行为
  • KClientApplication.java : Spring boot的主函数——程序执行入口
  • KClientController.java : Controller 文件

top.ninwoo.kclient.app.KClientApplication

1.启动Spring Boot

ApplicationContext ctxBackend = SpringApplication.run(KClientApplication.class, args);

2.启动程序后将自动加载KClientController(@RestController)

top.ninwoo.kclient.app.KClientController

1.通过@RestController,使@SpringBootApplication,可以自动加载该Controller

2.通过kafka-application.xml加载Beans

private ApplicationContext ctxKafkaProcessor =new ClassPathXmlApplicationContext("kafka-application.xml");

kafka-application.xml声明了一个kclient bean,并设置其初始化执行init方法,具体实现见下章具体实现。

<bean name="kClientBoot" class="top.ninwoo.kafka.kclient.boot.KClientBoot" init-method="init"/>

另外声明了一个扫描消息处理器的bean

<context:component-scan base-package="top.ninwoo.kclient.app.handler" />

具体内容在下一节介绍

  1. 使用@RequestMapping定义/,/status,/stop,/restart定义了不同的接口

这些接口实现比较简单,需要注意的是他们调用的getKClientBoot()函数。

上文,我们已经通过xml中,添加了两个Bean,调用Bean的具体实现方法如下:

    private KClientBoot getKClientBoot() {return (KClientBoot) ctxKafkaProcessor.getBean("kClientBoot");}

通过Bean获取到KClient获取到了KClientBoot对象,便可以调用其具体方法。

top.ninwoo.kclient.app.handler.AnimalsHandler

消息处理函数

1.使用@KafkaHandlers进行声明bean,关于其具体实现及介绍在具体实现中进行介绍

2.定义了三个处理函数

  • dogHandler
  • catHandler
  • ioExceptionHandler

dogHandler

具体处理很简单,主要分析@InputConsumer和@Consumer的作用,具体实现将在后续进行介绍。

    @InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test", streamNum = 1)@OutputProducer(propertiesFile = "kafka-producer.properties", defaultTopic = "test1")public Cat dogHandler(Dog dog) {System.out.println("Annotated dogHandler handles: " + dog);return new Cat(dog);}
  • @InputConsumer根据输入参数定义了一个Consumer,通过该Consumer传递具体值给dog,作为该处理函数的
    输入。
  • @OutputProducer根据输入参数定义一个Producer,而该处理函数最后返回的Cat对象,将通过该Producer最终传递到Kafka中

以下的功能与上述相同,唯一需要注意的是 @InputConsumer@OutputProducer可以单独存在。

@InputConsumer(propertiesFile = "kafka-consumer.properties", topic = "test1", streamNum = 1)public void catHandler(Cat cat) throws IOException {System.out.println("Annotated catHandler handles: " + cat);throw new IOException("Man made exception.");}@ErrorHandler(exception = IOException.class, topic = "test1")public void ioExceptionHandler(IOException e, String message) {System.out.println("Annotated excepHandler handles: " + e);}

top.ninwoo.kclient.app.domain

只是定义了Cat和Dog对象,不做赘述。

总结

到这里,总结下我们都实现了哪些功能?

  1. 程序启动
  2. 调用KClientBoot.init()方法
  3. AnimalsHandler定义了消费者和生产者的具体方法。

kclient-core

kclient消息中间件的主体部分,该部分将会涉及

  • kafka基本操作
  • 反射

项目结构如下:

  • boot

    • ErrorHandler
    • InputConsumer
    • OutputProducer
    • KafkaHandlers
    • KClientBoot
    • KafkaHandler
    • KafkaHandlerMeta
  • core
    • KafkaConsumer
    • KafkaProducer
  • excephandler
    • DefaultExceptionHandler
    • ExceptionHandler
  • handlers
    • BeanMessageHandler
    • BeansMessageHandler
    • DocumentMessageHandler
    • ObjectMessageHandler
    • ObjectsMessageHandler
    • MessageHandler
    • SafelyMessageHandler
  • reflection.util
    • AnnotationHandler
    • AnnotationTranversor
    • TranversorContext

在接下来的源码阅读中,我将按照程序执行的顺序进行解读。如果其中涉及到没有讨论过的模块,读者可以向下翻阅。这么
做的唯一原因,为了保证思维的连续性,尽可能不被繁杂的程序打乱。

top.ninwoo.kafka.kclient.boot.KClientBoot

如果读者刚刚阅读上一章节,那么可能记得,我们注册了一个kClientBoot的bean,并设置了初始化函数init(),所以,在kclient源码的阅读中
,我们将从该文件入手,开始解读。

    public void init() {meta = getKafkaHandlerMeta();if (meta.size() == 0)throw new IllegalArgumentException("No handler method is declared in this spring context.");for (final KafkaHandlerMeta kafkaHandlerMeta : meta) {createKafkaHandler(kafkaHandlerMeta);}}

1.该函数,首先获取了一个HandlerMeta,我们可以简单理解,在这个数据元中,存储了全部的Handler信息,这个Handler信息指的是上一章节中通过@KafkaHandlers定义的处理函数,
具体实现见top.ninwoo.kafka.kclient.boot.KafkaHandlerMeta

2.获取数据元之后,通过循环,创建对应的处理函数。

    for (final KafkaHandlerMeta kafkaHandlerMeta : meta) {createKafkaHandler(kafkaHandlerMeta);}

3.getKafkaHandlerMeta函数的具体实现

a.通过applicationContext获取包含kafkaHandlers注解的Bean名称。

String[] kafkaHandlerBeanNames = applicationContext.getBeanNamesForAnnotation(KafkaHandlers.class);

b.通过BeanName获取到Bean对象

    Object kafkaHandlerBean = applicationContext.getBean(kafkaHandlerBeanName);Class<? extends Object> kafkaHandlerBeanClazz = kafkaHandlerBean.getClass();

c.构建mapData数据结构,具体构建见top.ninwoo.kafka.kclient.reflection.util.AnnotationTranversor

    Map<Class<? extends Annotation>, Map<Method, Annotation>> mapData = extractAnnotationMaps(kafkaHandlerBeanClazz);

d.map转数据元并添加到数据元meta list中。

    meta.addAll(convertAnnotationMaps2Meta(mapData, kafkaHandlerBean));

4.循环遍历创建kafkaHandler

    for (final KafkaHandlerMeta kafkaHandlerMeta : meta) {createKafkaHandler(kafkaHandlerMeta);}

createKafkaHandler()函数的具体实现:

a.通过meta获取clazz中的参数类型

Class<? extends Object> paramClazz = kafkaHandlerMeta.getParameterType()

b.创建kafkaProducer

KafkaProducer kafkaProducer = createProducer(kafkaHandlerMeta);

c.创建ExceptionHandler

List<ExceptionHandler> excepHandlers = createExceptionHandlers(kafkaHandlerMeta);

d.根据clazz的参数类型,选择消息转换函数

    MessageHandler beanMessageHandler = null;if (paramClazz.isAssignableFrom(JSONObject.class)) {beanMessageHandler = createObjectHandler(kafkaHandlerMeta,kafkaProducer, excepHandlers);} else if (paramClazz.isAssignableFrom(JSONArray.class)) {beanMessageHandler = createObjectsHandler(kafkaHandlerMeta,kafkaProducer, excepHandlers);} else if (List.class.isAssignableFrom(Document.class)) {beanMessageHandler = createDocumentHandler(kafkaHandlerMeta,kafkaProducer, excepHandlers);} else if (List.class.isAssignableFrom(paramClazz)) {beanMessageHandler = createBeansHandler(kafkaHandlerMeta,kafkaProducer, excepHandlers);} else {beanMessageHandler = createBeanHandler(kafkaHandlerMeta,kafkaProducer, excepHandlers);}

e.创建kafkaConsumer,并启动

KafkaConsumer kafkaConsumer = createConsumer(kafkaHandlerMeta,beanMessageHandler);kafkaConsumer.startup();

f.创建KafkaHanlder,并添加到列表中

        KafkaHandler kafkaHandler = new KafkaHandler(kafkaConsumer,kafkaProducer, excepHandlers, kafkaHandlerMeta);kafkaHandlers.add(kafkaHandler);

createExceptionHandlers的具体实现

1.创建一个异常处理列表

List<ExceptionHandler> excepHandlers = new ArrayList<ExceptionHandler>();

2.从kafkaHandlerMeta获取异常处理的注解

for (final Map.Entry<ErrorHandler, Method> errorHandler : kafkaHandlerMeta.getErrorHandlers().entrySet()) {

3.创建一个异常处理对象

ExceptionHandler exceptionHandler = new ExceptionHandler() {public boolean support(Throwable t) {}public void handle(Throwable t, String message) {}
support方法判断异常类型是否和输入相同
    public boolean support(Throwable t) {// We handle the exception when the classes are exactly samereturn errorHandler.getKey().exception() == t.getClass();}

handler方法,进一步对异常进行处理

1.获取异常处理方法

Method excepHandlerMethod = errorHandler.getValue();

2.使用Method.invoke执行异常处理方法

excepHandlerMethod.invoke(kafkaHandlerMeta.getBean(),t, message);

这里用到了一些反射原理,以下对invoke做简单介绍

public Object invoke(Object obj,Object... args)throws IllegalAccessException,IllegalArgumentException,InvocationTargetException

参数:

  • obj 从底层方法被调用的对象
  • args 用于方法的参数

在该项目中的实际情况如下:

Method实际对应top.ninwoo.kclient.app.handler.AnimalsHandler中的:

    @ErrorHandler(exception = IOException.class, topic = "test1")public void ioExceptionHandler(IOException e, String message) {System.out.println("Annotated excepHandler handles: " + e);}

参数方面:

  • kafkaHandlerMeta.getBean() : AninmalsHandler
  • t
  • message

invoke完成之后,将会执行ioExceptionHandler函数


4.添加异常处理到列表中

excepHandlers.add(exceptionHandler);

createObjectHandler

createObjectsHandler

createDocumentHandler

createBeanHandler

createBeansHandler

以上均实现了类似的功能,只是创建了不同类型的对象,然后重写了不同的执行函数。

实现原理和异常处理相同,底层都是调用了invoke函数,通过反射机制启动了对应的函数。

下一节对此做了详细介绍

invokeHandler

1.获取对应Method方法

Method kafkaHandlerMethod = kafkaHandlerMeta.getMethod();

2.执行接收返回结果

Object result = kafkaHandlerMethod.invoke(kafkaHandlerMeta.getBean(), parameter);

3.如果生产者非空,意味着需要通过生产者程序将结果发送到Kafka中

if (kafkaProducer != null) {if (result instanceof JSONObject)kafkaProducer.send(((JSONObject) result).toJSONString());else if (result instanceof JSONArray)kafkaProducer.send(((JSONArray) result).toJSONString());else if (result instanceof Document)kafkaProducer.send(((Document) result).getTextContent());elsekafkaProducer.send(JSON.toJSONString(result));

生产者和消费者创建方法

protected KafkaConsumer createConsumer(final KafkaHandlerMeta kafkaHandlerMeta,MessageHandler beanMessageHandler) {KafkaConsumer kafkaConsumer = null;if (kafkaHandlerMeta.getInputConsumer().fixedThreadNum() > 0) {kafkaConsumer = new KafkaConsumer(kafkaHandlerMeta.getInputConsumer().propertiesFile(), kafkaHandlerMeta.getInputConsumer().topic(), kafkaHandlerMeta.getInputConsumer().streamNum(), kafkaHandlerMeta.getInputConsumer().fixedThreadNum(), beanMessageHandler);} else if (kafkaHandlerMeta.getInputConsumer().maxThreadNum() > 0&& kafkaHandlerMeta.getInputConsumer().minThreadNum() < kafkaHandlerMeta.getInputConsumer().maxThreadNum()) {kafkaConsumer = new KafkaConsumer(kafkaHandlerMeta.getInputConsumer().propertiesFile(), kafkaHandlerMeta.getInputConsumer().topic(), kafkaHandlerMeta.getInputConsumer().streamNum(), kafkaHandlerMeta.getInputConsumer().minThreadNum(), kafkaHandlerMeta.getInputConsumer().maxThreadNum(), beanMessageHandler);} else {kafkaConsumer = new KafkaConsumer(kafkaHandlerMeta.getInputConsumer().propertiesFile(), kafkaHandlerMeta.getInputConsumer().topic(), kafkaHandlerMeta.getInputConsumer().streamNum(), beanMessageHandler);}return kafkaConsumer;}protected KafkaProducer createProducer(final KafkaHandlerMeta kafkaHandlerMeta) {KafkaProducer kafkaProducer = null;if (kafkaHandlerMeta.getOutputProducer() != null) {kafkaProducer = new KafkaProducer(kafkaHandlerMeta.getOutputProducer().propertiesFile(), kafkaHandlerMeta.getOutputProducer().defaultTopic());}// It may return nullreturn kafkaProducer;}

这两部分比较简单,不做赘述。

小结

KClientBoot.java实现了:

  • 获取使用KafkaHandlers中定义注释的方法及其它信息
  • 基于反射机制,生成处理函数。
  • 执行处理函数
  • 创建对应Producer和Consumer

还剩余几个比较简单的部分,比如shutdownAll()等方法,将在具体实现处进行补充介绍。

到此,整个项目的主体功能都已经实现。接下来,将分析上文中出现频率最高的kafkaHandlerMeta与生产者消费者的具体实现。

top.ninwoo.kafka.kclient.boot.KafkaHandlerMeta

KafkaHandlerMeta存储了全部的可用信息,该类实现比较简单,主要分析其成员对象。

  • Object bean : 存储底层的bean对象
  • Method method : 存储方法对象
  • Class<? extends Object> parameterType : 存储参数的类型
  • InputConsumer inputConsumer : 输入消费者注解对象,其中存储着创建Consumer需要的配置
  • OutputProducer outputProducer : 输出生产者注解对象,其中存储着创建Producer需要的配置
  • Map<ErrorHandler, Method> errorHandlers = new HashMap<ErrorHandler, Method>() 异常处理函数与其方法组成的Map

top.ninwoo.kafka.kclient.core.KafkaProducer

该类主要通过多态封装了kafka Producer的接口,提供了更加灵活丰富的api接口,比较简单不做赘述。

top.ninwoo.kafka.kclient.core.KafkaConsumer

该类的核心功能是:

  1. 加载配置文件
  2. 初始化线程池
  3. 初始化GracefullyShutdown函数
  4. 初始化kafka连接

在这里跳过构造函数,但在进入核心问题前,先明确几个成员变量的作用。

  • streamNum : 创建消息流的数量
  • fixedThreadNum : 异步线程池中的线程数量
  • minThreadNum : 异步线程池的最小线程数
  • maxThreadNum : 异步线程池的最大线程数
  • stream : kafka消息流
  • streamThreadPool : kafka消息处理线程池

在每个构造函数后都调用了init()方法,所以我们从init()入手。另外一个核心方法startup()将在介绍完init()函数进行介绍。

init()

在执行核心代码前,进行了一系列的验证,这里跳过该部分。

1.加载配置文件

properties = loadPropertiesfile();

2.如果共享异步线程池,则初始化异步线程池

sharedAsyncThreadPool = initAsyncThreadPool();

3.初始化优雅关闭

initGracefullyShutdown();

4.初始化kafka连接

initKafka();

initAsyncThreadPool()

完整代码如下:

    private ExecutorService initAsyncThreadPool() {ExecutorService syncThreadPool = null;if (fixedThreadNum > 0)syncThreadPool = Executors.newFixedThreadPool(fixedThreadNum);elsesyncThreadPool = new ThreadPoolExecutor(minThreadNum, maxThreadNum,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());return syncThreadPool;}

首先,如果异步线程数大于0,则使用该参数进行创建线程池。

syncThreadPool = Executors.newFixedThreadPool(fixedThreadNum);

如果线程数不大于0,使用minThreadNum,maxThreadNum进行构造线程池。

syncThreadPool = new ThreadPoolExecutor(minThreadNum, maxThreadNum,60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

Executors简介

这里介绍Executors提供的四种线程池

  • newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  • newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  • newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
  • newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

ThreadPoolExecutor简介

ThreadPooExecutor与Executor的关系如下:

构造方法:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue);ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

参数说明:

  • corePoolSize

核心线程数,默认情况下核心线程会一直存活,即使处于闲置状态也不会受存keepAliveTime限制。除非将allowCoreThreadTimeOut设置为true。

  • maximumPoolSize

线程池所能容纳的最大线程数。超过这个数的线程将被阻塞。当任务队列为没有设置大小的LinkedBlockingDeque时,这个值无效。

  • keepAliveTime

非核心线程的闲置超时时间,超过这个时间就会被回收。

  • unit

指定keepAliveTime的单位,如TimeUnit.SECONDS。当将allowCoreThreadTimeOut设置为true时对corePoolSize生效。

  • workQueue

线程池中的任务队列.

常用的有三种队列,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue。

  • SynchronousQueue

线程工厂,提供创建新线程的功能。

  • RejectedExecutionHandler

当线程池中的资源已经全部使用,添加新线程被拒绝时,会调用RejectedExecutionHandler的rejectedExecution方法。

initKafka

由于kafka API已经改动很多,所以这里关于Kafka的操作仅做参考,不会详细介绍。

1.加载Consumer配置

ConsumerConfig config = new ConsumerConfig(properties);

2.创建consumerConnector连接

consumerConnector = Consumer.createJavaConsumerConnector(config);

3.存储kafka topic与对应设置的消息流数量

Map<String, Integer> topics = new HashMap<String, Integer>();
topics.put(topic, streamNum);

4.从kafka获取消息流

Map<String, List<KafkaStream<String, String>>> streamsMap = consumerConnector.createMessageStreams(topics, keyDecoder, valueDecoder);
streams = streamsMap.get(topic);

5.创建消息处理线程池

startup()

上述init()主要介绍了kafka消费者的初始化,而startup()则是kafkaConsumer作为消费者进行消费动作的核心功能代码。

1.依次处理消息线程streams中的消息

for (KafkaStream<String, String> stream : streams) {

2.创建消息任务

AbstractMessageTask abstractMessageTask = (fixedThreadNum == 0 ? new SequentialMessageTask(stream, handler) : new ConcurrentMessageTask(stream, handler, fixedThreadNum));

3.添加到tasks中,以方便关闭进程

tasks.add(abstractMessageTask);

4.执行任务

streamThreadPool.execute(abstractMessageTask);

AbstractMessageTask

任务执行的抽象类,核心功能如下从消息线程池中不断获取消息,进行消费。
下面是完整代码,不再详细介绍:

    abstract class AbstractMessageTask implements Runnable {protected KafkaStream<String, String> stream;protected MessageHandler messageHandler;AbstractMessageTask(KafkaStream<String, String> stream,MessageHandler messageHandler) {this.stream = stream;this.messageHandler = messageHandler;}public void run() {ConsumerIterator<String, String> it = stream.iterator();while (status == Status.RUNNING) {boolean hasNext = false;try {// When it is interrupted if process is killed, it causes some duplicate message processing, because it commits the message in a chunk every 30 secondshasNext = it.hasNext();} catch (Exception e) {// hasNext() method is implemented by scala, so no checked// exception is declared, in addtion, hasNext() may throw// Interrupted exception when interrupted, so we have to// catch Exception here and then decide if it is interrupted// exceptionif (e instanceof InterruptedException) {log.info("The worker [Thread ID: {}] has been interrupted when retrieving messages from kafka broker. Maybe the consumer is shutting down.",Thread.currentThread().getId());log.error("Retrieve Interrupted: ", e);if (status != Status.RUNNING) {it.clearCurrentChunk();shutdown();break;}} else {log.error("The worker [Thread ID: {}] encounters an unknown exception when retrieving messages from kafka broker. Now try again.",Thread.currentThread().getId());log.error("Retrieve Error: ", e);continue;}}if (hasNext) {MessageAndMetadata<String, String> item = it.next();log.debug("partition[" + item.partition() + "] offset["+ item.offset() + "] message[" + item.message()+ "]");handleMessage(item.message());// if not auto commit, commit it manuallyif (!isAutoCommitOffset) {consumerConnector.commitOffsets();}}}protected void shutdown() {// Actually it doesn't work in auto commit mode, because kafka v0.8 commits once per 30 seconds, so it is bound to consume duplicate messages.stream.clear();}protected abstract void handleMessage(String message);}

SequentialMessageTask && SequentialMessageTask

或许您还比较迷惑如何在这个抽象类中实现我们具体的消费方法,实际上是通过子类实现handleMessage方法进行绑定我们具体的消费方法。

    class SequentialMessageTask extends AbstractMessageTask {SequentialMessageTask(KafkaStream<String, String> stream,MessageHandler messageHandler) {super(stream, messageHandler);}@Overrideprotected void handleMessage(String message) {messageHandler.execute(message);}}

在该子类中,handleMessage直接执行了messageHandler.execute(message),而没有调用线程池,所以是顺序消费消息。

    class ConcurrentMessageTask extends AbstractMessageTask {private ExecutorService asyncThreadPool;ConcurrentMessageTask(KafkaStream<String, String> stream,MessageHandler messageHandler, int threadNum) {super(stream, messageHandler);if (isSharedAsyncThreadPool)asyncThreadPool = sharedAsyncThreadPool;else {asyncThreadPool = initAsyncThreadPool();}}@Overrideprotected void handleMessage(final String message) {asyncThreadPool.submit(new Runnable() {public void run() {// if it blows, how to recovermessageHandler.execute(message);}});}protected void shutdown() {if (!isSharedAsyncThreadPool)shutdownThreadPool(asyncThreadPool, "async-pool-"+ Thread.currentThread().getId());}}

在ConcurrentMessageTask中, handleMessage调用asyncThreadPool.submit()提交了任务到异步线程池中,是一个并发消费。

而messageHandler是通过KClientBoot的createKafkaHandler创建并发送过来的,所以实现了最终的消费。

总结:

到此全部的项目解读完毕,如果仍有疑惑,可以参看李艳鹏老师的《可伸缩服务架构框架与中间件》一书,同时也可以与我联系交流问题。

转载于:https://www.cnblogs.com/NinWoo/p/9798270.html

KClient——kafka消息中间件源码解读相关推荐

  1. Kafka Eagle 源码解读

    1.概述 在<Kafka 消息监控 - Kafka Eagle>一文中,简单的介绍了 Kafka Eagle这款监控工具的作用,截图预览,以及使用详情.今天笔者通过其源码来解读实现细节.目 ...

  2. 知秋源码解读分享系列

    作为一个乐于分享的人,我希望通过一些成熟优秀的代码库,来向大家展示读源码思路以及阐述编程方面的技巧,也希望大家从中思考并得到属于自己的一套编程方法论. 半年以来,已进行72小时时长的源码解读分享视频录 ...

  3. 涨薪秘籍之源码篇:Spring全家桶源码解读,大师晋级笔记

    最近很多人留言说今年找工作太难了,要涨薪几乎更难了 !真的就这么悲观吗?其实不然,我们企业一直都在大量的招人,一直都没停过.只是一点和之前不一样要求变高了,优秀的人一直都缺. 我身边朋友所在的大厂也一 ...

  4. Bert系列(二)——源码解读之模型主体

    本篇文章主要是解读模型主体代码modeling.py.在阅读这篇文章之前希望读者们对bert的相关理论有一定的了解,尤其是transformer的结构原理,网上的资料很多,本文内容对原理部分就不做过多 ...

  5. Bert系列(三)——源码解读之Pre-train

    https://www.jianshu.com/p/22e462f01d8c pre-train是迁移学习的基础,虽然Google已经发布了各种预训练好的模型,而且因为资源消耗巨大,自己再预训练也不现 ...

  6. linux下free源码,linux命令free源码解读:Procps free.c

    linux命令free源码解读 linux命令free源码解读:Procps free.c 作者:isayme 发布时间:September 26, 2011 分类:Linux 我们讨论的是linux ...

  7. nodeJS之eventproxy源码解读

    1.源码缩影 !(function (name, definition) { var hasDefine = typeof define === 'function', //检查上下文环境是否为AMD ...

  8. PyTorch 源码解读之即时编译篇

    点击上方"AI遇见机器学习",选择"星标"公众号 重磅干货,第一时间送达 作者丨OpenMMLab 来源丨https://zhuanlan.zhihu.com/ ...

  9. Alamofire源码解读系列(九)之响应封装(Response)

    本篇主要带来Alamofire中Response的解读 前言 在每篇文章的前言部分,我都会把我认为的本篇最重要的内容提前讲一下.我更想同大家分享这些顶级框架在设计和编码层次究竟有哪些过人的地方?当然, ...

最新文章

  1. 使用Python进行科学计算:NumPy入门
  2. vm ububtu突然没网
  3. mysql 按照in id顺序_Mysql查询结果顺序按in()中ID的顺序排列的案例区分
  4. 多线程threading初识,线程等待
  5. 非注解和注解的处理器映射器和适配器---SpringMVC学习笔记(三)
  6. 保护SQL Server数据库的十大绝招
  7. 旋转图像Python解法
  8. (7)3分钟搞定 C# 逻辑运算
  9. MySQL 导出命令
  10. 第五章数理统计--样本和抽样分布
  11. 奥特曼系列ol服务器名称带怪兽,《奥特曼系列ol》怪兽图鉴 怪兽阵容
  12. 获取input file绝对路径_IO--File对象
  13. UITableViewController 滚动引起的cocos2d动画暂停问题的解决
  14. PHP 怎样理解go指针,golang指针传递和值传递的区别是什么?
  15. Swift2.0新特性
  16. Canvas 输出位图
  17. 《App后台开发运维和架构实践》推荐序
  18. CPU Cycle(CPU 周期)、Instruction Cycle(指令周期)、Clock Cycle(时钟周期)
  19. Nginx 之 realip模块 使用详解
  20. Multiplayer Moo

热门文章

  1. Darknet_Yolov4实战(一)_安装Ubuntu+cuda+cudnn
  2. mysql拼图游戏代码_Java小项目之拼图游戏
  3. 进阶Frida--Android逆向之动态加载dex Hook(三)
  4. 【译】KNOWLEDGE EXTRACTION FROM UNSTRUCTURED TEXTS
  5. 知识图谱学习小组学习大纲
  6. Cold-Staking | TPoS vs LPoS vs DPoS
  7. Android Binder设计与实现 - 实现篇(1)
  8. Android开发工具之Android Studio----Gradle
  9. JZOJ 5354. 【NOIP2017提高A组模拟9.9】导弹拦截
  10. Hdu 1754 . I Hate It