springboot+kafka(ip地址瞎写的)
2019独角兽企业重金招聘Python工程师标准>>>
1,首先springboot对kafka的支持也很好,同样是在配置文件中配置好参数,然后就可以直接使用。先说一下,很简单,,,不要怕
2,我用的依赖是
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency>
配置文件
kafka:bootstrap-servers: 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092producer:retries: 1batch-size: 16384buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerbootstrap-servers: 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092consumer:bootstrap-servers: 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092enable-auto-commit: trueauto-offset-reset: latestauto-commit-interval: 1000group-id: gzj
然后在需要往kafka发送数据的地方,也就是生产者,直接注入即可
@Autowired private KafkaTemplate<String, Object> kafkaTemplate;
消费者,监听
@KafkaListener(topics = {"gzj"}) public void receive(String content){System.err.println("Receive:" + content); }
消费者还有另一种方法,
package com.gzj.demo.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** Description* <p>* </p>* DATE 2018/10/23.** @author guozhenjiang.*/ @Component public class KafkaConsumerTask implements Runnable,InitializingBean {private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTask.class);private Thread thread;@Resource(name="kafkaConsumer")private KafkaConsumer<String,String> kafkaConsumer;@Overridepublic void run() {logger.info("消费数据任务启动");while(true){try{ConsumerRecords<String ,String > records=kafkaConsumer.poll(1000);if(records!=null){for(ConsumerRecord<String ,String > record:records){logger.error(record.key());logger.error(record.topic());logger.error(record.value());}}}catch(Exception e){// logger.error("我也不知道哪儿错了");}finally {// logger.error("不放弃");}}}@Overridepublic void afterPropertiesSet() throws Exception {this.thread=new Thread(this);this.thread.start();}}
package com.gzj.demo.config;import org.apache.kafka.clients.consumer.KafkaConsumer; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.util.Arrays; import java.util.Properties;/*** Description* <p>* </p>* DATE 2018/10/23.** @author guozhenjiang.*/ @Configuration @ConfigurationProperties(prefix = "spring.kafka") public class KafkaConnectConfig {@Bean(name = "kafkaConsumer")public KafkaConsumer<String, String> kafkaConsumer() {Properties props = new Properties();props.put("bootstrap.servers", bootstrapServers);props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("group.id", "ggg");props.setProperty("enable.auto.commit", enableAutoCommit);props.setProperty("auto.offset.reset", autoOffsetReset);KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);consumer.subscribe(Arrays.asList("gzj"));return consumer;}@Value("${server.port}")private String port;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.enable-auto-commit}")private String enableAutoCommit;@Value("${spring.kafka.consumer.auto-offset-reset}")private String autoOffsetReset;public String getGroupId() {return groupId;}public void setGroupId(String groupId) {this.groupId = groupId;}public String getBootstrapServers() {return bootstrapServers;}public void setBootstrapServers(String bootstrapServers) {this.bootstrapServers = bootstrapServers;}public String getEnableAutoCommit() {return enableAutoCommit;}public void setEnableAutoCommit(String enableAutoCommit) {this.enableAutoCommit = enableAutoCommit;}public String getAutoOffsetReset() {return autoOffsetReset;}public void setAutoOffsetReset(String autoOffsetReset) {this.autoOffsetReset = autoOffsetReset;} }
后一种暂未发现有什么优点。都可以实现监听kafka,充当消费者
3,现在我有两个消费者,之前一直好奇如果多个消费者,如何让他们重复消费,或协同消费,听说是通过配置groupid,亲自试验了一下,确实是,同一个groupid里是协同的,不通的是重复的。
也没什么,挺简单的,有什么问题可以提问,开源中国的app我下好了,应该经常登录
转载于:https://my.oschina.net/u/3796880/blog/2251434
springboot+kafka(ip地址瞎写的)相关推荐
- springboot获取ip地址方法
需要添加不同情况判断 public static String getIpAddr(HttpServletRequest request) {String ipAddress = null;try { ...
- java中获取当前服务器的Ip地址
1.tomcat是一款免费的开源Web服务器,如果部署在本地,那么对应的那么为localhost,对应地址为127.0.0.1. 例子:可以通过http://localhost:8080/项目root ...
- Java获取请求客户端的真实IP地址
整理网友的材料,最后有源码,亲测能解决所有java获取IP真实地址的问题 整理的这里: 1.链接1 2.链接2 JSP里,获取客户端的IP地址的方法是: request.getRemoteAddr() ...
- 多级反向代理下,Java获取请求客户端的真实IP地址多中方法整合
在JSP里,获取客户端的IP地址的方法是:request.getRemoteAddr(),这种方法在大部分情况下都是有效的.但是在通过了Apache,Squid等反向代理软件就不能获取到客户端的真实I ...
- java获取服务器ip地址_java中获取当前服务器的Ip地址的方法
1.tomcat是一款免费的开源Web服务器,如果部署在本地,那么对应的那么为localhost,对应地址为127.0.0.1. 例子:可以通过http://localhost:8080/项目root ...
- 阿里云ECS后台利用“安全组”屏蔽恶意攻击ip地址
在阿里云后台的云盾监控->威胁->攻击,会发现每天有大量的恶意攻击,存在大量GET请求,导致服务器资源浪费无用消耗. 20160220102620 最近7天攻击类型 仔细查看攻击来源的信息 ...
- java获取请求本机ip地址
在JSP里,获取客户端的IP地址的方法是:request.getRemoteAddr(),这种方法在大部分情况下都是有效的.但是在通过了Apache,Squid等反向代理软件就不能获取到客户端的真实I ...
- 怎样快速对二进制和十进制进行互转化——IP地址规划与设计总结
最近一直在看全国计算机三级网络技术这本书,看到第二章的时候,不免会遇到计算机中最常用的进制转换问题.什么IP地址,什么子网掩码,什么网路地址,什么广播地址都会用到二进制和十进制的相互转化,而且最常用的 ...
- 在JSP里,获取客户端的IP地址的方法是:request.getRemoteAddr() ,这种方法在大部分情况下都是有效的。但是在通过了Apache,Squi...
在JSP里,获取客户端的IP地址的方法是:request.getRemoteAddr() ,这种方法在大部分情况下都是有效的.但是在通过了Apache,Squid等反向代理软件就不能获取到客户端的真实 ...
最新文章
- python导入csv文件-python如何导入csv文件格式
- Windows Mobile与OPhone开发对比
- python查找数组中出现次数最多的元素
- Shell中的循环语句
- sklearn 聚类 实例
- Serverless 落地之痛怎么解?
- leetcode 349. 两个数组的交集 思考分析
- 2021-2025年中国冲击式粉碎机装置行业市场供需与战略研究报告
- SpringBoot数据访问-------------数据缓存
- C# int byte数组互转 int byte[]互转
- (2020/11/9)学习记录
- iOS armv7, armv7s, arm64区别与应用32位、64位配置
- JAVA常用加密解密算法Encryption and decryption
- 不用PS,也能实现抠图的工具
- how2heap(5):tcache_poisoning 2.31
- Wireshark使用详解
- Python-import
- 数据挖掘与分析课程笔记(Chapter 7)
- 记录一次使用线性回归和多项式回归为房价定价
- Jmter的接口测试提升篇