RocketMQ-优雅启停

​ 经常使用RocketMQ,在Spring项目中使用Consumer时候,一般在bean初始化的时候,就执行Consumer.start,导致项目中其他bean还没初始化完成就消费MQ消息,存在后续bean初始化失败后,消费处理失败丢失的情况,如何做到优雅启停呢,可以参考RocketMQ-springboot-starter的实现。

​ 如下DefaultRocketMQListenerContainer实现了多个接口:

public class DefaultRocketMQListenerContainer implements InitializingBean,
    RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
    private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class);
    ...

​ 实现ApplicationContextAware接口,获取ApplicationContext,从而间接获取environment,得到配置的rocketmq地址、topic等信息

setApplicationContext(ApplicationContext applicationContext)

​ 实现InitializingBean接口,afterPropertiesSet方法中,初始化consumer的设置

public void afterPropertiesSet() throws Exception {
    initRocketMQPushConsumer();

    this.messageType = getMessageType();
    this.methodParameter = getMethodParameter();
    log.debug("RocketMQ messageType: {}", messageType);
}

​ 实现SmartLifecycle接口,重点是这个接口,SmartLifeCycle接口让开发者可以在所有的bean都创建完成(getBean) 之后执行自己的初始化工作,或者在退出时执行资源销毁工作

  1. isAutoStartup()如果返回false,在启动时也不会执行start方法,默认返回true
  2. isRunning()判断是否已经执行,返回false表示还未执行,则调用start()执行。Phased返回值越大,最晚执行
  3. stop()是容器退出前,如果isRunning()返回true,则会调用。Phased返回值越大,最先执行
public interface SmartLifecycle extends Lifecycle, Phased {

    int DEFAULT_PHASE = Integer.MAX_VALUE;

    default boolean isAutoStartup() {
       return true;
    }

    default void stop(Runnable callback) {
       stop();
       callback.run();
    }

    @Override
    default int getPhase() {
       return DEFAULT_PHASE;
    }
public interface Lifecycle {

    void start();

    void stop();

    boolean isRunning();

}

​ rocketmq是怎么实现的:

  1. 类属性isRunning默认是false,表示consumer还未处于运行中
  2. start()中,进行consumer.start()启动consumer,isRunning设置为true,由于实现的是SmartLifeCycle接口,所以这个时候bean已经初始化好了
  3. getPhase()返回Integer.MAX_VALUE,表示DefaultRocketMQListenerContainer在所有bean初始化完后执行start()启动consumer,在销毁前执行stop(),停止消费MQ消息
public class DefaultRocketMQListenerContainer implements InitializingBean,
    RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
		...
      
    @Override
    public boolean isAutoStartup() {
        return true;
    }

    @Override
    public void stop(Runnable callback) {
        stop();
        callback.run();
    }

    @Override
    public void start() {
        if (this.isRunning()) {
            throw new IllegalStateException("container already running. " + this.toString());
        }

        try {
            consumer.start();
        } catch (MQClientException e) {
            throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
        }
        this.setRunning(true);

        log.info("running container: {}", this.toString());
    }

    @Override
    public void stop() {
        if (this.isRunning()) {
            if (Objects.nonNull(consumer)) {
                consumer.shutdown();
            }
            setRunning(false);
        }
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    private void setRunning(boolean running) {
        this.running = running;
    }

    @Override
    public int getPhase() {
        // Returning Integer.MAX_VALUE only suggests that
        // we will be the first bean to shutdown and last bean to start
        return Integer.MAX_VALUE;
    }

}

​ 上面讲了Consumer的优雅起停,Producer则只需要在Spring容器退出时候执行shutdown即可

public void destroy() {
    if (Objects.nonNull(producer)) {
        producer.shutdown();
    }
}