1、编写所需的pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.com.kafka</groupId><artifactId>learnKafka</artifactId><version>1.0-SNAPSHOT</version><!--导入maven依赖有两种:一种maven.org 一种从已有的项目中导入--><dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.8.2</artifactId><version>0.8.1</version><exclusions><exclusion><artifactId>jmxtools</artifactId><groupId>com.sun.jdmk</groupId></exclusion><exclusion><artifactId>jmxri</artifactId><groupId>com.sun.jmx</groupId></exclusion><exclusion><artifactId>jms</artifactId><groupId>javax.jms</groupId></exclusion></exclusions></dependency></dependencies>
</project>

2、编写一个Producer

代码:

package kafka;import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;import java.util.Properties;/*** 代码说明** @author tuzq* @create 2017-06-18 16:50*/
public class MyKafkaProducer {public static void main(String[] args) {Properties properties = new Properties();properties.put("metadata.broker.list","hadoop1:9092");properties.put("serializer.class","kafka.serializer.StringEncoder");Producer producer = new Producer(new ProducerConfig(properties));//先到Linux下创建topic:test//[root@hadoop1 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 -partitions 1 --topic testproducer.send(new KeyedMessage("test","I am tuzq"));}
}

运行程序
准备条件:
1、创建test的topic
[root@hadoop1 kafka]# bin/kafka-topics.sh –create –zookeeper hadoop11:2181 –replication-factor 1 -partitions 1 –topic test

2、执行kafka-console-consumer.sh这个脚本,监听发过来的消息
[root@hadoop3 kafka]# sh bin/kafka-console-consumer.sh –zookeeper hadoop11:2181 –from-beginning –topic test

3、右键Run这个类

3.编写含有自定义的一些条件的Producer

代码

package kafka;import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;import java.util.Properties;/*** 这是一个简单的Kafka producer代码* 包含两个功能:* 1、数据发送* 2、数据按照自定义的partition策略进行发送** @author tuzq* @create 2017-06-18 17:13*/
public class KafkaProducerSimple {public static void main(String[] args) {/*** 1、指定当前Kafka producer生产的数据的目的地* 创建topic可以输入以下命令,在kafka集群的任一节点进行创建* bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 --partitions 1 --topic test*/String TOPIC = "test";/*** 2、读取配置文件*/Properties props = new Properties();/*** key.serializer.class默认为serializer.class*/props.put("serializer.class", "kafka.serializer.StringEncoder");/*** kafka broker对应的主机,格式为host1:port1,host2:port2*/props.put("metadata.broker.list","hadoop1:9092");/** request.required.acks,设置发送数据是否需要服务端的反馈,有三个值0,1,-1* 0,意味着producer永远不会等待一个来自broker的ack,这就是0.7版本的行为。* 这个选项提供了最低的延迟,但是持久化的保证是最弱的,当server挂掉的时候会丢失一些数据。** 1,意味着在leader replica已经接收到数据后,producer会得到一个ack。* 这个选项提供了更好的持久性,因为在server确认请求成功处理后,client才会返回。* 如果刚写到leader上,还没来得及复制leader就挂了,那么消息才可能会丢失。** -1,意味着在所有的ISR都接收到数据后,producer才得到一个ack。* 这个选项提供了最好的持久性,只要还有一个replica存活,那么数据就不会丢失*/props.put("request.required.acks", "1");/** 可选配置,如果不配置,则使用默认的partitioner partitioner.class* 默认值:kafka.producer.DefaultPartitioner* 用来把消息分到各个partition中,默认行为是对key进行hash。*///props.put("partitioner.class","kafka.MyPartitioner");//默认情况下是:props.put("partitioner.class","kafka.producer.DefaultPartitioner");/*** 3.通过配置文件,创建生产者*/Producer<String,String> producer = new Producer<String, String>(new ProducerConfig(props));/*** 4.通过for循环生产数据*/int messageNo = 0;while (true){String messageStr = new String(" aaaa");/*** 5、调用producer的send方法发送数据* 注意:这里需要指定 partitionKey,用来配合自定义的MyPartitioner进行数据分发*/producer.send(new KeyedMessage<String, String>(TOPIC,messageNo+"",messageStr));messageNo += 1;}}
}

运行程序
进入/home/tuzq/software/kafka/servers/logs/kafka,执行以下命令:

[root@hadoop2 kafka]# cd /home/tuzq/software/kafka/servers/logs/kafka
[root@hadoop2 kafka]# ls
cleaner-offset-checkpoint  itheima-1  learnKafka-0  meta.properties  recovery-point-offset-checkpoint  replication-offset-checkpoint  test-1
[root@hadoop2 kafka]# cd test-1/
[root@hadoop2 test-1]# ls
00000000000000000000.index  00000000000000000000.log
注意上面的.index是索引文件,.log是数据文件
[root@hadoop2 test-1]# du -h
380K    .
[root@hadoop2 test-1]#

只要这个程序一直开着,那么发现du -h这个值就在不停的变大。

4.Kafka Consumer案例

代码:

package kafka;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class KafkaConsumerSimple implements Runnable {public String title;public KafkaStream<byte[], byte[]> stream;public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {//获取自己的消费编号,以及要消费的kafkaStreamthis.title = title;this.stream = stream;}public void run() {System.out.println("开始运行 " + title);//6、从KafkaStream获取一个迭代器ConsumerIterator<byte[], byte[]> it = stream.iterator();/*** 7、不停地从stream读取新到来的消息,在等待新的消息时,hasNext()会阻塞* */while (it.hasNext()) {MessageAndMetadata<byte[], byte[]> data = it.next();String topic = data.topic();int partition = data.partition();long offset = data.offset();String msg = new String(data.message());System.out.println(String.format("Consumer: [%s],  Topic: [%s],  PartitionId: [%d], Offset: [%d], msg: [%s]",title, topic, partition, offset, msg));}System.err.println(String.format("Consumer: [%s] exiting ...", title));}public static void main(String[] args) throws Exception{// 1、准备一些配置参数Properties props = new Properties();props.put("group.id", "testGroup");props.put("zookeeper.connect", "hadoop11:2181,hadoop12:2181,hadoop13:2181");props.put("auto.offset.reset", "largest");props.put("auto.commit.interval.ms", "1000");props.put("partition.assignment.strategy", "roundrobin");ConsumerConfig config = new ConsumerConfig(props);//2、准备要消费的topicString topic = "test";//3、创建一个consumer的连接器// 只要ConsumerConnector还在的话,consumer会一直等待新消息,不会自己退出ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);//创建topicCountMapMap<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic,1);//4、获取每个topic对应的kafkaStreamMap<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);//5、消费KafkaStream中的数据List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic);ExecutorService executor = Executors.newFixedThreadPool(4);for (int i = 0; i < streams.size(); i++)executor.execute(new KafkaConsumerSimple("consumer" + (i + 1), streams.get(i)));}
}

右键运行

注意:上满的三个小案例在运行之前,都要先检查以下kafka是否已经启动了,检查方式是执行jps命令,看看是否有kafka的进程

Kafka的producer案例,Kafka的consumer案例相关推荐

  1. kafka之Producer同步与异步消息发送及事务幂等性案例应用实战

    本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark商业应用实战指导,请持续关注本套博客.版权声明:本套Spark商业应用实战归作者(秦凯新)所有,禁止转载,欢迎学习. 秦凯新的技术社区 ...

  2. kafka偏移量保存到mysql里_SparkStreaming+kafka保存offset的偏移量到mysql案例

    MySQL创建存储offset的表格 mysql> use test mysql> create table hlw_offset( topic varchar(32), groupid ...

  3. 大数据技术之kafka (第 3 章 Kafka 架构深入 ) 消费者组案例

    1)需求:测试同一个消费者组中的消费者,同一时刻只能有一个消费者消费.  2)案例实操   (1)在 backupo01.backupo02 上修改/usr/local/hadoop/kafka/ka ...

  4. java连接kafka api_Kafka-JavaAPI(Producer And Consumer)

    Kafka--JAVA API(Producer和Consumer) Kafka 版本2.11-0.9.0.0 producer package com.yzy.spark.kafka; import ...

  5. SAP Data Intelligence Modeler里的Kafka Producer和Kafka Consumer

    首先本地将kafka的docker容器镜像下载到本地并运行: docker search kafka docker pull spotify/kafka docker run --name kafka ...

  6. SpringBoot整合kafka(实现producer和consumer)

    转载自 SpringBoot整合kafka(实现producer和consumer) 在Windows环境下安装运行Kafka:https://www.jianshu.com/p/d64798e81f ...

  7. 【Kafka笔记】4.Kafka API详细解析 Java版本(Producer API,Consumer API,拦截器等)

    简介 Kafka的API有Producer API,Consumer API还有自定义Interceptor (自定义拦截器),以及处理的流使用的Streams API和构建连接器的Kafka Con ...

  8. 【Kafka】——Producer

    Kfaka Producer 1. 原理 <1> 创建mian线程 <2> 调用send() 方法 <3> 经过拦截器interceptors ,生产中用的较少 & ...

  9. 学习笔记Kafka(六)—— Kafka Consumer API及开发实例

    一.Kafka Consumer API 1.1.Consumer 1.2.KafkaConsumer 1.3.ConsumerRecords 1.4.ConsumerRecord 1.5.Kafka ...

最新文章

  1. AI量身定制:如何打造符合“中国特色教育”的内容推荐体系?
  2. 本日吐槽!“人傻钱多”的P2P公司是否是程序员的合适选择(群聊天记录的娱乐)...
  3. Dalvik VM进程系统(二):分析Zygote的启动过程
  4. 互联网协议 — OSPF 开放式最短路径优先协议
  5. 精度问题——直线方程的系数判断实际生产中三点能否确定一个圆
  6. Linux 系统更改界面显示详解
  7. 学习一下spring-cloud-function中官方修复的一个问题
  8. kubernetes apiserver认证 1
  9. php公众号推荐,良心推荐6个优质实用又有趣的微信公众号!
  10. 【对讲机的那点事】解读无管局《回答》:充分理解物联网产业诉求,值得点赞!...
  11. 递归的Fibonacci在数羊
  12. mysql游戏调整等级_mysql求游戏排名
  13. Sap Hana触发器
  14. qtdesigner设计表格_Qt Designer下的一些基础操作
  15. word闪退 用endnote_endnote x9 word 中插入参考文献时闪退崩溃
  16. java汉字转拼音以及五笔码工具
  17. 程序员必备:常见的安卓开发工具推荐
  18. Golang sync.Cond详细理解
  19. 应该怎么提升4G工业路由器的无线信号?
  20. 实战PyQt5: 141-QChart图表之箱形图

热门文章

  1. python中的线程
  2. 算法与数据结构(python):分治与归并排序
  3. NLP分析小说人物关系,找找主人公的真爱。
  4. 情感分析基于词典(算例代码)
  5. VTK:多个视口用法实战
  6. wxWidgets:wxMoveEvent类用法
  7. wxWidgets:wxMemoryFSHandler类用法
  8. boost::geometry::svg用法的测试程序
  9. GDCM:gdcm::Printer的测试程序
  10. GDCM:DICOM文件转储飞利浦ECHO的测试程序