Erlo

封装CompletionService的并发任务分发器(优化版)

2025-06-09 12:29:26 发布   38 浏览  
页面报错/反馈
收藏 点赞

这个框架代码用了很长时间,使用场景也挺多,初衷是简化CompletionService的编程接口,尽量减少业务代码处的感知。
今天找deepseek做了一版优化,优化点:

  • 整体的超时控制
  • 超时、异常处理和封装
  • 取消未完成的任务

核心代码

public class TaskDispatcher {

    private final CompletionService completionService;

    /**
     * 待处理任务
     */
    private final Set> pending = Sets.newHashSet();

    /**
     * 超时时间, 单位: s
     */
    private long timeout = 10000;

    public TaskDispatcher(Executor executor, long timeout) {
        completionService = new ExecutorCompletionService(executor);
        if (timeout > 0) {
            this.timeout = timeout;
        }
    }

    public void submit(Callable task) {
        Future future = completionService.submit(task);
        pending.add(future);
    }

    /**
     * 仅获取执行的任务结果
     *
     * @param ignoreException 忽略执行时发生的异常
     * @return
     */
    public List taskCompletedResult(boolean ignoreException) {
        List> taskResultList = taskCompleted();
        List res = Lists.newArrayList();
        if (CollectionUtils.isEmpty(taskResultList)) {
            return res;
        }
        boolean hasError = false;
        for (TaskResult taskResult : taskResultList) {
            if (!taskResult.isTimeout() && taskResult.getError() == null) {
                res.add(taskResult.getValue());
            } else if (taskResult.isTimeout() && !ignoreException) {
                LoggerUtils.error("执行任务时超时");
                hasError = true;
            } else if (taskResult.getError() != null && !ignoreException) {
                LoggerUtils.error("执行任务时发生异常", taskResult.getError());
                hasError = true;
            }
        }
        if (hasError) {
            throw new ZHException("任务并发处理时发生异常");
        }
        return res;
    }

    /**
     * 获取执行的任务
     *
     * @return
     */
    public List> taskCompleted() {
        long deadline = System.currentTimeMillis() + timeout;
        List> results = Lists.newArrayList();
        int totalTasks = pending.size();

        try {
            for (int i = 0; i  future = completionService.poll(remaining, TimeUnit.MILLISECONDS);
                TaskResult result = new TaskResult();
                if (future == null) {
                    result.setTimeout(true);
                } else {
                    pending.remove(future);
                    try {
                        result.setValue(future.get());
                    } catch (ExecutionException e) {
                        result.setError(e.getCause());
                    }
                }
                results.add(result);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("任务结果收集中断", e);
        } finally {
            pending.forEach(f -> f.cancel(true));
            pending.clear();
        }
        return results;
    }

    @Data
    static class TaskResult {
        private T value;
        private Throwable error;
        private boolean isTimeout;
    }
}

需要自己声明线程池bean,使用方式如下

        TaskDispatcher taskDispatcher = new TaskDispatcher(threadExecutor, TIME_OUT);
        for (long index: indexList) {
            taskDispatcher.submit(() -> xxxService.count(index));
        }

为了便于在计数求和场景使用,进一步实现了一个子类

public class IntSumTaskDispatcher extends TaskDispatcher {
    public IntSumTaskDispatcher(Executor executor, long timeout, boolean throwException) {
        super(executor, timeout);
    }

    /**
     * 对所有结果求和
     *
     * @return
     */
    public int takeCompletedSum() {
        List countResList = taskCompletedResult(true);
        int count = 0;
        for (Integer countSingle : countResList) {
            if (countSingle == null) {
                continue;
            }
            count += countSingle;
        }
        return count;
    }
}


作者:五岳
出处:http://www.cnblogs.com/wuyuegb2312
对于标题未标注为“转载”的文章均为原创,其版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

 

登录查看全部

参与评论

评论留言

还没有评论留言,赶紧来抢楼吧~~

手机查看

返回顶部

给这篇文章打个标签吧~

棒极了 糟糕透顶 好文章 PHP JAVA JS 小程序 Python SEO MySql 确认