SpringBoot RabbitMQ 快速启动
简单入门 & 发送/接受确认
# 简单入门
在rabbitMQ中创建simple_queue队列
pom:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1
2
3
4
2
3
4
配置:
server:
port: 8081
spring:
rabbitmq:
addresses: 192.168.200.128
# listener:
# simple:
# #开启消费者确认模式
# acknowledge-mode: manual
# #开启消费者限流
# prefetch: 3
# #生产者确认模式
# publisher-confirm-type: simple
# #生产者回退模式
# publisher-returns: true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
发送消息:
@RestController
public class SendMessageController {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 发送消息
* @param msg 消息
* @return 提示
*/
@RequestMapping("/send/{msg}")
public String send(@PathVariable("msg") String msg) {
//
rabbitTemplate.convertAndSend("", "simple_queue", msg);
return "发送完成";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
监听消息:
@Component
public class SimpleListener {
@RabbitListener(queues = "simple_queue")
public void simpleHandler(String msg){
System.out.println(msg);
}
}
1
2
3
4
5
6
7
2
3
4
5
6
7
# 生产者确认和回退的回调
配置文件:
# 开启生产者确认模式:(confirm),投递到交换机,不论失败或者成功都回调
spring.rabbitmq.publisher-confirms: true
# 开启生产者回退模式:(returns),交换机将消息路由到队列,出现异常则回调
spring.rabbitmq.publisher-returns: true
1
2
3
4
2
3
4
回调类,为RabbitTemplate设置回调方法:
@Component
public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 投递到交换机,不论投递成功还是失败都回调次方法
*
* @param correlationData 投递相关数据
* @param ack 是否投递到交换机
* @param cause 投递失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息进入交换机成功{}");
} else {
System.out.println("消息进入交换机失败{} , 失败原因:" + cause);
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("交换机路由至消息队列出错:>>>>>>>");
System.out.println("交换机:" + exchange);
System.out.println("路由键:" + routingKey);
System.out.println("错误状态码:" + replyCode);
System.out.println("错误原因:" + replyText);
System.out.println("发送消息内容:" + message.toString());
System.out.println("<<<<<<<<");
}
/**
* 创建RabbitTemplate对象之后执行当前方法,为模板对象设置回调确认方法
* 设置消息确认回调方法
* 设置消息回退回调方法
*/
@PostConstruct
public void initRabbitTemplate() {
//设置消息确认回调方法
rabbitTemplate.setConfirmCallback(this);
//设置消息回退回调方法
rabbitTemplate.setReturnCallback(this);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# 消费者确认(ACK)
# 开启消费者确认模式
spring.rabbitmq.listener.simple.acknowledge-mode: manual
1
2
2
@Component
public class MessageListener {
@RabbitListener(queues = "simple_queue")
public void acceptMsg(Message message, Channel channel) throws IOException {
byte[] body = message.getBody();
String msg = new String(body, StandardCharsets.UTF_8);
//获取投递标签
MessageProperties messageProperties = message.getMessageProperties();
long deliveryTag = messageProperties.getDeliveryTag();
/**
* 签收消息,前提条件,必须在监听器的配置中,开启手动签收模式
* 参数1:消息投递标签
* 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
* 参数3:是否重回队列
*/
if ("cnm".equals(msg)) {
//拒签
channel.basicNack(deliveryTag, false, false);
System.out.println("拒绝签收,扔掉:{}");
} else {
channel.basicAck(deliveryTag, false);
System.out.println("手动签收完成:{}");
System.out.println(msg);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 消费端限流
rabbitmq.listener.simple.prefetch: 3
1
上次更新: 10/23/2024