1、group:
组内只有1个实例消费。如果不设置group,则stream会自动为每个实例创建匿名且独立的group——于是每个实例都会消费
组内单次只有1个实例消费,并且会轮询负载均衡。通常,在将应用程序绑定到给定目标时,最好始终指定consumer group
2、destination binder:
与外部消息系统通信的组件,为构造 Binding提供了 2 个方法,分别是 bindConsumer 和 bindProducer ,它们分别用于构造生产者和消费者。Binder使Spring Cloud Stream应用程序可以灵活地连接到中间件,目前spring为kafka、rabbitmq提供binder
3、destination binding:
Binding 是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产,由binder创建
4、partition
一个或多个生产者将数据发送到多个消费者,并确保有共同特征标识的数据由同一个消费者处理。默认是对消息进行hashCode,然后根据分区个数取余,所以对于相同的消息,总会落到同一个消费者上
注:严格来说partition不属于概念,而是一种Stream提高伸缩性、吞吐量的一种方式
1、@Input,使用示例:
public interface MySink {
@Input("my-input")
SubscribableChannel input();
}
作用:
2、@Output,使用示例:
public interface MySource {
@Output("my-output")
MessageChannel output();
}
作用:
@Input
类似,只不过是用来生产消息3、@StreamListener,使用示例:
@StreamListener(value = Sink.INPUT, condition = "headers['type']=='dog'")
public void receive(String messageBody) {
log.info("Received: {}", messageBody);
}
作用:
4、@SendTo,使用示例:
// 接收INPUT这个channel的消息,并将返回值发送到OUTPUT这个channel
@StreamListener(Sink.INPUT)
@SendTo(Source.OUTPUT)
public String receive(String receiveMsg) {
return "handle...";
}
作用:
4、@InboundChannelAdapter,使用示例:
@Bean
@InboundChannelAdapter(value = Source.OUTPUT,
poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))
public MessageSource<String> producer() {
return () -> new GenericMessage<>("Hello Spring Cloud Stream");
}
作用:
5、@ServiceActivator,使用示例:
@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT)
public String transform(String payload) {
return payload.toUpperCase();
}
作用:
6、@Transformer,使用示例:
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Object transform(String message) {
return message.toUpperCase();
}
作用:
@ServiceActivator
类似,标注该注解的方法能够转换消息,消息头,或消息有效内容PollableMessageSource允许消费者可以控制消费速率。举个例子简单演示一下,首先定义一个接口:
public interface PolledProcessor {
@Input("pollable-input")
PollableMessageSource input();
}
使用示例:
@Autowired
private PolledProcessor polledProcessor;
@Scheduled(fixedDelay = 5_000)
public void poll() {
polledProcessor.input().poll(message -> {
byte[] bytes = (byte[]) message.getPayload();
String payload = new String(bytes);
System.out.println(payload);
});
}
参考:
https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。