使用 LambdaBiz 框架编排 AWS Lambda





0/5 (0投票)
使用 LambdaBiz 框架在 AWS 中创建长时间运行的、有状态的、持久的和无服务器的编排
引言
AWS lambda 允许用户编写无服务器函数。 但是,lambda 函数的最大执行时间为 15 分钟,之后它会超时。 因此,在 AWS lambda 中编写长时间运行的进程是不可能的。 AWS 引入了 Step Functions 来克服这个缺点。 然而,学习状态机语言需要一个陡峭的学习曲线,而且服务本身也需要支付高昂的费用。
本项目的目的是让现有的 AWS C# 用户能够编写持久的长时间运行的编排。
Using the Code
Nuget 包
从 https://nuget.net.cn 安装 nuget 包。
Install-Package LambdaBiz -Version 1.0.0
源代码可在 GitHub 上找到,网址为 https://github.com/WorkMaze/LambdaBiz。
耐用性
该框架依赖于 AWS SWF(简单工作流框架)来维护编排的持久性。 如果一系列任务已执行,并且该系列超时并再次被调用,则该框架将不会调用该系列中已执行的任务,并且编排将从上次运行期间中断的地方继续。
术语
OrchestrationFactory
:AWS 中用于创建编排并将它们保存在持久存储中的存储(如果设置了该参数)。 持久存储是 AWS DynamoDB。 AWS SWF 存储编排的状态 1 年。Orchestration
:由唯一OrchestrationId
标识的工作流实例。Task
:在编排中调用的 AWS lambda 函数,由唯一的OperationId
标识。Timer
:由唯一的TimerId
标识的计时器。Event
:由唯一的EventId
标识的外部触发器。Service
:对外部 REST 服务的调用(GET
、POST
、PUT
或DELETE
),由唯一的OperationId
标识。
编排 Lambda 任务
以下代码片段演示了如何编排 AWS lambda 任务
/// Initialize orchestration factory for AWS
var orchestrationFactory = new AWSOrchestrationFactory
(awsAccessKeyID, awsSecretAccessKey, awsRegion, true,awsLambdaRole);
/// Create a new orchestration
var orchestration = await orchestrationFactory.CreateOrchestrationAsync("Sequence3");
try
{
/// Start workflow
await orchestration.StartWorkflowAsync("Workflow Started");
/// Call AWS lambda task
var a = await orchestration.CallTaskAsync<Numbers>("Number",
new Numbers {
Number1 = 15,
Number2 = 5
},
"Operation1");
var b = await orchestration.CallTaskAsync<OperationResult>("Sum", a, "Operation2");
var c = await orchestration.CallTaskAsync<OperationResult>
("Difference", a, "Operation3");
var d = await orchestration.CallTaskAsync<OperationResult>
("Product", a, "Operation4");
var e = await orchestration.CallTaskAsync<OperationResult>
("Quotient", a, "Operation5");
/// Complete workflow
await orchestration.CompleteWorkflowAsync(e);
}
catch(Exception ex)
{
/// Fail workflow
await orchestration.FailWorkflowAsync(ex);
}
向您的编排添加计时器
您可以向您的编排添加具有指定持续时间的计时器。
/// Start timer
await orchestration.StartTimerAsync("30SecTimer", new TimeSpan(0, 0, 0, 30, 0));
等待外部输入
在编排中,例如审批流程,您将希望等待外部输入,例如用户点击按钮或电子邮件。 以下代码片段演示了如何实现这一点
/// Wait for user input
var approved = await orchestration.WaitForEventAsync<bool>("Approve");
在另一个编排中引发事件
当我们要从我们的编排向我们的编排发送一个输入时,因为该编排正在等待外部输入。
await orchestration.RaiseEventAsync("Approve", "Sequence3", true);
整合
无服务器模板
{
"AWSTemplateFormatVersion" : "2010-09-09",
"Transform" : "AWS::Serverless-2016-10-31",
"Description" : "An AWS Serverless Application.",
"Resources" : {
"Process" : {
"Type" : "AWS::Serverless::Function",
"Properties": {
"Handler": "LambdaBiz.Serverless::LambdaBiz.Serverless.Functions::Process",
"Runtime": "dotnetcore2.1",
"CodeUri": "",
"MemorySize": 256,
"Timeout": 30,
"Role": null,
"Policies": [ "AWSLambdaBasicExecutionRole" ]
}
}
}
}
创建长时间运行的 Lambda 函数
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using LambdaBiz.AWS;
using Newtonsoft.Json;
using Amazon.Lambda.Core;
using Amazon.Lambda.APIGatewayEvents;
// Assembly attribute to enable the Lambda function's JSON input
// to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.Json.JsonSerializer))]
namespace LambdaBiz.Serverless
{
public class Functions
{
/// <summary>
/// Default constructor that Lambda will invoke.
/// </summary>
public Functions()
{
}
public class Numbers
{
public int Number1 { get; set; }
public int Number2 { get; set; }
}
public class Request
{
public string LambdaRole { get; set; }
public Numbers Numbers { get; set; }
public string OrchestrationId { get; set; }
}
public class OperationResult
{
public int Number1 { get; set; }
public int Number2 { get; set; }
public double Result { get; set; }
}
public async Task<Model.Workflow> ProcessAsync(Request request, ILambdaContext context)
{
/// Initialize orchestration factory for AWS
var orchestrationFactory = new AWSOrchestrationFactory(true, request.LambdaRole);
context.Logger.LogLine(JsonConvert.SerializeObject(request));
/// Create a new .
var orchestration = await orchestrationFactory.CreateOrchestrationAsync
(request.OrchestrationId);
context.Logger.LogLine("Created");
try
{
/// Start workflow
await orchestration.StartWorkflowAsync("Workflow Started");
context.Logger.LogLine("Started");
/// Call AWS lambda task
var a = await orchestration.CallTaskAsync<Numbers>
("Number", request.Numbers, "Operation1");
context.Logger.LogLine("Operation1");
var b = await orchestration.CallTaskAsync<OperationResult>
("Sum", a, "Operation2");
context.Logger.LogLine("Operation2");
var c = await orchestration.CallTaskAsync<OperationResult>
("Difference", a, "Operation3");
context.Logger.LogLine("Operation3");
var d = await orchestration.CallTaskAsync<OperationResult>
("Product", a, "Operation4");
context.Logger.LogLine("Operation4");
var e = await orchestration.CallTaskAsync<OperationResult>
("Quotient", a, "Operation5");
context.Logger.LogLine("Operation5");
/// Start timer
await orchestration.StartTimerAsync
("30SecTimer", new TimeSpan(0, 0, 0, 30, 0));
context.Logger.LogLine("30SecTimer");
/// Wait for user input
var approved = await orchestration.WaitForEventAsync<bool>("Approve");
context.Logger.LogLine("Approved");
/// Complete workflow
await orchestration.CompleteWorkflowAsync(e);
context.Logger.LogLine("Complete");
}
catch (Exception ex)
{
/// Fail workflow
await orchestration.FailWorkflowAsync(ex);
context.Logger.LogLine("Fail");
context.Logger.LogLine(ex.Message);
context.Logger.LogLine(ex.StackTrace);
}
var currentState = await orchestration.GetCurrentState();
return currentState;
}
/// <summary>
/// Lambda function
/// </summary>
/// <param name="request"></param>
/// <param name="context"></param>
/// <returns></returns>
public Model.Workflow Process(Request request, ILambdaContext context)
{
var result = ProcessAsync(request, context);
result.Wait();
return result.Result;
}
}
}
调用外部服务
可以调用任何作为 REST 服务公开的外部服务。 因此,我们也可以从 AWS lambda 调用 Azure 函数。
/// Call REST service
var a = await orchestration.CallGetAsync<DummyResponse>
(url + "employees",null,null, "ServiceOperation1");
var b = await orchestration.CallPostAsync<DummyResponse>
(url + "create",null, null, null, "ServiceOperation2");
var c = await orchestration.CallPutAsync<DummyResponse>
(url + "update/21", null, null, null, "ServiceOperation3");
var d = await orchestration.CallDeleteAsync<DummyResponse>
(url + "delete/21", null, null, "ServiceOperation4");
创建 AWS 后台进程以运行外部服务
需要创建一个后台进程来运行外部 REST 服务,以运行这些任务并保持持久性。 以下代码演示了如何为您的编排创建一个简单的后台进程。
while (true)
{
var orch = new AWSRESTService(awsAccessKeyID, awsSecretAccessKey, awsRegion);
await orch.Run("RESTSequence1");
}
从持久存储中获取编排状态
可以查询该框架以获取编排的当前状态。 编排的状态会定期保存在持久存储中,该存储是 AWS 的 DynamoDB。 只有在构造 AWSORchestrtaionFactory
期间将相应的参数设置为 TRUE
时,此操作才有效。
/// Wait for user input
var currentState = await orchestration.GetCurrentState();
使用您自己的后端
LambdaBiz 使用 AWS SWF 作为默认后端,并使用 DynamoDB 作为默认持久存储。 但是,可以使用该框架创建您自己的后端和存储。 也许,您想使用 SQL、MySql 或其他一些 noSQL 作为后端。
创建您自己的后端
实现 IOrchestrationFactory
以根据您的后端创建您自己的工厂。 实现 IOrchestration
以根据您的后端创建您自己的编排容器。
创建您自己的持久存储
实现 IPersistantStore
以创建您自己的持久存储。
未来计划
计划实现 MySql 和 SQL Server 后端,并且可能还有其他一些 NoSQL 存储,例如 MondoDB 和 Cassandra。
历史
- LambdaBiz 的第一个版本
- 添加了一个完全可用的 C# lambda 示例