新版SpringCloudSteam+Kafka 更加简单的配置消息收发
在旧版SpringCloudSteam配置中,需要一些手动配置:
@EnableBinding @Input @Output @StreamListener
1
新版有了新的约定简化配置。
spring:
cloud:
function:
definition: consumer1 # #指定消费的方法名 多个用分号分隔 consumer1;consumer2
stream:
bindings:
consumer1-in-0: # 消费者命名规范 消费者方法名-in-0
group: settlement #kafka的group概念 用于区分消费者组 防止组内重复消费
destination: topic #topic
binder: kafka1 # 绑定的消息配置
backOffInitialInterval: 100000 # 重试时初始避退间隔,单位毫秒,默认1000
producer1-out-0: # 生产者命名规范 生产者名-out-0
destination: topic
content-type: application/json
binder: kafka1 # 绑定的消息配置
binders:
kafka1: #消息配置名
type: kafka #类型
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: 192.168.11.11:31090,192.168.11.11:31091,192.168.11.11:31092 #kafka集群
kafka.default.consumer:
pollTimeout: 5000
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
之后只需要在容器里注入名为consumer1、counsumer2 类型为java.util.function.Consumer<T>
类型的bean就可以,T会自动用ObjectMapper解析。
@Bean
public Consumer<Person> consumer1() {
//这里是一种函数式接口的写法
return (person) -> {
System.out.println(person);
};
}
1
2
3
4
5
6
7
2
3
4
5
6
7
除了Consumer消费者,还有定期生产者Supplier会定期成产消息,Function消费后生产消息,但是不常用就不介绍了。
发送消息更常见的场景是手动触发:
@Autowired
StreamBridge streamBridge;
public String send (){
//第一个参数为配置文件bindings下的,第二个参数为消息内容
streamBridge.send("producer1-out-0","hi");
return "ok";
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
完!
上次更新: 10/23/2024