spring-gateway动态路由实现:
gateway 默认的配置方式是通过配置文件定义路由,这种方式和使用Nginx这些代理服务差别不大了,实际意义不大;
实际工作种,肯定需要通过代码从数据库或者接口获取路由配置,再建立路由.
实践中还面临另外一个问题,如果某个服务挂了,路由下线,新开了一个服务,路由上线的问题(本次讨论点)
gateway路由生成工具:
org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder
该工具是gateway创建路由的默认工具:具体构建过程可以参考[https://www.jianshu.com/p/fd0ca560d660]
路由器RouteLocator创建
gateway路由器接口
public interface RouteLocator {
Flux<Route> getRoutes();
}
我们看看gateway提供的路由器:
org.springframework.cloud.gateway.route.CachingRouteLocator
org.springframework.cloud.gateway.route.CompositeRouteLocator
org.springframework.cloud.gateway.route.RouteDefinitionRouteLocator
使用默认方式创建的路由器不具备添加,删除路由功能,
本例参考CachingRouteLocator实现了自定义的路由器:
实现过程
主要是
- 要能改写getRoutes()获取到的结果(改写意味着添加或删除),
- 改写成功后通知系统需要使用最新的路由;
package com.mydemo.spring.dynamicroutes.support;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.event.RefreshRoutesEvent;
import org.springframework.cloud.gateway.event.WeightDefinedEvent;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.support.WeightConfig;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.Ordered;
import reactor.core.publisher.Flux;
import java.util.*;
import static java.util.Collections.synchronizedMap;
/**
* @name 路由器
* @auth DIY
* @date 2020/10/13 17:58
**/
public class MyRouteLocator implements Ordered, RouteLocator, ApplicationEventPublisherAware {
private static final Logger log = LoggerFactory.getLogger(MyRouteLocator.class);
/**
* 路由权重配置
*/
public static final String ROUTE_WEIGHT_CONFIG = "route_weight_confg";
private final Map<String, Route> cacheRoutes = synchronizedMap(new LinkedHashMap<>());
private Flux<Route> workRoutes;
private ApplicationEventPublisher applicationEventPublisher;
public static MyRouteLocator build(Flux<Route> newRoutes){
MyRouteLocator newInstance = new MyRouteLocator();
newRoutes.subscribe(route -> {
newInstance.cacheRoutes.put(route.getId(), route);
});
newInstance.fetch();
return newInstance;
}
private MyRouteLocator() {}
private Flux<Route> fetch() {
workRoutes = Flux.fromIterable(cacheRoutes.values()).sort(Comparator.comparing(route -> route.getOrder()));
return workRoutes;
}
@Override
public Flux<Route> getRoutes() {
return workRoutes;
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
@Override
public int getOrder() {
return 0;
}
public List<WeightConfig> add(Flux<Route> newRoutes) {
List<WeightConfig> routeConfigs = new ArrayList<>();
newRoutes.subscribe(route -> {
cacheRoutes.put(route.getId(), route);
WeightConfig config = getRouteWeightConfig(route);
routeConfigs.add(config);
log.warn("网关路由服务{}准备上线", config);
}, throwable -> {
throw new RuntimeException("路由加载失败", throwable);
});
fetch();
publishRefreshRouteEvent();
return routeConfigs;
}
public WeightConfig del(String routeId) {
if (cacheRoutes.containsKey(routeId)) {
Route route = cacheRoutes.get(routeId);
WeightConfig config = getRouteWeightConfig(route);
config.setWeight(0);
publishRefreshRouteWeightEvent(config);
cacheRoutes.remove(routeId);
fetch();
publishRefreshRouteEvent();
log.warn("网关路由服务[{}]已下线", routeId);
return config;
}
throw new RuntimeException("Route not found: " + routeId);
}
public static WeightConfig getRouteWeightConfig(Route route){
Object o = route.getMetadata().get(ROUTE_WEIGHT_CONFIG);
if (null == o){
throw new RuntimeException("当前路由无权重配置信息");
}
return (WeightConfig)o;
}
private void publishRefreshRouteEvent(){
applicationEventPublisher.publishEvent(new RefreshRoutesEvent(this));
}
private void publishRefreshRouteWeightEvent(WeightConfig weightConfig){
applicationEventPublisher.publishEvent(new WeightDefinedEvent(this, weightConfig));
}
}
spring-gateway有监听
applicationEventPublisher.publishEvent(new RefreshRoutesEvent(this));
发送的路由刷新事件,系统监听到该事件会刷新对应路由器的工作路由
RouteDefinitionRouteLocator 是另外一条通往动态路由的路, 通过org.springframework.cloud.gateway.route.RouteDefinitionLocator实现重新定义路由并改写getRoutes()获取的结果,这里不做扩展讨论;
路由变化的触发方式:
- 通过接口调度,实现添加或删除:
package com.mydemo.spring.dynamicroutes.api;
import com.mydemo.spring.dynamicroutes.config.MyRouteConfig;
import com.mydemo.spring.dynamicroutes.entity.MyRoute;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* @name 路由管理接口
* @auth DIY
* @date 2020/10/13 18:17
**/
@RestController
@RequestMapping("/api/route")
public class RouteApi {
private final List<MyRoute> routes = new ArrayList<>();
@Autowired
MyRouteConfig routeConfig;
public RouteApi(){
MyRoute myRoute1 = new MyRoute("route001", "group01", 10, "/route/s1/*", "http://127.0.0.1:8080");
MyRoute myRoute2 = new MyRoute("route002", "group01", 10, "/route/s1/*", "http://127.0.0.1:8081");
MyRoute myRoute3 = new MyRoute("route003", "group01", 10, "/route/s1/*", "http://127.0.0.1:8082");
MyRoute myRoute4 = new MyRoute("route004", "group02", 10, "/route/s2/*", "http://127.0.0.1:8083");
MyRoute myRoute5 = new MyRoute("route005", "group02", 10, "/route/s2/*", "http://127.0.0.1:8084");
MyRoute myRoute6 = new MyRoute("route006", "group03", 10, "/route/s3/*", "http://127.0.0.1:8085");
routes.add(myRoute1);
routes.add(myRoute2);
routes.add(myRoute3);
routes.add(myRoute4);
routes.add(myRoute5);
routes.add(myRoute6);
}
@RequestMapping("/hello/{name}")
public String hello(@PathVariable String name){
return "hello: " + name;
}
@RequestMapping("/add/{id}")
public String addRoute(@PathVariable String id){
List<MyRoute> myRouteList = routes.stream()
.filter(route -> route.getRouteId().equals(id))
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(myRouteList)){
return "not found route";
}
MyRoute myRoute = myRouteList.get(0);
routeConfig.addRoutes(myRoute);
return "addRoute: " + myRoute;
}
@RequestMapping("/del/{id}")
public String delRoute(@PathVariable String id){
List<MyRoute> myRouteList = routes.stream()
.filter(route -> route.getRouteId().equals(id))
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(myRouteList)){
return "not found route";
}
MyRoute myRoute = myRouteList.get(0);
routeConfig.delRoutes(myRoute.getRouteId());
return "delRoute: " + myRoute;
}
}
- 配置服务注册中心(例:nacos),通过注册中心服务状态变化的监听,实现动态路由;
package com.mydemo.spring.dynamicroutes.config;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
/**
* @name 监听nacos注册中心服务状态变化情况
* @auth DIY
* @date 2020/10/14 11:08
**/
@Configuration
public class NacosServerListener implements InitializingBean {
@Value("${nacos.config.server-addr}")
private String nacosSeverAddr;
private NamingService namingService;
@Bean
public NamingService getNamingService() {
try {
// Initialize the configuration service, and the console automatically obtains the following parameters through the sample code.
Properties properties = new Properties();
properties.put("serverAddr", nacosSeverAddr);
// if need username and password to login
properties.put("username", "nacos");
properties.put("password", "nacos");
namingService = NacosFactory.createNamingService(properties);
return namingService;
} catch (NacosException e) {
throw new RuntimeException("注册中心订阅异常", e);
}
}
public void listenRouteChange(String serviceName, String groupName, NamingService namingService) throws NacosException {
namingService.subscribe(serviceName, groupName, event -> {
NamingEvent namingEvent = (NamingEvent) event;
System.out.println("NACOS: " + namingEvent);
//TODO 根据监听到的服务信息(上下线),同步对应的路由上下线
});
}
@Override
public void afterPropertiesSet() throws Exception {
NamingService namingService = getNamingService();
listenRouteChange("myserver01", "group01", namingService);
listenRouteChange("myserver02", "group01", namingService);
listenRouteChange("myserver03", "group03", namingService);
}
}
写得比较赶,将就看.
网友评论