storm+kafka+logstash+springBoot+高德地图

项目概述:

作用:交通信息化,智慧城市

需求:实时统计人流量并通过热力图展示。

类似于腾讯热力图的景区人流量统计

如何采集某个区域人流量的数据:

1.GPS:获取经纬度信息。

2.手机移动网络信令:移动通信信令(数据样本容量大,覆盖范围广,数据稳定可靠)对信令信息的相应字段进行分析、挖掘、并结合GIS技术实现自定义区域实时人流量的智能化统计分析。通过移动用户发生的通信事件记录来判断该用户所处的位置,可以根据事件发生的区域,对用户的行为轨迹进行定义。

1.区域内inside:用户处在目标区域范围内

2.区域外outside:用户处在目标区域范围外

3.离开leave:观察到驻留在某个区域的用户在该区域以外的某个地方发生了一个通信事件,则认为该用户离开了该区域。

4.出现appear:观察到用户在一个区域发生了通信事件,则认为该用户在该区域出现。如果用户在某个区域第一次出现,则认为用户进入了该区域。

通过信令拿到当前位置的经纬度。

项目架构:

projectStormDataSource.py日志产生器代码

#!/usr/bin/env/ python
# coding=UTF-8
import random
import timeinfos = ["105.795545,29.74814","109.887206,31.236759","107.767851,29.417501","108.750152,28.851412","106.996368,28.885217","107.127392,29.049247","106.409199,28.606263","108.993301,30.656046"
]phones = ["18523981111", "18523981112", "18523981113","18523981114", "18523981115", "18523981116","18523981117", "18523981118", "18523981119"
]def phone_m():return random.sample(phones, 1)[0]def info_m():return random.sample(infos, 1)[0]def producer_log(count=3):time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())f = open("/app/logstash/testFile.txt", "a+")while count >= 1:query_log = "{phone}\t{info}\t[{local_time}]".format(phone=phone_m(), local_time=time_str, info=info_m())print(query_log)f.write(query_log + "\n")count = count - 1if __name__ == '__main__':# print phone_m()# print info_m()producer_log(10)

logstash配置

cd /app/logstashvi FileLogstashKafka.conf
input {file{path => "/app/logstash/testFile.txt"}}output {kafka {topic_id => "storm_topic"bootstrap_servers => "node1:9092"batch_size => 1codec => plain{format => "%{message}"}}
}

项目目录

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.imooc.bigdata</groupId><artifactId>storm</artifactId><version>1.0</version><packaging>jar</packaging><name>storm</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><storm.version>1.1.1</storm.version><hadoop.version>2.9.0</hadoop.version></properties><dependencies><dependency><groupId>org.apache.storm</groupId><artifactId>storm-core</artifactId><version>${storm.version}</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>log4j-over-slf4j</artifactId></exclusion><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId></exclusion></exclusions></dependency><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.4</version></dependency><dependency><groupId>org.apache.storm</groupId><artifactId>storm-jdbc</artifactId><version>${storm.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.31</version></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>16.0.1</version></dependency><!-- 如果不加这个 报错NoClassDefFoundError:kafka/api/OffsetRequest --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>0.9.0.0</version><exclusions><exclusion><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId></exclusion></exclusions></dependency><!-- 如果不加这个 报错NoClassDefFoundError:org/apache/curator/shaded/com/google/common/cache/CacheBuilder --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>2.12.0</version></dependency><dependency><groupId>org.apache.storm</groupId><artifactId>storm-kafka</artifactId><version>${storm.version}</version></dependency></dependencies>
</project>

StormKafkaTopo.java

package com.imooc.bigdata.integration.kafka;import com.google.common.collect.Maps;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.TopologyBuilder;import java.util.Map;
import java.util.UUID;/*** logstash+kafka+storm+mysql+springBoot+高德地图*/
public class StormKafkaTopo {public static void main(String[] args) {TopologyBuilder builder = new TopologyBuilder();// Kafka使用的zk地址BrokerHosts hosts = new ZkHosts("node1:2181,node2:2181,node3:2181");// Kafka存储数据的topic名称String topic = "storm_topic";// 指定ZK中的一个根目录,存储的是KafkaSpout读取数据的位置信息(offset)String zkRoot = "/" + topic;String id = UUID.randomUUID().toString();SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, id);// 设置读取偏移量的操作spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);String SPOUT_ID = KafkaSpout.class.getSimpleName();builder.setSpout(SPOUT_ID, kafkaSpout);String BOLD_ID = LogProcessBolt.class.getSimpleName();builder.setBolt(BOLD_ID, new LogProcessBolt()).shuffleGrouping(SPOUT_ID);//jdbc bolt 结果写入MySQLMap hikariConfigMap = Maps.newHashMap();hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/sid");hikariConfigMap.put("dataSource.user","root");hikariConfigMap.put("dataSource.password","密码");ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);String tableName = "stat";JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper).withTableName(tableName).withQueryTimeoutSecs(30);builder.setBolt("JdbcInsertBolt", userPersistanceBolt).shuffleGrouping(BOLD_ID);LocalCluster cluster = new LocalCluster();cluster.submitTopology(StormKafkaTopo.class.getSimpleName(),new Config(),builder.createTopology());}
}

LogProcessBolt.java

package com.imooc.bigdata.integration.kafka;import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;import java.util.Map;/*** 接收Kafka的数据进行处理的BOLT*/
public class LogProcessBolt extends BaseRichBolt {private OutputCollector collector;public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {this.collector = collector;}public void execute(Tuple input) {try {byte[] binaryByField = input.getBinaryByField("bytes");String value = new String(binaryByField);/*** 13977777777    116.399466,39.989743    [2018-01-14 11:22:34]** 解析出来日志信息*/String[] splits = value.split("\t");String phone = splits[0];String[] temp = splits[1].split(",");String longitude = temp[0];String latitude = temp[1];long time = DateUtils.getInstance().getTime(splits[2]);System.out.println(phone + "," + longitude + "," + latitude + "," + time);collector.emit(new Values(time, Double.parseDouble(longitude), Double.parseDouble(latitude)));this.collector.ack(input);} catch (Exception e) {this.collector.fail(input);}}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("time", "longitude", "latitude"));}
}

DateUtils.java

package com.imooc.bigdata.integration.kafka;import org.apache.commons.lang3.time.FastDateFormat;/*** 时间解析工具类*/
public class DateUtils {private DateUtils(){}private static DateUtils instance;public static DateUtils getInstance(){if (instance == null) {instance = new DateUtils();}return instance;}FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");public long getTime(String time) throws Exception {return format.parse(time.substring(1, time.length()-1)).getTime();}}

MySQL创建表

CREATE TABLE `stat` (`time` bigint(20) DEFAULT NULL,`latitude` double DEFAULT NULL,`longitude` double DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

启动zk

cd /app/zookeeper/bin

./zkServer.sh start

启动kafka

cd /app/kafka

bin/kafka-server-start.sh -daemon config/server.properties &

启动logstash的FileLogstashKafka.conf文件

cd /app/logstash

bin/logstash -f FileLogstashKafka.conf

IDEA启动程序

使用python生成日志

cd /app/logstash

python projectStormDataSource.py

查看IDEA控制台

查看MySQL

数据可视化展示:Spring Boot+高德地图热力图动态数据展示+MySQL

【十三】景区人流量统计:python日志生成+logstash+kafka+storm+mysql+springBoot+高德地图相关推荐

  1. python交通调查数据处理_GitHub - unlimitbladeworks/traffic-monitor: 基于高德地图的交通数据分析...

    traffic-monitor(基于高德地图的交通数据分析) 设计需求在于每天上班早高峰期,每次都提前出门,虽然有地图可以实时查看路况,但是再过一阵时间 就会异常的堵车如果通过数据监控分析每天指定路段 ...

  2. 《Android studio 创建生成keystore SHA1值的申请 高德地图key值申请 android studio 打包生成apk》

    开发背景:目前做车载项目,领导要求用高德地图.整理了一下,差不多就是下面的目录: 一.创建生成keystore: 二.SHA1值的申请: 三.高德地图key值申请: 四.android studio ...

  3. python随机数据库_Python实现生成随机数据插入mysql数据库的方法

    本文实例讲述了Python实现生成随机数据插入mysql数据库的方法.分享给大家供大家参考,具体如下: 运行结果: 实现代码: import random as r import pymysql fi ...

  4. 蓝桥杯 日志统计 Python

    蓝桥杯 日志统计 Python 一.题目 输入格式 输出格式 样例输入 样例输出 二.Python代码 总结 提示:以下是本篇文章正文内容,下面案例可供参考 一.题目 小明维护着一个程序员论坛.现在他 ...

  5. python百度AI的人流量统计(静态1)

    静态1 #静态 #-*- coding: utf-8 -*- #!/usr/bin/env pythonimport urllib import base64 import json import t ...

  6. 基于PaddleDetection实现人流量统计人体检测

    人流量统计 1. 项目概述 在地铁站.火车站.机场.展馆.景区等公共场所,需要实时检测人流数量,当人流密度过高时及时预警,并实施导流.限流等措施,防止安全隐患. 在人流密度较高的公共场所,使用Padd ...

  7. Aidlux实践-快速实现街道人流量统计系统

    背景 之前通过公众号了解到了Aidlux,机缘巧合的情况下参加了『In AidLux,To AIoT』AI应用案例征集活动,在活动过程中,初识了Aidlux平台,该平台可以极大的方便深度学习落地,为智 ...

  8. 基于AidLux平台的医院进出口人流量统计案例开发与测试

    1.环境配置 1.1.跨平台应用系统Aidlux AIdlux系统是基于ARM架构的跨生态(Android/鸿蒙+Linux)一站式AIOT应用开发平台.实际应用到现场的时候,通常会以几种形态:GPU ...

  9. 适应多场景的客流量统计-人流量统计算法

    在商场.展厅.景区等受人流量影响较大的场所,流量统计算法可以快速获取流量数据和动态趋势,辅助评估店铺和部分活动的效果,帮助商业决策.另外,在地铁站.火车站.机场等公共场所.实时检测人数可以及时预警高密 ...

最新文章

  1. 一个Solidity源文件的布局
  2. PHP 语言结构(Language constructs)和函数的区别
  3. matlab矩阵分解
  4. docker对aufs触发的bug
  5. oracle数据库优化--基本概念
  6. 简单三个数比较大小 “?!”的用法
  7. .Net Core实战之基于角色的访问控制的设计
  8. 蓝桥杯 ADV-150算法提高 周期字串
  9. linux程序运行段错误,Linux下的段错误产生的原因及调试方法
  10. 20180513 实参 形参
  11. http://code.svnspot.com/ 免费代码托管
  12. 记事本编写java_编写运行最简单的java程序——使用记事本编写java程序
  13. 大数据经典案例与谬误
  14. Springcloud服务调用Feign组件以及负载均衡
  15. 4个漂亮的wordpress企业主题
  16. Gateway GPRS support node (GGSN) 相关资料
  17. kafka文档: 配置选项翻译
  18. 机器学习笔记 - 特征向量和特征值
  19. Ssd202 FUART 测试环境搭建
  20. 对k8s的背景知识汇总

热门文章

  1. 02_平面转换 transform
  2. 【文件md5值查看方法详解】:如何获取文件的唯一标识?
  3. 腾讯股票API获取上证指数
  4. 物流企业怎样实现服务创新 (zt)
  5. Python requests +PrettyTable 查询高铁或者动车票
  6. 计算机操作系统 概述(题库答案)
  7. vr场景制作费用介绍,vr场景制作流程都有哪些?
  8. OpenCV森林火灾检测
  9. AOTF偏振光谱成像仪的原理及构造
  10. dnf台服升级mysql_2.11.7使用MySQL Yum存储库升级MySQL