倒排索引:Scala逐步实现指南





5.00/5 (2投票s)
探索使用倒排索引实现文档搜索的有效方法
在开始实现之前,我们先来谈谈在实际生活中为什么你需要倒排索引。
为什么需要倒排索引?
想象一下,你需要创建一个系统,能够根据文档中的几个词快速查找该文档——有点像维基百科的搜索。我能想到的最简单的选择是扫描每一篇文档,标记出包含所有必需词的文档。这起初可能有效,但这种解决方案无法扩展,因为每增加一篇新文档都会增加任何查询的响应时间。
我们能做得更好吗?当然可以!如果用户想通过词语进行搜索,那么词语就应该是我们“数据库”(索引)中的键。在这种情况下,值又是什么呢?所有包含该词的文档 ID(或其他任何唯一引用/指针)。
倒排索引的工作原理
想象一下我们有这样的文档
id: 1; Text: "In a hole in the ground there lived a hobbit."
id: 2; Text: "The sky above the port was the color of television, tuned to a dead channel."
id: 3; Text: "Hobbits Are Amazing Creatures"
在这种情况下,代表我们索引的映射会是这样的
{
"hobbit": [1, 3],
"television": [2],
"hole": [1],
...
}
我们是怎么得到这个结果的,看起来很清楚,对吧?
现在,如果用户查询系统“hobbit”,我们可以在常数时间内查找这个键(因为它在映射中),并返回文档 1 和 3。
这种结构也允许我们执行复杂的查询,如 NOT、AND 和 OR。我们为每个键获取文档集,然后进行常规的集合操作,例如 `AND` 情况下的交集,或 `NOT` 情况下的差集。因此,对于“hobbit AND hole
”这样的查询,我们将查找集合 [1, 3] 和 [1],它们的交集是 [1],我们将返回 ID 为 1 的文档。
显然,这只是冰山一角。这是最简单的实现——真实的文档索引/查询系统可以根据相关性对结果进行排名,执行不同类型的模糊搜索、分面搜索等。但简单的实现是一个很好的起点,实现它能让我们更深入地理解这个概念。所以,我们继续实现。
实现过程
我将从“底层”开始——所以我们将首先实现一个表示 `Inverted` `Index` 的类,使其能够添加和查找其中的令牌,然后添加一些助手来索引完整的文档,最后提供允许我们将它作为应用程序实际运行的脚手架。
那么,我们从一个代表 `InvertedIndex` 本身的类开始吧。
type Token = String
type Filename = String
case class InvertedIndex(
tokenIndex: Map[Token, Set[Filename]] = HashMap[Token, Set[Filename]]()
)
正如我在文章开头提到的,我们自己的索引将由一个普通的 `Map[String, Set[String]]` 提供支持——从令牌(索引词)到文档 ID 集合(或者在我们的例子中只是文档名)。
目前还没有太多内容——让我们做些有用的事情,至少添加一个方法来将单词添加到索引中,并在之后获取已添加的文档。
def add(token: Token, filename: Filename): InvertedIndex = {
tokenIndex.get(token) match {
case Some(set) =>
// we already know this token - add filename to the set
InvertedIndex(tokenIndex.updated(token, set + filename))
case None =>
// token didn't previously exist - creating new set
InvertedIndex(tokenIndex.updated(token, Set(filename)))
}
}
def getDocumentsForToken(token: Token): Set[Filename] =
tokenIndex.getOrElse(token, Set.empty[String])
这里的内容不多,但我们的倒排索引已经可用了!
不过,我们仍然需要一些东西将单词添加到索引中。我们将为此创建一个 `IndexGenerator` 类。
class IndexGenerator(filesToIndex: List[String]) {
def generateIndex(): InvertedIndex = {
logger.info("Indexer started")
filesToIndex
.map(readFileFromDisk)
.foldLeft(InvertedIndex()) { case (idx, doc) =>
processDocument(idx, doc)
}
}private def processDocument(index: InvertedIndex,
document: GenericDocument): InvertedIndex =
document.text
.split(" ")
.flatMap(processToken)
.foldLeft(index)(_.add(_, document.fileName))
private def processToken(token: String): Option[String] = {
val tokenLower = token.toLowerCase
if (tokenLower.isEmpty ||
StringUtils.getStopWords.contains(tokenLower)) {
None
} else {
Some(tokenLower.removeTags().removeNumbers())
}
}
}
那么我们这里有什么?
`IndexGenerator` 接收一个文件列表进行索引,从磁盘读取它们,并“处理”。在我们的例子中,处理只是将整个文本分解成单词(令牌),通过 `processToken` 函数进行少量清理,然后将它们逐个添加到 `InvertedIndex` 中。
我在这里不详细介绍 `StringUtils` 函数——它们的实现从名称来看非常明显,并且在任何实际应用中,你都会花一些时间来制定好的数据清理规则。有一点需要提到的是停用词——那些在任何文本中都很常见,但对于实际搜索没有太大帮助的词——例如 `the`、`a` 等。在实际场景中,你还需要进行一些词形还原,将单词转换为其基本形式——例如,单词“democratical”将被转换为令牌“democracy”,从而提供一个更简洁的令牌词典并简化查找。
但所有这些文本处理方法都是一个单独的讨论主题,并且有许多开源库可用。
此时,我们已经拥有了索引器工作所需的一切。让我们添加一些方法来查看索引的内部情况,获取一些结果或执行用户查询。
访问索引
我们已经看到了获取单个令牌所有文档的最简单方法。但正如我所提到的,通过一些集合魔法,我们可以回答更复杂的查询。例如,获取包含所有令牌的文档。
def getDocumentsForAllTokens(tokens: Set[Token]): Set[Filename] =
tokens
.map(getDocumentsForToken)
.reduce((d1: Set[String], d2: Set[String]) => d1.intersect(d2))
或者包含列表中的至少一个令牌的文档。
def getDocumentsForAllTokens(tokens: Set[Token]): Set[Filename] =
tokens
.map(getDocumentsForToken)
.reduce((d1: Set[String], d2: Set[String]) => d1.union(d2))
最后一个将为我们提供一些关于我们索引中单词的统计信息——我们将查看哪些单词最受欢迎。
def getTopWords(count: Int): List[(Token, Int)] =
tokenIndex.toList
// sort tokens by size of documents list
.sortBy(_._2.size)
.reverse
.map { case (token, docs) =>
token -> docs.size
}
.take(count)
这个方法在你决定在索引过程中应该将什么视为“停用词”时很有帮助。移除任何过滤,将所有单词添加到索引,然后取前 50 个——你会看到那些 `the` 和 `and`。
我只想再加一点来增加趣味性——多线程。
并行运行
目前绝大多数 CPU 都有多个核心,因此可以并行执行多个操作。但我们的代码目前没有利用这一事实——我们只是一个接一个地读取文件,一个接一个地添加令牌。为什么不利用我们拥有的更多核心呢?添加如此强大的功能非常容易,而且在我们的“业务逻辑”方面(在 `InvertedIndex` 中)变化会很小。
我们将采用“分而治之”的方法——所以我们将把初始任务分解成更小、更容易处理的块,然后以高效的方式将所有结果合并在一起。
现在我们将添加一个 `merge` 方法来合并我们单独的索引。
我们可以逐个添加单词,但那样的话,我们将失去已经创建了 2 个索引的优势。相反,我们首先创建一个所有键的`并集`,然后对于每个键,我们`并集`与该键对应的文档。
def merge(other: InvertedIndex): InvertedIndex = {
val unionTokens = this.tokenIndex.keys.toSet
.union(other.tokenIndex.keys.toSet)
val unionMap = unionTokens.map { token =>
val unionSet = this.tokenIndex
.getOrElse(token, Set.empty[Filename])
.union(other.tokenIndex
.getOrElse(token, Set.empty[Filename])
)
token -> unionSet
}.toMap
InvertedIndex(unionMap)
}
让我们也做一些脚手架来并行运行我们现有的 `IndexGenerator` 的多个实例。
import scala.collection.parallel.CollectionConverters._
def runIndexer(inputDir: String, numThreads: Int): InvertedIndex = {
val filesToIndex = FileOps.getFilesToIndex(inputDir)
val groupSize = getGroupSize(filesToIndex, numThreads)
filesToIndex
.grouped(groupSize)
.toList
.par
.map { files =>
val indexGenerator = new IndexGenerator(files)
indexGenerator.generateIndex()
}
.reduce { (i1, i2) =>
i1.merge(i2)
}
}
def getGroupSize(filesToIndex: List[String], numThreads: Int): Int = {
val groupSize = filesToIndex.size / numThreads
if (groupSize == 0) 1 else groupSize
}
<meta charset="utf-8" />
我们将文件分成 `numThreads` 个相等的部分,为每个部分运行 `generateIndex`,然后合并结果。
所有的并行化从何而来?仅仅来自于 `par` 方法,得益于 _“org.scala-lang.modules” %% “scala-parallel-collections”_ 库。
我不会详细介绍如何使用 JMH 对代码进行基准测试(你可以在提供的仓库中找到所有代码),但这里是一些使用不同线程数的测试结果——速度提升非常明显。
numThreads Score / Error Units
1 3.487 ± 0.181 s/op
2 1.859 ± 0.010 s/op
4 0.996 ± 0.020 s/op
结论
总结一下,在这篇文章中我们
- 发现了一种使用倒排索引实现文档搜索的高效方法
- 编写了该索引的基本实现,并简要讨论了与之配套的文本处理
- 为我们的代码添加了多线程功能,以利用现代硬件的多核架构
希望这有帮助,欢迎在评论区提出您的反馈或任何问题。
历史
- 2023年9月25日:初始版本