使用微服务在 Azure 中构建 Discord 机器人(第二部分)
接下来,我们将开始获取这些事件并将它们移动到存储队列中,以便Azure函数可以处理它们。我们还将让机器人监听服务总线队列,以便它可以接收消息并将其发送回Discord服务器。
开始之前
需要注意的是,我们使用存储队列在机器人和Azure函数之间实现入站消息传递有两个原因。首先,也是最重要的一点,我们纯粹只是将消息发送到我们的处理层,因此我们不需要任何附加功能。其次,当Discord机器人足够大时,它将被分片,这允许机器人打开多个连接并并行处理消息,因此实现保证交付毫无意义。
另一点需要注意的是,我们正在使用Azure服务总线在我们的后端逻辑和机器人之间实现出站消息传递。这允许我们的Discord机器人监听服务总线队列,接收消息并在消息添加到队列后立即发送它们。存储队列不支持在允许Azure函数触发排队消息的功能之外的代码监听队列。
添加存储队列支持
首先,由于我们将首先实现存储队列,请打开您的包管理器并将Microsoft.Azure.Storage.Common
和Microsoft.Azure.Storage.Queue
包添加到您的项目中。然后,我们将在Discord套接字服务的OnStarted
方法中通过创建并添加一个ConfigureStorageQueue
方法来初始化存储帐户。此方法将需要以下类变量
private CloudStorageAccount storageAccount
用于保存存储帐户连接
创建这些变量后,将以下方法添加到您的Discord Socket Service类中
private void ConfigureStorageQueue()
{
// Try and load the queue storage account
try
{
storageAccount = CloudStorageAccount.Parse(_config["StorageQueueConnectionString"]);
}
catch (Exception ex)
{
_logger.LogError(ex.ToString());
return;
}
queueClient = storageAccount.CreateCloudQueueClient();
inboundQueue = queueClient.GetQueueReference("discord-bot-inbound-queue");
inboundQueue.CreateIfNotExistsAsync();
}
此方法首先尝试使用连接字符串创建存储帐户。我们不应将其硬编码,而应在可用的配置选项中拥有一个有效的连接字符串。对于开发环境,我们可以向我们的hostsettings.json添加另一个键/值对——“StorageQueueConnectionString
”:“UseDevelopmentStorage=true
”。这假设您已安装并运行Azure存储模拟器。当然,如果您在Azure中托管此服务,请确保您的Web主机已设置包含生产连接字符串的环境变量。
如果我们可以成功连接到存储帐户,则该方法会创建一个新的队列客户端并尝试连接到指定帐户中名为discord-bot-inbound-queue
的队列。对于我们的开发环境,我们需要确保已在本地创建了该队列。为此,在视图菜单中,选择Cloud Explorer侧窗格并展开“本地”区域。在“本地”下,应该有一个带有模拟器的“存储帐户”选项。在“队列”部分中,创建一个名为discord-bot-inbound-queue
的队列。
如果我们现在运行我们的机器人,您会看到它仍然运行良好,但我们实际上并没有对我们刚刚创建的队列做任何事情。
添加消息到队列
在开始向队列添加消息之前,我们需要将其格式化为可用的格式。为此,我们将创建一个帮助器类库,它将获取现有的Discord实体并将它们放入一个虚拟对象中,以便我们可以将其转换为JSON字符串。这样做的原因是消息队列只能将字符串作为有效负载,并且Discord实体在构建时不易转换。该类库允许我们通过在项目之间共享库来使其更有用。
将一个static class
库作为新项目添加到您的解决方案中。删除作为此项目一部分创建的默认类,并创建一个名为Utils的新文件夹。在Utils中,创建一个名为DiscordConvert
的新static
class
,我们可以使用它来通过SerializeObject
方法转换对象
public static class DiscordConvert
{
public static string SerializeObject(SocketMessage message)
{
var converted = new ConvertedMessage(message);
return JsonConvert.SerializeObject(converted, Formatting.None);
}
}
这个class
的好处是我们可以用不同的Discord
事件对象重载它,并返回一个可以添加到队列的JSON string
。这个方法使用一个我们尚未定义的类ConvertedMessage
,它只获取消息的一小部分,以便可以通过JsonConvert
进行序列化。现在我们来创建它,在我们的DNetUtils
项目中创建一个名为Entities的文件夹,并创建一个ConvertedMessage
类
public class ConvertedMessage
{
public ulong AuthorId { get; set; }
public ulong ChannelId { get; set; }
public MessageSource Source { get; set; } // System, User, Bot, Webhook
public string Content { get; set; }
public DateTimeOffset CreatedAt { get; set; }
public ICollection<ulong> MentionedChannelIDs { get; set; }
public ICollection<ulong> MentionedRoleIDs { get; set; }
public ICollection<ulong> MentionedUserIDs { get; set; }
public ConvertedMessage() { }
public ConvertedMessage(SocketMessage message)
{
AuthorId = message.Author.Id;
ChannelId = message.Channel.Id;
Source = message.Source;
Content = message.Content;
CreatedAt = message.CreatedAt;
MentionedChannelIDs = new List<ulong>();
MentionedRoleIDs = new List<ulong>();
MentionedUserIDs = new List<ulong>();
foreach (var channel in message.MentionedChannels)
{
MentionedChannelIDs.Add(channel.Id);
}
foreach (var role in message.MentionedRoles)
{
MentionedRoleIDs.Add(role.Id);
}
foreach (var user in message.MentionedUsers)
{
MentionedUserIDs.Add(user.Id);
}
}
}
我们现在有一个函数,可以将一个Discord
消息及其一些属性转换为JSON string
。
要在我们的机器人中实现此功能,我们首先需要将新的类库作为依赖项引用添加到我们的机器人项目中。然后,让我们修改ReceiveMessage
方法,使用这个新类来接收消息并将它们放入我们的队列中。在ReceiveMessage
方法中,我们将替换
if (message.Content.ToLower().StartsWith("!ping"))
channel.SendMessageAsync("pong!");
为以下代码:
CloudQueueMessage jsonMessage = new CloudQueueMessage(DiscordConvert.SerializeObject(message));
inboundQueue.AddMessage(jsonMessage);
这会获取由我们的serialize
方法生成的JSON string
,并将其添加到我们之前创建的入站队列中。
现在,当用户在我们的机器人所在的任何频道中发布消息时,该消息都会被添加到存储队列中等待处理。
使用 Azure 函数处理队列
下一步是使用Azure函数处理消息并将响应放入服务总线队列。首先,让我们创建一个服务总线队列,我们可以用它来放置返回消息。在撰写此文章时,无法在本地运行服务总线,因此我们需要在Azure中创建一个。
只需使用基本计划创建此服务,因为在此阶段,我们只需要担心收集消息的单个服务,因此如果我们有多种不同类型的消息通过单个队列运行,我们不需要主题等其他高级功能。
Service Bus 实例完成后,在 Service Bus 中创建一个名为 DNetBotQueue
的新队列。该队列将是我们的机器人默认监视的队列,用于根据我们的函数执行的处理来获取消息并将其发送到各种通道。我们还需要添加一个共享访问策略。
现在我们已经准备好服务总线队列,我们需要一个Azure函数来从我们的存储队列中接收消息,对其进行“处理”,并将响应发送回用户。目前,我们的处理将只是像以前一样,寻找文本的开头来判断是否是 !ping 并响应 pong!。首先,创建一个新的Azure函数项目并为其命名。我们还将删除首次创建项目时创建的默认Function
类,而是创建一个名为Messaging的文件夹。这只是为了在我们将其他函数添加到项目中以执行其他任务时提供更好的结构。
在此文件夹下,我们将创建一个带Queue
触发器的新函数。这将为我们搭建新函数,并设置好正确的 方法和依赖项,以便随时使用。
因为我们还将使用此函数输出到ServiceBus
队列,所以我们还要添加所需的Service Bus包。安装完这两个项目后,您需要做两件事。首先,让我们修改我们的hosts.json以确保存在以下代码
"extensions": {
"queues": {},
"serviceBus": {}
}
这连接了此时所需的两个扩展,ServiceBus
和存储队列。接下来,打开local.settings.json文件并输入以下两行
"InboundMessageQueue": "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;
AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/
KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;
TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;
QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;",
"AzureWebJobsServiceBus": "endpoint-in-shared-access-key"
第一个配置参数 (InboundMessageQueue
) 是我们用于监视存储队列事件的存储模拟器的默认帐户和密钥。第二个是从您的服务总线队列的服务总线共享访问密钥复制的端点。
现在我们已经连接了帐户,让我们修改我们创建的Queue
触发器,使其看起来像这样
public static class ProcessMessages
{
[FunctionName("InboundMessageProcess")]
[return: ServiceBus("dnetbotmessagequeue", Connection = "AzureWebJobsServiceBus")]
public static string InboundMessageProcess
([QueueTrigger("discord-bot-inbound-queue")] CloudQueueMessage myQueueItem, ILogger log)
{
log.LogInformation($"C# Queue trigger function processed: {myQueueItem}");
ConvertedMessage message = DiscordConvert.DeSerializeObject(myQueueItem.AsString);
if(message.Content.StartsWith("!ping"))
{
var returnMessage = new NewMessage();
returnMessage.ChannelId = message.ChannelId;
returnMessage.Content = "pong!";
return JsonConvert.SerializeObject(returnMessage, Formatting.None);
}
return null;
}
}
如果我们逐步查看此代码,第一行将函数标记为InboundMessageProcess
。下一行指定此函数将通过我们设置的ServiceBus
服务输出,具体来说是使用“AzureWebJobsServiceBus
”连接到“dnetbotmessagequeue
”。然后我们定义该方法,将我们的队列(discord-bot-inbound-queue
)消息和日志记录系统作为输入。在写入日志后,我们获取或队列消息并将其反序列化回转换后的消息。最后,我们通过查看消息是否符合标准并制作响应来处理消息。此方法的返回值然后发送到ServiceBus
队列。
因此,对于这个函数,我们需要创建两个小的额外组件——Deserialize
方法和DNetUtils
类库中的NewMessage
实体。让我们创建Deserialize
方法
public static ConvertedMessage DeSerializeObject(string jsonString)
{
return JsonConvert.DeserializeObject<ConvertedMessage>(jsonString);
}
很简单,它只是使用JSonConvert
反序列化成一个特定的对象。最后,NewMessage
对象看起来像这样
public class NewMessage
{
public ulong ChannelId { get; set; }
public string Content { get; set; }
}
监控返回队列
现在我们正在处理消息并将其放入ServiceBus
队列中,让我们将机器人连接起来以监视该队列并发送消息。首先,将ServiceBus
包添加到DNetBot
项目中,并将我们设置的Azure函数中的local.settings.json文件中的AzureWebJobsServiceBus
复制到DNetBot
项目中的host.settings文件中。接下来,我们将在DiscordSocketService
的顶部添加三个新变量
private string serviceBusConnectionString;
const string QueueName = "dnetbotmessagequeue";
static IQueueClient servicebusClient;
这三个变量存储服务总线连接字符串、队列名称和客户端对象。让我们通过在构造函数中检索配置来获取服务总线连接器字符串
serviceBusConnectionString = _config["AzureWebJobsServiceBus"];
接下来,我们将添加一个配置服务总线的新方法。与配置 Azure 函数的方法类似。首先是ConfigureServiceBus
方法
private void ConfigureServiceBus()
{
servicebusClient = new QueueClient(serviceBusConnectionString, QueueName);
var handlerOptions = new MessageHandlerOptions(SBException)
{
MaxConcurrentCalls = 1,
AutoComplete = false
};
servicebusClient.RegisterMessageHandler(ProcessMessage, handlerOptions);
}
此方法执行三个关键功能。首先,它根据提供的配置设置Service Bus客户端和队列。接下来,我们为客户端配置一些默认选项,但我们还链接了一个我们需要接下来编写的异常处理程序(SBException
)。最后,我们使用另一个我们需要编写的名为ProcessMessage
的函数注册消息处理程序。此处理程序将接收Service Bus消息,对其进行转换,并将消息发送回频道。
让我们添加异常处理程序,以防出现问题时捕获错误
private Task SBException(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
_logger.LogError("ServiceBus Error | Endpoint: "
+ exceptionReceivedEventArgs.ExceptionReceivedContext.Endpoint + " | "
+ exceptionReceivedEventArgs.Exception.Message);
return Task.CompletedTask;
}
一个非常简单的方法,将错误记录到我们的日志函数中。最后,在消息事件partial
类中,让我们添加以下方法将消息发送回Discord
private async Task ProcessMessage(Message message, CancellationToken token)
{
var bodyString = Encoding.UTF8.GetString(message.Body);
Formatter.GenerateLog(_logger, LogSeverity.Info, "Self",
"Sending message - Sequence: " +
message.SystemProperties.SequenceNumber +
" -- Message: " + bodyString);
try
{
NewMessage response = JsonConvert.DeserializeObject
(bodyString, typeof(NewMessage)) as NewMessage;
var channel = discordClient.GetChannel(response.ChannelId);
ITextChannel textChannel = channel as ITextChannel;
if (textChannel != null)
{
await textChannel.SendMessageAsync(response.Content);
}
else
{
Formatter.GenerateLog(_logger, LogSeverity.Error, "Self",
"Error sending message: Channel is not a text channel");
}
}
catch (Exception ex)
{
Formatter.GenerateLog(_logger, LogSeverity.Error, "Self",
"Error sending message: " + ex.Message);
}
await servicebusClient.CompleteAsync(message.SystemProperties.LockToken);
}
当Service Bus上有消息需要处理时,此方法会触发,首先将该消息转换为string
(Service Bus消息以字节数组格式存储)。接下来,我们尝试将消息转换为NewMessage
格式,并获取我们想要发送消息的频道。我们必须确保此频道能够接收文本消息,如果能,我们 then 使用SendMessageAsync
将消息发送到该频道。最后,我们完成Service Bus处理,以确保消息从Service Bus中移除,从而不会再次被处理。
现在当我们运行机器人时,消息应该会从Service Bus队列中处理。
将所有内容一起运行
因为我们现在本质上有两个项目,一个Azure Functions项目和一个Discord Bot项目,所以让我们让整个解决方案一起运行。右键单击解决方案,然后在“启动项目”选项下,选择“运行多个项目”,并选择要启动的机器人和函数项目。这将允许我们启动机器人项目和函数项目,并从头到尾处理消息。
摘要
我们现在拥有一个完整运行的机器人,它将所有消息卸载到存储队列中,然后由Azure函数在外部处理。然后,如果消息符合我们的要求,我们会在Service Bus队列中存储另一个返回消息。此队列由机器人监视,当消息进来时,机器人会接收它并将其发送到正确的频道。托管在GitHub上的代码已更新以包含新项目,并且应该完全正常工作。
下一篇文章可能会短一些,我们将看看如何将消息持久化到存储帐户。
敬请期待!
历史
- 2019年9月9日:初始版本