为什么需要RPC
调用远程服务能像调用本地服务一样简单
设计思路

整体过程
1 建立连接:运用TCP/IP,通过Socket和ServerSocket来解决;NIO非阻塞
2 信息传递:序列化和反序列化
3 服务调用:反射调用
4 底层代码通用处理:代理层
迭代过程
1 基本连通
2 注解和spring
3 多版本
代码实现
服务端
public class RpcRequest implements Serializable{
private static final long serialVersionUID = -5240819512410010846L;
private String className;
private String methodName;
private Class[] paramTypes;
private Object[] params;
private String version;
}
public class ServerSocketProxy implements ApplicationContextAware,InitializingBean {
private ExecutorService executorService = Executors.newCachedThreadPool();
private Map<String, Object> serviceMap = new HashMap<>();
private int port;
public ServerSocketProxy(int port) {
this.port = port;
}
@Override
public void afterPropertiesSet() throws Exception {
try {
ServerSocket serverSocket = new ServerSocket(port);
while(true){
Socket socket = serverSocket.accept();
executorService.execute(new ThreadHandler(serverSocket, socket, serviceMap));
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, Object> beansWithAnnotation = applicationContext.getBeansWithAnnotation(RpcAnno.class);
for (Object obj : beansWithAnnotation.values()) {
RpcAnno rpcAnno = obj.getClass().getAnnotation(RpcAnno.class);
String serviceName = rpcAnno.value().getName();
String version = rpcAnno.version();
if(!StringUtils.isEmpty(version)){
serviceName += "-" + version;
}
serviceMap.put(serviceName, obj);
}
}
}
public class ThreadHandler implements Runnable {
private ServerSocket serverSocket;
private Socket socket;
private Map<String, Object> serviceMap;
public ThreadHandler(ServerSocket serverSocket, Socket socket, Map<String, Object> serviceMap) {
this.serverSocket = serverSocket;
this.socket = socket;
this.serviceMap = serviceMap;
}
@Override
public void run() {
ObjectInputStream ois = null;
ObjectOutputStream oos = null;
try {
ois = new ObjectInputStream(socket.getInputStream());
oos = new ObjectOutputStream(socket.getOutputStream());
RpcRequest rpcRequest = (RpcRequest) ois.readObject();
Object result = invoke(rpcRequest);
oos.writeObject(result);
oos.flush();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}finally {
if(ois != null){
try {
ois.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(oos != null){
try {
oos.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* 调用方法
* @param rpcRequest
* @return
*/
private Object invoke(RpcRequest rpcRequest){
try {
String className = rpcRequest.getClassName();
String version = rpcRequest.getVersion();
if(!StringUtils.isEmpty(version)){
className += "-" + version;
}
Object obj = serviceMap.get(className);
if(obj == null){
throw new RuntimeException("没有找到对应的服务:" + rpcRequest.getClassName());
}
Class<?> serviceClazz = Class.forName(rpcRequest.getClassName());
Method method = serviceClazz.getDeclaredMethod(rpcRequest.getMethodName(),
rpcRequest.getParamTypes());
return method.invoke(obj, rpcRequest.getParams());
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcAnno {
/**
* 类
*/
Class value();
/**
* 版本
*/
String version() default "";
}
@Configuration
@ComponentScan("com.edward")
public class RpcSpringConfig {
@Bean(name = "serverSocketProxy")
public ServerSocketProxy serverSocketProxy() {
ServerSocketProxy serverSocketProxy = new ServerSocketProxy(8080);
return serverSocketProxy;
}
}
public class TestMain {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(RpcSpringConfig.class);
context.start();
}
}
客户端
public class RpcClientProxy implements InvocationHandler{
private RpcClientSocket clientSocket;
public RpcClientProxy(String ip, int port) {
this.clientSocket = new RpcClientSocket(ip, port);
}
public <T> T getInstance(Class<T> clazz){
//com.sun.proxy.$Proxy0 cannot be cast to clazz.getInterfaces()
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz}, this);
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setClassName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParamTypes(method.getParameterTypes());
rpcRequest.setParams(args);
rpcRequest.setVersion("v1.0");
return clientSocket.send(rpcRequest);
}
}
public class RpcClientSocket {
private String ip;
private int port;
public RpcClientSocket(String ip, int port) {
this.ip = ip;
this.port = port;
}
/**
* 传输内容
* @param context
* @return
*/
public Object send(Object context){
ObjectOutputStream oos = null;
ObjectInputStream ois = null;
try {
Socket socket = new Socket(ip, port);
oos = new ObjectOutputStream(socket.getOutputStream());
ois = new ObjectInputStream(socket.getInputStream());
oos.writeObject(context);
oos.flush();
return ois.readObject();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
@Component
public class SpringConfig {
@Bean(name = "rpcClientProxy")
public RpcClientProxy rpcClientProxy() {
RpcClientProxy rpcClientProxy = new RpcClientProxy("localhost",8080);
return rpcClientProxy;
}
}
public class TestMain {
public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(SpringConfig.class);
context.start();
RpcClientProxy rpcClientProxy = context.getBean("rpcClientProxy", RpcClientProxy.class);
IHelloService helloService = rpcClientProxy.getInstance(IHelloService.class);
System.out.println(helloService.save());
System.out.println(helloService.sayHello("edward"));
}
}
网友评论