Dra-M Dra-M
首页
技术
冥思
哲学
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

莫小龙

保持理智,相信未来。
首页
技术
冥思
哲学
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Java

  • Golang

  • 编程思想

  • 微服务

    • SpringCloud

      • SpringBoot RabbitMQ 快速启动
        • 简单入门
        • 生产者确认和回退的回调
        • 消费者确认(ACK)
        • 消费端限流
      • Spring Task fixedDelayString不能随着配置中心动态变化的问题
      • 新版SpringCloudSteam+Kafka 更加简单的配置消息收发
    • Kubernetes

    • Oauth2+Gateway+JWT+RSA 解决微服务单点登录问题
  • 中间件

  • Python

  • 运维

  • 技术
  • 微服务
  • SpringCloud
莫小龙
2020-03-25
目录

SpringBoot RabbitMQ 快速启动

简单入门 & 发送/接受确认

# 简单入门

在rabbitMQ中创建simple_queue队列

pom:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1
2
3
4

配置:

server:
  port: 8081
spring:
  rabbitmq:
    addresses: 192.168.200.128
#    listener:
#      simple:
#        #开启消费者确认模式
#        acknowledge-mode: manual
#        #开启消费者限流
#        prefetch: 3
#    #生产者确认模式
#    publisher-confirm-type: simple
#    #生产者回退模式
#    publisher-returns: true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

发送消息:

@RestController
public class SendMessageController {
    @Autowired
    RabbitTemplate rabbitTemplate;
    /**
     * 发送消息
     * @param msg 消息
     * @return 提示
     */
    @RequestMapping("/send/{msg}")
    public String send(@PathVariable("msg") String msg) {
        //
        rabbitTemplate.convertAndSend("", "simple_queue", msg);
        return "发送完成";
    }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

监听消息:

@Component
public class SimpleListener {
@RabbitListener(queues = "simple_queue")
public void simpleHandler(String msg){
    System.out.println(msg);
 }
}
1
2
3
4
5
6
7

# 生产者确认和回退的回调

配置文件:

# 开启生产者确认模式:(confirm),投递到交换机,不论失败或者成功都回调
spring.rabbitmq.publisher-confirms: true
# 开启生产者回退模式:(returns),交换机将消息路由到队列,出现异常则回调
spring.rabbitmq.publisher-returns: true
1
2
3
4

回调类,为RabbitTemplate设置回调方法:

@Component
public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 投递到交换机,不论投递成功还是失败都回调次方法
     *
     * @param correlationData 投递相关数据
     * @param ack             是否投递到交换机
     * @param cause           投递失败原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            System.out.println("消息进入交换机成功{}");
        } else {
            System.out.println("消息进入交换机失败{} , 失败原因:" + cause);
        }
    }


    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        System.out.println("交换机路由至消息队列出错:>>>>>>>");
        System.out.println("交换机:" + exchange);
        System.out.println("路由键:" + routingKey);
        System.out.println("错误状态码:" + replyCode);
        System.out.println("错误原因:" + replyText);
        System.out.println("发送消息内容:" + message.toString());
        System.out.println("<<<<<<<<");
    }


    /**
     * 创建RabbitTemplate对象之后执行当前方法,为模板对象设置回调确认方法
     * 设置消息确认回调方法
     * 设置消息回退回调方法
     */
    @PostConstruct
    public void initRabbitTemplate() {
        //设置消息确认回调方法
        rabbitTemplate.setConfirmCallback(this);
        //设置消息回退回调方法
        rabbitTemplate.setReturnCallback(this);
    }

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

# 消费者确认(ACK)

# 开启消费者确认模式
spring.rabbitmq.listener.simple.acknowledge-mode: manual
1
2
@Component
public class MessageListener {
    @RabbitListener(queues = "simple_queue")
    public void acceptMsg(Message message, Channel channel) throws IOException {
        byte[] body = message.getBody();
        String msg = new String(body, StandardCharsets.UTF_8);
        //获取投递标签
        MessageProperties messageProperties = message.getMessageProperties();
        long deliveryTag = messageProperties.getDeliveryTag();
        /**
         * 签收消息,前提条件,必须在监听器的配置中,开启手动签收模式
         * 参数1:消息投递标签
         * 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
         * 参数3:是否重回队列
         */
        if ("cnm".equals(msg)) {
            //拒签
            channel.basicNack(deliveryTag, false, false);
            System.out.println("拒绝签收,扔掉:{}");
        } else {
            channel.basicAck(deliveryTag, false);
            System.out.println("手动签收完成:{}");
            System.out.println(msg);
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

# 消费端限流

rabbitmq.listener.simple.prefetch: 3
1

#Java#SpringBoot#rabbitMQ
上次更新: 10/23/2024
【译文】Bigtable
Spring Task fixedDelayString不能随着配置中心动态变化的问题

← 【译文】Bigtable Spring Task fixedDelayString不能随着配置中心动态变化的问题→

最近更新
01
mosquito配置ws协议
10-23
02
Pip包的离线下载和安装
10-23
03
stable diffusion 相关收藏
02-24
更多文章>
Theme by Vdoing | Copyright © 2019-2024 Dra-M
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式