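This article is based on Spring Cloud Stream 2.2.0.RC1 and covers its new features.
These are the notes I took while learning Spring Cloud Stream, first written in 2016.
I originally planned to write a serialized set of articles on Spring Cloud Stream. Writing about it has long been a personal wish. First, it is a component I like very much: it hides the differences between MQ products and unifies the programming model (think of it as the "Spring Data" of MQ-based messaging). Second, it is a real regret that my printed book 《Spring Cloud 与 Docker 微服务架构实战》 does not cover it. More importantly, it has quite a lot of details, and there is a bit of a learning curve.
However, I am already updating both the Spring Cloud series and the Spring Cloud Alibaba series, and I do not have the energy for a third. So for now I have checked these notes against the latest Spring Cloud Stream documentation and brought them up to date with the latest version, including the new features.
Once the existing series are finished, I will still consider writing a beginner-to-expert tutorial series on Spring Cloud Stream.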
Concepts
group
By default, when a group is not specified, Spring Cloud Stream assigns the application to an anonymous and independent single-member consumer group that is in a publish-subscribe relationship with all other consumer groups.
In general, it is preferable to always specify a consumer group when binding an application to a given destination.
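To make the group semantics concrete, here is a minimal plain-Java sketch. It is a toy model, not the Spring Cloud Stream API: the class `GroupDeliveryDemo`, its methods, and the round-robin choice inside a group are all assumptions for illustration (a real broker decides which member gets a message). It only shows the rule quoted above: every group receives each message, and only one member of a group handles it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of consumer-group delivery (hypothetical, not a Spring class). */
final class GroupDeliveryDemo {

    // group name -> member instances, in subscription order
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    // group name -> number of messages already delivered to that group
    private final Map<String, Integer> delivered = new LinkedHashMap<>();

    void subscribe(String group, String instance) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(instance);
        delivered.putIfAbsent(group, 0);
    }

    /** Publishes one message and returns the instances that receive it. */
    List<String> publish() {
        List<String> receivers = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
            List<String> members = entry.getValue();
            int count = delivered.get(entry.getKey());
            // Publish-subscribe across groups, one receiver inside each group.
            // Round-robin is an assumption made for this sketch.
            receivers.add(members.get(count % members.size()));
            delivered.put(entry.getKey(), count + 1);
        }
        return receivers;
    }

    public static void main(String[] args) {
        GroupDeliveryDemo destination = new GroupDeliveryDemo();
        destination.subscribe("orders", "orders-1");
        destination.subscribe("orders", "orders-2");
        // An application without a group behaves like its own single-member group.
        destination.subscribe("anonymous-1", "audit-1");
        System.out.println(destination.publish());
        System.out.println(destination.publish());
    }
}
```

The two `orders` instances take turns, while `audit-1` sees every message. This is why naming the group matters as soon as you scale an application beyond one instance.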
One or more producer application instances send data to multiple consumer application instances and ensure that data identified by common characteristics are processed by the same consumer instance.
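The sentence above describes partitioning. As a rough sketch of the idea (not the framework's actual selector), a partition can be derived from a key's hash code modulo the partition count, so messages sharing a key always land on the same consumer instance. The class and method names below are hypothetical, and `Math.floorMod` is my own choice to keep the result non-negative.

```java
/** Illustrative hash-modulo partition selection (hypothetical helper). */
final class PartitionDemo {

    /** Maps a key to a partition index in the range [0, partitionCount). */
    static int selectPartition(Object key, int partitionCount) {
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition.
        System.out.println(selectPartition("customer-42", 3));
        System.out.println(selectPartition("customer-42", 3));
    }
}
```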
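destination binder
The component that communicates with the external messaging system. It provides two methods for constructing a Binding, bindConsumer and bindProducer, which build the consumer and the producer respectively. Binders let Spring Cloud Stream applications connect to middleware flexibly; Spring currently provides binders for Kafka and RabbitMQ.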
destination binding (Binding)
Applying the @EnableBinding annotation to one of the application’s configuration classes defines a destination binding.
In short, adding @EnableBinding is all it takes to define a destination binding.
Aside from generating channels for each binding and registering them as Spring beans, for each bound interface, Spring Cloud Stream generates a bean that implements the interface.
    public interface Barista {

        @Input("inboundOrders")
        SubscribableChannel orders();
    }

@InboundChannelAdapter example:

    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT,
            poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1"))
    public MessageSource<String> test() {
        return () -> new GenericMessage<>("Hello Spring Cloud Stream");
    }

@ServiceActivator example:

    @ServiceActivator(inputChannel = Sink.INPUT, outputChannel = Source.OUTPUT)
    public String transform(String payload) {
        return payload.toUpperCase();
    }
In order to be eligible to support conditional dispatching, a method must satisfy the following conditions:
It must not return a value.
It must be an individual message handling method (reactive API methods are not supported).
In other words, the handler must be a standalone method with no return value; the Reactive API is not supported.

    // Only messages whose "type" header equals "dog" reach this handler.
    @StreamListener(value = Sink.INPUT, condition = "headers['type']=='dog'")
    public void handle(String body) {
        System.out.println("Received: " + body);
    }

    // Test producer: emits messages carrying the header type=dog.
    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT,
            poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "2"))
    public MessageSource<String> test() {
        return () -> {
            Map<String, Object> map = new HashMap<>(1);
            map.put("type", "dog");
            return new GenericMessage<>("abcdef", map);
        };
    }
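For readers who want to see the dispatching rule without any framework, here is a small plain-Java sketch. It is only an analogy: `ConditionalDispatchDemo` and its methods are hypothetical, and a `Predicate` over the headers stands in for the SpEL `condition` attribute. The behaviour it shows is that a handler runs only when its condition matches the message headers.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Framework-free analogy of condition-based dispatching (hypothetical). */
final class ConditionalDispatchDemo {

    private static final class Handler {
        final Predicate<Map<String, Object>> condition;
        final Consumer<String> action;

        Handler(Predicate<Map<String, Object>> condition, Consumer<String> action) {
            this.condition = condition;
            this.action = action;
        }
    }

    private final List<Handler> handlers = new ArrayList<>();

    void register(Predicate<Map<String, Object>> condition, Consumer<String> action) {
        handlers.add(new Handler(condition, action));
    }

    /** Invokes every handler whose condition matches; returns how many ran. */
    int dispatch(Map<String, Object> headers, String payload) {
        int invoked = 0;
        for (Handler handler : handlers) {
            if (handler.condition.test(headers)) {
                handler.action.accept(payload);
                invoked++;
            }
        }
        return invoked;
    }

    static List<String> demo() {
        List<String> received = new ArrayList<>();
        ConditionalDispatchDemo dispatcher = new ConditionalDispatchDemo();
        // Stand-in for condition = "headers['type']=='dog'"
        dispatcher.register(h -> "dog".equals(h.get("type")), p -> received.add("dog:" + p));
        dispatcher.register(h -> "cat".equals(h.get("type")), p -> received.add("cat:" + p));
        dispatcher.dispatch(Collections.singletonMap("type", "dog"), "abcdef");
        dispatcher.dispatch(Collections.singletonMap("type", "bird"), "ignored");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that a message matching no condition is simply not handled, which is easy to overlook when a header value is misspelled.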
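PollableMessageSource
Purpose: lets the consumer control the rate at which it consumes messages.

    import java.util.HashMap;
    import java.util.Map;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.Output;
    import org.springframework.cloud.stream.binder.PollableMessageSource;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.annotation.InboundChannelAdapter;
    import org.springframework.integration.annotation.Poller;
    import org.springframework.integration.core.MessageSource;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.support.GenericMessage;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;

    @SpringBootApplication
    @EnableBinding({ConsumerApplication.PolledProcessor.class})
    @EnableScheduling
    public class ConsumerApplication {

        public static void main(String[] args) {
            SpringApplication.run(ConsumerApplication.class, args);
        }

        @Autowired
        private PolledProcessor polledProcessor;

        // Pull at most one message every 5 seconds, so the consumer sets the pace.
        @Scheduled(fixedDelay = 5_000)
        public void poll() {
            polledProcessor.input().poll(message -> {
                byte[] bytes = (byte[]) message.getPayload();
                String payload = new String(bytes);
                System.out.println(payload);
            });
        }

        public interface PolledProcessor {

            @Input
            PollableMessageSource input();

            @Output
            MessageChannel output();
        }

        // Test producer: sends one message per second to the "output" channel.
        @Bean
        @InboundChannelAdapter(value = "output",
                poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
        public MessageSource<String> test() {
            return () -> {
                Map<String, Object> map = new HashMap<>(1);
                map.put("type", "dog");
                return new GenericMessage<>("adfdfdsafdsfa", map);
            };
        }
    }

Configuration:

    spring:
      cloud:
        stream:
          bindings:
            output:
              content-type: text/plain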
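Error handling
Application-level handling
Option 1: handle errors for a specific channel
Configuration:

    spring:
      cloud:
        stream:
          bindings:
            input:
              destination: my-destination
              group: my-group
            output:
              destination: my-destination

Code:

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Processor;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.annotation.InboundChannelAdapter;
    import org.springframework.integration.annotation.Poller;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.core.MessageSource;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.ErrorMessage;
    import org.springframework.messaging.support.GenericMessage;
    import org.springframework.scheduling.annotation.EnableScheduling;

    @Slf4j
    @SpringBootApplication
    @EnableBinding({Processor.class})
    @EnableScheduling
    public class ConsumerApplication {

        public static void main(String[] args) {
            SpringApplication.run(ConsumerApplication.class, args);
        }

        // Always fails, so every message ends up in the error channel.
        @StreamListener(value = Processor.INPUT)
        public void handle(String body) {
            throw new RuntimeException("x");
        }

        // The error channel of a binding is named <destination>.<group>.errors,
        // which gives my-destination.my-group.errors for the configuration above.
        @ServiceActivator(inputChannel = "my-destination.my-group.errors")
        public void handleError(ErrorMessage message) {
            Throwable throwable = message.getPayload();
            log.error("Caught exception", throwable);

            Message<?> originalMessage = message.getOriginalMessage();
            assert originalMessage != null;
            log.info("Original message body = {}", new String((byte[]) originalMessage.getPayload()));
        }

        // Test producer: sends one message per second.
        @Bean
        @InboundChannelAdapter(value = Processor.OUTPUT,
                poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
        public MessageSource<String> test() {
            return () -> new GenericMessage<>("adfdfdsafdsfa");
        }
    }

Option 2: handle errors for all channels

    @StreamListener(value = Processor.INPUT)
    public void handle(String body) {
        throw new RuntimeException("x");
    }

    // errorChannel is the global error channel, so this listener sees every failure.
    @StreamListener("errorChannel")
    public void error(Message<?> message) {
        ErrorMessage errorMessage = (ErrorMessage) message;
        System.out.println("Handling ERROR: " + errorMessage);
    }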
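System-level handling
System-level handling differs from one messaging middleware to another. If the application has no error handler configured, the error is propagated to the binder, and the binder passes it back to the messaging middleware. The middleware can then drop the message, requeue it (so that it is processed again), or send the failed message to a DLQ (dead-letter queue).
Drop
By default, failed messages are dropped. This is acceptable in some cases, but it is generally not suitable for production.
DLQ
Configuration: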
    spring:
      cloud:
        stream:
          bindings:
            input:
              destination: my-destination
              group: my-group
            output:
              destination: my-destination
          rabbit:
            bindings:
              input:
                consumer:
                  auto-bind-dlq: true

Code:

    @StreamListener(value = Processor.INPUT)
    public void handle(String body) {
        throw new RuntimeException("x");
    }

    @Bean
    @InboundChannelAdapter(value = Processor.OUTPUT,
            poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
    public MessageSource<String> test() {
        return () -> new GenericMessage<>("adfdfdsafdsfa");
    }

Configuration with republish-to-dlq:

    spring:
      cloud:
        stream:
          rabbit:
            bindings:
              input:
                consumer:
                  republish-to-dlq: true
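requeue
The Rabbit and Kafka binders rely on RetryTemplate to retry, which improves the success rate of message handling. However, if you set

    spring.cloud.stream.bindings.<input channel name>.consumer.max-attempts=1
    spring.cloud.stream.rabbit.bindings.<input channel name>.consumer.requeue-rejected=true

then the binder's internal retries are disabled, and a failed message is resubmitted to the same handler again and again until the handler throws AmqpRejectAndDontRequeueException.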
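RetryTemplate
Configuration-based
Retrying with RetryTemplate is also a form of error handling.

    spring:
      cloud:
        stream:
          bindings:
            <input channel name>:
              consumer:
                # Number of attempts, including the first one; 1 disables retries
                maxAttempts: 3
                # Initial backoff interval, in milliseconds
                backOffInitialInterval: 1000
                # Upper bound for the backoff interval, in milliseconds
                backOffMaxInterval: 10000
                # Factor applied to the interval after each retry
                backOffMultiplier: 2.0
                # Whether exceptions not listed in retryableExceptions are retried
                defaultRetryable: true
                # Per-exception override: true = retry, false = do not retry
                retryableExceptions:
                  java.lang.RuntimeException: true
                  java.lang.IllegalStateException: false

Test code:

    @StreamListener(value = Processor.INPUT)
    public void handle(String body) {
        throw new RuntimeException(body);
    }

    private final AtomicInteger count = new AtomicInteger(0);

    // Test producer: sends an increasing counter so retries are easy to spot in the log.
    @Bean
    @InboundChannelAdapter(value = Processor.OUTPUT,
            poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1"))
    public MessageSource<String> test() {
        return () -> new GenericMessage<>(count.getAndAdd(1) + "");
    }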
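To get a feel for what the backoff settings above mean in practice, the following plain-Java sketch computes the waiting times between attempts. It is my own approximation of exponential backoff and does not call Spring Retry; `BackOffScheduleDemo` and `delays` are hypothetical names. With maxAttempts = 3, backOffInitialInterval = 1000 and backOffMultiplier = 2.0, there are two waits.

```java
import java.util.ArrayList;
import java.util.List;

/** Approximates an exponential backoff schedule (hypothetical helper). */
final class BackOffScheduleDemo {

    /** Returns the wait in milliseconds before each retry; maxAttempts counts the first try. */
    static List<Long> delays(int maxAttempts, long initialInterval, double multiplier, long maxInterval) {
        List<Long> result = new ArrayList<>();
        long interval = initialInterval;
        for (int retry = 1; retry < maxAttempts; retry++) {
            result.add(Math.min(interval, maxInterval));
            // Grow the interval, but never beyond the configured maximum.
            interval = (long) Math.min(interval * multiplier, (double) maxInterval);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(delays(3, 1000, 2.0, 10000)); // [1000, 2000]
        System.out.println(delays(6, 1000, 2.0, 10000)); // [1000, 2000, 4000, 8000, 10000]
    }
}
```

The second call shows the cap kicking in: once doubling would exceed backOffMaxInterval, every further wait stays at 10000 ms.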
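Code-based
In most scenarios, the configuration-based approach is enough to customize retry behavior, but it may fall short for more complex requirements. In that case, you can configure the RetryTemplate in code:

    import java.util.Collections;

    import org.springframework.classify.BinaryExceptionClassifier;
    import org.springframework.cloud.stream.annotation.StreamRetryTemplate;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.retry.backoff.FixedBackOffPolicy;
    import org.springframework.retry.policy.AlwaysRetryPolicy;
    import org.springframework.retry.policy.ExceptionClassifierRetryPolicy;
    import org.springframework.retry.policy.SimpleRetryPolicy;
    import org.springframework.retry.support.RetryTemplate;

    @Configuration
    class RetryConfiguration {

        @StreamRetryTemplate
        public RetryTemplate sinkConsumerRetryTemplate() {
            RetryTemplate retryTemplate = new RetryTemplate();
            retryTemplate.setRetryPolicy(retryPolicy());
            retryTemplate.setBackOffPolicy(backOffPolicy());
            return retryTemplate;
        }

        private ExceptionClassifierRetryPolicy retryPolicy() {
            // Matches IllegalAccessException anywhere in the cause chain.
            BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier(
                    Collections.singletonList(IllegalAccessException.class));
            keepRetryingClassifier.setTraverseCauses(true);

            SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3);
            AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy();

            // IllegalAccessException: retry forever; anything else: at most 3 attempts.
            ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy();
            retryPolicy.setExceptionClassifier(
                    classifiable -> keepRetryingClassifier.classify(classifiable)
                            ? alwaysRetryPolicy
                            : simpleRetryPolicy);
            return retryPolicy;
        }

        private FixedBackOffPolicy backOffPolicy() {
            final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
            // Fixed wait between attempts, in milliseconds.
            backOffPolicy.setBackOffPeriod(2);
            return backOffPolicy;
        }
    }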
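Then point the binding at the RetryTemplate bean:

    spring.cloud.stream.bindings.<input channel name>.consumer.retry-template-name=myRetryTemplate

The value is the bean name of the RetryTemplate to use, so it has to match the bean you actually register (the @StreamRetryTemplate method above registers its bean as sinkConsumerRetryTemplate by default).
retry-template-name is only supported from Spring Cloud Stream 2.2 onward.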
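The retry decision encoded in RetryConfiguration can be hard to read through the policy classes, so here is a framework-free restatement. This is a sketch of my reading of that configuration, not Spring Retry code: `RetryDecisionDemo`, `hasCause`, and `shouldRetry` are hypothetical names. A failure with an IllegalAccessException anywhere in its cause chain is always retried; any other failure is retried until three attempts have been made.

```java
/** Plain-Java restatement of the classifier-based retry decision (hypothetical). */
final class RetryDecisionDemo {

    private static final int MAX_ATTEMPTS = 3;

    /** Walks the cause chain, similar in spirit to setTraverseCauses(true). */
    static boolean hasCause(Throwable failure, Class<? extends Throwable> type) {
        for (Throwable current = failure; current != null; current = current.getCause()) {
            if (type.isInstance(current)) {
                return true;
            }
        }
        return false;
    }

    /** attemptsSoFar is the number of attempts already made, including the first. */
    static boolean shouldRetry(Throwable failure, int attemptsSoFar) {
        if (hasCause(failure, IllegalAccessException.class)) {
            return true; // corresponds to the always-retry branch
        }
        return attemptsSoFar < MAX_ATTEMPTS; // corresponds to the 3-attempt branch
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException(new IllegalAccessException("denied"));
        System.out.println(shouldRetry(wrapped, 10));                  // true
        System.out.println(shouldRetry(new RuntimeException("x"), 2)); // true
        System.out.println(shouldRetry(new RuntimeException("x"), 3)); // false
    }
}
```

Be careful with the always-retry branch in real code: combined with a 2 ms backoff it can spin on a permanently failing message.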
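Dynamically bound destinations
This uses Spring Integration's native API; if you have time, it is worth reading the relevant Spring Integration documentation.

    import static org.springframework.web.bind.annotation.RequestMethod.POST;

    import java.util.Collections;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
    import org.springframework.context.annotation.Bean;
    import org.springframework.expression.spel.standard.SpelExpressionParser;
    import org.springframework.http.HttpHeaders;
    import org.springframework.http.HttpStatus;
    import org.springframework.integration.annotation.ServiceActivator;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.router.ExpressionEvaluatingRouter;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHeaders;
    import org.springframework.messaging.SubscribableChannel;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestHeader;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseStatus;

    @EnableBinding
    @Controller
    public class SourceWithDynamicDestination {

        @Autowired
        private BinderAwareChannelResolver resolver;

        @Autowired
        @Qualifier("sourceChannel")
        private MessageChannel localChannel;

        // Forwards the HTTP body to the local channel; the router picks the destination.
        @RequestMapping(path = "/", method = POST, consumes = "*/*")
        @ResponseStatus(HttpStatus.ACCEPTED)
        public void handleRequest(@RequestBody String body,
                                  @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
            localChannel.send(
                    MessageBuilder.createMessage(
                            body,
                            new MessageHeaders(
                                    Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
        }

        @Bean(name = "sourceChannel")
        public MessageChannel localChannel() {
            return new DirectChannel();
        }

        @Bean
        @ServiceActivator(inputChannel = "sourceChannel")
        public ExpressionEvaluatingRouter router() {
            // Supply the SpEL expression that evaluates to the destination name.
            ExpressionEvaluatingRouter router =
                    new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression(""));
            router.setDefaultOutputChannelName("default-output");
            router.setChannelResolver(resolver);
            return router;
        }

        @EnableBinding(Sink.class)
        static class TestSink {

            private final Log logger = LogFactory.getLog(getClass());

            @StreamListener(Sink.INPUT1)
            public void receive(String data) {
                logger.info("Data received from customer-1..." + data);
            }

            @StreamListener(Sink.INPUT2)
            public void receiveX(String data) {
                logger.info("Data received from customer-2..." + data);
            }
        }

        interface Sink {

            String INPUT1 = "input1";
            String INPUT2 = "input2";

            @Input(INPUT1)
            SubscribableChannel input1();

            @Input(INPUT2)
            SubscribableChannel input2();
        }
    }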