漫游kafka实战篇之搭建Kafka开发环境
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.10</artifactId>
- <version>0.8.2.2</version>
- </dependency>
- package com.sohu.kafkademon;
- public interface KafkaProperties
- {
- final static String zkConnect = "10.22.10.139:2181";
- final static String groupId = "group1";
- final static String topic = "topic1";
- final static String kafkaServerURL = "10.22.10.139";
- final static int kafkaServerPort = 9092;
- final static int kafkaProducerBufferSize = 64 * 1024;
- final static int connectionTimeOut = 20000;
- final static int reconnectInterval = 10000;
- final static String topic2 = "topic2";
- final static String topic3 = "topic3";
- final static String clientId = "SimpleConsumerDemoClient";
- }
- package com.sohu.kafkademon;
- import java.util.Properties;
- import kafka.producer.KeyedMessage;
- import kafka.producer.ProducerConfig;
- /**
- * @author leicui bourne_cui@163.com
- */
- public class KafkaProducer extends Thread
- {
- private final kafka.javaapi.producer.Producer<Integer, String> producer;
- private final String topic;
- private final Properties props = new Properties();
- public KafkaProducer(String topic)
- {
- props.put("serializer.class", "kafka.serializer.StringEncoder");
- props.put("metadata.broker.list", "10.22.10.139:9092");
- producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
- this.topic = topic;
- }
- @Override
- public void run() {
- int messageNo = 1;
- while (true)
- {
- String messageStr = new String("Message_" + messageNo);
- System.out.println("Send:" + messageStr);
- producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
- messageNo++;
- try {
- sleep(3000);
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
- }
- }
- package com.sohu.kafkademon;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Properties;
- import kafka.consumer.ConsumerConfig;
- import kafka.consumer.ConsumerIterator;
- import kafka.consumer.KafkaStream;
- import kafka.javaapi.consumer.ConsumerConnector;
- /**
- * @author leicui bourne_cui@163.com
- */
- public class KafkaConsumer extends Thread
- {
- private final ConsumerConnector consumer;
- private final String topic;
- public KafkaConsumer(String topic)
- {
- consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
- createConsumerConfig());
- this.topic = topic;
- }
- private static ConsumerConfig createConsumerConfig()
- {
- Properties props = new Properties();
- props.put("zookeeper.connect", KafkaProperties.zkConnect);
- props.put("group.id", KafkaProperties.groupId);
- props.put("zookeeper.session.timeout.ms", "40000");
- props.put("zookeeper.sync.time.ms", "200");
- props.put("auto.commit.interval.ms", "1000");
- return new ConsumerConfig(props);
- }
- @Override
- public void run() {
- Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
- topicCountMap.put(topic, new Integer(1));
- Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
- KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
- ConsumerIterator<byte[], byte[]> it = stream.iterator();
- while (it.hasNext()) {
- System.out.println("receive:" + new String(it.next().message()));
- try {
- sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- package com.sohu.kafkademon;
- /**
- * @author leicui bourne_cui@163.com
- */
- public class KafkaConsumerProducerDemo
- {
- public static void main(String[] args)
- {
- KafkaProducer producerThread = new KafkaProducer(KafkaProperties.topic);
- producerThread.start();
- KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
- consumerThread.start();
- }
- }
- package com.sohu.kafkademon;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- import java.util.Properties;
- import kafka.consumer.ConsumerConfig;
- import kafka.consumer.ConsumerIterator;
- import kafka.consumer.KafkaStream;
- import kafka.javaapi.consumer.ConsumerConnector;
- /**
- * @author leicui bourne_cui@163.com
- */
- public class KafkaConsumer extends Thread
- {
- private final ConsumerConnector consumer;
- private final String topic;
- public KafkaConsumer(String topic)
- {
- consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
- createConsumerConfig());
- this.topic = topic;
- }
- private static ConsumerConfig createConsumerConfig()
- {
- Properties props = new Properties();
- props.put("zookeeper.connect", KafkaProperties.zkConnect);
- props.put("group.id", KafkaProperties.groupId);
- props.put("zookeeper.session.timeout.ms", "40000");
- props.put("zookeeper.sync.time.ms", "200");
- props.put("auto.commit.interval.ms", "1000");
- return new ConsumerConfig(props);
- }
- @Override
- public void run() {
- Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
- topicCountMap.put(topic, new Integer(1));
- Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
- KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
- ConsumerIterator<byte[], byte[]> it = stream.iterator();
- while (it.hasNext()) {
- System.out.println("receive:" + new String(it.next().message()));
- try {
- sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
漫游kafka实战篇之搭建Kafka开发环境相关推荐
- 漫游Kafka实战篇之搭建Kafka运行环境
原文地址:http://blog.csdn.net/honglei915/article/details/37564329 Kafka视频教程同步首发,欢迎观看! 接下来一步一步搭建Kafka运行环境 ...
- 上手阿里云服务器(一)——搭建LAMP开发环境、防火墙、文件传输
轻量应用服务器的搭建 搭建LAMP开发环境 防火墙规则 上传Web文件 参考文档:阿里云云服务器官方文档 参考视频:b站教程视频 为了更简单地入门云计算,所以选择使用轻量应用服务器(有别于云服务器EC ...
- kafka实战篇(二):消息消费实战
写在前面:我是「且听风吟」,目前是某上市游戏公司的大数据开发工程师,热爱大数据开源技术,喜欢分享自己的所学所悟,现阶段正在从头梳理大数据体系的知识,以后将会把时间重点放在Spark和Flink上面. ...
- python ai应用开发_AI应用开发实战 - 从零开始搭建macOS开发环境
AI应用开发实战 - 从零开始搭建macOS开发环境 联系我们 OpenmindChina@microsoft.com 零.前提条件 一台能联网的电脑,使用macOS操作系统 请确保鼠标.键盘.显示器 ...
- AI应用开发实战系列之二:从零开始搭建macOS开发环境
AI应用开发实战 - 从零开始搭建macOS开发环境 本视频配套的视频教程请访问:https://www.bilibili.com/video/av24368929/ 零.前提条件 一台能联网的电脑, ...
- 【kratos入门实战教程】1-kratos项目搭建和开发环境配置
1.系列目录 [kratos入门实战教程]0-商城项目介绍 [kratos入门实战教程]1-kratos项目搭建和开发环境配置 [kratos入门实战教程]2-实现注册登陆业务 2.概览 经过上一篇的 ...
- linux pip3使用清华源_Linux实战016:Ubuntu搭建python开发环境
我们在安装Ubuntu系统的时候会自带安装python2.7和python3.6版本的Python解释器,直接执行"ptyhon"默认运行的是python2.7,只有执行" ...
- ubuntu下搭建android开发环境(四)核心篇安装AndroidStudio、sdk、jdk
[置顶] ubuntu下搭建android开发环境(四)核心篇安装AndroidStudio.sdk.jdk(by 星空武哥) <div class="article_manage c ...
- 从零开始vim搭建Java开发环境之coc.nvim 篇
前言 vim之美妙我就不过多介绍了,懂的自然懂.之前我已经有一篇文章介绍如何使用SpaceVim来搭建Java开发环境. 传送门:<从零开始vim搭建Java开发环境[视频]> 最近使用c ...
最新文章
- java前台传多个id用什么接收_前端js传多个id 到java后台的处理方式
- chcon命令 selinux 配置等
- Element 'dependency' cannot have character [children], because the type's content type is element-on
- react怎么存上一页_【React】存储全局数据
- 窗口捕获显示黑屏_win10每次重启黑屏假死
- 关于网页导航栏制作的几种方法与常见问题解决(新人向)
- 【快报】基于K2 BPM的新一代协同办公门户实践交流会
- Maven打包自动发布到nexus私服
- 02-linux下 yum安装R环境和Rserve安装
- excel用警员姓名查找警号信息
- 【电路仿真】基于matlab BP神经网络三相逆变器故障诊断【含Matlab源码 1655期】
- 【技术维新 践行精彩】浅谈未来网站的构建
- 有道词典“网络已断开”的解决办法
- 华为手机Android studio 配置ADB wifi 调试
- 在中琅条码打印软件中怎样实现CMYK的设置
- 每天定投10元基金有意义吗?
- Halcon 错误 提示 2021 System clock has been set back 解决方法
- Balsamiq新的感觉
- Eclipse中格式化JS、HTML代码
- ERC721: Non-fungible Token Standard