java连接kafka api_Kafka-JavaAPI(Producer And Consumer)
Kafka--JAVA API(Producer和Consumer)
Kafka 版本2.11-0.9.0.0
producer
package com.yzy.spark.kafka;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;
/**
* Producer
*/
public class KafkaProducer extends Thread{
private String topic;
//--1
private Producer producer;
public KafkaProducer(String topic){
this.topic=topic;
Properties properties = new Properties(); //--2
properties.put("metadata.broker.list",KafkaProperties.BROKER_LIST);
properties.put("serializer.class","kafka.serializer.StringEncoder");
properties.put("request.require.acks","1");
// properties.put("partitioner.class","com.yzy.spark.kafka.MyPartition");
ProducerConfig config=new ProducerConfig(properties);
producer=new Producer<>(config);
}
@Override
public void run() {
int messageNo=1;
while (true){
String message="message"+ messageNo;
producer.send(new KeyedMessage("test2",String.valueOf(messageNo),message)); //--4
System.out.println("Sent:"+message);
messageNo++;
try {
Thread.sleep(1000);
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
1.定义Producer对象,这里要注意泛型类型,之后的KeyedMessage的泛型类型和Producer相同。
2.创建Producer对象需要传入一个ProducerConfig对象,而ProducerConfig对象需要由Properties对象构造,properties的属性设置可以查看ProducerConfig源码,注释很详细(个别属性在ProducerConfig父类AsyncProducerConfig 和 SyncProducerConfigShared中)。
3.该属性可以指定partitioner,如果不设置默认会设为kafka.producer.DefaultPartitioner。
4.看看KeyedMessage的源码:
case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V) {
if(topic == null)
throw new IllegalArgumentException("Topic cannot be null.")
def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
def this(topic: String, key: K, message: V) = this(topic, key, key, message)
def partitionKey = {
if(partKey != null)
partKey
else if(hasKey)
key
else
null
}
def hasKey = key != null
}
参数有4个,topic必填,message是消息,通常只填这两个参数即可发送消息。key和partKey是用于partition的参数,partKey的优先级高于key,但是partKey只对当前消息起作用,key和partKey只能是String类型。下面来看看partition策略和key。
partition
先在服务器端将topic test2的partitions设定为3
kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 3 --topic test2
然后回到客户端看看kafka.producer.DefaultPartitioner源码
package kafka.producer
import kafka.utils._
import org.apache.kafka.common.utils.Utils
class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {
private val random = new java.util.Random
def partition(key: Any, numPartitions: Int): Int = {
Utils.abs(key.hashCode) % numPartitions
}
}
该类有一个方法 def partition(key: Any, numPartitions: Int),第一个参数为上文所说的key或partKey,第二个为partitions的数量,传入的值就是在服务器设置的值(3),将key的hashCode对numPartitions取余得到结果(选择对应编号的partition)
我们可以自己定义一个partition.class并配置到properties属性中,这里给一个简单的例子:
package com.yzy.spark.kafka;
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class MyPartition implements Partitioner {
public MyPartition(VerifiableProperties properties){
}
@Override
public int partition(Object key, int numPartitions) {
System.out.println("numPartitions:"+numPartitions);
return key.hashCode()%numPartitions;
}
}
Consumer
package com.yzy.spark.kafka;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class KafkaConsumer extends Thread{
private String topic;
private String groupId;
public KafkaConsumer(String topic,String groupId){
this.topic=topic;
this.groupId=groupId;
}
private ConsumerConnector createConnector(){
Properties properties=new Properties();//--1
properties.put("zookeeper.connect",KafkaProperties.ZK);
properties.put("group.id",groupId);
properties.put("auto.offset.reset", "largest");//--2
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
return Consumer.createJavaConsumerConnector(consumerConfig);
}
@Override
public void run() {
ConsumerConnector consumerConnector=this.createConnector();
Map topicCountMap=new HashMap<>();
topicCountMap.put(topic,1);
Map>> messageStreams = consumerConnector.createMessageStreams(topicCountMap);
KafkaStream stream = messageStreams.get(topic).get(0);
ConsumerIterator iterator = stream.iterator();
while(iterator.hasNext()){
String message=new String(iterator.next().message());
}
}
}
Consumer相关的东西比较多,涉及到group和partition机制,以官方文档为准。
1.properties和producer一样看源码配置。
2.这个属性和shell命令中的--from-beginning对应。可以填smallest(从头读取)和largest(默认值,读取最新的元素,严格来说是最新的offset位置开始读取)。注意:每一次一个新的consumer试图去消费一个topic时,都是从所在group的largest offset位置读取,即也可通过设置group.id来实现from-beginning,只要将每个consumer的group.id都设置为一个新值即可,例如properties.put("group.id", UUID.randomUUID().toString());
java连接kafka api_Kafka-JavaAPI(Producer And Consumer)相关推荐
- 最新Kafka教程(包含kafka部署与基本操作、java连接kafka、spring连接kafka以及使用springboot)
最新Kafka教程(包含kafka部署与基本操作.java连接kafka.spring连接kafka以及使用springboot) 欢迎转载,转载请注明网址:https://blog.csdn.net ...
- nodejs链接kafka示例(producer、consumer)
2019独角兽企业重金招聘Python工程师标准>>> 安装node环境: wget https://nodejs.org/dist/v6.10.3/node-v6.10.3-lin ...
- 详细讲解如何使用Java连接Kafka构建生产者和消费者(带测试样例)
1 缘起 学习消息队列的过程中,先补习了RabbitMQ相关知识, 接着又重温了Kafka相关的知识, 发现,我并没有积累Java原生操作Kafka的文章, 只使用SpringBoot集成过Kafka ...
- java 连接kafka超时_java – Kafka KStreams – 处理超时
我试图使用< KStream> .process()与Time Windows.of("name",30000)批量处理一些KTable值并发送它们.似乎30秒超过了消 ...
- java连接kafka接收不到数据_Kafka客户端无法接收消息
我在远程机器上设置了kafka和zookeeper . 在那台机器上我可以看到下面在官方网站上使用测试方法 . > bin/kafka-console-producer.sh --broker- ...
- java 连接kafka之坑Connection to node 0 could not be established. Broker may not be available
最近学习kafka java api遇到一个特别坑的问题:<Connection to node 0 could not be established. Broker may not be av ...
- kafka 0.9 java开发_kafka 0.9 java producer and consumer demo
实验环境: kafka_2.11-0.9.0.1.tgz zookeeper-3.4.6.tar.gz 样例代码: git clone https://github.com/downgoon/hell ...
- java kafka api_kafka java API的使用
Kafka包含四种核心的API: 1.Producer API支持应用将数据流发送到Kafka集群的主题 2.Consumer API支持应用从Kafka集群的主题中读取数据流 3.Streams A ...
- 057 Java中kafka的Producer程序实现
1.需要启动的服务 这里启动的端口是9092. bin/kafka-console-consumer.sh --topic beifeng --zookeeper linux-hadoop01.ibe ...
最新文章
- 工业机器人什么情况下会出现奇点_【怎么解释机器人奇点造成的结果?】-工业-有米下载-6miu.com...
- 网页插件学javascript还是jquery好啊?
- Oracle 数据字典表 -- SYS.COL$
- box-sizing的使用
- Ubuntu安装BackExec Remote Agent for Linux
- python 字符串 4位一组_Python基础4- 字符串
- 搭建个人博客站点流程
- vs多项目模板及add-in开发
- #《机器学习》_周志华(西瓜书)南瓜书_第4章 决策树
- python连接access2007_使用Python / pyodbc插入Access DB
- 微信:item_search_seller - 搜索公众号列表
- 数据可视化编辑平台上线,小程序也能拥有可视化图层
- JOI-2016/17 春季合宿 切题记
- 回顾2007:新兴网络服务汇总(完整篇)
- webrtc janus服务器部署在公网,coturn转发媒体流
- Veri3.SDF后仿真时序检查
- python爬虫基础(二)
- ORACLE 不支持 惠普小型机,不要过度迷信小型机 转载
- 德国的吃--一篇很有意思的文章
- 计算机屏幕出现条纹w7,为什么我安装win7后,屏幕的分辨率会下降?屏幕上有一横一横的条纹?...
热门文章
- glide源码中包含了那种设计模式_推荐一个好用的拍照选图库,致敬Glide
- Jenkins之邮件通知
- mysql设置slave复制_mysql5.5建立主从复制(setupmaster-slavereplication)_MySQL
- 电源稳定性测试软件,电源稳定性测试
- 网易 for linux,NetEaseMusic
- c语言中和if语句作用相似的运算符,南开18秋学期(1709、1803、1809)《C语言程序设计》在线作业【答案】...
- 企业研发人员配备比例_企业管理人员合理配置比例
- java验证码局部刷新_JS局部刷新图形验证码
- python【力扣LeetCode算法题库】289- 生命游戏
- 公共基础选择题前10t