65.9K
CodeProject 正在变化。 阅读更多。
Home

面向服务的微服务中的面向方面编程和其他模式

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.93/5 (10投票s)

2020年4月7日

CPOL

21分钟阅读

viewsIcon

17815

downloadIcon

159

在微服务上下文中,在现代 OO 语言中使用面向方面编程等模式

Sample Image - maximum width is 600 pixels

引言

本文在现代微服务平台的背景下,提供了对面向切面编程的最新视图。它描述了面向切面编程的优缺点以及其实现的优缺点。它不深入描述面向切面编程的概念。市面上有其他文章很好地完成了这项工作。

参阅相关文章

本文是如何构建的?

  • Java 和 C# 中最终用法的示例
  • 现代微服务中的面向切面编程,为什么以及如何?
  • 高级设计和原理(设计的优缺点)
  • Java 实现细节
  • C# 实现细节

在 Java 中使用代码(Java 11+,Spring Boot 2.2+,Spring Boot AOP,AspectJ)

	// Java code usage example
	// 
	// The @around @CheckPoint (long x) aspect does a high level (distributed) 
    // logging of the checkpoint 
	// The @CheckPointParameter annotated method parameters are logged 
    // as properties with the log
	
	@CheckPoint(10)
	public void annotatedCheckPointMethod(@CheckPointParameter String user) {
	 // -> results in @Before checkpoint logging
		
		....code ...
		
	}// -> results in @After checkpoint logging

在 C# 中使用代码(C# 7,.NET MVC Core 2.1+,Autofac,Caste Core Dynamic Proxy)

	// C# code usage example
	// 
	// The [CheckPoint (long x)] dynamic proxy interceptor 
	// aspect does a high level (distributed) logging of the checkpoint  
	// The [CheckPointParameter] annotated method parameters are logged 
    // as properties with the log
	
	[CheckPoint(10)]
	public void AnnotatedCheckPointMethod([CheckPointParameter] String user) 
	{// -> results in @Before checkpoint logging
		
		....code ...
		
	}// -> results in @After checkpoint logging

概念:基本原则

什么是面向切面编程?

关于面向切面编程是什么,我参考[1]

面向切面编程如何工作?

在本文的实现中,我们使用 CTW(编译时织入)和 LTW(加载时织入)。

LTW 织入在运行时“织入”一个中间对象,该对象实现相同的接口,并执行切面工作,然后委托给实现代码。

CTW 织入在编译时将切面代码“织入”到适当位置。C# 和 Java 都使用中间代码(字节码或 IL 代码)进行编译。因此,通常情况下,首先由原生语言编译器生成字节码,然后由切面编译器将切面织入字节码中。

为什么要使用面向切面编程?

在现代、面向微服务的应用环境中,通常不会只有一个、两个或三个服务,而是几十个甚至数百个微服务。通常,如果我们要对该软件进行模块化,至少有以下几种可能性:

  1. 使用分布式服务调用
  2. 使用二进制库
  3. 复制代码
  4. 使用面向切面编程

注意:这种拆分视图过于简化、不完整,是的,一个实现很可能进行分布式调用,或者分布式服务调用可以通过二进制库中的客户端门面模式隐藏。所以,我知道,这种世界观是有缺陷的,但任何系统中的视图拆分都有例外并且是可争议的。我只是用它来展示切面如何使用。

1. 使用分布式服务调用

我们将可重用部分拆分为一个单独的服务,并从所有需要此功能的其他服务调用该服务。在所有情况下,这都不是最快、最可扩展和最健壮的解决方案,例如,对于日志记录之类的东西。通常,在你意识到之前,这会变成微服务领域中每个人都警告不要使用的“上帝”服务模式……

	public void DistributedCheckPointMethod(String user)
	{ 
		SendCheckPoint("before", 10, user);
		...code...
		SendCheckPoint("after", 10, user);
	}
	public void SendCheckPoint(String place, long id, String user)
	{ 
		List<string> props = new List<string>(){user};
		rest.CheckPoint(place, id, props);
	}

2. 使用二进制库

我们将可重用部分拆分为一个单独的二进制库(jar、dll、nuget、maven 等),并从所有需要此功能的其他服务调用该 API。

	public void DistributedCheckPointMethod(String user)
	{ 
		libApi.SendCheckPoint("before", 10, user);
		...code...
		libApi.SendCheckPoint("after", 10, user);
	}

3. 复制粘贴代码

只需将代码复制粘贴到所有服务中。这种方法当然有优点。这就是为什么它仍然被大量使用 :-((。它简单,易于适应小例外,并且通常在性能上具有可扩展性。这曾经被认为在所有情况下都是一个真正糟糕的设计。

注意:在现代面向对象设计中,存在诸如数据传输对象之类的模式,它们可以被复制粘贴并根据该特定使用的需要进行调整,如果使用得当,不被认为是“坏”设计。

	public void DistributedCheckPointMethod(String user)
	{ 
		List<string> props = new List<string>(){user};
		rest.CheckPoint(place, id, props);
		...code...
		rest.CheckPoint("after", 10, props);
	}

4. 使用面向切面编程

尽管这听起来像一个全新的选项,但它通常是前三种解决方案之上。这样做的优点是 API 足迹几乎是你可以拥有的最小 API。

注意:我最喜欢的命令式切面解决方案是,将切面与二进制库和门面客户端/代理方法以及带有队列的 CQRS 模式结合起来,我将在本文后面解释这一点。

“客户端”服务代码通常只定义一个标签(注解),例如

	@CheckPoint(10)
	...
	
	@FeatureFlag("Feature1")
	...	

因此,API 被定义为“一个带可选参数的词”。无论是调用还是实现,都隐藏在切面代码中。这样,实现和 API 分离得非常清晰,由于实现更改而导致 API 更改的可能性非常小。别忘了,您的 API 调用可能会在每个服务中出现数百次,在几十个或更多服务中出现。而且,是的,如果您为此使用切面,并且有一个(简单但有缺陷的设计)调用分布式服务,您仍然创建了那个“SPOF 上帝”服务。但是,至少,您拥有该服务最少的 API,并且在代码中被调用的次数最少,因此实现更改通常影响小得多。

当然,如果您想对类中的所有方法执行某些操作,您可以定义一个类级别切面

	[Log("SomeClass")]
	public class SomeClass
	{
	   ....methods go here, and all methods do logging...
	}	

高层设计

应用程序和切面库的打包。切面和客户端打包在一个(二进制)库中。

正如我们在这里看到的,应用了四种门面模式

  1. 检查点切面门面,它将检查点切面行为从应用程序 SomeClassUsingCheckPoints 中隐藏起来。
  2. 在检查点切面内部,门面 CheckPointClient 将消息发送从检查点切面中隐藏起来。
  3. 在客户端门面内部,有一个检查点队列,它向客户端隐藏了存在检查点服务的事实。
  4. 在队列内部,有一个检查点服务,它向队列隐藏了消息的处理方式。

换句话说,应用了关注点分离

  • 应用程序的关注点是在应用程序级别执行某些操作(annotatedCheckPointMethod
  • 切面的关注点是拦截和处理检查点
  • 客户端的关注点是发送消息
  • 队列的关注点是接收和发送消息
  • 服务的关注点是处理检查点消息。
  • 所以,一切都只有一个关注点。

并且,请注意,应用程序的 CheckPoint API 非常简洁,甚至与应用程序二进制分离。
最后,队列部分实现了 CQRS 命令查询职责分离。我们需要一个检查点的事实是一个命令:创建/发送一个检查点(命令)。

现在,我们可以看到,在本例中,通过应用组合的门面、客户端、队列和 CQRS 模式,我们实现了关注点分离、松散耦合以及大量的灵活性和重用可能性。面向切面编程为我们提供了非常清晰的 API 拆分,并出色地完成了门面工作。

通用实现细节

通常,对于 Java 和 .NET,我将省略 CheckPoint 服务器/副端。
由于本文主要讨论面向切面编程,我将重点关注“客户端”的实现。
对于队列消息实现,我选择了 RabbitMQ。它在 Java 和 C# 以及 Windows 和 Linux 和“云”中都得到相当普遍的接受和良好支持。

注意:运行代码之前,您必须先在系统上安装 RabbitMQ。请参阅 https://rabbitmq.cn/download.html

Java 实现细节

Java 切面

在深入代码之前,我在这里对 Java 中的切面提出几点注意事项。然后您可以预先决定选择其中一个并深入了解其细节。

备注 1:我在 Java 中找到了编译时和加载时织入。

备注 2:两种实现都使用 Spring Boot。

备注 3:加载时织入使用“纯”Spring AOP。其中一个“设计特性”(你可以称之为 bug、限制或其他)是切面只对接口调用有效,因为那时你才会有 com.sun 代理。它不对实现类有效。结果是,你需要将你的切面定义在接口上。而且,如果你在你的切面加载方法内部调用另一个实现方法,那个切面就不会执行。

LTW 优点

  • 与大多数 Spring AOP 版本开箱即用

LTW 缺点

  • 性能不佳。我选择了更便宜的 @Before 切面,而不是性能开销最大的 @Around 切面;
  • 内部切面不起作用。

备注 4:编译时织入使用 Spring AOP,以及 aspectjrt 和预切面编译器。然而,我又陷入了 jar mvn 版本地狱。我只能让它与 aspectj-maven-plugin 编译器的某个私有开发分支(1.12.6,com.nickwongdev)以及 2020 年 3 月 Spring Boot(Hoxton.SR3)的最新版本 2.x 系列一起工作。

<dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-parent</artifactId>
			<version>Hoxton.SR3</version>
			<type>pom</type>
			<scope>import</scope>
		</dependency>
	</dependencies>
</dependencyManagement>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-aop</artifactId>
	<version>2.2.5</version>
</dependency>
<dependency>
	<groupId>org.aspectj</groupId>
	<artifactId>aspectjrt</artifactId>
	<version>1.9.5</version>
</dependency>
<plugin>
	<groupId>com.nickwongdev</groupId>
	<artifactId>aspectj-maven-plugin</artifactId>
	<version>1.12.6</version>
</plugin>

CTW 优点

  • 更好的性能
  • “内部”实现切面有效

CTW 缺点

  • 需要更多设置
  • Spring Boot 和 aspectj-maven-plugin 编译器之间版本发布链的限制

LTW Java 代理

以下步骤将实现您的 LTW 切面。

首先,定义切面注解

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CheckPoint {
    long value() default 0;
}

接下来,定义 @Before 切面实现

@Configurable
@Aspect
@Component
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public class CheckPointAspect {
	 /**
     * Fires checkpoint with properties.
     *
     * @throws java.lang.Throwable
     * @see unitests for examples
     *
     *
     * @param joinPoint place of method annotated
     * @param checkPointAnnotation the checkpoint
     * @return proceeding object return value of method being annotated
     */
    @Before("@annotation(checkPointAnnotation)")
    public void CheckPointAnnotation(
            final JoinPoint joinPoint,
            CheckPoint checkPointAnnotation) throws Throwable {
        log.info("1. for checkPoint {} ", checkPointAnnotation.value());
		...do check point send here...
	}
}	

接下来,定义一个(测试)接口,使用 @CheckPoint。确保在接口方法上定义了属性

public interface CheckPointTest {
	@CheckPoint(id=10) 
	void doCheck();
}

然后,实现测试类

@Component
public class CheckPointTestImpl implements CheckPointTest {
    @Override
    public void doCheck() {
		log.info("2. CheckPointTestImpl.doCheck");
    }

CTW Java 切面编译器

以下步骤将实现您的 CTW 切面。

实际上,编码方面完全没有区别。因此,所有编码都如上所述,与 LTW 完全相同。

唯一的区别在于 pom 项目配置:org.aspectjrt 依赖项,以及 maven aspectj-maven-plugin 编译器插件

<dependency>
	<groupId>org.aspectj</groupId>
	<artifactId>aspectjrt</artifactId>
	<version>1.9.5</version>
</dependency>
<plugin>
	<groupId>com.nickwongdev</groupId>
	<artifactId>aspectj-maven-plugin</artifactId>
	<version>1.12.6</version>
	<configuration>
		<source>11</source>
		<target>11</target>
		<proc>none</proc>
		<complianceLevel>11</complianceLevel>
		<showWeaveInfo>true</showWeaveInfo>
		<sources>
			<source>
				<basedir>src/main/java</basedir>
				<excludes>
					<exclude>nl/bebr/xdat/checkpoint/api/*.*</exclude>
				</excludes>
			</source>
		</sources>
	</configuration>
	<executions>
		<execution>
			<goals>
				<goal>compile</goal>
				<goal>test-compile</goal>
			</goals>
		</execution>
	</executions>
	<dependencies>
		<dependency>
			<groupId>org.aspectj</groupId>
			<artifactId>aspectjtools</artifactId>
			<version>1.9.5</version>
		</dependency>
	</dependencies>
</plugin>

注意 1:当您执行 LTW Spring AOP 时,切面类由 Spring Boot 容器加载和管理,因此您的连接是自动完成的。CTW 则不然。切面编译器、您的切面和 Spring Boot IOC 不会开箱即用地良好配合。您必须手动连接每个切面。否则,您的 @Autowiring 将失败。

....
/**
 * This is plumbing code to connect the CTW aspect to the Spring Boot IOC container 
 * of your app context.
 * Otherwise, the CheckPointAspect is factored by the aspectj compiler, 
 * and then, your autowiring fails, resulting in nulls for all your @Autowired props. 
 * So, you need to include this method in all your CTW aspects.
 * I suppose the aspectjrt checks if this method is available,
 * and then uses it to factor your objects via Spring Boot? 
 * @return CheckPointAspect
 */
public static CheckPointAspect aspectOf() {
	return  SpringApplicationContextHolder.getApplicationContext().
                                           getBean(CheckPointAspect.class);
}
...

//Helper class for your CTW/Spring Boot container wiring stuff	
@Component
public class SpringApplicationContextHolder implements ApplicationContextAware {

    private static ApplicationContext applicationContext = null;

    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) 
           throws BeansException {
       this.applicationContext = applicationContext;
    }
}	

注意:我最喜欢的组件之一是 Lombok。它节省了大量的样板代码,并且 @Builder 模式非常完美。然而,maven aspectj 编译器与 Lombok 配合不佳。由于 aspectj 在 Lombok 之前运行,您还没有构建器和 getter/setter,并且 aspectj 编译器不再理解您的代码。我只能通过排除 DTO 包来使其工作。

<sources>
	<source>
		<basedir>src/main/java</basedir>
		<excludes>
			<exclude>nl/bebr/xdat/checkpoint/api/*.*</exclude>
		</excludes>
	</source>
</sources>

Java 切面总结

我们从之前的面向切面 Java 实现中学到了什么?

Java CTW 和 LTW 面向切面编程有效

  • 在 Java 11 中,使用 Spring Boot
  • 您确实需要使用实现、接口和 IOC 设置进行干净的编程

Java LTW 有效

  • 仅使用 Spring AOP 依赖项相对简单
  • 存在限制:性能,并非所有内部切面都有效

Java CTW 有效

  • 具有更好的性能
  • 内部切面确实有效
  • 需要手动连接到 IOC 容器
  • 干扰其他编译器,如 Lombok
  • 具有繁琐庞大、不稳定的版本树依赖关系(仅支持 Spring boot 2.5.x,带分支编译器版本)

.NET C# 实现细节

.NET 切面

关于 C# 中的切面有几点需要注意。

备注 1:我没有找到 .NET 的编译时字节码切面织入实现。 .NET 中有 IL 织入,但我找不到像 Fody 织入那样的切面。所以我只实现 LTW 代理解决方案。

备注 2:.NET 和 Java 在方法签名中的语法和同步/异步方面存在相当大的差异。这在 .NET 中对拦截和切面有影响。我成功地使用 AutoFac 在具有纯同步方法的标准类上实现了拦截,但在异步 HTTP Rest 控制器方法上就失效了。因此,对于 REST 控制器上的拦截,我使用了我偶然发现的另一个解决方案,即 ActionFilterAttributes。我用它来拦截 REST 控制器方法。对于其他类,我们使用 AutoFac 和 Castle Core 动态代理组合的拦截机制。是的,我们可以在任何地方使用 ActionFilterAttributes

C# 动态代理

备注 1:“Maven、Nuget、DotNet、dll、jar、Spring,它们都是版本地狱……”您必须找出哪个版本的 Castle、AutoFac、AutoFac.Extra.DynamicProxy 等可以协同工作以完成任务。

这在 2020 年 5 月对我有效,但如果您使用其他组件或想要更高的版本,您可能需要像我一样,经过几个令人沮丧的小时进行大量的调整。

	Autofac : 5.1.0
	Autofac.Extras.DynamicProxy : 5.0.0
	Castle.Core: 4.4.0
	FakeItEasy: 6.0.0

此 C# 中的 DynamicProxy 如何工作?

首先,定义一个接口。确保在接口方法上定义了属性。

public interface AOPTest
{
	//
	// Castle DynamicProxy interceptor
	//
	[CheckPoint(Id = 10)]
	void DoSomething();

	//
	// FilterAttribute
	//
	[CheckPointActionAtrribute(Id = 10)]
	void DoAction();
}	

然后,实现接口

	public class AOPTestImpl : AOPTest
	{
		// -> invocation point
		// @Before is done in the interceptor below, 1. and 2.
		public void DoSomething()
		{
			// 
			// invocation.Proceed() : Method block code goes below here
			//
			Debug.Print("3. DoSomething\n");
			
			//
			// 4. @After invocation is done in the interceptor below, 
            // after the invocation.Proceed(); call
			//
		} 

		public void DoAction()
		{
		}
	}	

然后,定义拦截器。

请注意,在 Java 和 .NET 中,都定义了“调用点”。

	public class CheckPointInterceptor : Castle.DynamicProxy.IInterceptor
	{
		//
		// As we use PropertiesAutowired() with AutoFac, 
        // this object is set by the IOC container
		//
		public CheckPointClient checkPointClient { get; set; }
		
		public void Intercept(Castle.DynamicProxy.IInvocation invocation)
		{
			Debug.Print($"1. @Before Method called {invocation.Method.Name}");
			var methodAttributes = invocation.Method.GetCustomAttributes(false);
			CheckPointAttribute theCheckPoint =(CheckPointAttribute)methodAttributes.Where
                (a => a.GetType() == typeof(CheckPointAttribute)).SingleOrDefault();
			if (theCheckPoint == null)
			{
				//@before intercepting code goes here
				checkPointClient.CheckPoint(theCheckPoint.Id);
				Debug.Print($"2. CheckPointAttribute on method found with cp id = 
                               {theCheckPoint.Id}\n");
			}
			//
			// This is the actual "implementation method code block".
			// @Before code goes above this call
			//
			invocation.Proceed();
			//
			// Any @After method block code goes below here
			//
			Debug.Print($"4. @After method: {invocation.Method.Name}");
		}
	}	

接下来,使用 AutoFac 注册对象

	var builder = new ContainerBuilder();
	...
	builder.RegisterType<aoptestimpl>()
                .As<aoptest>()
                .EnableInterfaceInterceptors()
                .InterceptedBy(typeof(CheckPointInterceptor))
                .PropertiesAutowired()
                .PreserveExistingDefaults();
	...	
	var container = builder.Build();
    ...		

最后,让我们测试并使用代码

	using (var scope = container.BeginLifetimeScope())
	{
		//
		// Create the registered AOPTest object, with the interceptor in between 
		//
		AOPTest aOPTest = scope.Resolve<aoptest>();
		//
		// Call the method, with the [CheckPoint(Id = 10)] on it
		//
		aOPTest.DoSomething();
	}	

应导致

  1. @Before 方法调用 DoSomething
  2. 在方法上找到 CheckPointAttributecp id = 10
  3. 做某事
  4. @After 方法:DoSomething

用于 REST 异步控制器方法的 ActionFilterAttributes

我们看到上面动态代理的实际应用。

但是,如果我们尝试将其应用于“服务器”端 .NET Core MVC REST 控制器,而调用来自 OWIN REST 通道,那会怎样呢?

例如

[HttpGet("index")]
[CheckPoint(Id = 10)]
public ActionResult Index()
{
	Debug.Print("3. Index\n");
	....
}

乍一看,这似乎可行。您确实需要将 AutoFac 连接到 Microsoft Extensions DependencyInjection

但是,当您仔细查看调用内部时,您会陷入困境

public void Intercept(Castle.DynamicProxy.IInvocation invocation) {..}

现在突然被多次调用,这是由于 ActionResult 的异步行为造成的,而且你无法在不使用巧妙、复杂、繁琐的代码的情况下使其工作。

我无法以任何可接受的方式使其工作。但是,我在寻找代码解决问题时偶然发现了这种模式:ActionFilterAttributes

步骤 1:定义过滤操作 OnActionExecutingOnActionExecuted

public class CheckPointActionAtrribute : Microsoft.AspNetCore.Mvc.Filters.ActionFilterAttribute
{
	//
	// Your custom checkpoint Id for this CheckPoint attribute
	//
	public long Id { get; set; }

	//
	// @Before
	//
	public override void OnActionExecuting
         (Microsoft.AspNetCore.Mvc.Filters.ActionExecutingContext context)
	{
		ControllerActionDescriptor actionDescriptor = 
                   (ControllerActionDescriptor)context.ActionDescriptor;
		Debug.Print($"1. @Before Method called 
           {actionDescriptor.ControllerName}.{actionDescriptor.ActionName}");
		var controllerName = actionDescriptor.ControllerName;
		var actionName = actionDescriptor.ActionName;
		var parameters = actionDescriptor.Parameters;
		var fullName = actionDescriptor.DisplayName;
		//
		// CheckPointActionAtrribute are not factored by the IOC
		// 
		CheckPointClient checkPointClient = BootStapper.Resolve<checkpointclient>();
		checkPointClient.CheckPoint(Id);
	}
	
	//
	// @After
	//
	public override void OnActionExecuted(ActionExecutedContext context)
	{
		ControllerActionDescriptor actionDescriptor = 
                    (ControllerActionDescriptor)context.ActionDescriptor;
		Debug.Print($"3. @After method: 
                {actionDescriptor.ControllerName}.{actionDescriptor.ActionName}");
	}
}

最后步骤 2,应用属性

[HttpGet("index")]
[CheckPointActionAtrribute(Id = 10)]
public ActionResult Index()
{
	Debug.Print("3. CheckPointController.Index\n");
	....
}

结果应该是这样

  1. @Before 方法调用 CheckPointController.Index
  2. CheckPointController.Index
  3. @After 方法:CheckPointController.Index

瞧,完成了。轻而易举。

问题:如果过滤器属性如此简单,为什么还要费心使用第一个更复杂的 DynamicProxy 解决方案呢?

答案ActionFilterAttribute 适用于 (MVC) DotNet Core。如果您使用纯 .NET Core 或 .NET Full 呢?-> 您可以使用 DynamicProxy

希望在您安装 RabbitMQ 后,当您运行上面的 C# 示例和/或下面的 Java 示例时,您会在 checkpoint.check.request 队列中看到一条 CheckPoint 消息。

checkpoint.check.request 队列中的消息。注意:交换源、路由键、应用程序 ID、时间戳、标头和类型,以及正文负载

C# 切面总结

我们从之前的面向切面 C# 实现中学到了什么?

C# LTW 切面编程有效

  • 使用 DynamicProxy 和/或 ActionFilterAttribute (.NET Core MVC)
  • 您确实需要使用实现、接口和 IOC 设置进行干净的编程。
  • 内部切面在 C# 中与 LTW 也能工作。我没有使其工作,但 AutoFac 和 Castle 确实支持这一点。请查看
     builder.RegisterType<..>()
                    .As<..>()
                    .EnableInterfaceInterceptors()
    并添加 .EnableClassInterceptors()

 

C# CTW 可以工作

  • 但未包含在此示例中。我找不到任何 C# IL 切面织入器。然而,它们可能存在……

使用 RabbitMQ 发送消息

对于下面 Java 和 C# 中的示例,我们使用主题交换机。主题交换机的工作方式如下:

  • 声明一个交换机,ExchangeDeclare("X.Y.exchange");
  • 声明一个队列 "A.B.C", QueueDeclare("A.B.C");
  • 将声明的队列绑定到声明的交换机,用于主题 "S.T.*": QueueBind("A.B.C", "X.Y.exchange", "S.T.*");

队列定义与交换机和主题

从这一刻起,所有发送到交换机中包含主题类型为“S.T.*”的消息,也会发送到队列。因此,您可以向一个交换机发送一条消息,如果多个队列绑定到该主题到该交换机,它们都会收到该主题消息。注意:您发送到交换机,从队列接收。

Java 消息传递

在 Java 中使用 RabbitMQ,我们使用 org.springframework.boot/spring-boot-starter-amqporg.springframework.amqp/spring-rabbit

使用 AMQP 发送和接收队列消息

首先,您需要声明您的交换机和队列以及一些 bean。我在单独的类中完成了此操作。

public class ExchangeDefinition {
    public static final String CHECKPOINT_EXCHANGE = "ricta.checkpoint.exchange";
    public static final String KEY_CHECKPOINT_REQUEST = "checkpoint.check.request";
}
	
@Configuration
public class CheckPointExchangeConfig implements RabbitListenerConfigurer {

    @Autowired
    ConnectionFactory connectionFactory;

    @Bean
    public Exchange checkPointEventExchange() {
        return new TopicExchange(CHECKPOINT_EXCHANGE);
    }

    @Override
    public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }

    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }

    @Bean
    public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(consumerJackson2MessageConverter());
        return factory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        return rabbitTemplate;
    }
}

然后,你需要声明并绑定你的队列。同样,在一个单独的类中

public class QueueDefinition {
    public static final String QUEUE_CHECKPOINT_REQUEST = KEY_CHECKPOINT_REQUEST;
}
@Configuration
public class CheckPointQueueConfig implements RabbitListenerConfigurer {
	
    @Bean
    public Queue queueCheckpointRequest() {
        return new Queue(QueueDefinition.QUEUE_CHECKPOINT_REQUEST);
    }

    @Bean
    public Binding checkPointRequestBinding
     (Queue queueCheckpointRequest, Exchange checkPointEventExchange) {
        return BindingBuilder
                .bind(queueCheckpointRequest)
                .to(checkPointEventExchange)
                .with(KEY_CHECKPOINT_REQUEST)
                .noargs();
    }

    @Autowired
    DefaultMessageHandlerMethodFactory messageHandlerMethodFactory;

    @Override
    public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
    }
}

接下来,为了发送消息,我们使用 RabbitTemplate

{
	...
	@Autowired
	private RabbitTemplate rabbitTemplate;

	@Override
	public void sendToExhange(Message message) {
		rabbitTemplate.send(ExchangeDefinition.CHECKPOINT_EXCHANGE, 
                            ExchangeDefinition.KEY_CHECKPOINT_REQUEST, message);
	}
	...
}

再次注意:您发送到交换机,并从队列接收/监听。

为了接收消息,我们使用 @RabbitListener

{
	....
	@RabbitListener(queues = QUEUE_CHECKPOINT_REQUEST)
	public void handleCheckPointMessage(Message message)
			throws IOException {
		....
	}
	....
}

至此,Java 中 RabbitMQ 消息的发送和接收就结束了。

几点备注

备注 1:交换机可以被多个发送者使用。但是,队列的监听器应该只有一个。如果您在同一个队列上启动两个监听器,您将体验到轮询行为,RabbitMQ 将轮流向其中一个监听器传递消息。请分离您的交换机和队列定义,并且最好将您的队列定义放在实现模块中,将您的交换机定义放在 API 模块中。

备注 2:在 @RabbitListener 上,一旦您的应用程序启动,并且您的组件被扫描和实例化,接收消息就会开始。这相当不受控制,并且是隐式的,可能会导致非常奇怪的行为,即在应用程序启动和设置的中间您就已经收到消息。有方法可以阻止这种情况。特别是在集成测试场景中,您需要更多的控制。我在这里使用一些特殊的、高级的代码来配置 RabbitMQ,使其默认不启动监听器,然后在您的代码中显式启动和停止监听器。

{...
	/**
	 * Props to stop listeners by startup in yml do not work. 
	 * rabbitmq.listener.auto-startup: false
	 * rabbitmq.listener.simple.auto-startup: false
	 * So use this factory construction
	 * @param connectionFactory
	 * @return 
	 */
	@Bean
	public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory
        (ConnectionFactory connectionFactory) {
		SimpleRabbitListenerContainerFactory factory = 
            new SimpleRabbitListenerContainerFactory();
		factory.setConnectionFactory(connectionFactory);

		//
		//autoStartup = false, prevents handling messages immediately. 
        // You need to start each listener itself. 
		// Use BaseResponseMediaDTOQueueListener.startRabbitListener/stopRabbitListener
		//
		factory.setAutoStartup(false); 

		factory.setMessageConverter(new Jackson2JsonMessageConverter());
		return factory;
	}
	...
	@Autowired
	private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;
	
	public boolean startRabbitListener(String rabbitListenerId) {
		MessageListenerContainer listener = 
            rabbitListenerEndpointRegistry.getListenerContainer(rabbitListenerId);
		if (listener != null) {
			listener.start();
			return true;
		} else {
			 return false;
		}
	}
	public boolean stopRabbitListener(String rabbitListenerId) {
		MessageListenerContainer listener = 
              rabbitListenerEndpointRegistry.getListenerContainer(rabbitListenerId);
		if (listener != null) {
			listener.stop();
			return true;
		} else {
			 return false;
		}
	}
}

.NET 消息传递

对于 C# 中的 RabbitMQ,我们使用 RabbitMQ.Client nuget 包来发送消息。我将其用于“拦截 CheckPoint 客户端”。出于某种原因,我在脑海中产生了一个非常奇怪的扭曲,认为这个包是为那些想要发送消息的“客户端”准备的。而在服务消息接收端(未包含在代码中),我却去寻找“RabbitMQ.Server”。你猜怎么着,当然找不到。你同样需要 RabbitMQ.Client 来接收消息 :-)。

使用 RabbitMQ.Client 发送队列消息

using (var connection = ConnectionFactory.CreateConnection())
using (var channel = connection.CreateModel())
{
	//
	// At one place in time, you have to declare the exchange and topic queues, and bind them
	//
	var queue = channel.QueueDeclare(queue: "checkpoint.check.request",
						 durable: true,
						 exclusive: false,
						 autoDelete: false,
						 arguments: null);
	channel.ExchangeDeclare("ricta.checkpoint.exchange",
							ExchangeType.Topic);
	channel.QueueBind(queue.QueueName,
		"ricta.checkpoint.exchange",
		"checkpoint.check.*"
		);
		
	string message = Id.ToString();
	var body = Encoding.UTF8.GetBytes(message);
	IBasicProperties props = channel.CreateBasicProperties();
	props.AppId = "DEMO-APP";
	DateTime now = DateTime.UtcNow;
	long unixTime = ((DateTimeOffset)now).ToUnixTimeSeconds();
	props.Timestamp = new AmqpTimestamp(unixTime);
	props.Type = "application/json";
	props.Headers = new Dictionary<string, object="">
	{
		{ "__TypeId__", "java.lang.String" }
	};

	channel.BasicPublish(exchange: "ricta.checkpoint.exchange",
					routingKey: "checkpoint.check.request",
					basicProperties: props,
					body: body);
	Console.WriteLine(" [x] Sent {0}", message);
}

使用 RabbitMQ.Client 接收队列消息

{
	var factory = new ConnectionFactory { HostName = "localhost" };

	//
	// create connection  
	//
	_connection = factory.CreateConnection();

	//
	// create channel  
	//
	_channel = _connection.CreateModel();

	_channel.ExchangeDeclare("ricta.checkpoint.exchange", ExchangeType.Topic, true);
	_channel.QueueDeclare("ricta.check.request", true, false, false, null);
	_channel.QueueBind("ricta.check.request", "ricta.checkpoint.exchange", 
                       "checkpoint.check.*", null);
	_channel.BasicQos(0, 1, false);

	//
	// Create Consumer of messages
	//
	var consumer = new EventingBasicConsumer(_channel);
	consumer.Received += (ch, ea) =>
	{
		//
		// handle the received message 
		//
		HandleMessage(ea);
		_channel.BasicAck(ea.DeliveryTag, false);
	};

	...

	private void HandleMessage(BasicDeliverEventArgs args)
	{
		//
		// received message, app id and time
		//
		var content = System.Text.Encoding.UTF8.GetString(args.Body);
		var appId = args.BasicProperties.AppId;
		var checkTime = System.DateTimeOffset.FromUnixTimeMilliseconds
                        (args.BasicProperties.Timestamp.UnixTime).ToLocalTime().DateTime;
		....
	}
}

至此,C# 中 RabbitMQ 消息的发送和接收就结束了。

几点备注

备注 1:交换机可以被多个发送者使用。但是,队列的监听器应该只有一个。如果您在同一个队列上启动两个监听器,您将体验到轮询行为,RabbitMQ 将轮流向其中一个监听器传递消息。请分离您的交换机和队列定义,并且最好将您的队列定义放在实现模块中,将您的交换机定义放在 API 模块中。

备注 2:我只给出了 C# 中的样板代码,没有费心创建生产就绪代码,将其分离到类中,捕获所有异常并记录所有内容等。我建议不要那样做 -:(。

结论,兴趣点及更多…

已解决

我们在本文中探讨了使用多种模式构建现代微服务代码在 C# 和 Java 中都是可行的。

我们亲身体验了

  • 多层级联门面模式(面向切面编程、IOC、接口-实现、客户端、消息传递)
  • CQRS 命令查询职责分离模式(通过消息传递的命令)
  • 关注点分离概念(切面 = API,实现-接口,二进制分离)
  • 使用 Spring Boot 和 AutoFac 容器的 IOC 模式
  • 使用 RabbitMQ 进行消息传递
  • 接口-实现
  • 面向切面编程

我们没有讨论什么?

当然,世界的其余政治 ;-)。但更详细一点。

  • 在切面连接点中使用方法的参数。然而,它包含在随附的 Java 代码中,我看到在 DotNet 中也是可能的;
  • TDD 测试驱动开发:为整个设置创建健壮的测试代码。我很快会写另一篇文章来解决单元测试和集成测试。然而,由于我们使用了大量的关注点分离,您会发现它非常可测试,使用现代模拟框架,如 Mockito、FakeIsEasy 等,以及基于 Gherkin 的 Cucumber 和 Specflow 等现代集成语言。
  • 特性标志/切换;如果您想要真正敏捷的概念,您需要这个;请参阅[2] Roland Roos,特性标志
  • 使用源代码仓库(例如 Git)和组件仓库(例如 NuGet、Nexus)对您的微服务代码和组件进行版本控制。
  • 容器化、Docker,最后是:Kubernetes。
  • Ci/Cd:使用 TFS 或 BitBucket 和容器注册表等工具持续构建、测试和部署您的微服务
  • 耐心...所有这些都会做,但可能不是今天 :-)。
  • 我忘了什么?请告诉我。

我们学到了什么?

面向切面编程在 C# 和 Java 中都有效,但实现方式相似但不完全相同。

(主题)消息传递在 C# 和 Java 中都有效,但实现方式相似但不完全相同。

您可以创建具有模式的通用、抽象设计,并在 Java 或 C# 中实现它们。

关于微服务的重要提示:我确实相信,您需要所有这些模式才能使其成功,还有更多(文档、TDD、云、CI/CD、功能标志、容器化等)。这意味着,您需要在组织内部投入大量知识,并可能聘请一些经验丰富的人员来建立它。我向您保证它会奏效,但我也向您保证,如果不使用所有这些模式和健壮的实现,将比单体系统造成更大的混乱。糟糕透顶 :-(。

我为什么坚信微服务是有效的?

我可能在这里开始了一段冗长的宗教信仰论证。我没有。相反,我给您一个比喻性的问题,以及我的答案。

问题:您认为现代基于微组件的电子产品为什么能如此出色地工作?(我学习、应用和管理过电子产品,所以请相信我,我对此略知一二。)

我的简短回答:我坚信电子微组件之所以有效,是因为它们创造了真正可重用的、非常微小且独立的组件,并拥有一套出色且规范化的工具和文档。既然这在电子这种复杂的领域是可能的,我坚信这也可以应用于“类似复杂”的软件领域。而微服务正是实现这一目标的关键部分。

我更长的(担忧的)回答:尽管我确实相信微服务可以实现现代电子微组件已经证明了几十年(并将继续证明几十年)的成就,但我有点担心我们在软件领域还没有达到那种程度。因为电子学在七十年代/八十年代/九十年代初就已经掌握并教授了这些概念,并且拥有一个快速而庞大的产业,以可负担的价格提供这些标准化的组件和工具,以及出色的工具、规范和文档。

那么我们在软件行业这方面做得怎么样呢?呃,我敢打赌:完全不同。首先,我们不教学生微服务概念。我们甚至还没有开始确切地知道它们在软件行业中到底是什么。

我们是否就微服务规范的编写达成任何规范?呃,没有。

我们以可承受的价格提供它们吗?嗯,没有。

我们有专门的工具来帮助我们掌握设计和实现中的微服务模式和标准吗?嗯,没有。

一旦我们创建了它们,我们能稳定地托管它们吗?是的。Google、Microsoft、AWS 是一个好的开始。Kubernetes 很棒。

设计、创建、构建、托管和维护微服务容易吗?不可能。在目前这个时间点,复杂程度超乎想象?你敢打赌。

我们能以可承受的价格稳定地托管它们吗?嗯,不。我希望 Microsoft、AWS(Google、Netflix?)是一个好的开始……

所以,我怀疑,漫长隧道的开端闪耀着微光。但总得从某个地方开始,不是吗?

注意事项

现在,正如我之前在本文中多处提到的,版本地狱比以前好了一点,但我向你保证,它仍然存在。如果你试图将所有概念与 COTS 组件(Maven、NuGet)结合起来,这迟早会在某个地方给你带来皇家般的麻烦。我对此没有好的答案。我想,只能摸索着前进吧。

然而,我知道一件事:一旦你为你的外部组件堆栈建立了一个稳定的基线,就要小心翼翼地管理它。我多次应用的一个有效模式是创建你自己的托管组件仓库(NuGet 服务器、Nexus 或其他)。将你选择和验证过的基线版本化组件放入其中,并让高级工程师管理它。其余的人:只使用仓库中的组件。在你的生产管道中,对所有开发人员(初级、中级和高级)关闭 Maven Central (maven.org) 或 NuGet Central (nuget.org)。给他们一个业余农场和业余时间,但不要在你的生产管道中。相信我,我经历过,见过,做过。

如果你发现自己为了交付、测试、修复 bug 和构建软件,与各种奇怪的版本编译器和运行时错误(例如:“在组件 YYY 中找不到方法 XXX”或“无法找到或加载类 BBBB”)斗争了数周,并且在凌晨 3 点真正、真正厌倦了这一切,那就掌控局面,而不是让它掌控你。你可能犯的最大错误就是回到构建那些安全的老式单体应用,说:“我讨厌那些有趣的微服务,它们带着组件地狱”。我也不喜欢我的汽车的保险、车库、汽油和路税账单,但我从未发现自己希望回到马车时代……将你的微服务视为电动汽车。是的,它们目前续航里程可能较短。是的,它们购买价格昂贵。是的,它们可能需要很长时间才能充满电。但在 20-40 年后,没有人会再知道内燃机是什么了。单体应用也是如此。但那可能在未来 5-10 年内就会发生。如果不是更早的话……

历史

  • 2020 年 4 月 7 日:初始版本
© . All rights reserved.