0.有的地方我已经整理成脚本了,有的命令是脚本里面截取的

1.启动hadoop和yarn

$HADOOP_HOME/sbin/start-dfs.sh;$HADOOP_HOME/sbin/start-yarn.sh

2.启动zk

#主机名是mini-mini3所以这里可以遍历

echo "start zkserver "

for i in 1 2 3

do

ssh mini$i "source /etc/profile;$ZK_HOME/bin/zkServer.sh start"

done

3.启动mysqld

service mysqld start

4.启动kafka,集群都要启动

bin/kafka-server-start.sh config/server.properties

5.启动storm

在nimbus.host所属的机器上启动nimbus服务

nohup ./storm nimbus &

在nimbus.host所属的机器上启动ui服务

nohup ./storm ui &

在其它机器上启动supervisor服务

nohup ./storm supervisor &

6.启动flume

#exec.conf

a1.channels=r1

a1.sources=c1

a1.sinks=k1

#a1.sources.c1.type=spooldir #实时性要求不高的话,可以用这种方式,ta

#a1.sources.c1.channels=r1

#a1.sources.c1.spoolDir= /opt/flumeSpool/#a1.sources.c1.fileHeader= falsea1.sources.c1.type=exec

a1.sources.c1.command= tail -F /home/hadoop/kafkastudy/data/flume_sour

a1.sources.c1.channels=r1

a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.topic=orderMq

a1.sinks.k1.brokerList= mini1:9092,mini2:9092,mini3:9092a1.sinks.k1.requiredAcks= 1a1.sinks.k1.batchSize= 20a1.sinks.k1.channel=r1

a1.channels.r1.type=memory

a1.channels.r1.capacity= 10000a1.channels.r1.transactionCapacity= 1000

bin/flume-ng agent --conf conf --conf-file conf/myconf/exec.conf --name a1 -Dflume.root.logger=INFO,console

7.启动造数据的程序

#!/bin/bashfor((i=0;i<50000;i++))doecho"msg-"+$i >> /home/hadoop/kafkastudy/data/flume_sources/click_log/1.log

done

8在mini1:8080上观察

总结

a.造数据和flume之间的链接是在exec.conf文件中配置了flume监听了文件,这个文件是造数据成员生成的,这里相当于数据源

b.flume和kafka之间的链接1在exec.conf中配置了.使用kafka的shell消费消息命令可以查看到

bin/kafka-console-consumer.sh --zookeeper mini1:2181  --topic test1

c.kafka和storm之间的链接,是由于我们在storm上运行了自己定义的一个程序,这个程序就是kafka2tostorm,在程序中指定了KafaSpout.同时还包含了自己的业务

d.

package kafkaAndStorm2;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.StormSubmitter;

import backtype.storm.generated.AlreadyAliveException;

import backtype.storm.generated.InvalidTopologyException;

import backtype.storm.topology.TopologyBuilder;

import storm.kafka.BrokerHosts;

import storm.kafka.KafkaSpout;

import storm.kafka.SpoutConfig;

import storm.kafka.ZkHosts;/***/

public classKafkaAndStormTopologyMain {public static voidmain(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {

TopologyBuilder topologyBuilder= newTopologyBuilder();

SpoutConfig config= new SpoutConfig(new ZkHosts("mini1:2181,mini2:2181,mini3:2181"),"orderMq","/mykafka","kafkaSpout");

topologyBuilder.setSpout("kafkaSpout",new KafkaSpout(config),1);

topologyBuilder.setBolt("mybolt1",new MyKafkaBolt2(),1).shuffleGrouping("kafkaSpout");

Config conf= newConfig();//打印调试信息//conf.setDebug(true);

if (args!=null && args.length>0) {

StormSubmitter.submitTopology(args[0], conf, topologyBuilder.createTopology());

}else{

LocalCluster localCluster= newLocalCluster();

localCluster.submitTopology("storm2kafka", conf, topologyBuilder.createTopology());

}

}

}

package kafkaAndStorm2;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.BasicOutputCollector;

import backtype.storm.topology.IBasicBolt;

import backtype.storm.topology.IRichBolt;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichBolt;

import backtype.storm.tuple.Tuple;

import java.util.Map;

/**

*/

public class MyKafkaBolt2 extends BaseRichBolt {

@Override

public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

}

@Override

public void execute(Tuple input) {

byte[] value = (byte[]) input.getValue(0);

String msg = new String(value);

System.out.println(Thread.currentThread().getId()+" msg "+msg);

}

@Override

public void cleanup() {

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

}

@Override

public Map getComponentConfiguration() {

return null;

}

}

maven依赖,这里可能需要根据错误提示调一下

4.0.0

cn.itcast.learn

kafka2Strom

1.0-SNAPSHOT

org.apache.storm

storm-core

0.9.5

provided

org.apache.storm

storm-kafka

0.9.5

org.slf4j

slf4j-log4j12

org.slf4j

slf4j-api

org.clojure

clojure

1.5.1

org.apache.kafka

kafka_2.8.2

0.8.1

jmxtools

com.sun.jdmk

jmxri

com.sun.jmx

jms

javax.jms

org.apache.zookeeper

zookeeper

org.slf4j

slf4j-log4j12

org.slf4j

slf4j-api

com.google.code.gson

gson

2.4

redis.clients

jedis

2.7.3

maven-assembly-plugin

jar-with-dependencies

cn.itcast.bigdata.hadoop.mapreduce.wordcount.WordCount

make-assembly

package

single

org.apache.maven.plugins

maven-compiler-plugin

1.8

1.8

flume kafka storm mysql_flume+kafka+storm打通过程相关推荐

  1. redis storm mysql_flume+kafka+storm+redis/mysql启动命令记录

     1.flume启动 bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name fks -Dflum ...

  2. kfaka storm写入mysql_flume+kafka+storm+mysql架构设计

    序言 前段时间学习了storm,最近刚开blog,就把这些资料放上来供大家参考.这个框架用的组件基本都是最新稳定版本,flume-ng1.4+kafka0.8+storm0.9+mysql如果有需要测 ...

  3. 实时数据处理环境搭建flume+kafka+storm:4.storm安装配置

    1.解压 apache-storm-0.9.3.tar.gz 2.修改配置文件 conf/storm.yaml --zk地址  storm.zookeeper.servers:  - "wc ...

  4. kafka安装以及集成storm测试

    参考:http://shiyanjun.cn/archives/934.html 1 zookeeper安装 zookeeper的安装很简单,只需要解压后,修改下zoo.cfg,配置dataDir和 ...

  5. zookeeper+kafka集群部署+storm集群

    zookeeper+kafka集群部署+storm集群 一.环境安装前准备: 准备三台机器 操作系统:centos6.8 jdk:jdk-8u111-linux-x64.gz zookeeper:zo ...

  6. Storm和Kafka集成的重要生产错误和修复

    我将在此处描述Storm和Kafka集成模块的一些细节,一些您应该意识到的重要错误以及如何克服其中的一些错误(尤其是对于生产安装). 我在生产安装中大量使用Apache Storm,并将Kafka作为 ...

  7. Storm集成Kafka

    一.整合说明 Storm 官方对 Kafka 的整合分为两个版本,官方说明文档分别如下: Storm Kafka Integration : 主要是针对 0.8.x 版本的 Kafka 提供整合支持: ...

  8. Storm 消费Kafka数据及相关异常解决

    Storm 消费Kafka数据及相关异常解决 问题 KafkaTopoDemo类 bolt类 问题 storm报错:Exception in thread "main" java. ...

  9. flume系列之:flume基于kafka.topics和kafka.topics.regex两种方式匹配Kafka Topic

    flume系列之:flume基于kafka.topics和kafka.topics.regex两种方式匹配Kafka Topic 一.flume基于kafka.topics匹配Kafka Topic ...

最新文章

  1. 【matlab】第三章数组和数组的运算
  2. 覆盖近2亿篇论文还免费!沈向洋旗下团队「读论文神器」登B站热搜
  3. ConcurrentHashMap的初步使用及场景
  4. YbtOJ#20236-[冲刺NOIP2020模拟赛Day9]红点蓝点【线段树,堆】
  5. 【DP】晨练计划(ybtoj)
  6. Android AudioTrack/AudioRecord -wav文件读取3
  7. 我喜欢这样的老大[10-24]
  8. 一目了然的 Docker 环境配置指南
  9. 中交国通智能科技 招募 AI目标识别技术顾问
  10. 如何成为一名出色的次世代游戏美术师?
  11. java中this用法总结
  12. Nessus安裝教程
  13. 视觉 注意力机制——通道注意力、空间注意力、自注意力
  14. TSX常见简单用法(入门) Vue3+Vite
  15. phpmywind最新版sql注入以及后台目录遍历和文件读取
  16. PHP 获取客户端 IP 地址
  17. 计算机英语专业被动语态,英语专业四级考试
  18. Udacity 优达学院机器学习深度学习课程
  19. 郑州师范学院计算机科学与技术代码,郑州师范学院—VR虚拟仿真实验中心
  20. java 304_http 304 浅析

热门文章

  1. PHP Checkbox获取选中项与
  2. mysql 使用mysqldump 备份和还原
  3. Compass.net
  4. 【Java例题】2.5 温度转换
  5. 045 Android Studio 常用应用
  6. Xamarin Android 应用程序内图标上数字提示
  7. loadrunner 检查点
  8. JavaScript 事件相关
  9. php模式设计之 观察者模式
  10. Android:手把手教你打造可缩放移动的ImageView(下)