javacurrentmap_Java 8 并发: 原子变量和 ConcurrentMap
AtomicInteger
java.concurrent.atomic 包下有很多原子操作的类。 在有些情况下,原子操作可以在不使用 synchronized 关键字和锁的情况下解决多线程安全问题。
在内部,原子类大量使用 CAS, 这是大多数现在 CPU 支持的原子操作指令, 这些指令通常情况下比锁同步要快得多。如果需要同时改变一个变量, 使用原子类是极其优雅的。
现在选择一个原子类 AtomicInteger 作为例子
AtomicInteger atomicInt = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 1000)
.forEach(i -> executor.submit(atomicInt::incrementAndGet));
stop(executor);
System.out.println(atomicInt.get()); // => 1000
使用 AtomicInteger 代替 Integer 可以在线程安全的环境中增加变量, 而不要同步访问变量。incrementAndGet() 方法是一个原子操作, 我们可以在多线程中安全的调用。
AtomicInteger 支持多种的原子操作, updateAndGet() 方法接受一个 lambda 表达式,以便对整数做任何的算术运算。
AtomicInteger atomicInt = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 1000)
.forEach(i -> {
Runnable task = () ->
atomicInt.updateAndGet(n -> n + 2);
executor.submit(task);
});
stop(executor);
System.out.println(atomicInt.get()); // => 2000
accumulateAndGet() 方法接受一个 IntBinaryOperator 类型的另一种 lambda 表达式, 我们是用这种方法来计算 1 -- 999 的和:
AtomicInteger atomicInt = new AtomicInteger(0);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 1000)
.forEach(i -> {
Runnable task = () ->
atomicInt.accumulateAndGet(i, (n, m) -> n + m);
executor.submit(task);
});
stop(executor);
System.out.println(atomicInt.get()); // => 499500
LongAdder
作为 AtomicLong 的替代, LongAdder 类可以用来连续地向数字添加值。
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 1000)
.forEach(i -> executor.submit(adder::increment));
stop(executor);
System.out.println(adder.sumThenReset()); // => 1000
LongAdder 类和其他的整数原子操作类一样提供了 add() 和 increment() 方法, 同时也是线程安全的。但其内部的结果不是一个单一的值, 这个类的内部维护了一组变量来减少多线程的争用。实际结果可以通过调用 sum() 和 sumThenReset() 来获取。
当来自多线程的更新比读取更频繁时, 这个类往往优于其他的原子类。通常作为统计数据, 比如要统计 web 服务器的请求数量。 LongAdder 的缺点是会消耗更多的内存, 因为有一组变量保存在内存中。
LongAccumulator
LongAccumulator 是 LongAdder 的一个更通用的版本。它不是执行简单的添加操作, 类 LongAccumulator 围绕 LongBinaryOperator 类型的lambda表达式构建,如代码示例中所示:
LongBinaryOperator op = (x, y) -> 2 * x + y;
LongAccumulator accumulator = new LongAccumulator(op, 1L);
ExecutorService executor = Executors.newFixedThreadPool(2);
IntStream.range(0, 10)
.forEach(i -> executor.submit(() -> accumulator.accumulate(i)));
stop(executor);
System.out.println(accumulator.getThenReset()); // => 2539
我们使用函数 2 * x + y 和初始值1创建一个 LongAccumulator。 每次调用 accumulate(i) , 当前结果和值i都作为参数传递给`lambda 表达式。
像 LongAdder 一样, LongAccumulator 在内部维护一组变量以减少对线程的争用。
ConcurrentMap
ConcurrentMap 接口扩展了 Map 接口,并定义了最有用的并发集合类型之一。 Java 8 通过向此接口添加新方法引入了函数式编程。
在下面的代码片段中, 来演示这些新的方法:
ConcurrentMap map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");
forEach() 接受一个类型为 BiConsumer 的 lambda 表达式, 并将 map 的 key 和 value 作为参数传递。
map.forEach((key, value) -> System.out.printf("%s = %s\n", key, value));
putIfAbsent() 方法只有当给定的 key 不存在时才将数据存入 map 中, 这个方法和 put 一样是线程安全的, 当多个线程访问 map 时不要做同步操作。
String value = map.putIfAbsent("c3", "p1");
System.out.println(value); // p0
getOrDefault() 方法返回给定 key 的 value, 当 key 不存在时返回给定的值。
String value = map.getOrDefault("hi", "there");
System.out.println(value); // there
replaceAll() 方法接受一个 BiFunction 类型的 lambda 表达式, 并将 key 和 value 作为参数传递,用来更新 value。
map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value);
System.out.println(map.get("r2")); // d3
compute() 方法和 replaceAll() 方法有些相同, 不同的是它多一个参数, 用来更新指定 key 的 value
map.compute("foo", (key, value) -> value + value);
System.out.println(map.get("foo")); // barbar
ConcurrentHashMap
以上所有方法都是 ConcurrentMap 接口的一部分,因此可用于该接口的所有实现。 此外,最重要的实现 ConcurrentHashMap 已经进一步增强了一些新的方法来在 Map 上执行并发操作。
就像并行流一样,这些方法在 Java 8 中通过 ForkJoinPool.commonPool()提供特殊的 ForkJoinPool 。该池使用预设的并行性, 这取决于可用内核的数量。 我的机器上有四个CPU内核可以实现三种并行性:
System.out.println(ForkJoinPool.getCommonPoolParallelism()); // 3
通过设置以下 JVM 参数可以减少或增加此值:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
我们使用相同的示例来演示, 不过下面使用 ConcurrentHashMap 类型, 这样可以调用更多的方法。
ConcurrentHashMap map = new ConcurrentHashMap<>();
map.put("foo", "bar");
map.put("han", "solo");
map.put("r2", "d2");
map.put("c3", "p0");
Java 8 引入了三种并行操作:forEach, search 和 reduce。 每个操作都有四种形式, 分别用 key, value, entries和 key-value 来作为参数。
所有这些方法的第一个参数都是 parallelismThreshold 阀值。 该阈值表示操作并行执行时的最小收集大小。 例如, 如果传递的阈值为500,并且 map 的实际大小为499, 则操作将在单个线程上按顺序执行。 在下面的例子中,我们使用一个阈值来强制并行操作。
ForEach
方法 forEach() 能够并行地迭代 map 的键值对。 BiConsumer 类型的 lambda 表达式接受当前迭代的 key 和 value。 为了可视化并行执行,我们将当前线程名称打印到控制台。 请记住,在我的情况下,底层的 ForkJoinPool 最多使用三个线程。
map.forEach(1, (key, value) ->
System.out.printf("key: %s; value: %s; thread: %s\n",
key, value, Thread.currentThread().getName()));
// key: r2; value: d2; thread: main
// key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1
// key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2
// key: c3; value: p0; thread: main
Search
search() 方法接受一个 BiFunction 类型的 lambda 表达式, 它能对 map 做搜索操作, 如果当前迭代不符合所需的搜索条件,则返回 null。 请记住,ConcurrentHashMap 是无序的。 搜索功能不应该取决于地图的实际处理顺序。 如果有多个匹配结果, 则结果可能是不确定的。
String result = map.search(1, (key, value) -> {
System.out.println(Thread.currentThread().getName());
if ("foo".equals(key)) {
return value;
}
return null;
});
System.out.println("Result: " + result);
// ForkJoinPool.commonPool-worker-2
// main
// ForkJoinPool.commonPool-worker-3
// Result: bar
下面是对 value 的搜索
String result = map.searchValues(1, value -> {
System.out.println(Thread.currentThread().getName());
if (value.length() > 3) {
return value;
}
return null;
});
System.out.println("Result: " + result);
// ForkJoinPool.commonPool-worker-2
// main
// main
// ForkJoinPool.commonPool-worker-1
// Result: solo
Reduce
reduce() 方法接受两个类型为 BiFunction 的 lambda 表达式。 第一个函数将每个键值对转换为任何类型的单个值。 第二个函数将所有这些转换后的值组合成一个结果, 其中火忽略 null 值。
String result = map.reduce(1,
(key, value) -> {
System.out.println("Transform: " + Thread.currentThread().getName());
return key + "=" + value;
},
(s1, s2) -> {
System.out.println("Reduce: " + Thread.currentThread().getName());
return s1 + ", " + s2;
});
System.out.println("Result: " + result);
// Transform: ForkJoinPool.commonPool-worker-2
// Transform: main
// Transform: ForkJoinPool.commonPool-worker-3
// Reduce: ForkJoinPool.commonPool-worker-3
// Transform: main
// Reduce: main
// Reduce: main
// Result: r2=d2, c3=p0, han=solo, foo=bar
javacurrentmap_Java 8 并发: 原子变量和 ConcurrentMap相关推荐
- Java 8 并发: 原子变量和 ConcurrentMap
原文地址: Java 8 Concurrency Tutorial: Atomic Variables and ConcurrentMap AtomicInteger java.concurrent. ...
- 聊聊高并发(二十)解析java.util.concurrent各个组件(二) 12个原子变量相关类
这篇说说java.util.concurrent.atomic包里的类,总共12个,网上有很多文章解析这几个类,这里挑些重点说说. 这12个类可以分为三组: 1. 普通类型的原子变量 2. 数组类型的 ...
- JAVA 并发编程实践 - 原子变量与非阻塞同步机制 笔记
2019独角兽企业重金招聘Python工程师标准>>> 非阻塞算法: 利用底层的源自机器指令(比如CAS)代替锁来实现数据在并发访问中的一致性.应用于:操作系统和JVM中实现线程/进 ...
- 并发编程(1): volatile、原子变量、自旋锁和互斥锁
并发编程三条特性: 原子性 原子性是指一个操作是不可中断的,要么全部执行成功要么全部执行失败. 可见性 可见性是指当一个线程修改了共享变量后,其他线程能够立即看见这个修改. 有序性 有序性是指程序指令 ...
- java原子变量的作用_AtomicInteger原子类的作用介绍(代码示例)
本篇文章给大家带来的内容是关于AtomicInteger原子类的作用介绍(代码示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助. AtomicInteger 原子类的作用 多线程操 ...
- 原子变量、锁、内存屏障,写得非常好!
突然想聊聊这个话题,是因为知乎上的一个问题多次出现在了我的Timeline里:请问,多个线程可以读一个变量,只有一个线程可以对这个变量进行写,到底要不要加锁?可惜的是很多高票答案语焉不详,甚至有所错漏 ...
- Java多线程(二)之Atomic:原子变量与原子类
一.何谓Atomic? Atomic一词跟原子有点关系,后者曾被人认为是最小物质的单位.计算机中的Atomic是指不能分割成若干部分的意思.如果一段代码被认为是Atomic,则表示这段代码在执行过程中 ...
- Linux内核ARM构架中原子变量的底层实现研究
前段时间重新研究了一下Linux的并发控制机制,对于内核的自旋锁.互斥锁.信号量等机制及其变体做了底层代码上的研究.因为只有从原理上理解了这些机制,在编写驱动的时候才会记得应该注意什么.这些机制基本都 ...
- 谈论Java原子变量和同步的效率 -- 颠覆你的生活
我们认为,由于思维定式原子变量总是比同步运行的速度更快,我想是这样也已经,直到实现了ID在第一次测试过程生成器不具有在这样一个迷迷糊糊的东西. 测试代码: import java.util.Array ...
最新文章
- python一点基础都没有的怎么办-你们都是怎么学 Python 的?
- C# 数组与 list 互相转换案例
- mysql不能删除外键吗,MySQL不能删除外键约束所需的索引
- 输入序列连续的序列检测
- 深挖“窄带高清”的实现原理
- 12-1 12 防盗链 访问控制 php解析 代理
- 国内首位!Node.js社区将阿里云工程师张秋怡吸纳为CTC成员
- 分析JQ作者的类实现过程
- 进程和线程的理解(转)
- 小米线刷包需要解压么_小米10刷机教程,线刷升级更新官方系统包
- 【论文概述】AVOD (2018)
- Modbus RTU转Modbus TCP网关的应用
- 电子元件知识汇总2-封装
- 快速学会CC2530单片机基础点灯
- STANet简单介绍
- SQL获取当前月份的第一天
- 从此告别PPT制作的烦恼:ChatGPT和MindShow帮你快速完成
- 印孚瑟斯被 IDC MarketScape 报告评为领导者
- 流量精灵刷流量的实例教程
- Zynq 学习笔记(4)Linux FPGA Manager