转载注明出处:http://blog.csdn.net/honglei915/article/details/37563647
Kafka视频教程同步首发,欢迎观看!

上篇文章中我们搭建了kafka的服务器,并可以使用Kafka的命令行工具创建topic,发送和接收消息。下面我们来搭建kafka的开发环境。
添加依赖

搭建开发环境需要引入kafka的jar包,一种方式是将Kafka安装包中lib下的jar包加入到项目的classpath中,这种比较简单了。不过我们使用另一种更加流行的方式:使用maven管理jar包依赖。
创建好maven项目后,在pom.xml中添加以下依赖:
[html] view plaincopy
  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka_2.10</artifactId>
  4. <version>0.8.2.2</version>
  5. </dependency>
添加依赖后如果有两个jar包的依赖找不到。点击这里下载这两个jar包,解压后你有两种选择,第一种是使用mvn的install命令将jar包安装到本地仓库,另一种是直接将解压后的文件夹拷贝到mvn本地仓库的com文件夹下,比如我的本地仓库是d:\mvn,完成后我的目录结构是这样的:
配置程序

更新更全的API编程实例点这里:http://blog.csdn.net/honglei915/article/details/37697655
首先是一个充当配置文件作用的接口,配置了Kafka的各种连接参数:
[java] view plaincopy
  1. package com.sohu.kafkademon;
  2. public interface KafkaProperties
  3. {
  4. final static String zkConnect = "10.22.10.139:2181";
  5. final static String groupId = "group1";
  6. final static String topic = "topic1";
  7. final static String kafkaServerURL = "10.22.10.139";
  8. final static int kafkaServerPort = 9092;
  9. final static int kafkaProducerBufferSize = 64 * 1024;
  10. final static int connectionTimeOut = 20000;
  11. final static int reconnectInterval = 10000;
  12. final static String topic2 = "topic2";
  13. final static String topic3 = "topic3";
  14. final static String clientId = "SimpleConsumerDemoClient";
  15. }
producer

[java] view plaincopy
  1. package com.sohu.kafkademon;
  2. import java.util.Properties;
  3. import kafka.producer.KeyedMessage;
  4. import kafka.producer.ProducerConfig;
  5. /**
  6. * @author leicui bourne_cui@163.com
  7. */
  8. public class KafkaProducer extends Thread
  9. {
  10. private final kafka.javaapi.producer.Producer<Integer, String> producer;
  11. private final String topic;
  12. private final Properties props = new Properties();
  13. public KafkaProducer(String topic)
  14. {
  15. props.put("serializer.class", "kafka.serializer.StringEncoder");
  16. props.put("metadata.broker.list", "10.22.10.139:9092");
  17. producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
  18. this.topic = topic;
  19. }
  20. @Override
  21. public void run() {
  22. int messageNo = 1;
  23. while (true)
  24. {
  25. String messageStr = new String("Message_" + messageNo);
  26. System.out.println("Send:" + messageStr);
  27. producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
  28. messageNo++;
  29. try {
  30. sleep(3000);
  31. } catch (InterruptedException e) {
  32. // TODO Auto-generated catch block
  33. e.printStackTrace();
  34. }
  35. }
  36. }
  37. }

consumer

[java] view plaincopy
  1. package com.sohu.kafkademon;
  2. import java.util.HashMap;
  3. import java.util.List;
  4. import java.util.Map;
  5. import java.util.Properties;
  6. import kafka.consumer.ConsumerConfig;
  7. import kafka.consumer.ConsumerIterator;
  8. import kafka.consumer.KafkaStream;
  9. import kafka.javaapi.consumer.ConsumerConnector;
  10. /**
  11. * @author leicui bourne_cui@163.com
  12. */
  13. public class KafkaConsumer extends Thread
  14. {
  15. private final ConsumerConnector consumer;
  16. private final String topic;
  17. public KafkaConsumer(String topic)
  18. {
  19. consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
  20. createConsumerConfig());
  21. this.topic = topic;
  22. }
  23. private static ConsumerConfig createConsumerConfig()
  24. {
  25. Properties props = new Properties();
  26. props.put("zookeeper.connect", KafkaProperties.zkConnect);
  27. props.put("group.id", KafkaProperties.groupId);
  28. props.put("zookeeper.session.timeout.ms", "40000");
  29. props.put("zookeeper.sync.time.ms", "200");
  30. props.put("auto.commit.interval.ms", "1000");
  31. return new ConsumerConfig(props);
  32. }
  33. @Override
  34. public void run() {
  35. Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  36. topicCountMap.put(topic, new Integer(1));
  37. Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
  38. KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
  39. ConsumerIterator<byte[], byte[]> it = stream.iterator();
  40. while (it.hasNext()) {
  41. System.out.println("receive:" + new String(it.next().message()));
  42. try {
  43. sleep(3000);
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. }
  49. }
简单的发送接收

运行下面这个程序,就可以进行简单的发送接收消息了:
[java] view plaincopy
  1. package com.sohu.kafkademon;
  2. /**
  3. * @author leicui bourne_cui@163.com
  4. */
  5. public class KafkaConsumerProducerDemo
  6. {
  7. public static void main(String[] args)
  8. {
  9. KafkaProducer producerThread = new KafkaProducer(KafkaProperties.topic);
  10. producerThread.start();
  11. KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
  12. consumerThread.start();
  13. }
  14. }
高级别的consumer

下面是比较负载的发送接收的程序:
[java] view plaincopy
  1. package com.sohu.kafkademon;
  2. import java.util.HashMap;
  3. import java.util.List;
  4. import java.util.Map;
  5. import java.util.Properties;
  6. import kafka.consumer.ConsumerConfig;
  7. import kafka.consumer.ConsumerIterator;
  8. import kafka.consumer.KafkaStream;
  9. import kafka.javaapi.consumer.ConsumerConnector;
  10. /**
  11. * @author leicui bourne_cui@163.com
  12. */
  13. public class KafkaConsumer extends Thread
  14. {
  15. private final ConsumerConnector consumer;
  16. private final String topic;
  17. public KafkaConsumer(String topic)
  18. {
  19. consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
  20. createConsumerConfig());
  21. this.topic = topic;
  22. }
  23. private static ConsumerConfig createConsumerConfig()
  24. {
  25. Properties props = new Properties();
  26. props.put("zookeeper.connect", KafkaProperties.zkConnect);
  27. props.put("group.id", KafkaProperties.groupId);
  28. props.put("zookeeper.session.timeout.ms", "40000");
  29. props.put("zookeeper.sync.time.ms", "200");
  30. props.put("auto.commit.interval.ms", "1000");
  31. return new ConsumerConfig(props);
  32. }
  33. @Override
  34. public void run() {
  35. Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
  36. topicCountMap.put(topic, new Integer(1));
  37. Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
  38. KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
  39. ConsumerIterator<byte[], byte[]> it = stream.iterator();
  40. while (it.hasNext()) {
  41. System.out.println("receive:" + new String(it.next().message()));
  42. try {
  43. sleep(3000);
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. }
  49. }

漫游kafka实战篇之搭建Kafka开发环境相关推荐

  1. 漫游Kafka实战篇之搭建Kafka运行环境

    原文地址:http://blog.csdn.net/honglei915/article/details/37564329 Kafka视频教程同步首发,欢迎观看! 接下来一步一步搭建Kafka运行环境 ...

  2. 上手阿里云服务器(一)——搭建LAMP开发环境、防火墙、文件传输

    轻量应用服务器的搭建 搭建LAMP开发环境 防火墙规则 上传Web文件 参考文档:阿里云云服务器官方文档 参考视频:b站教程视频 为了更简单地入门云计算,所以选择使用轻量应用服务器(有别于云服务器EC ...

  3. kafka实战篇(二):消息消费实战

    写在前面:我是「且听风吟」,目前是某上市游戏公司的大数据开发工程师,热爱大数据开源技术,喜欢分享自己的所学所悟,现阶段正在从头梳理大数据体系的知识,以后将会把时间重点放在Spark和Flink上面. ...

  4. python ai应用开发_AI应用开发实战 - 从零开始搭建macOS开发环境

    AI应用开发实战 - 从零开始搭建macOS开发环境 联系我们 OpenmindChina@microsoft.com 零.前提条件 一台能联网的电脑,使用macOS操作系统 请确保鼠标.键盘.显示器 ...

  5. AI应用开发实战系列之二:从零开始搭建macOS开发环境

    AI应用开发实战 - 从零开始搭建macOS开发环境 本视频配套的视频教程请访问:https://www.bilibili.com/video/av24368929/ 零.前提条件 一台能联网的电脑, ...

  6. 【kratos入门实战教程】1-kratos项目搭建和开发环境配置

    1.系列目录 [kratos入门实战教程]0-商城项目介绍 [kratos入门实战教程]1-kratos项目搭建和开发环境配置 [kratos入门实战教程]2-实现注册登陆业务 2.概览 经过上一篇的 ...

  7. linux pip3使用清华源_Linux实战016:Ubuntu搭建python开发环境

    我们在安装Ubuntu系统的时候会自带安装python2.7和python3.6版本的Python解释器,直接执行"ptyhon"默认运行的是python2.7,只有执行" ...

  8. ubuntu下搭建android开发环境(四)核心篇安装AndroidStudio、sdk、jdk

    [置顶] ubuntu下搭建android开发环境(四)核心篇安装AndroidStudio.sdk.jdk(by 星空武哥) <div class="article_manage c ...

  9. 从零开始vim搭建Java开发环境之coc.nvim 篇

    前言 vim之美妙我就不过多介绍了,懂的自然懂.之前我已经有一篇文章介绍如何使用SpaceVim来搭建Java开发环境. 传送门:<从零开始vim搭建Java开发环境[视频]> 最近使用c ...

最新文章

  1. java前台传多个id用什么接收_前端js传多个id 到java后台的处理方式
  2. chcon命令 selinux 配置等
  3. Element 'dependency' cannot have character [children], because the type's content type is element-on
  4. react怎么存上一页_【React】存储全局数据
  5. 窗口捕获显示黑屏_win10每次重启黑屏假死
  6. 关于网页导航栏制作的几种方法与常见问题解决(新人向)
  7. 【快报】基于K2 BPM的新一代协同办公门户实践交流会
  8. Maven打包自动发布到nexus私服
  9. 02-linux下 yum安装R环境和Rserve安装
  10. excel用警员姓名查找警号信息
  11. 【电路仿真】基于matlab BP神经网络三相逆变器故障诊断【含Matlab源码 1655期】
  12. 【技术维新 践行精彩】浅谈未来网站的构建
  13. 有道词典“网络已断开”的解决办法
  14. 华为手机Android studio 配置ADB wifi 调试
  15. 在中琅条码打印软件中怎样实现CMYK的设置
  16. 每天定投10元基金有意义吗?
  17. Halcon 错误 提示 2021 System clock has been set back 解决方法
  18. Balsamiq新的感觉
  19. Eclipse中格式化JS、HTML代码
  20. ERC721: Non-fungible Token Standard

热门文章

  1. 在jsx中绑定js表达式以及jsx注释
  2. 数据库-优化-SQL及索引优化
  3. 模板打印:代码实现和总结
  4. 请使用日期时间相关的API,计算出一个人已经出生了多少天
  5. zabbix监控mysql的性能_zabbix2.4.2实战监控mysql5.6性能
  6. java cmd找不到文件_cmd中输入java找不到文件解决方法
  7. PyTorch深度学习实践02
  8. Eclipse-Java代码规范和质量检查插件-PMD
  9. kill -0 pid是做什么用的?
  10. Socket-Client通信