kafka 2.10 java api_kafka中常用API的简单JAVA代码
通过之前《kafka分布式消息队列介绍以及集群安装》的介绍,对kafka有了初步的了解。本文主要讲述java代码中常用的操作。
准备:增加kafka依赖
org.apache.kafka
kafka-clients
0.10.2.0
一、kafka中对topic的操作
package org.kafka;
import kafka.admin.DeleteTopicCommand;
import kafka.admin.TopicCommand;
/**
* kafka主题操作
*/
public class TopicDemo {
/**
* 添加主题
* linux命令:bin/kafka-topics.sh --create --zookeeper 192.168.2.100:2181 --replication-factor 3 --partitions 1 --topic topictest0416
*/
public static void createTopic() {
String[] options = new String[] {
"--create",
"--zookeeper",
"192.168.2.100:2181",
"--replication-factor",
"3",
"--partitions",
"1",
"--topic",
"topictest0416" };
TopicCommand.main(options);
}
/**
* 查询所有主题
* linux命令:bin/kafka-topics.sh --list --zookeeper 192.168.2.100:2181
*/
public static void queryTopic() {
String[] options = new String[] {
"--list",
"--zookeeper",
"192.168.2.100:2181" };
TopicCommand.main(options);
}
/**
* 查看指定主题的分区及副本状态信息
* bin/kafka-topics.sh --describe --zookeeper 192.168.2.100:2181 --topic topictest0416
*/
public static void queryTopicByName() {
String[] options = new String[]{
"--describe",
"--zookeeper",
"192.168.2.100:2181",
"--topic",
"topictest0416",
};
TopicCommand.main(options);
}
/**
* 修改主题
* linux命令:bin/kafka-topics.sh --zookeeper 192.168.2.100:2181 --alter --topic topictest0416 --partitions 3
*/
public static void alterTopic() {
String[] options = new String[]{
"--alter",
"--zookeeper",
"192.168.2.100:2181",
"--topic",
"topictest0416",
"--partitions",
"3"
};
TopicCommand.main(options);
}
/**
* 删除主题
*/
public static void delTopic() {
String[] options = new String[] {
"--zookeeper",
"192.168.2.100:2181",
"--topic",
"topictest0416" };
DeleteTopicCommand.main(options);
}
}
二、Producer代码
package org.kafka;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class ProducerDemo {
public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
//zookeeper集群列表
props.put("zk.connect", "hadoop1-1:2181,hadoop1-2:2181,hadoop1-3:2181");
props.put("metadata.broker.list", "hadoop1-1:9092,hadoop1-2:9092,hadoop1-3:9092");
//设置消息使用哪个类来序列化
props.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config = new ProducerConfig(props);
//构造Producer对象
Producer producer = new Producer(config);
// 发送业务消息
// 读取文件 读取内存数据库
for (int i = 0; i < 10; i++) {
Thread.sleep(500);
KeyedMessage km = new KeyedMessage("topictest0416", "I am a producer " + i + " hello!");
producer.send(km);
}
}
}
三、consumer代码
package org.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
public class ConsumerDemo {
private static final String topic = "topictest0416";
private static final Integer threads = 1;
public static void main(String[] args) {
Properties props = new Properties();
//zookeeper集群列表
props.put("zookeeper.connect", "hadoop1-1:2181,hadoop1-2:2181,hadoop1-3:2181");
//消费者组ID
props.put("group.id", "001");
//设置读取的偏移量;smallest意思是指向最小的偏移量
props.put("auto.offset.reset", "smallest");
//将Properties封装成消费者配置对象
ConsumerConfig config = new ConsumerConfig(props);
ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
Map topicMap = new HashMap<>();
//key为消费的topic
//value为消费的线程数量
topicMap.put(topic, threads);
Map>> consumerMap = consumer.createMessageStreams(topicMap);
List> streams = consumerMap.get(topic);
for (final KafkaStream kafkaStream : streams) {
new Thread(new Runnable() {
@Override
public void run() {
for (MessageAndMetadata mm : kafkaStream) {
System.out.println(new String(mm.message()));
}
}
}).start();
}
}
}
四、测试
先启动Consumer,再启动Producer
测试结果:
kafka 2.10 java api_kafka中常用API的简单JAVA代码相关推荐
- java中常用API、Scanner类、匿名对象、Random类、ArrayList类、对象数组
java中常用API: API:Application Programming Interface,应用程序编程接口.Java API是JDK中提供给我们使用的类的说明文档.这些类将底层的代码实现封装 ...
- 【网络安全入门大总结】—Java语言中常用的渗透漏洞大汇总
Java语言中常用的漏洞大汇总,建议收藏. 准备好,上课了~~~ 目录 Servlet 简介 生命周期为 接口 Struts 2 . 简介 请求流程 相关CVE Spring 简介 Spring MV ...
- SiKi学院 Unity中常用api学习笔记(001-014)
Api 应用程序编程接口 前言 笔记是看siki学院中<Unity中常用api>的学习笔记 课程地址: http://www.sikiedu.com/my/course/59 强烈推荐大 ...
- Java类TreeMap常用API补充
TreeMap常用API补充 TreeMap常用API详解 package TreeMap_UtilityClass;import java.util.*;/*** TreeMap常用API补充*/ ...
- Java开发中常用的设计模式-单例模式
单例(Singleton)模式的定义:指一个类只有一个实例,且该类能自行创建这个实例的一种模式. Java开发中常用的设计模式-单例模式 单例模式有3个特点: 单例类只有一个实例对象: 该单例对象必须 ...
- JAVA 开发中常用的工具有哪些?
Java开发中常用的工具有以下几种: Eclipse:一款非常流行的开发工具,提供了很多方便的功能,如代码自动补全.调试.版本控制等. IntelliJ IDEA:一款功能强大的Java集成开发环境, ...
- SiKi学院 Unity中常用api学习笔记(015-019)
Api 应用程序编程接口 前言 笔记是看siki学院中<Unity中常用api>的学习笔记 课程地址: http://www.sikiedu.com/my/course/59 强烈推荐大 ...
- java开发中常用的Git命令详解
java开发中常用的Git命令详解(IDEA内如何操作) 一:写这篇文章的目的是什么? 二:使用场景在哪里? 1:当我们要使用idea去git仓库拉代码时,首先我们的idea得配置git工具 2:项目 ...
- ASP.NET 程序中常用的三十三种代码(1)
ASP.NET 程序中常用的三十三种代码(1) 1. 打开新的窗口并传送参数: 传送参数: response.write("<script>window.open('*.ASPx ...
- 如何在Java项目中查找未使用/无效的代码
本文翻译自:How to find unused/dead code in java projects What tools do you use to find unused/dead code i ...
最新文章
- Oracle表记录字节长长度的两种计算方式
- 科大星云诗社动态20210316
- usleep延时0.毫秒_LabVIEW从0到1系列视频培训_第4讲全集_操作例程说明
- Facebook 最新可佩戴 AR 设备、AR 设备未来五年市场扩张、语音社交新创Swell等|Decode the Week...
- 用WindowManager实现Android悬浮框以及拖动事件
- 上帝造题的七分钟(ybtoj-树状数组)
- virtualbox+oracle linux 6.3 下安装oracle 11.2.3.0
- 异构计算:PC的“动车组”
- opencv 显示图片失败,全灰
- deep learning 学习资料
- 购买周期 python-用Python实现一个基于EG协整法的跨周期套利策略
- mkdir命令、chmod修改权限、利用scp 远程上传下载文件/文件夹
- React 模板封装之基础模板 BaseTable
- 使用httpclient必须知道的参数设置及代码写法、存在的风险
- 做外贸怎么起步,从哪里开发客户
- c语言中用什么表示真假,C语言中如何表示“真”和“假”?系统如何判断一个量的“真”和“假”?...
- 提高影响力:职权+威信
- 第14章 数据分析案例--Python for Data Analysis 2nd
- Could not find artifact com.dingtalk.api:top-api-sdk-dev:pom:ding-open-mc-SNAPSHOT in aliyunmaven
- Qt Linguist(语言家)与QtCreator集成
热门文章
- 【车间调度】基于matlab粒子群算法求解车间生产调度问题【含Matlab源码 245期】
- 【雷达通信】基于matlab Omiga-K算法SAR回波生成和成像【含Matlab源码 1184期】
- 【图像处理】基于matlab GUI自动报靶系统(重弹孔)【含Matlab源码 973期】
- 【电路仿真】基于matlab Simulink四旋翼PID控制【含Matlab源码 454期】
- lstm预测单词_从零开始理解单词嵌入| LSTM模型|
- 计算机应用基础统考模拟练习系统,网教计算机应用基础统考综合模拟练习题(一)...
- java考ccf_CCF考试试题之门禁系统java解题代码
- mysql 导入导出 备份_MySQL - 数据备份与还原(导出导入)
- android 按钮复用,Android Button 自带阴影效果另一种解决办法
- matlab三维矩阵的运算符,【求助】多维矩阵求和运算!!