温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Spring Cloud Stream如何使用

发布时间:2021-12-29 09:42:45 来源:亿速云 阅读:248 作者:iii 栏目:软件技术

本篇内容介绍了“Spring Cloud Stream如何使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

Spring Cloud Stream是构建消息驱动的微服务应用程序的框架。Spring Cloud Stream基于Spring Boot建立独立的生产级Spring应用程序,并使用Spring Integration提供与消息代理的连接。它提供了来自几家供应商的中间件的意见配置,介绍了持久发布订阅语义,消费者组和分区的概念。

您可以将@EnableBinding注释添加到应用程序,以便立即连接到消息代理,并且可以将@StreamListener添加到方法中,以使其接收流处理的事件。以下是接收外部消息的简单接收器应用程序。

@SpringBootApplication@EnableBinding(Sink.class)public class VoteRecordingSinkApplication {
  public static void main(String[] args) {
    SpringApplication.run(VoteRecordingSinkApplication.class, args);
  }
  @StreamListener(Sink.INPUT)
  public void processVote(Vote vote) {
      votingService.recordVote(vote);
  }}

@EnableBinding注释需要一个或多个接口作为参数(在这种情况下,该参数是单个Sink接口)。接口声明输入和/或输出通道。Spring Cloud Stream提供了接口Source,Sink和Processor; 您还可以定义自己的界面。

以下是Sink接口的定义:

public interface Sink {
  String INPUT = "input";
  @Input(Sink.INPUT)
  SubscribableChannel input();}

@Input注释标识输入通道,通过该输入通道接收到的消息进入应用程序; @Output注释标识输出通道,发布的消息将通过该通道离开应用程序。@Input和@Output注释可以使用频道名称作为参数; 如果未提供名称,将使用注释方法的名称。

Spring Cloud Stream将为您创建一个界面的实现。您可以在应用程序中通过自动连接来使用它,如下面的测试用例示例。

@RunWith(SpringJUnit4ClassRunner.class)@SpringApplicationConfiguration(classes = VoteRecordingSinkApplication.class)@WebAppConfiguration@DirtiesContextpublic class StreamApplicationTests {
  @Autowired
  private Sink sink;
  @Test
  public void contextLoads() {
    assertNotNull(this.sink.input());
  }}

编程模型

Binder

Binder 是 Spring Cloud Stream 的一个抽象概念,是应用与消息中间件之间的粘合剂。
目前 Spring Cloud Stream 实现了 Kafka 和 Rabbit MQ 的binder。通过 binder ,可以很方便的连接中间件,可以动态的改变消息的destinations(对应于 Kafka 的topic,Rabbit MQ 的 exchanges),这些都可以通过外部配置项来做到。甚至可以任意的改变中间件的类型而不需要修改一行代码。

Publish-Subscribe

消息的发布(Publish)和订阅(Subscribe)是事件驱动的经典模式。Spring Cloud Stream 的数据交互也是基于这个思想。生产者把消息通过某个 topic 广播出去(Spring Cloud Stream 中的 destinations)。其他的微服务,通过订阅特定 topic 来获取广播出来的消息来触发业务的进行。

这种模式,极大的降低了生产者与消费者之间的耦合。即使有新的应用的引入,也不需要破坏当前系统的整体结构。

Consumer Groups

“Group”, Kafka 中的概念。Spring Cloud Stream 的这个分组概念的意思基本和 Kafka 一致。

微服务中动态的缩放同一个应用的数量以此来达到更高的处理能力是非常必须的。对于这种情况,同一个事件防止被重复消费,只要把这些应用放置于同一个 “group” 中,就能够保证消息只会被其中一个应用消费一次。

Message

Message,就是所说的消息体,用来承载传输的信息用的。Message分为两部分,header和payload。header是头部信息,用来存储传输的一些特性属性参数。payload是用来装载数据的,他可以携带的任何Object对象  不同的对象在binder中传输 可以指定不同的mini类型

可以通过application.yml中设置 输入input和输出output的mini类型

spring.cloud.stream.bindings..content-type

MessageChannel

消息管道,生产者生产一个消息到channel,消费者从channel消费一个消息,所以channel可以对消息组件解耦,并且提供一个方便的拦截功能和监控功能。
默认的通道

输入(SubscribableChannel)和输出通道(MessageChannel)参考 Processor接口

springcloudstream提供通道的定义 比如自定义通过可以使用接口

public interface OrderChannel {
   String INPUT = "input_order";
   String OUTPUT="ouput_order";
   /**
    * input注解制定通道的名称  将来在yml中配置该通道的实际绑定的topic或者订阅组
    * @return
    */
   @Input(INPUT)
   SubscribableChannel orderInput();
   /**
    * output注解指定输出通道的名称
    * @return
    */
   @Output(OUTPUT)
   MessageChannel orderOutput();}

以下 代码参考 Source Sink Processor接口 将来在yml关于该通道的配置既可以

spring: 
    cloud: 
        stream: 
            bindings: 
                通道名称: 
                    destination: mydest

“Spring Cloud Stream如何使用”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI