通过之前《kafka分布式消息队列介绍以及集群安装》的介绍,对kafka有了初步的了解。本文主要讲述java代码中常用的操作。

准备:增加kafka依赖

org.apache.kafka

kafka-clients

0.10.2.0

一、kafka中对topic的操作

package org.kafka;

import kafka.admin.DeleteTopicCommand;

import kafka.admin.TopicCommand;

/**

* kafka主题操作

*/

public class TopicDemo {

/**

* 添加主题

* linux命令:bin/kafka-topics.sh --create --zookeeper 192.168.2.100:2181 --replication-factor 3 --partitions 1 --topic topictest0416

*/

public static void createTopic() {

String[] options = new String[] {

"--create",

"--zookeeper",

"192.168.2.100:2181",

"--replication-factor",

"3",

"--partitions",

"1",

"--topic",

"topictest0416" };

TopicCommand.main(options);

}

/**

* 查询所有主题

* linux命令:bin/kafka-topics.sh --list --zookeeper 192.168.2.100:2181

*/

public static void queryTopic() {

String[] options = new String[] {

"--list",

"--zookeeper",

"192.168.2.100:2181" };

TopicCommand.main(options);

}

/**

* 查看指定主题的分区及副本状态信息

* bin/kafka-topics.sh --describe --zookeeper 192.168.2.100:2181 --topic topictest0416

*/

public static void queryTopicByName() {

String[] options = new String[]{

"--describe",

"--zookeeper",

"192.168.2.100:2181",

"--topic",

"topictest0416",

};

TopicCommand.main(options);

}

/**

* 修改主题

* linux命令:bin/kafka-topics.sh --zookeeper 192.168.2.100:2181 --alter --topic topictest0416 --partitions 3

*/

public static void alterTopic() {

String[] options = new String[]{

"--alter",

"--zookeeper",

"192.168.2.100:2181",

"--topic",

"topictest0416",

"--partitions",

"3"

};

TopicCommand.main(options);

}

/**

* 删除主题

*/

public static void delTopic() {

String[] options = new String[] {

"--zookeeper",

"192.168.2.100:2181",

"--topic",

"topictest0416" };

DeleteTopicCommand.main(options);

}

}

二、Producer代码

package org.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

public class ProducerDemo {

public static void main(String[] args) throws InterruptedException {

Properties props = new Properties();

//zookeeper集群列表

props.put("zk.connect", "hadoop1-1:2181,hadoop1-2:2181,hadoop1-3:2181");

props.put("metadata.broker.list", "hadoop1-1:9092,hadoop1-2:9092,hadoop1-3:9092");

//设置消息使用哪个类来序列化

props.put("serializer.class", "kafka.serializer.StringEncoder");

ProducerConfig config = new ProducerConfig(props);

//构造Producer对象

Producer producer = new Producer(config);

// 发送业务消息

// 读取文件 读取内存数据库

for (int i = 0; i < 10; i++) {

Thread.sleep(500);

KeyedMessage km = new KeyedMessage("topictest0416", "I am a producer " + i + " hello!");

producer.send(km);

}

}

}

三、consumer代码

package org.kafka;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.Properties;

import kafka.consumer.Consumer;

import kafka.consumer.ConsumerConfig;

import kafka.consumer.KafkaStream;

import kafka.javaapi.consumer.ConsumerConnector;

import kafka.message.MessageAndMetadata;

public class ConsumerDemo {

private static final String topic = "topictest0416";

private static final Integer threads = 1;

public static void main(String[] args) {

Properties props = new Properties();

//zookeeper集群列表

props.put("zookeeper.connect", "hadoop1-1:2181,hadoop1-2:2181,hadoop1-3:2181");

//消费者组ID

props.put("group.id", "001");

//设置读取的偏移量;smallest意思是指向最小的偏移量

props.put("auto.offset.reset", "smallest");

//将Properties封装成消费者配置对象

ConsumerConfig config = new ConsumerConfig(props);

ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);

Map topicMap = new HashMap<>();

//key为消费的topic

//value为消费的线程数量

topicMap.put(topic, threads);

Map>> consumerMap = consumer.createMessageStreams(topicMap);

List> streams = consumerMap.get(topic);

for (final KafkaStream kafkaStream : streams) {

new Thread(new Runnable() {

@Override

public void run() {

for (MessageAndMetadata mm : kafkaStream) {

System.out.println(new String(mm.message()));

}

}

}).start();

}

}

}

四、测试

先启动Consumer,再启动Producer

测试结果:

kafka 2.10 java api_kafka中常用API的简单JAVA代码相关推荐

  1. java中常用API、Scanner类、匿名对象、Random类、ArrayList类、对象数组

    java中常用API: API:Application Programming Interface,应用程序编程接口.Java API是JDK中提供给我们使用的类的说明文档.这些类将底层的代码实现封装 ...

  2. 【网络安全入门大总结】—Java语言中常用的渗透漏洞大汇总

    Java语言中常用的漏洞大汇总,建议收藏. 准备好,上课了~~~ 目录 Servlet 简介 生命周期为 接口 Struts 2 . 简介 请求流程 相关CVE Spring 简介 Spring MV ...

  3. SiKi学院 Unity中常用api学习笔记(001-014)

    Api 应用程序编程接口 前言 笔记是看siki学院中<Unity中常用api>的学习笔记 课程地址:  http://www.sikiedu.com/my/course/59 强烈推荐大 ...

  4. Java类TreeMap常用API补充

    TreeMap常用API补充 TreeMap常用API详解 package TreeMap_UtilityClass;import java.util.*;/*** TreeMap常用API补充*/ ...

  5. Java开发中常用的设计模式-单例模式

    单例(Singleton)模式的定义:指一个类只有一个实例,且该类能自行创建这个实例的一种模式. Java开发中常用的设计模式-单例模式 单例模式有3个特点: 单例类只有一个实例对象: 该单例对象必须 ...

  6. JAVA 开发中常用的工具有哪些?

    Java开发中常用的工具有以下几种: Eclipse:一款非常流行的开发工具,提供了很多方便的功能,如代码自动补全.调试.版本控制等. IntelliJ IDEA:一款功能强大的Java集成开发环境, ...

  7. SiKi学院 Unity中常用api学习笔记(015-019)

    Api 应用程序编程接口 前言 笔记是看siki学院中<Unity中常用api>的学习笔记 课程地址:  http://www.sikiedu.com/my/course/59 强烈推荐大 ...

  8. java开发中常用的Git命令详解

    java开发中常用的Git命令详解(IDEA内如何操作) 一:写这篇文章的目的是什么? 二:使用场景在哪里? 1:当我们要使用idea去git仓库拉代码时,首先我们的idea得配置git工具 2:项目 ...

  9. ASP.NET 程序中常用的三十三种代码(1)

    ASP.NET 程序中常用的三十三种代码(1) 1. 打开新的窗口并传送参数: 传送参数: response.write("<script>window.open('*.ASPx ...

  10. 如何在Java项目中查找未使用/无效的代码

    本文翻译自:How to find unused/dead code in java projects What tools do you use to find unused/dead code i ...

最新文章

  1. Oracle表记录字节长长度的两种计算方式
  2. 科大星云诗社动态20210316
  3. usleep延时0.毫秒_LabVIEW从0到1系列视频培训_第4讲全集_操作例程说明
  4. Facebook 最新可佩戴 AR 设备、AR 设备未来五年市场扩张、语音社交新创Swell等|Decode the Week...
  5. 用WindowManager实现Android悬浮框以及拖动事件
  6. 上帝造题的七分钟(ybtoj-树状数组)
  7. virtualbox+oracle linux 6.3 下安装oracle 11.2.3.0
  8. 异构计算:PC的“动车组”
  9. opencv 显示图片失败,全灰
  10. deep learning 学习资料
  11. 购买周期 python-用Python实现一个基于EG协整法的跨周期套利策略
  12. mkdir命令、chmod修改权限、利用scp 远程上传下载文件/文件夹
  13. React 模板封装之基础模板 BaseTable
  14. 使用httpclient必须知道的参数设置及代码写法、存在的风险
  15. 做外贸怎么起步,从哪里开发客户
  16. c语言中用什么表示真假,C语言中如何表示“真”和“假”?系统如何判断一个量的“真”和“假”?...
  17. 提高影响力:职权+威信
  18. 第14章 数据分析案例--Python for Data Analysis 2nd
  19. Could not find artifact com.dingtalk.api:top-api-sdk-dev:pom:ding-open-mc-SNAPSHOT in aliyunmaven
  20. Qt Linguist(语言家)与QtCreator集成

热门文章

  1. 【车间调度】基于matlab粒子群算法求解车间生产调度问题【含Matlab源码 245期】
  2. 【雷达通信】基于matlab Omiga-K算法SAR回波生成和成像【含Matlab源码 1184期】
  3. 【图像处理】基于matlab GUI自动报靶系统(重弹孔)【含Matlab源码 973期】
  4. 【电路仿真】基于matlab Simulink四旋翼PID控制【含Matlab源码 454期】
  5. lstm预测单词_从零开始理解单词嵌入| LSTM模型|
  6. 计算机应用基础统考模拟练习系统,网教计算机应用基础统考综合模拟练习题(一)...
  7. java考ccf_CCF考试试题之门禁系统java解题代码
  8. mysql 导入导出 备份_MySQL - 数据备份与还原(导出导入)
  9. android 按钮复用,Android Button 自带阴影效果另一种解决办法
  10. matlab三维矩阵的运算符,【求助】多维矩阵求和运算!!