65.9K
CodeProject 正在变化。 阅读更多。
Home

微服务服务发现

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.92/5 (24投票s)

2018年6月14日

CPOL

11分钟阅读

viewsIcon

57346

了解如何使用Consul实现微服务服务发现

引言

Madonna曾说过“我们生活在一个微服务的世界里”。好吧,她并没有完全这么说,但她知道什么。我们**确实**生活在一个微服务的世界里。

如果您目前正在从事/已经从事微服务,您会知道,这其中一个棘手的问题是“服务发现”。本文将简要介绍一些现有的解决方案,并花费其余大部分篇幅来介绍一个名为“Consul”的特定框架,用于在此领域提供帮助。 

 

我们要解决的问题

那么具体问题是什么呢?让我们看看这张图

假设我们在系统中部署了许多服务,例如SystemB/SystemC,并且还有其他服务(或UI)SystemA/SystemD想要利用这些现有服务。但是,为了做到这一点,我们需要知道这些现有服务的位置,或者假定它们的DNS名称是长期有效的。这就是“发现”的过程,也是本文其余部分将重点关注的内容。

 

现有解决方案

有相当多的框架可以帮助开发“发现”机制

 

动物园管理员

Zookeeper有服务器节点,需要节点法定人数才能运行(通常是简单多数)。它们是强一致性的,并公开各种原语,这些原语可以通过应用程序中的客户端库使用,来构建复杂的分布式系统。问题在于,虽然它提供了绝对的自由,但所有事情都取决于您自己来完成/构建。

Redis/数据存储

您可以使用Redis缓存,供服务存储元数据,然后消费者可以查询缓存。但是,要使其具有弹性,您确实需要某种形式的集群,以及一些共识/gossip来实现一致性。这相当困难,因此大多数人只是走捷径,将其设为单例,这显然是单点故障。

Kubernetes

Kubernetes通过Services/DNS addon/pods在“发现”方面做得很好,所有这些都可以轻松进行负载均衡(前提是您正在使用云托管的Kubernetes)。我之前写过关于这方面的内容,如果您有兴趣,可以在这里阅读更多: https://sachabarbs.wordpress.com/kubernetes-series/

这是一个很好的解决方案,前提是您的东西运行在容器中(最好在云环境中)

Consul

Consul(在我看来)是唯一一个直接处理“发现”问题的工具/框架,它提供了一个功能丰富的工具,可以出色地完成这项工作,并且开发人员只需付出很少的努力。本文档 pretty good read of the comparisons between Consul and others : https://www.consul.io/intro/vs/index.html 

在本文的其余部分,我将重点介绍Consul

 

 

Consul 讨论

因此,在我们开始演示应用程序之前,让我们花点时间谈谈Consul是什么,谁创建了它,以及为什么我认为它很棒。

谁创建了Consul,为什么应该信任它?

Consul由Hashicorp创建,Hashicorp是制造Vagrant的团队(Vagrant是一个很棒的VM注册系统)。Hashicorp知道他们在做什么,他们拥有出色的产品成功记录。所以是的,您可以信任他们,他们会支持您。

基本思路

Consul的主要特点如下:

  • 提供一种将服务注册到目录的方法,以及可查询的元数据
  • 提供一种使用元数据查询目录的方法
  • 提供健康检查服务的功能
  • 提供一个键值存储
  • 公开一种设置ACL(访问控制列表)的方法
  • 公开一种创建分布式信号量的方法。当您想协调多个服务同时限制对某些资源的访问时,这很有用。有关更多信息,请参见此处: https://www.consul.io/docs/guides/semaphore.html,并参考相关的客户端库以获取您所需的具体语言示例
  • 公开一种使用Consul创建自己的客户端侧领导者选举的方法。有关更多信息,请参见此处: https://www.consul.io/docs/guides/leader-election.html,并参考相关的客户端库以获取您所需的具体语言示例

它可靠吗?

本质上,Consul期望您设置一个集群,其中集群节点之间存在gossip协议。Consul使用Serf(请参见 https://www.consul.io/docs/internals/gossip.html,顺便说一句,Serf是另一个Hashicorp产品)来处理gossip。本质上,节点之间的gossip将处理某种形式的共识/领导权/一致性/复制,这就是其可靠性的来源。

客户端库

如上所述,Consul公开了REST API/Go客户端,允许您将自己的服务/键值/ACL集成到Consul机制中。您可以完全使用这些REST API/Go客户端,没有任何问题。但是,有大量的社区客户端可以为您封装Consul REST API,因此值得查看一下。

您可以在此处找到完整的客户端列表: https://www.consul.io/api/libraries-and-sdks.html  

Web仪表盘

如果您运行了一个Consul代理,您应该能够访问一个Web仪表盘,该仪表盘开箱即用,应该看起来像这样(请注意,我已经设置了以下环境变量CONSUL_UI_BETA,并将其设置为true,以使用“新UI”)

导航到http:localhost:8500

 

这个Web UI允许您查看以下内容:

  • 已注册的服务
  • 集群节点
  • 键值存储的键/值
  • 您设置的任何ACL(访问控制列表)值

一旦我向您展示了如何使用Consul.NETConsul中自我注册,我们就会再次查看这个Web应用。

 

使用Consul.Net的演示应用

如上所述,有许多Consul REST包装器,但我选择展示.NET版本,它使用了Consul.NET,这是一个.NET Core 2.0 WebAPI和一个简单的控制台应用程序客户端。

 

演示应用程序执行以下操作:

  • 演示如何将API自我注册到Consul
  • 演示如何将API健康检查与Consul挂钩
  • 演示如何使用Consul键值存储
  • 演示客户端如何使用ConsulCatalog发现服务
  • 演示客户端使用“已发现”的API

 

虽然Consul.NET可以通过NuGet获得,但熟悉测试/API仍然是很有益的,您可以在其GitHub仓库中阅读有关这些内容。

 

演示应用先决条件

 

运行演示

您可以从我的仓库中获取本文的代码: https://github.com/sachabarber/ConsulDemo,下载后只需执行以下2项操作:

  • 打开一个命令提示符窗口,进入consul.exe所在的位置,然后执行命令consul agent -dev。这将启动一个单节点用于测试。这些数据在Consul重启后将不会保留。因此,这显然不适用于生产环境,仅用于测试。教您关于集群的知识超出了本文的范围,请参阅文档: https://www.consul.io/intro/getting-started/join.html
  • 在Visual Studio中打开上述仓库中的代码,并确保按以下顺序运行项目:
    • ConsulDemoApi,只需等待API在浏览器中启动并显示一个空的JSON数组响应(这是演示应用程序的起点,客户端将在发现并使用API时提供一些项)。
    • ConsulDemoApi.Client,一旦确定API正在运行,请在客户端控制台中按任意键,允许它通过Consul发现并使用API。

就是这样了。

 

一个示例API

示例API可以是任何东西,它更多地是关于如何将其注册到Consul。但是,我还想展示如何使用Consul键值存储,因此我选择了一个简单的.NET Core 2.0 WebApi项目,其中包含2个控制器:

  • ValuesController:这是一个简单的控制器,它期望从Consul键值存储中PUT/GET/DELETE项。
  • HealthController:这个控制器由与该服务在Consul中注册相关的Consul健康检查使用。

API控制器

这是ValuesController的全部代码,我认为它相当不言自明。通过Consul.NETConsul键值存储非常容易使用。

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Consul;
using Microsoft.AspNetCore.Mvc;
using Newtonsoft.Json.Linq;

namespace ConsulDemoApi.Controllers
{
    [Produces("application/json")]
    [Route("api/[controller]")]
    public class ValuesController : Controller
    {
        private Func<IConsulClient> _consulClientFactory;

        public ValuesController(Func<IConsulClient> consulClientFactory)
        {
            _consulClientFactory = consulClientFactory;
        }


        // GET api/values
        [HttpGet]
        public async Task<IEnumerable<string>> Get()
        {
            using (var client = _consulClientFactory())
            {
                var queryResult = await client.KV.List("ConsulDemoApi-ID-");
                if (queryResult.StatusCode == System.Net.HttpStatusCode.OK)
                {
                    List<string> finalResults = new List<string>();
                    foreach (var matchedPair in queryResult.Response)
                    {
                        finalResults.Add(Encoding.UTF8.GetString(matchedPair.Value, 0,
                            matchedPair.Value.Length));
                    }
                    return finalResults;
                }
                return new string[0];
            }
        }

        // GET api/values/5
        [HttpGet("{id}")]
        public async Task<string> Get(int id)
        {
            using (var client = _consulClientFactory())
            {
                var getPair = await client.KV.Get($"ConsulDemoApi-ID-{id.ToString()}");
                return Encoding.UTF8.GetString(getPair.Response.Value, 0,
                    getPair.Response.Value.Length);
            }
        }

        // PUT api/values/5
        [HttpPut("{id}")]
        public async Task Put(int id, [FromBody]JObject jsonData)
        {
            using ( var client = _consulClientFactory())
            {
                var jsonValue = jsonData["Value"].ToString();
                var putPair = new KVPair($"ConsulDemoApi-ID-{id.ToString()}")
                {
                    Value = Encoding.UTF8.GetBytes(jsonValue)
                };
                await client.KV.Put(putPair);
            }
        }

        // DELETE api/values/5
        [HttpDelete("{id}")]
        public async Task Delete(int id)
        {
            using (var client = _consulClientFactory())
            {
                await client.KV.Delete($"ConsulDemoApi-ID-{id.ToString()}");
            }
        }
    }
}

 

老实说,关于这个控制器没有什么更多可说的了,它只是键值存储上的简单REST。

 

注册/健康检查

好的,既然我们有了一个API,我们就想把它注册到Consul。这是重点。让我们看看如何做到这一点。

它大致分为以下几个步骤(请记住,我使用的是.NET Core 2.0,如果您使用的不是,可能会有所不同):

创建一些配置

这是我的本地Consul的最小appsettings.json文件:

{
  "ConsulConfig": {
    "Address": "http://127.0.0.1:8500",
    "ServiceName": "ConsulDemoApi",
    "ServiceID": "ConsulDemoApi-v1"
  }
}

看到它包含服务名称/ID(ID必须是唯一的)。

注册服务

下一步是注册服务。有许多方法可以做到这一点,但是.NET Core 2.0带有一个漂亮的接口IHostedService,它允许我向HTTP管道注册启动任务。

因此,只需在IHostedService实现中添加一些注册代码即可。这是演示应用程序中的代码:

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Consul;
using ConsulDemoApi.Config;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Hosting.Server.Features;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace ConsulDemoApi.Services
{
    public class ConsulHostedService : IHostedService
    {
        private Task _executingTask;
        private CancellationTokenSource _cts;
        private readonly IConsulClient _consulClient;
        private readonly IOptions<ConsulConfig> _consulConfig;
        private readonly ILogger<ConsulHostedService> _logger;
        private readonly IServer _server;
        private string _registrationID;

        public ConsulHostedService(
            IConsulClient consulClient, 
            IOptions<ConsulConfig> consulConfig, 
            ILogger<ConsulHostedService> logger, 
            IServer server)
        {
            _server = server;
            _logger = logger;
            _consulConfig = consulConfig;
            _consulClient = consulClient;

        }
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            // Create a linked token so we can trigger cancellation outside of this token's cancellation
            _cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

            var features = _server.Features;
            var addresses = features.Get<IServerAddressesFeature>();
            var address = addresses.Addresses.First();

            var uri = new Uri(address);
            _registrationID = $"{_consulConfig.Value.ServiceID}-{60008}";

            var registration = new AgentServiceRegistration()
            {
                ID = _registrationID,
                Name = _consulConfig.Value.ServiceName,
                Address = $"{uri.Scheme}://{uri.Host}",
                Port = 60008,
                Tags = new[] { "Consul", "SachaBarber-Demo" },
                Check = new AgentServiceCheck()
                {
                    HTTP = $"{uri.Scheme}://{uri.Host}:60008/api/health/status",
                    Timeout = TimeSpan.FromSeconds(3),
                    Interval = TimeSpan.FromSeconds(10)
                }
            };

            _logger.LogInformation("Registering in Consul");
            await _consulClient.Agent.ServiceDeregister(registration.ID, _cts.Token);
            await _consulClient.Agent.ServiceRegister(registration, _cts.Token);
        }

        public async Task StopAsync(CancellationToken cancellationToken)
        {
            _cts.Cancel();
            _logger.LogInformation("Deregistering from Consul");
            try
            {
                await _consulClient.Agent.ServiceDeregister(_registrationID, cancellationToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"Deregisteration failed");
            }
        }
    }
}

代码的主要重点显然是向Consul注册**此**服务。因此,请确保您理解上面代码中的注册部分。我认为最重要的一点是如何将额外的元数据与注册关联起来,这允许消费者使用此元数据进行查询,而无需了解实际端点(这正是我们试图确定的),我们只需说“**我想要所有名为‘ConsulDemoApi’的服务**”,然后我们会收到所有标记为此类服务的注册信息,其中包括该服务的端点信息,然后调用者可以使用该信息。

请注意,注册还包括一个CheckConsul有许多种Check,这只是其中一种,它将调用一个REST端点,在本例中是该应用程序的一部分,即HealthController,它看起来像这样:

using Microsoft.AspNetCore.Mvc;

namespace ConsulDemoApi.Controllers
{
    [Route("api/[controller]")]
    public class HealthController : Controller
    {
        [HttpGet("status")]
        public IActionResult Status() => Ok();
    }
}

有关可用Check类型的完整列表,请阅读: https://www.consul.io/docs/agent/checks.html

引导IOC容器

最后一步是确保正确的服务注册到IOC容器中(这可能因您的设置而异)。

由于我使用的是.NET Core 2.0 WebAPI,对于演示应用程序,这在Startup.cs中完成,如下所示:

 

public void ConfigureServices(IServiceCollection services)
{
    services.AddSingleton>IHostedService, ConsulHostedService>();
    services.Configure>ConsulConfig>(Configuration.GetSection("ConsulConfig"));
    services.AddSingleton>IConsulClient, ConsulClient>(p => new ConsulClient(consulConfig =>
    {
        var address = Configuration["ConsulConfig:Address"];
        consulConfig.Address = new Uri(address);
    }));

    services.AddSingleton>Func>IConsulClient>>(p => () => new ConsulClient(consulConfig =>
    {
        var address = Configuration["ConsulConfig:Address"];
        consulConfig.Address = new Uri(address);
    }));


    .....
}

 

一个示例客户端

因此,示例客户端是一个简单的.NET Core 2.0控制台应用程序,它执行以下操作:

  • 使用Consul查询机制发现1-N个服务(我们使用有关要发现和使用的服务的元数据)。
  • 删除先前所有值 CURRENT 已发现的服务。
  • 使用 CURRENT 已发现的服务放置一些新值。
  • 使用 CURRENT 已发现的服务获取所有新值。

 

使用Catalog获取元数据

最难的部分(好吧,其实很简单)是发现匹配给定元数据的服务。这可以通过以下方式完成:

private readonly List<Uri> _serverUrls;


public async Task Initialize()
{
    var consulClient = new ConsulClient(c =>
    {
        var uri = new Uri(_configuration["ConsulConfig:Address"]);
        c.Address = uri;
    });

    _logger.LogInformation("Discovering Services from Consul.");

    var services = await consulClient.Agent.Services();
    foreach (var service in services.Response)
    {
        var isDemoApi = service.Value.Tags.Any(t => t == "Consul") &&
            service.Value.Tags.Any(t => t == "SachaBarber-Demo");
        if (isDemoApi)
        {
            var serviceUri = new Uri($"{service.Value.Address}:{service.Value.Port}");
            _serverUrls.Add(serviceUri);
        }
    }

    ....
}

看到我们如何使用Tags来提供搜索的谓词。然后,我们只需将匹配该查询的端点信息存储在一个列表中,以便以后使用已发现的服务。

使用Polly进行重试和切换

这里的一个巧妙技巧是,我们可以利用Polly重试库,这样在达到一定的重试次数后,我们可以切换到下一个已发现的服务。这在交易应用程序中很常见,其中可能有大量的价格流服务,如果一个服务宕机,我们就切换到下一个。

serverRetryPolicy = Policy.Handle<HttpRequestException>()
    .RetryAsync(retries, (exception, retryCount) =>
    {
        ChooseNextServer(retryCount);
    });
	
private void ChooseNextServer(int retryCount)
{
    if (retryCount % 2 == 0)
    {
        _logger.LogWarning("Trying next server... \n");
        _currentConfigIndex++;

        if (_currentConfigIndex > _serverUrls.Count - 1)
            _currentConfigIndex = 0;
    }
}

	

然后,对于使用已发现服务的调用,我们可以简单地使用当前的“工作”服务端点信息。

public Task<bool> DeleteValueAsync(int id)
{
    return _serverRetryPolicy.ExecuteAsync(async () =>
    {
        var serverUrl = _serverUrls[_currentConfigIndex];
        var requestPath = $"{serverUrl}api/values/{id}";
        _logger.LogInformation($"Making request to DELETE {requestPath}");
        var response = await _apiClient.DeleteAsync(requestPath).ConfigureAwait(false);
        return response.IsSuccessStatusCode;
    });
}

 

演示应用的Web仪表盘

现在我们有了演示应用程序,并且已经启动了Consul(通过命令行consul agent -dev),我们可以让它运行并再次查看Consul Web UI。

首先,请确保您已启动Consul并运行了两个演示应用程序项目,您应该看到类似以下的输出:

然后,让我们再次启动Consul Web应用程序,现在我们可以看到已注册的服务(请记住,Consul上的-dev标志会启动一个非持久化的Consul单节点集群,因此在机器重启后您将看不到任何内容)。

我们可以深入了解我们新注册的服务,并且看到我们的健康检查也在工作。

我们还使用了键值存储,所以让我们来看看。

在这里,我们还可以查看其中一个值。

太好了,一切都按我们想要的方式工作。女士们先生们,这就是全部。

 

 

 

 

 

结论

Consul使用起来非常方便,如果您还没有这样做,真的应该好好看看。它太棒了。

© . All rights reserved.