Collector主要是接收Agent发送过来的数据并将数据存储到Hbase中,因为采用了Spring框架,所以重点关注applicationContext-collector.xml这个文件。
这个文件中注入了很多的Bean,下面就关键的Bean进行介绍。
1.tcpReceiver
<bean id="tcpReceiver" class="com.navercorp.pinpoint.collector.receiver.tcp.AgentBaseDataReceiver" >
<constructor-arg ref="baseDataReceiverConfig"/> //基本配置
<constructor-arg ref="tcpReceiverExecutor"/> //接收执行器
<constructor-arg ref="acceptor"/> //接收器
<constructor-arg ref="tcpDispatchHandlerWrapper"/> //分发处理器
<constructor-arg ref="clusterService"/> //集群服务
</bean>
基本配置:主要是定义绑定的IP、端口、工作线程大小、队列大小、是否被监控
<bean id="baseDataReceiverConfig" class="com.navercorp.pinpoint.collector.config.AgentBaseDataReceiverConfiguration">
<constructor-arg ref="pinpoint_collector_properties"/>
<constructor-arg ref="deprecatedConfig"/>
</bean>
接收执行器:定义了一个线程池工厂Bean
<bean id="tcpReceiverExecutor" class="com.navercorp.pinpoint.common.server.util.ThreadPoolExecutorFactoryBean" >
<property name="corePoolSize" value="#{baseDataReceiverConfig.workerThreadSize}"/>
<property name="maxPoolSize" value="#{baseDataReceiverConfig.workerThreadSize}"/>
<property name="queueCapacity" value="#{baseDataReceiverConfig.workerQueueSize}"/>
<property name="threadNamePrefix" value="Pinpoint-AgentBaseDataReceiger-Worker"/>
<property name="daemon" value="true"/>
<property name="waitForTasksToCompleteOnShutdown" value="true"/>
<property name="awaitTerminationSeconds" value="10"/>
<property name="preStartAllCoreThreads" value="true"/>
</bean>
接收器:采用netty创建Server
<bean id="acceptor" class="com.navercorp.pinpoint.rpc.server.PinpointServerAcceptor">
<constructor-arg ref="channelFilter"/>
</bean>
--------------------------------------------------------------------------------------
public PinpointServerAcceptor(ClusterOption clusterOption, ChannelFilter channelConnectedFilter, PipelineFactory pipelineFactory) {
ServerBootstrap bootstrap = createBootStrap(1, WORKER_COUNT);
setOptions(bootstrap);
this.bootstrap = bootstrap;
this.healthCheckTimer = TimerFactory.createHashedWheelTimer("PinpointServerSocket-HealthCheckTimer", 50, TimeUnit.MILLISECONDS, 512);
this.healthCheckManager = new HealthCheckManager(healthCheckTimer, channelGroup);
this.requestManagerTimer = TimerFactory.createHashedWheelTimer("PinpointServerSocket-RequestManager", 50, TimeUnit.MILLISECONDS, 512);
this.clusterOption = clusterOption;
this.channelConnectedFilter = Assert.requireNonNull(channelConnectedFilter, "channelConnectedFilter must not be null");
this.pipelineFactory = Assert.requireNonNull(pipelineFactory, "pipelineFactory must not be null");
addPipeline(bootstrap, pipelineFactory);
}
tcp分发处理器:根据接收数据的的不同选择不同的Handler对数据进行处理
<bean id="tcpDispatchHandlerWrapper" class="com.navercorp.pinpoint.collector.receiver.DispatchHandlerWrapper">
<constructor-arg ref="tcpDispatchHandler"/>
<constructor-arg ref="handlerManager"/>
</bean>
集群服务:根据是否启动集群配置相关参数
<bean id="clusterService" class="com.navercorp.pinpoint.collector.cluster.zookeeper.ZookeeperClusterService">
<constructor-arg ref="collectorConfiguration"/>
<constructor-arg ref="clusterPointRouter"/>
</bean>
下面重点看一下AgentBaseDataReceiver这个类,在注入时首先执行构造函数:
public AgentBaseDataReceiver(AgentBaseDataReceiverConfiguration configuration, Executor executor, PinpointServerAcceptor acceptor, DispatchHandler dispatchHandler, ZookeeperClusterService service) {
this.configuration = Assert.requireNonNull(configuration, "config must not be null");
this.executor = Objects.requireNonNull(executor, "executor must not be null");
this.acceptor = Objects.requireNonNull(acceptor, "acceptor must not be null");
this.tcpPacketHandler = wrapDispatchHandler(dispatchHandler);
this.clusterService = service;
}
//跟踪一下以下语句:
--> this.tcpPacketHandler = wrapDispatchHandler(dispatchHandler);
--> return tcpPacketHandlerFactory.build(dispatchHandler);
-->return new DefaultTCPPacketHandler(dispatchHandler, serializerFactory, deserializerFactory);
//可以看出,最后使用的是DefaultTCPPacketHandler这个Handler实例
返回到Accepter中去,在初始化Netty时定义了一个handler:nettyChannelHandler
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipelineFactory.newPipeline();
pipeline.addLast("handler", nettyChannelHandler);
return pipeline;
}
});
代码跟踪一下nettyChannelHandler
//定义nettyChannelHandler
private final PinpointServerChannelHandler nettyChannelHandler = new PinpointServerChannelHandler();
//PinpointServerChannelHandler中定义了channelConnected、channelDisconnected、messageReceived事件
查看channelConnected事件:
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
final Channel channel = e.getChannel(); //获取当前channel
logger.info("channelConnected started. channel:{}", channel);
if (released) {
logger.warn("already released. channel:{}", channel);
channel.write(new ServerClosePacket()).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
future.getChannel().close();
}
});
return;
}
final boolean accept = channelConnectedFilter.accept(channel);
if (!accept) {
logger.debug("channelConnected() channel discard. {}", channel);
return;
}
DefaultPinpointServer pinpointServer = createPinpointServer(channel); //创建一个pinpointServer
channel.setAttachment(pinpointServer); //给channel设置server信息
channelGroup.add(channel); //channel添加到组中
pinpointServer.start(); //启动server
super.channelConnected(ctx, e);
}
查看messageReceived事件:
pinpointServer.messageReceived(message);
-->final short packetType = getPacketType(message); //获取包类型
-->根据包类型执行DefaultTCPPacketHandler下的handleSend、handleRequest、handlePingPacket
然后执行@PostConstruct注解下的方法:
@PostConstruct
public void start() {
if (logger.isInfoEnabled()) {
logger.info("start() started");
}
prepare(acceptor); //为接收器添加StateChangeEvent的Handler
// take care when attaching message handlers as events are generated from the IO thread.
// pass them to a separate queue and handle them in a different thread.
acceptor.setMessageListener(new ServerMessageListener() { //消息监听
@Override
public HandshakeResponseCode handleHandshake(Map properties) { //处理握手消息
if (properties == null) {
return HandshakeResponseType.ProtocolError.PROTOCOL_ERROR;
}
boolean hasRequiredKeys = HandshakePropertyType.hasRequiredKeys(properties);
if (!hasRequiredKeys) {
return HandshakeResponseType.PropertyError.PROPERTY_ERROR;
}
boolean supportServer = MapUtils.getBoolean(properties, HandshakePropertyType.SUPPORT_SERVER.getName(), true);
if (supportServer) {
return HandshakeResponseType.Success.DUPLEX_COMMUNICATION;
} else {
return HandshakeResponseType.Success.SIMPLEX_COMMUNICATION;
}
}
@Override
public void handleSend(SendPacket sendPacket, PinpointSocket pinpointSocket) {//处理发送消息
receive(sendPacket, pinpointSocket);
}
@Override
public void handleRequest(RequestPacket requestPacket, PinpointSocket pinpointSocket) {//处理请求消息
requestResponse(requestPacket, pinpointSocket);
}
@Override
public void handlePing(PingPayloadPacket pingPacket, PinpointServer pinpointServer) {//处理ping消息
recordPing(pingPacket, pinpointServer);
}
});
acceptor.bind(configuration.getBindIp(), configuration.getBindPort());
if (logger.isInfoEnabled()) {
logger.info("start() completed");
}
}
2.spanUdpReceiver
<bean id="spanUdpReceiver" class="com.navercorp.pinpoint.collector.receiver.UDPReceiverBean">
<property name="bindIp" value="#{spanReceiverConfig.udpBindIp}"/>
<property name="bindPort" value="#{spanReceiverConfig.udpBindPort}"/>
<property name="addressFilter" ref="addressFilter"/>
<property name="dispatchHandler" ref="spanDispatchHandlerWrapper"/>
<property name="udpBufferSize" value="#{spanReceiverConfig.udpReceiveBufferSize}"/>
<!-- TCP & UDP share threadpool for span -->
<property name="executor" ref="spanReceiverExecutor"/>
<property name="datagramPoolSize" value="#{ statReceiverConfig.workerQueueSize + statReceiverConfig.workerThreadSize }"/>
<property name="enable" value="#{spanReceiverConfig.isUdpEnable()}"/>
</bean>
跟踪代码如下:
udpReceiver = createUdpReceiver(beanName, this.bindIp, bindPort, udpBufferSize, executor, dispatchHandler, addressFilter);
-->return new UDPReceiver(name, packetHandlerFactory, executor, udpBufferSize, bindAddress, pool);
--> this.socket = createSocket(receiverBufferSize);
udpReceiver.start();
-->receive(socket);
-->final PooledObject<DatagramPacket> pooledPacket = read0(socket);
--> socket.receive(packet);
-->Runnable task = wrapTask(pooledPacket);
--> packetHandler.receive(localSocket, packet);
根据代码可以得知,spanUdpReceiver采用BIO方式进行数据的传输。
3.spanTcpReceiver
<bean id="spanTcpReceiver" class="com.navercorp.pinpoint.collector.receiver.TCPReceiverBean">
<property name="bindIp" value="#{spanReceiverConfig.tcpBindIp}"/>
<property name="bindPort" value="#{spanReceiverConfig.tcpBindPort}"/>
<property name="addressFilter" ref="addressFilter"/>
<property name="dispatchHandler" ref="spanDispatchHandlerWrapper"/>
<!-- TCP & UDP share threadpool for span -->
<property name="executor" ref="spanReceiverExecutor"/>
<property name="enable" value="#{spanReceiverConfig.isTcpEnable()}"/>
</bean>
跟踪代码如下:
tcpReceiver = createTcpReceiver(beanName, this.bindIp, bindPort, executor, dispatchHandler, this.tcpPacketHandlerFactory, addressFilter);
-->return new TCPReceiver(beanName, tcpPacketHandler, executor, bindAddress, addressFilter);
tcpReceiver.start();
-->final PinpointServerAcceptor acceptor = newAcceptor();
-->PinpointServerAcceptor acceptor = new PinpointServerAcceptor(connectedFilter); //netty类型的IO,类似上面的tcpReceiver
根据代码可以得知,spanTcpReceiver采用NIO方式进行数据的传输。













网友评论