背景
项目里有一个处理Job的工作流。
实体对应的Job有两个状态:一是实体的Job的状态,以下称为State,另一个是每个实体Job状态的内部状态,以下称为JobState。
其中,State的变迁为业务相关(DOWNLOADRUN, PROCESSRUN等),JobState为业务不相关(NEW -> INPROGRESS -> COMPLETED)。
主线程负责State的变迁。
用一个线程池Executors.newSingleThreadExecutor来实际处理Job,即负责JobState的变迁。
当线程池中的线程将JobState变为COMPLETED之后,主线程将State前挪,并将JobState重置为NEW,重新让线程池中的线程处理。
伪代码如下:
while(on){
List<? extends IJobEntity> jobEntities = listToDoIJobEntities();
for (IJobEntity jobEntity : jobEntities){
Enum nextState = jobEntity.getEntityNextState();
Enum newState = nextState;
if (nextState != null){ // TODO: probably need to add nextState != NULL in the sql query above
if (nextState.ordinal() == readyState.ordinal() - 1){
newState = readyState;
nextState = null;
dao.update(null, mapOf("state", newState, "nextState", nextState, "jobState", null), jobEntity.getEntityId());
}
else{
nextState = jobEntity.getJobNextState();
dao.update(null, mapOf("state", newState, "nextState", nextState, "jobState", JobState.NEW), jobEntity.getEntityId());
}
IJobEntity tmpJobEntity = getDao().get(null, jobEntity.getEntityId()).orElse(null);
}
}
jobEntities = listNewAndStartedJobEntities();
for (IJobEntity jobEntity : jobEntities){
CompletableFuture<JobResult> future = CompletableFuture.supplyAsync(() -> {
Enum nextState = jobEntity.getEntityNextState();
IJobEntity latestJobEntity= getDao().get(null, jobEntity.getEntityId()).orElse(null);
if (nextState != latestJobEntity.getEntityNextState()){
return null;
}
// perform the task and set JobState INPROGRESS
return getDoers().doJob(nextState, jobEntity.getEntityId());
}, jobExecutor);
future.thenAccept(jobResult -> {
if (jobResult == null){
return;
}
IJobEntity tmpJobEntity = getDao().get(currentUserHolder.get(),jobResult.getEntityId()).orElse(null);
Map<String, Object> data = new HashedMap();
// Set JobState COMPLETED
data.put("jobState", jobResult.getJobState());
dao.update(currentUserHolder.get(), data, jobResult.getEntityId());
});
}
}
附上newSingleThreadExecutor的背景知识:
ThreadExecutorPool的构造函数有多个参数
- corePoolSize: the number of threads to keep in the pool
- maximumPoolSize: the maximum number of threads to allow in the pool
- keepAliveTime: 当线程数量大于corePoolSize时,当一个线程无事可做时,超过一定的时间(keepAliveTime),这个线程就会被停掉。
- unit
- workQueue:类型为BlockingQueue
newSingleThreadExecutor创建的corePoolSize为1,maximumPoolSize为1,workQueue为unbounding的LinkedBlockingQueue,即无界队列,允许无限多排队。
issue描述
同时launch多个Job(比如2个),称为Job1,Job2.
Job1和Job2同时进入NEW状态。
Job1被process为COMPLETED,准备进入下一状态;
Job2随后被process为COMPLETED,也准备进入下一状态。
问题来了,当Job2被处理结束后,日志显示,Job2又立马被设置成了INPROGRESS状态,重跑一遍INPROGRESS->COMPLETED。
问题跟踪
跟踪后发现,当Job1在处理的同时,主线程仍然在listStartOrNewEntities,这个时候listStartOrNewEntities的结果只有Job2,所以Job2被不停的挂在线程池等待队列上等待被处理。
这也是为什么Job2在被处理结束状态进入COMPLETED的同时,立即被设置为INPROGRESS的原因。
问题解决
这个问题的解决方案有几种:
方案1: 为JobState增加一个QUEUED的状态。创建CompletableFuture之前,如果状态为QUEUED,就不创建也就避免了排队;
方案2:将INPROGRESS状态设置这个动作,单拎出来,作为一个任务放在两个foreach之间。
方案3:在CompletableFuture的处理函数里面做判断,如果JobState已经为COMPLETED,不予处理直接返回。
其中,方案3,仅仅是跳过处理,仍然让Job进行了等待队列的排队,逻辑不正确,不可取。
只能从方案1和方案2中选择一种解决方案。