分布式集群架构与解决方案

一致性Hash算法

基本算法回顾

  • 顺序查找法

    需求:给定一个无序的数组A,然后随便给出一个数值N,判断N是否存在所给定的数组A中。

    解决思路:从索引0开始遍历数组A,依次和判断数值N判断。

    @Test
    public void test() {// 定义无序数组int[] ints = {......};// 定义需要查询的数值int findInt = 10;// 给出一个值,默认是-1int i = -1;for (int anInt : ints) {if (anInt == findInt) {i = anInt;break;}}System.out.println(i == -1 ? "数组ints中没有数值" + findInt : "数组ints中有数值" + findInt);
    }
    
  • 二分查找法

    二分查找的核心是针对有序的序列,从序列的中间开始查找。每次都折半,理论上效率比顺序查找法高的多。

// nums给定的有序数组, target需要查找的值
int binarySearch(int[] nums, int target) {int left = 0; int right = nums.length - 1; // 注意while(left <= right) { // 注意int mid = (right + left) / 2;if(nums[mid] == target)return mid; else if (nums[mid] < target)left = mid + 1; // 注意else if (nums[mid] > target)right = mid - 1; // 注意}return -1;
}
  • 直接寻址法

    直接寻址法是开辟一大块连续的空间数组,存储的时候将数据存储到对应的下标中。

  • 拉链法

    直接寻址法存在的问题,如果有一个非常大的数值,那么就需要开辟一块很大的空间,这样会造成浪费。

​ 拉链法是在直接寻址法上的改进,可以使用hash的方式确定数值的索引位置,但是数组存储的是链表而不是单个数值。

Hash算法的使用场景

Hashs算法在分布式集群产品中的使用非常广泛,例如:分布式集群架构的Redis、Hadoop、ElasticSearch、MySQL分库分表、Nginx负载均衡等。

  • 请求的负载均衡

    在Nginx的IP_hash策略下,客户端的IP不变的情况下,可以实现同一个客户端请求始终路由到同一个目标服务器上,实现回话粘滞。

  • 分布式存储

    在分布式内存数据库中,例如Redis集群有Redis1、Redis2、Redis3,那么数据分片存储的时候具体存储在那台Redis服务器中。最简单的可以使用hash(key)%3这样的方式来确定具体的服务器节点。

普通Hash算法的问题

如果我们有3台Tomcat服务器,这个时候有用户请求过来,我们根据用户的请求IP进行hash计算后,对服务器的总数3取余,确定具体讲用户请求路由到那个Tomcat。

当Tomcat2故障下线后,那么我们的Hash(IP)/3算法就要修改为Hash(IP)/2。对2取余,所有的用户请求重新分配,用户登录信息丢失,需要重新登录

一致性Hash算法

一致性Hash算法的设计思路:

一致性Hash算法其实也是使用取模求余的方法,不过是对2的32次方取模。首先用一条直线表示0到2的32次方-1范围,然后将首尾闭合就会形成一个封闭的环。那么如何实现服务器的映射呢?

  • 将服务器的IP或主机名进行Hash计算然后分布到Hash环上

  • 对客户端的IP也进行Hash计算,分布到Hash环上

  • 客户端所在的Hash环位置,顺时针方向寻找最近的服务器进行访问

假设上诉的Tomcat1故障下线,那么原本客户端0是路由到Tomcat1就会路由到Tomcat2。这个时候影响的用户只是Tomcat1的一小部分用户,而原本Tomcat0和Tomcat2原本用户不影响。相比普通的Hash算法影响全部用户来说一致性Hash算法相对比较好。

数据倾斜问题

哈希环的取值范围0到2的32次方-1,如果服务数量少,那么分布会不均匀,这就会导致请求分布不均匀。有的服务器承受高峰流量,有的服务器请求流量少的可怜。

既然我们的服务器少,那么就可以虚拟多个服务器IP。如果请求到虚拟服务那么我们就指向真实的服务器。

一致性Hash算法模拟

public class ConsistentHashNoVirtual {public static void main(String[] args) {// 1.定义服务器IP或域名String[] tomcatServers = {"123.111.0.0", "123.101.3.1", "111.20.35.1", "123.98.26.3"};SortedMap<Integer, String> sortedMap = new TreeMap<>();// 2.确定服务器和哈希环的映射关系for (String tomcatServer : tomcatServers) {int hash = Math.abs(tomcatServer.hashCode());sortedMap.put(hash, tomcatServer);}String[] clients = {"10.78.12.3", "113.25.63.1", "126.12.31.8"};// 3.计算客户端请求端口的哈希for (String client : clients) {int hash = Math.abs(client.hashCode());SortedMap<Integer, String> integerStringSortedMap = sortedMap.tailMap(hash);// 由于hash换是闭环操作,所以如果获取不到就路由到第一个服务器if (integerStringSortedMap.isEmpty()) {Integer firstKey = sortedMap.firstKey();System.out.println("=====>>> 客户端:" + client + " 路由到服务器:" + sortedMap.get(firstKey));} else {Integer firstKey = integerStringSortedMap.firstKey();System.out.println("=====>>> 客户端:" + client + " 路由到服务器:" + integerStringSortedMap.get(firstKey));}}}
}

虚拟一致性Hash算法模拟

public class ConsistentHashWithVirtual {public static void main(String[] args) {// 1.定义服务器IP或域名String[] tomcatServers = {"123.111.0.0", "123.101.3.1", "111.20.35.1", "123.98.26.3"};SortedMap<Integer, String> sortedMap = new TreeMap<>();// 2.确定服务器和哈希环的映射关系for (String tomcatServer : tomcatServers) {int hash = Math.abs(tomcatServer.hashCode());sortedMap.put(hash, tomcatServer);// 设置虚拟主机映射,一个主机虚拟3台虚拟主机出来for (int i = 1; i <= 2; i++) {int virtualHash = Math.abs((tomcatServer + "#" + i).hashCode());sortedMap.put(virtualHash, "虚拟出来的服务器:" + tomcatServer);}}String[] clients = {"10.78.12.3", "113.25.63.1", "126.12.31.8"};// 3.计算客户端请求端口的哈希for (String client : clients) {int hash = Math.abs(client.hashCode());SortedMap<Integer, String> integerStringSortedMap = sortedMap.tailMap(hash);// 由于hash换是闭环操作,所以如果获取不到就路由到第一个服务器if (integerStringSortedMap.isEmpty()) {Integer firstKey = sortedMap.firstKey();System.out.println("=====>>> 客户端:" + client + " 路由到服务器:" + sortedMap.get(firstKey));} else {Integer firstKey = integerStringSortedMap.firstKey();System.out.println("=====>>> 客户端:" + client + " 路由到服务器:" + integerStringSortedMap.get(firstKey));}}}
}

集群时钟同步问题

在集群环境中,如果时钟不同步,那么会导致一系列的数据异常或者有的问题无法排除莫名其妙等情况。例如:在电商平台下单,订单会有非常多的时间记录其中就有下单时间,如果服务器的时间不一样。那么会导致下单的时间顺序逻辑混乱。

时钟同步解决方案

  • 分布式集群所有服务器节点都可以连接互联网

    • 思路:所有服务器节点各自去同步互联网时间
    • 操作方法:ntpdate -u ntp.api.bz 后面是时间服务器的地址

  • 分布式集群所有服务器节点都不可以连接互联网

    • 思路:将一台服务器时间设置与互联网时间一致,其他服务器从该服务器同步时间
    • 操作方法:设置好一台服务器的时间,并配置为时间服务器/etc/ntp.conf文件

  • # 如果有 restrict default ignore,注释掉它
    # 添加如下内容
    restrict 172.17.0.0 mask 255.255.255.0 nomodify notrap # 放开局
    # 域网同步功能,172.17.0.0是你的局域网网段
    server 127.127.1.0 # local clock
    fudge 127.127.1.0 stratum 10
    # 重启生效并配置ntpd服务开机⾃启动
    service ntpd restart
    chkconfig ntpd on
    
  • 集群其他服务器同步局域网时间服务器的时间

  • # 执行时间同步命令
    ntpdate 172.168.12.5
    

分布式ID解决方案

  • UUID(可用,不推荐)

UUID全称是:Universally Unique Identifier,通用唯一识别码。生成的UUID重复的概率非常低,所以几乎不会考虑重复问题。

缺点

1)可读性差

2)业务识别性不强

3)数据库主键索引浪费存储空间

public static void main(String[] args) {String uuid = UUID.randomUUID().toString();System.out.println(uuid);  // 结果:35e2a48d-7c93-4ede-8099-272c3ab3fc4e
}
  • 独立数据库的自增ID

如果Order表是自增ID作为主键,那么拆分为Order0和Order1表后。如果还是自增ID就会导致ID重复的问题。我们可以在数据库中创建一张表OrderID。当需要ID的时候在OrderID表插入一条记录,然后使用SQL语句select last_insert_id() 获取刚才插入这张表的记录自增的ID。

-- 创建全局OrderID表,这个表的数据没有业务意义,只是为了获取ID而创建的
DROP TABLE IF EXISTS `DISTRIBUTE_ID`;
CREATE TABLE `ORDER_ID` (`id` bigint(32) NOT NULL AUTO_INCREMENT COMMENT '主键',`createtime` datetime DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- 执行语句
-- 插入记录
insert into DISTRIBUTE_ID(createtime) values(NOW());
-- 查询自增的ID
select LAST_INSERT_ID();
  • SnowFlake雪花算法(可以用,推荐)

    • 雪花ID的变种

      • 滴滴的tinyid
      • 百度的uidgenerator
      • 美团的leaf

雪花算法是Twitter推出的一个用于生成分布式ID的策略。
雪花算法是基于这个算法可以生成ID,生成的ID是一个long型,那么在Java中一个long型是8个字节,算下来是64bit,如下是使用雪花算法生成的一个ID的二进制形式示意:

/** * Java版本雪花ID算法实现* 官方推出,Scala编程语言来实现的* Java前辈用Java语言实现了雪花算法* 并发量大,1毫秒可以生成4096个ID*/
public class IdWorker {//下面两个每个5位,加起来就是10位的工作机器idprivate long workerId;    //工作idprivate long datacenterId;   //数据id//12位的序列号private long sequence;public IdWorker(long workerId, long datacenterId, long sequence) {// sanity check for workerIdif (workerId > maxWorkerId || workerId < 0) {throw new IllegalArgumentException(String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));}if (datacenterId > maxDatacenterId || datacenterId < 0) {throw new IllegalArgumentException(String.format("datacenter Id can't be greater than %d or less than 0", maxDatacenterId));}System.out.printf("worker starting. timestamp left shift %d, datacenter id bits %d, worker id bits %d, sequence bits %d, workerid %d",timestampLeftShift, datacenterIdBits, workerIdBits, sequenceBits, workerId);this.workerId = workerId;this.datacenterId = datacenterId;this.sequence = sequence;}//初始时间戳private long twepoch = 1288834974657L;//长度为5位private long workerIdBits = 5L;private long datacenterIdBits = 5L;//最大值private long maxWorkerId = -1L ^ (-1L << workerIdBits);private long maxDatacenterId = -1L ^ (-1L << datacenterIdBits);//序列号id长度private long sequenceBits = 12L;//序列号最大值private long sequenceMask = -1L ^ (-1L << sequenceBits);//工作id需要左移的位数,12位private long workerIdShift = sequenceBits;//数据id需要左移位数 12+5=17位private long datacenterIdShift = sequenceBits + workerIdBits;//时间戳需要左移位数 12+5+5=22位private long timestampLeftShift = sequenceBits + workerIdBits + datacenterIdBits;//上次时间戳,初始值为负数private long lastTimestamp = -1L;public long getWorkerId() {return workerId;}public long getDatacenterId() {return datacenterId;}public long getTimestamp() {return System.currentTimeMillis();}//下一个ID生成算法public synchronized long nextId() {long timestamp = timeGen();//获取当前时间戳如果小于上次时间戳,则表示时间戳获取出现异常if (timestamp < lastTimestamp) {System.err.printf("clock is moving backwards.  Rejecting requests until %d.", lastTimestamp);throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds",lastTimestamp - timestamp));}//获取当前时间戳如果等于上次时间戳//说明:还处在同一毫秒内,则在序列号加1;否则序列号赋值为0,从0开始。if (lastTimestamp == timestamp) {  // 0  - 4095sequence = (sequence + 1) & sequenceMask;if (sequence == 0) {timestamp = tilNextMillis(lastTimestamp);}} else {sequence = 0;}//将上次时间戳值刷新lastTimestamp = timestamp;/*** 返回结果:* (timestamp - twepoch) << timestampLeftShift) 表示将时间戳减去初始时间戳,再左移相应位数* (datacenterId << datacenterIdShift) 表示将数据id左移相应位数* (workerId << workerIdShift) 表示将工作id左移相应位数* | 是按位或运算符,例如:x | y,只有当x,y都为0的时候结果才为0,其它情况结果都为1。* 因为个部分只有相应位上的值有意义,其它位上都是0,所以将各部分的值进行 | 运算就能得到最终拼接好的id*/return ((timestamp - twepoch) << timestampLeftShift) |(datacenterId << datacenterIdShift) |(workerId << workerIdShift) |sequence;}//获取时间戳,并与上次时间戳比较private long tilNextMillis(long lastTimestamp) {long timestamp = timeGen();while (timestamp <= lastTimestamp) {timestamp = timeGen();}return timestamp;}//获取系统时间戳private long timeGen() {return System.currentTimeMillis();}public static void main(String[] args) {IdWorker worker = new IdWorker(21, 10, 0);for (int i = 0; i < 100; i++) {System.out.println(worker.nextId());}}}
  • 基于Redis实现

Redis是单线程执行,那么在Redis中提供了incr命令,将key中存储的数值+1。如果key不存在,那么就会初始化为0,然后再执行incr操作。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5RLHw50q-1606448531088)(.\lagou-img\redis01.jpg)]

分布式调度问题

何为分布式调度:

  • 运行在分布式集群环境下的调度任务(同一个定时任务部署多个实例,同一时间应该只有一个实例执行任务)
  • 定时任务的分布式,将定时任务拆分为多给子任务共同作业

单体定时任务到分布式集群定时任务的演变

定时任务的实现方式

定时任务的实现方式很多。例如早期没有定时任务的时候回使用JDK中的Timer机制和多线程机制(Runnable + 线程休眠)来实现定时任务或者每隔一段时间就执行。后面出现了定时任务框架Quartz定时任务调度框架等。

任务调度框架Quartz定时任务回顾

<!--引入:任务调度框架quartz-->
<!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz -->
<dependency><groupId>org.quartz-scheduler</groupId><artifactId>quartz</artifactId><version>2.3.2</version>
</dependency>
// 定时任务作业主要调度程序
public class QuartzMain {// 1、创建任务调度器public static Scheduler createScheduler() throws SchedulerException {SchedulerFactory schedulerFactory = new StdSchedulerFactory();Scheduler scheduler = schedulerFactory.getScheduler();return scheduler;}// 2、创建一个任务public static JobDetail createJob() {JobBuilder jobBuilder = JobBuilder.newJob(DemoJob.class); // TODO 自定义任务类jobBuilder.withIdentity("jobName","myJob");JobDetail jobDetail = jobBuilder.build();return jobDetail;}/*** 3、创建作业任务时间触发器* cron表达式由七个位置组成,空格分隔* 1、Seconds(秒)  0~59* 2、Minutes(分)  0~59* 3、Hours(小时)  0~23* 4、Day of Month(天)1~31,注意有的月份不足31天* 5、Month(月) 0~11,或者 JAN,FEB,MAR,APR,MAY,JUN,JUL,AUG,SEP,OCT,NOV,DEC* 6、Day of Week(周)  1~7,1=SUN或者  SUN,MON,TUE,WEB,THU,FRI,SAT* 7、Year(年)1970~2099  可选项*示例:* 0 0 11 * * ? 每天的11点触发执行一次* 0 30 10 1 * ? 每月1号上午10点半触发执行一次*/public static Trigger createTrigger() {// 创建时间触发器CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity("triggerName","myTrigger").startNow().withSchedule(CronScheduleBuilder.cronSchedule("*/2 * * * * ?")).build();return cronTrigger;}/*** main函数中开启定时任务* @param args*/public static void main(String[] args) throws SchedulerException {// 1、创建任务调度器Scheduler scheduler = QuartzMan.createScheduler();// 2、创建一个任务JobDetail job = QuartzMan.createJob();// 3、创建任务的时间触发器Trigger trigger = QuartzMan.createTrigger();// 4、使用任务调度器根据时间触发器执行我们的任务scheduler.scheduleJob(job,trigger);scheduler.start();}
}
// 定时任务真正执行的逻辑
public class DoJob implements Job {@Overridepublic void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {System.out.println("我是一个定时任务执行逻辑");}
}

Quartz可以很好的在单体应用架构中执行定时任务,但是对于分布式环境下的分布式定时任务就不是那么友好,如果多个Quartz同时执行,那么就会导致数据存在被重复处理的风险。如果是订单退款逻辑,那么会出现重复退款的可能性。

分布式调度框架Elastic-Job

Github地址:https://github.com/elasticjob

Elastic-Job是当当网开源的一个分布式调度解决方案,基于Quartz二次开发,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。我们主要学习Elastic-Job-Lite定位是轻量级无中心化解决方案,使用Jar包的形式提供分布式任务的调度服务,而Elastic-Job-Cloud需要结合Mesos和Docker在云环境下使用。

功能介绍

  • 分布式调度协调

    在分布式环境中,任务能够按指定的调度策略执行,并且能够避免同一任务多实例重复执行。

  • 丰富的调度策略

    基于成熟的定时任务作业框架Quartz cron表达式执行定时任务

  • 弹性扩容缩容

    当集群中增加某一个实例,它应当也能够被选举并执行任务;当集群减少一个实例
    时,它所执行的任务能被转移到别的实例来执行。

  • 失效转移

    某实例在任务执行失败后,会被转移到其他实例执行。

  • 错过执行作业重触发

    若因某种原因导致作业错过执行,自动记录错过执行的作业,并在上次作业
    完成后自动触发。

  • 支持并行调度

    支持任务分片,任务分片是指将一个任务分为多个小任务项在多个实例同时执行。

  • 作业分片一致性

    当任务被分片后,保证同一分片在分布式环境中仅一个执行实例。

Elastic-Job-Lite应用

Elastic-Job依赖Zookeeper进行分布式协调,所以需要安装Zookeeper(3.4.6版本以上)。

Zookeeper安装(Linux)

Zookeeper的安装有单机、集群和伪集群三种类型。在开发中只需要搭建单机就可以,但是在生产环境一般为了避免单点故障都会部署集群或者伪集群模式。

由于Zookeeper采用过半投票机制策略,所以集群至少需要保证有3个以上的服务器。那么集群和伪集群有什么区别呢。

集群:在多个独立的物理服务器上各自部署Zookeeper,这样的好处是单台物理服务器故障不会导致整体受影响。但是也增加了网络通信的问题。

伪集群:伪集群和集群其实差不多,只是在同一台物理服务器上部署多个Zookeeper,这样的确定就是如果物理服务器故障,将导致整体不可用。

以下演示的安装是以 Linux环境、CentOS 7系统为基础,那么在使用之前我们需要做一下配置,如果有环境并且已经处理了可以忽略

  • 防火墙问题,CentOS 7防火墙默认是开启的,那么如果不关闭端口就不可以访问,在代码中调用的时候就无法使用默认的2181和其他端口

      # CentOS 6处理方法//临时关闭service iptables stop//禁止开机启动chkconfig iptables off
    
      # CentOS 7处理方法,由于CentOS 7以后的防火墙默认使用firewalld,因此处理方法和CentOS 6有差异//临时关闭systemctl stop firewalld//禁止开机启动systemctl disable firewalldRemoved symlink /etc/systemd/system/multi-user.target.wants/firewalld.service.Removed symlink /etc/systemd/system/dbus-org.fedoraproject.FirewallD1.sersvice.
    
    # 额外提供的命令
    # 防火墙相关命令
    1、查看防火墙状态 : systemctl status firewalld.service
    注:active是绿的running表示防火墙开启
    2、关闭防火墙 :systemctl stop firewalld.service
    3、开机禁用防火墙自启命令 :systemctl disable firewalld.service
    4、启动防火墙 :systemctl start firewalld.service
    5、防火墙随系统开启启动 : systemctl enable firewalld.service
    6、重启防火墙 : firewall-cmd --reload# 端口开放命令
    1、查询已经开放的端口 :firewall-cmd --list-port
    2、查询某个端口是否开放 :firewall-cmd --query-port=80/tcp
    3、开启端口 :firewall-cmd --zone=public --add-port=80/tcp --permanent
    注:可以是一个端口范围,如1000-2000/tcp
    4、移除端口 :firewall-cmd --zone=public --remove-port=80/tcp --permanent
    5、命令含义:
    --zone #作用域
    --add-port=80/tcp #添加端口,格式为:端口/通讯协议
    --remove-port=80/tcp #移除端口,格式为:端口/通讯协议
    --permanent #永久生效,没有此参数重启后失效
    

单机版

  • 下载Zookeeper http://zookeeper.apache.org/releases.html 打开这个网址就可以看到不同的版本直接下载即可

  • 在/usr/local/目录下创建zookeeper文件夹,用于存放zookeeper。mkdir zookeeper

  • 将下载的zookeeper.tar.gz压缩包上传到Linux的/usr/local/zookeeper目录下

  • # 解压
    tar -zxvf zookeeper-3.4.14.tar.gz
    
  # 进入conf目录,zookeeper启动是读取zoo.cfg配置文件,所以重命名或者拷贝一份都可以cp zoo_sample.cfg zoo.cfg
  • # 在zookeeper目录下创建data文件夹,用于存储文件,不使用默认的路径文件夹
    mkdir data
    # 修改conf/zoo.cfg配置文件的dataDir路径,这个是数据存放的文件路径
    dataDir=/usr/local/zookeeper/data
    
  • # 命令
    ./bin/zkServer.sh start  # 启动
    ./bin/zkServer.sh stop   # 暂停
    ./bin/zkServer.sh status # 查看状态
    
  • # 启动命令执行的效果
    [root@localhost zookeeper-3.4.14-2181]# ./bin/zkServer.sh start
    ZooKeeper JMX enabled by default
    Using config: /usr/local/zookeeper/zookeeper-3.4.14-2181/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
    
  • # 查看状态命令执行的效果
    [root@localhost zookeeper-3.4.14-2181]# ./bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /usr/local/zookeeper/zookeeper-3.4.14-2181/bin/../conf/zoo.cfg
    Mode: standalone  # standalone表示单机版本
    
  • # 暂停命令执行的效果
    [root@localhost zookeeper-3.4.14-2181]# ./bin/zkServer.sh stop
    ZooKeeper JMX enabled by default
    Using config: /usr/local/zookeeper/zookeeper-3.4.14-2181/bin/../conf/zoo.cfg
    Stopping zookeeper ... STOPPED
    

集群版

  • 下载http://zookeeper.apache.org/releases.html 打开这个网址就可以看到不同的版本直接下载

  • 上传zookeeper.tar.gz压缩包到/usr/local/zookeeper目录下

  • # 在/usr/local/目录下创建文件夹
    mkdir zkcluster
    
  • # 解压zookeeper到zkcluster文件夹,-C的作用是指定解压到那个文件夹下
    tar -zxvf zookeeper-3.4.14.tar.gz -C /zkcluster
    
  • # 修改文件名称,并复制
    mv zookeeper-3.4.14 zookeeper01
    # 复制2份,集群数量为3
    cp -r zookeeper01/ zookeeper02
    cp -r zookeeper01/ zookeeper03
    
  • # 在每个zookeeper目录下创建data文件夹,并在data目录下创建log文件夹
    mkdir data
    cd data
    mkdir log
    
  • # 修改/conf/zoo_sample.cfg配置文件名 zookeeper01  zookeeper02  zookeeper03都要执行
    cp zoo_sample.cfg zoo.cfg
    
  • # 修改zoo.cfg配置文件的端口的数据存储就以及日志路径,端口分别是:2181 2182 2183
    # 端口
    clientPort=2181
    # 存储路径
    dataDir=/usr/local/zookeeper/zkcluster/zookeeper01/data
    # 日志路径
    dataLogDir=/usr/local/zookeeper/zkcluster/zookeeper01/data/logclientPort=2182
    dataDir=/usr/local/zookeeper/zkcluster/zookeeper02/data
    dataLogDir=/usr/local/zookeeper/zkcluster/zookeeper02/data/logclientPort=2183
    

dataDir=/usr/local/zookeeper/zkcluster/zookeeper03/data
dataLogDir=/usr/local/zookeeper/zkcluster/zookeeper03/data/log


* ```shell
# 配置集群,分别在data目录下创建myid,内容分别是1 2 3用于记录每个服务器的ID,也是集群中zookeeper的唯一标记,不可以重复
touch ./zookeeper01/data/myid
touch ./zookeeper02/data/myid
touch ./zookeeper03/data/myid
  • #在每个zookeeper的zoo.cfg文件中配置客户端访问端口(clientPort)和集群服务器IP列表
    #server.服务器ID=服务器IP地址:服务器之间通信端口:服务器之间投票选举端口
    server.1=192.168.247.100:2881:3881
    server.2=192.168.247.100:2882:3882
    server.3=192.168.247.100:2883:3883
    
  • # 依次启动3个zookeeper
    ./bin/zkServer.sh start
    ./bin/zkServer.sh stop
    ./bin/zkServer.sh status
    

应用开发

需求:每个5秒执行一次定时任务(resume表中未归档的数据归档到resume_bak表中,每次归档一条记录)

1)resume_bak和resume表结构一致

2)resume表中数据归档之后不删除,将state设置为"已归档"

DROP TABLE IF EXISTS `resume`;
CREATE TABLE `resume` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`name` varchar(255) DEFAULT NULL,`sex` varchar(255) DEFAULT NULL,`phone` varchar(255) DEFAULT NULL,`address` varchar(255) DEFAULT NULL,`education` varchar(255) DEFAULT NULL,`state` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1001 DEFAULT CHARSET=utf8;
SET FOREIGN_KEY_CHECKS = 1;
  • 引入Jar包
<!--elastic-job-lite核心包-->
<dependency><groupId>com.dangdang</groupId><artifactId>elastic-job-lite-core</artifactId><version>2.1.5</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.46</version>
</dependency>
  • 定时任务实例
// 这里使用JDBC操作数据库,不使用其他框架public class JdbcUtil {//urlprivate static String url = "jdbc:mysql://localhost:3306/job?characterEncoding=utf8&useSSL=false";//userprivate static String user = "root";//passwordprivate static String password = "123456";//驱动程序类private static String driver = "com.mysql.jdbc.Driver";static {try {Class.forName(driver);} catch (ClassNotFoundException e) {// TODO Auto-generated catch blocke.printStackTrace();}}public static Connection getConnection() {try {return DriverManager.getConnection(url, user, password);} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();}return null;}public static void close(ResultSet rs, PreparedStatement ps, Connection con) {if (rs != null) {try {rs.close();} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();} finally {if (ps != null) {try {ps.close();} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();} finally {if (con != null) {try {con.close();} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}}}}}/**** DML操作(增删改)* 1.获取连接数据库对象* 2.预处理* 3.执行更新操作* @param sql* @param obj*///调用者只需传入一个sql语句,和一个Object数组。该数组存储的是SQL语句中的占位符public static void executeUpdate(String sql,Object...obj) {Connection con = getConnection();//调用getConnection()方法连接数据库PreparedStatement ps = null;try {ps = con.prepareStatement(sql);//预处理for (int i = 0; i < obj.length; i++) {//预处理声明占位符ps.setObject(i + 1, obj[i]);}ps.executeUpdate();//执行更新操作} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();} finally {close(null, ps, con);//调用close()方法关闭资源}}/**** DQL查询* Result获取数据集** @param sql* @param obj* @return*/public static List<Map<String,Object>> executeQuery(String sql, Object...obj) {Connection con = getConnection();ResultSet rs = null;PreparedStatement ps = null;try {ps = con.prepareStatement(sql);for (int i = 0; i < obj.length; i++) {ps.setObject(i + 1, obj[i]);}rs = ps.executeQuery();//new 一个空的list集合用来存放查询结果List<Map<String, Object>> list = new ArrayList<>();//获取结果集的列数int count = rs.getMetaData().getColumnCount();//对结果集遍历每一条数据是一个Map集合,列是k,值是vwhile (rs.next()) {//一个空的map集合,用来存放每一行数据Map<String, Object> map = new HashMap<String, Object>();for (int i = 0; i < count; i++) {Object ob = rs.getObject(i + 1);//获取值String key = rs.getMetaData().getColumnName(i + 1);//获取k即列名map.put(key, ob);}list.add(map);}return list;} catch (SQLException e) {// TODO Auto-generated catch blocke.printStackTrace();} finally {close(rs, ps, con);}return null;}
}
/*** ElasticJobLite定时任务业务逻辑处理类*/
public class ElasticJob implements SimpleJob {/*** 需求:resume表中未归档的数据归档到resume_bak表中,每次归档1条记录* execute方法中写我们的业务逻辑(execute方法每次定时任务执行都会执行一次)* @param shardingContext*/@Overridepublic void execute(ShardingContext shardingContext) {int shardingItem = shardingContext.getShardingItem();System.out.println("当前分片:" + shardingItem);// 获取分片参数String shardingParameter = shardingContext.getShardingParameter(); // 0=bachelor,1=master,2=doctor// 1 从resume表中查询出1条记录(未归档)String selectSql = "select * from resume where state='未归档' and education='"+ shardingParameter +"' limit 1";List<Map<String, Object>> list = JdbcUtil.executeQuery(selectSql);if(list == null || list.size() ==0 ) {System.out.println("数据已经处理完毕!!!!!!");return;}// 2 "未归档"更改为"已归档"Map<String, Object> stringObjectMap = list.get(0);long id = (long) stringObjectMap.get("id");String name = (String) stringObjectMap.get("name");String education = (String) stringObjectMap.get("education");System.out.println("id:" + id + "  name:" + name + " education:" + education);String updateSql = "update resume set state='已归档' where id=?";JdbcUtil.executeUpdate(updateSql,id);// 3 归档这条记录,把这条记录插入到resume_bak表String insertSql = "insert into resume_bak select * from resume where id=?";JdbcUtil.executeUpdate(insertSql,id);}
}
public class ElasticJobMain {public static void main(String[] args) {// 配置分布式协调服务(注册中心)ZookeeperZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("localhost:2181","data-archive-job");CoordinatorRegistryCenter coordinatorRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);coordinatorRegistryCenter.init();// 配置任务(时间事件、定时任务业务逻辑、调度器)JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("archive-job", "*/2 * * * * ?", 3).shardingItemParameters("0=bachelor,1=master,2=doctor").build();SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,ArchivieJob.class.getName());JobScheduler jobScheduler = new JobScheduler(coordinatorRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfiguration).overwrite(true).build());jobScheduler.init();}
}
  • 测试过程

    • 启动3个进程(模拟有3个定时任务实例同时执行)
    • 观察3个进程打印的日志,关闭其中一个进程,看这个进程的任务是否会被分配给其他进程接替执行

Session共享问题

问题分析

  • Nginx默认采用轮询策略分发请求,导致同一个用户的请求会分发到不同的服务器
  • Tomcat存储用户信息默认是本地Session存储,不会共享
  • Http请求是无状态的,不会记录用户的登录信息

Session共享解决方案

  • Nginx的IP_Hash策略

    • Nginx的IP_Hash策略会将同一个客户端IP的请求路由到同一个目标服务器,这也就是回话粘滞。
    • 优点:配置简单,不入侵应用,不需要额外的开发
    • 缺点:服务器重启Session丢失,存在单点负载高的风险,单点故障问题
  • Session复制
    • 通过Tomcat配置文件的修改,达到Session相互复制
    • 优点:不入侵应用,便于服务器水平扩展,适应各种负载均衡策略,服务器重启Session不会丢失
    • 缺点:性能低, 消耗内存,不可以存储过多的Session数据(内存有限),Session复制的延迟性

  • Session共享(推荐)

    • Session其实就是存储了用户信息,那么既然是存储信息。我们就不一定要存储在Tomcat中,可以存储在缓存中间件。例如Redis/Memcache等都可以。
    • 优点:适用各种负载均衡策略,服务器重启或者故障都不会造成Session信息的丢失,扩展能力强,适合大集群环境
    • 缺点:对应用有入侵性,应用需要和Redis交互

分布式集群架构解决方案相关推荐

  1. 分布式集群架构场景化解决方案相关

    文章目录 前言 一.一致性Hash算法 1.1 Hash算法应用场景 1.2 普通Hash算法存在的问题 1.3 一致性Hash算法 二.集群时钟同步问题 2.1 时钟不同步导致的问题 2.2 集群时 ...

  2. RabbitMQ分布式集群架构

    RabbitMQ分布式集群架构和高可用性(HA) (一) 功能和原理 设计集群的目的 允许消费者和生产者在RabbitMQ节点崩溃的情况下继续运行 通过增加更多的节点来扩展消息通信的吞吐量 1 集群配 ...

  3. ClickHouse数据库培训实战 (PB级大数据分析平台、大规模分布式集群架构)

    一.ClickHouse数据库培训实战课程 (PB级大数据分析平台.大规模分布式集群架构)视频教程 为满足想学习和掌握ClickHouse大数据分析专用的数据库,风哥特别设计的一套比较系统的Click ...

  4. NOSQL,MongoDB分布式集群架构

    MongoDB分布式集群架构 看到这里相信你已经掌握了 MongoDB 的大部分基本知识,现在在单机环境下操作 MongoDB 已经不存在问题,但是单机环境只适合学习和开发测试,在实际的生产环境中,M ...

  5. 分布式集群架构场景化解决⽅案(⼀致性Hash算法)

    什么是分布式和集群以及实现 主要内容 ⼀致性Hash算法 集群时钟同步配置 分布式ID解决⽅案 分布式调度问题 Session共享问题 分布式和集群是不⼀样的,分布式⼀定是集群,但是集群不⼀定是分布式 ...

  6. 分布式集群架构场景解决方案学习笔记

    课程学习 一致性哈希算法 集群时钟同步问题 分布式ID解决方案 分布式任务调度问题 session共享(一致性)问题 一致性哈希算法 一致性哈希算法在1997年由麻省理工学院的Karger等人在解决分 ...

  7. MongoDB分布式集群架构(3种模式)

    MongoDB 有三种集群部署模式,分别为主从复制(Master-Slaver).副本集(Replica Set)和分片(Sharding)模式. Master-Slaver 是一种主从副本的模式,目 ...

  8. mysql集群和mongodb集群_mongodb分布式集群架构

    一.关于mongodb MongoDB是一个基于分布式文件存储的数据库.由C++语言编写.旨在为WEB应用提供可扩展的高性能数据存储解决方案. MongoDB是一个介于关系数据库和非关系数据库之间的产 ...

  9. Hadoop+Hbase分布式集群架构“完全篇”

    本文收录在Linux运维企业架构实战系列 前言:本篇博客是博主踩过无数坑,反复查阅资料,一步步搭建,操作完成后整理的个人心得,分享给大家~~~ 1.认识Hadoop和Hbase 1.1 hadoop简 ...

最新文章

  1. 程序不能使用中文名_这几款车没有中文名?那买车时应该怎么叫?
  2. (转载)Android进阶2之Activity之间数据交流(onActivityResult的用法)
  3. was升级jdk版本_WebSphere 8.5 升级jdk版本
  4. java jdbc6_Java学习-JDBC
  5. C++|STL学习笔记-map的属性(大小以及是否存在)
  6. python入门第三天
  7. Docker Flie
  8. MySql实现sequence功能的代码
  9. SCSI子系统基础学习笔记 - 3. SCSI设备探测
  10. 华硕笔记本电池软件_成色配置都还不错的二手华硕7代 i5 笔记本电脑 只卖1599元 想要的 快进来看看...
  11. A到Z的unicode的编码的大小
  12. CF1715D 2+ doors 题解
  13. 迷你世界安卓版mod
  14. Android 和 iOS APP 测试的区别
  15. mysql中sql语句日期比较,mysql sql语句中 日期函数的使用
  16. malloc挖掘---动态存储器分配深入了解
  17. 斯坦福CS234增强学习——(1)简介
  18. 数据分析——用户流失分析
  19. 2022AP微积分BC北美卷FRQ已放出,附考情分析
  20. 手机红外鸿蒙,实力强劲的四款华为手机:均支持更新鸿蒙系统,你的在内吗?...

热门文章

  1. 转:视频解码原理及ffmpeg MP4转YUV420P
  2. JS中的setTimeout和setInterval函数
  3. Sennedjem古埃及匠人的水壶
  4. 基于STM32与机智云平台的远程控制智能家居系统
  5. 毛哥的快乐生活(24) 庐中论天下 酒里定乾坤
  6. 2019年9月22日总结
  7. Antd a-menu 样式修改
  8. MySQL外键约束(FOREIGN KEY)
  9. Android 12.0 锁屏页面滑动解锁不灵敏的功能修复
  10. xsl php,Centos下给PHP开启xsl扩展