SCAla : Futures / Promises 及更多






4.50/5 (2投票s)
我是一名 .NET 开发者,而且已经很久了,非常非常习惯处理 .NET 框架的任务库。很明显,我指的是 TPL 和现在的 Async/Await。所以现在我正在做越来越多的 Scala,我想看看 Scala 中等效的代码会是什么样的,因为我很喜欢 .NET 中的 Task(s)。[…]
我是一名 .NET 开发者,而且已经很久了,非常非常习惯处理 .NET 框架的任务库。很明显,我指的是 TPL 和现在的 Async/Await。
所以现在我正在做越来越多的 Scala,我想看看 Scala 中等效的代码会是什么样的,因为我很喜欢 .NET 中的 Task(s)。
假设我拥有这段 .NET 代码,由于使用了回调,所以它不是阻塞的。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
class Program
{
static void Main(string[] args)
{
var task = Task.Run(() =>
{
return 40;
});
task.ContinueWith(ant =>
{
Console.WriteLine(ant.Result);
}, TaskContinuationOptions.OnlyOnRanToCompletion);
task.ContinueWith(ant =>
{
Console.WriteLine("BAD NEWS");
}, TaskContinuationOptions.OnlyOnFaulted);
Console.ReadLine();
}
}
}
粗略地说,我们可以将其分解为 Scala 中的以下等价项:
- Scala 中的 .NET Task 大致等同于 Scala Future
- .NET 中 task.ContinueWith 回调等同于 Scala 中的 Future 回调
我们可以进一步进行比较。所以让我们将 .NET 代码更改为这段代码,它现在是阻塞的,因为我们不再使用任何回调,而是使用 Task.Result 属性,这会导致 Task 被“观察”。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
class Program
{
static void Main(string[] args)
{
var task = Task.Run(() =>
{
return 40;
});
var x = task.Result;
Console.ReadLine();
}
}
}
在 Scala 中,这可以通过使用 scala.concurrent.Await.ready / scala.concurrent.Await.result 来完成,我们稍后会看到更多。我们将花一些时间看看创建和使用 Futures(Scala 的 Task 等价物)的一些底层细节。
特点
Future 是一个包含某个值(该值可能在某个时间点可用)的对象。该值通常是其他计算的结果。
- 如果计算尚未完成,我们说 Future 未完成。
- 如果计算已成功完成或出现异常,我们说 Future 已完成。
完成可以采取两种形式之一:
- 当 Future 以一个值完成时,我们说 Future 已成功地用该值完成。
- 当 Future 因计算抛出的异常而完成时,我们说 Future 因该异常而失败。
Future 的一个重要特性是它只能被赋值一次。一旦 Future 对象被赋予一个值或异常,它实际上就变成不可变的——它永远不能被覆盖。
创建 Future 对象的最简单方法是调用 future 方法,该方法启动异步计算并返回一个包含该计算结果的 future。结果在 future 完成后即可获得。
请注意,Future[T] 是表示 future 对象的类型,而 future 是一个创建和调度异步计算的方法,然后返回一个将用该计算结果完成的 future 对象。
https://docs.scala-lang.org.cn/overviews/core/futures.html 更新于 2015/11/10
让我们看一个例子。这个简单的例子创建了一个 Future[Int]。
import scala.concurrent._
import ExecutionContext.Implicits.global
object ClassesDemo
{
def main(args: Array[String]) = {
//Creating a Future
val intFuture: Future[Int] = Future { 23 }
}
}
你可能想知道 Future.apply() 方法是如何想出一个可能在未来某个时候完成的计算的。
答案在于使用 Promises,我们稍后会看到。
回调
所以,继续我介绍段落中展示的 .NET 示例,我展示了如何使用 Task.ContinueWith(..),它运行一个延续。
在 Scala 中,我们可以做同样的事情,但这只是被称为“callback”。与 .NET 延续一样,Scala callback 是非阻塞的。
Callback 易于使用,这里有一个例子。
import scala.concurrent.{ExecutionContext, duration, Future, Await}
import scala.reflect.runtime.universe._
import scala.reflect._
import scala.reflect.runtime._
import scala.util
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
object ClassesDemo
{
def main(args: Array[String]) =
{
val intFuture: Future[Int] = Future { 23 }
//use a "callback" which is non blocking
intFuture onComplete {
case Success(t) =>
{
println(t)
}
case Failure(e) =>
{
println(s"An error has occured: $e.getMessage")
}
}
}
}
等待 Future
我们也可以等待 Future。我们可以使用 scala.concurrent.Await 类的 2 个方法来做到这一点,如下所述。一个重要的注意事项是下面显示的 2 个方法是阻塞的,因此应谨慎使用。
Await.ready
//Await the "completed" state of an Awaitable.
def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type
Await.result
//Await and return the result (of type T) of an Awaitable.
def result[T](awaitable: Awaitable[T], atMost: Duration): T
让我们看一个这两种方法的例子。
import scala.concurrent.{ExecutionContext, duration, Future, Await}
import scala.reflect.runtime.universe._
import scala.reflect._
import scala.reflect.runtime._
import scala.util
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
object ClassesDemo {
def main(args: Array[String]) =
{
//Await.ready
lazy val intFuture: Future[Int] = Future { 23 }
val result: Try[Int] = Await.ready(intFuture, 10 seconds).value.get
val resultEither = result match {
case Success(t) => Right(t)
case Failure(e) => Left(e)
}
resultEither match {
case Right(t) => println(t)
case Left(e) => println(e)
}
//Await.result
lazy val stringFuture = Future { "hello" }
val theString :String = Await.result(stringFuture, 1 second)
println(theString)
}
}
运行后将产生以下输出:
这里有一些其他的链接,有助于对这一点进行背景阅读:
- https://scala-lang.org.cn/files/archive/nightly/docs/library/index.html#scala.concurrent.package
- http://stackoverflow.com/questions/21417317/await-a-future-receive-an-either
函数式组合
我们展示的回调机制足以将 future 结果与后续计算链接起来。然而,它有时很不方便,并且会导致代码冗长。幸运的是,scala Future[T] 类非常强大,并附带了许多组合器,可以帮助你编写更简洁的代码。
如果只有 .Net Task 拥有其中一些方法(哦,等等,RX (reactive extensions 有)),那我们就笑了。
不过,现在只需知道 Future[T] 配备了一些你可以使用的不错的组合器。我将在下面介绍其中一些,但你应该自己做一些更多的研究。
Map 示例
在这个例子中,我们使用 Future[T].map 将结果从一个 Future[T] 转换为一个新的 T 类型,例如 TR。
import scala.concurrent.{ExecutionContext, duration, Future, Await}
import scala.reflect.runtime.universe._
import scala.reflect._
import scala.reflect.runtime._
import scala.util
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
object ClassesDemo {
def main(args: Array[String]) =
{
val rateQuoteFuture : Future[Double] = Future {
1.5
}
val formattedRateFuture = rateQuoteFuture map { quote =>
println(quote)
s"Rate was : $quote"
}
formattedRateFuture onComplete {
case Success(formatted) => println(formatted)
case Failure(x) => {
println(x)
}
}
System.in.read()
}
}
For
我们也可以使用 For 和 Future[T] (这里有一个我厚颜无耻地从 Scala 文档中偷来的例子)。
val usdQuote = Future { connection.getCurrentValue(USD) }
val chfQuote = Future { connection.getCurrentValue(CHF) }
val purchase = for {
usd <- usdQuote
chf <- chfQuote
if isProfitable(usd, chf)
} yield connection.buy(amount, chf)
purchase onSuccess {
case _ => println("Purchased " + amount + " CHF")
}
WithFilter
或者如何提供一个过滤器?这可以通过 WithFilter 方法完成。
val purchase = usdQuote flatMap {
usd =>
chfQuote
.withFilter(chf => isProfitable(usd, chf))
.map(chf => connection.buy(amount, chf))
}
Promises
到目前为止,我们只考虑了使用 future 方法创建的异步计算创建的 Future 对象。但是,future 也可以使用 promises 来创建。
虽然 futures 被定义为一种只读占位符对象,用于表示尚未存在的结果,但 promise 可以被认为是一个可写的、单次赋值的容器,它完成了 future。也就是说,promise 可以用于通过“完成”promise(使用 success 方法)成功地用一个值完成 future。相反,promise 也可以通过使 promise 失败(使用 failure 方法)来用一个异常完成 future。
https://docs.scala-lang.org.cn/overviews/core/futures.html 更新于 2015/11/10
我作为一名 .NET 开发者的思考方式是,Promises 几乎与 TaskCompletionSource 相同。
为了理解 Promise 和 Future 之间的关联,让我们看一下 Future.apply() 方法的签名,它看起来像这样:
def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = impl.Future(body)
如果我们进一步检查,可以看到它具有此实现代码,我们实际上正在使用 Promise 来完成/失败 Future 计算。
private[concurrent] object Future {
class PromiseCompletingRunnable[T](body: => T) extends Runnable {
val promise = new Promise.DefaultPromise[T]()
override def run() = {
promise complete {
try Success(body) catch { case NonFatal(e) => Failure(e) }
}
}
}
def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = {
val runnable = new PromiseCompletingRunnable(body)
executor.prepare.execute(runnable)
runnable.promise.future
}
}
Scala Async 库
本节中我谈论的大部分内容都包含在一篇很棒的文章中:
http://engineering.roundupapp.co/the-future-is-not-good-enough-coding-with-async-await/
这里有一个使用多个 Future(s) 的小例子。
这有几个问题,即:
- 每个新 Future 都需要新的嵌套才能使用。
- 它不处理不 happy path(失败)。
- 它相当顺序化。
import scala.concurrent.{ExecutionContext, duration, Future, Await}
import scala.reflect.runtime.universe._
import scala.reflect._
import scala.reflect.runtime._
import scala.util
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
object ClassesDemo {
def main(args: Array[String]) =
{
val future1 : Future[Double] = Future { 1 }
val future2 : Future[Double] = Future { 2 }
val future3 : Future[Double] = Future { 3 }
import scala.concurrent.ExecutionContext.Implicits.global
val (f1,f2,f3) = (future1, future2, future3)
f1 onSuccess { case r1 =>
f2 onSuccess { case r2 =>
f3 onSuccess { case r3 =>
println(s"Sum: ${r1 + r2 + r3}")
}
}
}
System.in.read()
}
}
这有几个问题,即:
- 每个新 Future 都需要新的嵌套才能使用。
- 它不处理不 happy path(失败)。
- 它相当顺序化。
我们可以通过使用 for
推理来修复其中一些问题。
import scala.concurrent.{ExecutionContext, duration, Future, Await}
import scala.reflect.runtime.universe._
import scala.reflect._
import scala.reflect.runtime._
import scala.util
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
object ClassesDemo {
def main(args: Array[String]) =
{
val future1 : Future[Double] = Future { 1 }
val future2 : Future[Double] = Future { 2 }
val future3 : Future[Double] = Future { 3 }
import scala.concurrent.ExecutionContext.Implicits.global
val (f1,f2,f3) = (future1, future2, future3)
val f = for {
r1 <- f1
r2 <- f2
r3 <- f3
} yield r1 + r2 + r3
f onComplete {
case Success(s) => println(s"Sum: $s")
case Failure(e) => // Handle failure
}
System.in.read()
}
}
这修复了第 1 点和第 2 点,但它仍然顺序执行。我们可以进一步改进并这样做:
import scala.concurrent.{ExecutionContext, duration, Future, Await}
import scala.reflect.runtime.universe._
import scala.reflect._
import scala.reflect.runtime._
import scala.util
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
object ClassesDemo {
def main(args: Array[String]) =
{
val future1 : Future[Double] = Future { 1 }
val future2 : Future[Double] = Future { 2 }
val future3 : Future[Double] = Future { 3 }
import scala.concurrent.ExecutionContext.Implicits.global
val f = Future.sequence(Seq(future1,future2,future3))
f onComplete {
case Success(r) => println(s"Sum: ${r.sum}")
case Failure(e) => // Handle failure
}
System.in.read()
}
}
但是有一个更好的方法,我很乐意地说它借鉴了 .NET async/await(而 .NET async/await 又借鉴了 F#,但嘿)。我们可以使用 Scala Async 库重写上面的代码,如下所示。
Scala async 库可以在这里找到:
https://github.com/scala/async 更新于 2015/11/10
import scala.concurrent.{Future}
import scala.async.Async._ //'async/await' macros blocks and implicits
object ClassesDemo {
def main(args: Array[String]) =
{
val future1 : Future[Double] = Future { 1 }
val future2 : Future[Double] = Future { 2 }
val future3 : Future[Double] = Future { 3 }
//use Scala Async Library here, note the Async-Await
async {
val s = await {future1} + await {future2} + await {future3}
println(s"Sum: $s")
} onFailure { case e => /* Handle failure */ }
System.in.read()
}
}
async 标记了一个异步代码块。这样的块通常包含一个或多个 await 调用,这些调用标记了计算将被暂停的点,直到被 await 的 Future 完成为止。
默认情况下,async 块在 scala.concurrent.{Future, Promise} 上运行。该系统可以适应 Future 模式的其他实现。
https://github.com/scala/async 更新于 2015/11/10
对我这个 .NET 人来说,进入 Scala 世界,这很有意义。
延伸阅读
Scala 文档实际上对 Futures/Promises 非常好。你可以在这里阅读更多关于它的信息。