Whirlpool:使用 Netty 和 Kafka 的微服务





0/5 (0投票)
Whirlpool:使用 Netty 和 Kafka 的微服务
引言
在我上一篇博文中,我介绍了如何将Netty用作 Web 服务器。那个示例运行良好……前提是你需要的是一个广播服务器。
大多数情况下,这没什么用。更有可能的是,你需要的是每个客户端只接收他们应得的数据,而广播只保留在特殊情况下,比如“服务器将在15分钟后关闭!
”。那个特定服务器示例的另一个问题是,所有东西都是自包含的。虽然单体应用在示例中是可以的,但在当今的环境中,分布式微服务要好得多。可伸缩性和可靠性至关重要。
Netty 和 Kafka 结合起来非常棒。Netty 在处理大量客户端方面非常出色,而 Kafka 在使大量服务协同工作方面非常出色。结合起来,它们是开发中的一个甜点。然而,有一些“陷阱”可能会使其变得笨拙。这篇博文,连同示例微服务/Netty 架构和完全可工作的代码,希望能帮助减轻烦恼,并带来甜美的体验。
首先
示例代码位于此处。
有一个详细的 README,描述了设置环境所需的内容。我尽量将需求降到最低,只需要Java 8 和 Maven。使用了 SLF4J 和 Logback 进行日志记录。我为 Mac OSX 和 Ubuntu 设置了脚本(我测试的是运行在 Parallels 容器中的 Ubuntu 14.04),所以如果你在 Windows 上开发,敬请谅解。代码都是 Java 编写的,我在网上看到过 Windows 上的 Kafka 教程,所以一切应该都能运行。Maven 构建也应该能产生可启动的目标,所以通过一些努力安装 Zookeeper/Kafka(你可以按照脚本查看需要哪些设置),在 Windows 上手动运行它应该不是什么大问题。
注意:如 README.md
中所述,脚本将删除任何现有的 Zookeeper/Kafka 安装和数据。如果你已有安装,请不要使用脚本!
安装和配置好先决条件后,如果你不使用脚本,则运行 mvn package
;如果你使用脚本,则运行 maclocal_run.sh(或 linuxlocal_run.sh)。脚本会下载(如果尚未下载)Zk/Kafka,安装它们,配置它们,启动它们,运行 mvn package
,启动服务,最后启动服务器。一旦启动,请克制住离开终端的冲动,因为它会自动为架构的每个部分弹出新标签页。Whirlpool 服务器启动后,你就可以开始使用了。
我强烈建议创建一个脚本,在本地安装、配置、构建并启动你的微服务环境。创建每个单独的服务是一件很麻烦的事情。如果需要,也可以使用 Docker,但我发现直接原生运行所有内容需要下载的东西要少得多。
作为预告,这是用户界面(你也可以在 GitHub 的 README.md 中看到)。
- 要添加股票代码,输入它(例如“
GOOG
”),然后点击“Stock
”下的 A 按钮。要删除它,点击 X。 - 要添加一个网站来测试其是否正常运行,请输入完全限定的 URL(例如 http://facebook.com),然后点击“
UpDown
”下的 A 按钮。要删除它,点击 X。 - 要添加一个天气检查,请输入
city,state
(例如“chicago,il
”),然后点击“City,State
”下的 A 按钮。要删除它,点击 X。 - 订阅会保留页面刷新,甚至登录/注销(使用相同的
userid
),因为它们与每个服务在内存中存储。一个“真实”的系统当然会使用数据库。 - 订阅每 10 秒更新一次,以免压垮 Yahoo API,所以在添加数据后请耐心等待。
架构
通过这个示例,我试图思考一些可能有用的通用服务。我最终选择了股票行情服务、一个“网站是否正常运行”服务和一个天气服务。这些服务中的每一个都独立于其他服务运行,并拥有自己的 Kafka 主题。
我选择配置 Kafka 的方式是为每个服务设置一个命令主题和一个数据主题。所有服务也可以只使用一个全局主题,由读取者决定处理什么,但将它们分开可以使事情更清晰、更整洁。
这是数据通过 Kafka 流动方式的图示。它使用了 Keyhole 的一个免费的基于 Web 的工具,名为 Mockola。请注意,服务器知道所有主题,但服务只知道它们自己的主题。cmd
主题用于向服务发送命令,而数据主题(没有 -cmd
后缀的那些)用于从服务发送数据。同样,所有这些都可以处理在一个单一的 bus
主题上,但将它们分开更容易看到发生了什么。
服务
现在让我们来谈谈服务。所有这三项都非常相似,所以有一个基础服务负责大部分工作。每个服务有三个线程,由 Java 的 ExecutorService 处理。Executor 服务的一个优点是,如果出现问题,它会自动重启线程。这有助于提高弹性。
每个服务通过告知基类要使用哪个主题和命令主题来启动自身。然后基类启动三个线程:一个用于从命令主题读取命令,一个用于定期为客户端收集数据,还有一个用于将数据发送到数据主题。这些线程使用非阻塞的 Java 并发类 ConcurrentLinkedQueue
和 ConcurrentHashMap
进行通信。哈希映射存储每个用户的订阅集,队列存储准备发送到数据主题的响应。
每个服务的流程是三个线程并发工作。Reader 使用 Kafka Consumer 从其命令主题读取命令。根据命令,添加或删除订阅。这个线程相当“笨”,因为它不要求服务对请求进行任何验证,它只是盲目地添加发送的任何内容到订阅中。生产代码显然会添加一个调用来要求服务在允许订阅成功之前验证命令。然后创建一个响应放入主题,然后等待下一个命令。
注意:关于将数据放入主题的一些话。我使用 JSON 作为传输格式,但 XML 或任何你想要的格式也一样可行。重要的是,每个人都同意数据格式并坚持下去。通用模块包含 POJO 类,用于定义数据将遵循的契约。对所有消息普遍有用的内容是时间戳、消息类型和客户端 ID。
另一个有用的东西是过期时间戳。这些示例消息会永远存在。Message
类只查看消息的类型和 ID。服务器使用这些信息来确定需要处理什么类型的消息,以及谁对该消息感兴趣。没有这些,处理数据会非常困难,甚至不可能。现在,消息格式可能变得非常复杂,有些甚至使用头部和段来描述复杂数据。本示例力求将所有内容保持得尽可能简单。
Netty 服务器
我们来逐个类地看服务器。
NettyHttpFileHandler
这个类与之前的博文相比基本没有改变。可重用部分已移至 WebSocketHelper
类。这个文件的主要用途是提供浏览器请求的文件。
WebSocketHelper
第一个可能令人困惑的项目是类变量 clientAttr
。在 Netty Channel 中存储数据需要将其附加到 AttributeKey
。这类似于 Java 并发类中的 Atomic 实例——它提供了一个数据容器。我们将存储客户端 ID(在我们的例子中是用户名,但也可以是会话 ID),这样我们就可以找出哪个 Channel 需要接收消息。
realWriteAndFlush()
方法设置适当的头部、内容长度和 cookie。然后它写入并刷新 HTTP 响应。这一行
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
告诉 Netty 这是需要写入客户端的数据的结束,因此 Netty 会将其发送出去。
特别注意:关于 cookie 的创建,请确保不要设置 HTTP Only
标志。如果设置了,JavaScript 将无法看到 cookie,也不会在 WebSocket 升级请求中发送。这会导致你必须自己创建页面刷新管理和会话管理的方法。
关于 cookie 的另一件事是使用 Netty cookie 编码器的 STRICT 版本,这样它就不会允许同名的多个 cookie。我不确定什么时候允许这种情况发生会有用。
WebSocketMessageHandler
这个类只是定义了一个接口,WhirlpoolServerHandler
使用该接口与 WhirlpoolMessageHandler
进行通信。
WhirlpoolMessageHandler
这是 Netty 和 Kafka 之间建立连接的地方。两个 Executor 处理一个 reader 线程和一个 writer 线程。
writer 线程在请求队列中查找消息(稍后会详细介绍这些消息的来源),并将消息放入相应的 Kafka 命令主题。
reader 线程在 Kafka 数据主题中查找传入的消息,查找每个主题的正确 Channel,并将消息写入这些主题。
当客户端通过 WebSockets 发送消息时,WhirlpoolServerHandler
会确保已接收到完整消息,然后调用 handleMessage()
。此方法确定消息是否有效,然后将请求添加到请求队列,以便 reader 线程可以将其提取并交给 Kafka。
WhirlpoolServerHandler
这个类中有几个有趣的地方。首先,它可以区分 HTTP、REST 和 WebSocket 消息。Netty 中执行此操作的重写方法是 channelRead0
。这是 Netty 用来告诉我们何时收到消息以及消息类型的。对于 HTTP 和 REST 调用,调用 handleHttpRequest
;对于 websockets,调用 handleWebSocketFrame
。
handleHttpRequest
方法读取 cookie(如果存在)。对于 POST 请求,它处理登录和注销。对于登录,它找出用户名/密码,创建 cookie,并防止同名用户多次登录。所有这些代码都会被拆分出来,并在应用程序的生产版本中添加额外的安全性。对于注销,它会查找 Channel,清理它,关闭它,并使 cookie 过期。
对于 WebSocketUpgrade
,它要求 Netty 处理启动 websocket 所需的复杂握手。完成之后,它会将用户添加到握手期间创建的 Channel 中。这就是用户与 Channel 连接的地方,如果 cookie 没有在请求中传输,这将会非常困难。
这里需要注意的唯一其他事情是,这个类被设置为处理为 SPA(单页应用程序)编写的客户端,因为它会将任何无法识别的调用重定向到 index.html
。
类中的其他方法更多是用于信息目的,将在高级场景中使用。
WhirlpoolServer
这个类启动 Netty 服务器并创建 Channel 管道。它是 Netty 的标准类,遵循 Netty 的示例。
最终想法
显然,这段代码还可以包含更多内容。可以同时运行每个服务和服务器的多个实例,并且 Zk/Kafka 可以集群化以提高弹性。一个测试微服务应用程序弹性的优秀工具是 Keyhole 的另一个免费开源工具,名为 TroubleMaker。我还没有机会测试这个示例,但我期待这个机会。
我们没有讨论安全性,尽管我之前希望展示 Netty 与 Shiro 的集成,但这一个非常复杂的话题。关于它,我只能说这是可能的,但我还没有完全掌握所有部分,以至于能够形成一篇连贯的博文。
希望您喜欢这篇博文并发现代码很有用。请通过博文或 Twitter(@johnwboardman,我一直很欢迎新的关注者)与我联系。