07、核心模块CircuitBreaker源码之CircuitBreakerEvents

CircuitBreakerEvent

熔断事件,有以下几种场景

    enum Type {
        /** 请求失败,且不是可被忽略异常,失败次数+1时发布 */
        ERROR(false),
        /**  请求失败,但是是可被忽略异常,失败次数+1时发布 */
        IGNORED_ERROR(false),
        /**  请求成功时发布 */
        SUCCESS(false),
        /** 熔断打开,请求不允许被调用时发布*/
        NOT_PERMITTED(false),
        /** 熔断状态发生变化时发布 */
        STATE_TRANSITION(true),
        /**  熔断被重置时发布 */
        RESET(true),
        /**  熔断被强制开启时发布 */
        FORCED_OPEN(false),
        /** 熔断被强制停止工作时发布 */
        DISABLED(false);

        public final boolean forcePublish;//表示是否强制发布事件

    }

CircuitBreaker向订阅的任何订阅者/消费者发布CircuitBreakerEvents流。
消费或订阅方式

  • 注册EventConsumer
circuitBreaker.getEventPublisher()
    .onSuccess(event -> logger.info(...))
    .onError(event -> logger.info(...))
    .onIgnoredError(event -> logger.info(...))
    .onReset(event -> logger.info(...))
    .onStateTransition(event -> logger.info(...));
// Or if you want to register a consumer listening to all events, you can do:
circuitBreaker.getEventPublisher()
    .onEvent(event -> logger.info(...));

  • CircularEventConsumer
CircularEventConsumer<CircuitBreakerEvent> ringBuffer = new CircularEventConsumer<>(10);
circuitBreaker.getEventPublisher().onEvent(ringBuffer);
List<CircuitBreakerEvent> bufferedEvents = ringBuffer.getBufferedEvents()

  • RxJava2
RxJava2Adapter.toFlowable(circuitBreaker.getEventPublisher())
    .filter(event -> event.getEventType() == Type.ERROR)
    .cast(CircuitBreakerOnErrorEvent.class)
    .subscribe(event -> logger.info(...))

发布事件源码
*
从源码看出是否发布熔断事件,有两重判断:

  • shouldPublishEvents是根据EventType或CircuitBreaker.State枚举中的配置进行判断
    boolean shouldPublishEvents(CircuitBreakerEvent event){
        return event.getEventType().forcePublish || getState().allowPublish;
    }

  • eventProcessor.hasConsumers是根据有没有注册消费者或者订阅者进行判断
public boolean hasConsumers(){
        return consumerRegistered;
    }

*
*
事件消费

CircuitBreakerEventProcessor::consumeEvent > EventProcessor::processEvent

    public <E extends T> boolean processEvent(E event) {
        boolean consumed = false;
        EventConsumer<T> onEventConsumer = this.onEventConsumer;
        if(onEventConsumer != null){
        	//该段主要是注册了事件消费者处理逻辑,
        	//比如circuitBreaker.getEventPublisher() .onSuccess(event -> logger.info(...))
            onEventConsumer.consumeEvent(event);
            consumed = true;
        }
        if(!eventConsumers.isEmpty()){
            EventConsumer<T> eventConsumer = (EventConsumer<T>) eventConsumers.get(event.getClass());
            if(eventConsumer != null){
            	//该段主要是CircularEventConsumer使用场景
            	//如CircularEventConsumer<CircuitBreakerEvent> ringBuffer = new CircularEventConsumer<>(10);
            	//circuitBreaker.getEventPublisher().onEvent(ringBuffer);
                eventConsumer.consumeEvent(event);
                consumed = true;
            }
        }
        return consumed;
    }