A look at how Spring for Kafka wraps and integrates the consumer
Preface
This article walks through how Spring for Kafka wraps and integrates the consumer from the native Kafka client.
Consumer factory
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java
protected KafkaConsumer<K, V> createKafkaConsumer(String clientIdSuffix) {
    if (!this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG) || clientIdSuffix == null) {
        return createKafkaConsumer();
    }
    else {
        Map<String, Object> modifiedClientIdConfigs = new HashMap<>(this.configs);
        modifiedClientIdConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG,
                modifiedClientIdConfigs.get(ConsumerConfig.CLIENT_ID_CONFIG) + clientIdSuffix);
        return createKafkaConsumer(modifiedClientIdConfigs);
    }
}

protected KafkaConsumer<K, V> createKafkaConsumer(Map<String, Object> configs) {
    return new KafkaConsumer<K, V>(configs, this.keyDeserializer, this.valueDeserializer);
}
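The client id handling above can be tried out without a broker. The following is a minimal sketch, assuming plain maps in place of a real KafkaConsumer; the class name ClientIdSuffixSketch and the method effectiveConfigs are made up for illustration and are not part of spring-kafka.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the factory logic; it only computes the config map.
class ClientIdSuffixSketch {

    // The literal key behind ConsumerConfig.CLIENT_ID_CONFIG.
    static final String CLIENT_ID = "client.id";

    // Returns the configuration a consumer would be built from for the given suffix.
    static Map<String, Object> effectiveConfigs(Map<String, Object> configs, String clientIdSuffix) {
        if (!configs.containsKey(CLIENT_ID) || clientIdSuffix == null) {
            return configs; // nothing to rewrite, reuse the shared configuration
        }
        Map<String, Object> modified = new HashMap<>(configs); // copy so the shared map stays untouched
        modified.put(CLIENT_ID, modified.get(CLIENT_ID) + clientIdSuffix);
        return modified;
    }

    public static void main(String[] args) {
        Map<String, Object> base = new HashMap<>();
        base.put(CLIENT_ID, "orders");
        for (int i = 0; i < 3; i++) {
            System.out.println(effectiveConfigs(base, "-" + i).get(CLIENT_ID));
        }
    }
}
```

The copy matters: every child container gets its own client.id, while the factory's shared configuration is never modified.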
ConcurrentKafkaListenerContainerFactory
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/ConcurrentKafkaListenerContainerFactory.java
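The main consumer-side wrapping lives in ConcurrentKafkaListenerContainerFactory. A native KafkaConsumer is not thread-safe and cannot be used from several threads at once, so Spring adds another layer on top: based on the configured spring.kafka.listener.concurrency, the ConcurrentMessageListenerContainer created by this factory starts several KafkaMessageListenerContainer instances that run concurrently.
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java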
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {

    private final ConsumerFactory<K, V> consumerFactory;

    private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();

    @Override
    protected void doStart() {
        if (!isRunning()) {
            ContainerProperties containerProperties = getContainerProperties();
            TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
            if (topicPartitions != null
                    && this.concurrency > topicPartitions.length) {
                this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
                        + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                        + topicPartitions.length);
                this.concurrency = topicPartitions.length;
            }
            setRunning(true);
            for (int i = 0; i < this.concurrency; i++) {
                KafkaMessageListenerContainer<K, V> container;
                if (topicPartitions == null) {
                    container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties);
                }
                else {
                    container = new KafkaMessageListenerContainer<>(this.consumerFactory, containerProperties,
                            partitionSubset(containerProperties, i));
                }
                if (getBeanName() != null) {
                    container.setBeanName(getBeanName() + "-" + i);
                }
                if (getApplicationEventPublisher() != null) {
                    container.setApplicationEventPublisher(getApplicationEventPublisher());
                }
                container.setClientIdSuffix("-" + i);
                container.start();
                this.containers.add(container);
            }
        }
    }

    //......
}
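To make the fan-out in doStart() concrete, here is a small sketch under stated assumptions: strings stand in for the child containers, and the names ConcurrencyFanOutSketch, effectiveConcurrency and childNames are hypothetical. It only models the concurrency clamp and the "-i" naming seen above, not partitionSubset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the fan-out in doStart(); strings stand in for child containers.
class ConcurrencyFanOutSketch {

    // With explicitly assigned partitions, concurrency is reduced to the partition count.
    // partitionCount == null models "no explicit partitions were configured".
    static int effectiveConcurrency(int concurrency, Integer partitionCount) {
        if (partitionCount != null && concurrency > partitionCount) {
            return partitionCount;
        }
        return concurrency;
    }

    // One child per slot, named "<beanName>-<i>", mirroring setBeanName(getBeanName() + "-" + i).
    static List<String> childNames(String beanName, int concurrency, Integer partitionCount) {
        List<String> names = new ArrayList<>();
        int n = effectiveConcurrency(concurrency, partitionCount);
        for (int i = 0; i < n; i++) {
            names.add(beanName + "-" + i);
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(childNames("orders", 5, 3));    // clamped to three children
        System.out.println(childNames("orders", 2, null)); // no explicit partitions, two children
    }
}
```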
KafkaMessageListenerContainer
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
public class KafkaMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {

    private final ConsumerFactory<K, V> consumerFactory;
    private final TopicPartitionInitialOffset[] topicPartitions;
    private ListenerConsumer listenerConsumer;
    private ListenableFuture<?> listenerConsumerFuture;
    private GenericMessageListener<?> listener;
    private GenericAcknowledgingMessageListener<?> acknowledgingMessageListener;
    private String clientIdSuffix;

    @Override
    protected void doStart() {
        if (isRunning()) {
            return;
        }
        ContainerProperties containerProperties = getContainerProperties();
        if (!this.consumerFactory.isAutoCommit()) {
            AckMode ackMode = containerProperties.getAckMode();
            if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
                Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
            }
            if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
                    && containerProperties.getAckTime() == 0) {
                containerProperties.setAckTime(5000);
            }
        }
        Object messageListener = containerProperties.getMessageListener();
        Assert.state(messageListener != null, "A MessageListener is required");
        if (messageListener instanceof GenericAcknowledgingMessageListener) {
            this.acknowledgingMessageListener = (GenericAcknowledgingMessageListener<?>) messageListener;
        }
        else if (messageListener instanceof GenericMessageListener) {
            this.listener = (GenericMessageListener<?>) messageListener;
        }
        else {
            throw new IllegalStateException("messageListener must be 'MessageListener' "
                    + "or 'AcknowledgingMessageListener', not " + messageListener.getClass().getName());
        }
        if (containerProperties.getConsumerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-C-");
            containerProperties.setConsumerTaskExecutor(consumerExecutor);
        }
        if (containerProperties.getListenerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor listenerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-L-");
            containerProperties.setListenerTaskExecutor(listenerExecutor);
        }
        this.listenerConsumer = new ListenerConsumer(this.listener, this.acknowledgingMessageListener);
        setRunning(true);
        this.listenerConsumerFuture = containerProperties
                .getConsumerTaskExecutor()
                .submitListenable(this.listenerConsumer);
    }

    @Override
    protected void doStop(final Runnable callback) {
        if (isRunning()) {
            this.listenerConsumerFuture.addCallback(new ListenableFutureCallback<Object>() {

                @Override
                public void onFailure(Throwable e) {
                    KafkaMessageListenerContainer.this.logger.error("Error while stopping the container: ", e);
                    if (callback != null) {
                        callback.run();
                    }
                }

                @Override
                public void onSuccess(Object result) {
                    if (KafkaMessageListenerContainer.this.logger.isDebugEnabled()) {
                        KafkaMessageListenerContainer.this.logger
                                .debug(KafkaMessageListenerContainer.this + " stopped normally");
                    }
                    if (callback != null) {
                        callback.run();
                    }
                }
            });
            setRunning(false);
            this.listenerConsumer.consumer.wakeup();
        }
    }

    //......
}
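Each KafkaMessageListenerContainer creates its own ListenerConsumer, which in turn creates its own independent Kafka consumer. Each ListenerConsumer is submitted to the consumer task executor (a SimpleAsyncTaskExecutor by default) and runs on its own thread, which is how concurrency is achieved.
Each ListenerConsumer holds a recordsToProcess queue. Records polled from the native Kafka consumer are put on this queue, and a ListenerInvoker thread loops, waiting with a timeout to take records off recordsToProcess and hand them to the application method annotated with KafkaListener.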
private final class ListenerInvoker implements SchedulingAwareRunnable {

    private final CountDownLatch exitLatch = new CountDownLatch(1);
    private volatile boolean active = true;
    private volatile Thread executingThread;

    ListenerInvoker() {
        super();
    }

    @Override
    public void run() {
        Assert.isTrue(this.active, "This instance is not active anymore");
        if (ListenerConsumer.this.theListener instanceof ConsumerSeekAware) {
            ((ConsumerSeekAware) ListenerConsumer.this.theListener).registerSeekCallback(ListenerConsumer.this);
        }
        try {
            this.executingThread = Thread.currentThread();
            while (this.active) {
                try {
                    ConsumerRecords<K, V> records = ListenerConsumer.this.recordsToProcess.poll(1,
                            TimeUnit.SECONDS);
                    if (this.active) {
                        if (records != null) {
                            invokeListener(records);
                        }
                        else {
                            if (ListenerConsumer.this.logger.isTraceEnabled()) {
                                ListenerConsumer.this.logger.trace("No records to process");
                            }
                        }
                    }
                }
                catch (InterruptedException e) {
                    if (!this.active) {
                        Thread.currentThread().interrupt();
                    }
                    else {
                        ListenerConsumer.this.logger.debug("Interrupt ignored");
                    }
                }
            }
        }
        finally {
            this.active = false;
            this.exitLatch.countDown();
        }
    }

    @Override
    public boolean isLongLived() {
        return true;
    }

    private void stop() {
        if (ListenerConsumer.this.logger.isDebugEnabled()) {
            ListenerConsumer.this.logger.debug("Stopping invoker");
        }
        this.active = false;
        try {
            if (!this.exitLatch.await(getContainerProperties().getShutdownTimeout(), TimeUnit.MILLISECONDS)
                    && this.executingThread != null) {
                if (ListenerConsumer.this.logger.isDebugEnabled()) {
                    ListenerConsumer.this.logger.debug("Interrupting invoker");
                }
                this.executingThread.interrupt();
            }
        }
        catch (InterruptedException e) {
            if (this.executingThread != null) {
                this.executingThread.interrupt();
            }
            Thread.currentThread().interrupt();
        }
        if (ListenerConsumer.this.logger.isDebugEnabled()) {
            ListenerConsumer.this.logger.debug("Invoker stopped");
        }
    }
}
invokeListener here calls the listener's onMessage method.
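The hand-off between the polling thread and the invoker thread can be sketched with JDK classes alone. This is a simplified model, not the ListenerConsumer code: the class InvokerHandoffSketch and its run method are hypothetical, batches of strings stand in for ConsumerRecords, the poll timeout is shortened from 1 second to 100 ms so the demo finishes quickly, and (unlike the original loop) the invoker drains the queue before exiting so the result is deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the recordsToProcess hand-off; not spring-kafka code.
class InvokerHandoffSketch {

    // Feeds the batches through a queue to an invoker thread and returns what the listener saw.
    static List<String> run(List<List<String>> batches) {
        BlockingQueue<List<String>> recordsToProcess = new LinkedBlockingQueue<>();
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        AtomicBoolean active = new AtomicBoolean(true);

        Thread invoker = new Thread(() -> {
            // Sketch-only choice: keep going until stopped AND the queue is drained.
            while (active.get() || !recordsToProcess.isEmpty()) {
                try {
                    List<String> records = recordsToProcess.poll(100, TimeUnit.MILLISECONDS);
                    if (records != null) {
                        records.forEach(seen::add); // stands in for the listener's onMessage
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "invoker");
        invoker.start();

        try {
            for (List<String> batch : batches) {
                recordsToProcess.put(batch); // the "consumer thread" hands over a polled batch
            }
            active.set(false);
            invoker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"))));
    }
}
```

Because only the polling thread touches the consumer and only the invoker thread calls the listener, a slow listener does not block the thread that owns the non-thread-safe consumer.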
The KafkaListener annotation
Next, let's see how a method annotated with KafkaListener ends up wrapped as the listener that ListenerInvoker calls.
KafkaListenerAnnotationBeanPostProcessor
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
This class scans beans for KafkaListener annotations and registers what it finds with KafkaListenerEndpointRegistrar.
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
        final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
        final List<Method> multiMethods = new ArrayList<Method>();
        Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                new MethodIntrospector.MetadataLookup<Set<KafkaListener>>() {

                    @Override
                    public Set<KafkaListener> inspect(Method method) {
                        Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    }
                });
        if (hasClassLevelListeners) {
            Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                    new ReflectionUtils.MethodFilter() {

                        @Override
                        public boolean matches(Method method) {
                            return AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null;
                        }
                    });
            multiMethods.addAll(methodsWithHandler);
        }
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(bean.getClass());
            if (this.logger.isTraceEnabled()) {
                this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass());
            }
        }
        else {
            // Non-empty set of methods
            for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                Method method = entry.getKey();
                for (KafkaListener listener : entry.getValue()) {
                    processKafkaListener(listener, method, bean, beanName);
                }
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                        + beanName + "': " + annotatedMethods);
            }
        }
        if (hasClassLevelListeners) {
            processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
        }
    }
    return bean;
}

protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
    Method methodToUse = checkProxy(method, bean);
    MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<K, V>();
    endpoint.setMethod(methodToUse);
    endpoint.setBeanFactory(this.beanFactory);
    processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
}

protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener, Object bean,
        Object adminTarget, String beanName) {
    endpoint.setBean(bean);
    endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
    endpoint.setId(getEndpointId(kafkaListener));
    endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
    endpoint.setTopics(resolveTopics(kafkaListener));
    endpoint.setTopicPattern(resolvePattern(kafkaListener));
    String group = kafkaListener.containerGroup();
    if (StringUtils.hasText(group)) {
        Object resolvedGroup = resolveExpression(group);
        if (resolvedGroup instanceof String) {
            endpoint.setGroup((String) resolvedGroup);
        }
    }
    KafkaListenerContainerFactory<?> factory = null;
    String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
    if (StringUtils.hasText(containerFactoryBeanName)) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
        try {
            factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
        }
        catch (NoSuchBeanDefinitionException ex) {
            throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
                    + "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
                    + " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
        }
    }
    this.registrar.registerEndpoint(endpoint, factory);
}
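The scanning step boils down to reflection over annotated methods. Below is a minimal sketch using only the JDK; the annotation DemoListener, the bean class OrderHandlers and the scan method are all invented for illustration, and Spring's MethodIntrospector, proxy handling and class-level listeners are deliberately left out.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, JDK-only model of "scan a bean for listener annotations".
class ListenerScanSketch {

    // Made-up annotation standing in for @KafkaListener; RUNTIME retention makes it visible to reflection.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoListener {
        String topic();
    }

    // Made-up bean with two annotated handler methods and one plain method.
    static class OrderHandlers {
        @DemoListener(topic = "orders")
        public void onOrder(String payload) { }

        @DemoListener(topic = "refunds")
        public void onRefund(String payload) { }

        public void helper() { }
    }

    // Collects topic -> method name for every annotated method, like building one endpoint per method.
    static Map<String, String> scan(Class<?> type) {
        Map<String, String> endpoints = new TreeMap<>();
        for (Method method : type.getDeclaredMethods()) {
            DemoListener annotation = method.getAnnotation(DemoListener.class);
            if (annotation != null) {
                endpoints.put(annotation.topic(), method.getName());
            }
        }
        return endpoints;
    }

    public static void main(String[] args) {
        System.out.println(scan(OrderHandlers.class)); // {orders=onOrder, refunds=onRefund}
    }
}
```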
KafkaListenerEndpointRegistrar
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java
/**
 * Register a new {@link KafkaListenerEndpoint} alongside the
 * {@link KafkaListenerContainerFactory} to use to create the underlying container.
 * <p>The {@code factory} may be {@code null} if the default factory has to be
 * used for that endpoint.
 * @param endpoint the {@link KafkaListenerEndpoint} instance to register.
 * @param factory the {@link KafkaListenerContainerFactory} to use.
 */
public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
    Assert.notNull(endpoint, "Endpoint must be set");
    Assert.hasText(endpoint.getId(), "Endpoint id must be set");
    // Factory may be null, we defer the resolution right before actually creating the container
    KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
    synchronized (this.endpointDescriptors) {
        if (this.startImmediately) { // Register and start immediately
            this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                    resolveContainerFactory(descriptor), true);
        }
        else {
            this.endpointDescriptors.add(descriptor);
        }
    }
}
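Here the KafkaListenerEndpoint is wrapped in a KafkaListenerEndpointDescriptor and added to endpointDescriptors, the collection of pending descriptors. If startImmediately is already set, the container is instead registered with the registry and started right away.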
KafkaListenerEndpointRegistrar
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java
public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {

    @Override
    public void afterPropertiesSet() {
        registerAllEndpoints();
    }

    protected void registerAllEndpoints() {
        synchronized (this.endpointDescriptors) {
            for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
                this.endpointRegistry.registerListenerContainer(
                        descriptor.endpoint, resolveContainerFactory(descriptor));
            }
            this.startImmediately = true; // trigger immediate startup
        }
    }
}
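This class implements the afterPropertiesSet method of the InitializingBean interface (run when the bean is initialized). In it, the registrar walks endpointDescriptors and calls registerListenerContainer for each descriptor.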
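The "collect first, register later" behaviour of the registrar can be modelled in a few lines. This is a hedged sketch: DeferredRegistrarSketch is a hypothetical class, endpoint ids are plain strings, and clearing the pending list after flushing is a simplification of this sketch rather than something the excerpt above shows.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of deferred endpoint registration; strings stand in for endpoints.
class DeferredRegistrarSketch {

    private final List<String> pending = new ArrayList<>();    // plays the role of endpointDescriptors
    private final List<String> registered = new ArrayList<>(); // plays the role of the endpoint registry
    private boolean startImmediately;

    // Before initialization endpoints are only remembered; afterwards they are registered at once.
    synchronized void registerEndpoint(String id) {
        if (startImmediately) {
            registered.add(id);
        } else {
            pending.add(id);
        }
    }

    // Flushes everything collected so far, then switches to immediate registration.
    synchronized void afterPropertiesSet() {
        registered.addAll(pending);
        pending.clear(); // sketch-only simplification
        startImmediately = true;
    }

    synchronized List<String> registered() {
        return new ArrayList<>(registered);
    }

    public static void main(String[] args) {
        DeferredRegistrarSketch registrar = new DeferredRegistrarSketch();
        registrar.registerEndpoint("a");
        System.out.println(registrar.registered()); // still empty
        registrar.afterPropertiesSet();
        registrar.registerEndpoint("b");
        System.out.println(registrar.registered()); // both endpoints
    }
}
```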
KafkaListenerEndpointRegistry
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java
public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware,
        ApplicationListener<ContextRefreshedEvent> {

    protected final Log logger = LogFactory.getLog(getClass()); //NOSONAR

    private final Map<String, MessageListenerContainer> listenerContainers =
            new ConcurrentHashMap<String, MessageListenerContainer>();

    //......

    public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
            boolean startImmediately) {
        Assert.notNull(endpoint, "Endpoint must not be null");
        Assert.notNull(factory, "Factory must not be null");
        String id = endpoint.getId();
        Assert.hasText(id, "Endpoint id must not be empty");
        synchronized (this.listenerContainers) {
            Assert.state(!this.listenerContainers.containsKey(id),
                    "Another endpoint is already registered with id '" + id + "'");
            MessageListenerContainer container = createListenerContainer(endpoint, factory);
            this.listenerContainers.put(id, container);
            if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
                List<MessageListenerContainer> containerGroup;
                if (this.applicationContext.containsBean(endpoint.getGroup())) {
                    containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
                }
                else {
                    containerGroup = new ArrayList<MessageListenerContainer>();
                    this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
                }
                containerGroup.add(container);
            }
            if (startImmediately) {
                startIfNecessary(container);
            }
        }
    }

    protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
            KafkaListenerContainerFactory<?> factory) {
        MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
        if (listenerContainer instanceof InitializingBean) {
            try {
                ((InitializingBean) listenerContainer).afterPropertiesSet();
            }
            catch (Exception ex) {
                throw new BeanInitializationException("Failed to initialize message listener container", ex);
            }
        }
        int containerPhase = listenerContainer.getPhase();
        if (containerPhase < Integer.MAX_VALUE) { // a custom phase value
            if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
                throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
                        this.phase + " vs " + containerPhase);
            }
            this.phase = listenerContainer.getPhase();
        }
        return listenerContainer;
    }

    @Override
    public void start() {
        for (MessageListenerContainer listenerContainer : getListenerContainers()) {
            startIfNecessary(listenerContainer);
        }
    }

    @Override
    public void stop() {
        for (MessageListenerContainer listenerContainer : getListenerContainers()) {
            listenerContainer.stop();
        }
    }
}
On registration, the endpoint is turned into a MessageListenerContainer, which is then stored in the listenerContainers map under the endpoint id.
AbstractKafkaListenerContainerFactory
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
    C instance = createContainerInstance(endpoint);
    if (this.autoStartup != null) {
        instance.setAutoStartup(this.autoStartup);
    }
    if (this.phase != null) {
        instance.setPhase(this.phase);
    }
    if (this.applicationEventPublisher != null) {
        instance.setApplicationEventPublisher(this.applicationEventPublisher);
    }
    if (endpoint.getId() != null) {
        instance.setBeanName(endpoint.getId());
    }
    if (endpoint instanceof AbstractKafkaListenerEndpoint) {
        AbstractKafkaListenerEndpoint<K, V> aklEndpoint = (AbstractKafkaListenerEndpoint<K, V>) endpoint;
        if (this.recordFilterStrategy != null) {
            aklEndpoint.setRecordFilterStrategy(this.recordFilterStrategy);
        }
        if (this.ackDiscarded != null) {
            aklEndpoint.setAckDiscarded(this.ackDiscarded);
        }
        if (this.retryTemplate != null) {
            aklEndpoint.setRetryTemplate(this.retryTemplate);
        }
        if (this.recoveryCallback != null) {
            aklEndpoint.setRecoveryCallback(this.recoveryCallback);
        }
        if (this.batchListener != null) {
            aklEndpoint.setBatchListener(this.batchListener);
        }
    }
    endpoint.setupListenerContainer(instance, this.messageConverter);
    initializeContainer(instance);
    return instance;
}
The call to focus on here is endpoint.setupListenerContainer, which leads to the setupMessageListener method:
spring-kafka-1.2.3.RELEASE-sources.jar!/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java
private void setupMessageListener(MessageListenerContainer container, MessageConverter messageConverter) {
    Object messageListener = createMessageListener(container, messageConverter);
    Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener");
    if (this.retryTemplate != null) {
        if (messageListener instanceof AcknowledgingMessageListener) {
            messageListener = new RetryingAcknowledgingMessageListenerAdapter<>(
                    (AcknowledgingMessageListener<K, V>) messageListener, this.retryTemplate,
                    this.recoveryCallback);
        }
        else {
            messageListener = new RetryingMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
                    this.retryTemplate, (RecoveryCallback<Object>) this.recoveryCallback);
        }
    }
    if (this.recordFilterStrategy != null) {
        if (messageListener instanceof AcknowledgingMessageListener) {
            messageListener = new FilteringAcknowledgingMessageListenerAdapter<>(
                    (AcknowledgingMessageListener<K, V>) messageListener, this.recordFilterStrategy,
                    this.ackDiscarded);
        }
        else if (messageListener instanceof MessageListener) {
            messageListener = new FilteringMessageListenerAdapter<>((MessageListener<K, V>) messageListener,
                    this.recordFilterStrategy);
        }
        else if (messageListener instanceof BatchAcknowledgingMessageListener) {
            messageListener = new FilteringBatchAcknowledgingMessageListenerAdapter<>(
                    (BatchAcknowledgingMessageListener<K, V>) messageListener, this.recordFilterStrategy,
                    this.ackDiscarded);
        }
        else if (messageListener instanceof BatchMessageListener) {
            messageListener = new FilteringBatchMessageListenerAdapter<>(
                    (BatchMessageListener<K, V>) messageListener, this.recordFilterStrategy);
        }
    }
    container.setupMessageListener(messageListener);
}
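This messageListener carries the InvocableHandlerMethod built from the bean and method held by the original endpoint, optionally wrapped by the retrying and filtering adapters, and it is injected into the MessageListenerContainer (a ConcurrentMessageListenerContainer). This is where the flow joins up with the ConcurrentMessageListenerContainer described earlier, which starts several concurrent KafkaMessageListenerContainer instances according to the configured spring.kafka.listener.concurrency.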
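The layering done by setupMessageListener is a plain decorator chain: the retry wrapper goes on first and the filter wrapper goes around it. The sketch below shows that ordering with JDK types only; the Listener interface, the retrying and filtering helpers and the fixed attempt count are assumptions of this example and do not reproduce RetryTemplate or RecordFilterStrategy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical decorator chain mirroring "retry inside, filter outside".
class ListenerDecoratorSketch {

    interface Listener {
        void onMessage(String record);
    }

    // Retries the delegate until it succeeds or maxAttempts is exhausted, then rethrows the last error.
    static Listener retrying(Listener delegate, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        return record -> {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    delegate.onMessage(record);
                    return;
                } catch (RuntimeException e) {
                    last = e;
                }
            }
            throw last;
        };
    }

    // Drops records for which the discard predicate returns true; everything else reaches the delegate.
    static Listener filtering(Listener delegate, Predicate<String> discard) {
        return record -> {
            if (!discard.test(record)) {
                delegate.onMessage(record);
            }
        };
    }

    // Wires target -> retrying -> filtering and returns the records the target finally handled.
    static List<String> demo() {
        List<String> handled = new ArrayList<>();
        int[] failuresLeft = {1}; // the target fails once, then succeeds
        Listener target = record -> {
            if (failuresLeft[0]-- > 0) {
                throw new IllegalStateException("transient failure");
            }
            handled.add(record);
        };
        Listener wrapped = filtering(retrying(target, 3), r -> r.startsWith("skip"));
        wrapped.onMessage("skip-1");  // discarded by the filter, never reaches the target
        wrapped.onMessage("order-1"); // fails once, succeeds on the retry
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [order-1]
    }
}
```

With the filter outermost, discarded records never enter the retry logic at all.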
MethodKafkaListenerEndpoint
The createMessageListener method takes the bean and method that carry the KafkaListener annotation, as held by the endpoint, wraps them in an InvocableHandlerMethod, and injects that into a MessagingMessageListenerAdapter.
public class MethodKafkaListenerEndpoint<K, V> extends AbstractKafkaListenerEndpoint<K, V> {

    private Object bean;
    private Method method;
    private MessageHandlerMethodFactory messageHandlerMethodFactory;

    //......

    @Override
    protected MessagingMessageListenerAdapter<K, V> createMessageListener(MessageListenerContainer container,
            MessageConverter messageConverter) {
        Assert.state(this.messageHandlerMethodFactory != null,
                "Could not create message listener - MessageHandlerMethodFactory not set");
        MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance(messageConverter);
        messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
        return messageListener;
    }

    /**
     * Create a {@link HandlerAdapter} for this listener adapter.
     * @param messageListener the listener adapter.
     * @return the handler adapter.
     */
    protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter<K, V> messageListener) {
        InvocableHandlerMethod invocableHandlerMethod =
                this.messageHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod());
        return new HandlerAdapter(invocableHandlerMethod);
    }
}
In short, this class wraps the original bean and method in an InvocableHandlerMethod (inside a HandlerAdapter) and sets it on the MessagingMessageListenerAdapter.
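Stripped of argument resolution and message conversion, wrapping a bean and a Method into something with an onMessage call is ordinary reflection. The following is only a rough sketch of that idea: HandlerMethodSketch, its Listener interface, the Greeter bean and the adapt method are hypothetical names, and Spring's InvocableHandlerMethod does considerably more than Method.invoke.

```java
import java.lang.reflect.Method;

// Hypothetical, JDK-only model of "bean + method becomes a listener".
class HandlerMethodSketch {

    interface Listener {
        void onMessage(String record);
    }

    // Made-up application bean whose handle method plays the annotated listener method.
    static class Greeter {
        final StringBuilder log = new StringBuilder();

        public void handle(String payload) {
            log.append(payload).append(';');
        }
    }

    // Binds the bean and the reflective method into a listener that invokes it per record.
    static Listener adapt(Object bean, Method method) {
        return record -> {
            try {
                method.invoke(bean, record);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Listener method could not be invoked", e);
            }
        };
    }

    // Looks up the handler method, adapts it, delivers two records and returns the bean's log.
    static String demo() {
        Greeter bean = new Greeter();
        Method method;
        try {
            method = Greeter.class.getMethod("handle", String.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
        Listener listener = adapt(bean, method);
        listener.onMessage("a");
        listener.onMessage("b");
        return bean.log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // a;b;
    }
}
```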
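Summary
- On the producer side, wrapping KafkaProducer in KafkaTemplate is relatively simple.
- On the consumer side, Spring marks message-handling methods with an annotation, so the wiring takes a bit more work:
- KafkaListenerAnnotationBeanPostProcessor first scans the beans and registers the endpoints with KafkaListenerEndpointRegistrar.
- KafkaListenerEndpointRegistrar then creates the MessageListenerContainer instances in afterPropertiesSet.
- The messageListener carries the InvocableHandlerMethod built from the bean and method held by the original endpoint.
- ConcurrentMessageListenerContainer ties in at this point: it starts several concurrent KafkaMessageListenerContainer instances according to the configured spring.kafka.listener.concurrency.
- Each KafkaMessageListenerContainer creates its own ListenerConsumer, which creates its own independent Kafka consumer; each ListenerConsumer runs on a thread from the consumer task executor, which is how concurrency is achieved.
- Each ListenerConsumer holds a recordsToProcess queue, and records polled from the native Kafka consumer are put on it.
- A ListenerInvoker thread then loops, waiting with a timeout to take records off recordsToProcess, and calls the messageListener's onMessage method (that is, the method annotated with KafkaListener).
ListenerConsumer is the key piece. It also handles offset commits, which I will cover in detail another time.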
Reposted from: https://my.oschina.net/go4it/blog/1547679