美文网首页
2021-03-14_基于netty请求的远程调用学习

2021-03-14_基于netty请求的远程调用学习

作者: kikop | 来源:发表于2021-03-14 20:46 被阅读0次

基于netty请求的远程调用学习

1概述

本文模拟远程过程调用进行实战总结,总的内容如下;

  1. 抽查接口公共部分

  2. zookeerper相关知识点热身、常用api操作。

  3. 针对服务端,实例化服务对象,指定服务的ip和端口。注册到zk端,并启动本地监听服务。(这里服务端我会测试开启多个,相同ip但不同端口)

    如:127.0.0.19090;127.0.0.1:9091

  4. 针对客户端:

    首先会去监听zk指定path下的服务列表。

    本地实现一个随机的服务选择算法,如需扩展,只需实现IServiceDiscovery接口即可。

    封装了RpcClientProxyFactory,客户端发起请求时,提供服务接口类,如 IOrder.class,创建远程代理对象。当发起调用时,则发起真正的invoke,发起netty通讯连接,并根据原始method构造MyRpcReqInfo请求结构体。

  5. 服务端手动MyRpcReqInfo请求后,解析从本地服务中找出对应的实例化服务对象并进行反射调用。

  6. 服务端对请求进行发序列化,并进行解析处理,最后结果封装成MyRpcRespInfo并返回。

1.1zk热身

zk基于windows操作系统,版本为:zookeeper-3.4.10

zkServer.cmd:启动zk服务端

zkCli.cmd:连接zk服务端

zkcli.cmd -server:localhost:2181:连接指定的zk服务器

建立节点 create /zk hello

获得节点 get /zk

设置节点 set /zk hello2

建立子节点 set /zk/subzk hello3

输出节点目录 ls /zk

删除节点 delete /zk等

2公共部分

2.1注解定义

2.1.1RpcClazzAnno

package com.kikop.common.rpc.annotations;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * @author kikop
 * @version 1.0
 * @project Name: MySimpledRpcBasedTomcat
 * @file Name: RpcClazzAnno
 * @desc 功能描述 服务类注解
 * @date 2020/8/23
 * @time 17:21
 * @by IDE: IntelliJ IDEA
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcClazzAnno {
}

2.1.2RpcMethodAnno

package com.kikop.common.rpc.annotations;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * @author kikop
 * @version 1.0
 * @project Name: MySimpledRpcBasedTomcat
 * @file Name: RpcMethodAnno
 * @desc 功能描述 服务类中方法注解
 * @date 2020/8/23
 * @time 17:21
 * @by IDE: IntelliJ IDEA
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcMethodAnno {
}

2.2通讯结构体

2.2.1请求

package com.kikop.common.rpc.model;

import java.io.Serializable;

/**
 * @author kikop
 * @version 1.0
 * @project Name: MySimpledRpcBasedTomcat
 * @file Name: MyRpcReqInfo
 * @desc 功能描述 RPC通信请求数据结构
 * @date 2020/8/23
 * @time 16:52
 * @by IDE: IntelliJ IDEA
 */
public class MyRpcReqInfo implements Serializable {

    private static final long serialVersionUID = 5774354750836094390L;

    /**
     * 包名:com.kikop.myrpcdiy.advancedBasedNetty.server.provider
     *
     */
    private String packetName;

    /**
     * 仅仅类名
     * simpled:OrderImpl、IOrder and so on
     * advanced:com.kikop.myrpcdiy.advancedBasedNetty.server.provider.IOrder
     */
    private String clazzName;

    /**
     * 方法名
     */
    private String methodName;

    /**
     * 方法参数值列表
     */
    private Object[] args;

    /**
     * 方法参数类型集合
     */
    private Class<?>[] paramTypes;


    public Class<?>[] getParamTypes() {
        return paramTypes;
    }

    public void setParamTypes(Class<?>[] paramTypes) {
        this.paramTypes = paramTypes;
    }

    private String additionInfo;

    public String getAdditionInfo() {
        return additionInfo;
    }

    public void setAdditionInfo(String additionInfo) {
        this.additionInfo = additionInfo;
    }

    public Object[] getArgs() {
        return args;
    }

    public void setArgs(Object[] args) {
        this.args = args;
    }

    public String getPacketName() {
        return packetName;
    }

    public void setPacketName(String packetName) {
        this.packetName = packetName;
    }

    public String getClazzName() {
        return clazzName;
    }

    public void setClazzName(String clazzName) {
        this.clazzName = clazzName;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

}

2.2.2响应

package com.kikop.common.rpc.model;

import java.io.Serializable;

/**
 * @author kikop
 * @version 1.0
 * @project Name: MySimpledRpcBasedTomcat
 * @file Name: MyRpcReqInfo
 * @desc 功能描述 RPC通信响应结构
 * @date 2020/8/23
 * @time 16:52
 * @by IDE: IntelliJ IDEA
 */
public class MyRpcRespInfo implements Serializable {


    private static final long serialVersionUID = 6027658366486943598L;


    /**
     * 请求方法名
     */
    private String methodName;

    /**
     * 请求结果
     */
    private Object result;

    private String additionInfo;

    public String getAdditionInfo() {
        return additionInfo;
    }

    public void setAdditionInfo(String additionInfo) {
        this.additionInfo = additionInfo;
    }


    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }
}

2.3注解工具类

package com.kikop.common.rpc;


import com.alibaba.fastjson.JSONObject;
import com.kikop.common.rpc.annotations.RpcClazzAnno;
import com.kikop.common.rpc.annotations.RpcMethodAnno;

import java.io.File;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @author kikop
 * @version 1.0
 * @project Name: MySimpledRpcBasedTomcat
 * @file Name: MyAnnotationUtil
 * @desc 功能描述 注解读取工具类
 * @date 2020/8/23
 * @time 17:24
 * @by IDE: IntelliJ IDEA
 */
public class MyAnnotationUtil {


    /**
     * 获取包中指定类注解的指定方法
     *
     * @param basePackage 包名 com.kikop.simpledBasedTomcat.service
     * @return {"com.kikop.simpledBasedTomcat.service.OrderImpl":
     * [{"getName":[]},
     * {"setName":["String"]},
     * {"getOrderCount":["String","Integer"]}
     * ]
     * }
     * @throws ClassNotFoundException
     */
    public static Map<String, List<Map<String, List<String>>>> parse(String basePackage) throws ClassNotFoundException {

        Map<String, List<Map<String, List<String>>>> resultClazzMap = new HashMap<>();

        // 1.获取当前注解所在的资源classPath类路径:
        // /E:/workdirectory/Dev/study/MySimpledRpcBasedTomcat/target/classes/
        String rootPath = MyAnnotationUtil.class.getResource("/").getPath();

        // 1.1.转换包路径到文件结构:
        // :com/kikop/simpledBasedTomcat/service
        String basePacketPath = basePackage.replace(".", "/");

        // 1.2.构造File对象
        // /E:/workdirectory/Dev/study/MySimpledRpcBasedTomcat/target/classes/com/kikop/simpledBasedTomcat/service
        String packagePathName = rootPath + basePacketPath;
        File file = new File(packagePathName);

        // 2.遍历包下的所有文件
        // 2.1.遍历得到文件名列表
        // [0]:OrderImpl.class
        String[] names = file.list();
        for (String fileName : names) {

            // OrderImpl.class--> OrderImpl
            String className = fileName.replaceAll(".class", "");

            // com.kikop.simpledBasedTomcat.service.OrderImpl
            String fullClazzName = basePackage + "." + className;
            Class<?> aClass = Class.forName(fullClazzName);

            if (aClass.isAnnotationPresent(RpcClazzAnno.class)) { // 指定类上的注解
                Method[] declaredMethods = aClass.getDeclaredMethods();

                List<Map<String, List<String>>> resulMethodsWithinClazzMap = new ArrayList<>();

                // 2.1.遍历类上定义的需要对外提供服务的方法,并缓存到MethodMap中
                for (Method declaredMethod : declaredMethods) {

                    if (declaredMethod.isAnnotationPresent(RpcMethodAnno.class)) { // 指定方法上的注解

                        //1.1.Map:key 包名+类名,v:方法列表
                        // key:方法名
                        // value:方法参数类型
                        Map<String, List<String>> methodMap = new HashMap<>();

                        List<String> params = new ArrayList<>(); // 方法的参数列表
                        Class<?>[] parameterTypes = declaredMethod.getParameterTypes();
                        for (Class<?> parameterType : parameterTypes) {
                            String simpleName = parameterType.getSimpleName();
                            params.add(simpleName);
                        }
                        methodMap.put(declaredMethod.getName(), params);
                        resulMethodsWithinClazzMap.add(methodMap);
                    }
                }
                // key:com.kikop.simpledBasedTomcat.service.OrderImpl
                // value:resulMethodsWithinClazzMap
                resultClazzMap.put(fullClazzName, resulMethodsWithinClazzMap);
            }
        }
//        System.out.println(JSONObject.toJSON(resultClazzMap));
        return resultClazzMap;
    }

    public static void main(String[] args) throws ClassNotFoundException {

        MyAnnotationUtil.parse(ServerConfig.BASEPACKAGE);
    }
}

2.3.1打印服务端服务列表

image-20210314204253483.png

2.4原生socket接口封装

package com.kikop.common.rpc;

import com.kikop.common.rpc.model.MyRpcReqInfo;
import com.kikop.common.rpc.model.MyRpcRespInfo;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.util.Arrays;

/**
 * @author kikop
 * @version 1.0
 * @project Name: MySimpledRpcBasedTomcat
 * @file Name: MySocketUtils
 * @desc 功能描述 socket数据序列化发送
 * @date 2021/3/13
 * @time 17:24
 * @by IDE: IntelliJ IDEA
 */
public class MySocketUtils {

    /**
     * 获取请求元数据
     *
     * @param currentSocket
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     */
    public static MyRpcReqInfo getReqMetaData(Socket currentSocket) throws IOException, ClassNotFoundException {

        // 1.获取远程请求参数并反序列化
        ObjectInputStream objectInputStream = new ObjectInputStream(currentSocket.getInputStream());
        MyRpcReqInfo myRpcReqInfo = (MyRpcReqInfo) objectInputStream.readObject();
        return myRpcReqInfo;
    }

    /**
     * 发送请求元数据
     *
     * @param currentSocket
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     */
    public static void sendReqMetaData(Socket currentSocket, MyRpcReqInfo myRpcReqInfo) throws IOException, ClassNotFoundException {

        // 1.获取通道写入流
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(currentSocket.getOutputStream());
        // 2.发起请求
        objectOutputStream.writeObject(myRpcReqInfo);
        // 提交
        objectOutputStream.flush();
        // 关闭输出流
        currentSocket.shutdownOutput();

        // 3.获取输入流,并读取服务器端的响应信息
        ObjectInputStream objectInputStream = new ObjectInputStream(currentSocket.getInputStream());
        Object respResult = objectInputStream.readObject();
        if (respResult instanceof MyRpcRespInfo) {
            MyRpcRespInfo myRpcRespInfo = (MyRpcRespInfo) respResult;

            System.out.println(String.format("taskInfo:[%s],reqClazzInfo:[%s.%s:%s]-->reqResult:%s",
                    myRpcReqInfo.getAdditionInfo(),
                    myRpcReqInfo.getClazzName(), myRpcReqInfo.getMethodName(), Arrays.toString(myRpcReqInfo.getArgs()),
                    String.valueOf(myRpcRespInfo.getResult())));
        }
        //4.关闭资源
        objectOutputStream.close();
        objectInputStream.close();
        currentSocket.close();
    }

    /**
     * 发送请求结果
     *
     * @param currentSocket
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     */
    public static void sendRespResult(Socket currentSocket, MyRpcRespInfo myRpcRespInfo) throws IOException, ClassNotFoundException {

        // 1.获取输出流,向服务器端发送信息
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(currentSocket.getOutputStream());

        // 2.发起请求
        objectOutputStream.writeObject(myRpcRespInfo);

        // 3.提交并关闭流
        objectOutputStream.flush();
        objectOutputStream.close();
    }


}

2.5服务参数配置

2.5.1ServerConfig

package com.kikop.common.rpc;

/**
 * @author kikop
 * @version 1.0
 * @project Name: MySimpledRpcBasedTomcat
 * @file Name: ServerConfig
 * @desc 功能描述 服务配置类,定义对外抛出的服务
 * @date 2020/8/23
 * @time 17:21
 * @by IDE: IntelliJ IDEA
 */
public class ServerConfig {

    public final static String HOST = "127.0.0.1";
    public final static int PORT = 6666;

    public static final String BASEPACKAGE = "com.kikop.simpledBasedTomcat.service";



}

2.5.2ZkConfig

package com.kikop.common.rpc;

/**
 * @author kikop
 * @version 1.0
 * @project Name: MySimpledRpcBasedTomcat
 * @file Name: ZkConfig
 * @desc 功能描述 zookeeper配置信息
 * @date 2020/8/29
 * @time 20:09
 * @by IDE: IntelliJ IDEA
 */
public class ZkConfig {

    // zk服务中心
    public final static String CONNECTOR_STR = "127.0.0.1:2181";

    // 服务根路径(一对多)
    public final static String ZK_REGISTER_PATH = "/registryserver";
}

3服务端

3.1maven依赖

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.kikop</groupId>
    <artifactId>myAdvancedRpcBasedNetty</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>myAdvancedRpcBasedNetty</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <properties>

        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>

        <netty.verson>4.1.6.Final</netty.verson>

        <!--zookeeper-->
        <zookeeper.version>3.4.10</zookeeper.version>
        <curator.version>2.5.0</curator.version>


        <fastjson.version>1.2.29</fastjson.version>

    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <!--begin with netty rpc-->

        <!--1.对zookeeper的底层api的一些封装-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>${curator.version}</version>
        </dependency>

        <!--1.1.封装了一些高级特性-->
        <!--如:Cache事件监听、选举、分布式锁、分布式计数器、分布式 Barrier等-->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>${curator.version}</version>
        </dependency>

        <!--2.netty-->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>${netty.verson}</version>
        </dependency>

        <!--end with netty rpc-->


        <!--3.fastjson-->
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>


    </dependencies>

    <build>
        <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
            <plugins>
                <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-install-plugin</artifactId>
                    <version>2.5.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-deploy-plugin</artifactId>
                    <version>2.8.2</version>
                </plugin>
                <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
                <plugin>
                    <artifactId>maven-site-plugin</artifactId>
                    <version>3.7.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-project-info-reports-plugin</artifactId>
                    <version>3.0.0</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

3.2对外接口服务

3.2.1服务接口

package com.kikop.advancedBasedNetty.producer.service;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myAdvancedRpcBasedNetty
 * @file Name: IOrder
 * @desc 功能描述
 * @date 2020/8/29
 * @time 19:39
 * @by IDE: IntelliJ IDEA
 */
public interface IOrder {
    String buy(int type);
}

3.2.2服务实现类

package com.kikop.advancedBasedNetty.producer.service.impl;


import com.kikop.advancedBasedNetty.producer.service.IOrder;
import com.kikop.common.rpc.annotations.RpcIFValueAnno;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myAdvancedRpcBasedNetty
 * @file Name: OrderImp
 * @desc 功能描述
 * @date 2020/8/29
 * @time 19:39
 * @by IDE: IntelliJ IDEA
 */

// 实现服务类对应的接口
@RpcIFValueAnno(value = IOrder.class)
public class OrderImp implements IOrder {

    @Override
    public String buy(int type) {
        if (1 == type) {
            return "手机";
        }
        return "电脑";
    }
}

注意,对外接口服务统一放在:com.kikop.simpledBasedTomcat.service这个包路径下。

package com.kikop.simpledBasedTomcat.service;

import com.kikop.common.rpc.annotations.RpcClazzAnno;
import com.kikop.common.rpc.annotations.RpcMethodAnno;


/**
 * @author kikop
 * @version 1.0
 * @project Name: MySimpledRpcBasedTomcat
 * @file Name: OrderImpl
 * @desc 功能描述 模拟对外提供的订单服务
 * @date 2020/8/23
 * @time 17:21
 * @by IDE: IntelliJ IDEA
 */
@RpcClazzAnno
public class OrderImpl {

    private String orderName;

    @RpcMethodAnno
    public String getOrderName() {
        return orderName;
    }

    @RpcMethodAnno
    public void setOrderName(String orderName) {
        this.orderName = orderName;
    }

    @RpcMethodAnno
    public Integer getOrderCount(String orderName) {

        if (orderName.equalsIgnoreCase("apple")) {
            return 100;
        }
        return 10;
    }
}

3.3服务注册中心

3.3.1IRegisterCenter

package com.kikop.advancedBasedNetty.producer.registry;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myAdvancedRpcBasedNetty
 * @file Name: 基于zk的注册中心
 * @desc 功能描述
 * @date 2020/8/29
 * @time 20:23
 * @by IDE: IntelliJ IDEA
 */
public interface IRegisterCenter {
    /**
     * 服务注册到zk
     * @param serviceName 服务名(接口类全名):com.kikop.myrpcdiy.advancedBasedNetty.server.provider.IOrder
     * @param servieAdress 服务地址:127.0.0.1:9090
     */
    void register(String serviceName, String servieAdress);
}

3.3.2RegisterCenterImpl

package com.kikop.advancedBasedNetty.producer.registry.impl;

import com.kikop.advancedBasedNetty.consumer.registry.ZkConfig;
import com.kikop.advancedBasedNetty.producer.registry.IRegisterCenter;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myAdvancedRpcBasedNetty
 * @file Name: 基于zk的注册中心
 * @desc 功能描述
 * @date 2020/8/29
 * @time 20:23
 * @by IDE: IntelliJ IDEA
 */
public class RegisterCenterImpl implements IRegisterCenter {

    /**
     * zk客户端调用API接口
     */
    private CuratorFramework curatorFramework;

    public RegisterCenterImpl() {
        // 内部启动两个线程,一个用于监听、一个用于后台操作轮询
        curatorFramework = CuratorFrameworkFactory.builder()
                .connectString(ZkConfig.CONNECTOR_STR)
                .sessionTimeoutMs(4000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 10))
                .build();
        curatorFramework.start();
    }

    /**
     * 服务注册到zk
     * @param serviceName 服务名
     * @param servieAdress 服务地址
     */
    @Override
    public void register(String serviceName, String servieAdress) {

        String servicePath = ZkConfig.ZK_REGISTER_PATH + "/" + serviceName;
        try {
            // 1.创建永久节点:服务名
            if (curatorFramework.checkExists().forPath(servicePath) == null) {
                curatorFramework.create().
                        creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath(servicePath, "0".getBytes());

            }
            // 2.创建临时节点:/registrys/com.kikop.IG/serviceAddress1
            String addressPath = servicePath + "/" + servieAdress;
            String rsNode = curatorFramework.create()
                    .withMode(CreateMode.EPHEMERAL)
                    .forPath(addressPath, "0".getBytes());
            System.out.println("服务注册成功:" + rsNode);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

3.3服务端逻辑

3.3.1服务端业务处理

package com.kikop.advancedBasedNetty.producer.handler;

import com.kikop.common.rpc.model.MyRpcReqInfo;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.lang.reflect.Method;
import java.util.HashMap;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myAdvancedRpcBasedNetty
 * @file Name: RpcHandler
 * @desc 功能描述
 * @date 2020/8/30
 * @time 10:59
 * @by IDE: IntelliJ IDEA
 */
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当前的服务端信息
     */
    private String serverAddress;

    /**
     * 缓存支持的服务列表
     * [0]:
     * key:接口名 com.kikop.myrpcdiy.advancedBasedNetty.server.provider.IOrder
     * value:接口实现类 OrderImpl
     * [1]:
     * key:接口名 com.kikop.myrpcdiy.advancedBasedNetty.server.provider.IAccount
     * value:接口实现类 AccountImpl
     */
    private HashMap<String, Object> serverHandlerMap = new HashMap<>();

    public MyServerHandler(String serverAddress,HashMap<String, Object> serverHandlerMap) {
        this.serverAddress=serverAddress;
        this.serverHandlerMap = serverHandlerMap;
    }

    /**
     * @param ctx 写数据给客户端
     * @param msg 读客户端数据
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        //super.channelRead(ctx, msg);
        MyRpcReqInfo myRpcReqInfo = (MyRpcReqInfo) msg;

        Object result = new Object();

        if (serverHandlerMap.containsKey(myRpcReqInfo.getClazzName())) {

            // 获取服务对象
            Object serverInstance = serverHandlerMap.get(myRpcReqInfo.getClazzName());
            // 获取类中的方法(带参数)
            Method method = serverInstance.getClass().getMethod(
                    myRpcReqInfo.getMethodName(), myRpcReqInfo.getParamTypes());
            // 调用方法
            result = method.invoke(serverInstance, myRpcReqInfo.getArgs());
            result=String.format("%s-->%s",serverAddress,result);
        }
        ctx.write(result);
        ctx.flush();
        ctx.close();

    }
}

3.3.2服务管理类

package com.kikop.advancedBasedNetty.producer.manager;

import com.kikop.advancedBasedNetty.producer.handler.MyServerHandler;
import com.kikop.advancedBasedNetty.producer.registry.IRegisterCenter;
import com.kikop.common.rpc.annotations.RpcIFValueAnno;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectEncoder;

import java.util.HashMap;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myAdvancedRpcBasedNetty
 * @file Name: ServerManager
 * @desc 功能描述 服务的管理(发布、启停等)
 * @date 2020/8/30
 * @time 10:43
 * @by IDE: IntelliJ IDEA
 */
public class ServerManager {

    private IRegisterCenter registerCenter;

    // 127.0.0.1:9090
    private String serviceAddress;

    /**
     * cache缓存当前的对外服务列表
     * key:接口名,由 packetName +"."+clazzName
     * com.kikop.advancedBasedNetty.producer.service.IOrder
     * value:实例对象 OrderImpl
     */
    private HashMap<String, Object> serverHandlerMap = new HashMap<>();

    /**
     * @param serviceAddress
     * @param registerCenter
     */
    public ServerManager(String serviceAddress, IRegisterCenter registerCenter) {
        this.serviceAddress = serviceAddress;
        this.registerCenter = registerCenter;
    }

    /**
     * 启动服务
     */
    public void startSvr() {
        try {
            EventLoopGroup bossGroup = new NioEventLoopGroup(); // boss
            EventLoopGroup workerGroup = new NioEventLoopGroup(); // worker
            ServerBootstrap serverBootstrap = new ServerBootstrap();  // netty启动类

            serverBootstrap.group(bossGroup, workerGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {

                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();

                    pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                    pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));

                    pipeline.addLast("encoder", new ObjectEncoder());
                    pipeline.addLast("decoder", new io.netty.handler.codec.serialization.ObjectDecoder(Integer.MAX_VALUE,
                            ClassResolvers.cacheDisabled(null)));

                    // io 数据交互
                    pipeline.addLast(new MyServerHandler(serviceAddress, serverHandlerMap));
                }
            }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);

            String[] addressArray = serviceAddress.split(":");
            String ip = addressArray[0];
            int port = Integer.parseInt(addressArray[1]);

            ChannelFuture channelFuture = serverBootstrap.bind(ip, port).sync();
            System.out.println("Netty服务端启动成功,等待客户端连接...");
            channelFuture.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }


    /**
     * 缓存当前的接口服务
     *
     * @param services 当前支持的服务列表
     */
    public void registerSvr(Object... services) {

        for (Object service : services) {
            // 判断类上是否有指定的注解
            if (service.getClass().isAnnotationPresent(RpcIFValueAnno.class)) {
                // 获取特定的注解
                RpcIFValueAnno rpcIFValueAnno = service.getClass().getAnnotation(RpcIFValueAnno.class);

                // rpcIFValueAnno.value().isAssignableFrom(service.getClass())

                // key:服务类上的注解类全名信息,如
                // com.kikop.advancedBasedNetty.producer.service.IOrder
                // value:127.0.0.1:9090
                serverHandlerMap.put(rpcIFValueAnno.value().getName(), service);
            }
        }

        for (String fullInterfaceClazzName : serverHandlerMap.keySet()) {
            // iRegisterCenter.register("com.kikop.myrpcdiy.advancedBasedNetty.server.provider.IOrder", "127.0.0.1:9090");
            registerCenter.register(fullInterfaceClazzName, serviceAddress);
        }

    }
}

4客户端

4.1服务查询

4.1.1服务选择接口

package com.kikop.advancedBasedNetty.consumer.registry.service;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myAdvancedRpcBasedNetty
 * @file Name: ZkConfig
 * @desc 功能描述
 * @date 2020/8/29
 * @time 20:09
 * @by IDE: IntelliJ IDEA
 */
public interface IServiceDiscovery {

    /**
     * 获取一个可用的服务
     * @param serviceName: com.kikopmyrpcdiy.advancedBasedNetty.server.provider.IOrder
     * @return
     */
    public String discover(String serviceName) ;

}

4.1.2服务选择实现类

package com.kikop.advancedBasedNetty.consumer.registry.service.impl;

import com.kikop.advancedBasedNetty.consumer.localloadbalance.service.ILoadBalance;
import com.kikop.advancedBasedNetty.consumer.localloadbalance.service.impl.RandomLoadBalanceImpl;
import com.kikop.advancedBasedNetty.consumer.registry.service.IServiceDiscovery;
import com.kikop.common.rpc.ZkConfig;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.ArrayList;
import java.util.List;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myAdvancedRpcBasedNetty
 * @file Name: ZkConfig
 * @desc 功能描述
 * @date 2020/8/29
 * @time 20:09
 * @by IDE: IntelliJ IDEA
 */
public class ServiceDiscoveryImpl implements IServiceDiscovery {

    /**
     * 服务列表
     * 用于负载均衡
     */
    List<String> repos = new ArrayList<>();

    public CuratorFramework curatorFramework;

    public ServiceDiscoveryImpl() {
        curatorFramework = CuratorFrameworkFactory.builder().connectString(ZkConfig.CONNECTOR_STR).sessionTimeoutMs(4000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 10)).build();
        curatorFramework.start();
    }

    @Override
    public String discover(String serviceName) {

        // 获取servicename完成路径
        String serviceFullPath = ZkConfig.ZK_REGISTER_PATH + "/" + serviceName;

        try {
            repos = curatorFramework.getChildren().forPath(serviceFullPath);
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 通过zk节点的watch机制实现服务端 servicename监听
        registerWatch(serviceFullPath);

        //负载均衡
        ILoadBalance loadBalance = new RandomLoadBalanceImpl();
        return loadBalance.select(repos);
    }

    /**
     * 通过zk节点的watch机制实现服务端监听
     *
     * @param serviceFullPath
     */
    private void registerWatch(String serviceFullPath) {
        PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, serviceFullPath, true);

        PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                repos = curatorFramework.getChildren().forPath(serviceFullPath);
                System.out.println("consumer:avaliableServerList:"+ repos);
            }
        };
        pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);

        try {
            pathChildrenCache.start();
        } catch (Exception e) {
            throw new RuntimeException("注册 PathChildrenCacheListener 异常" + e);
        }
    }
}

4.2负载均衡

4.2.1 负载均衡接口

package com.kikop.advancedBasedNetty.consumer.localloadbalance.service;

import java.util.List;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myAdvancedRpcBasedNetty
 * @file Name: ILoadBalance
 * @desc 功能描述
 * @date 2020/8/30
 * @time 10:16
 * @by IDE: IntelliJ IDEA
 */
public interface ILoadBalance {
    /**
     * 获取一个可用的服务
     * @param repos
     * @return
     */
    String select(List<String> repos);
}

4.2.2负载均衡实现类

package com.kikop.advancedBasedNetty.consumer.localloadbalance.service.impl;


import com.kikop.advancedBasedNetty.consumer.localloadbalance.service.ILoadBalance;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myAdvancedRpcBasedNetty
 * @file Name: ILoadBalance
 * @desc 功能描述 随机策略,从所有可用的 provider 中随机选择一个
 * (由于没有与服务的心跳检测机制,找到的服务不一定是活着的 todo)
 * 随机策略,从所有可用的 provider 中随机选择一个,参考 Ribbon:RandomRule
 * @date 2020/8/30
 * @time 10:16
 * @by IDE: IntelliJ IDEA
 */
public class RandomLoadBalanceImpl implements ILoadBalance {

    /**
     * 从本地缓存中拿到一个远程服务
     *
     * @param allServerList
     * @return
     */
    @Override
    public String select(List<String> allServerList) {

        String server = null;

        while (null == server) {

            // 1.查找远程服务
            if (Thread.interrupted()) { // 判断currentThread 当前线程释放被中断
                return null;
            }
            int serverCount = allServerList.size(); // 所有的远程服务列表
            if (serverCount == 0) {
                /*
                 * No servers. End regardless of pass, because subsequent passes
                 * only get more restrictive.
                 */
                return null;
            }

            int index = chooseRandomInt(serverCount);
            server = allServerList.get(index);

            if (null == server) {
                /*
                 * The only time this should happen is if the server list were
                 * somehow trimmed. This is a transient condition. Retry after
                 * yielding.
                 */
                // 使当前线程由执行状态,变成为就绪状态,让出cpu时间
                // 在下一个线程执行时候,此线程有可能被执行,也有可能没有被执行。
                Thread.yield();
                continue;
            }


            //  2.下面是远程服务找到的逻辑

            // 2.1.判断,由于没有与服务的心跳检测机制,找到的服务不一定是活着的 todo
            if (isActive(server)) {
                return server;
            }

            //2.2.没有符合条件的
            server = null;
            Thread.yield();
        }
        return server;
    }

    /**
     * 随机选择一个服务
     *
     * @param serverCount
     * @return
     */
    protected int chooseRandomInt(int serverCount) {
        return ThreadLocalRandom.current().nextInt(serverCount);
    }

    /**
     * 判读服务示范活着
     *
     * @param currentServer
     * @return
     */
    private boolean isActive(String currentServer) {
        return true;
    }
}

4.3客户端服务动态代理

4.3.1RpcClientProxyFactory

package com.kikop.advancedBasedNetty.consumer.proxy;

import com.kikop.advancedBasedNetty.consumer.registry.service.IServiceDiscovery;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myAdvancedRpcBasedNetty
 * @file Name: RpcClientProxyFactory
 * @desc 功能描述 创建动态代理类
 * @date 2020/8/30
 * @time 12:15
 * @by IDE: IntelliJ IDEA
 */
public class RpcClientProxyFactory {
    private IServiceDiscovery serviceDiscovery;

    public RpcClientProxyFactory(IServiceDiscovery serviceDiscovery) {
        this.serviceDiscovery = serviceDiscovery;
    }

    /**
     * 创建远程代理对象
     *
     * @param interfaceClazz
     * @param <T>
     * @return
     */
    public <T> T create(final Class<T> interfaceClazz) {
        return RpcInvocationHandler.wrap(interfaceClazz, this.serviceDiscovery);
    }
}

4.3.2RpcInvocationHandler

package com.kikop.advancedBasedNetty.consumer.proxy;

import com.kikop.advancedBasedNetty.consumer.registry.service.IServiceDiscovery;
import com.kikop.common.rpc.model.MyRpcReqInfo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectEncoder;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myAdvancedRpcBasedNetty
 * @file Name: RpcInvocationHandler
 * @desc 功能描述
 * @date 2020/8/30
 * @time 12:15
 * @by IDE: IntelliJ IDEA
 */
public class RpcInvocationHandler<T> implements InvocationHandler {

    // 代理的接口
    private Class<T> interfaceClazz;

    // 远程可用服务
    private IServiceDiscovery serviceDiscovery;


    /**
     * 创建远程代理对象
     *
     * @param <T>
     * @return
     */
    public static <T> T wrap(Class<T> interfaceClazz, IServiceDiscovery serviceDiscovery) {
        return (T) Proxy.newProxyInstance(
                interfaceClazz.getClassLoader(),
                new Class<?>[]{interfaceClazz},
                new RpcInvocationHandler<T>(interfaceClazz, serviceDiscovery));
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        Object result = null;

        // 1.封装动态请求元数据
        MyRpcReqInfo myRpcReqInfo = new MyRpcReqInfo();

        // 类名
        myRpcReqInfo.setClazzName(method.getDeclaringClass().getName());
        // 方法名
        myRpcReqInfo.setMethodName(method.getName());
        // 方法参数类型
        myRpcReqInfo.setParamTypes(method.getParameterTypes());
        // 方法参数
        myRpcReqInfo.setArgs(args);

        // 2.获取本地可用的一个远程接口服务

        // 2.1.通过服务接口名 serviceName
        // serviceDiscovery[0]:"com.kikop.myrpcdiy.advancedBasedNetty.server.provider.IOrder", "127.0.0.1:9090"
        String serviceName = this.interfaceClazz.getName();
        // serviceAddress:
        String serviceAddress = this.serviceDiscovery.discover(serviceName);
        if (null == serviceAddress) {
            System.out.println("avaiable serviceAddress is null!");
            return null;
        }
        String[] splitArrays = serviceAddress.split(":");
        String host = splitArrays[0];
        int port = Integer.parseInt(splitArrays[1]);

        // 3.开始发起真正的远程调用
        result = doInvoke(host, port, myRpcReqInfo);
        return result;
    }

    /**
     * 真正的远程调用
     *
     * @return
     */
    private Object doInvoke(String host, int port, MyRpcReqInfo myRpcReqInfo) {

        Object result = null;
        final MyClientHandler rpcClientProxyHandler = new MyClientHandler();

        //3.发起远程服务调用
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

        try {
            Bootstrap bootstrap = new Bootstrap();
            // NioSocketChannel:客户端连接通道
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(

                            new ChannelInitializer<SocketChannel>() {
                                // 新的连接到来时,获取刚创建的 SocketChannel,并进行管道初始化
                                @Override
                                protected void initChannel(SocketChannel channel) throws Exception {
                                    ChannelPipeline pipeline = channel.pipeline();

                                    pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                                    pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));

                                    pipeline.addLast("encoder", new ObjectEncoder());
                                    pipeline.addLast("decoder", new io.netty.handler.codec.serialization.ObjectDecoder(Integer.MAX_VALUE,
                                            ClassResolvers.cacheDisabled(null)));

                                    // 通道IO数据交互
                                    pipeline.addLast(rpcClientProxyHandler);
                                }
                            }
                    );

            // 3.1.连接服务端
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            // 3.2.发起请求
            channelFuture.channel().writeAndFlush(myRpcReqInfo);
            // 3.3.等待同步结果
            channelFuture.channel().closeFuture().sync();
            // 3.4.返回 invoke执行结果
            result = rpcClientProxyHandler.getResponse();

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
        return result;
    }

    private RpcInvocationHandler(Class<T> interfaceClazz, IServiceDiscovery serviceDiscovery) {
        this.interfaceClazz = interfaceClazz;
        this.serviceDiscovery = serviceDiscovery;
    }

}

4.3.3客户端业务处理类

package com.kikop.advancedBasedNetty.consumer.proxy;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myAdvancedRpcBasedNetty
 * @file Name: RpcClientProxyFactory
 * @desc 功能描述 客户端业务处理类
 * @date 2020/8/30
 * @time 12:15
 * @by IDE: IntelliJ IDEA
 */
public class MyClientHandler extends ChannelInboundHandlerAdapter {

    private Object response;

    /**
     * 远程执行结果
     *
     * @return
     */
    public Object getResponse() {
        return response;
    }

    /**
     * 通道数据读取回调
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        super.channelRead(ctx, msg);
        response = msg;
    }
}

4测试

4.1启动服务端

package com.kikop.advancedBasedNetty.producer;


import com.kikop.advancedBasedNetty.producer.manager.ServerManager;
import com.kikop.advancedBasedNetty.producer.registry.IRegisterCenter;
import com.kikop.advancedBasedNetty.producer.registry.impl.RegisterCenterImpl;
import com.kikop.advancedBasedNetty.producer.service.IOrder;
import com.kikop.advancedBasedNetty.producer.service.impl.OrderImp;

import java.io.IOException;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myAdvancedRpcBasedNetty
 * @file Name: producerTest
 * @desc 功能描述
 * @date 2020/8/29
 * @time 20:33
 * @by IDE: IntelliJ IDEA
 */
public class producerTest {

    /**
     * 服务注册
     *
     * @throws IOException
     */
    public static void regisSvrTest() throws IOException {

        IRegisterCenter iRegisterCenter = new RegisterCenterImpl();
        // 服务端口由 netty建立
        // com.kikop.myrpcdiy.advancedBasedNetty.server.provider.IOrder
        iRegisterCenter.register(IOrder.class.getName(), "127.0.0.1:9090");
        iRegisterCenter.register(IOrder.class.getName(), "127.0.0.1:9191");
        // 由于是临时节点,进程退出后,节点即被销毁,所以要 in.read
        System.in.read();
    }

    /**
     * 服务注册及启动
     */
    public static void publisherSvrTest_9090() {
        _publisherSvr(new OrderImp(), "127.0.0.1:9090");
    }


    /**
     * 服务注册及启动
     */
    public static void publisherSvrTest_9091() {
        _publisherSvr(new OrderImp(), "127.0.0.1:9091");
    }

    /**
     * 服务注册及启动
     *
     * @param order          订单服务
     * @param serviceAddress 订单服务地址
     */
    private static void _publisherSvr(IOrder order, String serviceAddress) {

        // 1.创建服务1(现实中应该是两条不同的物理主机)
        // 服务端口由 netty建立,发布服务
        IRegisterCenter iRegisterCenter = new RegisterCenterImpl();
        ServerManager serverManager = new ServerManager(serviceAddress, iRegisterCenter);
        // 1.1.创建订单服务类对象
        serverManager.registerSvr(order);
        // 1.2.启动订单服务
        serverManager.startSvr();

    }

    public static void main(String[] args) throws IOException {

//        regisSvrTest();

        publisherSvrTest_9091();
    }
}


4.2模拟客户端发送请求

package com.kikop.advancedBasedNetty.consumer;


import com.kikop.advancedBasedNetty.consumer.proxy.RpcClientProxyFactory;
import com.kikop.advancedBasedNetty.consumer.registry.service.IServiceDiscovery;
import com.kikop.advancedBasedNetty.consumer.registry.service.impl.ServiceDiscoveryImpl;
import com.kikop.advancedBasedNetty.producer.service.IOrder;

import java.io.IOException;
import java.util.stream.IntStream;

/**
 * @author kikop
 * @version 1.0
 * @project Name: myAdvancedRpcBasedNetty
 * @file Name: servertest
 * @desc 功能描述
 * @date 2020/8/29
 * @time 20:33
 * @by IDE: IntelliJ IDEA
 */
public class consumerTest1 {


    /**
     * 查找一个可用服务测试
     *
     * @param clazz
     */
    public static void findAvailableSeriveTest(Class<?> clazz) {

        IServiceDiscovery iServiceDiscovery = new ServiceDiscoveryImpl();
        String serveraddress = iServiceDiscovery.discover(clazz.getName());
        System.out.println(serveraddress);
    }

    public static void businessMethodTest() {

        IServiceDiscovery serviceDiscovery = new ServiceDiscoveryImpl();
        RpcClientProxyFactory rpcClientProxyFactory = new RpcClientProxyFactory(serviceDiscovery);

        // 1.创建接口动态代理类(对应远程服务注解的参数)
        IOrder iOrder = rpcClientProxyFactory.create(IOrder.class);

        // 2.调用测试
        System.out.println(iOrder.buy(2));
    }

    public static void main(String[] args) throws IOException {

//        findAvailableSeriveTest(IOrder.class);

        IntStream.range(0, 10).parallel().forEach(value -> {
            businessMethodTest();
        });
    }


}

参考

相关文章

网友评论

      本文标题:2021-03-14_基于netty请求的远程调用学习

      本文链接:https://www.haomeiwen.com/subject/vvsccltx.html