节点数众多时的最短/最快路径 - 共3部分,第1部分 - 无数组的Dijkstra






4.79/5 (24投票s)
降低内存消耗,使Dijkstra算法能够处理大量节点。
引言
本文的主要目标是提出一种不使用巨型静态二维数组所带来的内存限制的方法来使用Dijkstra算法,使其能够处理大量节点。
所谓“大量”是指100到1000个节点。如果节点少于100个,您可以使用网络上找到的任何使用数组的解决方案。
我们提出的代码并不局限于1000个节点(实际上我还没有找到上限),但在超过1000个节点后,找到解决方案的时间将不可接受。
本文的第二部分将提出一种能够以极低的处理时间处理数百万个节点的解决方案。
背景
Dijkstra是一个非常通用的贪心算法,它的解决方案包含了最坏情况:一个每个节点都可以连接到所有其他节点的图。
但经过对许多问题的研究,我发现其中大多数都基于“类似2D地图”的节点集。在这种情况下,节点之间的连接性非常低。节点通常只连接到其周围的节点,形成一个非常稀疏的链接矩阵。
网络上存在的Dijkstra基本算法并没有考虑到这一点,并分配资源来映射节点之间所有可能的连接,甚至连接到自身。
让我们分析这个示例网格
它表示一个15x12的“类似2D地图”网格。该网格用于上面可下载的示例源代码。
现在…如果我们将其视为一个全连接矩阵,我们需要180(15x12)个节点来表示它。这将导致一个链接矩阵,包含32400(180^2)个十进制(或整数)值来喂给Dijkstra。我从网上下载的所有代码在尝试生成超过100个节点的链接矩阵时都崩溃了(“内存溢出”)。
如果程序没有崩溃,分配的内存也会导致代码(和整个机器)运行缓慢。
矩阵的大小始终是节点数的平方。这意味着问题呈指数级增长。因此,即使您获得一台非常庞大而强大的机器,很快也会达到无法处理的程度。
那么…我们先来解决这个问题…
准备数据
首先要做的是注意到并非网格中的每个点都被使用。存在空隙。因此,我们不从网格尺寸开始,而是构建一个节点列表。这稍微减少了问题:从180个节点减少到(当然,在这个示例中)120个节点。
这并没有太大帮助,但这是一个开始…
我们还注意到节点并非完全连接,并且并非所有连接都是双向的。我们可以将问题简化为我们实际使用的链接。这意味着将整个问题简化为仅246个链接(同样,在这个示例中)。
现在我们有话可说了…
在我看来,链接的总数通常在节点数的两到三倍之间。惊喜!问题变得线性了!
此时,我们意识到主要问题不是Dijkstra算法本身,而是准备输入数据以供其使用。但我们仍然可以改进代码以避免过多的内存消耗。我们稍后会讨论。
考虑到这一点,让我们看一下我在源代码示例中使用的输入文件的一个摘录。
120
0 4
1 4
2 1
2 2
2 3
2 4
2 5
2 6
2 7
2 8
3 0
3 1
...
246
0 1 0.0625 4.5
1 0 0.0625 4.5
1 5 0.0625 4.5
2 11 0.0625 4.5
3 2 0.0625 4.5
4 3 0.0625 4.5
4 13 0.0625 3.0
5 1 0.0625 4.5
5 4 0.0625 4.5
5 14 0.0625 3.0
6 5 0.0625 4.5
7 6 0.0625 4.5
8 7 0.0625 4.5
8 17 0.0625 2.0
8 9 0.0625 4.5
9 18 0.0625 4.5
...
5
7 65
18 98
...
第一部分是节点列表。它描述了网格中的坐标点。如果您想绘制网格,则只需要它。否则,您只需要第二部分。第二部分是一个“表格式”列表,显示第一个节点、第二个节点以及连接这两个节点的任何数据。在此示例中,我给出了英里和分钟的距离。因此,该行…
0 1 0.0625 4.5
…表示节点0连接到节点1,它们的距离为0.0625英里,并且从0到1需要4.5分钟。
下一行…
1 0 0.0625 4.5
…表示也有一个从1到0的连接,使其成为双向的。值是相同的,但并非必然如此。如果您来回移动,可能会有不同的值。
输入的最后一部分只是提供了一个我们想要用它来获得代码结果的案例列表。所以…输入中真正重要的是第二部分。这些数据可以来自任何地方,数据库、GPS批处理文件、Google Maps。
让我们看看代码…
整个解决方案仅在代码的两行中。
...
_distanceSolver = new Dijkstra(_numberOfNodes, GetDistance); // line 74
...
var solution = _distanceSolver.Solve(_cases[i].StartId, _cases[i].EndId); // line 83
...
Dijkstra类的构造函数非常简单,只是初始化内部字段…
public Dijkstra(int numberOfNodes, Func<int, int, decimal> values)
{
if (numberOfNodes < 0) throw new ArgumentException("The number of nodes must be positive.");
if (values == null) throw new ArgumentNullException("values");
NumberOfNodes = numberOfNodes;
_values = values;
}
但它包含了此解决方案的主要功能。通过将值作为委托函数传递给类,完全不需要静态矩阵。
这些值是“按需”获取的。您可以完全控制输入,从而控制提供给Dijkstra的内容。
您只需记住,Dijkstra是累积算法,因此所有有效值都必须是非负的。值为-1表示这些节点未连接,但我们表示节点的方式是最好直接将其从列表中删除。
这种控制带来了一些重要的优势:
1 - 所有验证都可以直接在函数内完成。在解决方案交互过程中不需要生成异常,只需返回-1。
2 - 值实际上不需要是静态的。您可以构建函数,根据一些额外信息来修改函数输出。
例如,如果您希望GetTime的输出随一天中的时间变化(例如由于交通),那么GetTime函数可以具有签名:Func<int, int, DateTime, decimal>。
想象一下,您通常需要2小时才能从一个地方到另一个地方,但在途中(1小时后)高峰时段开始。您开始时的时间值不再有效,必须进行调整。这2小时的行程(在正常交通下)变成了2.5小时的行程(部分在高峰时段)。这是更精确的输出。
可能性是无限的。
代码中最重要的一部分没有暴露。是Interaction函数。
private void Iteraction()
{
var source = (from r in _nodes
where !r.Done
&& r.Path.Any()
orderby r.Path.Sum(p => p.Value)
select r).First();
var connectedNodes = (from r in _nodes
where r.Node != source.Node
&& r.Node != _startNode
&& _values(source.Node, r.Node) != -1
&& (!r.Path.Any() || r.Path.Sum(v => v.Value) > (source.Path.Sum(p => p.Value) + _values(source.Node, r.Node)))
select new { node = r, newValue = _values(source.Node, r.Node) }).ToList();
connectedNodes.ForEach(i => {
i.node.SetPath(source.Path);
i.node.AddStep(source.Node, i.newValue);
});
source.Done = true;
}
这是Dijkstra魔法发生的地方,其逻辑几乎没有改变,只是用LINQ改进了,并且没有使用任何显式静态数组。
在解决方案生命周期内始终分配的唯一列表是内部_nodes
列表。该列表的最大尺寸为NumberOfNodes
。所以它非常小。
并行处理可能是这里的另一种选择,但我将在本文的下一部分讨论。
最后我想提一下Solve函数的返回值。
public IEnumerable<int> Solve(int startNode, int endNode)
{
if (startNode < 0 || startNode >= NumberOfNodes) throw new ArgumentException("The start node must be between zero and the number of nodes");
if (endNode < 0 || endNode >= NumberOfNodes) throw new ArgumentException("The end node must be between zero and the number of nodes");
_startNode = startNode;
Reset();
for (var i = 1; i < NumberOfNodes; i++) Iteraction();
GetNode(endNode).AddStep(endNode, 0);
return GetNode(endNode).Path.Select(p => p.Node);
}
我尝试了许多输出,但我发现最好的(对我来说)是包含计算路径的节点列表。
由于我拥有提供两个节点之间值的函数,如果我收到连接起点和终点的节点链,我可以将它们合并,并单独获取每一步的值。甚至可以应用不同的函数来比较结果。
这就是下载的解决方案中所做的。
好吧…这是Dijkstra类的完整代码。
public class Dijkstra
{
private int _startNode = 0;
private List<destination> _nodes;
private Func<int, int, decimal> _values;
public int NumberOfNodes { get; private set; }
public bool IsSolved { get { return !_nodes.Any(n => !n.Done); } }
public Dijkstra(int numberOfNodes, Func<int, int, decimal> values)
{
if (numberOfNodes < 0) throw new ArgumentException("The number of nodes must be positive.");
if (values == null) throw new ArgumentNullException("values");
NumberOfNodes = numberOfNodes;
_values = values;
}
public void Reset()
{
if (_nodes == null) _nodes = new List<destination>();
_nodes.Clear();
for (var node = 0; node < NumberOfNodes; node++)
{
_nodes.Add(new Destination(node, node == _startNode));
var value = _values(_startNode, node);
if (value != -1) GetNode(node).AddStep(_startNode, value);
}
}
private Destination GetNode(int n)
{
return _nodes.SingleOrDefault(i => i.Node == n);
}
private void Iteraction()
{
var source = (from r in _nodes
where !r.Done
&& r.Path.Any()
orderby r.Path.Sum(p => p.Value)
select r).First();
var connectedNodes = (from r in _nodes
where r.Node != source.Node
&& r.Node != _startNode
&& _values(source.Node, r.Node) != -1
&& (!r.Path.Any() || r.Path.Sum(v => v.Value) > (source.Path.Sum(p => p.Value) + _values(source.Node, r.Node)))
select new { node = r, newValue = _values(source.Node, r.Node) }).ToList();
connectedNodes.ForEach(i => {
i.node.SetPath(source.Path);
i.node.AddStep(source.Node, i.newValue);
});
source.Done = true;
}
public IEnumerable<int> Solve(int startNode, int endNode)
{
if (startNode < 0 || startNode >= NumberOfNodes) throw new ArgumentException("The start node must be between zero and the number of nodes");
if (endNode < 0 || endNode >= NumberOfNodes) throw new ArgumentException("The end node must be between zero and the number of nodes");
_startNode = startNode;
Reset();
for (var i = 1; i < NumberOfNodes; i++) Iteraction();
GetNode(endNode).AddStep(endNode, 0);
return GetNode(endNode).Path.Select(p => p.Node);
}
private class Destination
{
public Destination(int n, bool d)
{
Node = n;
Path = new List<origin>();
Done = d;
}
public int Node { get; private set; }
public List<origin> Path { get; private set; }
public bool Done { get; internal set; }
public void AddStep(int d, decimal v)
{
Path.Add(new Origin(d, v));
}
public void SetPath(List<origin> p)
{
Path = new List<origin>(p);
}
}
private class Origin
{
public Origin(int n, decimal v)
{
Node = n;
Value = v;
}
public int Node { get; private set; }
public decimal Value { get; internal set; }
}
}
这段代码可以处理大量节点。
输出
在提供的解决方案中,展示了如何使用两个不同的函数:一个用于获取最短路径,另一个用于获取最快路径。您会发现它们(几乎总是)不相同。
在以DEBUG模式运行代码之前,请将您的控制台输出更改为每行160个字符,以获得更好的视图。它将显示链接矩阵的表示。这只是为了演示它的稀疏性。每个字符表示两个节点之间的连接。它表示一个120x120的字符矩阵,没有连接的地方是空白,表示现有连接的地方是点。请注意连接是如何沿着主对角线排列的。这意味着所有节点几乎总是连接到附近的另一个节点。同样,主对角线上也没有点,表示节点与其自身之间没有连接。
这清楚地表明,如果我们分配一个完整的静态二维数组,会浪费多少内存。
如果在RELEASE模式下运行,输出将仅发送到一个文件。这将显示此代码的速度有多快。
这是DEBUG模式输出的一个示例。
Link Matrix:
.
. .
.
.
. .
. . .
.
.
. . .
.
.
. . .
. .
. . .
. .
. .
. .
. . .
.
. .
. .
.
. .
. . .
. . . .
. . .
. . .
. . .
. . . .
. . .
. . .
.
.
. .
. .
. . .
.
. .
. .
. . .
. .
. .
. .
. .
. .
.
. .
. . .
. .
. .
.
.
. .
. .
. . .
.
. .
. . .
. .
. .
. .
.
. .
. .
. . .
. .
. .
. .
. . .
. .
.
.
. .
. .
. . .
. .
. .
. .
. . .
. .
. .
. .
. .
. . .
. .
. .
. .
. . .
. .
.
.
. . .
. . .
. . . .
. . .
. . .
. . .
. . . .
. . .
. .
.
. . .
.
. .
. .
. .
. .
.
. .
.
. .
. .
.
. .
.
. .
.
.
.
.
Case 1:
Shortest: 7->6->5->4->13->24->35->44->54->64->65: 0,625 miles, 29,5 minutes; Solution Time:0:00:00:00,0900000;
Fastest : 7->6->5->14->25->24->35->44->54->64->65: 0,625 miles, 28,0 minutes; Solution Time:0:00:00:00,0810000;
Case 2:
Shortest: 18->29->40->48->58->69->79->88->98: 0,500 miles, 25,5 minutes; Solution Time:0:00:00:00,0800000;
Fastest : 18->29->28->39->47->57->68->78->87->97->98: 0,625 miles, 21,0 minutes; Solution Time:0:00:00:00,0810000;
Case 3:
Shortest: 10->11->12->13->14->15->16->17->18->29->40->48->49->50->60->61: 0,938 miles, 52,5 minutes; Solution Time:0:00:00:00,0810000;
Fastest : 10->11->12->13->24->25->26->27->28->39->47->48->49->50->60->61: 0,938 miles, 44,0 minutes; Solution Time:0:00:00:00,0810000;
Case 4:
Shortest: 2->11->12->13->14->15->16->17->18->29->40->48->58->69->79->88->98->107->108->115->119: 1,250 miles, 67,5 minutes; Solution Time:0:00:00:00,0810000;
Fastest : 2->11->12->13->24->25->26->27->28->39->47->57->68->78->87->97->98->107->108->115->119: 1,250 miles, 53,0 minutes; Solution Time:0:00:00:00,0810000;
Case 5:
Shortest: 81->82->73->63->53->43->34->23->24->25->26->27->38: 0,750 miles, 32,0 minutes; Solution Time:0:00:00:00,0810000;
Fastest : 81->82->83->74->64->54->44->35->24->25->26->27->38: 0,750 miles, 27,0 minutes; Solution Time:0:00:00:00,0810000;
一个挑战
我还没有找到该代码的内存限制。如果您超过1000个节点(不包括加载数据所需的时间),单个查询的时间限制就会开始出现。
下表是我对具有所有双向连接的NxN 2D地图获得的近似时间:
链接数 | 迭代次数 | 总时间(毫秒) | 平均迭代时间(毫秒) |
360 | 100 | 80 | 0.8 |
840 | 225 | 660 | 2.9 |
1520 | 400 | 3640 | 9.1 |
2400 | 625 | 12940 | 20.7 |
3480 | 900 | 37800 | 42.0 |
4760 | 1224 | 96800 | 79.1 |
所以现在是处理时间呈指数级增长。
如果有人想尝试找到内存限制,请不要忘记在这里发布您的基准测试结果。
下一步是什么?
本文处理的是一个只有120个节点的假想网格,它给了我们大约240个链接。在现实世界中,即使一个小城市的一个小社区也可能有大约10000个节点。任何简单的游戏地图或迷宫都将有更多这样的节点。
当我们还需要去一个更大的区域时,会发生什么?例如,如何将纽约的一个地方与洛杉矶的一个地方连接起来?想象一下节点数。经过非常粗略的计算,我发现这个数字的数量级是10^10(!!)个节点。
即使是最好的机器也无法在合理的时间内用上面的代码完成。我们知道谷歌能做到。那么,诀窍在哪里?
在下一篇文章中,我将讨论解决这类问题的一种可能方法。
好了…各位,今天的分享就到这里…