65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 Cassandra 进行 DotNet 编程

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.94/5 (9投票s)

2014 年 4 月 13 日

CPOL

9分钟阅读

viewsIcon

57907

downloadIcon

954

一篇关于使用 Cassandra 存储进行 DotNet 编程的文章

大数据

这是什么意思?

公司/行业一直在寻求最大限度地利用信息来提升业务能力。它们产生了大量以多种不同形式快速生成的数据,如生产数据库、交易历史、网络流量日志、在线视频、社交媒体互动等。2001 年,道格·兰尼 (Doug Laney) 最初提出了“三个 V”的概念——体量 (volume)、速度 (velocity) 和多样性 (variety)。

  • 体量 - 数据的绝对数量
  • 速度 - 数据处理的速度
  • 多样性 - 数据的种类数量

3Vs

我们为什么需要它?

在过去的十年里,IT 数据呈指数级增长。我们生活在大数据时代。很难衡量电子存储的总数据量,但 IDC 估计 2006 年“数字宇宙”的大小为 0.18 泽字节,并预测到 2011 年将增长十倍,达到 1.8 泽字节。行业基准也显示了同样的情况。

growth

随着数据飞速增长,存储成本 (1GB) 盘的成本也在以类似的比例下降。行业数据显示,自 1980 年以来,该图表显示了这一点。它始于 200 万美元;现在不到 10 美分。

storecost

大数据有助于应用程序开发团队无缝适应现代技术概念

  • 高可用性
  • 横向扩展架构
  • 故障转移恢复
  • 高度分布式

在技术革命的推动下,企业更加关注其用例中的供需关系。随着数据量从 8 EB 增长到 8K EB,以及硬盘成本从 200 万美元下降到 2 美分,企业正被推向大数据技术的实施。

它如何分类?

已经实现了四种不同的数据模型。
1. 列族或宽列存储
2. 文档存储
3. 键/值存储
4. 图形数据存储

1. 列族或宽列存储

示例:Cassandra, HBase
典型用途:分布式数据存储

列族基本上扩展了典型的键/值对存储,并提供了两级嵌套。嵌套的键/值对称为列族存储中的列。每个列都可以与一个键分组,从而提供超级列功能。典型的用例将是具有大量读/写操作的应用程序。

2. 文档存储

示例:CouchDB, MongoDBv
典型用途:Web 应用程序。

基于文档的数据模型用于存储和检索面向文档或半结构化的信息,这在基于 Web 的应用程序中很常见。通常,所有现代的基于文档的数据模型都遵循 XML、JSON 或 BSON 模型,这些模型易于与数据模型映射。它还使 API 能够轻松地与数据库交互,因为大多数现代语言都内置支持这些格式。

3. 键/值存储

示例:Membase, Redis

典型用途:分布式哈希表、缓存

键/值存储功能类似于典型的哈希表,它将值与键相关联。这将允许创建无模式的存储。当读操作多于写操作时,它将发挥最佳作用。例如,我们每 2 分钟需要显示论坛上的最新帖子,因此有意义的做法是每 2 分钟运行一个后台作业,将其存储为键值对,然后从那里读取数据。许多内容密集型网站使用内存键/值对存储系统。

4. 图数据库

示例:Neo4J, InfoGrid

典型用途:社交网络、推荐

图数据模型通常遵循图算法,以节点和边的方式存储数据。图数据库最适合我们遇到递归数据模型的情况。每个节点都将具有以下属性:节点属性、关系(边)。

type

任务

目标

以一个问题陈述为例,解释如何使用大数据 Cassandra 存储构建 .NET 应用程序。在此,我们将使用两个事务系统——借记 (Debit)、贷记 (Credit) 和两个参考系统——客户 (Customer)、汇率 (ForexRate) 进行简单的配置文件计算。

Scope

传统

针对给定的问题,使用关系数据存储开发了应用程序。关系数据库模型最早由 Edgar F. Codd 在 1969 年底提出。关系数据模型的基本假设是,数据表示为数学 n 元关系,它是笛卡尔积的子集。数据通过集合论的关系代数进行访问,并通过主键和外键等约束来保证一致性。

在当前的数据湖世界中,随着行业数据的增长,传统的数据存储方法将无法扩展。

目标状态

本文将讨论如何使用大数据 Cassandra 后端存储而不是传统方法来构建 .NET 应用程序。本文从架构设计到 .NET 代码实现,都解决了这个问题。

架构

基础结构

由于它采用三层架构,因此基础设施需要表示层、业务层和存储层(Cassandra)。借助高度可用的点对点集群模型的优势,Cassandra 层使用 2 节点集群构建。

Infra

业务层和存储层使用名为 CassandraSharp 的大数据 Cassandra 连接器连接。您可以在 GitHub 参考 上找到有关 CassandraSharp 的更多信息。

逻辑

逻辑架构定义了提供业务需求所需的活动和功能的流程。逻辑架构独立于技术和实现。

Logic

在我们的任务中,功能被划分为 6 个类别。控制台登录是输入层,结果是输出。全局数据容器是跨应用程序的数据持有者。关键功能涵盖在剩余的 3 个区域:加载器 (Loader)、业务引擎 (BusinessEngine) 和数据访问 (DataAccess)。

加载器模块在初始 (AppInit) 过程中加载事务、参考和业务规则。业务引擎是根据输入选择计算规则的业务层。数据访问是数据连接层,用于将信息加载/存储到 Cassandra 存储中。

Data

给定的问题陈述涉及 2 个事务数据和 2 个参考数据。它们分别标记为借记 (Debit)、贷记 (Credit) 和客户 (Customer)、汇率 (ForexRate)。下图描绘了这一点。

Data

实现

针对给定的问题陈述,我们已经涵盖了目标、基础设施、逻辑和数据架构。现在让我们使用 Cassandra 作为数据存储,深入探讨使用 .NET 编程的实际实现。

Cassandra 查询

由于我们将 Cassandra 作为存储,因此 Cassandra 表(列族)会根据我们的数据模型在后台创建。创建 Cassandra 表(列族)的实际 CQL(Cassandra 查询语言)如下所示。

CREATE TABLE Debit (
Trans_id int PRIMARY KEY,
Customer_id int,
Trans_amount varchar,
Trans_date varchar
)

CREATE TABLE Customer (
Customer_id int PRIMARY KEY,
Name varchar,
Location varchar
)

Cassandra 连接器

传统上,我们知道 ADO.NET 为 SQL Server 和 XML 等数据源提供一致的访问,以及通过 OLE DB 和 ODBC 暴露的数据源。因此,ADO.NET 将数据访问与数据操作分离成离散的组件,这些组件可以单独或协同使用。ADO.NET 包含 .NET Framework 数据提供程序,用于连接数据库、执行命令和检索结果。

同样,可以使用一个名为 CassandraSharp 的开源工具以编程方式连接大数据存储 Cassandra。它是 Apache Cassandra 为高性能 .NET 驱动程序提供的贡献的一部分。命名空间 CassandraSharp 包含 ClusterManager、TransportConfig、ClusterConfig、BehaviourConfig 等。GitHub 参考页面可在 https://github.com/pchalamet/cassandra-sharp 获得。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using Apache.Cassandra;
using CassandraSharp;
using CassandraSharp.Config;

namespace DataAccess
{
    public abstract class BaseDataAccess : IDisposable
    {
        private string[] myClusters;
        private int myPort;

        public BaseDataAccess(string[] clusters, int port)
        {
            myClusters = clusters;
            myPort = port;
        }

        protected ICluster GetCluster()
        {
            CassandraSharpConfig config = new CassandraSharpConfig();
            ClusterConfig clusterConfig = new ClusterConfig();
            TransportConfig transConfig = new TransportConfig();            
            clusterConfig.Name = "TestCassandra";
            transConfig.Port = myPort;
            clusterConfig.Transport = new TransportConfig();

            EndpointsConfig endPointConfig = new EndpointsConfig();
            endPointConfig.Servers = myClusters;
            endPointConfig.Snitch = SnitchType.Simple;
            endPointConfig.Strategy = EndpointStrategy.Nearest;

            BehaviorConfig behaveConfig = new BehaviorConfig();
            behaveConfig.KeySpace = ConfigEntries.DefaultDatabase;
            if (!String.IsNullOrWhiteSpace(ConfigEntries.UserName)) behaveConfig.User = ConfigEntries.UserName;
            if (!String.IsNullOrWhiteSpace(ConfigEntries.Password)) behaveConfig.Password = ConfigEntries.Password;
            behaveConfig.ReadConsistencyLevel = Apache.Cassandra.ConsistencyLevel.ONE;
            behaveConfig.WriteConsistencyLevel = Apache.Cassandra.ConsistencyLevel.ONE;

            clusterConfig.Transport = transConfig;
            clusterConfig.Endpoints = endPointConfig;
            clusterConfig.BehaviorConfig = behaveConfig;

            config.Clusters = new ClusterConfig[] { clusterConfig };
            
            //We need to ensure that the connection is not initialized before configuring...
            ClusterManager.Shutdown();
            
            ClusterManager.Configure(config);

            ICluster cluster = ClusterManager.GetCluster("TestCassandra");
            return cluster;
        }

        protected DataTable ConvertCqlResultToDataTable(CqlResult result, string tableName)
        {
            DataCommon common = new DataCommon();
            DataTable store = common.GetSchema(result, tableName);
            return PopulateData(result, common, store);
        }

        private DataTable PopulateData(CqlResult result, DataCommon common, DataTable store)
        {
            string columnName = string.Empty;
            foreach (CqlRow row in result.Rows)
            {
                DataRow dataRow = store.NewRow();
                foreach (Column column in row.Columns)
                {
                    columnName = common.GetValue<string>(column.Name);
                    dataRow[columnName] = common.GetValue(store.Columns[columnName], column.Value);
                }
                store.Rows.Add(dataRow);
            }
            return store;
        }

        public void Dispose()
        {
            ClusterManager.Shutdown();
        }
    }
}

在我们的 DataAccess 对象中,GetCluster 方法从应用程序配置文件中检索 Cassandra 集群的详细信息。它涵盖了集群的完整详细信息,如服务器地址、用户凭据、一致性级别、端点策略等。

我们需要一个通用方法来根据给定的 Cassandra 表名以 DataTable 的形式获取结果。ConvertCqlResultToDataTable 方法满足此要求。

PopulateData 方法是前一个方法的内部方法。PopulateData 使用元数据读取 Cassandra 表的每一行和每一列;然后将结果以 DataTable 格式返回。

数据类型访问

.NET 框架和 Cassandra 存储数据类型的表示方式不同。本节介绍这两个技术数据类型的同步。Cassandra 按列存储所有内容,这些列内部由三个属性组成:

属性 类型
名称 CompareWith 类型
二进制
时间戳 64 位整数

带 CompareWith 类型的名称在配置中设置,可以是 ASCII、UTF8、LexicalUUID、TimeUUID、Long 或 Bytes。换句话说,在 .NET 世界中,它们可以是 string、Guid、DateTime、long 或 byte[]。Value 只能是 Bytes 或 byte[] 类型。Timestamp 用于 Cassandra 服务器之间的同步,不应直接控制。下图描绘了设置并保存列的 Value 属性时发生的情况。

DataType

从您将属性设置为所选类型到它在 Cassandra 中保存,它会经历两个您可能不知道的步骤,首先,类型被序列化并存储在 Fluent Cassandra 的灵活 BytesType 中,该类型足够智能,可以了解如何将常见的运行时类型序列化为二进制,这样您作为开发人员就无需担心低级别地与 Cassandra 数据库交互。这个智能类型系统也是 ASCII、UTF8、LexicalUUID、TimeUUID、Long 和 Bytes 类型的主要驱动程序,它们也有助于正确序列化列的 Name 属性。

有了这些概念,就创建了下面的 DataCommon 类来处理 .NET 和 Cassandra 存储之间的所有数据类型。

namespace DataAccess
{
    internal class DataCommon
    {
        internal DataTable GetSchema(CqlResult result, string tableName)
        {
            if (result != null && result.Type == CqlResultType.ROWS)
            {
                return BuildTable(result.Schema, tableName);
            }
            else throw new ArgumentNullException("result", "'result' parameter must not be empty and it should contain atleast one row");
        }

        internal DateTime GetDate(byte[] value)
        {
            if (BitConverter.IsLittleEndian) Array.Reverse(value);
            return GetDateTimeFromLong(BitConverter.ToInt64(value, 0));
        }

        internal string GetName(byte[] value)
        {
            return GetValue<string>(value);
        }

        static IDictionary<string,>> dataProcessors;
        private IDictionary<string,>> GetDataProcessors()
        {
            if (dataProcessors == null)
            {
                //TODO: More data type processors needs to be added.
                dataProcessors = new Dictionary<string,>>();
                dataProcessors["string"] = (byteValue) => GetValue<string>(byteValue);
                dataProcessors["decimal"] = (byteValue) => GetIntValue(byteValue);
                dataProcessors["double"] = (byteValue) => GetValue(byteValue);
                dataProcessors["bool"] = (byteValue) => GetValue<bool>(byteValue);
                dataProcessors["int"] = (byteValue) => GetIntValue(byteValue);
                dataProcessors["long"] = (byteValue) => GetValue<long>(byteValue);
                dataProcessors["datetime"] = (byteValue) => GetDate(byteValue);
            }
            return dataProcessors;
        }

        internal object GetValue(DataColumn column, byte[] value)
        {
            return GetDataProcessors()[column.DataType.Name.ToLower()](value);
        }


        internal decimal GetDecimalValue(byte[] value)
        {
            //check that it is even possible to convert the array
            if (value.Count() != 16)
                throw new Exception("A decimal must be created from exactly 16 bytes");
            //make an array to convert back to int32
            Int32[] bits = new Int32[4];
            for (int i = 0; i <= 15; i += 4)
            {
                //convert every 4 bytes into an int32
                bits[i / 4] = BitConverter.ToInt32(value, i);
            }
            return new decimal(bits);
        }

        internal double GetValue(byte[] value)
        {
            if (BitConverter.IsLittleEndian)
                Array.Reverse(value); //need the bytes in the reverse order
            return BitConverter.ToDouble(value, 0);
        }

        internal int GetIntValue(byte[] value)
        {
            if (BitConverter.IsLittleEndian)
                Array.Reverse(value); //need the bytes in the reverse order
            return BitConverter.ToInt32(value, 0);
        }

        internal T GetValue<t>(byte[] value)
        {
            return (T)Convert.ChangeType(Encoding.Default.GetString(value), typeof(T));
        }

        internal long GetDateTimeInLong(DateTime value)
        {
            DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
            TimeSpan elapsedTime = value - Epoch;
            return (long)elapsedTime.TotalSeconds;
        }

        internal DateTime GetDateTimeFromLong(long value)
        {
            return new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc).AddSeconds(Math.Round(value * 1.0));
        }

        private DataTable BuildTable(CqlMetadata metadata, string tableName)
        {
            DataTable dataStore = new DataTable();

            foreach (KeyValuePair column in metadata.Value_types)
            {
                DataColumn dataColumn = new DataColumn();
                dataColumn.ColumnName = GetValue(column.Key);
                dataColumn.DataType = GetColumnType(column.Value);
                dataStore.Columns.Add(dataColumn);
            }
            return dataStore;
        }

        static IDictionary typeProvider;
        private IDictionary GetCqlToDotNetTypeProviders()
        {
            if (typeProvider == null)
            {
                typeProvider = new Dictionary();
                typeProvider["AsciiType"] = typeof(string);
                typeProvider["BytesType"] = typeof(byte[]);
                typeProvider["BooleanType"] = typeof(bool);
                typeProvider["CounterColumnType"] = typeof(int);
                typeProvider["DateType"] = typeof(DateTime);
                typeProvider["DecimalType"] = typeof(decimal);
                typeProvider["DoubleType"] = typeof(double);
                typeProvider["DynamicCompositeType"] = typeof(string);
                typeProvider["FloatType"] = typeof(decimal);
                typeProvider["IntegerType"] = typeof(int);
                typeProvider["LexicalUUIDType"] = typeof(Guid);
                typeProvider["LongType"] = typeof(long);
                typeProvider["TimeUUIDType"] = typeof(DateTime);
                typeProvider["UTF8Type"] = typeof(string);
                typeProvider["UUIDType"] = typeof(Guid);
            }
            return typeProvider;
        }

        private Type GetColumnType(string cqlType)
        {
            return GetCqlToDotNetTypeProviders()[cqlType];
        }
    }
}

类型转换就足以告诉 BytesType 对象如何将二进制数据反序列化为 .NET 可理解的运行时类型。这一切都通过大量的运算符魔法完成,但结果是一样的。您从数据库中获取的数据类型与您输入到数据库中的类型相同。

数据访问对象

在业务层实现方面,DAO (Data Access Object) 是连接和处理的关键。在我们的练习中,创建了如下的 2 个事务 DAO 和 2 个参考 DAO。

namespace DataAccess
{
    public class CreditDAO : BaseDataAccess, ISelectAllData, ISelectData
    {
        public CreditDAO()
            : base(ConfigEntries.Clusters, ConfigEntries.Port)
        { }

        DataTable ISelectData.GetSpecificData(string query, object[] parameters)
        {
            CqlResult result = base.GetCluster().ExecuteCql(string.Format(query, parameters));
            return ConvertCqlResultToDataTable(result, "Credit");
        }

        DataTable ISelectAllData.GetData()
        {
            CqlResult result = base.GetCluster().ExecuteCql(DbConstants.SelectCreditData);
            return ConvertCqlResultToDataTable(result, "Credit");
        }
    }
}

DAO 通过 BaseDataAccess 对象进行扩展。SelectData 接口根据给定参数检索特定数据。而 SelectAllData 接口则获取特定 DAO 的全部数据。

数据公共

作为应用程序中的公共/全局配置条目,ConfigEntries 对象在 Common 命名空间下创建。我们的 ConfigEntries 类具有集群服务器、端口、默认数据库、用户凭据等通用属性。

namespace Common
{
    public class ConfigEntries
    {
        public static string[] Clusters = ConfigurationManager.AppSettings["Clusters"].Split(new string[] { "|" }, StringSplitOptions.RemoveEmptyEntries);
        public static int Port = Convert.ToInt32(ConfigurationManager.AppSettings["Port"]);
        public static string DefaultDatabase = ConfigurationManager.AppSettings["DefaultDatabase"];
        public static string UserName = ConfigurationManager.AppSettings["UserName"];
        public static string Password = ConfigurationManager.AppSettings["Password"];
    }
}

通过编写所有这些核心模块代码,我们的目标已根据架构设计和实现的基线得以实现。最终,它有助于/指导我们使用 Cassandra 存储来构建端到端的应用程序。.NET 技术。

关注点

希望您会觉得了解大数据-Cassandra 和 .NET 编程概念以及存储连接理念很有趣,而不是仅仅进行传统的代码级实现。

历史

  • 版本 1.0 - 初始版本。
© . All rights reserved.