在使用MongoDB的时候 (基于spring-mongo) ,我想在插入对象时获取有序自增的主键 ,但是MongoDB的默认规则是生成一串无序 (大致有序) 的字串 .而Spring Data提供的主键生成方法也是随机的 String/BigInteger.

因为分布式情况下 ,有序ID会变得困难 ( ID中心/分布式锁 )

同步问题

获取有序ID的通常做法是 :

创建sequence : key-start-end-step-current 标识/起值/止值/步长/当前值

获取sequence ,current作为主键值

保存current = current + step到数据库作为下一个主键值

但是在多线程情况下 , 2-3可能被多个线程同时运行 ,导致sequence还未保存成功就被下一个获取.

模拟数据库Sequence模型

@AllArgsConstructor

public class Sequence {

@Setter

long current;

public long getCurrent() {

transTime();

return current;

}

public void setCurrent(long current) {

transTime();

this.current = current;

}

private void transTime() {

try {

Thread.sleep(3);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

public class SequenceServiceTest {

Sequence A;//A型sequence

Sequence B;//B型sequence

/**

* 初始化数据

*/

@Before

public void before() {

A = new Sequence(1L);

B = new Sequence(1L);

}

/**

* 模拟数据库取值

*/

private Sequence getSequence(String name) {

switch (name) {

case "A":

return A;

case "B":

return B;

default:

return null;

}

}

private void waitService(ExecutorService executorService) {

executorService.shutdown();

try {

while (!executorService.awaitTermination(1000, TimeUnit.SECONDS)) {

}

System.out.println("final A:" + A.getCurrent());

System.out.println("final B:" + B.getCurrent());

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

非同步代码读写sequence

@Test

public void noSynchronizedTest() {

//10个用户同时需要获取id

ExecutorService executorService = Executors.newFixedThreadPool(10);

//一部分需要A ,一部分需要B

for (int i = 0; i < 1000; i++) {

executorService.submit(() -> System.out.println("A:" + getNextWithNoSync("A")));

executorService.submit(() -> System.out.println("B:" + getNextWithNoSync("B")));

}

waitService(executorService);

}

private long getNextWithNoSync(String name) {

Sequence sequence = getSequence(name);

long current = sequence.getCurrent();

long next = current + 1;

sequence.setCurrent(next);

return current;

}

运行时间 : 1s369ms

...

B:360

A:332

B:361

B:361

A:332

A:332

final A:333

final B:362

可以从结果看出 ,最终的sequence不是1001(从1开始取1000个) ,说明中途有重复的sequence产生 .原因就是在getNextWithNoSync函数 , get取值 在上一个set回写 之前运行 ,造成混乱. (网络传输越慢 ,影响越大)

同步代码读写sequence

因此set/get必须在一个同步代码块中 ,这段代码每次只能被一个线程访问 .

@Test

public void synchronizedMethodTest() {

ExecutorService executorService = Executors.newFixedThreadPool(10);

for (int i = 0; i < 1000; i++) {

executorService.submit(() -> System.out.println("A:" + getNextWithSync("A")));

executorService.submit(() -> System.out.println("B:" + getNextWithSync("B")));

}

waitService(executorService);

}

private synchronized long getNextWithSync(String name) {

Sequence sequence = getSequence(name);

long current = sequence.getCurrent();

long next = current + 1;

sequence.setCurrent(next);

return current;

}

运行时间 : 13s277ms

...

A:998

B:999

A:999

B:1000

A:1000

final A:1001

final B:1001

这次结果正确 ,所有ID都是有序取用的 ,但是运行的时间足足慢了10倍 !! 因为 synchronized 标记加在了getNextWithSync方法上 ,线程会等待只有一个线程能运行这个函数 .差值 = 数据库读写时间*方法执行数量 : (3+3) * 1000 * 2 = 12,000 ms .

当然 ,同步代码约束了多线程的运行 ,效率上自然有所下降 .不过我们发现 ,我们所取的AB两个Sequence是互不影响的 ,当 A.get/B.get 同时发生也是允许的 ,所以需要调整同步规则 ,只对同一个sequence取用进行同步.

同步指定对象运行代码读写sequence

@Test

public void synchronizedObjectTest() {

ExecutorService executorService = Executors.newFixedThreadPool(10);

for (int i = 0; i < 1000; i++) {

executorService.submit(() -> System.out.println("A:" + getNextWithSyncObject("A")));

executorService.submit(() -> System.out.println("B:" + getNextWithSyncObject("B")));

}

waitService(executorService);

}

private long getNextWithSyncObject(String name) {

Sequence sequence = getSequence(name);

synchronized (sequence) {

long current = sequence.getCurrent();

long next = current + 1;

sequence.setCurrent(next);

return current;

}

}

运行时间 : 6s869ms

...

A:997

B:999

A:998

A:999

B:1000

A:1000

final A:1001

final B:1001

很容易看出 ,运行时间比上一个少了一半 .因为我们对sequence对象进行同步 ,不同的sequence对象就像多个锁 ,只有争抢同一个锁的人才需要进行等待 .

实践

@Service

public class SequenceServiceImpl

implements SequenceService, ApplicationListener {

final Map collection2SequenceLock = new ConcurrentHashMap<>();

@Autowired

SequenceRepository sequenceRepository;

/**

* 初始化各sequence的同步锁

*/

@Override

public void onApplicationEvent(ContextRefreshedEvent event) {

//避免spring-mvc中的servlet context事件

if (event.getApplicationContext().getParent() == null) {

for (SysSequence sequence : sequenceRepository.findAll()) {

collection2SequenceLock.put(sequence.getCollectionName(), new SequenceLock(sequence));

log.info("初始化sequence lock列表 : {} - {}",

sequence.getCollectionName(), sequence.getCurrent());

}

}

}

@Override

public void register(SysSequence sequence) {

sequenceRepository.insert(sequence);

}

/**

* 获取主键

*

* @param entity collection对象

*/

@Override

public Long getNext(Class entity) {

String collection = CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_CAMEL, entity.getSimpleName());

Document docAnnotation = (Document) entity.getAnnotation(Document.class);

if (docAnnotation != null) {

collection = docAnnotation.collection();

}

return getNext(collection);

}

@Override

public Long getNext(String collection) {

if (!collection2SequenceLock.containsKey(collection)) {

throw CheckedException.of("找不到%s表的主键sequence", collection);

}

return getNextWithLock(collection);

}

/**

* 多线程情况下获取同一个collection的id时 ,可能出现同步问题 ;但又不能影响不同的collection的流程

*/

private Long getNextWithLock(String collection) {

SequenceLock lock = collection2SequenceLock.get(collection);

synchronized (lock) {

//已载入的批量主键

if (lock.isBatchInit) {

long current = lock.current;

long next = current + lock.step;

lock.current = next;

if (current < lock.max) {

log.debug("批量序列分发 - {}:{}", collection, current);

return current;

}

}

//获取主键记录值

SysSequence sequence = sequenceRepository.findOne(collection);

long current = sequence.getCurrent();

long next;

if (sequence.isBatch()) {

//未载入的批量主键 : 直接取一段sequence

next = current + sequence.getStep() * sequence.getBatchCount();

//缓存到lock中

long batchNext = current + sequence.getStep();

lock.init(batchNext, sequence.getStep(), next);

} else {

next = current + sequence.getStep();

}

if (next > sequence.getEnd()) {

throw CheckedException

.of("%s表的sequence超限 - next[%s] max[%s]", collection, next, sequence.getEnd());

}

sequence.setCurrent(next);

sequenceRepository.save(sequence);

log.debug("数据库获取 - {}:{}", collection, current);

return current;

}

}

/**

* 获取sequence的同步lock ,同时用来分发批量主键

*/

class SequenceLock {

String collection;

boolean isBatchInit;

long current = -1;

long step = -1;

long max = -1;

SequenceLock(SysSequence sequence) {

this.collection = sequence.getCollectionName();

}

void init(long current, long step, long max) {

this.isBatchInit = true;

this.current = current;

this.step = step;

this.max = max;

log.debug("初始化批量序列 - {} ,当前 - {} ,步长 - {} ,分发最大值 - {}", collection, current, step, max);

}

}

}

测试服务

@SpringBootTest(classes = {Application.class})

@RunWith(SpringRunner.class)

public class SequenceServiceTest {

@Autowired

SequenceService sequenceService;

@Autowired

SequenceRepository sequenceRepository;

@Before

public void before() {

sequenceRepository.save(

new SysSequence()

.setCollectionName("sys_user")

.setStart(0L).setEnd(Long.MAX_VALUE).setStep(1L)

.setCurrent(0L)

);

sequenceRepository.save(

new SysSequence()

.setCollectionName("sys_user2")

.setStart(0L).setEnd(Long.MAX_VALUE).setStep(10L)

.setCurrent(0L)

);

sequenceRepository.save(

new SysSequence()

.setCollectionName("sys_user3")

.setStart(0L).setEnd(Long.MAX_VALUE).setStep(1L)

.setCurrent(0L)

.setBatch(true).setBatchCount(10)

);

}

@Test

public void getSequence1() {

for (int i = 0; i < 1000; i++) {

sequenceService.getNext(SysUser.class);

}

Assert.assertEquals(sequenceService.getNext(SysUser.class), Long.valueOf(1000L));

}

@Test

public void getSequence2() {

for (int i = 0; i < 1000; i++) {

sequenceService.getNext("sys_user2");

}

Assert.assertEquals(sequenceService.getNext("sys_user2"), Long.valueOf(10000L));

}

@Test

public void getSequence3() {

for (int i = 0; i < 1000; i++) {

sequenceService.getNext("sys_user3");

}

Assert.assertEquals(sequenceService.getNext("sys_user3"), Long.valueOf(1000L));

}

}

测试结果

image.png

可以看到测试成功 ,已经可以获取到有序自增的sequence . 而批量主键的执行速度明显优于数据库存取 ,而且可以减轻数据库压力 ,在不需要严格连续的主键时 ,应该采用批量获取 ,内存发放的方式 ( 分布式ID实现方式之一 ) .

java代码读取dbsequence的值_MongoDB自增序列实现 - Java多线程同步 synchronized 用法相关推荐

  1. java代码读取dbsequence的值_JDBC读取新插入Oracle数据库Sequence值的5种方法

    //公共代码:得到数据库连接 public Connection getConnection() throwsException{ Class.forName("oracle.jdbc.dr ...

  2. Java代码读取MySQL数据,遇到‘0000-00-00’报错Value ‘0000-00-00‘ can not be represented as java.sql.Date

    报错 再使用Java代码读取MySQL数据的时候,读取date格式的数据,然后使用DateTimeFormatter格式化的时候突然在控制台发现了报错,Value '0000-00-00' can n ...

  3. 2021-02-08【Web作业开发记录】Java代码读取文件问题

    [Web作业开发记录]Java代码读取文件问题 1.问题分析 在项目中需要读取properties文件,而对于gradle项目,在java文件夹底下的properties文件在编译打包时会自动忽略,最 ...

  4. Java代码读取图片的两种方式

    不废话在,直接上代码: 方式一: 适用场景: 图片size小 方式二: 适用场景: 图片size大 细节点: java 代码读取图片的方式,从这点入手: 关注我的博客

  5. 基于java 工单管理_实训任务工单1-2(编写规范Java代码) 实训任务工单1-2(编写规范Java代码).docx_学小易找答案...

    [其它]实训任务工单4-1(泛型类.泛型方法的应用) 实训任务工单4-1(泛型类.泛型方法的应用).docx [简答题]教学工单5-1Java序列化机制的使用 [填空题]The name of my ...

  6. Java多线程同步Synchronized使用分析

    同步的概念: 同步分为 同步方法 和 同步块 两种方式. 锁定的内容分为 锁定类的某个特定实例 和 锁定类对象(类的所有实例) 变量分为 实例变量(不带static的变量) 和 类变量(带static ...

  7. java android程序代码_用java 代码读取android应用的一些基本信息

    Android 应用现在到处都是,如果下载一个apk , 不借助与其他工具,你能得到这个应用的版本号,包名入口等信息吗.其实我们完全可以自己写段 java 代码来得到这些信息.下面是测试代码:pack ...

  8. java代码中何处以main开始,Gradle-user guide-第7章 Java 快速开始

    第7章java快速开始 7.1java插件 我们知道,Gradle是一个多用途的构建工具,它可以构建你想在构建脚本中实现的任何事情.不过,作为开箱即用的产品,它不会做任何构建脚本不包含的任务. 大多数 ...

  9. eclipse java代码某一行需要修改注释_看看这些Java代码开发规范吧!你好,我好,大家好!...

    作为一名开发人员,当你接手他人的项目时,且当你阅读他人的代码时,是有没有遇到脑袋充血,感觉Java要把你"送走"的感觉呢?我们在用Java开发技术进行开发前,一定要牢牢恪守Java ...

最新文章

  1. linux message日志只有4k,命令长期运行 常用技巧 Linux 服务器 · 404k的前后端日志...
  2. 程序员买买买,纸书半价,电子书55折,抢券叠加使用更划算
  3. 王贻芳院士:我们的科技管理过度强调竞争,缺乏稳定支持
  4. Oracle-USERS表空间解读
  5. mysql 条件分析_数据分析之mysql
  6. linux 脚本 试题,10个Linux脚本面试题,看看你能答出几个?
  7. c# 获取路径的盘符_c#获取驱动器盘符
  8. C#的WinForm程序应用了XP主题样式之后,ShowDialog方法出现问题的解决
  9. 洛谷P1087 FBI树
  10. fopen打开ftp文件_PHP文件包含漏洞利用思路与Bypass总结手册(一)
  11. buffer正确的拼接方式
  12. leetcode之移除链表的元素
  13. 【GAMES101】三维旋转矩阵中绕三个轴旋转的矩阵公式
  14. 简单的UDP监听需要打开w网路岗才能接收到数据的原因和解决办法
  15. 某公司电子商务网站策划方案
  16. Mysql获取流水号
  17. [VOT10](2022CVPR)TCTrack: Temporal Contexts for Aerial Tracking
  18. 微信支付成功后服务器宕机了,今天微信出现大面积宕机,可能与支付宝有关?...
  19. 小明发布_历时一年零四个月,付出终有回报!救助站无微不至的照顾,头条四次跟进发布寻亲信息,最终帮助受助青年找到家人...
  20. 常用图像增强算法实现——直方图均衡

热门文章

  1. (转)Unity3D研究院之手游开发中所有特殊的文件夹(assetbundle与Application.persistentDataPath)...
  2. iOS 有用的代码片段
  3. 去除Office 2010的右键“共享文件夹同步”菜单
  4. 商业实战第三场 电视直销好记星
  5. 也谈淘点点60s短信订单的架构设计
  6. InnoDB和MyISAM引擎的效率比较
  7. linux qemu 报错 Unable to reserve 0xfffff000 bytes of virtual address space at 0x1000 解决方法
  8. java 反序列化 ysoserial exploit/JRMPClient 原理剖析
  9. python3 错误 Max retries exceeded with url 解决方法
  10. shodan 撒旦 新手入坑指南