65.9K
CodeProject 正在变化。 阅读更多。
Home

使用 LambdaBiz 框架编排 AWS Lambda

emptyStarIconemptyStarIconemptyStarIconemptyStarIconemptyStarIcon

0/5 (0投票)

2020 年 2 月 24 日

CPOL

3分钟阅读

viewsIcon

14751

使用 LambdaBiz 框架在 AWS 中创建长时间运行的、有状态的、持久的和无服务器的编排

引言

AWS lambda 允许用户编写无服务器函数。 但是,lambda 函数的最大执行时间为 15 分钟,之后它会超时。 因此,在 AWS lambda 中编写长时间运行的进程是不可能的。 AWS 引入了 Step Functions 来克服这个缺点。 然而,学习状态机语言需要一个陡峭的学习曲线,而且服务本身也需要支付高昂的费用。

本项目的目的是让现有的 AWS C# 用户能够编写持久的长时间运行的编排。

Using the Code

Nuget 包

https://nuget.net.cn 安装 nuget 包。

Install-Package LambdaBiz -Version 1.0.0

源代码可在 GitHub 上找到,网址为 https://github.com/WorkMaze/LambdaBiz

耐用性

该框架依赖于 AWS SWF(简单工作流框架)来维护编排的持久性。 如果一系列任务已执行,并且该系列超时并再次被调用,则该框架将不会调用该系列中已执行的任务,并且编排将从上次运行期间中断的地方继续。

术语

  • OrchestrationFactory:AWS 中用于创建编排并将它们保存在持久存储中的存储(如果设置了该参数)。 持久存储是 AWS DynamoDB。 AWS SWF 存储编排的状态 1 年。
  • Orchestration:由唯一 OrchestrationId 标识的工作流实例。
  • Task:在编排中调用的 AWS lambda 函数,由唯一的 OperationId 标识。
  • Timer:由唯一的 TimerId 标识的计时器。
  • Event:由唯一的 EventId 标识的外部触发器。
  • Service:对外部 REST 服务的调用(GETPOSTPUTDELETE),由唯一的 OperationId 标识。

编排 Lambda 任务

以下代码片段演示了如何编排 AWS lambda 任务

/// Initialize orchestration factory for AWS
var orchestrationFactory = new AWSOrchestrationFactory
(awsAccessKeyID, awsSecretAccessKey, awsRegion, true,awsLambdaRole);

/// Create a new orchestration
var orchestration = await orchestrationFactory.CreateOrchestrationAsync("Sequence3");
try
{
            /// Start workflow
            await orchestration.StartWorkflowAsync("Workflow Started");

            /// Call AWS lambda task
            var a = await orchestration.CallTaskAsync<Numbers>("Number",
                        new Numbers {
                                    Number1 = 15,
                                    Number2 = 5
                                    },
                        "Operation1");

            var b = await orchestration.CallTaskAsync<OperationResult>("Sum", a, "Operation2");
            var c = await orchestration.CallTaskAsync<OperationResult>
                                        ("Difference", a, "Operation3");
            var d = await orchestration.CallTaskAsync<OperationResult>
                                        ("Product", a, "Operation4");
            var e = await orchestration.CallTaskAsync<OperationResult>
                                        ("Quotient", a, "Operation5");

            /// Complete workflow
            await orchestration.CompleteWorkflowAsync(e);
}
catch(Exception ex)
{
            /// Fail workflow
            await orchestration.FailWorkflowAsync(ex);
}

向您的编排添加计时器

您可以向您的编排添加具有指定持续时间的计时器。

/// Start timer
await orchestration.StartTimerAsync("30SecTimer", new TimeSpan(0, 0, 0, 30, 0));

等待外部输入

在编排中,例如审批流程,您将希望等待外部输入,例如用户点击按钮或电子邮件。 以下代码片段演示了如何实现这一点

/// Wait for user input
var approved = await orchestration.WaitForEventAsync<bool>("Approve");

在另一个编排中引发事件

当我们要从我们的编排向我们的编排发送一个输入时,因为该编排正在等待外部输入。

await orchestration.RaiseEventAsync("Approve", "Sequence3", true);

 

整合

无服务器模板

{
 "AWSTemplateFormatVersion" : "2010-09-09",
 "Transform" : "AWS::Serverless-2016-10-31",
 "Description" : "An AWS Serverless Application.",
 "Resources" : {

  "Process" : {
   "Type" : "AWS::Serverless::Function",
   "Properties": {
    "Handler": "LambdaBiz.Serverless::LambdaBiz.Serverless.Functions::Process",
    "Runtime": "dotnetcore2.1",
    "CodeUri": "",
    "MemorySize": 256,
    "Timeout": 30,
    "Role": null,
    "Policies": [ "AWSLambdaBasicExecutionRole" ]    
   }
  }
 }
}

创建长时间运行的 Lambda 函数

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using LambdaBiz.AWS;
using Newtonsoft.Json;
using Amazon.Lambda.Core;
using Amazon.Lambda.APIGatewayEvents;

// Assembly attribute to enable the Lambda function's JSON input 
// to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]

namespace LambdaBiz.Serverless
{
    public class Functions
    {
        /// <summary>
        /// Default constructor that Lambda will invoke.
        /// </summary>
        public Functions()
        {
        }
        public class Numbers
        {
            public int Number1 { get; set; }
            public int Number2 { get; set; }
        }

        public class Request
        {
            public string LambdaRole { get; set; }
            public Numbers Numbers { get; set; }
            public string OrchestrationId { get; set; }
        }
        public class OperationResult
        {
            public int Number1 { get; set; }
            public int Number2 { get; set; }
            public double Result { get; set; }
        }

        public async Task<Model.Workflow> ProcessAsync(Request request, ILambdaContext context)
        {
            /// Initialize orchestration factory for AWS
            var orchestrationFactory = new AWSOrchestrationFactory(true, request.LambdaRole);

            context.Logger.LogLine(JsonConvert.SerializeObject(request));
            /// Create a new .
            var orchestration = await orchestrationFactory.CreateOrchestrationAsync
                                (request.OrchestrationId);
            context.Logger.LogLine("Created");
            try
            {

                /// Start workflow
                await orchestration.StartWorkflowAsync("Workflow Started");
                context.Logger.LogLine("Started");

                /// Call AWS lambda task
                var a = await orchestration.CallTaskAsync<Numbers>
                             ("Number", request.Numbers, "Operation1");
                context.Logger.LogLine("Operation1");

                var b = await orchestration.CallTaskAsync<OperationResult>
                                   ("Sum", a, "Operation2");
                context.Logger.LogLine("Operation2");

                var c = await  orchestration.CallTaskAsync<OperationResult>
                                  ("Difference", a, "Operation3");
                context.Logger.LogLine("Operation3");

                var d = await orchestration.CallTaskAsync<OperationResult>
                                   ("Product", a, "Operation4");
                context.Logger.LogLine("Operation4");

                var e = await  orchestration.CallTaskAsync<OperationResult>
                                   ("Quotient", a, "Operation5");
                context.Logger.LogLine("Operation5");
                /// Start timer
                await orchestration.StartTimerAsync
                                   ("30SecTimer", new TimeSpan(0, 0, 0, 30, 0));
                context.Logger.LogLine("30SecTimer");

                /// Wait for user input
                var approved = await orchestration.WaitForEventAsync<bool>("Approve");
                context.Logger.LogLine("Approved");

                /// Complete workflow
                await orchestration.CompleteWorkflowAsync(e);
                context.Logger.LogLine("Complete");
            }
            catch (Exception ex)
            {
                /// Fail workflow
                await orchestration.FailWorkflowAsync(ex);
                context.Logger.LogLine("Fail");
                context.Logger.LogLine(ex.Message);
                context.Logger.LogLine(ex.StackTrace);
            }

            var currentState = await orchestration.GetCurrentState();
            return currentState;
        }

        /// <summary>
        /// Lambda function
        /// </summary>
        /// <param name="request"></param>
        /// <param name="context"></param>
        /// <returns></returns>
        public Model.Workflow Process(Request request, ILambdaContext context)
        {
            var result = ProcessAsync(request, context);
            result.Wait();
            return result.Result;
        }
    }
}

调用外部服务

可以调用任何作为 REST 服务公开的外部服务。 因此,我们也可以从 AWS lambda 调用 Azure 函数。

/// Call REST service
var a = await orchestration.CallGetAsync<DummyResponse>
        (url + "employees",null,null, "ServiceOperation1");
var b = await orchestration.CallPostAsync<DummyResponse>
        (url + "create",null, null, null, "ServiceOperation2");
var c = await orchestration.CallPutAsync<DummyResponse>
        (url + "update/21", null, null, null, "ServiceOperation3");
var d = await orchestration.CallDeleteAsync<DummyResponse>
        (url + "delete/21", null, null, "ServiceOperation4");

创建 AWS 后台进程以运行外部服务

需要创建一个后台进程来运行外部 REST 服务,以运行这些任务并保持持久性。 以下代码演示了如何为您的编排创建一个简单的后台进程。

while (true)
{
            var orch = new AWSRESTService(awsAccessKeyID, awsSecretAccessKey, awsRegion);
            await orch.Run("RESTSequence1");
}

从持久存储中获取编排状态

可以查询该框架以获取编排的当前状态。 编排的状态会定期保存在持久存储中,该存储是 AWS 的 DynamoDB。 只有在构造 AWSORchestrtaionFactory 期间将相应的参数设置为 TRUE 时,此操作才有效。

/// Wait for user input
var currentState = await orchestration.GetCurrentState();

使用您自己的后端

LambdaBiz 使用 AWS SWF 作为默认后端,并使用 DynamoDB 作为默认持久存储。 但是,可以使用该框架创建您自己的后端和存储。 也许,您想使用 SQL、MySql 或其他一些 noSQL 作为后端。

创建您自己的后端

实现 IOrchestrationFactory 以根据您的后端创建您自己的工厂。 实现 IOrchestration 以根据您的后端创建您自己的编排容器。

创建您自己的持久存储

实现 IPersistantStore 以创建您自己的持久存储。

未来计划

计划实现 MySql 和 SQL Server 后端,并且可能还有其他一些 NoSQL 存储,例如 MondoDB 和 Cassandra。

历史

  • LambdaBiz 的第一个版本
  • 添加了一个完全可用的 C# lambda 示例
© . All rights reserved.