65.9K
CodeProject 正在变化。 阅读更多。
Home

股票探索器:使用发布/订阅实现请求/响应

starIconstarIconstarIconstarIconstarIcon

5.00/5 (8投票s)

2016 年 12 月 3 日

CPOL

7分钟阅读

viewsIcon

18175

downloadIcon

146

使用开源发布/订阅、Go 和 Javascript 构建金融股票探索器应用程序。

引言

本文介绍了一个简单的股票探索器应用程序,演示了如何使用发布/订阅机制来编写经典的请求/响应式应用程序。更具体地说,是如何构建一个简单的金融查询服务,该服务利用雅虎财经和晨星的金融信息来提供一个简单的查询 API。我们将使用 emitter.io 服务来处理我们的发布/订阅通信。Emitter 源代码可以在 GitHub 上查看

[查看实时演示]

背景

现在,emitter 是一个分布式、发布-订阅MQTT 代理。在本文中,我们假设您对 MQTT 有一些基本了解,我们将不再详细介绍该协议的规范以及如何使用它,但是,这里有几个重要的点:

  • 在 MQTT 中,客户端可以订阅主题(通道),这些主题由分层字符串表示(例如:sensor/1/temperature/)。
  • 客户端可以将任意二进制数据发布到这些通道。
  • MQTT 数据包的头部只有 2 字节,并且存在各种客户端库。

在本文中,我们将在发布/订阅协议之上实现一个简单的请求-响应拓扑。策略非常简单:

服务器

  1. 订阅 quote-request 通道。
  2. 每次收到消息,例如 { symbol: "MSFT", reply: "1234"},它会处理该消息并将响应发送到 quote-response/xxx 通道,其中 xxx 是请求中的 reply 值,例如 quote-response/1234

客户端

  1. 订阅 quote-response/xxx 通道,其中 xxx 是会话的标识符——只有客户端知道的唯一值。
  2. 每次用户输入一个股票代码时,我们都会将带有股票代码和标识符的消息发送到 quote-request 通道。
  3. 每次收到响应时,处理它并绑定到视图。

服务器

我们将首先构建服务器,该服务器可以接收 quote-request 通道上的请求,并在 quote-response/... 通道上回复。我们首先导入 emitter 以及处理业务逻辑的其他库。

import (
    "./finance/provider"
    emitter "github.com/emitter-io/go"
)

在我们的 main() 函数中,我们首先初始化一个新的 Provider,它将处理数据的金融查询。由于我想让这篇文章简短,我们不会深入探讨其实现方式,但欢迎您在 GitHub 上探索其源代码。

p := provider.NewProvider()
o := emitter.NewClientOptions()

在选项中,我们设置了消息处理程序,这是一个函数,每当服务器收到消息时都会被调用。每次我们收到请求时,我们使用 json.Unmarshal() 解析它,并调用提供程序上的 GetQuotes() 方法,该方法将返回一个包含股票和金融信息以及股息历史的响应。然后,我们简单地将结果序列化并发布到 quote-response/ 通道,并在请求中指定一个子通道,确保只有请求者收到此响应。

// Set the message handler
o.SetOnMessageHandler(func(c emitter.Emitter, msg emitter.Message) {
    fmt.Printf("Received message: %s %v\n", msg.Payload(), msg.Topic())

    // Parse the request
    var request map[string]string
    if err := json.Unmarshal(msg.Payload(), &request); err != nil {
        fmt.Println("Error: Unable to parse the request")
        return
    }

    quotes, err := p.GetQuotes(request["symbol"])
    if err != nil {
        fmt.Println("Error: Unable to process the request")
        return
    }

    response, _ := json.Marshal(quotes[0])
    c.Publish(key, "quote-response/"+request["reply"], response)
})

最后,我们只需使用 NewClient() 函数创建一个新的 emitter 客户端来启动服务器,连接到它并订阅 quote-request 通道,以便我们可以接收请求。

// Create a new emitter client and connect to the broker
c := emitter.NewClient(o)
sToken := c.Connect()
if sToken.Wait() && sToken.Error() != nil {
    panic("Error on Client.Connect(): " + sToken.Error().Error())
}

// Subscribe to the request channel
c.Subscribe("FKLs16Vo7W4RjYCvU86Nk0GvHNi5AK8t", "quote-request")

客户端

我们将构建的客户端使用 VueJs 数据绑定框架将我们从服务器接收到的结果绑定到 HTML DOM。当然,您也可以使用任何其他数据绑定框架,如 React、Angular 或 Durandal 来实现。

我们的模型,如下所示,由一个绑定到输入框的 symbol 属性和将使用简单句柄标签绑定的 result 组成。有关更多信息,请查看包含所有布局和数据绑定的 index.html 页面。如您所见,模型本身相当简单。

var vue = new Vue({
    el: '#app',
    data: {
        symbol: 'AAPL',
        result: new Object()
    },
    methods: {
        query: function () {
	        // publish a query
        },
    },
});

客户端 - 网络

我们继续使用 emitter 实现网络部分。首先要做的就是连接到 emitter 代理。我们只需调用 emitter.connect(),它就会将我们连接到 api.emitter.io:8080 端点,这是一个免费的沙盒。

var emitter = emitter.connect({
    secure: true
}); 

一旦我们连接到服务器,我们需要订阅 quote-response/ 通道,并带有一个后缀,该后缀表示当前浏览器的唯一 ID。这里的想法是我们订阅此用户会话唯一的通道。这样,只有一个用户会收到响应通知。

emitter.on('connect', function(){
    // once we're connected, subscribe to the 'quote-response' channel
    console.log('emitter: connected');
    emitter.subscribe({
        key: resKey,
        channel: "quote-response/" + getPersistentVisitorId()
    });
})

然后我们添加一个查询方法,每当按下搜索按钮时都会调用它。在此方法中,我们只需向 quote-request 通道发布一条消息(例如:{ symbol: "AAPL", reply: "12345" }),其中我们提供一个 reply 参数,以便我们的服务器知道回复到哪里。这将被简单地附加到我们的服务器将发布响应的通道,例如上面示例中的 quote-response/12345

query: function () {
    // publish a message to the chat channel
    console.log('emitter: publishing ');
    emitter.publish({
        key: reqKey,
        channel: "quote-request",
        message: JSON.stringify({
            symbol: this.$data.symbol, 
            reply: getPersistentVisitorId()
        })
});

最后,每当我们收到消息时,我们只需使用 msg.asObject() 方法将其从 JSON 格式转换为对象,对数据进行一些处理并将结果绑定到我们的视图。

emitter.on('message', function(msg){

    // log that we've received a message
    var data = msg.asObject();
    console.log('emitter: received ', msg.asObject());

    // do some work
    // ...

    // bind the result to the screen
    vue.$data.result = data;
});

客户端 - 图表

我们将在股票探索器结果页面中显示几个图表

  1. 一个图表显示股票价值以及 50 天和 200 天的移动平均线。
  2. 一个图表显示股息历史。

对于第一个图表,我们将简单地使用雅虎财经图表 API。只需在 chart.finance.yahoo 端点中提供股票代码即可使用,它会返回一张图片,例如 http://chart.finance.yahoo.com/z?s=TSLA&t=6m&q=l&l=on&z=l&p=m50,m200 将显示特斯拉汽车公司的图表,包含 50 天和 200 天的移动平均线(绿色和红色线)。如下图所示,我们只需使用 {{result.Symbol}} 将股票代码替换为我们模型中的数据。

<img class="col-sm-12 yahoo-chart" src="http://chart.finance.yahoo.com/z?s={{result.Symbol}}&t=6m&q=l&l=on&z=l&p=m50,m200" />

我们想要呈现的第二个图表是股息历史。我们将使用 chartist javascript 库进行可视化。下面的函数在收到 emitter 代理的响应数据时绘制股息图表。我们简单地遍历股息,将标签(月/日值)推入一个数组,将系列(相应值)推入 series 数组。完成后,我们调用 Chartist.Line() 函数来触发图表的渲染。

function drawDividendChart(data){
    labels = [];
    series = [];
    data.DividendHistory.forEach(function(d){
        labels.push(formatDate(d.Date));
        series.push(d.Value);
    });

    // apply the chart
    new Chartist.Line('#dividends-chart', {
        labels: labels,
        series: [series]
        }, {
            fullWidth: true,
            axisX: {
                showGrid: false,
                labelInterpolationFnc: function(value, index) {
                    return index % 2 === 0 ? value : null;
                }
            }
    });
}

为什么选择发布/订阅?

您可能会想,为什么您会想为简单的请求-响应通信使用发布/订阅系统。这有点像房子的地基:如果您只想建造一层楼高的房子,您可以决定只建造一个只能支撑一层楼的地基。这会运作良好。但是,如果您想在将来为您的房子增加一层楼,您就会遇到麻烦——没有简单的方法可以扩展。您可以将发布/订阅想象成您的应用程序的地基。请求-响应是您的第一层。也许请求-响应是您现在唯一需要的,但在后期您可能想添加其他通信方式。在本文介绍的股票探索器应用程序中,假设您想根据特定事件向用户发送推送通知,例如股票水平超过某个阈值。理想情况下,如果接收方离线,这也应该有效,他应该在连接后能够收到消息。这可以通过 emitter 以 2 个简单步骤轻松完成。

  1. 确保代理通过通道发布消息,例如在某些股票水平事件的情况下,以股票代码作为通道名称的(一部分)。您可以为每种事件类型创建子通道。
// publish a message over the stocks/msft channel and store it
emitter.publish({                                     
  key: "<channel key>", 
  channel: "stocks/msft",
  ttl: 86400,                 // message will be deliverd max 86400 sec (one day) later if receiver was offline
  message: "microsoft stock up by 1.0%"
});
  1. 每个用户都可以订阅通道以接收与该通道的股票代码相关的推送通知。用户可以订阅完整的通道或子通道。
// connect, retrieve all messages that have been stored related to ticker MSFT 
// also, subscribe to receive all newly created messages
emitter.on('connect', function(){
  emitter.subscribe({
    key: "<channel key>",
    channel: "stocks/msft" 
  });
});

通过使用发布/订阅系统,您实际上完成了以下工作:您实现了多对一通信,您解耦了发送方和接收方,您使用了消息存储来在接收方离线时延迟传递消息,您根据(子)通道创建了消息过滤。所有这些只需几行代码。是不是很棒?

历史

  • 2016 年 12 月 02 日 - 文章的初始版本
© . All rights reserved.