SQL Server 2000 和 2005 中的树状结构实用工具,以及 OLAP 实现
本文描述了如何在 SQL Server 数据库中高效地存储和访问树状结构,以及如何在 OLAP 实现中使用它们。
- 下载用于创建 TREE_DB 和准备 OLAP 数据源的 SQL 脚本 - 5.38 Kb
- 下载销售事实表数据 - 8.26 Kb
- 下载 tree_util Web 服务 - 12.5 Kb
- 下载刷新销售立方体控制台应用程序 - 160 Kb
引言
我一直很喜欢树状结构。我在很多情况下都使用过它们,并且一直在寻找在 SQL Server 中管理它们的有效方法。由于 SQL Server(2000 和 2005 版本)中 32 个嵌套级别的限制阻碍了我们递归地解决这个问题,因此我们必须在数据库表中存储和访问嵌套结构时分配额外的开销。在本文中,我将尝试解释如何有效地将树状结构存储在 SQL Server 表中,如何从 T-SQL 和客户端轻松访问它,以及如何将其转换为 Microsoft Analysis Services 中 OLAP 维度识别的结构。
问题是如何高效且轻松地存储和访问树状结构,以及如何将它们转换为如下图所示的结构
树状结构存储
在关系表中存储树的最著名方法是建立节点与其父节点之间的链接。因此,ID
和 P_ID
字段对于此方法来说足够了,其中根节点的 P_ID
为 NULL
。此结构的访问方法可以是:存储节点的绝对路径(从根开始),如果结构是二叉树,则在另一个表中存储左 ID 和右 ID,再次存储另一个表中的 ID
和 P_ID
字段值以及一个 LEVEL
值,等等。在我看来,最后一种方法是最有效的方式,因为使用简单的 SELECT
语句即可轻松访问,而且对于大型结构,存储大小最小。有关树的更多信息,可以从 CodeProject 文章 General trees persisted in relational databases 和 MSDN 技术文章“Hierarchical Data and Scope Checking”中获取。
使用最后一种技术,我们将在表中添加两个附加列:NLEFT
,用于存储左索引值;NRIGHT
,用于存储右索引值。我们必须仔细设置这些列的相应值,因为节点的“左”值必须始终在其祖先的“左”值和“右”值之间。具备这些条件,只需通过一个简单的自连接并过滤节点即可检索整个层次结构。
SELECT C.ID, C.P_ID, C.NAME
FROM TREE C
INNER JOIN TREE P ON C.NLEFT BETWEEN P.NLEFT AND P.NRIGHT
WHERE P.ID = 1
CREATE TABLE
语句很简单。
CREATE TABLE TREE (
ID int NOT NULL ,
P_ID int NULL ,
NAME varchar (100),
NLEFT int NOT NULL CONSTRAINT DF_TREE_NLEFT DEFAULT (-1),
NRIGHT int NOT NULL CONSTRAINT DF_TREE_NRIGHT DEFAULT (-1),
NLEVEL int NULL CONSTRAINT DF_TREE_NLEVEL DEFAULT (-1),
CONSTRAINT PK_TREE PRIMARY KEY CLUSTERED (ID)
)
我添加了 NLEVEL
列来存储节点的级别,这在后续需求中很有用。根节点的级别从 1 开始。
访问并不难,但是我们如何设置 NLEFT
和 NRIGHT
的相应值呢?我选择了触发器实现,以确保每次插入节点时,层次结构都能正确存储。我们可以更新删除(即使不是必需的)时的 NLEFT
和 NRIGHT
值,以及 P_ID
字段值发生更改时。
我只展示插入操作的触发器,但对于所有更新 P_ID
和删除操作也可以用相同的方式实现。触发器检查新插入的节点是根节点还是后代节点。如果是根节点,它会生成一个新的 NLEFT
值(为每个层次结构分配最多 10000 个值),并将 NLEVEL
设置为 1。如果是一个子节点,则根据该节点是第一个(同一级别没有其他子节点)还是最后一个(对应级别和父节点有其他子节点)添加的子节点来设置值。
IF @P_ID IS NULL
BEGIN
SELECT @MAX_VALUE = ISNULL(MAX(NLEFT), 0) + 10000
FROM TREE WHERE P_ID IS NULL
SELECT @NLEVEL = 1
END
ELSE
BEGIN
SELECT @MAX_VALUE = ISNULL(MAX(NRIGHT), 0)
FROM TREE WHERE P_ID = @P_ID AND ID < @ID
SELECT @NLEVEL = ISNULL(NLEVEL, 0)
FROM TREE WHERE P_ID = @P_ID AND ID < @ID
IF @MAX_VALUE = 0
BEGIN
SELECT @MAX_VALUE = ISNULL(NLEFT, 0) FROM TREE WHERE ID = @P_ID
SELECT @NLEVEL = ISNULL(NLEVEL, 0) + 1 FROM TREE WHERE ID = @P_ID
END
END
UPDATE TREE SET NLEFT = @MAX_VALUE + 1,
NRIGHT = @MAX_VALUE + 2,
NLEVEL = @NLEVEL WHERE ID = @ID
在存储值之后,触发器会确保相应的祖先具有正确的 NLEFT
和 NRIGHT
值。这是从根到叶进行的,采用深度优先遍历。首先,它识别根节点,然后应用重新计算算法。
IF @P_ID IS NOT NULL
BEGIN
SELECT @ROOT_ID = ISNULL(P.ID, -1)
FROM TREE C
INNER JOIN TREE P ON C.NLEFT BETWEEN P.NLEFT AND P.NRIGHT
WHERE C.ID = @P_ID AND P.P_ID IS NULL
-- EXEC TREE_RECALC @ROOT_ID -- RECURSIVE APPROACH
EXEC TREE_RECALC_ITER @ROOT_ID -- ITERATIVE APPROACH
END
重新计算或编号算法是
- 获取根节点的
NLEFT
值; - 向下移动层次结构,找到最顶层父节点从左到右的第一个子节点,并为其分配
NLEFT
值 2; - 继续向下移动层次结构,将
NLEFT
值设置为最后一个NLEFT
+ 1,如果找到叶节点,则计算其NLEFT
和NRIGHT
值(NRIGHT
=NLEFT
+ 1); - 继续对当前层次结构当前级别的所有后续可能子节点进行此编号;
- 在所有叶子子节点编号完成后,返回到它们的父节点,并将其
NRIGHT
值设置为其子节点的最大NRIGHT
值 + 1; - 继续为当前节点的同级节点进行编号;
- 继续遍历层次结构,直到所有节点都已编号;
该算法有两种实现方式:递归和迭代。
递归方法很简单,可以在 TREE_RECALC_REC
存储过程中找到。该存储过程会检查当前处理的节点是否具有子节点。如果具有子节点,则对所有子节点递归调用自身,并将 @NRIGHT
输出参数设置为相应的值。如果节点没有子节点(它是叶节点),则仅设置 NLEFT
和 NRIGHT
值。
SELECT @NCOUNT = COUNT(*), @NLEVEL2 = @NLEVEL + 1
FROM TREE
WHERE P_ID = @NPARENT
IF @NCOUNT > 0
BEGIN
SELECT @NNEWLEFT = @NLEFT + 1
SELECT @NTEMPID = 0
SELECT @NTEMPID = MIN(ID), @NCOUNTER = COUNT(*)
FROM TREE
WHERE P_ID = @NPARENT AND ID > @NTEMPID
WHILE (@NCOUNTER > 0)
BEGIN
EXEC TREE_RECALC_REC @NNEWLEFT,
@NTEMPID, @NLEVEL2, @NRIGHT OUTPUT
SELECT @NNEWLEFT = @NRIGHT + 1
SELECT @NTEMPID = MIN(ID), @NCOUNTER = COUNT(*)
FROM TREE
WHERE P_ID = @NPARENT AND ID > @NTEMPID
END
UPDATE TREE SET NLEFT = @NLEFT,
NRIGHT = @NRIGHT + 1, NLEVEL = @NLEVEL
WHERE ID = @NPARENT
SELECT @NRIGHT = @NRIGHT + 1
END
ELSE
BEGIN
SELECT @NRIGHT = @NLEFT + 1
UPDATE TREE SET NLEFT = @NLEFT,
NRIGHT = @NRIGHT,
NLEVEL = @NLEVEL WHERE ID = @NPARENT
END
众所周知,32 个嵌套级别的限制迫使我们**不能**在层次结构中存储超过 32 个级别。
迭代方式解决了 32 个嵌套级别限制的问题。解决方法是使用“堆栈”,正如 MSDN 文章 Expanding Hierarchies 中所述。 “堆栈”是一个临时表,用于存储属于同一父节点的所有节点,从根节点开始。节点通过一个级别变量在循环中使用,当“堆栈”中没有当前级别的节点时,级别将减 1(处理前一个父节点的级别)。级别变量的值从 1 到 MAX(hierarchy level)
。当您要检索子节点的祖先时,也可以应用相同的算法,但级别变量将从子节点级别开始递减到 1。当处理完子层次结构中的最后一个叶节点(同一级别和父节点中具有最大 ID
值的节点)后,应该对祖先应用编号算法。为此,我们需要另一个“堆栈”表,它允许从子节点到祖节点的编号。实现确实需要大量开销,但嵌套级别是无限的。TREE_RECALC_ITER
存储过程包含迭代重新计算算法。
WHILE @NLEVEL > 0
BEGIN
IF EXISTS(SELECT * FROM #TPC WHERE NLEVEL = @NLEVEL)
BEGIN
SELECT TOP 1 @ID = ID, @P_ID = P_ID,
@NAME = NAME FROM #TPC WHERE NLEVEL = @NLEVEL ORDER BY ID
INSERT INTO #T VALUES(@ID, @P_ID, @NAME, @NLEFT, @NRIGHT, @NLEVEL)
DELETE FROM #TPC WHERE NLEVEL = @NLEVEL AND ID = @ID
INSERT INTO #TPC SELECT ID, P_ID, NAME,
@NLEVEL + 1 FROM TREE WHERE P_ID = @ID
IF @@ROWCOUNT > 0
SET @NLEVEL = @NLEVEL + 1
IF EXISTS(SELECT ID FROM TREE WHERE P_ID = @ID)
BEGIN
SET @NLEFT = @NLEFT + 1
SET @NRIGHT = @NLEFT + 1
END
ELSE
BEGIN
IF @ID = (SELECT MAX(ID) FROM TREE WHERE
NLEVEL = @NLEVEL AND P_ID = @P_ID)
BEGIN
PRINT LTRIM(RTRIM(STR(@ID))) + ' ' +
@NAME + ' ' + LTRIM(RTRIM(STR(@NRIGHT)))
SET @NLEVEL2 = 1
TRUNCATE TABLE #TCP
INSERT INTO #TCP SELECT ID, P_ID, NAME,
@NLEVEL2 FROM #T WHERE ID = @P_ID
WHILE @NLEVEL2 > 0
BEGIN
IF EXISTS(SELECT * FROM #TCP WHERE NLEVEL = @NLEVEL2)
BEGIN
SELECT TOP 1 @ID2 = ID, @P_ID2 = P_ID,
@NAME2 = NAME FROM #TCP
WHERE NLEVEL = @NLEVEL2 ORDER BY ID
SET @NRIGHT = @NRIGHT + 1
UPDATE #T SET NRIGHT = @NRIGHT WHERE ID = @ID2
DELETE FROM #TCP WHERE NLEVEL = @NLEVEL2 AND ID = @ID2
INSERT INTO #TCP SELECT ID, P_ID, NAME,
@NLEVEL2 + 1 FROM #T WHERE ID = @P_ID2
IF @@ROWCOUNT > 0
SET @NLEVEL2 = @NLEVEL2 + 1
END
ELSE
BEGIN
SET @NLEVEL2 = @NLEVEL2 - 1
END
END
END
ELSE
BEGIN
SET @NLEFT = @NRIGHT + 1
SET @NRIGHT = @NLEFT + 1
END
END
END
ELSE
BEGIN
SET @NLEVEL = @NLEVEL - 1
SELECT @NLEFT = MAX(NRIGHT) + 1 FROM #T WHERE NLEVEL = @NLEVEL
SET @NRIGHT = @NLEFT + 1
END
END
树状结构访问
现在可以使用简单的 SQL 语句轻松访问层次结构。
- 选择整个层次结构
SELECT C.ID, C.P_ID, C.NAME FROM TREE C INNER JOIN TREE P ON C.NLEFT BETWEEN P.NLEFT AND P.NRIGHT WHERE P.ID = 1
- 选择节点的所有祖先
SELECT P.ID, P.NAME, P.P_ID FROM TREE C INNER JOIN TREE P ON C.NLEFT BETWEEN P.NLEFT AND P.NRIGHT WHERE C.ID = 5
- 选择层次结构中的所有叶节点
SELECT C.ID, C.P_ID, C.NAME FROM TREE C INNER JOIN TREE P ON C.NLEFT BETWEEN P.NLEFT AND P.NRIGHT WHERE P.ID = 1 AND C.NRIGHT - C.NLEFT = 1
- 检索层次结构中的所有非叶节点
SELECT C.ID, C.P_ID, C.NAME FROM TREE C INNER JOIN TREE P ON C.NLEFT BETWEEN P.NLEFT AND P.NRIGHT WHERE P.ID = 1 AND C.NRIGHT – C.NLEFT <> 1
- 以 XML 模式选择整个结构(包含两个嵌套级别,如
TREE_GET_XML
存储过程中实现的)SELECT 1 AS TAG , NULL AS PARENT , '' AS [root!1] , NULL AS [node!2!id] , NULL AS [node!2!p_id] , T.NAME AS [node!2!name] FROM TREE T WHERE T.ID = @ROOT_ID UNION SELECT 2, 1, NULL, C.ID, C.P_ID, C.NAME FROM TREE P INNER JOIN TREE C ON C.NLEFT BETWEEN P.NLEFT AND P.NRIGHT WHERE P.ID = @ROOT_ID FOR XML EXPLICIT
最后一个 XML 输出对客户端来说并不那么有用。它们可能需要 SQL Server XML 功能无法获得的底层树结构,而不是这种“虚假的”ID-父 ID 实现。因此,下一个想法将是使用 TREE_GET_XML
存储过程、一个 XML Web 服务和一个通用的 XSL 转换样式表来重新排列“虚假的”树结构,使其成为嵌套的 XML 层次结构。样式表非常简单。它从根节点(//*[not(@p_id)]
)开始应用一个模板,然后此模板递归地应用于所有 p_id
属性值等于其 id
属性值的节点(//*[@p_id = $id]
)。
<?xml version="1.0" encoding="UTF-8" ?>
<xsl:stylesheet version="1.0"
xmlns:xsl="http://www.w3.org/1999/XSL/Transform">
<xsl:template match="/">
<xsl:apply-templates
select="//*[name() = 'node' and not(@p_id)]"/>
</xsl:template>
<xsl:template match="*">
<xsl:variable name="id" select="@id"/>
<xsl:element name="node">
<xsl:apply-templates select="@*"/>
<xsl:apply-templates select="//*[@p_id = $id]"/>
</xsl:element>
</xsl:template>
<xsl:template match="@*">
<xsl:copy-of select="."/>
</xsl:template>
</xsl:stylesheet>
tree_util
XML Web 服务包含两个 Web 方法:
GetDirectTree
– 返回由TREE_GET_XML
收集的 XML 输出;GetStructuredTree
– 返回由TREE_GET_XML
收集并转换为底层树状结构的 XML 输出。
树到 OLAP 的转换
假设此树状结构将作为 OLAP Cube 维度的 Dats Source。可以使用 Analysis Services 轻松创建维度和 Cube。但是,当树状结构发生更改,多了一个级别时会怎样?仅仅刷新维度和 Cube 就足够了吗?答案是:否,Cube 将不会反映这些更改。
我们设想一个数据 Cube,它有一个 TREE_OLAP
维度(包含国家/地区、区域、城市、部门、地理区域等,具体包含什么无关紧要)和一个 TIME
维度。我认为在 OLAP Cube 的创建和更新中一个重要的问题是,如果一个维度的主要结构发生变化,我需要添加或删除一些级别,会发生什么?我如何才能轻松地更新维度和 Cube?并且,如果可能,自动完成。
TREE_OLAP
维度的数据源是 TREE_OLAP
表,该表以完全不同的结构存储树节点。ID-父 ID 树实现无法帮助 OLAP 维度获取和处理数据。它们需要一种更冗余的结构来在不同级别上排列树节点。它们需要在数据源中为每个维度级别设置一个列。它们需要的结构是本文开头第二幅图所示的结构。
每次 TREE
层次结构更改时,都必须重新创建 TREE_OLAP
表。该表中的列是从 TREE
表中获取的树 ID
,以及每个层次结构级别的一个 varchar
列,名为“N”,并带有级别值(例如:如果树结构有三个级别,则 TREE_OLAP
表将有 4 列:ID
int
,N1
varchar(100)
,N2
varchar(100)
,和 N3
varchar(100)
)。
使用 TREE_2_OLAP
存储过程重新创建并填充具有相应值的 TREE_OLAP
表。该过程会检查层次结构的最大级别并重新创建 TREE_OLAP
表。要填充此表,它必须以与 TREE_RECALC_ITER
存储过程类似的方式遍历树(使用两个“堆栈”临时表)。对于当前正在处理的节点,该过程会将记录插入 TREE_OLAP
表中对应的“N”+ LEVEL
列。
SET @SQL = N'INSERT INTO TREE_OLAP VALUES(@ID'
SET @I = 1
WHILE @I <= @MAX_LEVEL
BEGIN
IF @I = @NLEVEL
SET @SQL = @SQL + N', @NAME'
ELSE
SET @SQL = @SQL + N', ''<none>'''
SET @I = @I + 1
END
SET @SQL = @SQL + N')'
EXEC sp_executesql @SQL, N'@ID INT,
@NAME VARCHAR(100)', @ID = @ID, @NAME = @NAME
之后,对于每个祖先,执行对插入记录在相应祖先级别列上的更新。
SET @SQL = N'UPDATE TREE_OLAP SET N' +
LTRIM(RTRIM(STR(@NLEVEL2))) + ' = @NAME WHERE ID = @ID'
EXEC sp_executesql @SQL, N'@ID INT, @NAME VARCHAR(100)',
@ID = @ID, @NAME = @NAME2
创建销售 Cube 和维度
如果我们向 TREE
表添加一些记录,向 TIME_BY_DAY
表添加 2005 年和 2006 年的每日记录,并将数据从 *sales.txt* 文件(包含 TREE_ID
、TIME_ID
和 SALES_VALUE
字段)导入 SALES
表,则可以创建 Sales
Cube。*sales.txt* 文件仅包含 2005 年的记录。
时间维度是通用的,实现方式与 Analysis Services “FoodMart 2000”示例数据库中的类似。fnIsLeapYear
、fnGetDaysNumber
函数和 TIME_BY_DAY_GEN
存储过程在 TIME_BY_DAY
表中生成指定年份的每日记录,该表将作为 TIME
维度的 Dats Source。TREE_OLAP
维度基于星型模式(每个维度只有一个表)。名为 SALES
的事实表具有以下结构:ID
int
,TREE_ID
int
(指向 TREE
表 ID
列的外键),TIME_ID
int
(指向 TIME_BY_DAY
表 ID
列的外键),SALES_VALUES
float
(度量列)。
Cube 数据可视化将是
RefreshSalesCube 应用程序
维度和 Cube 数据库中的更改可以通过 Analysis Services 用户界面手动完成,也可以通过 .NET Framework 1.1 的 DSO(Decision Support Objects – 互操作程序集)命名空间和 .NET Framework 2.0 的 AMO 命名空间以编程方式完成。
RefreshSalesCube
是一个简单的控制台应用程序,它
- 连接到本地 Analysis Server
DSO.ServerClass dsoServer = new ServerClass(); dsoServer.Connect("localhost");
- 查找
TREE_DB
数据库DSO.OlapCollection coll = (DSO.OlapCollection)dsoServer.MDStores; DSO.Database dsoDB = (DSO.Database)coll.Item("TREE_DB");
- 检索
Sales
数据 Cubecoll = (DSO.OlapCollection)dsoDB.MDStores; DSO.Cube dsoCube = (DSO.Cube)coll.Item("Sales");
- 检索
TREE_OLAP
维度coll = (DSO.OlapCollection)dsoDB.Dimensions; DSO.Dimension dsoDim = (DSO.Dimension)coll.Item("TREE_OLAP");
- 重新创建并重新处理维度,分离 Cube 的共享维度,删除维度级别,并基于新的
TREE_OLAP
表的列重新创建新级别dsoCube.Dimensions.Remove(dsoDim.Name); int i = dsoDim.Levels.Count; while(i > 0) { dsoDim.Levels.Remove(i); i--; } SqlConnection cnn = new SqlConnection( ConfigurationSettings.AppSettings["ConnectionString"]); cnn.Open(); SqlDataAdapter da = new SqlDataAdapter("SELECT TOP" + " 0 * FROM TREE_OLAP", cnn); DataTable dt = new DataTable(); da.Fill(dt); cnn.Close(); int nColumns = dt.Columns.Count; int adVarChar = 200; for(i = 1; i < nColumns; i++) { DSO.Level lvl = (DSO.Level)dsoDim.Levels.AddNew( dt.Columns[i].ColumnName, DSO.SubClassTypes.sbclsRegular); lvl.MemberKeyColumn = "\"dbo\".\"TREE_OLAP\".\"" + dt.Columns[i].ColumnName + "\""; lvl.ColumnType = (short)adVarChar; lvl.ColumnSize = 100; } dsoDim.Process(DSO.ProcessTypes.processFull);
- 重新处理 Cube,添加维度并重新创建事实表和维度表之间的连接
DSO.Dimension dim = (DSO.Dimension) dsoCube.Dimensions.AddNew("TREE_OLAP", DSO.SubClassTypes.sbclsRegular); dim = dsoDim; dsoCube.JoinClause = "\"dbo\".\"Sales\".\"TREE_ID\" =" + " \"dbo\".\"TREE_OLAP\".\"ID\" AND" + " \"dbo\".\"Sales\".\"TIME_ID\" =" + " \"dbo\".\"TIME_BY_DAY\".\"ID\""; dsoCube.Update(); dsoCube.Process(DSO.ProcessTypes.processFull);
更新销售 Cube 和 TREE_OLAP 维度
让我们向 TREE
表添加一个新记录,这将增加层次结构的级别。
INSERT INTO TREE(ID, P_ID, NAME) VALUES(21, 20, 'MamaW')
新记录将有一个对应项,在事实表中,在 2006 年的某一天,例如 '2006-07-18',SALES_VALUES
的值为 1000。
DECLARE @TIME_ID INT
SELECT @TIME_ID = ID FROM TIME_BY_DAY WHERE T_DATE = '2006-07-18'
INSERT INTO SALES VALUES(21, @TIME_ID, 1000)
我们可以直接重新创建 TREE_OLAP
表
EXEC TREE_2_OLAP 1
并运行 *RefreshSalesCube* 应用程序。
新的 Cube 数据可视化将反映这些更改,而无需手动更新。
结论
通过在 OLTP 数据库上实现该功能,并使用少量代码重新创建和重新处理 Cube,结构转换和数据更改对于消耗 Cube 数据的报表客户端来说是透明的。该过程可以使用 SQL Server 作业自动执行和计划,从而提高时间和工作效率。