美文网首页工作生活
基于BIO的RPC实现(1.0)

基于BIO的RPC实现(1.0)

作者: 吗丁啉要餐前吃 | 来源:发表于2019-07-04 00:34 被阅读0次

先实现服务端代码

1.rpc服务端框架实现

package com.zhang.frame;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Minimal blocking-I/O RPC server.
 *
 * <p>Each accepted connection carries exactly one request, written with Java
 * serialization in this order: service name (UTF), method name (UTF),
 * parameter types ({@code Class<?>[]}), arguments ({@code Object[]}, may be
 * {@code null} for a no-arg method). The server looks the service class up in
 * its registry, instantiates it through its no-arg constructor, invokes the
 * method reflectively and writes the return value back as a single object.
 *
 * <p>NOTE(review): requests are read with {@link ObjectInputStream}, i.e. Java
 * native deserialization of whatever the peer sends. Only expose this server
 * to trusted clients.
 */
public class RpcServerFrame {
    // Thread factory for the worker pool; produces "thread1", "thread2", ...
    ThreadFactory threadFactory = new ThreadFactory() {
        private final AtomicInteger atomicInteger = new AtomicInteger(1);

        @Override
        public Thread newThread(Runnable r) {
            // getAndIncrement (not get) so that every worker gets a distinct name.
            return new Thread(r, "thread" + atomicInteger.getAndIncrement());
        }
    };

    // Worker pool that handles client sockets: 10 core / 15 max threads,
    // a bounded queue of 1024 pending tasks, and rejection by exception beyond that.
    private ExecutorService executorService = new ThreadPoolExecutor(10, 15, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingDeque<>(1024), threadFactory, new ThreadPoolExecutor.AbortPolicy()
    );


    // Service registry: service name -> implementation class. Shared by all instances.
    private static final Map<String, Class<?>> serverHolder = new HashMap<>();

    private int port;

    /**
     * @param port TCP port that {@link #startService()} binds to
     */
    public RpcServerFrame(int port) {
        this.port = port;
    }

    /**
     * Registers a service implementation.
     *
     * @param className name the client sends to address the service
     *                  (the sample client sends the interface's class name)
     * @param impl      implementation class; must have an accessible no-arg constructor
     */
    public void registServer(String className, Class<?> impl) {
        serverHolder.put(className, impl);
    }

    /** Handles one request on one client socket. */
    private static class ServerTask implements Runnable {
        private final Socket socket;

        public ServerTask(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            // The socket is listed first so it is closed even if a stream cannot be created.
            // The output stream is created before the input stream, as in the client.
            try (Socket client = socket;
                 ObjectOutputStream outputStream = new ObjectOutputStream(client.getOutputStream());
                 ObjectInputStream inputStream = new ObjectInputStream(client.getInputStream())
            ) {
                // Name under which the service was registered.
                String serviceName = inputStream.readUTF();
                // Method to call.
                String methodName = inputStream.readUTF();
                // Parameter types, used to resolve the overload.
                Class<?>[] classes = (Class<?>[]) inputStream.readObject();
                // Argument values. The client writes an Object[], so casting to
                // String[] would fail for every call that carries arguments.
                Object[] args = (Object[]) inputStream.readObject();
                // Look the implementation class up in the registry.
                Class<?> serviceClass = serverHolder.get(serviceName);
                if (serviceClass == null) {
                    throw new ClassNotFoundException(serviceName + " not found");
                }
                Method method = serviceClass.getMethod(methodName, classes);
                // A fresh instance per request; Class.newInstance() is deprecated.
                Object target = serviceClass.getDeclaredConstructor().newInstance();
                Object obj = method.invoke(target, args);

                outputStream.writeObject(obj);
                outputStream.flush();
            } catch (Exception e) {
                // No error is sent back; the client sees the connection close.
                e.printStackTrace();
            }
        }
    }

    /**
     * Binds the server socket and serves requests until accepting fails.
     * This method blocks the calling thread.
     *
     * @throws IOException if the socket cannot be bound or accepting fails
     */
    public void startService() throws IOException {
        // bind() is inside the try so the socket is released if binding fails.
        try (ServerSocket serverSocket = new ServerSocket()) {
            serverSocket.bind(new InetSocketAddress(port));
            System.out.println("RPC server on:" + port + " run");
            while (true) {
                Socket client = serverSocket.accept();
                try {
                    executorService.execute(new ServerTask(client));
                } catch (RejectedExecutionException e) {
                    // Pool and queue are full: drop this connection instead of
                    // leaking the socket and terminating the accept loop.
                    try {
                        client.close();
                    } catch (IOException ignored) {
                        // Nothing more can be done for a connection being dropped.
                    }
                }
            }
        }
    }
}

2.创建服务端service接口与实体,供客户端调用

/**
 * Service contract that the server registers and the client calls remotely.
 */
public interface UserService {
    /**
     * @return the user produced by the implementation
     */
    User getUser();
}
/**
 * Server-side implementation of {@link UserService} that returns a fixed user.
 */
public class UserServiceImpl implements UserService {

    /**
     * @return a newly created user with a hard-coded name and age
     */
    @Override
    public User getUser() {
        return new User("张三", 18);
    }
}
/**
 * Serializable value object holding a user name and an age.
 * Both fields are set once in the constructor and never change.
 */
public class User implements Serializable {

    private final String username;
    private final Integer age;

    /**
     * @param username the user's name
     * @param age      the user's age
     */
    public User(String username, Integer age) {
        this.username = username;
        this.age = age;
    }

    /** @return the user's name */
    public String getUsername() {
        return this.username;
    }

    /** @return the user's age */
    public Integer getAge() {
        return this.age;
    }
}

3.服务端启动

/**
 * Entry point that starts the RPC server for {@link UserService} on port 8001.
 */
public class UserRpcServer {

    /**
     * Starts the server on a separate thread; {@code startService()} blocks
     * that thread while it accepts connections.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Runnable bootstrap = new Runnable() {
            @Override
            public void run() {
                RpcServerFrame frame = new RpcServerFrame(8001);
                // Register the implementation under the interface's class name,
                // which is the name the client proxy sends.
                frame.registServer(UserService.class.getName(), UserServiceImpl.class);
                try {
                    frame.startService();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        };
        Thread serverThread = new Thread(bootstrap);
        serverThread.start();
    }
}

客户端代码实现

1.客户端rpc框架实现

package com.zhang.frame;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * Client side of the blocking-I/O RPC demo: builds dynamic proxies whose
 * method calls are forwarded to a remote server over a fresh socket per call.
 */
public class RpcClientFrame {

    /**
     * Creates a proxy that implements {@code serviceInterface} and forwards
     * every method call to the server at {@code host:port}.
     *
     * @param serviceInterface interface to proxy; its class name is sent as the service name
     * @param host             server host
     * @param port             server port
     * @param <T>              type the caller assigns the proxy to; expected to be
     *                         {@code serviceInterface} (the cast is unchecked)
     * @return the proxy instance
     */
    @SuppressWarnings("unchecked")
    public static <T> T getRemoteProxyObj(Class<?> serviceInterface, String host, int port) {
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface},
                new DynProxy(serviceInterface, new InetSocketAddress(host, port)));
    }

    /** Invocation handler that performs one request/response exchange per call. */
    private static class DynProxy implements InvocationHandler {

        private final Class<?> serviceInterface;
        private final InetSocketAddress socketAddress;

        public DynProxy(Class<?> serviceInterface, InetSocketAddress socketAddress) {
            this.serviceInterface = serviceInterface;
            this.socketAddress = socketAddress;
        }

        /**
         * Sends interface name, method name, parameter types and arguments to
         * the server, then returns the single object the server writes back.
         *
         * @throws Throwable any connection, serialization or deserialization failure
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // Nested try-with-resources closes in reverse order (input stream,
            // output stream, socket). Closing the socket first would make the
            // output stream's close() flush into a closed socket and replace
            // the original failure with a SocketException.
            try (Socket socket = new Socket()) {
                socket.connect(socketAddress);
                try (ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream())) {
                    // Interface name, used by the server as the registry key.
                    outputStream.writeUTF(serviceInterface.getName());
                    // Method name.
                    outputStream.writeUTF(method.getName());
                    // Parameter types.
                    outputStream.writeObject(method.getParameterTypes());
                    // Argument values (null for a no-arg method).
                    outputStream.writeObject(args);
                    outputStream.flush();
                    // The input stream is opened only after the request has been flushed.
                    try (ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream())) {
                        return inputStream.readObject();
                    }
                }
            }
        }
    }
}

2.客户端加入需要调用的服务端接口(反射)和实体(反序列化)

/**
 * Client-side copy of the service contract; proxied to reach the remote implementation.
 */
public interface UserService {

    /**
     * @return the user returned by the service
     */
    User getUser();
}
/**
 * Client-side copy of the serializable user value object (name and age).
 * Both fields are assigned in the constructor and are read-only afterwards.
 */
public class User implements Serializable {

    private final String username;
    private final Integer age;

    /**
     * @param username the user's name
     * @param age      the user's age
     */
    public User(String username, Integer age) {
        this.username = username;
        this.age = age;
    }

    /** @return the user's name */
    public String getUsername() {
        return this.username;
    }

    /** @return the user's age */
    public Integer getAge() {
        return this.age;
    }
}

3.客户端调用代码

package com.zhang;

import com.zhang.frame.RpcClientFrame;
import com.zhang.service.UserService;

/**
 * Demo client: obtains a remote proxy for {@link UserService} and prints the user's name and age.
 */
public class UserRpcClient {

    /**
     * @param args unused
     */
    public static void main(String[] args) {
        UserService userService = RpcClientFrame.getRemoteProxyObj(UserService.class, "127.0.0.1", 8001);
        // Each getUser() call goes through the proxy separately, so two requests are made.
        String username = userService.getUser().getUsername();
        Integer age = userService.getUser().getAge();
        System.out.println(username + "------" + age);
    }
}

大功告成,下一步要将注册中心提出来!

相关文章

网友评论

    本文标题:基于BIO的RPC实现(1.0)

    本文链接:https://www.haomeiwen.com/subject/dmtkhctx.html