在 F# 中使用类似 List 的 DataReader
本文演示了一种技术,说明如何使用 Reader 而不是 List 在 F# 中编写递归算法,这样您就不必先将所有数据加载到内存中。
引言
F# 中的许多算法都使用 List
数据结构和递归来解决任务。这对于较小的数据集来说非常好,但当数据集变大时,如果不是不可行的话,首先将其转换为列表的开销会变得很大。本文介绍了一种简单技术,说明如何像使用 List
一样使用 IDataReader
,让您可以将大型数据集流式传输到您的算法中,从而大大减少内存使用量。
背景
当您比较 List
和 DataReader
时,它们的实际工作方式非常相似。List
有一个 Head
(当前元素)和一个包含其他元素的 Tail
。DataReader
有一个当前元素和一组下一个元素,您可以使用 MoveNext()
访问它们。这让我想知道是否不能像使用 List
一样使用 DataReader
,而无需先将所有内容加载到 List
中。事实证明,这非常简单。
更新
我写这篇文章的时候,我相对较新 F#,仍然相当固守函数式编程。我不再这样做了。对我来说,F# 的优雅之处在于您可以在最适合的地方使用函数式编程,并且可以在其比纯函数式代码更有效的地方混合使用命令式编程。F# 中的 IEnumerables
由众所周知的序列处理。因此,最佳方法是将 IDataReader
包装在 seq
中,该 seq
以强类型方式公开元素。
我将省略文章的其余部分,因为尾部递归仍然是一个有用的概念,并且互联网应该用墨水书写,但这里有一个快速片段,说明我今天会怎么做。
open System.Data.SqlClient
open System.Data
// a simple record that mirrors the table in the DB
type SomeRecord = {
id : int
val1 : double
val2 : double
} with
// actually these functions could exist on their own outside the record,
// I prefer to keep them together
// create a new record from the current rdr position
static member fromRdr(rdr:IDataReader) = {
id = rdr.GetInt32 0;
val1 = rdr.GetDouble 1;
val2 = rdr.GetDouble 2
}
// project an IDataReader into a seq<SomeRecord>
static member asSeq (rdr:IDataReader) = seq { while rdr.Read()
do yield SomeRecord.fromRdr rdr} // here we mix in a simple while loop to
//free us from recursing down the stack to traverse the IDataReader. MUCH simpler.
// open up connection (I didn't wrap any of this into functions here to just
// show how to use this)
let cn = new SqlConnection "server=.;database=SomeDatabase;integrated security=true"
cn.Open()
// open DataReader
let cmd = new SqlCommand("select id, val1, val2 from table1", cn)
let rdr = cmd.ExecuteReader()
rdr
|> SomeRecord.asSeq // project datareader into seq<SomeRecord>
|> Seq.sumBy (fun r-> r.val1 * r.val2) // now you can use all the Seq operators
// that everybody knows.
// finish up
rdr.Close()
cn.Close()
这是原始文章的其余部分...
教科书 F# 递归算法
让我们从一个使用 List
的基本 F# 递归算法开始。这个算法只是接受一个数字列表并将它们相加。
// create a list of numbers
let l = [1..10]
// create a function to add them together
let rec SumList l =
match l with
head::tail -> head + Sum tail // peel off the head, and add
// its value to the value of the tail
| [] -> 0 // until there are no more elements left,
// return zero for that
// call the function on the list, returns 55 in this case
SumList l
这是 F# 中使用递归优雅地描述算法的一个典型示例。这里添加数字的方式是将列表分成两部分:第一个元素:头部,以及其他元素:尾部。整个列表的值是头部的值加上尾部的值。废话。该函数会再次调用自身(每次都比传入的列表短一个元素)直到没有尾部。这就是 []
所代表的,空列表。当遇到它时,它返回 0
。现在它会沿着堆栈向上累加所有数字,瞧:结果。
通常,当您编写像上面那样的算法时,数据并不是硬编码在其中的,您会从某个地方将数据加载到列表中,然后应用逻辑。但这样做的问题是,现在必须将整个数据集加载到内存中。当数据集很大时,这可能会带来挑战。如果我们可以只将数据的一部分加载到内存中来运行算法,处理完后将其丢弃,然后读取下一部分,这样对内存会更好。
使用 DataReader 实现此目的
这是相同的算法,但现在使用 DataReader
而不是 list
。(注意:不要现在就冲出去在生产环境中使用此代码,因为它会惨败,我们将对此进行演示)。
let rec SumReader (dr:IDataReader) =
match dr.Read() with
true -> dr.GetInt32(1) + SumReader dr
| false -> 0
这与上面的示例几乎一样。它接受一个 IDataReader
,但不是将其分成头部和尾部,而是对其调用 movenext
。如果存在新的当前元素,则返回 true
,否则返回 false
。如果存在下一个元素,它会将该元素的值添加到 Reader 中剩余元素的值中。就像上面的示例一样,它会一直这样做,直到没有下一个元素,然后它会展开堆栈来累加所有数字。
糟糕……没有尾部调用
使用上面的算法对于较小的数据集来说效果很好,但在较大的数据集上会引发 StackOverflowException
。就异常而言,这是最糟糕的情况之一。您无法捕获它们,当您的代码中抛出异常时,就结束了。使用 Reader 而不是 List 的重点是为了能够处理更大的数据集,所以我们显然还没有做到。
我们之所以会收到此异常,是因为我们没有进行尾部调用。F# 使用一种称为尾部调用优化 (TCO) 的技术(解释 在此,在此 和 在此)来使代码运行更有效率。基本上,它会采用一个递归函数并将其重写为一个迭代函数。当原始函数满足一些条件时,这可以很容易地完成,主要条件是调用自身必须是该分支中函数所做的最后一件事。重写后,函数现在不再调用自身而是执行循环,这更有效率,因为不再有堆栈的推送和弹出。堆栈的推送是导致我们失败的原因,TCO 是我们想要的。
如果您仔细查看我们的 DataReader
示例代码,您会发现对自身的递归调用实际上并不是它做的最后一件事,即使它是该行上的最后一件事,因为它将该调用的结果添加到当前元素的值中。THAT 是它做的最后一件事,这使得 F# 无法将其重写为尾部调用。(感谢 Tomas Petricek 指出这一点)。那么我们可以怎么做呢?
进入累加器模式
我们需要做的是传递一个额外的变量来保存到目前为止的结果。每次我们将当前元素的值添加到这个累加结果中,然后将其传递给下一个调用。这样,我们就不必在调用返回后做任何事情。这项技术被称为累加器模式。当我们这样做时,对自身的调用 IS 它是最后一件事情,F# 编译器可以执行尾部调用优化,这将使我们能够处理更大的数据集。以下是它的样子。
let rec SumReaderAcc (dr:IDataReader, accum) =
match dr.Read() with
true -> SumReaderAcc(dr, accum + dr.GetInt32(1)) // now this is the last thing
// it does, enabling TCO
|false -> accum // when there are no more elements,
// return the accumulator
现在可以去修复您的生产问题了,但如果您再留一会儿,我将向您展示如何将其变成一种更通用的方法,您可以在您的代码中重用它。总的来说,类型化是不好的,重用是好的。
让事情变得更通用
到目前为止,此代码的功能是固定的,它添加了查询的第一列。由于 F# 是一种函数式语言,为什么不传递一个函数来应用于 Reader 中的每个元素以获取一个值(函数式依赖注入,太极客了)。这样,它将已经更通用一些。这是代码。
// passing the function to apply to each element as a parameter
// already makes it more generic
let rec SumReaderF (dr:IDataReader, func, accum) =
match dr.Read() with
true -> SumReaderF(dr, func, accum + func(dr)) // pass reader and func
// on to next call, and also apply func to element and add to accumulator
|false -> accum
// create a function that takes an IDataReader and returns an int
let myAdd(element:IDataReader) = element.GetInt32(1)
// apply the function to each element in the reader
SumReaderF(rdr, myAdd, 0)
现在已经好多了,但我们仍然可以做得更好。首先,用户现在必须输入一个起始零给 accum
才能工作,并且它只累加对元素的调用,无法例如进行乘法或平均。下面的代码修复了这两个问题,并且是我们今天将要做的。
let ApplyFuncToReader (dr:IDataReader, func, agg) =
let rec ApplyInternal (accum) = // create inner function to handle
//the recursion and accumulator.
// No need to pass everything for each call,
// just the accumulator will do
match dr.Read() with
true -> ApplyInternal(agg(accum, func(dr))) // apply func to element
// and agg to func
|false ->
dr.Close() // be nice, close the reader
accum // and return the result
ApplyInternal 0 // and execute that inner function passing 0
// as starting value for accumulator
Using the Code
这是调用上述代码的一个示例。
// create a function to apply to each element
let myElementFunction(element:IDataReader) = element.GetInt32(1)
// create an aggregate function to apply between each call to elements
let myAggregate(x, y) = x + y
// create a SqlDataReader (NewReader is a helper function
// I wrote that returns an open SqlDataReader)
let rdr = NewReader()
// and pass all three to ApplyFuncToReader
ApplyFuncToReader (rdr, myElementFunction, myAggregate)
首先,我们创建 2 个函数,一个应用于 IDataReader
中的每个元素以提取一个值,第二个函数用于聚合对第一个函数的调用。这里我们只是将它们相加,但我们也可以将它们相乘或做更复杂的事情。我们不必更改 ApplyFuncToReader
本身来实现来改变其行为。
关注点
上面的代码在日常工作中遇到的绝大多数情况都能正常工作,尤其是在使用 F# interactive 时。在多线程代码中使用时,它与使用 List
不完全相同,因为其他人可能会篡改您的 Reader
,使您的结果不可靠。尽管 resultset
受您选择的 SQL 事务保护,但 Reader
本身可能会在您无法控制的情况下前进到下一个元素。这些事情可以轻易避免,但重要的是要理解,您不像使用 List
那样受到不变性的保护。
我选择 IDataReader
是因为数据库通常是我数据所在的地方。当您的数据存储在文件或来自云端的流中时,当然也可以将相同的技术应用于 FileReader
或 StreamReader
。您不再需要将其全部缓冲到内存中就可以对其进行处理,这对于大量数据来说是一个很大的优点。我希望这能帮助您,就像它帮助我一样。如果可以,请给我写信或评价我的文章。
历史
- 2010-07-21:版本 1.0
- 2011-10-07:更新文章