美文网首页
Zookeeper应用-rpc改造

Zookeeper应用-rpc改造

作者: 剑道_7ffc | 来源:发表于2020-05-07 08:05 被阅读0次

整体思路

服务端:将key:服务名称,value:ip和端口号注册给zookeeper
客户端:根据服务名称,从zk中获取ip和端口号集合,并做负载均衡

具体代码

服务端

public class ZkConfig {

    public static String CONNECTION_STR="192.168.13.102:2181,192.168.13.103:2181,192.168.13.104:2181";

}
public interface IRegistryCenter {

    /**
     * 服务注册名称和服务注册地址实现服务的管理
     * @param serviceName
     * @param serviceAddress
     */
    void registry(String serviceName,String serviceAddress);
}
public class RegistryCenterWithZk implements IRegistryCenter{

    CuratorFramework curatorFramework =null;

    {
        //初始化zookeeper的连接, 会话超时时间是5s,衰减重试
        curatorFramework = CuratorFrameworkFactory.builder().
                connectString(ZkConfig.CONNECTION_STR).sessionTimeoutMs(5000).
                retryPolicy(new ExponentialBackoffRetry(1000, 3)).
                namespace("registry")
                .build();
        curatorFramework.start();
    }

    @Override
    public void registry(String serviceName, String serviceAddress) {
        String servicePath="/"+serviceName;
        try {
            //判断节点是否存在
            if(curatorFramework.checkExists().forPath(servicePath)==null){
                curatorFramework.create().creatingParentsIfNeeded().
                        withMode(CreateMode.PERSISTENT).forPath(servicePath);
            }
            //serviceAddress: ip:port
            String addressPath=servicePath+"/"+serviceAddress;
            curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(addressPath);
            System.out.println("服务注册成功");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

客户端

public class ZkConfig {

    public static String CONNECTION_STR="192.168.13.102:2181,192.168.13.103:2181,192.168.13.104:2181";
}
public interface IServiceDiscovery {

    //根据服务名称返回服务地址
    String  discovery(String serviceName);
}
public class ServiceDiscoveryWithZk implements IServiceDiscovery{

    CuratorFramework curatorFramework =null;

    List<String> serviceRepos=new ArrayList<>(); //服务地址的本地缓存
    {
        //初始化zookeeper的连接, 会话超时时间是5s,衰减重试
        curatorFramework = CuratorFrameworkFactory.builder().
                connectString(ZkConfig.CONNECTION_STR).sessionTimeoutMs(5000).
                retryPolicy(new ExponentialBackoffRetry(1000, 3)).
                namespace("registry")
                .build();
        curatorFramework.start();
    }

    /**
     * 服务的查找
     * 设置监听
     *
     * @param serviceName
     * @return
     */
    @Override
    public String discovery(String serviceName) {
        //完成了服务地址的查找(服务地址被删除)
        String path="/"+serviceName; //registry/com.gupaoedu.demo.HelloService
        if(serviceRepos.isEmpty()) {
            try {
                serviceRepos = curatorFramework.getChildren().forPath(path);
                registryWatch(path);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //针对已有的地址做负载均衡
        LoadBalanceStrategy loadBalanceStrategy=new RandomLoadBalance();
        return loadBalanceStrategy.selectHost(serviceRepos);
    }

    private void registryWatch(final String path) throws Exception {
        PathChildrenCache nodeCache=new PathChildrenCache(curatorFramework,path,true);
        PathChildrenCacheListener nodeCacheListener= (curatorFramework1, pathChildrenCacheEvent) -> {
            System.out.println("客户端收到节点变更的事件");
            serviceRepos=curatorFramework1.getChildren().forPath(path);// 再次更新本地的缓存地址
        };
        nodeCache.getListenable().addListener(nodeCacheListener);
        nodeCache.start();

    }
}
public interface LoadBalanceStrategy {

    String selectHost(List<String> repos);

}
public abstract class AbstractLoadBalance implements LoadBalanceStrategy{
    @Override
    public String selectHost(List<String> repos) {
        //repos可能为空, 可能只有一个。
        if(repos==null||repos.size()==0){
            return null;
        }
        if(repos.size()==1){
            return repos.get(0);
        }
        return doSelect(repos);
    }

    protected abstract String doSelect(List<String> repos);

}
public class RandomLoadBalance extends AbstractLoadBalance{

    @Override
    protected String doSelect(List<String> repos) {
        int length=repos.size();
        Random random=new Random(); //从repos的集合内容随机获得一个地址
        return repos.get(random.nextInt(length));
    }
}

相关文章

  • Zookeeper应用-rpc改造

    整体思路 服务端:将key:服务名称,value:ip和端口号注册给zookeeper客户端:根据服务名称,从zk...

  • RPC

    RPC原理及RPC实例分析 Dubbo与Zookeeper、SpringMVC整合和使用(负载均衡、容错)

  • 微服务基础

    一,什么是微服务 把传统的单击应用中的本地方法调用,改造成通过RPC,HTTP产生的远程方法调用 把模块从单体应用...

  • 基于netty的简易RPC框架

    一、简介 基于Zookeeper、Netty和Spring写了一个轻量级的分布式RPC框架。 RPC,即 Remo...

  • Zookeeper适合做服务发现吗

    在Dubbo这个RPC框架中,Zookeeper充当着注册中心的角色,那Zookeeper适合做服务发现吗?今天咱...

  • 微服务概览

    服务化: 服务化就是把传统的单机应用中通过 JAR 包依赖产生的本地方法调用,改造成通过 RPC 接口产生的远程方...

  • Zookeeper学习-05 Zookeeper总体架构

    1、Zookeeper总体架构 应用使用Zookeeper客户端库使用Zookeeper服务。Zookeeper客...

  • Thrift RPC实战(八) .基于Zookeeper服务的R

    概述 在上一篇基于基于Zookeeper服务的RPC服务发布订阅中我们已经构建了一个rpc服务,但是这个服务的代码...

  • ZooKeeper概述

    原文地址ZooKeeper Overview ZooKeeper ZooKeeper: 分布式应用中的协调服务设计...

  • 【三】Zookeeper应用-基于ZooKeeper的分布式Se

    【Zookeeper应用】 基于ZooKeeper的分布式Session实现 1 为什么使用ZooKeeper 目...

网友评论

      本文标题:Zookeeper应用-rpc改造

      本文链接:https://www.haomeiwen.com/subject/cnznghtx.html