TIPS
本文基于Spring Cloud Greenwich SR1,理论支持Finchley及更高版本。
本节详细探讨Spring Cloud Stream的错误处理。
应用处理
局部处理【通用】
配置:
1 2 3 4 5 6 7 8 9
| spring: cloud: stream: bindings: input: destination: my-destination group: my-group output: destination: my-destination
|
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Slf4j @SpringBootApplication @EnableBinding({Processor.class}) @EnableScheduling public class ConsumerApplication { public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); }
@StreamListener(value = Processor.INPUT) public void handle(String body) { throw new RuntimeException("x"); }
@ServiceActivator(inputChannel = "my-destination.my-group.errors") public void handleError(ErrorMessage message) { Throwable throwable = message.getPayload(); log.error("截获异常", throwable);
Message<?> originalMessage = message.getOriginalMessage(); assert originalMessage != null;
log.info("原始消息体 = {}", new String((byte[]) originalMessage.getPayload())); }
@Bean @InboundChannelAdapter(value = Processor.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")) public MessageSource<String> test() { return () -> new GenericMessage<>("adfdfdsafdsfa"); } }
|
全局处理【通用】
1 2 3 4 5 6 7 8 9 10
| @StreamListener(value = Processor.INPUT) public void handle(String body) { throw new RuntimeException("x"); }
@StreamListener("errorChannel") public void error(Message<?> message) { ErrorMessage errorMessage = (ErrorMessage) message; System.out.println("Handling ERROR: " + errorMessage); }
|
系统处理
系统处理方式,因消息中间件不同而异。如果应用没有配置错误处理,那么error将会被传播给binder,binder将error回传给消息中间件。消息中间件可以丢弃消息、requeue(重新排队,从而重新处理)或将失败的消息发送给DLQ(死信队列)。
丢弃
默认情况下,错误消息将被丢弃。虽然在某些情况下可以接受,但这种方式一般不适用于生产。
DLQ【RabbitMQ】
TIPS
- 虽然RocketMQ也支持DLQ,但目前RocketMQ控制台并不支持在界面上操作,将死信放回消息队列,让客户端重新处理。所以使用很不方便,而且用法也和本节有一些差异。
- 如使用RocketMQ,建议参考上面【应用处理】一节的用法,也可额外订阅这个Topic
%DLQ%+consumerGroup
- 个人给RocketMQ控制台提的Issue:
https://github.com/apache/rocketmq/issues/1334
配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| spring: cloud: stream: bindings: input: destination: my-destination group: my-group output: destination: my-destination rabbit: bindings: input: consumer: auto-bind-dlq: true
|
代码:
1 2 3 4 5 6 7 8 9 10 11
| @StreamListener(value = Processor.INPUT) public void handle(String body) { throw new RuntimeException("x"); }
@Bean @InboundChannelAdapter(value = Processor.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")) public MessageSource<String> test() { return () -> new GenericMessage<>("adfdfdsafdsfa"); }
|
这样,消息消费失败后,就会放入死信队列。在控制台操作一下,即可将死信放回消息队列,这样,客户端就可以重新处理。
如果想获取原始错误的异常堆栈,可添加如下配置:
1 2 3 4 5 6 7 8
| spring: cloud: stream: rabbit: bindings: input: consumer: republish-to-dlq: true
|
requeue【RabbitMQ】
Rabbit/Kafka的binder依赖RetryTemplate实现重试,从而提升消息处理的成功率。然而,如果设置了spring.cloud.stream.bindings.input.consumer.max-attempts=1
,那么RetryTemplate则不再重试。此时可通过requeue方式处理异常。
添加如下配置:
1 2 3 4
| spring.cloud.stream.bindings.<input channel名称>.consumer.max-attempts=1
spring.cloud.stream.rabbit.bindings.input.consumer.requeue-rejected=true
|
这样,失败的消息将会被重新提交到同一个handler进行处理,直到handler抛出 AmqpRejectAndDontRequeueException
异常为止。
RetryTemplate【通用】
配置方式
RetryTemplate重试也是错误处理的一种手段。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| spring: cloud: stream: bindings: <input channel名称>: consumer: maxAttempts: 3 backOffInitialInterval: 1000 backOffMaxInterval: 10000 backOffMultiplier: 2.0 defaultRetryable: true retryableExceptions: java.lang.RuntimeException: true java.lang.IllegalStateException: false
|
测试代码:
1 2 3 4 5 6 7 8 9 10 11 12 13
| @StreamListener(value = Processor.INPUT) public void handle(String body) { throw new RuntimeException(body); }
private AtomicInteger count = new AtomicInteger(0);
@Bean @InboundChannelAdapter(value = Processor.OUTPUT, poller = @Poller(fixedDelay = "1000", maxMessagesPerPoll = "1")) public MessageSource<String> test() { return () -> new GenericMessage<>(count.getAndAdd(1) + ""); }
|
编码方式
多数场景下,使用配置方式定制重试行为都是可以满足需求的,但配置方式可能无法满足一些复杂需求。此时可使用编码方式配置RetryTemplate:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| @Configuration class RetryConfiguration { @StreamRetryTemplate public RetryTemplate sinkConsumerRetryTemplate() { RetryTemplate retryTemplate = new RetryTemplate(); retryTemplate.setRetryPolicy(retryPolicy()); retryTemplate.setBackOffPolicy(backOffPolicy());
return retryTemplate; }
private ExceptionClassifierRetryPolicy retryPolicy() { BinaryExceptionClassifier keepRetryingClassifier = new BinaryExceptionClassifier( Collections.singletonList(IllegalAccessException.class )); keepRetryingClassifier.setTraverseCauses(true);
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy(3); AlwaysRetryPolicy alwaysRetryPolicy = new AlwaysRetryPolicy();
ExceptionClassifierRetryPolicy retryPolicy = new ExceptionClassifierRetryPolicy(); retryPolicy.setExceptionClassifier( classifiable -> keepRetryingClassifier.classify(classifiable) ? alwaysRetryPolicy : simpleRetryPolicy);
return retryPolicy; }
private FixedBackOffPolicy backOffPolicy() { final FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy(); backOffPolicy.setBackOffPeriod(2);
return backOffPolicy; } }
|
然后添加配置:
1
| spring.cloud.stream.bindings.<input channel名称>.consumer.retry-template-name=myRetryTemplate
|
注意:
Spring Cloud Stream 2.2才支持设置retry-template-name
相关文章
评论系统未开启,无法评论!