美文网首页@IT·互联网
ZooKeeper客户端服务器通信协议分析

ZooKeeper客户端服务器通信协议分析

作者: 兽怪海北 | 来源:发表于2020-10-17 19:09 被阅读0次

上一篇启动分析我们提到,ServerCnxnFactory负责接收客户端请求。ServerCnxnFactory有两个实现,NIOServerCnxnFactory和NettyServerCnxnFactory,因为通信协议与实现无关,所以我们只分析NettyServerCnxnFactory。
TCP服务器的启动代码如下

    cnxnFactory = ServerCnxnFactory.createFactory();
    cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
    cnxnFactory.startup(zkServer);

ServerCnxnFactory.createFactory根据zookeeper.serverCnxnFactory这个jvm属性的值来实例化对应的ServerCnxnFactory并返回。cnxnFactory.configure配置ServerCnxnFactory的监听地址、最大连接数、背压和是否使用安全连接(SSL)。
进入到NettyServerCnxnFactory的构造器

    NettyServerCnxnFactory() {
        x509Util = new ClientX509Util();

        boolean usePortUnification = Boolean.getBoolean(PORT_UNIFICATION_KEY);
        LOG.info("{}={}", PORT_UNIFICATION_KEY, usePortUnification);
        if (usePortUnification) {
            try {
                QuorumPeerConfig.configureSSLAuth();
            } catch (QuorumPeerConfig.ConfigException e) {
                LOG.error("unable to set up SslAuthProvider, turning off client port unification", e);
                usePortUnification = false;
            }
        }
        this.shouldUsePortUnification = usePortUnification;

        this.advancedFlowControlEnabled = Boolean.getBoolean(NETTY_ADVANCED_FLOW_CONTROL);
        LOG.info("{} = {}", NETTY_ADVANCED_FLOW_CONTROL, this.advancedFlowControlEnabled);

        setOutstandingHandshakeLimit(Integer.getInteger(OUTSTANDING_HANDSHAKE_LIMIT, -1));

        EventLoopGroup bossGroup = NettyUtils.newNioOrEpollEventLoopGroup(NettyUtils.getClientReachableLocalInetAddressCount());
        EventLoopGroup workerGroup = NettyUtils.newNioOrEpollEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup)
                                                         .channel(NettyUtils.nioOrEpollServerSocketChannel())
                                                         // parent channel options
                                                         .option(ChannelOption.SO_REUSEADDR, true)
                                                         // child channels options
                                                         .childOption(ChannelOption.TCP_NODELAY, true)
                                                         .childOption(ChannelOption.SO_LINGER, -1)
                                                         .childHandler(new ChannelInitializer<SocketChannel>() {
                                                             @Override
                                                             protected void initChannel(SocketChannel ch) throws Exception {
                                                                 ChannelPipeline pipeline = ch.pipeline();
                                                                 if (advancedFlowControlEnabled) {
                                                                     pipeline.addLast(readIssuedTrackingHandler);
                                                                 }
                                                                 if (secure) {
                                                                     initSSL(pipeline, false);
                                                                 } else if (shouldUsePortUnification) {
                                                                     initSSL(pipeline, true);
                                                                 }
                                                                 pipeline.addLast("servercnxnfactory", channelHandler);
                                                             }
                                                         });
        this.bootstrap = configureBootstrapAllocator(bootstrap);
        this.bootstrap.validate();
    }

主要包含以下步骤:

  1. 读取zookeeper.client.portUnification这个jvm属性的值,如果这个属性为true,且securety为false,则连接是否使用SSL由客户端决定。
  2. 读取zookeeper.netty.advancedFlowControl.enabled的jvm属性的值,如果这个属性为true,将会开启高级流控模式,在高级流控模式下,Netty的autoread会被禁用,在一次读取完成后是否继续读取由应用自己决定。
  3. 初始化Netty服务器,关键的代码是:
                                                         .childHandler(new ChannelInitializer<SocketChannel>() {
                                                             @Override
                                                             protected void initChannel(SocketChannel ch) throws Exception {
                                                                 ChannelPipeline pipeline = ch.pipeline();
                                                                 if (advancedFlowControlEnabled) {
                                                                     pipeline.addLast(readIssuedTrackingHandler);
                                                                 }
                                                                 if (secure) {
                                                                     initSSL(pipeline, false);
                                                                 } else if (shouldUsePortUnification) {
                                                                     initSSL(pipeline, true);
                                                                 }
                                                                 pipeline.addLast("servercnxnfactory", channelHandler); //channelHandler是CnxnChannelHandler的一个实例对象
                                                             }
                                                         });

readIssuedTrackingHandler负责跟踪在高级流控模式下,一次读取完成后调用了多少次read。initSSL初始化SSL相关的handler,这意味着所有的请求数据都由channelHandler处理。

接下来看ServerCnxnFactory的startup方法:

    public void startup(ZooKeeperServer zkServer) throws IOException, InterruptedException {
        startup(zkServer, true);
    }

继续看NettyServerCnxnFactory的startup方法:

    public void startup(ZooKeeperServer zks, boolean startServer) throws IOException, InterruptedException {
        start();
        setZooKeeperServer(zks);
        if (startServer) {
            zks.startdata();
            zks.startup();
        }
    }

ZookeeperServer类的启动方法暂时不分析,本篇只分析协议部分,继续看NettyServerCnxnFactory的start方法:

    public void start() {
        if (listenBacklog != -1) {
            bootstrap.option(ChannelOption.SO_BACKLOG, listenBacklog);
        }
        LOG.info("binding to port {}", localAddress);
        parentChannel = bootstrap.bind(localAddress).syncUninterruptibly().channel();
        // Port changes after bind() if the original port was 0, update
        // localAddress to get the real port.
        localAddress = (InetSocketAddress) parentChannel.localAddress();
        LOG.info("bound to port {}", getLocalPort());
    }

这里的主要逻辑是监听端口并开始接收客户端请求。
前面我们讲了,请求最终实际上是由channelHandler处理的,我们首先来看CnxnChannelHandler的channelActive方法,这个方法在TCP连接建立后被调用。

        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Channel active {}", ctx.channel());
            }

            final Channel channel = ctx.channel();
            if (limitTotalNumberOfCnxns()) {
                ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
                channel.close();
                return;
            }
            InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
            if (maxClientCnxns > 0 && getClientCnxnCount(addr) >= maxClientCnxns) {
                ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);
                LOG.warn("Too many connections from {} - max is {}", addr, maxClientCnxns);
                channel.close();
                return;
            }

            NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this);
            ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn);

            // Check the zkServer assigned to the cnxn is still running,
            // close it before starting the heavy TLS handshake
            if (!cnxn.isZKServerRunning()) {
                LOG.warn("Zookeeper server is not running, close the connection before starting the TLS handshake");
                ServerMetrics.getMetrics().CNXN_CLOSED_WITHOUT_ZK_SERVER_RUNNING.add(1);
                channel.close();
                return;
            }

            if (handshakeThrottlingEnabled) {
                // Favor to check and throttling even in dual mode which
                // accepts both secure and insecure connections, since
                // it's more efficient than throttling when we know it's
                // a secure connection in DualModeSslHandler.
                //
                // From benchmark, this reduced around 15% reconnect time.
                int outstandingHandshakesNum = outstandingHandshake.addAndGet(1);
                if (outstandingHandshakesNum > outstandingHandshakeLimit) {
                    outstandingHandshake.addAndGet(-1);
                    channel.close();
                    ServerMetrics.getMetrics().TLS_HANDSHAKE_EXCEEDED.add(1);
                } else {
                    cnxn.setHandshakeState(HandshakeState.STARTED);
                }
            }

            if (secure) {
                SslHandler sslHandler = ctx.pipeline().get(SslHandler.class);
                Future<Channel> handshakeFuture = sslHandler.handshakeFuture();
                handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn));
            } else if (!shouldUsePortUnification) {
                allChannels.add(ctx.channel());
                addCnxn(cnxn);
            }
        }

主要步骤如下:

  1. 判断是否超过总连接数限制,如果超过,直接断开连接。
  2. 判断是否超过单地址连接数限制,如果超过,直接断开连接。
  3. 实例化NettyServerCnxn类并附到channel的attribution里。
  4. 判断ZookeeperServer类是否启动完成,如果没有,直接断开连接。
  5. SSL握手相关代码,不重要,暂不分析。
  6. 将所有非安全的连接和SSL握手完成的channel放入allChannels中,将cnxn放入cnxns中,并按客户端地址统计连接数放入ipMap中。

接下来看channelRead方法,该方法在收到客户端数据后被调用。

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("message received called {}", msg);
                }
                try {
                    LOG.debug("New message {} from {}", msg, ctx.channel());
                    NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get();
                    if (cnxn == null) {
                        LOG.error("channelRead() on a closed or closing NettyServerCnxn");
                    } else {
                        cnxn.processMessage((ByteBuf) msg);
                    }
                } catch (Exception ex) {
                    LOG.error("Unexpected exception in receive", ex);
                    throw ex;
                }
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }

逻辑比较简单,调用cnxn.processMessage方法并将收到的数据传递过去。
接下来看NettyServerCnxn的processMessage方法:

    void processMessage(ByteBuf buf) {
        checkIsInEventLoop("processMessage");
        LOG.debug("0x{} queuedBuffer: {}", Long.toHexString(sessionId), queuedBuffer);

        if (LOG.isTraceEnabled()) {
            LOG.trace("0x{} buf {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(buf));
        }

        if (throttled.get()) {
            LOG.debug("Received message while throttled");
            // we are throttled, so we need to queue
            if (queuedBuffer == null) {
                LOG.debug("allocating queue");
                queuedBuffer = channel.alloc().compositeBuffer();
            }
            appendToQueuedBuffer(buf.retainedDuplicate());
            if (LOG.isTraceEnabled()) {
                LOG.trace("0x{} queuedBuffer {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(queuedBuffer));
            }
        } else {
            LOG.debug("not throttled");
            if (queuedBuffer != null) {
                appendToQueuedBuffer(buf.retainedDuplicate());
                processQueuedBuffer();
            } else {
                receiveMessage(buf);
                // Have to check !closingChannel, because an error in
                // receiveMessage() could have led to close() being called.
                if (!closingChannel && buf.isReadable()) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Before copy {}", buf);
                    }

                    if (queuedBuffer == null) {
                        queuedBuffer = channel.alloc().compositeBuffer();
                    }
                    appendToQueuedBuffer(buf.retainedSlice(buf.readerIndex(), buf.readableBytes()));
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Copy is {}", queuedBuffer);
                        LOG.trace("0x{} queuedBuffer {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(queuedBuffer));
                    }
                }
            }
        }
    }

主要流程如下:

  1. 判断当前是否处于禁止处理数据包的状态。如果是,将数据追加到queuedBuffer缓冲区内,不进行包解析操作。
  2. 如果当前允许处理数据包,则调用processQueuedBuffer或receiveMessage处理协议包,二者的逻辑基本相同。processQueuedBuffer会调用receiveMessage试图处理协议包,如果协议包处理完成,则清理queuedBuffer已读取的部分。queuedBuffer的存在是为了处理粘包和半包问题,核心的处理逻辑都是receiveMessage。

接下来看NettyServerCnxn的receiveMessage方法:

    private void receiveMessage(ByteBuf message) {
        checkIsInEventLoop("receiveMessage");
        try {
            while (message.isReadable() && !throttled.get()) {
                if (bb != null) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("message readable {} bb len {} {}", message.readableBytes(), bb.remaining(), bb);
                        ByteBuffer dat = bb.duplicate();
                        dat.flip();
                        LOG.trace("0x{} bb {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
                    }

                    if (bb.remaining() > message.readableBytes()) {
                        int newLimit = bb.position() + message.readableBytes();
                        bb.limit(newLimit);
                    }
                    message.readBytes(bb);
                    bb.limit(bb.capacity());

                    if (LOG.isTraceEnabled()) {
                        LOG.trace("after readBytes message readable {} bb len {} {}", message.readableBytes(), bb.remaining(), bb);
                        ByteBuffer dat = bb.duplicate();
                        dat.flip();
                        LOG.trace("after readbytes 0x{} bb {}",
                                  Long.toHexString(sessionId),
                                  ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
                    }
                    if (bb.remaining() == 0) {
                        bb.flip();
                        packetReceived(4 + bb.remaining());

                        ZooKeeperServer zks = this.zkServer;
                        if (zks == null || !zks.isRunning()) {
                            throw new IOException("ZK down");
                        }
                        if (initialized) {
                            // TODO: if zks.processPacket() is changed to take a ByteBuffer[],
                            // we could implement zero-copy queueing.
                            zks.processPacket(this, bb);
                        } else {
                            LOG.debug("got conn req request from {}", getRemoteSocketAddress());
                            zks.processConnectRequest(this, bb);
                            initialized = true;
                        }
                        bb = null;
                    }
                } else {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("message readable {} bblenrem {}", message.readableBytes(), bbLen.remaining());
                        ByteBuffer dat = bbLen.duplicate();
                        dat.flip();
                        LOG.trace("0x{} bbLen {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat)));
                    }

                    if (message.readableBytes() < bbLen.remaining()) {
                        bbLen.limit(bbLen.position() + message.readableBytes());
                    }
                    message.readBytes(bbLen);
                    bbLen.limit(bbLen.capacity());
                    if (bbLen.remaining() == 0) {
                        bbLen.flip();

                        if (LOG.isTraceEnabled()) {
                            LOG.trace("0x{} bbLen {}", Long.toHexString(sessionId), ByteBufUtil.hexDump(Unpooled.wrappedBuffer(bbLen)));
                        }
                        int len = bbLen.getInt();
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("0x{} bbLen len is {}", Long.toHexString(sessionId), len);
                        }

                        bbLen.clear();
                        if (!initialized) {
                            if (checkFourLetterWord(channel, message, len)) {
                                return;
                            }
                        }
                        if (len < 0 || len > BinaryInputArchive.maxBuffer) {
                            throw new IOException("Len error " + len);
                        }
                        // checkRequestSize will throw IOException if request is rejected
                        zkServer.checkRequestSizeWhenReceivingMessage(len);
                        bb = ByteBuffer.allocate(len);
                    }
                }
            }
        } catch (IOException e) {
            LOG.warn("Closing connection to {}", getRemoteSocketAddress(), e);
            close(DisconnectReason.IO_EXCEPTION);
        } catch (ClientCnxnLimitException e) {
            // Common case exception, print at debug level
            ServerMetrics.getMetrics().CONNECTION_REJECTED.add(1);

            LOG.debug("Closing connection to {}", getRemoteSocketAddress(), e);
            close(DisconnectReason.CLIENT_RATE_LIMIT);
        }
    }

主要流程如下:

  1. 判断bb(存储完整的协议包的缓冲区)是不是null,如果是null,代表协议头(协议包长度或者四字指令)还没有完整读取,如果不是null,代表协议头已经完整读取,但是协议体还没完整读取。
  2. 如果需要,读取协议头。
  3. 调用checkFourLetterWord,通过协议头判断是否是四字指令,四字指令不需要协议体(stmk除外,stmk需要一个8字节的参数。),如果是四字指令则直接处理,四字指令不需要经过session初始化(不需要握手等流程,握手后无法执行四字指令。)。
  4. 如果不是四字指令,读取协议体到bb,调用zks.processConnectRequest开始握手,握手后设置initialized为true。
  5. 认证后,后续的协议包通过调用zks.processPacket处理。

四字指令只包含一个四字节长的协议头。主要用来获取ZooKeeper服务的当前状态及相关信息,每次服务器响应四字指令后都会关闭连接。
四字指令表代码(位于org.apache.zookeeper.server.command.FourLetterCommands)如下:

    static {
        cmd2String.put(confCmd, "conf");
        cmd2String.put(consCmd, "cons");
        cmd2String.put(crstCmd, "crst");
        cmd2String.put(dirsCmd, "dirs");
        cmd2String.put(dumpCmd, "dump");
        cmd2String.put(enviCmd, "envi");
        cmd2String.put(getTraceMaskCmd, "gtmk");
        cmd2String.put(ruokCmd, "ruok");
        cmd2String.put(setTraceMaskCmd, "stmk");
        cmd2String.put(srstCmd, "srst");
        cmd2String.put(srvrCmd, "srvr");
        cmd2String.put(statCmd, "stat");
        cmd2String.put(wchcCmd, "wchc");
        cmd2String.put(wchpCmd, "wchp");
        cmd2String.put(wchsCmd, "wchs");
        cmd2String.put(mntrCmd, "mntr");
        cmd2String.put(isroCmd, "isro");
        cmd2String.put(telnetCloseCmd, "telnet close");
        cmd2String.put(hashCmd, "hash");
    }

简介如下:

指令 描述
conf 获取服务相关配置的详细信息。
cons 获取所有连接到这台服务器的客户端的详细统计信息。
crst 重置所有连接到这台服务器的客户端的详细统计信息。
dirs 获取数据和日志的总大小。
dump 获取比较重要的会话和临时节点。如果在集群模式下这个命令只能在leader节点上用。
envi 获取出服务环境的详细信息。
gtmk 获取text trace mask。
ruok 测试服务是否处于正确状态,正确状态会返回字符串imok。
stmk 设置text trace mask。
srst 重置服务器的统计信息。
srvr 获取连接服务器的详细信息。
stat 获取关于性能和连接的客户端的列表。
wchc 获取服务器watch的详细信息。
wchp 获取服务器watch的详细信息,按路径分组排序输出。
wchs 获取服务器watch的摘要信息。
mntr 获取监控变量值。
isro 获取服务器是否是只读的服务器。
0xfff4fffd(telnet close) telnet关闭连接前会发送这个,因为是个负数,添加这个命令只是为了防止异常退出。
hash 获取最近的摘要(digest)日志。

忽略握手相关过程,我们来看ZookeeperServer的processPacket方法:

    public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        // We have the request, now process and setup for next
        InputStream bais = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
        RequestHeader h = new RequestHeader();
        h.deserialize(bia, "header");

        // Need to increase the outstanding request count first, otherwise
        // there might be a race condition that it enabled recv after
        // processing request and then disabled when check throttling.
        //
        // Be aware that we're actually checking the global outstanding
        // request before this request.
        //
        // It's fine if the IOException thrown before we decrease the count
        // in cnxn, since it will close the cnxn anyway.
        cnxn.incrOutstandingAndCheckThrottle(h);

        // Through the magic of byte buffers, txn will not be
        // pointing
        // to the start of the txn
        incomingBuffer = incomingBuffer.slice();
        if (h.getType() == OpCode.auth) {
            LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
            AuthPacket authPacket = new AuthPacket();
            ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
            String scheme = authPacket.getScheme();
            ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
            Code authReturn = KeeperException.Code.AUTHFAILED;
            if (ap != null) {
                try {
                    // handleAuthentication may close the connection, to allow the client to choose
                    // a different server to connect to.
                    authReturn = ap.handleAuthentication(
                        new ServerAuthenticationProvider.ServerObjs(this, cnxn),
                        authPacket.getAuth());
                } catch (RuntimeException e) {
                    LOG.warn("Caught runtime exception from AuthenticationProvider: {}", scheme, e);
                    authReturn = KeeperException.Code.AUTHFAILED;
                }
            }
            if (authReturn == KeeperException.Code.OK) {
                LOG.debug("Authentication succeeded for scheme: {}", scheme);
                LOG.info("auth success {}", cnxn.getRemoteSocketAddress());
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
                cnxn.sendResponse(rh, null, null);
            } else {
                if (ap == null) {
                    LOG.warn(
                        "No authentication provider for scheme: {} has {}",
                        scheme,
                        ProviderRegistry.listProviders());
                } else {
                    LOG.warn("Authentication failed for scheme: {}", scheme);
                }
                // send a response...
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.AUTHFAILED.intValue());
                cnxn.sendResponse(rh, null, null);
                // ... and close connection
                cnxn.sendBuffer(ServerCnxnFactory.closeConn);
                cnxn.disableRecv();
            }
            return;
        } else if (h.getType() == OpCode.sasl) {
            processSasl(incomingBuffer, cnxn, h);
        } else {
            if (shouldRequireClientSaslAuth() && !hasCnxSASLAuthenticated(cnxn)) {
                ReplyHeader replyHeader = new ReplyHeader(h.getXid(), 0, Code.SESSIONCLOSEDREQUIRESASLAUTH.intValue());
                cnxn.sendResponse(replyHeader, null, "response");
                cnxn.sendCloseSession();
                cnxn.disableRecv();
            } else {
                Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
                int length = incomingBuffer.limit();
                if (isLargeRequest(length)) {
                    // checkRequestSize will throw IOException if request is rejected
                    checkRequestSizeWhenMessageReceived(length);
                    si.setLargeRequestSize(length);
                }
                si.setOwner(ServerCnxn.me);
                submitRequest(si);
            }
        }
    }
  1. 首先从前面读出来的协议体里读出请求头,请求头包含两个部分xid和type,都是四字节的整数。
  2. 递增处理中请求计数器,并判断服务器是否积压了太多请求未完成的请求,如果积压了太多的请求,则暂停从客户端接收数据。
  3. 判断是否是认证请求,如果是认证请求,则开始认证并返回认证结果。
  4. 判断是否是SASL认证请求,如果是,则开始SASL认证并返回认证结果。
  5. 否则判断是否强制要求SASL认证且SASL认证认证未完成,如果是,关闭连接。
  6. 提交请求到请求队列(在大请求的情况下,如果积压了过多的大请求,会拒绝该请求。)。

不同请求处理的具体逻辑暂不分析,后面的文章可能会继续分析,我们来看Request的结构,找到org.apache.zookeeper.ZooDefs下的OpCode,这里存储了所有的操作码:

    public interface OpCode {
        int notification = 0; 
        int create = 1;
        int delete = 2;
        int exists = 3;
        int getData = 4;
        int setData = 5;
        int getACL = 6;
        int setACL = 7;
        int getChildren = 8;
        int sync = 9;
        int ping = 11;
        int getChildren2 = 12;
        int check = 13;
        int multi = 14;
        int create2 = 15;
        int reconfig = 16;
        int checkWatches = 17;
        int removeWatches = 18;
        int createContainer = 19;
        int deleteContainer = 20;
        int createTTL = 21;
        int multiRead = 22;
        int auth = 100;
        int setWatches = 101;
        int sasl = 102;
        int getEphemerals = 103;
        int getAllChildrenNumber = 104;
        int setWatches2 = 105;
        int addWatch = 106;
        int createSession = -10;
        int closeSession = -11;
        int error = -1;
    }

结合org.apache.zookeeper.proto下的请求实体类,我们可以总结出所有的request的相关信息(N/A代表只有响应或请求头,没有响应体或请求体。):

操作码 作用 参数 请求实体 响应实体
notification N/A N/A N/A N/A
create 创建节点 path,data,acl,flags CreateRequest CreateResponse
delete 删除节点 path,version DeleteRequest N/A
exists 判断节点是否存在,可选择在节点存在时监听该节点变化 path,watch ExistsRequest ExistsResponse
getData 获取节点存储的值,可选择在节点存在时监听该节点变化 path,watch GetDataRequest GetDataResponse
setData 设置节点存储的值 path,data,version SetDataRequest SetDataResponse
getACL 获取节点的访问权限控制列表 path GetACLRequest GetACLResponse
setACL 设置节点的访问权限控制列表 path,acl,version SetACLRequest SetACLResponse
getChildren 获取节点的子节点,可选择在节点存在时监听该节点子节点变化 path,watch GetChildrenRequest GetChildrenResponse
sync 集群模式下使用,与leader同步指定path的数据 path SyncRequest SyncResponse
ping 心跳包 N/A N/A N/A
getChildren2 获取节点的子节点,可选择在节点存在时监听该节点子节点变化,与getChildren相比,该操作码会返回path对应节点的详细状态信息 path,watch GetChildren2Request GetChildren2Response
check 判断version是否为最新版本,与multi配合使用 path,version CheckVersionRequest SetDataResponse
multi 将多个操作放到一个事务里处理 N/A MultiOperationRecord MultiResponse
create2 创建节点,与create相比,该操作码会返回创建后的节点的详细状态信息 path,data,acl,flags CreateRequest Create2Response
reconfig 集群模式下使用,触发集群的重新计票选举 joiningServers,leavingServers,newMembers,curConfigId ReconfigRequest GetDataResponse
checkWatches 检查当前连接是否在监听path对应节点的变化,type有三种任意变化、子节点变化和当前节点数据变化 path,type CheckWatchesRequest N/A
removeWatches 停止监听path对应的节点 path,type RemoveWatchesRequest N/A
createContainer 创建container节点 path,data,acl,flags CreateRequest Create2Response
deleteContainer 删除container节点 path DeleteRequest N/A
createTTL 创建有TTL时间的节点 path,data,acl,flags,ttl CreateTTLRequest Create2Response
multiRead 一次获取多个节点的值或者子节点 N/A MultiOperationRecord MultiResponse
auth 认证包 type,scheme,auth AuthPacket N/A
setWatches 监听节点 relativeZxid,dataWatches,existWatches,childWatches SetWatches N/A
sasl SASL认证 token GetSASLRequest SetSASLResponse
getEphemerals 获取创建的以prefixPath开头的临时节点 prefixPath GetEphemeralsRequest GetEphemeralsResponse
getAllChildrenNumber 获取path下子节点的数量 path GetAllChildrenNumberRequest GetAllChildrenNumberResponse
setWatches2 监听节点,与setWatches相比,增加了持久化监听类型 relativeZxid,dataWatches,existWatches,childWatches,persistentWatches,persistentRecursiveWatches SetWatches2 N/A
addWatch 为节点添加watch path,mode AddWatchRequest ErrorResponse
createSession 创建session protocolVersion,lastZxidSeen,timeOut,sessionId,passwd ConnectRequest ConnectResponse
closeSession 关闭session N/A N/A N/A
error 与multi一起使用,表示命令执行错误 N/A N/A N/A

Zookeeper的C/S通信协议没有采用http协议,而是使用TCP上的自定义协议。这样做的好处时相比http,通信效率更高,watch更加容易实现(长连接,全双工双向通信),缺点是每当命令参数改变时,都要引入新的命令。
最后我们总结下Zookeeper的C/S自定义TCP通信协议的包结构。
请求包(客户端到服务器,非四字指令):

协议头 请求头 请求体
4字节 8字节 协议头表示的长度 - 8

响应包(服务器到客户端):

协议头 响应头 响应体
4字节 16字节 协议头表示的长度 - 16

集合、数组和向量表示:

大小 元素1-n
4字节 n*元素大小

四字指令(客户端到服务器):

指令码 参数
4字节 0或8字节

四字指令响应(服务器到客户端):

响应内容
不定长度

相关文章

网友评论

    本文标题:ZooKeeper客户端服务器通信协议分析

    本文链接:https://www.haomeiwen.com/subject/lsdgyktx.html