– Start

点击此处观看本系列配套视频


在上个例子中,我们使用了脚本来发送消息,来吧,让我们来自己写点代码来发送消息。首先还是按照上个例子,先启动 ZooKeeper 和 启动 Kafka borker。

发送消息

package shangbo.kafka.example1;import java.util.Properties;
import java.util.concurrent.Future;import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;public class App {public static void main(String[] args) {// Producer 配置信息,应该配置在属性文件中Properties props = new Properties();//指定要连接的 broker,不需要列出所有的 broker,但建议至少列出2个,以防某个 broker 挂了props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("retries", 3); // 如果发生错误,重试三次props.put("acks", "1"); // 0:不应答,1:leader 应该,all:所有 leader 和 follower 应该// 创建 ProducerProducer<String, String> producer = new KafkaProducer<String, String>(props);// send 方法是异步的,方法返回并不代表消息发送成功producer.send(new ProducerRecord<String, String>("topic0", "message 1"));// 如果需要确认消息是否发送成功,以及发送后做一些额外操作,有两种办法// 方法 1: 使用 callbackproducer.send(new ProducerRecord<String, String>("topic0", "message 2"), new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if(exception != null) {System.out.println("send message2 failed with " + exception.getMessage());} else {// offset 是消息在 partition 中的编号,可以根据 offset 检索消息System.out.println("message2 sent to " + metadata.topic() + ", partition " + metadata.partition() + ", offset " + metadata.offset());  }}});// 方法2:使用阻塞Future<RecordMetadata> sendResult = producer.send(new ProducerRecord<String, String>("topic0", "message 3"));try {// 阻塞直到发送成功RecordMetadata metadata = sendResult.get();System.out.println("message3 sent to " + metadata.topic() + ", partition " + metadata.partition() + ", offset " + metadata.offset());} catch(Exception e) {System.out.println("send message3 failed with " + e.getMessage());}// producer 需要关闭,放在 finally 里producer.close();}
}

你有没有想过一种情况,就是当 Producer 发送消息后突然断网了,那消息到底发送成功了还是失败了呢?请看下一篇。

– 更多参见:Kafka 精萃
– 声 明:转载请注明出处
– Last Edited on 2018-06-13
– Written by ShangBo on 2018-06-13
– End

Kafka 发送消息相关推荐

  1. 物联网架构----EMQ-Hook了解、连接Kafka发送消息

    物联网架构----EMQ-Hook了解.连接Kafka发送消息 1. 前言 按照我自己设计的物联网框架,对于MQTT集群中的所有消息,是要持久化到磁盘的,这里采用一个消息队列中间件Kafka作为数据缓 ...

  2. kafka发送消息的三种方式

    1.发后即忘(fire-and-forget) 只管往kafka发送消息而并不关心消息是否正确到达.正常情况没什么问题,不过有些时候(比如不可重试异常)会造成消息的丢失.这种发送方式性能最高,可靠性最 ...

  3. springboot kafka发送消息

    场景:kafka发送消息,并且根据消息发送的不同渠道和消息类型(例如发送到WX,DingDing,邮箱),采取不同的线程池处理 1.引入依赖 <dependency><groupId ...

  4. 在idea中往Kafka发送消息失败

    今天在学习Kafka整合Springboot项目的时候,往Kafka发送消息,消费者一直消费不到,ip地址,端口号,防火墙的状态都没有发现问题.后来发现是因为idea没有连接到虚拟机的,一直在找本机的 ...

  5. kafka发送消息至指定分区

    前言 在实际使用中,我们可能需要对某个topic下不同的消息进行分类管理,比如确保消费的顺序性,在这种场景下,我们可以首先确保生产者发送消息到指定的分区即可 本文的测试基于docker搭建的一个双节点 ...

  6. Kafka 发送消息 Idempotent

    – Start 点击此处观看本系列配套视频 我们先来回答一下上个例子最后问题的答案,很遗憾 Producer 并不能证明消息发送成功了,如果设置了 retry,Producer 会再次发送消息,这会导 ...

  7. Kafka 发送消息失败

    在使用brew services start kafka 成功启动kafka后, 执行发送消息会有提示以下,无法发送消息. kafka-console-producer --broker-list l ...

  8. Kafka 发送消息 Idempotent -- Spring 整合

    – Start 点击此处观看本系列配套视频 废话少说,直接上代码. package shangbo.kafka.example8;import org.springframework.context. ...

  9. springboot使用kafka发送消息,消息过大报错

    报错信息如下所示: Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1527150 ...

最新文章

  1. 文件属性及资源文件的使用
  2. mysql中order by优化的那些事儿
  3. C++Miller Rabin算法的实现(附完整源码)
  4. python爬取10个网站_十个Python爬虫武器库示例,十个爬虫框架,十种实现爬虫的方法!...
  5. 给网站logo添加css帅气亮光扫过特效 附教程
  6. x3850x5启动代码c2_代码小时x 2,080
  7. python读取指定行到最后一行_python读取文件最后一行两种方法
  8. 03数据库的基本查询
  9. cell数组变为字符串_字符串匹配 ---- BM 算法原理
  10. 自定义标签处理器类的生命周期
  11. 凉山火灾启示录:面对大火,AI 能做些什么?
  12. 【论文】AAAI 2020论文解读:关注实体以更好地理解文本
  13. 在线标准程序员计算器
  14. Win10环境VMware开WinXP虚拟机CPU占用100%
  15. 【题解PAT】1006 换个格式输出整数
  16. SSM校园好货APP的设计与实现毕业设计源码121619
  17. 好东西——计算机原理学习(序)
  18. 大学四年一路走来,我把这些私藏的算法学习工具全贡献出来了!
  19. 统计频次:统计数组中每种模的频次
  20. 公民SF证号码的构成

热门文章

  1. 澳洲纽卡斯尓大学计算机排名,澳洲纽卡斯尔大学计算机科学computer science专业排名第251~300名(2020THE泰晤士高等教育世界大学排名)...
  2. 特效笔记2--unity粒子系统的基本认识
  3. [codeforces 339]E. Three Swaps
  4. MTK Android4.0.3 ICS 添加缅甸语Myanmar
  5. 海康威视iSC 平台第三方对接门禁权限分享
  6. matlab程序特殊符号,matlab中怎么输入特殊符号
  7. 我们需要一颗强劲的心脏 2 --机房改造篇
  8. 新酷6重磅出击,主题页引领“江湖”
  9. python异或^ 移位运算符
  10. ubuntu添加或删除PPA