基于netty请求的远程调用学习
1概述
本文模拟远程过程调用进行实战总结,总的内容如下;
-
抽取接口公共部分
-
zookeeper相关知识点热身、常用api操作。
-
针对服务端,实例化服务对象,指定服务的ip和端口。注册到zk端,并启动本地监听服务。(这里服务端我会测试开启多个,相同ip但不同端口)
如:127.0.0.1:9090;127.0.0.1:9091
-
针对客户端:
首先会去监听zk指定path下的服务列表。
本地实现一个随机的服务选择算法,如需扩展,只需实现IServiceDiscovery接口即可。
封装了RpcClientProxyFactory,客户端发起请求时,提供服务接口类,如 IOrder.class,创建远程代理对象。当发起调用时,则发起真正的invoke,发起netty通讯连接,并根据原始method构造MyRpcReqInfo请求结构体。
-
服务端收到MyRpcReqInfo请求后,解析并从本地服务中找出对应的实例化服务对象,然后进行反射调用。
-
服务端对请求进行反序列化,并进行解析处理,最后结果封装成MyRpcRespInfo并返回。
1.1zk热身
zk基于windows操作系统,版本为:zookeeper-3.4.10
zkServer.cmd:启动zk服务端
zkCli.cmd:连接zk服务端
zkCli.cmd -server localhost:2181:连接指定的zk服务器
建立节点 create /zk hello
获得节点 get /zk
设置节点 set /zk hello2
建立子节点 create /zk/subzk hello3
输出节点目录 ls /zk
删除节点 delete /zk等
2公共部分
2.1注解定义
2.1.1RpcClazzAnno
package com.kikop.common.rpc.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marker annotation for a class that is exposed as an RPC service.
 *
 * <p>It carries no attributes; it is retained at runtime so that it can be
 * detected reflectively, and may only be placed on types.
 *
 * @author kikop
 * @version 1.0
 * @since 2020/8/23
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface RpcClazzAnno {
}
2.1.2RpcMethodAnno
package com.kikop.common.rpc.annotations;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marker annotation for a method of a service class that is exposed over RPC.
 *
 * <p>It carries no attributes; it is retained at runtime so that it can be
 * detected reflectively, and may only be placed on methods.
 *
 * @author kikop
 * @version 1.0
 * @since 2020/8/23
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface RpcMethodAnno {
}
2.2通讯结构体
2.2.1请求
package com.kikop.common.rpc.model;
import java.io.Serializable;
/**
* @author kikop
* @version 1.0
* @project Name: MySimpledRpcBasedTomcat
* @file Name: MyRpcReqInfo
* @desc 功能描述 RPC通信请求数据结构
* @date 2020/8/23
* @time 16:52
* @by IDE: IntelliJ IDEA
*/
public class MyRpcReqInfo implements Serializable {
private static final long serialVersionUID = 5774354750836094390L;
/**
* 包名:com.kikop.myrpcdiy.advancedBasedNetty.server.provider
*
*/
private String packetName;
/**
* 仅仅类名
* simpled:OrderImpl、IOrder and so on
* advanced:com.kikop.myrpcdiy.advancedBasedNetty.server.provider.IOrder
*/
private String clazzName;
/**
* 方法名
*/
private String methodName;
/**
* 方法参数值列表
*/
private Object[] args;
/**
* 方法参数类型集合
*/
private Class<?>[] paramTypes;
public Class<?>[] getParamTypes() {
return paramTypes;
}
public void setParamTypes(Class<?>[] paramTypes) {
this.paramTypes = paramTypes;
}
private String additionInfo;
public String getAdditionInfo() {
return additionInfo;
}
public void setAdditionInfo(String additionInfo) {
this.additionInfo = additionInfo;
}
public Object[] getArgs() {
return args;
}
public void setArgs(Object[] args) {
this.args = args;
}
public String getPacketName() {
return packetName;
}
public void setPacketName(String packetName) {
this.packetName = packetName;
}
public String getClazzName() {
return clazzName;
}
public void setClazzName(String clazzName) {
this.clazzName = clazzName;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
}
2.2.2响应
package com.kikop.common.rpc.model;
import java.io.Serializable;
/**
 * Response payload exchanged between RPC server and client.
 *
 * <p>A plain serializable bean: every property is a simple field with a
 * getter and a setter.
 *
 * @author kikop
 * @version 1.0
 * @since 2020/8/23
 */
public class MyRpcRespInfo implements Serializable {

    private static final long serialVersionUID = 6027658366486943598L;

    /** Name of the method that was requested. */
    private String methodName;

    /** Result of the call. */
    private Object result;

    /** Free-form additional information attached to the response. */
    private String additionInfo;

    /** @return the requested method name */
    public String getMethodName() {
        return this.methodName;
    }

    /** @param methodName the requested method name */
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /** @return the call result */
    public Object getResult() {
        return this.result;
    }

    /** @param result the call result */
    public void setResult(Object result) {
        this.result = result;
    }

    /** @return the additional information */
    public String getAdditionInfo() {
        return this.additionInfo;
    }

    /** @param additionInfo the additional information */
    public void setAdditionInfo(String additionInfo) {
        this.additionInfo = additionInfo;
    }
}
2.3注解工具类
package com.kikop.common.rpc;
import com.alibaba.fastjson.JSONObject;
import com.kikop.common.rpc.annotations.RpcClazzAnno;
import com.kikop.common.rpc.annotations.RpcMethodAnno;
import java.io.File;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Reads the RPC annotations of the classes found in one package directory.
 *
 * @author kikop
 * @version 1.0
 * @since 2020/8/23
 */
public class MyAnnotationUtil {

    /** File name suffix of compiled classes. */
    private static final String CLASS_SUFFIX = ".class";

    /**
     * Collects, for every class in {@code basePackage} annotated with
     * {@link RpcClazzAnno}, its declared methods annotated with
     * {@link RpcMethodAnno} together with the simple names of their
     * parameter types.
     *
     * <p>Only {@code *.class} entries directly inside the package directory
     * are inspected; other files and sub-directories are skipped. When the
     * directory cannot be located or listed an empty map is returned.
     *
     * @param basePackage package name, e.g. {@code com.kikop.simpledBasedTomcat.service}
     * @return map keyed by fully qualified class name; each value is a list
     *         of single-entry maps {@code methodName -> [parameter simple names]}, e.g.
     *         {@code {"com.kikop.simpledBasedTomcat.service.OrderImpl":
     *         [{"getName":[]}, {"setName":["String"]}]}}
     * @throws ClassNotFoundException if a class file found in the directory cannot be loaded
     */
    public static Map<String, List<Map<String, List<String>>>> parse(String basePackage) throws ClassNotFoundException {
        Map<String, List<Map<String, List<String>>>> resultClazzMap = new HashMap<>();

        // 1. Locate the package directory below the classpath root.
        File packageDir = resolvePackageDir(basePackage);
        if (packageDir == null) {
            return resultClazzMap;
        }

        // 2. List the directory; list() yields null when it is not a readable directory.
        String[] names = packageDir.list();
        if (names == null) {
            return resultClazzMap;
        }

        for (String fileName : names) {
            if (!fileName.endsWith(CLASS_SUFFIX)) {
                continue; // sub-directory or non-class resource
            }
            // Strip the suffix literally: a regex ".class" would also eat e.g. "bclass" in "Subclass".
            String className = fileName.substring(0, fileName.length() - CLASS_SUFFIX.length());
            String fullClazzName = basePackage + "." + className;
            Class<?> aClass = Class.forName(fullClazzName);
            if (!aClass.isAnnotationPresent(RpcClazzAnno.class)) {
                continue;
            }
            resultClazzMap.put(fullClazzName, collectRpcMethods(aClass));
        }
        return resultClazzMap;
    }

    /** Maps the package name to a directory under the classpath root, or null when there is no such root. */
    private static File resolvePackageDir(String basePackage) {
        java.net.URL rootUrl = MyAnnotationUtil.class.getResource("/");
        if (rootUrl == null) {
            return null;
        }
        File root;
        try {
            // Going through the URI decodes escaped characters such as %20.
            root = new File(rootUrl.toURI());
        } catch (java.net.URISyntaxException | IllegalArgumentException e) {
            root = new File(rootUrl.getPath());
        }
        return new File(root, basePackage.replace(".", "/"));
    }

    /** Builds the list of {@code methodName -> parameter simple names} for the annotated methods of one class. */
    private static List<Map<String, List<String>>> collectRpcMethods(Class<?> aClass) {
        List<Map<String, List<String>>> methods = new ArrayList<>();
        for (Method declaredMethod : aClass.getDeclaredMethods()) {
            if (!declaredMethod.isAnnotationPresent(RpcMethodAnno.class)) {
                continue;
            }
            List<String> params = new ArrayList<>();
            for (Class<?> parameterType : declaredMethod.getParameterTypes()) {
                params.add(parameterType.getSimpleName());
            }
            Map<String, List<String>> methodMap = new HashMap<>();
            methodMap.put(declaredMethod.getName(), params);
            methods.add(methodMap);
        }
        return methods;
    }

    /**
     * Runs {@link #parse(String)} on the configured base package.
     *
     * @param args unused
     * @throws ClassNotFoundException if a scanned class cannot be loaded
     */
    public static void main(String[] args) throws ClassNotFoundException {
        MyAnnotationUtil.parse(ServerConfig.BASEPACKAGE);
    }
}
2.3.1打印服务端服务列表
image-20210314204253483.png
2.4原生socket接口封装
package com.kikop.common.rpc;
import com.kikop.common.rpc.model.MyRpcReqInfo;
import com.kikop.common.rpc.model.MyRpcRespInfo;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.Arrays;
/**
 * Helpers that send and receive the RPC request/response objects over a
 * plain {@link Socket} using Java object serialization.
 *
 * <p>NOTE(review): {@link ObjectInputStream#readObject()} is applied to data
 * read from the network. Java-native deserialization of untrusted input is
 * unsafe; restrict the peers or switch to another format before using this
 * outside a demo.
 *
 * @author kikop
 * @version 1.0
 * @since 2021/3/13
 */
public class MySocketUtils {

    /**
     * Reads one request object from the socket.
     *
     * <p>The stream is left open on purpose: closing a socket's input stream
     * closes the socket, which the caller still needs to send the response.
     *
     * @param currentSocket connected socket to read from
     * @return the deserialized request
     * @throws IOException            if reading fails
     * @throws ClassNotFoundException if the received class is unknown
     */
    public static MyRpcReqInfo getReqMetaData(Socket currentSocket) throws IOException, ClassNotFoundException {
        ObjectInputStream objectInputStream = new ObjectInputStream(currentSocket.getInputStream());
        return (MyRpcReqInfo) objectInputStream.readObject();
    }

    /**
     * Sends a request, reads the response, prints it and closes the socket.
     *
     * <p>The socket is closed even when writing or reading fails.
     *
     * @param currentSocket connected socket; closed when this method returns
     * @param myRpcReqInfo  request to send
     * @throws IOException            if writing or reading fails
     * @throws ClassNotFoundException if the response class is unknown
     */
    public static void sendReqMetaData(Socket currentSocket, MyRpcReqInfo myRpcReqInfo) throws IOException, ClassNotFoundException {
        try {
            // 1. Write the request.
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(currentSocket.getOutputStream());
            objectOutputStream.writeObject(myRpcReqInfo);
            objectOutputStream.flush();
            // Half-close so the peer sees end-of-stream for the request.
            currentSocket.shutdownOutput();

            // 2. Read and print the response.
            ObjectInputStream objectInputStream = new ObjectInputStream(currentSocket.getInputStream());
            Object respResult = objectInputStream.readObject();
            if (respResult instanceof MyRpcRespInfo) {
                MyRpcRespInfo myRpcRespInfo = (MyRpcRespInfo) respResult;
                System.out.println(String.format("taskInfo:[%s],reqClazzInfo:[%s.%s:%s]-->reqResult:%s",
                        myRpcReqInfo.getAdditionInfo(),
                        myRpcReqInfo.getClazzName(), myRpcReqInfo.getMethodName(), Arrays.toString(myRpcReqInfo.getArgs()),
                        String.valueOf(myRpcRespInfo.getResult())));
            }
        } finally {
            // 3. Closing the socket also closes both of its streams.
            currentSocket.close();
        }
    }

    /**
     * Writes the response object to the socket and closes the output stream.
     *
     * <p>The stream is closed even when writing fails.
     *
     * @param currentSocket connected socket to write to
     * @param myRpcRespInfo response to send
     * @throws IOException            if writing fails
     * @throws ClassNotFoundException never thrown by this implementation; kept for signature compatibility
     */
    public static void sendRespResult(Socket currentSocket, MyRpcRespInfo myRpcRespInfo) throws IOException, ClassNotFoundException {
        try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(currentSocket.getOutputStream())) {
            objectOutputStream.writeObject(myRpcRespInfo);
            objectOutputStream.flush();
        }
    }
}
2.5服务参数配置
2.5.1ServerConfig
package com.kikop.common.rpc;
/**
 * Static settings of the RPC server: listen address and the package that
 * holds the exposed services.
 *
 * @author kikop
 * @version 1.0
 * @since 2020/8/23
 */
public class ServerConfig {

    /** Host the server binds to. */
    public static final String HOST = "127.0.0.1";

    /** Port the server binds to. */
    public static final int PORT = 6666;

    /** Package scanned for annotated service classes. */
    public static final String BASEPACKAGE = "com.kikop.simpledBasedTomcat.service";
}
2.5.2ZkConfig
package com.kikop.common.rpc;
/**
 * Static ZooKeeper settings used by the registry and by service discovery.
 *
 * @author kikop
 * @version 1.0
 * @since 2020/8/29
 */
public class ZkConfig {

    /** Connection string of the ZooKeeper server. */
    public static final String CONNECTOR_STR = "127.0.0.1:2181";

    /** Root path under which services are registered (one root, many services). */
    public static final String ZK_REGISTER_PATH = "/registryserver";
}
3服务端
3.1maven依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.kikop</groupId>
<artifactId>myAdvancedRpcBasedNetty</artifactId>
<version>1.0-SNAPSHOT</version>
<name>myAdvancedRpcBasedNetty</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<!-- NOTE(review): "netty.verson" is a misspelling of "netty.version". The netty-all dependency
     references the property by this exact name, so rename both places together or not at all. -->
<netty.verson>4.1.6.Final</netty.verson>
<!--zookeeper-->
<zookeeper.version>3.4.10</zookeeper.version>
<curator.version>2.5.0</curator.version>
<fastjson.version>1.2.29</fastjson.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!--begin with netty rpc-->
<!--1.对zookeeper的底层api的一些封装-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>${curator.version}</version>
</dependency>
<!--1.1.封装了一些高级特性-->
<!--如:Cache事件监听、选举、分布式锁、分布式计数器、分布式 Barrier等-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${curator.version}</version>
</dependency>
<!--2.netty-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.verson}</version>
</dependency>
<!--end with netty rpc-->
<!--3.fastjson-->
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.7.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.0.0</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
3.2对外接口服务
3.2.1服务接口
package com.kikop.advancedBasedNetty.producer.service;
/**
* @author kikop
* @version 1.0
* @project Name: myAdvancedRpcBasedNetty
* @file Name: IOrder
* @desc 功能描述
* @date 2020/8/29
* @time 19:39
* @by IDE: IntelliJ IDEA
*/
public interface IOrder {
String buy(int type);
}
3.2.2服务实现类
package com.kikop.advancedBasedNetty.producer.service.impl;
import com.kikop.advancedBasedNetty.producer.service.IOrder;
import com.kikop.common.rpc.annotations.RpcIFValueAnno;
/**
 * Demo implementation of {@link IOrder}.
 *
 * <p>The class-level annotation names the service interface this
 * implementation stands for.
 *
 * @author kikop
 * @version 1.0
 * @since 2020/8/29
 */
@RpcIFValueAnno(value = IOrder.class)
public class OrderImp implements IOrder {

    /**
     * Returns the item for the given type.
     *
     * @param type item type selector
     * @return {@code "手机"} for type 1, {@code "电脑"} for every other value
     */
    @Override
    public String buy(int type) {
        return type == 1 ? "手机" : "电脑";
    }
}
注意,对外接口服务统一放在:com.kikop.simpledBasedTomcat.service这个包路径下。
package com.kikop.simpledBasedTomcat.service;
import com.kikop.common.rpc.annotations.RpcClazzAnno;
import com.kikop.common.rpc.annotations.RpcMethodAnno;
/**
 * Demo order service exposed over RPC.
 *
 * @author kikop
 * @version 1.0
 * @since 2020/8/23
 */
@RpcClazzAnno
public class OrderImpl {

    /** Name of the current order. */
    private String orderName;

    /** @return the stored order name, possibly {@code null} */
    @RpcMethodAnno
    public String getOrderName() {
        return orderName;
    }

    /** @param orderName the order name to store */
    @RpcMethodAnno
    public void setOrderName(String orderName) {
        this.orderName = orderName;
    }

    /**
     * Returns the order count for a given order name.
     *
     * @param orderName order name, compared case-insensitively; may be {@code null}
     * @return 100 for {@code "apple"}, 10 for anything else including {@code null}
     */
    @RpcMethodAnno
    public Integer getOrderCount(String orderName) {
        // Constant first, so a null argument takes the default branch instead of throwing.
        if ("apple".equalsIgnoreCase(orderName)) {
            return 100;
        }
        return 10;
    }
}
3.3服务注册中心
3.3.1IRegisterCenter
package com.kikop.advancedBasedNetty.producer.registry;
/**
* @author kikop
* @version 1.0
* @project Name: myAdvancedRpcBasedNetty
* @file Name: 基于zk的注册中心
* @desc 功能描述
* @date 2020/8/29
* @time 20:23
* @by IDE: IntelliJ IDEA
*/
public interface IRegisterCenter {
/**
* 服务注册到zk
* @param serviceName 服务名(接口类全名):com.kikop.myrpcdiy.advancedBasedNetty.server.provider.IOrder
* @param servieAdress 服务地址:127.0.0.1:9090
*/
void register(String serviceName, String servieAdress);
}
3.3.2RegisterCenterImpl
package com.kikop.advancedBasedNetty.producer.registry.impl;
import com.kikop.advancedBasedNetty.consumer.registry.ZkConfig;
import com.kikop.advancedBasedNetty.producer.registry.IRegisterCenter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
/**
* @author kikop
* @version 1.0
* @project Name: myAdvancedRpcBasedNetty
* @file Name: 基于zk的注册中心
* @desc 功能描述
* @date 2020/8/29
* @time 20:23
* @by IDE: IntelliJ IDEA
*/
public class RegisterCenterImpl implements IRegisterCenter {
/**
* zk客户端调用API接口
*/
private CuratorFramework curatorFramework;
public RegisterCenterImpl() {
// 内部启动两个线程,一个用于监听、一个用于后台操作轮询
curatorFramework = CuratorFrameworkFactory.builder()
.connectString(ZkConfig.CONNECTOR_STR)
.sessionTimeoutMs(4000)
.retryPolicy(new ExponentialBackoffRetry(1000, 10))
.build();
curatorFramework.start();
}
/**
* 服务注册到zk
* @param serviceName 服务名
* @param servieAdress 服务地址
*/
@Override
public void register(String serviceName, String servieAdress) {
String servicePath = ZkConfig.ZK_REGISTER_PATH + "/" + serviceName;
try {
// 1.创建永久节点:服务名
if (curatorFramework.checkExists().forPath(servicePath) == null) {
curatorFramework.create().
creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(servicePath, "0".getBytes());
}
// 2.创建临时节点:/registrys/com.kikop.IG/serviceAddress1
String addressPath = servicePath + "/" + servieAdress;
String rsNode = curatorFramework.create()
.withMode(CreateMode.EPHEMERAL)
.forPath(addressPath, "0".getBytes());
System.out.println("服务注册成功:" + rsNode);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
3.3服务端逻辑
3.3.1服务端业务处理
package com.kikop.advancedBasedNetty.producer.handler;
import com.kikop.common.rpc.model.MyRpcReqInfo;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.lang.reflect.Method;
import java.util.HashMap;
/**
* @author kikop
* @version 1.0
* @project Name: myAdvancedRpcBasedNetty
* @file Name: RpcHandler
* @desc 功能描述
* @date 2020/8/30
* @time 10:59
* @by IDE: IntelliJ IDEA
*/
public class MyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 当前的服务端信息
*/
private String serverAddress;
/**
* 缓存支持的服务列表
* [0]:
* key:接口名 com.kikop.myrpcdiy.advancedBasedNetty.server.provider.IOrder
* value:接口实现类 OrderImpl
* [1]:
* key:接口名 com.kikop.myrpcdiy.advancedBasedNetty.server.provider.IAccount
* value:接口实现类 AccountImpl
*/
private HashMap<String, Object> serverHandlerMap = new HashMap<>();
public MyServerHandler(String serverAddress,HashMap<String, Object> serverHandlerMap) {
this.serverAddress=serverAddress;
this.serverHandlerMap = serverHandlerMap;
}
/**
* @param ctx 写数据给客户端
* @param msg 读客户端数据
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//super.channelRead(ctx, msg);
MyRpcReqInfo myRpcReqInfo = (MyRpcReqInfo) msg;
Object result = new Object();
if (serverHandlerMap.containsKey(myRpcReqInfo.getClazzName())) {
// 获取服务对象
Object serverInstance = serverHandlerMap.get(myRpcReqInfo.getClazzName());
// 获取类中的方法(带参数)
Method method = serverInstance.getClass().getMethod(
myRpcReqInfo.getMethodName(), myRpcReqInfo.getParamTypes());
// 调用方法
result = method.invoke(serverInstance, myRpcReqInfo.getArgs());
result=String.format("%s-->%s",serverAddress,result);
}
ctx.write(result);
ctx.flush();
ctx.close();
}
}
3.3.2服务管理类
package com.kikop.advancedBasedNetty.producer.manager;
import com.kikop.advancedBasedNetty.producer.handler.MyServerHandler;
import com.kikop.advancedBasedNetty.producer.registry.IRegisterCenter;
import com.kikop.common.rpc.annotations.RpcIFValueAnno;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.util.HashMap;
/**
 * Manages the exposed services of one provider: caches the instances,
 * registers them with the registry and runs the Netty server.
 *
 * @author kikop
 * @version 1.0
 * @since 2020/8/30
 */
public class ServerManager {

    /** Registry that the services are announced to. */
    private IRegisterCenter registerCenter;

    /** Address of this provider in {@code host:port} form, e.g. {@code 127.0.0.1:9090}. */
    private String serviceAddress;

    /**
     * Exposed services.
     * key: fully qualified interface name, e.g.
     * {@code com.kikop.advancedBasedNetty.producer.service.IOrder};
     * value: the implementing instance.
     */
    private HashMap<String, Object> serverHandlerMap = new HashMap<>();

    /**
     * @param serviceAddress address of this provider in {@code host:port} form
     * @param registerCenter registry used by {@link #registerSvr(Object...)}
     */
    public ServerManager(String serviceAddress, IRegisterCenter registerCenter) {
        this.serviceAddress = serviceAddress;
        this.registerCenter = registerCenter;
    }

    /**
     * Starts the Netty server and blocks until its channel is closed or the
     * calling thread is interrupted. The event loop groups are always shut
     * down before this method returns.
     *
     * @throws IllegalArgumentException if the service address is not of the form {@code host:port}
     */
    public void startSvr() {
        // Parse the address first so that a bad value cannot leave event loops running.
        String[] addressArray = serviceAddress.split(":");
        if (addressArray.length != 2) {
            throw new IllegalArgumentException("serviceAddress must be host:port but was: " + serviceAddress);
        }
        String ip = addressArray[0];
        int port = Integer.parseInt(addressArray[1]);

        EventLoopGroup bossGroup = new NioEventLoopGroup(); // accepts connections
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // handles channel I/O
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    // NOTE(review): frame and object sizes are capped only at Integer.MAX_VALUE and the
                    // decoder performs Java-native deserialization; tighten both before exposing this
                    // to untrusted clients.
                    pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                    pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                    pipeline.addLast("encoder", new ObjectEncoder());
                    pipeline.addLast("decoder", new io.netty.handler.codec.serialization.ObjectDecoder(Integer.MAX_VALUE,
                            ClassResolvers.cacheDisabled(null)));
                    // Business handler, one instance per channel.
                    pipeline.addLast(new MyServerHandler(serviceAddress, serverHandlerMap));
                }
            }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();
            System.out.println("Netty服务端启动成功,等待客户端连接...");
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    /**
     * Caches the given service instances and registers each of them with the
     * registry under the interface named by its {@link RpcIFValueAnno}.
     * Instances whose class lacks the annotation are ignored. Only the
     * services passed to this call are registered, so calling the method
     * again does not re-register earlier ones.
     *
     * @param services service instances to expose
     */
    public void registerSvr(Object... services) {
        for (Object service : services) {
            RpcIFValueAnno rpcIFValueAnno = service.getClass().getAnnotation(RpcIFValueAnno.class);
            if (rpcIFValueAnno == null) {
                continue;
            }
            // key: interface named by the annotation, e.g. com.kikop.advancedBasedNetty.producer.service.IOrder
            String fullInterfaceClazzName = rpcIFValueAnno.value().getName();
            serverHandlerMap.put(fullInterfaceClazzName, service);
            registerCenter.register(fullInterfaceClazzName, serviceAddress);
        }
    }
}
4客户端
4.1服务查询
4.1.1服务选择接口
package com.kikop.advancedBasedNetty.consumer.registry.service;
/**
* @author kikop
* @version 1.0
* @project Name: myAdvancedRpcBasedNetty
* @file Name: ZkConfig
* @desc 功能描述
* @date 2020/8/29
* @time 20:09
* @by IDE: IntelliJ IDEA
*/
public interface IServiceDiscovery {
/**
* 获取一个可用的服务
* @param serviceName: com.kikopmyrpcdiy.advancedBasedNetty.server.provider.IOrder
* @return
*/
public String discover(String serviceName) ;
}
4.1.2服务选择实现类
package com.kikop.advancedBasedNetty.consumer.registry.service.impl;
import com.kikop.advancedBasedNetty.consumer.localloadbalance.service.ILoadBalance;
import com.kikop.advancedBasedNetty.consumer.localloadbalance.service.impl.RandomLoadBalanceImpl;
import com.kikop.advancedBasedNetty.consumer.registry.service.IServiceDiscovery;
import com.kikop.common.rpc.ZkConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import java.util.ArrayList;
import java.util.List;
/**
 * ZooKeeper-backed service discovery built on Curator.
 *
 * <p>Provider lists are kept per service path, and each path is watched by
 * exactly one {@link PathChildrenCache}, created on first lookup.
 *
 * @author kikop
 * @version 1.0
 * @since 2020/8/29
 */
public class ServiceDiscoveryImpl implements IServiceDiscovery {

    /**
     * Provider list of the most recent lookup or watch event. Kept for
     * compatibility; written from the watcher thread, hence volatile.
     */
    volatile List<String> repos = new ArrayList<>();

    /** Last known provider list per service path. */
    private final java.util.Map<String, List<String>> reposByPath = new java.util.concurrent.ConcurrentHashMap<>();

    /** Service paths that already have a watcher. */
    private final java.util.Set<String> watchedPaths = java.util.concurrent.ConcurrentHashMap.newKeySet();

    /** Curator client used for all ZooKeeper operations. */
    public CuratorFramework curatorFramework;

    /**
     * Builds and starts the Curator client from {@link ZkConfig}
     * (4 s session timeout, exponential back-off retry: 1000 ms base, 10 retries).
     */
    public ServiceDiscoveryImpl() {
        curatorFramework = CuratorFrameworkFactory.builder().connectString(ZkConfig.CONNECTOR_STR).sessionTimeoutMs(4000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();
        curatorFramework.start();
    }

    /**
     * Reads the providers of a service and picks one at random.
     *
     * @param serviceName service name (fully qualified interface name)
     * @return the chosen provider address, or {@code null} when none is known
     * @throws RuntimeException if the watcher for the service path cannot be started
     */
    @Override
    public String discover(String serviceName) {
        String serviceFullPath = ZkConfig.ZK_REGISTER_PATH + "/" + serviceName;

        List<String> providers;
        try {
            providers = curatorFramework.getChildren().forPath(serviceFullPath);
            reposByPath.put(serviceFullPath, providers);
        } catch (Exception e) {
            e.printStackTrace();
            // Fall back to the last list known for THIS service, never another service's.
            providers = reposByPath.getOrDefault(serviceFullPath, new ArrayList<String>());
        }
        repos = providers;

        // Watch each path once; a new cache per call would pile up watchers.
        if (watchedPaths.add(serviceFullPath)) {
            registerWatch(serviceFullPath);
        }

        ILoadBalance loadBalance = new RandomLoadBalanceImpl();
        return loadBalance.select(providers);
    }

    /**
     * Starts a child watcher on the service path that refreshes the cached
     * provider list on every child event.
     *
     * @param serviceFullPath full ZooKeeper path of the service node
     * @throws RuntimeException if the watcher cannot be started
     */
    private void registerWatch(final String serviceFullPath) {
        PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, serviceFullPath, true);
        PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                List<String> latest = client.getChildren().forPath(serviceFullPath);
                reposByPath.put(serviceFullPath, latest);
                repos = latest;
                System.out.println("consumer:avaliableServerList:" + latest);
            }
        };
        pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
        try {
            pathChildrenCache.start();
        } catch (Exception e) {
            // Allow a later lookup to retry, and keep the cause for diagnosis.
            watchedPaths.remove(serviceFullPath);
            throw new RuntimeException("注册 PathChildrenCacheListener 异常" + e, e);
        }
    }
}
4.2负载均衡
4.2.1 负载均衡接口
package com.kikop.advancedBasedNetty.consumer.localloadbalance.service;
import java.util.List;
/**
* @author kikop
* @version 1.0
* @project Name: myAdvancedRpcBasedNetty
* @file Name: ILoadBalance
* @desc 功能描述
* @date 2020/8/30
* @time 10:16
* @by IDE: IntelliJ IDEA
*/
public interface ILoadBalance {
/**
* 获取一个可用的服务
* @param repos
* @return
*/
String select(List<String> repos);
}
4.2.2负载均衡实现类
package com.kikop.advancedBasedNetty.consumer.localloadbalance.service.impl;
import com.kikop.advancedBasedNetty.consumer.localloadbalance.service.ILoadBalance;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
/**
 * Random strategy: picks one provider at random from the candidates
 * (modelled on Ribbon's RandomRule).
 *
 * <p>There is no heartbeat to the providers, so a chosen provider is not
 * guaranteed to be alive (TODO).
 *
 * @author kikop
 * @version 1.0
 * @since 2020/8/30
 */
public class RandomLoadBalanceImpl implements ILoadBalance {

    /**
     * Picks one provider at random.
     *
     * @param allServerList candidate provider addresses; may be {@code null}
     * @return a non-null element of the list, or {@code null} when the list
     *         is {@code null} or empty or the calling thread is interrupted
     */
    @Override
    public String select(List<String> allServerList) {
        if (allServerList == null) {
            return null;
        }
        // isInterrupted() leaves the flag set, so the caller can still see the interruption.
        while (!Thread.currentThread().isInterrupted()) {
            int serverCount = allServerList.size();
            if (serverCount == 0) {
                // No providers at all; retrying cannot help.
                return null;
            }
            String server = allServerList.get(chooseRandomInt(serverCount));
            if (server != null && isActive(server)) {
                return server;
            }
            // Null or inactive entry: treated as transient, give up the CPU and retry.
            Thread.yield();
        }
        return null;
    }

    /**
     * Draws a random index.
     *
     * @param serverCount number of candidates; must be positive
     * @return a value in {@code [0, serverCount)}
     */
    protected int chooseRandomInt(int serverCount) {
        return ThreadLocalRandom.current().nextInt(serverCount);
    }

    /**
     * Tells whether the provider is alive. Always {@code true} for now
     * because no health check exists.
     *
     * @param currentServer provider address
     * @return {@code true}
     */
    private boolean isActive(String currentServer) {
        return true;
    }
}
4.3客户端服务动态代理
4.3.1RpcClientProxyFactory
package com.kikop.advancedBasedNetty.consumer.proxy;
import com.kikop.advancedBasedNetty.consumer.registry.service.IServiceDiscovery;
/**
 * Factory for client-side RPC proxies.
 *
 * @author kikop
 * @version 1.0
 * @since 2020/8/30
 */
public class RpcClientProxyFactory {

    /** Discovery handed to every proxy created by this factory. */
    private IServiceDiscovery serviceDiscovery;

    /**
     * @param serviceDiscovery discovery the proxies use to find a provider
     */
    public RpcClientProxyFactory(IServiceDiscovery serviceDiscovery) {
        this.serviceDiscovery = serviceDiscovery;
    }

    /**
     * Creates a remote proxy for the given service interface.
     *
     * @param interfaceClazz service interface to proxy
     * @param <T>            type of the service interface
     * @return a proxy implementing {@code interfaceClazz}
     */
    public <T> T create(final Class<T> interfaceClazz) {
        T remoteProxy = RpcInvocationHandler.wrap(interfaceClazz, serviceDiscovery);
        return remoteProxy;
    }
}
4.3.2RpcInvocationHandler
package com.kikop.advancedBasedNetty.consumer.proxy;
import com.kikop.advancedBasedNetty.consumer.registry.service.IServiceDiscovery;
import com.kikop.common.rpc.model.MyRpcReqInfo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
/**
 * Invocation handler behind the client-side RPC proxies: turns an interface
 * call into a {@link MyRpcReqInfo}, sends it to a discovered provider over
 * Netty and returns the answer.
 *
 * @param <T> type of the proxied service interface
 * @author kikop
 * @version 1.0
 * @since 2020/8/30
 */
public class RpcInvocationHandler<T> implements InvocationHandler {

    /** Proxied service interface. */
    private Class<T> interfaceClazz;

    /** Used to find a provider address for the service. */
    private IServiceDiscovery serviceDiscovery;

    private RpcInvocationHandler(Class<T> interfaceClazz, IServiceDiscovery serviceDiscovery) {
        this.interfaceClazz = interfaceClazz;
        this.serviceDiscovery = serviceDiscovery;
    }

    /**
     * Creates a remote proxy for the given service interface.
     *
     * @param interfaceClazz   service interface to proxy
     * @param serviceDiscovery discovery used to find a provider
     * @param <T>              type of the service interface
     * @return a proxy implementing {@code interfaceClazz}
     */
    public static <T> T wrap(Class<T> interfaceClazz, IServiceDiscovery serviceDiscovery) {
        // Safe: the proxy is created for exactly this interface.
        @SuppressWarnings("unchecked")
        T proxy = (T) Proxy.newProxyInstance(
                interfaceClazz.getClassLoader(),
                new Class<?>[]{interfaceClazz},
                new RpcInvocationHandler<T>(interfaceClazz, serviceDiscovery));
        return proxy;
    }

    /**
     * Performs the remote call for an interface method.
     * {@code equals}, {@code hashCode} and {@code toString} are answered
     * locally and never sent to a provider.
     *
     * @param proxy  the proxy instance
     * @param method the invoked method
     * @param args   the call arguments, {@code null} when there are none
     * @return the provider's answer, or {@code null} when no provider is
     *         available or the call failed
     * @throws Throwable declared by the overridden method
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return invokeObjectMethod(proxy, method, args);
        }

        // 1. Describe the call. Providers are keyed by the proxied interface name, so use that name
        //    even when the method is inherited from a super-interface.
        String serviceName = this.interfaceClazz.getName();
        MyRpcReqInfo myRpcReqInfo = new MyRpcReqInfo();
        myRpcReqInfo.setClazzName(serviceName);
        myRpcReqInfo.setMethodName(method.getName());
        myRpcReqInfo.setParamTypes(method.getParameterTypes());
        myRpcReqInfo.setArgs(args);

        // 2. Find a provider, e.g. "127.0.0.1:9090".
        String serviceAddress = this.serviceDiscovery.discover(serviceName);
        if (null == serviceAddress) {
            System.out.println("avaiable serviceAddress is null!");
            return null;
        }
        String[] splitArrays = serviceAddress.split(":");
        String host = splitArrays[0];
        int port = Integer.parseInt(splitArrays[1]);

        // 3. Perform the remote call.
        return doInvoke(host, port, myRpcReqInfo);
    }

    /** Answers the three {@code Object} methods a proxy forwards to its handler. */
    private Object invokeObjectMethod(Object proxy, Method method, Object[] args) {
        String name = method.getName();
        if ("equals".equals(name)) {
            return proxy == args[0];
        }
        if ("hashCode".equals(name)) {
            return System.identityHashCode(proxy);
        }
        return "RpcProxy(" + interfaceClazz.getName() + ")";
    }

    /**
     * Connects to the provider, sends the request and waits until the
     * connection is closed.
     *
     * @param host         provider host
     * @param port         provider port
     * @param myRpcReqInfo request to send
     * @return the answer captured by the client handler, or {@code null} on failure
     */
    private Object doInvoke(String host, int port, MyRpcReqInfo myRpcReqInfo) {
        Object result = null;
        final MyClientHandler rpcClientProxyHandler = new MyClientHandler();
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(
                            new ChannelInitializer<SocketChannel>() {
                                // Set up the pipeline of the freshly created channel.
                                @Override
                                protected void initChannel(SocketChannel channel) throws Exception {
                                    ChannelPipeline pipeline = channel.pipeline();
                                    pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                                    pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                                    pipeline.addLast("encoder", new ObjectEncoder());
                                    pipeline.addLast("decoder", new io.netty.handler.codec.serialization.ObjectDecoder(Integer.MAX_VALUE,
                                            ClassResolvers.cacheDisabled(null)));
                                    pipeline.addLast(rpcClientProxyHandler);
                                }
                            }
                    );
            // 3.1 Connect.
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            // 3.2 Send; close the channel if the write fails so the wait below cannot block forever.
            channelFuture.channel().writeAndFlush(myRpcReqInfo)
                    .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            // 3.3 Wait until the connection is closed.
            channelFuture.channel().closeFuture().sync();
            // 3.4 Fetch the answer.
            result = rpcClientProxyHandler.getResponse();
        } catch (InterruptedException ex) {
            // Restore the flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            ex.printStackTrace();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
        return result;
    }
}
4.3.3客户端业务处理类
package com.kikop.advancedBasedNetty.consumer.proxy;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Client-side inbound handler that captures the response message of one RPC
 * call and then closes the connection.
 *
 * <p>The caller ({@code RpcInvocationHandler.doInvoke}) blocks on the channel's
 * close future before reading {@link #getResponse()}, so this handler closes
 * the channel as soon as a response has arrived or an error has occurred;
 * otherwise the caller would wait until the server decides to close.
 *
 * @author kikop
 * @version 1.0
 * @project Name: myAdvancedRpcBasedNetty
 * @file Name: MyClientHandler
 * @date 2020/8/30
 * @time 12:15
 * @by IDE: IntelliJ IDEA
 */
public class MyClientHandler extends ChannelInboundHandlerAdapter {

    // Set in channelRead and read by the calling thread via getResponse();
    // volatile makes the hand-off between the two threads explicit.
    private volatile Object response;

    /**
     * Returns the result of the remote execution.
     *
     * @return the last message read from the channel, or {@code null} if none was received
     */
    public Object getResponse() {
        return response;
    }

    /**
     * Channel read callback: stores the decoded message as the response and
     * closes the channel so the waiting caller is released.
     *
     * @param ctx the handler context
     * @param msg the decoded inbound message
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        response = msg;
        ctx.close();
    }

    /**
     * Reports a pipeline error and closes the channel so the waiting caller is
     * released instead of blocking on a connection that will never answer.
     *
     * @param ctx   the handler context
     * @param cause the error raised in the pipeline
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
4测试
4.1启动服务端
package com.kikop.advancedBasedNetty.producer;
import com.kikop.advancedBasedNetty.producer.manager.ServerManager;
import com.kikop.advancedBasedNetty.producer.registry.IRegisterCenter;
import com.kikop.advancedBasedNetty.producer.registry.impl.RegisterCenterImpl;
import com.kikop.advancedBasedNetty.producer.service.IOrder;
import com.kikop.advancedBasedNetty.producer.service.impl.OrderImp;
import java.io.IOException;
/**
 * Manual driver for the provider side: registers service addresses and starts
 * an order service on a given address.
 *
 * @author kikop
 * @version 1.0
 * @project Name: myAdvancedRpcBasedNetty
 * @file Name: producerTest
 * @date 2020/8/29
 * @time 20:33
 * @by IDE: IntelliJ IDEA
 */
public class producerTest {

    /**
     * Registration-only scenario: registers two addresses under the
     * {@code IOrder} interface name, then blocks on standard input.
     *
     * @throws IOException if reading from standard input fails
     */
    public static void regisSvrTest() throws IOException {
        IRegisterCenter registry = new RegisterCenterImpl();
        // Only the registration is exercised here; no server is started on these addresses.
        // NOTE(review): the second address uses port 9191 while the publisher
        // helpers below use 9091 - confirm which one is intended.
        registry.register(IOrder.class.getName(), "127.0.0.1:9090");
        registry.register(IOrder.class.getName(), "127.0.0.1:9191");
        // Per the original author's note the registered nodes are ephemeral and
        // vanish when the process exits, so keep the process alive until a key is pressed.
        System.in.read();
    }

    /**
     * Registers and starts the order service on 127.0.0.1:9090.
     */
    public static void publisherSvrTest_9090() {
        startProvider(new OrderImp(), "127.0.0.1:9090");
    }

    /**
     * Registers and starts the order service on 127.0.0.1:9091.
     */
    public static void publisherSvrTest_9091() {
        startProvider(new OrderImp(), "127.0.0.1:9091");
    }

    /**
     * Registers the given order service with a fresh server manager and starts it.
     * (In a real deployment each address would be a different physical host.)
     *
     * @param order          the order service instance to expose
     * @param serviceAddress the address handed to the server manager
     */
    private static void startProvider(IOrder order, String serviceAddress) {
        IRegisterCenter registry = new RegisterCenterImpl();
        ServerManager manager = new ServerManager(serviceAddress, registry);
        // Register the service object first, then start serving.
        manager.registerSvr(order);
        manager.startSvr();
    }

    /**
     * Starts the provider on port 9091. Call {@link #regisSvrTest()} or
     * {@link #publisherSvrTest_9090()} here instead to run the other scenarios.
     */
    public static void main(String[] args) throws IOException {
        publisherSvrTest_9091();
    }
}
4.2模拟客户端发送请求
package com.kikop.advancedBasedNetty.consumer;
import com.kikop.advancedBasedNetty.consumer.proxy.RpcClientProxyFactory;
import com.kikop.advancedBasedNetty.consumer.registry.service.IServiceDiscovery;
import com.kikop.advancedBasedNetty.consumer.registry.service.impl.ServiceDiscoveryImpl;
import com.kikop.advancedBasedNetty.producer.service.IOrder;
import java.io.IOException;
import java.util.stream.IntStream;
/**
 * Manual driver for the consumer side: looks up a service address and issues
 * remote calls through a generated proxy.
 *
 * @author kikop
 * @version 1.0
 * @project Name: myAdvancedRpcBasedNetty
 * @file Name: consumerTest1
 * @date 2020/8/29
 * @time 20:33
 * @by IDE: IntelliJ IDEA
 */
public class consumerTest1 {

    /**
     * Discovery scenario: prints the address discovered for the given service class.
     *
     * @param clazz the service interface whose name is used as the lookup key
     */
    public static void findAvailableSeriveTest(Class<?> clazz) {
        IServiceDiscovery discovery = new ServiceDiscoveryImpl();
        System.out.println(discovery.discover(clazz.getName()));
    }

    /**
     * Business scenario: builds a proxy for {@code IOrder} and prints the
     * result of one {@code buy(2)} call.
     */
    public static void businessMethodTest() {
        IServiceDiscovery discovery = new ServiceDiscoveryImpl();
        RpcClientProxyFactory proxyFactory = new RpcClientProxyFactory(discovery);
        // Create the dynamic proxy for the service interface, then call through it.
        IOrder orderProxy = proxyFactory.create(IOrder.class);
        System.out.println(orderProxy.buy(2));
    }

    /**
     * Runs the business scenario ten times on a parallel stream. Call
     * {@code findAvailableSeriveTest(IOrder.class)} here instead to test discovery only.
     */
    public static void main(String[] args) throws IOException {
        IntStream.range(0, 10).parallel().forEach(ignored -> businessMethodTest());
    }
}












网友评论