Guava对线程的封装

发表时间:2017-09-07 16:48:01 浏览量( 27 ) 留言数( 0 )

学习目标:

1、了解Guava对线程池的封装

2、在项目中灵活使用Guava的线程池


学习过程:

一、简介 

    Future主要功能在于获取任务执行结果和对异步任务的控制。但如果要获取批量任务的执行结果,从上面的例子我们已经可以看到,单使用 Future 是很不方便的。其主要原因在于:一方面是没有好的方法去判断第一个完成的任务;另一方面是 Future的get方法 是阻塞的,使用不当会造成线程的浪费。第一个问题可以用 CompletionService 解决,CompletionService 提供了一个 take() 阻塞方法,用以依次获取所有已完成的任务。第二个问题可以用 Google Guava 库所提供的 ListeningExecutorService 和 ListenableFuture 来解决。除了获取批量任务执行结果时不便,Future另外一个不能做的事便是防止任务的重复提交。要做到这件事就需要 Future 最常见的一个实现类 FutureTask 了。Future只实现了异步,而没有实现回调,主线程get时会阻塞,可以轮询以便获取异步调用是否完成。

    在实际的使用中建议使用Guava ListenableFuture来实现异步非阻塞,目的就是多任务异步执行,通过回调的方方式来获取执行结果而不需轮询任务状态。Guava库中因此提供一个非常强大的装饰后的Future接口,使用观察者模式为在异步计算完成之后马上执行addListener指定一个Runnable对象,从实现“完成立即通知”。

二、示例代码

(1)使用get的方式

public class Run1 {

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		ListeningExecutorService service = MoreExecutors
				.listeningDecorator(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2));

		ListenableFuture<User> liu = service.submit(new Callable<User>() {
			public User call() throws InterruptedException {
				User user1 = new User();
				Thread.sleep(1000);
				user1.setName("liu");
				return user1;
			}
		});

		ListenableFuture<User> bao = service.submit(new Callable<User>() {
			public User call() throws InterruptedException {
				User user1 = new User();
				Thread.sleep(1000);
				user1.setName("bao");
				return user1;
			}
		});

		//可以单个的等待那个
		bao.get();

		List<ListenableFuture<User>> listenableFutures = new ArrayList<ListenableFuture<User>>();
		listenableFutures.add(liu);
		listenableFutures.add(bao);

		ListenableFuture<List<User>> results = Futures.allAsList(listenableFutures);
		// ListenableFuture<List<User>> results = Futures.successfulAsList(listenableFutures);

		List<User> users = results.get();// 同步等待所有的线程执行完毕。

		for (User obj : users) {
			if (obj != null) {
				System.out.println("线程返回结果:" + obj.getName());
			}
		}

		System.out.println("主线程最后。。。。。");

		service.shutdown();

	}

}

输出:

线程返回结果:liu

线程返回结果:bao

主线程最后。。。。。

(2)使用回调的方式

public class Run2 {

	public static void main(String[] args) {
		ListeningExecutorService service = MoreExecutors
				.listeningDecorator(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2));

		ListenableFuture<User> liu = service.submit(new Callable<User>() {
			public User call() throws InterruptedException {
				User user1 = new User();
				Thread.sleep(1000);
				user1.setName("liu");
				return user1;
			}
		});

		ListenableFuture<User> bao = service.submit(new Callable<User>() {
			public User call() throws InterruptedException {
				User user1 = new User();
				Thread.sleep(1000);
				user1.setName("bao");
				return user1;
			}
		});
		
		
		//通过回调的方式,监听线程执行返回,这样主线程就不会阻塞了。
		Futures.addCallback(liu, new FutureCallback<User>() {
			public void onSuccess(User result) {
				System.out.println("只是监听一个线程:" + result.getName());
			}

			public void onFailure(Throwable t) {
				// TODO Auto-generated method stub
			}
		}, service);
		

		//也可以监听所有的子线程,不需要通过循环一一个判断。
		List<ListenableFuture<User>> listenableFutures = new ArrayList<ListenableFuture<User>>();
		listenableFutures.add(liu);
		listenableFutures.add(bao);
		try {
			// ListenableFuture<List<User>> results = Futures.allAsList(listenableFutures);
			ListenableFuture<List<User>> results = Futures.successfulAsList(listenableFutures);//可以监听成功的,失败回事null

			Futures.addCallback(results, new FutureCallback<List<User>>() {

				@Override
				public void onSuccess(List<User> result) {
					for(User user:result) {
						if(user!=null) {
							System.out.println("监听全部"+user.getName());
						}
					}

				}

				@Override
				public void onFailure(Throwable t) {
					// TODO Auto-generated method stub

				}
			}, service);

		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

		// 异步的等待一个线程结束

		System.out.println("主线程最后。。。。。");

		// service.shutdown();

	}

}

输出,可以看到主线程并没有受到影响,

主线程最后。。。。。

只是监听一个线程:liu

liu

bao