F# 和 C# 中的数据流编程基础






4.63/5 (8投票s)
本文介绍了数据流编程,并提供了 C# 和 F# 中数据流变量的基本实现。
数据流编程入门
数据流编程到底是什么?在经典的命令式编程中,程序本质上是一组操作,它们处理可变状态,从而有效地隐藏了数据路径。数据流编程更像是装配线上的工人/机器人,只有当输入材料到达时它们才会执行。命令式编程风格在并发执行(多线程)时,如果没有适当的同步,可能会引入非确定性。在数据流编程中,程序的执行取决于实际数据,或者更确切地说,取决于数据的可用性。数据流编程可以产生完全确定的程序。
让我们引入数据流变量的概念,这是数据流编程的主要概念之一。数据流变量可以有两种可能的状态:已绑定(已赋值)或未绑定(尚未赋值)。当一个线程尝试读取未绑定数据流变量的值时,它会被阻塞,直到另一个线程绑定该变量。数据流变量只能绑定一次,后续的绑定尝试都会失败。那么,什么是数据流编程?有了数据流变量,还可以构建阻塞队列和流。可以使用这种阻塞队列来实现 Actor 模型。
基本上,您可以从这个维基百科 文章中获取有关数据流编程的更多信息。还有一个关于 Groovy GPars 指南的精彩文章。
文章概述
本文介绍了 C# 和 F# 中数据流变量的基本实现。此外,本文还展示了使用 futures 在 C# 中进行数据流编程的示例。数据流编程的最佳效果在遵循声明式模型原则的编程语言中才能实现。在我们的例子中,C# 是一种命令式语言,以数据流风格编程需要开发人员自律。令人惊讶的是,F# 被认为是函数式编程语言,因此遵循声明式编程范式,但也允许开发人员以命令式编程的方式进行编程(通过 `mutable` 关键字)。在 C# 和 F# 中添加数据流变量并不能使它们自动成为数据流编程语言,因为仍然缺乏必要的语法糖和语言支持。
Clojure 是最流行的现代数据流编程语言之一。Clojure 通过 `premises` 支持数据流编程。还可以使用 GPars for Groovy 等开源库在 Groovy、Scala、Ruby 等其他流行语言中进行数据流编程,但所有这些语言都未为数据流变量提供语法支持。作为一种真正的数据流编程语言,我会区分 Oz 编程语言,它将所有变量视为数据流变量:尝试读取未绑定/未初始化变量的读取器将被阻塞,直到变量被绑定/初始化。一方面,这可以避免著名的 `NullReferenceException` 异常,但另一方面,它可能导致程序挂起。
首先,我将介绍 C# 和 F# 的实现,然后深入探讨线程同步细节。
C# 中的数据流变量
让我们从一个简单的 C# 数据流变量使用示例开始。
var variable = new DataflowVariable<int>(); //create variable
variable.Bind(value); //bind variable
int value = 1000 + variable;//read variable
C# 在运算符重载方面扩展性不强(正如您稍后在 F# 实现中会看到的),这就是我们在此使用 `Bind` 方法的原因。实际上,这是一种品味问题——是使用运算符处理数据流变量,还是简单地使用属性/函数,但在我看来,运算符看起来更自然。我喜欢 C# 的地方在于隐式转换运算符。现在是代码本身
public class DataflowVariable<t>
{
private readonly object syncLock = new object();
private volatile bool isInitialized = false;
private volatile object value;
private T Value
{
get
{
if(!isInitialized)
{
lock(syncLock)
{
while(!isInitialized)
Monitor.Wait(syncLock);
}
}
return (T)value;
}
set
{
lock (syncLock)
{
if (isInitialized)
throw new System.InvalidOperationException
("Dataflow variable can be set only once.");
else
{
this.value = value;
isInitialized = true;
Monitor.PulseAll(syncLock);
}
}
}
}
public void Bind(T newValue)
{
this.Value = newValue;
}
public static implicit operator T(DataflowVariable<t> myVar)
{
return myVar.Value;
}
}
F# 中的数据流变量
让我们从一个简单的 F# 数据流变量使用示例开始。
let myVar = new DataflowVariable<int>() // create variable
myVar <~ value //bind variable
let value = (1000 + !!myVar) //read variable
这里我们使用运算符 (`<~`) 来绑定数据流变量,使用运算符 (`!!`) 来读取其值。现在是代码本身
type public DataflowVariable<'T> () =
class
[<volatilefield>]
let mutable value : option<'T> = None
let syncLock = new System.Object()
member private this.Value
with get() : 'T =
match value with
| Some(initializedVal) -> initializedVal
| None ->
lock syncLock (fun () ->
while (value.IsNone) do
ignore (System.Threading.Monitor.Wait
(syncLock))
value.Value)
and set(newVal : 'T) =
lock syncLock (fun () ->
match value with
| Some(_) -> invalidOp
"Dataflow variable can be set only once."
| None ->
value <- Some(newVal)
System.Threading.Monitor.PulseAll(syncLock))
static member public (<~) (var:DataflowVariable<'T>, initValue:'T) =
var.Value <- initValue
static member public (!!) (var:DataflowVariable<'T>) : 'T =
var.Value
end
您可能已经注意到 `[<volatilefield>]` 属性。根据非常简略的文档,此属性有效地取代了 C# 中的 `volatile` 关键字,但我尚未进行彻底测试来验证它。什么?F# 没有 volatile 字段的关键字?它本应如此。Volatile 字段属于命令式编程的范畴,而 F# 作为一种函数式编程语言(是声明式模型的实现),试图避免共享状态(还记得 `mutable` 关键字吗?)。F# 不支持隐式转换运算符的重载,这就是为什么我们需要某种解引用前缀运算符 (`!!`)。F# 的实现更加优雅,因为它公开了 `Option` 类型,因此不必像 C# 实现那样处理 `isInitialized` 字段。
实现细节和一些关于线程同步的思考
在两种实现中,我都使用了 volatile 字段结合 `Monitor.Wait`/`Monitor.Pulse` 的简单模式进行同步。您可以从 Joe Albahari 的这篇很棒的文章 中获取有关 `Monitor.Pulse`/`Monitor.Wait` 的更多信息。这里的 volatile 字段用于防止指令重排并确保 CPU 缓存同步。另外,作为一种选择,我们可以使用 `Thread.VolatileRead` 方法而不是 volatile 字段(我们不需要使用 `Thread.VolatileWrite`,因为实际写入是在 `lock` 语句内完成的,它有效地防止了重排并刷新和使 CPU 缓存失效,而且无论如何 `Thread.VolatileWrite` 只刷新 CPU 缓存而不使其失效)。基本上,`Thread` 类中的 `static VolatileRead` 和 `VolatileWrite` 方法在强制执行(技术上是 `volatile` 关键字所作保证的超集)的情况下读/写变量。
C# 和 F# 中的数据流编程示例
在 C# 中,我将使用 Parallel Extensions 库(futures 和 continuations)演示一个简单的数据流编程示例。基本上,使用 `Task.Factory.ContinueWhenAll`,可以获得与数据流变量类似的结果,但数据流变量为开发人员提供了更大的灵活性。
var input1 = new DataflowVariable<int>();
var input2 = new DataflowVariable<int>();
var output1 = new DataflowVariable<int>();
var output2 = new DataflowVariable<int>();
Task<int> task1 = Task.Factory.StartNew<int>(
() =>
{
output1.Bind(input1 + input2);
return output1*10;
});
Task<int> task = Task.Factory.StartNew<int>(() =>
{
output2.Bind(input1 + output1);
return input1;
});
input1.Bind(333);
input2.Bind(888);
Console.WriteLine(10 + output1 + output2);
结论
本文介绍了 C# 和 F# 编程语言中数据流变量的基本实现,以及使用 continuations/futures 进行数据流编程的基本示例。请将本文视为进入数据流编程世界之旅的起点。
历史
- 2010 年 9 月 3 日:初始帖子