用于近似模式匹配的简单可并行算法,使用后缀树






3.14/5 (4投票s)
在这篇文章中,我想展示一种简单的近似模式匹配算法。
这个术语通常意味着您想在一个大字符串(文本)中找到一个短字符串(模式)出现的所有位置。以这种形式,它定义了精确模式匹配。但在我们的例子中,我们将允许一些“错误”(文本中的符号与模式中的符号不相等的地点)。我们只想限制这些错误的数量。这就是为什么它被称为近似模式匹配。
例如,模式“CAR”在文本“ABRACADABRA”的第 4 个位置(从零开始)出现,有一个错误(“R”与“D”不相等)。
在本文中,我将描述一种使用后缀树解决近似模式匹配问题的算法。我假设您知道什么是后缀树,如何构建它以及如何使用它来解决精确模式匹配问题。在这里我只会简要提及这些主题。如果您想了解更多关于这些内容,我推荐您在Coursera上参加一个很棒的课程字符串上的算法。
精确模式匹配算法
让我们从回顾一下解决精确模式匹配问题的算法开始。
public static class SuffixTreeSearch<TSymbol>
{
public static IEnumerable<StringSearchMatch> Search(
IReadOnlyList<TSymbol> text,
TSymbol stopSymbol,
IEnumerable<IEnumerable<TSymbol>> patterns,
IComparer<TSymbol> comparer = null)
{
if (text == null) throw new ArgumentNullException(nameof(text));
if (patterns == null)
yield break;
var suffixTree = SuffixTreeCreator<TSymbol>.CreateSuffixTree(text, stopSymbol, comparer);
foreach (var pattern in patterns)
{
foreach (var match in GetMatches(pattern.ToArray(), suffixTree))
{
yield return match;
}
}
}
private static IEnumerable<StringSearchMatch> GetMatches(TSymbol[] pattern, SuffixTree<TSymbol> suffixTree)
{
SuffixTreeNode<TSymbol> node = GetPatternNode(pattern, suffixTree);
if(node == null)
yield break;
var starts = GetSuffixStarts(node);
foreach (var start in starts.Where(s => s < suffixTree.Text.Count - 1))
{
yield return new StringSearchMatch(start, pattern.Length);
}
}
private static SuffixTreeNode<TSymbol> GetPatternNode(TSymbol[] pattern, SuffixTree<TSymbol> suffixTree)
{
if (pattern.Length == 0)
return suffixTree.Root;
var lastEdge = GetLastEdge(pattern, suffixTree);
return lastEdge?.To;
}
private static SuffixTreeEdge<TSymbol> GetLastEdge(TSymbol[] pattern, SuffixTree<TSymbol> suffixTree)
{
var node = suffixTree.Root;
var patternIndex = 0;
while (true)
{
if (!node.Edges.ContainsKey(pattern[patternIndex]))
return null;
var edge = node.Edges[pattern[patternIndex]];
var matchLength = GetMatchLength(edge, suffixTree.Text, pattern, patternIndex);
patternIndex += matchLength;
if (patternIndex >= pattern.Length)
return edge;
if (matchLength < edge.Length)
return null;
node = edge.To;
}
}
private static int GetMatchLength(SuffixTreeEdge<TSymbol> edge, IReadOnlyList<TSymbol> edgeText, TSymbol[] pattern, int patternIndex)
{
var edgeIndex = (int)edge.Start;
var matchLength = 0;
while (true)
{
if (!edgeText[edgeIndex].Equals(pattern[patternIndex]))
break;
matchLength++;
edgeIndex++;
patternIndex++;
if (edgeIndex >= edge.Start + edge.Length)
break;
if (patternIndex >= pattern.Length)
break;
}
return matchLength;
}
private static IEnumerable<int> GetSuffixStarts(SuffixTreeNode<TSymbol> node)
{
var queue = new Queue<SuffixTreeNode<TSymbol>>();
queue.Enqueue(node);
while (queue.Count > 0)
{
node = queue.Dequeue();
if (node.SuffixStart.HasValue)
yield return (int) node.SuffixStart.Value;
foreach (var edge in node.Edges.Values)
{
queue.Enqueue(edge.To);
}
}
}
}
主要方法是 `Search`。它接受文本、终止符(用于确保任何后缀都不是另一个后缀的前缀)、一组模式以及符号比较器。此方法会创建一个后缀树并枚举所有模式以查找每个模式的匹配项。
方法 `GetMatches` 查找单个模式的所有匹配项。首先,它在后缀树中找到从树根到模式所指向的节点(调用 `GetPatternNode`)。然后,对于该节点,它查找属于该节点的、所有后缀的起始位置(调用 `GetSuffixStarts`)。这些位置就是我们所寻找的。
方法 `GetPatternNode` 使用 `GetEdgeMatch` 来查找模式停止的边(在后缀树中,每条边都包含初始文本的一部分)。我们从根节点开始,沿着边逐个移动,直到边上的相应符号与模式上的相应符号相等。
方法 `GetSuffixStarts` 很简单。后缀树中的叶节点包含相应后缀的起始位置。我们只需从找到的节点的子树中收集所有这些位置。
您可以在这里找到算法的完整代码。在同一个存储库中,您还可以找到用于创建后缀树的类,如果您需要的话。
现在,当我们回顾了精确模式匹配算法的工作原理后,让我们继续讨论近似模式匹配。
近似模式匹配
近似模式匹配的想法很简单。当我们寻找精确匹配时,从每个节点最多只有一条路径,最多只有一个边需要考虑。当我们寻找近似匹配时,我们必须考虑节点的所有路径,所有出边。只有当某个路径上的错误数量超过我们的限制时,我们才应该放弃这条路径。
看代码
public static class SuffixTreeApproximateSearch<TSymbol>
{
private class TaskDescription
{
public TaskDescription(
SuffixTreeNode<TSymbol> startNode,
IReadOnlyList<TSymbol> pattern,
int patternStartIndex,
uint numberOfErrors)
{
StartNode = startNode;
Pattern = pattern;
PatternStartIndex = patternStartIndex;
NumberOfErrors = numberOfErrors;
}
public SuffixTreeNode<TSymbol> StartNode { get; }
public IReadOnlyList<TSymbol> Pattern { get; }
public int PatternStartIndex { get; }
public uint NumberOfErrors { get; }
}
private class EdgeMatch
{
public EdgeMatch(int matchLength, uint numberOfErrors)
{
MatchLength = matchLength;
NumberOfErrors = numberOfErrors;
}
public int MatchLength { get; }
public uint NumberOfErrors { get; }
}
public static IEnumerable<StringSearchApproximateMatch<TSymbol>> Search(
IReadOnlyList<TSymbol> text,
TSymbol stopSymbol,
IEnumerable<IEnumerable<TSymbol>> patterns,
uint numberOfErrors,
IComparer<TSymbol> comparer = null)
{
if (text == null) throw new ArgumentNullException(nameof(text));
if (patterns == null)
return new StringSearchApproximateMatch<TSymbol>[0];
comparer = new StopSymbolFirstComparer<TSymbol>(comparer ?? Comparer<TSymbol>.Default, stopSymbol);
var suffixTree = SuffixTreeCreator<TSymbol>.CreateSuffixTree(text, stopSymbol, comparer);
return patterns
.SelectMany(pattern => GetMatches(pattern.ToArray(), numberOfErrors, suffixTree, comparer));
}
private static IEnumerable<StringSearchApproximateMatch<TSymbol>> GetMatches(
IReadOnlyList<TSymbol> pattern,
uint numberOfErrors,
SuffixTree<TSymbol> suffixTree,
IComparer<TSymbol> comparer)
{
Queue<TaskDescription> queue = new Queue<TaskDescription>();
queue.Enqueue(new TaskDescription(
suffixTree.Root,
pattern,
0,
numberOfErrors));
LinkedList<SuffixTreeNode<TSymbol>> foundNodes = new LinkedList<SuffixTreeNode<TSymbol>>();
while (queue.Count > 0)
{
TaskDescription task = queue.Dequeue();
ProcessSearchTask(queue, foundNodes, task, suffixTree.Text, comparer);
}
return foundNodes
.SelectMany(GetSuffixStarts)
.Where(s => PatternDoesNotOverlapStopSymbol(s, pattern.Count, suffixTree.Text.Count))
.Select(s => new StringSearchApproximateMatch<TSymbol>(s, pattern));
}
private static bool PatternDoesNotOverlapStopSymbol(int startPosition, int patternLength, int textLength)
{
if (startPosition >= textLength - 1)
return false;
if (patternLength > 0 && startPosition + patternLength >= textLength)
return false;
return true;
}
private static void ProcessSearchTask(
Queue<TaskDescription> queue,
LinkedList<SuffixTreeNode<TSymbol>> foundNodes,
TaskDescription task,
IReadOnlyList<TSymbol> text,
IComparer<TSymbol> comparer)
{
if (task.PatternStartIndex >= task.Pattern.Count)
{
foundNodes.AddLast(task.StartNode);
return;
}
foreach (var edge in task.StartNode.Edges.Values)
{
EdgeMatch edgeMatch = GetEdgeMatch(edge, text, task.Pattern, task.PatternStartIndex, comparer);
if(edgeMatch.NumberOfErrors > task.NumberOfErrors)
continue;
var newPatternStartIndex = task.PatternStartIndex + edgeMatch.MatchLength;
if (newPatternStartIndex >= task.Pattern.Count)
{
foundNodes.AddLast(edge.To);
continue;
}
queue.Enqueue(new TaskDescription(
edge.To,
task.Pattern,
newPatternStartIndex,
task.NumberOfErrors - edgeMatch.NumberOfErrors
));
}
}
private static EdgeMatch GetEdgeMatch(
SuffixTreeEdge<TSymbol> edge,
IReadOnlyList<TSymbol> text,
IReadOnlyList<TSymbol> pattern,
int patternStartIndex,
IComparer<TSymbol> comparer)
{
int matchLength = 0;
uint numberOfErrors = 0;
while (true)
{
var textIndex = edge.Start + matchLength;
var pattentIndex = patternStartIndex + matchLength;
if(matchLength >= edge.Length)
break;
if(pattentIndex >= pattern.Count)
break;
if (comparer.Compare(text[(int) textIndex], pattern[pattentIndex]) != 0)
numberOfErrors++;
matchLength++;
}
return new EdgeMatch(matchLength, numberOfErrors);
}
private static IEnumerable<int> GetSuffixStarts(SuffixTreeNode<TSymbol> node)
{
var queue = new Queue<SuffixTreeNode<TSymbol>>();
queue.Enqueue(node);
while (queue.Count > 0)
{
node = queue.Dequeue();
if (node.SuffixStart.HasValue)
yield return (int) node.SuffixStart.Value;
foreach (var edge in node.Edges.Values)
{
queue.Enqueue(edge.To);
}
}
}
}
方法 `Search` 与之前算法相同。但在 `GetMatches` 的实现中,有重大变化。在这里,我们维护一个待解决的任务队列和一个当前模式的已找到节点列表。一个任务由 `TaskDescription` 类表示。该任务说明,我们应该在后缀树的节点子树(`StartNode` 属性)内,从特定位置(`PatternStartIndex` 属性)开始,在允许的错误数量(`NumberOfErrors` 属性)限制下,查找模式(`Pattern` 属性)的所有可能匹配项。在 `while` 循环中,我们将解决我们的任务,然后使用相同的 `GetSuffixStarts` 方法从找到的节点返回结果。这个最后一部分唯一的区别是,现在我们的模式可以与终止符重叠,因为我们允许一定数量的错误。使用 `PatternDoesNotOverlapStopSymbol` 方法进行过滤可以防止这种情况。
一个任务的解决是由 `ProcessSearchTask` 方法实现的。首先,我们检查我们的模式是否已经处理完毕。在这种情况下,我们只需将当前节点添加到已找到的节点列表中。否则,我们考虑从当前节点出发的所有边。对于每条边,我们使用 `GetEdgeMatch` 方法计算两个数字:沿着该边比较的符号数量(`EdgeMatch` 类中的 `MatchLength` 属性)和比较过程中发生的错误数量(`EdgeMatch` 类中的 `NumberOfErrors` 属性)。如果这个错误数量超过了我们的限制,我们就什么也不做。否则,如果已到达模式的末尾,我们将边的末端添加到已找到的节点列表中。否则,我们创建一个新任务来处理。这个任务包含另一个节点、模式中的另一个起始位置和另一个允许的错误数。
我想强调的是,`ProcessSearchTask` 方法可能无法完全解决其任务。相反,它可以生成几个更小(更简单)的任务来解决。
这就是近似模式匹配算法的工作原理。很简单,不是吗? :-) 您可以在这里查看完整代码。
尽管此算法有效,但还有另一种简单的方法可以使其更好。我指的是多线程。
多线程近似模式匹配
看看上面的算法。我们所做的就是生成和解决不同的任务。还要注意,所有任务都是独立的。我们的后缀树和模式是只读的。我们从不更改任务描述中的任何位。这种不变性是一个好迹象,表明我们可以应用多线程处理。
让我们研究一下新算法。
public static class MultithreadedSuffixTreeApproximateSearch<TSymbol>
{
private class TaskDescription
{
public TaskDescription(SuffixTreeNode<TSymbol> startNode, IReadOnlyList<TSymbol> pattern, int patternStartIndex, uint numberOfErrors)
{
StartNode = startNode;
Pattern = pattern;
PatternStartIndex = patternStartIndex;
NumberOfErrors = numberOfErrors;
}
public SuffixTreeNode<TSymbol> StartNode { get; }
public IReadOnlyList<TSymbol> Pattern { get; }
public int PatternStartIndex { get; }
public uint NumberOfErrors { get; }
}
private class EdgeMatch
{
public EdgeMatch(int matchLength, uint numberOfErrors)
{
MatchLength = matchLength;
NumberOfErrors = numberOfErrors;
}
public int MatchLength { get; }
public uint NumberOfErrors { get; }
}
private class Waiter
{
private readonly object _lock = new object();
private int _counter;
public void Increment()
{
lock (_lock) { _counter++; }
}
public void Decrement()
{
lock (_lock) { _counter--; }
}
public bool IsFinished()
{
lock (_lock) { return _counter == 0; }
}
}
public static IEnumerable<StringSearchApproximateMatch<TSymbol>> Search(
IReadOnlyList<TSymbol> text,
TSymbol stopSymbol,
IEnumerable<IEnumerable<TSymbol>> patterns,
uint numberOfErrors,
IComparer<TSymbol> comparer = null)
{
if (text == null) throw new ArgumentNullException(nameof(text));
if (patterns == null)
return new StringSearchApproximateMatch<TSymbol>[0];
comparer = new StopSymbolFirstComparer<TSymbol>(comparer ?? Comparer<TSymbol>.Default, stopSymbol);
var suffixTree = SuffixTreeCreator<TSymbol>.CreateSuffixTree(text, stopSymbol, comparer);
var waiter = new Waiter();
var results = new ConcurrentBag<StringSearchApproximateMatch<TSymbol>>();
foreach (var pattern in patterns)
{
waiter.Increment();
var task = new TaskDescription(
suffixTree.Root,
pattern.ToArray(),
0,
numberOfErrors);
ThreadPool.QueueUserWorkItem(state =>
{
ProcessSearchTask(results, waiter, suffixTree.Text, comparer, task);
});
}
while (!waiter.IsFinished())
{
Thread.Sleep(0);
}
return results;
}
private static bool PatternDoesNotOverlapStopSymbol(int startPosition, int patternLength, int textLength)
{
if (startPosition >= textLength - 1)
return false;
if (patternLength > 0 && startPosition + patternLength >= textLength)
return false;
return true;
}
private static void ProcessSearchTask(ConcurrentBag<StringSearchApproximateMatch<TSymbol>> results, Waiter waiter, IReadOnlyList<TSymbol> text, IComparer<TSymbol> comparer, TaskDescription task)
{
if (task.PatternStartIndex >= task.Pattern.Count)
{
AddNodeMatches(results, task.StartNode, task.Pattern, text.Count);
waiter.Decrement();
return;
}
foreach (var edge in task.StartNode.Edges.Values)
{
EdgeMatch edgeMatch = GetEdgeMatch(edge, text, task.Pattern, task.PatternStartIndex, comparer);
if(edgeMatch.NumberOfErrors > task.NumberOfErrors)
continue;
var newPatternStartIndex = task.PatternStartIndex + edgeMatch.MatchLength;
if (newPatternStartIndex >= task.Pattern.Count)
{
AddNodeMatches(results, edge.To, task.Pattern, text.Count);
continue;
}
var newTask = new TaskDescription(
edge.To,
task.Pattern,
newPatternStartIndex,
task.NumberOfErrors - edgeMatch.NumberOfErrors
);
waiter.Increment();
ThreadPool.QueueUserWorkItem(state =>
{
ProcessSearchTask(results, waiter, text, comparer, newTask);
});
}
waiter.Decrement();
}
private static void AddNodeMatches(ConcurrentBag<StringSearchApproximateMatch<TSymbol>> results, SuffixTreeNode<TSymbol> node, IReadOnlyList<TSymbol> pattern, int textLength)
{
foreach (var start in GetSuffixStarts(node).Where(s => PatternDoesNotOverlapStopSymbol(s, pattern.Count, textLength)))
{
results.Add(new StringSearchApproximateMatch<TSymbol>(start, pattern));
}
}
private static EdgeMatch GetEdgeMatch(
SuffixTreeEdge<TSymbol> edge,
IReadOnlyList<TSymbol> text,
IReadOnlyList<TSymbol> pattern,
int patternStartIndex,
IComparer<TSymbol> comparer)
{
int matchLength = 0;
uint numberOfErrors = 0;
while (true)
{
var textIndex = edge.Start + matchLength;
var pattentIndex = patternStartIndex + matchLength;
if(matchLength >= edge.Length)
break;
if(pattentIndex >= pattern.Count)
break;
if (comparer.Compare(text[(int) textIndex], pattern[pattentIndex]) != 0)
numberOfErrors++;
matchLength++;
}
return new EdgeMatch(matchLength, numberOfErrors);
}
private static IEnumerable<int> GetSuffixStarts(SuffixTreeNode<TSymbol> node)
{
var queue = new Queue<SuffixTreeNode<TSymbol>>();
queue.Enqueue(node);
while (queue.Count > 0)
{
node = queue.Dequeue();
if (node.SuffixStart.HasValue)
yield return (int) node.SuffixStart.Value;
foreach (var edge in node.Edges.Values)
{
queue.Enqueue(edge.To);
}
}
}
}
这段代码背后的想法很简单。当我们创建一个新任务时,我们不存储它,而是立即使用 `ThreadPool.QueueUserWorkItem` 来执行它。这使我们能够立即使用线程池。由于我们同时处理许多任务,因此我们必须使用并发集合来收集它们执行的结果。我们为此使用了 `ConcurrentBag`。
唯一需要解决的问题是我们何时停止。我们必须以某种方式知道所有任务都已执行并且没有更多任务了。为了实现这个目标,我实现了一个简单的 `Waiter` 类。它包含一个计数器,并允许以线程安全的方式递增和递减它以及与零进行比较。在将新任务放入线程池之前,我们增加计数器。当任何任务完成时,我们递减计数器。现在,我们只需要等待计数器变为零。
我承认,我不是多线程代码专家。我相信有人能想出更好的方法来等待所有任务。
该类的完整代码可以在这里找到。
效率
在这里,我将尝试以某种方式估计算法的效率。请注意,我们将讨论算法的非多线程版本。另外,您应该知道我不是这方面的问题专家。所以您应该对本文的这部分内容持保留态度。
首先,您不应该忘记构建后缀树所需的时间。它可以以 O(文本长度) 的时间构建。
现在让我们看看 `SuffixTreeApproximateSearch` 类的作用。主要工作在 `ProcessSearchTask` 方法中完成。假设我们的字母表(文本和模式中可能出现的所有字母)包含 L 个字母。这意味着,在最坏的情况下,后缀树的每个节点将包含大约 L 个子节点(如果我们计算终止符,则为 L + 1)。在最坏的情况下,树的每条边可能只对应文本的一个符号。这意味着每个任务(`TaskDescription` 类的一个实例)将只比较模式的一个符号与文本的一个符号。
当我们还有错误空间时,每个节点的任务都会生成 L 个子任务。如果我们总共允许 N 个错误,那么在错误能力耗尽之前,至少会生成 L^N (L 的 N 次方) 个任务。所以算法的效率不能优于 O(L^N)。
之后,我们不能再犯错误了。所以我们回到了精确模式匹配问题。如果模式的长度远大于 N,那么算法的这一部分对于在前一部分中访问的每个节点来说,最多需要 O(模式长度) 的时间。
所以总时间大约是 O(<文本长度> + L^N * <模式长度>)。当然,这个估计只针对单个模式。对于多个模式,时间会相应地增加。
结论
在本文中,我介绍了一种近似模式匹配算法。我必须告知您,有更有效的算法可以解决相同的问题。该算法的优点是简单易懂,并且能够利用多线程来解决其任务,从而利用多个处理器内核的强大功能。
我希望它能对您的应用有所帮助。
您可以在我的博客上阅读我更多的文章.