Phaser的学习

发表时间:2017-09-07 16:44:26 浏览量( 21 ) 留言数( 0 )

学习目标:

1、了解Phaser

2、在实践中灵活使用Phaser


学习过程:

一、Phaser简介

    Phaser由java7中推出,是Java SE 7中新增的一个使用同步工具,在功能上面它与CyclicBarrier、CountDownLatch有些重叠,但是它提供了更加灵活、强大的用法。

   在phaser中,有一个onAdvance方法, 该方法在参与者数量为0的时候,返回true,来表示该phaser状态为终止状态。它在phaser阶段改变的时候会自动执行。他需要两个参数,当前阶段数和注册的参与者数量。继承并覆盖phaser的 onAdvance 方法,来实现阶段切换的功能,要注意onAdvance方法中的 phase 是阶段的序号,在使用序号来判断的时候,需要注意动态注册的线程的序号会超出预期的阶段序号。

   CountDownLatch是在初始化的时候就固定了数量,并且不允许修改,但是Phaser支持通过register()和bulkRegister(int parties)方法来动态调整注册任务的数量。可以在任何时间注册新的参与者.并且在抵达屏障是可以注销已经注册的参与者.因此,注册到同步移相器的参与者的数目可能会随着时间的推移而变化。

   如CyclicBarrier一样,Phaser可以重复使用,这意味着当前参与者到达移相器后,可以再一次注册自己并等待另一次到达。因此,移相器会有多代.一旦为某个特定相位注册的所有参与者都到达移相器,就增加相数.相数从零开始,在达到Integer.MAX_VALUE后,再次绕回0.当移相器发生变化时,通过重写onAdvance方法,可以自行可选操作.这个方法也可用于终止移相器.移相器一旦被终止,所有的同步方法就会立即返回,并尝试注册新的失败的参与者.


Phaser的常用方法介绍:

   1、Phaser():构造函数,创建一个Phaser;默认parties个数为0。此后我们可以通过register()、bulkRegister()方法来注册新的parties。每个Phaser实例内部,都持有几个状态数据:termination状态、已经注册的parties个数(registeredParties)、当前phase下已到达的parties个数(arrivedParties)、当前phase周期数,还有2个同步阻塞队列Queue。Queue中保存了所有的waiter,即因为advance而等待的线程信息;这两个Queue分别为evenQ和oddQ,这两个Queue在实现上没有任何区别,Queue的元素为QNode,每个QNode保存一个waiter的信息,比如Thread引用、阻塞的phase、超时的deadline、是否支持interrupted响应等。两个Queue,其中一个保存当前phase中正在使用的waiter,另一个备用,当phase为奇数时使用evenQ、oddQ备用,偶数时相反,即两个Queue轮换使用。当advance事件触发期间,新register的parties将会被放在备用的Queue中,advance只需要响应另一个Queue中的waiters即可,避免出现混乱。

    2、Phaser(int parties):构造函数,初始一定数量的parties;相当于直接regsiter此数量的parties。

    3、arrive():到达,阻塞,等到当前phase下其他parties到达。如果没有register(即已register数量为0),调用此方法将会抛出异常,此方法返回当前phase周期数,如果Phaser已经终止,则返回负数。

    4、arriveAndDeregister():到达,并注销一个parties数量,非阻塞方法。注销,将会导致Phaser内部的parties个数减一(只影响当前phase),即下一个phase需要等待arrive的parties数量将减一。异常机制和返回值,与arrive方法一致。

    5、arriveAndAwaitAdvance():到达,且阻塞直到其他parties都到达,且advance。此方法等同于awaitAdvance(arrive())。如果你希望阻塞机制支持timeout、interrupted响应,可以使用类似的其他方法(参见下文)。如果你希望到达后且注销,而且阻塞等到当前phase下其他的parties到达,可以使用awaitAdvance(arriveAndDeregister())方法组合。此方法的异常机制和返回值同arrive()。

    6、awaitAdvance(int phase):阻塞方法,等待phase周期数下其他所有的parties都到达。如果指定的phase与Phaser当前的phase不一致,则立即返回。

    7、awaitAdvanceInterruptibly(int phase):阻塞方法,同awaitAdvance,只是支持interrupted响应,即waiter线程如果被外部中断,则此方法立即返回,并抛出InterrutedException。

    8、awaitAdvanceInterruptibly(int phase,long timeout,TimeUnit unit):阻塞方法,同awaitAdvance,支持timeout类型的interrupted响应,即当前线程阻塞等待约定的时长,超时后以TimeoutException异常方式返回。

    9、forceTermination():强制终止,此后Phaser对象将不可用,即register等将不再有效。此方法将会导致Queue中所有的waiter线程被唤醒。

    10、register():新注册一个party,导致Phaser内部registerPaties数量加1;如果此时onAdvance方法正在执行,此方法将会等待它执行完毕后才会返回。此方法返回当前的phase周期数,如果Phaser已经中断,将会返回负数。

    11、bulkRegister(int parties):批量注册多个parties数组,规则同10、。

    12、getArrivedParties():获取已经到达的parties个数。

    13、getPhase():获取当前phase周期数。如果Phaser已经中断,则返回负值。

    14、getRegisteredParties():获取已经注册的parties个数。

    15、getUnarrivedParties():获取尚未到达的parties个数。

    16、onAdvance(int phase,int registeredParties):这个方法比较特殊,表示当进入下一个phase时可以进行的事件处理,如果返回true表示此Phaser应该终止(此后将会把Phaser的状态为termination,即isTermination()将返回true。),否则可以继续进行。phase参数表示当前周期数,registeredParties表示当前已经注册的parties个数。


二、示例代码

我们可以使用Phaser实现替换CountDownLatch和CyclicBarrier

1、可以替换CountDownLatch

await()方法,可以使线程进入等待状态,在Phaser中,与之对应的方法是awaitAdvance(int n)。

countDown(),使计数器减一,当计数器为0时所有等待的线程开始执行,在Phaser中,与之对应的方法是arrive()

2、可以替换CyclicBarrier

Phaser替代CyclicBarrier比较简单,CyclicBarrier的await()方法可以直接用Phaser的arriveAndAwaitAdvance()方法替代


示例代码:

1、类似

(1)任务类

public class MyTaskCountDownLatch implements Runnable {

	private int taskId;
	private String taskName;
	private Phaser phaser;

	public MyTaskCountDownLatch(int taskId, String taskName,Phaser phaser) {
		this.taskId = taskId;
		this.taskName = taskName;
		this.phaser=phaser;
	}

	@Override
	public void run() {
		try {
			System.out.println("run taskId =" + this.taskId);
			Thread.sleep(2000);
			System.out.println("run taskId =" + this.taskId+"执行了通知");
			phaser.arrive();
			Thread.sleep(2000);
			System.out.println("end taskId =" + this.taskId);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public String toString() {
		return Integer.toString(this.taskId);
	}

}

(2)测试类

public class TestPhaserCountDownLatch {

	public static void main(String[] args) throws InterruptedException {

		Phaser phaser=new Phaser(3);

		ExecutorService pool = Executors.newFixedThreadPool(3);
		// 创建线程
		MyTaskCountDownLatch mt1 = new MyTaskCountDownLatch(1, "任务1",phaser);
		MyTaskCountDownLatch mt2 = new MyTaskCountDownLatch(2, "任务2",phaser);
		MyTaskCountDownLatch mt3 = new MyTaskCountDownLatch(3, "任务3",phaser);
		// 将线程放入池中进行执行
		pool.execute(mt1);
		pool.execute(mt2);
		pool.execute(mt3);

		System.out.println("主线程启动了。。。。");
		
		System.out.println(phaser.getPhase());
		
		phaser.awaitAdvance(phaser.getPhase());// 

		System.out.println("主线程可以继续执行了。。。");

	}

}

2、类似CyclicBarrier

(1)任务类

public class SybPhTask implements Runnable {

	private int taskId;
	private String taskName;
	private Phaser phaser;

	public SybPhTask(int taskId, String taskName, Phaser phaser) {
		this.taskId = taskId;
		this.taskName = taskName;
		this.phaser = phaser;
	}

	@Override
	public void run() {
		try {
			Thread.sleep((long) (Math.random() * 1000));
			
			//cyclicBarrier.getNumberWaiting() 的输出在这里并不一定正确,应为
			System.out.println(
					"taskId =" + this.taskId + "即将到达第一个集合地点,当前已有" + phaser.getArrivedParties() + "个已经到达,正在等候");
			phaser.arriveAndAwaitAdvance();// 到此如果没有达到公共屏障点,则该线程处于等待状态,如果达到公共屏障点则所有处于等待的线程都继续往下运行

			
			Thread.sleep((long) (Math.random() * 1000));
			
			//cyclicBarrier.getNumberWaiting() 的输出在这里并不一定正确,应为
			System.out.println(
					"taskId =" + this.taskId + "即将到达第二个集合地点,当前已有" + phaser.getArrivedParties() + "个已经到达,正在等候");
			phaser.arriveAndAwaitAdvance();// 到此如果没有达到公共屏障点,则该线程处于等待状态,如果达到公共屏障点则所有处于等待的线程都继续往下运行

			System.out.println("end taskId =" + this.taskId);
			
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public String toString() {
		return Integer.toString(this.taskId);
	}

}

(2)重写Phaser

public class TestMyPhaser extends Phaser {
	
	//
	public TestMyPhaser(int parties) {
		super(parties);
	}

	@Override
	protected boolean onAdvance(int phase, int registeredParties) { // 在每个阶段执行完成后回调的方法

		switch (phase) {
		case 0:
			System.out.println("第一个结束");
			return false;
		case 1:
			System.out.println("第二个结束");
			return false;
		default:
			return true;
		}

	}

}

(3)测试类

public class TestPhCyclicBarrier {
	public static void main(String[] args) throws InterruptedException {

		final TestMyPhaser phaser = new TestMyPhaser(3);// 定义为3

		ExecutorService pool = Executors.newCachedThreadPool();//这样使用CachedThreadPool,进来一个线程就处理一个
		// 创建线程
		SybPhTask mt1 = new SybPhTask(1, "任务1",phaser);
		SybPhTask mt2 = new SybPhTask(2, "任务2",phaser);
		SybPhTask mt3 = new SybPhTask(3, "任务3",phaser);
		// 将线程放入池中进行执行
		pool.execute(mt1);
		pool.execute(mt2);
		pool.execute(mt3);


		System.out.println("主线程可以继续执行了。。。");

	}
}

Phaser还有很多高级的应用,这里就不一一的介绍了。