Stream默认的消息重试功能,RabbitMQ中提供了关于延

2019-10-05 07:45栏目:大奖888官网登录
TAG:

我们在应用一些开源调解种类(比方:elastic-job等)的时候,对于义务的实施时间平时都以有规律性的,可能是每隔半小时推行一回,可能每一日晚上某个实践一次。不过事实上业务中还设有其余一种按期任务,它可能供给一些触及条件才起来定时,比如:编写博文时候,设置2小时之后发送。对于那个初始时间不明确的定期任务,大家也能够透过Spring Cloud Stream来很好的管理。

事先在写Spring Boot基础教程的时候写过一篇《Spring Boot中使用RabbitMQ》。在该文中,大家透过简单的布署和注释就能够落到实处向RabbitMQ中生产和花费消息。实际上大家利用的对RabbitMQ的starter就是通过Spring Cloud Stream中对RabbitMQ的协理来贯彻的。下边大家就经过本文来询问一下Spring Cloud Stream。

前面我们早已通过《Spring Cloud Stream花费失败后的拍卖政策:自动重试》一文介绍了Spring Cloud Stream私下认可的音讯重试成效。本文将介绍RabbitMQ的binder提供的别的一种重试功效:重新入队。

为了贯彻起来时间不明确的按期任务触发,我们将引进延迟音讯的应用。RabbitMQ中提供了关于推迟新闻的插件,所以本文就来具体介绍以下哪些使用Spring Cloud Stream以及RabbitMQ轻易的拍卖上述难点。

Spring Cloud Stream是一个用来为微服务应用创设新闻使得技能的框架。它能够依靠Spring Boot来创设独立的、可用于生产的Spring应用程序。它通过采取Spring Integration来接二连三消息代理中间件以促成音讯事件驱动的微服务应用。Spring Cloud Stream为一些承包商的新闻中间件产品提供了性格化的自动化配置达成,何况引进了宣告-订阅、花费组以及新闻分区那三个着力概念。简单来说,Spring Cloud Stream本质上便是结合了Spring Boot和Spring Integration,达成了一套轻量级的新闻使得的微服务框架。通过应用Spring Cloud Stream,能够使得地简化开辟人员对消息中间件的利用复杂度,让系统开垦人士能够有越来越多的生气关心于宗旨职业逻辑的拍卖。由于Spring Cloud Stream基于Spring Boot完结,所以它秉承了Spring Boot的长处,实现了自动化配置的法力帮忙我们能够长足的侧面使用,可是最近甘休Spring Cloud Stream只支持上面四个名牌的音信中间件的自动化配置:

预备三个会花费战败的例证,能够平素沿用前文的工程,也能够新建一个,然后创设如下代码的逻辑:

有关RabbitMQ延迟消息的插件介绍能够查看官方网站:

  • RabbitMQ
  • Kafka
@EnableBinding(TestApplication.TestTopic.class)@SpringBootApplicationpublic class TestApplication { public static void main(String[] args) { SpringApplication.run(TestApplication.class, args); } @RestController static class TestController { @Autowired private TestTopic testTopic; /** * 消息生产接口 * * @param message * @return */ @GetMapping("/sendMessage") public String messageWithMQ(@RequestParam String message) { testTopic.output().send(MessageBuilder.withPayload.build; return "ok"; } } /** * 消息消费逻辑 */ @Slf4j @Component static class TestListener { private int count = 1; @StreamListener(TestTopic.INPUT) public void receive(String payload) { log.info("Received payload : " + payload + ", " + count); throw new RuntimeException("Message consumer failed!"); } } interface TestTopic { String OUTPUT = "example-topic-output"; String INPUT = "example-topic-input"; @Output MessageChannel output(); @Input SubscribableChannel input(); }}

安装方式相当粗略,只要求在这么些页面: 中找到rabbitmq_delayed_message_exchange插件,根据你使用的RabbitMQ版本选择相应的插件版本下载就可以。

神速入门

上边大家由此创设一个粗略的示范来对Spring Cloud Stream有四个发端认识。该示例重要目的是营造三个基于Spring Boot的微服务应用,那个微服务应用将透过利用新闻中间件RabbitMQ来抽出音信并将音信打字与印刷到日志中。所以,在实行上面步骤在此以前请先承认已经在本土安装了RabbitMQ,具体安装步骤请参见此文。

剧情很轻松,既包括了消息的生产,也包涵了新闻消费。音讯花费的时候主动抛出了一个可怜来模拟新闻的开支战败。

注意:独有RabbitMQ 3.6.x以上才支撑

塑造多个Spring Cloud Stream花费者

  • 创办三个基础的Spring Boot工程,命名字为:stream-hello

  • 编辑pom.xml中的依赖关系,引进Spring Cloud Stream对RabbitMQ的支撑,具体如下:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.9.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>     
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Dalston.SR4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
  • 创设用于吸收接纳来自RabbitMQ新闻的顾客SinkReceiver,具体如下:
@EnableBinding(Sink.class)
public class SinkReceiver {

    private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);

    @StreamListener(Sink.INPUT)
    public void receive(Object payload) {
        logger.info("Received: " + payload);
    }

}
  • 创建应用主类,这里同其余Spring Boot同样,未有何极其之处,具体如下:
@SpringBootApplication
public class SinkApplication {

    public static void main(String[] args) {
        SpringApplication.run(SinkApplication.class, args);
    }

}

到那边,大家神速入门示例的编码职责就已经完毕了。上边大家独家运行RabbitMQ以及该Spring Boot应用,然后做上面包车型地铁考查,看看它们是如何运维的。

在开发银行应用以前,还要记得配置一下输入输出通道对应的轮廓目的(exchange或topic名)、并安装一下分组,举例:

在下载好今后,解压得到.ez末段的插件包,将其复制到RabbitMQ安装目录下的plugins文件夹。

手工业测验注脚

  • 我们先来看一下Spring Boot应用的起步日志。
...
INFO 16272 --- [main] o.s.c.s.b.r.RabbitMessageChannelBinder   : declaring queue for inbound: input.anonymous.Y8VsFILmSC27eS5StsXp6A, bound to: input
INFO 16272 --- [main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: SimpleConnection@3c78e551 [delegate=amqp://guest@127.0.0.1:5672/]
INFO 16272 --- [main] o.s.integration.channel.DirectChannel    : Channel 'input.anonymous.Y8VsFILmSC27eS5StsXp6A.bridge' has 1 subscriber(s).
INFO 16272 --- [main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.Y8VsFILmSC27eS5StsXp6A
...

从地方的日志内容中,大家能够赢得以下音信:

  • 使用guest顾客成立了三个对准127.0.0.1:5672位置的RabbitMQ连接,在RabbitMQ的调整新竹大家也得以开掘它。

图片 1

  • 注明了多个名称叫input.anonymous.Y8VsFILmSC27eS5StsXp6A的队列,并通过RabbitMessageChannelBinder将协和绑定为它的顾客。这么些新闻大家也能在RabbitMQ的调整嘉义窥见它们。

图片 2

上面大家得以在RabbitMQ的调控嘉义步入input.anonymous.Y8VsFILmSC27eS5StsXp6A队列的田间管理页面,通过Publish Message效果来发送一条新闻到该队列中。

图片 3

此刻,大家得以在脚下开发银行的Spring Boot应用程序的调整桃园看见上边包车型地铁从头到尾的经过:

INFO 16272 --- [C27eS5StsXp6A-1] com.didispace.HelloApplication           : Received: [B@7cba610e

咱俩能够开掘在使用调控台北输出的内容就是SinkReceiverreceive办法定义的,而输出的具体内容则是出自信息队列中拿走的对象。这里由于大家未有对新闻进行系列化,所以输出的只是该目的的引用,在背后的小节中我们会详细介绍接收消息后的拍卖。

在顺遂达成上边快速入门的演示后,大家大致解释一下上边的步骤是如何将我们的Spring Boot应用连接上RabbitMQ来费用消息以贯彻音讯使得业务逻辑的。

率先,我们对Spring Boot应用做的就是引进spring-cloud-starter-stream-rabbit借助,该重视包是Spring Cloud Stream对RabbitMQ扶助的包装,个中包涵了对RabbitMQ的自动化配置等剧情。从底下它定义的重视关系中,大家还能明白它也等于spring-cloud-stream-binder-rabbit依赖。

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>
</dependencies>

紧接着,大家再来看看这里运用的几个Spring Cloud Stream的骨干注脚,它们都被定义在SinkReceiver中:

  • @EnableBinding,该证明用来钦赐三个或八个概念了@Input@Output申明的接口,以此完毕对消息通道(Channel)的绑定。在上面的事例中,大家经过@EnableBinding(Sink.class)绑定了Sink接口,该接口是Spring Cloud Stream中私下认可完成的对输入消息通道绑定的概念,它的源码如下:
public interface Sink {

    String INPUT = "input";

    @Input(Sink.INPUT)
    SubscribableChannel input();

}

它通过@Input注明绑定了二个名称叫input的通道。除了Sink之外,Spring Cloud Stream还暗中认可完毕了绑定output通道的Source接口,还会有结合了SinkSourceProcessor接口,实际利用时我们也足以本身通过@Input@Output申明来定义绑定音讯通道的接口。当我们必要为@EnableBinding点名三个接口来绑定信息通道的时候,能够那样定义:@EnableBinding(value = {Sink.class, Source.class})

  • @StreamListener:该评释首要定义在点子上,成效是将被修饰的办法注册为音讯中间件上数据流的事件监听器,申明中的属性值对应了监听的新闻通道名。在上头的例证中,大家经过@StreamListener(Sink.INPUT)注解将receive办法注册为对input新闻通道的监听处理器,所以当大家在RabbitMQ的操纵页面中公布音讯的时候,receive方法会做出相应的响应动作。
spring.cloud.stream.bindings.example-topic-input.destination=test-topicspring.cloud.stream.bindings.example-topic-input.group=stream-exception-handlerspring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.requeue-rejected=truespring.cloud.stream.bindings.example-topic-output.destination=test-topic

接下来通过命令行启用该插件:

编纂花费音讯的单元测验用例

地点大家由此RabbitMQ的调节台实现了发送消息来评释了信息成本程序的功效,纵然这种艺术相比low,可是透过上边的步子,相信我们对RabbitMQ和Spring Cloud Stream的消息开销已经有了有些基础的认知。上面大家经过编写制定生产音讯的单元测量检验用例来宏观我们的入门内容。

  • 在上头成立的工程中开创单元测试类:
@RunWith(SpringRunner.class)
@EnableBinding(value = {SinkApplicationTests.SinkSender.class})
public class SinkApplicationTests {

    @Autowired
    private SinkSender sinkSender;

    @Test
    public void sinkSenderTester() {
        sinkSender.output().send(MessageBuilder.withPayload("produce a message :http://blog.didispace.com").build());
    }

    public interface SinkSender {

        String OUTPUT = "input";

        @Output(SinkSender.OUTPUT)
        MessageChannel output();

    }

}
  • 在动用了地方的新闻花费者程序之后,运维这里定义的单元测量检验程序,大家立即就能够在消息开支者的调控台北抽取上面的原委:
INFO 50947 --- [L2W-c2AcChb2Q-1] com.didispace.stream.SinkReceiver        : Received: produce a message :http://blog.didispace.com

在地点的单元测量试验中,我们经过@Output(SinkSender.OUTPUT)概念了二个出口通过,而该出口通道的名为input,与前文中的Sink中定义的开支通道同名,所以那边的单元测量试验与前文的费用者程序组成了一对劳动者与成本者。到那边,本文的内容就次甘休,倘让你可以单独的成就地方的例子,那么对于Spring Cloud Stream的底子运用算是入门了。然则,Spring Cloud Stream的施用远不仅于此,在前段时间的博文中,作者讲承接立异那有个别剧情,援救他们来明白和用好Spring Cloud Stream来构建音信使得的微服务!

正文完整实例:

  • Github
  • Gitee

倘让你对这么些感兴趣,款待star、follow、收藏、转载给予扶助!

本文内容部分节选自己的《Spring Cloud微服务实战》,但对借助的Spring Boot和Spring Cloud版本做了晋级。

正文首发于本身的博客:

产生了上面配置之后,运维应用并拜会localhost:8080/sendMessage?message=hello接口来发送一个新闻到MQ中了,此时得以看出程序不断的抛出了新闻花费十一分。那是由于此处大家多加了三个配置:spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.requeue-rejected=true。在该配置效果与利益之下,音讯花费失利未来,并不会将该音讯丢掉,而是将新闻再一次放入队列,所以新闻的成本逻辑会被重新实施,直到那条音讯花费成功甘休。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

名目多数教程推荐

  • Spring Boot基础教程
  • Spring Cloud基础教程

在成功了地方的那几个例子之后,大概读者会有下边多个周围难点:

该插件在经过上述命令启用后就能够直接使用,没有须要重启。

标题一:在此之前介绍的Spring Cloud Stream默许提供的默许作用(spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts)与本文所说的重入队列完成的重试有何分歧?

其余,要是您未有启用该插件,您或然为遇见类似那样的失实:

Spring Cloud Stream默许提供的私下认可功用只是对管理逻辑的重试,它们的拍卖逻辑是由同样条音信触发的。而本文所介绍的双重入队史通过重新将消息归入队列而接触的,所以实际是接受了数次新闻而完成的重试。

ERROR 156 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: connection error; protocol method: #method(reply-code=503, reply-text=COMMAND_INVALID - unknown exchange type 'x-delayed-message', class-id=40, method-id=10)

主题素材二:如上边包车型客车事例那样,开支一贯不成事,这个不成事的音信会被无休止堆放起来,如何缓和这一个难点?

下边通过编写制定三个归纳的例子来具体体会一下以此性情的用法:

对于那些难题,大家得以共同前文介绍的DLQ队列来完善消息的不得了管理。

@EnableBinding(TestApplication.TestTopic.class)@SpringBootApplicationpublic class TestApplication { public static void main(String[] args) { SpringApplication.run(TestApplication.class, args); } @Slf4j @RestController static class TestController { @Autowired private TestTopic testTopic; /** * 消息生产接口 * * @param message * @return */ @GetMapping("/sendMessage") public String messageWithMQ(@RequestParam String message) { log.info("Send: " + message); testTopic.output().send(MessageBuilder.withPayload.setHeader("x-delay", 5000).build; return "ok"; } } /** * 消息消费逻辑 */ @Slf4j @Component static class TestListener { @StreamListener(TestTopic.INPUT) public void receive(String payload) { log.info("Received: " + payload); } } interface TestTopic { String OUTPUT = "example-topic-output"; String INPUT = "example-topic-input"; @Output MessageChannel output(); @Input SubscribableChannel input(); }}

咱俩只要求扩展如下配置,自动绑定dlq队列:

剧情很简短,既包罗了音讯的生产,也暗含了音讯花费。在/sendMessage接口的概念中,发送了一条音讯,一条音信的头音信中包括了x-delay字段,该字段用来内定消息延迟的年华,单位为皮秒。所以上述代码发送的音信会在5秒未来被花费。在消息监听类TestListener中,对TestTopic.INPUT大路定义了@StreamListener,这里会对延缓新闻坚实际的逻辑。由于新闻的成本是延迟的,进而变相达成了从新闻发送那一刻起早先的定时任务。

spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true

在运转应用在此之前,还要须要做一些须要的安排,下边分音讯生产端和开销端做验证:

接下来改换一下新闻管理程序,能够依附业务意况,为步向dlq队列增添三个规范,比方上面包车型地铁例子:

音信生产端

@StreamListener(TestTopic.INPUT)public void receive(String payload) { log.info("Received payload : " + payload + ", " + count); if (count == 3) { count = 1; throw new AmqpRejectAndDontRequeueException("tried 3 times failed, send to dlq!"); } else { count ++; throw new RuntimeException("Message consumer failed!"); }}
spring.cloud.stream.bindings.example-topic-output.destination=delay-topicspring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange=true

设定了计数器count,当count为3的时候抛出AmqpRejectAndDontRequeueException那几个一定的相当。此时,当唯有当抛出那个那些的时候,才会将信息归入DLQ队列,进而不会形成深重的堆叠难点。

在意这里的一个新参数spring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange,用来展开延迟新闻的法力,那样在创制exchange的时候,会将其设置为有着延缓天性的exchange,也正是用到上边我们设置的延迟音讯插件的成效。

正文示例读者能够通过查阅下边货仓的中的stream-exception-handler-4项目:

新闻消费端

  • Github
  • Gitee
spring.cloud.stream.bindings.example-topic-input.destination=delay-topicspring.cloud.stream.bindings.example-topic-input.group=testspring.cloud.stream.rabbit.bindings.example-topic-input.consumer.delayed-exchange=true

假使您对这个感兴趣,应接star、follow、收藏、转载给予帮助!

在费用端也一致,要求安装spring.cloud.stream.rabbit.bindings.example-topic-output.producer.delayed-exchange=true。假若该参数不设置,将会油但是生类似下边包车型大巴错误:

  • Spring Boot基础教程
  • Spring Cloud基础教程
ERROR 9340 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'delay-topic' in vhost '/': received 'topic' but current is ''x-delayed-message'', class-id=40, method-id=10)

本文首发:

完结了地点配置之后,就足以运维应用,并尝试访谈localhost:8080/sendMessage?message=hello接口来发送一个音讯到MQ中了。此时得以看来类似上面包车型大巴日志:

2019-01-02 23:28:45.318 INFO 96164 --- [ctor-http-nio-3] c.d.s.TestApplication$TestController : Send: hello2019-01-02 23:28:45.328 INFO 96164 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]2019-01-02 23:28:45.333 INFO 96164 --- [ctor-http-nio-3] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory.publisher#5c5f9a03:0/SimpleConnection@3278a728 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 53536]2019-01-02 23:28:50.349 INFO 96164 --- [ay-topic.test-1] c.d.stream.TestApplication$TestListener : Received: hello

从日记中能够观察,Send: helloReceived: hello两条出口之间间距了5秒,适合大家地点编码设置的延迟时间。

在代码层面已经到位了定期职分,那么大家怎样查看延迟的新闻数等音讯呢?

那时候,大家能够展开RabbitMQ的Web调节台,首先能够步入Exchanges页面,看看那些特殊exchange,具体如下:

图片 4image

能够看出,这些exchange的Type类型是x-delayed-message。点击该exchange的称号,步入详细页面,就可以知见更加多具体音讯了:

图片 5image

正文示例读者能够透过翻看上边仓库的中的stream-delayed-message项目:

  • Github
  • Gitee

假使你对那么些感兴趣,接待star、follow、收藏、转载给予接济!

  • Spring Boot基础教程
  • Spring Cloud基础教程

正文头阵于自家的单身博客:

版权声明:本文由大奖888-www.88pt88.com-大奖888官网登录发布于大奖888官网登录,转载请注明出处:Stream默认的消息重试功能,RabbitMQ中提供了关于延