• 需求分析
  1. 摩拜单车的重点是物联网大数据
  2. 车投放在什么地方,要根据数据来进行支撑,根据历史数据(骑行记录)
  3. 通过共享单车的骑行数据可以分析出个性城市

成都休闲之都:骑行的地方多为娱乐场所

上海敬业之城:共享单车作为通勤的辅助手段

北京早起之城:早高峰的出现早于其他城市

深圳不夜之城:夜晚骑行度高于其他城市

  1. 年龄段骑行分析
  2. 同时共享单车抢占黑摩的的市场
  3. 深夜城市骑行热力图分析
  4. 骑行产生的数据

产生数据的来源:手机日志,智能锁

消耗 多少卡路里,骑行多长时间,骑行的起始地点等信息

  1. 同时物流,滴滴都使用类似的项目
  2. 有很多的公司都是从微信小程序拿数据的
  3. 大数据公司没有数据的时候做平台,有数据的时候分析数据

还需要做管理系统,最后做报表平台

  1. 微信小程序需要调用的端口,公安系统端口,短信验证系统端口

做了一个微信小程序,然后跟大数据进行对接,后台又做了管理系统,知识点就丰富了

  • 微信小程序建立

1.在page中进行开发

page中有两个目录:

index:书写代码:视图,控制、样式分开来进行设置wxml---js---wxss

logs:存放日志文件

utils:存放工具类

  1. 程序开发:

app.json中可以增加一个page文件夹

app.js全局控制器

app.wxss全局样式

  1. 控制器中的写法:

index.js中添加了按钮以及按钮的点击事件等

  1. 程序流程

很多手机终端----如果同时请求一台服务器(开锁、计费、充值等相关请求)服务器挂掉的话会出现扫不了码的情况所以需要一个服务器的集群,------在这之前还需要负载均衡服务器进行访问的分配

  1. 制作一个后台服务器

后台服务器用SpringBoot来制作

介绍Springboot

在start.spring.io中创建工程SpringBoot工程

新建controller层(通过注解的方式实现前端与程序的交互)

因为是控制层所以类的注解为@Controller

Application类省略了ssm框架的xml文件,他会在当前包以及当前包的子包下面自动寻找加了注解的类

原来写ssm程序需要写一堆的配置文件,现在就用几行代码就能完成

@controller:代表此类是一个控制器

@PostMapping("/bike"):通过post(相对安全的请求找到对应的处理方法)
@ResponseBody:服务器的处理完返回的信息通过此注解返回过去

@RequestBody:拿到post请求发过来的数据

@Controller
public class BikeController {
    
    @PostMapping("/bike")
    @ResponseBody
    public String getById(@RequestBody String bid){
        System.out.println(bid);
        return "success";
    }
}

为了解耦将程序进行分模块开发

controller:接受用户的请求,并且将结果响应给用户

services:做具体的逻辑处理,要定义接口以及接口的实现(为什么要定义接口做动态代理来使用)

dao--mapper:和数据库打交道

bean:实体类

为了解耦(更改一处其他代码不用动):将程序分模块进行设计

面向接口:接口固定了实现类可以任意添加

做事物控制:

Spring层的功能:IOC AOD

IOC:控制反转以前自己new对象现在交给Spring容器

AOD:配置xml

创建BikeController类

通过与springBoot进行结合实现将单车的位置信息、单车状态,单车id存放到mysql数据库中。

  • mongodb数据库讲解

安装

为什么要使用mongodb:

大部分程序传过来的数据都是json的格式,而mongodb就是存储的这种格式的数据

支持集群:可以存储海量数据、

支持分片:方便查找

mongodb数据库介于结构化与非结构化数据之间

表不叫表叫做集合(collectuon)

支持集群、高可用

存储的数据是json数据

不支持SQL语言

关于yum源的解释:

设置本地yum源,如果软件在线安装的话就会很慢,可以配置本地yum源

一台机器下载好了之后所有机器就访问这台机器下载就可以了。这就是配置本地yum源的作用。

#配置mongo的yum源

sodu vi /etc/yum.repos.d/mongodb-org-3.4.repo

[mongodb]

name=MongoDB Repository

baseurl=http://downloads-distro.mongodb.org/repo/redhat/os/x86_64/

gpgcheck=0

enabled=1

#关闭selinux

vi /etc/sysconfig/selinux

SELINUX=disabled

#重新启动

reboot

------------------------------------------------------------------------------------------------

###mongo的安装和基本使用###

------------------------------------------------------------------------------------------------

#在机器上使用xiaoniu用户登录(本地安装给你了rpm包,rpm -ivh *.rpm)

sudo yum install -y mongodb-org

#修改mongo的配置文件

sudo vi /etc/mongod.conf

#注释掉bindIp或者修改成当前机器的某一个ip地址

#启动mongo

sudo service mongod start

#连接到mongo

#如果注释掉了bindIp,那么连接时用

mongo

#指定了ip地址

mongo --host 192.168.10.20 --port 27017

#使用或创建database

use xiaoniu

#创建集合(表)

db.createCollection("bike")

#插入数据

db.bike.insert({"_id": 100001, "status": 1, "desc": "test"})

db.bike.insert({"_id": 100002, "status": 1, "desc": "test"})

#查找数据(所有)

db.bine.find()

#退出

exit

#关闭mongo服务

sudu service mongod stop

#设置服务开机启动

sudo checkconfig mongod on

#设置mongo服务开机不启动

sudo chkconfig mongod off

***************mongodb数据库先进入数据库*****************

如果需要查看数据库以及数据库下的所有表必须先进行输入用户名以及密码

超级管理员可以管理数据库,普通管理员进行管理数据库下的表

------------------------------------------------------------------------------------------------

###mongo安全认证配置###

#如果修改了mongo存储是的目录那么一定要修改该目录的所属用户和组为mongod

在root下安装mongo数据库也需要修改

#chown -R mongod:mongod  /mongo/

------------------------------------------------------------------------------------------------

#添加管理员用户

#使用admin这个database

use admin

#在没有开启认证的情况下,创建一个超级用户

db.createUser(

{

user: "ljq",

pwd: "ljq123",

roles: [ {role: "root", db: "admin" }]

}

)

#修改mongo的配置文件/etc/mongod.conf,配置mongo的安全认证

security:

authorization: enabled

#重启mongo服务

service mongod restart

#重新使用mongo shell连接

mongo

#使用admin database

use admin

#授权登录

db.auth("admin", "admin123")

#创建一个bike数据库

use bike

#添加一个普通用户,具备读写权限

db.createUser(

{

user: "xiaolan",

pwd: "123",

roles: ["readWrite"]

}

)

#使用小牛用户登录

mongo

use bike

db.auth("xiaoniu", "123568")

#在database下创建collection

db.createCollection("users")

db.createCollection("bikes")

基本操作:

#创建集合(表)

db.createCollection("bike")

#插入数据

db.logs.insert( { name: "laozhao", age: 30 } )

#查找

db.users.find()

#multi如果有重复的都进行更新

db.logs.update({'name':'laozhao'},{$set:{'age': 18}},{multi:true})

db.logs.remove({'name': 'laoduan'})

db.logs.remove({'name': 'laoduan'}, 1)

db.logs.find({"name":"laoduan", "fv": 99.99})

#查看当前db的索引(查询起来非常快)

db.users.getIndexes()

#创建索引

1代表排序的时候升序,-1代表排序的时候降序

db.logs.ensureIndex({"name":1})

#删除索引

db.logs.dropIndex({"name":1})

#开启运行用户从其他机器连接mongodb

#不然会报错Caused by: java.net.ConnectException: Connection refused (Connection refused)

#修改/etc/mongod.conf,注释掉bindIp:

#  bindIp:

#重启mongodb

service mongod restart

数据库用户角色:read、readWrite;

数据库管理角色:dbAdmin、dbOwner、userAdmin;

集群管理角色:clusterAdmin、clusterManager、clusterMonitor、hostManager;

备份恢复角色:backup、restore;

所有数据库角色:readAnyDatabase、readWriteAnyDatabase、userAdminAnyDatabase、dbAdminAnyDatabase

超级用户角色:root

// 这里还有几个角色间接或直接提供了系统超级用户的访问(dbOwner 、userAdmin、userAdminAnyDatabase)

内部角色:__system

  • 程序实现将地图上添加单车将单车信息位置信息返回mongodb数据库里面

创建一个按钮:给按钮添加一个事件,当点击的时候将当前的位置信息添加到mongodb数据库中,mongoTemplate的save()方法

将附近的单车信息显示在页面上mongoTemplate的findAll方法

实现当刷新页面的时候将附近车辆的信息显示在页面上

之前添加到mysql中的是需要开锁车的位置信息

  • 单车的后台管理系统,管理投放单车的信息

(1)添加webapp页面信息

(2)因为是管理系统所以创建userController类来实现,类里面转到index页面

  1. 在页面单机单车管理的时候,转到单车管理的界面,同时将mongodb的数据加载到页面上,此功能在web的html的页面实现

在单车的list页面有一个dataTables.js加载一张表

在单车页面的list.js中实现数据的查询加载

同时list.js中有访问findall()的方法

为什么返回的json数据放在了表格里面?

先请求数据,然后将请求的数据追加到表格里面

  • Mongodb中记录logs数据

日志中记录了pv和uv

pv:页面一共被访问多少次

uv:有多少个用户进行访问

日活跃用户是描述一款软件好坏的指标

日活跃用户就是当天的uv的次数

采集日志与我们的操作业务的服务器是分开的互不影响

所以单车的查询以及日志采集系统不能存放在一起

*****************面试的时候可以说我们提前没有考虑到耦合的问题将业务处理与

日志收集的服务放在了一起,把日志的信息放在数据库或者mongodb里面,导致出现耦合问题,业务系统的压力会增大。

日志收集的日志信息:

pv:所有页面的访问次数排序:开锁、解锁、支付

将这些访问log放在log中然后用spark进行分析

这就是页面中的埋点:(将用户的行为记录下来)

在微信中的埋点(触发事件就将数据存储起来)

对于微信小程序来说每个小程序都有一个open的id

现在需要在微信公众号平台来获取自己的AppId以及AppSecret

通过AppId以及AppSecret就能够获取独立的用户id

在index.js中当页面首次加载完成的时候将log记载下来,发送到后台服务器

/**

* 生命周期函数--监听页面初次渲染完成

*/

onReady: function ()

通过怕post请求将数据日志发送到服务器

服务端通过save()方法将数据存储到mongodb里面

  • log日志的pv与uv的分析

json数据通常用sparkSQL来进行分析

下面分别用sparkCore与sparkSQL来进行分析

*******mongodb踩过的坑:*********

yum进行安装:

Could not retrieve mirrorlist http://mirrorlist.repoforge.org/el6/mirrors-rpmforge error was
14: PYCURL ERROR 7 - "couldn't connect to host"
http://apt.sw.be/redhat/el6/en/x86_64/rpmforge/repodata/repomd.xml: [Errno 14] 
PYCURL ERROR 7 - "couldn't connect to host"
Trying other mirror.
Error: Cannot retrieve repository metadata (repomd.xml) for repository: rpmforge. 
Please verify its path and try again

网址访问不了。。。。。。。。。。。。。。。。。。。。。。。。。。。。。

spark2.3.0 mongo版本>3.2     我用的4.0开始用的是2.6.11()

获取sparkConf对象,获得mongo文档count与distinct进行求pv与uv很简单的

  • nginx的安装与使用

业务系统与日志采集系统分开

需要负载均衡服务器

nginx的功能负责请求转发:并不处理请求而是将请求转发给后面的TomCat服务器进行处理

反向代理:

负载均衡:将请求均匀地分发该服务器进行处理,使每台服务器处理请求均衡

那么整个项目的部署就是:

前端发过来的业务数据---------通过nginx来做负载均衡---------分发给Tomcat进行处理------经常变动的数据量比较大的数据放在mongodb集群里面,将量比较小的数据不经常改的放在mysql集群里面。---------比如说员工的信息就放在mysql里面

对于前端发过来的日志信息-------通过nginx来做负载均衡---------flume进行日志收集用flume进行收集(source、channel、sink三个组件)------放在kafka里面-----SparkStreaming进行分析------将结果存放在mysql中。

对于明细数据:比如支付订单等数据则需要放在redis或者hbase中或者mongodb中或者es中进行保存(Spark在处理之前的数据)

将不同的业务,不同的服务以及不同的压力进行不同的部署,比如查找单车部署压力比较大部署5太机器,支付压力比较小部署3台机器。

业务系统以及日志采集部署两个nginx的集群,

日志nginx的集群是怎么实现的呢?

-----------为什么使用channel呢因为source读的太快而sink写的太慢所以要放到channel中进行缓冲一下

现在elasticsearch与nginx都很火的

  1. nginx可以配置权重让请求按比例分发给每台服务器
  2. nginx的特点占用内存少,并发能力强

nginx的安装与使用:

1.上传nginx安装包

2.解压nginx

tar -zxvf nginx-1.12.2.tar.gz -C /usr/local/src/

3.进入到nginx的源码目录

cd /usr/local/src/nginx-1.12.2/

4.预编译

./configure

5.安静gcc编译器

yum -y install gcc pcre-devel openssl openssl-devel

6.然后再执行

./configure

7.编译安装nginx

make && make install

8.启动nginx

sbin/nginx

9.查看nginx进程

ps -ef | grep nginx

netstat -anpt | grep nginx

此时是一个master和一个worker进程,集群的话可以配置多个worker进程

--------------------------------------------------------------------------

将springboot程序部署在多台服务器上,然后启动springboot

java -jar niubike-0.0.1-SNAPSHOT.war >>./logs 2>&1 &

--------------------------------------------------------------------------

修改nginx的配置文件,让nginx实现负载均衡功能

vi /usr/local/nginx/conf/nginx.conf

内容见nginx.conf

nginx -t

重新启动nginx

nginx -s reload

#user  nobody;

#work进程的数量

worker_processes  2;

#error_log  logs/error.log;

#error_log  logs/error.log  notice;

#error_log  logs/error.log  info;

#pid        logs/nginx.pid;

#每一个work同时打开的链接数

events {

worker_connections  1024;

}

http {

include       mime.types;

default_type  application/octet-stream;

#log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '

#                  '$status $body_bytes_sent "$http_referer" '

#                  '"$http_user_agent" "$http_x_forwarded_for"';

#access_log  logs/access.log  main;

sendfile        on;

#tcp_nopush     on;

#keepalive_timeout  0;

keepalive_timeout  65;

#gzip  on;

#上游在这里相当于Tomcat

upstream tomcats {

server had01:8888 weight=1;

server had02:8888 weight=1;

server had03:8888 weight=1;

}

#将这里的页面请求都交给上游处理

server {

listen       80;

#server_name可配可不配

server_name  had01;

location ~ .* {

proxy_pass http://tomcats;

}

}

}

将nginx的数据直接放到kafka里面

安装nginx-kafka插件

1.安装git

yum install -y git

2.切换到/usr/local/src目录,然后将kafka的c客户端源码clone到本地

cd /usr/local/

git clone https://github.com/edenhill/librdkafka

3.进入到librdkafka,然后进行编译

cd librdkafka

yum install -y gcc gcc-c++ pcre-devel zlib-devel

./configure

make && make install

4.安装nginx整合kafka的插件,进入到/usr/local/src,clone nginx整合kafka的源码

cd /usr/local/src

git clone https://github.com/brg-liuwei/ngx_kafka_module

5.进入到nginx的源码包目录下 (编译nginx,然后将将插件同时编译)

cd /usr/local/src/nginx-1.12.2

./configure --add-module=/usr/local/ngx_kafka_module/

make

make install

6.修改nginx的配置文件,详情请查看当前目录的nginx.conf

#user  nobody;

worker_processes  1;

#error_log  logs/error.log;

#error_log  logs/error.log  notice;

#error_log  logs/error.log  info;

#pid        logs/nginx.pid;

events {

worker_connections  1024;

}

http {

include       mime.types;

default_type  application/octet-stream;

#log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '

#                  '$status $body_bytes_sent "$http_referer" '

#                  '"$http_user_agent" "$http_x_forwarded_for"';

#access_log  logs/access.log  main;

sendfile        on;

#tcp_nopush     on;

#keepalive_timeout  0;

keepalive_timeout  65;

#gzip  on;

kafka;

kafka_broker_list had01:9092 had02:9092 had03:9092;

server {

listen       80;

server_name  had02;

#charset koi8-r;

#access_log  logs/host.access.log  main;

#指定80后面的url路径路径以及kafka的主题

location = /kafka/track {

kafka_topic track;

}

location = /kafka/user {

kafka_topic user;

}

#error_page  404              /404.html;

# redirect server error pages to the static page /50x.html

#

error_page   500 502 503 504  /50x.html;

location = /50x.html {

root   html;

}

}

}

7.启动zk和kafka集群(创建topic)

/bigdata/zookeeper-3.4.9/bin/zkServer.sh start

/bigdata/kafka_2.11-0.10.2.1/bin/kafka-server-start.sh -daemon /bigdata/kafka_2.11-0.10.2.1/config/server.properties

kafka-topics.sh --create --zookeeper had01:2181,had02:2181,had03:2181 --replication-factor 3 --partitions 3 --topic track

kafka-topics.sh --list --zookeeper had01:2181,had02:2181,had03:2181

kafka-topics.sh --describe --zookeeper had01:2181,had02:2181,had03:2181 --topic track

8.启动nginx,报错,找不到kafka.so.1的文件

error while loading shared libraries: librdkafka.so.1: cannot open shared object file: No such file or directory

9.加载so库

echo "/usr/local/lib" >> /etc/ld.so.conf

ldconfig

10.测试,向nginx中写入数据,然后观察kafka的消费者能不能消费到数据

curl localhost/kafka/track -d "message send to kafka topic"

curl localhost/kafka/track -d "老赵666"

kafka-console-consumer.sh --bootstrap-server had01:9092,had02:9092,had03:9092 --topic user --from-beginning

  1. 小程序将日志通过nginx发给kafka

在index.js中

var openid = wx.getStorageSync('openid')

//发送request向mongo中添加数据(添加一条文档(json))

wx.request({

//用POST方式请求es可以只指定index和type,不用指定id

url: "http://192.168.10.128/kafka/user",

data: {

time: new Date(),

openid: openid,

lat: lat,

log: log

},

method: "POST"

修改程序发送的路径:完成

总结:

以前我们公司都是怎么搞得都是将nginx打log------日志采集工具-----kafka----HDFS

ETL的过程:数据的抽取、数据的清洗转换、数据的加载过程

现在用nginx----kafka更高效一些

启动的控件:

三台的kafka

微信小程序

had02的nginx

idea中的程序

mongodb

  • flume与kafka讲解
  1. 一个flume中的source   channel  sink不一定都会有 一个source、一个channel是可以得;一个channel与一个sink也是很可以的
  2. flume叫做agent(代理)

为什么要单独定义一个组件:为了灵活的组合

source收集数据、sink将数据写入到不同的数据系统中

为什么会有channel因为如果source读的比较快sink写的,所以要加上channel对数据进行缓冲。

什么时候flume的数据会丢失

当flume所在的nginx系统挂掉重启之后就会丢失---所以需要记载读数据的偏移量,接着偏移量的数据继续读就不会出现丢失数据以及数据的重复读了,因为要记录偏移量所以读取的数据就会效率变低

flume官网:

  • Documentation
  • Flume User Guide
  • Flume Developer Guide

flume分为用户指南以及开发者指南,开发者指南是自己开发一个flume

flume自定义source 、channel、sink

自定义source类的实现:

设计了source的初始化、开始、结束的方法

初始化是加载配置信息

//文件读取路径
private String filePath;
//文件字符集
private String charset;
//文件偏移量位置
private String posiFile;
//文件时间间隔读取一次
private long interval;

开始是创建一个线程池,执行线程不断监听文件

创建FileRunnable这个类将配置信息传过来

先判断偏移量的文件是否存在,不存在就创建一个存放偏移量的文件(文件可以放在redis里面)

从文件中读取偏移量

然后从偏移量的位置读取文件

然后将数据写入到channel

获取新的偏移量

将偏移量写入偏移量文件

执行stop方法的时候,将线程停掉,不要立即停掉当执行完最后的任务的情况下就停掉

#定义agent名, source、channel、sink的名称

a1.sources = r1

a1.channels = c1

a1.sinks = k1

#具体定义source

a1.sources.r1.type = cn.edu360.flume.source.TailFileSource

a1.sources.r1.filePath = /opt/mydata/myflume/filepath/access.txt

a1.sources.r1.posiFile = /opt/mydata/myflume/position/posi.txt

a1.sources.r1.interval = 2000

a1.sources.r1.charset = UTF-8

#具体定义channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

#具体定义sink

a1.sinks.k1.type = file_roll

a1.sinks.k1.sink.directory = /opt/mydata/myflume/resulr

#组装source、channel、sink

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

bin/flume-ng agent -n a1 -f /opt/app/flume-1.8.0/conf/a1.conf -c conf -Dflume.root.logger=INFO,console

文件中追加内容,另一个文件中就会有内容

现在自定义source已经编写完成但是又有一定的问题

自定义的source只能监听一个文件

Memorychannel会出现堆积会出现饱满

FileChannel会太慢

美团有一个解决方案就是将两个进行结合Memorychannel达到一定的阈值就写到FileChannel

但是这种解决方案也不是最好的,接下来就是将TailFileSource作为数据源 kafkachannel 作为缓冲(高可用、高吞吐、高可靠)

fileFileSource-----kafkaChannel-------kafka--------Kafkachannel---------HdfsSink-----hdfs

现在flume就可以有两个组件还有一种解决方案就是将服务器的日志直接通过kafka进行收集

将kafka的数据同步到hdfs中或者是es中的做法:

将kafkachannel从kafka中拉取数据然后放到HDFSSink中-------hdfs上

kafkaCkannel既可以从kafka中读数据又可以往kafka中写数据

首先创建kafka的topic也就是数据源

kafka-topics.sh --create --zookeeper had01:2181,had02:2181,had03:2181 --replication-factor 3 --partitions 3 --topic usertest

kafka-topics.sh --list --zookeeper had01:2181,had02:2181,had03:2181

kafka-topics.sh --describe --zookeeper had01:2181,had02:2181,had03:2181 --topic usertest

kafka-console-consumer.sh --bootstrap-server had01:9092,had02:9092,had03:9092 --topic user --from-beginning

将数据从文件写入kafka----启动一个kafka消费者消费数据

flume----kafkachannel 一定加一个选项

#默认是ture将数据以事件的形式写进去,所以改成false将数据以纯文本的形式写进去

a0.channels.c1.parseAsFlumeEvent = false

自定义拦截器:可以将数据一种形式转换为另一种形式

过滤掉不必要的数据

用的最多的就是正则过滤拦截器

regex Filtering interceptor

程序实现思路:

定义拦截器,拦截器中定义内部类实例化初始化拦截器

内部类先执行构造器。再次执行里面的bulid方法

构造器作用初始化属性,bulid方法的作用实例化拦截器的对象

拦截器里面需要有构造方法

拦截器的intercept方法的实现,将接收过来的内容与配置文件传过来的内容进行组合形成map集合,最后形成一个json对象返回去

多行数据的话重写intercept方法然后循环调用单次的intercept方法

@Override
public List<Event> intercept(List<Event> events) {
    for (Event e: events) {
        intercept(e);
    }
    return events;
}

为什么还是返回的events因为处理后依然是放回原来的集合里面

处理过程:

从event中读取数据进行处理

转换为json形式最后再放到event中传过去

bin/flume-ng agent -n a0 -f /opt/app/flume-1.8.0/conf/a0.conf -c conf -Dflume.root.logger=INFO,console

kafka-console-consumer.sh --bootstrap-server had01:9092,had02:9092,had03:9092 --topic usertest

TailDirSource讲解:

说辞:我们公司开始用的flume1.5还没有TailDirSource 自己写了一个TailFileSource,实现source阶段记录偏移量。后来发现他不能存windows下的数据也不能递归收集文件下的数据我们又改了一下

flume中可以定义多套source、channel、sink

此时第四天内容完成

flume的TailDirSource可以收集一个文件夹下的所有文件,但是不擅长收集递归文件,不不能收集windows中的文件

想让flum进行递归操作

https://github.com/qwurey/flume-source-taildir-recursive

  • 共享单车业务流程

微信平台在什么时候记录你的数据,当你打开app的时候就开始记录你的行为数据了

可以计算app的日活用户、月活用户,还有周活用户、注册用户多长时间使用一次共享单车,一年的什么时间段共享单车使用的次数最多。

共享单车业务流程:

首先你走到一个单车旁边,如果没有注册的话就进行注册

输入手机号,获取验证码,输入验证码点击确认即可完成注册功能

获取验证码的接口获取腾讯云的短信服务接口

充值功能用户用不了模拟了一个充值的功能

实名认证:输入用户的信息

每个用户都有一个程序id

用户信息映射为一个类

状态码,(没有注册,注册没有充值、充值)

身份证:

调用的是公安部认证的接口

充值金额:点确认充值就调用充值的接口

将这些数据提交之后数据库就绑定成功

接着扫码就可以进行扫码功能了(需要更新一下全局变量)

同时车辆的话也要定时上传位置信息

获取验证码的功能,这个验证么需要放在什么数据库里面呢,可以放在redis数据库中,因为redis的数据库可以设置数据有效时间

***** 微信小程序app里面都有全局数据,数据在整个页面上有效

当页面跳转的时候数据就拿不到了,这个时候就要将数据变成全局数据

全局数据的话就要在app.js中配置

拿到全局数据的话getApp().globaData.属性就可以拿到全局数据

当获取status的时候就可以在程序里面判断要跳转到哪个页面,是身份验证页面还是充值压面等

注册成功时候将status=3 将数据更新到数据库此时就可以实现注册完成之后就可以立即进行扫码操作

小程序访问的协议是什么协议https协议,并且url里面不能有端口号

实现流程:

首先:当点击扫描二维码图片时候转到用户注册页面

vxss:存放控件的样式

wxml:存放页面结构

js:存放处理逻辑

在wxml中输入手机号

点击获取验证码按钮就将用户填写的phoneNumber获得到

接着通过post请求转到服务端页面,将城市id以及验证码传过去

短信验证码的实现:

接着我这边随机生成一个四位数的短信验证码,同时保存到redis数据库一份并设计有效时长

调用发短信的功能,将验证码发给你的手机上,你输入的验证码再与数据库里面的验证码进行匹配,通过后则进行下一步

post请求http://localhost:8888/verify

设计程序:

json_jdk1.7.jar与smssdk.jar

在file-----project structor----libraries添加依赖包

将验证码与手机号,status=1,以及openid(用户的身份认证)

处理逻辑:

redis端通过phoneNumber查找验证码与发过来的验证码进行比较如果相等的话将用户的信息保存起来

处理代码:

String appkey = "f7ca2946a5e51febe3a92dd2cb53fff1";

//stringRedisTemplate.opsForValue().get("appkey");
//redisTemplate.
//调用腾讯云的短信接口(短信的appid, 短信的appkey,1400047183短信的appid )
SmsSingleSender singleSender = new SmsSingleSender(1400177814, appkey);
//普通单发
//String code = "8888";
String code =  (int)((Math.random() * 9 + 1) * 1000) + "";
//调用发送短信功能
singleSender.send(0, nationCode, phoneNum, "您的登录验证码为" + code, "", "");
//将数据保存到redis中,redis的key手机号,value是验证码,有效时长120秒
stringRedisTemplate.opsForValue().set(phoneNum, code, 120, TimeUnit.SECONDS);

进入充值押金的页面:交完押金通过手机号更新status以及增减押金值

切换到实名认证的页面

将用户输入的姓名,身份证号提交到"http://localhost:8888/identify",

更新status状态并且增加姓名身份证号的信息

指标的实现:

现在想实时的知道,有多少个用户充值、总的金额是多少,平均每个用户多少钱

想知道北京市的用户:充值的时候从哪个区进行充值的,加大车辆投入以及运行人员来维护,山东省成交多少单

首先GPS获取位置信息然后,通过访问腾旭地图的接口获取地理位置信息。在前端就将省市县信息获取到

书写充值功能:

点击红色的小红包进入充值页面,调用后台方法就可以将mongodb的users集合中插入数据。充值的数据

用user实体类加注解的方式就可以将数据存入指定的集合

@Document(collection="users")
public class User {

相应的方法:recharge() 更新数据库中的数据

db.users.insert({"_id": "o0OyK5SIN5RgRKEC2gkpOrebhQHg", "_class":"cn.edu360.bike.pojo.User", "status": 3,})

充值log分析

****现在开始做log:现在就把 人、冲了多少钱、在什么地方充值的、充值时间是多少,将这些信息取到发送给nginx

实现:pay.js页面

wx.request({

url: "http://192.168.100.106/kafka/recharge",

data: {

openid: openid,

phoneNum: phoneNum,

amount: amount,

date: new Date(),

lat: lat,

log: log,

},

https://lbs.qq.com/solution/share.html(共享单车指标分析)

进入网站:“逆地址解析将坐标”解析成城市信息(腾讯位置服务)

https://lbs.qq.com/qqmap_wx_jssdk/index.html

申请位置服务的key:4KCBZ-WCJC5-ACFIT-QRZQL-I4LZO-4EFPZ

qqmapsdk.reverseGeocoder这个方法进行解析

将充值信息通过nginx发送给kafka

****************注意nginx安装了kafka的插件ngx_kafka_module(吹牛逼用)*************

nginx方案解决:

将nginx的数据一份用来做实时处理,一份用来做离线数据保存,我们怎么来解决这个问题呢?

方案一、

对于离线数据的话从nginx中-----------flume--------hdfs

对于实时的数据--------nginx------kafka------实时计算------hbase

以上方案的话nginx的压力很大,所以需要解耦

方案二、

微信小程序-------nginx------kafka

kafka----实时进行处理

kafka-----flume------hdfs-----离线数据

flume用到的组件是kafkachannel与hdfssink

那么flume的组件的吞吐量不是很高就需要配置多台flume进行日志采集

但是多台flume进行日志采集的话又有可能进行重复消费,这样的话就将flume放在同一个组里面就可以解决问题

由于kafka的三高特点所以可以减少nginx的压力

kafkachannel:

http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#kafka-channel

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannela1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092a1.channels.channel1.kafka.topic = channel1a1.channels.channel1.kafka.consumer.group.id = flume-consumer

hdfssink

a1.channels = c1

a1.sinks = k1

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S

a1.sinks.k1.hdfs.filePrefix = events-

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

现在实现的功能是将数据收集到hdfs里面

微信小程序指定nginx的路径

wx.request({

url: "http://192.168.10.128/kafka/track",

method: "POST",

data: {

openid: openid,

phoneNum: phoneNum,

amount: amount,

date: new Date(),

lat: lat,

log: log,

},

nginx的配置

#user  nobody;

worker_processes  1;

#error_log  logs/error.log;

#error_log  logs/error.log  notice;

#error_log  logs/error.log  info;

#pid        logs/nginx.pid;

events {

worker_connections  1024;

}

http {

include       mime.types;

default_type  application/octet-stream;

#log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '

#                  '$status $body_bytes_sent "$http_referer" '

#                  '"$http_user_agent" "$http_x_forwarded_for"';

#access_log  logs/access.log  main;

sendfile        on;

#tcp_nopush     on;

#keepalive_timeout  0;

keepalive_timeout  65;

#gzip  on;

kafka;

kafka_broker_list had01:9092 had02:9092 had03:9092;

server {

listen       80;

server_name  had02;

#charset koi8-r;

#access_log  logs/host.access.log  main;

#指定80后面的url路径路径以及kafka的主题

location = /kafka/track {

kafka_topic track;

}

location = /kafka/user {

kafka_topic user;

}

#error_page  404              /404.html;

# redirect server error pages to the static page /50x.html

#

error_page   500 502 503 504  /50x.html;

location = /50x.html {

root   html;

}

}

}

现在数据到达kafka的topic里面

将topic的数据放在hdfs里面flume配置文件的配置

kafka-hdfs.conf

#

#定义agent名和channel、sink的名称

a1.channels = c1

a1.sinks = k1

#定义kafka-channel

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c1.parseAsFlumeEvent = false

a1.channels.c1.kafka.bootstrap.servers = node-1:9092,node-2:9092,node-3:9092

a1.channels.c1.kafka.topic = gamelog

#制定一个组多个flume进行消费数据

a1.channels.c1.kafka.consumer.group.id = g100

#具体定义sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://ns1/gamelog/%Y%m%d

a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.sinks.k1.hdfs.filePrefix = log-

a1.sinks.k1.hdfs.fileType = DataStream

#不按照条数生成文件

a1.sinks.k1.hdfs.rollCount = 0

#HDFS上的文件达到128M时生成一个文件

a1.sinks.k1.hdfs.rollSize = 134217728

#HDFS上的文件达到60秒生成一个文件

a1.sinks.k1.hdfs.rollInterval = 60

a1.sinks.k1.channel = c1

bin/flume-ng agent -n a1 -f /opt/app/flume-1.8.0/conf/kafka-hdfs.conf -c conf -Dflume.root.logger=INFO,console

此时由于flume缺少hdfs的jar包所以要将hdfs的依赖包放入到flume的lib下面

commons-configuration-1.6.jar

commons-io-2.4.jar

hadoop-auth-2.7.3.jar

hadoop-common-2.7.3.jar

hadoop-hdfs-2.7.3.jar

htrace-core-3.1.0-incubating.jar

优化:

将json的文件的格式转换为parquet的格式类型

在采集的时候将文件转换为parquet的格式

现在有两种方案:

将数据采集到hdfs中再将诗句通过SparkStreaming进行转换

另一种方式就是通过kafka将数据实时的转换写到SparkStreaming中

你们做了哪些指标?

数据处理的流程?

数据怎么来的?

各种终端设备产生的日志(用户点触发了特定的事件,然后在对应的事件进程埋点(记log),就是讲触发事件是产生的数据发送给日志采集服务器,是一个Nginx,可以将数据写入到kafka中,如果是离线,在将数据同步到HDFS中)

有哪些关键字段?

时间、地点(省、市、区)、金额、app类型(IOS、Android、微信小程序)在收集埋点的时候进行记录

怎么处理的?

数据的预处理(清洗 -> 转换(json -> parquet)) sparksql(离线) sparkSteaming(实时)

计算完保存到哪里?

聚合后的数据保存在关系数据库中(集群)、明细数据保存到Hbase、存储到mongo、Elasticsearch中

保存到hbase里面的话就涉及到rowkey的书写,用户id+时间+订单id

在就把 人、冲了多少钱、在什么地方充值的、充值时间是多少,将这些信息取到发送给nginx

openid: openid,

phoneNum: phoneNum,

amount: amount,

date: new Date(),

lat: lat,

log: log,

今天收集的用户充值的数据可以计算出哪些指标

实时指标:

1.当天截止到当前时间的充值中金额

2.各个省、市的充值金额

3.各个终端类型的充值金额

4.充值的成功率

5.充值的渠道

6.充值笔数

7.省、市的充值笔数

离线指标

1.某一天的充值金额

2.各个省、市的充值金额

3.各个终端类型的充值金额

4.充值的成功率

5.充值的渠道

6.充值笔数

7.省、市的充值笔数

-----------------------------------------------------------------------------------------------

收集用户的报修事件的日志

数据怎么来的?各种终端设备产生的日志,发现单车不能使用,然后点击报修按钮、智能锁上报车辆状态

有哪些关键字段?时间、地点(省、市、区)、用户id、车辆id、原因(部件)、app类型

怎么处理的?数据的预处理(清洗 -> 转换(json -> parquet)) sparksql

计算完保存到哪里?聚合后的数据保存在关系数据库中(集群)、明细数据保存到Hbase、存储到mongo、Elasticsearch中

能计算哪些指标

1.报修次数

2.报修的区域

3.损坏的部件

----------------------------------------------------------------

活动参与(程序中的广告)

1.活动的点击次数

2.活动的参与次数

3.参会活动积分兑换

4.哪些用户对哪些活动感兴趣

  • 查找附近单车与redis集群的实现

查找附近的单车,不可能将所有的单车都查出来,会增加服务器的压力,等值查找也不可能实现因为等值查找的话所在位置不等于单车所在的位置

GeoHash算法的解释:

这就涉及到一个算法GeoHash算法,可以实现查找附近的人的功能

在没有这个算法之前,我们可以用最笨的算法就是,根据经纬度计算距离,然后划定一个阈值,只要小于该阈值就算是附近的人,这种算法对于数据量小的基本没有问题,但是数据量非常大的话,服务器就会进行大量的计算,服务器负担很重,一个比较好的算法就是利用GeoHash算法进行计算。

一、简介

GeoHash是一种地址编码方法。他能够把二维的空间经纬度数据编码成一个字符串。GeoHash具有以下特点:

1、GeoHash用一个字符串表示经度和纬度两个坐标。在数据库中可以实现在一列上应用索引

2、GeoHash表示的并不是一个点,而是一个区域;

3、GeoHash编码的前缀可以表示更大的区域。例如wx4g0ec1,它的前缀wx4g0e表示包含编码wx4g0ec1在内的更大范围。 这个特性可以用于附近地点搜索

二、计算方法:

GeoHash的计算过程分为三步:

1、将经纬度转换成二进制:

比如这样一个点(39.923201, 116.390705)

纬度的范围是(-90,90),其中间值为0。对于纬度39.923201,在区间(0,90)中,因此得到一个1;(0,90)区间的中间值为45度,纬度39.923201小于45,因此得到一个0,依次计算下去,即可得到纬度的二进制表示,如下表:

最后得到纬度的二进制表示为:

10111000110001111001

同理可以得到经度116.390705的二进制表示为:

11010010110001000100

2、合并纬度、经度的二进制:

合并方法是将经度、纬度二进制按照奇偶位合并:(先经度后纬度)

11100 11101 00100 01111 00000 01101 01011 00001

3、按照Base32进行编码:

Base32编码表(其中一种):

将上述合并后二进制编码后结果为:

wx4g0ec1

三、GeoHash的精度

编码越长,表示的范围越小,位置也越精确。因此我们就可以通过比较GeoHash匹配的位数来判断两个点之间的大概距离。

四、不足之处及解决方法

1、边缘附近的点,黄色的点要比黑色的点更加靠近红点,但是由于黑点跟红点的GeoHash前缀匹配数目更多,因此得到黑点更加靠近红点的结果

解决方法:

可以通过筛选周围8个区域内的所有点,然后计算距离得到满足条件结果。

2、因为现有的GeoHash算法使用的是Peano空间填充曲线(可感兴趣的可自己查看),这种曲线会产生突变,造成了编码虽然相似但距离可能相差很大的问题,因此在查询附近的时候,首先筛选GeoHash编码相似的点,然后进行实际距离计算。

mongodb也支持geoHash的算法

mongodb创建2dsphere索引实现geoHash的支持

db.collection.createIndex({<>: ”2dsphere”})

springboot将实体类映射为mongodb集合的时候,会在实体类上加一些注解,比如说是,给集合加索引、加主键、将实体类映射成为数据库的哪一张集合,都可以通过注解的形式实现。

@Document(collection="users")
public class User {

@Id
   private String id;

private int status;

//这个字段创建索引
   @Indexed(unique = true)
   private String phoneNum;

通过注解的方式将单车数据保存到bikes数据库中

将经纬度放在double的集合中用来说明经纬度是GeoHash类型的索引

@GeoSpatialIndexed(type = GeoSpatialIndexType.GEO_2DSPHERE)
private double[] location;

微信小程序段

data: {

id: bikeNo,

//使用mongo的geo,数据是格式是[经度,纬度]

location: [log, lat]

},

如果数据库中的集合不存在的话就会自动创建(用到springboot的JPA技术)

此时:地理位置字段,里面保存着经纬度,这个字段建立索引,那么一查找就快了,可以将地理坐标使用GeoHash转换一个字符窜,然后进行查找

那么数据库中的存储格式就是包含经纬度信息的集合

查找车辆的实现:

现在查找车辆不是查找所有的车了,而是将车辆的经度纬度传过去,然后根据经度纬度查找匹配的附近车辆

所以findNear(double longitude, double latitude)方法中要传过来数据的经纬度

service实现类的实现

@Override
public GeoResults<Bike> findNear(double longitude, double latitude) {
   //查找附件500米的未使用的单车,要求只显示最近的10辆,创建附近位置的类,并添加属性
   NearQuery nearQuery = NearQuery.near(longitude, latitude, Metrics.KILOMETERS);
   //向类中设置属性值
   nearQuery.maxDistance(0.2).query(new Query().addCriteria(Criteria.where("status").is(0)).limit(10));
       //根据类获得符合查询条件的数据并封装成bike集合
   GeoResults<Bike> bikes = mongoTemplate.geoNear(nearQuery, Bike.class);
   return bikes;
}

前端迭代经纬度信息显示出车辆

当位置移动的时候也会显示出车辆的位置信息也会随之改变

regionchange(e) { 方法中也要调用下面的函数

function findBikes(that, log, lat) {

//请求后端数据

wx.request({

url: "http://localhost:8888/bikes",

到此位置查找附近单车就已经实现了

首先在mongodb里面建立geo的索引,然后实现调用相关方法将附近的车辆查找出来

加载页面的时候查找附近的单车,位置改变的时候查找附近放入单车

关键字 mongodb 建索引 注解

  • redis集群的搭建:

redis集群解决的问题

读写分离

单节点故障

可扩展性

数据分片存储,就会将一份数据放在多台机器上:

那么将大量的数据放到那台机器上呢,这就涉及到一致性hash的算法,现在有5台机器,一致性hash会模拟出7台机器,然后莫除以七向按环分布,环再决定放在那台机器上。

解决完分片存储,一台机器挂掉之后导致部分数据丢失的问题:保障不丢失数据就涉及到了副本机制

Redis3集群安装

什么是Redis

Redis是目前一个非常优秀的key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set有序集合)和hash(哈希类型)。

为什么要安装Redis3集群

Redis3.x支持集群模式,更加可靠!

安装Redis3集群(6台Linux)

参考文章:http://blog.csdn.net/myrainblues/article/details/25881535

  1. 下载redis3的稳定版本,下载地址http://download.redis.io/releases/redis-3.2.10.tar.gz
  2. 上传redis-3.2.10.tar.gz到服务器
  3. 解压redis源码包

tar -zxvf redis-3.2.10.tar.gz -C /usr/local/src/

  1. 进入到源码包中,编译并安装redis

cd /usr/local/src/redis-3.2.10/

make && make install

  1. 报错,缺少依赖的包
  1. 配置本地YUM源并安装redis依赖的rpm包

yum -y install gcc

  1. 编译并安装

make && make install

  1. 报错,原因是没有安装jemalloc内存分配器,可以安装jemalloc或直接输入
  1. 重新编译安装

make MALLOC=libc && make install

  1. 用同样的方式在其他的机器上编译安装redis
  2. 在所有机器的/usr/local/下创建一个redis目录,然后拷贝redis自带的配置文件redis.conf到/usr/local/redis

mkdir /usr/local/redis

cp /usr/local/src/redis-3.2.10/redis.conf /usr/local/redis

  1. 修改所有机器的配置文件redis.conf

daemonize yes  #redis后台运行

cluster-enabled yes  #开启集群把注释去掉

appendonly yes  #开启aof日志,它会每次写操作都记录一条日志

查找替换命令:将配置文件中的谋和属性进行查找替换

sed -i 's/daemonize no/daemonize yes/' /usr/local/redis/redis.conf

替换为eth0的ip地址$HOST是将每一台机器上的ip地址映射为一个变量,也可以每台机器手动改执行以下语句绑定变量

HOST=`ifconfig eth1 | grep "inet addr" | awk -F : '{print $2}' | awk '{print $1}'`

sed -i "s/bind 127.0.0.1/ bind $HOST/" /usr/local/redis/redis.conf

打开集群模式

sed -i 's/# cluster-enabled yes/cluster-enabled yes/' /usr/local/redis/redis.conf

将数据同步到磁盘,不然只有内存中有数据

sed -i 's/appendonly no/appendonly yes/' /usr/local/redis/redis.conf

集群请求超时时间

sed -i 's/# cluster-node-timeout 15000/cluster-node-timeout 5000/' /usr/local/redis/redis.conf

  1. 启动所有的redis节点

cd /usr/local/redis

redis-server redis.conf

  1. 查看redis进程状态

ps -ef | grep redis

  1. 只要在一台机器上安装即可)配置集群:安装ruby和ruby gem工具(redis3集群配置   需要ruby的gem工具,类似yum)

yum -y install ruby rubygems

参考网站:https://blog.csdn.net/huwh_/article/details/79242625

  1. 使用gem下载redis集群的配置脚本

gem install redis

ruby --version

  1. 安装RVM

curl -sSL https://rvm.io/mpapis.asc | gpg2 --import -

curl -L get.rvm.io | bash -s stable

如果遇到以下报错,则执行报错中的gpg2 --recv-keys的命令。

gpg2 --recv-keys 409B6B1796C275462A1703113804BB82D39DC0E3

再次执行:

curl -L get.rvm.io | bash -s stable

source /usr/local/rvm/scripts/rvm

查看ruby版本

rvm list known

、升级Ruby

#安装ruby

rvm install  2.4.0#使用新版本

rvm use  2.4.0#移除旧版本

rvm remove 2.0.0#查看当前版本

ruby --version

#用ruby的工具安装reids

gem install redis

  1. 使用脚本配置redis集群

*****************我的redis就安装到这一步,

cd /usr/local/src/redis-3.2.10/src/

#service iptables stop

#在第一机器上执行下面的命令

./redis-trib.rb create --replicas 1 192.168.10.101:6379 192.168.10.102:6379 192.168.10.103:6379 192.168.10.104:6379 192.168.10.105:6379 192.168.10.106:6379

  1. 测试(别忘加-c参数,启动集群模式)

redis-cli -c -h 192.168.1.13

set命令存放一下数据,会看到存到不同的ip地址中

java程序测试连接:

package cn

import java.util

import redis.clients.jedis.{JedisCluster, _}

import scala.collection.mutable

/**

* Created by zx on 2017/10/30.

*/

object JedisConnectPool {

private val config: JedisPoolConfig = new JedisPoolConfig()

//最大连接数

config.setMaxTotal(20)

//最大空闲连接数

config.setMaxIdle(10)

//当调用borrow Object方法时,是否进行有效性检查 -->

config.setTestOnBorrow(true)

//private val pool: JedisPool = new JedisPool(config, "192.168.100.101", 6379, 10000, "")

val jedisClusterNodes = new util.HashSet[HostAndPort]()

//Jedis Cluster will attempt to discover cluster nodes automatically

jedisClusterNodes.add(new HostAndPort("192.168.100.101", 6379))

jedisClusterNodes.add(new HostAndPort("192.168.100.102", 6379))

jedisClusterNodes.add(new HostAndPort("192.168.100.103", 6379))

jedisClusterNodes.add(new HostAndPort("192.168.100.104", 6379))

jedisClusterNodes.add(new HostAndPort("192.168.100.105", 6379))

jedisClusterNodes.add(new HostAndPort("192.168.100.106", 6379))

val jedisCluster = new JedisCluster(jedisClusterNodes)

def main(args: Array[String]): Unit = {

val str = jedisCluster.get("abc123")

println(str)

}

}

Redis3伪分布式安装(1台Linux)

  1. 下载redis3的稳定版本,下载地址http://download.redis.io/releases/redis-3.2.10.tar.gz
  2. 上传redis-3.2.10.tar.gz到服务器

3.解压redis源码包

tar -zxvf redis-3.2.10.tar.gz -C /usr/local/src/

4.进入到源码包中,编译并安装redis

cd /usr/local/src/redis-3.2.10/

make && make install

5.在/usr/local/下创建一个redis目录,然后分别在/usr/local/redis目录创建6个文件夹7000,7001,7002,7003,7004,7005然后拷贝redis自带的配置文件redis.conf到这六个目录中

mkdir /usr/local/redis

mkdir /usr/local/redis/{7000,7001,7002,7003,7004,7005}

cp /usr/local/src/redis-3.2.10/redis.conf /usr/local/redis/7000

cp /usr/local/src/redis-3.2.10/redis.conf /usr/local/redis/7001

cp /usr/local/src/redis-3.2.10/redis.conf /usr/local/redis/7002

cp /usr/local/src/redis-3.2.10/redis.conf /usr/local/redis/7003

cp /usr/local/src/redis-3.2.10/redis.conf /usr/local/redis/7004

cp /usr/local/src/redis-3.2.10/redis.conf /usr/local/redis/7005

6.分别修改这六个目录中的配置文件

port 7000 #端口要与其所在的文件名一致

pidfile /var/run/redis-7000.pid  #pid要与其所在的文件名一致

daemonize yes

cluster-enabled yes

appendonly yes

  1. 分别进入到这六个目录启动redis进程

cd /usr/local/redis/7000

redis-server redis.conf

cd /usr/local/redis/7001

redis-server redis.conf

cd /usr/local/redis/7002

redis-server redis.conf

cd /usr/local/redis/7003

redis-server redis.conf

cd /usr/local/redis/7004

redis-server redis.conf

cd /usr/local/redis/7005

redis-server redis.conf

HOST=`ifconfig eth1 | grep "inet addr" | awk -F : '{print $2}' | awk '{print $1}'`

sed -i "s/bind 127.0.0.1/bind $HOST/" /usr/local/redis/redis.conf

redis-server /usr/local/redis/redis.conf

config set masterauth xiaoniu

config set requirepass xiaoniu

config rewrite

redis-cli -h 192.168.100.101 -a xiaoniu shutdown

  • 报修功能的实现:

手动输入车的编号或者再一次扫码获得车的编号。

点击报修之后就把信息写到业务系统一份更改车辆状态,然后写到日志系统一份

请求带了车辆编号、地理位置、什么地方坏了、那个用户提交的、将日志收集到kafka里面然后再做离线分析以及实时进行分析

报修页面有,check按钮的监听事件

报修的指标分析,这一天又多少车进行保修了,损坏的车都有哪些省市县、离线分析报修都有哪些的部件容易坏掉。在哪个时段报修量比较大

向腾讯api发送请求就可以知道具体的地理位置

  • 骑行功能的实现

扫描车辆的二维码,根据业务系统到数据库查询这辆车,

关于车锁:

三代锁的实现:

首先手机扫码,系统查找车辆,查找对应的锁,然后系统向锁发送短信

上报地理位置走的是无线网络

开锁完成之后日志系统记日志

点击结束骑行,将车辆的状态信息进行更新,地理位置进行更新

  • mongodb数据库集群搭建

mongodb也是主从结构:主是写数据,从能够读数据

主从架构依然满足不了数据量大的问题,所以要做分片处理,增加副本机制

来数据的时候还需要根据数据的key决定放在哪个shard里面

所以要进行一次性Hash,所以还要准备两台服务器

这两台服务器叫做mongosdb

保存机器的配置还需要有一台服务器configServer

所有的机器都需要向configServer进行注册,由configServer向mongosdb中进行注册

mongosdb与configServer之间、configServer与其他的

问题你们mongodb是怎么配置的:分片+副本集+读写分离

服务器可以一主两从进行设计

由于机器不够就将设置为一主两从进行设计

在生产环境下都是使用root用户进行安装的

*******注意:以下是3.x的配置,4.x的配置需要进行配置文件port下需要加上bindIP xxxxx***************

###【在多台机器上执行下面的命令

#在所有创建一个xiaoniu普通用户:

useradd xiaoniu

#为xiaoniu用户添加密码:

echo 123456 | passwd --stdin xiaoniu

#将xiaoniu添加到sudoers

echo "xiaoniu ALL = (root) NOPASSWD:ALL" | tee /etc/sudoers.d/xiaoniu

chmod 0440 /etc/sudoers.d/xiaoniu

#解决sudo: sorry, you must have a tty to run sudo问题,在/etc/sudoer注释掉 Default requiretty 一行

sudo sed -i 's/Defaults    requiretty/Defaults:xiaoniu !requiretty/' /etc/sudoers

#创建一个mongo目录

mkdir /mongo

#给相应的目录添加权限

chown -R xiaoniu:xiaoniu /mongo

#配置mongo的yum源

cat >> /etc/yum.repos.d/mongodb-org-3.4.repo << EOF

[mongodb-org-3.4]

name=MongoDB Repository

baseurl=https://repo.mongodb.org/yum/redhat/\$releasever/mongodb-org/3.4/x86_64/

gpgcheck=1

enabled=1

gpgkey=https://www.mongodb.org/static/pgp/server-3.4.asc

EOF

#关闭selinux

sed -i 's/SELINUX=enforcing/SELINUX=disabled/' /etc/selinux/config

每一台都绑定自己的ip地址

#重新启动

reboot

------------------------------------------------------------------------------------------------

------------------------------------------------------------------------------------------------

#分别在多台机器上使用xiaoniu用户登录

sudo yum install -y mongodb-org

#继续角色信息

node-1        node-2          node-3

mongos        mongos          mongos        路由服务器,寻址

config        config          config        配置服务器,保存配置

shard1(主)    shard2(主)     shard3(主)   分片:保存数据

shard2        shard3          shard1   副本集:备份数据,可以配置读写分离(主负责写,从负责同步数据和读)

shard3        shard1          shard2

#分别在多台机器上创建mongo config server对应的目录

mkdir -p /mongo/config/{log,data,run}

#分别在多台机器上修改config server的配置文件

cat >> /mongo/config/mongod.conf << EOF

systemLog:

destination: file

logAppend: true

path: /mongo/config/log/mongod.log

storage:

dbPath: /mongo/config/data

journal:

enabled: true

processManagement:

fork: true

pidFilePath: /mongo/config/run/mongod.pid

net:

port: 27100

replication:

replSetName: config

sharding:

clusterRole: configsvr

EOF

# clusterRole: configsvr这个配置是固定的

#【重要】启动所有的mongo config server服务

mongod --config /mongo/config/mongod.conf

#登录任意一台配置服务器,初始化配置副本集

mongo --port 27100

#创建配置

config = {

_id : "config",

members : [

{_id : 0, host : "192.168.1.11:27100" },

{_id : 1, host : "192.168.1.12:27100" },

{_id : 2, host : "192.168.1.13:27100" }

]

}

#初始化副本集配置

rs.initiate(config)

#查看分区状态

rs.status()

#注意:其中,"_id" : "config"对应配置文件中配置的 replicaction.replSetName 一致,"members" 中的 "host" 为三个节点的ip和port

------------------------------------------------------------------------------------------------

------------------------------------------------------------------------------------------------

#配置第一个分片和副本集

#修改mongo shard1 server的配置文件

mkdir -p /mongo/shard1/{log,data,run}

#分别在多台机器上修改shard1 server的配置文件

cat >> /mongo/shard1/mongod.conf << EOF

systemLog:

destination: file

logAppend: true

path: /mongo/shard1/log/mongod.log

storage:

dbPath: /mongo/shard1/data

journal:

enabled: true

processManagement:

fork: true

pidFilePath: /mongo/shard1/run/mongod.pid

net:

port: 27001

replication:

replSetName: shard1

sharding:

clusterRole: shardsvr

EOF

#启动所有的shard1 server

mongod --config /mongo/shard1/mongod.conf

#登陆任意一台shard1服务器(希望哪一台机器是主,就登录到那一台机器上),初始化副本集

mongo --port 27001

#使用admin数据库

use admin

#定义副本集配置

config = {

_id : "shard1",

members : [

{_id : 0, host : "192.168.1.11:27001" },

{_id : 1, host : "192.168.1.12:27001" },

{_id : 2, host : "192.168.1.13:27001" }

]

}

#初始化副本集配置

rs.initiate(config);

#查看分区状态

rs.status()

------------------------------------------------------------------------------------------------

------------------------------------------------------------------------------------------------

#配置第二个分片和副本集

#修改mongo shard2 server的配置文件

mkdir -p /mongo/shard2/{log,data,run}

#分别在多台机器上修改shard2 server的配置文件

cat >> /mongo/shard2/mongod.conf << EOF

systemLog:

destination: file

logAppend: true

path: /mongo/shard2/log/mongod.log

storage:

dbPath: /mongo/shard2/data

journal:

enabled: true

processManagement:

fork: true

pidFilePath: /mongo/shard2/run/mongod.pid

net:

port: 27002

replication:

replSetName: shard2

sharding:

clusterRole: shardsvr

EOF

#启动所有的shard2 server

mongod --config /mongo/shard2/mongod.conf

#登陆(node2)的shard2服务器,初始化副本集

mongo --port 27002

#使用admin数据库

use admin

#定义副本集配置

config = {

_id : "shard2",

members : [

{_id : 0, host : "192.168.1.12:27002" },

{_id : 1, host : "192.168.1.13:27002" },

{_id : 2, host : "192.168.1.11:27002" }

]

}

#初始化副本集配置

rs.initiate(config)

#查看分区状态

rs.status()

------------------------------------------------------------------------------------------------

------------------------------------------------------------------------------------------------

#配置第三个分片和副本集

#修改mongo shard3 server的配置文件

mkdir -p /mongo/shard3/{log,data,run}

#分别在多台机器上修改shard3 server的配置文件

cat >> /mongo/shard3/mongod.conf << EOF

systemLog:

destination: file

logAppend: true

path: /mongo/shard3/log/mongod.log

storage:

dbPath: /mongo/shard3/data

journal:

enabled: true

processManagement:

fork: true

pidFilePath: /mongo/shard3/run/mongod.pid

net:

port: 27003

replication:

replSetName: shard3

sharding:

clusterRole: shardsvr

EOF

#启动所有的shard3 server

mongod --config /mongo/shard3/mongod.conf

#登陆node-1上的shard3服务器,初始化副本集

mongo --port 27003

#使用admin数据库

use admin

#定义副本集配置

config = {

_id : "shard3",

members : [

{_id : 0, host : "192.168.1.13:27003" },

{_id : 1, host : "192.168.1.11:27003" },

{_id : 2, host : "192.168.1.12:27003" }

]

}

#初始化副本集配置

rs.initiate(config)

#查看分区状态

rs.status()

------------------------------------------------------------------------------------------------

------------------------------------------------------------------------------------------------

------------------------------------------------------------------------------------------------

######注意:启动mongos是守候进程是因为/mongo/mongos/mongod.conf缺少了fork: true这个配置#######

------------------------------------------------------------------------------------------------

mkdir -p /mongo/mongos/{log,data,run}

#添加mongs的配置文件

cat >> /mongo/mongos/mongod.conf << EOF

systemLog:

destination: file

logAppend: true

path: /mongo/mongos/log/mongod.log

processManagement:

fork: true

pidFilePath: /mongo/mongos/run/mongod.pid

net:

port: 27200

sharding:

configDB: config/192.168.1.111:27100,192.168.1.12:27100,192.168.1.13:27100

EOF

#注意,这里configDB后面的config要与配置服务器的_id保持一致

#启动路由服务器

mongos --config /mongo/mongos/mongod.conf

#登录其中的一台路由节点,手动启用分片

mongo --port 27200

#添加分片到mongos

sh.addShard("shard1/192.168.1.11:27001,192.168.1.12:27001,192.168.1.13:27001")

sh.addShard("shard2/192.168.1.12:27002,192.168.1.13:27002,192.168.1.11:27002")

sh.addShard("shard3/192.168.1.13:27003,192.168.1.11:27003,192.168.1.12:27003")

#设置slave可读(在命令行中生效一次),如果配置从接到可读,那么是连接客户端指定的

rs.slaveOk()

------------------------------------------------------------------------------------------------

####没有分片是因为没有开启分片规则####################

------------------------------------------------------------------------------------------------

#创建mobike数据库

use admin

#创建mobike数据库

#对bikes这个数据库开启分片功能

db.runCommand({"enablesharding":"mobike"})

#创建bikes集合

db.createCollection("bikes")

##对bike数据库下的users集合按id的hash进行分片

db.runCommand({"shardcollection":"mobike.bikes","key":{_id:'hashed'}})

db.runCommand({"shardcollection":"xiaolan.bike","key":{_id:'hashed'}})

db.bike.insert({"bikeNo":"1001"})

db.bike.insert({"bikeNo":"1002"})

db.bike.insert({"bikeNo":"1003"})

db.bike.insert({"bikeNo":"1004"})

db.bike.insert({"bikeNo":"1005"})

db.bike.insert({"bikeNo":"1006"})

第一台机器:mongo --host 192.168.10.20 --port 27001

第三台机器:mongo --host 192.168.10.129 --port 27003

设置副本可读

rs.slaveOK()

所有机器下执行

#启动所有的config server

mongod --config /mongo/config/mongod.conf

#启动所有的shard1

mongod --config /mongo/shard1/mongod.conf

#启动所有的shard2

mongod --config /mongo/shard2/mongod.conf

#启动所有的shard3

mongod --config /mongo/shard3/mongod.conf

#启动所有的mongos

mongos --config /mongo/mongos/mongod.conf

#关闭服务

mongod --shutdown --dbpath /mongo/shard3/data

mongod --shutdown --dbpath /mongo/shard2/data

mongod --shutdown --dbpath /mongo/shard1/data

mongod --shutdown --dbpath /mongo/config/data

每一种数据库都有自己的特点

key---value----------------redis

json类型------------------mongodb

根据某一个条件查找非常快-----------hbase

订单信息存到hbase中比较好

mariaDB是mysql的一个分支,作者女儿的名字

hadoop是作者女儿玩具的名字

十六.mycat的安装与使用

mycat数据库中间件,阿里的开源项目

mycat解决mysql数据库的单节点的问题

#首先在node-1、node-2、node-3上安装MySQL

#配置MySQL 5.7的yum源

sudo tee -a /etc/yum.repos.d/mysql-community.repo << EOF

[mysql57-community]

name=MySQL 5.7 Community Server

baseurl=http://repo.mysql.com/yum/mysql-5.7-community/el/6/\$basearch/

enabled=1

gpgcheck=0

gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql

EOF

#查看mysql源的信息

yum repolist enabled | grep mysql

#安装mysql的server

sudo yum install -y mysql-community-server

#启动mysql

sudo service mysqld start

#获取启动日志中的默认初始密码

#sudo grep 'temporary password' /var/log/mysqld.log

#获取mysql初始化密码并赋给一个变量

PASSWORD=`sudo grep 'temporary password' /var/log/mysqld.log | awk '{print $NF}'`

#使用root用户登录

mysql -uroot -p$PASSWORD

#修改root用户的密码

ALTER USER 'root'@'localhost' IDENTIFIED BY 'XiaoNiu_123!';

#修改mysql远程登录权限

GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'XiaoNiu_123!' WITH GRANT OPTION;

FLUSH PRIVILEGES;

----------------------------------------------------------------------------------------------------

----------------------------------------------------------------------------------------------------

#然后在node-4安装JDK并配置环境变量

#然后在node-4安装mycat

#上传Mycat-server-1.6.5-release-20171008170112-linux.tar.gz安装包

#修改conf目录下主要以下三个注意配置文件

server.xml是Mycat服务器参数调整和用户授权的配置文件

schema.xml是逻辑库定义和表以及分片定义的配置文件

rule.xml是分片规则的配置文件

#修改server.xml(修改了mycat的用户和逻辑的database)

<user name="xiaoniu" defaultAccount="true">

<property name="password">123456</property>

<property name="schemas">bigdata</property>

</user>

<user name="user">

<property name="password">user</property>

<property name="schemas">bigdata</property>

<property name="readOnly">true</property>

</user>

#修改schema.xml(配置逻辑库下的逻辑表,已经数据存放的mysql节点)

<schema name="bigdata" checkSQLschema="false" sqlMaxLimit="100">

<!-- auto sharding by id (long) -->

<table name="travelrecord" dataNode="dn1,dn2,dn3" rule="auto-sharding-long" />

<!-- global table is auto cloned to all defined data nodes ,so can join

with any table whose sharding node is in the same data node -->

<table name="company" primaryKey="ID" type="global" dataNode="dn1,dn2,dn3" />

<!-- random sharding using mod sharind rule -->

<table name="hotnews" primaryKey="ID" autoIncrement="true" dataNode="dn1,dn2,dn3"

rule="mod-long" />

</schema>

<dataNode name="dn1" dataHost="node1" database="db1" />

<dataNode name="dn2" dataHost="node2" database="db2" />

<dataNode name="dn3" dataHost="node3" database="db3" />

<dataHost name="node1" maxCon="1000" minCon="10" balance="0"

writeType="0" dbType="mysql" dbDriver="native" switchType="1"  slaveThreshold="100">

<heartbeat>select user()</heartbeat>

<!-- can have multi write hosts -->

<writeHost host="hostM1" url="192.168.10.101:3306" user="root" password="XiaoNiu_123!">

</writeHost>

</dataHost>

<dataHost name="node2" maxCon="1000" minCon="10" balance="0"

writeType="0" dbType="mysql" dbDriver="native" switchType="1"  slaveThreshold="100">

<heartbeat>select user()</heartbeat>

<!-- can have multi write hosts -->

<writeHost host="hostM1" url="192.168.10.102:3306" user="root" password="XiaoNiu_123!">

</writeHost>

</dataHost>

<dataHost name="node3" maxCon="1000" minCon="10" balance="0"

writeType="0" dbType="mysql" dbDriver="native" switchType="1"  slaveThreshold="100">

<heartbeat>select user()</heartbeat>

<!-- can have multi write hosts -->

<writeHost host="hostM1" url="192.168.10.103:3306" user="root" password="XiaoNiu_123!">

</writeHost>

</dataHost>

#在三台mysql上分别创建数据库db1、db2、db3

#然后在每一个database中创建表,有三张(travelrecord、company、hotnews)注意主键的名称

#在node-4上启动mycat

mysql -h 192.168.10.104 -P 8066 -u root -p123456

schema.xml可以设置全局表

  • 指标分析

需要分析的指标都有哪些?

充值:

骑行:

实时:骑行次数、骑行区域、结算(活动:红包车、参与次数、返现金额)

离线:骑行次数、结算(活动:红包车、参与次数、返现金额)、平均骑行时长、骑行热门场所、用户骑行时间段分布、用户骑行区域分布、用户年龄段、年龄段骑行时长、年龄段分散时间点

报修:

报修人id、时间、经纬城市、那个部件坏掉、手机号、openid、

实时:计算当日到目前为止有多少次报修、保修区域

离线:报修次数、保修区域、报修零部件

数据怎么处理?

实时用sparkStreaming

离线用sparkCore、SparkSQL

怎么保存?

保存到redis、mongodb、mysql集群

明细数据:hbase、mongodb、Elasticsearch

投广告

注册

离线数据放在mongodb集群中分析完成放入mysql集群中

实时数据放入hbase集群中分析完成放入mysql集群中

离线数据用sparkSQL进行分析

实时数据用sparkStreaming进行分析

实时计算一天内的指标

离线计算一周,一个月内的指标

  • 指标分析总结

用户行为分析模块

user表

用户id  用户注册状态  用户电话号 用户姓名 用户性别 用户年龄 省份 城市 城市等级押金 充值金额 注册时间

Id, status, phoneNum, name, idNum, deposit, balance, time,

user_app_click_log

用户id  openid  登陆时间 操作系统 操作系统版本 小程序版本 省份 城市

实时数据分析:

日活跃用户是描述一款软件好坏的指标

当天截至到目前为止pv以及uv的统计

每天截至到当前时间

  1. 用户注册数:统计mongdb中手机号的个数
  2. 计算:实时sparkStreaming,离线的话:sparksql或者sparkcore
  3. 用户交押金:统计mongdb中status是2,depost不为0
  4. 用户注册但是未交押金,status=1
  5. 交押金总额,交了押金的人数,
  6. 用户注册的每天时间段
  7. 不同地区注册的人数多少
  8. 注册的性别比例,性别人数
  9. 不同地区,男女人数

离线数据分析:

一周、一个月的小程序的pv以及uv的统计

  1. 用户注册数:统计mongdb中手机号的个数
  2. 用户交押金数量:统计mongdb中status是2,depost不为0
  3. 用户注册但是未交押金,status=1
  4. 交押金总额,交了押金的人数,
  5. 用户注册的每天时间段
  6. 不同地区注册的人数多少
  7. 注册的性别比例,性别人数
  8. 不同地区,男女人数

user_visit

最近一次小程序访问的日期

l最近一次app访问的操作系统

l第一次app访问os

l小程序 近7天访问pv数

l小程序近15天访问pv数

小程序近30天的访问pv数

lapp近30天0到5点的访问pv数

lapp近30天的6到7点的访问pv数

lapp近30天8到9的访问pv数

lapp近30天10到11访问pv数

lapp近30天12到13点的访问pv数

lapp近30天14到15点的访问pv数

lapp近30天16到17点的访问pv数

lapp近30天18到19点的访问pv数

lapp近30天20到21点的访问pv数

lapp近30天22到23点的访问pv数

技术实现:

例如pv和uv的统计

日志产生过程:用户打开移动终端时,js页面埋点,收集用户使用信息,(字段为:openid,GPS位置,时间),通过ngnix负载均衡,将信息发送到服务端,存放到mongdb(json)中

计算方法:(sparksql)

Pv计算,以天指标计算mongbd中的pv数,

Uv计算,以天为指标计mongdb中openid去重以后的个数

用户充值模块,

  1. 充值模块:userid  时间、省 、城市、金额,手机号,支付方式

    1. 什么时间段充值的人最多
    2. 不同省份充值金额数
    3. 不同支付方式

实时指标:

  1. 1.当天截止到当前时间的充值中金额总数
  2. 2.截至当天这个时候各个省、市的充值金额
  3. 3.截至到当天这个时间各个终端类型的充值金额
  4. 4.截至到当天这个时候充值的成功率
  5. 5.截至当天这个时候充值成功的渠道占比
  6. 6.截至当天充值笔数
  7. 7.不同省市省、市的充值笔数

离线指标以下指标近一个月、一个季度

  1. 1.某一天的充值金额
  2. 2.各个省、市的充值金额
  3. 3.各个终端类型的充值金额
  4. 4.充值的成功率
  5. 5.充值的渠道
  6. 6.充值笔数
  7. 7.省、市的充值笔数

报修模块,

  1. 保修模块

收集用户的报修事件的日志

数据怎么来的?各种终端设备产生的日志,发现单车不能使用,然后点击报修按钮、智能锁上报车辆状态

有哪些关键字段?时间、地点(省、市、区)、用户id、车辆id、原因(部件)、app类型

怎么处理的?数据的预处理(清洗 -> 转换(json -> parquet)) sparksql

计算完保存到哪里?聚合后的数据保存在关系数据库中(集群)、明细数据保存到Hbase、存储到mongo、Elasticsearch中

能计算哪些指标(离线)

1.报修次数

2.不同区域保修次数

报修次数topN

3.近一个月最容易损坏的部件

骑行模块,

骑行模块:

user表

骑行表

手机号,起点、终点、开始骑行时间、结束骑行时间、骑行时间、骑行距离、开始坐标,结束坐标,卡路里、骑行消费、车辆编号

离线:近一周,近一个月

单个用户:

用户骑行的次数

用户骑行公里数

用户常去的地点

用户经常出行的时间段

消费金额0.5元每半小时

1.地区的骑行次数

2.不同地区骑行公里数

3.不同地区消费总金额

5.骑行热点地区

6.根据公里数求不同时间段排行

7.不同年龄段骑行次数

8.不同年龄段出行最喜欢出行的时间

9.不同地区不同时间段骑行人数

10.不同地区生活特色

11.骑行年龄占比

通过经纬获得城市信息*****************************

实时:

1.截至每天当前时间不同地区的骑行次数

2.截至每天当前时间不同地区骑行公里数

3.截至每天当前时间不同地区消费总金额

通过经纬获得城市信息*****************************

  • 项目问题:
  1. 日志存在mongdb中,按天统计的话如何设计表每个表以时间命名,进行设计

2.骑行距离怎么计算

2千万用户 一天3T

一个人数据30kb 20000000用户 一天500g 一年170T 一个服务器12t t一共15台

共享单车项目、mongodb集群相关推荐

  1. 摩拜单车项目05-Redis集群

    文章目录 业务 将kafka中的数据存入Hdfs Flume方案 优化方案 数据处理的流程 指标计算 实时指标 离线指标 保修指标 能计算哪些指标 活动参与 Redis集群 主从结构示意图 一致性ha ...

  2. 全力升级篇-基于Mongodb与Nginx负载均衡打造共享单车项目实战 最新完整项目升级版

    全力升级篇-基于Mongodb与Nginx负载均衡打造共享单车项目实战 最新完整项目升级版 课程作为全新的升级项目课程,基于Nginx负载均衡,Flume与Kafka,Mongodb和Redis等技术 ...

  3. java 探花交友项目day5 推荐好友列表 MongoDB集群 发布动态,查询动态 圈子功能

    推荐好友列表 需求分析 推荐好友:分页形式查询推荐的用户列表,根据评分排序显示 代码实现: tanhuaController: /**  * 查询分页推荐好友列表  */ @GetMapping(&q ...

  4. Dobbo微服务项目实战(详细介绍+案例源码) - 5.推荐好友列表/MongoDB集群/动态发布与查看

    You are a wizard, Harry! 系列文章目录 1. 项目介绍及环境配置 2. 短信验证码登录 3. 用户信息 4. MongoDB 5.推荐好友列表/MongoDB集群/动态发布与查 ...

  5. 共享单车数据集_共享单车项目数据可视化展示

    共享单车项目数据可视化展示 1. 数据收集 https://www.kaggle.com/c/bike-sharing-demand/data 1.1 数据说明及问题分析 数据内容包括两年内的每小时租 ...

  6. python客户端开发自行车租赁系统_python可视化--共享单车项目

    共享单车项目项目说明 自行车共享系统是一种租赁自行车的方法,注册会员.租车.还车都将通过城市中的站点网络自动完成, 通过这个系统人们可以根据需要从一个地方租赁一辆自行车然后骑到自己的目的地归还. 在这 ...

  7. mysql集群和mongodb集群_mongodb分布式集群架构

    一.关于mongodb MongoDB是一个基于分布式文件存储的数据库.由C++语言编写.旨在为WEB应用提供可扩展的高性能数据存储解决方案. MongoDB是一个介于关系数据库和非关系数据库之间的产 ...

  8. java 连接mongodb 集群_Java 连接MongoDB集群的几种方式

    先决条件 先运行mongodb肯定是必须的,然后导入以下包: import com.mongodb.MongoClient; import com.mongodb.MongoClientURI; im ...

  9. 搭建高可用mongodb集群(二)—— 副本集

    2019独角兽企业重金招聘Python工程师标准>>> 在上一篇文章<搭建高可用MongoDB集群(一)--配置MongoDB> 提到了几个问题还没有解决. 主节点挂了能 ...

最新文章

  1. CDN监控系统(三 业务架构)
  2. PCIE总线-PCI、PCIE关系及信号定义
  3. 升级ADT22.6后,Android模拟器无法创建
  4. redis中hash类型介绍
  5. 支付宝APP支付(基于Java实现支付宝APP支付)
  6. 基于Python+tkinter+pygame的音乐播放器完整源码
  7. 软件工程结对作业01
  8. 美团的大数据产品,互联网的数字化转型,如何从0做到100?
  9. 北京公交“十三五”将通过大数据实现线路优化
  10. Android中Fragment生命周期和基本用法
  11. 统计学习方法读书笔记12-逻辑斯蒂回归与最大熵
  12. java监控文件内容变化_Java使用WatchService监控文件内容变化的示例
  13. arcgis 经纬度转大地坐标_MapGIS实现大地坐标到经纬度(地理坐标)的换算
  14. 土方工程量计算表格excel_市政道路土方excel计算表(含公式)
  15. 腾讯元老、上市公司CTO辞职后,每天对着200亩地发呆!
  16. 南邮CTF-RE-Py交易
  17. 商城app被攻击了之后我们应该怎么处理?
  18. excel如何在选定区域内跳过空值自动编号
  19. Word 2003 长篇文档排版技巧(二)
  20. 数学建模之线性回归的标准型以及例题

热门文章

  1. 实时3D图形技术的进化历史2
  2. New Bing使用教程【直接使用无需申请候补】
  3. 干净实用:装机必备绿色软件集锦
  4. 毕业设计效果展示:改良的CP-VTON(ICP-VTON)模型
  5. operator 用法
  6. 神经翻译笔记4扩展b. RNN的正则化方法
  7. ARM架构与编程(基于I.MX6ULL): 串口UART编程(七)
  8. Tomcat执行shutdown时报错:java.net.ConnectException: Connection refused (Connection refused)解决办法
  9. 三件让人幸福的事情:有人爱,有事做,有所期待 转自俞敏洪的新浪微博
  10. 聊天室项目(一)-展示