65.9K
CodeProject 正在变化。 阅读更多。
Home

SCAla : Futures / Promises 及更多

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.50/5 (2投票s)

2015 年 11 月 12 日

CPOL

6分钟阅读

viewsIcon

12295

我是一名 .NET 开发者,而且已经很久了,非常非常习惯处理 .NET 框架的任务库。很明显,我指的是 TPL 和现在的 Async/Await。所以现在我正在做越来越多的 Scala,我想看看 Scala 中等效的代码会是什么样的,因为我很喜欢 .NET 中的 Task(s)。[…]

我是一名 .NET 开发者,而且已经很久了,非常非常习惯处理 .NET 框架的任务库。很明显,我指的是 TPL 和现在的 Async/Await。

所以现在我正在做越来越多的 Scala,我想看看 Scala 中等效的代码会是什么样的,因为我很喜欢 .NET 中的 Task(s)。

假设我拥有这段 .NET 代码,由于使用了回调,所以它不是阻塞的。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {
            var task = Task.Run(() =>
            {
                return 40;
            });

            task.ContinueWith(ant =>
            {
                Console.WriteLine(ant.Result);
            }, TaskContinuationOptions.OnlyOnRanToCompletion);

            task.ContinueWith(ant =>
            {
                Console.WriteLine("BAD NEWS");
            }, TaskContinuationOptions.OnlyOnFaulted);

            Console.ReadLine();
        }
    }
}

粗略地说,我们可以将其分解为 Scala 中的以下等价项:

  • Scala 中的 .NET Task 大致等同于 Scala Future
  • .NET 中 task.ContinueWith 回调等同于 Scala 中的 Future 回调

我们可以进一步进行比较。所以让我们将 .NET 代码更改为这段代码,它现在是阻塞的,因为我们不再使用任何回调,而是使用 Task.Result 属性,这会导致 Task 被“观察”。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    class Program
    {
        static void Main(string[] args)
        {
            var task = Task.Run(() => 
            { 
                return 40; 
            });

            var x = task.Result;

            Console.ReadLine();
        }
    }
} 

在 Scala 中,这可以通过使用 scala.concurrent.Await.ready / scala.concurrent.Await.result 来完成,我们稍后会看到更多。我们将花一些时间看看创建和使用 Futures(Scala 的 Task 等价物)的一些底层细节。

特点

Future 是一个包含某个值(该值可能在某个时间点可用)的对象。该值通常是其他计算的结果。

  • 如果计算尚未完成,我们说 Future 未完成。
  • 如果计算已成功完成或出现异常,我们说 Future 已完成。

完成可以采取两种形式之一:

  • 当 Future 以一个值完成时,我们说 Future 已成功地用该值完成。
  • 当 Future 因计算抛出的异常而完成时,我们说 Future 因该异常而失败。

Future 的一个重要特性是它只能被赋值一次。一旦 Future 对象被赋予一个值或异常,它实际上就变成不可变的——它永远不能被覆盖。

创建 Future 对象的最简单方法是调用 future 方法,该方法启动异步计算并返回一个包含该计算结果的 future。结果在 future 完成后即可获得。

请注意,Future[T] 是表示 future 对象的类型,而 future 是一个创建和调度异步计算的方法,然后返回一个将用该计算结果完成的 future 对象。

https://docs.scala-lang.org.cn/overviews/core/futures.html 更新于 2015/11/10

让我们看一个例子。这个简单的例子创建了一个 Future[Int]。

import scala.concurrent._ 
import ExecutionContext.Implicits.global 

object ClassesDemo 
{ 
    def main(args: Array[String]) = { 

        //Creating a Future 
        val intFuture: Future[Int] = Future { 23 } 
    } 
} 

你可能想知道 Future.apply() 方法是如何想出一个可能在未来某个时候完成的计算的。

答案在于使用 Promises,我们稍后会看到。

回调

所以,继续我介绍段落中展示的 .NET 示例,我展示了如何使用 Task.ContinueWith(..),它运行一个延续。

在 Scala 中,我们可以做同样的事情,但这只是被称为“callback”。与 .NET 延续一样,Scala callback 是非阻塞的。

Callback 易于使用,这里有一个例子。

import scala.concurrent.{ExecutionContext, duration, Future, Await} 
import scala.reflect.runtime.universe._ 
import scala.reflect._ 
import scala.reflect.runtime._ 
import scala.util 
import scala.util.{Failure, Success, Try} 
import scala.concurrent.duration._ 
import ExecutionContext.Implicits.global 

object ClassesDemo 
{ 
    def main(args: Array[String]) = 
    { 
        val intFuture: Future[Int] = Future { 23 } 

        //use a "callback" which is non blocking 
        intFuture onComplete { 
            case Success(t) => 
            { 
                println(t) 
            } 
            case Failure(e) => 
           { 
                println(s"An error has occured: $e.getMessage") 
           }
        } 
    } 
} 

等待 Future

我们也可以等待 Future。我们可以使用 scala.concurrent.Await 类的 2 个方法来做到这一点,如下所述。一个重要的注意事项是下面显示的 2 个方法是阻塞的,因此应谨慎使用。

Await.ready 

//Await the "completed" state of an Awaitable. 
def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type 

Await.result

//Await and return the result (of type T) of an Awaitable. 
def result[T](awaitable: Awaitable[T], atMost: Duration): T 

让我们看一个这两种方法的例子。

import scala.concurrent.{ExecutionContext, duration, Future, Await}
import scala.reflect.runtime.universe._
import scala.reflect._
import scala.reflect.runtime._
import scala.util
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
 
object ClassesDemo {
 
  def main(args: Array[String]) =
  {
    //Await.ready
    lazy val intFuture: Future[Int] = Future { 23 }
    val result: Try[Int] = Await.ready(intFuture, 10 seconds).value.get
    val resultEither = result match {
      case Success(t) => Right(t)
      case Failure(e) => Left(e)
    }
    resultEither match {
      case Right(t) => println(t)
      case Left(e) => println(e)
    }
 
    //Await.result
    lazy val stringFuture = Future { "hello" }
    val theString :String = Await.result(stringFuture, 1 second)
    println(theString)
  }
}

运行后将产生以下输出:

image

这里有一些其他的链接,有助于对这一点进行背景阅读:

函数式组合

我们展示的回调机制足以将 future 结果与后续计算链接起来。然而,它有时很不方便,并且会导致代码冗长。幸运的是,scala Future[T] 类非常强大,并附带了许多组合器,可以帮助你编写更简洁的代码。

如果只有 .Net Task 拥有其中一些方法(哦,等等,RX (reactive extensions 有)),那我们就笑了。

不过,现在只需知道 Future[T] 配备了一些你可以使用的不错的组合器。我将在下面介绍其中一些,但你应该自己做一些更多的研究。

Map 示例

在这个例子中,我们使用 Future[T].map 将结果从一个 Future[T] 转换为一个新的 T 类型,例如 TR。

import scala.concurrent.{ExecutionContext, duration, Future, Await}
import scala.reflect.runtime.universe._
import scala.reflect._
import scala.reflect.runtime._
import scala.util
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
 
object ClassesDemo {
 
 
  def main(args: Array[String]) =
  {
 
    val rateQuoteFuture : Future[Double] = Future {
      1.5
    }
 
    val formattedRateFuture = rateQuoteFuture map { quote =>
      println(quote)
      s"Rate was : $quote"
    }
    formattedRateFuture onComplete  {
      case Success(formatted) => println(formatted)
      case Failure(x) => {
        println(x)
      }
    }
 
 
    System.in.read()
  }
}

For

我们也可以使用 For 和 Future[T] (这里有一个我厚颜无耻地从 Scala 文档中偷来的例子)。

val usdQuote = Future { connection.getCurrentValue(USD) }
val chfQuote = Future { connection.getCurrentValue(CHF) }
val purchase = for {
  usd <- usdQuote
  chf <- chfQuote
  if isProfitable(usd, chf)
} yield connection.buy(amount, chf)
purchase onSuccess {
  case _ => println("Purchased " + amount + " CHF")
}

WithFilter

或者如何提供一个过滤器?这可以通过 WithFilter 方法完成。

val purchase = usdQuote flatMap {
  usd =>
  chfQuote
    .withFilter(chf => isProfitable(usd, chf))
    .map(chf => connection.buy(amount, chf))
}

Promises

到目前为止,我们只考虑了使用 future 方法创建的异步计算创建的 Future 对象。但是,future 也可以使用 promises 来创建。

虽然 futures 被定义为一种只读占位符对象,用于表示尚未存在的结果,但 promise 可以被认为是一个可写的、单次赋值的容器,它完成了 future。也就是说,promise 可以用于通过“完成”promise(使用 success 方法)成功地用一个值完成 future。相反,promise 也可以通过使 promise 失败(使用 failure 方法)来用一个异常完成 future。

https://docs.scala-lang.org.cn/overviews/core/futures.html 更新于 2015/11/10

我作为一名 .NET 开发者的思考方式是,Promises 几乎与 TaskCompletionSource 相同。

为了理解 Promise 和 Future 之间的关联,让我们看一下 Future.apply() 方法的签名,它看起来像这样:

def apply[T](body: =>T)(implicit @deprecatedName('execctx) executor: ExecutionContext): Future[T] = impl.Future(body) 

如果我们进一步检查,可以看到它具有此实现代码,我们实际上正在使用 Promise 来完成/失败 Future 计算。

  private[concurrent] object Future {
  class PromiseCompletingRunnable[T](body: => T) extends Runnable {
    val promise = new Promise.DefaultPromise[T]()
 
    override def run() = {
      promise complete {
        try Success(body) catch { case NonFatal(e) => Failure(e) }
      }
    }
  }
 
  def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = {
    val runnable = new PromiseCompletingRunnable(body)
    executor.prepare.execute(runnable)
    runnable.promise.future
  }
}

Scala Async 库

本节中我谈论的大部分内容都包含在一篇很棒的文章中:

http://engineering.roundupapp.co/the-future-is-not-good-enough-coding-with-async-await/

这里有一个使用多个 Future(s) 的小例子。

这有几个问题,即:

  • 每个新 Future 都需要新的嵌套才能使用。
  • 它不处理不 happy path(失败)。
  • 它相当顺序化。
import scala.concurrent.{ExecutionContext, duration, Future, Await}
import scala.reflect.runtime.universe._
import scala.reflect._
import scala.reflect.runtime._
import scala.util
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
 
object ClassesDemo {
 
 
  def main(args: Array[String]) =
  {
 
    val future1 : Future[Double] = Future { 1 }
    val future2 : Future[Double] = Future { 2 }
    val future3 : Future[Double] = Future { 3 }
 
 
    import scala.concurrent.ExecutionContext.Implicits.global
 
    val (f1,f2,f3) = (future1, future2, future3)
    f1 onSuccess { case r1 =>
      f2 onSuccess { case r2 =>
        f3 onSuccess { case r3 =>
          println(s"Sum:  ${r1 + r2 + r3}")
        }
      }
    }
 
 
    System.in.read()
  }
}

这有几个问题,即:

  • 每个新 Future 都需要新的嵌套才能使用。
  • 它不处理不 happy path(失败)。
  • 它相当顺序化。

我们可以通过使用 for 推理来修复其中一些问题。

import scala.concurrent.{ExecutionContext, duration, Future, Await}
import scala.reflect.runtime.universe._
import scala.reflect._
import scala.reflect.runtime._
import scala.util
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
 
object ClassesDemo {
 
 
  def main(args: Array[String]) =
  {
 
    val future1 : Future[Double] = Future { 1 }
    val future2 : Future[Double] = Future { 2 }
    val future3 : Future[Double] = Future { 3 }
 
 
    import scala.concurrent.ExecutionContext.Implicits.global
 
    val (f1,f2,f3) = (future1, future2, future3)
    val f = for {
      r1 <- f1
      r2 <- f2
      r3 <- f3
    } yield r1 + r2 + r3
    f onComplete {
      case Success(s) => println(s"Sum: $s")
      case Failure(e) => // Handle failure
    }
 
 
    System.in.read()
  }
}

这修复了第 1 点和第 2 点,但它仍然顺序执行。我们可以进一步改进并这样做:

import scala.concurrent.{ExecutionContext, duration, Future, Await}
import scala.reflect.runtime.universe._
import scala.reflect._
import scala.reflect.runtime._
import scala.util
import scala.util.{Failure, Success, Try}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global
 
object ClassesDemo {
 
 
  def main(args: Array[String]) =
  {
 
    val future1 : Future[Double] = Future { 1 }
    val future2 : Future[Double] = Future { 2 }
    val future3 : Future[Double] = Future { 3 }
 
 
    import scala.concurrent.ExecutionContext.Implicits.global
 
    val f = Future.sequence(Seq(future1,future2,future3))
    f onComplete {
      case Success(r) => println(s"Sum: ${r.sum}")
      case Failure(e) => // Handle failure
    }
 
 
    System.in.read()
  }
}

但是有一个更好的方法,我很乐意地说它借鉴了 .NET async/await(而 .NET async/await 又借鉴了 F#,但嘿)。我们可以使用 Scala Async 库重写上面的代码,如下所示。

Scala async 库可以在这里找到:

https://github.com/scala/async 更新于 2015/11/10

import scala.concurrent.{Future}
 
import scala.async.Async._ //'async/await' macros blocks and implicits
 
object ClassesDemo {
 
 
  def main(args: Array[String]) =
  {
    val future1 : Future[Double] = Future { 1 }
    val future2 : Future[Double] = Future { 2 }
    val future3 : Future[Double] = Future { 3 }
 
    //use Scala Async Library here, note the Async-Await
    async {
      val s = await {future1} + await {future2} + await {future3}
      println(s"Sum:  $s")
    } onFailure { case e => /* Handle failure */ }
 
 
    System.in.read()
  }
}

async 标记了一个异步代码块。这样的块通常包含一个或多个 await 调用,这些调用标记了计算将被暂停的点,直到被 await 的 Future 完成为止。

默认情况下,async 块在 scala.concurrent.{Future, Promise} 上运行。该系统可以适应 Future 模式的其他实现。

https://github.com/scala/async 更新于 2015/11/10

对我这个 .NET 人来说,进入 Scala 世界,这很有意义。

延伸阅读

Scala 文档实际上对 Futures/Promises 非常好。你可以在这里阅读更多关于它的信息。

https://docs.scala-lang.org.cn/overviews/core/futures.html

© . All rights reserved.