Revisiting Pinpoint Collector (Part 1)

Author: 我是嘻哈大哥 | Published: 2019-06-21 20:22

The Collector's main job is to receive the data sent by Agents and store it in HBase. Because it is built on the Spring framework, the file to focus on is applicationContext-collector.xml.

This file wires up many beans. The key ones are described below.

1.tcpReceiver

    <bean id="tcpReceiver" class="com.navercorp.pinpoint.collector.receiver.tcp.AgentBaseDataReceiver" >
        <constructor-arg ref="baseDataReceiverConfig"/>     <!-- basic configuration -->
        <constructor-arg ref="tcpReceiverExecutor"/>        <!-- receiver executor -->
        <constructor-arg ref="acceptor"/>                   <!-- acceptor -->
        <constructor-arg ref="tcpDispatchHandlerWrapper"/>  <!-- dispatch handler -->
        <constructor-arg ref="clusterService"/>             <!-- cluster service -->
    </bean>

Basic configuration: defines the bind IP, the port, the worker thread count, the worker queue size, and whether monitoring is enabled.

    <bean id="baseDataReceiverConfig" class="com.navercorp.pinpoint.collector.config.AgentBaseDataReceiverConfiguration">
        <constructor-arg ref="pinpoint_collector_properties"/>
        <constructor-arg ref="deprecatedConfig"/>
    </bean>

Receiver executor: defines a thread-pool factory bean.

    <bean id="tcpReceiverExecutor" class="com.navercorp.pinpoint.common.server.util.ThreadPoolExecutorFactoryBean" >
        <property name="corePoolSize" value="#{baseDataReceiverConfig.workerThreadSize}"/>
        <property name="maxPoolSize" value="#{baseDataReceiverConfig.workerThreadSize}"/>
        <property name="queueCapacity" value="#{baseDataReceiverConfig.workerQueueSize}"/>
        <property name="threadNamePrefix" value="Pinpoint-AgentBaseDataReceiger-Worker"/>
        <property name="daemon" value="true"/>
        <property name="waitForTasksToCompleteOnShutdown" value="true"/>
        <property name="awaitTerminationSeconds" value="10"/>
        <property name="preStartAllCoreThreads" value="true"/>
    </bean>
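
For readers less familiar with Spring factory beans, the properties above map roughly onto a plain java.util.concurrent thread pool. The following is a minimal sketch under that assumption, not the actual ThreadPoolExecutorFactoryBean implementation; the class WorkerPoolSketch, its method names, and the sample sizes are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration; not part of Pinpoint.
class WorkerPoolSketch {

    static ThreadPoolExecutor newWorkerPool(int workerThreadSize, int workerQueueSize, String namePrefix) {
        final AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, namePrefix + "-" + counter.incrementAndGet());
            t.setDaemon(true); // corresponds to daemon = true
            return t;
        };
        // corePoolSize == maxPoolSize gives a fixed-size pool; the queue is bounded (queueCapacity)
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                workerThreadSize, workerThreadSize,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(workerQueueSize),
                factory);
        pool.prestartAllCoreThreads(); // corresponds to preStartAllCoreThreads = true
        return pool;
    }

    // Roughly what waitForTasksToCompleteOnShutdown + awaitTerminationSeconds ask for
    static boolean shutdownGracefully(ThreadPoolExecutor pool, long awaitSeconds) {
        pool.shutdown(); // stop accepting new tasks, let queued tasks finish
        try {
            return pool.awaitTermination(awaitSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because the core and maximum sizes are equal, the pool never grows beyond workerThreadSize; once the bounded queue fills up, further submissions are rejected rather than buffered without limit.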

Acceptor: creates the server using Netty.

    <bean id="acceptor" class="com.navercorp.pinpoint.rpc.server.PinpointServerAcceptor">
        <constructor-arg ref="channelFilter"/>
    </bean>

    --------------------------------------------------------------------------------------
    public PinpointServerAcceptor(ClusterOption clusterOption, ChannelFilter channelConnectedFilter, PipelineFactory pipelineFactory) {
        ServerBootstrap bootstrap = createBootStrap(1, WORKER_COUNT);
        setOptions(bootstrap);
        this.bootstrap = bootstrap;

        this.healthCheckTimer = TimerFactory.createHashedWheelTimer("PinpointServerSocket-HealthCheckTimer", 50, TimeUnit.MILLISECONDS, 512);
        this.healthCheckManager = new HealthCheckManager(healthCheckTimer, channelGroup);

        this.requestManagerTimer = TimerFactory.createHashedWheelTimer("PinpointServerSocket-RequestManager", 50, TimeUnit.MILLISECONDS, 512);

        this.clusterOption = clusterOption;
        this.channelConnectedFilter = Assert.requireNonNull(channelConnectedFilter, "channelConnectedFilter must not be null");

        this.pipelineFactory = Assert.requireNonNull(pipelineFactory, "pipelineFactory must not be null");
        addPipeline(bootstrap, pipelineFactory);
    }

TCP dispatch handler: selects a different handler to process the data, depending on the type of data received.

    <bean id="tcpDispatchHandlerWrapper" class="com.navercorp.pinpoint.collector.receiver.DispatchHandlerWrapper">
        <constructor-arg ref="tcpDispatchHandler"/>
        <constructor-arg ref="handlerManager"/>
    </bean>

Cluster service: configures the relevant parameters depending on whether clustering is enabled.

    <bean id="clusterService" class="com.navercorp.pinpoint.collector.cluster.zookeeper.ZookeeperClusterService">
        <constructor-arg ref="collectorConfiguration"/>
        <constructor-arg ref="clusterPointRouter"/>
    </bean>

Next, let's look closely at the AgentBaseDataReceiver class. When the bean is created, its constructor runs first:

    public AgentBaseDataReceiver(AgentBaseDataReceiverConfiguration configuration, Executor executor, PinpointServerAcceptor acceptor, DispatchHandler dispatchHandler, ZookeeperClusterService service) {
        this.configuration = Assert.requireNonNull(configuration, "config must not be null");
        this.executor = Objects.requireNonNull(executor, "executor must not be null");
        this.acceptor = Objects.requireNonNull(acceptor, "acceptor must not be null");

        this.tcpPacketHandler = wrapDispatchHandler(dispatchHandler);
        this.clusterService = service;
    }

    // Trace the following statement:
    -->  this.tcpPacketHandler = wrapDispatchHandler(dispatchHandler);
       -->  return tcpPacketHandlerFactory.build(dispatchHandler);
           -->return new DefaultTCPPacketHandler(dispatchHandler, serializerFactory, deserializerFactory);

    // As the trace shows, the handler instance that ends up being used is DefaultTCPPacketHandler.

Back in the acceptor (PinpointServerAcceptor), a handler named nettyChannelHandler is registered when Netty is initialized:

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline pipeline = pipelineFactory.newPipeline();
                pipeline.addLast("handler", nettyChannelHandler);

                return pipeline;
            }
        });

Tracing nettyChannelHandler through the code:

    // Definition of nettyChannelHandler
    private final PinpointServerChannelHandler nettyChannelHandler = new PinpointServerChannelHandler();
    // PinpointServerChannelHandler defines the channelConnected, channelDisconnected, and messageReceived events

The channelConnected event:

        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            final Channel channel = e.getChannel();  // get the current channel
            logger.info("channelConnected started. channel:{}", channel);

            if (released) {
                logger.warn("already released. channel:{}", channel);
                channel.write(new ServerClosePacket()).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        future.getChannel().close();
                    }
                });
                return;
            }

            final boolean accept = channelConnectedFilter.accept(channel);
            if (!accept) {
                logger.debug("channelConnected() channel discard. {}", channel);
                return;
            }

            DefaultPinpointServer pinpointServer = createPinpointServer(channel); // create a PinpointServer for this channel

            channel.setAttachment(pinpointServer); // attach the server to the channel
            channelGroup.add(channel);             // add the channel to the channel group

            pinpointServer.start();                // start the server

            super.channelConnected(ctx, e);
        }

The messageReceived event:

    pinpointServer.messageReceived(message);
    -->final short packetType = getPacketType(message); // get the packet type
       -->depending on the packet type, call handleSend, handleRequest, or handlePingPacket on DefaultTCPPacketHandler
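
The routing step in this trace can be pictured as a switch on the packet type code. The sketch below only illustrates the idea: the class PacketRouterSketch and the numeric type codes are hypothetical and are not Pinpoint's real constants; only the three handler method names come from the trace above.

```java
// Hypothetical illustration of routing by packet type; not Pinpoint code.
class PacketRouterSketch {

    // Assumed type codes for illustration only; Pinpoint defines its own values.
    static final short TYPE_SEND = 1;
    static final short TYPE_REQUEST = 2;
    static final short TYPE_PING = 3;

    // Returns the name of the handler method that would be called for this packet type
    static String route(short packetType) {
        switch (packetType) {
            case TYPE_SEND:
                return "handleSend";        // one-way message, no response expected
            case TYPE_REQUEST:
                return "handleRequest";     // message that expects a response
            case TYPE_PING:
                return "handlePingPacket";  // keep-alive message
            default:
                return "unsupported";       // unknown type codes are not routed
        }
    }
}
```

For example, `PacketRouterSketch.route(PacketRouterSketch.TYPE_REQUEST)` returns "handleRequest", while an unknown code falls through to "unsupported".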

Next, the method annotated with @PostConstruct runs:

    @PostConstruct
    public void start() {
        if (logger.isInfoEnabled()) {
            logger.info("start() started");
        }

        prepare(acceptor); // register the StateChangeEvent handler on the acceptor

        // take care when attaching message handlers as events are generated from the IO thread.
        // pass them to a separate queue and handle them in a different thread.
        acceptor.setMessageListener(new ServerMessageListener() { // message listener

            @Override
            public HandshakeResponseCode handleHandshake(Map properties) {  // handle handshake messages
                if (properties == null) {
                    return HandshakeResponseType.ProtocolError.PROTOCOL_ERROR;
                }

                boolean hasRequiredKeys = HandshakePropertyType.hasRequiredKeys(properties);
                if (!hasRequiredKeys) {
                    return HandshakeResponseType.PropertyError.PROPERTY_ERROR;
                }

                boolean supportServer = MapUtils.getBoolean(properties, HandshakePropertyType.SUPPORT_SERVER.getName(), true);
                if (supportServer) {
                    return HandshakeResponseType.Success.DUPLEX_COMMUNICATION;
                } else {
                    return HandshakeResponseType.Success.SIMPLEX_COMMUNICATION;
                }
            }

            @Override
            public void handleSend(SendPacket sendPacket, PinpointSocket pinpointSocket) { // handle send packets
                receive(sendPacket, pinpointSocket);
            }

            @Override
            public void handleRequest(RequestPacket requestPacket, PinpointSocket pinpointSocket) { // handle request packets
                requestResponse(requestPacket, pinpointSocket);
            }

            @Override
            public void handlePing(PingPayloadPacket pingPacket, PinpointServer pinpointServer) { // handle ping packets
                recordPing(pingPacket, pinpointServer);
            }
        });
        acceptor.bind(configuration.getBindIp(), configuration.getBindPort());

        if (logger.isInfoEnabled()) {
            logger.info("start() completed");
        }
    }
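
The handshake branch in the listener above is a small decision table, which can be sketched on its own. This is a minimal illustration and not Pinpoint code: the enum Result, the class HandshakeSketch, and the two required keys are hypothetical stand-ins for HandshakeResponseType and HandshakePropertyType, and the real set of required keys is defined by Pinpoint.

```java
import java.util.Map;

// Hypothetical stand-ins for Pinpoint's handshake types; for illustration only.
class HandshakeSketch {

    enum Result { PROTOCOL_ERROR, PROPERTY_ERROR, DUPLEX_COMMUNICATION, SIMPLEX_COMMUNICATION }

    // Assumed required keys; the real list lives in HandshakePropertyType.
    static final String[] REQUIRED_KEYS = {"agentId", "applicationName"};
    static final String SUPPORT_SERVER = "supportServer";

    static Result handleHandshake(Map<String, Object> properties) {
        if (properties == null) {
            return Result.PROTOCOL_ERROR;      // no handshake payload at all
        }
        for (String key : REQUIRED_KEYS) {
            if (!properties.containsKey(key)) {
                return Result.PROPERTY_ERROR;  // a mandatory property is missing
            }
        }
        // Falls back to true when the flag is absent or not a Boolean,
        // similar to the default of true passed to MapUtils.getBoolean in the listing
        Object flag = properties.get(SUPPORT_SERVER);
        boolean supportServer = !(flag instanceof Boolean) || (Boolean) flag;
        return supportServer ? Result.DUPLEX_COMMUNICATION : Result.SIMPLEX_COMMUNICATION;
    }
}
```

The point of the default is that an agent which does not state its capability is treated as able to receive messages from the server; only an explicit false downgrades the connection to simplex.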

2.spanUdpReceiver

    <bean id="spanUdpReceiver" class="com.navercorp.pinpoint.collector.receiver.UDPReceiverBean">
        <property name="bindIp" value="#{spanReceiverConfig.udpBindIp}"/>
        <property name="bindPort" value="#{spanReceiverConfig.udpBindPort}"/>
        <property name="addressFilter" ref="addressFilter"/>
        <property name="dispatchHandler" ref="spanDispatchHandlerWrapper"/>
        <property name="udpBufferSize" value="#{spanReceiverConfig.udpReceiveBufferSize}"/>
        <!-- TCP & UDP share threadpool for span -->
        <property name="executor" ref="spanReceiverExecutor"/>
        <property name="datagramPoolSize" value="#{ statReceiverConfig.workerQueueSize + statReceiverConfig.workerThreadSize }"/>
        <property name="enable" value="#{spanReceiverConfig.isUdpEnable()}"/>
    </bean>

The code trace is as follows:

    udpReceiver = createUdpReceiver(beanName, this.bindIp, bindPort, udpBufferSize, executor, dispatchHandler, addressFilter);
    -->return new UDPReceiver(name, packetHandlerFactory, executor, udpBufferSize, bindAddress, pool);
       --> this.socket = createSocket(receiverBufferSize);

    udpReceiver.start();
    -->receive(socket);
       -->final PooledObject<DatagramPacket> pooledPacket = read0(socket);
          --> socket.receive(packet);
       -->Runnable task = wrapTask(pooledPacket);
           --> packetHandler.receive(localSocket, packet);

From this trace we can see that spanUdpReceiver transfers data using BIO (blocking I/O).
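
The blocking pattern in this trace (a thread waits in socket.receive(), then hands the packet to a worker) can be reproduced with the JDK alone. The following is a minimal sketch under that assumption, not the UDPReceiver implementation; the class BlockingUdpSketch, its methods, the buffer size, and the "span" payload are hypothetical.

```java
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a blocking UDP receive; not Pinpoint code.
class BlockingUdpSketch {

    // Blocks in socket.receive() until a datagram arrives, then hands the payload to a worker thread
    static Future<String> receiveOnce(DatagramSocket socket, ExecutorService workers) throws IOException {
        DatagramPacket packet = new DatagramPacket(new byte[1024], 1024);
        socket.receive(packet); // blocking call: the I/O thread waits here
        final String payload = new String(packet.getData(), packet.getOffset(),
                packet.getLength(), StandardCharsets.UTF_8);
        return workers.submit(() -> "handled:" + payload); // processing happens off the I/O thread
    }

    // Loopback demo: sends one datagram to a local socket and returns the worker's result
    static String demo() {
        ExecutorService workers = Executors.newSingleThreadExecutor();
        try (DatagramSocket server = new DatagramSocket(0, InetAddress.getLoopbackAddress());
             DatagramSocket client = new DatagramSocket()) {
            server.setSoTimeout(2000); // avoid waiting forever if the datagram is lost
            byte[] data = "span".getBytes(StandardCharsets.UTF_8);
            client.send(new DatagramPacket(data, data.length,
                    InetAddress.getLoopbackAddress(), server.getLocalPort()));
            return receiveOnce(server, workers).get(2, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            workers.shutdownNow();
        }
    }
}
```

Separating the blocking read from the processing keeps the receiving thread free to return to socket.receive() quickly, which is the same division of labor the trace shows between read0 and wrapTask.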

3.spanTcpReceiver

    <bean id="spanTcpReceiver" class="com.navercorp.pinpoint.collector.receiver.TCPReceiverBean">
        <property name="bindIp" value="#{spanReceiverConfig.tcpBindIp}"/>
        <property name="bindPort" value="#{spanReceiverConfig.tcpBindPort}"/>
        <property name="addressFilter" ref="addressFilter"/>
        <property name="dispatchHandler" ref="spanDispatchHandlerWrapper"/>
        <!-- TCP & UDP share threadpool for span -->
        <property name="executor" ref="spanReceiverExecutor"/>
        <property name="enable" value="#{spanReceiverConfig.isTcpEnable()}"/>
    </bean>

The code trace is as follows:

    tcpReceiver = createTcpReceiver(beanName, this.bindIp, bindPort, executor, dispatchHandler, this.tcpPacketHandlerFactory, addressFilter);
    -->return new TCPReceiver(beanName, tcpPacketHandler, executor, bindAddress, addressFilter);

    tcpReceiver.start();
    -->final PinpointServerAcceptor acceptor = newAcceptor();
       -->PinpointServerAcceptor acceptor = new PinpointServerAcceptor(connectedFilter); // Netty-based I/O, similar to tcpReceiver above

From this trace we can see that spanTcpReceiver transfers data using NIO (non-blocking I/O) through Netty.


Article link: https://www.haomeiwen.com/subject/lkwjqctx.html