ExecutorService内置的线程池

发表时间:2017-10-18 09:37:04 浏览量( 31 ) 留言数( 0 )

学习目标:

1、了解ThreadPoolExecutor和ScheduledThreadPoolExecutor。

2、了解Executors建立默认的线程池

3、能在实践中灵活的使用这些技术


学习过程:

一、ThreadPoolExecutor的学习

   前面我们介绍过Java API对ExecutorService接口的实现有两个,可以直接作为线程池使用:

1. ThreadPoolExecutor

2. ScheduledThreadPoolExecutor

    我们先看看ThreadPoolExecutor的构造方法:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler)

含义如下:

corePoolSize: 线程池维护线程的最少数量

maximumPoolSize:线程池维护线程的最大数量

keepAliveTime: 线程池维护线程所允许的空闲时间

unit: 线程池维护线程所允许的空闲时间的单位

workQueue: 线程池所使用的缓冲队列

handler: 线程池对拒绝任务的处理策略


    当一个任务通过execute(Runnable)方法添加到线程池时,并不会马上执行的,执行逻辑判断如下:

  • 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

  • 如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。

  • 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。这个地方可能和大家想的不一样,先放到workQueue里面的,满了才会构造新的线程。所以如果workQueue是无限大的,那么maximumPoolSize也就没有意思了。

  • 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。

  • 当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。


TimeUnit:

   可选的参数为java.util.concurrent.TimeUnit中的几个静态属性:NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。


WorkQueue:

   workQueue常用的是:java.util.concurrent.ArrayBlockingQueue,让可以自己根据需要调整的,一般会使用BlockingQueue的实现类,有关这些类的详细使用我们再后面的课程再说。常用的有下面集中:

  • ArrayBlockingQueue:规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的。

  • LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的。

  • PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序。

  • SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的。


Handler:

默认有下面提供了四种预定义的处理程序策略:

  • 默认ThreadPoolExecutor.AbortPolicy,处理程序遭到拒绝将抛出运行时 RejectedExecutionException。

  • ThreadPoolExecutor.CallerRunsPolicy,线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

  • ThreadPoolExecutor.DiscardPolicy,不能执行的任务将被删除。

  • ThreadPoolExecutor.DiscardOldestPolicy,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。

定义和使用其他种类的 RejectedExecutionHandler 类也是可能的,但这样做需要非常小心,尤其是当策略仅用于特定容量或排队策略时。


示例代码:

public class MyThreadPoolExecutor {
	public static void main(String[] args) {	
		ThreadPoolExecutor pool = new ThreadPoolExecutor(
				1, //coreSize故意设置小一点,测试用的,实际项目可以根据需要调整
				2, //MaxSize
				60, //60
				TimeUnit.SECONDS, 
				new ArrayBlockingQueue<Runnable>(3)	//指定一种队列 可以分别使用有界的和无界的尝试一下。
				//new LinkedBlockingQueue<Runnable>()
				, new MyRejected()
				//, new DiscardOldestPolicy()
				);
		
		MyTask mt1 = new MyTask(1, "任务1");
		MyTask mt2 = new MyTask(2, "任务2");
		MyTask mt3 = new MyTask(3, "任务3");
		MyTask mt4 = new MyTask(4, "任务4");
		MyTask mt5 = new MyTask(5, "任务5");
		MyTask mt6 = new MyTask(6, "任务6");
		
		pool.execute(mt1);
		pool.execute(mt2);
		pool.execute(mt3);
		pool.execute(mt4);
		pool.execute(mt5);
		pool.execute(mt6);
		
		pool.shutdown();
		
	}
}


自定的MyTask

public class MyTask implements Runnable {

	private int taskId;
	private String taskName;

	public MyTask(int taskId, String taskName) {
		this.taskId = taskId;
		this.taskName = taskName;
	}

	@Override
	public void run() {
		try {
			System.out.println("run taskId =" + this.taskId);
			Thread.sleep(5 * 1000);
			// System.out.println("end taskId =" + this.taskId);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public String toString() {
		return Integer.toString(this.taskId);
	}

}

自定义Handler:

public class MyRejected implements RejectedExecutionHandler {
	@Override
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
		System.out.println("自定义处理..");
		System.out.println("当前被拒绝任务为:" + r.toString());
	}

}


二、使用内置的ThreadPoolExecutor

   创建一个什么样的ExecutorService的实例(即线程池)需要根据具体应用场景而定,不过Java给我们提供了一个Executors工厂类,它可以帮助我们很方便的创建各种类型ExecutorService线程池,Executors一共可以创建下面这四类线程池:

  • newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

  • newCachedThreadPool 创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

  • newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

  • newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。


1、固定大小的线程池newFixedThreadPool:

可以点击进去看看源代码就明白了:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

源代码解析如下:

  workQueue使用了LinkedBlockingQueue,所以你可以无限的添加任务。coreSize和maxSize都是通过参数传入进来的。最多有nThreads个活跃的线程。

public class TestThreadPool {
	  public static void main(String[] args) {  
	        // 创建一个可重用固定线程数的线程池  
	        ExecutorService pool = Executors.newFixedThreadPool(5);  
	        // 创建线程  
			MyTask mt1 = new MyTask(1, "任务1");
			MyTask mt2 = new MyTask(2, "任务2");
			MyTask mt3 = new MyTask(3, "任务3");
			MyTask mt4 = new MyTask(4, "任务4");
			MyTask mt5 = new MyTask(5, "任务5");
			MyTask mt6 = new MyTask(6, "任务6");  
	        // 将线程放入池中进行执行  
	        pool.execute(mt1);  
	        pool.execute(mt2);  
	        pool.execute(mt3);  
	        pool.execute(mt4);  
	        pool.execute(mt5);  
	        pool.execute(mt6);
	        // 关闭线程池  
	        pool.shutdown();  
	    }  
}

输出结果:定义了coreSize是5个线程,线程池里面活跃的线程最多就是5个,所以第6个需要等会才能输出。

run taskId =1

run taskId =3

run taskId =2

run taskId =4

run taskId =5

run taskId =6


2、可变尺寸的线程池,newCachedThreadPool:

与上面的类似,只是改动下pool的创建方式:

 ExecutorService pool = Executors.newCachedThreadPool();

可以点击进去看看源代码就明白了:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

源代码解析如下:

    workQueue使用了SynchronousQueue,可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。

六个线程同时都输出了:

run taskId =2

run taskId =1

run taskId =4

run taskId =3

run taskId =5

run taskId =6


 

3、单任务线程池newSingleThreadExecutor:

仅仅是把上述代码改为

    ExecutorService pool = Executors.newSingleThreadExecutor();

可以点击进去看看源代码就明白了:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

源代码解析如下:

    和newFixedThreadPool几乎是一样的,workQueue使用了LinkedBlockingQueue,只是coreSize和maxSize都是1。

一个一个线程的执行下去。输出结果:

run taskId =1

run taskId =2

run taskId =3

run taskId =4

run taskId =5

run taskId =6

 


4、延迟连接池,newScheduledThreadPool:

可以点击进去看看源代码就明白了:

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

源代码解析如下:

    workQueue使用了DelayedQueue。

public class TestThreadSchPool {

	public static void main(String[] args) {

		ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
		// 创建线程
		MyTask mt1 = new MyTask(1, "任务1");
		MyTask mt2 = new MyTask(2, "任务2");
		MyTask mt3 = new MyTask(3, "任务3");
		MyTask mt4 = new MyTask(4, "任务4");
		MyTask mt5 = new MyTask(5, "任务5");
		MyTask mt6 = new MyTask(6, "任务6");
		// 将线程放入池中进行执行
		pool.execute(mt1);
		// 使用延迟执行风格的方法
		pool.schedule(mt2, 1000, TimeUnit.MILLISECONDS);
		pool.schedule(mt3, 10, TimeUnit.MILLISECONDS);
		pool.scheduleAtFixedRate(mt4, 100, 5, TimeUnit.MILLISECONDS);//以固定频率执行
		// 先不要关闭线程池
		//pool.shutdown();
	}

}

 有关这些线程池的实现原理我们需要学习了线程安全的集合类后会有更好的理解,以后我们也是可以自定以一个线程池的。

输出:taskId=4会一直执行下去的。

run taskId =1

run taskId =3

run taskId =4

run taskId =2

run taskId =4

run taskId =4

run taskId =4