65.9K
CodeProject 正在变化。 阅读更多。
Home

用于近似模式匹配的简单可并行算法,使用后缀树

starIconstarIconstarIcon
emptyStarIcon
starIcon
emptyStarIcon

3.14/5 (4投票s)

2017年11月20日

CPOL

7分钟阅读

viewsIcon

10607

在这篇文章中,我想展示一种简单的近似模式匹配算法。

这个术语通常意味着您想在一个大字符串(文本)中找到一个短字符串(模式)出现的所有位置。以这种形式,它定义了精确模式匹配。但在我们的例子中,我们将允许一些“错误”(文本中的符号与模式中的符号不相等的地点)。我们只想限制这些错误的数量。这就是为什么它被称为近似模式匹配。

例如,模式“CAR”在文本“ABRACADABRA”的第 4 个位置(从零开始)出现,有一个错误(“R”与“D”不相等)。

在本文中,我将描述一种使用后缀树解决近似模式匹配问题的算法。我假设您知道什么是后缀树,如何构建它以及如何使用它来解决精确模式匹配问题。在这里我只会简要提及这些主题。如果您想了解更多关于这些内容,我推荐您在Coursera上参加一个很棒的课程字符串上的算法

精确模式匹配算法

让我们从回顾一下解决精确模式匹配问题的算法开始。

public static class SuffixTreeSearch<TSymbol>
{
    public static IEnumerable<StringSearchMatch> Search(
        IReadOnlyList<TSymbol> text,
        TSymbol stopSymbol,
        IEnumerable<IEnumerable<TSymbol>> patterns,
        IComparer<TSymbol> comparer = null)
    {
        if (text == null) throw new ArgumentNullException(nameof(text));

        if (patterns == null)
            yield break;

        var suffixTree = SuffixTreeCreator<TSymbol>.CreateSuffixTree(text, stopSymbol, comparer);

        foreach (var pattern in patterns)
        {
            foreach (var match in GetMatches(pattern.ToArray(), suffixTree))
            {
                yield return match;
            }
        }
    }

    private static IEnumerable<StringSearchMatch> GetMatches(TSymbol[] pattern, SuffixTree<TSymbol> suffixTree)
    {
        SuffixTreeNode<TSymbol> node = GetPatternNode(pattern, suffixTree);
        if(node == null)
            yield break; 

        var starts = GetSuffixStarts(node);

        foreach (var start in starts.Where(s => s < suffixTree.Text.Count - 1))
        {
            yield return new StringSearchMatch(start, pattern.Length);
        }
    }

    private static SuffixTreeNode<TSymbol> GetPatternNode(TSymbol[] pattern, SuffixTree<TSymbol> suffixTree)
    {
        if (pattern.Length == 0)
            return suffixTree.Root;

        var lastEdge = GetLastEdge(pattern, suffixTree);

        return lastEdge?.To;
    }

    private static SuffixTreeEdge<TSymbol> GetLastEdge(TSymbol[] pattern, SuffixTree<TSymbol> suffixTree)
    {
        var node = suffixTree.Root;
        var patternIndex = 0;

        while (true)
        {
            if (!node.Edges.ContainsKey(pattern[patternIndex]))
                return null;

            var edge = node.Edges[pattern[patternIndex]];

            var matchLength = GetMatchLength(edge, suffixTree.Text, pattern, patternIndex);

            patternIndex += matchLength;

            if (patternIndex >= pattern.Length)
                return edge;

            if (matchLength < edge.Length)
                return null;

            node = edge.To;
        }

    }

    private static int GetMatchLength(SuffixTreeEdge<TSymbol> edge, IReadOnlyList<TSymbol> edgeText, TSymbol[] pattern, int patternIndex)
    {
        var edgeIndex = (int)edge.Start;

        var matchLength = 0;

        while (true)
        {
            if (!edgeText[edgeIndex].Equals(pattern[patternIndex]))
                break;
            matchLength++;
            edgeIndex++;
            patternIndex++;
            if (edgeIndex >= edge.Start + edge.Length)
                break;
            if (patternIndex >= pattern.Length)
                break;
        }

        return matchLength;
    }

    private static IEnumerable<int> GetSuffixStarts(SuffixTreeNode<TSymbol> node)
    {
        var queue = new Queue<SuffixTreeNode<TSymbol>>();
        queue.Enqueue(node);

        while (queue.Count > 0)
        {
            node = queue.Dequeue();
            if (node.SuffixStart.HasValue)
                yield return (int) node.SuffixStart.Value;

            foreach (var edge in node.Edges.Values)
            {
                queue.Enqueue(edge.To);
            }
        }
    }
}

主要方法是 `Search`。它接受文本、终止符(用于确保任何后缀都不是另一个后缀的前缀)、一组模式以及符号比较器。此方法会创建一个后缀树并枚举所有模式以查找每个模式的匹配项。

方法 `GetMatches` 查找单个模式的所有匹配项。首先,它在后缀树中找到从树根到模式所指向的节点(调用 `GetPatternNode`)。然后,对于该节点,它查找属于该节点的、所有后缀的起始位置(调用 `GetSuffixStarts`)。这些位置就是我们所寻找的。

方法 `GetPatternNode` 使用 `GetEdgeMatch` 来查找模式停止的边(在后缀树中,每条边都包含初始文本的一部分)。我们从根节点开始,沿着边逐个移动,直到边上的相应符号与模式上的相应符号相等。

方法 `GetSuffixStarts` 很简单。后缀树中的叶节点包含相应后缀的起始位置。我们只需从找到的节点的子树中收集所有这些位置。

您可以在这里找到算法的完整代码。在同一个存储库中,您还可以找到用于创建后缀树的类,如果您需要的话。

现在,当我们回顾了精确模式匹配算法的工作原理后,让我们继续讨论近似模式匹配。

近似模式匹配

近似模式匹配的想法很简单。当我们寻找精确匹配时,从每个节点最多只有一条路径,最多只有一个边需要考虑。当我们寻找近似匹配时,我们必须考虑节点的所有路径,所有出边。只有当某个路径上的错误数量超过我们的限制时,我们才应该放弃这条路径。

看代码

public static class SuffixTreeApproximateSearch<TSymbol>
{
    private class TaskDescription
    {
        public TaskDescription(
            SuffixTreeNode<TSymbol> startNode, 
            IReadOnlyList<TSymbol> pattern, 
            int patternStartIndex, 
            uint numberOfErrors)
        {
            StartNode = startNode;
            Pattern = pattern;
            PatternStartIndex = patternStartIndex;
            NumberOfErrors = numberOfErrors;
        }

        public SuffixTreeNode<TSymbol> StartNode { get; }
        public IReadOnlyList<TSymbol> Pattern { get; }
        public int PatternStartIndex { get; }
        public uint NumberOfErrors { get; }
    }

    private class EdgeMatch
    {
        public EdgeMatch(int matchLength, uint numberOfErrors)
        {
            MatchLength = matchLength;
            NumberOfErrors = numberOfErrors;
        }

        public int MatchLength { get; }
        public uint NumberOfErrors { get; }
    }

    public static IEnumerable<StringSearchApproximateMatch<TSymbol>> Search(
        IReadOnlyList<TSymbol> text,
        TSymbol stopSymbol,
        IEnumerable<IEnumerable<TSymbol>> patterns,
        uint numberOfErrors,
        IComparer<TSymbol> comparer = null)
    {
        if (text == null) throw new ArgumentNullException(nameof(text));

        if (patterns == null)
            return new StringSearchApproximateMatch<TSymbol>[0];

        comparer = new StopSymbolFirstComparer<TSymbol>(comparer ?? Comparer<TSymbol>.Default, stopSymbol);

        var suffixTree = SuffixTreeCreator<TSymbol>.CreateSuffixTree(text, stopSymbol, comparer);

        return patterns
            .SelectMany(pattern => GetMatches(pattern.ToArray(), numberOfErrors, suffixTree, comparer));
    }

    private static IEnumerable<StringSearchApproximateMatch<TSymbol>> GetMatches(
        IReadOnlyList<TSymbol> pattern, 
        uint numberOfErrors, 
        SuffixTree<TSymbol> suffixTree,
        IComparer<TSymbol> comparer)
    {
        Queue<TaskDescription> queue = new Queue<TaskDescription>();
        queue.Enqueue(new TaskDescription(
            suffixTree.Root,
            pattern,
            0,
            numberOfErrors));

        LinkedList<SuffixTreeNode<TSymbol>> foundNodes = new LinkedList<SuffixTreeNode<TSymbol>>();

        while (queue.Count > 0)
        {
            TaskDescription task = queue.Dequeue();

            ProcessSearchTask(queue, foundNodes, task, suffixTree.Text, comparer);
        }

        return foundNodes
            .SelectMany(GetSuffixStarts)
            .Where(s => PatternDoesNotOverlapStopSymbol(s, pattern.Count, suffixTree.Text.Count))
            .Select(s => new StringSearchApproximateMatch<TSymbol>(s, pattern));
    }

    private static bool PatternDoesNotOverlapStopSymbol(int startPosition, int patternLength, int textLength)
    {
        if (startPosition >= textLength - 1)
            return false;
        if (patternLength > 0 && startPosition + patternLength >= textLength)
            return false;
        return true;
    }

    private static void ProcessSearchTask(
        Queue<TaskDescription> queue, 
        LinkedList<SuffixTreeNode<TSymbol>> foundNodes, 
        TaskDescription task, 
        IReadOnlyList<TSymbol> text,
        IComparer<TSymbol> comparer)
    {
        if (task.PatternStartIndex >= task.Pattern.Count)
        {
            foundNodes.AddLast(task.StartNode);
            return;
        }

        foreach (var edge in task.StartNode.Edges.Values)
        {
            EdgeMatch edgeMatch = GetEdgeMatch(edge, text, task.Pattern, task.PatternStartIndex, comparer);

            if(edgeMatch.NumberOfErrors > task.NumberOfErrors)
                continue;

            var newPatternStartIndex = task.PatternStartIndex + edgeMatch.MatchLength;

            if (newPatternStartIndex >= task.Pattern.Count)
            {
                foundNodes.AddLast(edge.To);
                continue;
            }
                
            queue.Enqueue(new TaskDescription(
                edge.To,
                task.Pattern,
                newPatternStartIndex,
                task.NumberOfErrors - edgeMatch.NumberOfErrors
                ));
        }
    }

    private static EdgeMatch GetEdgeMatch(
        SuffixTreeEdge<TSymbol> edge, 
        IReadOnlyList<TSymbol> text, 
        IReadOnlyList<TSymbol> pattern, 
        int patternStartIndex,
        IComparer<TSymbol> comparer)
    {
        int matchLength = 0;
        uint numberOfErrors = 0;

        while (true)
        {
            var textIndex = edge.Start + matchLength;
            var pattentIndex = patternStartIndex + matchLength;
            if(matchLength >= edge.Length)
                break;
            if(pattentIndex >= pattern.Count)
                break;

            if (comparer.Compare(text[(int) textIndex], pattern[pattentIndex]) != 0)
                numberOfErrors++;

            matchLength++;
        }

        return new EdgeMatch(matchLength, numberOfErrors);
    }

    private static IEnumerable<int> GetSuffixStarts(SuffixTreeNode<TSymbol> node)
    {
        var queue = new Queue<SuffixTreeNode<TSymbol>>();
        queue.Enqueue(node);

        while (queue.Count > 0)
        {
            node = queue.Dequeue();
            if (node.SuffixStart.HasValue)
                yield return (int) node.SuffixStart.Value;

            foreach (var edge in node.Edges.Values)
            {
                queue.Enqueue(edge.To);
            }
        }
    }
}

方法 `Search` 与之前算法相同。但在 `GetMatches` 的实现中,有重大变化。在这里,我们维护一个待解决的任务队列和一个当前模式的已找到节点列表。一个任务由 `TaskDescription` 类表示。该任务说明,我们应该在后缀树的节点子树(`StartNode` 属性)内,从特定位置(`PatternStartIndex` 属性)开始,在允许的错误数量(`NumberOfErrors` 属性)限制下,查找模式(`Pattern` 属性)的所有可能匹配项。在 `while` 循环中,我们将解决我们的任务,然后使用相同的 `GetSuffixStarts` 方法从找到的节点返回结果。这个最后一部分唯一的区别是,现在我们的模式可以与终止符重叠,因为我们允许一定数量的错误。使用 `PatternDoesNotOverlapStopSymbol` 方法进行过滤可以防止这种情况。

一个任务的解决是由 `ProcessSearchTask` 方法实现的。首先,我们检查我们的模式是否已经处理完毕。在这种情况下,我们只需将当前节点添加到已找到的节点列表中。否则,我们考虑从当前节点出发的所有边。对于每条边,我们使用 `GetEdgeMatch` 方法计算两个数字:沿着该边比较的符号数量(`EdgeMatch` 类中的 `MatchLength` 属性)和比较过程中发生的错误数量(`EdgeMatch` 类中的 `NumberOfErrors` 属性)。如果这个错误数量超过了我们的限制,我们就什么也不做。否则,如果已到达模式的末尾,我们将边的末端添加到已找到的节点列表中。否则,我们创建一个新任务来处理。这个任务包含另一个节点、模式中的另一个起始位置和另一个允许的错误数。

我想强调的是,`ProcessSearchTask` 方法可能无法完全解决其任务。相反,它可以生成几个更小(更简单)的任务来解决。

这就是近似模式匹配算法的工作原理。很简单,不是吗? :-) 您可以在这里查看完整代码。

尽管此算法有效,但还有另一种简单的方法可以使其更好。我指的是多线程。

多线程近似模式匹配

看看上面的算法。我们所做的就是生成和解决不同的任务。还要注意,所有任务都是独立的。我们的后缀树和模式是只读的。我们从不更改任务描述中的任何位。这种不变性是一个好迹象,表明我们可以应用多线程处理。

让我们研究一下新算法。

public static class MultithreadedSuffixTreeApproximateSearch<TSymbol>
{
    private class TaskDescription
    {
        public TaskDescription(SuffixTreeNode<TSymbol> startNode, IReadOnlyList<TSymbol> pattern, int patternStartIndex, uint numberOfErrors)
        {
            StartNode = startNode;
            Pattern = pattern;
            PatternStartIndex = patternStartIndex;
            NumberOfErrors = numberOfErrors;
        }

        public SuffixTreeNode<TSymbol> StartNode { get; }
        public IReadOnlyList<TSymbol> Pattern { get; }
        public int PatternStartIndex { get; }
        public uint NumberOfErrors { get; }
    }

    private class EdgeMatch
    {
        public EdgeMatch(int matchLength, uint numberOfErrors)
        {
            MatchLength = matchLength;
            NumberOfErrors = numberOfErrors;
        }

        public int MatchLength { get; }
        public uint NumberOfErrors { get; }
    }

    private class Waiter
    {
        private readonly object _lock = new object();
        private int _counter;

        public void Increment()
        {
            lock (_lock) { _counter++; }
        }

        public void Decrement()
        {
            lock (_lock) { _counter--; }
        }

        public bool IsFinished()
        {
            lock (_lock) { return _counter == 0; }
        }
    }

    public static IEnumerable<StringSearchApproximateMatch<TSymbol>> Search(
        IReadOnlyList<TSymbol> text,
        TSymbol stopSymbol,
        IEnumerable<IEnumerable<TSymbol>> patterns,
        uint numberOfErrors,
        IComparer<TSymbol> comparer = null)
    {
        if (text == null) throw new ArgumentNullException(nameof(text));

        if (patterns == null)
            return new StringSearchApproximateMatch<TSymbol>[0];

        comparer = new StopSymbolFirstComparer<TSymbol>(comparer ?? Comparer<TSymbol>.Default, stopSymbol);

        var suffixTree = SuffixTreeCreator<TSymbol>.CreateSuffixTree(text, stopSymbol, comparer);

        var waiter = new Waiter();

        var results = new ConcurrentBag<StringSearchApproximateMatch<TSymbol>>();

        foreach (var pattern in patterns)
        {
            waiter.Increment();

            var task = new TaskDescription(
                suffixTree.Root,
                pattern.ToArray(),
                0,
                numberOfErrors);

            ThreadPool.QueueUserWorkItem(state =>
            {
                ProcessSearchTask(results, waiter, suffixTree.Text, comparer, task);
            });
        }

        while (!waiter.IsFinished())
        {
            Thread.Sleep(0);
        }

        return results;
    }

    private static bool PatternDoesNotOverlapStopSymbol(int startPosition, int patternLength, int textLength)
    {
        if (startPosition >= textLength - 1)
            return false;
        if (patternLength > 0 && startPosition + patternLength >= textLength)
            return false;
        return true;
    }

    private static void ProcessSearchTask(ConcurrentBag<StringSearchApproximateMatch<TSymbol>> results, Waiter waiter, IReadOnlyList<TSymbol> text, IComparer<TSymbol> comparer, TaskDescription task)
    {
        if (task.PatternStartIndex >= task.Pattern.Count)
        {
            AddNodeMatches(results, task.StartNode, task.Pattern, text.Count);
            waiter.Decrement();
            return;
        }

        foreach (var edge in task.StartNode.Edges.Values)
        {
            EdgeMatch edgeMatch = GetEdgeMatch(edge, text, task.Pattern, task.PatternStartIndex, comparer);

            if(edgeMatch.NumberOfErrors > task.NumberOfErrors)
                continue;

            var newPatternStartIndex = task.PatternStartIndex + edgeMatch.MatchLength;

            if (newPatternStartIndex >= task.Pattern.Count)
            {
                AddNodeMatches(results, edge.To, task.Pattern, text.Count);
                continue;
            }

            var newTask = new TaskDescription(
                edge.To,
                task.Pattern,
                newPatternStartIndex,
                task.NumberOfErrors - edgeMatch.NumberOfErrors
                );

            waiter.Increment();

            ThreadPool.QueueUserWorkItem(state =>
            {
                ProcessSearchTask(results, waiter, text, comparer, newTask);
            });
        }

        waiter.Decrement();
    }

    private static void AddNodeMatches(ConcurrentBag<StringSearchApproximateMatch<TSymbol>> results, SuffixTreeNode<TSymbol> node, IReadOnlyList<TSymbol> pattern, int textLength)
    {
        foreach (var start in GetSuffixStarts(node).Where(s => PatternDoesNotOverlapStopSymbol(s, pattern.Count, textLength)))
        {
            results.Add(new StringSearchApproximateMatch<TSymbol>(start, pattern));
        }
    }

    private static EdgeMatch GetEdgeMatch(
        SuffixTreeEdge<TSymbol> edge, 
        IReadOnlyList<TSymbol> text, 
        IReadOnlyList<TSymbol> pattern, 
        int patternStartIndex,
        IComparer<TSymbol> comparer)
    {
        int matchLength = 0;
        uint numberOfErrors = 0;

        while (true)
        {
            var textIndex = edge.Start + matchLength;
            var pattentIndex = patternStartIndex + matchLength;
            if(matchLength >= edge.Length)
                break;
            if(pattentIndex >= pattern.Count)
                break;

            if (comparer.Compare(text[(int) textIndex], pattern[pattentIndex]) != 0)
                numberOfErrors++;

            matchLength++;
        }

        return new EdgeMatch(matchLength, numberOfErrors);
    }

    private static IEnumerable<int> GetSuffixStarts(SuffixTreeNode<TSymbol> node)
    {
        var queue = new Queue<SuffixTreeNode<TSymbol>>();
        queue.Enqueue(node);

        while (queue.Count > 0)
        {
            node = queue.Dequeue();
            if (node.SuffixStart.HasValue)
                yield return (int) node.SuffixStart.Value;

            foreach (var edge in node.Edges.Values)
            {
                queue.Enqueue(edge.To);
            }
        }
    }
}

这段代码背后的想法很简单。当我们创建一个新任务时,我们不存储它,而是立即使用 `ThreadPool.QueueUserWorkItem` 来执行它。这使我们能够立即使用线程池。由于我们同时处理许多任务,因此我们必须使用并发集合来收集它们执行的结果。我们为此使用了 `ConcurrentBag`。

唯一需要解决的问题是我们何时停止。我们必须以某种方式知道所有任务都已执行并且没有更多任务了。为了实现这个目标,我实现了一个简单的 `Waiter` 类。它包含一个计数器,并允许以线程安全的方式递增和递减它以及与零进行比较。在将新任务放入线程池之前,我们增加计数器。当任何任务完成时,我们递减计数器。现在,我们只需要等待计数器变为零。

我承认,我不是多线程代码专家。我相信有人能想出更好的方法来等待所有任务。

该类的完整代码可以在这里找到。

效率

在这里,我将尝试以某种方式估计算法的效率。请注意,我们将讨论算法的非多线程版本。另外,您应该知道我不是这方面的问题专家。所以您应该对本文的这部分内容持保留态度。

首先,您不应该忘记构建后缀树所需的时间。它可以以 O(文本长度) 的时间构建。

现在让我们看看 `SuffixTreeApproximateSearch` 类的作用。主要工作在 `ProcessSearchTask` 方法中完成。假设我们的字母表(文本和模式中可能出现的所有字母)包含 L 个字母。这意味着,在最坏的情况下,后缀树的每个节点将包含大约 L 个子节点(如果我们计算终止符,则为 L + 1)。在最坏的情况下,树的每条边可能只对应文本的一个符号。这意味着每个任务(`TaskDescription` 类的一个实例)将只比较模式的一个符号与文本的一个符号。

当我们还有错误空间时,每个节点的任务都会生成 L 个子任务。如果我们总共允许 N 个错误,那么在错误能力耗尽之前,至少会生成 L^N (L 的 N 次方) 个任务。所以算法的效率不能优于 O(L^N)。

之后,我们不能再犯错误了。所以我们回到了精确模式匹配问题。如果模式的长度远大于 N,那么算法的这一部分对于在前一部分中访问的每个节点来说,最多需要 O(模式长度) 的时间。

所以总时间大约是 O(<文本长度> + L^N * <模式长度>)。当然,这个估计只针对单个模式。对于多个模式,时间会相应地增加。

结论

在本文中,我介绍了一种近似模式匹配算法。我必须告知您,有更有效的算法可以解决相同的问题。该算法的优点是简单易懂,并且能够利用多线程来解决其任务,从而利用多个处理器内核的强大功能。

我希望它能对您的应用有所帮助。

您可以在我的博客上阅读我更多的文章.

© . All rights reserved.