65.9K
CodeProject 正在变化。 阅读更多。
Home

SQL Server 2000 和 2005 中的树状结构实用工具,以及 OLAP 实现

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.72/5 (24投票s)

2006 年 7 月 19 日

CPOL

11分钟阅读

viewsIcon

177563

downloadIcon

3337

本文描述了如何在 SQL Server 数据库中高效地存储和访问树状结构,以及如何在 OLAP 实现中使用它们。

引言

我一直很喜欢树状结构。我在很多情况下都使用过它们,并且一直在寻找在 SQL Server 中管理它们的有效方法。由于 SQL Server(2000 和 2005 版本)中 32 个嵌套级别的限制阻碍了我们递归地解决这个问题,因此我们必须在数据库表中存储和访问嵌套结构时分配额外的开销。在本文中,我将尝试解释如何有效地将树状结构存储在 SQL Server 表中,如何从 T-SQL 和客户端轻松访问它,以及如何将其转换为 Microsoft Analysis Services 中 OLAP 维度识别的结构。

问题是如何高效且轻松地存储和访问树状结构,以及如何将它们转换为如下图所示的结构

Tree structure in DB, SQL XML and structured format

Tree structure in DB and OLAP format

树状结构存储

在关系表中存储树的最著名方法是建立节点与其父节点之间的链接。因此,IDP_ID 字段对于此方法来说足够了,其中根节点的 P_IDNULL。此结构的访问方法可以是:存储节点的绝对路径(从根开始),如果结构是二叉树,则在另一个表中存储左 ID 和右 ID,再次存储另一个表中的 IDP_ID 字段值以及一个 LEVEL 值,等等。在我看来,最后一种方法是最有效的方式,因为使用简单的 SELECT 语句即可轻松访问,而且对于大型结构,存储大小最小。有关树的更多信息,可以从 CodeProject 文章 General trees persisted in relational databases 和 MSDN 技术文章“Hierarchical Data and Scope Checking”中获取。

使用最后一种技术,我们将在表中添加两个附加列:NLEFT,用于存储左索引值;NRIGHT,用于存储右索引值。我们必须仔细设置这些列的相应值,因为节点的“左”值必须始终在其祖先的“左”值和“右”值之间。具备这些条件,只需通过一个简单的自连接并过滤节点即可检索整个层次结构。

SELECT C.ID, C.P_ID, C.NAME
FROM TREE C
INNER JOIN TREE P ON C.NLEFT BETWEEN P.NLEFT AND P.NRIGHT
WHERE P.ID = 1

Tree structure left, right and level values

CREATE TABLE 语句很简单。

CREATE TABLE TREE (
    ID int NOT NULL ,
    P_ID int NULL ,
    NAME varchar (100),
    NLEFT int NOT NULL CONSTRAINT DF_TREE_NLEFT DEFAULT (-1),
    NRIGHT int NOT NULL CONSTRAINT DF_TREE_NRIGHT DEFAULT (-1),
    NLEVEL int NULL CONSTRAINT DF_TREE_NLEVEL DEFAULT (-1),
    CONSTRAINT PK_TREE PRIMARY KEY  CLUSTERED (ID) 
)

我添加了 NLEVEL 列来存储节点的级别,这在后续需求中很有用。根节点的级别从 1 开始。

访问并不难,但是我们如何设置 NLEFTNRIGHT 的相应值呢?我选择了触发器实现,以确保每次插入节点时,层次结构都能正确存储。我们可以更新删除(即使不是必需的)时的 NLEFTNRIGHT 值,以及 P_ID 字段值发生更改时。

我只展示插入操作的触发器,但对于所有更新 P_ID 和删除操作也可以用相同的方式实现。触发器检查新插入的节点是根节点还是后代节点。如果是根节点,它会生成一个新的 NLEFT 值(为每个层次结构分配最多 10000 个值),并将 NLEVEL 设置为 1。如果是一个子节点,则根据该节点是第一个(同一级别没有其他子节点)还是最后一个(对应级别和父节点有其他子节点)添加的子节点来设置值。

IF @P_ID IS NULL 
BEGIN
    SELECT @MAX_VALUE = ISNULL(MAX(NLEFT), 0) + 10000 
           FROM TREE WHERE P_ID IS NULL
    SELECT @NLEVEL = 1
END
ELSE
BEGIN
    SELECT @MAX_VALUE = ISNULL(MAX(NRIGHT), 0) 
           FROM TREE WHERE P_ID = @P_ID AND ID < @ID
    SELECT @NLEVEL = ISNULL(NLEVEL, 0) 
           FROM TREE WHERE P_ID = @P_ID AND ID < @ID
    IF @MAX_VALUE = 0
    BEGIN
        SELECT @MAX_VALUE = ISNULL(NLEFT, 0) FROM TREE WHERE ID = @P_ID
        SELECT @NLEVEL = ISNULL(NLEVEL, 0) + 1 FROM TREE WHERE ID = @P_ID
    END
END
UPDATE TREE SET NLEFT = @MAX_VALUE + 1, 
       NRIGHT = @MAX_VALUE + 2, 
       NLEVEL = @NLEVEL WHERE ID = @ID

在存储值之后,触发器会确保相应的祖先具有正确的 NLEFTNRIGHT 值。这是从根到叶进行的,采用深度优先遍历。首先,它识别根节点,然后应用重新计算算法。

IF @P_ID IS NOT NULL
BEGIN
    SELECT @ROOT_ID = ISNULL(P.ID, -1)
    FROM TREE C 
    INNER JOIN TREE P ON C.NLEFT BETWEEN P.NLEFT AND P.NRIGHT
    WHERE C.ID = @P_ID AND P.P_ID IS NULL
--        EXEC TREE_RECALC @ROOT_ID -- RECURSIVE APPROACH
    EXEC TREE_RECALC_ITER @ROOT_ID -- ITERATIVE APPROACH
END

重新计算或编号算法是

  • 获取根节点的 NLEFT 值;
  • 向下移动层次结构,找到最顶层父节点从左到右的第一个子节点,并为其分配 NLEFT 值 2;
  • 继续向下移动层次结构,将 NLEFT 值设置为最后一个 NLEFT + 1,如果找到叶节点,则计算其 NLEFTNRIGHT 值(NRIGHT = NLEFT + 1);
  • 继续对当前层次结构当前级别的所有后续可能子节点进行此编号;
  • 在所有叶子子节点编号完成后,返回到它们的父节点,并将其 NRIGHT 值设置为其子节点的最大 NRIGHT 值 + 1;
  • 继续为当前节点的同级节点进行编号;
  • 继续遍历层次结构,直到所有节点都已编号;

该算法有两种实现方式:递归和迭代。

递归方法很简单,可以在 TREE_RECALC_REC 存储过程中找到。该存储过程会检查当前处理的节点是否具有子节点。如果具有子节点,则对所有子节点递归调用自身,并将 @NRIGHT 输出参数设置为相应的值。如果节点没有子节点(它是叶节点),则仅设置 NLEFTNRIGHT 值。

SELECT @NCOUNT = COUNT(*), @NLEVEL2 = @NLEVEL + 1
FROM TREE
WHERE P_ID = @NPARENT

IF @NCOUNT > 0
BEGIN
    SELECT @NNEWLEFT = @NLEFT + 1
    SELECT @NTEMPID = 0
 
    SELECT @NTEMPID = MIN(ID), @NCOUNTER = COUNT(*) 
    FROM TREE 
    WHERE P_ID = @NPARENT AND ID > @NTEMPID
  
    WHILE (@NCOUNTER > 0)
    BEGIN
        EXEC TREE_RECALC_REC @NNEWLEFT, 
             @NTEMPID, @NLEVEL2, @NRIGHT OUTPUT
        SELECT @NNEWLEFT = @NRIGHT + 1
        SELECT @NTEMPID = MIN(ID), @NCOUNTER = COUNT(*) 
        FROM TREE 
        WHERE P_ID = @NPARENT AND ID > @NTEMPID
    END
 
    UPDATE TREE SET NLEFT = @NLEFT, 
           NRIGHT = @NRIGHT + 1, NLEVEL = @NLEVEL 
    WHERE ID = @NPARENT
    SELECT @NRIGHT = @NRIGHT + 1
END
ELSE
BEGIN
    SELECT @NRIGHT = @NLEFT + 1
    UPDATE TREE SET NLEFT = @NLEFT, 
           NRIGHT = @NRIGHT, 
           NLEVEL = @NLEVEL WHERE ID = @NPARENT
END

众所周知,32 个嵌套级别的限制迫使我们**不能**在层次结构中存储超过 32 个级别。

迭代方式解决了 32 个嵌套级别限制的问题。解决方法是使用“堆栈”,正如 MSDN 文章 Expanding Hierarchies 中所述。 “堆栈”是一个临时表,用于存储属于同一父节点的所有节点,从根节点开始。节点通过一个级别变量在循环中使用,当“堆栈”中没有当前级别的节点时,级别将减 1(处理前一个父节点的级别)。级别变量的值从 1 到 MAX(hierarchy level)。当您要检索子节点的祖先时,也可以应用相同的算法,但级别变量将从子节点级别开始递减到 1。当处理完子层次结构中的最后一个叶节点(同一级别和父节点中具有最大 ID 值的节点)后,应该对祖先应用编号算法。为此,我们需要另一个“堆栈”表,它允许从子节点到祖节点的编号。实现确实需要大量开销,但嵌套级别是无限的。TREE_RECALC_ITER 存储过程包含迭代重新计算算法。

WHILE @NLEVEL > 0
BEGIN
    IF EXISTS(SELECT * FROM #TPC WHERE NLEVEL = @NLEVEL)
    BEGIN
        SELECT TOP 1 @ID = ID, @P_ID = P_ID, 
               @NAME = NAME FROM #TPC WHERE NLEVEL = @NLEVEL ORDER BY ID

        INSERT INTO #T VALUES(@ID, @P_ID, @NAME, @NLEFT, @NRIGHT, @NLEVEL)

        DELETE FROM #TPC WHERE NLEVEL = @NLEVEL AND ID = @ID

        INSERT INTO #TPC SELECT ID, P_ID, NAME, 
               @NLEVEL + 1 FROM TREE WHERE P_ID = @ID
        IF @@ROWCOUNT > 0
            SET @NLEVEL = @NLEVEL + 1

        IF EXISTS(SELECT ID FROM TREE WHERE P_ID = @ID)
        BEGIN
            SET @NLEFT = @NLEFT + 1
            SET @NRIGHT = @NLEFT + 1
        END
        ELSE
        BEGIN
            IF @ID = (SELECT MAX(ID) FROM TREE WHERE 
                      NLEVEL = @NLEVEL AND P_ID = @P_ID)
            BEGIN
                PRINT LTRIM(RTRIM(STR(@ID))) + '    ' + 
                      @NAME + '    ' + LTRIM(RTRIM(STR(@NRIGHT)))
                SET @NLEVEL2 = 1
                TRUNCATE TABLE #TCP
                INSERT INTO #TCP SELECT ID, P_ID, NAME, 
                            @NLEVEL2 FROM #T WHERE ID = @P_ID
                WHILE @NLEVEL2 > 0
                BEGIN
                    IF EXISTS(SELECT * FROM #TCP WHERE NLEVEL = @NLEVEL2)
                    BEGIN
                        SELECT TOP 1 @ID2 = ID, @P_ID2 = P_ID, 
                                     @NAME2 = NAME FROM #TCP 
                                     WHERE NLEVEL = @NLEVEL2 ORDER BY ID
                        SET @NRIGHT = @NRIGHT + 1
                        UPDATE #T SET NRIGHT = @NRIGHT WHERE ID = @ID2
                        DELETE FROM #TCP WHERE NLEVEL = @NLEVEL2 AND ID = @ID2
                        INSERT INTO #TCP SELECT ID, P_ID, NAME, 
                               @NLEVEL2 + 1 FROM #T WHERE ID = @P_ID2
                        IF @@ROWCOUNT > 0
                            SET @NLEVEL2 = @NLEVEL2 + 1
                    END
                    ELSE
                    BEGIN
                        SET @NLEVEL2 = @NLEVEL2 - 1
                    END
                END

            END
            ELSE
            BEGIN
                SET @NLEFT = @NRIGHT + 1
                SET @NRIGHT = @NLEFT + 1
            END
        END

    END
    ELSE
    BEGIN
        SET @NLEVEL = @NLEVEL - 1
        SELECT @NLEFT = MAX(NRIGHT) + 1 FROM #T WHERE NLEVEL = @NLEVEL
        SET @NRIGHT = @NLEFT + 1
    END
END

树状结构访问

现在可以使用简单的 SQL 语句轻松访问层次结构。

  • 选择整个层次结构
    SELECT C.ID, C.P_ID, C.NAME
    FROM TREE C
    INNER JOIN TREE P ON C.NLEFT BETWEEN P.NLEFT AND P.NRIGHT
    WHERE P.ID = 1
  • 选择节点的所有祖先
    SELECT P.ID, P.NAME, P.P_ID
    FROM TREE C 
    INNER JOIN TREE P ON C.NLEFT BETWEEN P.NLEFT AND P.NRIGHT
    WHERE C.ID = 5
  • 选择层次结构中的所有叶节点
    SELECT C.ID, C.P_ID, C.NAME
    FROM TREE C
    INNER JOIN TREE P ON C.NLEFT BETWEEN P.NLEFT AND P.NRIGHT
    WHERE P.ID = 1 AND C.NRIGHT - C.NLEFT = 1
  • 检索层次结构中的所有非叶节点
    SELECT C.ID, C.P_ID, C.NAME
    FROM TREE C
    INNER JOIN TREE P ON C.NLEFT BETWEEN P.NLEFT AND P.NRIGHT
    WHERE P.ID = 1 AND C.NRIGHT – C.NLEFT <> 1
  • 以 XML 模式选择整个结构(包含两个嵌套级别,如 TREE_GET_XML 存储过程中实现的)
    SELECT 
        1 AS TAG
        , NULL AS PARENT
        , '' AS [root!1]
        , NULL AS [node!2!id]
        , NULL AS [node!2!p_id]
        , T.NAME AS [node!2!name]
    FROM TREE T
    WHERE T.ID = @ROOT_ID
    UNION
    SELECT 2, 1, NULL, C.ID, C.P_ID, C.NAME
    FROM TREE P
    INNER JOIN TREE C ON C.NLEFT BETWEEN P.NLEFT AND P.NRIGHT
    WHERE P.ID = @ROOT_ID
    FOR XML EXPLICIT

最后一个 XML 输出对客户端来说并不那么有用。它们可能需要 SQL Server XML 功能无法获得的底层树结构,而不是这种“虚假的”ID-父 ID 实现。因此,下一个想法将是使用 TREE_GET_XML 存储过程、一个 XML Web 服务和一个通用的 XSL 转换样式表来重新排列“虚假的”树结构,使其成为嵌套的 XML 层次结构。样式表非常简单。它从根节点(//*[not(@p_id)])开始应用一个模板,然后此模板递归地应用于所有 p_id 属性值等于其 id 属性值的节点(//*[@p_id = $id])。

<?xml version="1.0" encoding="UTF-8" ?>
<xsl:stylesheet version="1.0" 
    xmlns:xsl="http://www.w3.org/1999/XSL/Transform">

<xsl:template match="/">
    <xsl:apply-templates 
        select="//*[name() = 'node' and not(@p_id)]"/>
</xsl:template>

<xsl:template match="*">
    <xsl:variable name="id" select="@id"/>
    <xsl:element name="node">
        <xsl:apply-templates select="@*"/>
        <xsl:apply-templates select="//*[@p_id = $id]"/>
    </xsl:element>
</xsl:template>

<xsl:template match="@*">
    <xsl:copy-of select="."/>
</xsl:template>

</xsl:stylesheet>

tree_util XML Web 服务包含两个 Web 方法:

  • GetDirectTree – 返回由 TREE_GET_XML 收集的 XML 输出;
  • GetStructuredTree – 返回由 TREE_GET_XML 收集并转换为底层树状结构的 XML 输出。

树到 OLAP 的转换

假设此树状结构将作为 OLAP Cube 维度的 Dats Source。可以使用 Analysis Services 轻松创建维度和 Cube。但是,当树状结构发生更改,多了一个级别时会怎样?仅仅刷新维度和 Cube 就足够了吗?答案是:否,Cube 将不会反映这些更改。

我们设想一个数据 Cube,它有一个 TREE_OLAP 维度(包含国家/地区、区域、城市、部门、地理区域等,具体包含什么无关紧要)和一个 TIME 维度。我认为在 OLAP Cube 的创建和更新中一个重要的问题是,如果一个维度的主要结构发生变化,我需要添加或删除一些级别,会发生什么?我如何才能轻松地更新维度和 Cube?并且,如果可能,自动完成。

TREE_OLAP 维度的数据源是 TREE_OLAP 表,该表以完全不同的结构存储树节点。ID-父 ID 树实现无法帮助 OLAP 维度获取和处理数据。它们需要一种更冗余的结构来在不同级别上排列树节点。它们需要在数据源中为每个维度级别设置一个列。它们需要的结构是本文开头第二幅图所示的结构。

每次 TREE 层次结构更改时,都必须重新创建 TREE_OLAP 表。该表中的列是从 TREE 表中获取的树 ID,以及每个层次结构级别的一个 varchar 列,名为“N”,并带有级别值(例如:如果树结构有三个级别,则 TREE_OLAP 表将有 4 列:ID intN1 varchar(100)N2 varchar(100),和 N3 varchar(100))。

使用 TREE_2_OLAP 存储过程重新创建并填充具有相应值的 TREE_OLAP 表。该过程会检查层次结构的最大级别并重新创建 TREE_OLAP 表。要填充此表,它必须以与 TREE_RECALC_ITER 存储过程类似的方式遍历树(使用两个“堆栈”临时表)。对于当前正在处理的节点,该过程会将记录插入 TREE_OLAP 表中对应的“N”+ LEVEL 列。

SET @SQL = N'INSERT INTO TREE_OLAP VALUES(@ID'
SET @I = 1
WHILE @I <= @MAX_LEVEL
BEGIN
    IF @I = @NLEVEL
    SET @SQL = @SQL + N', @NAME'
    ELSE
        SET @SQL = @SQL + N', ''<none>'''
    SET @I = @I + 1
END
SET @SQL = @SQL + N')'
EXEC sp_executesql @SQL, N'@ID INT, 
     @NAME VARCHAR(100)', @ID = @ID, @NAME = @NAME

之后,对于每个祖先,执行对插入记录在相应祖先级别列上的更新。

SET @SQL = N'UPDATE TREE_OLAP SET N' + 
           LTRIM(RTRIM(STR(@NLEVEL2))) + ' = @NAME WHERE ID = @ID'
EXEC sp_executesql @SQL, N'@ID INT, @NAME VARCHAR(100)', 
                   @ID = @ID, @NAME = @NAME2

创建销售 Cube 和维度

如果我们向 TREE 表添加一些记录,向 TIME_BY_DAY 表添加 2005 年和 2006 年的每日记录,并将数据从 *sales.txt* 文件(包含 TREE_IDTIME_IDSALES_VALUE 字段)导入 SALES 表,则可以创建 Sales Cube。*sales.txt* 文件仅包含 2005 年的记录。

时间维度是通用的,实现方式与 Analysis Services “FoodMart 2000”示例数据库中的类似。fnIsLeapYearfnGetDaysNumber 函数和 TIME_BY_DAY_GEN 存储过程在 TIME_BY_DAY 表中生成指定年份的每日记录,该表将作为 TIME 维度的 Dats Source。TREE_OLAP 维度基于星型模式(每个维度只有一个表)。名为 SALES 的事实表具有以下结构:ID intTREE_ID int(指向 TREEID 列的外键),TIME_ID int(指向 TIME_BY_DAYID 列的外键),SALES_VALUES float(度量列)。

Cube 数据可视化将是

Sales cube data visualisation

RefreshSalesCube 应用程序

维度和 Cube 数据库中的更改可以通过 Analysis Services 用户界面手动完成,也可以通过 .NET Framework 1.1 的 DSO(Decision Support Objects – 互操作程序集)命名空间和 .NET Framework 2.0 的 AMO 命名空间以编程方式完成。

RefreshSalesCube 是一个简单的控制台应用程序,它

  • 连接到本地 Analysis Server
    DSO.ServerClass dsoServer = new ServerClass();
    dsoServer.Connect("localhost");
  • 查找 TREE_DB 数据库
    DSO.OlapCollection coll = (DSO.OlapCollection)dsoServer.MDStores;
    DSO.Database dsoDB = (DSO.Database)coll.Item("TREE_DB");
  • 检索 Sales 数据 Cube
    coll = (DSO.OlapCollection)dsoDB.MDStores;
    DSO.Cube dsoCube = (DSO.Cube)coll.Item("Sales");
  • 检索 TREE_OLAP 维度
    coll = (DSO.OlapCollection)dsoDB.Dimensions;
    DSO.Dimension dsoDim = (DSO.Dimension)coll.Item("TREE_OLAP");
  • 重新创建并重新处理维度,分离 Cube 的共享维度,删除维度级别,并基于新的 TREE_OLAP 表的列重新创建新级别
    dsoCube.Dimensions.Remove(dsoDim.Name);
    
    int i = dsoDim.Levels.Count;
    while(i > 0)
    {
        dsoDim.Levels.Remove(i);
        i--;
    }
    
    SqlConnection cnn = new SqlConnection(
      ConfigurationSettings.AppSettings["ConnectionString"]);
    cnn.Open();
    SqlDataAdapter da = new SqlDataAdapter("SELECT TOP" + 
                             " 0 * FROM TREE_OLAP", cnn);
    DataTable dt = new DataTable();
    da.Fill(dt);
    cnn.Close();
    
    int nColumns = dt.Columns.Count;
    int adVarChar = 200;
    for(i = 1; i < nColumns; i++)
    {
        DSO.Level lvl = (DSO.Level)dsoDim.Levels.AddNew(
                         dt.Columns[i].ColumnName, 
                         DSO.SubClassTypes.sbclsRegular);
        lvl.MemberKeyColumn = "\"dbo\".\"TREE_OLAP\".\"" + 
                              dt.Columns[i].ColumnName + "\"";
        lvl.ColumnType = (short)adVarChar;
        lvl.ColumnSize = 100;
    }
    dsoDim.Process(DSO.ProcessTypes.processFull);
  • 重新处理 Cube,添加维度并重新创建事实表和维度表之间的连接
    DSO.Dimension dim = (DSO.Dimension)
       dsoCube.Dimensions.AddNew("TREE_OLAP", 
       DSO.SubClassTypes.sbclsRegular);
    dim = dsoDim;
    
    dsoCube.JoinClause = "\"dbo\".\"Sales\".\"TREE_ID\" =" + 
                         " \"dbo\".\"TREE_OLAP\".\"ID\" AND" + 
                         " \"dbo\".\"Sales\".\"TIME_ID\" =" + 
                         " \"dbo\".\"TIME_BY_DAY\".\"ID\"";
    
    dsoCube.Update();
    
    dsoCube.Process(DSO.ProcessTypes.processFull);

更新销售 Cube 和 TREE_OLAP 维度

让我们向 TREE 表添加一个新记录,这将增加层次结构的级别。

INSERT INTO TREE(ID, P_ID, NAME) VALUES(21, 20, 'MamaW')

新记录将有一个对应项,在事实表中,在 2006 年的某一天,例如 '2006-07-18',SALES_VALUES 的值为 1000。

DECLARE @TIME_ID INT
SELECT @TIME_ID = ID FROM TIME_BY_DAY WHERE T_DATE = '2006-07-18'
INSERT INTO SALES VALUES(21, @TIME_ID, 1000)

我们可以直接重新创建 TREE_OLAP

EXEC TREE_2_OLAP 1

并运行 *RefreshSalesCube* 应用程序。

新的 Cube 数据可视化将反映这些更改,而无需手动更新。

Sales cube data visualisation after structure and data change

结论

通过在 OLTP 数据库上实现该功能,并使用少量代码重新创建和重新处理 Cube,结构转换和数据更改对于消耗 Cube 数据的报表客户端来说是透明的。该过程可以使用 SQL Server 作业自动执行和计划,从而提高时间和工作效率。

© . All rights reserved.