基于并发和协调运行时F#工作流引擎的构想。






4.82/5 (8投票s)
本文描述了如何基于F#工作流和CCR构建工作流引擎(类似于WF)的基本思想。
引言
本文介绍了基于 F# 工作流和 CCR 构建 a-la 工作流基础的工作流引擎的基本构想。
主要思想
最近,我研究了一下 F# 工作流特性,并且在我狂热的想象中出现了一个疯狂的想法:“是否可以使用 F# 工作流构建一个 a-la Windows Workflow Foundation 的工作流引擎?”。 在本文中,您可以看到构建这样一个引擎的尝试,至少是它的一个非常初始的版本。 首先,我想为那些没有机会使用 F# 和 F# 工作流的人提供一些有用的链接
所以,如果您继续阅读本文,那么您可能已经掌握了一些 F# 知识或已经查看了上面的链接……或者只是想看看这个疯子在说什么。
让我们从一个小例子开始
let workflowHost = new WorkflowEngine()
let fibonacciWorkflow number =
workflowHost {
do printfn "Started!"
let! f1 = fib number
let! f2 = fib (number + 1)
do printfn "Finished"
do printfn "Result is %s" ((f1 + f2).ToString())
return 0 //success
}
runWorkflow(fibonacciWorkflow(22));;
我们这里有什么:一个名为 fibonacciWorkflow
的工作流或者 THE 工作流。 非常简单:我们计算两个斐波那契数并打印出这些数字的总和。 这里 fib
函数计算给定位置的斐波那契数。 老实说,我是从网上复制粘贴了这个函数。 :)
let fib n =
let rec fib_aux (n, a, b) =
match (n, a, b) with
| (0, a, b) -> a
| _ -> fib_aux (n - 1, a + b, a)
fun () -> fib_aux (n, 0, 1)
看起来没什么特别的 - 一个非常简单的工作流。 让我们现在玩得开心点 - 是时候仔细看看工作流引擎了。
当然,每个可靠的工作流引擎都应该管理线程并在可能的情况下并行(或者不并行?)线程中执行所有工作流和活动,并尽最大努力提高性能。 因此,我们的引擎试图遵循传统,并使用 CCR 来执行所有活动(您可以通过“let!
”关键字识别活动)。 在这里,我们有两个活动,每个活动都在后台转换为 CCR 任务。 每个活动(CCR 任务)的输出被用作下一个活动(CCR 任务)的输入。
在展示实际实现之前,我想描述我们工作流引擎的另一个特性。
每个可靠的工作流引擎应该做的第二件事是为开发人员提供在活动执行期间注入切面的能力。 让我们定义一个简单的安全切面,旨在通知某人(可能是系统管理员)有关安全警报(假设在您的公司中禁止计算斐波那契数:)):)
let securityAspect() =
printfn "Security Alert: Wake up, someone is running your workflow!!!"
这是一个非常非常简单的切面。 我可以使它更复杂(例如,在切面中验证活动输入)……也许下次吧。 :)
现在是时候看看工作流引擎代码了(顺便说一句,请原谅我在这里使用“引擎”这个词,在 F# 中通常使用工作流构建器术语):type Activity<'a> = unit -> 'a
let runWorkflow (f:Activity<'a>) = f()
type WorkflowEngine() =
let dispatcher = new Dispatcher(8, "MyDispatcher") //from Microsoft.Ccr.Core
let queue = new DispatcherQueue("MyQueue", dispatcher) // from Microsoft.Ccr.Core
member b.Let(p, rest) =
rest p
member b.Return(x) =
fun () -> x
member b.Delay f =
fun () -> (runWorkflow f)()
member b.Bind(p, rest) =
securityAspect()
fun () ->
let res = queue.Enqueue(new Task(
fun () ->
printfn "Thread id is: %d"
Thread.CurrentThread.ManagedThreadId
let res = p()
let b = queue.Enqueue(new Task(
fun () ->
let fakeVal =
rest(res)()
()
))
()
))
new 'b()//return fake value - just to fool the F# compiler :)... sorry,
//it's ugly, but I haven't yet found a better way
工作流中的每个关键字都转换为工作流构建器中的一个方法:let -> Let
,let! -> Bind
,return -> Return
和 Delay
方法用于为整个工作流提供调用点。 实际上,所有上面提到的工作流引擎特性相关的功能都在 Bind
方法中实现。 可以定义额外的关键字,例如 for, while, use, if/then… 等。 在这里,我只实现了所有可能的关键字的基本集合。
这里重要的是,借助 F# 编译器,您可以对工作流中的每一行代码进行合理的控制,并且可以通过切面或任何其他方式影响它。
另一个重要的一点是,在工作流中,您可以保持工作流执行的状态。
结论
在这里,我介绍了如何构建基于 F# 工作流的工作流引擎的基本构想。 上面提供的引擎非常简单,它的任务是传达一个可能的实现的基本构想。 绝对不支持集中式异常处理(可靠的工作流引擎也应该提供它),这可以通过 CCR 中的输出端口来实现。 并且 CCR 的使用非常肤浅,我以一种非常简单和愚蠢的方式使用它。 如果您想学习它,可以找到更多与 CCR 相关的文章(我使用了一些 CCR 示例作为基础)。