Dubbo Consumer-Side Invocation

Author: 剑道_7ffc | Published 2020-06-14 14:03

The consumer-side invocation process

Call chain

Key methods

InvokerInvocationHandler#invoke

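From the service reference process we know that the object injected for the service is InvokerInvocationHandler(MockClusterInvoker(FailoverClusterInvoker(RegistryDirectory))). When the consumer calls queryPayList, it actually calls the dynamically generated queryPayList method, and that generated method in turn calls the invoke method of InvokerInvocationHandler.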

// Dynamically generated proxy method (bytecode shown as Java source)
public java.lang.String queryPayList() {
    Object[] args = new Object[0];
    Object ret = handler.invoke(this, methods[0], args);
    return (java.lang.String) ret;
}
// InvokerInvocationHandler#invoke
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
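The delegation from a generated proxy method to a single handler can be tried out without Dubbo. The following is a minimal sketch using the JDK's java.lang.reflect.Proxy (Dubbo generates its proxy classes by other means by default, so this only illustrates the pattern). PayService, ProxySketch and the "remote:" return value are hypothetical names made up for the example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical service interface standing in for the article's service.
interface PayService {
    String queryPayList();
}

class ProxySketch {
    // Builds a proxy whose method calls are all funnelled into one handler,
    // mirroring how the generated method calls handler.invoke(this, method, args).
    static PayService createProxy() {
        InvocationHandler handler = (proxy, method, args) -> {
            // A real handler would wrap the call in an RpcInvocation and hand it to an Invoker.
            return "remote:" + method.getName();
        };
        return (PayService) Proxy.newProxyInstance(
                PayService.class.getClassLoader(),
                new Class<?>[] {PayService.class},
                handler);
    }

    public static void main(String[] args) {
        System.out.println(createProxy().queryPayList());
    }
}
```

The caller only sees the PayService interface; everything behind the handler (cluster, routing, load balancing, transport) stays hidden.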
MockClusterInvoker#invoke

Mock is used for service degradation: if a non-business exception occurs, the mock logic is invoked instead.

public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;

    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    if (value.length() == 0 || value.equalsIgnoreCase("false")) { // no mock configured
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
        result = doMockInvoke(invocation, null);
    } else {
        //fail-mock
        try {
            result = this.invoker.invoke(invocation);
        } catch (RpcException e) {
            if (e.isBiz()) { // business exception
                throw e;
            }
            result = doMockInvoke(invocation, e); // fall back to the mock
        }
    }
    return result;
}
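The three branches (no mock, force mock, fail mock) can be condensed into a small standalone sketch. It is not Dubbo code: MockSketch, RemoteFailure and the two Supplier parameters are hypothetical stand-ins for the wrapped invoker, a non-business RpcException and doMockInvoke.

```java
import java.util.function.Supplier;

class MockSketch {
    // Hypothetical stand-in for a non-business RpcException (timeout, network error, ...).
    static class RemoteFailure extends RuntimeException {
        RemoteFailure(String message) {
            super(message);
        }
    }

    // mockValue plays the role of the "mock" URL parameter.
    static String invoke(String mockValue, Supplier<String> remoteCall, Supplier<String> mockCall) {
        String value = mockValue == null ? "" : mockValue.trim();
        if (value.isEmpty() || value.equalsIgnoreCase("false")) {
            return remoteCall.get();   // no mock: always go remote
        } else if (value.startsWith("force")) {
            return mockCall.get();     // force mock: never go remote
        }
        try {
            return remoteCall.get();   // fail mock: try the remote call first
        } catch (RemoteFailure e) {
            return mockCall.get();     // degrade only when the remote call fails
        }
    }
}
```

For example, MockSketch.invoke("force:return null", remote, mock) never touches remote, while a value such as "fail:return null" only reaches mock after remote has thrown RemoteFailure.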

AbstractClusterInvoker#invoke

public Result invoke(final Invocation invocation) throws RpcException {
    // Bind attachments into the invocation; attachments enable extras such as implicit parameter passing
    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addAttachments(contextAttachments);
    }
    
    // routing: list the candidate invokers
    List<Invoker<T>> invokers = list(invocation);
    // initialize the load balancer
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}

AbstractClusterInvoker#initLoadBalance

protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
    // resolved from the URL's loadbalance parameter; the default is random
    if (CollectionUtils.isNotEmpty(invokers)) {
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
    } else {
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
    }
}

FailoverClusterInvoker#doInvoke

public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    String methodName = RpcUtils.getMethodName(invocation);
    // total attempts = configured retries + 1
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) { // retry loop
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            copyInvokers = list(invocation);
        }
        // exclude invokers that were already selected or are unavailable
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker);
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            Result result = invoker.invoke(invocation);
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // biz exception.
                throw e;
            }
            le = e;
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(le.getCode(), "Failed to invoke the method "
            + methodName + " in the service " + getInterface().getName()
            + ". Tried " + len + " times of the providers " + providers
            + " (" + providers.size() + "/" + copyInvokers.size()
            + ") from the registry " + directory.getUrl().getAddress()
            + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
            + Version.getVersion() + ". Last error is: "
            + le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
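Stripped of Dubbo types, the failover loop above boils down to "try up to retries + 1 providers, preferring ones not tried yet, and rethrow the last failure". Here is a minimal sketch under simplifying assumptions: providers are plain strings, the list is non-empty, every RuntimeException is retryable, and selection is "first untried provider" rather than a load balancer. FailoverSketch and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class FailoverSketch {
    // Tries up to retries + 1 providers and returns the first successful result.
    static <R> R invoke(List<String> providers, int retries, Function<String, R> call) {
        int attempts = Math.max(retries + 1, 1);
        List<String> invoked = new ArrayList<>(); // providers already tried
        RuntimeException last = null;             // last failure, reported if all attempts fail
        for (int i = 0; i < attempts; i++) {
            String chosen = select(providers, invoked);
            invoked.add(chosen);
            try {
                return call.apply(chosen);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to the next attempt
            }
        }
        throw new IllegalStateException("Tried " + attempts + " times on " + invoked, last);
    }

    // Picks the first provider that has not been tried; falls back to the first one
    // when every provider has already been tried. Assumes a non-empty list.
    static String select(List<String> providers, List<String> invoked) {
        for (String provider : providers) {
            if (!invoked.contains(provider)) {
                return provider;
            }
        }
        return providers.get(0);
    }
}
```

Retrying on a different provider is only safe for idempotent operations, which is why the real code rethrows business exceptions immediately instead of retrying them.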

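Routing

RouterChain#route

Routing filters out the service providers that match the configured rules. The main implementations are the condition router (ConditionRouter) and the script router (ScriptRouter). Typical uses include read/write separation and application isolation.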

public List<Invoker<T>> route(URL url, Invocation invocation) {
    // invokers stored by RegistryDirectory, i.e. the provider addresses
    List<Invoker<T>> finalInvokers = invokers;
    for (Router router : routers) { // apply each routing rule in turn
        finalInvokers = router.route(finalInvokers, url, invocation);
    }
    return finalInvokers;
}
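The chain is a simple fold: each router receives the survivors of the previous one. A minimal standalone sketch, assuming providers are represented as "host:port" strings and a router is just a function from list to list; RouterChainSketch, prefixRouter and suffixRouter are hypothetical names, not Dubbo APIs.

```java
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;

class RouterChainSketch {
    // Each router narrows the candidate list; the output of one is the input of the next.
    static List<String> route(List<String> invokers, List<UnaryOperator<List<String>>> routers) {
        List<String> result = invokers;
        for (UnaryOperator<List<String>> router : routers) {
            result = router.apply(result);
        }
        return result;
    }

    // Hypothetical rule: keep only addresses that start with the given prefix (e.g. a subnet).
    static UnaryOperator<List<String>> prefixRouter(String prefix) {
        return list -> list.stream()
                .filter(address -> address.startsWith(prefix))
                .collect(Collectors.toList());
    }

    // Hypothetical rule: keep only addresses that end with the given suffix (e.g. a port).
    static UnaryOperator<List<String>> suffixRouter(String suffix) {
        return list -> list.stream()
                .filter(address -> address.endsWith(suffix))
                .collect(Collectors.toList());
    }
}
```

Because the rules are applied in sequence, the result is the intersection of all of them, and an overly strict rule early in the chain can leave later routers with an empty list.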

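Load balancing

Load balancing means picking one suitable provider from the set of available providers.

AbstractLoadBalance#select

Load balancing is implemented with the template method pattern: AbstractLoadBalance#select handles the trivial cases and delegates the actual choice to the subclass's doSelect. The default implementation is RandomLoadBalance.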

public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    if (invokers.size() == 1) {
        return invokers.get(0);
    }
    return doSelect(invokers, url, invocation);
}

RandomLoadBalance#doSelect

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    // Number of invokers
    int length = invokers.size();
    // Every invoker has the same weight?
    boolean sameWeight = true;
    // the weight of every invokers
    int[] weights = new int[length];
    // the first invoker's weight
    int firstWeight = getWeight(invokers.get(0), invocation);
    weights[0] = firstWeight;
    // The sum of weights
    int totalWeight = firstWeight;
    for (int i = 1; i < length; i++) {
        int weight = getWeight(invokers.get(i), invocation);
        // save for later use
        weights[i] = weight;
        // Sum
        totalWeight += weight;
        if (sameWeight && weight != firstWeight) {
            sameWeight = false;
        }
    }
    if (totalWeight > 0 && !sameWeight) {
        // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
        int offset = ThreadLocalRandom.current().nextInt(totalWeight);
        // Return a invoker based on the random value.
        for (int i = 0; i < length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return invokers.get(i);
            }
        }
    }
    // If all invokers have the same weight value or totalWeight=0, return evenly.
    return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
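The weighted branch is easier to follow with concrete numbers. With weights 5, 3 and 2 the total is 10, so a random offset of 0 to 4 lands on the first invoker, 5 to 7 on the second, and 8 to 9 on the third. The sketch below separates that mapping (pick) from the random draw so it can be checked deterministically; WeightedRandomSketch, pick and select are hypothetical names, and invokers are reduced to array indexes.

```java
import java.util.concurrent.ThreadLocalRandom;

class WeightedRandomSketch {
    // Maps an offset in [0, sum of weights) to an index by subtracting weights in turn.
    static int pick(int[] weights, int offset) {
        for (int i = 0; i < weights.length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return i;
            }
        }
        throw new IllegalArgumentException("offset must be smaller than the sum of weights");
    }

    // Draws a random offset and maps it to an index; falls back to a uniform pick
    // when no weight is positive.
    static int select(int[] weights) {
        int total = 0;
        for (int weight : weights) {
            total += weight;
        }
        if (total <= 0) {
            return ThreadLocalRandom.current().nextInt(weights.length);
        }
        return pick(weights, ThreadLocalRandom.current().nextInt(total));
    }
}
```

An index with weight w is therefore chosen with probability w / total, which is why the equal-weights case can be handled by a plain uniform pick.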

Initialization of the callable Invoker

RegistryDirectory

RegistryDirectory has the member field private volatile Map<String, Invoker<T>> urlInvokerMap.

RegistryDirectory#toInvokers

This method populates urlInvokerMap.

for (URL providerUrl : urls) {//dubbo://
    URL url = mergeUrl(providerUrl);

    String key = url.toFullString(); // The parameter urls are sorted
    invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
    newUrlInvokerMap.put(key, invoker);
}

DubboProtocol#protocolBindingRefer

public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
    optimizeSerialization(url);

    // create rpc invoker.
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);

    return invoker;
}

So the invoker is InvokerDelegate(ListenerInvokerWrapper(AsyncToSyncInvoker(DubboInvoker))).

DubboInvoker

AbstractInvoker#invoke

Puts the contextAttachments into the RpcInvocation.

DubboInvoker#doInvoke

protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        //interfaceName
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        // version
        inv.setAttachment(VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT); // timeout
            if (isOneway) { // one-way call: no return value is expected
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
                CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
                responseFuture.whenComplete((obj, t) -> {
                    if (t != null) {
                        asyncRpcResult.completeExceptionally(t);
                    } else {
                        asyncRpcResult.complete((AppResponse) obj);
                    }
                });
                RpcContext.getContext().setFuture(new FutureAdapter(asyncRpcResult));
                return asyncRpcResult;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
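In the two-way branch, doInvoke returns immediately with a result that is completed later by the response callback; a synchronous caller then has to block on it (in the wrapper chain above, that is presumably the role of AsyncToSyncInvoker). The bridging can be sketched with plain CompletableFuture. AsyncToSyncSketch, bridge and await are hypothetical names, and String stands in for AppResponse.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class AsyncToSyncSketch {
    // Bridges a transport-level future to a result future, like the whenComplete callback above.
    static CompletableFuture<String> bridge(CompletableFuture<Object> responseFuture) {
        CompletableFuture<String> result = new CompletableFuture<>();
        responseFuture.whenComplete((obj, t) -> {
            if (t != null) {
                result.completeExceptionally(t);      // propagate the failure
            } else {
                result.complete(String.valueOf(obj)); // propagate the response
            }
        });
        return result;
    }

    // What a synchronous caller does: block until the result arrives or the timeout expires.
    static String await(CompletableFuture<String> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("call failed or timed out", e);
        }
    }
}
```

Keeping the invoker itself asynchronous means the same code path serves both styles: asynchronous callers take the future, synchronous callers wait on it.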

currentClient.request

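currentClient is a ReferenceCountExchangeClient(HeaderExchangeClient()) object. The call passes through ReferenceCountExchangeClient --> HeaderExchangeClient --> HeaderExchangeChannel and finally reaches the channel's send method. The one-way send variant of HeaderExchangeChannel is shown here.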

public void send(Object message, boolean sent) throws RemotingException {
  
    Request request = new Request();
    request.setVersion(Version.getProtocolVersion());
    request.setTwoWay(false);
    request.setData(message);
        //NettyClient
    channel.send(request, sent);
}

The handler passed to NettyClient is new DecodeHandler(new HeaderExchangeHandler(ExchangeHandlerAdapter)).
The call chain of channel.send is AbstractClient.send -> NettyChannel.send.

public void send(Object message, boolean sent) throws RemotingException {
    super.send(message, sent);

    boolean success = true;
    int timeout = 0;
    try {
        ChannelFuture future = channel.writeAndFlush(message);
        if (sent) {
            timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
            success = future.await(timeout);
        }
        Throwable cause = future.cause();
        if (cause != null) {
            throw cause;
        }
    } catch (Throwable e) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
    }

    if (!success) {
        throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                + "in timeout(" + timeout + "ms) limit");
    }
}
