以下是Java实现的任务分片调度器完整解决方案,包含任务分片、并发执行、分批处理和自动重试机制:
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;
/**
 * Runs a list of tasks on a fixed-size thread pool and retries each task when its processor
 * throws.
 *
 * <p>Tasks are partitioned into batches of {@code batchSize} for submission. Every batch is
 * queued to the pool immediately; the scheduler does not wait for one batch to finish before
 * submitting the next. Results are returned in the same order as the input tasks.
 *
 * <p>NOTE(review): the generic parameters and the name of the public submission method were
 * stripped from the text this class was recovered from. {@code <T, R>} and {@code execute} are
 * reconstructions - confirm them against existing callers.
 *
 * @param <T> task input type
 * @param <R> per-task result type
 */
public class TaskChunkScheduler<T, R> {

    private final ExecutorService executor;
    private final int batchSize;
    private final int maxRetries;
    private final long retryDelayMs;

    /**
     * Creates a scheduler backed by a fixed thread pool.
     *
     * @param threadCount  number of worker threads in the pool
     * @param batchSize    number of tasks per submission batch; must be positive
     * @param maxRetries   number of retries after the first failed attempt; a value of zero
     *                     or less means a task fails on its first exception
     * @param retryDelayMs pause between attempts in milliseconds; must not be negative
     * @throws IllegalArgumentException if {@code batchSize <= 0}, {@code retryDelayMs < 0},
     *                                  or the thread pool rejects {@code threadCount}
     */
    public TaskChunkScheduler(int threadCount, int batchSize,
                              int maxRetries, long retryDelayMs) {
        // Validate before creating the pool so a rejected configuration leaks no threads.
        // A batch size of zero would make partition() loop forever.
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        // Thread.sleep rejects negative values; fail here instead of inside a worker.
        if (retryDelayMs < 0) {
            throw new IllegalArgumentException(
                    "retryDelayMs must not be negative: " + retryDelayMs);
        }
        this.executor = Executors.newFixedThreadPool(threadCount);
        this.batchSize = batchSize;
        this.maxRetries = maxRetries;
        this.retryDelayMs = retryDelayMs;
    }

    /**
     * Submits every task to the pool and blocks until all results are available.
     *
     * @param tasks     tasks to process; must not be {@code null}
     * @param processor function applied to each task; must not be {@code null}
     * @return one result per task, in input order
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws RuntimeException     with message {@code "Task execution failed"} if any task
     *                              still fails after its retries are exhausted
     */
    public List<R> execute(List<T> tasks, Function<? super T, ? extends R> processor)
            throws InterruptedException {
        Objects.requireNonNull(tasks, "tasks");
        Objects.requireNonNull(processor, "processor");

        List<Future<R>> futures = new ArrayList<>(tasks.size());
        for (List<T> batch : partition(tasks, batchSize)) {
            for (T task : batch) {
                futures.add(executor.submit(() -> processWithRetry(task, processor)));
            }
        }
        return getResults(futures);
    }

    /**
     * Applies {@code processor} to {@code task}, sleeping {@code retryDelayMs} and retrying
     * after each exception until {@code maxRetries} retries have been used.
     */
    private R processWithRetry(T task, Function<? super T, ? extends R> processor) {
        int attempts = 0;
        while (true) {
            try {
                return processor.apply(task);
            } catch (Exception e) {
                if (++attempts > maxRetries) {
                    throw new RuntimeException("Max retries exceeded", e);
                }
                try {
                    Thread.sleep(retryDelayMs);
                } catch (InterruptedException ie) {
                    // Restore the flag so the pool thread's owner can observe the interrupt.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Retry interrupted", ie);
                }
            }
        }
    }

    /** Splits {@code list} into consecutive sub-list views of at most {@code size} elements. */
    private static <E> List<List<E>> partition(List<E> list, int size) {
        List<List<E>> partitions = new ArrayList<>();
        for (int i = 0; i < list.size(); i += size) {
            partitions.add(list.subList(i, Math.min(i + size, list.size())));
        }
        return partitions;
    }

    /**
     * Waits for every future in order and collects the results. If waiting fails or is
     * interrupted, the remaining futures are cancelled so abandoned tasks stop retrying.
     */
    private List<R> getResults(List<Future<R>> futures) throws InterruptedException {
        List<R> results = new ArrayList<>(futures.size());
        try {
            for (Future<R> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (ExecutionException e) {
            cancelAll(futures);
            throw new RuntimeException("Task execution failed", e);
        } catch (InterruptedException e) {
            cancelAll(futures);
            throw e;
        }
    }

    /** Cancels every future; already-completed futures are unaffected. */
    private void cancelAll(List<Future<R>> futures) {
        for (Future<R> future : futures) {
            future.cancel(true);
        }
    }

    /** Initiates an orderly shutdown of the underlying thread pool. */
    public void shutdown() {
        executor.shutdown();
    }
}
/**
 * A task-processing callback that, unlike {@link java.util.function.Function}, is allowed to
 * throw checked exceptions.
 *
 * <p>NOTE(review): the type parameters were stripped from the text this interface was
 * recovered from; {@code <T, R>} is reconstructed from the {@code process} signature.
 *
 * @param <T> task input type
 * @param <R> result type
 */
@FunctionalInterface
public interface TaskProcessor<T, R> {

    /**
     * Processes a single task.
     *
     * @param task the task to process
     * @return the processing result
     * @throws Exception if processing fails
     */
    R process(T task) throws Exception;
}
public class SchedulerDemo {
public static void main(String[] args) throws Exception {
List
TaskChunkScheduler
4, // 线程数
3, // 每批处理数量
3, // 最大重试次数
1000 // 重试间隔(ms)
);
List
// 模拟处理逻辑
if (task % 3 == 0) {
throw new RuntimeException("模拟失败");
}
return "Processed-" + task;
});
System.out.println(results);
scheduler.shutdown();
}
}
代码说明:该调度器实现任务分片处理、并发执行和自动重试机制。主类TaskChunkScheduler负责线程池管理,支持自定义批次大小和重试策略;需要注意,各批次的任务会连续提交到线程池,批次之间不会相互等待。TaskProcessor定义了一个允许抛出受检异常的任务处理接口,但调度器本身接收的是java.util.function.Function。SchedulerDemo展示完整使用示例。