Kafka + Storm + MongoDB
Purpose:
A spout emits data read from Kafka, bolts count the occurrences of each word, and the running results are written to MongoDB.
The spout's nextTuple method is effectively driven inside a continuous while loop; for each piece of data sent to a bolt, the bolt's execute method is invoked once.
In short: spouts emit data, bolts process it.
MongoUtil: the MongoDB utility class.
package storm;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;

public class MongoUtil {
    private MongoUtil(){}
    private static MongoClient mongo;
    private static DB db;
    private static DBCollection collection;
    static {
        mongo = new MongoClient("192.168.170.185", 27017);
        db = mongo.getDB("mySpout");
        collection = db.getCollection("myBolt");
    }
    // Returns the number of documents with _id == 1 (0 or 1);
    // ReportBolt uses this to decide between insert and update.
    public static Long getCount(){
        return collection.count(new BasicDBObject("_id", 1L));
    }
    public static void insert(String result){
        DBObject obj = new BasicDBObject();
        obj.put("_id", 1);
        obj.put("bolt", result);
        collection.insert(obj);
    }
    public static void update(String result){
        DBObject obj = new BasicDBObject();
        obj.put("_id", 1);
        DBObject obj2 = collection.findOne(obj);
        obj2.put("bolt", result);
        collection.update(obj, obj2);
    }
}
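Note that the getCount-then-insert-or-update pattern used by ReportBolt is not atomic. With the same legacy driver the two paths could be collapsed into a single upsert; a minimal sketch (the upsert method is a hypothetical addition, not part of the original class):

// Hypothetical addition to MongoUtil: one atomic upsert replacing the
// getCount() + insert()/update() sequence used by ReportBolt.
public static void upsert(String result) {
    DBObject query = new BasicDBObject("_id", 1);
    DBObject update = new BasicDBObject("$set", new BasicDBObject("bolt", result));
    // upsert=true creates the document if it does not exist; multi=false
    collection.update(query, update, true, false);
}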
SentenceSpout: the spout that reads data from Kafka and emits it into the topology.
package storm;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    private ConsumerIterator<String, String> it;
    private Map conf;

    @Override
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
        // Do initialization here rather than in the constructor: spout instances are
        // serialized when the topology is submitted, so non-serializable fields
        // created too early can cause errors.
        this.conf = map;
        this.collector = collector;
        Properties props = new Properties();
        // ZooKeeper connection
        props.put("zookeeper.connect", "192.168.170.185:2181");
        // consumer group
        props.put("group.id", "testgroup");
        // ZooKeeper session timeout
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        // start from the earliest available offset
        props.put("auto.offset.reset", "smallest");
        ConsumerConfig config = new ConsumerConfig(props);
        this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
        // Create the message stream once here, not on every nextTuple() call.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("helloworld", 1);
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap =
                consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get("helloworld").get(0);
        this.it = stream.iterator();
    }

    @Override
    public void nextTuple() {
        // Emit one Kafka message per call. Note that hasNext() blocks until a
        // message arrives unless consumer.timeout.ms is set; ideally nextTuple()
        // should not block for long.
        if (it.hasNext()) {
            this.collector.emit(new Values(it.next().message()));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}
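To see the topology do anything, the helloworld topic needs messages. Below is a minimal producer sketch using the legacy Kafka 0.8 producer API that matches the consumer API in the spout; the broker address and the sample sentence are assumptions, so adjust them for your environment.

package storm;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Test producer for the "helloworld" topic (assumed broker address).
public class SentenceProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.170.185:9092"); // assumed broker
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("helloworld", "hello storm hello kafka"));
        producer.close();
    }
}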
SplitSentenceBolt: the bolt that splits sentences into words.
package storm;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;
    private Map stormConf;

    @Override
    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
        this.stormConf = map;
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        // Split each sentence on spaces and emit one tuple per word.
        String str = tuple.getStringByField("sentence");
        String[] words = str.split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
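As written, SplitSentenceBolt emits unanchored tuples, so Storm cannot replay a sentence if one of its words fails downstream. If at-least-once processing is needed, execute could anchor each word to the input tuple and then ack it; a sketch of that variant (not in the original post):

// Hypothetical reliable variant of SplitSentenceBolt.execute: anchoring ties
// each word to its source sentence, so a failed word triggers a replay.
@Override
public void execute(Tuple tuple) {
    String str = tuple.getStringByField("sentence");
    for (String word : str.split(" ")) {
        this.collector.emit(tuple, new Values(word)); // anchored emit
    }
    this.collector.ack(tuple); // ack only after all words are emitted
}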
WordCountBolt: the word-counting bolt.
package storm;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountBolt extends BaseRichBolt {
    private Map boltConf;
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
        this.boltConf = map;
        this.collector = collector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        // Increment the in-memory count for this word and emit the new total.
        // fieldsGrouping in the topology guarantees that the same word always
        // reaches the same WordCountBolt instance, so the local map stays consistent.
        String word = tuple.getStringByField("word");
        this.counts.put(word, this.counts.containsKey(word) ? this.counts.get(word) + 1 : 1L);
        this.collector.emit(new Values(word, counts.get(word)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
ReportBolt: the bolt that prints the aggregated results and writes them to MongoDB.
package storm;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class ReportBolt extends BaseRichBolt {
    private HashMap<String, Long> counts = null;
    private Map boltConf;
    private StringBuffer buf = null;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.boltConf = conf;
        this.counts = new HashMap<String, Long>();
        this.buf = new StringBuffer();
    }

    @Override
    public void execute(Tuple tuple) {
        // Record the latest running total for this word.
        String word = tuple.getStringByField("word");
        Long total = tuple.getLongByField("count");
        this.counts.put(word, total);
        // Print the current counts and build a {word:count,...} string.
        System.out.println("------ word counts ------");
        List<String> keys = new ArrayList<String>(this.counts.keySet());
        buf.append("{");
        for (String key : keys) {
            buf.append(key).append(":").append(this.counts.get(key)).append(",");
            System.out.println(key + " : " + this.counts.get(key));
        }
        buf.append("}");
        System.out.println("-------------------------");
        // Drop the trailing comma before the closing brace.
        String result = buf.delete(buf.length() - 2, buf.length() - 1).toString();
        // Insert on the first write, update afterwards.
        long count = MongoUtil.getCount();
        if (count <= 0) {
            MongoUtil.insert(result);
        } else {
            MongoUtil.update(result);
        }
        buf.delete(0, buf.length());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt is a sink; it emits no tuples.
    }

    /* Uncomment to have Storm send this bolt a tick tuple every 10 seconds
       (see the sketch below for how execute() would use it):
    @Override
    public Map<String, Object> getComponentConfiguration() {
        HashMap<String, Object> conf = new HashMap<String, Object>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10);
        return conf;
    }*/
}
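The commented-out getComponentConfiguration would make Storm deliver a tick tuple to this bolt every 10 seconds. If enabled, execute should tell tick tuples apart from data tuples so the MongoDB write happens once per tick instead of once per word. A hedged sketch (buildSnapshot is a hypothetical helper that serializes this.counts, and org.apache.storm.Constants must be imported):

// Hypothetical rework of ReportBolt.execute using the tick-tuple config above.
@Override
public void execute(Tuple tuple) {
    boolean isTick = Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
            && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
    if (isTick) {
        // Tick tuple: flush the counts to MongoDB once per interval.
        String snapshot = buildSnapshot(); // hypothetical helper over this.counts
        if (MongoUtil.getCount() <= 0) {
            MongoUtil.insert(snapshot);
        } else {
            MongoUtil.update(snapshot);
        }
    } else {
        // Data tuple: just update the in-memory counts.
        this.counts.put(tuple.getStringByField("word"), tuple.getLongByField("count"));
    }
}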
WordCountTopology: the topology that wires the Storm components together.
package storm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) throws Exception {
        // Instantiate the spout and bolts.
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();
        // Create the TopologyBuilder.
        TopologyBuilder builder = new TopologyBuilder();
        // Register SentenceSpout.
        builder.setSpout(SENTENCE_SPOUT_ID, spout);
        // Register SplitSentenceBolt, subscribing to SentenceSpout's tuples.
        // shuffleGrouping distributes tuples randomly and evenly across
        // SplitSentenceBolt instances.
        builder.setBolt(SPLIT_BOLT_ID, splitBolt).shuffleGrouping(SENTENCE_SPOUT_ID);
        // Register WordCountBolt, subscribing to SplitSentenceBolt's tuples.
        // fieldsGrouping routes tuples with the same value of the "word" field
        // to the same WordCountBolt instance.
        builder.setBolt(COUNT_BOLT_ID, countBolt).fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        // Register ReportBolt, subscribing to WordCountBolt's tuples.
        // globalGrouping routes all tuples to the single ReportBolt instance.
        builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);
        // Create the configuration object.
        Config conf = new Config();
        // LocalCluster simulates a complete Storm cluster in the local development
        // environment. Local mode is a simple way to develop and test: it avoids
        // repeated deployment to a distributed cluster and makes breakpoint
        // debugging convenient.
        LocalCluster cluster = new LocalCluster();
        // Submit the topology to the cluster.
        cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
        // Let the topology run for a while, then kill it and shut the cluster down.
        Thread.sleep(300000000);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}
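LocalCluster is only for development. On a real cluster the topology would be packaged as a jar and submitted with StormSubmitter instead; a sketch of the replacement for the LocalCluster section above (the worker count is an assumption):

// Hypothetical cluster submission, replacing the LocalCluster block above.
// Package the topology and run: storm jar wordcount.jar storm.WordCountTopology
// Requires: import org.apache.storm.StormSubmitter;
Config conf = new Config();
conf.setNumWorkers(2); // assumed number of worker processes
StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());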
Reposted from: https://www.cnblogs.com/wangjing666/p/6894015.html