2019独角兽企业重金招聘Python工程师标准>>>

1,首先springboot对kafka的支持也很好,同样是在配置文件中配置好参数,然后就可以直接使用。先说一下,很简单,,,不要怕

2,我用的依赖是

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

配置文件

kafka:bootstrap-servers: 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092producer:retries: 1batch-size: 16384buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerbootstrap-servers: 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092consumer:bootstrap-servers:  12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092enable-auto-commit: trueauto-offset-reset: latestauto-commit-interval: 1000group-id: gzj

然后在需要往kafka发送数据的地方,也就是生产者,直接注入即可

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

消费者,监听

@KafkaListener(topics = {"gzj"})
public void receive(String content){System.err.println("Receive:" + content);
}

消费者还有另一种方法,

package com.gzj.demo.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** Description* <p>* </p>* DATE 2018/10/23.** @author guozhenjiang.*/
@Component
public class KafkaConsumerTask implements Runnable,InitializingBean {private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTask.class);private Thread thread;@Resource(name="kafkaConsumer")private KafkaConsumer<String,String> kafkaConsumer;@Overridepublic void run() {logger.info("消费数据任务启动");while(true){try{ConsumerRecords<String ,String > records=kafkaConsumer.poll(1000);if(records!=null){for(ConsumerRecord<String ,String > record:records){logger.error(record.key());logger.error(record.topic());logger.error(record.value());}}}catch(Exception e){// logger.error("我也不知道哪儿错了");}finally {// logger.error("不放弃");}}}@Overridepublic void afterPropertiesSet() throws Exception {this.thread=new Thread(this);this.thread.start();}}
package com.gzj.demo.config;import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.Arrays;
import java.util.Properties;/*** Description* <p>* </p>* DATE 2018/10/23.** @author guozhenjiang.*/
@Configuration
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaConnectConfig {@Bean(name = "kafkaConsumer")public KafkaConsumer<String, String> kafkaConsumer() {Properties props = new Properties();props.put("bootstrap.servers", bootstrapServers);props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("group.id", "ggg");props.setProperty("enable.auto.commit", enableAutoCommit);props.setProperty("auto.offset.reset", autoOffsetReset);KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList("gzj"));return consumer;}@Value("${server.port}")private String port;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.enable-auto-commit}")private String enableAutoCommit;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;public String getGroupId() {return groupId;}public void setGroupId(String groupId) {this.groupId = groupId;}public String getBootstrapServers() {return bootstrapServers;}public void setBootstrapServers(String bootstrapServers) {this.bootstrapServers = bootstrapServers;}public String getEnableAutoCommit() {return enableAutoCommit;}public void setEnableAutoCommit(String enableAutoCommit) {this.enableAutoCommit = enableAutoCommit;}public String getAutoOffsetReset() {return autoOffsetReset;}public void setAutoOffsetReset(String autoOffsetReset) {this.autoOffsetReset = autoOffsetReset;}
}

后一种暂未发现有什么优点。都可以实现监听kafka,充当消费者

3,现在我有两个消费者,之前一直好奇如果多个消费者,如何让他们重复消费,或协同消费,听说是通过配置groupid,亲自试验了一下,确实是,同一个groupid里是协同的,不通的是重复的。

也没什么,挺简单的,有什么问题可以提问,开源中国的app我下好了,应该经常登录

转载于:https://my.oschina.net/u/3796880/blog/2251434

springboot+kafka(ip地址瞎写的)相关推荐

  1. springboot获取ip地址方法

    需要添加不同情况判断 public static String getIpAddr(HttpServletRequest request) {String ipAddress = null;try { ...

  2. java中获取当前服务器的Ip地址

    1.tomcat是一款免费的开源Web服务器,如果部署在本地,那么对应的那么为localhost,对应地址为127.0.0.1. 例子:可以通过http://localhost:8080/项目root ...

  3. Java获取请求客户端的真实IP地址

    整理网友的材料,最后有源码,亲测能解决所有java获取IP真实地址的问题 整理的这里: 1.链接1 2.链接2 JSP里,获取客户端的IP地址的方法是: request.getRemoteAddr() ...

  4. 多级反向代理下,Java获取请求客户端的真实IP地址多中方法整合

    在JSP里,获取客户端的IP地址的方法是:request.getRemoteAddr(),这种方法在大部分情况下都是有效的.但是在通过了Apache,Squid等反向代理软件就不能获取到客户端的真实I ...

  5. java获取服务器ip地址_java中获取当前服务器的Ip地址的方法

    1.tomcat是一款免费的开源Web服务器,如果部署在本地,那么对应的那么为localhost,对应地址为127.0.0.1. 例子:可以通过http://localhost:8080/项目root ...

  6. 阿里云ECS后台利用“安全组”屏蔽恶意攻击ip地址

    在阿里云后台的云盾监控->威胁->攻击,会发现每天有大量的恶意攻击,存在大量GET请求,导致服务器资源浪费无用消耗. 20160220102620 最近7天攻击类型 仔细查看攻击来源的信息 ...

  7. java获取请求本机ip地址

    在JSP里,获取客户端的IP地址的方法是:request.getRemoteAddr(),这种方法在大部分情况下都是有效的.但是在通过了Apache,Squid等反向代理软件就不能获取到客户端的真实I ...

  8. 怎样快速对二进制和十进制进行互转化——IP地址规划与设计总结

    最近一直在看全国计算机三级网络技术这本书,看到第二章的时候,不免会遇到计算机中最常用的进制转换问题.什么IP地址,什么子网掩码,什么网路地址,什么广播地址都会用到二进制和十进制的相互转化,而且最常用的 ...

  9. 在JSP里,获取客户端的IP地址的方法是:request.getRemoteAddr() ,这种方法在大部分情况下都是有效的。但是在通过了Apache,Squi...

    在JSP里,获取客户端的IP地址的方法是:request.getRemoteAddr() ,这种方法在大部分情况下都是有效的.但是在通过了Apache,Squid等反向代理软件就不能获取到客户端的真实 ...

最新文章

  1. python导入csv文件-python如何导入csv文件格式
  2. Windows Mobile与OPhone开发对比
  3. python查找数组中出现次数最多的元素
  4. Shell中的循环语句
  5. sklearn 聚类 实例
  6. Serverless 落地之痛怎么解?
  7. leetcode 349. 两个数组的交集 思考分析
  8. 2021-2025年中国冲击式粉碎机装置行业市场供需与战略研究报告
  9. SpringBoot数据访问-------------数据缓存
  10. C# int byte数组互转 int byte[]互转
  11. (2020/11/9)学习记录
  12. iOS armv7, armv7s, arm64区别与应用32位、64位配置
  13. JAVA常用加密解密算法Encryption and decryption
  14. 不用PS,也能实现抠图的工具
  15. how2heap(5):tcache_poisoning 2.31
  16. Wireshark使用详解
  17. Python-import
  18. 数据挖掘与分析课程笔记(Chapter 7)
  19. 记录一次使用线性回归和多项式回归为房价定价
  20. Jmter的接口测试提升篇

热门文章

  1. Nginx 源码编译安装
  2. windows 7 64bit安装apche php
  3. 更换用installshield打包生成exe文件的图标
  4. RHCS创建高可用性群集Apache服务器
  5. 用bitmap实现中位数的算法
  6. 定制SPS实战记录 系列之一
  7. archlinux安装chrome-webdriver
  8. java懒汉_java中懒汉式的volatile的问题?
  9. 非计算机专业想学习Python,有哪些问题需要注意?
  10. Hippo如何进入WebAssembly?一文教会你