线程安全的集合类

发表时间:2017-09-07 16:45:56 浏览量( 37 ) 留言数( 0 )

学习目标:

1、了解BlockingQueue及其常用的实现类

2、了解ConcurrentHashMap

3、能在实践中灵活的使用BlockingQueue和ConcurrentHashMap


学习过程:

    Arraylist 、Tree Set 、HashMap 以及其他实现了这些接口的类都不是线程安全的。不过你可以使用类java.util.Collections 中的同步包装方法让它们变得安全。举个例子,可以向Collections.synchronizedlist() 中传入一个Arraylist 实例,以获得一个线程安全的Arraylist 变体。尽管它们对于在多线程环境中简化代码是必要的,但是线程安全的集合仍然有很多问题:在遍历一个集合之前,获取锁是很有必要的。集合很可能在遍历过程当中被其他线程修改。如果一个锁并没有被获取并且集合被修改了,那很可能就会抛出java.util.ConcurrentModificationException 。发生这种情况是因为集合框架的类返回了快速失败的迭代器, 当集合在遍历过程中被修改时,这些迭代器就会抛出ConcurrentModificationException 。快速失败的送代器对于并发应用程序经常是不方便的。还有就是当同步的集合经常被多条线程频繁地访问,性能就成了问题。性能问题最终会影响应用程序的扩展能力。所以使用Collections.synchronizedlist 封装集合类时不推荐使用的。

   在JDK1.5之后推出了一些线程安全的集合类,这些结合类在性能和可用性方面都有了很大的改进,所以我们比较推荐使用这些集合类。

BlockingQueue

ConcurrentHashMap


一、BlockingQueue的学习

  BlockingQueue即阻塞队列,从阻塞这个词可以看出,在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:

1. 当队列满了的时候进行入队列操作

2. 当队列空了的时候进行出队列操作

attcontent/e878cf41-864b-4785-b497-cd474332fd6e.png

   除此之外还有以下的一些特点:比如在迭代开始之后, 被删除但还没有通过迭代器的next()方法被返回的元素,就不会再被返回了。迭代开始之后被添加的元素可能会返回也可能不会返回。在集合迭代的过程中,即便对集合做了改变,也没有任何元素会被返回超过一次。

   需要注意的是,我们不能向BlockingQueue中插入null,否则会报NullPointerException。但是PriorityBlockingQueue除外

BlockingQueue只是java.util.concurrent包中的一个接口,而在具体使用时,我们用到的是它的实现类,当然这些实现类也位于java.util.concurrent包中。在Java6中,BlockingQueue的实现类主要有以下几种:

  • ArrayBlockingQueue

  • DelayQueue

  • LinkedBlockingQueue

  • PriorityBlockingQueue

  • SynchronousQueue


下面一一的介绍

1. ArrayBlockingQueue 

    ArrayBlockingQueue是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。物理实现时基于数组的阻塞队列实现的,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。 

  ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于后后面学习的LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。所以ArrayBlockingQueue经常用户生产者和消费者模式中。

示例代码如下:

	public static void main(String[] args) throws InterruptedException {

		ArrayBlockingQueue<Integer> array = new ArrayBlockingQueue(3);
		array.add(1);
		array.add(2);
		array.add(3);
		//array.add(4);  //定义大小为3 ,添加这个会报错的。
		
		while(!array.isEmpty()) {
			System.out.println(array.take());
		}
	}


2、LinkedBlockingQueue 

    LinkedBlockingQueue阻塞队列大小的配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的,其实是采用了默认大小为Integer.MAX_VALUE的容量。基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时,才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。 

     但是我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

示例代码:

public static void main(String[] args) throws InterruptedException {
		LinkedBlockingQueue<Integer> array = new LinkedBlockingQueue();  //没有大小限制
		array.add(1);
		array.add(2);
		array.add(3);

		while (!array.isEmpty()) {
			System.out.println(array.take());
		}
	}


3、DelayQueue 

    DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。DelayQueue的应用场景非常多:比如定时关闭连接、缓存对象,超时处理等各种场景。

(1)需要实现Delayed,程序会一直调用getDelay方法,知道为0位置才会输出。

public class DelayedElement implements Delayed {

	private long expired;
	private long delay;
	private String name;

	DelayedElement(String elementName, long delay) {
		this.name = elementName;
		this.delay = delay;
		expired = (delay + System.currentTimeMillis());
	}

	@Override
	public int compareTo(Delayed o) {
		
		DelayedElement cached = (DelayedElement) o;
		return cached.getExpired() > expired ? 1 : -1;
	}

	@Override
	public long getDelay(TimeUnit unit) {
        // System.out.println("getDelay"+(expired - System.currentTimeMillis()));
		return (expired - System.currentTimeMillis());
	}

	@Override
	public String toString() {
		return "DelayedElement [delay=" + delay + ", name=" + name + "]";
	}

	public long getExpired() {
		return expired;
	}
}

(2)运行类

public class TtestDelayQueue {
	public static void main(String[] args) throws InterruptedException {
		DelayQueue<DelayedElement> queue = new DelayQueue<>();
		DelayedElement ele3 = new DelayedElement("延迟 3 seconds", 3000);
		queue.put(ele3); 
		System.out.println(queue.take());//等3秒后才会输出

	}
}


4、SynchronousQueue 

    和前面介绍的队列都有一个缓冲队列,但是SynchronousQueue 一种无缓冲的等待队列,单个消费效率非常高。

  声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别: 

  如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略; 

  但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。 

    我们定义两个线程,一个不断的take,一个不断的add,如果只有

public class TestSynchronousQueue {
	public static void main(String[] args) throws InterruptedException {

		SynchronousQueue<Integer> array = new SynchronousQueue();
		//array.add(1);//直接add会报错的。
		
		new Thread() {
			public void run() {

				for(int i=0;i<10;i++) {
					try {
						 //put() 往queue放进去一个element以后就一直wait直到有其他thread进来把这个element取走。
						 //offer() 往queue里放一个element后立即返回,如果碰巧这个element被另一个thread取走了,offer方法返回true,认为offer成功;否则返回false。
						//array.put(i);
						//array.offer(i);
					} catch (Exception e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			}
		}.start();
		
		new Thread() {
			public void run() {
				for(int i=0;i<10;i++) {
					try {
						//take() 取出并且remove掉queue里的element(认为是在queue里的。。。),取不到东西他会一直等。
					    //poll() 取出并且remove掉queue里的element(认为是在queue里的。。。),只有到碰巧另外一个线程正在往queue里offer数据或者put数据的时候,该方法才会取到东西。否则立即返回null。
						//System.out.println(array.take());
						System.out.println(array.poll(1,TimeUnit.SECONDS));
					} catch (Exception e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
			}
		}.start();


	}
}


5、PriorityBlockingQueue

   是一个没有边界的队列,它的排序规则和 java.util.PriorityQueue一样。需要注意,PriorityBlockingQueue中允许插入null对象。所有插入PriorityBlockingQueue的对象必须实现 java.lang.Comparable接口,队列优先级的排序规则就是按照我们对这个接口的实现来定义的。另外,我们可以从PriorityBlockingQueue获得一个迭代器Iterator,但这个迭代器并不保证按照优先级顺序进行迭代。基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。

定义一个User类

public class User implements Comparable<User>{  
  
    public User(int age,String name) {  
        this.age = age;  
        this.name = name;  
    }  
  
    int age;  
    String name;  
  
    @Override  
    public int compareTo(User o) {  
        return this.age > o.age ? -1 : 1;  
    }  

}

运行测试类:

public class TestPriorityBlockingQueue {

	public static void main(String[] args) {
		PriorityBlockingQueue<User> queue = new PriorityBlockingQueue<User>();
		queue.add(new User(1, "wu"));
		queue.add(new User(5, "wu5"));
		queue.add(new User(23, "wu23"));
		queue.add(new User(55, "wu55"));
		queue.add(new User(9, "wu9"));
		queue.add(new User(3, "wu3"));

		Iterator<User> users=queue.iterator();
		while(users.hasNext()) {
			System.out.println(users.next().name);
		}
		
		System.out.println("=============");
		
		for (User user : queue) {
			try {
				System.out.println(queue.take().name);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		

		
	}

}


二、ConcurrentHashMap的学习

    HashMap不是线程安全的,ConcurrentHashMap的设计和实现都非常精巧,在保证线程安全的同时,也保证了并发性,不至于效率过低。ConcurrentHashMap采用了分段锁的设计,只有在同一个分段内才存在竞态关系,不同的分段锁之间没有锁竞争。相比于对整个Map加锁的设计,分段锁大大的提高了高并发环境下的处理能力。但同时,由于不是对整个Map加锁,导致一些需要扫描整个Map的方法(如size(), containsValue())需要使用特殊的实现,另外一些方法(如clear())甚至放弃了对一致性的要求(ConcurrentHashMap是弱一致性的),默认一个ConcurrentHashMap中有16个子HashMap,所以相当于一个二级哈希。对于所有的操作都是先定位到子HashMap,再作相应的操作。


    ConcurrentHashMap除了提供和HaspMap一样的put和get方法。也提供了一些非常好用的方法。比如putIfAbsent方法的使用,看一下下面的例子:

    而且有举个例子,你经常需要检查一个map中是否包含某个特定的值, 当这个值不存在的时候,将它放进map 中:

if (!map.containsKey ("key")

    map.put ("key","value");


    尽管这段代码很简单并且似乎可以胜任此项工作,但是它却不是线程安全的。在调用map.containsKey()和map.put()方法之间,其他线程可能插入了这个条目,很可能会被覆盖掉。为了消除这个竞态条件,你必须显式地同步这段代码,示例如下:

synchonized(map){

  if (!map.containsKey ("key")

      map.put ("key","value");

}