Scala教程之:Future和Promise
文章目录
- 定义返回Future的方法
- 阻塞方式获取Future的值
- 非阻塞方式获取Future的值
- Future链
- flatmap VS map
- Future.sequence() VS Future.traverse()
- Future.foldLeft VS Future reduceLeft
- Future firstCompletedOf
- Future zip VS zipWith
- Future andThen
- 自定义threadpool
- recover() recoverWith() and fallbackTo()
- promise
在scala中可以方便的实现异步操作,这里是通过Future来实现的,和java中的Future很相似,但是功能更加强大。
定义返回Future的方法
下面我们看下如何定义一个返回Future的方法:
println("Step 1: Define a method which returns a Future")
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
def donutStock(donut: String): Future[Int] = Future {// assume some long running database operationprintln("checking donut stock")10
}
注意这里需要引入scala.concurrent.ExecutionContext.Implicits.global, 它会提供一个默认的线程池来异步执行Future。
阻塞方式获取Future的值
println("\nStep 2: Call method which returns a Future")import scala.concurrent.Awaitimport scala.concurrent.duration._val vanillaDonutStock = Await.result(donutStock("vanilla donut"), 5 seconds)println(s"Stock of vanilla donut = $vanillaDonutStock")
donutStock() 是异步执行的,我们可以使用Await.result() 来阻塞主线程来等待donutStock()的执行结果。
下面是其输出:
Step 2: Call method which returns a Future
checking donut stock
Stock of vanilla donut = 10
非阻塞方式获取Future的值
我们可以使用Future.onComplete() 回调来实现非阻塞的通知:
println("\nStep 2: Non blocking future result")
import scala.util.{Failure, Success}
donutStock("vanilla donut").onComplete {case Success(stock) => println(s"Stock for vanilla donut = $stock")case Failure(e) => println(s"Failed to find vanilla donut stock, exception = $e")
}
Thread.sleep(3000)
Future.onComplete() 有两种可能情况,Success 或者 Failure,需要引入: import scala.util.{Failure, Success}。
Future链
有时候我们需要在获得一个Future之后再继续对其进行操作,有点类似于java中的管道,下面看一个例子:
println("\nStep 2: Define another method which returns a Future")
def buyDonuts(quantity: Int): Future[Boolean] = Future {println(s"buying $quantity donuts")true
}
上面我们又定义了一个方法,用来接收donutStock()的返回值,然后再返回一个Future[Boolean] 。
我们看下使用flatmap该怎么链接他们:
println("\nStep 3: Chaining Futures using flatMap")
val buyingDonuts: Future[Boolean] = donutStock("plain donut").flatMap(qty => buyDonuts(qty))
import scala.concurrent.Await
import scala.concurrent.duration._
val isSuccess = Await.result(buyingDonuts, 5 seconds)
println(s"Buying vanilla donut was successful = $isSuccess")
同样的,我们还可以使用for语句来进行链接:
println("\nStep 3: Chaining Futures using for comprehension")
for {stock <- donutStock("vanilla donut")isSuccess <- buyDonuts(stock)
} yield println(s"Buying vanilla donut was successful = $isSuccess")Thread.sleep(3000)
flatmap VS map
map就是对集合中的元素进行重映射,而flatmap则会将返回的值拆散然后重新组合。 下面举个直观的例子:
val buyingDonuts: Future[Boolean] = donutStock("plain donut").flatMap(qty => buyDonuts(qty))
flatMap返回的值是Future[Boolean]。
val buyingDonuts: Future[Future[Boolean]] = donutStock("plain donut").Map(qty => buyDonuts(qty))
map返回的值是Future[Future[Boolean]]。
Future.sequence() VS Future.traverse()
如果我们有很多个Future,然后想让他们并行执行,则可以使用 Future.sequence() 。
println(s"\nStep 2: Create a List of future operations")
val futureOperations = List(donutStock("vanilla donut"),donutStock("plain donut"),donutStock("chocolate donut")
)println(s"\nStep 5: Call Future.sequence to run the future operations in parallel")
val futureSequenceResults = Future.sequence(futureOperations)
futureSequenceResults.onComplete {case Success(results) => println(s"Results $results")case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")
}
Future.traverse() 和Future.sequence() 类似, 唯一不同的是,Future.traverse()可以对要执行的Future进行操作,如下所示:
println(s"\nStep 3: Call Future.traverse to convert all Option of Int into Int")
val futureTraverseResult = Future.traverse(futureOperations){ futureSomeQty =>futureSomeQty.map(someQty => someQty.getOrElse(0))
}
futureTraverseResult.onComplete {case Success(results) => println(s"Results $results")case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")
}
Future.foldLeft VS Future reduceLeft
foldLeft 和 reduceLeft 都是用来从左到右做集合操作的,区别在于foldLeft可以提供默认值。看下下面的例子:
println(s"\nStep 3: Call Future.foldLeft to fold over futures results from left to right")
val futureFoldLeft = Future.foldLeft(futureOperations)(0){ case (acc, someQty) =>acc + someQty.getOrElse(0)
}
futureFoldLeft.onComplete {case Success(results) => println(s"Results $results")case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")
}
输出结果:
Step 3: Call Future.foldLeft to fold over futures results from left to right
Results 20
println(s"\nStep 3: Call Future.reduceLeft to fold over futures results from left to right")
val futureFoldLeft = Future.reduceLeft(futureOperations){ case (acc, someQty) =>acc.map(qty => qty + someQty.getOrElse(0))
}
futureFoldLeft.onComplete {case Success(results) => println(s"Results $results")case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")
}
输出结果:
Step 3: Call Future.reduceLeft to fold over futures results from left to right
Results Some(20)
Future firstCompletedOf
firstCompletedOf在处理多个Future请求时,会返回第一个处理完成的future结果。
println(s"\nStep 3: Call Future.firstCompletedOf to get the results of the first future that completes")
val futureFirstCompletedResult = Future.firstCompletedOf(futureOperations)
futureFirstCompletedResult.onComplete {case Success(results) => println(s"Results $results")case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")
}
Future zip VS zipWith
zip用来将两个future结果组合成一个tuple. zipWith则可以自定义Function来处理future返回的结果。
println(s"\nStep 3: Zip the values of the first future with the second future")
val donutStockAndPriceOperation = donutStock("vanilla donut") zip donutPrice()
donutStockAndPriceOperation.onComplete {case Success(results) => println(s"Results $results")case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")
}
输出值:
Step 3: Zip the values of the first future with the second future
checking donut stock
Results (Some(10),3.25)
使用zipwith的例子:
println(s"\nStep 4: Call Future.zipWith and pass-through function qtyAndPriceF")
val donutAndPriceOperation = donutStock("vanilla donut").zipWith(donutPrice())(qtyAndPriceF)
donutAndPriceOperation.onComplete {case Success(result) => println(s"Result $result")case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")
}
输出结果:
Step 4: Call Future.zipWith and pass-through function qtyAndPriceF
checking donut stock
Result (10,3.25)
Future andThen
andThen后面可以跟一个自定义的PartialFunction,来处理Future返回的结果, 如下所示:
println(s"\nStep 2: Call Future.andThen with a PartialFunction")
val donutStockOperation = donutStock("vanilla donut")
donutStockOperation.andThen { case stockQty => println(s"Donut stock qty = $stockQty")}
输出结果:
Step 2: Call Future.andThen with a PartialFunction
checking donut stock
Donut stock qty = Success(10)
自定义threadpool
上面的例子中, 我们都是使用了scala的全局ExecutionContext: scala.concurrent.ExecutionContext.Implicits.global.
同样的,我们也可以自定义你自己的ExecutionContext。下面是一个使用java.util.concurrent.Executors的例子:
println("Step 1: Define an ExecutionContext")val executor = Executors.newSingleThreadExecutor()implicit val ec = scala.concurrent.ExecutionContext.fromExecutor(executor)println("\nStep 2: Define a method which returns a Future")import scala.concurrent.Futuredef donutStock(donut: String): Future[Int] = Future {// assume some long running database operationprintln("checking donut stock")10}println("\nStep 3: Call method which returns a Future")val donutStockOperation = donutStock("vanilla donut")donutStockOperation.onComplete {case Success(donutStock) => println(s"Results $donutStock")case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")}Thread.sleep(3000)executor.shutdownNow()
recover() recoverWith() and fallbackTo()
这三个方法主要用来处理异常的,recover是用来从你已知的异常中恢复,如下所示:
println("\nStep 3: Call Future.recover to recover from a known exception")
donutStock("unknown donut").recover { case e: IllegalStateException if e.getMessage == "Out of stock" => 0 }.onComplete {case Success(donutStock) => println(s"Results $donutStock")case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")
}
recoverWith()和recover()类似,不同的是他的返回值是一个Future。
println("\nStep 3: Call Future.recoverWith to recover from a known exception")
donutStock("unknown donut").recoverWith { case e: IllegalStateException if e.getMessage == "Out of stock" => Future.successful(0) }.onComplete {case Success(donutStock) => println(s"Results $donutStock")case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")
}
fallbackTo()是在发生异常时,去调用指定的方法:
println("\nStep 3: Call Future.fallbackTo")
val donutStockOperation = donutStock("plain donut").fallbackTo(similarDonutStock("vanilla donut")).onComplete {case Success(donutStock) => println(s"Results $donutStock")case Failure(e) => println(s"Error processing future operations, error = ${e.getMessage}")
}
promise
熟悉ES6的同学可能知道,promise是JS在ES6中引入的新特性,其主要目的是将回调转变成链式调动。
当然scala的promise和ES6的promise还是不一样的,我们看下scala中promise是怎么用的:
println("Step 1: Define a method which returns a Future")import scala.concurrent.ExecutionContext.Implicits.globaldef donutStock(donut: String): Int = {if(donut == "vanilla donut") 10else throw new IllegalStateException("Out of stock")}println(s"\nStep 2: Define a Promise of type Int")val donutStockPromise = Promise[Int]()println("\nStep 3: Define a future from Promise")val donutStockFuture = donutStockPromise.futuredonutStockFuture.onComplete {case Success(stock) => println(s"Stock for vanilla donut = $stock")case Failure(e) => println(s"Failed to find vanilla donut stock, exception = $e")}println("\nStep 4: Use Promise.success or Promise.failure to control execution of your future")val donut = "vanilla donut"if(donut == "vanilla donut") {donutStockPromise.success(donutStock(donut))} else {donutStockPromise.failure(Try(donutStock(donut)).failed.get)}println("\nStep 5: Completing Promise using Promise.complete() method")val donutStockPromise2 = Promise[Int]()val donutStockFuture2 = donutStockPromise2.futuredonutStockFuture2.onComplete {case Success(stock) => println(s"Stock for vanilla donut = $stock")case Failure(e) => println(s"Failed to find vanilla donut stock, exception = $e")}donutStockPromise2.complete(Try(donutStock("unknown donut")))
上面例子中我们使用了 Promise.success, Promise.failure, Promise.complete() 来控制程序的运行。
更多精彩内容且看:
- 区块链从入门到放弃系列教程-涵盖密码学,超级账本,以太坊,Libra,比特币等持续更新
- Spring Boot 2.X系列教程:七天从无到有掌握Spring Boot-持续更新
- Spring 5.X系列教程:满足你对Spring5的一切想象-持续更新
- java程序员从小工到专家成神之路(2020版)-持续更新中,附详细文章教程
更多教程请参考 flydean的博客
Scala教程之:Future和Promise相关推荐
- C++11多线程之future和promise
std::future和promise的作用是在不同线程之间传递数据.使用指针也可以完成数据的传递,但是指针非常危险,因为互斥量不能阻止指针的访问:而且指针的方式传递的数据是固定的,如果更改数据类型, ...
- scala教程之:可见性规则
文章目录 public Protected private scoped private 和 scoped protected 和java很类似,scala也有自己的可见性规则,不同的是scala只有 ...
- Scala教程之:深入理解协变和逆变
文章目录 函数的参数和返回值 可变类型的变异 在之前的文章中我们简单的介绍过scala中的协变和逆变,我们使用+ 来表示协变类型:使用-表示逆变类型:非转化类型不需要添加标记. 假如我们定义一个cla ...
- Scala教程之:Either
在之前的文章中我们提到了Option,scala中Option表示存在0或者1个元素,如果在处理异常的时候Option就会有很大的限制,因为Option如果返回None,那么我并不知道具体的异常到底是 ...
- Scala教程之:可变和不变集合
文章目录 mutable HashMap immutable HashMap 集合在程序中是非常有用的,只有用好集合才能真正感受到该语言的魅力.在scala中集合主要在三个包里面:scala.coll ...
- Scala教程之:PartialFunction
Scala中有一个很有用的traits叫PartialFunction,我看了下别人的翻译叫做偏函数,但是我觉得部分函数更加确切. 那么PartialFunction是做什么用的呢?简单点说Parti ...
- Scala教程之:Enumeration
Enumeration应该算是程序语言里面比较通用的一个类型,在scala中也存在这样的类型, 我们看下Enumeration的定义: abstract class Enumeration (init ...
- Scala教程之:Option-Some-None
文章目录 Option和Some Option和None Option和模式匹配 在java 8中,为了避免NullPointerException,引入了Option,在Scala中也有同样的用法. ...
- Scala教程之:scala的参数
文章目录 默认参数值 命名参数 scala的参数有两大特点: 默认参数值 命名参数 默认参数值 在Scala中,可以给参数提供默认值,这样在调用的时候可以忽略这些具有默认值的参数. def log(m ...
最新文章
- shell 常用命令
- springboot整合rabbitmq(搭建)
- STL9-vector容器
- mysql在test库中创建表stu_1.在mysql的test数据库中新建表,表名为student,表结构如下:...
- Android Gradle插件(plugin)版本(version)与Gradle、SDK Build Tools版本关系
- 第12章 坚持一百秒(《C和C++游戏趣味编程》教学视频)
- ABP理论学习之Abp Session
- 马云请不动郭盛华的原因?原来背后还有更神秘的人物
- 过计算机管理共享文件夹,局域网中怎么查看自己共享过的文件
- Java 自定义Excel数据排序
- mysql phpmyadmin 安装_phpMyAdmin 安装
- JavaBase 求 个位,十位,百位,千位
- 教授专栏17 | 许佳龙:银行查找网安漏洞 提高公众风险意识
- VMware如何导出和导入OVF文件
- Python爬虫 | Python爬虫获取女友图片
- ffmpeg里转场transition
- (Python)卫星RPC有理多项式模型读取与正反投影坐标计算原理与实现
- 数据库课程设计 人事管理系统
- 不重启显示新增硬盘(虚拟机)
- python列表写入txt文件中文乱码,python 字典格式的文本写入文件,中文乱码(Unicode)的问题...
热门文章
- python支持的编程范式有_【Python学习手册】chapter1 前面
- 案例逐步演示python利用正则表达式提取指定内容并输出到csv
- 安装php-redis遇到Error: Package: php-pecl-igbinary-1.2.1-1.el7.x86_64 (epel)
- NJUST1712(形成三角形面积为整数的个数)
- HDU4259(简单群置换)
- C++的Json解析库:jsoncpp和boost
- 监听以太网(三) Packet32数据结构说明
- WebRTC 的 log 系统实现分析
- 分布式事务之底层原理揭秘
- 从 wiscKey 看 LSMtree 的不足