65.9K
CodeProject 正在变化。 阅读更多。
Home

客户端基于类型的发布/订阅,探索同步、“事件驱动”和工作线程订阅

starIconstarIconstarIconstarIconstarIcon

5.00/5 (2投票s)

2021年1月1日

CPOL

27分钟阅读

viewsIcon

11405

downloadIcon

86

将路由的语义数据发送到客户端和服务器端订阅者,其中客户端订阅者可以立即处理,或放入消息队列,或在真正异步的工作线程中运行。

目录

引言

我观察到的前端开发的一个倾向是意大利面条式代码:深层嵌套的函数调用、复杂的条件逻辑,以及如今通过 Proxy 进行的 UI 事件、AJAX 事件和 Observable 的复杂/嵌入式处理。是的,这些都可以通过更好的实现纪律来纠正,但前端开发中一直让我感到沮丧的一点是,它是面向过程的,而不是“数据类型”导向的。在后端,我多年前通过像 HOPE 这样的架构和 语义发布/订阅 架构解决了这个问题,但前端开发没有 C# 的类型反射特性。这意味着实现必须使用元数据(使用 TypeScript 中 提议的装饰器)或接口来完成,这两者都有些hacky,但可以完成工作。

从我的角度来看,基于类型的流程模式(而非多多少少线性的处理)的优点如下,这些优点与 发布/订阅 模式的优点相同

  1. 小函数,只做一件事,并且只做一件事——当数据发布时触发,它们执行一些工作,然后发布工作的结果。
  2. 由于发布是通过路由器完成的,因此对工作链的日志记录很简单。
  3. 编写单元测试变得非常简单——设置数据,发布它,验证预期的结果已发布。
  4. 订阅者可以确定它所做的工作可以执行(使用 postMessage)或异步执行(使用 Worker)。
  5. 或者调用者可以确定(或覆盖)工作是否应同步或异步执行。
  6. 提高重用性和可扩展性。
  7. 一致的日志记录和异常处理,它们本身也可以实现为订阅者。

典型的发布/订阅模式与我在这里演示的模式之间的显著区别是:典型的 pub/sub 基于事件名称或“主题”,而我演示的是事件名称本身就是数据类型。这个小的区别对于将发布过程与数据类型解耦很重要。一张图片可能有助于说明

在这里,不同的订阅者,基于主题“A”或“B”,处理 MyData 的发布。

在这里,数据类型变成了“主题”。

是的,标准的 pub/sub 模式可以通过硬编码类型作为主题来用于基于类型的 pub/sub,但我想要更动态、更可扩展的东西,比如类型层次结构。

顺便说一句,关于发布/订阅模式的 维基页面 说:

有两种常见的过滤形式:基于主题和基于内容。
在 **基于主题** 的系统中,消息被发布到“主题”或命名的逻辑通道。基于主题的系统中的订阅者将接收发布到其订阅的主题的所有消息。发布者负责定义订阅者可以订阅的主题。
在 **基于内容** 的系统中,仅当消息的属性或内容与订阅者定义的约束匹配时,消息才会被传递给订阅者。订阅者负责对消息进行分类。
某些系统支持两者的混合;发布者将消息发布到主题,而订阅者将基于内容的订阅注册到一个或多个主题。

我在这里写的是第三种形式——基于类型。然而,我发现基于类型-内容-的订阅者有一些用途,但我认为我将在本文中省略它。

缺点

让我明确一点,主题的好处是你可以根据主题将消息路由到特定的订阅者。因此,任何 pub/sub 实现都是主题-数据与订阅者的 *隐式* 耦合。在语义(即基于类型的方法)中,它可能要求开发人员创建特定的类型以确保路由到所需的处理程序。虽然我可以争辩说,紧耦合的 *显式* 类型-订阅者实现具有架构优势,但根据我的经验,它也可能导致创建过多的类型,仅仅是为了确保路由到所需的订阅者。因此,如果您考虑这种类型的架构,请牢记这一点。

无类型编程语言的一个优点是您可以直接传递对象,例如 {FirstName: "Marc", LastName: "Clifton"}。我提出的概念的缺点是(与 C# 的匿名类型类似),没有类型,因此以这种方式构造的对象不适合基于类型的 pub/sub。当然,有一个笨拙的解决方法。

此外,我还遇到过与开发人员相关的其他缺点(请原谅我有点抱怨的陈述,但我只是实话实说)

  1. 编码员(我猜人们普遍如此)倾向于线性思考,所以“发布即忘”的编码风格并不一定容易理解。我认为这实际上是我从未见过使用 pub/sub 模式的前端项目,更不用说 Worker 类了。
  2. 编码员不以数据驱动的架构来思考。他们以 UI 事件或后端进程调用的方式思考,所有这些都只会强化线性思维和线性编码。
  3. 编码员追求快速实现,很少有团队能够就高层架构达成一致,更少能够始终如一地遵循它,更少能够对“实现执行者”感到满意。编码员不喜欢被强加架构,除非它属于“我需要学习这个以确保工作安全,因为它现在很流行”的范畴,当然不是由我这样的人,他们完全脱离了主流的 React、Vue、Angular、年度热门框架。
  4. 异步编程也是我不断看到人们在挣扎的事情,不仅因为它不是线性的,而且因为人们普遍对整个概念感到不适。是的,部分原因是必须管理数据锁、下游“重新同步”、锁死等,而这些在 JavaScript 中通常不是问题,即使使用 Worker 线程,因为它们处理的数据是“复制”到每个 Worker 的独立数据区域中的。
  5. 线性编写代码更简单。如果代码是事件处理程序(来自 UI 事件或 AJAX 事件),我总是看到函数或胖箭头“实现就在这里”,而没有考虑“这段代码是否有任何重用性”。就我个人而言,当我看到 HTML 本身中的事件处理程序(或任何类型的声明式“编码”)时,我都会感到不适!
  6. 调试 pub/sub,说实话,比调试线性代码或“处理程序就在这里进行线路连接”的代码要困难。
  7. 调试异步代码甚至更难。至少人们是这么说的,因为我不觉得调试异步代码有多难,也不觉得调试同步 pub/sub 实现中的代码有多难。但那是我的看法。

TypeScript 版本

我原本打算使用 TypeScript 4 版本来编写代码,以便能够利用带标签的元组

不幸的是,Visual Studio 对带标签元组的编辑器支持似乎仍不完整

这真令人讨厌。所以虽然我曾经将元组重构为带有标签,但您仍然会看到 [0][1] 来引用元组元素,因为我无法使用它们的名称!

使用注意事项

首先需要考虑的是订阅者是长期运行的还是需要 DOM 操作(在我看来,后者通常不是好的架构实践)。在 JavaScript 应用程序的后台是 事件循环,它在主线程上同步处理排队的事件。为了不创建无响应的用户界面,事件处理程序应尽快完成。对于长期运行的工作,有 Worker 类。Worker 对象的缺点是它不能操作 DOM,因此任何 DOM 更改都必须发布回主窗口线程。另请参阅 Web Workers 可用的函数和类。相反,Worker pool 中的排队工作可以被移除,而发布到事件队列的消息则不能。因此,除了上面描述的缺点之外,还有各种使用注意事项需要考虑。

设计

又一个图

这应该很直接——一个实现 ITyped 的对象被发布,订阅者被同步或异步调用,它们本身可以发布更多数据。pub/sub 执行日志记录和异常处理,它们本身也可以实现为订阅者。

实现,第一部分

这是“边写边走”的代码,所以随着文章的进展,您可能会看到一些变化。我觉得这样更有趣,读者似乎也很喜欢。话虽如此,我已预见到 Worker 异步处理的一些挑战,但我们会走过那座桥,到达那道峡谷。

ITyped 接口

一个非常简单的接口,强制要求将数据类型实现为 string

interface ITyped {
  __name: string;
}

数据类型字典

而且因为我非常不喜欢硬编码字符串,所以对于初始测试

export class DataTypeDictionary {
  public static MyData: string = "MyData";
}

我的测试类

import { DataTypeDictionary } from "./DataTypeDictionary"

export class MyData implements ITyped {
  __name = DataTypeDictionary.MyData;

  public someString: string;

  constructor(someString: string) {
    this.someString = someString;
  }
}

以及 PubSub 类使用的一些类型和接口

import { Guid } from "./Guid";

type Subscriber = (data: ITyped, pubSub: PubSub, token?: Guid) => void;
type DataType = string;

interface ITypeTokenMap {
  [key: string]: Guid[];
}

interface ITypeSubscriberMap {
  [key: string]: Subscriber;
}

Subscriber 就像一个 C# Action<T>(T data, PubSub pubSub, Guid token = null) where T : ITyped 的简写,提高了代码的可读性,而接口只是键值字典。不那么明显的是,每个订阅者都有一个唯一的令牌(我没有显示 Guid 类(我可能在 StackOverflow 上找到它)),所以在后台,订阅者会获得一个令牌,然后将该令牌添加到值数组中,其中数据类型是键(一对多)。实际的订阅者存储在一个一对一的字典中。

PubSub 字段和构造函数

export class PubSub {
  private window: any;
  private subscriberTokens: ITypeTokenMap;
  private subscribers: ITypeSubscriberMap;

  constructor(window: any) {
    this.window = window;
    this.subscriberTokens = {};
    this.subscribers = {};
  }

在这里,我通过传入“window”对象来预见自己,因为不久之后,我将修改代码以使用 window.postMessage 函数,以便发布将发布订阅者调用的调用入队,而不是立即处理它们。理论上,这将允许 UI 保持响应能力,但我在实践中不知道它的表现如何!

Subscribe (订阅)

public Subscribe(type: DataType, subscriber: Subscriber): Guid {
  let guid = Guid.NewGuid();
  let strGuid = guid.ToString();

  if (!(type in this.subscriberTokens)) {
    this.subscriberTokens[type] = [];
  }

  this.subscriberTokens[type].push(guid);
  this.subscribers[strGuid] = subscriber;

  return guid;
}

非常直接。如果数据类型(作为键)是新的,则为此创建一个空的订阅者令牌数组。然后添加令牌并将令牌映射到订阅者。

取消订阅

public Unsubscribe(subscriberToken: Guid): void {
  let strGuid = subscriberToken.ToString();

  // Remove the unique GUID key from the subscribers collection.
  delete this.subscribers[strGuid];

  // Find the GUID from the collection mapped to the data type.
  let subscriberTokenIdx = Object.entries(this.subscriberTokens)
      .filter(([k, v]) => this.IndexOf(v, subscriberToken) !== undefined)
      .map(([k, v]) => ({ k: k, idx: this.IndexOf(v, subscriberToken) }));

  // There should only ever be 0 or 1 records.
  if (subscriberTokenIdx.length == 1) {
    let sti = subscriberTokenIdx[0];
    this.subscriberTokens[sti.k].splice(sti.idx, 1);
  }
}

private IndexOf(guids: Guid[], searchFor: Guid): number {
  let strGuid = searchFor.ToString();
  let idx = guids.map(g => g.ToString()).indexOf(strGuid);

  return idx;
}

有点棘手。从字典中删除令牌-订阅者键值对很容易。显然,删除数据类型-令牌特定项的数组更复杂。我们在这里所做的是通过查找键值对中的令牌来过滤字典,然后将过滤后的结果映射到数据类型和该数据类型的令牌数组中的索引的键值对。正如代码注释指出的那样,我们应该在数据类型令牌数组的字典中找到一个且仅一个令牌,或者不找到。

Publish (发布)

这简单多了

public Publish(data: ITyped): void {
  let subscriberTokens = this.subscriberTokens[data.__name];

  if (subscriberTokens) {
    // Get our subscribers in case one or more of them decides to unsubscribe.
    let subscriptions = subscriberTokens.map
    (token => ({ subscriber: this.subscribers[token.ToString()], token: token }));

    subscriptions.forEach
    (subscription => subscription.subscriber(data, this, subscription.token));
  }
}

最后是我的测试

import { PubSub } from "./SemanticPubSub/PubSub";

import { MyData } from "./MyData";
import { DataTypeDictionary } from "./DataTypeDictionary";
import { Guid } from "./SemanticPubSub/Guid";

export class AppMain {
  public run() {
    let d = new MyData("Foobar");
    let pubSub = new PubSub(window);
    pubSub.Subscribe(DataTypeDictionary.MyData, this.Subscriber1);
    let token2 = pubSub.Subscribe(DataTypeDictionary.MyData, this.Subscriber2);
    pubSub.Publish(d);

    // Subscriber1 unsubscribes itself.
    pubSub.Unsubscribe(token2);

    console.log("You should not see any further subscriber calls.")
    pubSub.Publish(d);
    console.log("Done.");
  }

  public Subscriber1(data: MyData, pubSub: PubSub, token: Guid): void {
    console.log(`Subscriber 1: ${data.someString}`);
    pubSub.Unsubscribe(token);
  }

  public Subscriber2(data: MyData): void {
    console.log(`Subscriber 2: ${data.someString}`);
  }
}

运行所有这些

我们看到

太棒了。

添加日志和异常处理程序订阅者

让我们添加这些部分。首先,支持此的数据类型类。

Logger

import { PubSubDataTypeDictionary } from "../PubSub"

export class Log implements ITyped {
  __name: string = PubSubDataTypeDictionary.Logger;

  public message: string;

  constructor(message: string) {
    this.message = message;
  }
}

异常处理器

import { PubSubDataTypeDictionary } from "../PubSub"

export class Exception implements ITyped {
  __name: string = PubSubDataTypeDictionary.Exception;

  public message: string;

  constructor(message: string) {
    this.message = message;
  }
}

天哪,看起来一样,除了类型名称!

对 PubSub 文件的添加

export class PubSubDataTypeDictionary {
  public static Logger: string = "Logger";
  public static Exception: string = "Exception";
}

export class PubSubOptions {
  public hasDefaultLogger: boolean;
  public hasDefaultExceptionHandler: boolean;
}

构造函数的更改

export class PubSub {
  private window: any;
  private subscriberTokens: ITypeTokenMap;
  private subscribers: ITypeSubscriberMap;

  constructor(window: any, options?: PubSubOptions) {
  this.window = window;
  this.subscriberTokens = {};
  this.subscribers = {};

  // Process options!
  this.ProcessOptions(options);
}

private ProcessOptions(options: PubSubOptions): void {
  if (options) {
    if (options.hasDefaultLogger) {
      this.Subscribe(PubSubDataTypeDictionary.Logger, this.DefaultLogger);
    }

    if (options.hasDefaultExceptionHandler) {
      this.Subscribe(PubSubDataTypeDictionary.Exception, this.DefaultExceptionHandler);
    }
  }
}

private DefaultLogger(data: Log): void {
  console.log(data.message);
}

private DefaultExceptionHandler(data: Exception): void {
  console.log(`You broke it! ${data.message}`);
}

发布内容的更改

public Publish(data: ITyped, internal?: boolean): void {
  if (!internal) {
    this.Publish(new Log(`Publishing ${data.__name}`), true);
  }

  let subscriberTokens = this.subscriberTokens[data.__name];
 
  if (subscriberTokens) {
    // Get our subscribers in case one or more of them decides to unsubscribe.
    let subscriptions = subscriberTokens.map
        (token => ({ subscriber: this.subscribers[token.ToString()], token: token }));

    subscriptions.forEach(subscription => {
      try {
        subscription.subscriber(data, this, subscription.token);
      } catch (err) {
        this.Publish(new Exception(err));
      }
    });
  }
}

请注意,我们需要通过添加一个 internal 标志来停止递归。请注意,internal 标志仅用于日志记录。我们假设异常处理程序本身不会抛出异常。

我们有个 bug!

我们现在实例化 PubSub,使用

let pubSub = new PubSub(window, { hasDefaultLogger: true, hasDefaultExceptionHandler: true });

运行测试应用程序,我们看到

哦,哇——我们自己的代码有一个未被发现的异常!让我们来修复它。调试时,我们在这一行看到

let subscriptions = subscriberTokens.map
    (token => ({ subscriber: this.subscribers[token.ToString()], token: token }));

因此,显然 unsubscribe 方法未能正确清理订阅者令牌字典。

这里的问题是我们没有过滤掉令牌字典中不再存在的令牌的映射。正如我们所见

订阅者字典只有我们的 Log 和 Exception 数据类型订阅者。这是一个简单的修复

let subscriptions = subscriberTokens
  .map(token => ({ subscriber: this.subscribers[token.ToString()], token: token }))
  .filter(s => s.subscriber);

现在运行看起来很干净

所以这很有趣!通过添加一个异常处理程序订阅者并将每个订阅者调用包装在 try-catch 块中,我们发现了我们自己代码中的一个异常。

添加您自己的处理程序

希望您能明白,您也可以添加自己的日志记录器。使用

<body>
  <p>Log:</p>
  <div>
    <textarea id="log" rows=10 cols=100></textarea>
  </div>
  <p>Exceptions:</p>
  <div>
    <textarea id="exceptions" rows=10 cols=100></textarea>
  </div>
</body>

pubSub.Subscribe(PubSubDataTypeDictionary.Log, this.LogViewer);
pubSub.Subscribe(PubSubDataTypeDictionary.Exception, this.ExceptionViewer);

public LogViewer(log: Log): void {
  this.AppendText("log", `${log.message}\r\n`);
}

public ExceptionViewer(exception: Exception): void {
  this.AppendText("exceptions", `${exception.message}\r\n`);
}

private AppendText(id: string, msg: string) {
  let el = document.getElementById(id) as HTMLTextAreaElement;
  el.value += msg;
}

又一个 bug

我们发现这行不通

err => this.Publish(new Exception(err))) 这一行

啊!我一直忘记 JavaScript 在进行函数调用时不会保留“this”的对象实例。关于 JavaScript 是否面向对象的争论,这是我为什么认为它不是面向对象的决定性原因!

尽管如此,我们还是需要保留调用上下文。不幸的是,这需要将“this”作为上下文(或者在 JavaScript 术语中,“scope”)传递,这需要一些小的重构,利用 TypeScript 的元组概念,这实际上只是一个语法上的数组hack,因为元组元素不能命名(“TypeScript 元组就像具有固定数量元素的数组” - 链接)。所以我们用以下方式重构

type Scope = any;

interface ITypeSubscriberMap {
[key: string]: [Subscriber, Scope];
}

public Subscribe(type: DataType, subscriber: Subscriber, scope?: any): Guid {
  let guid = Guid.NewGuid();
  let strGuid = guid.ToString();
  scope == scope ?? this;

  if (!(type in this.subscriberTokens)) {
    this.subscriberTokens[type] = [];
  }

  this.subscriberTokens[type].push(guid);
  this.subscribers[strGuid] = [subscriber, scope];

  return guid;
}

请注意,如果未提供作用域,我们假定为 PubSub 作用域“this”——并非所有订阅者都需要作用域。

最后,在调用订阅者时,我们“应用”作用域

subscriptions.forEach(subscription =>
  Assertion.Try(
    () => subscription.subscriber[0].apply
          (subscription.subscriber[1], [data, this, subscription.token]),
    err => this.Publish(new Exception(err))));

现在我们在页面上看到

使用 Logger 而不是 console.log

我们不妨也把 console.logs 重构为使用我们的 logger 订阅者

  ...
  pubSub.Publish(new Log("You should not see any further subscriber calls."));
  pubSub.Publish(d);
  pubSub.Publish(new Log("Done."));
}

public Subscriber1(data: MyData, pubSub: PubSub, token: Guid): void {
  pubSub.Publish(new Log(`Subscriber 1: ${data.someString}`));
  pubSub.Unsubscribe(token);
}

public Subscriber2(data: MyData, pubSub: PubSub): void {
  pubSub.Publish(new Log(`Subscriber 2: ${data.someString}`));
}

我们看到

或者,如果您喜欢将 PubSub 日志记录与应用程序日志记录分开

<body>
  <p>Pub/Sub Log:</p>
  <div>
    <textarea id="pubsublog" rows=10 cols=100></textarea>
  </div>
  <p>Application Log:</p>
  <div>
    <textarea id="applog" rows=10 cols=100></textarea>
  </div>
  <p>Exceptions:</p>
  <div>
    <textarea id="exceptions" rows=10 cols=100></textarea>
  </div>
</body>

export class DataTypeDictionary {
  ...
  public static AppLog: string = "AppLog";
}

import { DataTypeDictionary } from "./DataTypeDictionary"

export class Log implements ITyped {
  __name: string = DataTypeDictionary.AppLog

  public message: string;

  constructor(message: string) {
    this.message = message;
  }
}

pubSub.Subscribe(DataTypeDictionary.AppLog, this.AppLogViewer, this);
pubSub.Subscribe(PubSubDataTypeDictionary.Log, this.PubSubLogViewer, this);
pubSub.Subscribe(PubSubDataTypeDictionary.Exception, this.ExceptionViewer, this);

以及实现

public PubSubLogViewer(log: Log): void {
  this.AppendText("pubsublog", `${log.message}\r\n`);
}

public AppLogViewer(log: AppLog): void {
  this.AppendText("applog", `${log.message}\r\n`);
}

public ExceptionViewer(exception: Exception): void {
  this.AppendText("exceptions", `${exception.message}\r\n`);
}

鉴于以下用法示例

pubSub.Publish(new AppLog("Done."));

我们现在看到

但到此为止。

几次小的重构

我不喜欢“if”语句,这段代码...

if (!internal) {
  this.Publish(new Log(`Publishing ${data.__name}`), true);
}

...闻起来不对。我们可以像这样清理它

public Publish(data: ITyped): void {
  this.InternalPublish(new Log(`Publishing ${data.__name}`));
  this.InternalPublish(data);
}

private InternalPublish(data: ITyped): void {
...

这样做的好处是也消除了可选参数。

我也不喜欢 try-catch 块给代码带来视觉混乱

try {
  subscription.subscriber(data, this, subscription.token);
} catch (err) {
  this.Publish(new Exception(err));
}

倾向于一个包装任何 try-catch 块的东西

subscriptions.forEach(subscription =>
  Assertion.Try(
   () => subscription.subscriber[0].apply
         (subscription.subscriber[1], [data, this, subscription.token]),
   err => this.Publish(new Exception(err))));

鉴于

type Lambda = () => void;
type ErrorHandler = (err: string) => void;

export class Assertion {
  public static Try(fnc: Lambda, errorHandler: ErrorHandler) {
    try {
      fnc();
    } catch (err) {
      errorHandler(err);
    }
  }
}

但那是我的看法。

过度设计 - 使用 window.postMessage

鉴于 这个:“window.postMessage() 方法可安全地实现窗口对象之间的跨域通信”,我们也可以使用 postMessage 将消息发布到我们自己的窗口。在 pub/sub 中这样做可能是一种过度设计:当数据发布时,我们希望订阅者立即运行,阻止当前应用程序代码的执行,直到订阅者运行完毕,还是我们希望将“发布”排队作为消息事件?我个人喜欢发布数据不应该阻塞发布者,这样它就可以继续做其他事情的想法。但这引出了另一个问题——每个订阅者是否也应该排队作为消息事件?对于这个问题,我认为不是。我非常怀疑 UI 事件能否在处理队列中的消息时插入队列。

这是一个简单的自发布示例,来自 Chrome 控制台

基本上,我们需要过滤掉 pubsub 的消息。

然而,我们有一个更大的问题——postMessage 再次丢失了作用域。所以这个(字面意思和双关语)行不通

window.addEventListener('message', this.OnWindowMessage);
...
public Publish(data: ITyped): void {
  this.window.postMessage(["pubsub", data], this.window);
}

private OnWindowMessage(event): void {
  let data = event.data;

  if (Array.isArray(data) && data[0] == "pubsub") {
    data = data[1] as ITyped;
    this.InternalPublish(new Log(`Publishing ${data.__name}`));
    this.InternalPublish(data);
  }
}

这个也不行

this.window.postMessage(["pubsub", data, this], this.window);

我也不想传递 PubSub 实例,因为这会克隆 PubSub 中的所有数据,这会产生一些不良的副作用,比如订阅者无法取消订阅自己。

程序员应该有选择

这让我犹豫,因为现在我们显然有两个问题

  1. 性能损失,因为数据被克隆了
  2. 数据是可克隆的限制

另一方面,因为数据被克隆了,所以它在订阅者范围内是不可变的。这有优点!

鉴于上述两个问题,程序员应该有选择,而不是 pub/sub 强加实现决策。话虽如此,让我们继续。

实现

鉴于用户不应受限于使用单个 PubSub 实例(因此我们可以有一个 PubSubstatic 成员),我们有一个糟糕的解决方案——向窗口对象添加一个字典,其键值是 pub/sub 名称及其实例。幸运的是,程序员不必关心这个实现细节。鉴于

interface IPubSubMap {
  [key: string]: PubSub;
}

private Register(): void {
  this.id = Guid.NewGuid();
  let w = window as any;
  w._pubsubs = w._pubsubs || {};

  let dict = w._pubsubs as IPubSubMap;
  dict[this.id.ToString()] = this;

  window.addEventListener('message', this.OnWindowMessage);
}

我们现在可以这样做

public QueuedPublish(data: ITyped): void {
  this.window.postMessage(["pubsub", data, this.id.ToString()], this.window);
}

并且处理程序实现为

private OnWindowMessage(event): void {
  let eventData = event.data;

  if (Array.isArray(eventData) && eventData[0] == "pubsub") {
    let data = eventData[1] as ITyped;

    let w = window as any;
    let dict = w._pubsubs as IPubSubMap;
    let me = dict[eventData[2].ToString()] as PubSub;

    me.InternalPublish(new Log(`Publishing ${data.__name}`));
    me.InternalPublish(data);
  }
}

并更改测试以使用,例如

pubSub.QueuedPublish(new AppLog("You should not see any further subscriber calls."));
pubSub.QueuedPublish(d);
pubSub.QueuedPublish(new AppLog("Done."));

现在我们看到

订阅者 2 发生了什么?

好了,看吧——在代码中

pubSub.QueuedPublish(d);

// Subscriber1 unsubscribes itself.
pubSub.Unsubscribe(token2);

订阅者 2 被应用程序线程取消订阅。当应用程序函数完成时,JavaScript 消息事件队列被处理,因此当事件队列被处理时,订阅者已不存在!为了解决这个问题(更多过度设计),PubSub 可以维护自己的队列并实现一个 Flush 方法,但首先需要一些重构

export class PubSub {
  ...
  private queue: ITyped[] = [];

我们不再需要将数据作为消息的一部分传递,这产生了以下影响

  1. 数据不再被克隆——性能有所提高
  2. 数据不再被克隆——所以它是可变的!
public QueuedPublish(data: ITyped): void {
  this.queue.push(data);
  this.window.postMessage(["pubsub", this.id.ToString()], this.window);
}

private OnWindowMessage(event): void {
  let eventData = event.data;

  if (Array.isArray(eventData) && eventData[0] == "pubsub") {
    let w = window as any;
    let dict = w._pubsubs as IPubSubMap;
    let me = dict[eventData[1]] as PubSub;
    me.Flush();
  }
}

现在是 Flush 方法

public Flush(all: boolean = false): void {
  let n = this.queue.length;

  while (n > 0 || all)
    {
    let data = this.queue.shift(); // treat as queue, not an array (aka stack with push/pull)

    if (data) {
      --n;
      this.InternalPublish(new Log(`Publishing ${data.__name}`));
      this.InternalPublish(data);
    }
  }
}

可选参数 all 允许调用者指定处理所有排队的消息,*包括订阅者发布的消息*,直到队列中没有更多数据。

在测试应用程序中

pubSub.QueuedPublish(d);
pubSub.Flush();

现在我们看到

再次重构

事实证明,我们真的不需要 window 实例。因此

export class PubSub {
  ...
  constructor(options?: PubSubOptions) {
    ...

PubSub 的任何调用,其本质上都必须发生在已经拥有 window 对象的主应用程序线程上。

这还需要一个小小的更改

public QueuedPublish(data: ITyped): void {
  this.queue.push(data);
  window.postMessage(["pubsub", this.id.ToString()], window as any);
}

工作线程

现在,假设我有一些可以完全异步完成的工作。

第一轮 - 基本概念

虽然我们最终想要一个工作池,但我们先完成基本工作。此初步分析的信息来源是 MDN Web Docs 上关于 使用 Web Workers 的内容。

问题

  1. 第一个问题是这个陈述:“您需要做的就是调用 Worker() 构造函数,指定要在工作线程中执行的脚本的 URI。” 严重吗?我需要为订阅者创建单独的文件吗?
  2. 第二个问题是所有作用域问题:“Worker 被认为拥有自己的执行上下文,与创建它们的文档不同。”(链接

鉴于此,我们有一些糟糕的选择。

  1. 我们是否在单独的 .js 文件中实现 Worker?
  2. 还是我们使用邪恶的 eval 函数将订阅者作为字符串传递?

由于我根本无法赞同创建单独 .js 文件的想法,并且我不愿尝试 eval,因此我们将改用 Function。请注意文档中的内容

直接调用构造函数可以动态创建函数,但存在安全性和与 eval 类似的(但重要性远不及)性能问题。但是,与 eval 不同,Function 构造函数创建的函数仅在全局范围内执行。通过 Function 构造函数创建的函数不会创建其创建上下文的闭包;它们始终在全局范围内创建。运行它们时,它们只能访问自己的局部变量和全局变量,而不能访问创建 Function 构造函数的范围内的变量。这与对函数表达式的代码使用 eval 不同。

所以这是一个改进。

想法是这样的——Worker 获取数据包和一个表示要执行的函数的 string,然后返回一个 ITyped 数据数组,该数组被“封送”回主线程并发布。

require.js 的问题

我遇到的第一个问题是,由于我使用的是“require.js”,生成的 TypeScript 代码如下所示

define(["require", "exports", "../../FNumber"], function (require, exports, FNumber_1) {
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
onmessage = msg => {
console.log(msg);
let calc = -1;
postMessage(new FNumber_1.FNumberResponse(calc));
};
});
//# sourceMappingURL=worker.js.map

导致运行时错误 ReferenceError: define is not defined。所以放弃 TypeScript,我将只用纯 JavaScript 编写文件

将命名参数传递给返回值的函数

下一个棘手的代码是调用 Function 并带有名参数并返回结果。以下四个示例是等效的

new Function("{return function(a, b) { return a+b; } }").call(null).call(null, 1, 2);
new Function("{return function(a, b) { return a+b; } }").call()(1, 2);
new Function("{return (a,b) => { return a+b; } }").call()(1, 2);
new Function("{return (a,b) => a+b }").call()(1, 2);

并返回“3”。它们中的每一个概念都是动态函数(作为 string)在调用时返回一个匿名函数(前两个)或一个 lambda 表达式(后两个)。当*那个*“函数/lambda 表达式”被调用时,它执行处理并返回结果。第一个示例中的“null”是必需的,作为“this”参数,并且由于没有上下文(作用域),因此为 null。第二个示例,因为它们是 lambda 表达式(而不是匿名函数),所以不需要“this”,因为 lambda 表达式没有作用域。理解这一点需要一段时间,而且我不确定我是否真的理解了。我发现 这个 SO 帖子非常有帮助。

TypeScript 函数作为字符串是什么样的?

让我们看一下“worker.js”文件中的这段代码

onmessage = msg => {
  let data = msg.data[0];
  let fncRaw = msg.data[1];
  debugger;
}

鉴于

private Fibonacci(data: FNumber): any {
  const fibonacci = n => (n < 2) ? n : fibonacci(n - 2) + fibonacci(n - 1);
  let calc = fibonacci(data.n);

  return { n: calc, __name: "FibonacciResponse" };
}

private FibonacciResponse(fnr: FNumberResponse, pubSub: PubSub): void {
  pubSub.Publish(new AppLog(`F = ${fnr.n}`));
}

pubSub.Subscribe(DataTypeDictionary.Fibonacci, this.Fibonacci);
pubSub.Subscribe(DataTypeDictionary.FibonacciResponse, this.FibonacciResponse, this);

pubSub.AsyncPublish(new FNumber(10));

public AsyncPublish(data: ITyped): void {
  this.InternalPublish(new Log(`Publishing ${data.__name} (async)`));
  this.CreateTask(data);
}

private CreateTask(data: ITyped): void {
  let subscriberTokens = this.subscriberTokens[data.__name];

  if (subscriberTokens) {
    // Get our subscribers in case one or more of them decides to unsubscribe.
    let subscriptions = subscriberTokens
      .map(token => ({ subscriber: this.subscribers[token.ToString()], token: token }))
      .filter(s => s.subscriber);

    subscriptions.forEach(subscription => {
      let worker = new Worker("./SemanticPubSub/Worker/worker.js");
      worker.onmessage = response => {
        this.InternalPublish(response.data);
      };

      worker.postMessage([data, subscription.subscriber[0].toString()]);
    });
  }
}

当触发调试器时,我们看到

所以现在我们知道函数是什么样的了(顺便说一句,斐波那契代码是从 这里 借来的。)

所以想法是包装 this 在一个 lambda 表达式中,以便它可以在 Worker 中的 Function 执行,如下所示

onmessage = msg => {
  let data = msg.data[0];
  let fncRaw = msg.data[1];
  let fnc = fncRaw.substr(fncRaw.indexOf("{"));
  fnc = `{return data => ${fnc}}`;
  let result = new Function(fnc).call()(data);
  postMessage(result);
}

前面的 Fibonacci(data) 函数名被剥离,lambda 箭头表达式被附加,包括闭合花括号。回到 PubSub 中的 CreateTask 函数,关键代码是

    subscriptions.forEach(subscription => {
      let worker = new Worker("./SemanticPubSub/Worker/worker.js");
      worker.onmessage = response => {
        this.InternalPublish(response.data);
      };

      worker.postMessage([data, subscription.subscriber[0].toString()]);
    });
  1. 实例化了 Worker
  2. onMessage 事件被连接起来,以便我们可以处理响应。
  3. 通过调用其 postMessage 来调用 Worker,传递数据包和订阅者代码作为 string
  4. 响应被发布为一个 ITyped 数据包,将被任何订阅者处理。

并且,为了完整性,我们还有这些类

import { DataTypeDictionary } from "./DataTypeDictionary"

export class FNumber implements ITyped {
  __name = DataTypeDictionary.Fibonacci;

  public n: number;

  constructor(n: number) {
    this.n = n;
  }
}

export class FNumberResponse implements ITyped {
  __name = DataTypeDictionary.FibonacciResponse;

  public n: number;

  constructor(n: number) {
    this.n = n;
  }
}

以及这些数据字典类型名称

public static Fibonacci: string = "Fibonacci";
public static FibonacciResponse: string = "FibonacciResponse";

当我们运行主代码(在此处再次重复)时

pubSub.Subscribe(DataTypeDictionary.Fibonacci, this.Fibonacci);
pubSub.Subscribe(DataTypeDictionary.FibonacciResponse, this.FibonacciResponse, this);

pubSub.AsyncPublish(new FNumber(10));

我们看到这个(万岁!)

如果我自己说的话,这真是太棒了。

注意事项、烦恼和一些修复

参数名称必须是数据

首先,因为返回命名参数 lambda 表达式的函数如下所示

fnc = `{return data => ${fnc}}`;

订阅者也必须有参数名 data

private Fibonacci(data: FNumber) {

我不知道有什么办法可以绕过这个。

返回值

其次,返回不能是

return new FNumberResponse(calc); 

如果我们这样做,我们会得到

因为我们混合了 TypeScript(带有 require.js)和 JavaScript。同样,我不知道有什么办法可以绕过这个。所以,返回必须是一个包含类型名称的匿名对象

return { n: calc, __name: "FibonacciResponse" };

更令人讨厌的是,出于同样的原因,我们不能这样做

return { n: calc, DataTypeDictionary.FibonacciResponse };

这有点令人讨厌!

订阅者返回类型

我也很讨厌订阅者类型被定义为不返回任何内容

type Subscriber = (data: ITyped, pubSub: PubSub, token?: Guid) => void;

但 TypeScript 乐于让我订阅一个返回 any 的函数

pubSub.Subscribe(DataTypeDictionary.Fibonacci, this.Fibonacci);
...
private Fibonacci(data: FNumber): any {

我想这是因为 JavaScript 没有返回类型的概念,所以 TypeScript 也不强制执行返回类型。不过,我本应收到一个语法错误,并且我已经将类型重构为

type Subscriber = (data: ITyped, pubSub: PubSub, token?: Guid) => any;

同步调用异步订阅者

同样令人讨厌的是,我不能以非异步方式使用订阅者

pubSub.Publish(new FNumber(10));

因为 PubSub 不需要任何返回值。我们可以通过以下方式解决这个问题

subscriptions.forEach(subscription =>
  Assertion.Try(
    () => {
      let ret = subscription.subscriber[0].apply(subscription.subscriber[1], 
                [data, this, subscription.token]);

      if (ret) {
        this.InternalPublish(ret);
      }
    },
    err => this.Publish(new Exception(err))));

从异步订阅者那里期待返回值

另外,令人讨厌的是,期望有来自工作线程的响应。如果工作线程实际上不需要返回任何数据怎么办?例如

private Fibonacci(data: FNumber): any {
  const fibonacci = n => (n < 2) ? n : fibonacci(n - 2) + fibonacci(n - 1);
  let calc = fibonacci(data.n);
  console.log(calc);

  // return { n: calc, __name: "FibonacciResponse" };
}

目前,这会发生

但同样,这也有一个解决办法

worker.onmessage = response => {
  if (response.data) {
    this.InternalPublish(response.data);
  }
};

函数期望

如果代码写成匿名函数,字符串化的函数是什么样的?

pubSub.Subscribe(DataTypeDictionary.Fibonacci, (data: FNumber) => {
  const fibonacci = n => (n < 2) ? n : fibonacci(n - 2) + fibonacci(n - 1);
  let calc = fibonacci(data.n);

  return { n: calc, __name: "FibonacciResponse" };
});

我们看到字符串

"(data) => { const fibonacci = n => (n < 2) ? n : fibonacci(n - 2) + 
             fibonacci(n - 1); let calc = fibonacci(data.n); 
             return { n: calc, __name: "FibonacciResponse" }; }"

鉴于 worker.js{ 左边的所有东西剥离,然后替换为 data =>

let fnc = fncRaw.substr(fncRaw.indexOf("{"));
fnc = `{return data => ${fnc}}`;

代码同时处理实现为函数和匿名函数的订阅者。这实际上让我感到惊讶。

最后...

我说过订阅者可以返回一个 ITyped 数据集合,该集合将被封送并重新发布到应用程序线程。坦率地说,我找不到一个好的用例,所以我将这个想法作为过度设计放弃了。

异步工作的异常处理

如果我们在以下情况下测试异常处理

private ExceptionTest(): void {
  throw "I broke it!";
}

pubSub.Subscribe(DataTypeDictionary.ExceptionTest, this.ExceptionTest, this);
pubSub.AsyncPublish(new ExceptionTest());

我们看到

所以 worker.js 代码被重构了,利用了 PubSub 调用这段代码这一事实,并且由 PubSub 处理返回值

onmessage = msg => {
  let data = msg.data[0];
  let fncRaw = msg.data[1];
  let fnc = fncRaw.substr(fncRaw.indexOf("{"));
  fnc = `{return data => ${fnc}}`;

  try {
    let result = new Function(fnc).call()(data);
    postMessage(result);
  } catch (err) {
    postMessage({ message: err, __name: "Exception" });
  }
}

我们看到

再次重构

现在我们有两个函数 InternalPublishCreateTask,它们具有相同的序言

let subscriberTokens = this.subscriberTokens[data.__name];

if (subscriberTokens) {
  // Get our subscribers in case one or more of them decides to unsubscribe.
  let subscriptions = subscriberTokens
    .map(token => ({ subscriber: this.subscribers[token.ToString()], token: token }))
    .filter(s => s.subscriber);

鉴于我不喜欢重复代码,这两个函数将被重构为单个函数,该函数以处理 subscriptions 的函数作为参数。

为了清晰起见,这是我们要“反转”的两个函数的原始代码

private InternalPublish(data: ITyped): void {
  let subscriberTokens = this.subscriberTokens[data.__name];

  if (subscriberTokens) {
    // Get our subscribers in case one or more of them decides to unsubscribe.
    let subscriptions = subscriberTokens
     .map(token => ({ subscriber: this.subscribers[token.ToString()], token: token }))
     .filter(s => s.subscriber);

    subscriptions.forEach(subscription =>
      Assertion.Try(
        () => {
          let ret = subscription.subscriber[0].apply
                    (subscription.subscriber[1], [data, this, subscription.token]);

          if (ret) {
            this.InternalPublish(ret);
          }
        },
        err => this.Publish(new Exception(err))));
  }
}

private CreateTask(data: ITyped): void {
  let subscriberTokens = this.subscriberTokens[data.__name];

  if (subscriberTokens) {
    // Get our subscribers in case one or more of them decides to unsubscribe.
    let subscriptions = subscriberTokens
      .map(token => ({ subscriber: this.subscribers[token.ToString()], token: token }))
      .filter(s => s.subscriber);

    subscriptions.forEach(subscription => {
      let worker = new Worker("./SemanticPubSub/Worker/worker.js");
      worker.onmessage = response => {
        if (response.data) {
          this.InternalPublish(response.data);
        }
      };

      worker.postMessage([data, subscription.subscriber[0].toString()]);
    });
  }
}

定义 subscription 的类型为

type SubscriberHandler = { subscriber: [Subscriber, any], token: Guid };

重构后的函数是

private InternalPublish(data: ITyped): void {
  let subscriptions = this.GetSubscriptions(data);
  subscriptions.forEach(subscription => this.PublishOnUs(data, subscription));
}

private CreateTask(data: ITyped): void {
  let subscriptions = this.GetSubscriptions(data);
  subscriptions.forEach(subscription => this.PublishOnWorker(data, subscription));
}

private GetSubscriptions(data: ITyped): SubscriberHandler[] {
  let subscriberTokens = this.subscriberTokens[data.__name];
  let subscriptions: SubscriberHandler[] = [];

  if (subscriberTokens) {
    // Get our subscribers in case one or more of them decides to unsubscribe.
    subscriptions = subscriberTokens
      .map(token => ({ subscriber: this.subscribers[token.ToString()], token: token }))
      .filter(s => s.subscriber);
  }

  return subscriptions;
}

private PublishOnUs(data: ITyped, subscription: SubscriberHandler): void {
  Assertion.Try(
    () => {
      let ret = subscription.subscriber[0].apply
                (subscription.subscriber[1], [data, this, subscription.token]);

      if (ret) {
        this.InternalPublish(ret);
      }
    },
      err => this.Publish(new Exception(err)));
  }

private PublishOnWorker(data: ITyped, subscription: SubscriberHandler): void {
  let worker = new Worker("./SemanticPubSub/Worker/worker.js");
  worker.onmessage = response => {
    if (response.data) {
      this.InternalPublish(response.data);
    }
  };

  worker.postMessage([data, subscription.subscriber[0].toString()]);
}

代码更多了,但我们将公共代码提取到函数 GetSubscriptions 中,并创建了在 PubSubWorker 上发布的不同函数。我可以更激进一些,使用 偏函数 来移除 InternalPublishCreateTask 中的 forEach,但这似乎过于复杂,而且它所做的只是将 forEach 循环移到下游函数中。

第二轮 - 工作池

这里有一个有趣的实现 在这里,但它实际上是一个线程池模式(作者明确说明了这一点),而不是一个真正的异步工作池。 这个实现 是一个真正的 Worker 池,而且非常出色(尽管它使用 eval 而不是 Function)。对于我的目的来说,这有点矫枉过正,因为 Worker 池实际上只是 PubSub 的一个组件,而不是一个通用解决方案。然而,因为它是一个 NodeJs 示例,作者也没有遇到 require.js 似乎造成的名称混淆问题,以及所有其他杂项,或者也许名称混淆实际上是 Visual Studio TypeScript 编译器,或者某些其他令人讨厌的工件。关键是,作者的 worker.js 实际上是用 TypeScript 实现的,所以这显然是我的工具或配置的问题。而且我喜欢他使用 Promises 的方式,所以绝对值得一看。

首先,工作线程的数量默认为 4,除非在选项中明确设置为其他数字

export class PubSubOptions {
  public hasDefaultLogger?: boolean;
  public hasDefaultExceptionHandler?: boolean;
  public numWorkers?: number = 4;
}

Worker 池足够简单

import { PubSub } from "../PubSub";

type WorkerInfo = [worker: Worker, inUse: boolean];
type Task = [data: ITyped, code: string];

const Available = false;
const Busy = true;

export class WorkerPool {

  private workers: WorkerInfo[] = [];
  private queue: Task[] = [];
  private pubSub: PubSub;

  constructor(numWorkers: number, pubSub: PubSub) {
    this.pubSub = pubSub;
    this.Initialize(numWorkers);
  }

  public AddTask(task: Task) {
    this.queue.push(task);
    this.NextTask();
  }

  private Initialize(numWorkers: number): void {
    for (let n = 0; n < numWorkers; n++) {
      let worker = new Worker("./SemanticPubSub/Worker/worker.js");
      this.workers.push([worker, false]);

      worker.onmessage = response => {
        this.workers[n][1] = Available; 
        this.NextTask();

        if (response.data) {
          this.pubSub.Publish(response.data);
        }
      }
    }
  }

  private NextTask(): void {
    if (this.queue.length > 0) {
      for (let n = 0; n < this.workers.length; n++) {
        if (!this.workers[n][1]) {
          let task = this.queue.shift();
          this.workers[n][1] = Busy;
          this.workers[n][0].postMessage(task);
          break;
        }
      }
    }
  }
}

这里的想法是每个 worker 都有一个“忙碌”或“可用”状态。当消息发布到 worker 时,它就变忙了,并且在响应发布时“可用”。添加一个任务会将任务推送到队列,下一个可用的 worker 会处理它。如果没有可用的 worker,任务会保留在队列中,并在 worker 完成并空闲时处理。

现在,PublishOnWorker 变得很简单

private PublishOnWorker(data: ITyped, subscription: SubscriberHandler): void {
  this.workerPool.AddTask([data, subscription.subscriber[0].toString()]);
}

并且,根据测试应用程序的当前状态,我们看到

<img border="0" height="599" src="5291077/run10.png" width="342" />

您会注意到有一个细微的差别——我们现在看到 worker 响应被记录下来,因为我之前使用的是 InternalPublish,而现在我使用的是 Publish

发布到服务器

同样值得注意的是,至少对我而言,我现在有了一种进行语义服务器端调用的机制,因为数据包含了它的“类型名称”。我一直感到沮丧的一点是,端点 API 是“动词”导向的,而我想要语义类型导向的端点。有了类型名称,我现在可以在服务器端实现类似的功能,根据其 __name 路由数据——将其直接反序列化到匹配的类中,然后使用我已经写过的发布/订阅模式来调用订阅者。前后端统一的类型化订阅是一项强大的解决方案。它消除了无数的端点,因为它们不再是动词驱动的,而是类型驱动的。是的,我可以实现客户端订阅者函数,它们基本上只是将数据发布到服务器,但这将意味着很多不必要的订阅者,如果我们在 PubSub 本身中实现一个 Post 方法的话。

本文附带一个小型服务器,其详细信息将被忽略,除了处理我们发布到服务器的“类型”的路由。让我们假设我们正在使用用户名和密码登录

import { DataTypeDictionary } from "./DataTypeDictionary"

export class Login implements ITyped {
  __name = DataTypeDictionary.Login;

  public username: string = "Marc";
  public password: string = "Fizbin";
}

当用户单击登录按钮时,我们“发布”此类型到服务器的 pub/sub

let url = "http://127.0.0.1/PubSub";
(document.getElementById("btnLogin") as 
 HTMLButtonElement).onclick = () => pubSub.Post(url, new Login());

请注意,我添加了一个 Post 函数

public Post(url: string, data: ITyped, headers: IHeaderMap = {}): void {
  XhrService.Post(url, data, headers)
    .then(xhr => {
      let obj = JSON.parse(xhr.response) as ITyped;
      this.Publish(obj);
    })
    .catch(err => this.Publish(new Exception(JSON.stringify(err))));
}

此函数需要一个 ITyped 响应——一个包含 __name 的 JSON 对象,以便其响应可以发布到相应的类型订阅者。

C# 中发布/订阅模式的一个非常简单的实现然后将实例路由到订阅者。

using System;
using System.Collections.Generic;
using System.Linq;

namespace ServerDemo
{
  public interface IType { }

  public interface ISubscriber
  {
    Type Type { get; set; }
    IType Invoke(IType data);
  }

  public class Subscriber<T> : ISubscriber where T : IType
  {
    public Type Type { get; set; }
    public Func<T, IType> Handler { get; set; }

    public IType Invoke(IType data)
    {
      return Handler((T)data);
    }
  }

  public class PubSub
  {
    protected List<ISubscriber> subscribers = new List<ISubscriber>();

    public PubSub Subscribe<T>(Func<T, IType> handler) where T : IType
    {
      var subscriber = new Subscriber<T>()
      {
        Type = typeof(T),
        Handler = handler
      };

      subscribers.Add(subscriber);

      return this;
    }

    public IType Publish(IType data)
    {
      Type t = data.GetType();
      var subscriptions = subscribers.Where(s => s.Type == t);
      var resp = subscriptions.FirstOrDefault()?.Invoke(data);

      return resp;
    }
  }
}

技术上讲,虽然只有一个订阅者应该返回响应,但修改代码以返回响应类型集合,然后在前端发布每个响应类型是一个简单的过程。然后我们订阅实际的类型处理程序

pubSub.Subscribe<Login>(LoginHandler);

而示例处理程序是

protected IType LoginHandler(Login login)
{
  return new AppLog() { Message = $"Logged in {login.Username}" };
}

两个支持类是

using Newtonsoft.Json;

namespace ServerDemo
{
  public class AppLog : IType
  {
    [JsonProperty("message")]
    public string Message { get; set; }
  }
}

注意 JsonProperty 属性,以便属性被序列化以匹配 JavaScript 字段 public message: string; 的大小写

namespace ServerDemo
{
  public class Login : IType
  {
    public string Username { get; set; }
    public string Password { get; set; }
  }
}

C# 端反序列化不区分大小写。

我们现在将此通用路由添加到路由器

router.AddRoute("POST", "/PubSub", PubSub, false);

一个hacky实现是

protected IRouteResponse PubSub(dynamic data)
{
  var typeName = data.__name;

  // HACK!
  var type = Type.GetType($"ServerDemo.{typeName}");

  // HACK! - we're not getting the raw json, 
  // so serialize it again and deserialize it into the desired type.
  var packet = JsonConvert.DeserializeObject(JsonConvert.SerializeObject(data), type) as IType;

  var resp = pubSub.Publish(packet);

  // HACK AGAIN! - serialize into json and then deserialize into a dynamic.
  dynamic dresp = JsonConvert.DeserializeObject<dynamic>(JsonConvert.SerializeObject(resp));

  dresp.__name = resp.GetType().Name;

  return RouteResponse.OK(dresp);
}

这非常hacky——我不想让我们的 C# 类型承担 __name 属性——毕竟,C# 是强类型的——所以相反

  1. 类型名称从 dynamic 对象中提取。
  2. 通过组合我们已知的命名空间来获取类型。
  3. 然后数据被重新序列化,然后反序列化为所需的类型。
  4. 调用订阅者。
  5. 数据再次被重新序列化,然后*再次*反序列化为动态对象。
  6. 添加了 __name 属性并设置为响应的 IType 实例的类型名称。

如上所述

(document.getElementById("btnLogin") as 
 HTMLButtonElement).onclick = () => pubSub.Post(url, new Login());

给定我们的新按钮

当我们运行服务器(作为下载的一部分)并导航到 http://127.0.0.1/index.html 时,我们应该看到与运行 TypeScript 项目时相同的结果

但现在,当我们单击按钮时,我们看到此消息被记录下来。

hack 和其他顾虑

我实际上并没有费心去修复示例中的 hack——它的主要目的是演示如何在前端使用相同的语义数据,通过后端使用语义发布/订阅模式,在前后端实现一致的使用模式。对我来说,这是对客户端和服务器都使用语义数据的最终目标。

关注点是

  1. 我在代码注释中指出的 hack。
  2. 这些 hack 的糟糕性能。
  3. 将数据序列化回客户端时的大小写敏感性。
  4. 后端通过类名获取类型的时区敏感性。
  5. 后端命名空间问题:前端对此一无所知,我们可以在后端做得更好。

当然,最终的关注点是,没有人会像这样编程前端或后端。就我个人而言,这并不困扰我!

结论

整个练习的重点是创建一个发布/订阅系统,该系统允许

  1. 基于类型的将数据发布到订阅者。
  2. 发布可以发生
    1. 立即(阻塞应用程序线程的执行)
    2. 排队(应用程序线程的执行继续)
    3. 异步(订阅者代码在一个单独的线程上执行)
    4. 到后端!

正如我在引言中所提到的,发布/订阅模式是一种非常有用的模式,可以减少(如果不是消除)意大利面条式代码。如果本文的唯一收获是这一点,那么我就成功了。您是否同意或发现基于类型的发布/订阅的使用方式无关紧要——我强烈推荐前端使用发布/订阅模式!

历史

  • 2021 年 1 月 1 日:初始版本
© . All rights reserved.