ThreadPool定时重试
编程技术  /  houtizong 发布于 3年前   121
package threadpool;

/**
 * Demo driver for the polling thread pool: starts the manager, launches
 * seven tagged tasks, waits, and then stops the manager.
 */
public class ThreadPoolTest {

    /**
     * Entry point of the demo.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        System.out.println("start");

        ThreadPoolManager poolManager = new ThreadPoolManager(3);
        poolManager.start();

        MyTaskList list = new MyTaskList(poolManager);

        // Launch one task per tag, in alphabetical order.
        String[] tags = {"A", "B", "C", "D", "E", "F", "G"};
        for (String tag : tags) {
            new MyTask(list, tag).start();
        }

        // Keep the main thread parked for 30 seconds while the tasks run.
        try {
            Thread.sleep(30000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        poolManager.stop();
        System.out.println("stop");
    }
}
package threadpool;import java.util.LinkedList;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ThreadPoolManager {/** 线程池的大小 */private int poolSize;private static final int MIN_POOL_SIZE = 1;private static final int MAX_POOL_SIZE = 10;/** 线程池 */private ExecutorService threadPool;/** 请求队列 */private LinkedList<ThreadPoolTask> asyncTasks;/** 轮询线程 */private Thread poolThread;/** 轮询时间 */private static final int SLEEP_TIME = 200;public ThreadPoolManager(int poolSize) {if (poolSize < MIN_POOL_SIZE)poolSize = MIN_POOL_SIZE;if (poolSize > MAX_POOL_SIZE)poolSize = MAX_POOL_SIZE;this.poolSize = poolSize;threadPool = Executors.newFixedThreadPool(this.poolSize);asyncTasks = new LinkedList<ThreadPoolTask>();}/** * 向任务队列中添加任务 * * @param task */public void addAsyncTask(ThreadPoolTask task) {synchronized (asyncTasks) {// Log.i(TAG, "add task: " + task.getURL());asyncTasks.addLast(task);}}/** * 从任务队列中提取任务 * * @return */private ThreadPoolTask getAsyncTask() {synchronized (asyncTasks) {if (asyncTasks.size() > 0) {ThreadPoolTask task = asyncTasks.removeFirst();// Log.i(TAG, "remove task: " + task.getURL());return task;}}return null;}/** * 开启线程池轮询 * * @return */public void start() {if (poolThread == null) {poolThread = new Thread(new PoolRunnable());poolThread.start();}}/** * 结束轮询,关闭线程池 */public void stop() {poolThread.interrupt();poolThread = null;}/** * 实现轮询的Runnable * * @author carrey * */private class PoolRunnable implements Runnable {@Overridepublic void run() {// Log.i(TAG, "开始轮询");try {while (!Thread.currentThread().isInterrupted()) {ThreadPoolTask task = getAsyncTask();if (task == null) {try {Thread.sleep(SLEEP_TIME);} catch (InterruptedException e) {Thread.currentThread().interrupt();}continue;}threadPool.execute(task);}} finally {threadPool.shutdown();}// Log.i(TAG, "结束轮询");}}}
package threadpool;public class ThreadPoolTask implements Runnable {private String tag;private Callback callback;public ThreadPoolTask(String tag, Callback callback) {this.tag = tag;this.callback = callback;}@Overridepublic void run() {System.out.println(tag + " is running on " + Thread.currentThread());try {// 模拟耗时任务Thread.sleep(700);} catch (InterruptedException e) {e.printStackTrace();}if (callback != null)callback.onRetry();}public interface Callback {public void onRetry();}}
package threadpool;import java.lang.reflect.Field;import java.util.Timer;import java.util.TimerTask;public class MyTaskList {private ThreadPoolManager poolManager;private Timer timer;public MyTaskList(ThreadPoolManager poolManager) {this.poolManager = poolManager;timer = new Timer();}public void addTask(ThreadPoolTask task) {if (task != null)poolManager.addAsyncTask(task);}public void addTask(TimerTask task, long delay) {// 重置TimerTask,不然会发生Exceptiontry {Class<?> clazz = TimerTask.class;Field field = clazz.getDeclaredField("state");field.setAccessible(true);field.set(task, 0);} catch (Exception e) {}timer.schedule(task, delay);}}
package threadpool;import java.util.TimerTask;import threadpool.ThreadPoolTask.Callback;public class MyTask implements Callback {private MyTaskList list;private ThreadPoolTask task;private String tag;private int retry = 0;public MyTask(MyTaskList list, String tag) {this.list = list;this.tag = tag;}public void start() {task = new ThreadPoolTask(tag, this);start(0);}private void start(int retry) {// 最多重试3次if (retry >= 3) {System.out.println(tag + " finished " + Thread.currentThread());return;}doSomething();this.retry = retry;list.addTask(task);}@Overridepublic void onRetry() {// 重试间隔list.addTask(timertask, 1000);}private TimerTask timertask = new TimerTask() {@Overridepublic void run() {start(retry + 1);}};private void doSomething() {System.out.println("Retry[" + retry + "] " + tag + " on "+ Thread.currentThread());}}
请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!
技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成
网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据基于大数据采集等爬虫技术获取,旨在助力知识分享;如有侵权,请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];
文章归档
文章标签
友情链接