您好,登錄后才能下訂單哦!
1、group:
組內只有1個實例消費。如果不設置group,則stream會自動為每個實例創建匿名且獨立的group——于是每個實例都會消費
組內單次只有1個實例消費,并且會輪詢負載均衡。通常,在將應用程序綁定到給定目標時,最好始終指定consumer group
2、destination binder:
與外部消息系統通信的組件,為構造 Binding提供了 2 個方法,分別是 bindConsumer 和 bindProducer ,它們分別用于構造生產者和消費者。Binder使Spring Cloud Stream應用程序可以靈活地連接到中間件,目前spring為kafka、rabbitmq提供binder
3、destination binding:
Binding 是連接應用程序跟消息中間件的橋梁,用于消息的消費和生產,由binder創建
4、partition
一個或多個生產者將數據發送到多個消費者,并確保有共同特征標識的數據由同一個消費者處理。默認是對消息進行hashCode,然后根據分區個數取余,所以對于相同的消息,總會落到同一個消費者上
注:嚴格來說partition不屬于概念,而是一種Stream提高伸縮性、吞吐量的一種方式
1、@Input,使用示例:
public interface MySink {
@Input("my-input")
SubscribableChannel input();
}
作用:
2、@Output,使用示例:
public interface MySource {
@Output("my-output")
MessageChannel output();
}
作用:
@Input
類似,只不過是用來生產消息3、@StreamListener,使用示例:
@StreamListener(value = Sink.INPUT, condition = "headers['type']=='dog'")
public void receive(String messageBody) {
log.info("Received: {}", messageBody);
}
作用:
4、@SendTo,使用示例:
// 接收INPUT這個channel的消息,并將返回值發送到OUTPUT這個channel
@StreamListener(Sink.INPUT)
@SendTo(Source.OUTPUT)
public String receive(String receiveMsg) {
return "handle...";
}
作用:
4、@InboundChannelAdapter,使用示例:
@Bean
@InboundChannelAdapter(value = Source.OUTPUT,
poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))
public MessageSource<String> producer() {
return () -> new GenericMessage<>("Hello Spring Cloud Stream");
}
作用:
5、@ServiceActivator,使用示例:
@ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT)
public String transform(String payload) {
return payload.toUpperCase();
}
作用:
6、@Transformer,使用示例:
@Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
public Object transform(String message) {
return message.toUpperCase();
}
作用:
@ServiceActivator
類似,標注該注解的方法能夠轉換消息,消息頭,或消息有效內容PollableMessageSource允許消費者可以控制消費速率。舉個例子簡單演示一下,首先定義一個接口:
public interface PolledProcessor {
@Input("pollable-input")
PollableMessageSource input();
}
使用示例:
@Autowired
private PolledProcessor polledProcessor;
@Scheduled(fixedDelay = 5_000)
public void poll() {
polledProcessor.input().poll(message -> {
byte[] bytes = (byte[]) message.getPayload();
String payload = new String(bytes);
System.out.println(payload);
});
}
參考:
https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。