Flink 系列博客
Flink QuickStart
Flink 双流操作
Flink on Yarn Kerberos的配置
Flink on Yarn部署和任务提交操作
Flink 配置Prometheus监控
Flink in docker 部署
Flink HA 部署
Flink 常见调优参数总结
Flink 源码之任务提交流程分析
Flink 源码之基本算子
Flink 源码之Trigger
Flink 源码之Evictor
Flink 源码之Window
Flink 源码之WindowOperator
Flink 源码之StreamGraph生成
Flink 源码之JobGraph生成
Flink 源码之两阶段提交
Flink 源码之分布式快照
Flink 源码之时间处理
Flink 源码之节点间通信
Flink 源码之Credit Based反压
Flink 源码之快照
Flink 源码之FlinkKafkaConsumer
Flink 源码之内存管理
Flink 源码之 1.11新特性Unaligned checkpoint
Flink 源码之TaskManager启动流程
Flink 源码之leader选举(Zookeeper方式)
Flink 源码之Continuous Trigger
Slot 概念
Flink中的Slot是一组资源的集合,包含CPU核心数,task堆内存,task对外内存,管理内存和网络内存。同时slot也是Flink的资源分配单位。
一个TaskManager中包含一个或者多个Slot。根据slot共享配置,一个slot中可同时运行多个task。这些task以工作线程的形式存在于slot中。
TaskManager,Slot,Task和并行度parallelism的关系如下图所示(引用官网的图):
Flink Slot
Slot 相关的一些类
SchedulerImpl
SchedulerImpl负责为Execution节点的任务执行分配slot。
在后面的分析中涉及到的SchedulerImpl两个最重要的方法为allocateSlot和allocateBatchSlot。这两个方法的逻辑基本相同,只是前一个方法参数中多了分配slot超时时间。
具体分配slot的流程较为复杂,在后面分析slot申请流程的时候再讲解。
SlotSharingManager
SlotSharingManager负责Slot共享。Slot共享指的是不同的task在同一个slot中运行。
SlotSharingManager维护了一个slot层级结构:其中根节点和层级结构的中间节点为MultiTaskSlot。MultiTaskSlot可从属于另一个MultiTaskSlot,同时它又包含多个MultiTaskSlot或SingleTaskSlot,这样就形成了层级结构。SingleTaskSlot是slot层级结构中的最底层节点,只能拥有一个parent作为它的父节点。
Slot共享正是通过这种层级结构体现出来的。一个Slot被多个task共享,以Slot层级结构表示就是一个MultiTaskSlot包含多个SingleTaskSlot。
下面我们分析下几个重要的方法。
createRootSlot
创建一个根节点slot,该Slot的类型为MultiTaskSlot。
@Nonnull
MultiTaskSlot createRootSlot(
SlotRequestId slotRequestId,
CompletableFuture<? extends SlotContext> slotContextFuture,
SlotRequestId allocatedSlotRequestId) {
LOG.debug("Create multi task slot [{}] in slot [{}].", slotRequestId, allocatedSlotRequestId);
final CompletableFuture<SlotContext> slotContextFutureAfterRootSlotResolution = new CompletableFuture<>();
// 创建一个根节点
// 这个方法同时将创建出的MultiTaskSlot存入到allTaskSlots和unresolvedRootSlots集合中
final MultiTaskSlot rootMultiTaskSlot = createAndRegisterRootSlot(
slotRequestId,
allocatedSlotRequestId,
slotContextFutureAfterRootSlotResolution);
// 当slotContextFuture完成后执行
// slotContextFuture是向SlotPool申请slot的过程
// 这个future在SlotPoolImpl的tryFulfillSlotRequestOrMakeAvailable方法中complete
FutureUtils.forward(
slotContextFuture.thenApply(
(SlotContext slotContext) -> {
// add the root node to the set of resolved root nodes once the SlotContext future has
// been completed and we know the slot's TaskManagerLocation
// 此时slot已经分配完毕,将该slot从unresolvedRootSlots集合移除
// 存入到resolvedRootSlots集合中
tryMarkSlotAsResolved(slotRequestId, slotContext);
return slotContext;
}),
slotContextFutureAfterRootSlotResolution);
return rootMultiTaskSlot;
}
SlotPool
SlotPool用于缓存slot。它接收ExecutionGraph发起的slot申请,为其分配执行任务所需的slot。如果SlotPool无法处理slot请求,他会尝试去连接ResourceManager获取新的slot。如果ResourceManager目前状态不可用,被ResourceManager拒绝或者是请求超时,则slot申请失败。SlotPool缓存了一部分slot,在ResourceManager不可用的时候,SlotPool仍然可以提供已注册的空闲slot。这些Slot只会在它们不再被使用的时候释放掉。比如说作业在运行但仍有空闲slot这种情况。
启动方法
SlotPool在JobMaster的startJobMasterServices中启动。该方法中注册了两个周期任务:检测空闲的slot和批量检测超时的slot
public void start(
@Nonnull JobMasterId jobMasterId,
@Nonnull String newJobManagerAddress,
@Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) throws Exception {
this.jobMasterId = jobMasterId;
this.jobManagerAddress = newJobManagerAddress;
this.componentMainThreadExecutor = componentMainThreadExecutor;
scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
scheduleRunAsync(this::checkBatchSlotTimeout, batchSlotTimeout);
if (log.isDebugEnabled()) {
scheduleRunAsync(this::scheduledLogStatus, STATUS_LOG_INTERVAL_MS, TimeUnit.MILLISECONDS);
}
}
checkIdleSlot
该方法逻辑为SlotPool周期运行任务之一,用户定期检测空闲slot。
protected void checkIdleSlot() {
// The timestamp in SlotAndTimestamp is relative
// 获取当前时间
final long currentRelativeTimeMillis = clock.relativeTimeMillis();
// 创建用于保存空闲slot的集合
final List<AllocatedSlot> expiredSlots = new ArrayList<>(availableSlots.size());
// 遍历找出所有空闲的slot
for (SlotAndTimestamp slotAndTimestamp : availableSlots.availableSlots.values()) {
if (currentRelativeTimeMillis - slotAndTimestamp.timestamp > idleSlotTimeout.toMilliseconds()) {
expiredSlots.add(slotAndTimestamp.slot);
}
}
final FlinkException cause = new FlinkException("Releasing idle slot.");
for (AllocatedSlot expiredSlot : expiredSlots) {
// 获取每个过期slot的allocation ID
final AllocationID allocationID = expiredSlot.getAllocationId();
// 移除该allocation id对应的slot
if (availableSlots.tryRemove(allocationID) != null) {
log.info("Releasing idle slot [{}].", allocationID);
// RPC调用空闲slot所在的TaskManager,通知去释放掉这个slot
final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
allocationID,
cause,
rpcTimeout);
// RPC调用完成执行
// 如果释放slot出现异常,废弃掉这个slot,下次心跳的时候向taskManager同步slot状态
FutureUtils.whenCompleteAsyncIfNotDone(
freeSlotFuture,
componentMainThreadExecutor,
(Acknowledge ignored, Throwable throwable) -> {
if (throwable != null) {
// The slot status will be synced to task manager in next heartbeat.
log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. Discarding slot.",
allocationID, expiredSlot.getTaskManagerId(), throwable);
}
});
}
}
// 安排下一次调用时间,实现周期调用
scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
}
checkBatchSlotTimeout
protected void checkBatchSlotTimeout() {
// 如果没开启批量超时检测,方法直接返回
if (!batchSlotRequestTimeoutCheckEnabled) {
return;
}
final Collection<PendingRequest> pendingBatchRequests = getPendingBatchRequests();
if (!pendingBatchRequests.isEmpty()) {
// 获取积压的slot请求
final Set<ResourceProfile> allocatedResourceProfiles = getAllocatedResourceProfiles();
//将这些slot申请按照资源要求进行分组,和已分配过的slot的资源要求相同的分为一组,其余的在另一组
final Map<Boolean, List<PendingRequest>> fulfillableAndUnfulfillableRequests = pendingBatchRequests
.stream()
.collect(Collectors.partitioningBy(canBeFulfilledWithAllocatedSlot(allocatedResourceProfiles)));
// 提取出资源要求相同和不同的两组积压的slot请求
final List<PendingRequest> fulfillableRequests = fulfillableAndUnfulfillableRequests.get(true);
final List<PendingRequest> unfulfillableRequests = fulfillableAndUnfulfillableRequests.get(false);
final long currentTimestamp = clock.relativeTimeMillis();
// 标记为可满足要求
for (PendingRequest fulfillableRequest : fulfillableRequests) {
fulfillableRequest.markFulfillable();
}
for (PendingRequest unfulfillableRequest : unfulfillableRequests) {
// 更新请求为无法满足,并设置时间
unfulfillableRequest.markUnfulfillable(currentTimestamp);
if (unfulfillableRequest.getUnfulfillableSince() + batchSlotTimeout.toMilliseconds() <= currentTimestamp) {
// 如果请求已超时,调用超时处理逻辑,后面分析
timeoutPendingSlotRequest(unfulfillableRequest.getSlotRequestId());
}
}
}
// 安排下一次调用时间,实现周期调用
scheduleRunAsync(this::checkBatchSlotTimeout, batchSlotTimeout);
}
// 处理超时积压请求的方法
protected void timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
log.info("Pending slot request [{}] timed out.", slotRequestId);
// 从waitingForResourceManager和pendingRequests中移除这个request
final PendingRequest pendingRequest = removePendingRequest(slotRequestId);
// 异步抛出请求超时异常
if (pendingRequest != null) {
pendingRequest
.getAllocatedSlotFuture()
.completeExceptionally(new TimeoutException("Pending slot request timed out in SlotPool."));
}
}
ResourceManager
和JobManager,TaskManager一样,ResourceManager也是Flink中的一个重要角色。ResourceManager负责资源的分配和撤回,以及资源的登记保管。ResourceManager具有HA功能,可参与选主。ResourceManager还持有JobManager的连接。后来创建出的TaskManager可以通过registerTaskExecutor方法注册到ResourceManager中。
ResourceManager中最为重要的成员为SlotManager。可用的slot交由SlotManager维护。
ResourceManager本身是一个抽象类。它有两个子类
- StandaloneResourceManager:用于standalone模式部署的时候。
- ActiveResourceManager:用于非standalone模式。其中有一个成员变量
ResourceManagerDriver。ResourceManagerDriver有多个子类,分别对应着支持Kubernetes, Mesos和Yarn。
SlotManager
SlotManager负责维护所有已注册的slot。SlotManager统计了所有的已注册slot,空闲的slot,积压待分配的slot(pendingSlot),积压的slot请求(pendingSlotRequest)以及以满足的slot请求。
TaskSlotTable
Task Manager上的slot和task的分配表,是TaskSlot的容器。它维护了多个索引,用于快速访问task和分配给它的slot。
下面我们分析下它的主要方法。
start 方法
在使用TaskSlotTable之前必须先启动它。启动方法为start如下所示:
start方法:
@Override
public void start(SlotActions initialSlotActions, ComponentMainThreadExecutor mainThreadExecutor) {
// 检查状态,必须为CREATED
Preconditions.checkState(
state == State.CREATED,
"The %s has to be just created before starting",
TaskSlotTableImpl.class.getSimpleName());
// 设置slotAction,下面分析
this.slotActions = Preconditions.checkNotNull(initialSlotActions);
// 设置主线程执行器
this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);
// 一个定时任务,可以schedule多个事件,到期时通知对应的timeout listener
timerService.start(this);
// 改变状态为RUNNING,防止反复启动
state = State.RUNNING;
}
SlotAction
SlotAction包含了Slot分配动作的回调逻辑。该接口包含了两个回调动作:
- freeSlot:slot被释放的时候回调
- timeoutSlot:slot超时的时候回调
接口代码如下:
public interface SlotActions {
/**
* Free the task slot with the given allocation id.
*
* @param allocationId to identify the slot to be freed
*/
void freeSlot(AllocationID allocationId);
/**
* Timeout the task slot for the given allocation id. The timeout is identified by the given
* ticket to filter invalid timeouts out.
*
* @param allocationId identifying the task slot to be timed out
* @param ticket allowing to filter invalid timeouts out
*/
void timeoutSlot(AllocationID allocationId, UUID ticket);
}
其中AllocationID是JobManager通过ResourceManager分配的物理Slot对应的唯一标识。在JobManager第一次请求的时候指定,重试的时候保持不变。这个ID用于TaskManager和ResourceManager追踪和同步slot的分配状态。和SlotRequestId不同的是,task从SlotPool中请求逻辑slot的时候使用SlotRequestId。由于存在slot共享的缘故,多个逻辑slot的请求可能映射到同一个物理slot请求。
SlotAction唯一的实现类是SlotActionsImpl,位于TaskExecutor.java中。稍后用到的时候在分析它。
allocateSlot 方法
为指定的job分配一个slot,使用指定的index。如果index为负数则使用自增的index。如果slot可以分配,返回true。
@Override
public boolean allocateSlot(
int index,
JobID jobId,
AllocationID allocationId,
ResourceProfile resourceProfile,
Time slotTimeout) {
// 检查TaskSlotTable的状态是否为RUNNING
checkRunning();
Preconditions.checkArgument(index < numberSlots);
// 检查这个allocation id是否已经分配过slot
// 如果分配过,直接返回
TaskSlot<T> taskSlot = allocatedSlots.get(allocationId);
if (taskSlot != null) {
LOG.info("Allocation ID {} is already allocated in {}.", allocationId, taskSlot);
return false;
}
// 如果taskSlots列表包含这个index
if (taskSlots.containsKey(index)) {
// 获取这个重复的taskslot
TaskSlot<T> duplicatedTaskSlot = taskSlots.get(index);
LOG.info("Slot with index {} already exist, with resource profile {}, job id {} and allocation id {}.",
index,
duplicatedTaskSlot.getResourceProfile(),
duplicatedTaskSlot.getJobId(),
duplicatedTaskSlot.getAllocationId());
// 只有在这个重复的taskslot的job id和allocation id相同的情况下,才允许分配
return duplicatedTaskSlot.getJobId().equals(jobId) &&
duplicatedTaskSlot.getAllocationId().equals(allocationId);
} else if (allocatedSlots.containsKey(allocationId)) {
// 如果allocation id已经分配过slot,返回true
// 这里有疑问,上面已经检测过是否已分配,不太可能进入这个分支
return true;
}
// 如果index大于等于0,使用默认的ResourceProfile,否则使用方法传入的resourceProfile
resourceProfile = index >= 0 ? defaultSlotResourceProfile : resourceProfile;
// 检查是否还能够分配出满足条件的资源
if (!budgetManager.reserve(resourceProfile)) {
LOG.info("Cannot allocate the requested resources. Trying to allocate {}, "
+ "while the currently remaining available resources are {}, total is {}.",
resourceProfile,
budgetManager.getAvailableBudget(),
budgetManager.getTotalBudget());
return false;
}
// 创建一个新的TaskSlot
taskSlot = new TaskSlot<>(index, resourceProfile, memoryPageSize, jobId, allocationId, memoryVerificationExecutor);
if (index >= 0) {
// 存入taskSlots集合
taskSlots.put(index, taskSlot);
}
// update the allocation id to task slot map
// 加入到已分配slot的集合中
allocatedSlots.put(allocationId, taskSlot);
// register a timeout for this slot since it's in state allocated
// 注册slot的超时时间定时器,在slot超时后会调用超时处理逻辑
timerService.registerTimeout(allocationId, slotTimeout.getSize(), slotTimeout.getUnit());
// add this slot to the set of job slots
// 将slot和job id关联起来
Set<AllocationID> slots = slotsPerJob.get(jobId);
if (slots == null) {
slots = new HashSet<>(4);
slotsPerJob.put(jobId, slots);
}
slots.add(allocationId);
return true;
}
addTask
将task添加到slot中,通过allocation id匹配。
@Override
public boolean addTask(T task) throws SlotNotFoundException, SlotNotActiveException {
// 检查TaskSlotTable是否在运行
checkRunning();
Preconditions.checkNotNull(task);
// 从allocatedSlots集合获取taskSlot
TaskSlot<T> taskSlot = getTaskSlot(task.getAllocationId());
if (taskSlot != null) {
// 如果taskSlot在运行状态,job id和allocation id与task的相同
if (taskSlot.isActive(task.getJobID(), task.getAllocationId())) {
// 将task指定给taskslot,并且设定映射关系
if (taskSlot.add(task)) {
taskSlotMappings.put(task.getExecutionId(), new TaskSlotMapping<>(task, taskSlot));
return true;
} else {
return false;
}
} else {
throw new SlotNotActiveException(task.getJobID(), task.getAllocationId());
}
} else {
throw new SlotNotFoundException(task.getAllocationId());
}
}
createSlotReport
这个方法返回当前TaskManager中slot分配情况的报告。
返回的SlotReport是TaskExecutor中一系列slot状态的报告。
@Override
public SlotReport createSlotReport(ResourceID resourceId) {
List<SlotStatus> slotStatuses = new ArrayList<>();
// 获取固定分配的slot状态
for (int i = 0; i < numberSlots; i++) {
SlotID slotId = new SlotID(resourceId, i);
SlotStatus slotStatus;
if (taskSlots.containsKey(i)) {
TaskSlot<T> taskSlot = taskSlots.get(i);
slotStatus = new SlotStatus(
slotId,
taskSlot.getResourceProfile(),
taskSlot.getJobId(),
taskSlot.getAllocationId());
} else {
slotStatus = new SlotStatus(
slotId,
defaultSlotResourceProfile,
null,
null);
}
slotStatuses.add(slotStatus);
}
// 获取自动分配的slot状态
for (TaskSlot<T> taskSlot : allocatedSlots.values()) {
// slot id小于0表示该slot是动态分配的
if (taskSlot.getIndex() < 0) {
SlotID slotID = SlotID.generateDynamicSlotID(resourceId);
SlotStatus slotStatus = new SlotStatus(
slotID,
taskSlot.getResourceProfile(),
taskSlot.getJobId(),
taskSlot.getAllocationId());
slotStatuses.add(slotStatus);
}
}
final SlotReport slotReport = new SlotReport(slotStatuses);
return slotReport;
}
TaskSlot
TaskSlot是TaskSlotTable中维护的Slot的类型的包装类。TaskSlot是一个容器,内部有一个tasks变量,负责维护属于同一个slot的所有tasks。
TaskSlot 的成员变量
/** Index of the task slot. */
// taskSlot的索引
// 小于0的值表示动态分配
private final int index;
/** Resource characteristics for this slot. */
private final ResourceProfile resourceProfile;
/** Tasks running in this slot. */
private final Map<ExecutionAttemptID, T> tasks;
private final MemoryManager memoryManager;
/** State of this slot. */
private TaskSlotState state;
/** Job id to which the slot has been allocated. */
private final JobID jobId;
/** Allocation id of this slot. */
private final AllocationID allocationId;
/** The closing future is completed when the slot is freed and closed. */
private final CompletableFuture<Void> closingFuture;
ResourceProfile
ResourceProfile是Slot资源需求的一个包装类。它的所有字段都是final类型,一旦创建后不可再修改。
ResourceProfile的主要成员变量如下所示:
/** How many cpu cores are needed. Can be null only if it is unknown. */
// CPU核心数
@Nullable
private final Resource cpuCores;
/** How much task heap memory is needed. */
// task堆内存
@Nullable // can be null only for UNKNOWN
private final MemorySize taskHeapMemory;
/** How much task off-heap memory is needed. */
// task堆外内存
@Nullable // can be null only for UNKNOWN
private final MemorySize taskOffHeapMemory;
/** How much managed memory is needed. */
// 管理内存
@Nullable // can be null only for UNKNOWN
private final MemorySize managedMemory;
/** How much network memory is needed. */
// 网络传输缓存
@Nullable // can be null only for UNKNOWN
private final MemorySize networkMemory;
/** A extensible field for user specified resources from {@link ResourceSpec}. */
// 其他类型的资源,在Resource中指定
private final Map<String, Resource> extendedResources = new HashMap<>(1);
AllocationID
AllocationID是JobManager通过ResourceManger申请物理slot时的唯一标识。它在SlotPoolImpl的requestSlotFromResourceManager方法中创建并确定下来,以后即便是请求重试,AllocationID也不会再改变。
调用流程
Flink Slot分配全过程涉及到的几个重点类的调用流程如下图。
调用流程
Slot 申请流程
Slot申请流程我们从ExecutionGraph分配资源开始分析,一路跟踪,直到TaskExecutor中创建出slot。
我们从JobManager分配资源的入口开始逐个分析调用流程。
Execution 的 allocateAndAssignSlotForExecution
该方法为ExecutionGraph中的一个顶点vertex分配其执行所需的slot。
private CompletableFuture<LogicalSlot> allocateAndAssignSlotForExecution(
SlotProviderStrategy slotProviderStrategy,
LocationPreferenceConstraint locationPreferenceConstraint,
@Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {
checkNotNull(slotProviderStrategy);
// 检测必须在JobMaster的主线程执行
assertRunningInJobMasterMainThread();
// 获取ExecutionGraph任务定点的slot共享组配置
// 在slotSharingGroup是软限制,位于同一个slotSharingGroup的task可在同一个slot中运行
final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
// CoLocationConstraint管理task的执行位置
final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();
// this method only works if the execution is in the state 'CREATED'
// 从CREATE状态变更为SCHEDULED状态,只有初始为CREATED状态时才返回true
if (transitionState(CREATED, SCHEDULED)) {
// 获取SlotSharingGroup的ID
final SlotSharingGroupId slotSharingGroupId = sharingGroup.getSlotSharingGroupId();
// 构建调度单元,将Execution vertex,slotSharingGroup和locationConstraint封装在一起
ScheduledUnit toSchedule = locationConstraint == null ?
new ScheduledUnit(this, slotSharingGroupId) :
new ScheduledUnit(this, slotSharingGroupId, locationConstraint);
// try to extract previous allocation ids, if applicable, so that we can reschedule to the same slot
ExecutionVertex executionVertex = getVertex();
// 获取最近一次执行分配的allocation id
AllocationID lastAllocation = executionVertex.getLatestPriorAllocation();
// allocation id装入集合中
Collection<AllocationID> previousAllocationIDs =
lastAllocation != null ? Collections.singletonList(lastAllocation) : Collections.emptyList();
// calculate the preferred locations
// 计算task首选运行位置,在哪些task manager
final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture =
calculatePreferredLocations(locationPreferenceConstraint);
final SlotRequestId slotRequestId = new SlotRequestId();
final CompletableFuture<LogicalSlot> logicalSlotFuture =
preferredLocationsFuture.thenCompose(
(Collection<TaskManagerLocation> preferredLocations) -> {
LOG.info("Allocating slot with SlotRequestID {} for the execution attempt {}.", slotRequestId, attemptId);
// 组合上一个CompletableFuture
// 等task执行位置计算完毕后,调用SlotProviderStrategy(slot供给策略)的分配slot逻辑
return slotProviderStrategy.allocateSlot(
slotRequestId,
toSchedule,
SlotProfile.priorAllocation(
vertex.getResourceProfile(),
getPhysicalSlotResourceProfile(vertex),
preferredLocations,
previousAllocationIDs,
allPreviousExecutionGraphAllocationIds));
});
// register call back to cancel slot request in case that the execution gets canceled
// 当分配的资源被回收的时候调用
releaseFuture.whenComplete(
(Object ignored, Throwable throwable) -> {
// 如果slot请求取消
// 调用取消Slot请求的逻辑
if (logicalSlotFuture.cancel(false)) {
slotProviderStrategy.cancelSlotRequest(
slotRequestId,
slotSharingGroupId,
new FlinkException("Execution " + this + " was released."));
}
});
// This forces calls to the slot pool back into the main thread, for normal and exceptional completion
// 将携带了LogicalSlot的future返回
return logicalSlotFuture.handle(
(LogicalSlot logicalSlot, Throwable failure) -> {
if (failure != null) {
throw new CompletionException(failure);
}
// 如果logicalSlot可以分配给execution,返回true
if (tryAssignResource(logicalSlot)) {
return logicalSlot;
} else {
// release the slot
// 如果无法分配,释放掉这个slot
logicalSlot.releaseSlot(new FlinkException("Could not assign logical slot to execution " + this + '.'));
throw new CompletionException(
new FlinkException(
"Could not assign slot " + logicalSlot + " to execution " + this + " because it has already been assigned "));
}
});
} else {
// call race, already deployed, or already done
throw new IllegalExecutionStateException(this, CREATED, state);
}
}
这里涉及到两个类:SlotSharingGroup和CoLocationConstraint。
其中SlotSharingGroup是slot共享的软限制。group id相同的Execution Vertex可以被调度到同一个slot中执行。它包含3个成员变量:
// 保存属于这个group的execution vertex
private final Set<JobVertexID> ids = new TreeSet<>();
// group id,由long类型的lowerPart和upperPart构成
private final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
// 组内所有task的资源需求
private ResourceSpec resourceSpec = ResourceSpec.ZERO;
相对于SlotSharingGroup而言,CoLocationConstraint是slot共享的硬限制。CoLocationConstraint规定了task(execution顶点)在哪里执行。CoLocationConstraint将ColocationGroup和TaskManagerLocation绑定在一起,属于同一个ColocationGroup的task都在指定的TaskManager中运行。ColocationGroup持有一系列JobVertex的集合。这里就不在贴出代码了。
接着我们重点跟踪SlotProviderStrategy的allocateSlot方法。
SlotProviderStrategy具有两个子类:
- BatchSlotProviderStrategy:不指定分配slot操作的超时时间
- NormalSlotProviderStrategy:指定分配slot操作的超时时间,除此之外其他逻辑和
BatchSlotProviderStrategy一模一样
以NormalSlotProviderStrategy为例,它的allocateSlot方法调用了SchedulerImpl的allocateSlot。一路追踪调用:allocateSlot -> allocateSlotInternal -> internalAllocateSlot -> allocateSharedSlot。
SchedulerImpl 的 allocateSharedSlot
private CompletableFuture<LogicalSlot> allocateSharedSlot(
SlotRequestId slotRequestId,
ScheduledUnit scheduledUnit,
SlotProfile slotProfile,
@Nullable Time allocationTimeout) {
// allocate slot with slot sharing
// 构建一个SlotSharingManager
// 负责管理slot共享。slot共享允许同一个slot运行不同的任务
final SlotSharingManager multiTaskSlotManager = slotSharingManagers.computeIfAbsent(
scheduledUnit.getSlotSharingGroupId(),
id -> new SlotSharingManager(
id,
slotPool,
this));
// MultiTaskSlotLocality为MultiTaskSlot和Locality的封装
final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
try {
// 判断是否有colocation限制,调用不同的分配多任务slot方法
if (scheduledUnit.getCoLocationConstraint() != null) {
multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(
scheduledUnit.getCoLocationConstraint(),
multiTaskSlotManager,
slotProfile,
allocationTimeout);
} else {
multiTaskSlotLocality = allocateMultiTaskSlot(
scheduledUnit.getJobVertexId(),
multiTaskSlotManager,
slotProfile,
allocationTimeout);
}
} catch (NoResourceAvailableException noResourceException) {
return FutureUtils.completedExceptionally(noResourceException);
}
// sanity check
// 检查这个multiTaskSlotLocality对象的MultiTaskSlot或者是其子slot需要包含jobVertex id
Preconditions.checkState(!multiTaskSlotLocality.getMultiTaskSlot().contains(scheduledUnit.getJobVertexId()));
// 在这个MultiTaskSlot下分配一个SingleTaskSlot
final SlotSharingManager.SingleTaskSlot leaf = multiTaskSlotLocality.getMultiTaskSlot().allocateSingleTaskSlot(
slotRequestId,
slotProfile.getTaskResourceProfile(),
scheduledUnit.getJobVertexId(),
multiTaskSlotLocality.getLocality());
return leaf.getLogicalSlotFuture();
}
allocateCoLocatedMultiTaskSlot
该方法分配具有colocation限制的MultiTaskSlot。
在分析这个方法之前我们要先了解下Locality这个枚举,它表示task需要如何调度执行。各个值的解释如下:
- UNCONSTRAINED:没有限制task调度到何处
- LOCAL:task分配到同一个TaskManager中
- HOST_LOCAL:task分配到同一个主机上
- NON_LOCAL:task分配到除了locality偏好之外的地方
- UNKNOWN:未知
下面是allocateCoLocatedMultiTaskSlot方法的代码和分析:
private SlotSharingManager.MultiTaskSlotLocality allocateCoLocatedMultiTaskSlot(
CoLocationConstraint coLocationConstraint,
SlotSharingManager multiTaskSlotManager,
SlotProfile slotProfile,
@Nullable Time allocationTimeout) throws NoResourceAvailableException {
final SlotRequestId coLocationSlotRequestId = coLocationConstraint.getSlotRequestId();
if (coLocationSlotRequestId != null) {
// we have a slot assigned --> try to retrieve it
// 获取SlotSharingManager中slot request id对应的taskSlot
final SlotSharingManager.TaskSlot taskSlot = multiTaskSlotManager.getTaskSlot(coLocationSlotRequestId);
if (taskSlot != null) {
// 检查这个slot必须是MultiTaskSlot
Preconditions.checkState(taskSlot instanceof SlotSharingManager.MultiTaskSlot);
SlotSharingManager.MultiTaskSlot multiTaskSlot = (SlotSharingManager.MultiTaskSlot) taskSlot;
// 如果这个MultiTaskSlot持有的资源满足slotProfile的要求,返回这个slot,模式为在同一TM运行
if (multiTaskSlot.mayHaveEnoughResourcesToFulfill(slotProfile.getTaskResourceProfile())) {
return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.LOCAL);
}
// 否则抛出异常,资源不足
throw new NoResourceAvailableException("Not enough resources in the slot for all co-located tasks.");
} else {
// the slot may have been cancelled in the mean time
// 执行这个方法的时候可能slot被取消,因此增加这个逻辑
coLocationConstraint.setSlotRequestId(null);
}
}
// 如果这个constraint的运行位置已经指定
if (coLocationConstraint.isAssigned()) {
// refine the preferred locations of the slot profile
// 更新slot profile,加入首选的运行位置(TaskManager位置)
slotProfile = SlotProfile.priorAllocation(
slotProfile.getTaskResourceProfile(),
slotProfile.getPhysicalSlotResourceProfile(),
Collections.singleton(coLocationConstraint.getLocation()),
slotProfile.getPreferredAllocations(),
slotProfile.getPreviousExecutionGraphAllocations());
}
// get a new multi task slot
// 前面逻辑已经判断过,如果之前没有申请过slot,在这里分配一个
SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = allocateMultiTaskSlot(
coLocationConstraint.getGroupId(),
multiTaskSlotManager,
slotProfile,
allocationTimeout);
// check whether we fulfill the co-location constraint
// 检查constraint状态和能否在同一个TM运行
if (coLocationConstraint.isAssigned() && multiTaskSlotLocality.getLocality() != Locality.LOCAL) {
// 如果不能,不符合限制要求,释放掉这个slot
multiTaskSlotLocality.getMultiTaskSlot().release(
new FlinkException("Multi task slot is not local and, thus, does not fulfill the co-location constraint."));
// 抛出资源不足异常
throw new NoResourceAvailableException("Could not allocate a local multi task slot for the " +
"co location constraint " + coLocationConstraint + '.');
}
// 为这个MultiTaskSlot分配一个子MultiTaskSlot
final SlotRequestId slotRequestId = new SlotRequestId();
final SlotSharingManager.MultiTaskSlot coLocationSlot =
multiTaskSlotLocality.getMultiTaskSlot().allocateMultiTaskSlot(
slotRequestId,
coLocationConstraint.getGroupId());
// mark the requested slot as co-located slot for other co-located tasks
// 将coLocationConstraint和slot request关联起来,表示这个slot是具有运行位置限制的slot
coLocationConstraint.setSlotRequestId(slotRequestId);
// lock the co-location constraint once we have obtained the allocated slot
// slot分配完毕之后执行
coLocationSlot.getSlotContextFuture().whenComplete(
(SlotContext slotContext, Throwable throwable) -> {
if (throwable == null) {
// check whether we are still assigned to the co-location constraint
// 如果没有异常,绑定coLocationConstraint的位置限制
if (Objects.equals(coLocationConstraint.getSlotRequestId(), slotRequestId)) {
coLocationConstraint.lockLocation(slotContext.getTaskManagerLocation());
} else {
log.debug("Failed to lock colocation constraint {} because assigned slot " +
"request {} differs from fulfilled slot request {}.",
coLocationConstraint.getGroupId(),
coLocationConstraint.getSlotRequestId(),
slotRequestId);
}
} else {
log.debug("Failed to lock colocation constraint {} because the slot " +
"allocation for slot request {} failed.",
coLocationConstraint.getGroupId(),
coLocationConstraint.getSlotRequestId(),
throwable);
}
});
return SlotSharingManager.MultiTaskSlotLocality.of(coLocationSlot, multiTaskSlotLocality.getLocality());
}
接下来我们分析下MultiTaskSlot是怎么分配出来的。
allocateMultiTaskSlot
private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(
AbstractID groupId,
SlotSharingManager slotSharingManager,
SlotProfile slotProfile,
@Nullable Time allocationTimeout) {
// 返回所有根slot的信息
Collection<SlotSelectionStrategy.SlotInfoAndResources> resolvedRootSlotsInfo =
slotSharingManager.listResolvedRootSlotInfo(groupId);
// 根据slot选择策略,从SlotSharingManager中选择出一个最适合的根slot
SlotSelectionStrategy.SlotInfoAndLocality bestResolvedRootSlotWithLocality =
slotSelectionStrategy.selectBestSlotForProfile(resolvedRootSlotsInfo, slotProfile).orElse(null);
// 将这个选择出的slot包装为MultiTaskSlotLocality
final SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = bestResolvedRootSlotWithLocality != null ?
new SlotSharingManager.MultiTaskSlotLocality(
slotSharingManager.getResolvedRootSlot(bestResolvedRootSlotWithLocality.getSlotInfo()),
bestResolvedRootSlotWithLocality.getLocality()) :
null;
// 如果这个slot资源充足,可以LOCAL模式运行,返回这个multiTaskSlotLocality
if (multiTaskSlotLocality != null && multiTaskSlotLocality.getLocality() == Locality.LOCAL) {
return multiTaskSlotLocality;
}
final SlotRequestId allocatedSlotRequestId = new SlotRequestId();
final SlotRequestId multiTaskSlotRequestId = new SlotRequestId();
// 尝试从SlotPool中查找一个最合适的slot
Optional<SlotAndLocality> optionalPoolSlotAndLocality = tryAllocateFromAvailable(allocatedSlotRequestId, slotProfile);
// 如果找到了
if (optionalPoolSlotAndLocality.isPresent()) {
SlotAndLocality poolSlotAndLocality = optionalPoolSlotAndLocality.get();
// 校验下如果这个slot资源充足,并且在SlotSharingManager中没有找到最合适slot
if (poolSlotAndLocality.getLocality() == Locality.LOCAL || bestResolvedRootSlotWithLocality == null) {
final PhysicalSlot allocatedSlot = poolSlotAndLocality.getSlot();
// 在SlotSharingManager中创建这个slot对应的MultiTaskSlot
final SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.createRootSlot(
multiTaskSlotRequestId,
CompletableFuture.completedFuture(poolSlotAndLocality.getSlot()),
allocatedSlotRequestId);
// 将multiTaskSlot加入到allocatedSlot的负载中
if (allocatedSlot.tryAssignPayload(multiTaskSlot)) {
return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, poolSlotAndLocality.getLocality());
} else {
multiTaskSlot.release(new FlinkException("Could not assign payload to allocated slot " +
allocatedSlot.getAllocationId() + '.'));
}
}
}
if (multiTaskSlotLocality != null) {
// prefer slot sharing group slots over unused slots
// 如果在SlotSharingManager和SlotPool都找到了匹配的slot,优先使用SlotSharingManager中的
// 将SlotPool中的匹配slot释放掉
if (optionalPoolSlotAndLocality.isPresent()) {
slotPool.releaseSlot(
allocatedSlotRequestId,
new FlinkException("Locality constraint is not better fulfilled by allocated slot."));
}
return multiTaskSlotLocality;
}
// there is no slot immediately available --> check first for uncompleted slots at the slot sharing group
// 到这里说明目前没有可用的slot,从unresolvedRootSlots中获取一个尚未分配的slot
SlotSharingManager.MultiTaskSlot multiTaskSlot = slotSharingManager.getUnresolvedRootSlot(groupId);
// 如果没有找到
if (multiTaskSlot == null) {
// it seems as if we have to request a new slot from the resource manager, this is always the last resort!!!
// 到这里意味着我们必须去ResourceManager申请一个新的slot
final CompletableFuture<PhysicalSlot> slotAllocationFuture = requestNewAllocatedSlot(
allocatedSlotRequestId,
slotProfile,
allocationTimeout);
// 在SlotSharingManager中创建一个root slot
multiTaskSlot = slotSharingManager.createRootSlot(
multiTaskSlotRequestId,
slotAllocationFuture,
allocatedSlotRequestId);
// 设定分配成功之后的逻辑
slotAllocationFuture.whenComplete(
(PhysicalSlot allocatedSlot, Throwable throwable) -> {
final SlotSharingManager.TaskSlot taskSlot = slotSharingManager.getTaskSlot(multiTaskSlotRequestId);
if (taskSlot != null) {
// still valid
// 遇到异常的时候,释放掉slot
if (!(taskSlot instanceof SlotSharingManager.MultiTaskSlot) || throwable != null) {
taskSlot.release(throwable);
} else {
if (!allocatedSlot.tryAssignPayload(((SlotSharingManager.MultiTaskSlot) taskSlot))) {
taskSlot.release(new FlinkException("Could not assign payload to allocated slot " +
allocatedSlot.getAllocationId() + '.'));
}
}
} else {
slotPool.releaseSlot(
allocatedSlotRequestId,
new FlinkException("Could not find task slot with " + multiTaskSlotRequestId + '.'));
}
});
}
return SlotSharingManager.MultiTaskSlotLocality.of(multiTaskSlot, Locality.UNKNOWN);
}
这里我们顺便深入下SlotSelectionStrategy。它是一系列挑选最匹配slot的子类共同的接口。它的子类如下:
- DefaultLocationPreferenceSlotSelectionStrategy:从待选slot集合中找到第一个返回。
- EvenlySpreadOutLocationPreferenceSlotSelectionStrategy:找到所有匹配slot中所在TM资源使用率最低的返回。
- PreviousAllocationSlotSelectionStrategy:先使用SlotProfile中指定的首选运行位置,如果没有,再使用其他Slot选择策略。
下面我们开始分析SchedulerImpl的requestNewAllocatedSlot,即请求分配新的slot。
@Nonnull
private CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
SlotRequestId slotRequestId,
SlotProfile slotProfile,
@Nullable Time allocationTimeout) {
if (allocationTimeout == null) {
return slotPool.requestNewAllocatedBatchSlot(slotRequestId, slotProfile.getPhysicalSlotResourceProfile());
} else {
return slotPool.requestNewAllocatedSlot(slotRequestId, slotProfile.getPhysicalSlotResourceProfile(), allocationTimeout);
}
}
这个方法根据是否指定了分配超时时间来调用SlotPool的对应方法。requestNewAllocatedBatchSlot和requestNewAllocatedSlot逻辑基本相同,只是后者增加了超时检测逻辑。我们选择最为复杂的requestNewAllocatedSlot方法分析。
SlotPoolImpl的requestNewAllocatedSlot方法如下所示:
@Nonnull
@Override
public CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(
@Nonnull SlotRequestId slotRequestId,
@Nonnull ResourceProfile resourceProfile,
@Nullable Time timeout) {
// 检查方法在主线程执行
componentMainThreadExecutor.assertRunningInMainThread();
// 创建一个Slot请求对象
// SlotPool先将Slot请求缓存起来,当TaskManager获取slot的时候才会真正创建
final PendingRequest pendingRequest = PendingRequest.createStreamingRequest(slotRequestId, resourceProfile);
// 如果传入了超时时间,注册超时处理
if (timeout != null) {
// register request timeout
FutureUtils
.orTimeout(
pendingRequest.getAllocatedSlotFuture(),
timeout.toMilliseconds(),
TimeUnit.MILLISECONDS,
componentMainThreadExecutor)
.whenComplete(
(AllocatedSlot ignored, Throwable throwable) -> {
if (throwable instanceof TimeoutException) {
timeoutPendingSlotRequest(slotRequestId);
}
});
}
// 调用requestNewAllocatedSlotInternal请求新slot
return requestNewAllocatedSlotInternal(pendingRequest)
.thenApply((Function.identity()));
}
SlotPoolImpl的requestNewAllocatedSlotInternal方法如下所示。这个方法SlotPool请求ResourceManager来分配一个新的slot。
@Nonnull
private CompletableFuture<AllocatedSlot> requestNewAllocatedSlotInternal(PendingRequest pendingRequest) {
if (resourceManagerGateway == null) {
// 如果没有ResourceManager网关,先将请求入栈,放入到waitingForResourceManager中
// 这个LinkedHashMap保存了slot request id和slot request的对应关系
stashRequestWaitingForResourceManager(pendingRequest);
} else {
// 从ResourceManager请求slot
requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
}
return pendingRequest.getAllocatedSlotFuture();
}
我们重点分析下SlotPoolImpl的requestSlotFromResourceManager方法:
private void requestSlotFromResourceManager(
final ResourceManagerGateway resourceManagerGateway,
final PendingRequest pendingRequest) {
checkNotNull(resourceManagerGateway);
checkNotNull(pendingRequest);
// 创建一个allocationID,lowerPart和upperPart使用随机long
final AllocationID allocationId = new AllocationID();
// 为pendingRequest指定allocationID
pendingRequest.setAllocationId(allocationId);
// 放入pendingRequests集合
// 这是一个复合key map,分别使用slot request id和allocation id作为key
pendingRequests.put(pendingRequest.getSlotRequestId(), allocationId, pendingRequest);
// 指定slot分配完成时的操作
pendingRequest.getAllocatedSlotFuture().whenComplete(
(AllocatedSlot allocatedSlot, Throwable throwable) -> {
if (throwable != null) {
// the allocation id can be remapped so we need to get it from the pendingRequest
// where it will be updated timely
// 重新获取allocationID,因为这个id可能会在申请slot过程中改变
final Optional<AllocationID> updatedAllocationId = pendingRequest.getAllocationId();
// 处理出错逻辑,取消申请slot
if (updatedAllocationId.isPresent()) {
// cancel the slot request if there is a failure
resourceManagerGateway.cancelSlotRequest(updatedAllocationId.get());
}
}
});
log.info("Requesting new slot [{}] and profile {} with allocation id {} from resource manager.",
pendingRequest.getSlotRequestId(), pendingRequest.getResourceProfile(), allocationId);
// 向ResourceManager发送一个SlotRequest,请求slot
CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
jobMasterId,
new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
rpcTimeout);
// slot请求完毕后执行
FutureUtils.whenCompleteAsyncIfNotDone(
rmResponse,
componentMainThreadExecutor,
(Acknowledge ignored, Throwable failure) -> {
// on failure, fail the request future
if (failure != null) {
// 如果失败,调用失败处理逻辑,调用future的completeExceptionally方法
slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure);
}
});
}
ResourceManager请求slot的逻辑如下:
@Override
public CompletableFuture<Acknowledge> requestSlot(
JobMasterId jobMasterId,
SlotRequest slotRequest,
final Time timeout) {
JobID jobId = slotRequest.getJobId();
// 获取作业ID对应的JobManager
JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
if (null != jobManagerRegistration) {
if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
log.info("Request slot with profile {} for job {} with allocation id {}.",
slotRequest.getResourceProfile(),
slotRequest.getJobId(),
slotRequest.getAllocationId());
try {
// 注册slot申请给SlotManager
slotManager.registerSlotRequest(slotRequest);
} catch (ResourceManagerException e) {
return FutureUtils.completedExceptionally(e);
}
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
return FutureUtils.completedExceptionally(new ResourceManagerException("The job leader's id " +
jobManagerRegistration.getJobMasterId() + " does not match the received id " + jobMasterId + '.'));
}
} else {
return FutureUtils.completedExceptionally(new ResourceManagerException("Could not find registered job manager for job " + jobId + '.'));
}
}
分析到这里,我们得知ResourceManager最终将SlotRequest交给了内部的SlotManager来处理。
@Override
public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
// 检查状态是否为started
checkInit();
// 检查已满足的slot请求和积压的slot请求中有没有allocation id和下面方法参数相同的
// 如果重复,返回false
if (checkDuplicateRequest(slotRequest.getAllocationId())) {
LOG.debug("Ignoring a duplicate slot request with allocation id {}.", slotRequest.getAllocationId());
return false;
} else {
PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
// 包装slot并存入pendingSlotRequests集合
pendingSlotRequests.put(slotRequest.getAllocationId(), pendingSlotRequest);
try {
// 调用内部请求slot的方法,下面分析
internalRequestSlot(pendingSlotRequest);
} catch (ResourceManagerException e) {
// requesting the slot failed --> remove pending slot request
pendingSlotRequests.remove(slotRequest.getAllocationId());
throw new ResourceManagerException("Could not fulfill slot request " + slotRequest.getAllocationId() + '.', e);
}
return true;
}
}
我们继续跟踪internalRequestSlot方法。
internalRequestSlot
private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
// 获取slotRequest的资源要求
final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
OptionalConsumer.of(findMatchingSlot(resourceProfile))
.ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
.ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));
}
这里的findMatchingSlot方法通过slotMatchingStrategy在freeSlots集合中查找出资源需求匹配的slot,确保匹配的slot状态为SlotState.FREE,将其从freeSlots集合中剔除后返回。
Flink把查找匹配slot的逻辑封装为slotMatchingStrategy,它有两个子类:
-
AnyMatchingSlotMatchingStrategy:找到第一个匹配的slot,只要发现的slot持有的资源大于资源需求就返回。 -
LeastUtilizationSlotMatchingStrategy:在前一个策略的基础上,还会计算每个TaskExecutor的slot利用率,将利用率最低的TaskExecutor上的slot返回。
通过findMatchingSlot方法,如果找到了匹配的slot,调用allocateSlot方法,通知TaskExecutor分配slot。如果没有匹配到,调用fulfillPendingSlotRequestWithPendingTaskManagerSlot。
fulfillPendingSlotRequestWithPendingTaskManagerSlot
该方法将根据所需资源(pendingSlotRequest.getResourceProfile()),创建出PendingTaskManagerSlot放入到pendingSlot中保存。这些处于pending状态的slot在registerTaskManager的时候会被注册(registerSlot)。在这个时候,pending slot才会被真正的分配出来,在对应的TaskExecutor中创建。
private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
// 获取PendingSlotRequest的资源要求
ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
// 从PendingTaskManagerSlot中找到一个符合资源需求的slot
Optional<PendingTaskManagerSlot> pendingTaskManagerSlotOptional = findFreeMatchingPendingTaskManagerSlot(resourceProfile);
// 如果没有找到资源需求匹配的pending slot
// 分配resourceProfile指定的资源,创建一个pending slot
if (!pendingTaskManagerSlotOptional.isPresent()) {
pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
}
// 如果创建成功,执行assignPendingTaskManagerSlot方法
// 此方法将pendingSlotRequest和pendingTaskManagerSlot关联起来
OptionalConsumer.of(pendingTaskManagerSlotOptional)
.ifPresent(pendingTaskManagerSlot -> assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot))
.ifNotPresent(() -> {
// request can not be fulfilled by any free slot or pending slot that can be allocated,
// check whether it can be fulfilled by allocated slots
if (failUnfulfillableRequest && !isFulfillableByRegisteredOrPendingSlots(pendingSlotRequest.getResourceProfile())) {
throw new UnfulfillableSlotRequestException(pendingSlotRequest.getAllocationId(), pendingSlotRequest.getResourceProfile());
}
});
}
allocateSlot
现在我们分析下internalRequestSlot逻辑的里一个分支allocateSlot方法调用。
SlotManagerImpl的allocateSlot方法内容如下:
private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
Preconditions.checkState(taskManagerSlot.getState() == SlotState.FREE);
// 从slot中获取和TaskManager的连接信息
TaskExecutorConnection taskExecutorConnection = taskManagerSlot.getTaskManagerConnection();
// 获取RPC调用端
TaskExecutorGateway gateway = taskExecutorConnection.getTaskExecutorGateway();
final CompletableFuture<Acknowledge> completableFuture = new CompletableFuture<>();
final AllocationID allocationId = pendingSlotRequest.getAllocationId();
final SlotID slotId = taskManagerSlot.getSlotId();
final InstanceID instanceID = taskManagerSlot.getInstanceId();
// 为slot指定PendingSlotRequest
taskManagerSlot.assignPendingSlotRequest(pendingSlotRequest);
pendingSlotRequest.setRequestFuture(completableFuture);
// 如果这个PendingSlotRequest已经分配slot,需要先归还
returnPendingTaskManagerSlotIfAssigned(pendingSlotRequest);
// 获取已注册的TaskManager
TaskManagerRegistration taskManagerRegistration = taskManagerRegistrations.get(instanceID);
if (taskManagerRegistration == null) {
throw new IllegalStateException("Could not find a registered task manager for instance id " +
instanceID + '.');
}
// 标记这个taskManager状态为使用中
taskManagerRegistration.markUsed();
// RPC call to the task manager
// 远程调用TaskExecutor的requestSlot方法
CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
slotId,
pendingSlotRequest.getJobId(),
allocationId,
pendingSlotRequest.getResourceProfile(),
pendingSlotRequest.getTargetAddress(),
resourceManagerId,
taskManagerRequestTimeout);
// RPC调用完成后,执行completableFuture
requestFuture.whenComplete(
(Acknowledge acknowledge, Throwable throwable) -> {
if (acknowledge != null) {
completableFuture.complete(acknowledge);
} else {
completableFuture.completeExceptionally(throwable);
}
});
completableFuture.whenCompleteAsync(
(Acknowledge acknowledge, Throwable throwable) -> {
try {
if (acknowledge != null) {
// 如果分配成功,更新slot信息
updateSlot(slotId, allocationId, pendingSlotRequest.getJobId());
} else {
if (throwable instanceof SlotOccupiedException) {
SlotOccupiedException exception = (SlotOccupiedException) throwable;
// 如果slot被占用,更新slot信息
updateSlot(slotId, exception.getAllocationId(), exception.getJobId());
} else {
// 否则,移除SlotRequest
removeSlotRequestFromSlot(slotId, allocationId);
}
if (!(throwable instanceof CancellationException)) {
// 如果slot分配操作取消,调用处理失败slot请求逻辑
handleFailedSlotRequest(slotId, allocationId, throwable);
} else {
LOG.debug("Slot allocation request {} has been cancelled.", allocationId, throwable);
}
}
} catch (Exception e) {
LOG.error("Error while completing the slot allocation.", e);
}
},
mainThreadExecutor);
}
上面ResourceManager通过RPC调用了TaskExecutor的requestSlot方法。
requestSlot
TaskExecutor.requestSlot内容如下:
@Override
public CompletableFuture<Acknowledge> requestSlot(
final SlotID slotId,
final JobID jobId,
final AllocationID allocationId,
final ResourceProfile resourceProfile,
final String targetAddress,
final ResourceManagerId resourceManagerId,
final Time timeout) {
// TODO: Filter invalid requests from the resource manager by using the instance/registration Id
log.info("Receive slot request {} for job {} from resource manager with leader id {}.",
allocationId, jobId, resourceManagerId);
// 检测是否连接到了ResourceManager
if (!isConnectedToResourceManager(resourceManagerId)) {
final String message = String.format("TaskManager is not connected to the resource manager %s.", resourceManagerId);
log.debug(message);
return FutureUtils.completedExceptionally(new TaskManagerException(message));
}
try {
// 执行TaskExecutor的allocateSlot方法
allocateSlot(
slotId,
jobId,
allocationId,
resourceProfile);
} catch (SlotAllocationException sae) {
return FutureUtils.completedExceptionally(sae);
}
final JobTable.Job job;
try {
// 创建一个作业
job = jobTable.getOrCreateJob(jobId, () -> registerNewJobAndCreateServices(jobId, targetAddress));
} catch (Exception e) {
// free the allocated slot
try {
taskSlotTable.freeSlot(allocationId);
} catch (SlotNotFoundException slotNotFoundException) {
// slot no longer existent, this should actually never happen, because we've
// just allocated the slot. So let's fail hard in this case!
onFatalError(slotNotFoundException);
}
// release local state under the allocation id.
localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
// sanity check
if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
onFatalError(new Exception("Could not free slot " + slotId));
}
return FutureUtils.completedExceptionally(new SlotAllocationException("Could not create new job.", e));
}
if (job.isConnected()) {
offerSlotsToJobManager(jobId);
}
return CompletableFuture.completedFuture(Acknowledge.get());
}
最后我们跟踪到了TaskExecutor的allocateSlot方法。这个方法内容较少,不再贴出相关代码。该方法最终调用TaskSlotTable的allocateSlot方法。
TaskExecutor 的 allocateSlot 方法
private void allocateSlot(
SlotID slotId,
JobID jobId,
AllocationID allocationId,
ResourceProfile resourceProfile) throws SlotAllocationException {
// 如果slot处于空闲状态
if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
// taskSlotTable分配slot
if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), jobId, allocationId, resourceProfile, taskManagerConfiguration.getTimeout())) {
log.info("Allocated slot for {}.", allocationId);
} else {
log.info("Could not allocate slot for {}.", allocationId);
throw new SlotAllocationException("Could not allocate slot.");
}
} else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), jobId, allocationId)) {
// 进入这个分支表明slot被分配给了其他的job
final String message = "The slot " + slotId + " has already been allocated for a different job.";
log.info(message);
final AllocationID allocationID = taskSlotTable.getCurrentAllocation(slotId.getSlotNumber());
throw new SlotOccupiedException(message, allocationID, taskSlotTable.getOwningJob(allocationID));
}
}
到这里为止,我们完成了从Execution vertex到最终TaskManager创建出TaskSlot的过程。
TaskManager 启动时分配slot逻辑
TaskExecutor的startTaskExecutorServices方法。该方法启动了ResourceManager资源管理器Leader信息的获取服务,并注册了一个监听器,实时监听ResourceManager leader状态的变化。然后启动TaskSlotTable,Job leader服务和文件缓存。
startTaskExecutorServices
private void startTaskExecutorServices() throws Exception {
try {
// start by connecting to the ResourceManager
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
// tell the task slot table who's responsible for the task slot actions
taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
// start the job leader service
jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
} catch (Exception e) {
handleStartTaskExecutorServicesException(e);
}
}
当ResourceManager leader服务选举成功之时通知ResourceManagerLeaderListener,调用它的notifyLeaderAddress方法。
notifyLeaderAddress
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
runAsync(
() -> notifyOfNewResourceManagerLeader(
leaderAddress,
ResourceManagerId.fromUuidOrNull(leaderSessionID)));
}
这里异步调用了notifyOfNewResourceManagerLeader方法。我们跟踪一下。
notifyOfNewResourceManagerLeader
private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
resourceManagerAddress = createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
}
该方法首先保存了选举后确定的ResourceManager leader地址,然后建立和ResourceManager的连接。
我们跟踪下建立连接的方法。
reconnectToResourceManager
private void reconnectToResourceManager(Exception cause) {
closeResourceManagerConnection(cause);
startRegistrationTimeout();
tryConnectToResourceManager();
}
为了逻辑统一,这里实际使用的是重新连接的逻辑。首先关闭和ResourceManager的连接,然后创建超时检测任务(超时时间从配置文件中读取),最后尝试和ResourceManager建立连接。
继续跟踪tryConnectToResourceManager方法。
tryConnectToResourceManager
private void tryConnectToResourceManager() {
if (resourceManagerAddress != null) {
connectToResourceManager();
}
}
private void connectToResourceManager() {
assert(resourceManagerAddress != null);
assert(establishedResourceManagerConnection == null);
assert(resourceManagerConnection == null);
log.info("Connecting to ResourceManager {}.", resourceManagerAddress);
// 创建一个TaskExecutor注册对象
// 包含TaskExecutor的地址端口,资源配置硬件信息等
final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
getAddress(),
getResourceID(),
unresolvedTaskManagerLocation.getDataPort(),
JMXService.getPort().orElse(-1),
hardwareDescription,
memoryConfiguration,
taskManagerConfiguration.getDefaultSlotResourceProfile(),
taskManagerConfiguration.getTotalResourceProfile()
);
// 建立和ResourceManager的连接,并启动
resourceManagerConnection =
new TaskExecutorToResourceManagerConnection(
log,
getRpcService(),
taskManagerConfiguration.getRetryingRegistrationConfiguration(),
resourceManagerAddress.getAddress(),
resourceManagerAddress.getResourceManagerId(),
getMainThreadExecutor(),
new ResourceManagerRegistrationListener(),
taskExecutorRegistration);
resourceManagerConnection.start();
}
TaskExecutor向ResourceManager注册并开启连接。注意这里创建了一个连接状态监听器。注册并连接成功后,调用ResourceManagerRegistrationListener的onRegistrationSuccess方法。
onRegistrationSuccess
@Override
public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection connection, TaskExecutorRegistrationSuccess success) {
final ResourceID resourceManagerId = success.getResourceManagerId();
final InstanceID taskExecutorRegistrationId = success.getRegistrationId();
final ClusterInformation clusterInformation = success.getClusterInformation();
final ResourceManagerGateway resourceManagerGateway = connection.getTargetGateway();
runAsync(
() -> {
// filter out outdated connections
//noinspection ObjectEquality
if (resourceManagerConnection == connection) {
try {
establishResourceManagerConnection(
resourceManagerGateway,
resourceManagerId,
taskExecutorRegistrationId,
clusterInformation);
} catch (Throwable t) {
log.error("Establishing Resource Manager connection in Task Executor failed", t);
}
}
});
}
回调函数异步调用TaskExecutor的establishResourceManagerConnection,执行建立连接后的逻辑。
establishResourceManagerConnection
private void establishResourceManagerConnection(
ResourceManagerGateway resourceManagerGateway,
ResourceID resourceManagerResourceId,
InstanceID taskExecutorRegistrationId,
ClusterInformation clusterInformation) {
// 向ResourceManager异步发送slot报告
final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
getResourceID(),
taskExecutorRegistrationId,
// 通过TaskSlotTable创建slot报告
taskSlotTable.createSlotReport(getResourceID()),
taskManagerConfiguration.getTimeout());
slotReportResponseFuture.whenCompleteAsync(
(acknowledge, throwable) -> {
if (throwable != null) {
// 如果遇到异常,再次尝试重新连接ResourceManager
reconnectToResourceManager(new TaskManagerException("Failed to send initial slot report to ResourceManager.", throwable));
}
}, getMainThreadExecutor());
// monitor the resource manager as heartbeat target
// 监测和ResourceManager之间的心跳状态
resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<TaskExecutorHeartbeatPayload>() {
@Override
public void receiveHeartbeat(ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {
// 收到心跳后向ResourceManager回送心跳信息
resourceManagerGateway.heartbeatFromTaskManager(resourceID, heartbeatPayload);
}
@Override
public void requestHeartbeat(ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {
// the TaskManager won't send heartbeat requests to the ResourceManager
}
});
// set the propagated blob server address
// 设置blob server地址信息
final InetSocketAddress blobServerAddress = new InetSocketAddress(
clusterInformation.getBlobServerHostname(),
clusterInformation.getBlobServerPort());
// 设置blobCache
blobCacheService.setBlobServerAddress(blobServerAddress);
// 保存已创建的ResourceManager连接信息
establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
resourceManagerGateway,
resourceManagerResourceId,
taskExecutorRegistrationId);
// 停止连接注册过程超时计时器
stopRegistrationTimeout();
}
发送SlotReport的方法在ResourceManager的sendSlotReport。
sendSlotReport
@Override
public CompletableFuture<Acknowledge> sendSlotReport(ResourceID taskManagerResourceId, InstanceID taskManagerRegistrationId, SlotReport slotReport, Time timeout) {
final WorkerRegistration<WorkerType> workerTypeWorkerRegistration = taskExecutors.get(taskManagerResourceId);
if (workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)) {
// 通过SlotManager注册TaskManager
if (slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport)) {
onWorkerRegistered(workerTypeWorkerRegistration.getWorker());
}
return CompletableFuture.completedFuture(Acknowledge.get());
} else {
return FutureUtils.completedExceptionally(new ResourceManagerException(String.format("Unknown TaskManager registration id %s.", taskManagerRegistrationId)));
}
}
这个方法向SlotManager注册了TaskManager。我们继续跟踪。
registerTaskManager
SlotManagerImpl.registerTaskManager方法内容如下:
@Override
public boolean registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
// 检查slotManager状态确保已经启动
checkInit();
LOG.debug("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID().getStringWithMetadata(), taskExecutorConnection.getInstanceID());
// we identify task managers by their instance id
// 如果包含TaskExecutor的instance id,说明这个task executor已经注册过
// 更新slots中保存的slot信息,返回false
if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
return false;
} else {
// 检查分配slot后slot个数是否超过上限
// slot最大个数通过slotmanager.number-of-slots.max配置
if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
LOG.info("The total number of slots exceeds the max limitation {}, release the excess resource.", maxSlotNum);
// 将资源释放
resourceActions.releaseResource(taskExecutorConnection.getInstanceID(), new FlinkException("The total number of slots exceeds the max limitation."));
return false;
}
// first register the TaskManager
ArrayList<SlotID> reportedSlots = new ArrayList<>();
// 将slot report中的各个slot id写入reportedSlots
for (SlotStatus slotStatus : initialSlotReport) {
reportedSlots.add(slotStatus.getSlotID());
}
// 生成并保存TaskManager的注册信息
TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(
taskExecutorConnection,
reportedSlots);
taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);
// next register the new slots
// 逐个注册slot
for (SlotStatus slotStatus : initialSlotReport) {
registerSlot(
slotStatus.getSlotID(),
slotStatus.getAllocationID(),
slotStatus.getJobID(),
slotStatus.getResourceProfile(),
taskExecutorConnection);
}
return true;
}
}
这个方法返回一个布尔值。如果TaskManager之前没有注册过,并且注册成功,返回true。否则返回false。
接下来我们重点分析下SlotManagerImpl.registerSlot方法。SlotManager通过这个方法为TaskManager注册slot。
registerSlot
private void registerSlot(
SlotID slotId,
AllocationID allocationId,
JobID jobId,
ResourceProfile resourceProfile,
TaskExecutorConnection taskManagerConnection) {
// 如果要注册的slot id和已存在的某个slot相同,需要先移除这个已存在的slot
if (slots.containsKey(slotId)) {
// remove the old slot first
removeSlot(
slotId,
new SlotManagerException(
String.format(
"Re-registration of slot %s. This indicates that the TaskExecutor has re-connected.",
slotId)));
}
// 创建一个新的TaskManagerSlot中,并注册(保存到slots集合)
final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId, resourceProfile, taskManagerConnection);
final PendingTaskManagerSlot pendingTaskManagerSlot;
if (allocationId == null) {
// 如果没有allocationId,找到一个资源要求匹配的pending slot
pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile);
} else {
pendingTaskManagerSlot = null;
}
// 如果没有找到资源要求匹配的slot,更新slot信息
if (pendingTaskManagerSlot == null) {
updateSlot(slotId, allocationId, jobId);
} else {
// 从pendingSlots中移除
pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());
// 取出pending slot申请请求
final PendingSlotRequest assignedPendingSlotRequest = pendingTaskManagerSlot.getAssignedPendingSlotRequest();
if (assignedPendingSlotRequest == null) {
// 当前slot无人请求,放入空闲slot集合中
handleFreeSlot(slot);
} else {
// 开始分配slot
assignedPendingSlotRequest.unassignPendingTaskManagerSlot();
allocateSlot(slot, assignedPendingSlotRequest);
}
}
}
此时,ResourceManager开始真正的分配slot流程。分配slot过程位于SlotManagerImpl.allocateSlot方法。后面的过程和Slot申请流程相同,不再赘述。
本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。










网友评论