CircuitBreakerEvent
熔断事件,有以下几种场景
enum Type {
/** 请求失败,且不是可被忽略异常,失败次数+1时发布 */
ERROR(false),
/** 请求失败,但是是可被忽略异常,失败次数+1时发布 */
IGNORED_ERROR(false),
/** 请求成功时发布 */
SUCCESS(false),
/** 熔断打开,请求不允许被调用时发布*/
NOT_PERMITTED(false),
/** 熔断状态发生变化时发布 */
STATE_TRANSITION(true),
/** 熔断被重置时发布 */
RESET(true),
/** 熔断被强制开启时发布 */
FORCED_OPEN(false),
/** 熔断被强制停止工作时发布 */
DISABLED(false);
public final boolean forcePublish;//表示是否强制发布事件
}
CircuitBreaker向订阅的任何订阅者/消费者发布CircuitBreakerEvents流。
消费或订阅方式
- 注册EventConsumer
circuitBreaker.getEventPublisher()
.onSuccess(event -> logger.info(...))
.onError(event -> logger.info(...))
.onIgnoredError(event -> logger.info(...))
.onReset(event -> logger.info(...))
.onStateTransition(event -> logger.info(...));
// Or if you want to register a consumer listening to all events, you can do:
circuitBreaker.getEventPublisher()
.onEvent(event -> logger.info(...));
- CircularEventConsumer
CircularEventConsumer<CircuitBreakerEvent> ringBuffer = new CircularEventConsumer<>(10);
circuitBreaker.getEventPublisher().onEvent(ringBuffer);
List<CircuitBreakerEvent> bufferedEvents = ringBuffer.getBufferedEvents()
- RxJava2
RxJava2Adapter.toFlowable(circuitBreaker.getEventPublisher())
.filter(event -> event.getEventType() == Type.ERROR)
.cast(CircuitBreakerOnErrorEvent.class)
.subscribe(event -> logger.info(...))
发布事件源码
从源码看出是否发布熔断事件,有两重判断:
- shouldPublishEvents是根据EventType或CircuitBreaker.State枚举中的配置进行判断
boolean shouldPublishEvents(CircuitBreakerEvent event){
return event.getEventType().forcePublish || getState().allowPublish;
}
- eventProcessor.hasConsumers是根据有没有注册消费者或者订阅者进行判断
public boolean hasConsumers(){
return consumerRegistered;
}
事件消费
CircuitBreakerEventProcessor::consumeEvent > EventProcessor::processEvent
public <E extends T> boolean processEvent(E event) {
boolean consumed = false;
EventConsumer<T> onEventConsumer = this.onEventConsumer;
if(onEventConsumer != null){
//该段主要是注册了事件消费者处理逻辑,
//比如circuitBreaker.getEventPublisher() .onSuccess(event -> logger.info(...))
onEventConsumer.consumeEvent(event);
consumed = true;
}
if(!eventConsumers.isEmpty()){
EventConsumer<T> eventConsumer = (EventConsumer<T>) eventConsumers.get(event.getClass());
if(eventConsumer != null){
//该段主要是CircularEventConsumer使用场景
//如CircularEventConsumer<CircuitBreakerEvent> ringBuffer = new CircularEventConsumer<>(10);
//circuitBreaker.getEventPublisher().onEvent(ringBuffer);
eventConsumer.consumeEvent(event);
consumed = true;
}
}
return consumed;
}