Dra-M Dra-M
首页
技术
冥思
哲学
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

莫小龙

保持理智,相信未来。
首页
技术
冥思
哲学
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • Java

  • Golang

  • 编程思想

  • 微服务

    • SpringCloud

      • SpringBoot RabbitMQ 快速启动
      • Spring Task fixedDelayString不能随着配置中心动态变化的问题
      • 新版SpringCloudSteam+Kafka 更加简单的配置消息收发
    • Kubernetes

    • Oauth2+Gateway+JWT+RSA 解决微服务单点登录问题
  • 中间件

  • Python

  • 运维

  • 技术
  • 微服务
  • SpringCloud
莫小龙
2020-12-31

新版SpringCloudSteam+Kafka 更加简单的配置消息收发

在旧版SpringCloudSteam配置中,需要一些手动配置:

@EnableBinding @Input @Output @StreamListener
1

新版有了新的约定简化配置。

spring:
  cloud:
    function:
      definition: consumer1 # #指定消费的方法名 多个用分号分隔 consumer1;consumer2 
    stream:
      bindings:
        consumer1-in-0: # 消费者命名规范 消费者方法名-in-0
          group: settlement #kafka的group概念 用于区分消费者组 防止组内重复消费
          destination: topic #topic
          binder: kafka1  # 绑定的消息配置
          backOffInitialInterval: 100000 # 重试时初始避退间隔,单位毫秒,默认1000 
        producer1-out-0: # 生产者命名规范 生产者名-out-0
          destination: topic
          content-type: application/json 
          binder: kafka1 # 绑定的消息配置
      binders:
        kafka1: #消息配置名
          type: kafka #类型
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: 192.168.11.11:31090,192.168.11.11:31091,192.168.11.11:31092 #kafka集群
      kafka.default.consumer:
        pollTimeout: 5000
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

之后只需要在容器里注入名为consumer1、counsumer2 类型为java.util.function.Consumer<T>类型的bean就可以,T会自动用ObjectMapper解析。

  @Bean
  public Consumer<Person> consumer1() {
    //这里是一种函数式接口的写法
    return (person) -> {
       System.out.println(person);
    };
  }
1
2
3
4
5
6
7

除了Consumer消费者,还有定期生产者Supplier会定期成产消息,Function消费后生产消息,但是不常用就不介绍了。

发送消息更常见的场景是手动触发:

  @Autowired
  StreamBridge streamBridge; 

  public String send (){
    //第一个参数为配置文件bindings下的,第二个参数为消息内容
    streamBridge.send("producer1-out-0","hi");
    return "ok";
  }
1
2
3
4
5
6
7
8

完!


#微服务#kafka#SpringCloud
上次更新: 10/23/2024
Spring Task fixedDelayString不能随着配置中心动态变化的问题
K8S远程调用小记

← Spring Task fixedDelayString不能随着配置中心动态变化的问题 K8S远程调用小记→

最近更新
01
mosquito配置ws协议
10-23
02
Pip包的离线下载和安装
10-23
03
stable diffusion 相关收藏
02-24
更多文章>
Theme by Vdoing | Copyright © 2019-2024 Dra-M
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式