I. Flow Diagram
II. DataXceiver Receives Requests and Processes Data
1. The run() method of the DataXceiver thread
DataXceiver extends the Receiver class: it calls readOp() to read the client's operation request and then dispatches the request via processOp().
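Before the run() source, a small self-contained sketch of the wire framing that readOp() consumes may help: each request starts with a 2-byte protocol version, followed by a 1-byte opcode, followed by the opcode-specific protobuf body. The constant values used here (version 28, WRITE_BLOCK = 80, READ_BLOCK = 81) are assumptions based on Hadoop 2.7 and are not part of the quoted source.
import java.io.DataInputStream;
import java.io.IOException;

// Simplified sketch, not the actual Hadoop Receiver code.
final class OpFrameSketch {
  // Assumed to match DataTransferProtocol.DATA_TRANSFER_VERSION in Hadoop 2.7.
  static final short DATA_TRANSFER_VERSION = 28;

  // Roughly what readOp() does: read and check the 2-byte version, then the opcode.
  static int readOpCode(DataInputStream in) throws IOException {
    short version = in.readShort();
    if (version != DATA_TRANSFER_VERSION) {
      throw new IOException("Version mismatch: expected "
          + DATA_TRANSFER_VERSION + ", got " + version);
    }
    // e.g. 80 = WRITE_BLOCK, 81 = READ_BLOCK (assumed opcode values);
    // processOp() then parses the opcode-specific protobuf from the same stream.
    return in.readUnsignedByte();
  }
}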
/**
* Read/write data from/to the DataXceiverServer.
*/
@Override
public void run() {
int opsProcessed = 0;
Op op = null;
try {
dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
InputStream input = socketIn;
try {
IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
socketIn, datanode.getXferAddress().getPort(),
datanode.getDatanodeId());
input = new BufferedInputStream(saslStreams.in,
HdfsConstants.SMALL_BUFFER_SIZE);
socketOut = saslStreams.out;
} catch (InvalidMagicNumberException imne) {
if (imne.isHandshake4Encryption()) {
LOG.info("Failed to read expected encryption handshake from client " +
"at " + peer.getRemoteAddressString() + ". Perhaps the client " +
"is running an older version of Hadoop which does not support " +
"encryption");
} else {
LOG.info("Failed to read expected SASL data transfer protection " +
"handshake from client at " + peer.getRemoteAddressString() +
". Perhaps the client is running an older version of Hadoop " +
"which does not support SASL data transfer protection");
}
return;
}
super.initialize(new DataInputStream(input));
// We process requests in a loop, and stay around for a short timeout.
// This optimistic behaviour allows the other end to reuse connections.
// Setting keepalive timeout to 0 disables this behavior.
do {
updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));
try {
if (opsProcessed != 0) {
assert dnConf.socketKeepaliveTimeout > 0;
peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
} else {
peer.setReadTimeout(dnConf.socketTimeout);
}
//read the client's operation request
op = readOp();
} catch (InterruptedIOException ignored) {
// Time out while we wait for client rpc
break;
} catch (IOException err) {
// Since we optimistically expect the next op, it's quite normal to get EOF here.
if (opsProcessed > 0 &&
(err instanceof EOFException || err instanceof ClosedChannelException)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cached " + peer + " closing after " + opsProcessed + " ops");
}
} else {
incrDatanodeNetworkErrors();
throw err;
}
break;
}
// restore normal timeout
if (opsProcessed != 0) {
peer.setReadTimeout(dnConf.socketTimeout);
}
opStartTime = monotonicNow();
//process the client's operation request
processOp(op);
++opsProcessed;
} while ((peer != null) &&
(!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
} catch (Throwable t) {
String s = datanode.getDisplayName() + ":DataXceiver error processing "
+ ((op == null) ? "unknown" : op.name()) + " operation "
+ " src: " + remoteAddress + " dst: " + localAddress;
if (op == Op.WRITE_BLOCK && t instanceof ReplicaAlreadyExistsException) {
// For WRITE_BLOCK, it is okay if the replica already exists since
// client and replication may write the same block to the same datanode
// at the same time.
if (LOG.isTraceEnabled()) {
LOG.trace(s, t);
} else {
LOG.info(s + "; " + t);
}
} else if (op == Op.READ_BLOCK && t instanceof SocketTimeoutException) {
String s1 =
"Likely the client has stopped reading, disconnecting it";
s1 += " (" + s + ")";
if (LOG.isTraceEnabled()) {
LOG.trace(s1, t);
} else {
LOG.info(s1 + "; " + t);
}
} else {
LOG.error(s, t);
}
} finally {
if (LOG.isDebugEnabled()) {
LOG.debug(datanode.getDisplayName() + ":Number of active connections is: "
+ datanode.getXceiverCount());
}
updateCurrentThreadName("Cleaning up");
if (peer != null) {
dataXceiverServer.closePeer(peer);
IOUtils.closeStream(in);
}
}
}
We focus here on the write operation, opWriteBlock.
/** Process op by the corresponding method. */
protected final void processOp(Op op) throws IOException {
switch(op) {
case READ_BLOCK:
//read a block
opReadBlock();
break;
case WRITE_BLOCK:
opWriteBlock(in);
break;
case REPLACE_BLOCK:
opReplaceBlock(in);
break;
case COPY_BLOCK:
opCopyBlock(in);
break;
case BLOCK_CHECKSUM:
opBlockChecksum(in);
break;
case TRANSFER_BLOCK:
opTransferBlock(in);
break;
case REQUEST_SHORT_CIRCUIT_FDS:
opRequestShortCircuitFds(in);
break;
case RELEASE_SHORT_CIRCUIT_FDS:
opReleaseShortCircuitFds(in);
break;
case REQUEST_SHORT_CIRCUIT_SHM:
opRequestShortCircuitShm(in);
break;
default:
throw new IOException("Unknown op " + op + " in data stream");
}
}
2. Receiver.opWriteBlock
DataXceiver invokes the opWriteBlock() method of its parent class Receiver, which parses the OpWriteBlockProto from the stream and calls writeBlock().
/** Receive OP_WRITE_BLOCK */
private void opWriteBlock(DataInputStream in) throws IOException {
final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
final DatanodeInfo[] targets = PBHelper.convert(proto.getTargetsList());
TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName());
try {
writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelper.convertStorageType(proto.getStorageType()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(),
targets,
PBHelper.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
PBHelper.convert(proto.getSource()),
fromProto(proto.getStage()),
proto.getPipelineSize(),
proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
proto.getLatestGenerationStamp(),
fromProto(proto.getRequestedChecksum()),
(proto.hasCachingStrategy() ?
getCachingStrategy(proto.getCachingStrategy()) :
CachingStrategy.newDefaultStrategy()),
(proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),
(proto.hasPinning() ? proto.getPinning(): false),
(PBHelper.convertBooleanList(proto.getTargetPinningsList())));
} finally {
if (traceScope != null) traceScope.close();
}
}
3. DataXceiver.writeBlock
This method implements the main write-path flow:
- Send the write request to the downstream node and wait for its connect ack (mirrorInStatus).
- If the downstream ack indicates success, forward this connect ack (mirrorInStatus) back to the upstream node.
- Then call blockReceiver.receiveBlock() to receive the block data from the upstream node, mirror it to the downstream node, receive packet acks from downstream, and forward those acks to the upstream node (a minimal sketch of the connect-ack relay follows this list).
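The connect-ack relay can be condensed into the following sketch. It assumes hadoop-hdfs 2.7.x on the classpath and is a simplification of the mirrorIn/replyOut handling shown in writeBlock below, not a drop-in replacement for it.
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;

// Minimal sketch of the connect-ack relay: read the downstream node's ack and
// forward its status and firstBadLink to the upstream node.
final class ConnectAckRelaySketch {
  static void relayConnectAck(DataInputStream mirrorIn, DataOutputStream replyOut)
      throws IOException {
    // Parse the vint-delimited BlockOpResponseProto sent by the downstream DataNode.
    BlockOpResponseProto connectAck =
        BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
    // Re-encode the same status / firstBadLink and pass it on to the upstream node.
    BlockOpResponseProto.newBuilder()
        .setStatus(connectAck.getStatus())
        .setFirstBadLink(connectAck.getFirstBadLink())
        .build()
        .writeDelimitedTo(replyOut);
    replyOut.flush();
  }
}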
@Override
public void writeBlock(final ExtendedBlock block,
final StorageType storageType,
final Token<BlockTokenIdentifier> blockToken,
final String clientname,
final DatanodeInfo[] targets,
final StorageType[] targetStorageTypes,
final DatanodeInfo srcDataNode,
final BlockConstructionStage stage,
final int pipelineSize,
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp,
DataChecksum requestedChecksum,
CachingStrategy cachingStrategy,
final boolean allowLazyPersist,
final boolean pinning,
final boolean[] targetPinnings) throws IOException {
previousOpClientName = clientname;
updateCurrentThreadName("Receiving block " + block);
//isDatanode: true if the request comes from another DataNode (clientname is empty); false if it was initiated by a DFSClient
final boolean isDatanode = clientname.length() == 0;
//isClient: true if the write was initiated by a client (DFSClient); false if it came from a DataNode
final boolean isClient = !isDatanode;
//whether this write is a block transfer (TRANSFER_RBW / TRANSFER_FINALIZED)
final boolean isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
|| stage == BlockConstructionStage.TRANSFER_FINALIZED;
long size = 0;
// check single target for transfer-RBW/Finalized
if (isTransfer && targets.length > 0) {
throw new IOException(stage + " does not support multiple targets "
+ Arrays.asList(targets));
}
if (LOG.isDebugEnabled()) {
LOG.debug("opWriteBlock: stage=" + stage + ", clientname=" + clientname
+ "\n block =" + block + ", newGs=" + latestGenerationStamp
+ ", bytesRcvd=[" + minBytesRcvd + ", " + maxBytesRcvd + "]"
+ "\n targets=" + Arrays.asList(targets)
+ "; pipelineSize=" + pipelineSize + ", srcDataNode=" + srcDataNode
+ ", pinning=" + pinning);
LOG.debug("isDatanode=" + isDatanode
+ ", isClient=" + isClient
+ ", isTransfer=" + isTransfer);
LOG.debug("writeBlock receive buf size " + peer.getReceiveBufferSize() +
" tcp no delay " + peer.getTcpNoDelay());
}
// We later mutate block's generation stamp and length, but we need to
// forward the original version of the block to downstream mirrors, so
// make a copy here.
final ExtendedBlock originalBlock = new ExtendedBlock(block);
if (block.getNumBytes() == 0) {
block.setNumBytes(dataXceiverServer.estimateBlockSize);
}
LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: "
+ localAddress);
// reply to upstream datanode or client
//output stream used to reply to the upstream DataNode or client
final DataOutputStream replyOut = new DataOutputStream(
new BufferedOutputStream(
getOutputStream(),
HdfsConstants.SMALL_BUFFER_SIZE));
checkAccess(replyOut, isClient, block, blockToken,
Op.WRITE_BLOCK, BlockTokenSecretManager.AccessMode.WRITE);
/**
 * From dn2's point of view:
 *   input from upstream: in            output to downstream: mirrorOut
 * upstream dn1 ----------------> dn2 ----------------> downstream dn3 (mirrorNode)
 *              <----------------     <----------------
 *   output to upstream: replyOut      input from downstream: mirrorIn
 */
//output stream to the downstream DataNode, used to write data downstream
DataOutputStream mirrorOut = null; // stream to next target
//input stream from the downstream DataNode, used to receive its replies
DataInputStream mirrorIn = null; // reply from next target
//socket to the downstream DataNode
Socket mirrorSock = null; // socket to next target
//address and port of the downstream DataNode
String mirrorNode = null; // the name:port of next target
//the first DataNode that failed during pipeline setup
String firstBadLink = ""; // first datanode that failed in connection setup
Status mirrorInStatus = SUCCESS;
//ID of the DataNode storage that holds this block
final String storageUuid;
try {
//open a BlockReceiver unless this is a client write recovering a failed close (PIPELINE_CLOSE_RECOVERY)
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// open a block receiver
//receive the block from the upstream node; in dn1->dn2->dn3, dn1 is the first upstream node
blockReceiver = new BlockReceiver(block, storageType, in,
peer.getRemoteAddressString(),
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy, allowLazyPersist, pinning);
storageUuid = blockReceiver.getStorageUuid();
} else {
storageUuid = datanode.data.recoverClose(
block, latestGenerationStamp, minBytesRcvd);
}
//
// Connect to downstream machine, if appropriate
//connect to the downstream node in the pipeline
if (targets.length > 0) {
InetSocketAddress mirrorTarget = null;
// Connect to backup machine
//the downstream DataNode
mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to datanode " + mirrorNode);
}
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
mirrorSock = datanode.newSocket();
try {
int timeoutValue = dnConf.socketTimeout
+ (HdfsServerConstants.READ_TIMEOUT_EXTENSION * targets.length);
int writeTimeout = dnConf.socketWriteTimeout +
(HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * targets.length);
//create the socket connection to the downstream DataNode
NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
mirrorSock.setSoTimeout(timeoutValue);
mirrorSock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);
//wrap the socket as an output stream
OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock,
writeTimeout);
//wrap the socket as an input stream
InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
DataEncryptionKeyFactory keyFactory =
datanode.getDataEncryptionKeyFactoryForBlock(block);
IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock,
unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]);
unbufMirrorOut = saslStreams.out;
unbufMirrorIn = saslStreams.in;
//create the output stream to the downstream node, for writing data downstream
mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut,
HdfsConstants.SMALL_BUFFER_SIZE));
//create the input stream from the downstream node, for receiving its replies
mirrorIn = new DataInputStream(unbufMirrorIn);
// Do not propagate allowLazyPersist to downstream DataNodes.
//forward the write request to the downstream node over the socket
//targetPinnings (passed in from the client's DataStreamer) indicates whether the target DataNodes are pinned
if (targetPinnings != null && targetPinnings.length > 0) {
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes, srcDataNode,
stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
latestGenerationStamp, requestedChecksum, cachingStrategy,
false, targetPinnings[0], targetPinnings);
} else {
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes, srcDataNode,
stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
latestGenerationStamp, requestedChecksum, cachingStrategy,
false, false, targetPinnings);
}
mirrorOut.flush();
DataNodeFaultInjector.get().writeBlockAfterFlush();
// read connect ack (only for clients, not for replication req)
//only when the write was initiated by a client
if (isClient) {
//read the downstream node's connect ack for pipeline setup
BlockOpResponseProto connectAck =
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
mirrorInStatus = connectAck.getStatus();
firstBadLink = connectAck.getFirstBadLink();
if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
LOG.info("Datanode " + targets.length +
" got response for connect ack " +
" from downstream datanode with firstbadlink as " +
firstBadLink);
}
}
} catch (IOException e) {
//for a client-initiated write, report the failure to the upstream node by marking this target as the first bad link
if (isClient) {
BlockOpResponseProto.newBuilder()
.setStatus(ERROR)
// NB: Unconditionally using the xfer addr w/o hostname
.setFirstBadLink(targets[0].getXferAddr())
.build()
.writeDelimitedTo(replyOut);
replyOut.flush();
}
//close all streams to the downstream node
IOUtils.closeStream(mirrorOut);
mirrorOut = null;
IOUtils.closeStream(mirrorIn);
mirrorIn = null;
IOUtils.closeSocket(mirrorSock);
mirrorSock = null;
if (isClient) {
LOG.error(datanode + ":Exception transfering block " +
block + " to mirror " + mirrorNode + ": " + e);
throw e;
} else {
LOG.info(datanode + ":Exception transfering " +
block + " to mirror " + mirrorNode +
"- continuing without the mirror", e);
incrDatanodeNetworkErrors();
}
}
}
// send connect-ack to source for clients and not transfer-RBW/Finalized
//send the connect ack back to the upstream node
if (isClient && !isTransfer) {
if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
LOG.info("Datanode " + targets.length +
" forwarding connect ack to upstream firstbadlink is " +
firstBadLink);
}
//reply to the upstream node with the pipeline-setup status
BlockOpResponseProto.newBuilder()
.setStatus(mirrorInStatus)
.setFirstBadLink(firstBadLink)
.build()
.writeDelimitedTo(replyOut);
replyOut.flush();
}
// receive the block and mirror to the next target
if (blockReceiver != null) {
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
//receive block data from the upstream node and mirror it to the downstream node;
//at the same time receive acks from downstream and forward them to the upstream node.
//Once the pipeline is established, packets start flowing through it.
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
mirrorAddr, null, targets, false);
// send close-ack for transfer-RBW/Finalized
//for a transfer (TRANSFER_RBW/Finalized) there is no downstream pipeline, so send the close-ack directly
if (isTransfer) {
if (LOG.isTraceEnabled()) {
LOG.trace("TRANSFER: send close-ack");
}
writeResponse(SUCCESS, null, replyOut);
}
}
// update its generation stamp
//update the replica's generation stamp and length
if (isClient &&
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
block.setGenerationStamp(latestGenerationStamp);
block.setNumBytes(minBytesRcvd);
}
// if this write is for a replication request or recovering
// a failed close for client, then confirm block. For other client-writes,
// the block is finalized in the PacketResponder.
//notify the NameNode that this DataNode has successfully received the block
if (isDatanode ||
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT, storageUuid);
LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
+ localAddress + " of size " + block.getNumBytes());
}
if(isClient) {
size = block.getNumBytes();
}
} catch (IOException ioe) {
LOG.info("opWriteBlock " + block + " received exception " + ioe);
incrDatanodeNetworkErrors();
throw ioe;
} finally {
// close all opened streams
//close all upstream and downstream streams
IOUtils.closeStream(mirrorOut);
IOUtils.closeStream(mirrorIn);
IOUtils.closeStream(replyOut);
IOUtils.closeSocket(mirrorSock);
IOUtils.closeStream(blockReceiver);
blockReceiver = null;
}
//update metrics
datanode.metrics.addWriteBlockOp(elapsed());
datanode.metrics.incrWritesFromClient(peer.isLocal(), size);
}
4. BlockReceiver.receiveBlock
- Start the PacketResponder thread, which sends ack messages upstream (on the client side the ResponseProcessor thread receives them).
- Loop on receivePacket() until the last packet has been received, then close the PacketResponder thread (a toy sketch of this receiver/responder pattern follows this list).
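The receiver-loop / responder-thread interplay can be illustrated with a toy, self-contained sketch in plain Java (no Hadoop classes); the names ReceiverResponderSketch and ackQueue are made up for illustration only.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy sketch, not Hadoop code: the receiver enqueues each packet's seqno after it is
// written; the responder daemon dequeues the seqno and acknowledges it upstream.
public class ReceiverResponderSketch {
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Long> ackQueue = new LinkedBlockingQueue<>();
    final long lastSeqno = 5;

    Thread responder = new Thread(() -> {
      try {
        long seqno;
        do {
          seqno = ackQueue.take();          // wait for the next finished packet
          System.out.println("ack upstream, seqno=" + seqno);
        } while (seqno != lastSeqno);       // stop after acking the last packet
      } catch (InterruptedException ignored) { }
    });
    responder.setDaemon(true);
    responder.start();

    for (long seqno = 0; seqno <= lastSeqno; seqno++) {
      // "receivePacket()": write data/checksum, then queue the seqno for acking.
      ackQueue.put(seqno);
    }
    responder.join();                       // graceful shutdown, like close()
  }
}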
void receiveBlock(
DataOutputStream mirrOut, // output to next datanode
DataInputStream mirrIn, // input from next datanode
DataOutputStream replyOut, // output to previous datanode
String mirrAddr, DataTransferThrottler throttlerArg,
DatanodeInfo[] downstreams,
boolean isReplaceBlock) throws IOException {
syncOnClose = datanode.getDnConf().syncOnClose;
boolean responderClosed = false;
mirrorOut = mirrOut;
mirrorAddr = mirrAddr;
throttler = throttlerArg;
this.replyOut = replyOut;
this.isReplaceBlock = isReplaceBlock;
try {
//for a client-initiated write (and not a transfer), start the PacketResponder daemon to send acks; the client's ResponseProcessor thread receives them
if (isClient && !isTransfer) {
responder = new Daemon(datanode.threadGroup,
new PacketResponder(replyOut, mirrIn, downstreams));
responder.start(); // start thread to processes responses
}
//receive packets until the last one arrives
while (receivePacket() >= 0) { /* Receive until the last packet */ }
// wait for all outstanding packet responses. And then
// indicate responder to gracefully shutdown.
// Mark that responder has been closed for future processing
// all packets have been received; wait for all acks to be sent, then close the responder
if (responder != null) {
((PacketResponder)responder.getRunnable()).close();
responderClosed = true;
}
// If this write is for a replication or transfer-RBW/Finalized,
// then finalize block or convert temporary to RBW.
// For client-writes, the block is finalized in the PacketResponder.
if (isDatanode || isTransfer) {
// Hold a volume reference to finalize block.
try (ReplicaHandler handler = claimReplicaHandler()) {
// close the block/crc files
close();
block.setNumBytes(replicaInfo.getNumBytes());
if (stage == BlockConstructionStage.TRANSFER_RBW) {
// for TRANSFER_RBW, convert temporary to RBW
datanode.data.convertTemporaryToRbw(block);
} else {
// for isDatanode or TRANSFER_FINALIZED
// Finalize the block.
datanode.data.finalizeBlock(block);
}
}
datanode.metrics.incrBlocksWritten();
}
} catch (IOException ioe) {
replicaInfo.releaseAllBytesReserved();
if (datanode.isRestarting()) {
// Do not throw if shutting down for restart. Otherwise, it will cause
// premature termination of responder.
LOG.info("Shutting down for restart (" + block + ").");
} else {
LOG.info("Exception for " + block, ioe);
throw ioe;
}
} finally {
// Clear the previous interrupt state of this thread.
Thread.interrupted();
// If a shutdown for restart was initiated, upstream needs to be notified.
// There is no need to do anything special if the responder was closed
// normally.
if (!responderClosed) { // Data transfer was not complete.
if (responder != null) {
// In case this datanode is shutting down for quick restart,
// send a special ack upstream.
if (datanode.isRestarting() && isClient && !isTransfer) {
File blockFile = ((ReplicaInPipeline)replicaInfo).getBlockFile();
File restartMeta = new File(blockFile.getParent() +
File.pathSeparator + "." + blockFile.getName() + ".restart");
if (restartMeta.exists() && !restartMeta.delete()) {
LOG.warn("Failed to delete restart meta file: " +
restartMeta.getPath());
}
try (Writer out = new OutputStreamWriter(
new FileOutputStream(restartMeta), "UTF-8")) {
// write out the current time.
out.write(Long.toString(Time.now() + restartBudget));
out.flush();
} catch (IOException ioe) {
// The worst case is not recovering this RBW replica.
// Client will fall back to regular pipeline recovery.
} finally {
IOUtils.cleanup(LOG, out);
}
try {
// Even if the connection is closed after the ack packet is
// flushed, the client can react to the connection closure
// first. Insert a delay to lower the chance of client
// missing the OOB ack.
Thread.sleep(1000);
} catch (InterruptedException ie) {
// It is already going down. Ignore this.
}
}
responder.interrupt();
}
IOUtils.closeStream(this);
cleanupBlock();
}
if (responder != null) {
try {
responder.interrupt();
// join() on the responder should timeout a bit earlier than the
// configured deadline. Otherwise, the join() on this thread will
// likely timeout as well.
long joinTimeout = datanode.getDnConf().getXceiverStopTimeout();
joinTimeout = joinTimeout > 1 ? joinTimeout*8/10 : joinTimeout;
responder.join(joinTimeout);
if (responder.isAlive()) {
String msg = "Join on responder thread " + responder
+ " timed out";
LOG.warn(msg + "\n" + StringUtils.getStackTrace(responder));
throw new IOException(msg);
}
} catch (InterruptedException e) {
responder.interrupt();
// do not throw if shutting down for restart.
if (!datanode.isRestarting()) {
throw new IOException("Interrupted receiveBlock");
}
}
responder = null;
}
}
}
5. BlockReceiver.receivePacket: writing the data
- packetReceiver.receiveNextPacket(in) reads the next packet.
- If it is the last packet of the block or an empty packet, there is no data to write; the block is synced via flushOrSync(true) if a sync was requested.
- Otherwise the checksums are verified (when required), the data and checksums are written to disk, and the OS page cache is trimmed via manageWriterOsCache.
- The packet is acknowledged after the data has been persisted, except that a SUCCESS ack may be enqueued early when neither an immediate sync nor local checksum verification is required. (A toy checksum-verification sketch follows this list.)
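The chunk-level verification done by verifyChunks() can be illustrated with the following toy sketch. It uses java.util.zip.CRC32 as a stand-in for HDFS's default CRC32C DataChecksum, and the 512-byte chunk size mirrors the dfs.bytes-per-checksum default; the class and constant names are invented for illustration.
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Toy sketch, not Hadoop's DataChecksum: each bytesPerChecksum-sized chunk of the
// packet's data buffer must match the corresponding 4-byte checksum in the checksum
// buffer; a mismatch would surface upstream as Status.ERROR_CHECKSUM.
final class ChunkChecksumSketch {
  static final int BYTES_PER_CHECKSUM = 512;   // dfs.bytes-per-checksum default
  static final int CHECKSUM_SIZE = 4;          // width of one CRC value on disk

  static void verifyChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf) {
    CRC32 crc = new CRC32();
    byte[] chunk = new byte[BYTES_PER_CHECKSUM];
    while (dataBuf.hasRemaining()) {
      int len = Math.min(BYTES_PER_CHECKSUM, dataBuf.remaining());
      dataBuf.get(chunk, 0, len);
      crc.reset();
      crc.update(chunk, 0, len);
      int expected = checksumBuf.getInt();     // 4 bytes per chunk, big-endian
      if ((int) crc.getValue() != expected) {
        throw new IllegalStateException("checksum error in current chunk");
      }
    }
  }
}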
/**
* Receives and processes a packet. It can contain many chunks.
* returns the number of data bytes that the packet has.
*/
private int receivePacket() throws IOException {
// read the next packet
packetReceiver.receiveNextPacket(in);
PacketHeader header = packetReceiver.getHeader();
if (LOG.isDebugEnabled()){
LOG.debug("Receiving one packet for block " + block +
": " + header);
}
// Sanity check the header
if (header.getOffsetInBlock() > replicaInfo.getNumBytes()) {
throw new IOException("Received an out-of-sequence packet for " + block +
"from " + inAddr + " at offset " + header.getOffsetInBlock() +
". Expecting packet starting at " + replicaInfo.getNumBytes());
}
if (header.getDataLen() < 0) {
throw new IOException("Got wrong length during writeBlock(" + block +
") from " + inAddr + " at offset " +
header.getOffsetInBlock() + ": " +
header.getDataLen());
}
long offsetInBlock = header.getOffsetInBlock();
long seqno = header.getSeqno();
boolean lastPacketInBlock = header.isLastPacketInBlock();
final int len = header.getDataLen();
boolean syncBlock = header.getSyncBlock();
// avoid double sync'ing on close
if (syncBlock && lastPacketInBlock) {
this.syncOnClose = false;
}
// update received bytes
final long firstByteInBlock = offsetInBlock;
offsetInBlock += len;
if (replicaInfo.getNumBytes() < offsetInBlock) {
replicaInfo.setNumBytes(offsetInBlock);
}
// put in queue for pending acks, unless sync was requested
// if neither an immediate sync nor local checksum verification is required, a SUCCESS ack can be enqueued right away, before the data is verified and written to disk
if (responder != null && !syncBlock && !shouldVerifyChecksum()) {
((PacketResponder) responder.getRunnable()).enqueue(seqno,
lastPacketInBlock, offsetInBlock, Status.SUCCESS);
}
//First write the packet to the mirror:
if (mirrorOut != null && !mirrorError) {
try {
long begin = Time.monotonicNow();
//forward the packet to the downstream node
packetReceiver.mirrorPacketTo(mirrorOut);
mirrorOut.flush();
long duration = Time.monotonicNow() - begin;
if (duration > datanodeSlowLogThresholdMs) {
LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
}
} catch (IOException e) {
handleMirrorOutError(e);
}
}
ByteBuffer dataBuf = packetReceiver.getDataSlice();
ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();
//if this is the last packet of the block or an empty packet, just sync if requested
if (lastPacketInBlock || len == 0) {
if(LOG.isDebugEnabled()) {
LOG.debug("Receiving an empty packet or the end of the block " + block);
}
// sync block if requested
if (syncBlock) {
flushOrSync(true);
}
} else {
final int checksumLen = diskChecksum.getChecksumSize(len);
final int checksumReceivedLen = checksumBuf.capacity();
if (checksumReceivedLen > 0 && checksumReceivedLen != checksumLen) {
throw new IOException("Invalid checksum length: received length is "
+ checksumReceivedLen + " but expected length is " + checksumLen);
}
//verify the checksum when shouldVerifyChecksum() says so (typically when this DataNode is the last node in the pipeline)
if (checksumReceivedLen > 0 && shouldVerifyChecksum()) {
try {
//verify the data chunks against their checksums
verifyChunks(dataBuf, checksumBuf);
} catch (IOException ioe) {
// checksum error detected locally. there is no reason to continue.
//on checksum failure, send ERROR_CHECKSUM upstream
if (responder != null) {
try {
((PacketResponder) responder.getRunnable()).enqueue(seqno,
lastPacketInBlock, offsetInBlock,
Status.ERROR_CHECKSUM);
// Wait until the responder sends back the response
// and interrupt this thread.
Thread.sleep(3000);
} catch (InterruptedException e) { }
}
throw new IOException("Terminating due to a checksum error." + ioe);
}
if (needsChecksumTranslation) {
// overwrite the checksums in the packet buffer with the
// appropriate polynomial for the disk storage.
translateChunks(dataBuf, checksumBuf);
}
}
if (checksumReceivedLen == 0 && !streams.isTransientStorage()) {
// checksum is missing, need to calculate it
checksumBuf = ByteBuffer.allocate(checksumLen);
diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
}
// by this point, the data in the buffer uses the disk checksum
final boolean shouldNotWriteChecksum = checksumReceivedLen == 0
&& streams.isTransientStorage();
try {
long onDiskLen = replicaInfo.getBytesOnDisk();
if (onDiskLen<offsetInBlock) {
// Normally the beginning of an incoming packet is aligned with the
// existing data on disk. If the beginning packet data offset is not
// checksum chunk aligned, the end of packet will not go beyond the
// next chunk boundary.
// When a failure-recovery is involved, the client state and the
// the datanode state may not exactly agree. I.e. the client may
// resend part of data that is already on disk. Correct number of
// bytes should be skipped when writing the data and checksum
// buffers out to disk.
long partialChunkSizeOnDisk = onDiskLen % bytesPerChecksum;
boolean alignedOnDisk = partialChunkSizeOnDisk == 0;
boolean alignedInPacket = firstByteInBlock % bytesPerChecksum == 0;
// Since data is always appended, not overwritten, partial CRC
// recalculation is necessary if the on-disk data is not chunk-
// aligned, regardless of whether the beginning of the data in
// the packet is chunk-aligned.
boolean doPartialCrc = !alignedOnDisk && !shouldNotWriteChecksum;
// If this is a partial chunk, then verify that this is the only
// chunk in the packet. If the starting offset is not chunk
// aligned, the packet should terminate at or before the next
// chunk boundary.
if (!alignedInPacket && len > bytesPerChecksum) {
throw new IOException("Unexpected packet data length for "
+ block + " from " + inAddr + ": a partial chunk must be "
+ " sent in an individual packet (data length = " + len
+ " > bytesPerChecksum = " + bytesPerChecksum + ")");
}
// If the last portion of the block file is not a full chunk,
// then read in pre-existing partial data chunk and recalculate
// the checksum so that the checksum calculation can continue
// from the right state.
Checksum partialCrc = null;
if (doPartialCrc) {
if (LOG.isDebugEnabled()) {
LOG.debug("receivePacket for " + block
+ ": previous write did not end at the chunk boundary."
+ " onDiskLen=" + onDiskLen);
}
long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
onDiskLen / bytesPerChecksum * checksumSize;
partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum);
}
// The data buffer position where write will begin. If the packet
// data and on-disk data have no overlap, this will not be at the
// beginning of the buffer.
int startByteToDisk = (int)(onDiskLen-firstByteInBlock)
+ dataBuf.arrayOffset() + dataBuf.position();
// Actual number of data bytes to write.
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
// Write data to disk.
long begin = Time.monotonicNow();
out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
long duration = Time.monotonicNow() - begin;
if (duration > datanodeSlowLogThresholdMs) {
LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
}
final byte[] lastCrc;
if (shouldNotWriteChecksum) {
lastCrc = null;
} else {
int skip = 0;
byte[] crcBytes = null;
// First, overwrite the partial crc at the end, if necessary.
if (doPartialCrc) { // not chunk-aligned on disk
// Calculate new crc for this chunk.
int bytesToReadForRecalc =
(int)(bytesPerChecksum - partialChunkSizeOnDisk);
if (numBytesToDisk < bytesToReadForRecalc) {
bytesToReadForRecalc = numBytesToDisk;
}
partialCrc.update(dataBuf.array(), startByteToDisk,
bytesToReadForRecalc);
byte[] buf = FSOutputSummer.convertToByteStream(partialCrc,
checksumSize);
crcBytes = copyLastChunkChecksum(buf, checksumSize, buf.length);
// prepare to overwrite last checksum
adjustCrcFilePosition();
checksumOut.write(buf);
if(LOG.isDebugEnabled()) {
LOG.debug("Writing out partial crc for data len " + len +
", skip=" + skip);
}
skip++; // For the partial chunk that was just read.
}
// Determine how many checksums need to be skipped up to the last
// boundary. The checksum after the boundary was already counted
// above. Only count the number of checksums skipped up to the
// boundary here.
long lastChunkBoundary = onDiskLen - (onDiskLen%bytesPerChecksum);
long skippedDataBytes = lastChunkBoundary - firstByteInBlock;
if (skippedDataBytes > 0) {
skip += (int)(skippedDataBytes / bytesPerChecksum) +
((skippedDataBytes % bytesPerChecksum == 0) ? 0 : 1);
}
skip *= checksumSize; // Convert to number of bytes
// write the rest of checksum
final int offset = checksumBuf.arrayOffset() +
checksumBuf.position() + skip;
final int end = offset + checksumLen - skip;
// If offset > end, there is no more checksum to write.
// I.e. a partial chunk checksum rewrite happened and there is no
// more to write after that.
if (offset > end) {
assert crcBytes != null;
lastCrc = crcBytes;
} else {
final int remainingBytes = checksumLen - skip;
lastCrc = copyLastChunkChecksum(checksumBuf.array(),
checksumSize, end);
checksumOut.write(checksumBuf.array(), offset, remainingBytes);
}
}
/// flush entire packet, sync if requested
//persist to disk
flushOrSync(syncBlock);
replicaInfo.setLastChecksumAndDataLen(offsetInBlock, lastCrc);
datanode.metrics.incrBytesWritten(len);
datanode.metrics.incrTotalWriteTime(duration);
//drop the written range from the OS page cache
manageWriterOsCache(offsetInBlock);
}
} catch (IOException iex) {
datanode.checkDiskErrorAsync();
throw iex;
}
}
6. BlockReceiver.PacketResponder.run
- Read and process the ack from the downstream node.
- If the ack is for the last packet of the block, finalizeBlock() notifies the NameNode, via the BPOfferService RPC, that the block has been received successfully.
- Prepend this node's status to the downstream ack and forward it to the upstream node (a toy sketch of this ack relay follows this list).
- Remove the packet from the ack queue.
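The ack relay in the third bullet can be illustrated with a toy sketch in plain Java; it is not Hadoop's PipelineAck, and the class, enum, and method names are invented for illustration.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy sketch: each PacketResponder prepends its own status to the reply list it
// received from downstream before forwarding the ack upstream, so the client ends
// up seeing one status per DataNode in the pipeline.
final class AckRelaySketch {
  enum Status { SUCCESS, ERROR, ERROR_CHECKSUM }

  static List<Status> forwardAck(Status myStatus, List<Status> downstreamReplies) {
    List<Status> replies = new ArrayList<>();
    replies.add(myStatus);                 // this DataNode's result first
    replies.addAll(downstreamReplies);     // then the statuses reported from downstream
    return replies;                        // conceptually what sendAckUpstream(...) writes
  }

  public static void main(String[] args) {
    // dn3 (last in the pipeline) acks SUCCESS; dn2 and dn1 each prepend their own status.
    List<Status> fromDn3 = Arrays.asList(Status.SUCCESS);
    List<Status> fromDn2 = forwardAck(Status.SUCCESS, fromDn3);
    List<Status> fromDn1 = forwardAck(Status.SUCCESS, fromDn2);
    System.out.println(fromDn1);           // [SUCCESS, SUCCESS, SUCCESS] at the client
  }
}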
/**
* Thread to process incoming acks.
* @see java.lang.Runnable#run()
*/
@Override
public void run() {
boolean lastPacketInBlock = false;
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
while (isRunning() && !lastPacketInBlock) {
long totalAckTimeNanos = 0;
boolean isInterrupted = false;
try {
Packet pkt = null;
long expected = -2;
PipelineAck ack = new PipelineAck();
long seqno = PipelineAck.UNKOWN_SEQNO;
long ackRecvNanoTime = 0;
try {
if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
// read an ack from downstream datanode
//read an ack from the downstream DataNode
ack.readFields(downstreamIn);
ackRecvNanoTime = System.nanoTime();
if (LOG.isDebugEnabled()) {
LOG.debug(myString + " got " + ack);
}
// Process an OOB ACK.
//an OOB ack usually means a downstream DataNode is restarting
Status oobStatus = ack.getOOBStatus();
if (oobStatus != null) {
LOG.info("Relaying an out of band ack of type " + oobStatus);
//relay it upstream for the client to handle
sendAckUpstream(ack, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
PipelineAck.combineHeader(datanode.getECN(),
Status.SUCCESS));
continue;
}
seqno = ack.getSeqno();
}
if (seqno != PipelineAck.UNKOWN_SEQNO
|| type == PacketResponderType.LAST_IN_PIPELINE) {
//take the packet waiting for this ack from the head of the ack queue
pkt = waitForAckHead(seqno);
if (!isRunning()) {
break;
}
expected = pkt.seqno;
//
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
&& seqno != expected) {
throw new IOException(myString + "seqno: expected=" + expected
+ ", received=" + seqno);
}
if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
// The total ack time includes the ack times of downstream
// nodes.
// The value is 0 if this responder doesn't have a downstream
// DN in the pipeline.
totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
// Report the elapsed time from ack send to ack receive minus
// the downstream ack time.
long ackTimeNanos = totalAckTimeNanos
- ack.getDownstreamAckTimeNanos();
if (ackTimeNanos < 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Calculated invalid ack time: " + ackTimeNanos
+ "ns.");
}
} else {
datanode.metrics.addPacketAckRoundTripTimeNanos(ackTimeNanos);
}
}
lastPacketInBlock = pkt.lastPacketInBlock;
}
} catch (InterruptedException ine) {
isInterrupted = true;
} catch (IOException ioe) {
if (Thread.interrupted()) {
isInterrupted = true;
} else {
// continue to run even if can not read from mirror
// notify client of the error
// and wait for the client to shut down the pipeline
//an exception while reading from the downstream node: mark the mirror as failed
mirrorError = true;
LOG.info(myString, ioe);
}
}
if (Thread.interrupted() || isInterrupted) {
/*
* The receiver thread cancelled this thread. We could also check
* any other status updates from the receiver thread (e.g. if it is
* ok to write to replyOut). It is prudent to not send any more
* status back to the client because this datanode has a problem.
* The upstream datanode will detect that this datanode is bad, and
* rightly so.
*
* The receiver thread can also interrupt this thread for sending
* an out-of-band response upstream.
*/
LOG.info(myString + ": Thread is interrupted.");
running = false;
continue;
}
//if this is the ack for the last packet, finalize the block
if (lastPacketInBlock) {
// Finalize the block and close the block file
//notify the NameNode that the block has been received successfully
finalizeBlock(startTime);
}
Status myStatus = pkt != null ? pkt.ackStatus : Status.SUCCESS;
//combine this DataNode's status with the downstream ack and forward it to the upstream node
sendAckUpstream(ack, expected, totalAckTimeNanos,
(pkt != null ? pkt.offsetInBlock : 0),
PipelineAck.combineHeader(datanode.getECN(), myStatus));
if (pkt != null) {
// remove the packet from the ack queue
//remove the packet from the ackQueue
removeAckHead();
}
} catch (IOException e) {
LOG.warn("IOException in BlockReceiver.run(): ", e);
if (running) {
//check this node's disks for errors asynchronously
datanode.checkDiskErrorAsync();
LOG.info(myString, e);
running = false;
if (!Thread.interrupted()) { // failure not caused by interruption
receiverThread.interrupt();
}
}
} catch (Throwable e) {
if (running) {
LOG.info(myString, e);
running = false;
receiverThread.interrupt();
}
}
}
LOG.info(myString + " terminating");
}