RocketMQ-优雅启停
经常使用RocketMQ,在Spring项目中使用Consumer时候,一般在bean初始化的时候,就执行Consumer.start,导致项目中其他bean还没初始化完成就消费MQ消息,存在后续bean初始化失败后,消费处理失败丢失的情况,如何做到优雅启停呢,可以参考RocketMQ-springboot-starter的实现。
如下DefaultRocketMQListenerContainer实现了多个接口:
public class DefaultRocketMQListenerContainer implements InitializingBean,
RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class);
...
实现ApplicationContextAware接口,获取ApplicationContext,从而间接获取environment,得到配置的rocketmq地址、topic等信息
setApplicationContext(ApplicationContext applicationContext)
实现InitializingBean接口,afterPropertiesSet方法中,初始化consumer的设置
public void afterPropertiesSet() throws Exception {
initRocketMQPushConsumer();
this.messageType = getMessageType();
this.methodParameter = getMethodParameter();
log.debug("RocketMQ messageType: {}", messageType);
}
实现SmartLifecycle接口,重点是这个接口,SmartLifeCycle接口让开发者可以在所有的bean都创建完成(getBean) 之后执行自己的初始化工作,或者在退出时执行资源销毁工作
- isAutoStartup()如果返回
false
,在启动时也不会执行start方法,默认返回true
- isRunning()判断是否已经执行,返回false表示还未执行,则调用
start()
执行。Phased
返回值越大,最晚执行 - stop()是容器退出前,如果isRunning()返回true,则会调用。
Phased
返回值越大,最先执行
public interface SmartLifecycle extends Lifecycle, Phased {
int DEFAULT_PHASE = Integer.MAX_VALUE;
default boolean isAutoStartup() {
return true;
}
default void stop(Runnable callback) {
stop();
callback.run();
}
@Override
default int getPhase() {
return DEFAULT_PHASE;
}
public interface Lifecycle {
void start();
void stop();
boolean isRunning();
}
rocketmq是怎么实现的:
- 类属性isRunning默认是false,表示consumer还未处于运行中
- start()中,进行consumer.start()启动consumer,isRunning设置为true,由于实现的是SmartLifeCycle接口,所以这个时候bean已经初始化好了
- getPhase()返回Integer.MAX_VALUE,表示DefaultRocketMQListenerContainer在所有bean初始化完后执行start()启动consumer,在销毁前执行stop(),停止消费MQ消息
public class DefaultRocketMQListenerContainer implements InitializingBean,
RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware {
...
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public void stop(Runnable callback) {
stop();
callback.run();
}
@Override
public void start() {
if (this.isRunning()) {
throw new IllegalStateException("container already running. " + this.toString());
}
try {
consumer.start();
} catch (MQClientException e) {
throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
}
this.setRunning(true);
log.info("running container: {}", this.toString());
}
@Override
public void stop() {
if (this.isRunning()) {
if (Objects.nonNull(consumer)) {
consumer.shutdown();
}
setRunning(false);
}
}
@Override
public boolean isRunning() {
return running;
}
private void setRunning(boolean running) {
this.running = running;
}
@Override
public int getPhase() {
// Returning Integer.MAX_VALUE only suggests that
// we will be the first bean to shutdown and last bean to start
return Integer.MAX_VALUE;
}
}
上面讲了Consumer的优雅起停,Producer则只需要在Spring容器退出时候执行shutdown即可
public void destroy() {
if (Objects.nonNull(producer)) {
producer.shutdown();
}
}