java编写一个任务分片调度器(并发 + 分批 + 重试)

以下是Java实现的任务分片调度器完整解决方案,包含任务分片、并发执行、分批处理和自动重试机制:


import java.util.*;

import java.util.concurrent.*;

import java.util.function.Function;


public class TaskChunkScheduler {

    private final ExecutorService executor;

    private final int batchSize;

    private final int maxRetries;

    private final long retryDelayMs;


    public TaskChunkScheduler(int threadCount, int batchSize, 

                            int maxRetries, long retryDelayMs) {

        this.executor = Executors.newFixedThreadPool(threadCount);

        this.batchSize = batchSize;

        this.maxRetries = maxRetries;

        this.retryDelayMs = retryDelayMs;

    }


    public List execute(List tasks, Function processor) 

            throws InterruptedException {

        List> futures = new ArrayList<>();

        List> batches = partition(tasks, batchSize);

        

        for (List batch : batches) {

            for (T task : batch) {

                futures.add(executor.submit(() -> 

                    processWithRetry(task, processor)));

            }

        }

        

        return getResults(futures);

    }

    

    private R processWithRetry(T task, Function processor) {

        int attempts = 0;

        while (true) {

            try {

                return processor.apply(task);

            } catch (Exception e) {

                if (++attempts > maxRetries) {

                    throw new RuntimeException("Max retries exceeded", e);

                }

                try {

                    Thread.sleep(retryDelayMs);

                } catch (InterruptedException ie) {

                    Thread.currentThread().interrupt();

                    throw new RuntimeException("Retry interrupted", ie);

                }

            }

        }

    }

    

    private static List> partition(List list, int size) {

        List> partitions = new ArrayList<>();

        for (int i = 0; i < list.size(); i += size) {

            partitions.add(list.subList(i, Math.min(i + size, list.size())));

        }

        return partitions;

    }

    

    private List getResults(List> futures) 

            throws InterruptedException {

        List results = new ArrayList<>();

        for (Future future : futures) {

            try {

                results.add(future.get());

            } catch (ExecutionException e) {

                throw new RuntimeException("Task execution failed", e);

            }

        }

        return results;

    }

    

    public void shutdown() {

        executor.shutdown();

    }

}

www.808455.com 电话13388885782 www.211178.com 电话13388885782 www.922238.com 电话13388885782 www.729279.com 电话13388885782 www.955526.com 电话13388885782 www.itl222.com 电话13388885782 www.itl5553.com 电话13388885782 www.itl567.com 电话13388885782 www.itl666.com 电话13388885782 www.itl996.com 电话13388885782 www.tl1001.com 电话13388885782 www.tl1002.com 电话13388885782 www.tl1003.com 电话13388885782 www.itl1001.com 电话13388885782 www.itl1003.com 电话13388885782 www.hjlh666.com 电话13388885782 www.hjlh888.com 电话13388885782 www.hj666.uk 电话13388885782 www.hj777.uk 电话13388885782 www.hj888666.com 电话13388885782 www.hj332025.com 电话13388885782

@FunctionalInterface

public interface TaskProcessor {

    R process(T task) throws Exception;

}

public class SchedulerDemo {

    public static void main(String[] args) throws Exception {

        List tasks = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        

        TaskChunkScheduler scheduler = new TaskChunkScheduler<>(

            4,  // 线程数

            3,  // 每批处理数量

            3,  // 最大重试次数

            1000 // 重试间隔(ms)

        );

        

        List results = scheduler.execute(tasks, task -> {

            // 模拟处理逻辑

            if (task % 3 == 0) {

                throw new RuntimeException("模拟失败");

            }

            return "Processed-" + task;

        });

        

        System.out.println(results);

        scheduler.shutdown();

    }

}

代码说明:该调度器实现任务分片处理、并发执行和自动重试机制。主类TaskChunkScheduler提供线程池管理,支持自定义批次大小和重试策略。TaskProcessor定义任务处理接口,SchedulerDemo展示完整使用示例。

请使用浏览器的分享功能分享到微信等