CyclicBarrier的学习

发表时间:2017-09-07 16:43:12 浏览量( 26 ) 留言数( 0 )

学习目标:

1、了解CyclicBarrier

2、在实践中灵活使用CyclicBarrier


学习过程:

一、CyclicBarrier简介

   CyclicBarrier和它名字的意思一样,可看成是个障碍, 所有的线程必须到齐后才能一起通过这个障碍,CyclicBarrier类似于CountDownLatch也是个计数器,不同的是CyclicBarrier数的是调用了CyclicBarrier.await()进入等待的线程数, 当线程数达到了CyclicBarrier初始时规定的数目时,所有进入等待状态的线程被唤醒并继续执行。

CyclicBarrier的方法介绍:

CyclicBarrier(int parties) 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在每个 barrier 上执行预定义的操作。

CyclicBarrier(int parties, Runnable barrierAction)  创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行,则每次到达公共屏障点的时候都最先执行这个传进去的Runnable,然后再执行处于等待的Runnable。


int await() 在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。

int await(long timeout, TimeUnit unit) 在所有参与者都已经在此屏障上调用 await 方法之前,将一直等待。

int  getNumberWaiting()  返回当前在屏障处等待的参与者数目。

int  getParties() 返回要求启动此 barrier 的参与者数目。

boolean isBroken() 查询此屏障是否处于损坏状态。

void reset() 将屏障重置为其初始状态。


二、示例代码

任务类:

public class SybTask implements Runnable {

	private int taskId;
	private String taskName;
	private CyclicBarrier cyclicBarrier;

	public SybTask(int taskId, String taskName, CyclicBarrier cyclicBarrier) {
		this.taskId = taskId;
		this.taskName = taskName;
		this.cyclicBarrier = cyclicBarrier;
	}

	@Override
	public void run() {
		try {
			Thread.sleep((long) (Math.random() * 1000));
			
			//cyclicBarrier.getNumberWaiting() 的输出在这里并不一定正确,应为
			System.out.println(
					"taskId =" + this.taskId + "即将到达第一个集合地点,当前已有" + cyclicBarrier.getNumberWaiting() + "个已经到达,正在等候");
			cyclicBarrier.await();// 到此如果没有达到公共屏障点,则该线程处于等待状态,如果达到公共屏障点则所有处于等待的线程都继续往下运行

			Thread.sleep((long) (Math.random() * 1000));
			System.out.println(
					"taskId =" + this.taskId + "即将到达第二个集合地点,当前已有" + cyclicBarrier.getNumberWaiting() + "个已经到达,正在等候");
			cyclicBarrier.await();
			System.out.println("end taskId =" + this.taskId);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (BrokenBarrierException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	public String toString() {
		return Integer.toString(this.taskId);
	}

}

测试类:

public class TestCyclicBarrier {
	public static void main(String[] args) throws InterruptedException {

		final CyclicBarrier cyclicBarrier = new CyclicBarrier(3,new Runnable() {
			
			@Override
			public void run() {
				System.out.println("通过一个障碍点了。");
				
			}
		});// 定义为3

		ExecutorService pool = Executors.newCachedThreadPool();//这样使用CachedThreadPool,进来一个线程就处理一个
		// 创建线程
		SybTask mt1 = new SybTask(1, "任务1",cyclicBarrier);
		SybTask mt2 = new SybTask(2, "任务2",cyclicBarrier);
		SybTask mt3 = new SybTask(3, "任务3",cyclicBarrier);
		// 将线程放入池中进行执行
		pool.execute(mt1);
		pool.execute(mt2);
		pool.execute(mt3);


		System.out.println("主线程可以继续执行了。。。");

	}
}

运行结果:

主线程可以继续执行了。。。

taskId =1即将到达第一个集合地点,当前已有0个已经到达,正在等候

taskId =2即将到达第一个集合地点,当前已有1个已经到达,正在等候

taskId =3即将到达第一个集合地点,当前已有2个已经到达,正在等候

通过一个障碍点了。

taskId =2即将到达第二个集合地点,当前已有0个已经到达,正在等候

taskId =3即将到达第二个集合地点,当前已有1个已经到达,正在等候

taskId =1即将到达第二个集合地点,当前已有2个已经到达,正在等候

通过一个障碍点了。

end taskId =1

end taskId =2

end taskId =3