Kafka--JAVA API(Producer和Consumer)

Kafka 版本2.11-0.9.0.0

producer

package com.yzy.spark.kafka;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

import java.util.Properties;

/**

* Producer

*/

public class KafkaProducer extends Thread{

private String topic;

//--1

private Producer producer;

public KafkaProducer(String topic){

this.topic=topic;

Properties properties = new Properties(); //--2

properties.put("metadata.broker.list",KafkaProperties.BROKER_LIST);

properties.put("serializer.class","kafka.serializer.StringEncoder");

properties.put("request.require.acks","1");

// properties.put("partitioner.class","com.yzy.spark.kafka.MyPartition");

ProducerConfig config=new ProducerConfig(properties);

producer=new Producer<>(config);

}

@Override

public void run() {

int messageNo=1;

while (true){

String message="message"+ messageNo;

producer.send(new KeyedMessage("test2",String.valueOf(messageNo),message)); //--4

System.out.println("Sent:"+message);

messageNo++;

try {

Thread.sleep(1000);

}catch (InterruptedException e) {

e.printStackTrace();

}

}

}

}

1.定义Producer对象,这里要注意泛型类型,之后的KeyedMessage的泛型类型和Producer相同。

2.创建Producer对象需要传入一个ProducerConfig对象,而ProducerConfig对象需要由Properties对象构造,properties的属性设置可以查看ProducerConfig源码,注释很详细(个别属性在ProducerConfig父类AsyncProducerConfig 和 SyncProducerConfigShared中)。

3.该属性可以指定partitioner,如果不设置默认会设为kafka.producer.DefaultPartitioner。

4.看看KeyedMessage的源码:

case class KeyedMessage[K, V](topic: String, key: K, partKey: Any, message: V) {

if(topic == null)

throw new IllegalArgumentException("Topic cannot be null.")

def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)

def this(topic: String, key: K, message: V) = this(topic, key, key, message)

def partitionKey = {

if(partKey != null)

partKey

else if(hasKey)

key

else

null

}

def hasKey = key != null

}

参数有4个,topic必填,message是消息,通常只填这两个参数即可发送消息。key和partKey是用于partition的参数,partKey的优先级高于key,但是partKey只对当前消息起作用,key和partKey只能是String类型。下面来看看partition策略和key。

partition

先在服务器端将topic test2的partitions设定为3

kafka-topics.sh --alter --zookeeper localhost:2181 --partitions 3 --topic test2

然后回到客户端看看kafka.producer.DefaultPartitioner源码

package kafka.producer

import kafka.utils._

import org.apache.kafka.common.utils.Utils

class DefaultPartitioner(props: VerifiableProperties = null) extends Partitioner {

private val random = new java.util.Random

def partition(key: Any, numPartitions: Int): Int = {

Utils.abs(key.hashCode) % numPartitions

}

}

该类有一个方法 def partition(key: Any, numPartitions: Int),第一个参数为上文所说的key或partKey,第二个为partitions的数量,传入的值就是在服务器设置的值(3),将key的hashCode对numPartitions取余得到结果(选择对应编号的partition)

我们可以自己定义一个partition.class并配置到properties属性中,这里给一个简单的例子:

package com.yzy.spark.kafka;

import kafka.producer.Partitioner;

import kafka.utils.VerifiableProperties;

public class MyPartition implements Partitioner {

public MyPartition(VerifiableProperties properties){

}

@Override

public int partition(Object key, int numPartitions) {

System.out.println("numPartitions:"+numPartitions);

return key.hashCode()%numPartitions;

}

}

Consumer

package com.yzy.spark.kafka;

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.ConsumerIterator;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

public class KafkaConsumer extends Thread{

private String topic;

private String groupId;

public KafkaConsumer(String topic,String groupId){

this.topic=topic;

this.groupId=groupId;

}

private ConsumerConnector createConnector(){

Properties properties=new Properties();//--1

properties.put("zookeeper.connect",KafkaProperties.ZK);

properties.put("group.id",groupId);

properties.put("auto.offset.reset", "largest");//--2

ConsumerConfig consumerConfig = new ConsumerConfig(properties);

return Consumer.createJavaConsumerConnector(consumerConfig);

}

@Override

public void run() {

ConsumerConnector consumerConnector=this.createConnector();

Map topicCountMap=new HashMap<>();

topicCountMap.put(topic,1);

Map>> messageStreams = consumerConnector.createMessageStreams(topicCountMap);

KafkaStream stream = messageStreams.get(topic).get(0);

ConsumerIterator iterator = stream.iterator();

while(iterator.hasNext()){

String message=new String(iterator.next().message());

}

}

}

Consumer相关的东西比较多,涉及到group和partition机制,以官方文档为准。

1.properties和producer一样看源码配置。

2.这个属性和shell命令中的--from-beginning对应。可以填smallest(从头读取)和largest(默认值,读取最新的元素,严格来说是最新的offset位置开始读取)。注意:每一次一个新的consumer试图去消费一个topic时,都是从所在group的largest offset位置读取,即也可通过设置group.id来实现from-beginning,只要将每个consumer的group.id都设置为一个新值即可,例如properties.put("group.id", UUID.randomUUID().toString());

java连接kafka api_Kafka-JavaAPI(Producer And Consumer)相关推荐

  1. 最新Kafka教程(包含kafka部署与基本操作、java连接kafka、spring连接kafka以及使用springboot)

    最新Kafka教程(包含kafka部署与基本操作.java连接kafka.spring连接kafka以及使用springboot) 欢迎转载,转载请注明网址:https://blog.csdn.net ...

  2. nodejs链接kafka示例(producer、consumer)

    2019独角兽企业重金招聘Python工程师标准>>> 安装node环境: wget https://nodejs.org/dist/v6.10.3/node-v6.10.3-lin ...

  3. 详细讲解如何使用Java连接Kafka构建生产者和消费者(带测试样例)

    1 缘起 学习消息队列的过程中,先补习了RabbitMQ相关知识, 接着又重温了Kafka相关的知识, 发现,我并没有积累Java原生操作Kafka的文章, 只使用SpringBoot集成过Kafka ...

  4. java 连接kafka超时_java – Kafka KStreams – 处理超时

    我试图使用< KStream> .process()与Time Windows.of("name",30000)批量处理一些KTable值并发送它们.似乎30秒超过了消 ...

  5. java连接kafka接收不到数据_Kafka客户端无法接收消息

    我在远程机器上设置了kafka和zookeeper . 在那台机器上我可以看到下面在官方网站上使用测试方法 . > bin/kafka-console-producer.sh --broker- ...

  6. java 连接kafka之坑Connection to node 0 could not be established. Broker may not be available

    最近学习kafka java api遇到一个特别坑的问题:<Connection to node 0 could not be established. Broker may not be av ...

  7. kafka 0.9 java开发_kafka 0.9 java producer and consumer demo

    实验环境: kafka_2.11-0.9.0.1.tgz zookeeper-3.4.6.tar.gz 样例代码: git clone https://github.com/downgoon/hell ...

  8. java kafka api_kafka java API的使用

    Kafka包含四种核心的API: 1.Producer API支持应用将数据流发送到Kafka集群的主题 2.Consumer API支持应用从Kafka集群的主题中读取数据流 3.Streams A ...

  9. 057 Java中kafka的Producer程序实现

    1.需要启动的服务 这里启动的端口是9092. bin/kafka-console-consumer.sh --topic beifeng --zookeeper linux-hadoop01.ibe ...

最新文章

  1. 工业机器人什么情况下会出现奇点_【怎么解释机器人奇点造成的结果?】-工业-有米下载-6miu.com...
  2. 网页插件学javascript还是jquery好啊?
  3. Oracle 数据字典表 -- SYS.COL$
  4. box-sizing的使用
  5. Ubuntu安装BackExec Remote Agent for Linux
  6. python 字符串 4位一组_Python基础4- 字符串
  7. 搭建个人博客站点流程
  8. vs多项目模板及add-in开发
  9. #《机器学习》_周志华(西瓜书)南瓜书_第4章 决策树
  10. python连接access2007_使用Python / pyodbc插入Access DB
  11. 微信:item_search_seller - 搜索公众号列表
  12. 数据可视化编辑平台上线,小程序也能拥有可视化图层
  13. JOI-2016/17 春季合宿 切题记
  14. 回顾2007:新兴网络服务汇总(完整篇)
  15. webrtc janus服务器部署在公网,coturn转发媒体流
  16. Veri3.SDF后仿真时序检查
  17. python爬虫基础(二)
  18. ORACLE 不支持 惠普小型机,不要过度迷信小型机 转载
  19. 德国的吃--一篇很有意思的文章
  20. 计算机屏幕出现条纹w7,为什么我安装win7后,屏幕的分辨率会下降?屏幕上有一横一横的条纹?...

热门文章

  1. glide源码中包含了那种设计模式_推荐一个好用的拍照选图库,致敬Glide
  2. Jenkins之邮件通知
  3. mysql设置slave复制_mysql5.5建立主从复制(setupmaster-slavereplication)_MySQL
  4. 电源稳定性测试软件,电源稳定性测试
  5. 网易 for linux,NetEaseMusic
  6. c语言中和if语句作用相似的运算符,南开18秋学期(1709、1803、1809)《C语言程序设计》在线作业【答案】...
  7. 企业研发人员配备比例_企业管理人员合理配置比例
  8. java验证码局部刷新_JS局部刷新图形验证码
  9. python【力扣LeetCode算法题库】289- 生命游戏
  10. 公共基础选择题前10t