在 C# 中使用 Outlook 流式通知






4.40/5 (5投票s)
一个可以获取 Office365 Outlook 流式通知 API 的 Sketch 应用程序。
引言
能够在代码中即时响应外部应用程序中发生的事件,这难道不好吗?当然。好吧,在某些情况下,你根本无法做到这一点,在其他情况下,你必须以某种方式 hack 该系统来捕获该事件并通知你的应用程序。幸运的是,一些——越来越多——平台提供了或多或少标准的解决方案来与它们交互——但反向通信并不普遍。即使是 Azure API 的支持也有限。例如——除了轮询——在撰写本文时,Office 365 API 提供了两个有限的订阅选项:
- 基于 Webhook (https 回调)(包括 Graph API 和 Outlook API)
- 流式传输(仅限 Outlook API)
要实现第一种选项,你需要一个开放的 http(s) 端点。在某些情况下,这不可能实现,尤其是在本地解决方案中,主要是出于安全原因;但如果无法获得固定 IP,它也无法使用。
因此,通过客户端打开的流通道进行通知似乎是一个合理的选择,甚至比基于回调的通知更具响应性。要获得总体概述,请参见上面粗体链接中的文档。
请注意,所使用的 API 官方处于 Beta (预览) 阶段!
背景
对于我最近的项目,客户要求提供一个演示应用程序,该应用程序订阅特定 Azure AD 用户的日历事件。我已经为此准备了一个 LinqPad 5 草图。这就是为什么你找不到要下载的附件库或完整应用程序,只有这个草图。在本文中,我将介绍在开发这个概念验证解决方案期间遇到的特殊之处。
我使用了 NuGet 包,这些包仅在 LinqPad 的注册版本中受支持。如果你使用的是免费版本,则必须自己下载这些包。
Using the Code
在开始之前,你需要有一个 Azure AD 租户和关联的 Office 365 用户——你将监视其事件。如果你没有这些,你将需要申请一个试用账户。
本文中使用的客户端 ID 已注册(但不知道能注册多久)为多租户原生应用程序,因此你可能可以直接使用你的租户运行。如果不能,你需要按照本文进行注册。请注意,该应用程序使用 ADAL,因此指向的是 V1 端点。该应用程序将请求 `calendar.read` 委托范围。
你绝对需要更改为你的租户。租户和客户端 ID 都可以在查询属性的App.Config页面上找到。当然,你需要指定你的租户的 UPN 并为其授权此应用程序。你会注意到使用了设备配置文件流进行授权。这是因为最终目标解决方案。你还会注意到一些简化(例如缺少 ConfigureAwait、无 DI/IoC 等),但正如我之前所述,我的意图不是提供一个功能齐全的库,而只是一个具有特定目标的概念验证。
即使此代码侧重于日历事件,电子邮件、联系人和任务事件也需要类似地处理。
关于 API 的一些说明
简而言之,客户端必须完成两个步骤才能获得流式通知:
- 通过发送 POST 请求来启动订阅。
- 使用服务器在上一步中返回的订阅 ID 开始访问流。
很简单,不是吗?
确实如此,但有一个令人讨厌的限制:即使“模拟”的用户有权访问其他用户的日历(共享日历),你也只能订阅当前用户的事件。因此,如果你需要同时监视多个日历,你可以通过同时模拟这些用户来实现,或者使用 Graph API 进行轮询。我希望这将来会有所改变……
按照 OData 格式,你可以请求简单订阅和丰富订阅:在后一种情况下,通知将包含触发通知本身的对象的片段。在我的情况下,就是事件的详细信息。当然,如果需要,你还可以指定一些过滤。
代码中代表订阅请求的类如下:
public abstract class StreamingSubscriptionBase
{
[JsonProperty(PropertyName = "@odata.type")]
public string ODataType {get; protected set;}
public string Resource {get; protected set;}
public string ChangeType {get; protected set;}
}
public class SimpleStreamingEventSubscription : StreamingSubscriptionBase
{
public SimpleStreamingEventSubscription()
{
this.ODataType = "#Microsoft.OutlookServices.StreamingSubscription";
this.Resource = "https://outlook.office.com/api/beta/me/events";
this.ChangeType = "Acknowledgment, Created, Updated, Deleted, Missed";
}
}
请注意,我已经订阅了所有可能的更改。预期的响应将被反序列化到这个类中。
public class StreamingSubscriptionResponse
{
[JsonProperty(PropertyName = "@odata.context")]
public string ODataContext { get; set; }
[JsonProperty(PropertyName = "@odata.type")]
public string ODataType { get; set; }
[JsonProperty(PropertyName = "@odata.id")]
public string ODataId { get; set; }
public string Id { get; set; }
public string Resource { get; set; }
public string ChangeType { get; set; }
}
其中大部分实际上没什么用,除了 `Id` 属性,它是订阅本身的引用。
在 API 文档中,你会找到“订阅过期”的概念。你期望在创建订阅时获得过期时间戳。但是,出于某种原因,你会在第一次实际事件更改通知时才能看到它。这意味着,如果没有任何有趣的事件发生,你的订阅可能会在你知道截止日期之前过期。幸运的是,如果你尝试读取过期订阅的流,你将收到 http 响应代码 404 “未找到”——这很容易处理。因此,我的代码会响应此异常,而忽略潜在的过期时间戳的存在。
一旦有了 ID,你就可以通过发送另一个 POST 请求开始收听。这个请求将(几乎)无限期地持续。好吧,实际上,它的最长持续时间为 90 分钟,但从处理的角度来看,它基本相同:你必须在读取块的同时进行处理。
你得到的流格式如下:
{
"@odata.context":"https://outlook.office.com/api/beta/metadata#Notifications",
"value": [
// messages come here
]
}
这是一个有效的 JSON 对象:流打开时会发送前三行,连接超时达到时(服务器将在最多 90 分钟后结束连接)会发送最后两行。数组的内容根据请求的心跳周期和请求通知的资源的更改事件而定。实际上,可能会出现管道中几分钟都没有内容的情况。这是一个陷阱,正如我们稍后将看到的。
因此,你会在流中看到保持活动和事件通知的任何或其他分片和顺序。当然,这些也是有效的 JSON 对象,是 `values` 数组的有效元素。
你必须做好准备,不仅要处理服务器中断连接,还要处理其他可能导致通信中断的网络问题。文档要求你应该尝试重新建立流,同时保持订阅 ID 有效(参见上面关于过期问题的段落)。因此,整个循环只有在你希望它结束时才会结束——你需要不断地重新打开流,并在你需要的时候续订订阅,同时遵守公平使用策略。
身份验证
在继续实际 API 之前,我必须谈谈身份验证过程。当然,我们以 OAuth2 为基础,但幸运的是,有一个库我们可以使用,称为 ADAL(Azure Active Directory 身份验证库),它处理了与此协议相关的绝大部分繁重工作(对于 V2.0 端点,有 MSAL 库)。
`Graph` API 是 Office 365 的新统一 API。尽管如此,它尚未完成。例如,流式订阅(yet)尚未在 `Graph` API 中实现。但是,为了做好准备,我使用了并实现了 `Graph` API 库核心中定义的 `IAuthenticationProvider` 接口。
class AzureAuthenticationProvider: IAuthenticationProvider
{
private static string aadInstance = ConfigurationManager.AppSettings["ida:AADInstance"];
private static string tenant = ConfigurationManager.AppSettings["ida:Tenant"];
private static string clientId = ConfigurationManager.AppSettings["ida:ClientId"];
private static string authority =
String.Format(CultureInfo.InvariantCulture, aadInstance, tenant);
private static string resource = "https://outlook.office.com";
TokenCache cache = new TokenCache();
Action<DeviceCodeResult> signInFeedback = null;
public AzureAuthenticationProvider(string userPrincipalName,
Action<DeviceCodeResult> signInFeedback)
{
this.signInFeedback = signInFeedback;
string cachefile = Path.Combine(System.Environment.GetFolderPath
(Environment.SpecialFolder.LocalApplicationData), userPrincipalName);
cache.AfterAccess += (c) => File.WriteAllBytes(cachefile, cache.Serialize());
if (File.Exists(cachefile))
{
cache.Deserialize(File.ReadAllBytes(cachefile));
}
}
public async Task AuthenticateRequestAsync(HttpRequestMessage request)
{
AuthenticationContext authContext = new AuthenticationContext(authority, true, cache);
AuthenticationResult result = null;
bool signInNeeded = authContext.TokenCache.Count < 1;
try
{
if (!signInNeeded)
{
result = await authContext.AcquireTokenSilentAsync(resource, clientId);
}
}
catch (Exception ex)
{
var adalEx = ex.InnerException as AdalException;
if ((adalEx != null) && (adalEx.ErrorCode == "failed_to_acquire_token_silently"))
{
signInNeeded = true;
}
else
{
throw ex;
}
}
if (signInNeeded)
{
DeviceCodeResult codeResult =
await authContext.AcquireDeviceCodeAsync(resource, clientId);
this.signInFeedback?.Invoke(codeResult);
result = await authContext.AcquireTokenByDeviceCodeAsync(codeResult);
}
request.Headers.Authorization =
new AuthenticationHeaderValue("Bearer", result.AccessToken);
}
}
此提供程序使用一个非常简单但高效的令牌缓存:每次更改时,它都会将内容存储在文件中,并以 UPN 作为文件名。随时可以使用你自己的实现。如上所述,该解决方案使用了设备配置文件流,这意味着如果缓存为空或静默身份验证失败,应用程序将提示用户打开浏览器,导航到特定位置,并输入特定代码。该过程将等待直到服务器确认代码已输入并授权应用程序代表用户——或者最终超时。由于刷新令牌几乎永远不会过期,应用程序将能够随时静默进行身份验证。
订阅
这是最简单的部分。
public async Task<StreamingSubscriptionResponse> SubscribeToEventsAsync()
{
string requestUrl = "https://outlook.office.com/api/beta/me/subscriptions";
var subscriptionRequestContent =
JsonConvert.SerializeObject(new SimpleStreamingEventSubscription());
HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Post, requestUrl);
request.Content = new StringContent
(subscriptionRequestContent, Encoding.UTF8, "application/json");
await provider.AuthenticateRequestAsync(request);
HttpResponseMessage response = await client.SendAsync(request);
return await response.Content.ReadAsAsync<StreamingSubscriptionResponse>();
}
请注意上面提供程序中授权发生的位置。`Graph` API 客户端库将透明地执行这些步骤,但如前所述,此功能尚未包含在 `Graph` API 中。
使用通知
现在,有趣的来了。
public async Task ListenForSubscriptionStreamAsync(CancellationToken ct)
{
string requestUrl = "https://outlook.office.com/api/beta/Me/GetNotifications";
var requestContent = new ListenRequest
{ ConnectionTimeoutInMinutes = 60, KeepAliveNotificationIntervalInSeconds = 15 };
var handler = new WinHttpHandler();
handler.ReceiveDataTimeout = TimeSpan.FromSeconds
(requestContent.KeepAliveNotificationIntervalInSeconds + 5);
HttpClient listeningClient = new HttpClient(handler);
由于这是一个长时间运行的 `async` 操作,协作式取消至关重要。
在操作开始时,你可以指定请求的超时时间(以分钟为单位)和 `keepalive` 通知间隔。如果缺少任何一项,则使用默认值。请求尚未完成。
正如我之前强调过的,在这些 `keepalive` 通知之间可能没有任何字节到达套接字。在此期间连接可能会中断,`HttpClient` 对象和流读取器都不会通知它——在这种情况下,甚至取消也无效。幸运的是,我们可以使用 `WinHttpHandler`“插件”,它有一个 `ReceiveDataTimeout` 属性。你应该将其值设置得比 `keepalive` 间隔稍高一些。
在我的代码中,每个监听器都使用自己的 `HttpClient` 实例。在进行进一步测试之前,我无法确定共享的实例如何处理这种长时间运行的请求场景。
var subscription = await SubscribeToEventsAsync();
try
{
while (!ct.IsCancellationRequested)
{
try
{
requestContent.SubscriptionIds = new string[] { subscription.Id };
HttpRequestMessage request =
new HttpRequestMessage(HttpMethod.Post, requestUrl);
request.Content = new ObjectContent<ListenRequest>
(requestContent, new JsonMediaTypeFormatter());
await provider.AuthenticateRequestAsync(request);
现在我们创建初始订阅。暂时假设这不会失败,但你可能需要等待更长的时间并重试。
请求会带着初始订阅 ID 完成,然后进行身份验证。一切都是一个大型循环的一部分,该循环仅被外部发起的取消中断。
using (var response = await listeningClient.SendAsync
(request, HttpCompletionOption.ResponseHeadersRead, ct))
{
if (!response.IsSuccessStatusCode)
{
if (response.StatusCode != HttpStatusCode.NotFound)
{
await Task.Delay(TimeSpan.FromMinutes(1));
}
subscription =
await SubscribeToEventsAsync(); // renew subscription
continue;
}
现在我们尝试启动长时间运行的请求。如前所述,这是处理过期请求情况的地方:如果返回“未找到”,我们就获取一个新的。如果发生任何其他问题,我们也会这样做,但会稍作延迟。
请注意打开流时的 `HttpCompletionOption.ResponseHeadersRead` 选项:否则,客户端将等待直到所有内容到达,这意味着在我这里需要一个小时。这不是我们想要的。为了能够处理到达的通知,我们必须在头部到达后继续。
这里是处理 `stream` 的代码。
using (var stream = await response.Content.ReadAsStreamAsync())
using (var streamReader = new StreamReader(stream))
using (var jsonReader = new JsonTextReader(streamReader))
{
bool inValuesArray = false;
while (await jsonReader.ReadAsync(ct))
{
ct.ThrowIfCancellationRequested();
if (!inValuesArray && jsonReader.TokenType ==
JsonToken.PropertyName && jsonReader.Value.ToString() == "value")
{
inValuesArray = true;
}
if (inValuesArray && jsonReader.TokenType == JsonToken.StartObject)
{
JObject obj = await JObject.LoadAsync(jsonReader, ct);
if (obj["@odata.type"].ToString() ==
"#Microsoft.OutlookServices.KeepAliveNotification")
{
$"{DateTime.Now} keepalive".Dump();
}
else
{
// This is where you do your job...
obj.ToString(Newtonsoft.Json.Formatting.Indented, null).Dump();
}
}
}
$"{DateTime.Now} Stream ended".Dump();
}
如前所述,`stream` 将包含一个大的、有效的 JSON 对象。但为了能够对通知做出反应,我们需要处理 value 数组的元素——这意味着我们需要反序列化 `stream` 的片段。幸运的是,出色的 Json.NET 包包含一个 `JsonTextReader` 类,该类能够处理到达的 JSON 令牌。但有一个问题:`jsonReader.ReadAsync` 会被阻塞,等待 `stream` 中的下一个 `char`。如果套接字未关闭但通信失败,我们将卡住而无法返回……即使传递的取消令牌也无效。但我们很幸运,我们有 `WinHttpHandler.ReceiveDataTimeout`,它不会让它无限期等待。
流将只包含心跳通知和事件更改通知。我们只需将它们读取到 `JObject` 实例中,这些实例可以轻松地转换为我们自定义的对象(未在我代码中声明)。
在此方法结束时,你会发现一些 `catch` 块,但实际上就这些了。
回顾
虽然此代码不完整且不优化,但我认为对于任何想从 dotNet 应用程序使用 Outlook 流式通知 API 的人来说,它都是一个不错的起点。
历史
- 2017 年 8 月 8 日:版本 1.0