java B2B2C源码电子商务平台 -Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

yayay · · 69 次点击 · · 开始浏览    
有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的:需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码 一零三八七七四六二六 ``` @StreamListener(value = TestTopic.INPUT) public void receiveV1(String payload, @Header("version") String version) { if("1.0".equals(version)) { // Version 1.0 } if("2.0".equals(version)) { // Version 2.0 } } ``` 那么当消息处理逻辑复杂的时候,这段逻辑就会变得特别复杂。针对这个问题,在@StreamListener注解中提供了一个不错的属性condition,可以用来优化这样的处理结构。 动手试试 下面通过编写一个简单的例子来具体体会一下这个属性的用法: ``` @EnableBinding(TestApplication.TestTopic.class) @SpringBootApplication public class TestApplication { public static void main(String[] args) { SpringApplication.run(TestApplication.class, args); } @RestController static class TestController { @Autowired private TestTopic testTopic; /** * 消息生产接口 * * @param message * @return */ @GetMapping("/sendMessage") public String messageWithMQ(@RequestParam String message) { testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "1.0").build()); testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "2.0").build()); return "ok"; } } /** * 消息消费逻辑 */ @Slf4j @Component static class TestListener { @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='1.0'") public void receiveV1(String payload, @Header("version") String version) { log.info("Received v1 : " + payload + ", " + version); } @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='2.0'") public void receiveV2(String payload, @Header("version") String version) { log.info("Received v2 : " + payload + ", " + version); } } interface TestTopic { String OUTPUT = "example-topic-output"; String INPUT = "example-topic-input"; @Output(OUTPUT) MessageChannel output(); @Input(INPUT) SubscribableChannel input(); } } ``` 内容很简单,既包含了消息的生产,也包含了消息消费。在/sendMessage接口的定义中,发送了两条消息,一条消息的头信息中包含version=1.0,另外一条消息的头信息中包含version=2.0。在消息监听类TestListener中,对TestTopic.INPUT通道定义了两个@StreamListener,这两个监听逻辑有不同的condition,这里的表达式表示会根据消息头信息中的version值来做不同的处理逻辑分发。 在启动应用之前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),比如: ``` spring.cloud.stream.bindings.example-topic-input.destination=test-topic spring.cloud.stream.bindings.example-topic-input.group=stream-content-route spring.cloud.stream.bindings.example-topic-output.destination=test-topic ``` 完成了上面配置之后,就可以启动应用,并尝试访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了。此时可以看到类似下面的日志: ``` 2018-12-24 15:50:33.361 INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener : Received v1 : hello, 1.0 2018-12-24 15:50:33.363 INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener : Received v2 : hello, 2.0 ``` 从日志中可以看到,两条带有不同头信息的消息,分别通过不同的监听处理逻辑输出了对应的日志打印。[java B2B2C源码电子商务平台](https://2147775633.iteye.com/blog/2434341)
69 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet