一、背景
服务之前都是部署在ECS的虚拟机中,最近都转移部署到k8s容器,在发布的过程中,发现请求容易访问到不健康的节点。所以,这里需要梳理下我们的访问链路,找到问题,明确我们的目标。
问题是Pod是running状态,并不代表服务已健康,如果这时候就让请求访问进来,就会出现上面的问题。我们需告知pod健康的依据是什么,得等到pod是就绪状态了,才接受外部请求。
本文主要讲述k8s下,该怎么做到优雅停机和启动pod,末尾会总结下还遗留的未解决的问题。
二、目标
- 在请求流量低峰期,即使是白天,也能正常部署重启服务。
- 服务的发布做到优雅停机,尽可能不让工作中的任务被丢弃。
- 在服务停止前,先新增一个Pod节点,必须确保新节点的服务已健康,才对旧节点进行重启。
- 服务停止前,不再接收http请求和mq请求。同理,等到服务健康后,才接受外部请求。
三、总体设计
3.1、服务调用关系
这里的网关,也可以是api-six等,作为ingress的角色。
image.png
3.2、滚动发布流程
先启新版本的Pod,确保健康后,再停掉旧版本的Pod,以此类推。当然,发布过程中,会有新旧版本同时运行,所以称之为滚动更新。如果新版本的pod启动失败,则不会继续往后走,确保节点的正常服务能力。
image.png
四、Pod和容器
4.1、Pod的生命周期
pod状态.jpg
4.2、容器的状态
容器状态.jpg
4.3、Pod的探针Probe
Kubernetes为检查应用状态定义了三种探针,它们分别对应容器不同的状态:
- Startup,启动探针,用来检查应用是否已经启动成功,适合那些有大量初始化工作要做, 启动很慢的应用。
- Liveness,存活探针,用来检查应用是否正常运行,是否存在死锁、死循环。
- Readiness,就绪探针,用来检查应用是否可以接收流量,是否能够对外提供服务。
这三种探针是递进的关系:应用程序先启动,加载完配置文件等基本的初始化数 据就进入了Startup状态,之后如果没有什么异常就是Liveness存活状态,但可能有一些准备 工作没有完成,还不一定能对外提供服务,只有到最后的Readiness状态才是一个容器最健康 可用的状态。
- 注:这一段的描述和示意图来自极客时间《Kubernetes入门实战课》。
image.png
探测状态可以使用Shell、TCP Socket、HTTP Get三种方式,还可以调整探测的频率和超时时间等参数。
- exec,执行一个Linux命令,比如ps、cat等等,和container的command字段很类似。
- tcpSocket,使用TCP协议尝试连接容器的指定端口。
- httpGet,连接端口并发送HTTP GET请求。
总结:我们在yml中使用探针的示例:
containers:
- env:
- name: TZ
value: Asia/Shanghai
image: >-
xxx/smarterclass:1.2.1
imagePullPolicy: Always
# 优雅停机,执行kill命令,而不是kill -9
# do_stop.sh是自己封装的一个命令
lifecycle:
preStop:
exec:
command:
- /bin/sh
- '-c'
- >-
wget http://127.0.0.1:54199/offline 2>/tmp/null;sleep 45 &&
/opt/xxx/smarterclass/bin/do_stop.sh
name: air-smarterclass
ports:
- containerPort: 9025
protocol: TCP
readinessProbe:
failureThreshold: 3
httpGet:
path: /mgm/health
port: 9025
scheme: HTTP
initialDelaySeconds: 1
periodSeconds: 5
successThreshold: 1
timeoutSeconds: 3
startupProbe:
# 连续探测失败几次才认为是真正发生了异常,默认是3次
failureThreshold: 3
exec:
command:["cat","/opt/xxx/smarterclass/pid"]
# 执行探测动作的时间间隔,默认是10秒探测一次
periodSeconds: 3
successThreshold: 1
timeoutSeconds: 1
livenessProbe:
failureThreshold: 3
tcpSocket:
port: 9025
periodSeconds: 10
successThreshold: 1
timeoutSeconds: 1
五、应用的优雅停机
- 断开tomcat的连接,不再接受新的请求
import org.apache.catalina.connector.Connector;
private volatile Connector connector;
if (this.connector != null) {
# 断开连接
this.connector.pause();
Executor tomcatExecutor = this.connector.getProtocolHandler().getExecutor();
if (tomcatExecutor instanceof ExecutorService) {
# 关闭线程池
XxxExecutorServiceShutdownHooks.add((ExecutorService) tomcatExecutor);
}
}
- 把当前服务从consul注册中心主动下线
private DiscoveryRegistration registration;
this.registration.deregister();
- mq消息监听停止
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.config.RabbitListenerConfigUtils;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.context.ApplicationContext;
import java.util.Collection;
import java.util.Map;
public void shutdown(ApplicationContext context) {
log.info("begin shutdown RabbitMQ");
RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry = context.getBean(RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
RabbitListenerEndpointRegistry.class);
Collection<MessageListenerContainer> containers = rabbitListenerEndpointRegistry.getListenerContainers();
for (MessageListenerContainer messageListenerContainer : containers) {
if (messageListenerContainer.isRunning()) {
messageListenerContainer.stop();
}
}
log.info("finish shutdown RabbitMQ");
}
- 线程池优雅关闭
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
public class XxxExecutorServiceShutdownHooks {
private static HashSet<ExecutorService> executorServiceHashSet = new HashSet<>(10);
/**
* 添加需要gracefulShutdown的ExecutorService
*
* @param hook
*/
public static synchronized void add(ExecutorService hook) {
if (hook != null) {
executorServiceHashSet.add(hook);
}
}
/**
* shutdown
*
* @param shutdownAwaitTermination
*/
public static void shutdownGraceful(long shutdownAwaitTermination) {
List<ExecutorService> executorServices = new ArrayList<>(executorServiceHashSet);
CompletableFuture<Void> future = ThreadPoolUtils.shutdownGraceful(executorServices, shutdownAwaitTermination);
if (future != null) {
future.join();
}
}
}
六、遗留问题
- ribbon 客户端调用,存在注册中心的服务列表缓存问题。服务虽然从注册中心下线了,但是调用方默认是不能及时感知的。(实现见类com.netflix.loadbalancer.PollingServerListUpdater) (使用ScheduledThreadPoolExecutor实现定时刷新)
// 首次任务延迟1秒
private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
// 每30秒刷新
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
// 静态内部类
private static class LazyHolder {
private final static String CORE_THREAD = "DynamicServerListLoadBalancer.ThreadPoolSize";
private final static DynamicIntProperty poolSizeProp = new DynamicIntProperty(CORE_THREAD, 2);
private static Thread _shutdownThread;
static ScheduledThreadPoolExecutor _serverListRefreshExecutor = null;
static {
int coreSize = poolSizeProp.get();
ThreadFactory factory = (new ThreadFactoryBuilder())
.setNameFormat("PollingServerListUpdater-%d")
.setDaemon(true)
.build();
_serverListRefreshExecutor = new ScheduledThreadPoolExecutor(coreSize, factory);
poolSizeProp.addCallback(new Runnable() {
@Override
public void run() {
_serverListRefreshExecutor.setCorePoolSize(poolSizeProp.get());
}
});
_shutdownThread = new Thread(new Runnable() {
public void run() {
logger.info("Shutting down the Executor Pool for PollingServerListUpdater");
shutdownExecutorPool();
}
});
// 定义钩子_shutdownThread,并加入
Runtime.getRuntime().addShutdownHook(_shutdownThread);
}
private static void shutdownExecutorPool() {
if (_serverListRefreshExecutor != null) {
_serverListRefreshExecutor.shutdown();
// 防止循环,这里将_shutdownThread钩子进行移除
if (_shutdownThread != null) {
try {
Runtime.getRuntime().removeShutdownHook(_shutdownThread);
} catch (IllegalStateException ise) { // NOPMD
// this can happen if we're in the middle of a real
// shutdown,
// and that's 'ok'
}
}
}
}
}
- java服务刚启动的时候,这时候如果有大量的调用请求,jvm会出现一定程度得阻塞,响应能力差。也就是需要对节点,做到逐步放入流量,让线程池和对数据存储的连接等进行预热。(这就依赖于公司各自的流量控制平台了,比如命名服务、服务单元化、灰度测试、api网关、istio)









网友评论