65.9K
CodeProject 正在变化。 阅读更多。
Home

在 F# 中使用类似 List 的 DataReader

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.71/5 (12投票s)

2010年7月21日

CPOL

8分钟阅读

viewsIcon

40452

downloadIcon

180

本文演示了一种技术,说明如何使用 Reader 而不是 List 在 F# 中编写递归算法,这样您就不必先将所有数据加载到内存中。

引言

F# 中的许多算法都使用 List 数据结构和递归来解决任务。这对于较小的数据集来说非常好,但当数据集变大时,如果不是不可行的话,首先将其转换为列表的开销会变得很大。本文介绍了一种简单技术,说明如何像使用 List 一样使用 IDataReader,让您可以将大型数据集流式传输到您的算法中,从而大大减少内存使用量。

背景

当您比较 ListDataReader 时,它们的实际工作方式非常相似。List 有一个 Head(当前元素)和一个包含其他元素的 TailDataReader 有一个当前元素和一组下一个元素,您可以使用 MoveNext() 访问它们。这让我想知道是否不能像使用 List 一样使用 DataReader,而无需先将所有内容加载到 List 中。事实证明,这非常简单。

更新

我写这篇文章的时候,我相对较新 F#,仍然相当固守函数式编程。我不再这样做了。对我来说,F# 的优雅之处在于您可以在最适合的地方使用函数式编程,并且可以在其比纯函数式代码更有效的地方混合使用命令式编程。F# 中的 IEnumerables 由众所周知的序列处理。因此,最佳方法是将 IDataReader 包装在 seq 中,该 seq 以强类型方式公开元素。

我将省略文章的其余部分,因为尾部递归仍然是一个有用的概念,并且互联网应该用墨水书写,但这里有一个快速片段,说明我今天会怎么做。

open System.Data.SqlClient
open System.Data

// a simple record that mirrors the table in the DB
type SomeRecord = {
    id   : int
    val1 : double
    val2 : double
} with 
    // actually these functions could exist on their own outside the record, 
    // I prefer to keep them together
    // create a new record from the current rdr position
    static member fromRdr(rdr:IDataReader) = {
                    id      = rdr.GetInt32  0; 
                    val1    = rdr.GetDouble 1; 
                    val2    = rdr.GetDouble 2
                  }
    // project an IDataReader into a seq<SomeRecord>
    static member asSeq (rdr:IDataReader) = seq { while rdr.Read() 
	do yield SomeRecord.fromRdr rdr} // here we mix in a simple while loop to 
        //free us from recursing down the stack to traverse the IDataReader. MUCH simpler.

// open up connection (I didn't wrap any of this into functions here to just 
// show how to use this)
let cn = new SqlConnection "server=.;database=SomeDatabase;integrated security=true"
cn.Open()

// open DataReader
let cmd = new SqlCommand("select id, val1, val2 from table1", cn)
let rdr = cmd.ExecuteReader()

rdr
|> SomeRecord.asSeq                     // project datareader into seq<SomeRecord>
|> Seq.sumBy (fun r-> r.val1 * r.val2)  // now you can use all the Seq operators 
                                        // that everybody knows.

// finish up
rdr.Close()
cn.Close()

这是原始文章的其余部分...

教科书 F# 递归算法

让我们从一个使用 List 的基本 F# 递归算法开始。这个算法只是接受一个数字列表并将它们相加。

 // create a list of numbers
let l = [1..10]

// create a function to add them together
let rec SumList l =
    match l with       
        head::tail -> head + Sum tail  	// peel off the head, and add 
					// its value to the value of the tail 
        | [] -> 0 				// until there are no more elements left, 
					// return zero for that

// call the function on the list, returns 55 in this case
SumList l

这是 F# 中使用递归优雅地描述算法的一个典型示例。这里添加数字的方式是将列表分成两部分:第一个元素:头部,以及其他元素:尾部。整个列表的值是头部的值加上尾部的值。废话。该函数会再次调用自身(每次都比传入的列表短一个元素)直到没有尾部。这就是 [] 所代表的,空列表。当遇到它时,它返回 0。现在它会沿着堆栈向上累加所有数字,瞧:结果。

通常,当您编写像上面那样的算法时,数据并不是硬编码在其中的,您会从某个地方将数据加载到列表中,然后应用逻辑。但这样做的问题是,现在必须将整个数据集加载到内存中。当数据集很大时,这可能会带来挑战。如果我们可以只将数据的一部分加载到内存中来运行算法,处理完后将其丢弃,然后读取下一部分,这样对内存会更好。

使用 DataReader 实现此目的

这是相同的算法,但现在使用 DataReader 而不是 list。(注意:不要现在就冲出去在生产环境中使用此代码,因为它会惨败,我们将对此进行演示)。

 let rec SumReader (dr:IDataReader) = 
    match dr.Read() with 
        true -> dr.GetInt32(1) + SumReader dr
        | false -> 0

这与上面的示例几乎一样。它接受一个 IDataReader,但不是将其分成头部和尾部,而是对其调用 movenext。如果存在新的当前元素,则返回 true,否则返回 false。如果存在下一个元素,它会将该元素的值添加到 Reader 中剩余元素的值中。就像上面的示例一样,它会一直这样做,直到没有下一个元素,然后它会展开堆栈来累加所有数字。

糟糕……没有尾部调用

使用上面的算法对于较小的数据集来说效果很好,但在较大的数据集上会引发 StackOverflowException。就异常而言,这是最糟糕的情况之一。您无法捕获它们,当您的代码中抛出异常时,就结束了。使用 Reader 而不是 List 的重点是为了能够处理更大的数据集,所以我们显然还没有做到。

我们之所以会收到此异常,是因为我们没有进行尾部调用。F# 使用一种称为尾部调用优化 (TCO) 的技术(解释 在此在此在此)来使代码运行更有效率。基本上,它会采用一个递归函数并将其重写为一个迭代函数。当原始函数满足一些条件时,这可以很容易地完成,主要条件是调用自身必须是该分支中函数所做的最后一件事。重写后,函数现在不再调用自身而是执行循环,这更有效率,因为不再有堆栈的推送和弹出。堆栈的推送是导致我们失败的原因,TCO 是我们想要的。

如果您仔细查看我们的 DataReader 示例代码,您会发现对自身的递归调用实际上并不是它做的最后一件事,即使它是该行上的最后一件事,因为它将该调用的结果添加到当前元素的值中。THAT 是它做的最后一件事,这使得 F# 无法将其重写为尾部调用。(感谢 Tomas Petricek 指出这一点)。那么我们可以怎么做呢?

进入累加器模式

我们需要做的是传递一个额外的变量来保存到目前为止的结果。每次我们将当前元素的值添加到这个累加结果中,然后将其传递给下一个调用。这样,我们就不必在调用返回后做任何事情。这项技术被称为累加器模式。当我们这样做时,对自身的调用 IS 它是最后一件事情,F# 编译器可以执行尾部调用优化,这将使我们能够处理更大的数据集。以下是它的样子。

let rec SumReaderAcc (dr:IDataReader, accum) =
    match dr.Read() with
        true -> SumReaderAcc(dr, accum + dr.GetInt32(1)) // now this is the last thing 
							// it does, enabling TCO
        |false -> accum		// when there are no more elements, 
				// return the accumulator

现在可以去修复您的生产问题了,但如果您再留一会儿,我将向您展示如何将其变成一种更通用的方法,您可以在您的代码中重用它。总的来说,类型化是不好的,重用是好的。

让事情变得更通用

到目前为止,此代码的功能是固定的,它添加了查询的第一列。由于 F# 是一种函数式语言,为什么不传递一个函数来应用于 Reader 中的每个元素以获取一个值(函数式依赖注入,太极客了)。这样,它将已经更通用一些。这是代码。

// passing the function to apply to each element as a parameter 
// already makes it more generic
let rec SumReaderF (dr:IDataReader, func, accum) =
    match dr.Read() with
        true -> SumReaderF(dr, func, accum + func(dr)) // pass reader and func 
	// on to next call, and also apply func to element and add to accumulator
        |false -> accum

// create a function that takes an IDataReader and returns an int
let myAdd(element:IDataReader) = element.GetInt32(1)

// apply the function to each element in the reader
SumReaderF(rdr, myAdd, 0)

现在已经好多了,但我们仍然可以做得更好。首先,用户现在必须输入一个起始零给 accum 才能工作,并且它只累加对元素的调用,无法例如进行乘法或平均。下面的代码修复了这两个问题,并且是我们今天将要做的。

let ApplyFuncToReader (dr:IDataReader, func, agg) =    
    let rec ApplyInternal (accum) = 	// create inner function to handle 
				//the recursion and accumulator. 
				// No need to pass everything for each call, 
				// just the accumulator will do
        match dr.Read() with
            true -> ApplyInternal(agg(accum, func(dr))) // apply func to element 
						// and agg to func
            |false -> 
                dr.Close()  // be nice, close the reader
                accum       // and return the result
    ApplyInternal 0 	// and execute that inner function passing 0 
			// as starting value for accumulator

Using the Code

这是调用上述代码的一个示例。

// create a function to apply to each element
let myElementFunction(element:IDataReader) = element.GetInt32(1)

// create an aggregate function to apply between each call to elements
let myAggregate(x, y) = x + y

// create a SqlDataReader (NewReader is a helper function 
// I wrote that returns an open SqlDataReader)
let rdr = NewReader()

// and pass all three to ApplyFuncToReader
ApplyFuncToReader (rdr, myElementFunction, myAggregate)

首先,我们创建 2 个函数,一个应用于 IDataReader 中的每个元素以提取一个值,第二个函数用于聚合对第一个函数的调用。这里我们只是将它们相加,但我们也可以将它们相乘或做更复杂的事情。我们不必更改 ApplyFuncToReader 本身来实现来改变其行为。

关注点

上面的代码在日常工作中遇到的绝大多数情况都能正常工作,尤其是在使用 F# interactive 时。在多线程代码中使用时,它与使用 List 不完全相同,因为其他人可能会篡改您的 Reader,使您的结果不可靠。尽管 resultset 受您选择的 SQL 事务保护,但 Reader 本身可能会在您无法控制的情况下前进到下一个元素。这些事情可以轻易避免,但重要的是要理解,您不像使用 List 那样受到不变性的保护。

我选择 IDataReader 是因为数据库通常是我数据所在的地方。当您的数据存储在文件或来自云端的流中时,当然也可以将相同的技术应用于 FileReaderStreamReader。您不再需要将其全部缓冲到内存中就可以对其进行处理,这对于大量数据来说是一个很大的优点。我希望这能帮助您,就像它帮助我一样。如果可以,请给我写信或评价我的文章。

历史

  • 2010-07-21:版本 1.0
  • 2011-10-07:更新文章
© . All rights reserved.