flume kafka storm mysql_flume+kafka+storm打通过程
0.有的地方我已经整理成脚本了,有的命令是脚本里面截取的
1.启动hadoop和yarn
$HADOOP_HOME/sbin/start-dfs.sh;$HADOOP_HOME/sbin/start-yarn.sh
2.启动zk
#主机名是mini-mini3所以这里可以遍历
echo "start zkserver "
for i in 1 2 3
do
ssh mini$i "source /etc/profile;$ZK_HOME/bin/zkServer.sh start"
done
3.启动mysqld
service mysqld start
4.启动kafka,集群都要启动
bin/kafka-server-start.sh config/server.properties
5.启动storm
在nimbus.host所属的机器上启动nimbus服务
nohup ./storm nimbus &
在nimbus.host所属的机器上启动ui服务
nohup ./storm ui &
在其它机器上启动supervisor服务
nohup ./storm supervisor &
6.启动flume
#exec.conf
a1.channels=r1
a1.sources=c1
a1.sinks=k1
#a1.sources.c1.type=spooldir #实时性要求不高的话,可以用这种方式,ta
#a1.sources.c1.channels=r1
#a1.sources.c1.spoolDir= /opt/flumeSpool/#a1.sources.c1.fileHeader= falsea1.sources.c1.type=exec
a1.sources.c1.command= tail -F /home/hadoop/kafkastudy/data/flume_sour
a1.sources.c1.channels=r1
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic=orderMq
a1.sinks.k1.brokerList= mini1:9092,mini2:9092,mini3:9092a1.sinks.k1.requiredAcks= 1a1.sinks.k1.batchSize= 20a1.sinks.k1.channel=r1
a1.channels.r1.type=memory
a1.channels.r1.capacity= 10000a1.channels.r1.transactionCapacity= 1000
bin/flume-ng agent --conf conf --conf-file conf/myconf/exec.conf --name a1 -Dflume.root.logger=INFO,console
7.启动造数据的程序
#!/bin/bashfor((i=0;i<50000;i++))doecho"msg-"+$i >> /home/hadoop/kafkastudy/data/flume_sources/click_log/1.log
done
8在mini1:8080上观察
总结
a.造数据和flume之间的链接是在exec.conf文件中配置了flume监听了文件,这个文件是造数据成员生成的,这里相当于数据源
b.flume和kafka之间的链接1在exec.conf中配置了.使用kafka的shell消费消息命令可以查看到
bin/kafka-console-consumer.sh --zookeeper mini1:2181 --topic test1
c.kafka和storm之间的链接,是由于我们在storm上运行了自己定义的一个程序,这个程序就是kafka2tostorm,在程序中指定了KafaSpout.同时还包含了自己的业务
d.
package kafkaAndStorm2;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;/***/
public classKafkaAndStormTopologyMain {public static voidmain(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
TopologyBuilder topologyBuilder= newTopologyBuilder();
SpoutConfig config= new SpoutConfig(new ZkHosts("mini1:2181,mini2:2181,mini3:2181"),"orderMq","/mykafka","kafkaSpout");
topologyBuilder.setSpout("kafkaSpout",new KafkaSpout(config),1);
topologyBuilder.setBolt("mybolt1",new MyKafkaBolt2(),1).shuffleGrouping("kafkaSpout");
Config conf= newConfig();//打印调试信息//conf.setDebug(true);
if (args!=null && args.length>0) {
StormSubmitter.submitTopology(args[0], conf, topologyBuilder.createTopology());
}else{
LocalCluster localCluster= newLocalCluster();
localCluster.submitTopology("storm2kafka", conf, topologyBuilder.createTopology());
}
}
}
package kafkaAndStorm2;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import java.util.Map;
/**
*/
public class MyKafkaBolt2 extends BaseRichBolt {
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
@Override
public void execute(Tuple input) {
byte[] value = (byte[]) input.getValue(0);
String msg = new String(value);
System.out.println(Thread.currentThread().getId()+" msg "+msg);
}
@Override
public void cleanup() {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map getComponentConfiguration() {
return null;
}
}
maven依赖,这里可能需要根据错误提示调一下
4.0.0
cn.itcast.learn
kafka2Strom
1.0-SNAPSHOT
org.apache.storm
storm-core
0.9.5
provided
org.apache.storm
storm-kafka
0.9.5
org.slf4j
slf4j-log4j12
org.slf4j
slf4j-api
org.clojure
clojure
1.5.1
org.apache.kafka
kafka_2.8.2
0.8.1
jmxtools
com.sun.jdmk
jmxri
com.sun.jmx
jms
javax.jms
org.apache.zookeeper
zookeeper
org.slf4j
slf4j-log4j12
org.slf4j
slf4j-api
com.google.code.gson
gson
2.4
redis.clients
jedis
2.7.3
maven-assembly-plugin
jar-with-dependencies
cn.itcast.bigdata.hadoop.mapreduce.wordcount.WordCount
make-assembly
package
single
org.apache.maven.plugins
maven-compiler-plugin
1.8
1.8
flume kafka storm mysql_flume+kafka+storm打通过程相关推荐
- redis storm mysql_flume+kafka+storm+redis/mysql启动命令记录
1.flume启动 bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name fks -Dflum ...
- kfaka storm写入mysql_flume+kafka+storm+mysql架构设计
序言 前段时间学习了storm,最近刚开blog,就把这些资料放上来供大家参考.这个框架用的组件基本都是最新稳定版本,flume-ng1.4+kafka0.8+storm0.9+mysql如果有需要测 ...
- 实时数据处理环境搭建flume+kafka+storm:4.storm安装配置
1.解压 apache-storm-0.9.3.tar.gz 2.修改配置文件 conf/storm.yaml --zk地址 storm.zookeeper.servers: - "wc ...
- kafka安装以及集成storm测试
参考:http://shiyanjun.cn/archives/934.html 1 zookeeper安装 zookeeper的安装很简单,只需要解压后,修改下zoo.cfg,配置dataDir和 ...
- zookeeper+kafka集群部署+storm集群
zookeeper+kafka集群部署+storm集群 一.环境安装前准备: 准备三台机器 操作系统:centos6.8 jdk:jdk-8u111-linux-x64.gz zookeeper:zo ...
- Storm和Kafka集成的重要生产错误和修复
我将在此处描述Storm和Kafka集成模块的一些细节,一些您应该意识到的重要错误以及如何克服其中的一些错误(尤其是对于生产安装). 我在生产安装中大量使用Apache Storm,并将Kafka作为 ...
- Storm集成Kafka
一.整合说明 Storm 官方对 Kafka 的整合分为两个版本,官方说明文档分别如下: Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka 提供整合支持: ...
- Storm 消费Kafka数据及相关异常解决
Storm 消费Kafka数据及相关异常解决 问题 KafkaTopoDemo类 bolt类 问题 storm报错:Exception in thread "main" java. ...
- flume系列之:flume基于kafka.topics和kafka.topics.regex两种方式匹配Kafka Topic
flume系列之:flume基于kafka.topics和kafka.topics.regex两种方式匹配Kafka Topic 一.flume基于kafka.topics匹配Kafka Topic ...
最新文章
- 【matlab】第三章数组和数组的运算
- 覆盖近2亿篇论文还免费!沈向洋旗下团队「读论文神器」登B站热搜
- ConcurrentHashMap的初步使用及场景
- YbtOJ#20236-[冲刺NOIP2020模拟赛Day9]红点蓝点【线段树,堆】
- 【DP】晨练计划(ybtoj)
- Android AudioTrack/AudioRecord -wav文件读取3
- 我喜欢这样的老大[10-24]
- 一目了然的 Docker 环境配置指南
- 中交国通智能科技 招募 AI目标识别技术顾问
- 如何成为一名出色的次世代游戏美术师?
- java中this用法总结
- Nessus安裝教程
- 视觉 注意力机制——通道注意力、空间注意力、自注意力
- TSX常见简单用法(入门) Vue3+Vite
- phpmywind最新版sql注入以及后台目录遍历和文件读取
- PHP 获取客户端 IP 地址
- 计算机英语专业被动语态,英语专业四级考试
- Udacity 优达学院机器学习深度学习课程
- 郑州师范学院计算机科学与技术代码,郑州师范学院—VR虚拟仿真实验中心
- java 304_http 304 浅析