前面,已经探讨了:
本文来对Spring Cloud Stream,做一个知识点盘点和总结,包括:
- 概念
- Stream注解
- Spring Cloud Integration(Spring Cloud Stream的底层)注解
- Spring Messaging(Spring消息编程模型)注解
- Spring Cloud Stream API
概念
group
组内只有1个实例消费。如果不设置group,则stream会自动为每个实例创建匿名且独立的group——于是每个实例都会消费。
组内单次只有1个实例消费,并且会轮询负载均衡。通常,在将应用程序绑定到给定目标时,最好始终指定consumer group。
destination binder
与外部消息系统通信的组件,为构造 Binding
提供了 2 个方法,分别是 bindConsumer
和 bindProducer
,它们分别用于构造生产者和消费者。Binder使Spring Cloud Stream应用程序可以灵活地连接到中间件,目前spring为kafka、rabbitmq提供binder。
destination binding
Binding
是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产,由binder创建。
partition
TIPS
严格来说这个不是概念,而是一种Stream提高伸缩性、吞吐量的一种方式。不过不想另起标题了,写在这里吧。
一个或多个生产者将数据发送到多个消费者,并确保有共同特征标识的数据由同一个消费者处理。默认是对消息进行hashCode,然后根据分区个数取余,所以对于相同的消息,总会落到同一个消费者上。
注解
示例:
1 2 3 4
| public interface Barista { @Input("inboundOrders") SubscribableChannel orders(); }
|
作用:
- 用于接收消息
- 为每个binding生成channel实例
- 指定channel名称
- 在spring容器中生成一个名为inboundOrders,类型为SubscribableChannel的bean
- 在spring容器中生成一个类,实现Barista接口。
Output(Stream)
示例:
1 2 3 4
| public interface Source { @Output MessageChannel output(); }
|
作用:
类似Input,只是用来生产消息。
StreamListener(Stream)
示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @StreamListener(value = Sink.INPUT, condition = "headers['type']=='dog'") public void handle(String body) { System.out.println("Received: " + body); }
@Bean @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "2")) public MessageSource<String> test() { return () -> { Map<String, Object> map = new HashMap<>(1); map.put("type", "dog"); return new GenericMessage<>("abcdef", map); }; }
|
作用:
用于消费消息
condition的作用:符合条件,才进入处理方法。
condition起作用的两个条件:
- 注解的方法没有返回值
- 方法是一个独立方法,不支持Reactive API
SendTo(messaging)
示例:
1 2 3 4 5 6
| @StreamListener(Sink.INPUT) @SendTo(Source.OUTPUT) public String receive(String receiveMsg) { return "handle..."; }
|
作用:
用于发送消息
InboundChannelAdapter(Integration)
示例:
1 2 3 4 5 6
| @Bean @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1")) public MessageSource<String> test() { return () -> new GenericMessage<>("Hello Spring Cloud Stream"); }
|
作用:
表示让定义的方法生产消息。
注:用 InboundChannelAdapter
注解的方法上即使有参数也没用。即下面test方法不要有参数。
- fixedDelay:多少毫秒发送1次
- maxMessagesPerPoll:一次发送几条消息。
ServiceActivator(Integration)
示例:
1 2 3 4
| @ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT) public String transform(String payload) { return payload.toUpperCase(); }
|
作用:
表示方法能够处理消息或消息有效内容,监听input消息,用方法体的代码处理,然后输出到output中。
示例:
1 2 3 4
| @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform(String message) { return message.toUpperCase(); }
|
作用:
和 ServiceActivator
类似,表示方法能够转换消息,消息头,或消息有效内容
PollableMessageSource(Stream)
示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| @SpringBootApplication @EnableBinding({ConsumerApplication.PolledProcessor.class}) @EnableScheduling public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); }
@Autowired private PolledProcessor polledProcessor;
@Scheduled(fixedDelay = 5_000) public void poll() { polledProcessor.input().poll(message -> { byte[] bytes = (byte[]) message.getPayload(); String payload = new String(bytes); System.out.println(payload); }); }
public interface PolledProcessor { @Input PollableMessageSource input();
@Output MessageChannel output(); }
@Bean @InboundChannelAdapter(value = "output", poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")) public MessageSource<String> test() { return () -> { Map<String, Object> map = new HashMap<>(1); map.put("type", "dog"); return new GenericMessage<>("adfdfdsafdsfa", map); }; } }
|
如果不想自己做byte数组转换,可以添加配置:
1 2 3 4 5 6 7
| spring: cloud: stream: bindings: output: content-type: text/plain
|
作用:
允许消费者控制消费速率。
相关文章:
https://spring.io/blog/2018/02/27/spring-cloud-stream-2-0-polled-consumers
相关文章
评论系统未开启,无法评论!