一、概述

本篇文章主要介绍如何使用Storm + Logstash + Kafka 实现实时数据的计算,并且使用高德地图API实现热力图的展示。

背景知识:

在有些场合,我们需要了解当前人口的流动情况,比如,需要实时监控一些旅游景点旅客的密集程度,这时可以使用GPS定位系统将该区域内旅客的IP数据进行计算,但是GPS定位系统也有一定的缺点,不是每个旅客都会GPS功能,这时可以使用“信令”来获取个人定位信息。所谓“信令”就是每个手机会不是的向附近最近的基站发送定位信息,除非手机关机。相信每个人在做车旅游的时候每经过一个地方都会受到某个地区的短信,“某某城市欢迎你的来访”等信息,移动电信应用就是利用“信令”来监控每个的定位信息。(同时也可以看出大数据下个人隐私很难受到保护)。

1. 项目架构

在这里我们使用了 Logstash来抽取日志数据,它与 Flume 类似,由于没有是实验项目,因此使用 Python 模拟数据。在经过 Logstash 将数据抽取到 Kafka 中,Strom 会实时消费数据,然后计算结果实时写入 MySQL数据库中,然后我们可以将结果送到后台应用中使用和可视化展示。

2. 环境以及软件说明

  • storm-1.1.1
  • zookeeper-3.4.5-cdh5.3.6
  • logstash-2.4.1
  • kafka_2.11-0.9.0.0

二、实战

1. 模拟数据

#coding=UTF-8import random
import timephone=["13869555210","18542360152","15422556663","18852487210","13993584664","18754366522","15222436542","13369568452","13893556666","15366698558"
]location=["116.191031, 39.988585","116.389275, 39.925818","116.287444, 39.810742","116.481707, 39.940089","116.410588, 39.880172","116.394816, 39.91181","116.416002, 39.952917"
]def sample_phone():return random.sample(phone,1)[0]
def sample_location():return random.sample(location, 1)[0]def generator_log(count=10):time_str=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())f=open("/opt/storm_project/datas/logs/access.log","a+")while count>=1:query_log="{phone}\t{location}\t{date}".format(phone=sample_phone(),location=sample_location(),date=time_str)f.write(query_log+"\n")#   print query_logcount=count-1if __name__=='__main__':generator_log(100)

2. Logstash 配置

在Logstash安装目录下添加配置文件 storm_pro.conf:

input{
file{path => '/opt/storm_project/datas/logs/access.log'
}
}output{
kafka{topic_id => "storm_project"batch_size => 1bootstrap_servers =>"hadoop-senior.shinelon.com:9092"codec => plain{format => "%{message}"}}
}

注意:上面配置中path指定读取数据的文件,可自行创建。topic_id 参数为下文kafka中需要创建的 topic主题。

3. Kafka配置

在kafka安装目录下添加配置文件server.properties:

broker.id=0############################# Socket Server Settings #############################listeners=PLAINTEXT://:9092# The port the socket server listens on
port=9092# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=hadoop-senior.shinelon.com
zookeeper.connect=hadoop-senior.shinelon.com:2181

注意:kafka需要配置zookeeper使用,需要配置zk。

4. Strom程序编写

package cn.just.shinelon.integration;import cn.just.shinelon.utils.DateUtil;
import com.google.common.collect.Maps;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.kafka.*;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.Map;
import java.util.UUID;public class KafkaTopology {/*** 源码:* public class RawMultiScheme implements MultiScheme {public RawMultiScheme() {}public Iterable<List<Object>> deserialize(ByteBuffer ser) {return Arrays.asList(Utils.tuple(new Object[]{Utils.toByteArray(ser)}));}public Fields getOutputFields() {return new Fields(new String[]{"bytes"});}}*/public static class PrintBolt extends BaseRichBolt{private OutputCollector outputCollector;@Overridepublic void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {this.outputCollector=outputCollector;}@Overridepublic void execute(Tuple tuple) {try {byte[] bytes=tuple.getBinaryByField("bytes");String input = new String(bytes);String[] logs = input.split("\t");String phone = logs[0];String tmp = logs[1];//经度Double longitude = Double.parseDouble(tmp.split(",")[0]);//纬度Double latitude = Double.parseDouble(tmp.split(",")[1]);//时间,需要计算当前N久的数据long timestamp = DateUtil.getInstance().getTime(logs[2]);System.out.println(phone+", "+longitude+","+latitude+", "+timestamp);//发射数据outputCollector.emit(new Values(timestamp,latitude,longitude));outputCollector.ack(tuple);} catch (Exception e) {e.printStackTrace();outputCollector.fail(tuple);}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare(new Fields("time","latitude","longitude"));}}public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();//JDBC配置参数Map hikariConfigMap = Maps.newHashMap();hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/storm");hikariConfigMap.put("dataSource.user","root");hikariConfigMap.put("dataSource.password","123456");ConnectionProvider connectionProvider;connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);//表名String tableName = "location";JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper).withInsertQuery("insert into location values(?,?,?)").withQueryTimeoutSecs(30);//ZK地址BrokerHosts hosts = new ZkHosts("hadoop-senior.shinelon.com:2181");String topicName="storm_project";SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());//设置消费数据时间,默认会从源头开始消费spoutConfig.startOffsetTime=kafka.api.OffsetRequest.LatestTime();KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);builder.setSpout("KafkaSpout",kafkaSpout);builder.setBolt("PrintBolt",new PrintBolt()).shuffleGrouping("KafkaSpout");builder.setBolt("JdbcInsertBolt",userPersistanceBolt).shuffleGrouping("PrintBolt");LocalCluster localCluster = new LocalCluster();localCluster.submitTopology("KafkaTopology",new Config(),builder.createTopology());}
}

5. 数据库的设计

create database storm;use storm;create table location(
time bigint,
latitude double,
longitude double
)charset utf8;

6. 集群的启动

首先启动kafka(注意:需要启动ZK)。

启动kafka:

nohup bin/kafka-server-start.sh config/server.properties &

创建topic:

bin/kafka-topics.sh --create --zookeeper hadoop-senior.shinelon.com:2181 --replication-factor 1 --partitions 1 -- topic storm_project

注意:topic名称和logstash中配置的必须一致。

启动logstash:

bin/logstash -f storm_pro.conf

在启动kafka和logstash之后就可以启动 Strom,接着可以运行python数据模拟器,就会看到数据库中存入了计算结果:

三、数据可视化展示

可视化结果如下图所示:

前端页面如下:

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8"/>
<title>高德地图</title><link rel="stylesheet" href="http://cache.amap.com/lbs/static/main1119.css"/>
</head>
<body><script src="js/echarts.min.js"></script><script src="js/jquery.min.js"></script><script src="http://webapi.amap.com/maps?v=1.4.9&amp;key=d16808eab90b7545923a1c2f4bb659ef"></script>
<div id="container"></div><script>var map = new AMap.Map("container", {resizeEnable: true,center: [116.418261, 39.921984],zoom: 11});var heatmap;var points =(function a(){  //<![CDATA[var city=[];$.ajax({type:"POST",url:"../get_map",dataType:'json',async:false,        //success:function(result){for(var i=0;i<result.length;i++){//alert("调用了");city.push({"lng":result[i].longitude,"lat":result[i].latitude,"count":result[i].count});}}})return city;})();//]]>/**    [{"lng":116.191031,"lat":39.988585,"count":1000},{"lng":116.389275,"lat":39.925818,"count":110},{"lng":116.287444,"lat":39.810742,"count":1200},{"lng":116.481707,"lat":39.940089,"count":130},{"lng":116.410588,"lat":39.880172,"count":140},{"lng":116.394816,"lat":39.91181,"count":15552},{"lng":116.416002,"lat":39.952917,"count":16}];**/;map.plugin(["AMap.Heatmap"],function() {      //加载热力图插件heatmap = new AMap.Heatmap(map,{raduis:50,opacity:[0,0.7]});    //在地图对象叠加热力图heatmap.setDataSet({data:points,max:100}); //设置热力图数据集//具体参数见接口文档}); // var map = new AMap.Map('container', {
//    pitch:75, // 地图俯仰角度,有效范围 0 度- 83 度
//    viewMode:'3D' // 地图模式
//});
</script></body>
</html>

SpringBoot DAO层代码如下:


import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;import org.springframework.stereotype.Component;import com.shiyanlou.shinelon.movie.domain.CityTimes;
import com.shiyanlou.shinelon.movie.utils.MysqlUtil;import cu.just.spark.domain.CourseClickCount;
import cu.just.spark.domain.Location;
import cu.just.spark.utils.HBaseUtils;
import groovy.util.logging.Commons;/*** @author shinelon**/
@Component
public class LocationDao {private static MysqlUtil mysqlUtil;public List<Location> map() throws Exception{List<Location> list = new ArrayList<Location>();Connection connection=null;PreparedStatement psmt=null;try {connection = MysqlUtil.getConnection();psmt = connection.prepareStatement("select longitude,latitude,count(*) from location where "+ "time>unix_timestamp(date_sub(current_timestamp(),interval 10 minute))*1000 "+ "group by longitude,latitude");ResultSet resultSet = psmt.executeQuery();while (resultSet.next()) {Location location = new Location();location.setLongitude(resultSet.getDouble(1));location.setLatitude(resultSet.getDouble(2));location.setCount(resultSet.getInt(3));list.add(location);}}catch (Exception e){e.printStackTrace();}finally {MysqlUtil.release();}return list;}}

实体类:


public class Location {private Integer count;private double latitude;private double longitude;public Integer getCount() {return count;}public void setCount(Integer count) {this.count = count;}public double getLatitude() {return latitude;}public void setLatitude(double latitude) {this.latitude = latitude;}public double getLongitude() {return longitude;}public void setLongitude(double longitude) {this.longitude = longitude;}
}

工具类:

package cu.just.spark.utils;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;public class MysqlUtil {private static final String DRIVER_NAME="jdbc:mysql://localhost:3306/movie?user=root&password=123456";private static Connection connection;private static PreparedStatement pstm;private static ResultSet resultSet;public static Connection getConnection(){try {Class.forName("com.mysql.jdbc.Driver");connection=DriverManager.getConnection(DRIVER_NAME);}catch (Exception e){e.printStackTrace();}return connection;}public static void release(){try {if(resultSet!=null) {resultSet.close();}if (pstm != null) {pstm.close();}if(connection!=null){connection.close();}}catch (Exception e){e.printStackTrace();}finally {if(connection!=null){connection=null;    //help GC}}}}

Controller层:

package cu.just.spark.controller;import java.util.List;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.ModelAndView;import cu.just.spark.dao.LocationDao;
import cu.just.spark.domain.Location;@RestController
public class MapController {@Autowiredpublic LocationDao locationDao;@RequestMapping("/storm")public ModelAndView storm() {return new ModelAndView("map");}@RequestMapping("/get_map")@ResponseBodypublic List<Location> getMap() throws Exception{return locationDao.map();}}

项目源码地址:源码

Strom实时热力图展示项目相关推荐

  1. 项目体系架构设计——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(四)

    系列文章目录 初识推荐系统--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一) 利用用户行为数据--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二) 项目主要效果展示--基 ...

  2. 利用用户行为数据——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二)

    系列文章目录 初识推荐系统--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一) 利用用户行为数据--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二) 项目主要效果展示--基 ...

  3. 基础环境搭建——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(五)

    系列文章目录 初识推荐系统--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一) 利用用户行为数据--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二) 项目主要效果展示--基 ...

  4. 通过view实现实时监测数据的实时更新展示

    概述 在做项目的时候,经常会有实时监测数据的地图展示,本文通过view实现实时监测数据的实时更新展示. 分析 对于实时监测数据,有以下两个特点:1.监测设备的空间信息不发生变化:2.监测数据会实时发生 ...

  5. 实时推荐服务建设——基于Spark平台的协同过滤实时电影推荐系统项目系列博客(八)

    系列文章目录 初识推荐系统--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(一) 利用用户行为数据--基于Spark平台的协同过滤实时电影推荐系统项目系列博客(二) 项目主要效果展示--基 ...

  6. echarts 实时数据展示

    echarts 实时数据展示 1. 构建一个web项目 ​ 完成从后台拉取MySQL的数据到前端的准备工作,我自己是用的ssm框架,这里是属于web的工作,不过多赘述. 2.先写一个简单的echart ...

  7. Excel实用技巧——甘特图展示项目排期

    1.Excel条件格式中,百分比和百分点值的区别. 解析:Excel对所选单元格值设置条件格式的规则类型中,最大值和最小值的选项都有"百分比"和"百分点值". ...

  8. 大数据实战项目------中国移动运营分析实时监控平台 || 项目背景

    中国移动运营分析实时监控平台 项目背景 中国移动公司旗下拥有很多的子机构,基本可以按照省份划分. 而各省份旗下的充值机构也非常的多. 目前要想获取整个平台的充值情况,需要先以省为单元,进行省份旗下的机 ...

  9. 实时热力图_呈贡哪里人气最旺?百度热力图告诉你,原来人都聚集在这些地方!...

    那么,你了解呈贡有多少人口吗? 按照呈贡区人民政府网站关于呈贡区基本情况的介绍,呈贡总辖区面积为461平方公里,下辖龙城.斗南.吴家营.雨花.洛龙.乌龙.马金铺.大渔.洛羊和七甸10个街道.其中,马金 ...

最新文章

  1. mysql行锁加在什么上_mysql怎么加行锁?
  2. 【Web】让你的web页面滚动更有趣
  3. 计算机硬件技术基础5章在线,《计算机硬件技术基础》试题(D)
  4. 微软白皮书发布:在IIS7.5中用Service Bus端点寄宿WCF服务
  5. mysql master 监控_可用于监控 mysql Master Slave 状态的python代码
  6. 趣学 C 语言(九)—— 复杂指针解析
  7. 乱码翻译器在线翻译_GAL党的福音——开源生肉翻译器MisakaTranslator正式版发布...
  8. 使用ACR122U和Proxmark3复制IC卡
  9. matlab统计像元灰度值的函数,matlab像素值及统计
  10. cmake使用教程(十)-关于file,真是恍然大悟啊
  11. if语句与switch语句
  12. 交叉熵损失(Cross-entropy)和平方损失(MSE)究竟有何区别?
  13. pgpool-II常见错误
  14. USB移动硬盘WinPE启动盘的制作方法
  15. php实现星座查询,php-十二星座查询系统(原创)
  16. re-id #issue
  17. Codeforces 127C Hot Bath
  18. 博客嵌入可以自适应的b站视频
  19. Rabin-Karp算法
  20. SAP IDOC和EDI应用(1) 基本原理介绍及实例操作

热门文章

  1. 如何快速设计业务架构?
  2. 通用接口平台开源版正式发布2.0版本
  3. 企业内容管理:中国信息化发展的加速器
  4. ug11许可证文件路径安装在哪_UG NX10.0建模模板文件路径在什么位置可以找到?...
  5. 通讯应用Kik推出聊天机器人商店
  6. 服务器远程连接失败是什么原因?服务器远程连接不上怎么处理?
  7. batch / numpy for pytorch (lyh
  8. Linux基础知识之systemd详解
  9. stata中的SDM模型、豪斯曼检验
  10. How to bypass Instagram SSL Pinning on Android