Reposted from: http://blog.csdn.net/zlfing/article/details/77161440

Base environment:
RedHat 5.5 64-bit (three virtual machines in my setup: h40, h41, h42)
MyEclipse 8.5
jdk1.7.0_25
Python-2.7.12
zookeeper-3.4.5 cluster
Hadoop-2.6.0 cluster
apache-storm-0.9.5 cluster
kafka_2.10-0.8.2.0 cluster
apache-flume-1.6.0-bin (only needs to be installed on the h40 master node)

For the Hadoop cluster installation, see: http://blog.csdn.net/m0_37739193/article/details/71222673
For the ZooKeeper installation, see: http://blog.csdn.net/m0_37739193/article/details/72457879
For the Kafka installation, see: http://blog.csdn.net/m0_37739193/article/details/76688408
For the Flume installation, see: http://blog.csdn.net/m0_37739193/article/details/72392147

Integrating Flume + Kafka
flume-1.6.0 ships with its own KafkaSink, and that built-in sink is what I use here. You can also use the open-source flumeng-kafka-plugin.jar instead, in which case copy that jar into Flume's lib directory.
[hadoop@h40 conf]$ cat kafka.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data.txt
a1.sources.r1.port = 44444
a1.sources.r1.host = 192.168.8.40
a1.sources.r1.channels = c1

# Describe the sink
# Sink configuration if you use the open-source flumeng-kafka-plugin.jar:
#a1.sinks.k1.type = org.apache.flume.plugins.KafkaSink
#a1.sinks.k1.metadata.broker.list=h40:9092,h41:9092,h42:9092
#a1.sinks.k1.partition.key=0
#a1.sinks.k1.partitioner.class=org.apache.flume.plugins.SinglePartition
#a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
#a1.sinks.k1.request.required.acks=0
#a1.sinks.k1.max.message.size=1000000
#a1.sinks.k1.producer.type=sync
#a1.sinks.k1.custom.encoding=UTF-8
#a1.sinks.k1.custom.topic.name=test

# Sink configuration for Flume's built-in KafkaSink (used here):
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = test
a1.sinks.k1.brokerList = h40:9092,h41:9092,h42:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the Kafka monitoring tool:
KafkaOffsetMonitor is simple to run: all runtime files, resources, and jars are packaged into KafkaOffsetMonitor-assembly-0.2.0.jar, so it can be run directly with no compilation and almost no configuration.
a. Create a directory named kafka-offset-console and copy the jar into it.
References:
http://blog.csdn.net/lizhitao/article/details/27199863
http://www.cnblogs.com/smartloli/p/4615908.html

[hadoop@h40 ~]$ mkdir kafka-offset-console
[hadoop@h40 ~]$ ls kafka-offset-console/
KafkaOffsetMonitor-assembly-0.2.0.jar

Run the following command from the kafka-offset-console directory to start it in the background:
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk h40:2181,h41:2181,h42:2181/kafka \
--port 8089 \
--refresh 10.seconds \
--retain 1.days 1>/dev/null 2>&1 &

[hadoop@h40 apache-flume-1.6.0-bin]$ bin/flume-ng agent -c . -f conf/kafka.conf -n a1 -Dflume.root.logger=INFO,console
[hadoop@h40 ~]$ echo "hello world" >> data.txt

Verification:
The Kafka console consumer on h40 shows "hello world", which means the Flume/Kafka integration works:
[hadoop@h40 kafka_2.10-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper h40:2181,h41:2181,h42:2181/kafka --topic test --from-beginning
hello world

Preview (open http://192.168.8.40:8089 in a browser):
Use the Kafka monitoring tool to check whether the uploaded log records have actually produced message data in Kafka.
(If you are not comfortable with English, Chrome can translate the page, which makes the information easier to read.)
(I later came across another Kafka monitoring tool, https://github.com/smartloli/kafka-eagle, but it looks more complex than the one above; I have not installed or tested it myself, so I cannot say how well it works.)

Storm installation and configuration:
A Storm cluster also depends on a ZooKeeper cluster, so make sure ZooKeeper is running properly. Installing and configuring Storm is fairly simple; we again use the same three machines:
192.168.8.40 h40
192.168.8.41 h41
192.168.8.42 h42
[hadoop@h40 ~]$ tar -zxvf apache-storm-0.9.5.tar.gz

Then edit the configuration file conf/storm.yaml and add the following:
storm.zookeeper.servers:
- "h40"
- "h41"
- "h42"
storm.zookeeper.port: 2181

nimbus.host: "h40"

supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
(This configuration is finicky about whitespace: spaces are required in certain places, such as after each colon, otherwise Storm fails to start with a parse error.)

Distribute the configured installation to the other nodes:
[hadoop@h40 ~]$ scp -r apache-storm-0.9.5/ h41:/home/hadoop/
[hadoop@h40 ~]$ scp -r apache-storm-0.9.5/ h42:/home/hadoop/

In a Storm cluster the master node runs Nimbus and the worker nodes run Supervisor. Start the Nimbus service on h40 and the Supervisor service on the worker nodes h41 and h42:
[hadoop@h40 apache-storm-0.9.5]$ bin/storm nimbus
but it fails with this error:
File "bin/storm", line 61
    normclasspath = cygpath if sys.platform == 'cygwin' else identity
                            ^
SyntaxError: invalid syntax

A web search suggested the cause was an outdated Python. python -V confirmed the ancient Python 2.4.3, which lacks many of Python's newer features, so I decided to reinstall Python (skip this step if your version is already 2.7 or higher). I downloaded 2.7.12 from the official site: https://www.python.org/downloads/

Installing Python 2.7.12 (repeat these steps on all three machines):
[root@h40 usr]# tar -zxvf Python-2.7.12.tgz
[root@h40 usr]# cd Python-2.7.12/

Before compiling, confirm that build tools such as gcc, make, and patch are installed and working:
[root@localhost ~]# yum -y install gcc*

[root@h40 Python-2.7.12]# ./configure
[root@h40 Python-2.7.12]# make && make install

[root@h40 Python-2.7.12]# rm -rf /usr/bin/python

The wrong way:
[root@h40 Python-2.7.12]# ln -s python /usr/bin/python
[root@h40 Python-2.7.12]# ll /usr/bin/python
lrwxrwxrwx 1 root root 6 May 10 10:33 /usr/bin/python -> python
Otherwise, running bin/storm later fails with:
[hadoop@h40 apache-storm-0.9.5]$ bin/storm supervisor
-bash: bin/storm: /usr/bin/python: bad interpreter: Too many levels of symbolic links
The right way:
[root@h40 ~]# ln -s /usr/Python-2.7.12/python /usr/bin/python
[root@h40 ~]# ll /usr/bin/python
lrwxrwxrwx 1 root root 25 May 10 10:37 /usr/bin/python -> /usr/Python-2.7.12/python

[root@h40 Python-2.7.12]# python -V
Python 2.7.12

But running bin/storm nimbus still fails:
[hadoop@h40 apache-storm-0.9.5]$ bin/storm nimbus
-bash: bin/storm: /usr/bin/python: bad interpreter: Permission denied
Fix: in /home/hadoop/apache-storm-0.9.5/bin/storm, replace the first line #!/usr/bin/python with your actual Python path, e.g. #!/usr/Python-2.7.12/python.
(At first I wondered why editing this line matters, since # starts a comment in Python; the answer is that #! on the first line is a shebang, which the operating system uses to locate the interpreter, so changing it does take effect. On my second run this error did not appear at all, and I am not sure why; if you do not hit it, skip this step.)

[hadoop@h40 apache-storm-0.9.5]$ bin/storm supervisor
(I am not sure why the supervisor also has to run on the master node. When I did not start it, the later experiment could not stream data from Kafka into Storm; with supervisors on all three nodes everything worked, while with supervisors only on the two worker nodes only empty files were produced, with no data. Later, running supervisors on the master and one worker, with the other worker's supervisor stopped, also worked, and a new output file was even created on h40 where the supervisor was running, which left me thoroughly confused. Storm is supposed to be fault tolerant, so why would one missing supervisor break anything? Other blogs do not mention having to run a supervisor on the master node either; I would be curious whether others have hit this. In local mode, even with the supervisors on the other two nodes stopped, data was written normally after a short delay, and the more supervisors were running, the shorter the wait, which puzzled me further. One more note: normally, after the Topology is submitted, empty files appear first and it takes quite a while before data from Kafka is actually written, long enough to make you suspect the experiment failed.)
[hadoop@h41 apache-storm-0.9.5]$ bin/storm supervisor
[hadoop@h42 apache-storm-0.9.5]$ bin/storm supervisor
(Also, after a Topology is submitted, the consoles running the supervisor processes keep printing messages such as kill 24361: No such process, with the number constantly changing. It does not affect normal operation, and I am not sure what causes it.)

To make monitoring easier, start the Storm UI so that the running state of Storm Topologies can be watched from a web page; for example, start it on h40:
[hadoop@h40 apache-storm-0.9.5]$ bin/storm ui &
Topology status can then be viewed at http://192.168.8.40:8080 (http://h40:8080 did not work for me).

Integrating Kafka + Storm
Messages enter the Kafka broker in various ways; for example, Flume can collect log data and route/stage it in Kafka, after which Storm analyzes it in real time. In that setup a Storm Spout reads the messages from Kafka and hands them to the concrete Bolt components for analysis and processing.
Below is a simple WordCount example that reads the subscribed message lines from Kafka, splits them into words on whitespace, and then computes word counts. The Topology implementation is as follows:
package org.shirdrn.storm.examples;

import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.UUID;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MyKafkaTopology {

    public static class WordSpliter extends BaseBasicBolt {

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            // Receive a sentence
            String sentence = tuple.getString(0);
            // Split the sentence into words
            StringTokenizer iter = new StringTokenizer(sentence);
            // Emit each word
            while (iter.hasMoreElements()) {
                collector.emit(new Values(iter.nextToken()));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Declare a single output field
            declarer.declare(new Fields("word"));
        }
    }

    public static class WriterBolt extends BaseBasicBolt {

        Map<String, Integer> counts = new HashMap<String, Integer>();
        private FileWriter writer = null;

        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            try {
                writer = new FileWriter("/home/hadoop/stormoutput/" + "wordcount" + UUID.randomUUID().toString());
                // Write the data processed by Storm to this path. On Linux use a path like
                // /home/hadoop/stormoutput/; on Windows use a path like C:\Users\huiqiang\Desktop\stormoutput\,
                // otherwise the path will not be found.
                // Writing to a file is not strictly necessary: in local mode on Linux the processed data can be
                // printed to the console, but in cluster mode nothing is printed and the submitting process exits
                // right after the Topology is submitted, so writing to a file makes the output easier to see.
                // Note: in local mode on Linux the output files are created only under /home/hadoop/stormoutput/
                // on h40 (three files), whereas in cluster mode a file is created under that directory on each node.
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            // Receive a word
            String word = tuple.getString(0);
            // Look up the current count for this word
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            // Increment the count
            count++;
            // Store the word and its count in the map
            counts.put(word, count);
            System.out.println("hello word!");
            System.out.println(word + "  " + count);
            // Write the word and its count to the output file
            try {
                writer.write(word + "  " + count);
                writer.write("\n");
                writer.flush();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            // Emit the word and its count (fields "word" and "count")
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Declare the two output fields: word and count
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws Exception {
        String topic = "test";
        String zkRoot = "/kafka-storm";
        String id = "old";
        BrokerHosts brokerHosts = new ZkHosts("h40:2181,h41:2181,h42:2181", "/kafka/brokers");
        // If Kafka was configured under a ZooKeeper chroot (e.g. created with "create /kafka"), the Storm
        // integration must use new ZkHosts(zks, "/kafka/brokers"); otherwise it fails with
        // java.lang.RuntimeException: ... org.apache.zookeeper.KeeperException$NoNodeException:
        // KeeperErrorCode = NoNode for /brokers/topics/test/partitions
        SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConfig.forceFromStart = true;
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        TopologyBuilder builder = new TopologyBuilder();
        // The spout reads data from the Kafka topic and sends it to the downstream bolts. It is not a custom
        // spout; it is the KafkaSpout that ships with storm-kafka.
        builder.setSpout("KafkaSpout", new KafkaSpout(spoutConfig));
        builder.setBolt("word-spilter", new WordSpliter()).shuffleGrouping("KafkaSpout");
        builder.setBolt("writer", new WriterBolt(), 3).fieldsGrouping("word-spilter", new Fields("word"));
        Config conf = new Config();
        conf.setNumWorkers(4);
        conf.setNumAckers(0);
        conf.setDebug(false);

        if (args != null && args.length > 0) {
            // Submit the topology to the Storm cluster
            StormSubmitter.submitTopology("sufei-topo", conf, builder.createTopology());
        } else {
            // LocalCluster runs the topology in a local simulator, which is convenient for development and debugging
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("WordCount", conf, builder.createTopology());
        }
    }

}

A key configuration object in the Topology code above is SpoutConfig, in particular this property:
spoutConf.forceFromStart = false;
It controls whether, when the Topology is restarted after stopping due to a failure, the Spout reads the subscribed Kafka topic from the beginning. With forceFromStart=true, tuples that were already processed are processed again; with false, processing resumes from the last recorded position, so the topic data in Kafka is not reprocessed. The consumption state is recorded on the data-source side (in ZooKeeper).
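For reference, here is a minimal sketch of the two modes using the same values as the Topology above (topic test, zkRoot /kafka-storm, id old); it only illustrates the setting and is not an additional step in the walkthrough:

// With forceFromStart=false the KafkaSpout resumes from the offsets it committed under
// zkRoot/id in ZooKeeper (here /kafka-storm/old/partition_0 and /kafka-storm/old/partition_1,
// as shown later with zkCli.sh); with true it rewinds to the beginning of the topic.
BrokerHosts brokerHosts = new ZkHosts("h40:2181,h41:2181,h42:2181", "/kafka/brokers");
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, "test", "/kafka-storm", "old");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.forceFromStart = false;   // resume from the last committed offset after a restart
// spoutConf.forceFromStart = true; // reprocess the whole topic from the beginning (handy for testing)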

Copy the jar packages the Topology depends on into Storm's lib directory on the master node:
[hadoop@h40 ~]$ cp kafka_2.10-0.8.2.0/libs/kafka_2.10-0.8.2.0.jar apache-storm-0.9.5/lib/
[hadoop@h40 ~]$ cp kafka_2.10-0.8.2.0/libs/scala-library-2.10.4.jar apache-storm-0.9.5/lib/
[hadoop@h40 ~]$ cp kafka_2.10-0.8.2.0/libs/metrics-core-2.2.0.jar apache-storm-0.9.5/lib/
[hadoop@h40 ~]$ cp kafka_2.10-0.8.2.0/libs/snappy-java-1.1.1.6.jar apache-storm-0.9.5/lib/
[hadoop@h40 ~]$ cp kafka_2.10-0.8.2.0/libs/zkclient-0.3.jar apache-storm-0.9.5/lib/
[hadoop@h40 ~]$ cp kafka_2.10-0.8.2.0/libs/log4j-1.2.16.jar apache-storm-0.9.5/lib/
[hadoop@h40 ~]$ cp kafka_2.10-0.8.2.0/libs/slf4j-api-1.7.6.jar apache-storm-0.9.5/lib/
[hadoop@h40 ~]$ cp kafka_2.10-0.8.2.0/libs/jopt-simple-3.2.jar apache-storm-0.9.5/lib/
[hadoop@h40 ~]$ cp kafka_2.10-0.8.2.0/libs/zookeeper-3.4.6.jar apache-storm-0.9.5/lib/
[hadoop@h40 ~]$ cp kafka_2.10-0.8.2.0/libs/kafka_2.10-0.8.2.0-test.jar apache-storm-0.9.5/lib/
(The last test jar is not needed to build in Eclipse, but without it the run on Linux fails with java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/Utils.)
(In cluster mode these dependency jars must be copied into Storm's lib directory on every node; in local mode copying them into the lib directory on the master node is enough.)
(On Linux you can simply copy all jars from Kafka's libs directory into Storm's lib directory, except slf4j-log4j12-1.6.1.jar. Do not add slf4j-log4j12-1.6.1.jar to the Libraries in Eclipse either, or you will get errors; see http://blog.csdn.net/ouyang111222/article/details/49700733.)
(You also need curator-client-2.4.0.jar, curator-framework-2.4.0.jar and guava-13.0.jar in the lib directory on the Storm master node; apache-storm-0.9.2-incubating ships them in its lib directory. That version only provides the Storm/Kafka integration jar, whereas storm-0.9.5 also ships jars for integrating Storm with HDFS and HBase.)

Package the code as wordcount.jar with MyEclipse, upload it to /home/hadoop/apache-storm-0.9.5 on h40, and then submit the Topology. (Most blogs build the jar with Maven, but I use MyEclipse 8.5 here: partly because I am not very familiar with Maven, and partly because integrating Eclipse with Maven adds an extra step, so I stick with the Eclipse workflow most people already know.)
[hadoop@h40 apache-storm-0.9.5]$ bin/storm jar wordcount.jar org.shirdrn.storm.examples.MyKafkaTopology h40
This fails with:
Exception in thread "main" java.lang.NoClassDefFoundError: storm/kafka/BrokerHosts
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2521)
at java.lang.Class.getMethod0(Class.java:2764)
at java.lang.Class.getMethod(Class.java:1653)
at sun.launcher.LauncherHelper.getMainMethod(LauncherHelper.java:494)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:486)
Caused by: java.lang.ClassNotFoundException: storm.kafka.BrokerHosts
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 6 more

Cause:
The server side (h40) does not have the storm-kafka jar.
Fix:
[hadoop@h40 apache-storm-0.9.5]$ cp external/storm-kafka/storm-kafka-0.9.5.jar lib/
(This release already ships the Kafka/Storm integration; the external/ directory also contains the HDFS and HBase integration plugins. apache-storm-0.9.2-incubating only ships the Kafka integration, and the 0.8.x releases ship none of these, so with 0.8.x you have to download an open-source kafka-storm plugin from GitHub.)

I have seen blogs submit the Topology with just bin/storm jar wordcount.jar MyKafkaTopology h40, but for me that failed with:
Error: Could not find or load main class MyKafkaTopology
The package name has to be included; it turns out those blogs did not create a package in Eclipse and used the default package instead.
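In other words, the fully qualified name passed to bin/storm jar just has to match the package declared in the source file. A minimal illustration based on the class above:

// First line of MyKafkaTopology.java; this is why the class is submitted as
// org.shirdrn.storm.examples.MyKafkaTopology:
package org.shirdrn.storm.examples;

public class MyKafkaTopology { /* ... */ }

// A class with no package declaration (the default package) would be submitted as just:
//   bin/storm jar wordcount.jar MyKafkaTopology h40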

Running the Kafka+Storm integration code from http://shiyanjun.cn/archives/934.html (with the parameters adjusted for this environment):
[hadoop@h40 apache-storm-0.9.5]$ bin/storm jar wordcount.jar org.shirdrn.storm.examples.MyKafkaTopology h40
...
Start uploading file 'wordcount.jar' to 'storm-local/nimbus/inbox/stormjar-2f5eede7-71dd-41c0-95a7-32ac5bc4097f.jar' (6980 bytes)
[==================================================] 6980 / 6980
File 'wordcount.jar' uploaded to 'storm-local/nimbus/inbox/stormjar-2f5eede7-71dd-41c0-95a7-32ac5bc4097f.jar' (6980 bytes)
(I could not figure out where the output went, even though bin/storm list showed the Topology had been submitted successfully.)

Later, with my own code:
[hadoop@h40 apache-storm-0.9.5]$ bin/storm jar wordcount.jar org.shirdrn.storm.examples.MyKafkaTopology h40
337 [main] INFO backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
348 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar wordcount.jar to assigned location: storm-local/nimbus/inbox/stormjar-a0260e30-7c9a-465e-b796-38fe25a58a13.jar
364 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: storm-local/nimbus/inbox/stormjar-a0260e30-7c9a-465e-b796-38fe25a58a13.jar
364 [main] INFO backtype.storm.StormSubmitter - Submitting topology sufei-topo in distributed mode with conf {"topology.workers":4,"topology.acker.executors":0,"topology.debug":false}
593 [main] INFO backtype.storm.StormSubmitter - Finished submitting topology: sufei-topo
(The submitting process exits here; it does not block.)

[hadoop@h40 apache-storm-0.9.5]$ bin/storm list
...
775 [main] INFO backtype.storm.thrift - Connecting to Nimbus at h40:6627

Topology_name Status Num_tasks Num_workers Uptime_secs

sufei-topo ACTIVE 5 4 60
(The Topology was submitted successfully.)

[hadoop@h40 zookeeper-3.4.5]$ ./bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 15] ls /
[zookeeper, admin, kafka-storm, consumers, config, controller, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 16] ls /kafka-storm
[old]
[zk: localhost:2181(CONNECTED) 17] ls /kafka-storm/old
[partition_1, partition_0]
(The ZooKeeper client shows that a new /kafka-storm/old node has been created.)

Output files are created under the stormoutput directory on the three nodes. The number of files appears to be related to the parallelism hint in builder.setBolt("writer", new WriterBolt(), 3).fieldsGrouping("word-spilter", new Fields("word")), and how they are distributed across nodes looks random; in practice the behavior is messy and I could not find a clear pattern. One node's directory even filled up with many empty files, and files with actual content only appeared after a while.
When more data is written through the Kafka producer, the files under stormoutput grow accordingly.

You can monitor the Topology through the log files (under logs/) or the Storm UI. If the program has no errors, use the Kafka producer from earlier to generate messages, and you will see the Storm Topology receive and process them in real time.

(As the code shows, passing the h40 argument submits the Topology in cluster mode; passing no argument runs it in local mode.)
Submitting in local mode on Linux:
[hadoop@h40 apache-storm-0.9.5]$ bin/storm jar wordcount.jar org.shirdrn.storm.examples.MyKafkaTopology
Output:
hello word!
hello 1
hello word!
hadoop 1
hello word!
hello 2
hello word!
hadoop 2
hello word!
hello 3
hello word!
hive 1
(The program blocks here; when new input arrives, the new results are printed.)

bin/storm list shows no Topology, and logging into the ZooKeeper client shows no /kafka-storm node being created either.

[hadoop@h40 kafka_2.10-0.8.2.0]$ bin/kafka-console-consumer.sh --zookeeper h40:2181,h41:2181,h42:2181 --topic hui --from-beginning
hello hadoop
hello hadoop
hello hive

[hadoop@h40 ~]$ ll stormoutput/
total 8
-rw-rw-r-- 1 hadoop hadoop  0 May 11 11:39 wordcount29dbd1db-06ad-44bf-8b38-1a4812992c04
-rw-rw-r-- 1 hadoop hadoop 47 May 11 11:39 wordcount9398f5ce-792f-421f-b12c-9b3fe5c7bb8c
-rw-rw-r-- 1 hadoop hadoop  8 May 11 11:39 wordcountab5b6a8e-31e1-4597-a5be-6353cdf0ba2b

[hadoop@h40 kafka_2.10-0.8.2.0]$ bin/kafka-console-producer.sh --broker-list h40:9092,h41:9092,h42:9092 --topic hui
[2017-05-11 11:41:08,428] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
hello hadoop
(With data entered in the Kafka producer, the Storm Topology prints the processed results in real time and the files under stormoutput keep growing:)
hello word!
hello 4
hello word!
hadoop 3

[hadoop@h40 ~]$ tail -f stormoutput/wordcount29dbd1db-06ad-44bf-8b38-1a4812992c04 stormoutput/wordcount9398f5ce-792f-421f-b12c-9b3fe5c7bb8c stormoutput/wordcountab5b6a8e-31e1-4597-a5be-6353cdf0ba2b
==> stormoutput/wordcount9398f5ce-792f-421f-b12c-9b3fe5c7bb8c <==
hello 4
hadoop 3

Note:
When submitting in local mode from Eclipse (right-clicking the main method and choosing Run As -> Java Application submits in local mode), you may hit this error:
java.net.UnknownHostException: h40
at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method) ~[na:1.6.0_13]
at java.net.InetAddress$1.lookupAllHostAddr(InetAddress.java:849) ~[na:1.6.0_13]
at java.net.InetAddress.getAddressFromNameService(InetAddress.java:1200) ~[na:1.6.0_13]
at java.net.InetAddress.getAllByName0(InetAddress.java:1153) ~[na:1.6.0_13]
at java.net.InetAddress.getAllByName(InetAddress.java:1083) ~[na:1.6.0_13]
at java.net.InetAddress.getAllByName(InetAddress.java:1019) ~[na:1.6.0_13]
at org.apache.zookeeper.client.StaticHostProvider.<init>(StaticHostProvider.java:60) ~[zookeeper-3.4.5.jar:3.4.5-1392090]
at org.apache.zookeeper.ZooKeeper.<init>(ZooKeeper.java:445) ~[zookeeper-3.4.5.jar:3.4.5-1392090]
at org.apache.curator.utils.DefaultZookeeperFactory.newZooKeeper(DefaultZookeeperFactory.java:29) ~[curator-client-2.4.0.jar:na]
at org.apache.curator.framework.imps.CuratorFrameworkImpl$2.newZooKeeper(CuratorFrameworkImpl.java:169) ~[curator-framework-2.4.0.jar:na]
at org.apache.curator.HandleHolder$1.getZooKeeper(HandleHolder.java:94) ~[curator-client-2.4.0.jar:na]
at org.apache.curator.HandleHolder.getZooKeeper(HandleHolder.java:55) ~[curator-client-2.4.0.jar:na]
at org.apache.curator.ConnectionState.reset(ConnectionState.java:219) ~[curator-client-2.4.0.jar:na]
at org.apache.curator.ConnectionState.start(ConnectionState.java:103) ~[curator-client-2.4.0.jar:na]
at org.apache.curator.CuratorZookeeperClient.start(CuratorZookeeperClient.java:188) ~[curator-client-2.4.0.jar:na]
at org.apache.curator.framework.imps.CuratorFrameworkImpl.start(CuratorFrameworkImpl.java:234) ~[curator-framework-2.4.0.jar:na]
at storm.kafka.DynamicBrokersReader.<init>(DynamicBrokersReader.java:53) [storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.trident.ZkBrokerReader.<init>(ZkBrokerReader.java:41) [storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.KafkaUtils.makeBrokerReader(KafkaUtils.java:57) [storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at storm.kafka.KafkaSpout.open(KafkaSpout.java:87) [storm-kafka-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.daemon.executor$fn__5573$fn__5588.invoke(executor.clj:520) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at backtype.storm.util$async_loop$fn__457.invoke(util.clj:429) [storm-core-0.9.2-incubating.jar:0.9.2-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:619) [na:1.6.0_13]

Fix: edit C:\Windows\System32\drivers\etc\hosts and add the entries below. If the file cannot be saved, see https://jingyan.baidu.com/article/624e7459b194f134e8ba5a8e.html (Windows 10) or https://jingyan.baidu.com/article/e5c39bf56564a539d7603312.html (Windows 7).
Append the IPs and hostnames of your Storm cluster:
192.168.8.40 h40
192.168.8.41 h41
192.168.8.42 h42

The following may appear when running in local mode; it can be ignored and does not affect normal use:
java.net.SocketException: Address family not supported by protocol family: connect
at sun.nio.ch.Net.connect(Native Method) ~[na:1.6.0_13]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:507) ~[na:1.6.0_13]
at org.apache.zookeeper.ClientCnxnSocketNIO.registerAndConnect(ClientCnxnSocketNIO.java:266) ~[zookeeper-3.4.5.jar:3.4.5-1392090]
at org.apache.zookeeper.ClientCnxnSocketNIO.connect(ClientCnxnSocketNIO.java:276) ~[zookeeper-3.4.5.jar:3.4.5-1392090]
at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:958) ~[zookeeper-3.4.5.jar:3.4.5-1392090]
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:993) ~[zookeeper-3.4.5.jar:3.4.5-1392090]

Above I started the kafka, nimbus, and supervisor processes directly in terminal windows so they were easy to observe while learning; normally you would run them in the background.
For example:
bin/kafka-server-start.sh config/server.properties &
(With this form, output still pops up in the terminal at random times, which is not ideal, although it is what most blogs I found use.)
The following form is better:
bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &
(1>/dev/null redirects standard output to /dev/null, 2>&1 redirects standard error to the same place as standard output, and the trailing & runs the command in the background.)

Integrating Storm + HDFS
The Storm real-time cluster consumes messages from the Kafka broker: data with real-time requirements goes through real-time processing code, while data that needs offline analysis can be written to HDFS for later processing. The following Topology implements the HDFS path:
package org.shirdrn.storm.examples;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class StormToHDFSTopology {

    public static class EventSpout extends BaseRichSpout {

        private static final Log LOG = LogFactory.getLog(EventSpout.class);
        private static final long serialVersionUID = 886149197481637894L;
        private SpoutOutputCollector collector;
        private Random rand;
        private String[] records;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            rand = new Random();
            records = new String[] {
                "10001     ef2da82d4c8b49c44199655dc14f39f6     4.2.1     HUAWEI G610-U00     HUAWEI     2     70:72:3c:73:8b:22     2014-10-13 12:36:35",
                "10001     ffb52739a29348a67952e47c12da54ef     4.3     GT-I9300     samsung     2     50:CC:F8:E4:22:E2     2014-10-13 12:36:02",
                "10001     ef2da82d4c8b49c44199655dc14f39f6     4.2.1     HUAWEI G610-U00     HUAWEI     2     70:72:3c:73:8b:22     2014-10-13 12:36:35"
            };
        }

        @Override
        public void nextTuple() {
            Utils.sleep(1000);
            DateFormat df = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss");
            Date d = new Date(System.currentTimeMillis());
            String minute = df.format(d);
            String record = records[rand.nextInt(records.length)];
            LOG.info("EMIT[spout -> hdfs] " + minute + " : " + record);
            collector.emit(new Values(minute, record));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("minute", "record"));
        }
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        // use " : " as the field delimiter instead of ","
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter(" : ");
        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
        // rotate files every minute
        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES);
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm/").withPrefix("app_").withExtension(".log");
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://h40:9000")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("event-spout", new EventSpout(), 3);
        builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", new Fields("minute"));
        Config conf = new Config();
        String name = StormToHDFSTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            conf.put(Config.NIMBUS_HOST, args[0]);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }

}
In the logic above, the HdfsBolt can be configured in more detail via FileNameFormat, SyncPolicy, and FileRotationPolicy (the rotation policy decides when a new log file is started, for example after a fixed time interval or once the current file reaches a configured size); see the storm-hdfs project for more options.
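As an illustration of size-based rotation, here is a minimal sketch assuming the storm-hdfs API bundled with Storm 0.9.5 (FileSizeRotationPolicy and its Units enum); only the rotation policy changes compared with the Topology above:

import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;

// Start a new log file once the current one reaches roughly 128 MB,
// instead of rotating every minute as TimedRotationPolicy does above.
FileRotationPolicy sizeRotationPolicy = new FileSizeRotationPolicy(128.0f, Units.MB);

HdfsBolt hdfsBolt = new HdfsBolt()
        .withFsUrl("hdfs://h40:9000")
        .withFileNameFormat(fileNameFormat)    // same FileNameFormat as above
        .withRecordFormat(format)              // same RecordFormat as above
        .withRotationPolicy(sizeRotationPolicy)
        .withSyncPolicy(syncPolicy);           // same SyncPolicy as above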

Edit ~/.bash_profile and add the CLASSPATH entry below:
[hadoop@h40 ~]$ vi ~/.bash_profile
export CLASSPATH=.:/home/hadoop/hadoop-2.6.0/etc/hadoop:/home/hadoop/hadoop-2.6.0/share/hadoop/common/lib/*:/home/hadoop/hadoop-2.6.0/share/hadoop/common/*:/home/hadoop/hadoop-2.6.0/share/hadoop/hdfs:/home/hadoop/hadoop-2.6.0/share/hadoop/hdfs/lib/*:/home/hadoop/hadoop-2.6.0/share/hadoop/hdfs/*:/home/hadoop/hadoop-2.6.0/share/hadoop/yarn/lib/*:/home/hadoop/hadoop-2.6.0/share/hadoop/yarn/*:/home/hadoop/hadoop-2.6.0/share/hadoop/mapreduce/lib/*:/home/hadoop/hadoop-2.6.0/share/hadoop/mapreduce/*:/home/hadoop/hadoop-2.6.0/contrib/capacity-scheduler/*.jar
Copy the jars on the hadoop classpath into Storm's lib directory, on every node in cluster mode (in local mode only the Storm master node is needed). Importantly, do not copy slf4j-log4j12-1.7.5.jar.
[hadoop@h40 ~]$ cp apache-storm-0.9.5/external/storm-hdfs/storm-hdfs-0.9.5.jar apache-storm-0.9.5/lib/

Submit the Topology:
[hadoop@h40 apache-storm-0.9.5]$ bin/storm jar wordcount.jar org.shirdrn.storm.examples.StormToHDFSTopology h40
...
Start uploading file 'wordcount.jar' to 'storm-local/nimbus/inbox/stormjar-a0f3899e-29e0-4649-85b9-66569e64f15a.jar' (11285 bytes)
[==================================================] 11285 / 11285
File 'wordcount.jar' uploaded to 'storm-local/nimbus/inbox/stormjar-a0f3899e-29e0-4649-85b9-66569e64f15a.jar' (11285 bytes)

Wait a while (the delay is annoying: sometimes short, sometimes long enough that you are left wondering whether the experiment worked), then check the HDFS path:
[hadoop@h40 ~]$ hadoop fs -lsr /
drwxr-xr-x   - hadoop supergroup          0 2017-05-11 14:52 /storm
-rw-r--r--   3 hadoop supergroup       5150 2017-05-11 14:47 /storm/app_hdfs-bolt-8-0-1494485195669.log
-rw-r--r--   3 hadoop supergroup       4628 2017-05-11 14:48 /storm/app_hdfs-bolt-8-1-1494485279754.log
[hadoop@h40 ~]$ hadoop fs -cat /storm/app_hdfs-bolt-8-0-1494485195669.log
17/05/11 14:54:37 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
2017-05-11_14-46-32 : 10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35
2017-05-11_14-46-34 : 10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35
2017-05-11_14-46-36 : 10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35
2017-05-11_14-46-38 : 10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35
2017-05-11_14-46-41 : 10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35
2017-05-11_14-46-43 : 10001 ffb52739a29348a67952e47c12da54ef 4.3 GT-I9300 samsung 2 50:CC:F8:E4:22:E2 2014-10-13 12:36:02
2017-05-11_14-46-45 : 10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35
...

Integrating Flume + Kafka + Storm + HDFS
The previous sections covered Kafka+Storm and Storm+HDFS separately. By swapping the latter's Spout for the Kafka Spout of the former, we can consume messages from Kafka, do some simple processing in Storm, and then write the data to HDFS, where it can later be analyzed offline on the Hadoop platform. Below is a simple example that consumes messages from Kafka, processes them in Storm, and writes them to HDFS:
package org.shirdrn.storm.examples;

import java.util.Arrays;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class DistributeWordTopology {

    public static class KafkaWordToUpperCase extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
        private static final long serialVersionUID = -5207232012035109026L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0).trim();
            LOG.info("RECV[kafka -> splitter] " + line);
            if (!line.isEmpty()) {
                String upperLine = line.toUpperCase();
                LOG.info("EMIT[splitter -> counter] " + upperLine);
                collector.emit(input, new Values(upperLine, upperLine.length()));
            }
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line", "len"));
        }
    }

    public static class RealtimeBolt extends BaseRichBolt {

        private static final Log LOG = LogFactory.getLog(KafkaWordToUpperCase.class);
        private static final long serialVersionUID = -4115132557403913367L;
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getString(0).trim();
            LOG.info("REALTIME: " + line);
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
        // Configure Kafka
        String zks = "h40:2181,h41:2181,h42:2181";
        String topic = "test";
        String zkRoot = "/storm"; // default ZooKeeper root configuration for Storm
        String id = "word";
        BrokerHosts brokerHosts = new ZkHosts(zks, "/kafka/brokers");
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.forceFromStart = false;
        spoutConf.zkServers = Arrays.asList(new String[] {"h40", "h41", "h42"});
        spoutConf.zkPort = 2181;
        // Configure the HDFS bolt
        RecordFormat format = new DelimitedRecordFormat()
                .withFieldDelimiter("\t"); // use "\t" instead of "," for field delimiter
        SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples
        FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files every minute
        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format
        HdfsBolt hdfsBolt = new HdfsBolt()
                .withFsUrl("hdfs://h40:9000")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);
        // Configure & build the topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);
        builder.setBolt("to-upper", new KafkaWordToUpperCase(), 3).shuffleGrouping("kafka-reader");
        builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping("to-upper");
        builder.setBolt("realtime", new RealtimeBolt(), 2).shuffleGrouping("to-upper");
        // Submit the topology
        Config conf = new Config();
        String name = DistributeWordTopology.class.getSimpleName();
        if (args != null && args.length > 0) {
            String nimbus = args[0];
            conf.put(Config.NIMBUS_HOST, nimbus);
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(name, conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }
    }

}

First start the Flume agent:
[hadoop@h40 apache-flume-1.6.0-bin]$ bin/flume-ng agent -c . -f conf/kafka.conf -n a1 -Dflume.root.logger=INFO,console
[hadoop@h40 ~]$ echo "hello world" >> data.txt

In the code above, the to-upper Bolt converts each received line to upper case and then sends a copy of the processed data to each of the two downstream Bolts, hdfs-bolt and realtime, which handle it independently according to the actual need (real-time vs. offline).
After packaging, deploy and run this Topology on the Storm cluster:
[hadoop@h40 apache-storm-0.9.5]$ bin/storm jar wordcount.jar org.shirdrn.storm.examples.DistributeWordTopology h40
Then enter data in the Kafka producer. (At first, perhaps because I typed too little, only some empty files appeared under /storm in HDFS; after typing a lot of data into the producer, files with content finally showed up.)
[hadoop@h40 ~]$ hadoop fs -lsr /
lsr: DEPRECATED: Please use 'ls -R' instead.
drwxr-xr-x - hadoop supergroup 0 2017-05-10 14:36 /aaa
drwxr-xr-x - hadoop supergroup 0 2017-05-11 15:37 /storm
-rw-r--r-- 3 hadoop supergroup 0 2017-05-11 15:33 /storm/app_hdfs-bolt-5-0-1494487931066.log
-rw-r--r-- 3 hadoop supergroup 0 2017-05-11 15:34 /storm/app_hdfs-bolt-5-1-1494487991779.log
-rw-r--r-- 3 hadoop supergroup 37 2017-05-11 15:35 /storm/app_hdfs-bolt-5-2-1494488051772.log
-rw-r--r-- 3 hadoop supergroup 0 2017-05-11 15:36 /storm/app_hdfs-bolt-5-3-1494488111772.log
-rw-r--r-- 3 hadoop supergroup 12 2017-05-11 15:37 /storm/app_hdfs-bolt-5-4-1494488171771.log
-rw-r--r-- 3 hadoop supergroup 0 2017-05-11 15:37 /storm/app_hdfs-bolt-5-5-1494488231941.log

[hadoop@h40 ~]$ hadoop fs -cat /storm/app_hdfs-bolt-5-2-1494488051772.log
DFAD 4
FAF 3
GWEG 4
HELLO FALWREW 13

References:
http://shiyanjun.cn/archives/934.html
http://www.cnblogs.com/smartloli/p/4615908.html
http://blog.csdn.net/liubiaoxin/article/details/49231731
http://blog.csdn.net/mylittlered/article/details/48029705
http://www.iyunv.com/thread-26520-1-1.html
