
This article walks through how Spring for Apache Kafka (spring-kafka) wraps and integrates the native Kafka client consumer.

Consumer factory

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

protected KafkaConsumer<K, V> createKafkaConsumer(String clientIdSuffix) {
    if (!this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG) || clientIdSuffix == null) {
        return createKafkaConsumer();
    }
    else {
        Map<String, Object> modifiedClientIdConfigs = new HashMap<>(this.configs);
        modifiedClientIdConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG,
                modifiedClientIdConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG) + clientIdSuffix);
        return createKafkaConsumer(modifiedClientIdConfigs);
    }
}

protected KafkaConsumer<K, V> createKafkaConsumer(Map<String, Object> configs) {
    return new KafkaConsumer<K, V>(configs, this.keyDeserializer, this.valueDeserializer);
}
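The client-id handling above can be modelled without any Kafka dependency. The following is only a minimal sketch: the class ClientIdSuffixSketch and the method withSuffix are hypothetical names invented for illustration, and a plain Map stands in for the factory's configs. The key "client.id" is the value of ConsumerConfig.CLIENT_ID_CONFIG.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone model of the suffix logic; not part of spring-kafka.
final class ClientIdSuffixSketch {

    static final String CLIENT_ID = "client.id";

    // Returns the original map when no client id is configured or no suffix is given;
    // otherwise returns a copy whose client id has the suffix appended.
    static Map<String, Object> withSuffix(Map<String, Object> configs, String suffix) {
        if (!configs.containsKey(CLIENT_ID) || suffix == null) {
            return configs;
        }
        Map<String, Object> copy = new HashMap<>(configs);
        copy.put(CLIENT_ID, copy.get(CLIENT_ID) + suffix);
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put(CLIENT_ID, "orders");
        // The shared base configuration is left untouched; only the copy changes.
        System.out.println(withSuffix(base, "-0").get(CLIENT_ID));
        System.out.println(base.get(CLIENT_ID));
    }
}
```

Copying the map before modifying it matters because the same factory configuration is reused for every consumer, so each one can receive a distinct client id without affecting the others.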

ConcurrentKafkaListenerContainerFactory

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/ConcurrentKafkaListenerContainerFactory.java

The main consumer-side wrapping starts from ConcurrentKafkaListenerContainerFactory. A KafkaConsumer is not thread-safe and cannot be used concurrently, so Spring adds another layer on top: the factory produces a ConcurrentMessageListenerContainer, which creates multiple concurrent KafkaMessageListenerContainer instances according to the configured spring.kafka.listener.concurrency.

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {

    private final ConsumerFactory<K, V> consumerFactory;

    private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();

    @Override
    protected void doStart() {
        if (!isRunning()) {
            ContainerProperties containerProperties = getContainerProperties();
            TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
            if (topicPartitions != null
                    && this.concurrency > topicPartitions.length) {
                this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
                        + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                        + topicPartitions.length);
                this.concurrency = topicPartitions.length;
            }
            setRunning(true);

            for (int i = 0; i < this.concurrency; i++) {
                KafkaMessageListenerContainer<K, V> container;
                if (topicPartitions == null) {
                    container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties);
                }
                else {
                    container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties,
                            partitionSubset(containerProperties, i));
                }
                if (getBeanName() != null) {
                    container.setBeanName(getBeanName() + "-" + i);
                }
                if (getApplicationEventPublisher() != null) {
                    container.setApplicationEventPublisher(getApplicationEventPublisher());
                }
                container.setClientIdSuffix("-" + i);
                container.start();
                this.containers.add(container);
            }
        }
    }
    //......
}
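To make the concurrency cap in doStart concrete, here is a small stand-alone sketch. It assumes a simple round-robin split of explicitly assigned partitions across child containers. The partitionSubset implementation is not shown in the listing above, so the split below is purely an illustrative assumption and not Spring's actual algorithm; PartitionSplitSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model: caps concurrency at the partition count, then deals
// partitions out round-robin. Not the real partitionSubset implementation.
final class PartitionSplitSketch {

    // Mirrors the check in doStart: never run more children than partitions.
    static int effectiveConcurrency(int requested, int partitionCount) {
        return Math.min(requested, partitionCount);
    }

    // Assigns each partition to exactly one child container (assumed strategy).
    static List<List<Integer>> split(List<Integer> partitions, int requested) {
        int workers = effectiveConcurrency(requested, partitions.size());
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            result.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions.size(); p++) {
            result.get(p % workers).add(partitions.get(p));
        }
        return result;
    }

    public static void main(String[] args) {
        // Five partitions, concurrency 2: two children share the partitions.
        System.out.println(split(Arrays.asList(0, 1, 2, 3, 4), 2));
        // Three partitions, concurrency 5: reduced to three children.
        System.out.println(split(Arrays.asList(0, 1, 2), 5).size());
    }
}
```

Whatever the real split looks like, the property that matters is that each partition is owned by exactly one child container, so no two consumers in the same application read the same explicitly assigned partition.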

KafkaMessageListenerContainer

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {

    private final ConsumerFactory<K, V> consumerFactory;

    private final TopicPartitionInitialOffset[] topicPartitions;

    private ListenerConsumer listenerConsumer;

    private ListenableFuture<?> listenerConsumerFuture;

    private GenericMessageListener<?> listener;

    private GenericAcknowledgingMessageListener<?> acknowledgingMessageListener;

    private String clientIdSuffix;

    @Override
    protected void doStart() {
        if (isRunning()) {
            return;
        }
        ContainerProperties containerProperties = getContainerProperties();

        if (!this.consumerFactory.isAutoCommit()) {
            AckMode ackMode = containerProperties.getAckMode();
            if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
                Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
            }
            if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
                    && containerProperties.getAckTime() == 0) {
                containerProperties.setAckTime(5000);
            }
        }

        Object messageListener = containerProperties.getMessageListener();
        Assert.state(messageListener != null, "A MessageListener is required");
        if (messageListener instanceof GenericAcknowledgingMessageListener) {
            this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener;
        }
        else if (messageListener instanceof GenericMessageListener) {
            this.listener = (GenericMessageListener<?>) messageListener;
        }
        else {
            throw new IllegalStateException("messageListener must be 'MessageListener' "
                    + "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
        }
        if (containerProperties.getConsumerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-C-");
            containerProperties.setConsumerTaskExecutor(consumerExecutor);
        }
        if (containerProperties.getListenerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-L-");
            containerProperties.setListenerTaskExecutor(listenerExecutor);
        }
        this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
        setRunning(true);
        this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor()
                .submitListenable(this.listenerConsumer);
    }

    @Override
    protected void doStop(final Runnable callback) {
        if (isRunning()) {
            this.listenerConsumerFuture.addCallback(new ListenableFutureCallback<Object>() {

                @Override
                public void onFailure(Throwable e) {
                    KafkaMessageListenerContainer.this.logger.error("Error while stopping the container: ", e);
                    if (callback != null) {
                        callback.run();
                    }
                }

                @Override
                public void onSuccess(Object result) {
                    if (KafkaMessageListenerContainer.this.logger.isDebugEnabled()) {
                        KafkaMessageListenerContainer.this.logger
                                .debug(KafkaMessageListenerContainer.this + " stopped normally");
                    }
                    if (callback != null) {
                        callback.run();
                    }
                }
            });
            setRunning(false);
            this.listenerConsumer.consumer.wakeup();
        }
    }
    //......
}

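Each KafkaMessageListenerContainer creates its own ListenerConsumer, which in turn creates its own independent Kafka consumer. Each ListenerConsumer is submitted to the container's consumer task executor (a SimpleAsyncTaskExecutor unless one is configured) and runs on its own thread, which is how concurrency is achieved.

Each ListenerConsumer holds a recordsToProcess queue. Records polled from the underlying Kafka consumer are put on this queue, and a ListenerInvoker thread loops, waiting with a timeout to take records from recordsToProcess and hand them to the application method annotated with KafkaListener.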

private final class ListenerInvoker implements SchedulingAwareRunnable {

    private final CountDownLatch exitLatch = new CountDownLatch(1);

    private volatile boolean active = true;

    private volatile Thread executingThread;

    ListenerInvoker() {
        super();
    }

    @Override
    public void run() {
        Assert.isTrue(this.active, "This instance is not active anymore");
        if (ListenerConsumer.this.theListener instanceof ConsumerSeekAware) {
            ((ConsumerSeekAware) ListenerConsumer.this.theListener).registerSeekCallback(ListenerConsumer.this);
        }
        try {
            this.executingThread = Thread.currentThread();
            while (this.active) {
                try {
                    ConsumerRecords<K, V> records = ListenerConsumer.this.recordsToProcess.poll(1,
                            TimeUnit.SECONDS);
                    if (this.active) {
                        if (records != null) {
                            invokeListener(records);
                        }
                        else {
                            if (ListenerConsumer.this.logger.isTraceEnabled()) {
                                ListenerConsumer.this.logger.trace("No records to process");
                            }
                        }
                    }
                }
                catch (InterruptedException e) {
                    if (!this.active) {
                        Thread.currentThread().interrupt();
                    }
                    else {
                        ListenerConsumer.this.logger.debug("Interrupt ignored");
                    }
                }
            }
        }
        finally {
            this.active = false;
            this.exitLatch.countDown();
        }
    }

    @Override
    public boolean isLongLived() {
        return true;
    }

    private void stop() {
        if (ListenerConsumer.this.logger.isDebugEnabled()) {
            ListenerConsumer.this.logger.debug("Stopping invoker");
        }
        this.active = false;
        try {
            if (!this.exitLatch.await(getContainerProperties().getShutdownTimeout(), TimeUnit.MILLISECONDS)
                    && this.executingThread != null) {
                if (ListenerConsumer.this.logger.isDebugEnabled()) {
                    ListenerConsumer.this.logger.debug("Interrupting invoker");
                }
                this.executingThread.interrupt();
            }
        }
        catch (InterruptedException e) {
            if (this.executingThread != null) {
                this.executingThread.interrupt();
            }
            Thread.currentThread().interrupt();
        }
        if (ListenerConsumer.this.logger.isDebugEnabled()) {
            ListenerConsumer.this.logger.debug("Invoker stopped");
        }
    }
}

Here invokeListener is what calls the listener's onMessage method.
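The hand-off between the polling side and the invoker thread is a classic producer/consumer queue. The sketch below models it with plain JDK classes. QueueHandoffSketch, offerBatch, and invokerLoop are hypothetical names, strings stand in for ConsumerRecords, and, unlike the ListenerInvoker above, this loop drains whatever is still queued before exiting so that the result is deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the recordsToProcess hand-off; not spring-kafka code.
final class QueueHandoffSketch {

    private final BlockingQueue<List<String>> recordsToProcess = new LinkedBlockingQueue<>();
    private final List<String> handled = Collections.synchronizedList(new ArrayList<>());
    private volatile boolean active = true;

    // Called by the "poll" side: enqueue one batch of records.
    void offerBatch(List<String> batch) {
        recordsToProcess.add(batch);
    }

    // Runs on the invoker thread: wait with a timeout, then pass records to the listener.
    void invokerLoop() {
        while (active || !recordsToProcess.isEmpty()) {
            try {
                List<String> batch = recordsToProcess.poll(100, TimeUnit.MILLISECONDS);
                if (batch != null) {
                    handled.addAll(batch); // stands in for invokeListener(records)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Feeds all batches through the queue and returns what the invoker handled.
    static List<String> run(List<List<String>> batches) {
        QueueHandoffSketch sketch = new QueueHandoffSketch();
        Thread invoker = new Thread(sketch::invokerLoop, "invoker");
        invoker.start();
        for (List<String> batch : batches) {
            sketch.offerBatch(batch);
        }
        sketch.active = false; // signal shutdown after everything is queued
        try {
            invoker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for invoker", e);
        }
        return new ArrayList<>(sketch.handled);
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"))));
    }
}
```

The timed poll is what lets the loop notice the shutdown flag regularly instead of blocking forever on an empty queue.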

The KafkaListener annotation

Next, let's look at how a method annotated with KafkaListener ends up wrapped as the listener that the ListenerInvoker class calls.

KafkaListenerAnnotationBeanPostProcessor

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

This class scans beans for the KafkaListener annotation and registers the information it finds with KafkaListenerEndpointRegistrar.

@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
        final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
        final List<Method> multiMethods = new ArrayList<Method>();
        Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                new MethodIntrospector.MetadataLookup<Set<KafkaListener>>() {

                    @Override
                    public Set<KafkaListener> inspect(Method method) {
                        Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    }

                });
        if (hasClassLevelListeners) {
            Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                    new ReflectionUtils.MethodFilter() {

                        @Override
                        public boolean matches(Method method) {
                            return AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null;
                        }

                    });
            multiMethods.addAll(methodsWithHandler);
        }
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(bean.getClass());
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass());
            }
        }
        else {
            // Non-empty set of methods
            for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                Method method = entry.getKey();
                for (KafkaListener listener : entry.getValue()) {
                    processKafkaListener(listener, method, bean, beanName);
                }
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                        + beanName + "': " + annotatedMethods);
            }
        }
        if (hasClassLevelListeners) {
            processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
        }
    }
    return bean;
}

protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
    Method methodToUse = checkProxy(method, bean);
    MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<K, V>();
    endpoint.setMethod(methodToUse);
    endpoint.setBeanFactory(this.beanFactory);
    processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
}

protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean,
        Object adminTarget, String beanName) {
    endpoint.setBean(bean);
    endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
    endpoint.setId(getEndpointId(kafkaListener));
    endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
    endpoint.setTopics(resolveTopics(kafkaListener));
    endpoint.setTopicPattern(resolvePattern(kafkaListener));
    String group = kafkaListener.containerGroup();
    if (StringUtils.hasText(group)) {
        Object resolvedGroup = resolveExpression(group);
        if (resolvedGroup instanceof String) {
            endpoint.setGroup((String) resolvedGroup);
        }
    }

    KafkaListenerContainerFactory<?> factory = null;
    String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
    if (StringUtils.hasText(containerFactoryBeanName)) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
        try {
            factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
        }
        catch (NoSuchBeanDefinitionException ex) {
            throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
                    + "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
                    + " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
        }
    }

    this.registrar.registerEndpoint(endpoint, factory);
}
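The scanning step boils down to reflection over a bean's methods. As a rough sketch under simplifying assumptions, the example below uses a made-up @DemoListener annotation in place of KafkaListener and plain JDK reflection in place of Spring's MethodIntrospector. All names here (AnnotationScanSketch, DemoListener, OrderHandlers, scan) are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of annotation scanning; not spring-kafka code.
final class AnnotationScanSketch {

    // Stand-in for @KafkaListener; RUNTIME retention makes it visible to reflection.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoListener {
        String topic();
    }

    // Example bean with two annotated methods and one plain helper.
    public static final class OrderHandlers {
        @DemoListener(topic = "orders")
        public void onOrder(String payload) { }

        @DemoListener(topic = "refunds")
        public void onRefund(String payload) { }

        public void helper() { }
    }

    // Collects topic -> method for every annotated method on the bean's class.
    static Map<String, Method> scan(Object bean) {
        Map<String, Method> endpoints = new TreeMap<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            DemoListener annotation = method.getAnnotation(DemoListener.class);
            if (annotation != null) {
                endpoints.put(annotation.topic(), method);
            }
        }
        return endpoints;
    }

    public static void main(String[] args) {
        scan(new OrderHandlers()).forEach(
                (topic, method) -> System.out.println(topic + " -> " + method.getName()));
    }
}
```

In the real post-processor each match becomes a MethodKafkaListenerEndpoint that is handed to the registrar. Here the map entry plays that role.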

KafkaListenerEndpointRegistrar

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java

/**
 * Register a new {@link KafkaListenerEndpoint} alongside the
 * {@link KafkaListenerContainerFactory} to use to create the underlying container.
 * <p>The {@code factory} may be {@code null} if the default factory has to be
 * used for that endpoint.
 * @param endpoint the {@link KafkaListenerEndpoint} instance to register.
 * @param factory the {@link KafkaListenerContainerFactory} to use.
 */
public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
    Assert.notNull(endpoint, "Endpoint must be set");
    Assert.hasText(endpoint.getId(), "Endpoint id must be set");
    // Factory may be null, we defer the resolution right before actually creating the container
    KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
    synchronized (this.endpointDescriptors) {
        if (this.startImmediately) { // Register and start immediately
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                    resolveContainerFactory(descriptor), true);
        }
        else {
            this.endpointDescriptors.add(descriptor);
        }
    }
}

Here the KafkaListenerEndpoint is wrapped in a KafkaListenerEndpointDescriptor and added to endpointDescriptors, a collection of KafkaListenerEndpointDescriptor. If startImmediately is already set, the endpoint is registered with the registry and started right away instead.

KafkaListenerEndpointRegistrar

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java

public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {

    @Override
    public void afterPropertiesSet() {
        registerAllEndpoints();
    }

    protected void registerAllEndpoints() {
        synchronized (this.endpointDescriptors) {
            for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
                this.endpointRegistry.registerListenerContainer(
                        descriptor.endpoint, resolveContainerFactory(descriptor));
            }
            this.startImmediately = true;  // trigger immediate startup
        }
    }
}

This class implements the afterPropertiesSet method of the InitializingBean interface (run when the bean is initialized). There it iterates over endpointDescriptors and calls registerListenerContainer for each one.
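The registrar's two-phase behaviour (queue endpoints first, register everything at initialization, then register later arrivals immediately) can be sketched in a few lines. This is a simplified model only: DeferredRegistrationSketch is a hypothetical class, endpoint ids are plain strings, and a list stands in for the KafkaListenerEndpointRegistry.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of deferred endpoint registration; not spring-kafka code.
final class DeferredRegistrationSketch {

    private final List<String> pending = new ArrayList<>();    // like endpointDescriptors
    private final List<String> registered = new ArrayList<>(); // stands in for the registry
    private boolean startImmediately;

    // Before initialization endpoints are only queued; afterwards they register at once.
    synchronized void registerEndpoint(String id) {
        if (startImmediately) {
            registered.add(id);
        } else {
            pending.add(id);
        }
    }

    // Initialization hook: flush everything queued so far, then switch modes.
    synchronized void afterPropertiesSet() {
        registered.addAll(pending);
        startImmediately = true;
    }

    synchronized List<String> registered() {
        return new ArrayList<>(registered);
    }

    public static void main(String[] args) {
        DeferredRegistrationSketch registrar = new DeferredRegistrationSketch();
        registrar.registerEndpoint("a");
        registrar.registerEndpoint("b");
        System.out.println(registrar.registered()); // nothing registered yet
        registrar.afterPropertiesSet();
        registrar.registerEndpoint("c");            // late endpoint registers immediately
        System.out.println(registrar.registered());
    }
}
```

Deferring the work this way lets the container factory be resolved only once the application context is ready to provide it.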

KafkaListenerEndpointRegistry

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware,
        ApplicationListener<ContextRefreshedEvent> {

    protected final Log logger = LogFactory.getLog(getClass()); //NOSONAR

    private final Map<String, MessageListenerContainer> listenerContainers =
            new ConcurrentHashMap<String, MessageListenerContainer>();

    //......

    public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
            boolean startImmediately) {
        Assert.notNull(endpoint, "Endpoint must not be null");
        Assert.notNull(factory, "Factory must not be null");

        String id = endpoint.getId();
        Assert.hasText(id, "Endpoint id must not be empty");
        synchronized (this.listenerContainers) {
            Assert.state(!this.listenerContainers.containsKey(id),
                    "Another endpoint is already registered with id '" + id + "'");
            MessageListenerContainer container = createListenerContainer(endpoint, factory);
            this.listenerContainers.put(id, container);
            if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
                List<MessageListenerContainer> containerGroup;
                if (this.applicationContext.containsBean(endpoint.getGroup())) {
                    containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
                }
                else {
                    containerGroup = new ArrayList<MessageListenerContainer>();
                    this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
                }
                containerGroup.add(container);
            }
            if (startImmediately) {
                startIfNecessary(container);
            }
        }
    }

    protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
            KafkaListenerContainerFactory<?> factory) {

        MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);

        if (listenerContainer instanceof InitializingBean) {
            try {
                ((InitializingBean) listenerContainer).afterPropertiesSet();
            }
            catch (Exception ex) {
                throw new BeanInitializationException("Failed to initialize message listener container", ex);
            }
        }

        int containerPhase = listenerContainer.getPhase();
        if (containerPhase < Integer.MAX_VALUE) {  // a custom phase value
            if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
                throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
                        this.phase + " vs " + containerPhase);
            }
            this.phase = listenerContainer.getPhase();
        }

        return listenerContainer;
    }

    @Override
    public void start() {
        for (MessageListenerContainer listenerContainer : getListenerContainers()) {
            startIfNecessary(listenerContainer);
        }
    }

    @Override
    public void stop() {
        for (MessageListenerContainer listenerContainer : getListenerContainers()) {
            listenerContainer.stop();
        }
    }
}

During registration the endpoint is converted into a MessageListenerContainer, which is then put into the listenerContainers map keyed by the endpoint id.

AbstractKafkaListenerContainerFactory

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

public C createListenerContainer(KafkaListenerEndpoint endpoint) {
    C instance = createContainerInstance(endpoint);

    if (this.autoStartup != null) {
        instance.setAutoStartup(this.autoStartup);
    }
    if (this.phase != null) {
        instance.setPhase(this.phase);
    }
    if (this.applicationEventPublisher != null) {
        instance.setApplicationEventPublisher(this.applicationEventPublisher);
    }
    if (endpoint.getId() != null) {
        instance.setBeanName(endpoint.getId());
    }

    if (endpoint instanceof AbstractKafkaListenerEndpoint) {
        AbstractKafkaListenerEndpoint<K, V> aklEndpoint = (AbstractKafkaListenerEndpoint<K, V>) endpoint;
        if (this.recordFilterStrategy != null) {
            aklEndpoint.setRecordFilterStrategy(this.recordFilterStrategy);
        }
        if (this.ackDiscarded != null) {
            aklEndpoint.setAckDiscarded(this.ackDiscarded);
        }
        if (this.retryTemplate != null) {
            aklEndpoint.setRetryTemplate(this.retryTemplate);
        }
        if (this.recoveryCallback != null) {
            aklEndpoint.setRecoveryCallback(this.recoveryCallback);
        }
        if (this.batchListener != null) {
            aklEndpoint.setBatchListener(this.batchListener);
        }
    }

    endpoint.setupListenerContainer(instance, this.messageConverter);
    initializeContainer(instance);

    return instance;
}

The part to focus on is the setupMessageListener method, which is reached through the endpoint.setupListenerContainer call above.

spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

private void setupMessageListener(MessageListenerContainer container, MessageConverter messageConverter) {
    Object messageListener = createMessageListener(container, messageConverter);
    Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener");
    if (this.retryTemplate != null) {
        if (messageListener instanceof AcknowledgingMessageListener) {
            messageListener = new RetryingAcknowledgingMessageListenerAdapter<>(
                    (AcknowledgingMessageListener<K, V>) messageListener, this.retryTemplate,
                    this.recoveryCallback);
        }
        else {
            messageListener = new RetryingMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
                    this.retryTemplate, (RecoveryCallback<Object>) this.recoveryCallback);
        }
    }
    if (this.recordFilterStrategy != null) {
        if (messageListener instanceof AcknowledgingMessageListener) {
            messageListener = new FilteringAcknowledgingMessageListenerAdapter<>(
                    (AcknowledgingMessageListener<K, V>) messageListener, this.recordFilterStrategy,
                    this.ackDiscarded);
        }
        else if (messageListener instanceof MessageListener) {
            messageListener = new FilteringMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
                    this.recordFilterStrategy);
        }
        else if (messageListener instanceof BatchAcknowledgingMessageListener) {
            messageListener = new FilteringBatchAcknowledgingMessageListenerAdapter<>(
                    (BatchAcknowledgingMessageListener<K, V>) messageListener, this.recordFilterStrategy,
                    this.ackDiscarded);
        }
        else if (messageListener instanceof BatchMessageListener) {
            messageListener = new FilteringBatchMessageListenerAdapter<>(
                    (BatchMessageListener<K, V>) messageListener, this.recordFilterStrategy);
        }
    }
    container.setupMessageListener(messageListener);
}

This messageListener carries the InvocableHandlerMethod built from the bean and method held by the original endpoint, and it is injected into the MessageListenerContainer (a ConcurrentMessageListenerContainer). This is where the flow connects back to the ConcurrentMessageListenerContainer described earlier, which creates multiple concurrent KafkaMessageListenerContainer instances according to the configured spring.kafka.listener.concurrency.
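The layering in setupMessageListener is a decorator chain: the base listener is wrapped by a retrying adapter first, and the result is wrapped by a filtering adapter. The sketch below reproduces that ordering with a tiny hypothetical Listener interface and simple loops. It does not use RetryTemplate or RecordFilterStrategy, and every name in it is invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of the retry/filter wrapping order; not spring-kafka code.
final class ListenerDecoratorSketch {

    interface Listener {
        void onMessage(String record);
    }

    // Inner wrapper: re-invokes the delegate until it succeeds or attempts run out.
    static Listener retrying(Listener delegate, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        return record -> {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    delegate.onMessage(record);
                    return;
                } catch (RuntimeException e) {
                    last = e;
                }
            }
            throw last;
        };
    }

    // Outer wrapper: records matching the predicate are discarded before any retry logic.
    static Listener filtering(Listener delegate, Predicate<String> discard) {
        return record -> {
            if (!discard.test(record)) {
                delegate.onMessage(record);
            }
        };
    }

    // Wraps a listener that fails on every other call and returns what it finally handled.
    static List<String> demo(List<String> records) {
        List<String> handled = new ArrayList<>();
        int[] calls = {0};
        Listener flaky = record -> {
            if (calls[0]++ % 2 == 0) {
                throw new IllegalStateException("transient failure");
            }
            handled.add(record);
        };
        Listener wrapped = filtering(retrying(flaky, 3), record -> record.startsWith("skip"));
        for (String record : records) {
            wrapped.onMessage(record);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(demo(Arrays.asList("a", "skip-b", "c")));
    }
}
```

Because the filter sits outermost, a discarded record never reaches the retry layer, which matches the order in which the adapters are applied in the listing above.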

MethodKafkaListenerEndpoint

The createMessageListener method takes the bean and method originally annotated with KafkaListener, both held by the endpoint, wraps them as an InvocableHandlerMethod, and injects that into a MessagingMessageListenerAdapter.

public class MethodKafkaListenerEndpoint<K, V> extends AbstractKafkaListenerEndpoint<K, V> {

    private Object bean;

    private Method method;

    private MessageHandlerMethodFactory messageHandlerMethodFactory;

    //......

    @Override
    protected MessagingMessageListenerAdapter<K, V> createMessageListener(MessageListenerContainer container,
            MessageConverter messageConverter) {
        Assert.state(this.messageHandlerMethodFactory != null,
                "Could not create message listener - MessageHandlerMethodFactory not set");
        MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance(messageConverter);
        messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
        return messageListener;
    }

    /**
     * Create a {@link HandlerAdapter} for this listener adapter.
     * @param messageListener the listener adapter.
     * @return the handler adapter.
     */
    protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter<K, V> messageListener) {
        InvocableHandlerMethod invocableHandlerMethod =
                this.messageHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod());
        return new HandlerAdapter(invocableHandlerMethod);
    }
}

In short, this class wraps the original bean and method as an InvocableHandlerMethod and injects it into the MessagingMessageListenerAdapter.
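Stripped of argument resolution and message conversion, the adapter idea is "turn a (bean, Method) pair into an object with an onMessage method". A minimal sketch with plain reflection follows. HandlerAdapterSketch, Listener, Greeter, and adapt are hypothetical names, and the real InvocableHandlerMethod does considerably more than Method.invoke.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of adapting a bean method to a listener; not spring-kafka code.
final class HandlerAdapterSketch {

    interface Listener {
        void onMessage(String record);
    }

    // Example bean whose handle method plays the role of the annotated listener method.
    public static final class Greeter {
        final List<String> seen = new ArrayList<>();

        public void handle(String payload) {
            seen.add("got:" + payload);
        }
    }

    // Wraps the bean and method so that onMessage reflectively invokes the method.
    static Listener adapt(Object bean, Method method) {
        return record -> {
            try {
                method.invoke(bean, record);
            } catch (IllegalAccessException | InvocationTargetException e) {
                throw new IllegalStateException("Listener method failed", e);
            }
        };
    }

    // Builds the adapter for Greeter.handle, delivers one record, returns what the bean saw.
    static List<String> demo() {
        Greeter bean = new Greeter();
        Method method;
        try {
            method = Greeter.class.getMethod("handle", String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
        Listener listener = adapt(bean, method);
        listener.onMessage("hello");
        return bean.seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The container only ever sees the Listener interface, so it needs no knowledge of the user's bean type or method signature.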

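Summary

  • On the producer side, wrapping KafkaProducer in KafkaTemplate is relatively simple.
  • On the consumer side, Spring uses annotations to mark message-handling methods, so more machinery is involved:
    • First, KafkaListenerAnnotationBeanPostProcessor scans beans and registers the endpoints it finds with KafkaListenerEndpointRegistrar.
    • KafkaListenerEndpointRegistrar then creates the MessageListenerContainer instances in afterPropertiesSet.
    • The messageListener carries the InvocableHandlerMethod built from the bean and method held by the original endpoint.
    • This connects to ConcurrentMessageListenerContainer, which creates multiple concurrent KafkaMessageListenerContainer instances according to the configured spring.kafka.listener.concurrency.
    • Each KafkaMessageListenerContainer creates its own ListenerConsumer, which creates its own independent Kafka consumer. Each ListenerConsumer runs on a thread from the consumer task executor, and this is how concurrency is achieved.
    • Each ListenerConsumer holds a recordsToProcess queue, and records polled from the underlying Kafka consumer are put on it.
    • A ListenerInvoker thread then loops, waiting with a timeout to take records from recordsToProcess, and calls the messageListener's onMessage method (that is, the method annotated with KafkaListener).

ListenerConsumer is the key piece. It also handles offset commits, which I will cover in detail another time.

Reposted from: https://my.oschina.net/go4it/blog/1547679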
