在 MoonBit 中实现 Haskell 的 G-Machine





0/5 (0投票)
本文将深入探讨惰性求值以及如何在 MoonBit 中实现 Haskell 的 G-Machine。
高阶函数与性能挑战
诸如 map
和 filter
等高阶函数通常是许多人对函数式编程的第一印象(尽管它远不止这些函数)。它们简化了许多列表处理任务,但随之而来的是另一个问题:过度使用这些高阶函数可能导致性能不佳(因为它需要多次遍历列表)。
为了提高代码效率,有人提出利用基于高阶函数中常见模式的编译器优化。例如,将 map(f, map(g, list))
重写为:
map(fn (x) { f(g(x)) }, list)
不错的尝试,但重要的是要认识到此类优化技术存在固有的局限性,尤其是在处理更复杂的情况下。将所有过程合并到一个函数中可以绕过重复遍历列表的需要,但会损害代码的可读性并使修改过程复杂化。是否存在一种更公平的解决方案,可以平衡效率与可维护性?
惰性求值是一种可以在一定程度上降低此类场景中不必要成本的技术。这种策略可以集成到特定的数据结构中(例如,Java 8 中添加的 Stream
类型,以及早期 Scheme 语言中的 stream),或者整个语言都可以设计为惰性的(成功的例子包括 20 世纪 80 年代的 Miranda 语言,以及后来的 Haskell 和 Clean 语言)。
首先,让我们探讨一下惰性列表(Stream
)如何在这些情况下避免多次遍历。
惰性列表实现
首先,我们定义它的类型
enum Stream[T] {
Empty
Cons(T, () -> Stream[T])
}
Stream[T]
和 List[T]
之间唯一的真正区别在于 Cons
:保存列表其余部分的那个位置被替换为一个无参数函数(行话称为 thunk)。这是惰性求值的一个简单实现:将您不想立即计算的内容封装在 thunk 中。
还需要一个将普通列表转换为惰性列表的函数
fn Stream::fromList[T](l : List[T]) -> Stream[T] {
match l {
Nil => Empty
Cons(x, xs) => Cons(x, fn () { Stream::fromList(xs) })
}
}
此函数不需要遍历整个列表即可将其转换为 Stream
。对于不紧急的操作(此处为 Stream::fromList(xs)
),直接将其封装在 thunk 中并返回。下面的 map
函数将采用此方法(尽管此处 xs
已经是 thunk)。
fn stream_map[X, Y](self : Stream[X], f : (X) -> Y) -> Stream[Y] {
match self {
Empty => Empty
Cons(x, xs) => Cons(f(x), fn () { xs().stream_map(f) })
}
}
take
函数负责执行计算,它可以按需提取 n
个元素。
fn take[T](self : Stream[T], n : Int) -> List[T] {
if n == 0 {
Nil
} else {
match self {
Empty => Nil
Cons(x, xs) => Cons(x, xs().take(n - 1))
}
}
}
使用 thunk 实现惰性数据结构非常直接,并且有效地解决了上述问题。此方法要求用户明确指出代码中应该延迟计算的位置,而惰性语言的策略则更为激进:它默认对所有用户定义的函数使用惰性求值!在接下来的部分中,我将介绍一个最小化的惰性函数式语言实现,并简要介绍其底层的理论模型。
一种惰性求值语言及其抽象语法树
本文使用的示例是一种惰性求值语言,故意使其类似于 Clojure(一种 Lisp 方言),并命名为 coreF。此设计选择允许在 markdown 中使用 Clojure 的语法高亮。不用担心,尽管语法起初可能看起来有些复杂,但它足够简单。
函数使用 defn
关键字定义
(defn factorial[n] ;; n is the parameter, this function calculates the factorial of n
(if (eq n 0) ;; The definition starts here and continues for the next three lines
1
(mul n (factorial (sub n 1))))
在日常对话中称其为函数是可以接受的。然而,在讨论惰性函数式语言时,我们必须引入一个专用术语:Super Combinator。在 super combinator 的定义中,所有自由变量都应包含在最初的 []
对中。
coreF 程序的执行从 main
开始,它调用一个特定的 super combinator,就像将其替换为其定义一样。
(defn main[] (factorial 42))
不带参数的 super combinator,例如 main
,被称为 Constant Applicative Forms (CAF)。
coreF 还拥有一些语言特性,包括自定义数据结构、用于解构结构的 case
表达式,以及用于声明局部变量的 let
和 letrec
。然而,本文的范围仅限于上述特性(实际上,甚至更少,因为像 eq
、mul
、sub
等内置函数计划在未来实现)。
coreF 排除匿名函数,因为匿名函数会引入额外的自由变量。删除它们需要一个额外的转换步骤:lambda 提升。这项技术可以将 lambda 表达式转换为外部 Super Combinator,但这并非惰性求值的主要重点,因此在此省略。
Super combinators 最终将被解析为 ScDef[String]
,但编写解析器是一项繁琐的任务。我将在最终代码中提供它。
enum RawExpr[T] {
Var(T)
Num(Int)
Constructor(Int, Int) // tag, arity
App(RawExpr[T], RawExpr[T])
Let(Bool, List[(T, RawExpr[T])], RawExpr[T]) // isRec, Defs, Body
Case(RawExpr[T], List[(Int, List[T], RawExpr[T])])
}
struct ScDef[T] {
name : String
args : List[T]
body : RawExpr[T]
} derive (Show)
此外,还需要一些预定义的 coreF 程序。
let preludeDefs : List[ScDef[String]] = {
let id = ScDef::new("I", arglist1("x"), Var("x")) // id x = x
let k = ScDef::new("K", arglist2("x", "y"), Var("x")) // K x y = x
let k1 = ScDef::new("K1", arglist2("x", "y"), Var("y")) // K1 x y = y
let s = ScDef::new("S", arglist3("f", "g", "x"), App(App(Var("f"), Var("x")), App(Var("g"), Var("x")))) // S f g x = f x (g x)
let compose = ScDef::new("compose", arglist3("f", "g", "x"), App(Var("f"), App(Var("g"), Var("x")))) // compose f g x = f (g x)
let twice = ScDef::new("twice", arglist1("f"), App(App(Var("compose"), Var("f")), Var("f"))) // twice f = compose f f
Cons(id, Cons(k, Cons(k1, Cons(s, Cons(compose, Cons(twice, Nil))))))
}
为什么是图
在 coreF 语言中,表达式(不是前面提到的 RawExpr[T]
,而是运行时表达式)在求值时以图的形式存储在内存中,而不是以树的形式。
为什么采取这种方法?让我们通过一个程序示例来探讨这一点
(defn square[x] (mul x x))
(defn main[] (square (square 3)))
如果按照传统的表达式树进行求值,它将简化为
(mul (square 3) (square 3))
在这种情况下,(square 3)
将被求值两次,这对于惰性求值来说肯定是不理想的。
为了更清晰地说明这一点,让我们用 MoonBit 代码做一个有些不恰当的比喻
fn square(thunk : () -> Int) -> Int {
thunk() * thunk()
}
使用图来表示程序是为了方便计算共享结果并避免重复计算。为了达到此目的,在缩减图时实现就地更新算法至关重要。关于就地更新,让我们用 MoonBit 代码来模拟
enum LazyData[T] {
Waiting(() -> T)
Done(T)
}
struct LazyRef[T] {
mut data : LazyData[T]
}
fn extract[T](self : LazyRef[T]) -> T {
match self.data {
Waiting(thunk) => {
let value = thunk()
self.data = Done(value) // in-place update
value
}
Done(value) => value
}
}
fn square(x : LazyRef[Int]) -> Int {
x.extract() * x.extract()
}
无论哪一方先执行 extract
方法,它都会更新引用的可变字段并将其内容替换为计算结果。因此,在第二次执行 extract
方法时无需重新计算。
约定
在深入探讨图缩减如何工作之前,让我们先建立一些关键术语和基本事实。我将继续使用相同的程序作为示例
(defn square[x] (mul x x)) ;; multiplication
(defn main[] (square (square 3)))
- 像
mul
这样的内置原语是预定义的运算。 - 对表达式(当然是惰性的)进行求值并就地更新图中对应的节点称为缩减。
(square 3)
是一个可缩减表达式(通常缩写为 redex),由square
及其参数组成。它可以缩减为(mul 3 3)
。(mul 3 3)
也是一个 redex,但它与(square 3)
属于不同类型的 redex,因为square
是用户定义的 combinator,而mul
是已实现的内置原语。(mul 3 3)
的缩减结果是表达式9
,它无法进一步缩减。无法进一步缩减的表达式称为正规形式(Normal forms)。- 一个表达式可能包含多个子表达式(例如,
(mul (add 3 5) (mul 7 9))
)。在这种情况下,表达式的缩减顺序至关重要——某些程序仅在特定的缩减顺序下才会停止。 - 存在一种特殊的缩减顺序,它总是选择最外层的 redex 进行缩减,称为正规序缩减(normal order reduction)。在接下来的讨论中将统一采用此缩减顺序。
因此,图缩减可以用以下伪代码描述
While there exist reducible expressions in the graph {
Select the outermost reducible expression.
Reduce the expression.
Update the graph with the result of reduction.
}
头晕了吗?让我们找几个例子来演示如何在纸上进行缩减。
步骤 1:找到下一个 Redex
整个程序的执行从 main
函数开始。
(defn square[x] (mul x x))
(defn main[] (add 33 (square 3)))
main
本身就是一个 CAF——最简单的 redex 类型。如果执行替换,当前需要处理的表达式是
(add 33 (square 3))
根据找到最外层 redex 的原则,似乎您立即找到了由 add
及其两个参数组成的 redex(姑且这么认为)。
但是等等!由于默认的 currying 存在,此表达式对应的抽象语法树实际上由多个嵌套的 App
节点组成。它大致看起来像这样(为便于阅读而简化)
App(App(add, 33), square3)
从 add
到最外层 App
节点的这种链式结构称为“Spine”。
回过头来检查,add
是一个内部定义的基元。然而,由于其第二个参数 (square 3)
不是正规形式,您无法对其进行缩减(将未求值的表达式加到一个整数上似乎有点荒谬)。因此,我们不能断定 (add 33 (square 3))
是一个 redex;它只是最外层的函数应用。要对其进行缩减,您必须首先缩减 (square 3)
。
步骤 2:缩减
由于 square
是用户定义的 super combinator,缩减 (square 3)
只涉及参数替换。
如果一个 redex 的参数少于 super combinator 所需的参数,这在高阶函数中很常见,请考虑将列表中的所有整数加倍的示例。
(map (mul 3) list-of-int)
在此,(mul 3)
不能被视为 redex,因为它参数不足,因此是一个 weak head normal form
(通常缩写为 WHNF)。在这种情况下,即使其子表达式包含 redex,也无需执行任何操作。
步骤 3:更新
此步骤仅影响执行效率,在纸上推导时可以跳过。
在纸上执行这些操作很容易(当代码量不超过半页时),但当您切换到计算机时,如何将这些步骤转换为可执行代码?
为了回答这个问题,惰性求值编程语言的先驱们提出了各种 **抽象机器** 来模拟惰性求值。其中包括
- G-Machine
- Three Instruction Machine
- ABC Machine(由 Clean 语言使用)
- Spineless Tagless G-Machine(缩写为 STG,由 Haskell 语言使用)
它们是用于指导编译器实现的执行模型。重要的是要注意,与当今各种流行的虚拟机(如 JVM)不同,抽象机器更像是编译器的中间表示(IR)。以 Haskell 的编译器 GHC 为例,在生成 STG(Spineless Tagless G-Machine)代码后,它不会直接将其传递给解释器执行。相反,它会根据所选的后端进一步将其转换为 LLVM、C 代码或机器码。
为了简化实现,本文将直接使用 MoonBit 编写 G-Machine 指令的解释器,从一个最小的示例开始,逐步添加更多功能。
G-Machine 概述
虽然 G-Machine 是惰性函数式语言的抽象机器,但其结构和概念与编写通用命令式语言时遇到的内容没有显著差异。它也具有堆和栈等结构,并且其指令是顺序执行的。一些关键区别包括
- 堆中内存的基本单元不是字节,而是图节点。
- 栈只包含指向堆中地址的指针,而不是实际数据。
这种设计可能不实用,但相对简单。
在 coreF 中,super combinators 被编译成一系列 G-Machine 指令。这些指令可以大致分为以下几类
- 访问数据指令,例如
PushArg
(访问函数参数)和PushGlobal
(访问其他 super combinators)。 - 在堆中构造/更新图节点,例如
MkApp
、PushInt
、Update
。 - 使用
Pop
指令清理栈中未使用的地址。 - 使用
Unwind
指令表达控制流。
剖析 G-Machine 状态
在此 G-Machine 的简单版本中,状态包括
- 堆:这是存储表达式图和 super combinator 对应指令序列的地方。
type Addr Int derive(Eq, Show) // Use the 'type' keyword to encapsulate an address type. enum Node { // Describe graph nodes with an enumeration type. NNum(Int) NApp(Addr, Addr) // The application node NGlobal(Int, List[Instruction]) // To store the number of parameters and // the corresponding sequence of instructions for a super combinator. NInd(Addr) // The Indirection node,The key component of implementing lazy // evaluation } derive (Eq) struct GHeap { // The heap uses an array, and the space with None content // in the array is available as free memory. mut objectCount : Int memory : Array[Option[Node]] } // Allocate heap space for nodes. fn alloc(self : GHeap, node : Node) -> Addr { let heap = self // Assuming there is still available space in the heap. fn next(n : Int) -> Int { (n + 1) % heap.memory.length() } fn free(i : Int) -> Bool { match heap.memory[i] { None => true _ => false } } let mut i = heap.objectCount while not(free(i)) { i = next(i) } heap.memory[i] = Some(node) return Addr(i) }
- 栈:栈只存储指向堆的地址。简单实现可以使用
List[Addr]
。 - 全局表:它是一个映射表,记录 super combinator 的名称(包括预定义的和用户定义的)及其对应的地址作为
NGlobal
节点。这里,我使用 Robin Hood 哈希表来实现它。 - 当前要执行的代码序列。
- 执行状态统计:简单实现包括计算已执行了多少条指令。
type GStats Int
let statInitial : GStats = GStats(0)
fn statInc(self : GStats) -> GStats {
let GStats(n) = self
GStats(n + 1)
}
fn statGet(self : GStats) -> Int {
let GStats(n) = self
return n
}
整个状态使用 GState
类型表示。
struct GState {
stack : List[Addr]
heap : GHeap
globals : RHTable[String, Addr]
code : List[Instruction]
stats : GStats
}
fn putStack(self : GState, addr : Addr) {
self.stack = Cons(addr, self.stack)
}
fn putCode(self : GState, is : List[Instruction]) {
self.code = append(is, self.code)
}
fn pop1(self : GState) -> Addr {
match self.stack {
Cons(addr, reststack) => {
self.stack = reststack
addr
}
Nil => {
abort("pop1: stack size smaller than 1")
}
}
}
fn pop2(self : GState) -> (Addr, Addr) {
// Pop 2 pops the top two elements from the stack.
// Returns (the first, the second).
match self.stack {
Cons(addr1, Cons(addr2, reststack)) => {
self.stack = reststack
(addr1, addr2)
}
otherwise => {
abort("pop2: stack size smaller than 2")
}
}
}
现在,您可以将纸上推导出的图缩减算法的每个步骤映射到此抽象机器
- 在机器的初始状态下,所有编译好的 super combinators 都已放置在堆上的
NGlobal
节点中。此时,G-Machine 中的当前代码序列仅包含两条指令。第一条指令将main
节点的地址推送到栈上,第二条指令将main
对应的代码序列加载到当前代码序列中。 main
的对应代码序列在堆上实例化,其中分配节点并相应地加载数据,最终在内存中构建一个图。这个过程称为“实例化”main
。实例化完成后,将此图入口点的地址推送到栈上。- 实例化完成后,进行清理工作,包括更新图节点(由于
main
没有参数,无需清理栈中残留的未使用地址)并找到下一个 redex。
所有这些任务都有相应的指令实现。
每条指令的相应作用
高度简化的 G-Machine 目前包含 7 条指令。
enum Instruction {
Unwind
PushGlobal(String)
PushInt(Int)
PushArg(Int)
MkApp
Update(Int)
Pop(Int)
} derive (Eq, Debug, Show)
PushInt
指令最简单。它在堆上分配一个 NNum
节点,并将其地址推送到栈上。
fn pushint(self : GState, num : Int) {
let addr = self.heap.alloc(NNum(num))
self.putStack(addr)
}
PushGlobal
指令从全局表中检索指定 super combinator 的地址,然后将该地址推送到栈上。
fn pushglobal(self : GState, name : String) {
let sc = self.globals[name]
match sc {
None => abort("pushglobal(): cant find super combinator \\(name)")
Some(addr) => {
self.putStack(addr)
}
}
}
PushArg
指令稍微复杂一些。它对栈上地址的布局有特定要求:第一个地址应指向 super combinator 节点,后面是 n 个指向 N NApp
节点的地址。PushArg
从 offset + 1
开始检索第 N 个参数。
fn pusharg(self : GState, offset : Int) {
// Skip the first super combinator node.
// Access the (offset + 1)th NApp node
let appaddr = nth(self.stack, offset + 1)
let arg = match self.heap[appaddr] {
NApp(_, arg) => arg
otherwise => abort("pusharg: stack offset \\(offset) address \\(appaddr) node \\(otherwise), not a applicative node")
}
self.putStack(arg)
}
MkApp
指令从栈顶弹出两个地址,构造一个 NApp
节点,并将其地址推送到栈上。
fn mkapp(self : GState) {
let (a1, a2) = self.pop2()
let appaddr = self.heap.alloc(NApp(a1, a2))
self.putStack(appaddr)
}
Update
指令假设栈上的第一个地址指向当前 redex 的求值结果。它跳过紧随其后的 super combinator 节点的地址,并将第 N 个 NApp
节点替换为指向求值结果的间接节点。如果当前 redex 是 CAF,它会直接替换堆上的相应 NGlobal
节点。由此可见,在惰性函数式语言中,没有参数的函数与普通变量之间没有太大区别。
fn update(self : GState, n : Int) {
let addr = self.pop1()
let dst = nth(self.stack, n)
self.heap[dst] = NInd(addr)
}
G-Machine 中的 Unwind
指令类似于求值循环。它基于栈顶地址对应的节点类型有几个分支条件
- 对于
Nnum
节点:不做任何操作。 - 对于
NApp
节点:将左边节点的地址推送到栈上,然后再次Unwind
。 - 对于
NGlobal
节点:如果栈上有足够的参数,则将此 super combinator 加载到当前代码中。 - 对于
NInd
节点:将此间接节点中包含的地址推送到栈上,然后再次 Unwind。
fn unwind(self : GState) {
let addr = self.pop1()
match self.heap[addr] {
NNum(_) => self.putStack(addr)
NApp(a1, _) => {
self.putStack(addr)
self.putStack(a1)
self.putCode(Cons(Unwind, Nil))
}
NGlobal(_, n, c) => {
if length(self.stack) < n {
abort("Unwinding with too few arguments")
} else {
self.putStack(addr)
self.putCode(c)
}
}
NInd(a) => {
self.putStack(a)
self.putCode(Cons(Unwind, Nil))
}
otherwise => abort("unwind() : wrong kind of node \\(otherwise), address \\(addr)")
}
}
Pop
指令弹出 N 个地址,无需单独的函数实现。
将 Super Combinators 编译成指令序列
在 G-Machine 概述部分,我大致描述了已编译的 super combinators 的行为。现在我将精确描述 super combinators 的编译过程。
首先,在编译的 super combinator 的指令序列执行之前,栈上必须已经存在某些地址
- 最顶部的地址指向一个
NGlobal
节点(super combinator 本身)。 - 接下来是 N 个地址(N 是此 super combinator 的参数数量),指向一系列 App 节点——正好对应于 redex 的 spine。栈中最底部的地址指向表达式的最外层 App 节点,其余的以此类推。
编译 super combinator 时,需要维护一个环境,该环境允许我们在编译过程中通过名称找到参数在栈中的相对位置。此外,由于在完成 super combinator 的实例化后需要清除前面的 N+1 个地址,因此需要传递参数的数量 N。
此处,“参数”指的是指向堆上 App 节点的地址,并且可以通过
pusharg
指令访问实际的参数地址。
fn compileSC(self : ScDef[String]) -> (String, Int, List[Instruction]) {
let name = self.name
let body = self.body
let mut arity = 0
fn gen_env(i : Int, args : List[String]) -> List[(String, Int)] {
match args {
Nil => {
arity = i
return Nil
}
Cons(s, ss) => Cons((s, i), gen_env(i + 1, ss))
}
}
let env = gen_env(0, self.args)
(name, arity, compileR(body, env, arity))
}
compileR
函数通过调用 compileC
函数生成用于实例化 super combinators 的代码,然后附加三条指令
Update(N)
:将堆中的原始 redex 更新为NInd
节点,该节点随后指向新实例化的 super combinator。Pop(N)
:清理栈中的冗余地址。Unwind
:搜索下一个 redex 以开始下一次缩减。
fn compileR(self : RawExpr[String],
env : List[(String, Int)], arity : Int) -> List[Instruction] {
if arity == 0 {
// The Pop 0 instruction does nothing in practice,
// so it is not generated when the arity is 0.
append(compileC(self, env), from_array([Update(arity), Unwind]))
} else {
append(compileC(self, env), from_array([Update(arity), Pop(arity), Unwind]))
}
}
编译 super combinator 定义时,采用了一种相当粗糙的方法:如果一个变量不是参数,则将其视为另一个 super combinator(写错会导致运行时错误)。对于函数应用,先编译右侧表达式,然后增加环境中的所有参数偏移量(因为一个指向已实例化右侧表达式的额外地址被添加到栈顶),然后编译左侧表达式,最后添加 MkApp
指令。
fn compileC(self : RawExpr[String], env : List[(String, Int)]) -> List[Instruction] {
match self {
Var(s) => {
match lookupENV(env, s) {
None => from_array([PushGlobal(s)])
Some(n) => from_array([PushArg(n)])
}
}
Num(n) => from_array([PushInt(n)])
App(e1, e2) => {
append(compileC(e2, env), append(compileC(e1, argOffset(1, env)), from_array([MkApp])))
}
_ => abort("not support yet")
}
}
运行 G-Machine
一旦 super combinators 被编译,它们就需要被放置在堆上(并将其地址添加到全局表中)。这可以递归地完成。
fn buildInitialHeap(scdefs : List[(String, Int, List[Instruction])]) -> (GHeap, RHTable[String, Addr]) {
let heap = { objectCount : 0, memory : Array::make(10000, None) }
let globals = RHTable::new(50)
fn go(lst : List[(String, Int, List[Instruction])]) {
match lst {
Nil => ()
Cons((name, arity, instrs), rest) => {
let addr = heap.alloc(NGlobal(name, arity, instrs))
globals[name] = addr
go(rest)
}
}
}
go(scdefs)
return (heap, globals)
}
定义一个名为“step
”的函数,该函数将 G-Machine 的状态向前推进一步,如果已达到最终状态则返回 false
。
fn step(self : GState) -> Bool {
match self.code {
Nil => { return false }
Cons(i, is) => {
self.code = is
self.statInc()
match i {
PushGlobal(f) => self.pushglobal(f)
PushInt(n) => self.pushint(n)
PushArg(n) => self.pusharg(n)
MkApp => self.mkapp()
Unwind => self.unwind()
Update(n) => self.update(n)
Pop(n) => { self.stack = drop(self.stack, n) } // without the need for
// additional functions
}
return true
}
}
}
此外,定义一个名为“reify
”的函数,该函数持续执行“step
”函数,直到达到最终状态。
fn reify(self : GState) {
if self.step() {
self.reify()
} else {
let stack = self.stack
match stack {
Cons(addr, Nil) => {
let res = self.heap[addr]
println("\\(res)")
}
_ => abort("wrong stack \\(stack)")
}
}
}
组合上述组件。
fn run(codes : List[String]) {
fn parse_then_compile(code : String) -> (String, Int, List[Instruction]) {
let code = TokenStream::new(code)
let code = parseSC(code)
let code = compileSC(code)
return code
}
let codes = append(map(parse_then_compile, codes), map(compileSC, preludeDefs))
let (heap, globals) = buildInitialHeap(codes)
let initialState : GState = {
heap : heap,
stack : Nil,
code : initialCode,
globals : globals,
stats : initialStat
}
initialState.reify()
}
结论
到目前为止构建的 G-Machine 的功能过于有限,无法运行一个像样的程序。在下一篇文章中,我将逐步添加原语和自定义数据结构等功能。到最后,将在涵盖 G-Machine 后介绍惰性求值技术。
参考
- Peyton Jones, Simon & Lester, David. (2000). Implementing functional languages: a tutorial
历史
- 2024年3月13日:初始版本