使用 Cassandra 进行 DotNet 编程






4.94/5 (9投票s)
一篇关于使用 Cassandra 存储进行 DotNet 编程的文章
大数据
这是什么意思?
公司/行业一直在寻求最大限度地利用信息来提升业务能力。它们产生了大量以多种不同形式快速生成的数据,如生产数据库、交易历史、网络流量日志、在线视频、社交媒体互动等。2001 年,道格·兰尼 (Doug Laney) 最初提出了“三个 V”的概念——体量 (volume)、速度 (velocity) 和多样性 (variety)。
- 体量 - 数据的绝对数量
- 速度 - 数据处理的速度
- 多样性 - 数据的种类数量
我们为什么需要它?
在过去的十年里,IT 数据呈指数级增长。我们生活在大数据时代。很难衡量电子存储的总数据量,但 IDC 估计 2006 年“数字宇宙”的大小为 0.18 泽字节,并预测到 2011 年将增长十倍,达到 1.8 泽字节。行业基准也显示了同样的情况。大数据有助于应用程序开发团队无缝适应现代技术概念
- 高可用性
- 横向扩展架构
- 故障转移恢复
- 高度分布式
在技术革命的推动下,企业更加关注其用例中的供需关系。随着数据量从 8 EB 增长到 8K EB,以及硬盘成本从 200 万美元下降到 2 美分,企业正被推向大数据技术的实施。
它如何分类?
已经实现了四种不同的数据模型。
1. 列族或宽列存储
2. 文档存储
3. 键/值存储
4. 图形数据存储
示例:Cassandra, HBase
典型用途:分布式数据存储
列族基本上扩展了典型的键/值对存储,并提供了两级嵌套。嵌套的键/值对称为列族存储中的列。每个列都可以与一个键分组,从而提供超级列功能。典型的用例将是具有大量读/写操作的应用程序。
2. 文档存储示例:CouchDB, MongoDBv
典型用途:Web 应用程序。
基于文档的数据模型用于存储和检索面向文档或半结构化的信息,这在基于 Web 的应用程序中很常见。通常,所有现代的基于文档的数据模型都遵循 XML、JSON 或 BSON 模型,这些模型易于与数据模型映射。它还使 API 能够轻松地与数据库交互,因为大多数现代语言都内置支持这些格式。
3. 键/值存储示例:Membase, Redis
典型用途:分布式哈希表、缓存
键/值存储功能类似于典型的哈希表,它将值与键相关联。这将允许创建无模式的存储。当读操作多于写操作时,它将发挥最佳作用。例如,我们每 2 分钟需要显示论坛上的最新帖子,因此有意义的做法是每 2 分钟运行一个后台作业,将其存储为键值对,然后从那里读取数据。许多内容密集型网站使用内存键/值对存储系统。
示例:Neo4J, InfoGrid
典型用途:社交网络、推荐
图数据模型通常遵循图算法,以节点和边的方式存储数据。图数据库最适合我们遇到递归数据模型的情况。每个节点都将具有以下属性:节点属性、关系(边)。
任务
目标
以一个问题陈述为例,解释如何使用大数据 Cassandra 存储构建 .NET 应用程序。在此,我们将使用两个事务系统——借记 (Debit)、贷记 (Credit) 和两个参考系统——客户 (Customer)、汇率 (ForexRate) 进行简单的配置文件计算。
传统
针对给定的问题,使用关系数据存储开发了应用程序。关系数据库模型最早由 Edgar F. Codd 在 1969 年底提出。关系数据模型的基本假设是,数据表示为数学 n 元关系,它是笛卡尔积的子集。数据通过集合论的关系代数进行访问,并通过主键和外键等约束来保证一致性。
在当前的数据湖世界中,随着行业数据的增长,传统的数据存储方法将无法扩展。
目标状态
本文将讨论如何使用大数据 Cassandra 后端存储而不是传统方法来构建 .NET 应用程序。本文从架构设计到 .NET 代码实现,都解决了这个问题。
架构
基础结构
由于它采用三层架构,因此基础设施需要表示层、业务层和存储层(Cassandra)。借助高度可用的点对点集群模型的优势,Cassandra 层使用 2 节点集群构建。
业务层和存储层使用名为 CassandraSharp 的大数据 Cassandra 连接器连接。您可以在 GitHub 参考 上找到有关 CassandraSharp 的更多信息。
逻辑
逻辑架构定义了提供业务需求所需的活动和功能的流程。逻辑架构独立于技术和实现。
在我们的任务中,功能被划分为 6 个类别。控制台登录是输入层,结果是输出。全局数据容器是跨应用程序的数据持有者。关键功能涵盖在剩余的 3 个区域:加载器 (Loader)、业务引擎 (BusinessEngine) 和数据访问 (DataAccess)。
加载器模块在初始 (AppInit) 过程中加载事务、参考和业务规则。业务引擎是根据输入选择计算规则的业务层。数据访问是数据连接层,用于将信息加载/存储到 Cassandra 存储中。
Data
给定的问题陈述涉及 2 个事务数据和 2 个参考数据。它们分别标记为借记 (Debit)、贷记 (Credit) 和客户 (Customer)、汇率 (ForexRate)。下图描绘了这一点。
实现
针对给定的问题陈述,我们已经涵盖了目标、基础设施、逻辑和数据架构。现在让我们使用 Cassandra 作为数据存储,深入探讨使用 .NET 编程的实际实现。
Cassandra 查询
由于我们将 Cassandra 作为存储,因此 Cassandra 表(列族)会根据我们的数据模型在后台创建。创建 Cassandra 表(列族)的实际 CQL(Cassandra 查询语言)如下所示。
CREATE TABLE Debit (
Trans_id int PRIMARY KEY,
Customer_id int,
Trans_amount varchar,
Trans_date varchar
)
CREATE TABLE Customer (
Customer_id int PRIMARY KEY,
Name varchar,
Location varchar
)
Cassandra 连接器
传统上,我们知道 ADO.NET 为 SQL Server 和 XML 等数据源提供一致的访问,以及通过 OLE DB 和 ODBC 暴露的数据源。因此,ADO.NET 将数据访问与数据操作分离成离散的组件,这些组件可以单独或协同使用。ADO.NET 包含 .NET Framework 数据提供程序,用于连接数据库、执行命令和检索结果。
同样,可以使用一个名为 CassandraSharp 的开源工具以编程方式连接大数据存储 Cassandra。它是 Apache Cassandra 为高性能 .NET 驱动程序提供的贡献的一部分。命名空间 CassandraSharp 包含 ClusterManager、TransportConfig、ClusterConfig、BehaviourConfig 等。GitHub 参考页面可在 https://github.com/pchalamet/cassandra-sharp 获得。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Data;
using Apache.Cassandra;
using CassandraSharp;
using CassandraSharp.Config;
namespace DataAccess
{
public abstract class BaseDataAccess : IDisposable
{
private string[] myClusters;
private int myPort;
public BaseDataAccess(string[] clusters, int port)
{
myClusters = clusters;
myPort = port;
}
protected ICluster GetCluster()
{
CassandraSharpConfig config = new CassandraSharpConfig();
ClusterConfig clusterConfig = new ClusterConfig();
TransportConfig transConfig = new TransportConfig();
clusterConfig.Name = "TestCassandra";
transConfig.Port = myPort;
clusterConfig.Transport = new TransportConfig();
EndpointsConfig endPointConfig = new EndpointsConfig();
endPointConfig.Servers = myClusters;
endPointConfig.Snitch = SnitchType.Simple;
endPointConfig.Strategy = EndpointStrategy.Nearest;
BehaviorConfig behaveConfig = new BehaviorConfig();
behaveConfig.KeySpace = ConfigEntries.DefaultDatabase;
if (!String.IsNullOrWhiteSpace(ConfigEntries.UserName)) behaveConfig.User = ConfigEntries.UserName;
if (!String.IsNullOrWhiteSpace(ConfigEntries.Password)) behaveConfig.Password = ConfigEntries.Password;
behaveConfig.ReadConsistencyLevel = Apache.Cassandra.ConsistencyLevel.ONE;
behaveConfig.WriteConsistencyLevel = Apache.Cassandra.ConsistencyLevel.ONE;
clusterConfig.Transport = transConfig;
clusterConfig.Endpoints = endPointConfig;
clusterConfig.BehaviorConfig = behaveConfig;
config.Clusters = new ClusterConfig[] { clusterConfig };
//We need to ensure that the connection is not initialized before configuring...
ClusterManager.Shutdown();
ClusterManager.Configure(config);
ICluster cluster = ClusterManager.GetCluster("TestCassandra");
return cluster;
}
protected DataTable ConvertCqlResultToDataTable(CqlResult result, string tableName)
{
DataCommon common = new DataCommon();
DataTable store = common.GetSchema(result, tableName);
return PopulateData(result, common, store);
}
private DataTable PopulateData(CqlResult result, DataCommon common, DataTable store)
{
string columnName = string.Empty;
foreach (CqlRow row in result.Rows)
{
DataRow dataRow = store.NewRow();
foreach (Column column in row.Columns)
{
columnName = common.GetValue<string>(column.Name);
dataRow[columnName] = common.GetValue(store.Columns[columnName], column.Value);
}
store.Rows.Add(dataRow);
}
return store;
}
public void Dispose()
{
ClusterManager.Shutdown();
}
}
}
在我们的 DataAccess 对象中,GetCluster 方法从应用程序配置文件中检索 Cassandra 集群的详细信息。它涵盖了集群的完整详细信息,如服务器地址、用户凭据、一致性级别、端点策略等。
我们需要一个通用方法来根据给定的 Cassandra 表名以 DataTable 的形式获取结果。ConvertCqlResultToDataTable 方法满足此要求。
PopulateData 方法是前一个方法的内部方法。PopulateData 使用元数据读取 Cassandra 表的每一行和每一列;然后将结果以 DataTable 格式返回。
数据类型访问
.NET 框架和 Cassandra 存储数据类型的表示方式不同。本节介绍这两个技术数据类型的同步。Cassandra 按列存储所有内容,这些列内部由三个属性组成:
属性 | 类型 |
名称 | CompareWith 类型 |
值 | 二进制 |
时间戳 | 64 位整数 |
带 CompareWith 类型的名称在配置中设置,可以是 ASCII、UTF8、LexicalUUID、TimeUUID、Long 或 Bytes。换句话说,在 .NET 世界中,它们可以是 string、Guid、DateTime、long 或 byte[]。Value 只能是 Bytes 或 byte[] 类型。Timestamp 用于 Cassandra 服务器之间的同步,不应直接控制。下图描绘了设置并保存列的 Value 属性时发生的情况。
从您将属性设置为所选类型到它在 Cassandra 中保存,它会经历两个您可能不知道的步骤,首先,类型被序列化并存储在 Fluent Cassandra 的灵活 BytesType 中,该类型足够智能,可以了解如何将常见的运行时类型序列化为二进制,这样您作为开发人员就无需担心低级别地与 Cassandra 数据库交互。这个智能类型系统也是 ASCII、UTF8、LexicalUUID、TimeUUID、Long 和 Bytes 类型的主要驱动程序,它们也有助于正确序列化列的 Name 属性。
有了这些概念,就创建了下面的 DataCommon 类来处理 .NET 和 Cassandra 存储之间的所有数据类型。
namespace DataAccess
{
internal class DataCommon
{
internal DataTable GetSchema(CqlResult result, string tableName)
{
if (result != null && result.Type == CqlResultType.ROWS)
{
return BuildTable(result.Schema, tableName);
}
else throw new ArgumentNullException("result", "'result' parameter must not be empty and it should contain atleast one row");
}
internal DateTime GetDate(byte[] value)
{
if (BitConverter.IsLittleEndian) Array.Reverse(value);
return GetDateTimeFromLong(BitConverter.ToInt64(value, 0));
}
internal string GetName(byte[] value)
{
return GetValue<string>(value);
}
static IDictionary<string,>> dataProcessors;
private IDictionary<string,>> GetDataProcessors()
{
if (dataProcessors == null)
{
//TODO: More data type processors needs to be added.
dataProcessors = new Dictionary<string,>>();
dataProcessors["string"] = (byteValue) => GetValue<string>(byteValue);
dataProcessors["decimal"] = (byteValue) => GetIntValue(byteValue);
dataProcessors["double"] = (byteValue) => GetValue(byteValue);
dataProcessors["bool"] = (byteValue) => GetValue<bool>(byteValue);
dataProcessors["int"] = (byteValue) => GetIntValue(byteValue);
dataProcessors["long"] = (byteValue) => GetValue<long>(byteValue);
dataProcessors["datetime"] = (byteValue) => GetDate(byteValue);
}
return dataProcessors;
}
internal object GetValue(DataColumn column, byte[] value)
{
return GetDataProcessors()[column.DataType.Name.ToLower()](value);
}
internal decimal GetDecimalValue(byte[] value)
{
//check that it is even possible to convert the array
if (value.Count() != 16)
throw new Exception("A decimal must be created from exactly 16 bytes");
//make an array to convert back to int32
Int32[] bits = new Int32[4];
for (int i = 0; i <= 15; i += 4)
{
//convert every 4 bytes into an int32
bits[i / 4] = BitConverter.ToInt32(value, i);
}
return new decimal(bits);
}
internal double GetValue(byte[] value)
{
if (BitConverter.IsLittleEndian)
Array.Reverse(value); //need the bytes in the reverse order
return BitConverter.ToDouble(value, 0);
}
internal int GetIntValue(byte[] value)
{
if (BitConverter.IsLittleEndian)
Array.Reverse(value); //need the bytes in the reverse order
return BitConverter.ToInt32(value, 0);
}
internal T GetValue<t>(byte[] value)
{
return (T)Convert.ChangeType(Encoding.Default.GetString(value), typeof(T));
}
internal long GetDateTimeInLong(DateTime value)
{
DateTime Epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
TimeSpan elapsedTime = value - Epoch;
return (long)elapsedTime.TotalSeconds;
}
internal DateTime GetDateTimeFromLong(long value)
{
return new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc).AddSeconds(Math.Round(value * 1.0));
}
private DataTable BuildTable(CqlMetadata metadata, string tableName)
{
DataTable dataStore = new DataTable();
foreach (KeyValuePair column in metadata.Value_types)
{
DataColumn dataColumn = new DataColumn();
dataColumn.ColumnName = GetValue(column.Key);
dataColumn.DataType = GetColumnType(column.Value);
dataStore.Columns.Add(dataColumn);
}
return dataStore;
}
static IDictionary typeProvider;
private IDictionary GetCqlToDotNetTypeProviders()
{
if (typeProvider == null)
{
typeProvider = new Dictionary();
typeProvider["AsciiType"] = typeof(string);
typeProvider["BytesType"] = typeof(byte[]);
typeProvider["BooleanType"] = typeof(bool);
typeProvider["CounterColumnType"] = typeof(int);
typeProvider["DateType"] = typeof(DateTime);
typeProvider["DecimalType"] = typeof(decimal);
typeProvider["DoubleType"] = typeof(double);
typeProvider["DynamicCompositeType"] = typeof(string);
typeProvider["FloatType"] = typeof(decimal);
typeProvider["IntegerType"] = typeof(int);
typeProvider["LexicalUUIDType"] = typeof(Guid);
typeProvider["LongType"] = typeof(long);
typeProvider["TimeUUIDType"] = typeof(DateTime);
typeProvider["UTF8Type"] = typeof(string);
typeProvider["UUIDType"] = typeof(Guid);
}
return typeProvider;
}
private Type GetColumnType(string cqlType)
{
return GetCqlToDotNetTypeProviders()[cqlType];
}
}
}
类型转换就足以告诉 BytesType 对象如何将二进制数据反序列化为 .NET 可理解的运行时类型。这一切都通过大量的运算符魔法完成,但结果是一样的。您从数据库中获取的数据类型与您输入到数据库中的类型相同。
数据访问对象
在业务层实现方面,DAO (Data Access Object) 是连接和处理的关键。在我们的练习中,创建了如下的 2 个事务 DAO 和 2 个参考 DAO。
namespace DataAccess { public class CreditDAO : BaseDataAccess, ISelectAllData, ISelectData { public CreditDAO() : base(ConfigEntries.Clusters, ConfigEntries.Port) { } DataTable ISelectData.GetSpecificData(string query, object[] parameters) { CqlResult result = base.GetCluster().ExecuteCql(string.Format(query, parameters)); return ConvertCqlResultToDataTable(result, "Credit"); } DataTable ISelectAllData.GetData() { CqlResult result = base.GetCluster().ExecuteCql(DbConstants.SelectCreditData); return ConvertCqlResultToDataTable(result, "Credit"); } } }
DAO 通过 BaseDataAccess 对象进行扩展。SelectData 接口根据给定参数检索特定数据。而 SelectAllData 接口则获取特定 DAO 的全部数据。
数据公共
作为应用程序中的公共/全局配置条目,ConfigEntries 对象在 Common 命名空间下创建。我们的 ConfigEntries 类具有集群服务器、端口、默认数据库、用户凭据等通用属性。
namespace Common { public class ConfigEntries { public static string[] Clusters = ConfigurationManager.AppSettings["Clusters"].Split(new string[] { "|" }, StringSplitOptions.RemoveEmptyEntries); public static int Port = Convert.ToInt32(ConfigurationManager.AppSettings["Port"]); public static string DefaultDatabase = ConfigurationManager.AppSettings["DefaultDatabase"]; public static string UserName = ConfigurationManager.AppSettings["UserName"]; public static string Password = ConfigurationManager.AppSettings["Password"]; } }
通过编写所有这些核心模块代码,我们的目标已根据架构设计和实现的基线得以实现。最终,它有助于/指导我们使用 Cassandra 存储来构建端到端的应用程序。.NET 技术。
关注点
希望您会觉得了解大数据-Cassandra 和 .NET 编程概念以及存储连接理念很有趣,而不是仅仅进行传统的代码级实现。
历史
- 版本 1.0 - 初始版本。