c踩坑

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic HelloWorld

将localhost必须和PLAINTEXT配置的地址保持一致,否则的话会无限警告不能接收数据

生产者;

import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerDemo  {    public static void main(String[] args){

        try {        //新建一个类这个类用来配置kafka       Properties properties = new Properties();//用来配置 kafka的IP地址和端口号             properties.put("bootstrap.servers", "master:9092");//ack 的状态可以分为三种  1,-1,all    1代表当生产者            properties.put("acks", "1");//重要//最多重复几次生产操作            properties.put("retries", 3);//重要  //缓存每个分区未发送消息。缓存的大小是通过 batch.size 配置指定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。            properties.put("batch.size", 16384);//监听间隔
            properties.put("linger.ms", 1);//控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。            properties.put("buffer.memory", 33554432);//序列化类型。 Kafka消息是以键值对的形式发送到Kafka集群的,其中Key是可选的,Value可以是任意类型。但是在Message被发送到Kafka集群之前,Producer需要把不同类型的消            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");//初始化生产者            Producer<String, String> producer = null;            producer = new KafkaProducer<String, String>(properties);            for (int i = 0; i < 100; i++) {                String msg = "Messagea" + i;//topic的名字:HelloWorld                producer.send(new ProducerRecord<String, String>("HelloWorld1", msg));                System.out.println("Sent:" + msg);                //Thread.sleep(1000);            }        } catch (Exception e) {            e.printStackTrace();

        }

    }}之后再启动消费者  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic HelloWorld --from-beginning

//消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;import java.util.Properties;

public class ConsumerDemo {    public static void main(String[] args) {

    Properties properties = new Properties();

properties.put("bootstrap.servers", "master:9092");

properties.put("group.id", "i");    //是否自动提交properties.put("enable.auto.commit", "false");

properties.put("auto.commit.interval.ms", "1000");//偏移量自增properties.put("auto.offset.reset", "earliest");////session保存时间properties.put("session.timeout.ms", "30000");//序列化properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);//topictest2kafkaConsumer.subscribe(Arrays.asList("HelloWorld"));        while (true) {        ConsumerRecords<String, String> records = kafkaConsumer.poll(100);        for (ConsumerRecord<String, String> record : records)            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());    }

}}最后启动生产者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic HelloWorld

但是以上操作都有可能遇到数据重复或者数据丢失,在kafka0.11后引入了幂等和事务完美的解决了这个问题切忌是0.11版本之后包括0.11,如欲了解详情请看下篇

老规矩:散文欣赏《不老苍穹》今后的日子,我的头发越来越少,我写给你的句子越来越少,我剩下的时间越来越少,但世界永恒,星辰常在,苍穹不老。

最后的日子,我抱起你会很难,我看清你会很难,我记起你会很难,但此心至诚,百年不变,千岁未寒。

转载于:https://www.cnblogs.com/lppz/p/9991993.html

kafka java api 生产者 producer 与消费者consumer相关推荐

  1. Kafka学习笔记 --- 生产者producer与消费者关系comsumer

    生产者:生产者可以将数据发布到所选择的topic(主题)中.生产者负责将记录分配到topic的哪一个 partition(分区)中.可以使用循环的方式来简单地实现负载均衡,也可以根据某些语义分区函数( ...

  2. kafka java api 删除_Kafka入门系列—6. Kafka 常用命令及Java API使用

    常用命令 启动Zookeeper ./zkServer.sh start-foreground 可选参数: ./zkServer.sh {start|start-foreground|stop|res ...

  3. 【Kafka】kafka Java api 获取 kafka topic 或者 partition 占用的磁盘大小

    1.概述 kafka Java api 获取 kafka topic 或者 partition 占用的磁盘大小 package com.dtwave.kafka.storage;import org. ...

  4. kafka java api 删除,Kafka:删除闲置的消费者组ID

    In some cases, I use Kafka-stream to model a small in memory (hashmap) projection of a topic. The K, ...

  5. kafka java api 删除_使用Java API创建(create),查看(describe),列举(list),删除(delete)Kafka主题(Topic)...

    使用Kafka的同学都知道,我们每次创建Kafka主题(Topic)的时候可以指定分区数和副本数等信息,如果将这些属性配置到server.properties文件中,以后调用Java API生成的主题 ...

  6. kafka java api 入数据报错:Error: MESSAGE_TOO_LARGE

    1.kafka版本:kafka_2.11-2.1.0-kafka-4.0.0.jar 2.server.properties:所有调优参数都是默认 3.topic :null,所有参数默认 4.入库1 ...

  7. 【Kafka笔记】4.Kafka API详细解析 Java版本(Producer API,Consumer API,拦截器等)

    简介 Kafka的API有Producer API,Consumer API还有自定义Interceptor (自定义拦截器),以及处理的流使用的Streams API和构建连接器的Kafka Con ...

  8. java kafka api_kafka java API的使用

    Kafka包含四种核心的API: 1.Producer API支持应用将数据流发送到Kafka集群的主题 2.Consumer API支持应用从Kafka集群的主题中读取数据流 3.Streams A ...

  9. java kafka 消费_java编程之Kafka_消费者API详解

    1 消息发送 1.异步发送导入依赖 org.apache.kafka kafka-clients 0.11.0.0 编写代码 需要用到的类: KafkaProducer:需要创建一个生产者对象,用来发 ...

最新文章

  1. Linux NUMA 架构 :基础软件工程师需要知道一些知识
  2. Kubernetes — kubectl 的基本使用
  3. endnote初始化数据库支持_5 个免费的在线 SQL 数据库环境,比Navicat 香
  4. Python UDP聊天器
  5. asp连接oracle6,asp下用OracleInProcServer完成对Oracle的连接和操作-ASP教程,数据库相关...
  6. 为什么大学要学一堆纸上谈兵的课程?(转)
  7. SpringBoot+Mybatis集成搭建
  8. 【蓝桥杯真题】蓝桥杯真题之旋转
  9. Java微服务和分布式区别
  10. 储油罐的变位识别与罐容表标定
  11. 轻重在平衡:平衡查找树的强大威力
  12. luogu 5561 [Celeste-B]Mirror Magic 后缀数组+RMQ+multiset
  13. Win11此应用无法在你的电脑上运行怎么解决
  14. 贝叶斯与朴素贝叶斯入门及实战
  15. vue3组件之间通信(二)——子传父属性和方法
  16. unity webgl打包 苹果12以上机型打开连接后模型黑屏卡帧问题
  17. Smiditor实现图片上传功能
  18. Linux 部署开源WAF模块 ModSecurity
  19. 2. Hibernate目录结构和基础JAR包介绍
  20. Google在线翻译工具:Translatium for Mac支持big sur

热门文章

  1. 奇瑞无界Pro 刮起性能机甲风
  2. C# 直接使用DllImport外部dll的方法
  3. 单片机里如何使用冒泡法实现数据从大到小排列_单片机实验一冒泡法排序.doc...
  4. Python量化策略入门1-如何利用聚宽(JoinQuant)下载金融数据
  5. topic是短语还是句子_英语topic
  6. 新聘应届生入职培训计划(华为)
  7. 社区宽带繁忙是什么意思_ISP许可证是什么?办理ISP许可证对企业有什么要求?...
  8. Linux高性能服务器I/0高级应用:非阻塞connect(15)
  9. 求助大神们 请问sublime行数左边的黄色竖杠表示什么
  10. 连载《国培计划》骨干教师的研修日志之六:关于米新江教授谈的授之以渔与渔之以渔