65.9K
CodeProject 正在变化。 阅读更多。
Home

在 Azure 上应用 Lambda 架构

starIconstarIconstarIconstarIconstarIcon

5.00/5 (5投票s)

2017年3月23日

CPOL

11分钟阅读

viewsIcon

28315

使用 Lambda 架构原则和 Microsoft Azure 云设计和开发简单的分析系统

目录

引言

在本文中,我们将首先简要概述 Lambda 架构的设计和用于构建实时数据处理系统的原则。然后,我们将设计一个具有 Lambda 架构所需特性的简单分析系统。我们的分析系统将托管在 Azure 云上,并利用 HDInsight、Azure Redis、Azure Service Bus 等 Azure 服务。之后,我们将把系统部署到云端,并对主要场景进行集成测试。最后,我们将对所采用的设计方法进行总结。

源代码

源代码可在 GitHub 上获取。

Lambda 架构概述

Lambda 架构之所以成立,是因为在构建健壮、容错、可扩展且能提供近实时分析结果的系统时,没有单一的工具或技术能够胜任。Lambda 架构方法不是使用单一工具,而是建议将系统拆分为三个层:批量层、实时层和服务层。每个层使用自己的一套技术,并具有自己独特的属性。

批量层

批量层实际上做了两件事:将新传入的数据追加到称为主数据集的单一存储中,并针对整个主数据集执行长时间、计算密集型的计算。

主数据集可以是一个数据库或分布式文件系统,批量层会将新记录添加进去。批量层从不使用最新的记录覆盖现有记录,而是保留所有记录。这是重要的不可变性属性,它能够可靠地防止错误和数据损坏。

批量层的另一项职责是不断地重新计算整个主数据集,并提供批量视图——一种读取延迟低的结构化数据,可用于回答传入的用户查询。

实时层

实时层在每次出现新数据时都会预先计算传入的数据。与批量层不同,实时层不重新计算整个主数据集,而是使用增量方法。实时层预计算的结果称为实时视图

批量处理与流处理的区别

为了理解批量处理和流处理之间的区别,让我们计算一个数字流的平均值:1, 4, 6, 12, 17。根据描述,批量处理应该将所有流数字持久化到数组 [1, 4, 6, 12, 17] 中,然后计算整个数组的平均值:(1+4+6+12+17) / 5 = 8。

流处理在每次出现新数字时计算平均值。我们可以使用以下公式:realtime_avg = (realtime_avg * n + x)/(n + 1)

其中
realtime_avg - 初始值为 0,并保持输入流的当前平均值
n - 已处理的数字数量
x - 流中的当前传入数字。

对于我们的示例,
当数字 1 出现时,realtime_avg = (0*0+1)/(0+1)=1,n=1
当数字 4 出现时,realtime_avg = (1*1+4)/(1+1)=2.5,n=2
当数字 6 出现时,realtime_avg = (2.5*2+6)/(2+1)=3.66666[6],n=3
当数字 12 出现时,realtime_avg = (3.66666[6]*3+12)/(3+1)=5.75,n=4
当数字 17 出现时,realtime_avg = (5.75*4+17)/(4+1)=8,n=5

正如你所见,最终批量处理和流处理都产生了相同的等于 8 的结果。区别在于批量处理在实时层为每个新传入数字提供中间结果时,它不提供。

服务层

由于批量层通常速度较慢,可能比实时层滞后几分钟甚至几个小时,因此 Lambda 架构还设想了另一个层——服务层,该层合并批量层和实时层计算的结果。这样,服务层就可以提供最新的计算结果。

Lambda 架构的属性

以下属性自然地源于 Lambda 架构设计方法。

  • 健壮性和容错性——系统能够容忍机器故障和人为错误,例如数据损坏。批量视图和实时视图始终可以从主数据集中重新计算。
  • 低延迟读写——Lambda 架构可以在不损害健壮性的前提下实现两者。
  • 可扩展性——所有层都可以独立扩展。
  • 通用性——Lambda 架构可用于大量不同的应用程序。

Meetup 分析演示

既然我们已经了解了 Lambda 架构允许我们获得批量处理和流处理的最佳优势,并且可以有效地用于构建实时数据处理系统。现在,是时候深入了解 Lambda 架构的细节了,我们将尝试实现一个简单的数据分析系统。

首先,我们需要找到一个数据流来分析。通过简单的谷歌搜索,我找到了一个公开的 Meetup 事件流 API(流的视觉化:http://meetup.github.io/stream/rsvpTicker)。Meetup 是一个允许人们组织具有相似兴趣的会议并讨论体育、学习、摄影等话题的服务。该服务提供公开 API,其中包含特定地点参加会议的人员的流式信息。一旦有人通过移动或 Web 应用程序申请参加会议,这些信息几乎可以实时地在流中可用。

现在,我们将设计一个数据处理系统,该系统可以分析有多少人将参加伦敦的一个 Meetup 活动,当然,我们的设计将基于 Lambda 架构。让我们开始设计每个层,并考虑以下示例

在示例中,我们可以看到一个包含四条 Meetup 消息的时间轴——4 位用户决定参加伦敦的活动。

该问题的批量层设计非常直接:系统应将每个用户消息持久化到数据存储中,并持续运行一个作业,按事件 ID 和地点聚合存储中的所有消息。在上面的示例中,聚合在 08:01:00 发生,并生成一个批量视图,其中包括聚合完成的时间和计划参加该事件的人数:08:01:00 和 3。

实时层使用完全增量的方法,只是在每当有新用户申请参加活动时增加访客计数器。但我们不会为每个事件和地点设置一个单独的计数器,而是将传入请求分组到时间段中,并为每个时间段保留计数器。时间段的大小可以根据系统负载而变化。在本例中,我们将时间段大小选择为 1 分钟。从图中可以看出,当 Meetup 分析系统收到查询时,实时层中有两个时间段:第一个 08:00:00-08:01:00 时间段的值为 3,第二个 08:01:00-08:02:00 时间段的值为 1。

为了回答“有多少人将参加伦敦的活动?”这个问题,在 08:02:30 时,服务层只需使用批量层和实时层计算的结果。首先,服务层加载批量视图——“到 08:01:00 为止有 3 人计划参加该活动”,然后服务层需要加载自上次批量聚合(08:01:00)至今(08:02:00)的所有时间段。在本例中,只有一个时间段满足此条件——08:01:00-08:02:00 时间段,其值为 1。通过将批量层的值 3 与实时层的值 1 合并,服务层响应 4——这是关于活动访客数量的最接近实时的信息。

为 Meetup 分析选择 Azure 服务

现在是时候将上一节中描述的想法变为现实了。我们需要为我们系统的每个层选择工具和技术,正如你可能已经猜到的那样,我们将使用 Azure 云和相关的服务。当然,也可以是不同的 Azure 服务组合,在解决特定问题时各有优缺点,但考虑到服务的可靠性、可扩展性、可扩展性以及在 Lambda 架构设计方面的适用性,我选择了以下组合。

基础结构

  • Azure Service Bus (1) - 消息代理,保证向批量层和实时层可靠地传递消息。
  • Service Fabric - 负责在 Azure 实例上发布服务并进行扩展。
  • ARM 模板 - 将 Meetup 分析系统部署到 Azure 云的基础设施。

批量层

  • HDInsight Blob Storage (3) - 分布式文件系统(Hadoop),代表 Meetup 分析的主数据集。
  • Hive (2) - 允许使用类似 SQL 的语法运行 Hadoop 作业并对 HDInsight Blob Storage 中的数据执行密集型聚合。

实时层

  • Redis (4) - 内存中、键值对、NoSQL 数据存储,具有快速的读写操作。

服务层

  • 由简单的 REST 服务(5) 表示,该服务对用户请求公开可用。

 

批量层实现

从上图可以看出,批量层服务监听 Azure Service Bus 主题并将所有传入的消息持久化到 blob 存储容器(MasterDataset 容器)。

 
// gets reference on master dataset container in Azure blob storage
CloudBlobContainer container = blobClient.GetContainerReference(storageConfiguration.MasterDataSetContainerName);

// appends new block blob with unique name to master dataset 
CloudBlockBlob blockBlob = container.GetBlockBlobReference(
$"meetup_{DateTime.UtcNow.ToString("yyyyMMddHHmmss")}_{Guid.NewGuid()}");

// uploads Json Meetup message to the block blob
await blockBlob.UploadTextAsync(JsonConvert.SerializeObject(message));      

每条消息都作为 JSON 文本存储在单独的 blob 实例中。以下 Hive 查询允许将 blob 存储容器中的非结构化数据映射到逻辑 Hive 表——MeetupMessages。

-- references to master dataset on the Azure Blob storage 
DROP TABLE IF EXISTS MasterDataset;
CREATE EXTERNAL TABLE MasterDataset (textcol string) STORED AS TEXTFILE LOCATION '%path-to-master-dataset-in-azure-blob-storage%'; 

-- table with Json records
DROP TABLE IF EXISTS MeetupMessages; 
CREATE EXTERNAL TABLE MeetupMessages
(
  json_body string
)
STORED AS TEXTFILE LOCATION '/a';

-- populates logical Hive table with Json data from Azure Blob storage 
INSERT OVERWRITE TABLE MeetupMessages 
SELECT CONCAT_WS(' ', COLLECT_LIST(textcol)) AS singlelineJSON
FROM (SELECT INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE, textcol FROM MasterDataset DISTRIBUTE BY INPUT__FILE__NAME SORT BY BLOCK__OFFSET__INSIDE__FILE) x
GROUP BY INPUT__FILE__NAME;

现在 MeetupMessages 表包含所有 JSON Meetup 消息,并可作为聚合事件 ID 和地点的消息源。以下 Hive 查询在 Azure Blob Storage 中创建外部表“Result”,并填充了访客人数信息。

-- creates a table associated with blob in Azure Blob storage
-- that contains last aggregation time and number of visitors 
DROP TABLE IF EXISTS Result;
CREATE EXTERNAL TABLE Result (aggTimestamp string, count int)
PARTITIONED BY (rsvp_id string, group_country string, group_city string) 
STORED AS TEXTFILE 
LOCATION '{azure-blob-storage-url}';

-- populates the Result table with result of master dataset aggregation
INSERT OVERWRITE TABLE Result 
PARTITION (rsvp_id, group_country, group_city) 
SELECT ‘{job-execution-time}' as aggTimestamp
      ,Count(*) as count
      ,GET_JSON_OBJECT(MeetupMessages.json_body,'$.rsvp_id') as rsvp_id
      ,GET_JSON_OBJECT(MeetupMessages.json_body,'$.group.group_country') as group_country
      ,GET_JSON_OBJECT(MeetupMessages.json_body,'$.group.group_city') as group_city 
FROM MeetupMessages 
WHERE GET_JSON_OBJECT(MeetupMessages.json_body,'$.mtime') < ‘{job-execution-time}' 
GROUP BY GET_JSON_OBJECT(MeetupMessages.json_body,'$.rsvp_id'),
         GET_JSON_OBJECT(MeetupMessages.json_body,'$.group.group_country'),
         GET_JSON_OBJECT(MeetupMessages.json_body,'$.group.group_city');

实时层实现

开始实现实时层时,我们不仅需要考虑如何保存每一部分新数据,还需要考虑如何查询预先计算的实时视图。这就是为什么我们将更新两个 Redis 有序集。第一个有序集(计数器有序集)将存储带有 Meetup 访客数量的时间段,例如:

有序集

成员分数
-------------------------------------------------------------------------------------
counter_event1_UK_London6358723206000000003
 6358723206000000101

看起来有序集包含了所有必需的分析信息:每个事件和地点的带有访客数量的时间段。拥有另一个有序集的原因是,服务层应该只查询在批量视图计算之后创建的时间段。为了仅过滤最新的时间段,我们可以使用 Redis 的 ZREMRANGEBYLEX 命令,该命令要求有序集中的分数等于 0 才能返回正确的结果。因此,我们将有另一个带有时间段(时间段有序集)的有序集,对其应用 ZREMRANGEBYLEX 命令,并将结果与计数器有序集相交。

正如你所注意到的,计数器和时间段有序集必须始终同步,因此这两个有序集的更新必须包装在 Redis 事务中。最后,我们得到了以下实时层实现:

        public async Task ProcessMessageAsync(MeetupMessage message)
        {
            // ceilings Meetup message timestamp to seconds
            // and considers number of ticks as ID of time basket the message belongs to
            DateTime messageTime = new DateTime(message.Timestamp.Year, message.Timestamp.Month, message.Timestamp.Day, message.Timestamp.Hour, message.Timestamp.Minute, 0);
            long basket = messageTime.Ticks;

            // counter key keeps all baskets with number of visitors in each basket
            string counterKey = $"counter_{message.EventId}:{message.Group.Country}:{message.Group.City}";

            // basket key keeps all basket IDs in Redis without counter
            string basketsKey = $"basket_{message.EventId}:{message.Group.Country}:{message.Group.City}";

            IDatabase db = _multiplexer.GetDatabase(_redisConfiguration.Database);

            // transaction guarantees that update of both sorted sets
            // occurs atomically
            ITransaction transaction = db.CreateTransaction();
            transaction.SortedSetIncrementAsync(counterKey, basket, 1);
            transaction.SortedSetAddAsync(basketsKey, basket, 0);
            await transaction.ExecuteAsync();
        }

服务层实现

服务层是处理查询的公共 API。Meetup 分析服务层可以回答这样的问题:有多少人计划参加特定地点的活动。为了回答这个问题,服务层应该从 Azure Blob Storage 加载最新的批量视图。

      // builds batch view URL from query parameters
      string batchViewKey = $"data/batchviews/rsvp_id={meetupId}/group_country={country}/group_city={city}";
      CloudBlobContainer container = _blobClient.GetContainerReference(batchViewKey);
      CloudBlockBlob blob = container.GetBlockBlobReference(_blobName);
      
      // load batch view result
      string text = blob.DownloadText();

      // batch view text contains two values: the time of last batch view computation and number of visitors
      // so parse both values
      var tuple = text.Split(new[] { "\u0001" }, StringSplitOptions.RemoveEmptyEntries);
      double time = double.Parse(tuple[0]);

      // calculates lastest basket included in the batch view
      long from = time.UnixTimeStampToDateTime().Ticks;

      // number of visitors according to batch layer calculation
      long batchLayerResult = long.Parse(tuple[1]);

并从实时层获取访客数量。

      // key of counter sorted set
      string redisCounterKey = $"counter_{meetupId}:{country}:{city}";
      
      // key of baskets sorted set
      string redisBasketKey = $"basket_{meetupId}:{country}:{city}";

      string resultKey = "analyticsResult";

      var db = _multiplexer.GetDatabase(_redisConfiguration.Database);

      // executes LUA script in Redis that
      // 1. filters baskets from baskets sorted set
      // 2. intersects baskets and counter sorted sets saving intersection result in temporary sorted set
      // 3. sums scores of all basket in temporary sorted set 
      long speedLayerResult = 
           (long)db.ScriptEvaluate(LuaScript.Prepare(@"
            redis.call('ZREMRANGEBYLEX', @redisBasketKey, '[0', @value);
            redis.call('ZINTERSTORE', @resultKey, 2, @redisBasketKey, @redisCounterKey);

            local sum=0
            local z=redis.call('ZRANGE', @resultKey, 0, -1, 'WITHSCORES')

            for i=2, #z, 2 do 
                sum=sum+z[i]
            end
            return sum"),
      new
      {
           redisBasketKey = (RedisKey)redisBasketKey,
           resultKey = (RedisKey)resultKey,
           redisCounterKey = (RedisKey)redisCounterKey,
           value = (RedisValue)$"[{from - 1}"
      });

批量层和实时层结果的合并操作只是两者的总和。

      long sum = batchLayerResult + speedLayerResult;

让我们来测试一下

现在,该用自动化测试来覆盖上面描述的 Meetup 分析场景了。我们将编写一个集成测试,该测试需要将所有数据库和 Azure 服务发布到云端,但批量层、实时层和服务层类在本地机器上的单个进程中运行。

        /// <summary>
        /// 01/01/2016-------* UK, London(08:00:10)---------------------------------------------------01/01/2016
        /// 01/01/2016---------------* UK, London(08:00:20)-------------------------------------------01/01/2016
        /// 01/01/2016----------------------------* UK, London(08:00:50)------------------------------01/01/2016
        /// 01/01/2016-------------------------------------^ Batch View Recalculation (08:01:00)------01/01/2016
        /// 01/01/2016------------------------------------------------------* UK, London(08:01:40)----01/01/2016
        /// </summary>
        [Fact]
        public void Should_Merge_BatchAndSpeedLayerViews()
        {
            // arrange
            var meetupEventId = 1;
            MeetupMessage meetupLondon1 = CreateMeetupMessageFromTemplate(meetupEventId, 
            new DateTime(2016, 01, 01, 08, 00, 10),
            "UK", "London");
            MeetupMessage meetupLondon2 = CreateMeetupMessageFromTemplate(meetupEventId, 
            new DateTime(2016, 01, 01, 08, 00, 20), 
            "UK", "London");
            MeetupMessage meetupLondon3 = CreateMeetupMessageFromTemplate(meetupEventId, 
            new DateTime(2016, 01, 01, 08, 00, 50), 
            "UK", "London");
            MeetupMessage meetupLondon4 = CreateMeetupMessageFromTemplate(meetupEventId,
            new DateTime(2016, 01, 01, 08, 01, 40), 
            "UK", "London");
            
            StreamProducer producer = new StreamProducer(DeploymentConfiguration.Default.ServiceBus);
            producer.Produce(meetupLondon1);
            producer.Produce(meetupLondon2);
            producer.Produce(meetupLondon3);
            producer.Produce(meetupLondon4);

            // stream layer processing
            var streamLayerConsumer = new SpeedLayer.MeetupMessageConsumer(
                new MeetupStreamAnalytics(DeploymentConfiguration.Default.Redis),
                DeploymentConfiguration.Default.ServiceBus);

            // wait until speed layer proceeds all messages, should create two baskets in Redis
            while (streamLayerConsumer.ProcessedMessagesCount != 4) { } 

            // batch layer processing
            var batchLayerConsumer = new BatchLayer.MeetupMessageConsumer(
                new MeetupRepository(DeploymentConfiguration.Default.Storage),
                DeploymentConfiguration.Default.ServiceBus);

            // wait until batch layer proceeds all messages, should create 4 blobs in MasterDataset container
            while (batchLayerConsumer.ProcessedMessagesCount != 4) { } 

            // batch view recalculation
            RecalculationJob job = new RecalculationJob(
                DeploymentConfiguration.Default.Storage,
                DeploymentConfiguration.Default.HDInsight);

            // should produce batch view with number of visitors equals 3
            job.Execute(new DateTime(2016, 01, 01, 08, 01, 00)); 

            // act
            // should merge batch view result with one basket of speed layer
            long count = new MeetupQueries().GetAllMeetupVisitors(1, "UK", "London"); 

            // assert
            Assert.Equal(4, count);
        }

结论

我们使用 Lambda 架构成功设计了实时系统。我们成功地将 Redis、Hive、Hadoop、Azure Service Bus 等许多不同的服务和工具集成到单一的数据处理解决方案中。我们使用了多种语言:C#、LUA、HQL(Hive SQL),在最适合的地方使用了它们。我们甚至使用自动化的集成测试测试了 Happy Path 场景。

之前,我们讨论了 Lambda 架构的属性。现在,让我们验证 Meetup 分析系统是否拥有所有这些属性。

  • 健壮性和容错性。 Meetup 分析系统完全能够容忍机器实例故障,Azure 云为我们使用的 HDInsight 群集、Redis 群集和其他虚拟机和服务提供了 99% 的 SLA 保证。

    你可能知道,Redis 是一个内存数据库,始终存在丢失 Redis 中数据的可能性——这是 Redis 开发人员为了高 I/O 性能付出的代价。但 Meetup 分析设计即使在这种糟糕的情况下也能自动处理。如果实时层丢失了所有数据,系统将返回批量层的计算结果,并且一旦实时层恢复运行,它将继续填充带有访客计数器的时间段。

    系统中另一个故障点——消息传递到批量层和实时层。存在以下可能情况:

    • 消息已传递到批量层,但在传递到实时层时失败。在这种情况下,实时层将返回不一致的访客数量,直到批量层完成重新计算并覆盖实时层的计算。
    • 消息已传递到实时层,但在传递到批量层时失败。在这种情况下,消息被认为是永久丢失的,因为它没有被添加到主数据集中。

     

  • 低延迟读写。 实时层中的 Redis 数据库同时提供了:Meetup 分析操作的低延迟读取和写入。但是批量层提供了系统的健壮性。
  • 可扩展性。 Meetup 分析的每个层都可以独立且自动地进行扩展。每个数据存储在设计上都支持群集。

正如你所注意到的,Lambda 架构假设对同一个问题进行两次处理:使用批量方法和增量方法。

就是这样。我希望你觉得本文和提供的代码示例有用且有趣。

参考文献

© . All rights reserved.