这个框架代码用了很长时间,使用场景也挺多,初衷是简化CompletionService的编程接口,尽量减少业务代码处的感知。
今天找deepseek做了一版优化,优化点:
核心代码
public class TaskDispatcher {
private final CompletionService completionService;
/**
* 待处理任务
*/
private final Set> pending = Sets.newHashSet();
/**
* 超时时间, 单位: s
*/
private long timeout = 10000;
public TaskDispatcher(Executor executor, long timeout) {
completionService = new ExecutorCompletionService(executor);
if (timeout > 0) {
this.timeout = timeout;
}
}
public void submit(Callable task) {
Future future = completionService.submit(task);
pending.add(future);
}
/**
* 仅获取执行的任务结果
*
* @param ignoreException 忽略执行时发生的异常
* @return
*/
public List taskCompletedResult(boolean ignoreException) {
List> taskResultList = taskCompleted();
List res = Lists.newArrayList();
if (CollectionUtils.isEmpty(taskResultList)) {
return res;
}
boolean hasError = false;
for (TaskResult taskResult : taskResultList) {
if (!taskResult.isTimeout() && taskResult.getError() == null) {
res.add(taskResult.getValue());
} else if (taskResult.isTimeout() && !ignoreException) {
LoggerUtils.error("执行任务时超时");
hasError = true;
} else if (taskResult.getError() != null && !ignoreException) {
LoggerUtils.error("执行任务时发生异常", taskResult.getError());
hasError = true;
}
}
if (hasError) {
throw new ZHException("任务并发处理时发生异常");
}
return res;
}
/**
* 获取执行的任务
*
* @return
*/
public List> taskCompleted() {
long deadline = System.currentTimeMillis() + timeout;
List> results = Lists.newArrayList();
int totalTasks = pending.size();
try {
for (int i = 0; i future = completionService.poll(remaining, TimeUnit.MILLISECONDS);
TaskResult result = new TaskResult();
if (future == null) {
result.setTimeout(true);
} else {
pending.remove(future);
try {
result.setValue(future.get());
} catch (ExecutionException e) {
result.setError(e.getCause());
}
}
results.add(result);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("任务结果收集中断", e);
} finally {
pending.forEach(f -> f.cancel(true));
pending.clear();
}
return results;
}
@Data
static class TaskResult {
private T value;
private Throwable error;
private boolean isTimeout;
}
}
需要自己声明线程池bean,使用方式如下
TaskDispatcher taskDispatcher = new TaskDispatcher(threadExecutor, TIME_OUT);
for (long index: indexList) {
taskDispatcher.submit(() -> xxxService.count(index));
}
为了便于在计数求和场景使用,进一步实现了一个子类
public class IntSumTaskDispatcher extends TaskDispatcher {
public IntSumTaskDispatcher(Executor executor, long timeout, boolean throwException) {
super(executor, timeout);
}
/**
* 对所有结果求和
*
* @return
*/
public int takeCompletedSum() {
List countResList = taskCompletedResult(true);
int count = 0;
for (Integer countSingle : countResList) {
if (countSingle == null) {
continue;
}
count += countSingle;
}
return count;
}
}
作者:五岳
出处:http://www.cnblogs.com/wuyuegb2312
对于标题未标注为“转载”的文章均为原创,其版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
参与评论
手机查看
返回顶部